sessions.c
资源名称:p2p_vod.rar [点击查看]
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:14k
源码类别:
P2P编程
开发平台:
Visual C++
- /*
- * Openmysee
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- */
/* Extra bytes allocated beyond MAX_DATA for each fresh buffer created by NEW(). */
- #define FENCE_DATA 1024
/* Main-loop flag: process_child() keeps running while this is > 0;
 * terminate() decrements it. */
- int IN_LOOP = 1;
/* Head of the singly linked list of recycled buffers that NEW() pops from,
 * FREE() pushes onto and closure_cache() releases. */
- struct cachetype *BufferCacheHead;
- inline char *NEW ()
- {
- char *result;
- if (BufferCacheHead != NULL)
- {
- result = (char *) BufferCacheHead;
- BufferCacheHead = BufferCacheHead->next;
- } else
- {
- result = calloc (1, MAX_DATA+FENCE_DATA);
- }
- assert (result != NULL);
- return result;
- }
- inline void FREE (void *p)
- {
- if (p == NULL) return;
- ((struct cachetype *)p)->next = BufferCacheHead;
- BufferCacheHead = p;
- }
- inline void closure_cache ()
- {
- struct cachetype *p, *nextp;
- for (p=BufferCacheHead; p; p=nextp)
- {
- nextp = p->next;
- free (p);
- }
- BufferCacheHead = NULL;
- }
- void terminate (int sig)
- {
- IN_LOOP --;
- }
- int handle_new_connection(int sock, int type)
- {
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof (struct sockaddr_in);
- int listnum, max, flags;
- struct Session *head;
- #ifdef SO_LINGER
- struct linger ling;
- #endif
- int newconn = accept(sock, (struct sockaddr *)(&addr), &addrlen);
- int keepalive = 1;
- if (newconn < 0)
- {
- perror("accept");
- return -1;
- }
- #ifdef SO_LINGER
- ling.l_onoff = 1;
- ling.l_linger = 0;
- setsockopt (newconn, SOL_SOCKET, SO_LINGER, &ling, sizeof (struct linger));
- #endif
- setsockopt (newconn, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof (keepalive));
- flags = fcntl(newconn, F_GETFL, 0);
- flags |= O_NONBLOCK;
- fcntl(newconn, F_SETFL, flags);
- if (TRACKER[type].cur == TRACKER[type].max)
- {
- char *buf, buffer[MAX_DATA];
- buf = buffer+sizeof (int);
- *(unsigned char *)buf = P2P_MSG;
- buf += sizeof (char);
- *(unsigned short *)buf = ERR_CONNECTION_FULL;
- buf += sizeof (short);
- *(unsigned char *)buf = 1;
- buf += sizeof (char);
- *(unsigned int *)buffer = buf - buffer;
- write (newconn, buffer, *(unsigned int *)buffer);
- close (newconn);
- return -1;
- }
- head = TRACKER[type].head;
- max = TRACKER[type].max;
- for (listnum = 0; listnum < max; listnum ++)
- {
- if(head[listnum].socket == 0)
- {
- head[listnum].socket = newconn;
- head[listnum].type = type;
- getpeername (newconn, (struct sockaddr *)(&addr), &addrlen);
- head[listnum].host = ntohl(addr.sin_addr.s_addr);
- head[listnum].port = ntohs(addr.sin_port);
- head[listnum].buf = NEW ();
- #ifdef __CP_SOURCE
- head[listnum].totalup = 0;
- #endif
- head[listnum].time_sec = CurrentTime;
- head[listnum].last_transferblock = CurrentTime;
- FD_SET(newconn, &osocks);
- if (listnum > TRACKER[type].maxid)
- TRACKER[type].maxid = listnum;
- break;
- }
- }
- if (listnum >= max)
- {
- PDEBUG ("no space left for incoming client type %d.", type);
- close (newconn);
- return -1;
- }
- TRACKER[type].cur ++;
- return (*(TRACKER[type].init)) (listnum);
- }
- int Clientclosure (int listnum, int type)
- {
- (*(TRACKER[type].closure)) (listnum);
- while (TRACKER[type].maxid == listnum && TRACKER[type].head[listnum].socket == 0 && listnum > 0)
- {
- listnum --;
- TRACKER[type].maxid --;
- }
- return (--TRACKER[type].cur);
- }
- void my_exit() __attribute__((noreturn, destructor));
- #ifdef __SP_SOURCE
- extern struct Channel *ProgramHash[MAX_CHANNEL];
- #endif
- extern void freeJobCache ();
- void my_exit (int err)
- {
- int max, listnum, type;
- struct Session *head;
- int (*closure) (int);
- for (type=MAX_TYPE-1; type >=0; type --)
- {
- max = TRACKER[type].maxid+1;
- closure = TRACKER[type].closure;
- if ((head = TRACKER[type].head) != NULL)
- {
- for (listnum=0; listnum<max; listnum++)
- {
- if (head[listnum].socket > 0)
- (*closure) (listnum);
- }
- free (head);
- }
- #ifdef __CP_SOURCE
- if (TRACKER[type].flag == FLAG_SERVER) close (TRACKER[type].sock);
- #endif
- #ifdef __SP_SOURCE
- close (TRACKER[type].sock);
- #endif
- }
- #ifdef __CP_SOURCE
- #ifdef HAVE_TS
- closure_TS ();
- close (TSSOCK);
- #endif
- #endif
- PDEBUG ("exit...n");
- freeAllChannel ();
- #ifdef __SP_SOURCE
- db_end ();
- timer_free ();
- freeAllProgram ();
- freeAllOrder ();
- #endif
- free_config (ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));
- closure_cache ();
- freeJobCache ();
- unlink (PIDFile);
- CLOSELOG;
- exit (err);
- }
- /* check the message in p->buf, wheter is complete */
- inline int checkComplete (struct Session *p)
- {
- int len;
- if (p->socket == 0 || p->off < sizeof (int)+sizeof(char)) return 0;
- len = *(unsigned int *)(p->buf+p->start);
- if (len >= MAX_BLOCK_SIZE) return -1;
- return (p->off >= len?len:0);
- }
/*
 * Copy `len` bytes from `src` to `dst`; the two regions may overlap in
 * either direction.  Nothing is done when len <= 0 or dst == src.
 *
 * This delegates to memmove(): the former byte-by-byte forward loop
 * produced wrong data whenever dst lay inside [src, src+len).  The bare
 * `inline` specifier was dropped so the function always has an external
 * definition regardless of the C dialect used to build the file.
 */
