IOCPServer.cpp
上传用户:hkcoast
上传日期:2007-01-12
资源大小:979k
文件大小:30k
源码类别:

手机短信编程

开发平台:

Visual C++

  1. // IOCPServer.cpp: implementation of the CIOCPServer class.
  2. //
  3. //////////////////////////////////////////////////////////////////////
  4. #include "stdafx.h"
  5. #include "IOCPServer.h"
  6. #include "SmppLibTest.h"
  7. #include "common.h"
  8. #ifdef _DEBUG
  9. #undef THIS_FILE
  10. static char THIS_FILE[]=__FILE__;
  11. #define new DEBUG_NEW
  12. #endif
  13. // Change at your Own Peril
  14. #define HDR_SIZE sizeof(int)
  15. #define HUERISTIC_VALUE 2
  16. CRITICAL_SECTION CIOCPServer::m_cs;
  17. //////////////////////////////////////////////////////////////////////
  18. // Construction/Destruction
  19. //////////////////////////////////////////////////////////////////////
  20. ////////////////////////////////////////////////////////////////////////////////
  21. // 
  22. // FUNCTION: CIOCPServer::CIOCPServer
  23. // 
  24. // DESCRIPTION: C'tor initializes Winsock2 and miscelleanous events etc.
  25. // 
  26. // INPUTS:
  27. // 
  28. // NOTES:
  29. // 
  30. // MODIFICATIONS:
  31. // 
  32. // Name                  Date       Version    Comments
  33. // N T ALMOND            06042001 1.0        Origin
  34. // 
  35. ////////////////////////////////////////////////////////////////////////////////
  36. CIOCPServer::CIOCPServer()
  37. {
  38. TRACE("CIOCPServer=%pn",this);
  39. // 
  40. WSADATA wsaData;
  41. WSAStartup(MAKEWORD(2,2), &wsaData);
  42. InitializeCriticalSection(&m_cs);
  43. m_hThread = NULL;
  44. m_hKillEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
  45. m_socListen = NULL;
  46. m_bTimeToKill = false;
  47. m_bDisconnectAll = false;
  48. m_hEvent = NULL;
  49. m_hCompletionPort= NULL;
  50. m_bInit = false;
  51. m_nCurrentThreads = 0;
  52. m_nBusyThreads = 0;
  53. m_nWorkerCnt = 0;
  54. //create class to handle smpp test
  55. m_pSmppLibTest = new CSmppLibTest(this);
  56. }
  57. ////////////////////////////////////////////////////////////////////////////////
  58. // 
  59. // FUNCTION: CIOCPServer::CIOCPServer
  60. // 
  61. // DESCRIPTION: Tidy up
  62. // 
  63. // INPUTS:
  64. // 
  65. // NOTES:
  66. // 
  67. // MODIFICATIONS:
  68. // 
  69. // Name                  Date       Version    Comments
  70. // N T ALMOND            06042001 1.0        Origin
  71. // 
  72. ////////////////////////////////////////////////////////////////////////////////
  73. CIOCPServer::~CIOCPServer()
  74. {
  75. Shutdown();
  76. }
  77. ////////////////////////////////////////////////////////////////////////////////
  78. // 
  79. // FUNCTION: Init
  80. // 
  81. // DESCRIPTION: Starts listener into motion
  82. // 
  83. // INPUTS:
  84. // 
  85. // NOTES:
  86. // 
  87. // MODIFICATIONS:
  88. // 
  89. // Name                  Date       Version    Comments
  90. // N T ALMOND            06042001 1.0        Origin
  91. // 
  92. ////////////////////////////////////////////////////////////////////////////////
  93. bool CIOCPServer::Initialize(int nConnections, int nPort)
  94. {
  95. m_socListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  96. if (m_socListen == INVALID_SOCKET)
  97. {
  98. TRACE(_T("Could not create listen socket %ldn"),WSAGetLastError());
  99. return false;
  100. }
  101. // Event for handling Network IO
  102. m_hEvent = WSACreateEvent();
  103. if (m_hEvent == WSA_INVALID_EVENT)
  104. {
  105. TRACE(_T("WSACreateEvent() error %ldn"),WSAGetLastError());
  106. closesocket(m_socListen);
  107. return false;
  108. }
  109. // The listener is ONLY interested in FD_ACCEPT
  110. // That is when a client connects to or IP/Port
  111. // Request async notification
  112. int nRet = WSAEventSelect(m_socListen,
  113.   m_hEvent,
  114.   FD_ACCEPT);
  115. if (nRet == SOCKET_ERROR)
  116. {
  117. TRACE(_T("WSAAsyncSelect() error %ldn"),WSAGetLastError());
  118. closesocket(m_socListen);
  119. return false;
  120. }
  121. SOCKADDR_IN saServer;
  122. // Listen on our designated Port#
  123. saServer.sin_port = htons(nPort);
  124. // Fill in the rest of the address structure
  125. saServer.sin_family = AF_INET;
  126. saServer.sin_addr.s_addr = INADDR_ANY;
  127. // bind our name to the socket
  128. nRet = bind(m_socListen, 
  129. (LPSOCKADDR)&saServer, 
  130. sizeof(struct sockaddr));
  131. if (nRet == SOCKET_ERROR)
  132. {
  133. TRACE(_T("bind() error %ldn"),WSAGetLastError());
  134. closesocket(m_socListen);
  135. return false;
  136. }
  137. // Set the socket to listen
  138. nRet = listen(m_socListen, nConnections);
  139. if (nRet == SOCKET_ERROR)
  140. {
  141. TRACE(_T("listen() error %ldn"),WSAGetLastError());
  142. closesocket(m_socListen);
  143. return false;
  144. }
  145. ////////////////////////////////////////////////////////////////////////////////////////
  146. ////////////////////////////////////////////////////////////////////////////////////////
  147. UINT dwThreadId = 0;
  148. m_hThread =
  149. (HANDLE)_beginthreadex(NULL, // Security
  150.  0, // Stack size - use default
  151.  ListenThreadProc,  // Thread fn entry point
  152.  (void*) this,     
  153.  0, // Init flag
  154.  &dwThreadId); // Thread address
  155. if (m_hThread != INVALID_HANDLE_VALUE)
  156. {
  157. m_bInit = true;
  158. InitializeIOCP();
  159. return true;
  160. }
  161. return false;
  162. }
  163. ////////////////////////////////////////////////////////////////////////////////
  164. // 
  165. // FUNCTION: CIOCPServer::ListenThreadProc
  166. // 
  167. // DESCRIPTION: Listens for incoming clients
  168. // 
  169. // INPUTS:
  170. // 
  171. // NOTES:
  172. // 
  173. // MODIFICATIONS:
  174. // 
  175. // Name                  Date       Version    Comments
  176. // N T ALMOND            06042001 1.0        Origin
  177. // 
  178. ////////////////////////////////////////////////////////////////////////////////
  179. unsigned CIOCPServer::ListenThreadProc(LPVOID lParam)
  180. {
  181. CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(lParam);
  182. WSANETWORKEVENTS events;
  183. while(1)
  184. {
  185. //
  186. // Wait for something to happen
  187. //
  188.         if (WaitForSingleObject(pThis->m_hKillEvent, 100) == WAIT_OBJECT_0)
  189.             break;
  190. DWORD dwRet;
  191. dwRet = WSAWaitForMultipleEvents(1,
  192.  &pThis->m_hEvent,
  193.  FALSE,
  194.  100,
  195.  FALSE);
  196. if (dwRet == WSA_WAIT_TIMEOUT)
  197. continue;
  198. //
  199. // Figure out what happened
  200. //
  201. int nRet = WSAEnumNetworkEvents(pThis->m_socListen,
  202.  pThis->m_hEvent,
  203.  &events);
  204. if (nRet == SOCKET_ERROR)
  205. {
  206. TRACE(_T("WSAEnumNetworkEvents error %ldn"),WSAGetLastError());
  207. break;
  208. }
  209. // Handle Network events //
  210. // ACCEPT
  211. if (events.lNetworkEvents & FD_ACCEPT)
  212. {
  213. if (events.iErrorCode[FD_ACCEPT_BIT] == 0)
  214. pThis->OnAccept();
  215. else
  216. {
  217. TRACE(_T("Unknown network event error %ldn"),WSAGetLastError());
  218. break;
  219. }
  220. }
  221. } // while....
  222. return 0; // Normal Thread Exit Code...
  223. }
  224. ////////////////////////////////////////////////////////////////////////////////
  225. // 
  226. // FUNCTION: CIOCPServer::OnAccept
  227. // 
  228. // DESCRIPTION: Listens for incoming clients
  229. // 
  230. // INPUTS:
  231. // 
  232. // NOTES:
  233. // 
  234. // MODIFICATIONS:
  235. // 
  236. // Name                  Date       Version    Comments
  237. // N T ALMOND            06042001 1.0        Origin
  238. // Ulf Hedlund  09072001    Changes for OVERLAPPEDPLUS
  239. // Mark Tutt  09072001    setsockopt fix
  240. ////////////////////////////////////////////////////////////////////////////////
  241. void CIOCPServer::OnAccept()
  242. {
  243. SOCKADDR_IN SockAddr;
  244. SOCKET clientSocket;
  245. int nRet;
  246. int nLen;
  247. if (m_bTimeToKill || m_bDisconnectAll)
  248. return;
  249. //
  250. // accept the new socket descriptor
  251. //
  252. nLen = sizeof(SOCKADDR_IN);
  253. clientSocket = accept(m_socListen,
  254.     (LPSOCKADDR)&SockAddr,
  255. &nLen); 
  256. if (clientSocket == SOCKET_ERROR)
  257. {
  258. nRet = WSAGetLastError();
  259. if (nRet != WSAEWOULDBLOCK)
  260. {
  261. //
  262. // Just log the error and return
  263. //
  264. TRACE(_T("accept() errorn"),WSAGetLastError());
  265. return;
  266. }
  267. }
  268. // Create the Client context to be associted with the completion port
  269. ClientContext* pContext = AllocateContext();
  270.     pContext->m_Socket = clientSocket;
  271. // Fix up In Buffer
  272. pContext->m_wsaInBuffer.buf = (char*)pContext->m_byInBuffer;
  273. pContext->m_wsaInBuffer.len = sizeof(pContext->m_byInBuffer);
  274. BOOL chOpt = true;
  275. int nErr = setsockopt(pContext->m_Socket, IPPROTO_TCP, TCP_NODELAY, (char *)&chOpt, sizeof(chOpt));
  276. if (nErr == -1)
  277. {
  278. TRACE(_T("setsockopt() errorn"),WSAGetLastError());
  279. return;
  280. }
  281.    // Associate the new socket with a completion port.
  282. if (!AssociateSocketWithCompletionPort(clientSocket, m_hCompletionPort, (DWORD) pContext))
  283.     {
  284.         delete pContext;
  285. pContext = NULL;
  286.         closesocket( clientSocket );
  287.         closesocket( m_socListen );
  288.         return;
  289.     }
  290. {
  291.     CLock cs(m_cs, "OnAccept" );
  292. // Hold a reference to the context
  293. m_listContexts.SetAt(GetHostName(pContext->m_Socket), pContext);
  294. }
  295. // Trigger first IO Completion Request
  296. // Otherwise the Worker thread will remain blocked waiting for GetQueuedCompletionStatus...
  297. // The first message that gets queued up is ClientIoInitializing - see ThreadPoolFunc and 
  298. // IO_MESSAGE_HANDLER
  299. OVERLAPPEDPLUS *pOverlap = new OVERLAPPEDPLUS(IOInitialize);
  300. BOOL bSuccess = PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) pContext, &pOverlap->m_ol);
  301. if ( (!bSuccess && GetLastError( ) != ERROR_IO_PENDING))
  302. {            
  303.         RemoveStaleClient(pContext,TRUE);
  304.     return;
  305.     }
  306. }
  307. ClientContext* CIOCPServer::FindClient(const CString& strClient)
  308. {
  309. CString strHost = strClient;
  310. ClientContext* pContext = NULL;
  311. m_listContexts.Lookup(strHost, pContext);
  312. return pContext;
  313. }
  314. ////////////////////////////////////////////////////////////////////////////////
  315. // 
  316. // FUNCTION: CIOCPServer::InitializeIOCP
  317. // 
  318. // DESCRIPTION: Create a dummy socket and associate a completion port with it.
  319. // once completion port is create we can dicard the socket
  320. // 
  321. // INPUTS:
  322. // 
  323. // NOTES:
  324. // 
  325. // MODIFICATIONS:
  326. // 
  327. // Name                  Date       Version    Comments
  328. // N T ALMOND            06042001 1.0        Origin
  329. // 
  330. ////////////////////////////////////////////////////////////////////////////////
  331. bool CIOCPServer::InitializeIOCP(void)
  332. {
  333.     SOCKET s;
  334.     DWORD i;
  335.     UINT  nThreadID;
  336.     SYSTEM_INFO systemInfo;
  337.     //
  338.     // First open a temporary socket that we will use to create the
  339.     // completion port.  In NT 3.51 it will not be necessary to specify
  340.     // the FileHandle parameter of CreateIoCompletionPort()--it will
  341.     // be legal to specify FileHandle as NULL.  However, for NT 3.5
  342.     // we need an overlapped file handle.
  343.     //
  344.     s = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
  345.     if ( s == INVALID_SOCKET ) 
  346.         return false;
  347.     // Create the completion port that will be used by all the worker
  348.     // threads.
  349.     m_hCompletionPort = CreateIoCompletionPort( (HANDLE)s, NULL, 0, 0 );
  350.     if ( m_hCompletionPort == NULL ) 
  351. {
  352.         closesocket( s );
  353.         return false;
  354.     }
  355.     // Close the socket, we don't need it any longer.
  356.     closesocket( s );
  357.     // Determine how many processors are on the system.
  358.     GetSystemInfo( &systemInfo );
  359. m_nThreadPoolMin  = systemInfo.dwNumberOfProcessors * HUERISTIC_VALUE;
  360. m_nThreadPoolMax  = m_nThreadPoolMin;
  361. m_nCPULoThreshold = 10; 
  362. m_nCPUHiThreshold = 75; 
  363. // m_cpu.Init();
  364.     // We use two worker threads for eachprocessor on the system--this is choosen as a good balance
  365.     // that ensures that there are a sufficient number of threads available to get useful work done 
  366. // but not too many that context switches consume significant overhead.
  367. UINT nWorkerCnt = systemInfo.dwNumberOfProcessors * HUERISTIC_VALUE;
  368. // We need to save the Handles for Later Termination...
  369. HANDLE hWorker;
  370.     for ( i = 0; i < nWorkerCnt; i++ ) 
  371. {
  372. hWorker = (HANDLE)_beginthreadex(NULL, // Security
  373. 0, // Stack size - use default
  374. ThreadPoolFunc,      // Thread fn entry point
  375. (void*) this, // Param for thread
  376. 0, // Init flag
  377. &nThreadID); // Thread address
  378.         if (hWorker == NULL ) 
  379. {
  380.             CloseHandle( m_hCompletionPort );
  381.             return false;
  382.         }
  383. m_nWorkerCnt++;
  384. CloseHandle(hWorker);
  385.     }
  386. return true;
  387. ////////////////////////////////////////////////////////////////////////////////
  388. // 
  389. // FUNCTION: CIOCPServer::ThreadPoolFunc 
  390. // 
  391. // DESCRIPTION: This is the main worker routine for the worker threads.  
  392. // Worker threads wait on a completion port for I/O to complete.  
  393. // When it completes, the worker thread processes the I/O, then either pends 
  394. // new I/O or closes the client's connection.  When the service shuts 
  395. // down, other code closes the completion port which causes 
  396. // GetQueuedCompletionStatus() to wake up and the worker thread then 
  397. // exits.
  398. // 
  399. // INPUTS:
  400. // 
  401. // NOTES:
  402. // 
  403. // MODIFICATIONS:
  404. // 
  405. // Name                  Date       Version    Comments
  406. // N T ALMOND            06042001 1.0        Origin
  407. // Ulf Hedlund  09062001              Changes for OVERLAPPEDPLUS
  408. ////////////////////////////////////////////////////////////////////////////////
  409. unsigned CIOCPServer::ThreadPoolFunc (LPVOID thisContext)    
  410. {
  411. // Get back our pointer to the class
  412. ULONG ulFlags = MSG_PARTIAL;
  413. CIOCPServer* pThis = reinterpret_cast<CIOCPServer*>(thisContext);
  414. ASSERT(pThis);
  415.     HANDLE hCompletionPort = pThis->m_hCompletionPort;
  416.     
  417.     DWORD dwIoSize;
  418.     LPOVERLAPPED lpOverlapped;
  419.     ClientContext* lpClientContext;
  420. OVERLAPPEDPLUS* pOverlapPlus;
  421. bool bError;
  422. bool bEnterRead;
  423. InterlockedIncrement(&pThis->m_nCurrentThreads);
  424. InterlockedIncrement(&pThis->m_nBusyThreads);
  425. //
  426.     // Loop round and round servicing I/O completions.
  427. // 
  428. for ( BOOL bStayInPool = TRUE; bStayInPool && pThis->m_bTimeToKill == false; ) 
  429. {
  430. pOverlapPlus = NULL;
  431. lpClientContext = NULL;
  432. bError = false;
  433. bEnterRead = false;
  434. // Thread is Block waiting for IO completion
  435. InterlockedDecrement(&pThis->m_nBusyThreads);
  436. // Get a completed IO request.
  437. BOOL bIORet = GetQueuedCompletionStatus(
  438.                hCompletionPort,
  439.                &dwIoSize,
  440.                (LPDWORD) &lpClientContext,
  441.                &lpOverlapped, INFINITE);
  442. DWORD dwIOError = GetLastError();
  443. pOverlapPlus = CONTAINING_RECORD(lpOverlapped, OVERLAPPEDPLUS, m_ol);
  444. int nBusyThreads = InterlockedIncrement(&pThis->m_nBusyThreads);
  445.         if (!bIORet && dwIOError != WAIT_TIMEOUT )
  446. {
  447. if (lpClientContext && pThis->m_bTimeToKill == false)
  448. pThis->RemoveStaleClient(lpClientContext, FALSE);
  449. continue;
  450. // anyway, this was an error and we should exit
  451. bError = true;
  452. }
  453. if (!bError) 
  454. {
  455. // Allocate another thread to the thread Pool?
  456. if (nBusyThreads == pThis->m_nCurrentThreads)
  457. {
  458. if (nBusyThreads < pThis->m_nThreadPoolMax)
  459. {
  460. // if (pThis->m_cpu.GetUsage() > pThis->m_nCPUHiThreshold)
  461. {
  462. // UINT nThreadID = -1;
  463. // HANDLE hThread = (HANDLE)_beginthreadex(NULL, // Security
  464. //  0, // Stack size - use default
  465. //  ThreadPoolFunc,  // Thread fn entry point
  466. ///  (void*) pThis,     
  467. //  0, // Init flag
  468. //  &nThreadID); // Thread address
  469. // CloseHandle(hThread);
  470. }
  471. }
  472. }
  473. // Thread timed out - IDLE?
  474. if (!bIORet && dwIOError == WAIT_TIMEOUT)
  475. {
  476. if (lpClientContext == NULL)
  477. {
  478. // if (pThis->m_cpu.GetUsage() < pThis->m_nCPULoThreshold)
  479. // {
  480. // // Thread has no outstanding IO - Server hasn't much to do so die
  481. // if (pThis->m_nCurrentThreads > pThis->m_nThreadPoolMin)
  482. bStayInPool =  FALSE;
  483. // }
  484. bError = true;
  485. }
  486. }
  487. }
  488. //////////////////////////////////////////////////////////////////////////////////////////
  489. //////////////////////////////////////////////////////////////////////////////////////////
  490. if (!bError) 
  491. {
  492. if(bIORet && NULL != pOverlapPlus && NULL != lpClientContext) 
  493. {
  494. bEnterRead = pThis->ProcessIOMessage(pOverlapPlus->m_ioType, lpClientContext, dwIoSize);
  495. }
  496. }
  497. if(! bError && bEnterRead) 
  498. {
  499. // issue a read request 
  500. OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IORead);
  501. ULONG ulFlags = 0;
  502. UINT nRetVal = WSARecv(lpClientContext->m_Socket, 
  503. &lpClientContext->m_wsaInBuffer,
  504. 1,
  505. &dwIoSize, 
  506. &ulFlags,
  507. &pOverlap->m_ol, 
  508. NULL);
  509. if ( nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING) 
  510. {
  511. DWORD dwErr  = WSAGetLastError();
  512. pThis->RemoveStaleClient( lpClientContext, FALSE );
  513. }
  514. }
  515. if(pOverlapPlus)
  516. delete pOverlapPlus; // from previous call
  517.     }
  518. InterlockedDecrement(&pThis->m_nWorkerCnt);
  519. InterlockedDecrement(&pThis->m_nCurrentThreads);
  520. InterlockedDecrement(&pThis->m_nBusyThreads);
  521.     return 0;
  522. ////////////////////////////////////////////////////////////////////////////////
  523. // 
  524. // FUNCTION: CIOCPServer::Stop
  525. // 
  526. // DESCRIPTION: Signal the listener to quit his thread
  527. // 
  528. // INPUTS:
  529. // 
  530. // NOTES:
  531. // 
  532. // MODIFICATIONS:
  533. // 
  534. // Name                  Date       Version    Comments
  535. // N T ALMOND            06042001 1.0        Origin
  536. // 
  537. ////////////////////////////////////////////////////////////////////////////////
  538. void CIOCPServer::Stop()
  539. {
  540.     ::SetEvent(m_hKillEvent);
  541.     WaitForSingleObject(m_hThread, INFINITE);
  542. CloseHandle(m_hThread);
  543.     CloseHandle(m_hKillEvent);
  544. }
  545. ////////////////////////////////////////////////////////////////////////////////
  546. // 
  547. // FUNCTION: CIOCPServer::GetHostName
  548. // 
  549. // DESCRIPTION: Get the host name of the connect client
  550. // 
  551. // INPUTS:
  552. // 
  553. // NOTES:
  554. // 
  555. // MODIFICATIONS:
  556. // 
  557. // Name                  Date       Version    Comments
  558. // N T ALMOND            06042001 1.0        Origin
  559. // 
  560. ////////////////////////////////////////////////////////////////////////////////
  561. CString CIOCPServer::GetHostName(SOCKET socket)
  562. {
  563. sockaddr_in  sockAddr;
  564. memset(&sockAddr, 0, sizeof(sockAddr));
  565. int nSockAddrLen = sizeof(sockAddr);
  566. BOOL bResult = getpeername(socket,(SOCKADDR*)&sockAddr, &nSockAddrLen);
  567. return bResult != INVALID_SOCKET ? inet_ntoa(sockAddr.sin_addr) : "";
  568. }
  569. ////////////////////////////////////////////////////////////////////////////////
  570. // 
  571. // FUNCTION: CIOCPServer::Send
  572. // 
  573. // DESCRIPTION: Posts a Write + Data to IO CompletionPort for transfer
  574. // 
  575. // INPUTS:
  576. // 
  577. // NOTES:
  578. // 
  579. // MODIFICATIONS:
  580. // 
  581. // Name                  Date       Version    Comments
  582. // N T ALMOND            06042001 1.0        Origin
  583. // Ulf Hedlund  09062001    Changes for OVERLAPPEDPLUS
  584. ////////////////////////////////////////////////////////////////////////////////
  585. void CIOCPServer::Send(const CString& strClient, CString strData)
  586. {
  587. ClientContext* pContext = FindClient(strClient);
  588. if (pContext == NULL)
  589. return;
  590. int nBufLen = strData.GetLength();
  591. // 4 byte header [Size of Entire Packet]
  592. pContext->m_WriteBuffer.Write((PBYTE) &nBufLen, sizeof(nBufLen));
  593. pContext->m_WriteBuffer.Write((PBYTE) strData.GetBuffer(nBufLen), nBufLen);
  594. // Wait for Data Ready signal to become available
  595. // WaitForSingleObject(pContext->m_hWriteComplete, INFINITE);
  596. // Prepare Packet
  597. int nSize = pContext->m_WriteBuffer.GetBufferLen();
  598. // pContext->m_wsaOutBuffer.buf = (CHAR*) new BYTE[nSize];
  599. // pContext->m_wsaOutBuffer.len = nSize;
  600. OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IOWrite);
  601. PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) pContext, &pOverlap->m_ol);
  602. pContext->m_nMsgOut++;
  603. }
  604. void CIOCPServer::Send(ClientContext* pContext, PBYTE pbyData, const long nSize)
  605. {
  606. pContext->m_WriteBuffer.Write((PBYTE) pbyData, nSize);
  607. // Prepare Packet
  608. int pkSize = pContext->m_WriteBuffer.GetBufferLen();
  609. pContext->m_wsaOutBuffer.buf = (CHAR*) new BYTE[pkSize];
  610. pContext->m_wsaOutBuffer.len = pkSize;
  611. OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IOWrite);
  612. PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) pContext, &pOverlap->m_ol);
  613. pContext->m_nMsgOut++;
  614. }
  615. ////////////////////////////////////////////////////////////////////////////////
  616. // 
  617. // FUNCTION: CClientListener::OnClientInitializing
  618. // 
  619. // DESCRIPTION: Called when client is initailizing
  620. // 
  621. // INPUTS:
  622. // 
  623. // NOTES:
  624. // 
  625. // MODIFICATIONS:
  626. // 
  627. // Name                  Date       Version    Comments
  628. // N T ALMOND            06042001 1.0        Origin
  629. // Ulf Hedlund           09062001        Changes for OVERLAPPEDPLUS
  630. ////////////////////////////////////////////////////////////////////////////////
  631. bool CIOCPServer::OnClientInitializing(ClientContext* pContext, DWORD dwIoSize)
  632. {
  633. // We are not actually doing anything here, but we could for instance make
  634. // a call to Send() to send a greeting message or something
  635. return true; // make sure to issue a read after this
  636. }
  637. ////////////////////////////////////////////////////////////////////////////////
  638. // 
  639. // FUNCTION: CIOCPServer::OnClientReading
  640. // 
  641. // DESCRIPTION: Called when client is reading 
  642. // 
  643. // INPUTS:
  644. // 
  645. // NOTES:
  646. // 
  647. // MODIFICATIONS:
  648. // 
  649. // Name                  Date       Version    Comments
  650. // N T ALMOND            06042001 1.0        Origin
  651. // Ulf Hedlund           09062001        Changes for OVERLAPPEDPLUS
  652. // Igor Janjetovic  12122001        Fixed Echo drop problem
  653. ////////////////////////////////////////////////////////////////////////////////
  654. bool CIOCPServer::OnClientReading(ClientContext* pContext, DWORD dwIoSize)
  655. {
  656. CLock cs(CIOCPServer::m_cs, "OnClientReading");
  657. if (dwIoSize == 0)
  658. {
  659. RemoveStaleClient( pContext, FALSE );
  660. return false;
  661. }
  662. // Add the message to out message
  663. // Dont forget there could be a partial, 1, 1 or more + partial mesages
  664. pContext->m_ReadBuffer.Write(pContext->m_byInBuffer,dwIoSize);
  665. // Check real Data
  666. while (pContext->m_ReadBuffer.GetBufferLen() > HDR_SIZE)
  667. {
  668. BYTE hdr_pk[4];
  669. int nSize = 0;
  670. int nCommand = 0;
  671. CopyMemory(hdr_pk, pContext->m_ReadBuffer.GetBuffer(), sizeof(int));
  672. nSize = readInt(hdr_pk);
  673. if (nSize && pContext->m_ReadBuffer.GetBufferLen() >= nSize)
  674. {
  675. // Read off header
  676. pContext->m_ReadBuffer.Read((PBYTE) hdr_pk, sizeof(int));
  677. ////////////////////////////////////////////////////////
  678. ////////////////////////////////////////////////////////
  679. // SO you would process your data here
  680. // 
  681. // I'm just going to post message so we can see the data
  682. PBYTE pData = new BYTE[nSize-4];
  683. pContext->m_ReadBuffer.Read(pData,nSize-4);
  684. //here we got a complete message to process
  685. TRACE("Got a message in OnClientReading and going to processn");
  686. //route the message to smpplibtest to handle
  687. m_pSmppLibTest->parsePacket(pContext, pData, nSize);
  688. // Clean Up
  689. delete pData;
  690. }
  691. else
  692. break;
  693. }
  694. return true;
  695. }
  696. ////////////////////////////////////////////////////////////////////////////////
  697. // 
  698. // FUNCTION: CIOCPServer::OnClientWriting
  699. // 
  700. // DESCRIPTION: Called when client is writing
  701. // 
  702. // INPUTS:
  703. // 
  704. // NOTES:
  705. // 
  706. // MODIFICATIONS:
  707. // 
  708. // Name                  Date       Version    Comments
  709. // N T ALMOND            06042001 1.0        Origin
  710. // Ulf Hedlund           09062001        Changes for OVERLAPPEDPLUS
  711. // John Dresher          30022002        Changes for OVERLAPPEDPLUS
  712. ////////////////////////////////////////////////////////////////////////////////
  713. bool CIOCPServer::OnClientWriting(ClientContext* pContext, DWORD dwIoSize)
  714. {
  715. ULONG ulFlags = MSG_PARTIAL;
  716. TCHAR buf[100];
  717. sprintf(buf, "OnClientWriting, dwIoSize is : %d", static_cast<unsigned long>(dwIoSize));
  718. TRACE(buf);
  719. // if ( dwIoSize == 0 ) 
  720. // return true;
  721. // Finished writing - tidy up
  722. pContext->m_WriteBuffer.Delete(dwIoSize);
  723. if (pContext->m_WriteBuffer.GetBufferLen() == 0)
  724. {
  725. pContext->m_WriteBuffer.ClearBuffer();
  726. // Write complete
  727. // SetEvent(pContext->m_hWriteComplete);
  728. return true; // issue new read after this one
  729. }
  730. else
  731. {
  732. OVERLAPPEDPLUS * pOverlap = new OVERLAPPEDPLUS(IOWrite);
  733. pContext->m_wsaOutBuffer.buf = (char*) pContext->m_WriteBuffer.GetBuffer();
  734. pContext->m_wsaOutBuffer.len = pContext->m_WriteBuffer.GetBufferLen();
  735. int nRetVal = WSASend(pContext->m_Socket, 
  736. &pContext->m_wsaOutBuffer,
  737. 1,
  738. &pContext->m_wsaOutBuffer.len, 
  739. ulFlags,
  740. &pOverlap->m_ol, 
  741. NULL);
  742. TRACE("Writingn");
  743. if ( nRetVal == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING ) 
  744. RemoveStaleClient( pContext, FALSE );
  745. }
  746. return false; // issue new read after this one
  747. }
  748. ////////////////////////////////////////////////////////////////////////////////
  749. // 
  750. // FUNCTION: CIOCPServer::CloseCompletionPort
  751. // 
  752. // DESCRIPTION: Close down the IO Complete Port, queue and associated client context structs
  753. // which in turn will close the sockets...
  754. //
  755. // 
  756. // INPUTS:
  757. // 
  758. // NOTES:
  759. // 
  760. // MODIFICATIONS:
  761. // 
  762. // Name                  Date       Version    Comments
  763. // N T ALMOND            06042001 1.0        Origin
  764. // 
  765. ////////////////////////////////////////////////////////////////////////////////
  766. void CIOCPServer::CloseCompletionPort()
  767. {
  768. while (m_nWorkerCnt)
  769. {
  770. PostQueuedCompletionStatus(m_hCompletionPort, 0, (DWORD) NULL, NULL);
  771. Sleep(1000);
  772. }
  773. // Close the CompletionPort and stop any more requests
  774. CloseHandle(m_hCompletionPort);
  775. CString strHost;
  776. ClientContext* pContext = NULL;
  777. do 
  778. {
  779. POSITION pos  = m_listContexts.GetStartPosition();
  780. if (pos)
  781. {
  782. m_listContexts.GetNextAssoc(pos, strHost, pContext);
  783. RemoveStaleClient(pContext, FALSE);
  784. }
  785. }
  786. while (!m_listContexts.IsEmpty());
  787. POSITION pos  = m_listContexts.GetStartPosition();
  788. while (pos)
  789. {
  790. m_listContexts.GetNextAssoc(pos, strHost, pContext);
  791. delete pContext;
  792. }
  793. }
  794. BOOL CIOCPServer::AssociateSocketWithCompletionPort(SOCKET socket, HANDLE hCompletionPort, DWORD dwCompletionKey)
  795. {
  796. HANDLE h = CreateIoCompletionPort((HANDLE) socket, hCompletionPort, dwCompletionKey, 0);
  797. return h == hCompletionPort;
  798. }
  799. ////////////////////////////////////////////////////////////////////////////////
  800. // 
  801. // FUNCTION: CIOCPServer::RemoveStaleClient
  802. // 
  803. // DESCRIPTION: Client has died on us, close socket and remove context from our list
  804. // 
  805. // INPUTS:
  806. // 
  807. // NOTES:
  808. // 
  809. // MODIFICATIONS:
  810. // 
  811. // Name                  Date       Version    Comments
  812. // N T ALMOND            06042001 1.0        Origin
  813. // 
  814. ////////////////////////////////////////////////////////////////////////////////
  815. void CIOCPServer::RemoveStaleClient(ClientContext* pContext, BOOL bGraceful)
  816. {
  817.     CLock cs(m_cs, "RemoveStaleClient");
  818. TRACE("CIOCPServer::RemoveStaleClientn");
  819.     LINGER lingerStruct;
  820. CString strHost = GetHostName(pContext->m_Socket);
  821. if (strHost.IsEmpty())
  822. return;
  823.     //
  824.     // If we're supposed to abort the connection, set the linger value
  825.     // on the socket to 0.
  826.     //
  827.     if ( !bGraceful ) 
  828. {
  829.         lingerStruct.l_onoff = 1;
  830.         lingerStruct.l_linger = 0;
  831.         setsockopt( pContext->m_Socket, SOL_SOCKET, SO_LINGER,
  832.                     (char *)&lingerStruct, sizeof(lingerStruct) );
  833.     }
  834.     //
  835.     // Free context structures
  836. if (m_listContexts.Lookup(strHost, pContext)) 
  837. {
  838. //
  839. // Now close the socket handle.  This will do an abortive or  graceful close, as requested.  
  840. CancelIo((HANDLE) pContext->m_Socket);
  841. closesocket( pContext->m_Socket );
  842. pContext->m_Socket = INVALID_SOCKET;
  843.         while (!HasOverlappedIoCompleted((LPOVERLAPPED)pContext)) 
  844.                 Sleep(0);
  845. MoveToFreePool(strHost);
  846. }
  847. }
  848. void CIOCPServer::Shutdown()
  849. {
  850. if (m_bInit == false)
  851. return;
  852. m_bInit = false;
  853. m_bTimeToKill = true;
  854. // Stop the listener
  855. Stop();
  856. closesocket(m_socListen);
  857. WSACloseEvent(m_hEvent);
  858. CloseCompletionPort();
  859. DeleteCriticalSection(&m_cs);
  860. WSACleanup();
  861. while (!m_listFreePool.IsEmpty())
  862. delete m_listFreePool.RemoveTail();
  863. }
  864. ////////////////////////////////////////////////////////////////////////////////
  865. // 
  866. // FUNCTION: CIOCPServer::MoveToFreePool
  867. // 
  868. // DESCRIPTION: Checks free pool otherwise allocates a context
  869. // 
  870. // INPUTS:
  871. // 
  872. // NOTES:
  873. // 
  874. // MODIFICATIONS:
  875. // 
  876. // Name                  Date       Version    Comments
  877. // N T ALMOND            06042001 1.0        Origin
  878. // 
  879. ////////////////////////////////////////////////////////////////////////////////
  880. void CIOCPServer::MoveToFreePool(CString& strKey)
  881. {
  882. ClientContext* pContext = NULL;
  883.     // Free context structures
  884. if (m_listContexts.Lookup(strKey, pContext)) 
  885. {
  886. pContext->m_ReadBuffer.ClearBuffer();
  887. pContext->m_WriteBuffer.ClearBuffer();
  888. m_listFreePool.AddTail(pContext);
  889. m_listContexts.RemoveKey(strKey);
  890. }
  891. }
  892. ////////////////////////////////////////////////////////////////////////////////
  893. // 
  894. // FUNCTION: CIOCPServer::MoveToFreePool
  895. // 
  896. // DESCRIPTION: Moves an 'used/stale' Context to the free pool for reuse
  897. // 
  898. // INPUTS:
  899. // 
  900. // NOTES:
  901. // 
  902. // MODIFICATIONS:
  903. // 
  904. // Name                  Date       Version    Comments
  905. // N T ALMOND            06042001 1.0        Origin
  906. // 
  907. ////////////////////////////////////////////////////////////////////////////////
  908. ClientContext*  CIOCPServer::AllocateContext()
  909. {
  910. ClientContext* pContext = NULL;
  911. CLock cs(CIOCPServer::m_cs, "AllocateContext");
  912. if (!m_listFreePool.IsEmpty())
  913. {
  914. pContext = m_listFreePool.RemoveHead();
  915. }
  916. else
  917. {
  918. pContext = new ClientContext;
  919. }
  920. ASSERT(pContext);
  921. ZeroMemory(pContext, sizeof(ClientContext));
  922. return pContext;
  923. }
  924. void CIOCPServer::ResetConnection(ClientContext* pContext)
  925. {
  926. CString strHost;
  927. ClientContext* pCompContext = NULL;
  928. CLock cs(CIOCPServer::m_cs, "ResetConnection");
  929. POSITION pos  = m_listContexts.GetStartPosition();
  930. while (pos)
  931. {
  932. m_listContexts.GetNextAssoc(pos, strHost, pCompContext);
  933. if (pCompContext == pContext)
  934. {
  935. RemoveStaleClient(pContext, TRUE);
  936. break;
  937. }
  938. }
  939. }
  940. void CIOCPServer::DisconnectAll()
  941. {
  942. m_bDisconnectAll = true;
  943. CString strHost;
  944. ClientContext* pContext = NULL;
  945. CLock cs(CIOCPServer::m_cs, "ResetConnection");
  946. POSITION pos  = m_listContexts.GetStartPosition();
  947. while (pos)
  948. {
  949. m_listContexts.GetNextAssoc(pos, strHost, pContext);
  950. RemoveStaleClient(pContext, TRUE);
  951. }
  952. m_bDisconnectAll = false;
  953. }