Reactor.cpp
上传用户:baixin
上传日期:2008-03-13
资源大小:4795k
文件大小:14k
开发平台:

MultiPlatform

  1. /* Reactor - IO Multiplexor */
  2. /* Copyright (c) 1999 Wind River Systems, Inc. */
  3. /*
  4. modification history
  5. --------------------
  6. 01v,17dec01,nel  Add include symbol for diab build.
  7. 01u,10dec01,dbs  diab build
  8. 01t,20sep01,nel  Fix compilation for ARM.
  9. 01s,30jul01,dbs  fix assertion logic
  10. 01r,13jul01,dbs  fix up includes
  11. 01q,19jan00,nel  Modifications for Linux debug build
  12. 01p,21sep99,aim  changed pipeDevCreate(1024, 1024) => (1,1); this fixes the
  13.                  MBX860
  14. 01o,19aug99,aim  change assert to VXDCOM_ASSERT
  15. 01n,13aug99,aim  added ARG_UNUSED to clear compiler warnings
  16. 01m,12aug99,aim  MT fixes
  17. 01l,09aug99,aim  change wakeup behaviour
  18. 01k,02aug99,aim  added wakeup event handler
  19. 01j,21jul99,aim  quantify tweaks
  20. 01i,13jul99,aim  clear errno before entering select
  21. 01h,12jul99,aim  fix returned nHandles in select
  22. 01g,08jul99,aim  added timers
  23. 01f,07jul99,aim  added critical section protection
  24. 01e,29jun99,aim  reset timeout on eventLoopReset
  25. 01d,07jun99,aim  remove dubious event loop termination
  26. 01c,04jun99,aim  removed debug
  27. 01b,03jun99,aim  remove abort
  28. 01a,07may99,aim  created
  29. */
  30. #include "EventHandler.h"
  31. #include "Reactor.h"
  32. #include "Syslog.h"
  33. #include "TraceCall.h"
  34. #include "private/comMisc.h"
  35. /* Include symbol for diab */
  36. extern "C" int include_vxdcom_Reactor (void)
  37.     {
  38.     return 0;
  39.     }
  40. #ifdef VXDCOM_PLATFORM_VXWORKS
  41. #include "pipeDrv.h"
  42. #include "selectLib.h"
  43. #endif
  44. Reactor::Reactor ()
  45.   : m_rdHandles (),
  46.     m_wrHandles (),
  47.     m_exHandles (),
  48.     m_endEventLoop (false),
  49.     m_wakeupHandler (this),
  50.     m_handle2handlerMap (),
  51.     m_handler2handleMap (),
  52.     m_mutex (),
  53.     m_timerMap ()
  54.     {
  55.     TRACE_CALL;
  56.     if (m_wakeupHandler.handleGet () != INVALID_REACTOR_HANDLE)
  57. {
  58. timerAdd (&m_wakeupHandler, TimeValue (60));
  59. handlerAdd (&m_wakeupHandler, EventHandler::READ_MASK);
  60. }
  61.     else
  62. m_endEventLoop = true;
  63.     COM_ASSERT (m_wakeupHandler.handleGet () > 0);
  64.     }
  65. Reactor::~Reactor ()
  66.     {
  67.     TRACE_CALL;
  68.     Handle2HandlerMapIter iter (m_handle2handlerMap.begin ());
  69.     while (iter != m_handle2handlerMap.end ())
  70. {
  71. REACTOR_HANDLE handle = (*iter).first;
  72. EventHandler* eventHandler = (*iter).second;
  73. // note: handleClose could remove an entry from the map we are
  74. // currently iterating over and the entry it will delete is the
  75. // current position of the iterator.  Therefore we must
  76. // advance the iter and then call handleClose.
  77. ++iter;
  78. COM_ASSERT (eventHandler);
  79. if (eventHandler)
  80.     eventHandler->handleClose (handle);
  81. }
  82.     }
  83. int Reactor::run ()
  84.     {
  85.     TRACE_CALL;
  86.     return eventLoopRun ();
  87.     }
  88. int Reactor::eventLoopRun ()
  89.     {
  90.     TRACE_CALL;
  91.     int result = 0;
  92.     while (!m_endEventLoop)
  93. {
  94. if ((result = handleEvents ()) < 0)
  95.     break;
  96. }
  97.     return result;
  98.     }
  99. void Reactor::eventLoopEnd ()
  100.     {
  101.     TRACE_CALL;
  102.     m_endEventLoop = true;
  103.     }
  104. bool Reactor::eventLoopDone ()
  105.     {
  106.     TRACE_CALL;
  107.     return m_endEventLoop;
  108.     }
  109. void Reactor::close ()
  110.     {
  111.     TRACE_CALL;
  112.     eventLoopEnd ();
  113.     }
  114. int Reactor::handlerAdd
  115.     (
  116.     EventHandler* eventHandler,
  117.     REACTOR_EVENT_MASK eventMask
  118.     )
  119.     {
  120.     TRACE_CALL;
  121.     VxCritSec cs (m_mutex);
  122.     
  123.     REACTOR_HANDLE handle = eventHandler->handleGet ();
  124.     COM_ASSERT (handle != INVALID_REACTOR_HANDLE);
  125.     int result = handlerBind (handle, eventMask);
  126.     if (result != -1)
  127. {
  128. m_handle2handlerMap[handle] = eventHandler;
  129. m_handler2handleMap[eventHandler] = handle;
  130. }
  131.     wakeup ();
  132.     
  133.     return result;
  134.     }
  135. int Reactor::handlerRemove
  136.     (
  137.     EventHandler* eventHandler,
  138.     REACTOR_EVENT_MASK eventMask
  139.     )
  140.     {
  141.     TRACE_CALL;
  142.     VxCritSec cs (m_mutex);
  143.     
  144.     REACTOR_HANDLE handle;
  145.     int result = -1;
  146.     if (handleFind (eventHandler, handle) == 0)
  147. {
  148. m_handle2handlerMap.erase (handle);
  149. m_handler2handleMap.erase (eventHandler);
  150. result = handlerUnbind (handle, eventMask, eventHandler);
  151. }
  152.     wakeup ();
  153.     return result;
  154.     }
  155. int Reactor::handleFind
  156.     (
  157.     EventHandler* eventHandler,
  158.     REACTOR_HANDLE& handle
  159.     )
  160.     {
  161.     VxCritSec cs (m_mutex);
  162.     Handler2HandleMapIter iter;
  163.     iter = m_handler2handleMap.find (eventHandler);
  164.     if (iter == m_handler2handleMap.end ())
  165. return -1;
  166.     handle = (*iter).second;
  167.     return 0;
  168.     }
  169. int Reactor::handlerFind
  170.     (
  171.     REACTOR_HANDLE handle,
  172.     EventHandler*& eventHandler
  173.     )
  174.     {
  175.     VxCritSec cs (m_mutex);
  176.     Handle2HandlerMapIter iter;
  177.     iter = m_handle2handlerMap.find (handle);
  178.     if (iter == m_handle2handlerMap.end ())
  179. return -1;
  180.     eventHandler = (*iter).second;
  181.     return 0;
  182.     }
  183. int Reactor::select
  184.     (
  185.     int nHandles,
  186.     HandleSet* rdSet,
  187.     HandleSet* wrSet,
  188.     HandleSet*  exSet,
  189.     TimeValue* timeout
  190.     )
  191.     {
  192.     timeval* t = 0;
  193.     if (timeout)
  194. t = *timeout;
  195.     REACTOR_HANDLE_SET_TYPE* r = 
  196. rdSet ? *rdSet : (REACTOR_HANDLE_SET_TYPE *)NULL;
  197.     REACTOR_HANDLE_SET_TYPE* w = 
  198. wrSet ? *wrSet : (REACTOR_HANDLE_SET_TYPE *)NULL;
  199.     REACTOR_HANDLE_SET_TYPE* e = 
  200. exSet ? *exSet : (REACTOR_HANDLE_SET_TYPE *)NULL;
  201. #ifdef VXDCOM_PLATFORM_VXWORKS
  202.     errno = 0;
  203. #endif
  204.     
  205.     return ::select (nHandles, r, w, e, t);
  206.     }
  207. int Reactor::handleEvents ()
  208.     {
  209.     TRACE_CALL;
  210.     HandleSet rdSet (m_rdHandles);
  211.     HandleSet wrSet (m_wrHandles);
  212.     HandleSet exSet; // (m_exHandles);
  213.     int haveTimer = 0;
  214.     int maxHandle = max (rdSet.maxHandle (), wrSet.maxHandle ());
  215.     TimeValue timeout;
  216.     if (nextTimerGet (timeout) == 0)
  217. {
  218. haveTimer = 1;
  219. if (timeout < TimeValue::zero ())
  220.     timeout = TimeValue::zero (); // + TimeValue (0, 100);
  221. }
  222.     TimeValue startTime = TimeValue::now ();
  223.     int selectFds = select (maxHandle +1,
  224.     (rdSet.count () > 0) ? &rdSet : 0,
  225.     (wrSet.count () > 0) ? &wrSet : 0,
  226.     (exSet.count () > 0) ? &exSet : 0,
  227.     haveTimer ? &timeout : 0);
  228.     if (m_endEventLoop)
  229. return 0;
  230.     if (selectFds < 0)
  231. return -1;
  232.     if (haveTimer)
  233. updateTimers ((TimeValue::now() - startTime));
  234.     
  235.     if (haveTimer)
  236. dispatchTimers ();
  237.     if (selectFds > 0)
  238. {
  239. rdSet.sync (maxHandle +1);
  240. wrSet.sync (maxHandle +1);
  241. exSet.sync (maxHandle +1);
  242. dispatchFdEvents (rdSet, wrSet, exSet);
  243. }
  244.     return 0;
  245.     }
  246. int Reactor::dispatchFdEvents
  247.     (
  248.     HandleSet& rdSet,
  249.     HandleSet& wrSet,
  250.     HandleSet& exSet
  251.     )
  252.     {
  253.     // exceptions not supported on vxWorks
  254.     // dispatchFdEvents (exSet,
  255.     //                m_exHandles,
  256.     //                EventHandler::EXCEPT_MASK,
  257.     //                &EventHandler::handleException);
  258.     if (rdSet.count () > 0)
  259. {
  260. dispatchFdEvents (rdSet,
  261.   m_rdHandles,
  262.   EventHandler::READ_MASK,
  263.   &EventHandler::handleInput);
  264. }
  265. #if 0
  266.     if (wrSet.count () > 0)
  267. {
  268. dispatchFdEvents (wrSet,
  269.   m_wrHandles,
  270.   EventHandler::WRITE_MASK,
  271.   &EventHandler::handleOutput);
  272. }
  273. #endif
  274.     return 0;
  275.     }
  276. int Reactor::dispatchFdEvents
  277.     (
  278.     HandleSet& selectHandles,
  279.     HandleSet&         reactorHandles,
  280.     REACTOR_EVENT_MASK mask,
  281.     EventHandlerCallback callback
  282.     )
  283.     {
  284.     TRACE_CALL;
  285.     REACTOR_HANDLE handle;
  286.     HandleSetIterator hsIter (selectHandles);
  287.     while ((handle = hsIter ()) != INVALID_REACTOR_HANDLE)
  288. dispatchFdEvent (reactorHandles, handle, mask, callback);
  289.     return 0;
  290.     }
  291. int Reactor::dispatchFdEvent
  292.     (
  293.     HandleSet& reactorHandles,
  294.     REACTOR_HANDLE handle,
  295.     REACTOR_EVENT_MASK eventMask,
  296.     EventHandlerCallback callback
  297.     )
  298.     {
  299.     TRACE_CALL;
  300.     int result = -1;
  301.     EventHandler* eventHandler = 0;
  302.     handlerFind (handle, eventHandler);
  303.     if (eventHandler)
  304. {
  305. result = (eventHandler->*callback) (handle);
  306. if (result < 0)
  307.     handlerRemove (eventHandler, eventMask);
  308. else if (result > 0)
  309.     COM_ASSERT (0);
  310. // XXX reactorHandles.clr (handle);
  311. }
  312.     else
  313. {
  314. S_DEBUG(0, "Missing eventHandler: " << handle << (*this));
  315.         COM_ASSERT (eventHandler);
  316. }
  317.     return result;
  318.     }
  319. int Reactor::handlerBind
  320.     (
  321.     REACTOR_HANDLE handle,
  322.     REACTOR_EVENT_MASK eventMask
  323.     )
  324.     {
  325.     TRACE_CALL;
  326.     VxCritSec cs (m_mutex);
  327.     if (handle == INVALID_REACTOR_HANDLE)
  328. return -1;
  329.     if (eventMask & EventHandler::READ_MASK)
  330. m_rdHandles.set (handle);
  331.     if (eventMask & EventHandler::ACCEPT_MASK)
  332. m_rdHandles.set (handle);
  333.     if (eventMask & EventHandler::CONNECT_MASK)
  334. m_rdHandles.set (handle);
  335.     if (eventMask & EventHandler::WRITE_MASK)
  336. m_wrHandles.set (handle);
  337.     if (eventMask & EventHandler::EXCEPT_MASK)
  338. m_exHandles.set (handle);
  339.     return 0;
  340.     }
  341. int Reactor::handlerUnbind
  342.     (
  343.     REACTOR_HANDLE handle,
  344.     REACTOR_EVENT_MASK eventMask,
  345.     EventHandler* eventHandler
  346.     )
  347.     {
  348.     TRACE_CALL;
  349.     VxCritSec cs (m_mutex);
  350.     if (handle == INVALID_REACTOR_HANDLE)
  351. return -1;
  352.     if (eventMask & EventHandler::READ_MASK)
  353. m_rdHandles.clr (handle);
  354.     if (eventMask & EventHandler::ACCEPT_MASK)
  355. m_rdHandles.clr (handle);
  356.     if (eventMask & EventHandler::CONNECT_MASK)
  357. m_rdHandles.clr (handle);
  358.     if (eventMask & EventHandler::WRITE_MASK)
  359. m_wrHandles.clr (handle);
  360.     if (eventMask & EventHandler::EXCEPT_MASK)
  361. m_exHandles.clr (handle);
  362.     if ((eventMask & EventHandler::DONT_CALL) == 0)
  363. eventHandler->handleClose (handle, eventMask);
  364.     return 0;
  365.     }
  366. Reactor* Reactor::instance ()
  367.     {
  368.     static Reactor r;
  369.     return &r;
  370.     }
  371. const HandleSet& Reactor::rdHandles () const
  372.     {
  373.     TRACE_CALL;
  374.     return m_rdHandles;
  375.     }
  376. const HandleSet& Reactor::wrHandles () const
  377.     {
  378.     TRACE_CALL;
  379.     return m_wrHandles;
  380.     }
  381. const HandleSet& Reactor::exHandles () const
  382.     {
  383.     TRACE_CALL;
  384.     return m_exHandles;
  385.     }
  386. ostream& operator<< (ostream& os, const Reactor& r)
  387.     {
  388.     os << "read-mask ("
  389.        << r.rdHandles ()
  390.        << ") ";
  391. #if 0
  392.     os << "write-mask ("
  393.        << r.wrHandles ()
  394.        << ") ";
  395.     os << "event-mask ("
  396.        << r.exHandles ()
  397.        << ")";
  398. #endif
  399.     return os;
  400.     }
  401. void Reactor::timerAdd
  402.     (
  403.     EventHandler* eventHandler,
  404.     const TimeValue& timeValue
  405.     )
  406.     {
  407.     TRACE_CALL;
  408.     VxCritSec cs (m_mutex);
  409.     m_timerMap [eventHandler] = TimerValuePair (timeValue, timeValue);
  410.     wakeup ();
  411.     }
  412. void Reactor::timerRemove
  413.     (
  414.     EventHandler* eventHandler
  415.     )
  416.     {
  417.     TRACE_CALL;
  418.     VxCritSec cs (m_mutex);
  419.     m_timerMap.erase (eventHandler);
  420.     }
  421. int Reactor::nextTimerGet
  422.     (
  423.     TimeValue& timeValue
  424.     )
  425.     {
  426.     TRACE_CALL;
  427.     VxCritSec cs (m_mutex);
  428.     if (m_timerMap.size () == 0)
  429. return -1;
  430.     
  431.     TimerMap::const_iterator iter (m_timerMap.begin ());
  432.     while (iter != m_timerMap.end ())
  433. {
  434. const TimerValuePair& tp = (*iter).second;
  435. if (iter == m_timerMap.begin ())
  436.     timeValue = tp.second;
  437. else
  438.     timeValue = min (timeValue, tp.second);
  439. ++iter;
  440. }
  441.     return 0;
  442.     }
  443. int Reactor::updateTimers (const TimeValue& elapsedTime)
  444.     {
  445.     TRACE_CALL;
  446.     VxCritSec cs (m_mutex);
  447.     TimerMap::iterator iter (m_timerMap.begin ());
  448.     while (iter != m_timerMap.end ())
  449. {
  450. TimerValuePair& tp = (*iter).second;
  451. ++iter;
  452. tp.second -= elapsedTime;
  453. }
  454.     return 0;
  455.     }
  456. int Reactor::dispatchTimers ()
  457.     {
  458.     TRACE_CALL;
  459.     VxCritSec cs (m_mutex);
  460.     TimerMap::iterator iter (m_timerMap.begin ());
  461.     while (iter != m_timerMap.end ())
  462. {
  463. EventHandler* eventHandler = (*iter).first;
  464. TimerValuePair& tp = (*iter).second;
  465. // note: dispatchTimer (could) remove an entry from the map we
  466. // are currently iterating over.  The entry it will delete is
  467. // the current position of the iterator.  Therefore we must
  468. // advance the iter and then call dispatchTimer.
  469. ++iter;
  470. if (tp.second <= TimeValue::zero ())
  471.     {
  472.     dispatchTimer (tp.first, eventHandler);
  473.     tp.second = tp.first;       // reset timer
  474.     }
  475. }
  476.     return 0;
  477.     }
  478. int Reactor::dispatchTimer
  479.     (
  480.     const TimeValue& timeValue,
  481.     EventHandler* eventHandler
  482.     )
  483.     {
  484.     TRACE_CALL;
  485.     
  486.     COM_ASSERT (eventHandler);
  487.     int result = eventHandler->handleTimeout (timeValue);
  488.     if (result < 0)
  489. timerRemove (eventHandler);
  490.     return result;
  491.     }
  492. int Reactor::wakeup ()
  493.     {
  494.     TRACE_CALL;
  495.     return m_wakeupHandler.reactorWakeup ();
  496.     }
  497. Reactor::WakeupHandler::WakeupHandler (Reactor* reactor)
  498.   : m_wakeupPending (false),
  499.     m_wakeupPendingLock ()
  500.     {
  501.     TRACE_CALL;
  502.     m_handles[0] = INVALID_REACTOR_HANDLE;
  503.     m_handles[1] = INVALID_REACTOR_HANDLE;
  504.     reactorSet (reactor);
  505. #if defined (VXDCOM_PLATFORM_SOLARIS) || defined (VXDCOM_PLATFORM_LINUX)
  506.     int result = ::pipe (m_handles);
  507.     if (result != -1)
  508. {
  509. handleSet (m_handles[0]);
  510. }
  511. #elif defined (VXDCOM_PLATFORM_VXWORKS)
  512.     char* filename = "/pipe/vxdcom";
  513.     if (::pipeDevCreate (filename, 1, 1) == OK)
  514. {
  515. m_handles[0] = ::open (filename, O_RDONLY, 0);
  516. m_handles[1] = ::open (filename, O_WRONLY, 0);
  517. if (m_handles[0] < 0 || m_handles[1] < 0)
  518.     {
  519.     // tidy up on any error.
  520.     
  521.     ::close (m_handles[0]);
  522.     ::close (m_handles[1]);
  523.     m_handles[0] = INVALID_REACTOR_HANDLE;
  524.     m_handles[1] = INVALID_REACTOR_HANDLE;
  525.     }
  526. }
  527.     else
  528. {
  529. S_EMERG (LOG_REACTOR, "cannot open: " << filename);
  530. }
  531. #endif
  532.     }
  533. REACTOR_HANDLE Reactor::WakeupHandler::handleGet () const
  534.     {
  535.     return m_handles[0];
  536.     }
  537. REACTOR_HANDLE Reactor::WakeupHandler::handleSet (REACTOR_HANDLE handle)
  538.     {
  539.     return m_handles[0] = handle;
  540.     }
  541.     
  542. int Reactor::WakeupHandler::handleInput (REACTOR_HANDLE handle)
  543.     {
  544.     TRACE_CALL;
  545.     
  546.     VxCritSec cs (m_wakeupPendingLock);
  547.     
  548.     char buf [1];
  549.     COM_ASSERT (m_wakeupPending);
  550.     
  551.     int n = ::read (m_handles[0], buf, 1);
  552.     if (n != 1)
  553. {
  554. S_ERR (LOG_REACTOR | LOG_ERRNO,
  555.        "Reactor::WakeupHandler read failed");
  556. }
  557.     // Mark reactorWakeup() that so that a new event may be posted.
  558.         
  559.     m_wakeupPending = false;
  560.     // Always return 0 for this EventHandler.
  561.     return 0;
  562.     }
  563. int Reactor::WakeupHandler::reactorWakeup ()
  564.     {
  565.     TRACE_CALL;
  566.     VxCritSec cs (m_wakeupPendingLock);
  567.     // Don't bombard the Reactor if there is an event outstanding.
  568.     
  569.     if (m_wakeupPending)
  570. return 0;
  571.     m_wakeupPending = true;
  572.     // Now, wakeup the Reactor
  573.     int n = ::write (m_handles[1], "", 1); // write one null byte
  574.     if (n != 1)
  575. {
  576. S_ERR (LOG_REACTOR | LOG_ERRNO,
  577.        "Reactor::Wakeup failed to write null byte");
  578. }
  579.     // Always return 0 for this EventHandler.
  580.     
  581.     return 0;
  582.     }
  583. int Reactor::WakeupHandler::handleTimeout (const TimeValue&)
  584.     {
  585.     Reactor* reactor = reactorGet ();
  586.     COM_ASSERT (reactor);
  587.     S_DEBUG(LOG_REACTOR, (*reactor));
  588.     return 0;
  589.     }