SocketServer.cpp
上传用户:dzyhzl
上传日期:2019-04-29
资源大小:56270k
文件大小:26k
源码类别:

模拟服务器

开发平台:

C/C++

  1. #include "stdafx.h"
  2. #include "SocketServer.h"
  3. #include "IOCompletionPort.h"
  4. #include "Win32Exception.h"
  5. #include "Utils.h"
  6. #include "SystemInfo.h"
  7. #include "Socket.h"
  8. #include "Macro.h"
  9. #include "..heavenserverstage.h"
  10. #include <vector>
  11. #pragma comment(lib, "ws2_32.lib")
  12. /*
  13.  * Link options and warning
  14.  */
  15. #pragma message( "NOTE : --------------------OnlineGame [Server] : Announcement--------------------" )
  16. #pragma message( "NOTE : Developing a truly scalable Winsock Server using IO Completion Ports(IOCP)" )
  17. #pragma message( "NOTE : But this module depend on the microsoft platform" )
  18. #pragma message( "NOTE : Requirements :" )
  19. #pragma message( "NOTE :    * Windows NT/2000/XP: Included in Windows NT 4.0 and later." )
  20. #pragma message( "NOTE :    * Windows 95/98/Me: Unsupported." )
  21. #pragma message( "NOTE :" )
  22. #pragma message( "NOTE : liupeng xishanju.zhuhai.china 2003.1" )
  23. #pragma message( "NOTE : -----------------------------------------------------------------------------" )
  24. using std::vector;
  25. /*
  26.  * namespace OnlineGameLib::Win32
  27.  */
  28. namespace OnlineGameLib {
  29. namespace Win32 {
  30. /*
  31.  * Static helper methods
  32.  */
  33. static size_t CalculateNumberOfThreads( size_t numThreads );
  34. static size_t CalculateNumberOfThreads( size_t numThreads )
  35. {
  36. if ( numThreads == 0 )
  37. {
  38. CSystemInfo systemInfo;
  39. numThreads = systemInfo.dwNumberOfProcessors * 2 + 2;
  40. }
  41. return numThreads;
  42. }
  43. /*
  44.  * CSocketServer
  45.  */
  46. CSocketServer::CSocketServer(
  47.    size_t maxFreeSockets,
  48.    size_t maxFreeBuffers,
  49.    size_t bufferSize /* = 1024 */,
  50.    size_t numThreads /* = 0 */,
  51.    bool useSequenceNumbers /* = true */,
  52.    bool postZeroByteReads /* = false */)
  53.    :  CIOBuffer::Allocator( bufferSize, maxFreeBuffers ),
  54.   m_numThreads( CalculateNumberOfThreads( numThreads ) ),
  55.   m_listeningSocket( INVALID_SOCKET ),
  56.   m_iocp( 0 ),
  57.   m_maxFreeSockets( maxFreeSockets ),
  58.   m_useSequenceNumbers( useSequenceNumbers ),
  59.   m_postZeroByteReads( postZeroByteReads )
  60. {
  61. }
  62. CSocketServer::~CSocketServer()
  63. {
  64. }
  65. void CSocketServer::Open( unsigned long addressToListenOn, 
  66. unsigned short portToListenOn )
  67. {
  68. m_address = addressToListenOn;
  69. m_port = portToListenOn;
  70. }
  71. void CSocketServer::ReleaseSockets()
  72. {
  73. CCriticalSection::Owner lock( m_listManipulationSection );
  74. while ( m_activeList.Head() )
  75. {
  76. ReleaseSocket( m_activeList.Head() );
  77. }
  78. while ( m_freeList.Head() )
  79. {
  80. DestroySocket( m_freeList.PopNode() );
  81. }
  82. if ( m_activeList.Count() + m_freeList.Count() != 0 )
  83. {
  84. OnError( _T("CSocketServer::ReleaseSockets() - Leaked sockets") );
  85. }
  86. }
  87. void CSocketServer::StartAcceptingConnections()
  88. {
  89. if ( m_listeningSocket == INVALID_SOCKET )
  90. {
  91. OnStartAcceptingConnections();
  92. m_listeningSocket = CreateListeningSocket( m_address, m_port );
  93. m_acceptConnectionsEvent.Set();
  94. }
  95. }
  96. void CSocketServer::StopAcceptingConnections()
  97. {
  98. if ( m_listeningSocket != INVALID_SOCKET )
  99. {
  100. m_acceptConnectionsEvent.Reset();
  101. if ( 0 != ::closesocket( m_listeningSocket ) )
  102. {
  103. OnError( _T("CSocketServer::StopAcceptingConnections() - closesocket - ") +
  104. GetLastErrorMessage( ::WSAGetLastError() ) );
  105. }
  106. m_listeningSocket = INVALID_SOCKET;
  107. OnStopAcceptingConnections();
  108. }
  109. }
  110. void CSocketServer::InitiateShutdown()
  111. {
  112. StopAcceptingConnections();
  113.    /*
  114.     * enter m_listManipulationSection
  115. */
  116.    {
  117.    CCriticalSection::Owner lock( m_listManipulationSection );
  118.    
  119.    Socket *pSocket = m_activeList.Head();
  120.    
  121.    while ( pSocket )
  122.    {
  123.    Socket *pNext = SocketList::Next( pSocket );
  124.    
  125.    pSocket->AbortiveClose();
  126.    
  127.    pSocket = pNext;
  128.    }
  129.    }
  130.    /*
  131.     * leave m_listManipulationSection
  132. */
  133.    /*
  134.     * signal that the dispatch thread should shut down all worker threads and then exit
  135. */
  136.    m_shutdownEvent.Set();
  137.    
  138.    OnShutdownInitiated();
  139. }
  140. void CSocketServer::WaitForShutdownToComplete()
  141. {
  142. /*
  143.  * If we havent already started a shut down, do so...
  144.  */
  145. InitiateShutdown();
  146. Wait();
  147. ReleaseSockets();
  148. Flush();
  149. }
  150. SOCKET CSocketServer::CreateListeningSocket( unsigned long address, unsigned short port )
  151. {
  152. SOCKET s = ::WSASocket( AF_INET, 
  153. SOCK_STREAM, 
  154. IPPROTO_IP, 
  155. NULL, 
  156. 0, 
  157. WSA_FLAG_OVERLAPPED /* for IOCP */ );
  158. if ( s == INVALID_SOCKET )
  159. {
  160. throw CWin32Exception( _T("CSocket::CreateListeningSocket()"), 
  161. ::WSAGetLastError() );
  162. }
  163. CSocket listeningSocket( s );
  164. CSocket::InternetAddress localAddress( address, port );
  165. listeningSocket.Bind( localAddress );
  166. listeningSocket.Listen( 5 );
  167. return listeningSocket.Detatch();
  168. }
  169. CSocketServer::WorkerThread *CSocketServer::CreateWorkerThread(
  170. CIOCompletionPort &iocp )
  171. {
  172. return new WorkerThread( *this, iocp );
  173. }
  174. int CSocketServer::Run()
  175. {
  176. try
  177. {
  178. vector< WorkerThread * > workers;
  179. workers.reserve( m_numThreads );
  180. for ( size_t i = 0; i < m_numThreads; ++i )
  181. {
  182. WorkerThread *pThread = CreateWorkerThread( m_iocp ); 
  183. workers.push_back( pThread );
  184. pThread->Start();
  185. }
  186. HANDLE handlesToWaitFor[2];
  187. handlesToWaitFor[0] = m_shutdownEvent.GetEvent();
  188. handlesToWaitFor[1] = m_acceptConnectionsEvent.GetEvent();
  189. while ( !m_shutdownEvent.Wait( 0 ) )
  190. {
  191. DWORD waitResult = ::WaitForMultipleObjects( 2,
  192. handlesToWaitFor, 
  193. false, 
  194. INFINITE );
  195. if ( waitResult == WAIT_OBJECT_0 )
  196. {
  197. /*
  198.  * Time to shutdown
  199.  */
  200. break;
  201. }
  202. else if (waitResult == WAIT_OBJECT_0 + 1)
  203. {
  204. /*
  205.  * accept connections
  206.  */
  207. while ( !m_shutdownEvent.Wait(0) && 
  208. m_acceptConnectionsEvent.Wait( 0 ) )
  209. {
  210. CIOBuffer *pAddress = Allocate();
  211. int addressSize = (int)pAddress->GetSize();
  212. SOCKET acceptedSocket = ::WSAAccept(
  213. m_listeningSocket, 
  214. reinterpret_cast< sockaddr * >( const_cast< BYTE * >( pAddress->GetBuffer() ) ), 
  215. &addressSize, 
  216. 0, 
  217. 0);
  218. pAddress->Use( addressSize );
  219. if ( acceptedSocket != INVALID_SOCKET )
  220. {
  221. Socket *pSocket = AllocateSocket( acceptedSocket );
  222. OnConnectionEstablished( pSocket, pAddress );
  223. pSocket->Release();
  224. }
  225. else if ( m_acceptConnectionsEvent.Wait( 0 ) )
  226. {
  227. OnError( _T("CSocketServer::Run() - WSAAccept:") + 
  228. GetLastErrorMessage( ::WSAGetLastError() ) );
  229. }
  230. pAddress->Release();
  231. }
  232. }
  233. else
  234. {
  235. OnError( _T("CSocketServer::Run() - WaitForMultipleObjects: ") +
  236. GetLastErrorMessage( ::GetLastError() ) );
  237. }
  238. }
  239. for ( i = 0; i < m_numThreads; ++i )
  240. {
  241. workers[i]->InitiateShutdown();
  242. }  
  243. for ( i = 0; i < m_numThreads; ++i )
  244. {
  245. workers[i]->WaitForShutdownToComplete();
  246. delete workers[i];
  247. workers[i] = 0;
  248. }  
  249. }
  250. catch( const CException &e )
  251. {
  252. OnError( _T("CSocketServer::Run() - Exception: ") + 
  253. e.GetWhere() + 
  254. _T(" - ") + 
  255. e.GetMessage() );
  256. }
  257. catch(...)
  258. {
  259. OnError( _T("CSocketServer::Run() - Unexpected exception") );
  260. }
  261. OnShutdownComplete();
  262. return 0;
  263. }
  264. CSocketServer::Socket *CSocketServer::AllocateSocket( SOCKET theSocket )
  265. {
  266. CCriticalSection::Owner lock( m_listManipulationSection );
  267. Socket *pSocket = 0;
  268. if ( !m_freeList.Empty() )
  269. {
  270. pSocket = m_freeList.PopNode();
  271. pSocket->Attach( theSocket );
  272. pSocket->AddRef();
  273. }
  274. else
  275. {
  276. pSocket = new Socket( *this, theSocket, m_useSequenceNumbers );
  277. OnConnectionCreated();
  278. }
  279. m_activeList.PushNode( pSocket );
  280. m_iocp.AssociateDevice( reinterpret_cast< HANDLE >( theSocket ), 
  281. ( ULONG_PTR )pSocket);
  282. return pSocket;
  283. }
  284. void CSocketServer::ReleaseSocket( Socket *pSocket )
  285. {
  286. if ( !pSocket )
  287. {
  288. throw CException( _T("CSocketServer::ReleaseSocket()"),
  289. _T("pSocket is null") );
  290. }
  291. CCriticalSection::Owner lock( m_listManipulationSection );
  292. pSocket->SetMessageCallback(NULL);
  293. pSocket->RemoveFromList();
  294. if ( m_maxFreeSockets == 0 || 
  295. m_freeList.Count() < m_maxFreeSockets )
  296. {
  297. m_freeList.PushNode( pSocket );
  298. }
  299. else
  300. {
  301. DestroySocket( pSocket );
  302. }
  303. }
  304. void CSocketServer::DestroySocket( Socket *pSocket )
  305. {
  306. delete pSocket;
  307. OnConnectionDestroyed();
  308. }
  309. void CSocketServer::OnError( const _tstring &message )
  310. {
  311. DEBUG_ONLY( Output( message ) );
  312. }
  313.   
  314. void CSocketServer::WriteCompleted( Socket * /*pSocket*/, CIOBuffer *pBuffer )
  315. {
  316. if ( pBuffer->GetUsed() != pBuffer->GetWSABUF()->len )
  317. {
  318. OnError( _T("CSocketServer::WorkerThread::WriteCompleted - Socket write where not all data was written") );
  319. }
  320. }
  321. void CSocketServer::SetServerDataPtr( Socket *pSocket, void *pData )
  322. {
  323. pSocket->SetServerDataPtr( pData );
  324. }
  325. void *CSocketServer::GetServerDataPtr( const Socket *pSocket )
  326. {
  327. return pSocket->GetServerDataPtr();
  328. }
  329. void CSocketServer::WriteCompleted( Socket *pSocket )
  330. {
  331. pSocket->WriteCompleted();
  332. }
  333. bool CSocketServer::FilterSocketShutdown( Socket * /*pSocket*/, int /*how*/ )
  334. {
  335. return true;
  336. }
  337. bool CSocketServer::FilterSocketClose( Socket * /*pSocket*/ )
  338. {
  339. return true;
  340. }
  341. void CSocketServer::PostIoOperation( Socket *pSocket,
  342. CIOBuffer *pBuffer,
  343. enumIO_Operation operation)
  344. {
  345. pBuffer->SetOperation( operation );
  346. pBuffer->AddRef();
  347. pSocket->AddRef();
  348. m_iocp.PostStatus( ( ULONG_PTR )pSocket, 0, pBuffer );
  349. }
  350. CSocketServer &CSocketServer::GetServer( Socket *pSocket )
  351. {
  352. return pSocket->m_server;
  353. }
  354. void CSocketServer::OnConnectionError( enumConnectionErrorSource /*source*/,
  355. Socket * /*pSocket*/,
  356. CIOBuffer * /*pBuffer*/,
  357. DWORD lastError)
  358. {
  359. _tstring sErrorInfo = GetLastErrorMessage( lastError );
  360. DEBUG_ONLY( Output( _T("CSocketServer::OnConnectionError() - : ") + sErrorInfo ) );
  361. //DEBUG_ONLY( Message( sErrorInfo.c_str() ) );
  362. }
  363. /*
  364.  * CSocketServer::Socket
  365.  */
  366. CSocketServer::Socket::Socket( CSocketServer &server,                                 
  367. SOCKET theSocket,
  368. bool useSequenceNumbers )
  369. : m_server( server ),
  370.  m_socket( theSocket ),
  371.  m_ref( 1 ),
  372.  m_outstandingWrites( 0 ),
  373.  m_readShutdown( false ),
  374.  m_writeShutdown( false ),
  375.  m_closing( false ),
  376.  m_clientClosed( false ),
  377.  m_pSequenceData( 0 ),
  378.  m_pIM (0 )
  379. {
  380. if ( !IsValid() )
  381. {
  382. throw CException( _T("CSocketServer::Socket::Socket()"), 
  383. _T("Invalid socket") );
  384. }
  385. if ( useSequenceNumbers )
  386. {
  387. m_pSequenceData = new SequenceData( m_crit );
  388. }
  389. }
  390. CSocketServer::Socket::~Socket()
  391. {
  392. try
  393. {
  394. delete m_pSequenceData;
  395. }
  396. catch(...)
  397. {
  398. TRACE( "CSocketServer::Socket::~Socket exception!" );
  399. }
  400. }
  401. void CSocketServer::Socket::Attach( SOCKET theSocket )
  402. {
  403. if ( IsValid() )
  404. {
  405. throw CException( _T("CSocketServer::Socket::Attach()"), 
  406. _T("Socket already attached"));
  407. }
  408. m_socket = theSocket;
  409. SetUserData( 0 );
  410. m_readShutdown = false;
  411. m_writeShutdown = false;
  412. m_outstandingWrites = 0;
  413. m_closing = false;
  414. m_clientClosed = false;
  415. if ( m_pSequenceData )
  416. {
  417. m_pSequenceData->Reset();
  418. }
  419. }
  420. void CSocketServer::Socket::AddRef()
  421. {
  422. ::InterlockedIncrement( &m_ref );
  423. }
  424. void CSocketServer::Socket::Release()
  425. {
  426. if ( 0 == ::InterlockedDecrement( &m_ref ) )
  427. {
  428. if ( IsValid() )
  429. {
  430. AddRef();
  431. if ( !m_closing )
  432. {
  433. m_closing = true;
  434. if ( !m_server.OnConnectionClosing( this ) )
  435. {
  436. AbortiveClose();
  437. }
  438. }
  439. else
  440. {
  441. AbortiveClose();
  442. }
  443. Release();
  444. return;
  445. }
  446. m_server.ReleaseSocket( this );
  447. }
  448. }
  449. void CSocketServer::Socket::Shutdown( int how /* = SD_BOTH */ )
  450. {
  451. DEBUG_ONLY( Output( _T("CSocketServer::Socket::Shutdown() ") + ToString( how ) ) );
  452. if ( m_server.FilterSocketShutdown( this, how ) )
  453. {
  454. if ( how == SD_RECEIVE || how == SD_BOTH )
  455. {
  456. m_readShutdown = true;
  457. }
  458. if ( how == SD_SEND || how == SD_BOTH )
  459. {
  460. m_writeShutdown = true;
  461. }
  462. if ( ::InterlockedExchange( &m_outstandingWrites, m_outstandingWrites ) > 0 )
  463. {
  464. /*
  465.  * Send side will be shut down when last pending write completes...
  466.  */
  467. if ( how == SD_BOTH )
  468. {
  469. how = SD_RECEIVE;      
  470. }
  471. else if ( how == SD_SEND )
  472. {
  473. return;
  474. }
  475. }
  476. if ( IsValid() )
  477. {
  478. if ( 0 != ::shutdown( m_socket, how ) )
  479. {
  480. m_server.OnError( _T("CSocketServer::Server::Shutdown() - ") +
  481. GetLastErrorMessage( ::WSAGetLastError() ) );
  482. }
  483. DEBUG_ONLY( Output( _T("shutdown initiated") ) );
  484. }
  485. }
  486. }
  487. bool CSocketServer::Socket::IsConnected( int how /*= SD_BOTH*/ ) const
  488. {
  489. if ( how == SD_RECEIVE )
  490. {
  491. return !m_readShutdown;
  492. }
  493. if ( how == SD_SEND )
  494. {
  495. return !m_writeShutdown;
  496. }
  497. if ( how == SD_BOTH )
  498. {
  499. return ( !m_writeShutdown && !m_readShutdown );
  500. }
  501. return false;
  502. }
  503. void CSocketServer::Socket::Close()
  504. {
  505. CCriticalSection::Owner lock( m_crit );
  506. if ( IsValid() )
  507. {
  508. if ( m_server.FilterSocketClose( this ) )
  509. {
  510. InternalClose();
  511. }
  512. }
  513. }
  514. bool CSocketServer::Socket::WritePending()
  515. {
  516. if ( m_writeShutdown )
  517. {
  518. DEBUG_ONLY( Output( _T("CSocketServer::Socket::WritePending() - Attempt to write after write shutdown") ) );
  519. return false;
  520. }
  521. ::InterlockedIncrement( &m_outstandingWrites );
  522. return true;
  523. }
  524. void CSocketServer::Socket::WriteCompleted()
  525. {
  526. if ( ::InterlockedDecrement( &m_outstandingWrites ) == 0 )
  527. {
  528. if ( m_writeShutdown )
  529. {
  530. /*
  531.  * The final pending write has been completed so we can now shutdown
  532.  * the send side of the connection.  
  533.  */
  534. Shutdown( SD_SEND );
  535. }
  536. }
  537. }
  538. void CSocketServer::Socket::AbortiveClose()
  539. {
  540. /*
  541.  * Force an abortive close.
  542.  */
  543. LINGER lingerStruct;
  544. lingerStruct.l_onoff = 1;
  545. lingerStruct.l_linger = 0;
  546. if ( SOCKET_ERROR == ::setsockopt( m_socket, 
  547. SOL_SOCKET, 
  548. SO_LINGER, 
  549. ( char * )&lingerStruct, 
  550. sizeof( lingerStruct ) ) )
  551. {
  552. m_server.OnError( _T("CSocketServer::Socket::AbortiveClose() - setsockopt( SO_LINGER ) - ") + 
  553. GetLastErrorMessage(::WSAGetLastError() ) );
  554. }
  555. InternalClose();
  556. }
  557. void CSocketServer::Socket::OnClientClose()
  558. {
  559. if ( 0 == ::InterlockedExchange( &m_clientClosed, 1 ) )
  560. {
  561. Shutdown( SD_RECEIVE );
  562. m_server.OnConnectionClientClose( this );
  563. }
  564. }
  565. void CSocketServer::Socket::OnConnectionReset()
  566. {
  567. CCriticalSection::Owner lock( m_crit );
  568. if ( IsValid() )
  569. {
  570. m_server.OnConnectionReset( this );
  571. InternalClose();
  572. }
  573. }
  574. void CSocketServer::Socket::InternalClose()
  575. {
  576. CCriticalSection::Owner lock( m_crit );
  577. if ( IsValid() )
  578. {
  579. if ( 0 != ::closesocket( m_socket ) )
  580. {
  581. m_server.OnError( _T("CSocketServer::Socket::InternalClose() - closesocket - ") + 
  582. GetLastErrorMessage( ::WSAGetLastError() ) );
  583. }
  584. m_socket = INVALID_SOCKET;
  585. m_readShutdown = true;
  586. m_writeShutdown = true;
  587. m_server.OnConnectionClosed( this );
  588. }
  589. }
  590. bool CSocketServer::Socket::Read( CIOBuffer *pBuffer /* = 0 */, bool throwOnFailure /* = false*/ )
  591. {
  592. if ( !IsValid() )
  593. {
  594. if ( throwOnFailure )
  595. {
  596. throw CException( _T("CSocketServer::Socket::Read()"), _T("Socket is closed") );
  597. }
  598. else
  599. {
  600. return false;
  601. }
  602. }
  603. /*
  604.  * Post a read request to the iocp so that the actual socket read gets performed by
  605.  * one of the server's IO threads...
  606.  */
  607. if ( !pBuffer )
  608. {
  609. pBuffer = m_server.Allocate();
  610. }
  611. else
  612. {
  613. pBuffer->AddRef();
  614. }
  615. m_server.PostIoOperation( this, pBuffer, m_server.m_postZeroByteReads ? enumIO_Zero_Byte_Read_Request : enumIO_Read_Request );
  616. pBuffer->Release();
  617. return true;
  618. }
  619. bool CSocketServer::Socket::Write( const char *pData, size_t dataLength, bool throwOnFailure /* = false*/ )
  620. {
  621. return Write( reinterpret_cast< const BYTE * >( pData ), dataLength, throwOnFailure );
  622. }
  623. bool CSocketServer::Socket::Write( const BYTE *pData, size_t dataLength, bool throwOnFailure /* = false*/ )
  624. {
  625. if ( !IsValid() )
  626. {
  627. if ( throwOnFailure )
  628. {
  629. /*
  630.  * Todo throw SocketClosedException();
  631.  */
  632. throw CException( _T("CSocketServer::Socket::Write()"), _T("Socket is closed") );
  633. }
  634. else
  635. {
  636. return false;
  637. }
  638. }
  639. if ( !WritePending() )
  640. {
  641. if ( throwOnFailure )
  642. {
  643. /*
  644.  * Todo throw SocketClosedException();
  645.  */
  646. throw CException(_T("CSocketServer::Socket::Write()"), _T("Socket is shutdown"));
  647. }
  648. else
  649. {
  650. return false;
  651. }
  652. }
  653. CIOBuffer *pBuffer = m_server.Allocate();
  654. pBuffer->AddData( pData, dataLength );
  655. pBuffer->SetSequenceNumber( GetSequenceNumber( enumWriteSequenceNo ) );
  656. m_server.PostIoOperation( this, pBuffer, enumIO_Write_Request );
  657. pBuffer->Release();
  658. return true;
  659. }
  660. bool CSocketServer::Socket::Write( CIOBuffer *pBuffer, bool throwOnFailure /* = false*/ )
  661. {
  662. if ( !IsValid() )
  663. {
  664. if ( throwOnFailure )
  665. {
  666. throw CException( _T("CSocketServer::Socket::Write()"), _T("Socket is closed") );
  667. }
  668. else
  669. {
  670. return false;
  671. }
  672. }
  673. if ( !WritePending() )
  674. {
  675. if ( throwOnFailure )
  676. {
  677. throw CException( _T("CSocketServer::Socket::Write()"), _T("Socket is shutdown") );
  678. }
  679. else
  680. {
  681. return false;
  682. }
  683. }
  684. pBuffer->SetSequenceNumber( GetSequenceNumber( enumWriteSequenceNo ) );
  685. m_server.PostIoOperation( this, pBuffer, enumIO_Write_Request );
  686. return true;
  687. }
  688. long CSocketServer::Socket::GetSequenceNumber( enumSequenceType type )
  689. {
  690. if ( m_pSequenceData )
  691. {
  692. return m_pSequenceData->m_numbers[type]++;
  693. }
  694. return 0;
  695. }
  696. CIOBuffer *CSocketServer::Socket::GetNextBuffer( CIOBuffer *pBuffer /* = 0 */ )
  697. {
  698. if ( m_pSequenceData )
  699. {
  700. if ( pBuffer )
  701. {
  702. return m_pSequenceData->m_outOfSequenceWrites.GetNext( pBuffer );
  703. }
  704. else
  705. {
  706. return m_pSequenceData->m_outOfSequenceWrites.ProcessAndGetNext();
  707. }
  708. }
  709. return pBuffer;
  710. }
  711. bool CSocketServer::Socket::IsValid()
  712. {
  713. CCriticalSection::Owner lock(m_crit);
  714. return (INVALID_SOCKET != m_socket);
  715. }
  716. void CSocketServer::Socket::OnConnectionError( CSocketServer::enumConnectionErrorSource source,
  717.    CIOBuffer *pBuffer,
  718.    DWORD lastError)
  719. {
  720. if ( WSAESHUTDOWN == lastError )
  721. {
  722. OnClientClose();
  723. }
  724. else if ( WSAECONNRESET == lastError || WSAECONNABORTED == lastError )
  725. {
  726. OnConnectionReset();
  727. }
  728. else if ( !IsValid() && WSAENOTSOCK == lastError )
  729. {
  730. /*
  731.  * Swallow this error as we expect it...
  732.  */
  733. }
  734. else
  735. {
  736. m_server.OnConnectionError( source, this, pBuffer, lastError );
  737. }
  738. }
  739. /*
  740.  * CSocketServer::Socket::SequenceData
  741.  */
  742. CSocketServer::Socket::SequenceData::SequenceData( CCriticalSection &section )
  743. : m_outOfSequenceWrites( section )
  744. {
  745. memset( m_numbers, 0, sizeof( m_numbers ) );
  746. }
  747. void CSocketServer::Socket::SequenceData::Reset()
  748. {
  749.    memset( m_numbers, 0, sizeof( m_numbers ) );
  750.    m_outOfSequenceWrites.Reset();
  751. }
  752. /*
  753.  * CSocketServer::WorkerThread
  754.  */
  755. CSocketServer::WorkerThread::WorkerThread( CSocketServer &server, CIOCompletionPort &iocp )
  756. : m_server( server ),
  757.   m_iocp( iocp )
  758. {
  759. m_server.OnThreadCreated();
  760. }
  761. CSocketServer::WorkerThread::~WorkerThread()
  762. {
  763. try
  764. {
  765. m_server.OnThreadDestroyed();
  766. }
  767. catch(...)
  768. {
  769. TRACE( "CSocketServer::WorkerThread::~WorkerThread exception!" );
  770. }
  771. }
  772. int CSocketServer::WorkerThread::Run()
  773. {
  774. try
  775. {
  776. while ( true )
  777. {
  778. /*
  779.  * continually loop to service io completion packets
  780.  */
  781. DWORD dwIoSize = 0;
  782. Socket *pSocket = 0;
  783. CIOBuffer *pBuffer = 0;
  784. bool weClosedSocket = false;
  785. DWORD dwResult = m_iocp.GetStatus( ( PDWORD_PTR )&pSocket, &dwIoSize, ( OVERLAPPED ** )&pBuffer );
  786. if ( S_OK != dwResult )
  787. {
  788. if ( ERROR_NETNAME_DELETED == dwResult )
  789. {
  790. weClosedSocket = true;
  791. }
  792. else if ( WSA_OPERATION_ABORTED != dwResult )
  793. {
  794. throw CWin32Exception( _T("CIOCompletionPort::GetStatus() - GetQueuedCompletionStatus"), dwResult );
  795. }
  796. DEBUG_ONLY( Output( _T("IOCP error - client connection dropped") ) );
  797. }
  798. if ( !pSocket )
  799. {
  800. /*
  801.  * A completion key of 0 is posted to the iocp to request us to shut down...
  802.  */
  803. break;
  804. }
  805. /*
  806.  * Thread Begin Processing
  807.  */
  808. HandleOperation( pSocket, pBuffer, dwIoSize, weClosedSocket );
  809. /*
  810.  * Thread End Processing
  811.  */
  812. }// while ( true )
  813. }
  814. catch( const CException &e )
  815. {
  816. m_server.OnError( _T("CSocketServer::WorkerThread::Run() - Exception: ") +
  817. e.GetWhere() +
  818. _T(" - ") + 
  819. e.GetMessage() );
  820. }
  821. catch(...)
  822. {
  823. m_server.OnError( _T("CSocketServer::WorkerThread::Run() - Unexpected exception") );
  824. }
  825. return 0;
  826. }
  827. void CSocketServer::WorkerThread::InitiateShutdown()
  828. {
  829. m_iocp.PostStatus( 0 );
  830. }
  831. void CSocketServer::WorkerThread::WaitForShutdownToComplete()
  832. {
  833. /*
  834.      * if we havent already started a shut down, do so...
  835.  */
  836. InitiateShutdown();
  837. Wait();
  838. }
  839. void CSocketServer::WorkerThread::HandleOperation( Socket *pSocket,
  840.    CIOBuffer *pBuffer,
  841.    DWORD dwIoSize,
  842.    bool weClosedSocket )
  843. {
  844. if ( pBuffer )
  845. {
  846. const enumIO_Operation operation = static_cast< enumIO_Operation >( pBuffer->GetOperation() );
  847. switch ( operation )
  848. {
  849. case enumIO_Zero_Byte_Read_Request:
  850.             DEBUG_ONLY( Output( _T("ZeroByteRead:") + ToString( pBuffer ) ) );
  851.             ZeroByteRead( pSocket, pBuffer );
  852.             pSocket->Release();
  853.             pBuffer->Release();
  854. break ;
  855. case enumIO_Zero_Byte_Read_Completed :
  856. case enumIO_Read_Request :
  857.             DEBUG_ONLY( Output( _T("enumIO_Zero_Byte_Read_Completed | enumIO_Read_Request:") + ToString( pBuffer ) ) );
  858. Read( pSocket, pBuffer );
  859.             
  860.             pSocket->Release();
  861.             pBuffer->Release();
  862. break;
  863. case enumIO_Read_Completed :
  864.             DEBUG_ONLY( Output( _T("enumIO_Read_Completed:") + ToString( pBuffer ) ) );
  865.             pBuffer->Use( dwIoSize );            
  866.            
  867. if ( 0 != dwIoSize )
  868. {
  869. m_server.ReadCompleted( pSocket, pBuffer );
  870. }
  871.             if ( 0 == dwIoSize && !weClosedSocket )
  872.             {
  873. /*
  874.  * client connection dropped?
  875.  */
  876. DEBUG_ONLY( Output( _T("ReadCompleted - 0 bytes - client connection dropped") ) );
  877. pSocket->OnClientClose();
  878.             }
  879.             pSocket->Release();
  880.             pBuffer->Release();
  881. break;
  882. case enumIO_Write_Request :
  883.             DEBUG_ONLY( Output( _T("enumIO_Write_Request:") +
  884. ToString( pBuffer ) ) );
  885. Write( pSocket, pBuffer );
  886.             pSocket->Release();
  887.             pBuffer->Release();
  888. break;
  889. case enumIO_Write_Completed :
  890.             DEBUG_ONLY( Output( _T("enumIO_Write_Completed:") + ToString( pBuffer ) ) );
  891.             pBuffer->Use( dwIoSize );
  892. m_server.WriteCompleted( pSocket, pBuffer );
  893. pSocket->WriteCompleted();
  894.             pSocket->Release();
  895.             pBuffer->Release();
  896. break;
  897. default :
  898.             m_server.OnError( _T("CSocketServer::WorkerThread::Run() - Unexpected operation") );
  899. break;
  900.       } 
  901.    }
  902.    else
  903.    {
  904.    m_server.OnError( _T("CSocketServer::WorkerThread::Run() - Unexpected - pBuffer is 0") );
  905.    }
  906. }
  907. void CSocketServer::WorkerThread::ZeroByteRead( Socket *pSocket, CIOBuffer *pBuffer ) const
  908. {
  909. pSocket->AddRef();
  910. pBuffer->SetOperation( enumIO_Zero_Byte_Read_Completed );
  911. pBuffer->SetupZeroByteRead();
  912. pBuffer->AddRef();
  913. DWORD dwNumBytes = 0;
  914. DWORD dwFlags = 0;
  915. if ( SOCKET_ERROR == ::WSARecv(
  916. pSocket->m_socket, 
  917. pBuffer->GetWSABUF(), 
  918. 1, 
  919. &dwNumBytes,
  920. &dwFlags,
  921. pBuffer, 
  922. NULL ) )
  923. {
  924. DWORD lastError = ::WSAGetLastError();
  925. if ( ERROR_IO_PENDING != lastError )
  926. {
  927. pSocket->OnConnectionError( enumZeroByteReadError, pBuffer, lastError );
  928. pSocket->Release();
  929. pBuffer->Release();
  930. }
  931. }
  932. }
  933. void CSocketServer::WorkerThread::Read( Socket *pSocket, CIOBuffer *pBuffer ) const
  934. {
  935. pSocket->AddRef();
  936. pBuffer->SetOperation( enumIO_Read_Completed );
  937. pBuffer->SetupRead();
  938. pBuffer->AddRef();
  939. CCriticalSection::Owner lock( pSocket->m_crit );
  940. pBuffer->SetSequenceNumber( pSocket->GetSequenceNumber( Socket::enumReadSequenceNo ) );
  941. DWORD dwNumBytes = 0;
  942. DWORD dwFlags = 0;
  943. if ( SOCKET_ERROR == ::WSARecv(
  944. pSocket->m_socket, 
  945. pBuffer->GetWSABUF(), 
  946. 1, 
  947. &dwNumBytes,
  948. &dwFlags,
  949. pBuffer, 
  950. NULL ) )
  951. {
  952. DWORD lastError = ::WSAGetLastError();
  953. if ( ERROR_IO_PENDING != lastError )
  954. {
  955. pSocket->OnConnectionError( enumReadError, pBuffer, lastError );
  956. pSocket->Release();
  957. pBuffer->Release();
  958. }
  959. }
  960. }
  961. void CSocketServer::WorkerThread::Write( Socket *pSocket, CIOBuffer *pBuffer ) const
  962. {
  963. pSocket->AddRef();
  964. {
  965. CCriticalSection::Owner lock( pSocket->m_critWriteQueue );
  966. pBuffer->SetOperation( enumIO_Write_Completed );
  967. pBuffer->SetupWrite();
  968. pBuffer->AddRef();
  969. pBuffer = pSocket->GetNextBuffer( pBuffer );
  970. // DWORD dwID = ::GetCurrentThreadId();
  971. // printf( "Begin ID:%d - Socket:%dn", dwID, ( DWORD )pSocket );
  972. //
  973. // ::Sleep( 10 );
  974. while ( pBuffer )
  975. {
  976. DWORD dwFlags = 0;
  977. DWORD dwSendNumBytes = 0;
  978. // ::Sleep( 10 );
  979. if ( SOCKET_ERROR == ::WSASend(
  980. pSocket->m_socket,
  981. pBuffer->GetWSABUF(), 
  982. 1, 
  983. &dwSendNumBytes,
  984. dwFlags,
  985. pBuffer, 
  986. NULL ) )
  987. {
  988. DWORD lastError = ::WSAGetLastError();
  989. if ( ERROR_IO_PENDING != lastError )
  990. {
  991. pSocket->OnConnectionError( enumWriteError, pBuffer, lastError );
  992. /*
  993. * this pending write will never complete...
  994. */
  995. pSocket->WriteCompleted();
  996. pSocket->Release();
  997. pBuffer->Release();
  998. }
  999. }
  1000. pBuffer = pSocket->GetNextBuffer();
  1001. }
  1002. // printf( "End ID:%d - Socket:%dn", dwID, ( DWORD )pSocket );
  1003. }
  1004. }
  1005. } // End of namespace OnlineGameLib
  1006. } // End of namespace Win32