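/*
 * CPtest.c
 * Resource name: p2p_vod.rar [click to view]
 * Uploaded by: liguizhu
 * Upload date: 2015-11-01
 * Resource size: 2422k
 * File size: 12k
 * Source category: P2P programming
 * Development platform: Visual C++
 */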
/*
 * Openmysee
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 */
//#define HAVE_MYSQL 1
#include <assert.h>
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <syslog.h>

#include "ProtocolDefine.h"
#include "StructDefine.h"
/* Size in bytes of Message.buffer and of each per-connection receive buffer. */
#define MAX_DATA 100000
/* The next three limits are not referenced by the code visible in this file. */
#define MAX_LINE 1024
#define MAX_IDLE 300
#define SILENCE_TIME 2
/* Port number placed in the addresses carried by the hello message. */
#define SP4CP_PORT 50001

/* Value of the -D option; copied into a connection's back-off delay
 * (used as a microsecond select() timeout) after a block is received. */
int DELAY=0;

/* Run flag: 1 while the main loop should keep going.  It is cleared by
 * the signal handler settimer(), so it must be an object that can be
 * written safely from signal context. */
volatile sig_atomic_t timer=1;

/* Not referenced by the code visible in this file. */
float PROB=1.0;

/*
 * Framed message as it is written to / parsed from a socket.
 * `len` is the total number of bytes of the message including the
 * `len` and `type` fields themselves; the payload lives in `buffer`.
 * UDPMsg is the single shared scratch message used to build requests.
 */
struct Message
{
    int len;
    unsigned char type;
    char buffer[MAX_DATA];
} UDPMsg;

/* TCP port to connect to (-p option). */
unsigned short P2P_PORT=23;
/* Host to connect to (-t option); also sent inside the hello message. */
char *CPSERVER="166.111.118.6";

/* Running statistics. */
unsigned long long TOTALDATA;      /* P2P_RESPONSE messages that carried data */
unsigned long long TOTALDATASIZE;  /* sum of the lengths of those messages */
unsigned long long TOTALR;         /* P2P_SPUPDATE messages received */
unsigned long long TOTALW;         /* messages written by writeMessage() */
unsigned long long BYTES_READ;     /* bytes returned by read() */
unsigned long long BYTES_WRITE;    /* bytes written by writeMessage() */

/* Number of connections to open (-c option). */
int MaxClient = 50;
/* Non-zero enables periodic statistics output (-v option). */
int Verbosity;
/* Block index at which the next scan for unrequested blocks starts. */
int Current;

int process (char *md5, int latest);
- void settimer (int sig)
- {
- timer = 0;
- }
/*
 * Fill `name` with an IPv4 socket address for `host`:`port`.
 *
 * name: address structure to initialise (zeroed first).
 * host: host name or dotted-quad string, resolved with gethostbyname().
 * port: port number in host byte order; only the low 16 bits are used.
 *
 * Returns the hostent returned by gethostbyname() on success, or NULL
 * when the name cannot be resolved to an IPv4 address.  The returned
 * pointer refers to the resolver's static storage.
 */
struct hostent * init_sockaddr (struct sockaddr_in *name, char *host, unsigned int port)
{
    struct hostent *h;

    memset (name, 0, sizeof (*name));
    name->sin_family = AF_INET;
    name->sin_port = htons ((unsigned short) port);

    h = gethostbyname (host);
    if (h == NULL)
    {
        /* gethostbyname() reports failures through h_errno, not errno,
         * so perror() would print an unrelated message here. */
        fprintf (stderr, "gethostbyname: cannot resolve %s (h_errno %d)\n", host, h_errno);
        return NULL;
    }
    if (h->h_addrtype != AF_INET
        || h->h_length != (int) sizeof (name->sin_addr)
        || h->h_addr_list[0] == NULL)
    {
        fprintf (stderr, "gethostbyname: no IPv4 address for %s\n", host);
        return NULL;
    }
    /* Copy instead of dereferencing a cast pointer: the resolver's
     * buffer is a char array with no alignment guarantee. */
    memcpy (&name->sin_addr, h->h_addr_list[0], sizeof (name->sin_addr));
    return h;
}
/*
 * Open a TCP connection to `host`:`port`.
 *
 * Returns the connected socket descriptor, or -1 on failure.  On every
 * failure path the descriptor created here is closed again, so a failed
 * call does not leak a socket.
 */
int my_connect (char *host, int port)
{
    struct sockaddr_in client;
    int connection;

    /* Resolve before creating the socket so that a lookup failure has
     * nothing to clean up. */
    if (init_sockaddr (&client, host, port) == (struct hostent *)0)
    {
        fprintf (stderr, "my_connect: cannot resolve %s\n", host);
        return -1;
    }

    connection = socket (PF_INET, SOCK_STREAM, 0);
    if (connection < 0)
    {
        perror ("socket");
        return -1;
    }

    if (connect (connection, (struct sockaddr *) &client, sizeof (client)) < 0)
    {
        /* Report first: close() may overwrite errno. */
        perror ("connect");
        close (connection);
        return -1;
    }
    return connection;
}
- int maxID=11286, maxCurrentID;
- unsigned int *maxBuf;
- unsigned int *fdBuf;
- int main(int argc, char **argv)
- {
- char *md5="13e7f8397575944c2ca1cb113531d758";
- int c;
- signal (SIGINT, settimer);
- while ((c = getopt (argc, argv, "vD:s:m:hc:T:t:p:")) != -1)
- {
- switch (c)
- {
- case 'D':
- DELAY = atoi (optarg);
- break;
- case 'v':
- Verbosity = 1;
- break;
- case 's': // channel id of start
- md5 = optarg;
- break;
- case 'm':
- maxID = atoi(optarg);
- break;
- case 'h':
- fprintf (stdout, "%s [-T timer_to_leave] [-D delay] [-s channelid] [-h] [-c numofclient] [-t spserver_ip] [-p port] [-m maxid] [-v]n", argv[0]);
- exit (0);
- case 'c':
- MaxClient = atoi (optarg);
- break;
- case 'T':
- alarm (atoi (optarg));
- signal (SIGALRM, settimer);
- break;
- case 't':
- CPSERVER = optarg;
- break;
- case 'p':
- P2P_PORT = atoi (optarg);
- break;
- default:
- fprintf (stdout, "%s [-T timer_to_leave] [-D delay] [-s channelid] [-h] [-c numofclient] [-t spserver_ip] [-p port] [-m maxid] [-v]n", argv[0]);
- exit (0);
- }
- }
- if (maxID > 0)
- {
- maxBuf = calloc (maxID, sizeof (int));
- fdBuf = calloc (maxID, sizeof (int));
- }
- fprintf (stdout, "maxID is %dn", maxID);
- signal (SIGPIPE, SIG_IGN);
- process (md5, 0);
- return 0;
- }
- int writeMessage (int sock, char *ptr)
- {
- int len = *(int *)ptr;
- if (write (sock, ptr, len) != len)
- return -1;
- TOTALW ++;
- BYTES_WRITE += len;
- return len;
- }
- struct sockInfo
- {
- int sockfd;
- int reged;
- int current;
- unsigned int delay;
- int status;
- int start;
- int len;
- char buffer[MAX_DATA];
- };
- void process_reader (struct sockInfo *sockpool, int i, int *isfull, fd_set *osock)
- {
- struct timeval tm;
- struct SPUpdate *s;
- struct Message *msg;
- int nlen, start, offset;
- nlen = read (sockpool[i].sockfd, sockpool[i].buffer+sockpool[i].start+sockpool[i].len, MAX_DATA - sockpool[i].len - sockpool[i].start);
- if (nlen < 0)
- {
- fprintf (stderr, "Error in recv from %d:%dn", i, sockpool[i].sockfd);
- FD_CLR (sockpool[i].sockfd, osock);
- close (sockpool[i].sockfd);
- sockpool[i].reged = -1;
- return;
- } else if (nlen == 0)
- {
- fprintf (stderr, "SP closed %d:%dn", i, sockpool[i].sockfd);
- FD_CLR (sockpool[i].sockfd, osock);
- close (sockpool[i].sockfd);
- sockpool[i].reged = -1;
- return;
- }
- BYTES_READ += nlen;
- sockpool[i].len += nlen;
- while (sockpool[i].len > 0)
- {
- msg = (struct Message *)(sockpool[i].buffer + sockpool[i].start);
- if (sockpool[i].len < sizeof (int) || sockpool[i].len < msg->len)
- break;
- switch (msg->type)
- {
- case P2P_RESPONSE:
- // sockpool[i].status = 0;
- if (*(int *)(msg->buffer+sizeof(int)) > 0)
- {
- TOTALDATA ++;
- TOTALDATASIZE += msg->len;
- sockpool[i].delay = DELAY;
- if (maxBuf)
- {
- maxBuf[*(int *)(msg->buffer)] = 2;
- fdBuf[*(int *)(msg->buffer)] = i;
- }
- } else
- {
- if (maxBuf)
- maxBuf[*(int *)(msg->buffer)] = 0;
- if (DELAY > 0)
- {
- tm.tv_sec = 0;
- tm.tv_usec = sockpool[i].delay;
- sockpool[i].delay = sockpool[i].delay * 2;
- select (1,NULL, NULL, NULL, &tm);
- }
- }
- break;
- case P2P_SPUPDATE:
- TOTALR ++;
- s = (struct SPUpdate *)(msg->buffer);
- if (maxID == 0)
- {
- maxCurrentID = s->maxBlockID;
- // maxBuf = calloc (maxID, sizeof (int));
- }
- break;
- }
- sockpool[i].len -= msg->len;
- sockpool[i].start += msg->len;
- }
- if (sockpool[i].len == 0) sockpool[i].start = 0;
- else if (sockpool[i].start > 0 && sockpool[i].start +sockpool[i].len > MAX_DATA/2)
- {
- for (start = 0, offset=sockpool[i].start; start<sockpool[i].len; start++, offset++)
- sockpool[i].buffer[start] = sockpool[i].buffer[offset];
- sockpool[i].start = 0;
- }
- }
- #define MAX_PUSH 5
- void process_writer (struct sockInfo *sockpool, int i, int *isfull, fd_set *osock)
- {
- int count;
- int blocks[MAX_PUSH];
- int numb = 0;
- int cur;
- char *buf;
- // if (sockpool[i].status != 0) return;
- if (maxID>0)
- {
- *isfull = 1;
- for (count=0,cur=Current; count<maxID && numb < MAX_PUSH; cur++, count++)
- {
- if (cur >= maxID) cur = 0;
- switch (maxBuf[cur])
- {
- case 0:
- blocks[numb] = cur;
- numb ++;
- break;
- case 1:
- *isfull = 0;
- break;
- }
- }
- if (count >= maxID)
- {
- // if (TOTALDATA == 9960) fprintf (stdout, "No request nown");
- return;
- }
- } else
- {
- if (sockpool[i].current + 1 <= maxCurrentID)
- cur=sockpool[i].current + 1;
- else return;
- }
- // sockpool[i].status = 1;
- Current = cur;
- // fprintf (stdout, "Request %d %d blockn", maxID, cur);
- UDPMsg.type = P2P_PUSHLIST;
- buf = UDPMsg.buffer;
- *(char *)buf = 0;
- buf += sizeof (char);
- if (maxID > 0)
- {
- *(char *)buf = numb;
- buf += sizeof (char);
- for (cur=0; cur<numb; cur++)
- {
- *(int *)buf = blocks[cur];
- buf += sizeof (int);
- maxBuf[blocks[cur]] = 1;
- fdBuf[blocks[cur]] = i;
- }
- } else
- {
- *(char *)buf = 1;
- buf += sizeof (char);
- *(int *)buf = cur;
- buf += sizeof (int);
- }
- *(char *)buf = 0;
- buf += sizeof (char);
- UDPMsg.len = buf - UDPMsg.buffer + sizeof (int)+sizeof(char);
- if (writeMessage (sockpool[i].sockfd, (char *)&UDPMsg) < 0)
- {
- fprintf (stdout, "Cannot write to sock %d, block id is %dn", sockpool[i].sockfd, cur);
- sockpool[i].reged = -1;
- }
- sockpool[i].current = cur;
- *isfull = 0;
- }
- int process (char *md5, int latest)
- {
- struct timeval tmnew;
- fd_set wsock, rsock, osock;
- int nread, i, maxsock = 0;
- struct sockInfo *sockpool = calloc (MaxClient, sizeof (struct sockInfo));
- char *buf;
- int cur;
- int isfull = 0;
- long long begin, now;
- double elapsed=0;
- struct NormalAddress SPADDR;
- struct PeerInfoWithAddr myaddr;
- SPADDR.sin_family = AF_INET;
- SPADDR.sin_port = htons (SP4CP_PORT);
- SPADDR.sin_addr.s_addr = inet_addr (CPSERVER);
- memset (&myaddr, 0, sizeof (myaddr));
- myaddr.b.outerIP.sin_family = AF_INET;
- myaddr.b.outerIP.sin_port = htons (SP4CP_PORT);
- myaddr.b.outerIP.sin_addr.s_addr = inet_addr ("166.111.215.87");
- FD_ZERO (&osock);
- for (i=0; i<MaxClient; i++)
- {
- if ((sockpool[i].sockfd = my_connect (CPSERVER, P2P_PORT)) < 0)
- {
- fprintf (stderr, "Error in my_connect %d.n", i);
- exit (1);
- }
- if (sockpool[i].sockfd > maxsock)
- maxsock = sockpool[i].sockfd;
- sockpool[i].reged = -1;
- memset (&UDPMsg, 0, sizeof (struct Message));
- UDPMsg.type = P2P_HELLO;
- buf = UDPMsg.buffer + sizeof (float);
- memcpy (buf, md5, MD5_LEN);
- buf += MD5_LEN;
- *(unsigned char *)buf = 0;
- buf += sizeof (char);
- memcpy (buf, &myaddr, sizeof (myaddr));
- buf += sizeof (myaddr);
- *(unsigned char *)buf = 1;
- buf += sizeof (char);
- memcpy (buf, &SPADDR, sizeof (struct NormalAddress));
- buf += sizeof (SPADDR);
- UDPMsg.len = buf - UDPMsg.buffer + sizeof (int) + sizeof(char);
- if (writeMessage (sockpool[i].sockfd, (char *)&UDPMsg) >= 0)
- {
- sockpool[i].current = -1;
- sockpool[i].start = 0;
- sockpool[i].len = 0;
- sockpool[i].reged = 0;
- } else
- {
- fprintf (stdout, "Cannot write register info to sock %d", sockpool[i].sockfd);
- }
- }
- gettimeofday (&tmnew, NULL);
- begin = ((long long)tmnew.tv_sec)*1000000l + tmnew.tv_usec;
- while (isfull == 0 && timer == 1)
- {
- FD_ZERO (&rsock);
- FD_ZERO (&wsock);
- for (i=0; i<MaxClient; i++)
- {
- if (sockpool[i].reged >= 0)
- {
- FD_SET (sockpool[i].sockfd, &rsock);
- // if (sockpool[i].status == 0)
- FD_SET (sockpool[i].sockfd, &wsock);
- }
- }
- if ((nread = select (maxsock+1, &rsock, &wsock, NULL, NULL)) <= 0)
- continue;
- for (i=0; i<MaxClient; i++)
- {
- if (FD_ISSET (sockpool[i].sockfd, &rsock))
- process_reader (sockpool, i, &isfull, &osock);
- if (sockpool[i].reged >= 0 && sockpool[i].sockfd > 0 && FD_ISSET (sockpool[i].sockfd, &wsock))
- process_writer (sockpool, i, &isfull, &osock);
- }
- if (Verbosity)
- {
- gettimeofday (&tmnew, NULL);
- now = ((long long)tmnew.tv_sec) * 1000000l + tmnew.tv_usec;
- if (now-begin > elapsed + 2000000l)
- {
- elapsed = now - begin;
- fprintf (stdout, "%lld: total %lld write, %lld read, %lld bytes write, %lld bytes read, %lld blocks and %lld data.n", now-begin, TOTALW, TOTALR, BYTES_WRITE, BYTES_READ, TOTALDATA, TOTALDATASIZE);
- fprintf (stdout, "avg read packets/s: %f; avg write packets/s: %fnavg read Mb/s: %f; avg write Mb/s %fntotal packets/s: %f; total Mb/s %f.n", TOTALR/(elapsed/1000000), TOTALW/(elapsed/1000000), BYTES_READ*8/elapsed, BYTES_WRITE*8/elapsed, (TOTALR+TOTALW)/(elapsed/1000000), (BYTES_READ+BYTES_WRITE)*8/elapsed);
- }
- }
- }
- gettimeofday (&tmnew, NULL);
- now = ((long long)tmnew.tv_sec) * 1000000l + tmnew.tv_usec;
- elapsed = now - begin;
- fprintf (stdout, "%lld: total %lld write, %lld read, %lld bytes write, %lld bytes read, %lld blocks and %lld data.n", now-begin, TOTALW, TOTALR, BYTES_WRITE, BYTES_READ, TOTALDATA, TOTALDATASIZE);
- fprintf (stdout, "avg read packets/s: %f; avg write packets/s: %fnavg read Mb/s: %f; avg write Mb/s %fntotal packets/s: %f; total Mb/s %f.n", TOTALR/(elapsed/1000000), TOTALW/(elapsed/1000000), BYTES_READ*8/elapsed, BYTES_WRITE*8/elapsed, (TOTALR+TOTALW)/(elapsed/1000000), (BYTES_READ+BYTES_WRITE)*8/elapsed);
- if (maxID>0)
- {
- for (cur=0; cur<maxID; cur++)
- {
- if (maxBuf[cur] == 1 || maxBuf[cur] == 0)
- fprintf(stdout, "(%d %d %d)t", cur, maxBuf[cur], fdBuf[cur]);
- }
- fprintf(stdout, "n");
- }
- for (i=0; i<MaxClient; i++)
- {
- if (sockpool[i].reged >= 0)
- close (sockpool[i].sockfd);
- }
- return 0;
- }