consumer_restore.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:16k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <NDBT_ReturnCodes.h>
  14. #include "consumer_restore.hpp"
  15. #include <NdbSleep.h>
  16. extern my_bool opt_core;
  17. extern FilteredNdbOut err;
  18. extern FilteredNdbOut info;
  19. extern FilteredNdbOut debug;
  20. static void callback(int, NdbConnection*, void*);
  21. bool
  22. BackupRestore::init()
  23. {
  24.   release();
  25.   if (!m_restore && !m_restore_meta)
  26.     return true;
  27.   m_ndb = new Ndb();
  28.   if (m_ndb == NULL)
  29.     return false;
  30.   
  31.   m_ndb->init(1024);
  32.   if (m_ndb->waitUntilReady(30) != 0)
  33.   {
  34.     err << "Failed to connect to ndb!!" << endl;
  35.     return false;
  36.   }
  37.   info << "Connected to ndb!!" << endl;
  38.   m_callback = new restore_callback_t[m_parallelism];
  39.   if (m_callback == 0)
  40.   {
  41.     err << "Failed to allocate callback structs" << endl;
  42.     return false;
  43.   }
  44.   m_free_callback= m_callback;
  45.   for (Uint32 i= 0; i < m_parallelism; i++) {
  46.     m_callback[i].restore= this;
  47.     m_callback[i].connection= 0;
  48.     if (i > 0)
  49.       m_callback[i-1].next= &(m_callback[i]);
  50.   }
  51.   m_callback[m_parallelism-1].next = 0;
  52.   return true;
  53. }
  54. void BackupRestore::release()
  55. {
  56.   if (m_ndb)
  57.   {
  58.     delete m_ndb;
  59.     m_ndb= 0;
  60.   }
  61.   if (m_callback)
  62.   {
  63.     delete [] m_callback;
  64.     m_callback= 0;
  65.   }
  66. }
  67. BackupRestore::~BackupRestore()
  68. {
  69.   release();
  70. }
  71. static
  72. int 
  73. match_blob(const char * name){
  74.   int cnt, id1, id2;
  75.   char buf[256];
  76.   if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){
  77.     return id1;
  78.   }
  79.   
  80.   return -1;
  81. }
  82. const NdbDictionary::Table*
  83. BackupRestore::get_table(const NdbDictionary::Table* tab){
  84.   if(m_cache.m_old_table == tab)
  85.     return m_cache.m_new_table;
  86.   m_cache.m_old_table = tab;
  87.   int cnt, id1, id2;
  88.   char db[256], schema[256];
  89.   if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d", 
  90.    db, schema, &id1, &id2)) == 4){
  91.     m_ndb->setDatabaseName(db);
  92.     m_ndb->setSchemaName(schema);
  93.     
  94.     BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d", 
  95.  m_new_tables[id1]->getTableId(), id2);
  96.     
  97.     m_cache.m_new_table = m_ndb->getDictionary()->getTable(db);
  98.     
  99.   } else {
  100.     m_cache.m_new_table = m_new_tables[tab->getTableId()];
  101.   }
  102.   assert(m_cache.m_new_table);
  103.   return m_cache.m_new_table;
  104. }
  105. bool
  106. BackupRestore::finalize_table(const TableS & table){
  107.   bool ret= true;
  108.   if (!m_restore && !m_restore_meta)
  109.     return ret;
  110.   if (table.have_auto_inc())
  111.   {
  112.     Uint64 max_val= table.get_max_auto_val();
  113.     Uint64 auto_val= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable));
  114.     if (max_val+1 > auto_val || auto_val == ~(Uint64)0)
  115.       ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false);
  116.   }
  117.   return ret;
  118. }
  119. bool
  120. BackupRestore::table(const TableS & table){
  121.   if (!m_restore && !m_restore_meta)
  122.     return true;
  123.   const char * name = table.getTableName();
  124.   
  125.   /**
  126.    * Ignore blob tables
  127.    */
  128.   if(match_blob(name) >= 0)
  129.     return true;
  130.   
  131.   const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable);
  132.   if(tmptab.m_indexType != NdbDictionary::Index::Undefined){
  133.     m_indexes.push_back(table.m_dictTable);
  134.     return true;
  135.   }
  136.   
  137.   BaseString tmp(name);
  138.   Vector<BaseString> split;
  139.   if(tmp.split(split, "/") != 3){
  140.     err << "Invalid table name format " << name << endl;
  141.     return false;
  142.   }
  143.   m_ndb->setDatabaseName(split[0].c_str());
  144.   m_ndb->setSchemaName(split[1].c_str());
  145.   
  146.   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
  147.   if(m_restore_meta){
  148.     NdbDictionary::Table copy(*table.m_dictTable);
  149.     copy.setName(split[2].c_str());
  150.     if (dict->createTable(copy) == -1) 
  151.     {
  152.       err << "Create table " << table.getTableName() << " failed: "
  153.   << dict->getNdbError() << endl;
  154.       return false;
  155.     }
  156.     info << "Successfully restored table " << table.getTableName()<< endl ;
  157.   }  
  158.   
  159.   const NdbDictionary::Table* tab = dict->getTable(split[2].c_str());
  160.   if(tab == 0){
  161.     err << "Unable to find table: " << split[2].c_str() << endl;
  162.     return false;
  163.   }
  164.   if(m_restore_meta){
  165.     m_ndb->setAutoIncrementValue(tab, ~(Uint64)0, false);
  166.   }
  167.   const NdbDictionary::Table* null = 0;
  168.   m_new_tables.fill(table.m_dictTable->getTableId(), null);
  169.   m_new_tables[table.m_dictTable->getTableId()] = tab;
  170.   return true;
  171. }
  172. bool
  173. BackupRestore::endOfTables(){
  174.   if(!m_restore_meta)
  175.     return true;
  176.   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
  177.   for(size_t i = 0; i<m_indexes.size(); i++){
  178.     const NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]);
  179.     BaseString tmp(indtab.m_primaryTable.c_str());
  180.     Vector<BaseString> split;
  181.     if(tmp.split(split, "/") != 3){
  182.       err << "Invalid table name format " << indtab.m_primaryTable.c_str()
  183.   << endl;
  184.       return false;
  185.     }
  186.     
  187.     m_ndb->setDatabaseName(split[0].c_str());
  188.     m_ndb->setSchemaName(split[1].c_str());
  189.     
  190.     const NdbDictionary::Table * prim = dict->getTable(split[2].c_str());
  191.     if(prim == 0){
  192.       err << "Unable to find base table "" << split[2].c_str() 
  193.   << "" for index "
  194.   << indtab.getName() << endl;
  195.       return false;
  196.     }
  197.     NdbTableImpl& base = NdbTableImpl::getImpl(*prim);
  198.     NdbIndexImpl* idx;
  199.     int id;
  200.     char idxName[255], buf[255];
  201.     if(sscanf(indtab.getName(), "%[^/]/%[^/]/%d/%s",
  202.       buf, buf, &id, idxName) != 4){
  203.       err << "Invalid index name format " << indtab.getName() << endl;
  204.       return false;
  205.     }
  206.     if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base))
  207.     {
  208.       err << "Failed to create index " << idxName
  209.   << " on " << split[2].c_str() << endl;
  210. return false;
  211.     }
  212.     idx->setName(idxName);
  213.     if(dict->createIndex(* idx) != 0)
  214.     {
  215.       delete idx;
  216.       err << "Failed to create index " << idxName
  217.   << " on " << split[2].c_str() << endl
  218.   << dict->getNdbError() << endl;
  219.       return false;
  220.     }
  221.     delete idx;
  222.     info << "Successfully created index " << idxName
  223.  << " on " << split[2].c_str() << endl;
  224.   }
  225.   return true;
  226. }
  227. void BackupRestore::tuple(const TupleS & tup)
  228. {
  229.   if (!m_restore) 
  230.     return;
  231.   while (m_free_callback == 0)
  232.   {
  233.     assert(m_transactions == m_parallelism);
  234.     // send-poll all transactions
  235.     // close transaction is done in callback
  236.     m_ndb->sendPollNdb(3000, 1);
  237.   }
  238.   
  239.   restore_callback_t * cb = m_free_callback;
  240.   
  241.   if (cb == 0)
  242.     assert(false);
  243.   
  244.   m_free_callback = cb->next;
  245.   cb->retries = 0;
  246.   cb->tup = tup; // must do copy!
  247.   tuple_a(cb);
  248. }
  249. void BackupRestore::tuple_a(restore_callback_t *cb)
  250. {
  251.   while (cb->retries < 10) 
  252.   {
  253.     /**
  254.      * start transactions
  255.      */
  256.     cb->connection = m_ndb->startTransaction();
  257.     if (cb->connection == NULL) 
  258.     {
  259.       if (errorHandler(cb)) 
  260.       {
  261. m_ndb->sendPollNdb(3000, 1);
  262. continue;
  263.       }
  264.       exitHandler();
  265.     } // if
  266.     
  267.     const TupleS &tup = cb->tup;
  268.     const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable);
  269.     NdbOperation * op = cb->connection->getNdbOperation(table);
  270.     
  271.     if (op == NULL) 
  272.     {
  273.       if (errorHandler(cb)) 
  274. continue;
  275.       exitHandler();
  276.     } // if
  277.     
  278.     if (op->writeTuple() == -1) 
  279.     {
  280.       if (errorHandler(cb))
  281. continue;
  282.       exitHandler();
  283.     } // if
  284.     
  285.     int ret = 0;
  286.     for (int j = 0; j < 2; j++)
  287.     {
  288.       for (int i = 0; i < tup.getNoOfAttributes(); i++) 
  289.       {
  290. const AttributeDesc * attr_desc = tup.getDesc(i);
  291. const AttributeData * attr_data = tup.getData(i);
  292. int size = attr_desc->size;
  293. int arraySize = attr_desc->arraySize;
  294. char * dataPtr = attr_data->string_value;
  295. Uint32 length = (size * arraySize) / 8;
  296. if (j == 0 && tup.getTable()->have_auto_inc(i))
  297.   tup.getTable()->update_max_auto_val(dataPtr,size);
  298. if (attr_desc->m_column->getPrimaryKey())
  299. {
  300.   if (j == 1) continue;
  301.   ret = op->equal(i, dataPtr, length);
  302. }
  303. else
  304. {
  305.   if (j == 0) continue;
  306.   if (attr_data->null) 
  307.     ret = op->setValue(i, NULL, 0);
  308.   else
  309.     ret = op->setValue(i, dataPtr, length);
  310. }
  311. if (ret < 0) {
  312.   ndbout_c("Column: %d type %d %d %d %d",i,
  313.    attr_desc->m_column->getType(),
  314.    size, arraySize, attr_data->size);
  315.   break;
  316. }
  317.       }
  318.       if (ret < 0)
  319. break;
  320.     }
  321.     if (ret < 0)
  322.     {
  323.       if (errorHandler(cb)) 
  324. continue;
  325.       exitHandler();
  326.     }
  327.     // Prepare transaction (the transaction is NOT yet sent to NDB)
  328.     cb->connection->executeAsynchPrepare(Commit, &callback, cb);
  329.     m_transactions++;
  330.     return;
  331.   }
  332.   err << "Retried transaction " << cb->retries << " times.nLast error"
  333.       << m_ndb->getNdbError(cb->error_code) << endl
  334.       << "...Unable to recover from errors. Exiting..." << endl;
  335.   exitHandler();
  336. }
  337. void BackupRestore::cback(int result, restore_callback_t *cb)
  338. {
  339.   m_transactions--;
  340.   if (result < 0)
  341.   {
  342.     /**
  343.      * Error. temporary or permanent?
  344.      */
  345.     if (errorHandler(cb))
  346.       tuple_a(cb); // retry
  347.     else
  348.     {
  349.       err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl;
  350.       exitHandler();
  351.     }
  352.   }
  353.   else
  354.   {
  355.     /**
  356.      * OK! close transaction
  357.      */
  358.     m_ndb->closeTransaction(cb->connection);
  359.     cb->connection= 0;
  360.     cb->next= m_free_callback;
  361.     m_free_callback= cb;
  362.     m_dataCount++;
  363.   }
  364. }
  365. /**
  366.  * returns true if is recoverable,
  367.  * Error handling based on hugo
  368.  *  false if it is an  error that generates an abort.
  369.  */
  370. bool BackupRestore::errorHandler(restore_callback_t *cb) 
  371. {
  372.   NdbError error;
  373.   if(cb->connection)
  374.   {
  375.     error= cb->connection->getNdbError();
  376.     m_ndb->closeTransaction(cb->connection);
  377.     cb->connection= 0;
  378.   }
  379.   else
  380.   {
  381.     error= m_ndb->getNdbError();
  382.   } 
  383.   Uint32 sleepTime = 100 + cb->retries * 300;
  384.   
  385.   cb->retries++;
  386.   cb->error_code = error.code;
  387.   switch(error.status)
  388.   {
  389.   case NdbError::Success:
  390.     return false;
  391.     // ERROR!
  392.     break;
  393.     
  394.   case NdbError::TemporaryError:
  395.     err << "Temporary error: " << error << endl;
  396.     NdbSleep_MilliSleep(sleepTime);
  397.     return true;
  398.     // RETRY
  399.     break;
  400.     
  401.   case NdbError::UnknownResult:
  402.     err << error << endl;
  403.     return false;
  404.     // ERROR!
  405.     break;
  406.     
  407.   default:
  408.   case NdbError::PermanentError:
  409.     //ERROR
  410.     err << error << endl;
  411.     return false;
  412.     break;
  413.   }
  414.   return false;
  415. }
  416. void BackupRestore::exitHandler() 
  417. {
  418.   release();
  419.   NDBT_ProgramExit(NDBT_FAILED);
  420.   if (opt_core)
  421.     abort();
  422.   else
  423.     exit(NDBT_FAILED);
  424. }
  425. void
  426. BackupRestore::tuple_free()
  427. {
  428.   if (!m_restore)
  429.     return;
  430.   // Poll all transactions
  431.   while (m_transactions)
  432.   {
  433.     m_ndb->sendPollNdb(3000);
  434.   }
  435. }
  436. void
  437. BackupRestore::endOfTuples()
  438. {
  439.   tuple_free();
  440. }
  441. void
  442. BackupRestore::logEntry(const LogEntry & tup)
  443. {
  444.   if (!m_restore)
  445.     return;
  446.   NdbConnection * trans = m_ndb->startTransaction();
  447.   if (trans == NULL) 
  448.   {
  449.     // Deep shit, TODO: handle the error
  450.     err << "Cannot start transaction" << endl;
  451.     exitHandler();
  452.   } // if
  453.   
  454.   const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable);
  455.   NdbOperation * op = trans->getNdbOperation(table);
  456.   if (op == NULL) 
  457.   {
  458.     err << "Cannot get operation: " << trans->getNdbError() << endl;
  459.     exitHandler();
  460.   } // if
  461.   
  462.   int check = 0;
  463.   switch(tup.m_type)
  464.   {
  465.   case LogEntry::LE_INSERT:
  466.     check = op->insertTuple();
  467.     break;
  468.   case LogEntry::LE_UPDATE:
  469.     check = op->updateTuple();
  470.     break;
  471.   case LogEntry::LE_DELETE:
  472.     check = op->deleteTuple();
  473.     break;
  474.   default:
  475.     err << "Log entry has wrong operation type."
  476.    << " Exiting...";
  477.     exitHandler();
  478.   }
  479.   if (check != 0) 
  480.   {
  481.     err << "Error defining op: " << trans->getNdbError() << endl;
  482.     exitHandler();
  483.   } // if
  484.   
  485.   Bitmask<4096> keys;
  486.   for (Uint32 i= 0; i < tup.size(); i++) 
  487.   {
  488.     const AttributeS * attr = tup[i];
  489.     int size = attr->Desc->size;
  490.     int arraySize = attr->Desc->arraySize;
  491.     const char * dataPtr = attr->Data.string_value;
  492.     
  493.     if (tup.m_table->have_auto_inc(attr->Desc->attrId))
  494.       tup.m_table->update_max_auto_val(dataPtr,size);
  495.     const Uint32 length = (size / 8) * arraySize;
  496.     if (attr->Desc->m_column->getPrimaryKey())
  497.     {
  498.       if(!keys.get(attr->Desc->attrId))
  499.       {
  500. keys.set(attr->Desc->attrId);
  501. check= op->equal(attr->Desc->attrId, dataPtr, length);
  502.       }
  503.     }
  504.     else
  505.       check= op->setValue(attr->Desc->attrId, dataPtr, length);
  506.     
  507.     if (check != 0) 
  508.     {
  509.       err << "Error defining op: " << trans->getNdbError() << endl;
  510.       exitHandler();
  511.     } // if
  512.   }
  513.   
  514.   const int ret = trans->execute(Commit);
  515.   if (ret != 0)
  516.   {
  517.     // Both insert update and delete can fail during log running
  518.     // and it's ok
  519.     // TODO: check that the error is either tuple exists or tuple does not exist?
  520.     bool ok= false;
  521.     NdbError errobj= trans->getNdbError();
  522.     switch(tup.m_type)
  523.     {
  524.     case LogEntry::LE_INSERT:
  525.       if(errobj.status == NdbError::PermanentError &&
  526.  errobj.classification == NdbError::ConstraintViolation)
  527. ok= true;
  528.       break;
  529.     case LogEntry::LE_UPDATE:
  530.     case LogEntry::LE_DELETE:
  531.       if(errobj.status == NdbError::PermanentError &&
  532.  errobj.classification == NdbError::NoDataFound)
  533. ok= true;
  534.       break;
  535.     }
  536.     if (!ok)
  537.     {
  538.       err << "execute failed: " << errobj << endl;
  539.       exitHandler();
  540.     }
  541.   }
  542.   
  543.   m_ndb->closeTransaction(trans);
  544.   m_logCount++;
  545. }
  546. void
  547. BackupRestore::endOfLogEntrys()
  548. {
  549.   if (!m_restore)
  550.     return;
  551.   info << "Restored " << m_dataCount << " tuples and "
  552.        << m_logCount << " log entries" << endl;
  553. }
  554. /*
  555.  *   callback : This is called when the transaction is polled
  556.  *              
  557.  *   (This function must have three arguments: 
  558.  *   - The result of the transaction, 
  559.  *   - The NdbConnection object, and 
  560.  *   - A pointer to an arbitrary object.)
  561.  */
  562. static void
  563. callback(int result, NdbConnection* trans, void* aObject)
  564. {
  565.   restore_callback_t *cb = (restore_callback_t *)aObject;
  566.   (cb->restore)->cback(result, cb);
  567. }
  568. #if 0 // old tuple impl
  569. void
  570. BackupRestore::tuple(const TupleS & tup)
  571. {
  572.   if (!m_restore)
  573.     return;
  574.   while (1) 
  575.   {
  576.     NdbConnection * trans = m_ndb->startTransaction();
  577.     if (trans == NULL) 
  578.     {
  579.       // Deep shit, TODO: handle the error
  580.       ndbout << "Cannot start transaction" << endl;
  581.       exitHandler();
  582.     } // if
  583.     
  584.     const TableS * table = tup.getTable();
  585.     NdbOperation * op = trans->getNdbOperation(table->getTableName());
  586.     if (op == NULL) 
  587.     {
  588.       ndbout << "Cannot get operation: ";
  589.       ndbout << trans->getNdbError() << endl;
  590.       exitHandler();
  591.     } // if
  592.     
  593.     // TODO: check return value and handle error
  594.     if (op->writeTuple() == -1) 
  595.     {
  596.       ndbout << "writeTuple call failed: ";
  597.       ndbout << trans->getNdbError() << endl;
  598.       exitHandler();
  599.     } // if
  600.     
  601.     for (int i = 0; i < tup.getNoOfAttributes(); i++) 
  602.     {
  603.       const AttributeS * attr = tup[i];
  604.       int size = attr->Desc->size;
  605.       int arraySize = attr->Desc->arraySize;
  606.       const char * dataPtr = attr->Data.string_value;
  607.       
  608.       const Uint32 length = (size * arraySize) / 8;
  609.       if (attr->Desc->m_column->getPrimaryKey()) 
  610. op->equal(i, dataPtr, length);
  611.     }
  612.     
  613.     for (int i = 0; i < tup.getNoOfAttributes(); i++) 
  614.     {
  615.       const AttributeS * attr = tup[i];
  616.       int size = attr->Desc->size;
  617.       int arraySize = attr->Desc->arraySize;
  618.       const char * dataPtr = attr->Data.string_value;
  619.       
  620.       const Uint32 length = (size * arraySize) / 8;
  621.       if (!attr->Desc->m_column->getPrimaryKey())
  622. if (attr->Data.null)
  623.   op->setValue(i, NULL, 0);
  624. else
  625.   op->setValue(i, dataPtr, length);
  626.     }
  627.     int ret = trans->execute(Commit);
  628.     if (ret != 0)
  629.     {
  630.       ndbout << "execute failed: ";
  631.       ndbout << trans->getNdbError() << endl;
  632.       exitHandler();
  633.     }
  634.     m_ndb->closeTransaction(trans);
  635.     if (ret == 0)
  636.       break;
  637.   }
  638.   m_dataCount++;
  639. }
  640. #endif
  641. template class Vector<NdbDictionary::Table*>;
  642. template class Vector<const NdbDictionary::Table*>;