SocketServer.cpp
上传用户:dzyhzl
上传日期:2019-04-29
资源大小:56270k
文件大小:20k
源码类别:

模拟服务器

开发平台:

C/C++

  1. #include "SocketServer.h"
  2. #include "IOCompletionPort.h"
  3. #include "Win32Exception.h"
  4. #include "Utils.h"
  5. #include "SystemInfo.h"
  6. #include "..protocolprotocol.h"
  7. #include <vector>
  8. #pragma comment(lib, "ws2_32.lib")
  9. /*
  10.  * Link options and warning
  11.  */
  12. #pragma message( "NOTE : --------------------OnlineGameLib [Server] : Announcement--------------------" )
  13. #pragma message( "NOTE : Developing a truly scalable Winsock Server using IO Completion Ports(IOCP)" )
  14. #pragma message( "NOTE : But this module depend on the microsoft platform" )
  15. #pragma message( "NOTE : Requirements :" )
  16. #pragma message( "NOTE :    * Windows NT/2000/XP: Included in Windows NT 4.0 and later." )
  17. #pragma message( "NOTE :    * Windows 95/98/Me: Unsupported." )
  18. #pragma message( "NOTE :" )
  19. #pragma message( "NOTE : liupeng xishanju.zhuhai.china 2003.1" )
  20. #pragma message( "NOTE : -----------------------------------------------------------------------------" )
  21. using std::vector;
  22. /*
  23.  * namespace OnlineGameLib::Win32
  24.  */
  25. namespace OnlineGameLib {
  26. namespace Win32 {
  27. /*
  28.  * Local enums environment
  29.  */
  30. enum IO_Operation 
  31. IO_Read_Request, 
  32. IO_Read_Completed, 
  33. IO_Write_Request, 
  34. IO_Write_Completed,
  35. IO_Close
  36. };
  37. /*
  38.  * Static helper methods
  39.  */
  40. static size_t CalculateNumberOfThreads( size_t numThreads );
  41. static size_t CalculateNumberOfThreads( size_t numThreads )
  42. {
  43. if ( 0 == numThreads )
  44. {
  45. CSystemInfo systemInfo;
  46. numThreads = systemInfo.dwNumberOfProcessors * 2;
  47. }
  48. return numThreads;
  49. }
  50. /*
  51.  * Copy
  52.  */
  53. tagSockAddrIn& tagSockAddrIn::Copy( const tagSockAddrIn& sin )
  54. {
  55. memcpy( &this->sockAddrIn, &sin.sockAddrIn, Size() );
  56. return *this;
  57. }
  58. /*
  59.  * IsEqual
  60.  */
  61. bool tagSockAddrIn::IsEqual( const tagSockAddrIn& sin )
  62. {
  63. /*
  64.  * Is it Equal? - ignore 'sin_zero'
  65.  */
  66. return ( memcmp( &this->sockAddrIn, &sin.sockAddrIn, Size()-sizeof( sockAddrIn.sin_zero ) ) == 0 );
  67. }
  68. /*
  69.  * IsGreater
  70.  */
  71. bool tagSockAddrIn::IsGreater( const tagSockAddrIn& sin )
  72. {
  73. /*
  74.  * Is it Greater? - ignore 'sin_zero'
  75.  */
  76. return ( memcmp( &this->sockAddrIn, &sin.sockAddrIn, Size()-sizeof( sockAddrIn.sin_zero ) ) > 0 );
  77. }
  78. /*
  79.  * IsLower
  80.  */
  81. bool tagSockAddrIn::IsLower( const tagSockAddrIn& sin )
  82. {
  83. /*
  84.  * Is it Lower? - ignore 'sin_zero'
  85.  */
  86. return ( memcmp( &this->sockAddrIn, &sin.sockAddrIn, Size()-sizeof( sockAddrIn.sin_zero ) ) < 0 );
  87. }
  88. /*
  89.  * CSocketServer 
  90.  */
  91. CSocketServer::CSocketServer(
  92.    unsigned long addressToListenOn,
  93.    unsigned short portToListenOn,
  94.    size_t maxFreeSockets,
  95.    size_t maxFreeBuffers,
  96.    size_t bufferSize /* = 1024 */,
  97.    size_t numThreads /* = 0 */)
  98. : CIOBuffer::Allocator(bufferSize, maxFreeBuffers),
  99.   m_numThreads(CalculateNumberOfThreads( numThreads )),
  100.   m_listeningSocket(INVALID_SOCKET),
  101.   m_iocp(0),
  102.   m_address(addressToListenOn),
  103.   m_port(portToListenOn),
  104.   m_maxFreeSockets(maxFreeSockets)
  105. {
  106. }
  107. CSocketServer::CSocketServer(
  108. size_t maxFreeSockets,
  109. size_t maxFreeBuffers,
  110. size_t bufferSize /* = 1024 */,
  111. size_t numThreads /* = 0 */)
  112. : CIOBuffer::Allocator(bufferSize, maxFreeBuffers),
  113.   m_numThreads(CalculateNumberOfThreads( numThreads )),
  114.   m_listeningSocket(INVALID_SOCKET),
  115.   m_iocp(0),
  116.   m_maxFreeSockets(maxFreeSockets)
  117. {
  118. }
  119. void CSocketServer::Open( 
  120. unsigned long addressToListenOn,
  121. unsigned short portToListenOn
  122. )
  123. {
  124. m_address = addressToListenOn;
  125. m_port = portToListenOn;
  126. Start();
  127. }
  128. CSocketServer::~CSocketServer()
  129. {
  130. try
  131. {
  132. ReleaseSockets();
  133. }
  134. catch(...)
  135. {
  136. }
  137. }
  138. void CSocketServer::ReleaseSockets()
  139. {
  140. CCriticalSection::Owner lock( m_listManipulationSection );
  141. Socket *pSocket = m_activeList.Head();
  142. while ( pSocket )
  143. {
  144. Socket *pNext = SocketList::Next( pSocket );
  145. pSocket->Close();
  146. pSocket = pNext;
  147. }
  148. while ( m_activeList.Head() )
  149. {
  150. ReleaseSocket( m_activeList.Head() );
  151. }
  152. while ( m_freeList.Head() )
  153. {
  154. DestroySocket( m_freeList.PopNode() );
  155. }
  156. if ( m_freeList.Count() + m_freeList.Count() != 0 )
  157. {
  158. /*
  159.  * call to unqualified virtual function
  160.  */
  161. OnError( _T("CSocketServer::ReleaseSockets() - Leaked sockets") );
  162. }
  163. }
  164. void CSocketServer::ReleaseBuffers()
  165. {
  166. Flush();
  167. }
  168. void CSocketServer::StartAcceptingConnections()
  169. {
  170. if ( INVALID_SOCKET == m_listeningSocket )
  171. {
  172. /*
  173.  * Call to unqualified virtual function
  174.  */
  175. OnStartAcceptingConnections();
  176. /*
  177.  * call to unqualified virtual function
  178.  */
  179. m_listeningSocket = CreateListeningSocket( m_address, m_port );
  180. m_acceptConnectionsEvent.Set();
  181. }
  182. }
  183. void CSocketServer::StopAcceptingConnections()
  184. {
  185. if ( INVALID_SOCKET != m_listeningSocket )
  186. {
  187. m_acceptConnectionsEvent.Reset();
  188. if ( 0 != ::closesocket( m_listeningSocket ) )
  189. {
  190. /*
  191.  * Call to unqualified virtual function
  192.  */
  193. OnError( _T("CSocketServer::StopAcceptingConnections() - closesocket - ") + GetLastErrorMessage( ::WSAGetLastError() ) );
  194. }
  195. m_listeningSocket = INVALID_SOCKET;
  196. /*
  197.  * Call to unqualified virtual function
  198.  */
  199. OnStopAcceptingConnections();
  200. }
  201. }
  202. void CSocketServer::InitiateShutdown()
  203. {
  204. /*
  205.  * Signal that the dispatch thread should shut down all worker threads and then exit
  206.  */
  207. StopAcceptingConnections();
  208. m_shutdownEvent.Set();
  209. /*
  210.  * Call to unqualified virtual function
  211.  */
  212. OnShutdownInitiated();
  213. }
  214. void CSocketServer::WaitForShutdownToComplete()
  215. {
  216. /*
  217.  * If we havent already started a shut down, do so...
  218.  */
  219. InitiateShutdown();
  220. Wait();
  221. }
  222. int CSocketServer::Run()
  223. {
  224. try
  225. {
  226. vector<WorkerThread *> workers;
  227. workers.reserve( m_numThreads );
  228. for ( size_t i = 0; i < m_numThreads; ++i )
  229. {
  230. /*
  231.  * Call to unqualified virtual function
  232.  */
  233. WorkerThread *pThread = CreateWorkerThread( m_iocp ); 
  234. workers.push_back( pThread );
  235. pThread->Start();
  236. }
  237. HANDLE handlesToWaitFor[2];
  238. handlesToWaitFor[0] = m_shutdownEvent.GetEvent();
  239. handlesToWaitFor[1] = m_acceptConnectionsEvent.GetEvent();
  240. while ( !m_shutdownEvent.Wait( 0 ) )
  241. {
  242. DWORD waitResult = ::WaitForMultipleObjects( 2, handlesToWaitFor, false, INFINITE );
  243. if ( waitResult == WAIT_OBJECT_0 )
  244. {
  245. /*
  246.  * Time to shutdown
  247.  */
  248. break;
  249. }
  250. else if ( waitResult == WAIT_OBJECT_0 + 1 )
  251. {
  252. /*
  253.  * accept connections
  254.  */
  255. while ( !m_shutdownEvent.Wait( 0 ) && m_acceptConnectionsEvent.Wait( 0 ) )
  256. {
  257. CIOBuffer *pAddress = Allocate();
  258. int addressSize = (int)pAddress->GetSize();
  259. SOCKET acceptedSocket = ::WSAAccept(
  260. m_listeningSocket, 
  261. reinterpret_cast<sockaddr *>(const_cast<BYTE *>( pAddress->GetBuffer() ) ), 
  262. &addressSize, 
  263. 0, 
  264. 0);
  265. pAddress->Use( addressSize );
  266. if ( acceptedSocket != INVALID_SOCKET )
  267. {
  268. Socket *pSocket = AllocateSocket( acceptedSocket );
  269. /*
  270.  * Call to unqualified virtual function
  271.  */
  272. OnConnectionEstablished( pSocket, pAddress );
  273. }
  274. else if ( m_acceptConnectionsEvent.Wait( 0 ) )
  275. {
  276. /*
  277.  * Call to unqualified virtual function
  278.  */
  279. OnError( _T("CSocketServer::Run() - WSAAccept:") + GetLastErrorMessage( ::WSAGetLastError() ) );
  280. }
  281. pAddress->Release();
  282. }
  283. }
  284. else
  285. {
  286. /*
  287.  * Call to unqualified virtual function
  288.  */
  289. OnError( _T("CSocketServer::Run() - WaitForMultipleObjects: ") + GetLastErrorMessage( ::GetLastError() ) );
  290. }
  291. }
  292. for ( i = 0; i < m_numThreads; ++i )
  293. {
  294. workers[i]->InitiateShutdown();
  295. }  
  296. for ( i = 0; i < m_numThreads; ++i )
  297. {
  298. workers[i]->WaitForShutdownToComplete();
  299. delete workers[i];
  300. workers[i] = 0;
  301. }  
  302. }
  303. catch( const CException &e )
  304. {
  305. /*
  306.  * Call to unqualified virtual function
  307.  */
  308. OnError( _T("CSocketServer::Run() - Exception: ") + e.GetWhere() + _T(" - ") + e.GetMessage() );
  309. }
  310. catch(...)
  311. {
  312. /*
  313.  * Call to unqualified virtual function
  314.  */
  315. OnError( _T("CSocketServer::Run() - Unexpected exception") );
  316. }
  317. /*
  318.  * Call to unqualified virtual function
  319.  */
  320. OnShutdownComplete();
  321. return 0;
  322. }
  323. CSocketServer::Socket *CSocketServer::AllocateSocket( SOCKET theSocket )
  324. {
  325. CCriticalSection::Owner lock( m_listManipulationSection );
  326. Socket *pSocket = 0;
  327. if ( !m_freeList.Empty() )
  328. {
  329. pSocket = m_freeList.PopNode();
  330. pSocket->Attach( theSocket );
  331. pSocket->AddRef();
  332. }
  333. else
  334. {
  335. pSocket = new Socket( *this, theSocket );
  336. /*
  337.  * Call to unqualified virtual function
  338.  */
  339. OnConnectionCreated();
  340. }
  341. m_activeList.PushNode( pSocket );
  342. /*
  343.  * suspicious cast
  344.  */
  345. m_iocp.AssociateDevice( reinterpret_cast<HANDLE>( theSocket ), (ULONG_PTR)pSocket );
  346. return pSocket;
  347. }
  348. void CSocketServer::ReleaseSocket( Socket *pSocket )
  349. {
  350. if ( !pSocket )
  351. {
  352. throw CException( _T("CSocketServer::ReleaseSocket()"), _T("pSocket is null") );
  353. }
  354. CCriticalSection::Owner lock( m_listManipulationSection );
  355. pSocket->RemoveFromList();
  356. if ( m_maxFreeSockets == 0 || m_freeList.Count() < m_maxFreeSockets )
  357. {
  358. m_freeList.PushNode( pSocket );
  359. }
  360. else
  361. {
  362. DestroySocket( pSocket );
  363. }
  364. }
  365. void CSocketServer::DestroySocket( Socket *pSocket )
  366. {
  367.    delete pSocket;
  368.    /*
  369.     * Call to unqualified virtual function
  370. */
  371.    OnConnectionDestroyed();
  372. }
  373. void CSocketServer::PostAbortiveClose( Socket *pSocket )
  374. {
  375. CIOBuffer *pBuffer = Allocate();
  376. pBuffer->SetUserData( IO_Close );
  377. pSocket->AddRef();
  378. m_iocp.PostStatus( (ULONG_PTR)pSocket, 0, pBuffer );
  379. }
  380. void CSocketServer::Read( Socket *pSocket, CIOBuffer *pBuffer )
  381. {
  382. /*
  383.  * Post a read request to the iocp so that the actual socket read gets performed by
  384.  * one of our IO threads...
  385.  */
  386. if ( !pBuffer )
  387. {
  388. pBuffer = Allocate();
  389. }
  390. else
  391. {
  392. pBuffer->AddRef();
  393. }
  394. pBuffer->SetUserData( IO_Read_Request );
  395. pSocket->AddRef();
  396. m_iocp.PostStatus( (ULONG_PTR)pSocket, 0, pBuffer );
  397. }
  398. void CSocketServer::Write(
  399.    Socket *pSocket,
  400.    const char *pData,
  401.    size_t dataLength, 
  402.    bool thenShutdown)
  403. {
  404. if ( !pSocket || !pData || dataLength <= 0 )
  405. {
  406. return;
  407. }
  408.    /*
  409.     * Post a write request to the iocp so that the actual socket write gets performed by
  410. * one of our IO threads...
  411. */
  412.    CIOBuffer *pBuffer = Allocate();
  413.    
  414.    /*
  415.     * Call to unqualified virtual function
  416.     */
  417. #ifdef NETWORK_DEBUG
  418. //{
  419.    PreWrite( pSocket, pBuffer, pData, dataLength + PACK_HEADER_LEN + sizeof(BYTE) );
  420. //}
  421. #else
  422. //{
  423.    PreWrite( pSocket, pBuffer, pData, dataLength );
  424. //}
  425. #endif // NETWORK_DEBUG
  426.    pBuffer->AddData( pData, dataLength );
  427.    
  428. #ifdef NETWORK_DEBUG
  429. //{
  430. const BYTE *pPackData = pBuffer->GetBuffer();
  431. PACK_HEADER ph = {0};
  432. memcpy( (BYTE *)&ph, pPackData, PACK_HEADER_LEN );
  433. pBuffer->AddData( (BYTE *)&ph, PACK_HEADER_LEN );
  434. pBuffer->AddData( 0xAA );
  435. //}
  436. #endif // NETWORK_DEBUG
  437.    pBuffer->SetUserData( IO_Write_Request );
  438.    pSocket->AddRef();
  439.    m_iocp.PostStatus( (ULONG_PTR)pSocket, thenShutdown, pBuffer );
  440. }
  441. void CSocketServer::Write(
  442.    Socket *pSocket,
  443.    CIOBuffer *pBuffer, 
  444.    bool thenShutdown)
  445. {
  446.    /*
  447.     * Post a write request to the iocp so that the actual socket write gets performed by
  448. * one of our IO threads...
  449. */
  450.    pBuffer->AddRef();
  451.    pBuffer->SetUserData( IO_Write_Request );
  452.    pSocket->AddRef();
  453.    m_iocp.PostStatus( (ULONG_PTR)pSocket, thenShutdown, pBuffer );
  454. }
  455. void CSocketServer::OnError( const _tstring &message )
  456. {
  457. Output( message );
  458. }
  459.   
  460. /*
  461.  * CSocketServer::Socket
  462.  */
  463. CSocketServer::Socket::Socket( CSocketServer &server, SOCKET theSocket )
  464.    :  m_server(server),
  465.   m_socket(theSocket),
  466.   m_ref(1)
  467. {
  468. if ( INVALID_SOCKET == m_socket )
  469. {
  470. throw CException( _T("CSocketServer::Socket::Socket()"), _T("Invalid socket") );
  471. }
  472. }
  473. CSocketServer::Socket::~Socket()
  474. {
  475. }
  476. void CSocketServer::Socket::Attach( SOCKET theSocket )
  477. {
  478. if ( INVALID_SOCKET != m_socket )
  479. {
  480. throw CException( _T("CSocketServer::Socket::Attach()"), _T("Socket already attached") );
  481. }
  482. m_socket = theSocket;
  483. SetUserData( 0 );
  484. }
  485. void CSocketServer::Socket::AddRef()
  486. {
  487. ::InterlockedIncrement( &m_ref );
  488. }
  489. void CSocketServer::Socket::Release()
  490. {
  491. if ( 0 == ::InterlockedDecrement( &m_ref ) )
  492. {
  493. m_server.ReleaseSocket( this );
  494. }
  495. }
  496. void CSocketServer::Socket::Shutdown( int how /* = SD_BOTH */ )
  497. {
  498. Output( _T("CSocketServer::Socket::Shutdown() ") + ToString( how ) );
  499. if ( INVALID_SOCKET != m_socket )
  500. {
  501. if ( 0 != ::shutdown( m_socket, how ) )
  502. {
  503. m_server.OnError( _T("CSocketServer::Server::Shutdown() - ") + GetLastErrorMessage( ::WSAGetLastError() ) );
  504. }
  505. Output( _T("shutdown initiated") );
  506. }
  507. }
  508. void CSocketServer::Socket::Close()
  509. {
  510. CCriticalSection::Owner lock( m_server.m_listManipulationSection );
  511. if ( INVALID_SOCKET != m_socket )
  512. {
  513. if ( 0 != ::closesocket( m_socket ) )
  514. {
  515. m_server.OnError(_T("CSocketServer::Socket::Close() - closesocket - ") + GetLastErrorMessage( ::WSAGetLastError() ) );
  516. }
  517. m_socket = INVALID_SOCKET;
  518. m_server.OnConnectionClosed( this );
  519. Release();
  520. }
  521. }
  522. void CSocketServer::Socket::AbortiveClose()
  523. {
  524. m_server.PostAbortiveClose( this );
  525. }
  526. void CSocketServer::Socket::Read( CIOBuffer *pBuffer /* = 0 */ )
  527. {
  528. m_server.Read( this, pBuffer );
  529. }
  530. void CSocketServer::Socket::Write(
  531.    const char *pData, 
  532.    size_t dataLength,
  533.    bool thenShutdown /* = false */)
  534. {
  535. m_server.Write( this, pData, dataLength, thenShutdown );
  536. }
  537. void CSocketServer::Socket::Write(
  538.    CIOBuffer *pBuffer,
  539.    bool thenShutdown /* = false */)
  540. {
  541. m_server.Write( this, pBuffer, thenShutdown );
  542. }
  543. /*
  544.  * CSocketServer::WorkerThread
  545.  */
  546. CSocketServer::WorkerThread::WorkerThread( CIOCompletionPort &iocp )
  547. : m_iocp(iocp)
  548. {
  549.    /*
  550.     * All work done in initialiser list
  551. */
  552. }
  553. int CSocketServer::WorkerThread::Run()
  554. {
  555. try
  556. {
  557. while ( true )
  558. {
  559. /*
  560.  * Continually loop to service io completion packets
  561.  */
  562. bool closeSocket = false;
  563. DWORD dwIoSize = 0;
  564. Socket *pSocket = 0;
  565. CIOBuffer *pBuffer = 0;
  566. try
  567. {
  568. m_iocp.GetStatus( (PDWORD_PTR)&pSocket, &dwIoSize, (OVERLAPPED **)&pBuffer );
  569. }
  570. catch (const CWin32Exception &e)
  571. {
  572. if ( e.GetError() != ERROR_NETNAME_DELETED &&
  573. e.GetError() != WSA_OPERATION_ABORTED )
  574. {
  575. throw;
  576. }
  577. Output( _T("IOCP error [client connection dropped] - ") +
  578. GetLastErrorMessage( ::WSAGetLastError() ) );
  579. closeSocket = true;
  580. }
  581. if ( !pSocket )
  582. {
  583. /*
  584.  * A completion key of 0 is posted to the iocp to request us to shut down...
  585.  */
  586. break;
  587. }
  588. /*
  589.  * Call to unqualified virtual function
  590.  */
  591. OnBeginProcessing();
  592. if ( pBuffer )
  593. {
  594. const IO_Operation operation = static_cast<IO_Operation>( pBuffer->GetUserData() );
  595. switch ( operation )
  596. {
  597. case IO_Read_Request:
  598. Read( pSocket, pBuffer );
  599. break;
  600. case IO_Read_Completed :
  601. if ( 0 != dwIoSize )
  602. {
  603. pBuffer->Use( dwIoSize );
  604. //DEBUG_ONLY( Output(_T("RX: ") + ToString(pBuffer) + _T("n") + DumpData(reinterpret_cast<const BYTE*>( pBuffer->GetWSABUF()->buf), dwIoSize, 40) ) );
  605. /*
  606.  * Call to unqualified virtual function
  607.  */
  608. ReadCompleted( pSocket, pBuffer );
  609. }
  610. else
  611. {
  612. /*
  613.  * client connection dropped...
  614.  */
  615. Output( _T("ReadCompleted - 0 bytes - client connection dropped") );
  616. closeSocket = true;
  617. }
  618. pSocket->Release();
  619. pBuffer->Release();
  620. break;
  621. case IO_Write_Request :
  622. Write( pSocket, pBuffer );
  623. if ( dwIoSize != 0 )
  624. {
  625. /*
  626.  * final write, now shutdown send side of connection
  627.  */
  628. pSocket->Shutdown( SD_SEND );
  629. }
  630. break;
  631. case IO_Write_Completed :
  632. pBuffer->Use( dwIoSize );
  633. //DEBUG_ONLY( Output(_T("TX: ") + ToString(pBuffer) + _T("n") + DumpData(reinterpret_cast<const BYTE*>( pBuffer->GetWSABUF()->buf), dwIoSize, 40) ) );
  634. /*
  635.  * Call to unqualified virtual function
  636.  */
  637. WriteCompleted( pSocket, pBuffer );
  638. pSocket->Release();
  639. pBuffer->Release();
  640. break;
  641. case IO_Close :
  642. AbortiveClose( pSocket );
  643. pSocket->Release();
  644. pBuffer->Release();
  645. break;
  646. default :
  647. /*
  648.  * all to unqualified virtual function
  649.  */
  650. OnError( _T("CSocketServer::WorkerThread::Run() - Unexpected operation") );
  651. break;
  652. }
  653. else
  654. {
  655. /*
  656.  * Call to unqualified virtual function
  657.  */
  658. OnError( _T("CSocketServer::WorkerThread::Run() - Unexpected - pBuffer is 0") );
  659. }
  660. if ( closeSocket )
  661. {
  662. pSocket->Close();
  663. }
  664. /*
  665.  * Call to unqualified virtual function
  666.  */
  667. OnEndProcessing();
  668.       } 
  669.    }
  670.    catch(const CException &e)
  671.    {
  672.    /*
  673.     * Call to unqualified virtual function
  674. */
  675.    OnError( _T("CSocketServer::WorkerThread::Run() - Exception: ") + e.GetWhere() + _T(" - ") + e.GetMessage() );
  676.    }
  677.    catch(...)
  678.    {
  679.    /*
  680.     * Call to unqualified virtual function
  681. */
  682.    OnError( _T("CSocketServer::WorkerThread::Run() - Unexpected exception") );
  683.    }
  684.    
  685.    return 0;
  686. }
  687. void CSocketServer::WorkerThread::InitiateShutdown()
  688. {
  689. m_iocp.PostStatus( 0 );
  690. }
  691. void CSocketServer::WorkerThread::WaitForShutdownToComplete()
  692. {
  693. /*
  694.      * If we havent already started a shut down, do so...
  695.  */
  696. InitiateShutdown();
  697. Wait();
  698. }
  699. void CSocketServer::WorkerThread::Read( Socket *pSocket, CIOBuffer *pBuffer ) const
  700. {
  701. pBuffer->SetUserData( IO_Read_Completed );
  702. pBuffer->SetupRead();
  703. DWORD dwNumBytes = 0;
  704. DWORD dwFlags = 0;
  705. if ( SOCKET_ERROR == ::WSARecv(
  706. pSocket->m_socket, 
  707. pBuffer->GetWSABUF(), 
  708. 1, 
  709. &dwNumBytes,
  710. &dwFlags,
  711. pBuffer, 
  712. NULL))
  713. {
  714. DWORD lastError = ::WSAGetLastError();
  715. if ( ERROR_IO_PENDING != lastError )
  716. {
  717. Output( _T("CSocketServer::Read() - WSARecv: ") + GetLastErrorMessage( lastError ) );
  718. if ( lastError == WSAECONNABORTED || 
  719.  lastError == WSAECONNRESET ||
  720.  lastError == WSAEDISCON)
  721. {
  722. pSocket->Close();
  723. }
  724. pSocket->Release();
  725. pBuffer->Release();
  726. }
  727. }
  728. }
  729. void CSocketServer::WorkerThread::Write( Socket *pSocket, CIOBuffer *pBuffer ) const
  730. {
  731. pBuffer->SetUserData( IO_Write_Completed );
  732. pBuffer->SetupWrite();
  733. DWORD dwFlags = 0;
  734. DWORD dwSendNumBytes = 0;
  735. if ( SOCKET_ERROR == ::WSASend(
  736. pSocket->m_socket,
  737. pBuffer->GetWSABUF(), 
  738. 1, 
  739. &dwSendNumBytes,
  740. dwFlags,
  741. pBuffer, 
  742. NULL) )
  743. {
  744. DWORD lastError = ::WSAGetLastError();
  745. if ( ERROR_IO_PENDING != lastError )
  746. {
  747. Output( _T("CSocketServer::Write() - WSASend: ") + GetLastErrorMessage( lastError ) );
  748. if ( lastError == WSAECONNABORTED || 
  749.  lastError == WSAECONNRESET ||
  750.  lastError == WSAEDISCON)
  751. {
  752. pSocket->Close();
  753. }
  754. pSocket->Release();
  755. pBuffer->Release();
  756. }
  757. }
  758. }
  759. void CSocketServer::WorkerThread::WriteCompleted( Socket * /*pSocket*/, CIOBuffer *pBuffer )
  760. {
  761. if ( pBuffer->GetUsed() != pBuffer->GetWSABUF()->len )
  762. {
  763. /*
  764.      * Call to unqualified virtual function
  765.  */
  766. _ASSERT(_T("CSocketServer::WorkerThread::WriteCompleted - Socket write where not all data was written"));
  767. }
  768.    /*
  769.     * Pointer pBuffer could be declared const (but not in derived classes...)
  770. */
  771. }
  772. void CSocketServer::WorkerThread::AbortiveClose( Socket *pSocket )
  773. {
  774.    /*
  775.     * Force an abortive close.
  776. */
  777. LINGER lingerStruct;
  778. lingerStruct.l_onoff = 1;
  779. lingerStruct.l_linger = 0;
  780. if ( SOCKET_ERROR == ::setsockopt( pSocket->m_socket, SOL_SOCKET, SO_LINGER, (char *)&lingerStruct, sizeof(lingerStruct) ) )
  781. {
  782. /*
  783.  * Call to unqualified virtual function
  784.  */
  785. OnError( _T("CSocketServer::Socket::AbortiveClose() - setsockopt(SO_LINGER) - ")  + GetLastErrorMessage( ::WSAGetLastError() ) );
  786. }
  787. pSocket->Close();
  788. }
  789. void CSocketServer::WorkerThread::OnError( const _tstring &message )
  790. {
  791. Output( message );
  792. }
  793. } // End of namespace OnlineGameLib
  794. } // End of namespace Win32