DbtupCommit.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:21k
- /* Copyright (C) 2003 MySQL AB
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
- #define DBTUP_C
- #include "Dbtup.hpp"
- #include <RefConvert.hpp>
- #include <ndb_limits.h>
- #include <pc.hpp>
- #include <signaldata/TupCommit.hpp>
- #define ljam() { jamLine(5000 + __LINE__); }
- #define ljamEntry() { jamEntryLine(5000 + __LINE__); }
- void Dbtup::execTUP_WRITELOG_REQ(Signal* signal)
- {
- jamEntry();
- OperationrecPtr loopOpPtr;
- loopOpPtr.i = signal->theData[0];
- Uint32 gci = signal->theData[1];
- ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
- while (loopOpPtr.p->nextActiveOp != RNIL) {
- ljam();
- loopOpPtr.i = loopOpPtr.p->nextActiveOp;
- ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
- }//while
- do {
- Uint32 blockNo = refToBlock(loopOpPtr.p->userblockref);
- ndbrequire(loopOpPtr.p->transstate == STARTED);
- signal->theData[0] = loopOpPtr.p->userpointer;
- signal->theData[1] = gci;
- if (loopOpPtr.p->prevActiveOp == RNIL) {
- ljam();
- EXECUTE_DIRECT(blockNo, GSN_LQH_WRITELOG_REQ, signal, 2);
- return;
- }//if
- ljam();
- EXECUTE_DIRECT(blockNo, GSN_LQH_WRITELOG_REQ, signal, 2);
- jamEntry();
- loopOpPtr.i = loopOpPtr.p->prevActiveOp;
- ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
- } while (true);
- }//Dbtup::execTUP_WRITELOG_REQ()
- void Dbtup::execTUP_DEALLOCREQ(Signal* signal)
- {
- TablerecPtr regTabPtr;
- FragrecordPtr regFragPtr;
- jamEntry();
- Uint32 fragId = signal->theData[0];
- regTabPtr.i = signal->theData[1];
- Uint32 fragPageId = signal->theData[2];
- Uint32 pageIndex = signal->theData[3];
- ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
- getFragmentrec(regFragPtr, fragId, regTabPtr.p);
- ndbrequire(regFragPtr.p != NULL);
- PagePtr pagePtr;
- pagePtr.i = getRealpid(regFragPtr.p, fragPageId);
- ptrCheckGuard(pagePtr, cnoOfPage, page);
- Uint32 pageIndexScaled = pageIndex >> 1;
- ndbrequire((pageIndex & 1) == 0);
- Uint32 pageOffset = ZPAGE_HEADER_SIZE +
- (regTabPtr.p->tupheadsize * pageIndexScaled);
- //---------------------------------------------------
- /* --- Deallocate a tuple as requested by ACC --- */
- //---------------------------------------------------
- if (isUndoLoggingNeeded(regFragPtr.p, fragPageId)) {
- ljam();
- cprAddUndoLogRecord(signal,
- ZLCPR_TYPE_INSERT_TH,
- fragPageId,
- pageIndex,
- regTabPtr.i,
- fragId,
- regFragPtr.p->checkpointVersion);
- cprAddData(signal,
- regFragPtr.p,
- pagePtr.i,
- regTabPtr.p->tupheadsize,
- pageOffset);
- }//if
- {
- freeTh(regFragPtr.p,
- regTabPtr.p,
- signal,
- pagePtr.p,
- pageOffset);
- }
- }
- /* ---------------------------------------------------------------- */
- /* ------------ PERFORM A COMMIT ON AN UPDATE OPERATION ---------- */
- /* ---------------------------------------------------------------- */
- void Dbtup::commitUpdate(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr)
- {
- if (regOperPtr->realPageIdC != RNIL) {
- if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageIdC)) {
- /* ------------------------------------------------------------------------ */
- /* IF THE COPY WAS CREATED WITHIN THIS CHECKPOINT WE ONLY HAVE */
- /* TO LOG THE CREATION OF THE COPY. IF HOWEVER IT WAS CREATED BEFORE SAVE */
- /* THIS CHECKPOINT, WE HAVE TO THE DATA AS WELL. */
- /* ------------------------------------------------------------------------ */
- if (regOperPtr->undoLogged) {
- ljam();
- cprAddUndoLogRecord(signal,
- ZLCPR_TYPE_INSERT_TH_NO_DATA,
- regOperPtr->fragPageIdC,
- regOperPtr->pageIndexC,
- regOperPtr->tableRef,
- regOperPtr->fragId,
- regFragPtr->checkpointVersion);
- } else {
- ljam();
- cprAddUndoLogRecord(signal,
- ZLCPR_TYPE_INSERT_TH,
- regOperPtr->fragPageIdC,
- regOperPtr->pageIndexC,
- regOperPtr->tableRef,
- regOperPtr->fragId,
- regFragPtr->checkpointVersion);
- cprAddData(signal,
- regFragPtr,
- regOperPtr->realPageIdC,
- regTabPtr->tupheadsize,
- regOperPtr->pageOffsetC);
- }//if
- }//if
- PagePtr copyPagePtr;
- copyPagePtr.i = regOperPtr->realPageIdC;
- ptrCheckGuard(copyPagePtr, cnoOfPage, page);
- freeTh(regFragPtr,
- regTabPtr,
- signal,
- copyPagePtr.p,
- (Uint32)regOperPtr->pageOffsetC);
- regOperPtr->realPageIdC = RNIL;
- regOperPtr->fragPageIdC = RNIL;
- regOperPtr->pageOffsetC = ZNIL;
- regOperPtr->pageIndexC = ZNIL;
- }//if
- }//Dbtup::commitUpdate()
- void
- Dbtup::commitSimple(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr)
- {
- operPtr.p = regOperPtr;
- fragptr.p = regFragPtr;
- tabptr.p = regTabPtr;
- // Checking detached triggers
- checkDetachedTriggers(signal,
- regOperPtr,
- regTabPtr);
- removeActiveOpList(regOperPtr);
- if (regOperPtr->optype == ZUPDATE) {
- ljam();
- commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
- if (regTabPtr->GCPIndicator) {
- updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
- }//if
- } else if (regOperPtr->optype == ZINSERT) {
- ljam();
- if (regTabPtr->GCPIndicator) {
- updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
- }//if
- } else {
- ndbrequire(regOperPtr->optype == ZDELETE);
- }//if
- }//Dbtup::commitSimple()
- void Dbtup::removeActiveOpList(Operationrec* const regOperPtr)
- {
- if (regOperPtr->inActiveOpList == ZTRUE) {
- OperationrecPtr raoOperPtr;
- regOperPtr->inActiveOpList = ZFALSE;
- if (regOperPtr->prevActiveOp != RNIL) {
- ljam();
- raoOperPtr.i = regOperPtr->prevActiveOp;
- ptrCheckGuard(raoOperPtr, cnoOfOprec, operationrec);
- raoOperPtr.p->nextActiveOp = regOperPtr->nextActiveOp;
- } else {
- ljam();
- PagePtr pagePtr;
- pagePtr.i = regOperPtr->realPageId;
- ptrCheckGuard(pagePtr, cnoOfPage, page);
- ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
- pagePtr.p->pageWord[regOperPtr->pageOffset] = regOperPtr->nextActiveOp;
- }//if
- if (regOperPtr->nextActiveOp != RNIL) {
- ljam();
- raoOperPtr.i = regOperPtr->nextActiveOp;
- ptrCheckGuard(raoOperPtr, cnoOfOprec, operationrec);
- raoOperPtr.p->prevActiveOp = regOperPtr->prevActiveOp;
- }//if
- regOperPtr->prevActiveOp = RNIL;
- regOperPtr->nextActiveOp = RNIL;
- }//if
- }//Dbtup::removeActiveOpList()
- /* ---------------------------------------------------------------- */
- /* INITIALIZATION OF ONE CONNECTION RECORD TO PREPARE FOR NEXT OP. */
- /* ---------------------------------------------------------------- */
- void Dbtup::initOpConnection(Operationrec* regOperPtr,
- Fragrecord * fragPtrP)
- {
- Uint32 RinFragList = regOperPtr->inFragList;
- regOperPtr->transstate = IDLE;
- regOperPtr->currentAttrinbufLen = 0;
- regOperPtr->optype = ZREAD;
- if (RinFragList == ZTRUE) {
- OperationrecPtr tropNextLinkPtr;
- OperationrecPtr tropPrevLinkPtr;
- /*----------------------------------------------------------------- */
- /* TO ENSURE THAT WE HAVE SUCCESSFUL ABORTS OF FOLLOWING */
- /* OPERATIONS WHICH NEVER STARTED WE SET THE OPTYPE TO READ. */
- /*----------------------------------------------------------------- */
- /* REMOVE IT FROM THE DOUBLY LINKED LIST ON THE FRAGMENT */
- /*----------------------------------------------------------------- */
- tropPrevLinkPtr.i = regOperPtr->prevOprecInList;
- tropNextLinkPtr.i = regOperPtr->nextOprecInList;
- regOperPtr->inFragList = ZFALSE;
- if (tropPrevLinkPtr.i == RNIL) {
- ljam();
- fragPtrP->firstusedOprec = tropNextLinkPtr.i;
- } else {
- ljam();
- ptrCheckGuard(tropPrevLinkPtr, cnoOfOprec, operationrec);
- tropPrevLinkPtr.p->nextOprecInList = tropNextLinkPtr.i;
- }//if
- if (tropNextLinkPtr.i == RNIL) {
- fragPtrP->lastusedOprec = tropPrevLinkPtr.i;
- } else {
- ptrCheckGuard(tropNextLinkPtr, cnoOfOprec, operationrec);
- tropNextLinkPtr.p->prevOprecInList = tropPrevLinkPtr.i;
- }
- regOperPtr->prevOprecInList = RNIL;
- regOperPtr->nextOprecInList = RNIL;
- }//if
- }//Dbtup::initOpConnection()
- /* ----------------------------------------------------------------- */
- /* --------------- COMMIT THIS PART OF A TRANSACTION --------------- */
- /* ----------------------------------------------------------------- */
- void Dbtup::execTUP_COMMITREQ(Signal* signal)
- {
- FragrecordPtr regFragPtr;
- OperationrecPtr regOperPtr;
- TablerecPtr regTabPtr;
- TupCommitReq * const tupCommitReq = (TupCommitReq *)signal->getDataPtr();
- ljamEntry();
- regOperPtr.i = tupCommitReq->opPtr;
- ptrCheckGuard(regOperPtr, cnoOfOprec, operationrec);
- ndbrequire(regOperPtr.p->transstate == STARTED);
- regOperPtr.p->gci = tupCommitReq->gci;
- regOperPtr.p->hashValue = tupCommitReq->hashValue;
- regFragPtr.i = regOperPtr.p->fragmentPtr;
- ptrCheckGuard(regFragPtr, cnoOfFragrec, fragrecord);
- regTabPtr.i = regOperPtr.p->tableRef;
- ptrCheckGuard(regTabPtr, cnoOfTablerec, tablerec);
- if (!regTabPtr.p->tuxCustomTriggers.isEmpty()) {
- ljam();
- executeTuxCommitTriggers(signal,
- regOperPtr.p,
- regTabPtr.p);
- }
- if (regOperPtr.p->tupleState == NO_OTHER_OP) {
- if ((regOperPtr.p->prevActiveOp == RNIL) &&
- (regOperPtr.p->nextActiveOp == RNIL)) {
- ljam();
- /* ---------------------------------------------------------- */
- // We handle the simple case separately as an optimisation
- /* ---------------------------------------------------------- */
- commitSimple(signal,
- regOperPtr.p,
- regFragPtr.p,
- regTabPtr.p);
- } else {
- /* ---------------------------------------------------------- */
- // This is the first commit message of this record in this
- // transaction. We will commit this record completely for this
- // transaction. If there are other operations they will be
- // responsible to release their own resources. Also commit of
- // a delete is postponed until the last operation is committed
- // on the tuple.
- //
- // As part of this commitRecord we will also handle detached
- // triggers and release of resources for this operation.
- /* ---------------------------------------------------------- */
- ljam();
- commitRecord(signal,
- regOperPtr.p,
- regFragPtr.p,
- regTabPtr.p);
- removeActiveOpList(regOperPtr.p);
- }//if
- } else {
- ljam();
- /* ---------------------------------------------------------- */
- // Release any copy tuples
- /* ---------------------------------------------------------- */
- ndbrequire(regOperPtr.p->tupleState == TO_BE_COMMITTED);
- commitUpdate(signal, regOperPtr.p, regFragPtr.p, regTabPtr.p);
- removeActiveOpList(regOperPtr.p);
- }//if
- initOpConnection(regOperPtr.p, regFragPtr.p);
- }//execTUP_COMMITREQ()
- void
- Dbtup::updateGcpId(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr)
- {
- PagePtr pagePtr;
- ljam();
- //--------------------------------------------------------------------
- // Is this code safe for UNDO logging. Not sure currently. RONM
- //--------------------------------------------------------------------
- pagePtr.i = regOperPtr->realPageId;
- ptrCheckGuard(pagePtr, cnoOfPage, page);
- Uint32 temp = regOperPtr->pageOffset + regTabPtr->tupGCPIndex;
- ndbrequire((temp < ZWORDS_ON_PAGE) &&
- (regTabPtr->tupGCPIndex < regTabPtr->tupheadsize));
- if (isUndoLoggingNeeded(regFragPtr, regOperPtr->fragPageId)) {
- Uint32 prevGCI = pagePtr.p->pageWord[temp];
- ljam();
- cprAddUndoLogRecord(signal,
- ZLCPR_TYPE_UPDATE_GCI,
- regOperPtr->fragPageId,
- regOperPtr->pageIndex,
- regOperPtr->tableRef,
- regOperPtr->fragId,
- regFragPtr->checkpointVersion);
- cprAddGCIUpdate(signal,
- prevGCI,
- regFragPtr);
- }//if
- pagePtr.p->pageWord[temp] = regOperPtr->gci;
- if (regTabPtr->checksumIndicator) {
- ljam();
- setChecksum(pagePtr.p, regOperPtr->pageOffset, regTabPtr->tupheadsize);
- }//if
- }//Dbtup::updateGcpId()
- void
- Dbtup::commitRecord(Signal* signal,
- Operationrec* const regOperPtr,
- Fragrecord* const regFragPtr,
- Tablerec* const regTabPtr)
- {
- Uint32 opType;
- OperationrecPtr firstOpPtr;
- PagePtr pagePtr;
- pagePtr.i = regOperPtr->realPageId;
- ptrCheckGuard(pagePtr, cnoOfPage, page);
- setTupleStatesSetOpType(regOperPtr, pagePtr.p, opType, firstOpPtr);
- fragptr.p = regFragPtr;
- tabptr.p = regTabPtr;
- if (opType == ZINSERT_DELETE) {
- ljam();
- //--------------------------------------------------------------------
- // We started by inserting the tuple and ended by deleting. Seen from
- // transactions point of view no changes were made.
- //--------------------------------------------------------------------
- commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
- return;
- } else if (opType == ZINSERT) {
- ljam();
- //--------------------------------------------------------------------
- // We started by inserting whereafter we made several changes to the
- // tuple that could include updates, deletes and new inserts. The final
- // state of the tuple is the original tuple. This is reached from this
- // operation. We change the optype on this operation to ZINSERT to
- // ensure proper operation of the detached trigger.
- // We restore the optype after executing triggers although not really
- // needed.
- //--------------------------------------------------------------------
- Uint32 saveOpType = regOperPtr->optype;
- regOperPtr->optype = ZINSERT;
- operPtr.p = regOperPtr;
- checkDetachedTriggers(signal,
- regOperPtr,
- regTabPtr);
- regOperPtr->optype = saveOpType;
- } else if (opType == ZUPDATE) {
- ljam();
- //--------------------------------------------------------------------
- // We want to use the first operation which contains a copy tuple
- // reference. This operation contains the before value of this record
- // for this transaction. Then this operation is used for executing
- // triggers with optype set to update.
- //--------------------------------------------------------------------
- OperationrecPtr befOpPtr;
- findBeforeValueOperation(befOpPtr, firstOpPtr);
- Uint32 saveOpType = befOpPtr.p->optype;
- Bitmask<MAXNROFATTRIBUTESINWORDS> attributeMask;
- Bitmask<MAXNROFATTRIBUTESINWORDS> saveAttributeMask;
- calculateChangeMask(pagePtr.p,
- regTabPtr,
- befOpPtr.p->pageOffset,
- attributeMask);
- saveAttributeMask.clear();
- saveAttributeMask.bitOR(befOpPtr.p->changeMask);
- befOpPtr.p->changeMask.clear();
- befOpPtr.p->changeMask.bitOR(attributeMask);
- befOpPtr.p->gci = regOperPtr->gci;
-
- operPtr.p = befOpPtr.p;
- checkDetachedTriggers(signal,
- befOpPtr.p,
- regTabPtr);
- befOpPtr.p->changeMask.clear();
- befOpPtr.p->changeMask.bitOR(saveAttributeMask);
- befOpPtr.p->optype = saveOpType;
- } else if (opType == ZDELETE) {
- ljam();
- //--------------------------------------------------------------------
- // We want to use the first operation which contains a copy tuple.
- // We benefit from the fact that we know that it cannot be a simple
- // delete and it cannot be an insert followed by a delete. Thus there
- // must either be an update or a insert following a delete. In both
- // cases we will find a before value in a copy tuple.
- //
- // An added complexity is that the trigger handling assumes that the
- // before value is located in the original tuple so we have to move the
- // copy tuple reference to the original tuple reference and afterwards
- // restore it again.
- //--------------------------------------------------------------------
- OperationrecPtr befOpPtr;
- findBeforeValueOperation(befOpPtr, firstOpPtr);
- Uint32 saveOpType = befOpPtr.p->optype;
- Uint32 realPageId = befOpPtr.p->realPageId;
- Uint32 pageOffset = befOpPtr.p->pageOffset;
- Uint32 fragPageId = befOpPtr.p->fragPageId;
- Uint32 pageIndex = befOpPtr.p->pageIndex;
- befOpPtr.p->realPageId = befOpPtr.p->realPageIdC;
- befOpPtr.p->pageOffset = befOpPtr.p->pageOffsetC;
- befOpPtr.p->fragPageId = befOpPtr.p->fragPageIdC;
- befOpPtr.p->pageIndex = befOpPtr.p->pageIndexC;
- befOpPtr.p->gci = regOperPtr->gci;
- operPtr.p = befOpPtr.p;
- checkDetachedTriggers(signal,
- befOpPtr.p,
- regTabPtr);
- befOpPtr.p->realPageId = realPageId;
- befOpPtr.p->pageOffset = pageOffset;
- befOpPtr.p->fragPageId = fragPageId;
- befOpPtr.p->pageIndex = pageIndex;
- befOpPtr.p->optype = saveOpType;
- } else {
- ndbrequire(false);
- }//if
- commitUpdate(signal, regOperPtr, regFragPtr, regTabPtr);
- if (regTabPtr->GCPIndicator) {
- updateGcpId(signal, regOperPtr, regFragPtr, regTabPtr);
- }//if
- }//Dbtup::commitRecord()
- void
- Dbtup::setTupleStatesSetOpType(Operationrec* const regOperPtr,
- Page* const pagePtr,
- Uint32& opType,
- OperationrecPtr& firstOpPtr)
- {
- OperationrecPtr loopOpPtr;
- OperationrecPtr lastOpPtr;
- ndbrequire(regOperPtr->pageOffset < ZWORDS_ON_PAGE);
- loopOpPtr.i = pagePtr->pageWord[regOperPtr->pageOffset];
- ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
- lastOpPtr = loopOpPtr;
- if (loopOpPtr.p->optype == ZDELETE) {
- ljam();
- opType = ZDELETE;
- } else {
- ljam();
- opType = ZUPDATE;
- }//if
- do {
- ljam();
- ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
- firstOpPtr = loopOpPtr;
- loopOpPtr.p->tupleState = TO_BE_COMMITTED;
- loopOpPtr.i = loopOpPtr.p->nextActiveOp;
- } while (loopOpPtr.i != RNIL);
- if (opType == ZDELETE) {
- ljam();
- if (firstOpPtr.p->optype == ZINSERT) {
- ljam();
- opType = ZINSERT_DELETE;
- }//if
- } else {
- ljam();
- if (firstOpPtr.p->optype == ZINSERT) {
- ljam();
- opType = ZINSERT;
- }//if
- }///if
- }//Dbtup::setTupleStatesSetOpType()
- void Dbtup::findBeforeValueOperation(OperationrecPtr& befOpPtr,
- OperationrecPtr firstOpPtr)
- {
- befOpPtr = firstOpPtr;
- if (befOpPtr.p->realPageIdC != RNIL) {
- ljam();
- return;
- } else {
- ljam();
- befOpPtr.i = befOpPtr.p->prevActiveOp;
- ptrCheckGuard(befOpPtr, cnoOfOprec, operationrec);
- ndbrequire(befOpPtr.p->realPageIdC != RNIL);
- }//if
- }//Dbtup::findBeforeValueOperation()
- void
- Dbtup::calculateChangeMask(Page* const pagePtr,
- Tablerec* const regTabPtr,
- Uint32 pageOffset,
- Bitmask<MAXNROFATTRIBUTESINWORDS>& attributeMask)
- {
- OperationrecPtr loopOpPtr;
- attributeMask.clear();
- ndbrequire(pageOffset < ZWORDS_ON_PAGE);
- loopOpPtr.i = pagePtr->pageWord[pageOffset];
- do {
- ptrCheckGuard(loopOpPtr, cnoOfOprec, operationrec);
- if (loopOpPtr.p->optype == ZUPDATE) {
- ljam();
- attributeMask.bitOR(loopOpPtr.p->changeMask);
- } else if (loopOpPtr.p->optype == ZINSERT) {
- ljam();
- attributeMask.set();
- return;
- } else {
- ndbrequire(loopOpPtr.p->optype == ZDELETE);
- }//if
- loopOpPtr.i = loopOpPtr.p->nextActiveOp;
- } while (loopOpPtr.i != RNIL);
- }//Dbtup::calculateChangeMask()