TSnew.c
资源名称:p2p_vod.rar [点击查看]
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:67k
源码类别:
P2P编程
开发平台:
Visual C++
- /*
- * Openmysee
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- */
- #include <stdio.h>
- #include <stdlib.h>
- #include <stdarg.h>
- #include <ctype.h>
- #include <sys/types.h>
- #include <sys/time.h>
- #include <sys/stat.h>
- #include <sys/wait.h>
- #include <sys/resource.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <netdb.h>
- #include <unistd.h>
- #include <fcntl.h>
- #include <errno.h>
- #include <assert.h>
- #include <syslog.h>
- #include <string.h>
- #include <time.h>
- #include <assert.h>
- #include "ProtocolDefine.h"
- #include "StructDefine.h"
- #include "ErrorDefine.h"
- #include "util.h"
- #include "findcp.h"
- #define MAX_BIND 10
- #define AUTH_HEADER 12
- #define NORMAL_HEADER 5
- #define MAX_NET_NUM 5
- #define MAX_CLIENT 3072
- #define MAX_PEER 0x10 /* max peers returned by NEED_PEERS */
- #define IPADDR_LEN 16 /*****IP地址最大长度*****/
- #define MD5_LEN 32
- #define SERVICE_LEN 128
- #define TYPE_NP 0
- #define TYPE_CP 1
- #define MAX_CHANNEL 0x400 //1024
- #define MAX_NP 0x40000 //262144
- #define MAX_CP 0x10000 //65536
- #define MAX_RM 0x4 //4
- //#define TS4CP_PORT 22168
- //#define TS4RM_PORT 22169
- #define MAX_DATA 20000
- #define MAX_LINE 1024
- #define MAX_IDLE 120
- #define SILENCE_TIME 2
- #define BUILD_SOCKADDR(host,port,client)
- {
- memset ((char *)&client, 0, sizeof (client));
- client.sin_port = htons (pnp->port);
- client.sin_family = AF_INET;
- client.sin_addr.s_addr = htonl (pnp->host);
- }
- #define FREE_TRACKER(tracker)
- {
- int i;
- max = tracker.max;
- closure = tracker.closure;
- if ((head = tracker.head) != NULL)
- {
- for (listnum=0; listnum<max; listnum++)
- {
- if (head[listnum].socket > 0)
- (*closure) (head+listnum);
- }
- free (head);
- }
- if (tracker.hash) free (NPTRACKER.hash);
- for (i=0; tracker.sock[i] != 0 && i<MAX_BIND; i++)
- close (tracker.sock[i]);
- }
- #define MAX_INTERVAL 20
- #define OBSERVE_LAYER 10
- #define BASEDIR "/home/channel/"
- int current_log_count = 0;
- #define MAX_LOG_COUNT 100 // call fflush() when current_log_count reaches MAX_LOG_COUNT
- // 用来表示NP上面所有的快区间, 目前块区间由一个开始字段和一个长度字段来标识.
- struct Interval
- {
- unsigned int start;
- unsigned int len;
- };
- // Generic format of messages between TS and other peers
- // Most often used in login process
- struct Message
- {
- unsigned int len;
- unsigned char type;
- char buffer[MAX_DATA];
- };
- // format of messages between TS and other peers
- // with authcodes specified
- struct TSMessage
- {
- unsigned int len;
- unsigned char type;
- unsigned int authcode1:24;
- unsigned int authcode2;
- char buffer[MAX_DATA];
- } UDPMsg;
- struct ChannelInfo
- {
- char md5[MD5_LEN+1];
- unsigned char numinter;
- struct Interval inter[MAX_INTERVAL];
- };
- struct Channel
- {
- char name[MD5_LEN+1];
- unsigned int numclient;
- unsigned int accumclient;
- unsigned int latest_time;
- #ifndef SORT_NET
- struct Edge *PeerHead;
- #else
- unsigned int nclient_net[MAX_NET_NUM];
- struct Edge *PeerHead[MAX_NET_NUM];
- #endif
- struct Session *SCPhead;
- struct Channel *next;
- };
- struct Edge
- {
- struct Channel *head;
- struct Session *me;
- struct Edge *cnext;
- struct Edge *enext;
- unsigned char numinter;
- unsigned int current;
- struct Interval inter[MAX_INTERVAL];
- };
- struct NPInfo
- {
- struct CorePeerInfo p;
- struct TransferInfo t;
- struct StatInfo s;
- int startBlock; // starting position of NP
- int numchannel;
- struct Edge *cur;
- struct Edge *header;
- };
- struct CPInfo
- {
- char type;
- unsigned char numHeads;
- unsigned short connnum;
- unsigned char maxConn;
- char parameter[42];
- int userid;
- int resnum;
- float band;
- char servicetype[128];
- };
- struct Session
- {
- int type;
- int socket;
- unsigned int host;
- unsigned int intra;
- unsigned short port;
- unsigned short npport;
- #ifdef SORT_NET
- unsigned int net;
- struct Session *cachepeer[MAX_NET_NUM];
- #else
- struct Session *cachepeer;
- #endif
- unsigned int auth;
- unsigned int time_sec;
- unsigned int last_access;
- union
- {
- struct NPInfo p;
- struct CPInfo cp;
- } u;
- float clientVer;
- struct Session *hnext;
- };
- FILE *statlog = NULL; // statistics log file
- extern int errno;
- int OUTPUT_STAT;
- time_t CurTimeSec;
- time_t startTime;
- char *CONFIG="./ats.cfg";
- char *LOCALHOST;
- #ifdef HAVE_MYSQL
- char *MYSQL_HOST="localhost";
- char *MYSQL_USER="root";
- char *MYSQL_PASS="gtv";
- char *MYSQL_DB="gtv";
- #endif
- int CurrentSock;
- fd_set osocks;
- int highsock;
- char *LOGXML;
- struct TransferInfo Transfer;
- struct sockaddr_in UDPCLIENT;
- extern int init();
- extern int readconfig(char * filename);
- extern const char* find_ip_from_list(unsigned long ip);
- extern int findcppeers(unsigned long ip, void *p);
- extern int findnettype(const char *servicetype, void* p);
- extern void add_cp_to_list(void *p);
- extern void remove_cp_from_list(void *p);
- extern const char* find_cp_service_type(unsigned long ip);
- #define MAX_POLLUTE 1000
- int BINDALL;
- int Polluted; /* should we call periodLOG? if polluted>MAX_POLLUTE call */
- struct Array cfgTS4NP_PORT;
- struct Array cfgTS4CP_PORT;
- struct Array cfgTS4RM_PORT;
- int SnapShotInterval; // make a snap shot every few seconds
- float MIN_CLIENT_VERSION; // client version should be bigger than this one.
- long long np2tsLoginCount = 0;
- long long np2tsResListCount = 0;
- long long np2tsReqResCount = 0;
- long long np2tsDelResCount = 0;
- long long np2tsReportCount = 0;
- long long np2tsNeedPeerCount = 0;
- long long np2tsLogoutCount = 0;
- long long ts2npWelcomeCount = 0;
- long long ts2npPeersCount = 0;
- long long ts2npConnectToCount = 0;
- long long ts2npMsgCount = 0;
- /***************************************************
- * SessionCluster是有关每种服务的信息. *
- * 其中有一个指向该服务相关的各个Session的指针. *
- ***************************************************/
- struct SessionCluster
- {
- unsigned int port[MAX_BIND];
- unsigned int maxbuf;
- int sock[MAX_BIND]; // 描述所使用的socket
- int cur; // current number of sessions 当前客户端的个数
- int max; // maximum number of sessions 最多容纳客户端的个数
- int maxid; // maxid: maximum session index currently in the list. for optimization of search
- struct Session *head; /* pointer to the session pool
- head[0]为第一个Session,head[max-1]为最后一个session */
- struct Session **hash; // session hash table
- int (*process) (struct Session *); //这一服务中消息的处理函数
- int (*closure) (struct Session *); //这一服务中需要的析构函数
- };
- struct Session *GCPCHOICE;
- struct Channel *ChannelHash[MAX_CHANNEL];
- struct SessionCluster NPTRACKER, CPTRACKER;
- #ifdef HAVE_RM
- struct SessionCluster RMTRACKER;
- #endif
- #ifdef SORT_NET
- char *NETFN;
- #endif
- struct NamVal ConfigParameters[]
- =
- {
- #ifdef HAVE_MYSQL
- {"MysqlAddress", &MYSQL_HOST, 's'},
- {"User", &MYSQL_USER, 's'},
- {"Password", &MYSQL_PASS, 's'},
- {"Database", &MYSQL_DB, 's'},
- #endif
- #ifdef SORT_NET
- {"Netfile", &NETFN, 's'},
- #endif
- {"Bind", &LOCALHOST, 's'},
- {"TS4NP_PORT", &cfgTS4NP_PORT, 'a'},
- {"TS4CP_PORT", &cfgTS4CP_PORT, 'a'},
- {"TS4RM_PORT", &cfgTS4RM_PORT, 'a'},
- {"SnapShotInterval", &SnapShotInterval, 'd'},
- {"ClientVersion", &MIN_CLIENT_VERSION, 'f'},
- {"LogFilePath", &LOGXML, 's'},
- {"BINDALL", &BINDALL, 'd'}
- };
- #ifdef DEBUG
- #define PDEBUG(fmt, args...) fprintf(stderr, "TS: (%s,%d)" fmt, __FILE__, __LINE__, ## args)
- #else
- #define PDEBUG(fmt, args...) do {} while (0)
- #endif
- #ifdef SORT_NET
- #define MAX_NET 2048
- struct networks
- {
- unsigned int host;
- unsigned int mask;
- int net;
- } NETBLOCKS[MAX_NET];
- unsigned int MASKS[33]={0x0,0x80000000,0xc0000000,0xe0000000,0xf0000000,0xf8000000,0xfc000000,0xfe000000,0xff000000,0xff800000,0xffc00000,0xffe00000,0xfff00000,0xfff80000,0xfffc0000,0xfffe0000,0xffff0000,0xffff8000,0xffffc000,0xffffe000,0xfffff000,0xfffff800,0xfffffc00,0xfffffe00,0xffffff00,0xffffff80,0xffffffc0,0xffffffe0,0xfffffff0,0xfffffff8,0xfffffffc,0xfffffffe,0xffffffff};
- // globals
- int maxNet; // max id
- int readNETBLOCK (char *fname); //用来读入network配置文件
- struct networks *getnetwork (unsigned int host, struct networks *head, int n);
- int compareNet (const void *a, const void *b);
- #endif
- struct Message ErrMSG;
- #define SEND_NPMSG(sock,msg,code,quit,client)
- {
- ErrMSG.len = 8;
- ErrMSG.type = msg;
- ++ts2npMsgCount;
- *(unsigned short *)(ErrMSG.buffer) = code;
- *(unsigned char *)(ErrMSG.buffer+sizeof(short)) = quit;
- sendMessage(sock,(char *)&ErrMSG,client);
- }
- int init_ts ();
- void my_exit () __attribute__((noreturn, destructor));
- void process_child (void);
- int init_NP (struct Session *);
- int process_NP (int idsock);
- int closure_NP (struct Session *);
- int process_NP2TS_LOGIN (struct Message *);
- int process_NP2TS_REPORT (struct Session *, struct TSMessage *);
- int process_NP2TS_REPORT2 (struct Session *, struct TSMessage *);
- int process_NP2TS_RES_LIST (struct Session *, struct TSMessage *);
- int process_NP2TS_REQ_RES (struct Session *, struct TSMessage *);
- int process_NP2TS_DEL_RES (struct Session *, struct TSMessage *);
- int process_NP2TS_NEED_PEERS (struct Session *, struct TSMessage *m);
- int process_NP2TS_QUERY_RES (struct Session *p, struct TSMessage *m);
- int process_NEED_PEERS_real (struct Session *p, char *md5, int needcp, unsigned int cur, unsigned char layer);
- int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1);
- int findCPPeers (unsigned long host, char *md5, char **buffer);
- int init_CP (struct Session *);
- int process_CP (int idsock);
- int closure_CP (struct Session *);
- int process_CP2TS_UPDATE (struct Session *, struct TSMessage *m);
- int process_CP2TS_REGISTER (struct Message *m);
- int process_CP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m);
- void periodLOG (int s);
- void makeSnapShot(int count);
- //int memlog(char *pwd,char *cmd);
- void freeChannel (struct Channel *p);
- int check_valid (struct Edge *e, int play);
- int merge (struct Interval *head, int total, struct Interval *_new, int num);
- int delete_interval (struct Interval *head, int total, struct Interval *_new, int num);
- #ifdef HAVE_RM
- int init_RM (struct Session *p);
- int process_RM (int idsock);
- int closure_RM (struct Session *p);
- #endif
- int logto_xml (long channelcount, unsigned int totalclient, long long totalstay);
- inline int hash_np (int h, int p)
- {
- int id = (((h & (0xffffffff-MAX_NP+1)) >> 14) ^ (h & (MAX_NP-1)) ^ (p & 0xffff)) & (MAX_NP -1);
- return id?id:(MAX_NP-1);
- }
- inline int hash_cp (int h, int p)
- {
- int id = (((h & (0xffffffff-MAX_CP+1)) >> 12) ^ (h & (MAX_CP-1)) ^ (p & 0xffff)) & (MAX_CP -1);
- return id?id:(MAX_CP-1);
- }
- inline int hash_str (unsigned char *str, int len)
- {
- int hash;
- for (hash=0; len; len--, str++)
- hash += (hash << 5) - hash + (*str);
- return hash & (MAX_CHANNEL-1);
- }
- // process_child: 主要函数, 这一函数主要用来设置socks和wsocks.
- void process_child (void)
- {
- time_t last_update=0, last_snapshot=0;
- int snapCount = 0;
- int i, readsocks;
- struct timeval tm;
- fd_set socks, esocks;
- startTime = time(NULL);
- for (i=0; ; i++)
- {
- CurTimeSec = time (NULL);
- if (CurTimeSec-last_update > MAX_IDLE)
- {
- periodLOG (1); // close timeout peers
- last_update = CurTimeSec;
- Polluted = 0; // clear dirty flag
- }
- if(CurTimeSec - last_snapshot > SnapShotInterval)
- {
- makeSnapShot(snapCount++); // read it!
- system("/usr/bin/vmstat >> ts.log 2>&1 &");
- last_snapshot = CurTimeSec;
- }
- socks = osocks;
- esocks = osocks;
- tm.tv_sec = 1;
- tm.tv_usec = 0;
- readsocks = select(highsock+1, &socks, (fd_set *) 0, &esocks, &tm);
- if (readsocks <= 0)
- continue;
- for (i=0; i<MAX_BIND && NPTRACKER.sock[i] != 0; i++)
- {
- if (FD_ISSET (NPTRACKER.sock[i], &socks))
- {
- CurrentSock = i;
- process_NP (i);
- }
- // should have a 'break' here to cut off unnecessary check
- }
- for (i=0; i<MAX_BIND && CPTRACKER.sock[i] != 0; i++)
- {
- if (FD_ISSET (CPTRACKER.sock[i], &socks))
- {
- CurrentSock = i;
- process_CP (i);
- }
- }
- #ifdef HAVE_RM
- for (i=0; i<MAX_BIND && RMTRACKER.sock[i] != 0; i++)
- {
- if (FD_ISSET(RMTRACKER.sock[i], &socks))
- {
- CurrentSock = i;
- process_RM (i);
- }
- }
- #endif
- }
- }
- void my_exit ()
- {
- int max, listnum;
- struct Session *head;
- int (*closure) (struct Session *);
- #ifdef HAVE_RM
- FREE_TRACKER(RMTRACKER);
- #endif
- FREE_TRACKER(NPTRACKER);
- FREE_TRACKER(CPTRACKER);
- // free configuration file.
- // 参数为和read_config相同的struct NamVal *以及项数.
- free_config (ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));
- #ifdef HAVE_MYSQL
- mysql_close (local_mysql);
- #endif
- PDEBUG ("exit...n");
- exit (0);
- }
- inline void my_memmove (char *dst, char *src, int len)
- {
- int i;
- if (len <= 0 || dst == src) return;
- for (i=0; i<len; i++)
- *dst++ = *src++;
- }
- struct Channel *findChannel (char *name, int len)
- {
- int id = hash_str ((unsigned char*)name, len);
- struct Channel *p;
- for (p=ChannelHash[id]; p; p=p->next)
- {
- if (strncmp (name, p->name, len) == 0)
- return p;
- }
- return NULL;
- }
- struct Edge *newEdge (struct Channel *head, struct Session *me)
- {
- struct Edge *result = (struct Edge *)malloc (sizeof (struct Edge));
- result->head = head;
- result->me = me;
- #ifndef SORT_NET
- result->cnext = head->PeerHead;
- head->PeerHead = result;
- #else
- result->cnext = head->PeerHead[me->net];
- head->PeerHead[me->net] = result;
- head->nclient_net[me->net] ++;
- #endif
- result->enext = me->u.p.header;
- me->u.p.header = result;
- head->numclient ++;
- head->accumclient ++;
- return result;
- }
- int delEdge (struct Edge *e)
- {
- struct Channel *pchannel=e->head;
- struct Session *psession=e->me;
- struct Edge *pedge;
- if (pchannel)
- {
- #ifndef SORT_NET
- if (pchannel->PeerHead == e) pchannel->PeerHead = e->cnext;
- #else
- pchannel->nclient_net[psession->net] --;
- if (pchannel->PeerHead[psession->net] == e) pchannel->PeerHead[psession->net] = e->cnext;
- #endif
- else
- {
- #ifndef SORT_NET
- for (pedge=pchannel->PeerHead; pedge && pedge->cnext && pedge->cnext != e; pedge = pedge->cnext);
- #else
- for (pedge=pchannel->PeerHead[psession->net]; pedge && pedge->cnext && pedge->cnext != e; pedge = pedge->cnext);
- #endif
- if (pedge && pedge->cnext) pedge->cnext = e->cnext;
- }
- pchannel->numclient --;
- if (pchannel->numclient == 0)
- freeChannel (pchannel);
- }
- if (psession)
- {
- if (psession->u.p.header == e) psession->u.p.header = e->enext;
- else
- {
- for (pedge=psession->u.p.header; pedge && pedge->enext && pedge->enext != e; pedge = pedge->enext);
- if (pedge && pedge->enext) pedge->enext = e->enext;
- }
- }
- if (psession->u.p.cur == e) psession->u.p.cur = NULL;
- free (e);
- return 0;
- }
- struct Channel *newChannel (char *name, int len)
- {
- int id = hash_str ((unsigned char*)name, len);
- struct Channel *result = (struct Channel *)calloc (1, sizeof (struct Channel));
- if (!result) return NULL;
- if (len != MD5_LEN)
- {
- free (result);
- return NULL;
- }
- memcpy (result->name, name, len);
- result->name[len] = 0;
- result->latest_time = CurTimeSec;
- result->next = ChannelHash[id];
- ChannelHash[id] = result;
- return result;
- }
- void freeChannel (struct Channel *p)
- {
- int id = hash_str ((unsigned char*)(p->name), MD5_LEN);
- struct Channel *q;
- if (ChannelHash[id] == p)
- ChannelHash[id] = p->next;
- else
- {
- for (q=ChannelHash[id]; q && q->next != p; q=q->next);
- if (q) q->next = p->next;
- }
- free (p);
- }
- // join in a channel for session *p
- // Add a session-channel mapping (aka. an edge)
- int addSession (struct Session *p, struct ChannelInfo *c, int isdefault, unsigned int cur)
- {
- struct Channel *pc;
- struct Edge *pedge = NULL;
- if ((pc=findChannel (c->md5, MD5_LEN)) == NULL) /* Since the channel do not exist, the edge does not exist.
- So there is no need to search the list. */
- {
- if ((pc = newChannel (c->md5, MD5_LEN)) == NULL)
- return -1;
- } else{
- for (pedge=p->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
- }
- if (pedge == NULL)
- {
- pedge = newEdge (pc, p);
- }
- pedge->numinter = c->numinter;
- memcpy (pedge->inter, c->inter, c->numinter*sizeof(struct Interval));
- pedge->current = cur;
- if (isdefault) p->u.p.cur = pedge;
- return 0;
- }
- // delete corresponding session in the session-channel map
- int delSession (struct Session *p)
- {
- struct Edge *pedge, *prevedge;
- for (pedge=p->u.p.header; pedge; pedge=prevedge)
- {
- prevedge = pedge->enext;
- delEdge (pedge);
- }
- return 0;
- }
- void periodLOG (int s)
- {
- int i;
- struct Session *p;
- struct tm result;
- localtime_r (&CurTimeSec, &result);
- if (s)
- {
- for (i=0,p=NPTRACKER.head; i<=NPTRACKER.maxid; i++, p++)
- {
- if (p->socket > 0 && CurTimeSec - p->last_access > MAX_IDLE)
- {
- closure_NP (p);
- }
- }
- for (i=0,p=CPTRACKER.head; i<=CPTRACKER.maxid; i++, p++)
- {
- if (p->socket > 0 && CurTimeSec - p->last_access > MAX_IDLE)
- {
- closure_CP (p);
- }
- }
- }
- PDEBUG ("STAT %u,%u/%u %u:%u:%u.n", result.tm_year+1900, result.tm_mon+1, result.tm_mday, result.tm_hour, result.tm_min, result.tm_sec);
- PDEBUG ("STAT: "%ld小时%ld分钟%ld秒"n", CurTimeSec/3600, (CurTimeSec%3600)/60, CurTimeSec%60);
- return;
- }
- void makeSnapShot(int count)
- {
- FILE *f = fopen ("./ts.log", "a");
- if (!f)
- {
- PDEBUG("Couldn't open log file!.n");
- return;
- }
- // seek to the end for write
- fseek(f, 0, SEEK_END);
- // 1. start SnapShot
- struct tm result;
- localtime_r (&CurTimeSec, &result);
- fprintf (f, "nn********************Start %d SnapShot, Time: %u/%u %u:%u:%u.n", count, result.tm_mon+1, result.tm_mday, result.tm_hour, result.tm_min, result.tm_sec);
- // 2. log cpu & mem state
- //memlog(BASEDIR,"./log.sh");
- // 3. log message count
- fprintf(f, "Login: %lld. ResList: %lld. ReqRes: %lld. DelRes: %lld. Report: %lld. NeedPeers: %lld. Logout: %lld. n", np2tsLoginCount, np2tsResListCount, np2tsReqResCount, np2tsDelResCount, np2tsReportCount, np2tsNeedPeerCount, np2tsLogoutCount);
- fprintf(f, "Welcome: %lld. Peers: %lld. ConnectTo: %lld. Msg: %lld.n", ts2npWelcomeCount, ts2npPeersCount, ts2npConnectToCount, ts2npMsgCount);
- fflush(f);
- // 4. log channel state
- struct Channel *pChannel;
- int i, total=0, j, watchingCount, noIdleCount;
- struct Edge* pedge;
- int layerarray[OBSERVE_LAYER], noconnectioncount, higherlayercount;
- long long totalup, totaldown;
- long long totalcurrblock;
- long ts_ChannelCount = 0;
- long long ts_stay, totalstay = 0;
- for (i=0; i<MAX_CHANNEL; i++)
- {
- for (pChannel=ChannelHash[i]; pChannel; pChannel=pChannel->next)
- {
- ++ts_ChannelCount;
- total += pChannel->numclient;
- watchingCount = 0;
- noIdleCount = 0;
- ts_stay = 0;
- totalup = totaldown = totalcurrblock = 0;
- memset(layerarray, 0, OBSERVE_LAYER*sizeof(int));
- noconnectioncount = higherlayercount = 0;
- #ifndef SORT_NET
- for (j=0, pedge=pChannel->PeerHead; pedge && j < pChannel->numclient; pedge=pedge->cnext)
- #else
- {
- int mnetnum;
- for (mnetnum = 0; mnetnum < MAX_NET_NUM; mnetnum ++)
- {
- for (j=0, pedge=pChannel->PeerHead[mnetnum]; pedge && j < pChannel->nclient_net[mnetnum]; pedge=pedge->cnext)
- #endif
- {
- if(pedge->me->socket <= 0)
- continue;
- if(pedge->me->u.p.p.layer == 0xff)
- noconnectioncount++;
- else if(pedge->me->u.p.p.layer < OBSERVE_LAYER)
- layerarray[pedge->me->u.p.p.layer]++;
- else
- higherlayercount++;
- ts_stay += CurTimeSec - pedge->me->time_sec;
- totalup += pedge->me->u.p.t.totalDownBytes;
- totaldown += pedge->me->u.p.t.totalUpBytes;
- if(pedge->current != 0xffffffff)
- {
- totalcurrblock += pedge->current;
- watchingCount++;
- if(CurTimeSec < pedge->me->last_access + 40)
- noIdleCount++;
- }
- }
- #ifdef SORT_NET
- }
- }
- #endif
- totalstay += ts_stay;
- fprintf(f, "%s: accumulated %d clients, current %d clients and %d/%d is watchings. Average Stay Time : %fn", pChannel->name, pChannel->accumclient, pChannel->numclient, watchingCount,noIdleCount,pChannel->numclient==0?0:(ts_stay/(double)pChannel->numclient));
- fprintf(f, "total down: %llu, total up: %llu, down-up: %llu. Avg currBlock: %llu n", totaldown, totalup, (totaldown-totalup), watchingCount==0?0:(totalcurrblock/watchingCount));
- fprintf(f, "No Connect: %d. Higher: %d. n Detail: ", noconnectioncount, higherlayercount);
- for(j = 0; j < OBSERVE_LAYER; j++)
- {
- if(layerarray[j] != 0)
- fprintf(f, "%d: %d(%.1f), ", j, layerarray[j ], pChannel->numclient==0?0:(layerarray[j]/(double)pChannel->numclient));
- }
- fprintf(f, "n");
- }
- }
- fprintf(f,"ChannelCount: %ld n",ts_ChannelCount);
- fprintf(f, "n ********************END SnapShot.nn");
- fclose(f);
- logto_xml (ts_ChannelCount, total, totalstay);
- }
- #if 0
- int memlog(char *pwd, char *cmd)
- {
- int pid = fork();
- if (pid == 0)
- {
- char buffer[10];
- daemon(0,1);
- snprintf(buffer,10,"%s",cmd);
- chdir(pwd);
- execlp("/bin/sh", "sh" ,"-c",buffer,NULL);
- }
- else if(pid < 0)
- {
- perror("fork error!!!");
- return -1;
- }
- return 0;
- }
- #endif
- #ifdef SORT_NET
- int compareNet (const void *a, const void *b)
- {
- struct networks *p = (struct networks *)a;
- struct networks *q = (struct networks *)b;
- if (p->host > q->host) return 1;
- else if (p->host < q->host) return -1;
- return 0;
- }
- // This is implemented with recursion
- struct networks *getnetwork (unsigned int host, struct networks *head, int n)
- {
- int i = n/2-1;
- unsigned int net = host & head[i].mask;
- if (n <= 0) return NULL;
- if (n == 1)
- {
- if ((host & head[0].mask) != (head[0].host & head[0].mask))
- return NULL;
- else return &(head[0]);
- }
- if (net < (head[i].host & head[i].mask)) return getnetwork (host, head, i);
- else if (net > (head[i].host & head[i].mask)) return getnetwork (host, head+i+1, n-i-1);
- else return &(head[i]);
- }
- //192.168.0.0/16/1
- //host/mask/netid
- int readNETBLOCK (char *fname)
- {
- int i = 0;
- char buffer[MAX_LINE],*p, *q;
- struct in_addr inp;
- FILE *f = fopen (fname, "r");
- if (!f) return -1;
- while (fgets (buffer, sizeof (buffer), f))
- {
- p = index (buffer, '/');
- q = rindex (buffer, '/');
- if (p == q || (!p) || (!q)) continue;
- *p = 0;
- *q = 0;
- if (inet_aton (buffer, &inp) == 0) continue;
- NETBLOCKS[i].host = ntohl (inp.s_addr);
- NETBLOCKS[i].mask = MASKS[atoi(p+1)];
- NETBLOCKS[i].net = atoi (q+1);
- i++;
- if (i >= MAX_NET) break;
- }
- fclose (f);
- qsort (NETBLOCKS, i, sizeof (struct networks), compareNet);
- return i;
- }
- int Net = 0;
- int Layer = 0;
- int compareSession (const void *a, const void *b)
- {
- struct Session *p = *(struct Session **)a;
- struct Session *q = *(struct Session **)b;
- if (p->u.p.p.layer == q->u.p.p.layer)
- {
- if (p->net == Net) return -1;
- else if (q->net == Net) return 1;
- else if (p->net > q->net) return 1;
- else return -1;
- } else if (p->u.p.p.layer <= Layer)
- return -1;
- else if (q->u.p.p.layer <= Layer)
- return 1;
- else return (p->u.p.p.layer - q->u.p.p.layer);
- }
- #endif
- int init_udpserver (struct SessionCluster *c, char *host, int *port, unsigned int max, int numbind)
- {
- int i;
- struct Session *head;
- // c->port = port;
- c->cur = 0; // current count
- c->max = max; // maximum count of sessions
- c->maxbuf = 0;
- if (max > 0)
- {
- c->head = (struct Session*)calloc (sizeof (struct Session), max); // allocate continuous session pool space
- c->hash = (struct Session**)calloc (sizeof (struct Session *), max); // allocate session hash table
- head = c->head;
- // chain the sessions in session pool as a freelist
- for (i=0; i<max-1; i++)
- {
- head[i].hnext = &(head[i+1]);
- }
- // initialize hash table
- for (i=1; i<max; i++) c->hash[i] = 0;
- // this is really ugly!
- c->hash[0] = &(head[0]); // c->hash[0] points to the empty list
- } // HOW if max <= 0 ? return with error
- for (i=0; i<MAX_BIND && i<numbind; i++)
- {
- switch (BINDALL) // means 'BIND ANY HOST'
- {
- case 0:
- if ((c->sock[i] = init_udp (host, port[i])) < 0)
- return -1;
- break;
- default:
- if ((c->sock[i] = init_udp (NULL, port[i])) < 0)
- return -1;
- break;
- }
- // highsock indicates the largest active fd. used in select() Used to speed up search
- if (c->sock[i] > highsock) highsock = c->sock[i];
- FD_SET(c->sock[i], &osocks);
- }
- return 0;
- }
- int init_ts ()
- {
- struct rlimit rl;
- #ifdef DEBUG
- system ("ulimit -a");
- if (getrlimit (RLIMIT_CORE, &rl) != 0)
- {
- perror ("getrlimit");
- return -1;
- }
- fprintf (stderr, "Get core limit %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
- rl.rlim_cur = rl.rlim_max = (rlim_t )10240000;
- if (setrlimit (RLIMIT_CORE, &rl) != 0)
- {
- perror ("getrlimit");
- return -1;
- }
- if (getrlimit (RLIMIT_CORE, &rl) != 0)
- {
- perror ("getrlimit");
- return -1;
- }
- fprintf (stderr, "Set core limit to %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
- system ("ulimit -a");
- #endif
- if (init_udpserver (&NPTRACKER, LOCALHOST, (int*)cfgTS4NP_PORT.ptr, MAX_NP, cfgTS4NP_PORT.size) < 0
- #ifdef HAVE_RM
- || init_udpserver (&RMTRACKER, LOCALHOST, (int*)cfgTS4RM_PORT.ptr, MAX_RM, cfgTS4RM_PORT.size) < 0
- #endif
- || init_udpserver (&CPTRACKER, LOCALHOST, (int*)cfgTS4CP_PORT.ptr, MAX_CP, cfgTS4CP_PORT.size) < 0)
- {
- return -1;
- }
- #ifdef SORT_NET
- maxNet = readNETBLOCK (NETFN);
- #endif
- statlog = fopen("./stat.log", "a+");// create file "stat.log" automatically if it does not exist.
- if (statlog == NULL) {
- perror ("error opening statistics log file: stat.log.n");
- }
- return 0;
- }
- int main(int argc, char **argv)
- {
- int i, mode = 1;
- // struct itimerval t, ot;
- signal (SIGPIPE, SIG_IGN); // SIGPIPE is raised when the client closes the socket exceptionally
- // if not handled, SIGPIPE would cause unexpected termination.
- // signal (SIGINT, my_exit);
- // argv[1]: daemon mode, not clear
- // argv[2]: output status, not used and not clear
- if (argc > 1)
- {
- mode = atoi (argv[1]);
- if (argc > 2) OUTPUT_STAT = atoi (argv[2]);
- }
- if (mode == 0)
- daemon(1,1); // run in the background
- // read configuration file. just ignored right now.
- // 参数为文件名, 一个struct NamVal *, 以及该struct NamVal的项数
- read_config (CONFIG, ConfigParameters, sizeof(ConfigParameters)/sizeof (struct NamVal));
- readconfig("ip.list");
- for (i=0; i<10; i++) // retry 10 times. not useful here.
- {
- FD_ZERO(&osocks);
- if (init_ts () < 0)
- {
- PDEBUG ("Error in initialization.n");
- exit (1);
- }
- #ifdef HAVE_MYSQL
- if ((local_mysql = init_mysql (MYSQL_HOST, MYSQL_USER, MYSQL_PASS, MYSQL_DB, "/var/run/mysqld/mysqld.sock")) == 0)
- {
- PDEBUG ("Error in init_mysql.n");
- exit (1);
- }
- #endif
- process_child ();
- }
- return 0;
- }
- //===============================================
- //===== Here begin the message process part =====
- //===============================================
- int init_NP (struct Session *p)
- {
- // maxid: maximum session index currently in the list. for optimization of search
- int listnum = p - NPTRACKER.head; // this is the index!
- if (listnum > NPTRACKER.maxid)
- NPTRACKER.maxid = listnum;
- NPTRACKER.cur ++; // cur is in fact the counter of sessions
- if (p->u.p.cur) // if there is an edge, then the client is already in a channel
- PDEBUG ("NP %d in %d enter Channel %.32s(%d clients).n", p-NPTRACKER.head, NPTRACKER.cur, p->u.p.cur->head->name, p->u.p.cur->head->numclient);
- else
- PDEBUG ("NP %d in %d no default channel.n", p-NPTRACKER.head, NPTRACKER.cur);
- return 0;
- }
- int process_NP (int idsock)
- {
- int len, listnum;
- struct Session *p;
- struct TSMessage *m = &UDPMsg;
- #ifdef MEASUREMENT
- //struct timeval tm;
- //long long msec;
- #endif
- socklen_t addr_len = sizeof (UDPCLIENT);
- memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
- if ((len = (recvfrom (NPTRACKER.sock[idsock], &UDPMsg, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
- {
- perror ("recvfrom:");
- return -1;
- }
- #ifdef MEASUREMENT
- //gettimeofday (&tm, NULL);
- //msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
- #endif
- PDEBUG ("type %d len %d.", m->type, m->len);
- if (m->type == NP2TS_LOGIN)
- {
- process_NP2TS_LOGIN ((struct Message *)m);
- ++np2tsLoginCount;
- } else
- {
- listnum = m->authcode1; // index of session object
- p = NPTRACKER.head+listnum;
- // check the session: 1. bad index; 2. uninitialized or cleared; 3. not match
- if (listnum >= NPTRACKER.max || p->socket == 0
- || p->auth != m->authcode2)
- {
- if (m->type != NP2TS_LOGOUT)
- SEND_NPMSG(NPTRACKER.sock[idsock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
- return -1;
- }
- switch (m->type)
- {
- case NP2TS_REPORT: // 报告Interval信息,如果refresh为true, 则重置, 否则则先增加后删除.
- process_NP2TS_REPORT (p, m);
- ++np2tsReportCount;
- break;
- case NP2TS_NEED_PEERS:
- PDEBUG("Need peers!!n");
- process_NP2TS_NEED_PEERS (p, m);
- ++np2tsNeedPeerCount;
- break;
- case NP2TS_LOGOUT: // 退出
- closure_NP (p);
- ++np2tsLogoutCount;
- break;
- case NP2TS_RES_LIST: /* 发送当前NP的所有RESOURCE,使用addSession来进行处理,
- 如果还没有这条边, 就添加. */
- process_NP2TS_RES_LIST (p, m);
- ++np2tsResListCount;
- break;
- case NP2TS_REQ_RES: // 添加RES, 并返回Peers
- process_NP2TS_REQ_RES (p, m);
- ++np2tsReqResCount;
- break;
- case NP2TS_DEL_RES: // 删除RES
- process_NP2TS_DEL_RES (p, m);
- ++np2tsDelResCount;
- break;
- case NP2TS_QUERY_RES: //查询RES
- process_NP2TS_QUERY_RES (p, m);
- break;
- case NP2TS_REPORT2:
- process_NP2TS_REPORT2 (p, m);
- break;
- default:
- SEND_NPMSG(NPTRACKER.sock[idsock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
- closure_NP (p);
- break;
- }
- p->last_access = CurTimeSec;
- }
- PDEBUG ("donen");
- #ifdef MEASUREMENT
- //gettimeofday (&tm, NULL);
- //PDEBUG ("msg type %d, len %d: %lld msec.n", m->type, m->len, ((long long)tm.tv_sec) * 1000000l + tm.tv_usec - msec);
- #endif
- return 0;
- }
- void logStat(struct Session *p)
- {
- // 打印记录的时间和客户端的版本号
- fprintf(statlog, "%u %f", time(NULL), p->clientVer);
- fprintf(statlog, "%d:%d | %d:%dn", p->host, p->port, p->intra, p->npport);
- fprintf(statlog, "%ut%ut%ut%ut%ut%ut%ut%ut%ut%fn",
- p->u.p.s.playingBlock - p->u.p.startBlock,
- p->u.p.s.currBufferTime,
- p->u.p.s.bufferCount,
- p->u.p.s.bufferTime,
- p->u.p.s.connFailCount,
- p->u.p.s.inConnections,
- p->u.p.s.outConnections,
- p->u.p.s.avgInConnTime,
- p->u.p.s.avgOutConnTime,
- p->u.p.s.messagePercent);
- fprintf(statlog, "%lldt%lldt%ft%ft%ft%fn",
- p->u.p.t.totalDownBytes,
- p->u.p.t.totalUpBytes,
- p->u.p.t.currDownSpeed,
- p->u.p.t.currUpSpeed,
- p->u.p.t.avgDownSpeed,
- p->u.p.t.avgUpSpeed);
- current_log_count ++;
- if(current_log_count == MAX_LOG_COUNT)
- {
- fflush(statlog);
- current_log_count = 0;
- }
- }
- int closure_NP (struct Session *p)
- {
- int i, id;
- struct Session *q;
- // write statistics to log file
- logStat(p);
- // 1. decrease <maxid> if current session is the last
- if ((i = p - NPTRACKER.head) == NPTRACKER.maxid && i > 0)
- {
- for (i--; NPTRACKER.head[i].socket == 0 && i> 0; i--);
- NPTRACKER.maxid = i;
- }
- // 2. delete corresponding session in the session-channel map
- delSession (p);
- // 3. remove session from the hash table
- id = hash_np (p->host, p->npport);
- if ((q = NPTRACKER.hash[id]) != p) // not head of chain
- {
- for (; q && q->hnext != p; q=q->hnext); // search through the chain for the parent of <p>
- assert (q);
- if (q) q->hnext = p->hnext; // remove
- } else NPTRACKER.hash[id] = p->hnext; // head of chain, got it
- // 4. clear and free session object to the freelist
- memset (p, 0, sizeof (struct Session)); // clear session
- p->hnext = NPTRACKER.hash[0];
- NPTRACKER.hash[0] = p;
- Polluted ++;
- NPTRACKER.cur --;
- return 0;
- }
- int init_CP (struct Session *p)
- {
- const char* servicetype;
- servicetype = find_cp_service_type(p->host);
- if(servicetype == NULL)
- servicetype = "UNKNOWN";
- strcpy(p->u.cp.servicetype, servicetype);
- PDEBUG("n******************************************************************ninit_CP: cp service type is %sn", servicetype);
- int listnum = p - CPTRACKER.head;
- if (listnum > CPTRACKER.maxid)
- CPTRACKER.maxid = listnum;
- CPTRACKER.cur ++;
- GCPCHOICE = p;
- // add_cp_to_list((void*)p);
- return 0;
- }
- int process_CP (int idsock)
- {
- int len, listnum;
- struct Session *p;
- struct TSMessage *m = &UDPMsg;
- #ifdef MEASUREMENT
- struct timeval tm;
- long long msec;
- #endif
- socklen_t addr_len = sizeof (UDPCLIENT);
- memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
- if ((len = (recvfrom (CPTRACKER.sock[idsock], &UDPMsg, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
- {
- perror ("recvfrom:");
- return -1;
- }
- #ifdef MEASUREMENT
- gettimeofday (&tm, NULL);
- msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
- #endif
- if (m->type == CP2TS_REGISTER)
- {
- process_CP2TS_REGISTER ((struct Message *)m);
- } else
- {
- listnum = m->authcode1;
- p = CPTRACKER.head+listnum;
- if (listnum >= CPTRACKER.max || p->socket == 0
- || p->auth != m->authcode2)
- {
- if (m->type != CP2TS_LOGOUT)
- {
- PDEBUG("CP error. listnum=%d/%d. socket=%d auth=%d/%dn", listnum, CPTRACKER.max, p->socket, p->auth, m->authcode2);
- SEND_NPMSG(CPTRACKER.sock[idsock],TS2CP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
- }
- return -1;
- }
- switch (m->type)
- {
- case CP2TS_NEED_PEERS: // ECP查询用, 目前尚未使用
- process_CP2TS_NEED_PEERS (p, m);
- break;
- case CP2TS_UPDATE: // 报告CP负载
- process_CP2TS_UPDATE (p, m);
- break;
- case CP2TS_LOGOUT:
- closure_CP (p);
- break;
- default:
- closure_CP (p);
- break;
- }
- p->last_access = CurTimeSec;
- }
- return 0;
- }
- int closure_CP (struct Session *p)
- {
- int i, id;
- struct Session *q;
- //PDEBUG("closure_CP.n");
- //remove_cp_from_list((void*)p);
- //PDEBUG("closure_CP OK.n");
- if ((i = p - CPTRACKER.head) == CPTRACKER.maxid && i > 0)
- {
- for (i--; CPTRACKER.head[i].socket == 0 && i> 0; i--);
- CPTRACKER.maxid = i;
- }
- if (GCPCHOICE == p)
- {
- for (i=CPTRACKER.maxid; i>=0; i--)
- {
- if (CPTRACKER.head[i].socket != 0 && CPTRACKER.head[i].u.cp.type == CT_GENERAL)
- {
- GCPCHOICE = &(CPTRACKER.head[i]);
- break;
- }
- }
- }
- id = hash_cp (p->host, p->npport);
- if ((q = CPTRACKER.hash[id]) != p)
- {
- for (; q && q->hnext != p; q=q->hnext);
- assert (q);
- if (q) q->hnext = p->hnext;
- } else CPTRACKER.hash[id] = p->hnext;
- memset (p, 0, sizeof (struct Session));
- p->hnext = CPTRACKER.hash[0];
- CPTRACKER.hash[0] = p;
- Polluted ++;
- CPTRACKER.cur --;
- return 0;
- }
- #ifdef HAVE_RM
- int getChannelInfo (char *md5, char **buf)
- {
- struct Channel *pc;
- int i, total=0;
- if (strcmp (md5, "*") == 0)
- {
- for (i=0; i<MAX_CHANNEL; i++)
- {
- for (pc=ChannelHash[i]; pc; pc=pc->next)
- {
- memcpy (*buf, pc->name, MD5_LEN);
- *buf += MD5_LEN;
- *(int *)(*buf) = pc->numclient;
- *buf += sizeof (int);
- total ++;
- }
- }
- return total;
- }
- if ((pc=findChannel (md5, MD5_LEN)) != NULL)
- {
- memcpy (*buf, md5, MD5_LEN);
- *buf += MD5_LEN;
- *(int *)(*buf) = pc->numclient;
- *buf += sizeof (int);
- return 1;
- }
- for (i=0; i<MAX_CHANNEL; i++)
- {
- for (pc=ChannelHash[i]; pc; pc=pc->next)
- {
- if (strstr (pc->name, md5) != NULL)
- {
- memcpy (*buf, pc->name, MD5_LEN);
- *buf += MD5_LEN;
- *(int *)(*buf) = pc->numclient;
- *buf += sizeof (int);
- total ++;
- }
- }
- }
- return total;
- }
- int init_RM (struct Session *p)
- {
- int listnum = p - RMTRACKER.head;
- if (listnum > RMTRACKER.maxid)
- RMTRACKER.maxid = listnum;
- RMTRACKER.cur ++;
- return 0;
- }
- #define RM2TS_STAT_QUERY 0x20
- #define TS2RM_STAT_RESPONSE 0x30
- int process_RM (int idsock)
- {
- char buffer[MAX_DATA];
- char *p, *buf = buffer;
- int * psize;
- int querynum;
- int len, total, i;
- struct Message Msg, *m=&Msg;
- #ifdef MEASUREMENT
- struct timeval tm;
- long long msec;
- #endif
- socklen_t addr_len = sizeof (UDPCLIENT);
- memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
- if ((len = (recvfrom (RMTRACKER.sock[idsock], m, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
- {
- perror ("recvfrom:");
- return -1;
- }
- #ifdef MEASUREMENT
- gettimeofday (&tm, NULL);
- msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
- #endif
- PDEBUG("got RM msg, len %d. n", len);
- switch (m->type)
- {
- case RM2TS_STAT_QUERY:
- querynum = *(int*)m->buffer;
- if(querynum > 100)
- break;
- if(querynum > 0)
- {
- buf += sizeof (int);
- *(unsigned char *)buf = TS2RM_STAT_RESPONSE;
- buf += sizeof (char);
- psize = (int*)buf;
- buf += sizeof (int);
- total = 0;
- p = m->buffer + sizeof(int);
- for(i = 0; i < querynum; ++i)
- {
- PDEBUG("query %s. n", p);
- total += getChannelInfo(p, &buf);
- p += MD5_LEN;
- }
- *(int *)buffer = buf - buffer;
- *psize = total;
- if(*psize > 0)
- sendMessage(RMTRACKER.sock[idsock], buffer, &UDPCLIENT);
- }
- break;
- default:
- break;
- }
- #ifdef MEASUREMENT
- gettimeofday (&tm, NULL);
- PDEBUG ("msg type %d, len %d: %lld msec.n", m->type, m->len, ((long long)tm.tv_sec) * 1000000l + tm.tv_usec - msec);
- #endif
- return 0;
- }
- int closure_RM (struct Session *p)
- {
- int i;
- if ((i = p - RMTRACKER.head) == RMTRACKER.maxid && i > 0)
- {
- for (i--; RMTRACKER.head[i].socket == 0 && i> 0; i--);
- RMTRACKER.maxid = i;
- }
- return 0;
- }
- #endif
- //------------------------------------------------
- //- Here begin the specific message process part -
- //------------------------------------------------
- // | login id(UINT32) | md5 password(MD5_LEN) |
- // | version of client(float) | listening port(USHORT) |
- // | size of local ip list(UINT8) | first ip addr(in_addr) |... |
- /* NP向TS登录, 按照来源IP地址和所报告的npport进行hash, 如果距离上次
- 发送NP2TS_LOGIN消息的时间小于SILENCE_TIME, 则直接返回,否则发送WELCOME消息. */
- int process_NP2TS_LOGIN (struct Message *m)
- {
- struct Session *p;
- // struct ChannelInfo tmpch;
- char md5[MD5_LEN+1];
- unsigned int host, myhost, intra;
- unsigned short port, npport;
- int i,id, userID;
- float clientVer;
- unsigned int num;
- char *buf;
- struct P2PAddress *addr;
- buf = m->buffer;
- userID = *(int *)buf;
- buf += sizeof (int);
- memcpy (md5, buf, MD5_LEN);
- md5[MD5_LEN] = 0;
- buf += MD5_LEN;
- #ifdef HAVE_MYSQL
- if (authUser (userID, md5, local_mysql, NULL) == 0)
- {
- SEND_NPMSG(NPTRACKER.sock[CurrentSock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
- closure_NP (p);
- return -1;
- }
- #endif
- // check protocol version
- clientVer = *(float*)buf;
- buf += sizeof (float);
- if(clientVer < MIN_CLIENT_VERSION) {
- SEND_NPMSG(NPTRACKER.sock[CurrentSock], TS2NP_MSG, ERR_LOW_VERSION, 1, &UDPCLIENT);
- return -1;
- }
- npport = ntohs (*(unsigned short *)buf);
- buf += sizeof (short);
- host = ntohl (UDPCLIENT.sin_addr.s_addr);
- port = ntohs (UDPCLIENT.sin_port);
- // find client session in the session cluster (hash table)
- id = hash_np (host, npport);
- for (p=NPTRACKER.hash[id]; p; p=p->hnext)
- if (p->host == host && p->port == port && p->npport == npport)
- break;
- if (!p) // not found, allocate and add new session object
- {
- // error adding session: TS full or uninitialized
- if (NPTRACKER.cur >= NPTRACKER.max || NPTRACKER.hash[0] == 0)
- {
- SEND_NPMSG(NPTRACKER.sock[CurrentSock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
- return -1;
- }
- // allocate session object from the freelist indicated by hash[0]
- // allocate the FIRST free object in hash[0] and insert it into HEAD of corresponding bucket
- p = NPTRACKER.hash[0];
- NPTRACKER.hash[0] = p->hnext;
- p->hnext = NPTRACKER.hash[id];
- NPTRACKER.hash[id] = p;
- // fill in the object
- p->socket = NPTRACKER.sock[CurrentSock];
- p->type = TYPE_NP;
- p->port = port;
- p->npport = npport;
- p->host = host;
- p->clientVer = clientVer;
- #ifdef SORT_NET
- {
- struct networks *pnetworks = getnetwork (host, NETBLOCKS, maxNet);
- if (pnetworks) p->net = pnetworks->net;
- else p->net = 0;
- }
- #endif
- p->time_sec = CurTimeSec;
- // process the ip list
- num = *(unsigned char *)buf; // size of local ip list
- buf += sizeof (char);
- intra = 0;
- for (i=0; i<num; i++)
- {
- myhost = ntohl (*(unsigned int *)buf);
- buf += sizeof (int);
- if (intra == 0xffffffff)
- continue; // Must use continue to modify buf to right place
- if ((myhost >> 16) == 0xc0a8) // 0xc0a8 == 192.168
- {
- intra = myhost;
- if(host == intra)
- intra = 0xffffffff;
- }
- else if (myhost == host)
- intra = 0xffffffff;
- }
- p->intra = intra;
- p->auth = random ();
- //init statistics
- p->u.p.startBlock = -1;
- init_NP (p);
- } else if (CurTimeSec < p->last_access + SILENCE_TIME)
- return 0; // still active, do NOTHING
- if (buf - m->buffer + NORMAL_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- return -1;
- }
- // Time to send back welcome message to NP
- UDPMsg.type = TS2NP_WELCOME;
- UDPMsg.len = 12+sizeof(struct P2PAddress);
- UDPMsg.authcode1 = p-NPTRACKER.head;
- UDPMsg.authcode2 = p->auth;
- addr = (struct P2PAddress *)(UDPMsg.buffer);
- addr->outerIP.sin_family = addr->subnetIP.sin_family = AF_INET;
- addr->outerIP.sin_addr.s_addr = htonl (p->host);
- addr->subnetIP.sin_addr.s_addr = htonl (p->intra);
- addr->outerIP.sin_port = ntohs (port);
- addr->subnetIP.sin_port = ntohs (npport);
- ++ts2npWelcomeCount;
- if (sendMessage(p->socket, (char *)&UDPMsg, &UDPCLIENT) < 0)
- {
- closure_NP (p);
- return -1;
- }
- // process_NEED_PEERS_real (p, NULL, 1, 0, 0);
- return 0;
- }
- // |---CHECK DIGITS(7 BYTEs)---|res count(UINT8)|RESOURCE MD5(MD5_LEN)|...|
- int process_NP2TS_QUERY_RES (struct Session *p, struct TSMessage *m)
- {
- int i;
- char *buf = m->buffer;
- char buffer[MAX_DATA];
- char *resultMsg, *prescount;
- struct Channel *pc;
- unsigned char num = *(unsigned char *)buf; //# of queried resources
- buf += sizeof (char);
- resultMsg = buffer+sizeof(int);
- *(unsigned char *)resultMsg = TS2NP_RESINFO;
- resultMsg += sizeof (char);
- prescount = resultMsg;
- *(unsigned char *)prescount = 0; // counter of resources
- for (i=0; i<num; i++)
- {
- if ((pc = findChannel (buf, MD5_LEN)) != NULL)
- {
- (*(unsigned char *)prescount) ++;
- memcpy (resultMsg, buf, MD5_LEN);
- resultMsg += MD5_LEN;
- *(unsigned short *)resultMsg = pc->numclient;
- resultMsg += sizeof (short);
- }
- buf += MD5_LEN;
- }
- if (buf - m->buffer + AUTH_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- closure_NP (p);
- return -1;
- }
- *(unsigned int *)buffer = resultMsg - buffer; // The length of the message
- if (sendMessage(p->socket,buffer,&UDPCLIENT) < 0)
- {
- // closure_NP (p);
- return -1;
- }
- return 0;
- }
- // new version of REPORT with TransferInfo, ignoring TransferInfo
- // | ---CHECK DIGITS(7 BYTEs)--- |
- // | info of peer(CorePeerInfo) | refresh(bool) |
- // | Interval count(UINT8) | first BlockInterval | ... |
- // | transfer Info(TransferInfo) |
- int process_NP2TS_REPORT (struct Session *p, struct TSMessage *m)
- {
- unsigned char type;
- char *buf = m->buffer;
- // 1. extract CorePeerInfo
- memcpy (&(p->u.p.p), buf, sizeof (struct CorePeerInfo));
- buf += sizeof (struct CorePeerInfo);
- if (p->u.p.cur)
- {
- type = *(unsigned char *)buf;
- buf += sizeof (char);
- if (type) // refresh==true
- {
- p->u.p.cur->numinter = *(unsigned char *)buf;
- PDEBUG ("Set %d intervalsn", p->u.p.cur->numinter);
- buf += sizeof (char);
- if (p->u.p.cur->numinter > 0 && p->u.p.cur->numinter < MAX_INTERVAL)
- memcpy (p->u.p.cur->inter, buf, p->u.p.cur->numinter*sizeof (struct Interval));
- else p->u.p.cur->numinter = 0;
- } else // refresh==false incremental update
- {
- type = *(unsigned char *)buf; // # of newly added intervals
- buf += sizeof (char);
- PDEBUG ("Add %d intervals,", type);
- if (type > 0) // merge & delete are not good name!
- {
- p->u.p.cur->numinter = merge (p->u.p.cur->inter, p->u.p.cur->numinter, (struct Interval *)buf, type);
- buf += type*sizeof(struct Interval);
- }
- type = *(unsigned char *)buf;
- buf += sizeof (char);
- if (type > 0)
- {
- p->u.p.cur->numinter = delete_interval (p->u.p.cur->inter, p->u.p.cur->numinter, (struct Interval *)buf, type);
- buf += type*sizeof(struct Interval);
- }
- PDEBUG ("del %d intervals, now is %d.n", type, p->u.p.cur->numinter);
- }
- }
- // ignore TransferInfo, but still count the size for integrity check
- // --DELETED-- copy transferinfo of client
- // --DELETED-- memcpy (&(p->u.p.t), buf, sizeof (struct TransferInfo));
- buf += sizeof(struct TransferInfo);
- if (buf - m->buffer + AUTH_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- closure_NP (p);
- return -1;
- }
- return 0;
- }
- int process_NP2TS_REPORT2 (struct Session *p, struct TSMessage *m)
- {
- char *buf = m->buffer;
- // TODO:
- // extract current playing block and count them
- memcpy (&(p->u.p.s), buf, sizeof (struct StatInfo));
- buf += sizeof (struct StatInfo);
- memcpy (&(p->u.p.t), buf, sizeof (struct TransferInfo));
- buf += sizeof (struct TransferInfo);
- // check if this is the first report and record starting block of NP
- if ( p->u.p.startBlock == -1 ) {
- p->u.p.startBlock = p->u.p.s.playingBlock;
- }
- if (buf - m->buffer + AUTH_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- closure_NP (p);
- return -1;
- }
- return 0;
- }
- // |---CHECK DIGITS(7 BYTEs)---|
- // |resource list size(UINT8)|
- // |RESOURCE MD5(MD5_LEN)|Interval count(UINT8)|first BlockInterval|...|
- int process_NP2TS_RES_LIST (struct Session *p, struct TSMessage *m)
- {
- unsigned int num, i;
- struct ChannelInfo c;
- char *buf = m->buffer;
- // unsigned char needcp;
- num = *(unsigned char *)buf;
- buf += sizeof (char);
- for (i=0; i<num; i++)
- {
- memcpy (c.md5, buf, MD5_LEN);
- c.md5[MD5_LEN] = 0;
- buf += MD5_LEN;
- c.numinter = *(unsigned char *)buf;
- buf += sizeof (char);
- if (c.numinter > MAX_INTERVAL) return -1;
- memcpy (c.inter, buf, c.numinter*sizeof(struct Interval));
- buf += sizeof(struct Interval) * c.numinter;
- addSession (p, &c, 1, 0);
- }
- if (buf - m->buffer + AUTH_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- closure_NP (p);
- return -1;
- }
- return 0;
- }
- // |---CHECK DIGITS(7 BYTEs)---|
- // |RESOURCE MD5(MD5_LEN)|Interval count(UINT8)|first BlockInterval|...|
- // |current block(UINT32)|need CP(bool)|
- int process_NP2TS_REQ_RES (struct Session *p, struct TSMessage *m)
- {
- struct ChannelInfo c;
- unsigned int cur;
- char *buf = m->buffer;
- unsigned char needcp;
- memcpy (c.md5, buf, MD5_LEN);
- c.md5[MD5_LEN] = 0;
- buf += MD5_LEN;
- c.numinter = *(unsigned char *)buf;
- buf += sizeof (char);
- if (c.numinter > MAX_INTERVAL) return -1;
- memcpy (c.inter, buf, c.numinter*sizeof(struct Interval));
- buf += sizeof(struct Interval) * c.numinter;
- cur = *(unsigned int *)buf;
- buf += sizeof (int);
- needcp = *(unsigned char *)buf;
- buf += sizeof (char);
- if (buf - m->buffer + AUTH_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- closure_NP (p);
- return -1;
- }
- addSession (p, &c, 1, cur);
- process_NEED_PEERS_real (p, c.md5, needcp, cur, 0);
- SEND_NPMSG(NPTRACKER.sock[CurrentSock],TS2NP_MSG,ERR_ADD_RES_OK,0,&UDPCLIENT);
- return 0;
- }
- // NP has quit from one channel
- // |---CHECK DIGITS(7 BYTEs)---|RESOURCE MD5(MD5_LEN)|
- int process_NP2TS_DEL_RES (struct Session *p, struct TSMessage *m)
- {
- struct Edge *pedge;
- struct Channel *pc;
- char *buf = m->buffer;
- if ((pc=findChannel (buf, MD5_LEN)) == NULL)
- return -1;
- buf += MD5_LEN;
- if (buf - m->buffer + AUTH_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- closure_NP (p);
- return -1;
- }
- for (pedge=p->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
- if (pedge) delEdge (pedge);
- return 0;
- }
- int process_NEED_PEERS_real (struct Session *p, char *md5, int needcp, unsigned int cur, unsigned char layer)
- {
- char buffer[MAX_DATA], buffer1[MAX_DATA]; /* buffer is used to hold TS2CP/NP_PEERS message,
- buffer1 is used to hold TS2NP_CONNECT_TO message */
- int num, numcp=0, numnp, conn = p->socket;
- char *buf, *psize;
- struct Channel *pc;
- struct P2PAddress *addr;
- PDEBUG ("NEED_PEERS type %d needcp %d cur %dn", p->type, needcp, cur);
- buf = buffer+sizeof (int);
- if (p->type == TYPE_CP)
- *(unsigned char *)buf = TS2CP_PEERS;
- else
- *(unsigned char *)buf = TS2NP_PEERS;
- buf += sizeof (char);
- ++ts2npPeersCount;
- if (md5) // md5 of the channel
- {
- pc = findChannel (md5, MD5_LEN);
- if (p->type == TYPE_CP)
- {
- memcpy (buf, md5, MD5_LEN);
- buf += MD5_LEN;
- }
- } else if (p->u.p.cur) // Session->NPInfo->CurrentEdge
- pc = p->u.p.cur->head;
- else
- pc = NULL;
- if (pc == NULL)
- {
- PDEBUG("no channeln");
- return -1;
- }
- num = MAX_PEER;
- if (needcp)
- {
- psize = buf;
- buf += sizeof (char);
- if (p->type == TYPE_CP)
- *(unsigned char *)psize = numcp = findCPPeers (0, pc->name, &buf);
- else
- //*(unsigned char *)psize = numcp = findCPPeers (ntohs (p->host & 0xffff0000), pc->name, &buf);
- *(unsigned char *)psize = numcp = findCPPeers (p->host, pc->name, &buf);
- } else
- {
- *(unsigned char *)buf = 0;
- buf += sizeof (char);
- }
- psize = buf;
- buf += sizeof (char);
- if (p->intra == 0xffffffff) //NP is in the public network, ask NP to contact other NPs
- {
- // |len(INT32) | type(INT8)|target addr(P2PAddress)|connect for free(bool)|
- *(int *)buffer1 = sizeof(struct P2PAddress) + sizeof (int) + 2*sizeof (char);
- *(unsigned char *)(buffer1+sizeof (int)) = TS2NP_CONNECT_TO;
- addr = (struct P2PAddress *)(buffer1+sizeof(int)+sizeof(char));
- addr->outerIP.sin_family = PF_INET;
- addr->subnetIP.sin_family = PF_INET;
- addr->outerIP.sin_port = htons (p->port);
- addr->outerIP.sin_addr.s_addr = htonl (p->host);
- addr->subnetIP.sin_port = htons (p->npport);
- addr->subnetIP.sin_addr.s_addr = htonl (p->intra);
- *(unsigned char *)(addr + 1) = 0; // connect for free(bool)
- PDEBUG("Send ConnectTo %sn", inet_ntoa(addr->outerIP.sin_addr));
- numnp = findNPPeers (pc, p, cur, num, &buf, buffer1);
- } else
- {
- numnp = findNPPeers (pc, p, cur, num, &buf, NULL);
- }
- *psize = numnp; // set # of NP
- *(int *)buffer = buf - buffer;
- if (sendMessage(conn,buffer,&UDPCLIENT) < 0)
- {
- // closure_NP (p);
- PDEBUG("send msg errn");
- return -1;
- }
- PDEBUG ("find %d NP and %d CPn", numnp, numcp);
- return numnp+numcp;
- }
- /* 查询Peer信息, 使用findCPPeer寻找合适的CP, 使用findNPPeers寻找合适的NP.
- NP寻找时, 找到结果后按照networks来排序,保证在同一个网络中的排在前面. */
- int process_NP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m)
- {
- char *buf = m->buffer;
- int needcp;
- needcp = *(unsigned char *)buf;
- buf += sizeof (char);
- if(p->u.p.cur == NULL)
- {
- PDEBUG("no current in NPInfon");
- return -1;
- }
- p->u.p.cur->current = *(unsigned int *)buf;
- buf += sizeof (int);
- p->u.p.p.layer= *(unsigned char *)buf;
- buf += sizeof (char);
- if (buf - m->buffer + AUTH_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- closure_NP (p);
- return -1;
- }
- return process_NEED_PEERS_real (p, NULL, needcp, p->u.p.cur->current, p->u.p.p.layer);
- }
- int findCPPeers (unsigned long host, char *md5, char **buffer)
- {
- // float minband = -1;
- struct Session* p = NULL, *choice1 = NULL, *choice2 = NULL;
- struct NormalAddress *addr1;
- struct NormalAddress *addr2;
- int i;
- // We should start with a random address
- p = CPTRACKER.head + rand() % (CPTRACKER.maxid+1);
- //use pure random address instead of comparing the band
- PDEBUG("In findCPPeers.n");
- int minpriority = -1;
- for( i=0; i <= CPTRACKER.maxid && host != 0; i++, p++)
- {
- PDEBUG("begin to call findcppeers.n");
- if( p-CPTRACKER.head > CPTRACKER.maxid ) // round back to head
- {
- p = CPTRACKER.head;
- }
- if(p->u.cp.maxConn == 1 || p->socket == 0)
- {
- PDEBUG("Invalid CP.n");
- continue;
- }
- int priority = findcppeers(host, (void*)p);
- //如果不是对应的CP,查找下一个
- if(priority == -1)
- {
- PDEBUG("not found.n");
- continue;
- }
- PDEBUG("found: host : %d servicetype: %s.n", host, p->u.cp.servicetype);
- if(minpriority == -1)
- minpriority = priority;
- if(priority == 1)// priority - 1~n
- {
- if(choice1 == NULL)
- {
- choice1 = p;
- minpriority = -1;
- }
- else
- {
- choice2 = p;
- break;
- }
- }
- else
- {
- if(priority < minpriority)
- {
- if(choice1 == NULL)
- {
- choice1 = p;
- minpriority = -1;
- }
- else
- {
- choice2 = p;
- break;
- }
- }
- }
- }
- if(i > CPTRACKER.maxid)//find nothing via findcppeers, return CP directly
- {
- PDEBUG("begin to find sequencely.n");
- for( i=0; i <= CPTRACKER.maxid; i++, p++)
- {
- if( p-CPTRACKER.head > CPTRACKER.maxid ) // round back to head
- {
- p = CPTRACKER.head;
- }
- if(p->u.cp.maxConn == 1 || p->socket == 0)
- continue;
- // 始终保证choice1比choice2先获得值,如果两个都有了就break
- if(choice1 == NULL)
- {
- choice1 = p;
- }
- else if(p->host == choice1->host)// 避免返回两个相同的CP
- continue;
- else if(choice2 == NULL)
- {
- choice2 = p;
- }
- else
- break;
- }
- }
- int found_cp_count = 0;
- if (choice1)
- {
- addr1 = (struct NormalAddress *)*buffer;
- addr1->sin_family = PF_INET;
- addr1->sin_port = htons (choice1->npport);
- addr1->sin_addr.s_addr = htonl (choice1->host);
- *buffer += sizeof (*addr1);
- found_cp_count ++; // one CP is found
- }
- if(choice2)
- {
- addr2 = (struct NormalAddress *)*buffer;
- addr2->sin_family = PF_INET;
- addr2->sin_port = htons (choice2->npport);
- addr2->sin_addr.s_addr = htonl (choice2->host);
- *buffer += sizeof (*addr2);
- found_cp_count ++;
- }
- return found_cp_count;// 返回最终找到的CP的个数
- }
- #ifndef SORT_NET
- int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1)
- {
- struct Edge *pedge = NULL;
- struct Session *ps;
- struct P2PAddress addr;
- int j, k;
- unsigned int randstart;
- struct sockaddr_in client;
- if (pc == NULL || pc->numclient <= 0) return 0;
- if (me->cachepeer != NULL && me->cachepeer->u.p.header != NULL && me->type == TYPE_NP)
- {
- for (pedge=me->cachepeer->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
- }
- if (pedge == NULL)
- {
- randstart = rand () % pc->numclient;
- for (pedge=pc->PeerHead; pedge && randstart > 0; pedge=pedge->cnext, randstart--);
- }
- for (j=0,k=0; j<num && k<pc->numclient; pedge=pedge->cnext)
- {
- k++;
- if (pedge == NULL) pedge = pc->PeerHead;
- if ((ps = pedge->me) == me) continue; // exclude myself
- if (CurTimeSec > pedge->me->last_access+60) continue; // don't bother it too often
- if (ps->u.p.p.isMaxIn == 0 && ((me->type == TYPE_CP) || playing == 0xffffffff || check_valid (pedge, playing)))
- {
- if (ps->intra == 0xffffffff || ps->host == me->host) // 0xffffffff means ps is on the public network, ps->host == me->host means in the same private network
- {
- memcpy (*buffer, &(ps->u.p.p), sizeof (struct CorePeerInfo));
- addr = (*buffer) + sizeof (struct CorePeerInfo);
- addr->outerIP.sin_port = htons (ps->port);
- addr->outerIP.sin_addr.s_addr = htonl (ps->host);
- addr->subnetIP.sin_port = htons (ps->npport);
- addr->subnetIP.sin_addr.s_addr = htonl (ps->intra);
- addr->outerIP.sin_family = PF_INET;
- addr->subnetIP.sin_family = PF_INET;
- *buffer = (struct char *)(addr + 1);
- j++;
- } else if (buffer1)
- {
- memset ((char *)&client, 0, sizeof (client));
- client.sin_port = htons (ps->port);
- client.sin_family = AF_INET;
- client.sin_addr.s_addr = htonl (ps->host);
- if (sendMessage(ps->socket,buffer1, &client) < 0)
- {
- closure_NP (ps);
- }
- PDEBUG("Send ConnecTo to %sn", inet_ntoa(client.sin_addr));
- }
- }
- }
- if (pedge != NULL) me->cachepeer = pedge->me;
- return j;
- }
- #else
- int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1)
- {
- int i, j, k, m, mnetnum;
- unsigned int randstart;
- struct Session *result[MAX_PEER];
- struct Session *ps;
- struct sockaddr_in client;
- struct Edge *pedge;
- struct P2PAddress *addr;
- if (pc == NULL || pc->numclient <= 0) return 0;
- Net = me->net;
- for (j=0,m=0; m<MAX_NET_NUM; m++) { mnetnum = (Net + m) % MAX_NET_NUM;
- k = 0;
- if (pc->nclient_net[mnetnum] <= 0) continue;
- pedge = NULL;
- if (me->cachepeer[mnetnum] != NULL && me->cachepeer[mnetnum]->u.p.header != NULL && me->type == TYPE_NP)
- {
- for (pedge=me->cachepeer[mnetnum]->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
- }
- if (pedge == NULL)
- {
- randstart = rand () % pc->nclient_net[mnetnum];
- for (pedge=pc->PeerHead[mnetnum]; pedge && randstart > 0; pedge=pedge->cnext, randstart--);
- }
- for (; j<MAX_PEER && k < pc->nclient_net[mnetnum]; pedge=pedge->cnext)
- {
- k++;
- if (pedge == NULL) pedge = pc->PeerHead[mnetnum];
- if ((ps = pedge->me) == me) continue;
- if (CurTimeSec > pedge->me->last_access+60) continue;
- if (ps->u.p.p.isMaxIn == 0 && ((me->type == TYPE_CP) || playing == 0xffffffff || check_valid (pedge, playing)))
- {
- if (ps->intra == 0xffffffff || ps->host == me->host)
- {
- result[j] = ps;
- j++;
- } else if (buffer1)
- {
- memset ((char *)&client, 0, sizeof (client));
- client.sin_port = htons (ps->port);
- client.sin_family = AF_INET;
- client.sin_addr.s_addr = htonl (ps->host);
- if (sendMessage(ps->socket,buffer1, &client) < 0)
- {
- closure_NP (ps);
- }
- PDEBUG("Send ConnecTo to %sn", inet_ntoa(client.sin_addr));
- }
- }
- }
- if (pedge != NULL) me->cachepeer[mnetnum] = pedge->me;
- }
- if (j > 1)
- qsort (result, j, sizeof (struct Session *), compareSession);
- if (num > 0 && j > num) j = num;
- if (me->type == TYPE_NP)
- PDEBUG ("NP %d find %d NP:", me-NPTRACKER.head, j);
- else
- PDEBUG ("CP %d find %d CP:", me-CPTRACKER.head, j);
- for (i=0; i<j; i++)
- {
- ps = result[i];
- PDEBUG ("%dt", ps-NPTRACKER.head);
- memcpy (*buffer, &(ps->u.p.p), sizeof (struct CorePeerInfo));
- addr = (struct P2PAddress *)((*buffer) + sizeof (struct CorePeerInfo));
- addr->outerIP.sin_port = htons (ps->port);
- addr->outerIP.sin_addr.s_addr = htonl (ps->host);
- addr->subnetIP.sin_port = htons (ps->npport);
- addr->subnetIP.sin_addr.s_addr = htonl (ps->intra);
- addr->outerIP.sin_family = PF_INET;
- addr->subnetIP.sin_family = PF_INET;
- *buffer = (char *)(addr + 1);
- }
- PDEBUG ("n");
- return j;
- }
- #endif
- int process_CP2TS_UPDATE (struct Session *p, struct TSMessage *m)
- {
- char *buf = m->buffer;
- p->u.cp.resnum = *(int *)buf;
- buf += sizeof (int);
- p->u.cp.connnum = *(unsigned short *)buf;
- buf += sizeof (unsigned short);
- p->u.cp.band = *(float *)buf;
- buf += sizeof(float);
- p->u.cp.maxConn = *(unsigned char *)buf;
- buf += sizeof(char);
- if (buf - m->buffer + AUTH_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- closure_CP (p);
- return -1;
- }
- if (p->u.cp.type == CT_GENERAL &&
- (GCPCHOICE == NULL || p->u.cp.resnum < GCPCHOICE->u.cp.resnum))
- GCPCHOICE = p;
- if (p->u.cp.resnum < 0) p->u.cp.resnum = 0;
- if (p->u.cp.band < 0) p->u.cp.band = 0.001;
- return 0;
- }
- int process_CP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m)
- {
- return process_NEED_PEERS_real (p, m->buffer/*md5*/, 1/*needcp*/, 0/*cur*/, 0/*layer*/);
- }
- /* 登录, CP向TS登录, 按照来源IP地址和所报告的npport进行hash,
- 如果距离上次发送CP2TS_REGISTER消息的时间小于SILENCE_TIME, 则直接返回,否则发送WELCOME消息. */
- int process_CP2TS_REGISTER (struct Message *m)
- {
- char md5[MD5_LEN+1];
- unsigned int host, cur = 0;
- unsigned short port, npport;
- int id, userID;
- char *buf;
- struct Session *p;
- buf = m->buffer;
- userID = *(int *)buf;
- buf += sizeof (int);
- memcpy (md5, buf, MD5_LEN);
- md5[MD5_LEN] = 0;
- buf += MD5_LEN;
- #ifdef HAVE_MYSQL
- if (authUser (userID, md5, local_mysql, NULL) == 0)
- {
- }
- #endif
- npport = ntohs(*(unsigned short *)buf);
- buf += sizeof (short);
- host = ntohl (UDPCLIENT.sin_addr.s_addr);
- port = ntohs (UDPCLIENT.sin_port);
- id = hash_cp (host, npport);
- for (p=CPTRACKER.hash[id]; p; p=p->hnext)
- if (p->host == host && p->port == port && p->npport == npport)
- break;
- if (!p)
- {
- if (CPTRACKER.cur >= CPTRACKER.max || CPTRACKER.hash[0] == 0)
- {
- PDEBUG("CP reg err, wrong cp index in array.n");
- SEND_NPMSG(CPTRACKER.sock[CurrentSock],TS2CP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
- return -1;
- }
- p = CPTRACKER.hash[0];
- CPTRACKER.hash[0] = p->hnext;
- p->hnext = CPTRACKER.hash[id];
- CPTRACKER.hash[id] = p;
- p->socket = CPTRACKER.sock[CurrentSock];
- p->type = TYPE_CP;
- p->port = port;
- p->npport = npport;
- p->host = host;
- #ifdef SORT_NET
- {
- struct networks *pnetworks = getnetwork (host, NETBLOCKS, maxNet);
- if (pnetworks) p->net = pnetworks->net;
- else p->net = 0;
- }
- #endif
- p->time_sec = cur;
- p->u.cp.userid = userID;
- p->u.cp.type = *(unsigned char *)buf;
- buf += sizeof (char);
- if (p->u.cp.type == CT_EDGE)
- {
- p->u.cp.numHeads = *(unsigned short *)buf;
- buf += sizeof (short);
- if (((char *)m) +m->len - buf <= sizeof (p->u.cp.parameter))
- memcpy (p->u.cp.parameter, buf, (char *)m+m->len-buf);
- else
- memcpy (p->u.cp.parameter, buf,sizeof (p->u.cp.parameter));
- if (p->u.cp.numHeads > sizeof(p->u.cp.parameter)/2)
- p->u.cp.numHeads = sizeof (p->u.cp.parameter)/2;
- } else if (p->u.cp.type == CT_SPECIFIED_RES)
- {
- memcpy (p->u.cp.parameter, buf, MD5_LEN);
- buf += MD5_LEN;
- }
- p->auth = random ();
- init_CP (p);
- } else if (CurTimeSec < p->last_access + SILENCE_TIME)
- return 0;
- if (buf - m->buffer + NORMAL_HEADER > m->len)
- {
- PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
- closure_CP (p);
- return -1;
- }
- PDEBUG("CP register. socket=%d listnum=%d auth=%d.n", p->socket, p-CPTRACKER.head, p->auth);
- UDPMsg.len = 12;
- UDPMsg.type=TS2CP_WELCOME;
- UDPMsg.authcode1 = p-CPTRACKER.head;
- UDPMsg.authcode2 = p->auth;
- if (sendMessage(p->socket,(char *)&UDPMsg, &UDPCLIENT) < 0)
- {
- closure_CP (p);
- return -1;
- }
- return 0;
- }
- int compareInter (const void *a, const void *b)
- {
- struct Interval *p = (struct Interval *) a;
- struct Interval *q = (struct Interval *) b;
- if ((p->start >= q->start && p->start+p->len <= q->start + q->len)
- || (q->start >= p->start && q->start+q->len <= p->start + p->len))
- return 0;
- return (p->start - q->start);
- }
- // delete用于从原有的Interval当中去掉新的.
- int delete_interval (struct Interval *head, int total, struct Interval *_new, int num)
- {
- int i,j,k;
- struct Interval tmp[MAX_INTERVAL*2];
- for (i=0,j=0,k=0; i<total && j<num;)
- {
- if (_new[j].start+_new[j].len <= head[i].start)
- j++;
- else if (head[i].start+head[i].len <= _new[j].start)
- {
- tmp[k].start = head[i].start;
- tmp[k].len = head[i].len;
- i++;
- k++;
- } else
- {
- if (_new[j].start <= head[i].start)
- {
- if (_new[j].start+_new[j].len >= head[i].start+head[i].len)
- i++;
- else
- {
- tmp[k].start = _new[j].start+_new[j].len;
- tmp[k].len = head[i].start+head[i].len-tmp[k].start;
- i++;
- if (tmp[k].len > 0) k++;
- }
- } else
- {
- tmp[k].start = head[i].start;
- tmp[k].len = _new[j].start-tmp[k].start;
- i++;
- if (tmp[k].len > 0) k++;
- if (_new[j].start+_new[j].len < head[i].start+head[i].len)
- {
- tmp[k].start = _new[j].start+_new[j].len;
- tmp[k].len = head[i].start+head[i].len-tmp[k].start;
- i++;
- j++;
- if (tmp[k].len > 0) k++;
- }
- }
- }
- }
- if (i<total)
- {
- memcpy (tmp+k, head+i, (total-i)*sizeof(struct Interval));
- k += (total-i);
- }
- if (k > MAX_INTERVAL) k = MAX_INTERVAL;
- if (k>0) memcpy (head, tmp, k*sizeof(struct Interval));
- return k;
- }
- // merge用于将原有的Interval和新的Interval列表合在一起
- int merge (struct Interval *head, int total, struct Interval *_new, int num)
- {
- int i,j,k;
- struct Interval tmp[MAX_INTERVAL*2];
- for (i=0,j=0,k=0; i<total && j<num;)
- {
- if (head[i].start <= _new[j].start)
- {
- tmp[k].start = head[i].start;
- tmp[k].len = head[i].len;
- i++;
- } else
- {
- tmp[k].start = _new[j].start;
- tmp[k].len = _new[j].len;
- j++;
- }
- for (; i<total || j<num;)
- {
- if (i<total && head[i].start <= tmp[k].start+tmp[k].len)
- {
- if (head[i].start+head[i].len > tmp[k].start+tmp[k].len)
- tmp[k].len = head[i].start+head[i].len-tmp[k].start;
- i++;
- } else if (j<num && _new[j].start <= tmp[k].start+tmp[k].len)
- {
- if (_new[j].start+_new[j].len > tmp[k].start+tmp[k].len)
- tmp[k].len = _new[j].start+_new[j].len-tmp[k].start;
- j++;
- } else
- break;
- }
- k++;
- }
- if (i<total)
- {
- memcpy (tmp+k, head+i, (total-i)*sizeof(struct Interval));
- k += (total-i);
- } else if (j<num)
- {
- memcpy (tmp+k, _new+j, (num-j)*sizeof(struct Interval));
- k += (num-j);
- }
- if (k > MAX_INTERVAL) k = MAX_INTERVAL;
- if (k>0) memcpy (head, tmp, k*sizeof(struct Interval));
- return k;
- }
- int check_valid (struct Edge *e, int play)
- {
- struct Interval tmp;
- struct Interval *i = e->inter, *result;
- int num = e->numinter;
- if (num <= 0) return 0;
- if (i[0].start > play || i[num-1].start+i[num-1].len <= play) return 0;
- tmp.start = play;
- tmp.len = 1;
- if ((result = (struct Interval*)bsearch ((void*)&tmp, i, num, sizeof (struct Interval), compareInter)) == NULL)
- return 0;
- return 1;
- }
- int logto_xml (long channelcount, unsigned int totalclient, long long totalstay)
- {
- FILE *logf = fopen (LOGXML, "w");
- if(!logf)
- {
- PDEBUG("Couldn't open log xml!.n");
- return -1;
- }
- fprintf(logf, "<?xml version="1.0" encoding="iso-8859-1"?>n");
- fprintf(logf, "<TS>n");
- fprintf(logf, "<ElapsedTime>%ld</ElapsedTime>n", CurTimeSec - startTime);
- fprintf(logf, "<TotalLogin>%lld</TotalLogin>n", np2tsLoginCount);
- fprintf(logf, "<TotalMsg>%lld</TotalMsg>n", np2tsLoginCount + np2tsResListCount + np2tsReqResCount + np2tsDelResCount + np2tsReportCount + np2tsNeedPeerCount + np2tsLogoutCount);
- fprintf(logf, "<ActiveChannel>%ld</ActiveChannel>n", channelcount);
- fprintf(logf, "<OnlineUser>%d</OnlineUser>n", totalclient);
- fprintf(logf, "<AvgTime>%.1f</AvgTime>n", totalclient==0?0:(totalstay/((double)totalclient)));
- fprintf(logf, "</TS>n");
- fclose(logf);
- return 0;
- }