TransporterCallback.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:12k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <TransporterCallback.hpp>
  15. #include <TransporterRegistry.hpp>
  16. #include <FastScheduler.hpp>
  17. #include <Emulator.hpp>
  18. #include <ErrorHandlingMacros.hpp>
  19. #include "LongSignal.hpp"
  20. #include <signaldata/EventReport.hpp>
  21. #include <signaldata/TestOrd.hpp>
  22. #include <signaldata/SignalDroppedRep.hpp>
  23. #include <signaldata/DisconnectRep.hpp>
  24. #include "VMSignal.hpp"
  25. #include <NdbOut.hpp>
  26. #include "DataBuffer.hpp"
  27. /**
  28.  * The instance
  29.  */
  30. SectionSegmentPool g_sectionSegmentPool;
  31. bool
  32. import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len){
  33.   /**
  34.    * Dummy data used when setting prev.m_nextSegment for first segment of a
  35.    *   section
  36.    */
  37.   Uint32 dummyPrev[4]; 
  38.   first.p = 0;
  39.   if(g_sectionSegmentPool.seize(first)){
  40.     ;
  41.   } else {
  42.     return false;
  43.   }
  44.   first.p->m_sz = len;
  45.   first.p->m_ownerRef = 0;
  46.   
  47.   Ptr<SectionSegment> prevPtr = { (SectionSegment *)&dummyPrev[0], 0 };
  48.   Ptr<SectionSegment> currPtr = first;
  49.   
  50.   while(len > SectionSegment::DataLength){
  51.     prevPtr.p->m_nextSegment = currPtr.i;
  52.     memcpy(&currPtr.p->theData[0], src, 4 * SectionSegment::DataLength);
  53.     src += SectionSegment::DataLength;
  54.     len -= SectionSegment::DataLength;
  55.     prevPtr = currPtr;
  56.     if(g_sectionSegmentPool.seize(currPtr)){
  57.       ;
  58.     } else {
  59.       first.p->m_lastSegment = prevPtr.i;
  60.       return false;
  61.     }
  62.   }
  63.   first.p->m_lastSegment = currPtr.i;
  64.   currPtr.p->m_nextSegment = RNIL;
  65.   memcpy(&currPtr.p->theData[0], src, 4 * len);
  66.   return true;
  67. }
  68. void
  69. linkSegments(Uint32 head, Uint32 tail){
  70.   
  71.   Ptr<SectionSegment> headPtr;
  72.   g_sectionSegmentPool.getPtr(headPtr, head);
  73.   
  74.   Ptr<SectionSegment> tailPtr;
  75.   g_sectionSegmentPool.getPtr(tailPtr, tail);
  76.   
  77.   Ptr<SectionSegment> oldTailPtr;
  78.   g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
  79.   
  80.   headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
  81.   headPtr.p->m_sz += tailPtr.p->m_sz;
  82.   
  83.   oldTailPtr.p->m_nextSegment = tailPtr.i;
  84. }
  85. void 
  86. copy(Uint32 * & insertPtr, 
  87.      class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
  88.   Uint32 len = _ptr.sz;
  89.   SectionSegment * ptrP = _ptr.p;
  90.   
  91.   while(len > 60){
  92.     memcpy(insertPtr, &ptrP->theData[0], 4 * 60);
  93.     len -= 60;
  94.     insertPtr += 60;
  95.     ptrP = thePool.getPtr(ptrP->m_nextSegment);
  96.   }
  97.   memcpy(insertPtr, &ptrP->theData[0], 4 * len);
  98.   insertPtr += len;
  99. }
  100. void
  101. copy(Uint32 * dst, SegmentedSectionPtr src){
  102.   copy(dst, g_sectionSegmentPool, src);
  103. }
  104. void
  105. getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
  106.   Uint32 tSec0 = ptr[0].i;
  107.   Uint32 tSec1 = ptr[1].i;
  108.   Uint32 tSec2 = ptr[2].i;
  109.   SectionSegment * p;
  110.   switch(secCount){
  111.   case 3:
  112.     p = g_sectionSegmentPool.getPtr(tSec2);
  113.     ptr[2].p = p;
  114.     ptr[2].sz = p->m_sz;
  115.   case 2:
  116.     p = g_sectionSegmentPool.getPtr(tSec1);
  117.     ptr[1].p = p;
  118.     ptr[1].sz = p->m_sz;
  119.   case 1:
  120.     p = g_sectionSegmentPool.getPtr(tSec0);
  121.     ptr[0].p = p;
  122.     ptr[0].sz = p->m_sz;
  123.   case 0:
  124.     return;
  125.   }
  126.   char msg[40];
  127.   sprintf(msg, "secCount=%d", secCount);
  128.   ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
  129. }
  130. void
  131. getSection(SegmentedSectionPtr & ptr, Uint32 i){
  132.   ptr.i = i;
  133.   SectionSegment * p = g_sectionSegmentPool.getPtr(i);
  134.   ptr.p = p;
  135.   ptr.sz = p->m_sz;
  136. }
  137. #define relSz(x) ((x + SectionSegment::DataLength - 1) / SectionSegment::DataLength)
  138. void
  139. release(SegmentedSectionPtr & ptr){
  140.   g_sectionSegmentPool.releaseList(relSz(ptr.sz),
  141.    ptr.i, 
  142.    ptr.p->m_lastSegment);
  143. }
  144. void
  145. releaseSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
  146.   Uint32 tSec0 = ptr[0].i;
  147.   Uint32 tSz0 = ptr[0].sz;
  148.   Uint32 tSec1 = ptr[1].i;
  149.   Uint32 tSz1 = ptr[1].sz;
  150.   Uint32 tSec2 = ptr[2].i;
  151.   Uint32 tSz2 = ptr[2].sz;
  152.   switch(secCount){
  153.   case 3:
  154.     g_sectionSegmentPool.releaseList(relSz(tSz2), tSec2, 
  155.      ptr[2].p->m_lastSegment);
  156.   case 2:
  157.     g_sectionSegmentPool.releaseList(relSz(tSz1), tSec1, 
  158.      ptr[1].p->m_lastSegment);
  159.   case 1:
  160.     g_sectionSegmentPool.releaseList(relSz(tSz0), tSec0, 
  161.      ptr[0].p->m_lastSegment);
  162.   case 0:
  163.     return;
  164.   }
  165.   char msg[40];
  166.   sprintf(msg, "secCount=%d", secCount);
  167.   ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
  168. }
  169. #include <DebuggerNames.hpp>
  170. void
  171. execute(void * callbackObj, 
  172. SignalHeader * const header, 
  173. Uint8 prio, 
  174. Uint32 * const theData,
  175. LinearSectionPtr ptr[3]){
  176.   const Uint32 secCount = header->m_noOfSections;
  177.   const Uint32 length = header->theLength;
  178. #ifdef TRACE_DISTRIBUTED
  179.   ndbout_c("recv: %s(%d) from (%s, %d)",
  180.    getSignalName(header->theVerId_signalNumber), 
  181.    header->theVerId_signalNumber,
  182.    getBlockName(refToBlock(header->theSendersBlockRef)),
  183.    refToNode(header->theSendersBlockRef));
  184. #endif
  185.   
  186.   bool ok = true;
  187.   Ptr<SectionSegment> secPtr[3];
  188.   switch(secCount){
  189.   case 3:
  190.     ok &= import(secPtr[2], ptr[2].p, ptr[2].sz);
  191.   case 2:
  192.     ok &= import(secPtr[1], ptr[1].p, ptr[1].sz);
  193.   case 1:
  194.     ok &= import(secPtr[0], ptr[0].p, ptr[0].sz);
  195.   }
  196.   /**
  197.    * Check that we haven't received a too long signal
  198.    */
  199.   ok &= (length + secCount <= 25);
  200.   
  201.   Uint32 secPtrI[3];
  202.   if(ok){
  203.     /**
  204.      * Normal path 
  205.      */
  206.     secPtrI[0] = secPtr[0].i;
  207.     secPtrI[1] = secPtr[1].i;
  208.     secPtrI[2] = secPtr[2].i;
  209.     globalScheduler.execute(header, prio, theData, secPtrI);  
  210.     return;
  211.   }
  212.   
  213.   /**
  214.    * Out of memory
  215.    */
  216.   for(Uint32 i = 0; i<secCount; i++){
  217.     if(secPtr[i].p != 0){
  218.       g_sectionSegmentPool.releaseList(relSz(ptr[i].sz), secPtr[i].i, 
  219.        secPtr[i].p->m_lastSegment);
  220.     }
  221.   }
  222.   Uint32 gsn = header->theVerId_signalNumber;
  223.   Uint32 len = header->theLength;
  224.   Uint32 newLen= (len > 22 ? 22 : len);
  225.   SignalDroppedRep * rep = (SignalDroppedRep*)theData;
  226.   memmove(rep->originalData, theData, (4 * newLen));
  227.   rep->originalGsn = gsn;
  228.   rep->originalLength = len;
  229.   rep->originalSectionCount = secCount;
  230.   header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
  231.   header->theLength = newLen + 3;
  232.   header->m_noOfSections = 0;
  233.   globalScheduler.execute(header, prio, theData, secPtrI);    
  234. }
  235. NdbOut & 
  236. operator<<(NdbOut& out, const SectionSegment & ss){
  237.   out << "[ last= " << ss.m_lastSegment << " next= " << ss.nextPool << " ]";
  238.   return out;
  239. }
  240. void
  241. print(SectionSegment * s, Uint32 len, FILE* out){
  242.   for(Uint32 i = 0; i<len; i++){
  243.     fprintf(out, "H'0x%.8x ", s->theData[i]);
  244.     if(((i + 1) % 6) == 0)
  245.       fprintf(out, "n");
  246.   }
  247. }
  248. void
  249. print(SegmentedSectionPtr ptr, FILE* out){
  250.   ptr.p = g_sectionSegmentPool.getPtr(ptr.i);
  251.   Uint32 len = ptr.p->m_sz;
  252.   
  253.   fprintf(out, "ptr.i = %d(%p) ptr.sz = %d(%d)n", ptr.i, ptr.p, len, ptr.sz);
  254.   while(len > SectionSegment::DataLength){
  255.     print(ptr.p, SectionSegment::DataLength, out);
  256.     
  257.     len -= SectionSegment::DataLength;
  258.     fprintf(out, "ptr.i = %dn", ptr.p->m_nextSegment);
  259.     ptr.p = g_sectionSegmentPool.getPtr(ptr.p->m_nextSegment);
  260.   }
  261.   
  262.   print(ptr.p, len, out);
  263.   fprintf(out, "n");
  264. }
  265. int
  266. checkJobBuffer() {
  267.   /** 
  268.    * Check to see if jobbbuffers are starting to get full
  269.    * and if so call doJob
  270.    */
  271.   return globalScheduler.checkDoJob();
  272. }
  273. void
  274. reportError(void * callbackObj, NodeId nodeId, TransporterError errorCode){
  275. #ifdef DEBUG_TRANSPORTER
  276.   char buf[255];
  277.   sprintf(buf, "reportError (%d, 0x%x)", nodeId, errorCode);
  278.   ndbout << buf << endl;
  279. #endif
  280.   if(errorCode == TE_SIGNAL_LOST_SEND_BUFFER_FULL){
  281.     ErrorReporter::handleError(ecError,
  282.        ERR_PROGRAMERROR,
  283.        "Signal lost, send buffer full",
  284.        __FILE__,
  285.        NST_ErrorHandler);
  286.   }
  287.   if(errorCode == TE_SIGNAL_LOST){
  288.     ErrorReporter::handleError(ecError,
  289.        ERR_PROGRAMERROR,
  290.        "Signal lost (unknown reason)",
  291.        __FILE__,
  292.        NST_ErrorHandler);
  293.   }
  294.   
  295.   if(errorCode & 0x8000){
  296.     reportDisconnect(callbackObj, nodeId, errorCode);
  297.   }
  298.   
  299.   Signal signal;
  300.   memset(&signal.header, 0, sizeof(signal.header));
  301.   if(errorCode & 0x8000)
  302.     signal.theData[0] = EventReport::TransporterError;
  303.   else
  304.     signal.theData[0] = EventReport::TransporterWarning;
  305.   
  306.   signal.theData[1] = nodeId;
  307.   signal.theData[2] = errorCode;
  308.   
  309.   signal.header.theLength = 3;  
  310.   signal.header.theSendersSignalId = 0;
  311.   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
  312.   globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
  313. }
  314. /**
  315.  * Report average send length in bytes (4096 last sends)
  316.  */
  317. void
  318. reportSendLen(void * callbackObj, 
  319.       NodeId nodeId, Uint32 count, Uint64 bytes){
  320.   Signal signal;
  321.   memset(&signal.header, 0, sizeof(signal.header));
  322.   signal.header.theLength = 3;
  323.   signal.header.theSendersSignalId = 0;
  324.   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
  325.   signal.theData[0] = EventReport::SendBytesStatistic;
  326.   signal.theData[1] = nodeId;
  327.   signal.theData[2] = (bytes/count);
  328.   globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
  329. }
  330. /**
  331.  * Report average receive length in bytes (4096 last receives)
  332.  */
  333. void
  334. reportReceiveLen(void * callbackObj, 
  335.  NodeId nodeId, Uint32 count, Uint64 bytes){
  336.   Signal signal;
  337.   memset(&signal.header, 0, sizeof(signal.header));
  338.   signal.header.theLength = 3;  
  339.   signal.header.theSendersSignalId = 0;
  340.   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
  341.   signal.theData[0] = EventReport::ReceiveBytesStatistic;
  342.   signal.theData[1] = nodeId;
  343.   signal.theData[2] = (bytes/count);
  344.   globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
  345. }
  346. /**
  347.  * Report connection established
  348.  */
  349. void
  350. reportConnect(void * callbackObj, NodeId nodeId){
  351.   Signal signal;
  352.   memset(&signal.header, 0, sizeof(signal.header));
  353.   signal.header.theLength = 1; 
  354.   signal.header.theSendersSignalId = 0;
  355.   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
  356.   signal.theData[0] = nodeId;
  357.   
  358.   globalScheduler.execute(&signal, JBA, CMVMI, GSN_CONNECT_REP);
  359. }
  360. /**
  361.  * Report connection broken
  362.  */
  363. void
  364. reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 errNo){
  365.   Signal signal;
  366.   memset(&signal.header, 0, sizeof(signal.header));
  367.   signal.header.theLength = DisconnectRep::SignalLength; 
  368.   signal.header.theSendersSignalId = 0;
  369.   signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
  370.   signal.header.theTrace = TestOrd::TraceDisconnect;
  371.   DisconnectRep * const  rep = (DisconnectRep *)&signal.theData[0];
  372.   rep->nodeId = nodeId;
  373.   rep->err = errNo;
  374.   globalScheduler.execute(&signal, JBA, CMVMI, GSN_DISCONNECT_REP);
  375. }
  376. void
  377. SignalLoggerManager::printSegmentedSection(FILE * output,
  378.                                            const SignalHeader & sh,
  379.                                            const SegmentedSectionPtr ptr[3],
  380.                                            unsigned i)
  381. {
  382.   fprintf(output, "SECTION %u type=segmented", i);
  383.   if (i >= 3) {
  384.     fprintf(output, " *** invalid ***n");
  385.     return;
  386.   }
  387.   const Uint32 len = ptr[i].sz;
  388.   SectionSegment * ssp = ptr[i].p;
  389.   Uint32 pos = 0;
  390.   fprintf(output, " size=%un", (unsigned)len);
  391.   while (pos < len) {
  392.     if (pos > 0 && pos % SectionSegment::DataLength == 0) {
  393.       ssp = g_sectionSegmentPool.getPtr(ssp->m_nextSegment);
  394.     }
  395.     printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);
  396.   }
  397.   if (len > 0)
  398.     putc('n', output);
  399. }