basicTransporterTest.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:14k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include "TransporterRegistry.hpp"
  15. #include "TransporterDefinitions.hpp"
  16. #include "TransporterCallback.hpp"
  17. #include <RefConvert.hpp>
  18. #include <NdbTick.h>
  19. #include <NdbMain.h>
  20. #include <NdbOut.hpp>
  21. #include <NdbSleep.h>
  22. int basePortTCP = 17000;
  23. SCI_TransporterConfiguration sciTemplate = {
  24.   8000, 
  25.        // Packet size
  26.   2500000,      // Buffer size
  27.   2,           // number of adapters
  28.   1,           // remote node id SCI 
  29.   2,           // Remote node Id SCI
  30.   0,           // local ndb node id (server)
  31.   0,           // remote ndb node id (client)
  32.   0,              // byteOrder;
  33.   false,          // compression;
  34.   true,           // checksum;
  35.   true            // signalId;
  36. };
  37. TCP_TransporterConfiguration tcpTemplate = {
  38.   17000,          // port; 
  39.   "",             // remoteHostName;
  40.   "",             // localhostname
  41.   2,              // remoteNodeId;
  42.   1,              // localNodeId;
  43.   10000,          // sendBufferSize - Size of SendBuffer of priority B 
  44.   10000,          // maxReceiveSize - Maximum no of bytes to receive
  45.   0,              // byteOrder;
  46.   false,          // compression;
  47.   true,           // checksum;
  48.   true            // signalId;
  49. };
  50. OSE_TransporterConfiguration oseTemplate = {
  51.   "",    // remoteHostName;
  52.   "",    // localHostName;
  53.   0,     // remoteNodeId;
  54.   0,     // localNodeId;
  55.   false, // compression;
  56.   true,  // checksum;
  57.   true,  // signalId;
  58.   0,     // byteOrder;
  59.   
  60.   2000,  // prioASignalSize;
  61.   1000,  // prioBSignalSize;
  62.   10
  63. };
  64. SHM_TransporterConfiguration shmTemplate = {
  65.   0,      //remoteNodeId
  66.   0,      //localNodeId;
  67.   false,  //compression
  68.   true,   //checksum;
  69.   true,   //signalId;
  70.   0,      //byteOrder;
  71.   123,    //shmKey;
  72.   2500000 //shmSize;
  73. };
  74. TransporterRegistry *tReg = 0;
  75. #ifndef OSE_DELTA
  76. #include <signal.h>
  77. #endif
  78. extern "C"
  79. void
  80. signalHandler(int signo){
  81. #ifndef OSE_DELTA
  82.   ::signal(13, signalHandler);
  83. #endif
  84.   char buf[255];
  85.   sprintf(buf,"Signal: %dn", signo);
  86.   ndbout << buf << endl;
  87. }
  88. void 
  89. usage(const char * progName){
  90.   ndbout << "Usage: " << progName << " <type> localNodeId localHostName" 
  91.  << " remoteHostName1 remoteHostName2" << endl;
  92.   ndbout << "  type = shm tcp ose sci" << endl;
  93.   ndbout << "  localNodeId - 1 to 3" << endl;
  94. }
  95. typedef void (* CreateTransporterFunc)(void * conf, 
  96.        NodeId localNodeId,
  97.        NodeId remoteNodeId,
  98.        const char * localHostName,
  99.        const char * remoteHostName);
  100. void createOSETransporter(void *, NodeId, NodeId, const char *, const char *);
  101. void createSCITransporter(void *, NodeId, NodeId, const char *, const char *);
  102. void createTCPTransporter(void *, NodeId, NodeId, const char *, const char *);
  103. void createSHMTransporter(void *, NodeId, NodeId, const char *, const char *);
  104. int signalReceived[4];
  105. int
  106. main(int argc, const char **argv){
  107.   
  108.   signalHandler(0);
  109.   
  110.   for(int i = 0; i<4; i++)
  111.     signalReceived[i] = 0;
  112.   
  113.   if(argc < 5){
  114.     usage(argv[0]);
  115.     return 0;
  116.   }
  117.   Uint32 noOfConnections     = 0;
  118.   const char * progName      = argv[0];
  119.   const char * type          = argv[1];
  120.   const NodeId localNodeId   = atoi(argv[2]);
  121.   const char * localHostName = argv[3];
  122.   const char * remoteHost1   = argv[4];
  123.   const char * remoteHost2   = NULL;
  124.   
  125.   if(argc == 5)
  126.     noOfConnections = 1;
  127.   else {
  128.     noOfConnections = 2;
  129.     remoteHost2 = argv[5];
  130.   }
  131.   
  132.   if(localNodeId < 1 || localNodeId > 3){
  133.     ndbout << "localNodeId = " << localNodeId << endl << endl;
  134.     usage(progName);
  135.     return 0;
  136.   }
  137.   
  138.   ndbout << "-----------------" << endl;
  139.   ndbout << "localNodeId:           " << localNodeId << endl;
  140.   ndbout << "localHostName:         " << localHostName << endl;
  141.   ndbout << "remoteHost1 (node " << (localNodeId == 1?2:1) << "): " 
  142.  << remoteHost1 << endl;
  143.   if(noOfConnections == 2){
  144.     ndbout << "remoteHost2 (node " << (localNodeId == 3?2:3) << "): " 
  145.    << remoteHost2 << endl;
  146.   }
  147.   ndbout << "-----------------" << endl;
  148.   
  149.   void * confTemplate = 0;
  150.   CreateTransporterFunc func = 0;
  151.   if(strcasecmp(type, "tcp") == 0){
  152.     func = createTCPTransporter;
  153.     confTemplate = &tcpTemplate;
  154.   } else if(strcasecmp(type, "ose") == 0){
  155.     func = createOSETransporter;
  156.     confTemplate = &oseTemplate;
  157.   } else if(strcasecmp(type, "sci") == 0){
  158.     func = createSCITransporter;
  159.     confTemplate = &sciTemplate;
  160.   } else if(strcasecmp(type, "shm") == 0){
  161.     func = createSHMTransporter;
  162.     confTemplate = &shmTemplate;
  163.   } else {
  164.     ndbout << "Unsupported transporter type" << endl;
  165.     return 0;
  166.   }
  167.   
  168.   ndbout << "Creating transporter registry" << endl;
  169.   tReg = new TransporterRegistry;
  170.   tReg->init(localNodeId);
  171.   
  172.   switch(localNodeId){
  173.   case 1:
  174.     (* func)(confTemplate, 1, 2, localHostName, remoteHost1);
  175.     if(noOfConnections == 2)
  176.       (* func)(confTemplate, 1, 3, localHostName, remoteHost2);
  177.     break;
  178.   case 2:
  179.     (* func)(confTemplate, 2, 1, localHostName, remoteHost1);
  180.     if(noOfConnections == 2)
  181.       (* func)(confTemplate, 2, 3, localHostName, remoteHost2);
  182.     break;
  183.   case 3:
  184.     (* func)(confTemplate, 3, 1, localHostName, remoteHost1);
  185.     if(noOfConnections == 2)
  186.       (* func)(confTemplate, 3, 2, localHostName, remoteHost2);
  187.     break;
  188.   }
  189.   
  190.   ndbout << "Doing startSending/startReceiving" << endl;
  191.   tReg->startSending();
  192.   tReg->startReceiving();
  193.   
  194.   ndbout << "Connecting" << endl;
  195.   tReg->setPerformState(PerformConnect);
  196.   tReg->checkConnections();
  197.   
  198.   unsigned sum = 0;
  199.   do {
  200.     sum = 0;
  201.     for(int i = 0; i<4; i++)
  202.       sum += signalReceived[i];
  203.     
  204.     tReg->checkConnections();
  205.     
  206.     tReg->external_IO(500);
  207.     NdbSleep_MilliSleep(500);
  208.     ndbout << "In main loop" << endl;
  209.   } while(sum != 2*noOfConnections);
  210.   
  211.   ndbout << "Doing setPerformState(Disconnect)" << endl;
  212.   tReg->setPerformState(PerformDisconnect);
  213.   
  214.   ndbout << "Doing checkConnections()" << endl;
  215.   tReg->checkConnections();
  216.   
  217.   ndbout << "Sleeping 3 secs" << endl;
  218.   NdbSleep_SecSleep(3);
  219.   
  220.   ndbout << "Deleting transporter registry" << endl;
  221.   delete tReg; tReg = 0;
  222.   
  223.   return 0;
  224. }
  225. void
  226. checkData(SignalHeader * const header, Uint8 prio, Uint32 * const theData,
  227.   LinearSectionPtr ptr[3]){
  228.   Uint32 expectedLength = 0;
  229.   if(prio == 0)
  230.     expectedLength = 17;
  231.   else
  232.     expectedLength = 19;
  233.   if(header->theLength != expectedLength){
  234.     ndbout << "Unexpected signal length: " << header->theLength 
  235.    << " expected: " << expectedLength << endl;
  236.     abort();
  237.   }
  238.   if(header->theVerId_signalNumber != expectedLength + 1)
  239.     abort();
  240.   if(header->theReceiversBlockNumber != expectedLength + 2)
  241.     abort();
  242.   if(refToBlock(header->theSendersBlockRef) != expectedLength + 3)
  243.     abort();
  244.   if(header->theSendersSignalId != expectedLength + 5)
  245.     abort();
  246.   if(header->theTrace != expectedLength + 6)
  247.     abort();
  248.   if(header->m_noOfSections != (prio == 0 ? 0 : 1))
  249.     abort();
  250.   if(header->m_fragmentInfo != (prio + 1))
  251.     abort();
  252.   
  253.   Uint32 dataWordStart = header->theLength ;
  254.   for(unsigned i = 0; i<header->theLength; i++){
  255.     if(theData[i] != i){ //dataWordStart){
  256.       ndbout << "data corrupt!n" << endl;
  257.       abort();
  258.     }
  259.     dataWordStart ^= (~i*i);
  260.   }
  261.   if(prio != 0){
  262.     ndbout_c("Found section");
  263.     if(ptr[0].sz != header->theLength)
  264.       abort();
  265.     if(memcmp(ptr[0].p, theData, (ptr[0].sz * 4)) != 0)
  266.       abort();
  267.   }
  268. }
  269. void
  270. sendSignalTo(NodeId nodeId, int prio){
  271.   SignalHeader sh;
  272.   sh.theLength = (prio == 0 ? 17 : 19); 
  273.   sh.theVerId_signalNumber   = sh.theLength + 1;
  274.   sh.theReceiversBlockNumber = sh.theLength + 2;
  275.   sh.theSendersBlockRef      = sh.theLength + 3;
  276.   sh.theSendersSignalId      = sh.theLength + 4;
  277.   sh.theSignalId             = sh.theLength + 5;
  278.   sh.theTrace                = sh.theLength + 6;
  279.   sh.m_noOfSections          = (prio == 0 ? 0 : 1);
  280.   sh.m_fragmentInfo          = prio + 1;
  281.   
  282.   Uint32 theData[25];
  283.   Uint32 dataWordStart = sh.theLength;
  284.   for(unsigned i = 0; i<sh.theLength; i++){
  285.     theData[i] = i;
  286.     dataWordStart ^= (~i*i);
  287.   }
  288.   ndbout << "Sending prio " << (int)prio << " signal to node: " 
  289.  << nodeId 
  290.  << " gsn = " << sh.theVerId_signalNumber << endl;
  291.   
  292.   LinearSectionPtr ptr[3];
  293.   ptr[0].p = &theData[0];
  294.   ptr[0].sz = sh.theLength;
  295.   SendStatus s = tReg->prepareSend(&sh, prio, theData, nodeId, ptr);
  296.   if(s != SEND_OK){
  297.     ndbout << "Send was not ok. Send was: " << s << endl;
  298.   }
  299. }
  300. void
  301. execute(void* callbackObj, 
  302. SignalHeader * const header, Uint8 prio, Uint32 * const theData, 
  303. LinearSectionPtr ptr[3]){
  304.   const NodeId nodeId = refToNode(header->theSendersBlockRef);
  305.   
  306.   ndbout << "Recieved prio " << (int)prio << " signal from node: " 
  307.  << nodeId
  308.  << " gsn = " << header->theVerId_signalNumber << endl;
  309.   checkData(header, prio, theData, ptr);
  310.   ndbout << " Data is ok!n" << endl;
  311.   
  312.   signalReceived[nodeId]++;
  313.   
  314.   if(prio == 0)
  315.     sendSignalTo(nodeId, 1);
  316.   else
  317.     tReg->setPerformState(nodeId, PerformDisconnect);
  318. }
  319. void 
  320. copy(Uint32 * & insertPtr, 
  321.      class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
  322.   abort();
  323. }
  324. void
  325. reportError(void* callbackObj, NodeId nodeId, TransporterError errorCode){
  326.   char buf[255];
  327.   sprintf(buf, "reportError (%d, %x)", nodeId, errorCode);
  328.   ndbout << buf << endl;
  329.   if(errorCode & 0x8000){
  330.     tReg->setPerformState(nodeId, PerformDisconnect);
  331.     abort();
  332.   }
  333. }
  334. /**
  335.  * Report average send theLength in bytes (4096 last sends)
  336.  */
  337. void
  338. reportSendLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
  339.   char buf[255];
  340.   sprintf(buf, "reportSendLen(%d, %d)", nodeId, (Uint32)(bytes/count));
  341.   ndbout << buf << endl;
  342. }
  343. /**
  344.  * Report average receive theLength in bytes (4096 last receives)
  345.  */
  346. void
  347. reportReceiveLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
  348.   char buf[255];
  349.   sprintf(buf, "reportReceiveLen(%d, %d)", nodeId, (Uint32)(bytes/count));
  350.   ndbout << buf << endl;
  351. }
  352. /**
  353.  * Report connection established
  354.  */
  355. void
  356. reportConnect(void* callbackObj, NodeId nodeId){
  357.   char buf[255];
  358.   sprintf(buf, "reportConnect(%d)", nodeId);
  359.   ndbout << buf << endl;
  360.   tReg->setPerformState(nodeId, PerformIO);
  361.   
  362.   sendSignalTo(nodeId, 0);
  363. }
  364. /**
  365.  * Report connection broken
  366.  */
  367. void
  368. reportDisconnect(void* callbackObj, NodeId nodeId, Uint32 errNo){
  369.   char buf[255];
  370.   sprintf(buf, "reportDisconnect(%d)", nodeId);
  371.   ndbout << buf << endl;
  372.   if(signalReceived[nodeId] < 2)
  373.     tReg->setPerformState(nodeId, PerformConnect);
  374. }
  375. int
  376. checkJobBuffer() {
  377.   /** 
  378.    * Check to see if jobbbuffers are starting to get full
  379.    * and if so call doJob
  380.    */
  381.   return 0;
  382. }
  383. void
  384. createOSETransporter(void * _conf,
  385.      NodeId localNodeId,
  386.      NodeId remoteNodeId,
  387.      const char * localHostName,
  388.      const char * remoteHostName){
  389.   ndbout << "Creating OSE transporter from node " 
  390.  << localNodeId << "(" << localHostName << ") to "
  391.  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
  392.   
  393.   OSE_TransporterConfiguration * conf = (OSE_TransporterConfiguration*)_conf;
  394.   
  395.   conf->localNodeId    = localNodeId;
  396.   conf->localHostName  = localHostName;
  397.   conf->remoteNodeId   = remoteNodeId;
  398.   conf->remoteHostName = remoteHostName;
  399.   bool res = tReg->createTransporter(conf);
  400.   if(res)
  401.     ndbout << "... -- Success " << endl;
  402.   else
  403.     ndbout << "... -- Failure " << endl;
  404. }
  405. void
  406. createTCPTransporter(void * _conf,
  407.      NodeId localNodeId,
  408.      NodeId remoteNodeId,
  409.      const char * localHostName,
  410.      const char * remoteHostName){
  411.   ndbout << "Creating TCP transporter from node " 
  412.  << localNodeId << "(" << localHostName << ") to "
  413.  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
  414.   
  415.   TCP_TransporterConfiguration * conf = (TCP_TransporterConfiguration*)_conf;
  416.   
  417.   int port;
  418.   if(localNodeId == 1 && remoteNodeId == 2) port = basePortTCP + 0;
  419.   if(localNodeId == 1 && remoteNodeId == 3) port = basePortTCP + 1;
  420.   if(localNodeId == 2 && remoteNodeId == 1) port = basePortTCP + 0;
  421.   if(localNodeId == 2 && remoteNodeId == 3) port = basePortTCP + 2;
  422.   if(localNodeId == 3 && remoteNodeId == 1) port = basePortTCP + 1;
  423.   if(localNodeId == 3 && remoteNodeId == 2) port = basePortTCP + 2;
  424.   
  425.   conf->localNodeId    = localNodeId;
  426.   conf->localHostName  = localHostName;
  427.   conf->remoteNodeId   = remoteNodeId;
  428.   conf->remoteHostName = remoteHostName;
  429.   conf->port           = port;
  430.   bool res = tReg->createTransporter(conf);
  431.   if(res)
  432.     ndbout << "... -- Success " << endl;
  433.   else
  434.     ndbout << "... -- Failure " << endl;
  435. }
  436. void
  437. createSCITransporter(void * _conf,
  438.      NodeId localNodeId,
  439.      NodeId remoteNodeId,
  440.      const char * localHostName,
  441.      const char * remoteHostName){
  442.   ndbout << "Creating SCI transporter from node " 
  443.  << localNodeId << "(" << localHostName << ") to "
  444.  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
  445.   
  446.   
  447.   SCI_TransporterConfiguration * conf = (SCI_TransporterConfiguration*)_conf;
  448.   conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
  449.   conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);
  450.   conf->localNodeId    = localNodeId;
  451.   conf->remoteNodeId   = remoteNodeId;
  452.   bool res = tReg->createTransporter(conf);
  453.   if(res)
  454.     ndbout << "... -- Success " << endl;
  455.   else
  456.     ndbout << "... -- Failure " << endl;
  457. }
  458. void
  459. createSHMTransporter(void * _conf,
  460.      NodeId localNodeId,
  461.      NodeId remoteNodeId,
  462.      const char * localHostName,
  463.      const char * remoteHostName){
  464.   ndbout << "Creating SHM transporter from node " 
  465.  << localNodeId << "(" << localHostName << ") to "
  466.  << remoteNodeId << "(" << remoteHostName << ")..." << endl;;
  467.   
  468.   
  469.   SHM_TransporterConfiguration * conf = (SHM_TransporterConfiguration*)_conf;
  470.   conf->localNodeId    = localNodeId;
  471.   conf->remoteNodeId   = remoteNodeId;
  472.   
  473.   bool res = tReg->createTransporter(conf);
  474.   if(res)
  475.     ndbout << "... -- Success " << endl;
  476.   else
  477.     ndbout << "... -- Failure " << endl;
  478. }