servent.cpp
上传用户:chn_coc
上传日期:2007-12-20
资源大小:563k
文件大小:58k
源码类别:

P2P编程

开发平台:

Windows_Unix

  1. // ------------------------------------------------ // File : servent.cpp // Date: 4-apr-2002 // Author: giles // Desc:  // Servents are the actual connections between clients. They do the handshaking, // transfering of data and processing of GnuPackets. Each servent has one socket allocated // to it on connect, it uses this to transfer all of its data. // // (c) 2002 peercast.org // ------------------------------------------------ // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the // GNU General Public License for more details. // ------------------------------------------------ // todo: make lan->yp not check firewall #include <stdlib.h> #include "servent.h" #include "sys.h" #include "gnutella.h" #include "xml.h" #include "html.h" #include "http.h" #include "stats.h" #include "servmgr.h" #include "peercast.h" #include "atom.h"
  2. #include "pcp.h"
  3. #include "version2.h"
  4. const int DIRECT_WRITE_TIMEOUT = 60; // ----------------------------------- char *Servent::statusMsgs[]= {         "NONE", "CONNECTING",         "PROTOCOL",         "HANDSHAKE",         "CONNECTED",         "CLOSING", "LISTENING", "TIMEOUT", "REFUSED", "VERIFIED", "ERROR", "WAIT",
  5. "FREE" }; // ----------------------------------- char *Servent::typeMsgs[]= {
  6. "NONE",         "INCOMING",
  7.         "SERVER", "RELAY", "DIRECT",
  8. "COUT",
  9. "CIN",
  10. "PGNU"
  11. }; // ----------------------------------- bool Servent::isPrivate()  { Host h = getHost(); return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost(); } // ----------------------------------- bool Servent::isAllowed(int a)  { Host h = getHost(); if (servMgr->isFiltered(ServFilter::F_BAN,h)) return false; return (allow&a)!=0; }
  12. // -----------------------------------
  13. bool Servent::isFiltered(int f) 
  14. {
  15. Host h = getHost();
  16. return servMgr->isFiltered(f,h);
  17. }
  18. // ----------------------------------- Servent::Servent(int index) :outPacketsPri(MAX_OUTPACKETS) ,outPacketsNorm(MAX_OUTPACKETS) ,seenIDs(MAX_HASH) ,serventIndex(index) ,sock(NULL) ,next(NULL) { reset(); } // ----------------------------------- Servent::~Servent() { } // ----------------------------------- void Servent::kill()  {
  19. thread.active = false;
  20. setStatus(S_CLOSING);
  21. if (pcpStream)
  22. {
  23. PCPStream *pcp = pcpStream;
  24. pcpStream = NULL;
  25. pcp->kill();
  26. delete pcp;
  27. }
  28. if (sock)
  29. {
  30. sock->close();
  31. delete sock;
  32. sock = NULL;
  33. }
  34. if (pushSock)
  35. {
  36. pushSock->close();
  37. delete pushSock;
  38. pushSock = NULL;
  39. }
  40. // thread.unlock();
  41. if (type != T_SERVER) { reset(); setStatus(S_FREE); }
  42. } // ----------------------------------- void Servent::abort()  { thread.active = false; if (sock)
  43. { sock->close();
  44. } } // ----------------------------------- void Servent::reset() {
  45. remoteID.clear();
  46. servPort = 0;
  47. pcpStream = NULL;
  48. flowControl = false; networkID.clear();
  49. chanID.clear();
  50. outputProtocol = ChanInfo::SP_UNKNOWN;
  51. agent.clear(); sock = NULL; allow = ALLOW_ALL; syncPos = 0; addMetadata = false;
  52. nsSwitchNum = 0;
  53. pack.func = 255; lastConnect = lastPing = lastPacket = 0; loginPassword[0] = 0; loginMount[0] = 0; bytesPerSecond = 0; priorityConnect = false;
  54. pushSock = NULL;
  55. sendHeader = true; outPacketsNorm.reset(); outPacketsPri.reset(); seenIDs.clear(); status = S_NONE; type = T_NONE; }
  56. // -----------------------------------
  57. bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
  58. {
  59. if  (    (type == t) 
  60. && (isConnected())
  61. && (!cid.isSet() || chanID.isSame(cid))
  62. && (!sid.isSet() || !sid.isSame(remoteID))
  63. && (pcpStream != NULL)
  64. )
  65. {
  66. return pcpStream->sendPacket(pack,did);
  67. }
  68. return false;
  69. }
  70. // -----------------------------------
  71. bool Servent::acceptGIV(ClientSocket *givSock)
  72. {
  73. if (!pushSock)
  74. {
  75. pushSock = givSock;
  76. return true;
  77. }else
  78. return false;
  79. }
  80. // ----------------------------------- Host Servent::getHost() { Host h(0,0); if (sock) h = sock->host; return h; } // ----------------------------------- bool Servent::outputPacket(GnuPacket &p, bool pri) { lock.on(); bool r=false; if (pri) r = outPacketsPri.write(p); else { if (servMgr->useFlowControl) { int per = outPacketsNorm.percentFull(); if (per > 50) flowControl = true; else if (per < 25) flowControl = false; } bool send=true; if (flowControl) { // if in flowcontrol, only allow packets with less of a hop count than already in queue if (p.hops >= outPacketsNorm.findMinHop()) send = false; } if (send) r = outPacketsNorm.write(p); } lock.off(); return r; } // ----------------------------------- bool Servent::initServer(Host &h) { try { checkFree(); status = S_WAIT; createSocket(); sock->bind(h); thread.data = this; thread.func = serverProc;
  81. type = T_SERVER; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Bad server: %s",e.msg); kill(); return false; } return true; } // ----------------------------------- void Servent::checkFree() { if (sock) throw StreamException("Socket already set"); if (thread.active) throw StreamException("Thread already active"); } // ----------------------------------- void Servent::initIncoming(ClientSocket *s, unsigned int a) { try{ checkFree(); type = T_INCOMING; sock = s; allow = a; thread.data = this; thread.func = incomingProc; setStatus(S_PROTOCOL);
  82. char ipStr[64];
  83. sock->host.toStr(ipStr);
  84. LOG_DEBUG("Incoming from %s",ipStr);
  85. if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg); //servMgr->shutdownTimer = 1;  
  86. kill();
  87. LOG_ERROR("INCOMING FAILED: %s",e.msg);
  88. } } // ----------------------------------- void Servent::initOutgoing(TYPE ty) { try  { checkFree(); type = ty; thread.data = this; thread.func = outgoingProc; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("Unable to start outgoing: %s",e.msg); kill(); } }
  89. // -----------------------------------
  90. void Servent::initPCP(Host &rh)
  91. {
  92. char ipStr[64];
  93. rh.toStr(ipStr);
  94. try 
  95. {
  96. checkFree();
  97.     createSocket();
  98. type = T_COUT;
  99. sock->open(rh);
  100. if (!isAllowed(ALLOW_NETWORK))
  101. throw StreamException("Servent not allowed");
  102. thread.data = this;
  103. thread.func = outgoingProc;
  104. LOG_DEBUG("Outgoing to %s",ipStr);
  105. if (!sys->startThread(&thread))
  106. throw StreamException("Can`t start thread");
  107. }catch(StreamException &e)
  108. {
  109. LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
  110. kill();
  111. }
  112. }
  113. #if 0
  114. // -----------------------------------
  115. void Servent::initChannelFetch(Host &host)
  116. {
  117. type = T_STREAM;
  118. char ipStr[64];
  119. host.toStr(ipStr);
  120. checkFree();
  121.  
  122. createSocket();
  123. sock->open(host);
  124. if (!isAllowed(ALLOW_DATA))
  125. throw StreamException("Servent not allowed");
  126. sock->connect();
  127. }
  128. #endif
  129. // ----------------------------------- void Servent::initGIV(Host &h, GnuID &id) { char ipStr[64]; h.toStr(ipStr); try  { checkFree(); givID = id;     createSocket(); sock->open(h); if (!isAllowed(ALLOW_NETWORK)) throw StreamException("Servent not allowed"); sock->connect(); thread.data = this; thread.func = givProc; type = T_RELAY; if (!sys->startThread(&thread)) throw StreamException("Can`t start thread"); }catch(StreamException &e) { LOG_ERROR("GIV error to %s: %s",ipStr,e.msg); kill(); } } // ----------------------------------- void Servent::createSocket() { if (sock) LOG_ERROR("Servent::createSocket attempt made while active"); sock = sys->createSocket(); } // ----------------------------------- void Servent::setStatus(STATUS s) { if (s != status) { status = s; if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING)) lastConnect = sys->getTime(); } }
  130. // ----------------------------------- void Servent::handshakeOut() {     sock->writeLine(GNU_PEERCONN); char str[64];      sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);     sock->writeLineF("%s %d",PCX_HS_PCP,1);
  131. if (priorityConnect)     sock->writeLineF("%s %d",PCX_HS_PRIORITY,1); if (networkID.isSet()) { networkID.toStr(str); sock->writeLineF("%s %s",PCX_HS_NETWORKID,str); } servMgr->sessionID.toStr(str); sock->writeLineF("%s %s",PCX_HS_ID,str);     sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS()); sock->writeLine(""); HTTP http(*sock);
  132. int r = http.readResponse();
  133. if (r != 200)
  134. {
  135. LOG_ERROR("Expected 200, got %d",r);
  136. throw StreamException("Unexpected HTTP response");
  137. }
  138. bool versionValid = false; GnuID clientID; clientID.clear();     while (http.nextHeader())     { LOG_DEBUG(http.cmdLine); char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(HTTP_HS_AGENT)) { agent.set(arg); if (strnicmp(arg,"PeerCast/",9)==0) versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0); }else if (http.isHeader(PCX_HS_NETWORKID)) clientID.fromStr(arg);     }
  139. if (!clientID.isSame(networkID)) throw HTTPException(HTTP_SC_UNAVAILABLE,503); if (!versionValid) throw HTTPException(HTTP_SC_UNAUTHORIZED,401);     sock->writeLine(GNU_OK);     sock->writeLine("");
  140. }
  141. // -----------------------------------
  142. void Servent::processOutChannel()
  143. {
  144. }
  145. // ----------------------------------- void Servent::handshakeIn() { int osType=0; HTTP http(*sock); bool versionValid = false; bool diffRootVer = false;
  146. GnuID clientID; clientID.clear();     while (http.nextHeader())     { LOG_DEBUG(http.cmdLine); char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(HTTP_HS_AGENT)) { agent.set(arg); if (strnicmp(arg,"PeerCast/",9)==0) { versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0); diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0; } }else if (http.isHeader(PCX_HS_NETWORKID)) { clientID.fromStr(arg); }else if (http.isHeader(PCX_HS_PRIORITY)) { priorityConnect = atoi(arg)!=0;
  147. }else if (http.isHeader(PCX_HS_ID)) { GnuID id; id.fromStr(arg); if (id.isSame(servMgr->sessionID)) throw StreamException("Servent loopback"); }else if (http.isHeader(PCX_HS_OS)) { if (stricmp(arg,PCX_OS_LINUX)==0) osType = 1; else if (stricmp(arg,PCX_OS_WIN32)==0) osType = 2; else if (stricmp(arg,PCX_OS_MACOSX)==0) osType = 3; else if (stricmp(arg,PCX_OS_WINAMP2)==0) osType = 4; }     }
  148. if (!clientID.isSame(networkID)) throw HTTPException(HTTP_SC_UNAVAILABLE,503); // if this is a priority connection and all incoming connections  // are full then kill an old connection to make room. Otherwise reject connection. //if (!priorityConnect) { if (!isPrivate()) if (servMgr->pubInFull()) throw HTTPException(HTTP_SC_UNAVAILABLE,503); } if (!versionValid) throw HTTPException(HTTP_SC_FORBIDDEN,403);     sock->writeLine(GNU_OK);     sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT); if (networkID.isSet()) { char idStr[64]; networkID.toStr(idStr); sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr); } if (servMgr->isRoot) { sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0); sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL); sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL); sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast); //sock->writeLine("%s %d",PCX_HS_FULLHIT,2); if (diffRootVer) { sock->writeString(PCX_HS_DL); sock->writeLine(PCX_DL_URL); } sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr()); } char hostIP[64]; Host h = sock->host; h.IPtoStr(hostIP);     sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);     sock->writeLine(""); while (http.nextHeader());
  149. }
  150. // -----------------------------------
  151. bool Servent::pingHost(Host &rhost,GnuID &rsid)
  152. {
  153. char ipstr[64];
  154. rhost.toStr(ipstr);
  155. LOG_DEBUG("Ping host %s: trying..",ipstr);
  156. ClientSocket *s=NULL;
  157. bool hostOK=false;
  158. try
  159. {
  160. s = sys->createSocket();
  161. if (!s)
  162. return false;
  163. else
  164. {
  165. s->setReadTimeout(15000);
  166. s->setWriteTimeout(15000);
  167. s->open(rhost);
  168. s->connect();
  169. AtomStream atom(*s);
  170. atom.writeInt(PCP_CONNECT,1);
  171. atom.writeParent(PCP_HELO,1);
  172. atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
  173. GnuID sid;
  174. sid.clear();
  175. int numc,numd;
  176. ID4 id = atom.read(numc,numd);
  177. if (id == PCP_OLEH)
  178. {
  179. for(int i=0; i<numc; i++)
  180. {
  181. int c,d;
  182. ID4 pid = atom.read(c,d);
  183. if (pid == PCP_SESSIONID)
  184. atom.readBytes(sid.id,16,d);
  185. else
  186. atom.skip(c,d);
  187. }
  188. }else
  189. {
  190. LOG_DEBUG("Ping response: %s",id.getString().str());
  191. throw StreamException("Bad ping response");
  192. }
  193. if (!sid.isSame(rsid))
  194. throw StreamException("SIDs don`t match");
  195. hostOK = true;
  196. LOG_DEBUG("Ping host %s: OK",ipstr);
  197. atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
  198. }
  199. }catch(StreamException &e)
  200. {
  201. LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
  202. }
  203. if (s)
  204. {
  205. s->close();
  206. delete s;
  207. }
  208. if (!hostOK)
  209. rhost.port = 0;
  210. return true;
  211. }
  212. // ----------------------------------- bool Servent::handshakeStream(ChanInfo &chanInfo) {
  213. HTTP http(*sock);
  214. bool gotPCP=false;
  215. unsigned int reqPos=0;
  216. nsSwitchNum=0;
  217. while (http.nextHeader()) { char *arg = http.getArgStr(); if (!arg) continue; if (http.isHeader(PCX_HS_PCP))
  218. gotPCP = atoi(arg)!=0;
  219. else if (http.isHeader(PCX_HS_POS))
  220. reqPos = atoi(arg); else if (http.isHeader("icy-metadata")) addMetadata = atoi(arg) > 0;
  221. else if (http.isHeader(HTTP_HS_AGENT))
  222. agent = arg;
  223. else if (http.isHeader("Pragma"))
  224. {
  225. char *ssc = stristr(arg,"stream-switch-count=");
  226. char *so = stristr(arg,"stream-offset");
  227. if (ssc || so)
  228. {
  229. nsSwitchNum=1;
  230. //nsSwitchNum = atoi(ssc+20);
  231. }
  232. }
  233. LOG_DEBUG("Stream: %s",http.cmdLine); }
  234. if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
  235. outputProtocol = ChanInfo::SP_PEERCAST;
  236. if (outputProtocol == ChanInfo::SP_HTTP)
  237. {
  238. if  ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
  239.   || (chanInfo.contentType == ChanInfo::T_WMA)
  240.   || (chanInfo.contentType == ChanInfo::T_WMV)
  241.   || (chanInfo.contentType == ChanInfo::T_ASX)
  242. )
  243. outputProtocol = ChanInfo::SP_MMS;
  244. }
  245. bool chanFound=false;
  246. bool chanReady=false;
  247. Channel *ch = chanMgr->findChannelByID(chanInfo.id);
  248. if (ch)
  249. {
  250. sendHeader = true;
  251. if (reqPos)
  252. {
  253. streamPos = ch->rawData.findOldestPos(reqPos);
  254. }else
  255. {
  256. streamPos = ch->rawData.getLatestPos();
  257. }
  258. chanReady = canStream(ch);
  259. }
  260. ChanHitList *chl = chanMgr->findHitList(chanInfo);
  261. if (chl)
  262. {
  263. chanFound = true;
  264. }
  265. bool result = false;
  266. char idStr[64];
  267. chanInfo.id.toStr(idStr);
  268. char sidStr[64];
  269. servMgr->sessionID.toStr(sidStr);
  270. Host rhost = sock->host;
  271. AtomStream atom(*sock);
  272. if (!chanFound)
  273. {
  274. sock->writeLine(HTTP_SC_NOTFOUND);
  275.     sock->writeLine("");
  276. LOG_DEBUG("Sending channel not found");
  277. return false;
  278. }
  279. if (!chanReady)
  280. {
  281. if (outputProtocol == ChanInfo::SP_PCP)
  282. {
  283. sock->writeLine(HTTP_SC_UNAVAILABLE);
  284. sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
  285. sock->writeLine("");
  286. handshakeIncomingPCP(atom,rhost,remoteID,agent);
  287. char ripStr[64];
  288. rhost.toStr(ripStr);
  289. LOG_DEBUG("Sending channel unavailable");
  290. ChanHitSearch chs;
  291. int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
  292. if (chl)
  293. {
  294. ChanHit best;
  295. // search for up to 8 other hits
  296. int cnt=0;
  297. for(int i=0; i<8; i++)
  298. {
  299. best.init();
  300. // find best hit this network if local IP
  301. if (!rhost.globalIP())
  302. {
  303. chs.init();
  304. chs.matchHost = servMgr->serverHost;
  305. chs.waitDelay = 2;
  306. chs.excludeID = remoteID;
  307. if (chl->pickHits(chs))
  308. best = chs.best[0];
  309. }
  310. // find best hit on same network
  311. if (!best.host.ip)
  312. {
  313. chs.init();
  314. chs.matchHost = rhost;
  315. chs.waitDelay = 2;
  316. chs.excludeID = remoteID;
  317. if (chl->pickHits(chs))
  318. best = chs.best[0];
  319. }
  320. // find best hit on other networks
  321. if (!best.host.ip)
  322. {
  323. chs.init();
  324. chs.waitDelay = 2;
  325. chs.excludeID = remoteID;
  326. if (chl->pickHits(chs))
  327. best = chs.best[0];
  328. }
  329. if (!best.host.ip)
  330. break;
  331. best.writeAtoms(atom,chanInfo.id);
  332. cnt++;
  333. }
  334. if (cnt)
  335. {
  336. LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
  337. }
  338. else if (rhost.port)
  339. {
  340. // find firewalled host
  341. chs.init();
  342. chs.waitDelay = 30;
  343. chs.useFirewalled = true;
  344. chs.excludeID = remoteID;
  345. if (chl->pickHits(chs))
  346. {
  347. best = chs.best[0];
  348. int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
  349. LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
  350. }
  351. // if all else fails, use tracker
  352. if (!best.host.ip)
  353. {
  354. // find best tracker on this network if local IP
  355. if (!rhost.globalIP())
  356. {
  357. chs.init();
  358. chs.matchHost = servMgr->serverHost;
  359. chs.trackersOnly = true;
  360. chs.excludeID = remoteID;
  361. if (chl->pickHits(chs))
  362. best = chs.best[0];
  363. }
  364. // find local tracker
  365. if (!best.host.ip)
  366. {
  367. chs.init();
  368. chs.matchHost = rhost;
  369. chs.trackersOnly = true;
  370. chs.excludeID = remoteID;
  371. if (chl->pickHits(chs))
  372. best = chs.best[0];
  373. }
  374. // find global tracker
  375. if (!best.host.ip)
  376. {
  377. chs.init();
  378. chs.trackersOnly = true;
  379. chs.excludeID = remoteID;
  380. if (chl->pickHits(chs))
  381. best = chs.best[0];
  382. }
  383. if (best.host.ip)
  384. {
  385. best.writeAtoms(atom,chanInfo.id);
  386. LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
  387. }else if (rhost.port)
  388. {
  389. // find firewalled tracker
  390. chs.init();
  391. chs.useFirewalled = true;
  392. chs.trackersOnly = true;
  393. chs.excludeID = remoteID;
  394. chs.waitDelay = 30;
  395. if (chl->pickHits(chs))
  396. {
  397. best = chs.best[0];
  398. int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
  399. LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
  400. }
  401. }
  402. }
  403. }
  404. // return not available yet code
  405. atom.writeInt(PCP_QUIT,error);
  406. result = false;
  407. }else
  408. {
  409. LOG_DEBUG("Sending channel unavailable");
  410. sock->writeLine(HTTP_SC_UNAVAILABLE);
  411. sock->writeLine("");
  412. result = false;
  413. }
  414. } else {
  415. if (chanInfo.contentType != ChanInfo::T_MP3)
  416. addMetadata=false;
  417. if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP)) // winamp mp3 metadata check {
  418. sock->writeLine(ICY_OK); sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT); sock->writeLineF("icy-name:%s",chanInfo.name.cstr()); sock->writeLineF("icy-br:%d",chanInfo.bitrate); sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr()); sock->writeLineF("icy-url:%s",chanInfo.url.cstr()); sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval); sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr); sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3); }else { sock->writeLine(HTTP_SC_OK); if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA)) { sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT); sock->writeLine("Accept-Ranges: none"); sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr()); sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate); sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr()); sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr()); sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr()); sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr); }
  419. if (outputProtocol == ChanInfo::SP_HTTP) { switch (chanInfo.contentType) { case ChanInfo::T_OGG: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG); break; case ChanInfo::T_MP3: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3); break; case ChanInfo::T_MOV: sock->writeLine("Connection: close"); sock->writeLine("Content-Length: 10000000"); sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV); break; case ChanInfo::T_MPG: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG); break; case ChanInfo::T_NSV: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV); break; case ChanInfo::T_ASX: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
  420. break;
  421. case ChanInfo::T_WMA:
  422. sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
  423. break;
  424. case ChanInfo::T_WMV: sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV); break; } } else if (outputProtocol == ChanInfo::SP_MMS)
  425. {
  426. sock->writeLine("Server: Rex/9.0.0.2980");
  427. sock->writeLine("Cache-Control: no-cache");
  428. sock->writeLine("Pragma: no-cache");
  429. sock->writeLine("Pragma: client-id=3587303426");
  430. sock->writeLine("Pragma: features="broadcast,playlist"");
  431. if (nsSwitchNum)
  432. {
  433. sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
  434. }else
  435. {
  436. sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
  437. if (ch)
  438. sock->writeLineF("Content-Length: %d",ch->headPack.len);
  439. sock->writeLine("Connection: Keep-Alive");
  440. }
  441. } else if (outputProtocol == ChanInfo::SP_PCP)
  442. {
  443. sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
  444. sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
  445. }else if (outputProtocol == ChanInfo::SP_PEERCAST) { sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST); } }
  446. sock->writeLine("");
  447. result = true;
  448. if (gotPCP)
  449. {
  450. handshakeIncomingPCP(atom,rhost,remoteID,agent);
  451. atom.writeInt(PCP_OK,0);
  452. }
  453. }
  454. return result; } // ----------------------------------- void Servent::handshakeGiv(GnuID &id) { if (id.isSet())
  455. { char idstr[64]; id.toStr(idstr); sock->writeLineF("GIV /%s",idstr);
  456. }else sock->writeLine("GIV");
  457. sock->writeLine("");
  458. } // ----------------------------------- void Servent::processGnutella() {
  459. type = T_PGNU;
  460. //if (servMgr->isRoot && !servMgr->needConnections())
  461. if (servMgr->isRoot)
  462. {
  463. processRoot();
  464. return;
  465. }
  466. gnuStream.init(sock); setStatus(S_CONNECTED); if (!servMgr->isRoot)
  467. {
  468. chanMgr->broadcastRelays(this, 1, 1);
  469. GnuPacket *p;
  470. if ((p=outPacketsNorm.curr())) 
  471. gnuStream.sendPacket(*p);
  472. return;
  473. } gnuStream.ping(2); // if (type != T_LOOKUP) // chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2); lastPacket = lastPing = sys->getTime(); bool doneBigPing=false; const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activitiy const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity unsigned int currBytes=0; unsigned int lastWait=0; unsigned int lastTotalIn=0,lastTotalOut=0; while (thread.active && sock->active()) { if (sock->readReady()) { lastPacket = sys->getTime(); if (gnuStream.readPacket(pack)) { char ipstr[64]; sock->host.toStr(ipstr); GnuID routeID; GnuStream::R_TYPE ret = GnuStream::R_PROCESS; if (pack.func != GNU_FUNC_PONG) if (servMgr->seenPacket(pack)) ret = GnuStream::R_DUPLICATE; seenIDs.add(pack.id); if (ret == GnuStream::R_PROCESS) { GnuID routeID; ret = gnuStream.processPacket(pack,this,routeID); if (flowControl && (ret == GnuStream::R_BROADCAST)) ret = GnuStream::R_DROP; } switch(ret) { case GnuStream::R_BROADCAST: if (servMgr->broadcast(pack,this)) stats.add(Stats::NUMBROADCASTED); else stats.add(Stats::NUMDROPPED); break; case GnuStream::R_ROUTE: if (servMgr->route(pack,routeID,NULL)) stats.add(Stats::NUMROUTED); else stats.add(Stats::NUMDROPPED); break; case GnuStream::R_ACCEPTED: stats.add(Stats::NUMACCEPTED); break; case GnuStream::R_DUPLICATE: stats.add(Stats::NUMDUP); break; case GnuStream::R_DEAD: stats.add(Stats::NUMDEAD); break; case GnuStream::R_DISCARD: stats.add(Stats::NUMDISCARDED); break; case GnuStream::R_BADVERSION: stats.add(Stats::NUMOLD); break; case GnuStream::R_DROP: stats.add(Stats::NUMDROPPED); break; } LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr); }else{ LOG_ERROR("Bad packet"); } } GnuPacket *p; if ((p=outPacketsPri.curr())) // priority packet { gnuStream.sendPacket(*p); seenIDs.add(p->id); outPacketsPri.next(); } else if ((p=outPacketsNorm.curr()))  // or.. normal packet { gnuStream.sendPacket(*p); seenIDs.add(p->id); outPacketsNorm.next(); } int lpt =  sys->getTime()-lastPacket; if (!doneBigPing) { if ((sys->getTime()-lastPing) > 15) { gnuStream.ping(7); lastPing = sys->getTime(); doneBigPing = true; } }else{ if (lpt > packetTimeoutSecs) { if ((sys->getTime()-lastPing) > packetTimeoutSecs) { gnuStream.ping(1); lastPing = sys->getTime(); } } } if (lpt > abortTimeoutSecs) throw TimeoutException(); unsigned int totIn = sock->totalBytesIn-lastTotalIn; unsigned int totOut = sock->totalBytesOut-lastTotalOut; unsigned int bytes = totIn+totOut; lastTotalIn = sock->totalBytesIn; lastTotalOut = sock->totalBytesOut; const int serventBandwidth = 1000;
  474. int delay = sys->idleSleepTime; if ((bytes) && (serventBandwidth >= 8)) delay = (bytes*1000)/(serventBandwidth/8); // set delay relative packetsize if (delay < (int)sys->idleSleepTime) delay = sys->idleSleepTime; //LOG("delay %d, in %d, out %d",delay,totIn,totOut); sys->sleep(delay); } }
  475. // ----------------------------------- void Servent::processRoot() { try  { gnuStream.init(sock); setStatus(S_CONNECTED); gnuStream.ping(2);
  476. unsigned int lastConnect = sys->getTime();
  477. while (thread.active && sock->active()) { if (gnuStream.readPacket(pack)) { char ipstr[64]; sock->host.toStr(ipstr); LOG_NETWORK("packet in: %d from %s",pack.func,ipstr); if (pack.func == GNU_FUNC_PING) // if ping then pong back some hosts and close { Host hl[32]; int cnt = servMgr->getNewestServents(hl,32,sock->host); if (cnt) { int start = sys->rnd() % cnt;
  478. int max = cnt>8?8:cnt;
  479. for(int i=0; i<max; i++) { GnuPacket pong; pack.hops = 1; pong.initPong(hl[start],false,pack); gnuStream.sendPacket(pong); char ipstr[64]; hl[start].toStr(ipstr); //LOG_NETWORK("Pong %d: %s",start+1,ipstr); start = (start+1) % cnt; } char str[64]; sock->host.toStr(str); LOG_NETWORK("Sent %d pong(s) to %s",max,str); }else { LOG_NETWORK("No Pongs to send"); //return; } }else if (pack.func == GNU_FUNC_PONG) // pong? { MemoryStream pong(pack.data,pack.len); int ip,port; port = pong.readShort(); ip = pong.readLong(); ip = SWAP4(ip); Host h(ip,port);
  480. if ((ip) && (port) && (h.globalIP())) {
  481. LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port); servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime()); } //return; } else if (pack.func == GNU_FUNC_HIT)
  482. {
  483. MemoryStream data(pack.data,pack.len);
  484. ChanHit hit;
  485. gnuStream.readHit(data,hit,pack.hops,pack.id);
  486. } //if (gnuStream.packetsIn > 5) // die if we get too many packets // return; }
  487. if((sys->getTime()-lastConnect > 60))
  488. break; } }catch(StreamException &e) { LOG_ERROR("Relay: %s",e.msg); } } // ----------------------------------- int Servent::givProc(ThreadInfo *thread) { // thread->lock(); Servent *sv = (Servent*)thread->data; try  {
  489. sv->handshakeGiv(sv->givID);
  490. sv->handshakeIncoming();
  491. }catch(StreamException &e) { LOG_ERROR("GIV: %s",e.msg); } sv->kill(); sys->endThread(thread);
  492. return 0; }
  493. // -----------------------------------
  494. void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
  495. {
  496. bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
  497. bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
  498. bool sendBCID = isTrusted && chanMgr->isBroadcasting();
  499. atom.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
  500. atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
  501. atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
  502. atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
  503. if (nonFW)
  504. atom.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
  505. if (testFW)
  506. atom.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
  507. if (sendBCID)
  508. atom.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
  509. LOG_DEBUG("PCP outgoing waiting for OLEH..");
  510. int numc,numd;
  511. ID4 id = atom.read(numc,numd);
  512. if (id != PCP_OLEH)
  513. {
  514. LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
  515. atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
  516. throw StreamException("Got unexpected PCP response");
  517. }
  518. char arg[64];
  519. GnuID clientID;
  520. clientID.clear();
  521. rid.clear();
  522. int version=0;
  523. int disable=0;
  524. Host thisHost;
  525. // read OLEH response
  526. for(int i=0; i<numc; i++)
  527. {
  528. int c,dlen;
  529. ID4 id = atom.read(c,dlen);
  530. if (id == PCP_HELO_AGENT)
  531. {
  532. atom.readString(arg,sizeof(arg),dlen);
  533. agent.set(arg);
  534. }else if (id == PCP_HELO_REMOTEIP)
  535. {
  536. thisHost.ip = atom.readInt();
  537. }else if (id == PCP_HELO_PORT)
  538. {
  539. thisHost.port = atom.readShort();
  540. }else if (id == PCP_HELO_VERSION)
  541. {
  542. version = atom.readInt();
  543. }else if (id == PCP_HELO_DISABLE)
  544. {
  545. disable = atom.readInt();
  546. }else if (id == PCP_HELO_SESSIONID)
  547. {
  548. atom.readBytes(rid.id,16);
  549. if (rid.isSame(servMgr->sessionID))
  550. throw StreamException("Servent loopback");
  551. }else
  552. {
  553. LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
  554. atom.skip(c,dlen);
  555. }
  556.     }
  557. // update server ip/firewall status
  558. if (isTrusted)
  559. {
  560. if (thisHost.isValid())
  561. {
  562. if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
  563. {
  564. char ipstr[64];
  565. thisHost.toStr(ipstr);
  566. LOG_DEBUG("Got new ip: %s",ipstr);
  567. servMgr->serverHost.ip = thisHost.ip;
  568. }
  569. if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
  570. {
  571. if (thisHost.port && thisHost.globalIP())
  572. servMgr->setFirewall(ServMgr::FW_OFF);
  573. else
  574. servMgr->setFirewall(ServMgr::FW_ON);
  575. }
  576. }
  577. if (disable == 1)
  578. {
  579. LOG_ERROR("client disabled: %d",disable);
  580. servMgr->isDisabled = true;
  581. }else
  582. {
  583. servMgr->isDisabled = false;
  584. }
  585. }
  586. if (!rid.isSet())
  587. {
  588. atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
  589. throw StreamException("Remote host not identified");
  590. }
  591. LOG_DEBUG("PCP Outgoing handshake complete.");
  592. }
  593. // -----------------------------------
  594. void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
  595. {
  596. int numc,numd;
  597. ID4 id = atom.read(numc,numd);
  598. if (id != PCP_HELO)
  599. {
  600. LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
  601. atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
  602. throw StreamException("Got unexpected PCP response");
  603. }
  604. char arg[64];
  605. ID4 osType;
  606. int version=0;
  607. int pingPort=0;
  608. GnuID bcID;
  609. GnuID clientID;
  610. bcID.clear();
  611. clientID.clear();
  612. rhost.port = 0;
  613. for(int i=0; i<numc; i++)
  614. {
  615. int c,dlen;
  616. ID4 id = atom.read(c,dlen);
  617. if (id == PCP_HELO_AGENT)
  618. {
  619. atom.readString(arg,sizeof(arg),dlen);
  620. agent.set(arg);
  621. }else if (id == PCP_HELO_VERSION)
  622. {
  623. version = atom.readInt();
  624. }else if (id == PCP_HELO_SESSIONID)
  625. {
  626. atom.readBytes(rid.id,16);
  627. if (rid.isSame(servMgr->sessionID))
  628. throw StreamException("Servent loopback");
  629. }else if (id == PCP_HELO_BCID)
  630. {
  631. atom.readBytes(bcID.id,16);
  632. }else if (id == PCP_HELO_OSTYPE)
  633. {
  634. osType = atom.readInt();
  635. }else if (id == PCP_HELO_PORT)
  636. {
  637. rhost.port = atom.readShort();
  638. }else if (id == PCP_HELO_PING)
  639. {
  640. pingPort = atom.readShort();
  641. }else
  642. {
  643. LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
  644. atom.skip(c,dlen);
  645. }
  646.     }
  647. if (version)
  648. LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
  649. if (!rhost.globalIP() && servMgr->serverHost.globalIP())
  650. rhost.ip = servMgr->serverHost.ip;
  651. if (pingPort)
  652. {
  653. char ripStr[64];
  654. rhost.toStr(ripStr);
  655. LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
  656. rhost.port = pingPort;
  657. if (!rhost.globalIP() || !pingHost(rhost,rid))
  658. rhost.port = 0;
  659. }
  660. if (servMgr->isRoot)
  661. {
  662. if (bcID.isSet())
  663. {
  664. if (bcID.getFlags() & 1) // private
  665. {
  666. BCID *bcid = servMgr->findValidBCID(bcID);
  667. if (!bcid || (bcid && !bcid->valid))
  668. {
  669. atom.writeParent(PCP_OLEH,1);
  670. atom.writeInt(PCP_HELO_DISABLE,1);
  671. throw StreamException("Client is banned");
  672. }
  673. }
  674. }
  675. }
  676. atom.writeParent(PCP_OLEH,5);
  677. atom.writeString(PCP_HELO_AGENT,PCX_AGENT);
  678. atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
  679. atom.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
  680. atom.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
  681. atom.writeShort(PCP_HELO_PORT,rhost.port);
  682. if (version)
  683. {
  684. if (version < PCP_CLIENT_MINVERSION)
  685. {
  686. atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
  687. throw StreamException("Agent is not valid");
  688. }
  689. }
  690. if (!rid.isSet())
  691. {
  692. atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
  693. throw StreamException("Remote host not identified");
  694. }
  695. if (servMgr->isRoot)
  696. {
  697. servMgr->writeRootAtoms(atom,false);
  698. }
  699. LOG_DEBUG("PCP Incoming handshake complete.");
  700. }
  701. // -----------------------------------
  702. void Servent::processIncomingPCP(bool suggestOthers)
  703. {
  704. PCPStream::readVersion(*sock);
  705. AtomStream atom(*sock);
  706. Host rhost = sock->host;
  707. handshakeIncomingPCP(atom,rhost,remoteID,agent);
  708. bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
  709. || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
  710. bool unavailable = servMgr->controlInFull();
  711. bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
  712. char rstr[64];
  713. rhost.toStr(rstr);
  714. if (unavailable || alreadyConnected || offair)
  715. {
  716. int error;
  717. if (alreadyConnected)
  718. error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
  719. else if (unavailable)
  720. error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
  721. else if (offair)
  722. error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
  723. else 
  724. error = PCP_ERROR_QUIT;
  725. if (suggestOthers)
  726. {
  727. ChanHit best;
  728. ChanHitSearch chs;
  729. int cnt=0;
  730. for(int i=0; i<8; i++)
  731. {
  732. best.init();
  733. // find best hit on this network
  734. if (!rhost.globalIP())
  735. {
  736. chs.init();
  737. chs.matchHost = servMgr->serverHost;
  738. chs.waitDelay = 2;
  739. chs.excludeID = remoteID;
  740. chs.trackersOnly = true;
  741. chs.useBusyControls = false;
  742. if (chanMgr->pickHits(chs))
  743. best = chs.best[0];
  744. }
  745. // find best hit on same network
  746. if (!best.host.ip)
  747. {
  748. chs.init();
  749. chs.matchHost = rhost;
  750. chs.waitDelay = 2;
  751. chs.excludeID = remoteID;
  752. chs.trackersOnly = true;
  753. chs.useBusyControls = false;
  754. if (chanMgr->pickHits(chs))
  755. best = chs.best[0];
  756. }
  757. // else find best hit on other networks
  758. if (!best.host.ip)
  759. {
  760. chs.init();
  761. chs.waitDelay = 2;
  762. chs.excludeID = remoteID;
  763. chs.trackersOnly = true;
  764. chs.useBusyControls = false;
  765. if (chanMgr->pickHits(chs))
  766. best = chs.best[0];
  767. }
  768. if (!best.host.ip)
  769. break;
  770. GnuID noID;
  771. noID.clear();
  772. best.writeAtoms(atom,noID);
  773. cnt++;
  774. }
  775. if (cnt)
  776. {
  777. LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
  778. }
  779. else if (rhost.port)
  780. {
  781. // send push request to best firewalled tracker on other network
  782. chs.init();
  783. chs.waitDelay = 30;
  784. chs.excludeID = remoteID;
  785. chs.trackersOnly = true;
  786. chs.useFirewalled = true;
  787. chs.useBusyControls = false;
  788. if (chanMgr->pickHits(chs))
  789. {
  790. best = chs.best[0];
  791. GnuID noID;
  792. noID.clear();
  793. int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
  794. LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
  795. }
  796. }
  797. }
  798. LOG_ERROR("Sending QUIT to incoming: %d",error);
  799. atom.writeInt(PCP_QUIT,error);
  800. return;
  801. }
  802. type = T_CIN;
  803. setStatus(S_CONNECTED);
  804. atom.writeInt(PCP_OK,0);
  805. // ask for update
  806. atom.writeParent(PCP_ROOT,1);
  807. atom.writeParent(PCP_ROOT_UPDATE,0);
  808. pcpStream = new PCPStream(remoteID);
  809. int error = 0;
  810. BroadcastState bcs;
  811. while (!error && thread.active && !sock->eof())
  812. {
  813. error = pcpStream->readPacket(*sock,bcs);
  814. sys->sleepIdle();
  815. if (!servMgr->isRoot && !chanMgr->isBroadcasting())
  816. error = PCP_ERROR_OFFAIR;
  817. if (peercastInst->isQuitting)
  818. error = PCP_ERROR_SHUTDOWN;
  819. }
  820. pcpStream->flush(*sock);
  821. error += PCP_ERROR_QUIT;
  822. atom.writeInt(PCP_QUIT,error);
  823. LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
  824. }
  825. // ----------------------------------- int Servent::outgoingProc(ThreadInfo *thread) { // thread->lock(); LOG_DEBUG("COUT started");
  826. Servent *sv = (Servent*)thread->data;
  827. GnuID noID;
  828. noID.clear();
  829. sv->pcpStream = new PCPStream(noID);
  830. while (sv->thread.active)
  831. {
  832. sv->setStatus(S_WAIT);
  833. if (chanMgr->isBroadcasting() && servMgr->autoServe)
  834. {
  835. ChanHit bestHit;
  836. ChanHitSearch chs;
  837. char ipStr[64];
  838. do
  839. {
  840. bestHit.init();
  841. if (servMgr->rootHost.isEmpty())
  842. break;
  843. if (sv->pushSock)
  844. {
  845. sv->sock = sv->pushSock;
  846. sv->pushSock = NULL;
  847. bestHit.host = sv->sock->host;
  848. break;
  849. }
  850. GnuID noID;
  851. noID.clear();
  852. ChanHitList *chl = chanMgr->findHitListByID(noID);
  853. if (chl)
  854. {
  855. // find local tracker
  856. chs.init();
  857. chs.matchHost = servMgr->serverHost;
  858. chs.waitDelay = MIN_TRACKER_RETRY;
  859. chs.excludeID = servMgr->sessionID;
  860. chs.trackersOnly = true;
  861. if (!chl->pickHits(chs))
  862. {
  863. // else find global tracker
  864. chs.init();
  865. chs.waitDelay = MIN_TRACKER_RETRY;
  866. chs.excludeID = servMgr->sessionID;
  867. chs.trackersOnly = true;
  868. chl->pickHits(chs);
  869. }
  870. if (chs.numResults)
  871. {
  872. bestHit = chs.best[0];
  873. }
  874. }
  875. unsigned int ctime = sys->getTime();
  876. if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
  877. {
  878. bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
  879. bestHit.yp = true;
  880. chanMgr->lastYPConnect = ctime;
  881. }
  882. sys->sleepIdle();
  883. }while (!bestHit.host.ip && (sv->thread.active));
  884. if (!bestHit.host.ip) // give up
  885. {
  886. LOG_ERROR("COUT giving up");
  887. break;
  888. }
  889. bestHit.host.toStr(ipStr);
  890. int error=0;
  891. try  {
  892. LOG_DEBUG("COUT to %s: Connecting..",ipStr);
  893. if (!sv->sock)
  894. {
  895. sv->setStatus(S_CONNECTING);
  896. sv->sock = sys->createSocket();
  897. if (!sv->sock)
  898. throw StreamException("Unable to create socket");
  899. sv->sock->open(bestHit.host);
  900. sv->sock->connect();
  901. }
  902. sv->sock->setReadTimeout(30000);
  903. AtomStream atom(*sv->sock);
  904. sv->setStatus(S_HANDSHAKE);
  905. Host rhost = sv->sock->host;
  906. atom.writeInt(PCP_CONNECT,1);
  907. handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
  908. sv->setStatus(S_CONNECTED);
  909. LOG_DEBUG("COUT to %s: OK",ipStr);
  910. sv->pcpStream->init(sv->remoteID);
  911. BroadcastState bcs;
  912. error = 0;
  913. while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
  914. {
  915. error = sv->pcpStream->readPacket(*sv->sock,bcs);
  916. sys->sleepIdle();
  917. if (!chanMgr->isBroadcasting())
  918. error = PCP_ERROR_OFFAIR;
  919. if (peercastInst->isQuitting)
  920. error = PCP_ERROR_SHUTDOWN;
  921. if (sv->pcpStream->nextRootPacket)
  922. if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
  923. error = PCP_ERROR_NOROOT;
  924. }
  925. sv->setStatus(S_CLOSING);
  926. sv->pcpStream->flush(*sv->sock);
  927. error += PCP_ERROR_QUIT;
  928. atom.writeInt(PCP_QUIT,error);
  929. LOG_ERROR("COUT to %s closed: %d",ipStr,error);
  930. }catch(TimeoutException &e) { LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg); sv->setStatus(S_TIMEOUT);
  931. }catch(StreamException &e) { LOG_ERROR("COUT to %s: %s",ipStr,e.msg); sv->setStatus(S_ERROR); }
  932. try
  933. {
  934. if (sv->sock)
  935. {
  936. sv->sock->close();
  937. delete sv->sock;
  938. sv->sock = NULL;
  939. }
  940. }catch(StreamException &) {}
  941. // don`t discard this hit if we caused the disconnect (stopped broadcasting)
  942. if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
  943. chanMgr->deadHit(bestHit);
  944. }
  945. sys->sleepIdle();
  946. } sv->kill(); sys->endThread(thread);
  947. LOG_DEBUG("COUT ended");
  948. return 0; } // ----------------------------------- int Servent::incomingProc(ThreadInfo *thread) { // thread->lock(); Servent *sv = (Servent*)thread->data;
  949. char ipStr[64];
  950. sv->sock->host.toStr(ipStr);
  951. try  { sv->handshakeIncoming(); }catch(HTTPException &e) { try { sv->sock->writeLine(e.msg); if (e.code == 401) sv->sock->writeLine("WWW-Authenticate: Basic realm="PeerCast""); sv->sock->writeLine(""); }catch(StreamException &){} LOG_ERROR("Incoming from %s: %s",ipStr,e.msg); }catch(StreamException &e) { LOG_ERROR("Incoming from %s: %s",ipStr,e.msg); }
  952. sv->kill();
  953. sys->endThread(thread);
  954. return 0; } // ----------------------------------- void Servent::processServent() { setStatus(S_HANDSHAKE); handshakeIn(); if (!sock) throw StreamException("Servent has no socket");
  955. processGnutella(); } // ----------------------------------- void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo) {
  956. if (!doneHandshake)
  957. {
  958. setStatus(S_HANDSHAKE);
  959. if (!handshakeStream(chanInfo))
  960. return;
  961. }
  962. if (chanInfo.id.isSet())
  963. {
  964. chanID = chanInfo.id;
  965. LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
  966. if (!waitForChannelHeader(chanInfo))
  967. throw StreamException("Channel not ready");
  968. servMgr->totalStreams++; Host host = sock->host; host.port = 0; // force to 0 so we ignore the incoming port
  969. Channel *ch = chanMgr->findChannelByID(chanID);
  970. if (!ch)
  971. throw StreamException("Channel not found");
  972. if (outputProtocol == ChanInfo::SP_HTTP) { if ((addMetadata) && (chanMgr->icyMetaInterval)) sendRawMetaChannel(chanMgr->icyMetaInterval); else 
  973. sendRawChannel(true,true);
  974. }else if (outputProtocol == ChanInfo::SP_MMS)
  975. {
  976. if (nsSwitchNum)
  977. {
  978. sendRawChannel(true,true);
  979. }else
  980. {
  981. sendRawChannel(true,false);
  982. }
  983. }else if (outputProtocol  == ChanInfo::SP_PCP) { sendPCPChannel();
  984. } else if (outputProtocol  == ChanInfo::SP_PEERCAST)
  985. {
  986. sendPeercastChannel();
  987. }
  988. }
  989. setStatus(S_CLOSING); } // ----------------------------------------- #if 0 // debug FileStream file; file.openReadOnly("c://test.mp3"); LOG_DEBUG("raw file read"); char buf[4000]; int cnt=0; while (!file.eof()) { LOG_DEBUG("send %d",cnt++); file.read(buf,sizeof(buf)); sock->write(buf,sizeof(buf)); } file.close(); LOG_DEBUG("raw file sent"); return; // debug #endif // ----------------------------------- bool Servent::waitForChannelHeader(ChanInfo &info) { for(int i=0; i<30*10; i++) {
  990. Channel *ch = chanMgr->findChannelByID(info.id);
  991. if (!ch)
  992. return false;
  993. if (ch->isPlaying() && (ch->rawData.writePos>0)) return true; if (!thread.active || !sock->active()) break; sys->sleep(100); } return false; } // ----------------------------------- void Servent::sendRawChannel(bool sendHead, bool sendData) {
  994. try {
  995. sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
  996. Channel *ch = chanMgr->findChannelByID(chanID);
  997. if (!ch)
  998. throw StreamException("Channel not found");
  999. setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
  1000. if (sendHead)
  1001. { ch->headPack.writeRaw(*sock);
  1002. streamPos = ch->headPack.pos + ch->headPack.len;
  1003. LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
  1004. }
  1005. if (sendData)
  1006. {
  1007. unsigned int streamIndex = ch->streamIndex; unsigned int connectTime = sys->getTime();
  1008. unsigned int lastWriteTime = connectTime;
  1009. while ((thread.active) && sock->active()) {
  1010. ch = chanMgr->findChannelByID(chanID);
  1011. if (ch)
  1012. {
  1013. if (streamIndex != ch->streamIndex)
  1014. {
  1015. streamIndex = ch->streamIndex;
  1016. streamPos = ch->headPack.pos;
  1017. LOG_DEBUG("sendRaw got new stream index");
  1018. }
  1019. ChanPacket rawPack;
  1020. if (ch->rawData.findPacket(streamPos,rawPack))
  1021. {
  1022. if (syncPos != rawPack.sync)
  1023. LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
  1024. syncPos = rawPack.sync+1;
  1025. if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
  1026. {
  1027. rawPack.writeRaw(*sock);
  1028. lastWriteTime = sys->getTime();
  1029. }
  1030. if (rawPack.pos < streamPos)
  1031. LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
  1032. streamPos = rawPack.pos+rawPack.len;
  1033. }
  1034. }
  1035. if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
  1036. throw TimeoutException();
  1037. sys->sleepIdle(); }
  1038. } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } }
  1039. #if 0
  1040. // -----------------------------------
  1041. void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
  1042. {
  1043. try
  1044. {
  1045. unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
  1046. unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
  1047. GnuID chanIDs[ChanMgr::MAX_CHANNELS];
  1048. int numChanIDs=0;
  1049. for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
  1050. {
  1051. Channel *ch = &chanMgr->channels[i];
  1052. if (ch->isPlaying())
  1053. chanIDs[numChanIDs++]=ch->info.id;
  1054. }
  1055. setStatus(S_CONNECTED);
  1056. if (sendHead)
  1057. {
  1058. for(int i=0; i<numChanIDs; i++)
  1059. {
  1060. Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
  1061. if (ch)
  1062. {
  1063. LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
  1064. ch->headPack.writeRaw(*sock);
  1065. chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
  1066. chanStreamIndex[i] = ch->streamIndex;
  1067. LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
  1068. }
  1069. }
  1070. }
  1071. if (sendData)
  1072. {
  1073. unsigned int connectTime=sys->getTime();
  1074. while ((thread.active) && sock->active())
  1075. {
  1076. for(int i=1; i<numChanIDs; i++)
  1077. {
  1078. Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
  1079. if (ch)
  1080. {
  1081. if (chanStreamIndex[i] != ch->streamIndex)
  1082. {
  1083. chanStreamIndex[i] = ch->streamIndex;
  1084. chanStreamPos[i] = ch->headPack.pos;
  1085. LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
  1086. }
  1087. ChanPacket rawPack;
  1088. if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
  1089. {
  1090. if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
  1091. rawPack.writeRaw(*sock);
  1092. if (rawPack.pos < chanStreamPos[i])
  1093. LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
  1094. chanStreamPos[i] = rawPack.pos+rawPack.len;
  1095. //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
  1096. }
  1097. }
  1098. break;
  1099. }
  1100. sys->sleepIdle();
  1101. }
  1102. }
  1103. }catch(StreamException &e)
  1104. {
  1105. LOG_ERROR("Stream channel: %s",e.msg);
  1106. }
  1107. }
  1108. #endif
  1109. // ----------------------------------- void Servent::sendRawMetaChannel(int interval) {
  1110. try {
  1111. Channel *ch = chanMgr->findChannelByID(chanID);
  1112. if (!ch)
  1113. throw StreamException("Channel not found");
  1114. sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
  1115. setStatus(S_CONNECTED); LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos); String lastTitle,lastURL; int lastMsgTime=sys->getTime(); bool showMsg=true; char buf[16384]; int bufPos=0; if ((interval > sizeof(buf)) || (interval < 1)) throw StreamException("Bad ICY Meta Interval value");
  1116. unsigned int connectTime = sys->getTime(); unsigned int lastWriteTime = connectTime;
  1117. streamPos = 0; // raw meta channel has no header (its MP3)
  1118. while ((thread.active) && sock->active()) {
  1119. ch = chanMgr->findChannelByID(chanID);
  1120. if (ch)
  1121. {
  1122. ChanPacket rawPack;
  1123. if (ch->rawData.findPacket(streamPos,rawPack))
  1124. {
  1125. if (syncPos != rawPack.sync)
  1126. LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
  1127. syncPos = rawPack.sync+1;
  1128. MemoryStream mem(rawPack.data,rawPack.len);
  1129. if (rawPack.type == ChanPacket::T_DATA) {
  1130. int len = rawPack.len; char *p = rawPack.data; while (len) { int rl = len; if ((bufPos+rl) > interval) rl = interval-bufPos; memcpy(&buf[bufPos],p,rl); bufPos+=rl; p+=rl; len-=rl; if (bufPos >= interval) { bufPos = 0; sock->write(buf,interval); lastWriteTime = sys->getTime();
  1131. if (chanMgr->broadcastMsgInterval) if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval) { showMsg ^= true; lastMsgTime = sys->getTime(); } String *metaTitle = &ch->info.track.title; if (!ch->info.comment.isEmpty() && (showMsg)) metaTitle = &ch->info.comment; if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL)) { char tmp[1024]; String title,url; title = *metaTitle; url = ch->info.url; title.convertTo(String::T_META); url.convertTo(String::T_META); sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';",title.cstr(),url.cstr()); int len = ((strlen(tmp) + 15+1) / 16); sock->writeChar(len); sock->write(tmp,len*16); lastTitle = *metaTitle; lastURL = ch->info.url; LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr()); }else { sock->writeChar(0); } }
  1132. } }
  1133. streamPos = rawPack.pos + rawPack.len; }
  1134. }
  1135. if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
  1136. throw TimeoutException();
  1137. sys->sleepIdle();
  1138. } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } } // ----------------------------------- void Servent::sendPeercastChannel() {
  1139. try { setStatus(S_CONNECTED);
  1140. Channel *ch = chanMgr->findChannelByID(chanID);
  1141. if (!ch)
  1142. throw StreamException("Channel not found");
  1143. LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr()); sock->writeTag("PCST"); ChanPacket pack;
  1144. ch->headPack.writePeercast(*sock);
  1145. pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos); pack.writePeercast(*sock); streamPos = 0;
  1146. unsigned int syncPos=0; while ((thread.active) && sock->active()) { ch = chanMgr->findChannelByID(chanID);
  1147. if (ch)
  1148. {
  1149. ChanPacket rawPack;
  1150. if (ch->rawData.findPacket(streamPos,rawPack))
  1151. {
  1152. if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
  1153. {
  1154. sock->writeTag("SYNC");
  1155. sock->writeShort(4);
  1156. sock->writeShort(0);
  1157. sock->write(&syncPos,4);
  1158. syncPos++;
  1159. rawPack.writePeercast(*sock);
  1160. }
  1161. streamPos = rawPack.pos + rawPack.len;
  1162. }
  1163. }
  1164. sys->sleepIdle(); } }catch(StreamException &e) { LOG_ERROR("Stream channel: %s",e.msg); } }
  1165. // -----------------------------------
  1166. void Servent::sendPCPChannel()
  1167. {
  1168. Channel *ch = chanMgr->findChannelByID(chanID);
  1169. if (!ch)
  1170. throw StreamException("Channel not found");
  1171. AtomStream atom(*sock);
  1172. pcpStream = new PCPStream(remoteID);
  1173. int error=0;
  1174. try
  1175. {
  1176. LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
  1177. setStatus(S_CONNECTED);
  1178. atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
  1179. atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
  1180. ch->info.writeInfoAtoms(atom);
  1181. ch->info.writeTrackAtoms(atom);
  1182. if (sendHeader)
  1183. {
  1184. atom.writeParent(PCP_CHAN_PKT,3);
  1185. atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
  1186. atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
  1187. atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
  1188. streamPos = ch->headPack.pos+ch->headPack.len;
  1189. LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
  1190. }
  1191. unsigned int streamIndex = ch->streamIndex;
  1192. while (thread.active)
  1193. {
  1194. Channel *ch = chanMgr->findChannelByID(chanID);
  1195. if (ch)
  1196. {
  1197. if (streamIndex != ch->streamIndex)
  1198. {
  1199. streamIndex = ch->streamIndex;
  1200. streamPos = ch->headPack.pos;
  1201. LOG_DEBUG("sendPCPStream got new stream index");
  1202. }
  1203. ChanPacket rawPack;
  1204. if (ch->rawData.findPacket(streamPos,rawPack))
  1205. {
  1206. if (rawPack.type == ChanPacket::T_HEAD)
  1207. {
  1208. atom.writeParent(PCP_CHAN,2);
  1209. atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
  1210. atom.writeParent(PCP_CHAN_PKT,3);
  1211. atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
  1212. atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
  1213. atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
  1214. }else if (rawPack.type == ChanPacket::T_DATA)
  1215. {
  1216. atom.writeParent(PCP_CHAN,2);
  1217. atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
  1218. atom.writeParent(PCP_CHAN_PKT,3);
  1219. atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
  1220. atom.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
  1221. atom.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
  1222. }
  1223. if (rawPack.pos < streamPos)
  1224. LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
  1225. //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
  1226. streamPos = rawPack.pos+rawPack.len;
  1227. }
  1228. }
  1229. BroadcastState bcs;
  1230. error = pcpStream->readPacket(*sock,bcs);
  1231. if (error)
  1232. throw StreamException("PCP exception");
  1233. sys->sleepIdle();
  1234. }
  1235. LOG_DEBUG("PCP channel stream closed normally.");
  1236. }catch(StreamException &e)
  1237. {
  1238. LOG_ERROR("Stream channel: %s",e.msg);
  1239. }
  1240. try
  1241. {
  1242. atom.writeInt(PCP_QUIT,error);
  1243. }catch(StreamException &) {}
  1244. }
  1245. // ----------------------------------- int Servent::serverProc(ThreadInfo *thread) { // thread->lock();
  1246. Servent *sv = (Servent*)thread->data; try  { if (!sv->sock) throw StreamException("Server has no socket"); sv->setStatus(S_LISTENING); char servIP[64]; sv->sock->host.toStr(servIP); if (servMgr->isRoot) LOG_DEBUG("Root Server started: %s",servIP); else LOG_DEBUG("Server started: %s",servIP); while ((thread->active) && (sv->sock->active())) {
  1247. if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
  1248. { ClientSocket *cs = sv->sock->accept(); if (cs) {
  1249. LOG_DEBUG("accepted incoming"); Servent *ns = servMgr->allocServent(); if (ns) {
  1250. servMgr->lastIncoming = sys->getTime();
  1251. ns->servPort = sv->sock->host.port; ns->networkID = servMgr->networkID; ns->initIncoming(cs,sv->allow); }else LOG_ERROR("Out of servents"); }
  1252. }
  1253. sys->sleep(100);
  1254. } }catch(StreamException &e) { LOG_ERROR("Server Error: %s:%d",e.msg,e.err); } LOG_DEBUG("Server stopped");
  1255. sv->kill();
  1256. sys->endThread(thread); return 0; }   // -----------------------------------
  1257. bool Servent::writeVariable(Stream &s, const String &var)
  1258. {
  1259. char buf[1024];
  1260. if (var == "type")
  1261. strcpy(buf,getTypeStr());
  1262. else if (var == "status")
  1263. strcpy(buf,getStatusStr());
  1264. else if (var == "address")
  1265. getHost().toStr(buf);
  1266. else if (var == "agent")
  1267. strcpy(buf,agent.cstr());
  1268. else if (var == "bitrate")
  1269. {
  1270. if (sock)
  1271. {
  1272. unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
  1273. sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
  1274. }else
  1275. strcpy(buf,"0");
  1276. }else if (var == "uptime")
  1277. {
  1278. String uptime;
  1279. if (lastConnect)
  1280. uptime.setFromStopwatch(sys->getTime()-lastConnect);
  1281. else
  1282. uptime.set("-");
  1283. strcpy(buf,uptime.cstr());
  1284. }else if (var.startsWith("gnet."))
  1285. {
  1286. float ctime = (float)(sys->getTime()-lastConnect);
  1287. if (var == "gnet.packetsIn")
  1288. sprintf(buf,"%d",gnuStream.packetsIn);
  1289. else if (var == "gnet.packetsInPerSec")
  1290. sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
  1291. else if (var == "gnet.packetsOut")
  1292. sprintf(buf,"%d",gnuStream.packetsOut);
  1293. else if (var == "gnet.packetsOutPerSec")
  1294. sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
  1295. else if (var == "gnet.normQueue")
  1296. sprintf(buf,"%d",outPacketsNorm.numPending());
  1297. else if (var == "gnet.priQueue")
  1298. sprintf(buf,"%d",outPacketsPri.numPending());
  1299. else if (var == "gnet.flowControl")
  1300. sprintf(buf,"%d",flowControl?1:0);
  1301. else if (var == "gnet.routeTime")
  1302. {
  1303. int nr = seenIDs.numUsed();
  1304. unsigned int tim = sys->getTime()-seenIDs.getOldest();
  1305. String tstr;
  1306. tstr.setFromStopwatch(tim);
  1307. if (nr)
  1308. strcpy(buf,tstr.cstr());
  1309. else
  1310. strcpy(buf,"-");
  1311. }
  1312. else
  1313. return false;
  1314. }else
  1315. return false;
  1316. s.writeString(buf);
  1317. return true;
  1318. }