flexAsynch.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:30k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "NdbApi.hpp"
  14. #include <NdbSchemaCon.hpp>
  15. #include <NdbMain.h>
  16. #include <md5_hash.hpp>
  17. #include <NdbThread.h>
  18. #include <NdbSleep.h>
  19. #include <NdbTick.h>
  20. #include <NdbOut.hpp>
  21. #include <NdbTimer.hpp>
  22. #include <NDBT_Error.hpp>
  23. #include <NdbTest.hpp>
  24. #define MAX_PARTS 4 
  25. #define MAX_SEEK 16 
  26. #define MAXSTRLEN 16 
  27. #define MAXATTR 64
  28. #define MAXTABLES 64
  29. #define MAXTHREADS 128
  30. #define MAXPAR 1024
  31. #define MAXATTRSIZE 1000
  32. #define PKSIZE 2
  33. enum StartType { 
  34.   stIdle, 
  35.   stInsert,
  36.   stRead,
  37.   stUpdate,
  38.   stDelete, 
  39.   stStop 
  40. } ;
  41. extern "C" { static void* threadLoop(void*); }
  42. static void setAttrNames(void);
  43. static void setTableNames(void);
  44. static int readArguments(int argc, const char** argv);
  45. static int createTables(Ndb*);
  46. static void defineOperation(NdbConnection* aTransObject, StartType aType, 
  47.                             Uint32 base, Uint32 aIndex);
  48. static void execute(StartType aType);
  49. static bool executeThread(StartType aType, Ndb* aNdbObject, unsigned int);
  50. static void executeCallback(int result, NdbConnection* NdbObject,
  51.                             void* aObject);
  52. static bool error_handler(const NdbError & err);
  53. static Uint32 getKey(Uint32, Uint32) ;
  54. static void input_error();
  55. static int                              retry_opt = 3 ;
  56. static int                              failed = 0 ;
  57.                 
  58. ErrorData * flexAsynchErrorData;                        
  59. struct ThreadNdb
  60. {
  61.   int NoOfOps;
  62.   int ThreadNo;
  63. };
  64. static NdbThread*               threadLife[MAXTHREADS];
  65. static int                              tNodeId;
  66. static int                              ThreadReady[MAXTHREADS];
  67. static StartType                ThreadStart[MAXTHREADS];
  68. static char                             tableName[MAXTABLES][MAXSTRLEN+1];
  69. static char                             attrName[MAXATTR][MAXSTRLEN+1];
  70. // Program Parameters
  71. static bool                             tLocal = false;
  72. static int                              tLocalPart = 0;
  73. static int                              tSendForce = 0;
  74. static int                              tNoOfLoops = 1;
  75. static int                              tAttributeSize = 1;
  76. static unsigned int             tNoOfThreads = 1;
  77. static unsigned int             tNoOfParallelTrans = 32;
  78. static unsigned int             tNoOfAttributes = 25;
  79. static unsigned int             tNoOfTransactions = 500;
  80. static unsigned int             tNoOfOpsPerTrans = 1;
  81. static unsigned int             tLoadFactor = 80;
  82. static bool                     tempTable = false;
  83. static bool                     startTransGuess = true;
  84. //Program Flags
  85. static int                              theTestFlag = 0;
  86. static int                              theSimpleFlag = 0;
  87. static int                              theDirtyFlag = 0;
  88. static int                              theWriteFlag = 0;
  89. static int                              theStdTableNameFlag = 0;
  90. static int                              theTableCreateFlag = 0;
  91. #define START_REAL_TIME
  92. #define STOP_REAL_TIME
  93. #define START_TIMER { NdbTimer timer; timer.doStart();
  94. #define STOP_TIMER timer.doStop();
  95. #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; 
  96. static void 
  97. resetThreads(){
  98.   for (int i = 0; i < tNoOfThreads ; i++) {
  99.     ThreadReady[i] = 0;
  100.     ThreadStart[i] = stIdle;
  101.   }//for
  102. }
  103. static void 
  104. waitForThreads(void)
  105. {
  106.   int cont = 0;
  107.   do {
  108.     cont = 0;
  109.     NdbSleep_MilliSleep(20);
  110.     for (int i = 0; i < tNoOfThreads ; i++) {
  111.       if (ThreadReady[i] == 0) {
  112.         cont = 1;
  113.       }//if
  114.     }//for
  115.   } while (cont == 1);
  116. }
  117. static void 
  118. tellThreads(StartType what)
  119. {
  120.   for (int i = 0; i < tNoOfThreads ; i++) 
  121.     ThreadStart[i] = what;
  122. }
  123. NDB_COMMAND(flexAsynch, "flexAsynch", "flexAsynch", "flexAsynch", 65535)
  124. {
  125.   ndb_init();
  126.   ThreadNdb*            pThreadData;
  127.   int                   tLoops=0, i;
  128.   int                   returnValue = NDBT_OK;
  129.   flexAsynchErrorData = new ErrorData;
  130.   flexAsynchErrorData->resetErrorCounters();
  131.   if (readArguments(argc, argv) != 0){
  132.     input_error();
  133.     return NDBT_ProgramExit(NDBT_WRONGARGS);
  134.   }
  135.   pThreadData = new ThreadNdb[MAXTHREADS];
  136.   ndbout << endl << "FLEXASYNCH - Starting normal mode" << endl;
  137.   ndbout << "Perform benchmark of insert, update and delete transactions";
  138.   ndbout << endl;
  139.   ndbout << "  " << tNoOfThreads << " number of concurrent threads " << endl;
  140.   ndbout << "  " << tNoOfParallelTrans;
  141.   ndbout << " number of parallel operation per thread " << endl;
  142.   ndbout << "  " << tNoOfTransactions << " transaction(s) per round " << endl;
  143.   ndbout << "  " << tNoOfLoops << " iterations " << endl;
  144.   ndbout << "  " << "Load Factor is " << tLoadFactor << "%" << endl;
  145.   ndbout << "  " << tNoOfAttributes << " attributes per table " << endl;
  146.   ndbout << "  " << tAttributeSize;
  147.   ndbout << " is the number of 32 bit words per attribute " << endl;
  148.   if (tempTable == true) {
  149.     ndbout << "  Tables are without logging " << endl;
  150.   } else {
  151.     ndbout << "  Tables are with logging " << endl;
  152.   }//if
  153.   if (startTransGuess == true) {
  154.     ndbout << "  Transactions are executed with hint provided" << endl;
  155.   } else {
  156.     ndbout << "  Transactions are executed with round robin scheme" << endl;
  157.   }//if
  158.   if (tSendForce == 0) {
  159.     ndbout << "  No force send is used, adaptive algorithm used" << endl;
  160.   } else if (tSendForce == 1) {
  161.     ndbout << "  Force send used" << endl;
  162.   } else {
  163.     ndbout << "  No force send is used, adaptive algorithm disabled" << endl;
  164.   }//if
  165.   ndbout << endl;
  166.   NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
  167.   /* print Setting */
  168.   flexAsynchErrorData->printSettings(ndbout);
  169.   setAttrNames();
  170.   setTableNames();
  171.   Ndb * pNdb = new Ndb("TEST_DB");      
  172.   pNdb->init();
  173.   tNodeId = pNdb->getNodeId();
  174.   ndbout << "  NdbAPI node with id = " << pNdb->getNodeId() << endl;
  175.   ndbout << endl;
  176.   
  177.   ndbout << "Waiting for ndb to become ready..." <<endl;
  178.   if (pNdb->waitUntilReady(10000) != 0){
  179.     ndbout << "NDB is not ready" << endl;
  180.     ndbout << "Benchmark failed!" << endl;
  181.     returnValue = NDBT_FAILED;
  182.   }
  183.   if(returnValue == NDBT_OK){
  184.     if (createTables(pNdb) != 0){
  185.       returnValue = NDBT_FAILED;
  186.     }
  187.   }
  188.   if(returnValue == NDBT_OK){
  189.     /****************************************************************
  190.      *  Create NDB objects.                                   *
  191.      ****************************************************************/
  192.     resetThreads();
  193.     for (int i = 0; i < tNoOfThreads ; i++) {
  194.       pThreadData[i].ThreadNo = i
  195. ;
  196.       threadLife[i] = NdbThread_Create(threadLoop,
  197.                                        (void**)&pThreadData[i],
  198.                                        32768,
  199.                                        "flexAsynchThread",
  200.                                        NDB_THREAD_PRIO_LOW);
  201.     }//for
  202.     ndbout << endl <<  "All NDB objects and table created" << endl << endl;
  203.     int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
  204.     /****************************************************************
  205.      * Execute program.                                             *
  206.      ****************************************************************/
  207.     for(;;) {
  208.       int loopCount = tLoops + 1 ;
  209.       ndbout << endl << "Loop # " << loopCount  << endl << endl ;
  210.       /****************************************************************
  211.        * Perform inserts.                                             *
  212.        ****************************************************************/
  213.           
  214.       failed = 0 ;
  215.       START_TIMER;
  216.       execute(stInsert);
  217.       STOP_TIMER;
  218.       PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
  219.       if (0 < failed) {
  220.         i = retry_opt ;
  221.         int ci = 1 ;
  222.         while (0 < failed && 0 < i){
  223.           ndbout << failed << " of the transactions returned errors!" 
  224.                  << endl << endl;
  225.           ndbout << "Attempting to redo the failed transactions now..." 
  226.                  << endl ;
  227.           ndbout << "Redo attempt " << ci <<" out of " << retry_opt 
  228.                  << endl << endl;
  229.           failed = 0 ;
  230.           START_TIMER;
  231.           execute(stInsert);
  232.           STOP_TIMER;
  233.           PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
  234.           i-- ;
  235.           ci++;
  236.         }
  237.         if(0 == failed ){
  238.           ndbout << endl <<"Redo attempt succeeded" << endl << endl;
  239.         }else{
  240.           ndbout << endl <<"Redo attempt failed, moving on now..." << endl 
  241.                  << endl;
  242.         }//if
  243.       }//if
  244.           
  245.       /****************************************************************
  246.        * Perform read.                                                *
  247.        ****************************************************************/
  248.       
  249.       failed = 0 ;
  250.       START_TIMER;
  251.       execute(stRead);
  252.       STOP_TIMER;
  253.       PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
  254.       if (0 < failed) {
  255.         i = retry_opt ;
  256.         int cr = 1;
  257.         while (0 < failed && 0 < i){
  258.           ndbout << failed << " of the transactions returned errors!"<<endl ;
  259.           ndbout << endl;
  260.           ndbout <<"Attempting to redo the failed transactions now..." << endl;
  261.           ndbout << endl;
  262.           ndbout <<"Redo attempt " << cr <<" out of ";
  263.           ndbout << retry_opt << endl << endl;
  264.           failed = 0 ;
  265.           START_TIMER;
  266.           execute(stRead);
  267.           STOP_TIMER;
  268.           PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
  269.           i-- ;
  270.           cr++ ;
  271.         }//while
  272.         if(0 == failed ) {
  273.           ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
  274.         }else{
  275.           ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
  276.         }//if
  277.       }//if
  278.           
  279.           
  280.       /****************************************************************
  281.        * Perform update.                                              *
  282.        ****************************************************************/
  283.       
  284.       failed = 0 ;
  285.           
  286.       START_TIMER;
  287.       execute(stUpdate);
  288.       STOP_TIMER;
  289.       PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
  290.       if (0 < failed) {
  291.         i = retry_opt ;
  292.         int cu = 1 ;
  293.         while (0 < failed && 0 < i){
  294.           ndbout << failed << " of the transactions returned errors!"<<endl ;
  295.           ndbout << endl;
  296.           ndbout <<"Attempting to redo the failed transactions now..." << endl;
  297.           ndbout << endl <<"Redo attempt " << cu <<" out of ";
  298.           ndbout << retry_opt << endl << endl;
  299.           failed = 0 ;
  300.           START_TIMER;
  301.           execute(stUpdate);
  302.           STOP_TIMER;
  303.           PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
  304.           i-- ;
  305.           cu++ ;
  306.         }//while
  307.         if(0 == failed ){
  308.           ndbout << endl <<"Redo attempt succeeded" << endl << endl;
  309.         } else {
  310.           ndbout << endl;
  311.           ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
  312.         }//if
  313.       }//if
  314.           
  315.       /****************************************************************
  316.        * Perform read.                                             *
  317.        ****************************************************************/
  318.       
  319.       failed = 0 ;
  320.           
  321.       START_TIMER;
  322.       execute(stRead);
  323.       STOP_TIMER;
  324.       PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
  325.       if (0 < failed) {
  326.         i = retry_opt ;
  327.         int cr2 = 1 ;
  328.         while (0 < failed && 0 < i){
  329.           ndbout << failed << " of the transactions returned errors!"<<endl ;
  330.           ndbout << endl;
  331.           ndbout <<"Attempting to redo the failed transactions now..." << endl;
  332.           ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
  333.           ndbout << retry_opt << endl << endl;
  334.           failed = 0 ;
  335.           START_TIMER;
  336.           execute(stRead);
  337.           STOP_TIMER;
  338.           PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
  339.           i-- ;
  340.           cr2++ ;
  341.         }//while
  342.         if(0 == failed ){
  343.           ndbout << endl <<"Redo attempt succeeded" << endl << endl;
  344.         }else{
  345.           ndbout << endl;
  346.           ndbout << "Redo attempt failed, moving on now..." << endl << endl;
  347.         }//if
  348.       }//if
  349.           
  350.       /****************************************************************
  351.        * Perform delete.                                              *
  352.        ****************************************************************/
  353.       
  354.       failed = 0 ;
  355.           
  356.       START_TIMER;
  357.       execute(stDelete);
  358.       STOP_TIMER;
  359.       PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
  360.       if (0 < failed) {
  361.         i = retry_opt ;
  362.         int cd = 1 ;
  363.         while (0 < failed && 0 < i){
  364.           ndbout << failed << " of the transactions returned errors!"<< endl ;
  365.           ndbout << endl;
  366.           ndbout <<"Attempting to redo the failed transactions now:" << endl ;
  367.           ndbout << endl <<"Redo attempt " << cd <<" out of ";
  368.           ndbout << retry_opt << endl << endl;
  369.           failed = 0 ;
  370.           START_TIMER;
  371.           execute(stDelete);
  372.           STOP_TIMER;
  373.           PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
  374.           i-- ;
  375.           cd++ ;
  376.         }//while
  377.         if(0 == failed ){
  378.           ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
  379.         }else{
  380.           ndbout << endl;
  381.           ndbout << "Redo attempt failed, moving on now..." << endl << endl;
  382.         }//if
  383.       }//if
  384.           
  385.       tLoops++;
  386.       ndbout << "--------------------------------------------------" << endl;
  387.     
  388.       if(tNoOfLoops != 0){
  389.         if(tNoOfLoops <= tLoops)
  390.           break ;
  391.       }
  392.     }//for
  393.     
  394.     execute(stStop);
  395.     void * tmp;
  396.     for(i = 0; i<tNoOfThreads; i++){
  397.       NdbThread_WaitFor(threadLife[i], &tmp);
  398.       NdbThread_Destroy(&threadLife[i]);
  399.     }
  400.   } 
  401.   delete [] pThreadData;
  402.   delete pNdb;
  403.   //printing errorCounters
  404.   flexAsynchErrorData->printErrorCounters(ndbout);
  405.   return NDBT_ProgramExit(returnValue);
  406. }//main()
  407. static void execute(StartType aType)
  408. {
  409.   resetThreads();
  410.   tellThreads(aType);
  411.   waitForThreads();
  412. }//execute()
  413. static void*
  414. threadLoop(void* ThreadData)
  415. {
  416.   Ndb* localNdb;
  417.   StartType tType;
  418.   ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
  419.   int threadNo = tabThread->ThreadNo;
  420.   localNdb = new Ndb("TEST_DB");
  421.   localNdb->init(1024);
  422.   localNdb->waitUntilReady(10000);
  423.   unsigned int threadBase = (threadNo << 16) + tNodeId ;
  424.   
  425.   for (;;){
  426.     while (ThreadStart[threadNo] == stIdle) {
  427.       NdbSleep_MilliSleep(10);
  428.     }//while
  429.     // Check if signal to exit is received
  430.     if (ThreadStart[threadNo] == stStop) {
  431.       break;
  432.     }//if
  433.     tType = ThreadStart[threadNo];
  434.     ThreadStart[threadNo] = stIdle;
  435.     if(!executeThread(tType, localNdb, threadBase)){
  436.       break;
  437.     }
  438.     ThreadReady[threadNo] = 1;
  439.   }//for
  440.   delete localNdb;
  441.   ThreadReady[threadNo] = 1;
  442.   return NULL;
  443. }//threadLoop()
  444. static 
  445. bool
  446. executeThread(StartType aType, Ndb* aNdbObject, unsigned int threadBase) {
  447.   int i, j, k;
  448.   NdbConnection* tConArray[1024];
  449.   unsigned int tBase;
  450.   unsigned int tBase2;
  451.   for (i = 0; i < tNoOfTransactions; i++) {
  452.     if (tLocal == false) {
  453.       tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
  454.     } else {
  455.       tBase = i * tNoOfParallelTrans * MAX_SEEK;
  456.     }//if
  457.     START_REAL_TIME;
  458.     for (j = 0; j < tNoOfParallelTrans; j++) {
  459.       if (tLocal == false) {
  460.         tBase2 = tBase + (j * tNoOfOpsPerTrans);
  461.       } else {
  462.         tBase2 = tBase + (j * MAX_SEEK);
  463.         tBase2 = getKey(threadBase, tBase2);
  464.       }//if
  465.       if (startTransGuess == true) {
  466.         Uint64 Tkey64;
  467.         Uint32* Tkey32 = (Uint32*)&Tkey64;
  468.         Tkey32[0] = threadBase;
  469.         Tkey32[1] = tBase2;
  470.         tConArray[j] = aNdbObject->startTransaction((Uint32)0, //Priority
  471.                                          (const char*)&Tkey64, //Main PKey
  472.                                          (Uint32)4);           //Key Length
  473.       } else {
  474.         tConArray[j] = aNdbObject->startTransaction();
  475.       }//if
  476.       if (tConArray[j] == NULL && 
  477.           !error_handler(aNdbObject->getNdbError()) ){
  478.         ndbout << endl << "Unable to recover! Quiting now" << endl ;
  479.         return false;
  480.       }//if
  481.       
  482.       for (k = 0; k < tNoOfOpsPerTrans; k++) {
  483.         //-------------------------------------------------------
  484.         // Define the operation, but do not execute it yet.
  485.         //-------------------------------------------------------
  486.         defineOperation(tConArray[j], aType, threadBase, (tBase2 + k));
  487.       }//for
  488.       
  489.       tConArray[j]->executeAsynchPrepare(Commit, &executeCallback, NULL);
  490.     }//for
  491.     STOP_REAL_TIME;
  492.     //-------------------------------------------------------
  493.     // Now we have defined a set of operations, it is now time
  494.     // to execute all of them.
  495.     //-------------------------------------------------------
  496.     int Tcomp = aNdbObject->sendPollNdb(3000, 0, 0);
  497.     while (Tcomp < tNoOfParallelTrans) {
  498.       int TlocalComp = aNdbObject->pollNdb(3000, 0);
  499.       Tcomp += TlocalComp;
  500.     }//while
  501.     for (j = 0 ; j < tNoOfParallelTrans ; j++) {
  502.       aNdbObject->closeTransaction(tConArray[j]);
  503.     }//for
  504.   }//for
  505.   return true;
  506. }//executeThread()
  507. static 
  508. Uint32
  509. getKey(Uint32 aBase, Uint32 anIndex) {
  510.   Uint32 Tfound = anIndex;
  511.   Uint64 Tkey64;
  512.   Uint32* Tkey32 = (Uint32*)&Tkey64;
  513.   Tkey32[0] = aBase;
  514.   Uint32 hash;
  515.   for (Uint32 i = anIndex; i < (anIndex + MAX_SEEK); i++) {
  516.     Tkey32[1] = (Uint32)i;
  517.     hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
  518.     hash = (hash >> 6) & (MAX_PARTS - 1);
  519.     if (hash == tLocalPart) {
  520.       Tfound = i;
  521.       break;
  522.     }//if
  523.   }//for
  524.   return Tfound;
  525. }//getKey()
  526. static void
  527. executeCallback(int result, NdbConnection* NdbObject, void* aObject)
  528. {
  529.   if (result == -1) {
  530.               // Add complete error handling here
  531.               int retCode = flexAsynchErrorData->handleErrorCommon(NdbObject->getNdbError());
  532.               if (retCode == 1) {
  533. if (NdbObject->getNdbError().code != 626 && NdbObject->getNdbError().code != 630){
  534.                 ndbout_c("execute: %s", NdbObject->getNdbError().message);
  535.                 ndbout_c("Error code = %d", NdbObject->getNdbError().code);}
  536.               } else if (retCode == 2) {
  537.                 ndbout << "4115 should not happen in flexAsynch" << endl;
  538.               } else if (retCode == 3) {
  539. /* What can we do here? */
  540.                 ndbout_c("execute: %s", NdbObject->getNdbError().message);
  541.                  }//if(retCode == 3)
  542.       //    ndbout << "Error occured in poll:" << endl;
  543.       //    ndbout << NdbObject->getNdbError() << endl;
  544.     failed++ ;
  545.     return;
  546.   }//if
  547.   return;
  548. }//executeCallback()
  549. static void
  550. defineOperation(NdbConnection* localNdbConnection, StartType aType,
  551.                 Uint32 threadBase, Uint32 aIndex)
  552. {
  553.   NdbOperation*  localNdbOperation;
  554.   unsigned int   loopCountAttributes = tNoOfAttributes;
  555.   unsigned int   countAttributes;
  556.   Uint32         attrValue[MAXATTRSIZE];
  557.   //-------------------------------------------------------
  558.   // Set-up the attribute values for this operation.
  559.   //-------------------------------------------------------
  560.   attrValue[0] = threadBase;
  561.   attrValue[1] = aIndex;
  562.   for (int k = 2; k < loopCountAttributes; k++) {
  563.     attrValue[k] = aIndex;
  564.   }//for
  565.   localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]);        
  566.   if (localNdbOperation == NULL) {
  567.     error_handler(localNdbConnection->getNdbError());
  568.   }//if
  569.   switch (aType) {
  570.   case stInsert: {   // Insert case
  571.     if (theWriteFlag == 1 && theDirtyFlag == 1) {
  572.       localNdbOperation->dirtyWrite();
  573.     } else if (theWriteFlag == 1) {
  574.       localNdbOperation->writeTuple();
  575.     } else {
  576.       localNdbOperation->insertTuple();
  577.     }//if
  578.     break;
  579.   }//case
  580.   case stRead: {     // Read Case
  581.     if (theSimpleFlag == 1) {
  582.       localNdbOperation->simpleRead();
  583.     } else if (theDirtyFlag == 1) {
  584.       localNdbOperation->dirtyRead();
  585.     } else {
  586.       localNdbOperation->readTuple();
  587.     }//if
  588.     break;
  589.   }//case
  590.   case stUpdate: {    // Update Case
  591.     if (theWriteFlag == 1 && theDirtyFlag == 1) {
  592.       localNdbOperation->dirtyWrite();
  593.     } else if (theWriteFlag == 1) {
  594.       localNdbOperation->writeTuple();
  595.     } else if (theDirtyFlag == 1) {
  596.       localNdbOperation->dirtyUpdate();
  597.     } else {
  598.       localNdbOperation->updateTuple();
  599.     }//if
  600.     break;
  601.   }//case
  602.   case stDelete: {   // Delete Case
  603.     localNdbOperation->deleteTuple();
  604.     break;
  605.   }//case
  606.   default: {
  607.     error_handler(localNdbOperation->getNdbError()); 
  608.   }//default
  609.   }//switch
  610.   localNdbOperation->equal((Uint32)0,(char*)&attrValue[0]);
  611.   switch (aType) {
  612.   case stInsert:      // Insert case
  613.   case stUpdate:      // Update Case
  614.     {
  615.       for (countAttributes = 1;
  616.            countAttributes < loopCountAttributes; countAttributes++) {
  617.         localNdbOperation->setValue(countAttributes, 
  618.                                     (char*)&attrValue[0]);
  619.       }//for
  620.       break;
  621.     }//case
  622.   case stRead: {      // Read Case
  623.     for (countAttributes = 1;
  624.          countAttributes < loopCountAttributes; countAttributes++) {
  625.       localNdbOperation->getValue(countAttributes, 
  626.                                   (char*)&attrValue[0]);
  627.     }//for
  628.     break;
  629.   }//case
  630.   case stDelete: {    // Delete Case
  631.     break;
  632.   }//case
  633.   default: {
  634.     //goto error_handler; < epaulsa
  635.     error_handler(localNdbOperation->getNdbError());
  636.   }//default
  637.   }//switch
  638.   return;
  639. }//defineOperation()
  640. static void setAttrNames()
  641. {
  642.   int i;
  643.   for (i = 0; i < MAXATTR ; i++){
  644.     BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
  645.   }
  646. }
  647. static void setTableNames()
  648. {
  649.   // Note! Uses only uppercase letters in table name's
  650.   // so that we can look at the tables wits SQL
  651.   int i;
  652.   for (i = 0; i < MAXTABLES ; i++){
  653.     if (theStdTableNameFlag==0){
  654.       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i, 
  655.                (int)(NdbTick_CurrentMillisecond()/1000));
  656.     } else {
  657.       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
  658.     }
  659.   }
  660. }
  661. static
  662. int 
  663. createTables(Ndb* pMyNdb){
  664.   NdbSchemaCon          *MySchemaTransaction;
  665.   NdbSchemaOp           *MySchemaOp;
  666.   int                   check;
  667.   if (theTableCreateFlag == 0) {
  668.     for(int i=0; i < 1 ;i++) {
  669.       ndbout << "Creating " << tableName[i] << "..." << endl;
  670.       MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
  671.       
  672.       if(MySchemaTransaction == NULL && 
  673.          (!error_handler(MySchemaTransaction->getNdbError())))
  674.         return -1;
  675.       
  676.       MySchemaOp = MySchemaTransaction->getNdbSchemaOp();       
  677.       if(MySchemaOp == NULL &&
  678.          (!error_handler(MySchemaTransaction->getNdbError())))
  679.         return -1;
  680.       check = MySchemaOp->createTable( tableName[i]
  681.                                        ,8                       // Table Size
  682.                                        ,TupleKey                // Key Type
  683.                                        ,40                      // Nr of Pages
  684.                                        ,All
  685.                                        ,6
  686.                                        ,(tLoadFactor - 5)
  687.                                        ,(tLoadFactor)
  688.                                        ,1
  689.                                        ,!tempTable
  690.                                        );
  691.       
  692.       if (check == -1 &&
  693.           (!error_handler(MySchemaTransaction->getNdbError())))
  694.         return -1;
  695.       
  696.       check = MySchemaOp->createAttribute( (char*)attrName[0],
  697.                                            TupleKey,
  698.                                            32,
  699.                                            PKSIZE,
  700.                                            UnSigned,
  701.                                            MMBased,
  702.                                            NotNullAttribute );
  703.       
  704.       if (check == -1 &&
  705.           (!error_handler(MySchemaTransaction->getNdbError())))
  706.         return -1;
  707.       for (int j = 1; j < tNoOfAttributes ; j++){
  708.         check = MySchemaOp->createAttribute( (char*)attrName[j],
  709.                                              NoKey,
  710.                                              32,
  711.                                              tAttributeSize,
  712.                                              UnSigned,
  713.                                              MMBased,
  714.                                              NotNullAttribute );
  715.         if (check == -1 &&
  716.             (!error_handler(MySchemaTransaction->getNdbError())))
  717.           return -1;
  718.       }
  719.       
  720.       if (MySchemaTransaction->execute() == -1 &&
  721.           (!error_handler(MySchemaTransaction->getNdbError())))
  722.         return -1;
  723.       
  724.       NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
  725.     }
  726.   }
  727.   
  728.   return 0;
  729. }
  730. static
  731. bool error_handler(const NdbError & err){
  732.   ndbout << err << endl ;
  733.   switch(err.classification){
  734.   case NdbError::TemporaryResourceError:
  735.   case NdbError::OverloadError:
  736.   case NdbError::SchemaError:
  737.     ndbout << endl << "Attempting to recover and continue now..." << endl ;
  738.     return true;
  739.   }
  740.   return false ; // return false to abort
  741. }
  742. static
  743. bool error_handler(const char* error_string, int error_int) {
  744.   ndbout << error_string << endl ;
  745.   if ((4008 == error_int) ||
  746.       (721 == error_int) ||
  747.       (266 == error_int)){
  748.     ndbout << endl << "Attempting to recover and continue now..." << endl ;
  749.     return true ; // return true to retry
  750.   }
  751.   return false ; // return false to abort
  752. }
  753. static
  754. int 
  755. readArguments(int argc, const char** argv){
  756.   
  757.   int i = 1;
  758.   while (argc > 1){
  759.     if (strcmp(argv[i], "-t") == 0){
  760.       tNoOfThreads = atoi(argv[i+1]);
  761.       if ((tNoOfThreads < 1) || (tNoOfThreads > MAXTHREADS)){
  762. ndbout_c("Invalid no of threads");
  763.         return -1;
  764.       }
  765.     } else if (strcmp(argv[i], "-p") == 0){
  766.       tNoOfParallelTrans = atoi(argv[i+1]);
  767.       if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
  768. ndbout_c("Invalid no of parallell transactions");
  769.         return -1;
  770.       }
  771.     } else if (strcmp(argv[i], "-load_factor") == 0){
  772.       tLoadFactor = atoi(argv[i+1]);
  773.       if ((tLoadFactor < 40) || (tLoadFactor > 99)){
  774. ndbout_c("Invalid load factor");
  775.         return -1;
  776.       }
  777.     } else if (strcmp(argv[i], "-c") == 0) {
  778.       tNoOfOpsPerTrans = atoi(argv[i+1]);
  779.       if (tNoOfOpsPerTrans < 1){
  780. ndbout_c("Invalid no of operations per transaction");
  781.         return -1;
  782.       }
  783.     } else if (strcmp(argv[i], "-o") == 0) {
  784.       tNoOfTransactions = atoi(argv[i+1]);
  785.       if (tNoOfTransactions < 1){
  786. ndbout_c("Invalid no of transactions");
  787.         return -1;
  788.       }
  789.     } else if (strcmp(argv[i], "-a") == 0){
  790.       tNoOfAttributes = atoi(argv[i+1]);
  791.       if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)){
  792. ndbout_c("Invalid no of attributes");
  793.         return -1;
  794.       }
  795.     } else if (strcmp(argv[i], "-n") == 0){
  796.       theStdTableNameFlag = 1;
  797.       argc++;
  798.       i--;
  799.     } else if (strcmp(argv[i], "-l") == 0){
  800.       tNoOfLoops = atoi(argv[i+1]);
  801.       if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)){
  802. ndbout_c("Invalid no of loops");
  803.         return -1;
  804.       }
  805.     } else if (strcmp(argv[i], "-s") == 0){
  806.       tAttributeSize = atoi(argv[i+1]);
  807.       if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)){
  808. ndbout_c("Invalid attributes size");
  809.         return -1;
  810.       }
  811.     } else if (strcmp(argv[i], "-local") == 0){
  812.       tLocalPart = atoi(argv[i+1]);
  813.       tLocal = true;
  814.       startTransGuess = true;
  815.       if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
  816. ndbout_c("Invalid local part");
  817.         return -1;
  818.       }
  819.     } else if (strcmp(argv[i], "-simple") == 0){
  820.       theSimpleFlag = 1;
  821.       argc++;
  822.       i--;
  823.     } else if (strcmp(argv[i], "-adaptive") == 0){
  824.       tSendForce = 0;
  825.       argc++;
  826.       i--;
  827.     } else if (strcmp(argv[i], "-force") == 0){
  828.       tSendForce = 1;
  829.       argc++;
  830.       i--;
  831.     } else if (strcmp(argv[i], "-non_adaptive") == 0){
  832.       tSendForce = 2;
  833.       argc++;
  834.       i--;
  835.     } else if (strcmp(argv[i], "-write") == 0){
  836.       theWriteFlag = 1;
  837.       argc++;
  838.       i--;
  839.     } else if (strcmp(argv[i], "-dirty") == 0){
  840.       theDirtyFlag = 1;
  841.       argc++;
  842.       i--;
  843.     } else if (strcmp(argv[i], "-test") == 0){
  844.       theTestFlag = 1;
  845.       argc++;
  846.       i--;
  847.     } else if (strcmp(argv[i], "-no_table_create") == 0){
  848.       theTableCreateFlag = 1;
  849.       argc++;
  850.       i--;
  851.     } else if (strcmp(argv[i], "-temp") == 0){
  852.       tempTable = true;
  853.       argc++;
  854.       i--;
  855.     } else if (strcmp(argv[i], "-no_hint") == 0){
  856.       startTransGuess = false;
  857.       argc++;
  858.       i--;
  859.     } else {
  860.       return -1;
  861.     }
  862.     
  863.     argc -= 2;
  864.     i = i + 2;
  865.   }//while
  866.   if (tLocal == true) {
  867.     if (tNoOfOpsPerTrans != 1) {
  868.       ndbout_c("Not valid to have more than one op per trans with local");
  869.     }//if
  870.     if (startTransGuess == false) {
  871.       ndbout_c("Not valid to use no_hint with local");
  872.     }//if
  873.   }//if
  874.   return 0;
  875. }
  876. static
  877. void
  878. input_error(){
  879.   
  880.   ndbout_c("FLEXASYNCH");
  881.   ndbout_c("   Perform benchmark of insert, update and delete transactions");
  882.   ndbout_c("");
  883.   ndbout_c("Arguments:");
  884.   ndbout_c("   -t Number of threads to start, default 1");
  885.   ndbout_c("   -p Number of parallel transactions per thread, default 32");
  886.   ndbout_c("   -o Number of transactions per loop, default 500");
  887.   ndbout_c("   -l Number of loops to run, default 1, 0=infinite");
  888.   ndbout_c("   -load_factor Number Load factor in index in percent (40 -> 99)");
  889.   ndbout_c("   -a Number of attributes, default 25");
  890.   ndbout_c("   -c Number of operations per transaction");
  891.   ndbout_c("   -s Size of each attribute, default 1 ");
  892.   ndbout_c("      (PK is always of size 1, independent of this value)");
  893.   ndbout_c("   -simple Use simple read to read from database");
  894.   ndbout_c("   -dirty Use dirty read to read from database");
  895.   ndbout_c("   -write Use writeTuple in insert and update");
  896.   ndbout_c("   -n Use standard table names");
  897.   ndbout_c("   -no_table_create Don't create tables in db");
  898.   ndbout_c("   -temp Create table(s) without logging");
  899.   ndbout_c("   -no_hint Don't give hint on where to execute transaction coordinator");
  900.   ndbout_c("   -adaptive Use adaptive send algorithm (default)");
  901.   ndbout_c("   -force Force send when communicating");
  902.   ndbout_c("   -non_adaptive Send at a 10 millisecond interval");
  903.   ndbout_c("   -local Number of part, only use keys in one part out of 16");
  904. }
  905.