zserver.cpp
上传用户:dzyhzl
上传日期:2019-04-29
资源大小:56270k
文件大小:19k
源码类别:

模拟服务器

开发平台:

C/C++

  1. #ifdef WIN32
  2. #include <winsock2.h>
  3. #define socklen_t int
  4. #endif
  5. #include "IServer.h"
  6. #include "Cipher.h"
  7. #include "KSG_EncodeDecode.h"
  8. #include <time.h>
  9. #include <string.h>
  10. #include <errno.h>
  11. #include <stdlib.h>
  12. #define MAX_SIGNAL 16
  13. #define SIG_SENDDATA SIGRTMIN + MAX_SIGNAL + 1
  14. static unsigned gs_holdrand = time(NULL);
  15. static inline unsigned _Rand() {
  16.     gs_holdrand = gs_holdrand * 214013L + 2531011L;
  17.     return gs_holdrand;
  18. }
  19. static inline void RandMemSet(int nSize, unsigned char *pbyBuffer) {
  20.     while (nSize--) {
  21.         *pbyBuffer++ = (unsigned char)_Rand();
  22.     }
  23. }
  24. int do_sendData(IServer* server, client_info* client);
  25. void JustDoIt(IServer* pIServer, client_info* client)
  26. {
  27. if (client->recv_flag)
  28. {
  29. client->recv_flag = false;
  30. pIServer->receiveData(client);
  31. }
  32. if (client->state == STATE_SENDING)
  33. {
  34. do_sendData(pIServer, client);
  35. }
  36. }
  37. void DoEveryThing(IServer* pIServer)
  38. {
  39. for (int i = 0; i < pIServer->max_client; i++)
  40. {
  41. client_info& client = pIServer->clients[i];
  42. if (client.sock != INVALID_SOCKET)
  43. JustDoIt(pIServer, &client);
  44. }
  45. }
  46. int ZListenThread::action() {
  47. int index;
  48. while(true) {
  49. //响应请求-----------------
  50. struct sockaddr client_addr;
  51. socklen_t length = sizeof(client_addr);
  52. SOCKET s = accept(parent->listen_socket, &client_addr, &length);
  53. if(s == INVALID_SOCKET) continue;
  54. //分配一个连接的资源
  55. index = parent->getConnection(s);
  56. if(index== -1) {
  57. printf("server max connection reached, client attempt fail.n");
  58. closesocket(s);
  59. }
  60. else {
  61. //平台相关,创建完成端口或者事件
  62. #ifdef WIN32
  63. CreateIoCompletionPort((HANDLE)s, parent->CompletionPort, (DWORD)&parent->clients[index], 0);
  64. #else
  65. fcntl(s, F_SETSIG, SIGRTMIN);
  66. fcntl(s, F_SETOWN, parent->pid);
  67. int flags = fcntl(s, F_GETFL);
  68. flags |= O_NONBLOCK | O_ASYNC;
  69. fcntl(s, F_SETFL, flags);
  70. fcntl(s, F_SETAUXFL, O_ONESIGFD);
  71. #endif
  72. //发送key值到客户端
  73. client_info* client = &parent->clients[index];
  74. //client->mutex.lock();
  75. memcpy(&client->addr, &client_addr, length);
  76. client->server_key = _Rand();
  77. client->client_key = _Rand();
  78. #pragma pack(1)
  79. struct {
  80. unsigned short uSize;
  81. ACCOUNT_BEGIN ab;
  82. }account_begin;
  83. #pragma pack()
  84.             RandMemSet(sizeof(account_begin), (unsigned char *)&account_begin);
  85. account_begin.uSize = sizeof(ACCOUNT_BEGIN);
  86. account_begin.ab.ProtocolType = CIPHER_PROTOCOL_TYPE;
  87. account_begin.ab.Mode         = 0;
  88. account_begin.ab.ServerKey    = ~client->server_key;
  89. account_begin.ab.ClientKey    = ~client->client_key;
  90. if (sizeof(account_begin) != send(client->sock, &account_begin, sizeof(account_begin), 0))
  91. {
  92. closesocket(client->sock);
  93. client->state = STATE_IDLE;
  94. printf("client [%d] %d create failed.n", index, client->sock);
  95. client->sock = INVALID_SOCKET;
  96. }
  97. else
  98. {
  99. #ifdef WIN32
  100. client->recv_flag = true;
  101. #endif
  102. client->conn_flag = true;
  103. printf("client [%d] %d created. (%u, %u)n", index, client->sock, client->server_key, client->client_key);
  104. }
  105. //client->mutex.unlock();
  106. }
  107. }
  108. return 0;
  109. }
  110. #ifndef WIN32
  111. // flying add this functino
  112. // Data shoule be sent in a thread, so that the main-process will not be blocked.
  113. int do_sendData(IServer* server, client_info* client) {
  114. if (client->state == STATE_CLOSING)
  115. return -1;
  116. int data_length;
  117. char *data_buffer = client->buffer->getSendData(data_length);
  118. if(!data_buffer) {
  119. client->state = STATE_IDLE;
  120. return -1;
  121. }
  122. int nRetVal = send(client->sock, data_buffer, data_length, 0);
  123. if(nRetVal != -1) {
  124. client->buffer->sendData(nRetVal);
  125. if(nRetVal != data_length) { //发送的数据不够再次发送
  126. server->sendData(client);
  127. }
  128. else client->state = STATE_IDLE;
  129. return 1;
  130. }
  131. else {
  132. if (errno == EAGAIN || errno == EWOULDBLOCK) {
  133. client->state = STATE_BLOCKED;
  134. }
  135. else {
  136. printf("sending failed %d....close connection [%d].n", errno, client->index);
  137. client->state = STATE_CLOSING;
  138. return 0;
  139. }
  140. }
  141. return 1;
  142. }
  143. #endif //WIN32
  144. //实际的工作线程,平台相关
  145. #ifdef WIN32
  146. DWORD WINAPI sig_thread(LPVOID id) {
  147. #else
  148. void *sig_thread(LPVOID id) {
  149. #endif
  150. IServer *parent = (IServer *)id;
  151. #ifdef WIN32
  152. DWORD transferred;
  153. LPOVERLAPPED overlapped;
  154. DWORD flags;
  155. DWORD dwIoSize;
  156. client_info *client;
  157. while(true) {
  158. bool bIORet = GetQueuedCompletionStatus(parent->CompletionPort, &dwIoSize, (LPDWORD) &client, &overlapped, INFINITE);
  159. DWORD dwIOError = GetLastError();
  160. if(!bIORet && dwIOError != WAIT_TIMEOUT ) {
  161. closesocket(client->sock);
  162. client->buffer->clear();
  163. client->sock = INVALID_SOCKET;
  164. if(parent->call_back)
  165. parent->call_back(parent->parameter, client->index, enumClientConnectClose); //notify
  166. continue;
  167. }
  168. if(overlapped == &client->write_overlapped) {
  169. if(dwIoSize > 0) {
  170. client->buffer->sendData(dwIoSize);
  171. }
  172. parent->sendData(client);
  173. }
  174. else {
  175. if(dwIoSize > 0) {
  176. client->buffer->receiveData(dwIoSize);
  177. }
  178. parent->receiveData(client);
  179. }
  180. }
  181. #else
  182.   siginfo_t siginfo;
  183.   sigset_t sset;
  184.   parent->pid = getpid();
  185.   printf("signal thread startn");
  186. sigemptyset(&sset);
  187. sigaddset(&sset, SIGIO);
  188. sigaddset(&sset, SIGRTMIN);
  189. pthread_sigmask(SIG_BLOCK, &sset, NULL);
  190.   while(true) {
  191.     int signum = sigwaitinfo(&sset, &siginfo);
  192.     if(signum == -1)
  193. {
  194. printf("fatal signum in signal thread ...errno = %dn", errno);
  195. continue;
  196. }
  197.     if(signum == SIGIO) {
  198.   //client_info* client = (client_info*)siginfo.si_value.sival_ptr;
  199.       printf("overflow .... n");
  200.   exit(-2);
  201.     }
  202. /*if (signum == SIG_SENDDATA) {
  203. client_info* client = (client_info*)siginfo.si_value.sival_ptr;
  204. if(client->state == STATE_SENDING) {
  205. do_sendData(parent, client);
  206. }
  207. }
  208.     else*/ if(signum == SIGRTMIN) {
  209. client_info *client;
  210. int index = 0;
  211. while(index < parent->max_client) {
  212. client = &parent->clients[index];
  213. if(client->sock == siginfo.si_fd) {
  214. //client->mutex.lock();
  215. if (client->state != STATE_CLOSING)
  216. {
  217. if (siginfo.si_band & 0x01) { //有等待读入的数据
  218. client->recv_flag = true;
  219. //parent->receiveData(client);
  220. }
  221. if (siginfo.si_band & 0x04) {
  222. if(client->state == STATE_BLOCKED) {
  223. client->state = STATE_IDLE;//STATE_SENDING;
  224. //parent->sendData(client);
  225. }
  226. }
  227. else if (!(siginfo.si_band & 0x01))
  228. {
  229. printf("signal notify close connection %dn", client->index);
  230. client->state = STATE_CLOSING;
  231. }
  232. }
  233. //client->mutex.unlock();
  234. break;
  235. }
  236. index++;
  237. }
  238.     }
  239.   }
  240. sigprocmask(SIG_UNBLOCK, &sset, NULL);
  241. #endif
  242. }
  243. void IServer::closeConnection(client_info *client) {
  244. if(client->sock == INVALID_SOCKET) return;
  245. closesocket(client->sock);
  246. client->buffer->clear();
  247. client->state = STATE_IDLE;
  248. client->conn_flag = false;
  249. client->recv_flag = false;
  250. client->sock = INVALID_SOCKET;
  251. }
  252. bool IServer::sendData(client_info *client) {
  253. //int nRetryTime = 0;
  254. //平台相关代码,实际发出重叠的IO请求,发送数据---------------------------------------
  255. int size;
  256. #ifdef WIN32
  257. client->m_wsaOutBuffer.buf = client->buffer->getSendData((int &)client->m_wsaOutBuffer.len);
  258. if(client->m_wsaOutBuffer.buf) {
  259. ULONG ulFlags = MSG_PARTIAL;
  260. ZeroMemory(&client->write_overlapped, sizeof(OVERLAPPED));
  261. int nRetVal = WSASend(client->sock, &client->m_wsaOutBuffer, 1, &client->m_wsaOutBuffer.len, 
  262. ulFlags, &client->write_overlapped, NULL);
  263. if ( nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING ) { //出现错误
  264. closeConnection(client);
  265. }
  266. else return true;
  267. }
  268. #else
  269. if(client->state == STATE_IDLE) {
  270. /* union sigval info;
  271. info.sival_ptr = (void*)client;
  272. int nRet = 0;*/
  273. client->state = STATE_SENDING;
  274. /*if (0 != (nRet = sigqueue(pids[client->index % max_thread], SIG_SENDDATA, info))) {
  275. printf("sigqueue failed...%dn", nRet);
  276. }*/
  277. }
  278. #endif
  279. return false;
  280. }
  281. bool IServer::receiveData(client_info *client) {
  282. //平台相关代码,实际发出重叠的IO请求,接收数据---------------------------------------
  283. #ifdef WIN32
  284. ULONG ulFlags = MSG_PARTIAL;
  285. DWORD dwIoSize;
  286. int bufsize = 0;
  287. client->m_wsaInBuffer.buf = (char *)client->buffer->getReceiveBuffer(bufsize);
  288. client->m_wsaInBuffer.len = bufsize;
  289. if (!client->m_wsaInBuffer.buf)
  290. {
  291. printf("receive buffer full...close connection [%d].n", client->index);
  292. closeConnection(client);
  293. return false;
  294. }
  295. UINT nRetVal = WSARecv(client->sock, &client->m_wsaInBuffer, 1, &dwIoSize, &ulFlags, &client->read_overlapped, NULL);
  296. if(nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) {
  297.     printf("receive data pending? close connection [%d].n", client->index);
  298. closeConnection(client);
  299. return false;
  300. }
  301. #else
  302.   int bufsize = 0;
  303.   if (client->state == STATE_CLOSING)
  304.   return false;
  305.   char* recv_buffer = client->buffer->getReceiveBuffer(bufsize);
  306. if (!recv_buffer)
  307. {
  308. printf("receive buffer full...close connection [%d].n", client->index);
  309. client->state = STATE_CLOSING;
  310. return false;
  311. }
  312.   int ret = recv(client->sock, recv_buffer, bufsize, 0);
  313.   if(ret <= 0) {
  314. if (errno != EAGAIN)
  315. {
  316.     printf("receive data pending? close connection [%d].n", client->index);
  317. client->state = STATE_CLOSING;
  318. }
  319.     return false;
  320.   }
  321.   client->buffer->receiveData(ret);
  322. #endif
  323.   return true;
  324. }
  325. IServer::IServer(int number, int thread_number, int max_send, int max_receive) {
  326. clients = new client_info[number];
  327. max_client = number;
  328. int index;
  329. for(index = 0; index < max_client; index++) {
  330. clients[index].buffer = new ZBuffer(max_send, max_receive);
  331. clients[index].buffer->startPerf(&recvPerf, &sendPerf);
  332. clients[index].sock = INVALID_SOCKET;
  333. clients[index].index = index;
  334. }
  335. call_back = NULL;
  336. max_thread = thread_number;
  337. #ifndef WIN32
  338. //current_thread = 0;
  339. //pids = new int[max_thread];
  340. //sigsets = new sigset_t[max_thread];
  341. //send_threads = new ZSendThread[max_thread];
  342. //recv_threads = new ZRecvThread[max_thread];
  343. create_callback_thread = new ZCreateCallBackThread(this);
  344. close_callback_thread = new ZCloseCallBackThread(this);
  345. for (index = 0; index < max_thread; index++)
  346. {
  347. //send_threads[index].parent = this;
  348. //send_threads[index].index = index;
  349. //recv_threads[index].parent = this;
  350. //recv_threads[index].index = index;
  351. }
  352. #endif //WIN32
  353. listen_thread = new ZListenThread(this);
  354. }
  355. IServer::~IServer() {
  356. printf("destruct IServer....n");
  357. listen_thread->stop();
  358. int index;
  359. for (index = 0; index < max_thread; index++)
  360. {
  361. //send_threads[index].stop();
  362. //recv_threads[index].stop();
  363. }
  364. create_callback_thread->stop();
  365. close_callback_thread->stop();
  366. #ifndef WIN32
  367. //delete[] pids;
  368. //delete[] sigsets;
  369. //delete send_threads;
  370. //delete recv_threads;
  371. delete create_callback_thread;
  372. delete close_callback_thread;
  373. #endif //WIN32
  374. delete listen_thread;
  375. delete[] clients;
  376. }
  377. int IServer::getConnection(SOCKET sock) {
  378. int index;
  379. for(index = 0; index < max_client; index++) {
  380. //clients[index].mutex.lock();
  381. if(clients[index].sock == INVALID_SOCKET) {
  382. clients[index].sock = sock;
  383. //clients[index].mutex.unlock();
  384. return index;
  385. }
  386. //clients[index].mutex.unlock();
  387. }
  388. return -1;
  389. }
  390. int init_socket = 0;
  391. int IServer::Startup() {
  392. if(!init_socket) {
  393. #ifdef WIN32
  394. WSADATA wsa_data;
  395. WSAStartup(MAKEWORD(1, 1), &wsa_data);
  396. /*#else
  397. int index;
  398. int signal_number;
  399. for(index = 0; index < max_thread; index++) {
  400. sigemptyset(&sigsets[index]);
  401. sigaddset(&sigsets[index], SIGIO);
  402. //sigaddset(&sigsets[index], SIG_SENDDATA);
  403. signal_number = SIGRTMIN + index;
  404. while(signal_number < SIGRTMIN + MAX_SIGNAL) {
  405. sigaddset(&sigsets[index], signal_number);
  406. signal_number += max_thread;
  407. }
  408. pthread_sigmask(SIG_BLOCK, &sigsets[index], NULL);
  409. }*/
  410. #endif
  411. init_socket++;
  412. }
  413. return 0;
  414. }
  415. int IServer::Cleanup() {
  416. if(init_socket--) {
  417. #ifdef WIN32
  418. WSACleanup();
  419. #endif
  420. }
  421. return 0;
  422. }
  423. int IServer::Release() {
  424. return 0;
  425. }
  426. int IServer::Open(const unsigned long &ulnAddressToListenOn, const unsigned short &usnPortToListenOn) {
  427. //在此创建套接字,绑定,启动侦听线程
  428. #ifdef WIN32
  429. CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
  430. int index;
  431. DWORD thread_id;
  432. for(index = 0; index < max_thread; index++) {
  433. HANDLE hThread;
  434. hThread = CreateThread(NULL, 0, sig_thread, this, 0, &thread_id);
  435. CloseHandle(hThread);
  436. }
  437. listen_socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  438. #else
  439.     listen_socket = socket(AF_INET, SOCK_STREAM, 0);
  440.     pthread_t work;
  441. int index;
  442. //for(index = 0; index < max_thread; index++)
  443. {
  444. int ret = pthread_create(&work, NULL, sig_thread, this);
  445. if (ret == 0)
  446. pthread_detach(work);
  447. else
  448. return 0;
  449. }
  450. #endif
  451. if(listen_socket == INVALID_SOCKET) return 0;
  452. struct sockaddr_in addr;
  453. memset((void*)&addr, 0, sizeof(addr));
  454. addr.sin_family = AF_INET;
  455. addr.sin_port = htons(usnPortToListenOn);
  456. addr.sin_addr.s_addr = INADDR_ANY;
  457. if(bind(listen_socket, (const sockaddr *)&addr, sizeof(addr))) {
  458. return 0;
  459. }
  460. if(listen(listen_socket, 5)) return 0;
  461. listen_thread->start();
  462. create_callback_thread->start();
  463. close_callback_thread->start();
  464. #ifndef WIN32
  465. for (index = 0; index < max_thread; index++)
  466. {
  467. //send_threads[index].start();
  468. //recv_threads[index].start();
  469. }
  470. #endif
  471. printf("GameServer on Port %d started....n", usnPortToListenOn);
  472. return 1;
  473. }
  474. int IServer::CloseService() {
  475. closesocket(listen_socket);
  476. #ifdef _COND_SEND_
  477. pthread_cond_destroy(&cond_send_data);
  478. #endif
  479. return 0;
  480. }
  481. int IServer::RegisterMsgFilter(void * lpParam, CALLBACK_SERVER_EVENT pfnEventNotify) {
  482. call_back = pfnEventNotify;
  483. parameter = lpParam;
  484. return 0;
  485. }
  486. int IServer::PreparePackSink() { //准备开始数据封装
  487. return 0;
  488. }
  489. int IServer::PackDataToClient(const unsigned long &ulnClientID, const void * pData, unsigned long datalength) {
  490. if (ulnClientID < 0 || ulnClientID >= max_client)
  491. return -1;
  492. if(clients[ulnClientID].sock != INVALID_SOCKET) {
  493. if (clients[ulnClientID].state != STATE_CLOSING &&
  494. !clients[ulnClientID].buffer->packData((const char *)pData, datalength))
  495. {
  496. printf("sending buffer exceed....close connection [%d].n", clients[ulnClientID].index);
  497. clients[ulnClientID].state = STATE_CLOSING;
  498. return -1;
  499. }
  500. }
  501. JustDoIt(this, &clients[ulnClientID]);
  502. return 0;
  503. }
  504. int IServer::SendPackToClient() {
  505. int index;
  506. int nTotalLength = 0;
  507. for(index = 0; index < max_client; index++) {
  508. client_info& client = clients[index];
  509. //clients[index].mutex.lock();
  510. if (client.sock != INVALID_SOCKET) {
  511. JustDoIt(this, &client);
  512. int length = 0;
  513. char *data = client.buffer->completePacket(length);
  514. if(data && client.server_key) {
  515. KSG_EncodeBuf(length, (unsigned char *)data, (unsigned int *)&client.server_key);
  516. client.buffer->sendPacket();
  517. //printf("_<%08X>_", data);
  518. {
  519. //if (client.sock != INVALID_SOCKET && client.state == STATE_SENDING)
  520. {
  521. //client.mutex.lock();
  522. sendData(&client);
  523. //client.mutex.unlock();
  524. }
  525. }
  526. }
  527. nTotalLength += length;
  528. }
  529. JustDoIt(this, &client);
  530. //clients[index].mutex.unlock();
  531. }
  532. return nTotalLength;
  533. }
  534. int IServer::SendPackToClient(int index) {
  535. if (index < 0 || index >= max_client)
  536. {
  537. return SendPackToClient();
  538. }
  539. //clients[index].mutex.lock();
  540. client_info& client = clients[index];
  541. if(client.sock != INVALID_SOCKET) {
  542. int length;
  543. char *data = client.buffer->completePacket(length);
  544. if(data) {
  545. KSG_EncodeBuf(length, (unsigned char *)data, (unsigned int *)&client.server_key);
  546. client.buffer->sendPacket();
  547. sendData(&client);
  548. }
  549. //clients[index].mutex.unlock();
  550. JustDoIt(this, &client);
  551. return length;
  552. }
  553. //clients[index].mutex.unlock();
  554. return 0;
  555. }
  556. int IServer::SendData(const unsigned long &ulnClientID, const void * pData, unsigned long datalength) {
  557. PackDataToClient(ulnClientID, pData, datalength);
  558. SendPackToClient(ulnClientID);
  559. //发送一个消息
  560. return 0;
  561. }
  562. const void *IServer::GetPackFromClient(unsigned long ulnClientID, unsigned int &datalength) {
  563. if (ulnClientID < 0 || ulnClientID >= max_client)
  564. return NULL;
  565. client_info& client = clients[ulnClientID];
  566. JustDoIt(this, &client);
  567. if(client.sock != INVALID_SOCKET) {
  568. //clients[ulnClientID].mutex.lock();
  569. char *data = (char*)client.buffer->getPacket((int &)datalength);
  570. if(data) {
  571. KSG_DecodeBuf(datalength, (unsigned char *)data, (unsigned int *)&client.client_key);
  572. //client.mutex.unlock();
  573. return data;
  574. }
  575. //clients[ulnClientID].mutex.unlock();
  576. }
  577. return NULL; 
  578. }
  579. int IServer::ShutdownClient(const unsigned long &ulnClientID) { 
  580. if (ulnClientID < 0 || ulnClientID >= max_client)
  581. return NULL;
  582. //clients[ulnClientID].mutex.lock();
  583. if(clients[ulnClientID].sock != INVALID_SOCKET) {
  584. clients[ulnClientID].state = STATE_CLOSING;
  585. }
  586. //clients[ulnClientID].mutex.unlock();
  587. return 0; 
  588. }
  589. const char * IServer::GetClientInfo(const unsigned long &ulnClientID)
  590. {
  591. return inet_ntoa(((sockaddr_in*)&(clients[ulnClientID].addr))->sin_addr);
  592. }
  593. #ifndef WIN32
  594. int ZCloseCallBackThread::action()
  595. {
  596. int i = 0;
  597. printf("Close CallBack Thread...startn");
  598. while (1) {
  599. usleep(5000);
  600. for (i = 0; i < parent->max_client; i++)
  601. {
  602. client_info& client = parent->clients[i];
  603. {
  604. if (client.state == STATE_CLOSING) {
  605. //client.mutex.lock();
  606. parent->closeConnection(&client);
  607. //client.mutex.unlock();
  608. if(parent->call_back) {
  609. //printf("close call_back %dn", i);
  610. parent->call_back(parent->parameter, i, enumClientConnectClose);
  611. usleep(200000);
  612. }
  613. }
  614. }
  615. }
  616. }
  617. return 0;
  618. }
  619. int ZCreateCallBackThread::action()
  620. {
  621. int i = 0;
  622. printf("Create CallBack Thread...startn");
  623. while (1) {
  624. usleep(5000);
  625. for (i = 0; i < parent->max_client; i++)
  626. {
  627. client_info& client = parent->clients[i];
  628. {
  629. if (client.conn_flag)
  630. {
  631. //client.mutex.lock();
  632. client.conn_flag = false;
  633. //client.mutex.unlock();
  634. if(parent->call_back) {
  635. //printf("create call_back %dn", i);
  636. parent->call_back(parent->parameter, i, enumClientConnectCreate); //notify
  637. }
  638. }
  639. }
  640. }
  641. }
  642. return 0;
  643. }
  644. int ZRecvThread::action()
  645. {
  646. int i = 0;
  647. printf("receiving thread %d...startn", index);
  648. while (1) {
  649. usleep(5000);
  650. for (i = index; i < parent->max_client; i += parent->max_thread)
  651. {
  652. client_info& client = parent->clients[i];
  653. {
  654. if (client.sock != INVALID_SOCKET && client.recv_flag)
  655. {
  656. //client.mutex.lock();
  657. parent->receiveData(&client);
  658. client.recv_flag = false;
  659. //client.mutex.unlock();
  660. }
  661. }
  662. }
  663. }
  664. printf("receiving thread...endn");
  665. return 0;
  666. }
  667. int ZSendThread::action()
  668. {
  669. int i = 0;
  670. printf("sending thread %d...startn", index);
  671. while (1) {
  672. usleep(5000);
  673. for (i = index; i < parent->max_client; i += parent->max_thread)
  674. {
  675. client_info& client = parent->clients[i];
  676. {
  677. if (client.sock != INVALID_SOCKET && client.state == STATE_SENDING)
  678. {
  679. //client.mutex.lock();
  680. do_sendData(parent, &client);
  681. //client.mutex.unlock();
  682. }
  683. }
  684. }
  685. }
  686. printf("sending thread...endn");
  687. return 0;
  688. }
  689. #endif