livechannel.c
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:25k
源码类别:

P2P编程

开发平台:

Visual C++

  1. /*
  2.  *  Openmysee
  3.  *
  4.  *  This program is free software; you can redistribute it and/or modify
  5.  *  it under the terms of the GNU General Public License as published by
  6.  *  the Free Software Foundation; either version 2 of the License, or
  7.  *  (at your option) any later version.
  8.  *
  9.  *  This program is distributed in the hope that it will be useful,
  10.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12.  *  GNU General Public License for more details.
  13.  *
  14.  *  You should have received a copy of the GNU General Public License
  15.  *  along with this program; if not, write to the Free Software
  16.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  17.  *
  18.  */
  19.  
  20. #include "echo.h"
  21. int NumNewChannel;
  22. struct Channel *ChannelHash[MAX_CHANNEL];
  23. struct Channel *ChannelList;
  24. extern char *PREFIX;
  25. extern struct ServerDesc TRACKER[MAX_TYPE];
  26. extern int Clientclosure (int listnum, int type);
  27. inline int hash_str (unsigned char *str, int len)
  28. {
  29. int hash;
  30. for (hash=0; len; len--, str++)
  31. hash += (hash << 5) - hash + (*str);
  32. return hash & (MAX_CHANNEL - 1);
  33. }
  34. struct Channel *getChannel (struct Channel **phash, char *name, int len)
  35. {
  36. int id = hash_str (name, len);
  37. struct Channel *p;
  38. for (p=phash[id]; p; p=p->next)
  39. {
  40. if (strncmp (name, p->channel_md5, len) == 0)
  41. return p;
  42. }
  43. PDEBUG("Cannot findChannel hash %.32s.n", name);
  44. return NULL;
  45. }
  46. void apply_hash (struct Channel **phash, void apply(struct Channel *, void *), void *p)
  47. {
  48. int i;
  49. struct Channel *pc, *nextpc;
  50. for (i=0; i<MAX_CHANNEL; i++)
  51. {
  52. for (pc=phash[i]; pc; pc=nextpc)
  53. {
  54. nextpc = pc->next;
  55. apply (pc, p);
  56. }
  57. }
  58. }
  59. void apply_list (struct Channel *plist, void apply(struct Channel *, void *), void *p)
  60. {
  61. struct Channel *pc, *nextpc;
  62. for (pc=plist; pc; pc=nextpc)
  63. {
  64. nextpc = pc->lnext;
  65. apply (pc, p);
  66. }
  67. }
  68. int freeChannel (struct Channel **phash, struct Channel **plist, int *count, struct Channel *p)
  69. {
  70. int id = hash_str (p->channel_md5, MD5_LEN);
  71. struct Edge *nextedge, *pedge;
  72. struct Channel *pchannel;
  73. if (phash[id] == p)
  74. {
  75. phash[id] = p->next;
  76. } else
  77. {
  78. for (pchannel=phash[id]; pchannel; pchannel=pchannel->next)
  79. {
  80. if (pchannel->next == p)
  81. {
  82. pchannel->next = p->next;
  83. break;
  84. }
  85. }
  86. if (!pchannel) return -1;
  87. }
  88. if ((*plist) == p)
  89. {
  90. *plist = p->lnext;
  91. } else
  92. {
  93. for (pchannel=*plist; pchannel; pchannel=pchannel->lnext)
  94. {
  95. if (pchannel->lnext == p)
  96. {
  97. pchannel->lnext = p->lnext;
  98. break;
  99. }
  100. }
  101. if (!pchannel) return -1;
  102. }
  103. (*count) --;
  104. #ifdef __CP_SOURCE
  105. if (p->db != NULL) fclose (p->db);
  106. #endif
  107. if (p->pcinfo)
  108. {
  109. free_livechannel (p);
  110. free (p->pcinfo);
  111. }
  112. for (pedge=p->PeerHead; pedge; pedge=nextedge)
  113. {
  114. p->numclient --;
  115. nextedge = pedge->cnext;
  116. delEdge (pedge);
  117. }
  118. free (p);
  119. return 0;
  120. }
  121. inline struct Channel *findChannel (char *name, int len)
  122. {
  123. return getChannel (ChannelHash, name, len);
  124. }
  125. void freeLiveChannel (struct Channel *pc, void *p)
  126. {
  127. freeChannel (ChannelHash, &ChannelList, &NumNewChannel, pc);
  128. }
  129. void freeAllChannel ()
  130. {
  131. apply_hash (ChannelHash, freeLiveChannel, NULL);
  132. }
  133. inline void buildLivePath (char *buf, int len, char *md5)
  134. {
  135. snprintf (buf, len, "%s/%s%.2s/", PREFIX, LIVE_PREFIX, md5);
  136. mkdir (buf, 0777);
  137. strcat (buf, md5);
  138. }
  139. #ifdef __CP_SOURCE
  140. int locate_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
  141. {
  142. int i, *msg;
  143. struct LiveChannelInfo *c = pc->pcinfo;
  144. if (c == NULL)
  145. return -1;
  146. if (c->indisk == NULL)
  147. {
  148. c->max_queue = MAX_QUEUE;
  149. c->indisk = calloc (c->max_queue, 1);
  150. c->bitflag = calloc ((c->max_queue+7)/8, 1);
  151. // if (c->indisk == NULL || c->bitflag == NULL)
  152. PDEBUG ("allocate memory,%p,%pn",c->indisk, c->bitflag);
  153. return -1;
  154. }
  155. i = id % c->max_queue;
  156. if (c->indisk[i] == 0 || (pc->type != T_PLIST && c->indisk[i] != (id/c->max_queue + 1)))
  157. {
  158. PINFO ("empty flag %d.n", c->indisk[i]);
  159. return -1;
  160. }
  161. if (pc->maxblocksize + 2*sizeof(int) > max)
  162. {
  163. PDEBUG ("too small buffer %d for %d", max, pc->maxblocksize);
  164. return -2;
  165. }
  166. if (fseeko (pc->db, ((off_t)(i)) * (pc->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
  167. {
  168. PDEBUG ("fseek failed.n");
  169. return -1;
  170. }
  171. if ((i=fread (buf, 1, pc->maxblocksize+2*sizeof (int), pc->db)) <= 2*sizeof (int) || i < ((int *)buf)[1]+2*sizeof(int))
  172. {
  173. PDEBUG ("Only %d read [%d]n", i, ((int *)buf)[1]);
  174. return -1;
  175. }
  176. msg = (int *)buf;
  177. if (pc->type == T_PLIST)
  178. msg[0] = id;
  179. if (msg[0] != id || msg[1] > pc->maxblocksize || msg[1] <= 0)
  180. {
  181. PDEBUG ("Message read is [%d %d]n", msg[0], msg[1]);
  182. return -1;
  183. }
  184. pc->upsize += msg[1];
  185. return msg[1];
  186. }
  187. int saveBlock (struct Channel *c, char *buf, struct Session *p)
  188. {
  189. unsigned int pos, id, size;
  190. struct LiveChannelInfo *pcinfo;
  191. assert (buf);
  192. if ((!c) || (pcinfo = c->pcinfo) == NULL)
  193. {
  194. PDEBUG ("saveBlock c is null.n");
  195. return -1;
  196. }
  197. if (pcinfo->indisk == NULL)
  198. {
  199. pcinfo->max_queue = MAX_QUEUE;
  200. pcinfo->indisk = calloc (pcinfo->max_queue, 1);
  201. pcinfo->bitflag = calloc ((pcinfo->max_queue+7)/8, 1);
  202. if (pcinfo->indisk == NULL || pcinfo->bitflag == NULL)
  203. return -1;
  204. }
  205. id = ((unsigned int *)buf)[0];
  206. size = ((unsigned int *)buf)[1];
  207. pos = id % pcinfo->max_queue;
  208. if (id > 0) clrBit (pcinfo->bitflag, pos);
  209. if (size > MAX_BLOCK_SIZE || size <= MIN_BLOCK_SIZE || id < 0)
  210. {
  211. PDEBUG ("saveBlock:size is %d and id is %d.n", size, id);
  212. return 0;
  213. }
  214. if (c->maxblocksize == 0)
  215. c->maxblocksize = size;
  216. else if (size > c->maxblocksize)
  217. {
  218. PDEBUG ("saveBlock:maxblocksize is %d, size is %d and id is %d.n", c->maxblocksize, size, id);
  219. return 0;
  220. }
  221. // PDEBUG ("Recv %d(%d) Save2 %d, dataSource %p and now is %pn", (int)id, (int)size, (int)pos, pcinfo->dataSource, p);
  222. if (fseeko (c->db, ((off_t)(pos)) * (c->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
  223. {
  224. PDEBUG ("Error in fsseko.n");
  225. return -1;
  226. }
  227. if (fwrite (buf, size+2*sizeof(int), 1, c->db) != 1)
  228. {
  229. PDEBUG ("fwrite error in saveBlock:%.32s:%s", c->channel_md5, c->fname);
  230. return -1;
  231. }
  232. if (pcinfo->dataSource != p)
  233. {
  234. PDEBUG ("dataSource %p is not equal p %p.n", pcinfo->dataSource, p);
  235. pcinfo->dataSource = p;
  236. }
  237. fflush (c->db);
  238. pcinfo->total ++;
  239. c->downsize += size;
  240. if (id > pcinfo->maxID) pcinfo->maxID = id;
  241. pcinfo->indisk[pos] = (id/pcinfo->max_queue) + 1;
  242. if (pcinfo->indisk[pos] == 0)
  243. PDEBUG("Too large id %ds", id);
  244. return size;
  245. }
  246. int init_livechannel (struct Channel *p)
  247. {
  248. if ((p->db = fopen (p->fname, "w+")) == (FILE *)0)
  249. return -1;
  250. p->pcinfo->isSave = 0;
  251. return 0;
  252. }
  253. int free_livechannel (struct Channel *p)
  254. {
  255. struct LiveChannelInfo *pcinfo = p->pcinfo;
  256. free (pcinfo->indisk);
  257. free (pcinfo->bitflag);
  258. unlink (p->fname);
  259. if (pcinfo->dataSource != NULL)
  260. {
  261. pcinfo->dataSource->pc = NULL;
  262. PDEBUG ("close CS source for channel %.32s.n", p->channel_md5);
  263. Clientclosure (pcinfo->dataSource-TRACKER[TYPE_P2PC].head, TYPE_P2PC);
  264. pcinfo->dataSource = NULL;
  265. }
  266. freeMedia (p);
  267. return 0;
  268. }
  269. #endif
  270. #ifdef __SP_SOURCE
  271. extern time_t CurrentTime;
  272. extern char *NET_NAME[];
  273. extern char *WWW_ROOT;
  274. extern char *defaultspip;
  275. int Changed = 1;
  276. extern int timer_add (unsigned int t, TimerFunc process, void *entity, void *data);
  277. extern int writeMessage (struct Session *p, struct Channel *pc, char *ptr);
  278. extern int buildGTV (struct Channel *pc, int datalen, char *data, int type);
  279. extern inline void freeProgram (struct Channel *, void *);
  280. extern void send_all_spupdate (struct Channel *pc, struct SPUpdate *s);
  281. void hup_handler (int sig)
  282. {
  283. Changed ++;
  284. }
  285. inline void buildPListPath (char *buf, int len, char *md5)
  286. {
  287. snprintf (buf, len, "%s/%s/", PREFIX, PLIST_PREFIX);
  288. strcat (buf, md5);
  289. }
  290. FILE * open_keyfile (struct Channel *p)
  291. {
  292. char buffer[MAX_DATA];
  293. struct stat stbuf;
  294. if (p == NULL) return NULL;
  295. snprintf (buffer, MAX_LINE, "%s/keysample", p->fname);
  296. if (stat (buffer, &stbuf) == 0)
  297. {
  298. if (!S_ISREG (stbuf.st_mode))
  299. {
  300. PDEBUG ("File %s exist and not a regular file", buffer);
  301. return NULL;
  302. }
  303. }
  304. return fopen (buffer, "r");
  305. }
  306. int send_mplist_spupdate (struct Channel *pc, void * data)
  307. {
  308. int iddiff, prev;
  309. char *buf, buffer[MAX_DATA];
  310. struct Edge *pedge;
  311. struct LiveChannelInfo *pcinfo;
  312. struct SPUpdate s;
  313. struct logrec lrec;
  314. time_t slot;
  315. if (pc == NULL || pc->pcinfo == NULL || pc->pcinfo->mlist == NULL
  316. || pc->pcinfo->maxID == 0)
  317. {
  318. PDEBUG ("Wrong playlist data.n");
  319. return -1;
  320. }
  321. pcinfo = pc->pcinfo;
  322. if (pcinfo->s.maxBlockID == 0)
  323. {
  324. fseek (pcinfo->keyfile, 0, SEEK_SET);
  325. if (fread (&lrec, 1, sizeof(struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
  326. {
  327. PDEBUG ("Cannot read keysamplen");
  328. return -1;
  329. }
  330. pcinfo->s.minBlockID = (((CurrentTime - FIX_MAGIC ) * 16 + pcinfo->maxID - 1) / pcinfo->maxID) * pcinfo->maxID;
  331. pcinfo->s.maxBlockID = lrec.id;
  332. if (pcinfo->s.maxKeySample == 0)
  333. pcinfo->s.maxKeySample = lrec.keysample;
  334. if (pcinfo->s.minKeySample == 0)
  335. pcinfo->s.minKeySample = CurrentTime;
  336. // pcinfo->s.minKeySample = lrec.keysample;
  337. }
  338. s = pcinfo->s;
  339. s.maxBlockID += s.minBlockID;
  340. s.maxKeySample = ((long long)CurrentTime) * 10000000;
  341. s.minKeySample = ((long long)(s.minKeySample)) * 10000000;
  342. if (pcinfo->updated < CurrentTime && pc->numclient > 0)
  343. {
  344. pcinfo->updated = CurrentTime;
  345. buf = buffer + sizeof (int);
  346. *(unsigned char *) buf = P2P_SPUPDATE;
  347. buf += sizeof (char);
  348. memcpy (buf, pc->channel_md5, MD5_LEN);
  349. buf += MD5_LEN;
  350. memcpy (buf, &s, sizeof (struct SPUpdate));
  351. buf += sizeof (struct SPUpdate);
  352. *(int *) buffer = buf - buffer;
  353. for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
  354. {
  355. if (writeMessage (pedge->me, pc, buffer) < 0)
  356. {
  357. PDEBUG ("send SPUPDATE err.n");
  358. }
  359. }
  360. PINFO ("Send spupdate in %d, %d.n", (int)(pcinfo->s.maxKeySample), (int)(pcinfo->s.minKeySample));
  361. }
  362. if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
  363. {
  364. fclose (pcinfo->keyfile);
  365. prev = pcinfo->mlist->m_cursampleid;
  366. pcinfo->mlist->m_cursampleid ++;
  367. if (pcinfo->mlist->m_cursampleid >= pcinfo->mlist->m_totalchannel)
  368. pcinfo->mlist->m_cursampleid = 0;
  369. if ((pcinfo->keyfile = open_keyfile (pcinfo->mlist->m_lists[pcinfo->mlist->m_cursampleid])) == NULL)
  370. {
  371. PDEBUG ("Error in new keysample filen");
  372. return -1;
  373. }
  374. fseek (pcinfo->keyfile, SEEK_SET, 0);
  375. if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
  376. {
  377. PDEBUG ("Cannot read keysamplen");
  378. return -1;
  379. }
  380. iddiff = pcinfo->mlist->m_lists[prev]->pcinfo->maxID + lrec.id - pcinfo->mlist->m_lastmaxid;
  381. slot = lrec.keysample;
  382. pcinfo->s.maxKeySample = lrec.keysample;
  383. } else
  384. {
  385. if ((slot = lrec.keysample - pcinfo->s.maxKeySample) < 0)
  386. slot = 0;
  387. iddiff = lrec.id - pcinfo->mlist->m_lastmaxid;
  388. pcinfo->s.maxKeySample += slot;
  389. }
  390. pcinfo->mlist->m_lastmaxid = lrec.id;
  391. pcinfo->s.maxBlockID += iddiff;
  392. timer_add (CurrentTime+slot, (TimerFunc)send_mplist_spupdate, pc, NULL);
  393. return 0;
  394. }
  395. int init_mplistchannel (struct Channel *p)
  396. {
  397. struct LiveChannelInfo *pcinfo = p->pcinfo;
  398. if (p->maxblocksize == 0) p->maxblocksize = DEFAULT_BLOCK;
  399. pcinfo->max_queue = BLOCK_PER_FILE;
  400. if ((pcinfo->keyfile = open_keyfile (pcinfo->mlist->m_lists[0])) == NULL)
  401. {
  402. PDEBUG ("Error in new keysample filen");
  403. return -1;
  404. }
  405. pcinfo->total = 0;
  406. pcinfo->isSave = 0;
  407. timer_add (CurrentTime, (TimerFunc)send_mplist_spupdate, p, NULL);
  408. return 0;
  409. }
  410. struct Channel *newMPListChannel (char *name, char *cmd5, float bitrate, int maxblocksize, int nchannel, struct Channel **pchannel)
  411. {
  412. int i, id, startID=0;
  413. struct Channel *p;
  414. struct LiveChannelInfo *pcinfo;
  415. if (NumNewChannel >= MAX_CHANNEL) return (struct Channel *)0;
  416. p = (struct Channel *)calloc (sizeof (struct Channel), 1);
  417. memcpy (p->channel_md5, cmd5, MD5_LEN);
  418. if (name) strncpy (p->channel_name, name, sizeof (p->channel_name));
  419. p->channel_md5[MD5_LEN] = 0;
  420. p->upsize = 0;
  421. p->downsize = 0;
  422. p->maxblocksize = maxblocksize;
  423. p->ctime = time (NULL);
  424. p->pcinfo = (struct LiveChannelInfo *)calloc (sizeof (struct LiveChannelInfo), 1);
  425. pcinfo = p->pcinfo;
  426. pcinfo->bitrate = bitrate;
  427. pcinfo->mlist = (struct MList *)calloc (sizeof (struct MList), 1);
  428. pcinfo->mlist->m_totalchannel = nchannel;
  429. pcinfo->media = calloc (nchannel, sizeof (struct MediaData));
  430. pcinfo->max_channel = nchannel;
  431. for (i=0; i<nchannel; i++)
  432. {
  433. pchannel[i]->ref ++;
  434. pcinfo->mlist->m_lists[i] = pchannel[i];
  435. pcinfo->mlist->m_startID[i] = startID;
  436. pcinfo->maxID += pchannel[i]->pcinfo->maxID;
  437. addMedia (p, startID, pchannel[i]->pcinfo->maxID, pchannel[i]->pcinfo->media[0].dlen, pchannel[i]->pcinfo->media[0].data, pchannel[i]->channel_name);
  438. startID += pchannel[i]->pcinfo->maxID;
  439. }
  440. if (init_mplistchannel (p) < 0)
  441. {
  442. PDEBUG ("newPlistChannel error for %p.", p);
  443. free_livechannel (p);
  444. free (pcinfo);
  445. free (p);
  446. return (struct Channel *)0;
  447. }
  448. id = hash_str (p->channel_md5, MD5_LEN);
  449. PDEBUG("newMPlistChannel hash %.32s(fname=%s) to %d.n", p->channel_md5, p->fname, id);
  450. p->next = ChannelHash[id];
  451. ChannelHash[id] = p;
  452. p->lnext = ChannelList;
  453. ChannelList = p;
  454. NumNewChannel ++;
  455. return p;
  456. }
  457. struct Channel *add_mplist_channel (char *buffer, char *md5)
  458. {
  459. int i=0;
  460. FILE *in;
  461. struct stat stbuf;
  462. struct Channel *pc=NULL, *pchannel[MAX_FILEINPUT];
  463. char *data=NULL, cname[MAX_DATA], buf[MAX_DATA];
  464. float bitrate=0.0;
  465. int bsize=16384, dlen=0;
  466. if (stat (buffer, &stbuf) < 0) return NULL;
  467. if (S_ISREG (stbuf.st_mode))
  468. {
  469. if ((in=fopen (buffer, "r")) == NULL)
  470. {
  471. PDEBUG ("Error in open file %s.n", buffer);
  472. return NULL;
  473. }
  474. while (fgets (buf, MAX_DATA, in))
  475. {
  476. switch (i)
  477. {
  478. case 0:
  479. bitrate = atof (buf);
  480. i++;
  481. break;
  482. case 1:
  483. if (buf[strlen(buf)-1] == 'r' || buf[strlen(buf)-1] == 'n')
  484. buf[strlen(buf)-1] = 0;
  485. strncpy (cname, buf, sizeof(cname));
  486. i++;
  487. break;
  488. default:
  489. buf[MD5_LEN] = 0;
  490. if ((pc = getProgrambymd5 (buf, MD5_LEN)) != NULL)
  491. {
  492. pchannel[i-2] = pc;
  493. i++;
  494. }
  495. break;
  496. }
  497. }
  498. fclose (in);
  499. if (i <= 2) return NULL;
  500. dlen = pchannel[0]->pcinfo->media[0].dlen;
  501. data = pchannel[0]->pcinfo->media[0].data;
  502. // snprintf (chname, MAX_DATA, "%s/%s/%s.mediadata", WWW_ROOT, NET_NAME[0], pchannel[0]->channel_name);
  503. // data = read_file (chname, &dlen);
  504. if ((pc=newMPListChannel (cname, md5, bitrate, bsize, i-2, pchannel)) == NULL)
  505. {
  506. PDEBUG ("Error in newMPListChannel %s.n", md5);
  507. // free (data);
  508. return NULL;
  509. }
  510. if (pc != NULL)
  511. strncpy (pc->fname, buffer, CHNLURL_LEN);
  512. #ifdef TEST
  513. for (i=0; i<MAX_TS; i++) {
  514. if (dlen <= 0 || data == NULL || buildGTV (pc, dlen, data, i) < 0)
  515. continue;
  516. }
  517. #endif
  518. // free (data);
  519. }
  520. return pc;
  521. }
  522. void apply_update (struct Channel *p, void *arg)
  523. {
  524. struct stat stbuf;
  525. // char buffer[MAX_DATA];
  526. struct LiveChannelInfo *pc = p->pcinfo;
  527. if (pc && pc->mlist != NULL)
  528. {
  529. if (stat (p->fname, &stbuf) != 0)
  530. pc->status = 1;
  531. }
  532. }
  533. int check_newplist ()
  534. {
  535. int i;
  536. struct Channel *pc;
  537. DIR *entry;
  538. struct dirent *pd;
  539. char buffer[MAX_DATA];
  540. char tmp0[MD5_LEN+1], tmp[MD5_LEN+1];
  541. // if (Changed == 0) return 0;
  542. snprintf (buffer, MAX_DATA, "%s/%s/", PREFIX, PLIST_PREFIX);
  543. if ((entry = opendir (buffer)) == NULL)
  544. {
  545. return -1;
  546. }
  547. while ((pd = readdir (entry)) != NULL)
  548. {
  549. if (strcmp (pd->d_name, ".") == 0 || strcmp (pd->d_name, "..") == 0 || strlen (pd->d_name) != MD5_LEN)
  550. continue;
  551. snprintf (buffer, MAX_DATA, "%s_%s", defaultspip, pd->d_name);
  552. md5_calc ((unsigned char *) tmp0,
  553. (unsigned char *) buffer, strlen (buffer));
  554. for (i = 0; i < MD5_LEN; i += 2)
  555. sprintf (tmp + i, "%02x", (unsigned char) tmp0[i / 2]);
  556. tmp[MD5_LEN] = 0;
  557. if ((pc = findChannel (tmp, strlen (tmp))) != NULL)
  558. {
  559. if (pc->pcinfo != NULL) pc->pcinfo->status = 0;
  560. continue;
  561. }
  562. snprintf (buffer, MAX_DATA, "%s/%s/%s", PREFIX, PLIST_PREFIX, pd->d_name);
  563. add_mplist_channel (buffer, tmp);
  564. }
  565. closedir (entry);
  566. apply_list (ChannelList, apply_update, NULL);
  567. Changed = 0;
  568. return 0;
  569. }
  570. int locate_mplist_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
  571. {
  572. int result;
  573. int i, j;
  574. struct LiveChannelInfo *c = pc->pcinfo;
  575. if (c == NULL)
  576. {
  577. PDEBUG ("c is null.n");
  578. return -1;
  579. }
  580. i = id % c->maxID;
  581. for (j=0; j<pc->pcinfo->mlist->m_totalchannel; j++)
  582. if (pc->pcinfo->mlist->m_startID[j] > i) break;
  583. j--;
  584. if (j < 0)
  585. {
  586. PDEBUG ("Internal error in locate id %d.n", id);
  587. return -1;
  588. }
  589. if ((result = locateprog_by_id (pc->pcinfo->mlist->m_lists[j], i-pc->pcinfo->mlist->m_startID[j], buf, max)) > 0)
  590. ((int *)buf)[0] = id;
  591. return result;
  592. }
  593. // Return 1 to indicate write available, return 0 to indicate now writable.
  594. inline int newChannelFile (struct Channel *p)
  595. {
  596. int result;
  597. struct stat stbuf;
  598. char buffer[MAX_LINE];
  599. struct LiveChannelInfo *pcinfo = p->pcinfo;
  600. if (pcinfo->numinput >= MAX_FILEINPUT)
  601. {
  602. PDEBUG ("Max file input has been reached, %s:%d.n", p->fname,pcinfo->numinput);
  603. return -1;
  604. }
  605. snprintf (buffer, MAX_LINE, "%s/%d", p->fname, pcinfo->numinput);
  606. if (stat (buffer, &stbuf) == 0)
  607. {
  608. if (stbuf.st_size / (p->maxblocksize+2*sizeof (int)) < pcinfo->max_queue)
  609. {
  610. if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "a+")) == NULL)
  611. {
  612. PDEBUG ("Cannot open file %s.n", buffer);
  613. return -1;
  614. }
  615. result = 1;
  616. } else
  617. {
  618. if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "r+")) == NULL)
  619. {
  620. PDEBUG ("Cannot open file %s.n", buffer);
  621. return -1;
  622. }
  623. result = 0;
  624. }
  625. pcinfo->numblocks = stbuf.st_size /(p->maxblocksize+2*sizeof (int));
  626. pcinfo->maxID += stbuf.st_size / (p->maxblocksize+2*sizeof(int));
  627. if (stbuf.st_size % (p->maxblocksize + 2*sizeof(int))) return -1;
  628. } else
  629. {
  630. if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "w+")) == NULL)
  631. {
  632. PDEBUG ("Cannot open file %s.n", buffer);
  633. return -1;
  634. }
  635. pcinfo->numblocks = 0;
  636. result = 1;
  637. }
  638. pcinfo->numinput ++;
  639. return result;
  640. }
  641. int init_livechannel (struct Channel *p)
  642. {
  643. int i;
  644. struct stat stbuf;
  645. char buffer[MAX_LINE];
  646. struct LiveChannelInfo *pcinfo = p->pcinfo;
  647. if (stat (p->fname, &stbuf) == 0 && (!S_ISDIR (stbuf.st_mode)))
  648. {
  649. PDEBUG ("directory %s not exist or not a dir.n", p->fname);
  650. return -1;
  651. }
  652. mkdir (p->fname, 0777);
  653. if (p->maxblocksize == 0) p->maxblocksize = DEFAULT_BLOCK;
  654. pcinfo->max_queue = BLOCK_PER_FILE;
  655. while ((i = newChannelFile (p)) == 0);
  656. if (i < 0) return -1;
  657. p->db = pcinfo->input[pcinfo->numinput-1];
  658. snprintf (buffer, MAX_LINE, "%s/keysample", p->fname);
  659. if (stat (buffer, &stbuf) == 0)
  660. {
  661. if (!S_ISREG (stbuf.st_mode))
  662. {
  663. PDEBUG ("File %s exist and not a regular file", buffer);
  664. return -1;
  665. }
  666. }
  667. pcinfo->keyfile = fopen (buffer, "a+");
  668. if (pcinfo->keyfile == NULL)
  669. {
  670. PDEBUG ("File %s can not be opened.n", buffer);
  671. return -1;
  672. }
  673. pcinfo->total = 0;
  674. return 0;
  675. }
  676. int locate_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
  677. {
  678. int i, pos, *msg;
  679. struct LiveChannelInfo *c = pc->pcinfo;
  680. if (c == NULL || id > c->maxID)
  681. {
  682. PDEBUG ("c is %p and id is (%d,%d).n", c, c->maxID, id);
  683. return -1;
  684. }
  685. if (pc->maxblocksize + 2*sizeof(int) > max)
  686. {
  687. PDEBUG ("too small buffer %d for %d", max, pc->maxblocksize);
  688. return -2;
  689. }
  690. i = (id - c->startid) % c->max_queue;
  691. pos = (id - c->startid) / c->max_queue;
  692. if (pos >= c->numinput || c->input[pos] == NULL)
  693. {
  694. PDEBUG ("file %d does not exist. (%d,%p)n", pos, c->numinput, c->input[pos]);
  695. return -1;
  696. }
  697. if (fseeko (c->input[pos], ((off_t)(i)) * (pc->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
  698. {
  699. PDEBUG ("Fssek failed. (%d, %d, %d)n", pos, i, pc->maxblocksize);
  700. return -1;
  701. }
  702. if ((i=fread (buf, 1, pc->maxblocksize+2*sizeof (int), c->input[pos])) <= 2*sizeof (int) || i < ((int *)buf)[1]+2*sizeof(int))
  703. {
  704. PDEBUG ("Fread failed. (%d, %d, %d)n", pos, i, pc->maxblocksize);
  705. return -1;
  706. }
  707. msg = (int *)buf;
  708. if (msg[1] > pc->maxblocksize || msg[1] <= 0)
  709. {
  710. PDEBUG ("msg format error. (%d, %d, %d)n", msg[0], msg[1], pc->maxblocksize);
  711. return -1;
  712. }
  713. PINFO ("Found block. (%d, %d, %d, %d)n", id, msg[0], msg[1], pc->maxblocksize);
  714. // msg[0] = id;
  715. pc->upsize += msg[1];
  716. return msg[1];
  717. }
  718. int saveBlock (struct Channel *c, char *buf, struct Session *p)
  719. {
  720. struct logrec lrec;
  721. int id, size;
  722. int j=0;
  723. unsigned long long keysample;
  724. struct SPUpdate *s;
  725. struct LiveChannelInfo *pcinfo;
  726. if ((!c) || (pcinfo = c->pcinfo) == NULL)
  727. {
  728. PDEBUG ("saveBlock c is null.n");
  729. return -1;
  730. }
  731. assert (buf);
  732. size = ((int *)buf)[1];
  733. if (size > MAX_BLOCK_SIZE || size <= MIN_BLOCK_SIZE)
  734. {
  735. PDEBUG ("saveBlock size is %d and id is %d.n", size, id);
  736. return 0;
  737. }
  738. if (c->maxblocksize == 0)
  739. {
  740. c->maxblocksize = size;
  741. pcinfo->max_queue = BLOCK_PER_FILE;
  742. }
  743. else if (size > c->maxblocksize)
  744. {
  745. PDEBUG ("saveBlock maxblocksize is %d, size is %d and id is %d.n", c->maxblocksize, size, id);
  746. return 0;
  747. }
  748. if (pcinfo->max_queue == 0)
  749. {
  750. PDEBUG ("c->max_queue is 0n");
  751. return -1;
  752. }
  753. // PDEBUG ("Recv %d(%d) Save2 (%d,%d,%d), dataSource %p and now is %pn", ((int*)buf)[0], (int)size, pcinfo->numinput, pcinfo->numblocks, pcinfo->maxID, pcinfo->dataSource, p);
  754. id = pcinfo->maxID;
  755. ((int *)buf)[0] = id;
  756. if (fseeko (c->db, ((off_t)(pcinfo->numblocks)) * (c->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
  757. {
  758. PDEBUG ("Error in fsseko.n");
  759. return -1;
  760. }
  761. if (fwrite (buf, c->maxblocksize+2*sizeof(int), 1, c->db) != 1)
  762. {
  763. PDEBUG ("fwrite error in saveBlock:%.32s:%s", c->channel_md5, c->fname);
  764. return -1;
  765. }
  766. j = ((int *)buf)[2];
  767. if (j <= 0 || j >= size-sizeof(keysample)) keysample = 0;
  768. else keysample = *(unsigned long long *) (buf+2*sizeof(int)+j);
  769. s = &(pcinfo->s);
  770. if (keysample > 0)
  771. {
  772. if(s->maxKeySample < keysample)
  773. {
  774. //the increasement should not be larger than 1000 seconds!
  775. if(s->maxKeySample == 0 || (keysample-s->maxKeySample)/10000000 < 1000)
  776. {
  777. s->maxKeySample = keysample;
  778. } else
  779. PDEBUG("Error Keysample at block %d,(%lld,%lld). n", id, keysample, s->maxKeySample);
  780. }
  781. if (s->minKeySample == 0 || keysample < s->minKeySample)
  782. {
  783. s->minKeySample = keysample;
  784. }
  785. lrec.id = id;
  786. lrec.keysample = keysample/10000000;
  787. fwrite (&lrec, sizeof (lrec), 1, pcinfo->keyfile);
  788. fflush (pcinfo->keyfile);
  789. }
  790. if (id < s->minBlockID || s->maxBlockID == 0)
  791. {
  792. s->minBlockID = id;
  793. }
  794. if (id > s->maxBlockID)
  795. {
  796. s->maxBlockID = id;
  797. }
  798. if (pcinfo->dataSource != p)
  799. {
  800. PDEBUG ("dataSource %p is not equal p %p.n", pcinfo->dataSource, p);
  801. pcinfo->dataSource = p;
  802. }
  803. fflush (c->db);
  804. pcinfo->total ++;
  805. pcinfo->maxID ++;
  806. pcinfo->numblocks ++;
  807. c->downsize += size;
  808. if (pcinfo->numblocks >= pcinfo->max_queue)
  809. {
  810. while ((j = newChannelFile (c)) == 0);
  811. if (j < 0) return -1;
  812. c->db = pcinfo->input[pcinfo->numinput-1];
  813. }
  814. return size;
  815. }
  816. int free_livechannel (struct Channel *p)
  817. {
  818. char buffer[MAX_LINE];
  819. struct LiveChannelInfo *pcinfo = p->pcinfo;
  820. int i;
  821. send_all_spupdate (p, NULL);
  822. for (i=0; i<pcinfo->numinput; i++)
  823. {
  824. if (pcinfo->input[i]) fclose (pcinfo->input[i]);
  825. if (!pcinfo->isSave)
  826. {
  827. snprintf (buffer, MAX_LINE, "%s/%d", p->fname, i);
  828. unlink (buffer);
  829. }
  830. }
  831. p->db = NULL;
  832. if (pcinfo->keyfile != NULL) fclose (pcinfo->keyfile);
  833. if (!pcinfo->isSave)
  834. {
  835. snprintf (buffer, MAX_LINE, "%s/keysample", p->fname);
  836. unlink (buffer);
  837. rmdir (p->fname);
  838. }
  839. if (pcinfo->dataSource != NULL)
  840. {
  841. pcinfo->dataSource->pc = NULL;
  842. Clientclosure (pcinfo->dataSource - TRACKER[TYPE_CS].head, TYPE_CS);
  843. pcinfo->dataSource = NULL;
  844. }
  845. if (pcinfo->mlist)
  846. {
  847. timer_remove (p, NULL);
  848. for (i=0; i<pcinfo->max_channel; i++)
  849. {
  850. if (pcinfo->mlist->m_lists[i] != NULL)
  851. {
  852. pcinfo->mlist->m_lists[i]->ref --;
  853. if (pcinfo->mlist->m_lists[i]->ref <= 0)
  854. freeProgram (pcinfo->mlist->m_lists[i], NULL);
  855. }
  856. }
  857. free (pcinfo->mlist);
  858. }
  859. freeMedia (p);
  860. #ifdef TEST
  861. for (i=0; i<MAX_TS; i++)
  862. {
  863. sprintf (buffer, "%s/%s/%s.gtv", WWW_ROOT, NET_NAME[i], p->channel_name);
  864. remove (buffer);
  865. }
  866. #endif
  867. return 0;
  868. }
  869. #endif
  870. struct Channel *newLiveChannel (char *name, struct Session *source, char *cmd5, float bitrate, int maxblocksize)
  871. {
  872. int id;
  873. struct Channel *p;
  874. struct LiveChannelInfo *pcinfo;
  875. if (NumNewChannel >= MAX_CHANNEL) return (struct Channel *)0;
  876. p = (struct Channel *)calloc (sizeof (struct Channel), 1);
  877. memcpy (p->channel_md5, cmd5, MD5_LEN);
  878. if (name) strncpy (p->channel_name, name, sizeof (p->channel_name));
  879. p->channel_md5[MD5_LEN] = 0;
  880. p->upsize = 0;
  881. p->downsize = 0;
  882. p->maxblocksize = maxblocksize;
  883. p->ctime = time (NULL);
  884. p->pcinfo = (struct LiveChannelInfo *)calloc (sizeof (struct LiveChannelInfo), 1);
  885. pcinfo = p->pcinfo;
  886. pcinfo->dataSource = source;
  887. pcinfo->bitrate = bitrate;
  888. buildLivePath (p->fname, CHNLURL_LEN, cmd5);
  889. if (init_livechannel (p) < 0)
  890. {
  891. PDEBUG ("newLiveChannel error for %p.", p);
  892. free_livechannel (p);
  893. free (pcinfo);
  894. free (p);
  895. return (struct Channel *)0;
  896. }
  897. id = hash_str (p->channel_md5, MD5_LEN);
  898. PDEBUG("newLiveChannel hash %.32s(fname=%s) to %d.n", p->channel_md5, p->fname, id);
  899. p->next = ChannelHash[id];
  900. ChannelHash[id] = p;
  901. p->lnext = ChannelList;
  902. ChannelList = p;
  903. NumNewChannel ++;
  904. return p;
  905. }