Requestor.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:6k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "Requestor.hpp"
  14. #include "ConfigRetriever.hpp"
  15. #include <NdbApiSignal.hpp>
  16. #include <signaldata/RepImpl.hpp>
  17. #include <signaldata/GrepImpl.hpp>
  18. #include <signaldata/DictTabInfo.hpp>
  19. #include <signaldata/GetTabInfo.hpp>
  20. #include <signaldata/SumaImpl.hpp>
  21. #include <AttributeHeader.hpp>
  22. #include <rep/rep_version.hpp>
  23. #define TIME_BETWEEN_EXECUTES_MS 250
  24. /*
  25.  * @todo The requestor still has a TF, but this is not used...
  26.  *       (We will need a (set of) TF(s) for REP-REP 
  27.  *       on the same system though....)
  28.  */
  29. /*****************************************************************************
  30.  * Constructor / Destructor / Init
  31.  *****************************************************************************/
  32. Requestor::Requestor(GCIContainer * gciContainer, 
  33.      AppNDB * appNDB,
  34.      RepState * repState) 
  35. {
  36.   m_gciContainer = gciContainer;
  37.   m_applier      = appNDB;
  38.   m_repState     = repState;
  39.   //m_grepSender = new ExtSender();
  40.   //if (!m_grepSender) REPABORT("");
  41.   m_repState->setSubscriptionRequests(&requestCreateSubscriptionId,
  42.       &requestCreateSubscription,
  43.       &requestRemoveSubscription);
  44.   m_repState->setIntervalRequests(&requestTransfer, 
  45.   &requestApply,
  46.   &requestDeleteSS, 
  47.   &requestDeletePS);
  48.   m_repState->setStartRequests(&requestStartMetaLog,
  49.        &requestStartDataLog,
  50.        &requestStartMetaScan,
  51.        &requestStartDataScan,
  52.        &requestEpochInfo);
  53. }
  54. Requestor::~Requestor() {
  55.   //delete m_grepSender;
  56. }
  57. bool
  58. Requestor::init(const char * connectString) 
  59. {
  60.   m_signalExecThread = NdbThread_Create(signalExecThread_C,
  61. (void **)this,
  62. 32768,
  63. "Requestor_Service",
  64. NDB_THREAD_PRIO_LOW);
  65.   if (m_signalExecThread == NULL) 
  66.     return false;
  67.   return true;
  68. /*****************************************************************************
  69.  * Signal Queue Executor
  70.  *****************************************************************************/
  71. void *
  72. Requestor::signalExecThread_C(void *g) {
  73.   Requestor *requestor = (Requestor*)g;
  74.   requestor->signalExecThreadRun();
  75.   NdbThread_Exit(0);
  76.   /* NOTREACHED */
  77.   return 0;
  78. }
  79. class SigMatch 
  80. {
  81. public:
  82.   int gsn;
  83.   void (Requestor::* function)(NdbApiSignal *signal);
  84.   
  85.   SigMatch() { gsn = 0; function = NULL; };
  86.   
  87.   SigMatch(int _gsn, void (Requestor::* _function)(NdbApiSignal *signal)) {
  88.     gsn = _gsn;
  89.     function = _function;
  90.   };
  91.   
  92.   bool check(NdbApiSignal *signal) {
  93.     if(signal->readSignalNumber() == gsn)
  94.       return true;
  95.     return false;
  96.   };
  97. };
  98. void
  99. Requestor::signalExecThreadRun() 
  100. {
  101.   while(1) 
  102.   {
  103.     /**
  104.      * @todo  Here we would like to measure the usage size of the 
  105.      *        receive buffer of TransSS.  If the buffer contains
  106.      *        more than X signals (maybe 1k or 10k), then we should 
  107.      *        not do a protectedExecute.  
  108.      *        By having the usage size measure thingy,
  109.      *        we avoid having the Requestor requesting more 
  110.      *        things than the TransSS can handle.
  111.      *        /Lars
  112.      *
  113.      * @todo  A different implementation of this functionality 
  114.      *        would be to send a signal to myself when the protected 
  115.      *        execute is finished.  This solution could be 
  116.      *        discussed.
  117.      *        /Lars
  118.      */
  119.     m_repState->protectedExecute();
  120.     NdbSleep_MilliSleep(TIME_BETWEEN_EXECUTES_MS);
  121.   }
  122. }
  123. void 
  124. Requestor::sendSignalRep(NdbApiSignal * s) {
  125.   m_repSender->sendSignal(s);
  126. }
  127. void
  128. Requestor::execSignal(void* executorObj, NdbApiSignal* signal, 
  129.    class LinearSectionPtr ptr[3]){
  130.   Requestor * executor = (Requestor*)executorObj;  
  131.   const Uint32 gsn = signal->readSignalNumber();
  132.   const Uint32 len = signal->getLength();
  133.   
  134.   NdbApiSignal * s = new NdbApiSignal(executor->m_ownRef);
  135.   switch (gsn) {
  136.   case GSN_REP_GET_GCI_CONF:
  137.   case GSN_REP_GET_GCI_REQ:
  138.   case GSN_REP_GET_GCIBUFFER_REQ:
  139.   case GSN_REP_INSERT_GCIBUFFER_REQ:
  140.   case GSN_REP_CLEAR_SS_GCIBUFFER_REQ:
  141.   case GSN_REP_CLEAR_PS_GCIBUFFER_REQ:
  142.   case GSN_REP_DROP_TABLE_REQ:
  143.   case GSN_GREP_SUB_CREATE_REQ:
  144.   case GSN_GREP_SUB_START_REQ:
  145.   case GSN_GREP_SUB_SYNC_REQ:
  146.   case GSN_GREP_SUB_REMOVE_REQ:
  147.   case GSN_GREP_CREATE_SUBID_REQ:
  148.     s->set(0, PSREPBLOCKNO, gsn, len);
  149.     memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len);
  150.     executor->m_signalRecvQueue.receive(s);    
  151.     break;
  152.   default:
  153.     REPABORT1("Illegal signal received in execSignal", gsn);
  154.   }  
  155. #if 0
  156.   ndbout_c("Requestor: Inserted signal into queue (GSN: %d, Len: %d)",
  157.    signal->readSignalNumber(), len);
  158. #endif
  159. }
  160.   
  161. void 
  162. Requestor::execNodeStatus(void* obj, Uint16 nodeId, 
  163.   bool alive, bool nfCompleted)
  164. {
  165.   //Requestor * thisObj = (Requestor*)obj;
  166.   
  167.   RLOG(("Node changed status (NodeId %d, Alive %d, nfCompleted %d)",
  168. nodeId, alive, nfCompleted));
  169.   
  170.   if(alive) {
  171.     /**
  172.      *  Connected - set node as connected
  173.      *
  174.      *  @todo  Make it possible to have multiple External REP nodes
  175.      */
  176. #if 0
  177.     for(Uint32 i=0; i<thisObj->m_nodeConnectList.size(); i++) {
  178.       if(thisObj->m_nodeConnectList[i]->nodeId == nodeId)
  179. thisObj->m_nodeConnectList[i]->connected = true;
  180.     }
  181.     thisObj->m_grepSender->setNodeId(thisObj->m_nodeConnectList[0]->nodeId);
  182. #endif
  183.   }
  184.   if(!alive && !nfCompleted){
  185.     /**
  186.      *  ???
  187.      */
  188.   }
  189.   
  190.   if(!alive && nfCompleted){
  191.     /**
  192.      *  Re-connect
  193.      */
  194.   }
  195. }