distNetLib.c
上传用户:baixin
上传日期:2008-03-13
资源大小:4795k
文件大小:22k
开发平台:

MultiPlatform

  1. /* distNetLib - distributed objects network layer (VxFusion option) */
  2. /* Copyright 1999 - 2002 Wind River Systems, Inc. */
  3. /*
  4. modification history
  5. --------------------
  6. 01k,18apr02,jws  fix SPR74878 (not lock node database properly)
  7. 01j,23oct01,jws  fix compiler warnings (SPR 71117)
  8. 01i,04oct01,jws  final fixes for SPR 34770
  9. 01h,27sep01,p_r  Fixes for SPR#34770
  10. 01g,24may99,drm  added vxfusion prefix to VxFusion related includes
  11. 01f,23feb99,drm  library documentation update
  12. 01e,22feb99,drm  added documentation for library and distNetInput() function
  13. 01d,12aug98,drm  added #include stmt for distObjTypeP.h
  14. 01c,08aug98,drm  added code to set broadcast flag when sending broadcast msgs
  15. 01b,20may98,drm  removed some warning messages by initializing pointers to NULL
  16. 01a,05sep97,ur   written.
  17. */
  18. /*
  19. DESCRIPTION
  20. This library contains the distributed objects network layer.  The network layer
  21. puts messages into telegram buffers and reassembles them from telegram 
  22. buffers.  When a message exceeds the space allocated for message data in a 
  23. telegram, the network layer fragments the message into multiple telegram 
  24. buffers, and reassembles a single message from multiple telegram buffers.  
  25. After placing a message into a telegram buffer(s), this layer calls the
  26. adapter's send routine and sends a telegram out over the
  27. transport.  If the message has to be fragmented into more than one telegram 
  28. buffer, the adapter send routine is called once for each buffer used.
  29. Upon receiving a telegram from a remote node, an adapter must call this
  30. library's distNetInput() routine to forward a telegram buffer to VxFusion.
  31. AVAILABILITY
  32. This module is distributed as a component of the unbundled distributed
  33. message queues option, VxFusion.
  34. INCLUDE FILES: distNetLib.h
  35. SEE ALSO: distIfLib
  36. */
  37. #include "vxWorks.h"
  38. #if defined (DIST_NET_REPORT) || defined (DIST_DIAGNOSTIC)
  39. #include "stdio.h"
  40. #endif
  41. #include "errnoLib.h"
  42. #include "string.h"
  43. #include "taskLib.h"
  44. #include "semLib.h"
  45. #include "netinet/in.h"
  46. #include "private/semLibP.h"
  47. #include "private/distObjTypeP.h"
  48. #include "vxfusion/distLib.h"
  49. #include "vxfusion/distIfLib.h"
  50. #include "vxfusion/distStatLib.h"
  51. #include "vxfusion/private/distNetLibP.h"
  52. #include "vxfusion/private/distNodeLibP.h"
  53. #include "vxfusion/private/distTBufLibP.h"
  54. #include "vxfusion/private/distPktLibP.h"
  55. /* defines */
  56. #ifndef MAX
  57. #define MAX(a, b) (((a) < (b)) ? (b) : (a))
  58. #endif
  59. #ifndef MIN
  60. #define MIN(a, b) (((a) > (b)) ? (b) : (a))
  61. #endif
  62. DIST_SERV_NODE    servTable[DIST_SERV_MAX];
  63. /* locals */
  64. LOCAL BOOL       distNetLibInstalled = FALSE;
  65. LOCAL FUNCPTR    distNetServiceHook = NULL;
  66. /* local prototypes */
  67. LOCAL void distNetServTask (DIST_SERV_FUNC servInput,
  68.                             DIST_SERV_NODE *servNode);
  69. LOCAL STATUS distNetServCall (int servId, DIST_TBUF_HDR *pDeliver);
  70. /***************************************************************************
  71. *
  72. * distNetLibInit - initialize this module (VxFusion option)
  73. *
  74. * This routine currently does nothing.
  75. *
  76. * AVAILABILITY
  77. * This routine is distributed as a component of the unbundled distributed
  78. * message queues option, VxFusion.
  79. *
  80. * RETURNS: N/A
  81. * NOMANUAL
  82. */
  83. void distNetLibInit (void)
  84.     {
  85.     }
  86. /***************************************************************************
  87. *
  88. * distNetInit - initialize server table (VxFusion option)
  89. *
  90. * This routine initializes the server table.
  91. *
  92. * AVAILABILITY
  93. * This routine is distributed as a component of the unbundled distributed
  94. * message queues option, VxFusion.
  95. *
  96. * RETURNS: N/A
  97. * NOMANUAL
  98. */
  99. void distNetInit (void)
  100.     {
  101.     int i;
  102.     bzero ((char *) &servTable[0], sizeof(servTable));
  103.     for (i = 0; i < DIST_SERV_MAX; i++)
  104.         {
  105.         servTable[i].servTaskPrio = servTable[i].servNetPrio = (-1);
  106.         }
  107.     distNetLibInstalled = TRUE;
  108.     }
  109. /***************************************************************************
  110. *
  111. * distNetServAdd - add a service (VxFusion option)
  112. *
  113. * This routine adds a service to the server table and spawns a task to
  114. * implement the service.
  115. *
  116. * AVAILABILITY
  117. * This routine is distributed as a component of the unbundled distributed
  118. * message queues option, VxFusion.
  119. *
  120. * RETURNS: OK, if successful.
  121. * NOMANUAL
  122. */
  123. STATUS distNetServAdd
  124.     (
  125.     int            servId,          /* id of service to add */
  126.     DIST_SERV_FUNC servInput,       /* service input function to call */
  127.     char *         servTaskName,    /* name of service task */
  128.     int            servNetPrio,     /* priority of service on the network */
  129.     int            servTaskPrio,    /* priority of service task */
  130.     int            servTaskStackSz  /* stack size for service task */
  131.     )
  132.     {
  133.     DIST_SERV_NODE    *pServNode = (DIST_SERV_NODE *) &servTable[servId];
  134.     int        taskPrio;
  135.     int        tid;
  136.     if (!distNetLibInstalled || servId < 0 || servId > DIST_SERV_MAX - 1)
  137.         return (ERROR);
  138.     semBInit (&pServNode->servQLock, SEM_Q_FIFO, SEM_FULL);
  139.     semBInit (&pServNode->servWait4Jobs, SEM_Q_FIFO, SEM_EMPTY);
  140.     pServNode->servId = servId;
  141.     pServNode->servUp = FALSE;
  142.     pServNode->pServQ = NULL;
  143.     if (pServNode->servTaskPrio == -1)
  144.         pServNode->servTaskPrio = servTaskPrio;
  145.     if (pServNode->servNetPrio == -1)
  146.         pServNode->servNetPrio = servNetPrio;
  147.     taskPrio = pServNode->servTaskPrio;
  148.     tid = taskSpawn (servTaskName,
  149.                      taskPrio,
  150.                      VX_SUPERVISOR_MODE | VX_UNBREAKABLE,
  151.                      servTaskStackSz,
  152.                      (FUNCPTR) distNetServTask,
  153.                      (int) servInput,
  154.                      (int) pServNode,
  155.                      0, 0, 0, 0, 0, 0, 0, 0);
  156.     if (tid == ERROR)
  157.         return (ERROR);
  158.     pServNode->servTaskId = tid;
  159.     return (OK);
  160.     }
  161. /***************************************************************************
  162. *
  163. * distNetServTask - service task (VxFusion option)
  164. *
  165. * This is the common entry point for all network service tasks. It
  166. * calls the individual service functions.
  167. *
  168. * AVAILABILITY
  169. * This routine is distributed as a component of the unbundled distributed
  170. * message queues option, VxFusion.
  171. *
  172. * RETURNS: N/A
  173. * NOMANUAL
  174. */
  175. LOCAL void distNetServTask
  176.     (
  177.     DIST_SERV_FUNC   servInput,     /* function to process input */
  178.     DIST_SERV_NODE * pServNode      /* server node */
  179.     )
  180.     {
  181.     DIST_TBUF_HDR * pDeliver;
  182.     DIST_NODE_ID    nodeSrcId;
  183.     DIST_STATUS     dStatus;
  184.     FOREVER
  185.         {
  186.         semTake (&pServNode->servWait4Jobs, WAIT_FOREVER);
  187.         while (pServNode->pServQ)
  188.             {
  189.             semTake (&pServNode->servQLock, WAIT_FOREVER);
  190.             DIST_TBUF_HDR_DEQUEUE (pServNode->pServQ, pDeliver);
  191.             semGive (&pServNode->servQLock);
  192.             nodeSrcId = pDeliver->tBufHdrSrc;
  193.             if ((dStatus = servInput (nodeSrcId, pDeliver)) != OK)
  194.                 {
  195. #ifdef DIST_NET_REPORT
  196.                 printf ("distNetServTask/#%d: service task returned %dn",
  197.                         pServNode->servId, dStatus);
  198. #endif  
  199.                 if (distNetServiceHook)
  200.                     (* distNetServiceHook) (pServNode->servId, dStatus);
  201.                 }
  202.                 
  203.             /* locking probably not required, but doesn't cost much */
  204.             distNodeDbLock ();
  205.             DIST_TBUF_FREEM (pDeliver);
  206.             distNodeDbUnlock ();
  207.             }
  208.         }
  209.     }
  210. /***************************************************************************
  211. *
  212. * distNetServConfig - configure service (VxFusion option)
  213. *
  214. * This routine is used to change a service's task and network priority.
  215. *
  216. * AVAILABILITY
  217. * This routine is distributed as a component of the unbundled distributed
  218. * message queues option, VxFusion.
  219. *
  220. * RETURNS: OK, if successful.
  221. * NOMANUAL
  222. */
  223. STATUS distNetServConfig
  224.     (
  225.     int        servId,      /* ID of service to configure */
  226.     int        taskPrio,    /* new task priority */
  227.     int        netPrio      /* new network priority */
  228.     )
  229.     {
  230.     DIST_SERV_NODE * pServNode = (DIST_SERV_NODE *) &servTable[servId];
  231.     if (servId >= 0 && servId < DIST_SERV_MAX)
  232.         {
  233.         if (netPrio != -1)
  234.             {
  235.             if (netPrio < 0 || netPrio > 7)
  236.                 return (ERROR);
  237.             pServNode->servNetPrio = netPrio;
  238.             }
  239.         if (taskPrio != -1)
  240.             {
  241.             if (taskPrio < 0 || taskPrio > 255)
  242.                 return (ERROR);
  243.             pServNode->servTaskPrio = taskPrio;
  244.             if (DIST_NET_SERV_INST (servId))
  245.                 return (taskPrioritySet (pServNode->servTaskId, taskPrio));
  246.             }
  247.         return (OK);
  248.         }
  249.     return (ERROR);
  250.     }
  251. /***************************************************************************
  252. *
  253. * distNetServUp - change state of service to up (VxFusion option)
  254. *
  255. * This routine marks to state of a service as UP.
  256. *
  257. * AVAILABILITY
  258. * This routine is distributed as a component of the unbundled distributed
  259. * message queues option, VxFusion.
  260. *
  261. * RETURNS: ERROR, if no such service exists.
  262. * NOMANUAL
  263. */
  264. STATUS distNetServUp
  265.     (
  266.     int servId      /* ID of service */
  267.     )
  268.     {
  269.     DIST_SERV_NODE * pServNode;
  270.     if (servId >= 0  &&  servId < DIST_SERV_MAX
  271.                      &&  DIST_NET_SERV_INST (servId))
  272.         {
  273.         pServNode = (DIST_SERV_NODE *) &servTable[servId];
  274.         pServNode->servUp = TRUE;
  275.         return (OK);
  276.         }
  277.     return (ERROR);
  278.     }
  279. /***************************************************************************
  280. *
  281. * distNetServDown - change state of service to down (VxFusion option)
  282. *
  283. * This routine changes the state of a service to DOWN.
  284. *
  285. * AVAILABILITY
  286. * This routine is distributed as a component of the unbundled distributed
  287. * message queues option, VxFusion.
  288. *
  289. * RETURNS: ERROR, if no such service exists.
  290. * NOMANUAL
  291. */
  292. STATUS distNetServDown
  293.     (
  294.     int servId    /* ID of service */
  295.     )
  296.     {
  297.     DIST_SERV_NODE * pServNode;
  298.     if (servId >= 0 && servId < DIST_SERV_MAX
  299.                     && DIST_NET_SERV_INST (servId))
  300.     {
  301.     pServNode = (DIST_SERV_NODE *) &servTable[servId];
  302.     pServNode->servUp = FALSE;
  303.     return (OK);
  304.     }
  305.     return (ERROR);
  306.     }
  307. /***************************************************************************
  308. *
  309. * distNetServCall - call a service with an incoming packet (VxFusion option)
  310. *
  311. * This routine passes incoming data to a service.
  312. *
  313. * AVAILABILITY
  314. * This routine is distributed as a component of the unbundled distributed
  315. * message queues option, VxFusion.
  316. *
  317. * RETURNS: OK, if successful.
  318. * NOMANUAL
  319. */
  320. LOCAL STATUS distNetServCall
  321.     (
  322.     int             servId,      /* ID of service */
  323.     DIST_TBUF_HDR * pDeliver     /* the message */
  324.     )
  325.     {
  326.     DIST_SERV_NODE * pServNode;
  327.     if (servId >= 0 && servId < DIST_SERV_MAX
  328.                     && DIST_NET_SERV_INST (servId))
  329.         {
  330.         pServNode = (DIST_SERV_NODE *) &servTable[servId];
  331.         /*
  332.          * Although the service is not up yet, we return OK and
  333.          * force an acknowledge for the packet. We have to keep
  334.          * the window rotating.
  335.          */
  336.         if (pServNode->servUp == FALSE)
  337.             return (OK);
  338.         semTake (&pServNode->servQLock, WAIT_FOREVER);
  339.         DIST_TBUF_HDR_ENQUEUE (pServNode->pServQ, pDeliver);
  340.         semGive (&pServNode->servQLock);
  341.         semGive (&pServNode->servWait4Jobs);
  342.         return (OK);
  343.         }
  344.     return (ERROR);
  345.     }
  346. /***************************************************************************
  347. *
  348. * distNetCtl - control function for network layer (VxFusion option)
  349. *
  350. * This routine performs control functions on the network layer.
  351. * The following functions are accepted:
  352. * is
  353. * i DIST_CTL_SERVICE_HOOK
  354. * Set a function to be called, each time a service called by a
  355. * remote node fails.
  356. * i DIST_CTL_SERVICE_CONF
  357. * Unsed to configure a certain service. The <argument> parameter is
  358. * a pointer to DIST_SERV_CONF, which holds the service id and its
  359. * configuration to set.
  360. * ie
  361. *
  362. * RETURNS: Return value of function called, or ERROR.
  363. *
  364. * ERRNO:
  365. * S_distLib_UNKNOWN_REQUEST
  366. *
  367. * AVAILABILITY
  368. * This routine is distributed as a component of the unbundled distributed
  369. * message queues option, VxFusion.
  370. *
  371. * NOMANUAL
  372. */
  373. int distNetCtl
  374.     (
  375.     int    function,        /* function code      */
  376.     int    argument         /* arbitrary argument */
  377.     )
  378.     {
  379.     DIST_SERV_CONF    *pServConfig;
  380.     switch (function)
  381.         {
  382.         case DIST_CTL_SERVICE_HOOK:
  383.             distNetServiceHook = (FUNCPTR) argument;
  384.             return (OK);
  385.         case DIST_CTL_SERVICE_CONF:
  386.             {
  387.             if ((pServConfig = (DIST_SERV_CONF *) argument) == NULL)
  388.                 return (ERROR);
  389.         
  390.             return (distNetServConfig (pServConfig->servId,
  391.                     pServConfig->taskPrio, pServConfig->netPrio));
  392.             }
  393.         default:
  394.             errnoSet (S_distLib_UNKNOWN_REQUEST);
  395.             return (ERROR);
  396.         }
  397.     }
  398. /***************************************************************************
  399. *
  400. * distNetSend - send message from a continuous buffer to the network (VxFusion option)
  401. *
  402. * This routine sends a message of length <nBytes> starting at <buffer>
  403. * to node <distNodeDest>. If necessary, the message is fragmented.
  404. *
  405. * The routine blocks until the message is acknowledged or timeout
  406. * has expired.
  407. *
  408. * AVAILABILITY
  409. * This routine is distributed as a component of the unbundled distributed
  410. * message queues option, VxFusion.
  411. *
  412. * RETURNS: OK, if successful.
  413. * NOMANUAL
  414. */
  415. STATUS distNetSend
  416.     (
  417.     DIST_NODE_ID nodeIdDest,  /* node to which message is destinated */
  418.     DIST_PKT *   pPkt,        /* packet to send                      */
  419.     UINT         nBytes,      /* length of message                   */
  420.     int          timo,        /* ticks to wait                       */
  421.     int          prio         /* priority level to send message on   */
  422.     )
  423.     {
  424.     char *          pFrag;
  425.     char *          pBufEnd;
  426.     DIST_TBUF *     pTBuf;
  427.     DIST_TBUF_HDR * pTBufHdr;
  428.     int             mtu;
  429.     int             tBufNBytes;
  430.     short           seq = 0;
  431.     distStat.netOutReceived++;
  432.     if (nodeIdDest == DIST_IF_BROADCAST_ADDR &&
  433.             distNodeGetNumNodes (DIST_NODE_NUM_NODES_ALIVE) <= 1)
  434.         return (OK);
  435.     pPkt->pktLen = htons ((UINT16)nBytes);
  436.     /* Get first TBuf for header. */
  437.     if ((pTBufHdr = DIST_TBUF_HDR_ALLOC ()) == NULL)
  438.         {
  439.         distStat.netOutDiscarded++;
  440.         return (ERROR);                /* out of TBufs */
  441.         }
  442.     pTBufHdr->tBufHdrDest = nodeIdDest;
  443.     pTBufHdr->tBufHdrOverall = (INT16) nBytes;
  444.     pTBufHdr->tBufHdrTimo = timo;
  445.     pTBufHdr->tBufHdrPrio = prio;
  446.     /* Split the packet to n telegrams and copy them to TBufs. */
  447.     pBufEnd = (pFrag = (char *) pPkt) + nBytes;
  448.     mtu = DIST_IF_MTU - DIST_IF_HDR_SZ;
  449.     while ((pTBuf = DIST_TBUF_ALLOC ()) != NULL)
  450.         {
  451.         DIST_TBUF_ENQUEUE (pTBufHdr, pTBuf);
  452.         tBufNBytes = MIN (pBufEnd - pFrag, mtu);
  453.         pTBuf->tBufFlags = DIST_TBUF_FLAG_MF;
  454.         if (seq == 0)
  455.             pTBuf->tBufFlags |= DIST_TBUF_FLAG_HDR;
  456.         if (nodeIdDest == DIST_IF_BROADCAST_ADDR)
  457.             {
  458.             pTBuf->tBufType = DIST_TBUF_TTYPE_BDTA;
  459.             pTBuf->tBufFlags |= DIST_TBUF_FLAG_BROADCAST;
  460.             }
  461.         else
  462.             pTBuf->tBufType = DIST_TBUF_TTYPE_DTA;
  463.         pTBuf->tBufSeq = seq++;
  464.         pTBuf->tBufNBytes = (UINT16) tBufNBytes;
  465.         bcopy (pFrag, pTBuf->pTBufData, tBufNBytes);
  466.         pFrag += tBufNBytes;
  467.         if (pFrag == pBufEnd)    /* done? */
  468.             {
  469.             pTBuf->tBufFlags &= ~DIST_TBUF_FLAG_MF;
  470. #ifdef DIST_NET_REPORT
  471.             printf ("distNetSend:%p: node %lu, numSeq %d, lenOverall %dn",
  472.                     pTBufHdr, nodeIdDest, seq, pTBufHdr->tBufHdrOverall);
  473. #endif
  474.             return (distNodePktSend (pTBufHdr));
  475.             }
  476.         }
  477.     /* TBuf shortage */
  478.     DIST_TBUF_FREEM (pTBufHdr);
  479.     distStat.netOutDiscarded++;
  480.     return (ERROR);    
  481.     }
  482. /***************************************************************************
  483. *
  484. * distNetIOVSend - send message from discontinuous buffer to the network (VxFusion option)
  485. *
  486. * This routine sends a message from a scatter/gather buffer (IOV)
  487. * to node <nodeIdDest>. If necessary, the message is fragmented.
  488. *
  489. * This routine blocks until the message is acknowledged or timeout
  490. * has exceeded.
  491. *
  492. * AVAILABILITY
  493. * This routine is distributed as a component of the unbundled distributed
  494. * message queues option, VxFusion.
  495. *
  496. * RETURNS: OK, if successful.
  497. * NOMANUAL
  498. */
  499. STATUS distNetIOVSend
  500.     (
  501.     DIST_NODE_ID  nodeIdDest, /* node to which message is destinated */
  502.     DIST_IOVEC *  pIOV,       /* list of buffers to gather           */
  503.     int           numIOV,     /* number of buffers                   */
  504.     int           timo,       /* ticks to wait                       */
  505.     int           prio        /* priority level to send message on   */
  506.     )
  507.     {
  508.     DIST_IOVEC *    p;
  509.     DIST_TBUF_HDR * pTBufHdr;
  510.     DIST_TBUF *     pTBuf = NULL;
  511.     DIST_PKT *      pPkt;
  512.     int             szSrc, szDest, sz;
  513.     int             seq;
  514.     char *          pSrc;
  515.     char *          pDest = NULL;
  516.     distStat.netOutReceived++;
  517.     if (numIOV <= 0)
  518.         return (ERROR);
  519.     if (nodeIdDest == DIST_IF_BROADCAST_ADDR
  520.               &&
  521.         distNodeGetNumNodes (DIST_NODE_NUM_NODES_ALIVE) <= 1)
  522.         {
  523.         return (OK);
  524.         }
  525.     /* Get first TBuf for header. */
  526.     if ((pTBufHdr = DIST_TBUF_HDR_ALLOC ()) == NULL)
  527.         {
  528.         distStat.netOutDiscarded++;
  529.         return (ERROR);
  530.         }
  531.     pTBufHdr->tBufHdrDest = nodeIdDest;
  532.     pTBufHdr->tBufHdrTimo = timo;
  533.     pTBufHdr->tBufHdrPrio = prio;
  534.     pTBufHdr->tBufHdrOverall = 0;    /* later */
  535.     /* Copy the discontinious buffer to a list of tBufs. */
  536.     p = pIOV;
  537.     szDest = 0;
  538.     seq = 0;
  539.     while (numIOV--)
  540.         {
  541.         pTBufHdr->tBufHdrOverall += (szSrc = p->IOLen);
  542.         pSrc = p->pIOBuffer;
  543.         do
  544.             {
  545.             if (szDest == 0)
  546.                 {
  547.                 if ((pTBuf = DIST_TBUF_ALLOC ()) == NULL)
  548.                     {
  549.                     distStat.netOutDiscarded++;
  550.                     DIST_TBUF_FREEM (pTBufHdr);
  551.                     return (ERROR);
  552.                     }
  553.                 pTBuf->tBufFlags = 0;
  554.                 if (seq == 0)
  555.                     pTBuf->tBufFlags |= DIST_TBUF_FLAG_HDR;
  556.                 pTBuf->tBufFlags |= DIST_TBUF_FLAG_MF;
  557.                 if (nodeIdDest == DIST_IF_BROADCAST_ADDR)
  558.                     {
  559.                     pTBuf->tBufType = DIST_TBUF_TTYPE_BDTA;
  560.                     pTBuf->tBufFlags |= DIST_TBUF_FLAG_BROADCAST;
  561.                     }
  562.                 else
  563.                     pTBuf->tBufType = DIST_TBUF_TTYPE_DTA;
  564.                 pTBuf->tBufSeq = (UINT16) seq++;
  565.                 pTBuf->tBufNBytes = 0;
  566.                 DIST_TBUF_ENQUEUE (pTBufHdr, pTBuf);
  567.                 szDest = DIST_IF_MTU - DIST_IF_HDR_SZ;
  568.                 pDest = pTBuf->pTBufData;
  569.                 }
  570.             sz = MIN (szSrc, szDest);
  571.             bcopy (pSrc, pDest, sz);
  572.             pTBuf->tBufNBytes += sz;
  573.             szSrc -= sz;
  574.             szDest -= sz;
  575.             pSrc += sz;
  576.             pDest += sz;
  577.             }
  578.         while (szSrc > 0);
  579.         p++;
  580.         }
  581.     pTBuf->tBufFlags &= ~DIST_TBUF_FLAG_MF;
  582. #ifdef DIST_NET_REPORT
  583.     printf ("distNetSend:%p: node %lu, numSeq %d, lenOverall %dn",
  584.             pTBufHdr, nodeIdDest, seq, pTBufHdr->tBufHdrOverall);
  585. #endif
  586.     pPkt = (DIST_PKT *) ((DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData);
  587.     pPkt->pktLen = htons (pTBufHdr->tBufHdrOverall);
  588.     return (distNodePktSend (pTBufHdr));
  589.     }
  590. /***************************************************************************
  591. *
  592. * distNetInput - accept a telegram buffer and send it to upper VxFusion layers (VxFusion option)
  593. *
  594. * This routine accepts a telegram buffer from an adapter and tries to 
  595. * reassemble a message from it.  If the telegram buffer contains an entire 
  596. * message, distNetInput() simply forwards the message to the upper VxFusion 
  597. * layers and sends an acknowledgment for the message.  If the telegram buffer 
  598. * contains a message fragment, the message fragment is added to existing 
  599. * message fragments, if any.  If the message fragment completes a message, the 
  600. * message is forwarded to the upper VxFusion layers and distNetInput() 
  601. * sends an acknowledgment for the message.
  602. *
  603. * AVAILABILITY
  604. * This routine is distributed as a component of the unbundled distributed
  605. * message queues option, VxFusion.
  606. *
  607. * RETURNS: N/A
  608. * NOMANUAL
  609. */
  610. void distNetInput
  611.     (
  612.     DIST_NODE_ID nodeIdIn, /* source node of the telegram buffer */
  613.     int          prioIn,   /* priority the telegram arrived with */
  614.     DIST_TBUF *  pTBufIn   /* the telegram buffer itself         */
  615.     )
  616.     {
  617.     void * pComm; /* from distNodeReassemble(), to distNodeGetReassembled() */
  618.     DIST_TBUF_HDR *  pReassembled;
  619.     DIST_PKT *       pPkt;
  620.     distStat.netInReceived++;
  621.     /* Try to reassemble. */
  622.     
  623.     if ((pComm = distNodeReassemble (nodeIdIn, prioIn, pTBufIn)) == NULL)
  624.         return;
  625.     /*
  626.      * Now that we are here, a packet has been reassembled.
  627.      * <pComm> points to the communication, that has one or
  628.      * more packets.
  629.      */
  630.     while ((pReassembled = distNodeGetReassembled (pComm)) != NULL)
  631.         {
  632. #ifdef DIST_NET_REPORT
  633.             {
  634.             char * buffer;
  635.             int    len;
  636.             printf ("distNetInput: packet %p reassembledn", pReassembled);
  637.             buffer = malloc ((len = pReassembled->tBufHdrOverall));
  638.             if (buffer)
  639.                 {
  640.                 distTBufCopy (DIST_TBUF_GET_NEXT (pReassembled), 0,
  641.                         buffer, len);
  642.                 distDump (buffer, len);
  643.                 free (buffer);
  644.                 }
  645.             }
  646. #endif
  647.         distStat.netReassembled++;
  648.         pPkt = (DIST_PKT *) ((DIST_TBUF_GET_NEXT (pReassembled))->pTBufData);
  649.         if ((ntohs (pPkt->pktLen) != pReassembled->tBufHdrOverall) ||
  650.                 (distNetServCall (pPkt->pktType, pReassembled) == ERROR))
  651.             {
  652. #ifdef DIST_DIAGNOSTIC
  653.             if (ntohs (pPkt->pktLen) != pReassembled->tBufHdrOverall)
  654.                 {
  655.                 distLog("distNetInput: wrong length (has %d, expected %d)n",
  656.                         pReassembled->tBufHdrOverall, ntohs (pPkt->pktLen));
  657.                 }
  658.             else
  659.                 distLog("distNetInput: unknown type (%d) or service downn",
  660.                         pPkt->pktType);
  661. #endif
  662.             distNodePktDiscard (pReassembled->tBufHdrSrc, pReassembled);
  663.             }
  664.         else
  665.             distNodePktAck (pReassembled->tBufHdrSrc, pReassembled, 0);
  666.         /* loop back for the next packet. */
  667.         }
  668.     }