AppNDB.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:15k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "AppNDB.hpp"
  14. #include <ConfigRetriever.hpp>
  15. #include <AttributeHeader.hpp>
  16. #include <NdbOperation.hpp>
  17. #include <NdbDictionaryImpl.hpp>
  18. #include <signaldata/RepImpl.hpp>
  19. #include <TransporterFacade.hpp>
  20. #include <trigger_definitions.h>
  21. #include <rep/storage/GCIPage.hpp>
  22. #include <rep/storage/GCIBuffer.hpp>
  23. #include <rep/rep_version.hpp>
  24. /*****************************************************************************
  25.  * Constructor / Destructor / Init
  26.  *****************************************************************************/
  27. AppNDB::~AppNDB() 
  28. {
  29.   delete m_tableInfoPs;
  30.   delete m_ndb;
  31.   m_tableInfoPs = 0;
  32. }
  33. AppNDB::AppNDB(GCIContainer * gciContainer, RepState * repState)
  34. {
  35.   m_gciContainer = gciContainer;
  36.   m_repState = repState;
  37.   m_cond = NdbCondition_Create();
  38.   m_started = true;
  39. }
  40. void 
  41. AppNDB::init(const char* connectString) {
  42.   //  NdbThread_SetConcurrencyLevel(1+ 2);
  43.   m_ndb = new Ndb("");
  44.   m_ndb->useFullyQualifiedNames(false);
  45.   m_ndb->setConnectString(connectString);
  46.   /**
  47.    * @todo  Set proper max no of transactions?? needed?? Default 12??
  48.    */
  49.   m_ndb->init(2048);
  50.   m_dict = m_ndb->getDictionary();
  51.   m_ownNodeId = m_ndb->getNodeId();
  52.   ndbout << "-- NDB Cluster -- REP node " << m_ownNodeId << " -- Version " 
  53.  << REP_VERSION_ID << " --" << endl;
  54.   ndbout_c("Connecting to NDB Cluster...");
  55.   if (m_ndb->waitUntilReady() != 0){
  56.     REPABORT("NDB Cluster not ready for connections");
  57.   }
  58.   ndbout_c("Phase 1 (AppNDB): Connection 1 to NDB Cluster opened (Applier)");
  59.   
  60.   m_tableInfoPs = new TableInfoPs();
  61.   m_applierThread = NdbThread_Create(runAppNDB_C,
  62.      (void**)this,
  63.      32768,
  64.      "AppNDBThread",
  65.      NDB_THREAD_PRIO_LOW);  
  66. }
  67. /*****************************************************************************
  68.  * Threads
  69.  *****************************************************************************/
  70. extern "C" 
  71. void* 
  72. runAppNDB_C(void * me)
  73. {
  74.   ((AppNDB *) me)->threadMainAppNDB();
  75.   NdbThread_Exit(0);
  76.   return me;
  77. }
  78. void 
  79. AppNDB::threadMainAppNDB() {
  80.   MetaRecord * mr;
  81.   LogRecord * lr;
  82.   GCIBuffer::iterator * itBuffer;
  83.   GCIPage::iterator * itPage;
  84.   GCIBuffer * buffer;
  85.   GCIPage * page;
  86.   Uint32 gci=0;
  87.   bool force;
  88.   while(true){
  89.     m_gciBufferList.lock();
  90.     if(m_gciBufferList.size()==0)
  91.       NdbCondition_Wait(m_cond, m_gciBufferList.getMutex());
  92.     m_gciBufferList.unlock();
  93.     
  94.     /**
  95.      * Do nothing if we are not started!
  96.      */
  97.     if(!m_started)
  98.       continue;
  99.     
  100.     if(m_gciBufferList.size()>0) {
  101.       m_gciBufferList.lock();
  102.       buffer = m_gciBufferList[0];
  103.       assert(buffer!=0);
  104.       if(buffer==0) {
  105. m_gciBufferList.unlock();      
  106. // stopApplier(GrepError::REP_APPLY_NULL_GCIBUFFER);
  107. return;
  108.       }
  109.       m_gciBufferList.unlock();      
  110.       RLOG(("Applying %d:[%d]", buffer->getId(), buffer->getGCI()));
  111.       gci = buffer->getGCI();
  112.       /**
  113.        * Do stuff with buffer
  114.        */
  115.      
  116.       force = buffer->m_force;
  117.       itBuffer = new GCIBuffer::iterator(buffer);
  118.       page = itBuffer->first();
  119.       
  120.       Record * record;     
  121.       while(page!=0 && m_started) {
  122.   
  123. itPage = new GCIPage::iterator(page);
  124. record = itPage->first();
  125. while(record!=0 && m_started) {
  126.   switch(Record::RecordType(record->recordType)) {
  127.   case Record::META:
  128.     mr  = (MetaRecord*)record;
  129.     if(applyMetaRecord(mr, gci) < 0){
  130.       /**
  131.        * If we fail with a meta record then 
  132.        * we should fail the replication!
  133.          */
  134.       //stopApplier(GrepError::REP_APPLY_METARECORD_FAILED);
  135.     }
  136.   break;
  137.   case Record::LOG:
  138.     lr  = (LogRecord*)record;
  139.     if(applyLogRecord(lr, force, gci) < 0) {
  140.       /**
  141.        * If we fail to apply a log record AND
  142.        * we have sent a ref to repstate event,
  143.        * then we should not try to apply another one!
  144.        */
  145. //       stopApplier(GrepError::REP_APPLY_LOGRECORD_FAILED);
  146.     }
  147.     break;
  148.   default:
  149.     REPABORT("Illegal record type");
  150.   };
  151.   record = itPage->next();
  152. }
  153. delete itPage;
  154. itPage = 0;
  155. page = itBuffer->next();
  156.       }
  157.       m_gciBufferList.erase(0, true);
  158.       /**
  159.        * "callback" to RepState to send REP_INSERT_GCIBUFFER_CONF
  160.        */
  161.       m_repState->eventInsertConf(buffer->getGCI(), buffer->getId());
  162.       delete itBuffer;
  163.       itBuffer = 0;
  164.       mr = 0;
  165.       lr = 0;
  166.       page = 0;
  167.       buffer = 0;
  168.     }
  169.   }
  170.  
  171. }
  172. void AppNDB::startApplier(){
  173.   m_started = true;
  174. }
  175. void AppNDB::stopApplier(GrepError::Code err){
  176.   m_started = false;
  177.   m_repState->eventInsertRef(0,0,0, err);
  178. }
  179. GrepError::Code
  180. AppNDB::applyBuffer(Uint32 nodeGrp, Uint32 epoch, Uint32 force)
  181. {
  182.   m_gciBufferList.lock();
  183.   GCIBuffer * buffer = m_gciContainer->getGCIBuffer(epoch, nodeGrp);
  184.   if (buffer == NULL) {
  185.     RLOG(("WARNING! Request to apply NULL buffer %d[%d]. Force %d", 
  186.   nodeGrp, epoch, force));
  187.     return GrepError::NO_ERROR;
  188.   }
  189.   if (!buffer->isComplete()) {
  190.     RLOG(("WARNING! Request to apply non-complete buffer %d[%d]. Force %d",
  191.   nodeGrp, epoch, force));
  192.     return GrepError::REP_APPLY_NONCOMPLETE_GCIBUFFER;
  193.   }
  194.   buffer->m_force = force;
  195.   assert(buffer!=0);
  196.   m_gciBufferList.push_back(buffer, false);
  197.   NdbCondition_Broadcast(m_cond);
  198.   m_gciBufferList.unlock();
  199.   return GrepError::NO_ERROR;
  200. }
  201. int
  202. AppNDB::applyLogRecord(LogRecord*  lr, bool force, Uint32 gci) 
  203. {
  204. #if 0
  205.   RLOG(("Applying log record (force %d, Op %d, GCI %d)", 
  206. force, lr->operation, gci));
  207. #endif
  208.   
  209.   int  retries =0;
  210.  retry:
  211.   if(retries == 10) {
  212.     m_repState->eventInsertRef(gci, 0, lr->tableId,     
  213.        GrepError::REP_APPLIER_EXECUTE_TRANSACTION);
  214.     return -1;
  215.   }
  216.   NdbConnection * trans = m_ndb->startTransaction();
  217.   if (trans == NULL) {
  218.     /**
  219.      * Transaction could not be started
  220.      * @todo Handle the error by:
  221.      *       1. Return error code
  222.      *       2. Print log message
  223.      *       3. On higher level indicate that DB has been tainted
  224.      */
  225.     ndbout_c("AppNDB: Send the following error msg to NDB Cluster support");
  226.     reportNdbError("Cannot start transaction!", trans->getNdbError());
  227.     m_repState->eventInsertRef(gci, 0, 0, 
  228.        GrepError::REP_APPLIER_START_TRANSACTION);
  229.     REPABORT("Can not start transaction");
  230.   }
  231.   
  232.   /**
  233.    * Resolve table name based on table id
  234.    */
  235.   const Uint32 tableId = lr->tableId;
  236.   const char * tableName = m_tableInfoPs->getTableName(tableId);
  237.   
  238.   /**
  239.    * Close trans and return if it is systab_0.
  240.    */
  241.   if (tableId == 0) {
  242.     RLOG(("WARNING! System table log record received"));
  243.     m_ndb->closeTransaction(trans);    
  244.     return -1;
  245.   }
  246.   
  247.   if (tableName==0) {
  248.     /**
  249.      * Table probably does not exist
  250.      * (Under normal operation this should not happen 
  251.      * since log records should not appear unless the 
  252.      * table has been created.)
  253.      *
  254.      * @todo Perhaps the table is not cached due to a restart,
  255.      *       so let's check in the dictionary if it exists.
  256.      */
  257.     m_ndb->closeTransaction(trans);
  258.     m_repState->eventInsertRef(gci, 0, tableId, 
  259.        GrepError::REP_APPLIER_NO_TABLE);
  260.     return -1;
  261.   }
  262.   
  263.   const NdbDictionary::Table * table  = m_dict->getTable(tableName);
  264.   
  265.   NdbOperation * op = trans->getNdbOperation(tableName);
  266.   if (op == NULL) {
  267.     ndbout_c("AppNDB: Send the following error msg to NDB Cluster support");
  268.     reportNdbError("Cannot get NdbOperation record",
  269.    trans->getNdbError());
  270.     m_repState->eventInsertRef(gci,0,tableId,
  271.        GrepError::REP_APPLIER_NO_OPERATION);
  272.     REPABORT("Can not get NdbOperation record");
  273.   }
  274.   
  275.   int check=0;
  276.   switch(lr->operation) {
  277.   case TriggerEvent::TE_INSERT: // INSERT
  278.     check = op->insertTuple();
  279.     break;
  280.   case TriggerEvent::TE_DELETE: // DELETE
  281.     check = op->deleteTuple();    
  282.     break;
  283.   case TriggerEvent::TE_UPDATE: // UPDATE
  284.     if (force) {
  285.       check = op->writeTuple();
  286.     } else {
  287.       check = op->updateTuple();
  288.     }
  289.     break;
  290.   case TriggerEvent::TE_CUSTOM: //SCAN
  291.     check = op->writeTuple();
  292.     break;
  293.   default:
  294.     m_ndb->closeTransaction(trans);
  295.     return -1;
  296.   };
  297.   if (check<0) {
  298.     ndbout_c("AppNDB: Something is weird");
  299.   }
  300.   
  301.   /**
  302.    * @todo index inside LogRecord struct somewhat prettier
  303.    * Now it 4 (sizeof(Uint32)), and 9 the position inside the struct 
  304.    * where the data starts.
  305.    */
  306.   AttributeHeader * ah=(AttributeHeader *)((char *)lr + sizeof(Uint32) * 9);
  307.   AttributeHeader *end = (AttributeHeader *)(ah + lr->attributeHeaderWSize); 
  308.   Uint32 * dataPtr = (Uint32 *)(end);
  309.   /**
  310.    *  @note attributeheader for operaration insert includes a duplicate
  311.    *  p.k.  The quick fix for this problem/bug is to skip the first set of 
  312.    *  of p.k, and start from the other set of P.Ks. Data is duplicated for
  313.    *  the p.k.
  314.    */
  315.   if (lr->operation == 0) {
  316.     for(int i = 0; i< table->getNoOfPrimaryKeys(); i++) {
  317.       ah+=ah->getHeaderSize();
  318.       dataPtr = dataPtr + ah->getDataSize();
  319.     }
  320.   }
  321.   while (ah < end) {
  322.     const NdbDictionary::Column * column = 
  323.       table->getColumn(ah->getAttributeId());
  324.     /**
  325.      * @todo: Here is a limitation. I don't care if it is a tuplekey 
  326.      * that is autogenerated or an ordinary pk. I just whack it in.
  327.      * However, this must be examined.
  328.      */
  329.     if(column->getPrimaryKey()) {
  330.       if(op->equal(ah->getAttributeId(), (const char *)dataPtr) < 0) {
  331. ndbout_c("AppNDB: Equal failed id %d op %d name %s, gci %d force %d", 
  332.  ah->getAttributeId(),
  333.  lr->operation,
  334.  column->getName(), gci, force);
  335. reportNdbError("Equal!", trans->getNdbError());
  336. }
  337.       
  338.     } else {
  339.       if(op->setValue(ah->getAttributeId(), (const char *)dataPtr) < 0)
  340.        ndbout_c("AppNDB: setvalue failed id %d op %d name %s, gci %d force %d",
  341. ah->getAttributeId(),
  342. lr->operation,
  343. column->getName(), gci, force);
  344.     }
  345.     
  346.     dataPtr = dataPtr + ah->getDataSize();
  347.     ah = ah + ah->getHeaderSize() ;
  348.   }
  349.   
  350.   if(trans->execute(Commit) != 0) {
  351.     /**
  352.      * Transaction commit failure
  353.      */
  354.       const NdbError err = trans->getNdbError();
  355.       m_ndb->closeTransaction(trans);      
  356.       switch(err.status){
  357.       case NdbError::Success:
  358. {
  359.   m_repState->eventInsertRef(gci, 0, tableId,     
  360.      GrepError::REP_APPLIER_EXECUTE_TRANSACTION);
  361.   return -1;
  362. }
  363.         break;
  364.       case NdbError::TemporaryError:      
  365. {
  366.   NdbSleep_MilliSleep(50);
  367.   retries++;
  368.   goto retry;
  369. }
  370. break;
  371.       case NdbError::UnknownResult:
  372. {
  373.   ndbout_c("AppNDB: Send the following error msg to NDB Cluster support");
  374.   reportNdbError("Execute transaction failed!",
  375.  trans->getNdbError());
  376.   m_repState->eventInsertRef(gci, 0, tableId,     
  377.      GrepError::REP_APPLIER_EXECUTE_TRANSACTION);
  378.   return -1;
  379. }
  380. break;
  381.       case NdbError::PermanentError: 
  382. {
  383.   if(err.code == 626) {
  384.     if(force && lr->operation == TriggerEvent::TE_DELETE) /**delete*/ {
  385.       /**tuple was not found. Ignore this, since 
  386.        * we are trying to apply a "delete a tuple"-log record before 
  387.        * having applied the scan data.
  388.        */
  389.       return -1;
  390.     }
  391.   }
  392.   ndbout_c("AppNDB: Send the following error msg to NDB Cluster support");      reportNdbError("Execute transaction failed!",
  393.  trans->getNdbError());
  394.   ndbout_c("nnAppNDB: RepNode will now crash.");
  395.   m_ndb->closeTransaction(trans);
  396.   m_repState->eventInsertRef(gci, 0, tableId,     
  397.      GrepError::REP_APPLIER_EXECUTE_TRANSACTION);
  398.   return -1;
  399. }
  400.       break;
  401.       }
  402.   }
  403.   /**
  404.    * No errors. Close transaction and continue in applierThread.
  405.    */
  406.   m_ndb->closeTransaction(trans); 
  407.   return 1;
  408. }
  409. int
  410. AppNDB::applyMetaRecord(MetaRecord*  mr, Uint32 gci) 
  411. {
  412.   /**
  413.    * Validate table id
  414.    */
  415.   Uint32 tableId = mr->tableId;
  416.   if (tableId==0) {
  417.     RLOG(("WARNING! Meta record contained record with tableId 0"));
  418.     return 0;
  419.   }
  420.   
  421.   /**
  422.    * Prepare meta record 
  423.    */
  424.   NdbDictionary::Table * table = prepareMetaRecord(mr);
  425.   if(table == 0) {
  426.     RLOG(("WARNING! Prepare table meta record failed for table %d", tableId));
  427.     m_dict->getNdbError();
  428.     m_repState->eventInsertRef(gci,0,tableId, 
  429.        GrepError::REP_APPLIER_PREPARE_TABLE);
  430.     return -1;
  431.   }
  432.   
  433.   /**
  434.    * Table does not exist in TableInfoPs -> add it
  435.    */
  436.   if(m_tableInfoPs->getTableName(tableId)==0) {
  437.     RLOG(("Table %d:%s added to m_tableInfoPs", tableId, table->getName()));
  438.     m_tableInfoPs->insert(tableId,table->getName());
  439.   }
  440.   
  441.   /**
  442.    * Validate that table does not exist in Dict
  443.    */
  444.   const NdbDictionary::Table * tmpTable = m_dict->getTable(table->getName());
  445.   if(tmpTable !=0) {
  446.     /**
  447.      * Oops, a table with the same name exists
  448.      */
  449.     if(tmpTable->getObjectVersion()!=table->getObjectVersion()) {
  450.       char buf[100];
  451.       sprintf(buf,"WARNING! Another version of table %d:%s already exists."
  452.       "Currently, we dont support versions, so will abort now!",
  453.        tableId, table->getName());
  454.     
  455.       REPABORT(buf);
  456.     }
  457.     RLOG(("WARNING! An identical table %d:%s already exists.", 
  458.   tableId, table->getName()));
  459.     return -1;
  460.   }
  461.   /**
  462.    * @todo WARNING! Should scan table MR for columns that are not supported
  463.    */
  464.   /*
  465.   NdbDictionary::Column * column;
  466.   
  467.   for(int i=0; i<table->getNoOfColumns(); i++) {
  468.     column = table->getColumn(i);
  469.     if(column->getAutoIncrement()) {
  470.       reportWarning(table->getName(), column->getName(),
  471.     "Uses AUTOINCREMENT of PK");   
  472.     }
  473.   }
  474.   */
  475.     
  476.   
  477.   /**
  478.    * Create table
  479.    */
  480.   if(m_dict->createTable(*table)<0) {
  481.     ndbout_c("AppNDB: Send the following error msg to NDB Cluster support");
  482.     reportNdbError("Create table failed!", m_dict->getNdbError());
  483.     m_repState->eventCreateTableRef(gci, 
  484.     tableId,
  485.     table->getName(),   
  486.     GrepError::REP_APPLIER_CREATE_TABLE);
  487.     return -1;
  488.   }
  489.   
  490.   RLOG(("Table %d:%s created", tableId, table->getName()));
  491.   return 0;
  492. }
  493. NdbDictionary::Table*
  494. AppNDB::prepareMetaRecord(MetaRecord* mr) {
  495.   NdbTableImpl * tmp = 0;
  496.   NdbDictionary::Table * table =0;
  497.   Uint32 * data =(Uint32*)( ((char*)mr + sizeof(Uint32)*6));
  498.   int res = NdbDictInterface::parseTableInfo(&tmp, data, mr->dataLen,
  499.      m_ndb->usingFullyQualifiedNames());
  500.   if(res == 0) {
  501.     table = tmp;
  502.     return table;
  503.   } else{
  504.     return 0;
  505.   }
  506. }
  507. void 
  508. AppNDB::reportNdbError(const char * msg, const NdbError & err) {
  509.   ndbout_c("%s : Error code %d , error message %s", 
  510.    msg, err.code,
  511.    (err.message ? err.message : ""));  
  512. }
  513. void 
  514. AppNDB::reportWarning(const char * tableName, const char * message) {
  515.   ndbout_c("WARNING: Table %s, %s", tableName, message); 
  516. }
  517. void 
  518. AppNDB::reportWarning(const char * tableName, const char * columnName,
  519.        const char * message) {
  520.   ndbout_c("WARNING: Table %s, column %s, %s", tableName, columnName,message);
  521. }
  522. int 
  523. AppNDB::dropTable(Uint32 tableId) 
  524. {
  525.   char * tableName = m_tableInfoPs->getTableName(tableId);
  526.   if(tableName == 0) return -1;
  527.   ndbout_c("AppNDB: Dropping table ");
  528.   if(m_dict->dropTable(tableName) != 0) {
  529.     reportNdbError("Failed dropping table",m_dict->getNdbError());
  530.     return -1;
  531.   }
  532.   m_tableInfoPs->del(tableId);
  533.   return 1;
  534. }