sessions.c
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:14k
源码类别:

P2P编程

开发平台:

Visual C++

  1. /*
  2.  *  Openmysee
  3.  *
  4.  *  This program is free software; you can redistribute it and/or modify
  5.  *  it under the terms of the GNU General Public License as published by
  6.  *  the Free Software Foundation; either version 2 of the License, or
  7.  *  (at your option) any later version.
  8.  *
  9.  *  This program is distributed in the hope that it will be useful,
  10.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12.  *  GNU General Public License for more details.
  13.  *
  14.  *  You should have received a copy of the GNU General Public License
  15.  *  along with this program; if not, write to the Free Software
  16.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  17.  *
  18.  */
  19.  
  20. #define FENCE_DATA 1024
  21. int IN_LOOP = 1;
  22. struct cachetype *BufferCacheHead;
  23. inline char *NEW ()
  24. {
  25. char *result;
  26. if (BufferCacheHead != NULL)
  27. {
  28. result = (char *) BufferCacheHead;
  29. BufferCacheHead = BufferCacheHead->next;
  30. } else
  31. {
  32. result = calloc (1, MAX_DATA+FENCE_DATA);
  33. }
  34. assert (result != NULL);
  35. return result;
  36. }
  37. inline void FREE (void *p)
  38. {
  39. if (p == NULL) return;
  40. ((struct cachetype *)p)->next = BufferCacheHead;
  41. BufferCacheHead = p;
  42. }
  43. inline void closure_cache ()
  44. {
  45. struct cachetype *p, *nextp;
  46. for (p=BufferCacheHead; p; p=nextp)
  47. {
  48. nextp = p->next;
  49. free (p);
  50. }
  51. BufferCacheHead = NULL;
  52. }
  53. void terminate (int sig)
  54. {
  55. IN_LOOP --;
  56. }
  57. int handle_new_connection(int sock, int type)
  58. {
  59. struct sockaddr_in addr;
  60. socklen_t addrlen = sizeof (struct sockaddr_in);
  61. int listnum, max, flags;
  62. struct Session *head;
  63. #ifdef SO_LINGER
  64. struct linger ling;
  65. #endif
  66. int newconn = accept(sock, (struct sockaddr *)(&addr), &addrlen);
  67. int keepalive = 1;
  68. if (newconn < 0)
  69. {
  70. perror("accept");
  71. return -1;
  72. }
  73. #ifdef SO_LINGER
  74. ling.l_onoff = 1;
  75. ling.l_linger = 0;
  76. setsockopt (newconn, SOL_SOCKET, SO_LINGER, &ling, sizeof (struct linger));
  77. #endif
  78. setsockopt (newconn, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof (keepalive));
  79. flags = fcntl(newconn, F_GETFL, 0);
  80. flags |= O_NONBLOCK;
  81. fcntl(newconn, F_SETFL, flags);
  82. if (TRACKER[type].cur == TRACKER[type].max)
  83. {
  84. char *buf, buffer[MAX_DATA];
  85. buf = buffer+sizeof (int);
  86. *(unsigned char *)buf = P2P_MSG;
  87. buf += sizeof (char);
  88. *(unsigned short *)buf = ERR_CONNECTION_FULL;
  89. buf += sizeof (short);
  90. *(unsigned char *)buf = 1;
  91. buf += sizeof (char);
  92. *(unsigned int *)buffer = buf - buffer;
  93. write (newconn, buffer, *(unsigned int *)buffer);
  94. close (newconn);
  95. return -1;
  96. }
  97. head = TRACKER[type].head;
  98. max = TRACKER[type].max;
  99. for (listnum = 0; listnum < max; listnum ++)
  100. {
  101. if(head[listnum].socket == 0)
  102. {
  103. head[listnum].socket = newconn;
  104. head[listnum].type = type;
  105. getpeername (newconn, (struct sockaddr *)(&addr), &addrlen);
  106. head[listnum].host = ntohl(addr.sin_addr.s_addr);
  107. head[listnum].port = ntohs(addr.sin_port);
  108. head[listnum].buf = NEW ();
  109. #ifdef __CP_SOURCE
  110. head[listnum].totalup = 0;
  111. #endif
  112. head[listnum].time_sec = CurrentTime;
  113. head[listnum].last_transferblock = CurrentTime;
  114. FD_SET(newconn, &osocks);
  115. if (listnum > TRACKER[type].maxid)
  116. TRACKER[type].maxid = listnum;
  117. break;
  118. }
  119. }
  120. if (listnum >= max)
  121. {
  122. PDEBUG ("no space left for incoming client type %d.", type);
  123. close (newconn);
  124. return -1;
  125. }
  126. TRACKER[type].cur ++;
  127. return (*(TRACKER[type].init)) (listnum);
  128. }
  129. int Clientclosure (int listnum, int type)
  130. {
  131. (*(TRACKER[type].closure)) (listnum);
  132. while (TRACKER[type].maxid == listnum && TRACKER[type].head[listnum].socket == 0 && listnum > 0)
  133. {
  134. listnum --;
  135. TRACKER[type].maxid --;
  136. }
  137. return (--TRACKER[type].cur);
  138. }
  139. void my_exit() __attribute__((noreturn, destructor));
  140. #ifdef __SP_SOURCE
  141. extern struct Channel *ProgramHash[MAX_CHANNEL];
  142. #endif
  143. extern void freeJobCache ();
  144. void my_exit (int err)
  145. {
  146. int max, listnum, type;
  147. struct Session *head;
  148. int (*closure) (int);
  149. for (type=MAX_TYPE-1; type >=0; type --)
  150. {
  151. max = TRACKER[type].maxid+1;
  152. closure = TRACKER[type].closure;
  153. if ((head = TRACKER[type].head) != NULL)
  154. {
  155. for (listnum=0; listnum<max; listnum++)
  156. {
  157. if (head[listnum].socket > 0)
  158. (*closure) (listnum);
  159. }
  160. free (head);
  161. }
  162. #ifdef __CP_SOURCE
  163. if (TRACKER[type].flag == FLAG_SERVER) close (TRACKER[type].sock);
  164. #endif
  165. #ifdef __SP_SOURCE
  166. close (TRACKER[type].sock);
  167. #endif
  168. }
  169. #ifdef __CP_SOURCE
  170. #ifdef HAVE_TS
  171. closure_TS ();
  172. close (TSSOCK);
  173. #endif
  174. #endif
  175. PDEBUG ("exit...n");
  176. freeAllChannel ();
  177. #ifdef __SP_SOURCE
  178. db_end ();
  179. timer_free ();
  180. freeAllProgram ();
  181. freeAllOrder ();
  182. #endif
  183. free_config (ConfigParameters, sizeof(ConfigParameters)/sizeof(struct NamVal));
  184. closure_cache ();
  185. freeJobCache ();
  186. unlink (PIDFile);
  187. CLOSELOG;
  188. exit (err);
  189. }
  190. /* check the message in p->buf, wheter is complete */
  191. inline int checkComplete (struct Session *p)
  192. {
  193. int len;
  194. if (p->socket == 0 || p->off < sizeof (int)+sizeof(char)) return 0;
  195. len = *(unsigned int *)(p->buf+p->start);
  196. if (len >= MAX_BLOCK_SIZE) return -1;
  197. return (p->off >= len?len:0);
  198. }
  199. inline void my_memmove (char *dst, char *src, int len)
  200. {
  201. int i;
  202. if (len <= 0 || dst == src) return;
  203. for (i=0; i<len; i++)
  204. *dst++ = *src++;
  205. }
  206. /* after process a message, update the buf position in p */
  207. inline int updateBuf (struct Session *p, int len)
  208. {
  209. if (p->socket == 0) return 0;
  210. p->start += len;
  211. p->off -= len;
  212. return 0;
  213. }
  214. void process_type (int type, fd_set *socks, fd_set *wsocks, fd_set *esocks)
  215. {
  216. struct ServerDesc *ser = &(TRACKER[type]);
  217. struct Session *p;
  218. int (*process)(int listnum);
  219. int listnum, max, this_read, this_write;
  220. max = ser->maxid + 1;
  221. process = ser->process; //the pointer of function process_P2PC P2PS
  222. for (listnum = 0; listnum < max; listnum++)
  223. {
  224. p = &(ser->head[listnum]);
  225. #ifdef __CP_SOURCE
  226. if (p->flag > 0 && (CurrentTime - p->time_sec) > MAX_CONN)
  227. {
  228. if (reconnect (p) < 0)
  229. {
  230. PDEBUG("Reconnect Error.n");
  231. Clientclosure (listnum, type);
  232. }
  233. continue;
  234. }
  235. #endif
  236. if (p->socket != 0 && FD_ISSET (p->socket, wsocks))
  237. {
  238. #ifdef __CP_SOURCE
  239. struct sockaddr_in addr;
  240. socklen_t addrlen = sizeof (struct sockaddr_in);
  241. if (p->flag != 0)
  242. {
  243. p->flag = 0;
  244. getpeername (p->socket, (struct sockaddr *)(&addr), &addrlen);
  245. p->port = ntohs(addr.sin_port);
  246. /* restore file status flags */
  247. fcntl(p->socket, F_SETFL, p->sock_flag);
  248. }
  249. #endif
  250. if ((this_write = processJobs (p)) < 0)
  251. {
  252. perror ("PP: Write");
  253. Clientclosure (listnum, type);
  254. } else
  255. {
  256. // PDEBUG ("Send %d to %dn", this_write, listnum);
  257. tmpUpBytes += this_write;
  258. #ifdef __CP_SOURCE
  259. p->totalup += this_write;
  260. #endif
  261. }
  262. // continue;
  263. }
  264. if (p->socket != 0 && FD_ISSET (p->socket, socks))
  265. {
  266. if (p->start + p->off  >= MAX_DATA-1)
  267. {
  268. my_memmove (p->buf, p->buf+p->start, p->off);
  269. p->start = 0;
  270. } else if (p->off == 0)
  271. p->start = 0;
  272. if ((this_read = read(p->socket, p->buf+p->start+p->off, MAX_DATA -p->start- p->off)) <= 0)
  273. {
  274. perror("PP: read err");
  275. PDEBUG("socket: %d, p->start: %d. p->off: %d. n", p->socket, p->start, p->off);
  276. Clientclosure (listnum, type);
  277. } else
  278. {
  279. p->off += this_read;
  280. }
  281. }
  282. while (p->socket != 0 && (this_read = checkComplete (p)) > 0)
  283. {
  284. if ((*process) (listnum) == -2) break;
  285. updateBuf (p, this_read);
  286. }
  287. if (p->socket != 0)
  288. {
  289. if (this_read < 0 || this_read > MAX_DATA)
  290. {
  291. PDEBUG ("Too long message, cut off, length is %d, and p->off is %d, p->start is %d.n", this_read, p->off, p->start);
  292. Clientclosure (listnum, type);
  293. }
  294. }
  295. }
  296. }
  297. void process_child (void)
  298. {
  299. int readsocks; //?
  300. struct Session *head;
  301. int highsock;
  302. fd_set socks, wsocks, esocks;
  303. int type, listnum, max;
  304. struct timeval tm;
  305. #ifdef __CP_SOURCE
  306. time_t last_handle_conn = 0;
  307. #endif
  308. startTime = time(NULL);
  309. while (IN_LOOP > 0)
  310. {
  311. FD_ZERO(&socks);
  312. FD_ZERO(&esocks);
  313. FD_ZERO(&wsocks);
  314. #ifdef __CP_SOURCE
  315. #ifdef HAVE_TS
  316. highsock = TSSOCK;
  317. FD_SET(TSSOCK, &socks);
  318. #else
  319. highsock = 0;
  320. #endif
  321. #endif
  322. CurrentTime = time (NULL);
  323. #ifdef __SP_SOURCE
  324. highsock = 0;
  325. for(type=0; type<MAX_TS; type++)
  326. {
  327. if (tsSock[type] <= 0) continue;
  328. if (tsSock[type] > highsock)
  329. highsock = tsSock[type];
  330. FD_SET(tsSock[type], &socks);
  331. }
  332. #endif
  333. for (type=0; type<MAX_TYPE; type++)//type is P2PC and P2PS 
  334. {
  335. #ifdef __CP_SOURCE
  336. if (TRACKER[type].flag == FLAG_SERVER && highsock < TRACKER[type].sock)
  337. highsock = TRACKER[type].sock;
  338. if(CurrentTime - last_handle_conn >= 1) 
  339. {
  340. last_handle_conn = CurrentTime;
  341. }
  342. if (TRACKER[type].flag == FLAG_SERVER) FD_SET(TRACKER[type].sock, &socks);
  343. #endif
  344. #ifdef __SP_SOURCE
  345. if (highsock < TRACKER[type].sock)
  346. highsock = TRACKER[type].sock;
  347. FD_SET(TRACKER[type].sock, &socks);
  348. #endif
  349. max = TRACKER[type].maxid + 1;
  350. head = TRACKER[type].head;
  351. for (listnum = 0; listnum < max; listnum++)
  352. {
  353. if (head[listnum].socket != 0)
  354. {
  355. if (head[listnum].head)
  356. FD_SET(head[listnum].socket, &wsocks);
  357. else
  358. FD_SET(head[listnum].socket, &socks);
  359. if (head[listnum].socket > highsock)
  360. highsock = head[listnum].socket;
  361. }
  362. }
  363. }
  364. tm.tv_sec = 0;
  365. tm.tv_usec = 10000;
  366. readsocks = select(highsock+1, &socks, &wsocks, &esocks, &tm);
  367. if (readsocks <= 0)
  368. goto NEXTROUND;
  369. #ifdef __CP_SOURCE
  370. #ifdef HAVE_TS
  371. if (FD_ISSET(TSSOCK, &socks))
  372. process_TS ();
  373. if (FD_ISSET(TSSOCK, &esocks))
  374. {
  375. PDEBUG ("exit...n");
  376. exit (errno);
  377. }
  378. #endif
  379. #endif
  380. #ifdef __SP_SOURCE
  381. for(type=0; type < MAX_TS; ++type)
  382. {
  383. if(tsSock[type] > 0 && FD_ISSET(tsSock[type], &socks))
  384. process_TS2RM(type);    //
  385. }
  386. #endif
  387. for (type=0; type<MAX_TYPE; type++)
  388. {
  389. #ifdef __CP_SOURCE
  390. if (TRACKER[type].flag == FLAG_SERVER && FD_ISSET(TRACKER[type].sock, &socks))
  391. #endif
  392. #ifdef __SP_SOURCE
  393. if (FD_ISSET(TRACKER[type].sock, &socks))
  394. #endif
  395. handle_new_connection (TRACKER[type].sock, type);
  396. #ifdef __CP_SOURCE
  397. if (TRACKER[type].flag == FLAG_SERVER && FD_ISSET(TRACKER[type].sock, &esocks))
  398. #endif
  399. #ifdef __SP_SOURCE
  400. if (FD_ISSET(TRACKER[type].sock, &esocks))
  401. #endif
  402. {
  403. PDEBUG ("exit...n");
  404. exit (errno);
  405. }
  406. process_type (type, &socks, &wsocks, &esocks);
  407. }
  408. NEXTROUND:
  409. period_process ();
  410. }
  411. }
  412. /* assume the message has been transfered into p->buf */
  413. inline void writeDATAMessage (struct Session *p, struct Channel *pc, struct JobDes *ptr)
  414. {
  415. ptr->len += (*(int *)(ptr->buffer));
  416. addJob (p, pc, ptr);
  417. }
  418. int writeMessage (struct Session *p, struct Channel *pc, char *ptr)
  419. {
  420. struct JobDes *pj;
  421. char *buffer;
  422. int max = 0;
  423. int new = 0;
  424. int len = *(int *)ptr;
  425. if ((pj = findEnoughBuffer (p, pc, len)) == NULL)
  426. {
  427. new = 1;
  428. if ((pj = newJob ()) == NULL)
  429. return -1;
  430. }
  431. buffer = getJobBuffer (pj, &max);
  432. memcpy (buffer, ptr, len);
  433. pj->len += len;
  434. if (new)
  435. addJob (p, pc, pj);
  436. return 0;
  437. }
  438. struct Edge *newEdge (struct Channel *head, struct Session *me)
  439. {
  440. struct Edge *result = malloc (sizeof (struct Edge));
  441. result->head = head;
  442. result->me = me;
  443. result->cnext = head->PeerHead;
  444. head->PeerHead = result;
  445. result->enext = me->header;
  446. me->header = result;
  447. return result;
  448. }
  449. int delEdge (struct Edge *e)
  450. {
  451. struct Channel *pchannel=e->head;
  452. struct Session *psession=e->me;
  453. struct Edge *pedge;
  454. if (pchannel)
  455. {
  456. if (pchannel->PeerHead == e) pchannel->PeerHead = e->cnext;
  457. else
  458. {
  459. for (pedge=pchannel->PeerHead; pedge && pedge->cnext && pedge->cnext != e; pedge = pedge->cnext);
  460. if (pedge && pedge->cnext)
  461. pedge->cnext = e->cnext;
  462. }
  463. }
  464. if (psession)
  465. {
  466. if (psession->header == e) psession->header = e->enext;
  467. else
  468. {
  469. for (pedge=psession->header; pedge && pedge->enext && pedge->enext != e; pedge = pedge->enext);
  470. if (pedge && pedge->enext)
  471. pedge->enext = e->enext;
  472. }
  473. }
  474. if (psession->pc == pchannel) psession->pc = NULL;
  475. free (e);
  476. return 0;
  477. }
  478. void apply_session (struct Session *head, int size, void apply(struct Session *, void *), void *p)
  479. {
  480. int i;
  481. for (i = 0; i < size; i++)
  482. {
  483. if (head[i].socket > 0) apply (&(head[i]), p);
  484. }
  485. }
  486. int logto_xml (unsigned int time_interval, unsigned int tmpTime, unsigned int channelcount, unsigned int totalclient)
  487. {
  488. FILE *logf;
  489. if (LOGXML == NULL || LOGXML[0] == 0 || (logf = fopen(LOGXML,"w")) == NULL)
  490. {
  491. PDEBUG("Couldn't open xml log %s!.n", LOGXML);
  492. return -1;
  493. }
  494. fprintf(logf, "<?xml version="1.0" encoding="iso-8859-1"?>n");
  495. #ifdef __SP_SOURCE
  496. fprintf(logf, "<SP>n");
  497. #endif
  498. #ifdef __CP_SOURCE
  499. fprintf(logf, "<CP>n");
  500. #endif
  501. fprintf(logf, "<ElapsedTime>%ld</ElapsedTime>n", CurrentTime-startTime);
  502. fprintf(logf, "<CurrentIncoming>%.4f</CurrentIncoming>n", ((float)tmpDownBytes)/1024/time_interval);
  503. fprintf(logf, "<CurrentOutgoing>%.4f</CurrentOutgoing>n", ((float)tmpUpBytes)/1024/time_interval);
  504. fprintf(logf, "<AverageIncoming>%.4f</AverageIncoming>n", ((float)totalDownBytes)/1024/tmpTime);
  505. fprintf(logf, "<AverageOutgoing>%.4f</AverageOutgoing>n", ((float)totalUpBytes)/1024/tmpTime);
  506. fprintf(logf, "<ActiveChannel>%d</ActiveChannel>n", channelcount);
  507. fprintf(logf, "<OnlineUser>%d</OnlineUser>n", totalclient);
  508. fprintf(logf, "<TotalIncoming>%lld</TotalIncoming>n", totalDownBytes);
  509. fprintf(logf, "<TotalOutgoing>%lld</TotalOutgoing>n", totalUpBytes);
  510. #ifdef __SP_SOURCE
  511. fprintf(logf, "</SP>n");
  512. #endif
  513. #ifdef __SP_SOURCE
  514. fprintf(logf, "</CP>n");
  515. #endif
  516.         fclose(logf);
  517. return 0;
  518. }