CPnew.c
资源名称:p2p_vod.rar [点击查看]
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:35k
源码类别:
P2P编程
开发平台:
Visual C++
- /*
- * Openmysee
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- */
- #include "echo.h"
- #define TYPE_NP 0
- #define TYPE_CP 1
- #define TYPE_SCP 2
- #define TYPE_ECP 3
- #define TYPE_GCP 4
- #define MEDIATYPE_FIRST 0.149
- #define MAX_IDLE 90 /* destruct a connection to sp/np after 90s */
- //#define P2PS_PORT 50002
- #define TS4CP_PORT 22168
- #define REQUEST_AHEAD 15 /* Request how many blocks if a block doesn't exist in cache */
- int MAX_P2PS=500;
- int MAX_P2PC=512;
- int MAX_JOB_PER_SESSION=3;
- //char *CONFIG = "./acp.cfg";
- char *PREFIX="/data/cp/";
- char *SCP_CHANNEL;
- char *ECP_REGION;
- int isGCP;
- char *SERVERIP;
- char *BINDIP;
- char *PIDFile="/var/run/cpnew.pid";
- int MAX_BANDWIDTH;
- char *AUTH_MD5;
- int AUTH_USERID;
- fd_set osocks;
- int BINDALL;
- #ifdef HAVE_TS
- int TSSOCK = 0;
- //#define RANDOM_PORT 3947
- #endif
- time_t CurrentTime;
- time_t startTime;
- int SnapShotInterval;
- int NearPeerInterval = 30;
- int isSet = 0; /* whether TS has returned WELCOME message */
- struct TSMessage UDPMsg;
- extern struct Channel *ChannelHash[MAX_CHANNEL];
- extern struct Channel *ChannelList;
- struct sockaddr_in TSADDR;
- socklen_t addrlen = sizeof (struct sockaddr_in);
- // calculate avg & cur speed
- long long totalDownBytes=0, totalUpBytes=0;
- long long tmpDownBytes=0, tmpUpBytes=0;
- struct ServerDesc TRACKER[MAX_TYPE];
- char *LOGXML;
- extern int errno;
- int JobHighWater = 10000;
- int MaxNPPerChannel = 300;
- int cfgP2PS_PORT = 23;
- int cfgCP2TS_PORT = CP2TS_PORT;
- struct NamVal ConfigParameters[]
- =
- {
- {"Prefix", &PREFIX, 's'},
- {"MAX_NP", &MAX_P2PS, 'd'},
- {"MAX_SP", &MAX_P2PC, 'd'},
- {"Pidfile", &PIDFile, 's'},
- {"TrackerIP", &SERVERIP, 's'},
- {"SCP", &SCP_CHANNEL, 's'},
- {"GCP", &isGCP, 'd'},
- {"authid", &AUTH_USERID, 'd'},
- {"authmd5", &AUTH_MD5, 's'},
- {"ECP", &ECP_REGION, 's'},
- {"BandWidth", &MAX_BANDWIDTH, 'd'},
- {"BindIP", &BINDIP, 's'},
- {"SnapShotInterval",&SnapShotInterval,'d'},
- {"CP4NP_PORT", &cfgP2PS_PORT, 'd'},
- {"CP2TS_PORT", &cfgCP2TS_PORT, 'd'},
- {"LogFilePath", &LOGXML, 's'},
- {"NearPeerInterval", &NearPeerInterval, 'd'},
- {"JobHighWater", &JobHighWater, 'd'},
- {"MaxNPPerChannel", &MaxNPPerChannel, 'd'},
- {"BINDALL", &BINDALL, 'd'}
- };
- int register_cp ();
- int init_cp ();
- int handle_new_connection(int sock, int type);
- int Clientclosure (int listnum, int type);
- void process_child (void);
- int init_P2PS (int listnum);
- int process_P2PS (int listnum);
- int closure_P2PS (int listnum);
- int init_P2PC (int listnum);
- int process_P2PC (int listnum);
- int closure_P2PC (int listnum);
- int sendMessage (int sock, char *ptr, struct sockaddr_in *dest);
- int process_TS2CP_PEERS (char *buf);
- void process_type (int type, fd_set *socks, fd_set *wsocks, fd_set *esocks);
- int reconnect (struct Session *p);
- char *parseECP (char *str, char *buf);
- int closure_TS ();
- int periodCheck (float KBPSused);
- void makeSnapShot(int count, int time_interval);
- int send_nearpeers (struct Channel *pc, struct Edge *pme);
- int send_nearpeers_toall (struct Channel *pc);
- #ifdef HAVE_TS
- int process_TS();
- #endif
- int send_P2P_PUSHLIST (struct Channel *pc, int id);
- int period_process (void);
- extern char *getJobBuffer (struct JobDes *p, int *max);
- extern inline void setblockId (struct JobDes *pj, int id);
- #include "sessions.c"
- #define INIT_MAXQ(pc,s,maxq) do
- {
- if (s->maxBlockID == 0) return -1;
- if (s->maxBlockID >= MAX_QUEUE) maxq = MAX_QUEUE;
- else maxq = s->maxBlockID;
- if (pc->pcinfo->indisk == NULL)
- {
- pc->pcinfo->max_queue = maxq;
- pc->pcinfo->indisk = calloc (maxq, 1);
- pc->pcinfo->bitflag = calloc ((maxq+7)/8, 1);
- if (pc->pcinfo->indisk == NULL || pc->pcinfo->bitflag == NULL)
- return -1;
- } else if (maxq < pc->pcinfo->max_queue)
- {
- pc->pcinfo->max_queue = maxq;
- pc->pcinfo->indisk = realloc (pc->pcinfo->indisk, maxq);
- pc->pcinfo->bitflag = realloc (pc->pcinfo->bitflag, (maxq+7)/8);
- if (pc->pcinfo->indisk == NULL || pc->pcinfo->bitflag == NULL)
- return -1;
- }
- } while (0)
- int period_process (void)
- {
- static time_t last_snapshot;
- static int snapCount = 0;
- if (CurrentTime - last_snapshot > SnapShotInterval)
- {
- makeSnapShot(snapCount++, CurrentTime-last_snapshot);
- system("/usr/bin/vmstat -a >> cp.log 2>&1 &");
- last_snapshot = CurrentTime;
- }
- return 0;
- }
- int main(int argc, char **argv)
- {
- int i, mode = 1, tmp = 0;
- if (argc < 2)
- {
- printf("usage: %s mode(0 for daemon, 1 for console).n", argv[0]);
- return -1;
- }
- signal (SIGPIPE, SIG_IGN);
- signal (SIGINT, terminate);
- mode = atoi (argv[1]);
- if (mode == 0)
- daemon(1,1);
- read_config (CONFIG, ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));
- for(i = 0 ; i < argc; ++i)
- {
- if(strncmp(argv[i], "tcp=", 4) == 0)
- {
- tmp = atoi(argv[i]+4);
- if(tmp > 0 && tmp < 65535)
- cfgP2PS_PORT = tmp;
- }
- else if(strncmp(argv[i], "udp=", 4) == 0)
- {
- tmp = atoi (argv[i]+4);
- if(tmp > 0 && tmp < 65535)
- cfgCP2TS_PORT = tmp;
- }
- }
- for (i=0; i<10 && IN_LOOP > 0; i++)
- {
- /*
- pid_t pid;
- if ((pid = fork ()) == 0)
- {
- */
- FD_ZERO(&osocks);
- if (init_cp () < 0)// || initLOG () < 0)
- {
- PDEBUG ("init_cp error, exit...n");
- exit (-1);
- }
- process_child ();
- /*
- } else if (pid < 0)
- {
- perror ("fork");
- exit (pid);
- } else
- {
- waitpid (pid, NULL, 0);
- }
- */
- }
- return 0;
- }
- int init_P2PS (int listnum)
- {
- return 0;
- }
- int process_P2P_HELLO (struct Session *p, struct Message *m)
- {
- struct Edge *pedge = NULL;
- float version = *(float *)(m->buffer);
- char *buf = m->buffer + sizeof (float);
- struct Channel *pc;
- struct LiveChannelInfo *pcinfo;
- int listnum;
- // char *buf, buffer[MAX_DATA];
- PINFO ("RECV P2P_HELLO. n");
- p->version = version;
- listnum = p - TRACKER[TYPE_P2PS].head;
- if ((pc = findChannel (buf, MD5_LEN)) == NULL)
- {
- if ((pc = newLiveChannel (buf, NULL, buf, 0, 0)) != (struct Channel *)0)
- {
- pedge=newEdge (pc, p);
- pc->numclient ++;
- } else
- {
- Clientclosure (listnum, TYPE_P2PS);
- return -1;
- }
- p->pc = pc;
- } else
- {
- if (pc->numclient > MaxNPPerChannel)
- {
- Clientclosure (listnum, TYPE_P2PS);
- return -1;
- }
- for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
- if (pedge == NULL)
- {
- pedge=newEdge (pc, p);
- pc->numclient ++;
- }
- p->pc = pc;
- if (pc->pcinfo->dataSource)
- {
- /*
- // connect exist, send p2p_hello to check channel state.
- buf = buffer+sizeof (int);
- *(unsigned char *)buf = P2P_HELLO;
- buf += sizeof (char);
- memcpy (buf, pc->channel_md5, MD5_LEN);
- buf += MD5_LEN;
- *(unsigned char *)buf = 0;
- buf += sizeof (char);
- *(int *)buffer = buf - buffer;
- if (writeMessage (pc->pcinfo->dataSource, buffer) < 0)
- {
- Clientclosure (listnum, TYPE_P2PC);
- return -1;
- }
- PDEBUG("sent P2P_HELLO to SP. n");
- */
- buf += MD5_LEN + sizeof (char);
- memcpy (&(p->addr), buf, sizeof (p->addr));
- if (pedge) send_nearpeers (pc, pedge);
- return 0;
- }
- PDEBUG("NO connection to SP. try connect.n");
- }
- buf += MD5_LEN + sizeof (char);
- memcpy (&(p->addr), buf, sizeof (p->addr));
- buf += sizeof (p->addr);
- pcinfo = pc->pcinfo;
- pcinfo->numofsp = (unsigned char)*buf;
- buf += sizeof (char);
- if (pcinfo->numofsp == 0 || pcinfo->numofsp > MAX_REPSP)
- {
- Clientclosure (listnum, TYPE_P2PS);
- return -1;
- }
- memcpy (&(pcinfo->SPLIST), buf, pcinfo->numofsp*sizeof(struct NormalAddress));
- buf += pcinfo->numofsp*sizeof (struct NormalAddress);
- if (buf - m->buffer + NORMAL_HEADER > m->len)
- {
- PDEBUG ("Invalid message %d, length %d not enoughn", m->type, m->len);
- Clientclosure (listnum, TYPE_P2PS);
- return -1;
- }
- if (isGCP || SCP_CHANNEL)
- {
- if ((listnum = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
- {
- PDEBUG("Unable connect to SP.n");
- Clientclosure (p-TRACKER[TYPE_P2PS].head, TYPE_P2PS);
- return -1;
- }
- } else /* ECP, need peers */
- {
- #ifdef HAVE_TS
- UDPMsg.type = CP2TS_NEED_PEERS;
- UDPMsg.len = 12+MD5_LEN;
- memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
- /* isSet whether TS has returned WELCOME message */
- if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
- {
- PDEBUG ("exit...n");
- exit (1);
- }
- #endif
- }
- if (pedge) send_nearpeers (pc, pedge);
- return 0;
- }
- int process_P2P_MEDIATYPE (int listnum, struct Message *m)
- {
- struct Channel *pc;
- char *buf, *media, *name, *channel_name;
- int start, length, size, proglen, chnllen;
- buf = m->buffer;
- if ((pc = findChannel (buf, MD5_LEN)) == NULL)
- return -1;
- buf += MD5_LEN;
- start = *(int *)buf;
- buf += sizeof (int);
- length = *(int *)buf;
- buf += sizeof (int);
- size = *(int *)buf;
- buf += sizeof (int);
- media = buf;
- buf += size;
- proglen = *(unsigned char *)buf;
- buf += sizeof (char);
- name = buf;
- buf += proglen;
- buf += sizeof (int);
- chnllen = *(unsigned char *)buf;
- buf += sizeof (char);
- channel_name = buf;
- buf += chnllen;
- if (buf - (char *)m > m->len)
- return -1;
- addMedia (pc, start, length, size, media, name, channel_name);
- return 0;
- }
- int process_P2P_PUSHLIST (struct Session *p, struct Message *m)
- {
- struct Channel *pc;
- struct Edge *pedge;
- char *buf;
- int i, type, listnum, size;
- listnum = p - TRACKER[TYPE_P2PS].head;
- buf = m->buffer;
- if (p->npcp == TYPE_CP)
- {
- if ((pc = findChannel (buf, MD5_LEN)) == NULL)
- {
- if ((pc = newLiveChannel (m->buffer, NULL, m->buffer, 0, 0)) != (struct Channel *)0)
- {
- pedge=newEdge (pc, p);
- pc->numclient ++;
- }
- } else
- {
- for (pedge=p->header; pedge && pedge->head != pc; pedge=pedge->enext);
- if (pedge == NULL)
- {
- pedge=newEdge (pc, p);
- pc->numclient ++;
- }
- }
- buf += MD5_LEN;
- } else
- {
- pc = p->pc;
- }
- if (pc == NULL && p->npcp != TYPE_CP)
- {
- Clientclosure (listnum, TYPE_P2PS);
- return -1;
- }
- if (p->numjob >= MAX_JOB_PER_SESSION)
- return -2;
- type = *(unsigned char *)buf;
- buf += sizeof (char);
- size = *(unsigned char *)buf;
- buf += sizeof (char);
- if (type)
- {
- deleteChannel (p, pc);
- for (i=0; i<size; i++)
- if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
- return -1;
- } else
- {
- for (i=0; i<size; i++)
- if (process_P2P_REQUEST_real (p, pc, ((unsigned int *)buf)[i]) < 0)
- return -1;
- buf += size*sizeof(int);
- size = *(unsigned char *)buf;
- buf += sizeof (char);
- deleteJob (p, pc, (unsigned int *)buf, size);
- }
- if (buf - m->buffer + NORMAL_HEADER > m->len)
- {
- PDEBUG ("Invalid message %d, length %d not enoughn", m->type, m->len);
- Clientclosure (listnum, TYPE_P2PS);
- return -1;
- }
- return 0;
- }
- int process_P2P_REQUEST_real (struct Session *p, struct Channel *pc, int id)
- {
- struct JobDes *pj = newJob ();
- char *buf, *buffer;
- struct LiveChannelInfo *pcinfo;
- int size, max, i;
- buffer = getJobBuffer (pj, &max);
- buf = buffer + sizeof (int);
- *(unsigned char *)buf = P2P_RESPONSE;
- buf += sizeof (char);
- if (p->npcp == TYPE_CP)
- {
- memcpy (buf, pc->channel_md5, MD5_LEN);
- buf += MD5_LEN;
- }
- pcinfo = pc->pcinfo;
- if (p->first == 0)
- {
- p->first ++;
- if (p->version >= MEDIATYPE_FIRST)
- sendIdMedia (p, pc, id, 0);
- }
- if (pcinfo->dataSource == NULL)
- {
- *(int *)buf = id;
- buf += sizeof (int);
- *(int *)buf = 0;
- buf += sizeof (int);
- if (isGCP || SCP_CHANNEL)
- {
- if ((i = newChannel (pc, pcinfo->SPLIST, pcinfo->numofsp, 0)) < 0)
- freeLiveChannel (pc, NULL);
- } else
- {
- #ifdef HAVE_TS
- UDPMsg.type = CP2TS_NEED_PEERS;
- UDPMsg.len = 12+MD5_LEN;
- memcpy (UDPMsg.buffer, pc->channel_md5, MD5_LEN);
- /* isSet whether TS has returned WELCOME message */
- if (isSet && sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
- {
- PDEBUG ("exit...n");
- exit (1);
- }
- #endif
- }
- } else if ((size=locate_by_id (pc, id, buf, max-32)) > 0)
- {
- buf += 2*sizeof (int) + size;
- if (p->version >= MEDIATYPE_FIRST && (i=isHit (pc, id)) >= 0)
- sendHitMedia (p, pc, i, id, 0);
- p->last_transferblock = CurrentTime;
- } else if (size == -2)
- {
- assert (0);
- PDEBUG ("Leave block %d to next round.n", id);
- return -1;
- } else if (id >= 0)
- {
- *(int *)buf = id;
- buf += sizeof (int);
- *(int *)buf = 0;
- buf += sizeof (int);
- PINFO ("no block %dn", id);
- send_P2P_PUSHLIST (pc, id);
- }
- *(int *)buffer = buf - buffer;
- setblockId(pj, id);
- writeDATAMessage(p,pc, pj);
- // PDEBUG ("Write block %dn", id);
- return 0;
- }
- int process_P2PS (int listnum)
- {
- struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
- struct Message *m = (struct Message *)(p->buf+p->start);
- tmpDownBytes += m->len;
- switch (m->type)
- {
- case P2P_HELLO:
- if (process_P2P_HELLO (p, m) == -2)
- return -2;
- break;
- case P2P_PUSHLIST:
- if (process_P2P_PUSHLIST (p, m) == -2)
- return -2;
- break;
- case P2P_REPORT: /* At present no action */
- break;
- case P2P_MSG:
- break;
- case P2P_SPUPDATE:
- break;
- case P2P_RESPONSE:
- break;
- case P2P_NEAR_PEERS:
- break;
- case P2P_REQMEDIA:
- sendIdMedia (p, p->pc, *(int *)(m->buffer), 0);
- break;
- default:
- PDEBUG ("Unknown message format from clientn");
- Clientclosure (listnum, TYPE_P2PS);
- return -1;
- }
- return 0;
- }
- int closure_P2PS (int listnum)
- {
- struct Session *p = &(TRACKER[TYPE_P2PS].head[listnum]);
- // struct Channel *pc = p->pc;
- struct Edge *pedge, *prevedge;
- PDEBUG ("CP disconnected from %d.%d.%d.%d:%d.n", IPADDR (p->host), p->port);
- for (pedge=p->header; pedge; pedge=prevedge)
- {
- pedge->head->numclient --;
- prevedge=pedge->enext;
- delEdge (pedge);
- }
- FD_CLR(p->socket, &osocks);
- close (p->socket);
- FREE (p->buf);
- deleteAll (p);
- memset (p, 0, sizeof (struct Session));
- return 0;
- }
- int init_P2PC (int listnum)
- {
- return 0;
- }
- int newChannel (struct Channel *pc, struct NormalAddress *client, int n, int flag)
- {
- struct Session *p;
- int listnum, newconn = -1, max, sock_flag;
- struct Session *head;
- struct sockaddr_in addr;
- char *buf, buffer[MAX_DATA];
- if (pc == NULL) return -1;
- head = TRACKER[TYPE_P2PC].head;
- max = TRACKER[TYPE_P2PC].max;
- for (listnum = 0; listnum < max; listnum ++)
- {
- if(head[listnum].socket == 0)
- {
- if ((newconn = socket (PF_INET, SOCK_STREAM, 0)) < 0)
- {
- perror ("socket||gethostbyname");
- return -1;
- }
- memset (&addr, 0, sizeof (addr));
- addr.sin_port = client[0].sin_port;
- addr.sin_addr = client[0].sin_addr;
- addr.sin_family = AF_INET;
- if ((sock_flag = connect_nonb(newconn, &addr, sizeof (addr))) == -1)
- {
- close (newconn);
- return -1;
- }
- head[listnum].socket = newconn;
- head[listnum].type = TYPE_P2PC;
- head[listnum].flag = 1;
- head[listnum].sock_flag = sock_flag;
- head[listnum].buf = NEW();
- head[listnum].pc = pc;
- head[listnum].time_sec = CurrentTime;
- head[listnum].totalup = 0;
- head[listnum].last_transferblock = CurrentTime;
- FD_SET(newconn, &osocks);
- if (listnum > TRACKER[TYPE_P2PC].maxid)
- TRACKER[TYPE_P2PC].maxid = listnum;
- break;
- }
- }
- if (listnum >= max)
- {
- PDEBUG ("no space left for new incoming client.");
- close (newconn);
- return -1;
- }
- TRACKER[TYPE_P2PC].cur ++;
- (*(TRACKER[TYPE_P2PC].init)) (listnum);
- p = &(TRACKER[TYPE_P2PC].head[listnum]);
- pc->pcinfo->dataSource = p;
- pc->upsize = 0;
- pc->downsize = 0;
- PDEBUG("Connect to %s:%d. and send P2P_HELLO.n", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
- buf = buffer+sizeof (int);
- *(unsigned char *)buf = P2P_HELLO;
- buf += sizeof (char);
- memcpy (buf, pc->channel_md5, MD5_LEN);
- buf += MD5_LEN;
- *(unsigned char *)buf = 0;
- buf += sizeof (char);
- if (flag == TYPE_CP)
- {
- *(unsigned char *)buf = pc->pcinfo->numofsp;
- buf += sizeof (char);
- if (pc->pcinfo->numofsp)
- {
- memcpy (buf, &(pc->pcinfo->SPLIST), pc->pcinfo->numofsp*sizeof (struct NormalAddress));
- buf += pc->pcinfo->numofsp*sizeof (struct NormalAddress);
- }
- }
- *(int *)buffer = buf - buffer;
- if (writeMessage (p, pc, buffer) < 0)
- {
- perror ("CP: write SP");
- Clientclosure (listnum, TYPE_P2PC);
- return -1;
- }
- return listnum;
- }
- int process_P2P_SPUPDATE (int listnum, struct Message *m)
- {
- int maxq;
- struct Edge *pedge;
- char buffer[MAX_DATA];
- struct Channel *pc;
- struct SPUpdate *s = (struct SPUpdate *)(m->buffer+MD5_LEN);
- int bShouldCloseSP = 0;
- if (m->len < sizeof (struct SPUpdate) + MD5_LEN + 5)
- {
- PDEBUG ("Invalid message %d, length %d not enoughn", m->type, m->len);
- Clientclosure (listnum, TYPE_P2PC);
- return -1;
- }
- *(int *)buffer = sizeof (struct SPUpdate) + sizeof(int) + sizeof (char);
- *(unsigned char *)(buffer + sizeof (int)) = P2P_SPUPDATE;
- PINFO ("Recv SPUPDATE(%lld,%lld,%u,%u).n", s->minKeySample, s->maxKeySample, s->minBlockID, s->maxBlockID);
- memcpy (buffer+sizeof(int)+sizeof(char), s, sizeof(struct SPUpdate));
- if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL) return -1;
- if (s->minBlockID == 0xffffffff && s->maxBlockID == 0xffffffff)
- {
- pc->pcinfo->dataSource = NULL; /* indication of end */
- if(s->minKeySample == -1 && s->maxKeySample == -1)
- PDEBUG("NO SUCH RESOURCE!n"); // no such resource
- else if(s->minKeySample == 0 && s->maxKeySample == 0)
- PDEBUG("CHANNEL HAS BEEN CLOSED!n");// channel has been closed
- else
- PDEBUG("UNKNOWN MESSAGE! 1n"); // unknown message
- bShouldCloseSP = 1; // should close connection
- } else
- {
- if(s->minBlockID == 0 && s->maxBlockID == 0 && s->minKeySample == 0 && s->maxKeySample == 0)
- {
- PDEBUG("END OF CHANNEL!n"); // end of channel
- bShouldCloseSP = 1;
- } else if (s->minKeySample == -1ULL && s->maxKeySample == -1ULL)
- {
- INIT_MAXQ(pc,s,maxq);
- return 0;
- } else if (s->minKeySample == -2LL)
- {
- INIT_MAXQ(pc,s,maxq);
- pc->type = T_PLIST;
- pc->pcinfo->max_channel = (int)(s->maxKeySample);
- if (pc->pcinfo->max_channel <= 0 || pc->pcinfo->max_channel >= MAX_FILEINPUT)
- return -1;
- if (pc->pcinfo->media != NULL)
- freeMedia (pc);
- pc->pcinfo->media = calloc (sizeof (struct MediaData), pc->pcinfo->max_channel);
- return 0;
- } else if (s->minKeySample == -3LL)
- {
- pc->pcinfo->max_channel = 1;
- if (pc->pcinfo->media != NULL)
- freeMedia (pc);
- pc->pcinfo->media = calloc (sizeof (struct MediaData), 1);
- } else
- {
- memcpy (&(pc->pcinfo->s), s, sizeof (struct SPUpdate));
- // request block after spupdate, not wait!
- // now, block will be sent automaticlly by SP
- // send_P2P_PUSHLIST (pc, s->maxBlockID);
- }
- }
- {
- int i = 0;
- unsigned char vcode = 0;
- for (i = 0; i < sizeof (struct SPUpdate); ++i) {
- vcode += ((unsigned char*)s)[i];
- }
- buffer[*(int*)buffer] = vcode;
- ++*(int*)buffer;
- }
- for (pedge=pc->PeerHead; pedge; pedge=pedge->cnext)
- {
- if (pedge->me->npcp == TYPE_CP)
- {
- // PDEBUG ("Send SPUPDATE to CP %d.n", pedge->me-TRACKER[TYPE_P2PC].head);
- writeMessage (pedge->me, pc, (char *)m);
- } else
- {
- // PDEBUG ("Send SPUPDATE to NP %d.n", pedge->me-TRACKER[TYPE_P2PS].head);
- writeMessage (pedge->me, pc, buffer);
- }
- }
- if(bShouldCloseSP != 0 || pedge == pc->PeerHead/* no NP*/)
- return -1; // Close Connection to SP
- return 0;
- }
- int process_P2P_RESPONSE (int listnum, struct Message *m)
- {
- int size;
- struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
- struct Channel *pc;
- char *msg = m->buffer+MD5_LEN;
- if ((pc=findChannel (m->buffer, MD5_LEN)) == NULL)
- return -1;
- if ((size = saveBlock (pc, msg, p)) <= 0)
- {
- PDEBUG ("save block error, size %d, %dn", size, listnum);
- return -1;
- // Clientclosure (listnum, TYPE_P2PC);
- }
- p->last_transferblock = CurrentTime;
- return 0;
- }
- int process_P2PC (int listnum)
- {
- struct Session *p = &(TRACKER[TYPE_P2PC].head[listnum]);
- struct Message *m = (struct Message *)(p->buf+p->start);
- tmpDownBytes += m->len;
- switch (m->type)
- {
- case P2P_SPUPDATE:
- if(process_P2P_SPUPDATE (listnum, m) < 0)
- {
- Clientclosure(listnum, TYPE_P2PC);
- }
- break;
- case P2P_RESPONSE:
- process_P2P_RESPONSE (listnum, m);
- break;
- case P2P_MSG:
- break;
- case P2P_MEDIATYPE:
- process_P2P_MEDIATYPE (listnum, m);
- break;
- default:
- PDEBUG("Err msg type from SP.n");
- Clientclosure (listnum, TYPE_P2PC);
- return -1;
- }
- return 0;
- }
- int closure_P2PC (int listnum)
- {
- // struct Edge *pedge, *prevedge;
- // char buffer[MAX_DATA], *buf;
- struct Session *p=&(TRACKER[TYPE_P2PC].head[listnum]);
- struct Channel *pc = p->pc;
- if (pc)
- {
- /*
- buf = buffer + sizeof (int);
- *(unsigned char *)buf = P2P_SPUPDATE;
- buf += sizeof (char);
- memcpy (buf, pc->channel_md5, MD5_LEN);
- buf += MD5_LEN;
- memset (buf, 0, sizeof (struct SPUpdate));
- buf += sizeof (struct SPUpdate);
- *(int *)buffer = buf - buffer;
- for (pedge=pc->PeerHead; pedge; pedge=prevedge)
- {
- pc->numclient --;
- writeMessage (pedge->me, buffer);
- prevedge = pedge->cnext;
- delEdge (pedge);
- }
- */
- pc->pcinfo->dataSource = NULL;
- }
- PDEBUG ("SP disconnected from %d.%d.%d.%d:%d.n", IPADDR (p->host), p->port);
- FD_CLR(p->socket, &osocks);
- close (p->socket);
- FREE (p->buf);
- deleteAll (p);
- memset (p, 0, sizeof (struct Session));
- return 0;
- }
- #ifdef HAVE_TS
- int register_cp () //send UDP msg
- {
- const int max_times = 10;
- int i;
- char *buf;
- char buffer[MAX_DATA];
- isSet = 0;
- if (TSSOCK == 0)
- {
- for (i=0; i<max_times; i++)
- {
- if (BINDALL == 0)
- TSSOCK = init_udp (BINDIP, cfgCP2TS_PORT);
- else
- TSSOCK = init_udp (NULL, cfgCP2TS_PORT);
- if (TSSOCK > 0) break;
- PDEBUG("Sleep 1000. cause init UDP port %d failed.", cfgCP2TS_PORT);
- sleep (1000);
- }
- if (TSSOCK <= 0)
- {
- PDEBUG ("exit...n");
- exit (1); //the max times try init_udp failure
- }
- }
- buf = buffer + sizeof (int);
- *(unsigned char *)buf = CP2TS_REGISTER;
- buf += sizeof (char);
- *(int *)buf = AUTH_USERID;
- buf += sizeof (int);
- strncpy (buf, AUTH_MD5, MD5_LEN);
- buf += MD5_LEN;
- *(unsigned short *)buf = htons (cfgP2PS_PORT);
- buf += sizeof (short);
- if (SCP_CHANNEL) //Now only GCP is available
- {
- *buf = CT_SPECIFIED_RES;
- buf += sizeof (char);
- memcpy (buf, SCP_CHANNEL, strlen(SCP_CHANNEL)+1);
- buf += strlen (SCP_CHANNEL)+1;
- } else if (ECP_REGION)
- {
- *buf = CT_EDGE;
- buf += sizeof (char);
- buf = parseECP (ECP_REGION, buf);
- } else
- {
- *buf = CT_GENERAL;
- buf += sizeof (char);
- }
- *(int *)buffer = buf - buffer; //the size of buffer;
- TSADDR.sin_port = htons (TS4CP_PORT);
- TSADDR.sin_addr.s_addr = inet_addr (SERVERIP);
- TSADDR.sin_family = AF_INET;
- if (sendMessage (TSSOCK, buffer, &TSADDR) < 0) //send register msg
- {
- PDEBUG ("Cannot write to servern");
- return -1;
- }
- return 0;
- }
- #endif
- int init_cp ()
- {
- FILE *pidf;
- struct rlimit rl;
- char buffer[MAX_DATA];
- rl.rlim_cur = rl.rlim_max = 1000000;
- if (setrlimit (RLIMIT_NOFILE, &rl) != 0)
- {
- perror ("getrlimit");
- }
- OPENLOG;
- #ifdef DEBUG
- system ("ulimit -a");
- if (getrlimit (RLIMIT_CORE, &rl) != 0)
- {
- perror ("getrlimit");
- }
- fprintf (stderr, "Get core limit %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
- rl.rlim_cur = rl.rlim_max = (rlim_t )102400;
- if (setrlimit (RLIMIT_CORE, &rl) != 0)
- {
- perror ("getrlimit");
- }
- if (getrlimit (RLIMIT_CORE, &rl) != 0)
- {
- perror ("getrlimit");
- }
- fprintf (stderr, "Set core limit to %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
- system ("ulimit -a");
- #endif
- #ifdef HAVE_TS
- if (register_cp () < 0)
- {
- PDEBUG ("Cannot init TS connectionn");
- return -1;
- }
- #endif
- TRACKER[TYPE_P2PS].flag = FLAG_SERVER;
- TRACKER[TYPE_P2PS].type = TYPE_P2PS;
- TRACKER[TYPE_P2PS].port = cfgP2PS_PORT;
- TRACKER[TYPE_P2PS].cur = 0;
- TRACKER[TYPE_P2PS].max = MAX_P2PS;
- TRACKER[TYPE_P2PS].init = init_P2PS;
- TRACKER[TYPE_P2PS].process = process_P2PS;
- TRACKER[TYPE_P2PS].closure = closure_P2PS;
- TRACKER[TYPE_P2PS].head = calloc (sizeof (struct Session), TRACKER[TYPE_P2PS].max);
- switch (BINDALL)
- {
- case 0:
- if ((TRACKER[TYPE_P2PS].sock = init_server (BINDIP, cfgP2PS_PORT)) < 0)
- return -1;
- break;
- default:
- if ((TRACKER[TYPE_P2PS].sock = init_server (NULL, cfgP2PS_PORT)) < 0)
- return -1;
- break;
- }
- FD_SET(TRACKER[TYPE_P2PS].sock, &osocks);
- TRACKER[TYPE_P2PC].flag = FLAG_CLIENT;
- TRACKER[TYPE_P2PC].type = TYPE_P2PC;
- TRACKER[TYPE_P2PC].port = 0;
- TRACKER[TYPE_P2PC].cur = 0;
- TRACKER[TYPE_P2PC].max = MAX_P2PC;
- TRACKER[TYPE_P2PC].init = init_P2PC;
- TRACKER[TYPE_P2PC].process = process_P2PC;
- TRACKER[TYPE_P2PC].closure = closure_P2PC;
- TRACKER[TYPE_P2PC].head = calloc (sizeof (struct Session), TRACKER[TYPE_P2PC].max);
- mkdir (PREFIX, 0777);
- snprintf (buffer, MAX_DATA, "%s/%s", PREFIX, LIVE_PREFIX);
- mkdir (buffer, 0777);
- snprintf (buffer, MAX_DATA, "rm -fr %s/%s/*", PREFIX, LIVE_PREFIX);
- system (buffer);
- if ((pidf = fopen (PIDFile, "w")) == NULL)
- {
- PDEBUG ("Cannot open pidfile.n");
- return -1;
- }
- fprintf (pidf, "%dn", getpid ());
- fclose (pidf);
- return 0;
- }
- #ifdef HAVE_TS
- int process_TS()
- {
- struct sockaddr_in dest;
- int addr_len = sizeof (dest);
- struct Message m;
- int i;
- if ((i = recvfrom (TSSOCK, &m, sizeof (struct Message), 0, (struct sockaddr *)&dest, &addr_len)) <= 0)
- {
- PDEBUG ("Error in recving ts message.n");
- register_cp ();
- return 0;
- }
- switch (m.type)
- {
- case TS2CP_WELCOME:
- memcpy (&UDPMsg, &m, 12);
- PDEBUG("recv WELCOME from TS.n");
- /* isSet whether TS has returned WELCOME message */
- isSet = 1;
- break;
- case TS2CP_PEERS:
- process_TS2CP_PEERS (m.buffer);
- break;
- case TS2CP_MSG:
- if (*(char *)(m.buffer+sizeof(short)))
- {
- PDEBUG ("Error in TS2CP_MSG. n");
- register_cp ();
- }
- break;
- default:
- PDEBUG ("Error in trackerserver message formatn");
- register_cp ();
- return -1;
- }
- return 0;
- }
- int closure_TS ()
- {
- struct TSMessage *m = &UDPMsg;
- m->type = CP2TS_LOGOUT;
- m->len = 12;
- sendMessage (TSSOCK, (char *)m, &TSADDR);
- return 0;
- }
- int process_TS2CP_PEERS (char *buf)
- {
- struct Channel *pc;
- int listnum;
- char *channel_md5;
- unsigned char cpsize;
- struct NormalAddress *CPlist;
- unsigned char peersize;
- struct PeerInfoWithAddr *pinfo;
- channel_md5 = buf;
- buf += MD5_LEN;
- cpsize = *(unsigned char *)buf;
- buf += sizeof (char);
- CPlist = (struct NormalAddress *)buf;
- buf += sizeof(struct NormalAddress)*cpsize;
- peersize = *(unsigned char *)buf;
- buf += sizeof (char);
- pinfo = (struct PeerInfoWithAddr *)buf;
- // now find the channel
- if ((pc = findChannel (channel_md5, MD5_LEN)) == NULL) return -1;
- if ((listnum = newChannel (pc, CPlist, cpsize, TYPE_CP)) < 0)
- return -1;
- return 0;
- }
- #endif
- char *parseECP (char *str, char *buf)
- {
- char *buffer = buf;
- int flag = -1;
- unsigned char c;
- unsigned char part;
- buf += sizeof (int);
- for (part=0; *str ;str++)
- {
- c = *str;
- switch (c)
- {
- case ':':
- flag = 0;
- break;
- case '.':
- if (flag < 2)
- {
- *(unsigned char *)buf = part;
- buf += sizeof (char);
- }
- part = 0;
- flag ++;
- break;
- default:
- if (c >= '0' && c <= '9')
- part = c;
- break;
- }
- }
- *(int *)buffer = (buf - buffer - sizeof (int))/sizeof(short);
- return buf;
- }
- int periodCheck (float KBPSused)
- {
- struct Session *head;
- int max, listnum;
- struct statistics
- {
- unsigned int resnum;
- unsigned short connnum;
- float bandwidth;
- } stat;
- stat.bandwidth = KBPSused/(MAX_BANDWIDTH*1024)/8;
- /* isSet whether TS has returned WELCOME message */
- if (isSet == 0)
- {
- register_cp ();
- return 0;
- }
- max = TRACKER[TYPE_P2PC].maxid + 1;
- head = TRACKER[TYPE_P2PC].head;
- memset (&stat, 0, sizeof(stat));
- for (listnum = 0; listnum < max; listnum++)
- {
- if (head[listnum].socket > 0)
- {
- stat.resnum ++;
- stat.connnum ++;
- }
- }
- max = TRACKER[TYPE_P2PS].maxid + 1;
- head = TRACKER[TYPE_P2PS].head;
- for (listnum = 0; listnum < max; listnum++)
- {
- if (head[listnum].socket > 0)
- {
- if (head[listnum].pc == NULL &&
- head[listnum].header == NULL &&
- CurrentTime - head[listnum].last_transferblock > MAX_TRANSFER_IDLE)
- {
- PDEBUG ("timeout %d from NP %d to %d.n", listnum, head[listnum].last_transferblock, (int)CurrentTime);
- Clientclosure (listnum, TYPE_P2PS);
- }
- else
- stat.connnum ++;
- }
- }
- #ifdef HAVE_TS
- *(unsigned int *)(UDPMsg.buffer) = stat.resnum;
- *(unsigned short *)(UDPMsg.buffer+sizeof(int)) = stat.connnum;
- *(float *)(UDPMsg.buffer+sizeof(int)+sizeof(short)) = stat.bandwidth;
- if(stat.connnum <= MAX_P2PS)
- *(unsigned char *)(UDPMsg.buffer+sizeof(int)+sizeof(short)+sizeof(float)) = 0;
- else
- *(unsigned char *)(UDPMsg.buffer+sizeof(int)+sizeof(short)+sizeof(float)) = 1;
- UDPMsg.len = 23;
- UDPMsg.type = CP2TS_UPDATE;
- if (sendMessage (TSSOCK, (char *)&UDPMsg, &TSADDR) < 0)
- {
- PDEBUG ("exit...n");
- exit (1);
- }
- #endif
- PDEBUG("Res Num: %d. Connection Num: %d. BandWidth Usage: %.4f. n", stat.resnum, stat.connnum, stat.bandwidth);
- return 0;
- }
- void makeSnapShot(int count, int time_interval)
- {
- time_t tmpTime;
- struct tm result;
- struct Channel *pc, *nextpc;
- // struct Session *ps;
- // struct Edge *pe;
- int cpchannelcount = 0;
- int totalclient = 0;
- long long totalupsize = 0, totaldownsize = 0;
- FILE *f;
- char buffer[MAX_DATA];
- if (time_interval <= 0)
- return;
- localtime_r(&CurrentTime, &result);
- sprintf (buffer, "./cp-%d-%d-%d.log", result.tm_year+1900, result.tm_mon+1, result.tm_mday);
- if ((f = fopen(buffer,"a")) == NULL)
- {
- PDEBUG("Couldn't open cp.log file! n");
- return;
- }
- fseeko(f, 0, SEEK_END);
- // 1. start CP SnapShot
- fprintf(f, "nn**************Start %d SnapShot of CP, Time : %u/%u %u:%u:%u.********* n",count,result.tm_mon+1, result.tm_mday, result.tm_hour, result.tm_min, result.tm_sec);
- // 2. log speed
- fprintf(f, "CP: cur Down %.4f KB. n", ((float)tmpDownBytes)/1024/time_interval);
- fprintf(f, "CP: cur Up %.4f KB. n", ((float)tmpUpBytes)/1024/time_interval);
- periodCheck(((float)tmpDownBytes+tmpUpBytes)/1024/time_interval);
- totalDownBytes += tmpDownBytes;
- totalUpBytes += tmpUpBytes;
- tmpTime = CurrentTime - startTime;
- fprintf(f, "CP: avg Down %.4f KB. n", ((float)totalDownBytes)/1024/tmpTime);
- fprintf(f, "CP: avg Up %.4f KB. n", ((float)totalUpBytes)/1024/tmpTime);
- // 3. log channel state
- for (pc=ChannelList; pc; pc=nextpc)
- {
- nextpc = pc->lnext;
- ++cpchannelcount;
- totalclient += pc->numclient;
- totalupsize += pc->upsize;
- totaldownsize += pc->downsize;
- fprintf(f,"Channel %s have %d client. Down size %lld, avg speed %f. Up Size %lld, avg speed %f. n",pc->fname,pc->numclient,pc->downsize, ((float)(pc->downsize)) / time_interval, pc->upsize, ((float)(pc->upsize))/ time_interval);
- /*
- for (pe=pc->PeerHead; pe; pe = pe->cnext)
- {
- // if bitrate < 300kb/s ,then kill it
- if (pe->me->totalup/(CurrentTime - pe->me->time_sec)/1024 < 300)
- fprintf(f,"Session bitrate:%lld .Too slow ! n",pe->me->totalup/(CurrentTime - pe->me->time_sec));
- }
- */
- if (pc->numclient == 0)
- freeLiveChannel (pc, NULL);
- else if (CurrentTime - pc->last_nearpeer > NearPeerInterval)
- {
- send_nearpeers_toall (pc);
- pc->last_nearpeer = CurrentTime;
- }
- }
- fprintf(f,"Channel Count : %d. Total client : %d. Total dowsize: %lld. Total upsize %lld n",cpchannelcount,totalclient,totaldownsize,totalupsize);
- fprintf(f,"n*********************End of SnapShot************************n");
- fclose(f);
- logto_xml (time_interval, tmpTime, cpchannelcount, totalclient);
- tmpDownBytes = tmpUpBytes = 0;
- }
- int reconnect (struct Session *p)
- {
- struct NormalAddress *client;
- struct sockaddr_in addr;
- if (p->pc == NULL || p->pc->pcinfo->numofsp <= p->flag) return -1;
- close (p->socket);
- FD_CLR(p->socket, &osocks);
- if ((p->socket = socket (PF_INET, SOCK_STREAM, 0)) < 0)
- {
- perror ("socket||gethostbyname");
- p->socket = 0;
- return -1;
- }
- client = &(p->pc->pcinfo->SPLIST[p->flag]);
- memset (&addr, 0, sizeof (addr));
- addr.sin_port = client->sin_port;
- addr.sin_addr = client->sin_addr;
- addr.sin_family = AF_INET;
- p->flag ++;
- if ((p->sock_flag = connect_nonb(p->socket, &addr, sizeof (addr))) == -1)
- {
- close (p->socket);
- return -1;
- }
- FD_SET(p->socket, &osocks);
- p->time_sec = CurrentTime;
- p->totalup = 0;
- return p->flag;
- }
- int send_P2P_PUSHLIST (struct Channel *pc, int id)
- {
- unsigned char *ptr;
- char buffer[MAX_DATA], *buf;
- struct LiveChannelInfo *pcinfo = pc->pcinfo;
- int i, j;
- buf = buffer+sizeof (int);
- *(unsigned char *)buf = P2P_PUSHLIST;
- buf += sizeof (char);
- memcpy (buf, pc->channel_md5, MD5_LEN);
- buf += MD5_LEN;
- *(unsigned char *)buf = 0;
- buf += sizeof (char);
- ptr = buf;
- buf += sizeof (char);
- *ptr = 0;
- for (i=id; (i >= pcinfo->s.minBlockID && i <= pcinfo->s.maxBlockID) && i<id+REQUEST_AHEAD; i++)
- {
- j = i % pcinfo->max_queue;
- if (pcinfo->indisk[j] == (i/pcinfo->max_queue+1) || isSet (pcinfo->bitflag, j))
- continue;
- *(int *)buf = i;
- buf += sizeof (int);
- (*ptr) ++;
- setBit (pcinfo->bitflag, j);
- }
- if (*ptr > 0)
- {
- *(unsigned char *)buf = 0;
- buf += sizeof (char);
- *(int *)buffer = buf - buffer;
- writeMessage (pcinfo->dataSource, pc, buffer);
- }
- return 0;
- }
- int send_nearpeers (struct Channel *pc, struct Edge *pme)
- {
- char buffer[MAX_DATA], *buf, *ptr;
- struct Edge *pedge;
- int i;
- buf = buffer+sizeof (int);
- *(unsigned char *)buf = P2P_NEAR_PEERS;
- buf += sizeof (char);
- ptr = buf;
- buf += sizeof (char);
- for (i=0, pedge=pme->cnext; i<MAX_NEARPEER; pedge=pedge->cnext)
- {
- if (pedge == NULL)
- pedge = pc->PeerHead;
- if (pedge == pme)
- break;
- memcpy (buf, &(pedge->me->addr), sizeof (struct PeerInfoWithAddr));
- buf += sizeof (struct PeerInfoWithAddr);
- i++;
- }
- if (i > 0)
- {
- *(unsigned char *)ptr = i;
- *(int *)buffer = buf - buffer;
- writeMessage (pme->me, pc, buffer);
- }
- return 0;
- }
- int send_nearpeers_toall (struct Channel *pc)
- {
- struct Edge *pedge;
- for (pedge=pc->PeerHead; pedge; pedge=pedge->cnext)
- send_nearpeers (pc, pedge);
- return 0;
- }