zserver.cpp
上传用户:dzyhzl
上传日期:2019-04-29
资源大小:56270k
文件大小:13k
源码类别:

模拟服务器

开发平台:

C/C++

  1. #ifdef WIN32
  2. #include <winsock2.h>
  3. #define socklen_t int
  4. #endif
  5. #include "IServer.h"
  6. #include "Cipher.h"
  7. #include "KSG_EncodeDecode.h"
  8. #include <time.h>
  9. #include <string.h>
  10. #include <errno.h>
  11. #define MAX_SIGNAL 20
  12. #define SIG_SENDDATA SIGRTMIN + MAX_SIGNAL + 1
  13. static unsigned gs_holdrand = time(NULL);
  14. static inline unsigned _Rand() {
  15.     gs_holdrand = gs_holdrand * 214013L + 2531011L;
  16.     return gs_holdrand;
  17. }
  18. static inline void RandMemSet(int nSize, unsigned char *pbyBuffer) {
  19.     while (nSize--) {
  20.         *pbyBuffer++ = (unsigned char)_Rand();
  21.     }
  22. }
  23. int ZListenThread::action() {
  24. int index;
  25. while(true) {
  26. //响应请求-----------------
  27. // SOCKET s = WSAAccept(parent->listen_socket, NULL, NULL, NULL, 0);
  28. struct sockaddr client_addr;
  29. socklen_t length = sizeof(client_addr);
  30. SOCKET s = accept(parent->listen_socket, &client_addr, &length);
  31. if (s == INVALID_SOCKET)
  32. {
  33. #ifdef WIN32
  34. Sleep(10);
  35. #else
  36. usleep(10);
  37. #endif
  38. continue;
  39. }
  40. //分配一个连接的资源
  41. index = parent->getConnection(s);
  42. if(index== -1) {
  43. printf("server max connection reached, client attempt fail.n");
  44. closesocket(s);
  45. }
  46. else {
  47. //平台相关,创建完成端口或者事件
  48. #ifdef WIN32
  49. CreateIoCompletionPort((HANDLE)s, parent->CompletionPort, (DWORD)&parent->clients[index], 0);
  50. #else
  51.   fcntl(s, F_SETSIG, SIGRTMIN + (index % MAX_SIGNAL));
  52.   fcntl(s, F_SETOWN, parent->pids[index % parent->max_thread]);
  53.   int flags = fcntl(s, F_GETFL);
  54.   flags |= O_NONBLOCK | O_ASYNC;
  55.   fcntl(s, F_SETFL, flags);
  56. //  fcntl(s, F_SET
  57. #endif
  58. //发送key值到客户端
  59. memcpy(&parent->clients[index].addr, &client_addr, length);
  60. parent->clients[index].server_key = _Rand();
  61. parent->clients[index].client_key = _Rand();
  62. ACCOUNT_BEGIN account_begin;
  63.             RandMemSet(sizeof(account_begin), (unsigned char *)&account_begin);
  64. account_begin.ProtocolType = CIPHER_PROTOCOL_TYPE;
  65. account_begin.Mode         = 0;
  66. account_begin.ServerKey    = ~parent->clients[index].server_key;
  67. account_begin.ClientKey    = ~parent->clients[index].client_key;
  68. parent->PackDataToClient(index, &account_begin, sizeof(account_begin));
  69. int length;
  70. parent->buffer->sendPacket(parent->clients[index].write_index, length);
  71. parent->sendData(&parent->clients[index]);
  72. #ifdef WIN32
  73. parent->receiveData(&parent->clients[index]);
  74. #endif
  75. if(parent->call_back)
  76. parent->call_back(parent->parameter, index, enumClientConnectCreate); //notify
  77. printf("client [%d] created. (%u, %u)n", index, parent->clients[index].server_key, parent->clients[index].client_key);
  78. }
  79. }
  80. return 0;
  81. }
  82. // flying add this functino
  83. // Data shoule be sent in a thread, so that the main-process will not be blocked.
  84. int do_sendData(IServer* server, client_info* client) {
  85. int data_length;
  86. char *data_buffer = server->buffer->getSendData(client->write_index, data_length);
  87. if(!data_buffer) return -1;
  88. int nRetVal = send(client->sock, data_buffer, data_length, 0);
  89. if(nRetVal != -1) {
  90. server->buffer->sendData(client->write_index, nRetVal);
  91. if(nRetVal != data_length) {
  92. server->sendData(client);
  93. }
  94. return 1;
  95. }
  96. else {
  97. printf("send error %dn", errno);
  98. if(errno == EAGAIN) {
  99. server->sendData(client);
  100. }
  101. else {
  102. printf("sending failed....close connection [%d].n", client->index);
  103. server->closeConnection(client);
  104. }
  105. }
  106. return 1;
  107. }
  108. //实际的工作线程,平台相关
  109. #ifdef WIN32
  110. DWORD WINAPI work_thread(LPVOID id) {
  111. #else
  112. void *work_thread(LPVOID id) {
  113. #endif
  114. IServer *parent = (IServer *)id;
  115. #ifdef WIN32
  116. DWORD transferred;
  117. LPOVERLAPPED overlapped;
  118. DWORD flags;
  119. DWORD dwIoSize;
  120. client_info *client;
  121. while(true) {
  122. bool bIORet = GetQueuedCompletionStatus(parent->CompletionPort, &dwIoSize, (LPDWORD) &client, &overlapped, INFINITE);
  123. DWORD dwIOError = GetLastError();
  124. if(!bIORet && dwIOError != WAIT_TIMEOUT ) {
  125. closesocket(client->sock);
  126. parent->buffer->clear(client->read_index);
  127. parent->buffer->clear(client->write_index);
  128. client->sock = INVALID_SOCKET;
  129. if(parent->call_back)
  130. parent->call_back(parent->parameter, client->index, enumClientConnectClose); //notify
  131. continue;
  132. }
  133. if(overlapped == &client->write_overlapped) {
  134. if(dwIoSize > 0) {
  135. parent->buffer->sendData(client->write_index, dwIoSize);
  136. }
  137. parent->sendData(client);
  138. }
  139. else {
  140. if(dwIoSize > 0) {
  141. parent->buffer->receiveData(client->read_index, (const char *)client->m_byInBuffer, dwIoSize);
  142. }
  143. parent->receiveData(client);
  144. }
  145. }
  146. #else
  147.   siginfo_t siginfo;
  148.   int thread_index = parent->current_thread++;
  149.   parent->pids[thread_index] = getpid();
  150.   printf("thread %d startn", thread_index);
  151.   while(true) {
  152.     int signum = sigwaitinfo(&parent->sigsets[thread_index], &siginfo);
  153.     if(signum == -1) break;
  154.     if(signum == SIGIO) {
  155.       printf("overflown");
  156.     }
  157. // insert this branch to send data out
  158. if (signum == SIG_SENDDATA) {
  159. client_info* client = (client_info*)siginfo.si_value.sival_ptr;
  160. do_sendData(parent, client);
  161. }
  162.     else if(signum >= SIGRTMIN) {
  163. client_info *client;
  164. int index = signum - SIGRTMIN;
  165. while(index < parent->max_client) {
  166. client = &parent->clients[index];
  167. if(client->sock == siginfo.si_fd) {
  168. if(siginfo.si_band & 1) { //有等待读入的数据
  169. parent->receiveData(client);
  170. }
  171. break;
  172. }
  173. index += MAX_SIGNAL;
  174. }
  175.     }
  176.   }
  177. #endif
  178. }
  179. void IServer::closeConnection(client_info *client) {
  180. if(client->sock == INVALID_SOCKET) return;
  181. closesocket(client->sock);
  182. buffer->clear(client->read_index);
  183. buffer->clear(client->write_index);
  184. client->sock = INVALID_SOCKET;
  185. if(call_back) call_back(parameter, client->index, enumClientConnectClose);
  186. }
  187. bool IServer::sendData(client_info *client) {
  188. //int nRetryTime = 0;
  189. //平台相关代码,实际发出重叠的IO请求,发送数据---------------------------------------
  190. int size;
  191. #ifdef WIN32
  192. client->m_wsaOutBuffer.buf = buffer->getSendData(client->write_index, (int &)client->m_wsaOutBuffer.len);
  193. if(client->m_wsaOutBuffer.buf) {
  194. ULONG ulFlags = MSG_PARTIAL;
  195. ZeroMemory(&client->write_overlapped, sizeof(OVERLAPPED));
  196. int nRetVal = WSASend(client->sock, &client->m_wsaOutBuffer, 1, &client->m_wsaOutBuffer.len, 
  197. ulFlags, &client->write_overlapped, NULL);
  198. if ( nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING ) { //出现错误
  199. closeConnection(client);
  200. }
  201. else return true;
  202. }
  203. #else
  204. // kill a signal
  205. union sigval info;
  206. info.sival_ptr = (void*)client;
  207. sigqueue(pids[client->index % max_thread], SIG_SENDDATA, info);
  208. #endif
  209. return false;
  210. }
  211. bool IServer::receiveData(client_info *client) {
  212. //平台相关代码,实际发出重叠的IO请求,接收数据---------------------------------------
  213. #ifdef WIN32
  214. ULONG ulFlags = MSG_PARTIAL;
  215. DWORD dwIoSize;
  216. client->m_wsaInBuffer.buf = (char *)client->m_byInBuffer;
  217. client->m_wsaInBuffer.len = MAX_IN_BUFFER;
  218. UINT nRetVal = WSARecv(client->sock, &client->m_wsaInBuffer, 1, &dwIoSize, &ulFlags, &client->read_overlapped, NULL);
  219. if(nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  220.     printf("receive data pending? close connection [%d].n", client->index);
  221. closeConnection(client);
  222. return false;
  223. }
  224. #else
  225.   int ret = recv(client->sock, client->m_byInBuffer, MAX_IN_BUFFER, 0);
  226.   if(ret <= 0 && errno != EWOULDBLOCK) {
  227.     printf("receive data pending? close connection [%d].n", client->index);
  228. closeConnection(client);
  229.     return false;
  230.   }
  231.   buffer->receiveData(client->read_index, (const char *)client->m_byInBuffer, ret);
  232.   #endif
  233.   return true;
  234. }
  235. IServer::IServer(int number, int thread_number) {
  236. buffer = new ZBuffer(number * 800000, number * 2);
  237. clients = new client_info[number];
  238. max_client = number;
  239. int index;
  240. for(index = 0; index < max_client; index++) {
  241. clients[index].sock = INVALID_SOCKET;
  242. clients[index].index = index;
  243. }
  244. call_back = NULL;
  245. max_thread = thread_number;
  246. current_thread = 0;
  247. pids = new int[max_thread];
  248. sigsets = new sigset_t[max_thread];
  249. listen_thread = new ZListenThread(this);
  250. }
  251. IServer::~IServer() {
  252. listen_thread->stop();
  253. delete[] pids;
  254. delete[] sigsets;
  255. delete listen_thread;
  256. delete[] clients;
  257. delete buffer;
  258. }
  259. int IServer::getConnection(SOCKET sock) {
  260. int index;
  261. for(index = 0; index < max_client; index++) {
  262. if(clients[index].sock == INVALID_SOCKET) {
  263. clients[index].sock = sock;
  264. clients[index].read_index = buffer->getConnection();
  265. clients[index].write_index = buffer->getConnection();
  266. return index;
  267. }
  268. }
  269. return -1;
  270. }
  271. int init_socket = 0;
  272. int IServer::Startup() {
  273. if(!init_socket) {
  274. #ifdef WIN32
  275. WSADATA wsa_data;
  276. WSAStartup(MAKEWORD(1, 1), &wsa_data);
  277. #else
  278. int index;
  279. int signal_number;
  280. for(index = 0; index < max_thread; index++) {
  281. sigemptyset(&sigsets[index]);
  282. sigaddset(&sigsets[index], SIGIO);
  283. sigaddset(&sigsets[index], SIG_SENDDATA);
  284. signal_number = SIGRTMIN + index;
  285. while(signal_number < SIGRTMIN + MAX_SIGNAL) {
  286. sigaddset(&sigsets[index], signal_number);
  287. signal_number += max_thread;
  288. }
  289. sigprocmask(SIG_BLOCK, &sigsets[index], NULL);
  290. }
  291. #endif
  292. init_socket++;
  293. }
  294. return 0;
  295. }
  296. int IServer::Cleanup() {
  297. if(init_socket--) {
  298. #ifdef WIN32
  299. WSACleanup();
  300. #else
  301. int index;
  302. for(index = 0; index < max_thread; index++) {
  303. sigprocmask(SIG_UNBLOCK, &sigsets[index], NULL);
  304. }
  305. #endif
  306. }
  307. return 0;
  308. }
  309. int IServer::Release() {
  310. return 0;
  311. }
  312. int IServer::Open(const unsigned long &ulnAddressToListenOn, const unsigned short &usnPortToListenOn) {
  313. //在此创建套接字,绑定,启动侦听线程
  314. #ifdef WIN32
  315. CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
  316. int index;
  317. DWORD thread_id;
  318. for(index = 0; index < max_thread; index++) {
  319. HANDLE hThread;
  320. hThread = CreateThread(NULL, 0, work_thread, this, 0, &thread_id);
  321. CloseHandle(hThread);
  322. }
  323. listen_socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  324. #else
  325.     listen_socket = socket(AF_INET, SOCK_STREAM, 0);
  326.     pthread_t work;
  327. int index;
  328. for(index = 0; index < max_thread; index++) {
  329. int ret = pthread_create(&work, NULL, work_thread, this);
  330. }
  331. #endif
  332. if(listen_socket == INVALID_SOCKET) return 0;
  333. struct sockaddr_in addr;
  334. memset((void*)&addr, 0, sizeof(addr));
  335. addr.sin_family = AF_INET;
  336. addr.sin_port = htons(usnPortToListenOn);
  337. addr.sin_addr.s_addr = INADDR_ANY;
  338. if(bind(listen_socket, (const sockaddr *)&addr, sizeof(addr))) {
  339. return 0;
  340. }
  341. if(listen(listen_socket, 5)) return 0;
  342. listen_thread->start();
  343. return 1;
  344. }
  345. int IServer::CloseService() {
  346. closesocket(listen_socket);
  347. return 0;
  348. }
  349. int IServer::RegisterMsgFilter(void * lpParam, CALLBACK_SERVER_EVENT pfnEventNotify) {
  350. call_back = pfnEventNotify;
  351. parameter = lpParam;
  352. return 0;
  353. }
  354. int IServer::PreparePackSink() { //准备开始数据封装
  355. return 0;
  356. }
  357. int IServer::PackDataToClient(const unsigned long &ulnClientID, const void * pData, unsigned long datalength) {
  358. if(clients[ulnClientID].sock != INVALID_SOCKET) {
  359. buffer->packData(clients[ulnClientID].write_index, (const char *)pData, datalength);
  360. }
  361. return 0;
  362. }
  363. int IServer::SendPackToClient() {
  364. int index;
  365. for(index = 0; index < max_client; index++) {
  366. if(clients[index].sock != INVALID_SOCKET) {
  367. int length;
  368. char *data = buffer->sendPacket(clients[index].write_index, length);
  369. if(data && clients[index].server_key) {
  370. KSG_EncodeBuf(length, (unsigned char *)data, (unsigned int *)&clients[index].server_key);
  371. sendData(&clients[index]);
  372. }
  373. return length;
  374. }
  375. }
  376. return 0;
  377. }
  378. int IServer::SendPackToClient(int index) {
  379. if (index < 0)
  380. {
  381. return SendPackToClient();
  382. }
  383. if(clients[index].sock != INVALID_SOCKET) {
  384. int length;
  385. char *data = buffer->sendPacket(clients[index].write_index, length);
  386. if(data) {
  387. KSG_EncodeBuf(length, (unsigned char *)data, (unsigned int *)&clients[index].server_key);
  388. sendData(&clients[index]);
  389. }
  390. return length;
  391. }
  392. return 0;
  393. }
  394. int IServer::SendData(const unsigned long &ulnClientID, const void * pData, unsigned long datalength) {
  395. PackDataToClient(ulnClientID, pData, datalength);
  396. SendPackToClient(ulnClientID);
  397. //发送一个消息
  398. return 0;
  399. }
  400. const void *IServer::GetPackFromClient(unsigned long ulnClientID, unsigned int &datalength) {
  401. if(clients[ulnClientID].sock != INVALID_SOCKET) {
  402. char *data = buffer->getPacket(clients[ulnClientID].read_index, (int &)datalength);
  403. if(data) {
  404. KSG_DecodeBuf(datalength, (unsigned char *)data, (unsigned int *)&clients[ulnClientID].client_key);
  405. return data;
  406. }
  407. }
  408. return NULL; 
  409. }
  410. int IServer::ShutdownClient(const unsigned long &ulnClientID) { 
  411. if(clients[ulnClientID].sock != INVALID_SOCKET) {
  412. closesocket(clients[ulnClientID].sock);
  413. buffer->clear(clients[ulnClientID].read_index);
  414. buffer->clear(clients[ulnClientID].write_index);
  415. clients[ulnClientID].sock = INVALID_SOCKET;
  416. printf("shut down client [%d]n", ulnClientID); 
  417. if(call_back)
  418. call_back(parameter, ulnClientID, enumClientConnectClose); //notify
  419. }
  420. return 0; 
  421. }