DbaccMain.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:575k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1.   arrGuard(trlpTmp2, 256);
  2.   rlpOverflowDirptr.i = rlpOverflowrangeptr.p->dirArray[trlpTmp2];
  3.   ptrCheckGuard(rlpOverflowDirptr, cdirarraysize, directoryarray);
  4.   rlpOverflowDirptr.p->pagep[trlpTmp3] = RNIL;
  5.   if (cundoLogActive != ZTRUE) {
  6.     // Remove from page array.
  7.     trfpArrayPos = rlopPageptr.p->word32[ZPOS_ARRAY_POS];
  8.     rfpPageptr = rlopPageptr;
  9.     removeFromPageArrayList(signal);
  10.   }
  11.   // Reset page header
  12.   iloPageptr = rlopPageptr;
  13.   tiloIndex = rlopPageptr.p->word32[ZPOS_PAGE_ID];
  14.   initLongOverpage(signal);
  15.   rpPageptr = rlopPageptr;
  16.   releasePage(signal);
  17. }//Dbacc::releaseLongPage()
  18. /* ------------------------------------------------------------------------- */
  19. /* ------------------------------------------------------------------------- */
  20. /* ------------------------------------------------------------------------- */
  21. /*                                                                           */
  22. /*       END OF DELETE MODULE                                                */
  23. /*                                                                           */
  24. /* ------------------------------------------------------------------------- */
  25. /* ------------------------------------------------------------------------- */
  26. /* ------------------------------------------------------------------------- */
  27. /* ------------------------------------------------------------------------- */
  28. /* ------------------------------------------------------------------------- */
  29. /* ------------------------------------------------------------------------- */
  30. /*                                                                           */
  31. /*       COMMIT AND ABORT MODULE                                             */
  32. /*                                                                           */
  33. /* ------------------------------------------------------------------------- */
  34. /* ------------------------------------------------------------------------- */
  35. /* ------------------------------------------------------------------------- */
  36. /* ------------------------------------------------------------------------- */
  37. /* ABORT_OPERATION                                                           */
  38. /*DESCRIPTION: AN OPERATION RECORD CAN BE IN A LOCK QUEUE OF AN ELEMENT OR   */
  39. /*OWNS THE LOCK. BY THIS SUBROUTINE THE LOCK STATE OF THE OPERATION WILL     */
  40. /*BE CHECKED. THE OPERATION RECORD WILL BE REMOVED FROM THE QUEUE IF IT      */
  41. /*BELONGED TO ANY ONE, OTHERWISE THE ELEMENT HEAD WILL BE UPDATED.           */
  42. /* ------------------------------------------------------------------------- */
  43. void Dbacc::abortOperation(Signal* signal) 
  44. {
  45.   OperationrecPtr aboOperRecPtr;
  46.   OperationrecPtr TaboOperRecPtr;
  47.   Page8Ptr aboPageidptr;
  48.   Uint32 taboElementptr;
  49.   Uint32 tmp2Olq;
  50.   if (operationRecPtr.p->lockOwner == ZTRUE) {
  51.     takeOutLockOwnersList(signal, operationRecPtr);
  52.     if (operationRecPtr.p->insertIsDone == ZTRUE) { 
  53.       jam();
  54.       operationRecPtr.p->elementIsDisappeared = ZTRUE;
  55.     }//if
  56.     if ((operationRecPtr.p->nextParallelQue != RNIL) ||
  57.         (operationRecPtr.p->nextSerialQue != RNIL)) {
  58.       jam();
  59.       releaselock(signal);
  60.     } else {
  61.       /* --------------------------------------------------------------------------------- */
  62.       /* WE ARE OWNER OF THE LOCK AND NO OTHER OPERATIONS ARE QUEUED. IF INSERT OR STANDBY */
  63.       /* WE DELETE THE ELEMENT OTHERWISE WE REMOVE THE LOCK FROM THE ELEMENT.              */
  64.       /* --------------------------------------------------------------------------------- */
  65.       if (operationRecPtr.p->elementIsDisappeared == ZFALSE) {
  66.         jam();
  67.         taboElementptr = operationRecPtr.p->elementPointer;
  68.         aboPageidptr.i = operationRecPtr.p->elementPage;
  69.         tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
  70.      operationRecPtr.p->scanBits);
  71.         ptrCheckGuard(aboPageidptr, cpagesize, page8);
  72.         dbgWord32(aboPageidptr, taboElementptr, tmp2Olq);
  73.         arrGuard(taboElementptr, 2048);
  74.         aboPageidptr.p->word32[taboElementptr] = tmp2Olq;
  75.         return;
  76.       } else {
  77.         jam();
  78.         commitdelete(signal, false);
  79.       }//if
  80.     }//if
  81.   } else {
  82.     /* --------------------------------------------------------------- */
  83.     // We are not the lock owner.
  84.     /* --------------------------------------------------------------- */
  85.     jam();
  86.     takeOutFragWaitQue(signal);
  87.     if (operationRecPtr.p->prevParallelQue != RNIL) {
  88.       jam();
  89.       /* ---------------------------------------------------------------------------------- */
  90.       /* SINCE WE ARE NOT QUEUE LEADER WE NEED NOT CONSIDER IF THE ELEMENT IS TO BE DELETED.*/
  91.       /* We will simply remove it from the parallel list without any other rearrangements.  */
  92.       /* ---------------------------------------------------------------------------------- */
  93.       aboOperRecPtr.i = operationRecPtr.p->prevParallelQue;
  94.       ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
  95.       aboOperRecPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
  96.       if (operationRecPtr.p->nextParallelQue != RNIL) {
  97.         jam();
  98.         aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
  99.         ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
  100.         aboOperRecPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
  101.       }//if
  102.     } else if (operationRecPtr.p->prevSerialQue != RNIL) {
  103.       /* ------------------------------------------------------------------------- */
  104.       // We are not in the parallel queue owning the lock. Thus we are in another parallel
  105.       // queue longer down in the serial queue. We are however first since prevParallelQue
  106.       // == RNIL.
  107.       /* ------------------------------------------------------------------------- */
  108.       if (operationRecPtr.p->nextParallelQue != RNIL) {
  109.         jam();
  110. /* ------------------------------------------------------------------------- */
  111. // We have an operation in the queue after us. We simply rearrange this parallel queue.
  112. // The new leader of this parallel queue will be operation in the serial queue.
  113. /* ------------------------------------------------------------------------- */
  114.         aboOperRecPtr.i = operationRecPtr.p->nextParallelQue;
  115.         ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
  116.         aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
  117.         aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
  118.         aboOperRecPtr.p->prevParallelQue = RNIL; // Queue Leader
  119.         if (operationRecPtr.p->nextSerialQue != RNIL) {
  120.           jam();
  121.           TaboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
  122.           ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
  123.           TaboOperRecPtr.p->prevSerialQue = aboOperRecPtr.i;
  124.         }//if
  125.         TaboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
  126.         ptrCheckGuard(TaboOperRecPtr, coprecsize, operationrec);
  127.         TaboOperRecPtr.p->nextSerialQue = aboOperRecPtr.i;
  128.       } else {
  129.         jam();
  130. /* ------------------------------------------------------------------------- */
  131. // We are the only operation in this parallel queue. We will thus shrink the serial
  132. // queue.
  133. /* ------------------------------------------------------------------------- */
  134.         aboOperRecPtr.i = operationRecPtr.p->prevSerialQue;
  135.         ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
  136.         aboOperRecPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
  137.         if (operationRecPtr.p->nextSerialQue != RNIL) {
  138.           jam();
  139.           aboOperRecPtr.i = operationRecPtr.p->nextSerialQue;
  140.           ptrCheckGuard(aboOperRecPtr, coprecsize, operationrec);
  141.           aboOperRecPtr.p->prevSerialQue = operationRecPtr.p->prevSerialQue;
  142.         }//if
  143.       }//if
  144.     }//if
  145.   }//if
  146.   /* ------------------------------------------------------------------------- */
  147.   // If prevParallelQue = RNIL and prevSerialQue = RNIL and we are not owner of the
  148.   // lock then we cannot be in any lock queue at all.
  149.   /* ------------------------------------------------------------------------- */
  150. }//Dbacc::abortOperation()
  151. void Dbacc::commitDeleteCheck()
  152. {
  153.   OperationrecPtr opPtr;
  154.   OperationrecPtr lastOpPtr;
  155.   OperationrecPtr deleteOpPtr;
  156.   bool elementDeleted = false;
  157.   bool deleteCheckOngoing = true;
  158.   Uint32 hashValue = 0;
  159.   lastOpPtr = operationRecPtr;
  160.   opPtr.i = operationRecPtr.p->nextParallelQue;
  161.   while (opPtr.i != RNIL) {
  162.     jam();
  163.     ptrCheckGuard(opPtr, coprecsize, operationrec);
  164.     lastOpPtr = opPtr;
  165.     opPtr.i = opPtr.p->nextParallelQue;
  166.   }//while
  167.   deleteOpPtr = lastOpPtr;
  168.   do {
  169.     if (deleteOpPtr.p->operation == ZDELETE) {
  170.       jam();
  171.       /* --------------------------------------------------------------------------------- */
  172.       /* IF THE CURRENT OPERATION TO BE COMMITTED IS A DELETE OPERATION DUE TO A           */
  173.       /* SCAN-TAKEOVER THE ACTUAL DELETE WILL BE PERFORMED BY THE PREVIOUS OPERATION (SCAN)*/
  174.       /* IN THE PARALLEL QUEUE WHICH OWNS THE LOCK.THE PROBLEM IS THAT THE SCAN OPERATION  */
  175.       /* DOES NOT HAVE A HASH VALUE ASSIGNED TO IT SO WE COPY IT FROM THIS OPERATION.      */
  176.       /*                                                                                   */
  177.       /* WE ASSUME THAT THIS SOLUTION WILL WORK BECAUSE THE ONLY WAY A SCAN CAN PERFORM    */
  178.       /* A DELETE IS BY BEING FOLLOWED BY A NORMAL DELETE-OPERATION THAT HAS A HASH VALUE. */
  179.       /* --------------------------------------------------------------------------------- */
  180.       hashValue = deleteOpPtr.p->hashValue;
  181.       elementDeleted = true;
  182.       deleteCheckOngoing = false;
  183.     } else if ((deleteOpPtr.p->operation == ZREAD) ||
  184.                (deleteOpPtr.p->operation == ZSCAN_OP)) {
  185.       /* --------------------------------------------------------------------------------- */
  186.       /* We are trying to find out whether the commit will in the end delete the tuple.    */
  187.       /* Normally the delete will be the last operation in the list of operations on this  */
  188.       /* It is however possible to issue reads and scans in the same savepoint as the      */
  189.       /* delete operation was issued and these can end up after the delete in the list of  */
  190.       /* operations in the parallel queue. Thus if we discover a read or a scan we have to */
  191.       /* continue scanning the list looking for a delete operation.                        */
  192.       /* --------------------------------------------------------------------------------- */
  193.       deleteOpPtr.i = deleteOpPtr.p->prevParallelQue;
  194.       if (deleteOpPtr.i == RNIL) {
  195.         jam();
  196.         deleteCheckOngoing = false;
  197.       } else {
  198.         jam();
  199.         ptrCheckGuard(deleteOpPtr, coprecsize, operationrec);
  200.       }//if
  201.     } else {
  202.       jam();
  203.       /* --------------------------------------------------------------------------------- */
  204.       /* Finding an UPDATE or INSERT before finding a DELETE means we cannot be deleting   */
  205.       /* as the end result of this transaction.                                            */
  206.       /* --------------------------------------------------------------------------------- */
  207.       deleteCheckOngoing = false;
  208.     }//if
  209.   } while (deleteCheckOngoing);
  210.   opPtr = lastOpPtr;
  211.   do {
  212.     jam();
  213.     opPtr.p->commitDeleteCheckFlag = ZTRUE;
  214.     if (elementDeleted) {
  215.       jam();
  216.       opPtr.p->elementIsDisappeared = ZTRUE;
  217.       opPtr.p->hashValue = hashValue;
  218.     }//if
  219.     opPtr.i = opPtr.p->prevParallelQue;
  220.     if (opPtr.i == RNIL) {
  221.       jam();
  222.       break;
  223.     }//if
  224.     ptrCheckGuard(opPtr, coprecsize, operationrec);
  225.   } while (true);
  226. }//Dbacc::commitDeleteCheck()
  227. /* ------------------------------------------------------------------------- */
  228. /* COMMIT_OPERATION                                                          */
  229. /* INPUT: OPERATION_REC_PTR, POINTER TO AN OPERATION RECORD                  */
  230. /* DESCRIPTION: THE OPERATION RECORD WILL BE TAKE OUT OF ANY LOCK QUEUE.     */
  231. /*         IF IT OWNS THE ELEMENT LOCK. HEAD OF THE ELEMENT WILL BE UPDATED. */
  232. /* ------------------------------------------------------------------------- */
  233. void Dbacc::commitOperation(Signal* signal) 
  234. {
  235.   OperationrecPtr tolqTmpPtr;
  236.   Page8Ptr coPageidptr;
  237.   Uint32 tcoElementptr;
  238.   Uint32 tmp2Olq;
  239.   if ((operationRecPtr.p->commitDeleteCheckFlag == ZFALSE) &&
  240.       (operationRecPtr.p->operation != ZSCAN_OP) &&
  241.       (operationRecPtr.p->operation != ZREAD)) {
  242.     jam();
  243.     /*  This method is used to check whether the end result of the transaction
  244.         will be to delete the tuple. In this case all operation will be marked
  245.         with elementIsDisappeared = true to ensure that the last operation
  246.         committed will remove the tuple. We only run this once per transaction
  247.         (commitDeleteCheckFlag = true if performed earlier) and we don't
  248.         execute this code when committing a scan operation since committing
  249.         a scan operation only means that the scan is continuing and the scan
  250.         lock is released.
  251.     */
  252.     commitDeleteCheck();
  253.   }//if
  254.   if (operationRecPtr.p->lockOwner == ZTRUE) {
  255.     takeOutLockOwnersList(signal, operationRecPtr);
  256.     if ((operationRecPtr.p->nextParallelQue == RNIL) &&
  257.         (operationRecPtr.p->nextSerialQue == RNIL) &&
  258.         (operationRecPtr.p->elementIsDisappeared == ZFALSE)) {
  259.       /* 
  260.        This is the normal path through the commit for operations owning the
  261.        lock without any queues and not a delete operation.
  262.       */
  263.       coPageidptr.i = operationRecPtr.p->elementPage;
  264.       tcoElementptr = operationRecPtr.p->elementPointer;
  265.       tmp2Olq = ElementHeader::setUnlocked(operationRecPtr.p->hashvaluePart,
  266.    operationRecPtr.p->scanBits);   
  267.       ptrCheckGuard(coPageidptr, cpagesize, page8);
  268.       dbgWord32(coPageidptr, tcoElementptr, tmp2Olq);
  269.       arrGuard(tcoElementptr, 2048);
  270.       coPageidptr.p->word32[tcoElementptr] = tmp2Olq;
  271.       return;
  272.     } else if ((operationRecPtr.p->nextParallelQue != RNIL) ||
  273.                (operationRecPtr.p->nextSerialQue != RNIL)) {
  274.       jam();
  275.       /*
  276.        The case when there is a queue lined up.
  277.        Release the lock and pass it to the next operation lined up.
  278.       */
  279.       releaselock(signal);
  280.       return;
  281.     } else {
  282.       jam();
  283.       /*
  284.        No queue and elementIsDisappeared is true. We perform the actual delete
  285.        operation.
  286.       */
  287.       commitdelete(signal, false);
  288.       return;
  289.     }//if
  290.   } else {
  291.     /*
  292.      THE OPERATION DOES NOT OWN THE LOCK. IT MUST BE IN A LOCK QUEUE OF THE
  293.      ELEMENT.
  294.     */
  295.     ndbrequire(operationRecPtr.p->prevParallelQue != RNIL);
  296.     jam();
  297.     tolqTmpPtr.i = operationRecPtr.p->prevParallelQue;
  298.     ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
  299.     tolqTmpPtr.p->nextParallelQue = operationRecPtr.p->nextParallelQue;
  300.     if (operationRecPtr.p->nextParallelQue != RNIL) {
  301.       jam();
  302.       tolqTmpPtr.i = operationRecPtr.p->nextParallelQue;
  303.       ptrCheckGuard(tolqTmpPtr, coprecsize, operationrec);
  304.       tolqTmpPtr.p->prevParallelQue = operationRecPtr.p->prevParallelQue;
  305.     }//if
  306.     /**
  307.      * Check possible lock upgrade
  308.      * 1) Find lock owner
  309.      * 2) Count transactions in parallel que
  310.      * 3) If count == 1 and TRANSID(next serial) == TRANSID(lock owner)
  311.      *      upgrade next serial
  312.      */
  313.     if(operationRecPtr.p->lockMode)
  314.     {
  315.       jam();
  316.       /**
  317.        * Committing a non shared operation can't lead to lock upgrade
  318.        */
  319.       return;
  320.     }
  321.     
  322.     OperationrecPtr lock_owner;
  323.     lock_owner.i = operationRecPtr.p->prevParallelQue;
  324.     ptrCheckGuard(lock_owner, coprecsize, operationrec);
  325.     Uint32 transid[2] = { lock_owner.p->transId1, 
  326.   lock_owner.p->transId2 };
  327.     
  328.     
  329.     while(lock_owner.p->prevParallelQue != RNIL)
  330.     {
  331.       lock_owner.i = lock_owner.p->prevParallelQue;
  332.       ptrCheckGuard(lock_owner, coprecsize, operationrec);
  333.       
  334.       if(lock_owner.p->transId1 != transid[0] || 
  335.  lock_owner.p->transId2 != transid[1])
  336.       {
  337. jam();
  338. /**
  339.  * If more than 1 trans in lock queue -> no lock upgrade
  340.  */
  341. return;
  342.       }
  343.     }
  344.     
  345.     check_lock_upgrade(signal, lock_owner, operationRecPtr);
  346.   }
  347. }//Dbacc::commitOperation()
  348. void
  349. Dbacc::check_lock_upgrade(Signal* signal, 
  350.   OperationrecPtr lock_owner,
  351.   OperationrecPtr release_op)
  352. {
  353.   if((lock_owner.p->transId1 == release_op.p->transId1 &&
  354.       lock_owner.p->transId2 == release_op.p->transId2) ||
  355.      release_op.p->lockMode ||
  356.      lock_owner.p->nextSerialQue == RNIL)
  357.   {
  358.     jam();
  359.     /**
  360.      * No lock upgrade if same trans or lock owner has no serial queue
  361.      *                 or releasing non shared op
  362.      */
  363.     return;
  364.   }
  365.   OperationrecPtr next;
  366.   next.i = lock_owner.p->nextSerialQue;
  367.   ptrCheckGuard(next, coprecsize, operationrec);
  368.   
  369.   if(lock_owner.p->transId1 != next.p->transId1 ||
  370.      lock_owner.p->transId2 != next.p->transId2)
  371.   {
  372.     jam();
  373.     /**
  374.      * No lock upgrad if !same trans in serial queue
  375.      */
  376.     return;
  377.   }
  378.   
  379.   if (getNoParallelTransaction(lock_owner.p) > 1)
  380.   {
  381.     jam();
  382.     /**
  383.      * No lock upgrade if more than 1 transaction in parallell queue
  384.      */
  385.     return;
  386.   }
  387.   if (getNoParallelTransaction(next.p) > 1)
  388.   {
  389.     jam();
  390.     /**
  391.      * No lock upgrade if more than 1 transaction in next's parallell queue
  392.      */
  393.     return;
  394.   }
  395.   
  396.   OperationrecPtr tmp;
  397.   tmp.i = lock_owner.p->nextSerialQue = next.p->nextSerialQue;
  398.   if(tmp.i != RNIL)
  399.   {
  400.     ptrCheckGuard(tmp, coprecsize, operationrec);
  401.     ndbassert(tmp.p->prevSerialQue == next.i);
  402.     tmp.p->prevSerialQue = lock_owner.i;
  403.   }
  404.   next.p->nextSerialQue = next.p->prevSerialQue = RNIL;
  405.   
  406.   // Find end of parallell que
  407.   tmp = lock_owner;
  408.   Uint32 lockMode = next.p->lockMode > lock_owner.p->lockMode ?
  409.     next.p->lockMode : lock_owner.p->lockMode;
  410.   while(tmp.p->nextParallelQue != RNIL)
  411.   {
  412.     jam();
  413.     tmp.i = tmp.p->nextParallelQue;
  414.     tmp.p->lockMode = lockMode;
  415.     ptrCheckGuard(tmp, coprecsize, operationrec);
  416.   }
  417.   tmp.p->lockMode = lockMode;
  418.   
  419.   next.p->prevParallelQue = tmp.i;
  420.   tmp.p->nextParallelQue = next.i;
  421.   
  422.   OperationrecPtr save = operationRecPtr;
  423.   Uint32 localdata[2];
  424.   localdata[0] = lock_owner.p->localdata[0];
  425.   localdata[1] = lock_owner.p->localdata[1];
  426.   do {
  427.     next.p->localdata[0] = localdata[0];
  428.     next.p->localdata[1] = localdata[1];
  429.     next.p->lockMode = lockMode;
  430.     
  431.     operationRecPtr = next;
  432.     executeNextOperation(signal);
  433.     if (next.p->nextParallelQue != RNIL) 
  434.     {
  435.       jam();
  436.       next.i = next.p->nextParallelQue;
  437.       ptrCheckGuard(next, coprecsize, operationrec);
  438.     } else {
  439.       jam();
  440.       break;
  441.     }//if
  442.   } while (1);
  443.   
  444.   operationRecPtr = save;
  445.   
  446. }
  447. /* ------------------------------------------------------------------------- */
  448. /* RELEASELOCK                                                               */
  449. /*          RESETS LOCK OF AN ELEMENT.                                       */
  450. /*          INFORMATION ABOUT THE ELEMENT IS SAVED IN THE OPERATION RECORD   */
  451. /*          THESE INFORMATION IS USED TO UPDATE HEADER OF THE ELEMENT        */
  452. /* ------------------------------------------------------------------------- */
  453. void Dbacc::releaselock(Signal* signal) 
  454. {
  455.   OperationrecPtr rloOperPtr;
  456.   OperationrecPtr trlOperPtr;
  457.   OperationrecPtr trlTmpOperPtr;
  458.   Uint32 TelementIsDisappeared;
  459.   trlOperPtr.i = RNIL;
  460.   if (operationRecPtr.p->nextParallelQue != RNIL) {
  461.     jam();
  462.     /* --------------------------------------------------------------------------------- */
  463.     /* NEXT OPERATION TAKES OVER THE LOCK. We will simply move the info from the leader  */
  464.     // to the new queue leader.
  465.     /* --------------------------------------------------------------------------------- */
  466.     trlOperPtr.i = operationRecPtr.p->nextParallelQue;
  467.     ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
  468.     copyInOperPtr = trlOperPtr;
  469.     copyOperPtr = operationRecPtr;
  470.     copyOpInfo(signal);
  471.     trlOperPtr.p->prevParallelQue = RNIL;
  472.     if (operationRecPtr.p->nextSerialQue != RNIL) {
  473.       jam();
  474.       /* --------------------------------------------------------------------------------- */
  475.       /* THERE IS A SERIAL QUEUE. MOVE IT FROM RELEASED OP REC TO THE NEW LOCK OWNER.      */
  476.       /* --------------------------------------------------------------------------------- */
  477.       trlOperPtr.p->nextSerialQue = operationRecPtr.p->nextSerialQue;
  478.       trlTmpOperPtr.i = trlOperPtr.p->nextSerialQue;
  479.       ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
  480.       trlTmpOperPtr.p->prevSerialQue = trlOperPtr.i;
  481.     }//if
  482.     check_lock_upgrade(signal, copyInOperPtr, operationRecPtr);
  483.     /* --------------------------------------------------------------------------------- */
  484.     /*       SINCE THERE ARE STILL ITEMS IN THE PARALLEL QUEUE WE NEED NOT WORRY ABOUT   */
  485.     /*       STARTING QUEUED OPERATIONS. THUS WE CAN END HERE.                           */
  486.     /* --------------------------------------------------------------------------------- */
  487.   } else {
  488.     ndbrequire(operationRecPtr.p->nextSerialQue != RNIL);
  489.     jam();
  490.     /* --------------------------------------------------------------------------------- */
  491.     /* THE PARALLEL QUEUE IS EMPTY AND THE SERIAL QUEUE IS NOT EMPTY. WE NEED TO         */
  492.     /* REARRANGE LISTS AND START A NUMBER OF OPERATIONS.                                 */
  493.     /* --------------------------------------------------------------------------------- */
  494.     trlOperPtr.i = operationRecPtr.p->nextSerialQue;
  495.     ptrCheckGuard(trlOperPtr, coprecsize, operationrec);
  496.     copyOperPtr = operationRecPtr;
  497.     copyInOperPtr = trlOperPtr;
  498.     copyOpInfo(signal);
  499.     trlOperPtr.p->prevSerialQue = RNIL;
  500.     ndbrequire(trlOperPtr.p->prevParallelQue == RNIL);
  501.     /* --------------------------------------------------------------------------------- */
  502.     /*       WE HAVE MOVED TO THE NEXT PARALLEL QUEUE. WE MUST START ALL OF THOSE        */
  503.     /*       OPERATIONS WHICH UP TILL NOW HAVE BEEN QUEUED WAITING FOR THE LOCK.         */
  504.     /* --------------------------------------------------------------------------------- */
  505.     rloOperPtr = operationRecPtr;
  506.     trlTmpOperPtr = trlOperPtr;
  507.     TelementIsDisappeared = trlOperPtr.p->elementIsDisappeared;
  508.     Uint32 ThashValue = trlOperPtr.p->hashValue;
  509.     do {
  510.       /* --------------------------------------------------------------------------------- */
  511.       // Ensure that all operations in the queue are assigned with the elementIsDisappeared
  512.       // to ensure that the element is removed after a previous delete. An insert does
  513.       // however revert this decision since the element is put back again. Local checkpoints
  514.       // complicate life here since they do not execute the next operation but simply change
  515.       // the state on the operation. We need to set-up the variable elementIsDisappeared
  516.       // properly even when local checkpoints and inserts/writes after deletes occur.
  517.       /* --------------------------------------------------------------------------------- */
  518.       trlTmpOperPtr.p->elementIsDisappeared = TelementIsDisappeared;
  519.       if (TelementIsDisappeared == ZTRUE) {
  520. /* --------------------------------------------------------------------------------- */
  521. // If the elementIsDisappeared is set then we know that the hashValue is also set
  522. // since it always originates from a committing abort or a aborting insert. Scans
  523. // do not initialise the hashValue and must have this value initialised if they are
  524. // to successfully commit the delete.
  525. /* --------------------------------------------------------------------------------- */
  526.         jam();
  527.         trlTmpOperPtr.p->hashValue = ThashValue;
  528.       }//if
  529.       trlTmpOperPtr.p->localdata[0] = trlOperPtr.p->localdata[0];
  530.       trlTmpOperPtr.p->localdata[1] = trlOperPtr.p->localdata[1];
  531.       /* --------------------------------------------------------------------------------- */
  532.       // Restart the queued operation.
  533.       /* --------------------------------------------------------------------------------- */
  534.       operationRecPtr = trlTmpOperPtr;
  535.       TelementIsDisappeared = executeNextOperation(signal);
  536.       ThashValue = operationRecPtr.p->hashValue;
  537.       if (trlTmpOperPtr.p->nextParallelQue != RNIL) {
  538.         jam();
  539. /* --------------------------------------------------------------------------------- */
  540. // We will continue with the next operation in the parallel queue and start this as
  541. // well.
  542. /* --------------------------------------------------------------------------------- */
  543.         trlTmpOperPtr.i = trlTmpOperPtr.p->nextParallelQue;
  544.         ptrCheckGuard(trlTmpOperPtr, coprecsize, operationrec);
  545.       } else {
  546.         jam();
  547.         break;
  548.       }//if
  549.     } while (1);
  550.     operationRecPtr = rloOperPtr;
  551.   }//if
  552.   // Insert the next op into the lock owner list
  553.   insertLockOwnersList(signal, trlOperPtr);
  554.   return;
  555. }//Dbacc::releaselock()
  556. /* --------------------------------------------------------------------------------- */
  557. /* COPY_OP_INFO                                                                      */
  558. /*        INPUT: COPY_IN_OPER_PTR AND COPY_OPER_PTR.                                 */
  559. /*        DESCRIPTION:INFORMATION ABOUT THE ELEMENT WILL BE MOVED FROM  OPERATION    */
  560. /*                      REC TO QUEUE OP REC. QUE OP REC TAKES OVER THE LOCK.         */
  561. /* --------------------------------------------------------------------------------- */
  562. void Dbacc::copyOpInfo(Signal* signal) 
  563. {
  564.   Page8Ptr coiPageidptr;
  565.   copyInOperPtr.p->elementPage = copyOperPtr.p->elementPage;
  566.   copyInOperPtr.p->elementIsforward = copyOperPtr.p->elementIsforward;
  567.   copyInOperPtr.p->elementContainer = copyOperPtr.p->elementContainer;
  568.   copyInOperPtr.p->elementPointer = copyOperPtr.p->elementPointer;
  569.   copyInOperPtr.p->scanBits = copyOperPtr.p->scanBits;
  570.   copyInOperPtr.p->hashvaluePart = copyOperPtr.p->hashvaluePart;
  571.   copyInOperPtr.p->elementIsDisappeared = copyOperPtr.p->elementIsDisappeared;
  572.   if (copyInOperPtr.p->elementIsDisappeared == ZTRUE) {
  573.     /* --------------------------------------------------------------------------------- */
  574.     // If the elementIsDisappeared is set then we know that the hashValue is also set
  575.     // since it always originates from a committing abort or a aborting insert. Scans
  576.     // do not initialise the hashValue and must have this value initialised if they are
  577.     // to successfully commit the delete.
  578.     /* --------------------------------------------------------------------------------- */
  579.     jam();
  580.     copyInOperPtr.p->hashValue = copyOperPtr.p->hashValue;
  581.   }//if
  582.   coiPageidptr.i = copyOperPtr.p->elementPage;
  583.   ptrCheckGuard(coiPageidptr, cpagesize, page8);
  584.   const Uint32 tmp = ElementHeader::setLocked(copyInOperPtr.i);
  585.   dbgWord32(coiPageidptr, copyOperPtr.p->elementPointer, tmp);
  586.   arrGuard(copyOperPtr.p->elementPointer, 2048);
  587.   coiPageidptr.p->word32[copyOperPtr.p->elementPointer] = tmp;
  588.   copyInOperPtr.p->localdata[0] = copyOperPtr.p->localdata[0];
  589.   copyInOperPtr.p->localdata[1] = copyOperPtr.p->localdata[1];
  590. }//Dbacc::copyOpInfo()
  591. /* ******************--------------------------------------------------------------- */
  592. /* EXECUTE NEXT OPERATION                                                            */
  593. /*          NEXT OPERATION IN A LOCK QUEUE WILL BE EXECUTED.                         */
  594. /* --------------------------------------------------------------------------------- */
  595. Uint32 Dbacc::executeNextOperation(Signal* signal) 
  596. {
  597.   ndbrequire(operationRecPtr.p->transactionstate == ACTIVE);
  598.   if (fragrecptr.p->stopQueOp == ZTRUE) {
  599.     Uint32 TelemDisappeared;
  600.     jam();
  601.     TelemDisappeared = operationRecPtr.p->elementIsDisappeared;
  602.     if ((operationRecPtr.p->elementIsDisappeared == ZTRUE) &&
  603.         (operationRecPtr.p->prevParallelQue == RNIL) &&
  604.         ((operationRecPtr.p->operation == ZINSERT) ||
  605.          (operationRecPtr.p->operation == ZWRITE))) {
  606.       jam();
  607.       /* --------------------------------------------------------------------------------- */
  608.       // In this case we do not wish to change the elementIsDisappeared since that would
  609.       // create an error the next time this method is called for this operation after local
  610.       // checkpoint starts up operations again. We must however ensure that operations
  611.       // that follow in the queue do not get the value ZTRUE when actually an INSERT/WRITE
  612.       // precedes them (only if the INSERT/WRITE is the first operation).
  613.       /* --------------------------------------------------------------------------------- */
  614.       TelemDisappeared = ZFALSE;
  615.     }//if    
  616.     /* --------------------------------------------------------------------------------- */
  617.     /*       A LOCAL CHECKPOINT HAS STOPPED OPERATIONS. WE MUST NOT START THE OPERATION  */
  618.     /*       AT THIS TIME. WE SET THE STATE TO INDICATE THAT WE ARE READY TO START AS    */
  619.     /*       SOON AS WE ARE ALLOWED.                                                     */
  620.     /* --------------------------------------------------------------------------------- */
  621.     operationRecPtr.p->opState = WAIT_EXE_OP;
  622.     return TelemDisappeared;
  623.   }//if
  624.   takeOutFragWaitQue(signal);
  625.   if (operationRecPtr.p->elementIsDisappeared == ZTRUE) {
  626.     /* --------------------------------------------------------------------------------- */
  627.     /* PREVIOUS OPERATION WAS DELETE OPERATION AND THE ELEMENT IS ALREADY DELETED.       */
  628.     /* --------------------------------------------------------------------------------- */
  629.     if (((operationRecPtr.p->operation != ZINSERT) &&
  630.  (operationRecPtr.p->operation != ZWRITE)) ||
  631.         (operationRecPtr.p->prevParallelQue != RNIL)) {
  632.       if (operationRecPtr.p->operation != ZSCAN_OP ||
  633.           operationRecPtr.p->isAccLockReq) {
  634.         jam();
  635. /* --------------------------------------------------------------------------------- */
  636. // Updates and reads with a previous delete simply aborts with read error indicating
  637. // that tuple did not exist. Also inserts and writes not being the first operation.
  638. /* --------------------------------------------------------------------------------- */
  639.         operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
  640.         signal->theData[0] = operationRecPtr.p->userptr;
  641.         signal->theData[1] = ZREAD_ERROR;
  642.         sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, 2, JBB);
  643.         return operationRecPtr.p->elementIsDisappeared;
  644.       } else {
  645. /* --------------------------------------------------------------------------------- */
  646. /* ABORT OF OPERATION NEEDED BUT THE OPERATION IS A SCAN => SPECIAL TREATMENT. */
  647. /* IF THE SCAN WAITS IN QUEUE THEN WE MUST REMOVE THE OPERATION FROM THE SCAN  */
  648. /* LOCK QUEUE AND IF NO MORE OPERATIONS ARE QUEUED THEN WE SHOULD RESTART THE  */
  649. /* SCAN PROCESS. OTHERWISE WE SIMPLY RELEASE THE OPERATION AND DECREASE THE    */
  650. /* NUMBER OF LOCKS HELD.                                                       */
  651. /* --------------------------------------------------------------------------------- */
  652.         takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
  653.         putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
  654.         return operationRecPtr.p->elementIsDisappeared;
  655.       }//if
  656.     }//if
  657.     /* --------------------------------------------------------------------------------- */
  658.     // Insert and writes can continue but need to be converted to inserts.
  659.     /* --------------------------------------------------------------------------------- */
  660.     jam();
  661.     operationRecPtr.p->elementIsDisappeared = ZFALSE;
  662.     operationRecPtr.p->operation = ZINSERT;
  663.     operationRecPtr.p->insertIsDone = ZTRUE;
  664.   } else if (operationRecPtr.p->operation == ZINSERT) {
  665.     bool abortFlag = true;
  666.     if (operationRecPtr.p->prevParallelQue != RNIL) {
  667.       OperationrecPtr prevOpPtr;
  668.       jam();
  669.       prevOpPtr.i = operationRecPtr.p->prevParallelQue;
  670.       ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
  671.       if (prevOpPtr.p->operation == ZDELETE) {
  672.         jam();
  673.         abortFlag = false;
  674.       }//if
  675.     }//if
  676.     if (abortFlag) {
  677.       jam();
  678.       /* --------------------------------------------------------------------------------- */
  679.       /* ELEMENT STILL REMAINS AND WE ARE TRYING TO INSERT IT AGAIN. THIS IS CLEARLY  */
  680.       /* NOT A GOOD IDEA.                                                             */
  681.       /* --------------------------------------------------------------------------------- */
  682.       operationRecPtr.p->transactionstate = WAIT_COMMIT_ABORT;
  683.       signal->theData[0] = operationRecPtr.p->userptr;
  684.       signal->theData[1] = ZWRITE_ERROR;
  685.       sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYREF, signal, 2, JBB);
  686.       return operationRecPtr.p->elementIsDisappeared;
  687.     }//if
  688.   } 
  689.   else if(operationRecPtr.p->operation == ZWRITE)
  690.   {
  691.     jam();
  692.     operationRecPtr.p->operation = ZINSERT;
  693.     if (operationRecPtr.p->prevParallelQue != RNIL) {
  694.       OperationrecPtr prevOpPtr;
  695.       jam();
  696.       prevOpPtr.i = operationRecPtr.p->prevParallelQue;
  697.       ptrCheckGuard(prevOpPtr, coprecsize, operationrec);
  698.       if (prevOpPtr.p->operation != ZDELETE) 
  699.       {
  700.         jam();
  701.         operationRecPtr.p->operation = ZUPDATE;
  702.       }
  703.     }
  704.   }
  705.   if (operationRecPtr.p->operation == ZSCAN_OP &&
  706.       ! operationRecPtr.p->isAccLockReq) {
  707.     jam();
  708.     takeOutScanLockQueue(operationRecPtr.p->scanRecPtr);
  709.     putReadyScanQueue(signal, operationRecPtr.p->scanRecPtr);
  710.   } else {
  711.     jam();
  712.     sendAcckeyconf(signal);
  713.     sendSignal(operationRecPtr.p->userblockref, GSN_ACCKEYCONF, signal, 6, JBB);
  714.   }//if
  715.   return operationRecPtr.p->elementIsDisappeared;
  716. }//Dbacc::executeNextOperation()
  717. /* --------------------------------------------------------------------------------- */
  718. /* TAKE_OUT_FRAG_WAIT_QUE                                                            */
  719. /*         DESCRIPTION: AN OPERATION WHICH OWNS A LOCK OF AN ELEMENT, IS IN A LIST   */
  720. /*                    OF THE FRAGMENT. THIS LIST IS USED TO STOP THE QUEUE OPERATION */
  721. /*                    DURING CREATE CHECK POINT PROSESS FOR STOP AND RESTART OF THE  */
  722. /*                OPERATIONS. THIS SUBRUTIN TAKES A OPERATION RECORD OUT OF THE LIST */
  723. /* -------------------------------------------------------------------------------- */
  724. void Dbacc::takeOutFragWaitQue(Signal* signal) 
  725. {
  726.   OperationrecPtr tofwqOperRecPtr;
  727.   if (operationRecPtr.p->opState == WAIT_IN_QUEUE) {
  728.     if (fragrecptr.p->sentWaitInQueOp == operationRecPtr.i) {
  729.       jam();
  730.       fragrecptr.p->sentWaitInQueOp = operationRecPtr.p->nextQueOp;
  731.     }//if
  732.     if (operationRecPtr.p->prevQueOp != RNIL) {
  733.       jam();
  734.       tofwqOperRecPtr.i = operationRecPtr.p->prevQueOp;
  735.       ptrCheckGuard(tofwqOperRecPtr, coprecsize, operationrec);
  736.       tofwqOperRecPtr.p->nextQueOp = operationRecPtr.p->nextQueOp;
  737.     } else {
  738.       jam();
  739.       fragrecptr.p->firstWaitInQueOp = operationRecPtr.p->nextQueOp;
  740.     }//if
  741.     if (operationRecPtr.p->nextQueOp != RNIL) {
  742.       jam();
  743.       tofwqOperRecPtr.i = operationRecPtr.p->nextQueOp;
  744.       ptrCheckGuard(tofwqOperRecPtr, coprecsize, operationrec);
  745.       tofwqOperRecPtr.p->prevQueOp = operationRecPtr.p->prevQueOp;
  746.     } else {
  747.       jam();
  748.       fragrecptr.p->lastWaitInQueOp = operationRecPtr.p->prevQueOp;
  749.     }//if
  750.     operationRecPtr.p->opState = FREE_OP;
  751.     return;
  752.   } else {
  753.     ndbrequire(operationRecPtr.p->opState == FREE_OP);
  754.   }//if
  755. }//Dbacc::takeOutFragWaitQue()
  756. /**
  757.  * takeOutLockOwnersList
  758.  *
  759.  * Description: Take out an operation from the doubly linked 
  760.  * lock owners list on the fragment.
  761.  *
  762.  */
  763. void Dbacc::takeOutLockOwnersList(Signal* signal,
  764.   const OperationrecPtr& outOperPtr) 
  765. {
  766.   const Uint32 Tprev = outOperPtr.p->prevLockOwnerOp;
  767.   const Uint32 Tnext = outOperPtr.p->nextLockOwnerOp;
  768. #ifdef VM_TRACE
  769.   // Check that operation is already in the list
  770.   OperationrecPtr tmpOperPtr;
  771.   bool inList = false;
  772.   tmpOperPtr.i = fragrecptr.p->lockOwnersList;
  773.   while (tmpOperPtr.i != RNIL){
  774.     ptrCheckGuard(tmpOperPtr, coprecsize, operationrec);
  775.     if (tmpOperPtr.i == outOperPtr.i)
  776.       inList = true;
  777.     tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp;
  778.   }
  779.   ndbrequire(inList == true);
  780. #endif
  781.   ndbrequire(outOperPtr.p->lockOwner == ZTRUE);
  782.   outOperPtr.p->lockOwner = ZFALSE;
  783.   // Fast path through the code for the common case.
  784.   if ((Tprev == RNIL) && (Tnext == RNIL)) {
  785.     ndbrequire(fragrecptr.p->lockOwnersList == outOperPtr.i);
  786.     fragrecptr.p->lockOwnersList = RNIL;
  787.     return;
  788.   } 
  789.   // Check previous operation 
  790.   if (Tprev != RNIL) {    
  791.     jam();
  792.     arrGuard(Tprev, coprecsize);
  793.     operationrec[Tprev].nextLockOwnerOp = Tnext;
  794.   } else {
  795.     fragrecptr.p->lockOwnersList = Tnext;
  796.   }//if
  797.   // Check next operation
  798.   if (Tnext == RNIL) {
  799.     return;
  800.   } else {
  801.     jam();
  802.     arrGuard(Tnext, coprecsize);
  803.     operationrec[Tnext].prevLockOwnerOp = Tprev;
  804.   }//if
  805.   return;
  806. }//Dbacc::takeOutLockOwnersList()
  807. /**
  808.  * insertLockOwnersList
  809.  *
  810.  * Description: Insert an operation first in the dubly linked lock owners 
  811.  * list on the fragment.
  812.  *
  813.  */
  814. void Dbacc::insertLockOwnersList(Signal* signal, 
  815.  const OperationrecPtr& insOperPtr) 
  816. {
  817.   OperationrecPtr tmpOperPtr;
  818.   
  819. #ifdef VM_TRACE
  820.   // Check that operation is not already in list
  821.   tmpOperPtr.i = fragrecptr.p->lockOwnersList;
  822.   while(tmpOperPtr.i != RNIL){
  823.     ptrCheckGuard(tmpOperPtr, coprecsize, operationrec);
  824.     ndbrequire(tmpOperPtr.i != insOperPtr.i);
  825.     tmpOperPtr.i = tmpOperPtr.p->nextLockOwnerOp;    
  826.   }
  827. #endif
  828.   ndbrequire(insOperPtr.p->lockOwner == ZFALSE);
  829.   insOperPtr.p->lockOwner = ZTRUE;
  830.   insOperPtr.p->prevLockOwnerOp = RNIL;
  831.   tmpOperPtr.i = fragrecptr.p->lockOwnersList;
  832.   fragrecptr.p->lockOwnersList = insOperPtr.i;
  833.   insOperPtr.p->nextLockOwnerOp = tmpOperPtr.i;
  834.   if (tmpOperPtr.i == RNIL) {
  835.     return;
  836.   } else {
  837.     jam();
  838.     ptrCheckGuard(tmpOperPtr, coprecsize, operationrec);
  839.     tmpOperPtr.p->prevLockOwnerOp = insOperPtr.i;
  840.   }//if
  841. }//Dbacc::insertLockOwnersList()
  842. /* --------------------------------------------------------------------------------- */
  843. /* --------------------------------------------------------------------------------- */
  844. /* --------------------------------------------------------------------------------- */
  845. /*                                                                                   */
  846. /*       END OF COMMIT AND ABORT MODULE                                              */
  847. /*                                                                                   */
  848. /* --------------------------------------------------------------------------------- */
  849. /* --------------------------------------------------------------------------------- */
  850. /* --------------------------------------------------------------------------------- */
  851. /* --------------------------------------------------------------------------------- */
  852. /* --------------------------------------------------------------------------------- */
  853. /* ALLOC_OVERFLOW_PAGE                                                               */
  854. /*          DESCRIPTION:                                                             */
  855. /* --------------------------------------------------------------------------------- */
  856. void Dbacc::allocOverflowPage(Signal* signal) 
  857. {
  858.   DirRangePtr aopDirRangePtr;
  859.   DirectoryarrayPtr aopOverflowDirptr;
  860.   OverflowRecordPtr aopOverflowRecPtr;
  861.   Uint32 taopTmp1;
  862.   Uint32 taopTmp2;
  863.   Uint32 taopTmp3;
  864.   tresult = 0;
  865.   if ((cfirstfreepage == RNIL) &&
  866.       (cfreepage >= cpagesize)) {
  867.     jam();  
  868.     zpagesize_error("Dbacc::allocOverflowPage");
  869.     tresult = ZPAGESIZE_ERROR;
  870.     return;
  871.   }//if
  872.   if (fragrecptr.p->firstFreeDirindexRec != RNIL) {
  873.     jam();
  874.     /* FRAGRECPTR:FIRST_FREE_DIRINDEX_REC POINTS  */
  875.     /* TO THE FIRST ELEMENT IN A FREE LIST OF THE */
  876.     /* DIRECTORY INDEX WICH HAVE NULL AS PAGE     */
  877.     aopOverflowRecPtr.i = fragrecptr.p->firstFreeDirindexRec;
  878.     ptrCheckGuard(aopOverflowRecPtr, coverflowrecsize, overflowRecord);
  879.     troOverflowRecPtr.p = aopOverflowRecPtr.p;
  880.     takeRecOutOfFreeOverdir(signal);
  881.   } else if (cfirstfreeoverrec == RNIL) {
  882.     jam();
  883.     tresult = ZOVER_REC_ERROR;
  884.     return;
  885.   } else if ((cfirstfreedir == RNIL) &&
  886.              (cdirarraysize <= cdirmemory)) {
  887.     jam();
  888.     tresult = ZDIRSIZE_ERROR;
  889.     return;
  890.   } else {
  891.     jam();
  892.     seizeOverRec(signal);
  893.     aopOverflowRecPtr = sorOverflowRecPtr;
  894.     aopOverflowRecPtr.p->dirindex = fragrecptr.p->lastOverIndex;
  895.   }//if
  896.   aopOverflowRecPtr.p->nextOverRec = RNIL;
  897.   aopOverflowRecPtr.p->prevOverRec = RNIL;
  898.   fragrecptr.p->firstOverflowRec = aopOverflowRecPtr.i;
  899.   fragrecptr.p->lastOverflowRec = aopOverflowRecPtr.i;
  900.   taopTmp1 = aopOverflowRecPtr.p->dirindex;
  901.   aopDirRangePtr.i = fragrecptr.p->overflowdir;
  902.   taopTmp2 = taopTmp1 >> 8;
  903.   taopTmp3 = taopTmp1 & 0xff;
  904.   ptrCheckGuard(aopDirRangePtr, cdirrangesize, dirRange);
  905.   arrGuard(taopTmp2, 256);
  906.   if (aopDirRangePtr.p->dirArray[taopTmp2] == RNIL) {
  907.     jam();
  908.     seizeDirectory(signal);
  909.     ndbrequire(tresult <= ZLIMIT_OF_ERROR);
  910.     aopDirRangePtr.p->dirArray[taopTmp2] = sdDirptr.i;
  911.   }//if
  912.   aopOverflowDirptr.i = aopDirRangePtr.p->dirArray[taopTmp2];
  913.   seizePage(signal);
  914.   ndbrequire(tresult <= ZLIMIT_OF_ERROR);
  915.   ptrCheckGuard(aopOverflowDirptr, cdirarraysize, directoryarray);
  916.   aopOverflowDirptr.p->pagep[taopTmp3] = spPageptr.i;
  917.   tiopPageId = aopOverflowRecPtr.p->dirindex;
  918.   iopOverflowRecPtr = aopOverflowRecPtr;
  919.   iopPageptr = spPageptr;
  920.   initOverpage(signal);
  921.   aopOverflowRecPtr.p->overpage = spPageptr.i;
  922.   if (fragrecptr.p->lastOverIndex <= aopOverflowRecPtr.p->dirindex) {
  923.     jam();
  924.     ndbrequire(fragrecptr.p->lastOverIndex == aopOverflowRecPtr.p->dirindex);
  925.     fragrecptr.p->lastOverIndex++;
  926.   }//if
  927. }//Dbacc::allocOverflowPage()
  928. /* --------------------------------------------------------------------------------- */
  929. /* --------------------------------------------------------------------------------- */
  930. /* --------------------------------------------------------------------------------- */
  931. /*                                                                                   */
  932. /*       EXPAND/SHRINK MODULE                                                        */
  933. /*                                                                                   */
  934. /* --------------------------------------------------------------------------------- */
  935. /* --------------------------------------------------------------------------------- */
  936. /* ******************--------------------------------------------------------------- */
  937. /*EXPANDCHECK                                        EXPAND BUCKET ORD               */
  938. /* SENDER: ACC,    LEVEL B         */
  939. /*   INPUT:   FRAGRECPTR, POINTS TO A FRAGMENT RECORD.                               */
  940. /*   DESCRIPTION: A BUCKET OF A FRAGMENT PAGE WILL BE EXPAND INTO TWO BUCKETS        */
  941. /*                                 ACCORDING TO LH3.                                 */
  942. /* ******************--------------------------------------------------------------- */
  943. /* ******************--------------------------------------------------------------- */
  944. /* EXPANDCHECK                                        EXPAND BUCKET ORD              */
  945. /* ******************------------------------------+                                 */
  946. /* SENDER: ACC,    LEVEL B         */
  947. /* A BUCKET OF THE FRAGMENT WILL   */
  948. /* BE EXPANDED ACORDING TO LH3,    */
  949. /* AND COMMIT TRANSACTION PROCESS  */
  950. /* WILL BE CONTINUED */
  951. Uint32 Dbacc::checkScanExpand(Signal* signal)
  952. {
  953.   Uint32 Ti;
  954.   Uint32 TreturnCode = 0;
  955.   Uint32 TPageIndex;
  956.   Uint32 TDirInd;
  957.   Uint32 TSplit;
  958.   Uint32 TreleaseInd = 0;
  959.   Uint32 TreleaseScanBucket;
  960.   Uint32 TreleaseScanIndicator[4];
  961.   DirectoryarrayPtr TDirptr;
  962.   DirRangePtr TDirRangePtr;
  963.   Page8Ptr TPageptr;
  964.   ScanRecPtr TscanPtr;
  965.   RootfragmentrecPtr Trootfragrecptr;
  966.   Trootfragrecptr.i = fragrecptr.p->myroot;
  967.   TSplit = fragrecptr.p->p;
  968.   ptrCheckGuard(Trootfragrecptr, crootfragmentsize, rootfragmentrec);
  969.   for (Ti = 0; Ti < 4; Ti++) {
  970.     TreleaseScanIndicator[Ti] = 0;
  971.     if (Trootfragrecptr.p->scan[Ti] != RNIL) {
  972.       //-------------------------------------------------------------
  973.       // A scan is ongoing on this particular local fragment. We have
  974.       // to check its current state.
  975.       //-------------------------------------------------------------
  976.       TscanPtr.i = Trootfragrecptr.p->scan[Ti];
  977.       ptrCheckGuard(TscanPtr, cscanRecSize, scanRec);
  978.       if (TscanPtr.p->activeLocalFrag == fragrecptr.i) {
  979.         if (TscanPtr.p->scanBucketState ==  ScanRec::FIRST_LAP) {
  980.           if (TSplit == TscanPtr.p->nextBucketIndex) {
  981.             jam();
  982.     //-------------------------------------------------------------
  983.     // We are currently scanning this bucket. We cannot split it
  984.     // simultaneously with the scan. We have to pass this offer for
  985.     // splitting the bucket.
  986.     //-------------------------------------------------------------
  987.             TreturnCode = 1;
  988.             return TreturnCode;
  989.           } else if (TSplit > TscanPtr.p->nextBucketIndex) {
  990.             jam();
  991.     //-------------------------------------------------------------
  992.     // This bucket has not yet been scanned. We must reset the scanned
  993.     // bit indicator for this scan on this bucket.
  994.     //-------------------------------------------------------------
  995.             TreleaseScanIndicator[Ti] = 1;
  996.             TreleaseInd = 1;
  997.           } else {
  998.             jam();
  999.           }//if
  1000.         } else if (TscanPtr.p->scanBucketState ==  ScanRec::SECOND_LAP) {
  1001.           jam();
  1002.   //-------------------------------------------------------------
  1003.   // We are performing a second lap to handle buckets that was
  1004.   // merged during the first lap of scanning. During this second
  1005.   // lap we do not allow any splits or merges.
  1006.   //-------------------------------------------------------------
  1007.           TreturnCode = 1;
  1008.           return TreturnCode;
  1009.         } else {
  1010.           ndbrequire(TscanPtr.p->scanBucketState ==  ScanRec::SCAN_COMPLETED);
  1011.           jam();
  1012.   //-------------------------------------------------------------
  1013.   // The scan is completed and we can thus go ahead and perform
  1014.   // the split.
  1015.   //-------------------------------------------------------------
  1016.         }//if
  1017.       }//if
  1018.     }//if
  1019.   }//for
  1020.   if (TreleaseInd == 1) {
  1021.     TreleaseScanBucket = TSplit;
  1022.     TDirRangePtr.i = fragrecptr.p->directory;
  1023.     TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
  1024.     TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
  1025.     ptrCheckGuard(TDirRangePtr, cdirrangesize, dirRange);
  1026.     arrGuard((TDirInd >> 8), 256);
  1027.     TDirptr.i = TDirRangePtr.p->dirArray[TDirInd >> 8];
  1028.     ptrCheckGuard(TDirptr, cdirarraysize, directoryarray);
  1029.     TPageptr.i = TDirptr.p->pagep[TDirInd & 0xff];
  1030.     ptrCheckGuard(TPageptr, cpagesize, page8);
  1031.     for (Ti = 0; Ti < 4; Ti++) {
  1032.       if (TreleaseScanIndicator[Ti] == 1) {
  1033.         jam();
  1034.         scanPtr.i = Trootfragrecptr.p->scan[Ti];
  1035.         ptrCheckGuard(scanPtr, cscanRecSize, scanRec);
  1036.         rsbPageidptr = TPageptr;
  1037.         trsbPageindex = TPageIndex;
  1038.         releaseScanBucket(signal);
  1039.       }//if
  1040.     }//for
  1041.   }//if
  1042.   return TreturnCode;
  1043. }//Dbacc::checkScanExpand()
  1044. void Dbacc::execEXPANDCHECK2(Signal* signal) 
  1045. {
  1046.   jamEntry();
  1047.   if(refToBlock(signal->getSendersBlockRef()) == DBLQH){
  1048.     jam();
  1049.     reenable_expand_after_redo_log_exection_complete(signal);
  1050.     return;
  1051.   }
  1052.   DirectoryarrayPtr newDirptr;
  1053.   fragrecptr.i = signal->theData[0];
  1054.   tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
  1055.   Uint32 tmp = 1;
  1056.   tmp = tmp << 31;
  1057.   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
  1058.   fragrecptr.p->expandFlag = 0;
  1059.   if (fragrecptr.p->slack < tmp) {
  1060.     jam();
  1061.     /* IT MEANS THAT IF SLACK > ZERO */
  1062.     /*--------------------------------------------------------------*/
  1063.     /*       THE SLACK HAS IMPROVED AND IS NOW ACCEPTABLE AND WE    */
  1064.     /*       CAN FORGET ABOUT THE EXPAND PROCESS.                   */
  1065.     /*--------------------------------------------------------------*/
  1066.     return;
  1067.   }//if
  1068.   if (fragrecptr.p->firstOverflowRec == RNIL) {
  1069.     jam();
  1070.     allocOverflowPage(signal);
  1071.     if (tresult > ZLIMIT_OF_ERROR) {
  1072.       jam();
  1073.       /*--------------------------------------------------------------*/
  1074.       /* WE COULD NOT ALLOCATE ANY OVERFLOW PAGE. THUS WE HAVE TO STOP*/
  1075.       /* THE EXPAND SINCE WE CANNOT GUARANTEE ITS COMPLETION.         */
  1076.       /*--------------------------------------------------------------*/
  1077.       return;
  1078.     }//if
  1079.   }//if
  1080.   if (cfirstfreepage == RNIL) {
  1081.     if (cfreepage >= cpagesize) {
  1082.       jam();
  1083.       /*--------------------------------------------------------------*/
  1084.       /* WE HAVE TO STOP THE EXPAND PROCESS SINCE THERE ARE NO FREE   */
  1085.       /* PAGES. THIS MEANS THAT WE COULD BE FORCED TO CRASH SINCE WE  */
  1086.       /* CANNOT COMPLETE THE EXPAND. TO AVOID THE CRASH WE EXIT HERE. */
  1087.       /*--------------------------------------------------------------*/
  1088.       return;
  1089.     }//if
  1090.   }//if
  1091.   if (checkScanExpand(signal) == 1) {
  1092.     jam();
  1093.     /*--------------------------------------------------------------*/
  1094.     // A scan state was inconsistent with performing an expand
  1095.     // operation.
  1096.     /*--------------------------------------------------------------*/
  1097.     return;
  1098.   }//if
  1099.   if (fragrecptr.p->createLcp == ZTRUE) {
  1100.     if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_EXPAND) {
  1101.       jam();
  1102.       /*--------------------------------------------------------------*/
  1103.       // We did not have enough undo log buffers to start up an
  1104.       // expand operation
  1105.       /*--------------------------------------------------------------*/
  1106.       return;
  1107.     }//if
  1108.   }//if
  1109.   /*--------------------------------------------------------------------------*/
  1110.   /*       WE START BY FINDING THE PAGE, THE PAGE INDEX AND THE PAGE DIRECTORY*/
  1111.   /*       OF THE NEW BUCKET WHICH SHALL RECEIVE THE ELEMENT WHICH HAVE A 1 IN*/
  1112.   /*       THE NEXT HASH BIT. THIS BIT IS USED IN THE SPLIT MECHANISM TO      */
  1113.   /*       DECIDE WHICH ELEMENT GOES WHERE.                                   */
  1114.   /*--------------------------------------------------------------------------*/
  1115.   expDirRangePtr.i = fragrecptr.p->directory;
  1116.   texpReceivedBucket = (fragrecptr.p->maxp + fragrecptr.p->p) + 1; /* RECEIVED BUCKET */
  1117.   texpDirInd = texpReceivedBucket >> fragrecptr.p->k;
  1118.   newDirptr.i = RNIL;
  1119.   ptrNull(newDirptr);
  1120.   texpDirRangeIndex = texpDirInd >> 8;
  1121.   ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
  1122.   arrGuard(texpDirRangeIndex, 256);
  1123.   expDirptr.i = expDirRangePtr.p->dirArray[texpDirRangeIndex];
  1124.   if (expDirptr.i == RNIL) {
  1125.     jam();
  1126.     seizeDirectory(signal);
  1127.     if (tresult > ZLIMIT_OF_ERROR) {
  1128.       jam();
  1129.       return;
  1130.     } else {
  1131.       jam();
  1132.       newDirptr = sdDirptr;
  1133.       expDirptr = sdDirptr;
  1134.       expDirRangePtr.p->dirArray[texpDirRangeIndex] = sdDirptr.i;
  1135.     }//if
  1136.   } else {
  1137.     ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
  1138.   }//if
  1139.   texpDirPageIndex = texpDirInd & 0xff;
  1140.   expPageptr.i = expDirptr.p->pagep[texpDirPageIndex];
  1141.   if (expPageptr.i == RNIL) {
  1142.     jam();
  1143.     seizePage(signal);
  1144.     if (tresult > ZLIMIT_OF_ERROR) {
  1145.       jam();
  1146.       if (newDirptr.i != RNIL) {
  1147.         jam();
  1148.         rdDirptr.i = newDirptr.i;
  1149.         releaseDirectory(signal);
  1150.       }//if
  1151.       return;
  1152.     }//if
  1153.     expDirptr.p->pagep[texpDirPageIndex] = spPageptr.i;
  1154.     tipPageId = texpDirInd;
  1155.     inpPageptr = spPageptr;
  1156.     initPage(signal);
  1157.     fragrecptr.p->dirsize++;
  1158.     expPageptr = spPageptr;
  1159.   } else {
  1160.     ptrCheckGuard(expPageptr, cpagesize, page8);
  1161.   }//if
  1162.   fragrecptr.p->expReceivePageptr = expPageptr.i;
  1163.   fragrecptr.p->expReceiveIndex = texpReceivedBucket & ((1 << fragrecptr.p->k) - 1);
  1164.   /*--------------------------------------------------------------------------*/
  1165.   /*       THE NEXT ACTION IS TO FIND THE PAGE, THE PAGE INDEX AND THE PAGE   */
  1166.   /*       DIRECTORY OF THE BUCKET TO BE SPLIT.                               */
  1167.   /*--------------------------------------------------------------------------*/
  1168.   expDirRangePtr.i = fragrecptr.p->directory;
  1169.   cexcPageindex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
  1170.   texpDirInd = fragrecptr.p->p >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
  1171.   ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
  1172.   arrGuard((texpDirInd >> 8), 256);
  1173.   expDirptr.i = expDirRangePtr.p->dirArray[texpDirInd >> 8];
  1174.   ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
  1175.   excPageptr.i = expDirptr.p->pagep[texpDirInd & 0xff];
  1176.   fragrecptr.p->expSenderIndex = cexcPageindex;
  1177.   fragrecptr.p->expSenderPageptr = excPageptr.i;
  1178.   if (excPageptr.i == RNIL) {
  1179.     jam();
  1180.     endofexpLab(signal); /* EMPTY BUCKET */
  1181.     return;
  1182.   }//if
  1183.   fragrecptr.p->expReceiveForward = ZTRUE;
  1184.   ptrCheckGuard(excPageptr, cpagesize, page8);
  1185.   expandcontainer(signal);
  1186.   endofexpLab(signal);
  1187.   return;
  1188. }//Dbacc::execEXPANDCHECK2()
  1189.   
  1190. void Dbacc::endofexpLab(Signal* signal) 
  1191. {
  1192.   fragrecptr.p->p++;
  1193.   fragrecptr.p->slack += fragrecptr.p->maxloadfactor;
  1194.   fragrecptr.p->expandCounter++;
  1195.   if (fragrecptr.p->p > fragrecptr.p->maxp) {
  1196.     jam();
  1197.     fragrecptr.p->maxp = (fragrecptr.p->maxp << 1) | 1;
  1198.     fragrecptr.p->lhdirbits++;
  1199.     fragrecptr.p->hashcheckbit++;
  1200.     fragrecptr.p->p = 0;
  1201.   }//if
  1202.   Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p;
  1203.   Uint32 Thysteres = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor;
  1204.   fragrecptr.p->slackCheck = noOfBuckets * Thysteres;
  1205.   if (fragrecptr.p->slack > (1u << 31)) {
  1206.     jam();
  1207.     /* IT MEANS THAT IF SLACK < ZERO */
  1208.     /* --------------------------------------------------------------------------------- */
  1209.     /*       IT IS STILL NECESSARY TO EXPAND THE FRAGMENT EVEN MORE. START IT FROM HERE  */
  1210.     /*       WITHOUT WAITING FOR NEXT COMMIT ON THE FRAGMENT.                            */
  1211.     /* --------------------------------------------------------------------------------- */
  1212.     fragrecptr.p->expandFlag = 2;
  1213.     signal->theData[0] = fragrecptr.i;
  1214.     signal->theData[1] = fragrecptr.p->p;
  1215.     signal->theData[2] = fragrecptr.p->maxp;
  1216.     sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
  1217.   }//if
  1218.   return;
  1219. }//Dbacc::endofexpLab()
  1220. void Dbacc::reenable_expand_after_redo_log_exection_complete(Signal* signal){
  1221.   tabptr.i = signal->theData[0];
  1222.   Uint32 fragId = signal->theData[1];
  1223.   ptrCheckGuard(tabptr, ctablesize, tabrec);
  1224.   ndbrequire(getrootfragmentrec(signal, rootfragrecptr, fragId));
  1225. #if 0
  1226.   ndbout_c("reenable expand check for table %d fragment: %d", 
  1227.    tabptr.i, fragId);
  1228. #endif
  1229.   for (Uint32 i = 0; i < 2; i++) {
  1230.     fragrecptr.i = rootfragrecptr.p->fragmentptr[i];
  1231.     ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
  1232.     switch(fragrecptr.p->expandFlag){
  1233.     case 0:
  1234.       /**
  1235.        * Hmm... this means that it's alreay has been reenabled...
  1236.        */
  1237.       ndbassert(false);
  1238.       continue;
  1239.     case 1:
  1240.       /**
  1241.        * Nothing is going on start expand check
  1242.        */
  1243.     case 2:
  1244.       /**
  1245.        * A shrink is running, do expand check anyway
  1246.        *  (to reset expandFlag)
  1247.        */
  1248.       fragrecptr.p->expandFlag = 2; 
  1249.       signal->theData[0] = fragrecptr.i;
  1250.       signal->theData[1] = fragrecptr.p->p;
  1251.       signal->theData[2] = fragrecptr.p->maxp;
  1252.       sendSignal(cownBlockref, GSN_EXPANDCHECK2, signal, 3, JBB);
  1253.       break;
  1254.     }
  1255.   }
  1256. }
  1257. void Dbacc::execDEBUG_SIG(Signal* signal) 
  1258. {
  1259.   jamEntry();
  1260.   expPageptr.i = signal->theData[0];
  1261.   progError(__LINE__,
  1262.     ERR_SR_UNDOLOG);
  1263.   return;
  1264. }//Dbacc::execDEBUG_SIG()
  1265. /* --------------------------------------------------------------------------------- */
  1266. /* EXPANDCONTAINER                                                                   */
  1267. /*        INPUT: EXC_PAGEPTR (POINTER TO THE ACTIVE PAGE RECORD)                     */
  1268. /*               CEXC_PAGEINDEX (INDEX OF THE BUCKET).                               */
  1269. /*                                                                                   */
  1270. /*        DESCRIPTION: THE HASH VALUE OF ALL ELEMENTS IN THE CONTAINER WILL BE       */
  1271. /*                  CHECKED. SOME OF THIS ELEMENTS HAVE TO MOVE TO THE NEW CONTAINER */
  1272. /* --------------------------------------------------------------------------------- */
  1273. void Dbacc::expandcontainer(Signal* signal) 
  1274. {
  1275.   Uint32 texcHashvalue;
  1276.   Uint32 texcTmp;
  1277.   Uint32 texcIndex;
  1278.   Uint32 texpKeyLen;
  1279.   Uint32 guard20;
  1280.   texpKeyLen = fragrecptr.p->keyLength;
  1281.   if (texpKeyLen == 0) {
  1282.     jam();
  1283.     texpKeyLen = ZACTIVE_LONG_KEY_LEN;
  1284.   }//if
  1285.   cexcPrevpageptr = RNIL;
  1286.   cexcPrevconptr = 0;
  1287.   cexcForward = ZTRUE;
  1288.  EXP_CONTAINER_LOOP:
  1289.   cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
  1290.   if (cexcForward == ZTRUE) {
  1291.     jam();
  1292.     cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
  1293.     cexcElementptr = cexcContainerptr + ZCON_HEAD_SIZE;
  1294.   } else {
  1295.     jam();
  1296.     cexcContainerptr = ((cexcContainerptr + ZHEAD_SIZE) + ZBUF_SIZE) - ZCON_HEAD_SIZE;
  1297.     cexcElementptr = cexcContainerptr - 1;
  1298.   }//if
  1299.   arrGuard(cexcContainerptr, 2048);
  1300.   cexcContainerhead = excPageptr.p->word32[cexcContainerptr];
  1301.   cexcContainerlen = cexcContainerhead >> 26;
  1302.   cexcMovedLen = ZCON_HEAD_SIZE;
  1303.   if (cexcContainerlen <= ZCON_HEAD_SIZE) {
  1304.     ndbrequire(cexcContainerlen >= ZCON_HEAD_SIZE);
  1305.     jam();
  1306.     goto NEXT_ELEMENT;
  1307.   }//if
  1308.  NEXT_ELEMENT_LOOP:
  1309.   idrOperationRecPtr.i = RNIL;
  1310.   ptrNull(idrOperationRecPtr);
  1311.   /* --------------------------------------------------------------------------------- */
  1312.   /*       CEXC_PAGEINDEX         PAGE INDEX OF CURRENT CONTAINER BEING EXAMINED.      */
  1313.   /*       CEXC_CONTAINERPTR      INDEX OF CURRENT CONTAINER BEING EXAMINED.           */
  1314.   /*       CEXC_ELEMENTPTR        INDEX OF CURRENT ELEMENT BEING EXAMINED.             */
  1315.   /*       EXC_PAGEPTR            PAGE WHERE CURRENT ELEMENT RESIDES.                  */
  1316.   /*       CEXC_PREVPAGEPTR        PAGE OF PREVIOUS CONTAINER.                         */
  1317.   /*       CEXC_PREVCONPTR        INDEX OF PREVIOUS CONTAINER                          */
  1318.   /*       CEXC_FORWARD           DIRECTION OF CURRENT CONTAINER                       */
  1319.   /* --------------------------------------------------------------------------------- */
  1320.   arrGuard(cexcElementptr, 2048);
  1321.   tidrElemhead = excPageptr.p->word32[cexcElementptr];
  1322.   if (ElementHeader::getUnlocked(tidrElemhead)){
  1323.     jam();
  1324.     texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead);
  1325.   } else {
  1326.     jam();
  1327.     idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
  1328.     ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
  1329.     texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
  1330.     if ((fragrecptr.p->createLcp == ZTRUE) &&
  1331.         (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) != 0)) {
  1332.       jam();
  1333.       /* --------------------------------------------------------------------------------- */
  1334.       // During local checkpoints we must ensure that we restore the element header in
  1335.       // unlocked state and with the hash value part there with tuple status zeroed.
  1336.       // Otherwise a later insert over the same element will write an UNDO log that will
  1337.       // ensure that the now removed element is restored together with its locked element
  1338.       // header and without the hash value part.
  1339.       /* --------------------------------------------------------------------------------- */
  1340.       const Uint32 hv = idrOperationRecPtr.p->hashvaluePart;
  1341.       const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
  1342.       excPageptr.p->word32[cexcElementptr] = eh;
  1343.     }//if
  1344.   }//if
  1345.   if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) {
  1346.     jam();
  1347.     /* --------------------------------------------------------------------------------- */
  1348.     /*       THIS ELEMENT IS NOT TO BE MOVED. WE CALCULATE THE WHEREABOUTS OF THE NEXT   */
  1349.     /*       ELEMENT AND PROCEED WITH THAT OR END THE SEARCH IF THERE ARE NO MORE        */
  1350.     /*       ELEMENTS IN THIS CONTAINER.                                                 */
  1351.     /* --------------------------------------------------------------------------------- */
  1352.     goto NEXT_ELEMENT;
  1353.   }//if
  1354.   /* --------------------------------------------------------------------------------- */
  1355.   /*       THE HASH BIT WAS SET AND WE SHALL MOVE THIS ELEMENT TO THE NEW BUCKET.      */
  1356.   /*       WE START BY READING THE ELEMENT TO BE ABLE TO INSERT IT INTO THE NEW BUCKET.*/
  1357.   /*       THEN WE INSERT THE ELEMENT INTO THE NEW BUCKET. THE NEXT STEP IS TO DELETE  */
  1358.   /*       THE ELEMENT FROM THIS BUCKET. THIS IS PERFORMED BY REPLACING IT WITH THE    */
  1359.   /*       LAST ELEMENT IN THE BUCKET. IF THIS ELEMENT IS TO BE MOVED WE MOVE IT AND   */
  1360.   /*       GET THE LAST ELEMENT AGAIN UNTIL WE EITHER FIND ONE THAT STAYS OR THIS      */
  1361.   /*       ELEMENT IS THE LAST ELEMENT.                                                */
  1362.   /* --------------------------------------------------------------------------------- */
  1363.   texcTmp = cexcElementptr + cexcForward;
  1364.   guard20 = fragrecptr.p->localkeylen - 1;
  1365.   for (texcIndex = 0; texcIndex <= guard20; texcIndex++) {
  1366.     arrGuard(texcIndex, 2);
  1367.     arrGuard(texcTmp, 2048);
  1368.     clocalkey[texcIndex] = excPageptr.p->word32[texcTmp];
  1369.     texcTmp = texcTmp + cexcForward;
  1370.   }//for
  1371.   guard20 = texpKeyLen - 1;
  1372.   for (texcIndex = 0; texcIndex <= guard20; texcIndex++) {
  1373.     arrGuard(texcIndex, 2048);
  1374.     arrGuard(texcTmp, 2048);
  1375.     ckeys[texcIndex] = excPageptr.p->word32[texcTmp];
  1376.     texcTmp = texcTmp + cexcForward;
  1377.   }//for
  1378.   tidrPageindex = fragrecptr.p->expReceiveIndex;
  1379.   idrPageptr.i = fragrecptr.p->expReceivePageptr;
  1380.   ptrCheckGuard(idrPageptr, cpagesize, page8);
  1381.   tidrForward = fragrecptr.p->expReceiveForward;
  1382.   tidrKeyLen = texpKeyLen;
  1383.   insertElement(signal);
  1384.   fragrecptr.p->expReceiveIndex = tidrPageindex;
  1385.   fragrecptr.p->expReceivePageptr = idrPageptr.i;
  1386.   fragrecptr.p->expReceiveForward = tidrForward;
  1387.  REMOVE_LAST_LOOP:
  1388.   jam();
  1389.   lastPageptr.i = excPageptr.i;
  1390.   lastPageptr.p = excPageptr.p;
  1391.   tlastContainerptr = cexcContainerptr;
  1392.   lastPrevpageptr.i = cexcPrevpageptr;
  1393.   ptrCheck(lastPrevpageptr, cpagesize, page8);
  1394.   tlastPrevconptr = cexcPrevconptr;
  1395.   arrGuard(tlastContainerptr, 2048);
  1396.   tlastContainerhead = lastPageptr.p->word32[tlastContainerptr];
  1397.   tlastContainerlen = tlastContainerhead >> 26;
  1398.   tlastForward = cexcForward;
  1399.   tlastPageindex = cexcPageindex;
  1400.   getLastAndRemove(signal);
  1401.   if (excPageptr.i == lastPageptr.i) {
  1402.     if (cexcElementptr == tlastElementptr) {
  1403.       jam();
  1404.       /* --------------------------------------------------------------------------------- */
  1405.       /*       THE CURRENT ELEMENT WAS ALSO THE LAST ELEMENT.                              */
  1406.       /* --------------------------------------------------------------------------------- */
  1407.       return;
  1408.     }//if
  1409.   }//if
  1410.   /* --------------------------------------------------------------------------------- */
  1411.   /*       THE CURRENT ELEMENT WAS NOT THE LAST ELEMENT. IF THE LAST ELEMENT SHOULD    */
  1412.   /*       STAY WE COPY IT TO THE POSITION OF THE CURRENT ELEMENT, OTHERWISE WE INSERT */
  1413.   /*       INTO THE NEW BUCKET, REMOVE IT AND TRY WITH THE NEW LAST ELEMENT.           */
  1414.   /* --------------------------------------------------------------------------------- */
  1415.   idrOperationRecPtr.i = RNIL;
  1416.   ptrNull(idrOperationRecPtr);
  1417.   arrGuard(tlastElementptr, 2048);
  1418.   tidrElemhead = lastPageptr.p->word32[tlastElementptr];
  1419.   if (ElementHeader::getUnlocked(tidrElemhead)) {
  1420.     jam();
  1421.     texcHashvalue = ElementHeader::getHashValuePart(tidrElemhead);
  1422.   } else {
  1423.     jam();
  1424.     idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
  1425.     ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
  1426.     texcHashvalue = idrOperationRecPtr.p->hashvaluePart;
  1427.     if ((fragrecptr.p->createLcp == ZTRUE) &&
  1428.         (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) != 0)) {
  1429.       jam();
  1430.       /* --------------------------------------------------------------------------------- */
  1431.       // During local checkpoints we must ensure that we restore the element header in
  1432.       // unlocked state and with the hash value part there with tuple status zeroed.
  1433.       // Otherwise a later insert over the same element will write an UNDO log that will
  1434.       // ensure that the now removed element is restored together with its locked element
  1435.       // header and without the hash value part.
  1436.       /* --------------------------------------------------------------------------------- */
  1437.       const Uint32 hv = idrOperationRecPtr.p->hashvaluePart;
  1438.       const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
  1439.       lastPageptr.p->word32[tlastElementptr] = eh;
  1440.     }//if
  1441.   }//if
  1442.   if (((texcHashvalue >> fragrecptr.p->hashcheckbit) & 1) == 0) {
  1443.     jam();
  1444.     /* --------------------------------------------------------------------------------- */
  1445.     /*       THE LAST ELEMENT IS NOT TO BE MOVED. WE COPY IT TO THE CURRENT ELEMENT.     */
  1446.     /* --------------------------------------------------------------------------------- */
  1447.     delPageptr = excPageptr;
  1448.     tdelContainerptr = cexcContainerptr;
  1449.     tdelForward = cexcForward;
  1450.     tdelElementptr = cexcElementptr;
  1451.     deleteElement(signal);
  1452.   } else {
  1453.     jam();
  1454.     /* --------------------------------------------------------------------------------- */
  1455.     /*       THE LAST ELEMENT IS ALSO TO BE MOVED.                                       */
  1456.     /* --------------------------------------------------------------------------------- */
  1457.     texcTmp = tlastElementptr + tlastForward;
  1458.     for (texcIndex = 0; texcIndex < fragrecptr.p->localkeylen; texcIndex++) {
  1459.       arrGuard(texcIndex, 2);
  1460.       arrGuard(texcTmp, 2048);
  1461.       clocalkey[texcIndex] = lastPageptr.p->word32[texcTmp];
  1462.       texcTmp = texcTmp + tlastForward;
  1463.     }//for
  1464.     for (texcIndex = 0; texcIndex < texpKeyLen; texcIndex++) {
  1465.       arrGuard(texcIndex, 2048);
  1466.       arrGuard(texcTmp, 2048);
  1467.       ckeys[texcIndex] = lastPageptr.p->word32[texcTmp];
  1468.       texcTmp = texcTmp + tlastForward;
  1469.     }//for
  1470.     tidrPageindex = fragrecptr.p->expReceiveIndex;
  1471.     idrPageptr.i = fragrecptr.p->expReceivePageptr;
  1472.     ptrCheckGuard(idrPageptr, cpagesize, page8);
  1473.     tidrForward = fragrecptr.p->expReceiveForward;
  1474.     tidrKeyLen = texpKeyLen;
  1475.     insertElement(signal);
  1476.     fragrecptr.p->expReceiveIndex = tidrPageindex;
  1477.     fragrecptr.p->expReceivePageptr = idrPageptr.i;
  1478.     fragrecptr.p->expReceiveForward = tidrForward;
  1479.     goto REMOVE_LAST_LOOP;
  1480.   }//if
  1481.  NEXT_ELEMENT:
  1482.   arrGuard(cexcContainerptr, 2048);
  1483.   cexcContainerhead = excPageptr.p->word32[cexcContainerptr];
  1484.   cexcMovedLen = cexcMovedLen + fragrecptr.p->elementLength;
  1485.   if ((cexcContainerhead >> 26) > cexcMovedLen) {
  1486.     jam();
  1487.     /* --------------------------------------------------------------------------------- */
  1488.     /*       WE HAVE NOT YET MOVED THE COMPLETE CONTAINER. WE PROCEED WITH THE NEXT      */
  1489.     /*       ELEMENT IN THE CONTAINER. IT IS IMPORTANT TO READ THE CONTAINER LENGTH      */
  1490.     /*       FROM THE CONTAINER HEADER SINCE IT MIGHT CHANGE BY REMOVING THE LAST        */
  1491.     /*       ELEMENT IN THE BUCKET.                                                      */
  1492.     /* --------------------------------------------------------------------------------- */
  1493.     cexcElementptr = cexcElementptr + (cexcForward * fragrecptr.p->elementLength);
  1494.     goto NEXT_ELEMENT_LOOP;
  1495.   }//if
  1496.   if (((cexcContainerhead >> 7) & 3) != 0) {
  1497.     jam();
  1498.     /* --------------------------------------------------------------------------------- */
  1499.     /*       WE PROCEED TO THE NEXT CONTAINER IN THE BUCKET.                             */
  1500.     /* --------------------------------------------------------------------------------- */
  1501.     cexcPrevpageptr = excPageptr.i;
  1502.     cexcPrevconptr = cexcContainerptr;
  1503.     nextcontainerinfoExp(signal);
  1504.     goto EXP_CONTAINER_LOOP;
  1505.   }//if
  1506. }//Dbacc::expandcontainer()
  1507. /* ******************--------------------------------------------------------------- */
  1508. /* SHRINKCHECK                                        JOIN BUCKET ORD                */
  1509. /*                                                   SENDER: ACC,    LEVEL B         */
  1510. /*   INPUT:   FRAGRECPTR, POINTS TO A FRAGMENT RECORD.                               */
  1511. /*   DESCRIPTION: TWO BUCKET OF A FRAGMENT PAGE WILL BE JOINED TOGETHER              */
  1512. /*                                 ACCORDING TO LH3.                                 */
  1513. /* ******************--------------------------------------------------------------- */
  1514. /* ******************--------------------------------------------------------------- */
  1515. /* SHRINKCHECK                                            JOIN BUCKET ORD            */
  1516. /* ******************------------------------------+                                 */
  1517. /*   SENDER: ACC,    LEVEL B       */
  1518. /* TWO BUCKETS OF THE FRAGMENT     */
  1519. /* WILL BE JOINED  ACORDING TO LH3 */
  1520. /* AND COMMIT TRANSACTION PROCESS  */
  1521. /* WILL BE CONTINUED */
  1522. Uint32 Dbacc::checkScanShrink(Signal* signal)
  1523. {
  1524.   Uint32 Ti;
  1525.   Uint32 TreturnCode = 0;
  1526.   Uint32 TPageIndex;
  1527.   Uint32 TDirInd;
  1528.   Uint32 TmergeDest;
  1529.   Uint32 TmergeSource;
  1530.   Uint32 TreleaseScanBucket;
  1531.   Uint32 TreleaseInd = 0;
  1532.   Uint32 TreleaseScanIndicator[4];
  1533.   DirectoryarrayPtr TDirptr;
  1534.   DirRangePtr TDirRangePtr;
  1535.   Page8Ptr TPageptr;
  1536.   ScanRecPtr TscanPtr;
  1537.   RootfragmentrecPtr Trootfragrecptr;
  1538.   Trootfragrecptr.i = fragrecptr.p->myroot;
  1539.   ptrCheckGuard(Trootfragrecptr, crootfragmentsize, rootfragmentrec);
  1540.   if (fragrecptr.p->p == 0) {
  1541.     jam();
  1542.     TmergeDest = fragrecptr.p->maxp >> 1;
  1543.   } else {
  1544.     jam();
  1545.     TmergeDest = fragrecptr.p->p - 1;
  1546.   }//if
  1547.   TmergeSource = fragrecptr.p->maxp + fragrecptr.p->p;
  1548.   for (Ti = 0; Ti < 4; Ti++) {
  1549.     TreleaseScanIndicator[Ti] = 0;
  1550.     if (Trootfragrecptr.p->scan[Ti] != RNIL) {
  1551.       TscanPtr.i = Trootfragrecptr.p->scan[Ti];
  1552.       ptrCheckGuard(TscanPtr, cscanRecSize, scanRec);
  1553.       if (TscanPtr.p->activeLocalFrag == fragrecptr.i) {
  1554. //-------------------------------------------------------------
  1555. // A scan is ongoing on this particular local fragment. We have
  1556. // to check its current state.
  1557. //-------------------------------------------------------------
  1558.         if (TscanPtr.p->scanBucketState ==  ScanRec::FIRST_LAP) {
  1559.           jam();
  1560.           if ((TmergeDest == TscanPtr.p->nextBucketIndex) ||
  1561.               (TmergeSource == TscanPtr.p->nextBucketIndex)) {
  1562.             jam();
  1563.     //-------------------------------------------------------------
  1564.     // We are currently scanning one of the buckets involved in the
  1565.     // merge. We cannot merge while simultaneously performing a scan.
  1566.     // We have to pass this offer for merging the buckets.
  1567.     //-------------------------------------------------------------
  1568.             TreturnCode = 1;
  1569.             return TreturnCode;
  1570.           } else if (TmergeDest < TscanPtr.p->nextBucketIndex) {
  1571.             jam();
  1572.             TreleaseScanIndicator[Ti] = 1;
  1573.             TreleaseInd = 1;
  1574.           }//if
  1575.         } else if (TscanPtr.p->scanBucketState ==  ScanRec::SECOND_LAP) {
  1576.           jam();
  1577.   //-------------------------------------------------------------
  1578.   // We are performing a second lap to handle buckets that was
  1579.   // merged during the first lap of scanning. During this second
  1580.   // lap we do not allow any splits or merges.
  1581.   //-------------------------------------------------------------
  1582.           TreturnCode = 1;
  1583.           return TreturnCode;
  1584.         } else if (TscanPtr.p->scanBucketState ==  ScanRec::SCAN_COMPLETED) {
  1585.           jam();
  1586.   //-------------------------------------------------------------
  1587.   // The scan is completed and we can thus go ahead and perform
  1588.   // the split.
  1589.   //-------------------------------------------------------------
  1590.         } else {
  1591.           jam();
  1592.           sendSystemerror(signal);
  1593.           return TreturnCode;
  1594.         }//if
  1595.       }//if
  1596.     }//if
  1597.   }//for
  1598.   if (TreleaseInd == 1) {
  1599.     jam();
  1600.     TreleaseScanBucket = TmergeSource;
  1601.     TDirRangePtr.i = fragrecptr.p->directory;
  1602.     TPageIndex = TreleaseScanBucket & ((1 << fragrecptr.p->k) - 1); /* PAGE INDEX OBS K = 6 */
  1603.     TDirInd = TreleaseScanBucket >> fragrecptr.p->k; /* DIRECTORY INDEX OBS K = 6 */
  1604.     ptrCheckGuard(TDirRangePtr, cdirrangesize, dirRange);
  1605.     arrGuard((TDirInd >> 8), 256);
  1606.     TDirptr.i = TDirRangePtr.p->dirArray[TDirInd >> 8];
  1607.     ptrCheckGuard(TDirptr, cdirarraysize, directoryarray);
  1608.     TPageptr.i = TDirptr.p->pagep[TDirInd & 0xff];
  1609.     ptrCheckGuard(TPageptr, cpagesize, page8);
  1610.     for (Ti = 0; Ti < 4; Ti++) {
  1611.       if (TreleaseScanIndicator[Ti] == 1) {
  1612.         jam();
  1613.         scanPtr.i = Trootfragrecptr.p->scan[Ti];
  1614.         ptrCheckGuard(scanPtr, cscanRecSize, scanRec);
  1615.         rsbPageidptr.i = TPageptr.i;
  1616.         rsbPageidptr.p = TPageptr.p;
  1617.         trsbPageindex = TPageIndex;
  1618.         releaseScanBucket(signal);
  1619.         if (TmergeDest < scanPtr.p->minBucketIndexToRescan) {
  1620.           jam();
  1621.   //-------------------------------------------------------------
  1622.   // We have to keep track of the starting bucket to Rescan in the
  1623.   // second lap.
  1624.   //-------------------------------------------------------------
  1625.           scanPtr.p->minBucketIndexToRescan = TmergeDest;
  1626.         }//if
  1627.         if (TmergeDest > scanPtr.p->maxBucketIndexToRescan) {
  1628.           jam();
  1629.   //-------------------------------------------------------------
  1630.   // We have to keep track of the ending bucket to Rescan in the
  1631.   // second lap.
  1632.   //-------------------------------------------------------------
  1633.           scanPtr.p->maxBucketIndexToRescan = TmergeDest;
  1634.         }//if
  1635.       }//if
  1636.     }//for
  1637.   }//if
  1638.   return TreturnCode;
  1639. }//Dbacc::checkScanShrink()
  1640. void Dbacc::execSHRINKCHECK2(Signal* signal) 
  1641. {
  1642.   Uint32 tshrTmp1;
  1643.   jamEntry();
  1644.   fragrecptr.i = signal->theData[0];
  1645.   Uint32 oldFlag = signal->theData[3];
  1646.   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
  1647.   fragrecptr.p->expandFlag = oldFlag;
  1648.   tresult = 0; /* 0= FALSE,1= TRUE,> ZLIMIT_OF_ERROR =ERRORCODE */
  1649.   if (fragrecptr.p->slack <= fragrecptr.p->slackCheck) {
  1650.     jam();
  1651.     /* TIME FOR JOIN BUCKETS PROCESS */
  1652.     /*--------------------------------------------------------------*/
  1653.     /*       NO LONGER NECESSARY TO SHRINK THE FRAGMENT.            */
  1654.     /*--------------------------------------------------------------*/
  1655.     return;
  1656.   }//if
  1657.   if (fragrecptr.p->slack > (1u << 31)) {
  1658.     jam();
  1659.     /*--------------------------------------------------------------*/
  1660.     /* THE SLACK IS NEGATIVE, IN THIS CASE WE WILL NOT NEED ANY     */
  1661.     /* SHRINK.                                                      */
  1662.     /*--------------------------------------------------------------*/
  1663.     return;
  1664.   }//if
  1665.   texpDirInd = (fragrecptr.p->maxp + fragrecptr.p->p) >> fragrecptr.p->k;
  1666.   if (((fragrecptr.p->maxp + fragrecptr.p->p) & ((1 << fragrecptr.p->k) - 1)) == 0) {
  1667.     if (fragrecptr.p->createLcp == ZTRUE) {
  1668.       if (fragrecptr.p->fragState == LCP_SEND_PAGES) {
  1669.         if (fragrecptr.p->lcpMaxDirIndex > texpDirInd) {
  1670.           if (fragrecptr.p->lcpDirIndex <= texpDirInd) {
  1671.             jam();
  1672.     /*--------------------------------------------------------------*/
  1673.     /*       WE DO NOT ALLOW ANY SHRINKS THAT REMOVE PAGES THAT ARE */
  1674.     /*       NEEDED AS PART OF THE LOCAL CHECKPOINT.                */
  1675.     /*--------------------------------------------------------------*/
  1676.             return;
  1677.           }//if
  1678.         }//if
  1679.       }//if
  1680.     }//if
  1681.   }//if
  1682.   if (fragrecptr.p->firstOverflowRec == RNIL) {
  1683.     jam();
  1684.     allocOverflowPage(signal);
  1685.     if (tresult > ZLIMIT_OF_ERROR) {
  1686.       jam();
  1687.       return;
  1688.     }//if
  1689.   }//if
  1690.   if (cfirstfreepage == RNIL) {
  1691.     if (cfreepage >= cpagesize) {
  1692.       jam();
  1693.       /*--------------------------------------------------------------*/
  1694.       /* WE HAVE TO STOP THE SHRINK PROCESS SINCE THERE ARE NO FREE   */
  1695.       /* PAGES. THIS MEANS THAT WE COULD BE FORCED TO CRASH SINCE WE  */
  1696.       /* CANNOT COMPLETE THE SHRINK. TO AVOID THE CRASH WE EXIT HERE. */
  1697.       /*--------------------------------------------------------------*/
  1698.       return;
  1699.     }//if
  1700.   }//if
  1701.   if (checkScanShrink(signal) == 1) {
  1702.     jam();
  1703.     /*--------------------------------------------------------------*/
  1704.     // A scan state was inconsistent with performing a shrink
  1705.     // operation.
  1706.     /*--------------------------------------------------------------*/
  1707.     return;
  1708.   }//if
  1709.   if (fragrecptr.p->createLcp == ZTRUE) {
  1710.     if (remainingUndoPages() < ZMIN_UNDO_PAGES_AT_EXPAND) {
  1711.       jam();
  1712.       /*--------------------------------------------------------------*/
  1713.       // We did not have enough undo log buffers to start up an
  1714.       // shrink operation
  1715.       /*--------------------------------------------------------------*/
  1716.       return;
  1717.     }//if
  1718.   }//if
  1719.   if (fragrecptr.p->p == 0) {
  1720.     jam();
  1721.     fragrecptr.p->maxp = fragrecptr.p->maxp >> 1;
  1722.     fragrecptr.p->p = fragrecptr.p->maxp;
  1723.     fragrecptr.p->lhdirbits--;
  1724.     fragrecptr.p->hashcheckbit--;
  1725.   } else {
  1726.     jam();
  1727.     fragrecptr.p->p--;
  1728.   }//if
  1729.   /*--------------------------------------------------------------------------*/
  1730.   /*       WE START BY FINDING THE NECESSARY INFORMATION OF THE BUCKET TO BE  */
  1731.   /*       REMOVED WHICH WILL SEND ITS ELEMENTS TO THE RECEIVING BUCKET.      */
  1732.   /*--------------------------------------------------------------------------*/
  1733.   expDirRangePtr.i = fragrecptr.p->directory;
  1734.   cexcPageindex = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) & ((1 << fragrecptr.p->k) - 1);
  1735.   texpDirInd = ((fragrecptr.p->maxp + fragrecptr.p->p) + 1) >> fragrecptr.p->k;
  1736.   texpDirRangeIndex = texpDirInd >> 8;
  1737.   texpDirPageIndex = texpDirInd & 0xff;
  1738.   ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
  1739.   arrGuard(texpDirRangeIndex, 256);
  1740.   expDirptr.i = expDirRangePtr.p->dirArray[texpDirRangeIndex];
  1741.   ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
  1742.   excPageptr.i = expDirptr.p->pagep[texpDirPageIndex];
  1743.   fragrecptr.p->expSenderDirptr = expDirptr.i;
  1744.   fragrecptr.p->expSenderIndex = cexcPageindex;
  1745.   fragrecptr.p->expSenderPageptr = excPageptr.i;
  1746.   fragrecptr.p->expSenderDirIndex = texpDirInd;
  1747.   /*--------------------------------------------------------------------------*/
  1748.   /*       WE NOW PROCEED BY FINDING THE NECESSARY INFORMATION ABOUT THE      */
  1749.   /*       RECEIVING BUCKET.                                                  */
  1750.   /*--------------------------------------------------------------------------*/
  1751.   expDirRangePtr.i = fragrecptr.p->directory;
  1752.   texpReceivedBucket = fragrecptr.p->p >> fragrecptr.p->k;
  1753.   ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
  1754.   arrGuard((texpReceivedBucket >> 8), 256);
  1755.   expDirptr.i = expDirRangePtr.p->dirArray[texpReceivedBucket >> 8];
  1756.   ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
  1757.   fragrecptr.p->expReceivePageptr = expDirptr.p->pagep[texpReceivedBucket & 0xff];
  1758.   fragrecptr.p->expReceiveIndex = fragrecptr.p->p & ((1 << fragrecptr.p->k) - 1);
  1759.   fragrecptr.p->expReceiveForward = ZTRUE;
  1760.   if (excPageptr.i == RNIL) {
  1761.     jam();
  1762.     endofshrinkbucketLab(signal); /* EMPTY BUCKET */
  1763.     return;
  1764.   }//if
  1765.   /*--------------------------------------------------------------------------*/
  1766.   /*       INITIALISE THE VARIABLES FOR THE SHRINK PROCESS.                   */
  1767.   /*--------------------------------------------------------------------------*/
  1768.   ptrCheckGuard(excPageptr, cpagesize, page8);
  1769.   cexcForward = ZTRUE;
  1770.   cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
  1771.   cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
  1772.   arrGuard(cexcContainerptr, 2048);
  1773.   cexcContainerhead = excPageptr.p->word32[cexcContainerptr];
  1774.   cexcContainerlen = cexcContainerhead >> 26;
  1775.   if (cexcContainerlen <= ZCON_HEAD_SIZE) {
  1776.     ndbrequire(cexcContainerlen == ZCON_HEAD_SIZE);
  1777.   } else {
  1778.     jam();
  1779.     shrinkcontainer(signal);
  1780.   }//if
  1781.   /*--------------------------------------------------------------------------*/
  1782.   /*       THIS CONTAINER IS NOT YET EMPTY AND WE REMOVE ALL THE ELEMENTS.    */
  1783.   /*--------------------------------------------------------------------------*/
  1784.   if (((cexcContainerhead >> 10) & 1) == 1) {
  1785.     jam();
  1786.     rlPageptr = excPageptr;
  1787.     trlPageindex = cexcPageindex;
  1788.     trlRelCon = ZFALSE;
  1789.     turlIndex = cexcContainerptr + (ZBUF_SIZE - ZCON_HEAD_SIZE);
  1790.     releaseRightlist(signal);
  1791.   }//if
  1792.   tshrTmp1 = ZCON_HEAD_SIZE;
  1793.   tshrTmp1 = tshrTmp1 << 26;
  1794.   if (fragrecptr.p->createLcp == ZTRUE) {
  1795.     jam();
  1796.     datapageptr.p = excPageptr.p;
  1797.     cundoinfolength = 1;
  1798.     cundoElemIndex = cexcContainerptr;
  1799.     undoWritingProcess(signal);
  1800.   }//if
  1801.   dbgWord32(excPageptr, cexcContainerptr, tshrTmp1);
  1802.   arrGuard(cexcContainerptr, 2048);
  1803.   excPageptr.p->word32[cexcContainerptr] = tshrTmp1;
  1804.   if (((cexcContainerhead >> 7) & 0x3) == 0) {
  1805.     jam();
  1806.     endofshrinkbucketLab(signal);
  1807.     return;
  1808.   }//if
  1809.   nextcontainerinfoExp(signal);
  1810.   do {
  1811.     cexcContainerptr = (cexcPageindex << ZSHIFT_PLUS) - (cexcPageindex << ZSHIFT_MINUS);
  1812.     if (cexcForward == ZTRUE) {
  1813.       jam();
  1814.       cexcContainerptr = cexcContainerptr + ZHEAD_SIZE;
  1815.     } else {
  1816.       jam();
  1817.       cexcContainerptr = ((cexcContainerptr + ZHEAD_SIZE) + ZBUF_SIZE) - ZCON_HEAD_SIZE;
  1818.     }//if
  1819.     arrGuard(cexcContainerptr, 2048);
  1820.     cexcContainerhead = excPageptr.p->word32[cexcContainerptr];
  1821.     cexcContainerlen = cexcContainerhead >> 26;
  1822.     ndbrequire(cexcContainerlen > ZCON_HEAD_SIZE);
  1823.     /*--------------------------------------------------------------------------*/
  1824.     /*       THIS CONTAINER IS NOT YET EMPTY AND WE REMOVE ALL THE ELEMENTS.    */
  1825.     /*--------------------------------------------------------------------------*/
  1826.     shrinkcontainer(signal);
  1827.     cexcPrevpageptr = excPageptr.i;
  1828.     cexcPrevpageindex = cexcPageindex;
  1829.     cexcPrevforward = cexcForward;
  1830.     if (((cexcContainerhead >> 7) & 0x3) != 0) {
  1831.       jam();
  1832.       /*--------------------------------------------------------------------------*/
  1833.       /*       WE MUST CALL THE NEXT CONTAINER INFO ROUTINE BEFORE WE RELEASE THE */
  1834.       /*       CONTAINER SINCE THE RELEASE WILL OVERWRITE THE NEXT POINTER.       */
  1835.       /*--------------------------------------------------------------------------*/
  1836.       nextcontainerinfoExp(signal);
  1837.     }//if
  1838.     rlPageptr.i = cexcPrevpageptr;
  1839.     ptrCheckGuard(rlPageptr, cpagesize, page8);
  1840.     trlPageindex = cexcPrevpageindex;
  1841.     if (cexcPrevforward == ZTRUE) {
  1842.       jam();
  1843.       if (((cexcContainerhead >> 10) & 1) == 1) {
  1844.         jam();
  1845.         trlRelCon = ZFALSE;
  1846.         turlIndex = cexcContainerptr + (ZBUF_SIZE - ZCON_HEAD_SIZE);
  1847.         releaseRightlist(signal);
  1848.       }//if
  1849.       trlRelCon = ZTRUE;
  1850.       tullIndex = cexcContainerptr;
  1851.       releaseLeftlist(signal);
  1852.     } else {
  1853.       jam();
  1854.       if (((cexcContainerhead >> 10) & 1) == 1) {
  1855.         jam();
  1856.         trlRelCon = ZFALSE;
  1857.         tullIndex = cexcContainerptr - (ZBUF_SIZE - ZCON_HEAD_SIZE);
  1858.         releaseLeftlist(signal);
  1859.       }//if
  1860.       trlRelCon = ZTRUE;
  1861.       turlIndex = cexcContainerptr;
  1862.       releaseRightlist(signal);
  1863.     }//if
  1864.   } while (((cexcContainerhead >> 7) & 0x3) != 0);
  1865.   endofshrinkbucketLab(signal);
  1866.   return;
  1867. }//Dbacc::execSHRINKCHECK2()
  1868. void Dbacc::endofshrinkbucketLab(Signal* signal) 
  1869. {
  1870.   fragrecptr.p->expandCounter--;
  1871.   fragrecptr.p->slack -= fragrecptr.p->maxloadfactor;
  1872.   if (fragrecptr.p->expSenderIndex == 0) {
  1873.     jam();
  1874.     fragrecptr.p->dirsize--;
  1875.     if (fragrecptr.p->expSenderPageptr != RNIL) {
  1876.       jam();
  1877.       rpPageptr.i = fragrecptr.p->expSenderPageptr;
  1878.       ptrCheckGuard(rpPageptr, cpagesize, page8);
  1879.       releasePage(signal);
  1880.       expDirptr.i = fragrecptr.p->expSenderDirptr;
  1881.       ptrCheckGuard(expDirptr, cdirarraysize, directoryarray);
  1882.       expDirptr.p->pagep[fragrecptr.p->expSenderDirIndex & 0xff] = RNIL;
  1883.     }//if
  1884.     if (((((fragrecptr.p->p + fragrecptr.p->maxp) + 1) >> fragrecptr.p->k) & 0xff) == 0) {
  1885.       jam();
  1886.       rdDirptr.i = fragrecptr.p->expSenderDirptr;
  1887.       releaseDirectory(signal);
  1888.       expDirRangePtr.i = fragrecptr.p->directory;
  1889.       ptrCheckGuard(expDirRangePtr, cdirrangesize, dirRange);
  1890.       arrGuard((fragrecptr.p->expSenderDirIndex >> 8), 256);
  1891.       expDirRangePtr.p->dirArray[fragrecptr.p->expSenderDirIndex >> 8] = RNIL;
  1892.     }//if
  1893.   }//if
  1894.   if (fragrecptr.p->slack < (1u << 31)) {
  1895.     jam();
  1896.     /*--------------------------------------------------------------*/
  1897.     /* THE SLACK IS POSITIVE, IN THIS CASE WE WILL CHECK WHETHER    */
  1898.     /* WE WILL CONTINUE PERFORM ANOTHER SHRINK.                     */
  1899.     /*--------------------------------------------------------------*/
  1900.     Uint32 noOfBuckets = (fragrecptr.p->maxp + 1) + fragrecptr.p->p;
  1901.     Uint32 Thysteresis = fragrecptr.p->maxloadfactor - fragrecptr.p->minloadfactor;
  1902.     fragrecptr.p->slackCheck = noOfBuckets * Thysteresis;
  1903.     if (fragrecptr.p->slack > Thysteresis) {
  1904.       /*--------------------------------------------------------------*/
  1905.       /*       IT IS STILL NECESSARY TO SHRINK THE FRAGMENT MORE. THIS*/
  1906.       /*       CAN HAPPEN WHEN A NUMBER OF SHRINKS GET REJECTED       */
  1907.       /*       DURING A LOCAL CHECKPOINT. WE START A NEW SHRINK       */
  1908.       /*       IMMEDIATELY FROM HERE WITHOUT WAITING FOR A COMMIT TO  */
  1909.       /*       START IT.                                              */
  1910.       /*--------------------------------------------------------------*/
  1911.       if (fragrecptr.p->expandCounter > 0) {
  1912.         jam();
  1913. /*--------------------------------------------------------------*/
  1914. /*       IT IS VERY IMPORTANT TO NOT TRY TO SHRINK MORE THAN    */
  1915. /*       WAS EXPANDED. IF MAXP IS SET TO A VALUE BELOW 63 THEN  */
  1916. /*       WE WILL LOSE RECORDS SINCE GETDIRINDEX CANNOT HANDLE   */
  1917. /*       SHRINKING BELOW 2^K - 1 (NOW 63). THIS WAS A BUG THAT  */
  1918. /*       WAS REMOVED 2000-05-12.                                */
  1919. /*--------------------------------------------------------------*/
  1920.         signal->theData[0] = fragrecptr.i;
  1921.         signal->theData[1] = fragrecptr.p->p;
  1922.         signal->theData[2] = fragrecptr.p->maxp;
  1923.         signal->theData[3] = fragrecptr.p->expandFlag;
  1924. ndbrequire(fragrecptr.p->expandFlag < 2);
  1925.         fragrecptr.p->expandFlag = 2;
  1926.         sendSignal(cownBlockref, GSN_SHRINKCHECK2, signal, 4, JBB);
  1927.       }//if
  1928.     }//if
  1929.   }//if
  1930.   ndbrequire(fragrecptr.p->maxp >= (Uint32)((1 << fragrecptr.p->k) - 1));
  1931.   return;
  1932. }//Dbacc::endofshrinkbucketLab()
  1933. /* --------------------------------------------------------------------------------- */
  1934. /* SHRINKCONTAINER                                                                   */
  1935. /*        INPUT: EXC_PAGEPTR (POINTER TO THE ACTIVE PAGE RECORD)                     */
  1936. /*               CEXC_CONTAINERLEN (LENGTH OF THE CONTAINER).                        */
  1937. /*               CEXC_CONTAINERPTR (ARRAY INDEX OF THE CONTAINER).                   */
  1938. /*               CEXC_FORWARD (CONTAINER FORWARD (+1) OR BACKWARD (-1))              */
  1939. /*                                                                                   */
  1940. /*        DESCRIPTION: ALL ELEMENTS OF THE ACTIVE CONTAINER HAVE TO MOVE TO THE NEW  */
  1941. /*                  CONTAINER.                                                       */
  1942. /* --------------------------------------------------------------------------------- */
  1943. void Dbacc::shrinkcontainer(Signal* signal) 
  1944. {
  1945.   Uint32 tshrElementptr;
  1946.   Uint32 tshrRemLen;
  1947.   Uint32 tshrInc;
  1948.   Uint32 tshrKeyLen;
  1949.   Uint32 tshrTmp;
  1950.   Uint32 tshrIndex;
  1951.   Uint32 guard21;
  1952.   tshrRemLen = cexcContainerlen - ZCON_HEAD_SIZE;
  1953.   tshrKeyLen = fragrecptr.p->keyLength;
  1954.   if (tshrKeyLen == 0) {
  1955.     jam();
  1956.     tshrKeyLen = ZACTIVE_LONG_KEY_LEN;
  1957.   }//if
  1958.   tshrInc = (ZELEM_HEAD_SIZE + tshrKeyLen) + fragrecptr.p->localkeylen;
  1959.   if (cexcForward == ZTRUE) {
  1960.     jam();
  1961.     tshrElementptr = cexcContainerptr + ZCON_HEAD_SIZE;
  1962.   } else {
  1963.     jam();
  1964.     tshrElementptr = cexcContainerptr - 1;
  1965.   }//if
  1966.  SHR_LOOP:
  1967.   idrOperationRecPtr.i = RNIL;
  1968.   ptrNull(idrOperationRecPtr);
  1969.   /* --------------------------------------------------------------------------------- */
  1970.   /*       THE CODE BELOW IS ALL USED TO PREPARE FOR THE CALL TO INSERT_ELEMENT AND    */
  1971.   /*       HANDLE THE RESULT FROM INSERT_ELEMENT. INSERT_ELEMENT INSERTS THE ELEMENT   */
  1972.   /*       INTO ANOTHER BUCKET.                                                        */
  1973.   /* --------------------------------------------------------------------------------- */
  1974.   arrGuard(tshrElementptr, 2048);
  1975.   tidrElemhead = excPageptr.p->word32[tshrElementptr];
  1976.   if (ElementHeader::getLocked(tidrElemhead)) {
  1977.     jam();
  1978.     /* --------------------------------------------------------------------------------- */
  1979.     /*       IF THE ELEMENT IS LOCKED WE MUST UPDATE THE ELEMENT INFO IN THE OPERATION   */
  1980.     /*       RECORD OWNING THE LOCK. WE DO THIS BY READING THE OPERATION RECORD POINTER  */
  1981.     /*       FROM THE ELEMENT HEADER.                                                    */
  1982.     /* --------------------------------------------------------------------------------- */
  1983.     idrOperationRecPtr.i = ElementHeader::getOpPtrI(tidrElemhead);
  1984.     ptrCheckGuard(idrOperationRecPtr, coprecsize, operationrec);
  1985.     if (fragrecptr.p->createLcp == ZTRUE) {
  1986.       jam();
  1987.       /* --------------------------------------------------------------------------------- */
  1988.       // During local checkpoints we must ensure that we restore the element header in
  1989.       // unlocked state and with the hash value part there with tuple status zeroed.
  1990.       // Otherwise a later insert over the same element will write an UNDO log that will
  1991.       // ensure that the now removed element is restored together with its locked element
  1992.       // header and without the hash value part.
  1993.       /* --------------------------------------------------------------------------------- */
  1994.       const Uint32 hv = idrOperationRecPtr.p->hashvaluePart;
  1995.       const Uint32 eh = ElementHeader::setUnlocked(hv, 0);
  1996.       excPageptr.p->word32[tshrElementptr] = eh;    
  1997.     }//if
  1998.   }//if
  1999.   tshrTmp = tshrElementptr + cexcForward;
  2000.   guard21 = fragrecptr.p->localkeylen - 1;
  2001.   for (tshrIndex = 0; tshrIndex <= guard21; tshrIndex++) {
  2002.     arrGuard(tshrIndex, 2);
  2003.     arrGuard(tshrTmp, 2048);
  2004.     clocalkey[tshrIndex] = excPageptr.p->word32[tshrTmp];
  2005.     tshrTmp = tshrTmp + cexcForward;
  2006.   }//for
  2007.   guard21 = tshrKeyLen - 1;
  2008.   for (tshrIndex = 0; tshrIndex <= guard21; tshrIndex++) {
  2009.     arrGuard(tshrIndex, 2048);
  2010.     arrGuard(tshrTmp, 2048);
  2011.     ckeys[tshrIndex] = excPageptr.p->word32[tshrTmp];
  2012.     tshrTmp = tshrTmp + cexcForward;
  2013.   }//for
  2014.   tidrPageindex = fragrecptr.p->expReceiveIndex;
  2015.   idrPageptr.i = fragrecptr.p->expReceivePageptr;
  2016.   ptrCheckGuard(idrPageptr, cpagesize, page8);
  2017.   tidrForward = fragrecptr.p->expReceiveForward;
  2018.   tidrKeyLen = tshrKeyLen;
  2019.   insertElement(signal);
  2020.   /* --------------------------------------------------------------------------------- */
  2021.   /*       TAKE CARE OF RESULT FROM INSERT_ELEMENT.                                    */
  2022.   /* --------------------------------------------------------------------------------- */
  2023.   fragrecptr.p->expReceiveIndex = tidrPageindex;
  2024.   fragrecptr.p->expReceivePageptr = idrPageptr.i;
  2025.   fragrecptr.p->expReceiveForward = tidrForward;
  2026.   if (tshrRemLen < tshrInc) {
  2027.     jam();
  2028.     sendSystemerror(signal);
  2029.   }//if
  2030.   tshrRemLen = tshrRemLen - tshrInc;
  2031.   if (tshrRemLen != 0) {
  2032.     jam();
  2033.     tshrElementptr = tshrTmp;
  2034.     goto SHR_LOOP;
  2035.   }//if
  2036. }//Dbacc::shrinkcontainer()
  2037. /* --------------------------------------------------------------------------------- */
  2038. /* NEXTCONTAINERINFO_EXP                                                             */
  2039. /*        DESCRIPTION:THE CONTAINER HEAD WILL BE CHECKED TO CALCULATE INFORMATION    */
  2040. /*                    ABOUT NEXT CONTAINER IN THE BUCKET.                            */
  2041. /*          INPUT:       CEXC_CONTAINERHEAD                                          */
  2042. /*                       CEXC_CONTAINERPTR                                           */
  2043. /*                       EXC_PAGEPTR                                                 */
  2044. /*          OUTPUT:                                                                  */
  2045. /*             CEXC_PAGEINDEX (INDEX FROM WHICH PAGE INDEX CAN BE CALCULATED.        */
  2046. /*             EXC_PAGEPTR (PAGE REFERENCE OF NEXT CONTAINER)                        */
  2047. /*             CEXC_FORWARD                                                          */
  2048. /* --------------------------------------------------------------------------------- */
  2049. void Dbacc::nextcontainerinfoExp(Signal* signal) 
  2050. {
  2051.   tnciNextSamePage = (cexcContainerhead >> 9) & 0x1; /* CHECK BIT FOR CHECKING WHERE */
  2052.   /* THE NEXT CONTAINER IS IN THE SAME PAGE */
  2053.   cexcPageindex = cexcContainerhead & 0x7f; /* NEXT CONTAINER PAGE INDEX 7 BITS */
  2054.   if (((cexcContainerhead >> 7) & 3) == ZLEFT) {
  2055.     jam();
  2056.     cexcForward = ZTRUE;
  2057.   } else if (((cexcContainerhead >> 7) & 3) == ZRIGHT) {
  2058.     jam();
  2059.     cexcForward = cminusOne;
  2060.   } else {
  2061.     jam();
  2062.     sendSystemerror(signal);
  2063.     cexcForward = 0; /* DUMMY FOR COMPILER */
  2064.   }//if
  2065.   if (tnciNextSamePage == ZFALSE) {
  2066.     jam();
  2067.     /* NEXT CONTAINER IS IN AN OVERFLOW PAGE */
  2068.     arrGuard(cexcContainerptr + 1, 2048);
  2069.     tnciTmp = excPageptr.p->word32[cexcContainerptr + 1];
  2070.     nciOverflowrangeptr.i = fragrecptr.p->overflowdir;
  2071.     ptrCheckGuard(nciOverflowrangeptr, cdirrangesize, dirRange);
  2072.     arrGuard((tnciTmp >> 8), 256);
  2073.     nciOverflowDirptr.i = nciOverflowrangeptr.p->dirArray[tnciTmp >> 8];
  2074.     ptrCheckGuard(nciOverflowDirptr, cdirarraysize, directoryarray);
  2075.     excPageptr.i = nciOverflowDirptr.p->pagep[tnciTmp & 0xff];
  2076.     ptrCheckGuard(excPageptr, cpagesize, page8);
  2077.   }//if
  2078. }//Dbacc::nextcontainerinfoExp()
  2079. /* --------------------------------------------------------------------------------- */
  2080. /* --------------------------------------------------------------------------------- */
  2081. /* --------------------------------------------------------------------------------- */
  2082. /*                                                                                   */
  2083. /*       END OF EXPAND/SHRINK MODULE                                                 */
  2084. /*                                                                                   */
  2085. /* --------------------------------------------------------------------------------- */
  2086. /* --------------------------------------------------------------------------------- */
  2087. /* --------------------------------------------------------------------------------- */
  2088. /* --------------------------------------------------------------------------------- */
  2089. /* --------------------------------------------------------------------------------- */
  2090. /*                                                                                   */
  2091. /*       LOCAL CHECKPOINT MODULE                                                     */
  2092. /*                                                                                   */
  2093. /* --------------------------------------------------------------------------------- */
  2094. /* --------------------------------------------------------------------------------- */
  2095. /* ******************--------------------------------------------------------------- */
  2096. /* LCP_FRAGIDREQ                                                                     */
  2097. /*                                                     SENDER: LQH,    LEVEL B       */
  2098. /*          ENTER  LCP_FRAGIDREQ WITH                                                */
  2099. /*                    TUSERPTR                       LQH CONNECTION PTR              */
  2100. /*                    TUSERBLOCKREF,                 LQH BLOCK REFERENCE             */
  2101. /*                    TCHECKPOINTID,                 THE CHECKPOINT NUMBER TO USE    */
  2102. /*                                                     (E.G. 1,2 OR 3)               */
  2103. /*                    TABPTR,                        TABLE ID = TABLE RECORD POINTER */
  2104. /*                    TFID                           ROOT FRAGMENT ID                */
  2105. /*                    CACTIVE_UNDO_FILE_VERSION      UNDO FILE VERSION 0,1,2 OR 3.   */
  2106. /* ******************--------------------------------------------------------------- */
  2107. /* ******************--------------------------------------------------------------- */
  2108. /* LCP_FRAGIDREQ                           REQUEST FOR LIST OF STOPED OPERATION  */
  2109. /* ******************------------------------------+                                 */
  2110. /*   SENDER: LQH,    LEVEL B       */
  2111. void Dbacc::execLCP_FRAGIDREQ(Signal* signal) 
  2112. {
  2113.   jamEntry();
  2114.   tuserptr = signal->theData[0];       /* LQH CONNECTION PTR              */
  2115.   tuserblockref = signal->theData[1];  /* LQH BLOCK REFERENCE             */
  2116.   tcheckpointid = signal->theData[2];  /* THE CHECKPOINT NUMBER TO USE    */
  2117.                                        /*   (E.G. 1,2 OR 3)               */
  2118.   tabptr.i = signal->theData[3];       /* TABLE ID = TABLE RECORD POINTER */
  2119.   ptrCheck(tabptr, ctablesize, tabrec);
  2120.   tfid = signal->theData[4];           /* ROOT FRAGMENT ID                */
  2121.   cactiveUndoFileVersion = signal->theData[5]; /* UNDO FILE VERSION 0,1,2 OR 3.   */
  2122.   tresult = 0;
  2123.   ndbrequire(getrootfragmentrec(signal, rootfragrecptr, tfid));
  2124.   ndbrequire(rootfragrecptr.p->rootState == ACTIVEROOT);
  2125.   seizeLcpConnectRec(signal);
  2126.   initLcpConnRec(signal);
  2127.   lcpConnectptr.p->rootrecptr = rootfragrecptr.i;
  2128.   rootfragrecptr.p->lcpPtr = lcpConnectptr.i;
  2129.   lcpConnectptr.p->localCheckPid = tcheckpointid;
  2130.   lcpConnectptr.p->lcpstate = LCP_ACTIVE;
  2131.   rootfragrecptr.p->rootState = LCP_CREATION;
  2132.   fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
  2133.   /* D6 AT FSOPENREQ =#010003FF. */
  2134.   tlfrTmp1 = 0x010003ff;      /*  FILE TYPE = .DATA ,VERSION OF FILENAME = 1 */
  2135.   tlfrTmp2 = 0x301;       /* D7 CREATE, WRITE ONLY, TRUNCATE TO ZERO */
  2136.   ndbrequire(cfsFirstfreeconnect != RNIL);
  2137.   seizeFsConnectRec(signal);
  2138.   fsConnectptr.p->fragrecPtr = fragrecptr.i;
  2139.   fsConnectptr.p->fsState = WAIT_OPEN_DATA_FILE_FOR_WRITE;
  2140.   /* ----------- FILENAME  (FILESYSTEM)/D3/DBACC/"T"TABID/"F"FRAGID/"S"VERSIONID.DATA ------------ */
  2141.   /* ************************ */
  2142.   /* FSOPENREQ                */
  2143.   /* ************************ */
  2144.   signal->theData[0] = cownBlockref;
  2145.   signal->theData[1] = fsConnectptr.i;
  2146.   signal->theData[2] = tabptr.i;                        /* TABLE IDENTITY */
  2147.   signal->theData[3] = rootfragrecptr.p->fragmentid[0]; /* FRAGMENT IDENTITY */
  2148.   signal->theData[4] = lcpConnectptr.p->localCheckPid;  /* CHECKPOINT ID */
  2149.   signal->theData[5] = tlfrTmp1;
  2150.   signal->theData[6] = tlfrTmp2;
  2151.   sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
  2152.   return;
  2153. }//Dbacc::execLCP_FRAGIDREQ()
  2154. /* ******************--------------------------------------------------------------- */
  2155. /* FSOPENCONF                                         OPENFILE CONF                  */
  2156. /*                                                     SENDER: FS,     LEVEL B       */
  2157. /*          ENTER FSOPENCONF WITH                                                    */
  2158. /*                    FS_CONNECTPTR,                 FS_CONNECTION PTR               */
  2159. /*                    TUSERPOINTER,                  FILE POINTER                    */
  2160. /* ******************--------------------------------------------------------------- */
  2161. void Dbacc::lcpFsOpenConfLab(Signal* signal) 
  2162. {
  2163.   fsConnectptr.p->fsPtr = tuserptr;
  2164.   fragrecptr.i = fsConnectptr.p->fragrecPtr;
  2165.   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
  2166.   rootfragrecptr.i = fragrecptr.p->myroot;
  2167.   fragrecptr.p->activeDataFilePage = 1; /* ZERO IS KEPT FOR PAGE_ZERO */
  2168.   fragrecptr.p->fsConnPtr = fsConnectptr.i;
  2169.   ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
  2170.   lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
  2171.   ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
  2172.   if (rootfragrecptr.p->fragmentptr[0] == fragrecptr.i) {
  2173.     jam();
  2174.     fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
  2175.     ptrCheck(fragrecptr, cfragmentsize, fragmentrec);
  2176.     /* ----------- FILENAME  (FILESYSTEM)/D3/DBACC/"T"TABID/"F"FRAGID/"S"VERSIONID.DATA ------------ */
  2177.     /* D6 AT FSOPENREQ =#010003FF. */
  2178.     tlfrTmp1 = 0x010003ff; /*  FILE TYPE = .DATA ,VERSION OF FILENAME = 1 */
  2179.     tlfrTmp2 = 0x301;         /* D7 CREATE, WRITE ONLY, TRUNCATE TO ZERO */
  2180.     ndbrequire(cfsFirstfreeconnect != RNIL);
  2181.     seizeFsConnectRec(signal);
  2182.     fsConnectptr.p->fragrecPtr = fragrecptr.i;
  2183.     fsConnectptr.p->fsState = WAIT_OPEN_DATA_FILE_FOR_WRITE;
  2184.     /* ************************ */
  2185.     /* FSOPENREQ                */
  2186.     /* ************************ */
  2187.     signal->theData[0] = cownBlockref;
  2188.     signal->theData[1] = fsConnectptr.i;
  2189.     signal->theData[2] = rootfragrecptr.p->mytabptr;        /* TABLE IDENTITY */
  2190.     signal->theData[3] = rootfragrecptr.p->fragmentid[1];   /* FRAGMENT IDENTITY */
  2191.     signal->theData[4] = lcpConnectptr.p->localCheckPid;    /* CHECKPOINT ID */
  2192.     signal->theData[5] = tlfrTmp1;
  2193.     signal->theData[6] = tlfrTmp2;
  2194.     sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
  2195.     return;
  2196.   } else {
  2197.     ndbrequire(rootfragrecptr.p->fragmentptr[1] == fragrecptr.i);
  2198.   }//if
  2199.   /*---- BOTH DATA FILES ARE OPEN------*/
  2200.   /* ----IF THE UNDO FILE IS CLOSED , OPEN IT.-----  */
  2201.   if (cactiveOpenUndoFsPtr != RNIL) {
  2202.     jam();
  2203.     sendLcpFragidconfLab(signal);
  2204.     return;
  2205.   }//if
  2206.   cactiveUndoFilePage = 0;
  2207.   cprevUndoaddress = cminusOne;
  2208.   cundoposition = 0;
  2209.   clastUndoPageIdWritten = 0;
  2210.   ndbrequire(cfsFirstfreeconnect != RNIL);
  2211.   seizeFsConnectRec(signal);
  2212.   fsConnectptr.p->fsState = WAIT_OPEN_UNDO_LCP;
  2213.   fsConnectptr.p->fsPart = 0;                         /* FILE INDEX, SECOND FILE IN THE DIRECTORY  */
  2214.   cactiveOpenUndoFsPtr = fsConnectptr.i;
  2215.   cactiveRootfrag = rootfragrecptr.i;
  2216.   tlfrTmp1 = 1;                                         /* FILE VERSION */
  2217.   tlfrTmp1 = (tlfrTmp1 << 8) + ZLOCALLOGFILE;         /* .LOCLOG = 2 */
  2218.   tlfrTmp1 = (tlfrTmp1 << 8) + 4;                 /* ROOT DIRECTORY = D4 */
  2219.   tlfrTmp1 = (tlfrTmp1 << 8) + fsConnectptr.p->fsPart; /*        P2  */
  2220.   tlfrTmp2 = 0x302;                                 /* D7 CREATE , READ / WRITE , TRUNCATE TO ZERO */
  2221.   /* ---FILE NAME "D4"/"DBACC"/LCP_CONNECTPTR:LOCAL_CHECK_PID/FS_CONNECTPTR:FS_PART".LOCLOG-- */
  2222.   /* ************************ */
  2223.   /* FSOPENREQ                */
  2224.   /* ************************ */
  2225.   signal->theData[0] = cownBlockref;
  2226.   signal->theData[1] = fsConnectptr.i;
  2227.   signal->theData[2] = cminusOne;         /* #FFFFFFFF */
  2228.   signal->theData[3] = cminusOne;         /* #FFFFFFFF */
  2229.   signal->theData[4] = cactiveUndoFileVersion;
  2230.   /* A GROUP OF UNDO FILES WHICH ARE UPDATED */
  2231.   signal->theData[5] = tlfrTmp1;
  2232.   signal->theData[6] = tlfrTmp2;
  2233.   sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, 7, JBA);
  2234.   return;
  2235. }//Dbacc::lcpFsOpenConfLab()
  2236. void Dbacc::lcpOpenUndofileConfLab(Signal* signal) 
  2237. {
  2238.   ptrGuard(fsConnectptr);
  2239.   fsConnectptr.p->fsState = WAIT_NOTHING;
  2240.   rootfragrecptr.i = cactiveRootfrag;
  2241.   ptrCheck(rootfragrecptr, crootfragmentsize, rootfragmentrec);
  2242.   fsConnectptr.p->fsPtr = tuserptr;
  2243.   sendLcpFragidconfLab(signal);
  2244.   return;
  2245. }//Dbacc::lcpOpenUndofileConfLab()
  2246. void Dbacc::sendLcpFragidconfLab(Signal* signal) 
  2247. {
  2248.   ptrGuard(rootfragrecptr);
  2249.   lcpConnectptr.i = rootfragrecptr.p->lcpPtr;
  2250.   ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
  2251.   /* ************************ */
  2252.   /* LCP_FRAGIDCONF           */
  2253.   /* ************************ */
  2254.   signal->theData[0] = lcpConnectptr.p->lcpUserptr;
  2255.   signal->theData[1] = lcpConnectptr.i;
  2256.   signal->theData[2] = 2;
  2257.   /* NO OF LOCAL FRAGMENTS */
  2258.   signal->theData[3] = rootfragrecptr.p->fragmentid[0];
  2259.   signal->theData[4] = rootfragrecptr.p->fragmentid[1];
  2260.   signal->theData[5] = RNIL;
  2261.   signal->theData[6] = RNIL;
  2262.   sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_LCP_FRAGIDCONF, signal, 7, JBB);
  2263.   return;
  2264. }//Dbacc::sendLcpFragidconfLab()
  2265. /* ******************--------------------------------------------------------------- */
  2266. /* LCP_HOLDOPERATION                           REQUEST FOR LIST OF STOPED OPERATION  */
  2267. /*                                                     SENDER: LQH,    LEVEL B       */
  2268. /*         ENTER  LCP_HOLDOPREQ WITH                                                 */
  2269. /*                    LCP_CONNECTPTR                CONNECTION POINTER               */
  2270. /*                    TFID,                         LOCAL FRAGMENT ID                */
  2271. /*                    THOLD_PREV_SENT_OP            NR OF SENT OPERATIONS AT         */
  2272. /*                                                  PREVIOUS SIGNALS                 */
  2273. /*                    TLQH_POINTER                  LQH USER POINTER                 */
  2274. /* ******************--------------------------------------------------------------- */
  2275. /* ******************--------------------------------------------------------------- */
  2276. /* LCP_HOLDOPERATION                           REQUEST FOR LIST OF STOPED OPERATION  */
  2277. /* ******************------------------------------+                                 */
  2278. /*   SENDER: LQH,    LEVEL B       */
  2279. void Dbacc::execLCP_HOLDOPREQ(Signal* signal) 
  2280. {
  2281.   Uint32 tholdPrevSentOp;
  2282.   jamEntry();
  2283.   lcpConnectptr.i = signal->theData[0];    /* CONNECTION POINTER              */
  2284.   tfid = signal->theData[1];               /* LOCAL FRAGMENT ID               */
  2285.   tholdPrevSentOp = signal->theData[2];    /* NR OF SENT OPERATIONS AT        */
  2286.                                            /* PREVIOUS SIGNALS                */
  2287.   tlqhPointer = signal->theData[3];        /* LQH USER POINTER                */
  2288.   tresult = 0;
  2289.   ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
  2290.   ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
  2291.   rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
  2292.   ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
  2293.   if (rootfragrecptr.p->fragmentid[0] == tfid) {
  2294.     jam();
  2295.     fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
  2296.     ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
  2297.   } else {
  2298.     ndbrequire(rootfragrecptr.p->fragmentid[1] == tfid);
  2299.     jam();
  2300.     fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
  2301.   }//if
  2302.   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
  2303.   fragrecptr.p->lcpLqhPtr = tlqhPointer;
  2304.   if (tholdPrevSentOp != 0) {
  2305.     ndbrequire(fragrecptr.p->fragState == SEND_QUE_OP);
  2306.   } else if (tholdPrevSentOp == 0) {
  2307.     jam();
  2308.     fragrecptr.p->fragState = SEND_QUE_OP;
  2309.     fragrecptr.p->stopQueOp = ZTRUE;
  2310.     fragrecptr.p->sentWaitInQueOp = fragrecptr.p->firstWaitInQueOp;
  2311.   }//if
  2312.   tholdSentOp = 0; /* NR OF OPERATION WHICH ARE SENT THIS TIME */
  2313.   operationRecPtr.i = fragrecptr.p->sentWaitInQueOp;
  2314.   /* --------------------------------------------- */
  2315.   /* GO THROUGH ALL OPERATION IN THE WAIT          */
  2316.   /* LIST AND SEND THE LQH CONNECTION  PTR OF THE  */
  2317.   /* OPERATIONS TO THE LQH BLOCK. MAX 23 0PERATION */
  2318.   /* PER SIGNAL                                    */
  2319.   /* --------------------------------------------- */
  2320.   while (operationRecPtr.i != RNIL) {
  2321.     jam();
  2322.     ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
  2323.     ckeys[tholdSentOp] = operationRecPtr.p->userptr;
  2324.     operationRecPtr.i = operationRecPtr.p->nextQueOp;
  2325.     tholdSentOp++;
  2326.     if ((tholdSentOp >= 23) &&
  2327.         (operationRecPtr.i != RNIL)) {
  2328.       jam();
  2329.       /* ----------------------------------------------- */
  2330.       /* THERE IS MORE THAN 23 WAIT OPERATION. WE        */
  2331.       /* HAVE TO SEND THESE 23 AND WAITE FOR NEXT SIGNAL */
  2332.       /* ----------------------------------------------- */
  2333.       tholdMore = ZTRUE; /* SECOUND DATA AT THE CONF SIGNAL , = MORE */
  2334.       fragrecptr.p->sentWaitInQueOp = operationRecPtr.i;
  2335.       sendholdconfsignalLab(signal);
  2336.       return;
  2337.     }//if
  2338.   }//while
  2339.   /* ----------------------------------------------- */
  2340.   /* OPERATION_REC_PTR = RNIL                        */
  2341.   /* THERE IS NO MORE WAITING OPERATION, STATE OF    */
  2342.   /* THE FRAGMENT RRECORD IS CHANGED AND RETURN      */
  2343.   /* SIGNAL IS SENT                                  */
  2344.   /* ----------------------------------------------- */
  2345.   fragrecptr.p->sentWaitInQueOp = RNIL;
  2346.   tholdMore = ZFALSE; /* SECOND DATA AT THE CONF SIGNAL , = NOT MORE */
  2347.   fragrecptr.p->fragState = WAIT_ACC_LCPREQ;
  2348.   sendholdconfsignalLab(signal);
  2349.   return;
  2350. }//Dbacc::execLCP_HOLDOPREQ()
  2351. void Dbacc::sendholdconfsignalLab(Signal* signal) 
  2352. {
  2353.   tholdMore = (tholdMore << 16) + tholdSentOp;
  2354.   /* SECOND SIGNAL DATA, LENGTH + MORE */
  2355.   /* ************************ */
  2356.   /* LCP_HOLDOPCONF           */
  2357.   /* ************************ */
  2358.   signal->theData[0] = fragrecptr.p->lcpLqhPtr;
  2359.   signal->theData[1] = tholdMore;
  2360.   signal->theData[2] = ckeys[0];
  2361.   signal->theData[3] = ckeys[1];
  2362.   signal->theData[4] = ckeys[2];
  2363.   signal->theData[5] = ckeys[3];
  2364.   signal->theData[6] = ckeys[4];
  2365.   signal->theData[7] = ckeys[5];
  2366.   signal->theData[8] = ckeys[6];
  2367.   signal->theData[9] = ckeys[7];
  2368.   signal->theData[10] = ckeys[8];
  2369.   signal->theData[11] = ckeys[9];
  2370.   signal->theData[12] = ckeys[10];
  2371.   signal->theData[13] = ckeys[11];
  2372.   signal->theData[14] = ckeys[12];
  2373.   signal->theData[15] = ckeys[13];
  2374.   signal->theData[16] = ckeys[14];
  2375.   signal->theData[17] = ckeys[15];
  2376.   signal->theData[18] = ckeys[16];
  2377.   signal->theData[19] = ckeys[17];
  2378.   signal->theData[20] = ckeys[18];
  2379.   signal->theData[21] = ckeys[19];
  2380.   signal->theData[22] = ckeys[20];
  2381.   signal->theData[23] = ckeys[21];
  2382.   signal->theData[24] = ckeys[22];
  2383.   sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_LCP_HOLDOPCONF, signal, 25, JBA);
  2384.   return;
  2385. }//Dbacc::sendholdconfsignalLab()
  2386. /**
  2387.  * execACC_LCPREQ                                        
  2388.  * Perform local checkpoint of a fragment
  2389.  *
  2390.  *  SENDER: LQH,    LEVEL B
  2391.  *  ENTER ACC_LCPREQ WITH 
  2392.  *  LCP_CONNECTPTR,                OPERATION RECORD PTR
  2393.  *  TLCP_LQH_CHECK_V,              LQH'S  LOCAL FRAG CHECK VALUE
  2394.  *  TLCP_LOCAL_FRAG_ID,            LOCAL FRAG ID
  2395.  *
  2396.  */
  2397. void Dbacc::execACC_LCPREQ(Signal* signal) 
  2398. {
  2399.   Uint32 tlcpLocalFragId;
  2400.   Uint32 tlcpLqhCheckV;
  2401.   jamEntry();
  2402.   lcpConnectptr.i = signal->theData[0];   // CONNECTION  PTR
  2403.   tlcpLqhCheckV = signal->theData[1];     // LQH'S  LOCAL FRAG CHECK VALUE
  2404.   tlcpLocalFragId = signal->theData[2];   // LOCAL FRAG ID
  2405.   tresult = 0;
  2406.   ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
  2407.   ndbrequire(lcpConnectptr.p->lcpstate == LCP_ACTIVE);
  2408.   rootfragrecptr.i = lcpConnectptr.p->rootrecptr;
  2409.   ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
  2410.   if (rootfragrecptr.p->fragmentid[0] == tlcpLocalFragId) {
  2411.     jam();
  2412.     fragrecptr.i = rootfragrecptr.p->fragmentptr[0];
  2413.   } else {
  2414.     ndbrequire(rootfragrecptr.p->fragmentid[1] == tlcpLocalFragId);
  2415.     jam();
  2416.     fragrecptr.i = rootfragrecptr.p->fragmentptr[1];
  2417.   }//if
  2418.   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
  2419.   ndbrequire(fragrecptr.p->fragState == WAIT_ACC_LCPREQ);
  2420.   fragrecptr.p->lcpLqhPtr = tlcpLqhCheckV;
  2421.   Page8Ptr zeroPagePtr;
  2422.   seizeLcpPage(zeroPagePtr);
  2423.   fragrecptr.p->zeroPagePtr = zeroPagePtr.i;
  2424.   fragrecptr.p->prevUndoposition = cminusOne;
  2425.   initRootFragPageZero(rootfragrecptr, zeroPagePtr);
  2426.   initFragPageZero(fragrecptr, zeroPagePtr);
  2427.   /*-----------------------------------------------------------------*/
  2428.   /*       SEIZE ZERO PAGE FIRST AND THEN SEIZE DATA PAGES IN        */
  2429.   /*       BACKWARDS ORDER. THIS IS TO ENSURE THAT WE GET THE PAGES  */
  2430.   /*       IN ORDER. ON WINDOWS NT THIS WILL BE A BENEFIT SINCE WE   */
  2431.   /*       CAN THEN DO 1 WRITE_FILE INSTEAD OF 8.                    */
  2432.   /*       WHEN WE RELEASE THE PAGES WE RELEASE THEM IN THE OPPOSITE */
  2433.   /*       ORDER.                                                    */
  2434.   /*-----------------------------------------------------------------*/
  2435.   for (Uint32 taspTmp = ZWRITEPAGESIZE - 1; (Uint32)~taspTmp; taspTmp--) {
  2436.     Page8Ptr dataPagePtr;
  2437.     jam();
  2438.     ndbrequire(fragrecptr.p->datapages[taspTmp] == RNIL);
  2439.     seizeLcpPage(dataPagePtr);
  2440.     fragrecptr.p->datapages[taspTmp] = dataPagePtr.i;
  2441.   }//for
  2442.   fragrecptr.p->lcpMaxDirIndex = fragrecptr.p->dirsize;
  2443.   fragrecptr.p->lcpMaxOverDirIndex = fragrecptr.p->lastOverIndex;
  2444.   fragrecptr.p->createLcp = ZTRUE;
  2445.   operationRecPtr.i = fragrecptr.p->lockOwnersList;
  2446.   lcp_write_op_to_undolog(signal);
  2447. }
  2448. void
  2449. Dbacc::lcp_write_op_to_undolog(Signal* signal)
  2450. {
  2451.   bool delay_continueb= false;
  2452.   Uint32 i, j;
  2453.   for (i= 0; i < 16; i++) {
  2454.     jam();
  2455.     if (remainingUndoPages() <= ZMIN_UNDO_PAGES_AT_COMMIT) {
  2456.       jam();
  2457.       delay_continueb= true;
  2458.       break;
  2459.     }
  2460.     for (j= 0; j < 32; j++) {
  2461.       if (operationRecPtr.i == RNIL) {
  2462.         jam();
  2463.         break;
  2464.       }
  2465.       jam();
  2466.       ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
  2467.       if ((operationRecPtr.p->operation == ZINSERT) ||
  2468.           (operationRecPtr.p->elementIsDisappeared == ZTRUE)){
  2469.       /*******************************************************************
  2470.        * Only log inserts and elements that are marked as dissapeared.
  2471.        * All other operations update the element header and that is handled
  2472.        * when pages are written to disk
  2473.        ********************************************************************/
  2474.         undopageptr.i = (cundoposition>>ZUNDOPAGEINDEXBITS) & (cundopagesize-1);
  2475.         ptrAss(undopageptr, undopage);
  2476.         theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
  2477.         tundoindex = theadundoindex + ZUNDOHEADSIZE;
  2478.         writeUndoOpInfo(signal);/* THE INFORMATION ABOUT ELEMENT HEADER, STORED*/
  2479.                                 /* IN OP REC, IS WRITTEN AT UNDO PAGES */
  2480.         cundoElemIndex = 0;/* DEFAULT VALUE USED BY WRITE_UNDO_HEADER SUBROTINE */
  2481.         writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
  2482.         checkUndoPages(signal); /* SEND UNDO PAGE TO DISK WHEN A GROUP OF  */
  2483.                                 /* UNDO PAGES,CURRENTLY 8, IS FILLED */
  2484.       }
  2485.       operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp;
  2486.     }
  2487.     if (operationRecPtr.i == RNIL) {
  2488.       jam();
  2489.       break;
  2490.     }
  2491.   }
  2492.   if (operationRecPtr.i != RNIL) {
  2493.     jam();
  2494.     signal->theData[0]= ZLCP_OP_WRITE_RT_BREAK;
  2495.     signal->theData[1]= operationRecPtr.i;
  2496.     signal->theData[2]= fragrecptr.i;
  2497.     signal->theData[3]= lcpConnectptr.i;
  2498.     if (delay_continueb) {
  2499.       jam();
  2500.       sendSignalWithDelay(cownBlockref, GSN_CONTINUEB, signal, 10, 4);
  2501.     } else {
  2502.       jam();
  2503.       sendSignal(cownBlockref, GSN_CONTINUEB, signal, 4, JBB);
  2504.     }
  2505.     return;
  2506.   }
  2507.   signal->theData[0] = fragrecptr.p->lcpLqhPtr;
  2508.   sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPSTARTED, 
  2509.      signal, 1, JBA);
  2510.   fragrecptr.p->activeDataPage = 0;
  2511.   fragrecptr.p->lcpDirIndex = 0;
  2512.   fragrecptr.p->fragState = LCP_SEND_PAGES;
  2513.   signal->theData[0] = lcpConnectptr.i;
  2514.   signal->theData[1] = fragrecptr.i;
  2515.   sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
  2516. }
  2517. /* ******************--------------------------------------------------------------- */
  2518. /* ACC_SAVE_PAGES           A GROUP OF PAGES IS ALLOCATED. THE PAGES AND OVERFLOW    */
  2519. /*                          PAGES OF THE FRAGMENT ARE COPIED IN THEM AND IS SEND TO  */
  2520. /*                          THE DATA FILE OF THE CHECK POINT.                        */
  2521. /*                                                     SENDER: ACC,    LEVEL B       */
  2522. /*         ENTER ACC_SAVE_PAGES WITH                                                 */
  2523. /*                   LCP_CONNECTPTR,                 CONNECTION RECORD PTR           */
  2524. /*                   FRAGRECPTR                      FRAGMENT RECORD PTR             */
  2525. /* ******************--------------------------------------------------------------- */
  2526. /* ******************--------------------------------------------------------------- */
  2527. /* ACC_SAVE_PAGES            REQUEST TO SEND THE PAGE  TO DISK                       */
  2528. /* ******************------------------------------+   UNDO PAGES                    */
  2529. /*   SENDER: ACC,    LEVEL B       */
  2530. void Dbacc::execACC_SAVE_PAGES(Signal* signal) 
  2531. {
  2532.   jamEntry();
  2533.   lcpConnectptr.i = signal->theData[0];
  2534.   /* CONNECTION RECORD PTR           */
  2535.   fragrecptr.i = signal->theData[1];
  2536.   /* FRAGMENT RECORD PTR             */
  2537.   tresult = 0;
  2538.   ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
  2539.   if (lcpConnectptr.p->lcpstate != LCP_ACTIVE) {
  2540.     jam();
  2541.     sendSystemerror(signal);
  2542.     return;
  2543.   }//if
  2544.   if (ERROR_INSERTED(3000)) {
  2545.     ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
  2546.     rootfragrecptr.i = fragrecptr.p->myroot;
  2547.     ptrCheckGuard(rootfragrecptr, crootfragmentsize, rootfragmentrec);
  2548.     if (rootfragrecptr.p->mytabptr == c_errorInsert3000_TableId){
  2549.       ndbout << "Delay writing of datapages" << endl;
  2550.       // Delay writing of pages
  2551.       jam();
  2552.       sendSignalWithDelay(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 1000, 2);
  2553.       return;
  2554.     }
  2555.   }
  2556.   if (clblPageCounter == 0) {
  2557.     jam();
  2558.     signal->theData[0] = lcpConnectptr.i;
  2559.     signal->theData[1] = fragrecptr.i;
  2560.     sendSignalWithDelay(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 100, 2);
  2561.     return;
  2562.   } else {
  2563.     jam();
  2564.     clblPageCounter = clblPageCounter - 1;
  2565.   }//if
  2566.   ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
  2567.   if (fragrecptr.p->fragState == LCP_SEND_PAGES) {
  2568.     jam();
  2569.     savepagesLab(signal);
  2570.     return;
  2571.   } else {
  2572.     if (fragrecptr.p->fragState == LCP_SEND_OVER_PAGES) {
  2573.       jam();
  2574.       saveOverPagesLab(signal);
  2575.       return;
  2576.     } else {
  2577.       ndbrequire(fragrecptr.p->fragState == LCP_SEND_ZERO_PAGE);
  2578.       jam();
  2579.       saveZeroPageLab(signal);
  2580.       return;
  2581.     }//if
  2582.   }//if
  2583. }//Dbacc::execACC_SAVE_PAGES()
  2584. void Dbacc::savepagesLab(Signal* signal) 
  2585. {
  2586.   DirRangePtr spDirRangePtr;
  2587.   DirectoryarrayPtr spDirptr;
  2588.   Page8Ptr aspPageptr;
  2589.   Page8Ptr aspCopyPageptr;
  2590.   Uint32 taspDirindex;
  2591.   Uint32 taspDirIndex;
  2592.   Uint32 taspIndex;
  2593.   if ((fragrecptr.p->lcpDirIndex >= fragrecptr.p->dirsize) ||
  2594.       (fragrecptr.p->lcpDirIndex >= fragrecptr.p->lcpMaxDirIndex)) {
  2595.     jam();
  2596.     endsavepageLab(signal);
  2597.     return;
  2598.   }//if
  2599.   /* SOME EXPAND PROCESSES HAVE BEEN PERFORMED. */
  2600.   /* THE ADDED PAGE ARE NOT SENT TO DISK */
  2601.   arrGuard(fragrecptr.p->activeDataPage, 8);
  2602.   aspCopyPageptr.i = fragrecptr.p->datapages[fragrecptr.p->activeDataPage];
  2603.   ptrCheckGuard(aspCopyPageptr, cpagesize, page8);
  2604.   taspDirindex = fragrecptr.p->lcpDirIndex; /* DIRECTORY OF ACTIVE PAGE */
  2605.   spDirRangePtr.i = fragrecptr.p->directory;
  2606.   taspDirIndex = taspDirindex >> 8;
  2607.   taspIndex = taspDirindex & 0xff;
  2608.   ptrCheckGuard(spDirRangePtr, cdirrangesize, dirRange);
  2609.   arrGuard(taspDirIndex, 256);
  2610.   spDirptr.i = spDirRangePtr.p->dirArray[taspDirIndex];
  2611.   ptrCheckGuard(spDirptr, cdirarraysize, directoryarray);
  2612.   aspPageptr.i = spDirptr.p->pagep[taspIndex];
  2613.   ptrCheckGuard(aspPageptr, cpagesize, page8);
  2614.   ndbrequire(aspPageptr.p->word32[ZPOS_PAGE_ID] == fragrecptr.p->lcpDirIndex);
  2615.   lcnPageptr = aspPageptr;
  2616.   lcnCopyPageptr = aspCopyPageptr;
  2617.   lcpCopyPage(signal);
  2618.   fragrecptr.p->lcpDirIndex++;
  2619.   fragrecptr.p->activeDataPage++;
  2620.   if (fragrecptr.p->activeDataPage < ZWRITEPAGESIZE) {
  2621.     jam();
  2622.     signal->theData[0] = lcpConnectptr.i;
  2623.     signal->theData[1] = fragrecptr.i;
  2624.     sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
  2625.     return;
  2626.   }//if
  2627.   senddatapagesLab(signal);
  2628.   return;
  2629. }//Dbacc::savepagesLab()
  2630. /*  FRAGRECPTR:ACTIVE_DATA_PAGE = ZWRITEPAGESIZE */
  2631. /*  SEND A GROUP OF PAGES TO DISK     */
  2632. void Dbacc::senddatapagesLab(Signal* signal) 
  2633. {
  2634.   fsConnectptr.i = fragrecptr.p->fsConnPtr;
  2635.   ptrCheckGuard(fsConnectptr, cfsConnectsize, fsConnectrec);
  2636.   seizeFsOpRec(signal);
  2637.   initFsOpRec(signal);
  2638.   fsOpptr.p->fsOpstate = WAIT_WRITE_DATA;
  2639.   ndbrequire(fragrecptr.p->activeDataPage <= 8);
  2640.   for (Uint32 i = 0; i < fragrecptr.p->activeDataPage; i++) {
  2641.     signal->theData[i + 6] = fragrecptr.p->datapages[i];