FastScheduler.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:14k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "FastScheduler.hpp"
  14. #include "RefConvert.hpp"
  15. #include "Emulator.hpp"
  16. #include "VMSignal.hpp"
  17. #include <Error.hpp>
  18. #include <SignalLoggerManager.hpp>
  19. #include <BlockNumbers.h>
  20. #include <GlobalSignalNumbers.h>
  21. #include <signaldata/EventReport.hpp>
  22. #include "LongSignal.hpp"
  23. #include <NdbTick.h>
  24. #define MIN_NUMBER_OF_SIG_PER_DO_JOB 64
  25. #define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048
  26. #define EXTRA_SIGNALS_PER_DO_JOB 32
  27. FastScheduler::FastScheduler()
  28. {
  29.    // These constants work for sun only, but they should be initated from
  30.    // Emulator.C as soon as VMTime has been initiated.
  31.    theJobBuffers[0].newBuffer(JBASIZE);
  32.    theJobBuffers[1].newBuffer(JBBSIZE);
  33.    theJobBuffers[2].newBuffer(JBCSIZE);
  34.    theJobBuffers[3].newBuffer(JBDSIZE);
  35.    clear();
  36. }
  37. FastScheduler::~FastScheduler()
  38. {
  39. }
  40. void 
  41. FastScheduler::clear()
  42. {
  43.   int i;
  44.   // Make sure the restart signals are not sent too early
  45.   // the prio is set back in 'main' using the 'ready' method.
  46.   globalData.highestAvailablePrio = LEVEL_IDLE;
  47.   globalData.sendPackedActivated = 0;
  48.   globalData.activateSendPacked = 0;
  49.   for (i = 0; i < JB_LEVELS; i++){
  50.     theJobBuffers[i].clear();
  51.   }
  52.   globalData.JobCounter = 0;
  53.   globalData.JobLap = 0;
  54.   globalData.loopMax = 32;
  55.   globalData.VMSignals[0].header.theSignalId = 0;
  56.   
  57.   theDoJobTotalCounter = 0;
  58.   theDoJobCallCounter = 0;
  59. }
  60. void
  61. FastScheduler::activateSendPacked()
  62. {
  63.   globalData.sendPackedActivated = 1;
  64.   globalData.activateSendPacked = 0;
  65.   globalData.loopMax = 2048;
  66. }//FastScheduler::activateSendPacked()
  67. //------------------------------------------------------------------------
  68. // sendPacked is executed at the end of the loop.
  69. // To ensure that we don't send any messages before executing all local
  70. // packed signals we do another turn in the loop (unless we have already
  71. // executed too many signals in the loop).
  72. //------------------------------------------------------------------------
  73. void 
  74. FastScheduler::doJob()
  75. {
  76.   Uint32 loopCount = 0;
  77.   Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB;
  78.   Uint32 TloopMax = (Uint32)globalData.loopMax;
  79.   if (TminLoops < TloopMax) {
  80.     TloopMax = TminLoops;
  81.   }//if
  82.   if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) {
  83.     TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB;
  84.   }//if
  85.   register Signal* signal = getVMSignals();
  86.   register Uint32 tHighPrio= globalData.highestAvailablePrio;
  87.   do{
  88.     while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) {
  89.       // signal->garbage_register(); 
  90.       // To ensure we find bugs quickly
  91.       register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
  92.       register BlockNumber reg_bnr = gsnbnr & 0xFFF;
  93.       register GlobalSignalNumber reg_gsn = gsnbnr >> 16;
  94.       globalData.incrementWatchDogCounter(1);
  95.       if (reg_bnr > 0) {
  96.         Uint32 tJobCounter = globalData.JobCounter;
  97.         Uint32 tJobLap = globalData.JobLap;
  98.         SimulatedBlock* b = globalData.getBlock(reg_bnr);
  99.         theJobPriority[tJobCounter] = (Uint8)tHighPrio;
  100.         globalData.JobCounter = (tJobCounter + 1) & 4095;
  101.         globalData.JobLap = tJobLap + 1;
  102. #ifdef VM_TRACE_TIME
  103. Uint32 us1, us2;
  104. Uint64 ms1, ms2;
  105. NdbTick_CurrentMicrosecond(&ms1, &us1);
  106. b->m_currentGsn = reg_gsn;
  107. #endif
  108. getSections(signal->header.m_noOfSections, signal->m_sectionPtr);
  109. #ifdef VM_TRACE
  110.         {
  111.           if (globalData.testOn) {
  112.     signal->header.theVerId_signalNumber = reg_gsn;
  113.     signal->header.theReceiversBlockNumber = reg_bnr;
  114.     
  115.             globalSignalLoggers.executeSignal(signal->header,
  116.       tHighPrio,
  117.       &signal->theData[0], 
  118.       globalData.ownId,
  119.                                               signal->m_sectionPtr,
  120.                                               signal->header.m_noOfSections);
  121.           }//if
  122.         }
  123. #endif
  124.         b->executeFunction(reg_gsn, signal);
  125. releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr);
  126. signal->header.m_noOfSections = 0;
  127. #ifdef VM_TRACE_TIME
  128. NdbTick_CurrentMicrosecond(&ms2, &us2);
  129. Uint64 diff = ms2;
  130. diff -= ms1;
  131. diff *= 1000000;
  132. diff += us2;
  133. diff -= us1;
  134. b->addTime(reg_gsn, diff);
  135. #endif
  136.         tHighPrio = globalData.highestAvailablePrio;
  137.       } else {
  138.         tHighPrio++;
  139.         globalData.highestAvailablePrio = tHighPrio;
  140.       }//if
  141.       loopCount++;
  142.     }//while
  143.     sendPacked();
  144.     tHighPrio = globalData.highestAvailablePrio;
  145.     if(getBOccupancy() > MAX_OCCUPANCY)
  146.     {
  147.       if(loopCount != TloopMax)
  148. abort();
  149.       assert( loopCount == TloopMax );
  150.       TloopMax += 512;
  151.     }
  152.   } while ((getBOccupancy() > MAX_OCCUPANCY) ||
  153.            ((loopCount < TloopMax) &&
  154.             (tHighPrio < LEVEL_IDLE)));
  155.   theDoJobCallCounter ++;
  156.   theDoJobTotalCounter += loopCount;
  157.   if (theDoJobCallCounter == 8192) {
  158.     reportDoJobStatistics(theDoJobTotalCounter >> 13);
  159.     theDoJobCallCounter = 0;
  160.     theDoJobTotalCounter = 0;
  161.   }//if
  162. }//FastScheduler::doJob()
  163. void FastScheduler::sendPacked()
  164. {
  165.   if (globalData.sendPackedActivated == 1) {
  166.     SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
  167.     SimulatedBlock* b_tc = globalData.getBlock(DBTC);
  168.     SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
  169.     Signal* signal = getVMSignals();
  170.     b_lqh->executeFunction(GSN_SEND_PACKED, signal);
  171.     b_tc->executeFunction(GSN_SEND_PACKED, signal);
  172.     b_tup->executeFunction(GSN_SEND_PACKED, signal);
  173.     return;
  174.   } else if (globalData.activateSendPacked == 0) {
  175.     return;
  176.   } else {
  177.     activateSendPacked();
  178.   }//if
  179.   return;
  180. }//FastScheduler::sendPacked()
  181. Uint32
  182. APZJobBuffer::retrieve(Signal* signal)
  183. {              
  184.   Uint32 tOccupancy = theOccupancy;
  185.   Uint32 myRPtr = rPtr;
  186.   BufferEntry& buf = buffer[myRPtr];
  187.   Uint32 gsnbnr;
  188.   Uint32 cond =  (++myRPtr == bufSize) - 1;
  189.   Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber;
  190.   
  191.   if (tOccupancy != 0) {
  192.     if (tRecBlockNo != 0) {
  193.       // Transform protocol to signal. 
  194.       rPtr = myRPtr & cond;
  195.       theOccupancy = tOccupancy - 1;
  196.       gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo;
  197.       
  198.       Uint32 tSignalId = globalData.theSignalId;
  199.       Uint32 tLength = buf.header.theLength;
  200.       Uint32 tFirstData = buf.theDataRegister[0];
  201.       signal->header = buf.header;
  202.       
  203.       // Recall our signal Id for restart purposes
  204.       buf.header.theSignalId = tSignalId;  
  205.       globalData.theSignalId = tSignalId + 1;
  206.       
  207.       Uint32* tDataRegPtr = &buf.theDataRegister[0];
  208.       Uint32* tSigDataPtr = signal->getDataPtrSend();
  209.       *tSigDataPtr = tFirstData;
  210.       tDataRegPtr++;
  211.       tSigDataPtr++;
  212.       Uint32  tLengthCopied = 1;
  213.       while (tLengthCopied < tLength) {
  214.         Uint32 tData0 = tDataRegPtr[0];
  215.         Uint32 tData1 = tDataRegPtr[1];
  216.         Uint32 tData2 = tDataRegPtr[2];
  217.         Uint32 tData3 = tDataRegPtr[3];
  218.         tDataRegPtr += 4;
  219.         tLengthCopied += 4;
  220.         tSigDataPtr[0] = tData0;
  221.         tSigDataPtr[1] = tData1;
  222.         tSigDataPtr[2] = tData2;
  223.         tSigDataPtr[3] = tData3;
  224.         tSigDataPtr += 4;
  225.       }//while
  226.       /**
  227.        * Copy sections references (copy all without if-statements)
  228.        */
  229.       tDataRegPtr = &buf.theDataRegister[tLength];
  230.       SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0];
  231.       Uint32 tData0 = tDataRegPtr[0];
  232.       Uint32 tData1 = tDataRegPtr[1];
  233.       Uint32 tData2 = tDataRegPtr[2];
  234.       
  235.       tSecPtr[0].i = tData0;
  236.       tSecPtr[1].i = tData1;
  237.       tSecPtr[2].i = tData2;
  238.       
  239.       //---------------------------------------------------------
  240.       // Prefetch of buffer[rPtr] is done here. We prefetch for
  241.       // read both the first cache line and the next 64 byte
  242.       // entry
  243.       //---------------------------------------------------------
  244.       PREFETCH((void*)&buffer[rPtr]);
  245.       PREFETCH((void*)(((char*)&buffer[rPtr]) + 64));
  246.       return gsnbnr;
  247.     } else {
  248.       bnr_error();
  249.       return 0; // Will never come here, simply to keep GCC happy.
  250.     }//if
  251.   } else {
  252.     //------------------------------------------------------------
  253.     // The Job Buffer was empty, signal this by return zero.
  254.     //------------------------------------------------------------
  255.     return 0;
  256.   }//if
  257. }//APZJobBuffer::retrieve()
  258. void 
  259. APZJobBuffer::signal2buffer(Signal* signal,
  260.     BlockNumber bnr, GlobalSignalNumber gsn,
  261.     BufferEntry& buf)
  262. {
  263.   Uint32 tSignalId = globalData.theSignalId;
  264.   Uint32 tFirstData = signal->theData[0];
  265.   Uint32 tLength = signal->header.theLength;
  266.   Uint32 tSigId  = buf.header.theSignalId;
  267.   
  268.   buf.header = signal->header;
  269.   buf.header.theVerId_signalNumber = gsn;
  270.   buf.header.theReceiversBlockNumber = bnr;
  271.   buf.header.theSendersSignalId = tSignalId - 1;
  272.   buf.header.theSignalId = tSigId;
  273.   buf.theDataRegister[0] = tFirstData;
  274.   
  275.   Uint32 tLengthCopied = 1;
  276.   Uint32* tSigDataPtr = &signal->theData[1];
  277.   Uint32* tDataRegPtr = &buf.theDataRegister[1];
  278.   while (tLengthCopied < tLength) {
  279.     Uint32 tData0 = tSigDataPtr[0];
  280.     Uint32 tData1 = tSigDataPtr[1];
  281.     Uint32 tData2 = tSigDataPtr[2];
  282.     Uint32 tData3 = tSigDataPtr[3];
  283.     
  284.     tLengthCopied += 4;
  285.     tSigDataPtr += 4;
  286.     tDataRegPtr[0] = tData0;
  287.     tDataRegPtr[1] = tData1;
  288.     tDataRegPtr[2] = tData2;
  289.     tDataRegPtr[3] = tData3;
  290.     tDataRegPtr += 4;
  291.   }//while
  292.   /**
  293.    * Copy sections references (copy all without if-statements)
  294.    */
  295.   tDataRegPtr = &buf.theDataRegister[tLength];
  296.   SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0];
  297.   Uint32 tData0 = tSecPtr[0].i;
  298.   Uint32 tData1 = tSecPtr[1].i;
  299.   Uint32 tData2 = tSecPtr[2].i;
  300.   tDataRegPtr[0] = tData0;
  301.   tDataRegPtr[1] = tData1;
  302.   tDataRegPtr[2] = tData2;
  303. }//APZJobBuffer::signal2buffer()
  304. void
  305. APZJobBuffer::insert(const SignalHeader * const sh,
  306.      const Uint32 * const theData, const Uint32 secPtrI[3]){
  307.   Uint32 tOccupancy = theOccupancy + 1;
  308.   Uint32 myWPtr = wPtr;
  309.   register BufferEntry& buf = buffer[myWPtr];
  310.   
  311.   if (tOccupancy < bufSize) {
  312.     Uint32 cond =  (++myWPtr == bufSize) - 1;
  313.     wPtr = myWPtr & cond;
  314.     theOccupancy = tOccupancy;
  315.     
  316.     buf.header = * sh;
  317.     const Uint32 len = buf.header.theLength;
  318.     memcpy(buf.theDataRegister, theData, 4 * len);
  319.     memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3);
  320.     //---------------------------------------------------------
  321.     // Prefetch of buffer[wPtr] is done here. We prefetch for
  322.     // write both the first cache line and the next 64 byte
  323.     // entry
  324.     //---------------------------------------------------------
  325.     WRITEHINT((void*)&buffer[wPtr]);
  326.     WRITEHINT((void*)(((char*)&buffer[wPtr]) + 64));
  327.     
  328.   } else {
  329.     jbuf_error();
  330.   }//if
  331. }
  332. APZJobBuffer::APZJobBuffer()
  333.   : bufSize(0), buffer(NULL), memRef(NULL)
  334. {
  335.   clear();
  336. }
  337. APZJobBuffer::~APZJobBuffer()
  338. {
  339.   delete [] buffer;
  340. }
  341. void
  342. APZJobBuffer::newBuffer(int size)
  343. {
  344.   buffer = new BufferEntry[size + 1]; // +1 to support "overrrun"
  345.   if(buffer){
  346. #ifndef NDB_PURIFY
  347.     ::memset(buffer, 0, (size * sizeof(BufferEntry)));
  348. #endif
  349.     bufSize = size;
  350.   } else
  351.     bufSize = 0;
  352. }
  353. void
  354. APZJobBuffer::clear()
  355. {
  356.   rPtr = 0;
  357.   wPtr = 0;
  358.   theOccupancy = 0;
  359. }
  360. /**
  361.  * Function prototype for print_restart
  362.  *
  363.  *   Defined later in this file
  364.  */
  365. void print_restart(FILE * output, Signal* signal, Uint32 aLevel);
  366. void FastScheduler::dumpSignalMemory(FILE * output)
  367. {
  368.   Signal signal;
  369.   Uint32 ReadPtr[5];
  370.   Uint32 tJob;
  371.   Uint32 tLastJob;
  372.   fprintf(output, "n");
  373.  
  374.   if (globalData.JobLap > 4095) {
  375.     if (globalData.JobCounter != 0)
  376.       tJob = globalData.JobCounter - 1;
  377.     else
  378.       tJob = 4095;
  379.     tLastJob = globalData.JobCounter;
  380.   } else {
  381.     if (globalData.JobCounter == 0)
  382.       return; // No signals sent
  383.     else {
  384.       tJob = globalData.JobCounter - 1;
  385.       tLastJob = 4095;
  386.     }
  387.   }
  388.   ReadPtr[0] = theJobBuffers[0].getReadPtr();
  389.   ReadPtr[1] = theJobBuffers[1].getReadPtr();
  390.   ReadPtr[2] = theJobBuffers[2].getReadPtr();
  391.   ReadPtr[3] = theJobBuffers[3].getReadPtr();
  392.   
  393.   do {
  394.     unsigned char tLevel = theJobPriority[tJob];
  395.     globalData.incrementWatchDogCounter(4);
  396.     if (ReadPtr[tLevel] == 0)
  397.       ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1;
  398.     else
  399.       ReadPtr[tLevel]--;
  400.     
  401.     theJobBuffers[tLevel].retrieveDump(&signal, ReadPtr[tLevel]);
  402.     print_restart(output, &signal, tLevel);
  403.     
  404.     if (tJob == 0)
  405.       tJob = 4095;
  406.     else
  407.       tJob--;
  408.     
  409.   } while (tJob != tLastJob);
  410.   fflush(output);
  411. }
  412. void
  413. FastScheduler::prio_level_error()
  414. {
  415.   ERROR_SET(ecError, ERROR_WRONG_PRIO_LEVEL, 
  416.     "Wrong Priority Level", "FastScheduler.C");
  417. }
  418. void 
  419. jbuf_error()
  420. {
  421.   ERROR_SET(ecError, BLOCK_ERROR_JBUFCONGESTION, 
  422.     "Job Buffer Full", "APZJobBuffer.C");
  423. }
  424. void 
  425. bnr_error()
  426. {
  427.   ERROR_SET(ecError, BLOCK_ERROR_BNR_ZERO, 
  428.     "Block Number Zero", "FastScheduler.C");
  429. }
  430. void
  431. print_restart(FILE * output, Signal* signal, Uint32 aLevel)
  432. {
  433.   fprintf(output, "--------------- Signal ----------------n");
  434.   SignalLoggerManager::printSignalHeader(output, 
  435.  signal->header,
  436.  aLevel,
  437.  globalData.ownId, 
  438.  true);
  439.   SignalLoggerManager::printSignalData  (output, 
  440.  signal->header,
  441.  &signal->theData[0]);
  442. }
  443. /**
  444.  * This method used to be a Cmvmi member function
  445.  * but is now a "ordinary" function"
  446.  *
  447.  * See TransporterCallback.cpp for explanation
  448.  */
  449. void 
  450. FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) {
  451.   Signal signal; 
  452.   memset(&signal.header, 0, sizeof(signal.header));
  453.   signal.theData[0] = EventReport::JobStatistic;
  454.   signal.theData[1] = tMeanLoopCount;
  455.   
  456.   memset(&signal.header, 0, sizeof(SignalHeader));
  457.   signal.header.theLength = 2;
  458.   signal.header.theSendersSignalId = 0;
  459.   signal.header.theSendersBlockRef = numberToRef(0, 0);
  460.   
  461.   execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
  462. }