ndbapi_async.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:12k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. /**
  14.  * ndbapi_async.cpp: 
  15.  * Illustrates how to use callbacks and error handling using the asynchronous
  16.  * part of the NDBAPI.
  17.  *
  18.  * Classes and methods in NDBAPI used in this example:
  19.  *
  20.  *  Ndb
  21.  *       init()
  22.  *       waitUntilRead()
  23.  *       getDictionary()
  24.  *       startTransaction()
  25.  *       closeTransaction()
  26.  *       sendPollNdb()
  27.  *       getNdbError()
  28.  *
  29.  *  NdbConnection
  30.  *       getNdbOperation()
  31.  *       executeAsynchPrepare()
  32.  *       getNdbError()
  33.  *
  34.  *  NdbDictionary::Dictionary
  35.  *       getTable()
  36.  *       dropTable()
  37.  *       createTable()
  38.  *       getNdbError()
  39.  *
  40.  *  NdbDictionary::Column
  41.  *       setName()
  42.  *       setType()
  43.  *       setLength()
  44.  *       setPrimaryKey()
  45.  *       setNullable()
  46.  *
  47.  *  NdbDictionary::Table
  48.  *       setName()
  49.  *       addColumn()
  50.  *
  51.  *  NdbOperation
  52.  *       insertTuple()
  53.  *       equal()
  54.  *       setValue()
  55.  *       
  56.  */
  57. #include <ndb_global.h>
  58. #include <NdbApi.hpp>
  59. #include <NdbScanFilter.hpp>
  60. #include <iostream> // Used for cout
  61. /**
  62.  * Helper sleep function
  63.  */
  64. int
  65. milliSleep(int milliseconds){
  66.   int result = 0;
  67.   struct timespec sleeptime;
  68.   sleeptime.tv_sec = milliseconds / 1000;
  69.   sleeptime.tv_nsec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000;
  70.   result = nanosleep(&sleeptime, NULL);
  71.   return result;
  72. }
  73. /**
  74.  * error printout macro
  75.  */
  76. #define APIERROR(error) 
  77.   { std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" 
  78.               << error.code << ", msg: " << error.message << "." << std::endl; 
  79.     exit(-1); }
  80. #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
  81. /**
  82.  * callback struct.
  83.  * transaction :  index of the transaction in transaction[] array below
  84.  * data : the data that the transaction was modifying.
  85.  * retries : counter for how many times the trans. has been retried
  86.  */
  87. typedef struct  {
  88.   Ndb * ndb;
  89.   int    transaction;  
  90.   int    data;
  91.   int    retries;
  92. } async_callback_t;
  93. /**
  94.  * Structure used in "free list" to a NdbConnection
  95.  */
  96. typedef struct  {
  97.   NdbConnection*  conn;   
  98.   int used; 
  99. } transaction_t;
  100. /**
  101.  * Free list holding transactions
  102.  */
  103. transaction_t   transaction[1024];  //1024 - max number of outstanding
  104.                                     //transaction in one Ndb object
  105. #endif 
  106. /**
  107.  * prototypes
  108.  */
  109. /**
  110.  * Prepare and send transaction
  111.  */
  112. int  populate(Ndb * myNdb, int data, async_callback_t * cbData);
  113. /**
  114.  * Error handler.
  115.  */
  116. bool asynchErrorHandler(NdbConnection * trans, Ndb* ndb);
  117. /**
  118.  * Exit function
  119.  */
  120. void asynchExitHandler(Ndb * m_ndb) ;
  121. /**
  122.  * Helper function used in callback(...)
  123.  */
  124. void closeTransaction(Ndb * ndb , async_callback_t * cb);
  125. /**
  126.  * Function to create table
  127.  */
  128. int create_table(Ndb * myNdb);
  129. /**
  130.  * stat. variables
  131.  */
  132. int tempErrors = 0;
  133. int permErrors = 0;
  134. /**
  135.  * Helper function for callback(...)
  136.  */
  137. void
  138. closeTransaction(Ndb * ndb , async_callback_t * cb)
  139. {
  140.   ndb->closeTransaction(transaction[cb->transaction].conn);
  141.   transaction[cb->transaction].conn = 0;
  142.   transaction[cb->transaction].used = 0;
  143.   cb->retries++;  
  144. }
  145. /**
  146.  * Callback executed when transaction has return from NDB
  147.  */
  148. static void
  149. callback(int result, NdbConnection* trans, void* aObject)
  150. {
  151.   async_callback_t * cbData = (async_callback_t *)aObject;
  152.   if (result<0)
  153.   {
  154.     /**
  155.      * Error: Temporary or permanent?
  156.      */
  157.     if (asynchErrorHandler(trans,  (Ndb*)cbData->ndb)) 
  158.     {
  159.       closeTransaction((Ndb*)cbData->ndb, cbData);
  160.       while(populate((Ndb*)cbData->ndb, cbData->data, cbData) < 0)
  161. milliSleep(10);
  162.     }
  163.     else
  164.     {
  165.       std::cout << "Restore: Failed to restore data " 
  166. << "due to a unrecoverable error. Exiting..." << std::endl;
  167.       delete cbData;
  168.       asynchExitHandler((Ndb*)cbData->ndb);
  169.     }
  170.   } 
  171.   else 
  172.   {
  173.     /**
  174.      * OK! close transaction
  175.      */
  176.     closeTransaction((Ndb*)cbData->ndb, cbData);
  177.     delete cbData;
  178.   }
  179. }
  180. /**
  181.  * Create table "GARAGE"
  182.  */
  183. int create_table(Ndb * myNdb) 
  184. {
  185.   NdbDictionary::Table myTable;
  186.   NdbDictionary::Column myColumn;
  187.   
  188.   NdbDictionary::Dictionary* myDict = myNdb->getDictionary();
  189.   
  190.   /*********************************************************
  191.    * Create a table named GARAGE if it does not exist *
  192.    *********************************************************/
  193.   if (myDict->getTable("GARAGE") != NULL) 
  194.   {
  195.     std::cout << "NDB already has example table: GARAGE. "
  196.       << "Dropping it..." << std::endl; 
  197.     if(myDict->dropTable("GARAGE") == -1)
  198.     {
  199.       std::cout << "Failed to drop: GARAGE." << std::endl; 
  200.       exit(1);
  201.     }
  202.   } 
  203.   myTable.setName("GARAGE");
  204.   
  205. /**
  206.  * Column REG_NO
  207.  */
  208.   myColumn.setName("REG_NO");
  209.   myColumn.setType(NdbDictionary::Column::Unsigned);
  210.   myColumn.setLength(1);
  211.   myColumn.setPrimaryKey(true);
  212.   myColumn.setNullable(false);
  213.   myTable.addColumn(myColumn);
  214. /**
  215.  * Column BRAND
  216.  */
  217.   myColumn.setName("BRAND");
  218.   myColumn.setType(NdbDictionary::Column::Char);
  219.   myColumn.setLength(20);
  220.   myColumn.setPrimaryKey(false);
  221.   myColumn.setNullable(false);
  222.   myTable.addColumn(myColumn);
  223. /**
  224.  * Column COLOR
  225.  */
  226.   myColumn.setName("COLOR");
  227.   myColumn.setType(NdbDictionary::Column::Char);
  228.   myColumn.setLength(20);
  229.   myColumn.setPrimaryKey(false);
  230.   myColumn.setNullable(false);
  231.   myTable.addColumn(myColumn);
  232.   if (myDict->createTable(myTable) == -1) {
  233.       APIERROR(myDict->getNdbError());
  234.   }
  235.   return 1;
  236. }
  237. void asynchExitHandler(Ndb * m_ndb) 
  238. {
  239.   if (m_ndb != NULL)
  240.     delete m_ndb;
  241.   exit(-1);
  242. }
  243. /* returns true if is recoverable (temporary),
  244.  *  false if it is an  error that is permanent.
  245.  */
  246. bool asynchErrorHandler(NdbConnection * trans, Ndb* ndb) 
  247. {  
  248.   NdbError error = trans->getNdbError();
  249.   switch(error.status)
  250.   {
  251.   case NdbError::Success:
  252.     return false;
  253.     break;
  254.     
  255.   case NdbError::TemporaryError:
  256.     /**
  257.      * The error code indicates a temporary error.
  258.      * The application should typically retry.
  259.      * (Includes classifications: NdbError::InsufficientSpace, 
  260.      *  NdbError::TemporaryResourceError, NdbError::NodeRecoveryError,
  261.      *  NdbError::OverloadError, NdbError::NodeShutdown 
  262.      *  and NdbError::TimeoutExpired.)
  263.      *     
  264.      * We should sleep for a while and retry, except for insufficient space
  265.      */
  266.     if(error.classification == NdbError::InsufficientSpace)
  267.       return false;
  268.     milliSleep(10);  
  269.     tempErrors++;  
  270.     return true;
  271.     break;    
  272.   case NdbError::UnknownResult:
  273.     std::cout << error.message << std::endl;
  274.     return false;
  275.     break;
  276.   default:
  277.   case NdbError::PermanentError:
  278.     switch (error.code)
  279.     {
  280.     case 499:
  281.     case 250:
  282.       milliSleep(10);    
  283.       return true; // SCAN errors that can be retried. Requires restart of scan.
  284.     default:
  285.       break;
  286.     }
  287.     //ERROR
  288.     std::cout << error.message << std::endl;
  289.     return false;
  290.     break;
  291.   }
  292.   return false;
  293. }
  294. static int nPreparedTransactions = 0;
  295. static int MAX_RETRIES = 10;
  296. static int parallelism = 100;
  297. /************************************************************************
  298.  * populate()
  299.  * 1. Prepare 'parallelism' number of insert transactions. 
  300.  * 2. Send transactions to NDB and wait for callbacks to execute
  301.  */
  302. int populate(Ndb * myNdb, int data, async_callback_t * cbData)
  303. {
  304.   NdbOperation*   myNdbOperation;       // For operations
  305.   async_callback_t * cb;
  306.   int retries = 0;
  307.   int current = 0;
  308.   for(int i=0; i<1024; i++)
  309.   {
  310.     if(transaction[i].used == 0)
  311.     {
  312.       current = i;
  313.       if (cbData == 0) 
  314.       {
  315.        /**
  316.         * We already have a callback
  317. * This is an absolutely new transaction
  318.         */
  319. cb = new async_callback_t;
  320. cb->retries = 0;
  321.       }
  322.       else 
  323.       { 
  324.        /**
  325.         * We already have a callback
  326.         */
  327. cb =cbData;
  328. retries = cbData->retries;
  329.       }
  330.       /**
  331.        * Set data used by the callback
  332.        */
  333.       cb->ndb = myNdb;  //handle to Ndb object so that we can close transaction
  334.                         // in the callback (alt. make myNdb global).
  335.       cb->data =  data; //this is the data we want to insert
  336.       cb->transaction = current; //This is the number (id)  of this transaction
  337.       transaction[current].used = 1 ; //Mark the transaction as used
  338.       break;
  339.     }
  340.   }
  341.   if(!current)
  342.     return -1;
  343.   while(retries < MAX_RETRIES) 
  344.     {
  345.       transaction[current].conn = myNdb->startTransaction();
  346.       if (transaction[current].conn == NULL) {
  347. if (asynchErrorHandler(transaction[current].conn, myNdb)) 
  348. {
  349.           /**
  350.            * no transaction to close since conn == null
  351.            */
  352.   milliSleep(10);
  353.   retries++;
  354.   continue;
  355. }
  356. asynchExitHandler(myNdb);
  357.       }
  358.       // Error check. If error, then maybe table GARAGE is not in database
  359.       myNdbOperation = transaction[current].conn->getNdbOperation("GARAGE");
  360.       if (myNdbOperation == NULL) 
  361.       {
  362. if (asynchErrorHandler(transaction[current].conn, myNdb)) 
  363. {
  364.   myNdb->closeTransaction(transaction[current].conn);
  365.   transaction[current].conn = 0;
  366.   milliSleep(10);
  367.   retries++;
  368.   continue;
  369. }
  370. asynchExitHandler(myNdb);
  371.       } // if
  372.       if(myNdbOperation->insertTuple() < 0  ||
  373.  myNdbOperation->equal("REG_NO", data) < 0 ||
  374.  myNdbOperation->setValue("BRAND", "Mercedes") <0 ||
  375.  myNdbOperation->setValue("COLOR", "Blue") < 0)
  376.       {
  377. if (asynchErrorHandler(transaction[current].conn, myNdb)) 
  378. {
  379.   myNdb->closeTransaction(transaction[current].conn);
  380.   transaction[current].conn = 0;
  381.   retries++;
  382.   milliSleep(10);
  383.   continue;
  384. }
  385. asynchExitHandler(myNdb);
  386.       }     
  387.       /*Prepare transaction (the transaction is NOT yet sent to NDB)*/
  388.       transaction[current].conn->executeAsynchPrepare(Commit, 
  389.        &callback,
  390.        cb);
  391.       /**
  392.        * When we have prepared parallelism number of transactions ->
  393.        * send the transaction to ndb. 
  394.        * Next time we will deal with the transactions are in the 
  395.        * callback. There we will see which ones that were successful
  396.        * and which ones to retry.
  397.        */
  398.       if (nPreparedTransactions == parallelism-1) 
  399.       {
  400. // send-poll all transactions
  401. // close transaction is done in callback
  402. myNdb->sendPollNdb(3000, parallelism );
  403. nPreparedTransactions=0;
  404.       } 
  405.       else
  406. nPreparedTransactions++;
  407.       return 1;
  408.     }
  409.     std::cout << "Unable to recover from errors. Exiting..." << std::endl;
  410.     asynchExitHandler(myNdb);
  411.     return -1;
  412. }
  413. int main()
  414. {
  415.   ndb_init();
  416.   Ndb* myNdb = new Ndb( "TEST_DB" );  // Object representing the database
  417.   
  418.   /*******************************************
  419.    * Initialize NDB and wait until its ready *
  420.    *******************************************/
  421.   if (myNdb->init(1024) == -1) {          // Set max 1024  parallel transactions
  422.     APIERROR(myNdb->getNdbError());
  423.   }
  424.   if (myNdb->waitUntilReady(30) != 0) {
  425.     std::cout << "NDB was not ready within 30 secs." << std::endl;
  426.     exit(-1);
  427.   }
  428.   create_table(myNdb);
  429.   
  430.   /**
  431.    * Initialise transaction array
  432.    */
  433.   for(int i = 0 ; i < 1024 ; i++) 
  434.   {
  435.     transaction[i].used = 0;
  436.     transaction[i].conn = 0;
  437.     
  438.   }
  439.   int i=0;
  440.   /**
  441.    * Do 20000 insert transactions.
  442.    */
  443.   while(i < 20000) 
  444.   {
  445.     while(populate(myNdb,i,0)<0)  // <0, no space on free list. Sleep and try again.
  446.       milliSleep(10);
  447.       
  448.     i++;
  449.   }
  450.   std::cout << "Number of temporary errors: " << tempErrors << std::endl;
  451.   delete myNdb; 
  452. }