flexBench.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:36k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. /* ***************************************************
  14. FLEXBENCH
  15. Perform benchmark of insert, update and delete transactions
  16. Arguments:
  17.     -t Number of threads to start, default 1
  18.     -o Number of operations per loop, default 500
  19.     -l Number of loops to run, default 1, 0=infinite
  20.     -a Number of attributes, default 25
  21.     -c Number of tables, default 1
  22.     -s Size of each attribute, default 1 (Primary Key is always of size 1,
  23.     independent of this value)
  24.     -lkn Number of long primary keys, default 1
  25.     -lks Size of each long primary key, default 1
  26.     -simple Use simple read to read from database
  27.     -dirty Use dirty read to read from database
  28.     -write Use writeTuple in insert and update
  29.     -stdtables Use standard table names
  30.     -no_table_create Don't create tables in db
  31.     -sleep Sleep a number of seconds before running the test, this 
  32.     can be used so that another flexBench have time to create tables
  33.     -temp Use tables without logging
  34.     -verify Verify inserts, updates and deletes
  35. #ifdef CEBIT_STAT
  36.     -statserv host:port  statistics server to report to
  37.     -statfreq ops        report every ops operations (default 100)
  38. #endif
  39.     Returns:
  40.     0 - Test passed
  41.     1 - Test failed
  42.     2 - Invalid arguments
  43. * *************************************************** */
  44. #include "NdbApi.hpp"
  45. #include <NdbMain.h>
  46. #include <NdbOut.hpp>
  47. #include <NdbSleep.h>
  48. #include <NdbTick.h>
  49. #include <NdbTimer.hpp>
  50. #include <NdbThread.h>
  51. #include <NdbTest.hpp>
  52. #define MAXSTRLEN 16 
  53. #define MAXATTR 64
  54. #define MAXTABLES 128
  55. #define MAXATTRSIZE 1000
  56. #define MAXNOLONGKEY 16 // Max number of long keys.
  57. #define MAXLONGKEYTOTALSIZE 1023 // words = 4092 bytes
  58. extern "C" { static void* flexBenchThread(void*); }
  59. static int readArguments(int argc, const char** argv);
  60. static int createTables(Ndb*);
  61. static void sleepBeforeStartingTest(int seconds);
  62. static void input_error();
  63. enum StartType { 
  64.   stIdle,
  65.   stInsert,
  66.   stVerify,
  67.   stRead,
  68.   stUpdate,
  69.   stDelete,
  70.   stTryDelete,
  71.   stVerifyDelete,
  72.   stStop 
  73. };
  74. struct ThreadData
  75. {
  76.   int threadNo;
  77.   NdbThread* threadLife;
  78.   int threadReady;  
  79.   StartType threadStart;
  80.   int threadResult;
  81. };
  82. static int                  tNodeId = 0 ;
  83. static char                 tableName[MAXTABLES][MAXSTRLEN+1];
  84. static char                 attrName[MAXATTR][MAXSTRLEN+1];
  85. static char**               longKeyAttrName;
  86. // Program Parameters
  87. static int                  tNoOfLoops = 1;
  88. static int                  tAttributeSize = 1;
  89. static unsigned int         tNoOfThreads = 1;
  90. static unsigned int         tNoOfTables = 1;
  91. static unsigned int         tNoOfAttributes = 25;
  92. static unsigned int         tNoOfOperations = 500;
  93. static unsigned int         tSleepTime = 0;
  94. static unsigned int         tNoOfLongPK = 1;
  95. static unsigned int         tSizeOfLongPK = 1;
  96. //Program Flags
  97. static int                  theSimpleFlag = 0;
  98. static int                  theDirtyFlag = 0;
  99. static int                  theWriteFlag = 0;
  100. static int                  theStdTableNameFlag = 0;
  101. static int                  theTableCreateFlag = 0;
  102. static bool                 theTempTable = false;
  103. static bool                 VerifyFlag = true;
  104. static bool                 useLongKeys = false;
  105. static ErrorData theErrorData; // Part of flexBench-program
  106. #define START_TIMER { NdbTimer timer; timer.doStart();
  107. #define STOP_TIMER timer.doStop();
  108. #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
  109. #include <NdbTCP.h>
  110. #ifdef CEBIT_STAT
  111. #include <NdbMutex.h>
  112. static bool statEnable = false;
  113. static char statHost[100];
  114. static int statFreq = 100;
  115. static int statPort = 0;
  116. static int statSock = -1;
  117. static enum { statError = -1, statClosed, statOpen } statState;
  118. static NdbMutex statMutex = NDB_MUTEX_INITIALIZER;
  119. #endif
  120. //-------------------------------------------------------------------
  121. // Statistical Reporting routines
  122. //-------------------------------------------------------------------
  123. #ifdef CEBIT_STAT
  124. // Experimental client-side statistic for CeBIT
  125. static void
  126. statReport(enum StartType st, int ops)
  127. {
  128.   if (!statEnable)
  129.     return;
  130.   if (NdbMutex_Lock(&statMutex) < 0) {
  131.     if (statState != statError) {
  132.       ndbout_c("stat: lock mutex failed: %s", strerror(errno));
  133.       statState = statError;
  134.     }
  135.     return;
  136.   }
  137.   static int nodeid;
  138.   // open connection
  139.   if (statState != statOpen) {
  140.     char *p = getenv("NDB_NODEID"); // ndbnet sets NDB_NODEID
  141.     nodeid = p == 0 ? 0 : atoi(p);
  142.     if ((statSock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
  143.       if (statState != statError) {
  144. ndbout_c("stat: create socket failed: %s", strerror(errno));
  145. statState = statError;
  146.       }
  147.       (void)NdbMutex_Unlock(&statMutex);
  148.       return;
  149.     }
  150.     struct sockaddr_in saddr;
  151.     memset(&saddr, 0, sizeof(saddr));
  152.     saddr.sin_family = AF_INET;
  153.     saddr.sin_port = htons(statPort);
  154.     if (Ndb_getInAddr(&saddr.sin_addr, statHost) < 0) {
  155.       if (statState != statError) {
  156. ndbout_c("stat: host %s not found", statHost);
  157. statState = statError;
  158.       }
  159.       (void)close(statSock);
  160.       (void)NdbMutex_Unlock(&statMutex);
  161.       return;
  162.     }
  163.     if (connect(statSock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
  164.       if (statState != statError) {
  165. ndbout_c("stat: connect failed: %s", strerror(errno));
  166. statState = statError;
  167.       }
  168.       (void)close(statSock);
  169.       (void)NdbMutex_Unlock(&statMutex);
  170.       return;
  171.     }
  172.     statState = statOpen;
  173.     ndbout_c("stat: connection to %s:%d opened", statHost, (int)statPort);
  174.   }
  175.   const char *text;
  176.   switch (st) {
  177.   case stInsert:
  178.     text = "insert";
  179.     break;
  180.   case stVerify:
  181.     text = "verify";
  182.     break;
  183.   case stRead:
  184.     text = "read";
  185.     break;
  186.   case stUpdate:
  187.     text = "update";
  188.     break;
  189.   case stDelete:
  190.     text = "delete";
  191.     break;
  192.   case stVerifyDelete:
  193.     text = "verifydelete";
  194.     break;
  195.   default:
  196.     text = "unknown";
  197.     break;
  198.   }
  199.   char buf[100];
  200.   sprintf(buf, "%d %s %dn", nodeid, text, ops);
  201.   int len = strlen(buf);
  202.   // assume SIGPIPE already ignored
  203.   if (write(statSock, buf, len) != len) {
  204.     if (statState != statError) {
  205.       ndbout_c("stat: write failed: %s", strerror(errno));
  206.       statState = statError;
  207.     }
  208.     (void)close(statSock);
  209.     (void)NdbMutex_Unlock(&statMutex);
  210.     return;
  211.   }
  212.   (void)NdbMutex_Unlock(&statMutex);
  213. }
  214. #endif // CEBIT_STAT
  215. static void 
  216. resetThreads(ThreadData* pt){
  217.   for (unsigned int i = 0; i < tNoOfThreads; i++){
  218.     pt[i].threadReady = 0;
  219.     pt[i].threadResult = 0;
  220.     pt[i].threadStart = stIdle;
  221.   }
  222. }
  223. static int 
  224. checkThreadResults(ThreadData* pt){
  225.   for (unsigned int i = 0; i < tNoOfThreads; i++){
  226.     if(pt[i].threadResult != 0){
  227.       ndbout_c("Thread%d reported fatal error %d", i, pt[i].threadResult);
  228.       return -1;
  229.     }
  230.   }
  231.   return 0;
  232. }
  233. static
  234. void 
  235. waitForThreads(ThreadData* pt)
  236. {
  237.   int cont = 1;
  238.   while (cont){
  239.     NdbSleep_MilliSleep(100);
  240.     cont = 0;
  241.     for (unsigned int i = 0; i < tNoOfThreads; i++){
  242.       if (pt[i].threadReady == 0) 
  243.     cont = 1;
  244.     }
  245.   }
  246. }
  247. static void 
  248. tellThreads(ThreadData* pt, StartType what)
  249. {
  250.   for (unsigned int i = 0; i < tNoOfThreads; i++) 
  251.     pt[i].threadStart = what;
  252. }
  253. NDB_COMMAND(flexBench, "flexBench", "flexBench", "flexbench", 65535)
  254. {
  255.   ndb_init();
  256.   ThreadData*           pThreadsData;
  257.   int                   tLoops = 0, i;
  258.   int                   returnValue = NDBT_OK;
  259.     
  260.   if (readArguments(argc, argv) != 0){
  261.     input_error();
  262.     return NDBT_ProgramExit(NDBT_WRONGARGS);
  263.   }
  264.   if(useLongKeys){
  265.     longKeyAttrName = (char **) malloc(sizeof(char*) * tNoOfLongPK);
  266.     for (Uint32 i = 0; i < tNoOfLongPK; i++) {
  267.       longKeyAttrName[i] = (char *) malloc(strlen("KEYATTR  ") + 1);
  268.       memset(longKeyAttrName[i], 0, strlen("KEYATTR  ") + 1);
  269.       sprintf(longKeyAttrName[i], "KEYATTR%i", i);
  270.     }
  271.   }
  272.   
  273.   pThreadsData = new ThreadData[tNoOfThreads];
  274.  
  275.   ndbout << endl << "FLEXBENCH - Starting normal mode" << endl;
  276.   ndbout << "Perform benchmark of insert, update and delete transactions"<< endl;
  277.   ndbout << "  " << tNoOfThreads << " thread(s) " << endl;
  278.   ndbout << "  " << tNoOfLoops << " iterations " << endl;
  279.   ndbout << "  " << tNoOfTables << " table(s) and " << 1 << " operation(s) per transaction " <<endl;
  280.   ndbout << "  " << tNoOfAttributes << " attributes per table " << endl;
  281.   ndbout << "  " << tNoOfOperations << " transaction(s) per thread and round " << endl;
  282.   ndbout << "  " << tAttributeSize << " is the number of 32 bit words per attribute "<< endl;
  283.   ndbout << "  " << "Table(s) without logging: " << (Uint32)theTempTable << endl;
  284.   
  285.   if(useLongKeys)
  286.     ndbout << "  " << "Using long keys with " << tNoOfLongPK << " keys a' " << 
  287.       tSizeOfLongPK * 4 << " bytes each." << endl;
  288.   
  289.   ndbout << "  " << "Verification is " ; 
  290.   if(VerifyFlag) {
  291.       ndbout << "enabled" << endl ;
  292.   }else{
  293.       ndbout << "disabled" << endl ;
  294.   }
  295.   theErrorData.printSettings(ndbout);
  296.   
  297.   NdbThread_SetConcurrencyLevel(tNoOfThreads + 2);
  298.   Ndb* pNdb;
  299.   pNdb = new Ndb( "TEST_DB" );  
  300.   pNdb->init();
  301.   tNodeId = pNdb->getNodeId();
  302.   ndbout << "  NdbAPI node with id = " << tNodeId << endl;
  303.   ndbout << endl;
  304.   
  305.   ndbout << "Waiting for ndb to become ready..." <<endl;
  306.   if (pNdb->waitUntilReady(2000) != 0){
  307.     ndbout << "NDB is not ready" << endl;
  308.     ndbout << "Benchmark failed!" << endl;
  309.     returnValue = NDBT_FAILED;
  310.   }
  311.   if(returnValue == NDBT_OK){
  312.     if (createTables(pNdb) != 0){
  313.       returnValue = NDBT_FAILED;
  314.     }
  315.   }
  316.   if(returnValue == NDBT_OK){
  317.     sleepBeforeStartingTest(tSleepTime);
  318.     
  319.     /****************************************************************
  320.      *  Create threads.                                           *
  321.      ****************************************************************/
  322.     resetThreads(pThreadsData);
  323.     
  324.     for (i = 0; i < tNoOfThreads; i++){  
  325.       pThreadsData[i].threadNo = i;
  326.       pThreadsData[i].threadLife = NdbThread_Create(flexBenchThread,
  327.                                                     (void**)&pThreadsData[i],
  328.                                                     32768,
  329.                                                     "flexBenchThread",
  330.                                                     NDB_THREAD_PRIO_LOW);
  331.     }
  332.     
  333.     waitForThreads(pThreadsData);
  334.     
  335.     ndbout << endl <<  "All threads started" << endl << endl;
  336.     
  337.     /****************************************************************
  338.      * Execute program.                                             *
  339.      ****************************************************************/
  340.   
  341.     for(;;){
  342.       int loopCount = tLoops + 1;
  343.       ndbout << endl << "Loop # " << loopCount  << endl << endl;
  344.       
  345.       /****************************************************************
  346.        * Perform inserts.                                             *
  347.        ****************************************************************/
  348.       // Reset and start timer
  349.       START_TIMER;
  350.       // Give insert-command to all threads
  351.       resetThreads(pThreadsData);
  352.       tellThreads(pThreadsData, stInsert);
  353.       waitForThreads(pThreadsData);
  354.       if (checkThreadResults(pThreadsData) != 0){
  355.         ndbout << "Error: Threads failed in performing insert" << endl;
  356.         returnValue = NDBT_FAILED;
  357.         break;
  358.       }        
  359.       // stop timer and print results.
  360.       STOP_TIMER;
  361.       PRINT_TIMER("insert", tNoOfOperations*tNoOfThreads, tNoOfTables);
  362.       /****************************************************************
  363.       * Verify inserts.                                             *
  364.       ****************************************************************/
  365.       if (VerifyFlag) {
  366.       resetThreads(pThreadsData);
  367.       ndbout << "Verifying inserts...t" ;
  368.       tellThreads(pThreadsData, stVerify);
  369.       waitForThreads(pThreadsData);
  370.       if (checkThreadResults(pThreadsData) != 0){
  371.         ndbout << "Error: Threads failed while verifying inserts" << endl;
  372.         returnValue = NDBT_FAILED;
  373.         break;
  374.       }else{
  375.           ndbout << "ttOK" << endl << endl ;
  376.       }
  377.       }
  378.       
  379.       /****************************************************************
  380.        * Perform read.                                                *
  381.        ****************************************************************/
  382.       // Reset and start timer 
  383.       START_TIMER;
  384.       // Give read-command to all threads
  385.       resetThreads(pThreadsData);
  386.       tellThreads(pThreadsData, stRead);
  387.       waitForThreads(pThreadsData);
  388.       if (checkThreadResults(pThreadsData) != 0){
  389.         ndbout << "Error: Threads failed in performing read" << endl;
  390.         returnValue = NDBT_FAILED;
  391.         break;
  392.       }
  393.       // stop timer and print results.
  394.       STOP_TIMER;
  395.       PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables);
  396.       
  397.       /****************************************************************
  398.        * Perform update.                                              *
  399.        ****************************************************************/
  400.       // Reset and start timer
  401.       START_TIMER;
  402.       // Give insert-command to all threads
  403.       resetThreads(pThreadsData);
  404.       tellThreads(pThreadsData, stUpdate);
  405.       waitForThreads(pThreadsData);
  406.       if (checkThreadResults(pThreadsData) != 0){
  407.         ndbout << "Error: Threads failed in performing update" << endl;
  408.         returnValue = NDBT_FAILED;
  409.         break;
  410.       }
  411.       // stop timer and print results.
  412.       STOP_TIMER;
  413.       PRINT_TIMER("update", tNoOfOperations*tNoOfThreads, tNoOfTables);
  414.       
  415.       /****************************************************************
  416.       * Verify updates.                                             *
  417.       ****************************************************************/
  418.       if (VerifyFlag) {
  419.       resetThreads(pThreadsData);
  420.       ndbout << "Verifying updates...t" ;
  421.       tellThreads(pThreadsData, stVerify);
  422.       waitForThreads(pThreadsData);
  423.       if (checkThreadResults(pThreadsData) != 0){
  424.         ndbout << "Error: Threads failed while verifying updates" << endl;
  425.         returnValue = NDBT_FAILED;
  426.         break;
  427.       }else{
  428.           ndbout << "ttOK" << endl << endl ;
  429.       }
  430.       }
  431.       
  432.       /****************************************************************
  433.        * Perform read.                                             *
  434.        ****************************************************************/
  435.       // Reset and start timer
  436.       START_TIMER;
  437.       // Give insert-command to all threads
  438.       resetThreads(pThreadsData);
  439.       tellThreads(pThreadsData, stRead);
  440.       waitForThreads(pThreadsData);
  441.       if (checkThreadResults(pThreadsData) != 0){
  442.         ndbout << "Error: Threads failed in performing read" << endl;
  443.         returnValue = NDBT_FAILED;
  444.         break;
  445.       }
  446.       // stop timer and print results.
  447.       STOP_TIMER;
  448.       PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables);
  449.       /****************************************************************
  450.        * Perform delete.                                              *
  451.        ****************************************************************/
  452.       // Reset and start timer
  453.       START_TIMER;
  454.       // Give insert-command to all threads
  455.       resetThreads(pThreadsData);
  456.       tellThreads(pThreadsData, stDelete);
  457.       waitForThreads(pThreadsData);
  458.       if (checkThreadResults(pThreadsData) != 0){
  459.         ndbout << "Error: Threads failed in performing delete" << endl;
  460.         returnValue = NDBT_FAILED;
  461.         break;
  462.       }
  463.       // stop timer and print results.
  464.       STOP_TIMER;
  465.       PRINT_TIMER("delete", tNoOfOperations*tNoOfThreads, tNoOfTables);
  466.       /****************************************************************
  467.       * Verify deletes.                                              *
  468.       ****************************************************************/
  469.       if (VerifyFlag) {
  470.       resetThreads(pThreadsData);
  471.       ndbout << "Verifying tuple deletion..." ;
  472.       tellThreads(pThreadsData, stVerifyDelete);
  473.       waitForThreads(pThreadsData);
  474.       if (checkThreadResults(pThreadsData) != 0){
  475.           ndbout << "Error: Threads failed in verifying deletes" << endl;
  476.           returnValue = NDBT_FAILED;
  477.           break;
  478.       }else{ 
  479.           ndbout << "ttOK" << endl << endl ;
  480.       }
  481.       }
  482.       ndbout << "--------------------------------------------------" << endl;
  483.       tLoops++;
  484.       if ( 0 != tNoOfLoops && tNoOfLoops <= tLoops ) 
  485.         break;
  486.       theErrorData.printErrorCounters();
  487.     }
  488.     
  489.     resetThreads(pThreadsData);
  490.     tellThreads(pThreadsData, stStop);
  491.     waitForThreads(pThreadsData);
  492.     void * tmp;
  493.     for(i = 0; i<tNoOfThreads; i++){
  494.       NdbThread_WaitFor(pThreadsData[i].threadLife, &tmp);
  495.       NdbThread_Destroy(&pThreadsData[i].threadLife);
  496.     }
  497.   }
  498.   if (useLongKeys == true) {
  499.     // Only free these areas if they have been allocated
  500.     // Otherwise cores will happen
  501.     for (i = 0; i < tNoOfLongPK; i++)
  502.       free(longKeyAttrName[i]);
  503.     free(longKeyAttrName);
  504.   } // if
  505.   delete [] pThreadsData;
  506.   delete pNdb;
  507.   theErrorData.printErrorCounters();
  508.   return NDBT_ProgramExit(returnValue);
  509. }
  510. ////////////////////////////////////////
  511. unsigned long get_hash(unsigned long * hash_key, int len)
  512. {
  513.   unsigned long hash_value = 147;
  514.   unsigned h_key;
  515.   int i;
  516.   for (i = 0; i < len; i++)
  517.     {
  518.       h_key = hash_key[i];
  519.       hash_value = (hash_value << 5) + hash_value + (h_key & 255);
  520.       hash_value = (hash_value << 5) + hash_value + ((h_key >> 8) & 255);
  521.       hash_value = (hash_value << 5) + hash_value + ((h_key >> 16) & 255);
  522.       hash_value = (hash_value << 5) + hash_value + ((h_key >> 24) & 255);
  523.     }
  524.   return hash_value;
  525. }
  526. // End of warming up phase
  527. static void* flexBenchThread(void* pArg)
  528. {
  529.   ThreadData*       pThreadData = (ThreadData*)pArg;
  530.   unsigned int      threadNo, threadBase;
  531.   Ndb*              pNdb = NULL ;
  532.   NdbConnection     *pTrans = NULL ;
  533.   NdbOperation**    pOps = NULL ;
  534.   StartType         tType ;
  535.   StartType         tSaveType ;
  536.   NdbRecAttr*       tTmp = NULL ;
  537.   int*              attrValue = NULL ;
  538.   int*              attrRefValue = NULL ;
  539.   int               check = 0 ;
  540.   int               loopCountOps, loopCountTables, loopCountAttributes;
  541.   int               tAttemptNo = 0;
  542.   int               tRetryAttempts = 20;
  543.   int               tResult = 0;
  544.   int               tSpecialTrans = 0;
  545.   int               nRefLocalOpOffset = 0 ;
  546.   int               nReadBuffSize = 
  547.     tNoOfTables * tNoOfAttributes * sizeof(int) * tAttributeSize ;
  548.   int               nRefBuffSize = 
  549.     tNoOfOperations * tNoOfAttributes * sizeof(int) * tAttributeSize ;
  550.   unsigned***           longKeyAttrValue;
  551.   threadNo = pThreadData->threadNo ;
  552.   attrValue = (int*)malloc(nReadBuffSize) ;
  553.   attrRefValue = (int*)malloc(nRefBuffSize) ;
  554.   pOps = (NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ;
  555.   pNdb = new Ndb( "TEST_DB" );
  556.   
  557.   if(!attrValue || !attrRefValue || !pOps || !pNdb){
  558.     // Check allocations to make sure we got all the memory we asked for
  559.     ndbout << "One or more memory allocations failed when starting thread #";
  560.     ndbout << threadNo << endl ;
  561.     ndbout << "Thread #" << threadNo << " will now exit" << endl ;
  562.     tResult = 13 ;
  563.     free(attrValue) ;
  564.     free(attrRefValue) ;
  565.     free(pOps) ;
  566.     delete pNdb ;
  567.     return 0; // thread exits
  568.   }
  569.   
  570.   pNdb->init();
  571.   pNdb->waitUntilReady();
  572.   // To make sure that two different threads doesn't operate on the same record
  573.   // Calculate an "unique" number to use as primary key
  574.   threadBase = (threadNo * 2000000) + (tNodeId * 260000000);
  575.   
  576.   if(useLongKeys){
  577.     // Allocate and populate the longkey array.
  578.     longKeyAttrValue = (unsigned ***) malloc(sizeof(unsigned**) * tNoOfOperations );
  579.     Uint32 n;
  580.     for (n = 0; n < tNoOfOperations; n++)
  581.       longKeyAttrValue[n] = (unsigned **) malloc(sizeof(unsigned*) * tNoOfLongPK );
  582.     for (n = 0; n < tNoOfOperations; n++){
  583.       for (Uint32 i = 0; i < tNoOfLongPK ; i++) {
  584. longKeyAttrValue[n][i] = (unsigned *) malloc(sizeof(unsigned) * tSizeOfLongPK);
  585. memset(longKeyAttrValue[n][i], 0, sizeof(unsigned) * tSizeOfLongPK);
  586. for(Uint32 j = 0; j < tSizeOfLongPK; j++) {
  587.   // Repeat the unique value to fill up the long key.
  588.   longKeyAttrValue[n][i][j] = threadBase + n; 
  589. }
  590.       }
  591.     }
  592.   }
  593.   int nRefOpOffset = 0 ;
  594.   //Assign reference attribute values to memory
  595.   for(Uint32 ops = 1 ; ops < tNoOfOperations ; ops++){
  596.     // Calculate offset value before going into the next loop
  597.     nRefOpOffset = tAttributeSize*tNoOfAttributes*(ops-1) ; 
  598.     for(Uint32 a = 0 ; a < tNoOfAttributes ; a++){
  599.       *(int*)&attrRefValue[nRefOpOffset + tAttributeSize*a] = 
  600. (int)(threadBase + ops + a) ;
  601.     }
  602.   }
  603. #ifdef CEBIT_STAT
  604.   // ops not yet reported
  605.   int statOps = 0;
  606. #endif
  607.   for (;;) {
  608.     pThreadData->threadResult = tResult; // Report error to main thread, 
  609.     // normally tResult is set to 0
  610.     pThreadData->threadReady = 1;
  611.     while (pThreadData->threadStart == stIdle){
  612.       NdbSleep_MilliSleep(100);
  613.     }//while
  614.     // Check if signal to exit is received
  615.     if (pThreadData->threadStart == stStop){
  616.       pThreadData->threadReady = 1;
  617.       // ndbout_c("Thread%d is stopping", threadNo);
  618.       // In order to stop this thread, the main thread has signaled
  619.       // stStop, break out of the for loop so that destructors
  620.       // and the proper exit functions are called
  621.       break;
  622.     }//if
  623.     tType = pThreadData->threadStart;
  624.     tSaveType = tType;
  625.     pThreadData->threadStart = stIdle;
  626.     // Start transaction, type of transaction
  627.     // is received in the array ThreadStart
  628.     loopCountOps = tNoOfOperations;
  629.     loopCountTables = tNoOfTables;
  630.     loopCountAttributes = tNoOfAttributes;
  631.     for (int count = 1; count < loopCountOps && tResult == 0;){
  632.       pTrans = pNdb->startTransaction();
  633.       if (pTrans == NULL) {
  634. // This is a fatal error, abort program
  635. ndbout << "Could not start transaction in thread" << threadNo;
  636. ndbout << endl;
  637. ndbout << pNdb->getNdbError() << endl;
  638. tResult = 1; // Indicate fatal error
  639. break; // Break out of for loop
  640.       }
  641.       // Calculate the current operation offset in the reference array
  642.       nRefLocalOpOffset = tAttributeSize*tNoOfAttributes*(count - 1) ;
  643.       for (int countTables = 0;
  644.    countTables < loopCountTables && tResult == 0;
  645.    countTables++) {
  646. pOps[countTables] = pTrans->getNdbOperation(tableName[countTables]);  
  647. if (pOps[countTables] == NULL) {
  648.   // This is a fatal error, abort program
  649.   ndbout << "getNdbOperation: " << pTrans->getNdbError();
  650.   tResult = 2; // Indicate fatal error
  651.   break;
  652. }//if
  653. switch (tType) {
  654. case stInsert:          // Insert case
  655.   if (theWriteFlag == 1 && theDirtyFlag == 1)
  656.     pOps[countTables]->dirtyWrite();
  657.   else if (theWriteFlag == 1)
  658.     pOps[countTables]->writeTuple();
  659.   else
  660.     pOps[countTables]->insertTuple();
  661.   break;
  662. case stRead:            // Read Case
  663.   if (theSimpleFlag == 1)
  664.     pOps[countTables]->simpleRead();
  665.   else if (theDirtyFlag == 1)
  666.     pOps[countTables]->dirtyRead();
  667.   else
  668.     pOps[countTables]->readTuple();
  669.   break;
  670. case stUpdate:          // Update Case
  671.   if (theWriteFlag == 1 && theDirtyFlag == 1)
  672.     pOps[countTables]->dirtyWrite();
  673.   else if (theWriteFlag == 1)
  674.     pOps[countTables]->writeTuple();
  675.   else if (theDirtyFlag == 1)
  676.     pOps[countTables]->dirtyUpdate();
  677.   else
  678.     pOps[countTables]->updateTuple();
  679.   break;
  680. case stDelete:          // Delete Case
  681.   pOps[countTables]->deleteTuple();
  682.   break;
  683. case stVerify:
  684.   pOps[countTables]->readTuple();
  685.   break;
  686. case stVerifyDelete:
  687.   pOps[countTables]->readTuple();
  688.   break;
  689. default:
  690.   assert(false);
  691. }//switch
  692. if(useLongKeys){
  693.   // Loop the equal call so the complete key is send to the kernel.
  694.   for(Uint32 i = 0; i < tNoOfLongPK; i++) 
  695.     pOps[countTables]->equal(longKeyAttrName[i], 
  696.      (char *)longKeyAttrValue[count - 1][i], tSizeOfLongPK*4); 
  697. }
  698. else 
  699.   pOps[countTables]->equal((char*)attrName[0], 
  700.    (char*)&attrRefValue[nRefLocalOpOffset]);
  701. if (tType == stInsert || tType == stUpdate){
  702.   for (int ca = 1; ca < loopCountAttributes; ca++){
  703.     pOps[countTables]->setValue((char*)attrName[ca],
  704. (char*)&attrRefValue[nRefLocalOpOffset + tAttributeSize*ca]);
  705.   }//for
  706. } else if (tType == stRead || stVerify == tType) {
  707.   int nTableOffset = tAttributeSize * 
  708.     loopCountAttributes *
  709.     countTables ;
  710.   for (int ca = 1; ca < loopCountAttributes; ca++) {
  711.     tTmp = pOps[countTables]->getValue((char*)attrName[ca], 
  712.        (char*)&attrValue[nTableOffset + tAttributeSize*ca]);
  713.   }//for
  714. } else if (stVerifyDelete == tType) {
  715.   if(useLongKeys){
  716.     int nTableOffset = tAttributeSize *
  717.       loopCountAttributes *
  718.       countTables ;
  719.     tTmp = pOps[countTables]->getValue(longKeyAttrName[0], 
  720.        (char*)&attrValue[nTableOffset]);
  721.   } else {
  722.     int nTableOffset = tAttributeSize *
  723.       loopCountAttributes *
  724.       countTables ;
  725.     tTmp = pOps[countTables]->getValue((char*)attrName[0], 
  726.        (char*)&attrValue[nTableOffset]);
  727.   }
  728. }//if
  729.       }//for Tables loop
  730.       if (tResult != 0)
  731. break;
  732.       check = pTrans->execute(Commit);
  733.       // Decide what kind of error this is
  734.       if ((tSpecialTrans == 1) &&
  735.   (check == -1)) {
  736. // --------------------------------------------------------------------
  737. // A special transaction have been executed, change to check = 0 in
  738. // certain situations.
  739. // --------------------------------------------------------------------
  740. switch (tType) {
  741. case stInsert:          // Insert case
  742.   if (630 == pTrans->getNdbError().code ) {
  743.     check = 0;
  744.     ndbout << "Insert with 4007 was successful" << endl;
  745.   }//if
  746.   break;
  747. case stDelete:          // Delete Case
  748.   if (626 == pTrans->getNdbError().code ) {
  749.     check = 0;
  750.     ndbout << "Delete with 4007 was successful" << endl;
  751.   }//if
  752.   break;
  753. default:
  754.   assert(false);
  755. }//switch
  756.       }//if
  757.       tSpecialTrans = 0;
  758.       if (check == -1) {
  759. if ((stVerifyDelete == tType) && 
  760.     (626 == pTrans->getNdbError().code)) {
  761.   // ----------------------------------------------
  762.   // It's good news - the deleted tuple is gone, 
  763.   // so reset "check" flag
  764.   // ----------------------------------------------
  765.   check = 0 ;
  766. } else {
  767.   int retCode = 
  768.     theErrorData.handleErrorCommon(pTrans->getNdbError());
  769.   if (retCode == 1) {
  770.     ndbout_c("execute: %d, %d, %s", count, tType, 
  771.      pTrans->getNdbError().message );
  772.     ndbout_c("Error code = %d", pTrans->getNdbError().code );
  773.     tResult = 20;
  774.   } else if (retCode == 2) {
  775.     ndbout << "4115 should not happen in flexBench" << endl;
  776.     tResult = 20;
  777.   } else if (retCode == 3) {
  778.     // --------------------------------------------------------------------
  779.     // We are not certain if the transaction was successful or not.
  780.     // We must reexecute but might very well find that the transaction
  781.     // actually was updated. Updates and Reads are no problem here. Inserts
  782.     // will not cause a problem if error code 630 arrives. Deletes will
  783.     // not cause a problem if 626 arrives.
  784.     // --------------------------------------------------------------------
  785.     if ((tType == stInsert) || (tType == stDelete)) {
  786.       tSpecialTrans = 1;
  787.     }//if
  788.   }//if
  789. }//if
  790.       }//if
  791.       // Check if retries should be made
  792.       if (check == -1 && tResult == 0) {
  793. if (tAttemptNo < tRetryAttempts){
  794.   tAttemptNo++;
  795. } else {
  796.   // --------------------------------------------------------------------
  797.   // Too many retries have been made, report error and break out of loop
  798.   // --------------------------------------------------------------------
  799.   ndbout << "Thread" << threadNo;
  800.   ndbout << ": too many errors reported" << endl;
  801.   tResult = 10;
  802.   break;
  803. }//if            
  804.       }//if
  805.       if (check == 0){
  806. // Go to the next record
  807. count++;
  808. tAttemptNo = 0;
  809. #ifdef CEBIT_STAT
  810. // report successful ops
  811. if (statEnable) {
  812.   statOps += loopCountTables;
  813.   if (statOps >= statFreq) {
  814.     statReport(tType, statOps);
  815.     statOps = 0;
  816.   }//if
  817. }//if
  818. #endif
  819.       }//if
  820.       if (stVerify == tType && 0 == check){
  821. int nTableOffset = 0 ;
  822. for (int a = 1 ; a < loopCountAttributes ; a++){
  823.   for (int tables = 0 ; tables < loopCountTables ; tables++){
  824.     nTableOffset = tables*loopCountAttributes*tAttributeSize ;
  825.     if (*(int*)&attrValue[nTableOffset + tAttributeSize*a] != *(int*)&attrRefValue[nRefLocalOpOffset + tAttributeSize*a]){
  826.       ndbout << "Error in verify:" << endl ;
  827.       ndbout << "attrValue[" << nTableOffset + tAttributeSize*a << "] = " << attrValue[a] << endl ;
  828.       ndbout << "attrRefValue[" << nRefLocalOpOffset + tAttributeSize*a << "]" << attrRefValue[nRefLocalOpOffset + tAttributeSize*a] << endl ;
  829.       tResult = 11 ;
  830.       break ;
  831.     }//if
  832.   }//for
  833. }//for
  834.       }// if(stVerify ... )
  835.       pNdb->closeTransaction(pTrans) ;  
  836.     }// operations loop
  837. #ifdef CEBIT_STAT
  838.     // report remaining successful ops
  839.     if (statEnable) {
  840.       if (statOps > 0) {
  841. statReport(tType, statOps);
  842. statOps = 0;
  843.       }//if
  844.     }//if
  845. #endif
  846.   }
  847.   delete pNdb;
  848.   free(attrValue) ;
  849.   free(attrRefValue) ;
  850.   free(pOps) ;
  851.   if (useLongKeys == true) {
  852.     // Only free these areas if they have been allocated
  853.     // Otherwise cores will occur
  854.     for (Uint32 n = 0; n < tNoOfOperations; n++){
  855.       for (Uint32 i = 0; i < tNoOfLongPK; i++) {
  856. free(longKeyAttrValue[n][i]);
  857.       }
  858.       free(longKeyAttrValue[n]);
  859.     }
  860.     free(longKeyAttrValue);
  861.   } // if
  862.   return NULL; // Thread exits
  863. }
  864. static int readArguments(int argc, const char** argv)
  865. {
  866.   int i = 1;
  867.   while (argc > 1){
  868.     if (strcmp(argv[i], "-t") == 0){
  869.       tNoOfThreads = atoi(argv[i+1]);
  870.       if ((tNoOfThreads < 1)) 
  871.         return -1;
  872.       argc -= 1;
  873.       i++;
  874.     }else if (strcmp(argv[i], "-o") == 0){
  875.       tNoOfOperations = atoi(argv[i+1]);
  876.       if (tNoOfOperations < 1) 
  877.         return -1;;
  878.       argc -= 1;
  879.       i++;
  880.     }else if (strcmp(argv[i], "-a") == 0){
  881.       tNoOfAttributes = atoi(argv[i+1]);
  882.       if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)) 
  883.         return -1;
  884.       argc -= 1;
  885.       i++;
  886.     }else if (strcmp(argv[i], "-lkn") == 0){
  887.      tNoOfLongPK = atoi(argv[i+1]);
  888.      useLongKeys = true;
  889.       if ((tNoOfLongPK < 1) || (tNoOfLongPK > MAXNOLONGKEY) || 
  890.   (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
  891.        ndbout << "Argument -lkn is not in the proper range." << endl;  
  892. return -1;
  893.       }
  894.       argc -= 1;
  895.       i++;
  896.     }else if (strcmp(argv[i], "-lks") == 0){
  897.       tSizeOfLongPK = atoi(argv[i+1]);
  898.       useLongKeys = true;
  899.       if ((tSizeOfLongPK < 1) || (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
  900. ndbout << "Argument -lks is not in the proper range 1 to " << 
  901.   MAXLONGKEYTOTALSIZE << endl;
  902.         return -1;
  903.       }
  904.       argc -= 1;
  905.       i++;
  906.     }else if (strcmp(argv[i], "-c") == 0){
  907.       tNoOfTables = atoi(argv[i+1]);
  908.       if ((tNoOfTables < 1) || (tNoOfTables > MAXTABLES)) 
  909.         return -1;
  910.       argc -= 1;
  911.       i++;
  912.     }else if (strcmp(argv[i], "-stdtables") == 0){
  913.       theStdTableNameFlag = 1;
  914.     }else if (strcmp(argv[i], "-l") == 0){
  915.       tNoOfLoops = atoi(argv[i+1]);
  916.       if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)) 
  917.         return -1;
  918.       argc -= 1;
  919.       i++;
  920.     }else if (strcmp(argv[i], "-s") == 0){
  921.       tAttributeSize = atoi(argv[i+1]);
  922.       if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)) 
  923.         return -1;
  924.       argc -= 1;
  925.       i++;
  926.     }else if (strcmp(argv[i], "-sleep") == 0){
  927.       tSleepTime = atoi(argv[i+1]);
  928.       if ((tSleepTime < 1) || (tSleepTime > 3600)) 
  929.         return -1;
  930.       argc -= 1;
  931.       i++;
  932.     }else if (strcmp(argv[i], "-simple") == 0){
  933.       theSimpleFlag = 1;
  934.     }else if (strcmp(argv[i], "-write") == 0){
  935.       theWriteFlag = 1;
  936.     }else if (strcmp(argv[i], "-dirty") == 0){
  937.       theDirtyFlag = 1;
  938.     }else if (strcmp(argv[i], "-no_table_create") == 0){
  939.       theTableCreateFlag = 1;
  940.     }else if (strcmp(argv[i], "-temp") == 0){
  941.       theTempTable = true;
  942.     }else if (strcmp(argv[i], "-noverify") == 0){
  943.       VerifyFlag = false ;
  944.     }else if (theErrorData.parseCmdLineArg(argv, i) == true){
  945.       ; //empty, updated in errorArg(..)
  946.     }else if (strcmp(argv[i], "-verify") == 0){
  947.       VerifyFlag = true ;
  948. #ifdef CEBIT_STAT
  949.     }else if (strcmp(argv[i], "-statserv") == 0){
  950.       if (! (argc > 2))
  951. return -1;
  952.       const char *p = argv[i+1];
  953.       const char *q = strrchr(p, ':');
  954.       if (q == 0)
  955. return -1;
  956.       BaseString::snprintf(statHost, sizeof(statHost), "%.*s", q-p, p);
  957.       statPort = atoi(q+1);
  958.       statEnable = true;
  959.       argc -= 1;
  960.       i++;
  961.     }else if (strcmp(argv[i], "-statfreq") == 0){
  962.       if (! (argc > 2))
  963. return -1;
  964.       statFreq = atoi(argv[i+1]);
  965.       if (statFreq < 1)
  966. return -1;
  967.       argc -= 1;
  968.       i++;
  969. #endif
  970.     }else{       
  971.       return -1;
  972.     }
  973.     argc -= 1;
  974.     i++;
  975.   }
  976.   return 0;
  977. }
  978. static void sleepBeforeStartingTest(int seconds){
  979.   if (seconds > 0){
  980.       ndbout << "Sleeping(" <<seconds << ")...";
  981.       NdbSleep_SecSleep(seconds);
  982.       ndbout << " done!" << endl;
  983.     }
  984. }
  985. static int
  986. createTables(Ndb* pMyNdb){
  987.   int i;
  988.   for (i = 0; i < tNoOfAttributes; i++){
  989.     BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
  990.   }
  991.   // Note! Uses only uppercase letters in table name's
  992.   // so that we can look at the tables with SQL
  993.   for (i = 0; i < tNoOfTables; i++){
  994.     if (theStdTableNameFlag == 0){
  995.       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i, 
  996.        (int)(NdbTick_CurrentMillisecond() / 1000));
  997.     } else {
  998.       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
  999.     }
  1000.   }
  1001.   
  1002.   for(i = 0; i < tNoOfTables; i++){
  1003.     ndbout << "Creating " << tableName[i] << "... ";
  1004.     
  1005.     NdbDictionary::Table tmpTable(tableName[i]);
  1006.     
  1007.     tmpTable.setStoredTable(!theTempTable);
  1008.     
  1009.     if(useLongKeys){
  1010.       for(Uint32 i = 0; i < tNoOfLongPK; i++) {
  1011. NdbDictionary::Column col(longKeyAttrName[i]);
  1012. col.setType(NdbDictionary::Column::Unsigned);
  1013. col.setLength(tSizeOfLongPK);
  1014. col.setPrimaryKey(true);
  1015. tmpTable.addColumn(col);
  1016.       }
  1017.     } else {
  1018.       NdbDictionary::Column col(attrName[0]);
  1019.       col.setType(NdbDictionary::Column::Unsigned);
  1020.       col.setLength(1);
  1021.       col.setPrimaryKey(true);
  1022.       tmpTable.addColumn(col);
  1023.     }
  1024.     
  1025.     
  1026.     NdbDictionary::Column col;
  1027.     col.setType(NdbDictionary::Column::Unsigned);
  1028.     col.setLength(tAttributeSize);
  1029.     for (unsigned j = 1; j < tNoOfAttributes; j++){
  1030.       col.setName(attrName[j]);
  1031.       tmpTable.addColumn(col);
  1032.     }
  1033.     
  1034.     if(pMyNdb->getDictionary()->createTable(tmpTable) == -1){
  1035.       return -1;
  1036.     }
  1037.     ndbout << "done" << endl;
  1038.   }
  1039.   
  1040.   return 0;
  1041. }
  1042.       
  1043. static void input_error(){
  1044.   ndbout << endl << "Invalid argument!" << endl;
  1045.   ndbout << endl << "Arguments:" << endl;
  1046.   ndbout << "   -t Number of threads to start, default 1" << endl;
  1047.   ndbout << "   -o Number of operations per loop, default 500" << endl;
  1048.   ndbout << "   -l Number of loops to run, default 1, 0=infinite" << endl;
  1049.   ndbout << "   -a Number of attributes, default 25" << endl;
  1050.   ndbout << "   -c Number of tables, default 1" << endl;
  1051.   ndbout << "   -s Size of each attribute, default 1 (Primary Key is always of size 1," << endl;
  1052.   ndbout << "      independent of this value)" << endl;
  1053.   ndbout << "   -lkn Number of long primary keys, default 1" << endl;
  1054.   ndbout << "   -lks Size of each long primary key, default 1" << endl;
  1055.   ndbout << "   -simple Use simple read to read from database" << endl;
  1056.   ndbout << "   -dirty Use dirty read to read from database" << endl;
  1057.   ndbout << "   -write Use writeTuple in insert and update" << endl;
  1058.   ndbout << "   -stdtables Use standard table names" << endl;
  1059.   ndbout << "   -no_table_create Don't create tables in db" << endl;
  1060.   ndbout << "   -sleep Sleep a number of seconds before running the test, this" << endl;
  1061.   ndbout << "    can be used so that another flexBench have time to create tables" << endl;
  1062.   ndbout << "   -temp Use tables without logging" << endl;
  1063.   ndbout << "   -verify Verify inserts, updates and deletes" << endl ;
  1064.   theErrorData.printCmdLineArgs(ndbout);
  1065.   ndbout << endl <<"Returns:" << endl;
  1066.   ndbout << "t 0 - Test passed" << endl;
  1067.   ndbout << "t 1 - Test failed" << endl;
  1068.   ndbout << "t 2 - Invalid arguments" << endl << endl;
  1069. }
  1070. // vim: set sw=2: