TransporterFacade.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:31k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <my_pthread.h>
  15. #include <ndb_limits.h>
  16. #include "TransporterFacade.hpp"
  17. #include "ClusterMgr.hpp"
  18. #include <IPCConfig.hpp>
  19. #include <TransporterCallback.hpp>
  20. #include <TransporterRegistry.hpp>
  21. #include "NdbApiSignal.hpp"
  22. #include <NdbOut.hpp>
  23. #include <NdbEnv.h>
  24. #include <NdbSleep.h>
  25. #include "API.hpp"
  26. #include <ConfigRetriever.hpp>
  27. #include <mgmapi_config_parameters.h>
  28. #include <mgmapi_configuration.hpp>
  29. #include <NdbConfig.h>
  30. #include <ndb_version.h>
  31. #include <SignalLoggerManager.hpp>
  32. #include <kernel/ndb_limits.h>
  33. #include <signaldata/AlterTable.hpp>
  34. //#define REPORT_TRANSPORTER
  35. //#define API_TRACE;
  36. static int numberToIndex(int number)
  37. {
  38.   return number - MIN_API_BLOCK_NO;
  39. }
  40. static int indexToNumber(int index)
  41. {
  42.   return index + MIN_API_BLOCK_NO;
  43. }
  44. #if defined DEBUG_TRANSPORTER
  45. #define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
  46. #else
  47. #define TRP_DEBUG(t)
  48. #endif
  49. TransporterFacade* TransporterFacade::theFacadeInstance = NULL;
  50. /*****************************************************************************
  51.  * Call back functions
  52.  *****************************************************************************/
  53. void
  54. reportError(void * callbackObj, NodeId nodeId, TransporterError errorCode){
  55. #ifdef REPORT_TRANSPORTER
  56.   ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d)", 
  57.    (int)nodeId, (int)errorCode);
  58. #endif
  59.   if(errorCode & 0x8000) {
  60.     ndbout_c("reportError (%d, %d)n", (int)nodeId, (int)errorCode);
  61.     ((TransporterFacade*)(callbackObj))->doDisconnect(nodeId);
  62.   }
  63. }
  64. /**
  65.  * Report average send length in bytes (4096 last sends)
  66.  */
  67. void
  68. reportSendLen(void * callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
  69. #ifdef REPORT_TRANSPORTER
  70.   ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)", 
  71.    (int)nodeId, (Uint32)(bytes/count));
  72. #endif
  73.   (void)nodeId;
  74.   (void)count;
  75.   (void)bytes;
  76. }
  77. /** 
  78.  * Report average receive length in bytes (4096 last receives)
  79.  */
  80. void
  81. reportReceiveLen(void * callbackObj, 
  82.  NodeId nodeId, Uint32 count, Uint64 bytes){
  83. #ifdef REPORT_TRANSPORTER
  84.   ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)", 
  85.    (int)nodeId, (Uint32)(bytes/count));
  86. #endif
  87.   (void)nodeId;
  88.   (void)count;
  89.   (void)bytes;
  90. }
  91. /**
  92.  * Report connection established
  93.  */
  94. void
  95. reportConnect(void * callbackObj, NodeId nodeId){
  96. #ifdef REPORT_TRANSPORTER
  97.   ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
  98. #endif
  99.   ((TransporterFacade*)(callbackObj))->reportConnected(nodeId);
  100.   //  TransporterFacade::instance()->reportConnected(nodeId);
  101. }
  102. /**
  103.  * Report connection broken
  104.  */
  105. void
  106. reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 error){
  107. #ifdef REPORT_TRANSPORTER
  108.   ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
  109. #endif
  110.   ((TransporterFacade*)(callbackObj))->reportDisconnected(nodeId);
  111.   //TransporterFacade::instance()->reportDisconnected(nodeId);
  112. }
  113. /****************************************************************************
  114.  * 
  115.  *****************************************************************************/
  116. /**
  117.  * Report connection broken
  118.  */
  119. int checkJobBuffer() {
  120.   return 0;
  121. }
  122. #ifdef API_TRACE
  123. static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
  124. static const char * apiSignalLog   = 0;
  125. static SignalLoggerManager signalLogger;
  126. static
  127. inline
  128. bool
  129. setSignalLog(){
  130.   signalLogger.flushSignalLog();
  131.   const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
  132.   if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
  133.     return true;
  134.   } else if(tmp == 0 && apiSignalLog == 0){
  135.     return false;
  136.   } else if(tmp == 0 && apiSignalLog != 0){
  137.     signalLogger.setOutputStream(0);
  138.     apiSignalLog = tmp;
  139.     return false;
  140.   } else if(tmp !=0){
  141.     if (strcmp(tmp, "-") == 0)
  142.         signalLogger.setOutputStream(stdout);
  143. #ifndef DBUG_OFF
  144.     else if (strcmp(tmp, "+") == 0)
  145.         signalLogger.setOutputStream(DBUG_FILE);
  146. #endif
  147.     else
  148.         signalLogger.setOutputStream(fopen(tmp, "w"));
  149.     apiSignalLog = tmp;
  150.     return true;
  151.   }
  152.   return false;
  153. }
  154. #ifdef TRACE_APIREGREQ
  155. #define TRACE_GSN(gsn) true
  156. #else
  157. #define TRACE_GSN(gsn) (gsn != GSN_API_REGREQ && gsn != GSN_API_REGCONF)
  158. #endif
  159. #endif
  160. /**
  161.  * The execute function : Handle received signal
  162.  */
  163. void
  164. execute(void * callbackObj, SignalHeader * const header, 
  165. Uint8 prio, Uint32 * const theData,
  166. LinearSectionPtr ptr[3]){
  167.   TransporterFacade * theFacade = (TransporterFacade*)callbackObj;
  168.   TransporterFacade::ThreadData::Object_Execute oe; 
  169.   Uint32 tRecBlockNo = header->theReceiversBlockNumber;
  170.   
  171. #ifdef API_TRACE
  172.   if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
  173.     signalLogger.executeSignal(* header, 
  174.        prio,
  175.                                theData,
  176.        theFacade->ownId(), 
  177.                                ptr, header->m_noOfSections);
  178.     signalLogger.flushSignalLog();
  179.   }
  180. #endif  
  181.   if (tRecBlockNo >= MIN_API_BLOCK_NO) {
  182.     oe = theFacade->m_threads.get(tRecBlockNo);
  183.     if (oe.m_object != 0 && oe.m_executeFunction != 0) {
  184.       /**
  185.        * Handle received signal immediately to avoid any unnecessary
  186.        * copying of data, allocation of memory and other things. Copying
  187.        * of data could be interesting to support several priority levels
  188.        * and to support a special memory structure when executing the
  189.        * signals. Neither of those are interesting when receiving data
  190.        * in the NDBAPI. The NDBAPI will thus read signal data directly as
  191.        * it was written by the sender (SCI sender is other node, Shared
  192.        * memory sender is other process and TCP/IP sender is the OS that
  193.        * writes the TCP/IP message into a message buffer).
  194.        */
  195.       NdbApiSignal tmpSignal(*header);
  196.       NdbApiSignal * tSignal = &tmpSignal;
  197.       tSignal->setDataPtr(theData);
  198.       (* oe.m_executeFunction) (oe.m_object, tSignal, ptr);
  199.     }//if
  200.   } else if (tRecBlockNo == API_PACKED) {
  201.     /**
  202.      * Block number == 2047 is used to signal a signal that consists of
  203.      * multiple instances of the same signal. This is an effort to
  204.      * package the signals so as to avoid unnecessary communication
  205.      * overhead since TCP/IP has a great performance impact.
  206.      */
  207.     Uint32 Tlength = header->theLength;
  208.     Uint32 Tsent = 0;
  209.     /**
  210.      * Since it contains at least two data packets we will first
  211.      * copy the signal data to safe place.
  212.      */
  213.     while (Tsent < Tlength) {
  214.       Uint32 Theader = theData[Tsent];
  215.       Tsent++;
  216.       Uint32 TpacketLen = (Theader & 0x1F) + 3;
  217.       tRecBlockNo = Theader >> 16;
  218.       if (TpacketLen <= 25) {
  219. if ((TpacketLen + Tsent) <= Tlength) {
  220.   /**
  221.    * Set the data length of the signal and the receivers block
  222.    * reference and then call the API.
  223.    */
  224.   header->theLength = TpacketLen;
  225.   header->theReceiversBlockNumber = tRecBlockNo;
  226.   Uint32* tDataPtr = &theData[Tsent];
  227.   Tsent += TpacketLen;
  228.   if (tRecBlockNo >= MIN_API_BLOCK_NO) {
  229.     oe = theFacade->m_threads.get(tRecBlockNo);
  230.     if(oe.m_object != 0 && oe.m_executeFunction != 0){
  231.       NdbApiSignal tmpSignal(*header);
  232.       NdbApiSignal * tSignal = &tmpSignal;
  233.       tSignal->setDataPtr(tDataPtr);
  234.       (*oe.m_executeFunction)(oe.m_object, tSignal, 0);
  235.     }
  236.   }
  237. }
  238.       }
  239.     }
  240.     return;
  241.   } else if (tRecBlockNo == API_CLUSTERMGR) {
  242.      /**
  243.       * The signal was aimed for the Cluster Manager. 
  244.       * We handle it immediately here.
  245.       */     
  246.      ClusterMgr * clusterMgr = theFacade->theClusterMgr;
  247.      const Uint32 gsn = header->theVerId_signalNumber;
  248.      switch (gsn){
  249.      case GSN_API_REGREQ:
  250.        clusterMgr->execAPI_REGREQ(theData);
  251.        break;
  252.      case GSN_API_REGCONF:
  253.        clusterMgr->execAPI_REGCONF(theData);
  254.        break;
  255.      
  256.      case GSN_API_REGREF:
  257.        clusterMgr->execAPI_REGREF(theData);
  258.        break;
  259.      case GSN_NODE_FAILREP:
  260.        clusterMgr->execNODE_FAILREP(theData);
  261.        break;
  262.        
  263.      case GSN_NF_COMPLETEREP:
  264.        clusterMgr->execNF_COMPLETEREP(theData);
  265.        break;
  266.      case GSN_ARBIT_STARTREQ:
  267.        if (theFacade->theArbitMgr != NULL)
  268.  theFacade->theArbitMgr->doStart(theData);
  269.        break;
  270.        
  271.      case GSN_ARBIT_CHOOSEREQ:
  272.        if (theFacade->theArbitMgr != NULL)
  273.  theFacade->theArbitMgr->doChoose(theData);
  274.        break;
  275.        
  276.      case GSN_ARBIT_STOPORD:
  277.        if(theFacade->theArbitMgr != NULL)
  278.  theFacade->theArbitMgr->doStop(theData);
  279.        break;
  280.      case GSN_ALTER_TABLE_REP:
  281.      {
  282.        const AlterTableRep* rep = (const AlterTableRep*)theData;
  283.        theFacade->m_globalDictCache.lock();
  284.        theFacade->m_globalDictCache.
  285.  alter_table_rep((const char*)ptr[0].p, 
  286.  rep->tableId,
  287.  rep->tableVersion,
  288.  rep->changeType == AlterTableRep::CT_ALTERED);
  289.        theFacade->m_globalDictCache.unlock();
  290.      }
  291.      default:
  292.        break;
  293.        
  294.      }
  295.      return;
  296.   } else {
  297.     ; // Ignore all other block numbers.
  298.     if(header->theVerId_signalNumber!=3) {
  299.       TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
  300.       ndbout << "BLOCK NO: "  << tRecBlockNo << " sig " 
  301.      << header->theVerId_signalNumber  << endl;
  302.       abort();
  303.     }
  304.   }
  305. }
  306. // These symbols are needed, but not used in the API
  307. void 
  308. SignalLoggerManager::printSegmentedSection(FILE *, const SignalHeader &,
  309.    const SegmentedSectionPtr ptr[3],
  310.    unsigned i){
  311.   abort();
  312. }
  313. void 
  314. copy(Uint32 * & insertPtr, 
  315.      class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
  316.   abort();
  317. }
  318. /**
  319.  * Note that this function need no locking since its
  320.  * only called from the constructor of Ndb (the NdbObject)
  321.  * 
  322.  * Which is protected by a mutex
  323.  */
  324. int
  325. TransporterFacade::start_instance(int nodeId, 
  326.   const ndb_mgm_configuration* props)
  327. {
  328.   if (! theFacadeInstance->init(nodeId, props)) {
  329.     return -1;
  330.   }
  331.   
  332.   /**
  333.    * Install signal handler for SIGPIPE
  334.    *
  335.    * This due to the fact that a socket connection might have
  336.    * been closed in between a select and a corresponding send
  337.    */
  338. #if !defined NDB_OSE && !defined NDB_SOFTOSE && !defined NDB_WIN32
  339.   signal(SIGPIPE, SIG_IGN);
  340. #endif
  341.   return 0;
  342. }
  343. /**
  344.  * Note that this function need no locking since its
  345.  * only called from the destructor of Ndb (the NdbObject)
  346.  * 
  347.  * Which is protected by a mutex
  348.  */
  349. void
  350. TransporterFacade::stop_instance(){
  351.   DBUG_ENTER("TransporterFacade::stop_instance");
  352.   if(theFacadeInstance)
  353.     theFacadeInstance->doStop();
  354.   DBUG_VOID_RETURN;
  355. }
  356. void
  357. TransporterFacade::doStop(){
  358.   DBUG_ENTER("TransporterFacade::doStop");
  359.   /**
  360.    * First stop the ClusterMgr because it needs to send one more signal
  361.    * and also uses theFacadeInstance to lock/unlock theMutexPtr
  362.    */
  363.   if (theClusterMgr != NULL) theClusterMgr->doStop();
  364.   if (theArbitMgr != NULL) theArbitMgr->doStop(NULL);
  365.   
  366.   /**
  367.    * Now stop the send and receive threads
  368.    */
  369.   void *status;
  370.   theStopReceive = 1;
  371.   if (theReceiveThread) {
  372.     NdbThread_WaitFor(theReceiveThread, &status);
  373.     NdbThread_Destroy(&theReceiveThread);
  374.   }
  375.   if (theSendThread) {
  376.     NdbThread_WaitFor(theSendThread, &status);
  377.     NdbThread_Destroy(&theSendThread);
  378.   }
  379.   DBUG_VOID_RETURN;
  380. }
  381. extern "C" 
  382. void* 
  383. runSendRequest_C(void * me)
  384. {
  385.   ((TransporterFacade*) me)->threadMainSend();
  386.   return 0;
  387. }
  388. void TransporterFacade::threadMainSend(void)
  389. {
  390.   theTransporterRegistry->startSending();
  391.   if (!theTransporterRegistry->start_clients()){
  392.     ndbout_c("Unable to start theTransporterRegistry->start_clients");
  393.     exit(0);
  394.   }
  395.   m_socket_server.startServer();
  396.   while(!theStopReceive) {
  397.     NdbSleep_MilliSleep(10);
  398.     NdbMutex_Lock(theMutexPtr);
  399.     if (sendPerformedLastInterval == 0) {
  400.       theTransporterRegistry->performSend();
  401.     }
  402.     sendPerformedLastInterval = 0;
  403.     NdbMutex_Unlock(theMutexPtr);
  404.   }
  405.   theTransporterRegistry->stopSending();
  406.   m_socket_server.stopServer();
  407.   m_socket_server.stopSessions(true);
  408.   theTransporterRegistry->stop_clients();
  409. }
  410. extern "C" 
  411. void* 
  412. runReceiveResponse_C(void * me)
  413. {
  414.   ((TransporterFacade*) me)->threadMainReceive();
  415.   return 0;
  416. }
  417. void TransporterFacade::threadMainReceive(void)
  418. {
  419.   theTransporterRegistry->startReceiving();
  420.   NdbMutex_Lock(theMutexPtr);
  421.   theTransporterRegistry->update_connections();
  422.   NdbMutex_Unlock(theMutexPtr);
  423.   while(!theStopReceive) {
  424.     for(int i = 0; i<10; i++){
  425.       const int res = theTransporterRegistry->pollReceive(10);
  426.       if(res > 0){
  427.         NdbMutex_Lock(theMutexPtr);
  428.         theTransporterRegistry->performReceive();
  429.         NdbMutex_Unlock(theMutexPtr);
  430.       }
  431.     }
  432.     NdbMutex_Lock(theMutexPtr);
  433.     theTransporterRegistry->update_connections();
  434.     NdbMutex_Unlock(theMutexPtr);
  435.   }//while
  436.   theTransporterRegistry->stopReceiving();
  437. }
  438. TransporterFacade::TransporterFacade() :
  439.   theTransporterRegistry(0),
  440.   theStopReceive(0),
  441.   theSendThread(NULL),
  442.   theReceiveThread(NULL),
  443.   m_fragmented_signal_id(0)
  444. {
  445.   DBUG_ENTER("TransporterFacade::TransporterFacade");
  446.   theOwnId = 0;
  447.   theMutexPtr = NdbMutex_Create();
  448.   sendPerformedLastInterval = 0;
  449.   checkCounter = 4;
  450.   currentSendLimit = 1;
  451.   theClusterMgr = NULL;
  452.   theArbitMgr = NULL;
  453.   theStartNodeId = 1;
  454.   m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
  455.   m_batch_byte_size= SCAN_BATCH_SIZE;
  456.   m_batch_size= DEF_BATCH_SIZE;
  457.   m_max_trans_id = 0;
  458.   theClusterMgr = new ClusterMgr(* this);
  459.   DBUG_VOID_RETURN;
  460. }
  461. bool
  462. TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
  463. {
  464.   DBUG_ENTER("TransporterFacade::init");
  465.   theOwnId = nodeId;
  466.   theTransporterRegistry = new TransporterRegistry(this);
  467.   const int res = IPCConfig::configureTransporters(nodeId, 
  468.    * props, 
  469.    * theTransporterRegistry);
  470.   if(res <= 0){
  471.     TRP_DEBUG( "configureTransporters returned 0 or less" );
  472.     DBUG_RETURN(false);
  473.   }
  474.   
  475.   ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
  476.   iter.first();
  477.   theClusterMgr->init(iter);
  478.   
  479.   /**
  480.    * Unless there is a "Name", the initiated transporter is within 
  481.    * an NDB Cluster.  (If "Name" is defined, then the transporter
  482.    * is used to connect to a different system, i.e. NDB Cluster.)
  483.    */
  484. #if 0  
  485.   if (!props->contains("Name")) {
  486. #endif
  487.     iter.first();
  488.     if(iter.find(CFG_NODE_ID, nodeId)){
  489.       TRP_DEBUG( "Node info missing from config." );
  490.       DBUG_RETURN(false);
  491.     }
  492.     
  493.     Uint32 rank = 0;
  494.     if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
  495.       theArbitMgr = new ArbitMgr(* this);
  496.       theArbitMgr->setRank(rank);
  497.       Uint32 delay = 0;
  498.       iter.get(CFG_NODE_ARBIT_DELAY, &delay);
  499.       theArbitMgr->setDelay(delay);
  500.     }
  501.     Uint32 scan_batch_size= 0;
  502.     if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
  503.       m_scan_batch_size= scan_batch_size;
  504.     }
  505.     Uint32 batch_byte_size= 0;
  506.     if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
  507.       m_batch_byte_size= batch_byte_size;
  508.     }
  509.     Uint32 batch_size= 0;
  510.     if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
  511.       m_batch_size= batch_size;
  512.     }
  513. #if 0
  514.   }
  515. #endif
  516.   
  517.   if (!theTransporterRegistry->start_service(m_socket_server)){
  518.     ndbout_c("Unable to start theTransporterRegistry->start_service");
  519.     DBUG_RETURN(false);
  520.   }
  521.   theReceiveThread = NdbThread_Create(runReceiveResponse_C,
  522.                                       (void**)this,
  523.                                       32768,
  524.                                       "ndb_receive",
  525.                                       NDB_THREAD_PRIO_LOW);
  526.   theSendThread = NdbThread_Create(runSendRequest_C,
  527.                                    (void**)this,
  528.                                    32768,
  529.                                    "ndb_send",
  530.                                    NDB_THREAD_PRIO_LOW);
  531.   theClusterMgr->startThread();
  532.   
  533. #ifdef API_TRACE
  534.   signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
  535. #endif
  536.   
  537.   DBUG_RETURN(true);
  538. }
  539. void
  540. TransporterFacade::connected()
  541. {
  542.   DBUG_ENTER("TransporterFacade::connected");
  543.   Uint32 sz = m_threads.m_statusNext.size();
  544.   for (Uint32 i = 0; i < sz ; i ++) {
  545.     if (m_threads.getInUse(i)){
  546.       void * obj = m_threads.m_objectExecute[i].m_object;
  547.       NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
  548.       (*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
  549.     }
  550.   }
  551.   DBUG_VOID_RETURN;
  552. }
  553. void
  554. TransporterFacade::ReportNodeDead(NodeId tNodeId)
  555. {
  556.   /**
  557.    * When a node fails we must report this to each Ndb object. 
  558.    * The function that is used for communicating node failures is called.
  559.    * This is to ensure that the Ndb objects do not think their connections 
  560.    * are correct after a failure followed by a restart. 
  561.    * After the restart the node is up again and the Ndb object 
  562.    * might not have noticed the failure.
  563.    */
  564.   Uint32 sz = m_threads.m_statusNext.size();
  565.   for (Uint32 i = 0; i < sz ; i ++) {
  566.     if (m_threads.getInUse(i)){
  567.       void * obj = m_threads.m_objectExecute[i].m_object;
  568.       NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
  569.       (*RegPC) (obj, tNodeId, false, false);
  570.     }
  571.   }
  572. }
  573. void
  574. TransporterFacade::ReportNodeFailureComplete(NodeId tNodeId)
  575. {
  576.   /**
  577.    * When a node fails we must report this to each Ndb object. 
  578.    * The function that is used for communicating node failures is called.
  579.    * This is to ensure that the Ndb objects do not think their connections 
  580.    * are correct after a failure followed by a restart. 
  581.    * After the restart the node is up again and the Ndb object 
  582.    * might not have noticed the failure.
  583.    */
  584.   DBUG_ENTER("TransporterFacade::ReportNodeFailureComplete");
  585.   DBUG_PRINT("enter",("nodeid= %d", tNodeId));
  586.   Uint32 sz = m_threads.m_statusNext.size();
  587.   for (Uint32 i = 0; i < sz ; i ++) {
  588.     if (m_threads.getInUse(i)){
  589.       void * obj = m_threads.m_objectExecute[i].m_object;
  590.       NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
  591.       (*RegPC) (obj, tNodeId, false, true);
  592.     }
  593.   }
  594.   DBUG_VOID_RETURN;
  595. }
  596. void
  597. TransporterFacade::ReportNodeAlive(NodeId tNodeId)
  598. {
  599.   /**
  600.    * When a node fails we must report this to each Ndb object. 
  601.    * The function that is used for communicating node failures is called.
  602.    * This is to ensure that the Ndb objects do not think there connections 
  603.    * are correct after a failure
  604.    * followed by a restart. 
  605.    * After the restart the node is up again and the Ndb object 
  606.    * might not have noticed the failure.
  607.    */
  608.   Uint32 sz = m_threads.m_statusNext.size();
  609.   for (Uint32 i = 0; i < sz ; i ++) {
  610.     if (m_threads.getInUse(i)){
  611.       void * obj = m_threads.m_objectExecute[i].m_object;
  612.       NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
  613.       (*RegPC) (obj, tNodeId, true, false);
  614.     }
  615.   }
  616. }
  617. int 
  618. TransporterFacade::close(BlockNumber blockNumber, Uint64 trans_id)
  619. {
  620.   NdbMutex_Lock(theMutexPtr);
  621.   Uint32 low_bits = (Uint32)trans_id;
  622.   m_max_trans_id = m_max_trans_id > low_bits ? m_max_trans_id : low_bits;
  623.   close_local(blockNumber);
  624.   NdbMutex_Unlock(theMutexPtr);
  625.   return 0;
  626. }
  627. int 
  628. TransporterFacade::close_local(BlockNumber blockNumber){
  629.   m_threads.close(blockNumber);
  630.   return 0;
  631. }
  632. int
  633. TransporterFacade::open(void* objRef, 
  634.                         ExecuteFunction fun, 
  635.                         NodeStatusFunction statusFun)
  636. {
  637.   DBUG_ENTER("TransporterFacade::open");
  638.   int r= m_threads.open(objRef, fun, statusFun);
  639.   if (r < 0)
  640.     DBUG_RETURN(r);
  641. #if 1
  642.   if (theOwnId > 0) {
  643.     (*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
  644.   }
  645. #endif
  646.   DBUG_RETURN(r);
  647. }
  648. TransporterFacade::~TransporterFacade()
  649. {  
  650.   DBUG_ENTER("TransporterFacade::~TransporterFacade");
  651.   NdbMutex_Lock(theMutexPtr);
  652.   delete theClusterMgr;  
  653.   delete theArbitMgr;
  654.   delete theTransporterRegistry;
  655.   NdbMutex_Unlock(theMutexPtr);
  656.   NdbMutex_Destroy(theMutexPtr);
  657. #ifdef API_TRACE
  658.   signalLogger.setOutputStream(0);
  659. #endif
  660.   DBUG_VOID_RETURN;
  661. }
  662. void 
  663. TransporterFacade::calculateSendLimit()
  664. {
  665.   Uint32 Ti;
  666.   Uint32 TthreadCount = 0;
  667.   
  668.   Uint32 sz = m_threads.m_statusNext.size();
  669.   for (Ti = 0; Ti < sz; Ti++) {
  670.     if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
  671.       TthreadCount++;
  672.       m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
  673.     }
  674.   }
  675.   currentSendLimit = TthreadCount;
  676.   if (currentSendLimit == 0) {
  677.     currentSendLimit = 1;
  678.   }
  679.   checkCounter = currentSendLimit << 2;
  680. }
  681. //-------------------------------------------------
  682. // Force sending but still report the sending to the
  683. // adaptive algorithm.
  684. //-------------------------------------------------
  685. void TransporterFacade::forceSend(Uint32 block_number) {
  686.   checkCounter--;
  687.   m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
  688.   sendPerformedLastInterval = 1;
  689.   if (checkCounter < 0) {
  690.     calculateSendLimit();
  691.   }
  692.   theTransporterRegistry->forceSendCheck(0);
  693. }
  694. //-------------------------------------------------
  695. // Improving API performance
  696. //-------------------------------------------------
  697. void
  698. TransporterFacade::checkForceSend(Uint32 block_number) {  
  699.   m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
  700.   //-------------------------------------------------
  701.   // This code is an adaptive algorithm to discover when
  702.   // the API should actually send its buffers. The reason
  703.   // is that the performance is highly dependent on the
  704.   // size of the writes over the communication network.
  705.   // Thus we try to ensure that the send size is as big
  706.   // as possible. At the same time we don't want response
  707.   // time to increase so therefore we have to keep track of
  708.   // how the users are performing adaptively.
  709.   //-------------------------------------------------
  710.   
  711.   if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) {
  712.     sendPerformedLastInterval = 1;
  713.   }
  714.   checkCounter--;
  715.   if (checkCounter < 0) {
  716.     calculateSendLimit();
  717.   }
  718. }
  719. /******************************************************************************
  720.  * SEND SIGNAL METHODS
  721.  *****************************************************************************/
  722. int
  723. TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){
  724.   Uint32* tDataPtr = aSignal->getDataPtrSend();
  725.   Uint32 Tlen = aSignal->theLength;
  726.   Uint32 TBno = aSignal->theReceiversBlockNumber;
  727.   if(getIsNodeSendable(aNode) == true){
  728. #ifdef API_TRACE
  729.     if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
  730.       Uint32 tmp = aSignal->theSendersBlockRef;
  731.       aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
  732.       LinearSectionPtr ptr[3];
  733.       signalLogger.sendSignal(* aSignal,
  734.       1,
  735.       aSignal->getDataPtr(),
  736.       aNode, ptr, 0);
  737.       signalLogger.flushSignalLog();
  738.       aSignal->theSendersBlockRef = tmp;
  739.     }
  740. #endif
  741.     if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
  742.       SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 
  743.   1, // JBB
  744.   tDataPtr, 
  745.   aNode, 
  746.   0);
  747.       //if (ss != SEND_OK) ndbout << ss << endl;
  748.       return (ss == SEND_OK ? 0 : -1);
  749.     } else {
  750.       ndbout << "ERR: SigLen = " << Tlen << " BlockRec = " << TBno;
  751.       ndbout << " SignalNo = " << aSignal->theVerId_signalNumber << endl;
  752.       assert(0);
  753.     }//if
  754.   }
  755.   //const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(aNode);
  756.   //const Uint32 startLevel = node.m_state.startLevel;
  757.   return -1; // Node Dead
  758. }
  759. int
  760. TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
  761. #ifdef API_TRACE
  762.   if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
  763.     Uint32 tmp = aSignal->theSendersBlockRef;
  764.     aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
  765.     LinearSectionPtr ptr[3];
  766.     signalLogger.sendSignal(* aSignal,
  767.     0,
  768.     aSignal->getDataPtr(),
  769.     aNode, ptr, 0);
  770.     signalLogger.flushSignalLog();
  771.     aSignal->theSendersBlockRef = tmp;
  772.   }
  773. #endif
  774.   assert((aSignal->theLength != 0) &&
  775.          (aSignal->theLength <= 25) &&
  776.          (aSignal->theReceiversBlockNumber != 0));
  777.   SendStatus ss = theTransporterRegistry->prepareSend(aSignal, 
  778.       0, 
  779.       aSignal->getDataPtr(), 
  780.       aNode, 
  781.       0);
  782.   
  783.   return (ss == SEND_OK ? 0 : -1);
  784. }
  785. #define CHUNK_SZ NDB_SECTION_SEGMENT_SZ*64 // related to MAX_MESSAGE_SIZE
  786. int
  787. TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode, 
  788. LinearSectionPtr ptr[3], Uint32 secs)
  789. {
  790.   if(getIsNodeSendable(aNode) != true)
  791.     return -1;
  792. #ifdef API_TRACE
  793.   if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
  794.     Uint32 tmp = aSignal->theSendersBlockRef;
  795.     aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
  796.     signalLogger.sendSignal(* aSignal,
  797.     1,
  798.     aSignal->getDataPtrSend(),
  799.     aNode,
  800.     ptr, secs);
  801.     aSignal->theSendersBlockRef = tmp;
  802.   }
  803. #endif
  804.   NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
  805.   LinearSectionPtr tmp_ptr[3];
  806.   Uint32 unique_id= m_fragmented_signal_id++; // next unique id
  807.   unsigned i;
  808.   for (i= 0; i < secs; i++)
  809.     tmp_ptr[i]= ptr[i];
  810.   unsigned start_i= 0;
  811.   unsigned chunk_sz= 0;
  812.   unsigned fragment_info= 0;
  813.   Uint32 *tmp_data= tmp_signal.getDataPtrSend();
  814.   for (i= 0; i < secs;) {
  815.     unsigned save_sz= tmp_ptr[i].sz;
  816.     tmp_data[i-start_i]= i;
  817.     if (chunk_sz + save_sz > CHUNK_SZ) {
  818.       // truncate
  819.       unsigned send_sz= CHUNK_SZ - chunk_sz;
  820.       if (i != start_i) // first piece of a new section has to be a multiple of NDB_SECTION_SEGMENT_SZ
  821.       {
  822. send_sz=
  823.   NDB_SECTION_SEGMENT_SZ
  824.   *(send_sz+NDB_SECTION_SEGMENT_SZ-1)
  825.   /NDB_SECTION_SEGMENT_SZ;
  826. if (send_sz > save_sz)
  827.   send_sz= save_sz;
  828.       }
  829.       tmp_ptr[i].sz= send_sz;
  830.       
  831.       if (fragment_info < 2) // 1 = first fragment, 2 = middle fragments
  832. fragment_info++;
  833.       // send tmp_signal
  834.       tmp_data[i-start_i+1]= unique_id;
  835.       tmp_signal.setLength(i-start_i+2);
  836.       tmp_signal.m_fragmentInfo= fragment_info;
  837.       tmp_signal.m_noOfSections= i-start_i+1;
  838.       // do prepare send
  839.       {
  840. SendStatus ss = theTransporterRegistry->prepareSend
  841.   (&tmp_signal, 
  842.    1, /*JBB*/
  843.    tmp_data,
  844.    aNode, 
  845.    &tmp_ptr[start_i]);
  846. assert(ss != SEND_MESSAGE_TOO_BIG);
  847. if (ss != SEND_OK) return -1;
  848.       }
  849.       // setup variables for next signal
  850.       start_i= i;
  851.       chunk_sz= 0;
  852.       tmp_ptr[i].sz= save_sz-send_sz;
  853.       tmp_ptr[i].p+= send_sz;
  854.       if (tmp_ptr[i].sz == 0)
  855. i++;
  856.     }
  857.     else
  858.     {
  859.       chunk_sz+=save_sz;
  860.       i++;
  861.     }
  862.   }
  863.   unsigned a_sz= aSignal->getLength();
  864.   if (fragment_info > 0) {
  865.     // update the original signal to include section info
  866.     Uint32 *a_data= aSignal->getDataPtrSend();
  867.     unsigned tmp_sz= i-start_i;
  868.     memcpy(a_data+a_sz,
  869.    tmp_data,
  870.    tmp_sz*sizeof(Uint32));
  871.     a_data[a_sz+tmp_sz]= unique_id;
  872.     aSignal->setLength(a_sz+tmp_sz+1);
  873.     // send last fragment
  874.     aSignal->m_fragmentInfo= 3; // 3 = last fragment
  875.     aSignal->m_noOfSections= i-start_i;
  876.   } else {
  877.     aSignal->m_noOfSections= secs;
  878.   }
  879.   // send aSignal
  880.   int ret;
  881.   {
  882.     SendStatus ss = theTransporterRegistry->prepareSend
  883.       (aSignal,
  884.        1/*JBB*/,
  885.        aSignal->getDataPtrSend(),
  886.        aNode,
  887.        &tmp_ptr[start_i]);
  888.     assert(ss != SEND_MESSAGE_TOO_BIG);
  889.     ret = (ss == SEND_OK ? 0 : -1);
  890.   }
  891.   aSignal->m_noOfSections = 0;
  892.   aSignal->m_fragmentInfo = 0;
  893.   aSignal->setLength(a_sz);
  894.   return ret;
  895. }
  896. int
  897. TransporterFacade::sendSignal(NdbApiSignal* aSignal, NodeId aNode, 
  898.       LinearSectionPtr ptr[3], Uint32 secs){
  899.   aSignal->m_noOfSections = secs;
  900.   if(getIsNodeSendable(aNode) == true){
  901. #ifdef API_TRACE
  902.     if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
  903.       Uint32 tmp = aSignal->theSendersBlockRef;
  904.       aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
  905.       signalLogger.sendSignal(* aSignal,
  906.       1,
  907.       aSignal->getDataPtrSend(),
  908.       aNode,
  909.                               ptr, secs);
  910.       signalLogger.flushSignalLog();
  911.       aSignal->theSendersBlockRef = tmp;
  912.     }
  913. #endif
  914.     SendStatus ss = theTransporterRegistry->prepareSend
  915.       (aSignal, 
  916.        1, // JBB
  917.        aSignal->getDataPtrSend(),
  918.        aNode, 
  919.        ptr);
  920.     assert(ss != SEND_MESSAGE_TOO_BIG);
  921.     aSignal->m_noOfSections = 0;
  922.     return (ss == SEND_OK ? 0 : -1);
  923.   }
  924.   aSignal->m_noOfSections = 0;
  925.   return -1;
  926. }
  927. /******************************************************************************
  928.  * CONNECTION METHODS  Etc
  929.  ******************************************************************************/
  930. void
  931. TransporterFacade::doConnect(int aNodeId){
  932.   theTransporterRegistry->setIOState(aNodeId, NoHalt);
  933.   theTransporterRegistry->do_connect(aNodeId);
  934. }
  935. void
  936. TransporterFacade::doDisconnect(int aNodeId)
  937. {
  938.   theTransporterRegistry->do_disconnect(aNodeId);
  939. }
  940. void
  941. TransporterFacade::reportConnected(int aNodeId)
  942. {
  943.   theClusterMgr->reportConnected(aNodeId);
  944.   return;
  945. }
  946. void
  947. TransporterFacade::reportDisconnected(int aNodeId)
  948. {
  949.   theClusterMgr->reportDisconnected(aNodeId);
  950.   return;
  951. }
  952. NodeId
  953. TransporterFacade::ownId() const
  954. {
  955.   return theOwnId;
  956. }
  957. bool
  958. TransporterFacade::isConnected(NodeId aNodeId){
  959.   return theTransporterRegistry->is_connected(aNodeId);
  960. }
  961. NodeId
  962. TransporterFacade::get_an_alive_node()
  963. {
  964.   DBUG_ENTER("TransporterFacade::get_an_alive_node");
  965.   DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));
  966. #ifdef VM_TRACE
  967.   const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0);
  968.   if (p != 0 && *p != 0)
  969.     return atoi(p);
  970. #endif
  971.   NodeId i;
  972.   for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
  973.     if (get_node_alive(i)){
  974.       DBUG_PRINT("info", ("Node %d is alive", i));
  975.       theStartNodeId = ((i + 1) % MAX_NDB_NODES);
  976.       DBUG_RETURN(i);
  977.     }
  978.   }
  979.   for (i = 1; i < theStartNodeId; i++) {
  980.     if (get_node_alive(i)){
  981.       DBUG_PRINT("info", ("Node %d is alive", i));
  982.       theStartNodeId = ((i + 1) % MAX_NDB_NODES);
  983.       DBUG_RETURN(i);
  984.     }
  985.   }
  986.   DBUG_RETURN((NodeId)0);
  987. }
  988. TransporterFacade::ThreadData::ThreadData(Uint32 size){
  989.   m_firstFree = END_OF_LIST;
  990.   expand(size);
  991. }
  992. void
  993. TransporterFacade::ThreadData::expand(Uint32 size){
  994.   Object_Execute oe = { 0 ,0 };
  995.   NodeStatusFunction fun = 0;
  996.   const Uint32 sz = m_statusNext.size();
  997.   m_objectExecute.fill(sz + size, oe);
  998.   m_statusFunction.fill(sz + size, fun);
  999.   for(Uint32 i = 0; i<size; i++){
  1000.     m_statusNext.push_back(sz + i + 1);
  1001.   }
  1002.   m_statusNext.back() = m_firstFree;
  1003.   m_firstFree = m_statusNext.size() - size;
  1004. }
  1005. int
  1006. TransporterFacade::ThreadData::open(void* objRef, 
  1007.     ExecuteFunction fun, 
  1008.     NodeStatusFunction fun2)
  1009. {
  1010.   Uint32 nextFree = m_firstFree;
  1011.   if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
  1012.     return -1;
  1013.   }
  1014.   
  1015.   if(nextFree == END_OF_LIST){
  1016.     expand(10);
  1017.     nextFree = m_firstFree;
  1018.   }
  1019.   
  1020.   m_firstFree = m_statusNext[nextFree];
  1021.   
  1022.   Object_Execute oe = { objRef , fun };
  1023.   m_statusNext[nextFree] = INACTIVE;
  1024.   m_objectExecute[nextFree] = oe;
  1025.   m_statusFunction[nextFree] = fun2;
  1026.   return indexToNumber(nextFree);
  1027. }
  1028. int
  1029. TransporterFacade::ThreadData::close(int number){
  1030.   number= numberToIndex(number);
  1031.   assert(getInUse(number));
  1032.   m_statusNext[number] = m_firstFree;
  1033.   m_firstFree = number;
  1034.   Object_Execute oe = { 0, 0 };
  1035.   m_objectExecute[number] = oe;
  1036.   m_statusFunction[number] = 0;
  1037.   return 0;
  1038. }
  1039. template class Vector<NodeStatusFunction>;
  1040. template class Vector<TransporterFacade::ThreadData::Object_Execute>;