asyncGenerator.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:15k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. /***************************************************************
  14. * I N C L U D E D   F I L E S                                  *
  15. ***************************************************************/
  16. #include <ndb_global.h>
  17. #include "dbGenerator.h"
  18. #include <NdbApi.hpp>
  19. #include <NdbOut.hpp>
  20. #include <NdbSleep.h>
  21. /***************************************************************
  22. * L O C A L   C O N S T A N T S                                *
  23. ***************************************************************/
  24. /***************************************************************
  25. * L O C A L   D A T A   S T R U C T U R E S                    *
  26. ***************************************************************/
  27. /***************************************************************
  28. * L O C A L   F U N C T I O N S                                *
  29. ***************************************************************/
  30. static void getRandomSubscriberNumber(SubscriberNumber number);
  31. static void getRandomServerId(ServerId *serverId);
  32. static void getRandomChangedBy(ChangedBy changedBy);
  33. static void getRandomChangedTime(ChangedTime changedTime);
  34. static void clearTransaction(TransactionDefinition *trans);
  35. static void initGeneratorStatistics(GeneratorStatistics *gen);
  36. static void doOneTransaction(ThreadData * td, 
  37.      int parallellism,
  38.      int millisSendPoll,
  39.      int minEventSendPoll,
  40.      int forceSendPoll);
  41. static void doTransaction_T1(Ndb * pNDB, ThreadData * td, int async);
  42. static void doTransaction_T2(Ndb * pNDB, ThreadData * td, int async);
  43. static void doTransaction_T3(Ndb * pNDB, ThreadData * td, int async);
  44. static void doTransaction_T4(Ndb * pNDB, ThreadData * td, int async);
  45. static void doTransaction_T5(Ndb * pNDB, ThreadData * td, int async);
  46. /***************************************************************
  47. * L O C A L   D A T A                                          *
  48. ***************************************************************/
  49. static SequenceValues transactionDefinition[] = {
  50.    {25, 1},
  51.    {25, 2},
  52.    {20, 3},
  53.    {15, 4},
  54.    {15, 5},
  55.    {0,  0}
  56. };
  57. static SequenceValues rollbackDefinition[] = {
  58.    {98, 0},
  59.    {2 , 1},
  60.    {0,  0}
  61. };
  62. static int maxsize = 0;
  63. /***************************************************************
  64. * P U B L I C   D A T A                                        *
  65. ***************************************************************/
  66. /***************************************************************
  67. ****************************************************************
  68. * L O C A L   F U N C T I O N S   C O D E   S E C T I O N      *
  69. ****************************************************************
  70. ***************************************************************/
  71. static void getRandomSubscriberNumber(SubscriberNumber number)
  72. {
  73.    uint32 tmp;
  74.    char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
  75.    tmp = myRandom48(NO_OF_SUBSCRIBERS);
  76.    sprintf(sbuf, "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
  77.    memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
  78. }
  79. static void getRandomServerId(ServerId *serverId)
  80. {
  81.    *serverId = myRandom48(NO_OF_SERVERS);
  82. }
  83. static void getRandomChangedBy(ChangedBy changedBy)
  84. {
  85.    memset(changedBy, myRandom48(26)+'A', CHANGED_BY_LENGTH);
  86.    changedBy[CHANGED_BY_LENGTH] = 0;
  87. }
  88. static void getRandomChangedTime(ChangedTime changedTime)
  89. {
  90.    memset(changedTime, myRandom48(26)+'A', CHANGED_TIME_LENGTH);
  91.    changedTime[CHANGED_TIME_LENGTH] = 0;
  92. }
  93. static void clearTransaction(TransactionDefinition *trans)
  94. {
  95.   trans->count            = 0;
  96.   trans->branchExecuted   = 0;
  97.   trans->rollbackExecuted = 0;
  98.   trans->latencyCounter   = myRandom48(127);
  99.   trans->latency.reset();
  100. }
  101. static int listFull(SessionList *list)
  102. {
  103.    return(list->numberInList == SESSION_LIST_LENGTH);
  104. }
  105. static int listEmpty(SessionList *list)
  106. {
  107.    return(list->numberInList == 0);
  108. }
  109. static void insertSession(SessionList     *list, 
  110.                           SubscriberNumber number,
  111.                           ServerId         serverId)
  112. {
  113.    SessionElement *e;
  114.    if( listFull(list) ) return;
  115.    e = &list->list[list->writeIndex];
  116.    strcpy(e->subscriberNumber, number);
  117.    e->serverId = serverId;
  118.    list->writeIndex = (list->writeIndex + 1) % SESSION_LIST_LENGTH;
  119.    list->numberInList++;
  120.    if( list->numberInList > maxsize )
  121.      maxsize = list->numberInList;
  122. }
  123. static SessionElement *getNextSession(SessionList *list)
  124. {
  125.    if( listEmpty(list) ) return(0);
  126.    return(&list->list[list->readIndex]);
  127. }
  128. static void deleteSession(SessionList *list)
  129. {
  130.    if( listEmpty(list) ) return;
  131.    list->readIndex = (list->readIndex + 1) % SESSION_LIST_LENGTH;
  132.    list->numberInList--;
  133. }
  134. static void initGeneratorStatistics(GeneratorStatistics *gen)
  135. {
  136.    int i;
  137.    if( initSequence(&gen->transactionSequence,
  138.                     transactionDefinition) != 0 ) {
  139.       ndbout_c("could not set the transaction types");
  140.       exit(0);
  141.    }
  142.    if( initSequence(&gen->rollbackSequenceT4,
  143.                     rollbackDefinition) != 0 ) {
  144.       ndbout_c("could not set the rollback sequence");
  145.       exit(0);
  146.    }
  147.    if( initSequence(&gen->rollbackSequenceT5,
  148.                     rollbackDefinition) != 0 ) {
  149.       ndbout_c("could not set the rollback sequence");
  150.       exit(0);
  151.    }
  152.    for(i = 0; i < NUM_TRANSACTION_TYPES; i++ )
  153.       clearTransaction(&gen->transactions[i]);
  154.    gen->totalTransactions = 0;
  155.    gen->activeSessions.numberInList = 0;
  156.    gen->activeSessions.readIndex    = 0;
  157.    gen->activeSessions.writeIndex   = 0;
  158. }
  159. static 
  160. void 
  161. doOneTransaction(ThreadData * td, int p, int millis, int minEvents, int force)
  162. {
  163.   int i;
  164.   unsigned int transactionType;
  165.   int async = 1;
  166.   if (p == 1) {
  167.     async = 0;
  168.   }//if
  169.   for(i = 0; i<p; i++){
  170.     if(td[i].runState == Runnable){
  171.       transactionType = getNextRandom(&td[i].generator.transactionSequence);
  172.       switch(transactionType) {
  173.       case 1:
  174. doTransaction_T1(td[i].pNDB, &td[i], async);
  175. break;
  176.       case 2:
  177. doTransaction_T2(td[i].pNDB, &td[i], async);
  178. break;
  179.       case 3:
  180. doTransaction_T3(td[i].pNDB, &td[i], async);
  181. break;
  182.       case 4:
  183. doTransaction_T4(td[i].pNDB, &td[i], async);
  184. break;
  185.       case 5:
  186. doTransaction_T5(td[i].pNDB, &td[i], async);
  187. break;
  188.       default:
  189. ndbout_c("Unknown transaction type: %d", transactionType);
  190.       }
  191.     }
  192.   }
  193.   if (async == 1) {
  194.     td[0].pNDB->sendPollNdb(millis, minEvents, force);
  195.   }//if
  196. }  
  197. static 
  198. void 
  199. doTransaction_T1(Ndb * pNDB, ThreadData * td, int async)
  200. {
  201.   /*----------------*/
  202.   /* Init arguments */
  203.   /*----------------*/
  204.   getRandomSubscriberNumber(td->transactionData.number);
  205.   getRandomChangedBy(td->transactionData.changed_by);
  206.   BaseString::snprintf(td->transactionData.changed_time,
  207.    sizeof(td->transactionData.changed_time),
  208.    "%ld - %d", td->changedTime++, myRandom48(65536*1024));
  209.   //getRandomChangedTime(td->transactionData.changed_time);
  210.   td->transactionData.location = td->transactionData.changed_by[0];
  211.   
  212.   /*-----------------*/
  213.   /* Run transaction */
  214.   /*-----------------*/
  215.   td->runState = Running;
  216.   td->generator.transactions[0].startLatency();
  217.   start_T1(pNDB, td, async);
  218. }
  219. static
  220. void 
  221. doTransaction_T2(Ndb * pNDB, ThreadData * td, int async)
  222. {
  223.   /*----------------*/
  224.   /* Init arguments */
  225.   /*----------------*/
  226.   getRandomSubscriberNumber(td->transactionData.number);
  227.   /*-----------------*/
  228.   /* Run transaction */
  229.   /*-----------------*/
  230.   td->runState = Running;
  231.   td->generator.transactions[1].startLatency();
  232.   start_T2(pNDB, td, async);
  233. }
  234. static
  235. void 
  236. doTransaction_T3(Ndb * pNDB, ThreadData * td, int async)
  237. {
  238.   SessionElement  *se;
  239.   
  240.   /*----------------*/
  241.   /* Init arguments */
  242.   /*----------------*/
  243.   se = getNextSession(&td->generator.activeSessions);
  244.   if( se ) {
  245.     strcpy(td->transactionData.number, se->subscriberNumber);
  246.     td->transactionData.server_id = se->serverId;
  247.     td->transactionData.sessionElement = 1;
  248.   } else {
  249.     getRandomSubscriberNumber(td->transactionData.number);
  250.     getRandomServerId(&td->transactionData.server_id);
  251.     td->transactionData.sessionElement = 0;
  252.   }
  253.   
  254.   td->transactionData.server_bit = (1 << td->transactionData.server_id);
  255.   /*-----------------*/
  256.   /* Run transaction */
  257.   /*-----------------*/
  258.   td->runState = Running;
  259.   td->generator.transactions[2].startLatency();
  260.   start_T3(pNDB, td, async);
  261. }
  262. static 
  263. void 
  264. doTransaction_T4(Ndb * pNDB, ThreadData * td, int async)
  265. {
  266.    /*----------------*/
  267.    /* Init arguments */
  268.    /*----------------*/
  269.   getRandomSubscriberNumber(td->transactionData.number);
  270.   getRandomServerId(&td->transactionData.server_id);
  271.   
  272.   td->transactionData.server_bit = (1 << td->transactionData.server_id);
  273.   td->transactionData.do_rollback = 
  274.     getNextRandom(&td->generator.rollbackSequenceT4);
  275. #if 0
  276.   memset(td->transactionData.session_details, 
  277.  myRandom48(26)+'A', SESSION_DETAILS_LENGTH);
  278. #endif
  279.   td->transactionData.session_details[SESSION_DETAILS_LENGTH] = 0;
  280.   
  281.   /*-----------------*/
  282.   /* Run transaction */
  283.   /*-----------------*/
  284.   td->runState = Running;
  285.   td->generator.transactions[3].startLatency();
  286.   start_T4(pNDB, td, async);
  287. }
  288. static 
  289. void 
  290. doTransaction_T5(Ndb * pNDB, ThreadData * td, int async)
  291. {
  292.   SessionElement * se;
  293.   se = getNextSession(&td->generator.activeSessions);
  294.   if( se ) {
  295.     strcpy(td->transactionData.number, se->subscriberNumber);
  296.     td->transactionData.server_id = se->serverId;
  297.     td->transactionData.sessionElement = 1;
  298.   }
  299.   else {
  300.     getRandomSubscriberNumber(td->transactionData.number);
  301.     getRandomServerId(&td->transactionData.server_id);
  302.     td->transactionData.sessionElement = 0;
  303.   }
  304.   
  305.   td->transactionData.server_bit = (1 << td->transactionData.server_id);
  306.   td->transactionData.do_rollback  
  307.     = getNextRandom(&td->generator.rollbackSequenceT5);
  308.   
  309.   /*-----------------*/
  310.   /* Run transaction */
  311.   /*-----------------*/
  312.   td->runState = Running;
  313.   td->generator.transactions[4].startLatency();
  314.   start_T5(pNDB, td, async);
  315. }
  316. void
  317. complete_T1(ThreadData * data){
  318.   data->generator.transactions[0].stopLatency();
  319.   data->generator.transactions[0].count++;
  320.   data->runState = Runnable;
  321.   data->generator.totalTransactions++;
  322. }
  323. void 
  324. complete_T2(ThreadData * data){
  325.   data->generator.transactions[1].stopLatency();
  326.   data->generator.transactions[1].count++;
  327.   data->runState = Runnable;
  328.   data->generator.totalTransactions++;
  329. }
  330. void 
  331. complete_T3(ThreadData * data){
  332.   data->generator.transactions[2].stopLatency();
  333.   data->generator.transactions[2].count++;
  334.   if(data->transactionData.branchExecuted)
  335.     data->generator.transactions[2].branchExecuted++;
  336.   data->runState = Runnable;
  337.   data->generator.totalTransactions++;
  338. }
  339. void 
  340. complete_T4(ThreadData * data){
  341.   data->generator.transactions[3].stopLatency();
  342.   data->generator.transactions[3].count++;
  343.   if(data->transactionData.branchExecuted)
  344.     data->generator.transactions[3].branchExecuted++;
  345.   if(data->transactionData.do_rollback)
  346.     data->generator.transactions[3].rollbackExecuted++;
  347.   
  348.   if(data->transactionData.branchExecuted &&
  349.      !data->transactionData.do_rollback){
  350.     insertSession(&data->generator.activeSessions, 
  351.   data->transactionData.number, 
  352.   data->transactionData.server_id);
  353.   }
  354.   data->runState = Runnable;
  355.   data->generator.totalTransactions++;
  356. }
  357. void 
  358. complete_T5(ThreadData * data){
  359.   data->generator.transactions[4].stopLatency();
  360.   data->generator.transactions[4].count++;
  361.   if(data->transactionData.branchExecuted)
  362.     data->generator.transactions[4].branchExecuted++;
  363.   if(data->transactionData.do_rollback)
  364.     data->generator.transactions[4].rollbackExecuted++;
  365.   
  366.   if(data->transactionData.sessionElement && 
  367.      !data->transactionData.do_rollback){
  368.     deleteSession(&data->generator.activeSessions);
  369.   }
  370.   
  371.   data->runState = Runnable;
  372.   data->generator.totalTransactions++;
  373. }
  374. /***************************************************************
  375. ****************************************************************
  376. * P U B L I C   F U N C T I O N S   C O D E   S E C T I O N    *
  377. ****************************************************************
  378. ***************************************************************/
  379. void 
  380. asyncGenerator(ThreadData *data, 
  381.        int parallellism, 
  382.        int millisSendPoll,
  383.        int minEventSendPoll,
  384.        int forceSendPoll)
  385. {
  386.   ThreadData * startUp;
  387.   
  388.   GeneratorStatistics *st;
  389.   double periodStop;
  390.   double benchTimeStart;
  391.   double benchTimeEnd;
  392.   int i, j, done;
  393.   myRandom48Init(data->randomSeed);
  394.   
  395.   for(i = 0; i<parallellism; i++){
  396.     initGeneratorStatistics(&data[i].generator);
  397.    }
  398.   startUp = (ThreadData*)malloc(parallellism * sizeof(ThreadData));
  399.   memcpy(startUp, data, (parallellism * sizeof(ThreadData)));
  400.   
  401.   /*----------------*/
  402.   /* warm up period */
  403.   /*----------------*/
  404.   periodStop = userGetTime() + (double)data[0].warmUpSeconds;
  405.   
  406.   while(userGetTime() < periodStop){
  407.     doOneTransaction(startUp, parallellism, 
  408.      millisSendPoll, minEventSendPoll, forceSendPoll);
  409.   }
  410.   
  411.   ndbout_c("Waiting for startup to finish");
  412.   /**
  413.    * Wait for all transactions
  414.    */
  415.   done = 0;
  416.   while(!done){
  417.     done = 1;
  418.     for(i = 0; i<parallellism; i++){
  419.       if(startUp[i].runState != Runnable){
  420. done = 0;
  421. break;
  422.       }
  423.     }
  424.     if(!done){
  425.       startUp[0].pNDB->sendPollNdb();
  426.     }
  427.   }
  428.   ndbout_c("Benchmark period starts");
  429.   /*-------------------------*/
  430.   /* normal benchmark period */
  431.   /*-------------------------*/
  432.   benchTimeStart = userGetTime();
  433.   
  434.   periodStop = benchTimeStart + (double)data[0].testSeconds;
  435.   while(userGetTime() < periodStop)
  436.     doOneTransaction(data, parallellism,
  437.      millisSendPoll, minEventSendPoll, forceSendPoll);  
  438.   benchTimeEnd = userGetTime();
  439.   
  440.   ndbout_c("Benchmark period done");
  441.   /**
  442.    * Wait for all transactions
  443.    */
  444.   done = 0;
  445.   while(!done){
  446.     done = 1;
  447.     for(i = 0; i<parallellism; i++){
  448.       if(data[i].runState != Runnable){
  449. done = 0;
  450. break;
  451.       }
  452.     }
  453.     if(!done){
  454.       data[0].pNDB->sendPollNdb();
  455.     }
  456.   }
  457.   /*------------------*/
  458.   /* cool down period */
  459.    /*------------------*/
  460.   periodStop = userGetTime() + (double)data[0].coolDownSeconds;
  461.   while(userGetTime() < periodStop){
  462.     doOneTransaction(startUp, parallellism,
  463.      millisSendPoll, minEventSendPoll, forceSendPoll);
  464.   }
  465.   done = 0;
  466.   while(!done){
  467.     done = 1;
  468.     for(i = 0; i<parallellism; i++){
  469.       if(startUp[i].runState != Runnable){
  470. done = 0;
  471. break;
  472.       }
  473.     }
  474.     if(!done){
  475.       startUp[0].pNDB->sendPollNdb();
  476.     }
  477.   }
  478.   /*---------------------------------------------------------*/
  479.   /* add the times for all transaction for inner loop timing */
  480.   /*---------------------------------------------------------*/
  481.   for(j = 0; j<parallellism; j++){
  482.     st = &data[j].generator;
  483.     
  484.     st->outerLoopTime = benchTimeEnd - benchTimeStart;
  485.     st->outerTps      = getTps(st->totalTransactions, st->outerLoopTime);
  486.   }
  487.   /* ndbout_c("maxsize = %dn",maxsize); */
  488.   free(startUp);
  489. }