RpcEventHandler.cpp
上传用户:nvosite88
上传日期:2007-01-17
资源大小:4983k
文件大小:9k
源码类别:

VxWorks

开发平台:

C/C++

  1. /* RpcEventHandler */
  2. /* Copyright (c) 1999 Wind River Systems, Inc. */
  3. /*
  4. modification history
  5. --------------------
  6. 01x,17dec01,nel  Add include symbol for diab.
  7. 01w,02oct01,nel  Move debug hooks into SockStream class.
  8. 01v,25sep01,nel  Correct prototype error under test harness build.
  9. 01u,10sep01,nel  Add dcomShow hooks into EventHandler.
  10. 01t,03aug01,dbs  remove usage of Thread class
  11. 01s,13jul01,dbs  fix up includes
  12. 01r,26jun00,dbs  implement presentation context IDs
  13. 01q,24may00,dbs  add fault diagnostics
  14. 01p,19aug99,aim  change assert to VXDCOM_ASSERT
  15. 01o,30jul99,aim  added thread pooling
  16. 01n,13jul99,aim  syslog api changes
  17. 01m,09jul99,dbs  use final filenames
  18. 01l,09jul99,dbs  tidy up logging of packets
  19. 01k,07jul99,aim  added ostream operator<<
  20. 01j,02jul99,aim  fix for name changes in RpcPduFactory
  21. 01i,28jun99,dbs  remove defaultInstance method
  22. 01h,28jun99,dbs  make sure authn-status is preserved after BIND PDU
  23. 01g,24jun99,dbs  add authn checking
  24. 01f,24jun99,dbs  move authn into new class
  25. 01e,22jun99,dbs  fix includes again
  26. 01d,18jun99,aim  set data rep on outgoing packets
  27. 01c,08jun99,aim  rework
  28. 01b,08jun99,aim  now uses NRpcPdu
  29. 01a,27may99,aim  created
  30. */
  31. #include "RpcEventHandler.h"
  32. #include "RpcDispatcher.h"
  33. #include "RpcDispatchTable.h"
  34. #include "RpcPduFactory.h"
  35. #include "RpcIfServer.h"
  36. #include "Reactor.h"
  37. #include "SCM.h"
  38. #include "Syslog.h"
  39. #include "TraceCall.h"
  40. #include "private/comMisc.h"
  41. #include "private/DebugHooks.h"
  42. #include "taskLib.h"
  43. /* Include symbol for diab */
  44. extern "C" int include_vxdcom_RpcEventHandler (void)
  45.     {
  46.     return 0;
  47.     }
  48. RpcEventHandler::~RpcEventHandler ()
  49.     {
  50.     TRACE_CALL;
  51.     S_INFO (LOG_RPC, "disconnect: " << (*this));
  52.     NTLMSSP* ssp = SCM::ssp ();
  53.     if (ssp)
  54. ssp->channelRemove (channelId ());
  55.     DELZERO (m_pdu);
  56.     }
  57. RpcEventHandler::RpcEventHandler (Reactor* reactor)
  58.   : SvcHandler<SockStream> (reactor),
  59.     m_pdu (0),
  60.     m_hostAddr (),
  61.     m_peerAddr (),
  62.     m_dispatcher (0),
  63.     m_creatorTaskId (::taskIdSelf ()),
  64.     m_acceptor (0)
  65.     {
  66.     TRACE_CALL;
  67.     NTLMSSP* ssp = SCM::ssp ();
  68.     if (ssp)
  69. ssp->channelAdd (channelId ());
  70.     }
  71. RpcEventHandler::RpcEventHandler
  72.     (
  73.     Reactor* reactor,
  74.     RpcDispatcher* dispatcher
  75.     )
  76.   : SvcHandler<SockStream> (reactor),
  77.     m_pdu (0),
  78.     m_hostAddr (),
  79.     m_peerAddr (),
  80.     m_dispatcher (dispatcher),
  81.     m_creatorTaskId (::taskIdSelf ()),
  82.     m_acceptor (0)
  83.     {
  84.     TRACE_CALL;
  85.     COM_ASSERT (m_dispatcher);
  86.     NTLMSSP* ssp = SCM::ssp ();
  87.     if (ssp)
  88. ssp->channelAdd (channelId ());
  89.     }
  90. int RpcEventHandler::open (void* pv)
  91.     {
  92.     acceptorSet (static_cast<RpcIfServer*> (pv));
  93.     if (concurrency() == RpcIfServer::ThreadPerConnection)
  94. {
  95. COM_ASSERT (0); // NYI
  96. }
  97.     return super::open (pv);
  98.     }
  99. int RpcEventHandler::close (unsigned long flags)
  100.     {
  101.     return super::close (flags);
  102.     }
  103. const INETSockAddr&
  104. RpcEventHandler::peerAddr ()
  105.     {
  106.     stream().peerAddrGet (m_peerAddr);
  107.     return m_peerAddr;
  108.     }
  109. const INETSockAddr&
  110. RpcEventHandler::hostAddr ()
  111.     {
  112.     stream().hostAddrGet (m_hostAddr);
  113.     return m_hostAddr;
  114.     }
  115. int
  116. RpcEventHandler::handleInput (REACTOR_HANDLE handle)
  117.     {
  118.     TRACE_CALL;
  119.     if (concurrency () == RpcIfServer::ThreadPooled)
  120. {
  121. if (::taskIdSelf () == m_creatorTaskId)
  122.     {
  123.     reactorGet()->handlerRemove (this,
  124.  EventHandler::READ_MASK |
  125.  EventHandler::DONT_CALL);
  126.     return threadPool()->enqueue (this);
  127.     }
  128. }
  129.     // Any strategy other than thread-per-connection will eventually get
  130.     // here.  If we're in the single-threaded implementation or the
  131.     // thread-pool, we still have to pass this way.
  132.     int result = process ();
  133.     // Now, we look again to see if we're in the thread-pool
  134.     // implementation.  If so then we need to re-register ourselves with
  135.     // the reactor so that we can get more work when it is available.
  136.     if (concurrency () == RpcIfServer::ThreadPooled)
  137. {
  138. if (result != -1)
  139.     reactorGet()->handlerAdd (this, EventHandler::READ_MASK);
  140. }
  141.     return result;
  142.     }
  143. int RpcEventHandler::process ()
  144.     {
  145.     TRACE_CALL;
  146.     const int buflen = 1024;
  147.     char buf [buflen];
  148.     char* pbuf = buf;
  149.     ssize_t n = stream().recv (pbuf, buflen);
  150.     if (n > 0)
  151. {
  152. S_DEBUG (LOG_RPC, (*this) << " read: " << (int) n);
  153. /* We only process the first fragment of the packet because */
  154. /* RpcPdu doesn't store the data contiguiouly in memory at the */
  155. /* moment so we can't just feed the byte stream into the debug */
  156. /* hook */
  157. stream ().processDebugOutput (pRpcServerInput, (BYTE *)pbuf, n);
  158. }
  159.     int result = -1; // guilty until proved innocent
  160.     while (n > 0)
  161. {
  162. if (m_pdu == 0 && (m_pdu = new RpcPdu ()) == 0)
  163.     break; // ENOMEM
  164. int consumed = m_pdu->append (pbuf, n);
  165. if (m_pdu->complete ())
  166.     {
  167.     result = dispatchPdu (*m_pdu);
  168.     DELZERO (m_pdu);
  169.     if (result != 0)
  170. break;
  171.     }
  172. else
  173.     {
  174.     S_DEBUG (LOG_RPC, "pdu not complete");
  175.     result = 0; // hang on for more data
  176.     }
  177. // move offset into appended data
  178. pbuf += consumed;
  179. n -= consumed;
  180. }
  181.     
  182.     if (result < 0)
  183. DELZERO (m_pdu);
  184.     return result;
  185.     }
  186.     
  187. int
  188. RpcEventHandler::dispatchAuth3 (const RpcPdu& auth3Pdu)
  189.     {
  190.     TRACE_CALL;
  191.     NTLMSSP* ssp = SCM::ssp ();
  192.     if (ssp)
  193. ssp->serverAuth3Validate (channelId (), auth3Pdu);
  194.     return 0;
  195.     }
  196. int
  197. RpcEventHandler::dispatchBind (const RpcPdu& bindPdu)
  198.     {
  199.     TRACE_CALL;
  200.     if (m_dispatcher == 0)
  201. return -1;
  202.     RpcPdu responsePdu;
  203.     IID iid = bindPdu.bind().presCtxList.presCtxElem[0].abstractSyntax.id;
  204.     USHORT presCtxId = bindPdu.bind().presCtxList.presCtxElem[0].presCtxId;
  205.     // If the dispatcher supports this interface-ID, then we are bound
  206.     // to it, via the given presentation-context ID...
  207.     if (m_dispatcher->supportsInterface (iid))
  208. RpcPduFactory::formatBindAckPdu (bindPdu,
  209.  responsePdu,
  210.  reinterpret_cast<ULONG> (this));
  211.     else
  212. RpcPduFactory::formatBindNakPdu (bindPdu, responsePdu);
  213.     // Process authentication trailers...
  214.     NTLMSSP* ssp = SCM::ssp ();
  215.     if (ssp)
  216. ssp->serverBindValidate (channelId (),
  217.  bindPdu,
  218.  responsePdu);
  219.     // Reply to the BIND...
  220.     int result = reply (bindPdu, responsePdu);
  221.     // If successful, record the presentation context...
  222.     if (result == 0)
  223. m_presCtxMap [presCtxId] = iid;
  224.     return result;
  225.     }
  226. int
  227. RpcEventHandler::dispatchRequest (const RpcPdu& requestPdu)
  228.     {
  229.     RpcPdu responsePdu;
  230.     // Find which presentation context this request is for, and so,
  231.     // which interface ID to use...
  232.     USHORT presCtxId = requestPdu.request().presCtxId;
  233.     PresCtxMap::const_iterator i = m_presCtxMap.find (presCtxId);
  234.     if (i == m_presCtxMap.end ())
  235. return -1;
  236.     
  237.     // Use the presentation context to select the right IID...    
  238.     m_dispatcher->dispatch (requestPdu,
  239.     responsePdu,
  240.     channelId (),
  241.     (*i).second);
  242.     // Process authentication trailers...
  243.     NTLMSSP* ssp = SCM::ssp ();
  244.     if (ssp)
  245. ssp->serverRequestValidate (channelId (),
  246.     requestPdu,
  247.     responsePdu);
  248.         
  249.     return reply (requestPdu, responsePdu);
  250.     }
  251. int
  252. RpcEventHandler::dispatchPdu (const RpcPdu& pdu)
  253.     {
  254.     TRACE_CALL;
  255.     
  256.     S_DEBUG (LOG_RPC, "recvPdu: " << pdu);
  257.     if (pdu.isRequest ())
  258. return dispatchRequest (pdu);
  259.     else if (pdu.isBind ())
  260. return dispatchBind (pdu);
  261.     else if (pdu.isAuth3 ())
  262. return dispatchAuth3 (pdu);
  263.     return 0;
  264.     }
  265. int RpcEventHandler::sendPdu (RpcPdu& pdu)
  266.     {
  267.     TRACE_CALL;
  268.     size_t len;
  269.     char *buf = 0;
  270.     int status = -1;
  271.     S_DEBUG (LOG_RPC, "sendPdu: " << pdu);
  272.     if (pdu.makeReplyBuffer (buf, len) < 0)
  273. stream().close ();
  274.     else if (stream().send (buf, len) == len)
  275. {
  276. stream ().processDebugOutput (pRpcServerOutput, (BYTE *)buf, len);
  277. status = 0;
  278. }
  279.     delete [] buf;
  280.     return status;
  281.     }
  282. int
  283. RpcEventHandler::reply (const RpcPdu& pdu, RpcPdu& replyPdu)
  284.     {
  285.     TRACE_CALL;
  286.     replyPdu.drepSet (pdu.drep ());
  287.     if (replyPdu.isFault ())
  288. {
  289. S_ERR (LOG_RPC, "RxREQUEST:" << pdu);
  290. S_ERR (LOG_RPC, "TxFAULT:" << replyPdu);
  291. }
  292.     
  293.     // send reply
  294.     return sendPdu (replyPdu);
  295.     }
  296. int
  297. RpcEventHandler::channelId () const
  298.     {
  299.     TRACE_CALL;
  300.     return reinterpret_cast<int> (this);
  301.     }
  302. int RpcEventHandler::concurrency ()
  303.     {
  304.     TRACE_CALL;
  305.     return acceptorGet()->concurrency ();
  306.     }
  307. ThreadPool* RpcEventHandler::threadPool ()
  308.     {
  309.     TRACE_CALL;
  310.     return acceptorGet()->threadPool ();
  311.     }
  312. RpcIfServer* RpcEventHandler::acceptorGet () const
  313.     {
  314.     TRACE_CALL;
  315.     return m_acceptor;
  316.     }
  317. RpcIfServer* RpcEventHandler::acceptorSet (RpcIfServer* pRpcIfServer)
  318.     {
  319.     TRACE_CALL;
  320.     return m_acceptor = pRpcIfServer;
  321.     }
  322.     
  323. ostream& operator<< (ostream& os, const RpcEventHandler& eh)
  324.     {
  325.     RpcEventHandler* p = const_cast<RpcEventHandler*> (&eh);
  326.     os << p->hostAddr ()
  327.        << " => "
  328.        << p->peerAddr ()
  329.        << " (fd="
  330.        << p->handleGet ()
  331.        << ") ";
  332.     return os;
  333.     }