DblqhMain.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:703k
- break;
- }//switch
- }//if
- return;
- }//Dblqh::execATTRINFO()
- /* ************************************************************************>> */
- /* TUP_ATTRINFO: Interpreted execution in DBTUP generates redo-log info */
- /* which is sent back to DBLQH for logging. This is because the decision */
- /* to execute or not is made in DBTUP and thus we cannot start logging until */
- /* DBTUP part has been run. */
- /* ************************************************************************>> */
- void Dblqh::execTUP_ATTRINFO(Signal* signal)
- {
- TcConnectionrec *regTcConnectionrec = tcConnectionrec;
- Uint32 length = signal->length() - 3;
- Uint32 tcIndex = signal->theData[0];
- Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
- jamEntry();
- tcConnectptr.i = tcIndex;
- ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
- ndbrequire(tcConnectptr.p->transactionState == TcConnectionrec::WAIT_TUP);
- if (saveTupattrbuf(signal, &signal->theData[3], length) == ZOK) {
- return;
- } else {
- jam();
- /* ------------------------------------------------------------------------- */
- /* WE ARE WAITING FOR RESPONSE FROM TUP HERE. THUS WE NEED TO */
- /* GO THROUGH THE STATE MACHINE FOR THE OPERATION. */
- /* ------------------------------------------------------------------------- */
- localAbortStateHandlerLab(signal);
- }//if
- }//Dblqh::execTUP_ATTRINFO()
- /* ------------------------------------------------------------------------- */
- /* ------- HANDLE ATTRINFO FROM LQH ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- void Dblqh::lqhAttrinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->operation != ZREAD) {
- if (regTcPtr->opExec != 1) {
- if (saveTupattrbuf(signal, dataPtr, length) == ZOK) {
- ;
- } else {
- jam();
- /* ------------------------------------------------------------------------- */
- /* WE MIGHT BE WAITING FOR RESPONSE FROM SOME BLOCK HERE. THUS WE NEED TO */
- /* GO THROUGH THE STATE MACHINE FOR THE OPERATION. */
- /* ------------------------------------------------------------------------- */
- localAbortStateHandlerLab(signal);
- return;
- }//if
- }//if
- }//if
- Uint32 sig0 = regTcPtr->tupConnectrec;
- Uint32 blockNo = refToBlock(regTcPtr->tcTupBlockref);
- signal->theData[0] = sig0;
- EXECUTE_DIRECT(blockNo, GSN_ATTRINFO, signal, length + 3);
- jamEntry();
- }//Dblqh::lqhAttrinfoLab()
- /* ------------------------------------------------------------------------- */
- /* ------ FIND TRANSACTION BY USING HASH TABLE ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- int Dblqh::findTransaction(UintR Transid1, UintR Transid2, UintR TcOprec)
- {
- TcConnectionrec *regTcConnectionrec = tcConnectionrec;
- Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
- TcConnectionrecPtr locTcConnectptr;
- Uint32 ThashIndex = (Transid1 ^ TcOprec) & 1023;
- locTcConnectptr.i = ctransidHash[ThashIndex];
- while (locTcConnectptr.i != RNIL) {
- ptrCheckGuard(locTcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
- if ((locTcConnectptr.p->transid[0] == Transid1) &&
- (locTcConnectptr.p->transid[1] == Transid2) &&
- (locTcConnectptr.p->tcOprec == TcOprec)) {
- /* FIRST PART OF TRANSACTION CORRECT */
- /* SECOND PART ALSO CORRECT */
- /* THE OPERATION RECORD POINTER IN TC WAS ALSO CORRECT */
- jam();
- tcConnectptr.i = locTcConnectptr.i;
- tcConnectptr.p = locTcConnectptr.p;
- return (int)ZOK;
- }//if
- jam();
- /* THIS WAS NOT THE TRANSACTION WHICH WAS SOUGHT */
- locTcConnectptr.i = locTcConnectptr.p->nextHashRec;
- }//while
- /* WE DID NOT FIND THE TRANSACTION, REPORT NOT FOUND */
- return (int)ZNOT_FOUND;
- }//Dblqh::findTransaction()
- /* ------------------------------------------------------------------------- */
- /* ------- SAVE ATTRINFO FROM TUP IN ATTRINBUF ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- int Dblqh::saveTupattrbuf(Signal* signal, Uint32* dataPtr, Uint32 length)
- {
- Uint32 tfirstfreeAttrinbuf = cfirstfreeAttrinbuf;
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- Uint32 currTupAiLen = regTcPtr->currTupAiLen;
- if (tfirstfreeAttrinbuf == RNIL) {
- jam();
- terrorCode = ZGET_ATTRINBUF_ERROR;
- return ZGET_ATTRINBUF_ERROR;
- }//if
- seizeAttrinbuf(signal);
- Attrbuf * const regAttrPtr = attrinbufptr.p;
- MEMCOPY_NO_WORDS(®AttrPtr->attrbuf[0], dataPtr, length);
- regTcPtr->currTupAiLen = currTupAiLen + length;
- regAttrPtr->attrbuf[ZINBUF_DATA_LEN] = length;
- return ZOK;
- }//Dblqh::saveTupattrbuf()
- /* ==========================================================================
- * ======= SEIZE ATTRIBUTE IN BUFFER =======
- *
- * GET A NEW ATTRINBUF AND SETS ATTRINBUFPTR.
- * ========================================================================= */
- void Dblqh::seizeAttrinbuf(Signal* signal)
- {
- AttrbufPtr tmpAttrinbufptr;
- AttrbufPtr regAttrinbufptr;
- Attrbuf *regAttrbuf = attrbuf;
- Uint32 tattrinbufFileSize = cattrinbufFileSize;
- regAttrinbufptr.i = seize_attrinbuf();
- tmpAttrinbufptr.i = tcConnectptr.p->lastAttrinbuf;
- ptrCheckGuard(regAttrinbufptr, tattrinbufFileSize, regAttrbuf);
- tcConnectptr.p->lastAttrinbuf = regAttrinbufptr.i;
- regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN] = 0;
- if (tmpAttrinbufptr.i == RNIL) {
- jam();
- tcConnectptr.p->firstAttrinbuf = regAttrinbufptr.i;
- } else {
- jam();
- ptrCheckGuard(tmpAttrinbufptr, tattrinbufFileSize, regAttrbuf);
- tmpAttrinbufptr.p->attrbuf[ZINBUF_NEXT] = regAttrinbufptr.i;
- }//if
- regAttrinbufptr.p->attrbuf[ZINBUF_NEXT] = RNIL;
- attrinbufptr = regAttrinbufptr;
- }//Dblqh::seizeAttrinbuf()
- /* ==========================================================================
- * ======= SEIZE TC CONNECT RECORD =======
- *
- * GETS A NEW TC CONNECT RECORD FROM FREELIST.
- * ========================================================================= */
- void Dblqh::seizeTcrec()
- {
- TcConnectionrecPtr locTcConnectptr;
- locTcConnectptr.i = cfirstfreeTcConrec;
- ptrCheckGuard(locTcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- Uint32 nextTc = locTcConnectptr.p->nextTcConnectrec;
- locTcConnectptr.p->nextTcConnectrec = RNIL;
- locTcConnectptr.p->clientConnectrec = RNIL;
- locTcConnectptr.p->clientBlockref = RNIL;
- locTcConnectptr.p->abortState = TcConnectionrec::ABORT_IDLE;
- locTcConnectptr.p->tcTimer = cLqhTimeOutCount;
- locTcConnectptr.p->tableref = RNIL;
- locTcConnectptr.p->savePointId = 0;
- cfirstfreeTcConrec = nextTc;
- tcConnectptr = locTcConnectptr;
- locTcConnectptr.p->connectState = TcConnectionrec::CONNECTED;
- }//Dblqh::seizeTcrec()
- /* ==========================================================================
- * ======= SEIZE DATA BUFFER =======
- * ========================================================================= */
- void Dblqh::seizeTupkeybuf(Signal* signal)
- {
- Databuf *regDatabuf = databuf;
- DatabufPtr tmpDatabufptr;
- DatabufPtr regDatabufptr;
- Uint32 tdatabufFileSize = cdatabufFileSize;
- /* ------- GET A DATABUF. ------- */
- regDatabufptr.i = cfirstfreeDatabuf;
- tmpDatabufptr.i = tcConnectptr.p->lastTupkeybuf;
- ptrCheckGuard(regDatabufptr, tdatabufFileSize, regDatabuf);
- Uint32 nextFirst = regDatabufptr.p->nextDatabuf;
- tcConnectptr.p->lastTupkeybuf = regDatabufptr.i;
- if (tmpDatabufptr.i == RNIL) {
- jam();
- tcConnectptr.p->firstTupkeybuf = regDatabufptr.i;
- } else {
- jam();
- ptrCheckGuard(tmpDatabufptr, tdatabufFileSize, regDatabuf);
- tmpDatabufptr.p->nextDatabuf = regDatabufptr.i;
- }//if
- cfirstfreeDatabuf = nextFirst;
- regDatabufptr.p->nextDatabuf = RNIL;
- databufptr = regDatabufptr;
- }//Dblqh::seizeTupkeybuf()
- /* ------------------------------------------------------------------------- */
- /* ------- TAKE CARE OF LQHKEYREQ ------- */
- /* LQHKEYREQ IS THE SIGNAL THAT STARTS ALL OPERATIONS IN THE LQH BLOCK */
- /* THIS SIGNAL CONTAINS A LOT OF INFORMATION ABOUT WHAT TYPE OF OPERATION, */
- /* KEY INFORMATION, ATTRIBUTE INFORMATION, NODE INFORMATION AND A LOT MORE */
- /* ------------------------------------------------------------------------- */
- void Dblqh::execLQHKEYREQ(Signal* signal)
- {
- UintR sig0, sig1, sig2, sig3, sig4, sig5;
- Uint8 tfragDistKey;
- const LqhKeyReq * const lqhKeyReq = (LqhKeyReq *)signal->getDataPtr();
- sig0 = lqhKeyReq->clientConnectPtr;
- if (cfirstfreeTcConrec != RNIL && !ERROR_INSERTED(5031)) {
- jamEntry();
- seizeTcrec();
- } else {
- /* ------------------------------------------------------------------------- */
- /* NO FREE TC RECORD AVAILABLE, THUS WE CANNOT HANDLE THE REQUEST. */
- /* ------------------------------------------------------------------------- */
- if (ERROR_INSERTED(5031)) {
- CLEAR_ERROR_INSERT_VALUE;
- }
- noFreeRecordLab(signal, lqhKeyReq, ZNO_TC_CONNECT_ERROR);
- return;
- }//if
- if(ERROR_INSERTED(5038) &&
- refToNode(signal->getSendersBlockRef()) != getOwnNodeId()){
- jam();
- SET_ERROR_INSERT_VALUE(5039);
- return;
- }
-
- c_Counters.operations++;
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- regTcPtr->clientBlockref = signal->senderBlockRef();
- regTcPtr->clientConnectrec = sig0;
- regTcPtr->tcOprec = sig0;
- regTcPtr->storedProcId = ZNIL;
- UintR TtotReclenAi = lqhKeyReq->attrLen;
- sig1 = lqhKeyReq->savePointId;
- sig2 = lqhKeyReq->hashValue;
- UintR Treqinfo = lqhKeyReq->requestInfo;
- sig4 = lqhKeyReq->tableSchemaVersion;
- sig5 = lqhKeyReq->tcBlockref;
- regTcPtr->savePointId = sig1;
- regTcPtr->hashValue = sig2;
- const Uint32 schemaVersion = regTcPtr->schemaVersion = LqhKeyReq::getSchemaVersion(sig4);
- tabptr.i = LqhKeyReq::getTableId(sig4);
- regTcPtr->tcBlockref = sig5;
- const Uint8 op = LqhKeyReq::getOperation(Treqinfo);
- if (op == ZREAD && !getAllowRead()){
- noFreeRecordLab(signal, lqhKeyReq, ZNODE_SHUTDOWN_IN_PROGESS);
- return;
- }
- regTcPtr->totReclenAi = LqhKeyReq::getAttrLen(TtotReclenAi);
- regTcPtr->tcScanInfo = lqhKeyReq->scanInfo;
- regTcPtr->indTakeOver = LqhKeyReq::getScanTakeOverFlag(TtotReclenAi);
- regTcPtr->readlenAi = 0;
- regTcPtr->currTupAiLen = 0;
- regTcPtr->listState = TcConnectionrec::NOT_IN_LIST;
- regTcPtr->logWriteState = TcConnectionrec::NOT_STARTED;
- regTcPtr->fragmentptr = RNIL;
- sig0 = lqhKeyReq->fragmentData;
- sig1 = lqhKeyReq->transId1;
- sig2 = lqhKeyReq->transId2;
- sig3 = lqhKeyReq->variableData[0];
- sig4 = lqhKeyReq->variableData[1];
- regTcPtr->fragmentid = LqhKeyReq::getFragmentId(sig0);
- regTcPtr->nextReplica = LqhKeyReq::getNextReplicaNodeId(sig0);
- regTcPtr->transid[0] = sig1;
- regTcPtr->transid[1] = sig2;
- regTcPtr->applRef = sig3;
- regTcPtr->applOprec = sig4;
- regTcPtr->commitAckMarker = RNIL;
- if(LqhKeyReq::getMarkerFlag(Treqinfo)){
- jam();
-
- CommitAckMarkerPtr markerPtr;
- m_commitAckMarkerHash.seize(markerPtr);
- if(markerPtr.i == RNIL){
- noFreeRecordLab(signal, lqhKeyReq, ZNO_FREE_MARKER_RECORDS_ERROR);
- return;
- }
- markerPtr.p->transid1 = sig1;
- markerPtr.p->transid2 = sig2;
- markerPtr.p->apiRef = sig3;
- markerPtr.p->apiOprec = sig4;
- const NodeId tcNodeId = refToNode(sig5);
- markerPtr.p->tcNodeId = tcNodeId;
-
- CommitAckMarkerPtr tmp;
- #ifdef VM_TRACE
- #ifdef MARKER_TRACE
- ndbout_c("Add marker[%.8x %.8x]", markerPtr.p->transid1, markerPtr.p->transid2);
- #endif
- ndbrequire(!m_commitAckMarkerHash.find(tmp, * markerPtr.p));
- #endif
- m_commitAckMarkerHash.add(markerPtr);
- regTcPtr->commitAckMarker = markerPtr.i;
- }
-
- regTcPtr->reqinfo = Treqinfo;
- regTcPtr->lastReplicaNo = LqhKeyReq::getLastReplicaNo(Treqinfo);
- regTcPtr->lockType = LqhKeyReq::getLockType(Treqinfo);
- regTcPtr->dirtyOp = LqhKeyReq::getDirtyFlag(Treqinfo);
- regTcPtr->opExec = LqhKeyReq::getInterpretedFlag(Treqinfo);
- regTcPtr->opSimple = LqhKeyReq::getSimpleFlag(Treqinfo);
- regTcPtr->operation = LqhKeyReq::getOperation(Treqinfo);
- regTcPtr->simpleRead = regTcPtr->operation == ZREAD && regTcPtr->opSimple;
- regTcPtr->seqNoReplica = LqhKeyReq::getSeqNoReplica(Treqinfo);
- UintR TreclenAiLqhkey = LqhKeyReq::getAIInLqhKeyReq(Treqinfo);
- regTcPtr->apiVersionNo = 0;
-
- CRASH_INSERTION2(5041, regTcPtr->simpleRead &&
- refToNode(signal->senderBlockRef()) != cownNodeid);
-
- regTcPtr->reclenAiLqhkey = TreclenAiLqhkey;
- regTcPtr->currReclenAi = TreclenAiLqhkey;
- UintR TitcKeyLen = LqhKeyReq::getKeyLen(Treqinfo);
- regTcPtr->primKeyLen = TitcKeyLen;
- regTcPtr->noFiredTriggers = lqhKeyReq->noFiredTriggers;
- UintR TapplAddressInd = LqhKeyReq::getApplicationAddressFlag(Treqinfo);
- UintR nextPos = (TapplAddressInd << 1);
- UintR TsameClientAndTcOprec = LqhKeyReq::getSameClientAndTcFlag(Treqinfo);
- if (TsameClientAndTcOprec == 1) {
- regTcPtr->tcOprec = lqhKeyReq->variableData[nextPos];
- nextPos++;
- }//if
- UintR TnextReplicasIndicator = regTcPtr->lastReplicaNo -
- regTcPtr->seqNoReplica;
- if (TnextReplicasIndicator > 1) {
- regTcPtr->nodeAfterNext[0] = lqhKeyReq->variableData[nextPos] & 0xFFFF;
- regTcPtr->nodeAfterNext[1] = lqhKeyReq->variableData[nextPos] >> 16;
- nextPos++;
- }//if
- UintR TstoredProcIndicator = LqhKeyReq::getStoredProcFlag(TtotReclenAi);
- if (TstoredProcIndicator == 1) {
- regTcPtr->storedProcId = lqhKeyReq->variableData[nextPos] & ZNIL;
- nextPos++;
- }//if
- UintR TreadLenAiIndicator = LqhKeyReq::getReturnedReadLenAIFlag(Treqinfo);
- if (TreadLenAiIndicator == 1) {
- regTcPtr->readlenAi = lqhKeyReq->variableData[nextPos] & ZNIL;
- nextPos++;
- }//if
- sig0 = lqhKeyReq->variableData[nextPos + 0];
- sig1 = lqhKeyReq->variableData[nextPos + 1];
- sig2 = lqhKeyReq->variableData[nextPos + 2];
- sig3 = lqhKeyReq->variableData[nextPos + 3];
- regTcPtr->tupkeyData[0] = sig0;
- regTcPtr->tupkeyData[1] = sig1;
- regTcPtr->tupkeyData[2] = sig2;
- regTcPtr->tupkeyData[3] = sig3;
- if (TitcKeyLen > 0) {
- if (TitcKeyLen < 4) {
- nextPos += TitcKeyLen;
- } else {
- nextPos += 4;
- }//if
- } else {
- LQHKEY_error(signal, 3);
- return;
- }//if
- if ((LqhKeyReq::FixedSignalLength + nextPos + TreclenAiLqhkey) !=
- signal->length()) {
- LQHKEY_error(signal, 2);
- return;
- }//if
- UintR TseqNoReplica = regTcPtr->seqNoReplica;
- UintR TlastReplicaNo = regTcPtr->lastReplicaNo;
- if (TseqNoReplica == TlastReplicaNo) {
- jam();
- regTcPtr->nextReplica = ZNIL;
- } else {
- if (TseqNoReplica < TlastReplicaNo) {
- jam();
- regTcPtr->nextSeqNoReplica = TseqNoReplica + 1;
- if ((regTcPtr->nextReplica == 0) ||
- (regTcPtr->nextReplica == cownNodeid)) {
- LQHKEY_error(signal, 0);
- }//if
- } else {
- LQHKEY_error(signal, 4);
- return;
- }//if
- }//if
- TcConnectionrecPtr localNextTcConnectptr;
- Uint32 hashIndex = (regTcPtr->transid[0] ^ regTcPtr->tcOprec) & 1023;
- localNextTcConnectptr.i = ctransidHash[hashIndex];
- ctransidHash[hashIndex] = tcConnectptr.i;
- regTcPtr->prevHashRec = RNIL;
- regTcPtr->nextHashRec = localNextTcConnectptr.i;
- if (localNextTcConnectptr.i != RNIL) {
- /* -------------------------------------------------------------------------- */
- /* ENSURE THAT THE NEXT RECORD HAS SET PREVIOUS TO OUR RECORD IF IT EXISTS */
- /* -------------------------------------------------------------------------- */
- ptrCheckGuard(localNextTcConnectptr,
- ctcConnectrecFileSize, tcConnectionrec);
- jam();
- localNextTcConnectptr.p->prevHashRec = tcConnectptr.i;
- }//if
- if (tabptr.i >= ctabrecFileSize) {
- LQHKEY_error(signal, 5);
- return;
- }//if
- ptrAss(tabptr, tablerec);
- if(tabptr.p->tableStatus != Tablerec::TABLE_DEFINED){
- LQHKEY_abort(signal, 4);
- return;
- }
- if(table_version_major(tabptr.p->schemaVersion) !=
- table_version_major(schemaVersion)){
- LQHKEY_abort(signal, 5);
- return;
- }
- regTcPtr->tableref = tabptr.i;
- tabptr.p->usageCount++;
-
- if (!getFragmentrec(signal, regTcPtr->fragmentid)) {
- LQHKEY_error(signal, 6);
- return;
- }//if
- regTcPtr->localFragptr = (regTcPtr->hashValue >> fragptr.p->hashCheckBit) & 1;
- Uint8 TcopyType = fragptr.p->fragCopy;
- tfragDistKey = fragptr.p->fragDistributionKey;
- if (fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION) {
- jam();
- regTcPtr->activeCreat = ZTRUE;
- CRASH_INSERTION(5002);
- } else {
- regTcPtr->activeCreat = ZFALSE;
- }//if
- regTcPtr->replicaType = TcopyType;
- regTcPtr->fragmentptr = fragptr.i;
- Uint8 TdistKey = LqhKeyReq::getDistributionKey(TtotReclenAi);
- if ((tfragDistKey != TdistKey) &&
- (regTcPtr->seqNoReplica == 0) &&
- (regTcPtr->dirtyOp == ZFALSE) &&
- (regTcPtr->simpleRead == ZFALSE)) {
- /* ----------------------------------------------------------------------
- * WE HAVE DIFFERENT OPINION THAN THE DIH THAT STARTED THE TRANSACTION.
- * THE REASON COULD BE THAT THIS IS AN OLD DISTRIBUTION WHICH IS NO LONGER
- * VALID TO USE. THIS MUST BE CHECKED.
- * ONE IS ADDED TO THE DISTRIBUTION KEY EVERY TIME WE ADD A NEW REPLICA.
- * FAILED REPLICAS DO NOT AFFECT THE DISTRIBUTION KEY. THIS MEANS THAT THE
- * MAXIMUM DEVIATION CAN BE ONE BETWEEN THOSE TWO VALUES.
- * --------------------------------------------------------------------- */
- Int32 tmp = TdistKey - tfragDistKey;
- tmp = (tmp < 0 ? - tmp : tmp);
- if ((tmp <= 1) || (tfragDistKey == 0)) {
- LQHKEY_abort(signal, 0);
- return;
- }//if
- LQHKEY_error(signal, 1);
- }//if
- if (TreclenAiLqhkey != 0) {
- if (regTcPtr->operation != ZREAD) {
- if (regTcPtr->operation != ZDELETE) {
- if (regTcPtr->opExec != 1) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* */
- /* UPDATES, WRITES AND INSERTS THAT ARE NOT INTERPRETED WILL USE THE */
- /* SAME ATTRINFO IN ALL REPLICAS. THUS WE SAVE THE ATTRINFO ALREADY */
- /* TO SAVE A SIGNAL FROM TUP TO LQH. INTERPRETED EXECUTION IN TUP */
- /* WILL CREATE NEW ATTRINFO FOR THE OTHER REPLICAS AND IT IS THUS NOT */
- /* A GOOD IDEA TO SAVE THE INFORMATION HERE. READS WILL ALSO BE */
- /* UNNECESSARY TO SAVE SINCE THAT ATTRINFO WILL NEVER BE SENT TO ANY */
- /* MORE REPLICAS. */
- /*---------------------------------------------------------------------------*/
- /* READS AND DELETES CAN ONLY HAVE INFORMATION ABOUT WHAT IS TO BE READ. */
- /* NO INFORMATION THAT NEEDS LOGGING. */
- /*---------------------------------------------------------------------------*/
- sig0 = lqhKeyReq->variableData[nextPos + 0];
- sig1 = lqhKeyReq->variableData[nextPos + 1];
- sig2 = lqhKeyReq->variableData[nextPos + 2];
- sig3 = lqhKeyReq->variableData[nextPos + 3];
- sig4 = lqhKeyReq->variableData[nextPos + 4];
- regTcPtr->firstAttrinfo[0] = sig0;
- regTcPtr->firstAttrinfo[1] = sig1;
- regTcPtr->firstAttrinfo[2] = sig2;
- regTcPtr->firstAttrinfo[3] = sig3;
- regTcPtr->firstAttrinfo[4] = sig4;
- regTcPtr->currTupAiLen = TreclenAiLqhkey;
- } else {
- jam();
- regTcPtr->reclenAiLqhkey = 0;
- }//if
- } else {
- jam();
- regTcPtr->reclenAiLqhkey = 0;
- }//if
- }//if
- sig0 = lqhKeyReq->variableData[nextPos + 0];
- sig1 = lqhKeyReq->variableData[nextPos + 1];
- sig2 = lqhKeyReq->variableData[nextPos + 2];
- sig3 = lqhKeyReq->variableData[nextPos + 3];
- sig4 = lqhKeyReq->variableData[nextPos + 4];
- signal->theData[0] = regTcPtr->tupConnectrec;
- signal->theData[3] = sig0;
- signal->theData[4] = sig1;
- signal->theData[5] = sig2;
- signal->theData[6] = sig3;
- signal->theData[7] = sig4;
- EXECUTE_DIRECT(refToBlock(regTcPtr->tcTupBlockref), GSN_ATTRINFO,
- signal, TreclenAiLqhkey + 3);
- jamEntry();
- if (signal->theData[0] == (UintR)-1) {
- LQHKEY_abort(signal, 2);
- return;
- }//if
- }//if
- /* ------- TAKE CARE OF PRIM KEY DATA ------- */
- if (regTcPtr->primKeyLen <= 4) {
- endgettupkeyLab(signal);
- return;
- } else {
- jam();
- /*--------------------------------------------------------------------*/
- /* KEY LENGTH WAS MORE THAN 4 WORDS (WORD = 4 BYTE). THUS WE */
- /* HAVE TO ALLOCATE A DATA BUFFER TO STORE THE KEY DATA AND */
- /* WAIT FOR THE KEYINFO SIGNAL. */
- /*--------------------------------------------------------------------*/
- regTcPtr->save1 = 4;
- regTcPtr->transactionState = TcConnectionrec::WAIT_TUPKEYINFO;
- return;
- }//if
- return;
- }//Dblqh::execLQHKEYREQ()
- void Dblqh::endgettupkeyLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->totReclenAi == regTcPtr->currReclenAi) {
- ;
- } else {
- jam();
- ndbrequire(regTcPtr->currReclenAi < regTcPtr->totReclenAi);
- regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
- return;
- }//if
- /* ---------------------------------------------------------------------- */
- /* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/
- /* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */
- /* WITHOUT NETWORK REDUNDANCY OR PREPARE-TO-COMMIT ACTIVATED THE */
- /* PREPARATION TO SEND TO THE NEXT NODE WILL START IMMEDIATELY. */
- /* */
- /* OTHERWISE THE PROCESSING WILL START AFTER SETTING THE PROPER */
- /* STATE. HOWEVER BEFORE PROCESSING THE MESSAGE */
- /* IT IS NECESSARY TO CHECK THAT THE FRAGMENT IS NOT PERFORMING */
- /* A CHECKPOINT. THE OPERATION SHALL ALSO BE LINKED INTO THE */
- /* FRAGMENT QUEUE OR LIST OF ACTIVE OPERATIONS. */
- /* */
- /* THE FIRST STEP IN PROCESSING THE MESSAGE IS TO CONTACT DBACC. */
- /*------------------------------------------------------------------------*/
- switch (fragptr.p->fragStatus) {
- case Fragrecord::FSACTIVE:
- case Fragrecord::CRASH_RECOVERING:
- case Fragrecord::ACTIVE_CREATION:
- linkActiveFrag(signal);
- prepareContinueAfterBlockedLab(signal);
- return;
- break;
- case Fragrecord::BLOCKED:
- jam();
- linkFragQueue(signal);
- regTcPtr->transactionState = TcConnectionrec::STOPPED;
- return;
- break;
- case Fragrecord::FREE:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- ndbrequire(false);
- break;
- }//switch
- return;
- }//Dblqh::endgettupkeyLab()
- void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
- {
- UintR ttcScanOp;
- UintR taccreq;
- /* -------------------------------------------------------------------------- */
- /* INPUT: TC_CONNECTPTR ACTIVE CONNECTION RECORD */
- /* FRAGPTR FRAGMENT RECORD */
- /* -------------------------------------------------------------------------- */
- /* -------------------------------------------------------------------------- */
- /* CONTINUE HERE AFTER BEING BLOCKED FOR A WHILE DURING LOCAL CHECKPOINT. */
- /* -------------------------------------------------------------------------- */
- /* ALSO AFTER NORMAL PROCEDURE WE CONTINUE HERE */
- /* -------------------------------------------------------------------------- */
- Uint32 tc_ptr_i = tcConnectptr.i;
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->indTakeOver == ZTRUE) {
- jam();
- ttcScanOp = KeyInfo20::getScanOp(regTcPtr->tcScanInfo);
- scanptr.i = RNIL;
- {
- ScanRecord key;
- key.scanNumber = KeyInfo20::getScanNo(regTcPtr->tcScanInfo);
- key.fragPtrI = fragptr.i;
- c_scanTakeOverHash.find(scanptr, key);
- #ifdef TRACE_SCAN_TAKEOVER
- if(scanptr.i == RNIL)
- ndbout_c("not finding (%d %d)", key.scanNumber, key.fragPtrI);
- #endif
- }
- if (scanptr.i == RNIL) {
- jam();
- releaseActiveFrag(signal);
- takeOverErrorLab(signal);
- return;
- }//if
- Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p,
- ttcScanOp,
- true);
- if (accOpPtr == RNIL) {
- jam();
- releaseActiveFrag(signal);
- takeOverErrorLab(signal);
- return;
- }//if
- signal->theData[1] = accOpPtr;
- signal->theData[2] = regTcPtr->transid[0];
- signal->theData[3] = regTcPtr->transid[1];
- EXECUTE_DIRECT(refToBlock(regTcPtr->tcAccBlockref), GSN_ACC_TO_REQ,
- signal, 4);
- if (signal->theData[0] == (UintR)-1) {
- execACC_TO_REF(signal);
- return;
- }//if
- jamEntry();
- }//if
- /*-------------------------------------------------------------------*/
- /* IT IS NOW TIME TO CONTACT ACC. THE TUPLE KEY WILL BE SENT */
- /* AND THIS WILL BE TRANSLATED INTO A LOCAL KEY BY USING THE */
- /* LOCAL PART OF THE LH3-ALGORITHM. ALSO PROPER LOCKS ON THE */
- /* TUPLE WILL BE SET. FOR INSERTS AND DELETES THE MESSAGE WILL */
- /* START AN INSERT/DELETE INTO THE HASH TABLE. */
- /* */
- /* BEFORE SENDING THE MESSAGE THE REQUEST INFORMATION IS SET */
- /* PROPERLY. */
- /* ----------------------------------------------------------------- */
- #if 0
- if (regTcPtr->tableref != 0) {
- switch (regTcPtr->operation) {
- case ZREAD: ndbout << "Läsning "; break;
- case ZUPDATE: ndbout << " Uppdatering "; break;
- case ZWRITE: ndbout << "Write "; break;
- case ZINSERT: ndbout << "Inläggning "; break;
- case ZDELETE: ndbout << "Borttagning "; break;
- default: ndbout << "????"; break;
- }
- ndbout << "med nyckel = " << regTcPtr->tupkeyData[0] << endl;
- }
- #endif
-
- regTcPtr->transactionState = TcConnectionrec::WAIT_ACC;
- taccreq = regTcPtr->operation;
- taccreq = taccreq + (regTcPtr->opSimple << 3);
- taccreq = taccreq + (regTcPtr->lockType << 4);
- taccreq = taccreq + (regTcPtr->dirtyOp << 6);
- taccreq = taccreq + (regTcPtr->replicaType << 7);
- taccreq = taccreq + (regTcPtr->apiVersionNo << 9);
- /* ************ */
- /* ACCKEYREQ < */
- /* ************ */
- ndbrequire(regTcPtr->localFragptr < 2);
- Uint32 sig0, sig1, sig2, sig3, sig4;
- sig0 = regTcPtr->accConnectrec;
- sig1 = fragptr.p->accFragptr[regTcPtr->localFragptr];
- sig2 = regTcPtr->hashValue;
- sig3 = regTcPtr->primKeyLen;
- sig4 = regTcPtr->transid[0];
- signal->theData[0] = sig0;
- signal->theData[1] = sig1;
- signal->theData[2] = taccreq;
- signal->theData[3] = sig2;
- signal->theData[4] = sig3;
- signal->theData[5] = sig4;
- sig0 = regTcPtr->transid[1];
- sig1 = regTcPtr->tupkeyData[0];
- sig2 = regTcPtr->tupkeyData[1];
- sig3 = regTcPtr->tupkeyData[2];
- sig4 = regTcPtr->tupkeyData[3];
- signal->theData[6] = sig0;
- signal->theData[7] = sig1;
- signal->theData[8] = sig2;
- signal->theData[9] = sig3;
- signal->theData[10] = sig4;
- if (regTcPtr->primKeyLen > 4) {
- sendKeyinfoAcc(signal, 11);
- }//if
- EXECUTE_DIRECT(refToBlock(regTcPtr->tcAccBlockref), GSN_ACCKEYREQ,
- signal, 7 + regTcPtr->primKeyLen);
- if (signal->theData[0] < RNIL) {
- signal->theData[0] = tc_ptr_i;
- execACCKEYCONF(signal);
- return;
- } else if (signal->theData[0] == RNIL) {
- ;
- } else {
- ndbrequire(signal->theData[0] == (UintR)-1);
- signal->theData[0] = tc_ptr_i;
- execACCKEYREF(signal);
- }//if
- return;
- }//Dblqh::prepareContinueAfterBlockedLab()
- /* ========================================================================== */
- /* ======= SEND KEYINFO TO ACC ======= */
- /* */
- /* ========================================================================== */
- void Dblqh::sendKeyinfoAcc(Signal* signal, Uint32 Ti)
- {
- DatabufPtr regDatabufptr;
- regDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
-
- do {
- jam();
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- Uint32 sig0 = regDatabufptr.p->data[0];
- Uint32 sig1 = regDatabufptr.p->data[1];
- Uint32 sig2 = regDatabufptr.p->data[2];
- Uint32 sig3 = regDatabufptr.p->data[3];
- signal->theData[Ti] = sig0;
- signal->theData[Ti + 1] = sig1;
- signal->theData[Ti + 2] = sig2;
- signal->theData[Ti + 3] = sig3;
- regDatabufptr.i = regDatabufptr.p->nextDatabuf;
- Ti += 4;
- } while (regDatabufptr.i != RNIL);
- }//Dblqh::sendKeyinfoAcc()
- void Dblqh::execLQH_ALLOCREQ(Signal* signal)
- {
- TcConnectionrecPtr regTcPtr;
- FragrecordPtr regFragptr;
- jamEntry();
- regTcPtr.i = signal->theData[0];
- ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
- regFragptr.i = regTcPtr.p->fragmentptr;
- ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
- ndbrequire(regTcPtr.p->localFragptr < 2);
- signal->theData[0] = regTcPtr.p->tupConnectrec;
- signal->theData[1] = regFragptr.p->tupFragptr[regTcPtr.p->localFragptr];
- signal->theData[2] = regTcPtr.p->tableref;
- Uint32 tup = refToBlock(regTcPtr.p->tcTupBlockref);
- EXECUTE_DIRECT(tup, GSN_TUP_ALLOCREQ, signal, 3);
- }//Dblqh::execTUP_ALLOCREQ()
- /* ************>> */
- /* ACCKEYCONF > */
- /* ************>> */
- void Dblqh::execACCKEYCONF(Signal* signal)
- {
- TcConnectionrec *regTcConnectionrec = tcConnectionrec;
- Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
- Uint32 tcIndex = signal->theData[0];
- Uint32 Tfragid = signal->theData[2];
- Uint32 localKey1 = signal->theData[3];
- Uint32 localKey2 = signal->theData[4];
- Uint32 localKeyFlag = signal->theData[5];
- jamEntry();
- tcConnectptr.i = tcIndex;
- ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->transactionState != TcConnectionrec::WAIT_ACC) {
- LQHKEY_abort(signal, 3);
- return;
- }//if
- /* ------------------------------------------------------------------------
- * Set transaction state and also reset the activeCreat since that is only
- * valid in cases where the record was not present.
- * ------------------------------------------------------------------------ */
- regTcPtr->transactionState = TcConnectionrec::WAIT_TUP;
- regTcPtr->activeCreat = ZFALSE;
- /* ------------------------------------------------------------------------
- * IT IS NOW TIME TO CONTACT THE TUPLE MANAGER. THE TUPLE MANAGER NEEDS THE
- * INFORMATION ON WHICH TABLE AND FRAGMENT, THE LOCAL KEY AND IT NEEDS TO
- * KNOW THE TYPE OF OPERATION TO PERFORM. TUP CAN SEND THE ATTRINFO DATA
- * EITHER TO THE TC BLOCK OR DIRECTLY TO THE APPLICATION. THE SCHEMA VERSION
- * IS NEEDED SINCE TWO SCHEMA VERSIONS CAN BE ACTIVE SIMULTANEOUSLY ON A
- * TABLE.
- * ----------------------------------------------------------------------- */
- if (regTcPtr->operation == ZWRITE)
- {
- Uint32 op= signal->theData[1];
- if(likely(op == ZINSERT || op == ZUPDATE))
- {
- regTcPtr->operation = op;
- }
- else
- {
- warningEvent("Convering %d to ZUPDATE", op);
- regTcPtr->operation = ZUPDATE;
- }
- }//if
-
- ndbrequire(localKeyFlag == 1);
- localKey2 = localKey1 & MAX_TUPLES_PER_PAGE;
- localKey1 = localKey1 >> MAX_TUPLES_BITS;
- Uint32 Ttupreq = regTcPtr->dirtyOp;
- Ttupreq = Ttupreq + (regTcPtr->opSimple << 1);
- Ttupreq = Ttupreq + (regTcPtr->operation << 6);
- Ttupreq = Ttupreq + (regTcPtr->opExec << 10);
- Ttupreq = Ttupreq + (regTcPtr->apiVersionNo << 11);
- /* ---------------------------------------------------------------------
- * Clear interpreted mode bit since we do not want the next replica to
- * use interpreted mode. The next replica will receive a normal write.
- * --------------------------------------------------------------------- */
- regTcPtr->opExec = 0;
- /* ************< */
- /* TUPKEYREQ < */
- /* ************< */
- TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
- Uint32 sig0, sig1, sig2, sig3;
- sig0 = regTcPtr->tupConnectrec;
- sig1 = regTcPtr->tableref;
- tupKeyReq->connectPtr = sig0;
- tupKeyReq->request = Ttupreq;
- tupKeyReq->tableRef = sig1;
- tupKeyReq->fragId = Tfragid;
- tupKeyReq->keyRef1 = localKey1;
- tupKeyReq->keyRef2 = localKey2;
- sig0 = regTcPtr->totReclenAi;
- sig1 = regTcPtr->applOprec;
- sig2 = regTcPtr->applRef;
- sig3 = regTcPtr->schemaVersion;
- FragrecordPtr regFragptr;
- regFragptr.i = regTcPtr->fragmentptr;
- ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
- tupKeyReq->attrBufLen = sig0;
- tupKeyReq->opRef = sig1;
- tupKeyReq->applRef = sig2;
- tupKeyReq->schemaVersion = sig3;
- ndbrequire(regTcPtr->localFragptr < 2);
- sig0 = regTcPtr->storedProcId;
- sig1 = regTcPtr->transid[0];
- sig2 = regTcPtr->transid[1];
- sig3 = regFragptr.p->tupFragptr[regTcPtr->localFragptr];
- Uint32 tup = refToBlock(regTcPtr->tcTupBlockref);
- tupKeyReq->storedProcedure = sig0;
- tupKeyReq->transId1 = sig1;
- tupKeyReq->transId2 = sig2;
- tupKeyReq->fragPtr = sig3;
- tupKeyReq->primaryReplica = (tcConnectptr.p->seqNoReplica == 0)?true:false;
- tupKeyReq->coordinatorTC = tcConnectptr.p->tcBlockref;
- tupKeyReq->tcOpIndex = tcConnectptr.p->tcOprec;
- tupKeyReq->savePointId = tcConnectptr.p->savePointId;
- EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
- }//Dblqh::execACCKEYCONF()
- /* --------------------------------------------------------------------------
- * ------- ENTER TUP... -------
- * ENTER TUPKEYCONF WITH
- * TC_CONNECTPTR,
- * TDATA2, LOCAL KEY REFERENCE 1, ONLY INTERESTING AFTER INSERT
- * TDATA3, LOCAL KEY REFERENCE 1, ONLY INTERESTING AFTER INSERT
- * TDATA4, TOTAL LENGTH OF READ DATA SENT TO TC/APPLICATION
- * TDATA5 TOTAL LENGTH OF UPDATE DATA SENT TO/FROM TUP
- * GOTO TUPKEY_CONF
- *
- * TAKE CARE OF RESPONSES FROM TUPLE MANAGER.
- * -------------------------------------------------------------------------- */
- void Dblqh::tupkeyConfLab(Signal* signal)
- {
- /* ---- GET OPERATION TYPE AND CHECK WHAT KIND OF OPERATION IS REQUESTED ---- */
- const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->simpleRead) {
- jam();
- /* ----------------------------------------------------------------------
- * THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION.
- * SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK (FOR LOCAL CHECKPOINTS) YET
- * WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED.
- * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH
- * ---------------------------------------------------------------------- */
- regTcPtr->gci = cnewestGci;
- releaseActiveFrag(signal);
- commitContinueAfterBlockedLab(signal);
- return;
- }//if
- if (tupKeyConf->readLength != 0) {
- jam();
- /* SET BIT 15 IN REQINFO */
- LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1);
- regTcPtr->readlenAi = tupKeyConf->readLength;
- }//if
- regTcPtr->totSendlenAi = tupKeyConf->writeLength;
- ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
- rwConcludedLab(signal);
- return;
- }//Dblqh::tupkeyConfLab()
- /* --------------------------------------------------------------------------
- * THE CODE IS FOUND IN THE SIGNAL RECEPTION PART OF LQH
- * -------------------------------------------------------------------------- */
- void Dblqh::rwConcludedLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- /* ------------------------------------------------------------------------
- * WE HAVE NOW CONCLUDED READING/WRITING IN ACC AND TUP FOR THIS OPERATION.
- * IT IS NOW TIME TO LOG THE OPERATION, SEND REQUEST TO NEXT NODE OR TC AND
- * FOR SOME TYPES OF OPERATIONS IT IS EVEN TIME TO COMMIT THE OPERATION.
- * ------------------------------------------------------------------------ */
- if (regTcPtr->operation == ZREAD) {
- jam();
- /* ----------------------------------------------------------------------
- * A NORMAL READ OPERATION IS NOT LOGGED BUT IS NOT COMMITTED UNTIL THE
- * COMMIT SIGNAL ARRIVES. THUS WE CONTINUE PACKING THE RESPONSE.
- * ---------------------------------------------------------------------- */
- releaseActiveFrag(signal);
- packLqhkeyreqLab(signal);
- return;
- } else {
- FragrecordPtr regFragptr;
- regFragptr.i = regTcPtr->fragmentptr;
- ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
- if (regFragptr.p->logFlag == Fragrecord::STATE_FALSE){
- if (regTcPtr->dirtyOp == ZTRUE) {
- jam();
- /* ------------------------------------------------------------------
- * THIS OPERATION WAS A WRITE OPERATION THAT DO NOT NEED LOGGING AND
- * THAT CAN CAN BE COMMITTED IMMEDIATELY.
- * ------------------------------------------------------------------ */
- regTcPtr->gci = cnewestGci;
- releaseActiveFrag(signal);
- commitContinueAfterBlockedLab(signal);
- return;
- } else {
- jam();
- /* ------------------------------------------------------------------
- * A NORMAL WRITE OPERATION ON A FRAGMENT WHICH DO NOT NEED LOGGING.
- * WE WILL PACK THE REQUEST/RESPONSE TO THE NEXT NODE/TO TC.
- * ------------------------------------------------------------------ */
- regTcPtr->logWriteState = TcConnectionrec::NOT_WRITTEN;
- releaseActiveFrag(signal);
- packLqhkeyreqLab(signal);
- return;
- }//if
- } else {
- jam();
- /* --------------------------------------------------------------------
- * A DIRTY OPERATION WHICH NEEDS LOGGING. WE START BY LOGGING THE
- * REQUEST. IN THIS CASE WE WILL RELEASE THE FRAGMENT LOCK FIRST.
- * --------------------------------------------------------------------
- * A NORMAL WRITE OPERATION THAT NEEDS LOGGING AND WILL NOT BE
- * PREMATURELY COMMITTED.
- * -------------------------------------------------------------------- */
- releaseActiveFrag(signal);
- logLqhkeyreqLab(signal);
- return;
- }//if
- }//if
- }//Dblqh::rwConcludedLab()
- void Dblqh::rwConcludedAiLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- fragptr.i = regTcPtr->fragmentptr;
- /* ------------------------------------------------------------------------
- * WE HAVE NOW CONCLUDED READING/WRITING IN ACC AND TUP FOR THIS OPERATION.
- * IT IS NOW TIME TO LOG THE OPERATION, SEND REQUEST TO NEXT NODE OR TC AND
- * FOR SOME TYPES OF OPERATIONS IT IS EVEN TIME TO COMMIT THE OPERATION.
- * IN THIS CASE WE HAVE ALREADY RELEASED THE FRAGMENT LOCK.
- * ERROR CASES AT FRAGMENT CREATION AND STAND-BY NODES ARE THE REASONS FOR
- * COMING HERE.
- * ------------------------------------------------------------------------ */
- if (regTcPtr->operation == ZREAD) {
- if (regTcPtr->opSimple == 1) {
- jam();
- /* --------------------------------------------------------------------
- * THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE
- * OPERATION.
- * -------------------------------------------------------------------- */
- regTcPtr->gci = cnewestGci;
- localCommitLab(signal);
- return;
- } else {
- jam();
- /* --------------------------------------------------------------------
- * A NORMAL READ OPERATION IS NOT LOGGED BUT IS NOT COMMITTED UNTIL
- * THE COMMIT SIGNAL ARRIVES. THUS WE CONTINUE PACKING THE RESPONSE.
- * -------------------------------------------------------------------- */
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- packLqhkeyreqLab(signal);
- return;
- }//if
- } else {
- jam();
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if (fragptr.p->logFlag == Fragrecord::STATE_FALSE) {
- if (regTcPtr->dirtyOp == ZTRUE) {
- /* ------------------------------------------------------------------
- * THIS OPERATION WAS A WRITE OPERATION THAT DO NOT NEED LOGGING AND
- * THAT CAN CAN BE COMMITTED IMMEDIATELY.
- * ------------------------------------------------------------------ */
- jam();
- /* ----------------------------------------------------------------
- * IT MUST BE ACTIVE CREATION OF A FRAGMENT.
- * ---------------------------------------------------------------- */
- regTcPtr->gci = cnewestGci;
- localCommitLab(signal);
- return;
- } else {
- /* ------------------------------------------------------------------
- * A NORMAL WRITE OPERATION ON A FRAGMENT WHICH DO NOT NEED LOGGING.
- * WE WILL PACK THE REQUEST/RESPONSE TO THE NEXT NODE/TO TC.
- * ------------------------------------------------------------------ */
- jam();
- /* ---------------------------------------------------------------
- * IT MUST BE ACTIVE CREATION OF A FRAGMENT.
- * NOT A DIRTY OPERATION THUS PACK REQUEST/RESPONSE.
- * ---------------------------------------------------------------- */
- regTcPtr->logWriteState = TcConnectionrec::NOT_WRITTEN;
- packLqhkeyreqLab(signal);
- return;
- }//if
- } else {
- jam();
- /* --------------------------------------------------------------------
- * A DIRTY OPERATION WHICH NEEDS LOGGING. WE START BY LOGGING THE
- * REQUEST. IN THIS CASE WE WILL RELEASE THE FRAGMENT LOCK FIRST.
- * -------------------------------------------------------------------- */
- /* A NORMAL WRITE OPERATION THAT NEEDS LOGGING AND WILL NOT BE
- * PREMATURELY COMMITTED.
- * -------------------------------------------------------------------- */
- logLqhkeyreqLab(signal);
- return;
- }//if
- }//if
- }//Dblqh::rwConcludedAiLab()
- /* ##########################################################################
- * ####### LOG MODULE #######
- *
- * ##########################################################################
- * --------------------------------------------------------------------------
- * THE LOG MODULE HANDLES THE READING AND WRITING OF THE LOG
- * IT IS ALSO RESPONSIBLE FOR HANDLING THE SYSTEM RESTART.
- * IT CONTROLS THE SYSTEM RESTART IN TUP AND ACC AS WELL.
- * -------------------------------------------------------------------------- */
- void Dblqh::logLqhkeyreqLab(Signal* signal)
- {
- UintR tcurrentFilepage;
- TcConnectionrecPtr tmpTcConnectptr;
- if (cnoOfLogPages < ZMIN_LOG_PAGES_OPERATION || ERROR_INSERTED(5032)) {
- jam();
- if(ERROR_INSERTED(5032)){
- CLEAR_ERROR_INSERT_VALUE;
- }
- /*---------------------------------------------------------------------------*/
- // The log disk is having problems in catching up with the speed of execution.
- // We must wait with writing the log of this operation to ensure we do not
- // overload the log.
- /*---------------------------------------------------------------------------*/
- terrorCode = ZTEMPORARY_REDO_LOG_FAILURE;
- abortErrorLab(signal);
- return;
- }//if
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- logPartPtr.i = regTcPtr->hashValue & 3;
- ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
- /* -------------------------------------------------- */
- /* THIS PART IS USED TO WRITE THE LOG */
- /* -------------------------------------------------- */
- /* -------------------------------------------------- */
- /* CHECK IF A LOG OPERATION IS ONGOING ALREADY. */
- /* IF SO THEN QUEUE THE OPERATION FOR LATER */
- /* RESTART WHEN THE LOG PART IS FREE AGAIN. */
- /* -------------------------------------------------- */
- LogPartRecord * const regLogPartPtr = logPartPtr.p;
- if(ERROR_INSERTED(5033)){
- jam();
- CLEAR_ERROR_INSERT_VALUE;
- if ((regLogPartPtr->firstLogQueue != RNIL) &&
- (regLogPartPtr->LogLqhKeyReqSent == ZFALSE)) {
- /* -------------------------------------------------- */
- /* WE HAVE A PROBLEM IN THAT THE LOG HAS NO */
- /* ROOM FOR ADDITIONAL OPERATIONS AT THE MOMENT.*/
- /* -------------------------------------------------- */
- /* -------------------------------------------------- */
- /* WE MUST STILL RESTART QUEUED OPERATIONS SO */
- /* THEY ALSO CAN BE ABORTED. */
- /* -------------------------------------------------- */
- regLogPartPtr->LogLqhKeyReqSent = ZTRUE;
- signal->theData[0] = ZLOG_LQHKEYREQ;
- signal->theData[1] = logPartPtr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- }//if
-
- terrorCode = ZTAIL_PROBLEM_IN_LOG_ERROR;
- abortErrorLab(signal);
- return;
- }
-
- if (regLogPartPtr->logPartState == LogPartRecord::IDLE) {
- ;
- } else if (regLogPartPtr->logPartState == LogPartRecord::ACTIVE) {
- jam();
- linkWaitLog(signal, logPartPtr);
- regTcPtr->transactionState = TcConnectionrec::LOG_QUEUED;
- return;
- } else {
- if ((regLogPartPtr->firstLogQueue != RNIL) &&
- (regLogPartPtr->LogLqhKeyReqSent == ZFALSE)) {
- /* -------------------------------------------------- */
- /* WE HAVE A PROBLEM IN THAT THE LOG HAS NO */
- /* ROOM FOR ADDITIONAL OPERATIONS AT THE MOMENT.*/
- /* -------------------------------------------------- */
- /* -------------------------------------------------- */
- /* WE MUST STILL RESTART QUEUED OPERATIONS SO */
- /* THEY ALSO CAN BE ABORTED. */
- /* -------------------------------------------------- */
- regLogPartPtr->LogLqhKeyReqSent = ZTRUE;
- signal->theData[0] = ZLOG_LQHKEYREQ;
- signal->theData[1] = logPartPtr.i;
- sendSignal(cownref, GSN_CONTINUEB, signal, 2, JBB);
- }//if
- if (regLogPartPtr->logPartState == LogPartRecord::TAIL_PROBLEM) {
- jam();
- terrorCode = ZTAIL_PROBLEM_IN_LOG_ERROR;
- } else {
- ndbrequire(regLogPartPtr->logPartState == LogPartRecord::FILE_CHANGE_PROBLEM);
- jam();
- terrorCode = ZFILE_CHANGE_PROBLEM_IN_LOG_ERROR;
- }//if
- abortErrorLab(signal);
- return;
- }//if
- regLogPartPtr->logPartState = LogPartRecord::ACTIVE;
- logFilePtr.i = regLogPartPtr->currentLogfile;
- ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
- /* -------------------------------------------------- */
- /* CHECK IF A NEW MBYTE IS TO BE STARTED. IF */
- /* SO INSERT A NEXT LOG RECORD, WRITE THE LOG */
- /* AND PLACE THE LOG POINTER ON THE NEW POSITION*/
- /* IF A NEW FILE IS TO BE USED, CHANGE FILE AND */
- /* ALSO START OPENING THE NEXT LOG FILE. IF A */
- /* LAP HAS BEEN COMPLETED THEN ADD ONE TO LAP */
- /* COUNTER. */
- /* -------------------------------------------------- */
- checkNewMbyte(signal);
- /* -------------------------------------------------- */
- /* INSERT THE OPERATION RECORD LAST IN THE LIST */
- /* OF NOT COMPLETED OPERATIONS. ALSO RECORD THE */
- /* FILE NO, PAGE NO AND PAGE INDEX OF THE START */
- /* OF THIS LOG RECORD. */
- /* IT IS NOT ALLOWED TO INSERT IT INTO THE LIST */
- /* BEFORE CHECKING THE NEW MBYTE SINCE THAT WILL*/
- /* CAUSE THE OLD VALUES OF TC_CONNECTPTR TO BE */
- /* USED IN WRITE_FILE_DESCRIPTOR. */
- /* -------------------------------------------------- */
- Uint32 tcIndex = tcConnectptr.i;
- tmpTcConnectptr.i = regLogPartPtr->lastLogTcrec;
- regLogPartPtr->lastLogTcrec = tcIndex;
- if (tmpTcConnectptr.i == RNIL) {
- jam();
- regLogPartPtr->firstLogTcrec = tcIndex;
- } else {
- ptrCheckGuard(tmpTcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- tmpTcConnectptr.p->nextLogTcrec = tcIndex;
- }//if
- Uint32 fileNo = logFilePtr.p->fileNo;
- tcurrentFilepage = logFilePtr.p->currentFilepage;
- logPagePtr.i = logFilePtr.p->currentLogpage;
- regTcPtr->nextLogTcrec = RNIL;
- regTcPtr->prevLogTcrec = tmpTcConnectptr.i;
- ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
- Uint32 pageIndex = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- regTcPtr->logStartFileNo = fileNo;
- regTcPtr->logStartPageNo = tcurrentFilepage;
- regTcPtr->logStartPageIndex = pageIndex;
- /* -------------------------------------------------- */
- /* WRITE THE LOG HEADER OF THIS OPERATION. */
- /* -------------------------------------------------- */
- writeLogHeader(signal);
- /* -------------------------------------------------- */
- /* WRITE THE TUPLE KEY OF THIS OPERATION. */
- /* -------------------------------------------------- */
- writeKey(signal);
- /* -------------------------------------------------- */
- /* WRITE THE ATTRIBUTE INFO OF THIS OPERATION. */
- /* -------------------------------------------------- */
- writeAttrinfoLab(signal);
- logNextStart(signal);
- /* -------------------------------------------------- */
- /* RESET THE STATE OF THE LOG PART. IF ANY */
- /* OPERATIONS HAVE QUEUED THEN START THE FIRST */
- /* OF THESE. */
- /* -------------------------------------------------- */
- /* -------------------------------------------------- */
- /* CONTINUE WITH PACKING OF LQHKEYREQ */
- /* -------------------------------------------------- */
- tcurrentFilepage = logFilePtr.p->currentFilepage;
- if (logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] == ZPAGE_HEADER_SIZE) {
- jam();
- tcurrentFilepage--;
- }//if
- regTcPtr->logStopPageNo = tcurrentFilepage;
- regTcPtr->logWriteState = TcConnectionrec::WRITTEN;
- if (regTcPtr->abortState != TcConnectionrec::ABORT_IDLE) {
- /* -------------------------------------------------- */
- /* AN ABORT HAVE BEEN ORDERED. THE ABORT WAITED */
- /* FOR THE LOG WRITE TO BE COMPLETED. NOW WE */
- /* CAN PROCEED WITH THE NORMAL ABORT HANDLING. */
- /* -------------------------------------------------- */
- abortCommonLab(signal);
- return;
- }//if
- if (regTcPtr->dirtyOp != ZTRUE) {
- packLqhkeyreqLab(signal);
- } else {
- /* ----------------------------------------------------------------------
- * I NEED TO INSERT A COMMIT LOG RECORD SINCE WE ARE WRITING LOG IN THIS
- * TRANSACTION. SINCE WE RELEASED THE LOG LOCK JUST NOW NO ONE ELSE CAN BE
- * ACTIVE IN WRITING THE LOG. WE THUS WRITE THE LOG WITHOUT GETTING A LOCK
- * SINCE WE ARE ONLY WRITING A COMMIT LOG RECORD.
- * ---------------------------------------------------------------------- */
- writeCommitLog(signal, logPartPtr);
- /* ----------------------------------------------------------------------
- * DIRTY OPERATIONS SHOULD COMMIT BEFORE THEY PACK THE REQUEST/RESPONSE.
- * ---------------------------------------------------------------------- */
- regTcPtr->gci = cnewestGci;
- localCommitLab(signal);
- }//if
- }//Dblqh::logLqhkeyreqLab()
- /* ------------------------------------------------------------------------- */
- /* ------- SEND LQHKEYREQ */
- /* */
- /* NO STATE CHECKING SINCE THE SIGNAL IS A LOCAL SIGNAL. THE EXECUTION OF */
- /* THE OPERATION IS COMPLETED. IT IS NOW TIME TO SEND THE OPERATION TO THE */
- /* NEXT REPLICA OR TO TC. */
- /* ------------------------------------------------------------------------- */
- void Dblqh::packLqhkeyreqLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->nextReplica == ZNIL) {
- /* ------------------------------------------------------------------------- */
- /* ------- SEND LQHKEYCONF ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- sendLqhkeyconfTc(signal, regTcPtr->tcBlockref);
- if (regTcPtr->dirtyOp != ZTRUE) {
- jam();
- regTcPtr->transactionState = TcConnectionrec::PREPARED;
- releaseOprec(signal);
- } else {
- jam();
- /*************************************************************>*/
- /* DIRTY WRITES ARE USED IN TWO SITUATIONS. THE FIRST */
- /* SITUATION IS WHEN THEY ARE USED TO UPDATE COUNTERS AND*/
- /* OTHER ATTRIBUTES WHICH ARE NOT SENSITIVE TO CONSISTE- */
- /* NCY. THE SECOND SITUATION IS BY OPERATIONS THAT ARE */
- /* SENT AS PART OF A COPY FRAGMENT PROCESS. */
- /* */
- /* DURING A COPY FRAGMENT PROCESS THERE IS NO LOGGING */
- /* ONGOING SINCE THE FRAGMENT IS NOT COMPLETE YET. THE */
- /* LOGGING STARTS AFTER COMPLETING THE LAST COPY TUPLE */
- /* OPERATION. THE EXECUTION OF THE LAST COPY TUPLE DOES */
- /* ALSO START A LOCAL CHECKPOINT SO THAT THE FRAGMENT */
- /* REPLICA IS RECOVERABLE. THUS GLOBAL CHECKPOINT ID FOR */
- /* THOSE OPERATIONS ARE NOT INTERESTING. */
- /* */
- /* A DIRTY WRITE IS BY DEFINITION NOT CONSISTENT. THUS */
- /* IT CAN USE ANY GLOBAL CHECKPOINT. THE IDEA HERE IS TO */
- /* ALWAYS USE THE LATEST DEFINED GLOBAL CHECKPOINT ID IN */
- /* THIS NODE. */
- /*************************************************************>*/
- cleanUp(signal);
- }//if
- return;
- }//if
- /* ------------------------------------------------------------------------- */
- /* ------- SEND LQHKEYREQ ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* THERE ARE MORE REPLICAS TO SEND THE OPERATION TO. A NEW LQHKEYREQ WILL BE */
- /* PREPARED FOR THE NEXT REPLICA. */
- /* ------------------------------------------------------------------------- */
- /* CLEAR REPLICA TYPE, ATTRINFO INDICATOR (IN LQHKEYREQ), */
- /* INTERPRETED EXECUTION, SEQUENTIAL NUMBER OF REPLICA. */
- // Set bit indicating Client and TC record not the same.
- // Set readlenAi indicator if readlenAi != 0
- // Stored Procedure Indicator not set.
- /* ------------------------------------------------------------------------- */
- LqhKeyReq * const lqhKeyReq = (LqhKeyReq *)&signal->theData[0];
- UintR Treqinfo;
- UintR sig0, sig1, sig2, sig3, sig4, sig5, sig6;
- Treqinfo = preComputedRequestInfoMask & regTcPtr->reqinfo;
- UintR TapplAddressIndicator = (regTcPtr->nextSeqNoReplica == 0 ? 0 : 1);
- LqhKeyReq::setApplicationAddressFlag(Treqinfo, TapplAddressIndicator);
- LqhKeyReq::setInterpretedFlag(Treqinfo, regTcPtr->opExec);
- LqhKeyReq::setSeqNoReplica(Treqinfo, regTcPtr->nextSeqNoReplica);
- LqhKeyReq::setAIInLqhKeyReq(Treqinfo, regTcPtr->reclenAiLqhkey);
- UintR TreadLenAiInd = (regTcPtr->readlenAi == 0 ? 0 : 1);
- UintR TsameLqhAndClient = (tcConnectptr.i ==
- regTcPtr->tcOprec ? 0 : 1);
- LqhKeyReq::setSameClientAndTcFlag(Treqinfo, TsameLqhAndClient);
- LqhKeyReq::setReturnedReadLenAIFlag(Treqinfo, TreadLenAiInd);
- UintR TotReclenAi = regTcPtr->totSendlenAi;
- /* ------------------------------------------------------------------------- */
- /* WE ARE NOW PREPARED TO SEND THE LQHKEYREQ. WE HAVE TO DECIDE IF ATTRINFO */
- /* IS INCLUDED IN THE LQHKEYREQ SIGNAL AND THEN SEND IT. */
- /* TAKE OVER SCAN OPERATION IS NEVER USED ON BACKUPS, LOG RECORDS AND START-UP*/
- /* OF NEW REPLICA AND THUS ONLY TOT_SENDLEN_AI IS USED THE UPPER 16 BITS ARE */
- /* ZERO. */
- /* ------------------------------------------------------------------------- */
- sig0 = tcConnectptr.i;
- sig1 = regTcPtr->savePointId;
- sig2 = regTcPtr->hashValue;
- sig4 = regTcPtr->tcBlockref;
- lqhKeyReq->clientConnectPtr = sig0;
- lqhKeyReq->attrLen = TotReclenAi;
- lqhKeyReq->savePointId = sig1;
- lqhKeyReq->hashValue = sig2;
- lqhKeyReq->requestInfo = Treqinfo;
- lqhKeyReq->tcBlockref = sig4;
- sig0 = regTcPtr->tableref + ((regTcPtr->schemaVersion << 16) & 0xFFFF0000);
- sig1 = regTcPtr->fragmentid + (regTcPtr->nodeAfterNext[0] << 16);
- sig2 = regTcPtr->transid[0];
- sig3 = regTcPtr->transid[1];
- sig4 = regTcPtr->applRef;
- sig5 = regTcPtr->applOprec;
- sig6 = regTcPtr->tcOprec;
- UintR nextPos = (TapplAddressIndicator << 1);
- lqhKeyReq->tableSchemaVersion = sig0;
- lqhKeyReq->fragmentData = sig1;
- lqhKeyReq->transId1 = sig2;
- lqhKeyReq->transId2 = sig3;
- lqhKeyReq->noFiredTriggers = regTcPtr->noFiredTriggers;
- lqhKeyReq->variableData[0] = sig4;
- lqhKeyReq->variableData[1] = sig5;
- lqhKeyReq->variableData[2] = sig6;
- nextPos += TsameLqhAndClient;
- if ((regTcPtr->lastReplicaNo - regTcPtr->nextSeqNoReplica) > 1) {
- sig0 = (UintR)regTcPtr->nodeAfterNext[1] +
- (UintR)(regTcPtr->nodeAfterNext[2] << 16);
- lqhKeyReq->variableData[nextPos] = sig0;
- nextPos++;
- }//if
- sig0 = regTcPtr->readlenAi;
- sig1 = regTcPtr->tupkeyData[0];
- sig2 = regTcPtr->tupkeyData[1];
- sig3 = regTcPtr->tupkeyData[2];
- sig4 = regTcPtr->tupkeyData[3];
- lqhKeyReq->variableData[nextPos] = sig0;
- nextPos += TreadLenAiInd;
- lqhKeyReq->variableData[nextPos] = sig1;
- lqhKeyReq->variableData[nextPos + 1] = sig2;
- lqhKeyReq->variableData[nextPos + 2] = sig3;
- lqhKeyReq->variableData[nextPos + 3] = sig4;
- UintR TkeyLen = LqhKeyReq::getKeyLen(Treqinfo);
- if (TkeyLen < 4) {
- nextPos += TkeyLen;
- } else {
- nextPos += 4;
- }//if
- sig0 = regTcPtr->firstAttrinfo[0];
- sig1 = regTcPtr->firstAttrinfo[1];
- sig2 = regTcPtr->firstAttrinfo[2];
- sig3 = regTcPtr->firstAttrinfo[3];
- sig4 = regTcPtr->firstAttrinfo[4];
- UintR TAiLen = regTcPtr->reclenAiLqhkey;
- BlockReference lqhRef = calcLqhBlockRef(regTcPtr->nextReplica);
- lqhKeyReq->variableData[nextPos] = sig0;
- lqhKeyReq->variableData[nextPos + 1] = sig1;
- lqhKeyReq->variableData[nextPos + 2] = sig2;
- lqhKeyReq->variableData[nextPos + 3] = sig3;
- lqhKeyReq->variableData[nextPos + 4] = sig4;
- nextPos += TAiLen;
- sendSignal(lqhRef, GSN_LQHKEYREQ, signal,
- nextPos + LqhKeyReq::FixedSignalLength, JBB);
- if (regTcPtr->primKeyLen > 4) {
- jam();
- /* ------------------------------------------------------------------------- */
- /* MORE THAN 4 WORDS OF KEY DATA IS IN THE OPERATION. THEREFORE WE NEED TO */
- /* PREPARE A KEYINFO SIGNAL. MORE THAN ONE KEYINFO SIGNAL CAN BE SENT. */
- /* ------------------------------------------------------------------------- */
- sendTupkey(signal);
- }//if
- /* ------------------------------------------------------------------------- */
- /* NOW I AM PREPARED TO SEND ALL THE ATTRINFO SIGNALS. AT THE MOMENT A LOOP */
- /* SENDS ALL AT ONCE. LATER WE HAVE TO ADDRESS THE PROBLEM THAT THESE COULD */
- /* LEAD TO BUFFER EXPLOSION => NODE CRASH. */
- /* ------------------------------------------------------------------------- */
- /* NEW CODE TO SEND ATTRINFO IN PACK_LQHKEYREQ */
- /* THIS CODE USES A REAL-TIME BREAK AFTER */
- /* SENDING 16 SIGNALS. */
- /* -------------------------------------------------- */
- sig0 = regTcPtr->tcOprec;
- sig1 = regTcPtr->transid[0];
- sig2 = regTcPtr->transid[1];
- signal->theData[0] = sig0;
- signal->theData[1] = sig1;
- signal->theData[2] = sig2;
- AttrbufPtr regAttrinbufptr;
- regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
- while (regAttrinbufptr.i != RNIL) {
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
- jam();
- Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
- ndbrequire(dataLen != 0);
- MEMCOPY_NO_WORDS(&signal->theData[3], ®Attrinbufptr.p->attrbuf[0], dataLen);
- regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
- sendSignal(lqhRef, GSN_ATTRINFO, signal, dataLen + 3, JBB);
- }//while
- regTcPtr->transactionState = TcConnectionrec::PREPARED;
- if (regTcPtr->dirtyOp == ZTRUE) {
- jam();
- /*************************************************************>*/
- /* DIRTY WRITES ARE USED IN TWO SITUATIONS. THE FIRST */
- /* SITUATION IS WHEN THEY ARE USED TO UPDATE COUNTERS AND*/
- /* OTHER ATTRIBUTES WHICH ARE NOT SENSITIVE TO CONSISTE- */
- /* NCY. THE SECOND SITUATION IS BY OPERATIONS THAT ARE */
- /* SENT AS PART OF A COPY FRAGMENT PROCESS. */
- /* */
- /* DURING A COPY FRAGMENT PROCESS THERE IS NO LOGGING */
- /* ONGOING SINCE THE FRAGMENT IS NOT COMPLETE YET. THE */
- /* LOGGING STARTS AFTER COMPLETING THE LAST COPY TUPLE */
- /* OPERATION. THE EXECUTION OF THE LAST COPY TUPLE DOES */
- /* ALSO START A LOCAL CHECKPOINT SO THAT THE FRAGMENT */
- /* REPLICA IS RECOVERABLE. THUS GLOBAL CHECKPOINT ID FOR */
- /* THOSE OPERATIONS ARE NOT INTERESTING. */
- /* */
- /* A DIRTY WRITE IS BY DEFINITION NOT CONSISTENT. THUS */
- /* IT CAN USE ANY GLOBAL CHECKPOINT. THE IDEA HERE IS TO */
- /* ALWAYS USE THE LATEST DEFINED GLOBAL CHECKPOINT ID IN */
- /* THIS NODE. */
- /*************************************************************>*/
- cleanUp(signal);
- return;
- }//if
- /* ------------------------------------------------------------------------
- * ALL INFORMATION NEEDED BY THE COMMIT PHASE AND COMPLETE PHASE IS
- * KEPT IN THE TC_CONNECT RECORD. TO ENSURE PROPER USE OF MEMORY
- * RESOURCES WE DEALLOCATE THE ATTRINFO RECORD AND KEY RECORDS
- * AS SOON AS POSSIBLE.
- * ------------------------------------------------------------------------ */
- releaseOprec(signal);
- }//Dblqh::packLqhkeyreqLab()
- /* ========================================================================= */
- /* ==== CHECK IF THE LOG RECORD FITS INTO THE CURRENT MBYTE, ======= */
- /* OTHERWISE SWITCH TO NEXT MBYTE. */
- /* */
- /* ========================================================================= */
- void Dblqh::checkNewMbyte(Signal* signal)
- {
- UintR tcnmTmp;
- UintR ttotalLogSize;
- /* -------------------------------------------------- */
- /* CHECK IF A NEW MBYTE OF LOG RECORD IS TO BE */
- /* OPENED BEFORE WRITING THE LOG RECORD. NO LOG */
- /* RECORDS ARE ALLOWED TO SPAN A MBYTE BOUNDARY */
- /* */
- /* INPUT: TC_CONNECTPTR THE OPERATION */
- /* LOG_FILE_PTR THE LOG FILE */
- /* OUTPUT: LOG_FILE_PTR THE NEW LOG FILE */
- /* -------------------------------------------------- */
- ttotalLogSize = ZLOG_HEAD_SIZE + tcConnectptr.p->currTupAiLen;
- ttotalLogSize = ttotalLogSize + tcConnectptr.p->primKeyLen;
- tcnmTmp = logFilePtr.p->remainingWordsInMbyte;
- if ((ttotalLogSize + ZNEXT_LOG_SIZE) <= tcnmTmp) {
- ndbrequire(tcnmTmp >= ttotalLogSize);
- logFilePtr.p->remainingWordsInMbyte = tcnmTmp - ttotalLogSize;
- return;
- } else {
- jam();
- /* -------------------------------------------------- */
- /* IT WAS NOT ENOUGH SPACE IN THIS MBYTE FOR */
- /* THIS LOG RECORD. MOVE TO NEXT MBYTE */
- /* THIS MIGHT INCLUDE CHANGING LOG FILE */
- /* -------------------------------------------------- */
- /* WE HAVE TO INSERT A NEXT LOG RECORD FIRST */
- /* -------------------------------------------------- */
- /* THEN CONTINUE BY WRITING THE FILE DESCRIPTORS*/
- /* -------------------------------------------------- */
- logPagePtr.i = logFilePtr.p->currentLogpage;
- ptrCheckGuard(logPagePtr, clogPageFileSize, logPageRecord);
- changeMbyte(signal);
- tcnmTmp = logFilePtr.p->remainingWordsInMbyte;
- }//if
- ndbrequire(tcnmTmp >= ttotalLogSize);
- logFilePtr.p->remainingWordsInMbyte = tcnmTmp - ttotalLogSize;
- }//Dblqh::checkNewMbyte()
- /* --------------------------------------------------------------------------
- * ------- WRITE OPERATION HEADER TO LOG -------
- *
- * SUBROUTINE SHORT NAME: WLH
- * ------------------------------------------------------------------------- */
- void Dblqh::writeLogHeader(Signal* signal)
- {
- Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- Uint32 hashValue = tcConnectptr.p->hashValue;
- Uint32 operation = tcConnectptr.p->operation;
- Uint32 keyLen = tcConnectptr.p->primKeyLen;
- Uint32 aiLen = tcConnectptr.p->currTupAiLen;
- Uint32 totLogLen = aiLen + keyLen + ZLOG_HEAD_SIZE;
- if ((logPos + ZLOG_HEAD_SIZE) < ZPAGE_SIZE) {
- Uint32* dataPtr = &logPagePtr.p->logPageWord[logPos];
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos + ZLOG_HEAD_SIZE;
- dataPtr[0] = ZPREP_OP_TYPE;
- dataPtr[1] = totLogLen;
- dataPtr[2] = hashValue;
- dataPtr[3] = operation;
- dataPtr[4] = aiLen;
- dataPtr[5] = keyLen;
- } else {
- writeLogWord(signal, ZPREP_OP_TYPE);
- writeLogWord(signal, totLogLen);
- writeLogWord(signal, hashValue);
- writeLogWord(signal, operation);
- writeLogWord(signal, aiLen);
- writeLogWord(signal, keyLen);
- }//if
- }//Dblqh::writeLogHeader()
- /* --------------------------------------------------------------------------
- * ------- WRITE TUPLE KEY TO LOG -------
- *
- * SUBROUTINE SHORT NAME: WK
- * ------------------------------------------------------------------------- */
- void Dblqh::writeKey(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- Uint32 logPos, endPos, dataLen;
- Int32 remainingLen;
- logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- remainingLen = regTcPtr->primKeyLen;
- dataLen = remainingLen;
- if (remainingLen > 4)
- dataLen = 4;
- remainingLen -= dataLen;
- endPos = logPos + dataLen;
- if (endPos < ZPAGE_SIZE) {
- MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
- ®TcPtr->tupkeyData[0],
- dataLen);
- } else {
- jam();
- for (Uint32 i = 0; i < dataLen; i++)
- writeLogWord(signal, regTcPtr->tupkeyData[i]);
- endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- }//if
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- while (remainingLen > 0) {
- logPos = endPos;
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- dataLen = remainingLen;
- if (remainingLen > 4)
- dataLen = 4;
- remainingLen -= dataLen;
- endPos += dataLen;
- if (endPos < ZPAGE_SIZE) {
- MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
- ®Databufptr.p->data[0],
- dataLen);
- } else {
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos;
- for (Uint32 i = 0; i < dataLen; i++)
- writeLogWord(signal, regDatabufptr.p->data[i]);
- endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- }//if
- regDatabufptr.i = regDatabufptr.p->nextDatabuf;
- }//while
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = endPos;
- ndbrequire(regDatabufptr.i == RNIL);
- }//Dblqh::writeKey()
- /* --------------------------------------------------------------------------
- * ------- WRITE ATTRINFO TO LOG -------
- *
- * SUBROUTINE SHORT NAME: WA
- * ------------------------------------------------------------------------- */
- void Dblqh::writeAttrinfoLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- Uint32 totLen = regTcPtr->currTupAiLen;
- if (totLen == 0)
- return;
- Uint32 logPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- Uint32 lqhLen = regTcPtr->reclenAiLqhkey;
- ndbrequire(totLen >= lqhLen);
- Uint32 endPos = logPos + lqhLen;
- totLen -= lqhLen;
- if (endPos < ZPAGE_SIZE) {
- MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
- ®TcPtr->firstAttrinfo[0],
- lqhLen);
- } else {
- for (Uint32 i = 0; i < lqhLen; i++)
- writeLogWord(signal, regTcPtr->firstAttrinfo[i]);
- endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- }//if
- AttrbufPtr regAttrinbufptr;
- regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
- while (totLen > 0) {
- logPos = endPos;
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
- Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
- ndbrequire(totLen >= dataLen);
- ndbrequire(dataLen > 0);
- totLen -= dataLen;
- endPos += dataLen;
- if (endPos < ZPAGE_SIZE) {
- MEMCOPY_NO_WORDS(&logPagePtr.p->logPageWord[logPos],
- ®Attrinbufptr.p->attrbuf[0],
- dataLen);
- } else {
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = logPos;
- for (Uint32 i = 0; i < dataLen; i++)
- writeLogWord(signal, regAttrinbufptr.p->attrbuf[i]);
- endPos = logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX];
- }//if
- regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
- }//while
- logPagePtr.p->logPageWord[ZCURR_PAGE_INDEX] = endPos;
- ndbrequire(regAttrinbufptr.i == RNIL);
- }//Dblqh::writeAttrinfoLab()
- /* ------------------------------------------------------------------------- */
- /* ------- SEND TUPLE KEY IN KEYINFO SIGNAL(S) ------- */
- /* */
- /* SUBROUTINE SHORT NAME: STU */
- /* ------------------------------------------------------------------------- */
- void Dblqh::sendTupkey(Signal* signal)
- {
- UintR TdataPos = 3;
- BlockReference lqhRef = calcLqhBlockRef(tcConnectptr.p->nextReplica);
- signal->theData[0] = tcConnectptr.p->tcOprec;
- signal->theData[1] = tcConnectptr.p->transid[0];
- signal->theData[2] = tcConnectptr.p->transid[1];
- databufptr.i = tcConnectptr.p->firstTupkeybuf;
- do {
- ptrCheckGuard(databufptr, cdatabufFileSize, databuf);
- signal->theData[TdataPos] = databufptr.p->data[0];
- signal->theData[TdataPos + 1] = databufptr.p->data[1];
- signal->theData[TdataPos + 2] = databufptr.p->data[2];
- signal->theData[TdataPos + 3] = databufptr.p->data[3];
- databufptr.i = databufptr.p->nextDatabuf;
- TdataPos += 4;
- if (databufptr.i == RNIL) {
- jam();
- sendSignal(lqhRef, GSN_KEYINFO, signal, TdataPos, JBB);
- return;
- } else if (TdataPos == 23) {
- jam();
- sendSignal(lqhRef, GSN_KEYINFO, signal, 23, JBB);
- TdataPos = 3;
- }
- } while (1);
- }//Dblqh::sendTupkey()
- void Dblqh::cleanUp(Signal* signal)
- {
- releaseOprec(signal);
- deleteTransidHash(signal);
- releaseTcrec(signal, tcConnectptr);
- }//Dblqh::cleanUp()
- /* --------------------------------------------------------------------------
- * ---- RELEASE ALL RECORDS CONNECTED TO THE OPERATION RECORD AND THE ----
- * OPERATION RECORD ITSELF
- * ------------------------------------------------------------------------- */
- void Dblqh::releaseOprec(Signal* signal)
- {
- UintR Tmpbuf;
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- /* ---- RELEASE DATA BUFFERS ------------------- */
- DatabufPtr regDatabufptr;
- regDatabufptr.i = regTcPtr->firstTupkeybuf;
- /* --------------------------------------------------------------------------
- * ------- RELEASE DATA BUFFERS -------
- *
- * ------------------------------------------------------------------------- */
- while (regDatabufptr.i != RNIL) {
- jam();
- ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
- Tmpbuf = regDatabufptr.p->nextDatabuf;
- regDatabufptr.p->nextDatabuf = cfirstfreeDatabuf;
- cfirstfreeDatabuf = regDatabufptr.i;
- regDatabufptr.i = Tmpbuf;
- }//while
- /* ---- RELEASE ATTRINFO BUFFERS ------------------- */
- AttrbufPtr regAttrinbufptr;
- regAttrinbufptr.i = regTcPtr->firstAttrinbuf;
- /* ########################################################################
- * ####### RELEASE_ATTRINBUF #######
- *
- * ####################################################################### */
- while (regAttrinbufptr.i != RNIL) {
- jam();
- regAttrinbufptr.i= release_attrinbuf(regAttrinbufptr.i);
- }//while
- regTcPtr->firstAttrinbuf = RNIL;
- regTcPtr->lastAttrinbuf = RNIL;
- regTcPtr->firstTupkeybuf = RNIL;
- regTcPtr->lastTupkeybuf = RNIL;
- }//Dblqh::releaseOprec()
- /* ------------------------------------------------------------------------- */
- /* ------ DELETE TRANSACTION ID FROM HASH TABLE ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- void Dblqh::deleteTransidHash(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- TcConnectionrecPtr prevHashptr;
- TcConnectionrecPtr nextHashptr;
- prevHashptr.i = regTcPtr->prevHashRec;
- nextHashptr.i = regTcPtr->nextHashRec;
- if (prevHashptr.i != RNIL) {
- jam();
- ptrCheckGuard(prevHashptr, ctcConnectrecFileSize, tcConnectionrec);
- prevHashptr.p->nextHashRec = nextHashptr.i;
- } else {
- jam();
- /* ------------------------------------------------------------------------- */
- /* THE OPERATION WAS PLACED FIRST IN THE LIST OF THE HASH TABLE. NEED TO SET */
- /* A NEW LEADER OF THE LIST. */
- /* ------------------------------------------------------------------------- */
- Uint32 hashIndex = (regTcPtr->transid[0] ^ regTcPtr->tcOprec) & 1023;
- ctransidHash[hashIndex] = nextHashptr.i;
- }//if
- if (nextHashptr.i != RNIL) {
- jam();
- ptrCheckGuard(nextHashptr, ctcConnectrecFileSize, tcConnectionrec);
- nextHashptr.p->prevHashRec = prevHashptr.i;
- }//if
- }//Dblqh::deleteTransidHash()
- /* --------------------------------------------------------------------------
- * ------- LINK OPERATION IN ACTIVE LIST ON FRAGMENT -------
- *
- * SUBROUTINE SHORT NAME: LAF
- // Input Pointers:
- // tcConnectptr
- // fragptr
- * ------------------------------------------------------------------------- */
- void Dblqh::linkActiveFrag(Signal* signal)
- {
- TcConnectionrecPtr lafTcConnectptr;
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- Fragrecord * const regFragPtr = fragptr.p;
- Uint32 tcIndex = tcConnectptr.i;
- lafTcConnectptr.i = regFragPtr->activeList;
- regTcPtr->prevTc = RNIL;
- regFragPtr->activeList = tcIndex;
- ndbrequire(regTcPtr->listState == TcConnectionrec::NOT_IN_LIST);
- regTcPtr->nextTc = lafTcConnectptr.i;
- regTcPtr->listState = TcConnectionrec::IN_ACTIVE_LIST;
- if (lafTcConnectptr.i == RNIL) {
- return;
- } else {
- jam();
- ptrCheckGuard(lafTcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- lafTcConnectptr.p->prevTc = tcIndex;
- }//if
- return;
- }//Dblqh::linkActiveFrag()
- /* -------------------------------------------------------------------------
- * ------- RELEASE OPERATION FROM ACTIVE LIST ON FRAGMENT -------
- *
- * SUBROUTINE SHORT NAME = RAF
- * ------------------------------------------------------------------------- */
- void Dblqh::releaseActiveFrag(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- TcConnectionrecPtr ralTcNextConnectptr;
- TcConnectionrecPtr ralTcPrevConnectptr;
- fragptr.i = regTcPtr->fragmentptr;
- ralTcPrevConnectptr.i = regTcPtr->prevTc;
- ralTcNextConnectptr.i = regTcPtr->nextTc;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- Fragrecord * const regFragPtr = fragptr.p;
- ndbrequire(regTcPtr->listState == TcConnectionrec::IN_ACTIVE_LIST);
- regTcPtr->listState = TcConnectionrec::NOT_IN_LIST;
- if (ralTcNextConnectptr.i != RNIL) {
- jam();
- ptrCheckGuard(ralTcNextConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- ralTcNextConnectptr.p->prevTc = ralTcPrevConnectptr.i;
- }//if
- if (ralTcPrevConnectptr.i != RNIL) {
- jam();
- ptrCheckGuard(ralTcPrevConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- ralTcPrevConnectptr.p->nextTc = regTcPtr->nextTc;
- } else {
- jam();
- /* ----------------------------------------------------------------------
- * OPERATION RECORD IS FIRST IN ACTIVE LIST
- * THIS MEANS THAT THERE EXISTS NO PREVIOUS TC THAT NEEDS TO BE UPDATED.
- * --------------------------------------------------------------------- */
- regFragPtr->activeList = ralTcNextConnectptr.i;
- }//if
- if (regFragPtr->lcpRef != RNIL) {
- jam();
- lcpPtr.i = regFragPtr->lcpRef;
- ptrCheckGuard(lcpPtr, clcpFileSize, lcpRecord);
- ndbrequire(lcpPtr.p->lcpState == LcpRecord::LCP_WAIT_ACTIVE_FINISH);
- /* --------------------------------------------------------------------
- * IF A FRAGMENT IS CURRENTLY STARTING A LOCAL CHECKPOINT AND IT
- * IS WAITING FOR ACTIVE OPERATIONS TO BE COMPLETED WITH THE
- * CURRENT PHASE, THEN IT IS CHECKED WHETHER THE
- * LAST ACTIVE OPERATION WAS NOW COMPLETED.
- * ------------------------------------------------------------------- */
- if (regFragPtr->activeList == RNIL) {
- jam();
- /* ------------------------------------------------------------------
- * ACTIVE LIST ON FRAGMENT IS EMPTY AND WE ARE WAITING FOR
- * THIS TO HAPPEN.
- * WE WILL NOW START THE CHECKPOINT IN TUP AND ACC.
- * ----------------------------------------------------------------- */
- /* SEND START LOCAL CHECKPOINT TO ACC AND TUP */
- /* ----------------------------------------------------------------- */
- fragptr.p->lcpRef = RNIL;
- lcpPtr.p->lcpState = LcpRecord::LCP_START_CHKP;
- sendStartLcp(signal);
- }//if
- }//if
- }//Dblqh::releaseActiveFrag()
- /* ######################################################################### */
- /* ####### TRANSACTION MODULE ####### */
- /* THIS MODULE HANDLES THE COMMIT AND THE COMPLETE PHASE. */
- /* ######################################################################### */
- void Dblqh::warningReport(Signal* signal, int place)
- {
- switch (place) {
- case 0:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMMIT in wrong state in Dblqh" << endl;
- #endif
- break;
- case 1:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMMIT with wrong transid in Dblqh" << endl;
- #endif
- break;
- case 2:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMPLETE in wrong state in Dblqh" << endl;
- #endif
- break;
- case 3:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMPLETE with wrong transid in Dblqh" << endl;
- #endif
- break;
- case 4:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMMITREQ in wrong state in Dblqh" << endl;
- #endif
- break;
- case 5:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMMITREQ with wrong transid in Dblqh" << endl;
- #endif
- break;
- case 6:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMPLETEREQ in wrong state in Dblqh" << endl;
- #endif
- break;
- case 7:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMPLETEREQ with wrong transid in Dblqh" << endl;
- #endif
- break;
- case 8:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received ABORT with non-existing transid in Dblqh" << endl;
- #endif
- break;
- case 9:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received ABORTREQ with non-existing transid in Dblqh" << endl;
- #endif
- break;
- case 10:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received ABORTREQ in wrong state in Dblqh" << endl;
- #endif
- break;
- case 11:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMMIT when tc-rec released in Dblqh" << endl;
- #endif
- break;
- case 12:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received COMPLETE when tc-rec released in Dblqh" << endl;
- #endif
- break;
- case 13:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received LQHKEYREF when tc-rec released in Dblqh" << endl;
- #endif
- break;
- case 14:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received LQHKEYREF with wrong transid in Dblqh" << endl;
- #endif
- break;
- case 15:
- jam();
- #ifdef ABORT_TRACE
- ndbout << "W: Received LQHKEYREF when already aborting in Dblqh" << endl;
- #endif
- break;
- case 16:
- jam();
- ndbrequire(cstartPhase == ZNIL);
- #ifdef ABORT_TRACE
- ndbout << "W: Received LQHKEYREF in wrong state in Dblqh" << endl;
- #endif
- break;
- default:
- jam();
- break;
- }//switch
- return;
- }//Dblqh::warningReport()
- void Dblqh::errorReport(Signal* signal, int place)
- {
- switch (place) {
- case 0:
- jam();
- break;
- case 1:
- jam();
- break;
- case 2:
- jam();
- break;
- case 3:
- jam();
- break;
- default:
- jam();
- break;
- }//switch
- systemErrorLab(signal);
- return;
- }//Dblqh::errorReport()
- /* ************************************************************************>>
- * COMMIT: Start commit request from TC. This signal is originally sent as a
- * packed signal and this function is called from execPACKED_SIGNAL.
- * This is the normal commit protocol where TC first send this signal to the
- * backup node which then will send COMMIT to the primary node. If
- * everything is ok the primary node send COMMITTED back to TC.
- * ************************************************************************>> */
- void Dblqh::execCOMMIT(Signal* signal)
- {
- TcConnectionrec *regTcConnectionrec = tcConnectionrec;
- Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
- Uint32 tcIndex = signal->theData[0];
- Uint32 gci = signal->theData[1];
- Uint32 transid1 = signal->theData[2];
- Uint32 transid2 = signal->theData[3];
- jamEntry();
- if (tcIndex >= ttcConnectrecFileSize) {
- errorReport(signal, 0);
- return;
- }//if
- if (ERROR_INSERTED(5011)) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_COMMIT, signal, 2000, 4);
- return;
- }//if
- if (ERROR_INSERTED(5012)) {
- SET_ERROR_INSERT_VALUE(5017);
- sendSignalWithDelay(cownref, GSN_COMMIT, signal, 2000, 4);
- return;
- }//if
- tcConnectptr.i = tcIndex;
- ptrAss(tcConnectptr, regTcConnectionrec);
- if ((tcConnectptr.p->transid[0] == transid1) &&
- (tcConnectptr.p->transid[1] == transid2)) {
- commitReqLab(signal, gci);
- return;
- }//if
- warningReport(signal, 1);
- return;
- }//Dblqh::execCOMMIT()
- /* ************************************************************************>>
- * COMMITREQ: Commit request from TC. This is the commit protocol used if
- * one of the nodes is not behaving correctly. TC explicitly sends COMMITREQ
- * to both the backup and primary node and gets a COMMITCONF back if the
- * COMMIT was ok.
- * ************************************************************************>> */
- void Dblqh::execCOMMITREQ(Signal* signal)
- {
- jamEntry();
- Uint32 reqPtr = signal->theData[0];
- BlockReference reqBlockref = signal->theData[1];
- Uint32 gci = signal->theData[2];
- Uint32 transid1 = signal->theData[3];
- Uint32 transid2 = signal->theData[4];
- Uint32 tcOprec = signal->theData[6];
- if (ERROR_INSERTED(5004)) {
- systemErrorLab(signal);
- }
- if (ERROR_INSERTED(5017)) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_COMMITREQ, signal, 2000, 7);
- return;
- }//if
- if (findTransaction(transid1,
- transid2,
- tcOprec) != ZOK) {
- warningReport(signal, 5);
- return;
- }//if
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- switch (regTcPtr->transactionState) {
- case TcConnectionrec::PREPARED:
- case TcConnectionrec::LOG_COMMIT_QUEUED_WAIT_SIGNAL:
- case TcConnectionrec::LOG_COMMIT_WRITTEN_WAIT_SIGNAL:
- jam();
- /*-------------------------------------------------------*/
- /* THE NORMAL CASE. */
- /*-------------------------------------------------------*/
- regTcPtr->reqBlockref = reqBlockref;
- regTcPtr->reqRef = reqPtr;
- regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
- commitReqLab(signal, gci);
- return;
- break;
- case TcConnectionrec::COMMITTED:
- jam();
- /*---------------------------------------------------------*/
- /* FOR SOME REASON THE COMMIT PHASE HAVE BEEN */
- /* FINISHED AFTER A TIME OUT. WE NEED ONLY SEND A */
- /* COMMITCONF SIGNAL. */
- /*---------------------------------------------------------*/
- regTcPtr->reqBlockref = reqBlockref;
- regTcPtr->reqRef = reqPtr;
- regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
- signal->theData[0] = regTcPtr->reqRef;
- signal->theData[1] = cownNodeid;
- signal->theData[2] = regTcPtr->transid[0];
- signal->theData[3] = regTcPtr->transid[1];
- sendSignal(regTcPtr->reqBlockref, GSN_COMMITCONF, signal, 4, JBB);
- break;
- case TcConnectionrec::COMMIT_STOPPED:
- jam();
- regTcPtr->reqBlockref = reqBlockref;
- regTcPtr->reqRef = reqPtr;
- regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
- /*empty*/;
- break;
- default:
- jam();
- warningReport(signal, 4);
- return;
- break;
- }//switch
- return;
- }//Dblqh::execCOMMITREQ()
- /* ************************************************************************>>
- * COMPLETE : Complete the transaction. Sent as a packed signal from TC.
- * Works the same way as COMMIT protocol. This is the normal case with both
- * primary and backup working (See COMMIT).
- * ************************************************************************>> */
- void Dblqh::execCOMPLETE(Signal* signal)
- {
- TcConnectionrec *regTcConnectionrec = tcConnectionrec;
- Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
- Uint32 tcIndex = signal->theData[0];
- Uint32 transid1 = signal->theData[1];
- Uint32 transid2 = signal->theData[2];
- jamEntry();
- if (tcIndex >= ttcConnectrecFileSize) {
- errorReport(signal, 1);
- return;
- }//if
- if (ERROR_INSERTED(5013)) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_COMPLETE, signal, 2000, 3);
- return;
- }//if
- if (ERROR_INSERTED(5014)) {
- SET_ERROR_INSERT_VALUE(5018);
- sendSignalWithDelay(cownref, GSN_COMPLETE, signal, 2000, 3);
- return;
- }//if
- tcConnectptr.i = tcIndex;
- ptrAss(tcConnectptr, regTcConnectionrec);
- if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) &&
- (tcConnectptr.p->transid[0] == transid1) &&
- (tcConnectptr.p->transid[1] == transid2)) {
- if (tcConnectptr.p->seqNoReplica != 0) {
- jam();
- localCommitLab(signal);
- return;
- } else {
- jam();
- completeTransLastLab(signal);
- return;
- }//if
- }//if
- if (tcConnectptr.p->transactionState != TcConnectionrec::COMMITTED) {
- warningReport(signal, 2);
- } else {
- warningReport(signal, 3);
- }//if
- }//Dblqh::execCOMPLETE()
- /* ************************************************************************>>
- * COMPLETEREQ: Complete request from TC. Same as COMPLETE but used if one
- * node is not working ok (See COMMIT).
- * ************************************************************************>> */
- void Dblqh::execCOMPLETEREQ(Signal* signal)
- {
- jamEntry();
- Uint32 reqPtr = signal->theData[0];
- BlockReference reqBlockref = signal->theData[1];
- Uint32 transid1 = signal->theData[2];
- Uint32 transid2 = signal->theData[3];
- Uint32 tcOprec = signal->theData[5];
- if (ERROR_INSERTED(5005)) {
- systemErrorLab(signal);
- }
- if (ERROR_INSERTED(5018)) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_COMPLETEREQ, signal, 2000, 6);
- return;
- }//if
- if (findTransaction(transid1,
- transid2,
- tcOprec) != ZOK) {
- jam();
- /*---------------------------------------------------------*/
- /* FOR SOME REASON THE COMPLETE PHASE STARTED AFTER */
- /* A TIME OUT. THE TRANSACTION IS GONE. WE NEED TO */
- /* REPORT COMPLETION ANYWAY. */
- /*---------------------------------------------------------*/
- signal->theData[0] = reqPtr;
- signal->theData[1] = cownNodeid;
- signal->theData[2] = transid1;
- signal->theData[3] = transid2;
- sendSignal(reqBlockref, GSN_COMPLETECONF, signal, 4, JBB);
- warningReport(signal, 7);
- return;
- }//if
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- switch (regTcPtr->transactionState) {
- case TcConnectionrec::COMMITTED:
- jam();
- regTcPtr->reqBlockref = reqBlockref;
- regTcPtr->reqRef = reqPtr;
- regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
- /*empty*/;
- break;
- /*---------------------------------------------------------*/
- /* THE NORMAL CASE. */
- /*---------------------------------------------------------*/
- case TcConnectionrec::COMMIT_STOPPED:
- jam();
- /*---------------------------------------------------------*/
- /* FOR SOME REASON THE COMPLETE PHASE STARTED AFTER */
- /* A TIME OUT. WE HAVE SET THE PROPER VARIABLES SUCH */
- /* THAT A COMPLETECONF WILL BE SENT WHEN COMPLETE IS */
- /* FINISHED. */
- /*---------------------------------------------------------*/
- regTcPtr->reqBlockref = reqBlockref;
- regTcPtr->reqRef = reqPtr;
- regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
- return;
- break;
- default:
- jam();
- warningReport(signal, 6);
- return;
- break;
- }//switch
- if (regTcPtr->seqNoReplica != 0) {
- jam();
- localCommitLab(signal);
- return;
- } else {
- jam();
- completeTransLastLab(signal);
- return;
- }//if
- }//Dblqh::execCOMPLETEREQ()
- /* ************> */
- /* COMPLETED > */
- /* ************> */
- void Dblqh::execLQHKEYCONF(Signal* signal)
- {
- LqhKeyConf * const lqhKeyConf = (LqhKeyConf *)signal->getDataPtr();
- Uint32 tcIndex = lqhKeyConf->opPtr;
- Uint32 ttcConnectrecFileSize = ctcConnectrecFileSize;
- TcConnectionrec *regTcConnectionrec = tcConnectionrec;
- jamEntry();
- if (tcIndex >= ttcConnectrecFileSize) {
- errorReport(signal, 2);
- return;
- }//if
- tcConnectptr.i = tcIndex;
- ptrAss(tcConnectptr, regTcConnectionrec);
- switch (tcConnectptr.p->connectState) {
- case TcConnectionrec::LOG_CONNECTED:
- jam();
- completedLab(signal);
- return;
- break;
- case TcConnectionrec::COPY_CONNECTED:
- jam();
- copyCompletedLab(signal);
- return;
- break;
- default:
- jam();
- ndbrequire(false);
- break;
- }//switch
- return;
- }//Dblqh::execLQHKEYCONF()
- /* ------------------------------------------------------------------------- */
- /* ------- COMMIT PHASE ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- void Dblqh::commitReqLab(Signal* signal, Uint32 gci)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- TcConnectionrec::LogWriteState logWriteState = regTcPtr->logWriteState;
- TcConnectionrec::TransactionState transState = regTcPtr->transactionState;
- regTcPtr->gci = gci;
- if (transState == TcConnectionrec::PREPARED) {
- if (logWriteState == TcConnectionrec::WRITTEN) {
- jam();
- regTcPtr->transactionState = TcConnectionrec::PREPARED_RECEIVED_COMMIT;
- TcConnectionrecPtr saveTcPtr = tcConnectptr;
- Uint32 blockNo = refToBlock(regTcPtr->tcTupBlockref);
- signal->theData[0] = regTcPtr->tupConnectrec;
- signal->theData[1] = gci;
- EXECUTE_DIRECT(blockNo, GSN_TUP_WRITELOG_REQ, signal, 2);
- jamEntry();
- if (regTcPtr->transactionState == TcConnectionrec::LOG_COMMIT_QUEUED) {
- jam();
- return;
- }//if
- ndbrequire(regTcPtr->transactionState == TcConnectionrec::LOG_COMMIT_WRITTEN);
- tcConnectptr = saveTcPtr;
- } else if (logWriteState == TcConnectionrec::NOT_STARTED) {
- jam();
- } else if (logWriteState == TcConnectionrec::NOT_WRITTEN) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* IT IS A READ OPERATION OR OTHER OPERATION THAT DO NOT USE THE LOG. */
- /*---------------------------------------------------------------------------*/
- /*---------------------------------------------------------------------------*/
- /* THE LOG HAS NOT BEEN WRITTEN SINCE THE LOG FLAG WAS FALSE. THIS CAN OCCUR */
- /* WHEN WE ARE STARTING A NEW FRAGMENT. */
- /*---------------------------------------------------------------------------*/
- regTcPtr->logWriteState = TcConnectionrec::NOT_STARTED;
- } else {
- ndbrequire(logWriteState == TcConnectionrec::NOT_WRITTEN_WAIT);
- jam();
- /*---------------------------------------------------------------------------*/
- /* THE STATE WAS SET TO NOT_WRITTEN BY THE OPERATION BUT LATER A SCAN OF ALL */
- /* OPERATION RECORD CHANGED IT INTO NOT_WRITTEN_WAIT. THIS INDICATES THAT WE */
- /* ARE WAITING FOR THIS OPERATION TO COMMIT OR ABORT SO THAT WE CAN FIND THE */
- /* STARTING GLOBAL CHECKPOINT OF THIS NEW FRAGMENT. */
- /*---------------------------------------------------------------------------*/
- checkScanTcCompleted(signal);
- }//if
- } else if (transState == TcConnectionrec::LOG_COMMIT_QUEUED_WAIT_SIGNAL) {
- jam();
- regTcPtr->transactionState = TcConnectionrec::LOG_COMMIT_QUEUED;
- return;
- } else if (transState == TcConnectionrec::LOG_COMMIT_WRITTEN_WAIT_SIGNAL) {
- jam();
- } else {
- warningReport(signal, 0);
- return;
- }//if
- if (regTcPtr->seqNoReplica != 0) {
- jam();
- commitReplyLab(signal);
- return;
- }//if
- localCommitLab(signal);
- return;
- }//Dblqh::commitReqLab()
- void Dblqh::execLQH_WRITELOG_REQ(Signal* signal)
- {
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- Uint32 gci = signal->theData[1];
- Uint32 newestGci = cnewestGci;
- TcConnectionrec::LogWriteState logWriteState = regTcPtr->logWriteState;
- TcConnectionrec::TransactionState transState = regTcPtr->transactionState;
- regTcPtr->gci = gci;
- if (gci > newestGci) {
- jam();
- /* ------------------------------------------------------------------------- */
- /* KEEP TRACK OF NEWEST GLOBAL CHECKPOINT THAT LQH HAS HEARD OF. */
- /* ------------------------------------------------------------------------- */
- cnewestGci = gci;
- }//if
- if (logWriteState == TcConnectionrec::WRITTEN) {
- /*---------------------------------------------------------------------------*/
- /* I NEED TO INSERT A COMMIT LOG RECORD SINCE WE ARE WRITING LOG IN THIS */
- /* TRANSACTION. */
- /*---------------------------------------------------------------------------*/
- jam();
- LogPartRecordPtr regLogPartPtr;
- Uint32 noOfLogPages = cnoOfLogPages;
- jam();
- regLogPartPtr.i = regTcPtr->hashValue & 3;
- ptrCheckGuard(regLogPartPtr, clogPartFileSize, logPartRecord);
- if ((regLogPartPtr.p->logPartState == LogPartRecord::ACTIVE) ||
- (noOfLogPages == 0)) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THIS LOG PART WAS CURRENTLY ACTIVE WRITING ANOTHER LOG RECORD. WE MUST */
- /* WAIT UNTIL THIS PART HAS COMPLETED ITS OPERATION. */
- /*---------------------------------------------------------------------------*/
- // We must delay the write of commit info to the log to safe-guard against
- // a crash due to lack of log pages. We temporary stop all log writes to this
- // log part to ensure that we don't get a buffer explosion in the delayed
- // signal buffer instead.
- /*---------------------------------------------------------------------------*/
- linkWaitLog(signal, regLogPartPtr);
- if (transState == TcConnectionrec::PREPARED) {
- jam();
- regTcPtr->transactionState = TcConnectionrec::LOG_COMMIT_QUEUED_WAIT_SIGNAL;
- } else {
- jam();
- ndbrequire(transState == TcConnectionrec::PREPARED_RECEIVED_COMMIT);
- regTcPtr->transactionState = TcConnectionrec::LOG_COMMIT_QUEUED;
- }//if
- if (regLogPartPtr.p->logPartState == LogPartRecord::IDLE) {
- jam();
- regLogPartPtr.p->logPartState = LogPartRecord::ACTIVE;
- }//if
- return;
- }//if
- writeCommitLog(signal, regLogPartPtr);
- if (transState == TcConnectionrec::PREPARED) {
- jam();
- regTcPtr->transactionState = TcConnectionrec::LOG_COMMIT_WRITTEN_WAIT_SIGNAL;
- } else {
- jam();
- ndbrequire(transState == TcConnectionrec::PREPARED_RECEIVED_COMMIT);
- regTcPtr->transactionState = TcConnectionrec::LOG_COMMIT_WRITTEN;
- }//if
- }//if
- }//Dblqh::execLQH_WRITELOG_REQ()
- void Dblqh::localCommitLab(Signal* signal)
- {
- FragrecordPtr regFragptr;
- regFragptr.i = tcConnectptr.p->fragmentptr;
- ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
- Fragrecord::FragStatus status = regFragptr.p->fragStatus;
- fragptr = regFragptr;
- switch (status) {
- case Fragrecord::FSACTIVE:
- case Fragrecord::CRASH_RECOVERING:
- case Fragrecord::ACTIVE_CREATION:
- jam();
- commitContinueAfterBlockedLab(signal);
- return;
- break;
- case Fragrecord::BLOCKED:
- jam();
- linkFragQueue(signal);
- tcConnectptr.p->transactionState = TcConnectionrec::COMMIT_STOPPED;
- break;
- case Fragrecord::FREE:
- jam();
- case Fragrecord::DEFINED:
- jam();
- case Fragrecord::REMOVING:
- jam();
- default:
- ndbrequire(false);
- break;
- }//switch
- }//Dblqh::localCommitLab()
- void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
- {
- /* ------------------------------------------------------------------------- */
- /*INPUT: TC_CONNECTPTR ACTIVE OPERATION RECORD */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /*CONTINUE HERE AFTER BEING BLOCKED FOR A WHILE DURING LOCAL CHECKPOINT. */
- /*The operation is already removed from the active list since there is no */
- /*chance for any real-time breaks before we need to release it. */
- /* ------------------------------------------------------------------------- */
- /*ALSO AFTER NORMAL PROCEDURE WE CONTINUE */
- /*WE MUST COMMIT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN AND SEES A */
- /*DIRTY STATE IN TUP. */
- /* ------------------------------------------------------------------------- */
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- Fragrecord * const regFragptr = fragptr.p;
- Uint32 operation = regTcPtr->operation;
- Uint32 simpleRead = regTcPtr->simpleRead;
- Uint32 dirtyOp = regTcPtr->dirtyOp;
- if (regTcPtr->activeCreat == ZFALSE) {
- if ((cCommitBlocked == true) &&
- (regFragptr->fragActiveStatus == ZTRUE)) {
- jam();
- /* ------------------------------------------------------------------------- */
- // TUP and/or ACC have problems in writing the undo log to disk fast enough.
- // We must avoid the commit at this time and try later instead. The fragment
- // is also active with a local checkpoint and this commit can generate UNDO
- // log records that overflow the UNDO log buffer.
- /* ------------------------------------------------------------------------- */
- /*---------------------------------------------------------------------------*/
- // We must delay the write of commit info to the log to safe-guard against
- // a crash due to lack of log pages. We temporary stop all log writes to this
- // log part to ensure that we don't get a buffer explosion in the delayed
- // signal buffer instead.
- /*---------------------------------------------------------------------------*/
- logPartPtr.i = regTcPtr->hashValue & 3;
- ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
- linkWaitLog(signal, logPartPtr);
- regTcPtr->transactionState = TcConnectionrec::COMMIT_QUEUED;
- if (logPartPtr.p->logPartState == LogPartRecord::IDLE) {
- jam();
- logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
- }//if
- return;
- }//if
- if (operation != ZREAD) {
- TupCommitReq * const tupCommitReq =
- (TupCommitReq *)signal->getDataPtrSend();
- Uint32 sig0 = regTcPtr->tupConnectrec;
- Uint32 tup = refToBlock(regTcPtr->tcTupBlockref);
- jam();
- tupCommitReq->opPtr = sig0;
- tupCommitReq->gci = regTcPtr->gci;
- tupCommitReq->hashValue = regTcPtr->hashValue;
- EXECUTE_DIRECT(tup, GSN_TUP_COMMITREQ, signal,
- TupCommitReq::SignalLength);
- Uint32 acc = refToBlock(regTcPtr->tcAccBlockref);
- signal->theData[0] = regTcPtr->accConnectrec;
- EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
- } else {
- if(!dirtyOp){
- Uint32 acc = refToBlock(regTcPtr->tcAccBlockref);
- signal->theData[0] = regTcPtr->accConnectrec;
- EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
- }
- }
- jamEntry();
- if (simpleRead) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*THE OPERATION WAS A SIMPLE READ THUS THE COMMIT PHASE IS ONLY NEEDED TO */
- /*RELEASE THE LOCKS. AT THIS POINT IN THE CODE THE LOCKS ARE RELEASED AND WE */
- /*ARE IN A POSITION TO SEND LQHKEYCONF TO TC. WE WILL ALSO RELEASE ALL */
- /*RESOURCES BELONGING TO THIS OPERATION SINCE NO MORE WORK WILL BE */
- /*PERFORMED. */
- /* ------------------------------------------------------------------------- */
- cleanUp(signal);
- return;
- }//if
- }//if
- Uint32 seqNoReplica = regTcPtr->seqNoReplica;
- if (regTcPtr->gci > regFragptr->newestGci) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*IT IS THE FIRST TIME THIS GLOBAL CHECKPOINT IS INVOLVED IN UPDATING THIS */
- /*FRAGMENT. UPDATE THE VARIABLE THAT KEEPS TRACK OF NEWEST GCI IN FRAGMENT */
- /* ------------------------------------------------------------------------- */
- regFragptr->newestGci = regTcPtr->gci;
- }//if
- if (dirtyOp != ZTRUE) {
- if (seqNoReplica != 0) {
- jam();
- completeTransNotLastLab(signal);
- return;
- }//if
- commitReplyLab(signal);
- return;
- } else {
- /* ------------------------------------------------------------------------- */
- /*WE MUST HANDLE DIRTY WRITES IN A SPECIAL WAY. THESE OPERATIONS WILL NOT */
- /*SEND ANY COMMIT OR COMPLETE MESSAGES TO OTHER NODES. THEY WILL MERELY SEND */
- /*THOSE SIGNALS INTERNALLY. */
- /* ------------------------------------------------------------------------- */
- if (regTcPtr->abortState == TcConnectionrec::ABORT_IDLE) {
- jam();
- packLqhkeyreqLab(signal);
- } else {
- ndbrequire(regTcPtr->abortState != TcConnectionrec::NEW_FROM_TC);
- jam();
- sendLqhTransconf(signal, LqhTransConf::Committed);
- cleanUp(signal);
- }//if
- }//if
- }//Dblqh::commitContinueAfterBlockedLab()
- void Dblqh::commitReplyLab(Signal* signal)
- {
- /* -------------------------------------------------------------- */
- /* BACKUP AND STAND-BY REPLICAS ONLY UPDATE THE TRANSACTION STATE */
- /* -------------------------------------------------------------- */
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- TcConnectionrec::AbortState abortState = regTcPtr->abortState;
- regTcPtr->transactionState = TcConnectionrec::COMMITTED;
- if (abortState == TcConnectionrec::ABORT_IDLE) {
- Uint32 clientBlockref = regTcPtr->clientBlockref;
- if (regTcPtr->seqNoReplica == 0) {
- jam();
- sendCommittedTc(signal, clientBlockref);
- return;
- } else {
- jam();
- sendCommitLqh(signal, clientBlockref);
- return;
- }//if
- } else if (regTcPtr->abortState == TcConnectionrec::REQ_FROM_TC) {
- jam();
- signal->theData[0] = regTcPtr->reqRef;
- signal->theData[1] = cownNodeid;
- signal->theData[2] = regTcPtr->transid[0];
- signal->theData[3] = regTcPtr->transid[1];
- sendSignal(tcConnectptr.p->reqBlockref, GSN_COMMITCONF, signal, 4, JBB);
- } else {
- ndbrequire(regTcPtr->abortState == TcConnectionrec::NEW_FROM_TC);
- jam();
- sendLqhTransconf(signal, LqhTransConf::Committed);
- }//if
- return;
- }//Dblqh::commitReplyLab()
- /* ------------------------------------------------------------------------- */
- /* ------- COMPLETE PHASE ------- */
- /* */
- /* ------------------------------------------------------------------------- */
- void Dblqh::completeTransNotLastLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->abortState == TcConnectionrec::ABORT_IDLE) {
- Uint32 clientBlockref = regTcPtr->clientBlockref;
- jam();
- sendCompleteLqh(signal, clientBlockref);
- cleanUp(signal);
- return;
- } else {
- jam();
- completeUnusualLab(signal);
- return;
- }//if
- }//Dblqh::completeTransNotLastLab()
- void Dblqh::completeTransLastLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->abortState == TcConnectionrec::ABORT_IDLE) {
- Uint32 clientBlockref = regTcPtr->clientBlockref;
- jam();
- /* ------------------------------------------------------------------------- */
- /*DIRTY WRITES WHICH ARE LAST IN THE CHAIN OF REPLICAS WILL SEND COMPLETED */
- /*INSTEAD OF SENDING PREPARED TO THE TC (OR OTHER INITIATOR OF OPERATION). */
- /* ------------------------------------------------------------------------- */
- sendCompletedTc(signal, clientBlockref);
- cleanUp(signal);
- return;
- } else {
- jam();
- completeUnusualLab(signal);
- return;
- }//if
- }//Dblqh::completeTransLastLab()
- void Dblqh::completeUnusualLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->abortState == TcConnectionrec::ABORT_FROM_TC) {
- jam();
- sendAborted(signal);
- } else if (regTcPtr->abortState == TcConnectionrec::NEW_FROM_TC) {
- jam();
- sendLqhTransconf(signal, LqhTransConf::Committed);
- } else {
- ndbrequire(regTcPtr->abortState == TcConnectionrec::REQ_FROM_TC);
- jam();
- signal->theData[0] = regTcPtr->reqRef;
- signal->theData[1] = cownNodeid;
- signal->theData[2] = regTcPtr->transid[0];
- signal->theData[3] = regTcPtr->transid[1];
- sendSignal(regTcPtr->reqBlockref,
- GSN_COMPLETECONF, signal, 4, JBB);
- }//if
- cleanUp(signal);
- return;
- }//Dblqh::completeUnusualLab()
- /* ========================================================================= */
- /* ======= RELEASE TC CONNECT RECORD ======= */
- /* */
- /* RELEASE A TC CONNECT RECORD TO THE FREELIST. */
- /* ========================================================================= */
- void Dblqh::releaseTcrec(Signal* signal, TcConnectionrecPtr locTcConnectptr)
- {
- jam();
- locTcConnectptr.p->tcTimer = 0;
- locTcConnectptr.p->transactionState = TcConnectionrec::TC_NOT_CONNECTED;
- locTcConnectptr.p->nextTcConnectrec = cfirstfreeTcConrec;
- cfirstfreeTcConrec = locTcConnectptr.i;
- TablerecPtr tabPtr;
- tabPtr.i = locTcConnectptr.p->tableref;
- if(tabPtr.i == RNIL)
- return;
- ptrCheckGuard(tabPtr, ctabrecFileSize, tablerec);
-
- /**
- * Normal case
- */
- ndbrequire(tabPtr.p->usageCount > 0);
- tabPtr.p->usageCount--;
- }//Dblqh::releaseTcrec()
- void Dblqh::releaseTcrecLog(Signal* signal, TcConnectionrecPtr locTcConnectptr)
- {
- jam();
- locTcConnectptr.p->tcTimer = 0;
- locTcConnectptr.p->transactionState = TcConnectionrec::TC_NOT_CONNECTED;
- locTcConnectptr.p->nextTcConnectrec = cfirstfreeTcConrec;
- cfirstfreeTcConrec = locTcConnectptr.i;
- TablerecPtr tabPtr;
- tabPtr.i = locTcConnectptr.p->tableref;
- if(tabPtr.i == RNIL)
- return;
- }//Dblqh::releaseTcrecLog()
- /* ------------------------------------------------------------------------- */
- /* ------- ABORT PHASE ------- */
- /* */
- /*THIS PART IS USED AT ERRORS THAT CAUSE ABORT OF TRANSACTION. */
- /* ------------------------------------------------------------------------- */
- /* ***************************************************>> */
- /* ABORT: Abort transaction in connection. Sender TC. */
- /* This is the normal protocol (See COMMIT) */
- /* ***************************************************>> */
- void Dblqh::execABORT(Signal* signal)
- {
- jamEntry();
- Uint32 tcOprec = signal->theData[0];
- BlockReference tcBlockref = signal->theData[1];
- Uint32 transid1 = signal->theData[2];
- Uint32 transid2 = signal->theData[3];
- CRASH_INSERTION(5003);
- if (ERROR_INSERTED(5015)) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_ABORT, signal, 2000, 4);
- return;
- }//if
- if (findTransaction(transid1,
- transid2,
- tcOprec) != ZOK) {
- jam();
- if(ERROR_INSERTED(5039) &&
- refToNode(signal->getSendersBlockRef()) != getOwnNodeId()){
- jam();
- SET_ERROR_INSERT_VALUE(5040);
- return;
- }
- if(ERROR_INSERTED(5040) &&
- refToNode(signal->getSendersBlockRef()) != getOwnNodeId()){
- jam();
- SET_ERROR_INSERT_VALUE(5003);
- return;
- }
-
- /* ------------------------------------------------------------------------- */
- // SEND ABORTED EVEN IF NOT FOUND.
- //THE TRANSACTION MIGHT NEVER HAVE ARRIVED HERE.
- /* ------------------------------------------------------------------------- */
- signal->theData[0] = tcOprec;
- signal->theData[1] = transid1;
- signal->theData[2] = transid2;
- signal->theData[3] = cownNodeid;
- signal->theData[4] = ZTRUE;
- sendSignal(tcBlockref, GSN_ABORTED, signal, 5, JBB);
- warningReport(signal, 8);
- return;
- }//if
- /* ------------------------------------------------------------------------- */
- /*A GUIDING DESIGN PRINCIPLE IN HANDLING THESE ERROR SITUATIONS HAVE BEEN */
- /*KEEP IT SIMPLE. THUS WE RATHER INSERT A WAIT AND SET THE ABORT_STATE TO */
- /*ACTIVE RATHER THAN WRITE NEW CODE TO HANDLE EVERY SPECIAL SITUATION. */
- /* ------------------------------------------------------------------------- */
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->nextReplica != ZNIL) {
- /* ------------------------------------------------------------------------- */
- // We will immediately send the ABORT message also to the next LQH node in line.
- /* ------------------------------------------------------------------------- */
- BlockReference TLqhRef = calcLqhBlockRef(regTcPtr->nextReplica);
- signal->theData[0] = regTcPtr->tcOprec;
- signal->theData[1] = regTcPtr->tcBlockref;
- signal->theData[2] = regTcPtr->transid[0];
- signal->theData[3] = regTcPtr->transid[1];
- sendSignal(TLqhRef, GSN_ABORT, signal, 4, JBB);
- }//if
- regTcPtr->abortState = TcConnectionrec::ABORT_FROM_TC;
- regTcPtr->activeCreat = ZFALSE;
- const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
- if(commitAckMarker != RNIL){
- jam();
- #ifdef MARKER_TRACE
- {
- CommitAckMarkerPtr tmp;
- m_commitAckMarkerHash.getPtr(tmp, commitAckMarker);
- ndbout_c("Ab2 marker[%.8x %.8x]", tmp.p->transid1, tmp.p->transid2);
- }
- #endif
- m_commitAckMarkerHash.release(commitAckMarker);
- regTcPtr->commitAckMarker = RNIL;
- }
- abortStateHandlerLab(signal);
- return;
- }//Dblqh::execABORT()
- /* ************************************************************************>>
- * ABORTREQ: Same as ABORT but used in case one node isn't working ok.
- * (See COMMITREQ)
- * ************************************************************************>> */
- void Dblqh::execABORTREQ(Signal* signal)
- {
- jamEntry();
- Uint32 reqPtr = signal->theData[0];
- BlockReference reqBlockref = signal->theData[1];
- Uint32 transid1 = signal->theData[2];
- Uint32 transid2 = signal->theData[3];
- Uint32 tcOprec = signal->theData[5];
- if (ERROR_INSERTED(5006)) {
- systemErrorLab(signal);
- }
- if (ERROR_INSERTED(5016)) {
- CLEAR_ERROR_INSERT_VALUE;
- sendSignalWithDelay(cownref, GSN_ABORTREQ, signal, 2000, 6);
- return;
- }//if
- if (findTransaction(transid1,
- transid2,
- tcOprec) != ZOK) {
- signal->theData[0] = reqPtr;
- signal->theData[2] = cownNodeid;
- signal->theData[3] = transid1;
- signal->theData[4] = transid2;
- sendSignal(reqBlockref, GSN_ABORTCONF, signal, 5, JBB);
- warningReport(signal, 9);
- return;
- }//if
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->transactionState != TcConnectionrec::PREPARED) {
- warningReport(signal, 10);
- return;
- }//if
- regTcPtr->reqBlockref = reqBlockref;
- regTcPtr->reqRef = reqPtr;
- regTcPtr->abortState = TcConnectionrec::REQ_FROM_TC;
- regTcPtr->activeCreat = ZFALSE;
- abortCommonLab(signal);
- return;
- }//Dblqh::execABORTREQ()
- /* ************>> */
- /* ACC_TO_REF > */
- /* ************>> */
- void Dblqh::execACC_TO_REF(Signal* signal)
- {
- jamEntry();
- terrorCode = signal->theData[1];
- releaseActiveFrag(signal);
- abortErrorLab(signal);
- return;
- }//Dblqh::execACC_TO_REF()
- /* ************> */
- /* ACCKEYREF > */
- /* ************> */
- void Dblqh::execACCKEYREF(Signal* signal)
- {
- jamEntry();
- tcConnectptr.i = signal->theData[0];
- terrorCode = signal->theData[1];
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- TcConnectionrec * const tcPtr = tcConnectptr.p;
- switch (tcPtr->transactionState) {
- case TcConnectionrec::WAIT_ACC:
- jam();
- releaseActiveFrag(signal);
- break;
- case TcConnectionrec::WAIT_ACC_ABORT:
- case TcConnectionrec::ABORT_STOPPED:
- case TcConnectionrec::ABORT_QUEUED:
- jam();
- /* ------------------------------------------------------------------------- */
- /*IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
- /* ------------------------------------------------------------------------- */
- return;
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- const Uint32 errCode = terrorCode;
- tcPtr->errorCode = errCode;
- /* ------------------------------------------------------------------------- */
- /*WHEN AN ABORT FROM TC ARRIVES IT COULD ACTUALLY BE A CORRECT BEHAVIOUR */
- /*SINCE THE TUPLE MIGHT NOT HAVE ARRIVED YET OR ALREADY HAVE BEEN INSERTED. */
- /* ------------------------------------------------------------------------- */
- if (tcPtr->activeCreat == ZTRUE) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*THIS IS A NORMAL EVENT DURING CREATION OF A FRAGMENT. PERFORM ABORT IN */
- /*TUP AND ACC AND THEN CONTINUE WITH NORMAL COMMIT PROCESSING. IF THE ERROR */
- /*HAPPENS TO BE A SERIOUS ERROR THEN PERFORM ABORT PROCESSING AS NORMAL. */
- /* ------------------------------------------------------------------------- */
- switch (tcPtr->operation) {
- case ZUPDATE:
- case ZDELETE:
- jam();
- if (errCode != ZNO_TUPLE_FOUND) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*A NORMAL ERROR WILL BE TREATED AS A NORMAL ABORT AND WILL ABORT THE */
- /*TRANSACTION. NO SPECIAL HANDLING IS NEEDED. */
- /* ------------------------------------------------------------------------- */
- tcPtr->activeCreat = ZFALSE;
- }//if
- break;
- case ZINSERT:
- jam();
- if (errCode != ZTUPLE_ALREADY_EXIST) {
- jam();
- /* ------------------------------------------------------------------------- */
- /*A NORMAL ERROR WILL BE TREATED AS A NORMAL ABORT AND WILL ABORT THE */
- /*TRANSACTION. NO SPECIAL HANDLING IS NEEDED. */
- /* ------------------------------------------------------------------------- */
- tcPtr->activeCreat = ZFALSE;
- }//if
- break;
- default:
- jam();
- /* ------------------------------------------------------------------------- */
- /*A NORMAL ERROR WILL BE TREATED AS A NORMAL ABORT AND WILL ABORT THE */
- /*TRANSACTION. NO SPECIAL HANDLING IS NEEDED. */
- /* ------------------------------------------------------------------------- */
- tcPtr->activeCreat = ZFALSE;
- break;
- }//switch
- } else {
- /**
- * Only primary replica can get ZTUPLE_ALREADY_EXIST || ZNO_TUPLE_FOUND
- *
- * Unless it's a simple or dirty read
- *
- * NOT TRUE!
- * 1) op1 - primary insert ok
- * 2) op1 - backup insert fail (log full or what ever)
- * 3) op1 - delete ok @ primary
- * 4) op1 - delete fail @ backup
- *
- * -> ZNO_TUPLE_FOUND is possible
- */
- ndbrequire
- (tcPtr->seqNoReplica == 0 ||
- errCode != ZTUPLE_ALREADY_EXIST ||
- (tcPtr->operation == ZREAD && (tcPtr->dirtyOp || tcPtr->opSimple)));
- }
- tcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
- abortCommonLab(signal);
- return;
- }//Dblqh::execACCKEYREF()
- void Dblqh::localAbortStateHandlerLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- if (regTcPtr->abortState != TcConnectionrec::ABORT_IDLE) {
- jam();
- return;
- }//if
- regTcPtr->activeCreat = ZFALSE;
- regTcPtr->abortState = TcConnectionrec::ABORT_FROM_LQH;
- regTcPtr->errorCode = terrorCode;
- abortStateHandlerLab(signal);
- return;
- }//Dblqh::localAbortStateHandlerLab()
- void Dblqh::abortStateHandlerLab(Signal* signal)
- {
- TcConnectionrec * const regTcPtr = tcConnectptr.p;
- switch (regTcPtr->transactionState) {
- case TcConnectionrec::PREPARED:
- jam();
- /* ------------------------------------------------------------------------- */
- /*THE OPERATION IS ALREADY PREPARED AND SENT TO THE NEXT LQH OR BACK TO TC. */
- /*WE CAN SIMPLY CONTINUE WITH THE ABORT PROCESS. */
- /*IF IT WAS A CHECK FOR TRANSACTION STATUS THEN WE REPORT THE STATUS TO THE */
- /*NEW TC AND CONTINUE WITH THE NEXT OPERATION IN LQH. */
- /* ------------------------------------------------------------------------- */
- if (regTcPtr->abortState == TcConnectionrec::NEW_FROM_TC) {
- jam();
- sendLqhTransconf(signal, LqhTransConf::Prepared);
- return;
- }//if
- break;
- case TcConnectionrec::LOG_COMMIT_WRITTEN_WAIT_SIGNAL:
- case TcConnectionrec::LOG_COMMIT_QUEUED_WAIT_SIGNAL:
- jam();
- /* ------------------------------------------------------------------------- */
- // We can only reach these states for multi-updates on a record in a transaction.
- // We know that at least one of those has received the COMMIT signal, thus we
- // declare us only prepared since we then receive the expected COMMIT signal.
- /* ------------------------------------------------------------------------- */
- ndbrequire(regTcPtr->abortState == TcConnectionrec::NEW_FROM_TC);
- sendLqhTransconf(signal, LqhTransConf::Prepared);
- break;
- case TcConnectionrec::WAIT_TUPKEYINFO:
- case TcConnectionrec::WAIT_ATTR:
- jam();
- /* ------------------------------------------------------------------------- */
- /* WE ARE CURRENTLY WAITING FOR MORE INFORMATION. WE CAN START THE ABORT */
- /* PROCESS IMMEDIATELY. THE KEYINFO AND ATTRINFO SIGNALS WILL BE DROPPED */
- /* SINCE THE ABORT STATE WILL BE SET. */
- /* ------------------------------------------------------------------------- */
- break;
- case TcConnectionrec::WAIT_TUP:
- jam();
- /* ------------------------------------------------------------------------- */
- // TUP is currently active. We have to wait for the TUPKEYREF or TUPKEYCONF
- // to arrive since we might otherwise jeopardise the local checkpoint
- // consistency in overload situations.
- /* ------------------------------------------------------------------------- */
- regTcPtr->transactionState = TcConnectionrec::WAIT_TUP_TO_ABORT;
- return;
- case TcConnectionrec::WAIT_ACC:
- jam();
- if (regTcPtr->listState == TcConnectionrec::ACC_BLOCK_LIST) {
- jam();
- /* ------------------------------------------------------------------------- */
- // If the operation is in the ACC Blocked list the operation is not allowed
- // to start yet. We release it from the ACC Blocked list and will go through
- // the gate in abortCommonLab(..) where it will be blocked.
- /* ------------------------------------------------------------------------- */
- fragptr.i = regTcPtr->fragmentptr;
- ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- releaseAccList(signal);
- } else {
- jam();
- /* ------------------------------------------------------------------------- */
- // We start the abort immediately since the operation is still in the active
- // list and the fragment cannot have been frozen yet. By sending LCP_HOLDOPCONF
- // as direct signals we avoid the problem that we might find the operation
- // in an unexpected list in ACC.
- // We cannot accept being blocked before aborting ACC here since that would
- // lead to seriously complex issues.
- /* ------------------------------------------------------------------------- */
- abortContinueAfterBlockedLab(signal, false);
- return;
- }//if
- break;
- case TcConnectionrec::LOG_QUEUED:
- jam();
- /* ------------------------------------------------------------------------- */
- /*CURRENTLY QUEUED FOR LOGGING. WAIT UNTIL THE LOG RECORD HAVE BEEN INSERTED */
- /*AND THEN CONTINUE THE ABORT PROCESS. */
- //Could also be waiting for an overloaded log disk. In this case it is easy
- //to abort when CONTINUEB arrives.
- /* ------------------------------------------------------------------------- */
- return;
- break;
- case TcConnectionrec::STOPPED:
- jam();
- /* ---------------------------------------------------------------------
- * WE ARE CURRENTLY QUEUED FOR ACCESS TO THE FRAGMENT BY A LCP
- * Since nothing has been done, just release operation
- * i.e. no prepare log record has been written
- * so no abort log records needs to be written
- */
- releaseWaitQueue(signal);
- continueAfterLogAbortWriteLab(signal);
- return;
- break;
- case TcConnectionrec::WAIT_AI_AFTER_ABORT:
- jam();
- /* ------------------------------------------------------------------------- */