Channel.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:12k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "Channel.hpp"
  14. Channel::Channel() 
  15. {
  16.   reset();
  17. }
  18. Channel::~Channel() 
  19. {
  20.   /**
  21.    * Destroy list of selected tables
  22.    */
  23.   for(Uint32 i=0; i < m_selectedTables.size(); i++) {
  24.     delete m_selectedTables[i];
  25.     m_selectedTables[i] = 0;
  26.   }
  27.   m_selectedTables=0;
  28. }
  29. void
  30. Channel::reset() 
  31. {
  32.   for (Uint32 i=0; i<MAX_NO_OF_NODE_GROUPS; i++) {
  33.     for (Uint32 j=0; j<NO_OF_POSITIONS; j++) {
  34.       state[i][j].set(1,0);
  35.     }
  36.   }
  37.   m_noOfNodeGroups = 0;
  38.   m_requestorEnabled = true;
  39.   m_transferEnabled = true;
  40.   m_applyEnabled = true;
  41.   m_deleteEnabled = true;
  42.   m_autoStartEnabled = false;
  43.   m_stopEpochId = intervalMax;
  44.   setSubKey(0);
  45.   setSubId(0);
  46.   m_stateSub = NO_SUBSCRIPTION_EXISTS;
  47.   m_stateRep = CONSISTENT;
  48.   m_metaScanEpochs = emptyInterval;
  49.   m_dataScanEpochs = emptyInterval;
  50. }
  51. bool 
  52. Channel::requestTransfer(Uint32 nodeGrp, Interval * i)
  53. {
  54.   invariant();
  55.   Interval tmp1, tmp2;
  56.   // i = PS - SSReq - SS - App
  57.   intervalLeftMinus(state[nodeGrp][PS], state[nodeGrp][SSReq], &tmp1);
  58.   intervalLeftMinus(tmp1, state[nodeGrp][SS], &tmp2);
  59.   intervalLeftMinus(tmp2, state[nodeGrp][App], i);
  60.   
  61.   i->onlyLeft(GREP_SYSTEM_TABLE_MAX_RANGE);
  62.   i->onlyUpToValue(m_stopEpochId);
  63.   if (i->isEmpty()) return false;
  64.   add(SSReq, nodeGrp, *i);
  65.   invariant();
  66.   return true;
  67. }
  68. bool 
  69. Channel::requestApply(Uint32 nodeGrp, Uint32 * epoch)
  70. {
  71.   invariant();
  72.   Interval tmp1, tmp2;
  73.   // tmp2 = SS - AppReq - App
  74.   intervalLeftMinus(state[nodeGrp][SS], state[nodeGrp][AppReq], &tmp1);
  75.   intervalLeftMinus(tmp1, state[nodeGrp][App], &tmp2);
  76.   tmp2.onlyUpToValue(m_stopEpochId);
  77.   if (tmp2.isEmpty()) return false;
  78.   tmp2.onlyLeft(1);
  79.   // Check that all GCI Buffers for epoch exists in SS
  80.   for (Uint32 i=0; i<m_noOfNodeGroups; i++) {
  81.     if (!state[nodeGrp][SS].inInterval(tmp2.first()))
  82.       return false;
  83.   }
  84.   invariant();
  85.   add(AppReq, nodeGrp, tmp2);
  86.   invariant();
  87.   *epoch = tmp2.first();
  88.   return true;
  89. }
  90. bool 
  91. Channel::requestDelete(Uint32 nodeGrp, Interval * i)
  92. {
  93.   invariant();
  94.   Interval tmp1;
  95.   // i = (App cut PS) - DelReq
  96.   intervalCut(state[nodeGrp][App], state[nodeGrp][PS], &tmp1);
  97.   intervalLeftMinus(tmp1, state[nodeGrp][DelReq], i);
  98.   
  99.   if (i->isEmpty()) return false;
  100.   i->onlyLeft(GREP_SYSTEM_TABLE_MAX_RANGE);
  101.   invariant();
  102.   add(DelReq, nodeGrp, *i);
  103.   invariant();
  104.   return true;
  105. }
  106. void 
  107. Channel::add(Position pos, Uint32 nodeGrp, const Interval i) 
  108. {
  109.   Interval r;
  110.   intervalAdd(state[nodeGrp][pos], i, &r);
  111.   state[nodeGrp][pos].set(r);
  112. }
  113. void
  114. Channel::clear(Position p, Uint32 nodeGrp, const Interval i)
  115. {
  116.   Interval r;
  117.   intervalLeftMinus(state[nodeGrp][p], i, &r);
  118.   state[nodeGrp][p].set(r);
  119. }
  120. bool
  121. Channel::isSynchable(Uint32 nodeGrp)
  122. {
  123.   return true;
  124.   /*
  125.     @todo This should be implemented...
  126.     Interval tmp1, tmp2;
  127.     intervalAdd(state[nodeGrp][PS], state[nodeGrp][SSReq], &tmp1);
  128.     intervalAdd(tmp1, state[nodeGrp][SSReq], &tmp2);
  129.     intervalAdd(tmp2, state[nodeGrp][SS], &tmp1);
  130.     intervalAdd(tmp1, state[nodeGrp][AppReq], &tmp2);
  131.     intervalAdd(tmp2, state[nodeGrp][App], &tmp1);
  132.     if (intervalInclude(state[nodeGrp][PS], tmp1.right()))
  133.     return true;
  134.     else 
  135.     return false;
  136.   */
  137. }
  138. /**
  139.  * Return the cut of all App:s.
  140.  */
  141. void
  142. Channel::getFullyAppliedEpochs(Interval * interval)
  143. {
  144.   if (m_noOfNodeGroups < 1) {
  145.     *interval = emptyInterval;
  146.     return;
  147.   }
  148.   *interval = universeInterval;
  149.   for (Uint32 i=0; i<m_noOfNodeGroups; i++) {
  150.      if (state[i][App].isEmpty()) {
  151.        *interval = emptyInterval;
  152.       return;
  153.     }
  154.     
  155.      if (interval->first() < state[i][App].first()) {
  156.        interval->setFirst(state[i][App].first());
  157.      }
  158.      if (state[i][App].last() < interval->last()) {
  159.        interval->setLast(state[i][App].last());
  160.      }
  161.   }
  162.   interval->normalize();
  163.   return;
  164. }
  165. /**
  166.  * Return true if it is ok to remove the subscription and then stop channel
  167.  */
  168. bool 
  169. Channel::isStoppable()
  170. {
  171.   /**
  172.    * Check that AppReq are empty for all nodegrps
  173.    */
  174.   for (Uint32 i=0; i<m_noOfNodeGroups; i++) {
  175.     if (!state[i][AppReq].isEmpty()) {
  176.       RLOG(("Stop disallowed. AppReq is non-empty"));
  177.       return false;
  178.     }
  179.   }
  180.   /**
  181.    * If stop immediately, then it is ok to stop now
  182.    */
  183.   if (m_stopEpochId == 0) {
  184.     RLOG(("Stop allowed. AppReq empty and immediate stop requested"));
  185.     return true;
  186.   }
  187.   /**
  188.    * If stop on a certain epoch, then 
  189.    * check that stopEpochId is equal to the last applied epoch
  190.    */
  191.   Interval interval;
  192.   getFullyAppliedEpochs(&interval);
  193.   if (m_stopEpochId > interval.last()) {
  194.     RLOG(("Stop disallowed. AppReq empty. Stop %d, LastApplied %d",
  195.   m_stopEpochId, interval.last()));
  196.     return false;
  197.   }
  198.   return true;
  199. }
  200. GrepError::Code
  201. Channel::setStopEpochId(Uint32 n) 
  202. {
  203.   /**
  204.    * If n equal to zero, use next possible epoch (max(App, AppReq))
  205.    */
  206.   if (n == 0) {
  207.     for (Uint32 i=0; i<m_noOfNodeGroups; i++) {
  208.       n = (state[i][App].last() > n) ? state[i][App].last() : n;
  209.       n = (state[i][AppReq].last() > n) ? state[i][AppReq].last() : n;
  210.     }
  211.   }
  212.   /**
  213.    *  If n >= max(App, AppReq) then set value, else return error code
  214.    */
  215.   for (Uint32 i=0; i<m_noOfNodeGroups; i++) {
  216.     if (n < state[i][App].last()) return GrepError::ILLEGAL_STOP_EPOCH_ID;
  217.     if (n < state[i][AppReq].last()) return GrepError::ILLEGAL_STOP_EPOCH_ID;
  218.   }
  219.   m_stopEpochId = n;
  220.   return GrepError::NO_ERROR;
  221. };
  222. bool 
  223. Channel::shouldStop() 
  224. {
  225.   /**
  226.    * If (m_stopEpochId == App) then channel should stop
  227.    */
  228.   for (Uint32 i=0; i<m_noOfNodeGroups; i++) {
  229.     if(m_stopEpochId != state[i][App].last()) return false;
  230.   }
  231.   return true;
  232. }
  233. /*****************************************************************************
  234.  * SELECTIVE TABLE INTERFACE
  235.  *****************************************************************************/
  236. GrepError::Code  
  237. Channel::addTable(const char * tableName)
  238. {
  239.   if(strlen(tableName)>MAX_TAB_NAME_SIZE)
  240.     return GrepError::REP_NOT_PROPER_TABLE;
  241.   /**
  242.    * No of separators are the number of table_name_separator found in tableName
  243.    * since a table is defined as <db>/<schema>/tablename.
  244.    * if noOfSeparators is not equal to 2, then it is not a valid
  245.    * table name.
  246.    */
  247.   Uint32 noOfSeps = 0;
  248.   if(strlen(tableName) < 5)
  249.     return GrepError::REP_NOT_PROPER_TABLE;
  250.   for(Uint32 i =0; i < strlen(tableName); i++)
  251.     if(tableName[i]==table_name_separator)
  252.       noOfSeps++;
  253.   if(noOfSeps!=2)
  254.     return GrepError::REP_NOT_PROPER_TABLE;
  255.   table * t= new table(tableName);
  256.   for(Uint32 i=0; i<m_selectedTables.size(); i++) {
  257.     if(strcmp(tableName, m_selectedTables[i]->tableName)==0)
  258.       return GrepError::REP_TABLE_ALREADY_SELECTED;
  259.   }
  260.   m_selectedTables.push_back(t);
  261.   return GrepError::NO_ERROR;
  262. }
  263. GrepError::Code  
  264. Channel::removeTable(const char * tableName)
  265. {
  266.   if(strlen(tableName)>MAX_TAB_NAME_SIZE)
  267.     return GrepError::REP_NOT_PROPER_TABLE;
  268.   /**
  269.    * No of separators are the number of table_name_separator found in tableName
  270.    * since a table is defined as <db>/<schema>/tablename.
  271.    * If noOfSeparators is not equal to 2, 
  272.    * then it is not a valid table name.
  273.    */
  274.   Uint32 noOfSeps = 0;
  275.   if(strlen(tableName) < 5)
  276.     return GrepError::REP_NOT_PROPER_TABLE;
  277.   for(Uint32 i =0; i < strlen(tableName); i++)
  278.     if(tableName[i]==table_name_separator)
  279.       noOfSeps++;
  280.   if(noOfSeps!=2)
  281.     return GrepError::REP_NOT_PROPER_TABLE;
  282.   for(Uint32 i=0; i<m_selectedTables.size(); i++) {
  283.     if(strcmp(tableName, m_selectedTables[i]->tableName)==0) {
  284.       delete m_selectedTables[i];
  285.       m_selectedTables.erase(i);
  286.       return GrepError::NO_ERROR;
  287.     }
  288.   }
  289.   return GrepError::REP_TABLE_NOT_FOUND;
  290. }
  291. void 
  292. Channel::printTables() 
  293. {
  294.   if(m_selectedTables.size() == 0)
  295.     ndbout_c("|   ALL TABLES                                        "
  296.      "                    |");
  297.   else {
  298.     for(Uint32 i=0; i<m_selectedTables.size(); i++)
  299.       ndbout_c("|   %-69s |", m_selectedTables[i]->tableName);
  300.   }
  301. }
  302. Vector<struct table *> *  
  303. Channel::getSelectedTables() 
  304. {
  305.   if(m_selectedTables.size() == 0) return 0;
  306.   return &m_selectedTables;
  307. }
  308. /*****************************************************************************
  309.  * PRINT
  310.  *****************************************************************************/
  311. void 
  312. Channel::print(Position pos) 
  313. {
  314.   switch(pos){
  315.   case PS:      ndbout << "PS Rep"; break;
  316.   case SSReq:   ndbout << "Tra-Req"; break;
  317.   case SS:      ndbout << "SS Rep"; break;
  318.   case AppReq:  ndbout << "App-Req"; break;
  319.   case App:     ndbout << "Applied"; break;
  320.   case DelReq:  ndbout << "Del-Req"; break;
  321.   default:      REPABORT("Unknown replication position");
  322.   }
  323. }
  324. void 
  325. Channel::print() 
  326. {
  327.   for (Uint32 i=0; i<m_noOfNodeGroups; i++) {
  328.     print(i); 
  329.   }
  330. }
  331. void 
  332. Channel::print(Position pos, Uint32 nodeGrp) 
  333. {
  334.   print(pos); 
  335.   if (state[nodeGrp][pos].first() == 1 && state[nodeGrp][pos].last() == 0) {
  336.     ndbout << " EMPTY";
  337.   } else {
  338.     ndbout << " [" << state[nodeGrp][pos].first() << "-" 
  339.    << state[nodeGrp][pos].last() << "]";
  340.   }
  341. }
  342. static const char* 
  343. channelline =
  344. "+-------------------------------------------------------------------------+n"
  345. ;
  346. void
  347. Channel::getEpochState(Position p, 
  348.        Uint32 nodeGrp, 
  349.        Uint32 * first,
  350.        Uint32 * last) {
  351.   if(state[nodeGrp][p].isEmpty()) {
  352.     *first = 1;
  353.     *last  = 0;
  354.     return;
  355.   }  
  356.   *first = state[nodeGrp][p].first();
  357.   *last  = state[nodeGrp][p].last();
  358. }
  359. void 
  360. Channel::print(Uint32 nodeGrp) 
  361. {
  362.   ndbout << channelline;
  363.   ndbout_c("|                        |       Meta scan       |"
  364.    "        Data scan       |");
  365.   ndbout.print("|                        ");
  366.   if (m_metaScanEpochs.isEmpty()) {
  367.     ndbout.print("|                       ");
  368.   } else {
  369.     ndbout.print("| %10u-%-10u ",
  370.  m_metaScanEpochs.first(), m_metaScanEpochs.last());
  371.   }
  372.   if (m_dataScanEpochs.isEmpty()) {
  373.     ndbout_c("|                        |");
  374.   } else {
  375.     ndbout_c("|  %10u-%-10u |",
  376.      m_dataScanEpochs.first(), m_dataScanEpochs.last());
  377.   }
  378.   /* --- */
  379.   ndbout << channelline;
  380.   ndbout_c("|    Source Rep Server   |    Being Transfered   |"
  381.    " Destination Rep Server |");
  382.   if (state[nodeGrp][PS].isEmpty()) {
  383.     ndbout.print("|                        ");
  384.   } else {
  385.     ndbout.print("|  %10u-%-10u ",
  386.  state[nodeGrp][PS].first(), state[nodeGrp][PS].last());
  387.   }
  388.   if (state[nodeGrp][SSReq].isEmpty()) {
  389.     ndbout.print("|                       ");
  390.   } else {
  391.     ndbout.print("| %10u-%-10u ",
  392.  state[nodeGrp][SSReq].first(), state[nodeGrp][SSReq].last());
  393.   }
  394.   if (state[nodeGrp][SS].isEmpty()) {
  395.     ndbout_c("|                        |");
  396.   } else {
  397.     ndbout_c("|  %10u-%-10u |",
  398.      state[nodeGrp][SS].first(), state[nodeGrp][SS].last());
  399.   }
  400.   /* --- */
  401.   ndbout << channelline;
  402.   ndbout_c("|      Being Applied     |        Applied        |"
  403.    "      Being Deleted     |");
  404.   if (state[nodeGrp][AppReq].isEmpty()) {
  405.     ndbout.print("|                        ");
  406.   } else {
  407.     ndbout.print("|  %10u-%-10u ", state[nodeGrp][AppReq].first(), 
  408.  state[nodeGrp][AppReq].last());
  409.   }
  410.   if (state[nodeGrp][App].isEmpty()) {
  411.     ndbout.print("|                       ");
  412.   } else {
  413.     ndbout.print("| %10u-%-10u ",
  414.  state[nodeGrp][App].first(), state[nodeGrp][App].last());
  415.   }
  416.   if (state[nodeGrp][DelReq].isEmpty()) {
  417.     ndbout_c("|                        |");
  418.   } else {
  419.     ndbout_c("|  %10u-%-10u |",
  420.      state[nodeGrp][DelReq].first(), state[nodeGrp][DelReq].last());
  421.   }
  422. }
  423. /*****************************************************************************
  424.  * Private Methods
  425.  *****************************************************************************/
  426. void
  427. Channel::invariant() 
  428. {
  429.   for (Uint32 j=0; j<MAX_NO_OF_NODE_GROUPS; j++) 
  430.   {
  431.     if (!intervalDisjoint(state[j][SSReq], state[j][SS]))
  432.       REPABORT("Invariant 1 violated");
  433.     if (!intervalDisjoint(state[j][AppReq], state[j][App]))
  434.       REPABORT("Invariant 2 violated");
  435.   }
  436. }