mainAsyncGenerator.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:13k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <NdbHost.h>
  15. #include <NdbSleep.h>
  16. #include <NdbThread.h>
  17. #include <NdbMain.h>
  18. #include <NdbOut.hpp>
  19. #include <NdbEnv.h>
  20. #include <NdbTest.hpp>
  21. #include "userInterface.h"
  22. #include "dbGenerator.h"
  23. static int   numProcesses;
  24. static int   numSeconds;
  25. static int   numWarmSeconds;
  26. static int   parallellism;
  27. static int   millisSendPoll;
  28. static int   minEventSendPoll;
  29. static int   forceSendPoll;
  30. static ThreadData *data;
  31. static Ndb_cluster_connection *g_cluster_connection= 0;
  32. static void usage(const char *prog)
  33. {
  34.   const char  *progname;
  35.    /*--------------------------------------------*/
  36.    /* Get the name of the program (without path) */
  37.    /*--------------------------------------------*/
  38.    progname = strrchr(prog, '/');
  39.    if (progname == 0)
  40.      progname = prog;
  41.    else
  42.      ++progname;
  43.    ndbout_c(
  44.            "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] " 
  45.    "[-t <num> ] [ -e <num> ] [ -f <num>] n"
  46.            "  -proc <num>    Specifies that <num> is the number ofn"
  47.            "                 threads. The default is 1.n"
  48.            "  -time <num>    Specifies that the test will run for <num> sec.n"
  49.            "                 The default is 10 secn"
  50.            "  -warm <num>    Specifies the warm-up/cooldown period of <num> "
  51.    "sec.n"
  52.            "                 The default is 10 secn"
  53.    "  -p <num>       The no of parallell transactions started by "
  54.    "one threadn"
  55.    "  -e <num>       Minimum no of events before wake up in call to "
  56.    "sendPolln"
  57.    "                 Default is 1n"
  58.    "  -f <num>       force parameter to sendPolln"
  59.    "                 Default is 0n",
  60.            progname);
  61. }
  62. static
  63. int
  64. parse_args(int argc, const char **argv)
  65. {
  66.    int i;
  67.    numProcesses     = 1;
  68.    numSeconds       = 10;
  69.    numWarmSeconds   = 10;
  70.    parallellism     = 1;
  71.    millisSendPoll   = 10000;
  72.    minEventSendPoll = 1;
  73.    forceSendPoll    = 0;
  74.    
  75.    i = 1;
  76.    while (i < argc){
  77.      if (strcmp("-proc",argv[i]) == 0) {
  78.        if (i + 1 >= argc) {
  79.  return 1;
  80.        }
  81.        if (sscanf(argv[i+1], "%d", &numProcesses) == -1 ||
  82.    numProcesses <= 0 || numProcesses > 127) {
  83.  ndbout_c("-proc flag requires a positive integer argument [1..127]");
  84.  return 1;
  85.        }
  86.        i += 2;
  87.      } else if (strcmp("-p", argv[i]) == 0){
  88.        if(i + 1 >= argc){
  89.  usage(argv[0]);
  90.  return 1;
  91.        }
  92.        if (sscanf(argv[i+1], "%d", &parallellism) == -1 ||
  93.    parallellism <= 0){
  94.  ndbout_c("-p flag requires a positive integer argument");
  95.  return 1;
  96.        }
  97.        i += 2;
  98.      }
  99.      else if (strcmp("-time",argv[i]) == 0) {
  100.        if (i + 1 >= argc) {
  101.  return 1;
  102.        }
  103.        if (sscanf(argv[i+1], "%d", &numSeconds) == -1 ||
  104.    numSeconds < 0) {
  105.  ndbout_c("-time flag requires a positive integer argument");
  106.  return 1;
  107.        }
  108.        i += 2;
  109.      }
  110.      else if (strcmp("-warm",argv[i]) == 0) {
  111.        if (i + 1 >= argc) {
  112.  return 1;
  113.        }
  114.        if (sscanf(argv[i+1], "%d", &numWarmSeconds) == -1 ||
  115.    numWarmSeconds < 0) {
  116.  ndbout_c("-warm flag requires a positive integer argument");
  117.  return 1;
  118.        }
  119.        i += 2;
  120.      }
  121.      else if (strcmp("-e",argv[i]) == 0) {
  122.        if (i + 1 >= argc) {
  123.  return 1;
  124.        }
  125.        if (sscanf(argv[i+1], "%d", &minEventSendPoll) == -1 ||
  126.    minEventSendPoll < 0) {
  127.  ndbout_c("-e flag requires a positive integer argument");
  128.  return 1;
  129.        }
  130.        i += 2;
  131.      }
  132.      else if (strcmp("-f",argv[i]) == 0) {
  133.        if (i + 1 >= argc) {
  134.  usage(argv[0]);
  135.  return 1;
  136.        }
  137.        if (sscanf(argv[i+1], "%d", &forceSendPoll) == -1 ||
  138.    forceSendPoll < 0) {
  139.  ndbout_c("-f flag requires a positive integer argument");
  140.  return 1;
  141.        }
  142.        i += 2;
  143.      }
  144.      else {
  145.        return 1;
  146.      }
  147.    }
  148.    if(minEventSendPoll > parallellism){
  149.      ndbout_c("minEventSendPoll(%d) > parallellism(%d)",
  150.      minEventSendPoll, parallellism);
  151.      ndbout_c("not very good...");
  152.      ndbout_c("very bad...");
  153.      ndbout_c("exiting...");
  154.      return 1;
  155.    }
  156.    return 0;
  157. }
  158. static 
  159. void 
  160. print_transaction(const char            *header,
  161.   unsigned long          totalCount,
  162.   TransactionDefinition *trans,
  163.   unsigned int           printBranch,
  164.   unsigned int           printRollback)
  165. {
  166.   double f;
  167.   
  168.   ndbout_c("  %s: %d (%.2f%%) "
  169.    "Latency(ms) avg: %d min: %d max: %d std: %d n: %d",
  170.    header,
  171.    trans->count,
  172.    (double)trans->count / (double)totalCount * 100.0,
  173.    (int)trans->latency.getMean(),
  174.    (int)trans->latency.getMin(),
  175.    (int)trans->latency.getMax(),
  176.    (int)trans->latency.getStddev(),
  177.    (int)trans->latency.getCount()
  178.    );
  179.   
  180.   if( printBranch ){
  181.     if( trans->count == 0 )
  182.       f = 0.0;
  183.     else
  184.       f = (double)trans->branchExecuted / (double)trans->count * 100.0;
  185.     ndbout_c("      Branches Executed: %d (%.2f%%)", trans->branchExecuted, f);
  186.   }
  187.   
  188.   if( printRollback ){
  189.     if( trans->count == 0 )
  190.       f = 0.0;
  191.     else
  192.       f = (double)trans->rollbackExecuted / (double)trans->count * 100.0;
  193.     ndbout_c("      Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f);
  194.   }
  195. }
  196. void 
  197. print_stats(const char       *title,
  198.     unsigned int      length,
  199.     unsigned int      transactionFlag,
  200.     GeneratorStatistics *gen,
  201.     int numProc, int parallellism)
  202. {
  203.   int    i;
  204.   char buf[10];
  205.   char name[MAXHOSTNAMELEN];
  206.   
  207.   name[0] = 0;
  208.   NdbHost_GetHostName(name);
  209.   
  210.   ndbout_c("n------ %s ------",title);
  211.   ndbout_c("Length        : %d %s",
  212.  length,
  213.  transactionFlag ? "Transactions" : "sec");
  214.   ndbout_c("Processor     : %s", name);
  215.   ndbout_c("Number of Proc: %d",numProc);
  216.   ndbout_c("Parallellism  : %d", parallellism);
  217.   ndbout_c("n");
  218.   if( gen->totalTransactions == 0 ) {
  219.     ndbout_c("   No Transactions for this test");
  220.   }
  221.   else {
  222.     for(i = 0; i < 5; i++) {
  223.       sprintf(buf, "T%d",i+1);
  224.       print_transaction(buf,
  225. gen->totalTransactions,
  226. &gen->transactions[i],
  227. i >= 2,
  228. i >= 3 );
  229.     }
  230.     
  231.     ndbout_c("n");
  232.     ndbout_c("  Overall Statistics:");
  233.     ndbout_c("     Transactions: %d", gen->totalTransactions);
  234.     ndbout_c("     Outer       : %.0f TPS",gen->outerTps);
  235.     ndbout_c("n");
  236.   }
  237. }
  238. static 
  239. void *
  240. threadRoutine(void *arg)
  241. {
  242.   int i;
  243.   ThreadData *data = (ThreadData *)arg;
  244.   Ndb * pNDB;
  245.   pNDB = asyncDbConnect(parallellism);       
  246.   /* NdbSleep_MilliSleep(rand() % 10); */
  247.   for(i = 0; i<parallellism; i++){
  248.     data[i].pNDB = pNDB;
  249.   }
  250.   millisSendPoll = 30000;
  251.   asyncGenerator(data, parallellism,
  252.  millisSendPoll, minEventSendPoll, forceSendPoll);
  253.   asyncDbDisconnect(pNDB);
  254.   return NULL;
  255. }
  256. NDB_COMMAND(DbAsyncGenerator, "DbAsyncGenerator",
  257.     "DbAsyncGenerator", "DbAsyncGenerator", 65535)
  258. {
  259.   ndb_init();
  260.   int i;
  261.   int j;
  262.   int k;
  263.   struct NdbThread* pThread = NULL;
  264.   GeneratorStatistics  stats;
  265.   GeneratorStatistics *p;
  266.   char threadName[32];
  267.   int rc = NDBT_OK;
  268.   void* tmp = NULL;
  269.   if(parse_args(argc,argv) != 0){
  270.     usage(argv[0]);
  271.     return NDBT_ProgramExit(NDBT_WRONGARGS);
  272.   }
  273.     
  274.   ndbout_c("nStarting Test with %d process(es) for %d %s parallellism %d",
  275.    numProcesses,
  276.    numSeconds,
  277.    "sec",
  278.    parallellism);
  279.   ndbout_c("   WarmUp/coolDown = %d sec", numWarmSeconds);
  280.   Ndb_cluster_connection con;
  281.   if(con.connect(12, 5, 1) != 0)
  282.   {
  283.     ndbout << "Unable to connect to management server." << endl;
  284.     return 0;
  285.   }
  286.   if (con.wait_until_ready(30,0) < 0)
  287.   {
  288.     ndbout << "Cluster nodes not ready in 30 seconds." << endl;
  289.     return 0;
  290.   }
  291.   
  292.   g_cluster_connection= &con;
  293.   data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
  294.  
  295.   for(i = 0; i < numProcesses; i++) {
  296.     for(j = 0; j<parallellism; j++){
  297.       data[i*parallellism+j].warmUpSeconds   = numWarmSeconds;
  298.       data[i*parallellism+j].testSeconds     = numSeconds;
  299.       data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
  300.       data[i*parallellism+j].randomSeed      = 
  301. NdbTick_CurrentMillisecond()+i+j;
  302.       data[i*parallellism+j].changedTime     = 0;
  303.       data[i*parallellism+j].runState        = Runnable;
  304.     }
  305.     sprintf(threadName, "AsyncThread[%d]", i);
  306.     pThread = NdbThread_Create(threadRoutine, 
  307.       (void**)&data[i*parallellism], 
  308.       65535, 
  309.       threadName,
  310.                               NDB_THREAD_PRIO_LOW);
  311.     if(pThread != 0 && pThread != NULL){
  312.       (&data[i*parallellism])->pThread = pThread;
  313.     } else {      
  314.       perror("Failed to create thread");
  315.       rc = NDBT_FAILED;
  316.     }
  317.   }
  318.   showTime();
  319.   /*--------------------------------*/
  320.   /* Wait for all processes to exit */
  321.   /*--------------------------------*/
  322.   for(i = 0; i < numProcesses; i++) {
  323.     NdbThread_WaitFor(data[i*parallellism].pThread, &tmp);
  324.     NdbThread_Destroy(&data[i*parallellism].pThread);
  325.   }
  326.    
  327.   ndbout_c("All threads have finished");
  328.   
  329.   /*-------------------------------------------*/
  330.   /* Clear all structures for total statistics */
  331.   /*-------------------------------------------*/
  332.   stats.totalTransactions = 0;
  333.   stats.outerTps          = 0.0;
  334.   
  335.   for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) {
  336.     stats.transactions[i].count            = 0;
  337.     stats.transactions[i].branchExecuted   = 0;
  338.     stats.transactions[i].rollbackExecuted = 0;
  339.     stats.transactions[i].latency.reset();
  340.   }
  341.   
  342.   /*--------------------------------*/
  343.   /* Add the values for all Threads */
  344.   /*--------------------------------*/
  345.   for(i = 0; i < numProcesses; i++) {
  346.     for(k = 0; k<parallellism; k++){
  347.       p = &data[i*parallellism+k].generator;
  348.       
  349.       stats.totalTransactions += p->totalTransactions;
  350.       stats.outerTps          += p->outerTps;
  351.       
  352.       for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) {
  353. stats.transactions[j].count += 
  354.   p->transactions[j].count;
  355. stats.transactions[j].branchExecuted += 
  356.   p->transactions[j].branchExecuted;
  357. stats.transactions[j].rollbackExecuted += 
  358.   p->transactions[j].rollbackExecuted;
  359. stats.transactions[j].latency += 
  360.   p->transactions[j].latency;
  361.       }
  362.     }
  363.   }
  364.   print_stats("Test Results", 
  365.       numSeconds,
  366.       0,
  367.       &stats,
  368.       numProcesses,
  369.       parallellism);
  370.   free(data);
  371.   
  372.   NDBT_ProgramExit(rc);
  373. }
  374. /***************************************************************
  375. * I N C L U D E D   F I L E S                                  *
  376. ***************************************************************/
  377. #include <stdio.h>
  378. #include <stdlib.h>
  379. #include <sys/types.h>
  380. #include <time.h>
  381. #include "ndb_schema.hpp"
  382. #include "ndb_error.hpp"
  383. #include "userInterface.h"
  384. #include <NdbMutex.h>
  385. #include <NdbThread.h>
  386. #include <NdbTick.h>
  387. #include <NdbApi.hpp>
  388. #include <NdbOut.hpp>
  389. /***************************************************************
  390. * L O C A L   C O N S T A N T S                                *
  391. ***************************************************************/
  392. /***************************************************************
  393. * L O C A L   D A T A   S T R U C T U R E S                    *
  394. ***************************************************************/
  395. /***************************************************************
  396. * L O C A L   F U N C T I O N S                                *
  397. ***************************************************************/
  398. #ifndef NDB_WIN32
  399. #include <unistd.h>
  400. #endif
  401. Ndb*
  402. asyncDbConnect(int parallellism){
  403.   Ndb * pNDB = new Ndb(g_cluster_connection, "TEST_DB");
  404.   
  405.   pNDB->init(parallellism + 1);
  406.   
  407.   while(pNDB->waitUntilReady() != 0){
  408.   }
  409.   
  410.   return pNDB;
  411. }
  412. void 
  413. asyncDbDisconnect(Ndb* pNDB)
  414. {
  415.   delete pNDB;
  416. }
  417. double
  418. userGetTime(void)
  419. {
  420.   static bool initialized = false;
  421.   static NDB_TICKS initSecs = 0;
  422.   static Uint32 initMicros = 0;
  423.   double timeValue = 0;
  424.   if ( !initialized ) {
  425.     initialized = true;
  426.     NdbTick_CurrentMicrosecond(&initSecs, &initMicros); 
  427.     timeValue = 0.0;
  428.   } else {
  429.     NDB_TICKS secs = 0;
  430.     Uint32 micros = 0;
  431.     NdbTick_CurrentMicrosecond(&secs, &micros);
  432.     double s  = (double)secs  - (double)initSecs;
  433.     double us = (double)micros - (double)initMicros;
  434.     
  435.     timeValue = s + (us / 1000000.0);
  436.   }
  437.   return timeValue;
  438. }
  439. void showTime()
  440. {
  441.   char buf[128];
  442.   struct tm* tm_now;
  443.   time_t now;
  444.   now = ::time((time_t*)NULL);
  445.   tm_now = ::gmtime(&now);
  446.   ::snprintf(buf, 128,
  447.      "%d-%.2d-%.2d %.2d:%.2d:%.2d", 
  448.      tm_now->tm_year + 1900, 
  449.      tm_now->tm_mon, 
  450.      tm_now->tm_mday,
  451.      tm_now->tm_hour,
  452.      tm_now->tm_min,
  453.      tm_now->tm_sec);
  454.   ndbout_c("Time: %s", buf);
  455. }