FastScheduler.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:14k
- /* Copyright (C) 2003 MySQL AB
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
- #include "FastScheduler.hpp"
- #include "RefConvert.hpp"
- #include "Emulator.hpp"
- #include "VMSignal.hpp"
- #include <Error.hpp>
- #include <SignalLoggerManager.hpp>
- #include <BlockNumbers.h>
- #include <GlobalSignalNumbers.h>
- #include <signaldata/EventReport.hpp>
- #include "LongSignal.hpp"
- #include <NdbTick.h>
- #define MIN_NUMBER_OF_SIG_PER_DO_JOB 64
- #define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048
- #define EXTRA_SIGNALS_PER_DO_JOB 32
- FastScheduler::FastScheduler()
- {
- // These constants work for sun only, but they should be initated from
- // Emulator.C as soon as VMTime has been initiated.
- theJobBuffers[0].newBuffer(JBASIZE);
- theJobBuffers[1].newBuffer(JBBSIZE);
- theJobBuffers[2].newBuffer(JBCSIZE);
- theJobBuffers[3].newBuffer(JBDSIZE);
- clear();
- }
- FastScheduler::~FastScheduler()
- {
- }
- void
- FastScheduler::clear()
- {
- int i;
- // Make sure the restart signals are not sent too early
- // the prio is set back in 'main' using the 'ready' method.
- globalData.highestAvailablePrio = LEVEL_IDLE;
- globalData.sendPackedActivated = 0;
- globalData.activateSendPacked = 0;
- for (i = 0; i < JB_LEVELS; i++){
- theJobBuffers[i].clear();
- }
- globalData.JobCounter = 0;
- globalData.JobLap = 0;
- globalData.loopMax = 32;
- globalData.VMSignals[0].header.theSignalId = 0;
-
- theDoJobTotalCounter = 0;
- theDoJobCallCounter = 0;
- }
- void
- FastScheduler::activateSendPacked()
- {
- globalData.sendPackedActivated = 1;
- globalData.activateSendPacked = 0;
- globalData.loopMax = 2048;
- }//FastScheduler::activateSendPacked()
- //------------------------------------------------------------------------
- // sendPacked is executed at the end of the loop.
- // To ensure that we don't send any messages before executing all local
- // packed signals we do another turn in the loop (unless we have already
- // executed too many signals in the loop).
- //------------------------------------------------------------------------
- void
- FastScheduler::doJob()
- {
- Uint32 loopCount = 0;
- Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB;
- Uint32 TloopMax = (Uint32)globalData.loopMax;
- if (TminLoops < TloopMax) {
- TloopMax = TminLoops;
- }//if
- if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) {
- TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB;
- }//if
- register Signal* signal = getVMSignals();
- register Uint32 tHighPrio= globalData.highestAvailablePrio;
- do{
- while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) {
- // signal->garbage_register();
- // To ensure we find bugs quickly
- register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
- register BlockNumber reg_bnr = gsnbnr & 0xFFF;
- register GlobalSignalNumber reg_gsn = gsnbnr >> 16;
- globalData.incrementWatchDogCounter(1);
- if (reg_bnr > 0) {
- Uint32 tJobCounter = globalData.JobCounter;
- Uint32 tJobLap = globalData.JobLap;
- SimulatedBlock* b = globalData.getBlock(reg_bnr);
- theJobPriority[tJobCounter] = (Uint8)tHighPrio;
- globalData.JobCounter = (tJobCounter + 1) & 4095;
- globalData.JobLap = tJobLap + 1;
-
- #ifdef VM_TRACE_TIME
- Uint32 us1, us2;
- Uint64 ms1, ms2;
- NdbTick_CurrentMicrosecond(&ms1, &us1);
- b->m_currentGsn = reg_gsn;
- #endif
-
- getSections(signal->header.m_noOfSections, signal->m_sectionPtr);
- #ifdef VM_TRACE
- {
- if (globalData.testOn) {
- signal->header.theVerId_signalNumber = reg_gsn;
- signal->header.theReceiversBlockNumber = reg_bnr;
-
- globalSignalLoggers.executeSignal(signal->header,
- tHighPrio,
- &signal->theData[0],
- globalData.ownId,
- signal->m_sectionPtr,
- signal->header.m_noOfSections);
- }//if
- }
- #endif
- b->executeFunction(reg_gsn, signal);
- releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr);
- signal->header.m_noOfSections = 0;
- #ifdef VM_TRACE_TIME
- NdbTick_CurrentMicrosecond(&ms2, &us2);
- Uint64 diff = ms2;
- diff -= ms1;
- diff *= 1000000;
- diff += us2;
- diff -= us1;
- b->addTime(reg_gsn, diff);
- #endif
- tHighPrio = globalData.highestAvailablePrio;
- } else {
- tHighPrio++;
- globalData.highestAvailablePrio = tHighPrio;
- }//if
- loopCount++;
- }//while
- sendPacked();
- tHighPrio = globalData.highestAvailablePrio;
- if(getBOccupancy() > MAX_OCCUPANCY)
- {
- if(loopCount != TloopMax)
- abort();
- assert( loopCount == TloopMax );
- TloopMax += 512;
- }
- } while ((getBOccupancy() > MAX_OCCUPANCY) ||
- ((loopCount < TloopMax) &&
- (tHighPrio < LEVEL_IDLE)));
- theDoJobCallCounter ++;
- theDoJobTotalCounter += loopCount;
- if (theDoJobCallCounter == 8192) {
- reportDoJobStatistics(theDoJobTotalCounter >> 13);
- theDoJobCallCounter = 0;
- theDoJobTotalCounter = 0;
- }//if
- }//FastScheduler::doJob()
- void FastScheduler::sendPacked()
- {
- if (globalData.sendPackedActivated == 1) {
- SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
- SimulatedBlock* b_tc = globalData.getBlock(DBTC);
- SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
- Signal* signal = getVMSignals();
- b_lqh->executeFunction(GSN_SEND_PACKED, signal);
- b_tc->executeFunction(GSN_SEND_PACKED, signal);
- b_tup->executeFunction(GSN_SEND_PACKED, signal);
- return;
- } else if (globalData.activateSendPacked == 0) {
- return;
- } else {
- activateSendPacked();
- }//if
- return;
- }//FastScheduler::sendPacked()
- Uint32
- APZJobBuffer::retrieve(Signal* signal)
- {
- Uint32 tOccupancy = theOccupancy;
- Uint32 myRPtr = rPtr;
- BufferEntry& buf = buffer[myRPtr];
- Uint32 gsnbnr;
- Uint32 cond = (++myRPtr == bufSize) - 1;
- Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber;
-
- if (tOccupancy != 0) {
- if (tRecBlockNo != 0) {
- // Transform protocol to signal.
- rPtr = myRPtr & cond;
- theOccupancy = tOccupancy - 1;
- gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo;
-
- Uint32 tSignalId = globalData.theSignalId;
- Uint32 tLength = buf.header.theLength;
- Uint32 tFirstData = buf.theDataRegister[0];
- signal->header = buf.header;
-
- // Recall our signal Id for restart purposes
- buf.header.theSignalId = tSignalId;
- globalData.theSignalId = tSignalId + 1;
-
- Uint32* tDataRegPtr = &buf.theDataRegister[0];
- Uint32* tSigDataPtr = signal->getDataPtrSend();
- *tSigDataPtr = tFirstData;
- tDataRegPtr++;
- tSigDataPtr++;
- Uint32 tLengthCopied = 1;
- while (tLengthCopied < tLength) {
- Uint32 tData0 = tDataRegPtr[0];
- Uint32 tData1 = tDataRegPtr[1];
- Uint32 tData2 = tDataRegPtr[2];
- Uint32 tData3 = tDataRegPtr[3];
-
- tDataRegPtr += 4;
- tLengthCopied += 4;
-
- tSigDataPtr[0] = tData0;
- tSigDataPtr[1] = tData1;
- tSigDataPtr[2] = tData2;
- tSigDataPtr[3] = tData3;
- tSigDataPtr += 4;
- }//while
- /**
- * Copy sections references (copy all without if-statements)
- */
- tDataRegPtr = &buf.theDataRegister[tLength];
- SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0];
- Uint32 tData0 = tDataRegPtr[0];
- Uint32 tData1 = tDataRegPtr[1];
- Uint32 tData2 = tDataRegPtr[2];
-
- tSecPtr[0].i = tData0;
- tSecPtr[1].i = tData1;
- tSecPtr[2].i = tData2;
-
- //---------------------------------------------------------
- // Prefetch of buffer[rPtr] is done here. We prefetch for
- // read both the first cache line and the next 64 byte
- // entry
- //---------------------------------------------------------
- PREFETCH((void*)&buffer[rPtr]);
- PREFETCH((void*)(((char*)&buffer[rPtr]) + 64));
- return gsnbnr;
- } else {
- bnr_error();
- return 0; // Will never come here, simply to keep GCC happy.
- }//if
- } else {
- //------------------------------------------------------------
- // The Job Buffer was empty, signal this by return zero.
- //------------------------------------------------------------
- return 0;
- }//if
- }//APZJobBuffer::retrieve()
- void
- APZJobBuffer::signal2buffer(Signal* signal,
- BlockNumber bnr, GlobalSignalNumber gsn,
- BufferEntry& buf)
- {
- Uint32 tSignalId = globalData.theSignalId;
- Uint32 tFirstData = signal->theData[0];
- Uint32 tLength = signal->header.theLength;
- Uint32 tSigId = buf.header.theSignalId;
-
- buf.header = signal->header;
- buf.header.theVerId_signalNumber = gsn;
- buf.header.theReceiversBlockNumber = bnr;
- buf.header.theSendersSignalId = tSignalId - 1;
- buf.header.theSignalId = tSigId;
- buf.theDataRegister[0] = tFirstData;
-
- Uint32 tLengthCopied = 1;
- Uint32* tSigDataPtr = &signal->theData[1];
- Uint32* tDataRegPtr = &buf.theDataRegister[1];
- while (tLengthCopied < tLength) {
- Uint32 tData0 = tSigDataPtr[0];
- Uint32 tData1 = tSigDataPtr[1];
- Uint32 tData2 = tSigDataPtr[2];
- Uint32 tData3 = tSigDataPtr[3];
-
- tLengthCopied += 4;
- tSigDataPtr += 4;
- tDataRegPtr[0] = tData0;
- tDataRegPtr[1] = tData1;
- tDataRegPtr[2] = tData2;
- tDataRegPtr[3] = tData3;
- tDataRegPtr += 4;
- }//while
- /**
- * Copy sections references (copy all without if-statements)
- */
- tDataRegPtr = &buf.theDataRegister[tLength];
- SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0];
- Uint32 tData0 = tSecPtr[0].i;
- Uint32 tData1 = tSecPtr[1].i;
- Uint32 tData2 = tSecPtr[2].i;
- tDataRegPtr[0] = tData0;
- tDataRegPtr[1] = tData1;
- tDataRegPtr[2] = tData2;
- }//APZJobBuffer::signal2buffer()
- void
- APZJobBuffer::insert(const SignalHeader * const sh,
- const Uint32 * const theData, const Uint32 secPtrI[3]){
- Uint32 tOccupancy = theOccupancy + 1;
- Uint32 myWPtr = wPtr;
- register BufferEntry& buf = buffer[myWPtr];
-
- if (tOccupancy < bufSize) {
- Uint32 cond = (++myWPtr == bufSize) - 1;
- wPtr = myWPtr & cond;
- theOccupancy = tOccupancy;
-
- buf.header = * sh;
- const Uint32 len = buf.header.theLength;
- memcpy(buf.theDataRegister, theData, 4 * len);
- memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3);
- //---------------------------------------------------------
- // Prefetch of buffer[wPtr] is done here. We prefetch for
- // write both the first cache line and the next 64 byte
- // entry
- //---------------------------------------------------------
- WRITEHINT((void*)&buffer[wPtr]);
- WRITEHINT((void*)(((char*)&buffer[wPtr]) + 64));
-
- } else {
- jbuf_error();
- }//if
- }
- APZJobBuffer::APZJobBuffer()
- : bufSize(0), buffer(NULL), memRef(NULL)
- {
- clear();
- }
- APZJobBuffer::~APZJobBuffer()
- {
- delete [] buffer;
- }
- void
- APZJobBuffer::newBuffer(int size)
- {
- buffer = new BufferEntry[size + 1]; // +1 to support "overrrun"
- if(buffer){
- #ifndef NDB_PURIFY
- ::memset(buffer, 0, (size * sizeof(BufferEntry)));
- #endif
- bufSize = size;
- } else
- bufSize = 0;
- }
- void
- APZJobBuffer::clear()
- {
- rPtr = 0;
- wPtr = 0;
- theOccupancy = 0;
- }
- /**
- * Function prototype for print_restart
- *
- * Defined later in this file
- */
- void print_restart(FILE * output, Signal* signal, Uint32 aLevel);
- void FastScheduler::dumpSignalMemory(FILE * output)
- {
- Signal signal;
- Uint32 ReadPtr[5];
- Uint32 tJob;
- Uint32 tLastJob;
- fprintf(output, "n");
-
- if (globalData.JobLap > 4095) {
- if (globalData.JobCounter != 0)
- tJob = globalData.JobCounter - 1;
- else
- tJob = 4095;
- tLastJob = globalData.JobCounter;
- } else {
- if (globalData.JobCounter == 0)
- return; // No signals sent
- else {
- tJob = globalData.JobCounter - 1;
- tLastJob = 4095;
- }
- }
- ReadPtr[0] = theJobBuffers[0].getReadPtr();
- ReadPtr[1] = theJobBuffers[1].getReadPtr();
- ReadPtr[2] = theJobBuffers[2].getReadPtr();
- ReadPtr[3] = theJobBuffers[3].getReadPtr();
-
- do {
- unsigned char tLevel = theJobPriority[tJob];
- globalData.incrementWatchDogCounter(4);
- if (ReadPtr[tLevel] == 0)
- ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1;
- else
- ReadPtr[tLevel]--;
-
- theJobBuffers[tLevel].retrieveDump(&signal, ReadPtr[tLevel]);
- print_restart(output, &signal, tLevel);
-
- if (tJob == 0)
- tJob = 4095;
- else
- tJob--;
-
- } while (tJob != tLastJob);
- fflush(output);
- }
- void
- FastScheduler::prio_level_error()
- {
- ERROR_SET(ecError, ERROR_WRONG_PRIO_LEVEL,
- "Wrong Priority Level", "FastScheduler.C");
- }
- void
- jbuf_error()
- {
- ERROR_SET(ecError, BLOCK_ERROR_JBUFCONGESTION,
- "Job Buffer Full", "APZJobBuffer.C");
- }
- void
- bnr_error()
- {
- ERROR_SET(ecError, BLOCK_ERROR_BNR_ZERO,
- "Block Number Zero", "FastScheduler.C");
- }
- void
- print_restart(FILE * output, Signal* signal, Uint32 aLevel)
- {
- fprintf(output, "--------------- Signal ----------------n");
- SignalLoggerManager::printSignalHeader(output,
- signal->header,
- aLevel,
- globalData.ownId,
- true);
- SignalLoggerManager::printSignalData (output,
- signal->header,
- &signal->theData[0]);
- }
- /**
- * This method used to be a Cmvmi member function
- * but is now a "ordinary" function"
- *
- * See TransporterCallback.cpp for explanation
- */
- void
- FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) {
- Signal signal;
- memset(&signal.header, 0, sizeof(signal.header));
- signal.theData[0] = EventReport::JobStatistic;
- signal.theData[1] = tMeanLoopCount;
-
- memset(&signal.header, 0, sizeof(SignalHeader));
- signal.header.theLength = 2;
- signal.header.theSendersSignalId = 0;
- signal.header.theSendersBlockRef = numberToRef(0, 0);
-
- execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
- }