CPnew.c
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:35k
源码类别:

P2P编程

开发平台:

Visual C++

  1. /*
  2.  *  Openmysee
  3.  *
  4.  *  This program is free software; you can redistribute it and/or modify
  5.  *  it under the terms of the GNU General Public License as published by
  6.  *  the Free Software Foundation; either version 2 of the License, or
  7.  *  (at your option) any later version.
  8.  *
  9.  *  This program is distributed in the hope that it will be useful,
  10.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12.  *  GNU General Public License for more details.
  13.  *
  14.  *  You should have received a copy of the GNU General Public License
  15.  *  along with this program; if not, write to the Free Software
  16.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  17.  *
  18.  */
  19.  
  20. #include "echo.h"
  21. #define TYPE_NP 0
  22. #define TYPE_CP 1
  23. #define TYPE_SCP 2
  24. #define TYPE_ECP 3
  25. #define TYPE_GCP 4
  26. #define MEDIATYPE_FIRST 0.149
  27. #define MAX_IDLE 90 /* destruct a connection to sp/np after 90s */
  28. //#define P2PS_PORT 50002
  29. #define TS4CP_PORT 22168
  30. #define REQUEST_AHEAD 15 /* Request how many blocks if a block doesn't exist in cache */
  31. int MAX_P2PS=500;
  32. int MAX_P2PC=512;
  33. int MAX_JOB_PER_SESSION=3;
  34. //char *CONFIG = "./acp.cfg";
  35. char *PREFIX="/data/cp/";
  36. char *SCP_CHANNEL;
  37. char *ECP_REGION;
  38. int isGCP;
  39. char *SERVERIP;
  40. char *BINDIP;
  41. char *PIDFile="/var/run/cpnew.pid";
  42. int MAX_BANDWIDTH;
  43. char *AUTH_MD5;
  44. int  AUTH_USERID;
  45. fd_set osocks;
  46. int BINDALL;
  47. #ifdef HAVE_TS
  48. int TSSOCK = 0;
  49. //#define RANDOM_PORT 3947
  50. #endif
  51. time_t CurrentTime;
  52. time_t startTime;
  53. int SnapShotInterval;
  54. int NearPeerInterval = 30;
  55. int isSet = 0; /* whether TS has returned WELCOME message */
  56. struct TSMessage UDPMsg;
  57. extern struct Channel *ChannelHash[MAX_CHANNEL];
  58. extern struct Channel *ChannelList;
  59. struct sockaddr_in TSADDR;
  60. socklen_t addrlen = sizeof (struct sockaddr_in);
  61. // calculate avg & cur speed
  62. long long totalDownBytes=0, totalUpBytes=0;
  63. long long tmpDownBytes=0, tmpUpBytes=0;
  64. struct ServerDesc TRACKER[MAX_TYPE];
  65. char *LOGXML;
  66. extern int errno;
  67. int JobHighWater = 10000;
  68. int MaxNPPerChannel = 300;
  69. int cfgP2PS_PORT = 23;
  70. int cfgCP2TS_PORT = CP2TS_PORT;
  71. struct NamVal ConfigParameters[]
  72. {
  73. {"Prefix", &PREFIX, 's'},
  74. {"MAX_NP", &MAX_P2PS, 'd'},
  75. {"MAX_SP", &MAX_P2PC, 'd'},
  76. {"Pidfile", &PIDFile, 's'},
  77. {"TrackerIP", &SERVERIP, 's'},
  78. {"SCP", &SCP_CHANNEL, 's'},
  79. {"GCP", &isGCP, 'd'},
  80. {"authid", &AUTH_USERID, 'd'},
  81. {"authmd5", &AUTH_MD5, 's'},
  82. {"ECP", &ECP_REGION, 's'},
  83. {"BandWidth", &MAX_BANDWIDTH, 'd'},
  84. {"BindIP", &BINDIP, 's'},
  85. {"SnapShotInterval",&SnapShotInterval,'d'},
  86. {"CP4NP_PORT", &cfgP2PS_PORT, 'd'},
  87. {"CP2TS_PORT", &cfgCP2TS_PORT, 'd'},
  88. {"LogFilePath", &LOGXML, 's'},
  89. {"NearPeerInterval", &NearPeerInterval, 'd'},
  90. {"JobHighWater", &JobHighWater, 'd'},
  91. {"MaxNPPerChannel", &MaxNPPerChannel, 'd'},
  92. {"BINDALL", &BINDALL, 'd'}
  93. };
  94. int register_cp ();
  95. int init_cp ();
  96. int handle_new_connection(int sock, int type);
  97. int Clientclosure (int listnum, int type);
  98. void process_child (void);
  99. int init_P2PS (int listnum);
  100. int process_P2PS (int listnum);
  101. int closure_P2PS (int listnum);
  102. int init_P2PC (int listnum);
  103. int process_P2PC (int listnum);
  104. int closure_P2PC (int listnum);
  105. int sendMessage (int sock, char *ptr, struct sockaddr_in *dest);
  106. int process_TS2CP_PEERS (char *buf);
  107. void process_type (int type, fd_set *socks, fd_set *wsocks, fd_set *esocks);
  108. int reconnect (struct Session *p);
  109. char *parseECP (char *str, char *buf);
  110. int closure_TS ();
  111. int periodCheck (float KBPSused);
  112. void makeSnapShot(int count, int time_interval);
  113. int send_nearpeers (struct Channel *pc, struct Edge *pme);
  114. int send_nearpeers_toall (struct Channel *pc);
  115. #ifdef HAVE_TS
  116. int process_TS();
  117. #endif
  118. int send_P2P_PUSHLIST (struct Channel *pc, int id);
  119. int period_process (void);
  120. extern char *getJobBuffer (struct JobDes *p, int *max);
  121. extern inline void setblockId (struct JobDes *pj, int id);
  122. #include "sessions.c"
  123. #define INIT_MAXQ(pc,s,maxq) do
  124. {
  125. if (s->maxBlockID == 0) return -1;
  126. if (s->maxBlockID >= MAX_QUEUE) maxq = MAX_QUEUE;
  127. else maxq = s->maxBlockID;
  128. if (pc->pcinfo->indisk == NULL)
  129. {
  130. pc->pcinfo->max_queue = maxq;
  131. pc->pcinfo->indisk = calloc (maxq, 1);
  132. pc->pcinfo->bitflag = calloc ((maxq+7)/8, 1);
  133. if (pc->pcinfo->indisk == NULL || pc->pcinfo->bitflag == NULL)
  134. return -1;
  135. } else if (maxq < pc->pcinfo->max_queue)
  136. {
  137. pc->pcinfo->max_queue = maxq;
  138. pc->pcinfo->indisk = realloc (pc->pcinfo->indisk, maxq);
  139. pc->pcinfo->bitflag = realloc (pc->pcinfo->bitflag, (maxq+7)/8);
  140. if (pc->pcinfo->indisk == NULL || pc->pcinfo->bitflag == NULL)
  141. return -1;
  142. }
  143. } while (0)
  144. int period_process (void)
  145. {
  146. static time_t last_snapshot;
  147. static int snapCount = 0;
  148. if (CurrentTime - last_snapshot > SnapShotInterval)
  149. {
  150. makeSnapShot(snapCount++, CurrentTime-last_snapshot);
  151. system("/usr/bin/vmstat -a >> cp.log 2>&1 &");
  152. last_snapshot = CurrentTime;
  153. }
  154. return 0;
  155. }
  156. int main(int argc, char **argv)
  157. {
  158. int i, mode = 1, tmp = 0;
  159. if (argc < 2)
  160. {
  161. printf("usage: %s mode(0 for daemon, 1 for console).n", argv[0]);
  162. return -1;
  163. }
  164. signal (SIGPIPE, SIG_IGN);
  165. signal (SIGINT, terminate);
  166. mode = atoi (argv[1]);
  167. if (mode == 0)
  168. daemon(1,1);
  169. read_config (CONFIG, ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));
  170. for(i = 0 ; i < argc; ++i)
  171. {
  172. if(strncmp(argv[i], "tcp=", 4) == 0)
  173. {
  174. tmp = atoi(argv[i]+4);
  175. if(tmp > 0 && tmp < 65535)
  176. cfgP2PS_PORT = tmp;
  177. }
  178. else if(strncmp(argv[i], "udp=", 4) == 0)
  179. {
  180. tmp = atoi (argv[i]+4);
  181. if(tmp > 0 && tmp < 65535)
  182. cfgCP2TS_PORT = tmp;
  183. }
  184. }
  185. for (i=0; i<10 && IN_LOOP > 0; i++)
  186. {
  187. /*
  188. pid_t pid;
  189. if ((pid = fork ()) == 0)
  190. {
  191. */
  192. FD_ZERO(&osocks);
  193. if (init_cp () < 0)// || initLOG () < 0)
  194. {
  195. PDEBUG ("init_cp error, exit...n");
  196. exit (-1);
  197. }
  198. process_child ();
  199. /*
  200. } else if (pid < 0)
  201. {
  202. perror ("fork");
  203. exit (pid);
  204. } else
  205. {
  206. waitpid (pid, NULL, 0);
  207. }
  208. */
  209. }
  210. return 0;
  211. }
  212. int init_P2PS (int listnum)
  213. {
  214. return 0;
  215. }
  216. int process_P2P_HELLO (struct Session *p, struct Message *m)
  217. {
  218. struct Edge *pedge = NULL;
  219. float version = *(float *)(m->buffer);
  220. char *buf = m->buffer + sizeof (float);
  221. struct Channel *pc;
  222. struct LiveChannelInfo *pcinfo;
  223. int listnum;
  224. // char *buf, buffer[MAX_DATA];
  225. PINFO ("RECV P2P_HELLO. n");
  226. p->version = version;
  227. listnum = p - TRACKER[TYPE_P2PS].head;
  228. if ((pc = findChannel (buf, MD5_LEN)) == NULL)
  229. {
  230. if ((pc = newLiveChannel (buf, NULL, buf, 0, 0)) != (struct Channel *)0)
  231. {
  232. pedge=newEdge (pc, p);
  233. pc->numclient ++;
  234. } else
  235. {
  236. Clientclosure (listnum, TYPE_P2PS);
  237. return -1;
  238. }
  239. p->pc = pc;
  240. } else
  241. {
  242. if (pc->numclient > MaxNPPerChannel)
  243. {
  244. Clientclosure (listnum, TYPE_P2PS);
  245. return -1;
  246. }
  247. for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
  248. if (pedge == NULL)
  249. {
  250. pedge=newEdge (pc, p);
  251. pc->numclient ++;
  252. }
  253. p->pc = pc;
  254. if (pc->pcinfo->dataSource) 
  255. {
  256. /*
  257. // connect exist, send p2p_hello to check channel state.
  258. buf = buffer+sizeof (int);
  259. *(unsigned char *)buf = P2P_HELLO;
  260. buf += sizeof (char);
  261. memcpy (buf, pc->channel_md5, MD5_LEN);
  262. buf += MD5_LEN;
  263. *(unsigned char *)buf = 0;
  264. buf += sizeof (char);
  265. *(int *)buffer = buf - buffer;
  266. if (writeMessage (pc->pcinfo->dataSource, buffer) < 0)
  267. {
  268. Clientclosure (listnum, TYPE_P2PC);
  269. return -1;
  270. }
  271. PDEBUG("sent P2P_HELLO to SP. n");
  272. */
  273. buf += MD5_LEN + sizeof (char);
  274. memcpy (&(p->addr), buf, sizeof (p->addr));
  275. if (pedge) send_nearpeers (pc, pedge);
  276. return 0;
  277. }
  278. PDEBUG("NO connection to SP. try connect.n");
  279. }
  280. buf += MD5_LEN + sizeof (char);
  281. memcpy (&(p->addr), buf, sizeof (p->addr));
  282. buf += sizeof (p->addr);
  283. pcinfo = pc->pcinfo;
  284. pcinfo->numofsp = (unsigned char)*buf;
  285. buf += sizeof (char);
  286. if (pcinfo->numofsp == 0 || pcinfo->numofsp > MAX_REPSP)
  287. {
  288. Clientclosure (listnum, TYPE_P2PS);
  289. return -1;
  290. }
  291. memcpy (&(pcinfo->SPLIST), buf, pcinfo->numofsp*sizeof(struct NormalAddress));
  292. buf += pcinfo->numofsp*sizeof (struct NormalAddress);
  293. if (buf - m->buffer + NORMAL_HEADER > m->len)
  294. {
  295. PDEBUG ("Invalid message %d, length %d not enoughn", m->type, m->len);
  296. Clientclosure (listnum, TYPE_P2PS);
  297. return -1;
  298. }
  299. if (isGCP || SCP_CHANNEL)
  300. {
  301. if ((listnum = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
  302. {
  303. PDEBUG("Unable connect to SP.n");
  304. Clientclosure (p-TRACKER[TYPE_P2PS].head, TYPE_P2PS);
  305. return -1;
  306. }
  307. } else /* ECP, need peers */
  308. {
  309. #ifdef HAVE_TS
  310. UDPMsg.type = CP2TS_NEED_PEERS;
  311. UDPMsg.len = 12+MD5_LEN;
  312. memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
  313. /* isSet whether TS has returned WELCOME message */
  314. if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
  315. {
  316. PDEBUG ("exit...n");
  317. exit (1);
  318. }
  319. #endif
  320. }
  321. if (pedge) send_nearpeers (pc, pedge);
  322. return 0;
  323. }
  324. int process_P2P_MEDIATYPE (int listnum, struct Message *m)
  325. {
  326. struct Channel *pc;
  327. char *buf, *media, *name, *channel_name;
  328. int start, length, size, proglen, chnllen;
  329. buf = m->buffer;
  330. if ((pc = findChannel (buf, MD5_LEN)) == NULL)
  331. return -1;
  332. buf += MD5_LEN;
  333. start = *(int *)buf;
  334. buf += sizeof (int);
  335. length = *(int *)buf;
  336. buf += sizeof (int);
  337. size = *(int *)buf;
  338. buf += sizeof (int);
  339. media = buf;
  340. buf += size;
  341. proglen = *(unsigned char *)buf;
  342. buf += sizeof (char);
  343. name = buf;
  344. buf += proglen;
  345. buf += sizeof (int);
  346. chnllen = *(unsigned char *)buf;
  347. buf += sizeof (char);
  348. channel_name = buf;
  349. buf += chnllen;
  350. if (buf - (char *)m > m->len)
  351. return -1;
  352. addMedia (pc, start, length, size, media, name, channel_name);
  353. return 0;
  354. }
  355. int process_P2P_PUSHLIST (struct Session *p, struct Message *m)
  356. {
  357. struct Channel *pc;
  358. struct Edge *pedge;
  359. char *buf;
  360. int i, type, listnum, size;
  361. listnum = p - TRACKER[TYPE_P2PS].head;
  362. buf = m->buffer;
  363. if (p->npcp == TYPE_CP)
  364. {
  365. if ((pc = findChannel (buf, MD5_LEN)) == NULL)
  366. {
  367. if ((pc = newLiveChannel (m->buffer, NULL, m->buffer, 0, 0)) != (struct Channel *)0)
  368. {
  369. pedge=newEdge (pc, p);
  370. pc->numclient ++;
  371. }
  372. } else
  373. {
  374. for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
  375. if (pedge == NULL)
  376. {
  377. pedge=newEdge (pc, p);
  378. pc->numclient ++;
  379. }
  380. }
  381. buf += MD5_LEN;
  382. } else
  383. {
  384. pc = p->pc;
  385. }
  386. if (pc == NULL && p->npcp != TYPE_CP)
  387. {
  388. Clientclosure (listnum, TYPE_P2PS);
  389. return -1;
  390. }
  391. if (p->numjob >= MAX_JOB_PER_SESSION)
  392. return -2;
  393. type = *(unsigned char *)buf;
  394. buf += sizeof (char);
  395. size = *(unsigned char *)buf;
  396. buf += sizeof (char);
  397. if (type)
  398. {
  399. deleteChannel (p, pc);
  400. for (i=0; i<size; i++)
  401. if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
  402. return -1;
  403. } else
  404. {
  405. for (i=0; i<size; i++)
  406. if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
  407. return -1;
  408. buf += size*sizeof(int);
  409. size = *(unsigned char *)buf;
  410. buf += sizeof (char);
  411. deleteJob (p, pc, (unsigned int *)buf, size);
  412. }
  413. if (buf - m->buffer + NORMAL_HEADER > m->len)
  414. {
  415. PDEBUG ("Invalid message %d, length %d not enoughn", m->type, m->len);
  416. Clientclosure (listnum, TYPE_P2PS);
  417. return -1;
  418. }
  419. return 0;
  420. }
  421. int process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id)
  422. {
  423. struct JobDes *pj = newJob ();
  424. char *buf, *buffer;
  425. struct LiveChannelInfo *pcinfo;
  426. int size, max, i;
  427. buffer = getJobBuffer (pj, &max);
  428. buf = buffer + sizeof (int);
  429. *(unsigned char *)buf = P2P_RESPONSE;
  430. buf += sizeof (char);
  431. if (p->npcp == TYPE_CP)
  432. {
  433. memcpy (buf, pc->channel_md5, MD5_LEN);
  434. buf += MD5_LEN;
  435. }
  436. pcinfo = pc->pcinfo;
  437. if (p->first == 0)
  438. {
  439. p->first ++;
  440. if (p->version >= MEDIATYPE_FIRST)
  441. sendIdMedia (p, pc, id, 0);
  442. }
  443. if (pcinfo->dataSource == NULL)
  444. {
  445. *(int *)buf = id;
  446. buf += sizeof (int);
  447. *(int *)buf = 0;
  448. buf += sizeof (int);
  449. if (isGCP || SCP_CHANNEL)
  450. {
  451. if ((i = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
  452. freeLiveChannel (pc, NULL);
  453. } else
  454. {
  455. #ifdef HAVE_TS
  456. UDPMsg.type = CP2TS_NEED_PEERS;
  457. UDPMsg.len = 12+MD5_LEN;
  458. memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
  459. /* isSet whether TS has returned WELCOME message */
  460. if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
  461. {
  462. PDEBUG ("exit...n");
  463. exit (1);
  464. }
  465. #endif
  466. }
  467. } else if ((size=locate_by_id (pc, id, buf, max-32)) > 0)
  468. {
  469. buf += 2*sizeof (int) + size;
  470. if (p->version >= MEDIATYPE_FIRST && (i=isHit (pc, id)) >= 0)
  471. sendHitMedia (p, pc, i, id, 0);
  472. p->last_transferblock = CurrentTime;
  473. } else if (size == -2)
  474. {
  475. assert (0);
  476. PDEBUG ("Leave block %d to next round.n", id);
  477. return -1;
  478. } else if (id >= 0)
  479. {
  480. *(int *)buf = id;
  481. buf += sizeof (int);
  482. *(int *)buf = 0;
  483. buf += sizeof (int);
  484. PINFO ("no block %dn", id);
  485. send_P2P_PUSHLIST (pc, id);
  486. }
  487. *(int *)buffer = buf - buffer;
  488. setblockId(pj, id);
  489. writeDATAMessage(p,pc, pj);
  490. // PDEBUG ("Write block %dn", id);
  491. return 0;
  492. }
  493. int process_P2PS (int listnum)
  494. {
  495. struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
  496. struct Message *m = (struct Message *)(p->buf+p->start);
  497. tmpDownBytes += m->len;
  498. switch (m->type)
  499. {
  500. case P2P_HELLO:
  501. if (process_P2P_HELLO (p, m) == -2)
  502. return -2;
  503. break;
  504. case P2P_PUSHLIST:
  505. if (process_P2P_PUSHLIST (p, m) == -2)
  506. return -2;
  507. break;
  508. case P2P_REPORT: /* At present no action */
  509. break;
  510. case P2P_MSG:
  511. break;
  512. case P2P_SPUPDATE:
  513. break;
  514. case P2P_RESPONSE:
  515. break;
  516. case P2P_NEAR_PEERS:
  517. break;
  518. case P2P_REQMEDIA:
  519. sendIdMedia (p, p->pc, *(int *)(m->buffer), 0);
  520. break;
  521. default:
  522. PDEBUG ("Unknown message format from clientn");
  523. Clientclosure (listnum, TYPE_P2PS);
  524. return -1;
  525. }
  526. return 0;
  527. }
  528. int closure_P2PS (int listnum)
  529. {
  530. struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
  531. // struct Channel *pc = p->pc;
  532. struct Edge *pedge, *prevedge;
  533. PDEBUG ("CP disconnected from %d.%d.%d.%d:%d.n", IPADDR (p->host), p->port);
  534. for (pedge=p->header; pedge; pedge=prevedge)
  535. {
  536. pedge->head->numclient --;
  537. prevedge=pedge->enext;
  538. delEdge (pedge);
  539. }
  540. FD_CLR(p->socket, &osocks);
  541. close (p->socket);
  542. FREE (p->buf);
  543. deleteAll (p);
  544. memset (p, 0, sizeof (struct Session));
  545. return 0;
  546. }
  547. int init_P2PC (int listnum)
  548. {
  549. return 0;
  550. }
  551. int newChannel (struct Channel *pc, struct NormalAddress *client, int n, int flag)
  552. {
  553. struct Session *p;
  554. int listnum, newconn = -1, max, sock_flag;
  555. struct Session *head;
  556. struct sockaddr_in addr;
  557. char *buf, buffer[MAX_DATA];
  558. if (pc == NULL) return -1;
  559. head = TRACKER[TYPE_P2PC].head;
  560. max = TRACKER[TYPE_P2PC].max;
  561. for (listnum = 0; listnum < max; listnum ++)
  562. {
  563. if(head[listnum].socket == 0)
  564. {
  565. if ((newconn = socket (PF_INET, SOCK_STREAM, 0)) < 0)
  566. {
  567. perror ("socket||gethostbyname");
  568. return -1;
  569. }
  570. memset (&addr, 0, sizeof (addr));
  571. addr.sin_port = client[0].sin_port;
  572. addr.sin_addr = client[0].sin_addr;
  573. addr.sin_family = AF_INET;
  574. if ((sock_flag = connect_nonb(newconn, &addr, sizeof (addr))) == -1)
  575. {
  576. close (newconn);
  577. return -1;
  578. }
  579. head[listnum].socket = newconn;
  580. head[listnum].type = TYPE_P2PC;
  581. head[listnum].flag = 1;
  582. head[listnum].sock_flag = sock_flag;
  583. head[listnum].buf = NEW();
  584. head[listnum].pc = pc;
  585. head[listnum].time_sec = CurrentTime;
  586. head[listnum].totalup = 0;
  587. head[listnum].last_transferblock = CurrentTime;
  588. FD_SET(newconn, &osocks);
  589. if (listnum > TRACKER[TYPE_P2PC].maxid)
  590. TRACKER[TYPE_P2PC].maxid = listnum;
  591. break;
  592. }
  593. }
  594. if (listnum >= max)
  595. {
  596. PDEBUG ("no space left for new incoming client.");
  597. close (newconn);
  598. return -1;
  599. }
  600. TRACKER[TYPE_P2PC].cur ++;
  601. (*(TRACKER[TYPE_P2PC].init)) (listnum);
  602. p = &(TRACKER[TYPE_P2PC].head[listnum]);
  603. pc->pcinfo->dataSource = p;
  604. pc->upsize = 0;
  605. pc->downsize = 0;
  606. PDEBUG("Connect to %s:%d. and send P2P_HELLO.n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
  607. buf = buffer+sizeof (int);
  608. *(unsigned char *)buf = P2P_HELLO;
  609. buf += sizeof (char);
  610. memcpy (buf, pc->channel_md5, MD5_LEN);
  611. buf += MD5_LEN;
  612. *(unsigned char *)buf = 0;
  613. buf += sizeof (char);
  614. if (flag == TYPE_CP)
  615. {
  616. *(unsigned char *)buf = pc->pcinfo->numofsp;
  617. buf += sizeof (char);
  618. if (pc->pcinfo->numofsp)
  619. {
  620. memcpy (buf, &(pc->pcinfo->SPLIST), pc->pcinfo->numofsp*sizeof (struct NormalAddress));
  621. buf += pc->pcinfo->numofsp*sizeof (struct NormalAddress);
  622. }
  623. }
  624. *(int *)buffer = buf - buffer;
  625. if (writeMessage (p, pc, buffer) < 0)
  626. {
  627. perror ("CP: write SP");
  628. Clientclosure (listnum, TYPE_P2PC);
  629. return -1;
  630. }
  631. return listnum;
  632. }
  633. int process_P2P_SPUPDATE (int listnum, struct Message *m)
  634. {
  635. int maxq;
  636. struct Edge *pedge;
  637. char buffer[MAX_DATA];
  638. struct Channel *pc;
  639. struct SPUpdate *s = (struct SPUpdate *)(m->buffer+MD5_LEN);
  640. int bShouldCloseSP = 0;
  641. if (m->len < sizeof (struct SPUpdate) + MD5_LEN + 5)
  642. {
  643. PDEBUG ("Invalid message %d, length %d not enoughn", m->type, m->len);
  644. Clientclosure (listnum, TYPE_P2PC);
  645. return -1;
  646. }
  647. *(int *)buffer = sizeof (struct SPUpdate) + sizeof(int) + sizeof (char);
  648. *(unsigned char *)(buffer + sizeof (int)) = P2P_SPUPDATE;
  649. PINFO ("Recv SPUPDATE(%lld,%lld,%u,%u).n", s->minKeySample, s->maxKeySample, s->minBlockID, s->maxBlockID);
  650. memcpy (buffer+sizeof(int)+sizeof(char), s, sizeof(struct SPUpdate));
  651. if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL) return -1;
  652. if (s->minBlockID == 0xffffffff && s->maxBlockID == 0xffffffff)
  653. {
  654. pc->pcinfo->dataSource = NULL; /* indication of end */
  655. if(s->minKeySample == -1 && s->maxKeySample == -1)
  656. PDEBUG("NO SUCH RESOURCE!n"); // no such resource
  657. else if(s->minKeySample == 0 && s->maxKeySample == 0)
  658. PDEBUG("CHANNEL HAS BEEN CLOSED!n");// channel has been closed
  659. else
  660. PDEBUG("UNKNOWN MESSAGE! 1n"); // unknown message
  661. bShouldCloseSP = 1; // should close connection
  662. } else
  663. {
  664. if(s->minBlockID == 0 && s->maxBlockID == 0 && s->minKeySample == 0 && s->maxKeySample == 0)
  665. {
  666. PDEBUG("END OF CHANNEL!n"); // end of channel
  667. bShouldCloseSP = 1;
  668. } else if (s->minKeySample == -1ULL && s->maxKeySample == -1ULL)
  669. {
  670. INIT_MAXQ(pc,s,maxq);
  671. return 0;
  672. } else if (s->minKeySample == -2LL)
  673. {
  674. INIT_MAXQ(pc,s,maxq);
  675. pc->type = T_PLIST;
  676. pc->pcinfo->max_channel = (int)(s->maxKeySample);
  677. if (pc->pcinfo->max_channel <= 0 || pc->pcinfo->max_channel >= MAX_FILEINPUT)
  678. return -1;
  679. if (pc->pcinfo->media != NULL)
  680. freeMedia (pc);
  681. pc->pcinfo->media = calloc (sizeof (struct MediaData), pc->pcinfo->max_channel);
  682. return 0;
  683. } else if (s->minKeySample == -3LL)
  684. {
  685. pc->pcinfo->max_channel = 1;
  686. if (pc->pcinfo->media != NULL)
  687. freeMedia (pc);
  688. pc->pcinfo->media = calloc (sizeof (struct MediaData), 1);
  689. } else
  690. {
  691. memcpy (&(pc->pcinfo->s), s, sizeof (struct SPUpdate));
  692. // request block after spupdate, not wait!
  693. // now, block will be sent automaticlly by SP
  694. // send_P2P_PUSHLIST (pc, s->maxBlockID);
  695. }
  696. }
  697. {
  698. int i = 0;
  699. unsigned char vcode = 0;
  700. for (i = 0; i < sizeof (struct SPUpdate); ++i) {
  701. vcode += ((unsigned char*)s)[i];
  702. }
  703. buffer[*(int*)buffer] = vcode;
  704. ++*(int*)buffer;
  705. }
  706. for (pedge=pc->PeerHead; pedge; pedge=pedge->cnext)
  707. {
  708. if (pedge->me->npcp == TYPE_CP)
  709. {
  710. // PDEBUG ("Send SPUPDATE to CP %d.n", pedge->me-TRACKER[TYPE_P2PC].head);
  711. writeMessage (pedge->me, pc, (char *)m);
  712. } else
  713. {
  714. // PDEBUG ("Send SPUPDATE to NP %d.n", pedge->me-TRACKER[TYPE_P2PS].head);
  715. writeMessage (pedge->me, pc, buffer);
  716. }
  717. }
  718. if(bShouldCloseSP != 0 || pedge == pc->PeerHead/* no NP*/)
  719. return -1; // Close Connection to SP
  720. return 0;
  721. }
  722. int process_P2P_RESPONSE (int listnum, struct Message *m)
  723. {
  724. int size;
  725. struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
  726. struct Channel *pc;
  727. char *msg = m->buffer+MD5_LEN;
  728. if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL)
  729. return -1;
  730.   if ((size = saveBlock (pc, msg, p)) <= 0)
  731. {
  732.     PDEBUG ("save block error, size %d, %dn", size, listnum);
  733. return -1;
  734. // Clientclosure (listnum, TYPE_P2PC);
  735. }
  736. p->last_transferblock = CurrentTime;
  737. return 0;
  738. }
  739. int process_P2PC (int listnum)
  740. {
  741. struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
  742. struct Message *m = (struct Message *)(p->buf+p->start);
  743. tmpDownBytes += m->len;
  744. switch (m->type)
  745. {
  746. case P2P_SPUPDATE:
  747. if(process_P2P_SPUPDATE (listnum, m) < 0)
  748. {
  749. Clientclosure(listnum, TYPE_P2PC);
  750. }
  751. break;
  752. case P2P_RESPONSE:
  753. process_P2P_RESPONSE (listnum, m);
  754. break;
  755. case P2P_MSG:
  756. break;
  757. case P2P_MEDIATYPE:
  758. process_P2P_MEDIATYPE (listnum, m);
  759. break;
  760. default:
  761. PDEBUG("Err msg type from SP.n");
  762. Clientclosure (listnum, TYPE_P2PC);
  763. return -1;
  764. }
  765. return 0;
  766. }
  767. int closure_P2PC (int listnum)
  768. {
  769. // struct Edge *pedge, *prevedge;
  770. // char buffer[MAX_DATA], *buf;
  771. struct Session *p=&(TRACKER[TYPE_P2PC].head[listnum]);
  772. struct Channel *pc = p->pc;
  773. if (pc)
  774. {
  775. /*
  776. buf = buffer + sizeof (int);
  777. *(unsigned char *)buf = P2P_SPUPDATE;
  778. buf += sizeof (char);
  779. memcpy (buf, pc->channel_md5, MD5_LEN);
  780. buf += MD5_LEN;
  781. memset (buf, 0, sizeof (struct SPUpdate));
  782. buf += sizeof (struct SPUpdate);
  783. *(int *)buffer = buf - buffer;
  784. for (pedge=pc->PeerHead; pedge; pedge=prevedge)
  785. {
  786. pc->numclient --;
  787. writeMessage (pedge->me, buffer);
  788. prevedge = pedge->cnext;
  789. delEdge (pedge);
  790. }
  791. */
  792. pc->pcinfo->dataSource = NULL;
  793. }
  794. PDEBUG ("SP disconnected from %d.%d.%d.%d:%d.n", IPADDR (p->host), p->port);
  795. FD_CLR(p->socket, &osocks);
  796. close (p->socket);
  797. FREE (p->buf);
  798. deleteAll (p);
  799. memset (p, 0, sizeof (struct Session));
  800. return 0;
  801. }
  802. #ifdef HAVE_TS
  803. int register_cp () //send UDP msg
  804. {
  805. const int max_times = 10;
  806. int i;
  807. char *buf;
  808. char buffer[MAX_DATA];
  809. isSet = 0;
  810. if (TSSOCK == 0)
  811. {
  812. for (i=0; i<max_times; i++)
  813. {
  814. if (BINDALL == 0)
  815. TSSOCK = init_udp (BINDIP, cfgCP2TS_PORT);
  816. else
  817. TSSOCK = init_udp (NULL, cfgCP2TS_PORT);
  818. if (TSSOCK > 0) break;
  819. PDEBUG("Sleep 1000. cause init UDP port %d failed.", cfgCP2TS_PORT);
  820. sleep (1000);
  821. }
  822. if (TSSOCK <= 0)
  823. {
  824. PDEBUG ("exit...n");
  825. exit (1); //the max times try init_udp failure
  826. }
  827. }
  828. buf = buffer + sizeof (int);
  829. *(unsigned char *)buf = CP2TS_REGISTER;
  830. buf += sizeof (char);
  831. *(int *)buf = AUTH_USERID;
  832. buf += sizeof (int);
  833. strncpy (buf, AUTH_MD5, MD5_LEN);
  834. buf += MD5_LEN;
  835. *(unsigned short *)buf = htons (cfgP2PS_PORT);
  836. buf += sizeof (short);
  837. if (SCP_CHANNEL) //Now only GCP is available
  838. {
  839. *buf = CT_SPECIFIED_RES;
  840. buf += sizeof (char);
  841. memcpy (buf, SCP_CHANNEL, strlen(SCP_CHANNEL)+1);
  842. buf += strlen (SCP_CHANNEL)+1;
  843. } else if (ECP_REGION)
  844. {
  845. *buf = CT_EDGE;
  846. buf += sizeof (char);
  847. buf = parseECP (ECP_REGION, buf);
  848. } else
  849. {
  850. *buf = CT_GENERAL;
  851. buf += sizeof (char);
  852. }
  853. *(int *)buffer = buf - buffer; //the size of buffer;
  854. TSADDR.sin_port = htons (TS4CP_PORT);
  855. TSADDR.sin_addr.s_addr = inet_addr (SERVERIP);
  856. TSADDR.sin_family = AF_INET;
  857. if (sendMessage (TSSOCK, buffer, &TSADDR) < 0) //send register msg
  858. {
  859. PDEBUG ("Cannot write to servern");
  860. return -1;
  861. }
  862. return 0;
  863. }
  864. #endif
  865. int init_cp ()
  866. {
  867. FILE *pidf;
  868. struct rlimit rl;
  869. char buffer[MAX_DATA];
  870. rl.rlim_cur = rl.rlim_max = 1000000;
  871. if (setrlimit (RLIMIT_NOFILE, &rl) != 0)
  872. {
  873. perror ("getrlimit");
  874. }
  875. OPENLOG;
  876. #ifdef DEBUG
  877. system ("ulimit -a");
  878. if (getrlimit (RLIMIT_CORE, &rl) != 0)
  879. {
  880. perror ("getrlimit");
  881. }
  882. fprintf (stderr, "Get core limit %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
  883. rl.rlim_cur = rl.rlim_max = (rlim_t )102400;
  884. if (setrlimit (RLIMIT_CORE, &rl) != 0)
  885. {
  886. perror ("getrlimit");
  887. }
  888. if (getrlimit (RLIMIT_CORE, &rl) != 0)
  889. {
  890. perror ("getrlimit");
  891. }
  892. fprintf (stderr, "Set core limit to %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
  893. system ("ulimit -a");
  894. #endif
  895. #ifdef HAVE_TS
  896. if (register_cp () < 0)
  897. {
  898. PDEBUG ("Cannot init TS connectionn");
  899. return -1;
  900. }
  901. #endif
  902. TRACKER[TYPE_P2PS].flag = FLAG_SERVER;
  903. TRACKER[TYPE_P2PS].type = TYPE_P2PS;
  904. TRACKER[TYPE_P2PS].port = cfgP2PS_PORT;
  905. TRACKER[TYPE_P2PS].cur = 0;
  906. TRACKER[TYPE_P2PS].max = MAX_P2PS;
  907. TRACKER[TYPE_P2PS].init = init_P2PS;
  908. TRACKER[TYPE_P2PS].process = process_P2PS;
  909. TRACKER[TYPE_P2PS].closure = closure_P2PS;
  910. TRACKER[TYPE_P2PS].head = calloc (sizeof (struct Session), TRACKER[TYPE_P2PS].max);
  911. switch (BINDALL)
  912. {
  913. case 0:
  914. if ((TRACKER[TYPE_P2PS].sock = init_server (BINDIP, cfgP2PS_PORT)) < 0)
  915. return -1;
  916. break;
  917. default:
  918. if ((TRACKER[TYPE_P2PS].sock = init_server (NULL, cfgP2PS_PORT)) < 0)
  919. return -1;
  920. break;
  921. }
  922. FD_SET(TRACKER[TYPE_P2PS].sock, &osocks);
  923. TRACKER[TYPE_P2PC].flag = FLAG_CLIENT;
  924. TRACKER[TYPE_P2PC].type = TYPE_P2PC;
  925. TRACKER[TYPE_P2PC].port = 0;
  926. TRACKER[TYPE_P2PC].cur = 0;
  927. TRACKER[TYPE_P2PC].max = MAX_P2PC;
  928. TRACKER[TYPE_P2PC].init = init_P2PC;
  929. TRACKER[TYPE_P2PC].process = process_P2PC;
  930. TRACKER[TYPE_P2PC].closure = closure_P2PC;
  931. TRACKER[TYPE_P2PC].head = calloc (sizeof (struct Session), TRACKER[TYPE_P2PC].max);
  932. mkdir (PREFIX, 0777);
  933. snprintf (buffer, MAX_DATA, "%s/%s", PREFIX, LIVE_PREFIX);
  934. mkdir (buffer, 0777);
  935. snprintf (buffer, MAX_DATA, "rm -fr %s/%s/*", PREFIX, LIVE_PREFIX);
  936. system (buffer);
  937. if ((pidf = fopen (PIDFile, "w")) == NULL)
  938. {
  939. PDEBUG ("Cannot open pidfile.n");
  940. return -1;
  941. }
  942. fprintf (pidf, "%dn", getpid ());
  943. fclose (pidf);
  944. return 0;
  945. }
  946. #ifdef HAVE_TS
  947. int process_TS()
  948. {
  949. struct sockaddr_in dest;
  950. int addr_len = sizeof (dest);
  951. struct Message m;
  952. int i;
  953. if ((i = recvfrom (TSSOCK, &m, sizeof (struct Message), 0, (struct sockaddr *)&dest, &addr_len)) <= 0)
  954. {
  955. PDEBUG ("Error in recving ts message.n");
  956. register_cp ();
  957. return 0;
  958. }
  959. switch (m.type)
  960. {
  961. case TS2CP_WELCOME:
  962. memcpy (&UDPMsg, &m, 12);
  963. PDEBUG("recv WELCOME from TS.n");
  964. /* isSet whether TS has returned WELCOME message */
  965. isSet = 1;
  966. break;
  967. case TS2CP_PEERS:
  968. process_TS2CP_PEERS (m.buffer);
  969. break;
  970. case TS2CP_MSG:
  971. if (*(char *)(m.buffer+sizeof(short)))
  972. {
  973. PDEBUG ("Error in TS2CP_MSG. n");
  974. register_cp ();
  975. }
  976. break;
  977. default:
  978. PDEBUG ("Error in trackerserver message formatn");
  979. register_cp ();
  980. return -1;
  981. }
  982. return 0;
  983. }
  984. int closure_TS ()
  985. {
  986. struct TSMessage *m = &UDPMsg;
  987. m->type = CP2TS_LOGOUT;
  988. m->len = 12;
  989. sendMessage (TSSOCK, (char *)m, &TSADDR);
  990. return 0;
  991. }
  992. int process_TS2CP_PEERS (char *buf)
  993. {
  994. struct Channel *pc;
  995. int listnum;
  996. char *channel_md5;
  997. unsigned char cpsize;
  998. struct NormalAddress *CPlist;
  999. unsigned char peersize;
  1000. struct PeerInfoWithAddr *pinfo;
  1001. channel_md5 = buf;
  1002. buf += MD5_LEN;
  1003. cpsize = *(unsigned char *)buf;
  1004. buf += sizeof (char);
  1005. CPlist = (struct NormalAddress *)buf;
  1006. buf += sizeof(struct NormalAddress)*cpsize;
  1007. peersize = *(unsigned char *)buf;
  1008. buf += sizeof (char);
  1009. pinfo = (struct PeerInfoWithAddr *)buf;
  1010. // now find the channel
  1011. if ((pc = findChannel (channel_md5, MD5_LEN)) == NULL) return -1;
  1012. if ((listnum = newChannel (pc, CPlist, cpsize, TYPE_CP)) < 0)
  1013. return -1;
  1014. return 0;
  1015. }
  1016. #endif
  1017. char *parseECP (char *str, char *buf)
  1018. {
  1019. char  *buffer = buf;
  1020. int flag = -1;
  1021. unsigned char c;
  1022. unsigned char part;
  1023. buf += sizeof (int);
  1024. for (part=0; *str ;str++)
  1025. {
  1026. c = *str;
  1027. switch (c)
  1028. {
  1029. case ':':
  1030. flag = 0;
  1031. break;
  1032. case '.':
  1033. if (flag < 2)
  1034. {
  1035. *(unsigned char *)buf = part;
  1036. buf += sizeof (char);
  1037. }
  1038. part = 0;
  1039. flag ++;
  1040. break;
  1041. default:
  1042. if (c >= '0' && c <= '9')
  1043. part = c;
  1044. break;
  1045. }
  1046. }
  1047. *(int *)buffer = (buf - buffer - sizeof (int))/sizeof(short);
  1048. return buf;
  1049. }
  1050. int periodCheck (float KBPSused)
  1051. {
  1052. struct Session *head;
  1053. int max, listnum;
  1054. struct statistics
  1055. {
  1056. unsigned int resnum;
  1057. unsigned short connnum;
  1058. float bandwidth;
  1059. } stat;
  1060. stat.bandwidth = KBPSused/(MAX_BANDWIDTH*1024)/8;
  1061. /* isSet whether TS has returned WELCOME message */
  1062. if (isSet == 0)
  1063. {
  1064. register_cp ();
  1065. return 0;
  1066. }
  1067. max = TRACKER[TYPE_P2PC].maxid + 1;
  1068. head = TRACKER[TYPE_P2PC].head;
  1069. memset (&stat, 0, sizeof(stat));
  1070. for (listnum = 0; listnum < max; listnum++)
  1071. {
  1072. if (head[listnum].socket > 0)
  1073. {
  1074. stat.resnum ++;
  1075. stat.connnum ++;
  1076. }
  1077. }
  1078. max = TRACKER[TYPE_P2PS].maxid + 1;
  1079. head = TRACKER[TYPE_P2PS].head;
  1080. for (listnum = 0; listnum < max; listnum++)
  1081. {
  1082. if (head[listnum].socket > 0)
  1083. {
  1084. if (head[listnum].pc == NULL &&
  1085. head[listnum].header == NULL &&
  1086. CurrentTime - head[listnum].last_transferblock > MAX_TRANSFER_IDLE)
  1087. {
  1088. PDEBUG ("timeout %d from NP %d to %d.n", listnum, head[listnum].last_transferblock, (int)CurrentTime);
  1089. Clientclosure (listnum, TYPE_P2PS);
  1090. }
  1091. else
  1092. stat.connnum ++;
  1093. }
  1094. }
  1095. #ifdef HAVE_TS
  1096. *(unsigned int *)(UDPMsg.buffer) = stat.resnum;
  1097. *(unsigned short *)(UDPMsg.buffer+sizeof(int)) = stat.connnum;
  1098. *(float *)(UDPMsg.buffer+sizeof(int)+sizeof(short)) = stat.bandwidth;
  1099. if(stat.connnum <= MAX_P2PS)
  1100. *(unsigned char *)(UDPMsg.buffer+sizeof(int)+sizeof(short)+sizeof(float)) = 0;
  1101. else
  1102. *(unsigned char *)(UDPMsg.buffer+sizeof(int)+sizeof(short)+sizeof(float)) = 1;
  1103. UDPMsg.len = 23;
  1104. UDPMsg.type = CP2TS_UPDATE;
  1105. if (sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
  1106. {
  1107. PDEBUG ("exit...n");
  1108. exit (1);
  1109. }
  1110. #endif
  1111. PDEBUG("Res Num: %d. Connection Num: %d. BandWidth Usage: %.4f. n", stat.resnum, stat.connnum, stat.bandwidth);
  1112. return 0;
  1113. }
  1114. void makeSnapShot(int count, int time_interval)
  1115. {
  1116. time_t tmpTime;
  1117. struct tm result;
  1118. struct Channel *pc, *nextpc;
  1119. // struct Session *ps;
  1120. // struct Edge *pe;
  1121. int cpchannelcount = 0;
  1122. int totalclient = 0;
  1123. long long totalupsize = 0, totaldownsize = 0;
  1124. FILE *f;
  1125. char buffer[MAX_DATA];
  1126. if (time_interval <= 0)
  1127. return;
  1128. localtime_r(&CurrentTime, &result);
  1129. sprintf (buffer, "./cp-%d-%d-%d.log", result.tm_year+1900, result.tm_mon+1, result.tm_mday);
  1130. if ((f = fopen(buffer,"a")) == NULL)
  1131. {
  1132. PDEBUG("Couldn't open cp.log file! n");
  1133. return;
  1134. }
  1135. fseeko(f, 0, SEEK_END);
  1136. // 1. start CP SnapShot 
  1137. fprintf(f, "nn**************Start %d SnapShot of CP, Time : %u/%u %u:%u:%u.*********  n",count,result.tm_mon+1, result.tm_mday, result.tm_hour, result.tm_min, result.tm_sec);
  1138. // 2. log speed
  1139. fprintf(f, "CP: cur Down %.4f KB. n", ((float)tmpDownBytes)/1024/time_interval);
  1140. fprintf(f, "CP: cur Up   %.4f KB. n", ((float)tmpUpBytes)/1024/time_interval);
  1141. periodCheck(((float)tmpDownBytes+tmpUpBytes)/1024/time_interval);
  1142. totalDownBytes += tmpDownBytes;
  1143. totalUpBytes += tmpUpBytes;
  1144. tmpTime = CurrentTime - startTime; 
  1145. fprintf(f, "CP: avg Down %.4f KB. n", ((float)totalDownBytes)/1024/tmpTime);
  1146. fprintf(f, "CP: avg Up   %.4f KB. n", ((float)totalUpBytes)/1024/tmpTime);
  1147. // 3. log channel state
  1148. for (pc=ChannelList; pc; pc=nextpc)
  1149. {
  1150. nextpc = pc->lnext;
  1151. ++cpchannelcount;
  1152. totalclient += pc->numclient;
  1153. totalupsize += pc->upsize;
  1154. totaldownsize += pc->downsize;
  1155. fprintf(f,"Channel %s have %d client. Down size %lld, avg speed %f. Up Size %lld, avg speed %f. n",pc->fname,pc->numclient,pc->downsize, ((float)(pc->downsize)) / time_interval, pc->upsize, ((float)(pc->upsize))/ time_interval);
  1156. /*
  1157. for (pe=pc->PeerHead; pe; pe = pe->cnext)
  1158. {
  1159. // if bitrate < 300kb/s ,then kill it
  1160. if (pe->me->totalup/(CurrentTime - pe->me->time_sec)/1024 < 300)
  1161. fprintf(f,"Session bitrate:%lld .Too slow ! n",pe->me->totalup/(CurrentTime - pe->me->time_sec));
  1162. }
  1163. */
  1164. if (pc->numclient == 0)
  1165. freeLiveChannel (pc, NULL);
  1166. else if (CurrentTime - pc->last_nearpeer > NearPeerInterval)
  1167. {
  1168. send_nearpeers_toall (pc);
  1169. pc->last_nearpeer = CurrentTime;
  1170. }
  1171. }
  1172. fprintf(f,"Channel Count : %d. Total client : %d. Total dowsize: %lld. Total upsize %lld n",cpchannelcount,totalclient,totaldownsize,totalupsize);
  1173. fprintf(f,"n*********************End of SnapShot************************n");
  1174. fclose(f);
  1175. logto_xml (time_interval, tmpTime, cpchannelcount, totalclient);
  1176. tmpDownBytes = tmpUpBytes = 0;
  1177. }
  1178. int reconnect (struct Session *p)
  1179. {
  1180. struct NormalAddress *client;
  1181. struct sockaddr_in addr;
  1182. if (p->pc == NULL || p->pc->pcinfo->numofsp <= p->flag) return -1;
  1183. close (p->socket);
  1184. FD_CLR(p->socket, &osocks);
  1185. if ((p->socket = socket (PF_INET, SOCK_STREAM, 0)) < 0)
  1186. {
  1187. perror ("socket||gethostbyname");
  1188. p->socket = 0;
  1189. return -1;
  1190. }
  1191. client = &(p->pc->pcinfo->SPLIST[p->flag]);
  1192. memset (&addr, 0, sizeof (addr));
  1193. addr.sin_port = client->sin_port;
  1194. addr.sin_addr = client->sin_addr;
  1195. addr.sin_family = AF_INET;
  1196. p->flag ++;
  1197. if ((p->sock_flag = connect_nonb(p->socket, &addr, sizeof (addr))) == -1)
  1198. {
  1199. close (p->socket);
  1200. return -1;
  1201. }
  1202. FD_SET(p->socket, &osocks);
  1203. p->time_sec = CurrentTime;
  1204. p->totalup = 0;
  1205. return p->flag;
  1206. }
  1207. int send_P2P_PUSHLIST (struct Channel *pc, int id)
  1208. {
  1209. unsigned char *ptr;
  1210. char buffer[MAX_DATA], *buf;
  1211. struct LiveChannelInfo *pcinfo = pc->pcinfo;
  1212. int i, j;
  1213. buf = buffer+sizeof (int);
  1214. *(unsigned char *)buf = P2P_PUSHLIST;
  1215. buf += sizeof (char);
  1216. memcpy (buf, pc->channel_md5, MD5_LEN);
  1217. buf += MD5_LEN;
  1218. *(unsigned char *)buf = 0;
  1219. buf += sizeof (char);
  1220. ptr = buf;
  1221. buf += sizeof (char);
  1222. *ptr = 0;
  1223. for (i=id; (i >= pcinfo->s.minBlockID && i <= pcinfo->s.maxBlockID) && i<id+REQUEST_AHEAD; i++)
  1224. {
  1225. j = i % pcinfo->max_queue;
  1226. if (pcinfo->indisk[j] == (i/pcinfo->max_queue+1) || isSet (pcinfo->bitflag, j))
  1227. continue;
  1228. *(int *)buf = i;
  1229. buf += sizeof (int);
  1230. (*ptr) ++;
  1231. setBit (pcinfo->bitflag, j);
  1232. }
  1233. if (*ptr > 0)
  1234. {
  1235. *(unsigned char *)buf = 0;
  1236. buf += sizeof (char);
  1237. *(int *)buffer = buf - buffer;
  1238. writeMessage (pcinfo->dataSource, pc, buffer);
  1239. }
  1240. return 0;
  1241. }
  1242. int send_nearpeers (struct Channel *pc, struct Edge *pme)
  1243. {
  1244. char buffer[MAX_DATA], *buf, *ptr;
  1245. struct Edge *pedge;
  1246. int i;
  1247. buf = buffer+sizeof (int);
  1248. *(unsigned char *)buf = P2P_NEAR_PEERS;
  1249. buf += sizeof (char);
  1250. ptr = buf;
  1251. buf += sizeof (char);
  1252. for (i=0, pedge=pme->cnext; i<MAX_NEARPEER; pedge=pedge->cnext)
  1253. {
  1254. if (pedge == NULL)
  1255. pedge = pc->PeerHead;
  1256. if (pedge == pme)
  1257. break;
  1258. memcpy (buf, &(pedge->me->addr), sizeof (struct PeerInfoWithAddr));
  1259. buf += sizeof (struct PeerInfoWithAddr);
  1260. i++;
  1261. }
  1262. if (i > 0)
  1263. {
  1264. *(unsigned char *)ptr = i;
  1265. *(int *)buffer = buf - buffer;
  1266. writeMessage (pme->me, pc, buffer);
  1267. }
  1268. return 0;
  1269. }
  1270. int send_nearpeers_toall (struct Channel *pc)
  1271. {
  1272. struct Edge *pedge;
  1273. for (pedge=pc->PeerHead; pedge; pedge=pedge->cnext)
  1274. send_nearpeers (pc, pedge);
  1275. return 0;
  1276. }