msgQDistLib.c
上传用户:nvosite88
上传日期:2007-01-17
资源大小:4983k
文件大小:60k
源码类别:

VxWorks

开发平台:

C/C++

  1. /* msgQDistLib.c - distributed objects message queue library (VxFusion option) */
  2. /* Copyright 1999 - 2002 Wind River Systems, Inc. */
  3. /*
  4. modification history
  5. --------------------
  6. 01o,23oct01,jws  fix compiler warnings (SPR 71154); fix man pages (SPR 71239)
  7. 01n,24jul01,r_s  changed code to be ANSI compatible so that it compiles with
  8.                  diab. made asm macro changes for diab
  9. 01m,09jun99,drm  Changing an "errno = " to errnoSet()
  10. 01l,01jun99,drm  Changing documentation for msgQDistReceive() to indicate that
  11.                  the return value is the number of bytes received or ERROR
  12.                  rather than OK or ERROR.
  13. 01k,24may99,drm  added vxfusion prefix to VxFusion related includes
  14. 01j,23feb99,drm  adding S_distLib_UNREACHABLE to documentation
  15. 01i,23feb99,drm  returning different errno when overallTimeout expires
  16. 01h,18feb99,wlf  doc cleanup
  17. 01g,28oct98,drm  documentation modifications
  18. 01f,12aug98,drm  added #include stmt for distLibP.h
  19. 01e,08may98,ur   removed 8 bit node id restriction
  20. 01d,15apr98,ur   retransmit errors, if failed to send/receive
  21. 01c,09apr98,ur   added some errno setting, for remote errors
  22. 01b,04mar98,ur   patched memory leak in msgQDistInput/RECV_REQ.
  23. 01a,06jun97,ur   written.
  24. */
  25. /*
  26. DESCRIPTION
  27. This library provides the interface to distributed message queues.
  28. Any task on any node in the system can send messages to or receive
  29. from a distributed messsage queue. Full duplex communication between
  30. two tasks generally requires two distributed messsage queues, one for
  31. each direction.
  32. Distributed messsage queues are created with msgQDistCreate().  After
  33. creation, they can be manipulated using the generic routines for local
  34. message queues; for more information on the use of these routines, see the
  35. manual entry for msgQLib.  The msgQDistLib library also provides the 
  36. msgQDistSend(), msgQDistReceive(), and msgQDistNumMsgs() routines which 
  37. support additional parameters that are useful for working with distributed 
  38. message queues.
  39. The distributed objects message queue library is initialized by calling
  40. distInit().
  41. AVAILABILITY
  42. This module is distributed as a component of the unbundled distributed
  43. message queues option, VxFusion.
  44. INCLUDE FILES: msgQDistLib.h
  45. SEE ALSO: msgQLib, msgQDistShow, distLib
  46. */
  47. #include "vxWorks.h"
  48. #if defined (MSG_Q_DIST_REPORT) || defined (DIST_DIAGNOSTIC)
  49. #include "stdio.h"
  50. #endif
  51. #include "stdlib.h"
  52. #include "string.h"
  53. #include "sllLib.h"
  54. #include "errnoLib.h"
  55. #include "msgQLib.h"
  56. #include "semLib.h"
  57. #include "taskLib.h"
  58. #include "netinet/in.h"
  59. #include "private/semLibP.h"
  60. #include "private/msgQLibP.h"
  61. #include "vxfusion/msgQDistLib.h"
  62. #include "vxfusion/msgQDistGrpLib.h"
  63. #include "vxfusion/distIfLib.h"
  64. #include "vxfusion/distLib.h"
  65. #include "vxfusion/distStatLib.h"
  66. #include "vxfusion/private/msgQDistLibP.h"
  67. #include "vxfusion/private/msgQDistGrpLibP.h"
  68. #include "vxfusion/private/distPktLibP.h"
  69. #include "vxfusion/private/distNetLibP.h"
  70. #include "vxfusion/private/distLibP.h"
  71. /* globals */
  72. TBL_NODE                *pMsgQDistTbl;    /* windSh needs this global */
  73. /* locals */
  74. LOCAL SL_LIST            msgQDistTblFreeList;
  75. LOCAL SEMAPHORE          msgQDistTblLock;
  76. LOCAL int                msgQDistTblSize;
  77. LOCAL BOOL               msgQDistLibInstalled = FALSE;
  78. /* local prototypes */
  79. LOCAL STATUS                msgQDistTblPut (MSG_Q_ID msgQId, TBL_IX *pTblIx);
  80. #ifdef __SUPPORT_MSG_Q_DIST_DELETE
  81. LOCAL STATUS                msgQDistTblDelete (TBL_IX tblIx);
  82. #endif
  83. LOCAL MSG_Q_ID              msgQDistTblGet (TBL_IX tblIx);
  84. LOCAL DIST_STATUS           msgQDistInput (DIST_NODE_ID nodeIdSrc,
  85.                                            DIST_TBUF_HDR *pTBufHdr);
  86. LOCAL STATUS                msgQDistSendStatus (DIST_NODE_ID nodeIdDest,
  87.                                                 DIST_INQ_ID inqId,
  88.                                                 short error);
  89. LOCAL DIST_MSG_Q_STATUS     msgQDistRecvReply (DIST_NODE_ID nodeIdReceiver,
  90.                                                DIST_INQ_ID inqIdReceiver,
  91.                                                MSG_Q_ID msgQId,
  92.                                                char *buffer,
  93.                                                UINT maxNBytes,
  94.                                                int timeout,
  95.                                                BOOL lastTry);
  96. LOCAL DIST_MSG_Q_STATUS     msgQDistSendReply (DIST_NODE_ID nodeIdSender,
  97.                                                DIST_INQ_ID inqIdSender,
  98.                                                MSG_Q_ID msgQId,
  99.                                                char *buffer,
  100.                                                UINT nBytes,
  101.                                                int timeout,
  102.                                                int priority,
  103.                                                BOOL lastTry);
  104. /***************************************************************************
  105. *
  106. * msgQDistLibInit - initialize the distributed message queue package (VxFusion option)
  107. *
  108. * This routine initializes the distributed message queue package.
  109. * It currently does nothing.
  110. *
  111. * AVAILABILITY
  112. * This routine is distributed as a component of the unbundled distributed
  113. * message queues option, VxFusion.
  114. *
  115. * RETURNS: N/A
  116. *
  117. * NOMANUAL
  118. */
  119. void msgQDistLibInit (void)
  120.     {
  121.     }
  122. /***************************************************************************
  123. *
  124. * msgQDistInit - initialize distributed message queue library (VxFusion option)
  125. *
  126. * This routine initializes the distributed message queue library.
  127. * It must be called before any other routine in the library.
  128. * The argument <msgQDistMax> limits the number of distributed message
  129. * queues created on this node. The maximum number of distributed message
  130. * queues is DIST_MSG_Q_MAX_QS .
  131. *
  132. * RETURNS: OK or ERROR.
  133. *
  134. * NOMANUAL
  135. */
  136. STATUS msgQDistInit
  137.     (
  138.     int        sizeLog2        /* create 2^^sizeLog2 msgQ's */
  139.     )
  140.     {
  141.     TBL_IX    tblIx;
  142.     STATUS    status;
  143.     int       size;
  144.     int       msgQDistMax;
  145.     msgQDistMax = 1 << sizeLog2;
  146.     if (msgQDistMax > DIST_MSG_Q_MAX_QS)
  147.         {
  148. #ifdef DIST_DIAGNOSTIC
  149.         printf ("msgQDistInit: number of message queues is limited to %dn",
  150.                 DIST_MSG_Q_MAX_QS);
  151. #endif
  152.         return (ERROR);    /* too many local msgQs for underlying layer */
  153.         }
  154.     if (msgQDistLibInstalled == TRUE)
  155.         return (OK);
  156.     if (distInqInit (DIST_INQ_HASH_TBL_SZ_LOG2) == ERROR)
  157.         return (ERROR);
  158.     size = MEM_ROUND_UP (sizeof (TBL_NODE));
  159.     pMsgQDistTbl = (TBL_NODE *) malloc (msgQDistMax * size);
  160.     if (pMsgQDistTbl == NULL)
  161.         {
  162. #ifdef DIST_DIAGNOSTIC
  163.         printf ("msgQDistInit: memory allocation failedn");
  164. #endif
  165.         return (ERROR);    /* out of memory */
  166.         }
  167.     semBInit (&msgQDistTblLock, SEM_Q_PRIORITY, SEM_EMPTY);
  168.     msgQDistTblSize = msgQDistMax;
  169.     sllInit (&msgQDistTblFreeList);
  170.     for (tblIx = 0; tblIx < msgQDistMax; tblIx++)
  171.         {
  172.         pMsgQDistTbl[tblIx].tblIx = tblIx;
  173.         sllPutAtHead (&msgQDistTblFreeList,
  174.                       (SL_NODE *) &(pMsgQDistTbl[tblIx]));
  175.         }
  176.     msgQDistTblUnlock();
  177.     /* Add message queue service to table of services. */
  178.     status = distNetServAdd (DIST_PKT_TYPE_MSG_Q, msgQDistInput,
  179.                              DIST_MSG_Q_SERV_NAME, DIST_MSG_Q_SERV_NET_PRIO,
  180.                              DIST_MSG_Q_SERV_TASK_PRIO,
  181.                              DIST_MSG_Q_SERV_TASK_STACK_SZ);
  182.     if (status == ERROR)
  183.         {
  184. #ifdef DIST_DIAGNOSTIC
  185.         printf ("msgQDistInit: cannot attach servicen");
  186. #endif
  187.         return (ERROR);
  188.         }
  189.     msgQDistSendRtn       = (FUNCPTR) msgQDistSend;
  190.     msgQDistReceiveRtn    = (FUNCPTR) msgQDistReceive;
  191.     msgQDistNumMsgsRtn    = (FUNCPTR) msgQDistNumMsgs;
  192.     msgQDistLibInstalled = TRUE;
  193.     return (OK);
  194.     }
  195. /***************************************************************************
  196. *
  197. * msgQDistCreate - create a distributed message queue (VxFusion option)
  198. *
  199. * This routine creates a distributed message queue capable of
  200. * holding up to <maxMsgs> messages, each up to <maxMsgLength> bytes long.
  201. * This routine returns a message queue ID used to identify the created
  202. * message queue. The queue can be created with the following options:
  203. * is
  204. * i MSG_Q_FIFO (0x00)
  205. * The queue pends tasks in FIFO order.
  206. * i MSG_Q_PRIORITY (0x01)
  207. * The queue pends tasks in priority order. Remote tasks share the same
  208. * priority level.
  209. * ie
  210. *
  211. * The global message queue identifier returned can be used directly by generic
  212. * message queue handling routines in msgQLib, such as, msgQSend(), 
  213. * msgQReceive(), and msgQNumMsgs().
  214. *
  215. * AVAILABILITY
  216. * This routine is distributed as a component of the unbundled distributed
  217. * message queues option, VxFusion.
  218. *
  219. * RETURNS:
  220. * MSG_Q_ID, or NULL if there is an error.
  221. *
  222. * ERRNO:
  223. * is
  224. * i S_memLib_NOT_ENOUGH_MEMORY
  225. * If the routine is unable to allocate memory for message queues and message 
  226. * buffers.
  227. * i S_intLib_NOT_ISR_CALLABLE
  228. * If the routine is called from an interrupt service routine.
  229. * i S_msgQLib_INVALID_QUEUE_TYPE
  230. * If the type of queue is invalid.
  231. * i S_msgQDistLib_INVALID_MSG_LENGTH
  232. * If the message is too long for the VxFusion network layer.
  233. * ie
  234. *
  235. * SEE ALSO: msgQLib
  236. */
  237. MSG_Q_ID msgQDistCreate
  238.     (
  239.     int maxMsgs,         /* max messages that can be queued */
  240.     int maxMsgLength,    /* max bytes in a message */
  241.     int options          /* message queue options */
  242.     )
  243.     {
  244.     DIST_OBJ_NODE *   pObjNode;
  245.     MSG_Q_ID          msgQId;
  246.     TBL_IX            tblIx;
  247.     int               maxMsgLen;
  248.     if (!msgQDistLibInstalled)
  249.         return (NULL);    /* call msgQDistInit() first */
  250.     maxMsgLen = (DIST_IF_MAX_FRAGS * (DIST_IF_MTU - DIST_IF_HDR_SZ)) -
  251.                  DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND);
  252.     if (maxMsgLength > maxMsgLen)
  253.         {
  254.         errnoSet (S_msgQDistLib_INVALID_MSG_LENGTH);
  255.         return (NULL);    /* msg too long for network layer */
  256.         }
  257.     if (options &~ MSG_Q_TYPE_MASK)
  258.         {
  259.         errnoSet (S_msgQLib_INVALID_QUEUE_TYPE);
  260.         return (NULL);    /* illegal option */
  261.         }
  262.     if ((msgQId = msgQCreate (maxMsgs, maxMsgLength, options)) == NULL)
  263.         return (NULL);    /* msgQCreate() failed */
  264.     if (msgQDistTblPut (msgQId, &tblIx) == ERROR)
  265.         {
  266.         msgQDelete (msgQId);
  267.         return (NULL);    /* table full */
  268.         }    
  269.     pObjNode = distObjNodeGet();
  270.     pObjNode->objNodeType      = DIST_OBJ_TYPE_MSG_Q;
  271.     pObjNode->objNodeReside    = distNodeLocalGetId();
  272.     pObjNode->objNodeId        = TBL_IX_TO_DIST_OBJ_ID (tblIx);
  273. #ifdef MSG_Q_DIST_REPORT
  274.     printf ("msgQDistCreate: dMsgQId 0x%lx, msgQId %pn",
  275.             dMsgQId, DIST_OBJ_NODE_TO_MSG_Q_ID (pObjNode));
  276. #endif
  277.     return (DIST_OBJ_NODE_TO_MSG_Q_ID (pObjNode));
  278.     }
  279. /***************************************************************************
  280. *
  281. * msgQDistSend - send a message to a distributed message queue (VxFusion option)
  282. *
  283. * This routine sends the message specified by <buffer> of length <nBytes> to 
  284. * the distributed message queue or group specified by <msgQId>.
  285. *
  286. * The argument <msgQTimeout> specifies the time in ticks to wait for the 
  287. * queuing of the message. The argument <overallTimeout> specifies the time in
  288. * ticks to wait for both the sending and queuing of the message.
  289. * While it is an error to set <overallTimeout> to NO_WAIT (0), 
  290. * WAIT_FOREVER (-1) is allowed for both <msgQTimeout> and <overallTimeout>.
  291. *
  292. * The <priority> parameter specifies the priority of the message being sent.
  293. * It ranges between DIST_MSG_PRI_0 (highest priority) and DIST_MSG_PRI_7 
  294. * (lowest priority).  A priority of MSG_PRI_URGENT is mapped
  295. * to DIST_MSG_PRI_0; MSG_PRI_NORMAL is mapped to DIST_MSG_PRI_4 .
  296. * Messages sent with high priorities (DIST_MSG_PRI_0 to DIST_MSG_PRI_3)
  297. * are put to the head of the list of queued messages.
  298. * Lower priority messages (DIST_MSG_PRI_4 to DIST_MSG_PRI_7) are placed
  299. * at the queue's tail.
  300. *
  301. * NOTE: When msgQDistSend() is called through msgQSend(), <msgQTimeout> is 
  302. * set to <timeout> and <overallTimeout> to WAIT_FOREVER .
  303. *
  304. * AVAILABILITY
  305. * This routine is distributed as a component of the unbundled distributed
  306. * message queues option, VxFusion.
  307. *
  308. * RETURNS: OK, or ERROR if the operation fails.
  309. *
  310. * ERRNO:
  311. * is
  312. * i S_distLib_OBJ_ID_ERROR
  313. * The argument <msgQId> is invalid.
  314. * i S_distLib_UNREACHABLE
  315. * Could not establish communications with the remote node.
  316. * i S_msgQDistLib_INVALID_PRIORITY
  317. * The argument <priority> is invalid.
  318. * i S_msgQDistLib_INVALID_TIMEOUT
  319. * The argument <overallTimeout> is NO_WAIT .
  320. * i S_msgQDistLib_RMT_MEMORY_SHORTAGE
  321. * There is not enough memory on the remote node.
  322. * i S_objLib_OBJ_UNAVAILABLE
  323. * The argument <msgQTimeout> is set to NO_WAIT, and the queue is full.
  324. * i S_objLib_OBJ_TIMEOUT
  325. * The queue is full for <msgQTimeout> ticks.
  326. * i S_msgQLib_INVALID_MSG_LENGTH
  327. * The argument <nBytes> is larger than the <maxMsgLength> set for the 
  328. * message queue.
  329. * i S_msgQDistLib_OVERALL_TIMEOUT
  330. * There was no response from the remote side in <overallTimeout> ticks.
  331. * ie
  332. *
  333. * SEE ALSO: msgQLib
  334. */
  335. STATUS msgQDistSend
  336.     (
  337.     MSG_Q_ID    msgQId,         /* message queue on which to send */
  338.     char *      buffer,         /* message to send                */
  339.     UINT        nBytes,         /* length of message              */
  340.     int         msgQTimeout,    /* ticks to wait at message queue */
  341.     int         overallTimeout, /* ticks to wait overall          */
  342.     int         priority        /* priority                       */
  343.     )
  344.     {
  345.     DIST_MSG_Q_ID     dMsgQId;
  346.     DIST_OBJ_NODE *   pObjNode;
  347.     MSG_Q_ID          lclMsgQId;
  348.     int               lclPriority;    /* MSG_PRI_URGENT or MSG_PRI_NORMAL */
  349.     DIST_MSG_Q_GRP_ID    distMsgQGrpId;
  350.     STATUS               status;
  351.     DIST_PKT_MSG_Q_SEND    pktSendHdr;
  352.     DIST_MSG_Q_SEND_INQ    inquiryNode;
  353.     DIST_INQ_ID            inquiryId;
  354.     DIST_IOVEC             distIOVec[2];
  355.     if (DIST_OBJ_VERIFY (msgQId) == ERROR)
  356.         {
  357.         errnoSet (S_distLib_OBJ_ID_ERROR);
  358.         return (ERROR);
  359.         }
  360.     if (overallTimeout == NO_WAIT)
  361.         {
  362.         errnoSet (S_msgQDistLib_INVALID_TIMEOUT);
  363.         return (ERROR);    /* makes no sense */
  364.         }
  365.     if (! DIST_MSG_Q_PRIO_VERIFY (priority))
  366.         {
  367.         errnoSet (S_msgQDistLib_INVALID_PRIORITY);
  368.         return (ERROR);    /* invalid priority */
  369.         }
  370.     pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId);
  371.     if (! IS_DIST_MSG_Q_OBJ (pObjNode))
  372.         {
  373.         errnoSet (S_distLib_OBJ_ID_ERROR);
  374.         return (ERROR);    /* legal object id, but not a message queue */
  375.         }
  376.     dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;
  377. #ifdef MSG_Q_DIST_REPORT
  378.     printf ("msgQDistSend: msgQId %p, dMsgQId 0x%lxn", msgQId, dMsgQId);
  379. #endif
  380.     if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId))
  381.         {
  382.         /* Message queue id points to a group. */
  383.         distMsgQGrpId = DIST_MSG_Q_ID_TO_DIST_MSG_Q_GRP_ID (dMsgQId);
  384.         status = msgQDistGrpSend (distMsgQGrpId, buffer, nBytes,
  385.                                   msgQTimeout, overallTimeout, priority);
  386. #ifdef MSG_Q_DIST_REPORT
  387.         printf ("msgQDistSend: msgQDistGrpSend returned = %dn", status);
  388. #endif
  389.         return (status);
  390.         }
  391.     if (!IS_DIST_OBJ_LOCAL (pObjNode))
  392.         {
  393.         /* Message queue id points to a remote queue. */
  394.         inquiryNode.sendInq.inqType = DIST_MSG_Q_INQ_TYPE_SEND;
  395.         semBInit (&(inquiryNode.sendInqWait), SEM_Q_FIFO, SEM_EMPTY);
  396.         inquiryNode.remoteError = FALSE;
  397.         inquiryNode.sendInqMsgQueued = FALSE;
  398.         inquiryNode.sendInqTask = taskIdSelf();
  399.         inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode);
  400.         pktSendHdr.sendHdr.pktType = DIST_PKT_TYPE_MSG_Q;
  401.         pktSendHdr.sendHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_SEND;
  402.         pktSendHdr.sendTblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId);
  403.         pktSendHdr.sendInqId = (uint32_t) inquiryId;
  404.         pktSendHdr.sendTimeout =
  405.                 htonl ((uint32_t) DIST_TICKS_TO_MSEC (msgQTimeout));
  406.         /* use IOV stuff here, since we do not want to copy data */
  407.         distIOVec[0].pIOBuffer = &pktSendHdr;
  408.         distIOVec[0].IOLen = DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND);
  409.         distIOVec[1].pIOBuffer = buffer;
  410.         distIOVec[1].IOLen = nBytes;
  411.         status = distNetIOVSend (pObjNode->objNodeReside, &distIOVec[0], 2,
  412.                                  WAIT_FOREVER,
  413.                                  DIST_MSG_Q_PRIO_TO_NET_PRIO (priority));
  414.         if (status == ERROR)
  415.             {
  416.             distInqCancel ((DIST_INQ *) &inquiryNode);
  417.             errnoSet (S_distLib_UNREACHABLE);
  418.             return (ERROR);
  419.             }
  420.         /*
  421.          * semTake() blocks the requesting task until the service
  422.          * task gives the semaphore, because the request has
  423.          * been processed.
  424.          */
  425.         semTake (&(inquiryNode.sendInqWait), overallTimeout);
  426.         distInqCancel ((DIST_INQ *) &inquiryNode);
  427.         if (inquiryNode.sendInqMsgQueued)
  428.             return (OK);
  429.         /* If errno = S_objLib_OBJ_TIMEOUT, it could either be a result
  430.          * of the timeout from the semaphore or the remote errno.  We must
  431.          * check the remoteError flag of inquiryNode to determine what the
  432.          * source of the error was.  If it is a result of the semaphore, we
  433.          * will set errno to S_msgQDistLib_OVERALLTIMEOUT.  Otherwise, we'll
  434.          * leave the errno as it is.
  435.          */
  436.         if (inquiryNode.remoteError == FALSE)
  437.             errno = S_msgQDistLib_OVERALL_TIMEOUT;
  438.         return (ERROR);
  439.         }
  440.     /* Message queue id points to a local queue. */
  441.     lclPriority = DIST_MSG_Q_PRIO_TO_MSG_Q_PRIO (priority);
  442.     lclMsgQId = msgQDistTblGet (DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId));
  443.     if (lclMsgQId == NULL)
  444.         return (ERROR);    /* does not exist */
  445.     if (msgQSend (lclMsgQId, buffer, nBytes, msgQTimeout, lclPriority)
  446.           == ERROR)
  447.         {
  448.         return (ERROR);    /* error in msgQSend() */
  449.         }
  450.     return (OK);
  451.     }
  452. /***************************************************************************
  453. *
  454. * msgQDistReceive - receive a message from a distributed message queue (VxFusion option)
  455. *
  456. * This routine receives a message from the distributed message queue specified 
  457. * by <msgQId>.  The received message is copied into the specified buffer, 
  458. * <buffer>, which is <maxNBytes> in length.  If the message is longer than 
  459. * <maxNBytes>, the remainder of the message is discarded (no error indication
  460. * is returned).
  461. *
  462. * The argument <msgQTimeout> specifies the time in ticks to wait for the 
  463. * queuing of the message. The argument <overallTimeout> specifies the time
  464. * in ticks to wait for both the sending and queuing of the message.
  465. * While it is an error to set <overallTimeout> to NO_WAIT (0), 
  466. * WAIT_FOREVER (-1) is allowed for both <msgQTimeout> and <overallTimeout>.
  467. *
  468. * Calling msgQDistReceive() on a distributed message group returns an
  469. * error.
  470. *
  471. * NOTE: When msgQDistReceive() is called through msgQReceive(), 
  472. * <msgQTimeout> is set to <timeout> and <overallTimeout> to WAIT_FOREVER .
  473. *
  474. * AVAILABILITY
  475. * This routine is distributed as a component of the unbundled distributed
  476. * message queues option, VxFusion.
  477. *
  478. * RETURNS: The number of bytes copied to <buffer>, or ERROR. 
  479. *
  480. * ERRNO:
  481. * is
  482. * i S_distLib_OBJ_ID_ERROR
  483. * The argument <msgQId> is invalid.
  484. * i S_distLib_UNREACHABLE
  485. * Could not establish communications with the remote node.
  486. * i S_msgQLib_INVALID_MSG_LENGTH
  487. * The argument <maxNBytes> is less than 0.
  488. * i S_msgQDistLib_INVALID_TIMEOUT
  489. * The argument <overallTimeout> is NO_WAIT .
  490. * i S_msgQDistLib_RMT_MEMORY_SHORTAGE
  491. * There is not enough memory on the remote node.
  492. * i S_objLib_OBJ_UNAVAILABLE
  493. * The argument <msgQTimeout> is set to NO_WAIT, and no messages are available.
  494. * i S_objLib_OBJ_TIMEOUT
  495. * No messages were received in <msgQTimeout> ticks.
  496. * i S_msgQDistLib_OVERALL_TIMEOUT
  497. * There was no response from the remote side in <overallTimeout> ticks.
  498. * ie
  499. *
  500. *
  501. * SEE ALSO: msgQLib
  502. */
  503. int msgQDistReceive
  504.     (
  505.     MSG_Q_ID    msgQId,         /* message queue from which to receive */
  506.     char *      buffer,         /* buffer to receive message           */
  507.     UINT        maxNBytes,      /* length of buffer                    */
  508.     int         msgQTimeout,    /* ticks to wait at the message queue  */
  509.     int         overallTimeout  /* ticks to wait overall               */
  510.     )
  511.     {
  512.     DIST_MSG_Q_ID    dMsgQId;
  513.     DIST_OBJ_NODE *  pObjNode;
  514.     MSG_Q_ID         lclMsgQId;
  515.     DIST_PKT_MSG_Q_RECV_REQ    pktReq;
  516.     DIST_MSG_Q_RECV_INQ        inquiryNode;
  517.     DIST_INQ_ID                inquiryId;
  518.     STATUS                     status;
  519.     int                        nBytes;
  520.     if (DIST_OBJ_VERIFY (msgQId) == ERROR)
  521.         {
  522.         errnoSet (S_distLib_OBJ_ID_ERROR);
  523.         return (ERROR);
  524.         }
  525.     if (overallTimeout == NO_WAIT)
  526.         {
  527.         errnoSet (S_msgQDistLib_INVALID_TIMEOUT);
  528.         return (ERROR);    /* makes no sense */
  529.         }
  530.     /*
  531.      * Even though <maxNBytes> is unsigned, check for < 0 to catch
  532.      * possible caller errors.
  533.      */
  534.     if ((int) maxNBytes < 0)
  535.         {
  536.         errnoSet (S_msgQLib_INVALID_MSG_LENGTH);
  537.         return (ERROR);
  538.         }
  539.     pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId);
  540.     if (! IS_DIST_MSG_Q_OBJ (pObjNode))
  541.         {
  542.         errnoSet (S_distLib_OBJ_ID_ERROR);
  543.         return (ERROR); /* legal object id, but not a message queue */
  544.         }
  545.     dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;
  546. #ifdef MSG_Q_DIST_REPORT
  547.     printf ("msgQDistReceive: msgQId %p, dMsgQId 0x%lxn", msgQId, dMsgQId);
  548. #endif
  549.     if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId))
  550.         {
  551.         /* MSG_Q_ID is a group id. */
  552.         errnoSet (S_msgQDistLib_NOT_GROUP_CALLABLE);
  553.         return (ERROR);    /* error to call msgQReceive() on groups */
  554.         }
  555.     if (!IS_DIST_OBJ_LOCAL (pObjNode))
  556.         {
  557.         /*
  558.          * Queue is remote.
  559.          *
  560.          * Create a inquiry node and send a request to the remote
  561.          * node. Block until timeout exceeds or the request is
  562.          * answered.
  563.          */
  564.         inquiryNode.recvInq.inqType = DIST_MSG_Q_INQ_TYPE_RECV;
  565.         semBInit (&(inquiryNode.recvInqWait), SEM_Q_FIFO, SEM_EMPTY);
  566.         inquiryNode.recvInqTask = taskIdSelf();
  567.         inquiryNode.pRecvInqBuffer = buffer;
  568.         inquiryNode.recvInqMaxNBytes = maxNBytes;
  569.         inquiryNode.recvInqMsgArrived = FALSE;
  570.         inquiryNode.remoteError = FALSE;
  571.         inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode);
  572.         pktReq.recvReqHdr.pktType = DIST_PKT_TYPE_MSG_Q;
  573.         pktReq.recvReqHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_RECV_REQ;
  574.         pktReq.recvReqTblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId);
  575.         pktReq.recvReqInqId = (uint32_t) inquiryId;
  576.         pktReq.recvReqMaxNBytes = htonl ((uint32_t) maxNBytes);
  577.         pktReq.recvReqTimeout = 
  578.                 htonl ((uint32_t) DIST_TICKS_TO_MSEC (msgQTimeout));
  579.         status = distNetSend (pObjNode->objNodeReside, (DIST_PKT *) &pktReq,
  580.                               DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_REQ),
  581.                               WAIT_FOREVER,
  582.                               DIST_MSG_Q_RECV_PRIO);
  583.         if (status == ERROR)
  584.             {
  585.             distInqCancel ((DIST_INQ *) &inquiryNode);
  586.             errnoSet (S_distLib_UNREACHABLE);
  587.             return (ERROR);
  588.             }
  589.         /*
  590.          * semTake() blocks the requesting task until
  591.          * the service task gives the semaphore, because
  592.          * the request has been processed.
  593.          */
  594.         semTake (&(inquiryNode.recvInqWait), overallTimeout);
  595.         if (inquiryNode.recvInqMsgArrived)
  596.             {
  597.             /*
  598.              * If <recvInqMsgArrived> is true, <recvInqMaxNBytes> has
  599.              * the number of bytes received.
  600.              */
  601.             nBytes = inquiryNode.recvInqMaxNBytes;
  602.             distInqCancel ((DIST_INQ *) &inquiryNode);
  603.             return (nBytes);
  604.             }
  605.         distInqCancel ((DIST_INQ *) &inquiryNode);
  606.         /* If errno = S_objLib_OBJ_TIMEOUT, it could either be a result
  607.          * of the timeout from the semaphore or the remote errno.  We must
  608.          * check the remoteError flag of inquiryNode to determine what the
  609.          * source of the error was.  If it is a result of the semaphore, we
  610.          * will set errno to S_msgQDistLib_OVERALLTIMEOUT.  Otherwise, we'll
  611.          * leave the errno as it is.
  612.          */
  613.         if (inquiryNode.remoteError == FALSE)
  614.             errnoSet (S_msgQDistLib_OVERALL_TIMEOUT);
  615.         return (ERROR);
  616.         }
  617.     /* The message queue is local to this node. This will be simple. */
  618.     lclMsgQId = msgQDistTblGet (DIST_MSG_Q_ID_TO_TBL_IX(dMsgQId));
  619.     if (lclMsgQId == NULL)
  620.         {
  621. #ifdef DIST_DIAGNOSTIC
  622.         printf ("msgQDistReceive: distributed message queue does not existn");
  623. #endif
  624.         return (ERROR);    /* does not exist */
  625.         }
  626.     return (msgQReceive (lclMsgQId, buffer, maxNBytes, msgQTimeout));
  627.     }
  628. /***************************************************************************
  629. *
  630. * msgQDistNumMsgs - get the number of messages in a distributed message queue (VxFusion option)
  631. *
  632. * This routine returns the number of messages currently queued to a specified
  633. * distributed message queue.
  634. *
  635. * NOTE:
  636. * When msgQDistNumMsgs() is called through msgQNumMsgs(), <overallTimeout>
  637. * is set to WAIT_FOREVER . You cannot set <overallTimeout> to NO_WAIT (0)
  638. * because the process of sending a message from the local node to the remote
  639. * node always takes a finite amount of time.
  640. *
  641. * AVAILABILITY
  642. * This routine is distributed as a component of the unbundled distributed
  643. * message queues option, VxFusion.
  644. *
  645. * RETURNS:
  646. * The number of messages queued, or ERROR if the operation fails.
  647. *
  648. * ERRNO:
  649. * is
  650. * i S_distLib_OBJ_ID_ERROR
  651. * The argument <msgQId> is invalid.
  652. * i S_distLib_UNREACHABLE
  653. * Could not establish communications with the remote node.
  654. * i S_msgQDistLib_INVALID_TIMEOUT
  655. * The argument <overallTimeout> is NO_WAIT .
  656. * ie
  657. *
  658. * SEE ALSO: msgQLib
  659. */
  660. int msgQDistNumMsgs
  661.     (
  662.     MSG_Q_ID    msgQId,            /* message queue to examine */
  663.     int         overallTimeout     /* ticks to wait overall    */
  664.     )
  665.     {
  666.     DIST_MSG_Q_ID    dMsgQId;
  667.     DIST_OBJ_NODE *  pObjNode;
  668.     MSG_Q_ID         lclMsgQId;
  669.     DIST_PKT_MSG_Q_NUM_MSGS_REQ    pktReq;
  670.     DIST_MSG_Q_NUM_MSGS_INQ        inquiryNode;
  671.     DIST_INQ_ID                    inquiryId;
  672.     int                            numMsgs;
  673.     STATUS                         status;
  674.     if (DIST_OBJ_VERIFY (msgQId) == ERROR)
  675.         {
  676.         errnoSet (S_distLib_OBJ_ID_ERROR);
  677.         return (ERROR);
  678.         }
  679.     if (overallTimeout == NO_WAIT)
  680.         {
  681.         errnoSet (S_msgQDistLib_INVALID_TIMEOUT);
  682.         return (ERROR);    /* makes no sense */
  683.         }
  684.     pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId);
  685.     if (! IS_DIST_MSG_Q_OBJ (pObjNode))
  686.         {
  687.         errnoSet (S_distLib_OBJ_ID_ERROR);
  688.         return (ERROR);    /* legal object id, but not a message queue */
  689.         }
  690.     dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;
  691. #ifdef MSG_Q_DIST_REPORT
  692.     printf ("msgQDistNumMsgs: msgQId %p, dMsgQId 0x%lxn", msgQId, dMsgQId);
  693. #endif
  694.     if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId))
  695.         {
  696.         errnoSet (S_msgQDistLib_NOT_GROUP_CALLABLE);
  697.         return (ERROR);    /* error to call msgQNumMsgs() on groups */
  698.         }
  699.     if (!IS_DIST_OBJ_LOCAL (pObjNode))    /* message queue is remote */
  700.         {
  701.         inquiryNode.numMsgsInq.inqType = DIST_MSG_Q_INQ_TYPE_NUM_MSGS;
  702.         semBInit (&(inquiryNode.numMsgsInqWait), SEM_Q_FIFO, SEM_EMPTY);
  703.         inquiryNode.numMsgsInqNum = ERROR;
  704.         inquiryNode.numMsgsInqTask = taskIdSelf();
  705.         inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode);
  706.         
  707.         pktReq.numMsgsReqHdr.pktType = DIST_PKT_TYPE_MSG_Q;
  708.         pktReq.numMsgsReqHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_NUM_MSGS_REQ;
  709.         pktReq.numMsgsReqTblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId);
  710.         pktReq.numMsgsReqInqId = (uint32_t) inquiryId;
  711.         status = distNetSend (pObjNode->objNodeReside, (DIST_PKT *) &pktReq,
  712.                 DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_REQ),
  713.                 WAIT_FOREVER, DIST_MSG_Q_NUM_MSGS_PRIO);
  714.         if (status == ERROR)
  715.             {
  716.             distInqCancel ((DIST_INQ *) &inquiryNode);
  717.             errnoSet (S_distLib_UNREACHABLE);
  718.             return (ERROR);
  719.             }
  720.         semTake (&(inquiryNode.numMsgsInqWait), overallTimeout);
  721.         numMsgs = inquiryNode.numMsgsInqNum;
  722.         distInqCancel ((DIST_INQ *) &inquiryNode);
  723.         return (numMsgs);
  724.         }
  725.     lclMsgQId = msgQDistTblGet (DIST_MSG_Q_ID_TO_TBL_IX(dMsgQId));
  726.     if (lclMsgQId == NULL)
  727.         return (ERROR);    /* does not exist */
  728.     return (msgQNumMsgs (lclMsgQId));
  729.     }
  730. /***************************************************************************
  731. *
  732. * msgQDistGetMapped - retrieve entry from distributed msgQ table (VxFusion option)
  733. *
  734. * This routine gets an entry from the distributed message queue table.
  735. *
  736. * AVAILABILITY
  737. * This routine is distributed as a component of the unbundled distributed
  738. * message queues option, VxFusion.
  739. *
  740. * RETURNS: MSG_Q_ID, or NULL.
  741. *
  742. * NOMANUAL
  743. */
  744. MSG_Q_ID msgQDistGetMapped
  745.     (
  746.     MSG_Q_ID msgQId     /* msgQ ID to map */
  747.     )
  748.     {
  749.     DIST_MSG_Q_ID     dMsgQId;
  750.     DIST_OBJ_NODE *   pObjNode;
  751.     TBL_IX            tblIx;
  752.     
  753.     if (DIST_OBJ_VERIFY (msgQId) == ERROR)
  754.         return (NULL);
  755.     pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId);
  756.     if (! IS_DIST_MSG_Q_OBJ (pObjNode))
  757.         return (NULL); /* legal object id, but not a message queue */
  758.     if (! IS_DIST_OBJ_LOCAL (pObjNode))
  759.         return (NULL);
  760.     dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;
  761.     if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId))
  762.         return (NULL);
  763.     tblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId);
  764.     return (msgQDistTblGet (tblIx));
  765.     }
  766. /***************************************************************************
  767. *
  768. * msgQDistTblPut - put a message queue to the queue table (VxFusion option)
  769. *
  770. * This routine puts a MSG_Q_ID in the queue table.
  771. *
  772. * AVAILABILITY
  773. * This routine is distributed as a component of the unbundled distributed
  774. * message queues option, VxFusion.
  775. *
  776. * RETURNS: OK, if successful
  777. *
  778. * NOMANUAL
  779. */
  780. LOCAL STATUS msgQDistTblPut
  781.     (
  782.     MSG_Q_ID  msgQId,      /* ID to put in table */
  783.     TBL_IX *  pTblIx       /* where to return index in table */
  784.     )
  785.     {
  786.     TBL_NODE * pNode;
  787.     msgQDistTblLock();
  788.     pNode = (TBL_NODE *) sllGet (&msgQDistTblFreeList);
  789.     msgQDistTblUnlock();
  790.     if (pNode == NULL)
  791.         return (ERROR);    /* all elements of the table are in use */
  792.     pNode->tblMsgQId = msgQId;
  793.     *pTblIx = pNode->tblIx;
  794. #ifdef MSG_Q_DIST_REPORT
  795.     printf ("msgQDistTblPut: pTblNode %p (tblIx 0x%x), msgQId 0x%lxn", 
  796.             pNode, pNode->tblIx, (uint32_t) pNode->tblMsgQId);
  797. #endif
  798.     return (OK);
  799.     }
  800. #ifdef __SUPPORT_MSG_Q_DIST_DELETE
  801. /***************************************************************************
  802. *
  803. * msgQDistTblDelete - delete a message queue from the table (VxFusion option)
  804. *
  805. * This routine deletes in queue ID at table index <tblIx>.
  806. *
  807. * AVAILABILITY
  808. * This routine is distributed as a component of the unbundled distributed
  809. * message queues option, VxFusion.
  810. *
  811. * RETURNS: OK, if successfully deleted.
  812. *
  813. * NOMANUAL
  814. */
  815. LOCAL STATUS msgQDistTblDelete
  816.     (
  817.     TBL_IX tblIx      /* index in queue table */
  818.     )
  819.     {
  820.     TBL_NODE * pNode;
  821.     if (tblIx >=  msgQDistTblSize)
  822.         return(ERROR);    /* invalid argument */
  823.     pNode = &(pMsgQDistTbl[tblIx]);
  824.     msgQDistTblLock();
  825.     sllPutAtHead (&msgQDistTblFreeList, (SL_NODE *) pNode);
  826.     msgQDistTblUnlock();
  827.     return(OK);
  828.     }
  829. #endif /* __SUPPORT_MSG_Q_DIST_DELETE */
  830. /***************************************************************************
  831. *
  832. *  msgQDistTblGet - get message queue ID from table (VxFusion option)
  833. *
  834. * This routine takes a message queue table index, <tblIx>, and returns
  835. * the corresponding MSG_Q_ID, or NULL.
  836. *
  837. * AVAILABILITY
  838. * This routine is distributed as a component of the unbundled distributed
  839. * message queues option, VxFusion.
  840. *
  841. * RETURNS: MSG_Q_ID or NULL.
  842. *
  843. * NOMANUAL
  844. */
  845. LOCAL MSG_Q_ID msgQDistTblGet
  846.     (
  847.     TBL_IX tblIx            /* index in queue table */
  848.     )
  849.     {
  850.     if (tblIx >=  msgQDistTblSize)
  851.         return(NULL);    /* invalid argument */
  852. #ifdef MSG_Q_DIST_REPORT
  853.     printf ("msgQDistTblGet: tblIx 0x%x, msgQId 0x%lxn", 
  854.             tblIx, (uint32_t) pMsgQDistTbl[tblIx].tblMsgQId);
  855. #endif
  856.     return (pMsgQDistTbl[tblIx].tblMsgQId);
  857.     }
  858. /***************************************************************************
  859. *
  860. *  msgQDistInput - called everytime a new message arrives at the system (VxFusion option)
  861. *
  862. * This routine processes messages received by VxFusion.
  863. *
  864. * AVAILABILITY
  865. * This routine is distributed as a component of the unbundled distributed
  866. * message queues option, VxFusion.
  867. *
  868. * RETURNS: The status of message processing.
  869. *
  870. * NOMANUAL
  871. */
  872. LOCAL DIST_STATUS msgQDistInput
  873.     (
  874.     DIST_NODE_ID      nodeIdSrc,    /* source node ID */
  875.     DIST_TBUF_HDR *   pTBufHdr      /* ptr to the message */
  876.     )
  877.     {
  878.     DIST_PKT *   pPkt;
  879.     int          pktLen;
  880.             DIST_PKT_MSG_Q_SEND    pktSend;
  881.             DIST_INQ_ID            inqIdSrc;
  882.             MSG_Q_ID               msgQId;
  883.             char *                 buffer;
  884.             UINT                   nBytes;
  885.             int                    prio;
  886.             int                    ret;
  887.             int                    tid;
  888.             DIST_PKT_MSG_Q_RECV_REQ  pktReq;    /* incoming request packet */
  889.             UINT                     maxBytes;
  890.     pktLen = pTBufHdr->tBufHdrOverall;
  891.     if (pktLen < sizeof (DIST_PKT))
  892.         distPanic ("msgQDistInput: packet too shortn");
  893.     pPkt = (DIST_PKT *) ((DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData);
  894.     switch (pPkt->pktSubType)
  895.         {
  896.         case DIST_PKT_TYPE_MSG_Q_SEND:
  897.             {
  898.             /*
  899.              * Received a message from a remote sender.
  900.              * Find id of local message queue, and call msgQSend().
  901.              */
  902.             if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND))
  903.                 distPanic ("msgQDistInput/SEND: packet too shortn");
  904.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0,
  905.                           (char *) &pktSend,
  906.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND));
  907.             inqIdSrc = (DIST_INQ_ID) pktSend.sendInqId;
  908.             nBytes = pktLen - DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND);
  909.             /*
  910.              * Using malloc() here is not very satisfiing. Maybe we can
  911.              * extend msgQLib with a routine, that directly sends a list
  912.              * of tBufs to a message queue.
  913.              */
  914.             if ((buffer = (char *) malloc (nBytes)) == NULL)
  915.                 {
  916. #ifdef MSG_Q_DIST_REPORT
  917.                 printf ("msgQDistInput/SEND: out of memoryn");
  918. #endif
  919.                 msgQDistSendStatus (nodeIdSrc, inqIdSrc,
  920.                                     MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY);
  921.                 return (MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY);
  922.                 }
  923.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr),
  924.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND),
  925.                           buffer, nBytes);
  926.             prio = NET_PRIO_TO_DIST_MSG_Q_PRIO (pTBufHdr->tBufHdrPrio);
  927.             msgQId = msgQDistTblGet (pktSend.sendTblIx);
  928.             if (msgQId == NULL)
  929.                 {
  930.                 free (buffer);
  931. #ifdef MSG_Q_DIST_REPORT
  932.                 printf ("msgQDistInput/SEND: unknown message queue idn");
  933. #endif
  934.                 msgQDistSendStatus (nodeIdSrc, inqIdSrc,
  935.                                     MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID);
  936.                 return (MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID);
  937.                 }
  938.             /*
  939.              * First try to send with NO_WAIT.  We set the lastry
  940.              * argument to FALSE, because we will try again.
  941.              */
  942.             ret = msgQDistSendReply (nodeIdSrc, inqIdSrc, msgQId, buffer,
  943.                                      nBytes, NO_WAIT, prio, FALSE);
  944.             switch (ret)
  945.                 {
  946.                 case MSG_Q_DIST_STATUS_OK:
  947.                     free (buffer);
  948.                     return (MSG_Q_DIST_STATUS_OK);
  949.                 case MSG_Q_DIST_STATUS_ERROR:
  950.                     free (buffer);
  951.                     return (MSG_Q_DIST_STATUS_ERROR);
  952.                 case MSG_Q_DIST_STATUS_UNAVAIL:
  953.                     {
  954.                     int            timeout;
  955.                     timeout =
  956.                         DIST_MSEC_TO_TICKS (ntohl (pktSend.sendTimeout));
  957.                     if (timeout != NO_WAIT)
  958.                         {
  959. #ifdef MSG_Q_DIST_REPORT
  960.                         printf ("msgQDistInput/SEND: timeout = %dn", timeout);
  961. #endif
  962.                         /*
  963.                          * Send with NO_WAIT has failed and user
  964.                          * supplied timeout differs from NO_WAIT.
  965.                          * Spawn a task and wait on message queue.
  966.                          */
  967.                         
  968.                         tid = taskSpawn (NULL,
  969.                                          DIST_MSG_Q_WAIT_TASK_PRIO,
  970.                                          0,
  971.                                          DIST_MSG_Q_WAIT_TASK_STACK_SZ,
  972.                                          (FUNCPTR) msgQDistSendReply,
  973.                                          (int) nodeIdSrc,
  974.                                          (int) inqIdSrc,
  975.                                          (int) msgQId,
  976.                                          (int) buffer,
  977.                                          nBytes,
  978.                                          timeout,
  979.                                          prio,
  980.                                          TRUE,
  981.                                          0, 0);
  982.                         if (tid != ERROR)
  983.                             {
  984.                             /* msgQDistSendReply () frees <buffer> */
  985.                             return (MSG_Q_DIST_STATUS_OK);
  986.                             }
  987.                         }
  988.                     free (buffer);
  989.                     /* For this case where the user specified NO_WAIT
  990.                      * we must send back a status now.  We didn't do this
  991.                      * in msgQDistSendReply() because we didn't know if
  992.                      * the user specified NO_WAIT or whether it was the first
  993.                      * try before spawning a task. 
  994.                      */
  995.                     msgQDistSendStatus (nodeIdSrc, inqIdSrc, (INT16) ret);
  996.                     return (MSG_Q_DIST_STATUS_UNAVAIL);
  997.                     }
  998.                 default:
  999.                     free (buffer);
  1000. #ifdef MSG_Q_DIST_REPORT
  1001.                     printf ("msgQDistInput/SEND: illegal statusn");
  1002. #endif
  1003.                     return (MSG_Q_DIST_STATUS_INTERNAL_ERROR);
  1004.                 }
  1005.             }
  1006.         case DIST_PKT_TYPE_MSG_Q_RECV_REQ:
  1007.             {
  1008.             /*
  1009.              * A remote node requests to receive data from local queue.
  1010.              * Find id of local message queue, and call msgQReceive().
  1011.              */
  1012.             DIST_MSG_Q_STATUS        ret;
  1013.             if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_REQ))
  1014.                 distPanic ("msgQDistInput/RECV_REQ: packet too shortn");
  1015.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktReq,
  1016.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_REQ));
  1017.             inqIdSrc = (DIST_INQ_ID) pktReq.recvReqInqId;
  1018.             msgQId = msgQDistTblGet (pktReq.recvReqTblIx);
  1019.             if (msgQId == NULL)
  1020.                 {
  1021. #ifdef MSG_Q_DIST_REPORT
  1022.                 printf ("msgQDistInput/RECV_REQ: unknown message queue idn");
  1023. #endif
  1024.                 msgQDistSendStatus (nodeIdSrc, inqIdSrc,
  1025.                                     MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID);
  1026.                 return (MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID);
  1027.                 }
  1028.             maxBytes = ntohl ((uint32_t) pktReq.recvReqMaxNBytes);
  1029.             buffer = (char *) malloc (maxBytes);
  1030.             if (buffer == NULL)
  1031.                 {
  1032. #ifdef MSG_Q_DIST_REPORT
  1033.                 printf ("msgQDistInput/RECV_REQ: out of memoryn");
  1034. #endif
  1035.                 msgQDistSendStatus (nodeIdSrc, inqIdSrc,
  1036.                                     MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY);
  1037.                 return (MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY);
  1038.                 }
  1039.             /* First try to receive a message with NO_WAIT. */
  1040.             
  1041.             ret = msgQDistRecvReply (nodeIdSrc, inqIdSrc, msgQId,
  1042.                                      buffer, maxBytes, NO_WAIT,
  1043.                                      FALSE);
  1044.             switch (ret)
  1045.                 {
  1046.                 case MSG_Q_DIST_STATUS_OK:
  1047.                     free (buffer);
  1048.                     return (MSG_Q_DIST_STATUS_OK);
  1049.                 case MSG_Q_DIST_STATUS_ERROR:
  1050.                     free (buffer);
  1051.                     return (MSG_Q_DIST_STATUS_ERROR);
  1052.                 case MSG_Q_DIST_STATUS_UNAVAIL:
  1053.                     {
  1054.                     uint32_t    timeout_msec;
  1055.                     int            timeout;
  1056.                     timeout_msec = ntohl ((uint32_t) pktReq.recvReqTimeout);
  1057.                     timeout = DIST_MSEC_TO_TICKS (timeout_msec);
  1058.                     if (timeout != NO_WAIT)
  1059.                         {
  1060.                         /*
  1061.                          * Receiving with NO_WAIT has failed and user
  1062.                          * supplied timeout differs from NO_WAIT.
  1063.                          * Spawn a task and wait on message queue.
  1064.                          */
  1065.                         int tid;
  1066. #ifdef MSG_Q_DIST_REPORT
  1067.                         printf ("msgQDistInput/RECV_REQ: timeout = %dn",
  1068.                                 timeout);
  1069. #endif
  1070.                         tid = taskSpawn (NULL,
  1071.                                          DIST_MSG_Q_WAIT_TASK_PRIO,
  1072.                                          0,
  1073.                                          DIST_MSG_Q_WAIT_TASK_STACK_SZ,
  1074.                                         /* task entry point */
  1075.                                         (FUNCPTR) msgQDistRecvReply,
  1076.                                         /* who is the receiver */
  1077.                                         (int) nodeIdSrc,
  1078.                                         (int) inqIdSrc,
  1079.                                         /* receiving options */
  1080.                                         (int) msgQId,
  1081.                                         (int) buffer,
  1082.                                         maxBytes,
  1083.                                         timeout,
  1084.                                         /* some options */
  1085.                                         TRUE /* lastTry */,
  1086.                                         0, 0, 0);
  1087.                         if (tid != ERROR)
  1088.                             return (MSG_Q_DIST_STATUS_OK);
  1089.                         }
  1090.                     free (buffer);
  1091.                     /* For this case where the user specified NO_WAIT
  1092.                      * we must send back a status now.  We didn't do this
  1093.                      * in msgQDistRecvReply() because we didn't know if
  1094.                      * the user specified NO_WAIT or whether it was the first
  1095.                      * try before spawning a task. 
  1096.                      */
  1097.                     msgQDistSendStatus (nodeIdSrc, inqIdSrc, ret);
  1098.                     return (MSG_Q_DIST_STATUS_UNAVAIL);
  1099.                     }
  1100.                 default:
  1101. #ifdef MSG_Q_DIST_REPORT
  1102.                     printf ("msgQDistInput/SEND: illegal statusn");
  1103. #endif
  1104.                     free (buffer);
  1105.                     return (MSG_Q_DIST_STATUS_INTERNAL_ERROR);
  1106.                 }
  1107.             }
  1108.         case DIST_PKT_TYPE_MSG_Q_RECV_RPL:
  1109.             {
  1110.             DIST_PKT_MSG_Q_RECV_RPL    pktRpl;
  1111.             DIST_MSG_Q_RECV_INQ *      pInq;
  1112.             DIST_INQ_ID                inqId;
  1113.             int                        nBytes;
  1114.             if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_RPL))
  1115.                 distPanic ("msgQDistInput/RECV_RPL: packet too shortn");
  1116.             /* First copy the reply header. */
  1117.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0,
  1118.                           (char *) &pktRpl,
  1119.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_RPL));
  1120.             inqId = (DIST_INQ_ID) pktRpl.recvRplInqId;
  1121.             if (! (pInq = (DIST_MSG_Q_RECV_INQ *) distInqFind (inqId)))
  1122.                 return (MSG_Q_DIST_STATUS_LOCAL_TIMEOUT);
  1123.             /* Now copy message directly to user's buffer. */
  1124.             
  1125.             nBytes = distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr),
  1126.                                    DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_RPL),
  1127.                                    pInq->pRecvInqBuffer, pInq->recvInqMaxNBytes);
  1128.             pInq->recvInqMaxNBytes = nBytes;
  1129.             pInq->recvInqMsgArrived = TRUE;
  1130.             semGive (&pInq->recvInqWait);
  1131.             return (MSG_Q_DIST_STATUS_OK);
  1132.             }
  1133.         case DIST_PKT_TYPE_MSG_Q_NUM_MSGS_REQ:
  1134.             {
  1135.             /*
  1136.              * Remote note requests numMsgs service from local queue.
  1137.              * Find id of local message queue, and call msgQNumMsgs().
  1138.              */
  1139.             DIST_PKT_MSG_Q_NUM_MSGS_REQ    pktReq;
  1140.             DIST_PKT_MSG_Q_NUM_MSGS_RPL    pktRpl;
  1141.             DIST_INQ_ID                    inqIdSrc;    /* remote inquiry id */
  1142.             MSG_Q_ID                       lclMsgQId;
  1143.             STATUS                         status;
  1144.             if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_REQ))
  1145.                 distPanic ("msgQDistInput/NUM_MSGS_REQ: packet too shortn");
  1146.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0,
  1147.                           (char *) &pktReq,
  1148.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_REQ));
  1149.             inqIdSrc = (DIST_INQ_ID) pktReq.numMsgsReqInqId;
  1150.             lclMsgQId = msgQDistTblGet (pktReq.numMsgsReqTblIx);
  1151.             if (lclMsgQId == NULL)
  1152.                 {
  1153. #ifdef MSG_Q_DIST_REPORT
  1154.                 printf ("msgQDistInput/RECV_REQ: unknown message queue idn");
  1155. #endif
  1156.                 msgQDistSendStatus (nodeIdSrc, inqIdSrc,
  1157.                                     MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID);
  1158.                 return (MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID);
  1159.                 }
  1160.             pktRpl.numMsgsRplHdr.pktType = DIST_PKT_TYPE_MSG_Q;
  1161.             pktRpl.numMsgsRplHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_NUM_MSGS_RPL;
  1162.             pktRpl.numMsgsRplInqId = (uint32_t) inqIdSrc;
  1163.             pktRpl.numMsgsRplNum = htonl (msgQNumMsgs (lclMsgQId));
  1164.             status = distNetSend (nodeIdSrc, (DIST_PKT *) &pktRpl,
  1165.                                   DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_RPL),
  1166.                                   WAIT_FOREVER, DIST_MSG_Q_NUM_MSGS_PRIO);
  1167.             if (status == ERROR)
  1168.                 {
  1169. #ifdef DIST_DIAGNOSTIC
  1170.                 distLog ("msgQDistInput: reply to NumMsgsReq failedn");
  1171. #endif
  1172.                 return (MSG_Q_DIST_STATUS_UNREACH);
  1173.                 }
  1174.             return (MSG_Q_DIST_STATUS_OK);
  1175.             }
  1176.         case DIST_PKT_TYPE_MSG_Q_NUM_MSGS_RPL:
  1177.             {
  1178.             DIST_PKT_MSG_Q_NUM_MSGS_RPL    pktRpl;
  1179.             DIST_MSG_Q_NUM_MSGS_INQ *      pInq;
  1180.             DIST_INQ_ID                    inqId;
  1181.             if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_RPL))
  1182.                 distPanic ("msgQDistInput/NUM_MSGS_RPL: packet too shortn");
  1183.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0,
  1184.                     (char *) &pktRpl,
  1185.                     DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_RPL));
  1186.             inqId = (DIST_INQ_ID) pktRpl.numMsgsRplInqId;
  1187.             if (! (pInq = (DIST_MSG_Q_NUM_MSGS_INQ *) distInqFind (inqId)))
  1188.                 return (MSG_Q_DIST_STATUS_LOCAL_TIMEOUT);
  1189.             pInq->numMsgsInqNum = (int) ntohl (pktRpl.numMsgsRplNum);
  1190.             semGive (&(pInq->numMsgsInqWait));
  1191.             return (MSG_Q_DIST_STATUS_OK);
  1192.             }
  1193.         case DIST_PKT_TYPE_MSG_Q_STATUS:
  1194.             {
  1195.             DIST_PKT_MSG_Q_STATUS    pktStatus;
  1196.             DIST_MSG_Q_STATUS        msgQStatus;
  1197.             DIST_INQ_ID              inqId;
  1198.             DIST_INQ *               pGenInq;
  1199.             int                      errnoRemote;
  1200.             if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_STATUS))
  1201.                 distPanic ("msgQDistInput/STATUS: packet too shortn");
  1202.             /* First copy the error packet form the TBuf list. */
  1203.             
  1204.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0,
  1205.                           (char *) &pktStatus,
  1206.                            DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_STATUS));
  1207.             msgQStatus = (DIST_MSG_Q_STATUS) ntohs (pktStatus.statusDStatus);
  1208.             errnoRemote = ntohl (pktStatus.statusErrno);
  1209.             inqId = (DIST_INQ_ID) pktStatus.statusInqId;
  1210.             if (! (pGenInq = distInqFind (inqId)))
  1211.                 return (MSG_Q_DIST_STATUS_LOCAL_TIMEOUT);
  1212.             /* See who is addressed by the STATUS telegram. */
  1213.             
  1214.             switch (pGenInq->inqType)
  1215.                 {
  1216.                 case DIST_MSG_Q_INQ_TYPE_NUM_MSGS:
  1217.                     {
  1218.                     DIST_MSG_Q_NUM_MSGS_INQ    *pInq;
  1219.                     pInq = (DIST_MSG_Q_NUM_MSGS_INQ *) pGenInq;
  1220.                     /*
  1221.                      * Possible errors are:
  1222.                      *    MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID
  1223.                      */
  1224.                      
  1225.                     switch (msgQStatus)
  1226.                         {
  1227.                         case MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID:
  1228.                             errnoOfTaskSet (pInq->numMsgsInqTask,
  1229.                                             errnoRemote);
  1230.                             break;
  1231.                         default:
  1232. #ifdef MSG_Q_DIST_REPORT
  1233.                             printf ("msgQDistInput/STATUS/NUM_MSGS: status?n");
  1234. #endif
  1235.                             break;
  1236.                         }
  1237.                     semGive (&pInq->numMsgsInqWait);
  1238.                     return (MSG_Q_DIST_STATUS_OK);
  1239.                     }
  1240.                 case DIST_MSG_Q_INQ_TYPE_SEND:
  1241.                     {
  1242.                     DIST_MSG_Q_SEND_INQ * pInq;
  1243.                     pInq = (DIST_MSG_Q_SEND_INQ *) pGenInq;
  1244.                     /*
  1245.                      * Possible errors here:
  1246.                      *    MSG_Q_DIST_STATUS_OK
  1247.                      *    MSG_Q_DIST_STATUS_ERROR
  1248.                      *    MSG_Q_DIST_STATUS_UNAVAIL
  1249.                      *    MSG_Q_DIST_STATUS_TIMEOUT
  1250.                      *     MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID
  1251.                      *    MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY
  1252.                      */
  1253.                     switch (msgQStatus)
  1254.                         {
  1255.                         case MSG_Q_DIST_STATUS_OK:
  1256.                             pInq->sendInqMsgQueued = TRUE;
  1257.                             break;
  1258.                         case MSG_Q_DIST_STATUS_ERROR:
  1259.                         case MSG_Q_DIST_STATUS_UNAVAIL:
  1260.                         case MSG_Q_DIST_STATUS_TIMEOUT:
  1261.                             pInq->remoteError = TRUE;
  1262.                             errnoOfTaskSet (pInq->sendInqTask, errnoRemote);
  1263.                             break;
  1264.                         case MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID:
  1265.                             pInq->remoteError = TRUE;    
  1266.                             errnoOfTaskSet (pInq->sendInqTask,
  1267.                                             S_distLib_OBJ_ID_ERROR);
  1268.                             break;
  1269.                         case MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY:
  1270.                             pInq->remoteError = TRUE;
  1271.                             errnoOfTaskSet (pInq->sendInqTask,
  1272.                                          S_msgQDistLib_RMT_MEMORY_SHORTAGE);
  1273.                             break;
  1274.                         default:
  1275. #ifdef MSG_Q_DIST_REPORT
  1276.                             printf ("msgQDistInput/STATUS/SEND: status?n");
  1277. #endif
  1278.                             break;
  1279.                         }
  1280.                     semGive (&pInq->sendInqWait);
  1281.                     return (MSG_Q_DIST_STATUS_OK);
  1282.                     }
  1283.                 case DIST_MSG_Q_INQ_TYPE_RECV:
  1284.                     {
  1285.                     DIST_MSG_Q_RECV_INQ * pInq;
  1286.                     
  1287.                     pInq = (DIST_MSG_Q_RECV_INQ *) pGenInq;
  1288.                     /*
  1289.                      * Possible errors here:
  1290.                      *    MSG_Q_DIST_STATUS_ERROR
  1291.                      *    MSG_Q_DIST_STATUS_UNAVAIL
  1292.                      *    MSG_Q_DIST_STATUS_TIMEOUT
  1293.                      *    MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID
  1294.                      *    MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY
  1295.                      */
  1296.                     switch (msgQStatus)
  1297.                         {
  1298.                         case MSG_Q_DIST_STATUS_UNAVAIL:
  1299.                         case MSG_Q_DIST_STATUS_TIMEOUT:
  1300.                         case MSG_Q_DIST_STATUS_ERROR:
  1301.                             pInq->remoteError = TRUE;
  1302.                             errnoOfTaskSet (pInq->recvInqTask, errnoRemote);
  1303.                             break;
  1304.                         case MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID:
  1305.                             pInq->remoteError = TRUE;
  1306.                             errnoOfTaskSet (pInq->recvInqTask,
  1307.                                             S_distLib_OBJ_ID_ERROR);
  1308.                             break;
  1309.                         case MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY:
  1310.                             pInq->remoteError = TRUE;
  1311.                             errnoOfTaskSet (pInq->recvInqTask,
  1312.                                           S_msgQDistLib_RMT_MEMORY_SHORTAGE);
  1313.                             break;
  1314.                         default:
  1315. #ifdef MSG_Q_DIST_REPORT
  1316.                             printf ("msgQDistInput/STATUS/RECV_REQ: status?n");
  1317. #endif
  1318.                             break;
  1319.                         }
  1320.                     semGive (&pInq->recvInqWait);
  1321.                     return (MSG_Q_DIST_STATUS_OK);
  1322.                     }
  1323.                 default:
  1324. #ifdef MSG_Q_DIST_REPORT
  1325.                     printf ("msgQDistInput/STATUS: unexpected inquiry (%d)n",
  1326.                             pGenInq->inqType);
  1327. #endif
  1328.                     return (MSG_Q_DIST_STATUS_INTERNAL_ERROR);
  1329.                 }
  1330.             }
  1331.         default:
  1332. #ifdef MSG_Q_DIST_REPORT
  1333.             printf ("msgQDistInput: unknown message queue subtype (%d)n",
  1334.                     pPkt->pktSubType);
  1335. #endif
  1336.             return (MSG_Q_DIST_STATUS_PROTOCOL_ERROR);
  1337.         }
  1338.     }
  1339. /***************************************************************************
  1340. *
  1341. * msgQDistSendReply - send message to message queue and respond (VxFusion option)
  1342. *
  1343. * This routine is used internally to do a msgQSend().
  1344. *
  1345. * AVAILABILITY
  1346. * This routine is distributed as a component of the unbundled distributed
  1347. * message queues option, VxFusion.
  1348. *
  1349. * RETURNS: Status of message processsing.
  1350. *
  1351. * NOMANUAL
  1352. */
  1353. LOCAL DIST_MSG_Q_STATUS msgQDistSendReply
  1354.     (
  1355.     DIST_NODE_ID        nodeIdSender,   /* sending node ID */
  1356.     DIST_INQ_ID         inqIdSender,    /* sending inquiry ID */
  1357.     MSG_Q_ID            msgQId,         /* ID destination Q */
  1358.     char *              buffer,         /* start of data to send */
  1359.     UINT                nBytes,         /* number of bytes to send */
  1360.     int                 timeout,        /* msgQSend() timeout */
  1361.     int                 priority,       /* message priority */
  1362.     BOOL                lastTry         /* clean-up if this is last attempt */
  1363.     )
  1364.     {
  1365.     STATUS               status;
  1366.     DIST_MSG_Q_STATUS    msgQStatus = MSG_Q_DIST_STATUS_OK;
  1367.     status = msgQSend (msgQId, buffer, nBytes, timeout, priority);
  1368.     if (status == ERROR)
  1369.         {
  1370.         switch (errnoGet())
  1371.             {
  1372.             case S_objLib_OBJ_UNAVAILABLE:    /* timeout == NO_WAIT */
  1373.                 msgQStatus = MSG_Q_DIST_STATUS_UNAVAIL;
  1374.                 break;
  1375.             case S_objLib_OBJ_TIMEOUT:        /* timeout != NO_WAIT */
  1376.                 msgQStatus = MSG_Q_DIST_STATUS_TIMEOUT;
  1377.                 break;
  1378.             default:
  1379.                 msgQStatus = MSG_Q_DIST_STATUS_ERROR;
  1380.             }
  1381.         }
  1382.     
  1383.     if (lastTry)
  1384.         {
  1385.         free (buffer);
  1386.         msgQDistSendStatus (nodeIdSender, inqIdSender, msgQStatus);
  1387.         }
  1388.     else
  1389.         {
  1390.         if (msgQStatus != MSG_Q_DIST_STATUS_UNAVAIL)
  1391.             msgQDistSendStatus (nodeIdSender, inqIdSender, msgQStatus);
  1392.         }
  1393.     return (msgQStatus);
  1394.     }
  1395. /***************************************************************************
  1396. *
  1397. * msgQDistRecvReply - receive message from message queue and respond (VxFusion option)
  1398. *
  1399. * This routine is used internally to call msgQReceive().
  1400. *
  1401. * AVAILABILITY
  1402. * This routine is distributed as a component of the unbundled distributed
  1403. * message queues option, VxFusion.
  1404. *
  1405. * RETURNS: Status of operation.
  1406. *
  1407. * NOMANUAL
  1408. */
  1409. LOCAL DIST_MSG_Q_STATUS msgQDistRecvReply
  1410.     (
  1411.     DIST_NODE_ID    nodeIdRespond,   /* node wanting the message */
  1412.     DIST_INQ_ID     inqIdRespond,    /* inquiry ID of that node */
  1413.     MSG_Q_ID        msgQId,          /* message Q to read */
  1414.     char *          buffer,          /* start of buffer for data */
  1415.     UINT            maxNBytes,       /* maximum number of bytes to read */
  1416.     int             timeout,         /* timeout for msgQReceive() */
  1417.     BOOL            lastTry          /* clean-up, if last attempt */
  1418.     )
  1419.     {
  1420.     DIST_PKT_MSG_Q_RECV_RPL  pktRpl;
  1421.     DIST_MSG_Q_STATUS        msgQStatus;
  1422.     DIST_IOVEC               distIOVec[2];
  1423.     STATUS                   status;
  1424.     int                      nBytes;
  1425.     nBytes = msgQReceive (msgQId, buffer, maxNBytes, timeout);
  1426.     if (nBytes == ERROR)
  1427.         {
  1428.         switch (errnoGet())
  1429.             {
  1430.             case S_objLib_OBJ_UNAVAILABLE:    /* timeout == NO_WAIT */
  1431.                 msgQStatus = MSG_Q_DIST_STATUS_UNAVAIL;
  1432.                 break;
  1433.             case S_objLib_OBJ_TIMEOUT:        /* timeout != NO_WAIT */
  1434.                 msgQStatus = MSG_Q_DIST_STATUS_TIMEOUT;
  1435.                 break;
  1436.             default:
  1437.                 msgQStatus = MSG_Q_DIST_STATUS_ERROR;
  1438.             }
  1439.         if (lastTry)
  1440.             {
  1441.             free(buffer);
  1442.             msgQDistSendStatus (nodeIdRespond, inqIdRespond, msgQStatus);
  1443.             }
  1444.         else
  1445.             {
  1446.                 if (msgQStatus != MSG_Q_DIST_STATUS_UNAVAIL)
  1447.                     msgQDistSendStatus (nodeIdRespond,
  1448.                                         inqIdRespond,msgQStatus);
  1449.             }
  1450.         return (msgQStatus);
  1451.         }
  1452.     /* We have received a message, now respond to the request. */
  1453.     
  1454.     pktRpl.recvRplHdr.pktType = DIST_PKT_TYPE_MSG_Q;
  1455.     pktRpl.recvRplHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_RECV_RPL;
  1456.     pktRpl.recvRplInqId = (uint32_t) inqIdRespond;
  1457.     distIOVec[0].pIOBuffer = &pktRpl;
  1458.     distIOVec[0].IOLen = DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_RPL);
  1459.     distIOVec[1].pIOBuffer = buffer;
  1460.     distIOVec[1].IOLen = nBytes;
  1461.     status = distNetIOVSend (nodeIdRespond, &distIOVec[0], 2, WAIT_FOREVER,
  1462.             DIST_MSG_Q_RECV_PRIO);
  1463.     if (status == ERROR)
  1464.         {
  1465.         STATUS status;
  1466.         status = msgQSend (msgQId, buffer, nBytes, NO_WAIT, MSG_PRI_URGENT);
  1467.         if (status == ERROR)
  1468.             {
  1469.             /* XXX TODO this one can also fail; panic for now */
  1470.             distPanic ("msgQDistRecvReply: msgQSend failedn");
  1471.             }
  1472. #ifdef MSG_Q_DIST_REPORT
  1473.         printf ("msgQDistRecvReply: remote node is unreachablen");
  1474. #endif
  1475.         if (lastTry)
  1476.             free (buffer);
  1477.         return (MSG_Q_DIST_STATUS_UNREACH);
  1478.         }
  1479. #ifdef MSG_Q_DIST_REPORT
  1480.     printf ("msgQDistRecvReply: received a message; forwarded it to remoten");
  1481. #endif
  1482.     if (lastTry)
  1483.         free (buffer);
  1484.     return (MSG_Q_DIST_STATUS_OK);    /* packet already acknowledged */
  1485.     }
  1486. /***************************************************************************
  1487. *
  1488. * msgQDistSendStatus - send status and errno (VxFusion option)
  1489. *
  1490. * This routine sends operation status and errno inform to a remote node.
  1491. *
  1492. * AVAILABILITY
  1493. * This routine is distributed as a component of the unbundled distributed
  1494. * message queues option, VxFusion.
  1495. *
  1496. * RETURNS: N/A
  1497. *
  1498. * NOMANUAL
  1499. */
  1500. LOCAL STATUS msgQDistSendStatus
  1501.     (
  1502.     DIST_NODE_ID        nodeIdDest,   /* the node to send status to */
  1503.     DIST_INQ_ID         inqId,        /* inquiry ID of that node */
  1504.     DIST_MSG_Q_STATUS   msgQStatus    /* the status to send */
  1505.     )
  1506.     {
  1507.     DIST_PKT_MSG_Q_STATUS    pktStatus;
  1508.     STATUS                   status;
  1509.     pktStatus.statusHdr.pktType = DIST_PKT_TYPE_MSG_Q;
  1510.     pktStatus.statusHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_STATUS;
  1511.     pktStatus.statusInqId = (uint32_t) inqId;
  1512.     pktStatus.statusErrno = htonl ((uint32_t) errnoGet());
  1513.     pktStatus.statusDStatus = htons ((uint16_t) msgQStatus);
  1514.     status = distNetSend (nodeIdDest, (DIST_PKT *) &pktStatus,
  1515.             DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_STATUS), WAIT_FOREVER,
  1516.             DIST_MSG_Q_ERROR_PRIO);
  1517.     return (status);
  1518.     }