ClusterMgr.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:21k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <my_pthread.h>
  15. #include <ndb_limits.h>
  16. #include <ndb_version.h>
  17. #include "TransporterFacade.hpp"
  18. #include "ClusterMgr.hpp"
  19. #include <IPCConfig.hpp>
  20. #include "NdbApiSignal.hpp"
  21. #include "API.hpp"
  22. #include <NdbSleep.h>
  23. #include <NdbOut.hpp>
  24. #include <NdbTick.h>
  25. #include <signaldata/NodeFailRep.hpp>
  26. #include <signaldata/NFCompleteRep.hpp>
  27. #include <signaldata/ApiRegSignalData.hpp>
  28. #include <mgmapi.h>
  29. #include <mgmapi_configuration.hpp>
  30. #include <mgmapi_config_parameters.h>
  31. int global_flag_send_heartbeat_now= 0;
  32. // Just a C wrapper for threadMain
  33. extern "C" 
  34. void*
  35. runClusterMgr_C(void * me)
  36. {
  37.   ((ClusterMgr*) me)->threadMain();
  38.   /** 
  39.    * Sleep to allow another thread that is not exiting to take control 
  40.    * of signals allocated by this thread
  41.    *
  42.    * see Ndb::~Ndb() in Ndbinit.cpp
  43.    */  
  44. #ifdef NDB_OSE
  45.   NdbSleep_MilliSleep(50);
  46. #endif
  47.   return NULL;
  48. }
  49. extern "C" {
  50.   void ndbSetOwnVersion();
  51. }
  52. ClusterMgr::ClusterMgr(TransporterFacade & _facade):
  53.   theStop(0),
  54.   theFacade(_facade)
  55. {
  56.   DBUG_ENTER("ClusterMgr::ClusterMgr");
  57.   ndbSetOwnVersion();
  58.   clusterMgrThreadMutex = NdbMutex_Create();
  59.   noOfAliveNodes= 0;
  60.   noOfConnectedNodes= 0;
  61.   theClusterMgrThread= 0;
  62.   DBUG_VOID_RETURN;
  63. }
  64. ClusterMgr::~ClusterMgr()
  65. {
  66.   DBUG_ENTER("ClusterMgr::~ClusterMgr");
  67.   doStop();  
  68.   NdbMutex_Destroy(clusterMgrThreadMutex);
  69.   DBUG_VOID_RETURN;
  70. }
  71. void
  72. ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
  73.   for(iter.first(); iter.valid(); iter.next()){
  74.     Uint32 tmp = 0;
  75.     if(iter.get(CFG_NODE_ID, &tmp))
  76.       continue;
  77.     theNodes[tmp].defined = true;
  78. #if 0
  79.     ndbout << "--------------------------------------" << endl;
  80.     ndbout << "--------------------------------------" << endl;
  81.     ndbout_c("ClusterMgr: Node %d defined as %s", tmp, config.getNodeType(tmp));
  82. #endif
  83.     unsigned type;
  84.     if(iter.get(CFG_TYPE_OF_SECTION, &type))
  85.       continue;
  86.     switch(type){
  87.     case NODE_TYPE_DB:
  88.       theNodes[tmp].m_info.m_type = NodeInfo::DB;
  89.       break;
  90.     case NODE_TYPE_API:
  91.       theNodes[tmp].m_info.m_type = NodeInfo::API;
  92.       break;
  93.     case NODE_TYPE_MGM:
  94.       theNodes[tmp].m_info.m_type = NodeInfo::MGM;
  95.       break;
  96.     case NODE_TYPE_REP:
  97.       theNodes[tmp].m_info.m_type = NodeInfo::REP;
  98.       break;
  99.     case NODE_TYPE_EXT_REP:
  100.       theNodes[tmp].m_info.m_type = NodeInfo::REP;
  101.       {
  102. Uint32 hbFreq = 10000;
  103. //ndb_mgm_get_int_parameter(iter, CFG_, &hbFreq);
  104. theNodes[tmp].hbFrequency = hbFreq;
  105. assert(100 <= hbFreq && hbFreq < 60 * 60 * 1000);
  106.       }
  107.       break;
  108.     default:
  109.       type = type;
  110. #if 0
  111.       ndbout_c("ClusterMgr: Unknown node type: %d", type);
  112. #endif
  113.     }
  114.   }
  115. }
  116. void
  117. ClusterMgr::startThread() {
  118.   NdbMutex_Lock(clusterMgrThreadMutex);
  119.   
  120.   theStop = 0;
  121.   
  122.   theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
  123.                                          (void**)this,
  124.                                          32768,
  125.                                          "ndb_clustermgr",
  126.                                          NDB_THREAD_PRIO_LOW);
  127.   NdbMutex_Unlock(clusterMgrThreadMutex);
  128. }
  129. void
  130. ClusterMgr::doStop( ){
  131.   DBUG_ENTER("ClusterMgr::doStop");
  132.   NdbMutex_Lock(clusterMgrThreadMutex);
  133.   if(theStop){
  134.     NdbMutex_Unlock(clusterMgrThreadMutex);
  135.     DBUG_VOID_RETURN;
  136.   }
  137.   void *status;
  138.   theStop = 1;
  139.   if (theClusterMgrThread) {
  140.     NdbThread_WaitFor(theClusterMgrThread, &status);  
  141.     NdbThread_Destroy(&theClusterMgrThread);
  142.   }
  143.   NdbMutex_Unlock(clusterMgrThreadMutex);
  144.   DBUG_VOID_RETURN;
  145. }
  146. void
  147. ClusterMgr::threadMain( ){
  148.   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
  149.   
  150.   signal.theVerId_signalNumber   = GSN_API_REGREQ;
  151.   signal.theReceiversBlockNumber = QMGR;
  152.   signal.theTrace                = 0;
  153.   signal.theLength               = ApiRegReq::SignalLength;
  154.   ApiRegReq * req = CAST_PTR(ApiRegReq, signal.getDataPtrSend());
  155.   req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  156.   req->version = NDB_VERSION;
  157.   
  158.   Uint32 timeSlept = 100;
  159.   Uint64 now = NdbTick_CurrentMillisecond();
  160.   while(!theStop){
  161.     /**
  162.      * Start of Secure area for use of Transporter
  163.      */
  164.     int send_heartbeat_now= global_flag_send_heartbeat_now;
  165.     global_flag_send_heartbeat_now= 0;
  166.     theFacade.lock_mutex();
  167.     for (int i = 1; i < MAX_NODES; i++){
  168.       /**
  169.        * Send register request (heartbeat) to all available nodes 
  170.        * at specified timing intervals
  171.        */
  172.       const NodeId nodeId = i;
  173.       Node & theNode = theNodes[nodeId];
  174.       
  175.       if (!theNode.defined)
  176. continue;
  177.       if (theNode.connected == false){
  178. theFacade.doConnect(nodeId);
  179. continue;
  180.       }
  181.       
  182.       if (!theNode.compatible){
  183. continue;
  184.       }
  185.       
  186.       theNode.hbCounter += timeSlept;
  187.       if (theNode.hbCounter >= theNode.hbFrequency ||
  188.   send_heartbeat_now) {
  189. /**
  190.  * It is now time to send a new Heartbeat
  191.  */
  192. if (theNode.hbCounter >= theNode.hbFrequency) {
  193.   theNode.hbSent++;
  194.   theNode.hbCounter = 0;
  195. }
  196. /**
  197.  * If the node is of type REP, 
  198.  * then the receiver of the signal should be API_CLUSTERMGR
  199.  */
  200. if (theNode.m_info.m_type == NodeInfo::REP) {
  201.   signal.theReceiversBlockNumber = API_CLUSTERMGR;
  202. }
  203. #if 0 
  204. ndbout_c("ClusterMgr: Sending API_REGREQ to node %d", (int)nodeId);
  205. #endif
  206. theFacade.sendSignalUnCond(&signal, nodeId);
  207.       }//if
  208.       
  209.       if (theNode.hbSent == 4 && theNode.hbFrequency > 0){
  210. reportNodeFailed(i);
  211.       }//if
  212.     }
  213.     
  214.     /**
  215.      * End of secure area. Let other threads in
  216.      */
  217.     theFacade.unlock_mutex();
  218.     
  219.     // Sleep for 100 ms between each Registration Heartbeat
  220.     Uint64 before = now;
  221.     NdbSleep_MilliSleep(100); 
  222.     now = NdbTick_CurrentMillisecond();
  223.     timeSlept = (now - before);
  224.   }
  225. }
  226. #if 0
  227. void
  228. ClusterMgr::showState(NodeId nodeId){
  229.   ndbout << "-- ClusterMgr - NodeId = " << nodeId << endl;
  230.   ndbout << "theNodeList      = " << theNodeList[nodeId] << endl;
  231.   ndbout << "theNodeState     = " << theNodeState[nodeId] << endl;
  232.   ndbout << "theNodeCount     = " << theNodeCount[nodeId] << endl;
  233.   ndbout << "theNodeStopDelay = " << theNodeStopDelay[nodeId] << endl;
  234.   ndbout << "theNodeSendDelay = " << theNodeSendDelay[nodeId] << endl;
  235. }
  236. #endif
  237. ClusterMgr::Node::Node()
  238.   : m_state(NodeState::SL_NOTHING) { 
  239.   compatible = nfCompleteRep = true;
  240.   connected = defined = m_alive = false; 
  241. }
  242. /******************************************************************************
  243.  * API_REGREQ and friends
  244.  ******************************************************************************/
  245. void
  246. ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
  247.   const ApiRegReq * const apiRegReq = (ApiRegReq *)&theData[0];
  248.   const NodeId nodeId = refToNode(apiRegReq->ref);
  249. #if 0
  250.   ndbout_c("ClusterMgr: Recd API_REGREQ from node %d", nodeId);
  251. #endif
  252.   assert(nodeId > 0 && nodeId < MAX_NODES);
  253.   Node & node = theNodes[nodeId];
  254.   assert(node.defined == true);
  255.   assert(node.connected == true);
  256.   if(node.m_info.m_version != apiRegReq->version){
  257.     node.m_info.m_version = apiRegReq->version;
  258.     if (getMajor(node.m_info.m_version) < getMajor(NDB_VERSION) ||
  259. getMinor(node.m_info.m_version) < getMinor(NDB_VERSION)) {
  260.       node.compatible = false;
  261.     } else {
  262.       node.compatible = true;
  263.     }
  264.   }
  265.   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
  266.   signal.theVerId_signalNumber   = GSN_API_REGCONF;
  267.   signal.theReceiversBlockNumber = API_CLUSTERMGR;
  268.   signal.theTrace                = 0;
  269.   signal.theLength               = ApiRegConf::SignalLength;
  270.   
  271.   ApiRegConf * const conf = CAST_PTR(ApiRegConf, signal.getDataPtrSend());
  272.   conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  273.   conf->version = NDB_VERSION;
  274.   conf->apiHeartbeatFrequency = node.hbFrequency;
  275.   theFacade.sendSignalUnCond(&signal, nodeId);
  276. }
  277. int global_mgmt_server_check = 0; // set to one in mgmtsrvr main;
  278. void
  279. ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
  280.   const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
  281.   const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
  282.   
  283. #if 0 
  284.   ndbout_c("ClusterMgr: Recd API_REGCONF from node %d", nodeId);
  285. #endif
  286.   assert(nodeId > 0 && nodeId < MAX_NODES);
  287.   
  288.   Node & node = theNodes[nodeId];
  289.   assert(node.defined == true);
  290.   assert(node.connected == true);
  291.   if(node.m_info.m_version != apiRegConf->version){
  292.     node.m_info.m_version = apiRegConf->version;
  293.     if (global_mgmt_server_check == 1)
  294.       node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
  295.        node.m_info.m_version);
  296.     else
  297.       node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
  298.       node.m_info.m_version);
  299.   }
  300.   
  301.   node.m_state = apiRegConf->nodeState;
  302.   if (node.compatible && (node.m_state.startLevel == NodeState::SL_STARTED  ||
  303.   node.m_state.startLevel == NodeState::SL_SINGLEUSER)){
  304.     set_node_alive(node, true);
  305.   } else {
  306.     set_node_alive(node, false);
  307.   }//if
  308.   node.hbSent = 0;
  309.   node.hbCounter = 0;
  310.   if (node.m_info.m_type != NodeInfo::REP) {
  311.     node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
  312.   }
  313. }
  314. void
  315. ClusterMgr::execAPI_REGREF(const Uint32 * theData){
  316.   
  317.   ApiRegRef * ref = (ApiRegRef*)theData;
  318.   
  319.   const NodeId nodeId = refToNode(ref->ref);
  320.   
  321.   assert(nodeId > 0 && nodeId < MAX_NODES);
  322.   
  323.   Node & node = theNodes[nodeId];
  324.   assert(node.connected == true);
  325.   assert(node.defined == true);
  326.   node.compatible = false;
  327.   set_node_alive(node, false);
  328.   node.m_state = NodeState::SL_NOTHING;
  329.   node.m_info.m_version = ref->version;
  330.   switch(ref->errorCode){
  331.   case ApiRegRef::WrongType:
  332.     ndbout_c("Node %d reports that this node should be a NDB node", nodeId);
  333.     abort();
  334.   case ApiRegRef::UnsupportedVersion:
  335.   default:
  336.     break;
  337.   }
  338. }
  339. void
  340. ClusterMgr::execNODE_FAILREP(const Uint32 * theData){
  341.   NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0];
  342.   for(int i = 1; i<MAX_NODES; i++){
  343.     if(NodeBitmask::get(nodeFail->theNodes, i)){
  344.       reportNodeFailed(i);
  345.     }
  346.   }
  347. }
  348. void
  349. ClusterMgr::execNF_COMPLETEREP(const Uint32 * theData){
  350.   NFCompleteRep * const nfComp = (NFCompleteRep *)theData;
  351.   const NodeId nodeId = nfComp->failedNodeId;
  352.   assert(nodeId > 0 && nodeId < MAX_NODES);
  353.   
  354.   theFacade.ReportNodeFailureComplete(nodeId);
  355.   theNodes[nodeId].nfCompleteRep = true;
  356. }
  357. void
  358. ClusterMgr::reportConnected(NodeId nodeId){
  359.   /**
  360.    * Ensure that we are sending heartbeat every 100 ms
  361.    * until we have got the first reply from NDB providing
  362.    * us with the real time-out period to use.
  363.    */
  364.   assert(nodeId > 0 && nodeId < MAX_NODES);
  365.   noOfConnectedNodes++;
  366.   Node & theNode = theNodes[nodeId];
  367.   theNode.connected = true;
  368.   theNode.hbSent = 0;
  369.   theNode.hbCounter = 0;
  370.   
  371.   if (theNode.m_info.m_type != NodeInfo::REP) {
  372.     theNode.hbFrequency = 0;
  373.   }
  374.   theNode.m_info.m_version = 0;
  375.   theNode.compatible = true;
  376.   theNode.nfCompleteRep = true;
  377.   
  378.   theFacade.ReportNodeAlive(nodeId);
  379. }
  380. void
  381. ClusterMgr::reportDisconnected(NodeId nodeId){
  382.   assert(nodeId > 0 && nodeId < MAX_NODES);
  383.   assert(noOfConnectedNodes > 0);
  384.   noOfConnectedNodes--;
  385.   theNodes[nodeId].connected = false;
  386.   theNodes[nodeId].m_info.m_connectCount ++;
  387.   reportNodeFailed(nodeId);
  388. }
  389. void
  390. ClusterMgr::reportNodeFailed(NodeId nodeId){
  391.   Node & theNode = theNodes[nodeId];
  392.  
  393.   set_node_alive(theNode, false);
  394.   if(theNode.connected)
  395.     theFacade.doDisconnect(nodeId);
  396.   
  397.   const bool report = (theNode.m_state.startLevel != NodeState::SL_NOTHING);  
  398.   theNode.m_state.startLevel = NodeState::SL_NOTHING;
  399.   
  400.   if(report){
  401.     theFacade.ReportNodeDead(nodeId);
  402.   }  
  403.   theNode.nfCompleteRep = false;
  404.   
  405.   if(noOfAliveNodes == 0){
  406.     NFCompleteRep rep;
  407.     for(Uint32 i = 1; i<MAX_NODES; i++){
  408.       if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){
  409. rep.failedNodeId = i;
  410. execNF_COMPLETEREP((Uint32*)&rep);
  411.       }
  412.     }
  413.   }
  414. }
  415. /******************************************************************************
  416.  * Arbitrator
  417.  ******************************************************************************/
  418. ArbitMgr::ArbitMgr(TransporterFacade & _fac)
  419.   : theFacade(_fac)
  420. {
  421.   DBUG_ENTER("ArbitMgr::ArbitMgr");
  422.   theThreadMutex = NdbMutex_Create();
  423.   theInputCond = NdbCondition_Create();
  424.   theInputMutex = NdbMutex_Create();
  425.   
  426.   theRank = 0;
  427.   theDelay = 0;
  428.   theThread = 0;
  429.   theInputTimeout = 0;
  430.   theInputFull = false;
  431.   memset(&theInputFull, 0, sizeof(theInputFull));
  432.   theState = StateInit;
  433.   memset(&theStartReq, 0, sizeof(theStartReq));
  434.   memset(&theChooseReq1, 0, sizeof(theChooseReq1));
  435.   memset(&theChooseReq2, 0, sizeof(theChooseReq2));
  436.   memset(&theStopOrd, 0, sizeof(theStopOrd));
  437.   DBUG_VOID_RETURN;
  438. }
  439. ArbitMgr::~ArbitMgr()
  440. {
  441.   DBUG_ENTER("ArbitMgr::~ArbitMgr");
  442.   NdbMutex_Destroy(theThreadMutex);
  443.   NdbCondition_Destroy(theInputCond);
  444.   NdbMutex_Destroy(theInputMutex);
  445.   DBUG_VOID_RETURN;
  446. }
  447. // Start arbitrator thread.  This is kernel request.
  448. // First stop any previous thread since it is a left-over
  449. // which was never used and which now has wrong ticket.
  450. void
  451. ArbitMgr::doStart(const Uint32* theData)
  452. {
  453.   ArbitSignal aSignal;
  454.   NdbMutex_Lock(theThreadMutex);
  455.   if (theThread != NULL) {
  456.     aSignal.init(GSN_ARBIT_STOPORD, NULL);
  457.     aSignal.data.code = StopRestart;
  458.     sendSignalToThread(aSignal);
  459.     void* value;
  460.     NdbThread_WaitFor(theThread, &value);
  461.     NdbThread_Destroy(&theThread);
  462.     theState = StateInit;
  463.     theInputFull = false;
  464.   }
  465.   aSignal.init(GSN_ARBIT_STARTREQ, theData);
  466.   sendSignalToThread(aSignal);
  467.   theThread = NdbThread_Create(
  468.     runArbitMgr_C, (void**)this, 32768, "ndb_arbitmgr",
  469.     NDB_THREAD_PRIO_HIGH);
  470.   NdbMutex_Unlock(theThreadMutex);
  471. }
  472. // The "choose me" signal from a candidate.
  473. void
  474. ArbitMgr::doChoose(const Uint32* theData)
  475. {
  476.   ArbitSignal aSignal;
  477.   aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
  478.   sendSignalToThread(aSignal);
  479. }
  480. // Stop arbitrator thread via stop signal from the kernel
  481. // or when exiting API program.
  482. void
  483. ArbitMgr::doStop(const Uint32* theData)
  484. {
  485.   DBUG_ENTER("ArbitMgr::doStop");
  486.   ArbitSignal aSignal;
  487.   NdbMutex_Lock(theThreadMutex);
  488.   if (theThread != NULL) {
  489.     aSignal.init(GSN_ARBIT_STOPORD, theData);
  490.     if (theData == 0) {
  491.       aSignal.data.code = StopExit;
  492.     } else {
  493.       aSignal.data.code = StopRequest;
  494.     }
  495.     sendSignalToThread(aSignal);
  496.     void* value;
  497.     NdbThread_WaitFor(theThread, &value);
  498.     NdbThread_Destroy(&theThread);
  499.     theState = StateInit;
  500.   }
  501.   NdbMutex_Unlock(theThreadMutex);
  502.   DBUG_VOID_RETURN;
  503. }
  504. // private methods
  505. extern "C" 
  506. void*
  507. runArbitMgr_C(void* me)
  508. {
  509.   ((ArbitMgr*) me)->threadMain();
  510.   return NULL;
  511. }
  512. void
  513. ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
  514. {
  515. #ifdef DEBUG_ARBIT
  516.   char buf[17] = "";
  517.   ndbout << "arbit recv: ";
  518.   ndbout << " gsn=" << aSignal.gsn;
  519.   ndbout << " send=" << aSignal.data.sender;
  520.   ndbout << " code=" << aSignal.data.code;
  521.   ndbout << " node=" << aSignal.data.node;
  522.   ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
  523.   ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
  524.   ndbout << endl;
  525. #endif
  526.   aSignal.setTimestamp();       // signal arrival time
  527.   NdbMutex_Lock(theInputMutex);
  528.   while (theInputFull) {
  529.     NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
  530.   }
  531.   theInputBuffer = aSignal;
  532.   theInputFull = true;
  533.   NdbCondition_Signal(theInputCond);
  534.   NdbMutex_Unlock(theInputMutex);
  535. }
  536. void
  537. ArbitMgr::threadMain()
  538. {
  539.   ArbitSignal aSignal;
  540.   aSignal = theInputBuffer;
  541.   threadStart(aSignal);
  542.   bool stop = false;
  543.   while (! stop) {
  544.     NdbMutex_Lock(theInputMutex);
  545.     while (! theInputFull) {
  546.       NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
  547.       threadTimeout();
  548.     }
  549.     aSignal = theInputBuffer;
  550.     theInputFull = false;
  551.     NdbCondition_Signal(theInputCond);
  552.     NdbMutex_Unlock(theInputMutex);
  553.     switch (aSignal.gsn) {
  554.     case GSN_ARBIT_CHOOSEREQ:
  555.       threadChoose(aSignal);
  556.       break;
  557.     case GSN_ARBIT_STOPORD:
  558.       stop = true;
  559.       break;
  560.     }
  561.   }
  562.   threadStop(aSignal);
  563. }
  564. // handle events in the thread
  565. void
  566. ArbitMgr::threadStart(ArbitSignal& aSignal)
  567. {
  568.   theStartReq = aSignal;
  569.   sendStartConf(theStartReq, ArbitCode::ApiStart);
  570.   theState = StateStarted;
  571.   theInputTimeout = 1000;
  572. }
  573. void
  574. ArbitMgr::threadChoose(ArbitSignal& aSignal)
  575. {
  576.   switch (theState) {
  577.   case StateStarted:            // first REQ
  578.     if (! theStartReq.data.match(aSignal.data)) {
  579.       sendChooseRef(aSignal, ArbitCode::ErrTicket);
  580.       break;
  581.     }
  582.     theChooseReq1 = aSignal;
  583.     if (theDelay == 0) {
  584.       sendChooseConf(aSignal, ArbitCode::WinChoose);
  585.       theState = StateFinished;
  586.       theInputTimeout = 1000;
  587.       break;
  588.     }
  589.     theState = StateChoose1;
  590.     theInputTimeout = 1;
  591.     return;
  592.   case StateChoose1:            // second REQ within Delay
  593.     if (! theStartReq.data.match(aSignal.data)) {
  594.       sendChooseRef(aSignal, ArbitCode::ErrTicket);
  595.       break;
  596.     }
  597.     theChooseReq2 = aSignal;
  598.     theState = StateChoose2;
  599.     theInputTimeout = 1;
  600.     return;
  601.   case StateChoose2:            // too many REQs - refuse all
  602.     if (! theStartReq.data.match(aSignal.data)) {
  603.       sendChooseRef(aSignal, ArbitCode::ErrTicket);
  604.       break;
  605.     }
  606.     sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
  607.     sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
  608.     sendChooseRef(aSignal, ArbitCode::ErrToomany);
  609.     theState = StateFinished;
  610.     theInputTimeout = 1000;
  611.     return;
  612.   default:
  613.     sendChooseRef(aSignal, ArbitCode::ErrState);
  614.     break;
  615.   }
  616. }
  617. void
  618. ArbitMgr::threadTimeout()
  619. {
  620.   switch (theState) {
  621.   case StateStarted:
  622.     break;
  623.   case StateChoose1:
  624.     if (theChooseReq1.getTimediff() < theDelay)
  625.       break;
  626.     sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
  627.     theState = StateFinished;
  628.     theInputTimeout = 1000;
  629.     break;
  630.   case StateChoose2:
  631.     sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
  632.     sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
  633.     theState = StateFinished;
  634.     theInputTimeout = 1000;
  635.     break;
  636.   default:
  637.     break;
  638.   }
  639. }
  640. void
  641. ArbitMgr::threadStop(ArbitSignal& aSignal)
  642. {
  643.   switch (aSignal.data.code) {
  644.   case StopExit:
  645.     switch (theState) {
  646.     case StateStarted:
  647.       sendStopRep(theStartReq, 0);
  648.       break;
  649.     case StateChoose1:                  // just in time
  650.       sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
  651.       break;
  652.     case StateChoose2:
  653.       sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
  654.       sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
  655.       break;
  656.     case StateInit:
  657.     case StateFinished:
  658.       //??
  659.       break;
  660.     }
  661.     break;
  662.   case StopRequest:
  663.     break;
  664.   case StopRestart:
  665.     break;
  666.   }
  667. }
  668. // output routines
  669. void
  670. ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32 code)
  671. {
  672.   ArbitSignal copySignal = aSignal;
  673.   copySignal.gsn = GSN_ARBIT_STARTCONF;
  674.   copySignal.data.code = code;
  675.   sendSignalToQmgr(copySignal);
  676. }
  677. void
  678. ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32 code)
  679. {
  680.   ArbitSignal copySignal = aSignal;
  681.   copySignal.gsn = GSN_ARBIT_CHOOSECONF;
  682.   copySignal.data.code = code;
  683.   sendSignalToQmgr(copySignal);
  684. }
  685. void
  686. ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32 code)
  687. {
  688.   ArbitSignal copySignal = aSignal;
  689.   copySignal.gsn = GSN_ARBIT_CHOOSEREF;
  690.   copySignal.data.code = code;
  691.   sendSignalToQmgr(copySignal);
  692. }
  693. void
  694. ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32 code)
  695. {
  696.   ArbitSignal copySignal = aSignal;
  697.   copySignal.gsn = GSN_ARBIT_STOPREP;
  698.   copySignal.data.code = code;
  699.   sendSignalToQmgr(copySignal);
  700. }
  701. /**
  702.  * Send signal to QMGR.  The input includes signal number and
  703.  * signal data.  The signal data is normally a copy of a received
  704.  * signal so it contains expected arbitrator node id and ticket.
  705.  * The sender in signal data is the QMGR node id.
  706.  */
  707. void
  708. ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
  709. {
  710.   NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
  711.   signal.theVerId_signalNumber = aSignal.gsn;
  712.   signal.theReceiversBlockNumber = QMGR;
  713.   signal.theTrace  = 0;
  714.   signal.theLength = ArbitSignalData::SignalLength;
  715.   ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
  716.   sd->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId());
  717.   sd->code = aSignal.data.code;
  718.   sd->node = aSignal.data.node;
  719.   sd->ticket = aSignal.data.ticket;
  720.   sd->mask = aSignal.data.mask;
  721. #ifdef DEBUG_ARBIT
  722.   char buf[17] = "";
  723.   ndbout << "arbit send: ";
  724.   ndbout << " gsn=" << aSignal.gsn;
  725.   ndbout << " recv=" << aSignal.data.sender;
  726.   ndbout << " code=" << aSignal.data.code;
  727.   ndbout << " node=" << aSignal.data.node;
  728.   ndbout << " ticket=" << aSignal.data.ticket.getText(buf, sizeof(buf));
  729.   ndbout << " mask=" << aSignal.data.mask.getText(buf, sizeof(buf));
  730.   ndbout << endl;
  731. #endif
  732.   theFacade.lock_mutex();
  733.   theFacade.sendSignalUnCond(&signal, aSignal.data.sender);
  734.   theFacade.unlock_mutex();
  735. }