void my_memmove (char *dst, char *src, int len)
{
    if (len <= 0 || dst == src)
        return;
    memmove (dst, src, (size_t) len);
}
- /* after process a message, update the buf position in p */
- inline int updateBuf (struct Session *p, int len)
- {
- if (p->socket == 0) return 0;
- p->start += len;
- p->off -= len;
- return 0;
- }
- void process_type (int type, fd_set *socks, fd_set *wsocks, fd_set *esocks)
- {
- struct ServerDesc *ser = &(TRACKER[type]);
- struct Session *p;
- int (*process)(int listnum);
- int listnum, max, this_read, this_write;
- max = ser->maxid + 1;
- process = ser->process; //the pointer of function process_P2PC P2PS
- for (listnum = 0; listnum < max; listnum++)
- {
- p = &(ser->head[listnum]);
- #ifdef __CP_SOURCE
- if (p->flag > 0 && (CurrentTime - p->time_sec) > MAX_CONN)
- {
- if (reconnect (p) < 0)
- {
- PDEBUG("Reconnect Error.n");
- Clientclosure (listnum, type);
- }
- continue;
- }
- #endif
- if (p->socket != 0 && FD_ISSET (p->socket, wsocks))
- {
- #ifdef __CP_SOURCE
- struct sockaddr_in addr;
- socklen_t addrlen = sizeof (struct sockaddr_in);
- if (p->flag != 0)
- {
- p->flag = 0;
- getpeername (p->socket, (struct sockaddr *)(&addr), &addrlen);
- p->port = ntohs(addr.sin_port);
- /* restore file status flags */
- fcntl(p->socket, F_SETFL, p->sock_flag);
- }
- #endif
- if ((this_write = processJobs (p)) < 0)
- {
- perror ("PP: Write");
- Clientclosure (listnum, type);
- } else
- {
- // PDEBUG ("Send %d to %dn", this_write, listnum);
- tmpUpBytes += this_write;
- #ifdef __CP_SOURCE
- p->totalup += this_write;
- #endif
- }
- // continue;
- }
- if (p->socket != 0 && FD_ISSET (p->socket, socks))
- {
- if (p->start + p->off >= MAX_DATA-1)
- {
- my_memmove (p->buf, p->buf+p->start, p->off);
- p->start = 0;
- } else if (p->off == 0)
- p->start = 0;
- if ((this_read = read(p->socket, p->buf+p->start+p->off, MAX_DATA -p->start- p->off)) <= 0)
- {
- perror("PP: read err");
- PDEBUG("socket: %d, p->start: %d. p->off: %d. n", p->socket, p->start, p->off);
- Clientclosure (listnum, type);
- } else
- {
- p->off += this_read;
- }
- }
- while (p->socket != 0 && (this_read = checkComplete (p)) > 0)
- {
- if ((*process) (listnum) == -2) break;
- updateBuf (p, this_read);
- }
- if (p->socket != 0)
- {
- if (this_read < 0 || this_read > MAX_DATA)
- {
- PDEBUG ("Too long message, cut off, length is %d, and p->off is %d, p->start is %d.n", this_read, p->off, p->start);
- Clientclosure (listnum, type);
- }
- }
- }
- }
/*
 * Main event loop.  Runs until IN_LOOP is no longer > 0.  Each round:
 *   1. rebuilds the read/write/exception fd sets and refreshes CurrentTime;
 *   2. waits in select() for at most 10 ms;
 *   3. if anything is ready, services the TS socket(s), accepts new
 *      connections on the listening sockets and calls process_type() for
 *      every TRACKER type;
 *   4. always finishes with period_process().
 * Which sockets take part depends on the __CP_SOURCE / __SP_SOURCE /
 * HAVE_TS build switches.
 */
