livechannel.c
资源名称:p2p_vod.rar [点击查看]
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:25k
源码类别:
P2P编程
开发平台:
Visual C++
/*
 * Openmysee
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 */
- #include "echo.h"
- int NumNewChannel; /* channels currently registered: ++ in newLiveChannel/newMPListChannel, -- in freeChannel (via freeLiveChannel) */
- struct Channel *ChannelHash[MAX_CHANNEL]; /* bucket heads indexed by hash_str(channel_md5); entries chained through ->next */
- struct Channel *ChannelList; /* head of the list of all registered channels, chained through ->lnext */
- extern char *PREFIX; /* first path component used when building channel file paths */
- extern struct ServerDesc TRACKER[MAX_TYPE]; /* per-type server descriptors; .head is the base used to turn a session pointer into an index */
- extern int Clientclosure (int listnum, int type); /* called here with (session index, server type) to drop a channel's data source */
- inline int hash_str (unsigned char *str, int len)
- {
- int hash;
- for (hash=0; len; len--, str++)
- hash += (hash << 5) - hash + (*str);
- return hash & (MAX_CHANNEL - 1);
- }
- struct Channel *getChannel (struct Channel **phash, char *name, int len)
- {
- int id = hash_str (name, len);
- struct Channel *p;
- for (p=phash[id]; p; p=p->next)
- {
- if (strncmp (name, p->channel_md5, len) == 0)
- return p;
- }
- PDEBUG("Cannot findChannel hash %.32s.n", name);
- return NULL;
- }
- void apply_hash (struct Channel **phash, void apply(struct Channel *, void *), void *p)
- {
- int i;
- struct Channel *pc, *nextpc;
- for (i=0; i<MAX_CHANNEL; i++)
- {
- for (pc=phash[i]; pc; pc=nextpc)
- {
- nextpc = pc->next;
- apply (pc, p);
- }
- }
- }
- void apply_list (struct Channel *plist, void apply(struct Channel *, void *), void *p)
- {
- struct Channel *pc, *nextpc;
- for (pc=plist; pc; pc=nextpc)
- {
- nextpc = pc->lnext;
- apply (pc, p);
- }
- }
- int freeChannel (struct Channel **phash, struct Channel **plist, int *count, struct Channel *p)
- {
- int id = hash_str (p->channel_md5, MD5_LEN);
- struct Edge *nextedge, *pedge;
- struct Channel *pchannel;
- if (phash[id] == p)
- {
- phash[id] = p->next;
- } else
- {
- for (pchannel=phash[id]; pchannel; pchannel=pchannel->next)
- {
- if (pchannel->next == p)
- {
- pchannel->next = p->next;
- break;
- }
- }
- if (!pchannel) return -1;
- }
- if ((*plist) == p)
- {
- *plist = p->lnext;
- } else
- {
- for (pchannel=*plist; pchannel; pchannel=pchannel->lnext)
- {
- if (pchannel->lnext == p)
- {
- pchannel->lnext = p->lnext;
- break;
- }
- }
- if (!pchannel) return -1;
- }
- (*count) --;
- #ifdef __CP_SOURCE
- if (p->db != NULL) fclose (p->db);
- #endif
- if (p->pcinfo)
- {
- free_livechannel (p);
- free (p->pcinfo);
- }
- for (pedge=p->PeerHead; pedge; pedge=nextedge)
- {
- p->numclient --;
- nextedge = pedge->cnext;
- delEdge (pedge);
- }
- free (p);
- return 0;
- }
- inline struct Channel *findChannel (char *name, int len)
- {
- return getChannel (ChannelHash, name, len);
- }
- void freeLiveChannel (struct Channel *pc, void *p)
- {
- freeChannel (ChannelHash, &ChannelList, &NumNewChannel, pc);
- }
- void freeAllChannel ()
- {
- apply_hash (ChannelHash, freeLiveChannel, NULL);
- }
- inline void buildLivePath (char *buf, int len, char *md5)
- {
- snprintf (buf, len, "%s/%s%.2s/", PREFIX, LIVE_PREFIX, md5);
- mkdir (buf, 0777);
- strcat (buf, md5);
- }
- #ifdef __CP_SOURCE
- int locate_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
- {
- int i, *msg;
- struct LiveChannelInfo *c = pc->pcinfo;
- if (c == NULL)
- return -1;
- if (c->indisk == NULL)
- {
- c->max_queue = MAX_QUEUE;
- c->indisk = calloc (c->max_queue, 1);
- c->bitflag = calloc ((c->max_queue+7)/8, 1);
- // if (c->indisk == NULL || c->bitflag == NULL)
- PDEBUG ("allocate memory,%p,%pn",c->indisk, c->bitflag);
- return -1;
- }
- i = id % c->max_queue;
- if (c->indisk[i] == 0 || (pc->type != T_PLIST && c->indisk[i] != (id/c->max_queue + 1)))
- {
- PINFO ("empty flag %d.n", c->indisk[i]);
- return -1;
- }
- if (pc->maxblocksize + 2*sizeof(int) > max)
- {
- PDEBUG ("too small buffer %d for %d", max, pc->maxblocksize);
- return -2;
- }
- if (fseeko (pc->db, ((off_t)(i)) * (pc->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
- {
- PDEBUG ("fseek failed.n");
- return -1;
- }
- if ((i=fread (buf, 1, pc->maxblocksize+2*sizeof (int), pc->db)) <= 2*sizeof (int) || i < ((int *)buf)[1]+2*sizeof(int))
- {
- PDEBUG ("Only %d read [%d]n", i, ((int *)buf)[1]);
- return -1;
- }
- msg = (int *)buf;
- if (pc->type == T_PLIST)
- msg[0] = id;
- if (msg[0] != id || msg[1] > pc->maxblocksize || msg[1] <= 0)
- {
- PDEBUG ("Message read is [%d %d]n", msg[0], msg[1]);
- return -1;
- }
- pc->upsize += msg[1];
- return msg[1];
- }
- int saveBlock (struct Channel *c, char *buf, struct Session *p)
- {
- unsigned int pos, id, size;
- struct LiveChannelInfo *pcinfo;
- assert (buf);
- if ((!c) || (pcinfo = c->pcinfo) == NULL)
- {
- PDEBUG ("saveBlock c is null.n");
- return -1;
- }
- if (pcinfo->indisk == NULL)
- {
- pcinfo->max_queue = MAX_QUEUE;
- pcinfo->indisk = calloc (pcinfo->max_queue, 1);
- pcinfo->bitflag = calloc ((pcinfo->max_queue+7)/8, 1);
- if (pcinfo->indisk == NULL || pcinfo->bitflag == NULL)
- return -1;
- }
- id = ((unsigned int *)buf)[0];
- size = ((unsigned int *)buf)[1];
- pos = id % pcinfo->max_queue;
- if (id > 0) clrBit (pcinfo->bitflag, pos);
- if (size > MAX_BLOCK_SIZE || size <= MIN_BLOCK_SIZE || id < 0)
- {
- PDEBUG ("saveBlock:size is %d and id is %d.n", size, id);
- return 0;
- }
- if (c->maxblocksize == 0)
- c->maxblocksize = size;
- else if (size > c->maxblocksize)
- {
- PDEBUG ("saveBlock:maxblocksize is %d, size is %d and id is %d.n", c->maxblocksize, size, id);
- return 0;
- }
- // PDEBUG ("Recv %d(%d) Save2 %d, dataSource %p and now is %pn", (int)id, (int)size, (int)pos, pcinfo->dataSource, p);
- if (fseeko (c->db, ((off_t)(pos)) * (c->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
- {
- PDEBUG ("Error in fsseko.n");
- return -1;
- }
- if (fwrite (buf, size+2*sizeof(int), 1, c->db) != 1)
- {
- PDEBUG ("fwrite error in saveBlock:%.32s:%s", c->channel_md5, c->fname);
- return -1;
- }
- if (pcinfo->dataSource != p)
- {
- PDEBUG ("dataSource %p is not equal p %p.n", pcinfo->dataSource, p);
- pcinfo->dataSource = p;
- }
- fflush (c->db);
- pcinfo->total ++;
- c->downsize += size;
- if (id > pcinfo->maxID) pcinfo->maxID = id;
- pcinfo->indisk[pos] = (id/pcinfo->max_queue) + 1;
- if (pcinfo->indisk[pos] == 0)
- PDEBUG("Too large id %ds", id);
- return size;
- }
- int init_livechannel (struct Channel *p)
- {
- if ((p->db = fopen (p->fname, "w+")) == (FILE *)0)
- return -1;
- p->pcinfo->isSave = 0;
- return 0;
- }
- int free_livechannel (struct Channel *p)
- {
- struct LiveChannelInfo *pcinfo = p->pcinfo;
- free (pcinfo->indisk);
- free (pcinfo->bitflag);
- unlink (p->fname);
- if (pcinfo->dataSource != NULL)
- {
- pcinfo->dataSource->pc = NULL;
- PDEBUG ("close CS source for channel %.32s.n", p->channel_md5);
- Clientclosure (pcinfo->dataSource-TRACKER[TYPE_P2PC].head, TYPE_P2PC);
- pcinfo->dataSource = NULL;
- }
- freeMedia (p);
- return 0;
- }
- #endif
- #ifdef __SP_SOURCE
- extern time_t CurrentTime; /* time value used below for timer scheduling and update stamps */
- extern char *NET_NAME[]; /* only used below in TEST builds, as a directory component of the .gtv path */
- extern char *WWW_ROOT; /* only used below in TEST builds, as the root of the .gtv path */
- extern char *defaultspip; /* combined with a playlist file name ("%s_%s") and hashed to form the channel MD5 */
- int Changed = 1; /* bumped by hup_handler(), reset to 0 at the end of check_newplist() */
- extern int timer_add (unsigned int t, TimerFunc process, void *entity, void *data); /* used below to schedule send_mplist_spupdate */
- extern int writeMessage (struct Session *p, struct Channel *pc, char *ptr); /* used below to send a built message to one peer */
- extern int buildGTV (struct Channel *pc, int datalen, char *data, int type); /* only called below in TEST builds */
- extern inline void freeProgram (struct Channel *, void *); /* called below when a playlist member's ref count drops to 0 */
- extern void send_all_spupdate (struct Channel *pc, struct SPUpdate *s); /* called with a NULL update from free_livechannel() */
- void hup_handler (int sig)
- {
- Changed ++;
- }
- inline void buildPListPath (char *buf, int len, char *md5)
- {
- snprintf (buf, len, "%s/%s/", PREFIX, PLIST_PREFIX);
- strcat (buf, md5);
- }
- FILE * open_keyfile (struct Channel *p)
- {
- char buffer[MAX_DATA];
- struct stat stbuf;
- if (p == NULL) return NULL;
- snprintf (buffer, MAX_LINE, "%s/keysample", p->fname);
- if (stat (buffer, &stbuf) == 0)
- {
- if (!S_ISREG (stbuf.st_mode))
- {
- PDEBUG ("File %s exist and not a regular file", buffer);
- return NULL;
- }
- }
- return fopen (buffer, "r");
- }
- int send_mplist_spupdate (struct Channel *pc, void * data)
- {
- int iddiff, prev;
- char *buf, buffer[MAX_DATA];
- struct Edge *pedge;
- struct LiveChannelInfo *pcinfo;
- struct SPUpdate s;
- struct logrec lrec;
- time_t slot;
- if (pc == NULL || pc->pcinfo == NULL || pc->pcinfo->mlist == NULL
- || pc->pcinfo->maxID == 0)
- {
- PDEBUG ("Wrong playlist data.n");
- return -1;
- }
- pcinfo = pc->pcinfo;
- if (pcinfo->s.maxBlockID == 0)
- {
- fseek (pcinfo->keyfile, 0, SEEK_SET);
- if (fread (&lrec, 1, sizeof(struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
- {
- PDEBUG ("Cannot read keysamplen");
- return -1;
- }
- pcinfo->s.minBlockID = (((CurrentTime - FIX_MAGIC ) * 16 + pcinfo->maxID - 1) / pcinfo->maxID) * pcinfo->maxID;
- pcinfo->s.maxBlockID = lrec.id;
- if (pcinfo->s.maxKeySample == 0)
- pcinfo->s.maxKeySample = lrec.keysample;
- if (pcinfo->s.minKeySample == 0)
- pcinfo->s.minKeySample = CurrentTime;
- // pcinfo->s.minKeySample = lrec.keysample;
- }
- s = pcinfo->s;
- s.maxBlockID += s.minBlockID;
- s.maxKeySample = ((long long)CurrentTime) * 10000000;
- s.minKeySample = ((long long)(s.minKeySample)) * 10000000;
- if (pcinfo->updated < CurrentTime && pc->numclient > 0)
- {
- pcinfo->updated = CurrentTime;
- buf = buffer + sizeof (int);
- *(unsigned char *) buf = P2P_SPUPDATE;
- buf += sizeof (char);
- memcpy (buf, pc->channel_md5, MD5_LEN);
- buf += MD5_LEN;
- memcpy (buf, &s, sizeof (struct SPUpdate));
- buf += sizeof (struct SPUpdate);
- *(int *) buffer = buf - buffer;
- for (pedge = pc->PeerHead; pedge; pedge = pedge->cnext)
- {
- if (writeMessage (pedge->me, pc, buffer) < 0)
- {
- PDEBUG ("send SPUPDATE err.n");
- }
- }
- PINFO ("Send spupdate in %d, %d.n", (int)(pcinfo->s.maxKeySample), (int)(pcinfo->s.minKeySample));
- }
- if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
- {
- fclose (pcinfo->keyfile);
- prev = pcinfo->mlist->m_cursampleid;
- pcinfo->mlist->m_cursampleid ++;
- if (pcinfo->mlist->m_cursampleid >= pcinfo->mlist->m_totalchannel)
- pcinfo->mlist->m_cursampleid = 0;
- if ((pcinfo->keyfile = open_keyfile (pcinfo->mlist->m_lists[pcinfo->mlist->m_cursampleid])) == NULL)
- {
- PDEBUG ("Error in new keysample filen");
- return -1;
- }
- fseek (pcinfo->keyfile, SEEK_SET, 0);
- if (fread (&lrec, 1, sizeof (struct logrec), pcinfo->keyfile) != sizeof (struct logrec))
- {
- PDEBUG ("Cannot read keysamplen");
- return -1;
- }
- iddiff = pcinfo->mlist->m_lists[prev]->pcinfo->maxID + lrec.id - pcinfo->mlist->m_lastmaxid;
- slot = lrec.keysample;
- pcinfo->s.maxKeySample = lrec.keysample;
- } else
- {
- if ((slot = lrec.keysample - pcinfo->s.maxKeySample) < 0)
- slot = 0;
- iddiff = lrec.id - pcinfo->mlist->m_lastmaxid;
- pcinfo->s.maxKeySample += slot;
- }
- pcinfo->mlist->m_lastmaxid = lrec.id;
- pcinfo->s.maxBlockID += iddiff;
- timer_add (CurrentTime+slot, (TimerFunc)send_mplist_spupdate, pc, NULL);
- return 0;
- }
- int init_mplistchannel (struct Channel *p)
- {
- struct LiveChannelInfo *pcinfo = p->pcinfo;
- if (p->maxblocksize == 0) p->maxblocksize = DEFAULT_BLOCK;
- pcinfo->max_queue = BLOCK_PER_FILE;
- if ((pcinfo->keyfile = open_keyfile (pcinfo->mlist->m_lists[0])) == NULL)
- {
- PDEBUG ("Error in new keysample filen");
- return -1;
- }
- pcinfo->total = 0;
- pcinfo->isSave = 0;
- timer_add (CurrentTime, (TimerFunc)send_mplist_spupdate, p, NULL);
- return 0;
- }
- struct Channel *newMPListChannel (char *name, char *cmd5, float bitrate, int maxblocksize, int nchannel, struct Channel **pchannel)
- {
- int i, id, startID=0;
- struct Channel *p;
- struct LiveChannelInfo *pcinfo;
- if (NumNewChannel >= MAX_CHANNEL) return (struct Channel *)0;
- p = (struct Channel *)calloc (sizeof (struct Channel), 1);
- memcpy (p->channel_md5, cmd5, MD5_LEN);
- if (name) strncpy (p->channel_name, name, sizeof (p->channel_name));
- p->channel_md5[MD5_LEN] = 0;
- p->upsize = 0;
- p->downsize = 0;
- p->maxblocksize = maxblocksize;
- p->ctime = time (NULL);
- p->pcinfo = (struct LiveChannelInfo *)calloc (sizeof (struct LiveChannelInfo), 1);
- pcinfo = p->pcinfo;
- pcinfo->bitrate = bitrate;
- pcinfo->mlist = (struct MList *)calloc (sizeof (struct MList), 1);
- pcinfo->mlist->m_totalchannel = nchannel;
- pcinfo->media = calloc (nchannel, sizeof (struct MediaData));
- pcinfo->max_channel = nchannel;
- for (i=0; i<nchannel; i++)
- {
- pchannel[i]->ref ++;
- pcinfo->mlist->m_lists[i] = pchannel[i];
- pcinfo->mlist->m_startID[i] = startID;
- pcinfo->maxID += pchannel[i]->pcinfo->maxID;
- addMedia (p, startID, pchannel[i]->pcinfo->maxID, pchannel[i]->pcinfo->media[0].dlen, pchannel[i]->pcinfo->media[0].data, pchannel[i]->channel_name);
- startID += pchannel[i]->pcinfo->maxID;
- }
- if (init_mplistchannel (p) < 0)
- {
- PDEBUG ("newPlistChannel error for %p.", p);
- free_livechannel (p);
- free (pcinfo);
- free (p);
- return (struct Channel *)0;
- }
- id = hash_str (p->channel_md5, MD5_LEN);
- PDEBUG("newMPlistChannel hash %.32s(fname=%s) to %d.n", p->channel_md5, p->fname, id);
- p->next = ChannelHash[id];
- ChannelHash[id] = p;
- p->lnext = ChannelList;
- ChannelList = p;
- NumNewChannel ++;
- return p;
- }
- struct Channel *add_mplist_channel (char *buffer, char *md5)
- {
- int i=0;
- FILE *in;
- struct stat stbuf;
- struct Channel *pc=NULL, *pchannel[MAX_FILEINPUT];
- char *data=NULL, cname[MAX_DATA], buf[MAX_DATA];
- float bitrate=0.0;
- int bsize=16384, dlen=0;
- if (stat (buffer, &stbuf) < 0) return NULL;
- if (S_ISREG (stbuf.st_mode))
- {
- if ((in=fopen (buffer, "r")) == NULL)
- {
- PDEBUG ("Error in open file %s.n", buffer);
- return NULL;
- }
- while (fgets (buf, MAX_DATA, in))
- {
- switch (i)
- {
- case 0:
- bitrate = atof (buf);
- i++;
- break;
- case 1:
- if (buf[strlen(buf)-1] == 'r' || buf[strlen(buf)-1] == 'n')
- buf[strlen(buf)-1] = 0;
- strncpy (cname, buf, sizeof(cname));
- i++;
- break;
- default:
- buf[MD5_LEN] = 0;
- if ((pc = getProgrambymd5 (buf, MD5_LEN)) != NULL)
- {
- pchannel[i-2] = pc;
- i++;
- }
- break;
- }
- }
- fclose (in);
- if (i <= 2) return NULL;
- dlen = pchannel[0]->pcinfo->media[0].dlen;
- data = pchannel[0]->pcinfo->media[0].data;
- // snprintf (chname, MAX_DATA, "%s/%s/%s.mediadata", WWW_ROOT, NET_NAME[0], pchannel[0]->channel_name);
- // data = read_file (chname, &dlen);
- if ((pc=newMPListChannel (cname, md5, bitrate, bsize, i-2, pchannel)) == NULL)
- {
- PDEBUG ("Error in newMPListChannel %s.n", md5);
- // free (data);
- return NULL;
- }
- if (pc != NULL)
- strncpy (pc->fname, buffer, CHNLURL_LEN);
- #ifdef TEST
- for (i=0; i<MAX_TS; i++) {
- if (dlen <= 0 || data == NULL || buildGTV (pc, dlen, data, i) < 0)
- continue;
- }
- #endif
- // free (data);
- }
- return pc;
- }
- void apply_update (struct Channel *p, void *arg)
- {
- struct stat stbuf;
- // char buffer[MAX_DATA];
- struct LiveChannelInfo *pc = p->pcinfo;
- if (pc && pc->mlist != NULL)
- {
- if (stat (p->fname, &stbuf) != 0)
- pc->status = 1;
- }
- }
- int check_newplist ()
- {
- int i;
- struct Channel *pc;
- DIR *entry;
- struct dirent *pd;
- char buffer[MAX_DATA];
- char tmp0[MD5_LEN+1], tmp[MD5_LEN+1];
- // if (Changed == 0) return 0;
- snprintf (buffer, MAX_DATA, "%s/%s/", PREFIX, PLIST_PREFIX);
- if ((entry = opendir (buffer)) == NULL)
- {
- return -1;
- }
- while ((pd = readdir (entry)) != NULL)
- {
- if (strcmp (pd->d_name, ".") == 0 || strcmp (pd->d_name, "..") == 0 || strlen (pd->d_name) != MD5_LEN)
- continue;
- snprintf (buffer, MAX_DATA, "%s_%s", defaultspip, pd->d_name);
- md5_calc ((unsigned char *) tmp0,
- (unsigned char *) buffer, strlen (buffer));
- for (i = 0; i < MD5_LEN; i += 2)
- sprintf (tmp + i, "%02x", (unsigned char) tmp0[i / 2]);
- tmp[MD5_LEN] = 0;
- if ((pc = findChannel (tmp, strlen (tmp))) != NULL)
- {
- if (pc->pcinfo != NULL) pc->pcinfo->status = 0;
- continue;
- }
- snprintf (buffer, MAX_DATA, "%s/%s/%s", PREFIX, PLIST_PREFIX, pd->d_name);
- add_mplist_channel (buffer, tmp);
- }
- closedir (entry);
- apply_list (ChannelList, apply_update, NULL);
- Changed = 0;
- return 0;
- }
- int locate_mplist_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
- {
- int result;
- int i, j;
- struct LiveChannelInfo *c = pc->pcinfo;
- if (c == NULL)
- {
- PDEBUG ("c is null.n");
- return -1;
- }
- i = id % c->maxID;
- for (j=0; j<pc->pcinfo->mlist->m_totalchannel; j++)
- if (pc->pcinfo->mlist->m_startID[j] > i) break;
- j--;
- if (j < 0)
- {
- PDEBUG ("Internal error in locate id %d.n", id);
- return -1;
- }
- if ((result = locateprog_by_id (pc->pcinfo->mlist->m_lists[j], i-pc->pcinfo->mlist->m_startID[j], buf, max)) > 0)
- ((int *)buf)[0] = id;
- return result;
- }
- // Return 1 to indicate write available, return 0 to indicate now writable.
- inline int newChannelFile (struct Channel *p)
- {
- int result;
- struct stat stbuf;
- char buffer[MAX_LINE];
- struct LiveChannelInfo *pcinfo = p->pcinfo;
- if (pcinfo->numinput >= MAX_FILEINPUT)
- {
- PDEBUG ("Max file input has been reached, %s:%d.n", p->fname,pcinfo->numinput);
- return -1;
- }
- snprintf (buffer, MAX_LINE, "%s/%d", p->fname, pcinfo->numinput);
- if (stat (buffer, &stbuf) == 0)
- {
- if (stbuf.st_size / (p->maxblocksize+2*sizeof (int)) < pcinfo->max_queue)
- {
- if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "a+")) == NULL)
- {
- PDEBUG ("Cannot open file %s.n", buffer);
- return -1;
- }
- result = 1;
- } else
- {
- if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "r+")) == NULL)
- {
- PDEBUG ("Cannot open file %s.n", buffer);
- return -1;
- }
- result = 0;
- }
- pcinfo->numblocks = stbuf.st_size /(p->maxblocksize+2*sizeof (int));
- pcinfo->maxID += stbuf.st_size / (p->maxblocksize+2*sizeof(int));
- if (stbuf.st_size % (p->maxblocksize + 2*sizeof(int))) return -1;
- } else
- {
- if ((pcinfo->input[pcinfo->numinput] = fopen (buffer, "w+")) == NULL)
- {
- PDEBUG ("Cannot open file %s.n", buffer);
- return -1;
- }
- pcinfo->numblocks = 0;
- result = 1;
- }
- pcinfo->numinput ++;
- return result;
- }
- int init_livechannel (struct Channel *p)
- {
- int i;
- struct stat stbuf;
- char buffer[MAX_LINE];
- struct LiveChannelInfo *pcinfo = p->pcinfo;
- if (stat (p->fname, &stbuf) == 0 && (!S_ISDIR (stbuf.st_mode)))
- {
- PDEBUG ("directory %s not exist or not a dir.n", p->fname);
- return -1;
- }
- mkdir (p->fname, 0777);
- if (p->maxblocksize == 0) p->maxblocksize = DEFAULT_BLOCK;
- pcinfo->max_queue = BLOCK_PER_FILE;
- while ((i = newChannelFile (p)) == 0);
- if (i < 0) return -1;
- p->db = pcinfo->input[pcinfo->numinput-1];
- snprintf (buffer, MAX_LINE, "%s/keysample", p->fname);
- if (stat (buffer, &stbuf) == 0)
- {
- if (!S_ISREG (stbuf.st_mode))
- {
- PDEBUG ("File %s exist and not a regular file", buffer);
- return -1;
- }
- }
- pcinfo->keyfile = fopen (buffer, "a+");
- if (pcinfo->keyfile == NULL)
- {
- PDEBUG ("File %s can not be opened.n", buffer);
- return -1;
- }
- pcinfo->total = 0;
- return 0;
- }
- int locate_by_id (struct Channel *pc, unsigned int id, char *buf, int max)
- {
- int i, pos, *msg;
- struct LiveChannelInfo *c = pc->pcinfo;
- if (c == NULL || id > c->maxID)
- {
- PDEBUG ("c is %p and id is (%d,%d).n", c, c->maxID, id);
- return -1;
- }
- if (pc->maxblocksize + 2*sizeof(int) > max)
- {
- PDEBUG ("too small buffer %d for %d", max, pc->maxblocksize);
- return -2;
- }
- i = (id - c->startid) % c->max_queue;
- pos = (id - c->startid) / c->max_queue;
- if (pos >= c->numinput || c->input[pos] == NULL)
- {
- PDEBUG ("file %d does not exist. (%d,%p)n", pos, c->numinput, c->input[pos]);
- return -1;
- }
- if (fseeko (c->input[pos], ((off_t)(i)) * (pc->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
- {
- PDEBUG ("Fssek failed. (%d, %d, %d)n", pos, i, pc->maxblocksize);
- return -1;
- }
- if ((i=fread (buf, 1, pc->maxblocksize+2*sizeof (int), c->input[pos])) <= 2*sizeof (int) || i < ((int *)buf)[1]+2*sizeof(int))
- {
- PDEBUG ("Fread failed. (%d, %d, %d)n", pos, i, pc->maxblocksize);
- return -1;
- }
- msg = (int *)buf;
- if (msg[1] > pc->maxblocksize || msg[1] <= 0)
- {
- PDEBUG ("msg format error. (%d, %d, %d)n", msg[0], msg[1], pc->maxblocksize);
- return -1;
- }
- PINFO ("Found block. (%d, %d, %d, %d)n", id, msg[0], msg[1], pc->maxblocksize);
- // msg[0] = id;
- pc->upsize += msg[1];
- return msg[1];
- }
- int saveBlock (struct Channel *c, char *buf, struct Session *p)
- {
- struct logrec lrec;
- int id, size;
- int j=0;
- unsigned long long keysample;
- struct SPUpdate *s;
- struct LiveChannelInfo *pcinfo;
- if ((!c) || (pcinfo = c->pcinfo) == NULL)
- {
- PDEBUG ("saveBlock c is null.n");
- return -1;
- }
- assert (buf);
- size = ((int *)buf)[1];
- if (size > MAX_BLOCK_SIZE || size <= MIN_BLOCK_SIZE)
- {
- PDEBUG ("saveBlock size is %d and id is %d.n", size, id);
- return 0;
- }
- if (c->maxblocksize == 0)
- {
- c->maxblocksize = size;
- pcinfo->max_queue = BLOCK_PER_FILE;
- }
- else if (size > c->maxblocksize)
- {
- PDEBUG ("saveBlock maxblocksize is %d, size is %d and id is %d.n", c->maxblocksize, size, id);
- return 0;
- }
- if (pcinfo->max_queue == 0)
- {
- PDEBUG ("c->max_queue is 0n");
- return -1;
- }
- // PDEBUG ("Recv %d(%d) Save2 (%d,%d,%d), dataSource %p and now is %pn", ((int*)buf)[0], (int)size, pcinfo->numinput, pcinfo->numblocks, pcinfo->maxID, pcinfo->dataSource, p);
- id = pcinfo->maxID;
- ((int *)buf)[0] = id;
- if (fseeko (c->db, ((off_t)(pcinfo->numblocks)) * (c->maxblocksize + 2*sizeof (int)), SEEK_SET) != 0)
- {
- PDEBUG ("Error in fsseko.n");
- return -1;
- }
- if (fwrite (buf, c->maxblocksize+2*sizeof(int), 1, c->db) != 1)
- {
- PDEBUG ("fwrite error in saveBlock:%.32s:%s", c->channel_md5, c->fname);
- return -1;
- }
- j = ((int *)buf)[2];
- if (j <= 0 || j >= size-sizeof(keysample)) keysample = 0;
- else keysample = *(unsigned long long *) (buf+2*sizeof(int)+j);
- s = &(pcinfo->s);
- if (keysample > 0)
- {
- if(s->maxKeySample < keysample)
- {
- //the increasement should not be larger than 1000 seconds!
- if(s->maxKeySample == 0 || (keysample-s->maxKeySample)/10000000 < 1000)
- {
- s->maxKeySample = keysample;
- } else
- PDEBUG("Error Keysample at block %d,(%lld,%lld). n", id, keysample, s->maxKeySample);
- }
- if (s->minKeySample == 0 || keysample < s->minKeySample)
- {
- s->minKeySample = keysample;
- }
- lrec.id = id;
- lrec.keysample = keysample/10000000;
- fwrite (&lrec, sizeof (lrec), 1, pcinfo->keyfile);
- fflush (pcinfo->keyfile);
- }
- if (id < s->minBlockID || s->maxBlockID == 0)
- {
- s->minBlockID = id;
- }
- if (id > s->maxBlockID)
- {
- s->maxBlockID = id;
- }
- if (pcinfo->dataSource != p)
- {
- PDEBUG ("dataSource %p is not equal p %p.n", pcinfo->dataSource, p);
- pcinfo->dataSource = p;
- }
- fflush (c->db);
- pcinfo->total ++;
- pcinfo->maxID ++;
- pcinfo->numblocks ++;
- c->downsize += size;
- if (pcinfo->numblocks >= pcinfo->max_queue)
- {
- while ((j = newChannelFile (c)) == 0);
- if (j < 0) return -1;
- c->db = pcinfo->input[pcinfo->numinput-1];
- }
- return size;
- }
- int free_livechannel (struct Channel *p)
- {
- char buffer[MAX_LINE];
- struct LiveChannelInfo *pcinfo = p->pcinfo;
- int i;
- send_all_spupdate (p, NULL);
- for (i=0; i<pcinfo->numinput; i++)
- {
- if (pcinfo->input[i]) fclose (pcinfo->input[i]);
- if (!pcinfo->isSave)
- {
- snprintf (buffer, MAX_LINE, "%s/%d", p->fname, i);
- unlink (buffer);
- }
- }
- p->db = NULL;
- if (pcinfo->keyfile != NULL) fclose (pcinfo->keyfile);
- if (!pcinfo->isSave)
- {
- snprintf (buffer, MAX_LINE, "%s/keysample", p->fname);
- unlink (buffer);
- rmdir (p->fname);
- }
- if (pcinfo->dataSource != NULL)
- {
- pcinfo->dataSource->pc = NULL;
- Clientclosure (pcinfo->dataSource - TRACKER[TYPE_CS].head, TYPE_CS);
- pcinfo->dataSource = NULL;
- }
- if (pcinfo->mlist)
- {
- timer_remove (p, NULL);
- for (i=0; i<pcinfo->max_channel; i++)
- {
- if (pcinfo->mlist->m_lists[i] != NULL)
- {
- pcinfo->mlist->m_lists[i]->ref --;
- if (pcinfo->mlist->m_lists[i]->ref <= 0)
- freeProgram (pcinfo->mlist->m_lists[i], NULL);
- }
- }
- free (pcinfo->mlist);
- }
- freeMedia (p);
- #ifdef TEST
- for (i=0; i<MAX_TS; i++)
- {
- sprintf (buffer, "%s/%s/%s.gtv", WWW_ROOT, NET_NAME[i], p->channel_name);
- remove (buffer);
- }
- #endif
- return 0;
- }
- #endif
- struct Channel *newLiveChannel (char *name, struct Session *source, char *cmd5, float bitrate, int maxblocksize)
- {
- int id;
- struct Channel *p;
- struct LiveChannelInfo *pcinfo;
- if (NumNewChannel >= MAX_CHANNEL) return (struct Channel *)0;
- p = (struct Channel *)calloc (sizeof (struct Channel), 1);
- memcpy (p->channel_md5, cmd5, MD5_LEN);
- if (name) strncpy (p->channel_name, name, sizeof (p->channel_name));
- p->channel_md5[MD5_LEN] = 0;
- p->upsize = 0;
- p->downsize = 0;
- p->maxblocksize = maxblocksize;
- p->ctime = time (NULL);
- p->pcinfo = (struct LiveChannelInfo *)calloc (sizeof (struct LiveChannelInfo), 1);
- pcinfo = p->pcinfo;
- pcinfo->dataSource = source;
- pcinfo->bitrate = bitrate;
- buildLivePath (p->fname, CHNLURL_LEN, cmd5);
- if (init_livechannel (p) < 0)
- {
- PDEBUG ("newLiveChannel error for %p.", p);
- free_livechannel (p);
- free (pcinfo);
- free (p);
- return (struct Channel *)0;
- }
- id = hash_str (p->channel_md5, MD5_LEN);
- PDEBUG("newLiveChannel hash %.32s(fname=%s) to %d.n", p->channel_md5, p->fname, id);
- p->next = ChannelHash[id];
- ChannelHash[id] = p;
- p->lnext = ChannelList;
- ChannelList = p;
- NumNewChannel ++;
- return p;
- }