DbtcMain.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:428k
- * ourselves vicarious for the failed node.
- *--------------------------------------------------------------*/
- setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
- signal->theData[0] = tcConnectptr.i;
- signal->theData[1] = apiConnectptr.p->transid[0];
- signal->theData[2] = apiConnectptr.p->transid[1];
- signal->theData[3] = hostptr.i;
- signal->theData[4] = ZFALSE;
- sendSignal(cownref, GSN_ABORTED, signal, 5, JBB);
- }//if
- }//if
- }//for
- }//if
- tcConnectptr.i = tcConnectptr.p->nextTcConnect;
- } while (1);
- }//Dbtc::sendAbortedAfterTimeout()
- void Dbtc::reportNodeFailed(Signal* signal, Uint32 nodeId)
- {
- DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
- rep->nodeId = nodeId;
- rep->err = DisconnectRep::TcReportNodeFailed;
- sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
- DisconnectRep::SignalLength, JBB);
- }//Dbtc::reportNodeFailed()
- /*-------------------------------------------------*/
- /* Timeout-loop for scanned fragments. */
- /*-------------------------------------------------*/
- void Dbtc::timeOutLoopStartFragLab(Signal* signal, Uint32 TscanConPtr)
- {
- ScanFragRecPtr timeOutPtr[8];
- UintR tfragTimer[8];
- UintR texpiredTime[8];
- UintR TloopCount = 0;
- Uint32 TtcTimer = ctcTimer;
- while ((TscanConPtr + 8) < cscanFragrecFileSize) {
- jam();
- timeOutPtr[0].i = TscanConPtr + 0;
- timeOutPtr[1].i = TscanConPtr + 1;
- timeOutPtr[2].i = TscanConPtr + 2;
- timeOutPtr[3].i = TscanConPtr + 3;
- timeOutPtr[4].i = TscanConPtr + 4;
- timeOutPtr[5].i = TscanConPtr + 5;
- timeOutPtr[6].i = TscanConPtr + 6;
- timeOutPtr[7].i = TscanConPtr + 7;
-
- c_scan_frag_pool.getPtrForce(timeOutPtr[0]);
- c_scan_frag_pool.getPtrForce(timeOutPtr[1]);
- c_scan_frag_pool.getPtrForce(timeOutPtr[2]);
- c_scan_frag_pool.getPtrForce(timeOutPtr[3]);
- c_scan_frag_pool.getPtrForce(timeOutPtr[4]);
- c_scan_frag_pool.getPtrForce(timeOutPtr[5]);
- c_scan_frag_pool.getPtrForce(timeOutPtr[6]);
- c_scan_frag_pool.getPtrForce(timeOutPtr[7]);
- tfragTimer[0] = timeOutPtr[0].p->scanFragTimer;
- tfragTimer[1] = timeOutPtr[1].p->scanFragTimer;
- tfragTimer[2] = timeOutPtr[2].p->scanFragTimer;
- tfragTimer[3] = timeOutPtr[3].p->scanFragTimer;
- tfragTimer[4] = timeOutPtr[4].p->scanFragTimer;
- tfragTimer[5] = timeOutPtr[5].p->scanFragTimer;
- tfragTimer[6] = timeOutPtr[6].p->scanFragTimer;
- tfragTimer[7] = timeOutPtr[7].p->scanFragTimer;
- texpiredTime[0] = TtcTimer - tfragTimer[0];
- texpiredTime[1] = TtcTimer - tfragTimer[1];
- texpiredTime[2] = TtcTimer - tfragTimer[2];
- texpiredTime[3] = TtcTimer - tfragTimer[3];
- texpiredTime[4] = TtcTimer - tfragTimer[4];
- texpiredTime[5] = TtcTimer - tfragTimer[5];
- texpiredTime[6] = TtcTimer - tfragTimer[6];
- texpiredTime[7] = TtcTimer - tfragTimer[7];
-
- for (Uint32 Ti = 0; Ti < 8; Ti++) {
- jam();
- if (tfragTimer[Ti] != 0) {
- if (texpiredTime[Ti] > ctimeOutValue) {
- jam();
- DEBUG("Fragment timeout found:"<<
- " ctimeOutValue=" <<ctimeOutValue
- <<", texpiredTime="<<texpiredTime[Ti]<<endl
- <<" tfragTimer="<<tfragTimer[Ti]
- <<", ctcTimer="<<ctcTimer);
- timeOutFoundFragLab(signal, TscanConPtr + Ti);
- return;
- }//if
- }//if
- }//for
- TscanConPtr += 8;
- /*----------------------------------------------------------------*/
- /* We split the process up checking 1024 fragmentrecords at a time*/
- /* to maintain real time behaviour. */
- /*----------------------------------------------------------------*/
- if (TloopCount++ > 128 ) {
- jam();
- signal->theData[0] = TcContinueB::ZCONTINUE_TIME_OUT_FRAG_CONTROL;
- signal->theData[1] = TscanConPtr;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- return;
- }//if
- }//while
- for ( ; TscanConPtr < cscanFragrecFileSize; TscanConPtr++){
- jam();
- timeOutPtr[0].i = TscanConPtr;
- c_scan_frag_pool.getPtrForce(timeOutPtr[0]);
- if (timeOutPtr[0].p->scanFragTimer != 0) {
- texpiredTime[0] = ctcTimer - timeOutPtr[0].p->scanFragTimer;
- if (texpiredTime[0] > ctimeOutValue) {
- jam();
- DEBUG("Fragment timeout found:"<<
- " ctimeOutValue=" <<ctimeOutValue
- <<", texpiredTime="<<texpiredTime[0]<<endl
- <<" tfragTimer="<<tfragTimer[0]
- <<", ctcTimer="<<ctcTimer);
- timeOutFoundFragLab(signal, TscanConPtr);
- return;
- }//if
- }//if
- }//for
- ctimeOutCheckFragActive = TOCS_FALSE;
- return;
- }//timeOutLoopStartFragLab()
- /*--------------------------------------------------------------------------*/
- /*Handle the heartbeat signal from LQH in a scan process */
- // (Set timer on fragrec.)
- /*--------------------------------------------------------------------------*/
- void Dbtc::execSCAN_HBREP(Signal* signal)
- {
- jamEntry();
- scanFragptr.i = signal->theData[0];
- c_scan_frag_pool.getPtr(scanFragptr);
- switch (scanFragptr.p->scanFragState){
- case ScanFragRec::LQH_ACTIVE:
- break;
- default:
- DEBUG("execSCAN_HBREP: scanFragState="<<scanFragptr.p->scanFragState);
- systemErrorLab(signal);
- break;
- }
- ScanRecordPtr scanptr;
- scanptr.i = scanFragptr.p->scanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- apiConnectptr.i = scanptr.p->scanApiRec;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- if (!(apiConnectptr.p->transid[0] == signal->theData[1] &&
- apiConnectptr.p->transid[1] == signal->theData[2])){
- jam();
- /**
- * Send signal back to sender so that the crash occurs there
- */
- // Save original transid
- signal->theData[3] = signal->theData[0];
- signal->theData[4] = signal->theData[1];
- // Set transid to illegal values
- signal->theData[1] = RNIL;
- signal->theData[2] = RNIL;
-
- sendSignal(signal->senderBlockRef(), GSN_SCAN_HBREP, signal, 5, JBA);
- DEBUG("SCAN_HBREP with wrong transid("
- <<signal->theData[3]<<", "<<signal->theData[4]<<")");
- return;
- }//if
- // Update timer on ScanFragRec
- if (scanFragptr.p->scanFragTimer != 0){
- updateBuddyTimer(apiConnectptr);
- scanFragptr.p->startFragTimer(ctcTimer);
- } else {
- ndbassert(false);
- DEBUG("SCAN_HBREP when scanFragTimer was turned off");
- }
- }//execSCAN_HBREP()
- /*--------------------------------------------------------------------------*/
- /* Timeout has occured on a fragment which means a scan has timed out. */
- /* If this is true we have an error in LQH/ACC. */
- /*--------------------------------------------------------------------------*/
- void Dbtc::timeOutFoundFragLab(Signal* signal, UintR TscanConPtr)
- {
- ScanFragRecPtr ptr;
- c_scan_frag_pool.getPtr(ptr, TscanConPtr);
- DEBUG(TscanConPtr << " timeOutFoundFragLab: scanFragState = "<< ptr.p->scanFragState);
- /*-------------------------------------------------------------------------*/
- // The scan fragment has expired its timeout. Check its state to decide
- // what to do.
- /*-------------------------------------------------------------------------*/
- switch (ptr.p->scanFragState) {
- case ScanFragRec::WAIT_GET_PRIMCONF:
- jam();
- ndbrequire(false);
- break;
- case ScanFragRec::LQH_ACTIVE:{
- jam();
- /**
- * The LQH expired it's timeout, try to close it
- */
- Uint32 nodeId = refToNode(ptr.p->lqhBlockref);
- Uint32 connectCount = getNodeInfo(nodeId).m_connectCount;
- ScanRecordPtr scanptr;
- scanptr.i = ptr.p->scanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- if(connectCount != ptr.p->m_connectCount){
- jam();
- /**
- * The node has died
- */
- ptr.p->scanFragState = ScanFragRec::COMPLETED;
- ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
-
- run.release(ptr);
- ptr.p->stopFragTimer();
- }
-
- scanError(signal, scanptr, ZSCAN_FRAG_LQH_ERROR);
- break;
- }
- case ScanFragRec::DELIVERED:
- jam();
- case ScanFragRec::IDLE:
- jam();
- case ScanFragRec::QUEUED_FOR_DELIVERY:
- jam();
- /*-----------------------------------------------------------------------
- * Should never occur. We will simply report set the timer to zero and
- * continue. In a debug version we should crash here but not in a release
- * version. In a release version we will simply set the time-out to zero.
- *-----------------------------------------------------------------------*/
- #ifdef VM_TRACE
- systemErrorLab(signal);
- #endif
- scanFragptr.p->stopFragTimer();
- break;
- default:
- jam();
- /*-----------------------------------------------------------------------
- * Non-existent state. Crash.
- *-----------------------------------------------------------------------*/
- systemErrorLab(signal);
- break;
- }//switch
-
- signal->theData[0] = TcContinueB::ZCONTINUE_TIME_OUT_FRAG_CONTROL;
- signal->theData[1] = TscanConPtr + 1;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- return;
- }//timeOutFoundFragLab()
- /*
- 4.3.16 GCP_NOMORETRANS
- ----------------------
- */
- /*****************************************************************************
- * G C P _ N O M O R E T R A N S
- *
- * WHEN DBTC RECEIVES SIGNAL GCP_NOMORETRANS A CHECK IS DONE TO FIND OUT IF
- * THERE ARE ANY GLOBAL CHECKPOINTS GOING ON - CFIRSTGCP /= RNIL. DBTC THEN
- * SEARCHES THE GCP_RECORD FILE TO FIND OUT IF THERE ARE ANY TRANSACTIONS NOT
- * CONCLUDED WITH THIS SPECIFIC CHECKPOINT - GCP_PTR:GCP_ID = TCHECK_GCP_ID.
- * FOR EACH TRANSACTION WHERE API_CONNECTSTATE EQUALS PREPARED, COMMITTING,
- * COMMITTED OR COMPLETING SIGNAL CONTINUEB IS SENT WITH A DELAY OF 100 MS,
- * THE COUNTER GCP_PTR:OUTSTANDINGAPI IS INCREASED. WHEN CONTINUEB IS RECEIVED
- * THE COUNTER IS DECREASED AND A CHECK IS DONE TO FIND OUT IF ALL
- * TRANSACTIONS ARE CONCLUDED. IF SO, SIGNAL GCP_TCFINISHED IS SENT.
- *****************************************************************************/
- void Dbtc::execGCP_NOMORETRANS(Signal* signal)
- {
- jamEntry();
- tcheckGcpId = signal->theData[1];
- if (cfirstgcp != RNIL) {
- jam();
- /* A GLOBAL CHECKPOINT IS GOING ON */
- gcpPtr.i = cfirstgcp; /* SET POINTER TO FIRST GCP IN QUEUE*/
- ptrCheckGuard(gcpPtr, cgcpFilesize, gcpRecord);
- if (gcpPtr.p->gcpId == tcheckGcpId) {
- jam();
- if (gcpPtr.p->firstApiConnect != RNIL) {
- jam();
- gcpPtr.p->gcpNomoretransRec = ZTRUE;
- } else {
- jam();
- gcpTcfinished(signal);
- unlinkGcp(signal);
- }//if
- } else {
- jam();
- /*------------------------------------------------------------*/
- /* IF IT IS NOT THE FIRST THEN THERE SHOULD BE NO */
- /* RECORD FOR THIS GLOBAL CHECKPOINT. WE ALWAYS REMOVE */
- /* THE GLOBAL CHECKPOINTS IN ORDER. */
- /*------------------------------------------------------------*/
- gcpTcfinished(signal);
- }//if
- } else {
- jam();
- gcpTcfinished(signal);
- }//if
- return;
- }//Dbtc::execGCP_NOMORETRANS()
- /*****************************************************************************/
- /* */
- /* TAKE OVER MODULE */
- /* */
- /*****************************************************************************/
- /* */
- /* THIS PART OF TC TAKES OVER THE COMMIT/ABORT OF TRANSACTIONS WHERE THE */
- /* NODE ACTING AS TC HAVE FAILED. IT STARTS BY QUERYING ALL NODES ABOUT */
- /* ANY OPERATIONS PARTICIPATING IN A TRANSACTION WHERE THE TC NODE HAVE */
- /* FAILED. */
- /* */
- /* AFTER RECEIVING INFORMATION FROM ALL NODES ABOUT OPERATION STATUS THIS */
- /* CODE WILL ENSURE THAT ALL AFFECTED TRANSACTIONS ARE PROPERLY ABORTED OR*/
- /* COMMITTED. THE ORIGINATING APPLICATION NODE WILL ALSO BE CONTACTED. */
- /* IF THE ORIGINATING APPLICATION ALSO FAILED THEN THERE IS CURRENTLY NO */
- /* WAY TO FIND OUT WHETHER A TRANSACTION WAS PERFORMED OR NOT. */
- /*****************************************************************************/
- void Dbtc::execNODE_FAILREP(Signal* signal)
- {
- HostRecordPtr tmpHostptr;
- jamEntry();
- NodeFailRep * const nodeFail = (NodeFailRep *)&signal->theData[0];
- cfailure_nr = nodeFail->failNo;
- const Uint32 tnoOfNodes = nodeFail->noOfNodes;
- const Uint32 tnewMasterId = nodeFail->masterNodeId;
-
- arrGuard(tnoOfNodes, MAX_NDB_NODES);
- int index = 0;
- for (unsigned i = 1; i< MAX_NDB_NODES; i++) {
- if(NodeBitmask::get(nodeFail->theNodes, i)){
- cdata[index] = i;
- index++;
- }//if
- }//for
- tcNodeFailptr.i = 0;
- ptrAss(tcNodeFailptr, tcFailRecord);
- Uint32 tindex;
- for (tindex = 0; tindex < tnoOfNodes; tindex++) {
- jam();
- hostptr.i = cdata[tindex];
- ptrCheckGuard(hostptr, chostFilesize, hostRecord);
- /*------------------------------------------------------------*/
- /* SET STATUS OF THE FAILED NODE TO DEAD SINCE IT HAS */
- /* FAILED. */
- /*------------------------------------------------------------*/
- hostptr.p->hostStatus = HS_DEAD;
- if (hostptr.p->takeOverStatus == TOS_COMPLETED) {
- jam();
- /*------------------------------------------------------------*/
- /* A VERY UNUSUAL SITUATION. THE TAKE OVER WAS COMPLETED*/
- /* EVEN BEFORE WE HEARD ABOUT THE NODE FAILURE REPORT. */
- /* HOWEVER UNUSUAL THIS SITUATION IS POSSIBLE. */
- /*------------------------------------------------------------*/
- /* RELEASE THE CURRENTLY UNUSED LQH CONNECTIONS. THE */
- /* REMAINING WILL BE RELEASED WHEN THE TRANSACTION THAT */
- /* USED THEM IS COMPLETED. */
- /*------------------------------------------------------------*/
- {
- NFCompleteRep * const nfRep = (NFCompleteRep *)&signal->theData[0];
- nfRep->blockNo = DBTC;
- nfRep->nodeId = cownNodeid;
- nfRep->failedNodeId = hostptr.i;
- }
- sendSignal(cdihblockref, GSN_NF_COMPLETEREP, signal,
- NFCompleteRep::SignalLength, JBB);
- } else {
- ndbrequire(hostptr.p->takeOverStatus == TOS_IDLE);
- hostptr.p->takeOverStatus = TOS_NODE_FAILED;
- }//if
-
- if (tcNodeFailptr.p->failStatus == FS_LISTENING) {
- jam();
- /*------------------------------------------------------------*/
- /* THE CURRENT TAKE OVER CAN BE AFFECTED BY THIS NODE */
- /* FAILURE. */
- /*------------------------------------------------------------*/
- if (hostptr.p->lqhTransStatus == LTS_ACTIVE) {
- jam();
- /*------------------------------------------------------------*/
- /* WE WERE WAITING FOR THE FAILED NODE IN THE TAKE OVER */
- /* PROTOCOL FOR TC. */
- /*------------------------------------------------------------*/
- signal->theData[0] = TcContinueB::ZNODE_TAKE_OVER_COMPLETED;
- signal->theData[1] = hostptr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- }//if
- }//if
-
- }//for
- const bool masterFailed = (cmasterNodeId != tnewMasterId);
- cmasterNodeId = tnewMasterId;
- if(getOwnNodeId() == cmasterNodeId && masterFailed){
- /**
- * Master has failed and I'm the new master
- */
- jam();
-
- for (hostptr.i = 1; hostptr.i < MAX_NDB_NODES; hostptr.i++) {
- jam();
- ptrAss(hostptr, hostRecord);
- if (hostptr.p->hostStatus != HS_ALIVE) {
- jam();
- if (hostptr.p->takeOverStatus == TOS_COMPLETED) {
- jam();
- /*------------------------------------------------------------*/
- /* SEND TAKE OVER CONFIRMATION TO ALL ALIVE NODES IF */
- /* TAKE OVER IS COMPLETED. THIS IS PERFORMED TO ENSURE */
- /* THAT ALL NODES AGREE ON THE IDLE STATE OF THE TAKE */
- /* OVER. THIS MIGHT BE MISSED IN AN ERROR SITUATION IF */
- /* MASTER FAILS AFTER SENDING CONFIRMATION TO NEW */
- /* MASTER BUT FAILING BEFORE SENDING TO ANOTHER NODE */
- /* WHICH WAS NOT MASTER. IF THIS NODE LATER BECOMES */
- /* MASTER IT MIGHT START A NEW TAKE OVER EVEN AFTER THE */
- /* CRASHED NODE HAVE ALREADY RECOVERED. */
- /*------------------------------------------------------------*/
- for(tmpHostptr.i = 1; tmpHostptr.i < MAX_NDB_NODES;tmpHostptr.i++) {
- jam();
- ptrAss(tmpHostptr, hostRecord);
- if (tmpHostptr.p->hostStatus == HS_ALIVE) {
- jam();
- tblockref = calcTcBlockRef(tmpHostptr.i);
- signal->theData[0] = hostptr.i;
- sendSignal(tblockref, GSN_TAKE_OVERTCCONF, signal, 1, JBB);
- }//if
- }//for
- }//if
- }//if
- }//for
- }
- if(getOwnNodeId() == cmasterNodeId){
- jam();
- for (hostptr.i = 1; hostptr.i < MAX_NDB_NODES; hostptr.i++) {
- jam();
- ptrAss(hostptr, hostRecord);
- if (hostptr.p->hostStatus != HS_ALIVE) {
- jam();
- if (hostptr.p->takeOverStatus == TOS_NODE_FAILED) {
- jam();
- /*------------------------------------------------------------*/
- /* CONCLUDE ALL ACTIVITIES THE FAILED TC DID CONTROL */
- /* SINCE WE ARE THE MASTER. THIS COULD HAVE BEEN STARTED*/
- /* BY A PREVIOUS MASTER BUT HAVE NOT BEEN CONCLUDED YET.*/
- /*------------------------------------------------------------*/
- hostptr.p->takeOverStatus = TOS_ACTIVE;
- signal->theData[0] = hostptr.i;
- sendSignal(cownref, GSN_TAKE_OVERTCREQ, signal, 1, JBB);
- }//if
- }//if
- }//for
- }//if
- for (tindex = 0; tindex < tnoOfNodes; tindex++) {
- jam();
- hostptr.i = cdata[tindex];
- ptrCheckGuard(hostptr, chostFilesize, hostRecord);
- /*------------------------------------------------------------*/
- /* LOOP THROUGH AND ABORT ALL SCANS THAT WHERE */
- /* CONTROLLED BY THIS TC AND ACTIVE IN THE FAILED */
- /* NODE'S LQH */
- /*------------------------------------------------------------*/
- checkScanActiveInFailedLqh(signal, 0, hostptr.i);
- checkWaitDropTabFailedLqh(signal, hostptr.i, 0); // nodeid, tableid
- }//for
- }//Dbtc::execNODE_FAILREP()
- void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
- Uint32 scanPtrI,
- Uint32 failedNodeId){
- ScanRecordPtr scanptr;
- for (scanptr.i = scanPtrI; scanptr.i < cscanrecFileSize; scanptr.i++) {
- jam();
- ptrAss(scanptr, scanRecord);
- bool found = false;
- if (scanptr.p->scanState != ScanRecord::IDLE){
- jam();
- ScanFragRecPtr ptr;
- ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
-
- for(run.first(ptr); !ptr.isNull(); ){
- jam();
- ScanFragRecPtr curr = ptr;
- run.next(ptr);
- if (curr.p->scanFragState == ScanFragRec::LQH_ACTIVE &&
- refToNode(curr.p->lqhBlockref) == failedNodeId){
- jam();
-
- run.release(curr);
- curr.p->scanFragState = ScanFragRec::COMPLETED;
- curr.p->stopFragTimer();
- found = true;
- }
- }
- }
- if(found){
- jam();
- scanError(signal, scanptr, ZSCAN_LQH_ERROR);
- }
- // Send CONTINUEB to continue later
- signal->theData[0] = TcContinueB::ZCHECK_SCAN_ACTIVE_FAILED_LQH;
- signal->theData[1] = scanptr.i + 1; // Check next scanptr
- signal->theData[2] = failedNodeId;
- sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
- return;
- }//for
- }
- void
- Dbtc::checkScanFragList(Signal* signal,
- Uint32 failedNodeId,
- ScanRecord * scanP,
- ScanFragList::Head & head){
-
- DEBUG("checkScanActiveInFailedLqh: scanFragError");
- }
- void Dbtc::execTAKE_OVERTCCONF(Signal* signal)
- {
- jamEntry();
- tfailedNodeId = signal->theData[0];
- hostptr.i = tfailedNodeId;
- ptrCheckGuard(hostptr, chostFilesize, hostRecord);
- switch (hostptr.p->takeOverStatus) {
- case TOS_IDLE:
- jam();
- /*------------------------------------------------------------*/
- /* THIS MESSAGE ARRIVED EVEN BEFORE THE NODE_FAILREP */
- /* MESSAGE. THIS IS POSSIBLE IN EXTREME SITUATIONS. */
- /* WE SET THE STATE TO TAKE_OVER_COMPLETED AND WAIT */
- /* FOR THE NODE_FAILREP MESSAGE. */
- /*------------------------------------------------------------*/
- hostptr.p->takeOverStatus = TOS_COMPLETED;
- break;
- case TOS_NODE_FAILED:
- case TOS_ACTIVE:
- jam();
- /*------------------------------------------------------------*/
- /* WE ARE NOT MASTER AND THE TAKE OVER IS ACTIVE OR WE */
- /* ARE MASTER AND THE TAKE OVER IS ACTIVE. IN BOTH */
- /* WE SET THE STATE TO TAKE_OVER_COMPLETED. */
- /*------------------------------------------------------------*/
- /* RELEASE THE CURRENTLY UNUSED LQH CONNECTIONS. THE */
- /* REMAINING WILL BE RELEASED WHEN THE TRANSACTION THAT */
- /* USED THEM IS COMPLETED. */
- /*------------------------------------------------------------*/
- hostptr.p->takeOverStatus = TOS_COMPLETED;
- {
- NFCompleteRep * const nfRep = (NFCompleteRep *)&signal->theData[0];
- nfRep->blockNo = DBTC;
- nfRep->nodeId = cownNodeid;
- nfRep->failedNodeId = hostptr.i;
- }
- sendSignal(cdihblockref, GSN_NF_COMPLETEREP, signal,
- NFCompleteRep::SignalLength, JBB);
- break;
- case TOS_COMPLETED:
- jam();
- /*------------------------------------------------------------*/
- /* WE HAVE ALREADY RECEIVED THE CONF SIGNAL. IT IS MOST */
- /* LIKELY SENT FROM A NEW MASTER WHICH WASN'T SURE IF */
- /* THIS NODE HEARD THE CONF SIGNAL FROM THE OLD MASTER. */
- /* WE SIMPLY IGNORE THE MESSAGE. */
- /*------------------------------------------------------------*/
- /*empty*/;
- break;
- default:
- jam();
- systemErrorLab(signal);
- return;
- }//switch
- }//Dbtc::execTAKE_OVERTCCONF()
- void Dbtc::execTAKE_OVERTCREQ(Signal* signal)
- {
- jamEntry();
- tfailedNodeId = signal->theData[0];
- tcNodeFailptr.i = 0;
- ptrAss(tcNodeFailptr, tcFailRecord);
- if (tcNodeFailptr.p->failStatus != FS_IDLE) {
- jam();
- /*------------------------------------------------------------*/
- /* WE CAN CURRENTLY ONLY HANDLE ONE TAKE OVER AT A TIME */
- /*------------------------------------------------------------*/
- /* IF MORE THAN ONE TAKE OVER IS REQUESTED WE WILL */
- /* QUEUE THE TAKE OVER AND START IT AS SOON AS THE */
- /* PREVIOUS ARE COMPLETED. */
- /*------------------------------------------------------------*/
- arrGuard(tcNodeFailptr.p->queueIndex, MAX_NDB_NODES);
- tcNodeFailptr.p->queueList[tcNodeFailptr.p->queueIndex] = tfailedNodeId;
- tcNodeFailptr.p->queueIndex = tcNodeFailptr.p->queueIndex + 1;
- return;
- }//if
- startTakeOverLab(signal);
- }//Dbtc::execTAKE_OVERTCREQ()
- /*------------------------------------------------------------*/
- /* INITIALISE THE HASH TABLES FOR STORING TRANSACTIONS */
- /* AND OPERATIONS DURING TC TAKE OVER. */
- /*------------------------------------------------------------*/
- void Dbtc::startTakeOverLab(Signal* signal)
- {
- for (tindex = 0; tindex <= 511; tindex++) {
- ctransidFailHash[tindex] = RNIL;
- }//for
- for (tindex = 0; tindex <= 1023; tindex++) {
- ctcConnectFailHash[tindex] = RNIL;
- }//for
- tcNodeFailptr.p->failStatus = FS_LISTENING;
- tcNodeFailptr.p->takeOverNode = tfailedNodeId;
- for (hostptr.i = 1; hostptr.i < MAX_NDB_NODES; hostptr.i++) {
- jam();
- ptrAss(hostptr, hostRecord);
- if (hostptr.p->hostStatus == HS_ALIVE) {
- jam();
- tblockref = calcLqhBlockRef(hostptr.i);
- hostptr.p->lqhTransStatus = LTS_ACTIVE;
- signal->theData[0] = tcNodeFailptr.i;
- signal->theData[1] = cownref;
- signal->theData[2] = tfailedNodeId;
- sendSignal(tblockref, GSN_LQH_TRANSREQ, signal, 3, JBB);
- }//if
- }//for
- }//Dbtc::startTakeOverLab()
- /*------------------------------------------------------------*/
- /* A REPORT OF AN OPERATION WHERE TC FAILED HAS ARRIVED.*/
- /*------------------------------------------------------------*/
- void Dbtc::execLQH_TRANSCONF(Signal* signal)
- {
- jamEntry();
- LqhTransConf * const lqhTransConf = (LqhTransConf *)&signal->theData[0];
-
- tcNodeFailptr.i = lqhTransConf->tcRef;
- ptrCheckGuard(tcNodeFailptr, 1, tcFailRecord);
- tnodeid = lqhTransConf->lqhNodeId;
- ttransStatus = (LqhTransConf::OperationStatus)lqhTransConf->operationStatus;
- ttransid1 = lqhTransConf->transId1;
- ttransid2 = lqhTransConf->transId2;
- ttcOprec = lqhTransConf->oldTcOpRec;
- treqinfo = lqhTransConf->requestInfo;
- tgci = lqhTransConf->gci;
- cnodes[0] = lqhTransConf->nextNodeId1;
- cnodes[1] = lqhTransConf->nextNodeId2;
- cnodes[2] = lqhTransConf->nextNodeId3;
- const Uint32 ref = tapplRef = lqhTransConf->apiRef;
- tapplOprec = lqhTransConf->apiOpRec;
- const Uint32 tableId = lqhTransConf->tableId;
- if (ttransStatus == LqhTransConf::LastTransConf){
- jam();
- /*------------------------------------------------------------*/
- /* A NODE HAS REPORTED COMPLETION OF TAKE OVER REPORTING*/
- /*------------------------------------------------------------*/
- nodeTakeOverCompletedLab(signal);
- return;
- }//if
- if (ttransStatus == LqhTransConf::Marker){
- jam();
- treqinfo = 0;
- LqhTransConf::setMarkerFlag(treqinfo, 1);
- } else {
- TableRecordPtr tabPtr;
- tabPtr.i = tableId;
- ptrCheckGuard(tabPtr, ctabrecFilesize, tableRecord);
- switch((DictTabInfo::TableType)tabPtr.p->tableType){
- case DictTabInfo::SystemTable:
- case DictTabInfo::UserTable:
- break;
- default:
- tapplRef = 0;
- tapplOprec = 0;
- }
- }
- findApiConnectFail(signal);
- if(apiConnectptr.p->ndbapiBlockref == 0 && tapplRef != 0){
- apiConnectptr.p->ndbapiBlockref = ref;
- apiConnectptr.p->ndbapiConnect = tapplOprec;
- }
-
- if (ttransStatus != LqhTransConf::Marker){
- jam();
- findTcConnectFail(signal);
- }
- }//Dbtc::execLQH_TRANSCONF()
- /*------------------------------------------------------------*/
- /* A NODE HAS REPORTED COMPLETION OF TAKE OVER REPORTING*/
- /*------------------------------------------------------------*/
- void Dbtc::nodeTakeOverCompletedLab(Signal* signal)
- {
- Uint32 guard0;
- hostptr.i = tnodeid;
- ptrCheckGuard(hostptr, chostFilesize, hostRecord);
- hostptr.p->lqhTransStatus = LTS_IDLE;
- for (hostptr.i = 1; hostptr.i < MAX_NDB_NODES; hostptr.i++) {
- jam();
- ptrAss(hostptr, hostRecord);
- if (hostptr.p->hostStatus == HS_ALIVE) {
- if (hostptr.p->lqhTransStatus == LTS_ACTIVE) {
- jam();
- /*------------------------------------------------------------*/
- /* NOT ALL NODES ARE COMPLETED WITH REPORTING IN THE */
- /* TAKE OVER. */
- /*------------------------------------------------------------*/
- return;
- }//if
- }//if
- }//for
- /*------------------------------------------------------------*/
- /* ALL NODES HAVE REPORTED ON THE STATUS OF THE VARIOUS */
- /* OPERATIONS THAT WAS CONTROLLED BY THE FAILED TC. WE */
- /* ARE NOW IN A POSITION TO COMPLETE ALL OF THOSE */
- /* TRANSACTIONS EITHER IN A SUCCESSFUL WAY OR IN AN */
- /* UNSUCCESSFUL WAY. WE WILL ALSO REPORT THIS CONCLUSION*/
- /* TO THE APPLICATION IF THAT IS STILL ALIVE. */
- /*------------------------------------------------------------*/
- tcNodeFailptr.p->currentHashIndexTakeOver = 0;
- tcNodeFailptr.p->completedTakeOver = 0;
- tcNodeFailptr.p->failStatus = FS_COMPLETING;
- guard0 = cnoParallelTakeOver - 1;
- /*------------------------------------------------------------*/
- /* WE WILL COMPLETE THE TRANSACTIONS BY STARTING A */
- /* NUMBER OF PARALLEL ACTIVITIES. EACH ACTIVITY WILL */
- /* COMPLETE ONE TRANSACTION AT A TIME AND IN THAT */
- /* TRANSACTION IT WILL COMPLETE ONE OPERATION AT A TIME.*/
- /* WHEN ALL ACTIVITIES ARE COMPLETED THEN THE TAKE OVER */
- /* IS COMPLETED. */
- /*------------------------------------------------------------*/
- arrGuard(guard0, MAX_NDB_NODES);
- for (tindex = 0; tindex <= guard0; tindex++) {
- jam();
- tcNodeFailptr.p->takeOverProcState[tindex] = ZTAKE_OVER_ACTIVE;
- signal->theData[0] = TcContinueB::ZCOMPLETE_TRANS_AT_TAKE_OVER;
- signal->theData[1] = tcNodeFailptr.i;
- signal->theData[2] = tindex;
- sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
- }//for
- }//Dbtc::nodeTakeOverCompletedLab()
- /*------------------------------------------------------------*/
- /* COMPLETE A NEW TRANSACTION FROM THE HASH TABLE OF */
- /* TRANSACTIONS TO COMPLETE. */
- /*------------------------------------------------------------*/
- void Dbtc::completeTransAtTakeOverLab(Signal* signal, UintR TtakeOverInd)
- {
- jam();
- while (tcNodeFailptr.p->currentHashIndexTakeOver < 512){
- jam();
- apiConnectptr.i =
- ctransidFailHash[tcNodeFailptr.p->currentHashIndexTakeOver];
- if (apiConnectptr.i != RNIL) {
- jam();
- /*------------------------------------------------------------*/
- /* WE HAVE FOUND A TRANSACTION THAT NEEDS TO BE */
- /* COMPLETED. REMOVE IT FROM THE HASH TABLE SUCH THAT */
- /* NOT ANOTHER ACTIVITY ALSO TRIES TO COMPLETE THIS */
- /* TRANSACTION. */
- /*------------------------------------------------------------*/
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- ctransidFailHash[tcNodeFailptr.p->currentHashIndexTakeOver] =
- apiConnectptr.p->nextApiConnect;
- completeTransAtTakeOverDoOne(signal, TtakeOverInd);
- // One transaction taken care of, return from this function
- // and wait for the next CONTINUEB to continue processing
- break;
- } else {
- if (tcNodeFailptr.p->currentHashIndexTakeOver < 511){
- jam();
- tcNodeFailptr.p->currentHashIndexTakeOver++;
- } else {
- jam();
- completeTransAtTakeOverDoLast(signal, TtakeOverInd);
- tcNodeFailptr.p->currentHashIndexTakeOver++;
- }//if
- }//if
- }//while
- }//Dbtc::completeTransAtTakeOverLab()
- void Dbtc::completeTransAtTakeOverDoLast(Signal* signal, UintR TtakeOverInd)
- {
- Uint32 guard0;
- /*------------------------------------------------------------*/
- /* THERE ARE NO MORE TRANSACTIONS TO COMPLETE. THIS */
- /* ACTIVITY IS COMPLETED. */
- /*------------------------------------------------------------*/
- arrGuard(TtakeOverInd, MAX_NDB_NODES);
- if (tcNodeFailptr.p->takeOverProcState[TtakeOverInd] != ZTAKE_OVER_ACTIVE) {
- jam();
- systemErrorLab(signal);
- return;
- }//if
- tcNodeFailptr.p->takeOverProcState[TtakeOverInd] = ZTAKE_OVER_IDLE;
- tcNodeFailptr.p->completedTakeOver++;
- if (tcNodeFailptr.p->completedTakeOver == cnoParallelTakeOver) {
- jam();
- /*------------------------------------------------------------*/
- /* WE WERE THE LAST ACTIVITY THAT WAS COMPLETED. WE NEED*/
- /* TO REPORT THE COMPLETION OF THE TAKE OVER TO ALL */
- /* NODES THAT ARE ALIVE. */
- /*------------------------------------------------------------*/
- for (hostptr.i = 1; hostptr.i < MAX_NDB_NODES; hostptr.i++) {
- jam();
- ptrAss(hostptr, hostRecord);
- if (hostptr.p->hostStatus == HS_ALIVE) {
- jam();
- tblockref = calcTcBlockRef(hostptr.i);
- signal->theData[0] = tcNodeFailptr.p->takeOverNode;
- sendSignal(tblockref, GSN_TAKE_OVERTCCONF, signal, 1, JBB);
- }//if
- }//for
- if (tcNodeFailptr.p->queueIndex > 0) {
- jam();
- /*------------------------------------------------------------*/
- /* THERE ARE MORE NODES TO TAKE OVER. WE NEED TO START */
- /* THE TAKE OVER. */
- /*------------------------------------------------------------*/
- tfailedNodeId = tcNodeFailptr.p->queueList[0];
- guard0 = tcNodeFailptr.p->queueIndex - 1;
- arrGuard(guard0 + 1, MAX_NDB_NODES);
- for (tindex = 0; tindex <= guard0; tindex++) {
- jam();
- tcNodeFailptr.p->queueList[tindex] =
- tcNodeFailptr.p->queueList[tindex + 1];
- }//for
- tcNodeFailptr.p->queueIndex--;
- startTakeOverLab(signal);
- return;
- } else {
- jam();
- tcNodeFailptr.p->failStatus = FS_IDLE;
- }//if
- }//if
- return;
- }//Dbtc::completeTransAtTakeOverDoLast()
- void Dbtc::completeTransAtTakeOverDoOne(Signal* signal, UintR TtakeOverInd)
- {
- apiConnectptr.p->takeOverRec = (Uint8)tcNodeFailptr.i;
- apiConnectptr.p->takeOverInd = TtakeOverInd;
- switch (apiConnectptr.p->apiConnectstate) {
- case CS_FAIL_COMMITTED:
- jam();
- /*------------------------------------------------------------*/
- /* ALL PARTS OF THE TRANSACTIONS REPORTED COMMITTED. WE */
- /* HAVE THUS COMPLETED THE COMMIT PHASE. WE CAN REPORT */
- /* COMMITTED TO THE APPLICATION AND CONTINUE WITH THE */
- /* COMPLETE PHASE. */
- /*------------------------------------------------------------*/
- sendTCKEY_FAILCONF(signal, apiConnectptr.p);
- tcConnectptr.i = apiConnectptr.p->firstTcConnect;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- apiConnectptr.p->currentTcConnect = tcConnectptr.i;
- apiConnectptr.p->currentReplicaNo = tcConnectptr.p->lastReplicaNo;
- tcurrentReplicaNo = tcConnectptr.p->lastReplicaNo;
- toCompleteHandlingLab(signal);
- return;
- case CS_FAIL_COMMITTING:
- jam();
- /*------------------------------------------------------------*/
- /* AT LEAST ONE PART WAS ONLY PREPARED AND AT LEAST ONE */
- /* PART WAS COMMITTED. COMPLETE THE COMMIT PHASE FIRST. */
- /* THEN CONTINUE AS AFTER COMMITTED. */
- /*------------------------------------------------------------*/
- tcConnectptr.i = apiConnectptr.p->firstTcConnect;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- apiConnectptr.p->currentTcConnect = tcConnectptr.i;
- apiConnectptr.p->currentReplicaNo = tcConnectptr.p->lastReplicaNo;
- tcurrentReplicaNo = tcConnectptr.p->lastReplicaNo;
- toCommitHandlingLab(signal);
- return;
- case CS_FAIL_ABORTING:
- case CS_FAIL_PREPARED:
- jam();
- /*------------------------------------------------------------*/
- /* WE WILL ABORT THE TRANSACTION IF IT IS IN A PREPARED */
- /* STATE IN THIS VERSION. IN LATER VERSIONS WE WILL */
- /* HAVE TO ADD CODE FOR HANDLING OF PREPARED-TO-COMMIT */
- /* TRANSACTIONS. THESE ARE NOT ALLOWED TO ABORT UNTIL WE*/
- /* HAVE HEARD FROM THE TRANSACTION COORDINATOR. */
- /* */
- /* IT IS POSSIBLE TO COMMIT TRANSACTIONS THAT ARE */
- /* PREPARED ACTUALLY. WE WILL LEAVE THIS PROBLEM UNTIL */
- /* LATER VERSIONS. */
- /*------------------------------------------------------------*/
- tcConnectptr.i = apiConnectptr.p->firstTcConnect;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- apiConnectptr.p->currentTcConnect = tcConnectptr.i;
- apiConnectptr.p->currentReplicaNo = tcConnectptr.p->lastReplicaNo;
- tcurrentReplicaNo = tcConnectptr.p->lastReplicaNo;
- toAbortHandlingLab(signal);
- return;
- case CS_FAIL_ABORTED:
- jam();
- sendTCKEY_FAILREF(signal, apiConnectptr.p);
-
- signal->theData[0] = TcContinueB::ZCOMPLETE_TRANS_AT_TAKE_OVER;
- signal->theData[1] = (UintR)apiConnectptr.p->takeOverRec;
- signal->theData[2] = apiConnectptr.p->takeOverInd;
- sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
- releaseTakeOver(signal);
- break;
- case CS_FAIL_COMPLETED:
- jam();
- sendTCKEY_FAILCONF(signal, apiConnectptr.p);
-
- signal->theData[0] = TcContinueB::ZCOMPLETE_TRANS_AT_TAKE_OVER;
- signal->theData[1] = (UintR)apiConnectptr.p->takeOverRec;
- signal->theData[2] = apiConnectptr.p->takeOverInd;
- sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
- releaseApiConnectFail(signal);
- break;
- default:
- jam();
- systemErrorLab(signal);
- return;
- }//switch
- }//Dbtc::completeTransAtTakeOverDoOne()
- void
- Dbtc::sendTCKEY_FAILREF(Signal* signal, const ApiConnectRecord * regApiPtr){
- jam();
- const Uint32 ref = regApiPtr->ndbapiBlockref;
- if(ref != 0){
- signal->theData[0] = regApiPtr->ndbapiConnect;
- signal->theData[1] = regApiPtr->transid[0];
- signal->theData[2] = regApiPtr->transid[1];
-
- sendSignal(ref, GSN_TCKEY_FAILREF, signal, 3, JBB);
- }
- }
- void
- Dbtc::sendTCKEY_FAILCONF(Signal* signal, ApiConnectRecord * regApiPtr){
- jam();
- TcKeyFailConf * const failConf = (TcKeyFailConf *)&signal->theData[0];
-
- const Uint32 ref = regApiPtr->ndbapiBlockref;
- const Uint32 marker = regApiPtr->commitAckMarker;
- if(ref != 0){
- failConf->apiConnectPtr = regApiPtr->ndbapiConnect | (marker != RNIL);
- failConf->transId1 = regApiPtr->transid[0];
- failConf->transId2 = regApiPtr->transid[1];
-
- sendSignal(regApiPtr->ndbapiBlockref,
- GSN_TCKEY_FAILCONF, signal, TcKeyFailConf::SignalLength, JBB);
- }
- regApiPtr->commitAckMarker = RNIL;
- }
- /*------------------------------------------------------------*/
- /* THIS PART HANDLES THE ABORT PHASE IN THE CASE OF A */
- /* NODE FAILURE BEFORE THE COMMIT DECISION. */
- /*------------------------------------------------------------*/
- /* ABORT REQUEST SUCCESSFULLY COMPLETED ON TNODEID */
- /*------------------------------------------------------------*/
- void Dbtc::execABORTCONF(Signal* signal)
- {
- UintR compare_transid1, compare_transid2;
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- tnodeid = signal->theData[2];
- if (ERROR_INSERTED(8045)) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_ABORTCONF, signal, 2000, 5);
- return;
- }//if
- if (tcConnectptr.i >= ctcConnectFilesize) {
- errorReport(signal, 5);
- return;
- }//if
- ptrAss(tcConnectptr, tcConnectRecord);
- if (tcConnectptr.p->tcConnectstate != OS_WAIT_ABORT_CONF) {
- warningReport(signal, 16);
- return;
- }//if
- apiConnectptr.i = tcConnectptr.p->apiConnect;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- if (apiConnectptr.p->apiConnectstate != CS_WAIT_ABORT_CONF) {
- warningReport(signal, 17);
- return;
- }//if
- compare_transid1 = apiConnectptr.p->transid[0] ^ signal->theData[3];
- compare_transid2 = apiConnectptr.p->transid[1] ^ signal->theData[4];
- compare_transid1 = compare_transid1 | compare_transid2;
- if (compare_transid1 != 0) {
- warningReport(signal, 18);
- return;
- }//if
- arrGuard(apiConnectptr.p->currentReplicaNo, 4);
- if (tcConnectptr.p->tcNodedata[apiConnectptr.p->currentReplicaNo] !=
- tnodeid) {
- warningReport(signal, 19);
- return;
- }//if
- tcurrentReplicaNo = (Uint8)Z8NIL;
- tcConnectptr.p->tcConnectstate = OS_ABORTING;
- toAbortHandlingLab(signal);
- }//Dbtc::execABORTCONF()
- void Dbtc::toAbortHandlingLab(Signal* signal)
- {
- do {
- if (tcurrentReplicaNo != (Uint8)Z8NIL) {
- jam();
- arrGuard(tcurrentReplicaNo, 4);
- const LqhTransConf::OperationStatus stat =
- (LqhTransConf::OperationStatus)
- tcConnectptr.p->failData[tcurrentReplicaNo];
- switch(stat){
- case LqhTransConf::InvalidStatus:
- case LqhTransConf::Aborted:
- jam();
- /*empty*/;
- break;
- case LqhTransConf::Prepared:
- jam();
- hostptr.i = tcConnectptr.p->tcNodedata[tcurrentReplicaNo];
- ptrCheckGuard(hostptr, chostFilesize, hostRecord);
- if (hostptr.p->hostStatus == HS_ALIVE) {
- jam();
- tblockref = calcLqhBlockRef(hostptr.i);
- setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
- tcConnectptr.p->tcConnectstate = OS_WAIT_ABORT_CONF;
- apiConnectptr.p->apiConnectstate = CS_WAIT_ABORT_CONF;
- apiConnectptr.p->timeOutCounter = 0;
- signal->theData[0] = tcConnectptr.i;
- signal->theData[1] = cownref;
- signal->theData[2] = apiConnectptr.p->transid[0];
- signal->theData[3] = apiConnectptr.p->transid[1];
- signal->theData[4] = apiConnectptr.p->tcBlockref;
- signal->theData[5] = tcConnectptr.p->tcOprec;
- sendSignal(tblockref, GSN_ABORTREQ, signal, 6, JBB);
- return;
- }//if
- break;
- default:
- jam();
- systemErrorLab(signal);
- return;
- }//switch
- }//if
- if (apiConnectptr.p->currentReplicaNo > 0) {
- jam();
- /*------------------------------------------------------------*/
- /* THERE IS STILL ANOTHER REPLICA THAT NEEDS TO BE */
- /* ABORTED. */
- /*------------------------------------------------------------*/
- apiConnectptr.p->currentReplicaNo--;
- tcurrentReplicaNo = apiConnectptr.p->currentReplicaNo;
- } else {
- /*------------------------------------------------------------*/
- /* THE LAST REPLICA IN THIS OPERATION HAVE COMMITTED. */
- /*------------------------------------------------------------*/
- tcConnectptr.i = tcConnectptr.p->nextTcConnect;
- if (tcConnectptr.i == RNIL) {
- /*------------------------------------------------------------*/
- /* WE HAVE COMPLETED THE ABORT PHASE. WE CAN NOW REPORT */
- /* THE ABORT STATUS TO THE APPLICATION AND CONTINUE */
- /* WITH THE NEXT TRANSACTION. */
- /*------------------------------------------------------------*/
- if (apiConnectptr.p->takeOverRec != (Uint8)Z8NIL) {
- jam();
- sendTCKEY_FAILREF(signal, apiConnectptr.p);
- const Uint32 marker = apiConnectptr.p->commitAckMarker;
- if(marker != RNIL){
- jam();
- CommitAckMarkerPtr tmp;
- tmp.i = marker;
- tmp.p = m_commitAckMarkerHash.getPtr(tmp.i);
-
- m_commitAckMarkerHash.release(tmp);
- apiConnectptr.p->commitAckMarker = RNIL;
- }
-
- /*------------------------------------------------------------*/
- /* WE HAVE COMPLETED THIS TRANSACTION NOW AND CAN */
- /* CONTINUE THE PROCESS WITH THE NEXT TRANSACTION. */
- /*------------------------------------------------------------*/
- signal->theData[0] = TcContinueB::ZCOMPLETE_TRANS_AT_TAKE_OVER;
- signal->theData[1] = (UintR)apiConnectptr.p->takeOverRec;
- signal->theData[2] = apiConnectptr.p->takeOverInd;
- sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
- releaseTakeOver(signal);
- } else {
- jam();
- releaseAbortResources(signal);
- }//if
- return;
- }//if
- apiConnectptr.p->currentTcConnect = tcConnectptr.i;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- apiConnectptr.p->currentReplicaNo = tcConnectptr.p->lastReplicaNo;
- tcurrentReplicaNo = tcConnectptr.p->lastReplicaNo;
- }//if
- } while (1);
- }//Dbtc::toAbortHandlingLab()
- /*------------------------------------------------------------*/
- /* THIS PART HANDLES THE COMMIT PHASE IN THE CASE OF A */
- /* NODE FAILURE IN THE MIDDLE OF THE COMMIT PHASE. */
- /*------------------------------------------------------------*/
- /* COMMIT REQUEST SUCCESSFULLY COMPLETED ON TNODEID */
- /*------------------------------------------------------------*/
- void Dbtc::execCOMMITCONF(Signal* signal)
- {
- UintR compare_transid1, compare_transid2;
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- tnodeid = signal->theData[1];
- if (ERROR_INSERTED(8046)) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_COMMITCONF, signal, 2000, 4);
- return;
- }//if
- if (tcConnectptr.i >= ctcConnectFilesize) {
- errorReport(signal, 4);
- return;
- }//if
- ptrAss(tcConnectptr, tcConnectRecord);
- if (tcConnectptr.p->tcConnectstate != OS_WAIT_COMMIT_CONF) {
- warningReport(signal, 8);
- return;
- }//if
- apiConnectptr.i = tcConnectptr.p->apiConnect;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- if (apiConnectptr.p->apiConnectstate != CS_WAIT_COMMIT_CONF) {
- warningReport(signal, 9);
- return;
- }//if
- compare_transid1 = apiConnectptr.p->transid[0] ^ signal->theData[2];
- compare_transid2 = apiConnectptr.p->transid[1] ^ signal->theData[3];
- compare_transid1 = compare_transid1 | compare_transid2;
- if (compare_transid1 != 0) {
- warningReport(signal, 10);
- return;
- }//if
- arrGuard(apiConnectptr.p->currentReplicaNo, 4);
- if (tcConnectptr.p->tcNodedata[apiConnectptr.p->currentReplicaNo] !=
- tnodeid) {
- warningReport(signal, 11);
- return;
- }//if
- if (ERROR_INSERTED(8026)) {
- jam();
- systemErrorLab(signal);
- }//if
- tcurrentReplicaNo = (Uint8)Z8NIL;
- tcConnectptr.p->tcConnectstate = OS_COMMITTED;
- toCommitHandlingLab(signal);
- }//Dbtc::execCOMMITCONF()
- void Dbtc::toCommitHandlingLab(Signal* signal)
- {
- do {
- if (tcurrentReplicaNo != (Uint8)Z8NIL) {
- jam();
- arrGuard(tcurrentReplicaNo, 4);
- switch (tcConnectptr.p->failData[tcurrentReplicaNo]) {
- case LqhTransConf::InvalidStatus:
- jam();
- /*empty*/;
- break;
- case LqhTransConf::Committed:
- jam();
- /*empty*/;
- break;
- case LqhTransConf::Prepared:
- jam();
- /*------------------------------------------------------------*/
- /* THE NODE WAS PREPARED AND IS WAITING FOR ABORT OR */
- /* COMMIT REQUEST FROM TC. */
- /*------------------------------------------------------------*/
- hostptr.i = tcConnectptr.p->tcNodedata[tcurrentReplicaNo];
- ptrCheckGuard(hostptr, chostFilesize, hostRecord);
- if (hostptr.p->hostStatus == HS_ALIVE) {
- jam();
- tblockref = calcLqhBlockRef(hostptr.i);
- setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
- apiConnectptr.p->apiConnectstate = CS_WAIT_COMMIT_CONF;
- apiConnectptr.p->timeOutCounter = 0;
- tcConnectptr.p->tcConnectstate = OS_WAIT_COMMIT_CONF;
- signal->theData[0] = tcConnectptr.i;
- signal->theData[1] = cownref;
- signal->theData[2] = apiConnectptr.p->globalcheckpointid;
- signal->theData[3] = apiConnectptr.p->transid[0];
- signal->theData[4] = apiConnectptr.p->transid[1];
- signal->theData[5] = apiConnectptr.p->tcBlockref;
- signal->theData[6] = tcConnectptr.p->tcOprec;
- sendSignal(tblockref, GSN_COMMITREQ, signal, 7, JBB);
- return;
- }//if
- break;
- default:
- jam();
- systemErrorLab(signal);
- return;
- break;
- }//switch
- }//if
- if (apiConnectptr.p->currentReplicaNo > 0) {
- jam();
- /*------------------------------------------------------------*/
- /* THERE IS STILL ANOTHER REPLICA THAT NEEDS TO BE */
- /* COMMITTED. */
- /*------------------------------------------------------------*/
- apiConnectptr.p->currentReplicaNo--;
- tcurrentReplicaNo = apiConnectptr.p->currentReplicaNo;
- } else {
- /*------------------------------------------------------------*/
- /* THE LAST REPLICA IN THIS OPERATION HAVE COMMITTED. */
- /*------------------------------------------------------------*/
- tcConnectptr.i = tcConnectptr.p->nextTcConnect;
- if (tcConnectptr.i == RNIL) {
- /*------------------------------------------------------------*/
- /* WE HAVE COMPLETED THE COMMIT PHASE. WE CAN NOW REPORT*/
- /* THE COMMIT STATUS TO THE APPLICATION AND CONTINUE */
- /* WITH THE COMPLETE PHASE. */
- /*------------------------------------------------------------*/
- if (apiConnectptr.p->takeOverRec != (Uint8)Z8NIL) {
- jam();
- sendTCKEY_FAILCONF(signal, apiConnectptr.p);
- } else {
- jam();
- sendApiCommit(signal);
- }//if
- apiConnectptr.p->currentTcConnect = apiConnectptr.p->firstTcConnect;
- tcConnectptr.i = apiConnectptr.p->firstTcConnect;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- tcurrentReplicaNo = tcConnectptr.p->lastReplicaNo;
- apiConnectptr.p->currentReplicaNo = tcurrentReplicaNo;
- toCompleteHandlingLab(signal);
- return;
- }//if
- apiConnectptr.p->currentTcConnect = tcConnectptr.i;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- apiConnectptr.p->currentReplicaNo = tcConnectptr.p->lastReplicaNo;
- tcurrentReplicaNo = tcConnectptr.p->lastReplicaNo;
- }//if
- } while (1);
- }//Dbtc::toCommitHandlingLab()
- /*------------------------------------------------------------*/
- /* COMMON PART TO HANDLE COMPLETE PHASE WHEN ANY NODE */
- /* HAVE FAILED. */
- /*------------------------------------------------------------*/
- /* THE NODE WITH TNODEID HAVE COMPLETED THE OPERATION */
- /*------------------------------------------------------------*/
- void Dbtc::execCOMPLETECONF(Signal* signal)
- {
- UintR compare_transid1, compare_transid2;
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- tnodeid = signal->theData[1];
- if (ERROR_INSERTED(8047)) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_COMPLETECONF, signal, 2000, 4);
- return;
- }//if
- if (tcConnectptr.i >= ctcConnectFilesize) {
- errorReport(signal, 3);
- return;
- }//if
- ptrAss(tcConnectptr, tcConnectRecord);
- if (tcConnectptr.p->tcConnectstate != OS_WAIT_COMPLETE_CONF) {
- warningReport(signal, 12);
- return;
- }//if
- apiConnectptr.i = tcConnectptr.p->apiConnect;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- if (apiConnectptr.p->apiConnectstate != CS_WAIT_COMPLETE_CONF) {
- warningReport(signal, 13);
- return;
- }//if
- compare_transid1 = apiConnectptr.p->transid[0] ^ signal->theData[2];
- compare_transid2 = apiConnectptr.p->transid[1] ^ signal->theData[3];
- compare_transid1 = compare_transid1 | compare_transid2;
- if (compare_transid1 != 0) {
- warningReport(signal, 14);
- return;
- }//if
- arrGuard(apiConnectptr.p->currentReplicaNo, 4);
- if (tcConnectptr.p->tcNodedata[apiConnectptr.p->currentReplicaNo] !=
- tnodeid) {
- warningReport(signal, 15);
- return;
- }//if
- if (ERROR_INSERTED(8028)) {
- jam();
- systemErrorLab(signal);
- }//if
- tcConnectptr.p->tcConnectstate = OS_COMPLETED;
- tcurrentReplicaNo = (Uint8)Z8NIL;
- toCompleteHandlingLab(signal);
- }//Dbtc::execCOMPLETECONF()
- void Dbtc::toCompleteHandlingLab(Signal* signal)
- {
- do {
- if (tcurrentReplicaNo != (Uint8)Z8NIL) {
- jam();
- arrGuard(tcurrentReplicaNo, 4);
- switch (tcConnectptr.p->failData[tcurrentReplicaNo]) {
- case LqhTransConf::InvalidStatus:
- jam();
- /*empty*/;
- break;
- default:
- jam();
- /*------------------------------------------------------------*/
- /* THIS NODE DID NOT REPORT ANYTHING FOR THIS OPERATION */
- /* IT MUST HAVE FAILED. */
- /*------------------------------------------------------------*/
- /*------------------------------------------------------------*/
- /* SEND COMPLETEREQ TO THE NEXT REPLICA. */
- /*------------------------------------------------------------*/
- hostptr.i = tcConnectptr.p->tcNodedata[tcurrentReplicaNo];
- ptrCheckGuard(hostptr, chostFilesize, hostRecord);
- if (hostptr.p->hostStatus == HS_ALIVE) {
- jam();
- tblockref = calcLqhBlockRef(hostptr.i);
- setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
- tcConnectptr.p->tcConnectstate = OS_WAIT_COMPLETE_CONF;
- apiConnectptr.p->apiConnectstate = CS_WAIT_COMPLETE_CONF;
- apiConnectptr.p->timeOutCounter = 0;
- tcConnectptr.p->apiConnect = apiConnectptr.i;
- signal->theData[0] = tcConnectptr.i;
- signal->theData[1] = cownref;
- signal->theData[2] = apiConnectptr.p->transid[0];
- signal->theData[3] = apiConnectptr.p->transid[1];
- signal->theData[4] = apiConnectptr.p->tcBlockref;
- signal->theData[5] = tcConnectptr.p->tcOprec;
- sendSignal(tblockref, GSN_COMPLETEREQ, signal, 6, JBB);
- return;
- }//if
- break;
- }//switch
- }//if
- if (apiConnectptr.p->currentReplicaNo != 0) {
- jam();
- /*------------------------------------------------------------*/
- /* THERE ARE STILL MORE REPLICAS IN THIS OPERATION. WE */
- /* NEED TO CONTINUE WITH THOSE REPLICAS. */
- /*------------------------------------------------------------*/
- apiConnectptr.p->currentReplicaNo--;
- tcurrentReplicaNo = apiConnectptr.p->currentReplicaNo;
- } else {
- tcConnectptr.i = tcConnectptr.p->nextTcConnect;
- if (tcConnectptr.i == RNIL) {
- /*------------------------------------------------------------*/
- /* WE HAVE COMPLETED THIS TRANSACTION NOW AND CAN */
- /* CONTINUE THE PROCESS WITH THE NEXT TRANSACTION. */
- /*------------------------------------------------------------*/
- if (apiConnectptr.p->takeOverRec != (Uint8)Z8NIL) {
- jam();
- signal->theData[0] = TcContinueB::ZCOMPLETE_TRANS_AT_TAKE_OVER;
- signal->theData[1] = (UintR)apiConnectptr.p->takeOverRec;
- signal->theData[2] = apiConnectptr.p->takeOverInd;
- sendSignal(cownref, GSN_CONTINUEB, signal, 3, JBB);
- releaseTakeOver(signal);
- } else {
- jam();
- releaseTransResources(signal);
- }//if
- return;
- }//if
- /*------------------------------------------------------------*/
- /* WE HAVE COMPLETED AN OPERATION AND THERE ARE MORE TO */
- /* COMPLETE. TAKE THE NEXT OPERATION AND START WITH THE */
- /* FIRST REPLICA SINCE IT IS THE COMPLETE PHASE. */
- /*------------------------------------------------------------*/
- apiConnectptr.p->currentTcConnect = tcConnectptr.i;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- tcurrentReplicaNo = tcConnectptr.p->lastReplicaNo;
- apiConnectptr.p->currentReplicaNo = tcurrentReplicaNo;
- }//if
- } while (1);
- }//Dbtc::toCompleteHandlingLab()
- /*------------------------------------------------------------*/
- /* */
- /* FIND THE API CONNECT RECORD FOR THIS TRANSACTION */
- /* DURING TAKE OVER FROM A FAILED TC. IF NONE EXISTS */
- /* YET THEN SEIZE A NEW API CONNECT RECORD AND LINK IT */
- /* INTO THE HASH TABLE. */
- /*------------------------------------------------------------*/
- void Dbtc::findApiConnectFail(Signal* signal)
- {
- ApiConnectRecordPtr fafPrevApiConnectptr;
- ApiConnectRecordPtr fafNextApiConnectptr;
- UintR tfafHashNumber;
- tfafHashNumber = ttransid1 & 511;
- fafPrevApiConnectptr.i = RNIL;
- ptrNull(fafPrevApiConnectptr);
- arrGuard(tfafHashNumber, 512);
- fafNextApiConnectptr.i = ctransidFailHash[tfafHashNumber];
- ptrCheck(fafNextApiConnectptr, capiConnectFilesize, apiConnectRecord);
- FAF_LOOP:
- jam();
- if (fafNextApiConnectptr.i == RNIL) {
- jam();
- if (cfirstfreeApiConnectFail == RNIL) {
- jam();
- systemErrorLab(signal);
- return;
- }//if
- seizeApiConnectFail(signal);
- if (fafPrevApiConnectptr.i == RNIL) {
- jam();
- ctransidFailHash[tfafHashNumber] = apiConnectptr.i;
- } else {
- jam();
- ptrGuard(fafPrevApiConnectptr);
- fafPrevApiConnectptr.p->nextApiConnect = apiConnectptr.i;
- }//if
- apiConnectptr.p->nextApiConnect = RNIL;
- initApiConnectFail(signal);
- } else {
- jam();
- fafPrevApiConnectptr.i = fafNextApiConnectptr.i;
- fafPrevApiConnectptr.p = fafNextApiConnectptr.p;
- apiConnectptr.i = fafNextApiConnectptr.i;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- fafNextApiConnectptr.i = apiConnectptr.p->nextApiConnect;
- ptrCheck(fafNextApiConnectptr, capiConnectFilesize, apiConnectRecord);
- if ((apiConnectptr.p->transid[1] != ttransid2) ||
- (apiConnectptr.p->transid[0] != ttransid1)) {
- goto FAF_LOOP;
- }//if
- updateApiStateFail(signal);
- }//if
- }//Dbtc::findApiConnectFail()
- /*----------------------------------------------------------*/
- /* FIND THE TC CONNECT AND IF NOT FOUND ALLOCATE A NEW */
- /*----------------------------------------------------------*/
- void Dbtc::findTcConnectFail(Signal* signal)
- {
- UintR tftfHashNumber;
- tftfHashNumber = (ttransid1 ^ ttcOprec) & 1023;
- tcConnectptr.i = ctcConnectFailHash[tftfHashNumber];
- do {
- if (tcConnectptr.i == RNIL) {
- jam();
- if (cfirstfreeTcConnectFail == RNIL) {
- jam();
- systemErrorLab(signal);
- return;
- }//if
- seizeTcConnectFail(signal);
- linkTcInConnectionlist(signal);
- tcConnectptr.p->nextTcFailHash = ctcConnectFailHash[tftfHashNumber];
- ctcConnectFailHash[tftfHashNumber] = tcConnectptr.i;
- initTcConnectFail(signal);
- return;
- } else {
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- if (tcConnectptr.p->tcOprec != ttcOprec) {
- jam(); /* FRAGMENTID = TC_OPREC HERE, LOOP ANOTHER TURN */
- tcConnectptr.i = tcConnectptr.p->nextTcFailHash;
- } else {
- updateTcStateFail(signal);
- return;
- }//if
- }//if
- } while (1);
- }//Dbtc::findTcConnectFail()
- /*----------------------------------------------------------*/
- /* INITIALISE AN API CONNECT FAIL RECORD */
- /*----------------------------------------------------------*/
- void Dbtc::initApiConnectFail(Signal* signal)
- {
- apiConnectptr.p->transid[0] = ttransid1;
- apiConnectptr.p->transid[1] = ttransid2;
- apiConnectptr.p->firstTcConnect = RNIL;
- apiConnectptr.p->currSavePointId = 0;
- apiConnectptr.p->lastTcConnect = RNIL;
- tblockref = calcTcBlockRef(tcNodeFailptr.p->takeOverNode);
- apiConnectptr.p->tcBlockref = tblockref;
- apiConnectptr.p->ndbapiBlockref = 0;
- apiConnectptr.p->ndbapiConnect = 0;
- apiConnectptr.p->buddyPtr = RNIL;
- setApiConTimer(apiConnectptr.i, 0, __LINE__);
- switch(ttransStatus){
- case LqhTransConf::Committed:
- jam();
- apiConnectptr.p->globalcheckpointid = tgci;
- apiConnectptr.p->apiConnectstate = CS_FAIL_COMMITTED;
- break;
- case LqhTransConf::Prepared:
- jam();
- apiConnectptr.p->apiConnectstate = CS_FAIL_PREPARED;
- break;
- case LqhTransConf::Aborted:
- jam();
- apiConnectptr.p->apiConnectstate = CS_FAIL_ABORTED;
- break;
- case LqhTransConf::Marker:
- jam();
- apiConnectptr.p->apiConnectstate = CS_FAIL_COMPLETED;
- break;
- default:
- jam();
- systemErrorLab(signal);
- }//if
- apiConnectptr.p->commitAckMarker = RNIL;
- if(LqhTransConf::getMarkerFlag(treqinfo)){
- jam();
- CommitAckMarkerPtr tmp;
- m_commitAckMarkerHash.seize(tmp);
-
- ndbrequire(tmp.i != RNIL);
-
- apiConnectptr.p->commitAckMarker = tmp.i;
- tmp.p->transid1 = ttransid1;
- tmp.p->transid2 = ttransid2;
- tmp.p->apiNodeId = refToNode(tapplRef);
- tmp.p->noOfLqhs = 1;
- tmp.p->lqhNodeId[0] = tnodeid;
- tmp.p->apiConnectPtr = apiConnectptr.i;
- m_commitAckMarkerHash.add(tmp);
- }
- }//Dbtc::initApiConnectFail()
- /*------------------------------------------------------------*/
- /* INITIALISE AT TC CONNECT AT TAKE OVER WHEN ALLOCATING*/
- /* THE TC CONNECT RECORD. */
- /*------------------------------------------------------------*/
- void Dbtc::initTcConnectFail(Signal* signal)
- {
- tcConnectptr.p->apiConnect = apiConnectptr.i;
- tcConnectptr.p->tcOprec = ttcOprec;
- Uint32 treplicaNo = LqhTransConf::getReplicaNo(treqinfo);
- for (Uint32 i = 0; i < MAX_REPLICAS; i++) {
- tcConnectptr.p->failData[i] = LqhTransConf::InvalidStatus;
- }//for
- tcConnectptr.p->tcNodedata[treplicaNo] = tnodeid;
- tcConnectptr.p->failData[treplicaNo] = ttransStatus;
- tcConnectptr.p->lastReplicaNo = LqhTransConf::getLastReplicaNo(treqinfo);
- tcConnectptr.p->dirtyOp = LqhTransConf::getDirtyFlag(treqinfo);
-
- }//Dbtc::initTcConnectFail()
- /*----------------------------------------------------------*/
- /* INITIALISE TC NODE FAIL RECORD. */
- /*----------------------------------------------------------*/
- void Dbtc::initTcFail(Signal* signal)
- {
- tcNodeFailptr.i = 0;
- ptrAss(tcNodeFailptr, tcFailRecord);
- tcNodeFailptr.p->queueIndex = 0;
- tcNodeFailptr.p->failStatus = FS_IDLE;
- }//Dbtc::initTcFail()
- /*----------------------------------------------------------*/
- /* RELEASE_TAKE_OVER */
- /*----------------------------------------------------------*/
- void Dbtc::releaseTakeOver(Signal* signal)
- {
- TcConnectRecordPtr rtoNextTcConnectptr;
- rtoNextTcConnectptr.i = apiConnectptr.p->firstTcConnect;
- do {
- jam();
- tcConnectptr.i = rtoNextTcConnectptr.i;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- rtoNextTcConnectptr.i = tcConnectptr.p->nextTcConnect;
- releaseTcConnectFail(signal);
- } while (rtoNextTcConnectptr.i != RNIL);
- releaseApiConnectFail(signal);
- }//Dbtc::releaseTakeOver()
- /*---------------------------------------------------------------------------*/
- /* SETUP_FAIL_DATA */
- /* SETUP DATA TO REUSE TAKE OVER CODE FOR HANDLING ABORT/COMMIT IN NODE */
- /* FAILURE SITUATIONS. */
- /*---------------------------------------------------------------------------*/
- void Dbtc::setupFailData(Signal* signal)
- {
- tcConnectptr.i = apiConnectptr.p->firstTcConnect;
- do {
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- switch (tcConnectptr.p->tcConnectstate) {
- case OS_PREPARED:
- case OS_COMMITTING:
- jam();
- for (tindex = 0; tindex <= tcConnectptr.p->lastReplicaNo; tindex++) {
- jam();
- /*-------------------------------------------------------------------
- * KEYDATA IS USED TO KEEP AN INDICATION OF STATE IN LQH.
- * IN THIS CASE ALL LQH'S ARE PREPARED AND WAITING FOR
- * COMMIT/ABORT DECISION.
- *------------------------------------------------------------------*/
- arrGuard(tindex, 4);
- tcConnectptr.p->failData[tindex] = LqhTransConf::Prepared;
- }//for
- break;
- case OS_COMMITTED:
- case OS_COMPLETING:
- jam();
- for (tindex = 0; tindex <= tcConnectptr.p->lastReplicaNo; tindex++) {
- jam();
- /*-------------------------------------------------------------------
- * KEYDATA IS USED TO KEEP AN INDICATION OF STATE IN LQH.
- * IN THIS CASE ALL LQH'S ARE COMMITTED AND WAITING FOR
- * COMPLETE MESSAGE.
- *------------------------------------------------------------------*/
- arrGuard(tindex, 4);
- tcConnectptr.p->failData[tindex] = LqhTransConf::Committed;
- }//for
- break;
- case OS_COMPLETED:
- jam();
- for (tindex = 0; tindex <= tcConnectptr.p->lastReplicaNo; tindex++) {
- jam();
- /*-------------------------------------------------------------------
- * KEYDATA IS USED TO KEEP AN INDICATION OF STATE IN LQH.
- * IN THIS CASE ALL LQH'S ARE COMPLETED.
- *-------------------------------------------------------------------*/
- arrGuard(tindex, 4);
- tcConnectptr.p->failData[tindex] = LqhTransConf::InvalidStatus;
- }//for
- break;
- default:
- jam();
- sendSystemError(signal);
- break;
- }//switch
- if (tabortInd != ZCOMMIT_SETUP) {
- jam();
- for (UintR Ti = 0; Ti <= tcConnectptr.p->lastReplicaNo; Ti++) {
- hostptr.i = tcConnectptr.p->tcNodedata[Ti];
- ptrCheckGuard(hostptr, chostFilesize, hostRecord);
- if (hostptr.p->hostStatus != HS_ALIVE) {
- jam();
- /*-----------------------------------------------------------------
- * FAILURE OF ANY INVOLVED NODE ALWAYS INVOKES AN ABORT DECISION.
- *-----------------------------------------------------------------*/
- tabortInd = ZTRUE;
- }//if
- }//for
- }//if
- tcConnectptr.p->tcConnectstate = OS_TAKE_OVER;
- tcConnectptr.p->tcOprec = tcConnectptr.i;
- tcConnectptr.i = tcConnectptr.p->nextTcConnect;
- } while (tcConnectptr.i != RNIL);
- apiConnectptr.p->tcBlockref = cownref;
- apiConnectptr.p->currentTcConnect = apiConnectptr.p->firstTcConnect;
- tcConnectptr.i = apiConnectptr.p->firstTcConnect;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- apiConnectptr.p->currentReplicaNo = tcConnectptr.p->lastReplicaNo;
- tcurrentReplicaNo = tcConnectptr.p->lastReplicaNo;
- }//Dbtc::setupFailData()
- /*----------------------------------------------------------*/
- /* UPDATE THE STATE OF THE API CONNECT FOR THIS PART. */
- /*----------------------------------------------------------*/
- void Dbtc::updateApiStateFail(Signal* signal)
- {
- if(LqhTransConf::getMarkerFlag(treqinfo)){
- jam();
- const Uint32 marker = apiConnectptr.p->commitAckMarker;
- if(marker == RNIL){
- jam();
- CommitAckMarkerPtr tmp;
- m_commitAckMarkerHash.seize(tmp);
- ndbrequire(tmp.i != RNIL);
-
- apiConnectptr.p->commitAckMarker = tmp.i;
- tmp.p->transid1 = ttransid1;
- tmp.p->transid2 = ttransid2;
- tmp.p->apiNodeId = refToNode(tapplRef);
- tmp.p->noOfLqhs = 1;
- tmp.p->lqhNodeId[0] = tnodeid;
- tmp.p->apiConnectPtr = apiConnectptr.i;
- m_commitAckMarkerHash.add(tmp);
- } else {
- jam();
-
- CommitAckMarkerPtr tmp;
- tmp.i = marker;
- tmp.p = m_commitAckMarkerHash.getPtr(marker);
- const Uint32 noOfLqhs = tmp.p->noOfLqhs;
- ndbrequire(noOfLqhs < MAX_REPLICAS);
- tmp.p->lqhNodeId[noOfLqhs] = tnodeid;
- tmp.p->noOfLqhs = (noOfLqhs + 1);
- }
- }
- switch (ttransStatus) {
- case LqhTransConf::Committed:
- jam();
- switch (apiConnectptr.p->apiConnectstate) {
- case CS_FAIL_COMMITTING:
- case CS_FAIL_COMMITTED:
- jam();
- ndbrequire(tgci == apiConnectptr.p->globalcheckpointid);
- break;
- case CS_FAIL_PREPARED:
- jam();
- apiConnectptr.p->apiConnectstate = CS_FAIL_COMMITTING;
- apiConnectptr.p->globalcheckpointid = tgci;
- break;
- case CS_FAIL_COMPLETED:
- jam();
- apiConnectptr.p->globalcheckpointid = tgci;
- apiConnectptr.p->apiConnectstate = CS_FAIL_COMMITTED;
- break;
- default:
- jam();
- systemErrorLab(signal);
- break;
- }//switch
- break;
- case LqhTransConf::Prepared:
- jam();
- switch (apiConnectptr.p->apiConnectstate) {
- case CS_FAIL_COMMITTED:
- jam();
- apiConnectptr.p->apiConnectstate = CS_FAIL_COMMITTING;
- break;
- case CS_FAIL_ABORTED:
- jam();
- apiConnectptr.p->apiConnectstate = CS_FAIL_ABORTING;
- break;
- case CS_FAIL_COMMITTING:
- case CS_FAIL_PREPARED:
- case CS_FAIL_ABORTING:
- jam();
- /*empty*/;
- break;
- default:
- jam();
- systemErrorLab(signal);
- break;
- }//switch
- break;
- case LqhTransConf::Aborted:
- jam();
- switch (apiConnectptr.p->apiConnectstate) {
- case CS_FAIL_COMMITTING:
- case CS_FAIL_COMMITTED:
- jam();
- systemErrorLab(signal);
- break;
- case CS_FAIL_PREPARED:
- jam();
- apiConnectptr.p->apiConnectstate = CS_FAIL_ABORTING;
- break;
- case CS_FAIL_ABORTING:
- case CS_FAIL_ABORTED:
- jam();
- /*empty*/;
- break;
- default:
- jam();
- systemErrorLab(signal);
- break;
- }//switch
- break;
- case LqhTransConf::Marker:
- jam();
- break;
- default:
- jam();
- systemErrorLab(signal);
- break;
- }//switch
- }//Dbtc::updateApiStateFail()
- /*------------------------------------------------------------*/
- /* UPDATE_TC_STATE_FAIL */
- /* */
- /* WE NEED TO UPDATE THE STATUS OF TC_CONNECT RECORD AND*/
- /* WE ALSO NEED TO CHECK THAT THERE IS CONSISTENCY */
- /* BETWEEN THE DIFFERENT REPLICAS. */
- /*------------------------------------------------------------*/
- void Dbtc::updateTcStateFail(Signal* signal)
- {
- const Uint8 treplicaNo = LqhTransConf::getReplicaNo(treqinfo);
- const Uint8 tlastReplicaNo = LqhTransConf::getLastReplicaNo(treqinfo);
- const Uint8 tdirtyOp = LqhTransConf::getDirtyFlag(treqinfo);
- TcConnectRecord * regTcPtr = tcConnectptr.p;
-
- ndbrequire(regTcPtr->apiConnect == apiConnectptr.i);
- ndbrequire(regTcPtr->failData[treplicaNo] == LqhTransConf::InvalidStatus);
- ndbrequire(regTcPtr->lastReplicaNo == tlastReplicaNo);
- ndbrequire(regTcPtr->dirtyOp == tdirtyOp);
-
- regTcPtr->tcNodedata[treplicaNo] = tnodeid;
- regTcPtr->failData[treplicaNo] = ttransStatus;
- }//Dbtc::updateTcStateFail()
- void Dbtc::execTCGETOPSIZEREQ(Signal* signal)
- {
- jamEntry();
- CRASH_INSERTION(8000);
- UintR Tuserpointer = signal->theData[0]; /* DBDIH POINTER */
- BlockReference Tusersblkref = signal->theData[1];/* DBDIH BLOCK REFERENCE */
- signal->theData[0] = Tuserpointer;
- signal->theData[1] = coperationsize;
- sendSignal(Tusersblkref, GSN_TCGETOPSIZECONF, signal, 2, JBB);
- }//Dbtc::execTCGETOPSIZEREQ()
- void Dbtc::execTC_CLOPSIZEREQ(Signal* signal)
- {
- jamEntry();
- CRASH_INSERTION(8001);
- tuserpointer = signal->theData[0];
- tusersblkref = signal->theData[1];
- /* DBDIH BLOCK REFERENCE */
- coperationsize = 0;
- signal->theData[0] = tuserpointer;
- sendSignal(tusersblkref, GSN_TC_CLOPSIZECONF, signal, 1, JBB);
- }//Dbtc::execTC_CLOPSIZEREQ()
- /* ######################################################################### */
- /* ####### ERROR MODULE ####### */
- /* ######################################################################### */
- void Dbtc::tabStateErrorLab(Signal* signal)
- {
- terrorCode = ZSTATE_ERROR;
- releaseAtErrorLab(signal);
- }//Dbtc::tabStateErrorLab()
- void Dbtc::wrongSchemaVersionErrorLab(Signal* signal)
- {
- const TcKeyReq * const tcKeyReq = (TcKeyReq *)&signal->theData[0];
- TableRecordPtr tabPtr;
- tabPtr.i = tcKeyReq->tableId;
- const Uint32 schemVer = tcKeyReq->tableSchemaVersion;
- ptrCheckGuard(tabPtr, ctabrecFilesize, tableRecord);
- terrorCode = tabPtr.p->getErrorCode(schemVer);
-
- abortErrorLab(signal);
- }//Dbtc::wrongSchemaVersionErrorLab()
- void Dbtc::noFreeConnectionErrorLab(Signal* signal)
- {
- terrorCode = ZNO_FREE_TC_CONNECTION;
- abortErrorLab(signal); /* RECORD. OTHERWISE GOTO ERRORHANDLING */
- }//Dbtc::noFreeConnectionErrorLab()
- void Dbtc::aiErrorLab(Signal* signal)
- {
- terrorCode = ZLENGTH_ERROR;
- abortErrorLab(signal);
- }//Dbtc::aiErrorLab()
- void Dbtc::seizeAttrbuferrorLab(Signal* signal)
- {
- terrorCode = ZGET_ATTRBUF_ERROR;
- abortErrorLab(signal);
- }//Dbtc::seizeAttrbuferrorLab()
- void Dbtc::seizeDatabuferrorLab(Signal* signal)
- {
- terrorCode = ZGET_DATAREC_ERROR;
- releaseAtErrorLab(signal);
- }//Dbtc::seizeDatabuferrorLab()
- void Dbtc::releaseAtErrorLab(Signal* signal)
- {
- ptrGuard(tcConnectptr);
- tcConnectptr.p->tcConnectstate = OS_ABORTING;
- /*-------------------------------------------------------------------------*
- * A FAILURE OF THIS OPERATION HAS OCCURRED. THIS FAILURE WAS EITHER A
- * FAULTY PARAMETER OR A RESOURCE THAT WAS NOT AVAILABLE.
- * WE WILL ABORT THE ENTIRE TRANSACTION SINCE THIS IS THE SAFEST PATH
- * TO HANDLE THIS PROBLEM.
- * SINCE WE HAVE NOT YET CONTACTED ANY LQH WE SET NUMBER OF NODES TO ZERO
- * WE ALSO SET THE STATE TO ABORTING TO INDICATE THAT WE ARE NOT EXPECTING
- * ANY SIGNALS.
- *-------------------------------------------------------------------------*/
- tcConnectptr.p->noOfNodes = 0;
- abortErrorLab(signal);
- }//Dbtc::releaseAtErrorLab()
- void Dbtc::warningHandlerLab(Signal* signal)
- {
- ndbassert(false);
- }//Dbtc::warningHandlerLab()
- void Dbtc::systemErrorLab(Signal* signal)
- {
- progError(0, 0);
- }//Dbtc::systemErrorLab()
- /* ######################################################################### *
- * ####### SCAN MODULE ####### *
- * ######################################################################### *
- The application orders a scan of a table. We divide the scan into a scan on
- each fragment. The scan uses the primary replicas since the scan might be
- used for an update in a separate transaction.
- Scans are always done as a separate transaction. Locks from the scan
- can be overtaken by another transaction. Scans can never lock the entire
- table. Locks are released immediately after the read has been verified
- by the application. There is not even an option to leave the locks.
- The reason is that this would hurt real-time behaviour too much.
- -# The first step in handling a scan of a table is to receive all signals
- defining the scan. If failures occur during this step we release all
- resource and reply with SCAN_TABREF providing the error code.
- If system load is too high, the request will not be allowed.
-
- -# The second step retrieves the number of fragments that exist in the
- table. It also ensures that the table actually exist. After this,
- the scan is ready to be parallelised. The idea is that the receiving
- process (hereafter called delivery process) will start up a number
- of scan processes. Each of these scan processes will
- independently scan one fragment at a time. The delivery
- process object is the scan record and the scan process object is
- the scan fragment record plus the scan operation record.
-
- -# The third step is thus performed in parallel. In the third step each
- scan process retrieves the primary replica of the fragment it will
- scan. Then it starts the scan as soon as the load on that node permits.
-
- The LQH returns either when it retrieved the maximum number of tuples or
- when it has retrived at least one tuple and is hindered by a lock to
- retrieve the next tuple. This is to ensure that a scan process never
- can be involved in a deadlock situation.
-
- When the scan process receives a number of tuples to report to the
- application it checks the state of the delivery process. Only one delivery
- at a time is handled by the application. Thus if the delivery process
- has already sent a number of tuples to the application this set of tuples
- are queued.
-
- When the application requests the next set of tuples it is immediately
- delivered if any are queued, otherwise it waits for the next scan
- process that is ready to deliver.
- ERROR HANDLING
-
- As already mentioned it is rather easy to handle errors before the scan
- processes have started. In this case it is enough to release the resources
- and send SCAN_TAB_REF.
-
- If an error occurs in any of the scan processes then we have to stop all
- scan processes. We do however only stop the delivery process and ask
- the api to order us to close the scan. The reason is that we can easily
- enter into difficult timing problems since the application and this
- block is out of synch we will thus always start by report the error to
- the application and wait for a close request. This error report uses the
- SCAN_TABREF signal with a special error code that the api must check for.
-
-
- CLOSING AN ACTIVE SCAN
-
- The application can close a scan for several reasons before it is completed.
- One reason was mentioned above where an error in a scan process led to a
- request to close the scan. Another reason could simply be that the
- application found what it looked for and is thus not interested in the
- rest of the scan.
- IT COULD ALSO BE DEPENDENT ON INTERNAL ERRORS IN THE API.
-
- When a close scan request is received, all scan processes are stopped and all
- resources belonging to those scan processes are released. Stopping the scan
- processes most often includes communication with an LQH where the local scan
- is controlled. Finally all resources belonging to the scan is released and
- the SCAN_TABCONF is sent with an indication of that the scan is closed.
-
-
- CLOSING A COMPLETED SCAN
-
- When all scan processes are completed then a report is sent to the
- application which indicates that no more tuples can be fetched.
- The application will send a close scan and the same action as when
- closing an active scan is performed.
- In this case it will of course not find any active scan processes.
- It will even find all scan processes already released.
-
- The reason for requiring the api to close the scan is the same as above.
- It is to avoid any timing problems due to that the api and this block
- is out of synch.
-
- * ######################################################################## */
- void Dbtc::execSCAN_TABREQ(Signal* signal)
- {
- const ScanTabReq * const scanTabReq = (ScanTabReq *)&signal->theData[0];
- const Uint32 reqinfo = scanTabReq->requestInfo;
- const Uint32 aiLength = (scanTabReq->attrLenKeyLen & 0xFFFF);
- const Uint32 keyLen = scanTabReq->attrLenKeyLen >> 16;
- const Uint32 schemaVersion = scanTabReq->tableSchemaVersion;
- const Uint32 transid1 = scanTabReq->transId1;
- const Uint32 transid2 = scanTabReq->transId2;
- const Uint32 tmpXX = scanTabReq->buddyConPtr;
- const Uint32 buddyPtr = (tmpXX == 0xFFFFFFFF ? RNIL : tmpXX);
- Uint32 currSavePointId = 0;
-
- Uint32 scanConcurrency = scanTabReq->getParallelism(reqinfo);
- Uint32 noOprecPerFrag = ScanTabReq::getScanBatch(reqinfo);
- Uint32 scanParallel = scanConcurrency;
- Uint32 errCode;
- ScanRecordPtr scanptr;
- jamEntry();
- SegmentedSectionPtr api_op_ptr;
- signal->getSection(api_op_ptr, 0);
- copy(&cdata[0], api_op_ptr);
- releaseSections(signal);
- apiConnectptr.i = scanTabReq->apiConnectPtr;
- tabptr.i = scanTabReq->tableId;
- if (apiConnectptr.i >= capiConnectFilesize)
- {
- jam();
- warningHandlerLab(signal);
- return;
- }//if
- ptrAss(apiConnectptr, apiConnectRecord);
- ApiConnectRecord * transP = apiConnectptr.p;
- if (transP->apiConnectstate != CS_CONNECTED) {
- jam();
- // could be left over from TCKEYREQ rollback
- if (transP->apiConnectstate == CS_ABORTING &&
- transP->abortState == AS_IDLE) {
- jam();
- } else if(transP->apiConnectstate == CS_STARTED &&
- transP->firstTcConnect == RNIL){
- jam();
- // left over from simple/dirty read
- } else {
- jam();
- errCode = ZSTATE_ERROR;
- goto SCAN_TAB_error_no_state_change;
- }
- }
- if(tabptr.i >= ctabrecFilesize)
- {
- errCode = ZUNKNOWN_TABLE_ERROR;
- goto SCAN_TAB_error;
- }
- ptrAss(tabptr, tableRecord);
- if ((aiLength == 0) ||
- (!tabptr.p->checkTable(schemaVersion)) ||
- (scanConcurrency == 0) ||
- (cfirstfreeTcConnect == RNIL) ||
- (cfirstfreeScanrec == RNIL)) {
- goto SCAN_error_check;
- }
- if (buddyPtr != RNIL) {
- jam();
- ApiConnectRecordPtr buddyApiPtr;
- buddyApiPtr.i = buddyPtr;
- ptrCheckGuard(buddyApiPtr, capiConnectFilesize, apiConnectRecord);
- if ((transid1 == buddyApiPtr.p->transid[0]) &&
- (transid2 == buddyApiPtr.p->transid[1])) {
- jam();
-
- if (buddyApiPtr.p->apiConnectstate == CS_ABORTING) {
- // transaction has been aborted
- jam();
- errCode = buddyApiPtr.p->returncode;
- goto SCAN_TAB_error;
- }//if
- currSavePointId = buddyApiPtr.p->currSavePointId;
- buddyApiPtr.p->currSavePointId++;
- }
- }
-
- seizeTcConnect(signal);
- tcConnectptr.p->apiConnect = apiConnectptr.i;
- tcConnectptr.p->tcConnectstate = OS_WAIT_SCAN;
- apiConnectptr.p->lastTcConnect = tcConnectptr.i;
- seizeCacheRecord(signal);
- cachePtr.p->keylen = keyLen;
- cachePtr.p->save1 = 0;
- scanptr = seizeScanrec(signal);
- ndbrequire(transP->apiScanRec == RNIL);
- ndbrequire(scanptr.p->scanApiRec == RNIL);
- initScanrec(scanptr, scanTabReq, scanParallel, noOprecPerFrag);
- transP->apiScanRec = scanptr.i;
- transP->returncode = 0;
- transP->transid[0] = transid1;
- transP->transid[1] = transid2;
- transP->buddyPtr = buddyPtr;
- // The scan is started
- transP->apiConnectstate = CS_START_SCAN;
- transP->currSavePointId = currSavePointId;
- /**********************************************************
- * We start the timer on scanRec to be able to discover a
- * timeout in the API the API now is in charge!
- ***********************************************************/
- setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
- updateBuddyTimer(apiConnectptr);
- /***********************************************************
- * WE HAVE NOW RECEIVED ALL REFERENCES TO SCAN OBJECTS IN
- * THE API. WE ARE NOW READY TO RECEIVE THE ATTRIBUTE INFO
- * IF ANY TO RECEIVE.
- **********************************************************/
- scanptr.p->scanState = ScanRecord::WAIT_AI;
- return;
- SCAN_error_check:
- if (aiLength == 0) {
- jam()
- errCode = ZSCAN_AI_LEN_ERROR;
- goto SCAN_TAB_error;
- }//if
- if (!tabptr.p->checkTable(schemaVersion)){
- jam();
- errCode = tabptr.p->getErrorCode(schemaVersion);
- goto SCAN_TAB_error;
- }//if
- if (scanConcurrency == 0) {
- jam();
- errCode = ZNO_CONCURRENCY_ERROR;
- goto SCAN_TAB_error;
- }//if
- if (cfirstfreeTcConnect == RNIL) {
- jam();
- errCode = ZNO_FREE_TC_CONNECTION;
- goto SCAN_TAB_error;
- }//if
- ndbrequire(cfirstfreeScanrec == RNIL);
- jam();
- errCode = ZNO_SCANREC_ERROR;
- goto SCAN_TAB_error;
-
- SCAN_TAB_error:
- jam();
- /**
- * Prepare for up coming ATTRINFO/KEYINFO
- */
- transP->apiConnectstate = CS_ABORTING;
- transP->abortState = AS_IDLE;
- transP->transid[0] = transid1;
- transP->transid[1] = transid2;
-
- SCAN_TAB_error_no_state_change:
-
- ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
- ref->apiConnectPtr = transP->ndbapiConnect;
- ref->transId1 = transid1;
- ref->transId2 = transid2;
- ref->errorCode = errCode;
- ref->closeNeeded = 0;
- sendSignal(transP->ndbapiBlockref, GSN_SCAN_TABREF,
- signal, ScanTabRef::SignalLength, JBB);
- return;
- }//Dbtc::execSCAN_TABREQ()
- void Dbtc::initScanrec(ScanRecordPtr scanptr,
- const ScanTabReq * scanTabReq,
- UintR scanParallel,
- UintR noOprecPerFrag)
- {
- scanptr.p->scanTcrec = tcConnectptr.i;
- scanptr.p->scanApiRec = apiConnectptr.i;
- scanptr.p->scanAiLength = scanTabReq->attrLenKeyLen & 0xFFFF;
- scanptr.p->scanKeyLen = scanTabReq->attrLenKeyLen >> 16;
- scanptr.p->scanTableref = tabptr.i;
- scanptr.p->scanSchemaVersion = scanTabReq->tableSchemaVersion;
- scanptr.p->scanParallel = scanParallel;
- scanptr.p->first_batch_size_rows = scanTabReq->first_batch_size;
- scanptr.p->batch_byte_size = scanTabReq->batch_byte_size;
- scanptr.p->batch_size_rows = noOprecPerFrag;
- Uint32 tmp = 0;
- const UintR ri = scanTabReq->requestInfo;
- ScanFragReq::setLockMode(tmp, ScanTabReq::getLockMode(ri));
- ScanFragReq::setHoldLockFlag(tmp, ScanTabReq::getHoldLockFlag(ri));
- ScanFragReq::setKeyinfoFlag(tmp, ScanTabReq::getKeyinfoFlag(ri));
- ScanFragReq::setReadCommittedFlag(tmp,ScanTabReq::getReadCommittedFlag(ri));
- ScanFragReq::setRangeScanFlag(tmp, ScanTabReq::getRangeScanFlag(ri));
- ScanFragReq::setAttrLen(tmp, scanTabReq->attrLenKeyLen & 0xFFFF);
-
- scanptr.p->scanRequestInfo = tmp;
- scanptr.p->scanStoredProcId = scanTabReq->storedProcId;
- scanptr.p->scanState = ScanRecord::RUNNING;
- scanptr.p->m_queued_count = 0;
- ScanFragList list(c_scan_frag_pool,
- scanptr.p->m_running_scan_frags);
- for (Uint32 i = 0; i < scanParallel; i++) {
- jam();
- ScanFragRecPtr ptr;
- ndbrequire(list.seize(ptr));
- ptr.p->scanRec = scanptr.i;
- ptr.p->scanFragId = 0;
- ptr.p->m_apiPtr = cdata[i];
- }//for
- (* (ScanTabReq::getRangeScanFlag(ri) ?
- &c_counters.c_range_scan_count :
- &c_counters.c_scan_count))++;
- }//Dbtc::initScanrec()
- void Dbtc::scanTabRefLab(Signal* signal, Uint32 errCode)
- {
- ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
- ref->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
- ref->transId1 = apiConnectptr.p->transid[0];
- ref->transId2 = apiConnectptr.p->transid[1];
- ref->errorCode = errCode;
- ref->closeNeeded = 0;
- sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF,
- signal, ScanTabRef::SignalLength, JBB);
- }//Dbtc::scanTabRefLab()
- /*---------------------------------------------------------------------------*/
- /* */
- /* RECEPTION OF ATTRINFO FOR SCAN TABLE REQUEST. */
- /*---------------------------------------------------------------------------*/
- void Dbtc::scanAttrinfoLab(Signal* signal, UintR Tlen)
- {
- ScanRecordPtr scanptr;
- scanptr.i = apiConnectptr.p->apiScanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- tcConnectptr.i = scanptr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- cachePtr.i = apiConnectptr.p->cachePtr;
- ptrCheckGuard(cachePtr, ccacheFilesize, cacheRecord);
- CacheRecord * const regCachePtr = cachePtr.p;
- ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_AI);
- regCachePtr->currReclenAi = regCachePtr->currReclenAi + Tlen;
- if (regCachePtr->currReclenAi < scanptr.p->scanAiLength) {
- if (cfirstfreeAttrbuf == RNIL) {
- goto scanAttrinfo_attrbuf_error;
- }//if
- saveAttrbuf(signal);
- } else {
- if (regCachePtr->currReclenAi > scanptr.p->scanAiLength) {
- goto scanAttrinfo_len_error;
- } else {
- /* CURR_RECLEN_AI = SCAN_AI_LENGTH */
- if (cfirstfreeAttrbuf == RNIL) {
- goto scanAttrinfo_attrbuf2_error;
- }//if
- saveAttrbuf(signal);
- /**************************************************
- * WE HAVE NOW RECEIVED ALL INFORMATION CONCERNING
- * THIS SCAN. WE ARE READY TO START THE ACTUAL
- * EXECUTION OF THE SCAN QUERY
- **************************************************/
- diFcountReqLab(signal, scanptr);
- return;
- }//if
- }//if
- return;
- scanAttrinfo_attrbuf_error:
- jam();
- abortScanLab(signal, scanptr, ZGET_ATTRBUF_ERROR);
- return;
- scanAttrinfo_attrbuf2_error:
- jam();
- abortScanLab(signal, scanptr, ZGET_ATTRBUF_ERROR);
- return;
- scanAttrinfo_len_error:
- jam();
- abortScanLab(signal, scanptr, ZLENGTH_ERROR);
- return;
- }//Dbtc::scanAttrinfoLab()
- void Dbtc::diFcountReqLab(Signal* signal, ScanRecordPtr scanptr)
- {
- /**
- * Check so that the table is not being dropped
- */
- TableRecordPtr tabPtr;
- tabPtr.i = scanptr.p->scanTableref;
- tabPtr.p = &tableRecord[tabPtr.i];
- if (tabPtr.p->checkTable(scanptr.p->scanSchemaVersion)){
- ;
- } else {
- abortScanLab(signal, scanptr,
- tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
- return;
- }
- scanptr.p->scanState = ScanRecord::WAIT_FRAGMENT_COUNT;
- /*************************************************
- * THE FIRST STEP TO RECEIVE IS SUCCESSFULLY COMPLETED.
- * WE MUST FIRST GET THE NUMBER OF FRAGMENTS IN THE TABLE.
- ***************************************************/
- signal->theData[0] = tcConnectptr.p->dihConnectptr;
- signal->theData[1] = scanptr.p->scanTableref;
- sendSignal(cdihblockref, GSN_DI_FCOUNTREQ, signal, 2, JBB);
- return;
- }//Dbtc::diFcountReqLab()
- /********************************************************************
- * execDI_FCOUNTCONF
- *
- * WE HAVE ASKED DIH ABOUT THE NUMBER OF FRAGMENTS IN THIS TABLE.
- * WE WILL NOW START A NUMBER OF PARALLEL SCAN PROCESSES. EACH OF
- * THESE WILL SCAN ONE FRAGMENT AT A TIME. THEY WILL CONTINUE THIS
- * UNTIL THERE ARE NO MORE FRAGMENTS TO SCAN OR UNTIL THE APPLICATION
- * CLOSES THE SCAN.
- ********************************************************************/
- void Dbtc::execDI_FCOUNTCONF(Signal* signal)
- {
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- const UintR tfragCount = signal->theData[1];
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- apiConnectptr.i = tcConnectptr.p->apiConnect;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- ScanRecordPtr scanptr;
- scanptr.i = apiConnectptr.p->apiScanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_FRAGMENT_COUNT);
- if (apiConnectptr.p->apiFailState == ZTRUE) {
- jam();
- releaseScanResources(scanptr);
- handleApiFailState(signal, apiConnectptr.i);
- return;
- }//if
- if (tfragCount == 0) {
- jam();
- abortScanLab(signal, scanptr, ZNO_FRAGMENT_ERROR);
- return;
- }//if
-
- /**
- * Check so that the table is not being dropped
- */
- TableRecordPtr tabPtr;
- tabPtr.i = scanptr.p->scanTableref;
- tabPtr.p = &tableRecord[tabPtr.i];
- if (tabPtr.p->checkTable(scanptr.p->scanSchemaVersion)){
- ;
- } else {
- abortScanLab(signal, scanptr,
- tabPtr.p->getErrorCode(scanptr.p->scanSchemaVersion));
- return;
- }
- if(scanptr.p->scanParallel > tfragCount){
- jam();
- abortScanLab(signal, scanptr, ZTOO_HIGH_CONCURRENCY_ERROR);
- return;
- }
-
- scanptr.p->scanParallel = tfragCount;
- scanptr.p->scanNoFrag = tfragCount;
- scanptr.p->scanNextFragId = 0;
- scanptr.p->scanState = ScanRecord::RUNNING;
- setApiConTimer(apiConnectptr.i, 0, __LINE__);
- updateBuddyTimer(apiConnectptr);
-
- ScanFragRecPtr ptr;
- ScanFragList list(c_scan_frag_pool,
- scanptr.p->m_running_scan_frags);
- for (list.first(ptr); !ptr.isNull(); list.next(ptr)){
- jam();
- ptr.p->lqhBlockref = 0;
- ptr.p->startFragTimer(ctcTimer);
- ptr.p->scanFragId = scanptr.p->scanNextFragId++;
- ptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
- ptr.p->startFragTimer(ctcTimer);
- signal->theData[0] = tcConnectptr.p->dihConnectptr;
- signal->theData[1] = ptr.i;
- signal->theData[2] = scanptr.p->scanTableref;
- signal->theData[3] = ptr.p->scanFragId;
- sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
- }//for
- }//Dbtc::execDI_FCOUNTCONF()
- /******************************************************
- * execDI_FCOUNTREF
- ******************************************************/
- void Dbtc::execDI_FCOUNTREF(Signal* signal)
- {
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- const Uint32 errCode = signal->theData[1];
- apiConnectptr.i = tcConnectptr.p->apiConnect;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- ScanRecordPtr scanptr;
- scanptr.i = apiConnectptr.p->apiScanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_FRAGMENT_COUNT);
- if (apiConnectptr.p->apiFailState == ZTRUE) {
- jam();
- releaseScanResources(scanptr);
- handleApiFailState(signal, apiConnectptr.i);
- return;
- }//if
- abortScanLab(signal, scanptr, errCode);
- }//Dbtc::execDI_FCOUNTREF()
- void Dbtc::abortScanLab(Signal* signal, ScanRecordPtr scanptr, Uint32 errCode)
- {
- scanTabRefLab(signal, errCode);
- releaseScanResources(scanptr);
- }//Dbtc::abortScanLab()
- void Dbtc::releaseScanResources(ScanRecordPtr scanPtr)
- {
- if (apiConnectptr.p->cachePtr != RNIL) {
- cachePtr.i = apiConnectptr.p->cachePtr;
- ptrCheckGuard(cachePtr, ccacheFilesize, cacheRecord);
- releaseKeys();
- releaseAttrinfo();
- }//if
- tcConnectptr.i = scanPtr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- releaseTcCon();
- ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty());
- ndbrequire(scanPtr.p->m_queued_scan_frags.isEmpty());
- ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty());
- ndbassert(scanPtr.p->scanApiRec == apiConnectptr.i);
- ndbassert(apiConnectptr.p->apiScanRec == scanPtr.i);
-
- // link into free list
- scanPtr.p->nextScan = cfirstfreeScanrec;
- scanPtr.p->scanState = ScanRecord::IDLE;
- scanPtr.p->scanTcrec = RNIL;
- scanPtr.p->scanApiRec = RNIL;
- cfirstfreeScanrec = scanPtr.i;
-
- apiConnectptr.p->apiScanRec = RNIL;
- apiConnectptr.p->apiConnectstate = CS_CONNECTED;
- setApiConTimer(apiConnectptr.i, 0, __LINE__);
- }//Dbtc::releaseScanResources()
- /****************************************************************
- * execDIGETPRIMCONF
- *
- * WE HAVE RECEIVED THE PRIMARY NODE OF THIS FRAGMENT.
- * WE ARE NOW READY TO ASK FOR PERMISSION TO LOAD THIS
- * SPECIFIC NODE WITH A SCAN OPERATION.
- ****************************************************************/
- void Dbtc::execDIGETPRIMCONF(Signal* signal)
- {
- jamEntry();
- // tcConnectptr.i in theData[0] is not used
- scanFragptr.i = signal->theData[1];
- c_scan_frag_pool.getPtr(scanFragptr);
- tnodeid = signal->theData[2];
- arrGuard(tnodeid, MAX_NDB_NODES);
- ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF);
- scanFragptr.p->stopFragTimer();
- ScanRecordPtr scanptr;
- scanptr.i = scanFragptr.p->scanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- /**
- * This must be false as select count(*) otherwise
- * can "pass" committing on backup fragments and
- * get incorrect row count
- */
- if(false && ScanFragReq::getReadCommittedFlag(scanptr.p->scanRequestInfo))
- {
- jam();
- Uint32 max = 3+signal->theData[6];
- Uint32 nodeid = getOwnNodeId();
- for(Uint32 i = 3; i<max; i++)
- if(signal->theData[i] == nodeid)
- {
- jam();
- tnodeid = nodeid;
- break;
- }
- }
-
- {
- /**
- * Check table
- */
- TableRecordPtr tabPtr;
- tabPtr.i = scanptr.p->scanTableref;
- ptrAss(tabPtr, tableRecord);
- Uint32 schemaVersion = scanptr.p->scanSchemaVersion;
- if(tabPtr.p->checkTable(schemaVersion) == false){
- jam();
- ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
-
- run.release(scanFragptr);
- scanError(signal, scanptr, tabPtr.p->getErrorCode(schemaVersion));
- return;
- }
- }
-
- tcConnectptr.i = scanptr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- apiConnectptr.i = scanptr.p->scanApiRec;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- cachePtr.i = apiConnectptr.p->cachePtr;
- ptrCheckGuard(cachePtr, ccacheFilesize, cacheRecord);
- switch (scanptr.p->scanState) {
- case ScanRecord::CLOSING_SCAN:
- jam();
- updateBuddyTimer(apiConnectptr);
- {
- ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
-
- run.release(scanFragptr);
- }
- close_scan_req_send_conf(signal, scanptr);
- return;
- default:
- jam();
- /*empty*/;
- break;
- }//switch
- Uint32 ref = calcLqhBlockRef(tnodeid);
- scanFragptr.p->lqhBlockref = ref;
- scanFragptr.p->m_connectCount = getNodeInfo(tnodeid).m_connectCount;
- sendScanFragReq(signal, scanptr.p, scanFragptr.p);
- if(ERROR_INSERTED(8035))
- globalTransporterRegistry.performSend();
- attrbufptr.i = cachePtr.p->firstAttrbuf;
- while (attrbufptr.i != RNIL) {
- jam();
- ptrCheckGuard(attrbufptr, cattrbufFilesize, attrbufRecord);
- sendAttrinfo(signal,
- scanFragptr.i,
- attrbufptr.p,
- ref);
- attrbufptr.i = attrbufptr.p->attrbuf[ZINBUF_NEXT];
- if(ERROR_INSERTED(8035))
- globalTransporterRegistry.performSend();
- }//while
- scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
- scanFragptr.p->startFragTimer(ctcTimer);
- updateBuddyTimer(apiConnectptr);
- /*********************************************
- * WE HAVE NOW STARTED A FRAGMENT SCAN. NOW
- * WAIT FOR THE FIRST SCANNED RECORDS
- *********************************************/
- }//Dbtc::execDIGETPRIMCONF
- /***************************************************
- * execDIGETPRIMREF
- *
- * WE ARE NOW FORCED TO STOP THE SCAN. THIS ERROR
- * IS NOT RECOVERABLE SINCE THERE IS A PROBLEM WITH
- * FINDING A PRIMARY REPLICA OF A CERTAIN FRAGMENT.
- ***************************************************/
- void Dbtc::execDIGETPRIMREF(Signal* signal)
- {
- jamEntry();
- // tcConnectptr.i in theData[0] is not used.
- scanFragptr.i = signal->theData[1];
- const Uint32 errCode = signal->theData[2];
- c_scan_frag_pool.getPtr(scanFragptr);
- ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF);
- ScanRecordPtr scanptr;
- scanptr.i = scanFragptr.p->scanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
-
- run.release(scanFragptr);
- scanError(signal, scanptr, errCode);
- }//Dbtc::execDIGETPRIMREF()
- /**
- * Dbtc::execSCAN_FRAGREF
- * Our attempt to scan a fragment was refused
- * set error code and close all other fragment
- * scan's belonging to this scan
- */
- void Dbtc::execSCAN_FRAGREF(Signal* signal)
- {
- const ScanFragRef * const ref = (ScanFragRef *)&signal->theData[0];
-
- jamEntry();
- const Uint32 errCode = ref->errorCode;
- scanFragptr.i = ref->senderData;
- c_scan_frag_pool.getPtr(scanFragptr);
- ScanRecordPtr scanptr;
- scanptr.i = scanFragptr.p->scanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- apiConnectptr.i = scanptr.p->scanApiRec;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- Uint32 transid1 = apiConnectptr.p->transid[0] ^ ref->transId1;
- Uint32 transid2 = apiConnectptr.p->transid[1] ^ ref->transId2;
- transid1 = transid1 | transid2;
- if (transid1 != 0) {
- jam();
- systemErrorLab(signal);
- }//if
- /**
- * Set errorcode, close connection to this lqh fragment,
- * stop fragment timer and call scanFragError to start
- * close of the other fragment scans
- */
- ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
- {
- scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
- ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
-
- run.release(scanFragptr);
- scanFragptr.p->stopFragTimer();
- }
- scanError(signal, scanptr, errCode);
- }//Dbtc::execSCAN_FRAGREF()
- /**
- * Dbtc::scanError
- *
- * Called when an error occurs during
- */
- void Dbtc::scanError(Signal* signal, ScanRecordPtr scanptr, Uint32 errorCode)
- {
- jam();
- ScanRecord* scanP = scanptr.p;
-
- DEBUG("scanError, errorCode = "<< errorCode <<
- ", scanState = " << scanptr.p->scanState);
- apiConnectptr.i = scanP->scanApiRec;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- ndbrequire(apiConnectptr.p->apiScanRec == scanptr.i);
- if(scanP->scanState == ScanRecord::CLOSING_SCAN){
- jam();
- close_scan_req_send_conf(signal, scanptr);
- return;
- }
-
- ndbrequire(scanP->scanState == ScanRecord::RUNNING);
-
- /**
- * Close scan wo/ having received an order to do so
- */
- close_scan_req(signal, scanptr, false);
-
- const bool apiFail = (apiConnectptr.p->apiFailState == ZTRUE);
- if(apiFail){
- jam();
- return;
- }
-
- ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
- ref->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
- ref->transId1 = apiConnectptr.p->transid[0];
- ref->transId2 = apiConnectptr.p->transid[1];
- ref->errorCode = errorCode;
- ref->closeNeeded = 1;
- sendSignal(apiConnectptr.p->ndbapiBlockref, GSN_SCAN_TABREF,
- signal, ScanTabRef::SignalLength, JBB);
- }//Dbtc::scanError()
- /************************************************************
- * execSCAN_FRAGCONF
- *
- * A NUMBER OF OPERATIONS HAVE BEEN COMPLETED IN THIS
- * FRAGMENT. TAKE CARE OF AND ISSUE FURTHER ACTIONS.
- ************************************************************/
- void Dbtc::execSCAN_FRAGCONF(Signal* signal)
- {
- Uint32 transid1, transid2, total_len;
- jamEntry();
- const ScanFragConf * const conf = (ScanFragConf*)&signal->theData[0];
- const Uint32 noCompletedOps = conf->completedOps;
- const Uint32 status = conf->fragmentCompleted;
- scanFragptr.i = conf->senderData;
- c_scan_frag_pool.getPtr(scanFragptr);
-
- ScanRecordPtr scanptr;
- scanptr.i = scanFragptr.p->scanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
-
- apiConnectptr.i = scanptr.p->scanApiRec;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- transid1 = apiConnectptr.p->transid[0] ^ conf->transId1;
- transid2 = apiConnectptr.p->transid[1] ^ conf->transId2;
- total_len= conf->total_len;
- transid1 = transid1 | transid2;
- if (transid1 != 0) {
- jam();
- systemErrorLab(signal);
- }//if
-
- ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
-
- if(scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
- jam();
- if(status == 0){
- /**
- * We have started closing = we sent a close -> ignore this
- */
- return;
- } else {
- jam();
- ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
-
- run.release(scanFragptr);
- scanFragptr.p->stopFragTimer();
- scanFragptr.p->scanFragState = ScanFragRec::COMPLETED;
- }
- close_scan_req_send_conf(signal, scanptr);
- return;
- }
- if(noCompletedOps == 0 && status != 0 &&
- scanptr.p->scanNextFragId < scanptr.p->scanNoFrag){
- /**
- * Start on next fragment
- */
- scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
- scanFragptr.p->startFragTimer(ctcTimer);
- tcConnectptr.i = scanptr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++;
- signal->theData[0] = tcConnectptr.p->dihConnectptr;
- signal->theData[1] = scanFragptr.i;
- signal->theData[2] = scanptr.p->scanTableref;
- signal->theData[3] = scanFragptr.p->scanFragId;
- sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
- return;
- }
- /*
- Uint32 totalLen = 0;
- for(Uint32 i = 0; i<noCompletedOps; i++){
- Uint32 tmp = conf->opReturnDataLen[i];
- totalLen += tmp;
- }
- */
- {
- ScanFragList run(c_scan_frag_pool, scanptr.p->m_running_scan_frags);
- ScanFragList queued(c_scan_frag_pool, scanptr.p->m_queued_scan_frags);
-
- run.remove(scanFragptr);
- queued.add(scanFragptr);
- scanptr.p->m_queued_count++;
- }
- scanFragptr.p->m_scan_frag_conf_status = status;
- scanFragptr.p->m_ops = noCompletedOps;
- scanFragptr.p->m_totalLen = total_len;
- scanFragptr.p->scanFragState = ScanFragRec::QUEUED_FOR_DELIVERY;
- scanFragptr.p->stopFragTimer();
-
- if(scanptr.p->m_queued_count > /** Min */ 0){
- jam();
- sendScanTabConf(signal, scanptr);
- }
- }//Dbtc::execSCAN_FRAGCONF()
- /****************************************************************************
- * execSCAN_NEXTREQ
- *
- * THE APPLICATION HAVE PROCESSED THE TUPLES TRANSFERRED AND IS NOW READY FOR
- * MORE. THIS SIGNAL IS ALSO USED TO CLOSE THE SCAN.
- ****************************************************************************/
- void Dbtc::execSCAN_NEXTREQ(Signal* signal)
- {
- const ScanNextReq * const req = (ScanNextReq *)&signal->theData[0];
- const UintR transid1 = req->transId1;
- const UintR transid2 = req->transId2;
- const UintR stopScan = req->stopScan;
- jamEntry();
- apiConnectptr.i = req->apiConnectPtr;
- if (apiConnectptr.i >= capiConnectFilesize) {
- jam();
- warningHandlerLab(signal);
- return;
- }//if
- ptrAss(apiConnectptr, apiConnectRecord);
- /**
- * Check transid
- */
- const UintR ctransid1 = apiConnectptr.p->transid[0] ^ transid1;
- const UintR ctransid2 = apiConnectptr.p->transid[1] ^ transid2;
- if ((ctransid1 | ctransid2) != 0){
- ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
- ref->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
- ref->transId1 = transid1;
- ref->transId2 = transid2;
- ref->errorCode = ZSTATE_ERROR;
- ref->closeNeeded = 0;
- sendSignal(signal->senderBlockRef(), GSN_SCAN_TABREF,
- signal, ScanTabRef::SignalLength, JBB);
- DEBUG("Wrong transid");
- return;
- }
- /**
- * Check state of API connection
- */
- if (apiConnectptr.p->apiConnectstate != CS_START_SCAN) {
- jam();
- if (apiConnectptr.p->apiConnectstate == CS_CONNECTED) {
- jam();
- /*********************************************************************
- * The application sends a SCAN_NEXTREQ after experiencing a time-out.
- * We will send a SCAN_TABREF to indicate a time-out occurred.
- *********************************************************************/
- DEBUG("scanTabRefLab: ZSCANTIME_OUT_ERROR2");
- ndbout_c("apiConnectptr(%d) -> abort", apiConnectptr.i);
- ndbrequire(false); //B2 indication of strange things going on
- scanTabRefLab(signal, ZSCANTIME_OUT_ERROR2);
- return;
- }
- DEBUG("scanTabRefLab: ZSTATE_ERROR");
- DEBUG(" apiConnectstate="<<apiConnectptr.p->apiConnectstate);
- ndbrequire(false); //B2 indication of strange things going on
- scanTabRefLab(signal, ZSTATE_ERROR);
- return;
- }//if
-
- /*******************************************************
- * START THE ACTUAL LOGIC OF SCAN_NEXTREQ.
- ********************************************************/
- // Stop the timer that is used to check for timeout in the API
- setApiConTimer(apiConnectptr.i, 0, __LINE__);
- ScanRecordPtr scanptr;
- scanptr.i = apiConnectptr.p->apiScanRec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- ScanRecord* scanP = scanptr.p;
- const Uint32 len = signal->getLength() - 4;
- if (stopScan == ZTRUE) {
- jam();
- /*********************************************************************
- * APPLICATION IS CLOSING THE SCAN.
- **********************************************************************/
- close_scan_req(signal, scanptr, true);
- return;
- }//if
- if (scanptr.p->scanState == ScanRecord::CLOSING_SCAN){
- jam();
- /**
- * The scan is closing (typically due to error)
- * but the API hasn't understood it yet
- *
- * Wait for API close request
- */
- return;
- }
- // Copy op ptrs so I dont overwrite them when sending...
- memcpy(signal->getDataPtrSend()+25, signal->getDataPtr()+4, 4 * len);
- ScanFragNextReq tmp;
- tmp.closeFlag = ZFALSE;
- tmp.transId1 = apiConnectptr.p->transid[0];
- tmp.transId2 = apiConnectptr.p->transid[1];
- tmp.batch_size_rows = scanP->batch_size_rows;
- tmp.batch_size_bytes = scanP->batch_byte_size;
- ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
- ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
- for(Uint32 i = 0 ; i<len; i++){
- jam();
- scanFragptr.i = signal->theData[i+25];
- c_scan_frag_pool.getPtr(scanFragptr);
- ndbrequire(scanFragptr.p->scanFragState == ScanFragRec::DELIVERED);
-
- scanFragptr.p->startFragTimer(ctcTimer);
- scanFragptr.p->m_ops = 0;
- if(scanFragptr.p->m_scan_frag_conf_status)
- {
- /**
- * last scan was complete
- */
- jam();
- ndbrequire(scanptr.p->scanNextFragId < scanptr.p->scanNoFrag);
- scanFragptr.p->scanFragState = ScanFragRec::WAIT_GET_PRIMCONF;
-
- tcConnectptr.i = scanptr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
- scanFragptr.p->scanFragId = scanptr.p->scanNextFragId++;
- signal->theData[0] = tcConnectptr.p->dihConnectptr;
- signal->theData[1] = scanFragptr.i;
- signal->theData[2] = scanptr.p->scanTableref;
- signal->theData[3] = scanFragptr.p->scanFragId;
- sendSignal(cdihblockref, GSN_DIGETPRIMREQ, signal, 4, JBB);
- }
- else
- {
- jam();
- scanFragptr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
- ScanFragNextReq * req = (ScanFragNextReq*)signal->getDataPtrSend();
- * req = tmp;
- req->senderData = scanFragptr.i;
- sendSignal(scanFragptr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
- ScanFragNextReq::SignalLength, JBB);
- }
- delivered.remove(scanFragptr);
- running.add(scanFragptr);
- }//for
-
- }//Dbtc::execSCAN_NEXTREQ()
- void
- Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
- ScanRecord* scanP = scanPtr.p;
- ndbrequire(scanPtr.p->scanState != ScanRecord::IDLE);
- scanPtr.p->scanState = ScanRecord::CLOSING_SCAN;
- scanPtr.p->m_close_scan_req = req_received;
- /**
- * Queue : Action
- * ============= : =================
- * completed : -
- * running : close -> LQH
- * delivered w/ : close -> LQH
- * delivered wo/ : move to completed
- * queued w/ : close -> LQH
- * queued wo/ : move to completed
- */
-
- ScanFragNextReq * nextReq = (ScanFragNextReq*)&signal->theData[0];
- nextReq->closeFlag = ZTRUE;
- nextReq->transId1 = apiConnectptr.p->transid[0];
- nextReq->transId2 = apiConnectptr.p->transid[1];
-
- {
- ScanFragRecPtr ptr;
- ScanFragList running(c_scan_frag_pool, scanP->m_running_scan_frags);
- ScanFragList delivered(c_scan_frag_pool, scanP->m_delivered_scan_frags);
- ScanFragList queued(c_scan_frag_pool, scanP->m_queued_scan_frags);
-
- // Close running
- for(running.first(ptr); !ptr.isNull(); ){
- ScanFragRecPtr curr = ptr; // Remove while iterating...
- running.next(ptr);
- if(curr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF){
- jam();
- continue;
- }
- ndbrequire(curr.p->scanFragState == ScanFragRec::LQH_ACTIVE);
-
- curr.p->startFragTimer(ctcTimer);
- curr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
- nextReq->senderData = curr.i;
- sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
- ScanFragNextReq::SignalLength, JBB);
- }
- // Close delivered
- for(delivered.first(ptr); !ptr.isNull(); ){
- jam();
- ScanFragRecPtr curr = ptr; // Remove while iterating...
- delivered.next(ptr);
- ndbrequire(curr.p->scanFragState == ScanFragRec::DELIVERED);
- delivered.remove(curr);
-
- if(curr.p->m_ops > 0 && curr.p->m_scan_frag_conf_status == 0){
- jam();
- running.add(curr);
- curr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
- curr.p->startFragTimer(ctcTimer);
- nextReq->senderData = curr.i;
- sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
- ScanFragNextReq::SignalLength, JBB);
-
- } else {
- jam();
- c_scan_frag_pool.release(curr);
- curr.p->scanFragState = ScanFragRec::COMPLETED;
- curr.p->stopFragTimer();
- }
- }//for
- /**
- * All queued with data should be closed
- */
- for(queued.first(ptr); !ptr.isNull(); ){
- jam();
- ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY);
- ScanFragRecPtr curr = ptr; // Remove while iterating...
- queued.next(ptr);
-
- queued.remove(curr);
- scanP->m_queued_count--;
-
- if(curr.p->m_ops > 0){
- jam();
- running.add(curr);
- curr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
- curr.p->startFragTimer(ctcTimer);
- nextReq->senderData = curr.i;
- sendSignal(curr.p->lqhBlockref, GSN_SCAN_NEXTREQ, signal,
- ScanFragNextReq::SignalLength, JBB);
- } else {
- jam();
- c_scan_frag_pool.release(curr);
- curr.p->scanFragState = ScanFragRec::COMPLETED;
- curr.p->stopFragTimer();
- }
- }
- }
- close_scan_req_send_conf(signal, scanPtr);
- }
- void
- Dbtc::close_scan_req_send_conf(Signal* signal, ScanRecordPtr scanPtr){
- jam();
- ndbrequire(scanPtr.p->m_queued_scan_frags.isEmpty());
- ndbrequire(scanPtr.p->m_delivered_scan_frags.isEmpty());
- //ndbrequire(scanPtr.p->m_running_scan_frags.isEmpty());
- #if 0
- {
- ScanFragList comp(c_scan_frag_pool, scanPtr.p->m_completed_scan_frags);
- ScanFragRecPtr ptr;
- for(comp.first(ptr); !ptr.isNull(); comp.next(ptr)){
- ndbrequire(ptr.p->scanFragTimer == 0);
- ndbrequire(ptr.p->scanFragState == ScanFragRec::COMPLETED);
- }
- }
- #endif
-
- if(!scanPtr.p->m_running_scan_frags.isEmpty()){
- jam();
- return;
- }
- const bool apiFail = (apiConnectptr.p->apiFailState == ZTRUE);
-
- if(!scanPtr.p->m_close_scan_req){
- jam();
- /**
- * The API hasn't order closing yet
- */
- return;
- }
- Uint32 ref = apiConnectptr.p->ndbapiBlockref;
- if(!apiFail && ref){
- jam();
- ScanTabConf * conf = (ScanTabConf*)&signal->theData[0];
- conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
- conf->requestInfo = ScanTabConf::EndOfData;
- conf->transId1 = apiConnectptr.p->transid[0];
- conf->transId2 = apiConnectptr.p->transid[1];
- sendSignal(ref, GSN_SCAN_TABCONF, signal, ScanTabConf::SignalLength, JBB);
- }
-
- releaseScanResources(scanPtr);
-
- if(apiFail){
- jam();
- /**
- * API has failed
- */
- handleApiFailState(signal, apiConnectptr.i);
- }
- }
- Dbtc::ScanRecordPtr
- Dbtc::seizeScanrec(Signal* signal) {
- ScanRecordPtr scanptr;
- scanptr.i = cfirstfreeScanrec;
- ptrCheckGuard(scanptr, cscanrecFileSize, scanRecord);
- cfirstfreeScanrec = scanptr.p->nextScan;
- scanptr.p->nextScan = RNIL;
- ndbrequire(scanptr.p->scanState == ScanRecord::IDLE);
- return scanptr;
- }//Dbtc::seizeScanrec()
- void Dbtc::sendScanFragReq(Signal* signal,
- ScanRecord* scanP,
- ScanFragRec* scanFragP)
- {
- ScanFragReq * const req = (ScanFragReq *)&signal->theData[0];
- Uint32 requestInfo = scanP->scanRequestInfo;
- ScanFragReq::setScanPrio(requestInfo, 1);
- apiConnectptr.i = scanP->scanApiRec;
- req->tableId = scanP->scanTableref;
- req->schemaVersion = scanP->scanSchemaVersion;
- ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
- req->senderData = scanFragptr.i;
- req->requestInfo = requestInfo;
- req->fragmentNoKeyLen = scanFragP->scanFragId | (scanP->scanKeyLen << 16);
- req->resultRef = apiConnectptr.p->ndbapiBlockref;
- req->savePointId = apiConnectptr.p->currSavePointId;
- req->transId1 = apiConnectptr.p->transid[0];
- req->transId2 = apiConnectptr.p->transid[1];
- req->clientOpPtr = scanFragP->m_apiPtr;
- req->batch_size_rows= scanP->batch_size_rows;
- req->batch_size_bytes= scanP->batch_byte_size;
- sendSignal(scanFragP->lqhBlockref, GSN_SCAN_FRAGREQ, signal,
- ScanFragReq::SignalLength, JBB);
- if(scanP->scanKeyLen > 0)
- {
- tcConnectptr.i = scanFragptr.i;
- packKeyData000Lab(signal, scanFragP->lqhBlockref, scanP->scanKeyLen);
- }
- updateBuddyTimer(apiConnectptr);
- scanFragP->startFragTimer(ctcTimer);
- }//Dbtc::sendScanFragReq()
- void Dbtc::sendScanTabConf(Signal* signal, ScanRecordPtr scanPtr) {
- jam();
- Uint32* ops = signal->getDataPtrSend()+4;
- Uint32 op_count = scanPtr.p->m_queued_count;
- if(4 + 3 * op_count > 25){
- jam();
- ops += 21;
- }
- Uint32 left = scanPtr.p->scanNoFrag - scanPtr.p->scanNextFragId;
-
- ScanTabConf * conf = (ScanTabConf*)&signal->theData[0];
- conf->apiConnectPtr = apiConnectptr.p->ndbapiConnect;
- conf->requestInfo = op_count;
- conf->transId1 = apiConnectptr.p->transid[0];
- conf->transId2 = apiConnectptr.p->transid[1];
- ScanFragRecPtr ptr;
- {
- ScanFragList queued(c_scan_frag_pool, scanPtr.p->m_queued_scan_frags);
- ScanFragList delivered(c_scan_frag_pool,scanPtr.p->m_delivered_scan_frags);
- for(queued.first(ptr); !ptr.isNull(); ){
- ndbrequire(ptr.p->scanFragState == ScanFragRec::QUEUED_FOR_DELIVERY);
- ScanFragRecPtr curr = ptr; // Remove while iterating...
- queued.next(ptr);
-
- bool done = curr.p->m_scan_frag_conf_status && --left;
- * ops++ = curr.p->m_apiPtr;
- * ops++ = done ? RNIL : curr.i;
- * ops++ = (curr.p->m_totalLen << 10) + curr.p->m_ops;
-
- queued.remove(curr);
- if(!done){
- delivered.add(curr);
- curr.p->scanFragState = ScanFragRec::DELIVERED;
- curr.p->stopFragTimer();
- } else {
- c_scan_frag_pool.release(curr);
- curr.p->scanFragState = ScanFragRec::COMPLETED;
- curr.p->stopFragTimer();
- }
- }
- }
-
- if(scanPtr.p->m_delivered_scan_frags.isEmpty() &&
- scanPtr.p->m_running_scan_frags.isEmpty()){
- conf->requestInfo = op_count | ScanTabConf::EndOfData;
- releaseScanResources(scanPtr);
- }