OSE_Receiver.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:10k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <NdbOut.hpp>
  14. #include "OSE_Receiver.hpp"
  15. #include "OSE_Transporter.hpp"
  16. #include "TransporterCallback.hpp"
  17. #include <TransporterRegistry.hpp>
  18. #include "TransporterInternalDefinitions.hpp"
  19. OSE_Receiver::OSE_Receiver(TransporterRegistry * tr,
  20.    int _recBufSize,
  21.    NodeId _localNodeId) {
  22.   theTransporterRegistry = tr;
  23.   
  24.   recBufSize       = _recBufSize;
  25.   recBufReadIndex  = 0;
  26.   recBufWriteIndex = 0;
  27.   receiveBuffer = new union SIGNAL * [recBufSize];
  28.   waitStackCount   = 0;
  29.   waitStackSize    = _recBufSize;
  30.   waitStack = new union SIGNAL * [waitStackSize];
  31.   nextSigId = new Uint32[MAX_NTRANSPORTERS];
  32.   for (int i = 0; i < MAX_NTRANSPORTERS; i++)
  33.     nextSigId[i] = 0;
  34.   phantomCreated = false;
  35.   localNodeId    = _localNodeId;
  36.   BaseString::snprintf(localHostName, sizeof(localHostName), 
  37.    "ndb_node%d", localNodeId);
  38.   DEBUG("localNodeId = " << localNodeId << " -> localHostName = " 
  39. << localHostName);
  40. }
  41. OSE_Receiver::~OSE_Receiver(){
  42.   while(recBufReadIndex != recBufWriteIndex){
  43.     free_buf(&receiveBuffer[recBufReadIndex]);
  44.     recBufReadIndex = (recBufReadIndex + 1) % recBufSize;
  45.   }
  46.   delete [] receiveBuffer;
  47.   destroyPhantom();
  48. }
  49. PROCESS
  50. OSE_Receiver::createPhantom(){
  51.   redir.sig = 1;
  52.   redir.pid = current_process();
  53.   if(!phantomCreated){
  54.     phantomPid = create_process
  55.       (OS_PHANTOM,    // Type
  56.        localHostName, // Name
  57.        NULL,          // Entry point
  58.        0,             // Stack size
  59.        0,             // Prio - Not used
  60.        (OSTIME)0,     // Timeslice - Not used
  61.        0,             // Block - current block
  62.        &redir, 
  63.        (OSVECTOR)0,   // vector
  64.        (OSUSER)0);    // user
  65.     phantomCreated = true;
  66.     DEBUG("Created phantom pid: " << hex << phantomPid);
  67.   }
  68.   return phantomPid;
  69. }
  70. void
  71. OSE_Receiver::destroyPhantom(){
  72.   if(phantomCreated){
  73.     DEBUG("Destroying phantom pid: " << hex << phantomPid);
  74.     kill_proc(phantomPid);
  75.     phantomCreated = false;
  76.   }
  77. }
  78. static SIGSELECT PRIO_A_SIGNALS[] = { 6,
  79.       NDB_TRANSPORTER_PRIO_A,
  80.       NDB_TRANSPORTER_HUNT,
  81.       NDB_TRANSPORTER_CONNECT_REQ,
  82.       NDB_TRANSPORTER_CONNECT_REF,
  83.       NDB_TRANSPORTER_CONNECT_CONF,
  84.       NDB_TRANSPORTER_DISCONNECT_ORD
  85. };
  86. static SIGSELECT PRIO_B_SIGNALS[] = { 1, 
  87.       NDB_TRANSPORTER_DATA 
  88. };
  89. /**
  90.  * Check waitstack for signals that are next in sequence     
  91.  * Put any found signal in receive buffer
  92.  * Returns true if one signal is found
  93.  */
  94. bool 
  95. OSE_Receiver::checkWaitStack(NodeId _nodeId){
  96.   for(int i = 0; i < waitStackCount; i++){
  97.     if (waitStack[i]->dataSignal.senderNodeId == _nodeId && 
  98.         waitStack[i]->dataSignal.sigId == nextSigId[_nodeId]){
  99.       
  100.       ndbout_c("INFO: signal popped from waitStack, sigId = %d",
  101.                waitStack[i]->dataSignal.sigId);    
  102.       
  103.       if(isFull()){
  104.         ndbout_c("ERROR: receiveBuffer is full");
  105. reportError(callbackObj, _nodeId, TE_RECEIVE_BUFFER_FULL);
  106. return false;
  107.       }
  108.       
  109.       // The next signal was found, put it in the receive buffer
  110.       insertReceiveBuffer(waitStack[i]);
  111.       
  112.       // Increase sequence id, set it to the next expected id
  113.       nextSigId[_nodeId]++;
  114.       
  115.       // Move signals below up one step
  116.       for(int j = i; j < waitStackCount-1; j++)
  117.         waitStack[j] = waitStack[j+1];
  118.       waitStack[waitStackCount] = NULL;
  119.       waitStackCount--;
  120.       
  121.       // return true since signal was found
  122.       return true;    
  123.     }
  124.   }
  125.   return false;
  126. }
  127. /**
  128.  * Clear waitstack for signals from node with _nodeId
  129.  */
  130. void
  131. OSE_Receiver::clearWaitStack(NodeId _nodeId){
  132.   
  133.   for(int i = 0; i < waitStackCount; i++){
  134.     if (waitStack[i]->dataSignal.senderNodeId == _nodeId){
  135.       
  136.       // Free signal buffer
  137.       free_buf(&waitStack[i]);
  138.       
  139.       // Move signals below up one step
  140.       for(int j = i; j < waitStackCount-1; j++)
  141.         waitStack[j] = waitStack[j+1];
  142.       waitStack[waitStackCount] = NULL;
  143.       waitStackCount--;
  144.     }
  145.   }
  146.   nextSigId[_nodeId] = 0;
  147. }
  148. inline 
  149. void 
  150. OSE_Receiver::insertWaitStack(union SIGNAL* _sig){
  151.   if (waitStackCount <= waitStackSize){
  152.     waitStack[waitStackCount] = _sig;
  153.     waitStackCount++;
  154.   } else {     
  155.     ndbout_c("ERROR: waitStack is full");
  156.     reportError(callbackObj, localNodeId, TE_WAIT_STACK_FULL);
  157.   }
  158. }
  159. bool 
  160. OSE_Receiver::doReceive(Uint32 timeOutMillis) {
  161.   if(isFull())
  162.     return false;
  163.   
  164.   union SIGNAL * sig = receive_w_tmo(0,
  165.      PRIO_A_SIGNALS);
  166.   if(sig == NIL){
  167.     sig = receive_w_tmo(timeOutMillis,
  168. PRIO_B_SIGNALS);
  169.     if(sig == NIL)
  170.       return false;
  171.   }
  172.   
  173.   DEBUG("Received signal: " << sig->sigNo << " " 
  174. << sigNo2String(sig->sigNo));
  175.   
  176.   switch(sig->sigNo){
  177.   case NDB_TRANSPORTER_PRIO_A:
  178.     {
  179.       OSE_Transporter * t = getTransporter(sig->dataSignal.senderNodeId);
  180.       if (t != 0 && t->isConnected()){
  181. insertReceiveBuffer(sig);
  182.       } else {
  183. free_buf(&sig);
  184.       }
  185.     }
  186.     break;
  187.   case NDB_TRANSPORTER_DATA:
  188.     {
  189.       OSE_Transporter * t = getTransporter(sig->dataSignal.senderNodeId);
  190.       if (t != 0 && t->isConnected()){     
  191. int nodeId = sig->dataSignal.senderNodeId;
  192. Uint32 currSigId = sig->dataSignal.sigId;
  193.       
  194. /**
  195.  * Check if signal is the next in sequence
  196.  * nextSigId is always set to the next sigId to wait for
  197.  */
  198. if (nextSigId[nodeId] == currSigId){
  199.   
  200.   // Insert in receive buffer
  201.   insertReceiveBuffer(sig);
  202.   
  203.   // Increase sequence id, set it to the next expected id
  204.   nextSigId[nodeId]++;
  205.   
  206.   // Check if there are any signal in the wait stack
  207.   if (waitStackCount > 0){
  208.     while(checkWaitStack(nodeId));
  209.   }
  210. } else {
  211.   // Signal was not received in correct order
  212.   // Check values and put it in the waitStack
  213.   ndbout_c("WARNING: sigId out of order,"
  214.    " currSigId = %d, nextSigId = %d", 
  215.    currSigId,  nextSigId[nodeId]);
  216.   
  217.   if (currSigId < nextSigId[nodeId]){
  218.     // Current recieved sigId was smaller than nextSigId
  219.     // There is no use to put it in the waitStack
  220.     ndbout_c("ERROR: recieved sigId was smaller than nextSigId");
  221.     reportError(callbackObj, nodeId, TE_TOO_SMALL_SIGID);
  222.     return false;
  223.   }
  224.   
  225.   if (currSigId > (nextSigId[nodeId] + waitStackSize)){
  226.     // Current sigId was larger than nextSigId + size of waitStack
  227.     // we can never "save" so many signal's on the stack
  228.     ndbout_c("ERROR: currSigId >  (nextSigId + size of waitStack)"); 
  229.     reportError(callbackObj, nodeId, TE_TOO_LARGE_SIGID);
  230.     return false;
  231.   }
  232.   
  233.   // Insert in wait stack
  234.   insertWaitStack(sig);
  235. }        
  236.       } else {
  237. free_buf(&sig);
  238.       }
  239.     }
  240.     break;
  241.   case NDB_TRANSPORTER_HUNT:
  242.     {
  243.       NdbTransporterHunt * s = (NdbTransporterHunt*)sig;
  244.       OSE_Transporter * t = getTransporter(s->remoteNodeId);
  245.       if(t != 0)
  246. t->huntReceived(s);
  247.       free_buf(&sig);
  248.     }
  249.     break;
  250.   case NDB_TRANSPORTER_CONNECT_REQ:
  251.     {
  252.       NdbTransporterConnectReq * s = (NdbTransporterConnectReq*)sig;
  253.       OSE_Transporter * t = getTransporter(s->senderNodeId);
  254.       if(t != 0){
  255. if(t->connectReq(s)){
  256.   clearWaitStack(s->senderNodeId);
  257.   clearRecvBuffer(s->senderNodeId);
  258. }
  259.       }
  260.       free_buf(&sig);
  261.     }
  262.     break;
  263.   case NDB_TRANSPORTER_CONNECT_REF:
  264.     {
  265.       NdbTransporterConnectRef * s = (NdbTransporterConnectRef*)sig;
  266.       OSE_Transporter * t = getTransporter(s->senderNodeId);
  267.       if(t != 0){
  268. if(t->connectRef(s)){
  269.   clearWaitStack(s->senderNodeId);
  270.   clearRecvBuffer(s->senderNodeId);
  271. }
  272.       }
  273.       free_buf(&sig);
  274.     }
  275.     break;
  276.   case NDB_TRANSPORTER_CONNECT_CONF:
  277.     {
  278.       NdbTransporterConnectConf * s = (NdbTransporterConnectConf*)sig;
  279.       OSE_Transporter * t = getTransporter(s->senderNodeId);
  280.       if(t != 0){
  281. if(t->connectConf(s)){
  282.   clearWaitStack(s->senderNodeId);
  283.   clearRecvBuffer(s->senderNodeId);
  284. }
  285.       }
  286.       free_buf(&sig);
  287.     }
  288.     break;
  289.   case NDB_TRANSPORTER_DISCONNECT_ORD:
  290.     {
  291.       NdbTransporterDisconnectOrd * s = (NdbTransporterDisconnectOrd*)sig;
  292.       OSE_Transporter * t = getTransporter(s->senderNodeId);
  293.       if(t != 0){
  294. if(t->disconnectOrd(s)){
  295.   clearWaitStack(s->senderNodeId);
  296.   clearRecvBuffer(s->senderNodeId);
  297. }
  298.       }
  299.       free_buf(&sig);
  300.     }
  301.   }
  302.   return true;
  303. }
  304. OSE_Transporter * 
  305. OSE_Receiver::getTransporter(NodeId nodeId){
  306.   if(theTransporterRegistry->theTransporterTypes[nodeId] != tt_OSE_TRANSPORTER)
  307.     return 0;
  308.   return (OSE_Transporter *)
  309.     theTransporterRegistry->theTransporters[nodeId];
  310. }
  311. void
  312. OSE_Receiver::clearRecvBuffer(NodeId nodeId){
  313.   int tmpIndex = 0;
  314.   union SIGNAL** tmp = new union SIGNAL * [recBufSize];
  315.   /**
  316.    * Put all signal that I want to keep into tmp
  317.    */
  318.   while(recBufReadIndex != recBufWriteIndex){
  319.     if(receiveBuffer[recBufReadIndex]->dataSignal.senderNodeId != nodeId){
  320.       tmp[tmpIndex] = receiveBuffer[recBufReadIndex];
  321.       tmpIndex++;
  322.     } else {
  323.       free_buf(&receiveBuffer[recBufReadIndex]);
  324.     }
  325.     recBufReadIndex = (recBufReadIndex + 1) % recBufSize;
  326.   }
  327.   /**
  328.    * Put all signals that I kept back into receiveBuffer
  329.    */
  330.   for(int i = 0; i<tmpIndex; i++)
  331.     insertReceiveBuffer(tmp[i]);
  332.   
  333.   delete [] tmp;
  334. }