TransSS.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:19k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "ConfigRetriever.hpp"
  14. #include <NdbApiSignal.hpp>
  15. #include <AttributeHeader.hpp>
  16. #include <signaldata/RepImpl.hpp>
  17. #include <signaldata/DictTabInfo.hpp>
  18. #include <signaldata/GetTabInfo.hpp>
  19. #include <signaldata/SumaImpl.hpp>
  20. #include <signaldata/GrepImpl.hpp>
  21. #include <SimpleProperties.hpp>
  22. #include <rep/rep_version.hpp>
  23. #include "TransSS.hpp"
  24. //#define DEBUG_REP_GET_GCI_CONF
  25. /*****************************************************************************
  26.  * Constructor / Destructor / Init
  27.  *****************************************************************************/
  28. TransSS::TransSS(GCIContainer * gciContainer, RepState * repState) 
  29. {
  30.   m_repSender = new ExtSender();
  31.   if (!m_repSender) REPABORT("Could not allocate new ExtSender");
  32.   m_gciContainer = gciContainer;
  33.   m_repState = repState;
  34. }
  35. TransSS::~TransSS() 
  36. {
  37.   delete m_repSender;
  38. }
  39. void
  40. TransSS::init(const char * connectString) 
  41. {
  42.   abort();
  43. #ifdef NOT_FUNCTIONAL
  44.   m_signalExecThread = NdbThread_Create(signalExecThread_C,
  45. (void **)this,
  46. 32768,
  47. "TransSS_Service",
  48. NDB_THREAD_PRIO_LOW);
  49.   ConfigRetriever configRetriever;
  50.   configRetriever.setConnectString(connectString);
  51.   
  52.   Properties* config = configRetriever.getConfig("REP", REP_VERSION_ID);
  53.   if (config == 0) {
  54.     ndbout << "Configuration error: ";
  55.     const char* erString = configRetriever.getErrorString();
  56.     if (erString == 0) {
  57.       erString = "No error specified!";
  58.     }
  59.     ndbout << erString << endl;
  60.     exit(-1);
  61.   }
  62.   Properties * extConfig;
  63.   /**
  64.    * @todo Hardcoded standby system name
  65.    */
  66.   if (!config->getCopy("EXTERNAL SYSTEM_External", &extConfig)) {
  67.     ndbout << "External System "External" not found in configuration. "
  68.    << "Check config.ini." << endl;
  69.     config->print();
  70.     exit(-1);
  71.   }
  72.   m_ownNodeId = configRetriever.getOwnNodeId();
  73.   extConfig->put("LocalNodeId", m_ownNodeId);
  74.   extConfig->put("LocalNodeType", "REP");
  75.   Uint32 noOfConnections;
  76.   extConfig->get("NoOfConnections", &noOfConnections);
  77.   /*  if (noOfConnections != 1) {
  78.     ndbout << "TransSS: There are " << noOfConnections << " connections "
  79.    << "defined in configuration" 
  80.    << endl
  81.    << "       There should be exactly one!" << endl;
  82.     exit(-1);
  83.     }*/
  84.   
  85.   /******************************
  86.    * Set node id of external REP
  87.    ******************************/
  88.   const Properties * connection;
  89.   const char * extSystem;
  90.  
  91.   Uint32 extRepNodeId, tmpOwnNodeId;
  92.   
  93.   for(Uint32 i=0; i < noOfConnections; i++) {
  94.     extConfig->get("Connection", i, &connection);
  95.     if(connection == 0) REPABORT("Connection not found");
  96.     if(connection->get("System1", &extSystem)) {
  97.       connection->get("NodeId1", &extRepNodeId);
  98.       connection->get("NodeId2", &tmpOwnNodeId);
  99.     } else {
  100.       connection->get("System2", &extSystem);
  101.       connection->get("NodeId1", &tmpOwnNodeId);
  102.       connection->get("NodeId2", &extRepNodeId);
  103.     }
  104.     if(m_ownNodeId == tmpOwnNodeId)
  105.       break;
  106.   }
  107.   if(extRepNodeId==0) REPABORT("External replication server not found");
  108.   if(extSystem==0) REPABORT("External system not found");
  109.   m_transporterFacade = new TransporterFacade();
  110.   if (!m_transporterFacade->init(extConfig)) 
  111.   {
  112.     ndbout << "TransSS: Failed to initialize transporter facade" << endl;
  113.     exit(-1);
  114.   } 
  115.   
  116.   m_ownBlockNo = m_transporterFacade->open(this, execSignal, execNodeStatus);
  117.   assert(m_ownBlockNo > 0);
  118.   m_ownRef = numberToRef(m_ownBlockNo, m_ownNodeId);
  119.   assert(m_ownNodeId == m_transporterFacade->ownId());
  120.   
  121.   ndbout_c("Phase 2 (TransSS): Connection %d to external REP node %d opened",
  122.    m_ownBlockNo, extRepNodeId);
  123.   
  124.   m_repSender->setNodeId(extRepNodeId);
  125.   m_repSender->setOwnRef(m_ownRef);
  126.   m_repSender->setTransporterFacade(m_transporterFacade);
  127. #endif
  128. }
  129. /*****************************************************************************
  130.  * Signal Queue Executor
  131.  *****************************************************************************/
  132. class SigMatch 
  133. {
  134. public:
  135.   int gsn;
  136.   void (TransSS::* function)(NdbApiSignal *signal);
  137.   
  138.   SigMatch() { gsn = 0; function = NULL; };
  139.   SigMatch(int _gsn, void (TransSS::* _function)(NdbApiSignal *signal)) {
  140.     gsn = _gsn;
  141.     function = _function;
  142.   };
  143.   
  144.   bool check(NdbApiSignal *signal) {
  145.     if(signal->readSignalNumber() == gsn)
  146.       return true;
  147.     return false;
  148.   };
  149. };
  150. extern "C"
  151. void *
  152. signalExecThread_C(void *r) 
  153. {
  154.   TransSS *transss = (TransSS*)r;
  155.   transss->signalExecThreadRun();
  156.   NdbThread_Exit(0);
  157.   /* NOTREACHED */
  158.   return 0;
  159. }
  160. void
  161. TransSS::signalExecThreadRun() 
  162. {
  163.   Vector<SigMatch> sl;
  164.   /**
  165.    * Signals to be forwarded to TransPS 
  166.    */
  167.   sl.push_back(SigMatch(GSN_REP_GET_GCI_REQ, 
  168. &TransSS::sendSignalRep));
  169.   sl.push_back(SigMatch(GSN_REP_GET_GCIBUFFER_REQ, 
  170. &TransSS::sendSignalRep));
  171.   /**
  172.    * Signals to be executed
  173.    */
  174.   sl.push_back(SigMatch(GSN_REP_GCIBUFFER_ACC_REP, 
  175. &TransSS::execREP_GCIBUFFER_ACC_REP));
  176.   sl.push_back(SigMatch(GSN_REP_DISCONNECT_REP, 
  177. &TransSS::execREP_DISCONNECT_REP));
  178.   sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_CONF, 
  179. &TransSS::execGREP_SUB_REMOVE_CONF));
  180.   sl.push_back(SigMatch(GSN_REP_GET_GCIBUFFER_CONF, 
  181. &TransSS::execREP_GET_GCIBUFFER_CONF));
  182.   sl.push_back(SigMatch(GSN_REP_CLEAR_PS_GCIBUFFER_CONF, 
  183. &TransSS::execREP_CLEAR_PS_GCIBUFFER_CONF));
  184.   sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_CONF, 
  185. &TransSS::execGREP_SUB_SYNC_CONF));
  186.   sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_REF, 
  187. &TransSS::execGREP_SUB_SYNC_REF));
  188.   sl.push_back(SigMatch(GSN_REP_GET_GCIBUFFER_REF, 
  189. &TransSS::execREP_GET_GCIBUFFER_REF));
  190.   /**
  191.    * Signals to be executed : Subscriptions
  192.    */
  193.   sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_CONF,
  194. &TransSS::execGREP_CREATE_SUBID_CONF));
  195.   sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_REF,
  196. &TransSS::execGREP_CREATE_SUBID_REF));
  197.   sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_CONF, 
  198. &TransSS::execGREP_SUB_CREATE_CONF));
  199.   sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_REF, 
  200. &TransSS::execGREP_SUB_CREATE_REF));
  201.   sl.push_back(SigMatch(GSN_GREP_SUB_START_CONF, 
  202. &TransSS::execGREP_SUB_START_CONF));
  203.   sl.push_back(SigMatch(GSN_GREP_SUB_START_REF,
  204. &TransSS::execGREP_SUB_START_REF));
  205.   /**
  206.    * Signals to be executed and forwarded
  207.    */
  208.   sl.push_back(SigMatch(GSN_REP_GET_GCI_CONF, 
  209. &TransSS::execREP_GET_GCI_CONF));
  210.   /**
  211.    * Signals to be forwarded
  212.    */
  213.   sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_REF, 
  214. &TransSS::execGREP_SUB_REMOVE_REF));
  215.   sl.push_back(SigMatch(GSN_REP_CLEAR_PS_GCIBUFFER_REF,
  216. &TransSS::execREP_CLEAR_PS_GCIBUFFER_REF));
  217.   sl.push_back(SigMatch(GSN_REP_GET_GCI_REF, 
  218. &TransSS::execREP_GET_GCI_REF));
  219.       
  220.   while(1) {
  221.     SigMatch *handler = NULL;
  222.     NdbApiSignal *signal = NULL;
  223.     if(m_signalRecvQueue.waitFor(sl, handler, signal, DEFAULT_TIMEOUT)) 
  224.     {
  225. #if 0
  226.       ndbout_c("TransSS: Removed signal from queue (GSN: %d, QSize: %d)",
  227.        signal->readSignalNumber(), m_signalRecvQueue.size());
  228. #endif
  229.       if(handler->function != 0) 
  230.       {
  231. (this->*handler->function)(signal);
  232. delete signal;
  233. signal = 0;
  234.       } else {
  235. REPABORT("Illegal handler for signal");
  236.       }
  237.     } 
  238.   }
  239. }
  240. void 
  241. TransSS::sendSignalRep(NdbApiSignal * s) 
  242. {
  243.   m_repSender->sendSignal(s);
  244. }
  245. void 
  246. TransSS::execNodeStatus(void* obj, Uint16 nodeId, 
  247. bool alive, bool nfCompleted)
  248. {
  249.   TransSS * thisObj = (TransSS*)obj;
  250.   if (alive) {
  251.     thisObj->m_repState->eventNodeConnected(nodeId);
  252.   } else if (!nfCompleted) {
  253.     thisObj->m_repState->eventNodeDisconnected(nodeId);
  254.   } else if (nfCompleted) {
  255.     thisObj->m_repState->eventNodeConnectable(nodeId);
  256.   } else {
  257.     REPABORT("Illegal state for execNodeStatus");
  258.   }
  259. }
  260. void
  261. TransSS::execSignal(void* executorObj, NdbApiSignal* signal, 
  262.     class LinearSectionPtr ptr[3])
  263. {
  264.   TransSS * executor = (TransSS *) executorObj;
  265.   const Uint32 gsn = signal->readSignalNumber();
  266.   const Uint32 len = signal->getLength();
  267.   NdbApiSignal * s = new NdbApiSignal(executor->m_ownRef);
  268.   switch (gsn) {
  269.   case GSN_REP_GET_GCI_REQ:
  270.   case GSN_REP_GET_GCIBUFFER_REQ:
  271.   case GSN_REP_GET_GCIBUFFER_CONF:
  272.   case GSN_GREP_SUB_REMOVE_CONF:
  273.   case GSN_REP_DISCONNECT_REP:
  274.   case GSN_REP_GCIBUFFER_ACC_REP:
  275.     s->set(0, PSREPBLOCKNO, gsn, len);    
  276.     memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
  277.     executor->m_signalRecvQueue.receive(s);    
  278.     break;
  279.   case GSN_GREP_CREATE_SUBID_CONF:
  280.   case GSN_GREP_SUB_CREATE_CONF:
  281.   case GSN_GREP_SUB_START_CONF:
  282.   case GSN_GREP_SUB_SYNC_CONF:
  283.   case GSN_REP_GET_GCI_CONF:
  284.   case GSN_REP_CLEAR_PS_GCIBUFFER_CONF:
  285.   case GSN_GREP_CREATE_SUBID_REF:
  286.   case GSN_GREP_SUB_CREATE_REF:
  287.   case GSN_GREP_SUB_START_REF:
  288.   case GSN_GREP_SUB_SYNC_REF:
  289.   case GSN_GREP_SUB_REMOVE_REF:
  290.   case GSN_REP_GET_GCI_REF:
  291.   case GSN_REP_GET_GCIBUFFER_REF:
  292.   case GSN_REP_CLEAR_PS_GCIBUFFER_REF:
  293.     s->set(0, GREP, gsn, len);    
  294.     memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
  295.     executor->m_signalRecvQueue.receive(s);    
  296.     break;
  297.   case GSN_REP_DATA_PAGE:
  298.     executor->execREP_DATA_PAGE(signal, ptr);
  299.     delete s;  s = 0;
  300.     break;
  301.   default:
  302.     REPABORT1("Illegal signal received in execSignal %d", gsn);
  303.   }
  304. #if 0
  305.   ndbout_c("TransSS: Inserted signal into queue (GSN: %d, Len: %d)",
  306.    signal->readSignalNumber(), len);
  307. #endif
  308. }
  309. /*****************************************************************************
  310.  * Signal Executors
  311.  *****************************************************************************/
  312. void
  313. TransSS::execREP_DATA_PAGE(NdbApiSignal * signal, LinearSectionPtr ptr[3])
  314. {
  315.   RepDataPage * const page = (RepDataPage*)signal->getDataPtr();
  316.   m_gciContainer->insertPage(page->gci, page->nodeGrp,
  317.      (char*)(ptr[0].p), 4 * ptr[0].sz); 
  318. }
  319. /**
  320.  * Recd from TransPS
  321.  */
  322. void
  323. TransSS::execREP_GCIBUFFER_ACC_REP(NdbApiSignal * signal) 
  324. {
  325.   RepGciBufferAccRep * const  rep = 
  326.     (RepGciBufferAccRep * )signal->getDataPtr();
  327.   Uint32 gci              = rep->gci;
  328.   Uint32 nodeGrp          = rep->nodeGrp;
  329.   Uint32 totalSize        = rep->totalSentBytes;
  330.   GCIBuffer * buffer      = m_gciContainer->getGCIBuffer(gci, nodeGrp);
  331.   Uint32 getReceivedBytes = 0;
  332.   if (buffer != 0) 
  333.     getReceivedBytes = buffer->getReceivedBytes();
  334.   RLOG(("TransSS: Received %d:[%d] (%d of %d bytes)",
  335. nodeGrp, gci, getReceivedBytes, totalSize));
  336.   if(getReceivedBytes != totalSize) {
  337.     REPABORT("Did not receive correct number of bytes");
  338.   } 
  339. }
  340. /**
  341.  *  Received from primary system
  342.  */
  343. void
  344. TransSS::execREP_GET_GCIBUFFER_CONF(NdbApiSignal * signal) 
  345. {
  346.   RepGetGciBufferConf * conf = (RepGetGciBufferConf*)signal->getDataPtr();
  347.   conf->senderRef = m_ownRef;
  348.   Uint32 first = conf->firstSSGCI;
  349.   Uint32 last  = conf->lastSSGCI;
  350.   for(Uint32 i = first; i <= last; i++) {
  351.     m_gciContainer->setCompleted(i, conf->nodeGrp);
  352.   }
  353.   /**
  354.    * Buffers @ PS
  355.    */
  356.   Interval ps(conf->firstPSGCI, conf->lastPSGCI);
  357.   m_repState->add(Channel::PS, conf->nodeGrp, ps);
  358.   /**
  359.    * Buffers @ SS
  360.    */
  361.   Uint32 ssfirst, sslast;
  362.   m_gciContainer->getAvailableGCIBuffers(conf->nodeGrp, &ssfirst, &sslast);
  363.   Interval ss(ssfirst, sslast);
  364.   m_repState->clear(Channel::SS, conf->nodeGrp, universeInterval);
  365.   m_repState->add(Channel::SS, conf->nodeGrp, ss);
  366.   m_repState->clear(Channel::SSReq, conf->nodeGrp, ss);
  367.   RLOG(("Transfered epochs (PS:%d[%d-%d], SS:%d[%d-%d])",
  368. conf->nodeGrp, conf->firstPSGCI, conf->lastPSGCI,
  369. conf->nodeGrp, conf->firstSSGCI, conf->lastSSGCI));
  370. }
  371. /**
  372.  *  Received from primary system
  373.  */
  374. void
  375. TransSS::execGREP_SUB_REMOVE_CONF(NdbApiSignal * signal) 
  376. {
  377.   GrepSubRemoveConf * conf = (GrepSubRemoveConf* )signal->getDataPtr();
  378.   Uint32 subId  = conf->subscriptionId;
  379.   Uint32 subKey = conf->subscriptionKey;
  380.   
  381.   /**
  382.    * @todo Fix this sending
  383.    */
  384. #if 0
  385.   signal->theData[0] = EventReport::GrepSubscriptionInfo;
  386.   signal->theData[1] = GrepEvent::GrepSS_SubRemoveConf;
  387.   signal->theData[2] = subId;
  388.   signal->theData[3] = subKey;
  389.   sendSignal(CMVMI_REF,GSN_EVENT_REP,signal, 4, JBB);
  390. #endif
  391.   m_repState->eventSubscriptionDeleted(subId, subKey);
  392.   RLOG(("Subscription deleted (SubId: %d, SubKey: %d)", subId, subKey));
  393. }
  394. void
  395. TransSS::execGREP_SUB_REMOVE_REF(NdbApiSignal * signal) 
  396. {
  397.   GrepSubRemoveRef * ref = (GrepSubRemoveRef* )signal->getDataPtr();
  398.   Uint32 subId  = ref->subscriptionId;
  399.   Uint32 subKey = ref->subscriptionKey;
  400.   /** @todo: Add repevent for this */
  401.   RLOG(("TransSS: Warning: Grep sub remove ref (SubId: %d, SubKey: %d)", 
  402. subId, subKey));
  403. }
  404. /**
  405.  *  Received from primary system
  406.  */
  407. void
  408. TransSS::execREP_GET_GCI_CONF(NdbApiSignal * signal) 
  409. {
  410.   RepGetGciConf * conf = (RepGetGciConf*)signal->getDataPtr();
  411.   Uint32 nodeGrp = conf->nodeGrp;
  412.   Interval i(conf->firstPSGCI, conf->lastPSGCI);
  413.   m_repState->add(Channel::PS, nodeGrp, i);
  414.   Uint32 first, last;
  415.   m_gciContainer->getAvailableGCIBuffers(nodeGrp, &first, &last);
  416.   Interval j(first, last);
  417.   m_repState->clear(Channel::SS, nodeGrp, universeInterval);
  418.   m_repState->add(Channel::SS, nodeGrp, j);
  419. #ifdef DEBUG_REP_GET_GCI_CONF
  420.   RLOG(("TransSS: Requestor info received "
  421.  "(PS: %d:[%d-%d], SS: %d:[%d-%d])",
  422.  conf->nodeGrp, conf->firstPSGCI, conf->lastPSGCI,
  423.  conf->nodeGrp, conf->firstSSGCI, conf->lastSSGCI));
  424. #endif
  425. }
  426. void
  427. TransSS::execREP_GET_GCI_REF(NdbApiSignal * signal) 
  428. {
  429.   RepGetGciRef * ref = (RepGetGciRef*)signal->getDataPtr();
  430.   Uint32 nodeGrp = ref->nodeGrp;
  431.   RLOG(("WARNING! Requestor info request failed (Nodegrp: %d)", nodeGrp));
  432. }
  433. /**
  434.  * Recd from GrepPS
  435.  * This signal means that a DB node has disconnected.
  436.  * @todo Do we need to know that a DB node disconnected?
  437.  *
  438.  * A node has disconnected (REP or PS DB)
  439.  * @todo let the requestor respond to this event 
  440.  * in a proper way.
  441.  */
  442. void
  443. TransSS::execREP_DISCONNECT_REP(NdbApiSignal * signal) 
  444. {
  445.   RepDisconnectRep * const rep = 
  446.     (RepDisconnectRep*)signal->getDataPtr();
  447.   
  448.   //Uint32 nodeId      = rep->nodeId;
  449.   Uint32 nodeType    = rep->nodeType;
  450.   if((RepDisconnectRep::NodeType)nodeType == RepDisconnectRep::REP)
  451.   {
  452.     m_repState->disable();
  453.   }
  454. }
  455. /**
  456.  *  The buffer is now deleted on REP PS.  We can now clear it from PS.
  457.  */
  458. void
  459. TransSS::execREP_CLEAR_PS_GCIBUFFER_CONF(NdbApiSignal * signal)
  460. {
  461.   RepClearPSGciBufferConf * const conf = 
  462.     (RepClearPSGciBufferConf*)signal->getDataPtr();
  463.   Uint32 firstGCI    = conf->firstGCI;
  464.   Uint32 lastGCI     = conf->lastGCI;
  465.   Uint32 nodeGrp     = conf->nodeGrp;
  466.   Interval i(firstGCI, lastGCI);
  467.   m_repState->clear(Channel::PS, nodeGrp, i);
  468.   m_repState->clear(Channel::DelReq, nodeGrp, i);
  469.  
  470.   RLOG(("Deleted PS:%d:[%d-%d]", nodeGrp, firstGCI, lastGCI));
  471. }
  472. /**
  473.  * Something went wrong when deleting buffer on REP PS
  474.  */
  475. void
  476. TransSS::execREP_CLEAR_PS_GCIBUFFER_REF(NdbApiSignal * signal) 
  477. {
  478.   RepClearPSGciBufferRef * const ref = 
  479.     (RepClearPSGciBufferRef*)signal->getDataPtr();
  480.   Uint32 firstGCI    = ref->firstGCI;
  481.   Uint32 lastGCI     = ref->lastGCI;
  482.   Uint32 nodeGrp     = ref->nodeGrp;
  483.   
  484.   RLOG(("WARNING! Could not delete PS:%d:[%d-%d]", nodeGrp, firstGCI, lastGCI));
  485. }
  486. /*****************************************************************************
  487.  * Signal Executors : SCAN
  488.  *****************************************************************************/
  489. /**
  490.  * Scan has started on PS side... (says PS REP)
  491.  */
  492. void
  493. TransSS::execGREP_SUB_SYNC_CONF(NdbApiSignal* signal) 
  494. {
  495.   GrepSubSyncConf * const conf = (GrepSubSyncConf * ) signal->getDataPtr();
  496.   Uint32 subId                 = conf->subscriptionId;
  497.   Uint32 subKey                = conf->subscriptionKey;
  498.   Interval epochs(conf->firstGCI, conf->lastGCI);
  499.   SubscriptionData::Part part  = (SubscriptionData::Part) conf->part;
  500.  
  501.   switch(part) {
  502.   case SubscriptionData::MetaData:
  503.     RLOG(("Metascan completed. Subcription %d-%d, Epochs [%d-%d]",
  504.   subId, subKey, epochs.first(), epochs.last()));
  505.     m_repState->eventMetaScanCompleted(signal, subId, subKey, epochs);
  506. #if 0
  507.     signal->theData[0] = EventReport::GrepSubscriptionInfo;
  508.     signal->theData[1] = GrepEvent::GrepSS_SubSyncMetaConf;
  509.     signal->theData[2] = subId;
  510.     signal->theData[3] = subKey;
  511.     signal->theData[4] = gci;
  512.     sendSignal(CMVMI_REF,GSN_EVENT_REP,signal, 5, JBB);
  513. #endif
  514.     break;
  515.   case SubscriptionData::TableData:
  516.     RLOG(("Datascan completed. Subcription %d-%d, Epochs [%d-%d]",
  517.   subId, subKey, epochs.first(), epochs.last()));
  518.     m_repState->eventDataScanCompleted(signal, subId, subKey, epochs);
  519. #if 0
  520.     signal->theData[0] = EventReport::GrepSubscriptionInfo;
  521.     signal->theData[1] = GrepEvent::GrepSS_SubSyncDataConf;
  522.     signal->theData[2] = subId;
  523.     signal->theData[3] = subKey;
  524.     signal->theData[4] = gci;
  525.     sendSignal(CMVMI_REF,GSN_EVENT_REP,signal, 5, JBB);
  526. #endif
  527.     break;
  528.   default:
  529.     REPABORT3("Wrong subscription part", part, subId, subKey);
  530.   }
  531. }
  532. void
  533. TransSS::execGREP_SUB_SYNC_REF(NdbApiSignal* signal) 
  534. {
  535.   GrepSubSyncRef * const ref = (GrepSubSyncRef * ) signal->getDataPtr();
  536.   Uint32 subId                    = ref->subscriptionId;
  537.   Uint32 subKey                   = ref->subscriptionKey;
  538.   SubscriptionData::Part part     = (SubscriptionData::Part) ref->part;
  539.   GrepError::Code error           = (GrepError::Code) ref->err;
  540.  
  541.   switch(part) {
  542.   case SubscriptionData::MetaData:
  543.     m_repState->eventMetaScanFailed(subId, subKey, error);
  544. #if 0
  545.     signal->theData[0] = EventReport::GrepSubscriptionAlert;
  546.     signal->theData[1] = GrepEvent::GrepSS_SubSyncMetaRef;
  547.     signal->theData[2] = subId;
  548.     signal->theData[3] = subKey;
  549.     //    signal->theData[4] = gci;
  550.     sendSignal(CMVMI_REF,GSN_EVENT_REP,signal, 5, JBB);
  551. #endif
  552.     break;
  553.   case SubscriptionData::TableData:
  554.     m_repState->eventDataScanFailed(subId, subKey, error);
  555. #if 0
  556.     signal->theData[0] = EventReport::GrepSubscriptionAlert;
  557.     signal->theData[1] = GrepEvent::GrepSS_SubSyncDataRef;
  558.     signal->theData[2] = subId;
  559.     signal->theData[3] = subKey;
  560.     //signal->theData[4] = m_lastScanGCI;
  561.     sendSignal(CMVMI_REF,GSN_EVENT_REP,signal, 5, JBB);
  562. #endif
  563.     break;
  564.   default:
  565.     REPABORT3("Wrong subscription part", part, subId, subKey);
  566.   }
  567. }
  568. /**
  569.  * Something went wrong says REP PS
  570.  */
  571. void 
  572. TransSS::execREP_GET_GCIBUFFER_REF(NdbApiSignal* signal) 
  573. {
  574.   RepGetGciBufferRef * const ref = (RepGetGciBufferRef*)signal->getDataPtr();
  575.   /*
  576.   Uint32 senderData       = ref->senderData;
  577.   Uint32 senderRef        = ref->senderRef;
  578.   Uint32 firstPSGCI       = ref->firstPSGCI;
  579.   Uint32 lastPSGCI        = ref->lastPSGCI;
  580.   Uint32 firstSSGCI       = ref->firstSSGCI;
  581.   Uint32 lastSSGCI        = ref->lastSSGCI;
  582.   Uint32 currentGCIBuffer = ref->currentGCIBuffer;
  583.   Uint32 nodeGrp          = ref->nodeGrp;
  584.   */
  585.   GrepError::Code err     = ref->err;
  586.   RLOG(("WARNING! Request to get buffer failed. Error %d:%s",
  587. err, GrepError::getErrorDesc(err)));
  588. }