prioTransporterTest.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:20k
源码类别:
MySQL数据库
开发平台:
Visual C++
- /* Copyright (C) 2003 MySQL AB
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
- #include <ndb_global.h>
- #include "TransporterRegistry.hpp"
- #include "TransporterDefinitions.hpp"
- #include "TransporterCallback.hpp"
- #include <RefConvert.hpp>
- #include "prioTransporterTest.hpp"
- #include <NdbTick.h>
- #include <NdbMain.h>
- #include <NdbOut.hpp>
- #include <NdbSleep.h>
- int basePortTCP = 17000;
- SCI_TransporterConfiguration sciTemplate = {
- 2000,
- // Packet size
- 2000000, // Buffer size
- 2, // number of adapters
- 1, // remote node id SCI
- 2, // Remote node Id SCI
- 0, // local ndb node id (server)
- 0, // remote ndb node id (client)
- 0, // byteOrder;
- false, // compression;
- true, // checksum;
- true // signalId;
- };
- SHM_TransporterConfiguration shmTemplate = {
- 100000, // shmSize
- 0, // shmKey
- 1, // local ndb node id (server)
- 2, // remote ndb node id (client)
- 0, // byteOrder;
- false, // compression;
- true, // checksum;
- true // signalId;
- };
- TCP_TransporterConfiguration tcpTemplate = {
- 17000, // port;
- "", // remoteHostName;
- "", // localhostname
- 2, // remoteNodeId;
- 1, // localNodeId;
- 2000000, // sendBufferSize - Size of SendBuffer of priority B
- 2000, // maxReceiveSize - Maximum no of bytes to receive
- 0, // byteOrder;
- false, // compression;
- true, // checksum;
- true // signalId;
- };
- OSE_TransporterConfiguration oseTemplate = {
- "", // remoteHostName;
- "", // localHostName;
- 0, // remoteNodeId;
- 0, // localNodeId;
- false, // compression;
- true, // checksum;
- true, // signalId;
- 0, // byteOrder;
- 2000, // prioASignalSize;
- 2000, // prioBSignalSize;
- 10 // Recv buf size
- };
- TransporterRegistry *tReg = 0;
- #ifndef OSE_DELTA
- #include <signal.h>
- #endif
- extern "C"
- void
- signalHandler(int signo){
- #ifndef OSE_DELTA
- ::signal(13, signalHandler);
- #endif
- char buf[255];
- sprintf(buf,"Signal: %dn", signo);
- ndbout << buf << endl;
- }
- void
- usage(const char * progName){
- ndbout << "Usage: " << progName << " localNodeId localHostName"
- << " remoteHostName"
- << " [<loop count>] [<send buf size>] [<recv buf size>]" << endl;
- ndbout << " localNodeId - {1,2}" << endl;
- }
- typedef void (* CreateTransporterFunc)(void * conf,
- NodeId localNodeId,
- NodeId remoteNodeId,
- const char * localHostName,
- const char * remoteHostName,
- int sendBuf,
- int recvBuf);
- void
- createOSETransporter(void * _conf,
- NodeId localNodeId,
- NodeId remoteNodeId,
- const char * localHostName,
- const char * remoteHostName,
- int sendBuf,
- int recvBuf){
- ndbout << "Creating OSE transporter from node "
- << localNodeId << "(" << localHostName << ") to "
- << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
- OSE_TransporterConfiguration * conf = (OSE_TransporterConfiguration*)_conf;
- if(sendBuf != -1){
- conf->prioBSignalSize = sendBuf;
- }
- if(recvBuf != -1){
- conf->receiveBufferSize = recvBuf;
- }
- ndbout << "tSendBufferSize: " << conf->prioBSignalSize << endl;
- ndbout << "tReceiveBufferSize: " << conf->receiveBufferSize << endl;
- conf->localNodeId = localNodeId;
- conf->localHostName = localHostName;
- conf->remoteNodeId = remoteNodeId;
- conf->remoteHostName = remoteHostName;
- bool res = tReg->createTransporter(conf);
- if(res)
- ndbout << "... -- Success " << endl;
- else
- ndbout << "... -- Failure " << endl;
- }
- void
- createSCITransporter(void * _conf,
- NodeId localNodeId,
- NodeId remoteNodeId,
- const char * localHostName,
- const char * remoteHostName,
- int sendbuf,
- int recvbuf) {
- ndbout << "Creating SCI transporter from node "
- << localNodeId << "(" << localHostName << ") to "
- << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
- SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
- conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
- conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
- conf->localNodeId = localNodeId;
- conf->remoteNodeId = remoteNodeId;
- bool res = tReg->createTransporter(conf);
- if(res)
- ndbout << "... -- Success " << endl;
- else
- ndbout << "... -- Failure " << endl;
- }
- void
- createSHMTransporter(void * _conf,
- NodeId localNodeId,
- NodeId remoteNodeId,
- const char * localHostName,
- const char * remoteHostName,
- int sendbuf,
- int recvbuf) {
- ndbout << "Creating SHM transporter from node "
- << localNodeId << "(" << localHostName << ") to "
- << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
- SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
- conf->localNodeId = localNodeId;
- conf->remoteNodeId = remoteNodeId;
- bool res = tReg->createTransporter(conf);
- if(res)
- ndbout << "... -- Success " << endl;
- else
- ndbout << "... -- Failure " << endl;
- }
- void
- createTCPTransporter(void * _conf,
- NodeId localNodeId,
- NodeId remoteNodeId,
- const char * localHostName,
- const char * remoteHostName,
- int sendBuf,
- int recvBuf){
- ndbout << "Creating TCP transporter from node "
- << localNodeId << "(" << localHostName << ") to "
- << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
- TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
- int port;
- if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
- if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
- if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
- if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
- if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
- if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
- if(sendBuf != -1){
- conf->sendBufferSize = sendBuf;
- }
- if(recvBuf != -1){
- conf->maxReceiveSize = recvBuf;
- }
- ndbout << "tSendBufferSize: " << conf->sendBufferSize << endl;
- ndbout << "tReceiveBufferSize: " << conf->maxReceiveSize << endl;
- conf->localNodeId = localNodeId;
- conf->localHostName = localHostName;
- conf->remoteNodeId = remoteNodeId;
- conf->remoteHostName = remoteHostName;
- conf->port = port;
- bool res = tReg->createTransporter(conf);
- if(res)
- ndbout << "... -- Success " << endl;
- else
- ndbout << "... -- Failure " << endl;
- }
- struct TestPhase {
- int signalSize;
- int noOfSignals;
- int noOfSignalSent;
- int noOfSignalReceived;
- NDB_TICKS startTime;
- NDB_TICKS stopTime;
- NDB_TICKS startTimePrioA;
- NDB_TICKS stopTimePrioA;
- NDB_TICKS totTimePrioA;
- int bytesSentBeforePrioA;
- NDB_TICKS accTime;
- int loopCount;
- Uint64 sendLenBytes, sendCount;
- Uint64 recvLenBytes, recvCount;
- };
- TestPhase testSpec[] = {
- { 1, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
- ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
- ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
- ,{ 1, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
- ,{ 8, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
- ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
- ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
- ,{ 8, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
- ,{ 16, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
- ,{ 16, 100, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
- ,{ 16, 1000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
- ,{ 16, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
- ,{ 24, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of size 1 word
- ,{ 24, 100, 0,0, 0,0,0,0,0,0,0 } // 100 signals of size 1 word
- ,{ 24, 1000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of size 1 word
- ,{ 24, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1 word
- ,{ 0, 10, 0,0, 0,0,0,0,0,0,0 } // 10 signals of random size
- ,{ 0, 100, 0,0, 0,0,0,0,0,0,0 } // 100 signals of random size
- ,{ 0, 1000, 0,0, 0,0,0,0,0,0,0 } // 1000 signals of random size
- ,{ 0, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of random size
- };
- const int noOfTests = sizeof(testSpec)/sizeof(TestPhase);
- SendStatus
- sendSignalTo(NodeId nodeId, int signalSize, int prio){
- if(signalSize == 0)
- signalSize = (rand() % 25) + 1;
- SignalHeader sh;
- sh.theLength = signalSize;
- sh.theVerId_signalNumber = rand();
- sh.theReceiversBlockNumber = rand();
- sh.theSendersBlockRef = rand();
- sh.theSendersSignalId = rand();
- sh.theSignalId = rand();
- sh.theTrace = rand();
- Uint32 theData[25];
- for(int i = 0; i<signalSize; i++)
- theData[i] = (i+1) * (Uint32)(&theData[i]);
- return tReg->prepareSend(&sh, prio, theData, nodeId);
- }
- void
- reportHeader(){
- ndbout << "#SigstSztPayloadtTimetSig/sectBpst"
- << "s lentr lentprioAtimetbytesb4pA" << endl;
- }
- void
- printReport(TestPhase & p){
- if(p.accTime > 0) {
- Uint32 secs = (p.accTime/p.loopCount)/1000;
- Uint32 mill = (p.accTime/p.loopCount)%1000;
- char st[255];
- if(secs > 0){
- sprintf(st, "%d.%.2ds", secs, (mill/10));
- } else {
- sprintf(st, "%dms", mill);
- }
- Uint32 sps = (1000*p.noOfSignals*p.loopCount)/p.accTime;
- Uint32 bps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(p.signalSize+3));
- if(p.signalSize == 0)
- ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(13+3));
- char ssps[255];
- if(sps > 1000000){
- sps /= 1000000;
- sprintf(ssps, "%dM", (int)sps);
- } else if(sps > 1000){
- sps /= 1000;
- sprintf(ssps, "%dk", (int)sps);
- } else {
- sprintf(ssps, "%d", (int)sps);
- }
- char sbps[255];
- if(bps > 1000000){
- bps /= 1000000;
- sprintf(sbps, "%dM", bps);
- } else if(bps>1000){
- bps /= 1000;
- sprintf(sbps, "%dk", bps);
- } else {
- sprintf(sbps, "%d", bps);
- }
- char buf[255];
- if(p.signalSize != 0){
- BaseString::snprintf(buf, 255,
- "%dt%dt%dt%st%st%st%dt%dt%dt%d",
- p.noOfSignals,
- p.signalSize,
- (4*p.signalSize),
- st,
- ssps,
- sbps,
- (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
- (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
- (int)(p.totTimePrioA / p.loopCount),
- (int)(p.bytesSentBeforePrioA));
- } else {
- BaseString::snprintf(buf, 255,
- "%dtrandt4*randt%st%st%st%dt%dt%dt%d",
- p.noOfSignals,
- st,
- ssps,
- sbps,
- (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
- (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
- (int)(p.totTimePrioA / p.loopCount),
- (int)(p.bytesSentBeforePrioA));
- }
- ndbout << buf << endl;
- }
- }
- int loopCount = 1;
- int sendBufSz = -1;
- int recvBufSz = -1;
- NDB_TICKS startSec=0;
- NDB_TICKS stopSec=0;
- Uint32 startMicro=0;
- Uint32 stopMicro=0;
- int timerStarted;
- int timerStopped;
- bool isClient = false;
- bool isConnected = false;
- bool isStarted = false;
- int currentPhase = 0;
- TestPhase allPhases[noOfTests];
- Uint32 signalToEcho;
- NDB_TICKS startTime, stopTime;
- void
- client(NodeId remoteNodeId){
- isClient = true;
- currentPhase = 0;
- memcpy(allPhases, testSpec, sizeof(testSpec));
- int counter = 0;
- while(true){
- TestPhase * current = &allPhases[currentPhase];
- if(current->noOfSignals == current->noOfSignalSent &&
- current->noOfSignals == current->noOfSignalReceived){
- /**
- * Test phase done
- */
- current->stopTime = NdbTick_CurrentMillisecond();
- current->accTime += (current->stopTime - current->startTime);
- NdbSleep_MilliSleep(500 / loopCount);
- current->startTime = NdbTick_CurrentMillisecond();
- current->noOfSignalSent = 0;
- current->noOfSignalReceived = 0;
- current->loopCount ++;
- if(current->loopCount == loopCount){
- printReport(allPhases[currentPhase]);
- currentPhase ++;
- if(currentPhase == noOfTests){
- /**
- * Now we are done
- */
- break;
- }
- NdbSleep_MilliSleep(500);
- current = &allPhases[currentPhase];
- current->startTime = NdbTick_CurrentMillisecond();
- }
- }
- int signalsLeft = current->noOfSignals - current->noOfSignalSent;
- if(signalsLeft > 0){
- for(; signalsLeft > 1; signalsLeft--){
- if(sendSignalTo(remoteNodeId, current->signalSize, 1) == SEND_OK) {
- current->noOfSignalSent++;
- // ndbout << "sent prio b" << endl;
- current->bytesSentBeforePrioA += (current->signalSize << 2);
- }
- else {
- tReg->external_IO(10);
- break;
- }
- }
- //prio A
- if(signalsLeft==1) {
- NDB_TICKS sec = 0;
- Uint32 micro=0;
- int ret = NdbTick_CurrentMicrosecond(&sec,µ);
- if(ret==0)
- current->startTimePrioA = micro + sec*1000000;
- if(sendSignalTo(remoteNodeId, current->signalSize, 0) == SEND_OK) {
- current->noOfSignalSent++;
- signalsLeft--;
- }
- else {
- tReg->external_IO(10);
- break;
- }
- }
- }
- if(counter % 10 == 0)
- tReg->checkConnections();
- tReg->external_IO(0);
- counter++;
- }
- }
- void
- server(){
- isClient = false;
- signalToEcho = 0;
- for(int i = 0; i<noOfTests; i++)
- signalToEcho += testSpec[i].noOfSignals;
- signalToEcho *= loopCount;
- while(signalToEcho > 0){
- tReg->checkConnections();
- for(int i = 0; i<10; i++)
- tReg->external_IO(10);
- }
- }
- int
- prioTransporterTest(TestType tt, const char * progName,
- int argc, const char **argv){
- loopCount = 100;
- sendBufSz = -1;
- recvBufSz = -1;
- isClient = false;
- isConnected = false;
- isStarted = false;
- currentPhase = 0;
- signalHandler(0);
- if(argc < 4){
- usage(progName);
- return 0;
- }
- const NodeId localNodeId = atoi(argv[1]);
- const char * localHostName = argv[2];
- const char * remoteHost1 = argv[3];
- if(argc >= 5)
- loopCount = atoi(argv[4]);
- if(argc >= 6)
- sendBufSz = atoi(argv[5]);
- if(argc >= 7)
- recvBufSz = atoi(argv[6]);
- if(localNodeId < 1 || localNodeId > 2){
- ndbout << "localNodeId = " << localNodeId << endl << endl;
- usage(progName);
- return 0;
- }
- if(localNodeId == 1)
- ndbout << "-- ECHO CLIENT --" << endl;
- else
- ndbout << "-- ECHO SERVER --" << endl;
- ndbout << "localNodeId: " << localNodeId << endl;
- ndbout << "localHostName: " << localHostName << endl;
- ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): "
- << remoteHost1 << endl;
- ndbout << "Loop count: " << loopCount << endl;
- ndbout << "-----------------" << endl;
- void * confTemplate = 0;
- CreateTransporterFunc func = 0;
- switch(tt){
- case TestTCP:
- func = createTCPTransporter;
- confTemplate = &tcpTemplate;
- break;
- case TestOSE:
- func = createOSETransporter;
- confTemplate = &oseTemplate;
- break;
- case TestSCI:
- func = createSCITransporter;
- confTemplate = &sciTemplate;
- break;
- case TestSHM:
- func = createSHMTransporter;
- confTemplate = &shmTemplate;
- break;
- default:
- ndbout << "Unsupported transporter type" << endl;
- return 0;
- }
- ndbout << "Creating transporter registry" << endl;
- tReg = new TransporterRegistry;
- tReg->init(localNodeId);
- switch(localNodeId){
- case 1:
- (* func)(confTemplate, 1, 2, localHostName, remoteHost1,
- sendBufSz, recvBufSz);
- break;
- case 2:
- (* func)(confTemplate, 2, 1, localHostName, remoteHost1,
- sendBufSz, recvBufSz);
- break;
- }
- ndbout << "Doing startSending/startReceiving" << endl;
- tReg->startSending();
- tReg->startReceiving();
- ndbout << "Connecting" << endl;
- tReg->setPerformState(PerformConnect);
- tReg->checkConnections();
- if(localNodeId == 1)
- client(2);
- else
- server();
- isStarted = false;
- ndbout << "Sleep 3 secs" << endl;
- NdbSleep_SecSleep(3);
- ndbout << "Doing setPerformState(Disconnect)" << endl;
- tReg->setPerformState(PerformDisconnect);
- ndbout << "Doing checkConnections()" << endl;
- tReg->checkConnections();
- ndbout << "Deleting transporter registry" << endl;
- delete tReg; tReg = 0;
- return 0;
- }
- NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
- out << "-- Signal Header --" << endl;
- out << "theLength: " << sh.theLength << endl;
- out << "gsn: " << sh.theVerId_signalNumber << endl;
- out << "recBlockNo: " << sh.theReceiversBlockNumber << endl;
- out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
- out << "sendersSig: " << sh.theSendersSignalId << endl;
- out << "theSignalId: " << sh.theSignalId << endl;
- out << "trace: " << (int)sh.theTrace << endl;
- return out;
- }
- void
- execute(SignalHeader * const header, Uint8 prio, Uint32 * const theData){
- const NodeId nodeId = refToNode(header->theSendersBlockRef);
- NDB_TICKS sec = 0;
- Uint32 micro=0;
- int ret = NdbTick_CurrentMicrosecond(&sec,µ);
- if(prio == 0 && isClient && ret == 0) {
- allPhases[currentPhase].stopTimePrioA = micro + sec*1000000;
- allPhases[currentPhase].totTimePrioA +=
- allPhases[currentPhase].stopTimePrioA -
- allPhases[currentPhase].startTimePrioA;
- }
- if(ret!=0)
- allPhases[currentPhase].totTimePrioA = -1;
- if(isClient){
- allPhases[currentPhase].noOfSignalReceived++;
- } else {
- int sleepTime = 10;
- while(tReg->prepareSend(header, prio, theData, nodeId) != SEND_OK){
- ndbout << "Failed to echo" << sleepTime << endl;
- NdbSleep_MilliSleep(sleepTime);
- // sleepTime += 10;
- }
- signalToEcho--;
- }
- }
- void
- reportError(NodeId nodeId, TransporterError errorCode){
- char buf[255];
- sprintf(buf, "reportError (%d, %x) in perfTest", nodeId, errorCode);
- ndbout << buf << endl;
- if(errorCode & 0x8000){
- tReg->setPerformState(nodeId, PerformDisconnect);
- }
- }
- /**
- * Report average send theLength in bytes (4096 last sends)
- */
- void
- reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes){
- allPhases[currentPhase].sendCount += count;
- allPhases[currentPhase].sendLenBytes += bytes;
- if(!isClient){
- ndbout << "reportSendLen(" << nodeId << ", "
- << (bytes/count) << ")" << endl;
- }
- }
- /**
- * Report average receive theLength in bytes (4096 last receives)
- */
- void
- reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes){
- allPhases[currentPhase].recvCount += count;
- allPhases[currentPhase].recvLenBytes += bytes;
- if(!isClient){
- ndbout << "reportReceiveLen(" << nodeId << ", "
- << (bytes/count) << ")" << endl;
- }
- }
- /**
- * Report connection established
- */
- void
- reportConnect(NodeId nodeId){
- char buf[255];
- sprintf(buf, "reportConnect(%d)", nodeId);
- ndbout << buf << endl;
- tReg->setPerformState(nodeId, PerformIO);
- if(!isStarted){
- isStarted = true;
- startTime = NdbTick_CurrentMillisecond();
- if(isClient){
- reportHeader();
- allPhases[0].startTime = startTime;
- }
- }
- else{
- // Resend signals that were lost when connection failed
- TestPhase * current = &allPhases[currentPhase];
- current->noOfSignalSent = current->noOfSignalReceived;
- }
- }
- /**
- * Report connection broken
- */
- void
- reportDisconnect(NodeId nodeId, Uint32 errNo){
- char buf[255];
- sprintf(buf, "reportDisconnect(%d)", nodeId);
- ndbout << buf << endl;
- if(isStarted)
- tReg->setPerformState(nodeId, PerformConnect);
- }
- int
- checkJobBuffer() {
- /**
- * Check to see if jobbbuffers are starting to get full
- * and if so call doJob
- */
- return 0;
- }