flex_bench_mysql.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:53k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. /* ***************************************************
  14. FLEXBENCH
  15. Perform benchmark of insert, update and delete transactions
  16. Arguments:
  17.     -t Number of threads to start, default 1
  18.     -o Number of operations per loop, default 500
  19.     -l Number of loops to run, default 1, 0=infinite
  20.     -a Number of attributes, default 25
  21.     -c Number of tables, default 1
  22.     -s Size of each attribute, default 1 (Primary Key is always of size 1,
  23.     independent of this value)
  24.     -lkn Number of long primary keys, default 1
  25.     -lks Size of each long primary key, default 1
  26.     -simple Use simple read to read from database
  27.     -dirty Use dirty read to read from database
  28.     -write Use writeTuple in insert and update
  29.     -stdtables Use standard table names
  30.     -no_table_create Don't create tables in db
  31.     -sleep Sleep a number of seconds before running the test, this 
  32.     can be used so that another flexBench have time to create tables
  33.     -temp Use tables without logging
  34.     -verify Verify inserts, updates and deletes
  35.     -use_ndb Use NDB API, otherwise use mysql client
  36. #ifdef CEBIT_STAT
  37.     -statserv host:port  statistics server to report to
  38.     -statfreq ops        report every ops operations (default 100)
  39. #endif
  40.     Returns:
  41.     0 - Test passed
  42.     1 - Test failed
  43.     2 - Invalid arguments
  44. * *************************************************** */
  45. #define USE_MYSQL
  46. #ifdef USE_MYSQL
  47. #include <mysql.h>
  48. #endif
  49. #include "NdbApi.hpp"
  50. #include <NdbMain.h>
  51. #include <NdbOut.hpp>
  52. #include <NdbSleep.h>
  53. #include <NdbTick.h>
  54. #include <NdbTimer.hpp>
  55. #include <NdbThread.h>
  56. #include <NdbAutoPtr.hpp>
  57. #include <NdbTest.hpp>
  58. #define MAXSTRLEN 16 
  59. #define MAXATTR 64
  60. #define MAXTABLES 128
  61. #define MAXATTRSIZE 1000
  62. #define MAXNOLONGKEY 16 // Max number of long keys.
  63. #define MAXLONGKEYTOTALSIZE 1023 // words = 4092 bytes
  64. extern "C" { static void* flexBenchThread(void*); }
  65. static int readArguments(int argc, const char** argv);
  66. #ifdef USE_MYSQL
  67. static int createTables(MYSQL*);
  68. static int dropTables(MYSQL*);
  69. #endif
  70. static int createTables(Ndb*);
  71. static void sleepBeforeStartingTest(int seconds);
  72. static void input_error();
  73. enum StartType { 
  74.   stIdle,
  75.   stInsert,
  76.   stVerify,
  77.   stRead,
  78.   stUpdate,
  79.   stDelete,
  80.   stTryDelete,
  81.   stVerifyDelete,
  82.   stStop 
  83. };
  84. struct ThreadData
  85. {
  86.   int threadNo;
  87.   NdbThread* threadLife;
  88.   int threadReady;  
  89.   StartType threadStart;
  90.   int threadResult;
  91. };
  92. static int                  tNodeId = 0 ;
  93. static char                 tableName[MAXTABLES][MAXSTRLEN+1];
  94. static char                 attrName[MAXATTR][MAXSTRLEN+1];
  95. static char**               longKeyAttrName;
  96. // Program Parameters
  97. static int                  tNoOfLoops = 1;
  98. static int                  tAttributeSize = 1;
  99. static unsigned int         tNoOfThreads = 1;
  100. static unsigned int         tNoOfTables = 1;
  101. static unsigned int         tNoOfAttributes = 25;
  102. static unsigned int         tNoOfOperations = 500;
  103. static unsigned int         tSleepTime = 0;
  104. static unsigned int         tNoOfLongPK = 1;
  105. static unsigned int         tSizeOfLongPK = 1;
  106. static unsigned int         t_instances = 1;
  107. //Program Flags
  108. static int                  theSimpleFlag = 0;
  109. static int                  theDirtyFlag = 0;
  110. static int                  theWriteFlag = 0;
  111. static int                  theStdTableNameFlag = 0;
  112. static int                  theTableCreateFlag = 0;
  113. static bool                 theTempTable = false;
  114. static bool                 VerifyFlag = true;
  115. static bool                 useLongKeys = false;
  116. static bool                 verbose = false;
  117. #ifdef USE_MYSQL
  118. static bool                 use_ndb = false;
  119. static int                  engine_id = 0;
  120. static int                  sockets[16];
  121. static int                  n_sockets = 0;
  122. static char*                engine[] =
  123.   { 
  124.     " ENGINE = NDBCLUSTER ", // use default engine
  125.     " ENGINE = MEMORY ",
  126.     " ENGINE = MYISAM ",
  127.     " ENGINE = INNODB "
  128.   };
  129. #else
  130. static bool                 use_ndb = true;
  131. #endif
  132. static ErrorData theErrorData; // Part of flexBench-program
  133. #define START_TIMER { NdbTimer timer; timer.doStart();
  134. #define STOP_TIMER timer.doStop();
  135. #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); };
  136. #include <NdbTCP.h>
  137. #ifdef CEBIT_STAT
  138. #include <NdbMutex.h>
  139. static bool statEnable = false;
  140. static char statHost[100];
  141. static int statFreq = 100;
  142. static int statPort = 0;
  143. static int statSock = -1;
  144. static enum { statError = -1, statClosed, statOpen } statState;
  145. static NdbMutex statMutex = NDB_MUTEX_INITIALIZER;
  146. #endif
  147. //-------------------------------------------------------------------
  148. // Statistical Reporting routines
  149. //-------------------------------------------------------------------
  150. #ifdef CEBIT_STAT
  151. // Experimental client-side statistic for CeBIT
  152. static void
  153. statReport(enum StartType st, int ops)
  154. {
  155.   if (!statEnable)
  156.     return;
  157.   if (NdbMutex_Lock(&statMutex) < 0) {
  158.     if (statState != statError) {
  159.       ndbout_c("stat: lock mutex failed: %s", strerror(errno));
  160.       statState = statError;
  161.     }
  162.     return;
  163.   }
  164.   static int nodeid;
  165.   // open connection
  166.   if (statState != statOpen) {
  167.     char *p = getenv("NDB_NODEID"); // ndbnet sets NDB_NODEID
  168.     nodeid = p == 0 ? 0 : atoi(p);
  169.     if ((statSock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
  170.       if (statState != statError) {
  171. ndbout_c("stat: create socket failed: %s", strerror(errno));
  172. statState = statError;
  173.       }
  174.       (void)NdbMutex_Unlock(&statMutex);
  175.       return;
  176.     }
  177.     struct sockaddr_in saddr;
  178.     memset(&saddr, 0, sizeof(saddr));
  179.     saddr.sin_family = AF_INET;
  180.     saddr.sin_port = htons(statPort);
  181.     if (Ndb_getInAddr(&saddr.sin_addr, statHost) < 0) {
  182.       if (statState != statError) {
  183. ndbout_c("stat: host %s not found", statHost);
  184. statState = statError;
  185.       }
  186.       (void)close(statSock);
  187.       (void)NdbMutex_Unlock(&statMutex);
  188.       return;
  189.     }
  190.     if (connect(statSock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
  191.       if (statState != statError) {
  192. ndbout_c("stat: connect failed: %s", strerror(errno));
  193. statState = statError;
  194.       }
  195.       (void)close(statSock);
  196.       (void)NdbMutex_Unlock(&statMutex);
  197.       return;
  198.     }
  199.     statState = statOpen;
  200.     ndbout_c("stat: connection to %s:%d opened", statHost, (int)statPort);
  201.   }
  202.   const char *text;
  203.   switch (st) {
  204.   case stInsert:
  205.     text = "insert";
  206.     break;
  207.   case stVerify:
  208.     text = "verify";
  209.     break;
  210.   case stRead:
  211.     text = "read";
  212.     break;
  213.   case stUpdate:
  214.     text = "update";
  215.     break;
  216.   case stDelete:
  217.     text = "delete";
  218.     break;
  219.   case stVerifyDelete:
  220.     text = "verifydelete";
  221.     break;
  222.   default:
  223.     text = "unknown";
  224.     break;
  225.   }
  226.   char buf[100];
  227.   sprintf(buf, "%d %s %dn", nodeid, text, ops);
  228.   int len = strlen(buf);
  229.   // assume SIGPIPE already ignored
  230.   if (write(statSock, buf, len) != len) {
  231.     if (statState != statError) {
  232.       ndbout_c("stat: write failed: %s", strerror(errno));
  233.       statState = statError;
  234.     }
  235.     (void)close(statSock);
  236.     (void)NdbMutex_Unlock(&statMutex);
  237.     return;
  238.   }
  239.   (void)NdbMutex_Unlock(&statMutex);
  240. }
  241. #endif // CEBIT_STAT
  242. static void 
  243. resetThreads(ThreadData* pt){
  244.   for (unsigned int i = 0; i < tNoOfThreads; i++){
  245.     pt[i].threadReady = 0;
  246.     pt[i].threadResult = 0;
  247.     pt[i].threadStart = stIdle;
  248.   }
  249. }
  250. static int 
  251. checkThreadResults(ThreadData* pt){
  252.   for (unsigned int i = 0; i < tNoOfThreads; i++){
  253.     if(pt[i].threadResult != 0){
  254.       ndbout_c("Thread%d reported fatal error %d", i, pt[i].threadResult);
  255.       return -1;
  256.     }
  257.   }
  258.   return 0;
  259. }
  260. static
  261. void 
  262. waitForThreads(ThreadData* pt)
  263. {
  264.   int cont = 1;
  265.   while (cont){
  266.     NdbSleep_MilliSleep(100);
  267.     cont = 0;
  268.     for (unsigned int i = 0; i < tNoOfThreads; i++){
  269.       if (pt[i].threadReady == 0) 
  270.     cont = 1;
  271.     }
  272.   }
  273. }
  274. static void 
  275. tellThreads(ThreadData* pt, StartType what)
  276. {
  277.   for (unsigned int i = 0; i < tNoOfThreads; i++) 
  278.     pt[i].threadStart = what;
  279. }
  280. NDB_COMMAND(flexBench, "flexBench", "flexBench", "flexbench", 65535)
  281. {
  282.   ndb_init();
  283.   ThreadData*           pThreadsData;
  284.   int                   tLoops = 0;
  285.   int                   returnValue = NDBT_OK;
  286.   if (readArguments(argc, argv) != 0){
  287.     input_error();
  288.     return NDBT_ProgramExit(NDBT_WRONGARGS);
  289.   }
  290.   NdbAutoPtr<char> p10;
  291.   if(useLongKeys){
  292.     int e1 = sizeof(char*) * tNoOfLongPK;
  293.     int e2_1 = strlen("KEYATTR  ") + 1;
  294.     int e2 = e2_1 * tNoOfLongPK;
  295.     char *tmp = (char *) malloc(e1 + e2);
  296.     p10.reset(tmp);
  297.     longKeyAttrName = (char **) tmp;
  298.     tmp += e1;
  299.     for (Uint32 i = 0; i < tNoOfLongPK; i++) {
  300.       //      longKeyAttrName[i] = (char *) malloc(strlen("KEYATTR  ") + 1);
  301.       longKeyAttrName[i] = tmp;
  302.       tmp += e2_1;
  303.       memset(longKeyAttrName[i], 0, e2_1);
  304.       sprintf(longKeyAttrName[i], "KEYATTR%i", i);
  305.     }
  306.   }
  307.   NdbAutoObjArrayPtr<ThreadData>
  308.     p12( pThreadsData = new ThreadData[tNoOfThreads] );
  309.  
  310.   ndbout << endl << "FLEXBENCH - Starting normal mode" << endl;
  311.   ndbout << "Perform benchmark of insert, update and delete transactions"<< endl;
  312.   ndbout << "  " << tNoOfThreads << " thread(s) " << endl;
  313.   ndbout << "  " << tNoOfLoops << " iterations " << endl;
  314.   ndbout << "  " << tNoOfTables << " table(s) and " << 1 << " operation(s) per transaction " <<endl;
  315.   ndbout << "  " << tNoOfAttributes << " attributes per table " << endl;
  316.   ndbout << "  " << tNoOfOperations << " transaction(s) per thread and round " << endl;
  317.   ndbout << "  " << tAttributeSize << " is the number of 32 bit words per attribute "<< endl;
  318.   ndbout << "  " << "Table(s) without logging: " << (Uint32)theTempTable << endl;
  319.   
  320.   if(useLongKeys)
  321.     ndbout << "  " << "Using long keys with " << tNoOfLongPK << " keys a' " << 
  322.       tSizeOfLongPK * 4 << " bytes each." << endl;
  323.   
  324.   ndbout << "  " << "Verification is " ; 
  325.   if(VerifyFlag) {
  326.       ndbout << "enabled" << endl ;
  327.   }else{
  328.       ndbout << "disabled" << endl ;
  329.   }
  330.   if (use_ndb) {
  331.     ndbout << "Use NDB API with NdbPool in this test case" << endl;
  332.     ndbout << "Pool size = " << t_instances << endl;
  333.   } else {
  334.     ndbout << "Use mysql client with " << engine[engine_id];
  335.     ndbout << " as engine" << endl;
  336.   }
  337.   theErrorData.printSettings(ndbout);
  338.   
  339.   NdbThread_SetConcurrencyLevel(tNoOfThreads + 2);
  340. #ifdef USE_MYSQL
  341.   MYSQL mysql;
  342.   if (!use_ndb) {
  343.     if ( mysql_thread_safe() == 0 ) {
  344.       ndbout << "Not thread safe mysql library..." << endl;
  345.       return NDBT_ProgramExit(NDBT_FAILED);
  346.     }
  347.     ndbout << "Connecting to MySQL..." <<endl;
  348.     mysql_init(&mysql);
  349.     {
  350.       int the_socket = sockets[0];
  351.       char the_socket_name[1024];
  352.       sprintf(the_socket_name, "%s%u%s", "/tmp/mysql.",the_socket,".sock");
  353.       //    sprintf(the_socket_name, "%s", "/tmp/mysql.sock");
  354.       ndbout << the_socket_name << endl;
  355.       if ( mysql_real_connect(&mysql,
  356.     "localhost",
  357.     "root",
  358.     "",
  359.     "test",
  360.     the_socket,
  361.     the_socket_name,
  362.     0) == NULL ) {
  363.         ndbout << "Connect failed" <<endl;
  364.         returnValue = NDBT_FAILED;
  365.       }
  366.     }
  367.     if(returnValue == NDBT_OK){
  368.       mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
  369.       if (createTables(&mysql) != 0){
  370.         returnValue = NDBT_FAILED;
  371.       }
  372.     }
  373.   }
  374. #endif
  375.   if (use_ndb) {
  376.     Uint32 ndb_id = 0;
  377.     if (!create_instance(t_instances, 1, t_instances)) {
  378.       ndbout << "Creation of the NdbPool failed" << endl;
  379.       returnValue = NDBT_FAILED;
  380.     } else {
  381.       Ndb* pNdb = get_ndb_object(ndb_id, "test", "def");
  382.       if (pNdb == NULL) {
  383.         ndbout << "Failed to get a NDB object" << endl;
  384.         returnValue = NDBT_FAILED;
  385.       } else {
  386.         tNodeId = pNdb->getNodeId();
  387.         ndbout << "  NdbAPI node with id = " << tNodeId << endl;
  388.         ndbout << endl;
  389.   
  390.         ndbout << "Waiting for ndb to become ready..." <<endl;
  391.         if (pNdb->waitUntilReady(2000) != 0){
  392.           ndbout << "NDB is not ready" << endl;
  393.           ndbout << "Benchmark failed!" << endl;
  394.           returnValue = NDBT_FAILED;
  395.         }
  396.         if(returnValue == NDBT_OK){
  397.           if (createTables(pNdb) != 0){
  398.             returnValue = NDBT_FAILED;
  399.           }
  400.         }
  401.         return_ndb_object(pNdb, ndb_id);
  402.       }
  403.     }
  404.   }
  405.   if(returnValue == NDBT_OK){
  406.     sleepBeforeStartingTest(tSleepTime);
  407.     
  408.     /****************************************************************
  409.      *  Create threads.                                           *
  410.      ****************************************************************/
  411.     resetThreads(pThreadsData);
  412.     
  413.     for (unsigned int i = 0; i < tNoOfThreads; i++){  
  414.       pThreadsData[i].threadNo = i;
  415.       pThreadsData[i].threadLife = NdbThread_Create(flexBenchThread,
  416.                                                     (void**)&pThreadsData[i],
  417.                                                     32768,
  418.                                                     "flexBenchThread",
  419.                                                     NDB_THREAD_PRIO_LOW);
  420.     }
  421.     
  422.     waitForThreads(pThreadsData);
  423.     
  424.     ndbout << endl <<  "All threads started" << endl << endl;
  425.     
  426.     /****************************************************************
  427.      * Execute program.                                             *
  428.      ****************************************************************/
  429.   
  430.     for(;;){
  431.       int loopCount = tLoops + 1;
  432.       ndbout << endl << "Loop # " << loopCount  << endl << endl;
  433.       
  434.       /****************************************************************
  435.        * Perform inserts.                                             *
  436.        ****************************************************************/
  437.       // Reset and start timer
  438.       START_TIMER;
  439.       // Give insert-command to all threads
  440.       resetThreads(pThreadsData);
  441.       tellThreads(pThreadsData, stInsert);
  442.       waitForThreads(pThreadsData);
  443.       if (checkThreadResults(pThreadsData) != 0){
  444.         ndbout << "Error: Threads failed in performing insert" << endl;
  445.         returnValue = NDBT_FAILED;
  446.         break;
  447.       }        
  448.       // stop timer and print results.
  449.       STOP_TIMER;
  450.       PRINT_TIMER("insert", tNoOfOperations*tNoOfThreads, tNoOfTables);
  451.       /****************************************************************
  452.       * Verify inserts.                                             *
  453.       ****************************************************************/
  454.       if (VerifyFlag) {
  455. resetThreads(pThreadsData);
  456. ndbout << "Verifying inserts...t" ;
  457. tellThreads(pThreadsData, stVerify);
  458. waitForThreads(pThreadsData);
  459. if (checkThreadResults(pThreadsData) != 0){
  460.   ndbout << "Error: Threads failed while verifying inserts" << endl;
  461.   returnValue = NDBT_FAILED;
  462.   break;
  463. }else{
  464.   ndbout << "ttOK" << endl << endl ;
  465. }
  466.       }
  467.       
  468.       /****************************************************************
  469.        * Perform read.                                                *
  470.        ****************************************************************/
  471.       // Reset and start timer 
  472.       START_TIMER;
  473.       // Give read-command to all threads
  474.       resetThreads(pThreadsData);
  475.       tellThreads(pThreadsData, stRead);
  476.       waitForThreads(pThreadsData);
  477.       if (checkThreadResults(pThreadsData) != 0){
  478.         ndbout << "Error: Threads failed in performing read" << endl;
  479.         returnValue = NDBT_FAILED;
  480.         break;
  481.       }
  482.       // stop timer and print results.
  483.       STOP_TIMER;
  484.       PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables);
  485.       
  486.       /****************************************************************
  487.        * Perform update.                                              *
  488.        ****************************************************************/
  489.       // Reset and start timer
  490.       START_TIMER;
  491.       // Give update-command to all threads
  492.       resetThreads(pThreadsData);
  493.       tellThreads(pThreadsData, stUpdate);
  494.       waitForThreads(pThreadsData);
  495.       if (checkThreadResults(pThreadsData) != 0){
  496.         ndbout << "Error: Threads failed in performing update" << endl;
  497.         returnValue = NDBT_FAILED;
  498.         break;
  499.       }
  500.       // stop timer and print results.
  501.       STOP_TIMER;
  502.       PRINT_TIMER("update", tNoOfOperations*tNoOfThreads, tNoOfTables);
  503.       
  504.       /****************************************************************
  505.       * Verify updates.                                             *
  506.       ****************************************************************/
  507.       if (VerifyFlag) {
  508. resetThreads(pThreadsData);
  509. ndbout << "Verifying updates...t" ;
  510. tellThreads(pThreadsData, stVerify);
  511. waitForThreads(pThreadsData);
  512. if (checkThreadResults(pThreadsData) != 0){
  513.   ndbout << "Error: Threads failed while verifying updates" << endl;
  514.   returnValue = NDBT_FAILED;
  515.   break;
  516. }else{
  517.           ndbout << "ttOK" << endl << endl ;
  518. }
  519.       }
  520.       
  521.       /****************************************************************
  522.        * Perform read.                                             *
  523.        ****************************************************************/
  524.       // Reset and start timer
  525.       START_TIMER;
  526.       // Give read-command to all threads
  527.       resetThreads(pThreadsData);
  528.       tellThreads(pThreadsData, stRead);
  529.       waitForThreads(pThreadsData);
  530.       if (checkThreadResults(pThreadsData) != 0){
  531.         ndbout << "Error: Threads failed in performing read" << endl;
  532.         returnValue = NDBT_FAILED;
  533.         break;
  534.       }
  535.       // stop timer and print results.
  536.       STOP_TIMER;
  537.       PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables);
  538.       /****************************************************************
  539.        * Perform delete.                                              *
  540.        ****************************************************************/
  541.       // Reset and start timer
  542.       START_TIMER;
  543.       // Give delete-command to all threads
  544.       resetThreads(pThreadsData);
  545.       tellThreads(pThreadsData, stDelete);
  546.       waitForThreads(pThreadsData);
  547.       if (checkThreadResults(pThreadsData) != 0){
  548.         ndbout << "Error: Threads failed in performing delete" << endl;
  549.         returnValue = NDBT_FAILED;
  550.         break;
  551.       }
  552.       // stop timer and print results.
  553.       STOP_TIMER;
  554.       PRINT_TIMER("delete", tNoOfOperations*tNoOfThreads, tNoOfTables);
  555.       /****************************************************************
  556.       * Verify deletes.                                              *
  557.       ****************************************************************/
  558.       if (VerifyFlag) {
  559. resetThreads(pThreadsData);
  560. ndbout << "Verifying tuple deletion..." ;
  561. tellThreads(pThreadsData, stVerifyDelete);
  562. waitForThreads(pThreadsData);
  563. if (checkThreadResults(pThreadsData) != 0){
  564.           ndbout << "Error: Threads failed in verifying deletes" << endl;
  565.           returnValue = NDBT_FAILED;
  566.           break;
  567. }else{ 
  568.           ndbout << "ttOK" << endl << endl ;
  569. }
  570.       }
  571.       
  572.       ndbout << "--------------------------------------------------" << endl;
  573.       tLoops++;
  574.       if ( 0 != tNoOfLoops && tNoOfLoops <= tLoops ) 
  575.         break;
  576.       theErrorData.printErrorCounters();
  577.     }
  578.     
  579.     resetThreads(pThreadsData);
  580.     tellThreads(pThreadsData, stStop);
  581.     waitForThreads(pThreadsData);
  582.     void * tmp;
  583.     for(Uint32 i = 0; i<tNoOfThreads; i++){
  584.       NdbThread_WaitFor(pThreadsData[i].threadLife, &tmp);
  585.       NdbThread_Destroy(&pThreadsData[i].threadLife);
  586.     }
  587.   }
  588. #ifdef USE_MYSQL
  589.   if (!use_ndb) {
  590.     dropTables(&mysql);
  591.     mysql_close(&mysql);
  592.   }
  593. #endif
  594.   if (use_ndb) {
  595.     drop_instance();
  596.   }
  597.   theErrorData.printErrorCounters();
  598.   return NDBT_ProgramExit(returnValue);
  599. }
  600. ////////////////////////////////////////
  601. unsigned long get_hash(unsigned long * hash_key, int len)
  602. {
  603.   unsigned long hash_value = 147;
  604.   unsigned h_key;
  605.   int i;
  606.   for (i = 0; i < len; i++)
  607.     {
  608.       h_key = hash_key[i];
  609.       hash_value = (hash_value << 5) + hash_value + (h_key & 255);
  610.       hash_value = (hash_value << 5) + hash_value + ((h_key >> 8) & 255);
  611.       hash_value = (hash_value << 5) + hash_value + ((h_key >> 16) & 255);
  612.       hash_value = (hash_value << 5) + hash_value + ((h_key >> 24) & 255);
  613.     }
  614.   return hash_value;
  615. }
  616. // End of warming up phase
  617. static void* flexBenchThread(void* pArg)
  618. {
  619.   ThreadData*       pThreadData = (ThreadData*)pArg;
  620.   unsigned int      threadNo, threadBase;
  621.   Ndb*              pNdb = NULL ;
  622.   Uint32            ndb_id = 0;
  623.   NdbConnection     *pTrans = NULL ;
  624.   NdbOperation**    pOps = NULL ;
  625.   StartType         tType ;
  626.   StartType         tSaveType ;
  627.   NdbRecAttr*       tTmp = NULL ;
  628.   int*              attrValue = NULL ;
  629.   int*              attrRefValue = NULL ;
  630.   int               check = 0 ;
  631.   int               loopCountOps, loopCountTables, loopCountAttributes;
  632.   int               tAttemptNo = 0;
  633.   int               tRetryAttempts = 20;
  634.   int               tResult = 0;
  635.   int               tSpecialTrans = 0;
  636.   int               nRefLocalOpOffset = 0 ;
  637.   int               nReadBuffSize = 
  638.     tNoOfTables * tNoOfAttributes * sizeof(int) * tAttributeSize ;
  639.   int               nRefBuffSize = 
  640.     tNoOfOperations * tNoOfAttributes * sizeof(int) * tAttributeSize ;
  641.   unsigned***           longKeyAttrValue = NULL;
  642.   threadNo = pThreadData->threadNo ;
  643. #ifdef USE_MYSQL
  644.   MYSQL mysql;
  645.   int the_socket = sockets[threadNo % n_sockets];
  646.   char the_socket_name[1024];
  647.   //sprintf(the_socket_name, "%s", "/tmp/mysql.sock");
  648.   sprintf(the_socket_name, "%s%u%s", "/tmp/mysql.",the_socket,".sock");
  649.   if (!use_ndb) {
  650.     ndbout << the_socket_name << endl;
  651.     ndbout << "Thread connecting to MySQL... " << endl;
  652.     mysql_init(&mysql);
  653.     
  654.     if ( mysql_real_connect(&mysql,
  655.     "localhost",
  656.     "root",
  657.     "",
  658.     "test",
  659.     the_socket,
  660.     the_socket_name,
  661.     0) == NULL ) {
  662.       ndbout << "failed" << endl;
  663.       return 0;
  664.     }
  665.     ndbout << "ok" << endl;
  666.     int r;
  667.     if (tNoOfTables > 1)
  668.       r = mysql_autocommit(&mysql, 0);
  669.     else
  670.       r = mysql_autocommit(&mysql, 1);
  671.     if (r) {
  672.       ndbout << "autocommit on/off failed" << endl;
  673.       return 0;
  674.     }
  675.   }
  676. #endif
  677.   NdbAutoPtr<int> p00( attrValue= (int*)malloc(nReadBuffSize) ) ;
  678.   NdbAutoPtr<int> p01( attrRefValue= (int*)malloc(nRefBuffSize) );
  679.   if (use_ndb) {
  680.     pOps = (NdbOperation**)malloc(tNoOfTables*sizeof(NdbOperation*)) ;
  681.   }
  682.   NdbAutoPtr<NdbOperation*> p02( pOps );
  683.   if( !attrValue || !attrRefValue ||
  684.       ( use_ndb && ( !pOps) ) ){
  685.     // Check allocations to make sure we got all the memory we asked for
  686.     ndbout << "One or more memory allocations failed when starting thread #";
  687.     ndbout << threadNo << endl ;
  688.     ndbout << "Thread #" << threadNo << " will now exit" << endl ;
  689.     tResult = 13 ;
  690.     return 0;
  691.   }
  692.   
  693.   if (use_ndb) {
  694.     pNdb = get_ndb_object(ndb_id, "test", "def");
  695.     if (pNdb == NULL) {
  696.       ndbout << "Failed to get an NDB object" << endl;
  697.       ndbout << "Thread #" << threadNo << " will now exit" << endl ;
  698.       tResult = 13;
  699.       return 0;
  700.     }
  701.     pNdb->waitUntilReady();
  702.     return_ndb_object(pNdb, ndb_id);
  703.     pNdb = NULL;
  704.   }
  705.   // To make sure that two different threads doesn't operate on the same record
  706.   // Calculate an "unique" number to use as primary key
  707.   threadBase = (threadNo * 2000000) + (tNodeId * 260000000);
  708.   NdbAutoPtr<char> p22;
  709.   if(useLongKeys){
  710.     // Allocate and populate the longkey array.
  711.     int e1 = sizeof(unsigned**) * tNoOfOperations;
  712.     int e2 = sizeof(unsigned*) * tNoOfLongPK * tNoOfOperations;
  713.     int e3 = sizeof(unsigned) * tSizeOfLongPK * tNoOfLongPK * tNoOfOperations;
  714.     char* tmp;
  715.     p22.reset(tmp = (char*)malloc(e1+e2+e3));
  716.     longKeyAttrValue = (unsigned ***) tmp;
  717.     tmp += e1;
  718.     for (Uint32 n = 0; n < tNoOfOperations; n++) {
  719.       longKeyAttrValue[n] = (unsigned **) tmp;
  720.       tmp += sizeof(unsigned*) * tNoOfLongPK;
  721.     }
  722.     for (Uint32 n = 0; n < tNoOfOperations; n++){
  723.       for (Uint32 i = 0; i < tNoOfLongPK ; i++) {
  724. longKeyAttrValue[n][i] = (unsigned *) tmp;
  725. tmp += sizeof(unsigned) * tSizeOfLongPK;
  726. memset(longKeyAttrValue[n][i], 0, sizeof(unsigned) * tSizeOfLongPK);
  727. for(Uint32 j = 0; j < tSizeOfLongPK; j++) {
  728.   // Repeat the unique value to fill up the long key.
  729.   longKeyAttrValue[n][i][j] = threadBase + n; 
  730. }
  731.       }
  732.     }
  733.   }
  734.   int nRefOpOffset = 0 ;
  735.   //Assign reference attribute values to memory
  736.   for(Uint32 ops = 1 ; ops < tNoOfOperations ; ops++){
  737.     // Calculate offset value before going into the next loop
  738.     nRefOpOffset = tAttributeSize*tNoOfAttributes*(ops-1) ; 
  739.     for(Uint32 a = 0 ; a < tNoOfAttributes ; a++){
  740.       *(int*)&attrRefValue[nRefOpOffset + tAttributeSize*a] = 
  741. (int)(threadBase + ops + a) ;
  742.     }
  743.   }
  744. #ifdef CEBIT_STAT
  745.   // ops not yet reported
  746.   int statOps = 0;
  747. #endif
  748. #ifdef USE_MYSQL
  749.   // temporary buffer to store prepared statement text
  750.   char buf[2048];
  751.   MYSQL_STMT** prep_read   = NULL;
  752.   MYSQL_STMT** prep_delete = NULL;
  753.   MYSQL_STMT** prep_update = NULL;
  754.   MYSQL_STMT** prep_insert = NULL;
  755.   MYSQL_BIND* bind_delete = NULL;
  756.   MYSQL_BIND* bind_read   = NULL;
  757.   MYSQL_BIND* bind_update = NULL;
  758.   MYSQL_BIND* bind_insert = NULL;
  759.   int* mysql_data = NULL;
  760.   NdbAutoPtr<char> p21;
  761.   if (!use_ndb) {
  762.     // data array to which prepared statements are bound
  763.     char* tmp;
  764.     int e1 = sizeof(int)*tAttributeSize*tNoOfAttributes;
  765.     int e2 = sizeof(MYSQL_BIND)*tNoOfAttributes;
  766.     int e3 = sizeof(MYSQL_BIND)*tNoOfAttributes;
  767.     int e4 = sizeof(MYSQL_BIND)*tNoOfAttributes;
  768.     int e5 = sizeof(MYSQL_BIND)*1;
  769.     int e6 = sizeof(MYSQL_STMT*)*tNoOfTables;
  770.     int e7 = sizeof(MYSQL_STMT*)*tNoOfTables;
  771.     int e8 = sizeof(MYSQL_STMT*)*tNoOfTables;
  772.     int e9 = sizeof(MYSQL_STMT*)*tNoOfTables;
  773.     p21.reset(tmp = (char*)malloc(e1+e2+e3+e4+e5+e6+e7+e8+e9));
  774.     mysql_data  = (int*)tmp;         tmp += e1;
  775.     bind_insert = (MYSQL_BIND*)tmp;  tmp += e2;
  776.     bind_update = (MYSQL_BIND*)tmp;  tmp += e3;
  777.     bind_read   = (MYSQL_BIND*)tmp;  tmp += e4;
  778.     bind_delete = (MYSQL_BIND*)tmp;  tmp += e5;
  779.     prep_insert = (MYSQL_STMT**)tmp; tmp += e6;
  780.     prep_update = (MYSQL_STMT**)tmp; tmp += e7;
  781.     prep_read   = (MYSQL_STMT**)tmp; tmp += e8;
  782.     prep_delete = (MYSQL_STMT**)tmp;
  783.     for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){
  784.       MYSQL_BIND& bi = bind_insert[ca];
  785.       bi.buffer_type = MYSQL_TYPE_LONG;
  786.       bi.buffer = (char*)&mysql_data[ca*tAttributeSize];
  787.       bi.buffer_length = 0;
  788.       bi.length = NULL;
  789.       bi.is_null = NULL;
  790.     }//for
  791.     
  792.     for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){
  793.       MYSQL_BIND& bi = bind_update[ca];
  794.       bi.buffer_type = MYSQL_TYPE_LONG;
  795.       if ( ca == tNoOfAttributes-1 ) // the primary key comes last in statement
  796. bi.buffer = (char*)&mysql_data[0];
  797.       else
  798. bi.buffer = (char*)&mysql_data[(ca+1)*tAttributeSize];
  799.       bi.buffer_length = 0;
  800.       bi.length = NULL;
  801.       bi.is_null = NULL;
  802.     }//for
  803.     
  804.     for (Uint32 ca = 0; ca < tNoOfAttributes; ca++){
  805.       MYSQL_BIND& bi = bind_read[ca];
  806.       bi.buffer_type = MYSQL_TYPE_LONG;
  807.       bi.buffer = (char*)&mysql_data[ca*tAttributeSize];
  808.       bi.buffer_length = 4;
  809.       bi.length = NULL;
  810.       bi.is_null = NULL;
  811.     }//for
  812.     
  813.     for (Uint32 ca = 0; ca < 1; ca++){
  814.       MYSQL_BIND& bi = bind_delete[ca];
  815.       bi.buffer_type = MYSQL_TYPE_LONG;
  816.       bi.buffer = (char*)&mysql_data[ca*tAttributeSize];
  817.       bi.buffer_length = 0;
  818.       bi.length = NULL;
  819.       bi.is_null = NULL;
  820.     }//for
  821.     
  822.     for (Uint32 i = 0; i < tNoOfTables; i++) {
  823.       int pos = 0;
  824.       pos += sprintf(buf+pos, "%s%s%s",
  825.      "INSERT INTO ",
  826.      tableName[i],
  827.      " VALUES(");
  828.       pos += sprintf(buf+pos, "%s", "?");
  829.       for (Uint32 j = 1; j < tNoOfAttributes; j++) {
  830. pos += sprintf(buf+pos, "%s", ",?");
  831.       }
  832.       pos += sprintf(buf+pos, "%s", ")");
  833.       if (verbose)
  834. ndbout << buf << endl;
  835.       prep_insert[i] = mysql_prepare(&mysql, buf, pos);
  836.       if (prep_insert[i] == 0) {
  837. ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
  838. return 0;
  839.       }
  840.       if (mysql_bind_param(prep_insert[i], bind_insert)) {
  841. ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
  842. return 0;
  843.       }
  844.     }
  845.     
  846.     for (Uint32 i = 0; i < tNoOfTables; i++) {
  847.       int pos = 0;
  848.       pos += sprintf(buf+pos, "%s%s%s",
  849.      "UPDATE ",
  850.      tableName[i],
  851.      " SET ");
  852.       for (Uint32 j = 1; j < tNoOfAttributes; j++) {
  853. if (j != 1)
  854.   pos += sprintf(buf+pos, "%s", ",");
  855. pos += sprintf(buf+pos, "%s%s", attrName[j],"=?");
  856.       }
  857.       pos += sprintf(buf+pos, "%s%s%s", " WHERE ", attrName[0], "=?");
  858.       
  859.       if (verbose)
  860. ndbout << buf << endl;
  861.       prep_update[i] = mysql_prepare(&mysql, buf, pos);
  862.       if (prep_update[i] == 0) {
  863. ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
  864. return 0;
  865.       }
  866.       if (mysql_bind_param(prep_update[i], bind_update)) {
  867. ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
  868. return 0;
  869.       }
  870.     }
  871.     
  872.     for (Uint32 i = 0; i < tNoOfTables; i++) {
  873.       int pos = 0;
  874.       pos += sprintf(buf+pos, "%s", "SELECT ");
  875.       for (Uint32 j = 1; j < tNoOfAttributes; j++) {
  876. if (j != 1)
  877.   pos += sprintf(buf+pos, "%s", ",");
  878. pos += sprintf(buf+pos, "%s", attrName[j]);
  879.       }
  880.       pos += sprintf(buf+pos, "%s%s%s%s%s",
  881.      " FROM ",
  882.      tableName[i],
  883.      " WHERE ",
  884.      attrName[0],
  885.      "=?");
  886.       if (verbose)
  887. ndbout << buf << endl;
  888.       prep_read[i] = mysql_prepare(&mysql, buf, pos);
  889.       if (prep_read[i] == 0) {
  890. ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
  891. return 0;
  892.       }
  893.       if (mysql_bind_param(prep_read[i], bind_read)) {
  894. ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
  895. return 0;
  896.       }
  897.       if (mysql_bind_result(prep_read[i], &bind_read[1])) {
  898. ndbout << "mysql_bind_result: " << mysql_error(&mysql) << endl;
  899. return 0;
  900.       }
  901.     }
  902.     
  903.     for (Uint32 i = 0; i < tNoOfTables; i++) {
  904.       int pos = 0;
  905.       pos += sprintf(buf+pos, "%s%s%s%s%s",
  906.      "DELETE FROM ",
  907.      tableName[i],
  908.      " WHERE ",
  909.      attrName[0],
  910.      "=?");
  911.       if (verbose)
  912. ndbout << buf << endl;
  913.       prep_delete[i] = mysql_prepare(&mysql, buf, pos);
  914.       if (prep_delete[i] == 0) {
  915. ndbout << "mysql_prepare: " << mysql_error(&mysql) << endl;
  916. return 0;
  917.       }
  918.       if (mysql_bind_param(prep_delete[i], bind_delete)) {
  919. ndbout << "mysql_bind_param: " << mysql_error(&mysql) << endl;
  920. return 0;
  921.       }
  922.     }
  923.   }
  924. #endif
  925.   for (;;) {
  926.     pThreadData->threadResult = tResult; // Report error to main thread, 
  927.     // normally tResult is set to 0
  928.     pThreadData->threadReady = 1;
  929.     while (pThreadData->threadStart == stIdle){
  930.       NdbSleep_MilliSleep(100);
  931.     }//while
  932.     // Check if signal to exit is received
  933.     if (pThreadData->threadStart == stStop){
  934.       pThreadData->threadReady = 1;
  935.       // ndbout_c("Thread%d is stopping", threadNo);
  936.       // In order to stop this thread, the main thread has signaled
  937.       // stStop, break out of the for loop so that destructors
  938.       // and the proper exit functions are called
  939.       break;
  940.     }//if
  941.     tType = pThreadData->threadStart;
  942.     tSaveType = tType;
  943.     pThreadData->threadStart = stIdle;
  944.     // Start transaction, type of transaction
  945.     // is received in the array ThreadStart
  946.     loopCountOps = tNoOfOperations;
  947.     loopCountTables = tNoOfTables;
  948.     loopCountAttributes = tNoOfAttributes;
  949.     for (int count = 1; count < loopCountOps && tResult == 0;){
  950.       if (use_ndb) {
  951.         pNdb = get_ndb_object(ndb_id, "test", "def");
  952.         if (pNdb == NULL) {
  953.           ndbout << "Could not get Ndb object in thread" << threadNo;
  954.           ndbout << endl;
  955.           tResult = 1; //Indicate fatal error
  956.           break;
  957.         }
  958. pTrans = pNdb->startTransaction();
  959. if (pTrans == NULL) {
  960.   // This is a fatal error, abort program
  961.   ndbout << "Could not start transaction in thread" << threadNo;
  962.   ndbout << endl;
  963.   ndbout << pNdb->getNdbError() << endl;
  964.   tResult = 1; // Indicate fatal error
  965.   break; // Break out of for loop
  966. }
  967.       }
  968.       // Calculate the current operation offset in the reference array
  969.       nRefLocalOpOffset = tAttributeSize*tNoOfAttributes*(count - 1) ;
  970.       int* tmpAttrRefValue = attrRefValue + nRefLocalOpOffset;
  971.       for (int countTables = 0;
  972.    countTables < loopCountTables && tResult == 0;
  973.    countTables++) {
  974. int nTableOffset = tAttributeSize *
  975.   loopCountAttributes *
  976.   countTables ;
  977. int* tmpAttrValue = attrValue + nTableOffset;
  978. if (use_ndb) {
  979.   pOps[countTables] = pTrans->getNdbOperation(tableName[countTables]); 
  980.   if (pOps[countTables] == NULL) {
  981.     // This is a fatal error, abort program
  982.     ndbout << "getNdbOperation: " << pTrans->getNdbError();
  983.     tResult = 2; // Indicate fatal error
  984.     break;
  985.   }//if
  986.   switch (tType) {
  987.   case stInsert:          // Insert case
  988.     if (theWriteFlag == 1 && theDirtyFlag == 1)
  989.     pOps[countTables]->dirtyWrite();
  990.     else if (theWriteFlag == 1)
  991.       pOps[countTables]->writeTuple();
  992.     else
  993.       pOps[countTables]->insertTuple();
  994.     break;
  995.   case stRead:            // Read Case
  996.     if (theSimpleFlag == 1)
  997.       pOps[countTables]->simpleRead();
  998.     else if (theDirtyFlag == 1)
  999.       pOps[countTables]->dirtyRead();
  1000.     else
  1001.       pOps[countTables]->readTuple();
  1002.     break;
  1003.   case stUpdate:          // Update Case
  1004.     if (theWriteFlag == 1 && theDirtyFlag == 1)
  1005.       pOps[countTables]->dirtyWrite();
  1006.     else if (theWriteFlag == 1)
  1007.       pOps[countTables]->writeTuple();
  1008.     else if (theDirtyFlag == 1)
  1009.       pOps[countTables]->dirtyUpdate();
  1010.     else
  1011.       pOps[countTables]->updateTuple();
  1012.     break;
  1013.   case stDelete:          // Delete Case
  1014.     pOps[countTables]->deleteTuple();
  1015.     break;
  1016.   case stVerify:
  1017.     pOps[countTables]->readTuple();
  1018.     break;
  1019.   case stVerifyDelete:
  1020.     pOps[countTables]->readTuple();
  1021.     break;
  1022.   default:
  1023.     assert(false);
  1024.   }//switch
  1025.   
  1026.   if(useLongKeys){
  1027.     // Loop the equal call so the complete key is send to the kernel.
  1028.     for(Uint32 i = 0; i < tNoOfLongPK; i++) 
  1029.       pOps[countTables]->equal(longKeyAttrName[i], 
  1030.        (char *)longKeyAttrValue[count - 1][i],
  1031.        tSizeOfLongPK*4); 
  1032.   }
  1033.   else 
  1034.     pOps[countTables]->equal((char*)attrName[0], 
  1035.      (char*)&tmpAttrRefValue[0]);
  1036.   if (tType == stInsert) {
  1037.     for (int ca = 1; ca < loopCountAttributes; ca++){
  1038.       pOps[countTables]->setValue((char*)attrName[ca],
  1039.   (char*)&tmpAttrRefValue[tAttributeSize*ca]);
  1040.     }//for
  1041.   } else if (tType == stUpdate) {
  1042.     for (int ca = 1; ca < loopCountAttributes; ca++){
  1043.       int* tmp = (int*)&tmpAttrRefValue[tAttributeSize*ca];
  1044.       if (countTables == 0)
  1045. (*tmp)++;
  1046.       pOps[countTables]->setValue((char*)attrName[ca],(char*)tmp);
  1047.     }//for
  1048.   } else if (tType == stRead || stVerify == tType) {
  1049.     for (int ca = 1; ca < loopCountAttributes; ca++) {
  1050.       tTmp =
  1051. pOps[countTables]->getValue((char*)attrName[ca], 
  1052.     (char*)&tmpAttrValue[tAttributeSize*ca]);
  1053.     }//for
  1054.   } else if (stVerifyDelete == tType) {
  1055.     if(useLongKeys){
  1056.       tTmp = pOps[countTables]->getValue(longKeyAttrName[0], 
  1057.  (char*)&tmpAttrValue[0]);
  1058.     } else {
  1059.       tTmp = pOps[countTables]->getValue((char*)attrName[0], 
  1060.  (char*)&tmpAttrValue[0]);
  1061.     }
  1062.   }//if
  1063. } else { // !use_ndb
  1064. #ifndef USE_MYSQL
  1065.   assert(false);
  1066. #else
  1067.   switch (tType)
  1068.     {
  1069.     case stInsert:
  1070.       for (int ca = 0; ca < loopCountAttributes; ca++){
  1071. mysql_data[ca] = tmpAttrRefValue[tAttributeSize*ca];
  1072.       }//for
  1073.       if (mysql_execute(prep_insert[countTables])) {
  1074. ndbout << tableName[countTables];  
  1075. ndbout << " mysql_execute: " << mysql_error(&mysql) << endl;
  1076. tResult = 1 ;
  1077.       }
  1078.       break;
  1079.     case stUpdate:          // Update Case
  1080.       mysql_data[0] = tmpAttrRefValue[0];
  1081.       for (int ca = 1; ca < loopCountAttributes; ca++){
  1082. int* tmp = (int*)&tmpAttrRefValue[tAttributeSize*ca];
  1083. if (countTables == 0)
  1084.   (*tmp)++;
  1085. mysql_data[ca] = *tmp;
  1086.       }//for
  1087.       if (mysql_execute(prep_update[countTables])) {
  1088. ndbout << tableName[countTables];  
  1089. ndbout << " mysql_execute: " << mysql_error(&mysql) << endl;
  1090. tResult = 2 ;
  1091.       }
  1092.       break;
  1093.     case stVerify:
  1094.     case stRead:            // Read Case
  1095.       mysql_data[0] = tmpAttrRefValue[0];
  1096.       if (mysql_execute(prep_read[countTables])) {
  1097. ndbout << tableName[countTables];  
  1098. ndbout << " mysql_execute: " << mysql_error(&mysql) << endl;
  1099. tResult = 3 ;
  1100. break;
  1101.       }
  1102.       if (mysql_stmt_store_result(prep_read[countTables])) {
  1103. ndbout << tableName[countTables];  
  1104. ndbout << " mysql_stmt_store_result: "
  1105.        << mysql_error(&mysql) << endl;
  1106. tResult = 4 ;
  1107. break;
  1108.       }
  1109.       {
  1110. int rows= 0;
  1111. int r;
  1112. while ( (r= mysql_fetch(prep_read[countTables])) == 0 ){
  1113.   rows++;
  1114. }
  1115. if ( r == 1 ) {
  1116.   ndbout << tableName[countTables];  
  1117.   ndbout << " mysql_fetch: " << mysql_error(&mysql) << endl;
  1118.   tResult = 5 ;
  1119.   break;
  1120. }
  1121. if ( rows != 1 ) {
  1122.   ndbout << tableName[countTables];  
  1123.   ndbout << " mysql_fetch: rows = " << rows << endl;
  1124.   tResult = 6 ;
  1125.   break;
  1126. }
  1127.       }
  1128.       {
  1129. for (int ca = 1; ca < loopCountAttributes; ca++) {
  1130.   tmpAttrValue[tAttributeSize*ca] = mysql_data[ca];
  1131. }
  1132.       }
  1133.       break;
  1134.     case stDelete:          // Delete Case
  1135.       mysql_data[0] = tmpAttrRefValue[0];
  1136.       if (mysql_execute(prep_delete[countTables])) {
  1137. ndbout << tableName[countTables];  
  1138. ndbout << " mysql_execute: " << mysql_error(&mysql) << endl;
  1139. tResult = 7 ;
  1140. break;
  1141.       }
  1142.       break;
  1143.     case stVerifyDelete:
  1144.       {
  1145. sprintf(buf, "%s%s%s",
  1146. "SELECT COUNT(*) FROM ",tableName[countTables],";");
  1147. if (mysql_query(&mysql, buf)) {
  1148.   ndbout << buf << endl;
  1149.   ndbout << "Error: " << mysql_error(&mysql) << endl;
  1150.   tResult = 8 ;
  1151.   break;
  1152. }
  1153. MYSQL_RES *res = mysql_store_result(&mysql);
  1154. if ( res == NULL ) {
  1155.   ndbout << "mysql_store_result: "
  1156.  << mysql_error(&mysql) << endl
  1157.  << "errno: " << mysql_errno(&mysql) << endl;
  1158.   tResult = 9 ;
  1159.   break;
  1160. }
  1161. int num_fields = mysql_num_fields(res);
  1162. int num_rows   = mysql_num_rows(res);
  1163. if ( num_rows != 1 || num_fields != 1 ) {
  1164.   ndbout << tableName[countTables];  
  1165.   ndbout << " mysql_store_result: num_rows = " << num_rows
  1166.  << " num_fields = " << num_fields << endl;
  1167.   tResult = 10 ;
  1168.   break;
  1169. }
  1170. MYSQL_ROW row = mysql_fetch_row(res);
  1171. if ( row == NULL ) {
  1172.   ndbout << "mysql_fetch_row: "
  1173.  << mysql_error(&mysql) << endl;
  1174.   tResult = 11 ;
  1175.   break;
  1176. }
  1177. if ( *(char*)row[0] != '0' ) {
  1178.   ndbout << tableName[countTables];  
  1179.   ndbout << " mysql_fetch_row: value = "
  1180.  << (char*)(row[0]) << endl;
  1181.   tResult = 12 ;
  1182.   break;
  1183. }
  1184. mysql_free_result(res);
  1185.       }
  1186.       break;
  1187.     default:
  1188.       assert(false);
  1189.     }
  1190. #endif
  1191. }
  1192.       }//for Tables loop
  1193.       if (tResult != 0)
  1194. break;
  1195.       if (use_ndb){
  1196. check = pTrans->execute(Commit);
  1197.       } else {
  1198. #ifdef USE_MYSQL
  1199. if (tNoOfTables > 1)
  1200.   if (mysql_commit(&mysql)) {
  1201.     ndbout << " mysql_commit: " << mysql_error(&mysql) << endl;
  1202.     tResult = 13;
  1203.   } else 
  1204.     check = 0;
  1205. #endif
  1206.       }
  1207.       if (use_ndb) {
  1208. // Decide what kind of error this is
  1209. if ((tSpecialTrans == 1) &&
  1210.     (check == -1)) {
  1211. // --------------------------------------------------------------------
  1212. // A special transaction have been executed, change to check = 0 in
  1213. // certain situations.
  1214. // --------------------------------------------------------------------
  1215.   switch (tType) {
  1216.   case stInsert:          // Insert case
  1217.     if (630 == pTrans->getNdbError().code ) {
  1218.       check = 0;
  1219.       ndbout << "Insert with 4007 was successful" << endl;
  1220.     }//if
  1221.     break;
  1222.   case stDelete:          // Delete Case
  1223.     if (626 == pTrans->getNdbError().code ) {
  1224.       check = 0;
  1225.       ndbout << "Delete with 4007 was successful" << endl;
  1226.     }//if
  1227.     break;
  1228.   default:
  1229.     assert(false);
  1230.   }//switch
  1231. }//if
  1232. tSpecialTrans = 0;
  1233. if (check == -1) {
  1234.   if ((stVerifyDelete == tType) && 
  1235.       (626 == pTrans->getNdbError().code)) {
  1236.     // ----------------------------------------------
  1237.     // It's good news - the deleted tuple is gone, 
  1238.     // so reset "check" flag
  1239.     // ----------------------------------------------
  1240.     check = 0 ;
  1241.   } else {
  1242.     int retCode = 
  1243.       theErrorData.handleErrorCommon(pTrans->getNdbError());
  1244.     if (retCode == 1) {
  1245.       ndbout_c("execute: %d, %d, %s", count, tType, 
  1246.        pTrans->getNdbError().message );
  1247.       ndbout_c("Error code = %d", pTrans->getNdbError().code );
  1248.       tResult = 20;
  1249.     } else if (retCode == 2) {
  1250.       ndbout << "4115 should not happen in flexBench" << endl;
  1251.       tResult = 20;
  1252.     } else if (retCode == 3) {
  1253. // --------------------------------------------------------------------
  1254. // We are not certain if the transaction was successful or not.
  1255. // We must reexecute but might very well find that the transaction
  1256. // actually was updated. Updates and Reads are no problem here. Inserts
  1257. // will not cause a problem if error code 630 arrives. Deletes will
  1258. // not cause a problem if 626 arrives.
  1259. // --------------------------------------------------------------------
  1260.       if ((tType == stInsert) || (tType == stDelete)) {
  1261. tSpecialTrans = 1;
  1262.       }//if
  1263.     }//if
  1264.   }//if
  1265. }//if
  1266. // Check if retries should be made
  1267. if (check == -1 && tResult == 0) {
  1268.   if (tAttemptNo < tRetryAttempts){
  1269.     tAttemptNo++;
  1270.   } else {
  1271. // --------------------------------------------------------------------
  1272. // Too many retries have been made, report error and break out of loop
  1273. // --------------------------------------------------------------------
  1274.     ndbout << "Thread" << threadNo;
  1275.     ndbout << ": too many errors reported" << endl;
  1276.     tResult = 10;
  1277.     break;
  1278.   }//if            
  1279. }//if
  1280.       }
  1281.       if (check == 0){
  1282. // Go to the next record
  1283. count++;
  1284. tAttemptNo = 0;
  1285. #ifdef CEBIT_STAT
  1286. // report successful ops
  1287. if (statEnable) {
  1288.   statOps += loopCountTables;
  1289.   if (statOps >= statFreq) {
  1290.     statReport(tType, statOps);
  1291.     statOps = 0;
  1292.   }//if
  1293. }//if
  1294. #endif
  1295.       }//if
  1296.       if (stVerify == tType && 0 == check){
  1297. int nTableOffset = 0 ;
  1298. for (int a = 1 ; a < loopCountAttributes ; a++){
  1299.   for (int tables = 0 ; tables < loopCountTables ; tables++){
  1300.     nTableOffset = tables*loopCountAttributes*tAttributeSize;
  1301.     int ov =*(int*)&attrValue[nTableOffset + tAttributeSize*a];
  1302.     int nv =*(int*)&tmpAttrRefValue[tAttributeSize*a];
  1303.     if (ov != nv){
  1304.       ndbout << "Error in verify ";
  1305.       ndbout << "pk = " << tmpAttrRefValue[0] << ":" << endl;
  1306.       ndbout << "attrValue[" << nTableOffset + tAttributeSize*a << "] = " << ov << endl ;
  1307.       ndbout << "attrRefValue[" << nRefLocalOpOffset + tAttributeSize*a << "]" << nv << endl ;
  1308.       tResult = 11 ;
  1309.       break ;
  1310.     }//if
  1311.   }//for
  1312. }//for
  1313.       }// if(stVerify ... )
  1314.       if (use_ndb) {
  1315. pNdb->closeTransaction(pTrans);
  1316.         return_ndb_object(pNdb, ndb_id);
  1317.         pNdb = NULL;
  1318.       }
  1319.     }// operations loop
  1320. #ifdef CEBIT_STAT
  1321.     // report remaining successful ops
  1322.     if (statEnable) {
  1323.       if (statOps > 0) {
  1324. statReport(tType, statOps);
  1325. statOps = 0;
  1326.       }//if
  1327.     }//if
  1328. #endif
  1329.     if (pNdb) {
  1330.       pNdb->closeTransaction(pTrans);
  1331.       return_ndb_object(pNdb, ndb_id);
  1332.       pNdb = NULL;
  1333.     }
  1334.   }
  1335. #ifdef USE_MYSQL
  1336.   if (!use_ndb) {
  1337.     mysql_close(&mysql);
  1338.     for (Uint32 i = 0; i < tNoOfTables; i++) {
  1339.       mysql_stmt_close(prep_insert[i]);
  1340.       mysql_stmt_close(prep_update[i]);
  1341.       mysql_stmt_close(prep_delete[i]);
  1342.       mysql_stmt_close(prep_read[i]);
  1343.     }
  1344.   }
  1345. #endif
  1346.   if (use_ndb && pNdb) {
  1347.     ndbout << "I got here " << endl;
  1348.     return_ndb_object(pNdb, ndb_id);
  1349.   }
  1350.   return NULL;
  1351. }
  1352. static int readArguments(int argc, const char** argv)
  1353. {
  1354.   int i = 1;
  1355.   while (argc > 1){
  1356.     if (strcmp(argv[i], "-t") == 0){
  1357.       tNoOfThreads = atoi(argv[i+1]);
  1358.       if ((tNoOfThreads < 1)) 
  1359.         return -1;
  1360.       argc -= 1;
  1361.       i++;
  1362.     }else if (strcmp(argv[i], "-o") == 0){
  1363.       tNoOfOperations = atoi(argv[i+1]);
  1364.       if (tNoOfOperations < 1) 
  1365.         return -1;;
  1366.       argc -= 1;
  1367.       i++;
  1368.     }else if (strcmp(argv[i], "-a") == 0){
  1369.       tNoOfAttributes = atoi(argv[i+1]);
  1370.       if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)) 
  1371.         return -1;
  1372.       argc -= 1;
  1373.       i++;
  1374.     }else if (strcmp(argv[i], "-c") == 0){
  1375.       tNoOfTables = atoi(argv[i+1]);
  1376.       if ((tNoOfTables < 1) || (tNoOfTables > MAXTABLES)) 
  1377.         return -1;
  1378.       argc -= 1;
  1379.       i++;
  1380.     }else if (strcmp(argv[i], "-stdtables") == 0){
  1381.       theStdTableNameFlag = 1;
  1382.     }else if (strcmp(argv[i], "-l") == 0){
  1383.       tNoOfLoops = atoi(argv[i+1]);
  1384.       if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)) 
  1385.         return -1;
  1386.       argc -= 1;
  1387.       i++;
  1388.     }else if (strcmp(argv[i], "-pool_size") == 0){
  1389.       t_instances = atoi(argv[i+1]);
  1390.       if ((t_instances < 1) || (t_instances > 240)) 
  1391.         return -1;
  1392.       argc -= 1;
  1393.       i++;
  1394. #ifdef USE_MYSQL
  1395.     }else if (strcmp(argv[i], "-engine") == 0){
  1396.       engine_id = atoi(argv[i+1]);
  1397.       if ((engine_id < 0) || (engine_id > 3)) 
  1398.         return -1;
  1399.       argc -= 1;
  1400.       i++;
  1401.     }else if (strcmp(argv[i], "-socket") == 0){
  1402.       sockets[n_sockets] = atoi(argv[i+1]);
  1403.       if (sockets[n_sockets] <= 0)
  1404.         return -1;
  1405.       n_sockets++;
  1406.       argc -= 1;
  1407.       i++;
  1408.     }else if (strcmp(argv[i], "-use_ndb") == 0){
  1409.       use_ndb = true;
  1410. #endif
  1411.     }else if (strcmp(argv[i], "-s") == 0){
  1412.       tAttributeSize = atoi(argv[i+1]);
  1413.       if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)) 
  1414.         return -1;
  1415.       argc -= 1;
  1416.       i++;
  1417.     }else if (strcmp(argv[i], "-lkn") == 0){
  1418.      tNoOfLongPK = atoi(argv[i+1]);
  1419.      useLongKeys = true;
  1420.       if ((tNoOfLongPK < 1) || (tNoOfLongPK > MAXNOLONGKEY) || 
  1421.   (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
  1422.        ndbout << "Argument -lkn is not in the proper range." << endl;  
  1423. return -1;
  1424.       }
  1425.       argc -= 1;
  1426.       i++;
  1427.     }else if (strcmp(argv[i], "-lks") == 0){
  1428.       tSizeOfLongPK = atoi(argv[i+1]);
  1429.       useLongKeys = true;
  1430.       if ((tSizeOfLongPK < 1) || (tNoOfLongPK * tSizeOfLongPK) > MAXLONGKEYTOTALSIZE){
  1431. ndbout << "Argument -lks is not in the proper range 1 to " << 
  1432.   MAXLONGKEYTOTALSIZE << endl;
  1433.         return -1;
  1434.       }
  1435.       argc -= 1;
  1436.       i++;
  1437.     }else if (strcmp(argv[i], "-simple") == 0){
  1438.       theSimpleFlag = 1;
  1439.     }else if (strcmp(argv[i], "-write") == 0){
  1440.       theWriteFlag = 1;
  1441.     }else if (strcmp(argv[i], "-dirty") == 0){
  1442.       theDirtyFlag = 1;
  1443.     }else if (strcmp(argv[i], "-sleep") == 0){
  1444.       tSleepTime = atoi(argv[i+1]);
  1445.       if ((tSleepTime < 1) || (tSleepTime > 3600)) 
  1446.         return -1;
  1447.       argc -= 1;
  1448.       i++;
  1449.     }else if (strcmp(argv[i], "-no_table_create") == 0){
  1450.       theTableCreateFlag = 1;
  1451.     }else if (strcmp(argv[i], "-temp") == 0){
  1452.       theTempTable = true;
  1453.     }else if (strcmp(argv[i], "-noverify") == 0){
  1454.       VerifyFlag = false ;
  1455.     }else if (theErrorData.parseCmdLineArg(argv, i) == true){
  1456.       ; //empty, updated in errorArg(..)
  1457.     }else if (strcmp(argv[i], "-verify") == 0){
  1458.       VerifyFlag = true ;
  1459. #ifdef CEBIT_STAT
  1460.     }else if (strcmp(argv[i], "-statserv") == 0){
  1461.       if (! (argc > 2))
  1462. return -1;
  1463.       const char *p = argv[i+1];
  1464.       const char *q = strrchr(p, ':');
  1465.       if (q == 0)
  1466. return -1;
  1467.       BaseString::snprintf(statHost, sizeof(statHost), "%.*s", q-p, p);
  1468.       statPort = atoi(q+1);
  1469.       statEnable = true;
  1470.       argc -= 1;
  1471.       i++;
  1472.     }else if (strcmp(argv[i], "-statfreq") == 0){
  1473.       if (! (argc > 2))
  1474. return -1;
  1475.       statFreq = atoi(argv[i+1]);
  1476.       if (statFreq < 1)
  1477. return -1;
  1478.       argc -= 1;
  1479.       i++;
  1480. #endif
  1481.     }else{       
  1482.       return -1;
  1483.     }
  1484.     argc -= 1;
  1485.     i++;
  1486.   }
  1487. #ifdef USE_MYSQL
  1488.   if (n_sockets == 0) {
  1489.     n_sockets = 1;
  1490.     sockets[0] = 3306;
  1491.   }
  1492. #endif
  1493.   return 0;
  1494. }
  1495. static void sleepBeforeStartingTest(int seconds){
  1496.   if (seconds > 0){
  1497.       ndbout << "Sleeping(" <<seconds << ")...";
  1498.       NdbSleep_SecSleep(seconds);
  1499.       ndbout << " done!" << endl;
  1500.     }
  1501. }
  1502. #ifdef USE_MYSQL
  1503. static int
  1504. dropTables(MYSQL* mysqlp){
  1505.   char buf[2048];
  1506.   for(unsigned i = 0; i < tNoOfTables; i++){
  1507.     int pos = 0;
  1508.     ndbout << "Dropping " << tableName[i] << "... ";
  1509.     pos += sprintf(buf+pos, "%s", "DROP TABLE ");
  1510.     pos += sprintf(buf+pos, "%s%s", tableName[i], ";");
  1511.     if (verbose)
  1512.       ndbout << endl << buf << endl;
  1513.     if (mysql_query(mysqlp, buf) != 0){
  1514.       ndbout << "Failed!"<<endl
  1515.      <<mysql_error(mysqlp)<<endl
  1516.      <<buf<<endl;
  1517.     } else
  1518.       ndbout << "OK!" << endl;
  1519.   }
  1520.   
  1521.   return 0;
  1522. }
  1523. #endif
  1524. #ifdef USE_MYSQL
  1525. static int
  1526. createTables(MYSQL* mysqlp){
  1527.   for (Uint32 i = 0; i < tNoOfAttributes; i++){
  1528.     BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
  1529.   }
  1530.   // Note! Uses only uppercase letters in table name's
  1531.   // so that we can look at the tables with SQL
  1532.   for (Uint32 i = 0; i < tNoOfTables; i++){
  1533.     if (theStdTableNameFlag == 0){
  1534.       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i, 
  1535.        (int)(NdbTick_CurrentMillisecond() / 1000));
  1536.     } else {
  1537.       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
  1538.     }
  1539.   }
  1540.   
  1541.   char buf[2048];
  1542.   for(unsigned i = 0; i < tNoOfTables; i++){
  1543.     int pos = 0;
  1544.     ndbout << "Creating " << tableName[i] << "... ";
  1545.     
  1546.     pos += sprintf(buf+pos, "%s", "CREATE TABLE ");
  1547.     pos += sprintf(buf+pos, "%s%s", tableName[i], " ");
  1548.     if(useLongKeys){
  1549.       for(Uint32 i = 0; i < tNoOfLongPK; i++) {
  1550.       }
  1551.     } else {
  1552.       pos += sprintf(buf+pos, "%s%s%s",
  1553.      "(", attrName[0], " int unsigned primary key");
  1554.     }
  1555.     for (unsigned j = 1; j < tNoOfAttributes; j++)
  1556.       pos += sprintf(buf+pos, "%s%s%s", ",", attrName[j], " int unsigned");
  1557.     pos += sprintf(buf+pos, "%s%s%s", ")", engine[engine_id], ";");
  1558.     if (verbose)
  1559.       ndbout << endl << buf << endl;
  1560.     if (mysql_query(mysqlp, buf) != 0)
  1561.       return -1;
  1562.     ndbout << "done" << endl;
  1563.   }
  1564.   return 0;
  1565. }
  1566. #endif
  1567. static int
  1568. createTables(Ndb* pMyNdb){
  1569.   for (Uint32 i = 0; i < tNoOfAttributes; i++){
  1570.     BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
  1571.   }
  1572.   // Note! Uses only uppercase letters in table name's
  1573.   // so that we can look at the tables with SQL
  1574.   for (Uint32 i = 0; i < tNoOfTables; i++){
  1575.     if (theStdTableNameFlag == 0){
  1576.       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i, 
  1577.        (int)(NdbTick_CurrentMillisecond() / 1000));
  1578.     } else {
  1579.       BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
  1580.     }
  1581.   }
  1582.   
  1583.   for(unsigned i = 0; i < tNoOfTables; i++){
  1584.     ndbout << "Creating " << tableName[i] << "... ";
  1585.     
  1586.     NdbDictionary::Table tmpTable(tableName[i]);
  1587.     
  1588.     tmpTable.setStoredTable(!theTempTable);
  1589.     if(useLongKeys){
  1590.       for(Uint32 i = 0; i < tNoOfLongPK; i++) {
  1591. NdbDictionary::Column col(longKeyAttrName[i]);
  1592. col.setType(NdbDictionary::Column::Unsigned);
  1593. col.setLength(tSizeOfLongPK);
  1594. col.setPrimaryKey(true);
  1595. tmpTable.addColumn(col);
  1596.       }
  1597.     } else {
  1598.       NdbDictionary::Column col(attrName[0]);
  1599.       col.setType(NdbDictionary::Column::Unsigned);
  1600.       col.setLength(1);
  1601.       col.setPrimaryKey(true);
  1602.       tmpTable.addColumn(col);
  1603.     }
  1604.     NdbDictionary::Column col;
  1605.     col.setType(NdbDictionary::Column::Unsigned);
  1606.     col.setLength(tAttributeSize);
  1607.     for (unsigned j = 1; j < tNoOfAttributes; j++){
  1608.       col.setName(attrName[j]);
  1609.       tmpTable.addColumn(col);
  1610.     }
  1611.     if(pMyNdb->getDictionary()->createTable(tmpTable) == -1){
  1612.       return -1;
  1613.     }
  1614.     ndbout << "done" << endl;
  1615.   }
  1616.   
  1617.   return 0;
  1618. }
  1619.       
  1620. static void input_error(){
  1621.   ndbout << endl << "Invalid argument!" << endl;
  1622.   ndbout << endl << "Arguments:" << endl;
  1623.   ndbout << "   -t Number of threads to start, default 1" << endl;
  1624.   ndbout << "   -o Number of operations per loop, default 500" << endl;
  1625.   ndbout << "   -l Number of loops to run, default 1, 0=infinite" << endl;
  1626.   ndbout << "   -a Number of attributes, default 25" << endl;
  1627.   ndbout << "   -c Number of tables, default 1" << endl;
  1628.   ndbout << "   -s Size of each attribute, default 1 (Primary Key is always of size 1," << endl;
  1629.   ndbout << "      independent of this value)" << endl;
  1630.   ndbout << "   -lkn Number of long primary keys, default 1" << endl;
  1631.   ndbout << "   -lks Size of each long primary key, default 1" << endl;
  1632.   ndbout << "   -simple Use simple read to read from database" << endl;
  1633.   ndbout << "   -dirty Use dirty read to read from database" << endl;
  1634.   ndbout << "   -write Use writeTuple in insert and update" << endl;
  1635.   ndbout << "   -stdtables Use standard table names" << endl;
  1636.   ndbout << "   -no_table_create Don't create tables in db" << endl;
  1637.   ndbout << "   -sleep Sleep a number of seconds before running the test, this" << endl;
  1638.   ndbout << "    can be used so that another flexBench have time to create tables" << endl;
  1639.   ndbout << "   -temp Use tables without logging" << endl;
  1640.   ndbout << "   -verify Verify inserts, updates and deletes" << endl ;
  1641.   ndbout << "   -use_ndb Use NDB API (otherwise use mysql client)" << endl ;
  1642.   ndbout << "   -pool_size Number of Ndb objects in pool" << endl ;
  1643.   theErrorData.printCmdLineArgs(ndbout);
  1644.   ndbout << endl <<"Returns:" << endl;
  1645.   ndbout << "t 0 - Test passed" << endl;
  1646.   ndbout << "t 1 - Test failed" << endl;
  1647.   ndbout << "t 2 - Invalid arguments" << endl << endl;
  1648. }
  1649. // vim: set sw=2: