TSnew.c
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:67k
源码类别:

P2P编程

开发平台:

Visual C++

  1. /*
  2.  *  Openmysee
  3.  *
  4.  *  This program is free software; you can redistribute it and/or modify
  5.  *  it under the terms of the GNU General Public License as published by
  6.  *  the Free Software Foundation; either version 2 of the License, or
  7.  *  (at your option) any later version.
  8.  *
  9.  *  This program is distributed in the hope that it will be useful,
  10.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12.  *  GNU General Public License for more details.
  13.  *
  14.  *  You should have received a copy of the GNU General Public License
  15.  *  along with this program; if not, write to the Free Software
  16.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  17.  *
  18.  */
  19.  
  20. #include <stdio.h>
  21. #include <stdlib.h>
  22. #include <stdarg.h>
  23. #include <ctype.h>
  24. #include <sys/types.h>
  25. #include <sys/time.h>
  26. #include <sys/stat.h>
  27. #include <sys/wait.h>
  28. #include <sys/resource.h>
  29. #include <sys/socket.h>
  30. #include <netinet/in.h>
  31. #include <arpa/inet.h>
  32. #include <netdb.h>
  33. #include <unistd.h>
  34. #include <fcntl.h>
  35. #include <errno.h>
  36. #include <assert.h>
  37. #include <syslog.h>
  38. #include <string.h>
  39. #include <time.h>
  40. #include <assert.h>
  41. #include "ProtocolDefine.h"
  42. #include "StructDefine.h"
  43. #include "ErrorDefine.h"
  44. #include "util.h"
  45. #include "findcp.h"
  46. #define MAX_BIND 10
  47. #define AUTH_HEADER             12
  48. #define NORMAL_HEADER           5
  49. #define MAX_NET_NUM 5
  50. #define MAX_CLIENT 3072
  51. #define MAX_PEER 0x10 /* max peers returned by NEED_PEERS */
  52. #define IPADDR_LEN 16 /*****IP地址最大长度*****/
  53. #define MD5_LEN 32
  54. #define SERVICE_LEN 128
  55. #define TYPE_NP 0
  56. #define TYPE_CP 1
  57. #define MAX_CHANNEL 0x400 //1024
  58. #define MAX_NP 0x40000 //262144
  59. #define MAX_CP 0x10000 //65536
  60. #define MAX_RM 0x4 //4
  61. //#define TS4CP_PORT 22168
  62. //#define TS4RM_PORT 22169
  63. #define MAX_DATA 20000
  64. #define MAX_LINE 1024
  65. #define MAX_IDLE 120
  66. #define SILENCE_TIME 2
  67. #define BUILD_SOCKADDR(host,port,client)
  68. {
  69. memset ((char *)&client, 0, sizeof (client));
  70. client.sin_port = htons (pnp->port);
  71. client.sin_family = AF_INET;
  72. client.sin_addr.s_addr = htonl (pnp->host);
  73. }
  74. #define FREE_TRACKER(tracker)
  75. {
  76. int i;
  77. max = tracker.max;
  78. closure = tracker.closure;
  79. if ((head = tracker.head) != NULL)
  80. {
  81. for (listnum=0; listnum<max; listnum++)
  82. {
  83. if (head[listnum].socket > 0)
  84. (*closure) (head+listnum);
  85. }
  86. free (head);
  87. }
  88. if (tracker.hash) free (NPTRACKER.hash);
  89. for (i=0; tracker.sock[i] != 0 && i<MAX_BIND; i++)
  90. close (tracker.sock[i]);
  91. }
  92. #define MAX_INTERVAL 20
  93. #define OBSERVE_LAYER 10
  94. #define BASEDIR "/home/channel/"
  95. int current_log_count = 0;
  96. #define MAX_LOG_COUNT 100 // call fflush() when current_log_count reaches MAX_LOG_COUNT
  97. // 用来表示NP上面所有的快区间, 目前块区间由一个开始字段和一个长度字段来标识. 
  98. struct Interval
  99. {
  100. unsigned int start;
  101. unsigned int len;
  102. };
  103. // Generic format of messages between TS and other peers
  104. // Most often used in login process
  105. struct Message
  106. {
  107. unsigned int len;
  108. unsigned char type;
  109. char buffer[MAX_DATA];
  110. };
  111. // format of messages between TS and other peers
  112. // with authcodes specified
  113. struct TSMessage
  114. {
  115. unsigned int len;
  116. unsigned char type;
  117. unsigned int authcode1:24;
  118. unsigned int authcode2;
  119. char buffer[MAX_DATA];
  120. } UDPMsg;
  121. struct ChannelInfo
  122. {
  123. char md5[MD5_LEN+1];
  124. unsigned char numinter;
  125. struct Interval inter[MAX_INTERVAL];
  126. };
  127. struct Channel
  128. {
  129. char name[MD5_LEN+1];
  130. unsigned int numclient;
  131. unsigned int accumclient;
  132. unsigned int latest_time;
  133. #ifndef SORT_NET
  134. struct Edge *PeerHead;
  135. #else
  136. unsigned int nclient_net[MAX_NET_NUM];
  137. struct Edge *PeerHead[MAX_NET_NUM];
  138. #endif
  139. struct Session *SCPhead;
  140. struct Channel *next;
  141. };
  142. struct Edge
  143. {
  144. struct Channel *head;
  145. struct Session *me;
  146. struct Edge *cnext;
  147. struct Edge *enext;
  148. unsigned char numinter;
  149. unsigned int current;
  150. struct Interval inter[MAX_INTERVAL];
  151. };
  152. struct NPInfo
  153. {
  154. struct CorePeerInfo p;
  155. struct TransferInfo t;
  156. struct StatInfo  s;
  157. int    startBlock; // starting position of NP
  158. int numchannel;
  159. struct Edge *cur;
  160. struct Edge *header;
  161. };
  162. struct CPInfo
  163. {
  164. char type;
  165. unsigned char numHeads;
  166. unsigned short connnum;
  167. unsigned char maxConn;
  168. char parameter[42];
  169. int userid;
  170. int resnum;
  171. float band;
  172. char servicetype[128];
  173. };
  174. struct Session
  175. {
  176. int type;
  177. int socket;
  178. unsigned int host;
  179. unsigned int intra;
  180. unsigned short port;
  181. unsigned short npport;
  182. #ifdef SORT_NET
  183. unsigned int net;
  184. struct Session *cachepeer[MAX_NET_NUM];
  185. #else
  186. struct Session *cachepeer;
  187. #endif
  188. unsigned int auth;
  189. unsigned int time_sec;
  190. unsigned int last_access;
  191. union
  192. {
  193. struct NPInfo p;
  194. struct CPInfo cp;
  195. } u;
  196. float clientVer;
  197. struct Session *hnext;
  198. };
  199. FILE *statlog = NULL; // statistics log file
  200. extern int errno;
  201. int OUTPUT_STAT;
  202. time_t CurTimeSec;
  203. time_t startTime;
  204. char *CONFIG="./ats.cfg";
  205. char *LOCALHOST;
  206. #ifdef HAVE_MYSQL
  207. char *MYSQL_HOST="localhost";
  208. char *MYSQL_USER="root";
  209. char *MYSQL_PASS="gtv";
  210. char *MYSQL_DB="gtv";
  211. #endif
  212. int CurrentSock;
  213. fd_set osocks;
  214. int highsock;
  215. char *LOGXML;
  216. struct TransferInfo Transfer;
  217. struct sockaddr_in UDPCLIENT;
  218. extern int init();
  219. extern int readconfig(char * filename);
  220. extern const char* find_ip_from_list(unsigned long ip);
  221. extern int findcppeers(unsigned long ip, void *p);
  222. extern int findnettype(const char *servicetype, void* p);
  223. extern void add_cp_to_list(void *p);
  224. extern void remove_cp_from_list(void *p);
  225. extern const char* find_cp_service_type(unsigned long ip);
  226. #define MAX_POLLUTE 1000
  227. int BINDALL;
  228. int Polluted; /* should we call periodLOG? if polluted>MAX_POLLUTE call */
  229. struct Array cfgTS4NP_PORT;
  230. struct Array cfgTS4CP_PORT;
  231. struct Array cfgTS4RM_PORT;
  232. int SnapShotInterval; // make a snap shot every few seconds
  233. float MIN_CLIENT_VERSION; // client version should be bigger than this one.
  234. long long np2tsLoginCount = 0;
  235. long long np2tsResListCount = 0;
  236. long long np2tsReqResCount = 0;
  237. long long np2tsDelResCount = 0;
  238. long long np2tsReportCount = 0;
  239. long long np2tsNeedPeerCount = 0;
  240. long long np2tsLogoutCount = 0;
  241. long long ts2npWelcomeCount = 0;
  242. long long ts2npPeersCount = 0;
  243. long long ts2npConnectToCount = 0;
  244. long long ts2npMsgCount = 0;
  245. /*************************************************** 
  246.  * SessionCluster是有关每种服务的信息.             *
  247.  * 其中有一个指向该服务相关的各个Session的指针.    *
  248.  ***************************************************/ 
  249. struct SessionCluster
  250. {
  251. unsigned int port[MAX_BIND];
  252. unsigned int maxbuf;
  253. int sock[MAX_BIND]; // 描述所使用的socket
  254. int cur; // current number of sessions 当前客户端的个数
  255. int max; // maximum number of sessions 最多容纳客户端的个数
  256. int maxid; // maxid: maximum session index currently in the list. for optimization of search
  257. struct Session *head; /* pointer to the session pool 
  258.          head[0]为第一个Session,head[max-1]为最后一个session */
  259. struct Session **hash; // session hash table
  260. int (*process) (struct Session *); //这一服务中消息的处理函数
  261. int (*closure) (struct Session *); //这一服务中需要的析构函数
  262. };
  263. struct Session *GCPCHOICE;
  264. struct Channel *ChannelHash[MAX_CHANNEL];
  265. struct SessionCluster NPTRACKER, CPTRACKER;
  266. #ifdef HAVE_RM
  267. struct SessionCluster RMTRACKER;
  268. #endif
  269. #ifdef SORT_NET
  270. char *NETFN;
  271. #endif
  272. struct NamVal ConfigParameters[]
  273. {
  274. #ifdef HAVE_MYSQL
  275. {"MysqlAddress", &MYSQL_HOST, 's'},
  276. {"User", &MYSQL_USER, 's'},
  277. {"Password", &MYSQL_PASS, 's'},
  278. {"Database", &MYSQL_DB, 's'},
  279. #endif
  280. #ifdef SORT_NET
  281. {"Netfile", &NETFN, 's'},
  282. #endif
  283. {"Bind", &LOCALHOST, 's'},
  284. {"TS4NP_PORT", &cfgTS4NP_PORT, 'a'},
  285. {"TS4CP_PORT", &cfgTS4CP_PORT, 'a'},
  286. {"TS4RM_PORT", &cfgTS4RM_PORT, 'a'},
  287. {"SnapShotInterval", &SnapShotInterval, 'd'},
  288. {"ClientVersion", &MIN_CLIENT_VERSION, 'f'},
  289. {"LogFilePath", &LOGXML, 's'},
  290. {"BINDALL", &BINDALL, 'd'}
  291. };
  292. #ifdef DEBUG
  293. #define PDEBUG(fmt, args...) fprintf(stderr, "TS: (%s,%d)" fmt, __FILE__, __LINE__, ## args)
  294. #else
  295. #define PDEBUG(fmt, args...) do {} while (0)
  296. #endif
  297. #ifdef SORT_NET
  298. #define MAX_NET 2048
  299. struct networks
  300. {
  301. unsigned int host;
  302. unsigned int mask;
  303. int net;
  304. } NETBLOCKS[MAX_NET];
  305. unsigned int MASKS[33]={0x0,0x80000000,0xc0000000,0xe0000000,0xf0000000,0xf8000000,0xfc000000,0xfe000000,0xff000000,0xff800000,0xffc00000,0xffe00000,0xfff00000,0xfff80000,0xfffc0000,0xfffe0000,0xffff0000,0xffff8000,0xffffc000,0xffffe000,0xfffff000,0xfffff800,0xfffffc00,0xfffffe00,0xffffff00,0xffffff80,0xffffffc0,0xffffffe0,0xfffffff0,0xfffffff8,0xfffffffc,0xfffffffe,0xffffffff};
  306. // globals
  307. int maxNet; // max id
  308. int readNETBLOCK (char *fname); //用来读入network配置文件
  309. struct networks *getnetwork (unsigned int host, struct networks *head, int n);
  310. int compareNet (const void *a, const void *b);
  311. #endif
  312. struct Message ErrMSG;
  313. #define SEND_NPMSG(sock,msg,code,quit,client)
  314. {
  315. ErrMSG.len = 8;
  316. ErrMSG.type = msg;
  317. ++ts2npMsgCount;
  318. *(unsigned short *)(ErrMSG.buffer) = code;
  319. *(unsigned char *)(ErrMSG.buffer+sizeof(short)) = quit;
  320. sendMessage(sock,(char *)&ErrMSG,client);
  321. }
  322. int init_ts ();
  323. void my_exit () __attribute__((noreturn, destructor));
  324. void process_child (void);
  325. int init_NP (struct Session *);
  326. int process_NP (int idsock);
  327. int closure_NP (struct Session *);
  328. int process_NP2TS_LOGIN (struct Message *);
  329. int process_NP2TS_REPORT (struct Session *, struct TSMessage *);
  330. int process_NP2TS_REPORT2 (struct Session *, struct TSMessage *);
  331. int process_NP2TS_RES_LIST (struct Session *, struct TSMessage *);
  332. int process_NP2TS_REQ_RES (struct Session *, struct TSMessage *);
  333. int process_NP2TS_DEL_RES (struct Session *, struct TSMessage *);
  334. int process_NP2TS_NEED_PEERS (struct Session *, struct TSMessage *m);
  335. int process_NP2TS_QUERY_RES (struct Session *p, struct TSMessage *m);
  336. int process_NEED_PEERS_real (struct Session *p, char *md5, int needcp, unsigned int cur, unsigned char layer);
  337. int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1);
  338. int findCPPeers (unsigned long host, char *md5, char **buffer);
  339. int init_CP (struct Session *);
  340. int process_CP (int idsock);
  341. int closure_CP (struct Session *);
  342. int process_CP2TS_UPDATE (struct Session *, struct TSMessage *m);
  343. int process_CP2TS_REGISTER (struct Message *m);
  344. int process_CP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m);
  345. void periodLOG (int s);
  346. void makeSnapShot(int count);
  347. //int memlog(char *pwd,char *cmd);
  348. void freeChannel (struct Channel *p);
  349. int check_valid (struct Edge *e, int play);
  350. int merge (struct Interval *head, int total, struct Interval *_new, int num);
  351. int delete_interval (struct Interval *head, int total, struct Interval *_new, int num);
  352. #ifdef HAVE_RM
  353. int init_RM (struct Session *p);
  354. int process_RM (int idsock);
  355. int closure_RM (struct Session *p);
  356. #endif
  357. int logto_xml (long channelcount, unsigned int totalclient, long long totalstay);
  358. inline int hash_np (int h, int p)
  359. {
  360. int id = (((h & (0xffffffff-MAX_NP+1)) >> 14) ^ (h & (MAX_NP-1)) ^ (p & 0xffff)) & (MAX_NP -1);
  361. return id?id:(MAX_NP-1);
  362. }
  363. inline int hash_cp (int h, int p)
  364. {
  365. int id = (((h & (0xffffffff-MAX_CP+1)) >> 12) ^ (h & (MAX_CP-1)) ^ (p & 0xffff)) & (MAX_CP -1);
  366. return id?id:(MAX_CP-1);
  367. }
  368. inline int hash_str (unsigned char *str, int len)
  369. {
  370. int hash;
  371. for (hash=0; len; len--, str++)
  372. hash += (hash << 5) - hash + (*str);
  373. return hash & (MAX_CHANNEL-1);
  374. }
  375. // process_child: 主要函数, 这一函数主要用来设置socks和wsocks.
  376. void process_child (void)
  377. {
  378. time_t last_update=0, last_snapshot=0;
  379. int snapCount = 0;
  380. int i, readsocks;
  381. struct timeval tm;
  382. fd_set socks, esocks;
  383. startTime = time(NULL);
  384. for (i=0; ; i++) 
  385. {
  386. CurTimeSec = time (NULL);
  387. if (CurTimeSec-last_update > MAX_IDLE)
  388. {
  389. periodLOG (1); // close timeout peers
  390. last_update = CurTimeSec;
  391. Polluted = 0; // clear dirty flag
  392. }
  393. if(CurTimeSec - last_snapshot > SnapShotInterval)
  394. {
  395. makeSnapShot(snapCount++); // read it!
  396. system("/usr/bin/vmstat >> ts.log 2>&1 &");
  397. last_snapshot = CurTimeSec;
  398. }
  399. socks = osocks;
  400. esocks = osocks;
  401. tm.tv_sec = 1;
  402. tm.tv_usec = 0;
  403. readsocks = select(highsock+1, &socks, (fd_set *) 0, &esocks, &tm);
  404. if (readsocks <= 0)
  405. continue;
  406. for (i=0; i<MAX_BIND && NPTRACKER.sock[i] != 0; i++)
  407. {
  408. if (FD_ISSET (NPTRACKER.sock[i], &socks))
  409. {
  410. CurrentSock = i;
  411. process_NP (i);
  412. }
  413. // should have a 'break' here to cut off unnecessary check
  414. }
  415. for (i=0; i<MAX_BIND && CPTRACKER.sock[i] != 0; i++)
  416. {
  417. if (FD_ISSET (CPTRACKER.sock[i], &socks))
  418. {
  419. CurrentSock = i;
  420. process_CP (i);
  421. }
  422. }
  423. #ifdef HAVE_RM
  424. for (i=0; i<MAX_BIND && RMTRACKER.sock[i] != 0; i++)
  425. {
  426. if (FD_ISSET(RMTRACKER.sock[i], &socks))
  427. {
  428. CurrentSock = i;
  429. process_RM (i);
  430. }
  431. }
  432. #endif
  433. }
  434. }
  435. void my_exit ()
  436. {
  437. int max, listnum;
  438. struct Session *head;
  439. int (*closure) (struct Session *);
  440. #ifdef HAVE_RM
  441. FREE_TRACKER(RMTRACKER);
  442. #endif
  443. FREE_TRACKER(NPTRACKER);
  444. FREE_TRACKER(CPTRACKER);
  445. // free configuration file.
  446. // 参数为和read_config相同的struct NamVal *以及项数.
  447. free_config (ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));
  448. #ifdef HAVE_MYSQL
  449. mysql_close (local_mysql);
  450. #endif
  451. PDEBUG ("exit...n");
  452. exit (0);
  453. }
  454. inline void my_memmove (char *dst, char *src, int len)
  455. {
  456. int i;
  457. if (len <= 0 || dst == src) return;
  458. for (i=0; i<len; i++)
  459. *dst++ = *src++;
  460. }
  461. struct Channel *findChannel (char *name, int len)
  462. {
  463. int id = hash_str ((unsigned char*)name, len);
  464. struct Channel *p;
  465. for (p=ChannelHash[id]; p; p=p->next)
  466. {
  467. if (strncmp (name, p->name, len) == 0)
  468. return p;
  469. }
  470. return NULL;
  471. }
  472. struct Edge *newEdge (struct Channel *head, struct Session *me)
  473. {
  474. struct Edge *result = (struct Edge *)malloc (sizeof (struct Edge));
  475. result->head = head;
  476. result->me = me;
  477. #ifndef SORT_NET
  478. result->cnext = head->PeerHead;
  479. head->PeerHead = result;
  480. #else
  481. result->cnext = head->PeerHead[me->net];
  482. head->PeerHead[me->net] = result;
  483. head->nclient_net[me->net] ++;
  484. #endif
  485. result->enext = me->u.p.header;
  486. me->u.p.header = result;
  487. head->numclient ++;
  488. head->accumclient ++;
  489. return result;
  490. }
  491. int delEdge (struct Edge *e)
  492. {
  493. struct Channel *pchannel=e->head;
  494. struct Session *psession=e->me;
  495. struct Edge *pedge;
  496. if (pchannel)
  497. {
  498. #ifndef SORT_NET
  499. if (pchannel->PeerHead == e) pchannel->PeerHead = e->cnext;
  500. #else
  501. pchannel->nclient_net[psession->net] --;
  502. if (pchannel->PeerHead[psession->net] == e) pchannel->PeerHead[psession->net] = e->cnext;
  503. #endif
  504. else
  505. {
  506. #ifndef SORT_NET
  507. for (pedge=pchannel->PeerHead; pedge && pedge->cnext && pedge->cnext != e; pedge = pedge->cnext);
  508. #else
  509. for (pedge=pchannel->PeerHead[psession->net]; pedge && pedge->cnext && pedge->cnext != e; pedge = pedge->cnext);
  510. #endif
  511. if (pedge && pedge->cnext) pedge->cnext = e->cnext;
  512. }
  513. pchannel->numclient --;
  514. if (pchannel->numclient == 0)
  515. freeChannel (pchannel);
  516. }
  517. if (psession)
  518. {
  519. if (psession->u.p.header == e) psession->u.p.header = e->enext;
  520. else
  521. {
  522. for (pedge=psession->u.p.header; pedge && pedge->enext && pedge->enext != e; pedge = pedge->enext);
  523. if (pedge && pedge->enext) pedge->enext = e->enext;
  524. }
  525. }
  526. if (psession->u.p.cur == e) psession->u.p.cur = NULL;
  527. free (e);
  528. return 0;
  529. }
  530. struct Channel *newChannel (char *name, int len)
  531. {
  532. int id = hash_str ((unsigned char*)name, len);
  533. struct Channel *result = (struct Channel *)calloc (1, sizeof (struct Channel));
  534. if (!result) return NULL;
  535. if (len != MD5_LEN)
  536. {
  537. free (result);
  538. return NULL;
  539. }
  540. memcpy (result->name, name, len);
  541. result->name[len] = 0;
  542. result->latest_time = CurTimeSec;
  543. result->next = ChannelHash[id];
  544. ChannelHash[id] = result;
  545. return result;
  546. }
  547. void freeChannel (struct Channel *p)
  548. {
  549. int id = hash_str ((unsigned char*)(p->name), MD5_LEN);
  550. struct Channel *q;
  551. if (ChannelHash[id] == p)
  552. ChannelHash[id] = p->next;
  553. else
  554. {
  555. for (q=ChannelHash[id]; q && q->next != p; q=q->next);
  556. if (q) q->next = p->next;
  557. }
  558. free (p);
  559. }
  560. // join in a channel for session *p
  561. // Add a session-channel mapping (aka. an edge)
  562. int addSession (struct Session *p, struct ChannelInfo *c, int isdefault, unsigned int cur)
  563. {
  564. struct Channel *pc;
  565. struct Edge *pedge = NULL;
  566. if ((pc=findChannel (c->md5, MD5_LEN)) == NULL) /* Since the channel do not exist, the edge does not exist. 
  567.    So there is no need to search the list. */
  568. {
  569. if ((pc = newChannel (c->md5, MD5_LEN)) == NULL)
  570. return -1;
  571. } else{
  572. for (pedge=p->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
  573. }
  574. if (pedge == NULL)
  575. {
  576. pedge = newEdge (pc, p);
  577. }
  578. pedge->numinter = c->numinter;
  579. memcpy (pedge->inter, c->inter, c->numinter*sizeof(struct Interval));
  580. pedge->current = cur;
  581. if (isdefault) p->u.p.cur = pedge;
  582. return 0;
  583. }
  584. // delete corresponding session in the session-channel map
  585. int delSession (struct Session *p)
  586. {
  587. struct Edge *pedge, *prevedge;
  588. for (pedge=p->u.p.header; pedge; pedge=prevedge)
  589. {
  590. prevedge = pedge->enext;
  591. delEdge (pedge);
  592. }
  593. return 0;
  594. }
  595. void periodLOG (int s)
  596. {
  597. int i;
  598. struct Session *p;
  599. struct tm result;
  600. localtime_r (&CurTimeSec, &result);
  601. if (s)
  602. {
  603. for (i=0,p=NPTRACKER.head; i<=NPTRACKER.maxid; i++, p++)
  604. {
  605. if (p->socket > 0 && CurTimeSec - p->last_access > MAX_IDLE)
  606. {
  607. closure_NP (p);
  608. }
  609. }
  610. for (i=0,p=CPTRACKER.head; i<=CPTRACKER.maxid; i++, p++)
  611. {
  612. if (p->socket > 0 && CurTimeSec - p->last_access > MAX_IDLE)
  613. {
  614. closure_CP (p);
  615. }
  616. }
  617. }
  618. PDEBUG ("STAT %u,%u/%u %u:%u:%u.n", result.tm_year+1900, result.tm_mon+1, result.tm_mday, result.tm_hour, result.tm_min, result.tm_sec);
  619. PDEBUG ("STAT: "%ld小时%ld分钟%ld秒"n", CurTimeSec/3600, (CurTimeSec%3600)/60, CurTimeSec%60);
  620. return;
  621. }
  622. void makeSnapShot(int count)
  623. {
  624. FILE *f = fopen ("./ts.log", "a");
  625. if (!f)
  626. {
  627. PDEBUG("Couldn't open log file!.n");
  628. return;
  629. }
  630. // seek to the end for write
  631. fseek(f, 0, SEEK_END);
  632. // 1. start SnapShot 
  633. struct tm result;
  634. localtime_r (&CurTimeSec, &result);
  635. fprintf (f, "nn********************Start %d SnapShot, Time: %u/%u %u:%u:%u.n", count, result.tm_mon+1, result.tm_mday, result.tm_hour, result.tm_min, result.tm_sec);
  636. // 2. log cpu & mem state
  637. //memlog(BASEDIR,"./log.sh");
  638. // 3. log message count
  639. fprintf(f, "Login: %lld. ResList: %lld. ReqRes: %lld. DelRes: %lld. Report: %lld. NeedPeers: %lld. Logout: %lld. n", np2tsLoginCount, np2tsResListCount, np2tsReqResCount, np2tsDelResCount, np2tsReportCount, np2tsNeedPeerCount, np2tsLogoutCount);
  640. fprintf(f, "Welcome: %lld. Peers: %lld. ConnectTo: %lld. Msg: %lld.n", ts2npWelcomeCount, ts2npPeersCount, ts2npConnectToCount, ts2npMsgCount);
  641. fflush(f);
  642. // 4. log channel state
  643. struct Channel *pChannel;
  644. int i, total=0, j, watchingCount, noIdleCount;
  645. struct Edge* pedge;
  646. int layerarray[OBSERVE_LAYER], noconnectioncount, higherlayercount;
  647. long long totalup, totaldown;
  648. long long totalcurrblock;
  649. long ts_ChannelCount = 0;
  650. long long ts_stay, totalstay = 0;
  651. for (i=0; i<MAX_CHANNEL; i++)
  652. {
  653. for (pChannel=ChannelHash[i]; pChannel; pChannel=pChannel->next)
  654. {
  655. ++ts_ChannelCount;
  656. total += pChannel->numclient;
  657. watchingCount = 0;
  658. noIdleCount = 0;
  659. ts_stay = 0;
  660. totalup = totaldown = totalcurrblock = 0;
  661. memset(layerarray, 0, OBSERVE_LAYER*sizeof(int));
  662. noconnectioncount = higherlayercount = 0;
  663. #ifndef SORT_NET
  664. for (j=0, pedge=pChannel->PeerHead; pedge && j < pChannel->numclient; pedge=pedge->cnext)
  665. #else
  666. {
  667. int mnetnum;
  668. for (mnetnum = 0; mnetnum < MAX_NET_NUM; mnetnum ++)
  669. {
  670. for (j=0, pedge=pChannel->PeerHead[mnetnum]; pedge && j < pChannel->nclient_net[mnetnum]; pedge=pedge->cnext)
  671. #endif
  672. {
  673. if(pedge->me->socket <= 0)
  674. continue;
  675. if(pedge->me->u.p.p.layer == 0xff)
  676. noconnectioncount++;
  677. else if(pedge->me->u.p.p.layer < OBSERVE_LAYER)
  678. layerarray[pedge->me->u.p.p.layer]++;
  679. else
  680. higherlayercount++;
  681. ts_stay += CurTimeSec - pedge->me->time_sec;
  682. totalup += pedge->me->u.p.t.totalDownBytes;
  683. totaldown += pedge->me->u.p.t.totalUpBytes;
  684. if(pedge->current != 0xffffffff)
  685. {
  686. totalcurrblock += pedge->current;
  687. watchingCount++;
  688. if(CurTimeSec < pedge->me->last_access + 40)
  689. noIdleCount++;
  690. }
  691. }
  692. #ifdef SORT_NET
  693. }
  694. }
  695. #endif
  696. totalstay += ts_stay;
  697. fprintf(f, "%s: accumulated %d clients, current %d clients and  %d/%d is watchings. Average Stay Time : %fn", pChannel->name, pChannel->accumclient, pChannel->numclient, watchingCount,noIdleCount,pChannel->numclient==0?0:(ts_stay/(double)pChannel->numclient));
  698. fprintf(f, "total down: %llu, total up: %llu, down-up: %llu. Avg currBlock: %llu n", totaldown, totalup, (totaldown-totalup), watchingCount==0?0:(totalcurrblock/watchingCount));
  699. fprintf(f, "No Connect: %d. Higher: %d. n Detail: ", noconnectioncount, higherlayercount);
  700. for(j = 0; j < OBSERVE_LAYER; j++)
  701. {
  702. if(layerarray[j] != 0)
  703. fprintf(f, "%d: %d(%.1f), ", j, layerarray[j ], pChannel->numclient==0?0:(layerarray[j]/(double)pChannel->numclient));
  704. }
  705. fprintf(f, "n");
  706. }
  707. }
  708. fprintf(f,"ChannelCount: %ld n",ts_ChannelCount);
  709. fprintf(f, "n ********************END SnapShot.nn");
  710. fclose(f);
  711. logto_xml (ts_ChannelCount, total, totalstay);
  712. }
  713. #if 0
  714. int memlog(char *pwd, char *cmd)
  715. {
  716. int pid = fork();
  717. if (pid == 0)
  718. {
  719. char buffer[10];
  720. daemon(0,1);
  721. snprintf(buffer,10,"%s",cmd);
  722. chdir(pwd);
  723. execlp("/bin/sh", "sh" ,"-c",buffer,NULL);
  724. }
  725. else if(pid < 0)
  726. {
  727. perror("fork error!!!");
  728. return -1;
  729. }
  730. return 0;
  731. }
  732. #endif
  733. #ifdef SORT_NET
  734. int compareNet (const void *a, const void *b)
  735. {
  736. struct networks *p = (struct networks *)a;
  737. struct networks *q = (struct networks *)b;
  738. if (p->host > q->host) return 1;
  739. else if (p->host < q->host) return -1;
  740. return 0;
  741. }
  742. // This is implemented with recursion
  743. struct networks *getnetwork (unsigned int host, struct networks *head, int n)
  744. {
  745. int i = n/2-1;
  746. unsigned int net = host & head[i].mask;
  747. if (n <= 0) return NULL;
  748. if (n == 1)
  749. {
  750. if ((host & head[0].mask) != (head[0].host & head[0].mask))
  751. return NULL;
  752. else return &(head[0]);
  753. }
  754. if (net < (head[i].host & head[i].mask)) return getnetwork (host, head, i);
  755. else if (net > (head[i].host & head[i].mask)) return getnetwork (host, head+i+1, n-i-1);
  756. else return &(head[i]);
  757. }
  758. //192.168.0.0/16/1
  759. //host/mask/netid
  760. int readNETBLOCK (char *fname)
  761. {
  762. int i = 0;
  763. char buffer[MAX_LINE],*p, *q;
  764. struct in_addr inp;
  765. FILE *f = fopen (fname, "r");
  766. if (!f) return -1;
  767. while (fgets (buffer, sizeof (buffer), f))
  768. {
  769. p = index (buffer, '/');
  770. q = rindex (buffer, '/');
  771. if (p == q || (!p) || (!q)) continue;
  772. *p = 0;
  773. *q = 0;
  774. if (inet_aton (buffer, &inp) == 0) continue;
  775. NETBLOCKS[i].host = ntohl (inp.s_addr);
  776. NETBLOCKS[i].mask = MASKS[atoi(p+1)];
  777. NETBLOCKS[i].net = atoi (q+1);
  778. i++;
  779. if (i >= MAX_NET) break;
  780. }
  781. fclose (f);
  782. qsort (NETBLOCKS, i, sizeof (struct networks), compareNet);
  783. return i;
  784. }
  785. int Net = 0;
  786. int Layer = 0;
  787. int compareSession (const void *a, const void *b)
  788. {
  789. struct Session *p = *(struct Session **)a;
  790. struct Session *q = *(struct Session **)b;
  791. if (p->u.p.p.layer == q->u.p.p.layer)
  792. {
  793. if  (p->net == Net) return -1;
  794. else if (q->net == Net) return 1;
  795. else if (p->net > q->net) return 1;
  796. else return -1;
  797. } else if (p->u.p.p.layer <= Layer)
  798. return -1;
  799. else if (q->u.p.p.layer <= Layer)
  800. return 1;
  801. else return (p->u.p.p.layer - q->u.p.p.layer);
  802. }
  803. #endif
  804. int init_udpserver (struct SessionCluster *c, char *host, int *port, unsigned int max, int numbind)
  805. {
  806. int i;
  807. struct Session *head;
  808. // c->port = port;
  809. c->cur = 0; // current count
  810. c->max = max; // maximum count of sessions
  811. c->maxbuf = 0;
  812. if (max > 0)
  813. {
  814. c->head = (struct Session*)calloc (sizeof (struct Session), max); // allocate continuous session pool space
  815. c->hash = (struct Session**)calloc (sizeof (struct Session *), max); // allocate session hash table
  816. head = c->head;
  817. // chain the sessions in session pool as a freelist
  818. for (i=0; i<max-1; i++)
  819. {
  820. head[i].hnext = &(head[i+1]);
  821. }
  822. // initialize hash table
  823. for (i=1; i<max; i++) c->hash[i] = 0;
  824. // this is really ugly!
  825. c->hash[0] = &(head[0]); // c->hash[0] points to the empty list
  826. } // HOW if max <= 0 ? return with error
  827. for (i=0; i<MAX_BIND && i<numbind; i++)
  828. {
  829. switch (BINDALL) // means 'BIND ANY HOST'
  830. {
  831. case 0:
  832. if ((c->sock[i] = init_udp (host, port[i])) < 0)
  833. return -1;
  834. break;
  835. default:
  836. if ((c->sock[i] = init_udp (NULL, port[i])) < 0)
  837. return -1;
  838. break;
  839. }
  840. // highsock indicates the largest active fd. used in select() Used to speed up search
  841. if (c->sock[i] > highsock) highsock = c->sock[i];
  842. FD_SET(c->sock[i], &osocks);
  843. }
  844. return 0;
  845. }
  846. int init_ts ()
  847. {
  848. struct rlimit rl;
  849. #ifdef DEBUG
  850. system ("ulimit -a");
  851. if (getrlimit (RLIMIT_CORE, &rl) != 0)
  852. {
  853. perror ("getrlimit");
  854. return -1;
  855. }
  856. fprintf (stderr, "Get core limit %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
  857. rl.rlim_cur = rl.rlim_max = (rlim_t )10240000;
  858. if (setrlimit (RLIMIT_CORE, &rl) != 0)
  859. {
  860. perror ("getrlimit");
  861. return -1;
  862. }
  863. if (getrlimit (RLIMIT_CORE, &rl) != 0)
  864. {
  865. perror ("getrlimit");
  866. return -1;
  867. }
  868. fprintf (stderr, "Set core limit to %d:%dn", (int)rl.rlim_cur, (int)rl.rlim_max);
  869. system ("ulimit -a");
  870. #endif
  871. if (init_udpserver (&NPTRACKER, LOCALHOST, (int*)cfgTS4NP_PORT.ptr, MAX_NP, cfgTS4NP_PORT.size) < 0
  872. #ifdef HAVE_RM
  873. || init_udpserver (&RMTRACKER, LOCALHOST, (int*)cfgTS4RM_PORT.ptr, MAX_RM, cfgTS4RM_PORT.size) < 0
  874. #endif
  875. || init_udpserver (&CPTRACKER, LOCALHOST, (int*)cfgTS4CP_PORT.ptr, MAX_CP, cfgTS4CP_PORT.size) < 0)
  876. {
  877. return -1;
  878. }
  879. #ifdef SORT_NET
  880. maxNet = readNETBLOCK (NETFN);
  881. #endif
  882. statlog = fopen("./stat.log", "a+");// create file "stat.log" automatically if it does not exist. 
  883. if (statlog == NULL) {
  884. perror ("error opening statistics log file: stat.log.n");
  885. }
  886. return 0;
  887. }
  888. int main(int argc, char **argv)
  889. {
  890. int i, mode = 1;
  891. // struct itimerval t, ot;
  892. signal (SIGPIPE, SIG_IGN); // SIGPIPE is raised when the client closes the socket exceptionally
  893.                            // if not handled, SIGPIPE would cause unexpected termination.
  894. // signal (SIGINT, my_exit);
  895. // argv[1]: daemon mode, not clear
  896. // argv[2]: output status, not used and not clear
  897. if (argc > 1)
  898. {
  899. mode = atoi (argv[1]);
  900. if (argc > 2) OUTPUT_STAT = atoi (argv[2]);
  901. }
  902. if (mode == 0)
  903. daemon(1,1); // run in the background
  904. // read configuration file. just ignored right now.
  905. // 参数为文件名, 一个struct NamVal *, 以及该struct NamVal的项数
  906. read_config (CONFIG, ConfigParameters, sizeof(ConfigParameters)/sizeof (struct NamVal));
  907. readconfig("ip.list");
  908. for (i=0; i<10; i++) // retry 10 times. not useful here.
  909. {
  910. FD_ZERO(&osocks);
  911. if (init_ts () < 0)
  912. {
  913. PDEBUG ("Error in initialization.n");
  914. exit (1);
  915. }
  916. #ifdef HAVE_MYSQL
  917. if ((local_mysql = init_mysql (MYSQL_HOST, MYSQL_USER, MYSQL_PASS, MYSQL_DB, "/var/run/mysqld/mysqld.sock")) == 0)
  918. {
  919. PDEBUG ("Error in init_mysql.n");
  920. exit (1);
  921. }
  922. #endif
  923. process_child ();
  924. }
  925. return 0;
  926. }
  927. //===============================================
  928. //===== Here begin the message process part =====
  929. //===============================================
  930. int init_NP (struct Session *p)
  931. {
  932. // maxid: maximum session index currently in the list. for optimization of search
  933. int listnum = p - NPTRACKER.head; // this is the index!
  934. if (listnum > NPTRACKER.maxid)
  935. NPTRACKER.maxid = listnum;
  936. NPTRACKER.cur ++; // cur is in fact the counter of sessions
  937. if (p->u.p.cur) // if there is an edge, then the client is already in a channel
  938. PDEBUG ("NP %d in %d enter Channel %.32s(%d clients).n", p-NPTRACKER.head, NPTRACKER.cur, p->u.p.cur->head->name, p->u.p.cur->head->numclient);
  939. else
  940. PDEBUG ("NP %d in %d no default channel.n", p-NPTRACKER.head, NPTRACKER.cur);
  941. return 0;
  942. }
  943. int process_NP (int idsock)
  944. {
  945. int len, listnum;
  946. struct Session *p;
  947. struct TSMessage *m = &UDPMsg;
  948. #ifdef MEASUREMENT
  949. //struct timeval tm;
  950. //long long msec;
  951. #endif
  952. socklen_t addr_len = sizeof (UDPCLIENT);
  953. memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
  954. if ((len = (recvfrom (NPTRACKER.sock[idsock], &UDPMsg, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
  955. {
  956. perror ("recvfrom:");
  957. return -1;
  958. }
  959. #ifdef MEASUREMENT
  960. //gettimeofday (&tm, NULL);
  961. //msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
  962. #endif
  963. PDEBUG ("type %d len %d.", m->type, m->len);
  964. if (m->type == NP2TS_LOGIN)
  965. {
  966. process_NP2TS_LOGIN ((struct Message *)m);
  967. ++np2tsLoginCount;
  968. } else
  969. {
  970. listnum = m->authcode1; // index of session object
  971. p = NPTRACKER.head+listnum;
  972. // check the session: 1. bad index; 2. uninitialized or cleared; 3. not match
  973. if (listnum >= NPTRACKER.max || p->socket == 0
  974. || p->auth != m->authcode2)
  975. {
  976. if (m->type != NP2TS_LOGOUT)
  977. SEND_NPMSG(NPTRACKER.sock[idsock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
  978. return -1;
  979. }
  980. switch (m->type)
  981. {
  982. case NP2TS_REPORT: // 报告Interval信息,如果refresh为true, 则重置, 否则则先增加后删除.
  983. process_NP2TS_REPORT (p, m);
  984. ++np2tsReportCount;
  985. break;
  986. case NP2TS_NEED_PEERS:
  987. PDEBUG("Need peers!!n");
  988. process_NP2TS_NEED_PEERS (p, m);
  989. ++np2tsNeedPeerCount;
  990. break;
  991. case NP2TS_LOGOUT: // 退出
  992. closure_NP (p);
  993. ++np2tsLogoutCount;
  994. break;
  995. case NP2TS_RES_LIST: /* 发送当前NP的所有RESOURCE,使用addSession来进行处理, 
  996.         如果还没有这条边, 就添加. */
  997. process_NP2TS_RES_LIST (p, m);
  998. ++np2tsResListCount;
  999. break;
  1000. case NP2TS_REQ_RES: // 添加RES, 并返回Peers
  1001. process_NP2TS_REQ_RES (p, m);
  1002. ++np2tsReqResCount;
  1003. break;
  1004. case NP2TS_DEL_RES: // 删除RES
  1005. process_NP2TS_DEL_RES (p, m);
  1006. ++np2tsDelResCount;
  1007. break;
  1008. case NP2TS_QUERY_RES: //查询RES
  1009. process_NP2TS_QUERY_RES (p, m);
  1010. break;
  1011. case NP2TS_REPORT2:
  1012. process_NP2TS_REPORT2 (p, m);
  1013. break;
  1014. default:
  1015. SEND_NPMSG(NPTRACKER.sock[idsock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
  1016. closure_NP (p);
  1017. break;
  1018. }
  1019. p->last_access = CurTimeSec;
  1020. }
  1021. PDEBUG ("donen");
  1022. #ifdef MEASUREMENT
  1023. //gettimeofday (&tm, NULL);
  1024. //PDEBUG ("msg type %d, len %d: %lld msec.n", m->type, m->len, ((long long)tm.tv_sec) * 1000000l + tm.tv_usec - msec);
  1025. #endif
  1026. return 0;
  1027. }
  1028. void logStat(struct Session *p)
  1029. {
  1030. // 打印记录的时间和客户端的版本号
  1031. fprintf(statlog, "%u %f", time(NULL), p->clientVer);
  1032.        fprintf(statlog, "%d:%d | %d:%dn", p->host, p->port, p->intra, p->npport);
  1033.        fprintf(statlog, "%ut%ut%ut%ut%ut%ut%ut%ut%ut%fn", 
  1034.                        p->u.p.s.playingBlock - p->u.p.startBlock,
  1035.                        p->u.p.s.currBufferTime,
  1036.                        p->u.p.s.bufferCount,
  1037.                        p->u.p.s.bufferTime,
  1038.                        p->u.p.s.connFailCount,
  1039.                        p->u.p.s.inConnections,
  1040.                        p->u.p.s.outConnections,
  1041.                        p->u.p.s.avgInConnTime,
  1042.                        p->u.p.s.avgOutConnTime,
  1043.                        p->u.p.s.messagePercent);
  1044.        fprintf(statlog, "%lldt%lldt%ft%ft%ft%fn",
  1045.                        p->u.p.t.totalDownBytes,
  1046.                        p->u.p.t.totalUpBytes,
  1047.                        p->u.p.t.currDownSpeed,
  1048.                        p->u.p.t.currUpSpeed,
  1049.                        p->u.p.t.avgDownSpeed,
  1050.                        p->u.p.t.avgUpSpeed);
  1051.                        
  1052. current_log_count ++;
  1053. if(current_log_count == MAX_LOG_COUNT)
  1054. {
  1055. fflush(statlog);
  1056. current_log_count = 0;
  1057. }
  1058. }
  1059. int closure_NP (struct Session *p)
  1060. {
  1061. int i, id;
  1062. struct Session *q;
  1063.         // write statistics to log file
  1064.         logStat(p);
  1065. // 1. decrease <maxid> if current session is the last
  1066. if ((i = p - NPTRACKER.head) == NPTRACKER.maxid && i > 0)
  1067. {
  1068. for (i--; NPTRACKER.head[i].socket == 0 && i> 0; i--);
  1069. NPTRACKER.maxid = i;
  1070. }
  1071. // 2. delete corresponding session in the session-channel map
  1072. delSession (p);
  1073. // 3. remove session from the hash table
  1074. id = hash_np (p->host, p->npport);
  1075. if ((q = NPTRACKER.hash[id]) != p) // not head of chain
  1076. {
  1077. for (; q && q->hnext != p; q=q->hnext); // search through the chain for the parent of <p>
  1078. assert (q);
  1079. if (q) q->hnext = p->hnext; // remove
  1080. } else NPTRACKER.hash[id] = p->hnext; // head of chain, got it
  1081. // 4. clear and free session object to the freelist
  1082. memset (p, 0, sizeof (struct Session)); // clear session
  1083. p->hnext = NPTRACKER.hash[0];
  1084. NPTRACKER.hash[0] = p;
  1085. Polluted ++;
  1086. NPTRACKER.cur --;
  1087. return 0;
  1088. }
  1089. int init_CP (struct Session *p)
  1090. {
  1091. const char* servicetype;
  1092. servicetype = find_cp_service_type(p->host);
  1093. if(servicetype == NULL)
  1094. servicetype = "UNKNOWN";
  1095. strcpy(p->u.cp.servicetype, servicetype);
  1096. PDEBUG("n******************************************************************ninit_CP: cp service type is %sn", servicetype);
  1097. int listnum = p - CPTRACKER.head;
  1098. if (listnum > CPTRACKER.maxid)
  1099. CPTRACKER.maxid = listnum;
  1100. CPTRACKER.cur ++;
  1101. GCPCHOICE = p;
  1102. // add_cp_to_list((void*)p);
  1103. return 0;
  1104. }
  1105. int process_CP (int idsock)
  1106. {
  1107. int len, listnum;
  1108. struct Session *p;
  1109. struct TSMessage *m = &UDPMsg;
  1110. #ifdef MEASUREMENT
  1111. struct timeval tm;
  1112. long long msec;
  1113. #endif
  1114. socklen_t addr_len = sizeof (UDPCLIENT);
  1115. memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
  1116. if ((len = (recvfrom (CPTRACKER.sock[idsock], &UDPMsg, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
  1117. {
  1118. perror ("recvfrom:");
  1119. return -1;
  1120. }
  1121. #ifdef MEASUREMENT
  1122. gettimeofday (&tm, NULL);
  1123. msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
  1124. #endif
  1125. if (m->type == CP2TS_REGISTER)
  1126. {
  1127. process_CP2TS_REGISTER ((struct Message *)m);
  1128. } else
  1129. {
  1130. listnum = m->authcode1;
  1131. p = CPTRACKER.head+listnum;
  1132. if (listnum >= CPTRACKER.max || p->socket == 0
  1133. || p->auth != m->authcode2)
  1134. {
  1135. if (m->type != CP2TS_LOGOUT)
  1136. {
  1137. PDEBUG("CP error. listnum=%d/%d. socket=%d auth=%d/%dn", listnum, CPTRACKER.max, p->socket, p->auth, m->authcode2);
  1138. SEND_NPMSG(CPTRACKER.sock[idsock],TS2CP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
  1139. }
  1140. return -1;
  1141. }
  1142. switch (m->type)
  1143. {
  1144. case CP2TS_NEED_PEERS: // ECP查询用, 目前尚未使用
  1145. process_CP2TS_NEED_PEERS (p, m);
  1146. break;
  1147. case CP2TS_UPDATE: // 报告CP负载
  1148. process_CP2TS_UPDATE (p, m);
  1149. break;
  1150. case CP2TS_LOGOUT:
  1151. closure_CP (p);
  1152. break;
  1153. default:
  1154. closure_CP (p);
  1155. break;
  1156. }
  1157. p->last_access = CurTimeSec;
  1158. }
  1159. return 0;
  1160. }
  1161. int closure_CP (struct Session *p)
  1162. {
  1163. int i, id;
  1164. struct Session *q;
  1165. //PDEBUG("closure_CP.n");
  1166. //remove_cp_from_list((void*)p);
  1167. //PDEBUG("closure_CP OK.n");
  1168. if ((i = p - CPTRACKER.head) == CPTRACKER.maxid && i > 0)
  1169. {
  1170. for (i--; CPTRACKER.head[i].socket == 0 && i> 0; i--);
  1171. CPTRACKER.maxid = i;
  1172. }
  1173. if (GCPCHOICE == p)
  1174. {
  1175. for (i=CPTRACKER.maxid; i>=0; i--)
  1176. {
  1177. if (CPTRACKER.head[i].socket != 0 && CPTRACKER.head[i].u.cp.type == CT_GENERAL)
  1178. {
  1179. GCPCHOICE = &(CPTRACKER.head[i]);
  1180. break;
  1181. }
  1182. }
  1183. }
  1184. id = hash_cp (p->host, p->npport);
  1185. if ((q = CPTRACKER.hash[id]) != p)
  1186. {
  1187. for (; q && q->hnext != p; q=q->hnext);
  1188. assert (q);
  1189. if (q) q->hnext = p->hnext;
  1190. } else CPTRACKER.hash[id] = p->hnext;
  1191. memset (p, 0, sizeof (struct Session));
  1192. p->hnext = CPTRACKER.hash[0];
  1193. CPTRACKER.hash[0] = p;
  1194. Polluted ++;
  1195. CPTRACKER.cur --;
  1196. return 0;
  1197. }
  1198. #ifdef HAVE_RM
  1199. int getChannelInfo (char *md5, char **buf)
  1200. {
  1201. struct Channel *pc;
  1202. int i, total=0;
  1203. if (strcmp (md5, "*") == 0)
  1204. {
  1205. for (i=0; i<MAX_CHANNEL; i++)
  1206. {
  1207. for (pc=ChannelHash[i]; pc; pc=pc->next)
  1208. {
  1209. memcpy (*buf, pc->name, MD5_LEN);
  1210. *buf += MD5_LEN;
  1211. *(int *)(*buf) = pc->numclient;
  1212. *buf += sizeof (int);
  1213. total ++;
  1214. }
  1215. }
  1216. return total;
  1217. }
  1218. if ((pc=findChannel (md5, MD5_LEN)) != NULL)
  1219. {
  1220. memcpy (*buf, md5, MD5_LEN);
  1221. *buf += MD5_LEN;
  1222. *(int *)(*buf) = pc->numclient;
  1223. *buf += sizeof (int);
  1224. return 1;
  1225. }
  1226. for (i=0; i<MAX_CHANNEL; i++)
  1227. {
  1228. for (pc=ChannelHash[i]; pc; pc=pc->next)
  1229. {
  1230. if (strstr (pc->name, md5) != NULL)
  1231. {
  1232. memcpy (*buf, pc->name, MD5_LEN);
  1233. *buf += MD5_LEN;
  1234. *(int *)(*buf) = pc->numclient;
  1235. *buf += sizeof (int);
  1236. total ++;
  1237. }
  1238. }
  1239. }
  1240. return total;
  1241. }
  1242. int init_RM (struct Session *p)
  1243. {
  1244. int listnum = p - RMTRACKER.head;
  1245. if (listnum > RMTRACKER.maxid)
  1246. RMTRACKER.maxid = listnum;
  1247. RMTRACKER.cur ++;
  1248. return 0;
  1249. }
  1250. #define RM2TS_STAT_QUERY 0x20
  1251. #define TS2RM_STAT_RESPONSE 0x30
  1252. int process_RM (int idsock)
  1253. {
  1254. char buffer[MAX_DATA];
  1255. char *p, *buf = buffer;
  1256. int * psize;
  1257. int querynum;
  1258. int len, total, i;
  1259. struct Message Msg, *m=&Msg;
  1260. #ifdef MEASUREMENT
  1261. struct timeval tm;
  1262. long long msec;
  1263. #endif
  1264. socklen_t addr_len = sizeof (UDPCLIENT);
  1265. memset ((char *)&UDPCLIENT, 0, sizeof (UDPCLIENT));
  1266. if ((len = (recvfrom (RMTRACKER.sock[idsock], m, MAX_DATA, 0, (struct sockaddr *)&UDPCLIENT, &addr_len))) < 0)
  1267. {
  1268. perror ("recvfrom:");
  1269. return -1;
  1270. }
  1271. #ifdef MEASUREMENT
  1272. gettimeofday (&tm, NULL);
  1273. msec = ((long long)tm.tv_sec) * 1000000l + tm.tv_usec;
  1274. #endif
  1275. PDEBUG("got RM msg, len %d. n", len);
  1276. switch (m->type)
  1277. {
  1278. case RM2TS_STAT_QUERY:
  1279. querynum = *(int*)m->buffer;
  1280. if(querynum > 100)
  1281. break; 
  1282. if(querynum > 0)
  1283. {
  1284. buf += sizeof (int);
  1285. *(unsigned char *)buf = TS2RM_STAT_RESPONSE;
  1286. buf += sizeof (char);
  1287. psize = (int*)buf;
  1288. buf += sizeof (int);
  1289. total = 0;
  1290. p = m->buffer + sizeof(int);
  1291. for(i = 0; i < querynum; ++i)
  1292. {
  1293. PDEBUG("query %s. n", p);
  1294. total += getChannelInfo(p, &buf);
  1295. p += MD5_LEN;
  1296. }
  1297. *(int *)buffer = buf - buffer;
  1298. *psize = total;
  1299. if(*psize > 0)
  1300. sendMessage(RMTRACKER.sock[idsock], buffer, &UDPCLIENT);
  1301. }
  1302. break;
  1303. default:
  1304. break;
  1305. }
  1306. #ifdef MEASUREMENT
  1307. gettimeofday (&tm, NULL);
  1308. PDEBUG ("msg type %d, len %d: %lld msec.n", m->type, m->len, ((long long)tm.tv_sec) * 1000000l + tm.tv_usec - msec);
  1309. #endif
  1310. return 0;
  1311. }
  1312. int closure_RM (struct Session *p)
  1313. {
  1314. int i;
  1315. if ((i = p - RMTRACKER.head) == RMTRACKER.maxid && i > 0)
  1316. {
  1317. for (i--; RMTRACKER.head[i].socket == 0 && i> 0; i--);
  1318. RMTRACKER.maxid = i;
  1319. }
  1320. return 0;
  1321. }
  1322. #endif
  1323. //------------------------------------------------
  1324. //- Here begin the specific message process part -
  1325. //------------------------------------------------
  1326. // | login id(UINT32) | md5 password(MD5_LEN) |
  1327. // | version of client(float) | listening port(USHORT) |
  1328. // | size of local ip list(UINT8) | first ip addr(in_addr) |... |
  1329. /* NP向TS登录, 按照来源IP地址和所报告的npport进行hash, 如果距离上次
  1330.    发送NP2TS_LOGIN消息的时间小于SILENCE_TIME, 则直接返回,否则发送WELCOME消息. */
  1331. int process_NP2TS_LOGIN (struct Message *m)
  1332. {
  1333. struct Session *p;
  1334. // struct ChannelInfo tmpch;
  1335. char md5[MD5_LEN+1];
  1336. unsigned int host, myhost, intra;
  1337. unsigned short port, npport;
  1338. int i,id, userID;
  1339. float clientVer;
  1340. unsigned int num;
  1341. char *buf;
  1342. struct P2PAddress *addr;
  1343. buf = m->buffer;
  1344. userID = *(int *)buf;
  1345. buf += sizeof (int);
  1346. memcpy (md5, buf, MD5_LEN);
  1347. md5[MD5_LEN] = 0;
  1348. buf += MD5_LEN;
  1349. #ifdef HAVE_MYSQL
  1350. if (authUser (userID, md5, local_mysql, NULL) == 0)
  1351. {
  1352. SEND_NPMSG(NPTRACKER.sock[CurrentSock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
  1353. closure_NP (p);
  1354. return -1;
  1355. }
  1356. #endif
  1357. // check protocol version
  1358. clientVer = *(float*)buf;
  1359. buf += sizeof (float);
  1360. if(clientVer < MIN_CLIENT_VERSION) {
  1361. SEND_NPMSG(NPTRACKER.sock[CurrentSock], TS2NP_MSG, ERR_LOW_VERSION, 1, &UDPCLIENT);
  1362. return -1;
  1363. }
  1364. npport = ntohs (*(unsigned short *)buf);
  1365. buf += sizeof (short);
  1366. host = ntohl (UDPCLIENT.sin_addr.s_addr);
  1367. port = ntohs (UDPCLIENT.sin_port);
  1368. // find client session in the session cluster (hash table)
  1369. id = hash_np (host, npport);
  1370. for (p=NPTRACKER.hash[id]; p; p=p->hnext)
  1371. if (p->host == host && p->port == port && p->npport == npport)
  1372. break;
  1373. if (!p) // not found, allocate and add new session object
  1374. {
  1375. // error adding session: TS full or uninitialized
  1376. if (NPTRACKER.cur >= NPTRACKER.max || NPTRACKER.hash[0] == 0)
  1377. {
  1378. SEND_NPMSG(NPTRACKER.sock[CurrentSock],TS2NP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
  1379. return -1;
  1380. }
  1381. // allocate session object from the freelist indicated by hash[0]
  1382. // allocate the FIRST free object in hash[0] and insert it into HEAD of corresponding bucket
  1383. p = NPTRACKER.hash[0];
  1384. NPTRACKER.hash[0] = p->hnext;
  1385. p->hnext = NPTRACKER.hash[id];
  1386. NPTRACKER.hash[id] = p;
  1387. // fill in the object
  1388. p->socket = NPTRACKER.sock[CurrentSock];
  1389. p->type = TYPE_NP;
  1390. p->port = port;
  1391. p->npport = npport;
  1392. p->host = host;
  1393. p->clientVer = clientVer;
  1394. #ifdef SORT_NET
  1395. {
  1396. struct networks *pnetworks = getnetwork (host, NETBLOCKS, maxNet);
  1397. if (pnetworks) p->net = pnetworks->net;
  1398. else p->net = 0;
  1399. }
  1400. #endif
  1401. p->time_sec = CurTimeSec;
  1402. // process the ip list
  1403. num = *(unsigned char *)buf; // size of local ip list
  1404. buf += sizeof (char);
  1405. intra = 0;
  1406. for (i=0; i<num; i++)
  1407. {
  1408. myhost = ntohl (*(unsigned int *)buf);
  1409. buf += sizeof (int);
  1410. if (intra == 0xffffffff)
  1411. continue; // Must use continue to modify buf to right place
  1412. if ((myhost >> 16) == 0xc0a8) // 0xc0a8 == 192.168
  1413. {
  1414. intra = myhost;
  1415. if(host == intra)
  1416. intra = 0xffffffff;
  1417. }
  1418. else if (myhost == host)
  1419. intra = 0xffffffff;
  1420. }
  1421. p->intra = intra;
  1422. p->auth = random ();
  1423. //init statistics
  1424. p->u.p.startBlock = -1;
  1425. init_NP (p);
  1426. } else if (CurTimeSec < p->last_access + SILENCE_TIME)
  1427. return 0; // still active, do NOTHING
  1428. if (buf - m->buffer + NORMAL_HEADER > m->len)
  1429. {
  1430. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  1431. return -1;
  1432. }
  1433. // Time to send back welcome message to NP
  1434. UDPMsg.type = TS2NP_WELCOME;
  1435. UDPMsg.len = 12+sizeof(struct P2PAddress);
  1436. UDPMsg.authcode1 = p-NPTRACKER.head;
  1437. UDPMsg.authcode2 = p->auth;
  1438. addr = (struct P2PAddress *)(UDPMsg.buffer);
  1439. addr->outerIP.sin_family = addr->subnetIP.sin_family = AF_INET;
  1440. addr->outerIP.sin_addr.s_addr = htonl (p->host);
  1441. addr->subnetIP.sin_addr.s_addr = htonl (p->intra);
  1442. addr->outerIP.sin_port = ntohs (port);
  1443. addr->subnetIP.sin_port = ntohs (npport);
  1444. ++ts2npWelcomeCount;
  1445. if (sendMessage(p->socket, (char *)&UDPMsg, &UDPCLIENT) < 0)
  1446. {
  1447. closure_NP (p);
  1448. return -1;
  1449. }
  1450. // process_NEED_PEERS_real (p, NULL, 1, 0, 0);
  1451. return 0;
  1452. }
  1453. // |---CHECK DIGITS(7 BYTEs)---|res count(UINT8)|RESOURCE MD5(MD5_LEN)|...|
  1454. int process_NP2TS_QUERY_RES (struct Session *p, struct TSMessage *m)
  1455. {
  1456. int i;
  1457. char *buf = m->buffer;
  1458. char buffer[MAX_DATA];
  1459. char *resultMsg, *prescount;
  1460. struct Channel *pc;
  1461. unsigned char num = *(unsigned char *)buf; //# of queried resources
  1462. buf += sizeof (char);
  1463. resultMsg = buffer+sizeof(int);
  1464. *(unsigned char *)resultMsg = TS2NP_RESINFO;
  1465. resultMsg += sizeof (char);
  1466. prescount = resultMsg;
  1467. *(unsigned char *)prescount = 0; // counter of resources
  1468. for (i=0; i<num; i++)
  1469. {
  1470. if ((pc = findChannel (buf, MD5_LEN)) != NULL)
  1471. {
  1472. (*(unsigned char *)prescount) ++;
  1473. memcpy (resultMsg, buf, MD5_LEN);
  1474. resultMsg += MD5_LEN;
  1475. *(unsigned short *)resultMsg = pc->numclient;
  1476. resultMsg += sizeof (short);
  1477. }
  1478. buf += MD5_LEN;
  1479. }
  1480. if (buf - m->buffer + AUTH_HEADER > m->len)
  1481. {
  1482. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  1483. closure_NP (p);
  1484. return -1;
  1485. }
  1486. *(unsigned int *)buffer = resultMsg - buffer; // The length of the message
  1487. if (sendMessage(p->socket,buffer,&UDPCLIENT) < 0)
  1488. {
  1489. // closure_NP (p);
  1490. return -1;
  1491. }
  1492. return 0;
  1493. }
  1494. // new version of REPORT with TransferInfo, ignoring TransferInfo
  1495. // | ---CHECK DIGITS(7 BYTEs)--- |
  1496. // | info of peer(CorePeerInfo) | refresh(bool) |
  1497. // | Interval count(UINT8) | first BlockInterval | ... |
  1498. // | transfer Info(TransferInfo) |
  1499. int process_NP2TS_REPORT (struct Session *p, struct TSMessage *m)
  1500. {
  1501. unsigned char type;
  1502. char *buf = m->buffer;
  1503. // 1. extract CorePeerInfo
  1504. memcpy (&(p->u.p.p), buf, sizeof (struct CorePeerInfo));
  1505. buf += sizeof (struct CorePeerInfo);
  1506. if (p->u.p.cur)
  1507. {
  1508. type = *(unsigned char *)buf;
  1509. buf += sizeof (char);
  1510. if (type) // refresh==true
  1511. {
  1512. p->u.p.cur->numinter = *(unsigned char *)buf;
  1513. PDEBUG ("Set %d intervalsn", p->u.p.cur->numinter);
  1514. buf += sizeof (char);
  1515. if (p->u.p.cur->numinter > 0 && p->u.p.cur->numinter < MAX_INTERVAL)
  1516. memcpy (p->u.p.cur->inter, buf, p->u.p.cur->numinter*sizeof (struct Interval));
  1517. else p->u.p.cur->numinter = 0;
  1518. } else // refresh==false incremental update
  1519. {
  1520. type = *(unsigned char *)buf; // # of newly added intervals
  1521. buf += sizeof (char);
  1522. PDEBUG ("Add %d intervals,", type);
  1523. if (type > 0) // merge & delete are not good name!
  1524. {
  1525. p->u.p.cur->numinter = merge (p->u.p.cur->inter, p->u.p.cur->numinter, (struct Interval *)buf, type);
  1526. buf += type*sizeof(struct Interval);
  1527. }
  1528. type = *(unsigned char *)buf;
  1529. buf += sizeof (char);
  1530. if (type > 0)
  1531. {
  1532. p->u.p.cur->numinter = delete_interval (p->u.p.cur->inter, p->u.p.cur->numinter, (struct Interval *)buf, type);
  1533. buf += type*sizeof(struct Interval);
  1534. }
  1535. PDEBUG ("del %d intervals, now is %d.n", type, p->u.p.cur->numinter);
  1536. }
  1537. }
  1538. // ignore TransferInfo, but still count the size for integrity check
  1539. // --DELETED-- copy transferinfo of client
  1540. // --DELETED-- memcpy (&(p->u.p.t), buf, sizeof (struct TransferInfo));
  1541. buf += sizeof(struct TransferInfo);
  1542. if (buf - m->buffer + AUTH_HEADER > m->len)
  1543. {
  1544. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  1545. closure_NP (p);
  1546. return -1;
  1547. }
  1548. return 0;
  1549. }
  1550. int process_NP2TS_REPORT2 (struct Session *p, struct TSMessage *m)
  1551. {
  1552. char *buf = m->buffer;
  1553. // TODO: 
  1554. //  extract current playing block and count them
  1555. memcpy (&(p->u.p.s), buf, sizeof (struct StatInfo));
  1556. buf += sizeof (struct StatInfo);
  1557. memcpy (&(p->u.p.t), buf, sizeof (struct TransferInfo));
  1558. buf += sizeof (struct TransferInfo);
  1559. // check if this is the first report and record starting block of NP
  1560. if ( p->u.p.startBlock == -1 ) {
  1561. p->u.p.startBlock = p->u.p.s.playingBlock;
  1562. }
  1563. if (buf - m->buffer + AUTH_HEADER > m->len)
  1564. {
  1565. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  1566. closure_NP (p);
  1567. return -1;
  1568. }
  1569. return 0;
  1570. }
  1571. // |---CHECK DIGITS(7 BYTEs)---|
  1572. // |resource list size(UINT8)|
  1573. // |RESOURCE MD5(MD5_LEN)|Interval count(UINT8)|first BlockInterval|...|
  1574. int process_NP2TS_RES_LIST (struct Session *p, struct TSMessage *m)
  1575. {
  1576. unsigned int num, i;
  1577. struct ChannelInfo c;
  1578. char *buf = m->buffer;
  1579. // unsigned char needcp;
  1580. num = *(unsigned char *)buf;
  1581. buf += sizeof (char);
  1582. for (i=0; i<num; i++)
  1583. {
  1584. memcpy (c.md5, buf, MD5_LEN);
  1585. c.md5[MD5_LEN] = 0;
  1586. buf += MD5_LEN;
  1587. c.numinter = *(unsigned char *)buf;
  1588. buf += sizeof (char);
  1589. if (c.numinter > MAX_INTERVAL) return -1;
  1590. memcpy (c.inter, buf, c.numinter*sizeof(struct Interval));
  1591. buf += sizeof(struct Interval) * c.numinter;
  1592. addSession (p, &c, 1, 0);
  1593. }
  1594. if (buf - m->buffer + AUTH_HEADER > m->len)
  1595. {
  1596. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  1597. closure_NP (p);
  1598. return -1;
  1599. }
  1600. return 0;
  1601. }
  1602. // |---CHECK DIGITS(7 BYTEs)---|
  1603. // |RESOURCE MD5(MD5_LEN)|Interval count(UINT8)|first BlockInterval|...|
  1604. // |current block(UINT32)|need CP(bool)|
  1605. int process_NP2TS_REQ_RES (struct Session *p, struct TSMessage *m)
  1606. {
  1607. struct ChannelInfo c;
  1608. unsigned int cur;
  1609. char *buf = m->buffer;
  1610. unsigned char needcp;
  1611. memcpy (c.md5, buf, MD5_LEN);
  1612. c.md5[MD5_LEN] = 0;
  1613. buf += MD5_LEN;
  1614. c.numinter = *(unsigned char *)buf;
  1615. buf += sizeof (char);
  1616. if (c.numinter > MAX_INTERVAL) return -1;
  1617. memcpy (c.inter, buf, c.numinter*sizeof(struct Interval));
  1618. buf += sizeof(struct Interval) * c.numinter;
  1619. cur = *(unsigned int *)buf;
  1620. buf += sizeof (int);
  1621. needcp = *(unsigned char *)buf;
  1622. buf += sizeof (char);
  1623. if (buf - m->buffer + AUTH_HEADER > m->len)
  1624. {
  1625. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  1626. closure_NP (p);
  1627. return -1;
  1628. }
  1629. addSession (p, &c, 1, cur);
  1630. process_NEED_PEERS_real (p, c.md5, needcp, cur, 0);
  1631. SEND_NPMSG(NPTRACKER.sock[CurrentSock],TS2NP_MSG,ERR_ADD_RES_OK,0,&UDPCLIENT);
  1632. return 0;
  1633. }
  1634. // NP has quit from one channel
  1635. // |---CHECK DIGITS(7 BYTEs)---|RESOURCE MD5(MD5_LEN)|
  1636. int process_NP2TS_DEL_RES (struct Session *p, struct TSMessage *m)
  1637. {
  1638. struct Edge *pedge;
  1639. struct Channel *pc;
  1640. char *buf = m->buffer;
  1641. if ((pc=findChannel (buf, MD5_LEN)) == NULL)
  1642. return -1;
  1643. buf += MD5_LEN;
  1644. if (buf - m->buffer + AUTH_HEADER > m->len)
  1645. {
  1646. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  1647. closure_NP (p);
  1648. return -1;
  1649. }
  1650. for (pedge=p->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
  1651. if (pedge) delEdge (pedge);
  1652. return 0;
  1653. }
  1654. int process_NEED_PEERS_real (struct Session *p, char *md5, int needcp, unsigned int cur, unsigned char layer)
  1655. {
  1656. char buffer[MAX_DATA], buffer1[MAX_DATA]; /* buffer is used to hold TS2CP/NP_PEERS message, 
  1657.      buffer1 is used to hold TS2NP_CONNECT_TO message */
  1658. int num, numcp=0, numnp, conn = p->socket;
  1659. char *buf, *psize;
  1660. struct Channel *pc;
  1661. struct P2PAddress *addr;
  1662. PDEBUG ("NEED_PEERS type %d needcp %d cur %dn", p->type, needcp, cur);
  1663. buf = buffer+sizeof (int);
  1664. if (p->type == TYPE_CP)
  1665. *(unsigned char *)buf = TS2CP_PEERS;
  1666. else
  1667. *(unsigned char *)buf = TS2NP_PEERS;
  1668. buf += sizeof (char);
  1669. ++ts2npPeersCount;
  1670. if (md5) // md5 of the channel
  1671. {
  1672. pc = findChannel (md5, MD5_LEN);
  1673. if (p->type == TYPE_CP)
  1674. {
  1675. memcpy (buf, md5, MD5_LEN);
  1676. buf += MD5_LEN;
  1677. }
  1678. } else if (p->u.p.cur) // Session->NPInfo->CurrentEdge
  1679. pc = p->u.p.cur->head;
  1680. else
  1681. pc = NULL;
  1682. if (pc == NULL) 
  1683. {
  1684. PDEBUG("no channeln");
  1685. return -1;
  1686. }
  1687. num = MAX_PEER;
  1688. if (needcp)
  1689. {
  1690. psize = buf;
  1691. buf += sizeof (char);
  1692. if (p->type == TYPE_CP)
  1693. *(unsigned char *)psize = numcp = findCPPeers (0, pc->name, &buf);
  1694. else
  1695. //*(unsigned char *)psize = numcp = findCPPeers (ntohs (p->host & 0xffff0000), pc->name, &buf);
  1696. *(unsigned char *)psize = numcp = findCPPeers (p->host, pc->name, &buf);
  1697. } else
  1698. {
  1699. *(unsigned char *)buf = 0;
  1700. buf += sizeof (char);
  1701. }
  1702. psize = buf;
  1703. buf += sizeof (char);
  1704. if (p->intra == 0xffffffff) //NP is in the public network, ask NP to contact other NPs
  1705. {
  1706. // |len(INT32) | type(INT8)|target addr(P2PAddress)|connect for free(bool)|
  1707. *(int *)buffer1 = sizeof(struct P2PAddress) + sizeof (int) + 2*sizeof (char);
  1708. *(unsigned char *)(buffer1+sizeof (int)) = TS2NP_CONNECT_TO;
  1709. addr = (struct P2PAddress *)(buffer1+sizeof(int)+sizeof(char));
  1710. addr->outerIP.sin_family = PF_INET;
  1711. addr->subnetIP.sin_family = PF_INET;
  1712. addr->outerIP.sin_port = htons (p->port);
  1713. addr->outerIP.sin_addr.s_addr = htonl (p->host);
  1714. addr->subnetIP.sin_port = htons (p->npport);
  1715. addr->subnetIP.sin_addr.s_addr = htonl (p->intra);
  1716. *(unsigned char *)(addr + 1) = 0; // connect for free(bool)
  1717. PDEBUG("Send ConnectTo %sn", inet_ntoa(addr->outerIP.sin_addr));
  1718. numnp = findNPPeers (pc, p, cur, num, &buf, buffer1);
  1719. } else
  1720. {
  1721. numnp = findNPPeers (pc, p, cur, num, &buf, NULL);
  1722. }
  1723. *psize = numnp; // set # of NP
  1724. *(int *)buffer = buf - buffer;
  1725. if (sendMessage(conn,buffer,&UDPCLIENT) < 0)
  1726. {
  1727. // closure_NP (p);
  1728. PDEBUG("send msg errn");
  1729. return -1;
  1730. }
  1731. PDEBUG ("find %d NP and %d CPn", numnp, numcp);
  1732. return numnp+numcp;
  1733. }
  1734. /* 查询Peer信息, 使用findCPPeer寻找合适的CP, 使用findNPPeers寻找合适的NP. 
  1735.    NP寻找时, 找到结果后按照networks来排序,保证在同一个网络中的排在前面. */
  1736. int process_NP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m)
  1737. {
  1738. char *buf = m->buffer;
  1739. int needcp;
  1740. needcp = *(unsigned char *)buf;
  1741. buf += sizeof (char);
  1742. if(p->u.p.cur == NULL)
  1743. {
  1744. PDEBUG("no current in NPInfon");
  1745. return -1;
  1746. }
  1747. p->u.p.cur->current = *(unsigned int *)buf;
  1748. buf += sizeof (int);
  1749. p->u.p.p.layer= *(unsigned char *)buf;
  1750. buf += sizeof (char);
  1751. if (buf - m->buffer + AUTH_HEADER > m->len)
  1752. {
  1753. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  1754. closure_NP (p);
  1755. return -1;
  1756. }
  1757. return process_NEED_PEERS_real (p, NULL, needcp, p->u.p.cur->current, p->u.p.p.layer);
  1758. }
  1759. int findCPPeers (unsigned long host, char *md5, char **buffer)
  1760. {
  1761. // float minband = -1;
  1762. struct Session* p = NULL, *choice1 = NULL, *choice2 = NULL;
  1763. struct NormalAddress *addr1;
  1764. struct NormalAddress *addr2;
  1765. int i;
  1766. // We should start with a random address
  1767. p = CPTRACKER.head + rand() % (CPTRACKER.maxid+1);
  1768. //use pure random address instead of comparing the band
  1769. PDEBUG("In findCPPeers.n");
  1770. int minpriority = -1;
  1771. for( i=0; i <= CPTRACKER.maxid && host != 0; i++, p++)
  1772. {
  1773. PDEBUG("begin to call findcppeers.n");
  1774. if( p-CPTRACKER.head > CPTRACKER.maxid ) // round back to head
  1775. {
  1776. p = CPTRACKER.head;
  1777. }
  1778. if(p->u.cp.maxConn == 1 || p->socket == 0)
  1779. {
  1780. PDEBUG("Invalid CP.n");
  1781. continue;
  1782. }
  1783. int priority = findcppeers(host, (void*)p);
  1784. //如果不是对应的CP,查找下一个
  1785. if(priority == -1)
  1786. {
  1787. PDEBUG("not found.n");
  1788. continue;
  1789. }
  1790. PDEBUG("found: host : %d  servicetype: %s.n", host, p->u.cp.servicetype);
  1791. if(minpriority == -1)
  1792. minpriority = priority;
  1793. if(priority == 1)// priority - 1~n
  1794. {
  1795. if(choice1 == NULL)
  1796. {
  1797. choice1 = p;
  1798. minpriority = -1;
  1799. }
  1800. else
  1801. {
  1802. choice2 = p;
  1803. break;
  1804. }
  1805. }
  1806. else
  1807. {
  1808. if(priority < minpriority)
  1809. {
  1810. if(choice1 == NULL)
  1811. {
  1812. choice1 = p;
  1813. minpriority = -1;
  1814. }
  1815. else
  1816. {
  1817. choice2 = p;
  1818. break;
  1819. }
  1820. }
  1821. }
  1822. }
  1823. if(i > CPTRACKER.maxid)//find nothing via findcppeers, return CP directly
  1824. {
  1825. PDEBUG("begin to find sequencely.n");
  1826. for( i=0; i <= CPTRACKER.maxid; i++, p++)
  1827. {
  1828. if( p-CPTRACKER.head > CPTRACKER.maxid ) // round back to head
  1829. {
  1830. p = CPTRACKER.head;
  1831. }
  1832. if(p->u.cp.maxConn == 1 || p->socket == 0)
  1833. continue;
  1834. // 始终保证choice1比choice2先获得值,如果两个都有了就break
  1835. if(choice1 == NULL)
  1836. {
  1837. choice1 = p;
  1838. }
  1839. else if(p->host == choice1->host)// 避免返回两个相同的CP
  1840. continue;
  1841. else if(choice2 == NULL)
  1842. {
  1843. choice2 = p;
  1844. }
  1845. else
  1846. break;
  1847. }
  1848. }
  1849. int found_cp_count = 0;
  1850. if (choice1)
  1851. {
  1852. addr1 = (struct NormalAddress *)*buffer;
  1853. addr1->sin_family = PF_INET;
  1854. addr1->sin_port = htons (choice1->npport);
  1855. addr1->sin_addr.s_addr = htonl (choice1->host);
  1856. *buffer += sizeof (*addr1);
  1857. found_cp_count ++; // one CP is found
  1858. }
  1859. if(choice2)
  1860. {
  1861. addr2 = (struct NormalAddress *)*buffer;
  1862. addr2->sin_family = PF_INET;
  1863. addr2->sin_port = htons (choice2->npport);
  1864. addr2->sin_addr.s_addr = htonl (choice2->host);
  1865. *buffer += sizeof (*addr2);
  1866. found_cp_count ++;
  1867. }
  1868. return found_cp_count;// 返回最终找到的CP的个数
  1869. }
  1870. #ifndef SORT_NET
  1871. int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1)
  1872. {
  1873. struct Edge *pedge = NULL;
  1874. struct Session *ps;
  1875. struct P2PAddress addr;
  1876. int j, k;
  1877. unsigned int randstart;
  1878. struct sockaddr_in client;
  1879. if (pc == NULL || pc->numclient <= 0) return 0;
  1880. if (me->cachepeer != NULL && me->cachepeer->u.p.header != NULL && me->type == TYPE_NP)
  1881. {
  1882. for (pedge=me->cachepeer->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
  1883. }
  1884. if (pedge == NULL)
  1885. {
  1886. randstart = rand () % pc->numclient;
  1887. for (pedge=pc->PeerHead; pedge && randstart > 0; pedge=pedge->cnext, randstart--);
  1888. }
  1889. for (j=0,k=0; j<num && k<pc->numclient; pedge=pedge->cnext)
  1890. {
  1891. k++;
  1892. if (pedge == NULL) pedge = pc->PeerHead;
  1893. if ((ps = pedge->me) == me) continue; // exclude myself
  1894. if (CurTimeSec > pedge->me->last_access+60) continue; // don't bother it too often
  1895. if (ps->u.p.p.isMaxIn == 0 && ((me->type == TYPE_CP) || playing == 0xffffffff || check_valid (pedge, playing)))
  1896. {
  1897. if (ps->intra == 0xffffffff || ps->host == me->host) // 0xffffffff means ps is on the public network, ps->host == me->host means in the same private network
  1898. {
  1899. memcpy (*buffer, &(ps->u.p.p), sizeof (struct CorePeerInfo));
  1900. addr = (*buffer) + sizeof (struct CorePeerInfo);
  1901. addr->outerIP.sin_port = htons (ps->port);
  1902. addr->outerIP.sin_addr.s_addr = htonl (ps->host);
  1903. addr->subnetIP.sin_port = htons (ps->npport);
  1904. addr->subnetIP.sin_addr.s_addr = htonl (ps->intra);
  1905. addr->outerIP.sin_family = PF_INET;
  1906. addr->subnetIP.sin_family = PF_INET;
  1907. *buffer = (struct char *)(addr + 1);
  1908. j++;
  1909. } else if (buffer1)
  1910. {
  1911. memset ((char *)&client, 0, sizeof (client));
  1912. client.sin_port = htons (ps->port);
  1913. client.sin_family = AF_INET;
  1914. client.sin_addr.s_addr = htonl (ps->host);
  1915. if (sendMessage(ps->socket,buffer1, &client) < 0)
  1916. {
  1917. closure_NP (ps);
  1918. }
  1919. PDEBUG("Send ConnecTo to %sn", inet_ntoa(client.sin_addr));
  1920. }
  1921. }
  1922. }
  1923. if (pedge != NULL) me->cachepeer = pedge->me;
  1924. return j;
  1925. }
  1926. #else
  1927. int findNPPeers (struct Channel *pc, struct Session *me, int playing, int num, char **buffer, char *buffer1)
  1928. {
  1929. int i, j, k, m, mnetnum;
  1930. unsigned int randstart;
  1931. struct Session *result[MAX_PEER];
  1932. struct Session *ps;
  1933. struct sockaddr_in client;
  1934. struct Edge *pedge;
  1935. struct P2PAddress *addr;
  1936. if (pc == NULL || pc->numclient <= 0) return 0;
  1937. Net = me->net;
  1938. for (j=0,m=0; m<MAX_NET_NUM; m++) { mnetnum = (Net + m) % MAX_NET_NUM;
  1939. k = 0;
  1940. if (pc->nclient_net[mnetnum] <= 0) continue;
  1941. pedge = NULL;
  1942. if (me->cachepeer[mnetnum] != NULL && me->cachepeer[mnetnum]->u.p.header != NULL && me->type == TYPE_NP)
  1943. {
  1944. for (pedge=me->cachepeer[mnetnum]->u.p.header; pedge && pedge->head != pc; pedge=pedge->enext);
  1945. }
  1946. if (pedge == NULL)
  1947. {
  1948. randstart = rand () % pc->nclient_net[mnetnum];
  1949. for (pedge=pc->PeerHead[mnetnum]; pedge && randstart > 0; pedge=pedge->cnext, randstart--);
  1950. }
  1951. for (; j<MAX_PEER && k < pc->nclient_net[mnetnum]; pedge=pedge->cnext)
  1952. {
  1953. k++;
  1954. if (pedge == NULL) pedge = pc->PeerHead[mnetnum];
  1955. if ((ps = pedge->me) == me) continue;
  1956. if (CurTimeSec > pedge->me->last_access+60) continue;
  1957. if (ps->u.p.p.isMaxIn == 0 && ((me->type == TYPE_CP) || playing == 0xffffffff || check_valid (pedge, playing)))
  1958. {
  1959. if (ps->intra == 0xffffffff || ps->host == me->host)
  1960. {
  1961. result[j] = ps;
  1962. j++;
  1963. } else if (buffer1)
  1964. {
  1965. memset ((char *)&client, 0, sizeof (client));
  1966. client.sin_port = htons (ps->port);
  1967. client.sin_family = AF_INET;
  1968. client.sin_addr.s_addr = htonl (ps->host);
  1969. if (sendMessage(ps->socket,buffer1, &client) < 0)
  1970. {
  1971. closure_NP (ps);
  1972. }
  1973. PDEBUG("Send ConnecTo to %sn", inet_ntoa(client.sin_addr));
  1974. }
  1975. }
  1976. }
  1977. if (pedge != NULL) me->cachepeer[mnetnum] = pedge->me;
  1978. }
  1979. if (j > 1)
  1980. qsort (result, j, sizeof (struct Session *), compareSession);
  1981. if (num > 0 && j > num) j = num;
  1982. if (me->type == TYPE_NP)
  1983. PDEBUG ("NP %d find %d NP:", me-NPTRACKER.head, j);
  1984. else
  1985. PDEBUG ("CP %d find %d CP:", me-CPTRACKER.head, j);
  1986. for (i=0; i<j; i++)
  1987. {
  1988. ps = result[i];
  1989. PDEBUG ("%dt", ps-NPTRACKER.head);
  1990. memcpy (*buffer, &(ps->u.p.p), sizeof (struct CorePeerInfo));
  1991. addr = (struct P2PAddress *)((*buffer) + sizeof (struct CorePeerInfo));
  1992. addr->outerIP.sin_port = htons (ps->port);
  1993. addr->outerIP.sin_addr.s_addr = htonl (ps->host);
  1994. addr->subnetIP.sin_port = htons (ps->npport);
  1995. addr->subnetIP.sin_addr.s_addr = htonl (ps->intra);
  1996. addr->outerIP.sin_family = PF_INET;
  1997. addr->subnetIP.sin_family = PF_INET;
  1998. *buffer = (char *)(addr + 1);
  1999. }
  2000. PDEBUG ("n");
  2001. return j;
  2002. }
  2003. #endif
  2004. int process_CP2TS_UPDATE (struct Session *p, struct TSMessage *m)
  2005. {
  2006. char *buf = m->buffer;
  2007. p->u.cp.resnum = *(int *)buf;
  2008. buf += sizeof (int);
  2009. p->u.cp.connnum = *(unsigned short *)buf;
  2010. buf += sizeof (unsigned short);
  2011. p->u.cp.band = *(float *)buf;
  2012. buf += sizeof(float);
  2013. p->u.cp.maxConn = *(unsigned char *)buf;
  2014. buf += sizeof(char);
  2015. if (buf - m->buffer + AUTH_HEADER > m->len)
  2016. {
  2017. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  2018. closure_CP (p);
  2019. return -1;
  2020. }
  2021. if (p->u.cp.type == CT_GENERAL &&
  2022. (GCPCHOICE == NULL || p->u.cp.resnum < GCPCHOICE->u.cp.resnum))
  2023. GCPCHOICE = p;
  2024. if (p->u.cp.resnum < 0) p->u.cp.resnum = 0;
  2025. if (p->u.cp.band < 0) p->u.cp.band = 0.001;
  2026. return 0;
  2027. }
  2028. int process_CP2TS_NEED_PEERS (struct Session *p, struct TSMessage *m)
  2029. {
  2030. return process_NEED_PEERS_real (p, m->buffer/*md5*/, 1/*needcp*/, 0/*cur*/, 0/*layer*/);
  2031. }
  2032. /* 登录, CP向TS登录, 按照来源IP地址和所报告的npport进行hash,
  2033.    如果距离上次发送CP2TS_REGISTER消息的时间小于SILENCE_TIME, 则直接返回,否则发送WELCOME消息. */
  2034. int process_CP2TS_REGISTER (struct Message *m)
  2035. {
  2036. char md5[MD5_LEN+1];
  2037. unsigned int host, cur = 0;
  2038. unsigned short port, npport;
  2039. int id, userID;
  2040. char *buf;
  2041. struct Session *p;
  2042. buf = m->buffer;
  2043. userID = *(int *)buf;
  2044. buf += sizeof (int);
  2045. memcpy (md5, buf, MD5_LEN);
  2046. md5[MD5_LEN] = 0;
  2047. buf += MD5_LEN;
  2048. #ifdef HAVE_MYSQL
  2049. if (authUser (userID, md5, local_mysql, NULL) == 0)
  2050. {
  2051. }
  2052. #endif
  2053. npport = ntohs(*(unsigned short *)buf);
  2054. buf += sizeof (short);
  2055. host = ntohl (UDPCLIENT.sin_addr.s_addr);
  2056. port = ntohs (UDPCLIENT.sin_port);
  2057. id = hash_cp (host, npport);
  2058. for (p=CPTRACKER.hash[id]; p; p=p->hnext)
  2059. if (p->host == host && p->port == port && p->npport == npport)
  2060. break;
  2061. if (!p)
  2062. {
  2063. if (CPTRACKER.cur >= CPTRACKER.max || CPTRACKER.hash[0] == 0)
  2064. {
  2065. PDEBUG("CP reg err, wrong cp index in array.n");
  2066. SEND_NPMSG(CPTRACKER.sock[CurrentSock],TS2CP_MSG,ERR_INTERNAL,1,&UDPCLIENT);
  2067. return -1;
  2068. }
  2069. p = CPTRACKER.hash[0];
  2070. CPTRACKER.hash[0] = p->hnext;
  2071. p->hnext = CPTRACKER.hash[id];
  2072. CPTRACKER.hash[id] = p;
  2073. p->socket = CPTRACKER.sock[CurrentSock];
  2074. p->type = TYPE_CP;
  2075. p->port = port;
  2076. p->npport = npport;
  2077. p->host = host;
  2078. #ifdef SORT_NET
  2079. {
  2080. struct networks *pnetworks = getnetwork (host, NETBLOCKS, maxNet);
  2081. if (pnetworks) p->net = pnetworks->net;
  2082. else p->net = 0;
  2083. }
  2084. #endif
  2085. p->time_sec = cur;
  2086. p->u.cp.userid = userID;
  2087. p->u.cp.type = *(unsigned char *)buf;
  2088. buf += sizeof (char);
  2089. if (p->u.cp.type == CT_EDGE)
  2090. {
  2091. p->u.cp.numHeads = *(unsigned short *)buf;
  2092. buf += sizeof (short);
  2093. if (((char *)m) +m->len - buf <= sizeof (p->u.cp.parameter))
  2094. memcpy (p->u.cp.parameter, buf, (char *)m+m->len-buf);
  2095. else
  2096. memcpy (p->u.cp.parameter, buf,sizeof (p->u.cp.parameter));
  2097. if (p->u.cp.numHeads > sizeof(p->u.cp.parameter)/2)
  2098. p->u.cp.numHeads = sizeof (p->u.cp.parameter)/2;
  2099. } else if (p->u.cp.type == CT_SPECIFIED_RES)
  2100. {
  2101. memcpy (p->u.cp.parameter, buf, MD5_LEN);
  2102. buf += MD5_LEN;
  2103. }
  2104. p->auth = random ();
  2105. init_CP (p);
  2106. } else if (CurTimeSec < p->last_access + SILENCE_TIME)
  2107. return 0;
  2108. if (buf - m->buffer + NORMAL_HEADER > m->len)
  2109. {
  2110. PDEBUG ("Invalid message NP2TS_LOGIN, length %d not enough.n", m->len);
  2111. closure_CP (p);
  2112. return -1;
  2113. }
  2114. PDEBUG("CP register. socket=%d listnum=%d auth=%d.n", p->socket, p-CPTRACKER.head, p->auth);
  2115. UDPMsg.len = 12;
  2116. UDPMsg.type=TS2CP_WELCOME;
  2117. UDPMsg.authcode1 = p-CPTRACKER.head;
  2118. UDPMsg.authcode2 = p->auth;
  2119. if (sendMessage(p->socket,(char *)&UDPMsg, &UDPCLIENT) < 0)
  2120. {
  2121. closure_CP (p);
  2122. return -1;
  2123. }
  2124. return 0;
  2125. }
  2126. int compareInter (const void *a, const void *b)
  2127. {
  2128. struct Interval *p = (struct Interval *) a;
  2129. struct Interval *q = (struct Interval *) b;
  2130. if ((p->start >= q->start && p->start+p->len <= q->start + q->len)
  2131. || (q->start >= p->start && q->start+q->len <= p->start + p->len))
  2132. return 0;
  2133. return (p->start - q->start);
  2134. }
  2135. // delete用于从原有的Interval当中去掉新的.
  2136. int delete_interval (struct Interval *head, int total, struct Interval *_new, int num)
  2137. {
  2138. int i,j,k;
  2139. struct Interval tmp[MAX_INTERVAL*2];
  2140. for (i=0,j=0,k=0; i<total && j<num;)
  2141. {
  2142. if (_new[j].start+_new[j].len <= head[i].start)
  2143. j++;
  2144. else if (head[i].start+head[i].len <= _new[j].start)
  2145. {
  2146. tmp[k].start = head[i].start;
  2147. tmp[k].len = head[i].len;
  2148. i++;
  2149. k++;
  2150. } else
  2151. {
  2152. if (_new[j].start <= head[i].start)
  2153. {
  2154. if (_new[j].start+_new[j].len >= head[i].start+head[i].len)
  2155. i++;
  2156. else
  2157. {
  2158. tmp[k].start = _new[j].start+_new[j].len;
  2159. tmp[k].len = head[i].start+head[i].len-tmp[k].start;
  2160. i++;
  2161. if (tmp[k].len > 0) k++;
  2162. }
  2163. } else
  2164. {
  2165. tmp[k].start = head[i].start;
  2166. tmp[k].len = _new[j].start-tmp[k].start;
  2167. i++;
  2168. if (tmp[k].len > 0) k++;
  2169. if (_new[j].start+_new[j].len < head[i].start+head[i].len)
  2170. {
  2171. tmp[k].start = _new[j].start+_new[j].len;
  2172. tmp[k].len = head[i].start+head[i].len-tmp[k].start;
  2173. i++;
  2174. j++;
  2175. if (tmp[k].len > 0) k++;
  2176. }
  2177. }
  2178. }
  2179. }
  2180. if (i<total)
  2181. {
  2182. memcpy (tmp+k, head+i, (total-i)*sizeof(struct Interval));
  2183. k += (total-i);
  2184. }
  2185. if (k > MAX_INTERVAL) k = MAX_INTERVAL;
  2186. if (k>0) memcpy (head, tmp, k*sizeof(struct Interval));
  2187. return k;
  2188. }
  2189. // merge用于将原有的Interval和新的Interval列表合在一起 
  2190. int merge (struct Interval *head, int total, struct Interval *_new, int num)
  2191. {
  2192. int i,j,k;
  2193. struct Interval tmp[MAX_INTERVAL*2];
  2194. for (i=0,j=0,k=0; i<total && j<num;)
  2195. {
  2196. if (head[i].start <= _new[j].start)
  2197. {
  2198. tmp[k].start = head[i].start;
  2199. tmp[k].len = head[i].len;
  2200. i++;
  2201. } else
  2202. {
  2203. tmp[k].start = _new[j].start;
  2204. tmp[k].len = _new[j].len;
  2205. j++;
  2206. }
  2207. for (; i<total || j<num;)
  2208. {
  2209. if (i<total && head[i].start <= tmp[k].start+tmp[k].len)
  2210. {
  2211. if (head[i].start+head[i].len > tmp[k].start+tmp[k].len)
  2212. tmp[k].len = head[i].start+head[i].len-tmp[k].start;
  2213. i++;
  2214. } else if (j<num && _new[j].start <= tmp[k].start+tmp[k].len)
  2215. {
  2216. if (_new[j].start+_new[j].len > tmp[k].start+tmp[k].len)
  2217. tmp[k].len = _new[j].start+_new[j].len-tmp[k].start;
  2218. j++;
  2219. } else
  2220. break;
  2221. }
  2222. k++;
  2223. }
  2224. if (i<total)
  2225. {
  2226. memcpy (tmp+k, head+i, (total-i)*sizeof(struct Interval));
  2227. k += (total-i);
  2228. } else if (j<num)
  2229. {
  2230. memcpy (tmp+k, _new+j, (num-j)*sizeof(struct Interval));
  2231. k += (num-j);
  2232. }
  2233. if (k > MAX_INTERVAL) k = MAX_INTERVAL;
  2234. if (k>0) memcpy (head, tmp, k*sizeof(struct Interval));
  2235. return k;
  2236. }
  2237. int check_valid (struct Edge *e, int play)
  2238. {
  2239. struct Interval tmp;
  2240. struct Interval *i = e->inter, *result;
  2241. int num = e->numinter;
  2242. if (num <= 0) return 0;
  2243. if (i[0].start > play || i[num-1].start+i[num-1].len <= play) return 0;
  2244. tmp.start = play;
  2245. tmp.len = 1;
  2246. if ((result = (struct Interval*)bsearch ((void*)&tmp, i, num, sizeof (struct Interval), compareInter)) == NULL)
  2247. return 0;
  2248. return 1;
  2249. }
  2250. int logto_xml (long channelcount, unsigned int totalclient, long long totalstay)
  2251. {
  2252. FILE *logf = fopen (LOGXML, "w");
  2253. if(!logf)
  2254. {
  2255. PDEBUG("Couldn't open log xml!.n");
  2256. return -1;
  2257. }
  2258. fprintf(logf, "<?xml version="1.0" encoding="iso-8859-1"?>n");
  2259. fprintf(logf, "<TS>n");
  2260. fprintf(logf, "<ElapsedTime>%ld</ElapsedTime>n", CurTimeSec - startTime);
  2261. fprintf(logf, "<TotalLogin>%lld</TotalLogin>n", np2tsLoginCount);
  2262. fprintf(logf, "<TotalMsg>%lld</TotalMsg>n", np2tsLoginCount + np2tsResListCount + np2tsReqResCount + np2tsDelResCount + np2tsReportCount + np2tsNeedPeerCount + np2tsLogoutCount);
  2263. fprintf(logf, "<ActiveChannel>%ld</ActiveChannel>n", channelcount);
  2264. fprintf(logf, "<OnlineUser>%d</OnlineUser>n", totalclient);
  2265. fprintf(logf, "<AvgTime>%.1f</AvgTime>n", totalclient==0?0:(totalstay/((double)totalclient)));
  2266. fprintf(logf, "</TS>n");
  2267. fclose(logf);
  2268. return 0;
  2269. }