flexTT.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:29k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <NdbApi.hpp>
  15. #include <NdbSchemaCon.hpp>
  16. #include <NdbMain.h>
  17. #include <md5_hash.hpp>
  18. #include <NdbThread.h>
  19. #include <NdbSleep.h>
  20. #include <NdbTick.h>
  21. #include <NdbOut.hpp>
  22. #include <NdbTimer.hpp>
  23. #include <NdbTest.hpp>
  24. #include <NDBT_Error.hpp>
  25. #define MAX_PARTS 4 
  26. #define MAX_SEEK 16 
  27. #define MAXSTRLEN 16 
  28. #define MAXATTR 64
  29. #define MAXTABLES 64
  30. #define MAXTHREADS 128
  31. #define MAXPAR 1024
  32. #define MAXATTRSIZE 1000
  33. #define PKSIZE 1
  34. #ifdef NDB_WIN32
  35. inline long lrand48(void) { return rand(); };
  36. #endif
  37. enum StartType { 
  38.   stIdle, 
  39.   stInsert,
  40.   stRead,
  41.   stUpdate,
  42.   stDelete, 
  43.   stStop 
  44. } ;
  45. struct ThreadNdb
  46. {
  47.   int threadNo;
  48.   Ndb* threadNdb;
  49.   Uint32 threadBase;
  50.   Uint32 threadLoopCounter;
  51.   Uint32 threadNextStart;
  52.   Uint32 threadStop;
  53.   Uint32 threadLoopStop;
  54.   Uint32 threadIncrement;
  55.   Uint32 threadNoCompleted;
  56.   bool   threadCompleted;
  57.   StartType threadStartType;
  58. };
  59. struct TransNdb
  60. {
  61.   char transRecord[128];
  62.   Ndb* transNdb;
  63.   StartType  transStartType;
  64.   Uint32     vpn_number;
  65.   Uint32     vpn_identity;
  66.   Uint32     transErrorCount;
  67.   NdbOperation* transOperation;
  68.   ThreadNdb* transThread;
  69. };
  70. extern "C" { static void* threadLoop(void*); }
  71. static void setAttrNames(void);
  72. static void setTableNames(void);
  73. static int readArguments(int argc, const char** argv);
  74. static int createTables(Ndb*);
  75. static bool defineOperation(NdbConnection* aTransObject, TransNdb*,
  76.                             Uint32 vpn_nb, Uint32 vpn_id);
  77. static bool executeTransaction(TransNdb* transNdbRef);
  78. static StartType random_choice();
  79. static void execute(StartType aType);
  80. static bool executeThread(ThreadNdb*, TransNdb*);
  81. static void executeCallback(int result, NdbConnection* NdbObject,
  82.                             void* aObject);
  83. static bool error_handler(const NdbError & err) ;
  84. static Uint32 getKey(Uint32, Uint32) ;
  85. static void input_error();
  86.                                       
  87. ErrorData * flexTTErrorData;
  88. static NdbThread*                       threadLife[MAXTHREADS];
  89. static int                              tNodeId;
  90. static int                              ThreadReady[MAXTHREADS];
  91. static StartType                        ThreadStart[MAXTHREADS];
  92. static char                             tableName[1][MAXSTRLEN+1];
  93. static char                             attrName[5][MAXSTRLEN+1];
  94. // Program Parameters
  95. static bool                             tInsert = false;
  96. static bool                             tDelete = false;
  97. static bool                             tReadUpdate = true;
  98. static int                              tUpdateFreq = 20;
  99. static bool                             tLocal = false;
  100. static int                              tLocalPart = 0;
  101. static int                              tMinEvents = 0;
  102. static int                              tSendForce = 0;
  103. static int                              tNoOfLoops = 1;
  104. static Uint32                           tNoOfThreads = 1;
  105. static Uint32                           tNoOfParallelTrans = 32;
  106. static Uint32                           tNoOfTransactions = 500;
  107. static Uint32                           tLoadFactor = 80;
  108. static bool                             tempTable = false;
  109. static bool                             startTransGuess = true;
  110. //Program Flags
  111. static int                              theSimpleFlag = 0;
  112. static int                              theDirtyFlag = 0;
  113. static int                              theWriteFlag = 0;
  114. static int                              theTableCreateFlag = 1;
  115. #define START_REAL_TIME
  116. #define STOP_REAL_TIME
  117. #define START_TIMER { NdbTimer timer; timer.doStart();
  118. #define STOP_TIMER timer.doStop();
  119. #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; 
  120. static void 
  121. resetThreads(){
  122.   for (int i = 0; i < tNoOfThreads ; i++) {
  123.     ThreadReady[i] = 0;
  124.     ThreadStart[i] = stIdle;
  125.   }//for
  126. }
  127. static void 
  128. waitForThreads(void)
  129. {
  130.   int cont = 0;
  131.   do {
  132.     cont = 0;
  133.     NdbSleep_MilliSleep(20);
  134.     for (int i = 0; i < tNoOfThreads ; i++) {
  135.       if (ThreadReady[i] == 0) {
  136.         cont = 1;
  137.       }//if
  138.     }//for
  139.   } while (cont == 1);
  140. }
  141. static void 
  142. tellThreads(StartType what)
  143. {
  144.   for (int i = 0; i < tNoOfThreads ; i++) 
  145.     ThreadStart[i] = what;
  146. }
  147. NDB_COMMAND(flexTT, "flexTT", "flexTT", "flexTT", 65535)
  148. {
  149.   ndb_init();
  150.   ThreadNdb*            pThreadData;
  151.   int                   returnValue = NDBT_OK;
  152.   int i;
  153.   flexTTErrorData = new ErrorData;
  154.   flexTTErrorData->resetErrorCounters();
  155.   if (readArguments(argc, argv) != 0){
  156.     input_error();
  157.     return NDBT_ProgramExit(NDBT_WRONGARGS);
  158.   }
  159.   pThreadData = new ThreadNdb[MAXTHREADS];
  160.   ndbout << endl << "FLEXTT - Starting normal mode" << endl;
  161.   ndbout << "Perform TimesTen benchmark" << endl;
  162.   ndbout << "  " << tNoOfThreads << " number of concurrent threads " << endl;
  163.   ndbout << "  " << tNoOfParallelTrans;
  164.   ndbout << " number of parallel transaction per thread " << endl;
  165.   ndbout << "  " << tNoOfTransactions << " transaction(s) per round " << endl;
  166.   ndbout << "  " << tNoOfLoops << " iterations " << endl;
  167.   ndbout << "  " << "Update Frequency is " << tUpdateFreq << "%" << endl;
  168.   ndbout << "  " << "Load Factor is " << tLoadFactor << "%" << endl;
  169.   if (tLocal == true) {
  170.     ndbout << "  " << "We only use Local Part = ";
  171.     ndbout << tLocalPart << endl;
  172.   }//if
  173.   if (tempTable == true) {
  174.     ndbout << "  Tables are without logging " << endl;
  175.   } else {
  176.     ndbout << "  Tables are with logging " << endl;
  177.   }//if
  178.   if (startTransGuess == true) {
  179.     ndbout << "  Transactions are executed with hint provided" << endl;
  180.   } else {
  181.     ndbout << "  Transactions are executed with round robin scheme" << endl;
  182.   }//if
  183.   if (tSendForce == 0) {
  184.     ndbout << "  No force send is used, adaptive algorithm used" << endl;
  185.   } else if (tSendForce == 1) {
  186.     ndbout << "  Force send used" << endl;
  187.   } else {
  188.     ndbout << "  No force send is used, adaptive algorithm disabled" << endl;
  189.   }//if
  190.   ndbout << endl;
  191.   /* print Setting */
  192.   flexTTErrorData->printSettings(ndbout);
  193.   NdbThread_SetConcurrencyLevel(2 + tNoOfThreads);
  194.   setAttrNames();
  195.   setTableNames();
  196.   Ndb * pNdb = new Ndb("TEST_DB");      
  197.   pNdb->init();
  198.   tNodeId = pNdb->getNodeId();
  199.   ndbout << "  NdbAPI node with id = " << pNdb->getNodeId() << endl;
  200.   ndbout << endl;
  201.   
  202.   ndbout << "Waiting for ndb to become ready..." <<endl;
  203.   if (pNdb->waitUntilReady(2000) != 0){
  204.     ndbout << "NDB is not ready" << endl;
  205.     ndbout << "Benchmark failed!" << endl;
  206.     returnValue = NDBT_FAILED;
  207.   }
  208.   if(returnValue == NDBT_OK){
  209.     if (createTables(pNdb) != 0){
  210.       returnValue = NDBT_FAILED;
  211.     }
  212.   }
  213.   if(returnValue == NDBT_OK){
  214.     /****************************************************************
  215.      *  Create NDB objects.                                   *
  216.      ****************************************************************/
  217.     resetThreads();
  218.     for (i = 0; i < tNoOfThreads ; i++) {
  219.       pThreadData[i].threadNo = i;
  220.       threadLife[i] = NdbThread_Create(threadLoop,
  221.                                        (void**)&pThreadData[i],
  222.                                        32768,
  223.                                        "flexAsynchThread",
  224.                                        NDB_THREAD_PRIO_LOW);
  225.     }//for
  226.     ndbout << endl <<  "All NDB objects and table created" << endl << endl;
  227.     int noOfTransacts = tNoOfParallelTrans * tNoOfTransactions *
  228.                         tNoOfThreads * tNoOfLoops;
  229.     /****************************************************************
  230.      * Execute program.                                             *
  231.      ****************************************************************/
  232.     /****************************************************************
  233.      * Perform inserts.                                             *
  234.      ****************************************************************/
  235.           
  236.     if (tInsert == true) {
  237.       tInsert = false;
  238.       tReadUpdate = false;
  239.       START_TIMER;
  240.       execute(stInsert);
  241.       STOP_TIMER;
  242.       PRINT_TIMER("insert", noOfTransacts, 1);
  243.     }//if
  244.     /****************************************************************
  245.      * Perform read + updates.                                      *
  246.      ****************************************************************/
  247.       
  248.     if (tReadUpdate == true) {
  249.       START_TIMER;
  250.       execute(stRead);
  251.       STOP_TIMER;
  252.       PRINT_TIMER("update + read", noOfTransacts, 1);
  253.     }//if  
  254.     /****************************************************************
  255.      * Perform delete.                                              *
  256.      ****************************************************************/
  257.                 
  258.     if (tDelete == true) {
  259.       tDelete = false;
  260.       START_TIMER;
  261.       execute(stDelete);
  262.       STOP_TIMER;
  263.       PRINT_TIMER("delete", noOfTransacts, 1);
  264.     }//if
  265.     ndbout << "--------------------------------------------------" << endl;
  266.         
  267.     execute(stStop);
  268.     void * tmp;
  269.     for(i = 0; i<tNoOfThreads; i++){
  270.       NdbThread_WaitFor(threadLife[i], &tmp);
  271.       NdbThread_Destroy(&threadLife[i]);
  272.     }
  273.   } 
  274.   delete [] pThreadData;
  275.   delete pNdb;
  276.   //printing errorCounters
  277.   flexTTErrorData->printErrorCounters(ndbout);
  278.   return NDBT_ProgramExit(returnValue);
  279. }//main()
  280. static void execute(StartType aType)
  281. {
  282.   resetThreads();
  283.   tellThreads(aType);
  284.   waitForThreads();
  285. }//execute()
  286. static void*
  287. threadLoop(void* ThreadData)
  288. {
  289.   Ndb* localNdb;
  290.   ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
  291.   int loc_threadNo = tabThread->threadNo;
  292.   void * mem = malloc(sizeof(TransNdb)*tNoOfParallelTrans);
  293.   TransNdb* pTransData = (TransNdb*)mem;
  294.   localNdb = new Ndb("TEST_DB");
  295.   localNdb->init(1024);
  296.   localNdb->waitUntilReady();
  297.   if (tLocal == false) {
  298.     tabThread->threadIncrement = 1;
  299.   } else {
  300.     tabThread->threadIncrement = MAX_SEEK;
  301.   }//if
  302.   tabThread->threadBase = (loc_threadNo << 16) + tNodeId;
  303.   tabThread->threadNdb = localNdb;
  304.   tabThread->threadStop = tNoOfParallelTrans * tNoOfTransactions;
  305.   tabThread->threadStop *= tabThread->threadIncrement;
  306.   tabThread->threadLoopStop = tNoOfLoops;
  307.   Uint32 i, j;
  308.   for (i = 0; i < tNoOfParallelTrans; i++) {
  309.     pTransData[i].transNdb = localNdb;    
  310.     pTransData[i].transThread = tabThread;    
  311.     pTransData[i].transOperation = NULL;    
  312.     pTransData[i].transStartType = stIdle;    
  313.     pTransData[i].vpn_number = tabThread->threadBase;    
  314.     pTransData[i].vpn_identity = 0;
  315.     pTransData[i].transErrorCount = 0;
  316.     for (j = 0; j < 128; j++) {
  317.       pTransData[i].transRecord[j] = 0x30;
  318.     }//for
  319.   }//for
  320.   for (;;){
  321.     while (ThreadStart[loc_threadNo] == stIdle) {
  322.       NdbSleep_MilliSleep(10);
  323.     }//while
  324.     // Check if signal to exit is received
  325.     if (ThreadStart[loc_threadNo] == stStop) {
  326.       break;
  327.     }//if
  328.     tabThread->threadStartType = ThreadStart[loc_threadNo];  
  329.     tabThread->threadLoopCounter = 0;
  330.     tabThread->threadCompleted = false;  
  331.     tabThread->threadNoCompleted = 0;
  332.     tabThread->threadNextStart = 0;
  333.     ThreadStart[loc_threadNo] = stIdle;
  334.     if(!executeThread(tabThread, pTransData)){
  335.       break;
  336.     }
  337.     ThreadReady[loc_threadNo] = 1;
  338.   }//for
  339.   free(mem);
  340.   delete localNdb;
  341.   ThreadReady[loc_threadNo] = 1;
  342.   return NULL; // Thread exits
  343. }//threadLoop()
  344. static 
  345. bool
  346. executeThread(ThreadNdb* tabThread, TransNdb* atransDataArrayPtr) {
  347.   Uint32 i;
  348.   for (i = 0; i < tNoOfParallelTrans; i++) {
  349.     TransNdb* transNdbPtr = &atransDataArrayPtr[i];
  350.     transNdbPtr->vpn_identity = i * tabThread->threadIncrement;
  351.     transNdbPtr->transStartType = tabThread->threadStartType;
  352.     if (executeTransaction(transNdbPtr) == false) {
  353.       return false;
  354.     }//if
  355.   }//for
  356.   tabThread->threadNextStart = tNoOfParallelTrans * tabThread->threadIncrement;
  357.   do {
  358.     tabThread->threadNdb->sendPollNdb(3000, tMinEvents, tSendForce);
  359.   } while (tabThread->threadCompleted == false);
  360.   return true;
  361. }//executeThread()
  362. static
  363. bool executeTransaction(TransNdb* transNdbRef)
  364. {
  365.   NdbConnection* MyTrans;
  366.   ThreadNdb* tabThread = transNdbRef->transThread;
  367.   Ndb* aNdbObject = transNdbRef->transNdb;
  368.   Uint32 threadBase = tabThread->threadBase;
  369.   Uint32 startKey = transNdbRef->vpn_identity;
  370.   if (tLocal == true) {
  371.     startKey = getKey(startKey, threadBase);
  372.   }//if
  373.   if (startTransGuess == true) {
  374.     Uint32 tKey[2];
  375.     tKey[0] = startKey;
  376.     tKey[1] = threadBase;
  377.     MyTrans = aNdbObject->startTransaction((Uint32)0, //Priority
  378.                                          (const char*)&tKey[0],   //Main PKey
  379.                                          (Uint32)8);           //Key Length
  380.   } else {
  381.    MyTrans = aNdbObject->startTransaction();
  382.   }//if
  383.   if (MyTrans == NULL) {
  384.     error_handler(aNdbObject->getNdbError());
  385.     ndbout << endl << "Unable to recover! Quiting now" << endl ;
  386.     return false;
  387.   }//if
  388.   //-------------------------------------------------------
  389.   // Define the operation, but do not execute it yet.
  390.   //-------------------------------------------------------
  391.   if (!defineOperation(MyTrans, transNdbRef, startKey, threadBase))
  392.     return false;
  393.   return true;
  394. }//executeTransaction()
  395. static 
  396. Uint32
  397. getKey(Uint32 aBase, Uint32 aThreadBase) {
  398.   Uint32 Tfound = aBase;
  399.   Uint32 hash;
  400.   Uint64 Tkey64;
  401.   Uint32* tKey32 = (Uint32*)&Tkey64;
  402.   tKey32[0] = aThreadBase;
  403.   for (int i = aBase; i < (aBase + MAX_SEEK); i++) {
  404.     tKey32[1] = (Uint32)i;
  405.     hash = md5_hash((Uint64*)&Tkey64, (Uint32)2);
  406.     hash = (hash >> 6) & (MAX_PARTS - 1);
  407.     if (hash == tLocalPart) {
  408.       Tfound = i;
  409.       break;
  410.     }//if
  411.   }//for
  412.   return Tfound;
  413. }//getKey()
  414. static void
  415. executeCallback(int result, NdbConnection* NdbObject, void* aObject)
  416. {
  417.   TransNdb* transNdbRef = (TransNdb*)aObject;
  418.   ThreadNdb* tabThread = transNdbRef->transThread;
  419.   Ndb* tNdb = transNdbRef->transNdb;
  420.   Uint32 vpn_id = transNdbRef->vpn_identity;
  421.   Uint32 vpn_nb = tabThread->threadBase;
  422.   if (result == -1) {
  423. // Add complete error handling here
  424.     int retCode = flexTTErrorData->handleErrorCommon(NdbObject->getNdbError());
  425.     if (retCode == 1) {
  426.       if (NdbObject->getNdbError().code != 626 &&
  427.           NdbObject->getNdbError().code != 630) {
  428.         ndbout_c("execute: %s", NdbObject->getNdbError().message);
  429.         ndbout_c("Error code = %d", NdbObject->getNdbError().code);
  430.       }
  431.     } else if (retCode == 2) {
  432.       ndbout << "4115 should not happen in flexTT" << endl;
  433.     } else if (retCode == 3) {
  434.       /* What can we do here? */
  435.       ndbout_c("execute: %s", NdbObject->getNdbError().message);
  436.     }//if(retCode == 3)
  437.     transNdbRef->transErrorCount++;
  438.     const NdbError & err = NdbObject->getNdbError();
  439.     switch (err.classification) {
  440.     case NdbError::NoDataFound:
  441.     case NdbError::ConstraintViolation:
  442.       ndbout << "Error with vpn_id = " << vpn_id << " and vpn_nb = ";
  443.       ndbout << vpn_nb << endl;
  444.       ndbout << err << endl;
  445.       goto checkCompleted;
  446.     case NdbError::OverloadError:
  447.       NdbSleep_MilliSleep(10);
  448.     case NdbError::NodeRecoveryError:
  449.     case NdbError::UnknownResultError:
  450.     case NdbError::TimeoutExpired:
  451.       break;
  452.     default:
  453.       goto checkCompleted;
  454.     }//if
  455.     if ((transNdbRef->transErrorCount > 10) ||
  456.         (tabThread->threadNoCompleted > 0)) {
  457.       goto checkCompleted;
  458.     }//if
  459.   } else {
  460.     if (tabThread->threadNoCompleted == 0) {
  461.       transNdbRef->transErrorCount = 0;
  462.       transNdbRef->vpn_identity = tabThread->threadNextStart;
  463.       if (tabThread->threadNextStart == tabThread->threadStop) {
  464.         tabThread->threadLoopCounter++;
  465.         transNdbRef->vpn_identity = 0;
  466.         tabThread->threadNextStart = 0;
  467.         if (tabThread->threadLoopCounter == tNoOfLoops) {
  468.           goto checkCompleted;
  469.         }//if
  470.       }//if
  471.       tabThread->threadNextStart += tabThread->threadIncrement;
  472.     } else {
  473.       goto checkCompleted;
  474.     }//if
  475.   }//if
  476.   tNdb->closeTransaction(NdbObject);
  477.   executeTransaction(transNdbRef);
  478.   return;
  479. checkCompleted:
  480.   tNdb->closeTransaction(NdbObject);
  481.   tabThread->threadNoCompleted++;
  482.   if (tabThread->threadNoCompleted == tNoOfParallelTrans) {
  483.     tabThread->threadCompleted = true;
  484.   }//if
  485.   return;      
  486. }//executeCallback()
  487. static
  488. StartType
  489. random_choice()
  490. {
  491. //----------------------------------------------------
  492. // Generate a random key between 0 and tNoOfRecords - 1
  493. //----------------------------------------------------
  494.    UintR random_number = lrand48() % 100;
  495.    if (random_number < tUpdateFreq)
  496.     return stUpdate;
  497.   else
  498.     return stRead;
  499. }//random_choice()
  500. static bool
  501. defineOperation(NdbConnection* localNdbConnection, TransNdb* transNdbRef,
  502.                 unsigned int vpn_id, unsigned int vpn_nb)
  503. {
  504.   NdbOperation*  localNdbOperation;
  505.   StartType      TType = transNdbRef->transStartType;
  506.   //-------------------------------------------------------
  507.   // Set-up the attribute values for this operation.
  508.   //-------------------------------------------------------
  509.   localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]);        
  510.   if (localNdbOperation == NULL) {
  511.     error_handler(localNdbConnection->getNdbError());
  512.     return false;
  513.   }//if
  514.   switch (TType) {
  515.   case stInsert:   // Insert case
  516.     if (theWriteFlag == 1 && theDirtyFlag == 1) {
  517.       localNdbOperation->dirtyWrite();
  518.     } else if (theWriteFlag == 1) {
  519.       localNdbOperation->writeTuple();
  520.     } else {
  521.       localNdbOperation->insertTuple();
  522.     }//if
  523.     break;
  524.   case stRead:     // Read Case
  525.     TType = random_choice();
  526.     if (TType == stRead) {
  527.       if (theSimpleFlag == 1) {
  528.         localNdbOperation->simpleRead();
  529.       } else if (theDirtyFlag == 1) {
  530.         localNdbOperation->dirtyRead();
  531.       } else {
  532.         localNdbOperation->readTuple();
  533.       }//if
  534.     } else {
  535.       if (theWriteFlag == 1 && theDirtyFlag == 1) {
  536.         localNdbOperation->dirtyWrite();
  537.       } else if (theWriteFlag == 1) {
  538.         localNdbOperation->writeTuple();
  539.       } else if (theDirtyFlag == 1) {
  540.         localNdbOperation->dirtyUpdate();
  541.       } else {
  542.         localNdbOperation->updateTuple();
  543.       }//if
  544.     }//if
  545.     break;
  546.   case stDelete:  // Delete Case
  547.     localNdbOperation->deleteTuple();
  548.     break;
  549.   default:
  550.     error_handler(localNdbOperation->getNdbError());
  551.   }//switch
  552.   localNdbOperation->equal((Uint32)0,vpn_id);
  553.   localNdbOperation->equal((Uint32)1,vpn_nb);
  554.   char* attrValue = &transNdbRef->transRecord[0];
  555.   switch (TType) {
  556.   case stInsert:      // Insert case
  557.     localNdbOperation->setValue((Uint32)2, attrValue);
  558.     localNdbOperation->setValue((Uint32)3, attrValue);
  559.     localNdbOperation->setValue((Uint32)4, attrValue);
  560.     break;
  561.   case stUpdate:      // Update Case
  562.     localNdbOperation->setValue((Uint32)3, attrValue);
  563.     break;
  564.   case stRead:    // Read Case
  565.     localNdbOperation->getValue((Uint32)2, attrValue);
  566.     localNdbOperation->getValue((Uint32)3, attrValue);
  567.     localNdbOperation->getValue((Uint32)4, attrValue);
  568.     break;
  569.   case stDelete:  // Delete Case
  570.     break;
  571.   default:
  572.     error_handler(localNdbOperation->getNdbError());
  573.   }//switch
  574.   localNdbConnection->executeAsynchPrepare(Commit, &executeCallback, 
  575.                                            (void*)transNdbRef);
  576.   return true;
  577. }//defineOperation()
  578. static void setAttrNames()
  579. {
  580.   BaseString::snprintf(attrName[0], MAXSTRLEN, "VPN_ID");
  581.   BaseString::snprintf(attrName[1], MAXSTRLEN, "VPN_NB");
  582.   BaseString::snprintf(attrName[2], MAXSTRLEN, "DIRECTORY_NB");
  583.   BaseString::snprintf(attrName[3], MAXSTRLEN, "LAST_CALL_PARTY");
  584.   BaseString::snprintf(attrName[4], MAXSTRLEN, "DESCR");
  585. }
  586. static void setTableNames()
  587. {
  588.   BaseString::snprintf(tableName[0], MAXSTRLEN, "VPN_USERS");
  589. }
  590. static
  591. int 
  592. createTables(Ndb* pMyNdb){
  593.   NdbSchemaCon          *MySchemaTransaction;
  594.   NdbSchemaOp           *MySchemaOp;
  595.   int                   check;
  596.   if (theTableCreateFlag == 0) {
  597.     ndbout << "Creating Table: vpn_users " << "..." << endl;
  598.     MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
  599.       
  600.     if(MySchemaTransaction == NULL && 
  601.        (!error_handler(MySchemaTransaction->getNdbError())))
  602.       return -1;
  603.       
  604.     MySchemaOp = MySchemaTransaction->getNdbSchemaOp();       
  605.     if(MySchemaOp == NULL &&
  606.        (!error_handler(MySchemaTransaction->getNdbError())))
  607.       return -1;
  608.       
  609.     check = MySchemaOp->createTable( tableName[0]
  610.                                        ,8                       // Table Size
  611.                                        ,TupleKey                // Key Type
  612.                                        ,40                      // Nr of Pages
  613.                                        ,All
  614.                                        ,6
  615.                                        ,(tLoadFactor - 5)
  616.                                        ,tLoadFactor
  617.                                        ,1
  618.                                        ,!tempTable
  619.                                        );
  620.       
  621.     if (check == -1 &&
  622.         (!error_handler(MySchemaTransaction->getNdbError())))
  623.       return -1;
  624.       
  625.     check = MySchemaOp->createAttribute( (char*)attrName[0],
  626.                                          TupleKey,
  627.                                          32,
  628.                                          1,
  629.                                          UnSigned,
  630.                                          MMBased,
  631.                                          NotNullAttribute );
  632.       
  633.     if (check == -1 &&
  634.         (!error_handler(MySchemaTransaction->getNdbError())))
  635.       return -1;
  636.     check = MySchemaOp->createAttribute( (char*)attrName[1],
  637.                                          TupleKey,
  638.                                          32,
  639.                                          1,
  640.                                          UnSigned,
  641.                                          MMBased,
  642.                                          NotNullAttribute );
  643.       
  644.     if (check == -1 &&
  645.         (!error_handler(MySchemaTransaction->getNdbError())))
  646.       return -1;
  647.     check = MySchemaOp->createAttribute( (char*)attrName[2],
  648.                                              NoKey,
  649.                                              8,
  650.                                              10,
  651.                                              UnSigned,
  652.                                              MMBased,
  653.                                              NotNullAttribute );
  654.     if (check == -1 &&
  655.         (!error_handler(MySchemaTransaction->getNdbError())))
  656.       return -1;
  657.       
  658.     check = MySchemaOp->createAttribute( (char*)attrName[3],
  659.                                              NoKey,
  660.                                              8,
  661.                                              10,
  662.                                              UnSigned,
  663.                                              MMBased,
  664.                                              NotNullAttribute );
  665.     if (check == -1 &&
  666.         (!error_handler(MySchemaTransaction->getNdbError())))
  667.       return -1;
  668.       
  669.     check = MySchemaOp->createAttribute( (char*)attrName[4],
  670.                                              NoKey,
  671.                                              8,
  672.                                              100,
  673.                                              UnSigned,
  674.                                              MMBased,
  675.                                              NotNullAttribute );
  676.     if (check == -1 &&
  677.         (!error_handler(MySchemaTransaction->getNdbError())))
  678.       return -1;
  679.       
  680.     if (MySchemaTransaction->execute() == -1 &&
  681.         (!error_handler(MySchemaTransaction->getNdbError())))
  682.       return -1;
  683.       
  684.     NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
  685.   }//if
  686.   
  687.   return 0;
  688. }
  689. bool error_handler(const NdbError& err){
  690.   ndbout << err << endl ;
  691.   switch(err.classification){
  692.   case NdbError::NodeRecoveryError:
  693.   case NdbError::SchemaError:
  694.   case NdbError::TimeoutExpired:
  695.     ndbout << endl << "Attempting to recover and continue now..." << endl ;
  696.     return true ; // return true to retry
  697.   }
  698.   return false;
  699. }
  700. #if 0
  701. bool error_handler(const char* error_string, int error_int) {
  702.   ndbout << error_string << endl ;
  703.   if ((4008 == error_int) ||
  704.       (677 == error_int) ||
  705.       (891 == error_int) ||
  706.       (1221 == error_int) ||
  707.       (721 == error_int) ||
  708.       (266 == error_int)) {
  709.     ndbout << endl << "Attempting to recover and continue now..." << endl ;
  710.     return true ; // return true to retry
  711.   }
  712.   return false ; // return false to abort
  713. }
  714. #endif
  715. static
  716. int 
  717. readArguments(int argc, const char** argv){
  718.   
  719.   int i = 1;
  720.   while (argc > 1){
  721.     if (strcmp(argv[i], "-t") == 0){
  722.       tNoOfThreads = atoi(argv[i+1]);
  723.       if ((tNoOfThreads < 1) || (tNoOfThreads > MAXTHREADS)){
  724. ndbout_c("Invalid no of threads");
  725.         return -1;
  726.       }
  727.     } else if (strcmp(argv[i], "-p") == 0){
  728.       tNoOfParallelTrans = atoi(argv[i+1]);
  729.       if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
  730. ndbout_c("Invalid no of parallell transactions");
  731.         return -1;
  732.       }
  733.     } else if (strcmp(argv[i], "-o") == 0) {
  734.       tNoOfTransactions = atoi(argv[i+1]);
  735.       if (tNoOfTransactions < 1){
  736. ndbout_c("Invalid no of transactions");
  737.         return -1;
  738.       }
  739.     } else if (strcmp(argv[i], "-l") == 0){
  740.       tNoOfLoops = atoi(argv[i+1]);
  741.       if (tNoOfLoops < 1) {
  742. ndbout_c("Invalid no of loops");
  743.         return -1;
  744.       }
  745.     } else if (strcmp(argv[i], "-e") == 0){
  746.       tMinEvents = atoi(argv[i+1]);
  747.       if ((tMinEvents < 1) || (tMinEvents > tNoOfParallelTrans)) {
  748. ndbout_c("Invalid no of loops");
  749.         return -1;
  750.       }
  751.     } else if (strcmp(argv[i], "-local") == 0){
  752.       tLocalPart = atoi(argv[i+1]);
  753.       tLocal = true;
  754.       startTransGuess = true;
  755.       if ((tLocalPart < 0) || (tLocalPart > MAX_PARTS)){
  756. ndbout_c("Invalid local part");
  757.         return -1;
  758.       }
  759.     } else if (strcmp(argv[i], "-ufreq") == 0){
  760.       tUpdateFreq = atoi(argv[i+1]);
  761.       if ((tUpdateFreq < 0) || (tUpdateFreq > 100)){
  762. ndbout_c("Invalid Update Frequency");
  763.         return -1;
  764.       }
  765.     } else if (strcmp(argv[i], "-load_factor") == 0){
  766.       tLoadFactor = atoi(argv[i+1]);
  767.       if ((tLoadFactor < 40) || (tLoadFactor >= 100)){
  768. ndbout_c("Invalid LoadFactor");
  769.         return -1;
  770.       }
  771.     } else if (strcmp(argv[i], "-d") == 0){
  772.       tDelete = true;
  773.       argc++;
  774.       i--;
  775.     } else if (strcmp(argv[i], "-i") == 0){
  776.       tInsert = true;
  777.       argc++;
  778.       i--;
  779.     } else if (strcmp(argv[i], "-simple") == 0){
  780.       theSimpleFlag = 1;
  781.       argc++;
  782.       i--;
  783.     } else if (strcmp(argv[i], "-adaptive") == 0){
  784.       tSendForce = 0;
  785.       argc++;
  786.       i--;
  787.     } else if (strcmp(argv[i], "-force") == 0){
  788.       tSendForce = 1;
  789.       argc++;
  790.       i--;
  791.     } else if (strcmp(argv[i], "-non_adaptive") == 0){
  792.       tSendForce = 2;
  793.       argc++;
  794.       i--;
  795.     } else if (strcmp(argv[i], "-write") == 0){
  796.       theWriteFlag = 1;
  797.       argc++;
  798.       i--;
  799.     } else if (strcmp(argv[i], "-dirty") == 0){
  800.       theDirtyFlag = 1;
  801.       argc++;
  802.       i--;
  803.     } else if (strcmp(argv[i], "-table_create") == 0){
  804.       theTableCreateFlag = 0;
  805.       tInsert = true;
  806.       argc++;
  807.       i--;
  808.     } else if (strcmp(argv[i], "-temp") == 0){
  809.       tempTable = true;
  810.       argc++;
  811.       i--;
  812.     } else if (strcmp(argv[i], "-no_hint") == 0){
  813.       startTransGuess = false;
  814.       argc++;
  815.       i--;
  816.     } else {
  817.       return -1;
  818.     }
  819.     
  820.     argc -= 2;
  821.     i = i + 2;
  822.   }//while
  823.   if (tLocal == true) {
  824.     if (startTransGuess == false) {
  825.       ndbout_c("Not valid to use no_hint with local");
  826.     }//if
  827.   }//if
  828.   return 0;
  829. }
  830. static
  831. void
  832. input_error(){
  833.   
  834.   ndbout_c("FLEXTT");
  835.   ndbout_c("   Perform benchmark of insert, update and delete transactions");
  836.   ndbout_c("");
  837.   ndbout_c("Arguments:");
  838.   ndbout_c("   -t Number of threads to start, default 1");
  839.   ndbout_c("   -p Number of parallel transactions per thread, default 32");
  840.   ndbout_c("   -o Number of transactions per loop, default 500");
  841.   ndbout_c("   -ufreq Number Update Frequency in percent (0 -> 100), rest is read");
  842.   ndbout_c("   -load_factor Number Fill level in index in percent (40 -> 99)");
  843.   ndbout_c("   -l Number of loops to run, default 1, 0=infinite");
  844.   ndbout_c("   -i Start by inserting all records");
  845.   ndbout_c("   -d End by deleting all records (only one loop)");
  846.   ndbout_c("   -simple Use simple read to read from database");
  847.   ndbout_c("   -dirty Use dirty read to read from database");
  848.   ndbout_c("   -write Use writeTuple in insert and update");
  849.   ndbout_c("   -n Use standard table names");
  850.   ndbout_c("   -table_create Create tables in db");
  851.   ndbout_c("   -temp Create table(s) without logging");
  852.   ndbout_c("   -no_hint Don't give hint on where to execute transaction coordinator");
  853.   ndbout_c("   -adaptive Use adaptive send algorithm (default)");
  854.   ndbout_c("   -force Force send when communicating");
  855.   ndbout_c("   -non_adaptive Send at a 10 millisecond interval");
  856.   ndbout_c("   -local Number of part, only use keys in one part out of 16");
  857. }