SocketServer.cpp
上传用户:dzyhzl
上传日期:2019-04-29
资源大小:56270k
文件大小:20k
源码类别:

模拟服务器

开发平台:

C/C++

  1. #include "SocketServer.h"
  2. #include "IOCompletionPort.h"
  3. #include "Win32Exception.h"
  4. #include "Utils.h"
  5. #include "SystemInfo.h"
  6. #include "..protocolprotocol.h"
  7. #include <vector>
  8. #pragma comment(lib, "ws2_32.lib")
  9. /*
  10.  * Link options and warning
  11.  */
  12. #pragma message( "NOTE : --------------------OnlineGameLib [Server] : Announcement--------------------" )
  13. #pragma message( "NOTE : Developing a truly scalable Winsock Server using IO Completion Ports(IOCP)" )
  14. #pragma message( "NOTE : But this module depend on the microsoft platform" )
  15. #pragma message( "NOTE : Requirements :" )
  16. #pragma message( "NOTE :    * Windows NT/2000/XP: Included in Windows NT 4.0 and later." )
  17. #pragma message( "NOTE :    * Windows 95/98/Me: Unsupported." )
  18. #pragma message( "NOTE :" )
  19. #pragma message( "NOTE : liupeng xishanju.zhuhai.china 2003.1" )
  20. #pragma message( "NOTE : -----------------------------------------------------------------------------" )
  21. using std::vector;
  22. /*
  23.  * namespace OnlineGameLib::Win32
  24.  */
  25. namespace OnlineGameLib {
  26. namespace Win32 {
  27. /*
  28.  * Local enums environment
  29.  */
  30. enum IO_Operation 
  31. IO_Read_Request, 
  32. IO_Read_Completed, 
  33. IO_Write_Request, 
  34. IO_Write_Completed,
  35. IO_Close
  36. };
  37. /*
  38.  * Static helper methods
  39.  */
  40. static size_t CalculateNumberOfThreads( size_t numThreads );
  41. static size_t CalculateNumberOfThreads( size_t numThreads )
  42. {
  43. if ( 0 == numThreads )
  44. {
  45. CSystemInfo systemInfo;
  46. numThreads = systemInfo.dwNumberOfProcessors * 2;
  47. }
  48. return numThreads;
  49. }
  50. /*
  51.  * CSocketServer 
  52.  */
  53. CSocketServer::CSocketServer(
  54.    unsigned long addressToListenOn,
  55.    unsigned short portToListenOn,
  56.    size_t maxFreeSockets,
  57.    size_t maxFreeBuffers,
  58.    size_t bufferSize /* = 1024 */,
  59.    size_t numThreads /* = 0 */)
  60. : CIOBuffer::Allocator(bufferSize, maxFreeBuffers),
  61.   m_numThreads(CalculateNumberOfThreads( numThreads )),
  62.   m_listeningSocket(INVALID_SOCKET),
  63.   m_iocp(0),
  64.   m_address(addressToListenOn),
  65.   m_port(portToListenOn),
  66.   m_maxFreeSockets(maxFreeSockets)
  67. {
  68. }
  69. CSocketServer::CSocketServer(
  70. size_t maxFreeSockets,
  71. size_t maxFreeBuffers,
  72. size_t bufferSize /* = 1024 */,
  73. size_t numThreads /* = 0 */)
  74. : CIOBuffer::Allocator(bufferSize, maxFreeBuffers),
  75.   m_numThreads(CalculateNumberOfThreads( numThreads )),
  76.   m_listeningSocket(INVALID_SOCKET),
  77.   m_iocp(0),
  78.   m_maxFreeSockets(maxFreeSockets)
  79. {
  80. }
  81. void CSocketServer::Open( 
  82. unsigned long addressToListenOn,
  83. unsigned short portToListenOn
  84. )
  85. {
  86. m_address = addressToListenOn;
  87. m_port = portToListenOn;
  88. }
  89. CSocketServer::~CSocketServer()
  90. {
  91. try
  92. {
  93. ReleaseSockets();
  94. }
  95. catch(...)
  96. {
  97. }
  98. }
  99. void CSocketServer::ReleaseSockets()
  100. {
  101. CCriticalSection::Owner lock( m_listManipulationSection );
  102. Socket *pSocket = m_activeList.Head();
  103. while ( pSocket )
  104. {
  105. Socket *pNext = SocketList::Next( pSocket );
  106. pSocket->Close();
  107. pSocket = pNext;
  108. }
  109. while ( m_activeList.Head() )
  110. {
  111. ReleaseSocket( m_activeList.Head() );
  112. }
  113. while ( m_freeList.Head() )
  114. {
  115. DestroySocket( m_freeList.PopNode() );
  116. }
  117. if ( m_freeList.Count() + m_freeList.Count() != 0 )
  118. {
  119. /*
  120.  * call to unqualified virtual function
  121.  */
  122. OnError( _T("CSocketServer::ReleaseSockets() - Leaked sockets") );
  123. }
  124. }
  125. void CSocketServer::ReleaseBuffers()
  126. {
  127. Flush();
  128. }
  129. void CSocketServer::StartAcceptingConnections()
  130. {
  131. if ( INVALID_SOCKET == m_listeningSocket )
  132. {
  133. /*
  134.  * Call to unqualified virtual function
  135.  */
  136. OnStartAcceptingConnections();
  137. /*
  138.  * call to unqualified virtual function
  139.  */
  140. m_listeningSocket = CreateListeningSocket( m_address, m_port );
  141. m_acceptConnectionsEvent.Set();
  142. }
  143. }
  144. void CSocketServer::StopAcceptingConnections()
  145. {
  146. if ( INVALID_SOCKET != m_listeningSocket )
  147. {
  148. m_acceptConnectionsEvent.Reset();
  149. if ( 0 != ::closesocket( m_listeningSocket ) )
  150. {
  151. /*
  152.  * Call to unqualified virtual function
  153.  */
  154. OnError( _T("CSocketServer::StopAcceptingConnections() - closesocket - ") + GetLastErrorMessage( ::WSAGetLastError() ) );
  155. }
  156. m_listeningSocket = INVALID_SOCKET;
  157. /*
  158.  * Call to unqualified virtual function
  159.  */
  160. OnStopAcceptingConnections();
  161. }
  162. }
  163. void CSocketServer::InitiateShutdown()
  164. {
  165. /*
  166.  * Signal that the dispatch thread should shut down all worker threads and then exit
  167.  */
  168. StopAcceptingConnections();
  169. m_shutdownEvent.Set();
  170. /*
  171.  * Call to unqualified virtual function
  172.  */
  173. OnShutdownInitiated();
  174. }
  175. void CSocketServer::WaitForShutdownToComplete()
  176. {
  177. /*
  178.  * If we havent already started a shut down, do so...
  179.  */
  180. InitiateShutdown();
  181. Wait();
  182. }
  183. int CSocketServer::Run()
  184. {
  185. try
  186. {
  187. vector<WorkerThread *> workers;
  188. workers.reserve( m_numThreads );
  189. for ( size_t i = 0; i < m_numThreads; ++i )
  190. {
  191. /*
  192.  * Call to unqualified virtual function
  193.  */
  194. WorkerThread *pThread = CreateWorkerThread( m_iocp ); 
  195. workers.push_back( pThread );
  196. pThread->Start();
  197. }
  198. HANDLE handlesToWaitFor[2];
  199. handlesToWaitFor[0] = m_shutdownEvent.GetEvent();
  200. handlesToWaitFor[1] = m_acceptConnectionsEvent.GetEvent();
  201. while ( !m_shutdownEvent.Wait( 0 ) )
  202. {
  203. DWORD waitResult = ::WaitForMultipleObjects( 2, handlesToWaitFor, false, INFINITE );
  204. if ( waitResult == WAIT_OBJECT_0 )
  205. {
  206. /*
  207.  * Time to shutdown
  208.  */
  209. break;
  210. }
  211. else if ( waitResult == WAIT_OBJECT_0 + 1 )
  212. {
  213. /*
  214.  * accept connections
  215.  */
  216. while ( !m_shutdownEvent.Wait( 0 ) && m_acceptConnectionsEvent.Wait( 0 ) )
  217. {
  218. CIOBuffer *pAddress = Allocate();
  219. int addressSize = (int)pAddress->GetSize();
  220. SOCKET acceptedSocket = ::WSAAccept(
  221. m_listeningSocket, 
  222. reinterpret_cast<sockaddr *>(const_cast<BYTE *>( pAddress->GetBuffer() ) ), 
  223. &addressSize, 
  224. 0, 
  225. 0);
  226. pAddress->Use( addressSize );
  227. if ( acceptedSocket != INVALID_SOCKET )
  228. {
  229. Socket *pSocket = AllocateSocket( acceptedSocket );
  230. /*
  231.  * Call to unqualified virtual function
  232.  */
  233. OnConnectionEstablished( pSocket, pAddress );
  234. }
  235. else if ( m_acceptConnectionsEvent.Wait( 0 ) )
  236. {
  237. /*
  238.  * Call to unqualified virtual function
  239.  */
  240. OnError( _T("CSocketServer::Run() - WSAAccept:") + GetLastErrorMessage( ::WSAGetLastError() ) );
  241. }
  242. pAddress->Release();
  243. }
  244. }
  245. else
  246. {
  247. /*
  248.  * Call to unqualified virtual function
  249.  */
  250. OnError( _T("CSocketServer::Run() - WaitForMultipleObjects: ") + GetLastErrorMessage( ::GetLastError() ) );
  251. }
  252. }
  253. for ( i = 0; i < m_numThreads; ++i )
  254. {
  255. workers[i]->InitiateShutdown();
  256. }  
  257. for ( i = 0; i < m_numThreads; ++i )
  258. {
  259. workers[i]->WaitForShutdownToComplete();
  260. delete workers[i];
  261. workers[i] = 0;
  262. }  
  263. }
  264. catch( const CException &e )
  265. {
  266. /*
  267.  * Call to unqualified virtual function
  268.  */
  269. OnError( _T("CSocketServer::Run() - Exception: ") + e.GetWhere() + _T(" - ") + e.GetMessage() );
  270. }
  271. catch(...)
  272. {
  273. /*
  274.  * Call to unqualified virtual function
  275.  */
  276. OnError( _T("CSocketServer::Run() - Unexpected exception") );
  277. }
  278. /*
  279.  * Call to unqualified virtual function
  280.  */
  281. OnShutdownComplete();
  282. return 0;
  283. }
  284. CSocketServer::Socket *CSocketServer::AllocateSocket( SOCKET theSocket )
  285. {
  286. CCriticalSection::Owner lock( m_listManipulationSection );
  287. Socket *pSocket = 0;
  288. if ( !m_freeList.Empty() )
  289. {
  290. pSocket = m_freeList.PopNode();
  291. pSocket->Attach( theSocket );
  292. pSocket->AddRef();
  293. }
  294. else
  295. {
  296. pSocket = new Socket( *this, theSocket );
  297. /*
  298.  * Call to unqualified virtual function
  299.  */
  300. OnConnectionCreated();
  301. }
  302. m_activeList.PushNode( pSocket );
  303. /*
  304.  * suspicious cast
  305.  */
  306. m_iocp.AssociateDevice( reinterpret_cast<HANDLE>( theSocket ), (ULONG_PTR)pSocket );
  307. return pSocket;
  308. }
  309. void CSocketServer::ReleaseSocket( Socket *pSocket )
  310. {
  311. if ( !pSocket )
  312. {
  313. throw CException( _T("CSocketServer::ReleaseSocket()"), _T("pSocket is null") );
  314. }
  315. CCriticalSection::Owner lock( m_listManipulationSection );
  316. pSocket->RemoveFromList();
  317. if ( m_maxFreeSockets == 0 || m_freeList.Count() < m_maxFreeSockets )
  318. {
  319. m_freeList.PushNode( pSocket );
  320. }
  321. else
  322. {
  323. DestroySocket( pSocket );
  324. }
  325. }
  326. void CSocketServer::DestroySocket( Socket *pSocket )
  327. {
  328.    delete pSocket;
  329.    /*
  330.     * Call to unqualified virtual function
  331. */
  332.    OnConnectionDestroyed();
  333. }
  334. void CSocketServer::PostAbortiveClose( Socket *pSocket )
  335. {
  336. CIOBuffer *pBuffer = Allocate();
  337. pBuffer->SetUserData( IO_Close );
  338. pSocket->AddRef();
  339. m_iocp.PostStatus( (ULONG_PTR)pSocket, 0, pBuffer );
  340. }
  341. void CSocketServer::Read( Socket *pSocket, CIOBuffer *pBuffer )
  342. {
  343. /*
  344.  * Post a read request to the iocp so that the actual socket read gets performed by
  345.  * one of our IO threads...
  346.  */
  347. if ( !pBuffer )
  348. {
  349. pBuffer = Allocate();
  350. }
  351. else
  352. {
  353. pBuffer->AddRef();
  354. }
  355. pBuffer->SetUserData( IO_Read_Request );
  356. pSocket->AddRef();
  357. m_iocp.PostStatus( (ULONG_PTR)pSocket, 0, pBuffer );
  358. }
  359. void CSocketServer::Write(
  360.    Socket *pSocket,
  361.    const char *pData,
  362.    size_t dataLength, 
  363.    bool thenShutdown)
  364. {
  365. if ( !pSocket || !pData || dataLength <= 0 )
  366. {
  367. return;
  368. }
  369.    /*
  370.     * Post a write request to the iocp so that the actual socket write gets performed by
  371. * one of our IO threads...
  372. */
  373.    CIOBuffer *pBuffer = Allocate();
  374.    
  375.    /*
  376.     * Call to unqualified virtual function
  377.     */
  378. #ifdef NETWORK_DEBUG
  379. //{
  380.    PreWrite( pSocket, pBuffer, pData, dataLength + PACK_HEADER_LEN + sizeof(BYTE) );
  381. //}
  382. #else
  383. //{
  384.    PreWrite( pSocket, pBuffer, pData, dataLength );
  385. //}
  386. #endif // NETWORK_DEBUG
  387.    pBuffer->AddData( pData, dataLength );
  388.    
  389. #ifdef NETWORK_DEBUG
  390. //{
  391. const BYTE *pPackData = pBuffer->GetBuffer();
  392. PACK_HEADER ph = {0};
  393. memcpy( (BYTE *)&ph, pPackData, PACK_HEADER_LEN );
  394. pBuffer->AddData( (BYTE *)&ph, PACK_HEADER_LEN );
  395. pBuffer->AddData( 0xAA );
  396. //}
  397. #endif // NETWORK_DEBUG
  398.    pBuffer->SetUserData( IO_Write_Request );
  399.    pSocket->AddRef();
  400.    m_iocp.PostStatus( (ULONG_PTR)pSocket, thenShutdown, pBuffer );
  401. }
  402. void CSocketServer::Write(
  403.    Socket *pSocket,
  404.    CIOBuffer *pBuffer, 
  405.    bool thenShutdown)
  406. {
  407.    /*
  408.     * Post a write request to the iocp so that the actual socket write gets performed by
  409. * one of our IO threads...
  410. */
  411.    pBuffer->AddRef();
  412.    pBuffer->SetUserData( IO_Write_Request );
  413.    pSocket->AddRef();
  414.    m_iocp.PostStatus( (ULONG_PTR)pSocket, thenShutdown, pBuffer );
  415. }
  416. void CSocketServer::OnError( const _tstring &message )
  417. {
  418. Output( message );
  419. }
  420.   
  421. /*
  422.  * CSocketServer::Socket
  423.  */
  424. CSocketServer::Socket::Socket( CSocketServer &server, SOCKET theSocket )
  425.    :  m_server(server),
  426.   m_socket(theSocket),
  427.   m_ref(1)
  428. {
  429. if ( INVALID_SOCKET == m_socket )
  430. {
  431. throw CException( _T("CSocketServer::Socket::Socket()"), _T("Invalid socket") );
  432. }
  433. }
  434. CSocketServer::Socket::~Socket()
  435. {
  436. }
  437. void CSocketServer::Socket::Attach( SOCKET theSocket )
  438. {
  439. if ( INVALID_SOCKET != m_socket )
  440. {
  441. throw CException( _T("CSocketServer::Socket::Attach()"), _T("Socket already attached") );
  442. }
  443. m_socket = theSocket;
  444. SetUserData( 0 );
  445. }
  446. void CSocketServer::Socket::AddRef()
  447. {
  448. ::InterlockedIncrement( &m_ref );
  449. }
  450. void CSocketServer::Socket::Release()
  451. {
  452. if ( 0 == ::InterlockedDecrement( &m_ref ) )
  453. {
  454. m_server.ReleaseSocket( this );
  455. }
  456. }
  457. void CSocketServer::Socket::Shutdown( int how /* = SD_BOTH */ )
  458. {
  459. Output( _T("CSocketServer::Socket::Shutdown() ") + ToString( how ) );
  460. if ( INVALID_SOCKET != m_socket )
  461. {
  462. if ( 0 != ::shutdown( m_socket, how ) )
  463. {
  464. m_server.OnError( _T("CSocketServer::Server::Shutdown() - ") + GetLastErrorMessage( ::WSAGetLastError() ) );
  465. }
  466. Output( _T("shutdown initiated") );
  467. }
  468. }
  469. void CSocketServer::Socket::Close()
  470. {
  471. CCriticalSection::Owner lock( m_server.m_listManipulationSection );
  472. if ( INVALID_SOCKET != m_socket )
  473. {
  474. if ( 0 != ::closesocket( m_socket ) )
  475. {
  476. m_server.OnError(_T("CSocketServer::Socket::Close() - closesocket - ") + GetLastErrorMessage( ::WSAGetLastError() ) );
  477. }
  478. m_socket = INVALID_SOCKET;
  479. m_server.OnConnectionClosed( this );
  480. Release();
  481. }
  482. }
  483. void CSocketServer::Socket::AbortiveClose()
  484. {
  485. m_server.PostAbortiveClose( this );
  486. }
  487. void CSocketServer::Socket::Read( CIOBuffer *pBuffer /* = 0 */ )
  488. {
  489. m_server.Read( this, pBuffer );
  490. }
  491. void CSocketServer::Socket::Write(
  492.    const char *pData, 
  493.    size_t dataLength,
  494.    bool thenShutdown /* = false */)
  495. {
  496. m_server.Write( this, pData, dataLength, thenShutdown );
  497. }
  498. void CSocketServer::Socket::Write(
  499.    CIOBuffer *pBuffer,
  500.    bool thenShutdown /* = false */)
  501. {
  502. m_server.Write( this, pBuffer, thenShutdown );
  503. }
  504. /*
  505.  * CSocketServer::WorkerThread
  506.  */
  507. CSocketServer::WorkerThread::WorkerThread( CIOCompletionPort &iocp )
  508. : m_iocp(iocp)
  509. {
  510.    /*
  511.     * All work done in initialiser list
  512. */
  513. }
  514. int CSocketServer::WorkerThread::Run()
  515. {
  516. try
  517. {
  518. while ( true )
  519. {
  520. /*
  521.  * Continually loop to service io completion packets
  522.  */
  523. bool closeSocket = false;
  524. DWORD dwIoSize = 0;
  525. Socket *pSocket = 0;
  526. CIOBuffer *pBuffer = 0;
  527. try
  528. {
  529. m_iocp.GetStatus( (PDWORD_PTR)&pSocket, &dwIoSize, (OVERLAPPED **)&pBuffer );
  530. }
  531. catch (const CWin32Exception &e)
  532. {
  533. if ( e.GetError() != ERROR_NETNAME_DELETED &&
  534. e.GetError() != WSA_OPERATION_ABORTED )
  535. {
  536. throw;
  537. }
  538. Output( _T("IOCP error [client connection dropped] - ") +
  539. GetLastErrorMessage( ::WSAGetLastError() ) );
  540. closeSocket = true;
  541. }
  542. if ( !pSocket )
  543. {
  544. /*
  545.  * A completion key of 0 is posted to the iocp to request us to shut down...
  546.  */
  547. break;
  548. }
  549. /*
  550.  * Call to unqualified virtual function
  551.  */
  552. OnBeginProcessing();
  553. if ( pBuffer )
  554. {
  555. const IO_Operation operation = static_cast<IO_Operation>( pBuffer->GetUserData() );
  556. switch ( operation )
  557. {
  558. case IO_Read_Request:
  559. Read( pSocket, pBuffer );
  560. break;
  561. case IO_Read_Completed :
  562. if ( 0 != dwIoSize )
  563. {
  564. pBuffer->Use( dwIoSize );
  565. //DEBUG_ONLY( Output(_T("RX: ") + ToString(pBuffer) + _T("n") + DumpData(reinterpret_cast<const BYTE*>( pBuffer->GetWSABUF()->buf), dwIoSize, 40) ) );
  566. /*
  567.  * Call to unqualified virtual function
  568.  */
  569. ReadCompleted( pSocket, pBuffer );
  570. }
  571. else
  572. {
  573. /*
  574.  * client connection dropped...
  575.  */
  576. Output( _T("ReadCompleted - 0 bytes - client connection dropped") );
  577. closeSocket = true;
  578. }
  579. pSocket->Release();
  580. pBuffer->Release();
  581. break;
  582. case IO_Write_Request :
  583. Write( pSocket, pBuffer );
  584. if ( dwIoSize != 0 )
  585. {
  586. /*
  587.  * final write, now shutdown send side of connection
  588.  */
  589. pSocket->Shutdown( SD_SEND );
  590. }
  591. break;
  592. case IO_Write_Completed :
  593. pBuffer->Use( dwIoSize );
  594. //DEBUG_ONLY( Output(_T("TX: ") + ToString(pBuffer) + _T("n") + DumpData(reinterpret_cast<const BYTE*>( pBuffer->GetWSABUF()->buf), dwIoSize, 40) ) );
  595. /*
  596.  * Call to unqualified virtual function
  597.  */
  598. WriteCompleted( pSocket, pBuffer );
  599. pSocket->Release();
  600. pBuffer->Release();
  601. break;
  602. case IO_Close :
  603. AbortiveClose( pSocket );
  604. pSocket->Release();
  605. pBuffer->Release();
  606. break;
  607. default :
  608. /*
  609.  * all to unqualified virtual function
  610.  */
  611. OnError( _T("CSocketServer::WorkerThread::Run() - Unexpected operation") );
  612. break;
  613. }
  614. else
  615. {
  616. /*
  617.  * Call to unqualified virtual function
  618.  */
  619. OnError( _T("CSocketServer::WorkerThread::Run() - Unexpected - pBuffer is 0") );
  620. }
  621. if ( closeSocket )
  622. {
  623. pSocket->Close();
  624. }
  625. /*
  626.  * Call to unqualified virtual function
  627.  */
  628. OnEndProcessing();
  629.       } 
  630.    }
  631.    catch(const CException &e)
  632.    {
  633.    /*
  634.     * Call to unqualified virtual function
  635. */
  636.    OnError( _T("CSocketServer::WorkerThread::Run() - Exception: ") + e.GetWhere() + _T(" - ") + e.GetMessage() );
  637.    }
  638.    catch(...)
  639.    {
  640.    /*
  641.     * Call to unqualified virtual function
  642. */
  643.    OnError( _T("CSocketServer::WorkerThread::Run() - Unexpected exception") );
  644.    }
  645.    
  646.    return 0;
  647. }
  648. void CSocketServer::WorkerThread::InitiateShutdown()
  649. {
  650. m_iocp.PostStatus( 0 );
  651. }
  652. void CSocketServer::WorkerThread::WaitForShutdownToComplete()
  653. {
  654. /*
  655.      * If we havent already started a shut down, do so...
  656.  */
  657. InitiateShutdown();
  658. Wait();
  659. }
  660. void CSocketServer::WorkerThread::Read( Socket *pSocket, CIOBuffer *pBuffer ) const
  661. {
  662. pBuffer->SetUserData( IO_Read_Completed );
  663. pBuffer->SetupRead();
  664. DWORD dwNumBytes = 0;
  665. DWORD dwFlags = 0;
  666. if ( SOCKET_ERROR == ::WSARecv(
  667. pSocket->m_socket, 
  668. pBuffer->GetWSABUF(), 
  669. 1, 
  670. &dwNumBytes,
  671. &dwFlags,
  672. pBuffer, 
  673. NULL))
  674. {
  675. DWORD lastError = ::WSAGetLastError();
  676. if ( ERROR_IO_PENDING != lastError )
  677. {
  678. Output( _T("CSocketServer::Read() - WSARecv: ") + GetLastErrorMessage( lastError ) );
  679. if ( lastError == WSAECONNABORTED || 
  680.  lastError == WSAECONNRESET ||
  681.  lastError == WSAEDISCON)
  682. {
  683. pSocket->Close();
  684. }
  685. pSocket->Release();
  686. pBuffer->Release();
  687. }
  688. }
  689. }
  690. void CSocketServer::WorkerThread::Write( Socket *pSocket, CIOBuffer *pBuffer ) const
  691. {
  692. pBuffer->SetUserData( IO_Write_Completed );
  693. pBuffer->SetupWrite();
  694. DWORD dwFlags = 0;
  695. DWORD dwSendNumBytes = 0;
  696. if ( SOCKET_ERROR == ::WSASend(
  697. pSocket->m_socket,
  698. pBuffer->GetWSABUF(), 
  699. 1, 
  700. &dwSendNumBytes,
  701. dwFlags,
  702. pBuffer, 
  703. NULL) )
  704. {
  705. DWORD lastError = ::WSAGetLastError();
  706. if ( ERROR_IO_PENDING != lastError )
  707. {
  708. Output( _T("CSocketServer::Write() - WSASend: ") + GetLastErrorMessage( lastError ) );
  709. if ( lastError == WSAECONNABORTED || 
  710.  lastError == WSAECONNRESET ||
  711.  lastError == WSAEDISCON)
  712. {
  713. pSocket->Close();
  714. }
  715. pSocket->Release();
  716. pBuffer->Release();
  717. }
  718. }
  719. }
  720. void CSocketServer::WorkerThread::WriteCompleted( Socket * /*pSocket*/, CIOBuffer *pBuffer )
  721. {
  722. if ( pBuffer->GetUsed() != pBuffer->GetWSABUF()->len )
  723. {
  724. /*
  725.      * Call to unqualified virtual function
  726.  */
  727. _ASSERT(_T("CSocketServer::WorkerThread::WriteCompleted - Socket write where not all data was written"));
  728. }
  729.    /*
  730.     * Pointer pBuffer could be declared const (but not in derived classes...)
  731. */
  732. }
  733. void CSocketServer::WorkerThread::AbortiveClose( Socket *pSocket )
  734. {
  735.    /*
  736.     * Force an abortive close.
  737. */
  738. LINGER lingerStruct;
  739. lingerStruct.l_onoff = 1;
  740. lingerStruct.l_linger = 0;
  741. if ( SOCKET_ERROR == ::setsockopt( pSocket->m_socket, SOL_SOCKET, SO_LINGER, (char *)&lingerStruct, sizeof(lingerStruct) ) )
  742. {
  743. /*
  744.  * Call to unqualified virtual function
  745.  */
  746. OnError( _T("CSocketServer::Socket::AbortiveClose() - setsockopt(SO_LINGER) - ")  + GetLastErrorMessage( ::WSAGetLastError() ) );
  747. }
  748. pSocket->Close();
  749. }
  750. void CSocketServer::WorkerThread::OnError( const _tstring &message )
  751. {
  752. Output( message );
  753. }
  754. } // End of namespace OnlineGameLib
  755. } // End of namespace Win32