pcp.cpp
上传用户:chn_coc
上传日期:2007-12-20
资源大小:563k
文件大小:16k
源码类别:

P2P编程

开发平台:

Windows_Unix

  1. // ------------------------------------------------
  2. // File : pcp.cpp
  3. // Date: 1-mar-2004
  4. // Author: giles
  5. //
  6. // (c) 2002-4 peercast.org
  7. // ------------------------------------------------
  8. // This program is free software; you can redistribute it and/or modify
  9. // it under the terms of the GNU General Public License as published by
  10. // the Free Software Foundation; either version 2 of the License, or
  11. // (at your option) any later version.
  12. // This program is distributed in the hope that it will be useful,
  13. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  15. // GNU General Public License for more details.
  16. // ------------------------------------------------
  17. #include "atom.h"
  18. #include "pcp.h"
  19. #include "peercast.h"
  20. #include "version2.h"
  21. // ------------------------------------------
  22. void PCPStream::init(GnuID &rid)
  23. {
  24. remoteID = rid;
  25. routeList.clear();
  26. lastPacketTime = 0;
  27. nextRootPacket = 0;  // 0 seconds (never)
  28. inData.init();
  29. inData.accept = ChanPacket::T_PCP;
  30. outData.init();
  31. outData.accept = ChanPacket::T_PCP;
  32. }
  33. // ------------------------------------------
  34. void PCPStream::readVersion(Stream &in)
  35. {
  36. int len = in.readInt();
  37. if (len != 4)
  38. throw StreamException("Invalid PCP");
  39. int ver = in.readInt();
  40. LOG_DEBUG("PCP ver: %d",ver);
  41. }
  42. // ------------------------------------------
  43. void PCPStream::readHeader(Stream &in,Channel *)
  44. {
  45. // AtomStream atom(in);
  46. // if (in.readInt() != PCP_CONNECT)
  47. // throw StreamException("Not PCP");
  48. // readVersion(in);
  49. }
  50. // ------------------------------------------
  51. bool PCPStream::sendPacket(ChanPacket &pack,GnuID &destID)
  52. {
  53. if (destID.isSet())
  54. if (!destID.isSame(remoteID))
  55. if (!routeList.contains(destID))
  56. return false;
  57. return outData.writePacket(pack);
  58. }
  59. // ------------------------------------------
  60. void PCPStream::flush(Stream &in)
  61. {
  62. ChanPacket pack;
  63. // send outward packets
  64. while (outData.numPending())
  65. {
  66. outData.readPacket(pack);
  67. pack.writeRaw(in);
  68. }
  69. }
  70. // ------------------------------------------
  71. int PCPStream::readPacket(Stream &in,Channel *)
  72. {
  73. BroadcastState bcs;
  74. return readPacket(in,bcs);
  75. }
  76. // ------------------------------------------
  77. int PCPStream::readPacket(Stream &in,BroadcastState &bcs)
  78. {
  79. int error = PCP_ERROR_GENERAL;
  80. try
  81. {
  82. AtomStream atom(in);
  83. ChanPacket pack;
  84. MemoryStream mem(pack.data,sizeof(pack.data));
  85. AtomStream patom(mem);
  86. // send outward packets
  87. error = PCP_ERROR_WRITE;
  88. if (outData.numPending())
  89. {
  90. outData.readPacket(pack);
  91. pack.writeRaw(in);
  92. }
  93. error = PCP_ERROR_GENERAL;
  94. if (outData.willSkip())
  95. {
  96. error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
  97. throw StreamException("Send too slow");
  98. }
  99. error = PCP_ERROR_READ;
  100. // poll for new downward packet
  101. if (in.readReady())
  102. {
  103. int numc,numd;
  104. ID4 id;
  105. id = atom.read(numc,numd);
  106. mem.rewind();
  107. pack.len = patom.writeAtoms(id, in, numc, numd);
  108. pack.type = ChanPacket::T_PCP;
  109. inData.writePacket(pack);
  110. }
  111. error = PCP_ERROR_GENERAL;
  112. // process downward packets
  113. if (inData.numPending())
  114. {
  115. inData.readPacket(pack);
  116. mem.rewind();
  117. int numc,numd;
  118. ID4 id = patom.read(numc,numd);
  119. error = PCPStream::procAtom(patom,id,numc,numd,bcs);
  120. if (error)
  121. throw StreamException("PCP exception");
  122. }
  123. error = 0;
  124. }catch(StreamException &e)
  125. {
  126. LOG_ERROR("PCP readPacket: %s (%d)",e.msg,error);
  127. }
  128. return error;
  129. }
  130. // ------------------------------------------
  131. void PCPStream::readEnd(Stream &,Channel *)
  132. {
  133. }
  134. // ------------------------------------------
  135. void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
  136. {
  137. Host host;
  138. GnuID chanID;
  139. chanID.clear();
  140. for(int i=0; i<numc; i++)
  141. {
  142. int c,d;
  143. ID4 id = atom.read(c,d);
  144. if (id == PCP_PUSH_IP)
  145. host.ip = atom.readInt();
  146. else if (id == PCP_PUSH_PORT)
  147. host.port = atom.readShort();
  148. else if (id == PCP_PUSH_CHANID)
  149. atom.readBytes(chanID.id,16);
  150. else
  151. {
  152. LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
  153. atom.skip(c,d);
  154. }
  155. }
  156. if (bcs.forMe)
  157. {
  158. char ipstr[64];
  159. host.toStr(ipstr);
  160. Servent *s = NULL;
  161. if (chanID.isSet())
  162. {
  163. Channel *ch = chanMgr->findChannelByID(chanID);
  164. if (ch)
  165. if (ch->isBroadcasting() || !ch->isFull() && !servMgr->relaysFull() && ch->info.id.isSame(chanID))
  166. s = servMgr->allocServent();
  167. }else{
  168. s = servMgr->allocServent();
  169. }
  170. if (s)
  171. {
  172. LOG_DEBUG("GIVing to %s",ipstr);
  173. s->initGIV(host,chanID);
  174. }
  175. }
  176. }
  177. // ------------------------------------------
  178. void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
  179. {
  180. String url;
  181. for(int i=0; i<numc; i++)
  182. {
  183. int c,d;
  184. ID4 id = atom.read(c,d);
  185. if (id == PCP_ROOT_UPDINT)
  186. {
  187. int si = atom.readInt();
  188. chanMgr->setUpdateInterval(si);
  189. LOG_DEBUG("PCP got new host update interval: %ds",si);
  190. }else if (id == PCP_ROOT_URL)
  191. {
  192. url = "http://www.peercast.org/";
  193. String loc;
  194. atom.readString(loc.data,sizeof(loc.data),d);
  195. url.append(loc);
  196. }else if (id == PCP_ROOT_CHECKVER)
  197. {
  198. unsigned int newVer = atom.readInt();
  199. if (newVer > PCP_CLIENT_VERSION)
  200. {
  201. strcpy(servMgr->downloadURL,url.cstr());
  202. peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
  203. }
  204. LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);
  205. }else if (id == PCP_ROOT_NEXT)
  206. {
  207. unsigned int time = atom.readInt();
  208. if (time)
  209. {
  210. unsigned int ctime = sys->getTime();
  211. nextRootPacket = ctime+time;
  212. LOG_DEBUG("PCP expecting next root packet in %ds",time);
  213. }else
  214. {
  215. nextRootPacket = 0;
  216. }
  217. }else if (id == PCP_ROOT_UPDATE)
  218. {
  219. atom.skip(c,d);
  220. chanMgr->broadcastTrackerUpdate(remoteID,true);
  221. }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG)) // PCP_MESG_ASCII to be depreciated 
  222. {
  223. String newMsg;
  224. atom.readString(newMsg.data,sizeof(newMsg.data),d);
  225. if (!newMsg.isSame(servMgr->rootMsg.cstr()))
  226. {
  227. servMgr->rootMsg = newMsg;
  228. LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
  229. peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
  230. }
  231. }else
  232. {
  233. LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
  234. atom.skip(c,d);
  235. }
  236. }
  237. }
  238. // ------------------------------------------
  239. void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
  240. {
  241. ChanPacket pack;
  242. ID4 type;
  243. for(int i=0; i<numc; i++)
  244. {
  245. int c,d;
  246. ID4 id = atom.read(c,d);
  247. if (id == PCP_CHAN_PKT_TYPE)
  248. {
  249. type = atom.readID4();
  250. if (type == PCP_CHAN_PKT_HEAD)
  251. pack.type = ChanPacket::T_HEAD;
  252. else if (type == PCP_CHAN_PKT_DATA)
  253. pack.type = ChanPacket::T_DATA;
  254. else
  255. pack.type = ChanPacket::T_UNKNOWN;
  256. }else if (id == PCP_CHAN_PKT_POS)
  257. {
  258. pack.pos = atom.readInt();
  259. }else if (id == PCP_CHAN_PKT_DATA)
  260. {
  261. pack.len = d;
  262. atom.readBytes(pack.data,pack.len);
  263. }
  264. else
  265. {
  266. LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
  267. atom.skip(c,d);
  268. }
  269. }
  270. if (ch)
  271. {
  272. int diff = pack.pos - ch->streamPos;
  273. if (diff)
  274. LOG_DEBUG("PCP skipping %s%d (%d -> %d)",(diff>0)?"+":"",diff,ch->streamPos,pack.pos);
  275. if (pack.type == ChanPacket::T_HEAD)
  276. {
  277. LOG_DEBUG("New head packet at %d",pack.pos);
  278. // check for stream restart
  279. if (pack.pos == 0)
  280. {
  281. LOG_CHANNEL("PCP resetting stream");
  282. ch->streamIndex++;
  283. ch->rawData.init();
  284. }
  285. ch->headPack = pack;
  286. ch->rawData.writePacket(pack,true);
  287. ch->streamPos = pack.pos+pack.len;
  288. }else if (pack.type == ChanPacket::T_DATA)
  289. {
  290. ch->rawData.writePacket(pack,true);
  291. ch->streamPos = pack.pos+pack.len;
  292. }
  293. }
  294. // update this parent packet stream position
  295. if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
  296. bcs.streamPos = pack.pos;
  297. }
  298. // -----------------------------------
  299. void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs)
  300. {
  301. ChanHit hit;
  302. hit.init();
  303. GnuID chanID = bcs.chanID; //use default
  304. bool busy=false;
  305. unsigned int ipNum=0;
  306. for(int i=0; i<numc; i++)
  307. {
  308. int c,d;
  309. ID4 id = atom.read(c,d);
  310. if (id == PCP_HOST_IP)
  311. {
  312. unsigned int ip = atom.readInt();
  313. hit.rhost[ipNum].ip = ip;
  314. }else if (id == PCP_HOST_PORT)
  315. {
  316. int port = atom.readShort();
  317. hit.rhost[ipNum++].port = port;
  318. if (ipNum > 1)
  319. ipNum = 1;
  320. }
  321. else if (id == PCP_HOST_NUML)
  322. hit.numListeners = atom.readInt();
  323. else if (id == PCP_HOST_NUMR)
  324. hit.numRelays = atom.readInt();
  325. else if (id == PCP_HOST_UPTIME)
  326. hit.upTime = atom.readInt();
  327. else if (id == PCP_HOST_OLDPOS)
  328. hit.oldestPos = atom.readInt();
  329. else if (id == PCP_HOST_NEWPOS)
  330. hit.newestPos = atom.readInt();
  331. else if (id == PCP_HOST_VERSION)
  332. hit.version = atom.readInt();
  333. else if (id == PCP_HOST_FLAGS1)
  334. {
  335. int fl1 = atom.readChar();
  336. hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
  337. hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
  338. hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
  339. hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
  340. hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
  341. hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;
  342. }else if (id == PCP_HOST_ID)
  343. atom.readBytes(hit.sessionID.id,16);
  344. else if (id == PCP_HOST_CHANID)
  345. atom.readBytes(chanID.id,16);
  346. else
  347. {
  348. LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
  349. atom.skip(c,d);
  350. }
  351. }
  352. hit.host = hit.rhost[0];
  353. hit.chanID = chanID;
  354. hit.numHops = bcs.numHops;
  355. if (hit.recv)
  356. chanMgr->addHit(hit);
  357. else
  358. chanMgr->delHit(hit);
  359. }
  360. // ------------------------------------------
  361. void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
  362. {
  363. Channel *ch=NULL;
  364. ChanHitList *chl=NULL;
  365. ChanInfo newInfo;
  366. ch = chanMgr->findChannelByID(bcs.chanID);
  367. chl = chanMgr->findHitListByID(bcs.chanID);
  368. if (ch)
  369. newInfo = ch->info;
  370. else if (chl)
  371. newInfo = chl->info;
  372. for(int i=0; i<numc; i++)
  373. {
  374. int c,d;
  375. ID4 id = atom.read(c,d);
  376. if ((id == PCP_CHAN_PKT) && (ch))
  377. {
  378. readPktAtoms(ch,atom,c,bcs);
  379. }else if (id == PCP_CHAN_INFO)
  380. {
  381. newInfo.readInfoAtoms(atom,c);
  382. }else if (id == PCP_CHAN_TRACK)
  383. {
  384. newInfo.readTrackAtoms(atom,c);
  385. }else if (id == PCP_CHAN_BCID)
  386. {
  387. atom.readBytes(newInfo.bcID.id,16);
  388. }else if (id == PCP_CHAN_KEY) // depreciated
  389. {
  390. atom.readBytes(newInfo.bcID.id,16);
  391. newInfo.bcID.id[0] = 0; // clear flags
  392. }else if (id == PCP_CHAN_ID)
  393. {
  394. atom.readBytes(newInfo.id.id,16);
  395. ch = chanMgr->findChannelByID(newInfo.id);
  396. chl = chanMgr->findHitListByID(newInfo.id);
  397. }else
  398. {
  399. LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
  400. atom.skip(c,d);
  401. }
  402. }
  403. if (!chl)
  404. chl = chanMgr->addHitList(newInfo);
  405. if (chl)
  406. {
  407. chl->info.update(newInfo);
  408. if (!servMgr->chanLog.isEmpty())
  409. {
  410. //if (chl->numListeners())
  411. {
  412. try
  413. {
  414. FileStream file;
  415. file.openWriteAppend(servMgr->chanLog.cstr());
  416.          XML::Node *rn = new XML::Node("update time="%d"",sys->getTime());
  417.         XML::Node *n = chl->info.createChannelXML();
  418.          n->add(chl->createXML(false));
  419.          n->add(chl->info.createTrackXML());
  420. rn->add(n);
  421. rn->write(file,0);
  422. delete rn;
  423. file.close();
  424. }catch(StreamException &e)
  425. {
  426. LOG_ERROR("Unable to update channel log: %s",e.msg);
  427. }
  428. }
  429. }
  430. }
  431. if (ch && !ch->isBroadcasting())
  432. ch->updateInfo(newInfo);
  433. }
  434. // ------------------------------------------
  435. int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
  436. {
  437. ChanPacket pack;
  438. int ttl=1;
  439. int ver=0;
  440. GnuID fromID,destID;
  441. fromID.clear();
  442. destID.clear();
  443. bcs.initPacketSettings();
  444. MemoryStream pmem(pack.data,sizeof(pack.data));
  445. AtomStream patom(pmem);
  446. patom.writeParent(PCP_BCST,numc);
  447. for(int i=0; i<numc; i++)
  448. {
  449. int c,d;
  450. ID4 id = atom.read(c,d);
  451. if (id == PCP_BCST_TTL)
  452. {
  453. ttl = atom.readChar()-1;
  454. patom.writeChar(id,ttl);
  455. }else if (id == PCP_BCST_HOPS)
  456. {
  457. bcs.numHops = atom.readChar()+1;
  458. patom.writeChar(id,bcs.numHops);
  459. }else if (id == PCP_BCST_FROM)
  460. {
  461. atom.readBytes(fromID.id,16);
  462. patom.writeBytes(id,fromID.id,16);
  463. routeList.add(fromID);
  464. }else if (id == PCP_BCST_GROUP)
  465. {
  466. bcs.group = atom.readChar();
  467. patom.writeChar(id,bcs.group);
  468. }else if (id == PCP_BCST_DEST)
  469. {
  470. atom.readBytes(destID.id,16);
  471. patom.writeBytes(id,destID.id,16);
  472. bcs.forMe = destID.isSame(servMgr->sessionID);
  473. char idstr1[64];
  474. char idstr2[64];
  475. destID.toStr(idstr1);
  476. servMgr->sessionID.toStr(idstr2);
  477. }else if (id == PCP_BCST_CHANID)
  478. {
  479. atom.readBytes(bcs.chanID.id,16);
  480. patom.writeBytes(id,bcs.chanID.id,16);
  481. }else if (id == PCP_BCST_VERSION)
  482. {
  483. ver = atom.readInt();
  484. patom.writeInt(id,ver);
  485. }else
  486. {
  487. // copy and process atoms
  488. int oldPos = pmem.pos;
  489. patom.writeAtoms(id,atom.io,c,d);
  490. pmem.pos = oldPos;
  491. readAtom(patom,bcs);
  492. }
  493. }
  494. char fromStr[64];
  495. fromStr[0] = 0;
  496. if (fromID.isSet())
  497. fromID.toStr(fromStr);
  498. char destStr[64];
  499. destStr[0] = 0;
  500. if (destID.isSet())
  501. destID.toStr(destStr);
  502. LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s",bcs.group,bcs.numHops,ver,fromStr,destStr);
  503. if (fromID.isSet())
  504. if (fromID.isSame(servMgr->sessionID))
  505. {
  506. LOG_ERROR("BCST loopback"); 
  507. return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
  508. }
  509. // broadcast back out if ttl > 0 
  510. if ((ttl>0) && (!bcs.forMe))
  511. {
  512. pack.len = pmem.pos;
  513. pack.type = ChanPacket::T_PCP;
  514. if (bcs.group & (PCP_BCST_GROUP_ROOT|PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
  515. {
  516. chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
  517. }
  518. if (bcs.group & (PCP_BCST_GROUP_ROOT|PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
  519. {
  520. servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
  521. }
  522. if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
  523. {
  524. servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
  525. }
  526. if (bcs.group & (PCP_BCST_GROUP_RELAYS))
  527. {
  528. servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
  529. }
  530. }
  531. return 0;
  532. }
  533. // ------------------------------------------
  534. int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
  535. {
  536. int r=0;
  537. if (id == PCP_CHAN)
  538. {
  539. readChanAtoms(atom,numc,bcs);
  540. }else if (id == PCP_ROOT)
  541. {
  542. if (servMgr->isRoot)
  543. throw StreamException("Unauthorized root message");
  544. else
  545. readRootAtoms(atom,numc,bcs);
  546. }else if (id == PCP_HOST)
  547. {
  548. readHostAtoms(atom,numc,bcs);
  549. }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG)) // PCP_MESG_ASCII to be depreciated
  550. {
  551. String msg;
  552. atom.readString(msg.data,sizeof(msg.data),dlen);
  553. LOG_DEBUG("PCP got text: %s",msg.cstr());
  554. }else if (id == PCP_BCST)
  555. {
  556. r = readBroadcastAtoms(atom,numc,bcs);
  557. }else if (id == PCP_HELO)
  558. {
  559. atom.skip(numc,dlen);
  560. atom.writeParent(PCP_OLEH,1);
  561. atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
  562. }else if (id == PCP_PUSH)
  563. {
  564. readPushAtoms(atom,numc,bcs);
  565. }else if (id == PCP_OK)
  566. {
  567. atom.readInt();
  568. }else if (id == PCP_QUIT)
  569. {
  570. r = atom.readInt();
  571. if (!r)
  572. r = PCP_ERROR_QUIT;
  573. }else if (id == PCP_ATOM)
  574. {
  575. for(int i=0; i<numc; i++)
  576. {
  577. int nc,nd;
  578. ID4 aid = atom.read(nc,nd);
  579. int ar = procAtom(atom,aid,nc,nd,bcs);
  580. if (ar)
  581. r = ar;
  582. }
  583. }else
  584. {
  585. LOG_CHANNEL("PCP skip: %s",id.getString().str());
  586. atom.skip(numc,dlen);
  587. }
  588. return r;
  589. }
  590. // ------------------------------------------
  591. int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
  592. {
  593. int numc,dlen;
  594. ID4 id = atom.read(numc,dlen);
  595. return procAtom(atom,id,numc,dlen,bcs);
  596. }