DbtuxScan.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:30k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #define DBTUX_SCAN_CPP
  14. #include "Dbtux.hpp"
  15. void
  16. Dbtux::execACC_SCANREQ(Signal* signal)
  17. {
  18.   jamEntry();
  19.   const AccScanReq reqCopy = *(const AccScanReq*)signal->getDataPtr();
  20.   const AccScanReq* const req = &reqCopy;
  21.   ScanOpPtr scanPtr;
  22.   scanPtr.i = RNIL;
  23.   do {
  24.     // get the index
  25.     IndexPtr indexPtr;
  26.     c_indexPool.getPtr(indexPtr, req->tableId);
  27.     // get the fragment
  28.     FragPtr fragPtr;
  29.     fragPtr.i = RNIL;
  30.     for (unsigned i = 0; i < indexPtr.p->m_numFrags; i++) {
  31.       jam();
  32.       if (indexPtr.p->m_fragId[i] == req->fragmentNo) {
  33.         jam();
  34.         c_fragPool.getPtr(fragPtr, indexPtr.p->m_fragPtrI[i]);
  35.         break;
  36.       }
  37.     }
  38.     ndbrequire(fragPtr.i != RNIL);
  39.     Frag& frag = *fragPtr.p;
  40.     // must be normal DIH/TC fragment
  41.     ndbrequire(frag.m_fragId < (1 << frag.m_fragOff));
  42.     TreeHead& tree = frag.m_tree;
  43.     // check for empty fragment
  44.     if (tree.m_root == NullTupLoc) {
  45.       jam();
  46.       AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
  47.       conf->scanPtr = req->senderData;
  48.       conf->accPtr = RNIL;
  49.       conf->flag = AccScanConf::ZEMPTY_FRAGMENT;
  50.       sendSignal(req->senderRef, GSN_ACC_SCANCONF,
  51.           signal, AccScanConf::SignalLength, JBB);
  52.       return;
  53.     }
  54.     // seize from pool and link to per-fragment list
  55.     if (! frag.m_scanList.seize(scanPtr)) {
  56.       jam();
  57.       break;
  58.     }
  59.     new (scanPtr.p) ScanOp(c_scanBoundPool);
  60.     scanPtr.p->m_state = ScanOp::First;
  61.     scanPtr.p->m_userPtr = req->senderData;
  62.     scanPtr.p->m_userRef = req->senderRef;
  63.     scanPtr.p->m_tableId = indexPtr.p->m_tableId;
  64.     scanPtr.p->m_indexId = indexPtr.i;
  65.     scanPtr.p->m_fragId = fragPtr.p->m_fragId;
  66.     scanPtr.p->m_fragPtrI = fragPtr.i;
  67.     scanPtr.p->m_transId1 = req->transId1;
  68.     scanPtr.p->m_transId2 = req->transId2;
  69.     scanPtr.p->m_savePointId = req->savePointId;
  70.     scanPtr.p->m_readCommitted = AccScanReq::getReadCommittedFlag(req->requestInfo);
  71.     scanPtr.p->m_lockMode = AccScanReq::getLockMode(req->requestInfo);
  72.     scanPtr.p->m_keyInfo = AccScanReq::getKeyinfoFlag(req->requestInfo);
  73. #ifdef VM_TRACE
  74.     if (debugFlags & DebugScan) {
  75.       debugOut << "Seize scan " << scanPtr.i << " " << *scanPtr.p << endl;
  76.     }
  77. #endif
  78.     /*
  79.      * readCommitted lockMode keyInfo
  80.      * 1 0 0 - read committed (no lock)
  81.      * 0 0 0 - read latest (read lock)
  82.      * 0 1 1 - read exclusive (write lock)
  83.      */
  84.     // conf
  85.     AccScanConf* const conf = (AccScanConf*)signal->getDataPtrSend();
  86.     conf->scanPtr = req->senderData;
  87.     conf->accPtr = scanPtr.i;
  88.     conf->flag = AccScanConf::ZNOT_EMPTY_FRAGMENT;
  89.     sendSignal(req->senderRef, GSN_ACC_SCANCONF,
  90.         signal, AccScanConf::SignalLength, JBB);
  91.     return;
  92.   } while (0);
  93.   if (scanPtr.i != RNIL) {
  94.     jam();
  95.     releaseScanOp(scanPtr);
  96.   }
  97.   // LQH does not handle REF
  98.   signal->theData[0] = 0x313;
  99.   sendSignal(req->senderRef, GSN_ACC_SCANREF,
  100.       signal, 1, JBB);
  101. }
  102. /*
  103.  * Receive bounds for scan in single direct call.  The bounds can arrive
  104.  * in any order.  Attribute ids are those of index table.
  105.  *
  106.  * Replace EQ by equivalent LE + GE.  Check for conflicting bounds.
  107.  * Check that sets of lower and upper bounds are on initial sequences of
  108.  * keys and that all but possibly last bound is non-strict.
  109.  *
  110.  * Finally save the sets of lower and upper bounds (i.e. start key and
  111.  * end key).  Full bound type (< 4) is included but only the strict bit
  112.  * is used since lower and upper have now been separated.
  113.  */
  114. void
  115. Dbtux::execTUX_BOUND_INFO(Signal* signal)
  116. {
  117.   jamEntry();
  118.   struct BoundInfo {
  119.     int type;
  120.     unsigned offset;
  121.     unsigned size;
  122.   };
  123.   TuxBoundInfo* const sig = (TuxBoundInfo*)signal->getDataPtrSend();
  124.   const TuxBoundInfo reqCopy = *(const TuxBoundInfo*)sig;
  125.   const TuxBoundInfo* const req = &reqCopy;
  126.   // get records
  127.   ScanOp& scan = *c_scanOpPool.getPtr(req->tuxScanPtrI);
  128.   Index& index = *c_indexPool.getPtr(scan.m_indexId);
  129.   // collect lower and upper bounds
  130.   BoundInfo boundInfo[2][MaxIndexAttributes];
  131.   // largest attrId seen plus one
  132.   Uint32 maxAttrId[2] = { 0, 0 };
  133.   unsigned offset = 0;
  134.   const Uint32* const data = (Uint32*)sig + TuxBoundInfo::SignalLength;
  135.   // walk through entries
  136.   while (offset + 2 <= req->boundAiLength) {
  137.     jam();
  138.     const unsigned type = data[offset];
  139.     if (type > 4) {
  140.       jam();
  141.       scan.m_state = ScanOp::Invalid;
  142.       sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
  143.       return;
  144.     }
  145.     const AttributeHeader* ah = (const AttributeHeader*)&data[offset + 1];
  146.     const Uint32 attrId = ah->getAttributeId();
  147.     const Uint32 dataSize = ah->getDataSize();
  148.     if (attrId >= index.m_numAttrs) {
  149.       jam();
  150.       scan.m_state = ScanOp::Invalid;
  151.       sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
  152.       return;
  153.     }
  154.     for (unsigned j = 0; j <= 1; j++) {
  155.       // check if lower/upper bit matches
  156.       const unsigned luBit = (j << 1);
  157.       if ((type & 0x2) != luBit && type != 4)
  158.         continue;
  159.       // EQ -> LE, GE
  160.       const unsigned type2 = (type & 0x1) | luBit;
  161.       // fill in any gap
  162.       while (maxAttrId[j] <= attrId) {
  163.         BoundInfo& b = boundInfo[j][maxAttrId[j]++];
  164.         b.type = -1;
  165.       }
  166.       BoundInfo& b = boundInfo[j][attrId];
  167.       if (b.type != -1) {
  168.         // compare with previous bound
  169.         if (b.type != (int)type2 ||
  170.             b.size != 2 + dataSize ||
  171.             memcmp(&data[b.offset + 2], &data[offset + 2], dataSize << 2) != 0) {
  172.           jam();
  173.           scan.m_state = ScanOp::Invalid;
  174.           sig->errorCode = TuxBoundInfo::InvalidBounds;
  175.           return;
  176.         }
  177.       } else {
  178.         // enter new bound
  179.         b.type = type2;
  180.         b.offset = offset;
  181.         b.size = 2 + dataSize;
  182.       }
  183.     }
  184.     // jump to next
  185.     offset += 2 + dataSize;
  186.   }
  187.   if (offset != req->boundAiLength) {
  188.     jam();
  189.     scan.m_state = ScanOp::Invalid;
  190.     sig->errorCode = TuxBoundInfo::InvalidAttrInfo;
  191.     return;
  192.   }
  193.   for (unsigned j = 0; j <= 1; j++) {
  194.     // save lower/upper bound in index attribute id order
  195.     for (unsigned i = 0; i < maxAttrId[j]; i++) {
  196.       jam();
  197.       const BoundInfo& b = boundInfo[j][i];
  198.       // check for gap or strict bound before last
  199.       if (b.type == -1 || (i + 1 < maxAttrId[j] && (b.type & 0x1))) {
  200.         jam();
  201.         scan.m_state = ScanOp::Invalid;
  202.         sig->errorCode = TuxBoundInfo::InvalidBounds;
  203.         return;
  204.       }
  205.       bool ok = scan.m_bound[j]->append(&data[b.offset], b.size);
  206.       if (! ok) {
  207.         jam();
  208.         scan.m_state = ScanOp::Invalid;
  209.         sig->errorCode = TuxBoundInfo::OutOfBuffers;
  210.         return;
  211.       }
  212.     }
  213.     scan.m_boundCnt[j] = maxAttrId[j];
  214.   }
  215.   // no error
  216.   sig->errorCode = 0;
  217. }
  218. void
  219. Dbtux::execNEXT_SCANREQ(Signal* signal)
  220. {
  221.   jamEntry();
  222.   const NextScanReq reqCopy = *(const NextScanReq*)signal->getDataPtr();
  223.   const NextScanReq* const req = &reqCopy;
  224.   ScanOpPtr scanPtr;
  225.   scanPtr.i = req->accPtr;
  226.   c_scanOpPool.getPtr(scanPtr);
  227.   ScanOp& scan = *scanPtr.p;
  228.   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  229. #ifdef VM_TRACE
  230.   if (debugFlags & DebugScan) {
  231.     debugOut << "NEXT_SCANREQ scan " << scanPtr.i << " " << scan << endl;
  232.   }
  233. #endif
  234.   // handle unlock previous and close scan
  235.   switch (req->scanFlag) {
  236.   case NextScanReq::ZSCAN_NEXT:
  237.     jam();
  238.     break;
  239.   case NextScanReq::ZSCAN_NEXT_COMMIT:
  240.     jam();
  241.   case NextScanReq::ZSCAN_COMMIT:
  242.     jam();
  243.     if (! scan.m_readCommitted) {
  244.       jam();
  245.       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
  246.       lockReq->returnCode = RNIL;
  247.       lockReq->requestInfo = AccLockReq::Unlock;
  248.       lockReq->accOpPtr = req->accOperationPtr;
  249.       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
  250.       jamEntry();
  251.       ndbrequire(lockReq->returnCode == AccLockReq::Success);
  252.       removeAccLockOp(scan, req->accOperationPtr);
  253.     }
  254.     if (req->scanFlag == NextScanReq::ZSCAN_COMMIT) {
  255.       jam();
  256.       NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
  257.       conf->scanPtr = scan.m_userPtr;
  258.       unsigned signalLength = 1;
  259.       sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
  260.           signal, signalLength, JBB);
  261.       return;
  262.     }
  263.     break;
  264.   case NextScanReq::ZSCAN_CLOSE:
  265.     jam();
  266.     // unlink from tree node first to avoid state changes
  267.     if (scan.m_scanPos.m_loc != NullTupLoc) {
  268.       jam();
  269.       const TupLoc loc = scan.m_scanPos.m_loc;
  270.       NodeHandle node(frag);
  271.       selectNode(node, loc);
  272.       unlinkScan(node, scanPtr);
  273.       scan.m_scanPos.m_loc = NullTupLoc;
  274.     }
  275.     if (scan.m_lockwait) {
  276.       jam();
  277.       ndbrequire(scan.m_accLockOp != RNIL);
  278.       // use ACC_ABORTCONF to flush out any reply in job buffer
  279.       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
  280.       lockReq->returnCode = RNIL;
  281.       lockReq->requestInfo = AccLockReq::AbortWithConf;
  282.       lockReq->accOpPtr = scan.m_accLockOp;
  283.       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
  284.       jamEntry();
  285.       ndbrequire(lockReq->returnCode == AccLockReq::Success);
  286.       scan.m_state = ScanOp::Aborting;
  287.       return;
  288.     }
  289.     if (scan.m_state == ScanOp::Locked) {
  290.       jam();
  291.       ndbrequire(scan.m_accLockOp != RNIL);
  292.       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
  293.       lockReq->returnCode = RNIL;
  294.       lockReq->requestInfo = AccLockReq::Unlock;
  295.       lockReq->accOpPtr = scan.m_accLockOp;
  296.       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
  297.       jamEntry();
  298.       ndbrequire(lockReq->returnCode == AccLockReq::Success);
  299.       scan.m_accLockOp = RNIL;
  300.     }
  301.     scan.m_state = ScanOp::Aborting;
  302.     scanClose(signal, scanPtr);
  303.     return;
  304.   case NextScanReq::ZSCAN_NEXT_ABORT:
  305.     jam();
  306.   default:
  307.     jam();
  308.     ndbrequire(false);
  309.     break;
  310.   }
  311.   // start looking for next scan result
  312.   AccCheckScan* checkReq = (AccCheckScan*)signal->getDataPtrSend();
  313.   checkReq->accPtr = scanPtr.i;
  314.   checkReq->checkLcpStop = AccCheckScan::ZNOT_CHECK_LCP_STOP;
  315.   EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, AccCheckScan::SignalLength);
  316.   jamEntry();
  317. }
  318. void
  319. Dbtux::execACC_CHECK_SCAN(Signal* signal)
  320. {
  321.   jamEntry();
  322.   const AccCheckScan reqCopy = *(const AccCheckScan*)signal->getDataPtr();
  323.   const AccCheckScan* const req = &reqCopy;
  324.   ScanOpPtr scanPtr;
  325.   scanPtr.i = req->accPtr;
  326.   c_scanOpPool.getPtr(scanPtr);
  327.   ScanOp& scan = *scanPtr.p;
  328.   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  329. #ifdef VM_TRACE
  330.   if (debugFlags & DebugScan) {
  331.     debugOut << "ACC_CHECK_SCAN scan " << scanPtr.i << " " << scan << endl;
  332.   }
  333. #endif
  334.   if (req->checkLcpStop == AccCheckScan::ZCHECK_LCP_STOP) {
  335.     jam();
  336.     signal->theData[0] = scan.m_userPtr;
  337.     signal->theData[1] = true;
  338.     EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
  339.     jamEntry();
  340.     return;   // stop
  341.   }
  342.   if (scan.m_lockwait) {
  343.     jam();
  344.     // LQH asks if we are waiting for lock and we tell it to ask again
  345.     const TreeEnt ent = scan.m_scanEnt;
  346.     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
  347.     conf->scanPtr = scan.m_userPtr;
  348.     conf->accOperationPtr = RNIL;       // no tuple returned
  349.     conf->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
  350.     unsigned signalLength = 3;
  351.     // if TC has ordered scan close, it will be detected here
  352.     sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
  353.         signal, signalLength, JBB);
  354.     return;     // stop
  355.   }
  356.   if (scan.m_state == ScanOp::First) {
  357.     jam();
  358.     // search is done only once in single range scan
  359.     scanFirst(scanPtr);
  360. #ifdef VM_TRACE
  361.     if (debugFlags & DebugScan) {
  362.       debugOut << "First scan " << scanPtr.i << " " << scan << endl;
  363.     }
  364. #endif
  365.   }
  366.   if (scan.m_state == ScanOp::Next) {
  367.     jam();
  368.     // look for next
  369.     scanNext(scanPtr);
  370.   }
  371.   // for reading tuple key in Current or Locked state
  372.   Data pkData = c_dataBuffer;
  373.   unsigned pkSize = 0; // indicates not yet done
  374.   if (scan.m_state == ScanOp::Current) {
  375.     // found an entry to return
  376.     jam();
  377.     ndbrequire(scan.m_accLockOp == RNIL);
  378.     if (! scan.m_readCommitted) {
  379.       jam();
  380.       const TreeEnt ent = scan.m_scanEnt;
  381.       // read tuple key
  382.       readTablePk(frag, ent, pkData, pkSize);
  383.       // get read lock or exclusive lock
  384.       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
  385.       lockReq->returnCode = RNIL;
  386.       lockReq->requestInfo =
  387.         scan.m_lockMode == 0 ? AccLockReq::LockShared : AccLockReq::LockExclusive;
  388.       lockReq->accOpPtr = RNIL;
  389.       lockReq->userPtr = scanPtr.i;
  390.       lockReq->userRef = reference();
  391.       lockReq->tableId = scan.m_tableId;
  392.       lockReq->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
  393.       lockReq->fragPtrI = frag.m_accTableFragPtrI[ent.m_fragBit];
  394.       const Uint32* const buf32 = static_cast<Uint32*>(pkData);
  395.       const Uint64* const buf64 = reinterpret_cast<const Uint64*>(buf32);
  396.       lockReq->hashValue = md5_hash(buf64, pkSize);
  397.       lockReq->tupAddr = getTupAddr(frag, ent);
  398.       lockReq->transId1 = scan.m_transId1;
  399.       lockReq->transId2 = scan.m_transId2;
  400.       // execute
  401.       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::LockSignalLength);
  402.       jamEntry();
  403.       switch (lockReq->returnCode) {
  404.       case AccLockReq::Success:
  405.         jam();
  406.         scan.m_state = ScanOp::Locked;
  407.         scan.m_accLockOp = lockReq->accOpPtr;
  408. #ifdef VM_TRACE
  409.         if (debugFlags & DebugScan) {
  410.           debugOut << "Lock immediate scan " << scanPtr.i << " " << scan << endl;
  411.         }
  412. #endif
  413.         break;
  414.       case AccLockReq::IsBlocked:
  415.         jam();
  416.         // normal lock wait
  417.         scan.m_state = ScanOp::Blocked;
  418.         scan.m_lockwait = true;
  419.         scan.m_accLockOp = lockReq->accOpPtr;
  420. #ifdef VM_TRACE
  421.         if (debugFlags & DebugScan) {
  422.           debugOut << "Lock wait scan " << scanPtr.i << " " << scan << endl;
  423.         }
  424. #endif
  425.         // LQH will wake us up
  426.         signal->theData[0] = scan.m_userPtr;
  427.         signal->theData[1] = true;
  428.         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
  429.         jamEntry();
  430.         return;  // stop
  431.         break;
  432.       case AccLockReq::Refused:
  433.         jam();
  434.         // we cannot see deleted tuple (assert only)
  435.         ndbassert(false);
  436.         // skip it
  437.         scan.m_state = ScanOp::Next;
  438.         signal->theData[0] = scan.m_userPtr;
  439.         signal->theData[1] = true;
  440.         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
  441.         jamEntry();
  442.         return;  // stop
  443.         break;
  444.       case AccLockReq::NoFreeOp:
  445.         jam();
  446.         // max ops should depend on max scans (assert only)
  447.         ndbassert(false);
  448.         // stay in Current state
  449.         scan.m_state = ScanOp::Current;
  450.         signal->theData[0] = scan.m_userPtr;
  451.         signal->theData[1] = true;
  452.         EXECUTE_DIRECT(DBLQH, GSN_CHECK_LCP_STOP, signal, 2);
  453.         jamEntry();
  454.         return;  // stop
  455.         break;
  456.       default:
  457.         ndbrequire(false);
  458.         break;
  459.       }
  460.     } else {
  461.       scan.m_state = ScanOp::Locked;
  462.     }
  463.   }
  464.   if (scan.m_state == ScanOp::Locked) {
  465.     // we have lock or do not need one
  466.     jam();
  467.     // read keys if not already done (uses signal)
  468.     const TreeEnt ent = scan.m_scanEnt;
  469.     if (scan.m_keyInfo) {
  470.       jam();
  471.       if (pkSize == 0) {
  472.         jam();
  473.         readTablePk(frag, ent, pkData, pkSize);
  474.       }
  475.     }
  476.     // conf signal
  477.     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
  478.     conf->scanPtr = scan.m_userPtr;
  479.     // the lock is passed to LQH
  480.     Uint32 accLockOp = scan.m_accLockOp;
  481.     if (accLockOp != RNIL) {
  482.       scan.m_accLockOp = RNIL;
  483.       // remember it until LQH unlocks it
  484.       addAccLockOp(scan, accLockOp);
  485.     } else {
  486.       ndbrequire(scan.m_readCommitted);
  487.       // operation RNIL in LQH would signal no tuple returned
  488.       accLockOp = (Uint32)-1;
  489.     }
  490.     conf->accOperationPtr = accLockOp;
  491.     conf->fragId = frag.m_fragId | (ent.m_fragBit << frag.m_fragOff);
  492.     conf->localKey[0] = getTupAddr(frag, ent);
  493.     conf->localKey[1] = 0;
  494.     conf->localKeyLength = 1;
  495.     unsigned signalLength = 6;
  496.     // add key info
  497.     if (scan.m_keyInfo) {
  498.       jam();
  499.       conf->keyLength = pkSize;
  500.       // piggy-back first 4 words of key data
  501.       for (unsigned i = 0; i < 4; i++) {
  502.         conf->key[i] = i < pkSize ? pkData[i] : 0;
  503.       }
  504.       signalLength = 11;
  505.     }
  506.     if (! scan.m_readCommitted) {
  507.       sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
  508.           signal, signalLength, JBB);
  509.     } else {
  510.       Uint32 blockNo = refToBlock(scan.m_userRef);
  511.       EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
  512.     }
  513.     // send rest of key data
  514.     if (scan.m_keyInfo && pkSize > 4) {
  515.       unsigned total = 4;
  516.       while (total < pkSize) {
  517.         jam();
  518.         unsigned length = pkSize - total;
  519.         if (length > 20)
  520.           length = 20;
  521.         signal->theData[0] = scan.m_userPtr;
  522.         signal->theData[1] = 0;
  523.         signal->theData[2] = 0;
  524.         signal->theData[3] = length;
  525.         memcpy(&signal->theData[4], &pkData[total], length << 2);
  526.         sendSignal(scan.m_userRef, GSN_ACC_SCAN_INFO24,
  527.             signal, 4 + length, JBB);
  528.         total += length;
  529.       }
  530.     }
  531.     // next time look for next entry
  532.     scan.m_state = ScanOp::Next;
  533.     return;
  534.   }
  535.   // XXX in ACC this is checked before req->checkLcpStop
  536.   if (scan.m_state == ScanOp::Last ||
  537.       scan.m_state == ScanOp::Invalid) {
  538.     jam();
  539.     NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
  540.     conf->scanPtr = scan.m_userPtr;
  541.     conf->accOperationPtr = RNIL;
  542.     conf->fragId = RNIL;
  543.     unsigned signalLength = 3;
  544.     sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
  545.         signal, signalLength, JBB);
  546.     return;
  547.   }
  548.   ndbrequire(false);
  549. }
  550. /*
  551.  * Lock succeeded (after delay) in ACC.  If the lock is for current
  552.  * entry, set state to Locked.  If the lock is for an entry we were
  553.  * moved away from, simply unlock it.  Finally, if we are closing the
  554.  * scan, do nothing since we have already sent an abort request.
  555.  */
  556. void
  557. Dbtux::execACCKEYCONF(Signal* signal)
  558. {
  559.   jamEntry();
  560.   ScanOpPtr scanPtr;
  561.   scanPtr.i = signal->theData[0];
  562.   c_scanOpPool.getPtr(scanPtr);
  563.   ScanOp& scan = *scanPtr.p;
  564. #ifdef VM_TRACE
  565.   if (debugFlags & DebugScan) {
  566.     debugOut << "Lock obtained scan " << scanPtr.i << " " << scan << endl;
  567.   }
  568. #endif
  569.   ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
  570.   scan.m_lockwait = false;
  571.   if (scan.m_state == ScanOp::Blocked) {
  572.     // the lock wait was for current entry
  573.     jam();
  574.     scan.m_state = ScanOp::Locked;
  575.     // LQH has the ball
  576.     return;
  577.   }
  578.   if (scan.m_state != ScanOp::Aborting) {
  579.     // we were moved, release lock
  580.     jam();
  581.     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
  582.     lockReq->returnCode = RNIL;
  583.     lockReq->requestInfo = AccLockReq::Unlock;
  584.     lockReq->accOpPtr = scan.m_accLockOp;
  585.     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
  586.     jamEntry();
  587.     ndbrequire(lockReq->returnCode == AccLockReq::Success);
  588.     scan.m_accLockOp = RNIL;
  589.     // LQH has the ball
  590.     return;
  591.   }
  592.   // lose the lock
  593.   scan.m_accLockOp = RNIL;
  594.   // continue at ACC_ABORTCONF
  595. }
  596. /*
  597.  * Lock failed (after delay) in ACC.  Probably means somebody ahead of
  598.  * us in lock queue deleted the tuple.
  599.  */
  600. void
  601. Dbtux::execACCKEYREF(Signal* signal)
  602. {
  603.   jamEntry();
  604.   ScanOpPtr scanPtr;
  605.   scanPtr.i = signal->theData[0];
  606.   c_scanOpPool.getPtr(scanPtr);
  607.   ScanOp& scan = *scanPtr.p;
  608. #ifdef VM_TRACE
  609.   if (debugFlags & DebugScan) {
  610.     debugOut << "Lock refused scan " << scanPtr.i << " " << scan << endl;
  611.   }
  612. #endif
  613.   ndbrequire(scan.m_lockwait && scan.m_accLockOp != RNIL);
  614.   scan.m_lockwait = false;
  615.   if (scan.m_state != ScanOp::Aborting) {
  616.     jam();
  617.     // release the operation
  618.     AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
  619.     lockReq->returnCode = RNIL;
  620.     lockReq->requestInfo = AccLockReq::Abort;
  621.     lockReq->accOpPtr = scan.m_accLockOp;
  622.     EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
  623.     jamEntry();
  624.     ndbrequire(lockReq->returnCode == AccLockReq::Success);
  625.     scan.m_accLockOp = RNIL;
  626.     // scan position should already have been moved (assert only)
  627.     if (scan.m_state == ScanOp::Blocked) {
  628.       jam();
  629.       ndbassert(false);
  630.       scan.m_state = ScanOp::Next;
  631.     }
  632.     // LQH has the ball
  633.     return;
  634.   }
  635.   // lose the lock
  636.   scan.m_accLockOp = RNIL;
  637.   // continue at ACC_ABORTCONF
  638. }
  639. /*
  640.  * Received when scan is closing.  This signal arrives after any
  641.  * ACCKEYCON or ACCKEYREF which may have been in job buffer.
  642.  */
  643. void
  644. Dbtux::execACC_ABORTCONF(Signal* signal)
  645. {
  646.   jamEntry();
  647.   ScanOpPtr scanPtr;
  648.   scanPtr.i = signal->theData[0];
  649.   c_scanOpPool.getPtr(scanPtr);
  650.   ScanOp& scan = *scanPtr.p;
  651. #ifdef VM_TRACE
  652.   if (debugFlags & DebugScan) {
  653.     debugOut << "ACC_ABORTCONF scan " << scanPtr.i << " " << scan << endl;
  654.   }
  655. #endif
  656.   ndbrequire(scan.m_state == ScanOp::Aborting);
  657.   // most likely we are still in lock wait
  658.   if (scan.m_lockwait) {
  659.     jam();
  660.     scan.m_lockwait = false;
  661.     scan.m_accLockOp = RNIL;
  662.   }
  663.   scanClose(signal, scanPtr);
  664. }
  665. /*
  666.  * Find start position for single range scan.  If it exists, sets state
  667.  * to Next and links the scan to the node.  The first entry is returned
  668.  * by scanNext.
  669.  */
  670. void
  671. Dbtux::scanFirst(ScanOpPtr scanPtr)
  672. {
  673.   ScanOp& scan = *scanPtr.p;
  674.   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  675.   TreeHead& tree = frag.m_tree;
  676.   // set up index keys for this operation
  677.   setKeyAttrs(frag);
  678.   // unpack lower bound into c_dataBuffer
  679.   const ScanBound& bound = *scan.m_bound[0];
  680.   ScanBoundIterator iter;
  681.   bound.first(iter);
  682.   for (unsigned j = 0; j < bound.getSize(); j++) {
  683.     jam();
  684.     c_dataBuffer[j] = *iter.data;
  685.     bound.next(iter);
  686.   }
  687.   // search for scan start position
  688.   TreePos treePos;
  689.   searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
  690.   if (treePos.m_loc == NullTupLoc) {
  691.     // empty tree
  692.     jam();
  693.     scan.m_state = ScanOp::Last;
  694.     return;
  695.   }
  696.   // set position and state
  697.   scan.m_scanPos = treePos;
  698.   scan.m_state = ScanOp::Next;
  699.   // link the scan to node found
  700.   NodeHandle node(frag);
  701.   selectNode(node, treePos.m_loc);
  702.   linkScan(node, scanPtr);
  703. }
  704. /*
  705.  * Move to next entry.  The scan is already linked to some node.  When
  706.  * we leave, if an entry was found, it will be linked to a possibly
  707.  * different node.  The scan has a position, and a direction which tells
  708.  * from where we came to this position.  This is one of:
  709.  *
  710.  * 0 - up from left child (scan this node next)
  711.  * 1 - up from right child (proceed to parent)
  712.  * 2 - up from root (the scan ends)
  713.  * 3 - left to right within node (at end proceed to right child)
  714.  * 4 - down from parent (proceed to left child)
  715.  *
  716.  * If an entry was found, scan direction is 3.  Therefore tree
  717.  * re-organizations need not worry about scan direction.
  718.  */
  719. void
  720. Dbtux::scanNext(ScanOpPtr scanPtr)
  721. {
  722.   ScanOp& scan = *scanPtr.p;
  723.   Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  724. #ifdef VM_TRACE
  725.   if (debugFlags & DebugScan) {
  726.     debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
  727.   }
  728. #endif
  729.   // cannot be moved away from tuple we have locked
  730.   ndbrequire(scan.m_state != ScanOp::Locked);
  731.   // set up index keys for this operation
  732.   setKeyAttrs(frag);
  733.   // unpack upper bound into c_dataBuffer
  734.   const ScanBound& bound = *scan.m_bound[1];
  735.   ScanBoundIterator iter;
  736.   bound.first(iter);
  737.   for (unsigned j = 0; j < bound.getSize(); j++) {
  738.     jam();
  739.     c_dataBuffer[j] = *iter.data;
  740.     bound.next(iter);
  741.   }
  742.   // use copy of position
  743.   TreePos pos = scan.m_scanPos;
  744.   // get and remember original node
  745.   NodeHandle origNode(frag);
  746.   selectNode(origNode, pos.m_loc);
  747.   ndbrequire(islinkScan(origNode, scanPtr));
  748.   // current node in loop
  749.   NodeHandle node = origNode;
  750.   // copy of entry found
  751.   TreeEnt ent;
  752.   while (true) {
  753.     jam();
  754.     if (pos.m_dir == 2) {
  755.       // coming up from root ends the scan
  756.       jam();
  757.       pos.m_loc = NullTupLoc;
  758.       scan.m_state = ScanOp::Last;
  759.       break;
  760.     }
  761.     if (node.m_loc != pos.m_loc) {
  762.       jam();
  763.       selectNode(node, pos.m_loc);
  764.     }
  765.     if (pos.m_dir == 4) {
  766.       // coming down from parent proceed to left child
  767.       jam();
  768.       TupLoc loc = node.getLink(0);
  769.       if (loc != NullTupLoc) {
  770.         jam();
  771.         pos.m_loc = loc;
  772.         pos.m_dir = 4;  // unchanged
  773.         continue;
  774.       }
  775.       // pretend we came from left child
  776.       pos.m_dir = 0;
  777.     }
  778.     if (pos.m_dir == 0) {
  779.       // coming up from left child scan current node
  780.       jam();
  781.       pos.m_pos = 0;
  782.       pos.m_match = false;
  783.       pos.m_dir = 3;
  784.     }
  785.     if (pos.m_dir == 3) {
  786.       // within node
  787.       jam();
  788.       unsigned occup = node.getOccup();
  789.       ndbrequire(occup >= 1);
  790.       // advance position
  791.       if (! pos.m_match)
  792.         pos.m_match = true;
  793.       else
  794.         pos.m_pos++;
  795.       if (pos.m_pos < occup) {
  796.         jam();
  797.         ent = node.getEnt(pos.m_pos);
  798.         pos.m_dir = 3;  // unchanged
  799.         // read and compare all attributes
  800.         readKeyAttrs(frag, ent, 0, c_entryKey);
  801.         int ret = cmpScanBound(frag, 1, c_dataBuffer, scan.m_boundCnt[1], c_entryKey);
  802.         ndbrequire(ret != NdbSqlUtil::CmpUnknown);
  803.         if (ret < 0) {
  804.           jam();
  805.           // hit upper bound of single range scan
  806.           pos.m_loc = NullTupLoc;
  807.           scan.m_state = ScanOp::Last;
  808.           break;
  809.         }
  810.         // can we see it
  811.         if (! scanVisible(scanPtr, ent)) {
  812.           jam();
  813.           continue;
  814.         }
  815.         // found entry
  816.         scan.m_state = ScanOp::Current;
  817.         break;
  818.       }
  819.       // after node proceed to right child
  820.       TupLoc loc = node.getLink(1);
  821.       if (loc != NullTupLoc) {
  822.         jam();
  823.         pos.m_loc = loc;
  824.         pos.m_dir = 4;
  825.         continue;
  826.       }
  827.       // pretend we came from right child
  828.       pos.m_dir = 1;
  829.     }
  830.     if (pos.m_dir == 1) {
  831.       // coming up from right child proceed to parent
  832.       jam();
  833.       pos.m_loc = node.getLink(2);
  834.       pos.m_dir = node.getSide();
  835.       continue;
  836.     }
  837.     ndbrequire(false);
  838.   }
  839.   // copy back position
  840.   scan.m_scanPos = pos;
  841.   // relink
  842.   if (scan.m_state == ScanOp::Current) {
  843.     ndbrequire(pos.m_match == true && pos.m_dir == 3);
  844.     ndbrequire(pos.m_loc == node.m_loc);
  845.     if (origNode.m_loc != node.m_loc) {
  846.       jam();
  847.       unlinkScan(origNode, scanPtr);
  848.       linkScan(node, scanPtr);
  849.     }
  850.     // copy found entry
  851.     scan.m_scanEnt = ent;
  852.   } else if (scan.m_state == ScanOp::Last) {
  853.     jam();
  854.     ndbrequire(pos.m_loc == NullTupLoc);
  855.     unlinkScan(origNode, scanPtr);
  856.   } else {
  857.     ndbrequire(false);
  858.   }
  859. #ifdef VM_TRACE
  860.   if (debugFlags & DebugScan) {
  861.     debugOut << "Next out scan " << scanPtr.i << " " << scan << endl;
  862.   }
  863. #endif
  864. }
  865. /*
  866.  * Check if an entry is visible to the scan.
  867.  *
  868.  * There is a special check to never accept same tuple twice in a row.
  869.  * This is faster than asking TUP.  It also fixes some special cases
  870.  * which are not analyzed or handled yet.
  871.  */
  872. bool
  873. Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
  874. {
  875.   const ScanOp& scan = *scanPtr.p;
  876.   const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
  877.   Uint32 fragBit = ent.m_fragBit;
  878.   Uint32 tableFragPtrI = frag.m_tupTableFragPtrI[fragBit];
  879.   Uint32 fragId = frag.m_fragId | (fragBit << frag.m_fragOff);
  880.   Uint32 tupAddr = getTupAddr(frag, ent);
  881.   Uint32 tupVersion = ent.m_tupVersion;
  882.   // check for same tuple twice in row
  883.   if (scan.m_scanEnt.m_tupLoc == ent.m_tupLoc &&
  884.       scan.m_scanEnt.m_fragBit == fragBit) {
  885.     jam();
  886.     return false;
  887.   }
  888.   Uint32 transId1 = scan.m_transId1;
  889.   Uint32 transId2 = scan.m_transId2;
  890.   Uint32 savePointId = scan.m_savePointId;
  891.   bool ret = c_tup->tuxQueryTh(tableFragPtrI, tupAddr, tupVersion, transId1, transId2, savePointId);
  892.   jamEntry();
  893.   return ret;
  894. }
  895. /*
  896.  * Finish closing of scan and send conf.  Any lock wait has been done
  897.  * already.
  898.  */
  899. void
  900. Dbtux::scanClose(Signal* signal, ScanOpPtr scanPtr)
  901. {
  902.   ScanOp& scan = *scanPtr.p;
  903.   ndbrequire(! scan.m_lockwait && scan.m_accLockOp == RNIL);
  904.   // unlock all not unlocked by LQH
  905.   for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
  906.     if (scan.m_accLockOps[i] != RNIL) {
  907.       jam();
  908.       AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
  909.       lockReq->returnCode = RNIL;
  910.       lockReq->requestInfo = AccLockReq::Abort;
  911.       lockReq->accOpPtr = scan.m_accLockOps[i];
  912.       EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
  913.       jamEntry();
  914.       ndbrequire(lockReq->returnCode == AccLockReq::Success);
  915.       scan.m_accLockOps[i] = RNIL;
  916.     }
  917.   }
  918.   // send conf
  919.   NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
  920.   conf->scanPtr = scanPtr.p->m_userPtr;
  921.   conf->accOperationPtr = RNIL;
  922.   conf->fragId = RNIL;
  923.   unsigned signalLength = 3;
  924.   sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
  925.       signal, signalLength, JBB);
  926.   releaseScanOp(scanPtr);
  927. }
  928. void
  929. Dbtux::addAccLockOp(ScanOp& scan, Uint32 accLockOp)
  930. {
  931.   ndbrequire(accLockOp != RNIL);
  932.   Uint32* list = scan.m_accLockOps;
  933.   bool ok = false;
  934.   for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
  935.     ndbrequire(list[i] != accLockOp);
  936.     if (! ok && list[i] == RNIL) {
  937.       list[i] = accLockOp;
  938.       ok = true;
  939.       // continue check for duplicates
  940.     }
  941.   }
  942.   if (! ok) {
  943.     unsigned i = scan.m_maxAccLockOps;
  944.     if (i < MaxAccLockOps) {
  945.       list[i] = accLockOp;
  946.       ok = true;
  947.       scan.m_maxAccLockOps = i + 1;
  948.     }
  949.   }
  950.   ndbrequire(ok);
  951. }
  952. void
  953. Dbtux::removeAccLockOp(ScanOp& scan, Uint32 accLockOp)
  954. {
  955.   ndbrequire(accLockOp != RNIL);
  956.   Uint32* list = scan.m_accLockOps;
  957.   bool ok = false;
  958.   for (unsigned i = 0; i < scan.m_maxAccLockOps; i++) {
  959.     if (list[i] == accLockOp) {
  960.       list[i] = RNIL;
  961.       ok = true;
  962.       break;
  963.     }
  964.   }
  965.   ndbrequire(ok);
  966. }
  967. /*
  968.  * Release allocated records.
  969.  */
  970. void
  971. Dbtux::releaseScanOp(ScanOpPtr& scanPtr)
  972. {
  973. #ifdef VM_TRACE
  974.   if (debugFlags & DebugScan) {
  975.     debugOut << "Release scan " << scanPtr.i << " " << *scanPtr.p << endl;
  976.   }
  977. #endif
  978.   Frag& frag = *c_fragPool.getPtr(scanPtr.p->m_fragPtrI);
  979.   scanPtr.p->m_boundMin.release();
  980.   scanPtr.p->m_boundMax.release();
  981.   // unlink from per-fragment list and release from pool
  982.   frag.m_scanList.release(scanPtr);
  983. }