HugoAsynchTransactions.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:13k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <NdbSleep.h>
  14. #include <HugoAsynchTransactions.hpp>
  15. HugoAsynchTransactions::HugoAsynchTransactions(const NdbDictionary::Table& _tab):
  16.   HugoTransactions(_tab),
  17.   transactionsCompleted(0),
  18.   numTransactions(0),
  19.   transactions(NULL){
  20. }
  21. HugoAsynchTransactions::~HugoAsynchTransactions(){
  22.   deallocTransactions();
  23. }
  24. void asynchCallback(int result, NdbConnection* pTrans, 
  25.     void* anObject) {
  26.   HugoAsynchTransactions* pHugo = (HugoAsynchTransactions*) anObject;
  27.   
  28.   pHugo->transactionCompleted();
  29.   if (result == -1) {
  30.     const NdbError err = pTrans->getNdbError();
  31.     switch(err.status) {
  32.     case NdbError::Success:
  33.       ERR(err);
  34.       g_info << "ERROR: NdbError reports success when transcaction failed"
  35.      << endl;
  36.       break;
  37.       
  38.     case NdbError::TemporaryError:      
  39.       ERR(err);
  40.       break;
  41. #if 0      
  42.     case 626: // Tuple did not exist
  43.       g_info << (unsigned int)pHugo->getTransactionsCompleted() << ": " 
  44.      << err.code << " " << err.message << endl;
  45.       break;
  46. #endif
  47.  
  48.     case NdbError::UnknownResult:
  49.       ERR(err);
  50.       break;
  51.       
  52.     case NdbError::PermanentError:
  53.       switch (err.classification) {
  54.       case NdbError::ConstraintViolation:
  55. // Tuple already existed, OK in this application, 
  56. // but should be reported
  57. g_info << (unsigned int)pHugo->getTransactionsCompleted() 
  58.        << ": " << err.code << " " << err.message << endl;
  59. break;
  60.       default:
  61. ERR(err);
  62. break;
  63.       }
  64.       break;
  65.     }
  66.   } else {// if (result == -1)
  67.     /*
  68.     ndbout << (unsigned int)pHugo->getTransactionsCompleted() << " completed" 
  69.    << endl;
  70.     */
  71.   }
  72. }
  73. int
  74. HugoAsynchTransactions::loadTableAsynch(Ndb* pNdb, 
  75.   int records,
  76.   int batch,
  77.   int trans,
  78.   int operations){
  79.   int result = executeAsynchOperation(pNdb, records, batch, trans, operations, 
  80.       NO_INSERT);
  81.   g_info << (unsigned int)transactionsCompleted * operations 
  82.  << "|- inserted..." << endl;
  83.   return result;
  84. void
  85. HugoAsynchTransactions::transactionCompleted() {
  86.   transactionsCompleted++;
  87. }
  88. long
  89. HugoAsynchTransactions::getTransactionsCompleted() {
  90.   return transactionsCompleted;
  91. }
  92. int 
  93. HugoAsynchTransactions::pkDelRecordsAsynch(Ndb* pNdb, 
  94.      int records,
  95.      int batch,
  96.      int trans,
  97.      int operations) {
  98.   
  99.   g_info << "|- Deleting records asynchronous..." << endl;
  100.   int result =  executeAsynchOperation(pNdb, records, batch, trans, 
  101.        operations, 
  102.        NO_DELETE);
  103.   g_info << "|- " << (unsigned int)transactionsCompleted * operations 
  104.  << " deleted..." << endl;
  105.   return result;  
  106. }
  107. int 
  108. HugoAsynchTransactions::pkReadRecordsAsynch(Ndb* pNdb, 
  109.       int records,
  110.       int batch,
  111.       int trans,
  112.       int operations) {
  113.   g_info << "|- Reading records asynchronous..." << endl;
  114.   allocRows(trans*operations);
  115.   int result = executeAsynchOperation(pNdb, records, batch, trans, operations, 
  116.       NO_READ);
  117.   g_info << "|- " << (unsigned int)transactionsCompleted * operations 
  118.  << " read..."
  119.  << endl;
  120.   deallocRows();
  121.   return result;
  122. }
  123. int 
  124. HugoAsynchTransactions::pkUpdateRecordsAsynch(Ndb* pNdb, 
  125. int records,
  126. int batch,
  127. int trans,
  128. int operations) {
  129.   g_info << "|- Updating records asynchronous..." << endl;
  130.   int             check = 0;
  131.   int             cTrans = 0;
  132.   int             cReadRecords = 0;
  133.   int             cReadIndex = 0;
  134.   int             cRecords = 0;
  135.   int             cIndex = 0;
  136.   transactionsCompleted = 0;
  137.   allocRows(trans*operations);
  138.   allocTransactions(trans);
  139.   int a, t, r;
  140.   for (int i = 0; i < batch; i++) { // For each batch
  141.     while (cRecords < records*batch) {
  142.       cTrans = 0;
  143.       cReadIndex = 0;
  144.       for (t = 0; t < trans; t++) { // For each transaction
  145. transactions[t] = pNdb->startTransaction();
  146. if (transactions[t] == NULL) {
  147.   ERR(pNdb->getNdbError());
  148.   return NDBT_FAILED;
  149. }
  150. for (int k = 0; k < operations; k++) { // For each operation
  151.   NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
  152.   if (pOp == NULL) { 
  153.     ERR(transactions[t]->getNdbError());
  154.     pNdb->closeTransaction(transactions[t]);
  155.     return NDBT_FAILED;
  156.   }
  157.   
  158.   // Read
  159.   // Define primary keys
  160.   check = pOp->readTupleExclusive();
  161.   for (a = 0; a < tab.getNoOfColumns(); a++) {
  162.     if (tab.getColumn(a)->getPrimaryKey() == true) {
  163.       if (equalForAttr(pOp, a, cReadRecords) != 0){
  164. ERR(transactions[t]->getNdbError());
  165. pNdb->closeTransaction(transactions[t]);
  166. return NDBT_FAILED;
  167.       }
  168.     }
  169.   }     
  170.   // Define attributes to read  
  171.   for (a = 0; a < tab.getNoOfColumns(); a++) {
  172.     if ((rows[cReadIndex]->attributeStore(a) = 
  173.  pOp->getValue(tab.getColumn(a)->getName())) == 0) {
  174.       ERR(transactions[t]->getNdbError());
  175.       pNdb->closeTransaction(transactions[t]);
  176.       return NDBT_FAILED;
  177.     }
  178.   }        
  179.   cReadIndex++;
  180.   cReadRecords++;
  181.   
  182. } // For each operation
  183. // Let's prepare...
  184. transactions[t]->executeAsynchPrepare(NoCommit, &asynchCallback, 
  185. this);
  186. cTrans++;
  187. if (cReadRecords >= records) {
  188.   // No more transactions needed
  189.   break;
  190. }      
  191.       } // For each transaction
  192.       // Wait for all outstanding transactions
  193.       pNdb->sendPollNdb(3000, 0, 0);
  194.       // Verify the data!
  195.       for (r = 0; r < trans*operations; r++) {
  196. if (calc.verifyRowValues(rows[r]) != 0) {
  197.   g_info << "|- Verify failed..." << endl;
  198.   // Close all transactions
  199.   for (int t = 0; t < cTrans; t++) {
  200.     pNdb->closeTransaction(transactions[t]);
  201.   }
  202.   return NDBT_FAILED;
  203. }
  204.       }
  205.       // Update
  206.       cTrans = 0;
  207.       cIndex = 0;
  208.       for (t = 0; t < trans; t++) { // For each transaction
  209. for (int k = 0; k < operations; k++) { // For each operation
  210.   NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
  211.   if (pOp == NULL) { 
  212.     ERR(transactions[t]->getNdbError());
  213.     pNdb->closeTransaction(transactions[t]);
  214.     return NDBT_FAILED;
  215.   }
  216.   
  217.   int updates = calc.getUpdatesValue(rows[cIndex]) + 1;
  218.   check = pOp->updateTuple();
  219.   if (check == -1) {
  220.     ERR(transactions[t]->getNdbError());
  221.     pNdb->closeTransaction(transactions[t]);
  222.       return NDBT_FAILED;
  223.   }
  224.   // Set search condition for the record
  225.   for (a = 0; a < tab.getNoOfColumns(); a++) {
  226.     if (tab.getColumn(a)->getPrimaryKey() == true) {
  227.       if (equalForAttr(pOp, a, cRecords) != 0) {
  228. ERR(transactions[t]->getNdbError());
  229. pNdb->closeTransaction(transactions[t]);
  230. return NDBT_FAILED;
  231.       }
  232.     }
  233.   }
  234.   // Update the record
  235.   for (a = 0; a < tab.getNoOfColumns(); a++) {
  236.     if (tab.getColumn(a)->getPrimaryKey() == false) {
  237.       if (setValueForAttr(pOp, a, cRecords, updates) != 0) {
  238. ERR(transactions[t]->getNdbError());
  239. pNdb->closeTransaction(transactions[t]);
  240. return NDBT_FAILED;
  241.       }
  242.     }
  243.   }   
  244.   cIndex++;
  245.   cRecords++;
  246.   
  247. } // For each operation
  248. // Let's prepare...
  249. transactions[t]->executeAsynchPrepare(Commit, &asynchCallback, 
  250. this);
  251. cTrans++;
  252. if (cRecords >= records) {
  253.   // No more transactions needed
  254.   break;
  255. }      
  256.       } // For each transaction
  257.       // Wait for all outstanding transactions
  258.       pNdb->sendPollNdb(3000, 0, 0);
  259.       // Close all transactions
  260.       for (t = 0; t < cTrans; t++) {
  261. pNdb->closeTransaction(transactions[t]);
  262.       }
  263.     } // while (cRecords < records*batch)
  264.   } // For each batch
  265.   deallocTransactions();
  266.   deallocRows();
  267.   
  268.   g_info << "|- " << ((unsigned int)transactionsCompleted * operations)/2 
  269.  << " updated..." << endl;
  270.   return NDBT_OK;
  271. }
  272. void 
  273. HugoAsynchTransactions::allocTransactions(int trans) {
  274.   if (transactions != NULL) {
  275.     deallocTransactions(); 
  276.   }
  277.   numTransactions = trans;
  278.   transactions = new NdbConnection*[numTransactions];  
  279. }
  280. void 
  281. HugoAsynchTransactions::deallocTransactions() {
  282.   if (transactions != NULL){
  283.     delete[] transactions;
  284.   }
  285.   transactions = NULL;
  286. }
  287. int 
  288. HugoAsynchTransactions::executeAsynchOperation(Ndb* pNdb,       
  289.  int records,
  290.  int batch,
  291.  int trans,
  292.  int operations,
  293.  NDB_OPERATION theOperation,
  294.  ExecType theType) {
  295.   int             check = 0;
  296.   //  int             retryAttempt = 0;  // Not used at the moment
  297.   //  int             retryMax = 5;      // Not used at the moment
  298.   int             cTrans = 0;
  299.   int             cRecords = 0;
  300.   int             cIndex = 0;
  301.   int a,t,r;
  302.   transactionsCompleted = 0;
  303.   allocTransactions(trans);
  304.   for (int i = 0; i < batch; i++) { // For each batch
  305.     while (cRecords < records*batch) {
  306.       cTrans = 0;
  307.       cIndex = 0;
  308.       for (t = 0; t < trans; t++) { // For each transaction
  309. transactions[t] = pNdb->startTransaction();
  310. if (transactions[t] == NULL) {
  311.   ERR(pNdb->getNdbError());
  312.   return NDBT_FAILED;
  313. }
  314. for (int k = 0; k < operations; k++) { // For each operation
  315.   NdbOperation* pOp = transactions[t]->getNdbOperation(tab.getName());
  316.   if (pOp == NULL) { 
  317.     ERR(transactions[t]->getNdbError());
  318.     pNdb->closeTransaction(transactions[t]);
  319.     return NDBT_FAILED;
  320.   }
  321.   
  322.   switch (theOperation) {
  323.   case NO_INSERT: 
  324.     // Insert
  325.     check = pOp->insertTuple();
  326.     if (check == -1) { 
  327.       ERR(transactions[t]->getNdbError());
  328.       pNdb->closeTransaction(transactions[t]);
  329.       return NDBT_FAILED;
  330.     }
  331.     
  332.     // Set a calculated value for each attribute in this table  
  333.     for (a = 0; a < tab.getNoOfColumns(); a++) {
  334.       if (setValueForAttr(pOp, a, cRecords, 0 ) != 0) {   
  335. ERR(transactions[t]->getNdbError());
  336. pNdb->closeTransaction(transactions[t]);   
  337. return NDBT_FAILED;
  338.       }
  339.     } // For each attribute
  340.     break;
  341.   case NO_UPDATE:
  342.     // This is a special case and is handled in the calling client...
  343.     break;
  344.   break;
  345.   case NO_READ:
  346.     // Define primary keys
  347.     check = pOp->readTuple();
  348.     for (a = 0; a < tab.getNoOfColumns(); a++) {
  349.       if (tab.getColumn(a)->getPrimaryKey() == true) {
  350. if (equalForAttr(pOp, a, cRecords) != 0){
  351.   ERR(transactions[t]->getNdbError());
  352.   pNdb->closeTransaction(transactions[t]);
  353.   return NDBT_FAILED;
  354. }
  355.       }
  356.     }     
  357.     // Define attributes to read  
  358.     for (a = 0; a < tab.getNoOfColumns(); a++) {
  359.       if ((rows[cIndex]->attributeStore(a) = 
  360.    pOp->getValue(tab.getColumn(a)->getName())) == 0) {
  361. ERR(transactions[t]->getNdbError());
  362. pNdb->closeTransaction(transactions[t]);
  363. return NDBT_FAILED;
  364.       }
  365.     }        
  366.     break;
  367.   case NO_DELETE:
  368.     // Delete
  369.     check = pOp->deleteTuple();
  370.     if (check == -1) { 
  371.       ERR(transactions[t]->getNdbError());
  372.       pNdb->closeTransaction(transactions[t]);
  373.       return NDBT_FAILED;
  374.     }
  375.     // Define primary keys
  376.     for (a = 0; a < tab.getNoOfColumns(); a++) {
  377.       if (tab.getColumn(a)->getPrimaryKey() == true){
  378. if (equalForAttr(pOp, a, cRecords) != 0) {
  379.   ERR(transactions[t]->getNdbError());
  380.   pNdb->closeTransaction(transactions[t]);
  381.   return NDBT_FAILED;
  382. }
  383.       }
  384.     }
  385.     break;
  386.   default:
  387.     // Should not happen...
  388.     pNdb->closeTransaction(transactions[t]);
  389.     return NDBT_FAILED;
  390.   }
  391.   cIndex++;
  392.   cRecords++;
  393. } // For each operation
  394.     
  395. // Let's prepare...
  396. transactions[t]->executeAsynchPrepare(theType, &asynchCallback, 
  397. this);
  398. cTrans++;
  399. if (cRecords >= records) {
  400.   // No more transactions needed
  401.   break;
  402. }      
  403.       } // For each transaction
  404.       // Wait for all outstanding transactions
  405.       pNdb->sendPollNdb(3000, 0, 0);
  406.       // ugly... it's starts to resemble flexXXX ...:(
  407.       switch (theOperation) {
  408.       case NO_READ:
  409. // Verify the data!
  410. for (r = 0; r < trans*operations; r++) {
  411.   if (calc.verifyRowValues(rows[r]) != 0) {
  412.     g_info << "|- Verify failed..." << endl;
  413.     // Close all transactions
  414.     for (int t = 0; t < cTrans; t++) {
  415.       pNdb->closeTransaction(transactions[t]);
  416.     }
  417.     return NDBT_FAILED;
  418.   }
  419. }
  420. break;
  421.       case NO_INSERT:
  422.       case NO_UPDATE:
  423.       case NO_DELETE:
  424. break;
  425.       }
  426.       // Close all transactions
  427.       for (t = 0; t < cTrans; t++) {
  428. pNdb->closeTransaction(transactions[t]);
  429.       }
  430.     } // while (cRecords < records*batch)
  431.   } // For each batch
  432.   deallocTransactions();
  433.   return NDBT_OK;
  434. }