mainAsyncGenerator.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:10k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <NdbHost.h>
  15. #include <NdbSleep.h>
  16. #include <NdbThread.h>
  17. #include <NdbMain.h>
  18. #include <NdbOut.hpp>
  19. #include <NdbEnv.h>
  20. #include <NdbTest.hpp>
  21. #include "userInterface.h"
  22. #include "dbGenerator.h"
  23. static int   numProcesses;
  24. static int   numSeconds;
  25. static int   numWarmSeconds;
  26. static int   parallellism;
  27. static int   millisSendPoll;
  28. static int   minEventSendPoll;
  29. static int   forceSendPoll;
  30. static ThreadData *data;
  31. static void usage(const char *prog)
  32. {
  33.   const char  *progname;
  34.    /*--------------------------------------------*/
  35.    /* Get the name of the program (without path) */
  36.    /*--------------------------------------------*/
  37.    progname = strrchr(prog, '/');
  38.    if (progname == 0)
  39.      progname = prog;
  40.    else
  41.      ++progname;
  42.    ndbout_c(
  43.            "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] " 
  44.    "[-t <num> ] [ -e <num> ] [ -f <num>] n"
  45.            "  -proc <num>    Specifies that <num> is the number ofn"
  46.            "                 threads. The default is 1.n"
  47.            "  -time <num>    Specifies that the test will run for <num> sec.n"
  48.            "                 The default is 10 secn"
  49.            "  -warm <num>    Specifies the warm-up/cooldown period of <num> "
  50.    "sec.n"
  51.            "                 The default is 10 secn"
  52.    "  -p <num>       The no of parallell transactions started by "
  53.    "one threadn"
  54.    "  -e <num>       Minimum no of events before wake up in call to "
  55.    "sendPolln"
  56.    "                 Default is 1n"
  57.    "  -f <num>       force parameter to sendPolln"
  58.    "                 Default is 0n",
  59.            progname);
  60. }
  61. static
  62. int
  63. parse_args(int argc, const char **argv)
  64. {
  65.    int i;
  66.    numProcesses     = 1;
  67.    numSeconds       = 10;
  68.    numWarmSeconds   = 10;
  69.    parallellism     = 1;
  70.    millisSendPoll   = 10000;
  71.    minEventSendPoll = 1;
  72.    forceSendPoll    = 0;
  73.    
  74.    i = 1;
  75.    while (i < argc){
  76.      if (strcmp("-proc",argv[i]) == 0) {
  77.        if (i + 1 >= argc) {
  78.  return 1;
  79.        }
  80.        if (sscanf(argv[i+1], "%d", &numProcesses) == -1 ||
  81.    numProcesses <= 0 || numProcesses > 127) {
  82.  ndbout_c("-proc flag requires a positive integer argument [1..127]");
  83.  return 1;
  84.        }
  85.        i += 2;
  86.      } else if (strcmp("-p", argv[i]) == 0){
  87.        if(i + 1 >= argc){
  88.  usage(argv[0]);
  89.  return 1;
  90.        }
  91.        if (sscanf(argv[i+1], "%d", &parallellism) == -1 ||
  92.    parallellism <= 0){
  93.  ndbout_c("-p flag requires a positive integer argument");
  94.  return 1;
  95.        }
  96.        i += 2;
  97.      }
  98.      else if (strcmp("-time",argv[i]) == 0) {
  99.        if (i + 1 >= argc) {
  100.  return 1;
  101.        }
  102.        if (sscanf(argv[i+1], "%d", &numSeconds) == -1 ||
  103.    numSeconds < 0) {
  104.  ndbout_c("-time flag requires a positive integer argument");
  105.  return 1;
  106.        }
  107.        i += 2;
  108.      }
  109.      else if (strcmp("-warm",argv[i]) == 0) {
  110.        if (i + 1 >= argc) {
  111.  return 1;
  112.        }
  113.        if (sscanf(argv[i+1], "%d", &numWarmSeconds) == -1 ||
  114.    numWarmSeconds < 0) {
  115.  ndbout_c("-warm flag requires a positive integer argument");
  116.  return 1;
  117.        }
  118.        i += 2;
  119.      }
  120.      else if (strcmp("-e",argv[i]) == 0) {
  121.        if (i + 1 >= argc) {
  122.  return 1;
  123.        }
  124.        if (sscanf(argv[i+1], "%d", &minEventSendPoll) == -1 ||
  125.    minEventSendPoll < 0) {
  126.  ndbout_c("-e flag requires a positive integer argument");
  127.  return 1;
  128.        }
  129.        i += 2;
  130.      }
  131.      else if (strcmp("-f",argv[i]) == 0) {
  132.        if (i + 1 >= argc) {
  133.  usage(argv[0]);
  134.  return 1;
  135.        }
  136.        if (sscanf(argv[i+1], "%d", &forceSendPoll) == -1 ||
  137.    forceSendPoll < 0) {
  138.  ndbout_c("-f flag requires a positive integer argument");
  139.  return 1;
  140.        }
  141.        i += 2;
  142.      }
  143.      else {
  144.        return 1;
  145.      }
  146.    }
  147.    if(minEventSendPoll > parallellism){
  148.      ndbout_c("minEventSendPoll(%d) > parallellism(%d)",
  149.      minEventSendPoll, parallellism);
  150.      ndbout_c("not very good...");
  151.      ndbout_c("very bad...");
  152.      ndbout_c("exiting...");
  153.      return 1;
  154.    }
  155.    return 0;
  156. }
  157. static 
  158. void 
  159. print_transaction(const char            *header,
  160.   unsigned long          totalCount,
  161.   TransactionDefinition *trans,
  162.   unsigned int           printBranch,
  163.   unsigned int           printRollback)
  164. {
  165.   double f;
  166.   
  167.   ndbout_c("  %s: %d (%.2f%%) "
  168.    "Latency(ms) avg: %d min: %d max: %d std: %d n: %d",
  169.    header,
  170.    trans->count,
  171.    (double)trans->count / (double)totalCount * 100.0,
  172.    (int)trans->latency.getMean(),
  173.    (int)trans->latency.getMin(),
  174.    (int)trans->latency.getMax(),
  175.    (int)trans->latency.getStddev(),
  176.    (int)trans->latency.getCount()
  177.    );
  178.   
  179.   if( printBranch ){
  180.     if( trans->count == 0 )
  181.       f = 0.0;
  182.     else
  183.       f = (double)trans->branchExecuted / (double)trans->count * 100.0;
  184.     ndbout_c("      Branches Executed: %d (%.2f%%)", trans->branchExecuted, f);
  185.   }
  186.   
  187.   if( printRollback ){
  188.     if( trans->count == 0 )
  189.       f = 0.0;
  190.     else
  191.       f = (double)trans->rollbackExecuted / (double)trans->count * 100.0;
  192.     ndbout_c("      Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f);
  193.   }
  194. }
  195. void 
  196. print_stats(const char       *title,
  197.     unsigned int      length,
  198.     unsigned int      transactionFlag,
  199.     GeneratorStatistics *gen,
  200.     int numProc, int parallellism)
  201. {
  202.   int    i;
  203.   char buf[10];
  204.   char name[MAXHOSTNAMELEN];
  205.   
  206.   name[0] = 0;
  207.   NdbHost_GetHostName(name);
  208.   
  209.   ndbout_c("n------ %s ------",title);
  210.   ndbout_c("Length        : %d %s",
  211.  length,
  212.  transactionFlag ? "Transactions" : "sec");
  213.   ndbout_c("Processor     : %s", name);
  214.   ndbout_c("Number of Proc: %d",numProc);
  215.   ndbout_c("Parallellism  : %d", parallellism);
  216.   ndbout_c("n");
  217.   if( gen->totalTransactions == 0 ) {
  218.     ndbout_c("   No Transactions for this test");
  219.   }
  220.   else {
  221.     for(i = 0; i < 5; i++) {
  222.       sprintf(buf, "T%d",i+1);
  223.       print_transaction(buf,
  224. gen->totalTransactions,
  225. &gen->transactions[i],
  226. i >= 2,
  227. i >= 3 );
  228.     }
  229.     
  230.     ndbout_c("n");
  231.     ndbout_c("  Overall Statistics:");
  232.     ndbout_c("     Transactions: %d", gen->totalTransactions);
  233.     ndbout_c("     Outer       : %.0f TPS",gen->outerTps);
  234.     ndbout_c("n");
  235.   }
  236. }
  237. static 
  238. void *
  239. threadRoutine(void *arg)
  240. {
  241.   int i;
  242.   ThreadData *data = (ThreadData *)arg;
  243.   Ndb * pNDB;
  244.   pNDB = asyncDbConnect(parallellism);       
  245.   /* NdbSleep_MilliSleep(rand() % 10); */
  246.   for(i = 0; i<parallellism; i++){
  247.     data[i].pNDB = pNDB;
  248.   }
  249.   millisSendPoll = 30000;
  250.   asyncGenerator(data, parallellism,
  251.  millisSendPoll, minEventSendPoll, forceSendPoll);
  252.   asyncDbDisconnect(pNDB);
  253.   return NULL;
  254. }
  255. NDB_COMMAND(DbAsyncGenerator, "DbAsyncGenerator",
  256.     "DbAsyncGenerator", "DbAsyncGenerator", 65535)
  257. {
  258.   ndb_init();
  259.   int i;
  260.   int j;
  261.   int k;
  262.   struct NdbThread* pThread = NULL;
  263.   GeneratorStatistics  stats;
  264.   GeneratorStatistics *p;
  265.   char threadName[32];
  266.   int rc = NDBT_OK;
  267.   void* tmp = NULL;
  268.   if(parse_args(argc,argv) != 0){
  269.     usage(argv[0]);
  270.     return NDBT_ProgramExit(NDBT_WRONGARGS);
  271.   }
  272.     
  273.   ndbout_c("nStarting Test with %d process(es) for %d %s parallellism %d",
  274.    numProcesses,
  275.    numSeconds,
  276.    "sec",
  277.    parallellism);
  278.   ndbout_c("   WarmUp/coolDown = %d sec", numWarmSeconds);
  279.   
  280.   data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
  281.  
  282.   for(i = 0; i < numProcesses; i++) {
  283.     for(j = 0; j<parallellism; j++){
  284.       data[i*parallellism+j].warmUpSeconds   = numWarmSeconds;
  285.       data[i*parallellism+j].testSeconds     = numSeconds;
  286.       data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
  287.       data[i*parallellism+j].randomSeed      = 
  288. NdbTick_CurrentMillisecond()+i+j;
  289.       data[i*parallellism+j].changedTime     = 0;
  290.       data[i*parallellism+j].runState        = Runnable;
  291.     }
  292.     sprintf(threadName, "AsyncThread[%d]", i);
  293.     pThread = NdbThread_Create(threadRoutine, 
  294.       (void**)&data[i*parallellism], 
  295.       65535, 
  296.       threadName,
  297.                               NDB_THREAD_PRIO_LOW);
  298.     if(pThread != 0 && pThread != NULL){
  299.       (&data[i*parallellism])->pThread = pThread;
  300.     } else {      
  301.       perror("Failed to create thread");
  302.       rc = NDBT_FAILED;
  303.     }
  304.   }
  305.   showTime();
  306.   /*--------------------------------*/
  307.   /* Wait for all processes to exit */
  308.   /*--------------------------------*/
  309.   for(i = 0; i < numProcesses; i++) {
  310.     NdbThread_WaitFor(data[i*parallellism].pThread, &tmp);
  311.     NdbThread_Destroy(&data[i*parallellism].pThread);
  312.   }
  313.    
  314.   ndbout_c("All threads have finished");
  315.   
  316.   /*-------------------------------------------*/
  317.   /* Clear all structures for total statistics */
  318.   /*-------------------------------------------*/
  319.   stats.totalTransactions = 0;
  320.   stats.outerTps          = 0.0;
  321.   
  322.   for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) {
  323.     stats.transactions[i].count            = 0;
  324.     stats.transactions[i].branchExecuted   = 0;
  325.     stats.transactions[i].rollbackExecuted = 0;
  326.     stats.transactions[i].latency.reset();
  327.   }
  328.   
  329.   /*--------------------------------*/
  330.   /* Add the values for all Threads */
  331.   /*--------------------------------*/
  332.   for(i = 0; i < numProcesses; i++) {
  333.     for(k = 0; k<parallellism; k++){
  334.       p = &data[i*parallellism+k].generator;
  335.       
  336.       stats.totalTransactions += p->totalTransactions;
  337.       stats.outerTps          += p->outerTps;
  338.       
  339.       for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) {
  340. stats.transactions[j].count += 
  341.   p->transactions[j].count;
  342. stats.transactions[j].branchExecuted += 
  343.   p->transactions[j].branchExecuted;
  344. stats.transactions[j].rollbackExecuted += 
  345.   p->transactions[j].rollbackExecuted;
  346. stats.transactions[j].latency += 
  347.   p->transactions[j].latency;
  348.       }
  349.     }
  350.   }
  351.   print_stats("Test Results", 
  352.       numSeconds,
  353.       0,
  354.       &stats,
  355.       numProcesses,
  356.       parallellism);
  357.   free(data);
  358.   
  359.   NDBT_ProgramExit(rc);
  360. }