thread_pool.hpp
上传用户:yhdzpy8989
上传日期:2007-06-13
资源大小:13604k
文件大小:13k
源码类别:

生物技术

开发平台:

C/C++

  1. /*
  2.  * ===========================================================================
  3.  * PRODUCTION $Log: thread_pool.hpp,v $
  4.  * PRODUCTION Revision 1000.1  2004/06/03 19:28:22  gouriano
  5.  * PRODUCTION PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.12
  6.  * PRODUCTION
  7.  * ===========================================================================
  8.  */
  9. #ifndef THREAD_POOL__HPP
  10. #define THREAD_POOL__HPP
  11. /*  $Id: thread_pool.hpp,v 1000.1 2004/06/03 19:28:22 gouriano Exp $
  12. * ===========================================================================
  13. *
  14. *                            PUBLIC DOMAIN NOTICE
  15. *               National Center for Biotechnology Information
  16. *
  17. *  This software/database is a "United States Government Work" under the
  18. *  terms of the United States Copyright Act.  It was written as part of
  19. *  the author's official duties as a United States Government employee and
  20. *  thus cannot be copyrighted.  This software/database is freely available
  21. *  to the public for use. The National Library of Medicine and the U.S.
  22. *  Government have not placed any restriction on its use or reproduction.
  23. *
  24. *  Although all reasonable efforts have been taken to ensure the accuracy
  25. *  and reliability of the software and data, the NLM and the U.S.
  26. *  Government do not and cannot warrant the performance or results that
  27. *  may be obtained by using this software or data. The NLM and the U.S.
  28. *  Government disclaim all warranties, express or implied, including
  29. *  warranties of performance, merchantability or fitness for any particular
  30. *  purpose.
  31. *
  32. *  Please cite the author in any work or product based on this material.
  33. *
  34. * ===========================================================================
  35. *
  36. * Author:  Aaron Ucko
  37. *
  38. * File Description:
  39. *   Pools of generic request-handling threads.
  40. *
  41. *   TEMPLATES:
  42. *      CBlockingQueue<>  -- queue of requests, with efficiently blocking Get()
  43. *      CThreadInPool<>   -- abstract request-handling thread
  44. *      CPoolOfThreads<>  -- abstract pool of threads sharing a request queue
  45. *
  46. *   SPECIALIZATIONS:
  47. *      CStdRequest       -- abstract request type
  48. *      CStdThreadInPool  -- thread handling CStdRequest
  49. *      CStdPoolOfThreads -- pool of threads handling CStdRequest
  50. */
  51. #include <corelib/ncbistd.hpp>
  52. #include <corelib/ncbithr.hpp>
  53. #include <corelib/ncbi_limits.hpp>
  54. #include <util/util_exception.hpp>
  55. #include <deque>
  56. /** @addtogroup ThreadedPools
  57.  *
  58.  * @{
  59.  */
  60. BEGIN_NCBI_SCOPE
  61. /////////////////////////////////////////////////////////////////////////////
  62. //
  63. //  TEMPLATES:
  64. //
  65. //     CBlockingQueue<>  -- queue of requests, with efficiently blocking Get()
  66. //     CThreadInPool<>   -- abstract request-handling thread
  67. //     CPoolOfThreads<>  -- abstract pool of threads sharing a request queue
  68. //
  69. template <typename TRequest>
  70. class CBlockingQueue
  71. {
  72. public:
  73.     CBlockingQueue(unsigned int max_size = kMax_UInt)
  74.         : m_GetSem(0,1), m_PutSem(1,1), m_MaxSize(max_size) {}
  75.     void         Put(const TRequest& data); // Throws exception if full
  76.     void         WaitForRoom(unsigned int timeout_sec  = kMax_UInt,
  77.                              unsigned int timeout_nsec = kMax_UInt) const;
  78.     // Blocks politely if empty
  79.     TRequest     Get(unsigned int timeout_sec  = kMax_UInt,
  80.                      unsigned int timeout_nsec = kMax_UInt);
  81.     unsigned int GetSize(void) const;
  82.     unsigned int GetMaxSize(void) const { return m_MaxSize; }
  83.     bool         IsEmpty(void) const    { return GetSize() == 0; }
  84.     bool         IsFull(void) const     { return GetSize() == GetMaxSize(); }
  85. protected:
  86.     // Derived classes should take care to use these members properly.
  87.     volatile deque<TRequest> m_Queue;
  88.     CSemaphore               m_GetSem; // Raised iff the queue contains data
  89.     mutable CSemaphore       m_PutSem; // Raised iff the queue has room
  90.     mutable CMutex           m_Mutex;  // Guards access to queue
  91. private:
  92.     unsigned int             m_MaxSize;
  93. };
  94. // Forward declaration
  95. template <typename TRequest> class CPoolOfThreads;
  96. template <typename TRequest>
  97. /* abstract */ class CThreadInPool : public CThread
  98. {
  99. public:
  100.     typedef CPoolOfThreads<TRequest> TPool;
  101.     CThreadInPool(TPool* pool) : m_Pool(pool) {}
  102. protected:
  103.     virtual ~CThreadInPool(void) {}
  104.     virtual void Init(void) {} // called at beginning of Main()
  105.     // Called from Main() for each request this thread handles
  106.     virtual void ProcessRequest(const TRequest& req) = 0;
  107.     virtual void x_OnExit(void) {} // called by OnExit()
  108. private:
  109.     // to prevent overriding; inherited from CThread
  110.     virtual void* Main(void);
  111.     virtual void OnExit(void);
  112.     TPool* m_Pool;
  113. };
  114. template <typename TRequest>
  115. /* abstract */ class CPoolOfThreads
  116. {
  117. public:
  118.     friend class CThreadInPool<TRequest>;
  119.     typedef CThreadInPool<TRequest> TThread;
  120.     CPoolOfThreads(unsigned int max_threads, unsigned int queue_size,
  121.                    int spawn_threshold = 1)
  122.         : m_MaxThreads(max_threads), m_Threshold(spawn_threshold),
  123.           m_Queue(queue_size)
  124.         { m_ThreadCount.Set(0);  m_Delta.Set(0); }
  125.     virtual ~CPoolOfThreads(void);
  126.     void Spawn(unsigned int num_threads);
  127.     void AcceptRequest(const TRequest& req);
  128.     void WaitForRoom(void)  { m_Queue.WaitForRoom(); }
  129.     bool IsFull(void) const { return m_Queue.IsFull(); }
  130. protected:
  131.     virtual TThread* NewThread(void) = 0;
  132.     CAtomicCounter           m_ThreadCount;
  133.     volatile unsigned int    m_MaxThreads;
  134.     CAtomicCounter           m_Delta;     // # unfinished requests - # threads
  135.     int                      m_Threshold; // for delta
  136.     CMutex                   m_Mutex;     // for m_MaxThreads
  137.     CBlockingQueue<TRequest> m_Queue;
  138. };
  139. /////////////////////////////////////////////////////////////////////////////
  140. //
  141. //  SPECIALIZATIONS:
  142. //
  143. //     CStdRequest       -- abstract request type
  144. //     CStdThreadInPool  -- thread handling CStdRequest
  145. //     CStdPoolOfThreads -- pool of threads handling CStdRequest
  146. //
  147. /* abstract */ class CStdRequest : public CObject
  148. {
  149. public:
  150.     virtual ~CStdRequest(void) {}
  151.     // Called by whichever thread handles this request.
  152.     virtual void Process(void) = 0;
  153. };
  154. class NCBI_XUTIL_EXPORT CStdThreadInPool
  155.     : public CThreadInPool< CRef< CStdRequest > >
  156. {
  157. public:
  158.     typedef CThreadInPool< CRef< CStdRequest > > TParent;
  159.     CStdThreadInPool(TPool* pool) : TParent(pool) {}
  160. protected:
  161.     virtual void ProcessRequest(const CRef<CStdRequest>& req)
  162.         { const_cast<CStdRequest&>(*req).Process(); }
  163.     // virtual void Init(void); // called before processing any requests
  164.     // virtual void x_OnExit(void); // called just before exiting
  165. };
  166. class NCBI_XUTIL_EXPORT CStdPoolOfThreads
  167.     : public CPoolOfThreads< CRef< CStdRequest > >
  168. {
  169. public:
  170.     typedef CPoolOfThreads< CRef< CStdRequest > > TParent;
  171.     CStdPoolOfThreads(unsigned int max_threads, unsigned int queue_size,
  172.                       int spawn_threshold = 1)
  173.         : TParent(max_threads, queue_size, spawn_threshold) {}
  174.     // void Spawn(unsigned int num_threads);
  175.     // void AcceptRequest(const TRequest& req);
  176.     // Causes all threads in the pool to exit cleanly after finishing
  177.     // all pending requests, optionally waiting for them to die.
  178.     virtual void KillAllThreads(bool wait);
  179. protected:
  180.     virtual TThread* NewThread(void)
  181.         { return new CStdThreadInPool(this); }
  182. };
  183. /////////////////////////////////////////////////////////////////////////////
  184. /////////////////////////////////////////////////////////////////////////////
  185. //  IMPLEMENTATION of INLINE functions
  186. /////////////////////////////////////////////////////////////////////////////
  187. /////////////////////////////////////////////////////////////////////////////
  188. //   CBlockingQueue<>::
  189. //
  190. template <typename TRequest>
  191. void CBlockingQueue<TRequest>::Put(const TRequest& data)
  192. {
  193.     CMutexGuard guard(m_Mutex);
  194.     // Having the mutex, we can safely drop "volatile"
  195.     deque<TRequest>& q = const_cast<deque<TRequest>&>(m_Queue);
  196.     if (q.size() == m_MaxSize) {
  197.         m_PutSem.TryWait();
  198.         NCBI_THROW(CBlockingQueueException, eFull, "CBlockingQueue<>::Put: "
  199.                    "attempt to insert into a full queue");
  200.     } else if (q.empty()) {
  201.         m_GetSem.Post();
  202.     }
  203.     q.push_back(data);
  204. }
  205. template <typename TRequest>
  206. void CBlockingQueue<TRequest>::WaitForRoom(unsigned int timeout_sec,
  207.                                            unsigned int timeout_nsec) const
  208. {
  209.     // Make sure there's room, but don't actually consume anything
  210.     if (m_PutSem.TryWait(timeout_sec, timeout_nsec)) {
  211.         m_PutSem.Post();
  212.     } else {
  213.         NCBI_THROW(CBlockingQueueException, eTimedOut,
  214.                    "CBlockingQueue<>::WaitForRoom: timed out");        
  215.     }
  216. }
  217. template <typename TRequest>
  218. TRequest CBlockingQueue<TRequest>::Get(unsigned int timeout_sec,
  219.                                        unsigned int timeout_nsec)
  220. {
  221.     if ( !m_GetSem.TryWait(timeout_sec, timeout_nsec) ) {
  222.         NCBI_THROW(CBlockingQueueException, eTimedOut,
  223.                    "CBlockingQueue<>::Get: timed out");        
  224.     }
  225.     CMutexGuard guard(m_Mutex);
  226.     // Having the mutex, we can safely drop "volatile"
  227.     deque<TRequest>& q = const_cast<deque<TRequest>&>(m_Queue);
  228.     TRequest result = q.front();
  229.     q.pop_front();
  230.     if ( ! q.empty() ) {
  231.         m_GetSem.Post();
  232.     }
  233.     // Get the attention of WaitForRoom() or the like; do this
  234.     // regardless of queue size because derived classes may want
  235.     // to insert multiple objects atomically.
  236.     m_PutSem.TryWait();
  237.     m_PutSem.Post();
  238.     return result;
  239. }
  240. template <typename TRequest>
  241. unsigned int CBlockingQueue<TRequest>::GetSize(void) const
  242. {
  243.     CMutexGuard guard(m_Mutex);
  244.     return const_cast<const deque<TRequest>&>(m_Queue).size();
  245. }
  246. /////////////////////////////////////////////////////////////////////////////
  247. //   CThreadInPool<>::
  248. //
  249. template <typename TRequest>
  250. void* CThreadInPool<TRequest>::Main(void)
  251. {
  252.     Detach();
  253.     Init();
  254.     for (;;) {
  255.         m_Pool->m_Delta.Add(-1);
  256.         ProcessRequest(m_Pool->m_Queue.Get());
  257.     }
  258.     return 0; // Unreachable, but necessary for WorkShop build
  259. }
  260. template <typename TRequest>
  261. void CThreadInPool<TRequest>::OnExit(void)
  262. {
  263.     try {
  264.         x_OnExit();
  265.     } catch (...) {
  266.         // Ignore exceptions; there's nothing useful we can do anyway
  267.     }
  268.     m_Pool->m_ThreadCount.Add(-1);
  269. }
  270. /////////////////////////////////////////////////////////////////////////////
  271. //   CPoolOfThreads<>::
  272. //
  273. template <typename TRequest>
  274. CPoolOfThreads<TRequest>::~CPoolOfThreads(void)
  275. {
  276.     CAtomicCounter::TValue n = m_ThreadCount.Get();
  277.     if (n) {
  278.         ERR_POST(Warning << "CPoolOfThreads<>::~CPoolOfThreads: "
  279.                  << n << " thread(s) still active");
  280.     }
  281. }
  282. template <typename TRequest>
  283. void CPoolOfThreads<TRequest>::Spawn(unsigned int num_threads)
  284. {
  285.     for (unsigned int i = 0; i < num_threads; i++)
  286.     {
  287.         m_ThreadCount.Add(1);
  288.         NewThread()->Run();
  289.     }
  290. }
  291. template <typename TRequest>
  292. void CPoolOfThreads<TRequest>::AcceptRequest(const TRequest& req)
  293. {
  294.     bool new_thread = false;
  295.     {{
  296.         CMutexGuard guard(m_Mutex);
  297.         m_Queue.Put(req);
  298.         if (static_cast<int>(m_Delta.Add(1)) >= m_Threshold
  299.             &&  m_ThreadCount.Get() < m_MaxThreads) {
  300.             // Add another thread to the pool because they're all busy.
  301.             m_ThreadCount.Add(1);
  302.             new_thread = true;
  303.         }
  304.     }}
  305.     if (new_thread) {
  306.         NewThread()->Run();
  307.     }
  308. }
  309. END_NCBI_SCOPE
  310. /* @} */
  311. /*
  312. * ===========================================================================
  313. *
  314. * $Log: thread_pool.hpp,v $
  315. * Revision 1000.1  2004/06/03 19:28:22  gouriano
  316. * PRODUCTION: UPGRADED [GCC34_MSVC7] Dev-tree R1.12
  317. *
  318. * Revision 1.12  2004/06/02 17:49:08  ucko
  319. * CPoolOfThreads: change type of m_Delta and m_ThreadCount to
  320. * CAtomicCounter to reduce need for m_Mutex; warn if any threads are
  321. * still active when the destructor runs.
  322. *
  323. * Revision 1.11  2003/04/17 17:50:37  siyan
  324. * Added doxygen support
  325. *
  326. * Revision 1.10  2003/02/26 21:34:06  gouriano
  327. * modify C++ exceptions thrown by this library
  328. *
  329. * Revision 1.9  2002/12/19 14:51:00  dicuccio
  330. * Added export specifier for Win32 DLL builds.
  331. *
  332. * Revision 1.8  2002/11/04 21:29:00  grichenk
  333. * Fixed usage of const CRef<> and CRef<> constructor
  334. *
  335. * Revision 1.7  2002/09/13 15:16:03  ucko
  336. * Give CBlockingQueue<>::{WaitForRoom,Get} optional timeouts (infinite
  337. * by default); change exceptions to use new setup.
  338. *
  339. * Revision 1.6  2002/04/18 15:38:19  ucko
  340. * Use "deque" instead of "queue" -- more general, and less likely to
  341. * yield any name conflicts.
  342. * Make most of CBlockingQueue<>'s data protected for the benefit of
  343. * derived classes.
  344. * Move CVS log to end.
  345. *
  346. * Revision 1.5  2002/04/11 15:12:52  ucko
  347. * Added GetSize and GetMaxSize methods to CBlockingQueue and rewrote
  348. * Is{Empty,Full} in terms of them.
  349. *
  350. * Revision 1.4  2002/01/25 15:46:06  ucko
  351. * Add more methods needed by new threaded-server code.
  352. * Minor cleanups.
  353. *
  354. * Revision 1.3  2002/01/24 20:17:49  ucko
  355. * Introduce new exception class for full queues
  356. * Allow waiting for a full queue to have room again
  357. *
  358. * Revision 1.2  2002/01/07 20:15:06  ucko
  359. * Fully initialize thread-pool state.
  360. *
  361. * Revision 1.1  2001/12/11 19:54:44  ucko
  362. * Introduce thread pools.
  363. *
  364. * ===========================================================================
  365. */
  366. #endif  /* THREAD_POOL__HPP */