distNodeLib.c
上传用户:baixin
上传日期:2008-03-13
资源大小:4795k
文件大小:87k
开发平台:

MultiPlatform

  1. /* distNodeLib - node library (VxFusion option) */
  2. /* Copyright 1999 - 2002 Wind River Systems, Inc. */
  3. /*
  4. modification history
  5. --------------------
  6. 01q,15apr02,jws  fix SPR74878 (not locking node database properly)
  7. 01p,23oct01,jws  fix compiler warnings (SPR 71117)
  8. 01o,04oct01,jws  final fixes for SPR 34770
  9. 01n,27sep01,p_r  Fixes for SPR#34770
  10. 01m,11jun99,drm  Changing unicast piggy-backing default to FALSE.
  11. 01l,24may99,drm  added vxfusion prefix to VxFusion related includes
  12. 01k,11sep98,drm  added #include to pick up distPanic()
  13. 01j,12aug98,drm  added #include stmt for distObjTypeP.h
  14. 01i,08aug98,drm  added code to set broadcast flag when sending broadcast msgs
  15. 01h,08apr98,ur   made `piggy backing' switchable with distNodeCtl()
  16. 01g,13mar98,ur   added support for multiple crashed and operational hooks
  17. 01f,29jan98,ur   added support for sending negative acknowledgments
  18. 01e,23jan98,ur   bug fixed in the delivery of reassembled packets
  19. 01d,20jan98,ur   splited distNodeInit() in two parts
  20. 01c,19jan98,ur   bug fixed in distNodePktAck()
  21. 01b,16jan98,ur   XACK contains state of sending node
  22. 01a,10jun97,ur   written.
  23. */
  24. /*
  25. DESCRIPTION
  26. This library contains the node database, and routines to handle it.
  27. For every node in the system, one entry should be placed in the database.
  28. The database knows about the state of the node (as it looks from the local
  29. node) and about communication between the local node and the remote one.
  30. INTERNAL
  31. The semaphore <distNodeDbSem> is used to lock the node database.  The
  32. database is a hash table of nodes in the system.  Packets chained for
  33. input/output are part of each node's entry.  The table must be locked
  34. when entering an entry.  It is also locked when finding and entry, although
  35. it is not clear that this is necessary because entries are never removed.
  36. The DIST_NODE_COMM structures are the packet queues.  The database must
  37. be locked when dealing with these structures.  It is believed that this
  38. is now true.  Because of the way the communications routines are called,
  39. <distNodeDbSem> is now a mutex, and is sometimes called recursively.
  40. AVAILABILITY
  41. This module is distributed as a component of the unbundled distributed
  42. message queues option, VxFusion.
  43. */
  44. #include "vxWorks.h"
  45. #undef  DIST_DIAGNOSTIC   /* defining this seems to break VxFusion! */
  46. #undef  DIST_DIAGNOSTIC_SHOW
  47. #if defined (DIST_NODE_REPORT) 
  48.  || defined (DIST_DIAGNOSTIC) 
  49.  || defined (DIST_DIAGNOSTIC_SHOW)
  50. #include "stdio.h"
  51. #endif
  52. #include "stdlib.h"
  53. #include "hashLib.h"
  54. #include "semLib.h"
  55. #include "taskLib.h"
  56. #include "string.h"
  57. #include "errnoLib.h"
  58. #include "netinet/in.h"
  59. #include "private/distObjTypeP.h"
  60. #include "vxfusion/distLib.h"
  61. #include "vxfusion/distIfLib.h"
  62. #include "vxfusion/distNodeLib.h"
  63. #include "vxfusion/distStatLib.h"
  64. #include "vxfusion/private/distLibP.h"
  65. #include "vxfusion/private/distNodeLibP.h"
  66. #include "vxfusion/private/distTBufLibP.h"
  67. #include "vxfusion/private/distPktLibP.h"
  68. /* defines */
  69. /* make two macros look like macros */
  70. #define DIST_NODE_DB_LOCK    distNodeDbLock()
  71. #define DIST_NODE_DB_UNLOCK  distNodeDbUnlock()
  72. #define UNUSED_ARG(x)  if(sizeof(x)) {} /* to suppress compiler warnings */
  73. /* test if b is within [a,c) */
  74. #define winWithin(a, b, c) 
  75.     (((a <= b) && (b < c)) || ((c < a) && (a <= b)) || ((b < c) && (c < a)))
  76. /* add a to b and fold it back */
  77. #define winAdd(a, b) 
  78.     (((a) + (b)) & (DIST_IF_RNG_BUF_SZ - 1))
  79. /* sub a by b and fold it back */
  80. #define winSub(a, b) 
  81.     (((a) - (b)) & (DIST_IF_RNG_BUF_SZ - 1))
  82. #define KEY_ARG              65537        /* used by hash function */
  83. #define KEY_CMP_ARG          0            /* unused */
  84. #define DIST_NODE_MAX_HOOKS  8
  85. /* global variables */
  86. DIST_NODE_ID distNodeLocalId;   /* windSh needs this global */
  87. /*
  88.  * The semaphore used to be binary, but is now initialized as mutex.
  89.  * It cannot be changed back, because it is sometimes taken recursively,
  90.  * and will block if not mutex.
  91.  */
  92. SEM_ID       distNodeDbSem;     /* Should be taken before DIST_TBUF_FREEM */
  93. /* local variables */
  94. LOCAL HASH_ID     distNodeDbId;
  95. LOCAL BOOL        distNodeLibInstalled = FALSE;
  96. LOCAL int        distNodeNumNodesAll = 0;
  97. LOCAL int        distNodeNumNodesAlive = 0;
  98. LOCAL int        distNodeNumNodesOperational = 0;
  99. LOCAL int        distNodeLocalState;
  100. LOCAL DIST_NODE_ID    distNodeGodfatherId;
  101. LOCAL FUNCPTR        distNodeOperationalHook[DIST_NODE_MAX_HOOKS];
  102. LOCAL FUNCPTR        distNodeCrashedHook[DIST_NODE_MAX_HOOKS];
  103. LOCAL int        distNodeMaxRetries = DIST_NODE_MAX_RETRIES;
  104. LOCAL int        distNodeRetryTimeout = DIST_NODE_RETRY_TIMEOUT;
  105. LOCAL BOOL        distNodeSupportNACK = TRUE;   /* negative acknowlege */
  106. LOCAL BOOL        distNodeSupportPBB = FALSE;   /* piggy backing broadcast */
  107. LOCAL BOOL        distNodeSupportPBU = FALSE;   /* piggy backing unicast */
  108. /* local prototypes */
  109. LOCAL DIST_NODE_DB_NODE * distNodeFindById (DIST_NODE_ID nodeId);
  110. LOCAL BOOL   distNodeHCmp (DIST_NODE_DB_NODE *pMatchNode,
  111.                            DIST_NODE_DB_NODE *pHNode,
  112.                            int keyArg);
  113. LOCAL BOOL   distNodeHFunc (int elements, DIST_NODE_DB_NODE *pHNode,
  114.                             int keyArg);
  115. LOCAL STATUS distNodePktResend (DIST_NODE_COMM *pComm,
  116.                                 DIST_TBUF_HDR *pTBufHdr);
  117. LOCAL void   distNodeDBTimerTask (void);
  118. LOCAL void   distNodeDBTimer (void);
  119. LOCAL BOOL   distNodeDBNodeTimer (DIST_NODE_DB_NODE *pNode,
  120.                                   DIST_NODE_BTIMO *pBtimo);
  121. LOCAL void   distNodeDBCommTimer (DIST_NODE_DB_NODE *pNode,
  122.                                   DIST_NODE_BTIMO *pBtimo,
  123.                                   BOOL isBroadcastComm);
  124. LOCAL STATUS distNodeSendNegAck (DIST_NODE_DB_NODE *pNode,
  125.                                  short id,
  126.                                  short seq);
  127. LOCAL STATUS distNodeSendAck (DIST_NODE_DB_NODE *pNode,
  128.                               int ackBroadcast,
  129.                               int options);
  130. LOCAL void   distNodeCleanup (DIST_NODE_DB_NODE *pNode);
  131. LOCAL void   distNodeSetState (DIST_NODE_DB_NODE *pNode,
  132.                                int state);
  133. LOCAL STATUS distNodeSendBootstrap (DIST_NODE_ID dest,
  134.                                     int type,
  135.                                     int timeout);
  136. #if defined(DIST_DIAGNOSTIC) || defined(DIST_DIAGNOSTIC_SHOW)
  137. LOCAL BOOL         distNodeNodeShow (DIST_NODE_DB_NODE *pNode,
  138.                                      int dummy);
  139. LOCAL const char * distNodeStateToName (int state);
  140. #endif
  141. /***************************************************************************
  142. *
  143. * distNodeLibInit - initialize this module (VxFusion option)
  144. *
  145. * This routine currently does nothing.
  146. *
  147. * AVAILABILITY
  148. * This routine is distributed as a component of the unbundled distributed
  149. * message queues option, VxFusion.
  150. *
  151. * RETURNS: N/A
  152. * NOMANUAL
  153. */
  154. void distNodeLibInit (void)
  155.     {
  156.     }
  157. /***************************************************************************
  158. *
  159. * distNodeInit - initializes the node library (VxFusion option)
  160. *
  161. * This routine initializes the node database. The database can handle
  162. * up to 2^<sizeLog2> entries.
  163. *
  164. * AVAILABILITY
  165. * This routine is distributed as a component of the unbundled distributed
  166. * message queues option, VxFusion.
  167. *
  168. * RETURNS: OK, if successful.
  169. * NOMANUAL
  170. */
  171. STATUS distNodeInit
  172.     (
  173.     int sizeLog2   /* init database with 2^^sizeLog2 entries */
  174.     )
  175.     {
  176.     int    hashTblSizeLog2;
  177.     int    i, tid;
  178.     if (sizeLog2 < 1)
  179.         return (ERROR);
  180.     if (distNodeLibInstalled == TRUE)
  181.         return (OK);
  182.     for (i = 0; i < DIST_NODE_MAX_HOOKS; i++)
  183.         {
  184.         distNodeOperationalHook[i] = NULL;
  185.         distNodeCrashedHook[i] = NULL;
  186.         }
  187.     distNodeLocalState = DIST_NODE_STATE_BOOT;
  188.     /* Initialize the node database. */
  189.     if (hashLibInit () == ERROR)
  190.         return (ERROR);    /* hashLibInit() failed */
  191.     hashTblSizeLog2 = sizeLog2 - 1;
  192.     distNodeDbId = hashTblCreate (hashTblSizeLog2, distNodeHCmp,
  193.             distNodeHFunc, KEY_ARG);
  194.     if (distNodeDbId == NULL)
  195.         return (ERROR);    /* hashTblCreate() failed */
  196. #ifdef UNDEFINED
  197.     distNodeDbSem = semBCreate (SEM_Q_PRIORITY, SEM_FULL) ; 
  198. #else
  199.     distNodeDbSem = semMCreate (SEM_Q_PRIORITY + SEM_INVERSION_SAFE) ; 
  200. #endif
  201.     /* Get the node database manager running. */
  202.     tid = taskSpawn ("tDiNodeMgr",
  203.                      DIST_NODE_MGR_PRIO,
  204.                      VX_SUPERVISOR_MODE | VX_UNBREAKABLE,
  205.                      DIST_NODE_MGR_STACK_SZ,
  206.                      (FUNCPTR) distNodeDBTimerTask,
  207.                      0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
  208.     if (tid == ERROR)
  209.         return (ERROR);    /* taskSpawn() failed */
  210.     distNodeGodfatherId = DIST_IF_BROADCAST_ADDR;
  211.     /*
  212.      * Add a virtual broadcast node.
  213.      * State information (sliding windows, etc.) for outgoing broadcasts
  214.      * must be handled at a centralized place. This is the virtual
  215.      * broadcast node.
  216.      * State information for incoming broadcasts is managed within
  217.      * the broadcast communication substructure for the corresponding
  218.      * node. When a note enters the incorporation phase, all other
  219.      * nodes in the system get aware of it and update their node
  220.      * databases.
  221.      */
  222.     if (distNodeCreate (DIST_IF_BROADCAST_ADDR, DIST_NODE_STATE_OPERATIONAL)
  223.          ==
  224.         NULL)
  225.         {
  226.         return (ERROR);
  227.         }
  228.         
  229.     distNodeLibInstalled = TRUE;
  230.     return (OK);
  231.     }
  232. /***************************************************************************
  233. *
  234. * distNodeHCmp - compare function for hashing in node DB (VxFusion option)
  235. *
  236. * This routine is the hash compare function for node IDs.
  237. *
  238. * AVAILABILITY
  239. * This routine is distributed as a component of the unbundled distributed
  240. * message queues option, VxFusion.
  241. *
  242. * RETURNS: True, if node IDs match.
  243. * NOMANUAL
  244. */
  245. LOCAL BOOL distNodeHCmp
  246.     (
  247.     DIST_NODE_DB_NODE * pMatchNode, /* first node */
  248.     DIST_NODE_DB_NODE * pHNode,     /* second node */
  249.     int                 keyArg      /* unused arg */
  250.     )
  251.     {
  252.     DIST_NODE_ID  distNodeId1 = pMatchNode->nodeId;
  253.     DIST_NODE_ID  distNodeId2 = pHNode->nodeId;
  254.     
  255.     UNUSED_ARG(keyArg);
  256.     return (distNodeId1 == distNodeId2);
  257.     }
  258. /***************************************************************************
  259. *
  260. * distNodeHFunc - hash function for node DB (VxFusion option)
  261. *
  262. * This is the hash function for node IDs.
  263. *
  264. * AVAILABILITY
  265. * This routine is distributed as a component of the unbundled distributed
  266. * message queues option, VxFusion.
  267. *
  268. * RETURNS: A hash index.
  269. * NOMANUAL
  270. */
  271. LOCAL BOOL distNodeHFunc
  272.     (
  273.     int                 elements,  /* size of hash table */
  274.     DIST_NODE_DB_NODE * pHNode,    /* node whose ID to hash */
  275.     int                 divisor    /* used by hash computation */
  276.     )
  277.     {
  278.     return ((pHNode->nodeId % divisor) & (elements - 1));
  279.     }
  280. /***************************************************************************
  281. *
  282. * distNodeCtl - control function for node database (VxFusion option)
  283. *
  284. * This routine performs a control function on the node database.
  285. * The following functions are accepted:
  286. * is
  287. * i DIST_CTL_RETRY_TIMEOUT
  288. * Set send-timeout in ticks to start with. When no ACK is received
  289. * within the timeout, the packet is resent. The timeout for the
  290. * <n>th sending is: <n> * DIST_CTL_RETRY_TIMEOUT .
  291. * i DIST_CTL_MAX_RETRIES
  292. * Set a limit for number of retries, when sending fails.
  293. * i DIST_CTL_GET_LOCAL_ID
  294. * Get local node id.
  295. * i DIST_CTL_GET_LOCAL_STATE
  296. * Get state of local node.
  297. * i DIST_CTL_NACK_SUPPORT
  298. * Negative acknowledges (NACKs) are used for requesting a resend of a single
  299. * missing fragment from a packet. NACKs are sent immediately after a
  300. * fragment is found to be missing.
  301. * If <arg> is FALSE (0), the sending of negative acknowledges is disabled.
  302. * If <arg> is TRUE (1), sending of NACKs is enabled. This is the default.
  303. * i DIST_CTL_PGGYBAK_UNICST_SUPPORT
  304. * If this is enabled, the system waits a version dependent time until it
  305. * sends an acknowledge for a previously received packet.
  306. * If a data packet is sent to
  307. * the acknowledge awaiting host in meantime, the acknowlege is delivered
  308. * in that packet. This switch turns on/off piggy backing for unicast
  309. * communication only.
  310. * If <arg> is FALSE (0), piggy backing is disabled.
  311. * If <arg> is TRUE (1), piggy backing is enabled.
  312. * Piggy backing is enabled for unicast communication by default.
  313. * i DIST_CTL_PGGYBAK_BRDCST_SUPPORT
  314. * If this is enabled, the system waits
  315. * a version dependent time until it sends an
  316. * acknowledge for a previously received packet. If a data packet is sent to
  317. * the acknowledge awaiting host in meantime, the acknowlege is delivered
  318. * in that packet. This switch turns on/off piggy backing for broadcast
  319. * communication only.
  320. * If <arg> is FALSE (0), piggy backing is disabled.
  321. * If <arg> is TRUE (1), piggy backing is enabled.
  322. * Piggy backing is disabled for broadcast communication by default.
  323. * i DIST_CTL_OPERATIONAL_HOOK
  324. * Set a function to be called, each time a node shifts to operational state.
  325. * i DIST_CTL_CRASHED_HOOK
  326. * Set a function to be called, each time a node shifts to crashed state.
  327. * ie
  328. *
  329. * The prototype of the function hooked to the database, should look
  330. * like this:
  331. * cs
  332. *    void fnc (DIST_NODE_ID nodeStateChanged);
  333. * ce
  334. * RETURNS: OK, the value requested, or ERROR if <function> is unknown.
  335. *
  336. * ERRNO:
  337. * S_distLib_UNKNOWN_REQUEST
  338. *
  339. * AVAILABILITY
  340. * This routine is distributed as a component of the unbundled distributed
  341. * message queues option, VxFusion.
  342. * NOMANUAL
  343. */
  344. int distNodeCtl
  345.     (
  346.     int function,        /* function code      */
  347.     int arg              /* arbitrary argument */
  348.     )
  349.     {
  350.     int i;
  351.     switch (function)
  352.         {
  353.         case DIST_CTL_MAX_RETRIES:
  354.             if (arg < 0)
  355.                 return (ERROR);
  356.             distNodeMaxRetries = arg;
  357.             return (OK);
  358.         case DIST_CTL_RETRY_TIMEOUT:
  359.             if (arg < 0)
  360.                 return (ERROR);
  361.             /*
  362.              * <arg> is in ticks;
  363.              * <distNodeRetryTimeout> is in manager wakeups;
  364.              */
  365.             distNodeRetryTimeout = (1000 * arg / sysClkRateGet() +
  366.                     DIST_NODE_MGR_WAKEUP_MSEC - 1) / DIST_NODE_MGR_WAKEUP_MSEC;
  367.             return (OK);
  368.         case DIST_CTL_OPERATIONAL_HOOK:
  369.             {
  370.             for (i = 0; i < DIST_NODE_MAX_HOOKS; i++)
  371.                 {
  372.                 if (distNodeOperationalHook[i] == NULL)
  373.                     {
  374.                     distNodeOperationalHook[i] = (FUNCPTR) arg;
  375.                     return (OK);
  376.                     }
  377.                 }
  378.             return (ERROR);
  379.             }
  380.         case DIST_CTL_CRASHED_HOOK:
  381.             {
  382.             for (i = 0; i < DIST_NODE_MAX_HOOKS; i++)
  383.                 {
  384.                 if (distNodeCrashedHook[i] == NULL)
  385.                     {
  386.                     distNodeCrashedHook[i] = (FUNCPTR) arg;
  387.                     return (OK);
  388.                     }
  389.                 }
  390.             return (ERROR);
  391.             }
  392.         case DIST_CTL_GET_LOCAL_ID:
  393.             return (distNodeLocalId);
  394.         case DIST_CTL_GET_LOCAL_STATE:
  395.             return (distNodeLocalState);
  396.         case DIST_CTL_NACK_SUPPORT:
  397.             if ((BOOL) arg != TRUE && (BOOL) arg != FALSE)
  398.                 return (ERROR);
  399.             distNodeSupportNACK = (BOOL) arg;
  400.             return (OK);
  401.         case DIST_CTL_PGGYBAK_BRDCST_SUPPORT:
  402.             if ((BOOL) arg != TRUE && (BOOL) arg != FALSE)
  403.                 return (ERROR);
  404.             distNodeSupportPBB = (BOOL) arg;
  405.             return (OK);
  406.         case DIST_CTL_PGGYBAK_UNICST_SUPPORT:
  407.             if ((BOOL) arg != TRUE && (BOOL) arg != FALSE)
  408.                 return (ERROR);
  409.             distNodeSupportPBU = (BOOL) arg;
  410.             return (OK);
  411.         default:
  412.             errnoSet (S_distLib_UNKNOWN_REQUEST);
  413.             return (ERROR);
  414.         }
  415.     }
  416. /***************************************************************************
  417. *
  418. * distNodeSendBootstrap - send a bootstrap packet (VxFusion option)
  419. *
  420. * This routine sends a bootstrap packet.
  421. *
  422. * AVAILABILITY
  423. * This routine is distributed as a component of the unbundled distributed
  424. * message queues option, VxFusion.
  425. *
  426. * RETURNS: OK, if packet sent.
  427. * NOMANUAL
  428. */
  429. LOCAL STATUS distNodeSendBootstrap
  430.     (
  431.     DIST_NODE_ID dest,     /* destination node */
  432.     int          type,     /* bootstrap type */
  433.     int          timeout   /* xmit timeout */
  434.     )
  435.     {
  436.     DIST_TBUF_HDR * pTBufHdr;
  437.     DIST_PKT_BOOT * pPkt;
  438.     DIST_TBUF *     pTBuf;
  439.     
  440.     if ((pTBufHdr = DIST_TBUF_HDR_ALLOC ()) == NULL)
  441.         return (ERROR);             /* out of TBufs */
  442.     pTBufHdr->tBufHdrPrio = DIST_BOOTSTRAP_PRIO;
  443.     pTBufHdr->tBufHdrDest = dest;
  444.     pTBufHdr->tBufHdrOverall = sizeof (*pPkt);
  445.     pTBufHdr->tBufHdrTimo = timeout;
  446.     
  447.     
  448.     if ((pTBuf = DIST_TBUF_ALLOC ()) == NULL)
  449.         {
  450.         DIST_TBUF_FREEM (pTBufHdr);
  451.         return (ERROR);                /* out of TBufs */
  452.         }
  453.         
  454.     pTBuf->tBufFlags = DIST_TBUF_FLAG_HDR | DIST_TBUF_FLAG_BROADCAST;
  455.     pTBuf->tBufType = DIST_TBUF_TTYPE_BOOTSTRAP;
  456.     pTBuf->tBufSeq = 0;
  457.     pTBuf->tBufNBytes = sizeof (*pPkt);
  458.     pPkt = (DIST_PKT_BOOT *) pTBuf->pTBufData;
  459.     
  460.     DIST_TBUF_ENQUEUE (pTBufHdr, pTBuf);
  461.     pPkt->pktBootType = (UINT8) type;
  462.     return (distNodePktSend (pTBufHdr));
  463.     }
  464. /***************************************************************************
  465. *
  466. * distNodeBootstrap - broadcast a bootstrap packet (VxFusion option)
  467. *
  468. * This routine broadcasts a bootstrap packet.
  469. *
  470. * AVAILABILITY
  471. * This routine is distributed as a component of the unbundled distributed
  472. * message queues option, VxFusion.
  473. *
  474. * RETURNS: OK, if broadcast succeeds.
  475. * NOMANUAL
  476. */
  477. STATUS distNodeBootstrap
  478.     (
  479.     int    timeout   /* xmit timeout */
  480.     )
  481.     {
  482.     STATUS status;
  483.     status = distNodeSendBootstrap (DIST_IF_BROADCAST_ADDR,
  484.                                     DIST_BOOTING_REQ, timeout);
  485.     return (status);
  486.     }
  487. /***************************************************************************
  488. *
  489. * distNodeFindById - find node in node database (VxFusion option)
  490. *
  491. * This routine tries to find the information node with node id <nodeId>
  492. * in the node database.
  493. *
  494. * This routine takes <distNodeDbLock>.  Usually, it will be called with
  495. * the lock already taken, but not always, so the lock is taken here
  496. * to be safe.
  497. *
  498. * RETURNS: Pointer to the node entry, or NULL if not found.
  499. *
  500. * AVAILABILITY
  501. * This routine is distributed as a component of the unbundled distributed
  502. * message queues option, VxFusion.
  503. *
  504. * NOMANUAL
  505. */
  506. LOCAL DIST_NODE_DB_NODE * distNodeFindById
  507.     (
  508.     DIST_NODE_ID    nodeId   /* ID of node to find */
  509.     )
  510.     {
  511.     DIST_NODE_DB_NODE   distNodeMatch;
  512.     DIST_NODE_DB_NODE * pDistNodeFound;
  513.     distNodeMatch.nodeId = nodeId;
  514.     
  515.     DIST_NODE_DB_LOCK;
  516.     pDistNodeFound =
  517.        (DIST_NODE_DB_NODE *) hashTblFind (distNodeDbId,
  518.                                           (HASH_NODE *) &distNodeMatch,
  519.                                           KEY_CMP_ARG);
  520.     DIST_NODE_DB_UNLOCK;
  521.     return (pDistNodeFound);
  522.     }
  523. /***************************************************************************
  524. *
  525. * distNodeCreate - create a new node (VxFusion option)
  526. *
  527. * This routine creates a new node with id <distNodeId>.
  528. * distNodeCreate() returns a pointer to the DB entry.
  529. *
  530. * This function takes the DB node database lock before inserting the
  531. * node into the database.
  532. *
  533. * AVAILABILITY
  534. * This routine is distributed as a component of the unbundled distributed
  535. * message queues option, VxFusion.
  536. *
  537. * RETURNS: Pointer to node, or NULL.
  538. * NOMANUAL
  539. */
  540. DIST_NODE_DB_NODE * distNodeCreate
  541.     (
  542.     DIST_NODE_ID    nodeId,   /* ID for new node */
  543.     int             state     /* state of new node */
  544.     )
  545.     {
  546.     DIST_NODE_DB_NODE * pNodeNew;
  547.     DIST_NODE_BFLD *    pBFld;
  548.     int                 sz;
  549.     /* This node is new to the database. */
  550. #ifdef DIST_NODE_REPORT
  551.     distLog("distNodeCreate: Creating node 0x%lx with state %dn",
  552.             nodeId, state);
  553. #endif
  554.     /* allocate needed space; quit if we can't get it */
  555.     
  556.     pNodeNew = (DIST_NODE_DB_NODE *) malloc (sizeof (DIST_NODE_DB_NODE));
  557.     if (pNodeNew == NULL)
  558.         {
  559.         distStat.memShortage++;
  560.         return (NULL);
  561.         }
  562.     sz = DIST_NODE_BFLD_SIZEOF (DIST_IF_RNG_BUF_SZ);
  563.     pBFld = (DIST_NODE_BFLD *) malloc (4 * sz);
  564.     if (pBFld == NULL)
  565.         {
  566.         free(pNodeNew);          /* no memory leaks here */
  567.         distStat.memShortage++;
  568.         return (NULL);
  569.         }
  570.     /* now fill in DIST_NODE_DB_NODE info */
  571.     bzero ((char *) pNodeNew, sizeof (DIST_NODE_DB_NODE));
  572.     pNodeNew->nodeId = nodeId;
  573.     pNodeNew->nodeBroadcast.commAckLastRecvd = (INT16) winSub (0, 1);
  574.     pNodeNew->nodeState = state;
  575.     distNodeNumNodesAll++;
  576.     if (DIST_NODE_STATE_IS_ALIVE (state))
  577.         distNodeNumNodesAlive++;
  578.     if (state == DIST_NODE_STATE_OPERATIONAL)
  579.         distNodeNumNodesOperational++;
  580.     pNodeNew->nodeState = state;
  581.     bzero ((char *) pBFld, 2 * sz);
  582.     pNodeNew->nodeUnicast.pCommCompleted = pBFld;
  583.     pNodeNew->nodeBroadcast.pCommCompleted = ((char *) pBFld) + sz;
  584.     
  585.     /* now link the node into the data base */
  586.     
  587.     DIST_NODE_DB_LOCK;
  588.     hashTblPut (distNodeDbId, (HASH_NODE *)pNodeNew);
  589.     DIST_NODE_DB_UNLOCK;
  590.     return (pNodeNew);
  591.     }
  592. /***************************************************************************
  593. *
  594. * distNodeSetState - set state of a node (VxFusion option)
  595. *
  596. * This routine is used to change the state of a node.  It is only called
  597. * from two places in distNodeReassemble() which takes the node DB lock.
  598. *
  599. * AVAILABILITY
  600. * This routine is distributed as a component of the unbundled distributed
  601. * message queues option, VxFusion.
  602. *
  603. * RETURNS: N/A
  604. * NOMANUAL
  605. */
  606. LOCAL void distNodeSetState
  607.     (
  608.     DIST_NODE_DB_NODE * pNode,     /* node */
  609.     int                 newState   /* new state */
  610.     )
  611.     {
  612.     int             curState = pNode->nodeState;
  613.     DIST_NODE_ID    nodeId   = pNode->nodeId;
  614.     int             i;
  615. #ifdef DIST_DIAGNOSTIC
  616.     if (curState != newState)
  617.         distLog ("distNodeSetState: state of node 0x%lx shifts from %s to %sn",
  618.                  nodeId,
  619.                  distNodeStateToName (curState),
  620.                  distNodeStateToName (newState));
  621. #endif
  622.     if ((! DIST_NODE_STATE_IS_ALIVE (curState)) &&
  623.          DIST_NODE_STATE_IS_ALIVE (newState))
  624.         {
  625.         distNodeNumNodesAlive++;
  626.         }
  627.         
  628.     if (DIST_NODE_STATE_IS_ALIVE (curState) &&
  629.         (! DIST_NODE_STATE_IS_ALIVE (newState)))
  630.         {
  631.         distNodeNumNodesAlive--;
  632.         }
  633.     if (curState < DIST_NODE_STATE_OPERATIONAL &&
  634.             newState == DIST_NODE_STATE_OPERATIONAL)
  635.         {
  636.         distNodeNumNodesOperational++;
  637.         for (i = 0; i < DIST_NODE_MAX_HOOKS; i++)
  638.             {
  639.             if (distNodeOperationalHook[i] != NULL)
  640.                 (* (distNodeOperationalHook[i])) (nodeId);
  641.             }
  642.         pNode->nodeState = newState;
  643.         return;
  644.         }
  645.     if (curState == DIST_NODE_STATE_OPERATIONAL &&
  646.         newState < DIST_NODE_STATE_OPERATIONAL)
  647.         {
  648.         distNodeNumNodesOperational--;
  649.         if (newState == DIST_NODE_STATE_CRASHED)
  650.             {
  651.             for (i = 0; i < DIST_NODE_MAX_HOOKS; i++)
  652.                 {
  653.                 if (distNodeCrashedHook[i] != NULL)
  654.                     (* (distNodeCrashedHook[i])) (nodeId);
  655.                 }
  656.             }
  657.         pNode->nodeState = newState;
  658.         return;
  659.         }
  660.     pNode->nodeState = newState;
  661.     }
  662. /***************************************************************************
  663. *
  664. * distNodeOperational - sets a node to be alive (VxFusion option)
  665. *
  666. * This routine changes the state of the node with the id <distNodeId>
  667. * to OPERATIONAL.
  668. *
  669. * This routine takes <distNodeDbLock>.
  670. *
  671. * AVAILABILITY
  672. * This routine is distributed as a component of the unbundled distributed
  673. * message queues option, VxFusion.
  674. *
  675. * RETURNS: Pointer to node, or NULL.
  676. * NOMANUAL
  677. */
  678. DIST_NODE_DB_NODE * distNodeOperational
  679.     (
  680.     DIST_NODE_ID   distNodeId  /* node ID */
  681.     )
  682.     {
  683.     DIST_NODE_DB_NODE * pNodeFound;
  684.     int                 i;
  685.     DIST_NODE_DB_LOCK;
  686.      
  687.     if ((pNodeFound = distNodeFindById (distNodeId)) != NULL)
  688.         {
  689.         /*
  690.          * This node is already in the node database. If state is already
  691.          * OPERATIONAL, do nothing - we even have not recognized the absence.
  692.          */
  693.         if (! DIST_NODE_IS_ALIVE (pNodeFound))
  694.             distNodeNumNodesAlive++;
  695.         if (pNodeFound->nodeState < DIST_NODE_STATE_OPERATIONAL)
  696.             {
  697. #ifdef DIST_DIAGNOSTIC
  698.         distLog ("distNodeOperational: Set node 0x%lx to state OPERATIONALn",
  699.                 distNodeId);
  700. #endif
  701.             distNodeNumNodesOperational++;
  702.             for (i = 0; i < DIST_NODE_MAX_HOOKS; i++)
  703.                 {
  704.                 if (distNodeOperationalHook[i] != NULL)
  705.                     (* (distNodeOperationalHook[i])) (distNodeId);
  706.                 }
  707.             }
  708.         pNodeFound->nodeState = DIST_NODE_STATE_OPERATIONAL;
  709.         }
  710.     
  711.     DIST_NODE_DB_UNLOCK;
  712.     
  713.     return (pNodeFound);
  714.     }
  715. /***************************************************************************
  716. *
  717. * distNodeCrashed - sets a node to be dead (VxFusion option)
  718. *
  719. * This routine changes the state of the node with the id <distNodeId>
  720. * to CRASHED.
  721. *
  722. * This routine takes <distNodeDbLock>.
  723. *
  724. * AVAILABILITY
  725. * This routine is distributed as a component of the unbundled distributed
  726. * message queues option, VxFusion.
  727. *
  728. * RETURNS: Pointer to node, or NULL.
  729. * NOMANUAL
  730. */
  731. DIST_NODE_DB_NODE * distNodeCrashed
  732.     (
  733.     DIST_NODE_ID    distNodeId  /* node ID */
  734.     )
  735.     {
  736.     DIST_NODE_DB_NODE * pNodeFound;
  737.     int                 i;
  738.     /* you cannot kill the broadcast node */
  739.     
  740.     if (distNodeId == DIST_IF_BROADCAST_ADDR)
  741.         return (NULL);
  742.     
  743.     DIST_NODE_DB_LOCK;
  744.     /* find the node */
  745.     if ((pNodeFound = distNodeFindById (distNodeId)) != NULL)
  746.         {
  747.         if (DIST_NODE_IS_ALIVE (pNodeFound))
  748.             distNodeNumNodesAlive--;
  749.         if (pNodeFound->nodeState >= DIST_NODE_STATE_OPERATIONAL)
  750.             {
  751. #ifdef DIST_DIAGNOSTIC
  752.         distLog ("distNodeCrashed: Set node 0x%lx to state CRASHEDn",
  753.                 distNodeId);
  754. #endif
  755.             distNodeNumNodesOperational--;
  756.             for (i = 0; i < DIST_NODE_MAX_HOOKS; i++)
  757.                 {
  758.                 if (distNodeCrashedHook[i] != NULL)
  759.                     (* (distNodeCrashedHook[i])) (distNodeId);
  760.                 }
  761.             }
  762.         pNodeFound->nodeState = DIST_NODE_STATE_CRASHED;
  763.         }
  764.     
  765.     DIST_NODE_DB_UNLOCK;
  766.     
  767.     return (pNodeFound);
  768.     }
  769. /***************************************************************************
  770. *
  771. * distNodeLocalGetId - returns the local node id (VxFusion option)
  772. *
  773. * This routine returns the id of the local node.
  774. *
  775. * AVAILABILITY
  776. * This routine is distributed as a component of the unbundled distributed
  777. * message queues option, VxFusion.
  778. *
  779. * RETURNS: Local node ID.
  780. * NOMANUAL
  781. */
  782. DIST_NODE_ID distNodeLocalGetId (void)
  783.     {
  784.     return (distNodeLocalId);
  785.     }
  786. /***************************************************************************
  787. *
  788. * distNodeLocalSetId - sets the local node id (VxFusion option)
  789. *
  790. * This routine sets the id of the local node.
  791. *
  792. * AVAILABILITY
  793. * This routine is distributed as a component of the unbundled distributed
  794. * message queues option, VxFusion.
  795. *
  796. * RETURNS: N/A
  797. * NOMANUAL
  798. */
  799. void distNodeLocalSetId
  800.     (
  801.     DIST_NODE_ID    myNodeId    /* node ID */
  802.     )
  803.     {
  804.     distNodeLocalId = myNodeId;
  805.     }
  806. /***************************************************************************
  807. *
  808. * distNodeLocalSetState - sets the state of the local node (VxFusion option)
  809. *
  810. * This routine sets the state of the local node.
  811. *
  812. * AVAILABILITY
  813. * This routine is distributed as a component of the unbundled distributed
  814. * message queues option, VxFusion.
  815. *
  816. * RETURNS: N/A
  817. * NOMANUAL
  818. */
  819. void distNodeLocalSetState
  820.     (
  821.     int    state    /* new state for local node */
  822.     )
  823.     {
  824.     distNodeLocalState = state;
  825.     }
  826. /***************************************************************************
  827. *
  828. * distNodeLocalGetState - get state of the local node (VxFusion option)
  829. *
  830. * This routine returns the state of the local node.
  831. *
  832. * AVAILABILITY
  833. * This routine is distributed as a component of the unbundled distributed
  834. * message queues option, VxFusion.
  835. *
  836. * RETURNS: Node state.
  837. * NOMANUAL
  838. */
  839. int distNodeLocalGetState (void)
  840.     {
  841.     return (distNodeLocalState);
  842.     }
  843. /***************************************************************************
  844. *
  845. * distNodeGetNumNodes - returns the number of nodes in a specified state (VxFusion option)
  846. *
  847. * This routine returns the number of nodes currently in a specified state.
  848. *
  849. * AVAILABILITY
  850. * This routine is distributed as a component of the unbundled distributed
  851. * message queues option, VxFusion.
  852. *
  853. * RETURNS: Count of nodes, or -1 if <typeSet> is invalid.
  854. * NOMANUAL
  855. */
  856. int distNodeGetNumNodes
  857.     (
  858.     int    typeSet   /* state to look for */
  859.     )
  860.     {
  861.     switch (typeSet)
  862.         {
  863.         case DIST_NODE_NUM_NODES_ALL:
  864.             return (distNodeNumNodesAll);
  865.         case DIST_NODE_NUM_NODES_ALIVE:
  866.             return (distNodeNumNodesAlive);
  867.         case DIST_NODE_NUM_NODES_OPERATIONAL:
  868.             return (distNodeNumNodesOperational);
  869.         default:
  870.             return (-1);
  871.         }
  872.     }
  873. /***************************************************************************
  874. *
  875. * distNodeGetGodfatherId - get the id of our godfather (VxFusion option)
  876. *
  877. * This routine returns the id of this node's godfather.
  878. * If no godfather is available, distNodeGetGodfatherId() returns ERROR,
  879. * else OK.
  880. *
  881. * AVAILABILITY
  882. * This routine is distributed as a component of the unbundled distributed
  883. * message queues option, VxFusion.
  884. *
  885. * RETURNS: OK, if godfather exists.
  886. * NOMANUAL
  887. */
  888. STATUS distNodeGetGodfatherId
  889.     (
  890.     DIST_NODE_ID * nodeId  /* where to return godfather ID */
  891.     )
  892.     {
  893.     if ((*nodeId = distNodeGodfatherId) == DIST_IF_BROADCAST_ADDR)
  894.         return (ERROR);
  895.     return (OK);
  896.     }
  897. #ifdef UNDEFINED
  898. /* This routine does not seem to be used */
  899. /***************************************************************************
  900. *
  901. * distNodeTouch - touch a node (increment message counter) (VxFusion option)
  902. *
  903. * This routine touches a node.
  904. *
  905. * NOTE: Must take <distNodeDbLock> before.
  906. *
  907. * AVAILABILITY
  908. * This routine is distributed as a component of the unbundled distributed
  909. * message queues option, VxFusion.
  910. *
  911. * RETURNS: OK, unless <distNodeId> is invalid.
  912. * NOMANUAL
  913. */
  914. STATUS distNodeTouch
  915.     (
  916.     DIST_NODE_ID    distNodeId   /* ID of node to touch */
  917.     )
  918.     {
  919.     DIST_NODE_DB_NODE * pDistNodeFound;
  920.     if ((pDistNodeFound = distNodeFindById (distNodeId)) == NULL)
  921.         {
  922.         /*  Node does not exist. Should never happen. */
  923.         distStat.nodeDBNoMatch++;
  924.         return (ERROR);
  925.         }
  926.     DIST_NODE_IN_PKT_INC(pDistNodeFound);
  927.     return (OK);
  928.     }
  929. #endif
  930. /***************************************************************************
  931. *
  932. * distNodeEach - call a routine for each node in the database (VxFusion option)
  933. *
  934. * This routine calls a user-supplied routine once for each node.
  935. * The user-supplied routine should return TRUE if distNodeEach() is to
  936. * continue calling it with the remaining nodes, or FALSE if it is done and
  937. * distNodeEach() can exit.
  938. *
  939. * distNodeEach() returns NULL if traversed whole database, or pointer to
  940. * node entry that distNodeEach() ended with.
  941. *
  942. * NOTE: Takes <distNodeDbLock>.
  943. *
  944. * AVAILABILITY
  945. * This routine is distributed as a component of the unbundled distributed
  946. * message queues option, VxFusion.
  947. *
  948. * RETURNS: Pointer to last node visited.
  949. * NOMANUAL
  950. */
  951. DIST_NODE_DB_NODE * distNodeEach
  952.     (
  953.     FUNCPTR    routine,      /* function to call at each node */
  954.     int        routineArg    /* argument to function */
  955.     )
  956.     {
  957.     DIST_NODE_DB_NODE * lastNode;
  958.     distNodeDbLock ();
  959.     lastNode = (DIST_NODE_DB_NODE *) hashTblEach (distNodeDbId,
  960.                                                   routine,
  961.                                                   routineArg);
  962.     distNodeDbUnlock ();
  963.     return (lastNode);
  964.     }
  965. /***************************************************************************
  966. *
  967. * distNodeCleanup - Flush out all communication queues (VxFusion option)
  968. *
  969. * This routine flushes all communication queues of <pNode>.  It is only
  970. * called from two places in distNodeReassemble() which takes the node
  971. * DB lock first.
  972. *
  973. * AVAILABILITY
  974. * This routine is distributed as a component of the unbundled distributed
  975. * message queues option, VxFusion.
  976. *
  977. * RETURNS: N/A
  978. * NOMANUAL
  979. */
  980. LOCAL void distNodeCleanup
  981.     (
  982.     DIST_NODE_DB_NODE * pNode    /* node to clean-up */
  983.     )
  984.     {
  985.     DIST_TBUF_HDR *  pPkt;
  986.     DIST_NODE_COMM * pCommUnicast, *pCommBroadcast;
  987.     pCommUnicast = &pNode->nodeUnicast;
  988.     pCommBroadcast = &pNode->nodeBroadcast;
  989.     /*
  990.      * Clean up communication structures of bootstrapping
  991.      * node.
  992.      *
  993.      * Delete all unicast packets waiting for reassemly.
  994.      */
  995.     while (pCommUnicast->pCommQReass)
  996.         {
  997.         DIST_TBUF_HDR_DEQUEUE (pCommUnicast->pCommQReass, pPkt);
  998.         DIST_TBUF_FREEM (pPkt);
  999.         }
  1000.     /*
  1001.      * Dequeue all unicast packets waiting for a slide in
  1002.      * the window. Wakeup sender.
  1003.      */
  1004.     while (pCommUnicast->pCommQWinOut)
  1005.         {
  1006.         DIST_TBUF_HDR_DEQUEUE (pCommUnicast->pCommQWinOut, pPkt);
  1007.         pPkt->pTBufHdrTm->tmStatus = DIST_TM_STATUS_UNREACH;
  1008.         semGive (&pPkt->pTBufHdrTm->tmWait4);
  1009.         /* distNodePktSend() frees packet */
  1010.         }
  1011.     /*
  1012.      * Dequeue all unicast packets waiting for an ACK.
  1013.      * Wakeup sender.
  1014.      */
  1015.     while (pCommUnicast->pCommQAck)
  1016.         {
  1017.         DIST_TBUF_HDR_DEQUEUE (pCommUnicast->pCommQAck, pPkt);
  1018.         pPkt->pTBufHdrTm->tmStatus = DIST_TM_STATUS_UNREACH;
  1019.         semGive (&pPkt->pTBufHdrTm->tmWait4);
  1020.         /* distNodePktSend() frees packet */
  1021.         }
  1022.     
  1023.     bzero ((char *) pCommUnicast->pCommCompleted,
  1024.             DIST_NODE_BFLD_SIZEOF (DIST_IF_RNG_BUF_SZ));
  1025.     pCommUnicast->commPktNextExpect = 0;
  1026.     pCommUnicast->commAckNextExpect = 0;
  1027.     pCommUnicast->commPktNextSend = 0;
  1028.     pCommUnicast->commAckDelayed = FALSE;
  1029.     /* Delete all broadcast packets waiting for reassembly. */
  1030.     while (pCommBroadcast->pCommQReass)
  1031.         {
  1032.         DIST_TBUF_HDR_DEQUEUE (pCommBroadcast->pCommQReass, pPkt);
  1033.         DIST_TBUF_FREEM (pPkt);
  1034.         }
  1035.     
  1036.     bzero ((char *) pCommBroadcast->pCommCompleted,
  1037.             DIST_NODE_BFLD_SIZEOF (DIST_IF_RNG_BUF_SZ));
  1038.     }
  1039. /***************************************************************************
  1040. *
  1041. * distNodeGetReassembled - get a reassembled packet (VxFusion option)
  1042. *
  1043. * This routine gets a reassembled data packet.  It takes the node DB lock.
  1044. *
  1045. * AVAILABILITY
  1046. * This routine is distributed as a component of the unbundled distributed
  1047. * message queues option, VxFusion.
  1048. *
  1049. * RETURNS: Ptr to tbuf header.
  1050. * NOMANUAL
  1051. */
  1052. DIST_TBUF_HDR * distNodeGetReassembled
  1053.     (
  1054.     DIST_NODE_COMM * pComm  /* ptr to comm node */
  1055.     )
  1056.     {
  1057.     DIST_TBUF_HDR *  pHead;         /* head of tbufs */
  1058.     short            nextExpected;  /* next expected seq number */
  1059.     DIST_TBUF_HDR *  pNext;         /* next tbuf */
  1060.     distNodeDbLock ();    
  1061.     pHead = pComm->pCommQNextDeliver;
  1062.     nextExpected = pComm->commPktNextExpect;
  1063.     if (pHead && pHead->tBufHdrId == nextExpected &&
  1064.             DIST_NODE_BTST (pComm->pCommCompleted, nextExpected))
  1065.         {
  1066.         /* Remove the packet from the reassembly queue. */
  1067.         DIST_TBUF_HDR_UNLINK (pComm->pCommQReass, pHead);
  1068.         if (nextExpected == (DIST_IF_RNG_BUF_SZ - 1))
  1069.             pNext = pComm->pCommQReass;
  1070.         else
  1071.             pNext = pHead->pTBufHdrNext;
  1072.         if (pNext && winAdd (nextExpected, 1) != pNext->tBufHdrId)
  1073.             {
  1074.             /* if a complete packet is missing, we fall in here */
  1075.             pComm->pCommQNextDeliver = NULL;
  1076.             }
  1077.         else
  1078.             {
  1079.             /* next packet in ring is the one, we expect next */
  1080.             
  1081.             pComm->pCommQNextDeliver = pNext;
  1082.             }
  1083.         distNodeDbUnlock ();            /* UNLOCK */
  1084. #ifdef DIST_NODE_REPORT
  1085.         /* this is a hack */
  1086.         printf ("distNodeGetReassembled: return %p (type %d/%d)n", pHead,
  1087.                 *((char *) (DIST_TBUF_GET_NEXT (pHead)->pTBufData)),
  1088.                 *((char *) (DIST_TBUF_GET_NEXT (pHead)->pTBufData) + 1));
  1089. #endif
  1090.         return (pHead);
  1091.         }
  1092.     distNodeDbUnlock ();                /* UNLOCK */
  1093. #ifdef DIST_NODE_REPORT
  1094.     printf ("distNodeGetReassembled: no packet availablen");
  1095. #endif
  1096.     return (NULL);
  1097.     }
  1098. /***************************************************************************
  1099. *
  1100. * distNodeReassemble - reassemble a packet (VxFusion option)
  1101. *
  1102. * This routine takes a telegram and tries to reassemble it with other
  1103. * fragments already stored in the reassembly list.
  1104. * distNodeReassemble() updates the communication windows managed
  1105. * within the database.
  1106. *
  1107. * If <nodeIdSrc> is unknown by now, distNodeReassemble() creates a
  1108. * new node in the node database. Creating a node is only allowed when
  1109. * receiving a XACK telegram in bootstrap mode or a BOOTSTRAP telegram.
  1110. *
  1111. * If a packet has been reassembled successfully, distNodeReassemble()
  1112. * returns a pointer to the communication structure. Else a NULL pointer
  1113. * is returned.
  1114. *
  1115. * NOTE: Takes <distNodeDbLock>.
  1116. *
  1117. * AVAILABILITY
  1118. * This routine is distributed as a component of the unbundled distributed
  1119. * message queues option, VxFusion.
  1120. *
  1121. * RETURNS: Ptr to comm node.
  1122. * NOMANUAL
  1123. */
  1124. DIST_NODE_COMM * distNodeReassemble
  1125.     (
  1126.     DIST_NODE_ID nodeIdSrc,    /* source node */
  1127.     int          prio,         /* priority */
  1128.     DIST_TBUF *  pTBufNew      /* ptr to tbuf */
  1129.     )
  1130.     {
  1131.     DIST_NODE_DB_NODE *   pNode;
  1132.     DIST_NODE_DB_NODE *   pNodeSrc;
  1133.     DIST_NODE_COMM *      pCommIn;
  1134.     DIST_NODE_COMM *      pCommOut;
  1135.     DIST_TBUF_HDR *       pReassHdr;
  1136.     DIST_TBUF_HDR *       pReassHdrPrev;
  1137.     DIST_TBUF_HDR *       pTBufAck;
  1138.     DIST_TBUF_HDR *       pTBufAckNext;
  1139.     DIST_TBUF_HDR *       pTBufHdrNew;
  1140.     short                 winLo, winHi, pktNext;
  1141.     short                 id;
  1142.     short                 ack;
  1143.     short                 seq;
  1144.     short                 nBytes;
  1145.     short                 type;
  1146.     int                   hasMf;
  1147.     /* Get information from telegram */
  1148.     
  1149.     id = pTBufNew->tBufId;
  1150.     ack = pTBufNew->tBufAck;
  1151.     seq = pTBufNew->tBufSeq;
  1152.     type = pTBufNew->tBufType;
  1153.     nBytes = pTBufNew->tBufNBytes;
  1154.     hasMf = DIST_TBUF_HAS_MF (pTBufNew);
  1155.     distNodeDbLock ();
  1156.     /* Find node in hash table. */
  1157.     
  1158.     pNodeSrc = distNodeFindById (nodeIdSrc);
  1159.     
  1160.     if (pNodeSrc == NULL)
  1161.         {
  1162.         if (nodeIdSrc == distNodeLocalId)
  1163.             distPanic ("distNodePktReass: someone has same node id we haven");
  1164.         /*
  1165.          * Node is unknown by now. Create new node, if this is a
  1166.          * XACK in bootstrapping mode or this is a BOOTSTRAP telegram.
  1167.          * Else ignore telegram.
  1168.          */
  1169.         if (distNodeLocalState == DIST_NODE_STATE_BOOT
  1170.                 && type == DIST_TBUF_TTYPE_BACK
  1171.                 && nBytes == sizeof (DIST_XACK))
  1172.             {
  1173.             DIST_XACK    *pXAck = (DIST_XACK *) pTBufNew->pTBufData;
  1174.             int            state;
  1175.             state = ntohs (pXAck->xAckPktNodeState);
  1176.             if ((pNodeSrc = distNodeCreate (nodeIdSrc, state)) == NULL)
  1177.                 {
  1178.                 /*
  1179.                  * We had problems creating the new node.
  1180.                  * This is a fatal error, since we will miss that one.
  1181.                  */
  1182.                 distStat.nodeDBFatal++;
  1183.                 DIST_TBUF_FREE (pTBufNew);
  1184.                 distPanic ("distNodePktReass: error accepting node 0x%lxn",
  1185.                         nodeIdSrc);
  1186.                 }
  1187.             }
  1188.         else if (type == DIST_TBUF_TTYPE_BOOTSTRAP)
  1189.             {
  1190.             int state = DIST_NODE_STATE_BOOT;
  1191. #ifdef DIST_NODE_REPORT
  1192.             printf ("distNodeReassemble: got bootstrap pkt from node 0x%lxn",
  1193.                     nodeIdSrc);
  1194. #endif
  1195.             if ((pNodeSrc = distNodeCreate (nodeIdSrc, state)) == NULL)
  1196.                 {
  1197.                 /*
  1198.                  * We had problems creating the new node.
  1199.                  * This is a fatal error, since we will miss that one.
  1200.                  */
  1201.                 distStat.nodeDBFatal++;
  1202.                 DIST_TBUF_FREE (pTBufNew);
  1203.                 distPanic ("distNodePktReass: error accepting node 0x%lxn",
  1204.                         nodeIdSrc);
  1205.                 }
  1206.             /*
  1207.              * If this is a BOOTSTRAP telegram, we have to clean up
  1208.              * the communication structures.
  1209.              */
  1210.             distNodeCleanup (pNodeSrc);
  1211.             pNodeSrc->nodeBroadcast.commPktNextExpect = (INT16)winAdd (id, 1);
  1212.             }
  1213.         else
  1214.             {
  1215.             /*
  1216.              * Packets from unknown nodes are ignored, unless they
  1217.              * are of type XACK or BOOTSTRAP.
  1218.              */
  1219. #ifdef DIST_DIAGNOSTIC
  1220.             distLog ("distNodePktReass: packet from unknown node 0x%lx 
  1221.                      --ignoredn", nodeIdSrc);
  1222. #endif
  1223.             DIST_TBUF_FREE (pTBufNew);
  1224.             distNodeDbUnlock();
  1225.             return (NULL);
  1226.             }
  1227. #ifdef DIST_DIAGNOSTIC
  1228.         distLog ("distNodePktReass: new node 0x%lx created (state %s)n",
  1229.                  nodeIdSrc,
  1230.                  distNodeStateToName (distNodeGetState (pNodeSrc)));
  1231. #endif
  1232.         }
  1233.     if (DIST_TBUF_IS_BROADCAST (pTBufNew))
  1234.         {
  1235.         if ((pNode = distNodeFindById (DIST_IF_BROADCAST_ADDR)) == NULL)
  1236.             {
  1237.             distStat.nodeDBNoMatch++;
  1238.             DIST_TBUF_FREE (pTBufNew);
  1239.             distNodeDbUnlock();
  1240.             return (NULL);
  1241.             }
  1242.         pCommIn = &pNodeSrc->nodeBroadcast;
  1243.         pCommOut = &pNode->nodeBroadcast;
  1244.         }
  1245.     else
  1246.         pCommIn = pCommOut = &pNodeSrc->nodeUnicast;
  1247. #ifdef DIST_NODE_REPORT
  1248.     {
  1249.     char * typeName;
  1250.     switch (type)
  1251.         {
  1252.         case DIST_TBUF_TTYPE_DTA:
  1253.             typeName = "DATA";
  1254.             break;
  1255.         case DIST_TBUF_TTYPE_ACK:
  1256.             typeName = "ACK";
  1257.             break;
  1258.         case DIST_TBUF_TTYPE_BDTA:
  1259.             typeName = "B-DATA";
  1260.             break;
  1261.         case DIST_TBUF_TTYPE_BACK:
  1262.             typeName = "ACK-B";
  1263.             break;
  1264.         case DIST_TBUF_TTYPE_BOOTSTRAP:
  1265.             typeName = "BOOTSTRAP";
  1266.             break;
  1267.         case DIST_TBUF_TTYPE_NACK:
  1268.             typeName = "NACK";
  1269.             break;
  1270.         default:
  1271.             typeName = "unknown";
  1272.         }
  1273.     printf ("dist..Reass:0x%lx/%s: ", nodeIdSrc, typeName);
  1274.     printf ("id %d, ack %d, seq %d, type %d, len %d, hasMf %dn",
  1275.             id, ack, seq, pTBufNew->tBufType, nBytes, hasMf);
  1276.     }
  1277. #endif
  1278.     if (type == DIST_TBUF_TTYPE_BOOTSTRAP)
  1279.         {
  1280.         int    bootTp;
  1281.         if (distNodeGetState (pNodeSrc) != DIST_NODE_STATE_BOOT)
  1282.             {
  1283.             /*
  1284.              * If we receive a BOOTSTARP telegram from a node, that is
  1285.              * not in booting mode, it must be rebooting. Cleanup
  1286.              * the structure.
  1287.              */
  1288.             distNodeCleanup (pNodeSrc);
  1289.             }
  1290.         pNodeSrc->nodeBroadcast.commPktNextExpect = (INT16) winAdd (id, 1);
  1291.         bootTp = ((DIST_PKT_BOOT *) pTBufNew->pTBufData)->pktBootType;
  1292.         switch (bootTp)
  1293.             {
  1294.             case DIST_BOOTING_REQ:
  1295.                 /* Send XACK. */
  1296.                 
  1297.                 distNodeSetState (pNodeSrc, DIST_NODE_STATE_BOOT);
  1298.                 distNodeSendAck (pNodeSrc, 1, DIST_NODE_ACK_EXTENDED);
  1299.                 break;
  1300. #ifdef DIST_DIAGNOSTIC
  1301.             default:
  1302.                 distLog ("dist..Reass: unknown type of BOOTSTRAP telegramn");
  1303. #endif
  1304.             }
  1305.         distNodeDbUnlock ();
  1306.         DIST_TBUF_FREE (pTBufNew);
  1307.         return (NULL);
  1308.         }
  1309.     /* We will fall in here, when we receive a XACK. */
  1310.     
  1311.     if (type == DIST_TBUF_TTYPE_BACK && nBytes == sizeof (DIST_XACK))
  1312.         {
  1313.         DIST_XACK    *pXAck = (DIST_XACK *) pTBufNew->pTBufData;
  1314.         distNodeSetState (pNodeSrc, ntohs (pXAck->xAckPktNodeState));
  1315.         /* In bootstrapping mode. */
  1316.         if (distNodeLocalState == DIST_NODE_STATE_BOOT)
  1317.             {
  1318.             short        pktNextSend;
  1319. #ifdef DIST_NODE_REPORT
  1320.             printf ("dist..Reass: XACK in bootstrap moden");
  1321. #endif
  1322.             /*
  1323.              * The first node that answers to our BOOTSTRAP packet,
  1324.              * will be our godfather.
  1325.              */
  1326.             if (distNodeGodfatherId == DIST_IF_BROADCAST_ADDR)
  1327.                 distNodeGodfatherId = nodeIdSrc;
  1328.             pktNextSend = ntohs (pXAck->xAckPktNextSend);
  1329.             pCommIn->commPktNextExpect = pktNextSend;
  1330.             }
  1331.         distNodeDbUnlock ();
  1332.         DIST_TBUF_FREE (pTBufNew);
  1333.         return (NULL);
  1334.         }
  1335. #ifdef DIST_NODE_REPORT
  1336.     if (! DIST_NODE_IS_ALIVE (pNodeSrc))
  1337.         {
  1338.         printf ("dist..Reass: got a telegram from crashed noden");
  1339.         }
  1340. #endif
  1341.     /*
  1342.      * Process ACK field. An ACK commits all packets with
  1343.      * an id lower or equal to the value of the ACK.
  1344.      *
  1345.      * Traverse the list of non-acknowledged packets and
  1346.      * decrease the ACK counters of all acknowledged packets
  1347.      * in the list. If no ACK is missing for a certain packet,
  1348.      * dequeue it and wakeup the sender.
  1349.      */
  1350.     pktNext = pCommOut->commPktNextSend;
  1351.     if (winWithin (pCommOut->commAckNextExpect, ack, pktNext))
  1352.         {
  1353.         short    ackLo = pCommOut->commAckNextExpect;
  1354.         short    ackHi = (INT16) winAdd (ack, 1);
  1355.         pTBufAck = pCommOut->pCommQAck;
  1356.         pCommIn->commAckLastRecvd = ack;
  1357. #ifdef DIST_NODE_REPORT
  1358.         printf ("dist..Reass: ACK %d, AckQ %p:n", ack, pTBufAck);
  1359. #endif
  1360.         while (pTBufAck)
  1361.             {
  1362.             pTBufAckNext = DIST_TBUF_HDR_GET_NEXT (pTBufAck);
  1363.             if (winWithin (ackLo, pTBufAck->tBufHdrId, ackHi))
  1364.                 {
  1365. #ifdef DIST_NODE_REPORT
  1366.                 printf ("dist..Reass: within window, wait for %d more ACKsn",
  1367.                         pTBufAck->pTBufHdrTm->tmAckCnt - 1);
  1368. #endif
  1369.                 if (--pTBufAck->pTBufHdrTm->tmAckCnt == 0)
  1370.                     {
  1371.                     pCommOut->commAckNextExpect =
  1372.                         (INT16) winAdd (pCommOut->commAckNextExpect, 1);
  1373.                     DIST_TBUF_HDR_UNLINK (pCommOut->pCommQAck, pTBufAck);
  1374.                     pTBufAck->pTBufHdrTm->tmStatus = DIST_TM_STATUS_OK;
  1375. #ifdef DIST_NODE_REPORT
  1376.                     printf ("dist..Reass: %d ACKedn", pTBufAck->tBufHdrId);
  1377. #endif
  1378.                     semGive (&pTBufAck->pTBufHdrTm->tmWait4);
  1379.                     
  1380.                     /*
  1381.                      * distNodePktSend()--which should awake now--frees
  1382.                      * the packet.
  1383.                      */
  1384.                     }
  1385.                 }
  1386.             pTBufAck = pTBufAckNext;
  1387.             }
  1388.         }
  1389.     /*
  1390.      * If this is a NACK telegram, resend the single fragment, the
  1391.      * NACK asks us for. <id> holds the id of the packet, <seq> the
  1392.      * sequence number of the fragment within the packet.
  1393.      * The NACK telegram can be destroyed at this point, since
  1394.      * it contains no data.
  1395.      */
  1396.     if (type == DIST_TBUF_TTYPE_NACK)
  1397.         {
  1398.         DIST_TBUF_FREE (pTBufNew);
  1399. #ifdef DIST_NODE_REPORT
  1400.         printf ("dist..Reass: got NACK for (id %d, seq %d) from node 0x%lxn",
  1401.                 id, seq, nodeIdSrc);
  1402. #endif
  1403.         pTBufAck = pCommOut->pCommQAck;
  1404.         while (pTBufAck)
  1405.             {
  1406.             if (pTBufAck->tBufHdrId == id)
  1407.                 {
  1408.                 DIST_TBUF    *pFrag;
  1409.                 for (pFrag = DIST_TBUF_GET_NEXT (pTBufAck);
  1410.                         pFrag && pFrag->tBufSeq != seq;
  1411.                         pFrag = DIST_TBUF_GET_NEXT (pFrag));
  1412.                 pFrag->tBufAck =
  1413.                       (UINT16)winSub (pCommIn->commPktNextExpect, 1);
  1414. #ifdef DIST_NODE_REPORT
  1415.                 if (DIST_IF_SEND (nodeIdSrc, pFrag, 0) == ERROR)
  1416.                     printf ("dist..Reass: error sending NACK to node 0x%lxn",
  1417.                             nodeIdSrc);
  1418. #else
  1419.                 /* If this fails, there is no need to mess around. */
  1420.                 DIST_IF_SEND (nodeIdSrc, pFrag, 0);
  1421. #endif
  1422.                 distStat.nodeFragResend++;
  1423.                 distNodeDbUnlock ();
  1424.                 return (NULL);
  1425.                 }
  1426.             
  1427.             pTBufAck = DIST_TBUF_HDR_GET_NEXT (pTBufAck);
  1428.             }
  1429.         distNodeDbUnlock ();
  1430.         return (NULL);
  1431.         }
  1432.     /* If this is not a DTA (data) telegram, destroy it. */
  1433.     
  1434.     if (type != DIST_TBUF_TTYPE_DTA && type != DIST_TBUF_TTYPE_BDTA)
  1435.         {
  1436.         distNodeDbUnlock ();
  1437.         DIST_TBUF_FREE (pTBufNew);
  1438.         return (NULL);
  1439.         }
  1440.     /*
  1441.      * Go on processing this telegram?
  1442.      * 1) The packet id must be within the window.
  1443.      *    The window starts with the id of the packet
  1444.      *    expected next and has a size of DIST_NODE_WIN_SZ.
  1445.      * 2) The packet should not be reassembled already.
  1446.      */
  1447.     winLo = pCommIn->commPktNextExpect;
  1448.     winHi = (INT16) winAdd (winLo, DIST_NODE_WIN_SZ);
  1449.     if (! winWithin (winLo, id, winHi) ||
  1450.             DIST_NODE_BTST (pCommIn->pCommCompleted, id))
  1451.         {
  1452.         /* Not within the window. Destroy the telegram. */
  1453.         distNodeDbUnlock ();
  1454. #ifdef DIST_NODE_REPORT
  1455.         if (DIST_NODE_BTST (pCommIn->pCommCompleted, id))
  1456.             printf ("distNodePktReass: packet already reassembledn");
  1457.         else
  1458.             printf ("distNodePktReass: not within window (%d..%d)n",
  1459.                     winLo, winSub (winHi, 1));
  1460. #endif
  1461.         DIST_TBUF_FREE (pTBufNew);
  1462.         return (NULL);
  1463.         }
  1464.     /* Find queue for packet in reassembly list. */
  1465.      
  1466.     pReassHdr = pCommIn->pCommQReass;
  1467.     pReassHdrPrev = NULL;
  1468.     while (pReassHdr)
  1469.         {
  1470.         if (pReassHdr->tBufHdrId > id)
  1471.             break;    /* insert a new packet */
  1472.         if (pReassHdr->tBufHdrId == id)
  1473.             {
  1474.             /* We have found the packet, the new telegram belongs to. */
  1475.             DIST_TBUF    *pTBuf, *pTBufPrev;
  1476.             DIST_TBUF    *pTBufLast = DIST_TBUF_GET_LAST (pReassHdr);
  1477.             short        seqLast = pTBufLast->tBufSeq;
  1478. #ifdef DIST_NODE_REPORT
  1479.             printf ("dist..Reass: packet found, inserting telegramn");
  1480. #endif
  1481.             if (seqLast < seq)
  1482.                 {
  1483.                 /*
  1484.                  * This telegram has the highest sequence number ever
  1485.                  * received. Put it to the tail of the TBuf list.
  1486.                  */
  1487.                 DIST_TBUF_ENQUEUE (pReassHdr, pTBufNew);
  1488.                 pReassHdr->tBufHdrNLeaks += seq - seqLast - 1;
  1489. #ifdef UNDEFINED  /* unused, but be careful before excising for good */
  1490.                 pTBufLast = pTBufNew;
  1491. #endif
  1492.                 if (! DIST_TBUF_IS_BROADCAST (pTBufNew)
  1493.                     && (seq - seqLast) > 1)
  1494.                     {
  1495.                     INT16 i;
  1496.                     /*
  1497.                      * unlock database--not strictly necessary, but
  1498.                      * we are consuming time, holding the lock...
  1499.                      */
  1500.                     for (i = seqLast + 1; i < seq; i++)
  1501.                         distNodeSendNegAck (pNodeSrc, id, i);
  1502.                     }
  1503.                 }
  1504.             else
  1505.                 {
  1506.                 /*
  1507.                  * This telegram must be inserted somewhere in the TBuf list.
  1508.                  */
  1509.                 pTBufPrev = (DIST_TBUF *) pReassHdr;
  1510.                 pTBuf = DIST_TBUF_GET_NEXT (pReassHdr);
  1511.                 while (pTBuf != NULL)
  1512.                     {
  1513.                     if (pTBuf->tBufSeq == seq)
  1514.                         {
  1515.                         /* We have already received this one. */
  1516.                         
  1517.                         distNodeDbUnlock ();
  1518.                         DIST_TBUF_FREE (pTBufNew);
  1519.                         return (NULL);
  1520.                         }
  1521.     
  1522.                     if (pTBuf->tBufSeq > seq)
  1523.                         {
  1524.                         DIST_TBUF_INSERT_AFTER (pReassHdr,
  1525.                                                 pTBufPrev,
  1526.                                                 pTBufNew);
  1527.                         pReassHdr->tBufHdrNLeaks--;
  1528.                         break;
  1529.                         }
  1530.                     pTBufPrev = pTBuf;
  1531.                     pTBuf = DIST_TBUF_GET_NEXT (pTBuf);
  1532.                     }
  1533.                 }
  1534.             /* The telegram is in the reassembly list now. */
  1535.             pReassHdr->tBufHdrOverall += nBytes;
  1536.             /*
  1537.              * Test if the packet is fully reassembled (there
  1538.              * are no leaks within the packet and the last
  1539.              * fragment has the 'more fragments' bit cleared).
  1540.              */
  1541.             pTBufLast = DIST_TBUF_GET_LAST (pReassHdr);
  1542.             if (!pReassHdr->tBufHdrNLeaks && !(DIST_TBUF_HAS_MF (pTBufLast)))
  1543.                 {
  1544.                 /*
  1545.                  * We have received a complete packet and
  1546.                  * do not expect to receive more fragments.
  1547.                  */
  1548. #ifdef DIST_NODE_REPORT
  1549.                 {
  1550.                 DIST_TBUF    *pFirst = DIST_TBUF_GET_NEXT (pReassHdr);
  1551.                 
  1552.                 /* this is a hack */
  1553.                 
  1554.                 printf ("dist..Reass: packet %d complete (type %d/%d)n", id,
  1555.                         *((char *) (pFirst->pTBufData)),
  1556.                         *((char *) (pFirst->pTBufData) + 1));
  1557.                 }
  1558. #endif
  1559.                 DIST_NODE_BSET (pCommIn->pCommCompleted, id);
  1560.                 /*
  1561.                  * Test if the reassembled packet is the one
  1562.                  * we expect next.
  1563.                  */
  1564.                 if (pReassHdr->tBufHdrId == pCommIn->commPktNextExpect)
  1565.                     {
  1566.                     /*
  1567.                      * This is the packet, we expect next.
  1568.                      * Return communication pointer to caller,
  1569.                      * so that it knows about the job.
  1570.                      */
  1571.                     distNodeDbUnlock ();
  1572.                     return (pCommIn);
  1573.                     }
  1574.                 }
  1575.             distNodeDbUnlock ();
  1576.             return (NULL);
  1577.             }
  1578.         pReassHdrPrev = pReassHdr;
  1579.         pReassHdr = DIST_TBUF_HDR_GET_NEXT (pReassHdr);
  1580.         }
  1581.     
  1582.     /*
  1583.      * This is the first fragment of a new packet. There are either
  1584.      * more fragments, or this packet is not the one, we are waiting
  1585.      * for.
  1586.      */
  1587.      
  1588. #ifdef DIST_NODE_REPORT
  1589.     printf ("dist..Reass: first fragment of new packetn");
  1590. #endif
  1591.     if ((pTBufHdrNew = DIST_TBUF_HDR_ALLOC ()) == NULL)
  1592.         {
  1593.         distStat.netInDiscarded++; /* nodeInDiscarded counts pkts discarded */
  1594.         distNodeDbUnlock ();
  1595.         DIST_TBUF_FREE (pTBufNew);
  1596.         return (NULL);
  1597.         }
  1598.     pTBufHdrNew->tBufHdrSrc = nodeIdSrc;
  1599.     pTBufHdrNew->tBufHdrId = id;
  1600.     pTBufHdrNew->tBufHdrPrio = prio;
  1601.     pTBufHdrNew->tBufHdrOverall = nBytes;
  1602.     pTBufHdrNew->tBufHdrNLeaks = seq;
  1603.     pTBufHdrNew->tBufHdrTimo = WAIT_FOREVER;    /* reassembly timeout */
  1604.     DIST_TBUF_SET_NEXT (pTBufHdrNew, pTBufNew);
  1605.     DIST_TBUF_SET_LAST (pTBufHdrNew, pTBufNew);
  1606.     DIST_TBUF_HDR_INSERT_AFTER (pCommIn->pCommQReass, pReassHdrPrev,
  1607.         pTBufHdrNew);
  1608.     if (pCommIn->commPktNextExpect == id)
  1609.         /* this is the packet we want to deliver next */
  1610.         pCommIn->pCommQNextDeliver = pTBufHdrNew;
  1611.     if (seq == 0 && ! hasMf)
  1612.         {
  1613.         /* We have received a complete packet in a single telegram. */
  1614. #ifdef DIST_NODE_REPORT
  1615.         /* this is a hack */
  1616.         
  1617.         printf ("dist..Reass: packet %d complete (type %d/%d)n", id,
  1618.                 *((char *) (pTBufNew->pTBufData)),
  1619.                 *((char *) (pTBufNew->pTBufData) + 1));
  1620. #endif
  1621.         DIST_NODE_BSET (pCommIn->pCommCompleted, id);
  1622.         if (id == pCommIn->commPktNextExpect)
  1623.             {
  1624.             /*
  1625.              * This is the packet, we expect next.
  1626.              * Return communication pointer to caller,
  1627.              * so that it knows about the job.
  1628.              */
  1629.             distNodeDbUnlock ();
  1630.             return (pCommIn);
  1631.             }
  1632.         }
  1633.     distNodeDbUnlock ();
  1634.     /* Check if we have to send negative acknowledges (NACKs). */
  1635.     
  1636.     if (! DIST_TBUF_IS_BROADCAST (pTBufNew))
  1637.         {
  1638.         DIST_TBUF_HDR    *pPrev;
  1639.         DIST_TBUF        *pLastOfPrev;
  1640.         /*
  1641.          * Check if we have received the last fragment of the previous
  1642.          * packet. We cannot find out how many fragments are missing at
  1643.          * the end of the packet. So just send a NACK for the successor
  1644.          * of the last telegram received.
  1645.          */
  1646.         if ((pPrev = DIST_TBUF_HDR_GET_PREV (pTBufHdrNew)) != NULL &&
  1647.                 (pLastOfPrev = DIST_TBUF_GET_LAST (pTBufHdrNew)) != NULL &&
  1648.                 DIST_TBUF_HAS_MF (pLastOfPrev))
  1649.             {
  1650.             distNodeSendNegAck (pNodeSrc, pPrev->tBufHdrId,
  1651.                     pLastOfPrev->tBufSeq + 1);
  1652.             }
  1653.         /*
  1654.          * Check if some telegrams from the beginning of the new packet
  1655.          * are missing. Send a NACK for each of them.
  1656.          */
  1657.         if (seq > 0)
  1658.             {
  1659.             INT16 i;
  1660.             for (i = 0; i < seq; i++)
  1661.                 distNodeSendNegAck (pNodeSrc, id, i);
  1662.             }
  1663.         }
  1664.     return (NULL);
  1665.     }
  1666. /***************************************************************************
  1667. *
  1668. * distNodePktSend - send a packet already stored in TBufs (VxFusion option)
  1669. *
  1670. * This routine waits for an open send window and transmits the packet,
  1671. * by sending the data of each single TBuf.
  1672. * Afterwards it blocks for an ACK from the remote node.
  1673. *
  1674. * NOTE: Takes <distNodeDbLock>.
  1675. *
  1676. * AVAILABILITY
  1677. * This routine is distributed as a component of the unbundled distributed
  1678. * message queues option, VxFusion.
  1679. *
  1680. * RETURNS: OK, if successful.
  1681. * NOMANUAL
  1682. */
  1683. STATUS distNodePktSend
  1684.     (
  1685.     DIST_TBUF_HDR * pTBufHdr    /* message to send */
  1686.     )
  1687.     {
  1688.     DIST_NODE_DB_NODE *  pDistNodeFound;
  1689.     DIST_TRANSMISSION    distTm;
  1690.     DIST_NODE_COMM *     pComm;
  1691.     DIST_NODE_ID         nodeIdDest = pTBufHdr->tBufHdrDest;
  1692.     DIST_TBUF *          pTBuf;
  1693.     short                winLo, winHi;
  1694.     short                pktId;
  1695.     int                  priority;
  1696.     int                  wait4NumAcks;
  1697.     distNodeDbLock ();
  1698.     distStat.nodeOutReceived++;
  1699.     if ((pDistNodeFound = distNodeFindById (nodeIdDest)) == NULL)
  1700.         {
  1701. #ifdef DIST_DIAGNOSTIC
  1702.         distLog ("distNodePktSend: unknown node 0x%lxn", nodeIdDest);
  1703. #endif
  1704.         distStat.nodeDBNoMatch++;
  1705.         distNodeDbUnlock ();
  1706.         DIST_TBUF_FREEM (pTBufHdr);
  1707.         return (ERROR);
  1708.         }
  1709.     if (! DIST_NODE_IS_ALIVE (pDistNodeFound))
  1710.         {
  1711. #ifdef DIST_DIAGNOSTIC
  1712.         distLog ("distNodePktSend: destination not aliven");
  1713. #endif
  1714.         distStat.nodeNotAlive++;
  1715.         distNodeDbUnlock ();
  1716.         DIST_TBUF_FREEM (pTBufHdr);
  1717.         return (ERROR);
  1718.         }
  1719. #ifdef DIST_DIAGNOSTIC
  1720.     if (DIST_TBUF_GET_NEXT (pTBufHdr) == NULL)
  1721.         {
  1722.         distLog ("distNodePktSend: TBuf chain contains no datan");
  1723.         }
  1724. #endif
  1725.     if (nodeIdDest == DIST_IF_BROADCAST_ADDR)
  1726.         {
  1727.         pComm = &pDistNodeFound->nodeBroadcast;
  1728.         wait4NumAcks = distNodeNumNodesAlive - 1;
  1729.         }
  1730.     else
  1731.         {
  1732.         pComm = &pDistNodeFound->nodeUnicast;
  1733.         wait4NumAcks = 1;
  1734.         }
  1735.     semBInit (&distTm.tmWait4, SEM_Q_FIFO, SEM_EMPTY);
  1736.     distTm.tmStatus = DIST_TM_STATUS_OK;
  1737.     distTm.tmRetmTimo = distNodeRetryTimeout;
  1738.     distTm.tmNumTm = 1;
  1739.     distTm.tmAckCnt = wait4NumAcks;
  1740.     pTBufHdr->pTBufHdrTm = &distTm;
  1741.     /*
  1742.      * Each node has two queues for outgoing packets.
  1743.      * One is the queue for packets that should been
  1744.      * send, but the "window is closed" (full).
  1745.      * The other is the queue for packets send, and
  1746.      * waiting for an ACK.
  1747.      * In both of the two queues, packets get older.
  1748.      */
  1749.     pTBufHdr->tBufHdrId = pktId = pComm->commPktNextSend;
  1750.     winLo = pComm->commAckNextExpect;
  1751.     winHi = (INT16) winAdd (pComm->commAckNextExpect, DIST_NODE_WIN_SZ);
  1752.     /*
  1753.      * Can we send the packet directly or do we have to wait
  1754.      * until the "window opens".
  1755.      */
  1756.     if (! winWithin (winLo, pktId, winHi))
  1757.         {
  1758.         /* Put packet to the WaitToSend queue. */
  1759.         
  1760.         DIST_TBUF_HDR_ENQUEUE (pComm->pCommQWinOut, pTBufHdr);
  1761.         distNodeDbUnlock ();    /* unlock before sleep */
  1762.         /* Wait for a place within the window. */
  1763.         
  1764.         semTake (&distTm.tmWait4, WAIT_FOREVER);
  1765.         if (distTm.tmStatus != DIST_TM_STATUS_OK)
  1766.             {
  1767.             distNodeDbUnlock ();       /* this is not needed here! */
  1768.             DIST_TBUF_FREEM (pTBufHdr);
  1769.             return (ERROR);            /* e.g. timeout, node removed */
  1770.             }
  1771.         distNodeDbLock ();
  1772.         
  1773.         /* Remove it from the WaitToSend queue. */
  1774.         
  1775.         DIST_TBUF_HDR_UNLINK (pComm->pCommQWinOut, pTBufHdr);
  1776.         }
  1777.     pComm->commPktNextSend = (INT16) winAdd (pComm->commPktNextSend, 1);
  1778.     /* Put packet to the WaitForAck queue and send it. */
  1779.     
  1780.     DIST_TBUF_HDR_ENQUEUE (pComm->pCommQAck, pTBufHdr);
  1781.     
  1782.     pTBuf = (DIST_TBUF *) pTBufHdr;
  1783.     priority = pTBufHdr->tBufHdrPrio;
  1784.     while ((pTBuf = DIST_TBUF_GET_NEXT (pTBuf)) != NULL)
  1785.         {
  1786.         pTBuf->tBufId = pktId;
  1787.         pTBuf->tBufAck = (UINT16) winSub (pComm->commPktNextExpect, 1);
  1788. #ifdef DIST_NODE_REPORT
  1789.         printf ("distNodePktSend:%p: id %d, ack %d, seq %d, len %d, type %dn",
  1790.                 pTBuf, pTBuf->tBufId, pTBuf->tBufAck, pTBuf->tBufSeq,
  1791.                 pTBuf->tBufNBytes, pTBuf->tBufType);
  1792. #endif
  1793.         if (DIST_IF_SEND (nodeIdDest, pTBuf, priority) == ERROR)
  1794.             {
  1795.             distNodeDbUnlock ();
  1796.             DIST_TBUF_HDR_UNLINK (pComm->pCommQAck, pTBufHdr);
  1797.             DIST_TBUF_FREEM (pTBufHdr);
  1798.             return (ERROR);
  1799.             }
  1800.         pComm->commAckDelayed = FALSE;
  1801.         }
  1802.     
  1803.     distNodeDbUnlock ();    /* unlock before sleep */
  1804.     /* Wait for ACK. */
  1805.     
  1806. #ifdef DIST_NODE_REPORT
  1807.     distLog ("distNodePktSend: sleep while waiting for ACKn");
  1808. #endif
  1809.     semTake (&distTm.tmWait4, WAIT_FOREVER);
  1810. #ifdef DIST_NODE_REPORT
  1811.     distLog ("distNodePktSend: awacked: status of tm is %dn",
  1812.              distTm.tmStatus);
  1813. #endif
  1814.     /*
  1815.      * distNodePktReass() or distNodeTimer() has already
  1816.      * dequeued the packet.
  1817.      */
  1818.     DIST_TBUF_FREEM (pTBufHdr);
  1819.     if (distTm.tmStatus != DIST_TM_STATUS_OK)
  1820.         return (ERROR);                    /* e.g. timeout */
  1821.     return (OK);
  1822.     }
  1823. /***************************************************************************
  1824. *
  1825. * distNodePktResend - resend a packet (VxFusion option)
  1826. *
  1827. * Resend a packet already transmitted with distNodePktSend ().
  1828. *
  1829. * Currently called one place in distNodeDbCommTimer() which
  1830. * has <distNodeDbLock> taken before it is called.
  1831. *
  1832. * AVAILABILITY
  1833. * This routine is distributed as a component of the unbundled distributed
  1834. * message queues option, VxFusion.
  1835. *
  1836. * RETURNS: OK, if successful.
  1837. * NOMANUAL
  1838. */
  1839. LOCAL STATUS distNodePktResend
  1840.     (
  1841.     DIST_NODE_COMM * pComm,          /* communication channel */
  1842.     DIST_TBUF_HDR *  pTBufHdr        /* packet to retransmit */
  1843.     )
  1844.     {
  1845.     DIST_TBUF *     pTBuf;
  1846.     DIST_NODE_ID    nodeIdDest = pTBufHdr->tBufHdrDest;
  1847.     int             priority = pTBufHdr->tBufHdrPrio;
  1848.     distStat.nodePktResend++;
  1849.     pTBuf = (DIST_TBUF *) pTBufHdr;
  1850.     while ((pTBuf = DIST_TBUF_GET_NEXT (pTBuf)) != NULL)
  1851.         {
  1852.         pTBuf->tBufAck = (UINT16) winSub (pComm->commPktNextExpect, 1);
  1853. #ifdef DIST_NODE_REPORT
  1854.         printf ("distNodePktResend: resend with ACK for %d (%d)n",
  1855.                 pTBuf->tBufAck, pComm->commPktNextExpect);
  1856. #endif
  1857.         if (DIST_IF_SEND (nodeIdDest, pTBuf, priority) == ERROR)
  1858.             {
  1859.             DIST_TBUF_HDR_UNLINK (pComm->pCommQAck, pTBufHdr);
  1860.             DIST_TBUF_FREEM (pTBufHdr);
  1861.             return (ERROR);
  1862.             }
  1863.         }
  1864.     return (OK);
  1865.     }
  1866.     
  1867. /***************************************************************************
  1868. *
  1869. * distNodePktAck - ask for an ACK for a received and consumed packet (VxFusion option)
  1870. *
  1871. * This routine acknowledeges a packet. If DIST_NODE_ACK_IMMEDIATELY is
  1872. * not set in <options>, an ACK telegram is not transmitted immediately
  1873. * but a flag is set. If data is sent to the ACK awaiting node, within a
  1874. * certain range of time, the ACK is sent within the data telegram.
  1875. * Else an ACK telegram is transmitted. This is known as `piggy-backing'.
  1876. * Due to the design, piggy-backing only works for unicasts. For broadcasts,
  1877. * an ACK is always sent immediately.
  1878. *
  1879. * NOTE: Takes <distNodeDbLock>.
  1880. *
  1881. * AVAILABILITY
  1882. * This routine is distributed as a component of the unbundled distributed
  1883. * message queues option, VxFusion.
  1884. *
  1885. * RETURNS: OK, if successful.
  1886. * NOMANUAL
  1887. */
  1888. STATUS distNodePktAck
  1889.     (
  1890.     DIST_NODE_ID    nodeIdSrc,  /* source node */
  1891.     DIST_TBUF_HDR * pPktAck,    /* ack packet */
  1892.     int             options     /* options */
  1893.     )
  1894.     {
  1895.     DIST_NODE_DB_NODE * pDistNodeFound;
  1896.     DIST_NODE_COMM *    pComm;
  1897.     short               id = pPktAck->tBufHdrId;
  1898.     int                 ackBroadcast;
  1899. #ifdef DIST_DIAGNOSTIC
  1900.     if (nodeIdSrc == DIST_IF_BROADCAST_ADDR)
  1901.         distPanic ("distNodePktAck: cannot send ACK to broadcast node.n");
  1902. #endif
  1903.     distNodeDbLock ();
  1904.     /*
  1905.      * Find node, the acknowledge is destinated to.
  1906.      */
  1907.     if ((pDistNodeFound = distNodeFindById (nodeIdSrc)) == NULL)
  1908.         {
  1909. #ifdef DIST_DIAGNOSTIC
  1910.         distLog ("distNodePktAck: try to send ACK to unknown node 0x%lxn",
  1911.                  nodeIdSrc);
  1912. #endif
  1913.         distStat.nodeDBNoMatch++;
  1914.         distNodeDbUnlock ();
  1915.         return (ERROR);
  1916.         }
  1917.     /* Do we respond to a broadcast? */
  1918.     ackBroadcast = DIST_TBUF_IS_BROADCAST (DIST_TBUF_GET_NEXT (pPktAck));
  1919.     if (ackBroadcast)
  1920.         {
  1921.         pComm = &pDistNodeFound->nodeBroadcast;
  1922.         if (!distNodeSupportPBB)
  1923.             options |= DIST_NODE_ACK_IMMEDIATELY;
  1924. #ifdef DIST_NODE_REPORT
  1925.         printf ("distNodePktAck: ack broadcast %d form node 0x%lx immediatelyn",
  1926.                 id, nodeIdSrc);
  1927. #endif
  1928.         }
  1929.     else
  1930.         {
  1931.         pComm = &pDistNodeFound->nodeUnicast;
  1932.         if (!distNodeSupportPBU)
  1933.             options |= DIST_NODE_ACK_IMMEDIATELY;
  1934. #ifdef DIST_NODE_REPORT
  1935.         printf ("distNodePktAck: ack unicast %d form node 0x%lx %sn",
  1936.                 id, nodeIdSrc,
  1937.                 options
  1938.                 & DIST_NODE_ACK_IMMEDIATELY ? "immediately" : "delayed");
  1939. #endif
  1940.         }
  1941.     
  1942.     if (id != pComm->commPktNextExpect)    /* should never happen */
  1943.         distPanic ("distNodePktAck: acknowledge in wrong ordern");
  1944.     DIST_NODE_BCLR (pComm->pCommCompleted, id);
  1945.     pComm->commPktNextExpect = (INT16) winAdd (pComm->commPktNextExpect, 1);
  1946.     if (options & DIST_NODE_ACK_IMMEDIATELY)
  1947.         {
  1948.         /* Ack immediately. */
  1949.         
  1950.         distNodeSendAck (pDistNodeFound, ackBroadcast, options);
  1951.         }
  1952.     else
  1953.         {
  1954.         /*
  1955.          * Ask for a delayed ACK. If data is transfered before, the ACK
  1956.          * is piggy-backed with the data packet.
  1957.          */
  1958.         pComm->commAckDelayed = TRUE;
  1959.         }
  1960.     distNodeDbUnlock ();
  1961.     distStat.nodeAcked++;
  1962.     return (OK);
  1963.     }
  1964. /***************************************************************************
  1965. *
  1966. * distNodeSendNegAck - send a negative acknowledge (NACK) (VxFusion option)
  1967. *
  1968. * Send a negative acknowledgment, asking for a resend of fragment <seq>
  1969. * of packet <id>. A NACK (like an ACK) also holds the id of the last
  1970. * correctly received packet.
  1971. *
  1972. * NOTE: Currently, NACKs cannot be used in broadcast communication.
  1973. *
  1974. * AVAILABILITY
  1975. * This routine is distributed as a component of the unbundled distributed
  1976. * message queues option, VxFusion.
  1977. *
  1978. * RETURNS: OK, if sent.
  1979. * NOMANUAL
  1980. */
  1981. LOCAL STATUS distNodeSendNegAck
  1982.     (
  1983.     DIST_NODE_DB_NODE * pNode,   /* target node */
  1984.     short               id,      /* id */
  1985.     short               seq      /* seq number */
  1986.     )
  1987.     {
  1988.     DIST_TBUF * pTBuf;
  1989.     STATUS      status;
  1990.     if (! distNodeSupportNACK)
  1991.         return (OK);
  1992. #ifdef DIST_NODE_REPORT
  1993.     printf ("distNodeSendNegAck: NACK (id %d, seq %d) to node 0x%lxn",
  1994.             id, seq, pNode->nodeId);
  1995. #endif
  1996.     if ((pTBuf = DIST_TBUF_ALLOC ()) == NULL)
  1997.         return (ERROR);
  1998.     /* fill in telegram header */
  1999.     
  2000.     pTBuf->tBufId = id;              /* id of missing telegram */
  2001.     pTBuf->tBufSeq = seq;            /* sequence number of missing telegram */
  2002.     pTBuf->tBufAck = (UINT16)winSub (pNode->nodeUnicast.commPktNextExpect, 1);
  2003.     pTBuf->tBufNBytes = 0;
  2004.     pTBuf->tBufType = DIST_TBUF_TTYPE_NACK;
  2005.     pTBuf->tBufFlags = DIST_TBUF_FLAG_HDR;
  2006.     /* Send NACK. */
  2007.     
  2008.     if ((status = DIST_IF_SEND (pNode->nodeId, pTBuf, 0)) == OK)
  2009.         pNode->nodeUnicast.commAckDelayed = FALSE;
  2010.     DIST_TBUF_FREE (pTBuf);
  2011.     return (status);
  2012.     }
  2013. /***************************************************************************
  2014. *
  2015. * distNodeSendAck - send an ACK to the network (VxFusion option)
  2016. *
  2017. * Send an acknowledge telegram to the network. The ACK holds the
  2018. * id of the last correctly received packet.
  2019. *
  2020. * This is called from three places, all of which take <distNodeDbLock>.
  2021. *
  2022. * AVAILABILITY
  2023. * This routine is distributed as a component of the unbundled distributed
  2024. * message queues option, VxFusion.
  2025. *
  2026. * RETURNS: OK, if successful.
  2027. * NOMANUAL
  2028. */
  2029. LOCAL STATUS distNodeSendAck
  2030.     (
  2031.     DIST_NODE_DB_NODE * pNode,           /* target node */
  2032.     int                 ackBroadcast,    /* broadcast flag */
  2033.     int                 options          /* options */
  2034.     )
  2035.     {
  2036.     DIST_NODE_COMM * pComm;
  2037.     DIST_NODE_ID     nodeIdDest;
  2038.     DIST_TBUF *      pTBuf;
  2039.     STATUS           status;
  2040.     nodeIdDest = pNode->nodeId;
  2041.     pComm = (ackBroadcast ? &pNode->nodeBroadcast : &pNode->nodeUnicast);
  2042.     if ((pTBuf = DIST_TBUF_ALLOC ()) == NULL)
  2043.         return (ERROR);
  2044.     pTBuf->tBufId = 0;        /* the telegram id is ignored */
  2045.     pTBuf->tBufAck = (UINT16) winSub (pComm->commPktNextExpect, 1);
  2046.     pTBuf->tBufSeq = 0;
  2047.     pTBuf->tBufNBytes = 0;
  2048.     pTBuf->tBufFlags = DIST_TBUF_FLAG_HDR;
  2049.     if (ackBroadcast)
  2050.         {
  2051.         /* This is an ACK for a broadcast. */
  2052.         
  2053.         pTBuf->tBufType = DIST_TBUF_TTYPE_BACK;
  2054.         pTBuf->tBufFlags |= DIST_TBUF_FLAG_BROADCAST;
  2055. #ifdef DIST_NODE_REPORT
  2056.         printf ("distNodeSendAck: acknowledge broadcast %d from node 0x%lxn",
  2057.                 pTBuf->tBufAck, nodeIdDest);
  2058. #endif
  2059.         if (options & DIST_NODE_ACK_EXTENDED)
  2060.             {
  2061.             DIST_NODE_DB_NODE * pBroadcastNode;
  2062.             DIST_XACK *         pXAck = pTBuf->pTBufData;
  2063.             int                 nextSend;
  2064.             pBroadcastNode = distNodeFindById (DIST_IF_BROADCAST_ADDR);
  2065.             if (pBroadcastNode == NULL)
  2066.                 {
  2067.                 DIST_TBUF_FREE (pTBuf);
  2068.                 return (ERROR);
  2069.                 }
  2070.             nextSend = htons (pBroadcastNode->nodeBroadcast.commPktNextSend);
  2071.             pXAck->xAckPktNextSend = (UINT16) nextSend;
  2072.             pXAck->xAckPktNodeState = htons ((uint16_t) distNodeLocalState);
  2073.             pTBuf->tBufNBytes += sizeof (*pXAck);
  2074.             }
  2075.         }
  2076.     else
  2077.         {
  2078.         /* This is an ACK for a unicast. */
  2079.         
  2080.         pTBuf->tBufType = DIST_TBUF_TTYPE_ACK;
  2081. #ifdef DIST_NODE_REPORT
  2082.         printf ("distNodeSendAck: acknowledge unicast %d from node 0x%lxn",
  2083.                 pTBuf->tBufAck, nodeIdDest);
  2084. #endif
  2085.         if (options & DIST_NODE_ACK_EXTENDED)
  2086.             {
  2087.             DIST_XACK * pXAck = pTBuf->pTBufData;
  2088.             pXAck->xAckPktNextSend =
  2089.                          htons (pNode->nodeUnicast.commPktNextSend);
  2090.             pXAck->xAckPktNodeState = htons ((uint16_t) distNodeLocalState);
  2091.             pTBuf->tBufNBytes += sizeof (*pXAck);
  2092.             }
  2093.         }
  2094.     /* Send ACK. */
  2095.     
  2096.     if ((status = DIST_IF_SEND (nodeIdDest, pTBuf, 0)) == OK)
  2097.         pComm->commAckDelayed = FALSE;
  2098.     DIST_TBUF_FREE (pTBuf);
  2099.     return (status);
  2100.     }
  2101. /***************************************************************************
  2102. *
  2103. * distNodePktDiscard - discard a packet (VxFusion option)
  2104. *
  2105. * This routine discards a packet. No ACK is send.
  2106. *
  2107. * NOTE: Takes <distNodeDbLock>.
  2108. *
  2109. * AVAILABILITY
  2110. * This routine is distributed as a component of the unbundled distributed
  2111. * message queues option, VxFusion.
  2112. *
  2113. * RETURNS: OK, if successful.
  2114. * NOMANUAL
  2115. */
  2116. STATUS distNodePktDiscard
  2117.     (
  2118.     DIST_NODE_ID    nodeIdSrc,
  2119.     DIST_TBUF_HDR * pPktDiscard
  2120.     )
  2121.     {
  2122.     DIST_NODE_DB_NODE * pDistNodeFound;
  2123.     DIST_NODE_COMM *    pComm;
  2124.     short               id = pPktDiscard->tBufHdrId;
  2125.     distNodeDbLock ();
  2126.     if ((pDistNodeFound = distNodeFindById (nodeIdSrc)) == NULL)
  2127.         {
  2128. #ifdef DIST_DIAGNOSTIC
  2129.         distLog ("distNodePktDel: try to delete from unknown node 0x%lxn",
  2130.                  nodeIdSrc);
  2131. #endif
  2132.         distStat.nodeDBNoMatch++;
  2133.         distNodeDbUnlock ();
  2134.         return (ERROR);
  2135.         }
  2136. #ifdef DIST_NODE_REPORT
  2137.     printf ("distNodePktDiscard: discard packet %d from node 0x%lxn",
  2138.             id, nodeIdSrc);
  2139. #endif
  2140.     if (DIST_TBUF_IS_BROADCAST (DIST_TBUF_GET_NEXT (pPktDiscard)))
  2141.         pComm = &pDistNodeFound->nodeBroadcast;
  2142.     else
  2143.         pComm = &pDistNodeFound->nodeUnicast;
  2144.     
  2145.     distStat.nodeInDiscarded++;
  2146.     /* Clear id in bitfield, but do not move the window */
  2147.     
  2148.     DIST_NODE_BCLR (pComm->pCommCompleted, id);
  2149.     distNodeDbUnlock ();
  2150.     /* Free the discarded packet. */
  2151.     
  2152.     DIST_TBUF_FREEM (pPktDiscard);
  2153.     return (OK);
  2154.     }
  2155. /***************************************************************************
  2156. *
  2157. * distNodeDBTimerTask - node database manager task (VxFusion option)
  2158. *
  2159. * For now, distNodeDBTimer() is called from a task. The caller may be a
  2160. * watchdog in future.
  2161. *
  2162. * AVAILABILITY
  2163. * This routine is distributed as a component of the unbundled distributed
  2164. * message queues option, VxFusion.
  2165. *
  2166. * RETURNS: N/A
  2167. * NOMANUAL
  2168. */
  2169. LOCAL void distNodeDBTimerTask (void)
  2170.     {
  2171.     while (1)
  2172.         {
  2173.         distNodeDBTimer ();
  2174.         taskDelay (DIST_NODE_MGR_WAKEUP_TICKS);
  2175.         }
  2176.     }
  2177. /***************************************************************************
  2178. *
  2179. * distNodeDBTimer - periodic routine, to update the node database (VxFusion option)
  2180. *
  2181. * Updates each node of the database.
  2182. *
  2183. * AVAILABILITY
  2184. * This routine is distributed as a component of the unbundled distributed
  2185. * message queues option, VxFusion.
  2186. *
  2187. * RETURNS: N/A
  2188. * NOMANUAL
  2189. */
  2190. LOCAL void distNodeDBTimer (void)
  2191.     {
  2192.     DIST_NODE_DB_NODE * pNode;
  2193.     DIST_NODE_BTIMO     btimo;
  2194.     DIST_NODE_DB_LOCK;
  2195.     
  2196.     if ((pNode = distNodeFindById (DIST_IF_BROADCAST_ADDR)) != NULL)
  2197.         distNodeDBCommTimer (pNode, &btimo, TRUE);
  2198.     DIST_NODE_DB_UNLOCK;
  2199.     distNodeEach ((FUNCPTR) distNodeDBNodeTimer, (int) &btimo);
  2200.     }
  2201. /***************************************************************************
  2202. *
  2203. * distNodeDBNodeTimer - periodic routine, to update single node of node DB (VxFusion option)
  2204. *
  2205. * This routine handles unicast and broadcast communication of a specific
  2206. * node.
  2207. *
  2208. * NOTE: <distNodeDbLock> must be taken before this function is called.
  2209. * Currently, it is only invoked by distNodeEach(), which takes the lock.
  2210. *
  2211. * AVAILABILITY
  2212. * This routine is distributed as a component of the unbundled distributed
  2213. * message queues option, VxFusion.
  2214. *
  2215. * RETURNS: TRUE, if updated.
  2216. * NOMANUAL
  2217. */
  2218. LOCAL BOOL distNodeDBNodeTimer
  2219.     (
  2220.     DIST_NODE_DB_NODE * pNode,   /* node to update */
  2221.     DIST_NODE_BTIMO *   pBtimo   /* time interval */
  2222.     )
  2223.     {
  2224.     if (pNode->nodeId == DIST_IF_BROADCAST_ADDR)
  2225.         return (TRUE);
  2226.     distNodeDBCommTimer (pNode, pBtimo, FALSE);    /* unicast */
  2227.     distNodeDBCommTimer (pNode, pBtimo, TRUE);     /* broadcast */
  2228.     return (TRUE);    /* continue */
  2229.     }
  2230. /***************************************************************************
  2231. *
  2232. * distNodeDBCommTimer - periodic routine, to update a communication link (VxFusion option)
  2233. *
  2234. * NOTE: Must take <distNodeDbLock> before.  Currently called from
  2235. * distNodeDBTimer() and distNodeDBNodeTimer(), which take the lock.
  2236. *
  2237. * AVAILABILITY
  2238. * This routine is distributed as a component of the unbundled distributed
  2239. * message queues option, VxFusion.
  2240. *
  2241. * RETURNS: N/A
  2242. * NOMANUAL
  2243. */
  2244. LOCAL void distNodeDBCommTimer
  2245.     (
  2246.     DIST_NODE_DB_NODE * pNode,            /* node to update */
  2247.     DIST_NODE_BTIMO *   pBtimo,           /* time interval */
  2248.     BOOL                isBroadcastComm   /* broadcast boolean */
  2249.     )
  2250.     {
  2251.     DIST_NODE_COMM * pComm;
  2252.     DIST_TBUF_HDR *  pTBufWin;
  2253.     DIST_TBUF_HDR *  pTBufReass;
  2254.     DIST_TBUF_HDR *  pTBufAck;
  2255.     DIST_TBUF_HDR *  pTBufNext;
  2256.     /*
  2257.      * Define, what part of the node we are working on:
  2258.      * broadcast communication or unicast communication
  2259.      */
  2260.     if (isBroadcastComm)
  2261.         pComm = &pNode->nodeBroadcast;
  2262.     else
  2263.         pComm = &pNode->nodeUnicast;
  2264.     if (pNode->nodeId == DIST_IF_BROADCAST_ADDR)
  2265.         {
  2266.         pBtimo->btimoWinLo = pComm->commAckNextExpect;
  2267.         pBtimo->btimoWinHi = pComm->commPktNextSend;
  2268.         pBtimo->btimoTimedOut = FALSE;
  2269.         }
  2270.     else
  2271.         {
  2272.         if (isBroadcastComm && pBtimo->btimoTimedOut)
  2273.             {
  2274.             short    wl = pBtimo->btimoWinLo;
  2275.             short    wh = pBtimo->btimoWinHi;
  2276.             /*
  2277.              * Last ACK should fall in between this range. If not,
  2278.              * the node is declared dead.
  2279.              */
  2280.             if (! winWithin (wl, pComm->commAckLastRecvd, wh))
  2281.                 {
  2282.                 distNodeCrashed (pNode->nodeId);
  2283.                 return;
  2284.                 }
  2285.             }
  2286.         }
  2287.     /* Look for delayed ACKs. */
  2288.     if (pComm->commAckDelayed)
  2289.         {
  2290.         /* Send ACK. */
  2291.         
  2292.         distNodeSendAck (pNode, isBroadcastComm, 0);
  2293.         }
  2294.     /*
  2295.      * Look for packets that waited for first transmission,
  2296.      * but have timed out meanwhile.
  2297.      */
  2298.     pTBufNext = pComm->pCommQWinOut;
  2299.     while ((pTBufWin = pTBufNext) != NULL)
  2300.         {
  2301.         pTBufNext = DIST_TBUF_HDR_GET_NEXT (pTBufWin);
  2302.         /* Has user's timeout expired?  */
  2303.         
  2304.         if ((pTBufWin->tBufHdrTimo != WAIT_FOREVER) &&
  2305.             (pTBufWin->tBufHdrTimo -= DIST_NODE_MGR_WAKEUP_TICKS) <= 0)
  2306.             {
  2307.             /* Timed out. */
  2308.             
  2309.             pTBufWin->pTBufHdrTm->tmStatus = DIST_TM_STATUS_TIMEOUT;
  2310.             semGive (&pTBufWin->pTBufHdrTm->tmWait4);
  2311.             }
  2312.         }
  2313.     /*
  2314.      * Look for packets that were send, but not acknowledged
  2315.      * by now.
  2316.      */
  2317.     pTBufNext = pComm->pCommQAck;
  2318.     while ((pTBufAck = pTBufNext) != NULL)
  2319.         {
  2320.         pTBufNext = DIST_TBUF_HDR_GET_NEXT (pTBufAck);
  2321.         /* Has user's timeout expired? */
  2322.         
  2323.         if ((pTBufAck->tBufHdrTimo != WAIT_FOREVER) &&
  2324.             (pTBufAck->tBufHdrTimo -= DIST_NODE_MGR_WAKEUP_TICKS) <= 0)
  2325.             {
  2326.             /* Timed out. */
  2327.             
  2328.             if (distNodeLocalState == DIST_NODE_STATE_BOOT)
  2329.                 {
  2330.                 pComm->commAckNextExpect =
  2331.                     (INT16) winAdd (pComm->commAckNextExpect, 1);
  2332.                 pTBufAck->pTBufHdrTm->tmStatus = DIST_TM_STATUS_OK;
  2333.                 distNodeLocalState = DIST_NODE_STATE_NETWORK;
  2334.                 }
  2335.             else
  2336.                 pTBufAck->pTBufHdrTm->tmStatus = DIST_TM_STATUS_TIMEOUT;
  2337.             DIST_TBUF_HDR_UNLINK (pComm->pCommQAck, pTBufAck);
  2338.             semGive (&pTBufAck->pTBufHdrTm->tmWait4);
  2339.             continue;
  2340.             }
  2341.         /* Timeout for retransmisson. */
  2342.         if (pTBufAck->pTBufHdrTm->tmRetmTimo--  <= 0)
  2343.             {
  2344.             /* Timed out. Retransmit. */
  2345.     
  2346.             if (pTBufAck->pTBufHdrTm->tmNumTm >= distNodeMaxRetries + 1)
  2347.                 {
  2348.                 if (distNodeLocalState == DIST_NODE_STATE_BOOT)
  2349.                     {
  2350.                     pComm->commAckNextExpect =
  2351.                         (INT16) winAdd (pComm->commAckNextExpect, 1);
  2352.                     pTBufAck->pTBufHdrTm->tmStatus = DIST_TM_STATUS_OK;
  2353.                     distNodeLocalState = DIST_NODE_STATE_NETWORK;
  2354.                     }
  2355.                 else
  2356.                     {
  2357.                     if (pNode->nodeId == DIST_IF_BROADCAST_ADDR)
  2358.                         {
  2359.                         short    wl = pBtimo->btimoWinLo;
  2360.                         short    wh = pComm->commPktNextSend;
  2361.                         if (winWithin (wl, pTBufAck->tBufHdrId, wh))
  2362.                             pBtimo->btimoWinLo = pTBufAck->tBufHdrId;
  2363.                         pBtimo->btimoTimedOut = TRUE;
  2364.                         }
  2365.                     else
  2366.                         {
  2367.                         /*
  2368.                          * The remote note cannot be reached. Declare the
  2369.                          * node to be dead and set the transmission status
  2370.                          * to "unreachable". Wake up sender.
  2371.                          */
  2372.                         (void) distNodeCrashed (pNode->nodeId);
  2373.                         }
  2374.                     pTBufAck->pTBufHdrTm->tmStatus = DIST_TM_STATUS_UNREACH;
  2375.                     }
  2376.                 DIST_TBUF_HDR_UNLINK (pComm->pCommQAck, pTBufAck);
  2377.                 semGive (&pTBufAck->pTBufHdrTm->tmWait4);
  2378.                 }
  2379.             else
  2380.                 {
  2381.                 distNodePktResend (pComm, pTBufAck);
  2382.                 pTBufAck->pTBufHdrTm->tmNumTm++;
  2383.                 pTBufAck->pTBufHdrTm->tmRetmTimo =
  2384.                         pTBufAck->pTBufHdrTm->tmNumTm * distNodeRetryTimeout;
  2385.                 }
  2386.             }
  2387.         }
  2388.     /*
  2389.      * Look for packets that were received in fragments, but not completed
  2390.      * by now.
  2391.      */
  2392.     pTBufNext = pComm->pCommQReass;
  2393.     while ((pTBufReass = pTBufNext) != NULL)
  2394.         {
  2395.         pTBufNext = DIST_TBUF_HDR_GET_NEXT (pTBufReass);
  2396.         if ((pTBufReass->tBufHdrTimo != WAIT_FOREVER) &&
  2397.             (pTBufReass->tBufHdrTimo -= DIST_NODE_MGR_WAKEUP_TICKS) <= 0)
  2398.             {
  2399.             distNodeCrashed (pTBufReass->tBufHdrSrc);
  2400.             DIST_TBUF_HDR_UNLINK (pComm->pCommQReass, pTBufReass);
  2401.             pComm->commPktNextExpect =
  2402.                              (INT16)winAdd (pTBufReass->tBufHdrId, 1);
  2403.             DIST_TBUF_FREEM (pTBufReass);
  2404.             }
  2405.         }
  2406.     }
  2407. #ifdef DIST_DIAGNOSTIC_SHOW
  2408. /***************************************************************************
  2409. *
  2410. * distNodeShow - show routine for a node from node database (VxFusion option)
  2411. *
  2412. * This routine looks up node <distNodeId> and displays information
  2413. * from node database.
  2414. * Only intended for internal use.
  2415. *
  2416. * NOTE: Takes <distNodeDbLock>.
  2417. *
  2418. * AVAILABILITY
  2419. * This routine is distributed as a component of the unbundled distributed
  2420. * message queues option, VxFusion.
  2421. *
  2422. * RETURNS: OK, if processed.
  2423. * NOMANUAL
  2424. */
  2425. STATUS distNodeShow
  2426.     (
  2427.     DIST_NODE_ID    distNodeId    /* node ID to show */
  2428.     )
  2429.     {
  2430.     DIST_NODE_DB_NODE * pDistNodeFound;
  2431.     distNodeDbLock ();
  2432.     if ((pDistNodeFound = distNodeFindById (distNodeId)) == NULL)
  2433.             {
  2434.             distNodeDbUnlock ();
  2435.             return (ERROR);
  2436.             }
  2437.     printf ("%d: state %dn",
  2438.             (int) pDistNodeFound->nodeId,
  2439.             pDistNodeFound->nodeState);
  2440.     printf ("  unicast/in: buffer: ");
  2441.     DIST_NODE_BPRINT (pDistNodeFound->nodeUnicast.pCommCompleted,
  2442.             DIST_IF_RNG_BUF_SZ);
  2443.     printf ("n");
  2444.     printf ("  unicast/in: reassQ %p, dlvr nxt %pn",
  2445.             pDistNodeFound->nodeUnicast.pCommQReass,
  2446.             pDistNodeFound->nodeUnicast.pCommQNextDeliver);
  2447.     printf ("  unicast/in: expect nxt %d, last ack rcvd %dn",
  2448.             pDistNodeFound->nodeUnicast.commPktNextExpect,
  2449.             pDistNodeFound->nodeUnicast.commAckLastRecvd);
  2450.     printf ("  unicast/out: winOutQ %p, ackQ %p,n",
  2451.             pDistNodeFound->nodeUnicast.pCommQWinOut,
  2452.             pDistNodeFound->nodeUnicast.pCommQAck);
  2453.     printf ("  unicast/out: nxt ack xpct %d, nxt snd %d, ack dlyd %dn",
  2454.             pDistNodeFound->nodeUnicast.commAckNextExpect,
  2455.             pDistNodeFound->nodeUnicast.commPktNextSend,
  2456.             pDistNodeFound->nodeUnicast.commAckDelayed);
  2457.     printf ("  broadcast/in: buffer: ");
  2458.     DIST_NODE_BPRINT (pDistNodeFound->nodeBroadcast.pCommCompleted,
  2459.             DIST_IF_RNG_BUF_SZ);
  2460.     printf ("n");
  2461.     printf ("  broadcast/in: reassQ %p, dlvr nxt %pn",
  2462.             pDistNodeFound->nodeBroadcast.pCommQReass,
  2463.             pDistNodeFound->nodeBroadcast.pCommQNextDeliver);
  2464.     printf ("  broadcast/in: expect next %d, last ack rcvd %dn",
  2465.             pDistNodeFound->nodeBroadcast.commPktNextExpect,
  2466.             pDistNodeFound->nodeBroadcast.commAckLastRecvd);
  2467.     printf ("  broadcast/out: winOutQ %p, ackQ %p,n",
  2468.             pDistNodeFound->nodeBroadcast.pCommQWinOut,
  2469.             pDistNodeFound->nodeBroadcast.pCommQAck);
  2470.     printf ("  broadcast/out: nxt ack xpct %d, nxt snd %d, ack dlyd %dn",
  2471.             pDistNodeFound->nodeBroadcast.commAckNextExpect,
  2472.             pDistNodeFound->nodeBroadcast.commPktNextSend,
  2473.             pDistNodeFound->nodeBroadcast.commAckDelayed);
  2474.     distNodeDbUnlock ();
  2475.     return (OK);
  2476.     }
  2477. /***************************************************************************
  2478. *
  2479. * distNodeDbShow - brief overview of node database (VxFusion option)
  2480. *
  2481. * Print a summary of the database.
  2482. *
  2483. * AVAILABILITY
  2484. * This routine is distributed as a component of the unbundled distributed
  2485. * message queues option, VxFusion.
  2486. *
  2487. * RETURNS: OK.
  2488. * NOMANUAL
  2489. */
  2490. STATUS distNodeDbShow (void)
  2491.     {
  2492.     printf ("                       UNICAST------------------- ");
  2493.     printf ("BROADCAST-----------------n");
  2494.     printf ("NODE ID    STATE       PKT XPCT ACK XPCT  PKT SND ");
  2495.     printf ("PKT XPCT ACK XPCT  PKT SNDn");
  2496.     printf ("---------- ----------- -------- -------- -------- ");
  2497.     printf ("-------- -------- --------n");
  2498.     distNodeEach ((FUNCPTR) distNodeNodeShow, 0);
  2499.     return (OK);
  2500.     }
  2501. /***************************************************************************
  2502. *
  2503. * distNodeNodeShow - helper for distNodeDbShow() (VxFusion option)
  2504. *
  2505. * Print database node info.
  2506. *
  2507. * AVAILABILITY
  2508. * This routine is distributed as a component of the unbundled distributed
  2509. * message queues option, VxFusion.
  2510. *
  2511. * RETURNS: TRUE.
  2512. * NOMANUAL
  2513. */
  2514. LOCAL BOOL distNodeNodeShow
  2515.     (
  2516.     DIST_NODE_DB_NODE * pNode,    /* node to display */
  2517.     int                 dummy     /* unused argument */
  2518.     )
  2519.     {
  2520.     char * stateNm;
  2521.     printf ("0x%8lx ", pNode->nodeId);
  2522.     switch (pNode->nodeState)
  2523.         {
  2524.         case DIST_NODE_STATE_BOOT:
  2525.             stateNm = "booting";
  2526.             break;
  2527.         case DIST_NODE_STATE_NETWORK:
  2528.             stateNm = "network";
  2529.             break;
  2530.         case DIST_NODE_STATE_OPERATIONAL:
  2531.             stateNm = "operational";
  2532.             break;
  2533.         case DIST_NODE_STATE_CRASHED:
  2534.             stateNm = "crashed";
  2535.             break;
  2536.         case DIST_NODE_STATE_RESYNC:
  2537.             stateNm = "resync";
  2538.             break;
  2539.         default:
  2540.             stateNm = "unknown";
  2541.         }
  2542.     printf ("%11s ", stateNm);
  2543.     if (pNode->nodeId == DIST_IF_BROADCAST_ADDR)
  2544.         {
  2545.         printf ("      --       --       -- ");
  2546.         printf ("      -- %8d %8dn",
  2547.                 pNode->nodeBroadcast.commAckNextExpect,
  2548.                 pNode->nodeBroadcast.commPktNextSend);
  2549.         }
  2550.     else
  2551.         {
  2552.         printf ("%8d %8d %8d ",
  2553.                 pNode->nodeUnicast.commPktNextExpect,
  2554.                 pNode->nodeUnicast.commAckNextExpect,
  2555.                 pNode->nodeUnicast.commPktNextSend);
  2556.         printf ("%8d       --       --n",
  2557.                 pNode->nodeBroadcast.commPktNextExpect);
  2558.         }
  2559.     return (TRUE);
  2560.     }
  2561. /***************************************************************************
  2562. *
  2563. * distNodeStateToName - helper for distNodeDbShow() (VxFusion option)
  2564. *
  2565. * Return string describing constant.
  2566. *
  2567. * AVAILABILITY
  2568. * This routine is distributed as a component of the unbundled distributed
  2569. * message queues option, VxFusion.
  2570. *
  2571. * RETURNS: Descriptive string.
  2572. * NOMANUAL
  2573. */
  2574. LOCAL const char *distNodeStateToName
  2575.     (
  2576.     int state      /* state to decode */
  2577.     )
  2578.     {
  2579. LOCAL const char *const dead[] =
  2580.     {"CRASHED"};
  2581. LOCAL const char *const alive[] =
  2582.     {"BOOTING", "RESYNC", "NETWORK", "OPERATIONAL"};
  2583.     if (DIST_NODE_STATE_IS_ALIVE (state))
  2584.         return (alive[((state & ~DIST_NODE_STATE_ALIVE) - 1)]);
  2585.     else
  2586.         return (dead[((state & ~DIST_NODE_STATE_ALIVE) - 1)]);
  2587.     }
  2588. #endif    /* DIST_DIAGNOSTIC_SHOW */