prioTransporterTest.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:20k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include "TransporterRegistry.hpp"
  15. #include "TransporterDefinitions.hpp"
  16. #include "TransporterCallback.hpp"
  17. #include <RefConvert.hpp>
  18. #include "prioTransporterTest.hpp"
  19. #include <NdbTick.h>
  20. #include <NdbMain.h>
  21. #include <NdbOut.hpp>
  22. #include <NdbSleep.h>
  23. int basePortTCP = 17000;
  24. SCI_TransporterConfiguration sciTemplate = {
  25.   2000, 
  26.        // Packet size
  27.   2000000,      // Buffer size
  28.   2,           // number of adapters
  29.   1,           // remote node id SCI 
  30.   2,           // Remote node Id SCI
  31.   0,           // local ndb node id (server)
  32.   0,           // remote ndb node id (client)
  33.   0,              // byteOrder;
  34.   false,          // compression;
  35.   true,          // checksum;
  36.   true            // signalId;
  37. };
  38. SHM_TransporterConfiguration shmTemplate = {
  39.   100000, // shmSize
  40.   0,      // shmKey
  41.   1,           // local ndb node id (server)
  42.   2,           // remote ndb node id (client)
  43.   0,              // byteOrder;
  44.   false,          // compression;
  45.   true,           // checksum;
  46.   true            // signalId;
  47. };
  48. TCP_TransporterConfiguration tcpTemplate = {
  49.   17000,          // port; 
  50.   "",             // remoteHostName;
  51.   "",             // localhostname
  52.   2,              // remoteNodeId;
  53.   1,              // localNodeId;
  54.   2000000,          // sendBufferSize - Size of SendBuffer of priority B 
  55.   2000,           // maxReceiveSize - Maximum no of bytes to receive
  56.   0,              // byteOrder;
  57.   false,          // compression;
  58.   true,           // checksum;
  59.   true            // signalId;
  60. };
  61. OSE_TransporterConfiguration oseTemplate = {
  62.   "",    // remoteHostName;
  63.   "",    // localHostName;
  64.   0,     // remoteNodeId;
  65.   0,     // localNodeId;
  66.   false, // compression;
  67.   true,  // checksum;
  68.   true,  // signalId;
  69.   0,     // byteOrder;
  70.   
  71.   2000,  // prioASignalSize;
  72.   2000,  // prioBSignalSize;
  73.   10     // Recv buf size
  74. };
  75. TransporterRegistry *tReg = 0;
  76. #ifndef OSE_DELTA
  77. #include <signal.h>
  78. #endif
  79. extern "C"
  80. void
  81. signalHandler(int signo){
  82. #ifndef OSE_DELTA
  83.   ::signal(13, signalHandler);
  84. #endif
  85.   char buf[255];
  86.   sprintf(buf,"Signal: %dn", signo);
  87.   ndbout << buf << endl;
  88. }
  89. void 
  90. usage(const char * progName){
  91.   ndbout << "Usage: " << progName << " localNodeId localHostName" 
  92.  << " remoteHostName"
  93.  << " [<loop count>] [<send buf size>] [<recv buf size>]" << endl;
  94.   ndbout << "  localNodeId - {1,2}" << endl;
  95. }
  96. typedef void (* CreateTransporterFunc)(void * conf, 
  97.        NodeId localNodeId,
  98.        NodeId remoteNodeId,
  99.        const char * localHostName,
  100.        const char * remoteHostName,
  101.        int sendBuf,
  102.        int recvBuf);
  103. void
  104. createOSETransporter(void * _conf,
  105.      NodeId localNodeId,
  106.      NodeId remoteNodeId,
  107.      const char * localHostName,
  108.      const char * remoteHostName,
  109.      int sendBuf,
  110.      int recvBuf){
  111.   
  112.   ndbout << "Creating OSE transporter from node " 
  113.  << localNodeId << "(" << localHostName << ") to "
  114.  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
  115.   
  116.   OSE_TransporterConfiguration * conf = (OSE_TransporterConfiguration*)_conf;
  117.   
  118.   if(sendBuf != -1){
  119.     conf->prioBSignalSize = sendBuf;
  120.   }
  121.   if(recvBuf != -1){
  122.     conf->receiveBufferSize = recvBuf;
  123.   }
  124.   ndbout << "tSendBufferSize:    " << conf->prioBSignalSize << endl;
  125.   ndbout << "tReceiveBufferSize: " << conf->receiveBufferSize << endl;
  126.   conf->localNodeId    = localNodeId;
  127.   conf->localHostName  = localHostName;
  128.   conf->remoteNodeId   = remoteNodeId;
  129.   conf->remoteHostName = remoteHostName;
  130.   bool res = tReg->createTransporter(conf);
  131.   if(res)
  132.     ndbout << "... -- Success " << endl;
  133.   else
  134.     ndbout << "... -- Failure " << endl;
  135. }
  136. void
  137. createSCITransporter(void * _conf,
  138.      NodeId localNodeId,
  139.      NodeId remoteNodeId,
  140.      const char * localHostName,
  141.      const char * remoteHostName,
  142.      int sendbuf,
  143.      int recvbuf) {
  144.   ndbout << "Creating SCI transporter from node " 
  145.  << localNodeId << "(" << localHostName << ") to "
  146.  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
  147.   
  148.   
  149.   SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
  150.   conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
  151.   conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
  152.   conf->localNodeId    = localNodeId;
  153.   conf->remoteNodeId   = remoteNodeId;
  154.   bool res = tReg->createTransporter(conf);
  155.   if(res)
  156.     ndbout << "... -- Success " << endl;
  157.   else
  158.     ndbout << "... -- Failure " << endl;
  159. }
  160. void
  161. createSHMTransporter(void * _conf,
  162.      NodeId localNodeId,
  163.      NodeId remoteNodeId,
  164.      const char * localHostName,
  165.      const char * remoteHostName,
  166.      int sendbuf,
  167.      int recvbuf) {
  168.   ndbout << "Creating SHM transporter from node " 
  169.  << localNodeId << "(" << localHostName << ") to "
  170.  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
  171.   
  172.   
  173.   SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
  174.   conf->localNodeId    = localNodeId;
  175.   conf->remoteNodeId   = remoteNodeId;
  176.   bool res = tReg->createTransporter(conf);
  177.   if(res)
  178.     ndbout << "... -- Success " << endl;
  179.   else
  180.     ndbout << "... -- Failure " << endl;
  181. }
  182. void
  183. createTCPTransporter(void * _conf,
  184.      NodeId localNodeId,
  185.      NodeId remoteNodeId,
  186.      const char * localHostName,
  187.      const char * remoteHostName,
  188.      int sendBuf,
  189.      int recvBuf){
  190.   ndbout << "Creating TCP transporter from node " 
  191.  << localNodeId << "(" << localHostName << ") to "
  192.  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
  193.   
  194.   TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
  195.   
  196.   int port;
  197.   if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
  198.   if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
  199.   if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
  200.   if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
  201.   if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
  202.   if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
  203.   if(sendBuf != -1){
  204.     conf->sendBufferSize = sendBuf;
  205.   }
  206.   if(recvBuf != -1){
  207.     conf->maxReceiveSize = recvBuf;
  208.   }
  209.   ndbout << "tSendBufferSize:    " << conf->sendBufferSize << endl;
  210.   ndbout << "tReceiveBufferSize: " << conf->maxReceiveSize << endl;
  211.   conf->localNodeId    = localNodeId;
  212.   conf->localHostName  = localHostName;
  213.   conf->remoteNodeId   = remoteNodeId;
  214.   conf->remoteHostName = remoteHostName;
  215.   conf->port           = port;
  216.   bool res = tReg->createTransporter(conf);
  217.   if(res)
  218.     ndbout << "... -- Success " << endl;
  219.   else
  220.     ndbout << "... -- Failure " << endl;
  221. }
  222. struct TestPhase {
  223.   int signalSize;
  224.   int noOfSignals;
  225.   int noOfSignalSent;
  226.   int noOfSignalReceived;
  227.   NDB_TICKS startTime;
  228.   NDB_TICKS stopTime;
  229.   NDB_TICKS startTimePrioA;
  230.   NDB_TICKS stopTimePrioA;
  231.   NDB_TICKS totTimePrioA;
  232.   int bytesSentBeforePrioA;
  233.   NDB_TICKS accTime;
  234.   int loopCount;
  235.   Uint64 sendLenBytes, sendCount;
  236.   Uint64 recvLenBytes, recvCount;
  237. };
  238. TestPhase testSpec[] = {
  239.    {  1,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1  word
  240.   ,{  1,   10000, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1  word
  241.   ,{  1,  10000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1  word
  242.   ,{  1, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1  word
  243.   ,{  8,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1  word
  244.   ,{  8,   10000, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1  word
  245.   ,{  8,  10000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1  word
  246.   ,{  8, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1  word
  247.   ,{ 16,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1  word
  248.   ,{ 16,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1  word
  249.   ,{ 16,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1  word
  250.   ,{ 16, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1  word
  251.   ,{ 24,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of size 1  word
  252.   ,{ 24,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of size 1  word
  253.   ,{ 24,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of size 1  word
  254.   ,{ 24, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of size 1  word
  255.   ,{  0,    10, 0,0, 0,0,0,0,0,0,0 } //    10 signals of random size
  256.   ,{  0,   100, 0,0, 0,0,0,0,0,0,0 } //   100 signals of random size
  257.   ,{  0,  1000, 0,0, 0,0,0,0,0,0,0 } //  1000 signals of random size
  258.   ,{  0, 10000, 0,0, 0,0,0,0,0,0,0 } // 10000 signals of random size
  259. };
  260. const int noOfTests = sizeof(testSpec)/sizeof(TestPhase);
  261. SendStatus
  262. sendSignalTo(NodeId nodeId, int signalSize, int prio){
  263.   if(signalSize == 0)
  264.     signalSize = (rand() % 25) + 1;
  265.   SignalHeader sh;
  266.   sh.theLength               = signalSize;
  267.   sh.theVerId_signalNumber   = rand();
  268.   sh.theReceiversBlockNumber = rand();   
  269.   sh.theSendersBlockRef      = rand(); 
  270.   sh.theSendersSignalId      = rand(); 
  271.   sh.theSignalId             = rand(); 
  272.   sh.theTrace                = rand(); 
  273.   
  274.   Uint32 theData[25];
  275.   for(int i = 0; i<signalSize; i++)
  276.     theData[i] = (i+1) * (Uint32)(&theData[i]);
  277.   
  278.   return tReg->prepareSend(&sh, prio, theData, nodeId);
  279. }
  280. void
  281. reportHeader(){
  282.   ndbout << "#SigstSztPayloadtTimetSig/sectBpst"
  283.  << "s lentr lentprioAtimetbytesb4pA" << endl;
  284. }
  285. void
  286. printReport(TestPhase & p){
  287.   if(p.accTime > 0) {
  288.     Uint32 secs = (p.accTime/p.loopCount)/1000;
  289.     Uint32 mill = (p.accTime/p.loopCount)%1000;
  290.     char st[255];
  291.     if(secs > 0){
  292.       sprintf(st, "%d.%.2ds", secs, (mill/10));
  293.     } else {
  294.       sprintf(st, "%dms", mill);
  295.     }
  296.   
  297.     Uint32 sps = (1000*p.noOfSignals*p.loopCount)/p.accTime;
  298.     Uint32 bps = ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(p.signalSize+3));
  299.     if(p.signalSize == 0)
  300.       ((4000*p.noOfSignals)/p.accTime)*(p.loopCount*(13+3));
  301.     
  302.     char ssps[255];
  303.     if(sps > 1000000){
  304.       sps /= 1000000;
  305.       sprintf(ssps, "%dM", (int)sps);
  306.     } else if(sps > 1000){
  307.     sps /= 1000;
  308.     sprintf(ssps, "%dk", (int)sps);
  309.     } else {
  310.       sprintf(ssps, "%d", (int)sps);
  311.     }
  312.     
  313.     char sbps[255];
  314.     if(bps > 1000000){
  315.       bps /= 1000000;
  316.       sprintf(sbps, "%dM", bps);
  317.     } else if(bps>1000){
  318.       bps /= 1000;
  319.       sprintf(sbps, "%dk", bps);
  320.     } else {
  321.       sprintf(sbps, "%d", bps);
  322.   }
  323.     
  324.     char buf[255];
  325.     if(p.signalSize != 0){
  326.       BaseString::snprintf(buf, 255,
  327.        "%dt%dt%dt%st%st%st%dt%dt%dt%d",
  328.        p.noOfSignals,
  329.        p.signalSize,
  330.        (4*p.signalSize),
  331.        st,
  332.        ssps,
  333.        sbps,
  334.        (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
  335.        (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
  336.        (int)(p.totTimePrioA / p.loopCount),
  337.        (int)(p.bytesSentBeforePrioA));
  338.     } else {
  339.       BaseString::snprintf(buf, 255,
  340.        "%dtrandt4*randt%st%st%st%dt%dt%dt%d",
  341.        p.noOfSignals,
  342.        st,
  343.        ssps,
  344.        sbps,
  345.        (int)(p.sendLenBytes / (p.sendCount == 0 ? 1 : p.sendCount)),
  346.        (int)(p.recvLenBytes / (p.recvCount == 0 ? 1 : p.recvCount)),
  347.        (int)(p.totTimePrioA / p.loopCount),
  348.        (int)(p.bytesSentBeforePrioA));
  349.       
  350.     }
  351.     ndbout << buf << endl;
  352.   }
  353. }
  354. int loopCount = 1;
  355. int sendBufSz = -1;
  356. int recvBufSz = -1;
  357. NDB_TICKS startSec=0;
  358. NDB_TICKS stopSec=0;
  359. Uint32 startMicro=0;
  360. Uint32 stopMicro=0;
  361. int timerStarted;
  362. int timerStopped;
  363. bool      isClient     = false;
  364. bool      isConnected  = false;
  365. bool      isStarted    = false;
  366. int       currentPhase = 0;
  367. TestPhase allPhases[noOfTests];
  368. Uint32    signalToEcho;
  369. NDB_TICKS startTime, stopTime;
  370. void
  371. client(NodeId remoteNodeId){
  372.   isClient = true;
  373.   currentPhase = 0;
  374.   memcpy(allPhases, testSpec, sizeof(testSpec));
  375.   int counter = 0;
  376.   while(true){
  377.     TestPhase * current = &allPhases[currentPhase];
  378.     if(current->noOfSignals == current->noOfSignalSent &&
  379.        current->noOfSignals == current->noOfSignalReceived){
  380.       /**
  381.        * Test phase done
  382.        */
  383.       current->stopTime  = NdbTick_CurrentMillisecond();
  384.       current->accTime  += (current->stopTime - current->startTime);
  385.       NdbSleep_MilliSleep(500 / loopCount);
  386.       
  387.       current->startTime = NdbTick_CurrentMillisecond();
  388.       
  389.       current->noOfSignalSent     = 0;
  390.       current->noOfSignalReceived = 0;
  391.       current->loopCount ++;
  392.       if(current->loopCount == loopCount){
  393. printReport(allPhases[currentPhase]);
  394. currentPhase ++;
  395. if(currentPhase == noOfTests){
  396.   /**
  397.    * Now we are done
  398.    */
  399.   break;
  400. }
  401. NdbSleep_MilliSleep(500);
  402. current = &allPhases[currentPhase];
  403. current->startTime = NdbTick_CurrentMillisecond();
  404.       }
  405.     } 
  406.     int signalsLeft = current->noOfSignals - current->noOfSignalSent;
  407.     if(signalsLeft > 0){
  408.       for(; signalsLeft > 1; signalsLeft--){
  409. if(sendSignalTo(remoteNodeId, current->signalSize, 1) == SEND_OK) {
  410.   current->noOfSignalSent++;
  411.   //   ndbout << "sent prio b" << endl;
  412.   current->bytesSentBeforePrioA += (current->signalSize << 2);
  413. }
  414. else {
  415.   tReg->external_IO(10);
  416.   break;
  417. }
  418.       }
  419.       //prio A
  420.       if(signalsLeft==1) {
  421. NDB_TICKS sec = 0;
  422. Uint32 micro=0;
  423. int ret = NdbTick_CurrentMicrosecond(&sec,&micro);
  424. if(ret==0)
  425.   current->startTimePrioA  = micro + sec*1000000;
  426. if(sendSignalTo(remoteNodeId, current->signalSize, 0) == SEND_OK) {
  427.   current->noOfSignalSent++;
  428.   signalsLeft--;
  429. }
  430. else {
  431.   tReg->external_IO(10);
  432.   break;
  433. }
  434.       }
  435.     }
  436.     
  437.     if(counter % 10 == 0)
  438.       tReg->checkConnections();
  439.     tReg->external_IO(0);
  440.     counter++;
  441.   }
  442. }
  443. void 
  444. server(){
  445.   isClient = false;
  446.   
  447.   signalToEcho = 0;
  448.   for(int i = 0; i<noOfTests; i++)
  449.     signalToEcho += testSpec[i].noOfSignals;
  450.   
  451.   signalToEcho *= loopCount;
  452.   
  453.   while(signalToEcho > 0){
  454.     tReg->checkConnections();
  455.     for(int i = 0; i<10; i++)
  456.       tReg->external_IO(10);
  457.   }
  458. }
  459. int
  460. prioTransporterTest(TestType tt, const char * progName, 
  461.     int argc, const char **argv){
  462.   
  463.   loopCount = 100;
  464.   sendBufSz = -1;
  465.   recvBufSz = -1;
  466.   
  467.   isClient     = false;
  468.   isConnected  = false;
  469.   isStarted    = false;
  470.   currentPhase = 0;
  471.   signalHandler(0);
  472.   
  473.   if(argc < 4){
  474.     usage(progName);
  475.     return 0;
  476.   }
  477.   
  478.   const NodeId localNodeId   = atoi(argv[1]);
  479.   const char * localHostName = argv[2];
  480.   const char * remoteHost1   = argv[3];
  481.   
  482.   if(argc >= 5)
  483.     loopCount = atoi(argv[4]);
  484.   if(argc >= 6)
  485.     sendBufSz = atoi(argv[5]);
  486.   if(argc >= 7)
  487.     recvBufSz = atoi(argv[6]);
  488.   if(localNodeId < 1 || localNodeId > 2){
  489.     ndbout << "localNodeId = " << localNodeId << endl << endl;
  490.     usage(progName);
  491.     return 0;
  492.   }
  493.   
  494.   if(localNodeId == 1)
  495.     ndbout << "-- ECHO CLIENT --" << endl;
  496.   else
  497.     ndbout << "-- ECHO SERVER --" << endl;
  498.   ndbout << "localNodeId:           " << localNodeId << endl;
  499.   ndbout << "localHostName:         " << localHostName << endl;
  500.   ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): " 
  501.  << remoteHost1 << endl;
  502.   ndbout << "Loop count: " << loopCount << endl;
  503.   ndbout << "-----------------" << endl;
  504.   
  505.   void * confTemplate = 0;
  506.   CreateTransporterFunc func = 0;
  507.   switch(tt){
  508.   case TestTCP:
  509.     func = createTCPTransporter;
  510.     confTemplate = &tcpTemplate;
  511.     break;
  512.   case TestOSE:
  513.     func = createOSETransporter;
  514.     confTemplate = &oseTemplate;
  515.     break;
  516.   case TestSCI:
  517.     func = createSCITransporter;
  518.     confTemplate = &sciTemplate;
  519.     break;
  520.   case TestSHM:
  521.     func = createSHMTransporter;
  522.     confTemplate = &shmTemplate;
  523.     break;
  524.   default:
  525.     ndbout << "Unsupported transporter type" << endl;
  526.     return 0;
  527.   }
  528.   
  529.   ndbout << "Creating transporter registry" << endl;
  530.   tReg = new TransporterRegistry;
  531.   tReg->init(localNodeId);
  532.   
  533.   switch(localNodeId){
  534.   case 1:
  535.     (* func)(confTemplate, 1, 2, localHostName, remoteHost1, 
  536.      sendBufSz, recvBufSz);
  537.     break;
  538.   case 2:
  539.     (* func)(confTemplate, 2, 1, localHostName, remoteHost1,
  540.      sendBufSz, recvBufSz);
  541.     break;
  542.   }
  543.   
  544.   ndbout << "Doing startSending/startReceiving" << endl;
  545.   tReg->startSending();
  546.   tReg->startReceiving();
  547.   
  548.   ndbout << "Connecting" << endl;
  549.   tReg->setPerformState(PerformConnect);
  550.   tReg->checkConnections();
  551.   if(localNodeId == 1)
  552.     client(2);
  553.   else
  554.     server();
  555.     
  556.   isStarted = false;
  557.   
  558.   ndbout << "Sleep 3 secs" << endl;
  559.   NdbSleep_SecSleep(3);
  560.   ndbout << "Doing setPerformState(Disconnect)" << endl;
  561.   tReg->setPerformState(PerformDisconnect);
  562.   
  563.   ndbout << "Doing checkConnections()" << endl;
  564.   tReg->checkConnections();
  565.     
  566.   ndbout << "Deleting transporter registry" << endl;
  567.   delete tReg; tReg = 0;
  568.   
  569.   return 0;
  570. }
  571. NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
  572.   out << "-- Signal Header --" << endl;
  573.   out << "theLength:    " << sh.theLength << endl;
  574.   out << "gsn:          " << sh.theVerId_signalNumber << endl;
  575.   out << "recBlockNo:   " << sh.theReceiversBlockNumber << endl;
  576.   out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
  577.   out << "sendersSig:   " << sh.theSendersSignalId << endl;
  578.   out << "theSignalId:  " << sh.theSignalId << endl;
  579.   out << "trace:        " << (int)sh.theTrace << endl;
  580.   return out;
  581. void
  582. execute(SignalHeader * const header, Uint8 prio, Uint32 * const theData){
  583.   const NodeId nodeId = refToNode(header->theSendersBlockRef);
  584.   NDB_TICKS sec = 0;
  585.   Uint32 micro=0;
  586.   int ret = NdbTick_CurrentMicrosecond(&sec,&micro);
  587.   if(prio == 0 && isClient && ret == 0) {
  588.     allPhases[currentPhase].stopTimePrioA  = micro + sec*1000000;
  589.     allPhases[currentPhase].totTimePrioA += 
  590.       allPhases[currentPhase].stopTimePrioA -  
  591.       allPhases[currentPhase].startTimePrioA;
  592.   }
  593.   if(ret!=0)
  594.     allPhases[currentPhase].totTimePrioA = -1;
  595.   if(isClient){
  596.     allPhases[currentPhase].noOfSignalReceived++;
  597.   } else {
  598.     int sleepTime = 10;
  599.     while(tReg->prepareSend(header, prio, theData, nodeId) != SEND_OK){
  600.       ndbout << "Failed to echo" << sleepTime << endl;
  601.       NdbSleep_MilliSleep(sleepTime);
  602.       // sleepTime += 10;
  603.     }
  604.     
  605.     signalToEcho--;
  606.   }
  607. }
  608. void
  609. reportError(NodeId nodeId, TransporterError errorCode){
  610.   char buf[255];
  611.   sprintf(buf, "reportError (%d, %x) in perfTest", nodeId, errorCode);
  612.   ndbout << buf << endl;
  613.   if(errorCode & 0x8000){
  614.     tReg->setPerformState(nodeId, PerformDisconnect);
  615.   }
  616. }
  617. /**
  618.  * Report average send theLength in bytes (4096 last sends)
  619.  */
  620. void
  621. reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes){
  622.   allPhases[currentPhase].sendCount    += count;
  623.   allPhases[currentPhase].sendLenBytes += bytes;
  624.   if(!isClient){
  625.     ndbout << "reportSendLen(" << nodeId << ", " 
  626.    << (bytes/count) << ")" << endl;
  627.   }
  628. }
  629. /**
  630.  * Report average receive theLength in bytes (4096 last receives)
  631.  */
  632. void
  633. reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes){
  634.   allPhases[currentPhase].recvCount    += count;
  635.   allPhases[currentPhase].recvLenBytes += bytes;
  636.   if(!isClient){
  637.     ndbout << "reportReceiveLen(" << nodeId << ", " 
  638.    << (bytes/count) << ")" << endl;
  639.   }
  640. }
  641. /**
  642.  * Report connection established
  643.  */
  644. void
  645. reportConnect(NodeId nodeId){
  646.   char buf[255];
  647.   sprintf(buf, "reportConnect(%d)", nodeId);
  648.   ndbout << buf << endl;
  649.   tReg->setPerformState(nodeId, PerformIO);
  650.   
  651.   if(!isStarted){
  652.     isStarted = true;
  653.     startTime = NdbTick_CurrentMillisecond();
  654.     if(isClient){
  655.       reportHeader();
  656.       allPhases[0].startTime = startTime;
  657.     }
  658.   }
  659.   else{
  660.     // Resend signals that were lost when connection failed
  661.     TestPhase * current = &allPhases[currentPhase];
  662.     current->noOfSignalSent = current->noOfSignalReceived;
  663.   }
  664. }
  665. /**
  666.  * Report connection broken
  667.  */
  668. void
  669. reportDisconnect(NodeId nodeId, Uint32 errNo){
  670.   char buf[255];
  671.   sprintf(buf, "reportDisconnect(%d)", nodeId);
  672.   ndbout << buf << endl;
  673.   
  674.   if(isStarted)
  675.     tReg->setPerformState(nodeId, PerformConnect);
  676. }
  677. int
  678. checkJobBuffer() {
  679.   /** 
  680.    * Check to see if jobbbuffers are starting to get full
  681.    * and if so call doJob
  682.    */
  683.   return 0;
  684. }