consumer_restorem.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:14k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "consumer_restore.hpp"
  14. #include <NdbSleep.h>
  15. extern FilteredNdbOut err;
  16. extern FilteredNdbOut info;
  17. extern FilteredNdbOut debug;
  18. static bool asynchErrorHandler(NdbConnection * trans, Ndb * ndb);
  19. static void callback(int result, NdbConnection* trans, void* aObject);
  20. bool
  21. BackupRestore::init()
  22. {
  23.   if (!m_restore && !m_restore_meta)
  24.     return true;
  25.   m_ndb = new Ndb();
  26.   if (m_ndb == NULL)
  27.     return false;
  28.   
  29.   // Turn off table name completion
  30.   m_ndb->useFullyQualifiedNames(false);
  31.   m_ndb->init(1024);
  32.   if (m_ndb->waitUntilReady(30) != 0)
  33.   {
  34.     ndbout << "Failed to connect to ndb!!" << endl;
  35.     return false;
  36.   }
  37.   ndbout << "Connected to ndb!!" << endl;
  38. #if USE_MYSQL
  39.   if(use_mysql) 
  40.   {
  41.     if ( mysql_thread_safe() == 0 ) 
  42.     {
  43.       ndbout << "Not thread safe mysql library..." << endl;
  44.       exit(-1);
  45.     }
  46.     
  47.     ndbout << "Connecting to MySQL..." <<endl;
  48.     
  49.     /**
  50.      * nwe param:
  51.      *  port
  52.      *  host
  53.      *  user
  54.      */
  55.     bool returnValue = true;
  56.     mysql_init(&mysql);
  57.     {
  58.       int portNo = 3306;
  59.       if ( mysql_real_connect(&mysql,
  60.       ga_host,
  61.       ga_user,
  62.       ga_password,
  63.       ga_database,
  64.       ga_port,
  65. ::       ga_socket,
  66.       0) == NULL ) 
  67.       {
  68. ndbout_c("Connect failed: %s", mysql_error(&mysql));
  69. returnValue = false;
  70.       }
  71.       ndbout << "Connected to MySQL!!!" <<endl;
  72.     }
  73.     /*  if(returnValue){
  74. mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
  75. }
  76.     */
  77.     return returnValue;
  78.   }
  79. #endif
  80.   if (m_callback) {
  81.     delete [] m_callback;
  82.     m_callback = 0;
  83.   }
  84.   m_callback = new restore_callback_t[m_parallelism];
  85.   if (m_callback == 0)
  86.   {
  87.     ndbout << "Failed to allocate callback structs" << endl;
  88.     return false;
  89.   }
  90.   m_free_callback = m_callback;
  91.   for (int i= 0; i < m_parallelism; i++) {
  92.     m_callback[i].restore = this;
  93.     m_callback[i].connection = 0;
  94.     m_callback[i].retries = 0;
  95.     if (i > 0)
  96.       m_callback[i-1].next = &(m_callback[i]);
  97.   }
  98.   m_callback[m_parallelism-1].next = 0;
  99.   return true;
  100.   
  101. }
  102. BackupRestore::~BackupRestore()
  103. {
  104.   if (m_ndb != 0)
  105.     delete m_ndb;
  106.   if (m_callback)
  107.     delete [] m_callback;
  108. }
  109. #ifdef USE_MYSQL
  110. bool
  111. BackupRestore::table(const TableS & table, MYSQL * mysqlp){
  112.   if (!m_restore_meta) 
  113.   {
  114.     return true;
  115.   }
  116.     
  117.   char tmpTabName[MAX_TAB_NAME_SIZE*2];
  118.   sprintf(tmpTabName, "%s", table.getTableName());
  119.   char * database = strtok(tmpTabName, "/");
  120.   char * schema   = strtok( NULL , "/");
  121.   char * tableName    = strtok( NULL , "/");
  122.   /**
  123.    * this means that the user did not specify schema
  124.    * and it is a v2x backup
  125.    */
  126.   if(database == NULL)
  127.     return false;
  128.   if(schema == NULL)
  129.     return false;
  130.   if(tableName==NULL)
  131.     tableName = schema; 
  132.   
  133.   char stmtCreateDB[255];
  134.   sprintf(stmtCreateDB,"CREATE DATABASE %s", database);
  135.   
  136.   /*ignore return value. mysql_select_db will trap errors anyways*/
  137.   if (mysql_query(mysqlp,stmtCreateDB) == 0)
  138.   {
  139.     //ndbout_c("%s", stmtCreateDB);
  140.   }
  141.   if (mysql_select_db(&mysql, database) != 0) 
  142.   {
  143.     ndbout_c("Error: %s", mysql_error(&mysql));
  144.     return false;
  145.   }
  146.   
  147.   char buf [2048];
  148.   /**
  149.    * create table ddl
  150.    */
  151.   if (create_table_string(table, tableName,  buf)) 
  152.   {
  153.     ndbout_c("Unable to create a table definition since the "
  154.      "backup contains undefined types");
  155.     return false;
  156.   }
  157.   //ndbout_c("%s", buf);
  158.   
  159.   if (mysql_query(mysqlp,buf) != 0) 
  160.   {
  161.       ndbout_c("Error: %s", mysql_error(&mysql));
  162.       return false;
  163.   } else 
  164.   {
  165.     ndbout_c("Successfully restored table %s into database %s", tableName, database);
  166.   }
  167.   
  168.   return true;
  169. }
  170. #endif
  171. bool
  172. BackupRestore::table(const TableS & table){
  173.   if (!m_restore_meta) 
  174.   {
  175.     return true;
  176.   }
  177.   NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
  178.   if (dict->createTable(*table.m_dictTable) == -1) 
  179.   {
  180.     err << "Create table " << table.getTableName() << " failed: "
  181. << dict->getNdbError() << endl;
  182.     return false;
  183.   }
  184.   info << "Successfully restored table " << table.getTableName()<< endl ;
  185.   return true;
  186. }
  187. void BackupRestore::tuple(const TupleS & tup)
  188. {
  189.   if (!m_restore) 
  190.   {
  191.     delete &tup;
  192.     return;  
  193.   }
  194.   restore_callback_t * cb = m_free_callback;
  195.   if (cb)
  196.   {
  197.     m_free_callback = cb->next;
  198.     cb->retries = 0;
  199.     cb->tup = &tup;
  200.     tuple_a(cb);
  201.   }
  202.   if (m_free_callback == 0)
  203.   {
  204.     // send-poll all transactions
  205.     // close transaction is done in callback
  206.     m_ndb->sendPollNdb(3000, 1);
  207.   }
  208. }
  209. void BackupRestore::tuple_a(restore_callback_t *cb)
  210. {
  211.   while (cb->retries < 10) 
  212.   {
  213.     /**
  214.      * start transactions
  215.      */
  216.     cb->connection = m_ndb->startTransaction();
  217.     if (cb->connection == NULL) 
  218.     {
  219.       /*
  220. if (asynchErrorHandler(cb->connection, m_ndb)) 
  221. {
  222. cb->retries++;
  223. continue;
  224. }
  225.       */
  226.       asynchExitHandler();
  227.     } // if
  228.     
  229.     const TupleS &tup = *(cb->tup);
  230.     const TableS * table = tup.getTable();
  231.     NdbOperation * op = cb->connection->getNdbOperation(table->getTableName());
  232.     
  233.     if (op == NULL) 
  234.     {
  235.       if (asynchErrorHandler(cb->connection, m_ndb)) 
  236.       {
  237. cb->retries++;
  238. continue;
  239.       }
  240.       asynchExitHandler();
  241.     } // if
  242.     
  243.     if (op->writeTuple() == -1) 
  244.     {
  245.       if (asynchErrorHandler(cb->connection, m_ndb))
  246.       {
  247. cb->retries++;
  248. continue;
  249.       }
  250.       asynchExitHandler();
  251.     } // if
  252.     
  253.     Uint32 ret = 0;
  254.     for (int i = 0; i < tup.getNoOfAttributes(); i++) 
  255.     {
  256.       const AttributeS * attr = tup[i];
  257.       int size = attr->Desc->size;
  258.       int arraySize = attr->Desc->arraySize;
  259.       char * dataPtr = attr->Data.string_value;
  260.       Uint32 length = (size * arraySize) / 8;
  261.       if (attr->Desc->m_column->getPrimaryKey()) 
  262.       {
  263. ret = op->equal(i, dataPtr, length);
  264.       }
  265.       else
  266.       {
  267. if (attr->Data.null) 
  268.   ret = op->setValue(i, NULL, 0);
  269. else
  270.   ret = op->setValue(i, dataPtr, length);
  271.       }
  272.       if (ret<0) 
  273. {
  274.   ndbout_c("Column: %d type %d",i,
  275.    tup.getTable()->m_dictTable->getColumn(i)->getType());
  276.   if (asynchErrorHandler(cb->connection, m_ndb)) 
  277.     {
  278.       cb->retries++;
  279.       break;
  280.     }
  281.   asynchExitHandler();
  282. }
  283.     }
  284.     if (ret < 0)
  285.       continue;
  286.     // Prepare transaction (the transaction is NOT yet sent to NDB)
  287.     cb->connection->executeAsynchPrepare(Commit, &callback, cb);
  288.     m_transactions++;
  289.   }
  290.   ndbout_c("Unable to recover from errors. Exiting...");
  291.   asynchExitHandler();
  292. }
  293. void BackupRestore::cback(int result, restore_callback_t *cb)
  294. {
  295.   if (result<0)
  296.   {
  297.     /**
  298.        * Error. temporary or permanent?
  299.        */
  300.     if (asynchErrorHandler(cb->connection, m_ndb)) 
  301.     {
  302.       cb->retries++;
  303.       tuple_a(cb);
  304.     }
  305.     else
  306.     {
  307.       ndbout_c("Restore: Failed to restore data "
  308.        "due to a unrecoverable error. Exiting...");
  309.       delete m_ndb;
  310.       delete cb->tup;
  311.       exit(-1);
  312.     }
  313.   }
  314.   else 
  315.   {
  316.     /**
  317.      * OK! close transaction
  318.      */
  319.     m_ndb->closeTransaction(cb->connection);
  320.     delete cb->tup;
  321.     m_transactions--;
  322.   }
  323. }
  324. void BackupRestore::asynchExitHandler() 
  325. {
  326.   if (m_ndb != NULL)
  327.     delete m_ndb;
  328.   exit(-1);
  329. }
  330. #if 0 // old tuple impl
  331. void
  332. BackupRestore::tuple(const TupleS & tup)
  333. {
  334.   if (!m_restore)
  335.     return;
  336.   while (1) 
  337.   {
  338.     NdbConnection * trans = m_ndb->startTransaction();
  339.     if (trans == NULL) 
  340.     {
  341.       // Deep shit, TODO: handle the error
  342.       ndbout << "Cannot start transaction" << endl;
  343.       exit(-1);
  344.     } // if
  345.     
  346.     const TableS * table = tup.getTable();
  347.     NdbOperation * op = trans->getNdbOperation(table->getTableName());
  348.     if (op == NULL) 
  349.     {
  350.       ndbout << "Cannot get operation: ";
  351.       ndbout << trans->getNdbError() << endl;
  352.       exit(-1);
  353.     } // if
  354.     
  355.     // TODO: check return value and handle error
  356.     if (op->writeTuple() == -1) 
  357.     {
  358.       ndbout << "writeTuple call failed: ";
  359.       ndbout << trans->getNdbError() << endl;
  360.       exit(-1);
  361.     } // if
  362.     
  363.     for (int i = 0; i < tup.getNoOfAttributes(); i++) 
  364.     {
  365.       const AttributeS * attr = tup[i];
  366.       int size = attr->Desc->size;
  367.       int arraySize = attr->Desc->arraySize;
  368.       const char * dataPtr = attr->Data.string_value;
  369.       
  370.       const Uint32 length = (size * arraySize) / 8;
  371.       if (attr->Desc->m_column->getPrimaryKey()) 
  372. op->equal(i, dataPtr, length);
  373.     }
  374.     
  375.     for (int i = 0; i < tup.getNoOfAttributes(); i++) 
  376.     {
  377.       const AttributeS * attr = tup[i];
  378.       int size = attr->Desc->size;
  379.       int arraySize = attr->Desc->arraySize;
  380.       const char * dataPtr = attr->Data.string_value;
  381.       
  382.       const Uint32 length = (size * arraySize) / 8;
  383.       if (!attr->Desc->m_column->getPrimaryKey())
  384. if (attr->Data.null)
  385.   op->setValue(i, NULL, 0);
  386. else
  387.   op->setValue(i, dataPtr, length);
  388.     }
  389.     int ret = trans->execute(Commit);
  390.     if (ret != 0)
  391.     {
  392.       ndbout << "execute failed: ";
  393.       ndbout << trans->getNdbError() << endl;
  394.       exit(-1);
  395.     }
  396.     m_ndb->closeTransaction(trans);
  397.     if (ret == 0)
  398.       break;
  399.   }
  400.   m_dataCount++;
  401. }
  402. #endif
  403. void
  404. BackupRestore::endOfTuples()
  405. {
  406.   if (!m_restore)
  407.     return;
  408.   // Send all transactions to NDB 
  409.   m_ndb->sendPreparedTransactions(0);
  410.   // Poll all transactions
  411.   m_ndb->pollNdb(3000, m_transactions);
  412.   // Close all transactions
  413.   //  for (int i = 0; i < nPreparedTransactions; i++) 
  414.   // m_ndb->closeTransaction(asynchTrans[i]);
  415. }
  416. void
  417. BackupRestore::logEntry(const LogEntry & tup)
  418. {
  419.   if (!m_restore)
  420.     return;
  421.   NdbConnection * trans = m_ndb->startTransaction();
  422.   if (trans == NULL) 
  423.   {
  424.     // Deep shit, TODO: handle the error
  425.     ndbout << "Cannot start transaction" << endl;
  426.     exit(-1);
  427.   } // if
  428.   
  429.   const TableS * table = tup.m_table;
  430.   NdbOperation * op = trans->getNdbOperation(table->getTableName());
  431.   if (op == NULL) 
  432.   {
  433.     ndbout << "Cannot get operation: ";
  434.     ndbout << trans->getNdbError() << endl;
  435.     exit(-1);
  436.   } // if
  437.   
  438.   int check = 0;
  439.   switch(tup.m_type)
  440.   {
  441.   case LogEntry::LE_INSERT:
  442.     check = op->insertTuple();
  443.     break;
  444.   case LogEntry::LE_UPDATE:
  445.     check = op->updateTuple();
  446.     break;
  447.   case LogEntry::LE_DELETE:
  448.     check = op->deleteTuple();
  449.     break;
  450.   default:
  451.     ndbout << "Log entry has wrong operation type."
  452.    << " Exiting...";
  453.     exit(-1);
  454.   }
  455.   
  456.   for (int i = 0; i < tup.m_values.size(); i++) 
  457.   {
  458.     const AttributeS * attr = tup.m_values[i];
  459.     int size = attr->Desc->size;
  460.     int arraySize = attr->Desc->arraySize;
  461.     const char * dataPtr = attr->Data.string_value;
  462.     
  463.     const Uint32 length = (size / 8) * arraySize;
  464.     if (attr->Desc->m_column->getPrimaryKey()) 
  465.       op->equal(attr->Desc->attrId, dataPtr, length);
  466.     else
  467.       op->setValue(attr->Desc->attrId, dataPtr, length);
  468.   }
  469.   
  470. #if 1
  471.   trans->execute(Commit);
  472. #else
  473.   const int ret = trans->execute(Commit);
  474.   // Both insert update and delete can fail during log running
  475.   // and it's ok
  476.   
  477.   if (ret != 0)
  478.   {
  479.     ndbout << "execute failed: ";
  480.     ndbout << trans->getNdbError() << endl;
  481.     exit(-1);
  482.   }
  483. #endif
  484.   
  485.   m_ndb->closeTransaction(trans);
  486.   m_logCount++;
  487. }
  488. void
  489. BackupRestore::endOfLogEntrys()
  490. {
  491.   if (m_restore) 
  492.   {
  493.     ndbout << "Restored " << m_dataCount << " tuples and "
  494.      << m_logCount << " log entries" << endl;
  495.   }
  496. }
  497. #if 0
  498. /*****************************************
  499.  *
  500.  * Callback function for asynchronous transactions
  501.  *
  502.  * Idea for error handling: Transaction objects have to be stored globally when
  503.  *                they are prepared.
  504.  *        In the callback function if the transaction:
  505.  *          succeeded: delete the object from global storage
  506.  *          failed but can be retried: execute the object that is in global storage
  507.  *          failed but fatal: delete the object from global storage
  508.  *
  509.  ******************************************/
  510. static void restoreCallback(int result,            // Result for transaction
  511.     NdbConnection *object, // Transaction object
  512.     void *anything)        // Not used
  513. {
  514.   static Uint32 counter = 0;
  515.   
  516.   debug << "restoreCallback function called " << counter << " time(s)" << endl;
  517.   ++counter;
  518.   if (result == -1) 
  519.   {
  520.       ndbout << " restoreCallback (" << counter;
  521.       if ((counter % 10) == 1) 
  522.       {
  523.   ndbout << "st";
  524.       } // if
  525.       else if ((counter % 10) == 2) 
  526.       {
  527. ndbout << "nd";
  528.       } // else if
  529.       else if ((counter % 10 ) ==3) 
  530.       {
  531. ndbout << "rd";
  532.       } // else if
  533.       else 
  534.       {
  535. ndbout << "th";
  536.       } // else
  537.       err << " time: error detected " << object->getNdbError() << endl;
  538.     } // if
  539.   
  540. } // restoreCallback
  541. #endif
  542. /*
  543.  *   callback : This is called when the transaction is polled
  544.  *              
  545.  *   (This function must have three arguments: 
  546.  *   - The result of the transaction, 
  547.  *   - The NdbConnection object, and 
  548.  *   - A pointer to an arbitrary object.)
  549.  */
  550. static void
  551. callback(int result, NdbConnection* trans, void* aObject)
  552. {
  553.   restore_callback_t *cb = (restore_callback_t *)aObject;
  554.   (cb->restore)->cback(result, cb);
  555. }
  556. /**
  557.  * returns true if is recoverable,
  558.  * Error handling based on hugo
  559.  *  false if it is an  error that generates an abort.
  560.  */
  561. static
  562. bool asynchErrorHandler(NdbConnection * trans, Ndb* ndb) 
  563. {
  564.   NdbError error = trans->getNdbError();
  565.   ndb->closeTransaction(trans);
  566.   switch(error.status)
  567.   {
  568.   case NdbError::Success:
  569.       return false;
  570.       // ERROR!
  571.       break;
  572.       
  573.   case NdbError::TemporaryError:
  574.     NdbSleep_MilliSleep(10);
  575.     return true;
  576.     // RETRY
  577.     break;
  578.     
  579.   case NdbError::UnknownResult:
  580.     ndbout << error << endl;
  581.     return false;
  582.     // ERROR!
  583.     break;
  584.     
  585.   default:
  586.   case NdbError::PermanentError:
  587.     switch (error.code)
  588.     {
  589.     case 499:
  590.     case 250:
  591.       NdbSleep_MilliSleep(10);
  592.       return true; //temp errors?
  593.     default:
  594.       break;
  595.     }
  596.     //ERROR
  597.     ndbout << error << endl;
  598.     return false;
  599.     break;
  600.   }
  601.   return false;
  602. }