WorkQueue.cpp
上传用户:dfzycw
上传日期:2010-01-10
资源大小:66k
文件大小:6k
源码类别:

进程与线程

开发平台:

Visual C++

  #include "stdafx.h"
  #include "WorkQueue.h"
  #include <assert.h>
  #include <new>
  4. typedef struct _THREAD_CONTEXT
  5. {
  6.    CWorkQueue* pWorkQueue;
  7.    void*       pThreadData;
  8. } THREAD_CONTEXT,*PTHREAD_CONTEXT;
  9. /*------------------------------------------------------------------------
  10. Create
  11.   初始化工作队列
  12.   const unsigned int  nNumberOfThreads      - 要创建的工作队列中线程的数目
  13.   void*         *ThreadData                 - 送给线程的需要执行的工作项目
  14. ------------------------------------------------------------------------*/
  15. bool CWorkQueue::Create(const unsigned int  nNumberOfThreads, 
  16.                               void*         *ThreadData      /*=NULL*/)
  17. {
  18.       
  19.    //初始化工作队列
  20.    m_pWorkItemQueue = new WorkItemQueue();
  21.    
  22.    if(NULL == m_pWorkItemQueue )
  23.    {      
  24.       return false;
  25.    }
  26.    
  27.    //创建Semaphore对象   
  28.    m_phSincObjectsArray[SEMAPHORE_INDEX] = CreateSemaphore(NULL,0,LONG_MAX,NULL);
  29.    
  30.    if(m_phSincObjectsArray[SEMAPHORE_INDEX] == NULL)
  31.    {      
  32.    delete m_pWorkItemQueue;
  33.    m_pWorkItemQueue = NULL;
  34.    return false;
  35.    }
  36.    
  37.    //创建event 事件对象
  38.    m_phSincObjectsArray[ABORT_EVENT_INDEX] = CreateEvent(NULL,TRUE,FALSE,NULL);
  39.    
  40.    if(m_phSincObjectsArray[ABORT_EVENT_INDEX]  == NULL)
  41.    {      
  42.       delete m_pWorkItemQueue;
  43.       m_pWorkItemQueue = NULL;
  44.       CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
  45.       return false;
  46.    }
  47.    
  48.    //创建临界区以保护工作队列
  49.    InitializeCriticalSection(&m_CriticalSection);
  50.    //分配线程句柄数组
  51.    m_phThreads = new HANDLE[nNumberOfThreads];
  52.    if(m_phThreads == NULL)
  53.    {    
  54.       delete m_pWorkItemQueue;
  55.       m_pWorkItemQueue = NULL;
  56.       CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
  57.   CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
  58.   DeleteCriticalSection(&m_CriticalSection);   
  59.       return false;
  60.    }
  61.    unsigned int i;
  62.    m_nNumberOfThreads = nNumberOfThreads;
  63.    DWORD dwThreadId;
  64.    PTHREAD_CONTEXT pThreadsContext ;
  65.  
  66.    //创建所有的线程
  67.    for(i = 0 ; i < nNumberOfThreads ; i++ )
  68.    {  
  69.    //初始化每个线程的上下文,用于传递给线程函数
  70.   pThreadsContext = new THREAD_CONTEXT;
  71.   pThreadsContext->pWorkQueue  = this;
  72.   pThreadsContext->pThreadData = ThreadData == NULL? NULL : ThreadData[i];    
  73.   //创建线程
  74.   m_phThreads[i] = CreateThread(NULL,
  75.   0,
  76.   CWorkQueue::ThreadFunc,
  77.   pThreadsContext,
  78.   0,
  79.   &dwThreadId);
  80.       if(m_phThreads[i] == NULL)
  81.       {  
  82.  delete pThreadsContext;
  83.          m_nNumberOfThreads = i;
  84.          Destroy();
  85.          return false;
  86.       }
  87.    }
  88.    return true;
  89. }
  90. /*------------------------------------------------------------------------
  91. InsertWorkItem
  92.   插入工作任务到工作队列
  93. ------------------------------------------------------------------------*/
  94. bool CWorkQueue::InsertWorkItem(WorkItemBase* pWorkItem)
  95. {
  96.    assert(pWorkItem != NULL);
  97.    //锁住
  98.    EnterCriticalSection(&m_CriticalSection);
  99.    //插入队列
  100.    m_pWorkItemQueue->push(pWorkItem);
  101.    //解锁
  102.    LeaveCriticalSection(&m_CriticalSection); 
  103.    //触发线程
  104.    if (!ReleaseSemaphore(m_phSincObjectsArray[SEMAPHORE_INDEX],1,NULL)) 
  105.    { 
  106.       assert(false);
  107.       return false;     
  108.    } 
  109.    
  110.  return true;
  111.  
  112. }
  113. /*------------------------------------------------------------------------
  114. RemoveWorkItem()
  115.  从工作队列中取出任务,并且移除
  116. ------------------------------------------------------------------------*/
  117. WorkItemBase*  CWorkQueue::RemoveWorkItem()
  118. {
  119.    
  120.    WorkItemBase* pWorkItem;
  121.    //锁住
  122.    EnterCriticalSection(&m_CriticalSection);  
  123.    //从队列中移除任务  
  124.    pWorkItem = m_pWorkItemQueue->front();
  125.    m_pWorkItemQueue->pop();
  126.    //解锁
  127.    LeaveCriticalSection(&m_CriticalSection);
  128.    assert(pWorkItem != NULL);
  129.    return pWorkItem;
  130. }
  131. /*------------------------------------------------------------------------
  132. ThreadFunc
  133.   这是线程函数,用来等待工作任务事件的到来或者离开线程事件的触发
  134. ------------------------------------------------------------------------*/
  135. unsigned long __stdcall CWorkQueue::ThreadFunc( void*  pParam )
  136.    PTHREAD_CONTEXT       pThreadContext =  (PTHREAD_CONTEXT)pParam;//线程的传入参数 
  137.    WorkItemBase*         pWorkItem      = NULL;
  138.    CWorkQueue*           pWorkQueue     = pThreadContext->pWorkQueue;//工作队列指针
  139.    void*                 pThreadData    = pThreadContext->pThreadData;
  140.    DWORD dwWaitResult; 
  141.    for(;;)
  142.    {
  143.   //等待两个事件
  144.   dwWaitResult = WaitForMultipleObjects(NUMBER_OF_SYNC_OBJ,pWorkQueue->m_phSincObjectsArray,FALSE,INFINITE);
  145.       switch(dwWaitResult - WAIT_OBJECT_0)
  146.       {
  147.       case ABORT_EVENT_INDEX: //离开线程事件
  148.  delete pThreadContext;
  149.          return 0; 
  150.       case SEMAPHORE_INDEX://工作任务事件
  151.          //得到工作队列的第一个工作任务
  152.          pWorkItem = pWorkQueue->RemoveWorkItem();     
  153.          if(pWorkItem == NULL)
  154.          {
  155.             assert(false);
  156.             break;
  157.          }     
  158.          //调用相应的工作函数
  159.          pWorkItem->DoWork(pThreadData);     
  160.          break;
  161.       default:
  162.           assert(false);
  163.   delete pThreadContext;
  164.           return 0; 
  165.       }      
  166.    }
  167.    delete pThreadContext;
  168.    return 1; 
  169. }
  170. /*------------------------------------------------------------------------
  171. Destroy
  172.   设置线程退出事件,等待所有线程的结束
  173. ------------------------------------------------------------------------*/
  174. void CWorkQueue::Destroy()
  175. {
  176.   //设置退出事件    
  177.    if(!SetEvent(m_phSincObjectsArray[ABORT_EVENT_INDEX]))
  178.    {     
  179.       assert(false);
  180.       return;
  181.    }
  182.    //等待所有的线程结束
  183.    WaitForMultipleObjects(m_nNumberOfThreads,m_phThreads,true,INFINITE);
  184.          
  185.    //清除队列
  186.    while(!m_pWorkItemQueue->empty())
  187.    {
  188.       m_pWorkItemQueue->front()->Abort();
  189.   m_pWorkItemQueue->pop();
  190.    }  
  191.    delete m_pWorkItemQueue;
  192.    m_pWorkItemQueue = NULL;   
  193.    CloseHandle(m_phSincObjectsArray[SEMAPHORE_INDEX]);
  194.    CloseHandle(m_phSincObjectsArray[ABORT_EVENT_INDEX]);
  195.    DeleteCriticalSection(&m_CriticalSection);
  196.    //关闭所有的线程句柄
  197.    for(int i = 0 ; i < m_nNumberOfThreads ; i++)
  198.    CloseHandle(m_phThreads[i]);
  199.    delete[] m_phThreads;
  200. }