TransPS.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:16k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "ConfigRetriever.hpp"
  14. #include <NdbSleep.h>
  15. #include <NdbApiSignal.hpp>
  16. #include <AttributeHeader.hpp>
  17. #include <signaldata/DictTabInfo.hpp>
  18. #include <signaldata/GetTabInfo.hpp>
  19. #include <signaldata/SumaImpl.hpp>
  20. #include <GrepError.hpp>
  21. #include <SimpleProperties.hpp>
  22. #include "TransPS.hpp"
  23. #include <rep/storage/NodeGroupInfo.hpp>
  24. /*****************************************************************************
  25.  * Constructor / Destructor / Init
  26.  *****************************************************************************/
  27. TransPS::TransPS(GCIContainerPS* gciContainer) 
  28. {
  29.   m_repSender = new ExtSender();
  30.   m_gciContainerPS = gciContainer;
  31. }
  32. TransPS::~TransPS() 
  33. {
  34.   delete m_repSender;
  35. }
  36. void
  37. TransPS::init(TransporterFacade * tf, const char * connectString) 
  38. {
  39.   abort();
  40. #ifdef NOT_FUNCTIONAL
  41.   m_signalExecThread = NdbThread_Create(signalExecThread_C,
  42. (void **)this,
  43. 32768,
  44. "TransPS_Service",
  45. NDB_THREAD_PRIO_LOW);
  46.   ConfigRetriever configRetriever;
  47.   //  configRetriever.setConnectString(connectString);
  48.   Properties* config = configRetriever.getConfig("REP", REP_VERSION_ID);
  49.   if (config == 0) {
  50.     ndbout << "TransPS: Configuration error: ";
  51.     const char* erString = configRetriever.getErrorString();
  52.     if (erString == 0) {
  53.       erString = "No error specified!";
  54.     }
  55.     ndbout << erString << endl;
  56.     exit(-1);
  57.   }
  58.   Properties * extConfig;
  59.   /**
  60.    * @todo Hardcoded primary system name
  61.    */
  62.   if (!config->getCopy("EXTERNAL SYSTEM_External", &extConfig)) {
  63.     ndbout << "External System "External" not found in configuration. "
  64.    << "Check config.ini." << endl;
  65.     config->print();
  66.     exit(-1);
  67.   }
  68.   m_ownNodeId = configRetriever.getOwnNodeId();
  69.   extConfig->put("LocalNodeId", m_ownNodeId);
  70.   extConfig->put("LocalNodeType", "REP");
  71.   Uint32 noOfConnections;
  72.   extConfig->get("NoOfConnections", &noOfConnections);
  73.   /*  if (noOfConnections != 1) {
  74.     ndbout << "TransPS: There are " << noOfConnections << " connections "
  75.    << "defined in configuration" 
  76.    << endl
  77.    << "       There should be exactly one!" << endl;
  78.     exit(-1);
  79.   }
  80.   */
  81.   /******************************
  82.    * Set node id of external REP
  83.    ******************************/
  84.   const Properties * connection;
  85.   const char * extSystem;
  86.   Uint32 extRepNodeId, tmpOwnNodeId;
  87.   
  88.   for(Uint32 i=0; i < noOfConnections; i++) {
  89.     extConfig->get("Connection", i, &connection);
  90.     if(connection == 0) REPABORT("No connection found");
  91.     if(connection->get("System1", &extSystem)) {
  92.       connection->get("NodeId1", &extRepNodeId);
  93.       connection->get("NodeId2", &tmpOwnNodeId);
  94.     } else {
  95.       connection->get("System2", &extSystem);
  96.       connection->get("NodeId1", &tmpOwnNodeId);
  97.       connection->get("NodeId2", &extRepNodeId);
  98.     }
  99.     if(m_ownNodeId == tmpOwnNodeId)
  100.       break;
  101.   }
  102.   if(extRepNodeId==0) REPABORT("External replication server not found");
  103.   if(extSystem==0) REPABORT("External system not found");
  104.   m_ownBlockNo = tf->open(this, execSignal, execNodeStatus);
  105.   assert(m_ownBlockNo > 0);
  106.   m_ownRef = numberToRef(m_ownBlockNo, m_ownNodeId);
  107.   assert(m_ownNodeId == tf->ownId());
  108.   ndbout_c("Phase 4 (TransPS): Connection %d to external REP node %d opened",
  109.    m_ownBlockNo, extRepNodeId);
  110.   m_repSender->setNodeId(extRepNodeId);
  111.   m_repSender->setOwnRef(m_ownRef);
  112.   m_repSender->setTransporterFacade(tf);
  113. #endif
  114. }
  115. /*****************************************************************************
  116.  * Signal Queue Executor
  117.  *****************************************************************************/
  118. class SigMatch 
  119. {
  120. public:
  121.   int gsn;
  122.   void (TransPS::* function)(NdbApiSignal *signal);
  123.   SigMatch() { gsn = 0; function = NULL; };
  124.   SigMatch(int _gsn, void (TransPS::* _function)(NdbApiSignal *signal)) { 
  125.     gsn = _gsn;
  126.     function = _function;
  127.   };
  128.   
  129.   bool check(NdbApiSignal *signal) {
  130.     if(signal->readSignalNumber() == gsn) return true;
  131.     return false;
  132.   };
  133. };
  134. extern "C"
  135. void *
  136. signalExecThread_C(void *r) 
  137. {
  138.   TransPS *repps = (TransPS*)r;
  139.   repps->signalExecThreadRun();
  140.   NdbThread_Exit(0);
  141.   /* NOTREACHED */
  142.   return 0;
  143. }
  144. void
  145. TransPS::signalExecThreadRun() 
  146. {
  147.   Vector<SigMatch> sl;
  148.   /**
  149.    * Signals executed here
  150.    */
  151.   sl.push_back(SigMatch(GSN_REP_GET_GCI_REQ, 
  152. &TransPS::execREP_GET_GCI_REQ));
  153.   sl.push_back(SigMatch(GSN_REP_GET_GCIBUFFER_REQ,
  154. &TransPS::execREP_GET_GCIBUFFER_REQ));
  155.   sl.push_back(SigMatch(GSN_REP_CLEAR_PS_GCIBUFFER_REQ,
  156. &TransPS::execREP_CLEAR_PS_GCIBUFFER_REQ));
  157.   /** 
  158.    * Signals to be forwarded to GREP::PSCoord
  159.    */
  160.   sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_REQ, &TransPS::sendSignalGrep));
  161.   
  162.   /** 
  163.    * Signals to be forwarded to GREP::PSCoord
  164.    */
  165.   sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_REQ, &TransPS::sendSignalGrep));
  166.   sl.push_back(SigMatch(GSN_GREP_SUB_START_REQ, &TransPS::sendSignalGrep));
  167.   sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_REQ, &TransPS::sendSignalGrep));
  168.   sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_REQ, &TransPS::sendSignalGrep));
  169.   while(1) {
  170.     SigMatch *handler = NULL;
  171.     NdbApiSignal *signal = NULL;
  172.     if(m_signalRecvQueue.waitFor(sl, handler, signal, DEFAULT_TIMEOUT)) {
  173. #if 0
  174.       ndbout_c("TransPS: Removed signal from queue (GSN: %d, QSize: %d)",
  175.        signal->readSignalNumber(), m_signalRecvQueue.size());
  176. #endif
  177.       if(handler->function != 0) {
  178. (this->*handler->function)(signal);
  179. delete signal;
  180. signal = 0;
  181.       } else {
  182. REPABORT("Illegal handler for signal");
  183.       }
  184.     }
  185.   }
  186. }
  187. void
  188. TransPS::sendSignalRep(NdbApiSignal * s)
  189. {
  190.   m_repSender->sendSignal(s);
  191. }
  192. void
  193. TransPS::sendSignalGrep(NdbApiSignal * s) 
  194. {
  195.   m_grepSender->sendSignal(s);
  196. }
  197. void
  198. TransPS::sendFragmentedSignalRep(NdbApiSignal * s, 
  199.  LinearSectionPtr ptr[3], 
  200.  Uint32 sections)
  201. {
  202.   m_repSender->sendFragmentedSignal(s, ptr, sections);
  203. }
  204. void
  205. TransPS::sendFragmentedSignalGrep(NdbApiSignal * s, 
  206.   LinearSectionPtr ptr[3], 
  207.   Uint32 sections)
  208. {
  209.   m_grepSender->sendFragmentedSignal(s, ptr, sections);
  210. }
  211. void 
  212. TransPS::execNodeStatus(void* obj, Uint16 nodeId, bool alive, bool nfCompleted)
  213. {
  214. //  TransPS * thisObj = (TransPS*)obj;
  215.   
  216.   RLOG(("Node changed state (NodeId %d, Alive %d, nfCompleted %d)",
  217. nodeId, alive, nfCompleted));
  218.   
  219.   if(!alive && !nfCompleted) { }
  220.   
  221.   if(!alive && nfCompleted) { }
  222. }
  223. void
  224. TransPS::execSignal(void* executeObj, NdbApiSignal* signal, 
  225.   class LinearSectionPtr ptr[3]){
  226.   TransPS * executor = (TransPS *) executeObj;
  227.   const Uint32 gsn = signal->readSignalNumber();
  228.   const Uint32 len = signal->getLength();
  229.   
  230.   NdbApiSignal * s = new NdbApiSignal(executor->m_ownRef);
  231.   switch(gsn){
  232.   case GSN_REP_GET_GCI_REQ:
  233.   case GSN_REP_GET_GCIBUFFER_REQ:
  234.   case GSN_REP_CLEAR_PS_GCIBUFFER_REQ:
  235.     s->set(0, SSREPBLOCKNO, gsn, len);
  236.     memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
  237.     executor->m_signalRecvQueue.receive(s);
  238.     break;
  239.   case GSN_GREP_SUB_CREATE_REQ:    
  240.     {
  241.       if(signal->m_noOfSections > 0) {
  242. memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
  243. s->set(0, GREP, gsn,
  244.        len);
  245. executor->sendFragmentedSignalGrep(s,ptr,1);
  246. delete s;
  247.       } else {
  248. s->set(0, GREP, gsn, len);
  249. memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
  250. executor->m_signalRecvQueue.receive(s);
  251.       }
  252.     }
  253.     break;
  254.   case GSN_GREP_SUB_START_REQ:
  255.   case GSN_GREP_SUB_SYNC_REQ:
  256.   case GSN_GREP_SUB_REMOVE_REQ:
  257.   case GSN_GREP_CREATE_SUBID_REQ:
  258.     s->set(0, GREP, gsn, len);
  259.     memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
  260.     executor->m_signalRecvQueue.receive(s);
  261.     break;
  262.   default:
  263.     REPABORT1("Illegal signal received in execSignal", gsn);
  264.   }
  265. #if 0
  266.   ndbout_c("TransPS: Inserted signal into queue (GSN: %d, Len: %d)",
  267.    signal->readSignalNumber(), len);
  268. #endif
  269. }
  270. /*****************************************************************************
  271.  * Signal Receivers 
  272.  *****************************************************************************/
  273. void
  274. TransPS::execREP_GET_GCIBUFFER_REQ(NdbApiSignal* signal) 
  275. {
  276.   RepGetGciBufferReq * req = (RepGetGciBufferReq*)signal->getDataPtr();
  277.   Uint32 firstGCI = req->firstGCI;
  278.   Uint32 lastGCI  = req->lastGCI;
  279.   Uint32 nodeGrp  = req->nodeGrp;
  280.   
  281.   RLOG(("Received request for %d:[%d-%d]", nodeGrp, firstGCI, lastGCI));
  282.   
  283.   NodeGroupInfo * tmp = m_gciContainerPS->getNodeGroupInfo();
  284.   Uint32 nodeId = tmp->getPrimaryNode(nodeGrp);
  285.   /**
  286.    * If there is no connected node in the nodegroup -> abort.
  287.    * @todo: Handle error when a nodegroup is "dead"
  288.    */
  289.   if(!nodeId) {
  290.     RLOG(("There are no connected nodes in node group %d", nodeGrp));
  291.     sendREP_GET_GCIBUFFER_REF(signal, firstGCI, lastGCI, nodeGrp,
  292.       GrepError::REP_NO_CONNECTED_NODES);
  293.     return;
  294.   }
  295.   transferPages(firstGCI, lastGCI, nodeId, nodeGrp, signal);
  296.  
  297.   /**
  298.    * Done tfxing pages, sending GCIBuffer conf.
  299.    */
  300.   Uint32 first, last;
  301.   m_gciContainerPS->getAvailableGCIBuffers(nodeGrp, &first, &last);  
  302.   RepGetGciBufferConf * conf = (RepGetGciBufferConf*)req;
  303.   conf->senderRef  = m_ownRef;
  304.   conf->firstPSGCI = first;    // Buffers found on REP PS (piggy-back info)
  305.   conf->lastPSGCI  = last;
  306.   conf->firstSSGCI = firstGCI; // Now been transferred to REP SS
  307.   conf->lastSSGCI  = lastGCI;
  308.   conf->nodeGrp    = nodeGrp;
  309.   signal->set(0, SSREPBLOCKNO, GSN_REP_GET_GCIBUFFER_CONF, 
  310.       RepGetGciBufferConf::SignalLength);
  311.   sendSignalRep(signal);
  312.   RLOG(("Sent %d:[%d-%d] (Stored PS:%d:[%d-%d])",
  313. nodeGrp, firstGCI, lastGCI, nodeGrp, first, last));
  314. }
  315. void 
  316. TransPS::transferPages(Uint32 firstGCI, Uint32 lastGCI, 
  317.        Uint32 nodeId, Uint32 nodeGrp, 
  318.        NdbApiSignal * signal) 
  319. {
  320.   /**
  321.    *  Transfer pages in GCI Buffer to SS
  322.    *  When buffer is sent, send accounting information.
  323.    */
  324.   RepDataPage * pageData = (RepDataPage*)signal->getDataPtr();
  325.   LinearSectionPtr ptr[1];
  326.   GCIPage * page;
  327.   for(Uint32 i=firstGCI; i<=lastGCI; i++) {
  328.     Uint32 totalSizeSent = 0;
  329.     GCIBuffer * buffer = m_gciContainerPS->getGCIBuffer(i, nodeId);
  330.     if(buffer != 0) {   
  331.       GCIBuffer::iterator it(buffer);
  332.       /**
  333.        *  Send all pages to SS
  334.        */
  335.       for (page = it.first(); page != 0; page = it.next()) {
  336. ptr[0].p = page->getStoragePtr();
  337. ptr[0].sz = page->getStorageWordSize();
  338. totalSizeSent += ptr[0].sz;
  339. pageData->gci     = i;
  340. pageData->nodeGrp = nodeGrp;
  341. signal->set(0, SSREPBLOCKNO, GSN_REP_DATA_PAGE, 
  342.     RepDataPage::SignalLength);
  343. sendFragmentedSignalRep(signal, ptr, 1);
  344.       }
  345.       
  346.       /**
  347.        *  Send accounting information to SS
  348.        */ 
  349.       RepGciBufferAccRep * rep = (RepGciBufferAccRep *)pageData;
  350.       rep->gci = i;
  351.       rep->nodeGrp = nodeGrp;
  352.       rep->totalSentBytes = (4 * totalSizeSent); //words to bytes
  353.       signal->set(0, SSREPBLOCKNO, GSN_REP_GCIBUFFER_ACC_REP, 
  354.   RepGciBufferAccRep::SignalLength);
  355.       sendSignalRep(signal);
  356.       
  357.       RLOG(("Sending %d:[%d] (%d bytes) to external REP (nodeId %d)", 
  358.     nodeGrp, i, 4*totalSizeSent, nodeId));
  359.     }
  360.   }
  361.   page = 0;
  362. }
  363. void 
  364. TransPS::execREP_GET_GCI_REQ(NdbApiSignal* signal) 
  365. {
  366.   RepGetGciReq * req = (RepGetGciReq*)signal->getDataPtr();
  367.   Uint32 nodeGrp = req->nodeGrp;
  368.   Uint32 first, last;
  369.   m_gciContainerPS->getAvailableGCIBuffers(nodeGrp, &first, &last);
  370.   
  371.   RepGetGciConf * conf = (RepGetGciConf*) req;
  372.   conf->firstPSGCI = first;
  373.   conf->lastPSGCI  = last;
  374.   conf->senderRef  = m_ownRef;
  375.   conf->nodeGrp    = nodeGrp;
  376.   signal->set(0, SSREPBLOCKNO, GSN_REP_GET_GCI_CONF, 
  377.       RepGetGciConf::SignalLength);
  378.   sendSignalRep(signal); 
  379. }
  380. /**
  381.  * REP_CLEAR_PS_GCIBUFFER_REQ
  382.  * destroy the GCI buffer in the GCI Container
  383.  *  and send a CONF to Grep::SSCoord
  384.  */
  385. void 
  386. TransPS::execREP_CLEAR_PS_GCIBUFFER_REQ(NdbApiSignal * signal) 
  387. {
  388.   RepClearPSGciBufferReq * const req = 
  389.     (RepClearPSGciBufferReq*)signal->getDataPtr();
  390.   Uint32 firstGCI     = req->firstGCI;
  391.   Uint32 lastGCI      = req->lastGCI;
  392.   Uint32 nodeGrp = req->nodeGrp;
  393.   assert(firstGCI >= 0 && lastGCI > 0);
  394.   if(firstGCI<0 && lastGCI <= 0) 
  395.   {
  396.     RLOG(("WARNING! Illegal delete request ignored"));
  397.     sendREP_CLEAR_PS_GCIBUFFER_REF(signal, firstGCI, lastGCI,
  398.    0, nodeGrp,
  399.    GrepError::REP_DELETE_NEGATIVE_EPOCH);
  400.   }
  401.   if(firstGCI==0 && lastGCI==(Uint32)0xFFFF) {
  402.     m_gciContainerPS->getAvailableGCIBuffers(nodeGrp, &firstGCI, &lastGCI);
  403.     RLOG(("Deleting PS:[%d-%d]", firstGCI, lastGCI));
  404.   }
  405.   if(firstGCI == 0) {
  406.     Uint32 f, l;
  407.     m_gciContainerPS->getAvailableGCIBuffers(nodeGrp, &f, &l);
  408.     RLOG(("Deleting PS:[%d-%d]", f, l));
  409.     
  410.     if(f>firstGCI)
  411.       firstGCI = f;
  412.   }
  413.   
  414.   /**
  415.    * Delete buffer
  416.    * Abort if we try to destroy a buffer that does not exist
  417.    * Deleting buffer from every node in the nodegroup
  418.    */
  419.   for(Uint32 i=firstGCI; i<=lastGCI; i++) {    
  420.     if(!m_gciContainerPS->destroyGCIBuffer(i, nodeGrp)) {
  421.       sendREP_CLEAR_PS_GCIBUFFER_REF(signal, firstGCI, lastGCI, i, nodeGrp,
  422.      GrepError::REP_DELETE_NONEXISTING_EPOCH);
  423.       return;
  424.     }
  425.     
  426.     RLOG(("Deleted PS:%d:[%d]", nodeGrp, i));
  427.   }  
  428.   /**
  429.    * Send reply to Grep::SSCoord
  430.    */
  431.   RepClearPSGciBufferConf * conf = (RepClearPSGciBufferConf*)req;
  432.   conf->firstGCI = firstGCI;
  433.   conf->lastGCI  = lastGCI;
  434.   conf->nodeGrp  = nodeGrp;
  435.   signal->set(0, SSREPBLOCKNO, GSN_REP_CLEAR_PS_GCIBUFFER_CONF, 
  436.       RepClearPSGciBufferConf::SignalLength);   
  437.   sendSignalRep(signal);
  438. }
  439. /*****************************************************************************
  440.  * Signal Senders
  441.  *****************************************************************************/
  442. void 
  443. TransPS::sendREP_GET_GCI_REF(NdbApiSignal* signal,
  444.      Uint32 nodeGrp,
  445.      Uint32 firstPSGCI, Uint32 lastPSGCI,
  446.      GrepError::Code err)
  447.   RepGetGciRef * ref = (RepGetGciRef *)signal->getDataPtrSend();
  448.   ref->firstPSGCI          = firstPSGCI;
  449.   ref->lastPSGCI           = lastPSGCI;
  450.   ref->firstSSGCI          = 0;
  451.   ref->lastSSGCI           = 0;
  452.   ref->nodeGrp             = nodeGrp;
  453.   ref->err                 = err;
  454.   signal->set(0, SSREPBLOCKNO, GSN_REP_GET_GCI_REF, 
  455.       RepGetGciRef::SignalLength);   
  456.   sendSignalRep(signal);
  457. }
  458. void 
  459. TransPS::sendREP_CLEAR_PS_GCIBUFFER_REF(NdbApiSignal* signal, 
  460. Uint32 firstGCI, Uint32 lastGCI, 
  461. Uint32 currentGCI,
  462. Uint32 nodeGrp,
  463. GrepError::Code err)
  464. {
  465.   RepClearPSGciBufferRef * ref = 
  466.     (RepClearPSGciBufferRef *)signal->getDataPtrSend();
  467.   ref->firstGCI            = firstGCI;
  468.   ref->lastGCI             = lastGCI;
  469.   ref->currentGCI          = currentGCI; 
  470.   ref->nodeGrp             = nodeGrp;
  471.   ref->err                 = err;
  472.   signal->set(0, SSREPBLOCKNO, GSN_REP_CLEAR_PS_GCIBUFFER_REF, 
  473.       RepClearPSGciBufferRef::SignalLength);   
  474.   sendSignalRep(signal);
  475. }
  476. void
  477. TransPS::sendREP_GET_GCIBUFFER_REF(NdbApiSignal* signal,
  478.    Uint32 firstGCI, Uint32 lastGCI,
  479.    Uint32 nodeGrp,
  480.    GrepError::Code err)
  481. {
  482.   RepGetGciBufferRef * ref = 
  483.     (RepGetGciBufferRef *)signal->getDataPtrSend();
  484.   ref->firstPSGCI          = firstGCI;
  485.   ref->lastPSGCI           = lastGCI;
  486.   ref->firstSSGCI          = 0;
  487.   ref->lastSSGCI           = 0;
  488.   ref->nodeGrp             = nodeGrp;
  489.   ref->err                 = err;
  490.   signal->set(0, SSREPBLOCKNO, GSN_REP_GET_GCIBUFFER_REF, 
  491.       RepGetGciBufferRef::SignalLength);   
  492.   sendSignalRep(signal);
  493. }