ThreadPool.cpp
上传用户:baixin
上传日期:2008-03-13
资源大小:4795k
文件大小:6k
开发平台:

MultiPlatform

  1. /* ThreadPool */
  2. /* Copyright (c) 1999 Wind River Systems, Inc. */
  3. /*
  4. modification history
  5. --------------------
  6. 01m,22apr02,nel  SPR#76056. Correct test on task creation to test for -1
  7.                  rather than < 0.
  8. 01l,17dec01,nel  Add include symbol for diab.
  9. 01k,03aug01,dbs  remove usage of Thread class
  10. 01j,13jul01,dbs  fix up includes
  11. 01i,24feb00,dbs  fix thread-killing in dtor
  12. 01h,21sep99,aim  changed API for activate
  13. 01g,20sep99,aim  added Thread name parameter
  14. 01f,19aug99,aim  dtor now tries to clean up all threads
  15. 01e,19aug99,aim  change assert to VXDCOM_ASSERT
  16. 01d,13aug99,aim  added default threadPriority
  17. 01c,12aug99,aim  added queue length ctor parameter
  18. 01b,11aug99,aim  now blocks when all threads are busy
  19. 01a,29jul99,aim  created
  20. */
  21. #include "ThreadPool.h"
  22. #include "EventHandler.h"
  23. #include "Syslog.h"
  24. #include "TraceCall.h"
  25. #include "private/comMisc.h"
  26. #include "taskLib.h"
  27. /* Include symbol for diab */
  28. extern "C" int include_vxdcom_ThreadPool (void)
  29.     {
  30.     return 0;
  31.     }
  32. ThreadPool::ThreadPool
  33.     (
  34.     size_t thrStackSize,
  35.     long thrPriority
  36.     )
  37.   : m_minThreads (0),
  38.     m_maxThreads (0),
  39.     m_threadCount (0),
  40.     m_threadCountLock (),
  41.     m_thrStackSize (thrStackSize),
  42.     m_thrPriority (thrPriority),
  43.     m_thrScavenger (0),
  44.     m_thrName (0)
  45.     {
  46.     TRACE_CALL;
  47.     }
  48. ThreadPool::~ThreadPool ()
  49.     {
  50.     TRACE_CALL;
  51.     DELZERO (m_thrScavenger);
  52.     threadNameDelete ();
  53.     }
  54. int ThreadPool::open
  55.     (
  56.     int maxThreads,
  57.     const char* threadName
  58.     )
  59.     {
  60.     TRACE_CALL;
  61.     threadNameSet (threadName);
  62.     for (int i = 0; i < maxThreads; ++i)
  63. {
  64.     if (threadAdd () < 0)
  65. return -1;
  66.     else {
  67.         ++m_minThreads;
  68. ++m_maxThreads;
  69.     }
  70. }
  71.     return 0;
  72.     }
  73. int ThreadPool::open
  74.     (
  75.     Reactor* reactor,
  76.     int minThreads,
  77.     int maxThreads,
  78.     const char* threadName
  79.     )
  80.     {
  81.     TRACE_CALL;
  82.     if ((m_thrScavenger = createScavenger (reactor)) == 0)
  83. return -1;
  84.     threadNameSet (threadName);
  85.     for (int i = 0; i < minThreads; ++i)
  86. if (threadAdd () < 0) 
  87.     {
  88.     m_minThreads = 0;
  89.     m_maxThreads = 0;
  90.     return -1;
  91.     }
  92.     m_minThreads = minThreads;
  93.     m_maxThreads = maxThreads;
  94.     return 0;
  95.     }
  96. int ThreadPool::close ()
  97.     {
  98.     TRACE_CALL;
  99.     DELZERO (m_thrScavenger);
  100.     removeAll ();
  101.     // Post a NULL event to all possible threads...
  102.     for (int i=0; i < threadCount (); ++i)
  103. threadRemove ();
  104.     
  105.     // ...and wait for threads to terminate...
  106.     cout << "waiting for threads..." << endl;
  107.     while (threadCount () > 0)
  108.         {
  109.         cout << "...sleeping..." << endl;
  110.         ::taskDelay (1);
  111.         }
  112.     return 0;
  113.     }
  114. int ThreadPool::queueFullHandler ()
  115.     {
  116.     // If we have no scavenger we cannot dynamically add threads.
  117.     if (m_thrScavenger == 0)
  118. return 0;
  119.     if (threadCount () < maxThreads ())
  120. return threadAdd ();
  121.     else
  122. return 0;
  123.     }
  124. void* ThreadPool::serviceHandler ()
  125.     {
  126.     TRACE_CALL;
  127.     EventHandler* pEventHandler;
  128.     while (1)
  129. {
  130.         ::taskPrioritySet (::taskIdSelf (), m_thrPriority);
  131. pEventHandler = 0;
  132. // remove() will block until a job is inserted into the Q.
  133. if (remove (pEventHandler) < 0)
  134.     break;
  135. if (pEventHandler == 0)
  136.     break;
  137. REACTOR_HANDLE handle = pEventHandler->handleGet ();
  138. if (pEventHandler->handleInput (handle) < 0)
  139.     pEventHandler->handleClose (handle);
  140. }
  141.     VxCritSec cs (m_threadCountLock);
  142.     queueSizeSet (--m_threadCount);
  143.     
  144.     return 0;
  145.     }
  146. int ThreadPool::minThreads () const
  147.     {
  148.     TRACE_CALL;
  149.     return m_minThreads;
  150.     }
  151. int ThreadPool::maxThreads () const
  152.     {
  153.     TRACE_CALL;
  154.     return m_maxThreads;
  155.     }
  156. int ThreadPool::threadCount () const
  157.     {
  158.     TRACE_CALL;
  159.     return m_threadCount;
  160.     }
  161. int ThreadPool::enqueue (EventHandler* pEventHandler)
  162.     {
  163.     TRACE_CALL;
  164.     return add (pEventHandler);
  165.     }
  166. int ThreadPool::threadAdd ()
  167.     {
  168.     TRACE_CALL;
  169.     VxCritSec cs (m_threadCountLock);
  170.     // Create a new (named) thread.
  171.     
  172.     int result = activate (m_thrName, m_thrPriority, m_thrStackSize);
  173.     
  174.     if (result != -1)
  175. {
  176. queueSizeSet (++m_threadCount);
  177. return 0;
  178. }
  179.     
  180.     return -1;
  181.     }
  182. int ThreadPool::threadRemove ()
  183.     {
  184.     TRACE_CALL;
  185.     return enqueue (0);
  186.     }
  187. int ThreadPool::threadReaper ()
  188.     {
  189.     TRACE_CALL;
  190.     VxCritSec cs (m_threadCountLock);
  191.     int unusedThreads = threadCount() - queueSize ();
  192.     if ((threadCount () - unusedThreads) < minThreads ())
  193. unusedThreads -= minThreads ();
  194.     if (queueIsFull () || unusedThreads <= 0)
  195. return 0;
  196.     
  197.     while (unusedThreads-- > 0)
  198. threadRemove ();
  199.     return 0;
  200.     }
  201. ThreadPool::Scavenger* ThreadPool::createScavenger
  202.     (
  203.     Reactor* reactor,
  204.     int scavengerTimeout
  205.     )
  206.     {
  207.     if (reactor == 0)
  208. return 0;
  209.     
  210.     ThreadPool::Scavenger* s = new ThreadPool::Scavenger (reactor, this);
  211.     if (s)
  212. reactor->timerAdd (s, scavengerTimeout);
  213.     else
  214. DELZERO (s);
  215.     return s;
  216.     }
  217.     
  218. ThreadPool::Scavenger::Scavenger
  219.     (
  220.     Reactor* reactor,
  221.     ThreadPool* threadPool
  222.     )
  223.   : m_threadPool (threadPool)
  224.     {
  225.     TRACE_CALL;
  226.     }
  227. ThreadPool::Scavenger::~Scavenger ()
  228.     {
  229.     TRACE_CALL;
  230.     Reactor* reactor = reactorGet ();
  231.     if (reactor)
  232. reactor->timerRemove (this);
  233.     }
  234. int ThreadPool::Scavenger::handleTimeout (const TimeValue&)
  235.     {
  236.     TRACE_CALL;
  237.     COM_ASSERT (m_threadPool);
  238.     return m_threadPool->threadReaper ();
  239.     }
  240. void ThreadPool::threadNameDelete ()
  241.     {
  242.     delete [] m_thrName;
  243.     m_thrName = 0;
  244.     }
  245. void ThreadPool::threadNameSet (const char* threadName)
  246.     {
  247.     threadNameDelete ();
  248.     xstrdup (m_thrName, threadName);
  249.     }
  250. const char* ThreadPool::xstrdup (char*& dst, const char* src)
  251.     {
  252.     if (src == 0)
  253. return 0;
  254.     dst = new char [::strlen (src) +1 ];
  255.     return dst ? ::strcpy (dst, src) : 0;
  256.     }