ExtNDB.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:16k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "ExtNDB.hpp"
  14. #include "ConfigRetriever.hpp"
  15. #include <NdbSleep.h>
  16. #include <NdbApiSignal.hpp>
  17. #include <signaldata/DictTabInfo.hpp>
  18. #include <signaldata/GetTabInfo.hpp>
  19. #include <signaldata/SumaImpl.hpp>
  20. #include <AttributeHeader.hpp>
  21. #include <rep/rep_version.hpp>
  22. #include <ndb_limits.h>
  23. /*****************************************************************************
  24.  * Constructor / Destructor / Init
  25.  *****************************************************************************/
  26. ExtNDB::ExtNDB(GCIContainerPS * gciContainer, ExtAPI * extAPI)
  27. {
  28.   m_grepSender = new ExtSender();
  29.   if (!m_grepSender) REPABORT("Could not allocate object");
  30.   m_gciContainerPS = gciContainer;
  31.   m_nodeGroupInfo = new NodeGroupInfo();
  32.   m_gciContainerPS->setNodeGroupInfo(m_nodeGroupInfo);
  33.   
  34.   m_doneSetGrepSender = false;
  35.   m_subId = 0;
  36.   m_subKey = 0;
  37.   m_firstGCI = 0;
  38.   m_dataLogStarted = false;
  39.   m_extAPI = extAPI;
  40.   if (!m_extAPI) REPABORT("Could not allocate object");
  41. }
  42. ExtNDB::~ExtNDB()
  43. {
  44.   delete m_grepSender;
  45.   delete m_nodeGroupInfo;
  46. }    
  47. void 
  48. ExtNDB::signalErrorHandler(NdbApiSignal  * signal, Uint32 nodeId) 
  49. {
  50.   //const Uint32 gsn = signal->readSignalNumber();
  51.   //const Uint32 len = signal->getLength();
  52.   RLOG(("Send signal failed. Signal %p", signal));
  53. }
  54. bool
  55. ExtNDB::init(const char * connectString) 
  56. {
  57.   m_signalExecThread = NdbThread_Create(signalExecThread_C,
  58. (void **)this,
  59. 32768,
  60. "ExtNDB_Service",
  61. NDB_THREAD_PRIO_LOW);
  62. #if 0
  63.   /**
  64.    * I don't see that this does anything
  65.    *
  66.    * Jonas 13/2-04
  67.    */
  68.   ConfigRetriever cr; cr.setConnectString(connectString);
  69.   ndb_mgm_configuration * config = cr.getConfig(NDB_VERSION, NODE_TYPE_REP);
  70.   if (config == 0) {
  71.     ndbout << "ExtNDB: Configuration error: ";
  72.     const char* erString = cr.getErrorString();
  73.     if (erString == 0) {
  74.       erString = "No error specified!";
  75.     }
  76.     ndbout << erString << endl;
  77.     return false;
  78.   }
  79.   NdbAutoPtr autoPtr(config);
  80.   m_ownNodeId = r.getOwnNodeId();
  81.   
  82.   /**
  83.    * Check which GREPs to connect to (in configuration)
  84.    * 
  85.    * @note SYSTEM LIMITATION: Only connects to one GREP
  86.    */
  87.   Uint32 noOfConnections=0;
  88.   NodeId grepNodeId=0;
  89.   const Properties * connection;
  90.   config->get("NoOfConnections", &noOfConnections);
  91.   for (Uint32 i=0; i<noOfConnections; i++) {
  92.     Uint32 nodeId1, nodeId2;
  93.     config->get("Connection", i, &connection);
  94.     connection->get("NodeId1", &nodeId1);
  95.     connection->get("NodeId2", &nodeId2);
  96.     if (!connection->contains("System1") &&
  97. !connection->contains("System2") &&
  98. (nodeId1 == m_ownNodeId || nodeId2 == m_ownNodeId)) {
  99.       /**
  100.        * Found connection 
  101.        */
  102.       if (nodeId1 == m_ownNodeId) {
  103. grepNodeId = nodeId2;
  104.       } else {
  105. grepNodeId = nodeId1;
  106.       }
  107.     }
  108.   }
  109. #endif
  110.   m_transporterFacade = TransporterFacade::instance();
  111.   
  112.   assert(m_transporterFacade != 0);
  113.   
  114.   m_ownBlockNo = m_transporterFacade->open(this, execSignal, execNodeStatus);
  115.   assert(m_ownBlockNo > 0);
  116.   m_ownRef = numberToRef(m_ownBlockNo, m_ownNodeId);
  117.   ndbout_c("EXTNDB blockno %d ownref %d ", m_ownBlockNo, m_ownRef);
  118.   assert(m_ownNodeId == m_transporterFacade->ownId());
  119.   
  120.   m_grepSender->setOwnRef(m_ownRef);
  121.   m_grepSender->setTransporterFacade(m_transporterFacade);
  122.   if(!m_grepSender->connected(50000)){
  123.     ndbout_c("ExtNDB: Failed to connect to DB nodes!");
  124.     ndbout_c("ExtNDB: Tried to create transporter as (node %d, block %d).",
  125.      m_ownNodeId, m_ownBlockNo);
  126.     ndbout_c("ExtNDB: Check that DB nodes are started.");
  127.     return false; 
  128.   }
  129.   ndbout_c("Phase 3 (ExtNDB): Connection %d to NDB Cluster opened (Extractor)",
  130.    m_ownBlockNo);
  131.   
  132.   for (Uint32 i=1; i<MAX_NDB_NODES; i++) {
  133.     if (m_transporterFacade->getIsDbNode(i) && 
  134. m_transporterFacade->getIsNodeSendable(i)) 
  135.       {
  136. Uint32 nodeGrp = m_transporterFacade->getNodeGrp(i);
  137. m_nodeGroupInfo->addNodeToNodeGrp(i, true, nodeGrp);
  138. Uint32 nodeId = m_nodeGroupInfo->getFirstConnectedNode(nodeGrp);
  139. m_grepSender->setNodeId(nodeId);
  140. if(m_nodeGroupInfo->getPrimaryNode(nodeGrp) == 0) {
  141.   m_nodeGroupInfo->setPrimaryNode(nodeGrp, nodeId);
  142. }
  143. m_doneSetGrepSender = true;
  144. #if 0
  145. RLOG(("Added node %d to node group %d", i, nodeGrp));
  146. #endif
  147.     }
  148.   }
  149.   return true;
  150. }
  151. /*****************************************************************************
  152.  * Signal Queue Executor
  153.  *****************************************************************************/
  154. class SigMatch 
  155. {
  156. public:
  157.   int gsn;
  158.   void (ExtNDB::* function)(NdbApiSignal *signal);
  159.   SigMatch() { gsn = 0; function = NULL; };
  160.   SigMatch(int _gsn, void (ExtNDB::* _function)(NdbApiSignal *signal)) {
  161.     gsn = _gsn;
  162.     function = _function;
  163.   };
  164.   bool check(NdbApiSignal *signal) {
  165.     if(signal->readSignalNumber() == gsn)
  166.       return true;
  167.     return false;
  168.   };
  169. };
  170. extern "C"
  171. void *signalExecThread_C(void *r) 
  172. {
  173.   ExtNDB *grepps = (ExtNDB*)r;
  174.   grepps->signalExecThreadRun();
  175.   NdbThread_Exit(0);
  176.   /* NOTREACHED */
  177.   return 0;
  178. }
  179. void
  180. ExtNDB::signalExecThreadRun() 
  181. {
  182.   Vector<SigMatch> sl;
  183.   /**
  184.    * Signals to be executed
  185.    */
  186.   sl.push_back(SigMatch(GSN_SUB_GCP_COMPLETE_REP, 
  187. &ExtNDB::execSUB_GCP_COMPLETE_REP));
  188.   
  189.   /**
  190.    * Is also forwarded to SSCoord
  191.    */
  192.   sl.push_back(SigMatch(GSN_GREP_SUB_START_CONF,
  193. &ExtNDB::execGREP_SUB_START_CONF));
  194.   sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_CONF,
  195. &ExtNDB::execGREP_SUB_CREATE_CONF));
  196.   sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_CONF, 
  197. &ExtNDB::execGREP_SUB_REMOVE_CONF));
  198.   /**
  199.    * Signals to be forwarded 
  200.    */  
  201.   sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_CONF, 
  202. &ExtNDB::execGREP_CREATE_SUBID_CONF));
  203.   sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_CONF, &ExtNDB::sendSignalRep));
  204.   sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_REF, &ExtNDB::sendSignalRep));
  205.   sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_REF, &ExtNDB::sendSignalRep));
  206.   sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_REF, &ExtNDB::sendSignalRep));
  207.   sl.push_back(SigMatch(GSN_GREP_SUB_START_REF, &ExtNDB::sendSignalRep));
  208.   sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_REF, &ExtNDB::sendSignalRep));
  209.   while(1) {
  210.     SigMatch *handler = NULL;
  211.     NdbApiSignal *signal = NULL;
  212.     if(m_signalRecvQueue.waitFor(sl, handler, signal, DEFAULT_TIMEOUT)) {
  213. #if 0
  214.       RLOG(("Removed signal from queue (GSN: %d, QSize: %d)",
  215.     signal->readSignalNumber(), m_signalRecvQueue.size()));
  216. #endif
  217.       if(handler->function != 0) {
  218. (this->*handler->function)(signal);
  219. delete signal;  signal = 0;
  220.       } else {
  221. REPABORT("Illegal handler for signal");
  222.       }
  223.     }
  224.   }
  225. }
  226. void
  227. ExtNDB::sendSignalRep(NdbApiSignal * s)
  228. {
  229.   if(m_repSender->sendSignal(s) == -1)
  230.   {
  231.     signalErrorHandler(s, 0);
  232.   }
  233. }
  234. void
  235. ExtNDB::execSignal(void* executorObj, NdbApiSignal* signal, 
  236.    class LinearSectionPtr ptr[3])
  237. {
  238.   ExtNDB * executor = (ExtNDB*)executorObj;
  239.      
  240.   const Uint32 gsn = signal->readSignalNumber();
  241.   const Uint32 len = signal->getLength();
  242.   NdbApiSignal * s = new NdbApiSignal(executor->m_ownRef);
  243.   switch(gsn){
  244.   case GSN_SUB_GCP_COMPLETE_REP:
  245.   case GSN_GREP_CREATE_SUBID_CONF:
  246.   case GSN_GREP_SUB_CREATE_CONF:
  247.   case GSN_GREP_SUB_START_CONF:
  248.   case GSN_GREP_SUB_SYNC_CONF:
  249.   case GSN_GREP_SUB_REMOVE_CONF:
  250.   case GSN_GREP_CREATE_SUBID_REF:
  251.   case GSN_GREP_SUB_CREATE_REF:
  252.   case GSN_GREP_SUB_START_REF:
  253.   case GSN_GREP_SUB_SYNC_REF:
  254.   case GSN_GREP_SUB_REMOVE_REF:
  255.     s->set(0, SSREPBLOCKNO, gsn, len);
  256.     memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
  257.     executor->m_signalRecvQueue.receive(s);    
  258.     break;
  259.   case GSN_SUB_TABLE_DATA:
  260.     executor->execSUB_TABLE_DATA(signal, ptr);
  261.     delete s;  s=0;
  262.     break;
  263.   case GSN_SUB_META_DATA:
  264.     executor->execSUB_META_DATA(signal, ptr);
  265.     delete s;  s=0;
  266.     break;
  267.   default:
  268.     REPABORT1("Illegal signal received in execSignal", gsn);
  269.   }
  270.   s=0;
  271. #if 0
  272.   ndbout_c("ExtNDB: Inserted signal into queue (GSN: %d, Len: %d)",
  273.    signal->readSignalNumber(), len);
  274. #endif
  275. }
  276. void 
  277. ExtNDB::execNodeStatus(void* obj, Uint16 nodeId, bool alive, bool nfCompleted)
  278. {
  279.   ExtNDB * thisObj = (ExtNDB*)obj;
  280.   RLOG(("Changed node status (Id %d, Alive %d, nfCompleted %d)",
  281. nodeId, alive, nfCompleted));
  282.   
  283.   if(alive) {
  284.     /**
  285.      *  Connected
  286.      */
  287.     Uint32 nodeGrp = thisObj->m_transporterFacade->getNodeGrp(nodeId);
  288.     RLOG(("DB node %d of node group %d connected", nodeId, nodeGrp));
  289.   
  290.     thisObj->m_nodeGroupInfo->addNodeToNodeGrp(nodeId, true, nodeGrp);
  291.     Uint32 firstNode = thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp);
  292.       
  293.     if(firstNode == 0)
  294.       thisObj->m_nodeGroupInfo->setPrimaryNode(nodeGrp, nodeId);
  295.     if (!thisObj->m_doneSetGrepSender) {
  296.       thisObj->m_grepSender->setNodeId(firstNode);
  297.       thisObj->m_doneSetGrepSender = true;
  298.     }
  299.     RLOG(("Connect: First connected node in nodegroup: %d", 
  300.   thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)));
  301.   } else if (!nfCompleted) {
  302.     
  303.     /**
  304.      *  Set node as "disconnected" in m_nodeGroupInfo until 
  305.      *  node comes up again.
  306.      */
  307.     Uint32 nodeGrp = thisObj->m_transporterFacade->getNodeGrp(nodeId);
  308.     RLOG(("DB node %d of node group %d disconnected", 
  309.   nodeId, nodeGrp));
  310.     thisObj->m_nodeGroupInfo->setConnectStatus(nodeId, false);
  311.     /**
  312.      * The node that crashed was also the primary node, the we must change
  313.      * primary node 
  314.      */
  315.     if(nodeId == thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)) {
  316.       Uint32 node = thisObj->m_nodeGroupInfo->getFirstConnectedNode(nodeGrp);
  317.       if(node > 0) {
  318. thisObj->m_grepSender->setNodeId(node);
  319. thisObj->m_nodeGroupInfo->setPrimaryNode(nodeGrp, node);
  320.       }
  321.       else {
  322. thisObj->sendDisconnectRep(nodeGrp);
  323.       }
  324.     }
  325.     RLOG(("Disconnect: First connected node in nodegroup: %d", 
  326.   thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)));
  327.   } else if(nfCompleted) {
  328.   } else {
  329.     REPABORT("Function execNodeStatus with wrong parameters");
  330.   }
  331. }
  332. /*****************************************************************************
  333.  * Signal Receivers for LOG and SCAN
  334.  *****************************************************************************/
  335. /**
  336.  * Receive datalog/datascan from GREP/SUMA
  337.  */
  338. void
  339. ExtNDB::execSUB_TABLE_DATA(NdbApiSignal * signal, LinearSectionPtr ptr[3])
  340. {
  341.   SubTableData * const data = (SubTableData*)signal->getDataPtr();
  342.   Uint32 tableId            = data->tableId;
  343.   Uint32 operation          = data->operation;
  344.   Uint32 gci                = data->gci;
  345.   Uint32 nodeId             = refToNode(signal->theSendersBlockRef);
  346.   if((SubTableData::LogType)data->logType == SubTableData::SCAN) 
  347.   {
  348.     Uint32 nodeGrp =  m_nodeGroupInfo->findNodeGroup(nodeId);
  349.     NodeGroupInfo::iterator * it;  
  350.     it = new NodeGroupInfo::iterator(nodeGrp, m_nodeGroupInfo);
  351.     for(NodeConnectInfo * nci=it->first(); it->exists();nci=it->next()) {
  352.       m_gciContainerPS->insertLogRecord(nci->nodeId, tableId, 
  353. operation, ptr, gci);
  354.     }
  355.     delete it;  it = 0;
  356.   } else {
  357.     m_gciContainerPS->insertLogRecord(nodeId, tableId, operation, ptr, gci);   
  358.   }
  359. }
  360. /**
  361.  * Receive metalog/metascan from GREP/SUMA
  362.  */
  363. void
  364. ExtNDB::execSUB_META_DATA(NdbApiSignal * signal, LinearSectionPtr ptr[3]) 
  365. {
  366.   Uint32 nodeId = refToNode(signal->theSendersBlockRef);
  367.   SubMetaData * const data = (SubMetaData*)signal->getDataPtr();
  368.   Uint32 tableId           = data->tableId;
  369.   Uint32 gci               = data->gci;
  370.   Uint32 nodeGrp = m_nodeGroupInfo->findNodeGroup(nodeId);
  371.   NodeGroupInfo::iterator * it;  
  372.   it = new NodeGroupInfo::iterator(nodeGrp, m_nodeGroupInfo);
  373.   for(NodeConnectInfo * nci=it->first(); it->exists();nci=it->next()) {
  374.     m_gciContainerPS->insertMetaRecord(nci->nodeId, tableId, ptr, gci);
  375.     RLOG(("Received meta record in %d[%d]", nci->nodeId, gci));
  376.   }
  377.   delete it;  it = 0;    
  378. }
  379. /*****************************************************************************
  380.  * Signal Receivers (Signals that are actually just forwarded to SS REP)
  381.  *****************************************************************************/
  382. void 
  383. ExtNDB::execGREP_CREATE_SUBID_CONF(NdbApiSignal * signal) 
  384. {
  385.   CreateSubscriptionIdConf const * conf = 
  386.     (CreateSubscriptionIdConf *)signal->getDataPtr();
  387.   Uint32 subId  = conf->subscriptionId;
  388.   Uint32 subKey = conf->subscriptionKey;
  389.   ndbout_c("GREP_CREATE_SUBID_CONF m_extAPI=%pn", m_extAPI);
  390.   m_extAPI->eventSubscriptionIdCreated(subId, subKey);
  391. }
  392. /*****************************************************************************
  393.  * Signal Receivers 
  394.  *****************************************************************************/
  395. /**
  396.  * Receive information about completed GCI from GREP/SUMA
  397.  *
  398.  * GCI completed, i.e. no more unsent log records exists in SUMA
  399.  * @todo use node id to identify buffers?
  400.  */
  401. void
  402. ExtNDB::execSUB_GCP_COMPLETE_REP(NdbApiSignal * signal) 
  403. {
  404.   SubGcpCompleteRep * const rep = (SubGcpCompleteRep*)signal->getDataPtr();
  405.   const Uint32 gci              = rep->gci;
  406.   Uint32 nodeId                 = refToNode(rep->senderRef);
  407.   RLOG(("Epoch %d completed at node %d", gci, nodeId));
  408.   m_gciContainerPS->setCompleted(gci, nodeId);
  409.   if(m_firstGCI == gci && !m_dataLogStarted) {
  410.     sendGREP_SUB_START_CONF(signal, m_firstGCI);
  411.     m_dataLogStarted = true;
  412.   }
  413. }
  414. /**
  415.  * Send info that scan is competed to SS REP
  416.  *
  417.  * @todo  Use node id to identify buffers?
  418.  */
  419. void 
  420. ExtNDB::sendGREP_SUB_START_CONF(NdbApiSignal * signal, Uint32 gci)
  421. {
  422.   RLOG(("Datalog started (Epoch %d)", gci));
  423.   GrepSubStartConf * conf = (GrepSubStartConf *)signal->getDataPtrSend();  
  424.   conf->firstGCI                = gci;
  425.   conf->subscriptionId          = m_subId;
  426.   conf->subscriptionKey         = m_subKey;
  427.   conf->part                    = SubscriptionData::TableData;
  428.   signal->m_noOfSections = 0;
  429.   signal->set(0, SSREPBLOCKNO, GSN_GREP_SUB_START_CONF,
  430.       GrepSubStartConf::SignalLength);  
  431.   sendSignalRep(signal);
  432. }
  433. /**
  434.  * Scan is completed... says SUMA/GREP
  435.  *
  436.  * @todo  Use node id to identify buffers?
  437.  */
  438. void 
  439. ExtNDB::execGREP_SUB_START_CONF(NdbApiSignal * signal)
  440. {
  441.   GrepSubStartConf * const conf = (GrepSubStartConf *)signal->getDataPtr();  
  442.   Uint32 part                   = conf->part;
  443.   //Uint32 nodeId                 = refToNode(conf->senderRef);
  444.   m_firstGCI                    = conf->firstGCI;
  445.   if (part == SubscriptionData::TableData) {
  446.     RLOG(("Datalog started (Epoch %d)", m_firstGCI));
  447.     return;
  448.   } 
  449.   RLOG(("Metalog started (Epoch %d)", m_firstGCI));
  450.   signal->set(0, SSREPBLOCKNO, GSN_GREP_SUB_START_CONF,
  451.       GrepSubStartConf::SignalLength);  
  452.   sendSignalRep(signal);
  453. }
  454. /**
  455.  * Receive no of node groups that PS has and pass signal on to SS
  456.  */
  457. void 
  458. ExtNDB::execGREP_SUB_CREATE_CONF(NdbApiSignal * signal) 
  459. {
  460.   GrepSubCreateConf * conf = (GrepSubCreateConf *)signal->getDataPtrSend();  
  461.   m_subId                  = conf->subscriptionId;
  462.   m_subKey                 = conf->subscriptionKey;
  463.   conf->noOfNodeGroups  = m_nodeGroupInfo->getNoOfNodeGroups();
  464.   sendSignalRep(signal);
  465. }
  466. /**
  467.  * Receive conf that subscription has been remove in GREP/SUMA
  468.  *
  469.  * Pass signal on to TransPS
  470.  */
  471. void 
  472. ExtNDB::execGREP_SUB_REMOVE_CONF(NdbApiSignal * signal) 
  473. {  
  474.   m_gciContainerPS->reset();
  475.   sendSignalRep(signal);
  476. }
  477. /**
  478.  * If all PS nodes has disconnected, then remove all epochs 
  479.  * for this subscription.
  480.  */
  481. void
  482. ExtNDB::sendDisconnectRep(Uint32 nodeId) 
  483. {
  484.   NdbApiSignal * signal = new NdbApiSignal(m_ownRef);
  485.   signal->set(0, SSREPBLOCKNO, GSN_REP_DISCONNECT_REP,
  486.       RepDisconnectRep::SignalLength);
  487.   RepDisconnectRep * rep = (RepDisconnectRep*) signal->getDataPtrSend();
  488.   rep->nodeId = nodeId;
  489.   rep->subId  = m_subId;
  490.   rep->subKey = m_subKey;
  491.   sendSignalRep(signal);
  492. }