- void process_child (void)
- {
- int readsocks; // return value of select()
- struct Session *head;
- int highsock;
- fd_set socks, wsocks, esocks;
- int type, listnum, max;
- struct timeval tm;
- #ifdef __CP_SOURCE
- time_t last_handle_conn = 0;
- #endif
- startTime = time(NULL);
- while (IN_LOOP > 0)
- {
- FD_ZERO(&socks);
- FD_ZERO(&esocks);
- FD_ZERO(&wsocks);
- #ifdef __CP_SOURCE
- #ifdef HAVE_TS
- highsock = TSSOCK;
- FD_SET(TSSOCK, &socks);
- #else
- highsock = 0;
- #endif
- #endif
- CurrentTime = time (NULL);
- #ifdef __SP_SOURCE
- highsock = 0;
/* SP build: watch every open TS socket for readability */
- for(type=0; type<MAX_TS; type++)
- {
- if (tsSock[type] <= 0) continue;
- if (tsSock[type] > highsock)
- highsock = tsSock[type];
- FD_SET(tsSock[type], &socks);
- }
- #endif
- for (type=0; type<MAX_TYPE; type++)//type is P2PC and P2PS
- {
- #ifdef __CP_SOURCE
- if (TRACKER[type].flag == FLAG_SERVER && highsock < TRACKER[type].sock)
- highsock = TRACKER[type].sock;
/* NOTE(review): last_handle_conn is only written here; nothing in this
 * function reads it for any other purpose. */
- if(CurrentTime - last_handle_conn >= 1)
- {
- last_handle_conn = CurrentTime;
- }
- if (TRACKER[type].flag == FLAG_SERVER) FD_SET(TRACKER[type].sock, &socks);
- #endif
- #ifdef __SP_SOURCE
- if (highsock < TRACKER[type].sock)
- highsock = TRACKER[type].sock;
- FD_SET(TRACKER[type].sock, &socks);
- #endif
- max = TRACKER[type].maxid + 1;
- head = TRACKER[type].head;
- for (listnum = 0; listnum < max; listnum++)
- {
- if (head[listnum].socket != 0)
- {
/* A session with a non-null `head` member is polled for writability only,
 * every other open session for readability only.
 * NOTE(review): `head` presumably is the pending-job list - confirm. */
- if (head[listnum].head)
- FD_SET(head[listnum].socket, &wsocks);
- else
- FD_SET(head[listnum].socket, &socks);
- if (head[listnum].socket > highsock)
- highsock = head[listnum].socket;
- }
- }
- }
/* wait at most 10000 microseconds so period_process() runs regularly */
- tm.tv_sec = 0;
- tm.tv_usec = 10000;
- readsocks = select(highsock+1, &socks, &wsocks, &esocks, &tm);
/* timeout (0) and error (< 0) both skip straight to the periodic work */
- if (readsocks <= 0)
- goto NEXTROUND;
- #ifdef __CP_SOURCE
- #ifdef HAVE_TS
- if (FD_ISSET(TSSOCK, &socks))
- process_TS ();
/* NOTE(review): esocks is cleared above and no descriptor is ever added to
 * it in this function, so the esocks tests below look unreachable. */
- if (FD_ISSET(TSSOCK, &esocks))
- {
- PDEBUG ("exit...n");
- exit (errno);
- }
- #endif
- #endif
- #ifdef __SP_SOURCE
- for(type=0; type < MAX_TS; ++type)
- {
- if(tsSock[type] > 0 && FD_ISSET(tsSock[type], &socks))
- process_TS2RM(type); //
- }
- #endif
- for (type=0; type<MAX_TYPE; type++)
- {
/* The build-specific `if` below guards the handle_new_connection() call:
 * a readable listening socket means a connection is waiting. */
- #ifdef __CP_SOURCE
- if (TRACKER[type].flag == FLAG_SERVER && FD_ISSET(TRACKER[type].sock, &socks))
- #endif
- #ifdef __SP_SOURCE
- if (FD_ISSET(TRACKER[type].sock, &socks))
- #endif
- handle_new_connection (TRACKER[type].sock, type);
/* The build-specific `if` below guards the exit block that follows it. */
- #ifdef __CP_SOURCE
- if (TRACKER[type].flag == FLAG_SERVER && FD_ISSET(TRACKER[type].sock, &esocks))
- #endif
- #ifdef __SP_SOURCE
- if (FD_ISSET(TRACKER[type].sock, &esocks))
- #endif
- {
- PDEBUG ("exit...n");
- exit (errno);
- }
- process_type (type, &socks, &wsocks, &esocks);
- }
- NEXTROUND:
- period_process ();
- }
- }
- /* assume the message has been transfered into p->buf */
- inline void writeDATAMessage (struct Session *p, struct Channel *pc, struct JobDes *ptr)
- {
- ptr->len += (*(int *)(ptr->buffer));
- addJob (p, pc, ptr);
- }
- int writeMessage (struct Session *p, struct Channel *pc, char *ptr)
- {
- struct JobDes *pj;
- char *buffer;
- int max = 0;
- int new = 0;
- int len = *(int *)ptr;
- if ((pj = findEnoughBuffer (p, pc, len)) == NULL)
- {
- new = 1;
- if ((pj = newJob ()) == NULL)
- return -1;
- }
- buffer = getJobBuffer (pj, &max);
- memcpy (buffer, ptr, len);
- pj->len += len;
- if (new)
- addJob (p, pc, pj);
- return 0;
- }
- struct Edge *newEdge (struct Channel *head, struct Session *me)
- {
- struct Edge *result = malloc (sizeof (struct Edge));
- result->head = head;
- result->me = me;
- result->cnext = head->PeerHead;
- head->PeerHead = result;
- result->enext = me->header;
- me->header = result;
- return result;
- }
- int delEdge (struct Edge *e)
- {
- struct Channel *pchannel=e->head;
- struct Session *psession=e->me;
- struct Edge *pedge;
- if (pchannel)
- {
- if (pchannel->PeerHead == e) pchannel->PeerHead = e->cnext;
- else
- {
- for (pedge=pchannel->PeerHead; pedge && pedge->cnext && pedge->cnext != e; pedge = pedge->cnext);
- if (pedge && pedge->cnext)
- pedge->cnext = e->cnext;
- }
- }
- if (psession)
- {
- if (psession->header == e) psession->header = e->enext;
- else
- {
- for (pedge=psession->header; pedge && pedge->enext && pedge->enext != e; pedge = pedge->enext);
- if (pedge && pedge->enext)
- pedge->enext = e->enext;
- }
- }
- if (psession->pc == pchannel) psession->pc = NULL;
- free (e);
- return 0;
- }
- void apply_session (struct Session *head, int size, void apply(struct Session *, void *), void *p)
- {
- int i;
- for (i = 0; i < size; i++)
- {
- if (head[i].socket > 0) apply (&(head[i]), p);
- }
- }
- int logto_xml (unsigned int time_interval, unsigned int tmpTime, unsigned int channelcount, unsigned int totalclient)
- {
- FILE *logf;
- if (LOGXML == NULL || LOGXML[0] == 0 || (logf = fopen(LOGXML,"w")) == NULL)
- {
- PDEBUG("Couldn't open xml log %s!.n", LOGXML);
- return -1;
- }
- fprintf(logf, "<?xml version="1.0" encoding="iso-8859-1"?>n");
- #ifdef __SP_SOURCE
- fprintf(logf, "<SP>n");
- #endif
- #ifdef __CP_SOURCE
- fprintf(logf, "<CP>n");
- #endif
- fprintf(logf, "<ElapsedTime>%ld</ElapsedTime>n", CurrentTime-startTime);
- fprintf(logf, "<CurrentIncoming>%.4f</CurrentIncoming>n", ((float)tmpDownBytes)/1024/time_interval);
- fprintf(logf, "<CurrentOutgoing>%.4f</CurrentOutgoing>n", ((float)tmpUpBytes)/1024/time_interval);
- fprintf(logf, "<AverageIncoming>%.4f</AverageIncoming>n", ((float)totalDownBytes)/1024/tmpTime);
- fprintf(logf, "<AverageOutgoing>%.4f</AverageOutgoing>n", ((float)totalUpBytes)/1024/tmpTime);
- fprintf(logf, "<ActiveChannel>%d</ActiveChannel>n", channelcount);
- fprintf(logf, "<OnlineUser>%d</OnlineUser>n", totalclient);
- fprintf(logf, "<TotalIncoming>%lld</TotalIncoming>n", totalDownBytes);
- fprintf(logf, "<TotalOutgoing>%lld</TotalOutgoing>n", totalUpBytes);
- #ifdef __SP_SOURCE
- fprintf(logf, "</SP>n");
- #endif
- #ifdef __SP_SOURCE
- fprintf(logf, "</CP>n");
- #endif
- fclose(logf);
- return 0;
- }