distIncoLib.c
上传用户:baixin
上传日期:2008-03-13
资源大小:4795k
文件大小:11k
开发平台:

MultiPlatform

  1. /* distIncoLib.c - incorporation protocol library (VxFusion option) */
  2. /* Copyright 1999-2002 Wind River Systems, Inc. */
  3. /*
  4. modification history
  5. --------------------
  6. 01e,23oct01,jws  fix compiler warnings (SPR 71117)
  7. 01d,24may99,drm  added vxfusion prefix to VxFusion related includes
  8. 01c,11sep98,drm  changed distStatLib.h to private/distLibP.h
  9. 01b,20jan98,ur   changed booting behaviour, do always send an INCO_UPNOW.
  10. 01a,10sep97,ur   written.
  11. */
  12. /*
  13. DESCRIPTION
  14. The incorporation library handles the appearence of a new node.
  15. AVAILABILITY
  16. This module is distributed as a component of the unbundled distributed
  17. message queues option, VxFusion.
  18. */
  19. #include "vxWorks.h"
  20. #if defined (DIST_INCO_REPORT) || defined (DIST_DIAGNOSTIC)
  21. #include "stdio.h"
  22. #endif
  23. #include "stdlib.h"
  24. #include "semLib.h"
  25. #include "vxfusion/distIfLib.h"
  26. #include "vxfusion/private/distLibP.h"
  27. #include "vxfusion/private/distNodeLibP.h"
  28. #include "vxfusion/private/msgQDistGrpLibP.h"
  29. #include "vxfusion/private/distNameLibP.h"
  30. #include "vxfusion/private/distTBufLibP.h"
  31. #include "vxfusion/private/distIncoLibP.h"
  32. #include "vxfusion/private/distNetLibP.h"
  33. #include "vxfusion/private/distPktLibP.h"
  34. /* locals */
  35. LOCAL SEMAPHORE           distIncoWait4Done;
  36. LOCAL BOOL                distIncoLibInstalled = FALSE;
  37. /* local prototypes */
  38. LOCAL DIST_STATUS    distIncoInput (DIST_NODE_ID nodeIdIn,
  39.                                     DIST_TBUF_HDR *pReassembled);
  40. /***************************************************************************
  41. *
  42. * distIncoLibInit - initialize this module (VxFusion option)
  43. *
  44. * This routine currently does nothing.
  45. *
  46. * AVAILABILITY
  47. * This routine is distributed as a component of the unbundled distributed
  48. * message queues option, VxFusion.
  49. *
  50. * RETURNS: N/A
  51. * NOMANUAL
  52. */
  53. void distIncoLibInit (void)
  54.     {
  55.     }
  56. /***************************************************************************
  57. *
  58. * distIncoInit - start incorporation service (VxFusion option)
  59. *
  60. * This routine starts the incorporation service.
  61. *
  62. * AVAILABILITY
  63. * This routine is distributed as a component of the unbundled distributed
  64. * message queues option, VxFusion.
  65. *
  66. * RETURNS: OK, if service is started.
  67. * NOMANUAL
  68. */
  69. STATUS distIncoInit (void)
  70.     {
  71.     STATUS    status;
  72.     if (distIncoLibInstalled)
  73.         return (OK);
  74.     semBInit (&distIncoWait4Done, SEM_Q_FIFO, SEM_EMPTY);
  75.     /* Add INCO service to table of services. */
  76.     status = distNetServAdd (DIST_PKT_TYPE_INCO,
  77.                              distIncoInput,
  78.                              DIST_INCO_SERV_NAME,
  79.                              DIST_INCO_SERV_NET_PRIO,
  80.                              DIST_INCO_SERV_TASK_PRIO,
  81.                              DIST_INCO_SERV_TASK_STACK_SZ);
  82.     if (status == ERROR)
  83.         return (ERROR);
  84.     distIncoLibInstalled = TRUE;
  85.     return (OK);
  86.     }
  87. /***************************************************************************
  88. *
  89. * distIncoStart - start the incorporation protocol (VxWorks option)
  90. *
  91. * This routine starts the incorporation protocol.
  92. *
  93. * AVAILABILITY
  94. * This routine is distributed as a component of the unbundled distributed
  95. * message queues option, VxFusion.
  96. *
  97. * RETURNS: OK, is successful.
  98. * NOMANUAL
  99. */
  100. STATUS distIncoStart
  101.     (
  102.     int    waitNTicks        /* ticks distNodeBootstrap() should wait */
  103.     )
  104.     {
  105.     DIST_PKT_INCO    pktInco;
  106.     DIST_NODE_ID     nodeId;
  107.     STATUS           status, s1, s2, s3, s4;
  108.     DIST_PKT_INCO    pktUpNow;
  109.     s1 = distNetServUp (DIST_PKT_TYPE_INCO);    /* incorporation service   */
  110.     s2 = distNetServUp (DIST_PKT_TYPE_DGDB);    /* group database service  */
  111.     s3 = distNetServUp (DIST_PKT_TYPE_GAP);     /* group agreement service */
  112.     s4 = distNetServUp (DIST_PKT_TYPE_DNDB);    /* name database service   */
  113.     if (s1 == ERROR || s2 == ERROR || s3 == ERROR || s4 == ERROR)
  114.         return (ERROR);
  115. #ifdef DIST_DIAGNOSTIC
  116.     distLog ("distIncoStart: bootstrappingn");
  117. #endif
  118.     distNodeBootstrap (waitNTicks);
  119. #ifdef DIST_DIAGNOSTIC
  120.     distLog ("distIncoStart: bootstrapping done--okn");
  121. #endif
  122.     if (distNodeGetGodfatherId (&nodeId) == ERROR)
  123.         {
  124.         /*
  125.          * Since we found no godfather, we have to assume,
  126.          * that we got up first. There is no need to try
  127.          * to send an INCO_REQ.
  128.          * But there are some rare situations, where some
  129.          * other nodes that are also booting, do know about
  130.          * us, unless we don't. They have registrated us in
  131.          * BOOTING state. Therefore we send an INCO_UPNOW
  132.          * broadcast, to force them to shift our state to
  133.          * OPERATIONAL.
  134.          */
  135.         s1 = distNetServUp (DIST_PKT_TYPE_MSG_Q);     /* message queue serv */
  136.         s2 = distNetServUp (DIST_PKT_TYPE_MSG_Q_GRP); /* message group serv */
  137.         if (s1 == ERROR || s2 == ERROR)
  138.             distPanic ("distIncoInput/DONE: Cannot activate service.n");
  139.         distNodeLocalSetState (DIST_NODE_STATE_OPERATIONAL);
  140.         pktUpNow.pktIncoHdr.pktType = DIST_PKT_TYPE_INCO;
  141.         pktUpNow.pktIncoHdr.pktSubType = DIST_PKT_TYPE_INCO_UPNOW;
  142.         distNetSend (DIST_IF_BROADCAST_ADDR,
  143.                      (DIST_PKT *) &pktUpNow, sizeof (pktUpNow),
  144.                       WAIT_FOREVER, DIST_INCO_PRIO);
  145. #ifdef DIST_DIAGNOSTIC
  146.         distLog ("distIncoStart: incorporation done--we are 1stn");
  147. #endif
  148.         return (OK);
  149.         }
  150.     pktInco.pktIncoHdr.pktType = DIST_PKT_TYPE_INCO;
  151.     pktInco.pktIncoHdr.pktSubType = DIST_PKT_TYPE_INCO_REQ;
  152. #ifdef DIST_DIAGNOSTIC
  153.     distLog ("distIncoStart: sending incorporation request to node 0x%lxn",
  154.             nodeId);
  155. #endif
  156.     status = distNetSend (nodeId, (DIST_PKT *) &pktInco, sizeof (pktInco),
  157.                           WAIT_FOREVER, DIST_INCO_PRIO);
  158.     if (status == ERROR)
  159.         return (ERROR);
  160. #ifdef DIST_DIAGNOSTIC
  161.     distLog ("distIncoStart: incorporation request sent--okn");
  162. #endif
  163.     semTake (&distIncoWait4Done, WAIT_FOREVER);
  164. #ifdef DIST_DIAGNOSTIC
  165.     distLog ("distIncoStart: we are incorporated--okn");
  166. #endif
  167.     return (OK);
  168.     }
  169. /***************************************************************************
  170. *
  171. * distIncoInput - handle incorporation input (VxFusion option)
  172. *
  173. * This routine handles incorporation input.
  174. *
  175. * AVAILABILITY
  176. * This routine is distributed as a component of the unbundled distributed
  177. * message queues option, VxFusion.
  178. *
  179. * RETURNS: Status of input processing.
  180. * NOMANUAL
  181. */
  182. LOCAL DIST_STATUS distIncoInput
  183.     (
  184.     DIST_NODE_ID    nodeIdSrc,    /* source node's Id */
  185.     DIST_TBUF_HDR * pTBufHdr      /* the message to process */
  186.     )
  187.     {
  188.     DIST_PKT_INCO       pktInco;
  189.     DIST_PKT_INCO       pktDone;
  190.     int                 pktLen = pTBufHdr->tBufHdrOverall;
  191.     DIST_PKT_INCO       pktUpNow;
  192.     STATUS              s1, s2, s3;
  193.     DIST_NODE_DB_NODE * pNode;
  194. #ifdef DIST_DIAGNOSTIC
  195.     STATUS    status;
  196. #endif
  197.     if (pktLen != sizeof (DIST_PKT_INCO))
  198.         distPanic ("distIncoInput: packet too shortn");
  199.     distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *)&pktInco, pktLen);
  200.     switch (pktInco.pktIncoHdr.pktSubType)
  201.         {
  202.         case DIST_PKT_TYPE_INCO_REQ:
  203.             {
  204.             /*
  205.              * A new node introduces itself. Send name database
  206.              * and group information.
  207.              */
  208. #ifdef DIST_DIAGNOSTIC
  209.             distLog ("distIncoInput/REQ: node 0x%lx wants to get incorporatedn",
  210.                      nodeIdSrc);
  211. #endif
  212. #ifdef DIST_INCO_REPORT
  213.             printf ("distIncoInput: we are in state %dn",
  214.                     distNodeLocalGetState());
  215. #endif
  216.             if (msgQDistGrpBurst (nodeIdSrc) == ERROR)
  217.                 {
  218. #ifdef DIST_DIAGNOSTIC
  219.                 distLog ("distIncoInput/REQ: group db update failedn");
  220. #endif
  221.                 break;
  222.                 }
  223.             if (distNameBurst (nodeIdSrc) == ERROR)
  224.                 {
  225. #ifdef DIST_DIAGNOSTIC
  226.                 distLog ("distIncoInput/REQ: name db update failedn");
  227. #endif
  228.                 break;
  229.                 }
  230.             distNodeOperational (nodeIdSrc);
  231.             /* Signal DONE to godchild. */
  232.             pktDone.pktIncoHdr.pktType = DIST_PKT_TYPE_INCO;
  233.             pktDone.pktIncoHdr.pktSubType = DIST_PKT_TYPE_INCO_DONE;
  234. #ifdef DIST_DIAGNOSTIC
  235.             status = distNetSend (nodeIdSrc, (DIST_PKT *) &pktDone,
  236.                                   sizeof (pktDone), WAIT_FOREVER,
  237.                                   DIST_INCO_PRIO);
  238.             if (status == ERROR)
  239.                 distLog ("distIncoInput/REQ: failed sending DONEn");
  240.             else
  241.                 distLog ("distIncoInput/REQ: node 0x%lx is updated, DONE sentn",
  242.                          nodeIdSrc);
  243. #else
  244.             distNetSend (nodeIdSrc, (DIST_PKT *) &pktDone,
  245.                          sizeof (pktDone), WAIT_FOREVER, DIST_INCO_PRIO);
  246. #endif
  247.             break;
  248.             }
  249.         case DIST_PKT_TYPE_INCO_DONE:
  250.             {
  251. #ifdef DIST_INCO_REPORT
  252.             printf ("distIncoInput/DONE: we are incorporatedn");
  253. #endif
  254.             distNodeLocalSetState (DIST_NODE_STATE_OPERATIONAL);
  255.             s1 = distNetServUp (DIST_PKT_TYPE_MSG_Q);
  256.             s2 = distNetServUp (DIST_PKT_TYPE_GAP);
  257.             s3 = distNetServUp (DIST_PKT_TYPE_MSG_Q_GRP);
  258.             if (s1 == ERROR || s2 == ERROR || s3 == ERROR)
  259.                 distPanic ("distIncoInput/DONE: Cannot activate service.n");
  260.             pktUpNow.pktIncoHdr.pktType = DIST_PKT_TYPE_INCO;
  261.             pktUpNow.pktIncoHdr.pktSubType = DIST_PKT_TYPE_INCO_UPNOW;
  262. #ifdef DIST_DIAGNOSTIC
  263.             status = distNetSend (DIST_IF_BROADCAST_ADDR,
  264.                                   (DIST_PKT *) &pktUpNow, sizeof (pktUpNow),
  265.                                   WAIT_FOREVER, DIST_INCO_PRIO);
  266.             if (status == ERROR)
  267.                 distLog ("distIncoInput/DONE: failed sending UPNOWn");
  268. #else
  269.             distNetSend (DIST_IF_BROADCAST_ADDR,
  270.                          (DIST_PKT *) &pktUpNow, sizeof (pktUpNow),
  271.                          WAIT_FOREVER, DIST_INCO_PRIO);
  272. #endif
  273.             semGive (&distIncoWait4Done);
  274.             break;
  275.             }
  276.         case DIST_PKT_TYPE_INCO_UPNOW:
  277.             {
  278. #ifdef DIST_INCO_REPORT
  279.             printf ("distIncoInput/UPNOW: node 0x%lx is up nown",
  280.                     nodeIdSrc);
  281. #endif
  282.             pNode = distNodeOperational (nodeIdSrc);
  283.             if (pNode == NULL)
  284.                 {
  285.                 /*
  286.                  * This should never happen. But if it happens,
  287.                  * try to create that node.
  288.                  */
  289. #ifdef DIST_DIAGNOSTIC
  290.                 distLog ("distIncoInput/UPNOW: UPNOW from unknown node 0x%lxn",
  291.                         nodeIdSrc);
  292.                 pNode =
  293.                     distNodeCreate (nodeIdSrc, DIST_NODE_STATE_OPERATIONAL);
  294.                 if (pNode == NULL)
  295.                     distLog ("distIncoInput/UPNOW: creation of node failedn");
  296. #else
  297.                 distNodeCreate (nodeIdSrc, DIST_NODE_STATE_OPERATIONAL);
  298. #endif
  299.                 }
  300.             break;
  301.             }
  302.         default:
  303. #ifdef DIST_DIAGNOSTIC
  304.             distLog ("distIncoInput: unknown subtype (%d) of INCO protocoln",
  305.                     pktInco.pktIncoHdr.pktSubType);
  306. #endif
  307.             return (DIST_INCO_STATUS_PROTOCOL_ERROR);
  308.         }
  309.     return (DIST_INCO_STATUS_OK);
  310.     }
  311. /***************************************************************************
  312. *
  313. * distIncoIntro - introduce this node to the system (VxFusion option)
  314. *
  315. * This routine introduces this node to the system.
  316. *
  317. * AVAILABILITY
  318. * This routine is distributed as a component of the unbundled distributed
  319. * message queues option, VxFusion.
  320. *
  321. * RETURNS: OK
  322. * NOMANUAL
  323. */
  324. STATUS distIncoIntro (void)
  325.     {
  326.     return (OK);
  327.     }