DblqhMain.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:703k
- tupKeyReq->opRef = tcConnectptr.i;
- tupKeyReq->applRef = cownref;
- tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion;
- tupKeyReq->storedProcedure = scanptr.p->scanStoredProcId;
- tupKeyReq->transId1 = tcConnectptr.p->transid[0];
- tupKeyReq->transId2 = tcConnectptr.p->transid[1];
- tupKeyReq->fragPtr = tupFragPtr;
- tupKeyReq->primaryReplica = (tcConnectptr.p->seqNoReplica == 0)?true:false;
- tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
- tupKeyReq->tcOpIndex = tcConnectptr.p->tcOprec;
- tupKeyReq->savePointId = tcConnectptr.p->savePointId;
- Uint32 blockNo = refToBlock(tcConnectptr.p->tcTupBlockref);
- EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal,
- TupKeyReq::SignalLength);
- }
- }//Dblqh::copySendTupkeyReqLab()
- /*---------------------------------------------------------------------------*/
- /* USED IN COPYING OPERATION TO RECEIVE ATTRINFO FROM TUP. */
- /*---------------------------------------------------------------------------*/
- /* ************>> */
- /* TRANSID_AI > */
- /* ************>> */
- void Dblqh::execTRANSID_AI(Signal* signal)
- {
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- Uint32 length = signal->length() - 3;
- ndbrequire(tcConnectptr.p->transactionState == TcConnectionrec::COPY_TUPKEY);
- Uint32 * src = &signal->theData[3];
- while(length > 22){
- if (saveTupattrbuf(signal, src, 22) == ZOK) {
- ;
- } else {
- jam();
- tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
- return;
- }//if
- src += 22;
- length -= 22;
- }
- if (saveTupattrbuf(signal, src, length) == ZOK) {
- return;
- }
- jam();
- tcConnectptr.p->errorCode = ZGET_ATTRINBUF_ERROR;
- }//Dblqh::execTRANSID_AI()
- /*--------------------------------------------------------------------------*/
- /* ENTER TUPKEYCONF WITH */
- /* TC_CONNECTPTR, */
- /* TDATA2, */
- /* TDATA3, */
- /* TDATA4, */
- /* TDATA5 */
- /*--------------------------------------------------------------------------*/
- /* PRECONDITION: TRANSACTION_STATE = COPY_TUPKEY */
- /*--------------------------------------------------------------------------*/
- void Dblqh::copyTupkeyConfLab(Signal* signal)
- {
- const TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtr();
- UintR readLength = tupKeyConf->readLength;
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- releaseActiveFrag(signal);
- if (tcConnectptr.p->errorCode != 0) {
- jam();
- closeCopyLab(signal);
- return;
- }//if
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THE COPY PROCESS HAVE BEEN CLOSED. MOST LIKELY A NODE FAILURE. */
- /*---------------------------------------------------------------------------*/
- closeCopyLab(signal);
- return;
- }//if
- tcConnectptr.p->totSendlenAi = readLength;
- tcConnectptr.p->connectState = TcConnectionrec::COPY_CONNECTED;
- calculateHash(signal);
- /*---------------------------------------------------------------------------*/
- // To avoid using up to many operation records in ACC we will increase the
- // constant to ensure that we never send more than 40 records at a time.
- // This is where the constant 56 comes from. For long records this constant
- // will not matter that much. The current maximum is 6000 words outstanding
- // (including a number of those 56 words not really sent). We also have to
- // ensure that there are never more simultaneous usage of these operation
- // records to ensure that node recovery does not fail because of simultaneous
- // scanning.
- /*---------------------------------------------------------------------------*/
- UintR TnoOfWords = readLength + tcConnectptr.p->primKeyLen;
- TnoOfWords = TnoOfWords + MAGIC_CONSTANT;
- TnoOfWords = TnoOfWords + (TnoOfWords >> 2);
- /*-----------------------------------------------------------------
- * NOTE for transid1!
- * Transid1 in the tcConnection record is used load regulate the
- * copy(node recovery) process.
- * The number of outstanding words are written in the transid1
- * variable. This will be sent to the starting node in the
- * LQHKEYREQ signal and when the answer is returned in the LQHKEYCONF
- * we can reduce the number of outstanding words and check to see
- * if more LQHKEYREQ signals should be sent.
- *
- * However efficient this method is rather unsafe in such way that
- * it overwrites the transid1 original data.
- *
- * Also see TR 587.
- *----------------------------------------------------------------*/
- tcConnectptr.p->transid[0] = TnoOfWords; // Data overload, see note!
- packLqhkeyreqLab(signal);
- tcConnectptr.p->copyCountWords += TnoOfWords;
- scanptr.p->scanState = ScanRecord::WAIT_LQHKEY_COPY;
- if (tcConnectptr.p->copyCountWords < cmaxWordsAtNodeRec) {
- nextRecordCopy(signal);
- return;
- }//if
- return;
- }//Dblqh::copyTupkeyConfLab()
- /*---------------------------------------------------------------------------*/
- /* ENTER LQHKEYCONF */
- /*---------------------------------------------------------------------------*/
- /* PRECONDITION: CONNECT_STATE = COPY_CONNECTED */
- /*---------------------------------------------------------------------------*/
- void Dblqh::copyCompletedLab(Signal* signal)
- {
- const LqhKeyConf * const lqhKeyConf = (LqhKeyConf *)signal->getDataPtr();
- ndbrequire(tcConnectptr.p->transid[1] == lqhKeyConf->transId2);
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- if (tcConnectptr.p->copyCountWords >= cmaxWordsAtNodeRec) {
- tcConnectptr.p->copyCountWords -= lqhKeyConf->transId1; // Data overload, see note!
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- jam();
- /*---------------------------------------------------------------------------*/
- // Copy to complete, we will not start any new copying.
- /*---------------------------------------------------------------------------*/
- closeCopyLab(signal);
- return;
- }//if
- if (tcConnectptr.p->copyCountWords < cmaxWordsAtNodeRec) {
- jam();
- nextRecordCopy(signal);
- }//if
- return;
- }//if
- tcConnectptr.p->copyCountWords -= lqhKeyConf->transId1; // Data overload, see note!
- ndbrequire(tcConnectptr.p->copyCountWords <= cmaxWordsAtNodeRec);
- if (tcConnectptr.p->copyCountWords > 0) {
- jam();
- return;
- }//if
- /*---------------------------------------------------------------------------*/
- // No more outstanding copies. We will only start new ones from here if it was
- // stopped before and this only happens when copyCountWords is bigger than the
- // threshold value. Since this did not occur we must be waiting for completion.
- // Check that this is so. If not we crash to find out what is going on.
- /*---------------------------------------------------------------------------*/
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- jam();
- closeCopyLab(signal);
- return;
- }//if
- if (scanptr.p->scanState == ScanRecord::WAIT_LQHKEY_COPY) {
- jam();
- /*---------------------------------------------------------------------------*/
- // Make sure that something is in progress. Otherwise we will simply stop
- // and nothing more will happen.
- /*---------------------------------------------------------------------------*/
- systemErrorLab(signal);
- return;
- }//if
- return;
- }//Dblqh::copyCompletedLab()
- void Dblqh::nextRecordCopy(Signal* signal)
- {
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- if (scanptr.p->scanState != ScanRecord::WAIT_LQHKEY_COPY) {
- jam();
- /*---------------------------------------------------------------------------*/
- // Make sure that nothing is in progress. Otherwise we will have to simultaneous
- // scans on the same record and this will certainly lead to unexpected
- // behaviour.
- /*---------------------------------------------------------------------------*/
- systemErrorLab(signal);
- return;
- }//if
- scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN_COPY;
- switch (fragptr.p->fragStatus) {
- case Fragrecord::FSACTIVE:
- jam();
- linkActiveFrag(signal);
- break;
- case Fragrecord::BLOCKED:
- jam();
- linkFragQueue(signal);
- tcConnectptr.p->transactionState = TcConnectionrec::COPY_STOPPED;
- return;
- break;
- case Fragrecord::FREE:
- jam();
- case Fragrecord::ACTIVE_CREATION:
- jam();
- case Fragrecord::CRASH_RECOVERING:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- jam();
- systemErrorLab(signal);
- return;
- break;
- }//switch
- continueCopyAfterBlockedLab(signal);
- return;
- }//Dblqh::nextRecordCopy()
- void Dblqh::continueCopyAfterBlockedLab(Signal* signal)
- {
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- tcConnectptr.p->errorCode = 0;
- Uint32 acc_op_ptr= get_acc_ptr_from_scan_record(scanptr.p, 0, false);
- signal->theData[0] = scanptr.p->scanAccPtr;
- signal->theData[1] = acc_op_ptr;
- signal->theData[2] = NextScanReq::ZSCAN_NEXT_COMMIT;
- sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
- return;
- }//Dblqh::continueCopyAfterBlockedLab()
- void Dblqh::copyLqhKeyRefLab(Signal* signal)
- {
- ndbrequire(tcConnectptr.p->transid[1] == signal->theData[4]);
- tcConnectptr.p->copyCountWords -= signal->theData[3];
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- scanptr.p->scanErrorCounter++;
- tcConnectptr.p->errorCode = terrorCode;
- closeCopyLab(signal);
- return;
- }//Dblqh::copyLqhKeyRefLab()
- void Dblqh::closeCopyLab(Signal* signal)
- {
- if (tcConnectptr.p->copyCountWords > 0) {
- /*---------------------------------------------------------------------------*/
- // We are still waiting for responses from the starting node.
- // Wait until all of those have arrived until we start the
- // close process.
- /*---------------------------------------------------------------------------*/
- jam();
- return;
- }//if
- tcConnectptr.p->transid[0] = 0;
- tcConnectptr.p->transid[1] = 0;
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- scanptr.p->scanState = ScanRecord::WAIT_CLOSE_COPY;
- switch (fragptr.p->fragStatus) {
- case Fragrecord::FSACTIVE:
- jam();
- linkActiveFrag(signal);
- break;
- case Fragrecord::BLOCKED:
- jam();
- linkFragQueue(signal);
- tcConnectptr.p->transactionState = TcConnectionrec::COPY_CLOSE_STOPPED;
- return;
- break;
- case Fragrecord::FREE:
- jam();
- case Fragrecord::ACTIVE_CREATION:
- jam();
- case Fragrecord::CRASH_RECOVERING:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- jam();
- systemErrorLab(signal);
- return;
- break;
- }//switch
- continueCloseCopyAfterBlockedLab(signal);
- return;
- }//Dblqh::closeCopyLab()
- void Dblqh::continueCloseCopyAfterBlockedLab(Signal* signal)
- {
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- signal->theData[0] = scanptr.p->scanAccPtr;
- signal->theData[1] = RNIL;
- signal->theData[2] = ZCOPY_CLOSE;
- sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
- return;
- }//Dblqh::continueCloseCopyAfterBlockedLab()
- /*---------------------------------------------------------------------------*/
- /* ENTER NEXT_SCANCONF WITH */
- /* SCANPTR, */
- /* TFRAGID, */
- /* TACC_OPPTR, */
- /* TLOCAL_KEY1, */
- /* TLOCAL_KEY2, */
- /* TKEY_LENGTH, */
- /* TKEY1, */
- /* TKEY2, */
- /* TKEY3, */
- /* TKEY4 */
- /*---------------------------------------------------------------------------*/
- /* PRECONDITION: SCAN_STATE = WAIT_CLOSE_COPY */
- /*---------------------------------------------------------------------------*/
- void Dblqh::accCopyCloseConfLab(Signal* signal)
- {
- tcConnectptr.i = scanptr.p->scanTcrec;
- scanptr.p->scanState = ScanRecord::WAIT_DELETE_STORED_PROC_ID_COPY;
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- signal->theData[0] = tcConnectptr.p->tupConnectrec;
- signal->theData[1] = tcConnectptr.p->tableref;
- signal->theData[2] = scanptr.p->scanSchemaVersion;
- signal->theData[3] = ZDELETE_STORED_PROC_ID;
- signal->theData[4] = scanptr.p->scanStoredProcId;
- sendSignal(tcConnectptr.p->tcTupBlockref, GSN_STORED_PROCREQ, signal, 5, JBB);
- return;
- }//Dblqh::accCopyCloseConfLab()
- /*---------------------------------------------------------------------------*/
- /* ENTER STORED_PROCCONF WITH */
- /* TC_CONNECTPTR, */
- /* TSTORED_PROC_ID */
- /*---------------------------------------------------------------------------*/
- /* PRECONDITION: SCAN_STATE = WAIT_DELETE_STORED_PROC_ID_COPY */
- /*---------------------------------------------------------------------------*/
- void Dblqh::tupCopyCloseConfLab(Signal* signal)
- {
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- fragptr.p->copyFragState = ZIDLE;
- if (tcConnectptr.p->abortState == TcConnectionrec::NEW_FROM_TC) {
- jam();
- tcNodeFailptr.i = tcConnectptr.p->tcNodeFailrec;
- ptrCheckGuard(tcNodeFailptr, ctcNodeFailrecFileSize, tcNodeFailRecord);
- tcNodeFailptr.p->tcRecNow = tcConnectptr.i + 1;
- signal->theData[0] = ZLQH_TRANS_NEXT;
- signal->theData[1] = tcNodeFailptr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- CopyFragRef * const ref = (CopyFragRef *)&signal->theData[0];
- ref->userPtr = scanptr.p->copyPtr;
- ref->sendingNodeId = cownNodeid;
- ref->startingNodeId = scanptr.p->scanNodeId;
- ref->tableId = fragptr.p->tabRef;
- ref->fragId = fragptr.p->fragId;
- ref->errorCode = ZNODE_FAILURE_ERROR;
- sendSignal(scanptr.p->scanApiBlockref, GSN_COPY_FRAGREF, signal,
- CopyFragRef::SignalLength, JBB);
- } else {
- if (scanptr.p->scanErrorCounter > 0) {
- jam();
- CopyFragRef * const ref = (CopyFragRef *)&signal->theData[0];
- ref->userPtr = scanptr.p->copyPtr;
- ref->sendingNodeId = cownNodeid;
- ref->startingNodeId = scanptr.p->scanNodeId;
- ref->tableId = fragptr.p->tabRef;
- ref->fragId = fragptr.p->fragId;
- ref->errorCode = tcConnectptr.p->errorCode;
- sendSignal(scanptr.p->scanApiBlockref, GSN_COPY_FRAGREF, signal,
- CopyFragRef::SignalLength, JBB);
- } else {
- jam();
- CopyFragConf * const conf = (CopyFragConf *)&signal->theData[0];
- conf->userPtr = scanptr.p->copyPtr;
- conf->sendingNodeId = cownNodeid;
- conf->startingNodeId = scanptr.p->scanNodeId;
- conf->tableId = tcConnectptr.p->tableref;
- conf->fragId = tcConnectptr.p->fragmentid;
- sendSignal(scanptr.p->scanApiBlockref, GSN_COPY_FRAGCONF, signal,
- CopyFragConf::SignalLength, JBB);
- }//if
- }//if
- releaseActiveCopy(signal);
- tcConnectptr.p->tcScanRec = RNIL;
- finishScanrec(signal);
- releaseOprec(signal);
- releaseTcrec(signal, tcConnectptr);
- releaseScanrec(signal);
- }//Dblqh::tupCopyCloseConfLab()
- /*---------------------------------------------------------------------------*/
- /* A NODE FAILURE OCCURRED DURING THE COPY PROCESS. WE NEED TO CLOSE THE */
- /* COPY PROCESS SINCE A NODE FAILURE DURING THE COPY PROCESS WILL ALSO */
- /* FAIL THE NODE THAT IS TRYING TO START-UP. */
- /*---------------------------------------------------------------------------*/
- void Dblqh::closeCopyRequestLab(Signal* signal)
- {
- scanptr.p->scanErrorCounter++;
- switch (scanptr.p->scanState) {
- case ScanRecord::WAIT_TUPKEY_COPY:
- case ScanRecord::WAIT_COPY_KEYINFO:
- case ScanRecord::WAIT_NEXT_SCAN_COPY:
- jam();
- /*---------------------------------------------------------------------------*/
- /* SET COMPLETION STATUS AND WAIT FOR OPPORTUNITY TO STOP THE SCAN. */
- // ALSO SET NO OF WORDS OUTSTANDING TO ZERO TO AVOID ETERNAL WAIT.
- /*---------------------------------------------------------------------------*/
- scanptr.p->scanCompletedStatus = ZTRUE;
- tcConnectptr.p->copyCountWords = 0;
- break;
- case ScanRecord::WAIT_ACC_COPY:
- case ScanRecord::WAIT_STORED_PROC_COPY:
- jam();
- /*---------------------------------------------------------------------------*/
- /* WE ARE CURRENTLY STARTING UP THE SCAN. SET COMPLETED STATUS AND WAIT FOR*/
- /* COMPLETION OF STARTUP. */
- /*---------------------------------------------------------------------------*/
- scanptr.p->scanCompletedStatus = ZTRUE;
- break;
- case ScanRecord::WAIT_CLOSE_COPY:
- case ScanRecord::WAIT_DELETE_STORED_PROC_ID_COPY:
- jam();
- /*---------------------------------------------------------------------------*/
- /* CLOSE IS ALREADY ONGOING. WE NEED NOT DO ANYTHING. */
- /*---------------------------------------------------------------------------*/
- break;
- case ScanRecord::WAIT_LQHKEY_COPY:
- jam();
- /*---------------------------------------------------------------------------*/
- /* WE ARE WAITING FOR THE FAILED NODE. THE NODE WILL NEVER COME BACK. */
- // WE NEED TO START THE FAILURE HANDLING IMMEDIATELY.
- // ALSO SET NO OF WORDS OUTSTANDING TO ZERO TO AVOID ETERNAL WAIT.
- /*---------------------------------------------------------------------------*/
- tcConnectptr.p->copyCountWords = 0;
- closeCopyLab(signal);
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- return;
- }//Dblqh::closeCopyRequestLab()
- /* ****************************************************** */
- /* COPY_ACTIVEREQ: Change state of a fragment to ACTIVE. */
- /* ****************************************************** */
- void Dblqh::execCOPY_ACTIVEREQ(Signal* signal)
- {
- CRASH_INSERTION(5026);
- const CopyActiveReq * const req = (CopyActiveReq *)&signal->theData[0];
- jamEntry();
- Uint32 masterPtr = req->userPtr;
- BlockReference masterRef = req->userRef;
- tabptr.i = req->tableId;
- ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
- Uint32 fragId = req->fragId;
- ndbrequire(getFragmentrec(signal, fragId));
- fragptr.p->fragDistributionKey = req->distributionKey;
-
- ndbrequire(cnoActiveCopy < 3);
- cactiveCopy[cnoActiveCopy] = fragptr.i;
- cnoActiveCopy++;
- fragptr.p->masterBlockref = masterRef;
- fragptr.p->masterPtr = masterPtr;
- if (fragptr.p->fragStatus == Fragrecord::FSACTIVE) {
- jam();
- /*------------------------------------------------------*/
- /* PROCESS HAVE ALREADY BEEN STARTED BY PREVIOUS */
- /* MASTER. WE HAVE ALREADY SET THE PROPER MASTER */
- /* BLOCK REFERENCE. */
- /*------------------------------------------------------*/
- if (fragptr.p->activeTcCounter == 0) {
- jam();
- /*------------------------------------------------------*/
- /* PROCESS WAS EVEN COMPLETED. */
- /*------------------------------------------------------*/
- sendCopyActiveConf(signal, tabptr.i);
- }//if
- return;
- }//if
- fragptr.p->fragStatus = Fragrecord::FSACTIVE;
- if (fragptr.p->lcpFlag == Fragrecord::LCP_STATE_TRUE) {
- jam();
- fragptr.p->logFlag = Fragrecord::STATE_TRUE;
- }//if
- fragptr.p->activeTcCounter = 1;
- /*------------------------------------------------------*/
- /* SET IT TO ONE TO ENSURE THAT IT IS NOT POSSIBLE*/
- /* TO DECREASE IT TO ZERO UNTIL WE HAVE COMPLETED */
- /* THE SCAN. */
- /*------------------------------------------------------*/
- signal->theData[0] = ZSCAN_TC_CONNECT;
- signal->theData[1] = 0;
- signal->theData[2] = tabptr.i;
- signal->theData[3] = fragId;
- sendSignal(cownref, GSN_CONTINUEB, signal, 4, JBB);
- return;
- }//Dblqh::execCOPY_ACTIVEREQ()
- void Dblqh::scanTcConnectLab(Signal* signal, Uint32 tstartTcConnect, Uint32 fragId)
- {
- Uint32 tendTcConnect;
- ndbrequire(getFragmentrec(signal, fragId));
- if ((tstartTcConnect + 200) >= ctcConnectrecFileSize) {
- jam();
- tendTcConnect = ctcConnectrecFileSize - 1;
- } else {
- jam();
- tendTcConnect = tstartTcConnect + 200;
- }//if
- for (tcConnectptr.i = tstartTcConnect;
- tcConnectptr.i <= tendTcConnect;
- tcConnectptr.i++) {
- jam();
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- if (tcConnectptr.p->transactionState != TcConnectionrec::IDLE) {
- switch (tcConnectptr.p->logWriteState) {
- case TcConnectionrec::NOT_WRITTEN:
- jam();
- if (fragptr.i == tcConnectptr.p->fragmentptr) {
- jam();
- fragptr.p->activeTcCounter = fragptr.p->activeTcCounter + 1;
- tcConnectptr.p->logWriteState = TcConnectionrec::NOT_WRITTEN_WAIT;
- }//if
- break;
- default:
- jam();
- /*empty*/;
- break;
- }//switch
- }//if
- }//for
- if (tendTcConnect < (ctcConnectrecFileSize - 1)) {
- jam();
- signal->theData[0] = ZSCAN_TC_CONNECT;
- signal->theData[1] = tendTcConnect + 1;
- signal->theData[2] = tabptr.i;
- signal->theData[3] = fragId;
- sendSignal(cownref, GSN_CONTINUEB, signal, 4, JBB);
- } else {
- jam();
- /*------------------------------------------------------*/
- /* THE SCAN HAVE BEEN COMPLETED. WE CHECK IF ALL */
- /* OPERATIONS HAVE ALREADY BEEN COMPLETED. */
- /*------------------------------------------------------*/
- ndbrequire(fragptr.p->activeTcCounter > 0);
- fragptr.p->activeTcCounter--;
- if (fragptr.p->activeTcCounter == 0) {
- jam();
- /*------------------------------------------------------*/
- /* SET START GLOBAL CHECKPOINT TO THE NEXT */
- /* CHECKPOINT WE HAVE NOT YET HEARD ANYTHING ABOUT*/
- /* THIS GCP WILL BE COMPLETELY COVERED BY THE LOG.*/
- /*------------------------------------------------------*/
- fragptr.p->startGci = cnewestGci + 1;
- sendCopyActiveConf(signal, tabptr.i);
- }//if
- }//if
- return;
- }//Dblqh::scanTcConnectLab()
- /*---------------------------------------------------------------------------*/
- /* A NEW MASTER IS REQUESTING THE STATE IN LQH OF THE COPY FRAGMENT PARTS. */
- /*---------------------------------------------------------------------------*/
- /* ***************>> */
- /* COPY_STATEREQ > */
- /* ***************>> */
- void Dblqh::execCOPY_STATEREQ(Signal* signal)
- {
- jamEntry();
- ndbrequire(0)
- #if 0
- Uint32* dataPtr = &signal->theData[2];
- BlockReference tmasterBlockref = signal->theData[0];
- Uint32 tnoCopy = 0;
- do {
- jam();
- arrGuard(tnoCopy, 4);
- fragptr.i = cactiveCopy[tnoCopy];
- if (fragptr.i == RNIL) {
- jam();
- break;
- }//if
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if (fragptr.p->copyFragState != ZIDLE) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THIS FRAGMENT IS CURRENTLY ACTIVE IN COPYING THE FRAGMENT. */
- /*---------------------------------------------------------------------------*/
- scanptr.i = fragptr.p->fragScanRec[NR_ScanNo];
- c_scanRecordPool.getPtr(scanptr);
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- jam();
- dataPtr[3 + (tnoCopy << 2)] = ZCOPY_CLOSING;
- } else {
- jam();
- dataPtr[3 + (tnoCopy << 2)] = ZCOPY_ONGOING;
- }//if
- dataPtr[2 + (tnoCopy << 2)] = scanptr.p->scanSchemaVersion;
- scanptr.p->scanApiBlockref = tmasterBlockref;
- } else {
- ndbrequire(fragptr.p->activeTcCounter != 0);
- /*---------------------------------------------------------------------------*/
- /* COPY FRAGMENT IS COMPLETED AND WE ARE CURRENTLY GETTING THE STARTING */
- /* GCI OF THE NEW REPLICA OF THIS FRAGMENT. */
- /*---------------------------------------------------------------------------*/
- fragptr.p->masterBlockref = tmasterBlockref;
- dataPtr[3 + (tnoCopy << 2)] = ZCOPY_ACTIVATION;
- }//if
- dataPtr[tnoCopy << 2] = fragptr.p->tabRef;
- dataPtr[1 + (tnoCopy << 2)] = fragptr.p->fragId;
- tnoCopy++;
- } while (tnoCopy < cnoActiveCopy);
- signal->theData[0] = cownNodeid;
- signal->theData[1] = tnoCopy;
- sendSignal(tmasterBlockref, GSN_COPY_STATECONF, signal, 18, JBB);
- #endif
- return;
- }//Dblqh::execCOPY_STATEREQ()
- /* ========================================================================= */
- /* ======= INITIATE TC RECORD AT COPY FRAGMENT ======= */
- /* */
- /* SUBROUTINE SHORT NAME = ICT */
- /* ========================================================================= */
- void Dblqh::initCopyTc(Signal* signal)
- {
- const NextScanConf * const nextScanConf = (NextScanConf *)&signal->theData[0];
- tcConnectptr.p->primKeyLen = nextScanConf->keyLength;
- tcConnectptr.p->tupkeyData[0] = nextScanConf->key[0];
- tcConnectptr.p->tupkeyData[1] = nextScanConf->key[1];
- tcConnectptr.p->tupkeyData[2] = nextScanConf->key[2];
- tcConnectptr.p->tupkeyData[3] = nextScanConf->key[3];
- scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
- scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
- scanptr.p->scanLocalFragid = nextScanConf->fragId;
- tcConnectptr.p->operation = ZREAD;
- tcConnectptr.p->apiVersionNo = 0;
- tcConnectptr.p->opExec = 0; /* NOT INTERPRETED MODE */
- tcConnectptr.p->schemaVersion = scanptr.p->scanSchemaVersion;
- Uint32 reqinfo = 0;
- LqhKeyReq::setKeyLen(reqinfo, nextScanConf->keyLength);
- LqhKeyReq::setLockType(reqinfo, ZINSERT);
- LqhKeyReq::setDirtyFlag(reqinfo, 1);
- LqhKeyReq::setSimpleFlag(reqinfo, 1);
- LqhKeyReq::setOperation(reqinfo, ZWRITE);
- /* AILen in LQHKEYREQ IS ZERO */
- tcConnectptr.p->reqinfo = reqinfo;
- /* ------------------------------------------------------------------------ */
- /* THE RECEIVING NODE WILL EXPECT THAT IT IS THE LAST NODE AND WILL */
- /* SEND COMPLETED AS THE RESPONSE SIGNAL SINCE DIRTY_OP BIT IS SET. */
- /* ------------------------------------------------------------------------ */
- tcConnectptr.p->nodeAfterNext[0] = ZNIL;
- tcConnectptr.p->nodeAfterNext[1] = ZNIL;
- tcConnectptr.p->tcBlockref = cownref;
- tcConnectptr.p->readlenAi = 0;
- tcConnectptr.p->storedProcId = ZNIL;
- tcConnectptr.p->opExec = 0;
- tcConnectptr.p->nextSeqNoReplica = 0;
- tcConnectptr.p->dirtyOp = ZFALSE;
- tcConnectptr.p->lastReplicaNo = 0;
- tcConnectptr.p->currTupAiLen = 0;
- tcConnectptr.p->tcTimer = cLqhTimeOutCount;
- }//Dblqh::initCopyTc()
- /* ------------------------------------------------------------------------- */
- /* ------- SEND COPY_ACTIVECONF TO MASTER DIH ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- void Dblqh::sendCopyActiveConf(Signal* signal, Uint32 tableId)
- {
- releaseActiveCopy(signal);
- CopyActiveConf * const conf = (CopyActiveConf *)&signal->theData[0];
- conf->userPtr = fragptr.p->masterPtr;
- conf->tableId = tableId;
- conf->fragId = fragptr.p->fragId;
- conf->startingNodeId = cownNodeid;
- conf->startGci = fragptr.p->startGci;
- sendSignal(fragptr.p->masterBlockref, GSN_COPY_ACTIVECONF, signal,
- CopyActiveConf::SignalLength, JBB);
- }//Dblqh::sendCopyActiveConf()
- /* ##########################################################################
- * ####### LOCAL CHECKPOINT MODULE #######
- *
- * ##########################################################################
- * --------------------------------------------------------------------------
- * THIS MODULE HANDLES THE EXECUTION AND CONTROL OF LOCAL CHECKPOINTS
- * IT CONTROLS THE LOCAL CHECKPOINTS IN TUP AND ACC. IT DOES ALSO INTERACT
- * WITH DIH TO CONTROL WHICH GLOBAL CHECKPOINTS THAT ARE RECOVERABLE
- * ------------------------------------------------------------------------- */
- void Dblqh::execEMPTY_LCP_REQ(Signal* signal)
- {
- jamEntry();
- CRASH_INSERTION(5008);
- EmptyLcpReq * const emptyLcpOrd = (EmptyLcpReq*)&signal->theData[0];
- lcpPtr.i = 0;
- ptrAss(lcpPtr, lcpRecord);
-
- Uint32 nodeId = refToNode(emptyLcpOrd->senderRef);
- lcpPtr.p->m_EMPTY_LCP_REQ.set(nodeId);
- lcpPtr.p->reportEmpty = true;
- if (lcpPtr.p->lcpState == LcpRecord::LCP_IDLE){
- jam();
- bool ok = false;
- switch(clcpCompletedState){
- case LCP_IDLE:
- ok = true;
- sendEMPTY_LCP_CONF(signal, true);
- break;
- case LCP_RUNNING:
- ok = true;
- sendEMPTY_LCP_CONF(signal, false);
- break;
- case LCP_CLOSE_STARTED:
- jam();
- case ACC_LCP_CLOSE_COMPLETED:
- jam();
- case TUP_LCP_CLOSE_COMPLETED:
- jam();
- ok = true;
- break;
- }
- ndbrequire(ok);
-
- }//if
-
- return;
- }//Dblqh::execEMPTY_LCPREQ()
- void Dblqh::execLCP_FRAG_ORD(Signal* signal)
- {
- jamEntry();
- CRASH_INSERTION(5010);
- LcpFragOrd * const lcpFragOrd = (LcpFragOrd *)&signal->theData[0];
- Uint32 lcpId = lcpFragOrd->lcpId;
- lcpPtr.i = 0;
- ptrAss(lcpPtr, lcpRecord);
-
- lcpPtr.p->lastFragmentFlag = lcpFragOrd->lastFragmentFlag;
- if (lcpFragOrd->lastFragmentFlag) {
- jam();
- if (lcpPtr.p->lcpState == LcpRecord::LCP_IDLE) {
- jam();
- /* ----------------------------------------------------------
- * NOW THE COMPLETE LOCAL CHECKPOINT ROUND IS COMPLETED.
- * -------------------------------------------------------- */
- if (cnoOfFragsCheckpointed > 0) {
- jam();
- completeLcpRoundLab(signal);
- } else {
- jam();
- sendLCP_COMPLETE_REP(signal, lcpId);
- }//if
- }
- return;
- }//if
- tabptr.i = lcpFragOrd->tableId;
- ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
-
- ndbrequire(tabptr.p->tableStatus == Tablerec::PREP_DROP_TABLE_ONGOING ||
- tabptr.p->tableStatus == Tablerec::PREP_DROP_TABLE_DONE ||
- tabptr.p->tableStatus == Tablerec::TABLE_DEFINED);
- ndbrequire(getFragmentrec(signal, lcpFragOrd->fragmentId));
-
- lcpPtr.i = 0;
- ptrAss(lcpPtr, lcpRecord);
- ndbrequire(!lcpPtr.p->lcpQueued);
- if (c_lcpId < lcpFragOrd->lcpId) {
- jam();
- /**
- * A new LCP
- */
- c_lcpId = lcpFragOrd->lcpId;
- ndbrequire(lcpPtr.p->lcpState == LcpRecord::LCP_IDLE);
- setLogTail(signal, lcpFragOrd->keepGci);
- ndbrequire(clcpCompletedState == LCP_IDLE);
- clcpCompletedState = LCP_RUNNING;
- }//if
- cnoOfFragsCheckpointed++;
-
- if(tabptr.p->tableStatus == Tablerec::PREP_DROP_TABLE_DONE){
- jam();
- LcpRecord::FragOrd fragOrd;
- fragOrd.fragPtrI = fragptr.i;
- fragOrd.lcpFragOrd = * lcpFragOrd;
- sendLCP_FRAG_REP(signal, fragOrd);
- return;
- }
- if (lcpPtr.p->lcpState != LcpRecord::LCP_IDLE) {
- ndbrequire(lcpPtr.p->lcpQueued == false);
- lcpPtr.p->lcpQueued = true;
- lcpPtr.p->queuedFragment.fragPtrI = fragptr.i;
- lcpPtr.p->queuedFragment.lcpFragOrd = * lcpFragOrd;
- return;
- }//if
-
- lcpPtr.p->currentFragment.fragPtrI = fragptr.i;
- lcpPtr.p->currentFragment.lcpFragOrd = * lcpFragOrd;
-
- sendLCP_FRAGIDREQ(signal);
- }//Dblqh::execLCP_FRAGORD()
- /* --------------------------------------------------------------------------
- * PRECONDITION: LCP_PTR:LCP_STATE = WAIT_FRAGID
- * --------------------------------------------------------------------------
- * WE NOW HAVE THE LOCAL FRAGMENTS THAT THE LOCAL CHECKPOINT WILL USE.
- * -------------------------------------------------------------------------- */
- void Dblqh::execLCP_FRAGIDCONF(Signal* signal)
- {
- UintR Tfragid[4];
- jamEntry();
- lcpPtr.i = signal->theData[0];
-
- Uint32 TaccPtr = signal->theData[1];
- Uint32 noLocfrag = signal->theData[2];
- Tfragid[0] = signal->theData[3];
- Tfragid[1] = signal->theData[4];
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- ndbrequire(lcpPtr.p->lcpState == LcpRecord::LCP_WAIT_FRAGID);
- /* ------------------------------------------------------------------------
- * NO ERROR CHECKING OF TNO_LOCFRAG VALUE. OUT OF BOUND WILL IMPLY THAT AN
- * INDEX OUT OF RANGE WILL CAUSE A SYSTEM RESTART WHICH IS DESIRED.
- * ------------------------------------------------------------------------ */
- lcpPtr.p->lcpAccptr = TaccPtr;
- fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- ndbrequire(noLocfrag - 1 < 2);
- for (Uint32 Tindex = 0; Tindex < noLocfrag; Tindex++) {
- jam();
- Uint32 fragId = Tfragid[Tindex];
- /* ----------------------------------------------------------------------
- * THERE IS NO ERROR CHECKING ON PURPOSE. IT IS POSSIBLE TO CALCULATE HOW
- * MANY LOCAL LCP RECORDS THERE SHOULD BE. IT SHOULD NEVER HAPPEN THAT
- * THERE IS NO ONE FREE. IF THERE IS NO ONE IT WILL ALSO BE A POINTER
- * OUT OF RANGE WHICH IS AN ERROR CODE IN ITSELF. REUSES ERROR HANDLING
- * IN AXE VM.
- * ---------------------------------------------------------------------- */
- seizeLcpLoc(signal);
- initLcpLocAcc(signal, fragId);
- seizeLcpLoc(signal);
- initLcpLocTup(signal, fragId);
- signal->theData[0] = lcpLocptr.i;
- signal->theData[1] = cownref;
- signal->theData[2] = lcpPtr.p->currentFragment.lcpFragOrd.tableId;
- signal->theData[3] = lcpLocptr.p->locFragid;
- signal->theData[4] = lcpPtr.p->currentFragment.lcpFragOrd.lcpNo;
- signal->theData[5] = lcpPtr.p->currentFragment.lcpFragOrd.lcpId % MAX_LCP_STORED;
- sendSignal(fragptr.p->tupBlockref, GSN_TUP_PREPLCPREQ, signal, 6, JBB);
- }//for
- lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_TUP_PREPLCP;
- return;
- }//Dblqh::execLCP_FRAGIDCONF()
- /* --------------------------------------------------------------------------
- * PRECONDITION: LCP_LOCPTR:LCP_STATE = WAIT_TUPPREPLCP
- * --------------------------------------------------------------------------
- * WE HAVE NOW PREPARED A LOCAL FRAGMENT IN TUP FOR LCP EXECUTION.
- * -------------------------------------------------------------------------- */
- void Dblqh::execTUP_PREPLCPCONF(Signal* signal)
- {
- UintR ttupPtr;
- jamEntry();
- lcpLocptr.i = signal->theData[0];
- ttupPtr = signal->theData[1];
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- lcpPtr.i = lcpLocptr.p->masterLcpRec;
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- ndbrequire(lcpLocptr.p->lcpLocstate == LcpLocRecord::WAIT_TUP_PREPLCP);
- lcpLocptr.p->tupRef = ttupPtr;
- lcpLocptr.p->lcpLocstate = LcpLocRecord::IDLE;
- checkLcpTupprep(signal);
- if (lcpPtr.p->lcpState != LcpRecord::LCP_WAIT_HOLDOPS) {
- jam();
- return;
- }//if
- fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- lcpLocptr.i = lcpPtr.p->firstLcpLocAcc;
- do {
- jam();
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- lcpLocptr.p->lcpLocstate = LcpLocRecord::WAIT_LCPHOLDOP;
- signal->theData[0] = lcpPtr.p->lcpAccptr;
- signal->theData[1] = lcpLocptr.p->locFragid;
- signal->theData[2] = 0;
- signal->theData[3] = lcpLocptr.i;
- sendSignal(fragptr.p->accBlockref, GSN_LCP_HOLDOPREQ, signal, 4, JBA);
- lcpLocptr.i = lcpLocptr.p->nextLcpLoc;
- } while (lcpLocptr.i != RNIL);
- /* ------------------------------------------------------------------------
- * SET STATE ON FRAGMENT TO BLOCKED TO ENSURE THAT NO MORE OPERATIONS ARE
- * STARTED FROM LQH IN TUP AND ACC UNTIL THE START CHECKPOINT HAS BEEN
- * COMPLETED. ALSO SET THE LOCAL CHECKPOINT STATE TO WAIT FOR
- * LCP_HOLDOPCONF
- * ----------------------------------------------------------------------- */
- fragptr.p->fragStatus = Fragrecord::BLOCKED;
- fragptr.p->fragActiveStatus = ZTRUE;
- lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_HOLDOPS;
- return;
- }//Dblqh::execTUP_PREPLCPCONF()
- void Dblqh::execTUP_PREPLCPREF(Signal* signal)
- {
- jamEntry();
- ndbrequire(false);
- }//Dblqh::execTUP_PREPLCPREF()
- void Dblqh::execLCP_FRAGIDREF(Signal* signal)
- {
- jamEntry();
- ndbrequire(false);
- }//Dblqh::execLCP_FRAGIDREF()
- /* --------------------------------------------------------------------------
- * A NUMBER OF OPERATIONS THAT HAVE BEEN SET ON HOLD IN ACC. MOVE THOSE TO
- * LIST OF BLOCKED ACC OPERATIONS. IF MORE OPERATIONS ARE BLOCKED GET THOSE
- * OTHERWISE CONTINUE THE LOCAL CHECKPOINT BY REQUESTING TUP AND ACC TO
- * WRITE THEIR START CHECKPOINT.
- * --------------------------------------------------------------------------
- * PRECONDITION: LCP_LOCPTR:LCP_LOCSTATE = WAIT_LCPHOLDOP
- * ------------------------------------------------------------------------- */
- /* ***************>> */
- /* LCP_HOLDOPCONF > */
- /* ***************>> */
- void Dblqh::execLCP_HOLDOPCONF(Signal* signal)
- {
- UintR tnoHoldops;
- Uint32 Tdata[23];
- Uint32 Tlength;
- jamEntry();
- lcpLocptr.i = signal->theData[0];
- Tlength = signal->theData[1];
- for (Uint32 i = 0; i < 23; i++)
- Tdata[i] = signal->theData[i + 2];
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- ndbrequire(lcpLocptr.p->lcpLocstate == LcpLocRecord::WAIT_LCPHOLDOP);
- lcpPtr.i = lcpLocptr.p->masterLcpRec;
- /* ------------------------------------------------------------------------
- * NO ERROR CHECK ON USING VALUE IN MASTER_LCP_REC. ERROR IN THIS
- * REFERENCE WILL CAUSE POINTER OUT OF RANGE WHICH CAUSES A SYSTEM RESTART.
- * ----------------------------------------------------------------------- */
- tnoHoldops = Tlength & 65535;
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- ndbrequire(tnoHoldops <= 23);
- for (Uint32 Tindex = 0; Tindex < tnoHoldops; Tindex++) {
- jam();
- tcConnectptr.i = Tdata[Tindex];
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- moveActiveToAcc(signal);
- }//for
- if ((Tlength >> 16) == 1) {
- jam();
- /* MORE HOLDOPS NEEDED */
- signal->theData[0] = lcpPtr.p->lcpAccptr;
- signal->theData[1] = lcpLocptr.p->locFragid;
- signal->theData[2] = 1;
- signal->theData[3] = lcpLocptr.i;
- sendSignal(fragptr.p->accBlockref, GSN_LCP_HOLDOPREQ, signal, 4, JBA);
- return;
- } else {
- jam();
- /* NO MORE HOLDOPS NEEDED */
- lcpLocptr.p->lcpLocstate = LcpLocRecord::HOLDOP_READY;
- checkLcpHoldop(signal);
- if (lcpPtr.p->lcpState == LcpRecord::LCP_WAIT_ACTIVE_FINISH) {
- if (fragptr.p->activeList == RNIL) {
- jam();
- /* ------------------------------------------------------------------
- * THERE ARE NO MORE ACTIVE OPERATIONS. IT IS NOW OK TO START THE
- * LOCAL CHECKPOINT IN BOTH TUP AND ACC.
- * ----------------------------------------------------------------- */
- sendStartLcp(signal);
- lcpPtr.p->lcpState = LcpRecord::LCP_START_CHKP;
- } else {
- jam();
- // Set this to signal releaseActiveFrag
- // that it should check to see if itäs time to call sendStartLcp
- fragptr.p->lcpRef = lcpPtr.i;
- }//if
- }//if
- }//if
- /* ----------------------- */
- /* ELSE */
- /* ------------------------------------------------------------------------
- * THERE ARE STILL MORE ACTIVE OPERATIONS. WAIT UNTIL THEY ARE FINSIHED.
- * THIS IS DISCOVERED WHEN RELEASE_ACTIVE_FRAG IS EXECUTED.
- * ------------------------------------------------------------------------
- * DO NOTHING, EXIT IS EXECUTED BELOW
- * ----------------------------------------------------------------------- */
- return;
- }//Dblqh::execLCP_HOLDOPCONF()
- /* ***************> */
- /* LCP_HOLDOPREF > */
- /* ***************> */
- void Dblqh::execLCP_HOLDOPREF(Signal* signal)
- {
- jamEntry();
- ndbrequire(false);
- }//Dblqh::execLCP_HOLDOPREF()
- /* ************************************************************************>>
- * ACC_LCPSTARTED: Confirm that ACC started local checkpoint and undo
- * logging is on.
- * ************************************************************************>>
- * --------------------------------------------------------------------------
- * PRECONDITION: LCP_LOCPTR:LCP_LOCSTATE = ACC_WAIT_STARTED
- * ------------------------------------------------------------------------- */
- void Dblqh::execACC_LCPSTARTED(Signal* signal)
- {
- jamEntry();
- lcpLocptr.i = signal->theData[0];
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- ndbrequire(lcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_WAIT_STARTED);
- lcpPtr.i = lcpLocptr.p->masterLcpRec;
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- /* ------------------------------------------------------------------------
- * NO ERROR CHECK ON USING VALUE IN MASTER_LCP_REC. ERROR IN THIS
- * REFERENCE WILL CAUSE POINTER OUT OF RANGE WHICH CAUSES A SYSTEM RESTART.
- * ----------------------------------------------------------------------- */
- lcpLocptr.p->lcpLocstate = LcpLocRecord::ACC_STARTED;
- lcpStartedLab(signal);
- return;
- }//Dblqh::execACC_LCPSTARTED()
- /* ******************************************> */
- /* TUP_LCPSTARTED: Same as above but for TUP. */
- /* ******************************************> */
- /* --------------------------------------------------------------------------
- * PRECONDITION: LCP_LOCPTR:LCP_LOCSTATE = TUP_WAIT_STARTED
- * ------------------------------------------------------------------------- */
- void Dblqh::execTUP_LCPSTARTED(Signal* signal)
- {
- jamEntry();
- lcpLocptr.i = signal->theData[0];
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- ndbrequire(lcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_WAIT_STARTED);
- lcpPtr.i = lcpLocptr.p->masterLcpRec;
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- /* ------------------------------------------------------------------------
- * NO ERROR CHECK ON USING VALUE IN MASTER_LCP_REC. ERROR IN THIS REFERENCE
- * WILL CAUSE POINTER OUT OF RANGE WHICH CAUSES A SYSTEM RESTART.
- * ----------------------------------------------------------------------- */
- lcpLocptr.p->lcpLocstate = LcpLocRecord::TUP_STARTED;
- lcpStartedLab(signal);
- return;
- }//Dblqh::execTUP_LCPSTARTED()
- void Dblqh::lcpStartedLab(Signal* signal)
- {
- if (checkLcpStarted(signal))
- {
- jam();
- /* ----------------------------------------------------------------------
- * THE LOCAL CHECKPOINT HAS BEEN STARTED. IT IS NOW TIME TO
- * RESTART THE TRANSACTIONS WHICH HAVE BEEN BLOCKED.
- * --------------------------------------------------------------------- */
- fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- /* ----------------------------------------------------------------------
- * UPDATE THE MAX_GCI_IN_LCP AND MAX_GCI_COMPLETED_IN_LCP NOW BEFORE
- * ACTIVATING THE FRAGMENT AGAIN.
- * --------------------------------------------------------------------- */
- ndbrequire(lcpPtr.p->currentFragment.lcpFragOrd.lcpNo < MAX_LCP_STORED);
- fragptr.p->maxGciInLcp = fragptr.p->newestGci;
- fragptr.p->maxGciCompletedInLcp = cnewestCompletedGci;
- sendAccContOp(signal); /* START OPERATIONS IN ACC */
- moveAccActiveFrag(signal); /* MOVE FROM ACC BLOCKED LIST TO ACTIVE LIST
- ON FRAGMENT */
- }
- /*---------------*/
- /* ELSE */
- /*-------------------------------------------------------------------------*/
- /* THE LOCAL CHECKPOINT HAS NOT BEEN STARTED. EXIT AND WAIT FOR
- * MORE SIGNALS */
- /*-------------------------------------------------------------------------*/
- /* DO NOTHING, EXIT IS EXECUTED BELOW */
- /*-------------------------------------------------------------------------*/
- return;
- }//Dblqh::lcpStartedLab()
- /*---------------------------------------------------------------------------
- * ACC HAVE RESTARTED THE BLOCKED OPERATIONS AGAIN IN ONE FRAGMENT PART.
- * IT IS NOW OUR TURN TO RESTART ALL OPERATIONS QUEUED IN LQH IF ALL
- * FRAGMENT PARTS ARE COMPLETED.
- *-------------------------------------------------------------------------- */
- void Dblqh::execACC_CONTOPCONF(Signal* signal)
- {
- if(ERROR_INSERTED(5035) && signal->getSendersBlockRef() != reference()){
- sendSignalWithDelay(reference(), GSN_ACC_CONTOPCONF, signal, 1000,
- signal->length());
- return;
- }
- jamEntry();
- lcpLocptr.i = signal->theData[0];
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- lcpLocptr.p->accContCounter = 1;
- lcpPtr.i = lcpLocptr.p->masterLcpRec;
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- lcpLocptr.i = lcpPtr.p->firstLcpLocAcc;
- do {
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- if (lcpLocptr.p->accContCounter == 0) {
- jam();
- return;
- }//if
- lcpLocptr.i = lcpLocptr.p->nextLcpLoc;
- } while (lcpLocptr.i != RNIL);
- fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- restartOperationsLab(signal);
- return;
- }//Dblqh::execACC_CONTOPCONF()
- /* ********************************************************* */
- /* LQH_RESTART_OP: Restart operations after beeing blocked. */
- /* ********************************************************* */
- /*---------------------------------------------------------------------------*/
- /* PRECONDITION: FRAG_STATUS = BLOCKED AND LCP_STATE = STARTED */
- /*---------------------------------------------------------------------------*/
- void Dblqh::execLQH_RESTART_OP(Signal* signal)
- {
- jamEntry();
- fragptr.i = signal->theData[0];
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- lcpPtr.i = signal->theData[1];
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- ndbrequire(fragptr.p->fragStatus == Fragrecord::BLOCKED);
- restartOperationsLab(signal);
- }//Dblqh::execLQH_RESTART_OP()
- void Dblqh::restartOperationsLab(Signal* signal)
- {
- Uint32 loopCount = 0;
- tcConnectptr.i = fragptr.p->firstWaitQueue;
- do {
- if (tcConnectptr.i != RNIL) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* START UP THE TRANSACTION AGAIN. WE START IT AS A SEPARATE SIGNAL. */
- /*---------------------------------------------------------------------------*/
- signal->theData[0] = ZRESTART_OPERATIONS_AFTER_STOP;
- signal->theData[1] = tcConnectptr.i;
- signal->theData[2] = fragptr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- tcConnectptr.i = tcConnectptr.p->nextTc;
- } else {
- jam();
- /*--------------------------------------------------------------------------*/
- /* NO MORE OPERATIONS TO RESTART. WE CAN NOW RESET THE STATE TO ACTIVE AND */
- /* RESTART NORMAL ACTIVITIES ON THE FRAGMENT WHILE THE FUZZY PART OF THE */
- /* LOCAL CHECKPOINT IS COMPLETING. */
- /* IF THE CHECKPOINT WAS COMPLETED ALREADY ON THIS FRAGMENT WE PROCEED WITH */
- /* THE NEXT FRAGMENT NOW THAT WE HAVE COMPLETED THIS CHECKPOINT. */
- /*--------------------------------------------------------------------------*/
- fragptr.p->fragStatus = Fragrecord::FSACTIVE;
- if (lcpPtr.p->lcpState == LcpRecord::LCP_BLOCKED_COMP) {
- jam();
- contChkpNextFragLab(signal);
- return;
- }//if
- return;
- }//if
- loopCount++;
- if (loopCount > 16) {
- jam();
- signal->theData[0] = fragptr.i;
- signal->theData[1] = lcpPtr.i;
- sendSignal(cownref, GSN_LQH_RESTART_OP, signal, 2, JBB);
- return;
- }//if
- } while (1);
- }//Dblqh::restartOperationsLab()
- void Dblqh::restartOperationsAfterStopLab(Signal* signal)
- {
- /*-------------------------------------------------------------------------
- * WHEN ARRIVING HERE THE OPERATION IS ALREADY SET IN THE ACTIVE LIST.
- * THUS WE CAN IMMEDIATELY CALL THE METHODS THAT EXECUTE FROM WHERE
- * THE OPERATION WAS STOPPED.
- *------------------------------------------------------------------------ */
- switch (tcConnectptr.p->transactionState) {
- case TcConnectionrec::STOPPED:
- jam();
- /*-----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND ACCKEYREQ
- *---------------------------------------------------------------------- */
- prepareContinueAfterBlockedLab(signal);
- return;
- break;
- case TcConnectionrec::COMMIT_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND ACC_COMMITREQ
- * --------------------------------------------------------------------- */
- releaseActiveFrag(signal);
- commitContinueAfterBlockedLab(signal);
- return;
- break;
- case TcConnectionrec::ABORT_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND ACC_ABORTREQ
- * --------------------------------------------------------------------- */
- abortContinueAfterBlockedLab(signal, true);
- return;
- break;
- case TcConnectionrec::COPY_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND NEXT_SCANREQ DURING COPY FRAGMENT
- * --------------------------------------------------------------------- */
- continueCopyAfterBlockedLab(signal);
- return;
- break;
- case TcConnectionrec::COPY_FIRST_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND NEXT_SCANREQ DURING COPY FRAGMENT
- * --------------------------------------------------------------------- */
- continueFirstCopyAfterBlockedLab(signal);
- return;
- break;
- case TcConnectionrec::SCAN_FIRST_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND NEXT_SCANREQ DURING SCAN
- * --------------------------------------------------------------------- */
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
- continueFirstScanAfterBlockedLab(signal);
- return;
- break;
- case TcConnectionrec::SCAN_CHECK_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND NEXT_SCANREQ DURING SCAN
- * --------------------------------------------------------------------- */
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
- continueAfterCheckLcpStopBlocked(signal);
- return;
- break;
- case TcConnectionrec::SCAN_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND NEXT_SCANREQ DURING SCAN
- * --------------------------------------------------------------------- */
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
- continueScanAfterBlockedLab(signal);
- return;
- break;
- case TcConnectionrec::SCAN_RELEASE_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND NEXT_SCANREQ DURING RELEASE
- * LOCKS IN SCAN
- * --------------------------------------------------------------------- */
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
- continueScanReleaseAfterBlockedLab(signal);
- return;
- break;
- case TcConnectionrec::SCAN_CLOSE_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND NEXT_SCANREQ DURING CLOSE OF SCAN
- * --------------------------------------------------------------------- */
- continueCloseScanAfterBlockedLab(signal);
- return;
- break;
- case TcConnectionrec::COPY_CLOSE_STOPPED:
- jam();
- /* ----------------------------------------------------------------------
- * STOPPED BEFORE TRYING TO SEND NEXT_SCANREQ DURING CLOSE OF COPY
- * --------------------------------------------------------------------- */
- continueCloseCopyAfterBlockedLab(signal);
- return;
- break;
- default:
- jam();
- systemErrorLab(signal);
- return;
- break;
- }//switch
- }//Dblqh::restartOperationsAfterStopLab()
- /* *************** */
- /* ACC_LCPCONF > */
- /* *************** */
- /*---------------------------------------------------------------------------
- * PRECONDITION: LCP_LOCPTR:LCP_LOCSTATE = ACC_STARTED
- *-------------------------------------------------------------------------- */
- void Dblqh::execACC_LCPCONF(Signal* signal)
- {
- jamEntry();
- lcpLocptr.i = signal->theData[0];
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- ndbrequire(lcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_STARTED);
- lcpPtr.i = lcpLocptr.p->masterLcpRec;
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- /* ------------------------------------------------------------------------
- * NO ERROR CHECK ON USING VALUE IN MASTER_LCP_REC. ERROR IN
- * THIS REFERENCE WILL CAUSE POINTER OUT OF RANGE WHICH CAUSES A
- * SYSTEM RESTART.
- * ----------------------------------------------------------------------- */
- lcpLocptr.p->lcpLocstate = LcpLocRecord::ACC_COMPLETED;
- lcpCompletedLab(signal);
- return;
- }//Dblqh::execACC_LCPCONF()
- /* *************** */
- /* TUP_LCPCONF > */
- /* *************** */
- /* --------------------------------------------------------------------------
- * PRECONDITION: LCP_LOCPTR:LCP_LOCSTATE = TUP_STARTED
- * ------------------------------------------------------------------------- */
- void Dblqh::execTUP_LCPCONF(Signal* signal)
- {
- jamEntry();
- lcpLocptr.i = signal->theData[0];
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- ndbrequire(lcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_STARTED);
- lcpPtr.i = lcpLocptr.p->masterLcpRec;
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- /* ------------------------------------------------------------------------
- * NO ERROR CHECK ON USING VALUE IN MASTER_LCP_REC. ERROR IN THIS
- * REFERENCE WILL CAUSE POINTER OUT OF RANGE WHICH CAUSES A SYSTEM RESTART.
- * ----------------------------------------------------------------------- */
- lcpLocptr.p->lcpLocstate = LcpLocRecord::TUP_COMPLETED;
- lcpCompletedLab(signal);
- return;
- }//Dblqh::execTUP_LCPCONF()
- void Dblqh::lcpCompletedLab(Signal* signal)
- {
- checkLcpCompleted(signal);
- if (lcpPtr.p->lcpState != LcpRecord::LCP_COMPLETED) {
- jam();
- /* ----------------------------------------------------------------------
- * THE LOCAL CHECKPOINT HAS NOT BEEN COMPLETED, EXIT & WAIT
- * FOR MORE SIGNALS
- * --------------------------------------------------------------------- */
- return;
- }//if
- /* ------------------------------------------------------------------------
- * THE LOCAL CHECKPOINT HAS BEEN COMPLETED. IT IS NOW TIME TO START
- * A LOCAL CHECKPOINT ON THE NEXT FRAGMENT OR COMPLETE THIS LCP ROUND.
- * ------------------------------------------------------------------------
- * WE START BY SENDING LCP_REPORT TO DIH TO REPORT THE COMPLETED LCP.
- * TO CATER FOR NODE CRASHES WE SEND IT IN PARALLEL TO ALL NODES.
- * ----------------------------------------------------------------------- */
- fragptr.i = lcpPtr.p->currentFragment.fragPtrI;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- fragptr.p->fragActiveStatus = ZFALSE;
- contChkpNextFragLab(signal);
- return;
- }//Dblqh::lcpCompletedLab()
- void
- Dblqh::sendLCP_FRAG_REP(Signal * signal,
- const LcpRecord::FragOrd & fragOrd) const {
-
- FragrecordPtr fragPtr;
- fragPtr.i = fragOrd.fragPtrI;
- ptrCheckGuard(fragPtr, cfragrecFileSize, fragrecord);
- ndbrequire(fragOrd.lcpFragOrd.lcpNo < MAX_LCP_STORED);
- LcpFragRep * const lcpReport = (LcpFragRep *)&signal->theData[0];
- lcpReport->nodeId = cownNodeid;
- lcpReport->lcpId = fragOrd.lcpFragOrd.lcpId;
- lcpReport->lcpNo = fragOrd.lcpFragOrd.lcpNo;
- lcpReport->tableId = fragOrd.lcpFragOrd.tableId;
- lcpReport->fragId = fragOrd.lcpFragOrd.fragmentId;
- lcpReport->maxGciCompleted = fragPtr.p->maxGciCompletedInLcp;
- lcpReport->maxGciStarted = fragPtr.p->maxGciInLcp;
-
- for (Uint32 i = 0; i < cnoOfNodes; i++) {
- jam();
- Uint32 nodeId = cnodeData[i];
- if(cnodeStatus[i] == ZNODE_UP){
- jam();
- BlockReference Tblockref = calcDihBlockRef(nodeId);
- sendSignal(Tblockref, GSN_LCP_FRAG_REP, signal,
- LcpFragRep::SignalLength, JBB);
- }//if
- }//for
- }
- void Dblqh::contChkpNextFragLab(Signal* signal)
- {
- /* ------------------------------------------------------------------------
- * UPDATE THE LATEST LOCAL CHECKPOINT COMPLETED ON FRAGMENT.
- * UPDATE THE LCP_ID OF THIS CHECKPOINT.
- * REMOVE THE LINK BETWEEN THE FRAGMENT RECORD AND THE LCP RECORD.
- * ----------------------------------------------------------------------- */
- if (fragptr.p->fragStatus == Fragrecord::BLOCKED) {
- jam();
- /**
- * LCP of fragment complete
- * but restarting of operations isn't
- */
- lcpPtr.p->lcpState = LcpRecord::LCP_BLOCKED_COMP;
- //restartOperationsLab(signal);
- return;
- }//if
- /**
- * Send rep when fragment is done + unblocked
- */
- sendLCP_FRAG_REP(signal, lcpPtr.p->currentFragment);
-
- /* ------------------------------------------------------------------------
- * WE ALSO RELEASE THE LOCAL LCP RECORDS.
- * ----------------------------------------------------------------------- */
- releaseLocalLcps(signal);
- if (lcpPtr.p->lcpQueued) {
- jam();
- /* ----------------------------------------------------------------------
- * Transfer the state from the queued to the active LCP.
- * --------------------------------------------------------------------- */
- lcpPtr.p->lcpQueued = false;
- lcpPtr.p->currentFragment = lcpPtr.p->queuedFragment;
-
- /* ----------------------------------------------------------------------
- * START THE QUEUED LOCAL CHECKPOINT.
- * --------------------------------------------------------------------- */
- sendLCP_FRAGIDREQ(signal);
- return;
- }//if
-
- lcpPtr.p->lcpState = LcpRecord::LCP_IDLE;
- if (lcpPtr.p->lastFragmentFlag){
- jam();
- /* ----------------------------------------------------------------------
- * NOW THE COMPLETE LOCAL CHECKPOINT ROUND IS COMPLETED.
- * --------------------------------------------------------------------- */
- completeLcpRoundLab(signal);
- return;
- }//if
-
- if (lcpPtr.p->reportEmpty) {
- jam();
- sendEMPTY_LCP_CONF(signal, false);
- }//if
- return;
- }//Dblqh::contChkpNextFragLab()
- void Dblqh::sendLCP_FRAGIDREQ(Signal* signal)
- {
- ndbrequire(lcpPtr.p->firstLcpLocTup == RNIL);
- ndbrequire(lcpPtr.p->firstLcpLocAcc == RNIL);
-
- TablerecPtr tabPtr;
- tabPtr.i = lcpPtr.p->currentFragment.lcpFragOrd.tableId;
- ptrAss(tabPtr, tablerec);
- if(tabPtr.p->tableStatus == Tablerec::PREP_DROP_TABLE_ONGOING ||
- tabPtr.p->tableStatus == Tablerec::PREP_DROP_TABLE_DONE){
- jam();
- /**
- * Fake that the fragment is done
- */
- lcpCompletedLab(signal);
- return;
- }
-
- ndbrequire(tabPtr.p->tableStatus == Tablerec::TABLE_DEFINED);
-
- lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_FRAGID;
- signal->theData[0] = lcpPtr.i;
- signal->theData[1] = cownref;
- signal->theData[2] = lcpPtr.p->currentFragment.lcpFragOrd.lcpNo;
- signal->theData[3] = lcpPtr.p->currentFragment.lcpFragOrd.tableId;
- signal->theData[4] = lcpPtr.p->currentFragment.lcpFragOrd.fragmentId;
- signal->theData[5] = lcpPtr.p->currentFragment.lcpFragOrd.lcpId % MAX_LCP_STORED;
- sendSignal(fragptr.p->accBlockref, GSN_LCP_FRAGIDREQ, signal, 6, JBB);
- }//Dblqh::sendLCP_FRAGIDREQ()
- void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
- {
-
- EmptyLcpConf * const rep = (EmptyLcpConf*)&signal->theData[0];
- /* ----------------------------------------------------------------------
- * We have been requested to report when there are no more local
- * waiting to be started or ongoing. In this signal we also report
- * the last completed fragments state.
- * ---------------------------------------------------------------------- */
- rep->senderNodeId = getOwnNodeId();
- if(!idle){
- jam();
- rep->idle = 0 ;
- rep->tableId = lcpPtr.p->currentFragment.lcpFragOrd.tableId;
- rep->fragmentId = lcpPtr.p->currentFragment.lcpFragOrd.fragmentId;
- rep->lcpNo = lcpPtr.p->currentFragment.lcpFragOrd.lcpNo;
- rep->lcpId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId;
- } else {
- jam();
- rep->idle = 1;
- rep->tableId = ~0;
- rep->fragmentId = ~0;
- rep->lcpNo = ~0;
- rep->lcpId = c_lcpId;
- }
-
- for (Uint32 i = 0; i < cnoOfNodes; i++) {
- jam();
- Uint32 nodeId = cnodeData[i];
- if (lcpPtr.p->m_EMPTY_LCP_REQ.get(nodeId)) {
- jam();
-
- BlockReference blockref = calcDihBlockRef(nodeId);
- sendSignal(blockref, GSN_EMPTY_LCP_CONF, signal,
- EmptyLcpConf::SignalLength, JBB);
- }//if
- }//for
- lcpPtr.p->reportEmpty = false;
- lcpPtr.p->m_EMPTY_LCP_REQ.clear();
- }//Dblqh::sendEMPTY_LCPCONF()
- void Dblqh::execACC_LCPREF(Signal* signal)
- {
- jamEntry();
- ndbrequire(false);
- }//Dblqh::execACC_LCPREF()
- void Dblqh::execTUP_LCPREF(Signal* signal)
- {
- jamEntry();
- ndbrequire(false);
- }//Dblqh::execTUP_LCPREF()
- /* --------------------------------------------------------------------------
- * THE LOCAL CHECKPOINT ROUND IS NOW COMPLETED. SEND COMPLETED MESSAGE
- * TO THE MASTER DIH.
- * ------------------------------------------------------------------------- */
- void Dblqh::completeLcpRoundLab(Signal* signal)
- {
- clcpCompletedState = LCP_CLOSE_STARTED;
- signal->theData[0] = caccBlockref;
- signal->theData[1] = cownref;
- sendSignal(caccBlockref, GSN_END_LCPREQ, signal, 2, JBB);
- signal->theData[0] = ctupBlockref;
- signal->theData[1] = cownref;
- sendSignal(ctupBlockref, GSN_END_LCPREQ, signal, 2, JBB);
- return;
- }//Dblqh::completeLcpRoundLab()
- void Dblqh::execEND_LCPCONF(Signal* signal)
- {
- jamEntry();
- BlockReference userpointer = signal->theData[0];
- if (userpointer == caccBlockref) {
- if (clcpCompletedState == LCP_CLOSE_STARTED) {
- jam();
- clcpCompletedState = ACC_LCP_CLOSE_COMPLETED;
- return;
- } else {
- jam();
- ndbrequire(clcpCompletedState == TUP_LCP_CLOSE_COMPLETED);
- clcpCompletedState = LCP_IDLE;
- }//if
- } else {
- ndbrequire(userpointer == ctupBlockref);
- if (clcpCompletedState == LCP_CLOSE_STARTED) {
- jam();
- clcpCompletedState = TUP_LCP_CLOSE_COMPLETED;
- return;
- } else {
- jam();
- ndbrequire(clcpCompletedState == ACC_LCP_CLOSE_COMPLETED);
- clcpCompletedState = LCP_IDLE;
- }//if
- }//if
- lcpPtr.i = 0;
- ptrAss(lcpPtr, lcpRecord);
- sendLCP_COMPLETE_REP(signal, lcpPtr.p->currentFragment.lcpFragOrd.lcpId);
- }//Dblqh::execEND_LCPCONF()
- void Dblqh::sendLCP_COMPLETE_REP(Signal* signal, Uint32 lcpId)
- {
- cnoOfFragsCheckpointed = 0;
- ndbrequire((cnoOfNodes - 1) < (MAX_NDB_NODES - 1));
- /* ------------------------------------------------------------------------
- * WE SEND COMP_LCP_ROUND TO ALL NODES TO PREPARE FOR NODE CRASHES.
- * ----------------------------------------------------------------------- */
- lcpPtr.i = 0;
- ptrAss(lcpPtr, lcpRecord);
- lcpPtr.p->lastFragmentFlag = false;
-
- LcpCompleteRep* rep = (LcpCompleteRep*)signal->getDataPtrSend();
- rep->nodeId = getOwnNodeId();
- rep->lcpId = lcpId;
- rep->blockNo = DBLQH;
-
- for (Uint32 i = 0; i < cnoOfNodes; i++) {
- jam();
- Uint32 nodeId = cnodeData[i];
- if(cnodeStatus[i] == ZNODE_UP){
- jam();
-
- BlockReference blockref = calcDihBlockRef(nodeId);
- sendSignal(blockref, GSN_LCP_COMPLETE_REP, signal,
- LcpCompleteRep::SignalLength, JBB);
- }//if
- }//for
- if(lcpPtr.p->reportEmpty){
- jam();
- sendEMPTY_LCP_CONF(signal, true);
- }
- return;
- }//Dblqh::sendCOMP_LCP_ROUND()
- /* ==========================================================================
- * ======= CHECK IF ALL PARTS OF A LOCAL CHECKPOINT ARE COMPLETED =======
- *
- * SUBROUTINE SHORT NAME = CLC
- * ========================================================================= */
- void Dblqh::checkLcpCompleted(Signal* signal)
- {
- LcpLocRecordPtr clcLcpLocptr;
- clcLcpLocptr.i = lcpPtr.p->firstLcpLocAcc;
- while (clcLcpLocptr.i != RNIL) {
- ptrCheckGuard(clcLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- if (clcLcpLocptr.p->lcpLocstate != LcpLocRecord::ACC_COMPLETED) {
- jam();
- ndbrequire((clcLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_WAIT_STARTED) ||
- (clcLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_STARTED));
- return;
- }//if
- clcLcpLocptr.i = clcLcpLocptr.p->nextLcpLoc;
- }
- clcLcpLocptr.i = lcpPtr.p->firstLcpLocTup;
- while (clcLcpLocptr.i != RNIL){
- ptrCheckGuard(clcLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- if (clcLcpLocptr.p->lcpLocstate != LcpLocRecord::TUP_COMPLETED) {
- jam();
- ndbrequire((clcLcpLocptr.p->lcpLocstate==LcpLocRecord::TUP_WAIT_STARTED)
- ||(clcLcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_STARTED));
- return;
- }//if
- clcLcpLocptr.i = clcLcpLocptr.p->nextLcpLoc;
- }
-
- lcpPtr.p->lcpState = LcpRecord::LCP_COMPLETED;
- }//Dblqh::checkLcpCompleted()
- /* ==========================================================================
- * ======= CHECK IF ALL HOLD OPERATIONS ARE COMPLETED =======
- *
- * SUBROUTINE SHORT NAME = CHO
- * ========================================================================= */
- void Dblqh::checkLcpHoldop(Signal* signal)
- {
- LcpLocRecordPtr choLcpLocptr;
- choLcpLocptr.i = lcpPtr.p->firstLcpLocAcc;
- do {
- ptrCheckGuard(choLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- if (choLcpLocptr.p->lcpLocstate != LcpLocRecord::HOLDOP_READY) {
- ndbrequire(choLcpLocptr.p->lcpLocstate == LcpLocRecord::WAIT_LCPHOLDOP);
- return;
- }//if
- choLcpLocptr.i = choLcpLocptr.p->nextLcpLoc;
- } while (choLcpLocptr.i != RNIL);
- lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_ACTIVE_FINISH;
- }//Dblqh::checkLcpHoldop()
- /* ==========================================================================
- * ======= CHECK IF ALL PARTS OF A LOCAL CHECKPOINT ARE STARTED =======
- *
- * SUBROUTINE SHORT NAME = CLS
- * ========================================================================== */
- bool
- Dblqh::checkLcpStarted(Signal* signal)
- {
- LcpLocRecordPtr clsLcpLocptr;
- terrorCode = ZOK;
- clsLcpLocptr.i = lcpPtr.p->firstLcpLocAcc;
- int i = 0;
- do {
- ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::ACC_WAIT_STARTED){
- return false;
- }//if
- clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
- i++;
- } while (clsLcpLocptr.i != RNIL);
- i = 0;
- clsLcpLocptr.i = lcpPtr.p->firstLcpLocTup;
- do {
- ptrCheckGuard(clsLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- if (clsLcpLocptr.p->lcpLocstate == LcpLocRecord::TUP_WAIT_STARTED){
- return false;
- }//if
- clsLcpLocptr.i = clsLcpLocptr.p->nextLcpLoc;
- i++;
- } while (clsLcpLocptr.i != RNIL);
-
- return true;
- }//Dblqh::checkLcpStarted()
- /* ==========================================================================
- * ======= CHECK IF ALL PREPARE TUP OPERATIONS ARE COMPLETED =======
- *
- * SUBROUTINE SHORT NAME = CLT
- * ========================================================================== */
- void Dblqh::checkLcpTupprep(Signal* signal)
- {
- LcpLocRecordPtr cltLcpLocptr;
- cltLcpLocptr.i = lcpPtr.p->firstLcpLocTup;
- do {
- ptrCheckGuard(cltLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- if (cltLcpLocptr.p->lcpLocstate != LcpLocRecord::IDLE) {
- ndbrequire(cltLcpLocptr.p->lcpLocstate == LcpLocRecord::WAIT_TUP_PREPLCP);
- return;
- }//if
- cltLcpLocptr.i = cltLcpLocptr.p->nextLcpLoc;
- } while (cltLcpLocptr.i != RNIL);
- lcpPtr.p->lcpState = LcpRecord::LCP_WAIT_HOLDOPS;
- }//Dblqh::checkLcpTupprep()
- /* ==========================================================================
- * ======= INITIATE LCP LOCAL RECORD USED TOWARDS ACC =======
- *
- * ========================================================================== */
- void Dblqh::initLcpLocAcc(Signal* signal, Uint32 fragId)
- {
- lcpLocptr.p->nextLcpLoc = lcpPtr.p->firstLcpLocAcc;
- lcpPtr.p->firstLcpLocAcc = lcpLocptr.i;
- lcpLocptr.p->locFragid = fragId;
- lcpLocptr.p->waitingBlock = LcpLocRecord::ACC;
- lcpLocptr.p->lcpLocstate = LcpLocRecord::IDLE;
- lcpLocptr.p->masterLcpRec = lcpPtr.i;
- lcpLocptr.p->tupRef = RNIL;
- }//Dblqh::initLcpLocAcc()
- /* ==========================================================================
- * ======= INITIATE LCP LOCAL RECORD USED TOWARDS TUP =======
- *
- * ========================================================================== */
- void Dblqh::initLcpLocTup(Signal* signal, Uint32 fragId)
- {
- lcpLocptr.p->nextLcpLoc = lcpPtr.p->firstLcpLocTup;
- lcpPtr.p->firstLcpLocTup = lcpLocptr.i;
- lcpLocptr.p->locFragid = fragId;
- lcpLocptr.p->waitingBlock = LcpLocRecord::TUP;
- lcpLocptr.p->lcpLocstate = LcpLocRecord::WAIT_TUP_PREPLCP;
- lcpLocptr.p->masterLcpRec = lcpPtr.i;
- lcpLocptr.p->tupRef = RNIL;
- }//Dblqh::initLcpLocTup()
- /* --------------------------------------------------------------------------
- * ------- MOVE OPERATION FROM ACC WAITING LIST ON FRAGMENT -------
- * ------- TO ACTIVE LIST ON FRAGMENT -------
- *
- * SUBROUTINE SHORT NAME = MAA
- * -------------------------------------------------------------------------- */
- void Dblqh::moveAccActiveFrag(Signal* signal)
- {
- UintR maaTcNextConnectptr;
- tcConnectptr.i = fragptr.p->accBlockedList;
- fragptr.p->accBlockedList = RNIL;
- /* ------------------------------------------------------------------------
- * WE WILL MOVE ALL RECORDS FROM THE ACC BLOCKED LIST AT ONCE.
- * ------------------------------------------------------------------------ */
- while (tcConnectptr.i != RNIL) {
- jam();
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- maaTcNextConnectptr = tcConnectptr.p->nextTc;
- ndbrequire(tcConnectptr.p->listState == TcConnectionrec::ACC_BLOCK_LIST);
- tcConnectptr.p->listState = TcConnectionrec::NOT_IN_LIST;
- linkActiveFrag(signal);
- tcConnectptr.i = maaTcNextConnectptr;
- }//while
- }//Dblqh::moveAccActiveFrag()
- /* --------------------------------------------------------------------------
- * ------- MOVE OPERATION FROM ACTIVE LIST ON FRAGMENT -------
- * ------- TO ACC BLOCKED LIST ON FRAGMENT -------
- *
- * SUBROUTINE SHORT NAME = MAT
- * -------------------------------------------------------------------------- */
- void Dblqh::moveActiveToAcc(Signal* signal)
- {
- TcConnectionrecPtr matTcNextConnectptr;
- releaseActiveList(signal);
- /* ------------------------------------------------------------------------
- * PUT OPERATION RECORD FIRST IN ACC BLOCKED LIST.
- * ------------------------------------------------------------------------ */
- matTcNextConnectptr.i = fragptr.p->accBlockedList;
- tcConnectptr.p->nextTc = matTcNextConnectptr.i;
- tcConnectptr.p->prevTc = RNIL;
- tcConnectptr.p->listState = TcConnectionrec::ACC_BLOCK_LIST;
- fragptr.p->accBlockedList = tcConnectptr.i;
- if (matTcNextConnectptr.i != RNIL) {
- jam();
- ptrCheckGuard(matTcNextConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- matTcNextConnectptr.p->prevTc = tcConnectptr.i;
- }//if
- }//Dblqh::moveActiveToAcc()
- /* ------------------------------------------------------------------------- */
- /* ---- RELEASE LOCAL LCP RECORDS AFTER COMPLETION OF A LOCAL CHECKPOINT---- */
- /* */
- /* SUBROUTINE SHORT NAME = RLL */
- /* ------------------------------------------------------------------------- */
- void Dblqh::releaseLocalLcps(Signal* signal)
- {
- lcpLocptr.i = lcpPtr.p->firstLcpLocAcc;
- while (lcpLocptr.i != RNIL){
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- Uint32 tmp = lcpLocptr.p->nextLcpLoc;
- releaseLcpLoc(signal);
- lcpLocptr.i = tmp;
- }
- lcpPtr.p->firstLcpLocAcc = RNIL;
-
- lcpLocptr.i = lcpPtr.p->firstLcpLocTup;
- while (lcpLocptr.i != RNIL){
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- Uint32 tmp = lcpLocptr.p->nextLcpLoc;
- releaseLcpLoc(signal);
- lcpLocptr.i = tmp;
- }
- lcpPtr.p->firstLcpLocTup = RNIL;
-
- }//Dblqh::releaseLocalLcps()
- /* ------------------------------------------------------------------------- */
- /* ------- SEIZE LCP LOCAL RECORD ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- void Dblqh::seizeLcpLoc(Signal* signal)
- {
- lcpLocptr.i = cfirstfreeLcpLoc;
- ptrCheckGuard(lcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- cfirstfreeLcpLoc = lcpLocptr.p->nextLcpLoc;
- lcpLocptr.p->nextLcpLoc = RNIL;
- }//Dblqh::seizeLcpLoc()
- /* ------------------------------------------------------------------------- */
- /* ------- SEND ACC_CONT_OP ------- */
- /* */
- /* INPUT: LCP_PTR LOCAL CHECKPOINT RECORD */
- /* FRAGPTR FRAGMENT RECORD */
- /* */
- /* SUBROUTINE SHORT NAME = SAC */
- /* ------------------------------------------------------------------------- */
- void Dblqh::sendAccContOp(Signal* signal)
- {
- LcpLocRecordPtr sacLcpLocptr;
- int count = 0;
- sacLcpLocptr.i = lcpPtr.p->firstLcpLocAcc;
- do {
- ptrCheckGuard(sacLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- sacLcpLocptr.p->accContCounter = 0;
- /* ------------------------------------------------------------------- */
- /*SEND START OPERATIONS TO ACC AGAIN */
- /* ------------------------------------------------------------------- */
- signal->theData[0] = lcpPtr.p->lcpAccptr;
- signal->theData[1] = sacLcpLocptr.p->locFragid;
- sendSignal(fragptr.p->accBlockref, GSN_ACC_CONTOPREQ, signal, 2, JBA);
- sacLcpLocptr.i = sacLcpLocptr.p->nextLcpLoc;
- } while (sacLcpLocptr.i != RNIL);
-
- }//Dblqh::sendAccContOp()
- /* ------------------------------------------------------------------------- */
- /* ------- SEND ACC_LCPREQ AND TUP_LCPREQ ------- */
- /* */
- /* INPUT: LCP_PTR LOCAL CHECKPOINT RECORD */
- /* FRAGPTR FRAGMENT RECORD */
- /* SUBROUTINE SHORT NAME = STL */
- /* ------------------------------------------------------------------------- */
- void Dblqh::sendStartLcp(Signal* signal)
- {
- LcpLocRecordPtr stlLcpLocptr;
- stlLcpLocptr.i = lcpPtr.p->firstLcpLocAcc;
- do {
- jam();
- ptrCheckGuard(stlLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- stlLcpLocptr.p->lcpLocstate = LcpLocRecord::ACC_WAIT_STARTED;
- signal->theData[0] = lcpPtr.p->lcpAccptr;
- signal->theData[1] = stlLcpLocptr.i;
- signal->theData[2] = stlLcpLocptr.p->locFragid;
- sendSignal(fragptr.p->accBlockref, GSN_ACC_LCPREQ, signal, 3, JBA);
- stlLcpLocptr.i = stlLcpLocptr.p->nextLcpLoc;
- } while (stlLcpLocptr.i != RNIL);
- stlLcpLocptr.i = lcpPtr.p->firstLcpLocTup;
- do {
- jam();
- ptrCheckGuard(stlLcpLocptr, clcpLocrecFileSize, lcpLocRecord);
- stlLcpLocptr.p->lcpLocstate = LcpLocRecord::TUP_WAIT_STARTED;
- signal->theData[0] = stlLcpLocptr.i;
- signal->theData[1] = cownref;
- signal->theData[2] = stlLcpLocptr.p->tupRef;
- if(ERROR_INSERTED(5077))
- sendSignalWithDelay(fragptr.p->tupBlockref, GSN_TUP_LCPREQ,
- signal, 5000, 3);
- else
- sendSignal(fragptr.p->tupBlockref, GSN_TUP_LCPREQ, signal, 3, JBA);
- stlLcpLocptr.i = stlLcpLocptr.p->nextLcpLoc;
- } while (stlLcpLocptr.i != RNIL);
- if(ERROR_INSERTED(5077))
- {
- ndbout_c("Delayed TUP_LCPREQ with 5 sec");
- }
- }//Dblqh::sendStartLcp()
- /* ------------------------------------------------------------------------- */
- /* ------- SET THE LOG TAIL IN THE LOG FILES ------- */
- /* */
- /*THIS SUBROUTINE HAVE BEEN BUGGY AND IS RATHER COMPLEX. IT IS IMPORTANT TO */
- /*REMEMBER THAT WE SEARCH FROM THE TAIL UNTIL WE REACH THE HEAD (CURRENT). */
- /*THE TAIL AND HEAD CAN BE ON THE SAME MBYTE. WE SEARCH UNTIL WE FIND A MBYTE*/
- /*THAT WE NEED TO KEEP. WE THEN SET THE TAIL TO BE THE PREVIOUS. IF WE DO */
- /*NOT FIND A MBYTE THAT WE NEED TO KEEP UNTIL WE REACH THE HEAD THEN WE USE */
- /*THE HEAD AS TAIL. FINALLY WE HAVE TO MOVE BACK THE TAIL TO ALSO INCLUDE */
- /*ALL PREPARE RECORDS. THIS MEANS THAT LONG-LIVED TRANSACTIONS ARE DANGEROUS */
- /*FOR SHORT LOGS. */
- /* ------------------------------------------------------------------------- */
- // this function has not been verified yet
- Uint32 Dblqh::remainingLogSize(const LogFileRecordPtr &sltCurrLogFilePtr,
- const LogPartRecordPtr &sltLogPartPtr)
- {
- Uint32 hf = sltCurrLogFilePtr.p->fileNo*ZNO_MBYTES_IN_FILE+sltCurrLogFilePtr.p->currentMbyte;
- Uint32 tf = sltLogPartPtr.p->logTailFileNo*ZNO_MBYTES_IN_FILE+sltLogPartPtr.p->logTailMbyte;
- Uint32 sz = sltLogPartPtr.p->noLogFiles*ZNO_MBYTES_IN_FILE;
- if (tf > hf) hf += sz;
- return sz-(hf-tf);
- }
- void Dblqh::setLogTail(Signal* signal, Uint32 keepGci)
- {
- LogPartRecordPtr sltLogPartPtr;
- LogFileRecordPtr sltLogFilePtr;
- #if 0
- LogFileRecordPtr sltCurrLogFilePtr;
- #endif
- UintR tsltMbyte;
- UintR tsltStartMbyte;
- UintR tsltIndex;
- UintR tsltFlag;
- for (sltLogPartPtr.i = 0; sltLogPartPtr.i < 4; sltLogPartPtr.i++) {
- jam();
- ptrAss(sltLogPartPtr, logPartRecord);
- findLogfile(signal, sltLogPartPtr.p->logTailFileNo,
- sltLogPartPtr, &sltLogFilePtr);
- #if 0
- sltCurrLogFilePtr.i = sltLogPartPtr.p->currentLogfile;
- ptrCheckGuard(sltCurrLogFilePtr, clogFileFileSize, logFileRecord);
- infoEvent("setLogTail: Available log file %d size = %d[mbytes]+%d[words]", sltLogPartPtr.i,
- remainingLogSize(sltCurrLogFilePtr, sltLogPartPtr), sltCurrLogFilePtr.p->remainingWordsInMbyte);
- #endif
- tsltMbyte = sltLogPartPtr.p->logTailMbyte;
- tsltStartMbyte = tsltMbyte;
- tsltFlag = ZFALSE;
- if (sltLogFilePtr.i == sltLogPartPtr.p->currentLogfile) {
- /* ------------------------------------------------------------------------- */
- /*THE LOG AND THE TAIL IS ALREADY IN THE SAME FILE. */
- /* ------------------------------------------------------------------------- */
- if (sltLogFilePtr.p->currentMbyte >= sltLogPartPtr.p->logTailMbyte) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*THE CURRENT MBYTE IS AHEAD OF OR AT THE TAIL. THUS WE WILL ONLY LOOK FOR */
- /*THE TAIL UNTIL WE REACH THE CURRENT MBYTE WHICH IS IN THIS LOG FILE. */
- /*IF THE LOG TAIL IS AHEAD OF THE CURRENT MBYTE BUT IN THE SAME LOG FILE */
- /*THEN WE HAVE TO SEARCH THROUGH ALL FILES BEFORE WE COME TO THE CURRENT */
- /*MBYTE. WE ALWAYS STOP WHEN WE COME TO THE CURRENT MBYTE SINCE THE TAIL */
- /*CAN NEVER BE BEFORE THE HEAD. */
- /* ------------------------------------------------------------------------- */
- tsltFlag = ZTRUE;
- }//if
- }//if
- /* ------------------------------------------------------------------------- */
- /*NOW START SEARCHING FOR THE NEW TAIL, STARTING AT THE CURRENT TAIL AND */
- /*PROCEEDING UNTIL WE FIND A MBYTE WHICH IS NEEDED TO KEEP OR UNTIL WE REACH */
- /*CURRENT MBYTE (THE HEAD). */
- /* ------------------------------------------------------------------------- */
- SLT_LOOP:
- for (tsltIndex = tsltStartMbyte;
- tsltIndex <= ZNO_MBYTES_IN_FILE - 1;
- tsltIndex++) {
- if (sltLogFilePtr.p->logMaxGciStarted[tsltIndex] >= keepGci) {
- /* ------------------------------------------------------------------------- */
- /*WE ARE NOT ALLOWED TO STEP THE LOG ANY FURTHER AHEAD */
- /*SET THE NEW LOG TAIL AND CONTINUE WITH NEXT LOG PART. */
- /*THIS MBYTE IS NOT TO BE INCLUDED SO WE NEED TO STEP BACK ONE MBYTE. */
- /* ------------------------------------------------------------------------- */
- if (tsltIndex != 0) {
- jam();
- tsltMbyte = tsltIndex - 1;
- } else {
- jam();
- /* ------------------------------------------------------------------------- */
- /*STEPPING BACK INCLUDES ALSO STEPPING BACK TO THE PREVIOUS LOG FILE. */
- /* ------------------------------------------------------------------------- */
- tsltMbyte = ZNO_MBYTES_IN_FILE - 1;
- sltLogFilePtr.i = sltLogFilePtr.p->prevLogFile;
- ptrCheckGuard(sltLogFilePtr, clogFileFileSize, logFileRecord);
- }//if
- goto SLT_BREAK;
- } else {
- jam();
- if (tsltFlag == ZTRUE) {
- /* ------------------------------------------------------------------------- */
- /*WE ARE IN THE SAME FILE AS THE CURRENT MBYTE AND WE CAN REACH THE CURRENT */
- /*MBYTE BEFORE WE REACH A NEW TAIL. */
- /* ------------------------------------------------------------------------- */
- if (tsltIndex == sltLogFilePtr.p->currentMbyte) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*THE TAIL OF THE LOG IS ACTUALLY WITHIN THE CURRENT MBYTE. THUS WE SET THE */
- /*LOG TAIL TO BE THE CURRENT MBYTE. */
- /* ------------------------------------------------------------------------- */
- tsltMbyte = sltLogFilePtr.p->currentMbyte;
- goto SLT_BREAK;
- }//if
- }//if
- }//if
- }//for
- sltLogFilePtr.i = sltLogFilePtr.p->nextLogFile;
- ptrCheckGuard(sltLogFilePtr, clogFileFileSize, logFileRecord);
- if (sltLogFilePtr.i == sltLogPartPtr.p->currentLogfile) {
- jam();
- tsltFlag = ZTRUE;
- }//if
- tsltStartMbyte = 0;
- goto SLT_LOOP;
- SLT_BREAK:
- jam();
- {
- UintR ToldTailFileNo = sltLogPartPtr.p->logTailFileNo;
- UintR ToldTailMByte = sltLogPartPtr.p->logTailMbyte;
- arrGuard(tsltMbyte, 16);
- sltLogPartPtr.p->logTailFileNo =
- sltLogFilePtr.p->logLastPrepRef[tsltMbyte] >> 16;
- /* ------------------------------------------------------------------------- */
- /*SINCE LOG_MAX_GCI_STARTED ONLY KEEP TRACK OF COMMIT LOG RECORDS WE ALSO */
- /*HAVE TO STEP BACK THE TAIL SO THAT WE INCLUDE ALL PREPARE RECORDS */
- /*NEEDED FOR THOSE COMMIT RECORDS IN THIS MBYTE. THIS IS A RATHER */
- /*CONSERVATIVE APPROACH BUT IT WORKS. */
- /* ------------------------------------------------------------------------- */
- sltLogPartPtr.p->logTailMbyte =
- sltLogFilePtr.p->logLastPrepRef[tsltMbyte] & 65535;
- if ((ToldTailFileNo != sltLogPartPtr.p->logTailFileNo) ||
- (ToldTailMByte != sltLogPartPtr.p->logTailMbyte)) {
- jam();
- if (sltLogPartPtr.p->logPartState == LogPartRecord::TAIL_PROBLEM) {
- if (sltLogPartPtr.p->firstLogQueue == RNIL) {
- jam();
- sltLogPartPtr.p->logPartState = LogPartRecord::IDLE;
- } else {
- jam();
- sltLogPartPtr.p->logPartState = LogPartRecord::ACTIVE;
- }//if
- }//if
- }//if
- }
- #if 0
- infoEvent("setLogTail: Available log file %d size = %d[mbytes]+%d[words]", sltLogPartPtr.i,
- remainingLogSize(sltCurrLogFilePtr, sltLogPartPtr), sltCurrLogFilePtr.p->remainingWordsInMbyte);
- #endif
- }//for
- }//Dblqh::setLogTail()
- /* ######################################################################### */
- /* ####### GLOBAL CHECKPOINT MODULE ####### */
- /* */
- /* ######################################################################### */
- /*---------------------------------------------------------------------------*/
- /* THIS MODULE HELPS DIH IN DISCOVERING WHEN GLOBAL CHECKPOINTS ARE */
- /* RECOVERABLE. IT HANDLES THE REQUEST GCP_SAVEREQ THAT REQUESTS LQH TO */
- /* SAVE A PARTICULAR GLOBAL CHECKPOINT TO DISK AND RESPOND WHEN COMPLETED. */
- /*---------------------------------------------------------------------------*/
- /* *************** */
- /* GCP_SAVEREQ > */
- /* *************** */
- void Dblqh::execGCP_SAVEREQ(Signal* signal)
- {
- jamEntry();
- const GCPSaveReq * const saveReq = (GCPSaveReq *)&signal->theData[0];
- if (ERROR_INSERTED(5000)) {
- systemErrorLab(signal);
- }
- if (ERROR_INSERTED(5007)){
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_GCP_SAVEREQ, signal, 10000,
- signal->length());
- return;
- }
- const Uint32 dihBlockRef = saveReq->dihBlockRef;
- const Uint32 dihPtr = saveReq->dihPtr;
- const Uint32 gci = saveReq->gci;
-
- ndbrequire(gci >= cnewestCompletedGci);
-
- if (gci == cnewestCompletedGci) {
- /*---------------------------------------------------------------------------*/
- /* GLOBAL CHECKPOINT HAVE ALREADY BEEN HANDLED. REQUEST MUST HAVE BEEN SENT */
- /* FROM NEW MASTER DIH. */
- /*---------------------------------------------------------------------------*/
- if (ccurrentGcprec == RNIL) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THIS INDICATES THAT WE HAVE ALREADY SENT GCP_SAVECONF TO PREVIOUS MASTER. */
- /* WE SIMPLY SEND IT ALSO TO THE NEW MASTER. */
- /*---------------------------------------------------------------------------*/
- GCPSaveConf * const saveConf = (GCPSaveConf*)&signal->theData[0];
- saveConf->dihPtr = dihPtr;
- saveConf->nodeId = getOwnNodeId();
- saveConf->gci = cnewestCompletedGci;
- sendSignal(dihBlockRef, GSN_GCP_SAVECONF, signal,
- GCPSaveConf::SignalLength, JBA);
- return;
- }
- jam();
- /*---------------------------------------------------------------------------*/
- /* WE HAVE NOT YET SENT THE RESPONSE TO THE OLD MASTER. WE WILL SET THE NEW */
- /* RECEIVER OF THE RESPONSE AND THEN EXIT SINCE THE PROCESS IS ALREADY */
- /* STARTED. */
- /*---------------------------------------------------------------------------*/
- gcpPtr.i = ccurrentGcprec;
- ptrCheckGuard(gcpPtr, cgcprecFileSize, gcpRecord);
- gcpPtr.p->gcpUserptr = dihPtr;
- gcpPtr.p->gcpBlockref = dihBlockRef;
- return;
- }//if
-
- ndbrequire(ccurrentGcprec == RNIL);
-
-
- if(getNodeState().startLevel >= NodeState::SL_STOPPING_4){
- GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
- saveRef->dihPtr = dihPtr;
- saveRef->nodeId = getOwnNodeId();
- saveRef->gci = gci;
- saveRef->errorCode = GCPSaveRef::NodeShutdownInProgress;
- sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal,
- GCPSaveRef::SignalLength, JBB);
- return;
- }
- if(getNodeState().getNodeRestartInProgress()){
- GCPSaveRef * const saveRef = (GCPSaveRef*)&signal->theData[0];
- saveRef->dihPtr = dihPtr;
- saveRef->nodeId = getOwnNodeId();
- saveRef->gci = gci;
- saveRef->errorCode = GCPSaveRef::NodeRestartInProgress;
- sendSignal(dihBlockRef, GSN_GCP_SAVEREF, signal,
- GCPSaveRef::SignalLength, JBB);
- return;
- }
-
- ccurrentGcprec = 0;
- gcpPtr.i = ccurrentGcprec;
- ptrCheckGuard(gcpPtr, cgcprecFileSize, gcpRecord);
-
- cnewestCompletedGci = gci;
- if (gci > cnewestGci) {
- jam();
- cnewestGci = gci;
- }//if
-
- gcpPtr.p->gcpBlockref = dihBlockRef;
- gcpPtr.p->gcpUserptr = dihPtr;
- gcpPtr.p->gcpId = gci;
- bool tlogActive = false;
- for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
- ptrAss(logPartPtr, logPartRecord);
- if (logPartPtr.p->logPartState == LogPartRecord::ACTIVE) {
- jam();
- logPartPtr.p->waitWriteGciLog = LogPartRecord::WWGL_TRUE;
- tlogActive = true;
- } else {
- jam();
- logPartPtr.p->waitWriteGciLog = LogPartRecord::WWGL_FALSE;
- logFilePtr.i = logPartPtr.p->currentLogfile;
- ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
- logPagePtr.i = logFilePtr.p->currentLogpage;
- ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
- writeCompletedGciLog(signal);
- }//if
- }//for
- if (tlogActive == true) {
- jam();
- return;
- }//if
- initGcpRecLab(signal);
- startTimeSupervision(signal);
- return;
- }//Dblqh::execGCP_SAVEREQ()
- /* ------------------------------------------------------------------------- */
- /* START TIME SUPERVISION OF THE LOG PARTS. */
- /* ------------------------------------------------------------------------- */
- void Dblqh::startTimeSupervision(Signal* signal)
- {
- for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
- jam();
- ptrAss(logPartPtr, logPartRecord);
- /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
- /* WE HAVE TO START CHECKING IF THE LOG IS TO BE WRITTEN EVEN IF PAGES ARE */
- /* FULL. INITIALISE THE VALUES OF WHERE WE ARE IN THE LOG CURRENTLY. */
- /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
- logPartPtr.p->logPartTimer = 0;
- logPartPtr.p->logTimer = 1;
- signal->theData[0] = ZTIME_SUPERVISION;
- signal->theData[1] = logPartPtr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- }//for
- }//Dblqh::startTimeSupervision()
- /*---------------------------------------------------------------------------*/
- /* WE SET THE GLOBAL CHECKPOINT VARIABLES AFTER WRITING THE COMPLETED GCI LOG*/
- /* RECORD. THIS ENSURES THAT WE WILL ENCOUNTER THE COMPLETED GCI RECORD WHEN */
- /* WE EXECUTE THE FRAGMENT LOG. */
- /*---------------------------------------------------------------------------*/
- void Dblqh::initGcpRecLab(Signal* signal)
- {
- /* ======================================================================== */
- /* ======= INITIATE GCP RECORD ======= */
- /* */
- /* SUBROUTINE SHORT NAME = IGR */
- /* ======================================================================== */
- for (logPartPtr.i = 0; logPartPtr.i <= 3; logPartPtr.i++) {
- jam();
- ptrAss(logPartPtr, logPartRecord);
- /*--------------------------------------------------*/
- /* BY SETTING THE GCPREC = 0 WE START THE */
- /* CHECKING BY CHECK_GCP_COMPLETED. THIS */
- /* CHECKING MUST NOT BE STARTED UNTIL WE HAVE */
- /* INSERTED ALL COMPLETE GCI LOG RECORDS IN */
- /* ALL LOG PARTS. */
- /*--------------------------------------------------*/
- logPartPtr.p->gcprec = 0;
- gcpPtr.p->gcpLogPartState[logPartPtr.i] = ZWAIT_DISK;
- gcpPtr.p->gcpSyncReady[logPartPtr.i] = ZFALSE;
- logFilePtr.i = logPartPtr.p->currentLogfile;
- ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
- gcpPtr.p->gcpFilePtr[logPartPtr.i] = logFilePtr.i;
- logPagePtr.i = logFilePtr.p->currentLogpage;
- ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
- if (logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] == ZPAGE_HEADER_SIZE) {
- jam();
- /*--------------------------------------------------*/
- /* SINCE THE CURRENT FILEPAGE POINTS AT THE */
- /* NEXT WORD TO BE WRITTEN WE HAVE TO ADJUST */
- /* FOR THIS BY DECREASING THE FILE PAGE BY ONE*/
- /* IF NO WORD HAS BEEN WRITTEN ON THE CURRENT */
- /* FILEPAGE. */
- /*--------------------------------------------------*/
- gcpPtr.p->gcpPageNo[logPartPtr.i] = logFilePtr.p->currentFilepage - 1;
- gcpPtr.p->gcpWordNo[logPartPtr.i] = ZPAGE_SIZE - 1;
- } else {
- jam();
- gcpPtr.p->gcpPageNo[logPartPtr.i] = logFilePtr.p->currentFilepage;
- gcpPtr.p->gcpWordNo[logPartPtr.i] =
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] - 1;
- }//if
- }//for
- return;
- }//Dblqh::initGcpRecLab()
- /* ========================================================================= */
- /* ==== CHECK IF ANY GLOBAL CHECKPOINTS ARE COMPLETED AFTER A COMPLETED===== */
- /* DISK WRITE. */
- /* */
- /* SUBROUTINE SHORT NAME = CGC */
- /* ========================================================================= */
- void Dblqh::checkGcpCompleted(Signal* signal,
- Uint32 tcgcPageWritten,
- Uint32 tcgcWordWritten)
- {
- UintR tcgcFlag;
- UintR tcgcJ;
- gcpPtr.i = logPartPtr.p->gcprec;
- if (gcpPtr.i != RNIL) {
- jam();
- /* ------------------------------------------------------------------------- */
- /* IF THE GLOBAL CHECKPOINT IS NOT WAITING FOR COMPLETION THEN WE CAN QUIT */
- /* THE SEARCH IMMEDIATELY. */
- /* ------------------------------------------------------------------------- */
- ptrCheckGuard(gcpPtr, cgcprecFileSize, gcpRecord);
- if (gcpPtr.p->gcpFilePtr[logPartPtr.i] == logFilePtr.i) {
- /* ------------------------------------------------------------------------- */
- /* IF THE COMPLETED DISK OPERATION WAS ON ANOTHER FILE THAN THE ONE WE ARE */
- /* WAITING FOR, THEN WE CAN ALSO QUIT THE SEARCH IMMEDIATELY. */
- /* ------------------------------------------------------------------------- */
- if (tcgcPageWritten < gcpPtr.p->gcpPageNo[logPartPtr.i]) {
- jam();
- /* ------------------------------------------------------------------------- */
- /* THIS LOG PART HAVE NOT YET WRITTEN THE GLOBAL CHECKPOINT TO DISK. */
- /* ------------------------------------------------------------------------- */
- return;
- } else {
- if (tcgcPageWritten == gcpPtr.p->gcpPageNo[logPartPtr.i]) {
- if (tcgcWordWritten < gcpPtr.p->gcpWordNo[logPartPtr.i]) {
- jam();
- /* ------------------------------------------------------------------------- */
- /* THIS LOG PART HAVE NOT YET WRITTEN THE GLOBAL CHECKPOINT TO DISK. */
- /* ------------------------------------------------------------------------- */
- return;
- }//if
- }//if
- }//if
- /* ------------------------------------------------------------------------- */
- /* THIS LOG PART HAVE WRITTEN THE GLOBAL CHECKPOINT TO DISK. */
- /* ------------------------------------------------------------------------- */
- logPartPtr.p->gcprec = RNIL;
- gcpPtr.p->gcpLogPartState[logPartPtr.i] = ZON_DISK;
- tcgcFlag = ZTRUE;
- for (tcgcJ = 0; tcgcJ <= 3; tcgcJ++) {
- jam();
- if (gcpPtr.p->gcpLogPartState[tcgcJ] != ZON_DISK) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*ALL LOG PARTS HAVE NOT SAVED THIS GLOBAL CHECKPOINT TO DISK YET. WAIT FOR */
- /*THEM TO COMPLETE. */
- /* ------------------------------------------------------------------------- */
- tcgcFlag = ZFALSE;
- }//if
- }//for
- if (tcgcFlag == ZTRUE) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*WE HAVE FOUND A COMPLETED GLOBAL CHECKPOINT OPERATION. WE NOW NEED TO SEND */
- /*GCP_SAVECONF, REMOVE THE GCP RECORD FROM THE LIST OF WAITING GCP RECORDS */
- /*ON THIS LOG PART AND RELEASE THE GCP RECORD. */
- // After changing the log implementation we need to perform a FSSYNCREQ on all
- // log files where the last log word resided first before proceeding.
- /* ------------------------------------------------------------------------- */
- UintR Ti;
- for (Ti = 0; Ti < 4; Ti++) {
- LogFileRecordPtr loopLogFilePtr;
- loopLogFilePtr.i = gcpPtr.p->gcpFilePtr[Ti];
- ptrCheckGuard(loopLogFilePtr, clogFileFileSize, logFileRecord);
- if (loopLogFilePtr.p->logFileStatus == LogFileRecord::OPEN) {
- jam();
- signal->theData[0] = loopLogFilePtr.p->fileRef;
- signal->theData[1] = cownref;
- signal->theData[2] = gcpPtr.p->gcpFilePtr[Ti];
- sendSignal(NDBFS_REF, GSN_FSSYNCREQ, signal, 3, JBA);
- } else {
- ndbrequire((loopLogFilePtr.p->logFileStatus ==
- LogFileRecord::CLOSED) ||
- (loopLogFilePtr.p->logFileStatus ==
- LogFileRecord::CLOSING_WRITE_LOG) ||
- (loopLogFilePtr.p->logFileStatus ==
- LogFileRecord::OPENING_WRITE_LOG));
- signal->theData[0] = loopLogFilePtr.i;
- execFSSYNCCONF(signal);
- }//if
- }//for
- return;
- }//if
- }//if
- }//if
- }//Dblqh::checkGcpCompleted()
- void
- Dblqh::execFSSYNCCONF(Signal* signal)
- {
- GcpRecordPtr localGcpPtr;
- LogFileRecordPtr localLogFilePtr;
- LogPartRecordPtr localLogPartPtr;
- localLogFilePtr.i = signal->theData[0];
- ptrCheckGuard(localLogFilePtr, clogFileFileSize, logFileRecord);
- localLogPartPtr.i = localLogFilePtr.p->logPartRec;
- localGcpPtr.i = ccurrentGcprec;
- ptrCheckGuard(localGcpPtr, cgcprecFileSize, gcpRecord);
- localGcpPtr.p->gcpSyncReady[localLogPartPtr.i] = ZTRUE;
- UintR Ti;
- for (Ti = 0; Ti < 4; Ti++) {
- jam();
- if (localGcpPtr.p->gcpSyncReady[Ti] == ZFALSE) {
- jam();
- return;
- }//if
- }//for
- GCPSaveConf * const saveConf = (GCPSaveConf *)&signal->theData[0];
- saveConf->dihPtr = localGcpPtr.p->gcpUserptr;
- saveConf->nodeId = getOwnNodeId();
- saveConf->gci = localGcpPtr.p->gcpId;
- sendSignal(localGcpPtr.p->gcpBlockref, GSN_GCP_SAVECONF, signal,
- GCPSaveConf::SignalLength, JBA);
- ccurrentGcprec = RNIL;
- }//Dblqh::execFSSYNCCONF()
- /* ######################################################################### */
- /* ####### FILE HANDLING MODULE ####### */
- /* */
- /* ######################################################################### */
- /* THIS MODULE HANDLES RESPONSE MESSAGES FROM THE FILE SYSTEM */
- /* ######################################################################### */
- /* ######################################################################### */
- /* SIGNAL RECEPTION MODULE */
- /* THIS MODULE IS A SUB-MODULE OF THE FILE SYSTEM HANDLING. */
- /* */
- /* THIS MODULE CHECKS THE STATE AND JUMPS TO THE PROPER PART OF THE FILE */
- /* HANDLING MODULE. */
- /* ######################################################################### */
- /* *************** */
- /* FSCLOSECONF > */
- /* *************** */
- void Dblqh::execFSCLOSECONF(Signal* signal)
- {
- jamEntry();
- logFilePtr.i = signal->theData[0];
- ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
- switch (logFilePtr.p->logFileStatus) {
- case LogFileRecord::CLOSE_SR_INVALIDATE_PAGES:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::CLOSED;
- // Set the prev file to check if we shall close it.
- logFilePtr.i = logFilePtr.p->prevLogFile;
- ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
- exitFromInvalidate(signal);
- return;
- case LogFileRecord::CLOSING_INIT:
- jam();
- closingInitLab(signal);
- return;
- case LogFileRecord::CLOSING_SR:
- jam();
- closingSrLab(signal);
- return;
- case LogFileRecord::CLOSING_EXEC_SR:
- jam();
- closeExecSrLab(signal);
- return;
- case LogFileRecord::CLOSING_EXEC_SR_COMPLETED:
- jam();
- closeExecSrCompletedLab(signal);
- return;
- case LogFileRecord::CLOSING_WRITE_LOG:
- jam();
- closeWriteLogLab(signal);
- return;
- case LogFileRecord::CLOSING_EXEC_LOG:
- jam();
- closeExecLogLab(signal);
- return;
- default:
- jam();
- systemErrorLab(signal);
- return;
- }//switch
- }//Dblqh::execFSCLOSECONF()
- /* ************>> */
- /* FSOPENCONF > */
- /* ************>> */
- void Dblqh::execFSOPENCONF(Signal* signal)
- {
- jamEntry();
- initFsopenconf(signal);
- switch (logFilePtr.p->logFileStatus) {
- case LogFileRecord::OPEN_SR_INVALIDATE_PAGES:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- readFileInInvalidate(signal);
- return;
- case LogFileRecord::OPENING_INIT:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openFileInitLab(signal);
- return;
- case LogFileRecord::OPEN_SR_FRONTPAGE:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openSrFrontpageLab(signal);
- return;
- case LogFileRecord::OPEN_SR_LAST_FILE:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openSrLastFileLab(signal);
- return;
- case LogFileRecord::OPEN_SR_NEXT_FILE:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openSrNextFileLab(signal);
- return;
- case LogFileRecord::OPEN_EXEC_SR_START:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openExecSrStartLab(signal);
- return;
- case LogFileRecord::OPEN_EXEC_SR_NEW_MBYTE:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openExecSrNewMbyteLab(signal);
- return;
- case LogFileRecord::OPEN_SR_FOURTH_PHASE:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openSrFourthPhaseLab(signal);
- return;
- case LogFileRecord::OPEN_SR_FOURTH_NEXT:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openSrFourthNextLab(signal);
- return;
- case LogFileRecord::OPEN_SR_FOURTH_ZERO:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openSrFourthZeroLab(signal);
- return;
- case LogFileRecord::OPENING_WRITE_LOG:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- return;
- case LogFileRecord::OPEN_EXEC_LOG:
- jam();
- logFilePtr.p->logFileStatus = LogFileRecord::OPEN;
- openExecLogLab(signal);
- return;
- default:
- jam();
- systemErrorLab(signal);
- return;
- }//switch
- }//Dblqh::execFSOPENCONF()
- /* ************>> */
- /* FSREADCONF > */
- /* ************>> */
- void Dblqh::execFSREADCONF(Signal* signal)
- {
- jamEntry();
- initFsrwconf(signal, false);
- switch (lfoPtr.p->lfoState) {
- case LogFileOperationRecord::READ_SR_LAST_MBYTE:
- jam();
- releaseLfo(signal);
- readSrLastMbyteLab(signal);
- return;
- case LogFileOperationRecord::READ_SR_FRONTPAGE:
- jam();
- releaseLfo(signal);
- readSrFrontpageLab(signal);
- return;
- case LogFileOperationRecord::READ_SR_LAST_FILE:
- jam();
- releaseLfo(signal);
- readSrLastFileLab(signal);
- return;
- case LogFileOperationRecord::READ_SR_NEXT_FILE:
- jam();
- releaseLfo(signal);
- readSrNextFileLab(signal);
- return;
- case LogFileOperationRecord::READ_EXEC_SR:
- jam();
- readExecSrLab(signal);
- return;
- case LogFileOperationRecord::READ_EXEC_LOG:
- jam();
- readExecLogLab(signal);
- return;
- case LogFileOperationRecord::READ_SR_INVALIDATE_PAGES:
- jam();
- invalidateLogAfterLastGCI(signal);
- return;
- case LogFileOperationRecord::READ_SR_FOURTH_PHASE:
- jam();
- releaseLfo(signal);
- readSrFourthPhaseLab(signal);
- return;
- case LogFileOperationRecord::READ_SR_FOURTH_ZERO:
- jam();
- releaseLfo(signal);
- readSrFourthZeroLab(signal);
- return;
- default:
- jam();
- systemErrorLab(signal);
- return;
- }//switch
- }//Dblqh::execFSREADCONF()
- /* ************>> */
- /* FSREADCONF > */
- /* ************>> */
- void Dblqh::execFSREADREF(Signal* signal)
- {
- jamEntry();
- lfoPtr.i = signal->theData[0];
- ptrCheckGuard(lfoPtr, clfoFileSize, logFileOperationRecord);
- switch (lfoPtr.p->lfoState) {
- case LogFileOperationRecord::READ_SR_LAST_MBYTE:
- jam();
- break;
- case LogFileOperationRecord::READ_SR_FRONTPAGE:
- jam();
- break;
- case LogFileOperationRecord::READ_SR_LAST_FILE:
- jam();
- break;
- case LogFileOperationRecord::READ_SR_NEXT_FILE:
- jam();
- break;
- case LogFileOperationRecord::READ_EXEC_SR:
- jam();
- break;
- case LogFileOperationRecord::READ_EXEC_LOG:
- jam();
- break;
- case LogFileOperationRecord::READ_SR_FOURTH_PHASE:
- jam();
- break;
- case LogFileOperationRecord::READ_SR_FOURTH_ZERO:
- jam();
- break;
- case LogFileOperationRecord::READ_SR_INVALIDATE_PAGES:
- jam()
- break;
- default:
- jam();
- break;
- }//switch
- {
- char msg[100];
- sprintf(msg, "File system read failed during LogFileOperationRecord state %d", (Uint32)lfoPtr.p->lfoState);
- fsRefError(signal,__LINE__,msg);
- }
- }//Dblqh::execFSREADREF()
- /* *************** */
- /* FSWRITECONF > */
- /* *************** */
- void Dblqh::execFSWRITECONF(Signal* signal)
- {
- jamEntry();
- initFsrwconf(signal, true);
- switch (lfoPtr.p->lfoState) {
- case LogFileOperationRecord::WRITE_SR_INVALIDATE_PAGES:
- jam();
- invalidateLogAfterLastGCI(signal);
- return;
- case LogFileOperationRecord::WRITE_PAGE_ZERO:
- jam();
- writePageZeroLab(signal);
- return;
- case LogFileOperationRecord::LAST_WRITE_IN_FILE:
- jam();
- lastWriteInFileLab(signal);
- return;
- case LogFileOperationRecord::INIT_WRITE_AT_END:
- jam();
- initWriteEndLab(signal);
- return;
- case LogFileOperationRecord::INIT_FIRST_PAGE:
- jam();
- initFirstPageLab(signal);
- return;
- case LogFileOperationRecord::WRITE_GCI_ZERO:
- jam();
- writeGciZeroLab(signal);
- return;
- case LogFileOperationRecord::WRITE_DIRTY:
- jam();
- writeDirtyLab(signal);
- return;
- case LogFileOperationRecord::WRITE_INIT_MBYTE:
- jam();
- writeInitMbyteLab(signal);
- return;
- case LogFileOperationRecord::ACTIVE_WRITE_LOG:
- jam();
- writeLogfileLab(signal);
- return;
- case LogFileOperationRecord::FIRST_PAGE_WRITE_IN_LOGFILE:
- jam();
- firstPageWriteLab(signal);
- return;
- default:
- jam();
- systemErrorLab(signal);
- return;
- }//switch
- }//Dblqh::execFSWRITECONF()
- /* ************>> */
- /* FSWRITEREF > */
- /* ************>> */
- void Dblqh::execFSWRITEREF(Signal* signal)
- {
- jamEntry();
- lfoPtr.i = signal->theData[0];
- ptrCheckGuard(lfoPtr, clfoFileSize, logFileOperationRecord);
- terrorCode = signal->theData[1];
- switch (lfoPtr.p->lfoState) {
- case LogFileOperationRecord::WRITE_PAGE_ZERO:
- jam();
- break;
- case LogFileOperationRecord::LAST_WRITE_IN_FILE:
- jam();
- break;
- case LogFileOperationRecord::INIT_WRITE_AT_END:
- jam();
- break;
- case LogFileOperationRecord::INIT_FIRST_PAGE:
- jam();
- break;
- case LogFileOperationRecord::WRITE_GCI_ZERO:
- jam();
- break;
- case LogFileOperationRecord::WRITE_DIRTY:
- jam();
- break;
- case LogFileOperationRecord::WRITE_INIT_MBYTE:
- jam();
- break;
- case LogFileOperationRecord::ACTIVE_WRITE_LOG:
- jam();
- break;
- case LogFileOperationRecord::FIRST_PAGE_WRITE_IN_LOGFILE:
- jam();
- break;
- case LogFileOperationRecord::WRITE_SR_INVALIDATE_PAGES:
- jam();
- systemErrorLab(signal);
- default:
- jam();
- break;
- }//switch
- {
- char msg[100];
- sprintf(msg, "File system write failed during LogFileOperationRecord state %d", (Uint32)lfoPtr.p->lfoState);
- fsRefError(signal,__LINE__,msg);
- }
- }//Dblqh::execFSWRITEREF()
- /* ========================================================================= */
- /* ======= INITIATE WHEN RECEIVING FSOPENCONF ======= */
- /* */
- /* ========================================================================= */
- void Dblqh::initFsopenconf(Signal* signal)
- {
- logFilePtr.i = signal->theData[0];
- ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
- logFilePtr.p->fileRef = signal->theData[1];
- logPartPtr.i = logFilePtr.p->logPartRec;
- ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
- logFilePtr.p->currentMbyte = 0;
- logFilePtr.p->filePosition = 0;
- logFilePtr.p->logFilePagesToDiskWithoutSynch = 0;
- }//Dblqh::initFsopenconf()
- /* ========================================================================= */
- /* ======= INITIATE WHEN RECEIVING FSREADCONF AND FSWRITECONF ======= */
- /* */
- /* ========================================================================= */
- void Dblqh::initFsrwconf(Signal* signal, bool write)
- {
- LogPageRecordPtr logP;
- Uint32 noPages, totPages;
- lfoPtr.i = signal->theData[0];
- ptrCheckGuard(lfoPtr, clfoFileSize, logFileOperationRecord);
- totPages= lfoPtr.p->noPagesRw;
- logFilePtr.i = lfoPtr.p->logFileRec;
- ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
- logPartPtr.i = logFilePtr.p->logPartRec;
- ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
- logPagePtr.i = lfoPtr.p->firstLfoPage;
- ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
- logP= logPagePtr;
- noPages= 1;
- ndbassert(totPages > 0);
- for (;;)
- {
- logP.p->logPageWord[ZPOS_IN_WRITING]= 0;
- logP.p->logPageWord[ZPOS_IN_FREE_LIST]= 0;
- if (noPages == totPages)
- return;
- if (write)
- logP.i= logP.p->logPageWord[ZNEXT_PAGE];
- else
- logP.i= lfoPtr.p->logPageArray[noPages];
- ptrCheckGuard(logP, clogPageFileSize, logPageRecord);
- noPages++;
- }
- }//Dblqh::initFsrwconf()
- /* ######################################################################### */
- /* NORMAL OPERATION MODULE */
- /* THIS MODULE IS A SUB-MODULE OF THE FILE SYSTEM HANDLING. */
- /* */
- /* THIS PART HANDLES THE NORMAL OPENING, CLOSING AND WRITING OF LOG FILES */
- /* DURING NORMAL OPERATION. */
- /* ######################################################################### */
- /*---------------------------------------------------------------------------*/
- /* THIS SIGNAL IS USED TO SUPERVISE THAT THE LOG RECORDS ARE NOT KEPT IN MAIN*/
- /* MEMORY FOR MORE THAN 1 SECOND TO ACHIEVE THE PROPER RELIABILITY. */
- /*---------------------------------------------------------------------------*/
- void Dblqh::timeSup(Signal* signal)
- {
- LogPageRecordPtr origLogPagePtr;
- Uint32 wordWritten;
- jamEntry();
- logPartPtr.i = signal->theData[0];
- ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
- logFilePtr.i = logPartPtr.p->currentLogfile;
- ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
- logPagePtr.i = logFilePtr.p->currentLogpage;
- ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
- if (logPartPtr.p->logPartTimer != logPartPtr.p->logTimer) {
- jam();
- /*--------------------------------------------------------------------------*/
- /* THIS LOG PART HAS NOT WRITTEN TO DISK DURING THE LAST SECOND. */
- /*--------------------------------------------------------------------------*/
- switch (logPartPtr.p->logPartState) {
- case LogPartRecord::FILE_CHANGE_PROBLEM:
- jam();
- /*--------------------------------------------------------------------------*/
- /* THIS LOG PART HAS PROBLEMS IN CHANGING FILES MAKING IT IMPOSSIBLE */
- // TO WRITE TO THE FILE CURRENTLY. WE WILL COMEBACK LATER AND SEE IF
- // THE PROBLEM HAS BEEN FIXED.
- /*--------------------------------------------------------------------------*/
- case LogPartRecord::ACTIVE:
- jam();
- /*---------------------------------------------------------------------------*/
- /* AN OPERATION IS CURRENTLY ACTIVE IN WRITING THIS LOG PART. WE THUS CANNOT */
- /* WRITE ANYTHING TO DISK AT THIS MOMENT. WE WILL SEND A SIGNAL DELAYED FOR */
- /* 10 MS AND THEN TRY AGAIN. POSSIBLY THE LOG PART WILL HAVE BEEN WRITTEN */
- /* UNTIL THEN OR ELSE IT SHOULD BE FREE TO WRITE AGAIN. */
- /*---------------------------------------------------------------------------*/
- signal->theData[0] = ZTIME_SUPERVISION;
- signal->theData[1] = logPartPtr.i;
- sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 2);
- return;
- break;
- case LogPartRecord::IDLE:
- case LogPartRecord::TAIL_PROBLEM:
- jam();
- /*---------------------------------------------------------------------------*/
- /* IDLE AND NOT WRITTEN TO DISK IN A SECOND. ALSO WHEN WE HAVE A TAIL PROBLEM*/
- /* WE HAVE TO WRITE TO DISK AT TIMES. WE WILL FIRST CHECK WHETHER ANYTHING */
- /* AT ALL HAVE BEEN WRITTEN TO THE PAGES BEFORE WRITING TO DISK. */
- /*---------------------------------------------------------------------------*/
- /* WE HAVE TO WRITE TO DISK IN ALL CASES SINCE THERE COULD BE INFORMATION */
- /* STILL IN THE LOG THAT WAS GENERATED BEFORE THE PREVIOUS TIME SUPERVISION */
- /* BUT AFTER THE LAST DISK WRITE. THIS PREVIOUSLY STOPPED ALL DISK WRITES */
- /* WHEN NO MORE LOG WRITES WERE PERFORMED (THIS HAPPENED WHEN LOG GOT FULL */
- /* AND AFTER LOADING THE INITIAL RECORDS IN INITIAL START). */
- /*---------------------------------------------------------------------------*/
- if (((logFilePtr.p->currentFilepage + 1) & (ZPAGES_IN_MBYTE -1)) == 0) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THIS IS THE LAST PAGE IN THIS MBYTE. WRITE NEXT LOG AND SWITCH TO NEXT */
- /* MBYTE. */
- /*---------------------------------------------------------------------------*/
- changeMbyte(signal);
- } else {
- /*---------------------------------------------------------------------------*/
- /* WRITE THE LOG PAGE TO DISK EVEN IF IT IS NOT FULL. KEEP PAGE AND WRITE A */
- /* COPY. THE ORIGINAL PAGE WILL BE WRITTEN AGAIN LATER ON. */
- /*---------------------------------------------------------------------------*/
- wordWritten = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] - 1;
- origLogPagePtr.i = logPagePtr.i;
- origLogPagePtr.p = logPagePtr.p;
- seizeLogpage(signal);
- MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[0],
- &origLogPagePtr.p->logPageWord[0],
- wordWritten + 1);
- ndbrequire(wordWritten < ZPAGE_SIZE);
- if (logFilePtr.p->noLogpagesInBuffer > 0) {
- jam();
- completedLogPage(signal, ZENFORCE_WRITE, __LINE__);
- /*---------------------------------------------------------------------------*/
- /*SINCE WE ARE ONLY WRITING PART OF THE LAST PAGE WE HAVE TO UPDATE THE WORD */
- /*WRITTEN TO REFLECT THE REAL LAST WORD WRITTEN. WE ALSO HAVE TO MOVE THE */
- /*FILE POSITION ONE STEP BACKWARDS SINCE WE ARE NOT WRITING THE LAST PAGE */
- /*COMPLETELY. IT WILL BE WRITTEN AGAIN. */
- /*---------------------------------------------------------------------------*/
- lfoPtr.p->lfoWordWritten = wordWritten;
- logFilePtr.p->filePosition = logFilePtr.p->filePosition - 1;
- } else {
- if (wordWritten == (ZPAGE_HEADER_SIZE - 1)) {
- /*---------------------------------------------------------------------------*/
- /*THIS IS POSSIBLE BUT VERY UNLIKELY. IF THE PAGE WAS COMPLETED AFTER THE LAST*/
- /*WRITE TO DISK THEN NO_LOG_PAGES_IN_BUFFER > 0 AND IF NOT WRITTEN SINCE LAST*/
- /*WRITE TO DISK THEN THE PREVIOUS PAGE MUST HAVE BEEN WRITTEN BY SOME */
- /*OPERATION AND THAT BECAME COMPLETELY FULL. IN ANY CASE WE NEED NOT WRITE AN*/
- /*EMPTY PAGE TO DISK. */
- /*---------------------------------------------------------------------------*/
- jam();
- releaseLogpage(signal);
- } else {
- jam();
- writeSinglePage(signal, logFilePtr.p->currentFilepage,
- wordWritten, __LINE__);
- lfoPtr.p->lfoState = LogFileOperationRecord::ACTIVE_WRITE_LOG;
- }//if
- }//if
- }//if
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- }//if
- logPartPtr.p->logTimer++;
- return;
- }//Dblqh::timeSup()
- void Dblqh::writeLogfileLab(Signal* signal)
- {
- /*---------------------------------------------------------------------------*/
- /* CHECK IF ANY GLOBAL CHECKPOINTS ARE COMPLETED DUE TO THIS COMPLETED DISK */
- /* WRITE. */
- /*---------------------------------------------------------------------------*/
- switch (logFilePtr.p->fileChangeState) {
- case LogFileRecord::NOT_ONGOING:
- jam();
- checkGcpCompleted(signal,
- ((lfoPtr.p->lfoPageNo + lfoPtr.p->noPagesRw) - 1),
- lfoPtr.p->lfoWordWritten);
- break;
- #if 0
- case LogFileRecord::BOTH_WRITES_ONGOING:
- jam();
- ndbout_c("not crashing!!");
- // Fall-through
- #endif
- case LogFileRecord::WRITE_PAGE_ZERO_ONGOING:
- case LogFileRecord::LAST_WRITE_ONGOING:
- jam();
- logFilePtr.p->lastPageWritten = (lfoPtr.p->lfoPageNo + lfoPtr.p->noPagesRw) - 1;
- logFilePtr.p->lastWordWritten = lfoPtr.p->lfoWordWritten;
- break;
- default:
- jam();
- systemErrorLab(signal);
- return;
- break;
- }//switch
- releaseLfoPages(signal);
- releaseLfo(signal);
- return;
- }//Dblqh::writeLogfileLab()
- void Dblqh::closeWriteLogLab(Signal* signal)
- {
- logFilePtr.p->logFileStatus = LogFileRecord::CLOSED;
- return;
- }//Dblqh::closeWriteLogLab()
- /* ######################################################################### */
- /* FILE CHANGE MODULE */
- /* THIS MODULE IS A SUB-MODULE OF THE FILE SYSTEM HANDLING. */
- /* */
- /*THIS PART OF THE FILE MODULE HANDLES WHEN WE ARE CHANGING LOG FILE DURING */
- /*NORMAL OPERATION. WE HAVE TO BE CAREFUL WHEN WE ARE CHANGING LOG FILE SO */
- /*THAT WE DO NOT COMPLICATE THE SYSTEM RESTART PROCESS TOO MUCH. */
- /*THE IDEA IS THAT WE START BY WRITING THE LAST WRITE IN THE OLD FILE AND WE */
- /*ALSO WRITE THE FIRST PAGE OF THE NEW FILE CONCURRENT WITH THAT. THIS FIRST */
- /*PAGE IN THE NEW FILE DO NOT CONTAIN ANY LOG RECORDS OTHER THAN A DESCRIPTOR*/
- /*CONTAINING INFORMATION ABOUT GCI'S NEEDED AT SYSTEM RESTART AND A NEXT LOG */
- /*RECORD. */
- /* */
- /*WHEN BOTH OF THOSE WRITES HAVE COMPLETED WE ALSO WRITE PAGE ZERO IN FILE */
- /*ZERO. THE ONLY INFORMATION WHICH IS INTERESTING HERE IS THE NEW FILE NUMBER*/
- /* */
- /*IF OPTIMISATIONS ARE NEEDED OF THE LOG HANDLING THEN IT IS POSSIBLE TO */
- /*AVOID WRITING THE FIRST PAGE OF THE NEW PAGE IMMEDIATELY. THIS COMPLICATES */
- /*THE SYSTEM RESTART AND ONE HAS TO TAKE SPECIAL CARE WITH FILE ZERO. IT IS */
- /*HOWEVER NO LARGE PROBLEM TO CHANGE INTO THIS SCENARIO. TO AVOID ALSO THE */
- /*WRITING OF PAGE ZERO IS ALSO POSSIBLE BUT COMPLICATES THE DESIGN EVEN */
- /*FURTHER. IT GETS FAIRLY COMPLEX TO FIND THE END OF THE LOG. SOME SORT OF */
- /*BINARY SEARCH IS HOWEVER MOST LIKELY A GOOD METHODOLOGY FOR THIS. */
- /* ######################################################################### */
- void Dblqh::firstPageWriteLab(Signal* signal)
- {
- releaseLfo(signal);
- /*---------------------------------------------------------------------------*/
- /* RELEASE PAGE ZERO IF THE FILE IS NOT FILE 0. */
- /*---------------------------------------------------------------------------*/
- Uint32 fileNo = logFilePtr.p->fileNo;
- if (fileNo != 0) {
- jam();
- releaseLogpage(signal);
- }//if
- /*---------------------------------------------------------------------------*/
- /* IF A NEW FILE HAS BEEN OPENED WE SHALL ALWAYS ALSO WRITE TO PAGE O IN */
- /* FILE 0. THE AIM IS TO MAKE RESTARTS EASIER BY SPECIFYING WHICH IS THE */
- /* LAST FILE WHERE LOGGING HAS STARTED. */
- /*---------------------------------------------------------------------------*/
- /* FIRST CHECK WHETHER THE LAST WRITE IN THE PREVIOUS FILE HAVE COMPLETED */
- /*---------------------------------------------------------------------------*/
- if (logFilePtr.p->fileChangeState == LogFileRecord::BOTH_WRITES_ONGOING) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THE LAST WRITE WAS STILL ONGOING. */
- /*---------------------------------------------------------------------------*/
- logFilePtr.p->fileChangeState = LogFileRecord::LAST_WRITE_ONGOING;
- return;
- } else {
- jam();
- ndbrequire(logFilePtr.p->fileChangeState == LogFileRecord::FIRST_WRITE_ONGOING);
- /*---------------------------------------------------------------------------*/
- /* WRITE TO PAGE 0 IN IN FILE 0 NOW. */
- /*---------------------------------------------------------------------------*/
- logFilePtr.p->fileChangeState = LogFileRecord::WRITE_PAGE_ZERO_ONGOING;
- if (fileNo == 0) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* IF THE NEW FILE WAS 0 THEN WE HAVE ALREADY WRITTEN PAGE ZERO IN FILE 0. */
- /*---------------------------------------------------------------------------*/
- logFilePtr.p->fileChangeState = LogFileRecord::NOT_ONGOING;
- return;
- } else {
- jam();
- /*---------------------------------------------------------------------------*/