DbaccMain.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:575k
- arrGuard(trlpTmp2, 256);
- rlpOverflowDirptr.i = rlpOverflowrangeptr.p->dirArray[trlpTmp2];
- ptrCheckGuard(rlpOverflowDirptr, cdirarraysize, directoryarray);
- rlpOverflowDirptr.p->pagep[trlpTmp3] = RNIL;
- if (cundoLogActive != ZTRUE) {
- // Remove from page array.
- trfpArrayPos = rlopPageptr.p->word32[ZPOS_ARRAY_POS];
- rfpPageptr = rlopPageptr;
- removeFromPageArrayList(signal);
- }
- // Reset page header
- iloPageptr = rlopPageptr;
- tiloIndex = rlopPageptr.p->word32[ZPOS_PAGE_ID];
- initLongOverpage(signal);
- rpPageptr = rlopPageptr;
- releasePage(signal);
- }//Dbacc::releaseLongPage()
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* */
- /* END OF DELETE MODULE */
- /* */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* */
- /* COMMIT AND ABORT MODULE */
- /* */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* ------------------------------------------------------------------------- */
- /* ABORT_OPERATION */
- /*DESCRIPTION: AN OPERATION RECORD CAN BE IN A LOCK QUEUE OF AN ELEMENT OR */
- /*OWNS THE LOCK. BY THIS SUBROUTINE THE LOCK STATE OF THE OPERATION WILL */
- /*BE CHECKED. THE OPERATION RECORD WILL BE REMOVED FROM THE QUEUE IF IT */
- /*BELONGED TO ANY ONE, OTHERWISE THE ELEMENT HEAD WILL BE UPDATED. */
- /* ------------------------------------------------------------------------- */
- void Dbacc::abortOperation(Signal* signal)
- {
- OperationrecPtr aboOperRecPtr;
- OperationrecPtr TaboOperRecPtr;
- Page8Ptr aboPageidptr;
- Uint32 taboElementptr;
- Uint32 tmp2Olq;
- if (operationRecPtr.p->lockOwner == ZTRUE) {
- takeOutLockOwnersList(signal, operationRecPtr);
- if (operationRecPtr.p->insertIsDone == ZTRUE) {
- jam();
- operationRecPtr.p->elementIsDisappeared = ZTRUE;
- }//if
- if ((operationRecPtr.p->nextParallelQue != RNIL) ||
- (operationRecPtr.p->nextSerialQue != RNIL)) {
- jam();
- releaselock(signal);
- } else {
- /* --------------------------------------------------------------------------------- */
- /* WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. IF INSERT OR STANDBY */
- /* WE DELETE THE ELEMENT OTHERWISE WE REMOVE THE LOCK FROM THE ELEMENT. */
- /* --------------------------------------------------------------------------------- */
- if (operationRecPtr.p->elementIsDisappeared == ZFALSE) {
- jam();
- taboElementptr = operationRecPtr.p->elementPointer;
- aboPageidptr.i = operationRecPtr.p->elementPage;
- tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
- operationRecPtr.p->scanBits);
- ptrCheckGuard(aboPageidptr, cpagesize, page8);
- dbgWord32(aboPageidptr, taboElementptr, tmp2Olq);
- arrGuard(taboElementptr, 2048);
- aboPageidptr.p->word32[taboElementptr] = tmp2Olq;
- return;
- } else {
- jam();
- commitdelete(signal, false);
- }//if
- }//if
- } else {
- /* --------------------------------------------------------------- */
- // We are not the lock owner.
- /* --------------------------------------------------------------- */
- jam();
- takeOutFragWaitQue(signal);
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- jam();
- /* ---------------------------------------------------------------------------------- */
- /* SINCE WE ARE NOT QUEUE LEADER WE NEED NOT CONSIDER IF THE ELEMENT IS TO BE DELETED.*/
- /* We will simply remove it from the parallel list without any other rearrangements. */
- /* ---------------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
- }//if
- } else if (operationRecPtr.p->prevSerialQue != RNIL) {
- /* ------------------------------------------------------------------------- */
- // We are not in the parallel queue owning the lock. Thus we are in another parallel
- // queue longer down in the serial queue. We are however first since prevParallelQue
- // == RNIL.
- /* ------------------------------------------------------------------------- */
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- /* ------------------------------------------------------------------------- */
- // We have an operation in the queue after us. We simply rearrange this parallel queue.
- // The new leader of this parallel queue will be operation in the serial queue.
- /* ------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
- aboOperRecPtr.p->prevParallelQue = RNIL; // Queue Leader
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- TaboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
- TaboOperRecPtr.p->prevSerialQue = aboOperRecPtr.i;
- }//if
- TaboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
- ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
- TaboOperRecPtr.p->nextSerialQue = aboOperRecPtr.i;
- } else {
- jam();
- /* ------------------------------------------------------------------------- */
- // We are the only operation in this parallel queue. We will thus shrink the serial
- // queue.
- /* ------------------------------------------------------------------------- */
- aboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- aboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
- aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
- }//if
- }//if
- }//if
- }//if
- /* ------------------------------------------------------------------------- */
- // If prevParallelQue = RNIL and prevSerialQue = RNIL and we are not owner of the
- // lock then we cannot be in any lock queue at all.
- /* ------------------------------------------------------------------------- */
- }//Dbacc::abortOperation()
- void Dbacc::commitDeleteCheck()
- {
- OperationrecPtr opPtr;
- OperationrecPtr lastOpPtr;
- OperationrecPtr deleteOpPtr;
- bool elementDeleted = false;
- bool deleteCheckOngoing = true;
- Uint32 hashValue = 0;
- lastOpPtr = operationRecPtr;
- opPtr.i = operationRecPtr.p->nextParallelQue;
- while (opPtr.i != RNIL) {
- jam();
- ptrCheckGuard(opPtr, coprecsize, operationrec);
- lastOpPtr = opPtr;
- opPtr.i = opPtr.p->nextParallelQue;
- }//while
- deleteOpPtr = lastOpPtr;
- do {
- if (deleteOpPtr.p->operation == ZDELETE) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO A */
- /* SCAN-TAKEOVER THE ACTUAL DELETE WILL BE PERFORMED BY THE PREVIOUS OPERATION (SCAN)*/
- /* IN THE PARALLEL QUEUE WHICH OWNS THE LOCK.THE PROBLEM IS THAT THE SCAN OPERATION */
- /* DOES NOT HAVE A HASH VALUE ASSIGNED TO IT SO WE COPY IT FROM THIS OPERATION. */
- /* */
- /* WE ASSUME THAT THIS SOLUTION WILL WORK BECAUSE THE ONLY WAY A SCAN CAN PERFORM */
- /* A DELETE IS BY BEING FOLLOWED BY A NORMAL DELETE-OPERATION THAT HAS A HASH VALUE. */
- /* --------------------------------------------------------------------------------- */
- hashValue = deleteOpPtr.p->hashValue;
- elementDeleted = true;
- deleteCheckOngoing = false;
- } else if ((deleteOpPtr.p->operation == ZREAD) ||
- (deleteOpPtr.p->operation == ZSCAN_OP)) {
- /* --------------------------------------------------------------------------------- */
- /* We are trying to find out whether the commit will in the end delete the tuple. */
- /* Normally the delete will be the last operation in the list of operations on this */
- /* It is however possible to issue reads and scans in the same savepoint as the */
- /* delete operation was issued and these can end up after the delete in the list of */
- /* operations in the parallel queue. Thus if we discover a read or a scan we have to */
- /* continue scanning the list looking for a delete operation. */
- /* --------------------------------------------------------------------------------- */
- deleteOpPtr.i = deleteOpPtr.p->prevParallelQue;
- if (deleteOpPtr.i == RNIL) {
- jam();
- deleteCheckOngoing = false;
- } else {
- jam();
- ptrCheckGuard(deleteOpPtr, coprecsize, operationrec);
- }//if
- } else {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* Finding an UPDATE or INSERT before finding a DELETE means we cannot be deleting */
- /* as the end result of this transaction. */
- /* --------------------------------------------------------------------------------- */
- deleteCheckOngoing = false;
- }//if
- } while (deleteCheckOngoing);
- opPtr = lastOpPtr;
- do {
- jam();
- opPtr.p->commitDeleteCheckFlag = ZTRUE;
- if (elementDeleted) {
- jam();
- opPtr.p->elementIsDisappeared = ZTRUE;
- opPtr.p->hashValue = hashValue;
- }//if
- opPtr.i = opPtr.p->prevParallelQue;
- if (opPtr.i == RNIL) {
- jam();
- break;
- }//if
- ptrCheckGuard(opPtr, coprecsize, operationrec);
- } while (true);
- }//Dbacc::commitDeleteCheck()
- /* ------------------------------------------------------------------------- */
- /* COMMIT_OPERATION */
- /* INPUT: OPERATION_REC_PTR, POINTER TO AN OPERATION RECORD */
- /* DESCRIPTION: THE OPERATION RECORD WILL BE TAKE OUT OF ANY LOCK QUEUE. */
- /* IF IT OWNS THE ELEMENT LOCK. HEAD OF THE ELEMENT WILL BE UPDATED. */
- /* ------------------------------------------------------------------------- */
- void Dbacc::commitOperation(Signal* signal)
- {
- OperationrecPtr tolqTmpPtr;
- Page8Ptr coPageidptr;
- Uint32 tcoElementptr;
- Uint32 tmp2Olq;
- if ((operationRecPtr.p->commitDeleteCheckFlag == ZFALSE) &&
- (operationRecPtr.p->operation != ZSCAN_OP) &&
- (operationRecPtr.p->operation != ZREAD)) {
- jam();
- /* This method is used to check whether the end result of the transaction
- will be to delete the tuple. In this case all operation will be marked
- with elementIsDisappeared = true to ensure that the last operation
- committed will remove the tuple. We only run this once per transaction
- (commitDeleteCheckFlag = true if performed earlier) and we don't
- execute this code when committing a scan operation since committing
- a scan operation only means that the scan is continuing and the scan
- lock is released.
- */
- commitDeleteCheck();
- }//if
- if (operationRecPtr.p->lockOwner == ZTRUE) {
- takeOutLockOwnersList(signal, operationRecPtr);
- if ((operationRecPtr.p->nextParallelQue == RNIL) &&
- (operationRecPtr.p->nextSerialQue == RNIL) &&
- (operationRecPtr.p->elementIsDisappeared == ZFALSE)) {
- /*
- This is the normal path through the commit for operations owning the
- lock without any queues and not a delete operation.
- */
- coPageidptr.i = operationRecPtr.p->elementPage;
- tcoElementptr = operationRecPtr.p->elementPointer;
- tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
- operationRecPtr.p->scanBits);
- ptrCheckGuard(coPageidptr, cpagesize, page8);
- dbgWord32(coPageidptr, tcoElementptr, tmp2Olq);
- arrGuard(tcoElementptr, 2048);
- coPageidptr.p->word32[tcoElementptr] = tmp2Olq;
- return;
- } else if ((operationRecPtr.p->nextParallelQue != RNIL) ||
- (operationRecPtr.p->nextSerialQue != RNIL)) {
- jam();
- /*
- The case when there is a queue lined up.
- Release the lock and pass it to the next operation lined up.
- */
- releaselock(signal);
- return;
- } else {
- jam();
- /*
- No queue and elementIsDisappeared is true. We perform the actual delete
- operation.
- */
- commitdelete(signal, false);
- return;
- }//if
- } else {
- /*
- THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE
- ELEMENT.
- */
- ndbrequire(operationRecPtr.p->prevParallelQue != RNIL);
- jam();
- tolqTmpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
- tolqTmpPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- tolqTmpPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
- tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
- }//if
- /**
- * Check possible lock upgrade
- * 1) Find lock owner
- * 2) Count transactions in parallel que
- * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner)
- * upgrade next serial
- */
- if(operationRecPtr.p->lockMode)
- {
- jam();
- /**
- * Committing a non shared operation can't lead to lock upgrade
- */
- return;
- }
-
- OperationrecPtr lock_owner;
- lock_owner.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(lock_owner, coprecsize, operationrec);
- Uint32 transid[2] = { lock_owner.p->transId1,
- lock_owner.p->transId2 };
-
-
- while(lock_owner.p->prevParallelQue != RNIL)
- {
- lock_owner.i = lock_owner.p->prevParallelQue;
- ptrCheckGuard(lock_owner, coprecsize, operationrec);
-
- if(lock_owner.p->transId1 != transid[0] ||
- lock_owner.p->transId2 != transid[1])
- {
- jam();
- /**
- * If more than 1 trans in lock queue -> no lock upgrade
- */
- return;
- }
- }
-
- check_lock_upgrade(signal, lock_owner, operationRecPtr);
- }
- }//Dbacc::commitOperation()
- void
- Dbacc::check_lock_upgrade(Signal* signal,
- OperationrecPtr lock_owner,
- OperationrecPtr release_op)
- {
- if((lock_owner.p->transId1 == release_op.p->transId1 &&
- lock_owner.p->transId2 == release_op.p->transId2) ||
- release_op.p->lockMode ||
- lock_owner.p->nextSerialQue == RNIL)
- {
- jam();
- /**
- * No lock upgrade if same trans or lock owner has no serial queue
- * or releasing non shared op
- */
- return;
- }
- OperationrecPtr next;
- next.i = lock_owner.p->nextSerialQue;
- ptrCheckGuard(next, coprecsize, operationrec);
-
- if(lock_owner.p->transId1 != next.p->transId1 ||
- lock_owner.p->transId2 != next.p->transId2)
- {
- jam();
- /**
- * No lock upgrad if !same trans in serial queue
- */
- return;
- }
-
- if (getNoParallelTransaction(lock_owner.p) > 1)
- {
- jam();
- /**
- * No lock upgrade if more than 1 transaction in parallell queue
- */
- return;
- }
- if (getNoParallelTransaction(next.p) > 1)
- {
- jam();
- /**
- * No lock upgrade if more than 1 transaction in next's parallell queue
- */
- return;
- }
-
- OperationrecPtr tmp;
- tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
- if(tmp.i != RNIL)
- {
- ptrCheckGuard(tmp, coprecsize, operationrec);
- ndbassert(tmp.p->prevSerialQue == next.i);
- tmp.p->prevSerialQue = lock_owner.i;
- }
- next.p->nextSerialQue = next.p->prevSerialQue = RNIL;
-
- // Find end of parallell que
- tmp = lock_owner;
- Uint32 lockMode = next.p->lockMode > lock_owner.p->lockMode ?
- next.p->lockMode : lock_owner.p->lockMode;
- while(tmp.p->nextParallelQue != RNIL)
- {
- jam();
- tmp.i = tmp.p->nextParallelQue;
- tmp.p->lockMode = lockMode;
- ptrCheckGuard(tmp, coprecsize, operationrec);
- }
- tmp.p->lockMode = lockMode;
-
- next.p->prevParallelQue = tmp.i;
- tmp.p->nextParallelQue = next.i;
-
- OperationrecPtr save = operationRecPtr;
- Uint32 localdata[2];
- localdata[0] = lock_owner.p->localdata[0];
- localdata[1] = lock_owner.p->localdata[1];
- do {
- next.p->localdata[0] = localdata[0];
- next.p->localdata[1] = localdata[1];
- next.p->lockMode = lockMode;
-
- operationRecPtr = next;
- executeNextOperation(signal);
- if (next.p->nextParallelQue != RNIL)
- {
- jam();
- next.i = next.p->nextParallelQue;
- ptrCheckGuard(next, coprecsize, operationrec);
- } else {
- jam();
- break;
- }//if
- } while (1);
-
- operationRecPtr = save;
-
- }
- /* ------------------------------------------------------------------------- */
- /* RELEASELOCK */
- /* RESETS LOCK OF AN ELEMENT. */
- /* INFORMATION ABOUT THE ELEMENT IS SAVED IN THE OPERATION RECORD */
- /* THESE INFORMATION IS USED TO UPDATE HEADER OF THE ELEMENT */
- /* ------------------------------------------------------------------------- */
- void Dbacc::releaselock(Signal* signal)
- {
- OperationrecPtr rloOperPtr;
- OperationrecPtr trlOperPtr;
- OperationrecPtr trlTmpOperPtr;
- Uint32 TelementIsDisappeared;
- trlOperPtr.i = RNIL;
- if (operationRecPtr.p->nextParallelQue != RNIL) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* NEXT OPERATION TAKES OVER THE LOCK. We will simply move the info from the leader */
- // to the new queue leader.
- /* --------------------------------------------------------------------------------- */
- trlOperPtr.i = operationRecPtr.p->nextParallelQue;
- ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
- copyInOperPtr = trlOperPtr;
- copyOperPtr = operationRecPtr;
- copyOpInfo(signal);
- trlOperPtr.p->prevParallelQue = RNIL;
- if (operationRecPtr.p->nextSerialQue != RNIL) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE NEW LOCK OWNER. */
- /* --------------------------------------------------------------------------------- */
- trlOperPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
- trlTmpOperPtr.i = trlOperPtr.p->nextSerialQue;
- ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
- trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i;
- }//if
- check_lock_upgrade(signal, copyInOperPtr, operationRecPtr);
- /* --------------------------------------------------------------------------------- */
- /* SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT */
- /* STARTING QUEUED OPERATIONS. THUS WE CAN END HERE. */
- /* --------------------------------------------------------------------------------- */
- } else {
- ndbrequire(operationRecPtr.p->nextSerialQue != RNIL);
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY. WE NEED TO */
- /* REARRANGE LISTS AND START A NUMBER OF OPERATIONS. */
- /* --------------------------------------------------------------------------------- */
- trlOperPtr.i = operationRecPtr.p->nextSerialQue;
- ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
- copyOperPtr = operationRecPtr;
- copyInOperPtr = trlOperPtr;
- copyOpInfo(signal);
- trlOperPtr.p->prevSerialQue = RNIL;
- ndbrequire(trlOperPtr.p->prevParallelQue == RNIL);
- /* --------------------------------------------------------------------------------- */
- /* WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE */
- /* OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK. */
- /* --------------------------------------------------------------------------------- */
- rloOperPtr = operationRecPtr;
- trlTmpOperPtr = trlOperPtr;
- TelementIsDisappeared = trlOperPtr.p->elementIsDisappeared;
- Uint32 ThashValue = trlOperPtr.p->hashValue;
- do {
- /* --------------------------------------------------------------------------------- */
- // Ensure that all operations in the queue are assigned with the elementIsDisappeared
- // to ensure that the element is removed after a previous delete. An insert does
- // however revert this decision since the element is put back again. Local checkpoints
- // complicate life here since they do not execute the next operation but simply change
- // the state on the operation. We need to set-up the variable elementIsDisappeared
- // properly even when local checkpoints and inserts/writes after deletes occur.
- /* --------------------------------------------------------------------------------- */
- trlTmpOperPtr.p->elementIsDisappeared = TelementIsDisappeared;
- if (TelementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------------------- */
- // If the elementIsDisappeared is set then we know that the hashValue is also set
- // since it always originates from a committing abort or a aborting insert. Scans
- // do not initialise the hashValue and must have this value initialised if they are
- // to successfully commit the delete.
- /* --------------------------------------------------------------------------------- */
- jam();
- trlTmpOperPtr.p->hashValue = ThashValue;
- }//if
- trlTmpOperPtr.p->localdata[0] = trlOperPtr.p->localdata[0];
- trlTmpOperPtr.p->localdata[1] = trlOperPtr.p->localdata[1];
- /* --------------------------------------------------------------------------------- */
- // Restart the queued operation.
- /* --------------------------------------------------------------------------------- */
- operationRecPtr = trlTmpOperPtr;
- TelementIsDisappeared = executeNextOperation(signal);
- ThashValue = operationRecPtr.p->hashValue;
- if (trlTmpOperPtr.p->nextParallelQue != RNIL) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // We will continue with the next operation in the parallel queue and start this as
- // well.
- /* --------------------------------------------------------------------------------- */
- trlTmpOperPtr.i = trlTmpOperPtr.p->nextParallelQue;
- ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
- } else {
- jam();
- break;
- }//if
- } while (1);
- operationRecPtr = rloOperPtr;
- }//if
- // Insert the next op into the lock owner list
- insertLockOwnersList(signal, trlOperPtr);
- return;
- }//Dbacc::releaselock()
- /* --------------------------------------------------------------------------------- */
- /* COPY_OP_INFO */
- /* INPUT: COPY_IN_OPER_PTR AND COPY_OPER_PTR. */
- /* DESCRIPTION:INFORMATION ABOUT THE ELEMENT WILL BE MOVED FROM OPERATION */
- /* REC TO QUEUE OP REC. QUE OP REC TAKES OVER THE LOCK. */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::copyOpInfo(Signal* signal)
- {
- Page8Ptr coiPageidptr;
- copyInOperPtr.p->elementPage = copyOperPtr.p->elementPage;
- copyInOperPtr.p->elementIsforward = copyOperPtr.p->elementIsforward;
- copyInOperPtr.p->elementContainer = copyOperPtr.p->elementContainer;
- copyInOperPtr.p->elementPointer = copyOperPtr.p->elementPointer;
- copyInOperPtr.p->scanBits = copyOperPtr.p->scanBits;
- copyInOperPtr.p->hashvaluePart = copyOperPtr.p->hashvaluePart;
- copyInOperPtr.p->elementIsDisappeared = copyOperPtr.p->elementIsDisappeared;
- if (copyInOperPtr.p->elementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------------------- */
- // If the elementIsDisappeared is set then we know that the hashValue is also set
- // since it always originates from a committing abort or a aborting insert. Scans
- // do not initialise the hashValue and must have this value initialised if they are
- // to successfully commit the delete.
- /* --------------------------------------------------------------------------------- */
- jam();
- copyInOperPtr.p->hashValue = copyOperPtr.p->hashValue;
- }//if
- coiPageidptr.i = copyOperPtr.p->elementPage;
- ptrCheckGuard(coiPageidptr, cpagesize, page8);
- const Uint32 tmp = ElementHeader::setLocked(copyInOperPtr.i);
- dbgWord32(coiPageidptr, copyOperPtr.p->elementPointer, tmp);
- arrGuard(copyOperPtr.p->elementPointer, 2048);
- coiPageidptr.p->word32[copyOperPtr.p->elementPointer] = tmp;
- copyInOperPtr.p->localdata[0] = copyOperPtr.p->localdata[0];
- copyInOperPtr.p->localdata[1] = copyOperPtr.p->localdata[1];
- }//Dbacc::copyOpInfo()
- /* ******************--------------------------------------------------------------- */
- /* EXECUTE NEXT OPERATION */
- /* NEXT OPERATION IN A LOCK QUEUE WILL BE EXECUTED. */
- /* --------------------------------------------------------------------------------- */
- Uint32 Dbacc::executeNextOperation(Signal* signal)
- {
- ndbrequire(operationRecPtr.p->transactionstate == ACTIVE);
- if (fragrecptr.p->stopQueOp == ZTRUE) {
- Uint32 TelemDisappeared;
- jam();
- TelemDisappeared = operationRecPtr.p->elementIsDisappeared;
- if ((operationRecPtr.p->elementIsDisappeared == ZTRUE) &&
- (operationRecPtr.p->prevParallelQue == RNIL) &&
- ((operationRecPtr.p->operation == ZINSERT) ||
- (operationRecPtr.p->operation == ZWRITE))) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // In this case we do not wish to change the elementIsDisappeared since that would
- // create an error the next time this method is called for this operation after local
- // checkpoint starts up operations again. We must however ensure that operations
- // that follow in the queue do not get the value ZTRUE when actually an INSERT/WRITE
- // precedes them (only if the INSERT/WRITE is the first operation).
- /* --------------------------------------------------------------------------------- */
- TelemDisappeared = ZFALSE;
- }//if
- /* --------------------------------------------------------------------------------- */
- /* A LOCAL CHECKPOINT HAS STOPPED OPERATIONS. WE MUST NOT START THE OPERATION */
- /* AT THIS TIME. WE SET THE STATE TO INDICATE THAT WE ARE READY TO START AS */
- /* SOON AS WE ARE ALLOWED. */
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->opState = WAIT_EXE_OP;
- return TelemDisappeared;
- }//if
- takeOutFragWaitQue(signal);
- if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
- /* --------------------------------------------------------------------------------- */
- /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS ALREADY DELETED. */
- /* --------------------------------------------------------------------------------- */
- if (((operationRecPtr.p->operation != ZINSERT) &&
- (operationRecPtr.p->operation != ZWRITE)) ||
- (operationRecPtr.p->prevParallelQue != RNIL)) {
- if (operationRecPtr.p->operation != ZSCAN_OP ||
- operationRecPtr.p->isAccLockReq) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // Updates and reads with a previous delete simply aborts with read error indicating
- // that tuple did not exist. Also inserts and writes not being the first operation.
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
- signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = ZREAD_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, 2, JBB);
- return operationRecPtr.p->elementIsDisappeared;
- } else {
- /* --------------------------------------------------------------------------------- */
- /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A SCAN => SPECIAL TREATMENT. */
- /* IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION FROM THE SCAN */
- /* LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED THEN WE SHOULD RESTART THE */
- /* SCAN PROCESS. OTHERWISE WE SIMPLY RELEASE THE OPERATION AND DECREASE THE */
- /* NUMBER OF LOCKS HELD. */
- /* --------------------------------------------------------------------------------- */
- takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
- putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
- return operationRecPtr.p->elementIsDisappeared;
- }//if
- }//if
- /* --------------------------------------------------------------------------------- */
- // Insert and writes can continue but need to be converted to inserts.
- /* --------------------------------------------------------------------------------- */
- jam();
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
- operationRecPtr.p->operation = ZINSERT;
- operationRecPtr.p->insertIsDone = ZTRUE;
- } else if (operationRecPtr.p->operation == ZINSERT) {
- bool abortFlag = true;
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- OperationrecPtr prevOpPtr;
- jam();
- prevOpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
- if (prevOpPtr.p->operation == ZDELETE) {
- jam();
- abortFlag = false;
- }//if
- }//if
- if (abortFlag) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. THIS IS CLEARLY */
- /* NOT A GOOD IDEA. */
- /* --------------------------------------------------------------------------------- */
- operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
- signal->theData[0] = operationRecPtr.p->userptr;
- signal->theData[1] = ZWRITE_ERROR;
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, 2, JBB);
- return operationRecPtr.p->elementIsDisappeared;
- }//if
- }
- else if(operationRecPtr.p->operation == ZWRITE)
- {
- jam();
- operationRecPtr.p->operation = ZINSERT;
- if (operationRecPtr.p->prevParallelQue != RNIL) {
- OperationrecPtr prevOpPtr;
- jam();
- prevOpPtr.i = operationRecPtr.p->prevParallelQue;
- ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
- if (prevOpPtr.p->operation != ZDELETE)
- {
- jam();
- operationRecPtr.p->operation = ZUPDATE;
- }
- }
- }
- if (operationRecPtr.p->operation == ZSCAN_OP &&
- ! operationRecPtr.p->isAccLockReq) {
- jam();
- takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
- putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
- } else {
- jam();
- sendAcckeyconf(signal);
- sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF, signal, 6, JBB);
- }//if
- return operationRecPtr.p->elementIsDisappeared;
- }//Dbacc::executeNextOperation()
- /* --------------------------------------------------------------------------------- */
- /* TAKE_OUT_FRAG_WAIT_QUE */
- /* DESCRIPTION: AN OPERATION WHICH OWNS A LOCK OF AN ELEMENT, IS IN A LIST */
- /* OF THE FRAGMENT. THIS LIST IS USED TO STOP THE QUEUE OPERATION */
- /* DURING CREATE CHECK POINT PROSESS FOR STOP AND RESTART OF THE */
- /* OPERATIONS. THIS SUBRUTIN TAKES A OPERATION RECORD OUT OF THE LIST */
- /* -------------------------------------------------------------------------------- */
- void Dbacc::takeOutFragWaitQue(Signal* signal)
- {
- OperationrecPtr tofwqOperRecPtr;
- if (operationRecPtr.p->opState == WAIT_IN_QUEUE) {
- if (fragrecptr.p->sentWaitInQueOp == operationRecPtr.i) {
- jam();
- fragrecptr.p->sentWaitInQueOp = operationRecPtr.p->nextQueOp;
- }//if
- if (operationRecPtr.p->prevQueOp != RNIL) {
- jam();
- tofwqOperRecPtr.i = operationRecPtr.p->prevQueOp;
- ptrCheckGuard(tofwqOperRecPtr, coprecsize, operationrec);
- tofwqOperRecPtr.p->nextQueOp = operationRecPtr.p->nextQueOp;
- } else {
- jam();
- fragrecptr.p->firstWaitInQueOp = operationRecPtr.p->nextQueOp;
- }//if
- if (operationRecPtr.p->nextQueOp != RNIL) {
- jam();
- tofwqOperRecPtr.i = operationRecPtr.p->nextQueOp;
- ptrCheckGuard(tofwqOperRecPtr, coprecsize, operationrec);
- tofwqOperRecPtr.p->prevQueOp = operationRecPtr.p->prevQueOp;
- } else {
- jam();
- fragrecptr.p->lastWaitInQueOp = operationRecPtr.p->prevQueOp;
- }//if
- operationRecPtr.p->opState = FREE_OP;
- return;
- } else {
- ndbrequire(operationRecPtr.p->opState == FREE_OP);
- }//if
- }//Dbacc::takeOutFragWaitQue()
- /**
- * takeOutLockOwnersList
- *
- * Description: Take out an operation from the doubly linked
- * lock owners list on the fragment.
- *
- */
- void Dbacc::takeOutLockOwnersList(Signal* signal,
- const OperationrecPtr& outOperPtr)
- {
- const Uint32 Tprev = outOperPtr.p->prevLockOwnerOp;
- const Uint32 Tnext = outOperPtr.p->nextLockOwnerOp;
- #ifdef VM_TRACE
- // Check that operation is already in the list
- OperationrecPtr tmpOperPtr;
- bool inList = false;
- tmpOperPtr.i = fragrecptr.p->lockOwnersList;
- while (tmpOperPtr.i != RNIL){
- ptrCheckGuard(tmpOperPtr, coprecsize, operationrec);
- if (tmpOperPtr.i == outOperPtr.i)
- inList = true;
- tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp;
- }
- ndbrequire(inList == true);
- #endif
- ndbrequire(outOperPtr.p->lockOwner == ZTRUE);
- outOperPtr.p->lockOwner = ZFALSE;
- // Fast path through the code for the common case.
- if ((Tprev == RNIL) && (Tnext == RNIL)) {
- ndbrequire(fragrecptr.p->lockOwnersList == outOperPtr.i);
- fragrecptr.p->lockOwnersList = RNIL;
- return;
- }
- // Check previous operation
- if (Tprev != RNIL) {
- jam();
- arrGuard(Tprev, coprecsize);
- operationrec[Tprev].nextLockOwnerOp = Tnext;
- } else {
- fragrecptr.p->lockOwnersList = Tnext;
- }//if
- // Check next operation
- if (Tnext == RNIL) {
- return;
- } else {
- jam();
- arrGuard(Tnext, coprecsize);
- operationrec[Tnext].prevLockOwnerOp = Tprev;
- }//if
- return;
- }//Dbacc::takeOutLockOwnersList()
- /**
- * insertLockOwnersList
- *
- * Description: Insert an operation first in the dubly linked lock owners
- * list on the fragment.
- *
- */
- void Dbacc::insertLockOwnersList(Signal* signal,
- const OperationrecPtr& insOperPtr)
- {
- OperationrecPtr tmpOperPtr;
-
- #ifdef VM_TRACE
- // Check that operation is not already in list
- tmpOperPtr.i = fragrecptr.p->lockOwnersList;
- while(tmpOperPtr.i != RNIL){
- ptrCheckGuard(tmpOperPtr, coprecsize, operationrec);
- ndbrequire(tmpOperPtr.i != insOperPtr.i);
- tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp;
- }
- #endif
- ndbrequire(insOperPtr.p->lockOwner == ZFALSE);
- insOperPtr.p->lockOwner = ZTRUE;
- insOperPtr.p->prevLockOwnerOp = RNIL;
- tmpOperPtr.i = fragrecptr.p->lockOwnersList;
- fragrecptr.p->lockOwnersList = insOperPtr.i;
- insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i;
- if (tmpOperPtr.i == RNIL) {
- return;
- } else {
- jam();
- ptrCheckGuard(tmpOperPtr, coprecsize, operationrec);
- tmpOperPtr.p->prevLockOwnerOp = insOperPtr.i;
- }//if
- }//Dbacc::insertLockOwnersList()
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* */
- /* END OF COMMIT AND ABORT MODULE */
- /* */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* ALLOC_OVERFLOW_PAGE */
- /* DESCRIPTION: */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::allocOverflowPage(Signal* signal)
- {
- DirRangePtr aopDirRangePtr;
- DirectoryarrayPtr aopOverflowDirptr;
- OverflowRecordPtr aopOverflowRecPtr;
- Uint32 taopTmp1;
- Uint32 taopTmp2;
- Uint32 taopTmp3;
- tresult = 0;
- if ((cfirstfreepage == RNIL) &&
- (cfreepage >= cpagesize)) {
- jam();
- zpagesize_error("Dbacc::allocOverflowPage");
- tresult = ZPAGESIZE_ERROR;
- return;
- }//if
- if (fragrecptr.p->firstFreeDirindexRec != RNIL) {
- jam();
- /* FRAGRECPTR:FIRST_FREE_DIRINDEX_REC POINTS */
- /* TO THE FIRST ELEMENT IN A FREE LIST OF THE */
- /* DIRECTORY INDEX WICH HAVE NULL AS PAGE */
- aopOverflowRecPtr.i = fragrecptr.p->firstFreeDirindexRec;
- ptrCheckGuard(aopOverflowRecPtr, coverflowrecsize, overflowRecord);
- troOverflowRecPtr.p = aopOverflowRecPtr.p;
- takeRecOutOfFreeOverdir(signal);
- } else if (cfirstfreeoverrec == RNIL) {
- jam();
- tresult = ZOVER_REC_ERROR;
- return;
- } else if ((cfirstfreedir == RNIL) &&
- (cdirarraysize <= cdirmemory)) {
- jam();
- tresult = ZDIRSIZE_ERROR;
- return;
- } else {
- jam();
- seizeOverRec(signal);
- aopOverflowRecPtr = sorOverflowRecPtr;
- aopOverflowRecPtr.p->dirindex = fragrecptr.p->lastOverIndex;
- }//if
- aopOverflowRecPtr.p->nextOverRec = RNIL;
- aopOverflowRecPtr.p->prevOverRec = RNIL;
- fragrecptr.p->firstOverflowRec = aopOverflowRecPtr.i;
- fragrecptr.p->lastOverflowRec = aopOverflowRecPtr.i;
- taopTmp1 = aopOverflowRecPtr.p->dirindex;
- aopDirRangePtr.i = fragrecptr.p->overflowdir;
- taopTmp2 = taopTmp1 >> 8;
- taopTmp3 = taopTmp1 & 0xff;
- ptrCheckGuard(aopDirRangePtr, cdirrangesize, dirRange);
- arrGuard(taopTmp2, 256);
- if (aopDirRangePtr.p->dirArray[taopTmp2] == RNIL) {
- jam();
- seizeDirectory(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- aopDirRangePtr.p->dirArray[taopTmp2] = sdDirptr.i;
- }//if
- aopOverflowDirptr.i = aopDirRangePtr.p->dirArray[taopTmp2];
- seizePage(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- ptrCheckGuard(aopOverflowDirptr, cdirarraysize, directoryarray);
- aopOverflowDirptr.p->pagep[taopTmp3] = spPageptr.i;
- tiopPageId = aopOverflowRecPtr.p->dirindex;
- iopOverflowRecPtr = aopOverflowRecPtr;
- iopPageptr = spPageptr;
- initOverpage(signal);
- aopOverflowRecPtr.p->overpage = spPageptr.i;
- if (fragrecptr.p->lastOverIndex <= aopOverflowRecPtr.p->dirindex) {
- jam();
- ndbrequire(fragrecptr.p->lastOverIndex == aopOverflowRecPtr.p->dirindex);
- fragrecptr.p->lastOverIndex++;
- }//if
- }//Dbacc::allocOverflowPage()
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* */
- /* EXPAND/SHRINK MODULE */
- /* */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /*EXPANDCHECK EXPAND BUCKET ORD */
- /* SENDER: ACC, LEVEL B */
- /* INPUT: FRAGRECPTR, POINTS TO A FRAGMENT RECORD. */
- /* DESCRIPTION: A BUCKET OF A FRAGMENT PAGE WILL BE EXPANDED INTO TWO BUCKETS */
- /* ACCORDING TO LH3. */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* EXPANDCHECK EXPAND BUCKET ORD */
- /* ******************------------------------------+ */
- /* SENDER: ACC, LEVEL B */
- /* A BUCKET OF THE FRAGMENT WILL */
- /* BE EXPANDED ACCORDING TO LH3, */
- /* AND COMMIT TRANSACTION PROCESS */
- /* WILL BE CONTINUED */
- Uint32 Dbacc::checkScanExpand(Signal* signal)
- {
- Uint32 Ti;
- Uint32 TreturnCode = 0;
- Uint32 TPageIndex;
- Uint32 TDirInd;
- Uint32 TSplit;
- Uint32 TreleaseInd = 0;
- Uint32 TreleaseScanBucket;
- Uint32 TreleaseScanIndicator[4];
- DirectoryarrayPtr TDirptr;
- DirRangePtr TDirRangePtr;
- Page8Ptr TPageptr;
- ScanRecPtr TscanPtr;
- RootfragmentrecPtr Trootfragrecptr;
- Trootfragrecptr.i = fragrecptr.p->myroot;
- TSplit = fragrecptr.p->p;
- ptrCheckGuard(Trootfragrecptr, crootfragmentsize, rootfragmentrec);
- for (Ti = 0; Ti < 4; Ti++) {
- TreleaseScanIndicator[Ti] = 0;
- if (Trootfragrecptr.p->scan[Ti] != RNIL) {
- //-------------------------------------------------------------
- // A scan is ongoing on this particular local fragment. We have
- // to check its current state.
- //-------------------------------------------------------------
- TscanPtr.i = Trootfragrecptr.p->scan[Ti];
- ptrCheckGuard(TscanPtr, cscanRecSize, scanRec);
- if (TscanPtr.p->activeLocalFrag == fragrecptr.i) {
- if (TscanPtr.p->scanBucketState == ScanRec::FIRST_LAP) {
- if (TSplit == TscanPtr.p->nextBucketIndex) {
- jam();
- //-------------------------------------------------------------
- // We are currently scanning this bucket. We cannot split it
- // simultaneously with the scan. We have to pass this offer for
- // splitting the bucket.
- //-------------------------------------------------------------
- TreturnCode = 1;
- return TreturnCode;
- } else if (TSplit > TscanPtr.p->nextBucketIndex) {
- jam();
- //-------------------------------------------------------------
- // This bucket has not yet been scanned. We must reset the scanned
- // bit indicator for this scan on this bucket.
- //-------------------------------------------------------------
- TreleaseScanIndicator[Ti] = 1;
- TreleaseInd = 1;
- } else {
- jam();
- }//if
- } else if (TscanPtr.p->scanBucketState == ScanRec::SECOND_LAP) {
- jam();
- //-------------------------------------------------------------
- // We are performing a second lap to handle buckets that was
- // merged during the first lap of scanning. During this second
- // lap we do not allow any splits or merges.
- //-------------------------------------------------------------
- TreturnCode = 1;
- return TreturnCode;
- } else {
- ndbrequire(TscanPtr.p->scanBucketState == ScanRec::SCAN_COMPLETED);
- jam();
- //-------------------------------------------------------------
- // The scan is completed and we can thus go ahead and perform
- // the split.
- //-------------------------------------------------------------
- }//if
- }//if
- }//if
- }//for
- if (TreleaseInd == 1) {
- TreleaseScanBucket = TSplit;
- TDirRangePtr.i = fragrecptr.p->directory;
- TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
- TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
- ptrCheckGuard(TDirRangePtr, cdirrangesize, dirRange);
- arrGuard((TDirInd >> 8), 256);
- TDirptr.i = TDirRangePtr.p->dirArray[TDirInd >> 8];
- ptrCheckGuard(TDirptr, cdirarraysize, directoryarray);
- TPageptr.i = TDirptr.p->pagep[TDirInd & 0xff];
- ptrCheckGuard(TPageptr, cpagesize, page8);
- for (Ti = 0; Ti < 4; Ti++) {
- if (TreleaseScanIndicator[Ti] == 1) {
- jam();
- scanPtr.i = Trootfragrecptr.p->scan[Ti];
- ptrCheckGuard(scanPtr, cscanRecSize, scanRec);
- rsbPageidptr = TPageptr;
- trsbPageindex = TPageIndex;
- releaseScanBucket(signal);
- }//if
- }//for
- }//if
- return TreturnCode;
- }//Dbacc::checkScanExpand()
- void Dbacc::execEXPANDCHECK2(Signal* signal)
- {
- jamEntry();
- if(refToBlock(signal->getSendersBlockRef()) == DBLQH){
- jam();
- reenable_expand_after_redo_log_exection_complete(signal);
- return;
- }
- DirectoryarrayPtr newDirptr;
- fragrecptr.i = signal->theData[0];
- tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
- Uint32 tmp = 1;
- tmp = tmp << 31;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- fragrecptr.p->expandFlag = 0;
- if (fragrecptr.p->slack < tmp) {
- jam();
- /* IT MEANS THAT IF SLACK > ZERO */
- /*--------------------------------------------------------------*/
- /* THE SLACK HAS IMPROVED AND IS NOW ACCEPTABLE AND WE */
- /* CAN FORGET ABOUT THE EXPAND PROCESS. */
- /*--------------------------------------------------------------*/
- return;
- }//if
- if (fragrecptr.p->firstOverflowRec == RNIL) {
- jam();
- allocOverflowPage(signal);
- if (tresult > ZLIMIT_OF_ERROR) {
- jam();
- /*--------------------------------------------------------------*/
- /* WE COULD NOT ALLOCATE ANY OVERFLOW PAGE. THUS WE HAVE TO STOP*/
- /* THE EXPAND SINCE WE CANNOT GUARANTEE ITS COMPLETION. */
- /*--------------------------------------------------------------*/
- return;
- }//if
- }//if
- if (cfirstfreepage == RNIL) {
- if (cfreepage >= cpagesize) {
- jam();
- /*--------------------------------------------------------------*/
- /* WE HAVE TO STOP THE EXPAND PROCESS SINCE THERE ARE NO FREE */
- /* PAGES. THIS MEANS THAT WE COULD BE FORCED TO CRASH SINCE WE */
- /* CANNOT COMPLETE THE EXPAND. TO AVOID THE CRASH WE EXIT HERE. */
- /*--------------------------------------------------------------*/
- return;
- }//if
- }//if
- if (checkScanExpand(signal) == 1) {
- jam();
- /*--------------------------------------------------------------*/
- // A scan state was inconsistent with performing an expand
- // operation.
- /*--------------------------------------------------------------*/
- return;
- }//if
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_EXPAND) {
- jam();
- /*--------------------------------------------------------------*/
- // We did not have enough undo log buffers to start up an
- // expand operation
- /*--------------------------------------------------------------*/
- return;
- }//if
- }//if
- /*--------------------------------------------------------------------------*/
- /* WE START BY FINDING THE PAGE, THE PAGE INDEX AND THE PAGE DIRECTORY*/
- /* OF THE NEW BUCKET WHICH SHALL RECEIVE THE ELEMENT WHICH HAVE A 1 IN*/
- /* THE NEXT HASH BIT. THIS BIT IS USED IN THE SPLIT MECHANISM TO */
- /* DECIDE WHICH ELEMENT GOES WHERE. */
- /*--------------------------------------------------------------------------*/
- expDirRangePtr.i = fragrecptr.p->directory;
- texpReceivedBucket = (fragrecptr.p->maxp + fragrecptr.p->p) + 1; /* RECEIVED BUCKET */
- texpDirInd = texpReceivedBucket >> fragrecptr.p->k;
- newDirptr.i = RNIL;
- ptrNull(newDirptr);
- texpDirRangeIndex = texpDirInd >> 8;
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- arrGuard(texpDirRangeIndex, 256);
- expDirptr.i = expDirRangePtr.p->dirArray[texpDirRangeIndex];
- if (expDirptr.i == RNIL) {
- jam();
- seizeDirectory(signal);
- if (tresult > ZLIMIT_OF_ERROR) {
- jam();
- return;
- } else {
- jam();
- newDirptr = sdDirptr;
- expDirptr = sdDirptr;
- expDirRangePtr.p->dirArray[texpDirRangeIndex] = sdDirptr.i;
- }//if
- } else {
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- }//if
- texpDirPageIndex = texpDirInd & 0xff;
- expPageptr.i = expDirptr.p->pagep[texpDirPageIndex];
- if (expPageptr.i == RNIL) {
- jam();
- seizePage(signal);
- if (tresult > ZLIMIT_OF_ERROR) {
- jam();
- if (newDirptr.i != RNIL) {
- jam();
- rdDirptr.i = newDirptr.i;
- releaseDirectory(signal);
- }//if
- return;
- }//if
- expDirptr.p->pagep[texpDirPageIndex] = spPageptr.i;
- tipPageId = texpDirInd;
- inpPageptr = spPageptr;
- initPage(signal);
- fragrecptr.p->dirsize++;
- expPageptr = spPageptr;
- } else {
- ptrCheckGuard(expPageptr, cpagesize, page8);
- }//if
- fragrecptr.p->expReceivePageptr = expPageptr.i;
- fragrecptr.p->expReceiveIndex = texpReceivedBucket & ((1 << fragrecptr.p->k) - 1);
- /*--------------------------------------------------------------------------*/
- /* THE NEXT ACTION IS TO FIND THE PAGE, THE PAGE INDEX AND THE PAGE */
- /* DIRECTORY OF THE BUCKET TO BE SPLIT. */
- /*--------------------------------------------------------------------------*/
- expDirRangePtr.i = fragrecptr.p->directory;
- cexcPageindex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
- texpDirInd = fragrecptr.p->p >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- arrGuard((texpDirInd >> 8), 256);
- expDirptr.i = expDirRangePtr.p->dirArray[texpDirInd >> 8];
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- excPageptr.i = expDirptr.p->pagep[texpDirInd & 0xff];
- fragrecptr.p->expSenderIndex = cexcPageindex;
- fragrecptr.p->expSenderPageptr = excPageptr.i;
- if (excPageptr.i == RNIL) {
- jam();
- endofexpLab(signal); /* EMPTY BUCKET */
- return;
- }//if
- fragrecptr.p->expReceiveForward = ZTRUE;
- ptrCheckGuard(excPageptr, cpagesize, page8);
- expandcontainer(signal);
- endofexpLab(signal);
- return;
- }//Dbacc::execEXPANDCHECK2()
-
- void Dbacc::endofexpLab(Signal* signal)
- {
- fragrecptr.p->p++;
- fragrecptr.p->slack += fragrecptr.p->maxloadfactor;
- fragrecptr.p->expandCounter++;
- if (fragrecptr.p->p > fragrecptr.p->maxp) {
- jam();
- fragrecptr.p->maxp = (fragrecptr.p->maxp << 1) | 1;
- fragrecptr.p->lhdirbits++;
- fragrecptr.p->hashcheckbit++;
- fragrecptr.p->p = 0;
- }//if
- Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p;
- Uint32 Thysteres = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor;
- fragrecptr.p->slackCheck = noOfBuckets * Thysteres;
- if (fragrecptr.p->slack > (1u << 31)) {
- jam();
- /* IT MEANS THAT IF SLACK < ZERO */
- /* --------------------------------------------------------------------------------- */
- /* IT IS STILL NECESSARY TO EXPAND THE FRAGMENT EVEN MORE. START IT FROM HERE */
- /* WITHOUT WAITING FOR NEXT COMMIT ON THE FRAGMENT. */
- /* --------------------------------------------------------------------------------- */
- fragrecptr.p->expandFlag = 2;
- signal->theData[0] = fragrecptr.i;
- signal->theData[1] = fragrecptr.p->p;
- signal->theData[2] = fragrecptr.p->maxp;
- sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
- }//if
- return;
- }//Dbacc::endofexpLab()
- void Dbacc::reenable_expand_after_redo_log_exection_complete(Signal* signal){
- tabptr.i = signal->theData[0];
- Uint32 fragId = signal->theData[1];
- ptrCheckGuard(tabptr, ctablesize, tabrec);
- ndbrequire(getrootfragmentrec(signal, rootfragrecptr, fragId));
- #if 0
- ndbout_c("reenable expand check for table %d fragment: %d",
- tabptr.i, fragId);
- #endif
- for (Uint32 i = 0; i < 2; i++) {
- fragrecptr.i = rootfragrecptr.p->fragmentptr[i];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- switch(fragrecptr.p->expandFlag){
- case 0:
- /**
- * Hmm... this means that it's alreay has been reenabled...
- */
- ndbassert(false);
- continue;
- case 1:
- /**
- * Nothing is going on start expand check
- */
- case 2:
- /**
- * A shrink is running, do expand check anyway
- * (to reset expandFlag)
- */
- fragrecptr.p->expandFlag = 2;
- signal->theData[0] = fragrecptr.i;
- signal->theData[1] = fragrecptr.p->p;
- signal->theData[2] = fragrecptr.p->maxp;
- sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
- break;
- }
- }
- }
- void Dbacc::execDEBUG_SIG(Signal* signal)
- {
- jamEntry();
- expPageptr.i = signal->theData[0];
- progError(__LINE__,
- ERR_SR_UNDOLOG);
- return;
- }//Dbacc::execDEBUG_SIG()
- /* --------------------------------------------------------------------------------- */
- /* EXPANDCONTAINER */
- /* INPUT: EXC_PAGEPTR (POINTER TO THE ACTIVE PAGE RECORD) */
- /* CEXC_PAGEINDEX (INDEX OF THE BUCKET). */
- /* */
- /* DESCRIPTION: THE HASH VALUE OF ALL ELEMENTS IN THE CONTAINER WILL BE */
- /* CHECKED. SOME OF THESE ELEMENTS HAVE TO MOVE TO THE NEW CONTAINER.
-  *
-  * The routine is a goto-driven loop over the containers of the bucket and
-  * over the elements of each container:
-  *   EXP_CONTAINER_LOOP  locate the current container and read its header.
-  *   NEXT_ELEMENT_LOOP   examine one element; if bit HASHCHECKBIT of its hash
-  *                       value is 0 it stays, otherwise it is inserted at the
-  *                       receive position kept in the fragment record
-  *                       (expReceivePageptr / expReceiveIndex /
-  *                       expReceiveForward) by insertElement().
-  *   REMOVE_LAST_LOOP    after a move, take the last element of the bucket
-  *                       (getLastAndRemove); it either fills the hole
-  *                       (deleteElement) or is moved as well and the step is
-  *                       repeated.
-  *   NEXT_ELEMENT        advance to the next element or the next container.
-  * All working state lives in block-global variables (cexc*, tidr*, tlast*,
-  * tdel*), which is how arguments are passed to the helper routines.
-  */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::expandcontainer(Signal* signal)
- {
- Uint32 texcHashvalue;
- Uint32 texcTmp;
- Uint32 texcIndex;
- Uint32 texpKeyLen;
- Uint32 guard20;
- texpKeyLen = fragrecptr.p->keyLength; /* number of key words copied per moved element */
- if (texpKeyLen == 0) {
- jam();
- texpKeyLen = ZACTIVE_LONG_KEY_LEN; /* keyLength 0 is replaced by this constant */
- }//if
- cexcPrevpageptr = RNIL; /* the first container has no predecessor */
- cexcPrevconptr = 0;
- cexcForward = ZTRUE; /* start with the forward direction */
- EXP_CONTAINER_LOOP: /* entered once for every container of the bucket */
- cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS); /* word offset of the container buffer for this page index, before the page header is added */
- if (cexcForward == ZTRUE) {
- jam();
- cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
- cexcElementptr = cexcContainerptr + ZCON_HEAD_SIZE; /* first element follows the container header */
- } else {
- jam();
- cexcContainerptr = ((cexcContainerptr + ZHEAD_SIZE) + ZBUF_SIZE) - ZCON_HEAD_SIZE; /* header sits at the far end of the buffer */
- cexcElementptr = cexcContainerptr - 1; /* first element is the word just below the header */
- }//if
- arrGuard(cexcContainerptr, 2048);
- cexcContainerhead = excPageptr.p->word32[cexcContainerptr];
- cexcContainerlen = cexcContainerhead >> 26; /* container length is stored from bit 26 upwards in the header word */
- cexcMovedLen = ZCON_HEAD_SIZE; /* length examined so far, starting with the header itself */
- if (cexcContainerlen <= ZCON_HEAD_SIZE) { /* header only: the container holds no elements */
- ndbrequire(cexcContainerlen >= ZCON_HEAD_SIZE);
- jam();
- goto NEXT_ELEMENT;
- }//if
- NEXT_ELEMENT_LOOP: /* examine the element at CEXC_ELEMENTPTR */
- idrOperationRecPtr.i = RNIL;
- ptrNull(idrOperationRecPtr);
- /* --------------------------------------------------------------------------------- */
- /* CEXC_PAGEINDEX PAGE INDEX OF CURRENT CONTAINER BEING EXAMINED. */
- /* CEXC_CONTAINERPTR INDEX OF CURRENT CONTAINER BEING EXAMINED. */
- /* CEXC_ELEMENTPTR INDEX OF CURRENT ELEMENT BEING EXAMINED. */
- /* EXC_PAGEPTR PAGE WHERE CURRENT ELEMENT RESIDES. */
- /* CEXC_PREVPAGEPTR PAGE OF PREVIOUS CONTAINER. */
- /* CEXC_PREVCONPTR INDEX OF PREVIOUS CONTAINER */
- /* CEXC_FORWARD DIRECTION OF CURRENT CONTAINER */
- /* --------------------------------------------------------------------------------- */
- arrGuard(cexcElementptr, 2048);
- tidrElemhead = excPageptr.p->word32[cexcElementptr];
- if (ElementHeader::getUnlocked(tidrElemhead)){ /* unlocked: the hash value part is kept in the element header */
- jam();
- texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead);
- } else { /* locked: the header holds an operation record index and the hash value part is read from that record */
- jam();
- idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
- ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
- texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
- if ((fragrecptr.p->createLcp == ZTRUE) &&
- (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) != 0)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // During local checkpoints we must ensure that we restore the element header in
- // unlocked state and with the hash value part there with tuple status zeroed.
- // Otherwise a later insert over the same element will write an UNDO log that will
- // ensure that the now removed element is restored together with its locked element
- // header and without the hash value part.
- /* --------------------------------------------------------------------------------- */
- const Uint32 hv = idrOperationRecPtr.p->hashvaluePart;
- const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
- excPageptr.p->word32[cexcElementptr] = eh; /* only the page copy is rewritten; tidrElemhead keeps the locked header read above */
- }//if
- }//if
- if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) { /* split bit clear: the element stays in this bucket */
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THIS ELEMENT IS NOT TO BE MOVED. WE CALCULATE THE WHEREABOUTS OF THE NEXT */
- /* ELEMENT AND PROCEED WITH THAT OR END THE SEARCH IF THERE ARE NO MORE */
- /* ELEMENTS IN THIS CONTAINER. */
- /* --------------------------------------------------------------------------------- */
- goto NEXT_ELEMENT;
- }//if
- /* --------------------------------------------------------------------------------- */
- /* THE HASH BIT WAS SET AND WE SHALL MOVE THIS ELEMENT TO THE NEW BUCKET. */
- /* WE START BY READING THE ELEMENT TO BE ABLE TO INSERT IT INTO THE NEW BUCKET.*/
- /* THEN WE INSERT THE ELEMENT INTO THE NEW BUCKET. THE NEXT STEP IS TO DELETE */
- /* THE ELEMENT FROM THIS BUCKET. THIS IS PERFORMED BY REPLACING IT WITH THE */
- /* LAST ELEMENT IN THE BUCKET. IF THIS ELEMENT IS TO BE MOVED WE MOVE IT AND */
- /* GET THE LAST ELEMENT AGAIN UNTIL WE EITHER FIND ONE THAT STAYS OR THIS */
- /* ELEMENT IS THE LAST ELEMENT. */
- /* --------------------------------------------------------------------------------- */
- texcTmp = cexcElementptr + cexcForward; /* first word after the element header; CEXC_FORWARD is used as the step (presumably it acts as -1 for backward containers - confirm) */
- guard20 = fragrecptr.p->localkeylen - 1; /* NOTE(review): wraps around if localkeylen is 0; the arrGuard(texcIndex, 2) below would then stop execution */
- for (texcIndex = 0; texcIndex <= guard20; texcIndex++) { /* copy the local key words into CLOCALKEY */
- arrGuard(texcIndex, 2);
- arrGuard(texcTmp, 2048);
- clocalkey[texcIndex] = excPageptr.p->word32[texcTmp];
- texcTmp = texcTmp + cexcForward;
- }//for
- guard20 = texpKeyLen - 1;
- for (texcIndex = 0; texcIndex <= guard20; texcIndex++) { /* copy the key words that follow into CKEYS */
- arrGuard(texcIndex, 2048);
- arrGuard(texcTmp, 2048);
- ckeys[texcIndex] = excPageptr.p->word32[texcTmp];
- texcTmp = texcTmp + cexcForward;
- }//for
- tidrPageindex = fragrecptr.p->expReceiveIndex; /* load the receive position into the TIDR/IDR variables read by insertElement */
- idrPageptr.i = fragrecptr.p->expReceivePageptr;
- ptrCheckGuard(idrPageptr, cpagesize, page8);
- tidrForward = fragrecptr.p->expReceiveForward;
- tidrKeyLen = texpKeyLen;
- insertElement(signal);
- fragrecptr.p->expReceiveIndex = tidrPageindex; /* store the position back after the insert so the next moved element continues from there */
- fragrecptr.p->expReceivePageptr = idrPageptr.i;
- fragrecptr.p->expReceiveForward = tidrForward;
- REMOVE_LAST_LOOP: /* fill the hole at CEXC_ELEMENTPTR with the last element of the bucket */
- jam();
- lastPageptr.i = excPageptr.i; /* set up the LAST/TLAST inputs of getLastAndRemove from the current container */
- lastPageptr.p = excPageptr.p;
- tlastContainerptr = cexcContainerptr;
- lastPrevpageptr.i = cexcPrevpageptr;
- ptrCheck(lastPrevpageptr, cpagesize, page8); /* ptrCheck rather than ptrCheckGuard: CEXC_PREVPAGEPTR is RNIL for the first container */
- tlastPrevconptr = cexcPrevconptr;
- arrGuard(tlastContainerptr, 2048);
- tlastContainerhead = lastPageptr.p->word32[tlastContainerptr];
- tlastContainerlen = tlastContainerhead >> 26;
- tlastForward = cexcForward;
- tlastPageindex = cexcPageindex;
- getLastAndRemove(signal);
- if (excPageptr.i == lastPageptr.i) {
- if (cexcElementptr == tlastElementptr) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THE CURRENT ELEMENT WAS ALSO THE LAST ELEMENT. */
- /* --------------------------------------------------------------------------------- */
- return;
- }//if
- }//if
- /* --------------------------------------------------------------------------------- */
- /* THE CURRENT ELEMENT WAS NOT THE LAST ELEMENT. IF THE LAST ELEMENT SHOULD */
- /* STAY WE COPY IT TO THE POSITION OF THE CURRENT ELEMENT, OTHERWISE WE INSERT */
- /* INTO THE NEW BUCKET, REMOVE IT AND TRY WITH THE NEW LAST ELEMENT. */
- /* --------------------------------------------------------------------------------- */
- idrOperationRecPtr.i = RNIL;
- ptrNull(idrOperationRecPtr);
- arrGuard(tlastElementptr, 2048);
- tidrElemhead = lastPageptr.p->word32[tlastElementptr];
- if (ElementHeader::getUnlocked(tidrElemhead)) { /* same hash value lookup as above, now for the last element */
- jam();
- texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead);
- } else {
- jam();
- idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
- ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
- texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
- if ((fragrecptr.p->createLcp == ZTRUE) &&
- (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) != 0)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // During local checkpoints we must ensure that we restore the element header in
- // unlocked state and with the hash value part there with tuple status zeroed.
- // Otherwise a later insert over the same element will write an UNDO log that will
- // ensure that the now removed element is restored together with its locked element
- // header and without the hash value part.
- /* --------------------------------------------------------------------------------- */
- const Uint32 hv = idrOperationRecPtr.p->hashvaluePart;
- const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
- lastPageptr.p->word32[tlastElementptr] = eh;
- }//if
- }//if
- if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THE LAST ELEMENT IS NOT TO BE MOVED. WE COPY IT TO THE CURRENT ELEMENT. */
- /* --------------------------------------------------------------------------------- */
- delPageptr = excPageptr; /* set up the DEL/TDEL inputs of deleteElement: the position of the element that was moved out */
- tdelContainerptr = cexcContainerptr;
- tdelForward = cexcForward;
- tdelElementptr = cexcElementptr;
- deleteElement(signal);
- } else {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THE LAST ELEMENT IS ALSO TO BE MOVED. */
- /* --------------------------------------------------------------------------------- */
- texcTmp = tlastElementptr + tlastForward;
- for (texcIndex = 0; texcIndex < fragrecptr.p->localkeylen; texcIndex++) { /* copy the local key words of the last element */
- arrGuard(texcIndex, 2);
- arrGuard(texcTmp, 2048);
- clocalkey[texcIndex] = lastPageptr.p->word32[texcTmp];
- texcTmp = texcTmp + tlastForward;
- }//for
- for (texcIndex = 0; texcIndex < texpKeyLen; texcIndex++) { /* copy the key words of the last element */
- arrGuard(texcIndex, 2048);
- arrGuard(texcTmp, 2048);
- ckeys[texcIndex] = lastPageptr.p->word32[texcTmp];
- texcTmp = texcTmp + tlastForward;
- }//for
- tidrPageindex = fragrecptr.p->expReceiveIndex;
- idrPageptr.i = fragrecptr.p->expReceivePageptr;
- ptrCheckGuard(idrPageptr, cpagesize, page8);
- tidrForward = fragrecptr.p->expReceiveForward;
- tidrKeyLen = texpKeyLen;
- insertElement(signal);
- fragrecptr.p->expReceiveIndex = tidrPageindex;
- fragrecptr.p->expReceivePageptr = idrPageptr.i;
- fragrecptr.p->expReceiveForward = tidrForward;
- goto REMOVE_LAST_LOOP; /* the hole is still open: try the new last element */
- }//if
- NEXT_ELEMENT: /* step to the next element, the next container, or finish */
- arrGuard(cexcContainerptr, 2048);
- cexcContainerhead = excPageptr.p->word32[cexcContainerptr]; /* re-read: the length may have changed, see the comment below */
- cexcMovedLen = cexcMovedLen + fragrecptr.p->elementLength;
- if ((cexcContainerhead >> 26) > cexcMovedLen) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* WE HAVE NOT YET MOVED THE COMPLETE CONTAINER. WE PROCEED WITH THE NEXT */
- /* ELEMENT IN THE CONTAINER. IT IS IMPORTANT TO READ THE CONTAINER LENGTH */
- /* FROM THE CONTAINER HEADER SINCE IT MIGHT CHANGE BY REMOVING THE LAST */
- /* ELEMENT IN THE BUCKET. */
- /* --------------------------------------------------------------------------------- */
- cexcElementptr = cexcElementptr + (cexcForward * fragrecptr.p->elementLength);
- goto NEXT_ELEMENT_LOOP;
- }//if
- if (((cexcContainerhead >> 7) & 3) != 0) { /* bits 7-8 of the header are non-zero: a further container follows */
- jam();
- /* --------------------------------------------------------------------------------- */
- /* WE PROCEED TO THE NEXT CONTAINER IN THE BUCKET. */
- /* --------------------------------------------------------------------------------- */
- cexcPrevpageptr = excPageptr.i; /* remember the container being left as the previous one */
- cexcPrevconptr = cexcContainerptr;
- nextcontainerinfoExp(signal);
- goto EXP_CONTAINER_LOOP;
- }//if
- }//Dbacc::expandcontainer()
- /* ******************--------------------------------------------------------------- */
- /* SHRINKCHECK JOIN BUCKET ORD */
- /* SENDER: ACC, LEVEL B */
- /* INPUT: FRAGRECPTR, POINTS TO A FRAGMENT RECORD. */
- /* DESCRIPTION: TWO BUCKETS OF A FRAGMENT PAGE WILL BE JOINED TOGETHER */
- /* ACCORDING TO LH3. */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* SHRINKCHECK JOIN BUCKET ORD */
- /* ******************------------------------------+ */
- /* SENDER: ACC, LEVEL B */
- /* TWO BUCKETS OF THE FRAGMENT */
- /* WILL BE JOINED ACCORDING TO LH3 */
- /* AND COMMIT TRANSACTION PROCESS */
- /* WILL BE CONTINUED */
- Uint32 Dbacc::checkScanShrink(Signal* signal)
- {
- Uint32 Ti;
- Uint32 TreturnCode = 0;
- Uint32 TPageIndex;
- Uint32 TDirInd;
- Uint32 TmergeDest;
- Uint32 TmergeSource;
- Uint32 TreleaseScanBucket;
- Uint32 TreleaseInd = 0;
- Uint32 TreleaseScanIndicator[4];
- DirectoryarrayPtr TDirptr;
- DirRangePtr TDirRangePtr;
- Page8Ptr TPageptr;
- ScanRecPtr TscanPtr;
- RootfragmentrecPtr Trootfragrecptr;
- Trootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(Trootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (fragrecptr.p->p == 0) {
- jam();
- TmergeDest = fragrecptr.p->maxp >> 1;
- } else {
- jam();
- TmergeDest = fragrecptr.p->p - 1;
- }//if
- TmergeSource = fragrecptr.p->maxp + fragrecptr.p->p;
- for (Ti = 0; Ti < 4; Ti++) {
- TreleaseScanIndicator[Ti] = 0;
- if (Trootfragrecptr.p->scan[Ti] != RNIL) {
- TscanPtr.i = Trootfragrecptr.p->scan[Ti];
- ptrCheckGuard(TscanPtr, cscanRecSize, scanRec);
- if (TscanPtr.p->activeLocalFrag == fragrecptr.i) {
- //-------------------------------------------------------------
- // A scan is ongoing on this particular local fragment. We have
- // to check its current state.
- //-------------------------------------------------------------
- if (TscanPtr.p->scanBucketState == ScanRec::FIRST_LAP) {
- jam();
- if ((TmergeDest == TscanPtr.p->nextBucketIndex) ||
- (TmergeSource == TscanPtr.p->nextBucketIndex)) {
- jam();
- //-------------------------------------------------------------
- // We are currently scanning one of the buckets involved in the
- // merge. We cannot merge while simultaneously performing a scan.
- // We have to pass this offer for merging the buckets.
- //-------------------------------------------------------------
- TreturnCode = 1;
- return TreturnCode;
- } else if (TmergeDest < TscanPtr.p->nextBucketIndex) {
- jam();
- TreleaseScanIndicator[Ti] = 1;
- TreleaseInd = 1;
- }//if
- } else if (TscanPtr.p->scanBucketState == ScanRec::SECOND_LAP) {
- jam();
- //-------------------------------------------------------------
- // We are performing a second lap to handle buckets that was
- // merged during the first lap of scanning. During this second
- // lap we do not allow any splits or merges.
- //-------------------------------------------------------------
- TreturnCode = 1;
- return TreturnCode;
- } else if (TscanPtr.p->scanBucketState == ScanRec::SCAN_COMPLETED) {
- jam();
- //-------------------------------------------------------------
- // The scan is completed and we can thus go ahead and perform
- // the split.
- //-------------------------------------------------------------
- } else {
- jam();
- sendSystemerror(signal);
- return TreturnCode;
- }//if
- }//if
- }//if
- }//for
- if (TreleaseInd == 1) {
- jam();
- TreleaseScanBucket = TmergeSource;
- TDirRangePtr.i = fragrecptr.p->directory;
- TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
- TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
- ptrCheckGuard(TDirRangePtr, cdirrangesize, dirRange);
- arrGuard((TDirInd >> 8), 256);
- TDirptr.i = TDirRangePtr.p->dirArray[TDirInd >> 8];
- ptrCheckGuard(TDirptr, cdirarraysize, directoryarray);
- TPageptr.i = TDirptr.p->pagep[TDirInd & 0xff];
- ptrCheckGuard(TPageptr, cpagesize, page8);
- for (Ti = 0; Ti < 4; Ti++) {
- if (TreleaseScanIndicator[Ti] == 1) {
- jam();
- scanPtr.i = Trootfragrecptr.p->scan[Ti];
- ptrCheckGuard(scanPtr, cscanRecSize, scanRec);
- rsbPageidptr.i = TPageptr.i;
- rsbPageidptr.p = TPageptr.p;
- trsbPageindex = TPageIndex;
- releaseScanBucket(signal);
- if (TmergeDest < scanPtr.p->minBucketIndexToRescan) {
- jam();
- //-------------------------------------------------------------
- // We have to keep track of the starting bucket to Rescan in the
- // second lap.
- //-------------------------------------------------------------
- scanPtr.p->minBucketIndexToRescan = TmergeDest;
- }//if
- if (TmergeDest > scanPtr.p->maxBucketIndexToRescan) {
- jam();
- //-------------------------------------------------------------
- // We have to keep track of the ending bucket to Rescan in the
- // second lap.
- //-------------------------------------------------------------
- scanPtr.p->maxBucketIndexToRescan = TmergeDest;
- }//if
- }//if
- }//for
- }//if
- return TreturnCode;
- }//Dbacc::checkScanShrink()
- void Dbacc::execSHRINKCHECK2(Signal* signal)
- {
- Uint32 tshrTmp1;
- jamEntry();
- fragrecptr.i = signal->theData[0];
- Uint32 oldFlag = signal->theData[3];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- fragrecptr.p->expandFlag = oldFlag;
- tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
- if (fragrecptr.p->slack <= fragrecptr.p->slackCheck) {
- jam();
- /* TIME FOR JOIN BUCKETS PROCESS */
- /*--------------------------------------------------------------*/
- /* NO LONGER NECESSARY TO SHRINK THE FRAGMENT. */
- /*--------------------------------------------------------------*/
- return;
- }//if
- if (fragrecptr.p->slack > (1u << 31)) {
- jam();
- /*--------------------------------------------------------------*/
- /* THE SLACK IS NEGATIVE, IN THIS CASE WE WILL NOT NEED ANY */
- /* SHRINK. */
- /*--------------------------------------------------------------*/
- return;
- }//if
- texpDirInd = (fragrecptr.p->maxp + fragrecptr.p->p) >> fragrecptr.p->k;
- if (((fragrecptr.p->maxp + fragrecptr.p->p) & ((1 << fragrecptr.p->k) - 1)) == 0) {
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (fragrecptr.p->fragState == LCP_SEND_PAGES) {
- if (fragrecptr.p->lcpMaxDirIndex > texpDirInd) {
- if (fragrecptr.p->lcpDirIndex <= texpDirInd) {
- jam();
- /*--------------------------------------------------------------*/
- /* WE DO NOT ALLOW ANY SHRINKS THAT REMOVE PAGES THAT ARE */
- /* NEEDED AS PART OF THE LOCAL CHECKPOINT. */
- /*--------------------------------------------------------------*/
- return;
- }//if
- }//if
- }//if
- }//if
- }//if
- if (fragrecptr.p->firstOverflowRec == RNIL) {
- jam();
- allocOverflowPage(signal);
- if (tresult > ZLIMIT_OF_ERROR) {
- jam();
- return;
- }//if
- }//if
- if (cfirstfreepage == RNIL) {
- if (cfreepage >= cpagesize) {
- jam();
- /*--------------------------------------------------------------*/
- /* WE HAVE TO STOP THE SHRINK PROCESS SINCE THERE ARE NO FREE */
- /* PAGES. THIS MEANS THAT WE COULD BE FORCED TO CRASH SINCE WE */
- /* CANNOT COMPLETE THE SHRINK. TO AVOID THE CRASH WE EXIT HERE. */
- /*--------------------------------------------------------------*/
- return;
- }//if
- }//if
- if (checkScanShrink(signal) == 1) {
- jam();
- /*--------------------------------------------------------------*/
- // A scan state was inconsistent with performing a shrink
- // operation.
- /*--------------------------------------------------------------*/
- return;
- }//if
- if (fragrecptr.p->createLcp == ZTRUE) {
- if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_EXPAND) {
- jam();
- /*--------------------------------------------------------------*/
- // We did not have enough undo log buffers to start up an
- // shrink operation
- /*--------------------------------------------------------------*/
- return;
- }//if
- }//if
- if (fragrecptr.p->p == 0) {
- jam();
- fragrecptr.p->maxp = fragrecptr.p->maxp >> 1;
- fragrecptr.p->p = fragrecptr.p->maxp;
- fragrecptr.p->lhdirbits--;
- fragrecptr.p->hashcheckbit--;
- } else {
- jam();
- fragrecptr.p->p--;
- }//if
- /*--------------------------------------------------------------------------*/
- /* WE START BY FINDING THE NECESSARY INFORMATION OF THE BUCKET TO BE */
- /* REMOVED WHICH WILL SEND ITS ELEMENTS TO THE RECEIVING BUCKET. */
- /*--------------------------------------------------------------------------*/
- expDirRangePtr.i = fragrecptr.p->directory;
- cexcPageindex = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) & ((1 << fragrecptr.p->k) - 1);
- texpDirInd = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) >> fragrecptr.p->k;
- texpDirRangeIndex = texpDirInd >> 8;
- texpDirPageIndex = texpDirInd & 0xff;
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- arrGuard(texpDirRangeIndex, 256);
- expDirptr.i = expDirRangePtr.p->dirArray[texpDirRangeIndex];
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- excPageptr.i = expDirptr.p->pagep[texpDirPageIndex];
- fragrecptr.p->expSenderDirptr = expDirptr.i;
- fragrecptr.p->expSenderIndex = cexcPageindex;
- fragrecptr.p->expSenderPageptr = excPageptr.i;
- fragrecptr.p->expSenderDirIndex = texpDirInd;
- /*--------------------------------------------------------------------------*/
- /* WE NOW PROCEED BY FINDING THE NECESSARY INFORMATION ABOUT THE */
- /* RECEIVING BUCKET. */
- /*--------------------------------------------------------------------------*/
- expDirRangePtr.i = fragrecptr.p->directory;
- texpReceivedBucket = fragrecptr.p->p >> fragrecptr.p->k;
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- arrGuard((texpReceivedBucket >> 8), 256);
- expDirptr.i = expDirRangePtr.p->dirArray[texpReceivedBucket >> 8];
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- fragrecptr.p->expReceivePageptr = expDirptr.p->pagep[texpReceivedBucket & 0xff];
- fragrecptr.p->expReceiveIndex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1);
- fragrecptr.p->expReceiveForward = ZTRUE;
- if (excPageptr.i == RNIL) {
- jam();
- endofshrinkbucketLab(signal); /* EMPTY BUCKET */
- return;
- }//if
- /*--------------------------------------------------------------------------*/
- /* INITIALISE THE VARIABLES FOR THE SHRINK PROCESS. */
- /*--------------------------------------------------------------------------*/
- ptrCheckGuard(excPageptr, cpagesize, page8);
- cexcForward = ZTRUE;
- cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
- cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
- arrGuard(cexcContainerptr, 2048);
- cexcContainerhead = excPageptr.p->word32[cexcContainerptr];
- cexcContainerlen = cexcContainerhead >> 26;
- if (cexcContainerlen <= ZCON_HEAD_SIZE) {
- ndbrequire(cexcContainerlen == ZCON_HEAD_SIZE);
- } else {
- jam();
- shrinkcontainer(signal);
- }//if
- /*--------------------------------------------------------------------------*/
- /* THIS CONTAINER IS NOT YET EMPTY AND WE REMOVE ALL THE ELEMENTS. */
- /*--------------------------------------------------------------------------*/
- if (((cexcContainerhead >> 10) & 1) == 1) {
- jam();
- rlPageptr = excPageptr;
- trlPageindex = cexcPageindex;
- trlRelCon = ZFALSE;
- turlIndex = cexcContainerptr + (ZBUF_SIZE - ZCON_HEAD_SIZE);
- releaseRightlist(signal);
- }//if
- tshrTmp1 = ZCON_HEAD_SIZE;
- tshrTmp1 = tshrTmp1 << 26;
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- datapageptr.p = excPageptr.p;
- cundoinfolength = 1;
- cundoElemIndex = cexcContainerptr;
- undoWritingProcess(signal);
- }//if
- dbgWord32(excPageptr, cexcContainerptr, tshrTmp1);
- arrGuard(cexcContainerptr, 2048);
- excPageptr.p->word32[cexcContainerptr] = tshrTmp1;
- if (((cexcContainerhead >> 7) & 0x3) == 0) {
- jam();
- endofshrinkbucketLab(signal);
- return;
- }//if
- nextcontainerinfoExp(signal);
- do {
- cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
- if (cexcForward == ZTRUE) {
- jam();
- cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
- } else {
- jam();
- cexcContainerptr = ((cexcContainerptr + ZHEAD_SIZE) + ZBUF_SIZE) - ZCON_HEAD_SIZE;
- }//if
- arrGuard(cexcContainerptr, 2048);
- cexcContainerhead = excPageptr.p->word32[cexcContainerptr];
- cexcContainerlen = cexcContainerhead >> 26;
- ndbrequire(cexcContainerlen > ZCON_HEAD_SIZE);
- /*--------------------------------------------------------------------------*/
- /* THIS CONTAINER IS NOT YET EMPTY AND WE REMOVE ALL THE ELEMENTS. */
- /*--------------------------------------------------------------------------*/
- shrinkcontainer(signal);
- cexcPrevpageptr = excPageptr.i;
- cexcPrevpageindex = cexcPageindex;
- cexcPrevforward = cexcForward;
- if (((cexcContainerhead >> 7) & 0x3) != 0) {
- jam();
- /*--------------------------------------------------------------------------*/
- /* WE MUST CALL THE NEXT CONTAINER INFO ROUTINE BEFORE WE RELEASE THE */
- /* CONTAINER SINCE THE RELEASE WILL OVERWRITE THE NEXT POINTER. */
- /*--------------------------------------------------------------------------*/
- nextcontainerinfoExp(signal);
- }//if
- rlPageptr.i = cexcPrevpageptr;
- ptrCheckGuard(rlPageptr, cpagesize, page8);
- trlPageindex = cexcPrevpageindex;
- if (cexcPrevforward == ZTRUE) {
- jam();
- if (((cexcContainerhead >> 10) & 1) == 1) {
- jam();
- trlRelCon = ZFALSE;
- turlIndex = cexcContainerptr + (ZBUF_SIZE - ZCON_HEAD_SIZE);
- releaseRightlist(signal);
- }//if
- trlRelCon = ZTRUE;
- tullIndex = cexcContainerptr;
- releaseLeftlist(signal);
- } else {
- jam();
- if (((cexcContainerhead >> 10) & 1) == 1) {
- jam();
- trlRelCon = ZFALSE;
- tullIndex = cexcContainerptr - (ZBUF_SIZE - ZCON_HEAD_SIZE);
- releaseLeftlist(signal);
- }//if
- trlRelCon = ZTRUE;
- turlIndex = cexcContainerptr;
- releaseRightlist(signal);
- }//if
- } while (((cexcContainerhead >> 7) & 0x3) != 0);
- endofshrinkbucketLab(signal);
- return;
- }//Dbacc::execSHRINKCHECK2()
- void Dbacc::endofshrinkbucketLab(Signal* signal)
- {
- fragrecptr.p->expandCounter--;
- fragrecptr.p->slack -= fragrecptr.p->maxloadfactor;
- if (fragrecptr.p->expSenderIndex == 0) {
- jam();
- fragrecptr.p->dirsize--;
- if (fragrecptr.p->expSenderPageptr != RNIL) {
- jam();
- rpPageptr.i = fragrecptr.p->expSenderPageptr;
- ptrCheckGuard(rpPageptr, cpagesize, page8);
- releasePage(signal);
- expDirptr.i = fragrecptr.p->expSenderDirptr;
- ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
- expDirptr.p->pagep[fragrecptr.p->expSenderDirIndex & 0xff] = RNIL;
- }//if
- if (((((fragrecptr.p->p + fragrecptr.p->maxp) + 1) >> fragrecptr.p->k) & 0xff) == 0) {
- jam();
- rdDirptr.i = fragrecptr.p->expSenderDirptr;
- releaseDirectory(signal);
- expDirRangePtr.i = fragrecptr.p->directory;
- ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
- arrGuard((fragrecptr.p->expSenderDirIndex >> 8), 256);
- expDirRangePtr.p->dirArray[fragrecptr.p->expSenderDirIndex >> 8] = RNIL;
- }//if
- }//if
- if (fragrecptr.p->slack < (1u << 31)) {
- jam();
- /*--------------------------------------------------------------*/
- /* THE SLACK IS POSITIVE, IN THIS CASE WE WILL CHECK WHETHER */
- /* WE WILL CONTINUE PERFORM ANOTHER SHRINK. */
- /*--------------------------------------------------------------*/
- Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p;
- Uint32 Thysteresis = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor;
- fragrecptr.p->slackCheck = noOfBuckets * Thysteresis;
- if (fragrecptr.p->slack > Thysteresis) {
- /*--------------------------------------------------------------*/
- /* IT IS STILL NECESSARY TO SHRINK THE FRAGMENT MORE. THIS*/
- /* CAN HAPPEN WHEN A NUMBER OF SHRINKS GET REJECTED */
- /* DURING A LOCAL CHECKPOINT. WE START A NEW SHRINK */
- /* IMMEDIATELY FROM HERE WITHOUT WAITING FOR A COMMIT TO */
- /* START IT. */
- /*--------------------------------------------------------------*/
- if (fragrecptr.p->expandCounter > 0) {
- jam();
- /*--------------------------------------------------------------*/
- /* IT IS VERY IMPORTANT TO NOT TRY TO SHRINK MORE THAN */
- /* WAS EXPANDED. IF MAXP IS SET TO A VALUE BELOW 63 THEN */
- /* WE WILL LOSE RECORDS SINCE GETDIRINDEX CANNOT HANDLE */
- /* SHRINKING BELOW 2^K - 1 (NOW 63). THIS WAS A BUG THAT */
- /* WAS REMOVED 2000-05-12. */
- /*--------------------------------------------------------------*/
- signal->theData[0] = fragrecptr.i;
- signal->theData[1] = fragrecptr.p->p;
- signal->theData[2] = fragrecptr.p->maxp;
- signal->theData[3] = fragrecptr.p->expandFlag;
- ndbrequire(fragrecptr.p->expandFlag < 2);
- fragrecptr.p->expandFlag = 2;
- sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB);
- }//if
- }//if
- }//if
- ndbrequire(fragrecptr.p->maxp >= (Uint32)((1 << fragrecptr.p->k) - 1));
- return;
- }//Dbacc::endofshrinkbucketLab()
- /* --------------------------------------------------------------------------------- */
- /* SHRINKCONTAINER */
- /* INPUT: EXC_PAGEPTR (POINTER TO THE ACTIVE PAGE RECORD) */
- /* CEXC_CONTAINERLEN (LENGTH OF THE CONTAINER). */
- /* CEXC_CONTAINERPTR (ARRAY INDEX OF THE CONTAINER). */
- /* CEXC_FORWARD (CONTAINER FORWARD (+1) OR BACKWARD (-1)) */
- /* */
- /* DESCRIPTION: ALL ELEMENTS OF THE ACTIVE CONTAINER HAVE TO MOVE TO THE NEW */
- /* CONTAINER. */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::shrinkcontainer(Signal* signal)
- {
- Uint32 tshrElementptr;
- Uint32 tshrRemLen;
- Uint32 tshrInc;
- Uint32 tshrKeyLen;
- Uint32 tshrTmp;
- Uint32 tshrIndex;
- Uint32 guard21;
- tshrRemLen = cexcContainerlen - ZCON_HEAD_SIZE;
- tshrKeyLen = fragrecptr.p->keyLength;
- if (tshrKeyLen == 0) {
- jam();
- tshrKeyLen = ZACTIVE_LONG_KEY_LEN;
- }//if
- tshrInc = (ZELEM_HEAD_SIZE + tshrKeyLen) + fragrecptr.p->localkeylen;
- if (cexcForward == ZTRUE) {
- jam();
- tshrElementptr = cexcContainerptr + ZCON_HEAD_SIZE;
- } else {
- jam();
- tshrElementptr = cexcContainerptr - 1;
- }//if
- SHR_LOOP:
- idrOperationRecPtr.i = RNIL;
- ptrNull(idrOperationRecPtr);
- /* --------------------------------------------------------------------------------- */
- /* THE CODE BELOW IS ALL USED TO PREPARE FOR THE CALL TO INSERT_ELEMENT AND */
- /* HANDLE THE RESULT FROM INSERT_ELEMENT. INSERT_ELEMENT INSERTS THE ELEMENT */
- /* INTO ANOTHER BUCKET. */
- /* --------------------------------------------------------------------------------- */
- arrGuard(tshrElementptr, 2048);
- tidrElemhead = excPageptr.p->word32[tshrElementptr];
- if (ElementHeader::getLocked(tidrElemhead)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* IF THE ELEMENT IS LOCKED WE MUST UPDATE THE ELEMENT INFO IN THE OPERATION */
- /* RECORD OWNING THE LOCK. WE DO THIS BY READING THE OPERATION RECORD POINTER */
- /* FROM THE ELEMENT HEADER. */
- /* --------------------------------------------------------------------------------- */
- idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
- ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
- if (fragrecptr.p->createLcp == ZTRUE) {
- jam();
- /* --------------------------------------------------------------------------------- */
- // During local checkpoints we must ensure that we restore the element header in
- // unlocked state and with the hash value part there with tuple status zeroed.
- // Otherwise a later insert over the same element will write an UNDO log that will
- // ensure that the now removed element is restored together with its locked element
- // header and without the hash value part.
- /* --------------------------------------------------------------------------------- */
- const Uint32 hv = idrOperationRecPtr.p->hashvaluePart;
- const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
- excPageptr.p->word32[tshrElementptr] = eh;
- }//if
- }//if
- tshrTmp = tshrElementptr + cexcForward;
- guard21 = fragrecptr.p->localkeylen - 1;
- for (tshrIndex = 0; tshrIndex <= guard21; tshrIndex++) {
- arrGuard(tshrIndex, 2);
- arrGuard(tshrTmp, 2048);
- clocalkey[tshrIndex] = excPageptr.p->word32[tshrTmp];
- tshrTmp = tshrTmp + cexcForward;
- }//for
- guard21 = tshrKeyLen - 1;
- for (tshrIndex = 0; tshrIndex <= guard21; tshrIndex++) {
- arrGuard(tshrIndex, 2048);
- arrGuard(tshrTmp, 2048);
- ckeys[tshrIndex] = excPageptr.p->word32[tshrTmp];
- tshrTmp = tshrTmp + cexcForward;
- }//for
- tidrPageindex = fragrecptr.p->expReceiveIndex;
- idrPageptr.i = fragrecptr.p->expReceivePageptr;
- ptrCheckGuard(idrPageptr, cpagesize, page8);
- tidrForward = fragrecptr.p->expReceiveForward;
- tidrKeyLen = tshrKeyLen;
- insertElement(signal);
- /* --------------------------------------------------------------------------------- */
- /* TAKE CARE OF RESULT FROM INSERT_ELEMENT. */
- /* --------------------------------------------------------------------------------- */
- fragrecptr.p->expReceiveIndex = tidrPageindex;
- fragrecptr.p->expReceivePageptr = idrPageptr.i;
- fragrecptr.p->expReceiveForward = tidrForward;
- if (tshrRemLen < tshrInc) {
- jam();
- sendSystemerror(signal);
- }//if
- tshrRemLen = tshrRemLen - tshrInc;
- if (tshrRemLen != 0) {
- jam();
- tshrElementptr = tshrTmp;
- goto SHR_LOOP;
- }//if
- }//Dbacc::shrinkcontainer()
- /* --------------------------------------------------------------------------------- */
- /* NEXTCONTAINERINFO_EXP */
- /* DESCRIPTION:THE CONTAINER HEAD WILL BE CHECKED TO CALCULATE INFORMATION */
- /* ABOUT NEXT CONTAINER IN THE BUCKET. */
- /* INPUT: CEXC_CONTAINERHEAD */
- /* CEXC_CONTAINERPTR */
- /* EXC_PAGEPTR */
- /* OUTPUT: */
- /* CEXC_PAGEINDEX (INDEX FROM WHICH PAGE INDEX CAN BE CALCULATED. */
- /* EXC_PAGEPTR (PAGE REFERENCE OF NEXT CONTAINER) */
- /* CEXC_FORWARD */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::nextcontainerinfoExp(Signal* signal)
- {
- tnciNextSamePage = (cexcContainerhead >> 9) & 0x1; /* CHECK BIT FOR CHECKING WHERE */
- /* THE NEXT CONTAINER IS IN THE SAME PAGE */
- cexcPageindex = cexcContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */
- if (((cexcContainerhead >> 7) & 3) == ZLEFT) {
- jam();
- cexcForward = ZTRUE;
- } else if (((cexcContainerhead >> 7) & 3) == ZRIGHT) {
- jam();
- cexcForward = cminusOne;
- } else {
- jam();
- sendSystemerror(signal);
- cexcForward = 0; /* DUMMY FOR COMPILER */
- }//if
- if (tnciNextSamePage == ZFALSE) {
- jam();
- /* NEXT CONTAINER IS IN AN OVERFLOW PAGE */
- arrGuard(cexcContainerptr + 1, 2048);
- tnciTmp = excPageptr.p->word32[cexcContainerptr + 1];
- nciOverflowrangeptr.i = fragrecptr.p->overflowdir;
- ptrCheckGuard(nciOverflowrangeptr, cdirrangesize, dirRange);
- arrGuard((tnciTmp >> 8), 256);
- nciOverflowDirptr.i = nciOverflowrangeptr.p->dirArray[tnciTmp >> 8];
- ptrCheckGuard(nciOverflowDirptr, cdirarraysize, directoryarray);
- excPageptr.i = nciOverflowDirptr.p->pagep[tnciTmp & 0xff];
- ptrCheckGuard(excPageptr, cpagesize, page8);
- }//if
- }//Dbacc::nextcontainerinfoExp()
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* */
- /* END OF EXPAND/SHRINK MODULE */
- /* */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* */
- /* LOCAL CHECKPOINT MODULE */
- /* */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* LCP_FRAGIDREQ */
- /* SENDER: LQH, LEVEL B */
- /* ENTER LCP_FRAGIDREQ WITH */
- /* TUSERPTR LQH CONNECTION PTR */
- /* TUSERBLOCKREF, LQH BLOCK REFERENCE */
- /* TCHECKPOINTID, THE CHECKPOINT NUMBER TO USE */
- /* (E.G. 1,2 OR 3) */
- /* TABPTR, TABLE ID = TABLE RECORD POINTER */
- /* TFID ROOT FRAGMENT ID */
- /* CACTIVE_UNDO_FILE_VERSION UNDO FILE VERSION 0,1,2 OR 3. */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* LCP_FRAGIDREQ REQUEST FOR LIST OF STOPED OPERATION */
- /* ******************------------------------------+ */
- /* SENDER: LQH, LEVEL B */
- void Dbacc::execLCP_FRAGIDREQ(Signal* signal)
- {
- jamEntry();
- tuserptr = signal->theData[0]; /* LQH CONNECTION PTR */
- tuserblockref = signal->theData[1]; /* LQH BLOCK REFERENCE */
- tcheckpointid = signal->theData[2]; /* THE CHECKPOINT NUMBER TO USE */
- /* (E.G. 1,2 OR 3) */
- tabptr.i = signal->theData[3]; /* TABLE ID = TABLE RECORD POINTER */
- ptrCheck(tabptr, ctablesize, tabrec);
- tfid = signal->theData[4]; /* ROOT FRAGMENT ID */
- cactiveUndoFileVersion = signal->theData[5]; /* UNDO FILE VERSION 0,1,2 OR 3. */
- tresult = 0;
- ndbrequire(getrootfragmentrec(signal, rootfragrecptr, tfid));
- ndbrequire(rootfragrecptr.p->rootState == ACTIVEROOT);
- seizeLcpConnectRec(signal);
- initLcpConnRec(signal);
- lcpConnectptr.p->rootrecptr = rootfragrecptr.i;
- rootfragrecptr.p->lcpPtr = lcpConnectptr.i;
- lcpConnectptr.p->localCheckPid = tcheckpointid;
- lcpConnectptr.p->lcpstate = LCP_ACTIVE;
- rootfragrecptr.p->rootState = LCP_CREATION;
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- /* D6 AT FSOPENREQ =#010003FF. */
- tlfrTmp1 = 0x010003ff; /* FILE TYPE = .DATA ,VERSION OF FILENAME = 1 */
- tlfrTmp2 = 0x301; /* D7 CREATE, WRITE ONLY, TRUNCATE TO ZERO */
- ndbrequire(cfsFirstfreeconnect != RNIL);
- seizeFsConnectRec(signal);
- fsConnectptr.p->fragrecPtr = fragrecptr.i;
- fsConnectptr.p->fsState = WAIT_OPEN_DATA_FILE_FOR_WRITE;
- /* ----------- FILENAME (FILESYSTEM)/D3/DBACC/"T"TABID/"F"FRAGID/"S"VERSIONID.DATA ------------ */
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = tabptr.i; /* TABLE IDENTITY */
- signal->theData[3] = rootfragrecptr.p->fragmentid[0]; /* FRAGMENT IDENTITY */
- signal->theData[4] = lcpConnectptr.p->localCheckPid; /* CHECKPOINT ID */
- signal->theData[5] = tlfrTmp1;
- signal->theData[6] = tlfrTmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- return;
- }//Dbacc::execLCP_FRAGIDREQ()
- /* ******************--------------------------------------------------------------- */
- /* FSOPENCONF OPENFILE CONF */
- /* SENDER: FS, LEVEL B */
- /* ENTER FSOPENCONF WITH */
- /* FS_CONNECTPTR, FS_CONNECTION PTR */
- /* TUSERPOINTER, FILE POINTER */
- /* ******************--------------------------------------------------------------- */
- void Dbacc::lcpFsOpenConfLab(Signal* signal)
- {
- fsConnectptr.p->fsPtr = tuserptr;
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- rootfragrecptr.i = fragrecptr.p->myroot;
- fragrecptr.p->activeDataFilePage = 1; /* ZERO IS KEPT FOR PAGE_ZERO */
- fragrecptr.p->fsConnPtr = fsConnectptr.i;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- if (rootfragrecptr.p->fragmentptr[0] == fragrecptr.i) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- ptrCheck(fragrecptr, cfragmentsize, fragmentrec);
- /* ----------- FILENAME (FILESYSTEM)/D3/DBACC/"T"TABID/"F"FRAGID/"S"VERSIONID.DATA ------------ */
- /* D6 AT FSOPENREQ =#010003FF. */
- tlfrTmp1 = 0x010003ff; /* FILE TYPE = .DATA ,VERSION OF FILENAME = 1 */
- tlfrTmp2 = 0x301; /* D7 CREATE, WRITE ONLY, TRUNCATE TO ZERO */
- ndbrequire(cfsFirstfreeconnect != RNIL);
- seizeFsConnectRec(signal);
- fsConnectptr.p->fragrecPtr = fragrecptr.i;
- fsConnectptr.p->fsState = WAIT_OPEN_DATA_FILE_FOR_WRITE;
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = rootfragrecptr.p->mytabptr; /* TABLE IDENTITY */
- signal->theData[3] = rootfragrecptr.p->fragmentid[1]; /* FRAGMENT IDENTITY */
- signal->theData[4] = lcpConnectptr.p->localCheckPid; /* CHECKPOINT ID */
- signal->theData[5] = tlfrTmp1;
- signal->theData[6] = tlfrTmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- return;
- } else {
- ndbrequire(rootfragrecptr.p->fragmentptr[1] == fragrecptr.i);
- }//if
- /*---- BOTH DATA FILES ARE OPEN------*/
- /* ----IF THE UNDO FILE IS CLOSED , OPEN IT.----- */
- if (cactiveOpenUndoFsPtr != RNIL) {
- jam();
- sendLcpFragidconfLab(signal);
- return;
- }//if
- cactiveUndoFilePage = 0;
- cprevUndoaddress = cminusOne;
- cundoposition = 0;
- clastUndoPageIdWritten = 0;
- ndbrequire(cfsFirstfreeconnect != RNIL);
- seizeFsConnectRec(signal);
- fsConnectptr.p->fsState = WAIT_OPEN_UNDO_LCP;
- fsConnectptr.p->fsPart = 0; /* FILE INDEX, SECOND FILE IN THE DIRECTORY */
- cactiveOpenUndoFsPtr = fsConnectptr.i;
- cactiveRootfrag = rootfragrecptr.i;
- tlfrTmp1 = 1; /* FILE VERSION */
- tlfrTmp1 = (tlfrTmp1 << 8) + ZLOCALLOGFILE; /* .LOCLOG = 2 */
- tlfrTmp1 = (tlfrTmp1 << 8) + 4; /* ROOT DIRECTORY = D4 */
- tlfrTmp1 = (tlfrTmp1 << 8) + fsConnectptr.p->fsPart; /* P2 */
- tlfrTmp2 = 0x302; /* D7 CREATE , READ / WRITE , TRUNCATE TO ZERO */
- /* ---FILE NAME "D4"/"DBACC"/LCP_CONNECTPTR:LOCAL_CHECK_PID/FS_CONNECTPTR:FS_PART".LOCLOG-- */
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = cminusOne; /* #FFFFFFFF */
- signal->theData[3] = cminusOne; /* #FFFFFFFF */
- signal->theData[4] = cactiveUndoFileVersion;
- /* A GROUP OF UNDO FILES WHICH ARE UPDATED */
- signal->theData[5] = tlfrTmp1;
- signal->theData[6] = tlfrTmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- return;
- }//Dbacc::lcpFsOpenConfLab()
- void Dbacc::lcpOpenUndofileConfLab(Signal* signal)
- {
- ptrGuard(fsConnectptr);
- fsConnectptr.p->fsState = WAIT_NOTHING;
- rootfragrecptr.i = cactiveRootfrag;
- ptrCheck(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- fsConnectptr.p->fsPtr = tuserptr;
- sendLcpFragidconfLab(signal);
- return;
- }//Dbacc::lcpOpenUndofileConfLab()
- void Dbacc::sendLcpFragidconfLab(Signal* signal)
- {
- ptrGuard(rootfragrecptr);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- /* ************************ */
- /* LCP_FRAGIDCONF */
- /* ************************ */
- signal->theData[0] = lcpConnectptr.p->lcpUserptr;
- signal->theData[1] = lcpConnectptr.i;
- signal->theData[2] = 2;
- /* NO OF LOCAL FRAGMENTS */
- signal->theData[3] = rootfragrecptr.p->fragmentid[0];
- signal->theData[4] = rootfragrecptr.p->fragmentid[1];
- signal->theData[5] = RNIL;
- signal->theData[6] = RNIL;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_LCP_FRAGIDCONF, signal, 7, JBB);
- return;
- }//Dbacc::sendLcpFragidconfLab()
- /* ******************--------------------------------------------------------------- */
- /* LCP_HOLDOPERATION REQUEST FOR LIST OF STOPED OPERATION */
- /* SENDER: LQH, LEVEL B */
- /* ENTER LCP_HOLDOPREQ WITH */
- /* LCP_CONNECTPTR CONNECTION POINTER */
- /* TFID, LOCAL FRAGMENT ID */
- /* THOLD_PREV_SENT_OP NR OF SENT OPERATIONS AT */
- /* PREVIOUS SIGNALS */
- /* TLQH_POINTER LQH USER POINTER */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* LCP_HOLDOPERATION REQUEST FOR LIST OF STOPED OPERATION */
- /* ******************------------------------------+ */
- /* SENDER: LQH, LEVEL B */
- void Dbacc::execLCP_HOLDOPREQ(Signal* signal)
- {
- Uint32 tholdPrevSentOp;
- jamEntry();
- lcpConnectptr.i = signal->theData[0]; /* CONNECTION POINTER */
- tfid = signal->theData[1]; /* LOCAL FRAGMENT ID */
- tholdPrevSentOp = signal->theData[2]; /* NR OF SENT OPERATIONS AT */
- /* PREVIOUS SIGNALS */
- tlqhPointer = signal->theData[3]; /* LQH USER POINTER */
- tresult = 0;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
- rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->fragmentid[0] == tfid) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- } else {
- ndbrequire(rootfragrecptr.p->fragmentid[1] == tfid);
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- }//if
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- fragrecptr.p->lcpLqhPtr = tlqhPointer;
- if (tholdPrevSentOp != 0) {
- ndbrequire(fragrecptr.p->fragState == SEND_QUE_OP);
- } else if (tholdPrevSentOp == 0) {
- jam();
- fragrecptr.p->fragState = SEND_QUE_OP;
- fragrecptr.p->stopQueOp = ZTRUE;
- fragrecptr.p->sentWaitInQueOp = fragrecptr.p->firstWaitInQueOp;
- }//if
- tholdSentOp = 0; /* NR OF OPERATION WHICH ARE SENT THIS TIME */
- operationRecPtr.i = fragrecptr.p->sentWaitInQueOp;
- /* --------------------------------------------- */
- /* GO THROUGH ALL OPERATION IN THE WAIT */
- /* LIST AND SEND THE LQH CONNECTION PTR OF THE */
- /* OPERATIONS TO THE LQH BLOCK. MAX 23 0PERATION */
- /* PER SIGNAL */
- /* --------------------------------------------- */
- while (operationRecPtr.i != RNIL) {
- jam();
- ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- ckeys[tholdSentOp] = operationRecPtr.p->userptr;
- operationRecPtr.i = operationRecPtr.p->nextQueOp;
- tholdSentOp++;
- if ((tholdSentOp >= 23) &&
- (operationRecPtr.i != RNIL)) {
- jam();
- /* ----------------------------------------------- */
- /* THERE IS MORE THAN 23 WAIT OPERATION. WE */
- /* HAVE TO SEND THESE 23 AND WAITE FOR NEXT SIGNAL */
- /* ----------------------------------------------- */
- tholdMore = ZTRUE; /* SECOUND DATA AT THE CONF SIGNAL , = MORE */
- fragrecptr.p->sentWaitInQueOp = operationRecPtr.i;
- sendholdconfsignalLab(signal);
- return;
- }//if
- }//while
- /* ----------------------------------------------- */
- /* OPERATION_REC_PTR = RNIL */
- /* THERE ARE NO MORE WAITING OPERATIONS. THE STATE OF */
- /* THE FRAGMENT RECORD IS CHANGED AND THE RETURN */
- /* SIGNAL IS SENT */
- /* ----------------------------------------------- */
- fragrecptr.p->sentWaitInQueOp = RNIL;
- tholdMore = ZFALSE; /* SECOND DATA AT THE CONF SIGNAL , = NOT MORE */
- fragrecptr.p->fragState = WAIT_ACC_LCPREQ;
- sendholdconfsignalLab(signal);
- return;
- }//Dbacc::execLCP_HOLDOPREQ()
- void Dbacc::sendholdconfsignalLab(Signal* signal)
- {
- tholdMore = (tholdMore << 16) + tholdSentOp;
- /* SECOND SIGNAL DATA, LENGTH + MORE */
- /* ************************ */
- /* LCP_HOLDOPCONF */
- /* ************************ */
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- signal->theData[1] = tholdMore;
- signal->theData[2] = ckeys[0];
- signal->theData[3] = ckeys[1];
- signal->theData[4] = ckeys[2];
- signal->theData[5] = ckeys[3];
- signal->theData[6] = ckeys[4];
- signal->theData[7] = ckeys[5];
- signal->theData[8] = ckeys[6];
- signal->theData[9] = ckeys[7];
- signal->theData[10] = ckeys[8];
- signal->theData[11] = ckeys[9];
- signal->theData[12] = ckeys[10];
- signal->theData[13] = ckeys[11];
- signal->theData[14] = ckeys[12];
- signal->theData[15] = ckeys[13];
- signal->theData[16] = ckeys[14];
- signal->theData[17] = ckeys[15];
- signal->theData[18] = ckeys[16];
- signal->theData[19] = ckeys[17];
- signal->theData[20] = ckeys[18];
- signal->theData[21] = ckeys[19];
- signal->theData[22] = ckeys[20];
- signal->theData[23] = ckeys[21];
- signal->theData[24] = ckeys[22];
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_LCP_HOLDOPCONF, signal, 25, JBA);
- return;
- }//Dbacc::sendholdconfsignalLab()
- /**
- * execACC_LCPREQ
- * Perform local checkpoint of a fragment
- *
- * SENDER: LQH, LEVEL B
- * ENTER ACC_LCPREQ WITH
- * LCP_CONNECTPTR, OPERATION RECORD PTR
- * TLCP_LQH_CHECK_V, LQH'S LOCAL FRAG CHECK VALUE
- * TLCP_LOCAL_FRAG_ID, LOCAL FRAG ID
- *
- */
- void Dbacc::execACC_LCPREQ(Signal* signal)
- {
- Uint32 tlcpLocalFragId;
- Uint32 tlcpLqhCheckV;
- jamEntry();
- lcpConnectptr.i = signal->theData[0]; // CONNECTION PTR
- tlcpLqhCheckV = signal->theData[1]; // LQH'S LOCAL FRAG CHECK VALUE
- tlcpLocalFragId = signal->theData[2]; // LOCAL FRAG ID
- tresult = 0;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
- rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->fragmentid[0] == tlcpLocalFragId) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- } else {
- ndbrequire(rootfragrecptr.p->fragmentid[1] == tlcpLocalFragId);
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- }//if
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- ndbrequire(fragrecptr.p->fragState == WAIT_ACC_LCPREQ);
- fragrecptr.p->lcpLqhPtr = tlcpLqhCheckV;
- Page8Ptr zeroPagePtr;
- seizeLcpPage(zeroPagePtr);
- fragrecptr.p->zeroPagePtr = zeroPagePtr.i;
- fragrecptr.p->prevUndoposition = cminusOne;
- initRootFragPageZero(rootfragrecptr, zeroPagePtr);
- initFragPageZero(fragrecptr, zeroPagePtr);
- /*-----------------------------------------------------------------*/
- /* SEIZE ZERO PAGE FIRST AND THEN SEIZE DATA PAGES IN */
- /* BACKWARDS ORDER. THIS IS TO ENSURE THAT WE GET THE PAGES */
- /* IN ORDER. ON WINDOWS NT THIS WILL BE A BENEFIT SINCE WE */
- /* CAN THEN DO 1 WRITE_FILE INSTEAD OF 8. */
- /* WHEN WE RELEASE THE PAGES WE RELEASE THEM IN THE OPPOSITE */
- /* ORDER. */
- /*-----------------------------------------------------------------*/
- for (Uint32 taspTmp = ZWRITEPAGESIZE - 1; (Uint32)~taspTmp; taspTmp--) {
- Page8Ptr dataPagePtr;
- jam();
- ndbrequire(fragrecptr.p->datapages[taspTmp] == RNIL);
- seizeLcpPage(dataPagePtr);
- fragrecptr.p->datapages[taspTmp] = dataPagePtr.i;
- }//for
- fragrecptr.p->lcpMaxDirIndex = fragrecptr.p->dirsize;
- fragrecptr.p->lcpMaxOverDirIndex = fragrecptr.p->lastOverIndex;
- fragrecptr.p->createLcp = ZTRUE;
- operationRecPtr.i = fragrecptr.p->lockOwnersList;
- lcp_write_op_to_undolog(signal);
- }
- void
- Dbacc::lcp_write_op_to_undolog(Signal* signal)
- {
- bool delay_continueb= false;
- Uint32 i, j;
- for (i= 0; i < 16; i++) {
- jam();
- if (remainingUndoPages() <= ZMIN_UNDO_PAGES_AT_COMMIT) {
- jam();
- delay_continueb= true;
- break;
- }
- for (j= 0; j < 32; j++) {
- if (operationRecPtr.i == RNIL) {
- jam();
- break;
- }
- jam();
- ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- if ((operationRecPtr.p->operation == ZINSERT) ||
- (operationRecPtr.p->elementIsDisappeared == ZTRUE)){
- /*******************************************************************
- * Only log inserts and elements that are marked as dissapeared.
- * All other operations update the element header and that is handled
- * when pages are written to disk
- ********************************************************************/
- undopageptr.i = (cundoposition>>ZUNDOPAGEINDEXBITS) & (cundopagesize-1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
- writeUndoOpInfo(signal);/* THE INFORMATION ABOUT ELEMENT HEADER, STORED*/
- /* IN OP REC, IS WRITTEN AT UNDO PAGES */
- cundoElemIndex = 0;/* DEFAULT VALUE USED BY WRITE_UNDO_HEADER SUBROTINE */
- writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
- checkUndoPages(signal); /* SEND UNDO PAGE TO DISK WHEN A GROUP OF */
- /* UNDO PAGES,CURRENTLY 8, IS FILLED */
- }
- operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp;
- }
- if (operationRecPtr.i == RNIL) {
- jam();
- break;
- }
- }
- if (operationRecPtr.i != RNIL) {
- jam();
- signal->theData[0]= ZLCP_OP_WRITE_RT_BREAK;
- signal->theData[1]= operationRecPtr.i;
- signal->theData[2]= fragrecptr.i;
- signal->theData[3]= lcpConnectptr.i;
- if (delay_continueb) {
- jam();
- sendSignalWithDelay(cownBlockref, GSN_CONTINUEB, signal, 10, 4);
- } else {
- jam();
- sendSignal(cownBlockref, GSN_CONTINUEB, signal, 4, JBB);
- }
- return;
- }
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPSTARTED,
- signal, 1, JBA);
- fragrecptr.p->activeDataPage = 0;
- fragrecptr.p->lcpDirIndex = 0;
- fragrecptr.p->fragState = LCP_SEND_PAGES;
- signal->theData[0] = lcpConnectptr.i;
- signal->theData[1] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
- }
- /* ******************--------------------------------------------------------------- */
- /* ACC_SAVE_PAGES A GROUP OF PAGES IS ALLOCATED. THE PAGES AND OVERFLOW */
- /* PAGES OF THE FRAGMENT ARE COPIED IN THEM AND IS SEND TO */
- /* THE DATA FILE OF THE CHECK POINT. */
- /* SENDER: ACC, LEVEL B */
- /* ENTER ACC_SAVE_PAGES WITH */
- /* LCP_CONNECTPTR, CONNECTION RECORD PTR */
- /* FRAGRECPTR FRAGMENT RECORD PTR */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* ACC_SAVE_PAGES REQUEST TO SEND THE PAGE TO DISK */
- /* ******************------------------------------+ UNDO PAGES */
- /* SENDER: ACC, LEVEL B */
- void Dbacc::execACC_SAVE_PAGES(Signal* signal)
- {
- jamEntry();
- lcpConnectptr.i = signal->theData[0];
- /* CONNECTION RECORD PTR */
- fragrecptr.i = signal->theData[1];
- /* FRAGMENT RECORD PTR */
- tresult = 0;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- if (lcpConnectptr.p->lcpstate != LCP_ACTIVE) {
- jam();
- sendSystemerror(signal);
- return;
- }//if
- if (ERROR_INSERTED(3000)) {
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->mytabptr == c_errorInsert3000_TableId){
- ndbout << "Delay writing of datapages" << endl;
- // Delay writing of pages
- jam();
- sendSignalWithDelay(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 1000, 2);
- return;
- }
- }
- if (clblPageCounter == 0) {
- jam();
- signal->theData[0] = lcpConnectptr.i;
- signal->theData[1] = fragrecptr.i;
- sendSignalWithDelay(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 100, 2);
- return;
- } else {
- jam();
- clblPageCounter = clblPageCounter - 1;
- }//if
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- if (fragrecptr.p->fragState == LCP_SEND_PAGES) {
- jam();
- savepagesLab(signal);
- return;
- } else {
- if (fragrecptr.p->fragState == LCP_SEND_OVER_PAGES) {
- jam();
- saveOverPagesLab(signal);
- return;
- } else {
- ndbrequire(fragrecptr.p->fragState == LCP_SEND_ZERO_PAGE);
- jam();
- saveZeroPageLab(signal);
- return;
- }//if
- }//if
- }//Dbacc::execACC_SAVE_PAGES()
- void Dbacc::savepagesLab(Signal* signal)
- {
- DirRangePtr spDirRangePtr;
- DirectoryarrayPtr spDirptr;
- Page8Ptr aspPageptr;
- Page8Ptr aspCopyPageptr;
- Uint32 taspDirindex;
- Uint32 taspDirIndex;
- Uint32 taspIndex;
- if ((fragrecptr.p->lcpDirIndex >= fragrecptr.p->dirsize) ||
- (fragrecptr.p->lcpDirIndex >= fragrecptr.p->lcpMaxDirIndex)) {
- jam();
- endsavepageLab(signal);
- return;
- }//if
- /* SOME EXPAND PROCESSES HAVE BEEN PERFORMED. */
- /* THE ADDED PAGE ARE NOT SENT TO DISK */
- arrGuard(fragrecptr.p->activeDataPage, 8);
- aspCopyPageptr.i = fragrecptr.p->datapages[fragrecptr.p->activeDataPage];
- ptrCheckGuard(aspCopyPageptr, cpagesize, page8);
- taspDirindex = fragrecptr.p->lcpDirIndex; /* DIRECTORY OF ACTIVE PAGE */
- spDirRangePtr.i = fragrecptr.p->directory;
- taspDirIndex = taspDirindex >> 8;
- taspIndex = taspDirindex & 0xff;
- ptrCheckGuard(spDirRangePtr, cdirrangesize, dirRange);
- arrGuard(taspDirIndex, 256);
- spDirptr.i = spDirRangePtr.p->dirArray[taspDirIndex];
- ptrCheckGuard(spDirptr, cdirarraysize, directoryarray);
- aspPageptr.i = spDirptr.p->pagep[taspIndex];
- ptrCheckGuard(aspPageptr, cpagesize, page8);
- ndbrequire(aspPageptr.p->word32[ZPOS_PAGE_ID] == fragrecptr.p->lcpDirIndex);
- lcnPageptr = aspPageptr;
- lcnCopyPageptr = aspCopyPageptr;
- lcpCopyPage(signal);
- fragrecptr.p->lcpDirIndex++;
- fragrecptr.p->activeDataPage++;
- if (fragrecptr.p->activeDataPage < ZWRITEPAGESIZE) {
- jam();
- signal->theData[0] = lcpConnectptr.i;
- signal->theData[1] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
- return;
- }//if
- senddatapagesLab(signal);
- return;
- }//Dbacc::savepagesLab()
- /* FRAGRECPTR:ACTIVE_DATA_PAGE = ZWRITEPAGESIZE */
- /* SEND A GROUP OF PAGES TO DISK */
- void Dbacc::senddatapagesLab(Signal* signal)
- {
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- seizeFsOpRec(signal);
- initFsOpRec(signal);
- fsOpptr.p->fsOpstate = WAIT_WRITE_DATA;
- ndbrequire(fragrecptr.p->activeDataPage <= 8);
- for (Uint32 i = 0; i < fragrecptr.p->activeDataPage; i++) {
- signal->theData[i + 6] = fragrecptr.p->datapages[i];