TransporterRegistry.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:35k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <my_pthread.h>
  15. #include <TransporterRegistry.hpp>
  16. #include "TransporterInternalDefinitions.hpp"
  17. #include "Transporter.hpp"
  18. #include <SocketAuthenticator.hpp>
  19. #ifdef NDB_TCP_TRANSPORTER
  20. #include "TCP_Transporter.hpp"
  21. #endif
  22. #ifdef NDB_OSE_TRANSPORTER
  23. #include "OSE_Receiver.hpp"
  24. #include "OSE_Transporter.hpp"
  25. #endif
  26. #ifdef NDB_SCI_TRANSPORTER
  27. #include "SCI_Transporter.hpp"
  28. #endif
  29. #ifdef NDB_SHM_TRANSPORTER
  30. #include "SHM_Transporter.hpp"
  31. extern int g_ndb_shm_signum;
  32. #endif
  33. #include "TransporterCallback.hpp"
  34. #include "NdbOut.hpp"
  35. #include <NdbSleep.h>
  36. #include <NdbTick.h>
  37. #include <InputStream.hpp>
  38. #include <OutputStream.hpp>
  39. #include <EventLogger.hpp>
  40. extern EventLogger g_eventLogger;
  41. struct in_addr
  42. TransporterRegistry::get_connect_address(NodeId node_id) const
  43. {
  44.   return theTransporters[node_id]->m_connect_address;
  45. }
  46. SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
  47. {
  48.   DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
  49.   if (m_auth && !m_auth->server_authenticate(sockfd)){
  50.     NDB_CLOSE_SOCKET(sockfd);
  51.     DBUG_RETURN(0);
  52.   }
  53.   if (!m_transporter_registry->connect_server(sockfd))
  54.   {
  55.     NDB_CLOSE_SOCKET(sockfd);
  56.     DBUG_RETURN(0);
  57.   }
  58.   DBUG_RETURN(0);
  59. }
  60. TransporterRegistry::TransporterRegistry(void * callback,
  61.  unsigned _maxTransporters,
  62.  unsigned sizeOfLongSignalMemory)
  63. {
  64.   DBUG_ENTER("TransporterRegistry::TransporterRegistry");
  65.   nodeIdSpecified = false;
  66.   maxTransporters = _maxTransporters;
  67.   sendCounter = 1;
  68.   
  69.   callbackObj=callback;
  70.   theTCPTransporters  = new TCP_Transporter * [maxTransporters];
  71.   theSCITransporters  = new SCI_Transporter * [maxTransporters];
  72.   theSHMTransporters  = new SHM_Transporter * [maxTransporters];
  73.   theOSETransporters  = new OSE_Transporter * [maxTransporters];
  74.   theTransporterTypes = new TransporterType   [maxTransporters];
  75.   theTransporters     = new Transporter     * [maxTransporters];
  76.   performStates       = new PerformState      [maxTransporters];
  77.   ioStates            = new IOState           [maxTransporters]; 
  78.   
  79.   // Initialize member variables
  80.   nTransporters    = 0;
  81.   nTCPTransporters = 0;
  82.   nSCITransporters = 0;
  83.   nSHMTransporters = 0;
  84.   nOSETransporters = 0;
  85.   
  86.   // Initialize the transporter arrays
  87.   for (unsigned i=0; i<maxTransporters; i++) {
  88.     theTCPTransporters[i] = NULL;
  89.     theSCITransporters[i] = NULL;
  90.     theSHMTransporters[i] = NULL;
  91.     theOSETransporters[i] = NULL;
  92.     theTransporters[i]    = NULL;
  93.     performStates[i]      = DISCONNECTED;
  94.     ioStates[i]           = NoHalt;
  95.   }
  96.   theOSEReceiver = 0;
  97.   theOSEJunkSocketSend = 0;
  98.   theOSEJunkSocketRecv = 0;
  99.   DBUG_VOID_RETURN;
  100. }
  101. TransporterRegistry::~TransporterRegistry()
  102. {
  103.   DBUG_ENTER("TransporterRegistry::~TransporterRegistry");
  104.   
  105.   removeAll();
  106.   
  107.   delete[] theTCPTransporters;
  108.   delete[] theSCITransporters;
  109.   delete[] theSHMTransporters;
  110.   delete[] theOSETransporters;
  111.   delete[] theTransporterTypes;
  112.   delete[] theTransporters;
  113.   delete[] performStates;
  114.   delete[] ioStates;
  115. #ifdef NDB_OSE_TRANSPORTER
  116.   if(theOSEReceiver != NULL){
  117.     theOSEReceiver->destroyPhantom();
  118.     delete theOSEReceiver;
  119.     theOSEReceiver = 0;
  120.   }
  121. #endif
  122.   DBUG_VOID_RETURN;
  123. }
  124. void
  125. TransporterRegistry::removeAll(){
  126.   for(unsigned i = 0; i<maxTransporters; i++){
  127.     if(theTransporters[i] != NULL)
  128.       removeTransporter(theTransporters[i]->getRemoteNodeId());
  129.   }
  130. }
  131. void
  132. TransporterRegistry::disconnectAll(){
  133.   for(unsigned i = 0; i<maxTransporters; i++){
  134.     if(theTransporters[i] != NULL)
  135.       theTransporters[i]->doDisconnect();
  136.   }
  137. }
  138. bool
  139. TransporterRegistry::init(NodeId nodeId) {
  140.   DBUG_ENTER("TransporterRegistry::init");
  141.   nodeIdSpecified = true;
  142.   localNodeId = nodeId;
  143.   
  144.   DEBUG("TransporterRegistry started node: " << localNodeId);
  145.   
  146.   DBUG_RETURN(true);
  147. }
  148. bool
  149. TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
  150. {
  151.   DBUG_ENTER("TransporterRegistry::connect_server");
  152.   // read node id from client
  153.   // read transporter type
  154.   int nodeId, remote_transporter_type= -1;
  155.   SocketInputStream s_input(sockfd);
  156.   char buf[256];
  157.   if (s_input.gets(buf, 256) == 0) {
  158.     DBUG_PRINT("error", ("Could not get node id from client"));
  159.     DBUG_RETURN(false);
  160.   }
  161.   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
  162.   switch (r) {
  163.   case 2:
  164.     break;
  165.   case 1:
  166.     // we're running version prior to 4.1.9
  167.     // ok, but with no checks on transporter configuration compatability
  168.     break;
  169.   default:
  170.     DBUG_PRINT("error", ("Error in node id from client"));
  171.     DBUG_RETURN(false);
  172.   }
  173.   DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
  174.       nodeId,remote_transporter_type));
  175.   //check that nodeid is valid and that there is an allocated transporter
  176.   if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
  177.     DBUG_PRINT("error", ("Node id out of range from client"));
  178.     DBUG_RETURN(false);
  179.   }
  180.   if (theTransporters[nodeId] == 0) {
  181.       DBUG_PRINT("error", ("No transporter for this node id from client"));
  182.       DBUG_RETURN(false);
  183.   }
  184.   //check that the transporter should be connected
  185.   if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
  186.     DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
  187.     DBUG_RETURN(false);
  188.   }
  189.   Transporter *t= theTransporters[nodeId];
  190.   // send info about own id (just as response to acknowledge connection)
  191.   // send info on own transporter type
  192.   SocketOutputStream s_output(sockfd);
  193.   s_output.println("%d %d", t->getLocalNodeId(), t->m_type);
  194.   if (remote_transporter_type != -1)
  195.   {
  196.     if (remote_transporter_type != t->m_type)
  197.     {
  198.       DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
  199.    t->m_type, remote_transporter_type));
  200.       g_eventLogger.error("Incompatible configuration: Transporter type "
  201.   "mismatch with node %d", nodeId);
  202.       // wait for socket close for 1 second to let message arrive at client
  203.       {
  204. fd_set a_set;
  205. FD_ZERO(&a_set);
  206. FD_SET(sockfd, &a_set);
  207. struct timeval timeout;
  208. timeout.tv_sec  = 1; timeout.tv_usec = 0;
  209. select(sockfd+1, &a_set, 0, 0, &timeout);
  210.       }
  211.       DBUG_RETURN(false);
  212.     }
  213.   }
  214.   else if (t->m_type == tt_SHM_TRANSPORTER)
  215.   {
  216.     g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
  217.   }
  218.   // setup transporter (transporter responsible for closing sockfd)
  219.   t->connect_server(sockfd);
  220.   DBUG_RETURN(true);
  221. }
  222. bool
  223. TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
  224. #ifdef NDB_TCP_TRANSPORTER
  225.   if(!nodeIdSpecified){
  226.     init(config->localNodeId);
  227.   }
  228.   
  229.   if(config->localNodeId != localNodeId) 
  230.     return false;
  231.   
  232.   if(theTransporters[config->remoteNodeId] != NULL)
  233.     return false;
  234.    
  235.   TCP_Transporter * t = new TCP_Transporter(*this,
  236.     config->sendBufferSize,
  237.     config->maxReceiveSize,
  238.     config->localHostName,
  239.     config->remoteHostName,
  240.     config->port,
  241.     localNodeId,
  242.     config->remoteNodeId,
  243.     config->checksum,
  244.     config->signalId);
  245.   if (t == NULL) 
  246.     return false;
  247.   else if (!t->initTransporter()) {
  248.     delete t;
  249.     return false;
  250.   }
  251.   // Put the transporter in the transporter arrays
  252.   theTCPTransporters[nTCPTransporters]      = t;
  253.   theTransporters[t->getRemoteNodeId()]     = t;
  254.   theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
  255.   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
  256.   nTransporters++;
  257.   nTCPTransporters++;
  258. #if defined NDB_OSE || defined NDB_SOFTOSE
  259.   t->theReceiverPid = theReceiverPid;
  260. #endif
  261.   
  262.   return true;
  263. #else
  264.   return false;
  265. #endif
  266. }
  267. bool
  268. TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
  269. #ifdef NDB_OSE_TRANSPORTER
  270.   if(!nodeIdSpecified){
  271.     init(conf->localNodeId);
  272.   }
  273.   
  274.   if(conf->localNodeId != localNodeId)
  275.     return false;
  276.   
  277.   if(theTransporters[conf->remoteNodeId] != NULL)
  278.     return false;
  279.   if(theOSEReceiver == NULL){
  280.     theOSEReceiver = new OSE_Receiver(this,
  281.       10,
  282.       localNodeId);
  283.   }
  284.   
  285.   OSE_Transporter * t = new OSE_Transporter(conf->prioASignalSize,
  286.     conf->prioBSignalSize,
  287.     localNodeId,
  288.     conf->localHostName,
  289.     conf->remoteNodeId,
  290.     conf->remoteHostName,
  291.     conf->checksum,
  292.     conf->signalId);
  293.   if (t == NULL)
  294.     return false;
  295.   else if (!t->initTransporter()) {
  296.     delete t;
  297.     return false;
  298.   }
  299.   // Put the transporter in the transporter arrays
  300.   theOSETransporters[nOSETransporters]      = t;
  301.   theTransporters[t->getRemoteNodeId()]     = t;
  302.   theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER;
  303.   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
  304.   
  305.   nTransporters++;
  306.   nOSETransporters++;
  307.   return true;
  308. #else
  309.   return false;
  310. #endif
  311. }
  312. bool
  313. TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
  314. #ifdef NDB_SCI_TRANSPORTER
  315.   if(!SCI_Transporter::initSCI())
  316.     abort();
  317.   
  318.   if(!nodeIdSpecified){
  319.     init(config->localNodeId);
  320.   }
  321.   
  322.   if(config->localNodeId != localNodeId)
  323.     return false;
  324.  
  325.   if(theTransporters[config->remoteNodeId] != NULL)
  326.     return false;
  327.  
  328.   SCI_Transporter * t = new SCI_Transporter(*this,
  329.                                             config->localHostName,
  330.                                             config->remoteHostName,
  331.                                             config->port,
  332.                                             config->sendLimit, 
  333.     config->bufferSize,
  334.     config->nLocalAdapters,
  335.     config->remoteSciNodeId0,
  336.     config->remoteSciNodeId1,
  337.     localNodeId,
  338.     config->remoteNodeId,
  339.     config->checksum,
  340.     config->signalId);
  341.   
  342.   if (t == NULL) 
  343.     return false;
  344.   else if (!t->initTransporter()) {
  345.     delete t;
  346.     return false;
  347.   }
  348.   // Put the transporter in the transporter arrays
  349.   theSCITransporters[nSCITransporters]      = t;
  350.   theTransporters[t->getRemoteNodeId()]     = t;
  351.   theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
  352.   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
  353.   nTransporters++;
  354.   nSCITransporters++;
  355.   
  356.   return true;
  357. #else
  358.   return false;
  359. #endif
  360. }
  361. bool
  362. TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
  363.   DBUG_ENTER("TransporterRegistry::createTransporter SHM");
  364. #ifdef NDB_SHM_TRANSPORTER
  365.   if(!nodeIdSpecified){
  366.     init(config->localNodeId);
  367.   }
  368.   
  369.   if(config->localNodeId != localNodeId)
  370.     return false;
  371.   
  372.   if (!g_ndb_shm_signum) {
  373.     g_ndb_shm_signum= config->signum;
  374.     DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
  375.     /**
  376.      * Make sure to block g_ndb_shm_signum
  377.      *   TransporterRegistry::init is run from "main" thread
  378.      */
  379.     sigset_t mask;
  380.     sigemptyset(&mask);
  381.     sigaddset(&mask, g_ndb_shm_signum);
  382.     pthread_sigmask(SIG_BLOCK, &mask, 0);
  383.   }
  384.   if(config->signum != g_ndb_shm_signum)
  385.     return false;
  386.   
  387.   if(theTransporters[config->remoteNodeId] != NULL)
  388.     return false;
  389.   SHM_Transporter * t = new SHM_Transporter(*this,
  390.     config->localHostName,
  391.     config->remoteHostName,
  392.     config->port,
  393.     localNodeId,
  394.     config->remoteNodeId,
  395.     config->checksum,
  396.     config->signalId,
  397.     config->shmKey,
  398.     config->shmSize
  399.     );
  400.   if (t == NULL)
  401.     return false;
  402.   else if (!t->initTransporter()) {
  403.     delete t;
  404.     return false;
  405.   }
  406.   // Put the transporter in the transporter arrays
  407.   theSHMTransporters[nSHMTransporters]      = t;
  408.   theTransporters[t->getRemoteNodeId()]     = t;
  409.   theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
  410.   performStates[t->getRemoteNodeId()]       = DISCONNECTED;
  411.   
  412.   nTransporters++;
  413.   nSHMTransporters++;
  414.   DBUG_RETURN(true);
  415. #else
  416.   DBUG_RETURN(false);
  417. #endif
  418. }
  419. void
  420. TransporterRegistry::removeTransporter(NodeId nodeId) {
  421.   DEBUG("Removing transporter from " << localNodeId
  422. << " to " << nodeId);
  423.   
  424.   if(theTransporters[nodeId] == NULL)
  425.     return;
  426.   
  427.   theTransporters[nodeId]->doDisconnect();
  428.   
  429.   const TransporterType type = theTransporterTypes[nodeId];
  430.   int ind = 0;
  431.   switch(type){
  432.   case tt_TCP_TRANSPORTER:
  433. #ifdef NDB_TCP_TRANSPORTER
  434.     for(; ind < nTCPTransporters; ind++)
  435.       if(theTCPTransporters[ind]->getRemoteNodeId() == nodeId)
  436. break;
  437.     ind++;
  438.     for(; ind<nTCPTransporters; ind++)
  439.       theTCPTransporters[ind-1] = theTCPTransporters[ind];
  440.     nTCPTransporters --;
  441. #endif
  442.     break;
  443.   case tt_SCI_TRANSPORTER:
  444. #ifdef NDB_SCI_TRANSPORTER
  445.     for(; ind < nSCITransporters; ind++)
  446.       if(theSCITransporters[ind]->getRemoteNodeId() == nodeId)
  447. break;
  448.     ind++;
  449.     for(; ind<nSCITransporters; ind++)
  450.       theSCITransporters[ind-1] = theSCITransporters[ind];
  451.     nSCITransporters --;
  452. #endif
  453.     break;
  454.   case tt_SHM_TRANSPORTER:
  455. #ifdef NDB_SHM_TRANSPORTER
  456.     for(; ind < nSHMTransporters; ind++)
  457.       if(theSHMTransporters[ind]->getRemoteNodeId() == nodeId)
  458. break;
  459.     ind++;
  460.     for(; ind<nSHMTransporters; ind++)
  461.       theSHMTransporters[ind-1] = theSHMTransporters[ind];
  462.     nSHMTransporters --;
  463. #endif
  464.     break;
  465.   case tt_OSE_TRANSPORTER:
  466. #ifdef NDB_OSE_TRANSPORTER
  467.     for(; ind < nOSETransporters; ind++)
  468.       if(theOSETransporters[ind]->getRemoteNodeId() == nodeId)
  469. break;
  470.     ind++;
  471.     for(; ind<nOSETransporters; ind++)
  472.       theOSETransporters[ind-1] = theOSETransporters[ind];
  473.     nOSETransporters --;
  474. #endif
  475.     break;
  476.   }
  477.   
  478.   nTransporters--;
  479.   // Delete the transporter and remove it from theTransporters array
  480.   delete theTransporters[nodeId];
  481.   theTransporters[nodeId] = NULL;        
  482. }
  483. Uint32
  484. TransporterRegistry::get_free_buffer(Uint32 node) const
  485. {
  486.   Transporter *t;
  487.   if(likely((t = theTransporters[node]) != 0))
  488.   {
  489.     return t->get_free_buffer();
  490.   }
  491.   return 0;
  492. }
  493. SendStatus
  494. TransporterRegistry::prepareSend(const SignalHeader * const signalHeader, 
  495.  Uint8 prio,
  496.  const Uint32 * const signalData,
  497.  NodeId nodeId, 
  498.  const LinearSectionPtr ptr[3]){
  499.   Transporter *t = theTransporters[nodeId];
  500.   if(t != NULL && 
  501.      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || 
  502.       ((signalHeader->theReceiversBlockNumber == 252) ||
  503.        (signalHeader->theReceiversBlockNumber == 4002)))) {
  504.  
  505.     if(t->isConnected()){
  506.       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
  507.       if(lenBytes <= MAX_MESSAGE_SIZE){
  508. Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
  509. if(insertPtr != 0){
  510.   t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
  511.   t->updateWritePtr(lenBytes, prio);
  512.   return SEND_OK;
  513. }
  514. int sleepTime = 2;
  515. /**
  516.  * @note: on linux/i386 the granularity is 10ms
  517.  *        so sleepTime = 2 generates a 10 ms sleep.
  518.  */
  519. for(int i = 0; i<50; i++){
  520.   if((nSHMTransporters+nSCITransporters) == 0)
  521.     NdbSleep_MilliSleep(sleepTime); 
  522.   insertPtr = t->getWritePtr(lenBytes, prio);
  523.   if(insertPtr != 0){
  524.     t->m_packer.pack(insertPtr, prio, signalHeader, signalData, ptr);
  525.     t->updateWritePtr(lenBytes, prio);
  526.     break;
  527.   }
  528. }
  529. if(insertPtr != 0){
  530.   /**
  531.    * Send buffer full, but resend works
  532.    */
  533.   reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
  534.   return SEND_OK;
  535. }
  536. WARNING("Signal to " << nodeId << " lost(buffer)");
  537. reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
  538. return SEND_BUFFER_FULL;
  539.       } else {
  540. return SEND_MESSAGE_TOO_BIG;
  541.       }
  542.     } else {
  543.       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
  544.       return SEND_DISCONNECTED;
  545.     }
  546.   } else {
  547.     DEBUG("Discarding message to block: " 
  548.   << signalHeader->theReceiversBlockNumber 
  549.   << " node: " << nodeId);
  550.     
  551.     if(t == NULL)
  552.       return SEND_UNKNOWN_NODE;
  553.     
  554.     return SEND_BLOCKED;
  555.   }
  556. }
  557. SendStatus
  558. TransporterRegistry::prepareSend(const SignalHeader * const signalHeader, 
  559.  Uint8 prio,
  560.  const Uint32 * const signalData,
  561.  NodeId nodeId, 
  562.  class SectionSegmentPool & thePool,
  563.  const SegmentedSectionPtr ptr[3]){
  564.   
  565.   Transporter *t = theTransporters[nodeId];
  566.   if(t != NULL && 
  567.      (((ioStates[nodeId] != HaltOutput) && (ioStates[nodeId] != HaltIO)) || 
  568.       ((signalHeader->theReceiversBlockNumber == 252)|| 
  569.        (signalHeader->theReceiversBlockNumber == 4002)))) {
  570.     
  571.     if(t->isConnected()){
  572.       Uint32 lenBytes = t->m_packer.getMessageLength(signalHeader, ptr);
  573.       if(lenBytes <= MAX_MESSAGE_SIZE){
  574. Uint32 * insertPtr = t->getWritePtr(lenBytes, prio);
  575. if(insertPtr != 0){
  576.   t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
  577.   t->updateWritePtr(lenBytes, prio);
  578.   return SEND_OK;
  579. }
  580. /**
  581.  * @note: on linux/i386 the granularity is 10ms
  582.  *        so sleepTime = 2 generates a 10 ms sleep.
  583.  */
  584. int sleepTime = 2;
  585. for(int i = 0; i<50; i++){
  586.   if((nSHMTransporters+nSCITransporters) == 0)
  587.     NdbSleep_MilliSleep(sleepTime); 
  588.   insertPtr = t->getWritePtr(lenBytes, prio);
  589.   if(insertPtr != 0){
  590.     t->m_packer.pack(insertPtr, prio, signalHeader, signalData, thePool, ptr);
  591.     t->updateWritePtr(lenBytes, prio);
  592.     break;
  593.   }
  594. }
  595. if(insertPtr != 0){
  596.   /**
  597.    * Send buffer full, but resend works
  598.    */
  599.   reportError(callbackObj, nodeId, TE_SEND_BUFFER_FULL);
  600.   return SEND_OK;
  601. }
  602. WARNING("Signal to " << nodeId << " lost(buffer)");
  603. reportError(callbackObj, nodeId, TE_SIGNAL_LOST_SEND_BUFFER_FULL);
  604. return SEND_BUFFER_FULL;
  605.       } else {
  606. return SEND_MESSAGE_TOO_BIG;
  607.       }
  608.     } else {
  609.       DEBUG("Signal to " << nodeId << " lost(disconnect) ");
  610.       return SEND_DISCONNECTED;
  611.     }
  612.   } else {
  613.     DEBUG("Discarding message to block: " 
  614.   << signalHeader->theReceiversBlockNumber 
  615.   << " node: " << nodeId);
  616.     
  617.     if(t == NULL)
  618.       return SEND_UNKNOWN_NODE;
  619.     
  620.     return SEND_BLOCKED;
  621.   }
  622. }
  623. void
  624. TransporterRegistry::external_IO(Uint32 timeOutMillis) {
  625.   //-----------------------------------------------------------
  626.   // Most of the time we will send the buffers here and then wait
  627.   // for new signals. Thus we start by sending without timeout
  628.   // followed by the receive part where we expect to sleep for
  629.   // a while.
  630.   //-----------------------------------------------------------
  631.   if(pollReceive(timeOutMillis)){
  632.     performReceive();
  633.   }
  634.   performSend();
  635. }
  636. Uint32
  637. TransporterRegistry::pollReceive(Uint32 timeOutMillis){
  638.   Uint32 retVal = 0;
  639. #ifdef NDB_OSE_TRANSPORTER
  640.   retVal |= poll_OSE(timeOutMillis);
  641.   retVal |= poll_TCP(0);
  642.   return retVal;
  643. #endif
  644.   
  645.   if((nSCITransporters) > 0)
  646.   {
  647.     timeOutMillis=0;
  648.   }
  649. #ifdef NDB_SHM_TRANSPORTER
  650.   if(nSHMTransporters > 0)
  651.   {
  652.     Uint32 res = poll_SHM(0);
  653.     if(res)
  654.     {
  655.       retVal |= res;
  656.       timeOutMillis = 0;
  657.     }
  658.   }
  659. #endif
  660. #ifdef NDB_TCP_TRANSPORTER
  661.   if(nTCPTransporters > 0 || retVal == 0)
  662.   {
  663.     retVal |= poll_TCP(timeOutMillis);
  664.   }
  665.   else
  666.     tcpReadSelectReply = 0;
  667. #endif
  668. #ifdef NDB_SCI_TRANSPORTER
  669.   if(nSCITransporters > 0)
  670.     retVal |= poll_SCI(timeOutMillis);
  671. #endif
  672. #ifdef NDB_SHM_TRANSPORTER
  673.   if(nSHMTransporters > 0 && retVal == 0)
  674.   {
  675.     int res = poll_SHM(0);
  676.     retVal |= res;
  677.   }
  678. #endif
  679.   return retVal;
  680. }
  681. #ifdef NDB_SCI_TRANSPORTER
  682. Uint32
  683. TransporterRegistry::poll_SCI(Uint32 timeOutMillis)
  684. {
  685.   for (int i=0; i<nSCITransporters; i++) {
  686.     SCI_Transporter * t = theSCITransporters[i];
  687.     if (t->isConnected()) {
  688.       if(t->hasDataToRead())
  689. return 1;
  690.     }
  691.   }
  692.   return 0;
  693. }
  694. #endif
  695. #ifdef NDB_SHM_TRANSPORTER
  696. static int g_shm_counter = 0;
  697. Uint32
  698. TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
  699. {  
  700.   for(int j=0; j < 100; j++)
  701.   {
  702.     for (int i=0; i<nSHMTransporters; i++) {
  703.       SHM_Transporter * t = theSHMTransporters[i];
  704.       if (t->isConnected()) {
  705. if(t->hasDataToRead()) {
  706.   return 1;
  707. }
  708.       }
  709.     }
  710.   }
  711.   return 0;
  712. }
  713. #endif
  714. #ifdef NDB_OSE_TRANSPORTER
  715. Uint32
  716. TransporterRegistry::poll_OSE(Uint32 timeOutMillis)
  717. {
  718.   if(theOSEReceiver != NULL){
  719.     return theOSEReceiver->doReceive(timeOutMillis);
  720.   }
  721.   NdbSleep_MilliSleep(timeOutMillis);
  722.   return 0;
  723. }
  724. #endif
  725. #ifdef NDB_TCP_TRANSPORTER
  726. Uint32 
  727. TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
  728. {
  729.   if (false && nTCPTransporters == 0)
  730.   {
  731.     tcpReadSelectReply = 0;
  732.     return 0;
  733.   }
  734.   
  735.   struct timeval timeout;
  736. #ifdef NDB_OSE
  737.   // Return directly if there are no TCP transporters configured
  738.   
  739.   if(timeOutMillis <= 1){
  740.     timeout.tv_sec  = 0;
  741.     timeout.tv_usec = 1025;
  742.   } else {
  743.     timeout.tv_sec  = timeOutMillis / 1000;
  744.     timeout.tv_usec = (timeOutMillis % 1000) * 1000;
  745.   }
  746. #else  
  747.   timeout.tv_sec  = timeOutMillis / 1000;
  748.   timeout.tv_usec = (timeOutMillis % 1000) * 1000;
  749. #endif
  750.   NDB_SOCKET_TYPE maxSocketValue = -1;
  751.   
  752.   // Needed for TCP/IP connections
  753.   // The read- and writeset are used by select
  754.   
  755.   FD_ZERO(&tcpReadset);
  756.   // Prepare for sending and receiving
  757.   for (int i = 0; i < nTCPTransporters; i++) {
  758.     TCP_Transporter * t = theTCPTransporters[i];
  759.     
  760.     // If the transporter is connected
  761.     if (t->isConnected()) {
  762.       
  763.       const NDB_SOCKET_TYPE socket = t->getSocket();
  764.       // Find the highest socket value. It will be used by select
  765.       if (socket > maxSocketValue)
  766. maxSocketValue = socket;
  767.       
  768.       // Put the connected transporters in the socket read-set 
  769.       FD_SET(socket, &tcpReadset);
  770.     }
  771.   }
  772.   
  773.   // The highest socket value plus one
  774.   maxSocketValue++; 
  775.   
  776.   tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);  
  777.   if(false && tcpReadSelectReply == -1 && errno == EINTR)
  778.     ndbout_c("woke-up by signal");
  779. #ifdef NDB_WIN32
  780.   if(tcpReadSelectReply == SOCKET_ERROR)
  781.   {
  782.     NdbSleep_MilliSleep(timeOutMillis);
  783.   }
  784. #endif
  785.   
  786.   return tcpReadSelectReply;
  787. }
  788. #endif
  789. void
  790. TransporterRegistry::performReceive()
  791. {
  792. #ifdef NDB_OSE_TRANSPORTER
  793.   if(theOSEReceiver != 0)
  794.   {
  795.     while(theOSEReceiver->hasData())
  796.     {
  797.       NodeId remoteNodeId;
  798.       Uint32 * readPtr;
  799.       Uint32 sz = theOSEReceiver->getReceiveData(&remoteNodeId, &readPtr);
  800.       Uint32 szUsed = unpack(readPtr,
  801.      sz,
  802.      remoteNodeId,
  803.      ioStates[remoteNodeId]);
  804. #ifdef DEBUG_TRANSPORTER
  805.       /**
  806.        * OSE transporter can handle executions of
  807.        *   half signals
  808.        */
  809.       assert(sz == szUsed);
  810. #endif
  811.       theOSEReceiver->updateReceiveDataPtr(szUsed);
  812.       theOSEReceiver->doReceive(0);
  813.       //      checkJobBuffer();
  814.     }
  815.   }
  816. #endif
  817. #ifdef NDB_TCP_TRANSPORTER
  818.   if(tcpReadSelectReply > 0)
  819.   {
  820.     for (int i=0; i<nTCPTransporters; i++) 
  821.     {
  822.       checkJobBuffer();
  823.       TCP_Transporter *t = theTCPTransporters[i];
  824.       const NodeId nodeId = t->getRemoteNodeId();
  825.       const NDB_SOCKET_TYPE socket    = t->getSocket();
  826.       if(is_connected(nodeId)){
  827. if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) 
  828. {
  829.   const int receiveSize = t->doReceive();
  830.   if(receiveSize > 0)
  831.   {
  832.     Uint32 * ptr;
  833.     Uint32 sz = t->getReceiveData(&ptr);
  834.     Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
  835.     t->updateReceiveDataPtr(szUsed);
  836.           }
  837. }
  838.       } 
  839.     }
  840.   }
  841. #endif
  842.   
  843. #ifdef NDB_SCI_TRANSPORTER
  844.   //performReceive
  845.   //do prepareReceive on the SCI transporters  (prepareReceive(t,,,,))
  846.   for (int i=0; i<nSCITransporters; i++) 
  847.   {
  848.     checkJobBuffer();
  849.     SCI_Transporter  *t = theSCITransporters[i];
  850.     const NodeId nodeId = t->getRemoteNodeId();
  851.     if(is_connected(nodeId))
  852.     {
  853.       if(t->isConnected() && t->checkConnected())
  854.       {
  855. Uint32 * readPtr, * eodPtr;
  856. t->getReceivePtr(&readPtr, &eodPtr);
  857. Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
  858. t->updateReceivePtr(newPtr);
  859.       }
  860.     } 
  861.   }
  862. #endif
  863. #ifdef NDB_SHM_TRANSPORTER
  864.   for (int i=0; i<nSHMTransporters; i++) 
  865.   {
  866.     checkJobBuffer();
  867.     SHM_Transporter *t = theSHMTransporters[i];
  868.     const NodeId nodeId = t->getRemoteNodeId();
  869.     if(is_connected(nodeId)){
  870.       if(t->isConnected() && t->checkConnected())
  871.       {
  872. Uint32 * readPtr, * eodPtr;
  873. t->getReceivePtr(&readPtr, &eodPtr);
  874. Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
  875. t->updateReceivePtr(newPtr);
  876.       }
  877.     } 
  878.   }
  879. #endif
  880. }
  881. static int x = 0;
  882. void
  883. TransporterRegistry::performSend()
  884. {
  885.   int i; 
  886.   sendCounter = 1;
  887.   
  888. #ifdef NDB_OSE_TRANSPORTER
  889.   for (int i = 0; i < nOSETransporters; i++)
  890.   {
  891.     OSE_Transporter *t = theOSETransporters[i]; 
  892.     if(is_connected(t->getRemoteNodeId()) &&& (t->isConnected()))
  893.     {
  894.       t->doSend();
  895.     }//if
  896.   }//for
  897. #endif
  898.   
  899. #ifdef NDB_TCP_TRANSPORTER
  900. #ifdef NDB_OSE
  901.   {
  902.     int maxSocketValue = 0;
  903.     
  904.     // Needed for TCP/IP connections
  905.     // The writeset are used by select
  906.     fd_set writeset;
  907.     FD_ZERO(&writeset);
  908.     
  909.     // Prepare for sending and receiving
  910.     for (i = 0; i < nTCPTransporters; i++) {
  911.       TCP_Transporter * t = theTCPTransporters[i];
  912.       
  913.       // If the transporter is connected
  914.       if ((t->hasDataToSend()) && (t->isConnected())) {
  915. const int socket = t->getSocket();
  916. // Find the highest socket value. It will be used by select
  917. if (socket > maxSocketValue) {
  918.   maxSocketValue = socket;
  919. }//if
  920. FD_SET(socket, &writeset);
  921.       }//if
  922.     }//for
  923.     
  924.     // The highest socket value plus one
  925.     if(maxSocketValue == 0)
  926.       return;
  927.     
  928.     maxSocketValue++; 
  929.     struct timeval timeout = { 0, 1025 };
  930.     Uint32 tmp = select(maxSocketValue, 0, &writeset, 0, &timeout);
  931.     
  932.     if (tmp == 0) 
  933.     {
  934.       return;
  935.     }//if
  936.     for (i = 0; i < nTCPTransporters; i++) {
  937.       TCP_Transporter *t = theTCPTransporters[i];
  938.       const NodeId nodeId = t->getRemoteNodeId();
  939.       const int socket    = t->getSocket();
  940.       if(is_connected(nodeId)){
  941.   if(t->isConnected() && FD_ISSET(socket, &writeset)) {
  942.     t->doSend();
  943.   }//if
  944. }//if
  945.       }//for
  946.     }
  947. #endif
  948. #ifdef NDB_TCP_TRANSPORTER
  949.   for (i = x; i < nTCPTransporters; i++) 
  950.   {
  951.     TCP_Transporter *t = theTCPTransporters[i];
  952.     if (t && t->hasDataToSend() && t->isConnected() &&
  953. is_connected(t->getRemoteNodeId())) 
  954.     {
  955.       t->doSend();
  956.     }
  957.   }
  958.   for (i = 0; i < x && i < nTCPTransporters; i++) 
  959.   {
  960.     TCP_Transporter *t = theTCPTransporters[i];
  961.     if (t && t->hasDataToSend() && t->isConnected() &&
  962. is_connected(t->getRemoteNodeId())) 
  963.     {
  964.       t->doSend();
  965.     }
  966.   }
  967.   x++;
  968.   if (x == nTCPTransporters) x = 0;
  969. #endif
  970. #endif
  971. #ifdef NDB_SCI_TRANSPORTER
  972.   //scroll through the SCI transporters, 
  973.   // get each transporter, check if connected, send data
  974.   for (i=0; i<nSCITransporters; i++) {
  975.     SCI_Transporter  *t = theSCITransporters[i];
  976.     const NodeId nodeId = t->getRemoteNodeId();
  977.     
  978.     if(is_connected(nodeId))
  979.     {
  980.       if(t->isConnected() && t->hasDataToSend()) {
  981. t->doSend();
  982.       } //if
  983.     } //if
  984.   }
  985. #endif
  986.   
  987. #ifdef NDB_SHM_TRANSPORTER
  988.   for (i=0; i<nSHMTransporters; i++) 
  989.   {
  990.     SHM_Transporter  *t = theSHMTransporters[i];
  991.     const NodeId nodeId = t->getRemoteNodeId();
  992.     if(is_connected(nodeId))
  993.     {
  994.       if(t->isConnected())
  995.       {
  996. t->doSend();
  997.       }
  998.     }
  999.   }
  1000. #endif
  1001. }
  1002. int
  1003. TransporterRegistry::forceSendCheck(int sendLimit){
  1004.   int tSendCounter = sendCounter;
  1005.   sendCounter = tSendCounter + 1;
  1006.   if (tSendCounter >= sendLimit) {
  1007.     performSend();
  1008.     sendCounter = 1;
  1009.     return 1;
  1010.   }//if
  1011.   return 0;
  1012. }//TransporterRegistry::forceSendCheck()
  1013. #ifdef DEBUG_TRANSPORTER
  1014. void
  1015. TransporterRegistry::printState(){
  1016.   ndbout << "-- TransporterRegistry -- " << endl << endl
  1017.  << "Transporters = " << nTransporters << endl;
  1018.   for(int i = 0; i<maxTransporters; i++)
  1019.     if(theTransporters[i] != NULL){
  1020.       const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
  1021.       ndbout << "Transporter: " << remoteNodeId 
  1022.      << " PerformState: " << performStates[remoteNodeId]
  1023.      << " IOState: " << ioStates[remoteNodeId] << endl;
  1024.     }
  1025. }
  1026. #endif
  1027. IOState
  1028. TransporterRegistry::ioState(NodeId nodeId) { 
  1029.   return ioStates[nodeId]; 
  1030. }
  1031. void
  1032. TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
  1033.   DEBUG("TransporterRegistry::setIOState("
  1034. << nodeId << ", " << state << ")");
  1035.   ioStates[nodeId] = state;
  1036. }
  1037. static void * 
  1038. run_start_clients_C(void * me)
  1039. {
  1040.   ((TransporterRegistry*) me)->start_clients_thread();
  1041.   return 0;
  1042. }
  1043. // Run by kernel thread
  1044. void
  1045. TransporterRegistry::do_connect(NodeId node_id)
  1046. {
  1047.   PerformState &curr_state = performStates[node_id];
  1048.   switch(curr_state){
  1049.   case DISCONNECTED:
  1050.     break;
  1051.   case CONNECTED:
  1052.     return;
  1053.   case CONNECTING:
  1054.     return;
  1055.   case DISCONNECTING:
  1056.     break;
  1057.   }
  1058.   curr_state= CONNECTING;
  1059. }
  1060. void
  1061. TransporterRegistry::do_disconnect(NodeId node_id)
  1062. {
  1063.   PerformState &curr_state = performStates[node_id];
  1064.   switch(curr_state){
  1065.   case DISCONNECTED:
  1066.     return;
  1067.   case CONNECTED:
  1068.     break;
  1069.   case CONNECTING:
  1070.     break;
  1071.   case DISCONNECTING:
  1072.     return;
  1073.   }
  1074.   curr_state= DISCONNECTING;
  1075. }
  1076. void
  1077. TransporterRegistry::report_connect(NodeId node_id)
  1078. {
  1079.   performStates[node_id] = CONNECTED;
  1080.   reportConnect(callbackObj, node_id);
  1081. }
  1082. void
  1083. TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
  1084. {
  1085.   performStates[node_id] = DISCONNECTED;
  1086.   reportDisconnect(callbackObj, node_id, errnum);
  1087. }
  1088. void
  1089. TransporterRegistry::update_connections()
  1090. {
  1091.   for (int i= 0, n= 0; n < nTransporters; i++){
  1092.     Transporter * t = theTransporters[i];
  1093.     if (!t)
  1094.       continue;
  1095.     n++;
  1096.     const NodeId nodeId = t->getRemoteNodeId();
  1097.     switch(performStates[nodeId]){
  1098.     case CONNECTED:
  1099.     case DISCONNECTED:
  1100.       break;
  1101.     case CONNECTING:
  1102.       if(t->isConnected())
  1103. report_connect(nodeId);
  1104.       break;
  1105.     case DISCONNECTING:
  1106.       if(!t->isConnected())
  1107. report_disconnect(nodeId, 0);
  1108.       break;
  1109.     }
  1110.   }
  1111. }
  1112. // run as own thread
  1113. void
  1114. TransporterRegistry::start_clients_thread()
  1115. {
  1116.   DBUG_ENTER("TransporterRegistry::start_clients_thread");
  1117.   while (m_run_start_clients_thread) {
  1118.     NdbSleep_MilliSleep(100);
  1119.     for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
  1120.       Transporter * t = theTransporters[i];
  1121.       if (!t)
  1122. continue;
  1123.       n++;
  1124.       const NodeId nodeId = t->getRemoteNodeId();
  1125.       switch(performStates[nodeId]){
  1126.       case CONNECTING:
  1127. if(!t->isConnected() && !t->isServer)
  1128.     t->connect_client();
  1129. break;
  1130.       case DISCONNECTING:
  1131. if(t->isConnected())
  1132.   t->doDisconnect();
  1133. break;
  1134.       default:
  1135. break;
  1136.       }
  1137.     }
  1138.   }
  1139.   DBUG_VOID_RETURN;
  1140. }
  1141. bool
  1142. TransporterRegistry::start_clients()
  1143. {
  1144.   m_run_start_clients_thread= true;
  1145.   m_start_clients_thread= NdbThread_Create(run_start_clients_C,
  1146.    (void**)this,
  1147.    32768,
  1148.    "ndb_start_clients",
  1149.    NDB_THREAD_PRIO_LOW);
  1150.   if (m_start_clients_thread == 0) {
  1151.     m_run_start_clients_thread= false;
  1152.     return false;
  1153.   }
  1154.   return true;
  1155. }
  1156. bool
  1157. TransporterRegistry::stop_clients()
  1158. {
  1159.   if (m_start_clients_thread) {
  1160.     m_run_start_clients_thread= false;
  1161.     void* status;
  1162.     int r= NdbThread_WaitFor(m_start_clients_thread, &status);
  1163.     NdbThread_Destroy(&m_start_clients_thread);
  1164.   }
  1165.   return true;
  1166. }
  1167. void
  1168. TransporterRegistry::add_transporter_interface(const char *interf, unsigned short port)
  1169. {
  1170.   DBUG_ENTER("TransporterRegistry::add_transporter_interface");
  1171.   DBUG_PRINT("enter",("interface=%s, port= %d", interf, port));
  1172.   if (interf && strlen(interf) == 0)
  1173.     interf= 0;
  1174.   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
  1175.   {
  1176.     Transporter_interface &tmp= m_transporter_interface[i];
  1177.     if (port != tmp.m_service_port)
  1178.       continue;
  1179.     if (interf != 0 && tmp.m_interface != 0 &&
  1180. strcmp(interf, tmp.m_interface) == 0)
  1181.     {
  1182.       DBUG_VOID_RETURN; // found match, no need to insert
  1183.     }
  1184.     if (interf == 0 && tmp.m_interface == 0)
  1185.     {
  1186.       DBUG_VOID_RETURN; // found match, no need to insert
  1187.     }
  1188.   }
  1189.   Transporter_interface t;
  1190.   t.m_service_port= port;
  1191.   t.m_interface= interf;
  1192.   m_transporter_interface.push_back(t);
  1193.   DBUG_PRINT("exit",("interface and port added"));
  1194.   DBUG_VOID_RETURN;
  1195. }
  1196. bool
  1197. TransporterRegistry::start_service(SocketServer& socket_server)
  1198. {
  1199.   if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
  1200.   {
  1201.     ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified");
  1202.     return false;
  1203.   }
  1204.   for (unsigned i= 0; i < m_transporter_interface.size(); i++)
  1205.   {
  1206.     Transporter_interface &t= m_transporter_interface[i];
  1207.     if (t.m_service_port == 0)
  1208.     {
  1209.       continue;
  1210.     }
  1211.     TransporterService *transporter_service =
  1212.       new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
  1213.     if(!socket_server.setup(transporter_service,
  1214.     t.m_service_port, t.m_interface))
  1215.     {
  1216.       ndbout_c("Unable to setup transporter service port: %s:%d!n"
  1217.        "Please check if the port is already used,n"
  1218.        "(perhaps the node is already running)",
  1219.        t.m_interface ? t.m_interface : "*", t.m_service_port);
  1220.       delete transporter_service;
  1221.       return false;
  1222.     }
  1223.     transporter_service->setTransporterRegistry(this);
  1224.   }
  1225.   return true;
  1226. }
  1227. #ifdef NDB_SHM_TRANSPORTER
  1228. static
  1229. RETSIGTYPE 
  1230. shm_sig_handler(int signo)
  1231. {
  1232.   g_shm_counter++;
  1233. }
  1234. #endif
  1235. void
  1236. TransporterRegistry::startReceiving()
  1237. {
  1238.   DBUG_ENTER("TransporterRegistry::startReceiving");
  1239. #ifdef NDB_OSE_TRANSPORTER
  1240.   if(theOSEReceiver != NULL){
  1241.     theOSEReceiver->createPhantom();
  1242.   }
  1243. #endif
  1244. #ifdef NDB_OSE
  1245.   theOSEJunkSocketRecv = socket(AF_INET, SOCK_STREAM, 0);
  1246. #endif
  1247. #if defined NDB_OSE || defined NDB_SOFTOSE
  1248.   theReceiverPid = current_process();
  1249.   for(int i = 0; i<nTCPTransporters; i++)
  1250.     theTCPTransporters[i]->theReceiverPid = theReceiverPid;
  1251. #endif
  1252. #ifdef NDB_SHM_TRANSPORTER
  1253.   m_shm_own_pid = getpid();
  1254.   if (g_ndb_shm_signum)
  1255.   {
  1256.     DBUG_PRINT("info",("Install signal handler for signum %d",
  1257.        g_ndb_shm_signum));
  1258.     struct sigaction sa;
  1259.     sigemptyset(&sa.sa_mask);
  1260.     sigaddset(&sa.sa_mask, g_ndb_shm_signum);
  1261.     pthread_sigmask(SIG_UNBLOCK, &sa.sa_mask, 0);
  1262.     sa.sa_handler = shm_sig_handler;
  1263.     sigemptyset(&sa.sa_mask);
  1264.     sa.sa_flags = 0;
  1265.     int ret;
  1266.     while((ret = sigaction(g_ndb_shm_signum, &sa, 0)) == -1 && errno == EINTR);
  1267.     if(ret != 0)
  1268.     {
  1269.       DBUG_PRINT("error",("Install failed"));
  1270.       g_eventLogger.error("Failed to install signal handler for"
  1271.   " SHM transporter errno: %d (%s)", errno, 
  1272. #ifdef HAVE_STRERROR
  1273.   strerror(errno)
  1274. #else
  1275.                           ""
  1276. #endif
  1277.   );
  1278.     }
  1279.   }
  1280. #endif // NDB_SHM_TRANSPORTER
  1281.   DBUG_VOID_RETURN;
  1282. }
  1283. void
  1284. TransporterRegistry::stopReceiving(){
  1285. #ifdef NDB_OSE_TRANSPORTER
  1286.   if(theOSEReceiver != NULL){
  1287.     theOSEReceiver->destroyPhantom();
  1288.   }
  1289. #endif
  1290.   /**
  1291.    * Disconnect all transporters, this includes detach from remote node
  1292.    * and since that must be done from the same process that called attach
  1293.    * it's done here in the receive thread
  1294.    */
  1295.   disconnectAll();
  1296. #if defined NDB_OSE || defined NDB_SOFTOSE
  1297.   if(theOSEJunkSocketRecv > 0)
  1298.     close(theOSEJunkSocketRecv);
  1299.   theOSEJunkSocketRecv = -1;
  1300. #endif
  1301. }
  1302. void
  1303. TransporterRegistry::startSending(){
  1304. #if defined NDB_OSE || defined NDB_SOFTOSE
  1305.   theOSEJunkSocketSend = socket(AF_INET, SOCK_STREAM, 0);
  1306. #endif
  1307. }
  1308. void
  1309. TransporterRegistry::stopSending(){
  1310. #if defined NDB_OSE || defined NDB_SOFTOSE
  1311.   if(theOSEJunkSocketSend > 0)
  1312.     close(theOSEJunkSocketSend);
  1313.   theOSEJunkSocketSend = -1;
  1314. #endif
  1315. }
  1316. NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
  1317.   out << "-- Signal Header --" << endl;
  1318.   out << "theLength:    " << sh.theLength << endl;
  1319.   out << "gsn:          " << sh.theVerId_signalNumber << endl;
  1320.   out << "recBlockNo:   " << sh.theReceiversBlockNumber << endl;
  1321.   out << "sendBlockRef: " << sh.theSendersBlockRef << endl;
  1322.   out << "sendersSig:   " << sh.theSendersSignalId << endl;
  1323.   out << "theSignalId:  " << sh.theSignalId << endl;
  1324.   out << "trace:        " << (int)sh.theTrace << endl;
  1325.   return out;
  1326. template class Vector<TransporterRegistry::Transporter_interface>;