RepStateRequests.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:9k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "RepState.hpp"
  14. #include <NdbApiSignal.hpp>
  15. #include <SimpleProperties.hpp>
  16. #include <UtilBuffer.hpp>
  17. #include <signaldata/GrepImpl.hpp>
  18. #include <signaldata/RepImpl.hpp>
  19. #include <signaldata/SumaImpl.hpp>
  20. #include <rep/rep_version.hpp>
  21. #include "Channel.hpp"
  22. /*****************************************************************************
  23.  * Helper functions
  24.  *****************************************************************************/
  25. void
  26. startSubscription(void * cbObj, NdbApiSignal* signal, 
  27.   SubscriptionData::Part part, 
  28.   Uint32 subId, Uint32 subKey)
  29.   ExtSender * ext = (ExtSender *) cbObj;
  30.   GrepSubStartReq * req = (GrepSubStartReq *)signal->getDataPtrSend();
  31.   req->subscriptionId   = subId;
  32.   req->subscriptionKey  = subKey;
  33.   req->part             = (Uint32) part;
  34.   signal->set(0, PSREPBLOCKNO, GSN_GREP_SUB_START_REQ,
  35.       GrepSubStartReq::SignalLength);   
  36.   ext->sendSignal(signal);
  37. }
  38. void
  39. scanSubscription(void * cbObj, NdbApiSignal* signal, 
  40.  SubscriptionData::Part part, 
  41.  Uint32 subId, Uint32 subKey)
  42.   ExtSender * ext = (ExtSender *) cbObj;
  43.   GrepSubSyncReq * req = (GrepSubSyncReq *)signal->getDataPtrSend();
  44.   req->subscriptionId  = subId;
  45.   req->subscriptionKey = subKey;
  46.   req->part            = part;
  47.   signal->set(0, PSREPBLOCKNO, GSN_GREP_SUB_SYNC_REQ,
  48.       GrepSubSyncReq::SignalLength);
  49.   ext->sendSignal(signal);
  50. }
  51. /*****************************************************************************
  52.  * RepState registered functions
  53.  *
  54.  * These registered functions are executed by RepState when
  55.  * RepState needs to have stuff done.
  56.  *****************************************************************************/
  57. void
  58. requestCreateSubscriptionId(void * cbObj, NdbApiSignal* signal) 
  59. {
  60.   ExtSender * ext = (ExtSender *) cbObj;
  61.   CreateSubscriptionIdReq * req = 
  62.     (CreateSubscriptionIdReq *)signal->getDataPtrSend();
  63.   req->senderData = ext->getOwnRef();
  64.   signal->set(0, PSREPBLOCKNO, GSN_GREP_CREATE_SUBID_REQ,
  65.       CreateSubscriptionIdReq::SignalLength);
  66.   ext->sendSignal(signal);
  67.     
  68. #ifdef DEBUG_GREP_SUBSCRIPTION
  69.   ndbout_c("Sent request for creation of subscription id to PS");
  70. #endif
  71. }
  72. void
  73. requestCreateSubscription(void * cbObj, 
  74.   NdbApiSignal* signal,
  75.   Uint32 subId, 
  76.   Uint32 subKey,
  77.   Vector<struct table *> * selectedTables) 
  78. {
  79.   ExtSender * ext = (ExtSender *) cbObj;
  80.   GrepSubCreateReq * req = (GrepSubCreateReq *)signal->getDataPtrSend();
  81.   req->senderRef = ext->getOwnRef();
  82.   req->subscriptionId = subId;
  83.   req->subscriptionKey = subKey;
  84.   if(selectedTables!=0) {
  85.     UtilBuffer m_buffer;
  86.     UtilBufferWriter w(m_buffer);
  87.     LinearSectionPtr tablePtr[3];
  88.     req->subscriptionType = SubCreateReq::SelectiveTableSnapshot;
  89.     for(Uint32 i=0; i< selectedTables->size(); i++) {
  90.       w.add(SimpleProperties::StringValue, (*selectedTables)[i]->tableName);
  91.     }
  92.     tablePtr[0].p = (Uint32*)m_buffer.get_data();
  93.     tablePtr[0].sz = m_buffer.length() >> 2;
  94.     signal->set(0, PSREPBLOCKNO, GSN_GREP_SUB_CREATE_REQ,
  95.                 GrepSubCreateReq::SignalLength);
  96.     ext->sendFragmentedSignal(signal, tablePtr, 1);
  97.   }
  98.   else {
  99.     req->subscriptionType = SubCreateReq::DatabaseSnapshot;
  100.     signal->set(0, PSREPBLOCKNO, GSN_GREP_SUB_CREATE_REQ,
  101.                 GrepSubCreateReq::SignalLength);
  102.     ext->sendFragmentedSignal(signal, 0, 0);
  103.   }
  104.   
  105. #ifdef DEBUG_GREP_SUBSCRIPTION
  106.   ndbout_c("Requestor: Sent request for creation of subscription");
  107. #endif
  108. }
  109. void
  110. requestRemoveSubscription(void * cbObj, NdbApiSignal* signal, 
  111.   Uint32 subId, Uint32 subKey) 
  112.   ExtSender * ext = (ExtSender *) cbObj;
  113.   GrepSubRemoveReq * req = (GrepSubRemoveReq *)signal->getDataPtrSend();
  114.   req->subscriptionId    = subId;
  115.   req->subscriptionKey   = subKey;
  116.   signal->set(0, PSREPBLOCKNO, GSN_GREP_SUB_REMOVE_REQ,
  117.       GrepSubRemoveReq::SignalLength);
  118.   ext->sendSignal(signal);
  119. }
  120. void
  121. requestTransfer(void * cbObj, NdbApiSignal * signal, 
  122. Uint32 nodeGrp, Uint32 first, Uint32 last) 
  123. {
  124.   ExtSender * ext = (ExtSender *) cbObj;
  125.   RepGetGciBufferReq * req = (RepGetGciBufferReq*)signal->getDataPtrSend();
  126.   req->firstGCI  = first;
  127.   req->lastGCI   = last;
  128.   req->nodeGrp   = nodeGrp;
  129.   req->senderRef = ext->getOwnRef();
  130.   signal->set(0, PSREPBLOCKNO, GSN_REP_GET_GCIBUFFER_REQ, 
  131.       RepGetGciBufferReq::SignalLength);   
  132.   ext->sendSignal(signal);
  133. #ifdef DEBUG_GREP_TRANSFER
  134.   ndbout_c("Requestor: Requested PS GCI buffers %d:[%d-%d]", 
  135.    nodeGrp, first, last);
  136. #endif
  137. }
  138. void
  139. requestApply(void * applyObj, NdbApiSignal * signal, 
  140.      Uint32 nodeGrp, Uint32 first, Uint32 last, Uint32 force) 
  141. {
  142.   AppNDB * applier = (AppNDB *) applyObj;
  143.   if (first != last) {
  144.     RLOG(("WARNING! Trying to apply range [%d-%d]. This is not implemeted",
  145.   first, last));
  146.   }
  147.   /**
  148.    * Apply GCIBuffer even if it is empty.
  149.    */
  150.   applier->applyBuffer(nodeGrp, first, force);
  151.   /**
  152.    *  @todo Handle return value from the method above
  153.    */
  154. }
  155. void 
  156. requestDeleteSS(void * cbObj, NdbApiSignal * signal, 
  157. Uint32 nodeGrp, Uint32 firstGCI, Uint32 lastGCI) 
  158. {
  159.   GCIContainer * container = (GCIContainer *) cbObj;
  160.   RLOG(("Deleting SS:%d:[%d-%d]", nodeGrp, firstGCI, lastGCI));
  161.   
  162.   if(firstGCI < 0 || lastGCI<=0 || nodeGrp < 0) {
  163.     REPABORT("Illegal interval or wrong node group"); 
  164.     //return GrepError::REP_DELETE_NEGATIVE_EPOCH;
  165.   }
  166.   /*********************************************
  167.    * All buffers : Modify to the available ones
  168.    *********************************************/
  169.   if(firstGCI==0 && lastGCI==(Uint32)0xFFFF) {
  170.     container->getAvailableGCIBuffers(nodeGrp, &firstGCI, &lastGCI);
  171.   }
  172.   if(firstGCI == 0) {
  173.     Uint32 f, l;
  174.     container->getAvailableGCIBuffers(nodeGrp, &f, &l);
  175.     RLOG(("Deleting SS:[%d-%d]", f, l));
  176.     if(f > firstGCI) firstGCI = f;
  177.   }
  178.   /**
  179.    * Delete buffers
  180.    */
  181.   for(Uint32 i=firstGCI; i<=lastGCI; i++) {
  182.     if(!container->destroyGCIBuffer(i, nodeGrp)) {
  183.       RLOG(("WARNING! Delete non-existing epoch SS:%d:[%d]", nodeGrp, i)); 
  184.     }
  185.     //RLOG(("RepStateRequests: Deleting buffer SS:%d:[%d]", nodeGrp, i));
  186.   }
  187. }
  188. void 
  189. requestDeletePS(void * cbObj, NdbApiSignal * signal, 
  190. Uint32 nodeGrp, Uint32 firstGCI, Uint32 lastGCI)
  191. {
  192.   ExtSender * ext = (ExtSender *) cbObj;
  193.   RepClearPSGciBufferReq * psReq = 
  194.     (RepClearPSGciBufferReq*)signal->getDataPtrSend();
  195.   /**
  196.    * @todo Should have better senderData /Lars
  197.    */
  198.   psReq->senderData = 4711;
  199.   psReq->senderRef = ext->getOwnRef();
  200.   psReq->firstGCI = firstGCI;
  201.   psReq->lastGCI = lastGCI;
  202.   psReq->nodeGrp = nodeGrp;
  203.   signal->set(0, PSREPBLOCKNO, GSN_REP_CLEAR_PS_GCIBUFFER_REQ,
  204.       RepClearPSGciBufferReq::SignalLength);   
  205.   ext->sendSignal(signal);
  206.   
  207.   RLOG(("Requesting deletion of PS:%d:[%d-%d]", nodeGrp, firstGCI, lastGCI));
  208. }
  209. /**
  210.  * Function that requests information from REP PS about stored GCI Buffers
  211.  */
  212. void 
  213. requestEpochInfo(void * cbObj, NdbApiSignal* signal, Uint32 nodeGrp) 
  214. {
  215.   ExtSender * ext = (ExtSender *) cbObj;
  216.   RepGetGciReq * req = (RepGetGciReq *) signal->getDataPtrSend();  
  217.   req->nodeGrp = nodeGrp;
  218.   signal->set(0, PSREPBLOCKNO, GSN_REP_GET_GCI_REQ,
  219.       RepGetGciReq::SignalLength);
  220.   ext->sendSignal(signal);
  221. }
  222. void
  223. requestStartMetaLog(void * cbObj, NdbApiSignal * signal,
  224.     Uint32 subId, Uint32 subKey)
  225.   RLOG(("Metalog starting. Subscription %d-%d", subId, subKey));
  226.   startSubscription(cbObj, signal, SubscriptionData::MetaData, subId, subKey);
  227. }
  228. void
  229. requestStartDataLog(void * cbObj, NdbApiSignal * signal,
  230.     Uint32 subId, Uint32 subKey)
  231.   RLOG(("Datalog starting. Subscription %d-%d", subId, subKey));
  232.   startSubscription(cbObj, signal, SubscriptionData::TableData, subId, subKey);
  233. }
  234. void 
  235. requestStartMetaScan(void * cbObj, NdbApiSignal* signal,
  236.      Uint32 subId, Uint32 subKey)
  237. {
  238.   RLOG(("Metascan starting. Subscription %d-%d", subId, subKey));
  239.   scanSubscription(cbObj, signal, SubscriptionData::MetaData, subId, subKey);
  240. }
  241. void 
  242. requestStartDataScan(void * cbObj, NdbApiSignal* signal,
  243.      Uint32 subId, Uint32 subKey)
  244. {
  245.   RLOG(("Datascan starting. Subscription %d-%d", subId, subKey));
  246.   scanSubscription(cbObj, signal, SubscriptionData::TableData, subId, subKey);
  247. }