asyncGenerator.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:15k
源码类别:
MySQL数据库
开发平台:
Visual C++
- /* Copyright (C) 2003 MySQL AB
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
- /***************************************************************
- * I N C L U D E D F I L E S *
- ***************************************************************/
- #include <ndb_global.h>
- #include "dbGenerator.h"
- #include <NdbApi.hpp>
- #include <NdbOut.hpp>
- #include <NdbSleep.h>
- /***************************************************************
- * L O C A L C O N S T A N T S *
- ***************************************************************/
- /***************************************************************
- * L O C A L D A T A S T R U C T U R E S *
- ***************************************************************/
- /***************************************************************
- * L O C A L F U N C T I O N S *
- ***************************************************************/
- static void getRandomSubscriberNumber(SubscriberNumber number);
- static void getRandomServerId(ServerId *serverId);
- static void getRandomChangedBy(ChangedBy changedBy);
- static void getRandomChangedTime(ChangedTime changedTime);
- static void clearTransaction(TransactionDefinition *trans);
- static void initGeneratorStatistics(GeneratorStatistics *gen);
- static void doOneTransaction(ThreadData * td,
- int parallellism,
- int millisSendPoll,
- int minEventSendPoll,
- int forceSendPoll);
- static void doTransaction_T1(Ndb * pNDB, ThreadData * td, int async);
- static void doTransaction_T2(Ndb * pNDB, ThreadData * td, int async);
- static void doTransaction_T3(Ndb * pNDB, ThreadData * td, int async);
- static void doTransaction_T4(Ndb * pNDB, ThreadData * td, int async);
- static void doTransaction_T5(Ndb * pNDB, ThreadData * td, int async);
- /***************************************************************
- * L O C A L D A T A *
- ***************************************************************/
- static SequenceValues transactionDefinition[] = {
- {25, 1},
- {25, 2},
- {20, 3},
- {15, 4},
- {15, 5},
- {0, 0}
- };
- static SequenceValues rollbackDefinition[] = {
- {98, 0},
- {2 , 1},
- {0, 0}
- };
- static int maxsize = 0;
- /***************************************************************
- * P U B L I C D A T A *
- ***************************************************************/
- /***************************************************************
- ****************************************************************
- * L O C A L F U N C T I O N S C O D E S E C T I O N *
- ****************************************************************
- ***************************************************************/
- static void getRandomSubscriberNumber(SubscriberNumber number)
- {
- uint32 tmp;
- char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
- tmp = myRandom48(NO_OF_SUBSCRIBERS);
- sprintf(sbuf, "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
- memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
- }
- static void getRandomServerId(ServerId *serverId)
- {
- *serverId = myRandom48(NO_OF_SERVERS);
- }
- static void getRandomChangedBy(ChangedBy changedBy)
- {
- memset(changedBy, myRandom48(26)+'A', CHANGED_BY_LENGTH);
- changedBy[CHANGED_BY_LENGTH] = 0;
- }
- static void getRandomChangedTime(ChangedTime changedTime)
- {
- memset(changedTime, myRandom48(26)+'A', CHANGED_TIME_LENGTH);
- changedTime[CHANGED_TIME_LENGTH] = 0;
- }
- static void clearTransaction(TransactionDefinition *trans)
- {
- trans->count = 0;
- trans->branchExecuted = 0;
- trans->rollbackExecuted = 0;
- trans->latencyCounter = myRandom48(127);
- trans->latency.reset();
- }
- static int listFull(SessionList *list)
- {
- return(list->numberInList == SESSION_LIST_LENGTH);
- }
- static int listEmpty(SessionList *list)
- {
- return(list->numberInList == 0);
- }
- static void insertSession(SessionList *list,
- SubscriberNumber number,
- ServerId serverId)
- {
- SessionElement *e;
- if( listFull(list) ) return;
- e = &list->list[list->writeIndex];
- strcpy(e->subscriberNumber, number);
- e->serverId = serverId;
- list->writeIndex = (list->writeIndex + 1) % SESSION_LIST_LENGTH;
- list->numberInList++;
- if( list->numberInList > maxsize )
- maxsize = list->numberInList;
- }
- static SessionElement *getNextSession(SessionList *list)
- {
- if( listEmpty(list) ) return(0);
- return(&list->list[list->readIndex]);
- }
- static void deleteSession(SessionList *list)
- {
- if( listEmpty(list) ) return;
- list->readIndex = (list->readIndex + 1) % SESSION_LIST_LENGTH;
- list->numberInList--;
- }
- static void initGeneratorStatistics(GeneratorStatistics *gen)
- {
- int i;
- if( initSequence(&gen->transactionSequence,
- transactionDefinition) != 0 ) {
- ndbout_c("could not set the transaction types");
- exit(0);
- }
- if( initSequence(&gen->rollbackSequenceT4,
- rollbackDefinition) != 0 ) {
- ndbout_c("could not set the rollback sequence");
- exit(0);
- }
- if( initSequence(&gen->rollbackSequenceT5,
- rollbackDefinition) != 0 ) {
- ndbout_c("could not set the rollback sequence");
- exit(0);
- }
- for(i = 0; i < NUM_TRANSACTION_TYPES; i++ )
- clearTransaction(&gen->transactions[i]);
- gen->totalTransactions = 0;
- gen->activeSessions.numberInList = 0;
- gen->activeSessions.readIndex = 0;
- gen->activeSessions.writeIndex = 0;
- }
- static
- void
- doOneTransaction(ThreadData * td, int p, int millis, int minEvents, int force)
- {
- int i;
- unsigned int transactionType;
- int async = 1;
- if (p == 1) {
- async = 0;
- }//if
- for(i = 0; i<p; i++){
- if(td[i].runState == Runnable){
- transactionType = getNextRandom(&td[i].generator.transactionSequence);
- switch(transactionType) {
- case 1:
- doTransaction_T1(td[i].pNDB, &td[i], async);
- break;
- case 2:
- doTransaction_T2(td[i].pNDB, &td[i], async);
- break;
- case 3:
- doTransaction_T3(td[i].pNDB, &td[i], async);
- break;
- case 4:
- doTransaction_T4(td[i].pNDB, &td[i], async);
- break;
- case 5:
- doTransaction_T5(td[i].pNDB, &td[i], async);
- break;
- default:
- ndbout_c("Unknown transaction type: %d", transactionType);
- }
- }
- }
- if (async == 1) {
- td[0].pNDB->sendPollNdb(millis, minEvents, force);
- }//if
- }
- static
- void
- doTransaction_T1(Ndb * pNDB, ThreadData * td, int async)
- {
- /*----------------*/
- /* Init arguments */
- /*----------------*/
- getRandomSubscriberNumber(td->transactionData.number);
- getRandomChangedBy(td->transactionData.changed_by);
- BaseString::snprintf(td->transactionData.changed_time,
- sizeof(td->transactionData.changed_time),
- "%ld - %d", td->changedTime++, myRandom48(65536*1024));
- //getRandomChangedTime(td->transactionData.changed_time);
- td->transactionData.location = td->transactionData.changed_by[0];
- /*-----------------*/
- /* Run transaction */
- /*-----------------*/
- td->runState = Running;
- td->generator.transactions[0].startLatency();
- start_T1(pNDB, td, async);
- }
- static
- void
- doTransaction_T2(Ndb * pNDB, ThreadData * td, int async)
- {
- /*----------------*/
- /* Init arguments */
- /*----------------*/
- getRandomSubscriberNumber(td->transactionData.number);
- /*-----------------*/
- /* Run transaction */
- /*-----------------*/
- td->runState = Running;
- td->generator.transactions[1].startLatency();
- start_T2(pNDB, td, async);
- }
- static
- void
- doTransaction_T3(Ndb * pNDB, ThreadData * td, int async)
- {
- SessionElement *se;
- /*----------------*/
- /* Init arguments */
- /*----------------*/
- se = getNextSession(&td->generator.activeSessions);
- if( se ) {
- strcpy(td->transactionData.number, se->subscriberNumber);
- td->transactionData.server_id = se->serverId;
- td->transactionData.sessionElement = 1;
- } else {
- getRandomSubscriberNumber(td->transactionData.number);
- getRandomServerId(&td->transactionData.server_id);
- td->transactionData.sessionElement = 0;
- }
- td->transactionData.server_bit = (1 << td->transactionData.server_id);
- /*-----------------*/
- /* Run transaction */
- /*-----------------*/
- td->runState = Running;
- td->generator.transactions[2].startLatency();
- start_T3(pNDB, td, async);
- }
- static
- void
- doTransaction_T4(Ndb * pNDB, ThreadData * td, int async)
- {
- /*----------------*/
- /* Init arguments */
- /*----------------*/
- getRandomSubscriberNumber(td->transactionData.number);
- getRandomServerId(&td->transactionData.server_id);
- td->transactionData.server_bit = (1 << td->transactionData.server_id);
- td->transactionData.do_rollback =
- getNextRandom(&td->generator.rollbackSequenceT4);
- #if 0
- memset(td->transactionData.session_details,
- myRandom48(26)+'A', SESSION_DETAILS_LENGTH);
- #endif
- td->transactionData.session_details[SESSION_DETAILS_LENGTH] = 0;
- /*-----------------*/
- /* Run transaction */
- /*-----------------*/
- td->runState = Running;
- td->generator.transactions[3].startLatency();
- start_T4(pNDB, td, async);
- }
- static
- void
- doTransaction_T5(Ndb * pNDB, ThreadData * td, int async)
- {
- SessionElement * se;
- se = getNextSession(&td->generator.activeSessions);
- if( se ) {
- strcpy(td->transactionData.number, se->subscriberNumber);
- td->transactionData.server_id = se->serverId;
- td->transactionData.sessionElement = 1;
- }
- else {
- getRandomSubscriberNumber(td->transactionData.number);
- getRandomServerId(&td->transactionData.server_id);
- td->transactionData.sessionElement = 0;
- }
- td->transactionData.server_bit = (1 << td->transactionData.server_id);
- td->transactionData.do_rollback
- = getNextRandom(&td->generator.rollbackSequenceT5);
- /*-----------------*/
- /* Run transaction */
- /*-----------------*/
- td->runState = Running;
- td->generator.transactions[4].startLatency();
- start_T5(pNDB, td, async);
- }
- void
- complete_T1(ThreadData * data){
- data->generator.transactions[0].stopLatency();
- data->generator.transactions[0].count++;
- data->runState = Runnable;
- data->generator.totalTransactions++;
- }
- void
- complete_T2(ThreadData * data){
- data->generator.transactions[1].stopLatency();
- data->generator.transactions[1].count++;
- data->runState = Runnable;
- data->generator.totalTransactions++;
- }
- void
- complete_T3(ThreadData * data){
- data->generator.transactions[2].stopLatency();
- data->generator.transactions[2].count++;
- if(data->transactionData.branchExecuted)
- data->generator.transactions[2].branchExecuted++;
- data->runState = Runnable;
- data->generator.totalTransactions++;
- }
- void
- complete_T4(ThreadData * data){
- data->generator.transactions[3].stopLatency();
- data->generator.transactions[3].count++;
- if(data->transactionData.branchExecuted)
- data->generator.transactions[3].branchExecuted++;
- if(data->transactionData.do_rollback)
- data->generator.transactions[3].rollbackExecuted++;
- if(data->transactionData.branchExecuted &&
- !data->transactionData.do_rollback){
- insertSession(&data->generator.activeSessions,
- data->transactionData.number,
- data->transactionData.server_id);
- }
- data->runState = Runnable;
- data->generator.totalTransactions++;
- }
- void
- complete_T5(ThreadData * data){
- data->generator.transactions[4].stopLatency();
- data->generator.transactions[4].count++;
- if(data->transactionData.branchExecuted)
- data->generator.transactions[4].branchExecuted++;
- if(data->transactionData.do_rollback)
- data->generator.transactions[4].rollbackExecuted++;
- if(data->transactionData.sessionElement &&
- !data->transactionData.do_rollback){
- deleteSession(&data->generator.activeSessions);
- }
- data->runState = Runnable;
- data->generator.totalTransactions++;
- }
- /***************************************************************
- ****************************************************************
- * P U B L I C F U N C T I O N S C O D E S E C T I O N *
- ****************************************************************
- ***************************************************************/
- void
- asyncGenerator(ThreadData *data,
- int parallellism,
- int millisSendPoll,
- int minEventSendPoll,
- int forceSendPoll)
- {
- ThreadData * startUp;
- GeneratorStatistics *st;
- double periodStop;
- double benchTimeStart;
- double benchTimeEnd;
- int i, j, done;
- myRandom48Init(data->randomSeed);
- for(i = 0; i<parallellism; i++){
- initGeneratorStatistics(&data[i].generator);
- }
- startUp = (ThreadData*)malloc(parallellism * sizeof(ThreadData));
- memcpy(startUp, data, (parallellism * sizeof(ThreadData)));
- /*----------------*/
- /* warm up period */
- /*----------------*/
- periodStop = userGetTime() + (double)data[0].warmUpSeconds;
- while(userGetTime() < periodStop){
- doOneTransaction(startUp, parallellism,
- millisSendPoll, minEventSendPoll, forceSendPoll);
- }
- ndbout_c("Waiting for startup to finish");
- /**
- * Wait for all transactions
- */
- done = 0;
- while(!done){
- done = 1;
- for(i = 0; i<parallellism; i++){
- if(startUp[i].runState != Runnable){
- done = 0;
- break;
- }
- }
- if(!done){
- startUp[0].pNDB->sendPollNdb();
- }
- }
- ndbout_c("Benchmark period starts");
- /*-------------------------*/
- /* normal benchmark period */
- /*-------------------------*/
- benchTimeStart = userGetTime();
- periodStop = benchTimeStart + (double)data[0].testSeconds;
- while(userGetTime() < periodStop)
- doOneTransaction(data, parallellism,
- millisSendPoll, minEventSendPoll, forceSendPoll);
- benchTimeEnd = userGetTime();
- ndbout_c("Benchmark period done");
- /**
- * Wait for all transactions
- */
- done = 0;
- while(!done){
- done = 1;
- for(i = 0; i<parallellism; i++){
- if(data[i].runState != Runnable){
- done = 0;
- break;
- }
- }
- if(!done){
- data[0].pNDB->sendPollNdb();
- }
- }
- /*------------------*/
- /* cool down period */
- /*------------------*/
- periodStop = userGetTime() + (double)data[0].coolDownSeconds;
- while(userGetTime() < periodStop){
- doOneTransaction(startUp, parallellism,
- millisSendPoll, minEventSendPoll, forceSendPoll);
- }
- done = 0;
- while(!done){
- done = 1;
- for(i = 0; i<parallellism; i++){
- if(startUp[i].runState != Runnable){
- done = 0;
- break;
- }
- }
- if(!done){
- startUp[0].pNDB->sendPollNdb();
- }
- }
- /*---------------------------------------------------------*/
- /* add the times for all transaction for inner loop timing */
- /*---------------------------------------------------------*/
- for(j = 0; j<parallellism; j++){
- st = &data[j].generator;
- st->outerLoopTime = benchTimeEnd - benchTimeStart;
- st->outerTps = getTps(st->totalTransactions, st->outerLoopTime);
- }
- /* ndbout_c("maxsize = %dn",maxsize); */
- free(startUp);
- }