DirectedThreadPoolQueue.cpp
上传用户:dengkfang
上传日期:2008-12-30
资源大小:5233k
文件大小:11k
源码类别:

CA认证

开发平台:

Visual C++

  1. /*
  2. Module : DirectedThreadPoolQueue.cpp
  3. Purpose: Implementation for an MFC class which implements a Pseudo IOCP like class which also 
  4.          supports a directed post to the queue. i.e. the request is for a specific thread in the 
  5.          thread pool (which is the normal use of a Directed IOCP). This can be used by a thread pool
  6.          server (see the author's CThreadPoolServer class) to implement a thread pool which implements
  7.          recycling.
  8. Created: PJN / 16-04-2002
  9. History: PJN / 11-11-2004 1. Provided a GetRequestArray() method which allows access to the internal
  10.                           array used to hold the thread pool requests. Can prove handy to have access
  11.                           to this in certain circumstances.
  12.                           2. Updated to compile cleanly when Detect 64 bit issues and Force conformance in for loop
  13.                           options are enabled in Visual Studio .Net
  14.          PJN / 30-11-2004 1. Updated the macro which detects if arrays use INT_PTR for index rather than int.
  15.                           2. Also removed some ASSERTS which were overly restrictive in light of the queue now 
  16.                           being externally modifiable via CThreadPoolServer::GetQueue
  17. Copyright (c) 2002 - 2005 by PJ Naughter.  (Web: www.naughter.com, Email: pjna@naughter.com)
  18. All rights reserved.
  19. Copyright / Usage Details:
  20. You are allowed to include the source code in any product (commercial, shareware, freeware or otherwise) 
  21. when your product is released in binary form. You are allowed to modify the source code in any way you want 
  22. except you cannot modify the copyright details at the top of each module. If you want to distribute source 
  23. code with your application, then you are only allowed to distribute versions released by the author. This is 
  24. to maintain a single distribution point for the source code. 
  25. */
  26. //////////////////// Includes /////////////////////////////////////////////////
  27. #include "stdafx.h"
  28. #include "DirectedThreadPoolQueue.h"
  29. /////////////////// Macros / Defines //////////////////////////////////////////
  30. #ifdef _DEBUG
  31. #define new DEBUG_NEW
  32. #undef THIS_FILE
  33. static char THIS_FILE[] = __FILE__;
  34. #endif
  35. ///////////////// Implementation //////////////////////////////////////////////
  36. IMPLEMENT_DYNCREATE(CDirectedThreadPoolQueue, CThreadPoolQueue)
  37. CDirectedThreadPoolQueue::CDirectedThreadPoolQueue()
  38. {
  39.   //Initialize our member variables
  40.   m_hPostRequestSemaphore = NULL;
  41.   m_hGetRequestSemaphore = NULL;
  42. }
  43. CDirectedThreadPoolQueue::~CDirectedThreadPoolQueue()
  44. {
  45.   Close();
  46. }
  47. BOOL CDirectedThreadPoolQueue::Create(DWORD dwMaxQSize)
  48. {
  49.   //Close if already created (this will empty out the request Q for us
  50.   Close();
  51.   //Create the PostRequest semaphores
  52.   m_hPostRequestSemaphore = CreateSemaphore(NULL, dwMaxQSize, dwMaxQSize, NULL);
  53.   if (m_hPostRequestSemaphore == NULL)
  54.   {
  55.     TRACE(_T("CDirectedThreadPoolQueue::Create, Failed to create a PostRequest semaphore, Error:%dn"), GetLastError());
  56.     Close();
  57.     return FALSE;
  58.   }
  59.   m_PostRequestSemaphores.SetSize(0, dwMaxQSize);
  60.   DWORD i;
  61.   for (i=0; i<dwMaxQSize; i++)
  62.   {
  63.     HANDLE hSemaphore = CreateSemaphore(NULL, dwMaxQSize, dwMaxQSize, NULL);
  64.     if (hSemaphore == NULL)
  65.     {
  66.       TRACE(_T("CDirectedThreadPoolQueue::Create, Failed to create a PostRequest semaphore, Error:%dn"), GetLastError());
  67.       Close();
  68.       return FALSE;
  69.     }
  70.     m_PostRequestSemaphores.Add(hSemaphore);
  71.   }
  72.   //Create the GetRequest semaphores
  73.   m_hGetRequestSemaphore = CreateSemaphore(NULL, 0, dwMaxQSize, NULL);
  74.   if (m_hGetRequestSemaphore == NULL)
  75.   {
  76.     TRACE(_T("CDirectedThreadPoolQueue::Create, Failed to create a GetRequest semaphore, Error:%dn"), GetLastError());
  77.     Close();
  78.     return FALSE;
  79.   }
  80.   m_GetRequestSemaphores.SetSize(0, dwMaxQSize);
  81.   for (i=0; i<dwMaxQSize; i++)
  82.   {
  83.     HANDLE hSemaphore = CreateSemaphore(NULL, 0, dwMaxQSize, NULL);
  84.     if (hSemaphore == NULL)
  85.     {
  86.       TRACE(_T("CDirectedThreadPoolQueue::Create, Failed to create a GetRequest semaphore, Error:%dn"), GetLastError());
  87.       Close();
  88.       return FALSE;
  89.     }
  90.     m_GetRequestSemaphores.Add(hSemaphore);
  91.   }
  92.   return TRUE;
  93. }
  94. BOOL CDirectedThreadPoolQueue::PostRequestWithoutLimitCheck(const CThreadPoolRequest& request)
  95. {
  96.   ASSERT(IsCreated()); //Must have been created
  97.   //By default assume the best
  98.   BOOL bSuccess = TRUE;
  99.   //Add the request to the request queue
  100.   CSingleLock sl(&m_csRequests, TRUE);
  101.   CThreadPoolRequest CopyOfRequest(request);
  102. #if (_MFC_VER >= 0x700)
  103.   INT_PTR nRequestIndex = m_Requests.Add(CopyOfRequest);
  104. #else
  105.   int nRequestIndex = m_Requests.Add(CopyOfRequest);
  106. #endif
  107.   sl.Unlock();
  108.   //Release the semaphore
  109.   if (request.m_bDirectedRequest)
  110.   {
  111.     if (!ReleaseSemaphore(m_GetRequestSemaphores.GetAt(request.m_nDirectedRequestIndex), 1, NULL))
  112.     {
  113.       bSuccess = FALSE;
  114.       //Remove the item from the Q since we could not update the "Get" semaphores
  115.       sl.Lock();
  116.       m_Requests.RemoveAt(nRequestIndex);
  117.       sl.Unlock();
  118.       TRACE(_T("CDirectedThreadPoolQueue::PostRequestWithoutLimitCheck, Failed to release a semaphore, Error:%dn"), GetLastError());
  119.     }
  120.   }
  121.   else
  122.   {
  123.     if (!ReleaseSemaphore(m_hGetRequestSemaphore, 1, NULL))
  124.     {
  125.       bSuccess = FALSE;
  126.       //Remove the item from the Q since we could not update the "Get" semaphores
  127.       sl.Lock();
  128.       m_Requests.RemoveAt(nRequestIndex);
  129.       sl.Unlock();
  130.       TRACE(_T("CDirectedThreadPoolQueue::PostRequestWithoutLimitCheck, Failed to release a semaphore, Error:%dn"), GetLastError());
  131.     }
  132.   }
  133.   return bSuccess;
  134. }
  135. BOOL CDirectedThreadPoolQueue::PostRequest(const CThreadPoolRequest& request, DWORD dwMilliseconds)
  136. {
  137.   ASSERT(IsCreated()); //Must have been created
  138.   //Wait for the post request semaphore
  139.   DWORD dwWait;
  140.   if (request.m_bDirectedRequest)
  141.     dwWait = WaitForSingleObject(m_PostRequestSemaphores.GetAt(request.m_nDirectedRequestIndex), dwMilliseconds);
  142.   else
  143.     dwWait = WaitForSingleObject(m_hPostRequestSemaphore, dwMilliseconds);
  144.   if (dwWait != WAIT_OBJECT_0)
  145.   {
  146.     TRACE(_T("CDirectedThreadPoolQueue::PostRequest, Failed while waiting for the Q to free up, Error:%dn"), GetLastError());
  147.     return FALSE;
  148.   }
  149.   //Pass the buck to the other PostRequest method
  150.   return PostRequestWithoutLimitCheck(request);
  151. }
  152. int CDirectedThreadPoolQueue::GetNonDirectedRequestIndexToRemove()
  153. {
  154.   //Work out the item to remove from the Q, by default we pick
  155.   //the first non directed request starting from the tail of the queue
  156.   int nIndexToRemoveAt = -1;
  157. #if (_MFC_VER >= 0x700)
  158.   INT_PTR nRequestSize = m_Requests.GetSize();
  159. #else
  160.   int nRequestSize = m_Requests.GetSize();
  161. #endif
  162.   for (int i=0; (i<nRequestSize) && (nIndexToRemoveAt == -1); i++)
  163.   {
  164.     CThreadPoolRequest& tempRequest = m_Requests.ElementAt(i);
  165.     if (!tempRequest.m_bDirectedRequest)
  166.       nIndexToRemoveAt = i;
  167.   }
  168.   return nIndexToRemoveAt;
  169. }
  170. #if (_MFC_VER >= 0x700)
  171. INT_PTR CDirectedThreadPoolQueue::GetDirectedRequestIndexToRemove(int nThreadIndexForDirectedRequest)
  172. #else
  173. int CDirectedThreadPoolQueue::GetDirectedRequestIndexToRemove(int nThreadIndexForDirectedRequest)
  174. #endif
  175. {
  176.   //Work out the item to remove from the Q, by default we pick the first 
  177.   //directed request for this thread starting from the tail of the queue
  178. #if (_MFC_VER >= 0x700)
  179.   INT_PTR nIndexToRemoveAt = -1;
  180.   INT_PTR nRequestSize = m_Requests.GetSize();
  181.   INT_PTR i;
  182. #else
  183.   int nIndexToRemoveAt = -1;
  184.   int nRequestSize = m_Requests.GetSize();
  185.   int i;
  186. #endif
  187.   ASSERT(nRequestSize);
  188.   for (i=0; (i<nRequestSize) && (nIndexToRemoveAt == -1); i++)
  189.   {
  190.     CThreadPoolRequest& tempRequest = m_Requests.ElementAt(i);
  191.     if (tempRequest.m_bDirectedRequest && nThreadIndexForDirectedRequest == tempRequest.m_nDirectedRequestIndex)
  192.       nIndexToRemoveAt = i;
  193.   }
  194.   return nIndexToRemoveAt;
  195. }
  196. BOOL CDirectedThreadPoolQueue::GetRequest(CThreadPoolRequest& request, int nThreadIndexForDirectedRequest, DWORD dwMilliseconds)
  197. {
  198.   ASSERT(IsCreated()); //Must have been created
  199.   //Wait for either a non directed request or a directed request for this thread to become available on the Q
  200.   HANDLE hWaitHandles[2];
  201.   hWaitHandles[0] = m_GetRequestSemaphores.GetAt(nThreadIndexForDirectedRequest);
  202.   hWaitHandles[1] = m_hGetRequestSemaphore;
  203.   DWORD dwWait = WaitForMultipleObjects(2, hWaitHandles, FALSE, dwMilliseconds);
  204.   int nSignaledHandle = dwWait - WAIT_OBJECT_0;
  205.   //Work out what the return value from WFMO means!
  206.   BOOL bRemoveDirected = FALSE;
  207.   if (nSignaledHandle == 0)
  208.     bRemoveDirected = TRUE;
  209.   else if (nSignaledHandle != 1)
  210.   {
  211.     TRACE(_T("CDirectedThreadPoolQueue::GetRequest, Failed while waiting for the item on the Q, Error:%dn"), GetLastError());
  212.     return FALSE;
  213.   }
  214.   //Lock down access to the Q
  215.   CSingleLock sl(&m_csRequests, TRUE);
  216.   //Remove some item from the request Q  
  217.   if (bRemoveDirected)
  218.   {
  219.     //Work out the item to remove from the Q
  220. #if (_MFC_VER >= 0x700)
  221.     INT_PTR nIndexToRemoveAt = GetDirectedRequestIndexToRemove(nThreadIndexForDirectedRequest);
  222. #else
  223.     int nIndexToRemoveAt = GetDirectedRequestIndexToRemove(nThreadIndexForDirectedRequest);
  224. #endif
  225.     ASSERT(nIndexToRemoveAt != -1); //something has gone badly wrong if we could not find a request to remove
  226.     request = m_Requests.GetAt(nIndexToRemoveAt);
  227.     ASSERT(request.m_bDirectedRequest); //the GetDirectedRequestIndexToRemove call above has returned an incorrect index
  228.     m_Requests.RemoveAt(nIndexToRemoveAt);
  229.     //Release the PostRequest semaphore
  230.     ReleaseSemaphore(m_PostRequestSemaphores.GetAt(nThreadIndexForDirectedRequest), 1, NULL);
  231.   }
  232.   else
  233.   {
  234.     //Work out the item to remove from the Q
  235.     int nIndexToRemoveAt = GetNonDirectedRequestIndexToRemove();
  236.     if (nIndexToRemoveAt != -1)
  237.     {
  238.       request = m_Requests.GetAt(nIndexToRemoveAt);
  239.       ASSERT(!request.m_bDirectedRequest); //the GetNonDirectedRequestIndexToRemove call above has returned an incorrect index
  240.       m_Requests.RemoveAt(nIndexToRemoveAt);
  241.       //Release the PostRequest semaphore
  242.       ReleaseSemaphore(m_hPostRequestSemaphore, 1, NULL);
  243.     }
  244.     else
  245.       return FALSE;
  246.   }
  247.   
  248.   return TRUE;
  249. }
  250. BOOL CDirectedThreadPoolQueue::Close()
  251. {
  252.   //Empty out the request queue
  253.   CSingleLock sl(&m_csRequests, TRUE);
  254.   m_Requests.RemoveAll();
  255.   sl.Unlock();
  256.   
  257.   //Free up the PostRequest semaphores
  258.   if (m_hPostRequestSemaphore)
  259.   {
  260.     CloseHandle(m_hPostRequestSemaphore);
  261.     m_hPostRequestSemaphore = NULL;
  262.   }
  263.   int i;
  264.   for (i=0; i<m_PostRequestSemaphores.GetSize(); i++)
  265.   {
  266.     HANDLE hSemaphore = m_PostRequestSemaphores.GetAt(i);
  267.     CloseHandle(hSemaphore);
  268.   }
  269.   m_PostRequestSemaphores.SetSize(0);
  270.   //Free up the GetRequest semaphores
  271.   if (m_hGetRequestSemaphore)
  272.   {
  273.     CloseHandle(m_hGetRequestSemaphore);
  274.     m_hGetRequestSemaphore = NULL;
  275.   }
  276.   for (i=0; i<m_GetRequestSemaphores.GetSize(); i++)
  277.   {
  278.     HANDLE hSemaphore = m_GetRequestSemaphores.GetAt(i);
  279.     CloseHandle(hSemaphore);
  280.   }
  281.   m_GetRequestSemaphores.SetSize(0);
  282.   return TRUE;
  283. }
  284. BOOL CDirectedThreadPoolQueue::IsCreated() const
  285. {
  286.   return (m_hPostRequestSemaphore != NULL);
  287. }