RepState.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:25k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "RepState.hpp"
  14. #include <signaldata/SumaImpl.hpp>
  15. #include <NdbApiSignal.hpp>
  16. #include <Properties.hpp>
  17. //#define DBUG_REQUESTOR
  18. #ifdef DBUG_REQUESTOR
  19. #define DBUG_REQUESTOR_PRINT(X) ndbout_c(X);
  20. #else
  21. #define DBUG_REQUESTOR_PRINT(X)
  22. #endif
  23. /****************************************************************************
  24.  * Constructor / Destructor / Init
  25.  ****************************************************************************/
  26. RepState::RepState() 
  27. {
  28.   m_connected = UNKNOWN;
  29.   m_repConnected = UNKNOWN;
  30.   m_mutex = NdbMutex_Create();
  31.   m_stopEpoch = 0;
  32.   m_subIdToRemove = 0;
  33.   m_subKeyToRemove = 0;
  34. }
  35. RepState::~RepState() 
  36. {
  37.   NdbMutex_Destroy(m_mutex);
  38. }
  39. void
  40. RepState::setSubscriptionRequests(FuncRequestCreateSubscriptionId f1,
  41.   FuncRequestCreateSubscription f2,
  42.   FuncRequestRemoveSubscription f3)
  43. {
  44.   m_funcRequestCreateSubscriptionId = f1;
  45.   m_funcRequestCreateSubscription = f2;
  46.   m_funcRequestRemoveSubscription = f3;
  47. }
  48. void
  49. RepState::setIntervalRequests(FuncRequestTransfer f1, 
  50.       FuncRequestApply f2,
  51.       FuncRequestDeleteSS f3, 
  52.       FuncRequestDeletePS f4)
  53. {
  54.   m_funcRequestTransfer = f1;
  55.   m_funcRequestApply = f2;
  56.   m_funcRequestDeleteSS = f3;
  57.   m_funcRequestDeletePS = f4;
  58. }
  59. void
  60. RepState::setStartRequests(FuncRequestStartMetaLog * f5,
  61.    FuncRequestStartDataLog * f6,
  62.    FuncRequestStartMetaScan * f7,
  63.    FuncRequestStartDataScan * f8, 
  64.    FuncRequestEpochInfo * f9) 
  65. {
  66.   m_funcRequestStartMetaLog = f5;
  67.   m_funcRequestStartDataLog = f6;
  68.   m_funcRequestStartMetaScan = f7;
  69.   m_funcRequestStartDataScan = f8;
  70.   m_funcRequestEpochInfo = f9;
  71. }
  72. /****************************************************************************
  73.  * Private Helper functions
  74.  ****************************************************************************/
  75. void 
  76. RepState::requestTransfer(NdbApiSignal * signal) 
  77. {
  78.   DBUG_REQUESTOR_PRINT("RepState: Transfer calculations started");
  79.   for(Uint32 nodeGrp=0; nodeGrp<m_channel.getNoOfNodeGroups(); nodeGrp++) {
  80.     DBUG_REQUESTOR_PRINT("RepState: Transfer calc for node grp");
  81.     Interval i;
  82.     if (m_channel.requestTransfer(nodeGrp, &i)) {
  83.       m_funcRequestTransfer(m_extSender, signal, nodeGrp, i.first(), i.last());
  84.     }
  85.   }
  86. }
  87. void
  88. RepState::requestApply(NdbApiSignal * signal) 
  89. {
  90.   DBUG_REQUESTOR_PRINT("RepState: Apply calculations started");
  91.   for(Uint32 nodeGrp=0; nodeGrp<m_channel.getNoOfNodeGroups(); nodeGrp++) {
  92.     DBUG_REQUESTOR_PRINT("RepState: Apply calc for node grp");
  93.     Uint32 gci;
  94.     if (m_channel.requestApply(nodeGrp, &gci)) {
  95.       Uint32 force = (m_channel.getState() == Channel::LOG) ? 0 : 1;
  96.       m_funcRequestApply(m_applier, signal, nodeGrp, gci, gci, force);
  97.     }
  98.   }
  99. }
  100. void
  101. RepState::requestDelete(NdbApiSignal * signal) 
  102. {
  103.   DBUG_REQUESTOR_PRINT("RepState: Delete calculations started");
  104.   for(Uint32 nodeGrp=0; nodeGrp<m_channel.getNoOfNodeGroups(); nodeGrp++) {
  105.     DBUG_REQUESTOR_PRINT("RepState: Delete calc for node grp");
  106.     Interval i;
  107.     if (m_channel.requestDelete(nodeGrp, &i)){
  108.       m_funcRequestDeleteSS(m_gciContainer, signal, nodeGrp, 
  109.     i.first(), i.last());
  110.       m_funcRequestDeletePS(m_extSender, signal, nodeGrp, i.first(), i.last());
  111.     }
  112.   }
  113. }
  114. void
  115. RepState::requestEpochInfo(NdbApiSignal * signal) 
  116. {
  117.   DBUG_REQUESTOR_PRINT("RepState: Epoch Info calculations");
  118.   for(Uint32 nodeGrp=0; nodeGrp<m_channel.getNoOfNodeGroups(); nodeGrp++) {
  119.     m_funcRequestEpochInfo(m_extSender, signal, nodeGrp);
  120.   }
  121. }
  122. /****************************************************************************
  123.  * Public 
  124.  ****************************************************************************/
  125. GrepError::Code
  126. RepState::add(Channel::Position s, Uint32 nodeGrp, const Interval i) 
  127. {
  128.   m_channel.add(s, nodeGrp, i);
  129.   if(s == Channel::PS) 
  130.   {
  131.     m_connected = CONNECTED;
  132.     m_connected_counter = 0;
  133.   }
  134.   Interval fullEpochs;
  135.   m_channel.getFullyAppliedEpochs(&fullEpochs);
  136.   if(s == Channel::App &&
  137.      m_channel.getState() == Channel::DATASCAN_COMPLETED && 
  138.      fullEpochs.last() >= m_channel.getDataScanEpochs().last() &&
  139.      fullEpochs.last() >= m_channel.getMetaScanEpochs().last())
  140.   {
  141.     RLOG(("[%d-%d] fully applied. Channel state changed to LOG",
  142.   fullEpochs.first(), fullEpochs.last()));
  143.     m_channel.setState(Channel::LOG);
  144.     disableAutoStart();
  145.   }
  146.   return GrepError::NO_ERROR;
  147. }
  148. GrepError::Code 
  149. RepState::clear(Channel::Position s, Uint32 nodeGrp, const Interval i) 
  150. {
  151.   m_channel.clear(s, nodeGrp, i);
  152.   return GrepError::NO_ERROR;
  153. }
  154. /****************************************************************************
  155.  * Execute 
  156.  * 
  157.  * This method should only be called from Requestor!
  158.  ****************************************************************************/
  159. GrepError::Code 
  160. RepState::protectedExecute()
  161. {
  162.   GrepError::Code err;
  163.   
  164.   NdbMutex_Lock(m_mutex);
  165.   
  166.   NdbApiSignal* signal = m_extSender->getSignal();
  167.   if (signal == NULL) {
  168.     err = GrepError::COULD_NOT_ALLOCATE_MEM_FOR_SIGNAL;
  169.   } else {
  170.     err = execute(signal);
  171.   }
  172.   NdbMutex_Unlock(m_mutex);
  173.   return err;
  174. }
  175. GrepError::Code 
  176. RepState::execute(NdbApiSignal* signal)
  177. {
  178.   Uint32 subId = m_channel.getSubId();
  179.   Uint32 subKey = m_channel.getSubKey();
  180.   if (!m_channel.m_requestorEnabled) 
  181.     return GrepError::NO_ERROR;
  182.   /**
  183.    * @todo Should have subscriptions in here
  184.    */
  185.   requestEpochInfo(signal);
  186.   /**
  187.    * Update connected counter (Silence time)
  188.    */
  189.   m_connected_counter++;
  190.   if (m_connected_counter > REQUESTOR_EXECUTES_NEEDED_FOR_UNKNOWN_CONNECTION) {
  191.     m_connected = UNKNOWN;
  192.   }
  193.   switch (m_channel.getState()) 
  194.   {
  195.   case Channel::CONSISTENT:
  196.     if (isAutoStartEnabled()) {
  197.       switch (m_channel.getStateSub()) 
  198.       {
  199.       case Channel::NO_SUBSCRIPTION_EXISTS:
  200. m_funcRequestCreateSubscriptionId(m_extSender, signal);  
  201. m_channel.setStateSub(Channel::CREATING_SUBSCRIPTION_ID);
  202. break;
  203.       case Channel::CREATING_SUBSCRIPTION_ID:
  204. break;
  205.       case Channel::SUBSCRIPTION_ID_CREATED:
  206. if(m_channel.isSelective())
  207.           m_funcRequestCreateSubscription(m_extSender, signal,
  208.                                           m_channel.getSubId(),
  209.                                           m_channel.getSubKey(),
  210.                                           m_channel.getSelectedTables());
  211.         else
  212.           m_funcRequestCreateSubscription(m_extSender, signal,
  213.                                           m_channel.getSubId(),
  214.                                           m_channel.getSubKey(),
  215.                                           0);
  216. m_channel.setStateSub(Channel::STARTING_SUBSCRIPTION);
  217. break;
  218.       case Channel::STARTING_SUBSCRIPTION:
  219. break;
  220.       case Channel::SUBSCRIPTION_STARTED:
  221. m_funcRequestStartMetaLog(m_extSender, signal, 
  222.   m_channel.getSubId(),
  223.   m_channel.getSubKey());
  224. m_channel.setState(Channel::METALOG_STARTING);
  225. break;
  226.       } 
  227.     }
  228.     break;
  229.   case Channel::METALOG_STARTING:
  230.     break;
  231.   case Channel::METALOG_STARTED:
  232.     if (isAutoStartEnabled()) {
  233.       m_funcRequestStartMetaScan(m_extSender, signal, subId, subKey);
  234.       m_channel.setState(Channel::METASCAN_STARTING);
  235.     }
  236.     break;
  237.   case Channel::METASCAN_STARTING:
  238.     break;
  239.   case Channel::METASCAN_COMPLETED:
  240.     if (isAutoStartEnabled()) {
  241.       m_funcRequestStartDataLog(m_extSender, signal, subId, subKey);
  242.       m_channel.setState(Channel::DATALOG_STARTING);
  243.     }
  244.     break;
  245.   case Channel::DATALOG_STARTING:
  246.     break;
  247.   case Channel::DATALOG_STARTED:
  248.     if (isAutoStartEnabled()) {
  249.       m_funcRequestStartDataScan(m_extSender, signal, subId, subKey);
  250.       m_channel.setState(Channel::DATASCAN_STARTING);
  251.     }
  252.     break;
  253.   case Channel::DATASCAN_STARTING:
  254.     break;
  255.   case Channel::DATASCAN_COMPLETED:
  256.     break;
  257.   case Channel::LOG:
  258.     if (m_channel.shouldStop()) {
  259.       disableTransfer();
  260.       m_channel.setState(Channel::STOPPING);
  261.     }
  262.     break;
  263.   case Channel::STOPPING:
  264.     if (m_channel.m_transferEnabled) 
  265.     {
  266.       REPABORT("Illegal stopping state while transfer is still enabled");
  267.     }
  268.     /**
  269.      * check if channel has a subscription, if not,
  270.      * check if we have marked a subscription that we want to remove
  271.      * and remove it. This is used to clean up "dangling subscriptions"
  272.      * after various crashes
  273.      */
  274.     if(!m_channel.subscriptionExists())
  275.     { 
  276.       if(m_subIdToRemove && m_subKeyToRemove) 
  277.       {
  278. m_funcRequestRemoveSubscription(m_extSender, signal, 
  279. m_subIdToRemove, 
  280. m_subKeyToRemove);
  281. eventSubscriptionDeleted( m_subIdToRemove, 
  282.   m_subKeyToRemove);
  283. return GrepError::NO_ERROR;
  284.       }  
  285.       else {
  286. return GrepError::SUBSCRIPTION_ID_NOT_FOUND;
  287.       }
  288.     } else {
  289.       if (m_channel.isStoppable())
  290. {
  291.   
  292.   m_funcRequestRemoveSubscription(m_extSender, signal, 
  293.   m_channel.getSubId(), 
  294.   m_channel.getSubKey());
  295.   eventSubscriptionDeleted(m_channel.getSubId(), 
  296.    m_channel.getSubKey());  
  297. }
  298.       else 
  299. return GrepError::CHANNEL_NOT_STOPPABLE;
  300.       
  301.     }
  302.     break;
  303.   default:
  304.     REPABORT("Illegal replication state");
  305.   }
  306.   if (m_channel.m_transferEnabled)  requestTransfer(signal);
  307.   if (m_channel.m_applyEnabled)     requestApply(signal);
  308.   if (m_channel.m_deleteEnabled)    requestDelete(signal); 
  309.   return GrepError::NO_ERROR;
  310. }
  311. /****************************************************************************
  312.  * Request
  313.  * 
  314.  * This method should only be called from Main Thread!
  315.  ****************************************************************************/
  316. GrepError::Code 
  317. RepState::protectedRequest(GrepReq::Request req, Uint32 arg)
  318. {
  319.   return protectedRequest(req, arg, 0);
  320. }
  321. GrepError::Code 
  322. RepState::protectedRequest(GrepReq::Request req, Uint32 arg1, Uint32 arg2)
  323. {
  324.   GrepError::Code code;
  325.   NdbMutex_Lock(m_mutex);
  326.   NdbApiSignal* signal = m_extSender->getSignal();
  327.   if (signal == NULL) {
  328.     code = GrepError::COULD_NOT_ALLOCATE_MEM_FOR_SIGNAL;
  329.   } else {
  330.     code = request(req, arg1, arg2, signal);
  331.   }
  332.   NdbMutex_Unlock(m_mutex);
  333.   return code;
  334. }
  335. GrepError::Code
  336. RepState::protectedAddTable(const char * fullTableName)
  337. {
  338.   GrepError::Code code;
  339.   NdbMutex_Lock(m_mutex);
  340.   code  =  m_channel.addTable(fullTableName);
  341.   NdbMutex_Unlock(m_mutex);
  342.   return code;
  343. }
  344. GrepError::Code
  345. RepState::protectedRemoveTable(const char * fullTableName)
  346. {
  347.   GrepError::Code code;
  348.   if(m_channel.getStateSub() !=  Channel::NO_SUBSCRIPTION_EXISTS)
  349.     return GrepError::START_ALREADY_IN_PROGRESS;
  350.   NdbMutex_Lock(m_mutex);
  351.   code  =  m_channel.removeTable(fullTableName);
  352.   NdbMutex_Unlock(m_mutex);
  353.   return code;
  354. }
  355. GrepError::Code 
  356. RepState::request(GrepReq::Request request, Uint32 arg1, Uint32 arg2, 
  357.   NdbApiSignal* signal) 
  358. {
  359.   switch (request) 
  360.   {
  361.     /*************************************************************************
  362.      * STATUS etc
  363.      *************************************************************************/
  364.   case GrepReq::STATUS:
  365.     printStatus();
  366.     break;
  367.   case GrepReq::REMOVE_BUFFERS:
  368.     return GrepError::NOT_YET_IMPLEMENTED;
  369.     
  370.     /*************************************************************************
  371.      * START
  372.      *************************************************************************/
  373.   case GrepReq::CREATE_SUBSCR:
  374.     if (m_channel.getStateSub() != Channel::NO_SUBSCRIPTION_EXISTS) 
  375.       return GrepError::SUBSCRIPTION_ID_ALREADY_EXIST;
  376.     m_funcRequestCreateSubscriptionId(m_extSender, signal);  
  377.     m_channel.setStateSub(Channel::CREATING_SUBSCRIPTION_ID);
  378.     return GrepError::NO_ERROR;
  379.   case GrepReq::START_SUBSCR:
  380.     if (m_channel.getState() == Channel::STOPPING)
  381.       return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
  382.     if (m_channel.getStateSub() != Channel::SUBSCRIPTION_ID_CREATED)
  383.       return GrepError::SUBSCRIPTION_ID_NOT_FOUND;
  384.     if(m_channel.isSelective())
  385.       m_funcRequestCreateSubscription(m_extSender, signal,
  386.                                       m_channel.getSubId(),
  387.                                       m_channel.getSubKey(),
  388.                                       m_channel.getSelectedTables());
  389.     else
  390.       m_funcRequestCreateSubscription(m_extSender, signal,
  391.                                       m_channel.getSubId(),
  392.                                       m_channel.getSubKey(),
  393.                                       0);
  394.     m_channel.setStateSub(Channel::STARTING_SUBSCRIPTION);
  395.     return GrepError::NO_ERROR;
  396.   case GrepReq::START_METALOG:
  397.     if (m_channel.getState() == Channel::STOPPING)
  398.       return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
  399.     if (m_channel.getStateSub() != Channel::SUBSCRIPTION_STARTED)
  400.       return GrepError::SUBSCRIPTION_NOT_STARTED;
  401.     if (m_channel.getState() != Channel::CONSISTENT)
  402.       return GrepError::START_OF_COMPONENT_IN_WRONG_STATE;
  403.     
  404.     m_funcRequestStartMetaLog(m_extSender, signal, 
  405.       m_channel.getSubId(), 
  406.       m_channel.getSubKey());  
  407.     m_channel.setState(Channel::METALOG_STARTING);
  408.     return GrepError::NO_ERROR;
  409.   case GrepReq::START_METASCAN:
  410.     if (m_channel.getState() == Channel::STOPPING)
  411.       return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
  412.     if (m_channel.getStateSub() != Channel::SUBSCRIPTION_STARTED)
  413.       return GrepError::SUBSCRIPTION_NOT_STARTED;
  414.     if (m_channel.getState() != Channel::METALOG_STARTED)
  415.       return GrepError::START_OF_COMPONENT_IN_WRONG_STATE;
  416.  
  417.     m_funcRequestStartMetaScan(m_extSender, signal,
  418.        m_channel.getSubId(), 
  419.        m_channel.getSubKey());  
  420.     m_channel.setState(Channel::METASCAN_STARTING);
  421.     return GrepError::NO_ERROR;
  422.   case GrepReq::START_DATALOG:
  423.     if (m_channel.getState() == Channel::STOPPING)
  424.       return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
  425.     if (m_channel.getStateSub() != Channel::SUBSCRIPTION_STARTED)
  426.       return GrepError::SUBSCRIPTION_NOT_STARTED;
  427.     if (m_channel.getState() != Channel::METASCAN_COMPLETED)
  428.       return GrepError::START_OF_COMPONENT_IN_WRONG_STATE;
  429.     m_funcRequestStartDataLog(m_extSender, signal,
  430.       m_channel.getSubId(), 
  431.       m_channel.getSubKey());  
  432.     m_channel.setState(Channel::DATALOG_STARTING);
  433.     return GrepError::NO_ERROR;
  434.   case GrepReq::START_DATASCAN:
  435.     if (m_channel.getState() == Channel::STOPPING)
  436.       return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
  437.     if (m_channel.getStateSub() != Channel::SUBSCRIPTION_STARTED)
  438.       return GrepError::SUBSCRIPTION_NOT_STARTED;
  439.     if (m_channel.getState() != Channel::DATALOG_STARTED)
  440.       return GrepError::START_OF_COMPONENT_IN_WRONG_STATE;
  441.     m_funcRequestStartDataScan(m_extSender, signal,
  442.       m_channel.getSubId(), 
  443.       m_channel.getSubKey());  
  444.     m_channel.setState(Channel::DATASCAN_STARTING);
  445.     return GrepError::NO_ERROR;
  446.   case GrepReq::START_REQUESTOR:
  447.     enable();
  448.     return GrepError::NO_ERROR;
  449.     
  450.   case GrepReq::START_TRANSFER:
  451.     if (m_channel.getState() == Channel::STOPPING)
  452.       return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
  453.     enableTransfer();
  454.     return GrepError::NO_ERROR;
  455.   case GrepReq::START_APPLY:
  456.     enableApply();
  457.     return GrepError::NO_ERROR;
  458.   case GrepReq::START_DELETE:
  459.     enableDelete();
  460.     return GrepError::NO_ERROR;
  461.   case GrepReq::START:
  462.     if (isAutoStartEnabled())
  463.       return GrepError::START_ALREADY_IN_PROGRESS;
  464.     enableAutoStart();
  465.     return GrepError::NO_ERROR;
  466.     
  467.     /*************************************************************************
  468.      * STOP
  469.      *************************************************************************/
  470.   case GrepReq::STOP:
  471.     if (m_channel.getStateSub() == Channel::NO_SUBSCRIPTION_EXISTS)
  472.       return GrepError::SUBSCRIPTION_NOT_STARTED;
  473.     if (m_channel.getState() == Channel::STOPPING)
  474.       return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
  475.     if (arg1 == 0) { 
  476.       /**
  477.        * Stop immediately
  478.        */
  479.       disableTransfer();
  480.       m_channel.setState(Channel::STOPPING);
  481.       m_channel.setStopEpochId(0);
  482.       return GrepError::NO_ERROR;
  483.     } else {
  484.       /**
  485.        * Set future stop epoch
  486.        */
  487.       return m_channel.setStopEpochId(arg1);
  488.     }
  489.   case GrepReq::STOP_SUBSCR:
  490.     {
  491.       if(m_subIdToRemove == 0 && m_subKeyToRemove == 0) {
  492.   m_subIdToRemove   = arg1;
  493.   m_subKeyToRemove  = arg2;
  494.       } else {
  495. return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
  496.       }
  497.       
  498.       if(m_channel.getSubId() != 0 && m_channel.getSubKey() != 0)
  499. return GrepError::ILLEGAL_USE_OF_COMMAND;
  500.       if (m_channel.getState() == Channel::STOPPING)
  501. return GrepError::ILLEGAL_ACTION_WHEN_STOPPING;
  502.       disableTransfer();
  503.       m_channel.setState(Channel::STOPPING);
  504.       return GrepError::NO_ERROR;
  505.     }
  506.   case GrepReq::STOP_METALOG:
  507.   case GrepReq::STOP_METASCAN:
  508.   case GrepReq::STOP_DATALOG:
  509.   case GrepReq::STOP_DATASCAN:
  510.     return GrepError::NOT_YET_IMPLEMENTED;
  511.   case GrepReq::STOP_REQUESTOR:
  512.     disable();
  513.     return GrepError::NO_ERROR;
  514.     
  515.   case GrepReq::STOP_TRANSFER:
  516.     disableTransfer();
  517.     return GrepError::NO_ERROR;
  518.   case GrepReq::STOP_APPLY:
  519.     disableApply();
  520.     return GrepError::NO_ERROR;
  521.   case GrepReq::STOP_DELETE:
  522.     disableDelete();
  523.     return GrepError::NO_ERROR;
  524.   default:
  525.     ndbout_c("RepCommandInterpreter: Illegal request received");
  526.     return GrepError::NOT_YET_IMPLEMENTED;
  527.   }
  528.   return GrepError::NOT_YET_IMPLEMENTED;
  529. }
  530. /****************************************************************************
  531.  * 
  532.  ****************************************************************************/
  533. /*
  534. GrepError::Code
  535. RepState::slowStop() 
  536. {
  537.   switch(m_channel.getState()) 
  538.   {
  539.   case Channel::LOG:
  540.     m_channel.setState(Channel::LOG_SLOW_STOP);
  541.     return GrepError::NO_ERROR;
  542.   default:
  543.     return GrepError::REQUESTOR_ILLEGAL_STATE_FOR_SLOWSTOP;
  544.   }
  545. }
  546. GrepError::Code
  547. RepState::fastStop() 
  548. {
  549.   switch(m_channel.getState()) 
  550.   {
  551.   case Channel::LOG:
  552.     m_channel.setState(Channel::LOG_FAST_STOP);
  553.     return GrepError::NO_ERROR;
  554.   default:
  555.     return GrepError::REQUESTOR_ILLEGAL_STATE_FOR_FASTSTOP;
  556.   }
  557. }
  558. */
  559. /****************************************************************************
  560.  * Print Status
  561.  ****************************************************************************/
  562. static const char* 
  563. headerText =
  564. "+-------------------------------------------------------------------------+n"
  565. "|                         MySQL Replication Server                        |n"
  566. "+-------------------------------------------------------------------------+n"
  567. ;
  568. static const char* 
  569. channelHeaderText =
  570. "+-------------------------------------------------------------------------+n"
  571. "|                   Applier Channel 1 Replication Status                  |n"
  572. "+-------------------------------------------------------------------------+n"
  573. ;
  574. static const char* 
  575. line =
  576. "+-------------------------------------------------------------------------+n"
  577. ;
  578. Properties *
  579. RepState::getStatus() 
  580. {
  581.   Properties * prop = new Properties();
  582.   if(prop == NULL)
  583.     return NULL;
  584.   NdbMutex_Lock(m_mutex);
  585.   prop->put("nodegroups", (int)m_channel.getNoOfNodeGroups());
  586. //  prop->put("epoch_state", m_channel.getEpochState());
  587.   NdbMutex_Unlock(m_mutex);
  588.   return prop;
  589. }
  590. Properties * RepState::query(QueryCounter counter, Uint32 replicationId) 
  591. {
  592.   Properties * prop = new Properties();
  593.   if(prop == NULL)
  594.     return NULL;
  595.   NdbMutex_Lock(m_mutex);
  596.   if(counter != ~(Uint32)0)
  597.     getEpochState((Channel::Position)counter, prop );
  598.   prop->put("no_of_nodegroups", m_channel.getNoOfNodeGroups());
  599.   prop->put("subid", m_channel.getNoOfNodeGroups());
  600.   prop->put("subkey", m_channel.getSubKey());
  601.   prop->put("connected_db", m_connected);
  602.   prop->put("connected_rep", m_repConnected);
  603.   prop->put("state_sub", (int)m_channel.getStateSub());
  604.   prop->put("state", (int)m_channel.getState());    
  605.   
  606.   NdbMutex_Unlock(m_mutex);
  607.   return prop;
  608.   
  609. }
  610. void
  611. RepState::getEpochState(Channel::Position pos, Properties * p)
  612. {
  613.   char first_buf[20];
  614.   char last_buf[20];
  615.   int pos_first = 0, pos_last = 0;
  616.   Uint32 first = 0, last = 0;
  617.   for(Uint32 i = 0; i < m_channel.getNoOfNodeGroups() ; i++) 
  618.   {    
  619.     m_channel.getEpochState(pos, i, &first, &last);
  620.     pos_first += sprintf(first_buf+pos_first,"%d%s",first,",");
  621.     pos_last  += sprintf(last_buf+pos_last,"%d%s",last,",");    
  622.   }
  623. /**
  624.  * remove trailing comma
  625.  */
  626.   pos_first--;
  627.   pos_last--;
  628.   first_buf[pos_first]= '';
  629.   last_buf[pos_last]= '';
  630. #if 0
  631.   sprintf(first_buf+pos_first,"","");
  632.   sprintf(last_buf + pos_last,"","");    
  633. #endif
  634.   p->put("first", first_buf);
  635.   p->put("last", last_buf);
  636.   
  637. }
  638. void
  639. RepState::printStatus() 
  640. {
  641.   /***************************************************************************
  642.    * Global Status
  643.    ***************************************************************************/
  644.   ndbout << headerText;
  645.   switch (m_connected)
  646.   {
  647.   case CONNECTED: 
  648.     ndbout << "| Source:        Connected    "; break;
  649.   case DISCONNECTED:
  650.     ndbout << "| Source:        Disconnected "; break;
  651.   case CONNECTABLE:
  652.     ndbout << "| Source:        Disconnected "; break;
  653.   default:
  654.     ndbout << "| Source:        Unknown      "; break;
  655.   }
  656.   switch (m_repConnected)
  657.   {
  658.   case CONNECTED:
  659.     ndbout << "(Rep: Connected)       "; break;
  660.   case DISCONNECTED:
  661.     ndbout << "(Rep: Disconnected)    "; break;
  662.   case CONNECTABLE:
  663.     ndbout << "(Rep: Disconnected)    "; break;
  664.   default:
  665.     ndbout << "(Rep: Unknown)         "; break;
  666.   }
  667.   ndbout << "                     |" << endl;
  668.   ndbout << "| Autostart:     " << (isAutoStartEnabled() ? "On " : "Off")
  669.  << "                          ";
  670.   ndbout_c("  Silence time:  %10u |", m_connected_counter);
  671.   /***************************************************************************
  672.    * Channel Status
  673.    ***************************************************************************/
  674.   ndbout << channelHeaderText;
  675.   switch(m_channel.getStateSub()) {
  676.   case Channel::NO_SUBSCRIPTION_EXISTS:
  677.     ndbout_c("| Subscription:  Non-existing                      "
  678.      "                       |");
  679.     break;
  680.   case Channel::CREATING_SUBSCRIPTION_ID:
  681.     ndbout_c("| Subscription:  Non-existing (Id is being created)"
  682.      "                       |");
  683.     break;
  684.   case Channel::SUBSCRIPTION_ID_CREATED:
  685.     ndbout_c("| Subscription:  %-3d-%-6d in state: Not yet started     "
  686.      "                |", 
  687.      m_channel.getSubId(), m_channel.getSubKey());
  688.     break;
  689.   case Channel::STARTING_SUBSCRIPTION:
  690.     ndbout_c("| Subscription:  %-3d-%-6d in state: Being started       "
  691.      "                |", 
  692.      m_channel.getSubId(), m_channel.getSubKey());
  693.     break;
  694.   case Channel::SUBSCRIPTION_STARTED:
  695.     ndbout_c("| Subscription:  %-3d-%-6d in state: Started             "
  696.      "                |", 
  697.      m_channel.getSubId(), m_channel.getSubKey());
  698.     break;
  699.   default:
  700.     REPABORT("Illegal subscription state");
  701.   }
  702.   ndbout << "| Stop epoch:    ";
  703.   if (m_channel.getStopEpochId() == intervalMax) {
  704.     ndbout << "No stop defined                             "; 
  705.   } else {
  706.     ndbout.print("%-10d                                  ",
  707.  m_channel.getStopEpochId());
  708.   }
  709.   ndbout << "             |" << endl;
  710.   
  711.   ndbout << "| State:         ";
  712.   switch(m_channel.getState()) 
  713.   {
  714.   case Channel::CONSISTENT:     
  715.     ndbout << "Local database is subscription consistent   "; 
  716.     break;
  717.   case Channel::METALOG_STARTING:
  718.     ndbout << "Starting (Phase 1: Metalog starting)        "; 
  719.     break;
  720.   case Channel::METALOG_STARTED:
  721.     ndbout << "Starting (Phase 2: Metalog started)         "; 
  722.     break;
  723.   case Channel::METASCAN_STARTING:
  724.     ndbout << "Starting (Phase 3: Metascan starting)       "; 
  725.     break;
  726.   case Channel::METASCAN_COMPLETED:
  727.     ndbout << "Starting (Phase 4: Metascan completed)      "; 
  728.     break;
  729.   case Channel::DATALOG_STARTING:
  730.     ndbout << "Starting (Phase 5: Datalog starting)        "; 
  731.     break;
  732.   case Channel::DATALOG_STARTED:
  733.     ndbout << "Starting (Phase 6: Datalog started)         "; 
  734.     break;
  735.   case Channel::DATASCAN_STARTING:
  736.     ndbout << "Starting (Phase 7: Datascan completed)      "; 
  737.     break;
  738.   case Channel::DATASCAN_COMPLETED:
  739.     ndbout << "Starting (Phase 8: Datascan completed)      "; 
  740.     break;
  741.   case Channel::LOG:            
  742.     ndbout << "Logging                                     "; 
  743.     break;
  744.   case Channel::STOPPING:
  745.     ndbout << "Stopping (Stopped when all epochs applied)  ";
  746.     break;
  747.   }
  748.   ndbout << "             |" << endl;
  749. /* @todo
  750.   ndbout_c("| Syncable:      Yes/Scan/No/Unknown (Not implemented)"
  751.    "                    |");
  752. */
  753.   ndbout << "| Requestor:     " << (isEnabled() ? "On " : "Off") 
  754.  << " (Transfer: " << (isTransferEnabled() ? "On,  " : "Off, ")
  755.  << "Apply: " << (isApplyEnabled() ? "On,  " : "Off, ")
  756.  << "Delete: " << (isDeleteEnabled() ? "On) " : "Off)")
  757.  << "             |" << endl;
  758.   ndbout_c("| Tables being replicated using this channel:         "
  759.    "                    |");
  760.   m_channel.printTables();
  761.   /**
  762.    * Print node groups
  763.    */
  764.   if (getNoOfNodeGroups() == 0) 
  765.   {
  766.     ndbout_c("| No node groups are known.           "
  767.      "                                    |");
  768.   } 
  769.   else 
  770.   {
  771.     m_channel.print();
  772.   }
  773.   ndbout << line;
  774. }