SPnew.c
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:37k
源码类别:

P2P编程

开发平台:

Visual C++

  1. /*
  2.  *  Openmysee
  3.  *
  4.  *  This program is free software; you can redistribute it and/or modify
  5.  *  it under the terms of the GNU General Public License as published by
  6.  *  the Free Software Foundation; either version 2 of the License, or
  7.  *  (at your option) any later version.
  8.  *
  9.  *  This program is distributed in the hope that it will be useful,
  10.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12.  *  GNU General Public License for more details.
  13.  *
  14.  *  You should have received a copy of the GNU General Public License
  15.  *  along with this program; if not, write to the Free Software
  16.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  17.  *
  18.  */
  19.  
  20. #include "echo.h"
  21. struct Argument
  22. {
  23. int type;
  24. int spchannelcount;
  25. int totalclient;
  26. long long totalupsize;
  27. FILE *f, *xml[MAX_TS];
  28. char *buf;
  29. };
  30. #define SPUPDATE_SLOT 1
  31. #define MAX_CHANNEL 1024 /* max number of Channel */
  32. #define SP4CP_PORT 50001
  33. #define TS4RM_PORT 22169
  34. #define RM2TS_STAT_QUERY 0x20
  35. #define TS2RM_STAT_RESPONSE 0x30
  36. int MAX_CP=512;
  37. int MAX_CS=32;
  38. int MAX_JOB_PER_SESSION=3;
  39. char *Home = "./";
  40. char *PREFIX = "/data/sp/";
  41. char *Database = "user.db";
  42. char *CONFIG = "./asp.cfg";
  43. char *PIDFile = "/var/run/spnew.pid";
  44. char *RULE = "";
  45. char *SERVERIP;
  46. char *DBFILE;
  47. char *MEDIABASE;
  48. char *AUTH_MD5;
  49. int AUTH_USERID;
  50. fd_set osocks;
  51. int NumNewChannel;
  52. struct Message TempMsg;
  53. char *NET_NAME[] = { "edu", "cnc", "uni", "tel" };
  54. long long totalDownBytes=0, totalUpBytes=0;
  55. long long tmpDownBytes=0, tmpUpBytes=0;
  56. char *urlroot;
  57. char *defaultspip;
  58. char *spip[4];
  59. char *tsip[4];
  60. #ifdef TEST
  61. char *WWW_ROOT;
  62. #endif
  63. #ifdef HAVE_MYSQL
  64. char *MYSQL_HOST;
  65. char *MYSQL_USER;
  66. char *MYSQL_PASS;
  67. char *MYSQL_DB;
  68. #endif
  69. char *CAS_ADDR;     // Content Aggregation Server, added by lixingwu, 20070313
  70. int tsSock[MAX_TS];
  71. struct sockaddr_in tsAddr[MAX_TS];
  72. extern struct Channel *ChannelHash[MAX_CHANNEL];
  73. extern struct Channel *ChannelList;
  74. extern int isDelete (struct Channel *pc);
  75. extern int errno;
  76. int JobHighWater = 10000;
  77. int BINDALL=1;
  78. int AuthCS=0;
  79. time_t startTime;
  80. time_t CurrentTime;
  81. int PERIOD;
  82. int SnapShotInterval; //make a snapshot every few second
  83. int spuprecord[MAX_CHANNEL];
  84. struct ServerDesc TRACKER[MAX_TYPE];
  85. char *LOGXML;
  86. struct NamVal ConfigParameters[] = {
  87. {"DBdir", &Home, 's'},
  88. {"AuthCS", &AuthCS, 'd'},
  89. {"MAX_CP",&MAX_CP, 'd'},
  90. {"MAX_CS", &MAX_CS, 'd'},
  91. {"Pidfile", &PIDFile, 's'},
  92. {"Prefix", &PREFIX, 's'},
  93. {"DBfile", &Database, 's'},
  94. {"TrackerIP", &SERVERIP, 's'},
  95. {"Dir", &MEDIABASE, 's'},
  96. {"authid", &AUTH_USERID, 'd'},
  97. {"authmd5", &AUTH_MD5, 's'},
  98. {"periodDump", &PERIOD, 'd'}, //make a periodCheck every periodDump second
  99. {"UrlRoot", &urlroot, 's'},
  100. {"EDUSPIP", &spip[EDUTS], 's'},
  101. {"CNCSPIP", &spip[CNCTS], 's'},
  102. {"UNISPIP", &spip[UNITS], 's'},
  103. {"TELSPIP", &spip[TELTS], 's'},
  104. {"BINDALL", &BINDALL, 'd'},
  105. {"EDUTSIP", &tsip[EDUTS], 's'},
  106. {"CNCTSIP", &tsip[CNCTS], 's'},
  107. {"UNITSIP", &tsip[UNITS], 's'},
  108. {"TELTSIP", &tsip[TELTS], 's'},
  109. {"SnapShotInterval", &SnapShotInterval, 'd'},
  110. {"LogFilePath", &LOGXML, 's'},
  111. #ifdef TEST
  112. {"WWWRoot", &WWW_ROOT, 's'},
  113. #endif
  114. #ifdef HAVE_MYSQL
  115. {"MysqlAddress", &MYSQL_HOST, 's'},
  116. {"User", &MYSQL_USER, 's'},
  117. {"Database", &MYSQL_DB, 's'},
  118. {"Password", &MYSQL_PASS, 's'},
  119. #endif
  120. {"AllowCSList", &RULE, 's'},
  121. {"JobHighWater", &JobHighWater, 'd'},
  122.     {"CAS_ADDR", &CAS_ADDR, 's'}     // Content Aggregation Server, added by lixingwu, 20070313
  123. };
  124. extern int db_end ();
  125. extern int db_init (char *home, char *database);
  126. extern int isAllowed (int id, char *md5, char *cname, float bitrate,
  127.       float *limitedbitrate, int *issave);
  128. extern char *getJobBuffer (struct JobDes *p, int *max);
  129. extern inline void setblockId (struct JobDes *pj, int id);
  130. #ifdef TEST
  131. int buildGTV (struct Channel *pc, int datalen, char *data, int type);
  132. #endif
  133. int send_P2P_SPUPDATE (struct Session *p, struct Channel *pc, char *md5, struct SPUpdate *s);
  134. int send_p2p_err (struct Session *p, unsigned short code, int quit);
  135. int init_sp ();
  136. int handle_new_connection (int sock, int type);
  137. int Clientclosure (int listnum, int type);
  138. void process_child (void);
  139. int init_CP (int listnum);
  140. int process_CP (int listnum);
  141. int closure_CP (int listnum);
  142. int init_CS (int listnum);
  143. int process_CS (int listnum);
  144. int closure_CS (int listnum);
  145. #ifdef HAVE_MYSQL
  146. MYSQL *local_mysql;
  147. #endif
  148. int process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id);
  149. void apply_check (struct Channel *p, void *arg);
  150. void apply_idle (struct Session *p, void *arg);
  151. int period_process ();
  152. void process_TS2RM (int type);
  153. extern int locate_mplist_by_id (struct Channel *pc, unsigned int id, char *buf, int max);
  154. extern int check_newplist ();
  155. extern int timer_free ();
  156. extern int timer_process (unsigned int t);
  157. extern void hup_handler (int);
  158. #include "sessions.c"
  159. #define BUILD_NOCH_SPUPDATE(s) do
  160. {
  161. s.minKeySample = -1LL;
  162. s.maxKeySample = -1LL;
  163. s.minBlockID = 0xffffffff;
  164. s.maxBlockID = 0xffffffff;
  165. } while (0)
  166. #define BUILD_ORDER_SPUPDATE(s,pc) do
  167. {
  168. s.minKeySample = -1LL;
  169. s.maxKeySample = -1LL;
  170. s.minBlockID = 0;
  171. s.maxBlockID = (pc->downsize > 0 ? ((pc->downsize - 1) / DEFAULT_BLOCK) : 0)+1;
  172. } while (0)
  173. #define BUILD_CLOSE_SPUPDATE(s) do
  174. {
  175. s.minKeySample = 0;
  176. s.maxKeySample = 0;
  177. s.minBlockID = 0xffffffff;
  178. s.maxBlockID = 0xffffffff;
  179. } while (0)
  180. #define BUILD_MLIST_SPUPDATE(s,pc) do
  181. {
  182. s.minKeySample = -2LL;
  183. s.maxKeySample = pc->pcinfo->mlist->m_totalchannel;
  184. s.minBlockID = 0;
  185. s.maxBlockID = pc->pcinfo->maxID;
  186. } while (0)
  187. #define BUILD_LIVE_SPUPDATE(spupdate,pc) do
  188. {
  189. spupdate.minKeySample = -3LL;
  190. spupdate.maxKeySample = 1;
  191. spupdate.minBlockID = pc->pcinfo->s.minBlockID;
  192. spupdate.maxBlockID = pc->pcinfo->s.maxBlockID;
  193. } while (0)
  194. void send_all_spupdate (struct Channel *pc, struct SPUpdate *s)
  195. {
  196. struct LiveChannelInfo *pcinfo = pc->pcinfo;
  197. struct Edge *pedge;
  198. char buffer[MAX_DATA], *buf;
  199. if (pcinfo == NULL) return;
  200. pcinfo->updated = CurrentTime;
  201. buf = buffer + sizeof (int);
  202. *(unsigned char *) buf = P2P_SPUPDATE;
  203. buf += sizeof (char);
  204. memcpy (buf, pc->channel_md5, MD5_LEN);
  205. buf += MD5_LEN;
  206. if (s == NULL)
  207. memset (buf, 0, sizeof (struct SPUpdate));
  208. else
  209. memcpy (buf, s, sizeof (struct SPUpdate));
  210. buf += sizeof (struct SPUpdate);
  211. *(int *) buffer = buf - buffer;
  212. for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
  213. {
  214. if (writeMessage (pedge->me, pc, buffer) < 0)
  215. {
  216. PDEBUG ("send SPUPDATE err.n");
  217. }
  218. }
  219. }
  220. void
  221. process_TS2RM (int type) //assure num of channel 
  222. {
  223. int len;
  224. char buffer[4096];
  225. char *buf;
  226. struct sockaddr_in addr;
  227. int addrlen = sizeof (struct sockaddr);
  228. int i, msgsize, chnlnum;
  229. struct Channel *pChannel;
  230. if ((len =
  231.      recvfrom (tsSock[type], buffer, 4096, 0,
  232.        (struct sockaddr *) &addr, &addrlen)) < 0)
  233. {
  234. perror ("recvfrom ts");
  235. return;
  236. }
  237. PINFO ("got TS msg, len %d. n", len);
  238. buf = buffer;
  239. msgsize = *(int *) buf;
  240. buf += sizeof (int);
  241. if (*(unsigned char *) buf != TS2RM_STAT_RESPONSE)
  242. {
  243. perror ("bad message type from ts");
  244. return;
  245. }
  246. buf += sizeof (unsigned char);
  247. chnlnum = *(int *) buf;
  248. buf += sizeof (int);
  249. if (chnlnum > 100)
  250. {
  251. perror ("too large channel num from ts");
  252. return;
  253. }
  254. for (i = 0; i < chnlnum; ++i)
  255. {
  256. if ((pChannel = findChannel (buf, MD5_LEN)) == NULL)
  257. {
  258. buf += MD5_LEN;
  259. buf += sizeof (int);
  260. continue;
  261. }
  262. buf += MD5_LEN;
  263. pChannel->numofnp[type] = *(int *) buf;
  264. buf += sizeof (int);
  265. PINFO ("%s->%s: client %d.n",
  266. pChannel->pcinfo ? pChannel->channel_name : "",
  267. pChannel->channel_md5, pChannel->numofnp[type]);
  268. }
  269. }
  270. time_t lastCheck, last_snapshot;
  271. int
  272. period_process ()
  273. {
  274. int type;
  275. time_t tmpTime, time_interval;
  276. struct Argument arg;
  277. char buffer[MAX_DATA];
  278. struct tm result;
  279. int *querynum;
  280. static int snapCount = 0;
  281. timer_process (CurrentTime);
  282. check_newplist ();
  283. if (CurrentTime <= lastCheck + PERIOD) //PERIOD =periodDump=60
  284. return 0;
  285. memset (&arg, 0, sizeof (arg));
  286. tmpTime = CurrentTime - startTime;
  287. time_interval = CurrentTime - lastCheck;
  288. #ifdef HAVE_MYSQL
  289. query_mysql (local_mysql, "delete from channel where '1'");
  290. #endif
  291. #ifdef TEST
  292. for (type=0; type<MAX_TS; type++)
  293. {
  294. snprintf (buffer, MAX_DATA, "%s/%s/channel.xml", WWW_ROOT,
  295.    NET_NAME[type]);
  296. if ((arg.xml[type] = fopen (buffer, "w")) == (FILE *) 0)
  297. {
  298. PDEBUG ("Error in open file %sn", buffer);
  299. continue;
  300. }
  301. fprintf (arg.xml[type],
  302.   "<?xml version="1.0" encoding="gbk"?>rn<?xml:stylesheet type="text/xsl" href="simple.xsl" ?>rn<GaoV>rn");
  303. }
  304. #endif
  305. localtime_r (&CurrentTime, &result);
  306. snprintf (buffer, MAX_DATA, "./sp-%d-%d-%d.log", result.tm_year+1900, result.tm_mon+1, result.tm_mday);
  307. if ((arg.f = fopen (buffer, "a")) == NULL)
  308. {
  309. PDEBUG ("Couldn't open sp log file! n");
  310. return -1;
  311. }
  312. fprintf (arg.f, "nn********************Start %d SnapShot of SP, Time: %u/%u %u:%u:%u.n",
  313. snapCount++, result.tm_mon + 1, result.tm_mday, result.tm_hour,
  314.   result.tm_min, result.tm_sec);
  315. fprintf(arg.f, "SP: cur Down %.4f KB. n", ((float)tmpDownBytes)/1024/time_interval);
  316. fprintf(arg.f, "SP: cur Up   %.4f KB. n", ((float)tmpUpBytes)/1024/time_interval);
  317. totalDownBytes += tmpDownBytes;
  318. totalUpBytes += tmpUpBytes;
  319. fprintf(arg.f, "SP: avg Down %.4f KB. n", ((float)totalDownBytes)/1024/tmpTime);
  320. fprintf(arg.f, "SP: avg Up   %.4f KB. n", ((float)totalUpBytes)/1024/tmpTime);
  321. arg.buf = buffer + sizeof (int);
  322. *(unsigned char *) (arg.buf) = RM2TS_STAT_QUERY;//RM2TS=0x20
  323. arg.buf += sizeof (unsigned char);
  324. querynum = (int *) (arg.buf);
  325. arg.buf += sizeof (int);
  326. arg.type = TYPE_CS;
  327. apply_session (TRACKER[TYPE_CS].head, TRACKER[TYPE_CS].maxid+1, apply_idle, &arg);
  328. /*
  329. arg.type = TYPE_CP;
  330. apply_session (TRACKER[TYPE_CP].head, TRACKER[TYPE_CP].maxid+1, apply_idle, &arg);
  331. */
  332. apply_list (ChannelList, apply_check, &arg);
  333. #ifdef TEST
  334. for (type=0; type<MAX_TS; type++)
  335. {
  336. if (arg.xml[type] == NULL)
  337. continue;
  338. fprintf (arg.xml[type], "</GaoV>rn");
  339. fclose (arg.xml[type]);
  340. }
  341. #endif
  342. fprintf (arg.f, "Channel Count : %d.Total client : %d . Total upsize :%lldB . n",
  343.   arg.spchannelcount, arg.totalclient, arg.totalupsize);
  344. fprintf (arg.f,
  345.   "n************************END SnapShot*********************n");
  346. fclose (arg.f);
  347.     // added by lixingwu, 20070313
  348.     // upload channel.xml to server, only edu
  349.     char upload_cmd[MAX_DATA];
  350.     sprintf(upload_cmd, "curl -F filename=@%s/%s/channel.xml -F domain=%s %s",
  351.         WWW_ROOT, NET_NAME[EDUTS], spip[EDUTS], CAS_ADDR);
  352.     printf("%sn", upload_cmd);
  353.     system(upload_cmd);
  354. #ifdef TEST
  355. if (arg.spchannelcount > 0)
  356. {
  357. *querynum = arg.spchannelcount;
  358. *(int *) buffer = arg.buf - buffer;
  359. for (type=0; type<MAX_TS; type++)
  360. {
  361. if (tsSock[type] <= 0)
  362. continue;
  363. if (sendto (tsSock[type], buffer, *(int *) buffer, 0, (struct sockaddr *) &tsAddr[type], sizeof (struct sockaddr_in)) != *(int *) buffer)
  364. {
  365. PDEBUG ("sent to ts %d error. n", type);
  366. return 0;
  367. }
  368. }
  369. }
  370. #endif
  371. logto_xml (time_interval, tmpTime, arg.spchannelcount, arg.totalclient);
  372. lastCheck = CurrentTime;
  373. last_snapshot ++;
  374. tmpDownBytes = tmpUpBytes = 0;
  375. if (last_snapshot > SnapShotInterval)
  376. {
  377. system ("/usr/bin/vmstat -a >> sp.log 2>&1 &");
  378. last_snapshot = 0;
  379. }
  380. return 0;
  381. }
  382. int
  383. main (int argc, char **argv)
  384. {
  385. int i, mode = 1;
  386. if (argc < 2)
  387. {
  388. printf ("usage: %s mode(0 for daemon, 1 for console).n",
  389. argv[0]);
  390. return -1;
  391. }
  392. signal (SIGPIPE, SIG_IGN);
  393. signal (SIGINT, terminate);
  394. signal (SIGHUP, hup_handler);
  395. mode = atoi (argv[1]);
  396. if (mode == 0)
  397. daemon (1, 1);
  398. read_config (CONFIG, ConfigParameters,
  399.      sizeof (ConfigParameters) / sizeof (struct NamVal));
  400. for (i = 0; i < 10 && IN_LOOP > 0; i++)
  401. {
  402. /*
  403.    pid_t pid;
  404.    if ((pid = fork ()) == 0)
  405.    {
  406.  */
  407. FD_ZERO (&osocks);
  408. if (init_sp () < 0) // || initLOG () < 0)
  409. {
  410. PDEBUG ("init_sp error, exit...n");
  411. exit (-1);
  412. }
  413. process_child ();
  414. /*
  415.    } else if (pid < 0)
  416.    {
  417.    perror ("fork");
  418.    exit (pid);
  419.    } else
  420.    {
  421.    waitpid (pid, NULL, 0);
  422.    }
  423.  */
  424. }
  425. return 0;
  426. }
  427. int
  428. process_P2P_HELLO (struct Session *p, struct Message *m)
  429. {
  430. struct SPUpdate spupdate;
  431. struct Edge *pedge;
  432. struct Channel *pc;
  433. int listnum;
  434. PDEBUG ("CP %d.%d.%d.%d:%d join channel %.32sn",
  435. IPADDR (p->host), p->port, m->buffer);
  436. listnum = p - TRACKER[TYPE_CP].head;
  437. if ((pc = findChannel (m->buffer, MD5_LEN)) == NULL)
  438. {
  439. if ((pc = findOrder (m->buffer, MD5_LEN)) == NULL &&
  440. (pc = newOrder (m->buffer)) == NULL)
  441. {
  442. BUILD_NOCH_SPUPDATE(spupdate);
  443. send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
  444. PDEBUG ("Cannot find channel %.32sn",
  445. m->buffer);
  446. //Clientclosure (listnum, TYPE_CP);
  447. } else
  448. {
  449. BUILD_ORDER_SPUPDATE(spupdate,pc);
  450. send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
  451. PDEBUG ("channel %.32s spupdate %d~%d. n",
  452. m->buffer, spupdate.minBlockID,
  453. spupdate.maxBlockID);
  454. }
  455. } else
  456. {
  457. for (pedge = p->header; pedge && pedge->head == pc;
  458.      pedge = pedge->enext);
  459. if (!pedge)
  460. {
  461. pedge = newEdge (pc, p);
  462. pc->numclient++;
  463. }
  464. if (pc->pcinfo)
  465. {
  466. if (pc->pcinfo->status > 0)
  467. {
  468. // this channel has been closed
  469. BUILD_CLOSE_SPUPDATE(spupdate);
  470. send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
  471. PDEBUG ( "channel %.32s has been closed. n",
  472. m->buffer);
  473. //Clientclosure (listnum, TYPE_CP);
  474. } else if (pc->pcinfo->mlist != NULL)
  475. {
  476. BUILD_MLIST_SPUPDATE (spupdate, pc);
  477. send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
  478. sendMedia (p, pc);
  479. PDEBUG ("Channel %.32s is a playlist.n", m->buffer);
  480. } else
  481. {
  482. BUILD_LIVE_SPUPDATE(spupdate,pc);
  483. send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
  484. sendMedia (p, pc);
  485. }
  486. } else
  487. {
  488. BUILD_ORDER_SPUPDATE(spupdate,pc);
  489. send_P2P_SPUPDATE (p, pc, m->buffer, &spupdate);
  490. PDEBUG ("channel %.32s spupdate %d~%d. n",
  491. m->buffer, spupdate.minBlockID,
  492. spupdate.maxBlockID);
  493. }
  494. }
  495. return 0;
  496. }
  497. int
  498. process_P2P_PUSHLIST (struct Session *p, struct Message *m)
  499. {
  500. struct Edge *pedge;
  501. struct Channel *pc;
  502. char *buf;
  503. int i,size, type;
  504. if ((pc = findChannel (m->buffer, MD5_LEN)) == NULL)
  505. pc = newOrder (m->buffer);
  506. else if (pc->pcinfo != NULL)
  507. {
  508. for (pedge = p->header; pedge && pedge->head != pc;
  509.      pedge = pedge->enext);
  510. if (pedge == NULL)
  511. {
  512. pedge = newEdge (pc, p);
  513. pc->numclient++;
  514. }
  515. }
  516. if (pc == NULL)
  517. return -1;
  518. if (p->numjob >= MAX_JOB_PER_SESSION)
  519. return -2;
  520. buf = m->buffer + MD5_LEN;
  521. type = *(unsigned char *) buf;
  522. buf += sizeof (char);
  523. size = *(unsigned char *) buf;
  524. buf += sizeof (char);
  525. if (type)
  526. {
  527. deleteChannel (p, pc);
  528. for (i=0; i<size; i++)
  529. if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0) 
  530. return -1;
  531. } else
  532. {
  533. for (i=0; i<size; i++)
  534. if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
  535. return -1;
  536. buf += size * sizeof (int);
  537. size = *(unsigned char *) buf;
  538. buf += sizeof (char);
  539. deleteJob (p, pc, (unsigned int *) buf, size);
  540. }
  541. return 0;
  542. }
  543. int
  544. process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id)
  545. {
  546. struct JobDes *pj = newJob ();
  547. char *buf, *buffer;
  548. int listnum, size=0, max;
  549. struct SPUpdate spupdate;
  550. if (pj == NULL) return -1;
  551. buffer = getJobBuffer (pj, &max);
  552. listnum = p - TRACKER[TYPE_CP].head;
  553. buf = buffer + sizeof (int);
  554. *(unsigned char *) buf = P2P_RESPONSE;
  555. buf += sizeof (char);
  556. memcpy (buf, pc->channel_md5, MD5_LEN);
  557. buf += MD5_LEN;
  558. if (pc->pcinfo == NULL)
  559. {
  560. *(int *) buf = id;
  561. buf += sizeof (int);
  562. if ((size =
  563.      locate_order_by_id (pc, id, buf + sizeof (int),
  564.  max)) < 0 && size != -2)
  565. {
  566. // BLOCK NOT FOUND
  567. spupdate.minKeySample = -1LL;
  568. spupdate.maxKeySample = -1LL;
  569. spupdate.minBlockID = 0xffffffff;
  570. spupdate.maxBlockID = 0xffffffff;
  571. send_P2P_SPUPDATE (p, pc, pc->channel_md5, &spupdate);
  572. size = 0;
  573. *(int *) buf = 0;
  574. buf += sizeof (int);
  575. } else if (size == -2)
  576. {
  577. PDEBUG ("Leave blocks %d to next round.n", id);
  578. return -1;
  579. } else
  580. {
  581. // block found
  582. *(int *) buf = size;
  583. buf += sizeof (int) + size;
  584. p->last_transferblock = CurrentTime;
  585. }
  586. //              Clientclosure (listnum, TYPE_CP);
  587. } else if (id >= 0 && pc->pcinfo->mlist != NULL
  588.    && (size = locate_mplist_by_id (pc, id, buf, max - 32)) > 0)
  589. {
  590. p->last_transferblock = CurrentTime;
  591. buf += 2 * sizeof (int) + size;
  592. } else if (id >= 0 && pc->pcinfo->mlist == NULL
  593.    && (size = locate_by_id (pc, id, buf, max - 32)) > 0)
  594. {
  595. p->last_transferblock = CurrentTime;
  596. buf += 2 * sizeof (int) + size;
  597. } else if (size == -2)
  598. {
  599. assert (0);
  600. PDEBUG ("Leave blocks %d to next round.n", id);
  601. return -1;
  602. } else
  603. {
  604. *(int *) buf = id;
  605. buf += sizeof (int);
  606. size = 0;
  607. *(int *) buf = 0;
  608. buf += sizeof (int);
  609. PDEBUG ("Cannot find block id %d required by client %d.%d.%d.%d.n",
  610. id, IPADDR (p->host));
  611. }
  612. *(int *) buffer = buf - buffer;
  613. setblockId (pj, id);
  614. writeDATAMessage (p, pc, pj);
  615. // PDEBUG ("Write block %d to %d.%d.%d.%dn", id,
  616. // IPADDR (p->host));
  617. return 0;
  618. }
  619. int
  620. init_CP (int listnum)
  621. {
  622. return 0;
  623. }
  624. int
  625. process_CP (int listnum)
  626. {
  627. int ret;
  628. struct Session *p = &(TRACKER[TYPE_CP].head[listnum]);
  629. struct Message *m = (struct Message *) (p->buf + p->start);
  630. tmpDownBytes += m->len;
  631. switch (m->type)
  632. {
  633. case P2P_HELLO:
  634. ret=  process_P2P_HELLO (p, m);
  635. break;
  636. case P2P_PUSHLIST:
  637. ret = process_P2P_PUSHLIST (p, m);
  638. break;
  639. case P2P_MSG:
  640. break;
  641. default:
  642. ret = -1;
  643. break;
  644. }
  645. switch (ret)
  646. {
  647. case -1:
  648. PDEBUG ("Message processing error from client %d.%d.%d.%dn",
  649. IPADDR (p->host));
  650. Clientclosure (listnum, TYPE_CP);
  651. return -1;
  652. case -2:
  653. return -2;
  654. default:
  655. return 0;
  656. }
  657. }
  658. int
  659. closure_CP (int listnum)
  660. {
  661. struct Session *p = &(TRACKER[TYPE_CP].head[listnum]);
  662. //      struct Channel *pc = p->pc;
  663. struct Edge *pedge, *nextedge;
  664. PDEBUG ("CP disconnected from %d.%d.%d.%d:%dn",
  665. IPADDR (p->host), p->port);
  666. for (pedge = p->header; pedge; pedge = nextedge)
  667. {
  668. nextedge = pedge->enext;
  669. if (pedge->head)
  670. pedge->head->numclient--;
  671. delEdge (pedge);
  672. }
  673. FD_CLR (p->socket, &osocks);
  674. close (p->socket);
  675. FREE (p->buf);
  676. deleteAll (p);
  677. memset (p, 0, sizeof (struct Session));
  678. return 0;
  679. }
  680. int
  681. init_CS (int listnum)
  682. {
  683. return 0;
  684. }
  685. int
  686. process_CS2SP_REGISTER (int listnum, char *msg)
  687. {
  688. int i, errmsg;
  689. char cname[MAX_LINE], cmd5[MD5_LEN + 1], buffer[MAX_DATA];
  690. //char escape_buf[MAX_LINE];
  691. char md5[MD5_LEN + 1];
  692. struct Session *p = &(TRACKER[TYPE_CS].head[listnum]), *source;
  693. struct Channel *pc;
  694. float bitrate, limitedbitrate=10000.0;
  695. int id, startblock, size, maxblocksize, datalen, issave=0;
  696. size = *(unsigned char *) msg;
  697. msg += sizeof (char);
  698. if (size > sizeof (cname) || size <= 0) //wrong Message
  699. {
  700. Clientclosure (listnum, TYPE_CS);
  701. return -1;
  702. } else
  703. memcpy (cname, msg, size);
  704. cname[size] = 0;
  705. msg += size;
  706. //      memcpy (cmd5, msg, MD5_LEN);
  707. //      msg += MD5_LEN;
  708. id = *(int *) msg;
  709. msg += sizeof (int);
  710. sprintf (buffer, "%d@%s_%s", id, defaultspip, cname);
  711. md5_calc ((unsigned char *) md5, (unsigned char *) buffer,
  712.   strlen (buffer));
  713. for (i = 0; i < MD5_LEN; i += 2)
  714. sprintf (cmd5 + i, "%02x", (unsigned char) md5[i / 2]);
  715. cmd5[MD5_LEN] = 0;
  716. memcpy (md5, msg, MD5_LEN);
  717. msg += MD5_LEN;
  718. maxblocksize = *(int *) msg;
  719. msg += sizeof (int);
  720. bitrate = *(float *) msg;
  721. msg += sizeof (float);
  722. datalen = *(unsigned short *) msg;
  723. if (datalen > MAX_LINE)
  724. {
  725. Clientclosure (listnum, TYPE_CS);
  726. return -1;
  727. }
  728. msg += sizeof (short);
  729. if (AuthCS && (errmsg =
  730.      isAllowed (id, md5, cname, bitrate, &limitedbitrate, &issave)) < 0)
  731. {
  732. PDEBUG ("User %d is not allowed to newchannel %s.n",
  733. id, cname);
  734. send_p2p_err (p, -errmsg, 1);
  735. Clientclosure (listnum, TYPE_CS);
  736. return -1;
  737. }
  738. startblock = 0;
  739. if ((pc = findChannel (cmd5, MD5_LEN)) != NULL)
  740. {
  741. if (pc->pcinfo == NULL || pc->pcinfo->mlist != NULL)
  742. {
  743. PDEBUG ("The channel %s is a playlist.n", cname);
  744. send_p2p_err (p, ERR_INTERNAL, 1);
  745. Clientclosure (listnum, TYPE_CS);
  746. return -1;
  747. }
  748. if ((source = pc->pcinfo->dataSource) != NULL)
  749. {
  750. Clientclosure (source - TRACKER[TYPE_CS].head,
  751.        TYPE_CS);
  752. }
  753. }
  754. if ((pc = newLiveChannel (cname, p, cmd5, bitrate,
  755.     maxblocksize)) != (struct Channel *) 0)
  756. {
  757. p->pc = pc;
  758. pc->pcinfo->userid = id;
  759. pc->pcinfo->limitedBitRate = limitedbitrate;
  760. pc->pcinfo->isSave = issave;
  761. pc->pcinfo->dataSource = &(TRACKER[TYPE_CS].head[listnum]);
  762. pc->pcinfo->startid = pc->pcinfo->maxID = (CurrentTime - FIX_MAGIC) * 16;
  763. } else
  764. {
  765. PDEBUG ("newLiveChannel failed.n");
  766. send_p2p_err (p, ERR_INTERNAL, 1);
  767. Clientclosure (listnum, TYPE_CS);
  768. return -1;
  769. }
  770. *(int *) buffer = 9;
  771. *(char *) (buffer + sizeof (int)) = SP2CS_WELCOME;
  772. *(int *) (buffer + sizeof (int) + sizeof (char)) = startblock;
  773. if (writeMessage (p, pc, buffer) < 0)
  774. {
  775. send_p2p_err (p, ERR_INTERNAL, 1);
  776. Clientclosure (listnum, TYPE_CS);
  777. return -1;
  778. }
  779. pc->pcinfo->cur_channel = pc->pcinfo->max_channel = 1;
  780. pc->pcinfo->media = calloc (1, sizeof (struct MediaData));
  781. pc->pcinfo->media[0].start = 0;
  782. pc->pcinfo->media[0].len = -1;
  783. pc->pcinfo->media[0].dlen = datalen;
  784. pc->pcinfo->media[0].data = calloc (1, datalen);
  785. memcpy (pc->pcinfo->media[0].data, msg, datalen);
  786. #ifdef TEST
  787. /*
  788. for (type = 0; type < MAX_TS; ++type)
  789. buildGTV (pc, datalen, msg, type);
  790. */
  791. for (i=0; i<MAX_TS; i++) {
  792. if (buildGTV (pc, datalen, msg, i) < 0)
  793. continue;
  794. }
  795. #endif
  796. #ifdef HAVE_MYSQL
  797. //      query_mysql (local_mysql, "delete from channel where ChannelMD5 = "%s"", cmd5);
  798. //      mysql_escape_string (escape_buf, msg, datalen);
  799. //      query_mysql (local_mysql, "insert into channel (ChannelName, ChannelBitrate, ChannelAttachData, ChannelMD5, ChannelOwnerID) values ("%s","%f", "%s", "%s", "%d")", cname, bitrate, escape_buf, cmd5, id);
  800. #endif
  801. return 0;
  802. }
  803. int
  804. process_CS2SP_UPDATE (int listnum, float rate) //only update bitrate
  805. {
  806. struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
  807. struct Channel *pc = p->pc;
  808. struct LiveChannelInfo *pcinfo = pc->pcinfo;
  809. pcinfo->bitrate = rate;
  810. if (rate > pcinfo->limitedBitRate)
  811. {
  812. send_p2p_err (p, ERR_EXCEED_BITRATE, 1);
  813. Clientclosure (listnum, TYPE_CS);
  814. return -1;
  815. }
  816. #ifdef HAVE_MYSQL
  817. //      query_mysql (local_mysql, "update channel set ChannelBitrate = "%f" where ChannelMD5 = "%s"", rate, pc->channel_md5);
  818. #endif
  819. return 0;
  820. }
  821. int
  822. process_CS2SP_BLOCK (int listnum, char *msg)
  823. {
  824. char *buf, buffer[MAX_DATA];
  825. struct Edge *pedge;
  826. int size; // max=TRACKER[TYPE_CP].maxid+1;
  827. struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
  828. //      struct Session *q=TRACKER[TYPE_CP].head;
  829. struct Channel *pc = p->pc;
  830. struct LiveChannelInfo *pcinfo;
  831. if (p->pc == NULL || (pcinfo = p->pc->pcinfo) == NULL || pcinfo->mlist != NULL)
  832. {
  833. PDEBUG ("Unmatched channeln");
  834. Clientclosure (listnum, TYPE_CS);
  835. return -1;
  836. }
  837. pcinfo = p->pc->pcinfo;
  838. if ((size = saveBlock (pc, msg, p)) > 0)
  839. {
  840. // directly send this block to connected CPs
  841. buf = buffer + sizeof (int);
  842. *(unsigned char *) buf = P2P_RESPONSE;
  843. buf += sizeof (char);
  844. memcpy (buf, pc->channel_md5, MD5_LEN);
  845. buf += MD5_LEN;
  846. p->last_transferblock = CurrentTime;
  847. memcpy (buf, msg, size + 2 * sizeof (int));
  848. buf += size + 2 * sizeof (int);
  849. *(int *) buffer = buf - buffer;
  850. for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
  851. {
  852. if (pedge->me->numjob >= MAX_JOB_PER_SESSION)
  853. continue;
  854. pedge->me->last_transferblock = CurrentTime;
  855. // PDEBUG ("write %u ",
  856. // ((unsigned int *) msg)[0]);
  857. if (-1 == writeMessage (pedge->me, pc, buffer))
  858. PDEBUG ("buffer is fulln");
  859. // else
  860. // PDEBUG ("OKn");
  861. }
  862. if (pcinfo->updated <= CurrentTime + SPUPDATE_SLOT) //pcinfo is livechannel
  863. send_all_spupdate (pc, &(pcinfo->s));
  864. } else
  865. {
  866. PDEBUG ("save Block Failed ! size %d, %dn", size,
  867. listnum);
  868. Clientclosure (listnum, TYPE_CS);
  869. }
  870. return 0;
  871. }
  872. int
  873. process_CS (int listnum)
  874. {
  875. struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
  876. struct Message *m = (struct Message *) (p->buf + p->start);
  877. tmpDownBytes += m->len;
  878. switch (m->type)
  879. {
  880. case CS2SP_REGISTER:
  881. process_CS2SP_REGISTER (listnum, m->buffer);
  882. break;
  883. case CS2SP_UPDATE:
  884. process_CS2SP_UPDATE (listnum,
  885.       *(float *) (m->buffer));
  886. break;
  887. case CS2SP_BLOCK:
  888. process_CS2SP_BLOCK (listnum, m->buffer);
  889. break;
  890. default:
  891. Clientclosure (listnum, TYPE_CS);
  892. return -1;
  893. }
  894. return 0;
  895. }
  896. int
  897. closure_CS (int listnum)
  898. {
  899. struct Session *p = &(TRACKER[TYPE_CS].head[listnum]);
  900. struct Channel *pc = p->pc;
  901. PDEBUG ("CS disconnected from %d.%d.%d.%d:%dn",
  902. IPADDR (p->host), p->port);
  903. if (pc)
  904. {
  905. if (pc->pcinfo)
  906. {
  907. pc->pcinfo->dataSource = NULL;
  908. pc->pcinfo->status = 1;
  909. }
  910. freeLiveChannel (pc, NULL);
  911. }
  912. FD_CLR (TRACKER[TYPE_CS].head[listnum].socket, &osocks);
  913. close (TRACKER[TYPE_CS].head[listnum].socket);
  914. FREE (p->buf);
  915. deleteAll (p);
  916. memset (&(TRACKER[TYPE_CS].head[listnum]), 0,
  917. sizeof (struct Session));
  918. return 0;
  919. }
  920. int
  921. init_sp ()
  922. {
  923. FILE *pidf;
  924. struct rlimit rl;
  925. char *index;
  926. int type;
  927. char buffer[MAX_DATA];
  928. rl.rlim_cur = rl.rlim_max = 1000000;
  929. if (setrlimit (RLIMIT_NOFILE, &rl) != 0)
  930. {
  931. perror ("setrlimit");
  932. }
  933. for (type=0; type<MAX_TS; type++)
  934. {
  935. if (spip[type] != NULL && strlen (spip[type]) >= MIN_IPADDR_LEN)
  936. {
  937. defaultspip = spip[type];
  938. break;
  939. }
  940. }
  941. if (defaultspip == NULL)
  942. defaultspip = "127.0.0.1";
  943. OPENLOG;
  944. #ifdef DEBUG
  945. system ("ulimit -a");
  946. if (getrlimit (RLIMIT_CORE, &rl) != 0)
  947. {
  948. perror ("getrlimit");
  949. }
  950. PDEBUG ("Get core limit %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
  951. rl.rlim_cur = rl.rlim_max = (rlim_t )10240000;
  952. if (setrlimit (RLIMIT_CORE, &rl) != 0)
  953. {
  954. perror ("setrlimit");
  955. }
  956. if (getrlimit (RLIMIT_CORE, &rl) != 0)
  957. {
  958. perror ("getrlimit");
  959. }
  960. PDEBUG ("Set core limit to %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
  961. system ("ulimit -a");
  962. #endif
  963. #ifdef HAVE_MYSQL
  964. if ((local_mysql =
  965.      init_mysql (MYSQL_HOST, MYSQL_USER, MYSQL_PASS, MYSQL_DB,
  966.  "/var/run/mysqld/mysqld.sock")) == 0)
  967. {
  968. PDEBUG ("Error in init_mysql.n");
  969. exit (1);
  970. }
  971. #endif
  972. TRACKER[TYPE_CP].type = TYPE_CP; //allocate CP ServerDesc  TYPE_CP = 0
  973. TRACKER[TYPE_CP].port = SP4CP_PORT; //SP4CP_PORT = 50001
  974. TRACKER[TYPE_CP].cur = 0; //current client connection 
  975. TRACKER[TYPE_CP].max = MAX_CP; //MAX_CP = 2048
  976. TRACKER[TYPE_CP].init = init_CP; //the function pointer of init_CP,return a debug message
  977. TRACKER[TYPE_CP].process = process_CP; //the function pointer of process_CP, switch TYPE
  978. TRACKER[TYPE_CP].closure = closure_CP; //the funciont pointer of closure_CP,return debug msg and freejob
  979. TRACKER[TYPE_CP].head = calloc (sizeof (struct Session), TRACKER[TYPE_CP].max); //allocate session memory
  980. switch (BINDALL)
  981. {
  982. case 0:
  983. if ((TRACKER[TYPE_CP].sock = init_server (spip[0], SP4CP_PORT)) < 0) //PORT = 50001
  984. return -1;
  985. break;
  986. default:
  987. if ((TRACKER[TYPE_CP].sock = init_server (NULL, SP4CP_PORT)) < 0) //PORT = 50001
  988. return -1;
  989. break;
  990. }
  991. FD_SET (TRACKER[TYPE_CP].sock, &osocks);
  992. TRACKER[TYPE_CS].type = TYPE_CS; //allocate CP ServerDese,TYPE_CS = 1
  993. TRACKER[TYPE_CS].port = SP4CS_PORT;
  994. TRACKER[TYPE_CS].cur = 0;
  995. TRACKER[TYPE_CS].max = MAX_CS; //MAX_CS = 512
  996. TRACKER[TYPE_CS].init = init_CS; //return a debug message        
  997. TRACKER[TYPE_CS].process = process_CS;
  998. TRACKER[TYPE_CS].closure = closure_CS;
  999. TRACKER[TYPE_CS].head =
  1000. calloc (sizeof (struct Session), TRACKER[TYPE_CS].max);
  1001. switch (BINDALL)
  1002. {
  1003. case 0:
  1004. if ((TRACKER[TYPE_CS].sock = init_server (spip[0], SP4CS_PORT)) < 0) //port = 
  1005. return -1;
  1006. break;
  1007. default:
  1008. if ((TRACKER[TYPE_CS].sock = init_server (NULL, SP4CS_PORT)) < 0) //port = 
  1009. return -1;
  1010. break;
  1011. }
  1012. FD_SET (TRACKER[TYPE_CS].sock, &osocks);
  1013. if (db_init (Home, Database) < 0)
  1014. return -1;
  1015. memset (tsAddr, 0, sizeof (struct sockaddr_in) * MAX_TS); //sockaddr_in type
  1016. for (type = 0; type < MAX_TS; ++type)
  1017. {
  1018. #ifdef TEST
  1019. sprintf (buffer, "rm -f %s/%s/*.gtv", WWW_ROOT,
  1020.  NET_NAME[type]);
  1021. system (buffer);
  1022. sprintf (buffer, "rm -f %s/%s/*.mediadata", WWW_ROOT,
  1023.  NET_NAME[type]);
  1024. system (buffer);
  1025. sprintf (buffer, "rm -f %s/%s/channel.xml", WWW_ROOT,
  1026.  NET_NAME[type]);
  1027. system (buffer);
  1028. #endif
  1029. sprintf (buffer, "rm -fr %s/%s/*", PREFIX, LIVE_PREFIX);
  1030. system (buffer);
  1031. if (tsip[type] == NULL) continue;
  1032. tsAddr[type].sin_family = AF_INET;
  1033. tsAddr[type].sin_port = htons (TS4RM_PORT);
  1034. index = strchr (tsip[type], ':');
  1035. if (index == NULL)
  1036. inet_aton (tsip[type], &tsAddr[type].sin_addr);
  1037. else
  1038. {
  1039. *index = 0;
  1040. inet_aton (tsip[type], &tsAddr[type].sin_addr);
  1041. *index = ':';
  1042. }
  1043. tsSock[type] = socket (PF_INET, SOCK_DGRAM, 0); //upd connection
  1044. if (tsSock[type] < 0)
  1045. return -1;
  1046. }
  1047. mkdir (PREFIX, 0777);
  1048. sprintf (buffer, "%s/%s", PREFIX, LIVE_PREFIX);
  1049. mkdir (buffer, 0777);
  1050. sprintf (buffer, "%s/%s", PREFIX, ORDER_PREFIX);
  1051. mkdir (buffer, 0777);
  1052. sprintf (buffer, "%s/%s", PREFIX, PLIST_PREFIX);
  1053. mkdir (buffer, 0777);
  1054. sprintf (buffer, "%s/%s", PREFIX, PROG_PREFIX);
  1055. mkdir (buffer, 0777);
  1056. if ((pidf = fopen (PIDFile, "w")) == NULL)
  1057. {
  1058. PDEBUG ("Cannot open pidfile.n");
  1059. return -1;
  1060. }
  1061. fprintf (pidf, "%dn", getpid ());
  1062. fclose (pidf);
  1063. return 0;
  1064. }
  1065. #ifdef TEST
  1066. /*
  1067. int
  1068. buildMediaData (struct Channel *pc, int datalen, char *data, int type)
  1069. {
  1070. int i;
  1071. char olddata[MAX_DATA];
  1072. char buffer[MAX_DATA];
  1073. struct stat stbuf;
  1074. FILE *f;
  1075. assert (pc->pcinfo);
  1076. snprintf (buffer, MAX_LINE, "%s/%s/%s.mediadata", WWW_ROOT, NET_NAME[type], pc->channel_name);
  1077. if (stat (buffer, &stbuf) == 0)
  1078. {
  1079. if (stbuf.st_size != datalen)
  1080. {
  1081. PDEBUG ("old media data size %d not match new %dn", (int)(stbuf.st_size), datalen);
  1082. return -1;
  1083. }
  1084. if ((f = fopen (buffer, "r")) == NULL)
  1085. {
  1086. PDEBUG ("cannot open mediadata file %sn", buffer);
  1087. perror ("fopen");
  1088. return -1;
  1089. }
  1090. if (fread (olddata, datalen, 1, f) == 1)
  1091. {
  1092. for (i=0; i<datalen; i++)
  1093. {
  1094. if (data[i] != olddata[i])
  1095. {
  1096. PDEBUG ("media data not match %s, %dn", buffer, i);
  1097. fclose (f);
  1098. return -1;
  1099. }
  1100. }
  1101. } else
  1102. {
  1103. PDEBUG ("Error in read old mediadata %s.n", buffer);
  1104. fclose (f);
  1105. return -1;
  1106. }
  1107. fclose (f);
  1108. return 0;
  1109. } else if ((f = fopen (buffer, "w")) == NULL)
  1110. {
  1111. PDEBUG ("cannot open gtv file %sn", buffer);
  1112. perror ("fopen");
  1113. return -1;
  1114. }
  1115. fwrite (data, datalen, 1, f);
  1116. fclose (f);
  1117. return 0;
  1118. }
  1119. */
  1120. int
  1121. buildGTV (struct Channel *pc, int datalen, char *data, int type)
  1122. {
  1123. char buffer[MAX_DATA];
  1124. FILE *f;
  1125. assert (pc->pcinfo);
  1126. snprintf (buffer, MAX_LINE, "%s/%s/%s.gtv", WWW_ROOT, NET_NAME[type], pc->channel_name);
  1127. // snprintf (buffer, MAX_LINE, "%s/%s.gtv", pc->fname, pc->channel_name);
  1128. if ((f = fopen (buffer, "w")) == NULL)
  1129. {
  1130. PDEBUG ("cannot open gtv file %sn", buffer);
  1131. perror ("fopen");
  1132. return -1;
  1133. }
  1134. if (pc->pcinfo)
  1135. {
  1136. if (pc->pcinfo->mlist != NULL)
  1137. sprintf (buffer,
  1138.  "CSUserID=%drnBlockSize=%drnBitRate=%frnChannelName=%srnPlaylist=truernResourceHash=%srnTrackServer=%srnSuperPeer=%s:50001rnDataLength=%drnData=",
  1139.  pc->pcinfo->userid, pc->maxblocksize, pc->pcinfo->bitrate,
  1140.  pc->channel_name, pc->channel_md5, tsip[type], spip[type],
  1141.  datalen);
  1142. else
  1143. sprintf (buffer,
  1144.  "CSUserID=%drnBlockSize=%drnBitRate=%frnChannelName=%srnResourceHash=%srnTrackServer=%srnSuperPeer=%s:50001rnDataLength=%drnData=",
  1145.  pc->pcinfo->userid, pc->maxblocksize, pc->pcinfo->bitrate,
  1146.  pc->channel_name, pc->channel_md5, tsip[type], spip[type],
  1147.  datalen);
  1148. } else
  1149. sprintf (buffer,
  1150.  "GTVHome=%s/rnBlockSize=%drnResourceHash=%srnTrackServer=%srnSuperPeer=%s:50001rnDataLength=%drnData=",
  1151.  urlroot, pc->maxblocksize, pc->channel_md5,
  1152.  tsip[type], spip[type], datalen);
  1153. fwrite (buffer, strlen (buffer), 1, f);
  1154. fwrite (data, datalen, 1, f);
  1155. fclose (f);
  1156. /* snprintf (buffer, MAX_LINE, "%s/%s/%s.mediadata", WWW_ROOT, NET_NAME[type], pc->channel_name);
  1157. if (stat (buffer, &stbuf) == 0)
  1158. {
  1159. if (stbuf.st_size != datalen)
  1160. {
  1161. PDEBUG ("old media data size %d not match new %dn", (int)(stbuf.st_size), datalen);
  1162. return -1;
  1163. }
  1164. if ((f = fopen (buffer, "r")) == NULL)
  1165. {
  1166. PDEBUG ("cannot open mediadata file %sn", buffer);
  1167. perror ("fopen");
  1168. return -1;
  1169. }
  1170. if (fread (olddata, datalen, 1, f) == 1)
  1171. {
  1172. for (i=0; i<datalen; i++)
  1173. {
  1174. if (data[i] != olddata[i])
  1175. {
  1176. PDEBUG ("media data not match %s, %dn", buffer, i);
  1177. fclose (f);
  1178. return -1;
  1179. }
  1180. }
  1181. } else
  1182. {
  1183. PDEBUG ("Error in read old mediadata %s.n", buffer);
  1184. fclose (f);
  1185. return -1;
  1186. }
  1187. fclose (f);
  1188. return 0;
  1189. } else if ((f = fopen (buffer, "w")) == NULL)
  1190. {
  1191. PDEBUG ("cannot open gtv file %sn", buffer);
  1192. perror ("fopen");
  1193. return -1;
  1194. }
  1195. fwrite (data, datalen, 1, f);
  1196. fclose (f);
  1197. */
  1198.     // added by lixingwu, 20070313
  1199.     // upload gtv files to server
  1200.     char upload_cmd[MAX_DATA];
  1201.     sprintf(upload_cmd, "curl -F filename=@%s/%s/%s.gtv -F domain=%s %s",
  1202.         WWW_ROOT, NET_NAME[type], pc->channel_name, spip[type], CAS_ADDR);
  1203.     printf("%sn", upload_cmd);
  1204.     system(upload_cmd);
  1205.     
  1206.     return 0;
  1207. }
  1208. #endif
  1209. int
  1210. send_P2P_SPUPDATE (struct Session *p, struct Channel *pc, char *md5, struct SPUpdate *s)
  1211. {
  1212. char buffer1[MAX_DATA];
  1213. char *buf;
  1214. buf = buffer1 + sizeof (int);
  1215. *(unsigned char *) buf = P2P_SPUPDATE; //P2P_SPUPDATE=1 in ProTocol file
  1216. buf += sizeof (char);
  1217. memcpy (buf, md5, MD5_LEN);
  1218. buf += MD5_LEN;
  1219. memcpy (buf, s, sizeof (struct SPUpdate));
  1220. buf += sizeof (struct SPUpdate);
  1221. *(int *) buffer1 = buf - buffer1;
  1222. if (writeMessage (p, pc, buffer1) < 0)
  1223. return -1;
  1224. return 0;
  1225. }
  1226. int
  1227. send_p2p_err (struct Session *p, unsigned short code, int quit)
  1228. {
  1229. char buffer1[MAX_DATA];
  1230. char *buf;
  1231. buf = buffer1 + sizeof (int);
  1232. *(unsigned char *) buf = P2P_MSG;
  1233. buf += sizeof (char);
  1234. (*(unsigned short *) buf) = code;
  1235. buf += sizeof (short);
  1236. *(int *) buf = quit;
  1237. buf += sizeof (int);
  1238. *(int *) buffer1 = buf - buffer1;
  1239. PDEBUG ("Send error msg type %hd to %pn", code, p);
  1240. if (writeMessage (p, NULL, buffer1) < 0)
  1241. return -1;
  1242. return 0;
  1243. }
  1244. void apply_idle (struct Session *p, void *arg)
  1245. {
  1246. struct Argument *parg = (struct Argument *)arg;
  1247. if (CurrentTime - p->last_transferblock >= MAX_TRANSFER_IDLE)
  1248. {
  1249. fprintf (parg->f, "%s Session timeout! %ld n", parg->type == TYPE_CS? "CS":"CP", CurrentTime - p->last_transferblock);
  1250. Clientclosure (p - TRACKER[parg->type].head, parg->type);
  1251. }
  1252. }
  1253. void apply_check (struct Channel *p, void *arg)
  1254. {
  1255. struct Argument *parg = (struct Argument *)arg;
  1256. int type;
  1257. struct LiveChannelInfo *pc = p->pcinfo;
  1258. if (pc && pc->status <= 0)
  1259. {
  1260. #ifdef HAVE_MYSQL
  1261. query_mysql (local_mysql,
  1262.      "insert into channel(ChannelName, ChannelBitrate, ChannelMD5, ChannelElapsed, ChannelRange) values ('%s','%f','%d','%d')",
  1263.      pc->channel_name, pc->bitrate,
  1264.      p->channel_md5,
  1265.      time (NULL) - p->ctime,
  1266.      pc->s.maxKeySample -
  1267.      pc->s.minKeySample);
  1268. #endif
  1269. #ifdef TEST
  1270. for (type=0; type<MAX_TS; type++)
  1271. {
  1272. if (parg->xml[type] == NULL)
  1273. continue;
  1274. fprintf (parg->xml[type], "<Channel Name="%s" Desc="%s" File="%s.gtv" NumClient="%d" BitRate="%d" Start="%ld" End="-1" Elapsed="%ld"/>rn",
  1275.  p->channel_name, "gtv", //pc->userid,
  1276.  p->channel_name, p->numofnp[type],
  1277.  (int) (pc->bitrate * 8),
  1278.  (long) p->ctime, (long) CurrentTime);
  1279. }
  1280. #endif
  1281. PINFO ("query %s->%s n",
  1282. p->channel_name, p->channel_md5);
  1283. memcpy (parg->buf, p->channel_md5, MD5_LEN);
  1284. parg->buf += MD5_LEN;
  1285. fprintf (parg->f, "Channel %s have %d client. upsize %lldB, avg speed %f; downsize %lldB, avg speed %f, reported %f, real/reported is %f%%.n",
  1286.  p->channel_md5, p->numclient,
  1287.  p->upsize, ((float)(p->upsize)) / (CurrentTime - lastCheck), p->downsize, ((float)(p->downsize)) / (CurrentTime - lastCheck), p->pcinfo !=NULL ? p->pcinfo->bitrate:0, p->pcinfo != NULL && p->pcinfo->bitrate != 0 ? (((float)(p->downsize)) / (CurrentTime - lastCheck)*100/p->pcinfo->bitrate): 0);
  1288. fprintf (parg->f, "Live SPUpdate : SampleMin %lld SampleMax %lld BlockMin %d BlockMax %d n",
  1289.  pc->s.minKeySample,
  1290.  pc->s.maxKeySample,
  1291.  pc->s.minBlockID,
  1292.  pc->s.maxBlockID);
  1293. parg->spchannelcount ++;
  1294. parg->totalclient += p->numclient;
  1295. parg->totalupsize += p->upsize;
  1296. } else if (pc && pc->isSave == 0)
  1297. freeLiveChannel (p, NULL);
  1298. }