OSE_Transporter.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:12k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ose.h>
  14. #include "OSE_Transporter.hpp"
  15. #include "OSE_Signals.hpp"
  16. #include <TransporterCallback.hpp>
  17. #include "TransporterInternalDefinitions.hpp"
  18. #include <NdbMutex.h>
  19. #include <NdbHost.h>
  20. #include <NdbOut.hpp>
  21. #include <time.h>
  22. OSE_Transporter::OSE_Transporter(int _prioASignalSize,
  23.                                  int _prioBSignalSize,
  24.                                  NodeId localNodeId,
  25.                                  const char * lHostName,
  26.                                  NodeId remoteNodeId,
  27.                                  const char * rHostName,
  28.                                  int byteorder,
  29.                                  bool compression, 
  30.                                  bool checksum, 
  31.                                  bool signalId,
  32.                                  Uint32 reportFreq) :
  33.   Transporter(localNodeId,
  34.               remoteNodeId,
  35.               byteorder,
  36.               compression,
  37.               checksum,
  38.               signalId),
  39.   isServer(localNodeId < remoteNodeId)
  40. {
  41.   signalIdCounter = 0;
  42.   prioBSignalSize = _prioBSignalSize;
  43.   
  44.   if (strcmp(lHostName, rHostName) == 0){    
  45.     BaseString::snprintf(remoteNodeName, sizeof(remoteNodeName), 
  46.              "ndb_node%d", remoteNodeId);
  47.   } else {
  48.     BaseString::snprintf(remoteNodeName, sizeof(remoteNodeName), 
  49.              "%s/ndb_node%d", rHostName, remoteNodeId); 
  50.   }
  51.   
  52.   prioBSignal      = NIL;
  53. }
  54. OSE_Transporter::~OSE_Transporter() {
  55. #if 0 
  56.   /**
  57.   * Don't free these buffers since they have already been freed
  58.   * when the process allocating them died (wild pointers)
  59.   */
  60.   if(prioBSignal != NIL)
  61.     free_buf(&prioBSignal);
  62. #endif
  63. }
  64. bool
  65. OSE_Transporter::initTransporter() {
  66.   struct OS_pcb * pcb = get_pcb(current_process());
  67.   if(pcb != NULL){
  68.     if(pcb->type != OS_ILLEGAL){
  69.       if(prioBSignalSize > pcb->max_sigsize){
  70.         DEBUG("prioBSignalSize(" << prioBSignalSize << ") > max_sigsize("
  71.               << pcb->max_sigsize << ") using max_sigsize");
  72.         prioBSignalSize = pcb->max_sigsize;
  73.       }
  74.     }
  75.     free_buf((union SIGNAL **)&pcb);
  76.   }
  77.   maxPrioBDataSize = prioBSignalSize;
  78.   maxPrioBDataSize -= (sizeof(NdbTransporterData) + MAX_MESSAGE_SIZE - 4);
  79.   
  80.   if(maxPrioBDataSize < 0){
  81.     
  82. #ifdef DEBUG_TRANSPORTER
  83.     printf("maxPrioBDataSize < 0 %dn",
  84.            maxPrioBDataSize);
  85. #endif
  86.     return false;
  87.   }
  88.   initSignals();
  89.   
  90.   return true;
  91. }
  92. void
  93. OSE_Transporter::initSignals(){
  94.   if(prioBSignal == NIL){
  95.     prioBSignal = alloc(prioBSignalSize, NDB_TRANSPORTER_DATA);
  96.     prioBInsertPtr = &prioBSignal->dataSignal.data[0];
  97.   
  98.     prioBSignal->dataSignal.length = 0;
  99.     prioBSignal->dataSignal.senderNodeId = localNodeId;
  100.   }
  101.   dataToSend = 0;
  102. }
  103. NdbTransporterData *
  104. OSE_Transporter::allocPrioASignal(Uint32 messageLenBytes) const
  105. {
  106.   
  107.   const Uint32 lenBytes = messageLenBytes + sizeof(NdbTransporterData) - 4;
  108.   
  109.   NdbTransporterData * sig = 
  110.     (NdbTransporterData*)alloc(lenBytes, NDB_TRANSPORTER_PRIO_A);
  111.   
  112.   sig->length = 0;
  113.   sig->senderNodeId = localNodeId;
  114.   
  115.   return sig;
  116. }
  117. Uint32 *
  118. OSE_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
  119.   if(prio >= 1){
  120.     prio = 1;
  121.     insertPtr  = prioBInsertPtr;
  122.     signal     = (NdbTransporterData*)prioBSignal;
  123.   } else {
  124.     signal    = allocPrioASignal(lenBytes);
  125.     insertPtr = &signal->data[0];
  126.   }
  127.   return insertPtr;
  128. }
  129. void
  130. OSE_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
  131.   Uint32 bufferSize = signal->length;
  132.   bufferSize += lenBytes;
  133.   signal->length = bufferSize;
  134.   if(prio >= 1){
  135.     prioBInsertPtr += (lenBytes / 4);
  136.     if(bufferSize >= maxPrioBDataSize)
  137.       doSend();
  138.   } else {
  139.     /**
  140.      * Prio A signal are sent directly
  141.      */
  142.     signal->sigId = 0;
  143.     
  144.     ::send((union SIGNAL**)&signal, remoteNodePid);
  145.   }
  146. }
  147. #if 0
  148. int getSeq(int _seq){
  149.   if (_seq > 0){
  150.     switch (_seq % 100){
  151.     case 10:
  152.       return _seq - 1;
  153.     case 9: 
  154.       return _seq + 1;
  155.     default:
  156.       return _seq;
  157.     }
  158.   }else{
  159.     return _seq;
  160.   }
  161. }
  162. int getSeq(int _seq){
  163.     switch (_seq % 40){
  164.     case 10:       
  165.       return _seq-4;
  166.     case 9:
  167.       return _seq-2;
  168.     case 8:       
  169.       return _seq;
  170.     case 7:
  171.       return _seq+2;
  172.     case 6:       
  173.       return _seq+4;
  174.     case 30:       
  175.       return _seq-9;
  176.     case 29:
  177.       return _seq-7;
  178.     case 28:       
  179.       return _seq-5;
  180.     case 27:
  181.       return _seq-3;
  182.     case 26:       
  183.       return _seq-1;
  184.     case 25:       
  185.       return _seq+1;
  186.     case 24:
  187.       return _seq+3;
  188.     case 23:       
  189.       return _seq+5;
  190.     case 22:
  191.       return _seq+7;
  192.     case 21:       
  193.       return _seq+9;
  194.     default:
  195.       return _seq;
  196.     
  197.     }
  198. }
  199. #endif
  200. void
  201. OSE_Transporter::doSend() {
  202.   /**
  203.    * restore is always called to make sure the signal buffer is taken over 
  204.    * by a process that is alive, this will otherwise lead to that these buffers
  205.    * are removed when the process that allocated them dies
  206.    */
  207.   restore(prioBSignal);
  208.   if(prioBSignal->dataSignal.length > 0){
  209.     prioBSignal->dataSignal.sigId = signalIdCounter;
  210.     signalIdCounter++;
  211.     ::send(&prioBSignal, remoteNodePid);
  212.   }
  213.   
  214.   initSignals();
  215. }
  216. void
  217. OSE_Transporter::doConnect() {
  218.   
  219.   NdbMutex_Lock(theMutexPtr);
  220.   if(_connecting || _disconnecting || _connected){
  221.     NdbMutex_Unlock(theMutexPtr);
  222.     return;
  223.   }
  224.   
  225.   _connecting = true;
  226.   signalIdCounter = 0;
  227.   if(isServer){
  228.     DEBUG("Waiting for connect req: ");
  229.     state = WAITING_FOR_CONNECT_REQ;
  230.   } else {
  231.     state = WAITING_FOR_HUNT;
  232.     
  233.     DEBUG("Hunting for: " << remoteNodeName);
  234.     
  235.     union SIGNAL* huntsig;
  236.     huntsig = alloc(sizeof(NdbTransporterHunt), NDB_TRANSPORTER_HUNT);
  237.     huntsig->ndbHunt.remoteNodeId = remoteNodeId;
  238.     hunt(remoteNodeName, 0, NULL, &huntsig);
  239.   }   
  240.   NdbMutex_Unlock(theMutexPtr);
  241. }
  242. void
  243. OSE_Transporter::doDisconnect() {  
  244.   NdbMutex_Lock(theMutexPtr);
  245.   switch(state){
  246.   case DISCONNECTED:
  247.   case WAITING_FOR_HUNT:
  248.   case WAITING_FOR_CONNECT_REQ:
  249.   case WAITING_FOR_CONNECT_CONF:
  250.     break;
  251.   case CONNECTED:
  252.     {
  253. #if 0      
  254.       /** 
  255.        * There should not be anything in the buffer that needs to be sent here
  256.        */
  257.       DEBUG("Doing send before disconnect");
  258.       doSend();
  259. #endif
  260.       union SIGNAL * sig = alloc(sizeof(NdbTransporterDisconnectOrd),
  261.                                  NDB_TRANSPORTER_DISCONNECT_ORD);
  262.       sig->ndbDisconnect.senderNodeId = localNodeId;
  263.       sig->ndbDisconnect.reason = NdbTransporterDisconnectOrd::NDB_DISCONNECT;
  264.       ::send(&sig, remoteNodePid);
  265.       detach(&remoteNodeRef);
  266.     }
  267.     break;
  268.   }
  269.   state = DISCONNECTED;
  270.   
  271.   _connected = false;
  272.   _connecting = false;
  273.   _disconnecting = false;
  274.   NdbMutex_Unlock(theMutexPtr);
  275. }
  276. void
  277. OSE_Transporter::huntReceived(struct NdbTransporterHunt * sig){
  278.   if(isServer){
  279.     WARNING("Hunt received for server: remoteNodeId: " <<
  280.             sig->remoteNodeId);
  281.     return;
  282.   }
  283.   
  284.   if(state != WAITING_FOR_HUNT){
  285.     WARNING("Hunt received while in state: " << state);
  286.     return;
  287.   }
  288.   remoteNodePid = sender((union SIGNAL**)&sig);
  289.   union SIGNAL * signal = alloc(sizeof(NdbTransporterConnectReq),
  290.                                 NDB_TRANSPORTER_CONNECT_REQ);
  291.   signal->ndbConnectReq.remoteNodeId = remoteNodeId;
  292.   signal->ndbConnectReq.senderNodeId = localNodeId;
  293.   DEBUG("Sending connect req to pid: " << hex << remoteNodePid);
  294.   
  295.   ::send(&signal, remoteNodePid);
  296.   state = WAITING_FOR_CONNECT_CONF;
  297.   return;
  298. }
  299. bool
  300. OSE_Transporter::connectReq(struct NdbTransporterConnectReq * sig){
  301.   if(!isServer){
  302.     WARNING("OSE Connect Req received for client: senderNodeId: " <<
  303.             sig->senderNodeId);
  304.     return false;
  305.   }
  306.   
  307.   if(state != WAITING_FOR_CONNECT_REQ){
  308.     PROCESS pid = sender((union SIGNAL**)&sig);
  309.     union SIGNAL * signal = alloc(sizeof(NdbTransporterConnectRef),
  310.                                   NDB_TRANSPORTER_CONNECT_REF);
  311.     signal->ndbConnectRef.senderNodeId = localNodeId;
  312.     signal->ndbConnectRef.reason = NdbTransporterConnectRef::INVALID_STATE;
  313.     DEBUG("Sending connect ref to pid: " << hex << pid);
  314.     ::send(&signal, pid);
  315.     return false;
  316.   }
  317.   
  318.   NdbMutex_Lock(theMutexPtr);
  319.   if(prioBSignal != NIL){
  320.     restore(prioBSignal);
  321.     free_buf(&prioBSignal);
  322.   }
  323.   initSignals();
  324.   remoteNodePid = sender((union SIGNAL**)&sig);
  325.   union SIGNAL * signal = alloc(sizeof(NdbTransporterConnectRef),
  326.                                 NDB_TRANSPORTER_CONNECT_CONF);
  327.   signal->ndbConnectConf.senderNodeId = localNodeId;
  328.   signal->ndbConnectConf.remoteNodeId = remoteNodeId;
  329.   union SIGNAL * discon = alloc(sizeof(NdbTransporterDisconnectOrd),
  330.                                 NDB_TRANSPORTER_DISCONNECT_ORD);
  331.   discon->ndbDisconnect.senderNodeId = remoteNodeId;
  332.   discon->ndbDisconnect.reason = NdbTransporterDisconnectOrd::PROCESS_DIED;
  333.   
  334.   DEBUG("Attaching to pid: " << hex << remoteNodePid);
  335.   remoteNodeRef = attach(&discon, remoteNodePid);
  336.   
  337.   DEBUG("Sending connect conf to pid: " << hex << remoteNodePid);
  338.   ::send(&signal, remoteNodePid);
  339.   state = CONNECTED;
  340.   
  341.   _connected     = true;
  342.   _connecting    = false;
  343.   _disconnecting = false;
  344.   NdbMutex_Unlock(theMutexPtr);
  345.   
  346.   return true;
  347. }
  348. bool
  349. OSE_Transporter::connectRef(struct NdbTransporterConnectRef * sig){
  350.   if(isServer){
  351.     WARNING("OSE Connect Ref received for server: senderNodeId: " <<
  352.             sig->senderNodeId);
  353.     return false;
  354.   }
  355.   if(state != WAITING_FOR_CONNECT_CONF){
  356.     WARNING("OSE Connect Ref received for client while in state: " <<
  357.             state << " senderNodeId: " << sig->senderNodeId);
  358.     return false;
  359.   }
  360.   doDisconnect();
  361. #if 0
  362.   /** 
  363.    * Don't call connect directly, wait until the next time 
  364.    * checkConnections is called which will trigger a new connect attempt
  365.    */
  366.   doConnect();
  367. #endif
  368.   return true;
  369. }
  370. bool
  371. OSE_Transporter::connectConf(struct NdbTransporterConnectConf * sig){
  372.   if(isServer){
  373.     WARNING("OSE Connect Conf received for server: senderNodeId: " <<
  374.             sig->senderNodeId);
  375.     return false;
  376.   }
  377.   if(state != WAITING_FOR_CONNECT_CONF){
  378.     WARNING("OSE Connect Conf received while in state: " <<
  379.             state);
  380.     return false;
  381.   }
  382.   NdbMutex_Lock(theMutexPtr);
  383.   // Free the buffers to get rid of any "junk" that they might contain
  384.   if(prioBSignal != NIL){
  385.     restore(prioBSignal);
  386.     free_buf(&prioBSignal);
  387.   }
  388.   initSignals();
  389.   union SIGNAL * discon = alloc(sizeof(NdbTransporterDisconnectOrd),
  390.                                 NDB_TRANSPORTER_DISCONNECT_ORD);
  391.   discon->ndbDisconnect.senderNodeId = remoteNodeId;
  392.   discon->ndbDisconnect.reason= NdbTransporterDisconnectOrd::PROCESS_DIED;
  393.   
  394.   remoteNodeRef = attach(&discon, remoteNodePid);
  395.   
  396.   state = CONNECTED;
  397.   _connected     = true;
  398.   _connecting    = false;
  399.   _disconnecting = false;
  400.   // Free the buffers to get rid of any "junk" that they might contain
  401.   if(prioBSignal != NIL){
  402.     restore(prioBSignal);
  403.     free_buf(&prioBSignal);
  404.   }
  405.   initSignals();
  406.   
  407.   NdbMutex_Unlock(theMutexPtr);
  408.   return true;
  409. }
  410. bool
  411. OSE_Transporter::disconnectOrd(struct NdbTransporterDisconnectOrd * sig){
  412.   if(state != CONNECTED){
  413.     WARNING("OSE Disconnect Ord received while in state: " << state <<
  414.             " reason: " << sig->reason);
  415.     return false;
  416.   }
  417.   if(sig->reason == NdbTransporterDisconnectOrd::PROCESS_DIED){
  418.     state = DISCONNECTED;
  419.   }
  420.   
  421.   doDisconnect();
  422.   reportDisconnect(callbackObj, remoteNodeId,0);
  423.   return true;
  424. }