SCI_Transporter.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:28k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h> 
  14. #include "SCI_Transporter.hpp" 
  15. #include <NdbOut.hpp> 
  16. #include <NdbSleep.h> 
  17. #include <NdbTick.h> 
  18. #include <NdbTick.h> 
  19. #include "TransporterInternalDefinitions.hpp" 
  20. #include <TransporterCallback.hpp> 
  21. #include <InputStream.hpp>
  22. #include <OutputStream.hpp> 
  23. #define FLAGS 0  
  24. #define DEBUG_TRANSPORTER 
  25. SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
  26.                                  const char *lHostName,
  27.                                  const char *rHostName,
  28.                                  int r_port,
  29.                                  Uint32 packetSize,         
  30.  Uint32 bufferSize,       
  31.  Uint32 nAdapters, 
  32.  Uint16 remoteSciNodeId0,        
  33.  Uint16 remoteSciNodeId1, 
  34.  NodeId _localNodeId,      
  35.  NodeId _remoteNodeId,     
  36.  bool chksm,  
  37.  bool signalId, 
  38.  Uint32 reportFreq) :  
  39.   Transporter(t_reg, tt_SCI_TRANSPORTER,
  40.       lHostName, rHostName, r_port, _localNodeId,
  41.               _remoteNodeId, 0, false, chksm, signalId) 
  42. {
  43.   DBUG_ENTER("SCI_Transporter::SCI_Transporter");
  44.   m_PacketSize = (packetSize + 3)/4 ; 
  45.   m_BufferSize = bufferSize; 
  46.   m_sendBuffer.m_buffer = NULL;
  47.   
  48.   m_RemoteSciNodeId = remoteSciNodeId0; 
  49.    
  50.   if(remoteSciNodeId0 == 0 || remoteSciNodeId1 == 0) 
  51.     m_numberOfRemoteNodes=1; 
  52.   else 
  53.     m_numberOfRemoteNodes=2; 
  54.  
  55.   m_RemoteSciNodeId1 = remoteSciNodeId1; 
  56.  
  57.    
  58.   m_initLocal=false; 
  59.   m_swapCounter=0; 
  60.   m_failCounter=0; 
  61.   m_remoteNodes[0]=remoteSciNodeId0; 
  62.   m_remoteNodes[1]=remoteSciNodeId1; 
  63.   m_adapters = nAdapters;   
  64.   // The maximum number of times to try and create,  
  65.   // start and destroy a sequence 
  66.   m_ActiveAdapterId=0; 
  67.   m_StandbyAdapterId=1; 
  68.   
  69.   m_mapped = false; 
  70.   m_sciinit=false; 
  71.   
  72.   sciAdapters= new SciAdapter[nAdapters* (sizeof (SciAdapter))]; 
  73.   if(sciAdapters==NULL) { 
  74.   } 
  75.   m_SourceSegm= new sourceSegm[nAdapters* (sizeof (sourceSegm))]; 
  76.   if(m_SourceSegm==NULL) { 
  77.   } 
  78.   m_TargetSegm= new targetSegm[nAdapters* (sizeof (targetSegm))]; 
  79.   if(m_TargetSegm==NULL) { 
  80.   } 
  81.   m_reportFreq= reportFreq; 
  82.   
  83.   //reset all statistic counters. 
  84. #ifdef DEBUG_TRANSPORTER 
  85.  i1024=0; 
  86.  i2048=0; 
  87.  i2049=0; 
  88.  i10242048=0; 
  89.  i20484096=0; 
  90.  i4096=0; 
  91.  i4097=0; 
  92. #endif
  93.   DBUG_VOID_RETURN;
  94.  
  95.  
  96.  
  97. void SCI_Transporter::disconnectImpl() 
  98.   DBUG_ENTER("SCI_Transporter::disconnectImpl");
  99.   sci_error_t err; 
  100.   if(m_mapped){ 
  101.     setDisconnect(); 
  102.     DBUG_PRINT("info", ("connect status = %d, remote node = %d",
  103.     (int)getConnectionStatus(), remoteNodeId)); 
  104.     disconnectRemote(); 
  105.     disconnectLocal(); 
  106.   } 
  107.   
  108.   // Empty send buffer 
  109.   m_sendBuffer.m_dataSize = 0;
  110.   m_initLocal=false; 
  111.   m_mapped = false; 
  112.   
  113.   if(m_sciinit) { 
  114.     for(Uint32 i=0; i<m_adapters ; i++) {       
  115.       SCIClose(sciAdapters[i].scidesc, FLAGS, &err);  
  116.       
  117.       if(err != SCI_ERR_OK)  { 
  118. report_error(TE_SCI_UNABLE_TO_CLOSE_CHANNEL); 
  119.         DBUG_PRINT("error", ("Cannot close channel to the driver. Error code 0x%x",  
  120.     err)); 
  121.       } 
  122.     } 
  123.   } 
  124.   m_sciinit=false; 
  125.    
  126. #ifdef DEBUG_TRANSPORTER 
  127.       ndbout << "total: " <<  i1024+ i10242048 + i2048+i2049 << endl; 
  128.       ndbout << "<1024: " << i1024 << endl; 
  129.       ndbout << "1024-2047: " << i10242048 << endl; 
  130.       ndbout << "==2048: " << i2048 << endl; 
  131.       ndbout << "2049-4096: " << i20484096 << endl; 
  132.       ndbout << "==4096: " << i4096 << endl; 
  133.       ndbout << ">4096: " << i4097 << endl; 
  134. #endif 
  135.   DBUG_VOID_RETURN;  
  136. }  
  137.  
  138.  
  139. bool SCI_Transporter::initTransporter() { 
  140.   DBUG_ENTER("SCI_Transporter::initTransporter");
  141.   if(m_BufferSize < (2*MAX_MESSAGE_SIZE + 4096)){ 
  142.     m_BufferSize = 2 * MAX_MESSAGE_SIZE + 4096; 
  143.   } 
  144.   // Allocate buffers for sending, send buffer size plus 2048 bytes for avoiding
  145.   // the need to send twice when a large message comes around. Send buffer size is
  146.   // measured in words. 
  147.   Uint32 sz = 4 * m_PacketSize + MAX_MESSAGE_SIZE;;
  148.   
  149.   m_sendBuffer.m_sendBufferSize = 4 * ((sz + 3) / 4); 
  150.   m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_sendBufferSize / 4];
  151.   m_sendBuffer.m_dataSize = 0;
  152.  
  153.   DBUG_PRINT("info", ("Created SCI Send Buffer with buffer size %d and packet size %d",
  154.               m_sendBuffer.m_sendBufferSize, m_PacketSize * 4));
  155.   if(!getLinkStatus(m_ActiveAdapterId) ||  
  156.      (m_adapters > 1 &&
  157.      !getLinkStatus(m_StandbyAdapterId))) { 
  158.     DBUG_PRINT("error", ("The link is not fully operational. Check the cables and the switches")); 
  159.     //reportDisconnect(remoteNodeId, 0); 
  160.     //doDisconnect(); 
  161.     //NDB should terminate 
  162.     report_error(TE_SCI_LINK_ERROR); 
  163.     DBUG_RETURN(false); 
  164.   } 
  165.   
  166.   DBUG_RETURN(true); 
  167. } // initTransporter()  
  168.  
  169.  
  170. Uint32 SCI_Transporter::getLocalNodeId(Uint32 adapterNo) 
  171.   sci_query_adapter_t queryAdapter; 
  172.   sci_error_t  error; 
  173.   Uint32 _localNodeId; 
  174.    
  175.   queryAdapter.subcommand = SCI_Q_ADAPTER_NODEID; 
  176.   queryAdapter.localAdapterNo = adapterNo; 
  177.   queryAdapter.data = &_localNodeId; 
  178.    
  179.   SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error); 
  180.    
  181.   if(error != SCI_ERR_OK) 
  182.     return 0; 
  183.   return _localNodeId;  
  184.  
  185.  
  186. bool SCI_Transporter::getLinkStatus(Uint32 adapterNo) 
  187.   sci_query_adapter_t queryAdapter; 
  188.   sci_error_t  error; 
  189.   int linkstatus; 
  190.   queryAdapter.subcommand = SCI_Q_ADAPTER_LINK_OPERATIONAL; 
  191.    
  192.   queryAdapter.localAdapterNo = adapterNo; 
  193.   queryAdapter.data = &linkstatus; 
  194.    
  195.   SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error); 
  196.    
  197.   if(error != SCI_ERR_OK) { 
  198.     DBUG_PRINT("error", ("error %d querying adapter", error)); 
  199.     return false; 
  200.   } 
  201.   if(linkstatus<=0) 
  202.     return false; 
  203.   return true; 
  204.  
  205.  
  206.  
  207. sci_error_t SCI_Transporter::initLocalSegment() { 
  208.   DBUG_ENTER("SCI_Transporter::initLocalSegment");
  209.   Uint32 segmentSize = m_BufferSize; 
  210.   Uint32 offset  = 0; 
  211.   sci_error_t err; 
  212.   if(!m_sciinit) { 
  213.     for(Uint32 i=0; i<m_adapters ; i++) { 
  214.       SCIOpen(&(sciAdapters[i].scidesc), FLAGS, &err); 
  215.       sciAdapters[i].localSciNodeId=getLocalNodeId(i); 
  216.       DBUG_PRINT("info", ("SCInode iD %d  adapter %dn",  
  217.          sciAdapters[i].localSciNodeId, i)); 
  218.       if(err != SCI_ERR_OK) { 
  219.         DBUG_PRINT("error", ("Cannot open an SCI virtual device. Error code 0x%x", 
  220.    err)); 
  221. DBUG_RETURN(err); 
  222.       } 
  223.     } 
  224.   } 
  225.    
  226.   m_sciinit=true; 
  227.  
  228.   SCICreateSegment(sciAdapters[0].scidesc,            
  229.    &(m_SourceSegm[0].localHandle),  
  230.    hostSegmentId(localNodeId, remoteNodeId),    
  231.    segmentSize,                
  232.    0, 
  233.    0, 
  234.    0,         
  235.    &err);             
  236.    
  237.   if(err != SCI_ERR_OK) { 
  238.     DBUG_PRINT("error", ("Error creating segment, err = 0x%x", err));
  239.     DBUG_RETURN(err); 
  240.   } else { 
  241.     DBUG_PRINT("info", ("created segment id : %d",
  242.        hostSegmentId(localNodeId, remoteNodeId))); 
  243.   } 
  244.    
  245.   /** Prepare the segment*/ 
  246.   for(Uint32 i=0; i < m_adapters; i++) { 
  247.     SCIPrepareSegment((m_SourceSegm[0].localHandle),  
  248.       i, 
  249.       FLAGS, 
  250.       &err); 
  251.      
  252.     if(err != SCI_ERR_OK) { 
  253.       DBUG_PRINT("error", ("Local Segment is not accessible by an SCI adapter. Error code 0x%xn",
  254.                   err)); 
  255.       DBUG_RETURN(err); 
  256.     } 
  257.   } 
  258.  
  259.   
  260.   m_SourceSegm[0].mappedMemory =  
  261.     SCIMapLocalSegment((m_SourceSegm[0].localHandle), 
  262.        &(m_SourceSegm[0].lhm[0].map), 
  263.        offset, 
  264.        segmentSize, 
  265.        NULL, 
  266.        FLAGS, 
  267.        &err); 
  268.  
  269.  
  270.  
  271.   if(err != SCI_ERR_OK) { 
  272.     DBUG_PRINT("error", ("Cannot map area of size %d. Error code 0x%x", 
  273.         segmentSize,err)); 
  274.     doDisconnect(); 
  275.     DBUG_RETURN(err); 
  276.   } 
  277.   
  278.   
  279.   /** Make the local segment available*/ 
  280.   for(Uint32 i=0; i < m_adapters; i++) { 
  281.     SCISetSegmentAvailable((m_SourceSegm[0].localHandle),  
  282.      i, 
  283.    FLAGS, 
  284.    &err); 
  285.      
  286.     if(err != SCI_ERR_OK) { 
  287.       DBUG_PRINT("error", ("Local Segment is not available for remote connections. Error code 0x%xn",
  288.                  err)); 
  289.       DBUG_RETURN(err); 
  290.     } 
  291.   } 
  292.   
  293.   
  294.   setupLocalSegment(); 
  295.   
  296.   DBUG_RETURN(err); 
  297.    
  298. } // initLocalSegment() 
  299.  
  300.  
  301. bool SCI_Transporter::doSend() { 
  302. #ifdef DEBUG_TRANSPORTER  
  303.   NDB_TICKS startSec=0, stopSec=0; 
  304.   Uint32 startMicro=0, stopMicro=0, totalMicro=0; 
  305. #endif
  306.   sci_error_t             err; 
  307.   Uint32 retry=0; 
  308.  
  309.   const char * const sendPtr = (char*)m_sendBuffer.m_buffer;
  310.   const Uint32 sizeToSend    = 4 * m_sendBuffer.m_dataSize; //Convert to number of bytes
  311.   
  312.   if (sizeToSend > 0){
  313. #ifdef DEBUG_TRANSPORTER 
  314.     if(sizeToSend < 1024 ) 
  315.       i1024++; 
  316.     if(sizeToSend > 1024 && sizeToSend < 2048 ) 
  317.       i10242048++; 
  318.     if(sizeToSend==2048) 
  319.       i2048++; 
  320.     if(sizeToSend>2048 && sizeToSend < 4096) 
  321.       i20484096++; 
  322.     if(sizeToSend==4096) 
  323.       i4096++; 
  324.     if(sizeToSend==4097) 
  325.       i4097++; 
  326. #endif
  327.     if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { 
  328.       DBUG_PRINT("error", ("Start sequence failed")); 
  329.       report_error(TE_SCI_UNABLE_TO_START_SEQUENCE); 
  330.       return false; 
  331.     } 
  332.     
  333.       
  334.   tryagain:
  335.     retry++;
  336.     if (retry > 3) { 
  337.       DBUG_PRINT("error", ("SCI Transfer failed"));
  338.       report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
  339.       return false; 
  340.     } 
  341.     Uint32 * insertPtr = (Uint32 *) 
  342.       (m_TargetSegm[m_ActiveAdapterId].writer)->getWritePtr(sizeToSend); 
  343.     
  344.     if(insertPtr != 0) {    
  345.       
  346.       const Uint32 remoteOffset=(Uint32) 
  347. ((char*)insertPtr -  
  348.  (char*)(m_TargetSegm[m_ActiveAdapterId].mappedMemory)); 
  349.       
  350.       SCIMemCpy(m_TargetSegm[m_ActiveAdapterId].sequence, 
  351. (void*)sendPtr, 
  352. m_TargetSegm[m_ActiveAdapterId].rhm[m_ActiveAdapterId].map, 
  353. remoteOffset, 
  354. sizeToSend, 
  355. SCI_FLAG_ERROR_CHECK, 
  356. &err);   
  357.       
  358.       
  359.       if (err != SCI_ERR_OK) { 
  360.       if(err == SCI_ERR_OUT_OF_RANGE) { 
  361.         DBUG_PRINT("error", ("Data transfer : out of range error")); 
  362. goto tryagain; 
  363.       } 
  364.       if(err == SCI_ERR_SIZE_ALIGNMENT) { 
  365.         DBUG_PRINT("error", ("Data transfer : alignment error")); 
  366.         DBUG_PRINT("info", ("sendPtr 0x%x, sizeToSend = %d", sendPtr, sizeToSend));
  367. goto tryagain; 
  368.       } 
  369.       if(err == SCI_ERR_OFFSET_ALIGNMENT) { 
  370.         DBUG_PRINT("error", ("Data transfer : offset alignment")); 
  371. goto tryagain; 
  372.       }   
  373.       if(err == SCI_ERR_TRANSFER_FAILED) { 
  374. //(m_TargetSegm[m_StandbyAdapterId].writer)->heavyLock(); 
  375. if(getLinkStatus(m_ActiveAdapterId)) { 
  376.   goto tryagain; 
  377. }
  378.         if (m_adapters == 1) {
  379.           DBUG_PRINT("error", ("SCI Transfer failed"));
  380.           report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
  381.   return false; 
  382.         }
  383. m_failCounter++; 
  384. Uint32 temp=m_ActiveAdapterId;           
  385. switch(m_swapCounter) { 
  386. case 0:  
  387.   /**swap from active (0) to standby (1)*/ 
  388.   if(getLinkStatus(m_StandbyAdapterId)) { 
  389.             DBUG_PRINT("error", ("Swapping from adapter 0 to 1")); 
  390.     failoverShmWriter();  
  391.     SCIStoreBarrier(m_TargetSegm[m_StandbyAdapterId].sequence,0); 
  392.     m_ActiveAdapterId=m_StandbyAdapterId; 
  393.     m_StandbyAdapterId=temp; 
  394.     SCIRemoveSequence((m_TargetSegm[m_StandbyAdapterId].sequence),
  395.       FLAGS,  
  396.       &err); 
  397.     if(err!=SCI_ERR_OK) { 
  398.       report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE); 
  399.               DBUG_PRINT("error", ("Unable to remove sequence"));
  400.       return false; 
  401.     } 
  402.     if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { 
  403.               DBUG_PRINT("error", ("Start sequence failed")); 
  404.       report_error(TE_SCI_UNABLE_TO_START_SEQUENCE); 
  405.       return false; 
  406.     } 
  407.     m_swapCounter++; 
  408.             DBUG_PRINT("info", ("failover complete")); 
  409.     goto tryagain; 
  410.   }  else {
  411.     report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
  412.             DBUG_PRINT("error", ("SCI Transfer failed")); 
  413.     return false;
  414.   }
  415.   return false; 
  416.   break; 
  417. case 1: 
  418.   /** swap back from 1 to 0 
  419.       must check that the link is up */ 
  420.   
  421.   if(getLinkStatus(m_StandbyAdapterId)) { 
  422.     failoverShmWriter(); 
  423.     m_ActiveAdapterId=m_StandbyAdapterId; 
  424.     m_StandbyAdapterId=temp; 
  425.             DBUG_PRINT("info", ("Swapping from 1 to 0"));  
  426.     if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { 
  427.               DBUG_PRINT("error", ("Unable to create sequence"));
  428.       report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); 
  429.       return false; 
  430.     } 
  431.     if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { 
  432.               DBUG_PRINT("error", ("startSequence failed... disconnecting")); 
  433.       report_error(TE_SCI_UNABLE_TO_START_SEQUENCE); 
  434.       return false; 
  435.     } 
  436.     
  437.     SCIRemoveSequence((m_TargetSegm[m_StandbyAdapterId].sequence) 
  438.       , FLAGS,  
  439.       &err); 
  440.     if(err!=SCI_ERR_OK) { 
  441.               DBUG_PRINT("error", ("Unable to remove sequence"));
  442.       report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE); 
  443.       return false;
  444.     } 
  445.     
  446.     if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) { 
  447.               DBUG_PRINT("error", ("Unable to create sequence on standby"));
  448.       report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); 
  449.       return false; 
  450.     } 
  451.     
  452.     m_swapCounter=0; 
  453.     
  454.             DBUG_PRINT("info", ("failover complete..")); 
  455.     goto tryagain; 
  456.     
  457.   } else {
  458.             DBUG_PRINT("error", ("Unrecoverable data transfer error")); 
  459.     report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
  460.     return false;
  461.   }
  462.   
  463.   break; 
  464. default: 
  465.           DBUG_PRINT("error", ("Unrecoverable data transfer error")); 
  466.   report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR); 
  467.   return false; 
  468.   break; 
  469. }  
  470.       }
  471.       } else { 
  472. SHM_Writer * writer = (m_TargetSegm[m_ActiveAdapterId].writer);
  473. writer->updateWritePtr(sizeToSend); 
  474. Uint32 sendLimit = writer->getBufferSize();
  475. sendLimit -= writer->getWriteIndex();
  476. m_sendBuffer.m_dataSize = 0;
  477. m_sendBuffer.m_forceSendLimit = sendLimit;
  478.       } 
  479.       
  480.     } else { 
  481.       /** 
  482.        * If we end up here, the SCI segment is full.  
  483.        */ 
  484.       DBUG_PRINT("error", ("the segment is full for some reason")); 
  485.       return false; 
  486.     } //if  
  487.   } 
  488.   return true; 
  489. } // doSend() 
  490.  
  491.  
  492. void SCI_Transporter::failoverShmWriter() { 
  493. #if 0
  494.   (m_TargetSegm[m_StandbyAdapterId].writer)
  495.     ->copyIndexes((m_TargetSegm[m_StandbyAdapterId].writer));
  496. #endif
  497. } //failoverShm 
  498.  
  499.  
  500. void SCI_Transporter::setupLocalSegment()   
  501.    DBUG_ENTER("SCI_Transporter::setupLocalSegment"); 
  502.    Uint32 sharedSize = 0; 
  503.    sharedSize =4096;   //start of the buffer is page aligend 
  504.     
  505.    Uint32 sizeOfBuffer = m_BufferSize; 
  506.  
  507.    sizeOfBuffer -= sharedSize; 
  508.  
  509.    Uint32 * localReadIndex =  
  510.      (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;  
  511.    Uint32 * localWriteIndex =  (Uint32*)(localReadIndex+ 1); 
  512.    m_localStatusFlag = (Uint32*)(localReadIndex + 3); 
  513.  
  514.    char * localStartOfBuf = (char*)  
  515.      ((char*)m_SourceSegm[m_ActiveAdapterId].mappedMemory+sharedSize); 
  516.  
  517.    * localReadIndex = 0; 
  518.    * localWriteIndex = 0; 
  519.    const Uint32 slack = MAX_MESSAGE_SIZE;
  520.    reader = new SHM_Reader(localStartOfBuf,  
  521.    sizeOfBuffer, 
  522.    slack,
  523.    localReadIndex, 
  524.    localWriteIndex);
  525.     
  526.    reader->clear(); 
  527.    DBUG_VOID_RETURN;
  528. } //setupLocalSegment 
  529.  
  530.  
  531.  
  532. void SCI_Transporter::setupRemoteSegment()   
  533.    DBUG_ENTER("SCI_Transporter::setupRemoteSegment");
  534.    Uint32 sharedSize = 0; 
  535.    sharedSize =4096;   //start of the buffer is page aligned 
  536.  
  537.  
  538.    Uint32 sizeOfBuffer = m_BufferSize; 
  539.    const Uint32 slack = MAX_MESSAGE_SIZE;
  540.    sizeOfBuffer -= sharedSize; 
  541.    Uint32 *segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ;   
  542.     
  543.    Uint32 * remoteReadIndex = (Uint32*)segPtr;  
  544.    Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1); 
  545.    m_remoteStatusFlag = (Uint32*)(segPtr + 3);
  546.     
  547.    char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize)); 
  548.     
  549.    writer = new SHM_Writer(remoteStartOfBuf,  
  550.    sizeOfBuffer, 
  551.    slack,
  552.    remoteReadIndex, 
  553.    remoteWriteIndex);
  554.    
  555.    writer->clear(); 
  556.     
  557.    m_TargetSegm[0].writer=writer; 
  558.  
  559.    m_sendBuffer.m_forceSendLimit = writer->getBufferSize();
  560.     
  561.    if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) { 
  562.      report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); 
  563.      DBUG_PRINT("error", ("Unable to create sequence on active"));
  564.      doDisconnect(); 
  565.    } 
  566.    if (m_adapters > 1) {
  567.      segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ; 
  568.     
  569.      Uint32 * remoteReadIndex2 = (Uint32*)segPtr;  
  570.      Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1); 
  571.      m_remoteStatusFlag2 = (Uint32*)(segPtr + 3);
  572.     
  573.      char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize); 
  574.     
  575.      /** 
  576.       * setup a writer. writer2 is used to mirror the changes of 
  577.       * writer on the standby 
  578.       * segment, so that in the case of a failover, we can switch 
  579.       * to the stdby seg. quickly.* 
  580.       */ 
  581.      writer2 = new SHM_Writer(remoteStartOfBuf2,  
  582.                               sizeOfBuffer, 
  583.                               slack,
  584.                               remoteReadIndex2, 
  585.                               remoteWriteIndex2);
  586.      * remoteReadIndex = 0; 
  587.      * remoteWriteIndex = 0; 
  588.      writer2->clear(); 
  589.      m_TargetSegm[1].writer=writer2; 
  590.      if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) { 
  591.        report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE); 
  592.        DBUG_PRINT("error", ("Unable to create sequence on standby"));
  593.        doDisconnect(); 
  594.      } 
  595.    }
  596.    DBUG_VOID_RETURN; 
  597. } //setupRemoteSegment 
  598. bool
  599. SCI_Transporter::init_local()
  600. {
  601.   DBUG_ENTER("SCI_Transporter::init_local");
  602.   if(!m_initLocal) { 
  603.     if(initLocalSegment()!=SCI_ERR_OK){ 
  604.       NdbSleep_MilliSleep(10);
  605.       //NDB SHOULD TERMINATE AND COMPUTER REBOOTED! 
  606.       report_error(TE_SCI_CANNOT_INIT_LOCALSEGMENT);
  607.       DBUG_RETURN(false);
  608.     } 
  609.     m_initLocal=true;
  610.   } 
  611.   DBUG_RETURN(true);
  612. }
  613. bool
  614. SCI_Transporter::init_remote()
  615. {
  616.   DBUG_ENTER("SCI_Transporter::init_remote");
  617.   sci_error_t err; 
  618.   Uint32 offset = 0;
  619.   if(!m_mapped ) {
  620.     DBUG_PRINT("info", ("Map remote segments"));
  621.     for(Uint32 i=0; i < m_adapters ; i++) {
  622.       m_TargetSegm[i].rhm[i].remoteHandle=0;
  623.       SCIConnectSegment(sciAdapters[i].scidesc,
  624.                         &(m_TargetSegm[i].rhm[i].remoteHandle),
  625.                         m_remoteNodes[i],
  626.                         remoteSegmentId(localNodeId, remoteNodeId),
  627.                         i,
  628.                         0,
  629.                         0,
  630.                         0,
  631.                         0,
  632.                         &err);
  633.       if(err != SCI_ERR_OK) {
  634.         NdbSleep_MilliSleep(10);
  635.         DBUG_PRINT("error", ("Error connecting segment, err 0x%x", err));
  636.         DBUG_RETURN(false);
  637.       }
  638.     }
  639.     // Map the remote memory segment into program space  
  640.     for(Uint32 i=0; i < m_adapters ; i++) {
  641.       m_TargetSegm[i].mappedMemory =
  642.         SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle),
  643.                             &(m_TargetSegm[i].rhm[i].map),
  644.                             offset,
  645.                             m_BufferSize,
  646.                             NULL,
  647.                             FLAGS,
  648.                             &err);
  649.         if(err!= SCI_ERR_OK) {
  650.           DBUG_PRINT("error", ("Cannot map a segment to the remote node %d. Error code 0x%x",m_RemoteSciNodeId, err));
  651.           //NDB SHOULD TERMINATE AND COMPUTER REBOOTED! 
  652.           report_error(TE_SCI_CANNOT_MAP_REMOTESEGMENT);
  653.           DBUG_RETURN(false);
  654.         }
  655.     }
  656.     m_mapped=true;
  657.     setupRemoteSegment();
  658.     setConnected();
  659.     DBUG_PRINT("info", ("connected and mapped to segment, remoteNode: %d",
  660.                remoteNodeId));
  661.     DBUG_PRINT("info", ("remoteSegId: %d",
  662.                remoteSegmentId(localNodeId, remoteNodeId)));
  663.     DBUG_RETURN(true);
  664.   } else {
  665.     DBUG_RETURN(getConnectionStatus());
  666.   }
  667. }
  668. bool
  669. SCI_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
  670. {
  671.   SocketInputStream s_input(sockfd);
  672.   SocketOutputStream s_output(sockfd);
  673.   char buf[256];
  674.   DBUG_ENTER("SCI_Transporter::connect_client_impl");
  675.   // Wait for server to create and attach
  676.   if (s_input.gets(buf, 256) == 0) {
  677.     DBUG_PRINT("error", ("No initial response from server in SCI"));
  678.     NDB_CLOSE_SOCKET(sockfd);
  679.     DBUG_RETURN(false);
  680.   }
  681.   if (!init_local()) {
  682.     NDB_CLOSE_SOCKET(sockfd);
  683.     DBUG_RETURN(false);
  684.   }
  685.   // Send ok to server
  686.   s_output.println("sci client 1 ok");
  687.   if (!init_remote()) {
  688.     NDB_CLOSE_SOCKET(sockfd);
  689.     DBUG_RETURN(false);
  690.   }
  691.   // Wait for ok from server
  692.   if (s_input.gets(buf, 256) == 0) {
  693.     DBUG_PRINT("error", ("No second response from server in SCI"));
  694.     NDB_CLOSE_SOCKET(sockfd);
  695.     DBUG_RETURN(false);
  696.   }
  697.   // Send ok to server
  698.   s_output.println("sci client 2 ok");
  699.   NDB_CLOSE_SOCKET(sockfd);
  700.   DBUG_PRINT("info", ("Successfully connected client to node %d",
  701.               remoteNodeId));
  702.   DBUG_RETURN(true);
  703. }
  704. bool
  705. SCI_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
  706. {
  707.   SocketOutputStream s_output(sockfd);
  708.   SocketInputStream s_input(sockfd);
  709.   char buf[256];
  710.   DBUG_ENTER("SCI_Transporter::connect_server_impl");
  711.   if (!init_local()) {
  712.     NDB_CLOSE_SOCKET(sockfd);
  713.     DBUG_RETURN(false);
  714.   }
  715.   // Send ok to client
  716.   s_output.println("sci server 1 ok");
  717.   // Wait for ok from client
  718.   if (s_input.gets(buf, 256) == 0) {
  719.     DBUG_PRINT("error", ("No response from client in SCI"));
  720.     NDB_CLOSE_SOCKET(sockfd);
  721.     DBUG_RETURN(false);
  722.   }
  723.   if (!init_remote()) {
  724.     NDB_CLOSE_SOCKET(sockfd);
  725.     DBUG_RETURN(false);
  726.   }
  727.   // Send ok to client
  728.   s_output.println("sci server 2 ok");
  729.   // Wait for ok from client
  730.   if (s_input.gets(buf, 256) == 0) {
  731.     DBUG_PRINT("error", ("No second response from client in SCI"));
  732.     NDB_CLOSE_SOCKET(sockfd);
  733.     DBUG_RETURN(false);
  734.   }
  735.   NDB_CLOSE_SOCKET(sockfd);
  736.   DBUG_PRINT("info", ("Successfully connected server to node %d",
  737.               remoteNodeId));
  738.   DBUG_RETURN(true);
  739. }
  740.  
  741. sci_error_t SCI_Transporter::createSequence(Uint32 adapterid) { 
  742.   sci_error_t err; 
  743.   SCICreateMapSequence((m_TargetSegm[adapterid].rhm[adapterid].map),  
  744.        &(m_TargetSegm[adapterid].sequence),  
  745.        SCI_FLAG_FAST_BARRIER,  
  746.        &err);  
  747.   
  748.   
  749.   return err; 
  750. } // createSequence()  
  751.  
  752.  
  753. sci_error_t SCI_Transporter::startSequence(Uint32 adapterid) { 
  754.   
  755.   sci_error_t err; 
  756.   /** Perform preliminary error check on an SCI adapter before starting a 
  757.    * sequence of read and write operations on the mapped segment. 
  758.    */ 
  759.   m_SequenceStatus = SCIStartSequence( 
  760.        (m_TargetSegm[adapterid].sequence),  
  761.        FLAGS, &err); 
  762.    
  763.    
  764.   // If there still is an error then data cannot be safely send 
  765.   return err; 
  766. } // startSequence() 
  767.  
  768.    
  769.  
  770. bool SCI_Transporter::disconnectLocal()  
  771. {
  772.   DBUG_ENTER("SCI_Transporter::disconnectLocal"); 
  773.   sci_error_t err; 
  774.   m_ActiveAdapterId=0; 
  775.  
  776.   /** Free resources used by a local segment 
  777.    */ 
  778.  
  779.   SCIUnmapSegment(m_SourceSegm[0].lhm[0].map,0,&err); 
  780.   if(err!=SCI_ERR_OK) { 
  781.     report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT); 
  782.     DBUG_PRINT("error", ("Unable to unmap segment"));
  783.     DBUG_RETURN(false); 
  784.   } 
  785.  
  786.   SCIRemoveSegment((m_SourceSegm[m_ActiveAdapterId].localHandle), 
  787.    FLAGS, 
  788.    &err); 
  789.   
  790.   if(err!=SCI_ERR_OK) { 
  791.     report_error(TE_SCI_UNABLE_TO_REMOVE_SEGMENT); 
  792.     DBUG_PRINT("error", ("Unable to remove segment"));
  793.     DBUG_RETURN(false); 
  794.   } 
  795.   DBUG_PRINT("info", ("Local memory segment is unmapped and removed")); 
  796.   DBUG_RETURN(true); 
  797. } // disconnectLocal() 
  798.  
  799.  
  800. bool SCI_Transporter::disconnectRemote()  { 
  801.   DBUG_ENTER("SCI_Transporter::disconnectRemote");
  802.   sci_error_t err; 
  803.   for(Uint32 i=0; i<m_adapters; i++) { 
  804.     /** 
  805.      * Segment unmapped, disconnect from the remotely connected segment 
  806.      */   
  807.     SCIUnmapSegment(m_TargetSegm[i].rhm[i].map,0,&err); 
  808.     if(err!=SCI_ERR_OK) { 
  809.       report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT); 
  810.       DBUG_PRINT("error", ("Unable to unmap segment"));
  811.       DBUG_RETURN(false); 
  812.     } 
  813.  
  814.     SCIDisconnectSegment(m_TargetSegm[i].rhm[i].remoteHandle, 
  815.  FLAGS, 
  816.  &err); 
  817.     if(err!=SCI_ERR_OK) { 
  818.       report_error(TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT); 
  819.       DBUG_PRINT("error", ("Unable to disconnect segment"));
  820.       DBUG_RETURN(false); 
  821.     } 
  822.     DBUG_PRINT("info", ("Remote memory segment is unmapped and disconnected")); 
  823.   } 
  824.   DBUG_RETURN(true); 
  825. } // disconnectRemote() 
  826. SCI_Transporter::~SCI_Transporter() { 
  827.   DBUG_ENTER("SCI_Transporter::~SCI_Transporter");
  828.   // Close channel to the driver 
  829.   doDisconnect(); 
  830.   if(m_sendBuffer.m_buffer != NULL)
  831.     delete[] m_sendBuffer.m_buffer;
  832.   DBUG_VOID_RETURN;
  833. } // ~SCI_Transporter() 
  834.  
  835.  
  836.  
  837.  
  838. void SCI_Transporter::closeSCI() { 
  839.   // Termination of SCI 
  840.   sci_error_t err; 
  841.   DBUG_ENTER("SCI_Transporter::closeSCI");
  842.    
  843.   // Disconnect and remove remote segment 
  844.   disconnectRemote(); 
  845.  
  846.   // Unmap and remove local segment 
  847.    
  848.   disconnectLocal(); 
  849.    
  850.   // Closes an SCI virtual device 
  851.   SCIClose(activeSCIDescriptor, FLAGS, &err);  
  852.    
  853.   if(err != SCI_ERR_OK) {
  854.     DBUG_PRINT("error", ("Cannot close SCI channel to the driver. Error code 0x%x",  
  855.         err)); 
  856.   }
  857.   SCITerminate(); 
  858.   DBUG_VOID_RETURN;
  859. } // closeSCI() 
  860.  
  861. Uint32 *
  862. SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio)
  863. {
  864.   Uint32 sci_buffer_remaining = m_sendBuffer.m_forceSendLimit;
  865.   Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize;
  866.   Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2;
  867.   Uint32 new_curr_data_size = curr_data_size + lenBytes;
  868.   if ((curr_data_size >= send_buf_size) ||
  869.       (curr_data_size >= sci_buffer_remaining)) {
  870.     /**
  871.      * The new message will not fit in the send buffer. We need to
  872.      * send the send buffer before filling it up with the new
  873.      * signal data. If current data size will spill over buffer edge
  874.      * we will also send to ensure correct operation.
  875.      */  
  876.     if (!doSend()) { 
  877.       /**
  878.        * We were not successfull sending, report 0 as meaning buffer full and
  879.        * upper levels handle retries and other recovery matters.
  880.        */
  881.       return 0;
  882.     }
  883.   }
  884.   /**
  885.    * New signal fits, simply fill it up with more data.
  886.    */
  887.   Uint32 sz = m_sendBuffer.m_dataSize;
  888.   return &m_sendBuffer.m_buffer[sz];
  889. }
  890. void
  891. SCI_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
  892.   
  893.   Uint32 sz = m_sendBuffer.m_dataSize;
  894.   Uint32 packet_size = m_PacketSize;
  895.   sz += ((lenBytes + 3) >> 2);
  896.   m_sendBuffer.m_dataSize = sz;
  897.   
  898.   if(sz > packet_size) { 
  899.     /**------------------------------------------------- 
  900.      * Buffer is full and we are ready to send. We will 
  901.      * not wait since the signal is already in the buffer. 
  902.      * Force flag set has the same indication that we 
  903.      * should always send. If it is not possible to send 
  904.      * we will not worry since we will soon be back for 
  905.      * a renewed trial. 
  906.      *------------------------------------------------- 
  907.      */ 
  908.     doSend();
  909.   }
  910. }
  911. enum SciStatus {
  912.   SCIDISCONNECT = 1,
  913.   SCICONNECTED  = 2
  914. };
  915.  
  916. bool 
  917. SCI_Transporter::getConnectionStatus() { 
  918.   if(*m_localStatusFlag == SCICONNECTED &&  
  919.      (*m_remoteStatusFlag == SCICONNECTED || 
  920.      ((m_adapters > 1) &&
  921.       *m_remoteStatusFlag2 == SCICONNECTED))) 
  922.     return true; 
  923.   else 
  924.     return false; 
  925.  
  926.  
  927. void  
  928. SCI_Transporter::setConnected() { 
  929.   *m_remoteStatusFlag = SCICONNECTED; 
  930.   if (m_adapters > 1) {
  931.     *m_remoteStatusFlag2 = SCICONNECTED; 
  932.   }
  933.   *m_localStatusFlag = SCICONNECTED; 
  934.  
  935.  
  936. void  
  937. SCI_Transporter::setDisconnect() { 
  938.   if(getLinkStatus(m_ActiveAdapterId)) 
  939.     *m_remoteStatusFlag = SCIDISCONNECT; 
  940.   if (m_adapters > 1) {
  941.     if(getLinkStatus(m_StandbyAdapterId)) 
  942.       *m_remoteStatusFlag2 = SCIDISCONNECT; 
  943.   }
  944.  
  945.  
  946. bool 
  947. SCI_Transporter::checkConnected() { 
  948.   if (*m_localStatusFlag == SCIDISCONNECT) { 
  949.     return false; 
  950.   } 
  951.   else 
  952.     return true; 
  953.  
  954. static bool init = false; 
  955.  
  956. bool  
  957. SCI_Transporter::initSCI() { 
  958.   DBUG_ENTER("SCI_Transporter::initSCI");
  959.   if(!init){ 
  960.     sci_error_t error; 
  961.     // Initialize SISCI library 
  962.     SCIInitialize(0, &error); 
  963.     if(error != SCI_ERR_OK)  { 
  964.       DBUG_PRINT("error", ("Cannot initialize SISCI library."));
  965.       DBUG_PRINT("error", ("Inconsistency between SISCI library and SISCI driver. Error code 0x%x",
  966.                  error)); 
  967.       DBUG_RETURN(false);
  968.     } 
  969.     init = true; 
  970.   } 
  971.   DBUG_RETURN(true);
  972.  
  973. Uint32
  974. SCI_Transporter::get_free_buffer() const
  975. {
  976.   return (m_TargetSegm[m_ActiveAdapterId].writer)->get_free_buffer();
  977. }