CPtest.c
上传用户:liguizhu
上传日期:2015-11-01
资源大小:2422k
文件大小:12k
源码类别:

P2P编程

开发平台:

Visual C++

  1. /*
  2.  *  Openmysee
  3.  *
  4.  *  This program is free software; you can redistribute it and/or modify
  5.  *  it under the terms of the GNU General Public License as published by
  6.  *  the Free Software Foundation; either version 2 of the License, or
  7.  *  (at your option) any later version.
  8.  *
  9.  *  This program is distributed in the hope that it will be useful,
  10.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  12.  *  GNU General Public License for more details.
  13.  *
  14.  *  You should have received a copy of the GNU General Public License
  15.  *  along with this program; if not, write to the Free Software
  16.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  17.  *
  18.  */
  19.  
  20. //#define HAVE_MYSQL 1
  21. #include <stdio.h>
  22. #include <stdlib.h>
  23. #include <stdarg.h>
  24. #include <ctype.h>
  25. #include <sys/types.h>
  26. #include <sys/time.h>
  27. #include <sys/stat.h>
  28. #include <sys/wait.h>
  29. #include <sys/socket.h>
  30. #include <netinet/in.h>
  31. #include <arpa/inet.h>
  32. #include <netdb.h>
  33. #include <unistd.h>
  34. #include <fcntl.h>
  35. #include <errno.h>
  36. #include <assert.h>
  37. #include <syslog.h>
  38. #include <string.h>
  39. #include <time.h>
  40. #include "ProtocolDefine.h"
  41. #include "StructDefine.h"
  42. #define MAX_DATA 100000
  43. #define MAX_LINE 1024
  44. #define MAX_IDLE 300
  45. #define SILENCE_TIME 2
  46. #define SP4CP_PORT 50001
  47. int DELAY=0;
  48. int timer=1;
  49. float PROB=1.0;
  50. struct Message
  51. {
  52. int len;
  53. unsigned char type;
  54. char buffer[MAX_DATA];
  55. } UDPMsg;
  56. unsigned short P2P_PORT=23;
  57. char *CPSERVER="166.111.118.6";
  58. unsigned long long TOTALDATA;
  59. unsigned long long TOTALDATASIZE;
  60. unsigned long long TOTALR;
  61. unsigned long long TOTALW;
  62. unsigned long long BYTES_READ;
  63. unsigned long long BYTES_WRITE;
  64. int MaxClient = 50;
  65. int Verbosity;
  66. int Current;
  67. int process (char *md5, int latest);
  68. void settimer (int sig)
  69. {
  70. timer = 0;
  71. }
  72. struct hostent * init_sockaddr (struct sockaddr_in *name, char *host, unsigned int port)
  73. {
  74. struct hostent *h;
  75. unsigned short sp = port;
  76. memset (name, 0, sizeof (*name));
  77. name->sin_family = PF_INET;
  78. name->sin_port = htons (sp);
  79. h = gethostbyname (host);
  80. if (h == (struct hostent *)0)
  81. {
  82. perror ("gethostbyname");
  83. return (struct hostent *)0;
  84. }
  85. name->sin_addr = *(struct in_addr *)h->h_addr;
  86. return h;
  87. }
  88. int my_connect (char *host, int port)
  89. {
  90. struct sockaddr_in client;
  91. struct hostent *h;
  92. int connection = socket (PF_INET, SOCK_STREAM, 0);
  93. if ((connection < 0) || ((h = init_sockaddr (&client, host, port)) == (struct hostent *)0))
  94. {
  95. perror ("socket||gethostbyname");
  96. return -1;
  97. }
  98. if (connect (connection, (struct sockaddr *) &client, sizeof (client)) < 0)
  99. {
  100. perror ("connect");
  101. return -1;
  102. }
  103. return connection;
  104. }
  105. int maxID=11286, maxCurrentID;
  106. unsigned int *maxBuf;
  107. unsigned int *fdBuf;
  108. int main(int argc, char **argv)
  109. {
  110. char *md5="13e7f8397575944c2ca1cb113531d758";
  111. int c;
  112. signal (SIGINT, settimer);
  113. while ((c = getopt (argc, argv, "vD:s:m:hc:T:t:p:")) != -1)
  114. {
  115. switch (c)
  116. {
  117. case 'D':
  118. DELAY = atoi (optarg);
  119. break;
  120. case 'v':
  121. Verbosity = 1;
  122. break;
  123. case 's': // channel id of start
  124. md5 = optarg;
  125. break;
  126. case 'm':
  127. maxID = atoi(optarg);
  128. break;
  129. case 'h':
  130. fprintf (stdout, "%s [-T timer_to_leave] [-D delay] [-s channelid] [-h] [-c numofclient] [-t spserver_ip] [-p port] [-m maxid] [-v]n", argv[0]);
  131. exit (0);
  132. case 'c':
  133. MaxClient = atoi (optarg);
  134. break;
  135. case 'T':
  136. alarm (atoi (optarg));
  137. signal (SIGALRM, settimer);
  138. break;
  139. case 't':
  140. CPSERVER = optarg;
  141. break;
  142. case 'p':
  143. P2P_PORT = atoi (optarg);
  144. break;
  145. default:
  146. fprintf (stdout, "%s [-T timer_to_leave] [-D delay] [-s channelid] [-h] [-c numofclient] [-t spserver_ip] [-p port] [-m maxid] [-v]n", argv[0]);
  147. exit (0);
  148. }
  149. }
  150. if (maxID > 0)
  151. {
  152. maxBuf = calloc (maxID, sizeof (int));
  153. fdBuf = calloc (maxID, sizeof (int));
  154. }
  155. fprintf (stdout, "maxID is %dn", maxID);
  156. signal (SIGPIPE, SIG_IGN);
  157. process (md5, 0);
  158. return 0;
  159. }
  160. int writeMessage (int sock, char *ptr)
  161. {
  162. int len = *(int *)ptr;
  163. if (write (sock, ptr, len) != len)
  164. return -1;
  165. TOTALW ++;
  166. BYTES_WRITE += len;
  167. return len;
  168. }
  169. struct sockInfo
  170. {
  171. int sockfd;
  172. int reged;
  173. int current;
  174. unsigned int delay;
  175. int status;
  176. int start;
  177. int len;
  178. char buffer[MAX_DATA];
  179. };
  180. void process_reader (struct sockInfo *sockpool, int i, int *isfull, fd_set *osock)
  181. {
  182. struct timeval tm;
  183. struct SPUpdate *s;
  184. struct Message *msg;
  185. int nlen, start, offset;
  186. nlen = read (sockpool[i].sockfd, sockpool[i].buffer+sockpool[i].start+sockpool[i].len, MAX_DATA - sockpool[i].len - sockpool[i].start);
  187. if (nlen < 0)
  188. {
  189. fprintf (stderr, "Error in recv from %d:%dn", i, sockpool[i].sockfd);
  190. FD_CLR (sockpool[i].sockfd, osock);
  191. close (sockpool[i].sockfd);
  192. sockpool[i].reged = -1;
  193. return;
  194. } else if (nlen == 0)
  195. {
  196. fprintf (stderr, "SP closed %d:%dn", i, sockpool[i].sockfd);
  197. FD_CLR (sockpool[i].sockfd, osock);
  198. close (sockpool[i].sockfd);
  199. sockpool[i].reged = -1;
  200. return;
  201. }
  202. BYTES_READ += nlen;
  203. sockpool[i].len += nlen;
  204. while (sockpool[i].len > 0)
  205. {
  206. msg = (struct Message *)(sockpool[i].buffer + sockpool[i].start);
  207. if (sockpool[i].len < sizeof (int) || sockpool[i].len < msg->len)
  208. break;
  209. switch (msg->type)
  210. {
  211. case P2P_RESPONSE:
  212. // sockpool[i].status = 0;
  213. if (*(int *)(msg->buffer+sizeof(int)) > 0)
  214. {
  215. TOTALDATA ++;
  216. TOTALDATASIZE += msg->len;
  217. sockpool[i].delay = DELAY;
  218. if (maxBuf)
  219. {
  220. maxBuf[*(int *)(msg->buffer)] = 2;
  221. fdBuf[*(int *)(msg->buffer)] = i;
  222. }
  223. } else
  224. {
  225. if (maxBuf)
  226. maxBuf[*(int *)(msg->buffer)] = 0;
  227. if (DELAY > 0)
  228. {
  229. tm.tv_sec = 0;
  230. tm.tv_usec = sockpool[i].delay;
  231. sockpool[i].delay = sockpool[i].delay * 2;
  232. select (1,NULL, NULL, NULL, &tm);
  233. }
  234. }
  235. break;
  236. case P2P_SPUPDATE:
  237. TOTALR ++;
  238. s = (struct SPUpdate *)(msg->buffer);
  239. if (maxID == 0)
  240. {
  241. maxCurrentID = s->maxBlockID;
  242. // maxBuf = calloc (maxID, sizeof (int));
  243. }
  244. break;
  245. }
  246. sockpool[i].len -= msg->len;
  247. sockpool[i].start += msg->len;
  248. }
  249. if (sockpool[i].len == 0) sockpool[i].start = 0;
  250. else if (sockpool[i].start > 0 && sockpool[i].start +sockpool[i].len > MAX_DATA/2)
  251. {
  252. for (start = 0, offset=sockpool[i].start; start<sockpool[i].len; start++, offset++)
  253. sockpool[i].buffer[start] = sockpool[i].buffer[offset];
  254. sockpool[i].start = 0;
  255. }
  256. }
  257. #define MAX_PUSH 5
  258. void process_writer (struct sockInfo *sockpool, int i, int *isfull, fd_set *osock)
  259. {
  260. int count;
  261. int blocks[MAX_PUSH];
  262. int numb = 0;
  263. int cur;
  264. char *buf;
  265. // if (sockpool[i].status != 0) return;
  266. if (maxID>0)
  267. {
  268. *isfull = 1;
  269. for (count=0,cur=Current; count<maxID && numb < MAX_PUSH; cur++, count++)
  270. {
  271. if (cur >= maxID) cur = 0;
  272. switch (maxBuf[cur])
  273. {
  274. case 0:
  275. blocks[numb] = cur;
  276. numb ++;
  277. break;
  278. case 1:
  279. *isfull = 0;
  280. break;
  281. }
  282. }
  283. if (count >= maxID)
  284. {
  285. // if (TOTALDATA == 9960) fprintf (stdout, "No request nown");
  286. return;
  287. }
  288. } else
  289. {
  290. if (sockpool[i].current + 1 <= maxCurrentID)
  291. cur=sockpool[i].current + 1;
  292. else return;
  293. }
  294. // sockpool[i].status = 1;
  295. Current = cur;
  296. // fprintf (stdout, "Request %d %d blockn", maxID, cur);
  297. UDPMsg.type = P2P_PUSHLIST;
  298. buf = UDPMsg.buffer;
  299. *(char *)buf = 0;
  300. buf += sizeof (char);
  301. if (maxID > 0)
  302. {
  303. *(char *)buf = numb;
  304. buf += sizeof (char);
  305. for (cur=0; cur<numb; cur++)
  306. {
  307. *(int *)buf = blocks[cur];
  308. buf += sizeof (int);
  309. maxBuf[blocks[cur]] = 1;
  310. fdBuf[blocks[cur]] = i;
  311. }
  312. } else
  313. {
  314. *(char *)buf = 1;
  315. buf += sizeof (char);
  316. *(int *)buf = cur;
  317. buf += sizeof (int);
  318. }
  319. *(char *)buf = 0;
  320. buf += sizeof (char);
  321. UDPMsg.len = buf - UDPMsg.buffer + sizeof (int)+sizeof(char);
  322. if (writeMessage (sockpool[i].sockfd, (char *)&UDPMsg) < 0)
  323. {
  324. fprintf (stdout, "Cannot write to sock %d, block id is %dn", sockpool[i].sockfd, cur);
  325. sockpool[i].reged = -1;
  326. }
  327. sockpool[i].current = cur;
  328. *isfull = 0;
  329. }
  330. int process (char *md5, int latest)
  331. {
  332. struct timeval tmnew;
  333. fd_set wsock, rsock, osock;
  334. int nread, i, maxsock = 0;
  335. struct sockInfo *sockpool = calloc (MaxClient, sizeof (struct sockInfo));
  336. char *buf;
  337. int cur;
  338. int isfull = 0;
  339. long long begin, now;
  340. double elapsed=0;
  341. struct NormalAddress SPADDR;
  342. struct PeerInfoWithAddr myaddr;
  343. SPADDR.sin_family = AF_INET;
  344. SPADDR.sin_port = htons (SP4CP_PORT);
  345. SPADDR.sin_addr.s_addr = inet_addr (CPSERVER);
  346. memset (&myaddr, 0, sizeof (myaddr));
  347. myaddr.b.outerIP.sin_family = AF_INET;
  348. myaddr.b.outerIP.sin_port = htons (SP4CP_PORT);
  349. myaddr.b.outerIP.sin_addr.s_addr = inet_addr ("166.111.215.87");
  350. FD_ZERO (&osock);
  351. for (i=0; i<MaxClient; i++)
  352. {
  353. if ((sockpool[i].sockfd = my_connect (CPSERVER, P2P_PORT)) < 0)
  354. {
  355. fprintf (stderr, "Error in my_connect %d.n", i);
  356. exit (1);
  357. }
  358. if (sockpool[i].sockfd > maxsock)
  359. maxsock = sockpool[i].sockfd;
  360. sockpool[i].reged = -1;
  361. memset (&UDPMsg, 0, sizeof (struct Message));
  362. UDPMsg.type = P2P_HELLO;
  363. buf = UDPMsg.buffer + sizeof (float);
  364. memcpy (buf, md5, MD5_LEN);
  365. buf += MD5_LEN;
  366. *(unsigned char *)buf = 0;
  367. buf += sizeof (char);
  368. memcpy (buf, &myaddr, sizeof (myaddr));
  369. buf += sizeof (myaddr);
  370. *(unsigned char *)buf = 1;
  371. buf += sizeof (char);
  372. memcpy (buf, &SPADDR, sizeof (struct NormalAddress));
  373. buf += sizeof (SPADDR);
  374. UDPMsg.len = buf - UDPMsg.buffer + sizeof (int) + sizeof(char);
  375. if (writeMessage (sockpool[i].sockfd, (char *)&UDPMsg) >= 0)
  376. {
  377. sockpool[i].current = -1;
  378. sockpool[i].start = 0;
  379. sockpool[i].len = 0;
  380. sockpool[i].reged = 0;
  381. } else
  382. {
  383. fprintf (stdout, "Cannot write register info to sock %d", sockpool[i].sockfd);
  384. }
  385. }
  386. gettimeofday (&tmnew, NULL);
  387. begin = ((long long)tmnew.tv_sec)*1000000l + tmnew.tv_usec;
  388. while (isfull == 0 && timer == 1)
  389. {
  390. FD_ZERO (&rsock);
  391. FD_ZERO (&wsock);
  392. for (i=0; i<MaxClient; i++)
  393. {
  394. if (sockpool[i].reged >= 0)
  395. {
  396. FD_SET (sockpool[i].sockfd, &rsock);
  397. // if (sockpool[i].status == 0)
  398. FD_SET (sockpool[i].sockfd, &wsock);
  399. }
  400. }
  401. if ((nread = select (maxsock+1, &rsock, &wsock, NULL, NULL)) <= 0)
  402. continue;
  403. for (i=0; i<MaxClient; i++)
  404. {
  405. if (FD_ISSET (sockpool[i].sockfd, &rsock))
  406. process_reader (sockpool, i, &isfull, &osock);
  407. if (sockpool[i].reged >= 0 && sockpool[i].sockfd > 0 && FD_ISSET (sockpool[i].sockfd, &wsock))
  408. process_writer (sockpool, i, &isfull, &osock);
  409. }
  410. if (Verbosity)
  411. {
  412. gettimeofday (&tmnew, NULL);
  413. now = ((long long)tmnew.tv_sec) * 1000000l + tmnew.tv_usec;
  414. if (now-begin > elapsed + 2000000l)
  415. {
  416. elapsed = now - begin;
  417. fprintf (stdout, "%lld: total %lld write, %lld read, %lld bytes write, %lld bytes read, %lld blocks and %lld data.n", now-begin, TOTALW, TOTALR, BYTES_WRITE, BYTES_READ, TOTALDATA, TOTALDATASIZE);
  418. fprintf (stdout, "avg read packets/s: %f; avg write packets/s: %fnavg read Mb/s: %f; avg write Mb/s %fntotal packets/s: %f; total Mb/s %f.n", TOTALR/(elapsed/1000000), TOTALW/(elapsed/1000000), BYTES_READ*8/elapsed, BYTES_WRITE*8/elapsed, (TOTALR+TOTALW)/(elapsed/1000000), (BYTES_READ+BYTES_WRITE)*8/elapsed);
  419. }
  420. }
  421. }
  422. gettimeofday (&tmnew, NULL);
  423. now = ((long long)tmnew.tv_sec) * 1000000l + tmnew.tv_usec;
  424. elapsed = now - begin;
  425. fprintf (stdout, "%lld: total %lld write, %lld read, %lld bytes write, %lld bytes read, %lld blocks and %lld data.n", now-begin, TOTALW, TOTALR, BYTES_WRITE, BYTES_READ, TOTALDATA, TOTALDATASIZE);
  426. fprintf (stdout, "avg read packets/s: %f; avg write packets/s: %fnavg read Mb/s: %f; avg write Mb/s %fntotal packets/s: %f; total Mb/s %f.n", TOTALR/(elapsed/1000000), TOTALW/(elapsed/1000000), BYTES_READ*8/elapsed, BYTES_WRITE*8/elapsed, (TOTALR+TOTALW)/(elapsed/1000000), (BYTES_READ+BYTES_WRITE)*8/elapsed);
  427. if (maxID>0)
  428. {
  429. for (cur=0; cur<maxID; cur++)
  430. {
  431. if (maxBuf[cur] == 1 || maxBuf[cur] == 0)
  432. fprintf(stdout, "(%d %d %d)t", cur, maxBuf[cur], fdBuf[cur]);
  433. }
  434. fprintf(stdout, "n");
  435. }
  436. for (i=0; i<MaxClient; i++)
  437. {
  438. if (sockpool[i].reged >= 0)
  439. close (sockpool[i].sockfd);
  440. }
  441. return 0;
  442. }