DblqhMain.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:703k
- /* ABORT OF ACC AND TUP ALREADY COMPLETED. THIS STATE IS ONLY USED WHEN */
- /* CREATING A NEW FRAGMENT. */
- /* ------------------------------------------------------------------------- */
- continueAbortLab(signal);
- return;
- break;
- case TcConnectionrec::WAIT_TUP_TO_ABORT:
- case TcConnectionrec::ABORT_STOPPED:
- case TcConnectionrec::LOG_ABORT_QUEUED:
- case TcConnectionrec::WAIT_ACC_ABORT:
- case TcConnectionrec::ABORT_QUEUED:
- jam();
- /* ------------------------------------------------------------------------- */
- /*ABORT IS ALREADY ONGOING DUE TO SOME ERROR. WE HAVE ALREADY SET THE STATE */
- /*OF THE ABORT SO THAT WE KNOW THAT TC EXPECTS A REPORT. WE CAN THUS SIMPLY */
- /*EXIT. */
- /* ------------------------------------------------------------------------- */
- return;
- break;
- case TcConnectionrec::COMMIT_STOPPED:
- case TcConnectionrec::LOG_COMMIT_QUEUED:
- case TcConnectionrec::COMMIT_QUEUED:
- jam();
- /* ------------------------------------------------------------------------- */
- /*THIS IS ONLY AN ALLOWED STATE IF A DIRTY WRITE OR SIMPLE READ IS PERFORMED.*/
- /*IF WE ARE MERELY CHECKING THE TRANSACTION STATE IT IS ALSO AN ALLOWED STATE*/
- /* ------------------------------------------------------------------------- */
- if (regTcPtr->dirtyOp == ZTRUE) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*COMPLETE THE DIRTY WRITE AND THEN REPORT COMPLETED BACK TO TC. SINCE IT IS */
- /*A DIRTY WRITE IT IS ALLOWED TO COMMIT EVEN IF THE TRANSACTION ABORTS. */
- /* ------------------------------------------------------------------------- */
- return;
- }//if
- if (regTcPtr->simpleRead) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*A SIMPLE READ IS CURRENTLY RELEASING THE LOCKS OR WAITING FOR ACCESS TO */
- /*ACC TO CLEAR THE LOCKS. COMPLETE THIS PROCESS AND THEN RETURN AS NORMAL. */
- /*NO DATA HAS CHANGED DUE TO THIS SIMPLE READ ANYWAY. */
- /* ------------------------------------------------------------------------- */
- return;
- }//if
- ndbrequire(regTcPtr->abortState == TcConnectionrec::NEW_FROM_TC);
- jam();
- /* ------------------------------------------------------------------------- */
- /*WE ARE ONLY CHECKING THE STATUS OF THE TRANSACTION. IT IS COMMITTING. */
- /*COMPLETE THE COMMIT LOCALLY AND THEN SEND REPORT OF COMMITTED TO THE NEW TC*/
- /* ------------------------------------------------------------------------- */
- return;
- break;
- case TcConnectionrec::COMMITTED:
- jam();
- ndbrequire(regTcPtr->abortState == TcConnectionrec::NEW_FROM_TC);
- /* ------------------------------------------------------------------------- */
- /*WE ARE CHECKING TRANSACTION STATUS. REPORT COMMITTED AND CONTINUE WITH THE */
- /*NEXT OPERATION. */
- /* ------------------------------------------------------------------------- */
- sendLqhTransconf(signal, LqhTransConf::Committed);
- return;
- break;
- default:
- ndbrequire(false);
- /* ------------------------------------------------------------------------- */
- /*THE STATE WAS NOT AN ALLOWED STATE ON A NORMAL OPERATION. SCANS AND COPY */
- /*FRAGMENT OPERATIONS SHOULD HAVE EXECUTED IN ANOTHER PATH. */
- /* ------------------------------------------------------------------------- */
- break;
- }//switch
- abortCommonLab(signal);
- return;
- }//Dblqh::abortStateHandlerLab()
- void Dblqh::abortErrorLab(Signal* signal)
- {
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->abortState == TcConnectionrec::ABORT_IDLE) {
- jam();
- regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
- regTcPtr->errorCode = terrorCode;
- }//if
- /* -----------------------------------------------------------------------
- * ACTIVE CREATION IS RESET FOR ALL ERRORS WHICH SHOULD BE HANDLED
- * WITH NORMAL ABORT HANDLING.
- * ----------------------------------------------------------------------- */
- regTcPtr->activeCreat = ZFALSE;
- abortCommonLab(signal);
- return;
- }//Dblqh::abortErrorLab()
- void Dblqh::abortCommonLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
- if(regTcPtr->activeCreat != ZTRUE && commitAckMarker != RNIL){
- /**
- * There is no NR ongoing and we have a marker
- */
- jam();
- #ifdef MARKER_TRACE
- {
- CommitAckMarkerPtr tmp;
- m_commitAckMarkerHash.getPtr(tmp, commitAckMarker);
- ndbout_c("Abo marker[%.8x %.8x]", tmp.p->transid1, tmp.p->transid2);
- }
- #endif
- m_commitAckMarkerHash.release(commitAckMarker);
- regTcPtr->commitAckMarker = RNIL;
- }
-
- fragptr.i = regTcPtr->fragmentptr;
- if (fragptr.i != RNIL) {
- jam();
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- switch (fragptr.p->fragStatus) {
- case Fragrecord::FSACTIVE:
- case Fragrecord::CRASH_RECOVERING:
- case Fragrecord::ACTIVE_CREATION:
- jam();
- linkActiveFrag(signal);
- abortContinueAfterBlockedLab(signal, true);
- return;
- break;
- case Fragrecord::BLOCKED:
- jam();
- linkFragQueue(signal);
- regTcPtr->transactionState = TcConnectionrec::ABORT_STOPPED;
- return;
- break;
- case Fragrecord::FREE:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- ndbrequire(false);
- break;
- }//switch
- } else {
- jam();
- continueAbortLab(signal);
- }//if
- }//Dblqh::abortCommonLab()
- void Dblqh::abortContinueAfterBlockedLab(Signal* signal, bool canBlock)
- {
- /* ------------------------------------------------------------------------
- * INPUT: TC_CONNECTPTR ACTIVE OPERATION RECORD
- * ------------------------------------------------------------------------
- * ------------------------------------------------------------------------
- * CAN COME HERE AS RESTART AFTER BEING BLOCKED BY A LOCAL CHECKPOINT.
- * ------------------------------------------------------------------------
- * ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING.
- * WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN
- * AND SEES A STATE IN TUP.
- * ------------------------------------------------------------------------ */
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- fragptr.i = regTcPtr->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if ((cCommitBlocked == true) &&
- (fragptr.p->fragActiveStatus == ZTRUE) &&
- (canBlock == true) &&
- (regTcPtr->operation != ZREAD)) {
- jam();
- /* ------------------------------------------------------------------------- */
- // TUP and/or ACC have problems in writing the undo log to disk fast enough.
- // We must avoid the abort at this time and try later instead. The fragment
- // is also active with a local checkpoint and this commit can generate UNDO
- // log records that overflow the UNDO log buffer.
- //
- // In certain situations it is simply too complex to insert a wait state here
- // since ACC is active and we cannot release the operation from the active
- // list without causing great complexity.
- /* ------------------------------------------------------------------------- */
- /*---------------------------------------------------------------------------*/
- // We must delay the write of abort info to the log to safe-guard against
- // a crash due to lack of log pages. We temporary stop all log writes to this
- // log part to ensure that we don't get a buffer explosion in the delayed
- // signal buffer instead.
- /*---------------------------------------------------------------------------*/
- releaseActiveFrag(signal);
- logPartPtr.i = regTcPtr->hashValue & 3;
- ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
- linkWaitLog(signal, logPartPtr);
- regTcPtr->transactionState = TcConnectionrec::ABORT_QUEUED;
- if (logPartPtr.p->logPartState == LogPartRecord::IDLE) {
- jam();
- logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
- }//if
- return;
- }//if
- signal->theData[0] = regTcPtr->tupConnectrec;
- EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
- regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
- signal->theData[0] = regTcPtr->accConnectrec;
- EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 1);
- /* ------------------------------------------------------------------------
- * We need to insert a real-time break by sending ACC_ABORTCONF through the
- * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
- * TUPKEYREF that are in the job buffer but not yet processed. Doing
- * everything without that would race and create a state error when they
- * are executed.
- * ----------------------------------------------------------------------- */
- return;
- }//Dblqh::abortContinueAfterBlockedLab()
- /* ******************>> */
- /* ACC_ABORTCONF > */
- /* ******************>> */
- void Dblqh::execACC_ABORTCONF(Signal* signal)
- {
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
- if (regTcPtr->activeCreat == ZTRUE) {
- /* ----------------------------------------------------------------------
- * A NORMAL EVENT DURING CREATION OF A FRAGMENT. WE NOW NEED TO CONTINUE
- * WITH NORMAL COMMIT PROCESSING.
- * ---------------------------------------------------------------------- */
- if (regTcPtr->currTupAiLen == regTcPtr->totReclenAi) {
- jam();
- regTcPtr->abortState = TcConnectionrec::ABORT_IDLE;
- rwConcludedLab(signal);
- return;
- } else {
- ndbrequire(regTcPtr->currTupAiLen < regTcPtr->totReclenAi);
- jam();
- releaseActiveFrag(signal);
- regTcPtr->transactionState = TcConnectionrec::WAIT_AI_AFTER_ABORT;
- return;
- }//if
- }//if
- releaseActiveFrag(signal);
- continueAbortLab(signal);
- return;
- }//Dblqh::execACC_ABORTCONF()
- void Dblqh::continueAbortLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- /* ------------------------------------------------------------------------
- * AN ERROR OCCURED IN THE ACTIVE CREATION AFTER THE ABORT PHASE.
- * WE NEED TO CONTINUE WITH A NORMAL ABORT.
- * ------------------------------------------------------------------------
- * ALSO USED FOR NORMAL CLEAN UP AFTER A NORMAL ABORT.
- * ------------------------------------------------------------------------
- * ALSO USED WHEN NO FRAGMENT WAS SET UP ON OPERATION.
- * ------------------------------------------------------------------------ */
- if (regTcPtr->logWriteState == TcConnectionrec::WRITTEN) {
- jam();
- /* ----------------------------------------------------------------------
- * I NEED TO INSERT A ABORT LOG RECORD SINCE WE ARE WRITING LOG IN THIS
- * TRANSACTION.
- * ---------------------------------------------------------------------- */
- initLogPointers(signal);
- if (logPartPtr.p->logPartState == LogPartRecord::ACTIVE) {
- jam();
- /* --------------------------------------------------------------------
- * A PREPARE OPERATION IS CURRENTLY WRITING IN THE LOG.
- * WE MUST WAIT ON OUR TURN TO WRITE THE LOG.
- * IT IS NECESSARY TO WRITE ONE LOG RECORD COMPLETELY
- * AT A TIME OTHERWISE WE WILL SCRAMBLE THE LOG.
- * -------------------------------------------------------------------- */
- linkWaitLog(signal, logPartPtr);
- regTcPtr->transactionState = TcConnectionrec::LOG_ABORT_QUEUED;
- return;
- }//if
- if (cnoOfLogPages == 0) {
- jam();
- /*---------------------------------------------------------------------------*/
- // We must delay the write of commit info to the log to safe-guard against
- // a crash due to lack of log pages. We temporary stop all log writes to this
- // log part to ensure that we don't get a buffer explosion in the delayed
- // signal buffer instead.
- /*---------------------------------------------------------------------------*/
- linkWaitLog(signal, logPartPtr);
- regTcPtr->transactionState = TcConnectionrec::LOG_ABORT_QUEUED;
- if (logPartPtr.p->logPartState == LogPartRecord::IDLE) {
- jam();
- logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
- }//if
- return;
- }//if
- writeAbortLog(signal);
- removeLogTcrec(signal);
- } else if (regTcPtr->logWriteState == TcConnectionrec::NOT_STARTED) {
- jam();
- } else if (regTcPtr->logWriteState == TcConnectionrec::NOT_WRITTEN) {
- jam();
- /* ------------------------------------------------------------------
- * IT IS A READ OPERATION OR OTHER OPERATION THAT DO NOT USE THE LOG.
- * ------------------------------------------------------------------ */
- /* ------------------------------------------------------------------
- * THE LOG HAS NOT BEEN WRITTEN SINCE THE LOG FLAG WAS FALSE.
- * THIS CAN OCCUR WHEN WE ARE STARTING A NEW FRAGMENT.
- * ------------------------------------------------------------------ */
- regTcPtr->logWriteState = TcConnectionrec::NOT_STARTED;
- } else {
- ndbrequire(regTcPtr->logWriteState == TcConnectionrec::NOT_WRITTEN_WAIT);
- jam();
- /* ----------------------------------------------------------------
- * THE STATE WAS SET TO NOT_WRITTEN BY THE OPERATION BUT LATER
- * A SCAN OF ALL OPERATION RECORD CHANGED IT INTO NOT_WRITTEN_WAIT.
- * THIS INDICATES THAT WE ARE WAITING FOR THIS OPERATION TO COMMIT
- * OR ABORT SO THAT WE CAN FIND THE
- * STARTING GLOBAL CHECKPOINT OF THIS NEW FRAGMENT.
- * ---------------------------------------------------------------- */
- checkScanTcCompleted(signal);
- }//if
- continueAfterLogAbortWriteLab(signal);
- return;
- }//Dblqh::continueAbortLab()
- void Dblqh::continueAfterLogAbortWriteLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->simpleRead) {
- jam();
- TcKeyRef * const tcKeyRef = (TcKeyRef *) signal->getDataPtrSend();
-
- tcKeyRef->connectPtr = regTcPtr->applOprec;
- tcKeyRef->transId[0] = regTcPtr->transid[0];
- tcKeyRef->transId[1] = regTcPtr->transid[1];
- tcKeyRef->errorCode = regTcPtr->errorCode;
- sendSignal(regTcPtr->applRef,
- GSN_TCKEYREF, signal, TcKeyRef::SignalLength, JBB);
- cleanUp(signal);
- return;
- }//if
- if (regTcPtr->abortState == TcConnectionrec::ABORT_FROM_LQH) {
- LqhKeyRef * const lqhKeyRef = (LqhKeyRef *)signal->getDataPtrSend();
- jam();
- lqhKeyRef->userRef = regTcPtr->clientConnectrec;
- lqhKeyRef->connectPtr = regTcPtr->tcOprec;
- lqhKeyRef->errorCode = regTcPtr->errorCode;
- lqhKeyRef->transId1 = regTcPtr->transid[0];
- lqhKeyRef->transId2 = regTcPtr->transid[1];
- sendSignal(regTcPtr->clientBlockref, GSN_LQHKEYREF, signal,
- LqhKeyRef::SignalLength, JBB);
- } else if (regTcPtr->abortState == TcConnectionrec::ABORT_FROM_TC) {
- jam();
- sendAborted(signal);
- } else if (regTcPtr->abortState == TcConnectionrec::NEW_FROM_TC) {
- jam();
- sendLqhTransconf(signal, LqhTransConf::Aborted);
- } else {
- ndbrequire(regTcPtr->abortState == TcConnectionrec::REQ_FROM_TC);
- jam();
- signal->theData[0] = regTcPtr->reqRef;
- signal->theData[1] = tcConnectptr.i;
- signal->theData[2] = cownNodeid;
- signal->theData[3] = regTcPtr->transid[0];
- signal->theData[4] = regTcPtr->transid[1];
- sendSignal(regTcPtr->reqBlockref, GSN_ABORTCONF,
- signal, 5, JBB);
- }//if
- cleanUp(signal);
- }//Dblqh::continueAfterLogAbortWriteLab()
- /* ##########################################################################
- * ####### MODULE TO HANDLE TC FAILURE #######
- *
- * ########################################################################## */
- /* ************************************************************************>>
- * NODE_FAILREP: Node failure report. Sender Ndbcntr. Set status of failed
- * node to down and reply with NF_COMPLETEREP to DIH which will report that
- * LQH has completed failure handling.
- * ************************************************************************>> */
- void Dblqh::execNODE_FAILREP(Signal* signal)
- {
- UintR TfoundNodes = 0;
- UintR TnoOfNodes;
- UintR Tdata[MAX_NDB_NODES];
- Uint32 i;
- NodeFailRep * const nodeFail = (NodeFailRep *)&signal->theData[0];
- TnoOfNodes = nodeFail->noOfNodes;
- UintR index = 0;
- for (i = 1; i < MAX_NDB_NODES; i++) {
- jam();
- if(NodeBitmask::get(nodeFail->theNodes, i)){
- jam();
- Tdata[index] = i;
- index++;
- }//if
- }//for
- lcpPtr.i = 0;
- ptrAss(lcpPtr, lcpRecord);
-
- ndbrequire(index == TnoOfNodes);
- ndbrequire(cnoOfNodes - 1 < MAX_NDB_NODES);
- for (i = 0; i < TnoOfNodes; i++) {
- const Uint32 nodeId = Tdata[i];
- lcpPtr.p->m_EMPTY_LCP_REQ.clear(nodeId);
-
- for (Uint32 j = 0; j < cnoOfNodes; j++) {
- jam();
- if (cnodeData[j] == nodeId){
- jam();
- cnodeStatus[j] = ZNODE_DOWN;
-
- TfoundNodes++;
- }//if
- }//for
- NFCompleteRep * const nfCompRep = (NFCompleteRep *)&signal->theData[0];
- nfCompRep->blockNo = DBLQH;
- nfCompRep->nodeId = cownNodeid;
- nfCompRep->failedNodeId = Tdata[i];
- sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP, signal,
- NFCompleteRep::SignalLength, JBB);
- }//for
- ndbrequire(TnoOfNodes == TfoundNodes);
- }//Dblqh::execNODE_FAILREP()
- /* ************************************************************************>>
- * LQH_TRANSREQ: Report status of all transactions that were coordinated
- * by a crashed TC
- * ************************************************************************>> */
- /* ************************************************************************>>
- * THIS SIGNAL IS RECEIVED AFTER A NODE CRASH.
- * THE NODE HAD A TC AND COORDINATED A NUMBER OF TRANSACTIONS.
- * NOW THE MASTER NODE IS PICKING UP THOSE TRANSACTIONS
- * TO COMPLETE THEM. EITHER ABORT THEM OR COMMIT THEM.
- * ************************************************************************>> */
- void Dblqh::execLQH_TRANSREQ(Signal* signal)
- {
- jamEntry();
- Uint32 newTcPtr = signal->theData[0];
- BlockReference newTcBlockref = signal->theData[1];
- Uint32 oldNodeId = signal->theData[2];
- tcNodeFailptr.i = oldNodeId;
- ptrCheckGuard(tcNodeFailptr, ctcNodeFailrecFileSize, tcNodeFailRecord);
- if ((tcNodeFailptr.p->tcFailStatus == TcNodeFailRecord::TC_STATE_TRUE) ||
- (tcNodeFailptr.p->tcFailStatus == TcNodeFailRecord::TC_STATE_BREAK)) {
- jam();
- tcNodeFailptr.p->lastNewTcBlockref = newTcBlockref;
- /* ------------------------------------------------------------------------
- * WE HAVE RECEIVED A SIGNAL SPECIFYING THAT WE NEED TO HANDLE THE FAILURE
- * OF A TC. NOW WE RECEIVE ANOTHER SIGNAL WITH THE SAME ORDER. THIS CAN
- * OCCUR IF THE NEW TC FAILS. WE MUST BE CAREFUL IN THIS CASE SO THAT WE DO
- * NOT START PARALLEL ACTIVITIES TRYING TO DO THE SAME THING. WE SAVE THE
- * NEW BLOCK REFERENCE TO THE LAST NEW TC IN A VARIABLE AND ASSIGN TO IT TO
- * NEW_TC_BLOCKREF WHEN THE OLD PROCESS RETURNS TO LQH_TRANS_NEXT. IT IS
- * CERTAIN TO COME THERE SINCE THIS IS THE ONLY PATH TO TAKE CARE OF THE
- * NEXT TC CONNECT RECORD. WE SET THE STATUS TO BREAK TO INDICATE TO THE OLD
- * PROCESS WHAT IS HAPPENING.
- * ------------------------------------------------------------------------ */
- tcNodeFailptr.p->lastNewTcRef = newTcPtr;
- tcNodeFailptr.p->tcFailStatus = TcNodeFailRecord::TC_STATE_BREAK;
- return;
- }//if
- tcNodeFailptr.p->oldNodeId = oldNodeId;
- tcNodeFailptr.p->newTcBlockref = newTcBlockref;
- tcNodeFailptr.p->newTcRef = newTcPtr;
- tcNodeFailptr.p->tcRecNow = 0;
- tcNodeFailptr.p->tcFailStatus = TcNodeFailRecord::TC_STATE_TRUE;
- signal->theData[0] = ZLQH_TRANS_NEXT;
- signal->theData[1] = tcNodeFailptr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- return;
- }//Dblqh::execLQH_TRANSREQ()
- void Dblqh::lqhTransNextLab(Signal* signal)
- {
- UintR tend;
- UintR tstart;
- UintR guard0;
- if (tcNodeFailptr.p->tcFailStatus == TcNodeFailRecord::TC_STATE_BREAK) {
- jam();
- /* ----------------------------------------------------------------------
- * AN INTERRUPTION TO THIS NODE FAIL HANDLING WAS RECEIVED AND A NEW
- * TC HAVE BEEN ASSIGNED TO TAKE OVER THE FAILED TC. PROBABLY THE OLD
- * NEW TC HAVE FAILED.
- * ---------------------------------------------------------------------- */
- tcNodeFailptr.p->newTcBlockref = tcNodeFailptr.p->lastNewTcBlockref;
- tcNodeFailptr.p->newTcRef = tcNodeFailptr.p->lastNewTcRef;
- tcNodeFailptr.p->tcRecNow = 0;
- tcNodeFailptr.p->tcFailStatus = TcNodeFailRecord::TC_STATE_TRUE;
- }//if
- tstart = tcNodeFailptr.p->tcRecNow;
- tend = tstart + 200;
- guard0 = tend;
- for (tcConnectptr.i = tstart; tcConnectptr.i <= guard0; tcConnectptr.i++) {
- jam();
- if (tcConnectptr.i >= ctcConnectrecFileSize) {
- jam();
- /**
- * Finished with scanning operation record
- *
- * now scan markers
- */
- scanMarkers(signal, tcNodeFailptr.i, 0, RNIL);
- return;
- }//if
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- if (tcConnectptr.p->transactionState != TcConnectionrec::IDLE) {
- if (tcConnectptr.p->transactionState != TcConnectionrec::TC_NOT_CONNECTED) {
- if (tcConnectptr.p->tcScanRec == RNIL) {
- if (refToNode(tcConnectptr.p->tcBlockref) == tcNodeFailptr.p->oldNodeId) {
- if (tcConnectptr.p->operation != ZREAD) {
- jam();
- tcConnectptr.p->tcNodeFailrec = tcNodeFailptr.i;
- tcConnectptr.p->abortState = TcConnectionrec::NEW_FROM_TC;
- abortStateHandlerLab(signal);
- return;
- } else {
- jam();
- if (tcConnectptr.p->opSimple != ZTRUE) {
- jam();
- tcConnectptr.p->tcNodeFailrec = tcNodeFailptr.i;
- tcConnectptr.p->abortState = TcConnectionrec::NEW_FROM_TC;
- abortStateHandlerLab(signal);
- return;
- }//if
- }//if
- }//if
- } else {
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- if (scanptr.p->scanType == ScanRecord::COPY) {
- jam();
- if (scanptr.p->scanNodeId == tcNodeFailptr.p->oldNodeId) {
- jam();
- /* ------------------------------------------------------------
- * THE RECEIVER OF THE COPY HAVE FAILED.
- * WE HAVE TO CLOSE THE COPY PROCESS.
- * ------------------------------------------------------------ */
- tcConnectptr.p->tcNodeFailrec = tcNodeFailptr.i;
- tcConnectptr.p->abortState = TcConnectionrec::NEW_FROM_TC;
- closeCopyRequestLab(signal);
- return;
- }//if
- } else {
- if (scanptr.p->scanType == ScanRecord::SCAN) {
- jam();
- if (refToNode(tcConnectptr.p->tcBlockref) ==
- tcNodeFailptr.p->oldNodeId) {
- jam();
- tcConnectptr.p->tcNodeFailrec = tcNodeFailptr.i;
- tcConnectptr.p->abortState = TcConnectionrec::NEW_FROM_TC;
- closeScanRequestLab(signal);
- return;
- }//if
- } else {
- jam();
- /* ------------------------------------------------------------
- * THIS IS AN ERROR THAT SHOULD NOT OCCUR. WE CRASH THE SYSTEM.
- * ------------------------------------------------------------ */
- systemErrorLab(signal);
- return;
- }//if
- }//if
- }//if
- }//if
- }//if
- }//for
- tcNodeFailptr.p->tcRecNow = tend + 1;
- signal->theData[0] = ZLQH_TRANS_NEXT;
- signal->theData[1] = tcNodeFailptr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- return;
- }//Dblqh::lqhTransNextLab()
- void
- Dblqh::scanMarkers(Signal* signal,
- Uint32 tcNodeFail,
- Uint32 startBucket,
- Uint32 i){
- jam();
-
- TcNodeFailRecordPtr tcNodeFailPtr;
- tcNodeFailPtr.i = tcNodeFail;
- ptrCheckGuard(tcNodeFailPtr, ctcNodeFailrecFileSize, tcNodeFailRecord);
- const Uint32 crashedTcNodeId = tcNodeFailPtr.p->oldNodeId;
-
- CommitAckMarkerIterator iter;
- if(i == RNIL){
- m_commitAckMarkerHash.next(startBucket, iter);
- } else {
- jam();
- iter.curr.i = i;
- iter.bucket = startBucket;
- m_commitAckMarkerHash.getPtr(iter.curr);
- m_commitAckMarkerHash.next(iter);
- }
- const Uint32 RT_BREAK = 256;
- for(i = 0; i<RT_BREAK || iter.bucket == startBucket; i++){
- jam();
-
- if(iter.curr.i == RNIL){
- /**
- * Done with iteration
- */
- jam();
-
- tcNodeFailPtr.p->tcFailStatus = TcNodeFailRecord::TC_STATE_FALSE;
- signal->theData[0] = tcNodeFailPtr.p->newTcRef;
- signal->theData[1] = cownNodeid;
- signal->theData[2] = LqhTransConf::LastTransConf;
- sendSignal(tcNodeFailPtr.p->newTcBlockref, GSN_LQH_TRANSCONF,
- signal, 3, JBB);
- return;
- }
-
- if(iter.curr.p->tcNodeId == crashedTcNodeId){
- jam();
-
- /**
- * Found marker belonging to crashed node
- */
- LqhTransConf * const lqhTransConf = (LqhTransConf *)&signal->theData[0];
- lqhTransConf->tcRef = tcNodeFailPtr.p->newTcRef;
- lqhTransConf->lqhNodeId = cownNodeid;
- lqhTransConf->operationStatus = LqhTransConf::Marker;
- lqhTransConf->transId1 = iter.curr.p->transid1;
- lqhTransConf->transId2 = iter.curr.p->transid2;
- lqhTransConf->apiRef = iter.curr.p->apiRef;
- lqhTransConf->apiOpRec = iter.curr.p->apiOprec;
- sendSignal(tcNodeFailPtr.p->newTcBlockref, GSN_LQH_TRANSCONF,
- signal, 7, JBB);
-
- signal->theData[0] = ZSCAN_MARKERS;
- signal->theData[1] = tcNodeFailPtr.i;
- signal->theData[2] = iter.bucket;
- signal->theData[3] = iter.curr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 4, JBB);
- return;
- }
-
- m_commitAckMarkerHash.next(iter);
- }
-
- signal->theData[0] = ZSCAN_MARKERS;
- signal->theData[1] = tcNodeFailPtr.i;
- signal->theData[2] = iter.bucket;
- signal->theData[3] = RNIL;
- sendSignal(cownref, GSN_CONTINUEB, signal, 4, JBB);
- }
- /* #########################################################################
- * ####### SCAN MODULE #######
- *
- * #########################################################################
- * -------------------------------------------------------------------------
- * THIS MODULE CONTAINS THE CODE THAT HANDLES A SCAN OF A PARTICULAR FRAGMENT
- * IT OPERATES UNDER THE CONTROL OF TC AND ORDERS ACC TO PERFORM A SCAN OF
- * ALL TUPLES IN THE FRAGMENT. TUP PERFORMS THE NECESSARY SEARCH CONDITIONS
- * TO ENSURE THAT ONLY VALID TUPLES ARE RETURNED TO THE APPLICATION.
- * ------------------------------------------------------------------------- */
- void Dblqh::execACC_SCAN_INFO(Signal* signal)
- {
- jamEntry();
- scanptr.i = signal->theData[0];
- c_scanRecordPool.getPtr(scanptr);
- Uint32 length = signal->theData[3];
- ndbrequire(length <= 4);
- accScanInfoEnterLab(signal, &signal->theData[4], length);
- }//Dblqh::execACC_SCAN_INFO()
- void Dblqh::execACC_SCAN_INFO24(Signal* signal)
- {
- jamEntry();
- scanptr.i = signal->theData[0];
- c_scanRecordPool.getPtr(scanptr);
- Uint32 length = signal->theData[3];
- ndbrequire(length <= 20);
- accScanInfoEnterLab(signal, &signal->theData[4], length);
- }//Dblqh::execACC_SCAN_INFO24()
- void Dblqh::accScanInfoEnterLab(Signal* signal,
- Uint32* dataPtr,
- Uint32 length)
- {
- ndbrequire(length != 0);
- if (scanptr.p->scanState == ScanRecord::WAIT_SCAN_KEYINFO) {
- jam();
- if (keyinfoLab(signal, dataPtr, length)) {
- jam();
- nextScanConfLoopLab(signal);
- }//if
- } else {
- ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_COPY_KEYINFO);
- jam();
- if (keyinfoLab(signal, dataPtr, length)) {
- jam();
- copySendTupkeyReqLab(signal);
- }//if
- }//if
- }//Dblqh::accScanInfoEnterLab()
- /* *************** */
- /* ACC_SCANCONF > */
- /* *************** */
- void Dblqh::execACC_SCANCONF(Signal* signal)
- {
- AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0];
- jamEntry();
- scanptr.i = accScanConf->scanPtr;
- c_scanRecordPool.getPtr(scanptr);
- if (scanptr.p->scanState == ScanRecord::WAIT_ACC_SCAN) {
- accScanConfScanLab(signal);
- } else {
- ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_ACC_COPY);
- accScanConfCopyLab(signal);
- }//if
- }//Dblqh::execACC_SCANCONF()
- /* ************>> */
- /* ACC_SCANREF > */
- /* ************>> */
- void Dblqh::execACC_SCANREF(Signal* signal)
- {
- jamEntry();
- ndbrequire(false);
- }//Dblqh::execACC_SCANREF()
- /* ***************>> */
- /* NEXT_SCANCONF > */
- /* ***************>> */
- void Dblqh::execNEXT_SCANCONF(Signal* signal)
- {
- NextScanConf * const nextScanConf = (NextScanConf *)&signal->theData[0];
- jamEntry();
- scanptr.i = nextScanConf->scanPtr;
- c_scanRecordPool.getPtr(scanptr);
- if (nextScanConf->localKeyLength == 1) {
- jam();
- nextScanConf->localKey[1] =
- nextScanConf->localKey[0] & MAX_TUPLES_PER_PAGE;
- nextScanConf->localKey[0] = nextScanConf->localKey[0] >> MAX_TUPLES_BITS;
- }//if
- switch (scanptr.p->scanState) {
- case ScanRecord::WAIT_CLOSE_SCAN:
- jam();
- accScanCloseConfLab(signal);
- break;
- case ScanRecord::WAIT_CLOSE_COPY:
- jam();
- accCopyCloseConfLab(signal);
- break;
- case ScanRecord::WAIT_NEXT_SCAN:
- jam();
- nextScanConfScanLab(signal);
- break;
- case ScanRecord::WAIT_NEXT_SCAN_COPY:
- jam();
- nextScanConfCopyLab(signal);
- break;
- case ScanRecord::WAIT_RELEASE_LOCK:
- jam();
- ndbrequire(signal->length() == 1);
- scanLockReleasedLab(signal);
- break;
- default:
- ndbrequire(false);
- }//switch
- }//Dblqh::execNEXT_SCANCONF()
- /* ***************> */
- /* NEXT_SCANREF > */
- /* ***************> */
- void Dblqh::execNEXT_SCANREF(Signal* signal)
- {
- jamEntry();
- systemErrorLab(signal);
- return;
- }//Dblqh::execNEXT_SCANREF()
- /* ******************> */
- /* STORED_PROCCONF > */
- /* ******************> */
- void Dblqh::execSTORED_PROCCONF(Signal* signal)
- {
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- Uint32 storedProcId = signal->theData[1];
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- switch (scanptr.p->scanState) {
- case ScanRecord::WAIT_STORED_PROC_SCAN:
- jam();
- scanptr.p->scanStoredProcId = storedProcId;
- storedProcConfScanLab(signal);
- break;
- case ScanRecord::WAIT_DELETE_STORED_PROC_ID_SCAN:
- jam();
- releaseActiveFrag(signal);
- tupScanCloseConfLab(signal);
- break;
- case ScanRecord::WAIT_STORED_PROC_COPY:
- jam();
- scanptr.p->scanStoredProcId = storedProcId;
- storedProcConfCopyLab(signal);
- break;
- case ScanRecord::WAIT_DELETE_STORED_PROC_ID_COPY:
- jam();
- releaseActiveFrag(signal);
- tupCopyCloseConfLab(signal);
- break;
- default:
- ndbrequire(false);
- }//switch
- }//Dblqh::execSTORED_PROCCONF()
- /* ****************** */
- /* STORED_PROCREF > */
- /* ****************** */
- void Dblqh::execSTORED_PROCREF(Signal* signal)
- {
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- Uint32 errorCode = signal->theData[1];
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- switch (scanptr.p->scanState) {
- case ScanRecord::WAIT_STORED_PROC_SCAN:
- jam();
- scanptr.p->scanStoredProcId = signal->theData[2];
- tcConnectptr.p->errorCode = errorCode;
- closeScanLab(signal);
- break;
- default:
- ndbrequire(false);
- }//switch
- }//Dblqh::execSTORED_PROCREF()
- /* --------------------------------------------------------------------------
- * ENTER SCAN_NEXTREQ
- * --------------------------------------------------------------------------
- * Handles a SCAN_NEXTREQ signal. The operation is located with
- * findTransaction(transId1, transId2, senderData); the scan record is then
- * flagged with scanTcWaiting = ZTRUE and the handler either closes the scan
- * (closeFlag == ZTRUE), releases the locks of the previous batch, or
- * continues scanning directly.
- *
- * PRECONDITION:
- * TRANSACTION_STATE = SCAN_STATE
- * SCAN_STATE = WAIT_SCAN_NEXTREQ
- *
- * Case scanLockHold: ZTRUE = Unlock previous round of
- * scanned row(s) and fetch next set of rows.
- * ZFALSE = Fetch new set of rows.
- * Number of rows to read depends on parallelism and how many rows
- * left to scan in the fragment. SCAN_NEXTREQ can also be sent with
- * closeFlag == ZTRUE to close the scan.
- * ------------------------------------------------------------------------- */
- void Dblqh::execSCAN_NEXTREQ(Signal* signal)
- {
- jamEntry();
- const ScanFragNextReq * const nextReq =
- (ScanFragNextReq*)&signal->theData[0];
- const Uint32 transid1 = nextReq->transId1;
- const Uint32 transid2 = nextReq->transId2;
- const Uint32 senderData = nextReq->senderData;
- if (findTransaction(transid1, transid2, senderData) != ZOK){
- jam();
- DEBUG(senderData <<
- " Received SCAN_NEXTREQ in LQH with close flag when closed");
- ndbrequire(nextReq->closeFlag == ZTRUE); // an unknown transaction is only accepted for a close request
- return;
- }
- // Crash node if signal sender is same node
- CRASH_INSERTION2(5021, refToNode(signal->senderBlockRef()) == cownNodeid);
- // Crash node if signal sender is NOT same node
- CRASH_INSERTION2(5022, refToNode(signal->senderBlockRef()) != cownNodeid);
- if (ERROR_INSERTED(5023)){
- // Drop signal if sender is same node
- if (refToNode(signal->senderBlockRef()) == cownNodeid) {
- CLEAR_ERROR_INSERT_VALUE;
- return;
- }
- }//if
- if (ERROR_INSERTED(5024)){
- // Drop signal if sender is NOT same node
- if (refToNode(signal->senderBlockRef()) != cownNodeid) {
- CLEAR_ERROR_INSERT_VALUE;
- return;
- }
- }//if
- if (ERROR_INSERTED(5025)){
- // Delay signal if sender is NOT same node
- if (refToNode(signal->senderBlockRef()) != cownNodeid) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_SCAN_NEXTREQ, signal, 1000,
- signal->length()); // re-send the same signal to ourselves with a delay of 1000
- return;
- }
- }//if
- if (ERROR_INSERTED(5030)){
- ndbout << "ERROR 5030" << endl;
- CLEAR_ERROR_INSERT_VALUE;
- // Drop signal
- return;
- }//if
- if(ERROR_INSERTED(5036)){ // drops the signal and leaves the error insert set (no CLEAR here)
- return;
- }
- scanptr.i = tcConnectptr.p->tcScanRec;
- ndbrequire(scanptr.i != RNIL);
- c_scanRecordPool.getPtr(scanptr);
- scanptr.p->scanTcWaiting = ZTRUE;
- /* ------------------------------------------------------------------
- * If close flag is set this scan should be closed
- * If we are waiting for SCAN_NEXTREQ set flag to stop scanning and
- * continue execution else set flags and wait until the scan
- * completes itself
- * ------------------------------------------------------------------ */
- if (nextReq->closeFlag == ZTRUE){
- jam();
- if(ERROR_INSERTED(5034)){
- CLEAR_ERROR_INSERT_VALUE;
- }
- if(ERROR_INSERTED(5036)){ // NOTE(review): looks unreachable, the 5036 check above already returned - confirm
- CLEAR_ERROR_INSERT_VALUE;
- return;
- }
- closeScanRequestLab(signal);
- return;
- }//if
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- /**
- * Change parameters while running
- * (is currently not supported)
- * The two ndbrequire checks below therefore insist that the batch limits
- * in the request equal the ones stored in the scan record.
- */
- const Uint32 max_rows = nextReq->batch_size_rows;
- const Uint32 max_bytes = nextReq->batch_size_bytes;
- ndbrequire(scanptr.p->m_max_batch_size_rows == max_rows);
- ndbrequire(scanptr.p->m_max_batch_size_bytes == max_bytes);
- /* --------------------------------------------------------------------
- * If scanLockHold = TRUE we need to unlock previous round of
- * scanned records.
- * scanReleaseLocks will set states for this and send a NEXT_SCANREQ.
- * When confirm signal NEXT_SCANCONF arrives we call
- * continueScanNextReqLab to continue scanning new rows and
- * acquiring new locks.
- * -------------------------------------------------------------------- */
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
- jam();
- scanptr.p->scanReleaseCounter = 1; // release starts with the first row of the batch
- scanReleaseLocksLab(signal);
- return;
- }//if
- /* -----------------------------------------------------------------------
- * We end up here when scanLockHold = FALSE or no rows were locked in the
- * previous round.
- * Simply continue scanning.
- * ----------------------------------------------------------------------- */
- continueScanNextReqLab(signal);
- }//Dblqh::execSCAN_NEXTREQ()
- /* -------------------------------------------------------------------------
-  * Decide how to proceed with a scan once a SCAN_NEXTREQ can be served:
-  *   scanCompletedStatus == ZTRUE : close the scan (closeScanLab).
-  *   m_last_row is set            : mark the scan completed, return to
-  *                                  WAIT_SCAN_NEXTREQ and call
-  *                                  sendScanFragConf(signal, ZFALSE).
-  *   otherwise                    : refresh the TC timer, reset the ACC
-  *                                  pointer list, set scanFlag to ZSCAN_NEXT
-  *                                  and enter scanNextLoopLab.
-  * ------------------------------------------------------------------------- */
- void Dblqh::continueScanNextReqLab(Signal* signal)
- {
-   if (scanptr.p->scanCompletedStatus == ZTRUE) {
-     jam();
-     closeScanLab(signal);
-   } else if (scanptr.p->m_last_row) {
-     jam();
-     scanptr.p->scanCompletedStatus = ZTRUE;
-     scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
-     sendScanFragConf(signal, ZFALSE);
-   } else {
-     // Update timer on tcConnectRecord
-     tcConnectptr.p->tcTimer = cLqhTimeOutCount;
-     init_acc_ptr_list(scanptr.p);
-     scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
-     scanNextLoopLab(signal);
-   }//if
- }//Dblqh::continueScanNextReqLab()
- /* -------------------------------------------------------------------------
- * WE NEED TO RELEASE LOCKS BEFORE CONTINUING
- *
- * Dispatches on fragptr.p->fragStatus:
- * FSACTIVE : link the operation as active on the fragment and go on to
- * continueScanReleaseAfterBlockedLab.
- * BLOCKED : queue the operation on the fragment, set the transaction
- * state to SCAN_RELEASE_STOPPED and return.
- * Every other status falls through to ndbrequire(false).
- * ------------------------------------------------------------------------- */
- void Dblqh::scanReleaseLocksLab(Signal* signal)
- {
- switch (fragptr.p->fragStatus) {
- case Fragrecord::FSACTIVE:
- jam();
- linkActiveFrag(signal);
- break;
- case Fragrecord::BLOCKED:
- jam();
- linkFragQueue(signal);
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_RELEASE_STOPPED;
- return;
- break;
- case Fragrecord::FREE: // this and the following cases deliberately fall through to the default
- jam();
- case Fragrecord::ACTIVE_CREATION:
- jam();
- case Fragrecord::CRASH_RECOVERING:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- ndbrequire(false);
- }//switch
- continueScanReleaseAfterBlockedLab(signal);
- }//Dblqh::scanReleaseLocksLab()
- /* -------------------------------------------------------------------------
-  * Moves the scan to WAIT_RELEASE_LOCK and sends NEXT_SCANREQ with
-  * ZSCAN_COMMIT for the operation stored at position scanReleaseCounter - 1
-  * of the scan's ACC pointer list. Range scans are sent to the TUX block
-  * reference, all other scans to the ACC block reference.
-  * ------------------------------------------------------------------------- */
- void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal)
- {
-   scanptr.i = tcConnectptr.p->tcScanRec;
-   c_scanRecordPool.getPtr(scanptr);
-   scanptr.p->scanState = ScanRecord::WAIT_RELEASE_LOCK;
- 
-   signal->theData[0] = scanptr.p->scanAccPtr;
-   const Uint32 accOpPtr =
-     get_acc_ptr_from_scan_record(scanptr.p,
-                                  scanptr.p->scanReleaseCounter -1,
-                                  false);
-   signal->theData[1] = accOpPtr;
-   signal->theData[2] = NextScanReq::ZSCAN_COMMIT;
- 
-   if (scanptr.p->rangeScan) {
-     sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
-   } else {
-     sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
-   }
- }//Dblqh::continueScanReleaseAfterBlockedLab()
- /* -------------------------------------------------------------------------
- * ENTER SCAN_NEXTREQ
- * -------------------------------------------------------------------------
- * SCAN_NEXT_REQ SIGNAL ARRIVED IN THE MIDDLE OF EXECUTION OF THE SCAN.
- * IT WAS A REQUEST TO CLOSE THE SCAN. WE WILL CLOSE THE SCAN IN A
- * CAREFUL MANNER TO ENSURE THAT NO ERROR OCCURS.
- * -------------------------------------------------------------------------
- * PRECONDITION:
- * TRANSACTION_STATE = SCAN_STATE_USED
- * TSCAN_COMPLETED = ZTRUE
- * -------------------------------------------------------------------------
- * WE CAN ALSO ARRIVE AT THIS LABEL AFTER A NODE CRASH OF THE SCAN
- * COORDINATOR.
- * -------------------------------------------------------------------------
- * The outer switch is on the transaction state, the inner switch (only for
- * SCAN_STATE_USED) on the scan state. Most states merely set
- * scanCompletedStatus = ZTRUE; the close is then carried out by whichever
- * code path next inspects that flag. Unexpected states hit ndbrequire(false).
- * ------------------------------------------------------------------------- */
- void Dblqh::closeScanRequestLab(Signal* signal)
- {
- DEBUG("transactionState = " << tcConnectptr.p->transactionState);
- switch (tcConnectptr.p->transactionState) {
- case TcConnectionrec::SCAN_STATE_USED:
- DEBUG("scanState = " << scanptr.p->scanState);
- switch (scanptr.p->scanState) {
- case ScanRecord::IN_QUEUE:
- jam();
- tupScanCloseConfLab(signal);
- break;
- case ScanRecord::WAIT_SCAN_KEYINFO:
- case ScanRecord::WAIT_NEXT_SCAN:
- jam();
- /* -------------------------------------------------------------------
- * SET COMPLETION STATUS AND WAIT FOR OPPORTUNITY TO STOP THE SCAN.
- * ------------------------------------------------------------------- */
- scanptr.p->scanCompletedStatus = ZTRUE;
- break;
- case ScanRecord::WAIT_ACC_SCAN:
- case ScanRecord::WAIT_STORED_PROC_SCAN:
- jam();
- /* -------------------------------------------------------------------
- * WE ARE CURRENTLY STARTING UP THE SCAN. SET COMPLETED STATUS
- * AND WAIT FOR COMPLETION OF STARTUP.
- * ------------------------------------------------------------------- */
- scanptr.p->scanCompletedStatus = ZTRUE;
- break;
- case ScanRecord::WAIT_CLOSE_SCAN:
- case ScanRecord::WAIT_DELETE_STORED_PROC_ID_SCAN:
- jam();
- /*empty*/;
- break;
- /* -------------------------------------------------------------------
- * (THIS COMMENT DESCRIBES THE TWO CASES JUST ABOVE.)
- * CLOSE IS ALREADY ONGOING. WE NEED NOT DO ANYTHING.
- * ------------------------------------------------------------------- */
- case ScanRecord::WAIT_RELEASE_LOCK:
- jam();
- /* -------------------------------------------------------------------
- * WE ARE CURRENTLY RELEASING RECORD LOCKS. AFTER COMPLETING THIS
- * WE WILL START TO CLOSE THE SCAN.
- * ------------------------------------------------------------------- */
- scanptr.p->scanCompletedStatus = ZTRUE;
- break;
- case ScanRecord::WAIT_SCAN_NEXTREQ:
- jam();
- /* -------------------------------------------------------------------
- * WE ARE WAITING FOR A SCAN_NEXTREQ FROM THE SCAN COORDINATOR (TC)
- * WHICH HAS CRASHED. CLOSE THE SCAN.
- * IF LOCKS ARE HELD FOR ROWS OF THE CURRENT BATCH THEY ARE RELEASED
- * FIRST, STARTING WITH RELEASE COUNTER 1.
- * ------------------------------------------------------------------- */
- scanptr.p->scanCompletedStatus = ZTRUE;
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if (scanptr.p->scanLockHold == ZTRUE) {
- if (scanptr.p->m_curr_batch_size_rows > 0) {
- jam();
- scanptr.p->scanReleaseCounter = 1;
- scanReleaseLocksLab(signal);
- return;
- }//if
- }//if
- closeScanLab(signal);
- break;
- default:
- ndbrequire(false);
- }//switch
- break;
- case TcConnectionrec::WAIT_SCAN_AI:
- jam();
- /* ---------------------------------------------------------------------
- * WE ARE STILL WAITING FOR THE ATTRIBUTE INFORMATION THAT
- * OBVIOUSLY WILL NOT ARRIVE. WE CAN QUIT IMMEDIATELY HERE.
- * --------------------------------------------------------------------- */
- releaseOprec(signal);
- if (tcConnectptr.p->abortState == TcConnectionrec::NEW_FROM_TC) {
- jam();
- tcNodeFailptr.i = tcConnectptr.p->tcNodeFailrec;
- ptrCheckGuard(tcNodeFailptr, ctcNodeFailrecFileSize, tcNodeFailRecord);
- tcNodeFailptr.p->tcRecNow = tcConnectptr.i + 1; // continue with the record after this one
- signal->theData[0] = ZLQH_TRANS_NEXT;
- signal->theData[1] = tcNodeFailptr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- return;
- }//if
- tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE;
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes= 0;
- sendScanFragConf(signal, ZTRUE);
- abort_scan(signal, scanptr.i, 0); // error code 0: abort_scan sends no SCAN_FRAGREF
- return;
- break;
- case TcConnectionrec::SCAN_TUPKEY:
- case TcConnectionrec::SCAN_FIRST_STOPPED:
- case TcConnectionrec::SCAN_CHECK_STOPPED:
- case TcConnectionrec::SCAN_STOPPED:
- jam();
- /* ---------------------------------------------------------------------
- * SET COMPLETION STATUS AND WAIT FOR OPPORTUNITY TO STOP THE SCAN.
- * --------------------------------------------------------------------- */
- scanptr.p->scanCompletedStatus = ZTRUE;
- break;
- case TcConnectionrec::SCAN_RELEASE_STOPPED:
- jam();
- /* ---------------------------------------------------------------------
- * WE ARE CURRENTLY RELEASING RECORD LOCKS. AFTER COMPLETING
- * THIS WE WILL START TO CLOSE THE SCAN.
- * --------------------------------------------------------------------- */
- scanptr.p->scanCompletedStatus = ZTRUE;
- break;
- case TcConnectionrec::SCAN_CLOSE_STOPPED:
- jam();
- /* ---------------------------------------------------------------------
- * CLOSE IS ALREADY ONGOING. WE NEED NOT DO ANYTHING.
- * --------------------------------------------------------------------- */
- /*empty*/;
- break;
- default:
- ndbrequire(false);
- }//switch
- }//Dblqh::closeScanRequestLab()
- /* -------------------------------------------------------------------------
- * ENTER NEXT_SCANCONF
- * -------------------------------------------------------------------------
- * PRECONDITION: SCAN_STATE = WAIT_RELEASE_LOCK
- * -------------------------------------------------------------------------
- * Called when one lock release has been confirmed. Compares
- * scanReleaseCounter with m_curr_batch_size_rows:
- * equal : the release round is finished; close the scan, report the
- * batch, or continue scanning depending on the scan flags.
- * smaller : increment the counter and release the next lock.
- * larger : report the rows found so far (see comment in that branch).
- * ------------------------------------------------------------------------- */
- void Dblqh::scanLockReleasedLab(Signal* signal)
- {
- tcConnectptr.i = scanptr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- releaseActiveFrag(signal);
- if (scanptr.p->scanReleaseCounter == scanptr.p->m_curr_batch_size_rows) {
- if ((scanptr.p->scanErrorCounter > 0) ||
- (scanptr.p->scanCompletedStatus == ZTRUE)) {
- jam();
- scanptr.p->m_curr_batch_size_rows = 0; // batch counters are reset before closing
- scanptr.p->m_curr_batch_size_bytes = 0;
- closeScanLab(signal);
- } else if (scanptr.p->check_scan_batch_completed() &&
- scanptr.p->scanLockHold != ZTRUE) {
- jam();
- scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
- sendScanFragConf(signal, ZFALSE);
- } else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
- jam();
- closeScanLab(signal);
- return;
- } else {
- jam();
- /*
- * We came here after releasing locks after
- * receiving SCAN_NEXTREQ from TC. We only come here
- * when scanLockHold == ZTRUE
- */
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes = 0;
- continueScanNextReqLab(signal);
- }//if
- } else if (scanptr.p->scanReleaseCounter < scanptr.p->m_curr_batch_size_rows) {
- jam();
- scanptr.p->scanReleaseCounter++;
- scanReleaseLocksLab(signal);
- } else {
- jam();
- /*
- We come here when we have been scanning for a long time and not been able
- to find m_max_batch_size_rows records to return. We needed to release
- the record we didn't want, but now we are returning all found records to
- the API.
- */
- scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
- sendScanFragConf(signal, ZFALSE);
- }//if
- }//Dblqh::scanLockReleasedLab()
- /**
-  * Reserve the attrinbuf records a scan uses to hold its ACC operation
-  * pointers.
-  *
-  * (batch_size + 30) / 32 records are needed. They are only seized when
-  * batch_size > 1 and are stored in scan_acc_op_ptr[1..n].
-  *
-  * @param scanP       scan record receiving the buffers
-  * @param batch_size  number of operations the scan may have outstanding
-  * @return false (nothing seized) when fewer free attrinbuf records exist
-  *         than are needed, true otherwise
-  */
- bool
- Dblqh::seize_acc_ptr_list(ScanRecord* scanP, Uint32 batch_size)
- {
-   const Uint32 buffers_needed = (batch_size + 30) / 32;
-   if (batch_size > 1) {
-     if (c_no_attrinbuf_recs < buffers_needed) {
-       jam();
-       return false;
-     }
-     Uint32 slot = 1;
-     while (slot <= buffers_needed) {
-       scanP->scan_acc_op_ptr[slot] = seize_attrinbuf();
-       slot++;
-     }
-   }
-   scanP->scan_acc_index = 0;
-   scanP->scan_acc_attr_recs = buffers_needed;
-   return true;
- }
- /**
-  * Return the attrinbuf records referenced by scan_acc_op_ptr[1..n], where
-  * n is scan_acc_attr_recs, to the free list, then reset scan_acc_attr_recs
-  * and scan_acc_index to 0.
-  *
-  * @param scanP  scan record whose buffers are released
-  */
- void
- Dblqh::release_acc_ptr_list(ScanRecord* scanP)
- {
-   const Uint32 buffers_held = scanP->scan_acc_attr_recs;
-   Uint32 slot = 1;
-   while (slot <= buffers_held) {
-     release_attrinbuf(scanP->scan_acc_op_ptr[slot]);
-     slot++;
-   }
-   scanP->scan_acc_index = 0;
-   scanP->scan_acc_attr_recs = 0;
- }
- /**
-  * Take the first record off the free attrinbuf list.
-  *
-  * Requires (ndbrequire) that c_no_attrinbuf_recs > 0, decrements that
-  * counter and advances cfirstfreeAttrinbuf to the record's ZINBUF_NEXT link.
-  *
-  * @return index of the seized attrinbuf record
-  */
- Uint32
- Dblqh::seize_attrinbuf()
- {
-   ndbrequire(c_no_attrinbuf_recs > 0);
-   c_no_attrinbuf_recs--;
- 
-   AttrbufPtr seizedPtr;
-   seizedPtr.i = cfirstfreeAttrinbuf;
-   const Uint32 seized_i = seizedPtr.i;
-   ptrCheckGuard(seizedPtr, cattrinbufFileSize, attrbuf);
-   cfirstfreeAttrinbuf = seizedPtr.p->attrbuf[ZINBUF_NEXT];
-   return seized_i;
- }
- /**
-  * Put one attrinbuf record back at the head of the free list and
-  * increment c_no_attrinbuf_recs.
-  *
-  * @param attr_buf_i  index of the record to release
-  * @return the ZINBUF_NEXT link the record held before it was released
-  */
- Uint32
- Dblqh::release_attrinbuf(Uint32 attr_buf_i)
- {
-   c_no_attrinbuf_recs++;
- 
-   AttrbufPtr freedPtr;
-   freedPtr.i = attr_buf_i;
-   ptrCheckGuard(freedPtr, cattrinbufFileSize, attrbuf);
- 
-   // Remember the old successor before the link is overwritten.
-   const Uint32 old_next = freedPtr.p->attrbuf[ZINBUF_NEXT];
-   freedPtr.p->attrbuf[ZINBUF_NEXT] = cfirstfreeAttrinbuf;
-   cfirstfreeAttrinbuf = freedPtr.i;
-   return old_next;
- }
- /**
-  * Reset the scan's ACC operation pointer list by setting scan_acc_index
-  * back to 0. The seized buffers themselves are left untouched.
-  *
-  * @param scanP  scan record to reset
-  */
- void
- Dblqh::init_acc_ptr_list(ScanRecord* scanP)
- {
-   scanP->scan_acc_index = 0;
- }
- /**
-  * Fetch the ACC operation pointer stored at position index of the scan's
-  * ACC pointer list.
-  *
-  * @param scanP       scan record holding the list
-  * @param index       position to read; must be below both
-  *                    MAX_PARALLEL_OP_PER_SCAN and scanP->scan_acc_index
-  * @param crash_flag  checked with ndbrequire when index is out of range:
-  *                    false makes that check fail, true lets the function
-  *                    return RNIL instead. NOTE(review): the name reads as
-  *                    the opposite of this behaviour - confirm with callers.
-  * @return the stored ACC operation pointer, or RNIL for an out-of-range
-  *         index when crash_flag is true
-  */
- Uint32
- Dblqh::get_acc_ptr_from_scan_record(ScanRecord* scanP,
-                                     Uint32 index,
-                                     bool crash_flag)
- {
-   if (!((index < MAX_PARALLEL_OP_PER_SCAN) &&
-         index < scanP->scan_acc_index)) {
-     ndbrequire(crash_flag);
-     return RNIL;
-   }
-   Uint32* acc_ptr;
-   i_get_acc_ptr(scanP, acc_ptr, index);
-   return *acc_ptr;
- }
- /**
-  * Store an ACC operation pointer at position index of the scan's ACC
-  * pointer list and set scan_acc_index to index + 1.
-  *
-  * The ndbrequire accepts only index 0 or index == scan_acc_index, and only
-  * indexes below MAX_PARALLEL_OP_PER_SCAN.
-  *
-  * @param scanP  scan record holding the list
-  * @param index  position to write
-  * @param acc    ACC operation pointer to store
-  */
- void
- Dblqh::set_acc_ptr_in_scan_record(ScanRecord* scanP,
-                                   Uint32 index, Uint32 acc)
- {
-   ndbrequire((index == 0 || scanP->scan_acc_index == index) &&
-              (index < MAX_PARALLEL_OP_PER_SCAN));
-   scanP->scan_acc_index = index + 1;
- 
-   Uint32* slot_ptr;
-   i_get_acc_ptr(scanP, slot_ptr, index);
-   *slot_ptr = acc;
- }
- /* -------------------------------------------------------------------------
- * SCAN_FRAGREQ: Request to start scanning the specified fragment of a table.
- *
- * Outline of the handler:
- * 1. Reject with SCAN_FRAGREF if the table is not in TABLE_DEFINED state or
- * no free TC connect record exists (the "early" error labels, which
- * reply to signal->senderBlockRef()).
- * 2. Seize a TC connect record, then check fragment, scan type, free scan
- * records and booked ACC operations; failures go to error_handler,
- * which replies to the stored client block reference and releases the
- * TC connect record.
- * 3. Seize and initialise the scan record, insert the TC connect record
- * first in its ctransidHash bucket, then either wait for attribute
- * info (WAIT_SCAN_AI) or continue at once.
- * ------------------------------------------------------------------------- */
- void Dblqh::execSCAN_FRAGREQ(Signal* signal)
- {
- ScanFragReq * const scanFragReq = (ScanFragReq *)&signal->theData[0];
- ScanFragRef * ref;
- const Uint32 transid1 = scanFragReq->transId1;
- const Uint32 transid2 = scanFragReq->transId2;
- Uint32 errorCode= 0;
- Uint32 senderData;
- Uint32 hashIndex;
- TcConnectionrecPtr nextHashptr;
- jamEntry();
- const Uint32 reqinfo = scanFragReq->requestInfo;
- const Uint32 fragId = (scanFragReq->fragmentNoKeyLen & 0xFFFF); // low 16 bits: fragment id
- const Uint32 keyLen = (scanFragReq->fragmentNoKeyLen >> 16); // high 16 bits: key length
- tabptr.i = scanFragReq->tableId;
- const Uint32 max_rows = scanFragReq->batch_size_rows;
- const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
- const Uint8 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
- const Uint8 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
- 
- ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
- if(tabptr.p->tableStatus != Tablerec::TABLE_DEFINED){
- senderData = scanFragReq->senderData;
- goto error_handler_early_1;
- }
- 
- if (cfirstfreeTcConrec != RNIL) {
- seizeTcrec();
- tcConnectptr.p->clientConnectrec = scanFragReq->senderData;
- tcConnectptr.p->clientBlockref = signal->senderBlockRef();
- tcConnectptr.p->savePointId = scanFragReq->savePointId;
- } else {
- jam();
- /* --------------------------------------------------------------------
- * NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST.
- * -------------------------------------------------------------------- */
- errorCode = ZNO_TC_CONNECT_ERROR;
- senderData = scanFragReq->senderData;
- goto error_handler_early;
- }//if
- /**
- * A write always has to get keyinfo
- */
- ndbrequire(scanLockMode == 0 || keyinfo);
- ndbrequire(max_rows > 0 && max_rows <= MAX_PARALLEL_OP_PER_SCAN);
- if (!getFragmentrec(signal, fragId)) {
- errorCode = 1231;
- goto error_handler;
- }//if
- // Verify scan type vs table type (both sides are boolean)
- if (rangeScan != DictTabInfo::isOrderedIndex(fragptr.p->tableType)) {
- errorCode = 1232;
- goto error_handler;
- }//if
- 
- // 1 scan record is reserved for node recovery
- if (cscanNoFreeRec < 2) {
- jam();
- errorCode = ScanFragRef::ZNO_FREE_SCANREC_ERROR;
- goto error_handler;
- }
- // XXX adjust cmaxAccOps for range scans and remove this comment
- if ((cbookedAccOps + max_rows) > cmaxAccOps) {
- jam();
- errorCode = ScanFragRef::ZSCAN_BOOK_ACC_OP_ERROR;
- goto error_handler;
- }//if
- ndbrequire(c_scanRecordPool.seize(scanptr));
- initScanTc(signal,
- transid1,
- transid2,
- fragId,
- ZNIL);
- tcConnectptr.p->save1 = 4;
- tcConnectptr.p->primKeyLen = keyLen + 4; // hard coded in execKEYINFO
- errorCode = initScanrec(scanFragReq);
- if (errorCode != ZOK) {
- jam();
- goto error_handler2;
- }//if
- cscanNoFreeRec--; // resources are only booked once initScanrec has succeeded
- cbookedAccOps += max_rows;
- hashIndex = (tcConnectptr.p->transid[0] ^ tcConnectptr.p->tcOprec) & 1023; // bucket index in the range 0..1023
- nextHashptr.i = ctransidHash[hashIndex];
- ctransidHash[hashIndex] = tcConnectptr.i; // insert first in the bucket's list
- tcConnectptr.p->prevHashRec = RNIL;
- tcConnectptr.p->nextHashRec = nextHashptr.i;
- if (nextHashptr.i != RNIL) {
- jam();
- /* ---------------------------------------------------------------------
- * ENSURE THAT THE NEXT RECORD HAS SET PREVIOUS TO OUR RECORD
- * IF IT EXISTS
- * --------------------------------------------------------------------- */
- ptrCheckGuard(nextHashptr, ctcConnectrecFileSize, tcConnectionrec);
- nextHashptr.p->prevHashRec = tcConnectptr.i;
- }//if
- if (scanptr.p->scanAiLength > 0) {
- jam();
- tcConnectptr.p->transactionState = TcConnectionrec::WAIT_SCAN_AI; // attribute info is still to arrive
- return;
- }//if
- continueAfterReceivingAllAiLab(signal);
- return;
- error_handler2:
- // no scan number allocated
- c_scanRecordPool.release(scanptr);
- error_handler:
- ref = (ScanFragRef*)&signal->theData[0];
- tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE;
- ref->senderData = tcConnectptr.p->clientConnectrec;
- ref->transId1 = transid1;
- ref->transId2 = transid2;
- ref->errorCode = errorCode;
- sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGREF, signal,
- ScanFragRef::SignalLength, JBB);
- releaseOprec(signal);
- releaseTcrec(signal, tcConnectptr);
- return;
- error_handler_early_1:
- if(tabptr.p->tableStatus == Tablerec::NOT_DEFINED){
- jam();
- errorCode = ZTABLE_NOT_DEFINED;
- } else if (tabptr.p->tableStatus == Tablerec::PREP_DROP_TABLE_ONGOING ||
- tabptr.p->tableStatus == Tablerec::PREP_DROP_TABLE_DONE){
- jam();
- errorCode = ZDROP_TABLE_IN_PROGRESS;
- } else {
- ndbrequire(0);
- }
- error_handler_early: // no TC connect record was seized on this path, so nothing is released
- ref = (ScanFragRef*)&signal->theData[0];
- ref->senderData = senderData;
- ref->transId1 = transid1;
- ref->transId2 = transid2;
- ref->errorCode = errorCode;
- sendSignal(signal->senderBlockRef(), GSN_SCAN_FRAGREF, signal,
- ScanFragRef::SignalLength, JBB);
- }//Dblqh::execSCAN_FRAGREQ()
- /* -------------------------------------------------------------------------
-  * Sets the transaction state to SCAN_STATE_USED. A scan whose state is
-  * IN_QUEUE goes no further here; otherwise the scan moves to WAIT_ACC_SCAN
-  * and ACC_SCANREQ is sent - to the TUX block reference for range scans, to
-  * the ACC block reference for all other scans.
-  * ------------------------------------------------------------------------- */
- void Dblqh::continueAfterReceivingAllAiLab(Signal* signal)
- {
-   tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
-   if (scanptr.p->scanState == ScanRecord::IN_QUEUE) {
-     jam();
-     return;
-   }
- 
-   scanptr.p->scanState = ScanRecord::WAIT_ACC_SCAN;
- 
-   AccScanReq * accReq = (AccScanReq*)&signal->theData[0];
-   accReq->senderData = scanptr.i;
-   accReq->senderRef = cownref;
-   accReq->tableId = tcConnectptr.p->tableref;
-   accReq->fragmentNo = tcConnectptr.p->fragmentid;
-   accReq->transId1 = tcConnectptr.p->transid[0];
-   accReq->transId2 = tcConnectptr.p->transid[1];
-   accReq->savePointId = tcConnectptr.p->savePointId;
- 
-   // Build the request flags from scratch.
-   accReq->requestInfo = 0;
-   AccScanReq::setLockMode(accReq->requestInfo, scanptr.p->scanLockMode);
-   AccScanReq::setKeyinfoFlag(accReq->requestInfo, scanptr.p->scanKeyinfoFlag);
-   AccScanReq::setReadCommittedFlag(accReq->requestInfo, scanptr.p->readCommitted);
- 
-   // always use if-stmt to switch (instead of setting a "scan block ref")
-   if (scanptr.p->rangeScan) {
-     sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_ACC_SCANREQ, signal,
-                AccScanReq::SignalLength, JBB);
-   } else {
-     sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_SCANREQ, signal,
-                AccScanReq::SignalLength, JBB);
-   }
- }//Dblqh::continueAfterReceivingAllAiLab()
- /* -------------------------------------------------------------------------
-  * Stores a piece of attribute information for a scan via saveTupattrbuf.
-  * If storing fails the scan is aborted with ZGET_ATTRINBUF_ERROR. Once
-  * currTupAiLen has reached scanAiLength (it must then be exactly equal)
-  * processing continues in continueAfterReceivingAllAiLab.
-  * ------------------------------------------------------------------------- */
- void Dblqh::scanAttrinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length)
- {
-   scanptr.i = tcConnectptr.p->tcScanRec;
-   c_scanRecordPool.getPtr(scanptr);
- 
-   if (saveTupattrbuf(signal, dataPtr, length) != ZOK) {
-     abort_scan(signal, scanptr.i, ZGET_ATTRINBUF_ERROR);
-     return;
-   }//if
- 
-   if (tcConnectptr.p->currTupAiLen < scanptr.p->scanAiLength) {
-     // More attribute information is still expected.
-     jam();
-     return;
-   }//if
- 
-   jam();
-   ndbrequire(tcConnectptr.p->currTupAiLen == scanptr.p->scanAiLength);
-   continueAfterReceivingAllAiLab(signal);
- }
- /**
-  * Abort a scan: finish and release the scan record, set the transaction
-  * state to IDLE and the abort state to ABORT_ACTIVE, optionally send
-  * SCAN_FRAGREF to the client, then remove the operation from the
-  * transaction hash and release the operation and TC connect records.
-  *
-  * @param signal      signal object, reused for the outgoing SCAN_FRAGREF
-  * @param scan_ptr_i  index of the scan record to abort
-  * @param errcode     error code to report; 0 means no SCAN_FRAGREF is sent
-  */
- void Dblqh::abort_scan(Signal* signal, Uint32 scan_ptr_i, Uint32 errcode){
-   jam();
-   scanptr.i = scan_ptr_i;
-   c_scanRecordPool.getPtr(scanptr);
- 
-   fragptr.i = tcConnectptr.p->fragmentptr;
-   ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- 
-   finishScanrec(signal);
-   releaseScanrec(signal);
- 
-   tcConnectptr.p->transactionState = TcConnectionrec::IDLE;
-   tcConnectptr.p->abortState = TcConnectionrec::ABORT_ACTIVE;
- 
-   if (errcode)
-   {
-     jam();
-     ScanFragRef * fragRef = (ScanFragRef*)&signal->theData[0];
-     fragRef->senderData = tcConnectptr.p->clientConnectrec;
-     fragRef->transId1 = tcConnectptr.p->transid[0];
-     fragRef->transId2 = tcConnectptr.p->transid[1];
-     fragRef->errorCode = errcode;
-     sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGREF, signal,
-                ScanFragRef::SignalLength, JBB);
-   }
- 
-   deleteTransidHash(signal);
-   releaseOprec(signal);
-   releaseTcrec(signal, tcConnectptr);
- }
- /*---------------------------------------------------------------------*/
- /* Handles the 'I am alive' signal SCAN_HBREP for a scan.              */
- /* Incoming: theData[0] = scan record index, theData[1..2] = transid.  */
- /* For scans of type SCAN with TC waiting, the transid is verified,    */
- /* a running tcTimer is refreshed and SCAN_HBREP is forwarded to the   */
- /* client block with theData[0] = clientConnectrec and                 */
- /* theData[1..2] = transid. Scans of type COPY drop the signal.        */
- /*---------------------------------------------------------------------*/
- void Dblqh::execSCAN_HBREP(Signal* signal)
- {
-   jamEntry();
-   scanptr.i = signal->theData[0];
-   c_scanRecordPool.getPtr(scanptr);
-   switch(scanptr.p->scanType){
-   case ScanRecord::SCAN:
-     if (scanptr.p->scanTcWaiting == ZTRUE) {
-       jam();
-       tcConnectptr.i = scanptr.p->scanTcrec;
-       // One guarded lookup is enough; the original repeated this call.
-       ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-       const Uint32 transid1 = signal->theData[1];
-       const Uint32 transid2 = signal->theData[2];
-       ndbrequire(transid1 == tcConnectptr.p->transid[0] &&
-                  transid2 == tcConnectptr.p->transid[1]);
-       // Update counter on tcConnectPtr
-       if (tcConnectptr.p->tcTimer != 0){
-         tcConnectptr.p->tcTimer = cLqhTimeOutCount;
-       } else {
-         jam();
-         //ndbout << "SCAN_HBREP when tcTimer was off" << endl;
-       }
- 
-       signal->theData[0] = tcConnectptr.p->clientConnectrec;
-       signal->theData[1] = tcConnectptr.p->transid[0];
-       signal->theData[2] = tcConnectptr.p->transid[1];
-       sendSignal(tcConnectptr.p->clientBlockref,
-                  GSN_SCAN_HBREP, signal, 3, JBB);
-     }//if
-     break;
-   case ScanRecord::COPY:
-     // ndbout << "Dblqh::execSCAN_HBREP Dropping SCAN_HBREP" << endl;
-     break;
-   default:
-     ndbrequire(false);
-   }
- }
- void Dblqh::sendScanFragRefLateLab(Signal* signal) // currently a no-op: the body is empty and signal is unused
- {
- }//Dblqh::sendScanFragRefLateLab()
- /* -------------------------------------------------------------------------
-  * Continues scan start-up after ACC_SCANCONF (the signal data is read as
-  * AccScanConf).
-  *  - An empty fragment (ZEMPTY_FRAGMENT) goes straight to
-  *    tupScanCloseConfLab.
-  *  - For range scans the bound information is passed to DBTUX with
-  *    EXECUTE_DIRECT; a non-zero errorCode is recorded in the TC connect
-  *    record and processing continues.
-  *  - STORED_PROCREQ is sent to the TUP block reference, followed by one
-  *    ATTRINFO signal per attrinbuf in the operation's list, after which the
-  *    operation record is released.
-  * ------------------------------------------------------------------------- */
- void Dblqh::accScanConfScanLab(Signal* signal)
- {
-   AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0];
-   tcConnectptr.i = scanptr.p->scanTcrec;
-   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-   /* -----------------------------------------------------------------------
-    * PRECONDITION: SCAN_STATE = WAIT_ACC_SCAN
-    * ----------------------------------------------------------------------- */
-   if (accScanConf->flag == AccScanConf::ZEMPTY_FRAGMENT) {
-     jam();
-     /* ---------------------------------------------------------------------
-      * THE FRAGMENT WAS EMPTY.
-      * REPORT SUCCESSFUL COPYING.
-      * --------------------------------------------------------------------- */
-     tupScanCloseConfLab(signal);
-     return;
-   }//if
-   scanptr.p->scanAccPtr = accScanConf->accPtr;
-   // primKeyLen was stored as keyLen + 4 in execSCAN_FRAGREQ.
-   Uint32 boundAiLength = tcConnectptr.p->primKeyLen - 4;
-   if (scanptr.p->rangeScan) {
-     jam();
-     TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
-     req->errorCode = RNIL;
-     req->tuxScanPtrI = scanptr.p->scanAccPtr;
-     req->boundAiLength = boundAiLength;
-     if(boundAiLength > 0)
-       sendKeyinfoAcc(signal, TuxBoundInfo::SignalLength);
-     EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO,
-                    signal, TuxBoundInfo::SignalLength + boundAiLength);
-     jamEntry();
-     if (req->errorCode != 0) {
-       jam();
-       /*
-        * Cannot use STORED_PROCREF to abort since even the REF
-        * returns a stored proc id. So record error and continue.
-        * The scan is already Invalid in TUX and returns empty set.
-        */
-       tcConnectptr.p->errorCode = req->errorCode;
-     }
-   }
-   scanptr.p->scanState = ScanRecord::WAIT_STORED_PROC_SCAN;
-   signal->theData[0] = tcConnectptr.p->tupConnectrec;
-   signal->theData[1] = tcConnectptr.p->tableref;
-   signal->theData[2] = scanptr.p->scanSchemaVersion;
-   signal->theData[3] = ZSTORED_PROC_SCAN;
-   signal->theData[4] = scanptr.p->scanAiLength;
-   sendSignal(tcConnectptr.p->tcTupBlockref,
-              GSN_STORED_PROCREQ, signal, 5, JBB);
-   signal->theData[0] = tcConnectptr.p->tupConnectrec;
-   AttrbufPtr regAttrinbufptr;
-   regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf;
-   while (regAttrinbufptr.i != RNIL) {
-     ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
-     jam();
-     Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
-     ndbrequire(dataLen != 0);
-     // first 3 words already set in STORED_PROCREQ
-     MEMCOPY_NO_WORDS(&signal->theData[3],
-                      &regAttrinbufptr.p->attrbuf[0],
-                      dataLen);
-     sendSignal(tcConnectptr.p->tcTupBlockref,
-                GSN_ATTRINFO, signal, dataLen + 3, JBB);
-     regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
-   }//while
-   releaseOprec(signal);
- }//Dblqh::accScanConfScanLab()
- /* -------------------------------------------------------------------------
- * ENTER STORED_PROCCONF WITH
- * TC_CONNECTPTR,
- * TSTORED_PROC_ID
- * -------------------------------------------------------------------------
- * PRECONDITION: SCAN_STATE = WAIT_STORED_PROC_SCAN
- * -------------------------------------------------------------------------
- * Closes the scan if scanCompletedStatus is already ZTRUE. Otherwise
- * dispatches on the fragment status: FSACTIVE starts the first scan round,
- * BLOCKED queues the operation with state SCAN_FIRST_STOPPED, and every
- * other status hits ndbrequire(false).
- * ------------------------------------------------------------------------- */
- void Dblqh::storedProcConfScanLab(Signal* signal)
- {
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- jam();
- // STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
- closeScanLab(signal);
- return;
- }//if
- switch (fragptr.p->fragStatus) {
- case Fragrecord::FSACTIVE:
- jam();
- linkActiveFrag(signal);
- break;
- case Fragrecord::BLOCKED:
- jam();
- linkFragQueue(signal);
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_FIRST_STOPPED;
- return;
- break;
- case Fragrecord::FREE: // this and the following cases deliberately fall through to the default
- jam();
- case Fragrecord::ACTIVE_CREATION:
- jam();
- case Fragrecord::CRASH_RECOVERING:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- ndbrequire(false);
- break;
- }//switch
- continueFirstScanAfterBlockedLab(signal);
- }//Dblqh::storedProcConfScanLab()
- /* -------------------------------------------------------------------------
-  * Starts the first scan round: the scan moves to WAIT_NEXT_SCAN, its ACC
-  * pointer list is reset and NEXT_SCANREQ (scanAccPtr, RNIL, ZSCAN_NEXT) is
-  * sent - to the TUX block reference for range scans, to the ACC block
-  * reference for all other scans.
-  * ------------------------------------------------------------------------- */
- void Dblqh::continueFirstScanAfterBlockedLab(Signal* signal)
- {
-   scanptr.i = tcConnectptr.p->tcScanRec;
-   c_scanRecordPool.getPtr(scanptr);
-   scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN;
-   init_acc_ptr_list(scanptr.p);
- 
-   signal->theData[0] = scanptr.p->scanAccPtr;
-   signal->theData[1] = RNIL;
-   signal->theData[2] = NextScanReq::ZSCAN_NEXT;
- 
-   if (scanptr.p->rangeScan) {
-     sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
-   } else {
-     sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
-   }
- }//Dblqh::continueFirstScanAfterBlockedLab()
- /* -------------------------------------------------------------------------
- * When executing a scan we must come up to the surface at times to make
- * sure we can quickly start local checkpoints.
- *
- * Incoming: theData[0] = scan record index, theData[1] = flag.
- * If the flag is ZTRUE the operation is released from the active fragment
- * list and a delayed CONTINUEB (ZCHECK_LCP_STOP_BLOCKED) is sent to this
- * block. Otherwise, if the fragment is no longer FSACTIVE (it must then be
- * BLOCKED), the operation is queued with state SCAN_CHECK_STOPPED.
- * In both of those cases theData[0] is set to RNIL before returning;
- * NOTE(review): presumably the caller reads this as "scan stopped" - confirm.
- * ------------------------------------------------------------------------- */
- void Dblqh::execCHECK_LCP_STOP(Signal* signal)
- {
- jamEntry();
- scanptr.i = signal->theData[0];
- c_scanRecordPool.getPtr(scanptr);
- tcConnectptr.i = scanptr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if (signal->theData[1] == ZTRUE) {
- jam();
- releaseActiveFrag(signal);
- signal->theData[0] = ZCHECK_LCP_STOP_BLOCKED;
- signal->theData[1] = scanptr.i;
- sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 10, 2); // delay 10, signal length 2
- signal->theData[0] = RNIL;
- return;
- }//if
- if (fragptr.p->fragStatus != Fragrecord::FSACTIVE) {
- ndbrequire(fragptr.p->fragStatus == Fragrecord::BLOCKED);
- releaseActiveFrag(signal);
- linkFragQueue(signal);
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_CHECK_STOPPED;
- signal->theData[0] = RNIL;
- }//if
- }//Dblqh::execCHECK_LCP_STOP()
- void Dblqh::checkLcpStopBlockedLab(Signal* signal) // dispatches on fragment status: resume the scan, queue it, or fail
- {
- switch (fragptr.p->fragStatus) {
- case Fragrecord::FSACTIVE: // fragment is active: link the operation as active and resume
- jam();
- linkActiveFrag(signal);
- continueAfterCheckLcpStopBlocked(signal);
- break;
- case Fragrecord::BLOCKED: // fragment is blocked: queue the operation and record the stopped state
- jam();
- linkFragQueue(signal);
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_CHECK_STOPPED;
- return;
- break;
- case Fragrecord::FREE: // this and the following cases deliberately fall through to the default
- jam();
- case Fragrecord::ACTIVE_CREATION:
- jam();
- case Fragrecord::CRASH_RECOVERING:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- ndbrequire(false);
- }//switch
- }//Dblqh::checkLcpStopBlockedLab()
- /* -------------------------------------------------------------------------
-  * Resumes a scan after a check-LCP stop: executes ACC_CHECK_SCAN directly
-  * with (scanAccPtr, ZNOT_CHECK_LCP_STOP) in DBTUX for range scans and in
-  * DBACC for all other scans.
-  * ------------------------------------------------------------------------- */
- void Dblqh::continueAfterCheckLcpStopBlocked(Signal* signal)
- {
-   scanptr.i = tcConnectptr.p->tcScanRec;
-   c_scanRecordPool.getPtr(scanptr);
- 
-   signal->theData[0] = scanptr.p->scanAccPtr;
-   signal->theData[1] = AccCheckScan::ZNOT_CHECK_LCP_STOP;
- 
-   if (scanptr.p->rangeScan) {
-     EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, 2);
-   } else {
-     EXECUTE_DIRECT(DBACC, GSN_ACC_CHECK_SCAN, signal, 2);
-   }
- }//Dblqh::continueAfterCheckLcpStopBlocked()
- /* -------------------------------------------------------------------------
- * ENTER NEXT_SCANCONF
- * -------------------------------------------------------------------------
- * PRECONDITION: SCAN_STATE = WAIT_NEXT_SCAN
- * ------------------------------------------------------------------------- */
- void Dblqh::nextScanConfScanLab(Signal* signal)
- {
- NextScanConf * const nextScanConf = (NextScanConf *)&signal->theData[0];
- tcConnectptr.i = scanptr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- if (nextScanConf->fragId == RNIL) {
- jam();
- /* ---------------------------------------------------------------------
- * THERE ARE NO MORE TUPLES TO FETCH. IF WE HAVE ANY
- * OPERATIONS STILL NEEDING A LOCK WE REPORT TO THE
- * APPLICATION AND CLOSE THE SCAN WHEN THE NEXT SCAN
- * REQUEST IS RECEIVED. IF WE DO NOT HAVE ANY NEED FOR
- * LOCKS WE CAN CLOSE THE SCAN IMMEDIATELY.
- * --------------------------------------------------------------------- */
- releaseActiveFrag(signal);
- /*************************************************************
- * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
- ************************************************************ */
- if (!scanptr.p->scanLockHold)
- {
- jam();
- closeScanLab(signal);
- return;
- }
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
- jam();
- scanptr.p->scanReleaseCounter = 1;
- scanReleaseLocksLab(signal);
- return;
- }//if
- jam();
- closeScanLab(signal);
- return;
- }//if
- if (scanptr.p->m_curr_batch_size_rows > 0) {
- jam();
- scanptr.p->scanCompletedStatus = ZTRUE;
- scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
- sendScanFragConf(signal, ZFALSE);
- return;
- }//if
- closeScanLab(signal);
- return;
- }//if
- // If accOperationPtr == RNIL no record was returned by ACC
- if (nextScanConf->accOperationPtr == RNIL) {
- jam();
- /*************************************************************
- * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
- ************************************************************ */
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- releaseActiveFrag(signal);
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
- jam();
- scanptr.p->scanReleaseCounter = 1;
- scanReleaseLocksLab(signal);
- return;
- }//if
- jam();
- closeScanLab(signal);
- return;
- }//if
- if (scanptr.p->m_curr_batch_size_rows > 0) {
- jam();
- releaseActiveFrag(signal);
- scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
- sendScanFragConf(signal, ZFALSE);
- return;
- }//if
- signal->theData[0] = scanptr.p->scanAccPtr;
- signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
- if (! scanptr.p->rangeScan)
- sendSignal(tcConnectptr.p->tcAccBlockref,
- GSN_ACC_CHECK_SCAN, signal, 2, JBB);
- else
- sendSignal(tcConnectptr.p->tcTuxBlockref,
- GSN_ACC_CHECK_SCAN, signal, 2, JBB);
- return;
- }//if
- jam();
- set_acc_ptr_in_scan_record(scanptr.p,
- scanptr.p->m_curr_batch_size_rows,
- nextScanConf->accOperationPtr);
- jam();
- scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
- scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
- scanptr.p->scanLocalFragid = nextScanConf->fragId;
- if (scanptr.p->scanKeyinfoFlag) {
- jam();
- tcConnectptr.p->primKeyLen = nextScanConf->keyLength;
- seizeTupkeybuf(signal);
- databufptr.p->data[0] = nextScanConf->key[0];
- databufptr.p->data[1] = nextScanConf->key[1];
- databufptr.p->data[2] = nextScanConf->key[2];
- databufptr.p->data[3] = nextScanConf->key[3];
- if (nextScanConf->keyLength > 4) {
- jam();
- tcConnectptr.p->save1 = 4;
- scanptr.p->scanState = ScanRecord::WAIT_SCAN_KEYINFO;
- return;
- }//if
- }//if
- jam();
- nextScanConfLoopLab(signal);
- }//Dblqh::nextScanConfScanLab()
- void Dblqh::nextScanConfLoopLab(Signal* signal)
- {
- /* ----------------------------------------------------------------------
- * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
- * ---------------------------------------------------------------------- */
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- jam();
- releaseActiveFrag(signal);
- releaseOprec(signal);
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
- jam();
- scanptr.p->scanReleaseCounter = 1;
- scanReleaseLocksLab(signal);
- return;
- }//if
- closeScanLab(signal);
- return;
- }//if
- jam();
- Uint32 tableRef;
- Uint32 tupFragPtr;
- Uint32 reqinfo = (scanptr.p->scanLockHold == ZFALSE);
- reqinfo = reqinfo + (tcConnectptr.p->operation << 6);
- reqinfo = reqinfo + (tcConnectptr.p->opExec << 10);
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_TUPKEY;
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if (! scanptr.p->rangeScan) {
- tableRef = tcConnectptr.p->tableref;
- if (fragptr.p->fragId == scanptr.p->scanLocalFragid) {
- jam();
- tupFragPtr = fragptr.p->tupFragptr[0];
- } else {
- jam();
- tupFragPtr = fragptr.p->tupFragptr[1];
- }//if
- } else {
- jam();
- // for ordered index use primary table
- FragrecordPtr tFragPtr;
- tFragPtr.i = fragptr.p->tableFragptr;
- ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
- tableRef = tFragPtr.p->tabRef;
- if (tFragPtr.p->fragId == scanptr.p->scanLocalFragid) {
- jam();
- tupFragPtr = tFragPtr.p->tupFragptr[0];
- } else {
- jam();
- tupFragPtr = tFragPtr.p->tupFragptr[1];
- }//if
- }
- {
- jam();
- TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
- tupKeyReq->connectPtr = tcConnectptr.p->tupConnectrec;
- tupKeyReq->request = reqinfo;
- tupKeyReq->tableRef = tableRef;
- tupKeyReq->fragId = scanptr.p->scanLocalFragid;
- tupKeyReq->keyRef1 = scanptr.p->scanLocalref[0];
- tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1];
- tupKeyReq->attrBufLen = 0;
- tupKeyReq->opRef = scanptr.p->scanApiOpPtr;
- tupKeyReq->applRef = scanptr.p->scanApiBlockref;
- tupKeyReq->schemaVersion = scanptr.p->scanSchemaVersion;
- tupKeyReq->storedProcedure = scanptr.p->scanStoredProcId;
- tupKeyReq->transId1 = tcConnectptr.p->transid[0];
- tupKeyReq->transId2 = tcConnectptr.p->transid[1];
- tupKeyReq->fragPtr = tupFragPtr;
- tupKeyReq->primaryReplica = (tcConnectptr.p->seqNoReplica == 0)?true:false;
- tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
- tupKeyReq->tcOpIndex = tcConnectptr.p->tcOprec;
- tupKeyReq->savePointId = tcConnectptr.p->savePointId;
- Uint32 blockNo = refToBlock(tcConnectptr.p->tcTupBlockref);
- EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal,
- TupKeyReq::SignalLength);
- }
- }
- /* -------------------------------------------------------------------------
- * RECEPTION OF FURTHER KEY INFORMATION WHEN KEY SIZE > 16 BYTES.
- * -------------------------------------------------------------------------
- * PRECONDITION: SCAN_STATE = WAIT_SCAN_KEYINFO
- * ------------------------------------------------------------------------- */
- bool Dblqh::keyinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length)
- {
- tcConnectptr.i = scanptr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- Uint32 index = 0;
- do {
- jam();
- seizeTupkeybuf(signal);
- databufptr.p->data[0] = dataPtr[index];
- databufptr.p->data[1] = dataPtr[index + 1];
- databufptr.p->data[2] = dataPtr[index + 2];
- databufptr.p->data[3] = dataPtr[index + 3];
- index += 4;
- tcConnectptr.p->save1 = tcConnectptr.p->save1 + 4;
- if (tcConnectptr.p->save1 >= tcConnectptr.p->primKeyLen) {
- jam();
- return true;
- }//if
- if (index >= length) {
- jam();
- return false;
- }//if
- } while (index < 20);
- ndbrequire(false);
- return false;
- }//Dblqh::keyinfoLab()
- /* -------------------------------------------------------------------------
- * ENTER TUPKEYCONF
- * -------------------------------------------------------------------------
- * PRECONDITION: TRANSACTION_STATE = SCAN_TUPKEY
- * ------------------------------------------------------------------------- */
- void Dblqh::scanTupkeyConfLab(Signal* signal)
- {
- const TupKeyConf * conf = (TupKeyConf *)signal->getDataPtr();
- UintR tdata4 = conf->readLength;
- UintR tdata5 = conf->lastRow;
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
- scanptr.i = tcConnectptr.p->tcScanRec;
- releaseActiveFrag(signal);
- c_scanRecordPool.getPtr(scanptr);
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- /* ---------------------------------------------------------------------
- * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
- * --------------------------------------------------------------------- */
- releaseOprec(signal);
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
- jam();
- scanptr.p->scanReleaseCounter = 1;
- scanReleaseLocksLab(signal);
- return;
- }//if
- jam();
- closeScanLab(signal);
- return;
- }//if
- if (scanptr.p->scanKeyinfoFlag) {
- jam();
- sendKeyinfo20(signal, scanptr.p, tcConnectptr.p);
- releaseOprec(signal);
- tdata4 += tcConnectptr.p->primKeyLen;// Inform API about keyinfo len aswell
- }//if
- ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
- scanptr.p->m_curr_batch_size_bytes+= tdata4;
- scanptr.p->m_curr_batch_size_rows++;
- scanptr.p->m_last_row = tdata5;
- if (scanptr.p->check_scan_batch_completed() | tdata5){
- if (scanptr.p->scanLockHold == ZTRUE) {
- jam();
- scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
- sendScanFragConf(signal, ZFALSE);
- return;
- } else {
- jam();
- scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
- scanReleaseLocksLab(signal);
- return;
- }
- } else {
- if (scanptr.p->scanLockHold == ZTRUE) {
- jam();
- scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT;
- } else {
- jam();
- scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
- }
- }
- scanNextLoopLab(signal);
- }//Dblqh::scanTupkeyConfLab()
- void Dblqh::scanNextLoopLab(Signal* signal)
- {
- switch (fragptr.p->fragStatus) {
- case Fragrecord::FSACTIVE:
- jam();
- linkActiveFrag(signal);
- break;
- case Fragrecord::BLOCKED:
- jam();
- linkFragQueue(signal);
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STOPPED;
- return;
- break;
- case Fragrecord::FREE:
- jam();
- case Fragrecord::ACTIVE_CREATION:
- jam();
- case Fragrecord::CRASH_RECOVERING:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- ndbrequire(false);
- }//switch
- continueScanAfterBlockedLab(signal);
- }//Dblqh::scanNextLoopLab()
- void Dblqh::continueScanAfterBlockedLab(Signal* signal)
- {
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- Uint32 accOpPtr;
- if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_ABORT) {
- jam();
- scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_COMMIT;
- accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
- scanptr.p->m_curr_batch_size_rows,
- false);
- scanptr.p->scan_acc_index--;
- } else if (scanptr.p->scanFlag == NextScanReq::ZSCAN_NEXT_COMMIT) {
- jam();
- accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
- scanptr.p->m_curr_batch_size_rows-1,
- false);
- } else {
- jam();
- accOpPtr = RNIL; // The value is not used in ACC
- }//if
- scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN;
- signal->theData[0] = scanptr.p->scanAccPtr;
- signal->theData[1] = accOpPtr;
- signal->theData[2] = scanptr.p->scanFlag;
- if (! scanptr.p->rangeScan)
- sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3,JBB);
- else
- sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3,JBB);
- }//Dblqh::continueScanAfterBlockedLab()
- /* -------------------------------------------------------------------------
- * ENTER TUPKEYREF WITH
- * TC_CONNECTPTR,
- * TERROR_CODE
- * -------------------------------------------------------------------------
- * PRECONDITION: TRANSACTION_STATE = SCAN_TUPKEY
- * ------------------------------------------------------------------------- */
- void Dblqh::scanTupkeyRefLab(Signal* signal)
- {
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
- scanptr.i = tcConnectptr.p->tcScanRec;
- releaseActiveFrag(signal);
- releaseOprec(signal);
- c_scanRecordPool.getPtr(scanptr);
- if (scanptr.p->scanCompletedStatus == ZTRUE) {
- /* ---------------------------------------------------------------------
- * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
- * --------------------------------------------------------------------- */
- if ((scanptr.p->scanLockHold == ZTRUE) &&
- (scanptr.p->m_curr_batch_size_rows > 0)) {
- jam();
- scanptr.p->scanReleaseCounter = 1;
- scanReleaseLocksLab(signal);
- return;
- }//if
- jam();
- closeScanLab(signal);
- return;
- }//if
- if ((terrorCode != ZSEARCH_CONDITION_FALSE) &&
- (terrorCode != ZNO_TUPLE_FOUND) &&
- (terrorCode >= ZUSER_ERROR_CODE_LIMIT)) {
- scanptr.p->scanErrorCounter++;
- tcConnectptr.p->errorCode = terrorCode;
- if (scanptr.p->scanLockHold == ZTRUE) {
- jam();
- scanptr.p->scanReleaseCounter = 1;
- } else {
- jam();
- scanptr.p->m_curr_batch_size_rows++;
- scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows;
- }//if
- /* --------------------------------------------------------------------
- * WE NEED TO RELEASE ALL LOCKS CURRENTLY
- * HELD BY THIS SCAN.
- * -------------------------------------------------------------------- */
- scanReleaseLocksLab(signal);
- return;
- }//if
- Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount;
- if (scanptr.p->m_curr_batch_size_rows > 0) {
- if (time_passed > 1) {
- /* -----------------------------------------------------------------------
- * WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A
- * LONG TIME WHILE WE KEEP A LOCK ON A FOUND TUPLE. WE RATHER REPORT
- * THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we
- * send the found tuples to the API.
- * ----------------------------------------------------------------------- */
- scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1;
- scanReleaseLocksLab(signal);
- return;
- }
- } else {
- if (time_passed > 10) {
- jam();
- signal->theData[0]= scanptr.i;
- signal->theData[1]= tcConnectptr.p->transid[0];
- signal->theData[2]= tcConnectptr.p->transid[1];
- execSCAN_HBREP(signal);
- }
- }
- scanptr.p->scanFlag = NextScanReq::ZSCAN_NEXT_ABORT;
- scanNextLoopLab(signal);
- }//Dblqh::scanTupkeyRefLab()
- /* -------------------------------------------------------------------------
- * THE SCAN HAS BEEN COMPLETED. EITHER BY REACHING THE END OR BY COMMAND
- * FROM THE APPLICATION OR BY SOME SORT OF ERROR CONDITION.
- * ------------------------------------------------------------------------- */
- void Dblqh::closeScanLab(Signal* signal)
- {
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- switch (fragptr.p->fragStatus) {
- case Fragrecord::FSACTIVE:
- jam();
- linkActiveFrag(signal);
- break;
- case Fragrecord::BLOCKED:
- jam();
- linkFragQueue(signal);
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_CLOSE_STOPPED;
- return;
- break;
- case Fragrecord::FREE:
- jam();
- case Fragrecord::ACTIVE_CREATION:
- jam();
- case Fragrecord::CRASH_RECOVERING:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- ndbrequire(false);
- }//switch
- continueCloseScanAfterBlockedLab(signal);
- }//Dblqh::closeScanLab()
- void Dblqh::continueCloseScanAfterBlockedLab(Signal* signal)
- {
- tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
- scanptr.i = tcConnectptr.p->tcScanRec;
- c_scanRecordPool.getPtr(scanptr);
- scanptr.p->scanState = ScanRecord::WAIT_CLOSE_SCAN;
- signal->theData[0] = scanptr.p->scanAccPtr;
- signal->theData[1] = RNIL;
- signal->theData[2] = NextScanReq::ZSCAN_CLOSE;
- if (! scanptr.p->rangeScan)
- sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
- else
- sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
- }//Dblqh::continueCloseScanAfterBlockedLab()
- /* -------------------------------------------------------------------------
- * ENTER NEXT_SCANCONF
- * -------------------------------------------------------------------------
- * PRECONDITION: SCAN_STATE = WAIT_CLOSE_SCAN
- * ------------------------------------------------------------------------- */
- void Dblqh::accScanCloseConfLab(Signal* signal)
- {
- tcConnectptr.i = scanptr.p->scanTcrec;
- scanptr.p->scanState = ScanRecord::WAIT_DELETE_STORED_PROC_ID_SCAN;
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- signal->theData[0] = tcConnectptr.p->tupConnectrec;
- signal->theData[1] = tcConnectptr.p->tableref;
- signal->theData[2] = scanptr.p->scanSchemaVersion;
- signal->theData[3] = ZDELETE_STORED_PROC_ID;
- signal->theData[4] = scanptr.p->scanStoredProcId;
- sendSignal(tcConnectptr.p->tcTupBlockref,
- GSN_STORED_PROCREQ, signal, 5, JBB);
- }//Dblqh::accScanCloseConfLab()
- /* -------------------------------------------------------------------------
- * ENTER STORED_PROCCONF WITH
- * -------------------------------------------------------------------------
- * PRECONDITION: SCAN_STATE = WAIT_DELETE_STORED_PROC_ID_SCAN
- * ------------------------------------------------------------------------- */
- void Dblqh::tupScanCloseConfLab(Signal* signal)
- {
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if (tcConnectptr.p->abortState == TcConnectionrec::NEW_FROM_TC) {
- jam();
- tcNodeFailptr.i = tcConnectptr.p->tcNodeFailrec;
- ptrCheckGuard(tcNodeFailptr, ctcNodeFailrecFileSize, tcNodeFailRecord);
- tcNodeFailptr.p->tcRecNow = tcConnectptr.i + 1;
- signal->theData[0] = ZLQH_TRANS_NEXT;
- signal->theData[1] = tcNodeFailptr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- } else if (tcConnectptr.p->errorCode != 0) {
- jam();
- ScanFragRef * ref = (ScanFragRef*)&signal->theData[0];
- ref->senderData = tcConnectptr.p->clientConnectrec;
- ref->transId1 = tcConnectptr.p->transid[0];
- ref->transId2 = tcConnectptr.p->transid[1];
- ref->errorCode = tcConnectptr.p->errorCode;
- sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGREF, signal,
- ScanFragRef::SignalLength, JBB);
- } else {
- jam();
- sendScanFragConf(signal, ZSCAN_FRAG_CLOSED);
- }//if
- finishScanrec(signal);
- releaseScanrec(signal);
- tcConnectptr.p->tcScanRec = RNIL;
- deleteTransidHash(signal);
- releaseOprec(signal);
- releaseTcrec(signal, tcConnectptr);
- }//Dblqh::tupScanCloseConfLab()
- /* =========================================================================
- * ======= INITIATE SCAN RECORD =======
- *
- * SUBROUTINE SHORT NAME = ISC
- * ========================================================================= */
- Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
- {
- const Uint32 reqinfo = scanFragReq->requestInfo;
- const Uint32 max_rows = scanFragReq->batch_size_rows;
- const Uint32 max_bytes = scanFragReq->batch_size_bytes;
- const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
- const Uint32 scanLockHold = ScanFragReq::getHoldLockFlag(reqinfo);
- const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
- const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo);
- const Uint32 idx = ScanFragReq::getRangeScanFlag(reqinfo);
- const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo);
- const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo);
- scanptr.p->scanKeyinfoFlag = keyinfo;
- scanptr.p->scanLockHold = scanLockHold;
- scanptr.p->scanCompletedStatus = ZFALSE;
- scanptr.p->scanType = ScanRecord::SCAN;
- scanptr.p->scanApiBlockref = scanFragReq->resultRef;
- scanptr.p->scanAiLength = attrLen;
- scanptr.p->scanTcrec = tcConnectptr.i;
- scanptr.p->scanSchemaVersion = scanFragReq->schemaVersion;
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes= 0;
- scanptr.p->m_max_batch_size_rows = max_rows;
- scanptr.p->m_max_batch_size_bytes = max_bytes;
- scanptr.p->scanErrorCounter = 0;
- scanptr.p->scanLockMode = scanLockMode;
- scanptr.p->readCommitted = readCommitted;
- scanptr.p->rangeScan = idx;
- scanptr.p->scanState = ScanRecord::SCAN_FREE;
- scanptr.p->scanFlag = ZFALSE;
- scanptr.p->scanLocalref[0] = 0;
- scanptr.p->scanLocalref[1] = 0;
- scanptr.p->scanLocalFragid = 0;
- scanptr.p->scanTcWaiting = ZTRUE;
- scanptr.p->scanNumber = ~0;
- scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
- scanptr.p->m_last_row = 0;
- if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){
- jam();
- return ScanFragRef::ZWRONG_BATCH_SIZE;
- }
- if (!seize_acc_ptr_list(scanptr.p, max_rows)){
- jam();
- return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR;
- }
- /**
- * Used for scan take over
- */
- FragrecordPtr tFragPtr;
- tFragPtr.i = fragptr.p->tableFragptr;
- ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
- scanptr.p->fragPtrI = fragptr.p->tableFragptr;
-
- /**
- * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
- * idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42)
- */
- Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
- Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
- stop += start;
- Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
-
- if(free == Fragrecord::ScanNumberMask::NotFound || free >= stop){
- jam();
-
- if(scanPrio == 0){
- jam();
- return ScanFragRef::ZTOO_MANY_ACTIVE_SCAN_ERROR;
- }
-
- /**
- * Put on queue
- */
- scanptr.p->scanState = ScanRecord::IN_QUEUE;
- LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
- fragptr.p->m_queuedScans);
- queue.add(scanptr);
- return ZOK;
- }
-
- scanptr.p->scanNumber = free;
- tFragPtr.p->m_scanNumberMask.clear(free);// Update mask
-
- LocalDLList<ScanRecord> active(c_scanRecordPool, fragptr.p->m_activeScans);
- active.add(scanptr);
- if(scanptr.p->scanKeyinfoFlag){
- jam();
- #ifdef VM_TRACE
- ScanRecordPtr tmp;
- ndbrequire(!c_scanTakeOverHash.find(tmp, * scanptr.p));
- #endif
- #ifdef TRACE_SCAN_TAKEOVER
- ndbout_c("adding (%d %d) table: %d fragId: %d frag.i: %d tableFragptr: %d",
- scanptr.p->scanNumber, scanptr.p->fragPtrI,
- tabptr.i, scanFragReq->fragmentNo, fragptr.i, fragptr.p->tableFragptr);
- #endif
- c_scanTakeOverHash.add(scanptr);
- }
- init_acc_ptr_list(scanptr.p);
- return ZOK;
- }
- /* =========================================================================
- * ======= INITIATE TC RECORD AT SCAN =======
- *
- * SUBROUTINE SHORT NAME = IST
- * ========================================================================= */
- void Dblqh::initScanTc(Signal* signal,
- Uint32 transid1,
- Uint32 transid2,
- Uint32 fragId,
- Uint32 nodeId)
- {
- tcConnectptr.p->transid[0] = transid1;
- tcConnectptr.p->transid[1] = transid2;
- tcConnectptr.p->tcScanRec = scanptr.i;
- tcConnectptr.p->tableref = tabptr.i;
- tcConnectptr.p->fragmentid = fragId;
- tcConnectptr.p->fragmentptr = fragptr.i;
- tcConnectptr.p->tcOprec = tcConnectptr.p->clientConnectrec;
- tcConnectptr.p->tcBlockref = tcConnectptr.p->clientBlockref;
- tcConnectptr.p->errorCode = 0;
- tcConnectptr.p->reclenAiLqhkey = 0;
- tcConnectptr.p->abortState = TcConnectionrec::ABORT_IDLE;
- tcConnectptr.p->nextReplica = nodeId;
- tcConnectptr.p->currTupAiLen = 0;
- tcConnectptr.p->opExec = 1;
- tcConnectptr.p->operation = ZREAD;
- tcConnectptr.p->listState = TcConnectionrec::NOT_IN_LIST;
- tcConnectptr.p->commitAckMarker = RNIL;
- tabptr.p->usageCount++;
- }//Dblqh::initScanTc()
- /* =========================================================================
- * ======= FINISH SCAN RECORD =======
- *
- * REMOVE SCAN RECORD FROM PER FRAGMENT LIST.
- * ========================================================================= */
- void Dblqh::finishScanrec(Signal* signal)
- {
- release_acc_ptr_list(scanptr.p);
- LocalDLFifoList<ScanRecord> queue(c_scanRecordPool,
- fragptr.p->m_queuedScans);
-
- if(scanptr.p->scanState == ScanRecord::IN_QUEUE){
- jam();
- queue.release(scanptr);
- return;
- }
- if(scanptr.p->scanKeyinfoFlag){
- jam();
- ScanRecordPtr tmp;
- #ifdef TRACE_SCAN_TAKEOVER
- ndbout_c("removing (%d %d)", scanptr.p->scanNumber, scanptr.p->fragPtrI);
- #endif
- c_scanTakeOverHash.remove(tmp, * scanptr.p);
- ndbrequire(tmp.p == scanptr.p);
- }
-
- LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans);
- scans.release(scanptr);
-
- FragrecordPtr tFragPtr;
- tFragPtr.i = scanptr.p->fragPtrI;
- ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
- const Uint32 scanNumber = scanptr.p->scanNumber;
- ndbrequire(!tFragPtr.p->m_scanNumberMask.get(scanNumber));
- ScanRecordPtr restart;
- /**
- * Start on of queued scans
- */
- if(scanNumber == NR_ScanNo || !queue.first(restart)){
- jam();
- tFragPtr.p->m_scanNumberMask.set(scanNumber);
- return;
- }
- if(ERROR_INSERTED(5034)){
- jam();
- tFragPtr.p->m_scanNumberMask.set(scanNumber);
- return;
- }
- ndbrequire(restart.p->scanState == ScanRecord::IN_QUEUE);
- ScanRecordPtr tmpScan = scanptr;
- TcConnectionrecPtr tmpTc = tcConnectptr;
-
- tcConnectptr.i = restart.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- restart.p->scanNumber = scanNumber;
- queue.remove(restart);
- scans.add(restart);
- if(restart.p->scanKeyinfoFlag){
- jam();
- #ifdef VM_TRACE
- ScanRecordPtr tmp;
- ndbrequire(!c_scanTakeOverHash.find(tmp, * restart.p));
- #endif
- c_scanTakeOverHash.add(restart);
- #ifdef TRACE_SCAN_TAKEOVER
- ndbout_c("adding-r (%d %d)", restart.p->scanNumber, restart.p->fragPtrI);
- #endif
- }
- restart.p->scanState = ScanRecord::SCAN_FREE; // set in initScanRec
- if(tcConnectptr.p->transactionState == TcConnectionrec::SCAN_STATE_USED)
- {
- jam();
- scanptr = restart;
- continueAfterReceivingAllAiLab(signal);
- }
- else
- {
- ndbrequire(tcConnectptr.p->transactionState == TcConnectionrec::WAIT_SCAN_AI);
- }
- scanptr = tmpScan;
- tcConnectptr = tmpTc;
- }//Dblqh::finishScanrec()
- /* =========================================================================
- * ======= RELEASE SCAN RECORD =======
- *
- * RELEASE A SCAN RECORD TO THE FREELIST.
- * ========================================================================= */
- void Dblqh::releaseScanrec(Signal* signal)
- {
- scanptr.p->scanState = ScanRecord::SCAN_FREE;
- scanptr.p->scanType = ScanRecord::ST_IDLE;
- scanptr.p->scanTcWaiting = ZFALSE;
- cbookedAccOps -= scanptr.p->m_max_batch_size_rows;
- cscanNoFreeRec++;
- }//Dblqh::releaseScanrec()
- /* ------------------------------------------------------------------------
- * ------- SEND KEYINFO20 TO API -------
- *
- * ------------------------------------------------------------------------ */
- void Dblqh::sendKeyinfo20(Signal* signal,
- ScanRecord * scanP,
- TcConnectionrec * tcConP)
- {
- ndbrequire(scanP->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
- KeyInfo20 * keyInfo = (KeyInfo20 *)&signal->theData[0];
-
- DatabufPtr TdataBuf;
- TdataBuf.i = tcConP->firstTupkeybuf;
- Uint32 keyLen = tcConP->primKeyLen;
- const Uint32 dataBufSz = cdatabufFileSize;
-
- /**
- * Note that this code requires signal->theData to be big enough for
- * a entire key
- */
- ndbrequire(keyLen * 4 <= sizeof(signal->theData));
- const BlockReference ref = scanP->scanApiBlockref;
- const Uint32 scanOp = scanP->m_curr_batch_size_rows;
- const Uint32 nodeId = refToNode(ref);
- const bool connectedToNode = getNodeInfo(nodeId).m_connected;
- const Uint32 type = getNodeInfo(nodeId).m_type;
- const bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
- const bool old_dest = (getNodeInfo(nodeId).m_version < MAKE_VERSION(3,5,0));
- const bool longable = true; // TODO is_api && !old_dest;
- Uint32 * dst = keyInfo->keyData;
- dst += nodeId == getOwnNodeId() ? 0 : KeyInfo20::DataLength;
- /**
- * Copy keydata from data buffer into signal
- *
- */
- for(Uint32 i = 0; i < keyLen; i += 4){
- ptrCheckGuard(TdataBuf, dataBufSz, databuf);
- * dst++ = TdataBuf.p->data[0];
- * dst++ = TdataBuf.p->data[1];
- * dst++ = TdataBuf.p->data[2];
- * dst++ = TdataBuf.p->data[3];
- TdataBuf.i = TdataBuf.p->nextDatabuf;
- }
-
- keyInfo->clientOpPtr = scanP->scanApiOpPtr;
- keyInfo->keyLen = keyLen;
- keyInfo->scanInfo_Node = KeyInfo20::setScanInfo(scanOp,
- scanP->scanNumber)+
- (getOwnNodeId() << 20);
- keyInfo->transId1 = tcConP->transid[0];
- keyInfo->transId2 = tcConP->transid[1];
-
- Uint32 * src = signal->theData+25;
- if(connectedToNode){
- jam();
-
- if(nodeId != getOwnNodeId()){
- jam();
-
- if(keyLen <= KeyInfo20::DataLength || !longable) {
- while(keyLen > KeyInfo20::DataLength){
- jam();
- MEMCOPY_NO_WORDS(keyInfo->keyData, src, KeyInfo20::DataLength);
- sendSignal(ref, GSN_KEYINFO20, signal, 25, JBB);
- src += KeyInfo20::DataLength;;
- keyLen -= KeyInfo20::DataLength;
- }
-
- MEMCOPY_NO_WORDS(keyInfo->keyData, src, keyLen);
- sendSignal(ref, GSN_KEYINFO20, signal,
- KeyInfo20::HeaderLength+keyLen, JBB);
- return;
- }
-
- LinearSectionPtr ptr[3];
- ptr[0].p = src;
- ptr[0].sz = keyLen;
- sendSignal(ref, GSN_KEYINFO20, signal, KeyInfo20::HeaderLength,
- JBB, ptr, 1);
- return;
- }
-
- EXECUTE_DIRECT(refToBlock(ref), GSN_KEYINFO20, signal,
- KeyInfo20::HeaderLength + keyLen);
- jamEntry();
- return;
- }
-
- /**
- * If this node does not have a direct connection
- * to the receiving node we want to send the signals
- * routed via the node that controls this read
- */
- Uint32 routeBlockref = tcConP->clientBlockref;
-
- if(keyLen < KeyInfo20::DataLength || !longable){
- jam();
-
- while (keyLen > (KeyInfo20::DataLength - 1)) {
- jam();
- MEMCOPY_NO_WORDS(keyInfo->keyData, src, KeyInfo20::DataLength - 1);
- keyInfo->keyData[KeyInfo20::DataLength-1] = ref;
- sendSignal(routeBlockref, GSN_KEYINFO20_R, signal, 25, JBB);
- src += KeyInfo20::DataLength - 1;
- keyLen -= KeyInfo20::DataLength - 1;
- }
- MEMCOPY_NO_WORDS(keyInfo->keyData, src, keyLen);
- keyInfo->keyData[keyLen] = ref;
- sendSignal(routeBlockref, GSN_KEYINFO20_R, signal,
- KeyInfo20::HeaderLength+keyLen+1, JBB);
- return;
- }
- keyInfo->keyData[0] = ref;
- LinearSectionPtr ptr[3];
- ptr[0].p = src;
- ptr[0].sz = keyLen;
- sendSignal(routeBlockref, GSN_KEYINFO20_R, signal,
- KeyInfo20::HeaderLength+1, JBB, ptr, 1);
- return;
- }
-
- /* ------------------------------------------------------------------------
- * ------- SEND SCAN_FRAGCONF TO TC THAT CONTROLS THE SCAN -------
- *
- * ------------------------------------------------------------------------ */
- void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
- {
- Uint32 completed_ops= scanptr.p->m_curr_batch_size_rows;
- Uint32 total_len= scanptr.p->m_curr_batch_size_bytes;
- scanptr.p->scanTcWaiting = ZFALSE;
- if(ERROR_INSERTED(5037)){
- CLEAR_ERROR_INSERT_VALUE;
- return;
- }
- ScanFragConf * conf = (ScanFragConf*)&signal->theData[0];
- NodeId tc_node_id= refToNode(tcConnectptr.p->clientBlockref);
- Uint32 trans_id1= tcConnectptr.p->transid[0];
- Uint32 trans_id2= tcConnectptr.p->transid[1];
- conf->senderData = tcConnectptr.p->clientConnectrec;
- conf->completedOps = completed_ops;
- conf->fragmentCompleted = scanCompleted;
- conf->transId1 = trans_id1;
- conf->transId2 = trans_id2;
- conf->total_len= total_len;
- sendSignal(tcConnectptr.p->clientBlockref, GSN_SCAN_FRAGCONF,
- signal, ScanFragConf::SignalLength, JBB);
-
- if(!scanptr.p->scanLockHold)
- {
- jam();
- scanptr.p->m_curr_batch_size_rows = 0;
- scanptr.p->m_curr_batch_size_bytes= 0;
- }
- }//Dblqh::sendScanFragConf()
- /* ######################################################################### */
- /* ####### NODE RECOVERY MODULE ####### */
- /* */
- /* ######################################################################### */
- /*---------------------------------------------------------------------------*/
- /* */
- /* THIS MODULE IS USED WHEN A NODE HAS FAILED. IT PERFORMS A COPY OF A */
- /* FRAGMENT TO A NEW REPLICA OF THE FRAGMENT. IT DOES ALSO SHUT DOWN ALL */
- /* CONNECTIONS TO THE FAILED NODE. */
- /*---------------------------------------------------------------------------*/
- void Dblqh::calculateHash(Signal* signal)
- {
- DatabufPtr locDatabufptr;
- UintR Ti;
- UintR Tdata0;
- UintR Tdata1;
- UintR Tdata2;
- UintR Tdata3;
- UintR* Tdata32;
- Uint64 Tdata[512];
- Tdata32 = (UintR*)&Tdata[0];
- Tdata0 = tcConnectptr.p->tupkeyData[0];
- Tdata1 = tcConnectptr.p->tupkeyData[1];
- Tdata2 = tcConnectptr.p->tupkeyData[2];
- Tdata3 = tcConnectptr.p->tupkeyData[3];
- Tdata32[0] = Tdata0;
- Tdata32[1] = Tdata1;
- Tdata32[2] = Tdata2;
- Tdata32[3] = Tdata3;
- locDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
- Ti = 4;
- while (locDatabufptr.i != RNIL) {
- ptrCheckGuard(locDatabufptr, cdatabufFileSize, databuf);
- Tdata0 = locDatabufptr.p->data[0];
- Tdata1 = locDatabufptr.p->data[1];
- Tdata2 = locDatabufptr.p->data[2];
- Tdata3 = locDatabufptr.p->data[3];
- Tdata32[Ti ] = Tdata0;
- Tdata32[Ti + 1] = Tdata1;
- Tdata32[Ti + 2] = Tdata2;
- Tdata32[Ti + 3] = Tdata3;
- locDatabufptr.i = locDatabufptr.p->nextDatabuf;
- Ti += 4;
- }//while
- tcConnectptr.p->hashValue =
- md5_hash((Uint64*)&Tdata32[0], (UintR)tcConnectptr.p->primKeyLen);
- }//Dblqh::calculateHash()
- /* *************************************** */
- /* COPY_FRAGREQ: Start copying a fragment */
- /* *************************************** */
- // Handles COPY_FRAGREQ. For an ordered index the request is confirmed
- // immediately with COPY_FRAGCONF. Otherwise a scan record and a TC connect
- // record are seized, set up for a copy scan (ScanRecord::COPY), the fragment
- // is registered in cactiveCopy[] and ACC_SCANREQ is sent to ACC.
- void Dblqh::execCOPY_FRAGREQ(Signal* signal)
- {
-   jamEntry();
-   const CopyFragReq * const copyReq = (CopyFragReq *)&signal->theData[0];
-   tabptr.i = copyReq->tableId;
-   ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
-   // Take copies of the request fields; the signal buffer is reused for the
-   // outgoing signals further down.
-   const Uint32 reqFragId = copyReq->fragId;
-   const Uint32 reqUserPtr = copyReq->userPtr;
-   const Uint32 reqUserRef = copyReq->userRef;
-   const Uint32 startingNode = copyReq->nodeId;
-   // Preconditions checked before any resource is taken.
-   ndbrequire(cnoActiveCopy < 3);
-   ndbrequire(getFragmentrec(signal, reqFragId));
-   ndbrequire(fragptr.p->copyFragState == ZIDLE);
-   ndbrequire(cfirstfreeTcConrec != RNIL);
-   ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo));
-   fragptr.p->fragDistributionKey = copyReq->distributionKey;
-   if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
-     jam();
-     // An ordered index is not copied; confirm the request right away.
-     CopyFragConf * const copyConf = (CopyFragConf *)&signal->theData[0];
-     copyConf->userPtr = reqUserPtr;
-     copyConf->sendingNodeId = cownNodeid;
-     copyConf->startingNodeId = startingNode;
-     copyConf->tableId = tabptr.i;
-     copyConf->fragId = reqFragId;
-     sendSignal(reqUserRef, GSN_COPY_FRAGCONF, signal,
-                CopyFragConf::SignalLength, JBB);
-     return;
-   }//if
-   LocalDLList<ScanRecord> scans(c_scanRecordPool, fragptr.p->m_activeScans);
-   ndbrequire(scans.seize(scanptr));
-   // m_max_batch_size_rows is what releaseScanrec uses to unbook operation
-   // records in ACC, so it must be set up here. A copy fragment scan always
-   // has its records booked and therefore books none of its own.
-   scanptr.p->rangeScan = 0;
-   scanptr.p->m_max_batch_size_rows = 0;
-   seizeTcrec();
-   // The scan record is filled in explicitly here; the former
-   // initCopyrec(signal) call stays disabled to avoid its implicit
-   // cast/usage of CopyFragReq.
-   scanptr.p->scanType = ScanRecord::COPY;
-   scanptr.p->scanNumber = NR_ScanNo;
-   scanptr.p->copyPtr = reqUserPtr;
-   scanptr.p->scanApiBlockref = reqUserRef;
-   scanptr.p->scanNodeId = startingNode;
-   scanptr.p->scanTcrec = tcConnectptr.i;
-   scanptr.p->fragPtrI = fragptr.i;
-   scanptr.p->scanSchemaVersion = copyReq->schemaVersion;
-   scanptr.p->scanKeyinfoFlag = 0; // Don't put into hash
-   scanptr.p->scanErrorCounter = 0;
-   scanptr.p->scanCompletedStatus = ZFALSE;
-   fragptr.p->m_scanNumberMask.clear(NR_ScanNo);
-   initScanTc(signal,
-              0,
-              (DBLQH << 20) + (cownNodeid << 8),
-              reqFragId,
-              copyReq->nodeId);
-   // Register the fragment among the active copies.
-   cactiveCopy[cnoActiveCopy] = fragptr.i;
-   cnoActiveCopy++;
-   tcConnectptr.p->schemaVersion = scanptr.p->scanSchemaVersion;
-   tcConnectptr.p->tcOprec = tcConnectptr.i;
-   tcConnectptr.p->copyCountWords = 0;
-   scanptr.p->scanState = ScanRecord::WAIT_ACC_COPY;
-   // Build ACC_SCANREQ in the signal buffer (overwrites the request).
-   AccScanReq * const accReq = (AccScanReq*)&signal->theData[0];
-   accReq->senderData = scanptr.i;
-   accReq->senderRef = cownref;
-   accReq->tableId = tabptr.i;
-   accReq->fragmentNo = reqFragId;
-   accReq->requestInfo = 0;
-   AccScanReq::setLockMode(accReq->requestInfo, 0);
-   AccScanReq::setKeyinfoFlag(accReq->requestInfo, 1);
-   AccScanReq::setReadCommittedFlag(accReq->requestInfo, 0);
-   accReq->transId1 = tcConnectptr.p->transid[0];
-   accReq->transId2 = tcConnectptr.p->transid[1];
-   accReq->savePointId = tcConnectptr.p->savePointId;
-   sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_SCANREQ, signal,
-              AccScanReq::SignalLength, JBB);
- }//Dblqh::execCOPY_FRAGREQ()
- // Continues a copy scan after ACC_SCANCONF has arrived.
- // PRECONDITION: SCAN_STATE = WAIT_ACC_COPY
- // An empty fragment is reported as successfully copied through
- // tupCopyCloseConfLab(); otherwise the ACC scan pointer is saved and
- // STORED_PROCREQ (ZSTORED_PROC_COPY) is sent to TUP.
- void Dblqh::accScanConfCopyLab(Signal* signal)
- {
-   const AccScanConf * const conf = (AccScanConf *)&signal->theData[0];
-   tcConnectptr.i = scanptr.p->scanTcrec;
-   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-   if (conf->flag != AccScanConf::ZEMPTY_FRAGMENT) {
-     // Save the ACC scan pointer before the signal buffer is overwritten.
-     scanptr.p->scanAccPtr = conf->accPtr;
-     scanptr.p->scanState = ScanRecord::WAIT_STORED_PROC_COPY;
-     signal->theData[0] = tcConnectptr.p->tupConnectrec;
-     signal->theData[1] = tcConnectptr.p->tableref;
-     signal->theData[2] = scanptr.p->scanSchemaVersion;
-     signal->theData[3] = ZSTORED_PROC_COPY;
-     // theData[4] is not used in TUP with ZSTORED_PROC_COPY
-     sendSignal(tcConnectptr.p->tcTupBlockref, GSN_STORED_PROCREQ, signal, 5, JBB);
-   } else {
-     jam();
-     // The fragment was empty: report successful copying.
-     tupCopyCloseConfLab(signal);
-   }//if
- }//Dblqh::accScanConfCopyLab()
- /*---------------------------------------------------------------------------*/
- /* ENTER STORED_PROCCONF WITH */
- /* TC_CONNECTPTR, */
- /* TSTORED_PROC_ID */
- /*---------------------------------------------------------------------------*/
- // Continues a copy scan after STORED_PROCCONF has arrived.
- // PRECONDITION: SCAN_STATE = WAIT_STORED_PROC_COPY
- // Entered with tcConnectptr set up. The fragment and scan records are
- // resolved from the TC connect record before either is inspected, so the
- // function does not depend on scanptr having been loaded by the caller.
- //  - If the scan is already flagged as completed the copy is closed.
- //  - Fragment FSACTIVE: link into the active list and request the first row.
- //  - Fragment BLOCKED: queue the operation and mark it COPY_FIRST_STOPPED.
- //  - Any other fragment status is treated as a system error.
- void Dblqh::storedProcConfCopyLab(Signal* signal)
- {
-   fragptr.i = tcConnectptr.p->fragmentptr;
-   ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
-   scanptr.i = tcConnectptr.p->tcScanRec;
-   c_scanRecordPool.getPtr(scanptr);
-   if (scanptr.p->scanCompletedStatus == ZTRUE) {
-     jam();
- /*---------------------------------------------------------------------------*/
- /* THE COPY PROCESS HAS BEEN COMPLETED, MOST LIKELY DUE TO A NODE FAILURE. */
- /*---------------------------------------------------------------------------*/
-     closeCopyLab(signal);
-     return;
-   }//if
-   scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN_COPY;
-   switch (fragptr.p->fragStatus) {
-   case Fragrecord::FSACTIVE:
-     jam();
-     linkActiveFrag(signal);
-     break;
-   case Fragrecord::BLOCKED:
-     jam();
-     linkFragQueue(signal);
-     tcConnectptr.p->transactionState = TcConnectionrec::COPY_FIRST_STOPPED;
-     return;
-   // All remaining states deliberately fall through to the error handling;
-   // the per-case jam() calls are kept as in the original.
-   case Fragrecord::FREE:
-     jam();
-   case Fragrecord::ACTIVE_CREATION:
-     jam();
-   case Fragrecord::CRASH_RECOVERING:
-     jam();
-   case Fragrecord::DEFINED:
-     jam();
-   case Fragrecord::REMOVING:
-     jam();
-   default:
-     jam();
-     systemErrorLab(signal);
-     return;
-   }//switch
-   continueFirstCopyAfterBlockedLab(signal);
-   return;
- }//Dblqh::storedProcConfCopyLab()
- // Sends the first NEXT_SCANREQ (ZSCAN_NEXT) of a copy scan to ACC. The scan
- // record is looked up again from the TC connect record before it is used.
- void Dblqh::continueFirstCopyAfterBlockedLab(Signal* signal)
- {
-   scanptr.i = tcConnectptr.p->tcScanRec;
-   c_scanRecordPool.getPtr(scanptr);
-   // Word 0: ACC scan pointer, word 1: RNIL, word 2: the scan request type.
-   signal->theData[2] = NextScanReq::ZSCAN_NEXT;
-   signal->theData[1] = RNIL;
-   signal->theData[0] = scanptr.p->scanAccPtr;
-   sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
- }//Dblqh::continueFirstCopyAfterBlockedLab()
- /*---------------------------------------------------------------------------*/
- /* ENTER NEXT_SCANCONF WITH */
- /* SCANPTR, */
- /* TFRAGID, */
- /* TACC_OPPTR, */
- /* TLOCAL_KEY1, */
- /* TLOCAL_KEY2, */
- /* TKEY_LENGTH, */
- /* TKEY1, */
- /* TKEY2, */
- /* TKEY3, */
- /* TKEY4 */
- /*---------------------------------------------------------------------------*/
- /* PRECONDITION: SCAN_STATE = WAIT_NEXT_SCAN_COPY */
- /*---------------------------------------------------------------------------*/
- // Handles NEXT_SCANCONF for a copy scan. Three outcomes:
- //  - fragId == RNIL: no more tuples; close the copy, or flag completion and
- //    wait if copied words are still outstanding (copyCountWords != 0).
- //  - accOperationPtr == RNIL: ACC returned no record; send ACC_CHECK_SCAN.
- //  - otherwise: store the ACC operation pointer, initialise the copy
- //    operation and either wait for more key info (key longer than four
- //    words) or go on to send the TUPKEYREQ.
- void Dblqh::nextScanConfCopyLab(Signal* signal)
- {
-   const NextScanConf * const conf = (NextScanConf *)&signal->theData[0];
-   tcConnectptr.i = scanptr.p->scanTcrec;
-   ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
-   if (conf->fragId == RNIL) {
-     jam();
-     // There are no more tuples to fetch. The copy in ACC must be closed and
-     // the stored procedure in TUP deleted.
-     releaseActiveFrag(signal);
-     if (tcConnectptr.p->copyCountWords == 0) {
-       closeCopyLab(signal);
-     } else {
-       // Copying has not yet completed at the starting node; signal
-       // completion through the scanCompletedStatus flag and wait.
-       scanptr.p->scanCompletedStatus = ZTRUE;
-     }//if
-     return;
-   }//if
-   const Uint32 accOpPtr = conf->accOperationPtr;
-   if (accOpPtr == RNIL) {
-     jam();
-     // No record was returned by ACC.
-     signal->theData[0] = scanptr.p->scanAccPtr;
-     signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
-     sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_CHECK_SCAN, signal, 2, JBB);
-     return;
-   }//if
-   set_acc_ptr_in_scan_record(scanptr.p, 0, accOpPtr);
-   initCopyTc(signal);
-   if (tcConnectptr.p->primKeyLen > 4) {
-     jam();
-     // Key longer than four words: wait in WAIT_COPY_KEYINFO before
-     // sending the tuple key request.
-     tcConnectptr.p->save1 = 4;
-     scanptr.p->scanState = ScanRecord::WAIT_COPY_KEYINFO;
-   } else {
-     copySendTupkeyReqLab(signal);
-   }//if
- }//Dblqh::nextScanConfCopyLab()
- void Dblqh::copySendTupkeyReqLab(Signal* signal)
- {
- Uint32 reqinfo = 0;
- Uint32 tupFragPtr;
- reqinfo = reqinfo + (tcConnectptr.p->operation << 6);
- reqinfo = reqinfo + (tcConnectptr.p->opExec << 10);
- tcConnectptr.p->transactionState = TcConnectionrec::COPY_TUPKEY;
- scanptr.p->scanState = ScanRecord::WAIT_TUPKEY_COPY;
- fragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if (fragptr.p->fragId == scanptr.p->scanLocalFragid) {
- jam();
- tupFragPtr = fragptr.p->tupFragptr[0];
- } else {
- jam();
- tupFragPtr = fragptr.p->tupFragptr[1];
- }//if
- {
- TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
- tupKeyReq->connectPtr = tcConnectptr.p->tupConnectrec;
- tupKeyReq->request = reqinfo;
- tupKeyReq->tableRef = tcConnectptr.p->tableref;
- tupKeyReq->fragId = scanptr.p->scanLocalFragid;
- tupKeyReq->keyRef1 = scanptr.p->scanLocalref[0];
- tupKeyReq->keyRef2 = scanptr.p->scanLocalref[1];
- tupKeyReq->attrBufLen = 0;