DbaccMain.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:575k
- }//for
- signal->theData[fragrecptr.p->activeDataPage + 6] = fragrecptr.p->activeDataFilePage;
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsOpptr.i;
- signal->theData[3] = 0x2;
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = fragrecptr.p->activeDataPage;
- sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 15, JBA);
- return;
- }//Dbacc::senddatapagesLab()
- void Dbacc::endsavepageLab(Signal* signal)
- {
- Page8Ptr espPageidptr;
- espPageidptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(espPageidptr, cpagesize, page8);
- dbgWord32(espPageidptr, ZPAGEZERO_NO_PAGES, fragrecptr.p->lcpDirIndex);
- espPageidptr.p->word32[ZPAGEZERO_NO_PAGES] = fragrecptr.p->lcpDirIndex;
- fragrecptr.p->fragState = LCP_SEND_OVER_PAGES;
- fragrecptr.p->noOfStoredOverPages = 0;
- fragrecptr.p->lcpDirIndex = 0;
- saveOverPagesLab(signal);
- return;
- }//Dbacc::endsavepageLab()
- /* ******************--------------------------------------------------------------- */
- /* ACC_SAVE_OVER_PAGES CONTINUE SAVING THE LEFT OVERPAGES. */
- /* ******************--------------------------------------------------------------- */
- void Dbacc::saveOverPagesLab(Signal* signal)
- {
- DirRangePtr sopDirRangePtr;
- DirectoryarrayPtr sopOverflowDirptr;
- Page8Ptr sopPageptr;
- Page8Ptr sopCopyPageptr;
- Uint32 tsopDirindex;
- Uint32 tsopDirInd;
- Uint32 tsopIndex;
- if ((fragrecptr.p->lcpDirIndex >= fragrecptr.p->lastOverIndex) ||
- (fragrecptr.p->lcpDirIndex >= fragrecptr.p->lcpMaxOverDirIndex)) {
- jam();
- endsaveoverpageLab(signal);
- return;
- }//if
- arrGuard(fragrecptr.p->activeDataPage, 8);
- sopCopyPageptr.i = fragrecptr.p->datapages[fragrecptr.p->activeDataPage];
- ptrCheckGuard(sopCopyPageptr, cpagesize, page8);
- tsopDirindex = fragrecptr.p->lcpDirIndex;
- sopDirRangePtr.i = fragrecptr.p->overflowdir;
- tsopDirInd = tsopDirindex >> 8;
- tsopIndex = tsopDirindex & 0xff;
- ptrCheckGuard(sopDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tsopDirInd, 256);
- sopOverflowDirptr.i = sopDirRangePtr.p->dirArray[tsopDirInd];
- ptrCheckGuard(sopOverflowDirptr, cdirarraysize, directoryarray);
- sopPageptr.i = sopOverflowDirptr.p->pagep[tsopIndex];
- fragrecptr.p->lcpDirIndex++;
- if (sopPageptr.i != RNIL) {
- jam();
- ptrCheckGuard(sopPageptr, cpagesize, page8);
- ndbrequire(sopPageptr.p->word32[ZPOS_PAGE_ID] == tsopDirindex);
- ndbrequire(((sopPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) != ZNORMAL_PAGE_TYPE);
- lcnPageptr = sopPageptr;
- lcnCopyPageptr = sopCopyPageptr;
- lcpCopyPage(signal);
- fragrecptr.p->noOfStoredOverPages++;
- fragrecptr.p->activeDataPage++;
- if ((sopPageptr.p->word32[ZPOS_ALLOC_CONTAINERS] == 0)) {
- //ndbrequire(((sopPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) == ZOVERFLOW_PAGE_TYPE);
- if (((sopPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) ==
- ZOVERFLOW_PAGE_TYPE) {
- /*--------------------------------------------------------------------------------*/
- /* THE PAGE IS EMPTY AND WAITING TO BE RELEASED. IT COULD NOT BE RELEASED */
- /* EARLIER SINCE IT WAS PART OF A LOCAL CHECKPOINT. */
- /*--------------------------------------------------------------------------------*/
- jam();
- ropPageptr = sopPageptr;
- releaseOverpage(signal);
- } else if (((sopPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) ==
- ZLONG_PAGE_TYPE) {
- //----------------------------------------------------------------------
- // The long key page is empty, release it.
- //----------------------------------------------------------------------
- jam();
- rlopPageptr = sopPageptr;
- releaseLongPage(signal);
- } else {
- jam();
- sendSystemerror(signal);
- }
- }//if
- }
- if (fragrecptr.p->activeDataPage == ZWRITEPAGESIZE) {
- jam();
- senddatapagesLab(signal);
- return;
- }//if
- signal->theData[0] = lcpConnectptr.i;
- signal->theData[1] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
- return;
- }//Dbacc::saveOverPagesLab()
- void Dbacc::endsaveoverpageLab(Signal* signal)
- {
- Page8Ptr esoPageidptr;
- esoPageidptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(esoPageidptr, cpagesize, page8);
- dbgWord32(esoPageidptr, ZPAGEZERO_NO_OVER_PAGE, fragrecptr.p->noOfStoredOverPages);
- esoPageidptr.p->word32[ZPAGEZERO_NO_OVER_PAGE] = fragrecptr.p->noOfStoredOverPages;
- fragrecptr.p->fragState = LCP_SEND_ZERO_PAGE;
- if (fragrecptr.p->activeDataPage != 0) {
- jam();
- senddatapagesLab(signal); /* SEND LEFT PAGES TO DISK */
- return;
- }//if
- saveZeroPageLab(signal);
- return;
- }//Dbacc::endsaveoverpageLab()
- /* ******************--------------------------------------------------------------- */
/* ACC_SAVE_ZERO_PAGE PAGE ZERO IS SENT TO DISK. IT IS THE LAST STAGE OF THE */
/* CREATION OF AN LCP. ACC_LCPCONF IS RETURNED. */
- /* ******************--------------------------------------------------------------- */
- void Dbacc::saveZeroPageLab(Signal* signal)
- {
- Page8Ptr szpPageidptr;
- Uint32 Tchs;
- Uint32 Ti;
- fragrecptr.p->createLcp = ZFALSE;
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- szpPageidptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(szpPageidptr, cpagesize, page8);
- dbgWord32(szpPageidptr, ZPAGEZERO_PREV_UNDOP, fragrecptr.p->prevUndoposition);
- szpPageidptr.p->word32[ZPAGEZERO_PREV_UNDOP] = fragrecptr.p->prevUndoposition;
- dbgWord32(szpPageidptr, ZPAGEZERO_NEXT_UNDO_FILE, cactiveUndoFileVersion);
- szpPageidptr.p->word32[ZPAGEZERO_NEXT_UNDO_FILE] = cactiveUndoFileVersion;
- fragrecptr.p->fragState = WAIT_ZERO_PAGE_STORED;
- /* --------------------------------------------------------------------------------- */
- // Calculate the checksum and store it for the zero page of the fragment.
- /* --------------------------------------------------------------------------------- */
- szpPageidptr.p->word32[ZPOS_CHECKSUM] = 0;
- Tchs = 0;
- for (Ti = 0; Ti < 2048; Ti++) {
- Tchs = Tchs ^ szpPageidptr.p->word32[Ti];
- }//for
- szpPageidptr.p->word32[ZPOS_CHECKSUM] = Tchs;
- dbgWord32(szpPageidptr, ZPOS_CHECKSUM, Tchs);
- seizeFsOpRec(signal);
- initFsOpRec(signal);
- fsOpptr.p->fsOpstate = WAIT_WRITE_DATA;
- if (clblPageCounter > 0) {
- jam();
- clblPageCounter = clblPageCounter - 1;
- } else {
- jam();
- clblPageOver = clblPageOver + 1;
- }//if
- /* ************************ */
- /* FSWRITEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsOpptr.i;
- signal->theData[3] = 0x10;
- /* FLAG = LIST MEM PAGES, LIST FILE PAGES */
- /* SYNC FILE AFTER WRITING */
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = 1;
- /* NO OF PAGES */
- signal->theData[6] = fragrecptr.p->zeroPagePtr;
- /* ZERO PAGE */
- signal->theData[7] = 0;
- sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA);
- /* ZERO PAGE AT DATA FILE */
- return;
- }//Dbacc::saveZeroPageLab()
- /* ******************--------------------------------------------------------------- */
- /* FSWRITECONF OPENFILE CONF */
- /* ENTER FSWRITECONF WITH SENDER: FS, LEVEL B */
- /* FS_OPPTR FS_CONNECTION PTR */
- /* ******************--------------------------------------------------------------- */
- void Dbacc::lcpCloseDataFileLab(Signal* signal)
- {
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- fsConnectptr.p->fsState = LCP_CLOSE_DATA;
- /* ************************ */
- /* FSCLOSEREQ */
- /* ************************ */
- /* CLOSE DATA FILE */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = ZFALSE;
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
- /* FLAG = 0, DO NOT DELETE FILE */
- return;
- }//Dbacc::lcpCloseDataFileLab()
- void Dbacc::checkSyncUndoPagesLab(Signal* signal)
- {
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- releaseFsConnRec(signal);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- switch (lcpConnectptr.p->syncUndopageState) {
- case WAIT_NOTHING:
- jam();
- lcpConnectptr.p->syncUndopageState = WAIT_ONE_CONF;
- break;
- case WAIT_ONE_CONF:
- jam();
- lcpConnectptr.p->syncUndopageState = WAIT_TWO_CONF;
- break;
- default:
- jam();
- sendSystemerror(signal);
- return;
- break;
- }//switch
- /* ACTIVE UNDO PAGE ID */
- Uint32 tundoPageId = cundoposition >> ZUNDOPAGEINDEXBITS;
- tmp1 = tundoPageId - (tundoPageId & (ZWRITE_UNDOPAGESIZE - 1));
- /* START PAGE OF THE LAST UNDO PAGES GROUP */
- tmp2 = (tundoPageId - tmp1) + 1; /* NO OF LEFT UNDO PAGES */
- tmp1 = tmp1 & (cundopagesize - 1); /* 1 MBYTE PAGE WINDOW IN MEMORY */
- fsConnectptr.i = cactiveOpenUndoFsPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- seizeFsOpRec(signal);
- initFsOpRec(signal);
- fsOpptr.p->fsOpstate = WAIT_WRITE_UNDO;
- fsOpptr.p->fsOpMemPage = tundoPageId; /* RECORD MEMORY PAGE WRITTEN */
- if (clblPageCounter >= (4 * tmp2)) {
- jam();
- clblPageCounter = clblPageCounter - (4 * tmp2);
- } else {
- jam();
- clblPageOver = clblPageOver + ((4 * tmp2) - clblPageCounter);
- clblPageCounter = 0;
- }//if
- /* ************************ */
- /* FSWRITEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsOpptr.i;
- /* FLAG = START MEM PAGES, START FILE PAGES */
- /* SYNC FILE AFTER WRITING */
- signal->theData[3] = 0x11;
- signal->theData[4] = ZUNDOPAGE_BASE_ADD;
- /* NO OF UNDO PAGES */
- signal->theData[5] = tmp2;
- /* FIRST MEMORY PAGE */
- signal->theData[6] = tmp1;
- /* ACTIVE PAGE AT UNDO FILE */
- signal->theData[7] = cactiveUndoFilePage;
- sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA);
- return;
- }//Dbacc::checkSyncUndoPagesLab()
- void Dbacc::checkSendLcpConfLab(Signal* signal)
- {
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
- switch (lcpConnectptr.p->syncUndopageState) {
- case WAIT_ONE_CONF:
- jam();
- lcpConnectptr.p->syncUndopageState = WAIT_NOTHING;
- break;
- case WAIT_TWO_CONF:
- jam();
- lcpConnectptr.p->syncUndopageState = WAIT_ONE_CONF;
- break;
- default:
- ndbrequire(false);
- break;
- }//switch
- lcpConnectptr.p->noOfLcpConf++;
- ndbrequire(lcpConnectptr.p->noOfLcpConf <= 4);
- fragrecptr.p->fragState = ACTIVEFRAG;
- rlpPageptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(rlpPageptr, cpagesize, page8);
- releaseLcpPage(signal);
- fragrecptr.p->zeroPagePtr = RNIL;
- for (Uint32 i = 0; i < ZWRITEPAGESIZE; i++) {
- jam();
- if (fragrecptr.p->datapages[i] != RNIL) {
- jam();
- rlpPageptr.i = fragrecptr.p->datapages[i];
- ptrCheckGuard(rlpPageptr, cpagesize, page8);
- releaseLcpPage(signal);
- fragrecptr.p->datapages[i] = RNIL;
- }//if
- }//for
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPCONF, signal, 1, JBB);
- if (lcpConnectptr.p->noOfLcpConf == 4) {
- jam();
- releaseLcpConnectRec(signal);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- rootfragrecptr.p->rootState = ACTIVEROOT;
- }//if
- }//Dbacc::checkSendLcpConfLab()
- /* ******************--------------------------------------------------------------- */
- /* ACC_CONTOPREQ */
- /* SENDER: LQH, LEVEL B */
- /* ENTER ACC_CONTOPREQ WITH */
- /* LCP_CONNECTPTR */
- /* TMP1 LOCAL FRAG ID */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* ACC_CONTOPREQ COMMIT TRANSACTION */
- /* ******************------------------------------+ */
- /* SENDER: LQH, LEVEL B */
- void Dbacc::execACC_CONTOPREQ(Signal* signal)
- {
- Uint32 tcorLocalFrag;
- jamEntry();
- lcpConnectptr.i = signal->theData[0];
- /* CONNECTION PTR */
- tcorLocalFrag = signal->theData[1];
- /* LOCAL FRAG ID */
- tresult = 0;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- if(ERROR_INSERTED(3002) && lcpConnectptr.p->noOfLcpConf < 2)
- {
- sendSignalWithDelay(cownBlockref, GSN_ACC_CONTOPREQ, signal, 300,
- signal->getLength());
- return;
- }
-
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
- rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->fragmentid[0] == tcorLocalFrag) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- } else {
- ndbrequire(rootfragrecptr.p->fragmentid[1] == tcorLocalFrag);
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- }//if
- operationRecPtr.i = fragrecptr.p->firstWaitInQueOp;
- fragrecptr.p->sentWaitInQueOp = RNIL;
- fragrecptr.p->stopQueOp = ZFALSE;
- while (operationRecPtr.i != RNIL) {
- jam();
- ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
- if (operationRecPtr.p->opState == WAIT_EXE_OP) {
- jam();
- //------------------------------------------------------------
- // Indicate that we are now a normal waiter in the queue. We
- // will remove the operation from the queue as part of starting
- // operation again.
- //------------------------------------------------------------
- operationRecPtr.p->opState = WAIT_IN_QUEUE;
- executeNextOperation(signal);
- }//if
- operationRecPtr.i = operationRecPtr.p->nextQueOp;
- }//while
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_CONTOPCONF, signal, 1, JBA);
- lcpConnectptr.p->noOfLcpConf++;
- if (lcpConnectptr.p->noOfLcpConf == 4) {
- jam();
- releaseLcpConnectRec(signal);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- rootfragrecptr.p->rootState = ACTIVEROOT;
- }//if
- return; /* ALL QUEUED OPERATION ARE RESTARTED IF NEEDED. */
- }//Dbacc::execACC_CONTOPREQ()
- /* ******************--------------------------------------------------------------- */
- /* END_LCPREQ END OF LOCAL CHECK POINT */
- /* ENTER END_LCPREQ WITH SENDER: LQH, LEVEL B */
- /* CLQH_PTR, LQH PTR */
- /* CLQH_BLOCK_REF LQH BLOCK REF */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* END_LCPREQ PERFORM A LOCAL CHECK POINT */
- /* ******************------------------------------+ */
- /* SENDER: LQH, LEVEL B */
- void Dbacc::execEND_LCPREQ(Signal* signal)
- {
- jamEntry();
- clqhPtr = signal->theData[0];
- /* LQH PTR */
- clqhBlockRef = signal->theData[1];
- /* LQH BLOCK REF */
- tresult = 0;
- fsConnectptr.i = cactiveOpenUndoFsPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- fsConnectptr.p->fsState = WAIT_CLOSE_UNDO; /* CLOSE FILE AFTER WRITTING */
- /* ************************ */
- /* FSCLOSEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = ZFALSE;
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
- /* FLAG = 0, DO NOT DELETE FILE */
- cactiveUndoFileVersion = RNIL;
- cactiveOpenUndoFsPtr = RNIL;
- /* ************************ */
- /* END_LCPCONF */
- /* ************************ */
- signal->theData[0] = clqhPtr;
- sendSignal(clqhBlockRef, GSN_END_LCPCONF, signal, 1, JBB);
- return;
- }//Dbacc::execEND_LCPREQ()
- /*-----------------------------------------------------------------*/
- /* WHEN WE COPY THE PAGE WE ALSO WRITE THE ELEMENT HEADER AS */
- /* UNLOCKED IF THEY ARE CURRENTLY LOCKED. */
- /*-----------------------------------------------------------------*/
- void Dbacc::lcpCopyPage(Signal* signal)
- {
- Uint32 tlcnNextContainer;
- Uint32 tlcnTmp;
- Uint32 tlcnConIndex;
- Uint32 tlcnIndex;
- Uint32 Tmp1;
- Uint32 Tmp2;
- Uint32 Tmp3;
- Uint32 Tmp4;
- Uint32 Ti;
- Uint32 Tchs;
- Uint32 Tlimit;
- Tchs = 0;
- lupPageptr.p = lcnCopyPageptr.p;
- lcnPageptr.p->word32[ZPOS_CHECKSUM] = Tchs;
- for (Ti = 0; Ti < 32 ; Ti++) {
- Tlimit = 16 + (Ti << 6);
- for (tlcnTmp = (Ti << 6); tlcnTmp < Tlimit; tlcnTmp ++) {
- Tmp1 = lcnPageptr.p->word32[tlcnTmp];
- Tmp2 = lcnPageptr.p->word32[tlcnTmp + 16];
- Tmp3 = lcnPageptr.p->word32[tlcnTmp + 32];
- Tmp4 = lcnPageptr.p->word32[tlcnTmp + 48];
- lcnCopyPageptr.p->word32[tlcnTmp] = Tmp1;
- lcnCopyPageptr.p->word32[tlcnTmp + 16] = Tmp2;
- lcnCopyPageptr.p->word32[tlcnTmp + 32] = Tmp3;
- lcnCopyPageptr.p->word32[tlcnTmp + 48] = Tmp4;
- Tchs = Tchs ^ Tmp1;
- Tchs = Tchs ^ Tmp2;
- Tchs = Tchs ^ Tmp3;
- Tchs = Tchs ^ Tmp4;
- }//for
- }//for
- tlcnChecksum = Tchs;
- if (((lcnCopyPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) != ZLONG_PAGE_TYPE) {
- jam();
- if (((lcnCopyPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) == ZNORMAL_PAGE_TYPE) {
- jam();
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL 64 BUFFERS ADDRESSED BY ALGORITHM IN */
- /* FIRST PAGE. IF THEY ARE EMPTY THEY STILL HAVE A CONTAINER */
- /* HEADER OF 2 WORDS. */
- /*-----------------------------------------------------------------*/
- tlcnConIndex = ZHEAD_SIZE;
- tlupForward = 1;
- for (tlcnIndex = 0; tlcnIndex <= ZNO_CONTAINERS - 1; tlcnIndex++) {
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex + ZCON_HEAD_SIZE;
- lcpUpdatePage(signal);
- tlcnConIndex = tlcnConIndex + ZBUF_SIZE;
- }//for
- }//if
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL USED BUFFERS ON THE LEFT SIDE. */
- /*-----------------------------------------------------------------*/
- tlcnNextContainer = (lcnCopyPageptr.p->word32[ZPOS_EMPTY_LIST] >> 23) & 0x7f;
- while (tlcnNextContainer < ZEMPTYLIST) {
- tlcnConIndex = (tlcnNextContainer << ZSHIFT_PLUS) - (tlcnNextContainer << ZSHIFT_MINUS);
- tlcnConIndex = tlcnConIndex + ZHEAD_SIZE;
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex + ZCON_HEAD_SIZE;
- tlupForward = 1;
- lcpUpdatePage(signal);
- tlcnNextContainer = (lcnCopyPageptr.p->word32[tlcnConIndex] >> 11) & 0x7f;
- }//while
- if (tlcnNextContainer == ZEMPTYLIST) {
- jam();
- /*empty*/;
- } else {
- jam();
- sendSystemerror(signal);
- return;
- }//if
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL USED BUFFERS ON THE RIGHT SIDE. */
- /*-----------------------------------------------------------------*/
- tlupForward = cminusOne;
- tlcnNextContainer = (lcnCopyPageptr.p->word32[ZPOS_EMPTY_LIST] >> 16) & 0x7f;
- while (tlcnNextContainer < ZEMPTYLIST) {
- tlcnConIndex = (tlcnNextContainer << ZSHIFT_PLUS) - (tlcnNextContainer << ZSHIFT_MINUS);
- tlcnConIndex = tlcnConIndex + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex - 1;
- lcpUpdatePage(signal);
- tlcnNextContainer = (lcnCopyPageptr.p->word32[tlcnConIndex] >> 11) & 0x7f;
- }//while
- if (tlcnNextContainer == ZEMPTYLIST) {
- jam();
- /*empty*/;
- } else {
- jam();
- sendSystemerror(signal);
- return;
- }//if
- }//if
- lcnCopyPageptr.p->word32[ZPOS_CHECKSUM] = tlcnChecksum;
- }//Dbacc::lcpCopyPage()
- /* --------------------------------------------------------------------------------- */
- /* THIS SUBROUTINE GOES THROUGH ONE CONTAINER TO CHECK FOR LOCKED ELEMENTS AND */
- /* UPDATING THEM TO ENSURE ALL ELEMENTS ARE UNLOCKED ON DISK. */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::lcpUpdatePage(Signal* signal)
- {
- OperationrecPtr lupOperationRecPtr;
- Uint32 tlupElemHead;
- Uint32 tlupElemLen;
- Uint32 tlupElemStep;
- Uint32 tlupConLen;
- tlupConLen = lupPageptr.p->word32[tlupIndex] >> 26;
- tlupElemLen = fragrecptr.p->elementLength;
- tlupElemStep = tlupForward * tlupElemLen;
- while (tlupConLen > ZCON_HEAD_SIZE) {
- jam();
- tlupElemHead = lupPageptr.p->word32[tlupElemIndex];
- if (ElementHeader::getLocked(tlupElemHead)) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* WHEN CHANGING THE ELEMENT HEADER WE ALSO HAVE TO UPDATE THE CHECKSUM. IN */
- /* DOING THIS WE USE THE FORMULA (A XOR B) XOR B = A WHICH MEANS THAT IF WE */
- /* XOR SOMETHING TWICE WITH THE SAME OPERAND THEN WE RETURN TO THE ORIGINAL */
- /* VALUE. THEN WE ALSO HAVE TO USE THE NEW ELEMENT HEADER IN THE CHECKSUM */
- /* CALCULATION. */
- /* --------------------------------------------------------------------------------- */
- tlcnChecksum = tlcnChecksum ^ tlupElemHead;
- lupOperationRecPtr.i = ElementHeader::getOpPtrI(tlupElemHead);
- ptrCheckGuard(lupOperationRecPtr, coprecsize, operationrec);
- const Uint32 hv = lupOperationRecPtr.p->hashvaluePart;
- tlupElemHead = ElementHeader::setUnlocked(hv , 0);
- arrGuard(tlupElemIndex, 2048);
- lupPageptr.p->word32[tlupElemIndex] = tlupElemHead;
- tlcnChecksum = tlcnChecksum ^ tlupElemHead;
- }//if
- tlupConLen = tlupConLen - tlupElemLen;
- tlupElemIndex = tlupElemIndex + tlupElemStep;
- }//while
- if (tlupConLen < ZCON_HEAD_SIZE) {
- jam();
- sendSystemerror(signal);
- }//if
- }//Dbacc::lcpUpdatePage()
- /*-----------------------------------------------------------------*/
- // At a system restart we check that the page do not contain any
- // locks that hinder the system restart procedure.
- /*-----------------------------------------------------------------*/
- void Dbacc::srCheckPage(Signal* signal)
- {
- Uint32 tlcnNextContainer;
- Uint32 tlcnConIndex;
- Uint32 tlcnIndex;
- lupPageptr.p = lcnCopyPageptr.p;
- if (((lcnCopyPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) != ZLONG_PAGE_TYPE) {
- jam();
- if (((lcnCopyPageptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) == ZNORMAL_PAGE_TYPE) {
- jam();
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL 64 BUFFERS ADDRESSED BY ALGORITHM IN */
- /* FIRST PAGE. IF THEY ARE EMPTY THEY STILL HAVE A CONTAINER */
- /* HEADER OF 2 WORDS. */
- /*-----------------------------------------------------------------*/
- tlcnConIndex = ZHEAD_SIZE;
- tlupForward = 1;
- for (tlcnIndex = 0; tlcnIndex <= ZNO_CONTAINERS - 1; tlcnIndex++) {
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex + ZCON_HEAD_SIZE;
- srCheckContainer(signal);
- if (tresult != 0) {
- jam();
- return;
- }//if
- tlcnConIndex = tlcnConIndex + ZBUF_SIZE;
- }//for
- }//if
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL USED BUFFERS ON THE LEFT SIDE. */
- /*-----------------------------------------------------------------*/
- tlcnNextContainer = (lcnCopyPageptr.p->word32[ZPOS_EMPTY_LIST] >> 23) & 0x7f;
- while (tlcnNextContainer < ZEMPTYLIST) {
- tlcnConIndex = (tlcnNextContainer << ZSHIFT_PLUS) - (tlcnNextContainer << ZSHIFT_MINUS);
- tlcnConIndex = tlcnConIndex + ZHEAD_SIZE;
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex + ZCON_HEAD_SIZE;
- tlupForward = 1;
- srCheckContainer(signal);
- if (tresult != 0) {
- jam();
- return;
- }//if
- tlcnNextContainer = (lcnCopyPageptr.p->word32[tlcnConIndex] >> 11) & 0x7f;
- }//while
- if (tlcnNextContainer == ZEMPTYLIST) {
- jam();
- /*empty*/;
- } else {
- jam();
- tresult = 4;
- return;
- }//if
- /*-----------------------------------------------------------------*/
- /* TAKE CARE OF ALL USED BUFFERS ON THE RIGHT SIDE. */
- /*-----------------------------------------------------------------*/
- tlupForward = cminusOne;
- tlcnNextContainer = (lcnCopyPageptr.p->word32[ZPOS_EMPTY_LIST] >> 16) & 0x7f;
- while (tlcnNextContainer < ZEMPTYLIST) {
- tlcnConIndex = (tlcnNextContainer << ZSHIFT_PLUS) - (tlcnNextContainer << ZSHIFT_MINUS);
- tlcnConIndex = tlcnConIndex + ((ZHEAD_SIZE + ZBUF_SIZE) - ZCON_HEAD_SIZE);
- tlupIndex = tlcnConIndex;
- tlupElemIndex = tlcnConIndex - 1;
- srCheckContainer(signal);
- if (tresult != 0) {
- jam();
- return;
- }//if
- tlcnNextContainer = (lcnCopyPageptr.p->word32[tlcnConIndex] >> 11) & 0x7f;
- }//while
- if (tlcnNextContainer == ZEMPTYLIST) {
- jam();
- /*empty*/;
- } else {
- jam();
- tresult = 4;
- return;
- }//if
- }//if
- }//Dbacc::srCheckPage()
- /* --------------------------------------------------------------------------------- */
/* THIS SUBROUTINE GOES THROUGH ONE CONTAINER TO CHECK FOR LOCKED ELEMENTS. IT DOES */
/* NOT UPDATE THE PAGE; A REMAINING LOCK OR A BAD CONTAINER LENGTH IS REPORTED IN TRESULT. */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::srCheckContainer(Signal* signal)
- {
- Uint32 tlupElemLen;
- Uint32 tlupElemStep;
- Uint32 tlupConLen;
- tlupConLen = lupPageptr.p->word32[tlupIndex] >> 26;
- tlupElemLen = fragrecptr.p->elementLength;
- tlupElemStep = tlupForward * tlupElemLen;
- while (tlupConLen > ZCON_HEAD_SIZE) {
- jam();
- const Uint32 tlupElemHead = lupPageptr.p->word32[tlupElemIndex];
- if (ElementHeader::getLocked(tlupElemHead)){
- jam();
- //-------------------------------------------------------
- // This is absolutely undesirable. We have a lock remaining
- // after the system restart. We send a crash signal that will
- // enter the trace file.
- //-------------------------------------------------------
- tresult = 2;
- return;
- }//if
- tlupConLen = tlupConLen - tlupElemLen;
- tlupElemIndex = tlupElemIndex + tlupElemStep;
- }//while
- if (tlupConLen < ZCON_HEAD_SIZE) {
- jam();
- tresult = 3;
- }//if
- return;
- }//Dbacc::srCheckContainer()
- /* ------------------------------------------------------------------------- */
- /* CHECK_UNDO_PAGES */
/* DESCRIPTION: CHECKS WHEN A PAGE OR A GROUP OF UNDO PAGES IS FILLED. WHEN */
/* A PAGE IS FILLED, CUNDOPOSITION WILL BE UPDATED; THE NEW */
/* POSITION IS THE BEGINNING OF THE NEXT UNDO PAGE. */
/* IN CASE A GROUP IS FILLED THE PAGES ARE SENT TO DISK, */
/* AND A NEW GROUP IS CHOSEN. */
- /* ------------------------------------------------------------------------- */
- void Dbacc::checkUndoPages(Signal* signal)
- {
- fragrecptr.p->prevUndoposition = cundoposition;
- cprevUndoaddress = cundoposition;
- // Calculate active undo page id
- Uint32 tundoPageId = cundoposition >> ZUNDOPAGEINDEXBITS;
- /**
- * WE WILL WRITE UNTIL WE HAVE ABOUT 8 KBYTE REMAINING ON THE 32 KBYTE
- * PAGE. THIS IS TO ENSURE THAT WE DO NOT HAVE ANY UNDO LOG RECORDS THAT PASS
- * A PAGE BOUNDARIE. THIS SIMPLIFIES CODING TRADING SOME INEFFICIENCY.
- */
- static const Uint32 ZMAXUNDOPAGEINDEX = 7100;
- if (tundoindex < ZMAXUNDOPAGEINDEX) {
- jam();
- cundoposition = (tundoPageId << ZUNDOPAGEINDEXBITS) + tundoindex;
- return;
- }//if
- /**
- * WE CHECK IF MORE THAN 1 MBYTE OF WRITES ARE OUTSTANDING TO THE UNDO FILE.
- * IF SO WE HAVE TO CRASH SINCE WE HAVE NO MORE SPACE TO WRITE UNDO LOG
- * RECORDS IN
- */
- Uint16 nextUndoPageId = tundoPageId + 1;
- updateUndoPositionPage(signal, nextUndoPageId << ZUNDOPAGEINDEXBITS);
- if ((tundoPageId & (ZWRITE_UNDOPAGESIZE - 1)) == (ZWRITE_UNDOPAGESIZE - 1)) {
- jam();
- /* ---------- SEND A GROUP OF UNDO PAGES TO DISK --------- */
- fsConnectptr.i = cactiveOpenUndoFsPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- Uint32 tcupTmp1 = (tundoPageId - ZWRITE_UNDOPAGESIZE) + 1;
- tcupTmp1 = tcupTmp1 & (cundopagesize - 1); /* 1 MBYTE PAGE WINDOW */
- seizeFsOpRec(signal);
- initFsOpRec(signal);
- fsOpptr.p->fsOpstate = WAIT_WRITE_UNDO_EXIT;
- fsOpptr.p->fsOpMemPage = tundoPageId;
- fragrecptr.p->nrWaitWriteUndoExit++;
- if (clblPageCounter >= 8) {
- jam();
- clblPageCounter = clblPageCounter - 8;
- } else {
- jam();
- clblPageOver = clblPageOver + (8 - clblPageCounter);
- clblPageCounter = 0;
- }//if
- /* ************************ */
- /* FSWRITEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsOpptr.i;
- signal->theData[3] = 0x1;
- /* FLAG = START MEM PAGES, START FILE PAGES */
- signal->theData[4] = ZUNDOPAGE_BASE_ADD;
- signal->theData[5] = ZWRITE_UNDOPAGESIZE;
- signal->theData[6] = tcupTmp1;
- signal->theData[7] = cactiveUndoFilePage;
- sendSignal(NDBFS_REF, GSN_FSWRITEREQ, signal, 8, JBA);
- cactiveUndoFilePage = cactiveUndoFilePage + ZWRITE_UNDOPAGESIZE;
- }//if
- }//Dbacc::checkUndoPages()
- /* --------------------------------------------------------------------------------- */
- /* UNDO_WRITING_PROCESS */
- /* INPUT: FRAGRECPTR, CUNDO_ELEM_INDEX, DATAPAGEPTR, CUNDOINFOLENGTH */
- /* DESCRIPTION: WHEN THE PROCESS OF CREATION LOCAL CHECK POINT HAS */
- /* STARTED. IF THE ACTIVE PAGE IS NOT ALREADY SENT TO DISK, THE */
- /* OLD VALUE OF THE ITEM WHICH IS GOING TO BE CHECKED IS STORED ON */
- /* THE ACTIVE UNDO PAGE. INFORMATION ABOUT UNDO PROCESS IN THE */
- /* BLOCK AND IN THE FRAGMENT WILL BE UPDATED. */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::undoWritingProcess(Signal* signal)
- {
- const Uint32 tactivePageDir = datapageptr.p->word32[ZPOS_PAGE_ID];
- const Uint32 tpageType = (datapageptr.p->word32[ZPOS_EMPTY_LIST] >> ZPOS_PAGE_TYPE_BIT) & 3;
- if (fragrecptr.p->fragState == LCP_SEND_PAGES) {
- if (tpageType == ZNORMAL_PAGE_TYPE) {
- /* --------------------------------------------------------------------------- */
- /* HANDLING OF LOG OF NORMAL PAGES DURING WRITE OF NORMAL PAGES. */
- /* --------------------------------------------------------------------------- */
- if (tactivePageDir < fragrecptr.p->lcpDirIndex) {
- jam();
- /* ------------------------------------------------------------------- */
- /* THIS PAGE HAS ALREADY BEEN WRITTEN IN THE LOCAL CHECKPOINT. */
- /* ------------------------------------------------------------------- */
- /*empty*/;
- } else {
- if (tactivePageDir >= fragrecptr.p->lcpMaxDirIndex) {
- jam();
- /* --------------------------------------------------------------------------- */
- /* OBVIOUSLY THE FRAGMENT HAS EXPANDED SINCE THE START OF THE LOCAL CHECKPOINT.*/
- /* WE NEED NOT LOG ANY UPDATES OF PAGES THAT DID NOT EXIST AT START OF LCP. */
- /* --------------------------------------------------------------------------- */
- /*empty*/;
- } else {
- jam();
- /* --------------------------------------------------------------------------- */
- /* IN ALL OTHER CASES WE HAVE TO WRITE TO THE UNDO LOG. */
- /* --------------------------------------------------------------------------- */
- undopageptr.i = (cundoposition >> ZUNDOPAGEINDEXBITS) & (cundopagesize - 1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZPAGE_INFO);
- tundoElemIndex = cundoElemIndex;
- writeUndoDataInfo(signal);
- checkUndoPages(signal);
- }//if
- }//if
- } else if (tpageType == ZOVERFLOW_PAGE_TYPE) {
- /* --------------------------------------------------------------------------------- */
- /* OVERFLOW PAGE HANDLING DURING WRITE OF NORMAL PAGES. */
- /* --------------------------------------------------------------------------------- */
- if (tactivePageDir >= fragrecptr.p->lcpMaxOverDirIndex) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* OBVIOUSLY THE FRAGMENT HAS EXPANDED THE NUMBER OF OVERFLOW PAGES SINCE THE */
- /* START OF THE LOCAL CHECKPOINT. WE NEED NOT LOG ANY UPDATES OF PAGES THAT DID*/
- /* NOT EXIST AT START OF LCP. */
- /* --------------------------------------------------------------------------------- */
- /*empty*/;
- } else {
- jam();
- undopageptr.i = (cundoposition >> ZUNDOPAGEINDEXBITS) & (cundopagesize - 1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZOVER_PAGE_INFO);
- tundoElemIndex = cundoElemIndex;
- writeUndoDataInfo(signal);
- checkUndoPages(signal);
- }//if
- } else if (tpageType != ZLONG_PAGE_TYPE) {
- jam();
- /* --------------------------------------------------------------------------- */
- /* ONLY PAGE INFO AND OVERFLOW PAGE INFO CAN BE LOGGED BY THIS ROUTINE. A */
- /* SERIOUS ERROR. */
- /* --------------------------------------------------------------------------- */
- sendSystemerror(signal);
- } else {
- /* --------------------------------------------------------------------------------- */
- /* THIS LOG RECORD IS GENERATED ON A LONG KEY PAGE. THESE PAGES USE LOGICAL */
- /* LOGGING. */
- /* --------------------------------------------------------------------------------- */
- if (tactivePageDir >= fragrecptr.p->lcpMaxOverDirIndex) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* OBVIOUSLY THE FRAGMENT HAS EXPANDED THE NUMBER OF OVERFLOW PAGES SINCE THE */
- /* START OF THE LOCAL CHECKPOINT. WE NEED NOT LOG ANY UPDATES OF PAGES THAT DID*/
- /* NOT EXIST AT START OF LCP. */
- /* --------------------------------------------------------------------------------- */
- /*empty*/;
- } else {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* LOGICAL LOGGING OF LONG KEY PAGES CAN EITHER BE UNDO OF AN INSERT OR UNDO */
- /* OF A DELETE KEY. UNDO OF DELETE NEEDS TO LOG THE KEY TO BE REINSERTED WHILE */
- /* UNDO OF INSERT ONLY NEEDS TO LOG THE INDEX TO BE DELETED. */
- /* --------------------------------------------------------------------------------- */
- undopageptr.i = (cundoposition >> ZUNDOPAGEINDEXBITS) & (cundopagesize - 1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
- if (cundoinfolength == 0) {
- jam();
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZUNDO_INSERT_LONG_KEY);
- } else {
- jam();
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZUNDO_DELETE_LONG_KEY);
- arrGuard(ZWORDS_IN_PAGE - cundoElemIndex, 2048);
- tundoElemIndex = datapageptr.p->word32[ZWORDS_IN_PAGE - cundoElemIndex] & 0xffff;
- writeUndoDataInfo(signal);
- }//if
- checkUndoPages(signal);
- }//if
- }//if
- } else {
- if (fragrecptr.p->fragState == LCP_SEND_OVER_PAGES) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* DURING WRITE OF OVERFLOW PAGES WE NEED NOT WORRY ANYMORE ABOUT NORMAL PAGES.*/
- /* --------------------------------------------------------------------------------- */
- if (tpageType == ZOVERFLOW_PAGE_TYPE) {
- if (tactivePageDir < fragrecptr.p->lcpDirIndex) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* THIS PAGE HAS ALREADY BEEN WRITTEN IN THE LOCAL CHECKPOINT. */
- /* --------------------------------------------------------------------------------- */
- /*empty*/;
- } else {
- if (tactivePageDir >= fragrecptr.p->lcpMaxOverDirIndex) {
- jam();
- /* --------------------------------------------------------------------------------- */
- /* OBVIOUSLY THE FRAGMENT HAS EXPANDED THE NUMBER OF OVERFLOW PAGES SINCE THE */
- /* START OF THE LOCAL CHECKPOINT. WE NEED NOT LOG ANY UPDATES OF PAGES THAT DID*/
- /* NOT EXIST AT START OF LCP. */
- /* --------------------------------------------------------------------------------- */
- /*empty*/;
- } else {
- jam();
- undopageptr.i = (cundoposition >> ZUNDOPAGEINDEXBITS) & (cundopagesize - 1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZOVER_PAGE_INFO);
- tundoElemIndex = cundoElemIndex;
- writeUndoDataInfo(signal);
- checkUndoPages(signal);
- }//if
- }//if
- } else if (tpageType == ZLONG_PAGE_TYPE) {
- if (tactivePageDir < fragrecptr.p->lcpDirIndex) {
- jam();
- // -------------------------------------------------------------
- // THIS PAGE HAS ALREADY BEEN WRITTEN IN THE LOCAL CHECKPOINT.
- // -------------------------------------------------------------
- } else {
- if (tactivePageDir >= fragrecptr.p->lcpMaxOverDirIndex) {
- jam();
- // -------------------------------------------------------------
- // OBVIOUSLY THE FRAGMENT HAS EXPANDED THE NUMBER OF OVERFLOW
- // PAGES SINCE THE START OF THE LOCAL CHECKPOINT. WE NEED NOT
- // LOG ANY UPDATES OF PAGES THAT DID NOT EXIST AT START OF LCP.
- // -------------------------------------------------------------
- } else {
- jam();
- // -------------------------------------------------------------
- // LOGICAL LOGGING OF LONG KEY PAGES CAN EITHER BE UNDO OF AN
- // INSERT OR UNDO OF A DELETE KEY. UNDO OF DELETE NEEDS TO LOG
- // THE KEY TO BE REINSERTED WHILE UNDO OF INSERT ONLY NEEDS TO
- // LOG THE INDEX TO BE DELETED.
- // -------------------------------------------------------------
- undopageptr.i = (cundoposition >> ZUNDOPAGEINDEXBITS) & (cundopagesize - 1);
- ptrAss(undopageptr, undopage);
- theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
- tundoindex = theadundoindex + ZUNDOHEADSIZE;
- if (cundoinfolength == 0) {
- jam();
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZUNDO_INSERT_LONG_KEY);
- } else {
- jam();
- writeUndoHeader(signal, tactivePageDir, UndoHeader::ZUNDO_DELETE_LONG_KEY);
- arrGuard(ZWORDS_IN_PAGE - cundoElemIndex, 2048);
- tundoElemIndex = datapageptr.p->word32[ZWORDS_IN_PAGE - cundoElemIndex] & 0xffff;
- writeUndoDataInfo(signal);
- }//if
- checkUndoPages(signal);
- }//if
- }//if
- }//if
- }//if
- }//if
- }//Dbacc::undoWritingProcess()
- /* --------------------------------------------------------------------------------- */
- /* OTHER STATES MEANS THAT WE HAVE ALREADY WRITTEN ALL PAGES BUT NOT YET RESET */
- /* THE CREATE_LCP FLAG. */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* WRITE_UNDO_DATA_INFO */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::writeUndoDataInfo(Signal* signal)
- {
- Uint32 twudiIndex;
- Uint32 guard22;
- guard22 = cundoinfolength;
- arrGuard((tundoindex + guard22 - 1), 8192);
- arrGuard((tundoElemIndex + guard22 - 1), 2048);
- for (twudiIndex = 1; twudiIndex <= guard22; twudiIndex++) {
- undopageptr.p->undoword[tundoindex] = datapageptr.p->word32[tundoElemIndex];
- tundoindex++;
- tundoElemIndex++;
- }//for
- }//Dbacc::writeUndoDataInfo()
- /* --------------------------------------------------------------------------------- */
- /* WRITE_UNDO_HEADER */
- /* THE HEAD OF UNDO ELEMENT IS 24 BYTES AND CONTAINS THE FOLLOWING INFORMATION: */
- /* TABLE IDENTITY 32 BITS */
- /* ROOT FRAGMENT IDENTITY 32 BITS */
- /* LOCAL FRAGMENT IDENTITY 32 BITS */
- /* LENGTH OF ELEMENT INFO (BIT 31 - 18) 14 BITS */
- /* INFO TYPE (BIT 17 - 14) 4 BITS */
- /* PAGE INDEX OF THE FIRST FIELD IN THE FRAGMENT (BIT 13 - 0) 14 BITS */
- /* DIRECTORY INDEX OF THE PAGE IN THE FRAGMENT 32 BITS */
- /* ADDRESS OF THE PREVIOUS ELEMENT OF THE FRAGMENT 64 BITS */
- /* ADDRESS OF THE PREVIOUS ELEMENT IN THE UNDO PAGES 64 BITS */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::writeUndoHeader(Signal* signal,
- Uint32 logicalPageId,
- UndoHeader::UndoHeaderType pageType)
- {
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- arrGuard(theadundoindex + 6, 8192);
- // Set the structpointer to point at the undo page at the right address.
- UndoHeader * const & undoHeaderPtr =
- (UndoHeader *) &undopageptr.p->undoword[theadundoindex];
- undoHeaderPtr->tableId = rootfragrecptr.p->mytabptr;
- undoHeaderPtr->rootFragId = rootfragrecptr.p->fragmentid[0];
- undoHeaderPtr->localFragId = fragrecptr.p->myfid;
- Uint32 Ttmp = cundoinfolength;
- Ttmp = (Ttmp << 4) + pageType;
- Ttmp = Ttmp << 14;
- undoHeaderPtr->variousInfo = Ttmp + cundoElemIndex;
- undoHeaderPtr->logicalPageId = logicalPageId;
- undoHeaderPtr->prevUndoAddressForThisFrag = fragrecptr.p->prevUndoposition;
- undoHeaderPtr->prevUndoAddress = cprevUndoaddress;
- }//Dbacc::writeUndoHeader()
- /* --------------------------------------------------------------------------------- */
- /* WRITE_UNDO_OP_INFO */
- /* FOR A LOCKED ELEMENT, OPERATION TYPE, UNDO OF ELEMENT HEADER AND THE LENGTH OF*/
- /* THE TUPLE KEY HAVE TO BE SAVED IN UNDO PAGES. IN THIS CASE AN UNDO ELEMENT */
- /* INCLUDES THE FOLLOWING ITEMS. */
- /* OPERATION TYPE 32 BITS */
- /* HASH VALUE 32 BITS */
- /* LENGTH OF THE TUPLE = N 32 BITS */
- /* TUPLE KEYS N * 32 BITS */
- /* */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::writeUndoOpInfo(Signal* signal)
- {
- Page8Ptr locPageptr;
- arrGuard((tundoindex + 3), 8192);
- undopageptr.p->undoword[tundoindex] = operationRecPtr.p->operation;
- undopageptr.p->undoword[tundoindex + 1] = operationRecPtr.p->hashValue;
- undopageptr.p->undoword[tundoindex + 2] = operationRecPtr.p->tupkeylen;
- tundoindex = tundoindex + 3;
- if (fragrecptr.p->keyLength != 0) {
- // Fixed size keys
- jam();
- locPageptr.i = operationRecPtr.p->elementPage;
- ptrCheckGuard(locPageptr, cpagesize, page8);
- Uint32 Tforward = operationRecPtr.p->elementIsforward;
- Uint32 TelemPtr = operationRecPtr.p->elementPointer;
- TelemPtr += Tforward;
- TelemPtr += Tforward;
- //---------------------------------------------------------------------------------
- // Now the pointer is at the start of the key part of the element. Now copy from there
- // to the UNDO log.
- //---------------------------------------------------------------------------------
- Uint32 keyLen = operationRecPtr.p->tupkeylen;
- ndbrequire(keyLen <= 8);
- arrGuard(tundoindex+keyLen, 8192);
- for (Uint32 twuoiIndex = 0; twuoiIndex < keyLen; twuoiIndex++) {
- jam();
- arrGuard(TelemPtr, 2048);
- undopageptr.p->undoword[tundoindex] = locPageptr.p->word32[TelemPtr];
- tundoindex++;
- TelemPtr += Tforward;
- }//for
- cundoinfolength = ZOP_HEAD_INFO_LN + operationRecPtr.p->tupkeylen;
- } else {
- // Long keys
- jam();
- arrGuard(operationRecPtr.p->longKeyPageIndex, ZMAX_NO_OF_LONGKEYS_IN_PAGE);
- locPageptr.i = operationRecPtr.p->longPagePtr;
- ptrCheckGuard(locPageptr, cpagesize, page8);
- Uint32 indexValue =
- locPageptr.p->word32[ZWORDS_IN_PAGE - operationRecPtr.p->longKeyPageIndex];
- Uint32 keyLen = indexValue >> 16;
- Uint32 physPageIndex = indexValue & 0xffff;
- ndbrequire(keyLen == operationRecPtr.p->tupkeylen);
- arrGuard(tundoindex+keyLen, 8192);
- arrGuard(physPageIndex+keyLen, 2048);
- for (Uint32 i = 0; i < keyLen; i++){
- undopageptr.p->undoword[tundoindex + i] = locPageptr.p->word32[physPageIndex+i];
- }
- tundoindex = tundoindex + keyLen;
- cundoinfolength = ZOP_HEAD_INFO_LN + keyLen;
- }//if
- }//Dbacc::writeUndoOpInfo()
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* */
- /* END OF LOCAL CHECKPOINT MODULE */
- /* */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* */
- /* SYSTEM RESTART MODULE */
- /* */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* SR_FRAGIDREQ REQUEST FOR RESTART OF A FRAGMENT */
- /* SENDER: LQH, LEVEL B */
- /* ENTER SR_FRAGIDREQ WITH */
- /* TUSERPTR, LQH CONNECTION PTR */
- /* TUSERBLOCKREF, LQH BLOCK REFERENCE */
- /* TCHECKPOINTID, THE CHECKPOINT NUMBER TO USE */
- /* (E.G. 1,2 OR 3) */
- /* TABPTR, TABLE ID = TABLE RECORD POINTER */
- /* TFID, ROOT FRAGMENT ID */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* SR_FRAGIDREQ REQUEST FOR LIST OF STOPPED OPERATION */
- /* ******************------------------------------+ */
- /* SENDER: LQH, LEVEL B */
- void Dbacc::execSR_FRAGIDREQ(Signal* signal)
- {
- jamEntry();
- tuserptr = signal->theData[0]; /* LQH CONNECTION PTR */
- tuserblockref = signal->theData[1]; /* LQH BLOCK REFERENCE */
- tcheckpointid = signal->theData[2]; /* THE CHECKPOINT NUMBER TO USE */
- /* (E.G. 1,2 OR 3) */
- tabptr.i = signal->theData[3];
- ptrCheckGuard(tabptr, ctablesize, tabrec);
- /* TABLE ID = TABLE RECORD POINTER */
- tfid = signal->theData[4]; /* ROOT FRAGMENT ID */
- tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
- seizeLcpConnectRec(signal);
- initLcpConnRec(signal);
- ndbrequire(getrootfragmentrec(signal, rootfragrecptr, tfid));
- rootfragrecptr.p->lcpPtr = lcpConnectptr.i;
- lcpConnectptr.p->rootrecptr = rootfragrecptr.i;
- lcpConnectptr.p->localCheckPid = tcheckpointid;
- for (Uint32 i = 0; i < 2; i++) {
- Page8Ptr zeroPagePtr;
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[i];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- seizeLcpPage(zeroPagePtr);
- fragrecptr.p->zeroPagePtr = zeroPagePtr.i;
- }//for
- /* ---------------------------OPEN THE DATA FILE WHICH BELONGS TO TFID AND TCHECK POINT ---- */
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- tfid = rootfragrecptr.p->fragmentid[0];
- tmp = 0;
- srOpenDataFileLoopLab(signal);
-
- return;
- }//Dbacc::execSR_FRAGIDREQ()
- void Dbacc::srOpenDataFileLoopLab(Signal* signal)
- {
- /* D6 AT FSOPENREQ. FILE TYPE = .DATA */
- tmp1 = 0x010003ff; /* VERSION OF FILENAME = 1 */
- tmp2 = 0x0; /* D7 DON'T CREATE, READ ONLY */
- ndbrequire(cfsFirstfreeconnect != RNIL);
- seizeFsConnectRec(signal);
- fragrecptr.p->fsConnPtr = fsConnectptr.i;
- fsConnectptr.p->fragrecPtr = fragrecptr.i;
- fsConnectptr.p->fsState = WAIT_OPEN_DATA_FILE_FOR_READ;
- fsConnectptr.p->activeFragId = tmp; /* LOCAL FRAG INDEX */
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = rootfragrecptr.p->mytabptr; /* TABLE IDENTITY */
- signal->theData[3] = tfid; /* FRAGMENT IDENTITY */
- signal->theData[4] = lcpConnectptr.p->localCheckPid; /* CHECKPOINT ID */
- signal->theData[5] = tmp1;
- signal->theData[6] = tmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- return;
- }//Dbacc::srOpenDataFileLoopLab()
- void Dbacc::srFsOpenConfLab(Signal* signal)
- {
- fsConnectptr.p->fsState = WAIT_READ_PAGE_ZERO;
- /* ------------------------ READ ZERO PAGE ---------- */
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 0x0;
- /* FLAG = LIST MEM PAGES, LIST FILE PAGES */
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = 1; /* NO OF PAGES */
- signal->theData[6] = fragrecptr.p->zeroPagePtr; /* ZERO PAGE */
- signal->theData[7] = 0; /* PAGE ZERO OF THE DATA FILE */
- sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 8, JBA);
- return;
- }//Dbacc::srFsOpenConfLab()
- void Dbacc::srReadPageZeroLab(Signal* signal)
- {
- Page8Ptr srzPageptr;
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- fragrecptr.p->activeDataFilePage = 1;
- srzPageptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(srzPageptr, cpagesize, page8);
- /* --------------------------------------------------------------------------------- */
- // Check that the checksum of the zero page is ok.
- /* --------------------------------------------------------------------------------- */
- ccoPageptr.p = srzPageptr.p;
- checksumControl(signal, (Uint32)0);
- if (tresult > 0) {
- jam();
- return; // We will crash through a DEBUG_SIG
- }//if
- ndbrequire(srzPageptr.p->word32[ZPAGEZERO_FRAGID0] == rootfragrecptr.p->fragmentid[0]);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- if (fsConnectptr.p->activeFragId == 0) {
- jam();
- rootfragrecptr.p->fragmentid[1] = srzPageptr.p->word32[ZPAGEZERO_FRAGID1];
- /* ---------------------------OPEN THE DATA FILE FOR NEXT LOCAL FRAGMENT ----------- ---- */
- tfid = rootfragrecptr.p->fragmentid[1];
- tmp = 1; /* LOCAL FRAG INDEX */
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- srOpenDataFileLoopLab(signal);
- return;
- } else {
- jam();
- lcpConnectptr.p->lcpstate = LCP_ACTIVE;
- signal->theData[0] = lcpConnectptr.p->lcpUserptr;
- signal->theData[1] = lcpConnectptr.i;
- signal->theData[2] = 2; /* NO OF LOCAL FRAGMENTS */
- signal->theData[3] = srzPageptr.p->word32[ZPAGEZERO_FRAGID0];
- /* ROOTFRAGRECPTR:FRAGMENTID(0) */
- signal->theData[4] = srzPageptr.p->word32[ZPAGEZERO_FRAGID1];
- /* ROOTFRAGRECPTR:FRAGMENTID(1) */
- signal->theData[5] = RNIL;
- signal->theData[6] = RNIL;
- signal->theData[7] = rootfragrecptr.p->fragmentptr[0];
- signal->theData[8] = rootfragrecptr.p->fragmentptr[1];
- signal->theData[9] = srzPageptr.p->word32[ZPAGEZERO_HASH_CHECK];
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_SR_FRAGIDCONF, signal, 10, JBB);
- }//if
- return;
- }//Dbacc::srReadPageZeroLab()
- void Dbacc::initFragAdd(Signal* signal,
- Uint32 rootFragIndex,
- Uint32 rootIndex,
- FragmentrecPtr regFragPtr)
- {
- const AccFragReq * const req = (AccFragReq*)&signal->theData[0];
- Uint32 lhFragBits = req->lhFragBits + 1;
- Uint32 minLoadFactor = (req->minLoadFactor * ZBUF_SIZE) / 100;
- Uint32 maxLoadFactor = (req->maxLoadFactor * ZBUF_SIZE) / 100;
- if (minLoadFactor >= maxLoadFactor) {
- jam();
- minLoadFactor = maxLoadFactor - 1;
- }//if
- regFragPtr.p->fragState = ACTIVEFRAG;
- // NOTE: next line must match calculation in Dblqh::execLQHFRAGREQ
- regFragPtr.p->myfid = (rootFragIndex << (lhFragBits - 1)) | req->fragId;
- regFragPtr.p->myroot = rootIndex;
- regFragPtr.p->myTableId = req->tableId;
- ndbrequire(req->kValue == 6);
- regFragPtr.p->k = req->kValue; /* TK_SIZE = 6 IN THIS VERSION */
- regFragPtr.p->expandCounter = 0;
- /**
- * Only allow shrink during SR
- * - to make sure we don't run out of pages during REDO log execution
- *
- * Is later restored to 0 by LQH at end of REDO log execution
- */
- regFragPtr.p->expandFlag = (getNodeState().getSystemRestartInProgress()?1:0);
- regFragPtr.p->p = 0;
- regFragPtr.p->maxp = (1 << req->kValue) - 1;
- regFragPtr.p->minloadfactor = minLoadFactor;
- regFragPtr.p->maxloadfactor = maxLoadFactor;
- regFragPtr.p->slack = (regFragPtr.p->maxp + 1) * maxLoadFactor;
- regFragPtr.p->lhfragbits = lhFragBits;
- regFragPtr.p->lhdirbits = 0;
- regFragPtr.p->hashcheckbit = 0; //lhFragBits;
- regFragPtr.p->localkeylen = req->localKeyLen;
- regFragPtr.p->nodetype = (req->reqInfo >> 4) & 0x3;
- regFragPtr.p->lastOverIndex = 0;
- regFragPtr.p->dirsize = 1;
- regFragPtr.p->loadingFlag = ZFALSE;
- regFragPtr.p->keyLength = req->keyLength;
- if (req->keyLength == 0) {
- jam();
- regFragPtr.p->elementLength = (1 + ZELEM_HEAD_SIZE) + regFragPtr.p->localkeylen;
- } else {
- jam();
- regFragPtr.p->elementLength = (ZELEM_HEAD_SIZE + regFragPtr.p->localkeylen) + regFragPtr.p->keyLength;
- }//if
- Uint32 Tmp1 = (regFragPtr.p->maxp + 1) + regFragPtr.p->p;
- Uint32 Tmp2 = regFragPtr.p->maxloadfactor - regFragPtr.p->minloadfactor;
- Tmp2 = Tmp1 * Tmp2;
- regFragPtr.p->slackCheck = Tmp2;
- }//Dbacc::initFragAdd()
- void Dbacc::initFragGeneral(FragmentrecPtr regFragPtr)
- {
- regFragPtr.p->directory = RNIL;
- regFragPtr.p->overflowdir = RNIL;
- regFragPtr.p->fsConnPtr = RNIL;
- regFragPtr.p->firstOverflowRec = RNIL;
- regFragPtr.p->lastOverflowRec = RNIL;
- regFragPtr.p->firstWaitInQueOp = RNIL;
- regFragPtr.p->lastWaitInQueOp = RNIL;
- regFragPtr.p->sentWaitInQueOp = RNIL;
- regFragPtr.p->lockOwnersList = RNIL;
- regFragPtr.p->firstFreeDirindexRec = RNIL;
- regFragPtr.p->zeroPagePtr = RNIL;
- regFragPtr.p->activeDataPage = 0;
- regFragPtr.p->createLcp = ZFALSE;
- regFragPtr.p->stopQueOp = ZFALSE;
- regFragPtr.p->nextAllocPage = 0;
- regFragPtr.p->nrWaitWriteUndoExit = 0;
- regFragPtr.p->lastUndoIsStored = ZFALSE;
- regFragPtr.p->loadingFlag = ZFALSE;
- regFragPtr.p->fragState = FREEFRAG;
- for (Uint32 i = 0; i < ZWRITEPAGESIZE; i++) {
- regFragPtr.p->datapages[i] = RNIL;
- }//for
- for (Uint32 j = 0; j < 4; j++) {
- regFragPtr.p->longKeyPageArray[j] = RNIL;
- }//for
- }//Dbacc::initFragGeneral()
- void Dbacc::initFragSr(FragmentrecPtr regFragPtr, Page8Ptr regPagePtr)
- {
- regFragPtr.p->prevUndoposition = regPagePtr.p->word32[ZPAGEZERO_PREV_UNDOP];
- regFragPtr.p->noOfStoredOverPages = regPagePtr.p->word32[ZPAGEZERO_NO_OVER_PAGE];
- regFragPtr.p->noStoredPages = regPagePtr.p->word32[ZPAGEZERO_NO_PAGES];
- regFragPtr.p->dirsize = regPagePtr.p->word32[ZPAGEZERO_DIRSIZE];
- regFragPtr.p->expandCounter = regPagePtr.p->word32[ZPAGEZERO_EXPCOUNTER];
- regFragPtr.p->slack = regPagePtr.p->word32[ZPAGEZERO_SLACK];
- regFragPtr.p->hashcheckbit = regPagePtr.p->word32[ZPAGEZERO_HASHCHECKBIT];
- regFragPtr.p->k = regPagePtr.p->word32[ZPAGEZERO_K];
- regFragPtr.p->lhfragbits = regPagePtr.p->word32[ZPAGEZERO_LHFRAGBITS];
- regFragPtr.p->lhdirbits = regPagePtr.p->word32[ZPAGEZERO_LHDIRBITS];
- regFragPtr.p->localkeylen = regPagePtr.p->word32[ZPAGEZERO_LOCALKEYLEN];
- regFragPtr.p->maxp = regPagePtr.p->word32[ZPAGEZERO_MAXP];
- regFragPtr.p->maxloadfactor = regPagePtr.p->word32[ZPAGEZERO_MAXLOADFACTOR];
- regFragPtr.p->minloadfactor = regPagePtr.p->word32[ZPAGEZERO_MINLOADFACTOR];
- regFragPtr.p->myfid = regPagePtr.p->word32[ZPAGEZERO_MYFID];
- regFragPtr.p->lastOverIndex = regPagePtr.p->word32[ZPAGEZERO_LAST_OVER_INDEX];
- regFragPtr.p->nodetype = regPagePtr.p->word32[ZPAGEZERO_NODETYPE];
- regFragPtr.p->p = regPagePtr.p->word32[ZPAGEZERO_P];
- regFragPtr.p->elementLength = regPagePtr.p->word32[ZPAGEZERO_ELEMENT_LENGTH];
- regFragPtr.p->keyLength = regPagePtr.p->word32[ZPAGEZERO_KEY_LENGTH];
- regFragPtr.p->slackCheck = regPagePtr.p->word32[ZPAGEZERO_SLACK_CHECK];
- regFragPtr.p->loadingFlag = ZTRUE;
- }//Dbacc::initFragSr()
- void Dbacc::initFragPageZero(FragmentrecPtr regFragPtr, Page8Ptr regPagePtr)
- {
- //------------------------------------------------------------------
- // PREV_UNDOP, NEXT_UNDO_FILE, NO_OVER_PAGE, NO_PAGES
- // is set at end of copy phase
- //------------------------------------------------------------------
- regPagePtr.p->word32[ZPAGEZERO_DIRSIZE] = regFragPtr.p->dirsize;
- regPagePtr.p->word32[ZPAGEZERO_EXPCOUNTER] = regFragPtr.p->expandCounter;
- regPagePtr.p->word32[ZPAGEZERO_SLACK] = regFragPtr.p->slack;
- regPagePtr.p->word32[ZPAGEZERO_HASHCHECKBIT] = regFragPtr.p->hashcheckbit;
- regPagePtr.p->word32[ZPAGEZERO_K] = regFragPtr.p->k;
- regPagePtr.p->word32[ZPAGEZERO_LHFRAGBITS] = regFragPtr.p->lhfragbits;
- regPagePtr.p->word32[ZPAGEZERO_LHDIRBITS] = regFragPtr.p->lhdirbits;
- regPagePtr.p->word32[ZPAGEZERO_LOCALKEYLEN] = regFragPtr.p->localkeylen;
- regPagePtr.p->word32[ZPAGEZERO_MAXP] = regFragPtr.p->maxp;
- regPagePtr.p->word32[ZPAGEZERO_MAXLOADFACTOR] = regFragPtr.p->maxloadfactor;
- regPagePtr.p->word32[ZPAGEZERO_MINLOADFACTOR] = regFragPtr.p->minloadfactor;
- regPagePtr.p->word32[ZPAGEZERO_MYFID] = regFragPtr.p->myfid;
- regPagePtr.p->word32[ZPAGEZERO_LAST_OVER_INDEX] = regFragPtr.p->lastOverIndex;
- regPagePtr.p->word32[ZPAGEZERO_NODETYPE] = regFragPtr.p->nodetype;
- regPagePtr.p->word32[ZPAGEZERO_P] = regFragPtr.p->p;
- regPagePtr.p->word32[ZPAGEZERO_ELEMENT_LENGTH] = regFragPtr.p->elementLength;
- regPagePtr.p->word32[ZPAGEZERO_KEY_LENGTH] = regFragPtr.p->keyLength;
- regPagePtr.p->word32[ZPAGEZERO_SLACK_CHECK] = regFragPtr.p->slackCheck;
- }//Dbacc::initFragPageZero()
- void Dbacc::initRootFragPageZero(RootfragmentrecPtr rootPtr, Page8Ptr regPagePtr)
- {
- regPagePtr.p->word32[ZPAGEZERO_TABID] = rootPtr.p->mytabptr;
- regPagePtr.p->word32[ZPAGEZERO_FRAGID0] = rootPtr.p->fragmentid[0];
- regPagePtr.p->word32[ZPAGEZERO_FRAGID1] = rootPtr.p->fragmentid[1];
- regPagePtr.p->word32[ZPAGEZERO_HASH_CHECK] = rootPtr.p->roothashcheck;
- regPagePtr.p->word32[ZPAGEZERO_NO_OF_ELEMENTS] = rootPtr.p->noOfElements;
- }//Dbacc::initRootFragPageZero()
- void Dbacc::initRootFragSr(RootfragmentrecPtr rootPtr, Page8Ptr regPagePtr)
- {
- rootPtr.p->roothashcheck = regPagePtr.p->word32[ZPAGEZERO_HASH_CHECK];
- rootPtr.p->noOfElements = regPagePtr.p->word32[ZPAGEZERO_NO_OF_ELEMENTS];
- }//Dbacc::initRootFragSr()
- /* ******************--------------------------------------------------------------- */
- /* ACC_SRREQ SYSTEM RESTART OF A LOCAL CHECK POINT */
- /* SENDER: LQH, LEVEL B */
- /* ENTER ACC_SRREQ WITH */
- /* LCP_CONNECTPTR, OPERATION RECORD PTR */
- /* TMP2, LQH'S LOCAL FRAG CHECK VALUE */
- /* TFID, LOCAL FRAG ID */
- /* TMP1, LOCAL CHECKPOINT ID */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* ACC_SRREQ PERFORM A LOCAL CHECK POINT */
- /* ******************------------------------------+ */
- /* SENDER: LQH, LEVEL B */
- void Dbacc::execACC_SRREQ(Signal* signal)
- {
- Page8Ptr asrPageidptr;
- jamEntry();
- lcpConnectptr.i = signal->theData[0];
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- Uint32 lqhPtr = signal->theData[1];
- Uint32 fragId = signal->theData[2];
- Uint32 lcpId = signal->theData[3];
- tresult = 0;
- ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
- rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- if (rootfragrecptr.p->fragmentid[0] == fragId) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- } else {
- ndbrequire(rootfragrecptr.p->fragmentid[1] == fragId);
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- }//if
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- fragrecptr.p->lcpLqhPtr = lqhPtr;
- fragrecptr.p->localCheckpId = lcpId;
- asrPageidptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(asrPageidptr, cpagesize, page8);
- ndbrequire(asrPageidptr.p->word32[ZPAGEZERO_TABID] == rootfragrecptr.p->mytabptr);
- ndbrequire(asrPageidptr.p->word32[ZPAGEZERO_FRAGID0] == rootfragrecptr.p->fragmentid[0]);
- ndbrequire(asrPageidptr.p->word32[ZPAGEZERO_FRAGID1] == rootfragrecptr.p->fragmentid[1]);
- initRootFragSr(rootfragrecptr, asrPageidptr);
- initFragSr(fragrecptr, asrPageidptr);
- for (Uint32 i = 0; i < ZMAX_UNDO_VERSION; i++) {
- jam();
- if (csrVersList[i] != RNIL) {
- jam();
- srVersionPtr.i = csrVersList[i];
- ptrCheckGuard(srVersionPtr, csrVersionRecSize, srVersionRec);
- if (fragrecptr.p->localCheckpId == srVersionPtr.p->checkPointId) {
- jam();
- ndbrequire(srVersionPtr.p->checkPointId == asrPageidptr.p->word32[ZPAGEZERO_NEXT_UNDO_FILE]);
- /*--------------------------------------------------------------------------------*/
- /* SINCE -1 IS THE END OF LOG CODE WE MUST TREAT THIS CODE WITH CARE. WHEN */
- /* COMPARING IT IS LARGER THAN EVERYTHING ELSE BUT SHOULD BE TREATED AS THE */
- /* SMALLEST POSSIBLE VALUE, MEANING EMPTY. */
- /*--------------------------------------------------------------------------------*/
- if (fragrecptr.p->prevUndoposition != cminusOne) {
- if (srVersionPtr.p->prevAddress < fragrecptr.p->prevUndoposition) {
- jam();
- srVersionPtr.p->prevAddress = fragrecptr.p->prevUndoposition;
- } else if (srVersionPtr.p->prevAddress == cminusOne) {
- jam();
- srVersionPtr.p->prevAddress = fragrecptr.p->prevUndoposition;
- }//if
- }//if
- srAllocPage0011Lab(signal);
- return;
- }//if
- } else {
- jam();
- seizeSrVerRec(signal);
- srVersionPtr.p->checkPointId = fragrecptr.p->localCheckpId;
- srVersionPtr.p->prevAddress = fragrecptr.p->prevUndoposition;
- csrVersList[i] = srVersionPtr.i;
- srAllocPage0011Lab(signal);
- return;
- }//if
- }//for
- ndbrequire(false);
- }//Dbacc::execACC_SRREQ()
- void
- Dbacc::releaseLogicalPage(Fragmentrec * fragP, Uint32 logicalPageId){
- Ptr<struct DirRange> dirRangePtr;
- dirRangePtr.i = fragP->directory;
- ptrCheckGuard(dirRangePtr, cdirrangesize, dirRange);
- const Uint32 lp1 = logicalPageId >> 8;
- const Uint32 lp2 = logicalPageId & 0xFF;
- ndbrequire(lp1 < 256);
- Ptr<struct Directoryarray> dirArrPtr;
- dirArrPtr.i = dirRangePtr.p->dirArray[lp1];
- ptrCheckGuard(dirArrPtr, cdirarraysize, directoryarray);
- const Uint32 physicalPageId = dirArrPtr.p->pagep[lp2];
-
- rpPageptr.i = physicalPageId;
- ptrCheckGuard(rpPageptr, cpagesize, page8);
- releasePage(0);
- dirArrPtr.p->pagep[lp2] = RNIL;
- }
- void Dbacc::srAllocPage0011Lab(Signal* signal)
- {
- releaseLogicalPage(fragrecptr.p, 0);
- #if JONAS
- ndbrequire(cfirstfreeDirrange != RNIL);
- seizeDirrange(signal);
- fragrecptr.p->directory = newDirRangePtr.i;
- ndbrequire(cfirstfreeDirrange != RNIL);
- seizeDirrange(signal);
- fragrecptr.p->overflowdir = newDirRangePtr.i;
- seizeDirectory(signal);
- ndbrequire(tresult < ZLIMIT_OF_ERROR);
- newDirRangePtr.p->dirArray[0] = sdDirptr.i;
- #endif
- fragrecptr.p->nextAllocPage = 0;
- fragrecptr.p->fragState = SR_READ_PAGES;
- srReadPagesLab(signal);
- return;
- }//Dbacc::srAllocPage0011Lab()
- void Dbacc::srReadPagesLab(Signal* signal)
- {
- if (fragrecptr.p->nextAllocPage >= fragrecptr.p->noStoredPages) {
- /*--------------------------------------------------------------------------------*/
- /* WE HAVE NOW READ ALL NORMAL PAGES FROM THE FILE. */
- /*--------------------------------------------------------------------------------*/
- if (fragrecptr.p->nextAllocPage == fragrecptr.p->dirsize) {
- jam();
- /*--------------------------------------------------------------------------------*/
- /* WE HAVE NOW READ ALL NORMAL PAGES AND ALLOCATED ALL THE NEEDED PAGES. */
- /*--------------------------------------------------------------------------------*/
- fragrecptr.p->nextAllocPage = 0; /* THE NEXT OVER FLOW PAGE WHICH WILL BE READ */
- fragrecptr.p->fragState = SR_READ_OVER_PAGES;
- srReadOverPagesLab(signal);
- } else {
- ndbrequire(fragrecptr.p->nextAllocPage < fragrecptr.p->dirsize);
- jam();
- /*--------------------------------------------------------------------------------*/
- /* WE NEEDED TO ALLOCATE PAGES THAT WERE DEALLOCATED DURING THE LOCAL */
- /* CHECKPOINT. */
- /* ALLOCATE THE PAGE AND INITIALISE IT. THEN WE INSERT A REAL-TIME BREAK. */
- /*--------------------------------------------------------------------------------*/
- seizePage(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- tipPageId = fragrecptr.p->nextAllocPage;
- inpPageptr.i = spPageptr.i;
- ptrCheckGuard(inpPageptr, cpagesize, page8);
- initPage(signal);
- fragrecptr.p->noOfExpectedPages = 1;
- fragrecptr.p->datapages[0] = spPageptr.i;
- signal->theData[0] = ZSR_READ_PAGES_ALLOC;
- signal->theData[1] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_CONTINUEB, signal, 2, JBB);
- }//if
- return;
- }//if
- Uint32 limitLoop;
- if ((fragrecptr.p->noStoredPages - fragrecptr.p->nextAllocPage) < ZWRITEPAGESIZE) {
- jam();
- limitLoop = fragrecptr.p->noStoredPages - fragrecptr.p->nextAllocPage;
- } else {
- jam();
- limitLoop = ZWRITEPAGESIZE;
- }//if
- ndbrequire(limitLoop <= 8);
- for (Uint32 i = 0; i < limitLoop; i++) {
- jam();
- seizePage(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- fragrecptr.p->datapages[i] = spPageptr.i;
- signal->theData[i + 6] = spPageptr.i;
- }//for
- signal->theData[limitLoop + 6] = fragrecptr.p->activeDataFilePage;
- fragrecptr.p->noOfExpectedPages = limitLoop;
- /* -----------------SEND READ PAGES SIGNAL TO THE FILE MANAGER --------- */
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- fsConnectptr.p->fsState = WAIT_READ_DATA;
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 2;
- /* FLAG = LIST MEM PAGES, RANGE OF FILE PAGES */
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = fragrecptr.p->noOfExpectedPages;
- sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 15, JBA);
- return;
- }//Dbacc::srReadPagesLab()
- void Dbacc::storeDataPageInDirectoryLab(Signal* signal)
- {
- fragrecptr.p->activeDataFilePage += fragrecptr.p->noOfExpectedPages;
- srReadPagesAllocLab(signal);
- return;
- }//Dbacc::storeDataPageInDirectoryLab()
- void Dbacc::srReadPagesAllocLab(Signal* signal)
- {
- DirRangePtr srpDirRangePtr;
- DirectoryarrayPtr srpDirptr;
- DirectoryarrayPtr srpOverflowDirptr;
- Page8Ptr srpPageidptr;
- if (fragrecptr.p->fragState == SR_READ_PAGES) {
- jam();
- for (Uint32 i = 0; i < fragrecptr.p->noOfExpectedPages; i++) {
- jam();
- tmpP = fragrecptr.p->nextAllocPage;
- srpDirRangePtr.i = fragrecptr.p->directory;
- tmpP2 = tmpP >> 8;
- tmp = tmpP & 0xff;
- ptrCheckGuard(srpDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- if (srpDirRangePtr.p->dirArray[tmpP2] == RNIL) {
- seizeDirectory(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- srpDirptr.i = sdDirptr.i;
- srpDirRangePtr.p->dirArray[tmpP2] = srpDirptr.i;
- } else {
- jam();
- srpDirptr.i = srpDirRangePtr.p->dirArray[tmpP2];
- }//if
- ptrCheckGuard(srpDirptr, cdirarraysize, directoryarray);
- arrGuard(i, 8);
- srpDirptr.p->pagep[tmp] = fragrecptr.p->datapages[i];
- srpPageidptr.i = fragrecptr.p->datapages[i];
- ptrCheckGuard(srpPageidptr, cpagesize, page8);
- ndbrequire(srpPageidptr.p->word32[ZPOS_PAGE_ID] == fragrecptr.p->nextAllocPage);
- ndbrequire(((srpPageidptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) == 0);
- ccoPageptr.p = srpPageidptr.p;
- checksumControl(signal, (Uint32)1);
- if (tresult > 0) {
- jam();
- return; // We will crash through a DEBUG_SIG
- }//if
- dbgWord32(srpPageidptr, ZPOS_OVERFLOWREC, RNIL);
- srpPageidptr.p->word32[ZPOS_OVERFLOWREC] = RNIL;
- fragrecptr.p->datapages[i] = RNIL;
- fragrecptr.p->nextAllocPage++;
- }//for
- srReadPagesLab(signal);
- return;
- } else {
- ndbrequire(fragrecptr.p->fragState == SR_READ_OVER_PAGES);
- for (Uint32 i = 0; i < fragrecptr.p->noOfExpectedPages; i++) {
- jam();
- arrGuard(i, 8);
- srpPageidptr.i = fragrecptr.p->datapages[i];
- ptrCheckGuard(srpPageidptr, cpagesize, page8);
- tmpP = srpPageidptr.p->word32[ZPOS_PAGE_ID]; /* DIR INDEX OF THE OVERFLOW PAGE */
- /*--------------------------------------------------------------------------------*/
- /* IT IS POSSIBLE THAT WE HAVE LOGICAL PAGES WHICH ARE NOT PART OF THE LOCAL*/
- /* CHECKPOINT. THUS WE USE THE LOGICAL PAGE ID FROM THE PAGE HERE. */
- /*--------------------------------------------------------------------------------*/
- srpDirRangePtr.i = fragrecptr.p->overflowdir;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- ptrCheckGuard(srpDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- if (srpDirRangePtr.p->dirArray[tmpP2] == RNIL) {
- jam();
- seizeDirectory(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- srpDirRangePtr.p->dirArray[tmpP2] = sdDirptr.i;
- }//if
- srpOverflowDirptr.i = srpDirRangePtr.p->dirArray[tmpP2];
- ndbrequire(((srpPageidptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) != 0);
- ndbrequire(((srpPageidptr.p->word32[ZPOS_PAGE_TYPE] >> ZPOS_PAGE_TYPE_BIT) & 3) != 3);
- ptrCheckGuard(srpOverflowDirptr, cdirarraysize, directoryarray);
- ndbrequire(srpOverflowDirptr.p->pagep[tmpP] == RNIL);
- srpOverflowDirptr.p->pagep[tmpP] = srpPageidptr.i;
- ccoPageptr.p = srpPageidptr.p;
- checksumControl(signal, (Uint32)1);
- ndbrequire(tresult == 0);
- dbgWord32(srpPageidptr, ZPOS_OVERFLOWREC, RNIL);
- srpPageidptr.p->word32[ZPOS_OVERFLOWREC] = RNIL;
- fragrecptr.p->nextAllocPage++;
- }//for
- srReadOverPagesLab(signal);
- return;
- }//if
- }//Dbacc::srReadPagesAllocLab()
- void Dbacc::srReadOverPagesLab(Signal* signal)
- {
- if (fragrecptr.p->nextAllocPage >= fragrecptr.p->noOfStoredOverPages) {
- fragrecptr.p->nextAllocPage = 0;
- if (fragrecptr.p->prevUndoposition == cminusOne) {
- jam();
- /* ************************ */
- /* ACC_OVER_REC */
- /* ************************ */
- /*--------------------------------------------------------------------------------*/
- /* UPDATE FREE LIST OF OVERFLOW PAGES AS PART OF SYSTEM RESTART AFTER */
- /* READING PAGES AND EXECUTING THE UNDO LOG. */
- /*--------------------------------------------------------------------------------*/
- signal->theData[0] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_OVER_REC, signal, 1, JBB);
- } else {
- jam();
- srCloseDataFileLab(signal);
- }//if
- return;
- }//if
- Uint32 limitLoop;
- if ((fragrecptr.p->noOfStoredOverPages - fragrecptr.p->nextAllocPage) < ZWRITEPAGESIZE) {
- jam();
- limitLoop = fragrecptr.p->noOfStoredOverPages - fragrecptr.p->nextAllocPage;
- } else {
- jam();
- limitLoop = ZWRITEPAGESIZE;
- }//if
- ndbrequire(limitLoop <= 8);
- for (Uint32 i = 0; i < limitLoop; i++) {
- jam();
- seizePage(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
- fragrecptr.p->datapages[i] = spPageptr.i;
- signal->theData[i + 6] = spPageptr.i;
- }//for
- fragrecptr.p->noOfExpectedPages = limitLoop;
- signal->theData[limitLoop + 6] = fragrecptr.p->activeDataFilePage;
- /* -----------------SEND READ PAGES SIGNAL TO THE FILE MANAGER --------- */
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- fsConnectptr.p->fsState = WAIT_READ_DATA;
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 2;
- signal->theData[4] = ZPAGE8_BASE_ADD;
- signal->theData[5] = fragrecptr.p->noOfExpectedPages;
- sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 15, JBA);
- return;
- }//Dbacc::srReadOverPagesLab()
- void Dbacc::srCloseDataFileLab(Signal* signal)
- {
- fsConnectptr.i = fragrecptr.p->fsConnPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- fsConnectptr.p->fsState = SR_CLOSE_DATA;
- /* ************************ */
- /* FSCLOSEREQ */
- /* ************************ */
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 0;
- sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, 4, JBA);
- return;
- }//Dbacc::srCloseDataFileLab()
- /* ************************ */
- /* ACC_SRCONF */
- /* ************************ */
- void Dbacc::sendaccSrconfLab(Signal* signal)
- {
- fragrecptr.i = fsConnectptr.p->fragrecPtr;
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- releaseFsConnRec(signal);
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
- ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
- fragrecptr.p->fragState = ACTIVEFRAG;
- fragrecptr.p->fsConnPtr = RNIL;
- for (Uint32 i = 0; i < ZWRITEPAGESIZE; i++) {
- fragrecptr.p->datapages[i] = RNIL;
- }//for
- rlpPageptr.i = fragrecptr.p->zeroPagePtr;
- ptrCheckGuard(rlpPageptr, cpagesize, page8);
- releaseLcpPage(signal);
- fragrecptr.p->zeroPagePtr = RNIL;
- signal->theData[0] = fragrecptr.p->lcpLqhPtr;
- sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_SRCONF, signal, 1, JBB);
- lcpConnectptr.p->noOfLcpConf++;
- if (lcpConnectptr.p->noOfLcpConf == 2) {
- jam();
- releaseLcpConnectRec(signal);
- rootfragrecptr.p->lcpPtr = RNIL;
- rootfragrecptr.p->rootState = ACTIVEROOT;
- }//if
- return;
- }//Dbacc::sendaccSrconfLab()
- /* --------------------------------------------------------------------------------- */
- /* CHECKSUM_CONTROL */
- /* INPUT: CCO_PAGEPTR */
- /* OUTPUT: TRESULT */
- /* */
- /* CHECK THAT CHECKSUM IN PAGE IS CORRECT TO ENSURE THAT NO ONE HAS CORRUPTED */
- /* THE PAGE INFORMATION. WHEN CALCULATING THE CHECKSUM WE REMOVE THE CHECKSUM */
- /* ITSELF FROM THE CHECKSUM BY XOR'ING THE CHECKSUM TWICE. WHEN CALCULATING */
- /* THE CHECKSUM THE CHECKSUM WORD IS ZERO WHICH MEANS NO CHANGE FROM XOR'ING. */
- /* --------------------------------------------------------------------------------- */
- void Dbacc::checksumControl(Signal* signal, Uint32 checkPage)
- {
- Uint32 Tchs;
- Uint32 tccoIndex;
- Uint32 Ti;
- Uint32 Tmp1;
- Uint32 Tmp2;
- Uint32 Tmp3;
- Uint32 Tmp4;
- Uint32 Tlimit;
- Tchs = 0;
- for (Ti = 0; Ti < 32 ; Ti++) {
- Tlimit = 16 + (Ti << 6);
- for (tccoIndex = (Ti << 6); tccoIndex < Tlimit; tccoIndex ++) {
- Tmp1 = ccoPageptr.p->word32[tccoIndex];
- Tmp2 = ccoPageptr.p->word32[tccoIndex + 16];
- Tmp3 = ccoPageptr.p->word32[tccoIndex + 32];
- Tmp4 = ccoPageptr.p->word32[tccoIndex + 48];
- Tchs = Tchs ^ Tmp1;
- Tchs = Tchs ^ Tmp2;
- Tchs = Tchs ^ Tmp3;
- Tchs = Tchs ^ Tmp4;
- }//for
- }//for
- if (Tchs == 0) {
- tresult = 0;
- if (checkPage != 0) {
- jam();
- lcnCopyPageptr.p = ccoPageptr.p;
- srCheckPage(signal);
- }//if
- } else {
- tresult = 1;
- }//if
- if (tresult != 0) {
- jam();
- rootfragrecptr.i = fragrecptr.p->myroot;
- ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
- signal->theData[0] = RNIL;
- signal->theData[1] = rootfragrecptr.p->mytabptr;
- signal->theData[2] = fragrecptr.p->myfid;
- signal->theData[3] = ccoPageptr.p->word32[ZPOS_PAGE_ID];
- signal->theData[4] = tlupElemIndex;
- signal->theData[5] = ccoPageptr.p->word32[ZPOS_PAGE_TYPE];
- signal->theData[6] = tresult;
- sendSignal(cownBlockref, GSN_DEBUG_SIG, signal, 7, JBA);
- }//if
- }//Dbacc::checksumControl()
- /* ******************--------------------------------------------------------------- */
- /* START_RECREQ REQUEST TO START UNDO PROCESS */
- /* SENDER: LQH, LEVEL B */
- /* ENTER START_RECREQ WITH */
- /* CLQH_PTR, LQH CONNECTION PTR */
- /* CLQH_BLOCK_REF, LQH BLOCK REFERENCE */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* START_RECREQ REQUEST TO START UNDO PROCESS */
- /* ******************------------------------------+ */
- /* SENDER: LQH, LEVEL B */
- void Dbacc::execSTART_RECREQ(Signal* signal)
- {
- jamEntry();
- clqhPtr = signal->theData[0]; /* LQH CONNECTION PTR */
- clqhBlockRef = signal->theData[1]; /* LQH BLOCK REFERENCE */
- tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
- for (int i = 0; i < UndoHeader::ZNO_UNDORECORD_TYPES; i++)
- cSrUndoRecords[i] = 0;
- startUndoLab(signal);
- return;
- }//Dbacc::execSTART_RECREQ()
- void Dbacc::startUndoLab(Signal* signal)
- {
- cundoLogActive = ZTRUE;
- /* ----- OPEN UNDO FILES --------- */
- for (tmp = 0; tmp <= ZMAX_UNDO_VERSION - 1; tmp++) {
- jam();
- if (csrVersList[tmp] != RNIL) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* SELECT THE NEXT SYSTEM RESTART RECORD WHICH CONTAINS AN UNDO LOG */
- /* THAT NEEDS TO BE EXECUTED AND SET UP THE DATA TO EXECUTE IT. */
- /*---------------------------------------------------------------------------*/
- srVersionPtr.i = csrVersList[tmp];
- csrVersList[tmp] = RNIL;
- ptrCheckGuard(srVersionPtr, csrVersionRecSize, srVersionRec);
- cactiveUndoFilePage = srVersionPtr.p->prevAddress >> 13;
- cprevUndoaddress = srVersionPtr.p->prevAddress;
- cactiveCheckpId = srVersionPtr.p->checkPointId;
- releaseSrRec(signal);
- startActiveUndo(signal);
- return;
- }//if
- }//for
- // Send report of how many undo log records where executed
- signal->theData[0] = EventReport::UNDORecordsExecuted;
- signal->theData[1] = DBACC; // From block
- signal->theData[2] = 0; // Total records executed
- for (int i = 0; i < 10; i++){
- if (i < UndoHeader::ZNO_UNDORECORD_TYPES){
- signal->theData[i+3] = cSrUndoRecords[i];
- signal->theData[2] += cSrUndoRecords[i];
- }else{
- signal->theData[i+3] = 0;
- }
- }
- sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 12, JBB);
- /* ******************************< */
- /* START_RECCONF */
- /* ******************************< */
- /*---------------------------------------------------------------------------*/
- /* REPORT COMPLETION OF UNDO LOG EXECUTION. */
- /*---------------------------------------------------------------------------*/
- cundoLogActive = ZFALSE;
- signal->theData[0] = clqhPtr;
- sendSignal(clqhBlockRef, GSN_START_RECCONF, signal, 1, JBB);
- /* LQH CONNECTION PTR */
- return;
- }//Dbacc::startUndoLab()
- /*---------------------------------------------------------------------------*/
- /* START THE UNDO OF AN UNDO LOG FILE BY OPENING THE UNDO LOG FILE. */
- /*---------------------------------------------------------------------------*/
- void Dbacc::startActiveUndo(Signal* signal)
- {
- if (cprevUndoaddress == cminusOne) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THERE WAS NO UNDO LOG INFORMATION IN THIS LOG FILE. WE GET THE NEXT */
- /* OR REPORT COMPLETION. */
- /*---------------------------------------------------------------------------*/
- signal->theData[0] = ZSTART_UNDO;
- sendSignal(cownBlockref, GSN_CONTINUEB, signal, 1, JBB);
- } else {
- jam();
- /*---------------------------------------------------------------------------*/
- /* OPEN THE LOG FILE PERTAINING TO THIS UNDO LOG. */
- /*---------------------------------------------------------------------------*/
- if (cfsFirstfreeconnect == RNIL) {
- jam();
- sendSystemerror(signal);
- }//if
- seizeFsConnectRec(signal);
- cactiveSrFsPtr = fsConnectptr.i;
- fsConnectptr.p->fsState = OPEN_UNDO_FILE_SR;
- fsConnectptr.p->fsPart = 0;
- tmp1 = 1; /* FILE VERSION ? */
- tmp1 = (tmp1 << 8) + ZLOCALLOGFILE; /* .LOCLOG = 2 */
- tmp1 = (tmp1 << 8) + 4; /* ROOT DIRECTORY = D4 */
- tmp1 = (tmp1 << 8) + fsConnectptr.p->fsPart; /* P2 */
- tmp2 = 0x0; /* D7 DON'T CREATE , READ ONLY */
- /* DON'T TRUNCATE TO ZERO */
- /* ---FILE NAME "D4"/"DBACC"/LCP_CONNECTPTR:LOCAL_CHECK_PID/FS_CONNECTPTR:FS_PART".LOCLOG-- */
- /* ************************ */
- /* FSOPENREQ */
- /* ************************ */
- signal->theData[0] = cownBlockref;
- signal->theData[1] = fsConnectptr.i;
- signal->theData[2] = cminusOne; /* #FFFFFFFF */
- signal->theData[3] = cminusOne; /* #FFFFFFFF */
- signal->theData[4] = cactiveCheckpId; /* CHECKPOINT VERSION */
- signal->theData[5] = tmp1;
- signal->theData[6] = tmp2;
- sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
- }//if
- }//Dbacc::startActiveUndo()
- /* ------- READ A GROUP OF UNDO PAGES --------------- */
- void Dbacc::srStartUndoLab(Signal* signal)
- {
- /*---------------------------------------------------------------------------*/
- /* ALL LOG FILES HAVE BEEN OPENED. WE CAN NOW READ DATA FROM THE LAST */
- /* PAGE IN THE LAST LOG FILE AND BACKWARDS UNTIL WE REACH THE VERY */
- /* FIRST UNDO LOG RECORD. */
- /*---------------------------------------------------------------------------*/
- if (cactiveUndoFilePage >= ZWRITE_UNDOPAGESIZE) {
- jam();
- tmp1 = ZWRITE_UNDOPAGESIZE; /* NO OF READ UNDO PAGES */
- cactiveSrUndoPage = ZWRITE_UNDOPAGESIZE - 1; /* LAST PAGE */
- } else {
- jam();
- tmp1 = cactiveUndoFilePage + 1; /* NO OF READ UNDO PAGES */
- cactiveSrUndoPage = cactiveUndoFilePage;
- }//if
- fsConnectptr.i = cactiveSrFsPtr;
- ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
- signal->theData[0] = fsConnectptr.p->fsPtr;
- signal->theData[1] = cownBlockref;
- signal->theData[2] = fsConnectptr.i;
- signal->theData[3] = 0;
- /* FLAG = LIST MEM PAGES, LIST FILE PAGES */
- signal->theData[4] = ZUNDOPAGE_BASE_ADD;
- signal->theData[5] = tmp1;
- signal->theData[6] = 0;
- signal->theData[7] = (cactiveUndoFilePage - tmp1) + 1;
- signal->theData[8] = 1;
- signal->theData[9] = cactiveUndoFilePage;
- sendSignal(NDBFS_REF, GSN_FSREADREQ, signal, 10, JBA);
- if (tmp1 > cactiveUndoFilePage) {
- jam();
- /*---------------------------------------------------------------------------*/
- /* THIS IS THE LAST READ IN THIS LOG FILE. WE SET THE ACTIVE FILE */
- /* POINTER. IF IT IS THE FIRST WE SHOULD NEVER ATTEMPT ANY MORE READS */
- /* SINCE WE SHOULD ENCOUNTER A FIRST LOG RECORD WITH PREVIOUS PAGE ID */
- /* EQUAL TO RNIL. */
- /*---------------------------------------------------------------------------*/
- cactiveSrFsPtr = RNIL;
- fsConnectptr.p->fsState = READ_UNDO_PAGE_AND_CLOSE;
- } else {
- jam();
- /*---------------------------------------------------------------------------*/
- /* WE STILL HAVE MORE INFORMATION IN THIS LOG FILE. WE ONLY MOVE BACK */
- /* THE FILE PAGE. */
- /*---------------------------------------------------------------------------*/
- cactiveUndoFilePage = cactiveUndoFilePage - tmp1;
- fsConnectptr.p->fsState = READ_UNDO_PAGE;
- }//if
- return;
- }//Dbacc::srStartUndoLab()
- /* ------- DO UNDO ---------------------------*/
- /* ******************--------------------------------------------------------------- */
- /* NEXTOPERATION ORD FOR EXECUTION OF NEXT OP */
- /* ******************------------------------------+ */
- /* SENDER: ACC, LEVEL B */
- void Dbacc::execNEXTOPERATION(Signal* signal)
- {
- jamEntry();
- tresult = 0;
- srDoUndoLab(signal);
- return;
- }//Dbacc::execNEXTOPERATION()
- void Dbacc::srDoUndoLab(Signal* signal)
- {
- DirRangePtr souDirRangePtr;
- DirectoryarrayPtr souDirptr;
- Page8Ptr souPageidptr;
- Uint32 tundoPageindex;
- UndoHeader *undoHeaderPtr;
- Uint32 tmpindex;
- jam();
- undopageptr.i = cactiveSrUndoPage;
- ptrCheckGuard(undopageptr, cundopagesize, undopage);
- /*---------------------------------------------------------------------------*/
- /* LAYOUT OF AN UNDO LOG RECORD: */
- /* ***************************** */
- /* */
- /* |----------------------------------------------------| */
- /* | TABLE ID | */
- /* |----------------------------------------------------| */
- /* | ROOT FRAGMENT ID | */
- /* |----------------------------------------------------| */
- /* | LOCAL FRAGMENT ID | */
- /* |----------------------------------------------------| */
- /* | UNDO INFO LEN 14 b | TYPE 4 b | PAGE INDEX 14 b | */
- /* |----------------------------------------------------| */
- /* | INDEX INTO PAGE DIRECTORY (LOGICAL PAGE ID) | */
- /* |----------------------------------------------------| */
- /* | PREVIOUS UNDO LOG RECORD FOR THE FRAGMENT | */
- /* |----------------------------------------------------| */
- /* | PREVIOUS UNDO LOG RECORD FOR ALL FRAGMENTS | */
- /* |----------------------------------------------------| */
- /* | TYPE SPECIFIC PART | */
- /* |----------------------------------------------------| */
- /*---------------------------------------------------------------------------*/
- /*---------------------------------------------------------------------------*/
- /* SET THE PAGE POINTER. WE ONLY WORK WITH TWO PAGES IN THIS RESTART */
- /* ACTIVITY. GET THE PAGE POINTER AND THE PAGE INDEX TO READ FROM. */
- /*---------------------------------------------------------------------------*/
- tundoindex = cprevUndoaddress & ZUNDOPAGEINDEX_MASK; //0x1fff, 13 bits.
- undoHeaderPtr = (UndoHeader *) &undopageptr.p->undoword[tundoindex];
- tundoindex = tundoindex + ZUNDOHEADSIZE;
-
- /*------------------------------------------------------------------------*/
- /* READ TABLE ID AND ROOT FRAGMENT ID AND USE THIS TO GET ROOT RECORD. */
- /*------------------------------------------------------------------------*/
- arrGuard((tundoindex + 6), 8192);
- // TABLE ID
- tabptr.i = undoHeaderPtr->tableId;
- ptrCheckGuard(tabptr, ctablesize, tabrec);
- // ROOT FRAGMENT ID
- tfid = undoHeaderPtr->rootFragId;
- if (!getrootfragmentrec(signal, rootfragrecptr, tfid)) {
- jam();
- /*---------------------------------------------------------------------*/
- /* THE ROOT RECORD WAS NOT FOUND. OBVIOUSLY WE ARE NOT RESTARTING THIS */
- /* FRAGMENT. WE THUS IGNORE THIS LOG RECORD AND PROCEED WITH THE NEXT. */
- /*---------------------------------------------------------------------*/
- creadyUndoaddress = cprevUndoaddress;
- // PREVIOUS UNDO LOG RECORD FOR ALL FRAGMENTS
- cprevUndoaddress = undoHeaderPtr->prevUndoAddress;
- undoNext2Lab(signal);
- return;
- }//if
- /*-----------------------------------------------------------------------*/
- /* READ THE LOCAL FRAGMENT ID AND VERIFY THAT IT IS CORRECT. */
- /*-----------------------------------------------------------------------*/
- if (rootfragrecptr.p->fragmentid[0] == undoHeaderPtr->localFragId) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- } else {
- if (rootfragrecptr.p->fragmentid[1] == undoHeaderPtr->localFragId) {
- jam();
- fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- } else {
- jam();
- progError(__LINE__, 0, "Invalid local fragment id in undo log");
- return;
- }//if
- }//if
- /*------------------------------------------------------------------------*/
- /* READ UNDO INFO LENGTH, TYPE OF LOG RECORD AND PAGE INDEX WHERE TO */
- /* APPLY THIS LOG RECORD. ALSO STEP INDEX TO PREPARE READ OF LOGICAL */
- /* PAGE ID. SET TMPINDEX TO INDEX THE FIRST WORD IN THE TYPE SPECIFIC */
- /* PART. */
- /*------------------------------------------------------------------------*/
- // UNDO INFO LENGTH 14 b | TYPE 4 b | PAGE INDEX 14 b
- const Uint32 tmp1 = undoHeaderPtr->variousInfo;
- cundoinfolength = tmp1 >> 18;
- const Uint32 tpageType = (tmp1 >> 14) & 0xf;
- tundoPageindex = tmp1 & 0x3fff;
-
- // INDEX INTO PAGE DIRECTORY (LOGICAL PAGE ID)
- tmpP = undoHeaderPtr->logicalPageId ;
- tmpindex = tundoindex;
- arrGuard((tmpindex + cundoinfolength - 1), 8192);
- if (fragrecptr.p->localCheckpId != cactiveCheckpId) {
- jam();
- /*-----------------------------------------------------------------------*/
- /* THE FRAGMENT DID EXIST BUT IS NOT AFFECTED BY THIS UNDO LOG */
- /* EXECUTION. EITHER IT BELONGS TO ANOTHER OR IT IS CREATED AND ONLY IN */
- /* NEED OF EXECUTION OF REDO LOG RECORDS FROM LQH. */
- /*-----------------------------------------------------------------------*/
- creadyUndoaddress = cprevUndoaddress;
- // PREVIOUS UNDO LOG RECORD FOR ALL FRAGMENTS
- cprevUndoaddress = undoHeaderPtr->prevUndoAddress;
- undoNext2Lab(signal);
- return;
- }//if
- /*-----------------------------------------------------------------------*/
- /* VERIFY CONSISTENCY OF UNDO LOG RECORDS. */
- /*-----------------------------------------------------------------------*/
- ndbrequire(fragrecptr.p->prevUndoposition == cprevUndoaddress);
- cSrUndoRecords[tpageType]++;
- switch(tpageType){
- case UndoHeader::ZPAGE_INFO:{
- jam();
- /*----------------------------------------------------------------------*/
- /* WE HAVE TO UNDO UPDATES IN A NORMAL PAGE. GET THE PAGE POINTER BY */
- /* USING THE LOGICAL PAGE ID. THEN RESET THE OLD VALUE IN THE PAGE BY */
- /* USING THE OLD DATA WHICH IS STORED IN THIS UNDO LOG RECORD. */
- /*----------------------------------------------------------------------*/
- souDirRangePtr.i = fragrecptr.p->directory;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- ptrCheckGuard(souDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- souDirptr.i = souDirRangePtr.p->dirArray[tmpP2];
- ptrCheckGuard(souDirptr, cdirarraysize, directoryarray);
- souPageidptr.i = souDirptr.p->pagep[tmpP];
- ptrCheckGuard(souPageidptr, cpagesize, page8);
- Uint32 loopLimit = tundoPageindex + cundoinfolength;
- ndbrequire(loopLimit <= 2048);
- for (Uint32 tmp = tundoPageindex; tmp < loopLimit; tmp++) {
- dbgWord32(souPageidptr, tmp, undopageptr.p->undoword[tmpindex]);
- souPageidptr.p->word32[tmp] = undopageptr.p->undoword[tmpindex];
- tmpindex = tmpindex + 1;
- }//for
- break;
- }
-
- case UndoHeader::ZOVER_PAGE_INFO:{
- jam();
- /*----------------------------------------------------------------------*/
- /* WE HAVE TO UNDO UPDATES IN AN OVERFLOW PAGE. GET THE PAGE POINTER BY*/
- /* USING THE LOGICAL PAGE ID. THEN RESET THE OLD VALUE IN THE PAGE BY */
- /* USING THE OLD DATA WHICH IS STORED IN THIS UNDO LOG RECORD. */
- /*----------------------------------------------------------------------*/
- souDirRangePtr.i = fragrecptr.p->overflowdir;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- ptrCheckGuard(souDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- souDirptr.i = souDirRangePtr.p->dirArray[tmpP2];
- ptrCheckGuard(souDirptr, cdirarraysize, directoryarray);
- souPageidptr.i = souDirptr.p->pagep[tmpP];
- ptrCheckGuard(souPageidptr, cpagesize, page8);
- Uint32 loopLimit = tundoPageindex + cundoinfolength;
- ndbrequire(loopLimit <= 2048);
- for (Uint32 tmp = tundoPageindex; tmp < loopLimit; tmp++) {
- dbgWord32(souPageidptr, tmp, undopageptr.p->undoword[tmpindex]);
- souPageidptr.p->word32[tmp] = undopageptr.p->undoword[tmpindex];
- tmpindex = tmpindex + 1;
- }//for
- break;
- }
- case UndoHeader::ZUNDO_INSERT_LONG_KEY:{
- jam();
- /*---------------------------------------------------------------------*/
- /* WE WILL UNDO AN INSERT OF A LONG KEY. THIS IS PERFORMED BY DELETING */
- /* THE LONG KEY. */
- /*---------------------------------------------------------------------*/
- souDirRangePtr.i = fragrecptr.p->overflowdir;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- arrGuard(tmpP2, 256);
- ptrCheckGuard(souDirRangePtr, cdirrangesize, dirRange);
- souDirptr.i = souDirRangePtr.p->dirArray[tmpP2];
- ptrCheckGuard(souDirptr, cdirarraysize, directoryarray);
- dlkPageptr.i = souDirptr.p->pagep[tmpP];
- ptrCheckGuard(dlkPageptr, cpagesize, page8);
- tdlkLogicalPageIndex = tundoPageindex;
- deleteLongKey(signal);
- break;
- }
- case UndoHeader::ZUNDO_DELETE_LONG_KEY: {
- jam();
- /*----------------------------------------------------------------------*/
- /* WE WILL UNDO DELETE OF A LONG KEY. THIS IS PERFORMED BY INSERTING */
- /* IT AGAIN. */
- /*----------------------------------------------------------------------*/
- souDirRangePtr.i = fragrecptr.p->overflowdir;
- taslpDirIndex = tmpP;
- tmpP2 = tmpP >> 8;
- tmpP = tmpP & 0xff;
- ptrCheckGuard(souDirRangePtr, cdirrangesize, dirRange);
- arrGuard(tmpP2, 256);
- souDirptr.i = souDirRangePtr.p->dirArray[tmpP2];
- if(souDirptr.i == RNIL) {
- //----------------------------------------------------------------
- // Allocate a directory.
- //----------------------------------------------------------------
- jam();
- seizeDirectory(signal);
- if (tresult > ZLIMIT_OF_ERROR) {
- jam();
- sendSystemerror(signal);
- return;
- }
- souDirRangePtr.p->dirArray[tmpP2] = sdDirptr.i;
- souDirptr.i = souDirRangePtr.p->dirArray[tmpP2];
- }
- ptrCheckGuard(souDirptr, cdirarraysize, directoryarray);
- slkapPageptr.i = souDirptr.p->pagep[tmpP];
-
- if(slkapPageptr.i == RNIL) {
- //----------------------------------------------------------------
- // The delete operation was probably the last on the page and the
- // page was released and not written down to disk. We need to
- // allocate a page and put it in the same dirindex as it was in
- // before it was released.
- // This is because an eventual UNDO_INSERT on the same key in the
- // same LCP must be able to find the key and it has only the
- // dirindex to go on, the key itself is not saved on disk in a
- // UNDO_INSERT.
- //----------------------------------------------------------------
- jam();
- allocSpecificLongOverflowPage(signal);
- slkapPageptr.i = aslpPageptr.i;
- }
-
- ptrCheckGuard(slkapPageptr, cpagesize, page8);
- seizePage(signal);
- ndbrequire(tresult <= ZLIMIT_OF_ERROR);
-
- slkapCopyPageptr = spPageptr;
- ndbrequire(cundoinfolength <= 2048);
-
- for (Uint32 tmp = 0; tmp < cundoinfolength; tmp++) {
- dbgWord32(slkapCopyPageptr, tmp, undopageptr.p->undoword[tmpindex]);
- slkapCopyPageptr.p->word32[tmp] = undopageptr.p->undoword[tmpindex];
- tmpindex = tmpindex + 1;
- }//for
- jam();
- //----------------------------------------------------------------
- // We must store the key at the same place it was deleted from.
- // This is because an eventual UNDO_INSERT on the same key in the
- // same LCP must be able to find the key and it has only the index
- // information stored on disk to go on, the key itself is not
- // saved on disk in an UNDO_INSERT.
- //----------------------------------------------------------------
- tslkapKeyLen = cundoinfolength;
- tslkapPageIndex = tundoPageindex;
- storeLongKeysAtPos(signal);
-
- rpPageptr = slkapCopyPageptr;
- releasePage(signal);
- break;
- }
- case UndoHeader::ZOP_INFO: {
- jam();
- /*---------------------------------------------------------------------*/
- /* AN OPERATION WAS ACTIVE WHEN LOCAL CHECKPOINT WAS EXECUTED. WE NEED */
- /* TO RESET THE LOCKS IT HAS SET. IF THE OPERATION WAS AN INSERT OR */
- /* THE ELEMENT WAS MARKED AS DISSAPEARED IT WILL ALSO BE REMOVED */
- /* FROM THE PAGE */
- /* */
- /* BEGIN BY SEARCHING AFTER THE ELEMENT, WHEN FOUND UNDO THE */
- /* CHANGES ON THE ELEMENT HEADER. IF IT WAS AN INSERT OPERATION OR */
- /* MARKED AS DISSAPEARED PROCEED BY REMOVING THE ELEMENT. */
- /*---------------------------------------------------------------------*/
- seizeOpRec(signal);
- // Initialise the opRec
- operationRecPtr.p->transId1 = 0;
- operationRecPtr.p->transId2 = RNIL;
- operationRecPtr.p->transactionstate = ACTIVE;
- operationRecPtr.p->commitDeleteCheckFlag = ZFALSE;
- operationRecPtr.p->lockMode = 0;
- operationRecPtr.p->dirtyRead = 0;
- operationRecPtr.p->nodeType = 0;
- operationRecPtr.p->fid = fragrecptr.p->myfid;
- operationRecPtr.p->nextParallelQue = RNIL;
- operationRecPtr.p->prevParallelQue = RNIL;
- operationRecPtr.p->nextQueOp = RNIL;
- operationRecPtr.p->prevQueOp = RNIL;
- operationRecPtr.p->nextSerialQue = RNIL;
- operationRecPtr.p->prevSerialQue = RNIL;
- operationRecPtr.p->elementPage = RNIL;
- operationRecPtr.p->keyinfoPage = RNIL;
- operationRecPtr.p->insertIsDone = ZFALSE;
- operationRecPtr.p->lockOwner = ZFALSE;
- operationRecPtr.p->elementIsDisappeared = ZFALSE;
- operationRecPtr.p->insertDeleteLen = fragrecptr.p->elementLength;
- operationRecPtr.p->longPagePtr = RNIL;
- operationRecPtr.p->longKeyPageIndex = RNIL;
- operationRecPtr.p->scanRecPtr = RNIL;
- operationRecPtr.p->isAccLockReq = ZFALSE;
- // Read operation values from undo page
- operationRecPtr.p->operation = undopageptr.p->undoword[tmpindex];
- tmpindex++;
- operationRecPtr.p->hashValue = undopageptr.p->undoword[tmpindex];
- tmpindex++;
- const Uint32 tkeylen = undopageptr.p->undoword[tmpindex];
- tmpindex++;
- operationRecPtr.p->tupkeylen = tkeylen;
- operationRecPtr.p->fragptr = fragrecptr.i;
- ndbrequire((fragrecptr.p->keyLength == 0) ||
- ((fragrecptr.p->keyLength != 0) &&
- (fragrecptr.p->keyLength == tkeylen)));
-
- // Read keydata from undo page
- for (Uint32 tmp = 0; tmp < tkeylen; tmp++) {
- signal->theData[7+tmp] = undopageptr.p->undoword[tmpindex];
- tmpindex = tmpindex + 1;
- }//for
- arrGuard((tmpindex - 1), 8192);
- getElement(signal);
- if (tgeResult != ZTRUE) {
- jam();
- signal->theData[0] = RNIL;
- signal->theData[1] = tabptr.i;
- signal->theData[2] = cactiveCheckpId;
- signal->theData[3] = cprevUndoaddress;
- signal->theData[4] = operationRecPtr.p->operation;
- signal->theData[5] = operationRecPtr.p->hashValue;
- signal->theData[6] = operationRecPtr.p->tupkeylen;
- sendSignal(cownBlockref, GSN_DEBUG_SIG, signal, 11, JBA);
- return;
- }//if
-
- operationRecPtr.p->elementPage = gePageptr.i;
- operationRecPtr.p->elementContainer = tgeContainerptr;
- operationRecPtr.p->elementPointer = tgeElementptr;
- operationRecPtr.p->elementIsforward = tgeForward;
-
- commitdelete(signal, true);
- releaseOpRec(signal);
- break;
- }
- default:
- jam();
- progError(__LINE__, 0, "Invalid pagetype in undo log");
- break;
- }//switch(tpageType)
- /*----------------------------------------------------------------------*/
- /* READ THE PAGE ID AND THE PAGE INDEX OF THE PREVIOUS UNDO LOG RECORD */
- /* FOR THIS FRAGMENT. */
- /*----------------------------------------------------------------------*/
- fragrecptr.p->prevUndoposition = undoHeaderPtr->prevUndoAddressForThisFrag;
- /*----------------------------------------------------------------------*/
- /* READ THE PAGE ID AND THE PAGE INDEX OF THE PREVIOUS UNDO LOG RECORD */
- /* FOR THIS UNDO LOG. */
- /*----------------------------------------------------------------------*/
- creadyUndoaddress = cprevUndoaddress;
- cprevUndoaddress = undoHeaderPtr->prevUndoAddress;
- if (fragrecptr.p->prevUndoposition == cminusOne) {
- jam();
- /*---------------------------------------------------------------------*/
- /* WE HAVE NOW EXECUTED ALL UNDO LOG RECORDS FOR THIS FRAGMENT. WE */
- /* NOW NEED TO UPDATE THE FREE LIST OF OVERFLOW PAGES. */
- /*---------------------------------------------------------------------*/
- ndbrequire(fragrecptr.p->nextAllocPage == 0);
- signal->theData[0] = fragrecptr.i;
- sendSignal(cownBlockref, GSN_ACC_OVER_REC, signal, 1, JBB);
- return;
- }//if
- undoNext2Lab(signal);
- return;
- }//Dbacc::srDoUndoLab()
- // Picks the next step of undo log execution after one undo log record has
- // been handled. Exactly one of the following happens:
- //   1. cprevUndoaddress == cminusOne: CONTINUEB(ZSTART_UNDO) is sent to
- //      this block.
- //   2. The next record lies on another page and cactiveSrUndoPage == 0:
- //      srStartUndoLab() is called.
- //   3. Otherwise NEXTOPERATION is sent to this block; when the page changes,
- //      cactiveSrUndoPage is decremented first.
- // signal: signal object reused for whichever signal is sent.
- void Dbacc::undoNext2Lab(Signal* signal)
- {
-   if (cprevUndoaddress == cminusOne) {
-     jam();
-     // This undo log has been executed to completion. It is now time to take
-     // care of the next undo log or to report completion of undo log execution.
-     signal->theData[0] = ZSTART_UNDO;
-     sendSignal(cownBlockref, GSN_CONTINUEB, signal, 1, JBB);
-     return;
-   }//if
-   // Two undo addresses are on the same page when they are equal after the
-   // low 13 bits have been shifted out.
-   const bool changingPage =
-     (creadyUndoaddress >> 13) != (cprevUndoaddress >> 13);
-   if (changingPage && (cactiveSrUndoPage == 0)) {
-     jam();
-     // All undo log information in the currently read pages has been executed.
-     // More must be read from file since the first log record of the log file
-     // has not been found yet.
-     srStartUndoLab(signal);
-     return;
-   }//if
-   if (changingPage) {
-     jam();
-     // Another page has already been read and is waiting to be executed.
-     cactiveSrUndoPage--;
-   }//if
-   // Real-time break: continue through a NEXTOPERATION signal to ourselves.
-   // theData[0] is not written here; the word already in the signal is sent.
-   sendSignal(cownBlockref, GSN_NEXTOPERATION, signal, 1, JBB);
- }//Dbacc::undoNext2Lab()
- /*-----------------------------------------------------------------------------------*/
- /* AFTER COMPLETING THE READING OF DATA PAGES FROM DISK AND EXECUTING THE UNDO */
- /* LOG WE ARE READY TO UPDATE THE FREE LIST OF OVERFLOW PAGES. THIS LIST MUST */
- /* BE BUILT AGAIN SINCE IT IS NOT CHECKPOINTED. WHEN THE PAGES ARE ALLOCATED */
- /* THEY ARE NOT PART OF ANY LIST. PAGES CAN EITHER BE PUT IN FREE LIST, NOT */
- /* IN FREE LIST OR BE PUT INTO LIST OF LONG KEY PAGES. */
- /*-----------------------------------------------------------------------------------*/
- // Signal handler that walks the overflow directory of one fragment, one
- // directory index at a time starting at nextAllocPage, and registers every
- // index or page in the appropriate list.
- // signal->theData[0] carries the index of the fragment record.
- // At most ZNO_OF_OP_PER_SIGNAL indexes are handled per invocation; if the
- // loop runs out of budget, ACC_OVER_REC is sent to this block again.
- // When nextAllocPage has reached lastOverIndex, loadingFlag is cleared and
- // execution continues in srCloseDataFileLab() if the root fragment has an
- // lcpPtr, otherwise in undoNext2Lab().
- void Dbacc::execACC_OVER_REC(Signal* signal)
- {
-   DirRangePtr pnoDirRangePtr;
-   DirectoryarrayPtr pnoOverflowDirptr;
-   Page8Ptr pnoPageidptr;
-   Uint32 tpnoPageType;
-   Uint32 toverPageCheck;
-   jamEntry();
-   fragrecptr.i = signal->theData[0];
-   toverPageCheck = 0;
-   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
-   // On the first invocation (nextAllocPage == 0) the overflow record list
-   // must still be empty: nobody may have put something into the list before
-   // we even started putting things there.
-   ndbrequire((fragrecptr.p->nextAllocPage != 0) ||
-              (fragrecptr.p->firstOverflowRec == RNIL));
-   // Loading must still be in progress. If it has stopped before we have
-   // loaded, it is a system error.
-   ndbrequire(fragrecptr.p->loadingFlag == ZTRUE);
-   while (toverPageCheck < ZNO_OF_OP_PER_SIGNAL) {
-     jam();
-     if (fragrecptr.p->nextAllocPage >= fragrecptr.p->lastOverIndex) {
-       jam();
-       // Every overflow directory index has been handled.
-       fragrecptr.p->loadingFlag = ZFALSE;
-       rootfragrecptr.i = fragrecptr.p->myroot;
-       ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
-       if (rootfragrecptr.p->lcpPtr != RNIL) {
-         jam();
-         srCloseDataFileLab(signal);
-       } else {
-         jam();
-         undoNext2Lab(signal);
-       }//if
-       return;
-     }//if
-     // Split the directory index: the bits above the low eight select the
-     // entry in the directory range, the low eight bits select the page slot.
-     tmpP = fragrecptr.p->nextAllocPage;
-     pnoDirRangePtr.i = fragrecptr.p->overflowdir;
-     tmpP2 = tmpP >> 8;
-     tmpP = tmpP & 0xff;
-     arrGuard(tmpP2, 256);
-     ptrCheckGuard(pnoDirRangePtr, cdirrangesize, dirRange);
-     if (pnoDirRangePtr.p->dirArray[tmpP2] == RNIL) {
-       jam();
-       // No directory array exists for this range entry, hence no page.
-       pnoPageidptr.i = RNIL;
-     } else {
-       jam();
-       pnoOverflowDirptr.i = pnoDirRangePtr.p->dirArray[tmpP2];
-       ptrCheckGuard(pnoOverflowDirptr, cdirarraysize, directoryarray);
-       pnoPageidptr.i = pnoOverflowDirptr.p->pagep[tmpP];
-     }//if
-     if (pnoPageidptr.i == RNIL) {
-       jam();
-       // There is no page at this directory index. Seize an overflow record
-       // without a page and hand it to putRecInFreeOverdir().
-       seizeOverRec(signal);
-       sorOverflowRecPtr.p->dirindex = fragrecptr.p->nextAllocPage;
-       sorOverflowRecPtr.p->overpage = RNIL;
-       priOverflowRecPtr = sorOverflowRecPtr;
-       putRecInFreeOverdir(signal);
-     } else {
-       ptrCheckGuard(pnoPageidptr, cpagesize, page8);
-       // The page type is a two bit field inside the ZPOS_PAGE_TYPE word.
-       tpnoPageType = pnoPageidptr.p->word32[ZPOS_PAGE_TYPE];
-       tpnoPageType = (tpnoPageType >> ZPOS_PAGE_TYPE_BIT) & 3;
-       if (tpnoPageType == ZLONG_PAGE_TYPE) {
-         jam();
-         // This is to clean the list parameters.
-         pnoPageidptr.p->word32[ZPOS_PREV_PAGE] = RNIL;
-         pnoPageidptr.p->word32[ZPOS_NEXT_PAGE] = RNIL;
-         // NOTE(review): array position 4 presumably marks a long page that
-         // is in no free list; confirm against insertPageArrayList().
-         if (pnoPageidptr.p->word32[ZPOS_ARRAY_POS] != 4) {
-           jam();
-           // The page was a long page and it belonged to a free list. Put it
-           // into one of the free lists then.
-           ipaPagePtr = pnoPageidptr;
-           tipaArrayPos = pnoPageidptr.p->word32[ZPOS_ARRAY_POS];
-           insertPageArrayList(signal);
-         }//if
-       } else {
-         if (pnoPageidptr.p->word32[ZPOS_ALLOC_CONTAINERS] > ZFREE_LIMIT) {
-           jam();
-           // More than ZFREE_LIMIT containers are allocated on the page: it
-           // gets no overflow record.
-           dbgWord32(pnoPageidptr, ZPOS_OVERFLOWREC, RNIL);
-           pnoPageidptr.p->word32[ZPOS_OVERFLOWREC] = RNIL;
-           ndbrequire(pnoPageidptr.p->word32[ZPOS_PAGE_ID] == fragrecptr.p->nextAllocPage);
-         } else {
-           jam();
-           // Seize an overflow record, link record and page to each other
-           // and put the record into the fragment.
-           seizeOverRec(signal);
-           sorOverflowRecPtr.p->dirindex = pnoPageidptr.p->word32[ZPOS_PAGE_ID];
-           ndbrequire(sorOverflowRecPtr.p->dirindex == fragrecptr.p->nextAllocPage);
-           dbgWord32(pnoPageidptr, ZPOS_OVERFLOWREC, sorOverflowRecPtr.i);
-           pnoPageidptr.p->word32[ZPOS_OVERFLOWREC] = sorOverflowRecPtr.i;
-           sorOverflowRecPtr.p->overpage = pnoPageidptr.i;
-           porOverflowRecPtr = sorOverflowRecPtr;
-           putOverflowRecInFrag(signal);
-           if (pnoPageidptr.p->word32[ZPOS_ALLOC_CONTAINERS] == 0) {
-             jam();
-             // No container is allocated on the page: release the page.
-             ropPageptr = pnoPageidptr;
-             releaseOverpage(signal);
-           }//if
-         }//if
-       }//if
-     }//if
-     fragrecptr.p->nextAllocPage++;
-     toverPageCheck++;
-   }//while
-   // Real-time break: the budget for this signal is used up, continue in a
-   // new ACC_OVER_REC signal for the same fragment.
-   signal->theData[0] = fragrecptr.i;
-   sendSignal(cownBlockref, GSN_ACC_OVER_REC, signal, 1, JBB);
- }//Dbacc::execACC_OVER_REC()
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* */
- /* END OF SYSTEM RESTART MODULE */
- /* */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* */
- /* SCAN MODULE */
- /* */
- /* --------------------------------------------------------------------------------- */
- /* --------------------------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* ACC_SCANREQ START OF A SCAN PROCESS */
- /* SENDER: LQH, LEVEL B */
- /* ENTER ACC_SCANREQ WITH */
- /* TUSERPTR, LQH SCAN_CONNECT POINTER */
- /* TUSERBLOCKREF, LQH BLOCK REFERENCE */
- /* TABPTR, TABLE IDENTITY AND PTR */
- /* TFID ROOT FRAGMENT IDENTITY */
- /* TSCAN_FLAG , = ZCOPY, ZSCAN, ZSCAN_LOCK_ALL */
- /* ZREADLOCK, ZWRITELOCK */
- /* TSCAN_TRID1 , TRANSACTION ID PART 1 */
- /* TSCAN_TRID2 TRANSACTION ID PART 2 */
- /* ******************--------------------------------------------------------------- */
- /* ******************--------------------------------------------------------------- */
- /* ACC_SCANREQ START OF A SCAN PROCESS */
- /* ******************------------------------------+ */
- /* SENDER: LQH, LEVEL B */
- void Dbacc::execACC_SCANREQ(Signal* signal)
- {
- jamEntry();
- AccScanReq * req = (AccScanReq*)&signal->theData[0];
- tuserptr = req->senderData;
- tuserblockref = req->senderRef;
- tabptr.i = req->tableId;
- tfid = req->fragmentNo;
- tscanFlag = req->requestInfo;
- tscanTrid1 = req->transId1;
- tscanTrid2 = req->transId2;
-
- tresult = 0;
- ptrCheckGuard(tabptr, ctablesize, tabrec);
- ndbrequire(getrootfragmentrec(signal,rootfragrecptr, tfid));
-
- Uint32 i;
- for (i = 0; i < MAX_PARALLEL_SCANS_PER_FRAG; i++) {
- jam();
- if (rootfragrecptr.p->scan[i] == RNIL) {
- jam();
- break;
- }
- }
- ndbrequire(i != MAX_PARALLEL_SCANS_PER_FRAG);
- ndbrequire(cfirstFreeScanRec != RNIL);
- seizeScanRec(signal);
- rootfragrecptr.p->scan[i] = scanPtr.i;
- scanPtr.p->scanBucketState = ScanRec::FIRST_LAP;
- scanPtr.p->scanLockMode = AccScanReq::getLockMode(tscanFlag);
- scanPtr.p->scanKeyinfoFlag = AccScanReq::getKeyinfoFlag(tscanFlag);
- scanPtr.p->scanReadCommittedFlag = AccScanReq::getReadCommittedFlag(tscanFlag);
-
- /* TWELVE BITS OF THE ELEMENT HEAD ARE SCAN */
- /* CHECK BITS. THE MASK NOTES WHICH BIT IS */
- /* ALLOCATED FOR THE ACTIVE SCAN */
- scanPtr.p->scanMask = 1 << i;
- scanPtr.p->scanUserptr = tuserptr;
- scanPtr.p->scanUserblockref = tuserblockref;
- scanPtr.p->scanTrid1 = tscanTrid1;
- scanPtr.p->scanTrid2 = tscanTrid2;
- scanPtr.p->rootPtr = rootfragrecptr.i;
- scanPtr.p->scanLockHeld = 0;
- scanPtr.p->scanOpsAllocated = 0;
- scanPtr.p->scanFirstActiveOp = RNIL;
- scanPtr.p->scanFirstQueuedOp = RNIL;
- scanPtr.p->scanLastQueuedOp = RNIL;
- scanPtr.p->scanFirstLockedOp = RNIL;
- scanPtr.p->scanLastLockedOp = RNIL;
- scanPtr.p->scanState = ScanRec::WAIT_NEXT;
- fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
- ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
- initScanFragmentPart(signal);
- /*------------------------------------------------------*/
- /* We start the timeout loop for the scan process here. */
- /*------------------------------------------------------*/
- ndbrequire(scanPtr.p->scanTimer == 0);
- if (scanPtr.p->scanContinuebCounter == 0) {
- jam();
- scanPtr.p->scanContinuebCounter = 1;
- signal->theData[0] = ZSEND_SCAN_HBREP;
- signal->theData[1] = scanPtr.i;
- sendSignalWithDelay(cownBlockref, GSN_CONTINUEB, signal, 100, 2);
- }//if
- scanPtr.p->scanTimer = scanPtr.p->scanContinuebCounter;
- /* ************************ */
- /* ACC_SCANCONF */
- /* ************************ */