gnutella.cpp
上传用户:chn_coc
上传日期:2007-12-20
资源大小:563k
文件大小:15k
源码类别:

P2P编程

开发平台:

Windows_Unix

  1. // ------------------------------------------------ // File : gnutella.cpp // Date: 4-apr-2002 // Author: giles // Desc:  // GnuPacket is a Gnutella protocol packet. // GnuStream is a Stream that reads/writes GnuPackets // // // (c) 2002 peercast.org //  // ------------------------------------------------ // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the // GNU General Public License for more details. // ------------------------------------------------ #include "gnutella.h" #include "stream.h" #include "common.h" #include "servent.h" #include "servmgr.h" #include "stats.h" #include <stdlib.h> // --------------------------- const char *GNU_FUNC_STR(int func) { switch (func) { case GNU_FUNC_PING: return "PING"; break; case GNU_FUNC_PONG: return "PONG"; break; case GNU_FUNC_QUERY: return "QUERY"; break; case GNU_FUNC_HIT: return "HIT"; break; case GNU_FUNC_PUSH: return "PUSH"; break; default: return "UNKNOWN"; break; } } // --------------------------- const char *GnuStream::getRouteStr(R_TYPE r) { switch(r) { case R_PROCESS: return "PROCESS"; break; case R_DEAD: return "DEAD"; break; case R_DISCARD: return "DISCARD"; break; case R_ACCEPTED: return "ACCEPTED"; break; case R_BROADCAST: return "BROADCAST"; break; case R_ROUTE: return "ROUTE"; break; case R_DUPLICATE: return "DUPLICATE"; break; case R_BADVERSION: return "BADVERSION"; break; case R_DROP: return "DROP"; break; default: return "UNKNOWN"; break; } } // --------------------------- void GnuPacket::makeChecksumID() { for(unsigned int i=0; i<len; i++) id.id[i%16] += data[i]; } // --------------------------- void GnuPacket::initPing(int t) { func = GNU_FUNC_PING; ttl = t; hops = 0; len = 0; id.generate(); } // --------------------------- void GnuPacket::initPong(Host &h, bool ownPong, GnuPacket &ping) { func = GNU_FUNC_PONG; ttl = ping.hops; hops = 0; len = 14; id = ping.id; MemoryStream data(data,len); data.writeShort(h.port); // port data.writeLong(SWAP4(h.ip)); // ip if (ownPong) { data.writeLong(chanMgr->numChannels()); // cnt data.writeLong(servMgr->totalOutput(false)); // total }else{ data.writeLong(0); // cnt data.writeLong(0); // total } } // --------------------------- void GnuPacket::initPush(ChanHit &ch, Host &sh) {
  2. #if 0 func = GNU_FUNC_PUSH; ttl = ch.numHops; hops = 0; len = 26; id.generate(); MemoryStream data(data,len); // ID of Hit packet data.write(ch.packetID.id,16); // index of channel data.writeLong(ch.index); data.writeLong(SWAP4(sh.ip)); // ip data.writeShort(sh.port); // port
  3. #endif } // --------------------------- bool GnuPacket::initHit(Host &h, Channel *ch, GnuPacket *query, bool push, bool busy, bool stable, bool tracker, int maxttl) { if (!ch)
  4. return false;
  5. func = GNU_FUNC_HIT; hops = 0; id.generate(); ttl = maxttl; MemoryStream mem(data,MAX_DATA); mem.writeChar(1); // num hits mem.writeShort(h.port); // port mem.writeLong(SWAP4(h.ip)); // ip if (query) mem.writeLong(0); // speed - route else mem.writeLong(1); // broadcast //mem.writeLong(ch->index); // index mem.writeLong(0); // index
  6. mem.writeShort(ch->getBitrate()); // bitrate mem.writeShort(ch->localListeners()); // num listeners mem.writeChar(0); // no name XML xml; XML::Node *cn = ch->info.createChannelXML(); cn->add(ch->info.createTrackXML()); xml.setRoot(cn); xml.writeCompact(mem); mem.writeChar(0); // extra null  // QHD mem.writeLong('PCST'); // vendor ID mem.writeChar(2); // public sector length  int f1 = 0, f2 = 0; f1 = 1 | 4 | 8 | 32 | 64; // use push | busy | stable | broadcast | tracker  if (push) f2 |= 1; if (busy) f2 |= 4; if (stable) f2 |= 8; if (!query) f2 |= 32; if (tracker) f2 |= 64;
  7. mem.writeChar(f1); mem.writeChar(f2);
  8. { // write private sector char pbuf[256]; MemoryStream pmem(pbuf,sizeof(pbuf)); XML xml; XML::Node *pn = servMgr->createServentXML(); xml.setRoot(pn); xml.writeCompact(pmem); pmem.writeChar(0); // add null terminator if (pmem.pos <= 255) { mem.writeChar(pmem.pos); mem.write(pmem.buf,pmem.pos); }else mem.writeChar(0);
  9. } // queryID/not used if (query) mem.write(query->id.id,16); else mem.write(id.id,16); len = mem.pos; LOG_NETWORK("Created Hit packet: %d bytes",len); if (len >= MAX_DATA) return false; // servMgr->addReplyID(id); return true; } // --------------------------- void GnuPacket::initFind(const char *str, XML *xml, int maxTTL) { func = GNU_FUNC_QUERY; ttl = maxTTL; hops = 0; id.generate(); MemoryStream mem(data,MAX_DATA); mem.writeShort(0); // min speed if (str) { int slen = strlen(str); mem.write((void *)str,slen+1); // string }else mem.writeChar(0); // null string if (xml) xml->writeCompact(mem); len = mem.pos; } // --------------------------- void GnuStream::ping(int ttl) { GnuPacket ping; ping.initPing(ttl); // servMgr->addReplyID(ping.id); sendPacket(ping); LOG_NETWORK("ping out %02x%02x%02x%02x",ping.id.id[0],ping.id.id[1],ping.id.id[2],ping.id.id[3]); } // --------------------------- void GnuStream::sendPacket(GnuPacket &p) { try  { lock.on(); packetsOut++; stats.add(Stats::NUMPACKETSOUT); switch(p.func) { case GNU_FUNC_PING: stats.add(Stats::NUMPINGOUT); break; case GNU_FUNC_PONG: stats.add(Stats::NUMPONGOUT); break; case GNU_FUNC_QUERY: stats.add(Stats::NUMQUERYOUT); break; case GNU_FUNC_HIT: stats.add(Stats::NUMHITOUT); break; case GNU_FUNC_PUSH: stats.add(Stats::NUMPUSHOUT); break; default: stats.add(Stats::NUMOTHEROUT); break; } write(p.id.id,16); writeChar(p.func); // ping func writeChar(p.ttl); // ttl writeChar(p.hops); // hops writeLong(p.len); // len if (p.len) write(p.data,p.len); stats.add(Stats::PACKETDATAOUT,23+p.len); lock.off(); }catch(StreamException &e) { lock.off(); throw e; } } // --------------------------- bool GnuStream::readPacket(GnuPacket &p) { try  { lock.on(); packetsIn++; stats.add(Stats::NUMPACKETSIN); read(p.id.id,16); p.func = readChar(); p.ttl = readChar(); p.hops = readChar(); p.len = readLong(); if ((p.hops >= 1) && (p.hops <= 10)) stats.add((Stats::STAT)((int)Stats::NUMHOPS1+p.hops-1)); stats.add(Stats::PACKETDATAIN,23+p.len); switch(p.func) { case GNU_FUNC_PING: stats.add(Stats::NUMPINGIN); break; case GNU_FUNC_PONG: stats.add(Stats::NUMPONGIN); break; case GNU_FUNC_QUERY: stats.add(Stats::NUMQUERYIN); break; case GNU_FUNC_HIT: stats.add(Stats::NUMHITIN); break; case GNU_FUNC_PUSH: stats.add(Stats::NUMPUSHIN); break; default: stats.add(Stats::NUMOTHERIN); break; } if (p.len) { if (p.len > GnuPacket::MAX_DATA) { while (p.len--) readChar(); lock.off(); return false; } read(p.data,p.len); } lock.off(); return true; }catch(StreamException &e) { lock.off(); throw e; } } // --------------------------- GnuStream::R_TYPE GnuStream::processPacket(GnuPacket &in, Servent *serv, GnuID &routeID) { R_TYPE ret = R_DISCARD; MemoryStream data(in.data,in.len); Host remoteHost = serv->getHost(); in.ttl--; in.hops++; routeID = in.id; switch(in.func) { case GNU_FUNC_PING: // ping { LOG_NETWORK("ping: from %d.%d.%d.%d : %02x%02x%02x%02x", remoteHost.ip>>24&0xff,remoteHost.ip>>16&0xff,remoteHost.ip>>8&0xff,remoteHost.ip&0xff, in.id.id[0],in.id.id[1],in.id.id[2],in.id.id[3] ); Host sh = servMgr->serverHost; if (sh.isValid())
  10. { if ((servMgr->getFirewall() != ServMgr::FW_ON) && (!servMgr->pubInFull())) { GnuPacket pong; pong.initPong(sh,true,in); if (serv->outputPacket(pong,true)) LOG_NETWORK("pong out"); } ret = R_BROADCAST;
  11. }
  12. } break; case GNU_FUNC_PONG: // pong { { int ip,port,cnt,total; port = data.readShort(); ip = data.readLong(); cnt = data.readLong(); total = data.readLong(); ip = SWAP4(ip); Host h; h.ip = ip; h.port = port; char sIP[64],rIP[64]; h.toStr(sIP); remoteHost.toStr(rIP); LOG_NETWORK("pong: %s via %s : %02x%02x%02x%02x",sIP,ip,rIP,in.id.id[0],in.id.id[1],in.id.id[2],in.id.id[3]); ret = R_DISCARD; if (h.isValid()) {
  13. #if 0 // accept if this pong is a reply from one of our own pings, otherwise route back if (servMgr->isReplyID(in.id)) { servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime()); ret = R_ACCEPTED; }else ret = R_ROUTE; #endif } } } break; case GNU_FUNC_QUERY: // query ret = R_BROADCAST; { Host sh = servMgr->serverHost; if (!sh.isValid()) sh.ip = 127<<24|1; char words[256]; short spd = data.readShort(); data.readString(words,sizeof(words)); words[sizeof(words)-1] = 0; MemoryStream xm(&data.buf[data.pos],data.len-data.pos); xm.buf[xm.len] = 0; Channel *hits[16]; int numHits=0; if (strncmp(xm.buf,"<?xml",5)==0) { XML xml; xml.read(xm); XML::Node *cn = xml.findNode("channel"); if (cn) { ChanInfo info; info.init(cn); info.status = ChanInfo::S_PLAY; numHits = chanMgr->findChannels(info,hits,16); } LOG_NETWORK("query XML: %s : found %d",xm.buf,numHits); }else{ ChanInfo info; info.name.set(words); info.genre.set(words); info.id.fromStr(words); info.status = ChanInfo::S_PLAY; numHits = chanMgr->findChannels(info,hits,16); LOG_NETWORK("query STR: %s : found %d",words,numHits); }
  14. for(int i=0; i<numHits; i++) { bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF); bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->relaysFull(); bool stable = servMgr->totalStreams>0;
  15. bool tracker =  hits[i]->isBroadcasting();
  16. GnuPacket hit; if (hit.initHit(sh,hits[i],&in,push,busy,stable,tracker,in.hops)) serv->outputPacket(hit,true); } } break; case GNU_FUNC_PUSH: // push { GnuID pid; data.read(pid.id,16); //LOG("push serv= %02x%02x%02x%02x",servMgr->id[0],servMgr->id[1],servMgr->id[2],servMgr->id[3]); //LOG("pack = %02x%02x%02x%02x",id[0],id[1],id[2],id[3]); int index = data.readLong(); int ip = data.readLong(); int port = data.readShort(); ip = SWAP4(ip); Host h(ip,port); char hostName[64]; h.toStr(hostName);
  17. #if 0 if (servMgr->isReplyID(pid)) { #if 0 Channel *c = chanMgr->findChannelByIndex(index); if (!c) { LOG_NETWORK("push 0x%x to %s: Not found",index,hostName); }else { if (!c->isFull() && !servMgr->streamFull()) { LOG_NETWORK("push: 0x%x to %s: OK",index,hostName); Servent *s = servMgr->allocServent(); if (s) s->initGIV(h,c->info.id); }else LOG_NETWORK("push: 0x%x to %s: FULL",index,hostName); }
  18. #endif ret = R_ACCEPTED; }else{ LOG_NETWORK("push: 0x%x to %s: ROUTE",index,hostName); routeID = pid; ret = R_ROUTE; } #endif } break; case GNU_FUNC_HIT: // hit {
  19. ret = R_DISCARD;
  20. ChanHit hit;
  21. if (readHit(data,hit,in.hops,in.id))
  22. {
  23. char flstr[64];
  24. flstr[0]=0;
  25. if (hit.firewalled) strcat(flstr,"Push,");
  26. if (hit.tracker) strcat(flstr,"Tracker,");
  27. #if 0 if ((spd == 0) && (!isBroadcastHit)) { if (servMgr->isReplyID(queryID)) { ret = R_ACCEPTED; LOG_NETWORK("self-hit: %s 0x%02x %s %d chan",hostName,f2,flstr,num); }else { routeID = queryID; ret = R_ROUTE; LOG_NETWORK("route-hit: %s 0x%02x %s %d chan",hostName,f2,flstr,num); } }else { ret = R_BROADCAST; LOG_NETWORK("broadcast-hit: %s 0x%02x %s %d chan",hostName,f2,flstr,num); }
  28. #else
  29. ret = R_BROADCAST;
  30. LOG_NETWORK("broadcast-hit: %s",flstr);
  31. #endif } } break; default: LOG_NETWORK("packet: %d",in.func); break; } if ((in.ttl > 10) || (in.hops > 10) || (in.ttl==0)) if ((ret == R_BROADCAST) || (ret == R_ROUTE)) ret = R_DEAD; return ret; }
  32. // ---------------------------
  33. bool GnuStream::readHit(Stream &data, ChanHit &ch,int hops,GnuID &id)
  34. {
  35. int i;
  36. int num = data.readChar(); // hits
  37. int port = data.readShort(); // port
  38. int ip = data.readLong(); // ip
  39. ip = SWAP4(ip);
  40. int spd = data.readLong(); // speed/broadcast
  41. Host h(ip,port);
  42. char hostName[64];
  43. h.IPtoStr(hostName);
  44. bool dataValid=true;
  45. ChanHit *hits[100];
  46. int numHits=0;
  47. for(i=0; i<num; i++)
  48. {
  49. int index,bitrate,listeners;
  50. index = data.readLong(); // index
  51. bitrate = data.readShort(); // bitrate
  52. listeners = data.readShort(); // listeners
  53. // read name .. not used.
  54. char fname[256];
  55. data.readString(fname,sizeof(fname));
  56. fname[sizeof(fname)-1] = 0;
  57. ch.init();
  58. ch.firewalled = false; // default to NO as we dont get the info until the next section.
  59. ch.host = h;
  60. ch.numListeners = listeners;
  61. ch.numHops = hops;
  62. ch.rhost[0] = ch.host;
  63. ChanInfo info;
  64. {
  65. char xmlData[4000];
  66. int xlen = data.readString(xmlData,sizeof(xmlData));
  67. if ((strncmp(xmlData,"<?xml",5)==0) && (xlen < GnuPacket::MAX_DATA))
  68. {
  69. //LOG_NETWORK("Hit XML: %s",xmlData);
  70. MemoryStream xm(xmlData,xlen);
  71. XML xml;
  72. xml.read(xm);
  73. XML::Node *n = xml.findNode("channel");
  74. if (n)
  75. {
  76. info.init(n);
  77. char idStr[64];
  78. info.id.toStr(idStr);
  79. LOG_NETWORK("got hit %s %s",idStr,info.name.cstr());
  80. ch.upTime = n->findAttrInt("uptime");
  81. }else
  82. LOG_NETWORK("Missing Channel node");
  83. }else
  84. {
  85. LOG_NETWORK("Missing XML data");
  86. //LOG_NETWORK("%s",xmlData);
  87. dataValid = false;
  88. }
  89. }
  90. if (info.id.isSet())
  91. {
  92. if (!chanMgr->findHitList(info))
  93. chanMgr->addHitList(info);
  94. ch.recv = true;
  95. ch.chanID = info.id;
  96. ChanHit *chp = chanMgr->addHit(ch);
  97. if ((chp) && (numHits<100))
  98. hits[numHits++] = chp;
  99. }
  100. }
  101. int vendor = data.readLong(); // vendor ID
  102. int pubLen = data.readChar(); // public sec length - should be 2
  103. int f1 = data.readChar() & 0xff; // flags 1
  104. int f2 = data.readChar() & 0xff; // flags 2
  105. pubLen -= 2;
  106. while (pubLen-->0)
  107. data.readChar();
  108. char agentStr[16];
  109. agentStr[0]=0;
  110. int maxPreviewTime=0;
  111. // read private sector with peercast servant specific info
  112. int privLen = data.readChar();
  113. if (privLen)
  114. {
  115. char privData[256];
  116. data.read(privData,privLen);
  117. if (strncmp(privData,"<?xml",5)==0)
  118. {
  119. MemoryStream xm(privData,privLen);
  120. XML xml;
  121. xml.read(xm);
  122. XML::Node *sn = xml.findNode("servent");
  123. if (sn)
  124. {
  125. char *ag = sn->findAttr("agent");
  126. if (ag)
  127. {
  128. strncpy(agentStr,ag,16);
  129. agentStr[15]=0;
  130. }
  131. maxPreviewTime = sn->findAttrInt("preview");
  132. }
  133. }
  134. }
  135. // not used anymore
  136. GnuID queryID;
  137. data.read(queryID.id,16);
  138. bool isBroadcastHit=false;
  139. if (f1 & 32)
  140. isBroadcastHit = (f2 & 32)!=0;
  141. for(i=0; i<numHits; i++)
  142. {
  143. if (f1 & 1)
  144. hits[i]->firewalled = (f2 & 1)!=0;
  145. if (f1 & 64)
  146. hits[i]->tracker = (f2 & 64)!=0;
  147. }
  148. return dataValid;
  149. }