msgQDistGrpLib.c
上传用户:baixin
上传日期:2008-03-13
资源大小:4795k
文件大小:56k
开发平台:

MultiPlatform

  1. /* msgQDistGrpLib.c - distributed message queue group library (VxFusion option) */
  2. /* Copyright 1999-2002 Wind River Systems, Inc. */
  3. /*
  4. modification history
  5. --------------------
  6. 01q,23oct01,jws  fix compiler warnings (SPR 71117); fix man pages (SPR 71239)
  7. 01p,24jul01,r_s  changed code to be ANSI compatible so that it compiles with
  8.                  diab. made asm macro changes for diab
  9. 01o,09jun99,drm  Allowing check for empty groups.
  10. 01n,09jun99,drm  Adding some htonl() / ntohl()
  11. 01m,09jun99,drm  Adding code to return S_msgQDistLib_OVERALL_TIMEOUT
  12. 01l,01jun99,drm  Adding check to make sure that only distributed message
  13.                  queues (and not groups) can be added to a group.
  14. 01k,24may99,drm  added vxfusion prefix to VxFusion related includes
  15. 01j,23feb99,wlf  doc edits
  16. 01i,18feb99,wlf  doc cleanup
  17. 01h,29oct98,drm  documentation modifications
  18. 01g,09oct98,drm  fixed a bug reported by Oce
  19. 01f,13may98,ur   cleanup when msgQDistGrpInit() fails
  20.                  locking on group send inquiries
  21. 01e,08may98,ur   removed 8 bit node id restriction
  22. 01d,08apr98,ur   optional enhancement CHECK_FOR_EMPTY_GROUP--untested
  23. 01c,20mar98,ur   set errno when database is full
  24. 01b,01jul97,ur   tested, ok.
  25. 01a,11jun97,ur   written.
  26. */
  27. /*
  28. DESCRIPTION
  29. This library provides the grouping facility for distributed message queues.  
  30. Single distributed message queues can join one or more groups.  A message
  31. sent to a group is sent to all message queues that are members of that
  32. group.  A group, however, is prohibited from sending messages.  Also, it is
  33. an error to call msgQDistNumMsgs() with a distributed message queue group ID.
  34. Groups are created with symbolic names and identified by a unique ID,
  35.  MSG_Q_ID, as with normal message queues.
  36. If the group is new to the distributed system, the group agreement 
  37. protocol (GAP) is employed to determine a globally unique identifier.  
  38. As part of the protocol's negotiation, all group databases throughout 
  39. the system are updated.
  40. The distributed message queue group library is initialized by calling
  41. distInit().
  42. AVAILABILITY
  43. This module is distributed as a component of the unbundled distributed
  44. message queues option, VxFusion.
  45. INCLUDE FILES: msgQDistGrpLib.h
  46. SEE ALSO: distLib, msgQDistGrpShow
  47. */
  48. #include "vxWorks.h"
  49. #if defined (MSG_Q_DIST_GRP_REPORT) || defined (DIST_DIAGNOSTIC)
  50. #include "stdio.h"
  51. #endif
  52. #include "string.h"
  53. #include "stdlib.h"
  54. #include "taskLib.h"
  55. #include "hashLib.h"
  56. #include "sllLib.h"
  57. #include "errnoLib.h"
  58. #include "msgQLib.h"
  59. #include "netinet/in.h"
  60. #include "vxfusion/msgQDistLib.h"
  61. #include "vxfusion/msgQDistGrpLib.h"
  62. #include "vxfusion/distIfLib.h"
  63. #include "vxfusion/distStatLib.h"
  64. #include "vxfusion/private/msgQDistGrpLibP.h"
  65. #include "vxfusion/private/distNetLibP.h"
  66. #include "vxfusion/private/distNodeLibP.h"
  67. #include "vxfusion/private/distObjLibP.h"
  68. #include "vxfusion/private/distLibP.h"
  69. /* defines */
  70. #define UNUSED_ARG(x)  if(sizeof(x)) {} /* to suppress compiler warnings */
  71. #define KEY_ARG_STR        13
  72. #define KEY_CMP_ARG_STR    0       /* not used */
  73. #define KEY_ARG_ID        65537
  74. #define KEY_CMP_ARG_ID    0        /* not used */
  75. /* global data */
  76. SEMAPHORE                    distGrpDbSemaphore;
  77. DIST_MSG_Q_GRP_ID            distGrpIdNext = 0;
  78. /* local data */
  79. LOCAL HASH_ID                distGrpDbNmId = NULL;
  80. LOCAL DIST_GRP_HASH_NODE *   distGrpDbNm = NULL;
  81. LOCAL HASH_ID                distGrpDbIdId = NULL;
  82. LOCAL DIST_GRP_HASH_NODE *   distGrpDbId = NULL;
  83. LOCAL DIST_GRP_DB_NODE *     distGrpDb = NULL;
  84. LOCAL SL_LIST                msgQDistGrpFreeList;
  85. LOCAL BOOL                   msgQDistGrpLibInstalled = FALSE;
  86. /* local prototypes */
  87. LOCAL BOOL          msgQDistGrpHCmpStr (DIST_GRP_HASH_NODE *pMatchNode,
  88.                                         DIST_GRP_HASH_NODE *pHNode,
  89.                                         int keyArg);
  90. LOCAL INT32         msgQDistGrpHFuncStr (int elements,
  91.                                          DIST_GRP_HASH_NODE *pHNode,
  92.                                          int keyArg);
  93. LOCAL BOOL          msgQDistGrpHCmpId (DIST_GRP_HASH_NODE *pMatchNode,
  94.                                        DIST_GRP_HASH_NODE *pHNode,
  95.                                        int keyArg);
  96. LOCAL INT32         msgQDistGrpHFuncId (int elements,
  97.                                         DIST_GRP_HASH_NODE *pHNode,
  98.                                         int keyArg);
  99. LOCAL DIST_STATUS    msgQDistGdbInput (DIST_NODE_ID nodeIdSrc,
  100.                                        DIST_TBUF_HDR *pTBufHdr);
  101. LOCAL DIST_STATUS    msgQDistGrpInput (DIST_NODE_ID nodeIdSrc,
  102.                                        DIST_TBUF_HDR *pTBufHdr);
  103. LOCAL STATUS         msgQDistGrpLclSend (DIST_MSG_Q_GRP_SEND_INQ *pInq,
  104.                                          DIST_MSG_Q_GRP_ID distMsgQGrpId,
  105.                                          char *buffer,
  106.                                          UINT nBytes, int timeout,
  107.                                          int priority);
  108. LOCAL void           msgQDistGrpLclSendCanWait (DIST_INQ_ID inqId,
  109.                                                 MSG_Q_ID msgQId,
  110.                                                 char *buffer, UINT nBytes,
  111.                                                 int timeout, int priority);
  112. LOCAL STATUS         msgQDistGrpAgent (DIST_NODE_ID nodeIdSender,
  113.                                        DIST_INQ_ID inqIdSender,
  114.                                        DIST_MSG_Q_GRP_ID distMsgQGrpId,
  115.                                        char *buffer,
  116.                                        UINT nBytes, int timeout,
  117.                                        int priority);
  118. LOCAL STATUS         msgQDistGrpSendStatus (DIST_NODE_ID nodeIdDest,
  119.                                             DIST_INQ_ID inqId,
  120.                                             DIST_MSG_Q_STATUS msgQStatus);
  121. LOCAL BOOL           msgQDistGrpBurstOne (DIST_GRP_HASH_NODE *pNode,
  122.                                           DIST_GRP_BURST *pBurst);
  123. /***************************************************************************
  124. *
  125. * msgQDistGrpLibInit - initialize the distributed message queue group package (VxFusion option)
  126. *
  127. * This routine currently does nothing.
  128. *
  129. * AVAILABILITY
  130. * This routine is distributed as a component of the unbundled distributed
  131. * message queues option, VxFusion.
  132. *
  133. * RETURNS: N/A
  134. *
  135. * NOMANUAL
  136. */
  137. void msgQDistGrpLibInit (void)
  138.     {
  139.     }
  140. /***************************************************************************
  141. *
  142. * msgQDistGrpInit - initialize the group database (VxFusion option)
  143. *
  144. * This routine initializes the group database.
  145. *
  146. * AVAILABILITY
  147. * This routine is distributed as a component of the unbundled distributed
  148. * message queues option, VxFusion.
  149. *
  150. * RETURNS: OK, if successful; ERROR, if unsucessful.
  151. *
  152. * NOMANUAL
  153. */
  154. STATUS msgQDistGrpInit
  155.     (
  156.     int               sizeLog2   /* init 2^^sizeLog2 entries */
  157.     )
  158.     {
  159.     DIST_GRP_DB_NODE * pDbNode;
  160.     int                hashTblSizeLog2;
  161.     int                grpHNBytes;
  162.     int                grpDbNBytes;
  163.     int                grpDbSize;
  164.     int                ix;
  165.     STATUS             grpDbServAddStatus;
  166.     STATUS             msgQGrpServAddStatus;
  167.     if (sizeLog2 < 1)
  168.         return (ERROR);
  169.     if (msgQDistGrpLibInstalled == TRUE)
  170.         return (OK);
  171.     if (hashLibInit () == ERROR)
  172.         return (ERROR); /* hashLibInit() failed */
  173.     if (distInqInit (DIST_INQ_HASH_TBL_SZ_LOG2) == ERROR)
  174.         return (ERROR);
  175.     hashTblSizeLog2 = sizeLog2 - 1;
  176.     distGrpDbNmId = hashTblCreate (hashTblSizeLog2, msgQDistGrpHCmpStr,
  177.                                    msgQDistGrpHFuncStr, KEY_ARG_STR);
  178.     distGrpDbIdId = hashTblCreate (hashTblSizeLog2, msgQDistGrpHCmpId,
  179.                                    msgQDistGrpHFuncId, KEY_ARG_ID);
  180.     if (distGrpDbNmId && distGrpDbIdId)
  181.         {
  182.         grpDbSize = 1 << sizeLog2;
  183.         grpHNBytes = grpDbSize * sizeof (DIST_GRP_HASH_NODE);
  184.         grpDbNBytes = grpDbSize * sizeof (DIST_GRP_DB_NODE);
  185.         distGrpDbNm = (DIST_GRP_HASH_NODE *) malloc (grpHNBytes);
  186.         distGrpDbId = (DIST_GRP_HASH_NODE *) malloc (grpHNBytes);
  187.         distGrpDb = (DIST_GRP_DB_NODE *) malloc (grpDbNBytes);
  188.         if (distGrpDbNm && distGrpDbId && distGrpDb)
  189.             {
  190.             sllInit (&msgQDistGrpFreeList);
  191.             for (ix = 0; ix < grpDbSize; ix++)
  192.                 {
  193.                 pDbNode = &distGrpDb[ix];
  194.                 pDbNode->ixNode = ix;
  195.                 sllPutAtHead (&msgQDistGrpFreeList, (SL_NODE *) pDbNode);
  196.                 distGrpDbNm[ix].pDbNode = pDbNode;
  197.                 distGrpDbId[ix].pDbNode = pDbNode;
  198.                 }
  199.             msgQDistGrpDbLockInit();
  200.             /* Add distributed group database service to table of services. */
  201.             grpDbServAddStatus = distNetServAdd (
  202.                     DIST_PKT_TYPE_DGDB,
  203.                     msgQDistGdbInput,
  204.                     DIST_DGDB_SERV_NAME,
  205.                     DIST_DGDB_SERV_NET_PRIO,
  206.                     DIST_DGDB_SERV_TASK_PRIO,
  207.                     DIST_DGDB_SERV_TASK_STACK_SZ);
  208.             /* Add message group service to table of services. */
  209.             
  210.             msgQGrpServAddStatus =
  211.                 distNetServAdd ( DIST_PKT_TYPE_MSG_Q_GRP,
  212.                                  msgQDistGrpInput,
  213.                                  DIST_MSG_Q_GRP_SERV_NAME,
  214.                                  DIST_MSG_Q_GRP_SERV_NET_PRIO,
  215.                                  DIST_MSG_Q_GRP_SERV_TASK_PRIO,
  216.                                  DIST_MSG_Q_GRP_SERV_TASK_STACK_SZ);
  217.             if (grpDbServAddStatus != ERROR && msgQGrpServAddStatus != ERROR)
  218.                 {
  219.                 msgQDistGrpLibInstalled = TRUE;
  220.                 return (OK);
  221.                 }
  222.             }
  223.         }
  224.         /* cleanup, when error */
  225.         if (distGrpDbNmId)    hashTblDelete (distGrpDbNmId);
  226.         if (distGrpDbIdId)    hashTblDelete (distGrpDbIdId);
  227.         if (distGrpDbNm)      free (distGrpDbNm);
  228.         if (distGrpDbId)      free (distGrpDbId);
  229.         if (distGrpDb)        free (distGrpDb);
  230.         return (ERROR);
  231.     }
  232. /***************************************************************************
  233. *
  234. * msgQDistGrpAdd - add a distributed message queue to a group (VxFusion option)
  235. *
  236. * This routine adds the queue identified by the argument <msgQId> to a group
  237. * with the ASCII name specified by the argument <distGrpName>. 
  238. *
  239. * Multicasting is based on distributed message queue groups.  If the group 
  240. * does not exist, one is created.  Any number of message queues from different 
  241. * nodes can be bound to a single group. In addition, a message queue can
  242. * be added into any number of groups; msgQDistGrpAdd() must be called for each
  243. * group of which the message queue is to be a member.
  244. *
  245. * The <options> parameter is presently unused and must be set to 0.
  246. *
  247. * This routine returns a message queue ID, MSG_Q_ID, that can be used directly 
  248. * by msgQDistSend() or by the generic msgQSend() routine.  Do not call the
  249. * msgQReceive() or msgQNumMsgs() routines or their distributed counterparts,
  250. * msgQDistReceive() and msgQDistNumMsgs(), with a group message queue ID.
  251. *
  252. * As with msgQDistCreate(), use distNameAdd() to add the group message 
  253. * queue ID returned by this routine to the distributed name database so 
  254. * that the ID can be used by tasks on other nodes.
  255. *
  256. * AVAILABILITY
  257. * This routine is distributed as a component of the unbundled distributed
  258. * message queues option, VxFusion.
  259. *
  260. * RETURNS: MSG_Q_ID, or NULL if there is an error.
  261. *
  262. * ERRNO:
  263. * is
  264. * i S_msgQDistGrpLib_NAME_TOO_LONG
  265. * The name of the group is too long.
  266. * i S_msgQDistGrpLib_INVALID_OPTION
  267. * The <options> parameter is invalid.
  268. * i S_msgQDistGrpLib_DATABASE_FULL
  269. * The group database is full.
  270. * i S_distLib_OBJ_ID_ERROR
  271. * The <msgQId> parameter is not a distributed message queue.
  272. * ie
  273. *
  274. * SEE ALSO: msgQLib, msgQDistLib, distNameLib
  275. * INTERNAL NOTE: Takes <distGrpDbSemaphore>.
  276. */
  277. MSG_Q_ID msgQDistGrpAdd
  278.     (
  279.     char *            distGrpName,  /* new or existing group name           */
  280.     MSG_Q_ID          msgQId,       /* message queue to add to the group    */
  281.     DIST_GRP_OPT      options       /* group message queue options - UNUSED */
  282.     )
  283.     {
  284.     DIST_GRP_DB_NODE * pDistGrpDbNode;
  285.     DIST_MSG_Q_ID      dMsgQId;      /* distributed message queue ID */
  286.     DIST_OBJ_NODE * pObjNode;     /* ptr to object containing real ID */ 
  287.     DIST_GAP_NODE   dGapNode;
  288.     DIST_GAP_NODE * pDGapNodeTemp;
  289.     /*
  290.      * Check the parameters:  
  291.      * - msgQId must be a distributed message queue. It cannot be a
  292.      *   a standard message queue or a group.
  293.      * - the group name cannot exceed DIST_NAME_MAX_LENGTH
  294.      * - options must be 0
  295.      *
  296.      * If any of the parameters are invalid, set errno and return
  297.      * NULL to indicate failure.
  298.      */
  299.     if (!ID_IS_DISTRIBUTED (msgQId)) /* not a distributed message queue */
  300.         {
  301.         errnoSet (S_distLib_OBJ_ID_ERROR);
  302.         return NULL;
  303.         }
  304.     else                             /* is distributed msgQ */
  305.         {
  306.         pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId);
  307.         if (!IS_DIST_MSG_Q_OBJ (pObjNode))
  308.             {
  309.             /* legal object ID, but not a message queue */
  310.             errnoSet (S_distLib_OBJ_ID_ERROR);
  311.             return NULL;    
  312.             }
  313.        
  314.         dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;
  315.         if (IS_DIST_MSG_Q_TYPE_GRP(dMsgQId))
  316.             {
  317.             /* ID refers to a group, not a plain dist. message queue */
  318.             errnoSet (S_distLib_OBJ_ID_ERROR);
  319.             return NULL;
  320.             }
  321.         }
  322.     if (strlen (distGrpName) > DIST_NAME_MAX_LENGTH)
  323.         {
  324.         errnoSet (S_msgQDistGrpLib_NAME_TOO_LONG);
  325.         return (NULL);    /* name too long */
  326.         }
  327.     if (options != 0)
  328.         {
  329.         errnoSet (S_msgQDistGrpLib_INVALID_OPTION);
  330.         return (NULL);    /* options parameter currently unused */
  331.         }
  332.     /* Lock the database and try to find the group. */
  333.     msgQDistGrpDbLock();
  334.     pDistGrpDbNode = msgQDistGrpLclFindByName (distGrpName);
  335.     if (pDistGrpDbNode == NULL)
  336.         {
  337. #ifdef MSG_Q_DIST_GRP_REPORT
  338.         DIST_MSG_Q_GRP_ID    grpId;
  339. #endif
  340.         /* Group is unknown by now, create it. */
  341. #ifdef MSG_Q_DIST_GRP_REPORT
  342.         printf ("msgQDistGrpAdd: group is unknown by now--create itn");
  343. #endif
  344.         pDistGrpDbNode = msgQDistGrpLclCreate (distGrpName, distGrpIdNext++,
  345.                                                DIST_GRP_STATE_LOCAL_TRY);
  346.         msgQDistGrpDbUnlock();
  347.         if (pDistGrpDbNode == NULL)
  348.             {
  349. #ifdef DIST_DIAGNOSTIC
  350.             distLog ("msgQDistGrpAdd: failed to create new groupn");
  351. #endif
  352.             errnoSet (S_msgQDistGrpLib_DATABASE_FULL);
  353.             return (NULL);
  354.             }
  355.         /*
  356.          * Agree on a global unique identifier for the group and
  357.          * add the fist group member.
  358.          */
  359. #ifdef MSG_Q_DIST_GRP_REPORT
  360.         printf ("msgQDistGrpAdd: enter agreement phase, propose 0x%lxn",
  361.                 (u_long) msgQDistGrpLclGetId (pDistGrpDbNode));
  362.         grpId =
  363. #endif
  364.             msgQDistGrpAgree (pDistGrpDbNode);
  365. #ifdef MSG_Q_DIST_GRP_REPORT
  366.         printf ("msgQDistGrpAdd: agreed on 0x%lxn", (u_long) grpId);
  367. #endif
  368.         msgQDistGrpLclAddMember (pDistGrpDbNode, msgQId);
  369.         }
  370.     else
  371.         {
  372.         /* Group already exists. Check the state. */
  373. #ifdef MSG_Q_DIST_GRP_REPORT
  374.         printf ("msgQDistGrpAdd: group already exists--add objectn");
  375. #endif
  376.         while (pDistGrpDbNode->grpDbState < DIST_GRP_STATE_GLOBAL)
  377.             {
  378.                 /*
  379.                  * Group already exists, but has a local state. This means,
  380.                  * somebody else tries to install the group.
  381.                  * We unlock the group database and wait for a go-ahead.
  382.                  */
  383.                 if (! pDistGrpDbNode->pGrpDbGapNode)
  384.                     {
  385.                     /* GAP node is not initialized by now */
  386.                     distGapNodeInit (&dGapNode, pDistGrpDbNode, FALSE);
  387.                     pDistGrpDbNode->pGrpDbGapNode = &dGapNode;
  388.                     }
  389.                  msgQDistGrpDbUnlock();
  390.     
  391.                  semTake (&dGapNode.gapWaitFor, WAIT_FOREVER);
  392.                  msgQDistGrpDbLock();
  393.             }
  394.         if ((pDGapNodeTemp = pDistGrpDbNode->pGrpDbGapNode) != NULL)
  395.             {
  396.             pDistGrpDbNode->pGrpDbGapNode = NULL;
  397.             distGapNodeDelete (pDGapNodeTemp);
  398.             }
  399.         msgQDistGrpDbUnlock();
  400.         /*
  401.          * Group exists and has a global state (at least now).
  402.          * Add member to group.
  403.          */
  404. #ifdef MSG_Q_DIST_GRP_REPORT
  405.         printf ("msgQDistGrpAdd: add membern");
  406. #endif
  407.         msgQDistGrpLclAddMember (pDistGrpDbNode, msgQId);
  408.         }
  409.     return (pDistGrpDbNode->grpDbMsgQId);   /* retrun MSG_Q_ID */
  410.     }
  411. /***************************************************************************
  412. *
  413. * msgQDistGrpDelete - delete a distributed message queue from a group (VxFusion option)
  414. *
  415. * This routine deletes a distributed message queue from a group. 
  416. *
  417. * NOTE: For this release, it is not possible to remove a distributed message
  418. * queue from a group.
  419. *
  420. * AVAILABILITY
  421. * This routine is distributed as a component of the unbundled distributed
  422. * message queues option, VxFusion.
  423. *
  424. * RETURNS: ERROR, always.
  425. *
  426. * ERRNO: S_distLib_NO_OBJECT_DESTROY
  427. *
  428. * INTERNAL NOTE: Takes <distGrpDbSemaphore>.
  429. */
  430. STATUS msgQDistGrpDelete
  431.     (
  432.     char *   distGrpName,   /* group containing the queue to be deleted */
  433.     MSG_Q_ID msgQId         /* ID of the message queue to delete */
  434.     )
  435.     {
  436. #if 0
  437.     DIST_MSG_Q_ID  distMsgQId;
  438.     DIST_OBJ_ID    objId;
  439. #else
  440.         UNUSED_ARG(distGrpName);
  441.         UNUSED_ARG(msgQId);
  442.         
  443.     errnoSet (S_distLib_NO_OBJECT_DESTROY);
  444.     return (ERROR);    /* BY NOW */
  445. #endif
  446.     /* the rest of this function is not compiled */
  447. #if 0
  448.     if (ID_IS_DISTRIBUTED (msgQId))
  449.         {
  450.         objId = ((DIST_OBJ_NODE *) msgQId)->objNodeId;
  451.         distMsgQId = DIST_OBJ_ID_TO_DIST_MSG_Q_ID (objId);
  452.         }
  453.     if (! (ID_IS_DISTRIBUTED (msgQId)) || IS_DIST_MSG_Q_LOCAL (distMsgQId))
  454.         {
  455.         /* msgQ is local */
  456.         SL_NODE                *pNode;
  457.         DIST_GRP_DB_NODE    *pDistGrpDbNode;
  458.         DIST_GRP_MSG_Q_NODE *pDistGrpMsgQNode;
  459.         DIST_GRP_MSG_Q_NODE    *pPrevNode = NULL;
  460.         msgQDistGrpDbLock();
  461.         pDistGrpDbNode = msgQDistGrpLclFindByName (distGrpName);
  462.         msgQDistGrpDbUnlock();
  463.         if (pDistGrpDbNode == NULL)
  464.             return (ERROR);
  465.         for (pNode = SLL_FIRST ((SL_LIST *) &pDistGrpDbNode->grpDbMsgQIdLst);
  466.              pNode != NULL;
  467.              pNode = SLL_NEXT (pNode))
  468.             {
  469.             pDistGrpMsgQNode = (DIST_GRP_MSG_Q_NODE *) pNode;
  470.             if (pDistGrpMsgQNode->msgQId == msgQId)
  471.                 {
  472.                 sllRemove ((SL_LIST *) &pDistGrpDbNode->grpDbMsgQIdLst,
  473.                         (SL_NODE *) pDistGrpMsgQNode, (SL_NODE *) pPrevNode);
  474.                 break;
  475.                 }
  476.                 pPrevNode = pDistGrpMsgQNode;
  477.             }
  478.         }
  479.     else
  480.         {
  481.         /* msgQ is remote */
  482.         return (ERROR);
  483.         }
  484.     return (OK);
  485. #endif
  486.     }
  487. /***************************************************************************
  488. *
  489. * msgQDistGrpAgree - agree on a group identifier (VxFusion option)
  490. *
  491. * This routine determines the DIST_MSG_Q_GRP_ID associated with a
  492. * DIST_GRP_DB_NODE .
  493. *
  494. * AVAILABILITY
  495. * This routine is distributed as a component of the unbundled distributed
  496. * message queues option, VxFusion.
  497. *
  498. * RETURNS: A DIST_MSG_Q_GRP_ID .
  499. *
  500. * NOMANUAL
  501. */
  502. DIST_MSG_Q_GRP_ID msgQDistGrpAgree
  503.     (
  504.     DIST_GRP_DB_NODE *pDistGrpDbNode  /* ptr to database node */
  505.     )
  506.     {
  507.     DIST_MSG_Q_GRP_ID    grpId;
  508.     /* Start GAP. */
  509.     grpId = distGapStart (pDistGrpDbNode);
  510.     return (grpId);
  511.     }
  512. /***************************************************************************
  513. *
  514. * msgQDistGrpLclSend - send to all group members registrated locally (VxFusion option)
  515. *
  516. * This routine sends a message to locally registered members of a group.
  517. *
  518. * AVAILABILITY
  519. * This routine is distributed as a component of the unbundled distributed
  520. * message queues option, VxFusion.
  521. *
  522. * RETURNS: OK, if successful; ERROR, if not.
  523. *
  524. * NOMANUAL
  525. */
  526. LOCAL STATUS msgQDistGrpLclSend
  527.     (
  528.     DIST_MSG_Q_GRP_SEND_INQ * pInq,          /* pointer to inquiry node */
  529.     DIST_MSG_Q_GRP_ID         distMsgQGrpId, /* group on which to send  */
  530.     char *                    buffer,        /* message to send         */
  531.     UINT                      nBytes,        /* length of message       */
  532.     int                       timeout,       /* ticks to wait           */
  533.     int                       priority       /* priority                */
  534.     )
  535.     {
  536.     DIST_GRP_DB_NODE * pDistGrpDbNode;
  537.     SL_NODE * pNode;
  538.     DIST_GRP_MSG_Q_NODE * pDistGrpMsgQNode;
  539.     STATUS                status;
  540.     int                   tid;
  541.     msgQDistGrpDbLock();
  542.     pDistGrpDbNode = msgQDistGrpLclFindById (distMsgQGrpId);
  543.     msgQDistGrpDbUnlock();
  544.     if (pDistGrpDbNode == NULL)
  545.         {
  546. #ifdef MSG_Q_DIST_GRP_REPORT
  547.         printf ("msgQDistGrpLclSend: group 0x%lx is unknownn",
  548.                 (u_long) distMsgQGrpId);
  549. #endif
  550.         pInq->sendInqStatus = MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID;
  551.         return (ERROR);                    /* illegal object id */
  552.         }
  553.     /*
  554.      * For all members of this group; send'em the message.
  555.      * Note: a member does not need to be a local message queue.
  556.      */
  557.     for (pNode = SLL_FIRST ((SL_LIST *) &pDistGrpDbNode->grpDbMsgQIdLst);
  558.             pNode != NULL;
  559.             pNode = SLL_NEXT (pNode))
  560.         {
  561.         pDistGrpMsgQNode = (DIST_GRP_MSG_Q_NODE *) pNode;
  562. #ifdef MSG_Q_DIST_GRP_REPORT
  563.         printf ("msgQDistGrpLclSend: send message to msgQId %pn",
  564.                 pDistGrpMsgQNode->msgQId);
  565. #endif
  566.         /*
  567.          * First try to send with NO_WAIT. If this fails due to a full
  568.          * message queue and timeout is not NO_WAIT, spawn a task
  569.          * and call msgQSend with the user specified timeout.
  570.          */
  571.         status = msgQDistSend (pDistGrpMsgQNode->msgQId, buffer, nBytes,
  572.                                NO_WAIT, WAIT_FOREVER, priority);
  573.         if (status == ERROR)
  574.             {
  575. #ifdef MSG_Q_DIST_GRP_REPORT
  576.             printf ("msgQDistGrpLclSend: msgQDistSend() returned errorn");
  577. #endif
  578.             if (timeout != NO_WAIT && errno == S_objLib_OBJ_UNAVAILABLE)
  579.                 {
  580.                 msgQGrpSendInqLock(pInq);    /* lock inquiry node */
  581.                 /* timeout is not NO_WAIT and msgQ is full */
  582.                 pInq->sendInqNumBlocked++;
  583.                 msgQGrpSendInqUnlock(pInq);    /* unlock inquiry node */
  584.     
  585.                 /* spawn a task that can wait */
  586.                 tid = taskSpawn (NULL,
  587.                                  DIST_MSG_Q_GRP_WAIT_TASK_PRIO,
  588.                                  0,
  589.                                  DIST_MSG_Q_GRP_WAIT_TASK_STACK_SZ,
  590.                                  (FUNCPTR) msgQDistGrpLclSendCanWait,
  591.                                  (int) distInqGetId ((DIST_INQ *) pInq),
  592.                                  (int) pDistGrpMsgQNode->msgQId,
  593.                                  (int) buffer,
  594.                                  (int) nBytes,
  595.                                  (int) timeout,
  596.                                  (int) priority,
  597.                                  0, 0, 0, 0);
  598.                 if (tid == ERROR)
  599.                     pInq->sendInqStatus = MSG_Q_DIST_GRP_STATUS_ERROR;
  600.                 }
  601.             else
  602.                 {
  603.                 pInq->sendInqStatus = MSG_Q_DIST_GRP_STATUS_ERROR;
  604.                 }
  605.             }
  606.         }
  607.     if (pInq->sendInqNumBlocked == 0)
  608.         {
  609.         /* If we leave nobody blocked, decrease the outstanding counter. */
  610.         msgQGrpSendInqLock(pInq);    /* lock inquiry node */
  611.         if (--pInq->sendInqNumOutstanding == 0)
  612.             {
  613.             msgQGrpSendInqUnlock(pInq);    /* unlock inquiry node */
  614.             semGive (&pInq->sendInqWait);
  615.             }
  616.         else
  617.             msgQGrpSendInqUnlock(pInq);    /* unlock inquiry node */
  618.         }
  619.     return (OK);
  620.     }
  621. /***************************************************************************
  622. *
  623. * msgQDistGrpLclSendCanWait - send buffer to local members of a group (VxFusion option)
  624. *
  625. * This routine sends a message to local group members.
  626. *
  627. * AVAILABILITY
  628. * This routine is distributed as a component of the unbundled distributed
  629. * message queues option, VxFusion.
  630. *
  631. * RETURNS: N/A
  632. *
  633. * NOMANUAL
  634. */
  635. LOCAL void msgQDistGrpLclSendCanWait
  636.     (
  637.     DIST_INQ_ID    inqId,         /* inquiry id             */
  638.     MSG_Q_ID       msgQId,        /* queue on which to send */
  639.     char *         buffer,        /* message to send        */
  640.     UINT           nBytes,        /* length of message      */
  641.     int            timeout,       /* ticks to wait          */
  642.     int            priority       /* priority               */
  643.     )
  644.     {
  645.     DIST_MSG_Q_GRP_SEND_INQ * pInq;
  646.     STATUS                    status;
  647.     status = msgQDistSend (msgQId, buffer, nBytes, timeout,
  648.             WAIT_FOREVER, priority);
  649.     if (status == OK)
  650.         {
  651.         if ((pInq = (DIST_MSG_Q_GRP_SEND_INQ *) distInqFind (inqId)) == NULL)
  652.             return;
  653.         msgQGrpSendInqLock(pInq);            /* lock inquiry node */
  654.         if (--pInq->sendInqNumBlocked == 0)
  655.             if (--pInq->sendInqNumOutstanding == 0)
  656.                 {
  657.                 msgQGrpSendInqUnlock(pInq);    /* unlock inquiry node */
  658.                 semGive (&pInq->sendInqWait);
  659.                 return;
  660.                 }
  661.         msgQGrpSendInqUnlock(pInq);        /* unlock inquiry node */
  662.         }
  663.     }
  664. /***************************************************************************
  665. *
  666. * msgQDistGrpSend - send buffer to local and remote members of a group (VxFusion option)
  667. *
  668. * This routine sends a message to local and remote members of a group.
  669. *
  670. * AVAILABILITY
  671. * This routine is distributed as a component of the unbundled distributed
  672. * message queues option, VxFusion.
  673. *
  674. * RETURNS: OK, if successful; ERROR, is unsuccessful.
  675. *
  676. * NOMANUAL
  677. */
  678. STATUS msgQDistGrpSend
  679.     (
  680.     DIST_MSG_Q_GRP_ID   distMsgQGrpId,  /* group on which to send          */
  681.     char *              buffer,         /* message to send                 */
  682.     UINT                nBytes,         /* length of message               */
  683.     int                 msgQTimeout,    /* ticks to wait at message queues */
  684.     int                 overallTimeout, /* ticks to wait overall           */
  685.     int                 priority        /* priority                        */
  686.     )
  687.     {
  688.     DIST_PKT_MSG_Q_GRP_SEND    pktSend;
  689.     DIST_MSG_Q_GRP_SEND_INQ    inquiryNode;
  690.     DIST_INQ_ID                inquiryId;
  691.     DIST_IOVEC                 distIOVec[2];
  692.     STATUS                     status;
  693.     inquiryNode.sendInq.inqType = DIST_MSG_Q_GRP_INQ_TYPE_SEND;
  694.     msgQGrpSendInqLockInit (&inquiryNode);
  695.     semBInit (&(inquiryNode.sendInqWait), SEM_Q_FIFO, SEM_EMPTY);
  696.     inquiryNode.sendInqTask = taskIdSelf();
  697.     inquiryNode.sendInqNumBlocked = 0;
  698.     inquiryNode.sendInqNumOutstanding =
  699.             distNodeGetNumNodes(DIST_NODE_NUM_NODES_ALIVE);
  700.     inquiryNode.sendInqStatus = MSG_Q_DIST_GRP_STATUS_OK;
  701.     inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode);
  702.     status = msgQDistGrpLclSend (&inquiryNode, distMsgQGrpId, buffer, nBytes,
  703.                                  msgQTimeout, priority);
  704.     if (status == ERROR)
  705.         {
  706. #ifdef MSG_Q_DIST_GRP_REPORT
  707.         printf ("msgQDistGrpSend: group is unknownn");
  708. #endif
  709.         distInqCancel ((DIST_INQ *) &inquiryNode);
  710.         return (ERROR);
  711.         }
  712. #ifdef MSG_Q_DIST_GRP_REPORT
  713.     if (inquiryNode.sendInqNumBlocked)
  714.         printf ("msgQDistGrpSend: %d local agents blocked on queuesn",
  715.                 inquiryNode.sendInqNumBlocked);
  716. #endif
  717.     /*
  718.      * Broadcast message to the net. Every message addressed to a group
  719.      * is broadcasted to the net.
  720.      */
  721.     pktSend.pktMsgQGrpSendHdr.pktType = DIST_PKT_TYPE_MSG_Q_GRP;
  722.     pktSend.pktMsgQGrpSendHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_GRP_SEND;
  723.     pktSend.pktMsgQGrpSendInqId = htonl((uint32_t) inquiryId);
  724.     pktSend.pktMsgQGrpSendTimeout =
  725.             htonl ((uint32_t) DIST_TICKS_TO_MSEC (msgQTimeout));
  726.     pktSend.pktMsgQGrpSendId = htons (distMsgQGrpId);
  727.     /* use IOV stuff here, since we do not want to copy data */
  728.     distIOVec[0].pIOBuffer = &pktSend;
  729.     distIOVec[0].IOLen = DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND);
  730.     distIOVec[1].pIOBuffer = buffer;
  731.     distIOVec[1].IOLen = nBytes;
  732. #ifdef MSG_Q_DIST_GRP_REPORT
  733.     printf ("msgQDistGrpSend: broadcast messagen");
  734. #endif
  735.     status = distNetIOVSend (DIST_IF_BROADCAST_ADDR, &distIOVec[0], 2,
  736.             WAIT_FOREVER, DIST_MSG_Q_PRIO_TO_NET_PRIO (priority));
  737.     if (status == ERROR)
  738.         {
  739. #ifdef MSG_Q_DIST_GRP_REPORT
  740.         printf ("msgQDistGrpSend: broadcast failedn");
  741. #endif
  742.         distInqCancel ((DIST_INQ *) &inquiryNode);
  743.         errnoSet (S_distLib_UNREACHABLE);
  744.         return (ERROR);
  745.         }
  746.     /*
  747.      * semTake() blocks the requesting task until the service
  748.      * task gives the semaphore, because the request has
  749.      * been processed.
  750.      */
  751.     semTake (&(inquiryNode.sendInqWait), overallTimeout);
  752.     distInqCancel ((DIST_INQ *) &inquiryNode);
  753.     if (inquiryNode.sendInqNumOutstanding > 0)
  754.         {
  755.         /* overall timeout */
  756.         errnoSet (S_msgQDistLib_OVERALL_TIMEOUT);
  757.         return (ERROR);
  758.         }
  759.  
  760.     if (inquiryNode.sendInqStatus != MSG_Q_DIST_GRP_STATUS_OK)
  761.         return (ERROR);
  762.     return (OK);
  763.     }
  764. /***************************************************************************
  765. *
  766. * msgQDistGrpAgent - send an inquiry to a group (VxFusion option)
  767. *
  768. * This routine sends an inquiry message to a message group.
  769. *
  770. * AVAILABILITY
  771. * This routine is distributed as a component of the unbundled distributed
  772. * message queues option, VxFusion.
  773. *
  774. * RETURNS: OK, if successfully sent; ERROR, if not sent.
  775. *
  776. * NOMANUAL
  777. */
  778. LOCAL STATUS msgQDistGrpAgent
  779.     (
  780.     DIST_NODE_ID            nodeIdSender,   /* node id of the sender  */
  781.     DIST_INQ_ID             inqIdSender,    /* inquiry id at sender   */
  782.     DIST_MSG_Q_GRP_ID       distMsgQGrpId,  /* group on which to send */
  783.     char *                  buffer,         /* message to send        */
  784.     UINT                    nBytes,         /* length of message      */
  785.     int                     timeout,        /* ticks to wait          */
  786.     int                     priority        /* priority               */
  787.     )
  788.     {
  789.     DIST_MSG_Q_GRP_SEND_INQ    inquiryNode;
  790.     /*
  791.      * Our caller cannot wait, so we have to create
  792.      * the inquiry id.
  793.      */
  794.     inquiryNode.sendInq.inqType = DIST_MSG_Q_GRP_INQ_TYPE_SEND;
  795.     semBInit (&inquiryNode.sendInqWait, SEM_Q_FIFO, SEM_EMPTY);
  796.     /* inquiryNode.sendInqTask = taskIdSelf(); */
  797.     inquiryNode.sendInqNumBlocked = 0;
  798.     inquiryNode.sendInqNumOutstanding = 1;
  799.     inquiryNode.sendInqStatus = MSG_Q_DIST_GRP_STATUS_OK;
  800.     distInqRegister ((DIST_INQ *) &inquiryNode);
  801.     if (msgQDistGrpLclSend (&inquiryNode, distMsgQGrpId, buffer, nBytes,
  802.             timeout, priority) == ERROR)
  803.         {
  804.         free (buffer);
  805.         msgQDistGrpSendStatus (nodeIdSender, inqIdSender,
  806.                 MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID);
  807.         return (ERROR);
  808.         }
  809.     semTake (&inquiryNode.sendInqWait, WAIT_FOREVER);
  810.     
  811.     distInqCancel ((DIST_INQ *) &inquiryNode);
  812.     
  813.     free (buffer);
  814.     if (inquiryNode.sendInqStatus != MSG_Q_DIST_GRP_STATUS_OK)
  815.         {
  816. #ifdef MSG_Q_DIST_GRP_REPORT
  817.         printf ("msgQDistGrpAgent: something failedn");
  818. #endif
  819.         msgQDistGrpSendStatus (nodeIdSender, inqIdSender,
  820.                 (INT16) inquiryNode.sendInqStatus);
  821.         return (ERROR);
  822.         }
  823. #ifdef MSG_Q_DIST_GRP_REPORT
  824.     printf ("msgQDistGrpAgent: respond with OKn");
  825. #endif
  826.     msgQDistGrpSendStatus (nodeIdSender, inqIdSender,
  827.             MSG_Q_DIST_GRP_STATUS_OK);
  828.     return (OK);
  829.     }
  830. /***************************************************************************
  831. *
  832. * msgQDistGrpSendStatus - send status and errno (VxFusion option)
  833. *
  834. * This routine sends local status to an inquirying node.
  835. *
  836. * AVAILABILITY
  837. * This routine is distributed as a component of the unbundled distributed
  838. * message queues option, VxFusion.
  839. *
  840. * RETURNS: OK, if status sent; ERROR, if not sent.
  841. *
  842. * NOMANUAL
  843. */
  844. LOCAL STATUS msgQDistGrpSendStatus
  845.     (
  846.     DIST_NODE_ID        nodeIdDest,   /* node ID of destination */
  847.     DIST_INQ_ID         inqId,        /* the inquiry ID */
  848.     DIST_MSG_Q_STATUS   dStatus       /* status to send */
  849.     )
  850.     {
  851.     DIST_PKT_MSG_Q_GRP_STATUS    pktStatus;
  852.     STATUS                        status;
  853.     pktStatus.pktMsgQGrpStatusHdr.pktType = DIST_PKT_TYPE_MSG_Q_GRP;
  854.     pktStatus.pktMsgQGrpStatusHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_GRP_STATUS;
  855.     pktStatus.pktMsgQGrpStatusInqId = htonl ((uint32_t) inqId);
  856.     pktStatus.pktMsgQGrpStatusErrno = htonl ((uint32_t) errnoGet());
  857.     pktStatus.pktMsgQGrpStatusDStatus = htons ((uint16_t) dStatus);
  858.     status = distNetSend (nodeIdDest, (DIST_PKT *) &pktStatus,
  859.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_STATUS),
  860.                           WAIT_FOREVER, DIST_MSG_Q_GRP_PRIO);
  861.     return (status);
  862.     }
  863. /***************************************************************************
  864. *
  865. * msgQDistGdbInput - called for distributed group database updates (VxFusion option)
  866. *
  867. * This routine updates the local group database.
  868. *
  869. * AVAILABILITY
  870. * This routine is distributed as a component of the unbundled distributed
  871. * message queues option, VxFusion.
  872. *
  873. * RETURNS: Status of update.
  874. *
  875. * NOMANUAL
  876. */
  877. LOCAL DIST_STATUS msgQDistGdbInput
  878.     (
  879.     DIST_NODE_ID    nodeIdSrc,   /* unused */
  880.     DIST_TBUF_HDR * pTBufHdr     /* ptr to received TBUF header */
  881.     )
  882.     {
  883.     DIST_PKT * pPkt;
  884.     int        pktLen;
  885.     DIST_PKT_DGDB_ADD    pktAdd;
  886.     DIST_MSG_Q_GRP_ID    grpId;
  887.     DIST_GRP_DB_NODE *   pDbNodeNew;
  888.     DIST_NODE_ID         grpCreator;
  889.     char                 grpName[DIST_NAME_MAX_LENGTH + 1];
  890.     int                  sz;
  891.     
  892.     UNUSED_ARG(nodeIdSrc);
  893.     
  894.     pktLen = pTBufHdr->tBufHdrOverall;
  895.     if (pktLen < sizeof (DIST_PKT))
  896.         distPanic ("msgQDistGdbInput: packet too shortn");
  897.     pPkt = (DIST_PKT *) (DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData;
  898.     switch (pPkt->pktSubType)
  899.         {
  900.         case DIST_PKT_TYPE_DGDB_ADD:
  901.             {
  902.             if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD))
  903.                 distPanic ("msgQDistGdbInput: packet too shortn");
  904.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktAdd,
  905.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD));
  906.             
  907.             if ((sz = pktLen - DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD)) >
  908.                     DIST_NAME_MAX_LENGTH + 1)
  909.                 distPanic ("msgQDistGdbInput: name too longn");
  910.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr),
  911.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD),
  912.                           (char *) &grpName, sz);
  913.             grpId = ntohs (pktAdd.pktDgdbAddId);
  914.             grpCreator = ntohl (pktAdd.pktDgdbAddCreator);
  915.             if (grpId >= distGrpIdNext)
  916.                 distGrpIdNext = grpId + 1;
  917. #ifdef MSG_Q_DIST_GRP_REPORT
  918.             printf ("msgQDistGrpInput: group database add from node %ldn",
  919.                     nodeIdSrc);
  920.             printf ("msgQDistGrpInput: bind `%s' to group id %dn",
  921.                     (char *) &grpName, grpId);
  922. #endif
  923.             msgQDistGrpDbLock();
  924.             pDbNodeNew = msgQDistGrpLclCreate ((char *) &grpName, grpId,
  925.                                                DIST_GRP_STATE_GLOBAL);
  926.             if (pDbNodeNew == NULL)
  927.                 {
  928.                 msgQDistGrpDbUnlock();
  929.                 distPanic ("msgQDistGdbInput: group creation failedn");
  930.                 }
  931.             msgQDistGrpLclSetCreator (pDbNodeNew, grpCreator);
  932.             msgQDistGrpDbUnlock();
  933.             return (DIST_GDB_STATUS_OK);
  934.             }
  935.         default:
  936.             return (DIST_GDB_STATUS_PROTOCOL_ERROR);
  937.         }
  938.     }
  939. /***************************************************************************
  940. *
  941. * msgQDistGrpInput - called when a new group message arrives at the system (VxFusion option)
  942. *
  943. * This routine is called whenever a group message is received.
  944. *
  945. * AVAILABILITY
  946. * This routine is distributed as a component of the unbundled distributed
  947. * message queues option, VxFusion.
  948. *
  949. * RETURNS: Status of message processing.
  950. *
  951. * NOMANUAL
  952. */
  953. LOCAL DIST_STATUS msgQDistGrpInput
  954.     (
  955.     DIST_NODE_ID    nodeIdSrc,        /* source node ID */
  956.     DIST_TBUF_HDR * pTBufHdr          /* ptr to message */
  957.     )
  958.     {
  959.     DIST_PKT * pPkt;
  960.     int        pktLen;
  961.     DIST_PKT_MSG_Q_GRP_SEND  pktSend;
  962.     DIST_MSG_Q_GRP_ID        dMsgQGrpId;
  963.     DIST_GRP_DB_NODE *       pDistGrpDbNode;
  964.     DIST_INQ_ID              inqIdSrc;
  965.     uint32_t                 timeout_msec;
  966.     UINT                     nBytes;
  967.     char *                   buffer;
  968.     int                      tid;
  969.     int                      timeout;
  970.     int                      prio;
  971.     DIST_PKT_MSG_Q_GRP_STATUS    pktStatus;
  972.     DIST_STATUS                  dStatus;
  973.     DIST_INQ_ID                  inqId;
  974.     DIST_INQ *                   pGenInq;
  975.     int                          errnoRemote;
  976.     DIST_MSG_Q_GRP_SEND_INQ *    pInq;
  977.     pktLen = pTBufHdr->tBufHdrOverall;
  978.     if (pktLen < sizeof (DIST_PKT))
  979.         distPanic ("msgQDistGrpInput: packet too shortn");
  980.     pPkt = (DIST_PKT *) ((DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData);
  981.     switch (pPkt->pktSubType)
  982.         {
  983.         case DIST_PKT_TYPE_MSG_Q_GRP_SEND:
  984.             {
  985.             if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND))
  986.                  distPanic ("msgQDistGrpInput/SEND: packet too shortn");
  987.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *)&pktSend,
  988.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND));
  989.             inqIdSrc = (DIST_INQ_ID) 
  990.                        htonl(pktSend.pktMsgQGrpSendInqId);
  991.             timeout_msec = ntohl (pktSend.pktMsgQGrpSendTimeout);
  992.             timeout = DIST_MSEC_TO_TICKS (timeout_msec);
  993.             nBytes = pktLen - DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND);
  994.             dMsgQGrpId = ntohs (pktSend.pktMsgQGrpSendId);
  995.             /* Check for an empty group */
  996.             msgQDistGrpDbLock();
  997.             pDistGrpDbNode = msgQDistGrpLclFindById (dMsgQGrpId);
  998.             msgQDistGrpDbUnlock();
  999.             if (pDistGrpDbNode == NULL)
  1000.                 {
  1001.                 msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc,
  1002.                                        MSG_Q_DIST_GRP_STATUS_UNAVAIL);
  1003.                 return (MSG_Q_DIST_GRP_STATUS_UNAVAIL);
  1004.                 }
  1005.             if (SLL_FIRST ((SL_LIST *)&pDistGrpDbNode->grpDbMsgQIdLst)
  1006.                         == NULL)
  1007.                 {
  1008.                 msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc,
  1009.                                        MSG_Q_DIST_GRP_STATUS_OK);
  1010. #ifdef MSG_Q_DIST_GRP_REPORT
  1011.                 printf ("msgQDistGrpInput/SEND: group has no local membersn");
  1012. #endif
  1013.                 return (MSG_Q_DIST_GRP_STATUS_OK);
  1014.                 }
  1015.             /* Using malloc() here is not very satisfiing. */
  1016.             if ((buffer = (char *) malloc (nBytes)) == NULL)
  1017.                 {
  1018. #ifdef MSG_Q_DIST_GRP_REPORT
  1019.                 printf ("msgQDistGrpInput/SEND: out of memoryn");
  1020. #endif
  1021.                 msgQDistGrpSendStatus (nodeIdSrc,
  1022.                                     inqIdSrc,
  1023.                                     MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY);
  1024.                 distStat.memShortage++;
  1025.                 distStat.msgQGrpInDiscarded++;
  1026.                 return (MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY);
  1027.                 }
  1028.     
  1029.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr),
  1030.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND),
  1031.                           buffer, nBytes);
  1032.             prio = NET_PRIO_TO_DIST_MSG_Q_PRIO (pTBufHdr->tBufHdrPrio);
  1033. #ifdef MSG_Q_DIST_GRP_REPORT
  1034.             printf ("msgQDistGrpInput: message from node %ld to group 0x%xn",
  1035.                     nodeIdSrc, dMsgQGrpId);
  1036. #endif
  1037.             /*
  1038.              * Send message to all members, registrated on the local node for
  1039.              * this group.
  1040.              */
  1041.             tid = taskSpawn (NULL,
  1042.                             DIST_MSG_Q_GRP_WAIT_TASK_PRIO,
  1043.                             0,
  1044.                             DIST_MSG_Q_GRP_WAIT_TASK_STACK_SZ,
  1045.                             (FUNCPTR) msgQDistGrpAgent,
  1046.                             (int) nodeIdSrc,
  1047.                             (int) inqIdSrc,
  1048.                             (int) dMsgQGrpId,
  1049.                             (int) buffer,
  1050.                             (int) nBytes,
  1051.                             (int) timeout,
  1052.                             (int) prio,
  1053.                             0, 0, 0);
  1054.             if (tid == ERROR)
  1055.                 {
  1056.                 free (buffer);
  1057.                 msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc,
  1058.                                        MSG_Q_DIST_GRP_STATUS_UNAVAIL);
  1059.                 return (MSG_Q_DIST_GRP_STATUS_UNAVAIL);
  1060.                 }
  1061.             return (MSG_Q_DIST_GRP_STATUS_OK);
  1062.             }
  1063.         case DIST_PKT_TYPE_MSG_Q_GRP_STATUS:
  1064.             {
  1065.             if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_STATUS))
  1066.                 distPanic ("msgQDistGrpInput/STATUS: packet too shortn");
  1067.             /* First copy the error packet form the TBuf list. */
  1068.             
  1069.             distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0,
  1070.                           (char *) &pktStatus,
  1071.                           DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_STATUS));
  1072.             dStatus = (DIST_STATUS) ntohs (pktStatus.pktMsgQGrpStatusDStatus);
  1073.             errnoRemote = ntohl (pktStatus.pktMsgQGrpStatusErrno);
  1074.             inqId = (DIST_INQ_ID) ntohl (pktStatus.pktMsgQGrpStatusInqId);
  1075.             if (! (pGenInq = distInqFind (inqId)))
  1076.                 return (MSG_Q_DIST_GRP_STATUS_LOCAL_TIMEOUT);
  1077.             /* See who is addressed by the STATUS telegram. */
  1078.             switch (pGenInq->inqType)
  1079.                 {
  1080.                 case DIST_MSG_Q_GRP_INQ_TYPE_SEND:
  1081.                     {
  1082.                     pInq = (DIST_MSG_Q_GRP_SEND_INQ *) pGenInq;
  1083.                     /*
  1084.                      * Possible errors here:
  1085.                      *    MSG_Q_DIST_GRP_STATUS_OK
  1086.                      *    MSG_Q_DIST_GRP_STATUS_ERROR
  1087.                      *    MSG_Q_DIST_GRP_STATUS_UNAVAIL
  1088.                      *    MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID
  1089.                      *    MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY
  1090.                      */
  1091.                     switch (dStatus)
  1092.                         {
  1093.                         case MSG_Q_DIST_GRP_STATUS_OK:
  1094.                             if (--pInq->sendInqNumOutstanding == 0)
  1095.                                 semGive (&pInq->sendInqWait);
  1096.                             break;
  1097.                         case MSG_Q_DIST_GRP_STATUS_ERROR:
  1098.                         case MSG_Q_DIST_GRP_STATUS_UNAVAIL:
  1099.                         case MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID:
  1100.                         case MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY:
  1101.                             errnoOfTaskSet (pInq->sendInqTask, errnoRemote);
  1102.                             break;
  1103.                         default:
  1104. #ifdef MSG_Q_DIST_GRP_REPORT
  1105.                             printf ("msgQDistGrpInput/STATUS/SEND: status?n");
  1106. #endif
  1107.                             break;
  1108.                         }
  1109.                     return (MSG_Q_DIST_GRP_STATUS_OK);
  1110.                     }
  1111.                 default:
  1112. #ifdef MSG_Q_DIST_GRP_REPORT
  1113.                     printf ("msgQDistGrpInput/STATUS: unexpected inq (%d)n",
  1114.                             pGenInq->inqType);
  1115. #endif
  1116.                     return (MSG_Q_DIST_GRP_STATUS_INTERNAL_ERROR);
  1117.                 }
  1118.             }
  1119.         default:
  1120. #ifdef MSG_Q_DIST_GRP_REPORT
  1121.             printf ("msgQDistGrpInput/STATUS: unknown group subtype (%d)n",
  1122.                     pPkt->pktSubType);
  1123. #endif
  1124.             return (MSG_Q_DIST_GRP_STATUS_PROTOCOL_ERROR);
  1125.         }
  1126. #ifdef UNDEFINED   /* supposedly unreached */
  1127.     if (status == ERROR)
  1128.         return (MSG_Q_DIST_GRP_STATUS_ERROR);
  1129.     return (MSG_Q_DIST_GRP_STATUS_OK);
  1130. #endif
  1131.     }
  1132. /***************************************************************************
  1133. *
  1134. * msgQDistGrpLclSetId - change id of a group in local database (VxFusion option)
  1135. *
  1136. * This routine changes the group id of a node in the local database.
  1137. *
  1138. * AVAILABILITY
  1139. * This routine is distributed as a component of the unbundled distributed
  1140. * message queues option, VxFusion.
  1141. *
  1142. * RETURNS: N/A
  1143. *
  1144. * NOTE: Call with <distGrpDbSemaphore> taken.
  1145. *
  1146. * NOMANUAL
  1147. */
  1148. void msgQDistGrpLclSetId
  1149.     (
  1150.     DIST_GRP_DB_NODE *   pGrpDbNode,   /* the node to change */
  1151.     DIST_MSG_Q_GRP_ID    grpId         /* the new ID */
  1152.     )
  1153.     {
  1154.     DIST_OBJ_NODE *   pObjNode;
  1155.     DIST_MSG_Q_ID     dMsgQId;
  1156.     int               ix = pGrpDbNode->ixNode;
  1157.     hashTblRemove (distGrpDbIdId, (HASH_NODE *) &(distGrpDbId[ix]));
  1158.     pGrpDbNode->grpDbId = grpId;
  1159.     pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (pGrpDbNode->grpDbMsgQId);
  1160.     dMsgQId = DIST_MSG_Q_GRP_ID_TO_DIST_MSG_Q_ID (grpId);
  1161.     pObjNode->objNodeId = DIST_MSG_Q_ID_TO_DIST_OBJ_ID (dMsgQId);
  1162.     hashTblPut (distGrpDbIdId, (HASH_NODE *) &(distGrpDbId[ix]));
  1163.     }
  1164. /***************************************************************************
  1165. *
  1166. * msgQDistGrpLclCreate - create a new group in local database (VxFusion option)
  1167. *
  1168. * This routine creates a new group in the local database.
  1169. *
  1170. * AVAILABILITY
  1171. * This routine is distributed as a component of the unbundled distributed
  1172. * message queues option, VxFusion.
  1173. *
  1174. * RETURNS: A ptr to the group node.
  1175. *
  1176. * NOTE: Call with <distGrpDbSemaphore> taken.
  1177. *
  1178. * NOMANUAL
  1179. */
  1180. DIST_GRP_DB_NODE * msgQDistGrpLclCreate
  1181.     (
  1182.     char *               grpName,     /* group name */
  1183.     DIST_MSG_Q_GRP_ID    grpId,       /* group ID */
  1184.     DIST_GRP_STATE       grpState     /* initial state */
  1185.     )
  1186.     {
  1187.     DIST_GRP_DB_NODE *   pDbNode;
  1188.     DIST_OBJ_NODE *      pObjNode;
  1189.     /* get a free database node */
  1190.     if (! (pDbNode = (DIST_GRP_DB_NODE *) sllGet (&msgQDistGrpFreeList)))
  1191.         {
  1192.         return (NULL);    /* database is full */
  1193.         }
  1194.     /* init database node */
  1195.     bcopy (grpName, (char *) &(pDbNode->grpDbName), strlen (grpName) + 1);
  1196.     sllInit (&(pDbNode->grpDbMsgQIdLst));
  1197.     pDbNode->grpDbState = grpState;
  1198.     pDbNode->grpDbId = grpId;
  1199.     pDbNode->pGrpDbGapNode = NULL;
  1200.     /* Create message queue id. */
  1201.     pObjNode = distObjNodeGet();
  1202.     pObjNode->objNodeType    = DIST_OBJ_TYPE_MSG_Q;
  1203.     pObjNode->objNodeId      = DIST_MSG_Q_GRP_ID_TO_DIST_OBJ_ID (grpId);
  1204.     pDbNode->grpDbMsgQId = DIST_OBJ_NODE_TO_MSG_Q_ID (pObjNode);
  1205.     /* Link group name hash node in name hash table. */
  1206.     hashTblPut (distGrpDbNmId, (HASH_NODE *) &(distGrpDbNm[pDbNode->ixNode]));
  1207.     /* Link group id hash node in id hash table. */
  1208.     hashTblPut (distGrpDbIdId, (HASH_NODE *) &(distGrpDbId[pDbNode->ixNode]));
  1209. #ifdef DIST_MSG_Q_GRP_REPORT
  1210.     printf ("msgQDistGrpLclCreate: `%s' (id 0x%lx, state %d) createdn",
  1211.             grpName, grpId, grpState);
  1212. #endif
  1213.     return (pDbNode);
  1214.     }
  1215. /***************************************************************************
  1216. *
  1217. * msgQDistGrpLclAddMember - add a member to a group in the local database (VxFusion option)
  1218. *
  1219. * This routine adds a member to a group in the local database.
  1220. *
  1221. * AVAILABILITY
  1222. * This routine is distributed as a component of the unbundled distributed
  1223. * message queues option, VxFusion.
  1224. *
  1225. * RETURNS: OK, if the member was added; ERROR, if not.
  1226. *
  1227. * NOMANUAL
  1228. */
  1229. STATUS msgQDistGrpLclAddMember
  1230.     (
  1231.     DIST_GRP_DB_NODE * pDbNode,      /* ptr to node of group */
  1232.     MSG_Q_ID           msgQId        /* msgQ ID to add to group */
  1233.     )
  1234.     {
  1235.     DIST_GRP_MSG_Q_NODE * distGrpMsgQNode;
  1236.     distGrpMsgQNode = (DIST_GRP_MSG_Q_NODE *)
  1237.                         malloc (sizeof (DIST_GRP_MSG_Q_NODE));
  1238.     if (distGrpMsgQNode == NULL)
  1239.         return (ERROR);    /* out of memory */
  1240.     distGrpMsgQNode->msgQId = msgQId;
  1241.     msgQDistGrpDbLock();
  1242.     sllPutAtHead (&(pDbNode->grpDbMsgQIdLst), (SL_NODE *) distGrpMsgQNode);
  1243.     msgQDistGrpDbUnlock();
  1244.     return (OK);
  1245.     }
  1246. /***************************************************************************
  1247. *
  1248. * msgQDistGrpLclFindById - Find a group's node given in group ID (VxFusion option)
  1249. *
  1250. * This routine looks up a group node when given the group ID.
  1251. *
  1252. * AVAILABILITY
  1253. * This routine is distributed as a component of the unbundled distributed
  1254. * message queues option, VxFusion.
  1255. *
  1256. * RETURNS: A pointer to a group node, or NULL.
  1257. *
  1258. * NOTE: Take <distGrpDbSemaphore> before calling.
  1259. *
  1260. * NOMANUAL
  1261. */
  1262. DIST_GRP_DB_NODE * msgQDistGrpLclFindById
  1263.     (
  1264.     DIST_MSG_Q_GRP_ID distGrpId    /* the group ID to look for */
  1265.     )
  1266.     {
  1267.     DIST_GRP_HASH_NODE    hMatchNode;
  1268.     DIST_GRP_HASH_NODE *  pHNode;
  1269.     DIST_GRP_DB_NODE      dbMatchNode;
  1270.     hMatchNode.pDbNode = &dbMatchNode;
  1271.     dbMatchNode.grpDbId = distGrpId;
  1272.     pHNode = (DIST_GRP_HASH_NODE *) hashTblFind (distGrpDbIdId,
  1273.                                                  (HASH_NODE *) &hMatchNode,
  1274.                                                  KEY_CMP_ARG_ID);
  1275.     if (pHNode == NULL)
  1276.         return (NULL);    /* not found */
  1277.     return (pHNode->pDbNode);
  1278.     }
  1279. /***************************************************************************
  1280. *
  1281. * msgQDistGrpLclFindByName - Find a group node given a group name (VxFusion option)
  1282. *
  1283. * This routine looks up a group node when given the name of the group.
  1284. *
  1285. * AVAILABILITY
  1286. * This routine is distributed as a component of the unbundled distributed
  1287. * message queues option, VxFusion.
  1288. *
  1289. * RETURNS: A pointer to a group node, or NULL.
  1290. *
  1291. * NOTE: Call with <distGrpDbSemaphore> taken.
  1292. *
  1293. * NOMANUAL
  1294. */
  1295. DIST_GRP_DB_NODE * msgQDistGrpLclFindByName
  1296.     (
  1297.     char * grpName         /* group name to find */
  1298.     )
  1299.     {
  1300.     DIST_GRP_HASH_NODE  hMatchNode, *pHNode;
  1301.     DIST_GRP_DB_NODE    dbMatchNode;
  1302.     hMatchNode.pDbNode = &dbMatchNode;
  1303.     bcopy (grpName, (char *) &(dbMatchNode.grpDbName), strlen (grpName) + 1);
  1304.     pHNode = (DIST_GRP_HASH_NODE *) hashTblFind (distGrpDbNmId,
  1305.                                                  (HASH_NODE *) &hMatchNode,
  1306.                                                  KEY_CMP_ARG_STR);
  1307.     if (pHNode == NULL)
  1308.         return (NULL);    /* not found */
  1309.     return (pHNode->pDbNode);
  1310.     }
  1311. /***************************************************************************
  1312. *
  1313. * msgQDistGrpLclEach - Step through nodes in the database (VxFusion option)
  1314. *
  1315. * This routine steps through all nodes in the database, calling <routine>
  1316. * with argument <routineArg> for each node.
  1317. *
  1318. * AVAILABILITY
  1319. * This routine is distributed as a component of the unbundled distributed
  1320. * message queues option, VxFusion.
  1321. *
  1322. * RETURNS: N/A
  1323. *
  1324. * NOTE: Takes <distGrpDbSemaphore>.
  1325. *
  1326. * NOMANUAL
  1327. */
  1328. void msgQDistGrpLclEach
  1329.     (
  1330.     FUNCPTR routine,        /* routine to call for each node */
  1331.     int     routineArg      /* argument to pass to routine */
  1332.     )
  1333.     {
  1334.     msgQDistGrpDbLock();
  1335.     
  1336.     hashTblEach (distGrpDbNmId, routine, routineArg);
  1337.     msgQDistGrpDbUnlock();
  1338.     }
  1339. /***************************************************************************
  1340. *
  1341. * msgQDistGrpBurst - burst out group database (VxFusion option)
  1342. *
  1343. * This routine is used by INCO to update the remote group database on
  1344. * node <nodeId>. All entries in the database are transmitted.
  1345. *
  1346. * AVAILABILITY
  1347. * This routine is distributed as a component of the unbundled distributed
  1348. * message queues option, VxFusion.
  1349. *
  1350. * RETURNS: STATUS of remote operation.
  1351. *
  1352. * NOMANUAL
  1353. */
  1354. STATUS msgQDistGrpBurst
  1355.     (
  1356.     DIST_NODE_ID    nodeId          /* node to update */
  1357.     )
  1358.     {
  1359.     DIST_GRP_BURST    burst;
  1360.     burst.burstNodeId = nodeId;
  1361.     burst.burstStatus = OK;
  1362.     msgQDistGrpLclEach ((FUNCPTR) msgQDistGrpBurstOne, (int) &burst);
  1363.     return (burst.burstStatus);
  1364.     }
  1365. /***************************************************************************
  1366. *
  1367. * msgQDistGrpBurstOne - burst out single group database entry (VxFusion option)
  1368. *
  1369. * This routine bursts out a database entry to a node.
  1370. *
  1371. * AVAILABILITY
  1372. * This routine is distributed as a component of the unbundled distributed
  1373. * message queues option, VxFusion.
  1374. *
  1375. * RETURNS: TRUE, if successfully; FALSE, if not.
  1376. *
  1377. * NOMANUAL
  1378. */
  1379. BOOL msgQDistGrpBurstOne
  1380.     (
  1381.     DIST_GRP_HASH_NODE * pGrpHashNode,   /* specifies node to send */
  1382.     DIST_GRP_BURST *     pBurst          /* target node */
  1383.     )
  1384.     {
  1385.     DIST_PKT_DGDB_ADD    pktAdd;
  1386.     DIST_GRP_DB_NODE *   pNode = pGrpHashNode->pDbNode;
  1387.     DIST_IOVEC           distIOVec[2];
  1388.     STATUS               status;
  1389.     pktAdd.pktDgdbAddHdr.pktType = DIST_PKT_TYPE_DGDB;
  1390.     pktAdd.pktDgdbAddHdr.pktSubType = DIST_PKT_TYPE_DGDB_ADD;
  1391.     pktAdd.pktDgdbAddId = htons (pNode->grpDbId);
  1392.     pktAdd.pktDgdbAddCreator = htonl (pNode->grpDbNodeId);
  1393.     /* use IOV stuff here, since we do not want to copy data */
  1394.     distIOVec[0].pIOBuffer = &pktAdd;
  1395.     distIOVec[0].IOLen = DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD);
  1396.     distIOVec[1].pIOBuffer = (char *) &pNode->grpDbName;
  1397.     distIOVec[1].IOLen = strlen ((char *) &pNode->grpDbName) + 1;
  1398.     status = distNetIOVSend (pBurst->burstNodeId, &distIOVec[0], 2,
  1399.             WAIT_FOREVER, DIST_DGDB_PRIO);
  1400.     if ((pBurst->burstStatus = status) == ERROR)
  1401.         return (FALSE);
  1402.     return (TRUE);
  1403.     }
  1404. /***************************************************************************
  1405. *
  1406. * msgQDistGrpHCmpStr - determine if two nodes have the same name (VxFusion option)
  1407. *
  1408. * This routine is the hash compare function for group names.
  1409. *
  1410. * AVAILABILITY
  1411. * This routine is distributed as a component of the unbundled distributed
  1412. * message queues option, VxFusion.
  1413. *
  1414. * RETURNS: TRUE, if the nodes have the same name; FALSE, otherwise.
  1415. *
  1416. * NOMANUAL
  1417. */
  1418. LOCAL BOOL msgQDistGrpHCmpStr
  1419.     (
  1420.     DIST_GRP_HASH_NODE * pMatchHNode,   /* first node */
  1421.     DIST_GRP_HASH_NODE * pHNode,        /* second node */
  1422.     int                  keyCmpArg      /* not used */
  1423.     )
  1424.     {
  1425.     
  1426.     UNUSED_ARG(keyCmpArg);
  1427.     
  1428.     if (strcmp ((char *) &pMatchHNode->pDbNode->grpDbName,
  1429.                 (char *) &pHNode->pDbNode->grpDbName) == 0)
  1430.         return (TRUE);
  1431.     else
  1432.         return (FALSE);
  1433.     }
  1434. /***************************************************************************
  1435. *
  1436. * msgQDistGrpHFuncStr - hash function for strings (VxFusion option)
  1437. *
  1438. * This routine computes the hash value for a node's group name.
  1439. *
  1440. * AVAILABILITY
  1441. * This routine is distributed as a component of the unbundled distributed
  1442. * message queues option, VxFusion.
  1443. *
  1444. * RETURNS: The hash value.
  1445. *
  1446. * NOMANUAL
  1447. */
  1448. LOCAL INT32 msgQDistGrpHFuncStr
  1449.     (
  1450.     int                    elements,    /* elements in hash table */
  1451.     DIST_GRP_HASH_NODE *   pHNode,      /* node whose name to hash */
  1452.     int                    seed         /* seed for hashing */
  1453.     )
  1454.     {
  1455.     char * tkey;
  1456.     int    hash = 0;
  1457.     for (tkey = (char *) &pHNode->pDbNode->grpDbName; *tkey != ''; tkey++)
  1458.         hash = hash * seed + (unsigned int) *tkey;
  1459.     return (hash & (elements - 1));
  1460.     }
  1461. /***************************************************************************
  1462. *
  1463. * msgQDistGrpHCmpId - compare two group ID's (VxFusion option)
  1464. *
  1465. * This routine is the hash compare function for group ID's.
  1466. *
  1467. * AVAILABILITY
  1468. * This routine is distributed as a component of the unbundled distributed
  1469. * message queues option, VxFusion.
  1470. *
  1471. * RETURNS: TRUE, if ID's match; FALSE, otherwise.
  1472. *
  1473. * NOMANUAL
  1474. */
  1475. LOCAL BOOL msgQDistGrpHCmpId
  1476.     (
  1477.     DIST_GRP_HASH_NODE *   pMatchNode,  /* first node */
  1478.     DIST_GRP_HASH_NODE *   pHNode,      /* second node */
  1479.     int                    keyArg       /* unused */
  1480.     )
  1481.     {
  1482.     DIST_MSG_Q_GRP_ID    distGrpId1 = pMatchNode->pDbNode->grpDbId;
  1483.     DIST_MSG_Q_GRP_ID    distGrpId2 = pHNode->pDbNode->grpDbId;
  1484.     
  1485.     UNUSED_ARG(keyArg);
  1486.     
  1487.     if (distGrpId1 == distGrpId2)
  1488.         return (TRUE);
  1489.     else
  1490.         return (FALSE);
  1491.     }
  1492. /***************************************************************************
  1493. *
  1494. * msgQDistGrpHFuncId - hash function for group ID's (VxFusion option)
  1495. *
  1496. * This routine computes a hash value for a group's ID.
  1497. *
  1498. * AVAILABILITY
  1499. * This routine is distributed as a component of the unbundled distributed
  1500. * message queues option, VxFusion.
  1501. *
  1502. * RETURNS: The hash value.
  1503. *
  1504. * NOMANUAL
  1505. */
  1506. LOCAL INT32 msgQDistGrpHFuncId
  1507.     (
  1508.     int                    elements,   /* elements in hash table */
  1509.     DIST_GRP_HASH_NODE *   pHNode,     /* node whose ID to hash */
  1510.     int                    divisor     /* used by hash computation */
  1511.     )
  1512.     {
  1513.     return ((pHNode->pDbNode->grpDbId % divisor) & (elements - 1));
  1514.     }