NdbScanOperation.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:45k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <Ndb.hpp>
  15. #include <NdbScanOperation.hpp>
  16. #include <NdbIndexScanOperation.hpp>
  17. #include <NdbConnection.hpp>
  18. #include <NdbResultSet.hpp>
  19. #include "NdbApiSignal.hpp"
  20. #include <NdbOut.hpp>
  21. #include "NdbDictionaryImpl.hpp"
  22. #include <NdbRecAttr.hpp>
  23. #include <NdbReceiver.hpp>
  24. #include <stdlib.h>
  25. #include <NdbSqlUtil.hpp>
  26. #include <signaldata/ScanTab.hpp>
  27. #include <signaldata/KeyInfo.hpp>
  28. #include <signaldata/AttrInfo.hpp>
  29. #include <signaldata/TcKeyReq.hpp>
  30. #define DEBUG_NEXT_RESULT 0
  31. NdbScanOperation::NdbScanOperation(Ndb* aNdb) :
  32.   NdbOperation(aNdb),
  33.   m_resultSet(0),
  34.   m_transConnection(NULL)
  35. {
  36.   theParallelism = 0;
  37.   m_allocated_receivers = 0;
  38.   m_prepared_receivers = 0;
  39.   m_api_receivers = 0;
  40.   m_conf_receivers = 0;
  41.   m_sent_receivers = 0;
  42.   m_receivers = 0;
  43.   m_array = new Uint32[1]; // skip if on delete in fix_receivers
  44.   theSCAN_TABREQ = 0;
  45. }
  46. NdbScanOperation::~NdbScanOperation()
  47. {
  48.   for(Uint32 i = 0; i<m_allocated_receivers; i++){
  49.     m_receivers[i]->release();
  50.     theNdb->releaseNdbScanRec(m_receivers[i]);
  51.   }
  52.   delete[] m_array;
  53.   if (m_resultSet)
  54.     delete m_resultSet;
  55. }
  56. NdbResultSet* 
  57. NdbScanOperation::getResultSet()
  58. {
  59.   if (!m_resultSet)
  60.     m_resultSet = new NdbResultSet(this);
  61.   return m_resultSet;
  62. }
  63. void
  64. NdbScanOperation::setErrorCode(int aErrorCode){
  65.   NdbConnection* tmp = theNdbCon;
  66.   theNdbCon = m_transConnection;
  67.   NdbOperation::setErrorCode(aErrorCode);
  68.   theNdbCon = tmp;
  69. }
  70. void
  71. NdbScanOperation::setErrorCodeAbort(int aErrorCode){
  72.   NdbConnection* tmp = theNdbCon;
  73.   theNdbCon = m_transConnection;
  74.   NdbOperation::setErrorCodeAbort(aErrorCode);
  75.   theNdbCon = tmp;
  76. }
  77.   
  78. /*****************************************************************************
  79.  * int init();
  80.  *
  81.  * Return Value:  Return 0 : init was successful.
  82.  *                Return -1: In all other case.  
  83.  * Remark:        Initiates operation record after allocation.
  84.  *****************************************************************************/
  85. int
  86. NdbScanOperation::init(const NdbTableImpl* tab, NdbConnection* myConnection)
  87. {
  88.   m_transConnection = myConnection;
  89.   //NdbConnection* aScanConnection = theNdb->startTransaction(myConnection);
  90.   NdbConnection* aScanConnection = theNdb->hupp(myConnection);
  91.   if (!aScanConnection){
  92.     setErrorCodeAbort(theNdb->getNdbError().code);
  93.     return -1;
  94.   }
  95.   // NOTE! The hupped trans becomes the owner of the operation
  96.   if(NdbOperation::init(tab, aScanConnection) != 0){
  97.     return -1;
  98.   }
  99.   
  100.   initInterpreter();
  101.   
  102.   theStatus = GetValue;
  103.   theOperationType = OpenScanRequest;
  104.   theNdbCon->theMagicNumber = 0xFE11DF;
  105.   return 0;
  106. }
  107. NdbResultSet* NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
  108.    Uint32 batch, 
  109.    Uint32 parallel)
  110. {
  111.   m_ordered = 0;
  112.   Uint32 fragCount = m_currentTable->m_fragmentCount;
  113.   if (parallel > fragCount || parallel == 0) {
  114.      parallel = fragCount;
  115.   }
  116.   // It is only possible to call openScan if 
  117.   //  1. this transcation don't already  contain another scan operation
  118.   //  2. this transaction don't already contain other operations
  119.   //  3. theScanOp contains a NdbScanOperation
  120.   if (theNdbCon->theScanningOp != NULL){
  121.     setErrorCode(4605);
  122.     return 0;
  123.   }
  124.   theNdbCon->theScanningOp = this;
  125.   theLockMode = lm;
  126.   bool lockExcl, lockHoldMode, readCommitted;
  127.   switch(lm){
  128.   case NdbScanOperation::LM_Read:
  129.     lockExcl = false;
  130.     lockHoldMode = true;
  131.     readCommitted = false;
  132.     break;
  133.   case NdbScanOperation::LM_Exclusive:
  134.     lockExcl = true;
  135.     lockHoldMode = true;
  136.     readCommitted = false;
  137.     break;
  138.   case NdbScanOperation::LM_CommittedRead:
  139.     lockExcl = false;
  140.     lockHoldMode = false;
  141.     readCommitted = true;
  142.     break;
  143.   default:
  144.     setErrorCode(4003);
  145.     return 0;
  146.   }
  147.   m_keyInfo = lockExcl ? 1 : 0;
  148.   bool range = false;
  149.   if (m_accessTable->m_indexType == NdbDictionary::Index::OrderedIndex ||
  150.       m_accessTable->m_indexType == NdbDictionary::Index::UniqueOrderedIndex){
  151.     if (m_currentTable == m_accessTable){
  152.       // Old way of scanning indexes, should not be allowed
  153.       m_currentTable = theNdb->theDictionary->
  154. getTable(m_currentTable->m_primaryTable.c_str());
  155.       assert(m_currentTable != NULL);
  156.     }
  157.     assert (m_currentTable != m_accessTable);
  158.     // Modify operation state
  159.     theStatus = GetValue;
  160.     theOperationType  = OpenRangeScanRequest;
  161.     range = true;
  162.   }
  163.   
  164.   theParallelism = parallel;
  165.   if(fix_receivers(parallel) == -1){
  166.     setErrorCodeAbort(4000);
  167.     return 0;
  168.   }
  169.   
  170.   theSCAN_TABREQ = (!theSCAN_TABREQ ? theNdb->getSignal() : theSCAN_TABREQ);
  171.   if (theSCAN_TABREQ == NULL) {
  172.     setErrorCodeAbort(4000);
  173.     return 0;
  174.   }//if
  175.   
  176.   ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  177.   req->apiConnectPtr = theNdbCon->theTCConPtr;
  178.   req->tableId = m_accessTable->m_tableId;
  179.   req->tableSchemaVersion = m_accessTable->m_version;
  180.   req->storedProcId = 0xFFFF;
  181.   req->buddyConPtr = theNdbCon->theBuddyConPtr;
  182.   
  183.   Uint32 reqInfo = 0;
  184.   ScanTabReq::setParallelism(reqInfo, parallel);
  185.   ScanTabReq::setScanBatch(reqInfo, 0);
  186.   ScanTabReq::setLockMode(reqInfo, lockExcl);
  187.   ScanTabReq::setHoldLockFlag(reqInfo, lockHoldMode);
  188.   ScanTabReq::setReadCommittedFlag(reqInfo, readCommitted);
  189.   ScanTabReq::setRangeScanFlag(reqInfo, range);
  190.   req->requestInfo = reqInfo;
  191.   Uint64 transId = theNdbCon->getTransactionId();
  192.   req->transId1 = (Uint32) transId;
  193.   req->transId2 = (Uint32) (transId >> 32);
  194.   NdbApiSignal* tSignal = 
  195.     theFirstKEYINFO;
  196.   theFirstKEYINFO = (tSignal ? tSignal : tSignal = theNdb->getSignal());
  197.   theLastKEYINFO = tSignal;
  198.   
  199.   tSignal->setSignal(GSN_KEYINFO);
  200.   theKEYINFOptr = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
  201.   theTotalNrOfKeyWordInSignal= 0;
  202.   getFirstATTRINFOScan();
  203.   return getResultSet();
  204. }
  205. int
  206. NdbScanOperation::fix_receivers(Uint32 parallel){
  207.   assert(parallel > 0);
  208.   if(parallel > m_allocated_receivers){
  209.     const Uint32 sz = parallel * (4*sizeof(char*)+sizeof(Uint32));
  210.     Uint64 * tmp = new Uint64[(sz+7)/8];
  211.     // Save old receivers
  212.     memcpy(tmp, m_receivers, m_allocated_receivers*sizeof(char*));
  213.     delete[] m_array;
  214.     m_array = (Uint32*)tmp;
  215.     
  216.     m_receivers = (NdbReceiver**)tmp;
  217.     m_api_receivers = m_receivers + parallel;
  218.     m_conf_receivers = m_api_receivers + parallel;
  219.     m_sent_receivers = m_conf_receivers + parallel;
  220.     m_prepared_receivers = (Uint32*)(m_sent_receivers + parallel);
  221.     // Only get/init "new" receivers
  222.     NdbReceiver* tScanRec;
  223.     for (Uint32 i = m_allocated_receivers; i < parallel; i ++) {
  224.       tScanRec = theNdb->getNdbScanRec();
  225.       if (tScanRec == NULL) {
  226. setErrorCodeAbort(4000);
  227. return -1;
  228.       }//if
  229.       m_receivers[i] = tScanRec;
  230.       tScanRec->init(NdbReceiver::NDB_SCANRECEIVER, this);
  231.     }
  232.     m_allocated_receivers = parallel;
  233.   }
  234.   
  235.   reset_receivers(parallel, 0);
  236.   return 0;
  237. }
  238. /**
  239.  * Move receiver from send array to conf:ed array
  240.  */
  241. void
  242. NdbScanOperation::receiver_delivered(NdbReceiver* tRec){
  243.   if(theError.code == 0){
  244.     if(DEBUG_NEXT_RESULT)
  245.       ndbout_c("receiver_delivered");
  246.     
  247.     Uint32 idx = tRec->m_list_index;
  248.     Uint32 last = m_sent_receivers_count - 1;
  249.     if(idx != last){
  250.       NdbReceiver * move = m_sent_receivers[last];
  251.       m_sent_receivers[idx] = move;
  252.       move->m_list_index = idx;
  253.     }
  254.     m_sent_receivers_count = last;
  255.     
  256.     last = m_conf_receivers_count;
  257.     m_conf_receivers[last] = tRec;
  258.     m_conf_receivers_count = last + 1;
  259.     tRec->m_list_index = last;
  260.     tRec->m_current_row = 0;
  261.   }
  262. }
  263. /**
  264.  * Remove receiver as it's completed
  265.  */
  266. void
  267. NdbScanOperation::receiver_completed(NdbReceiver* tRec){
  268.   if(theError.code == 0){
  269.     if(DEBUG_NEXT_RESULT)
  270.       ndbout_c("receiver_completed");
  271.     
  272.     Uint32 idx = tRec->m_list_index;
  273.     Uint32 last = m_sent_receivers_count - 1;
  274.     if(idx != last){
  275.       NdbReceiver * move = m_sent_receivers[last];
  276.       m_sent_receivers[idx] = move;
  277.       move->m_list_index = idx;
  278.     }
  279.     m_sent_receivers_count = last;
  280.   }
  281. }
  282. /*****************************************************************************
  283.  * int getFirstATTRINFOScan( U_int32 aData )
  284.  *
  285.  * Return Value:  Return 0:   Successful
  286.  *         Return -1:  All other cases
  287.  * Parameters:    None:     Only allocate the first signal.
  288.  * Remark:        When a scan is defined we need to use this method instead 
  289.  *                of insertATTRINFO for the first signal. 
  290.  *                This is because we need not to mess up the code in 
  291.  *                insertATTRINFO with if statements since we are not 
  292.  *                interested in the TCKEYREQ signal.
  293.  *****************************************************************************/
  294. int
  295. NdbScanOperation::getFirstATTRINFOScan()
  296. {
  297.   NdbApiSignal* tSignal;
  298.   tSignal = theNdb->getSignal();
  299.   if (tSignal == NULL){
  300.     setErrorCodeAbort(4000);      
  301.     return -1;    
  302.   }
  303.   tSignal->setSignal(m_attrInfoGSN);
  304.   theAI_LenInCurrAI = 8;
  305.   theATTRINFOptr = &tSignal->getDataPtrSend()[8];
  306.   theFirstATTRINFO = tSignal;
  307.   theCurrentATTRINFO = tSignal;
  308.   theCurrentATTRINFO->next(NULL);
  309.   return 0;
  310. }
  311. /**
  312.  * Constats for theTupleKeyDefined[][0]
  313.  */
  314. #define SETBOUND_EQ 1
  315. #define FAKE_PTR 2
  316. #define API_PTR 3
  317. /*
  318.  * After setBound() are done, move the accumulated ATTRINFO signals to
  319.  * a separate list.  Then continue with normal scan.
  320.  */
  321. #if 0
  322. int
  323. NdbIndexScanOperation::saveBoundATTRINFO()
  324. {
  325.   theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
  326.   theBoundATTRINFO = theFirstATTRINFO;
  327.   theTotalBoundAI_Len = theTotalCurrAI_Len;
  328.   theTotalCurrAI_Len = 5;
  329.   theBoundATTRINFO->setData(theTotalBoundAI_Len, 4);
  330.   theBoundATTRINFO->setData(0, 5);
  331.   theBoundATTRINFO->setData(0, 6);
  332.   theBoundATTRINFO->setData(0, 7);
  333.   theBoundATTRINFO->setData(0, 8);
  334.   theStatus = GetValue;
  335.   int res = getFirstATTRINFOScan();
  336.   /**
  337.    * Define each key with getValue (if ordered)
  338.    *   unless the one's with EqBound
  339.    */
  340.   if(!res && m_ordered){
  341.     /**
  342.      * If setBound EQ
  343.      */
  344.     Uint32 i = 0;
  345.     while(theTupleKeyDefined[i][0] == SETBOUND_EQ)
  346.       i++;
  347.     
  348.     
  349.     Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
  350.     m_sort_columns = cnt - i;
  351.     for(; i<cnt; i++){
  352.       const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
  353.       const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
  354.       NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
  355.       UintPtr newVal = UintPtr(tmp);
  356.       theTupleKeyDefined[i][0] = FAKE_PTR;
  357.       theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
  358. #if (SIZEOF_CHARP == 8)
  359.       theTupleKeyDefined[i][2] = (newVal >> 32);
  360. #endif
  361.     }
  362.   }
  363.   return res;
  364. }
  365. #endif
  366. #define WAITFOR_SCAN_TIMEOUT 120000
  367. int
  368. NdbScanOperation::executeCursor(int nodeId){
  369.   NdbConnection * tCon = theNdbCon;
  370.   TransporterFacade* tp = TransporterFacade::instance();
  371.   Guard guard(tp->theMutexPtr);
  372.   Uint32 magic = tCon->theMagicNumber;
  373.   Uint32 seq = tCon->theNodeSequence;
  374.   if (tp->get_node_alive(nodeId) &&
  375.       (tp->getNodeSequence(nodeId) == seq)) {
  376.     /**
  377.      * Only call prepareSendScan first time (incase of restarts)
  378.      *   - check with theMagicNumber
  379.      */
  380.     tCon->theMagicNumber = 0x37412619;
  381.     if(magic != 0x37412619 && 
  382.        prepareSendScan(tCon->theTCConPtr, tCon->theTransactionId) == -1)
  383.       return -1;
  384.     
  385.     
  386.     if (doSendScan(nodeId) == -1)
  387.       return -1;
  388.     return 0;
  389.   } else {
  390.     if (!(tp->get_node_stopping(nodeId) &&
  391.   (tp->getNodeSequence(nodeId) == seq))){
  392.       TRACE_DEBUG("The node is hard dead when attempting to start a scan");
  393.       setErrorCode(4029);
  394.       tCon->theReleaseOnClose = true;
  395.     } else {
  396.       TRACE_DEBUG("The node is stopping when attempting to start a scan");
  397.       setErrorCode(4030);
  398.     }//if
  399.     tCon->theCommitStatus = NdbConnection::Aborted;
  400.   }//if
  401.   return -1;
  402. }
  403. int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
  404. {
  405.   if(m_ordered)
  406.     return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
  407.        forceSend);
  408.   
  409.   /**
  410.    * Check current receiver
  411.    */
  412.   int retVal = 2;
  413.   Uint32 idx = m_current_api_receiver;
  414.   Uint32 last = m_api_receivers_count;
  415.   if(DEBUG_NEXT_RESULT)
  416.     ndbout_c("nextResult(%d) idx=%d last=%d", fetchAllowed, idx, last);
  417.   
  418.   /**
  419.    * Check next buckets
  420.    */
  421.   for(; idx < last; idx++){
  422.     NdbReceiver* tRec = m_api_receivers[idx];
  423.     if(tRec->nextResult()){
  424.       tRec->copyout(theReceiver);      
  425.       retVal = 0;
  426.       break;
  427.     }
  428.   }
  429.     
  430.   /**
  431.    * We have advanced atleast one bucket
  432.    */
  433.   if(!fetchAllowed || !retVal){
  434.     m_current_api_receiver = idx;
  435.     if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
  436.     return retVal;
  437.   }
  438.   
  439.   Uint32 nodeId = theNdbCon->theDBnode;
  440.   TransporterFacade* tp = TransporterFacade::instance();
  441.   Guard guard(tp->theMutexPtr);
  442.   if(theError.code)
  443.     return -1;
  444.   Uint32 seq = theNdbCon->theNodeSequence;
  445.   if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
  446.   forceSend) == 0){
  447.       
  448.     idx = m_current_api_receiver;
  449.     last = m_api_receivers_count;
  450.       
  451.     do {
  452.       if(theError.code){
  453. setErrorCode(theError.code);
  454. if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
  455. return -1;
  456.       }
  457.       
  458.       Uint32 cnt = m_conf_receivers_count;
  459.       Uint32 sent = m_sent_receivers_count;
  460.       if(DEBUG_NEXT_RESULT)
  461. ndbout_c("idx=%d last=%d cnt=%d sent=%d", idx, last, cnt, sent);
  462.       if(cnt > 0){
  463. /**
  464.  * Just move completed receivers
  465.  */
  466. memcpy(m_api_receivers+last, m_conf_receivers, cnt * sizeof(char*));
  467. last += cnt;
  468. m_conf_receivers_count = 0;
  469.       } else if(retVal == 2 && sent > 0){
  470. /**
  471.  * No completed...
  472.  */
  473. theNdb->theImpl->theWaiter.m_node = nodeId;
  474. theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
  475. int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
  476. if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
  477.   continue;
  478. } else {
  479.   idx = last;
  480.   retVal = -2; //return_code;
  481. }
  482.       } else if(retVal == 2){
  483. /**
  484.  * No completed & no sent -> EndOfData
  485.  */
  486. theError.code = -1; // make sure user gets error if he tries again
  487. if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
  488. return 1;
  489.       }
  490.       if(retVal == 0)
  491. break;
  492.       for(; idx < last; idx++){
  493. NdbReceiver* tRec = m_api_receivers[idx];
  494. if(tRec->nextResult()){
  495.   tRec->copyout(theReceiver);      
  496.   retVal = 0;
  497.   break;
  498. }
  499.       }
  500.     } while(retVal == 2);
  501.   } else {
  502.     retVal = -3;
  503.   }
  504.     
  505.   m_api_receivers_count = last;
  506.   m_current_api_receiver = idx;
  507.     
  508.   switch(retVal){
  509.   case 0:
  510.   case 1:
  511.   case 2:
  512.     if(DEBUG_NEXT_RESULT) ndbout_c("return %d", retVal);
  513.     return retVal;
  514.   case -1:
  515.     setErrorCode(4008); // Timeout
  516.     break;
  517.   case -2:
  518.     setErrorCode(4028); // Node fail
  519.     break;
  520.   case -3: // send_next_scan -> return fail (set error-code self)
  521.     if(theError.code == 0)
  522.       setErrorCode(4028); // seq changed = Node fail
  523.     break;
  524.   }
  525.     
  526.   theNdbCon->theTransactionIsStarted = false;
  527.   theNdbCon->theReleaseOnClose = true;
  528.   if(DEBUG_NEXT_RESULT) ndbout_c("return -1", retVal);
  529.   return -1;
  530. }
  531. int
  532. NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
  533.  bool forceSend){  
  534.   if(cnt > 0){
  535.     NdbApiSignal tSignal(theNdb->theMyRef);
  536.     tSignal.setSignal(GSN_SCAN_NEXTREQ);
  537.     
  538.     Uint32* theData = tSignal.getDataPtrSend();
  539.     theData[0] = theNdbCon->theTCConPtr;
  540.     theData[1] = stopScanFlag == true ? 1 : 0;
  541.     Uint64 transId = theNdbCon->theTransactionId;
  542.     theData[2] = transId;
  543.     theData[3] = (Uint32) (transId >> 32);
  544.     
  545.     /**
  546.      * Prepare ops
  547.      */
  548.     Uint32 last = m_sent_receivers_count;
  549.     Uint32 * prep_array = (cnt > 21 ? m_prepared_receivers : theData + 4);
  550.     Uint32 sent = 0;
  551.     for(Uint32 i = 0; i<cnt; i++){
  552.       NdbReceiver * tRec = m_api_receivers[i];
  553.       if((prep_array[sent] = tRec->m_tcPtrI) != RNIL)
  554.       {
  555. m_sent_receivers[last+sent] = tRec;
  556. tRec->m_list_index = last+sent;
  557. tRec->prepareSend();
  558. sent++;
  559.       }
  560.     }
  561.     memmove(m_api_receivers, m_api_receivers+cnt, 
  562.     (theParallelism-cnt) * sizeof(char*));
  563.     
  564.     int ret = 0;
  565.     if(sent)
  566.     {
  567.       Uint32 nodeId = theNdbCon->theDBnode;
  568.       TransporterFacade * tp = TransporterFacade::instance();
  569.       if(cnt > 21){
  570. tSignal.setLength(4);
  571. LinearSectionPtr ptr[3];
  572. ptr[0].p = prep_array;
  573. ptr[0].sz = sent;
  574. ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
  575.       } else {
  576. tSignal.setLength(4+sent);
  577. ret = tp->sendSignal(&tSignal, nodeId);
  578.       }
  579.     }
  580.     
  581.     if (!ret) checkForceSend(forceSend);
  582.     m_sent_receivers_count = last + sent;
  583.     m_api_receivers_count -= cnt;
  584.     m_current_api_receiver = 0;
  585.     
  586.     return ret;
  587.   }
  588.   return 0;
  589. }
  590. void NdbScanOperation::checkForceSend(bool forceSend)
  591. {
  592.   if (forceSend) {
  593.     TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
  594.   } else {
  595.     TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
  596.   }//if
  597. }
  598. int 
  599. NdbScanOperation::prepareSend(Uint32  TC_ConnectPtr, Uint64  TransactionId)
  600. {
  601.   printf("NdbScanOperation::prepareSendn");
  602.   abort();
  603.   return 0;
  604. }
  605. int 
  606. NdbScanOperation::doSend(int ProcessorId)
  607. {
  608.   printf("NdbScanOperation::doSendn");
  609.   return 0;
  610. }
  611. void NdbScanOperation::closeScan(bool forceSend, bool releaseOp)
  612. {
  613.   if(m_transConnection){
  614.     if(DEBUG_NEXT_RESULT)
  615.       ndbout_c("closeScan() theError.code = %d "
  616.        "m_api_receivers_count = %d "
  617.        "m_conf_receivers_count = %d "
  618.        "m_sent_receivers_count = %d",
  619.        theError.code, 
  620.        m_api_receivers_count,
  621.        m_conf_receivers_count,
  622.        m_sent_receivers_count);
  623.     
  624.     TransporterFacade* tp = TransporterFacade::instance();
  625.     Guard guard(tp->theMutexPtr);
  626.     close_impl(tp, forceSend);
  627.     
  628.   }
  629.   NdbConnection* tCon = theNdbCon;
  630.   NdbConnection* tTransCon = m_transConnection;
  631.   theNdbCon = NULL;
  632.   m_transConnection = NULL;
  633.   if (releaseOp && tTransCon) {
  634.     NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
  635.     tTransCon->releaseExecutedScanOperation(tOp);
  636.   }
  637.   
  638.   tCon->theScanningOp = 0;
  639.   theNdb->closeTransaction(tCon);
  640. }
  641. void
  642. NdbScanOperation::execCLOSE_SCAN_REP(){
  643.   m_conf_receivers_count = 0;
  644.   m_sent_receivers_count = 0;
  645. }
  646. void NdbScanOperation::release()
  647. {
  648.   if(theNdbCon != 0 || m_transConnection != 0){
  649.     closeScan();
  650.   }
  651.   for(Uint32 i = 0; i<m_allocated_receivers; i++){
  652.     m_receivers[i]->release();
  653.   }
  654.   if(theSCAN_TABREQ)
  655.   {
  656.     theNdb->releaseSignal(theSCAN_TABREQ);
  657.     theSCAN_TABREQ = 0;
  658.   }
  659.   NdbOperation::release();
  660. }
  661. /***************************************************************************
  662. int prepareSendScan(Uint32 aTC_ConnectPtr,
  663.                     Uint64 aTransactionId)
  664. Return Value:   Return 0 : preparation of send was succesful.
  665.                 Return -1: In all other case.   
  666. Parameters:     aTC_ConnectPtr: the Connect pointer to TC.
  667. aTransactionId: the Transaction identity of the transaction.
  668. Remark:         Puts the the final data into ATTRINFO signal(s)  after this 
  669.                 we know the how many signal to send and their sizes
  670. ***************************************************************************/
  671. int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
  672.       Uint64 aTransactionId){
  673.   if (theInterpretIndicator != 1 ||
  674.       (theOperationType != OpenScanRequest &&
  675.        theOperationType != OpenRangeScanRequest)) {
  676.     setErrorCodeAbort(4005);
  677.     return -1;
  678.   }
  679.   theErrorLine = 0;
  680.   // In preapareSendInterpreted we set the sizes (word 4-8) in the
  681.   // first ATTRINFO signal.
  682.   if (prepareSendInterpreted() == -1)
  683.     return -1;
  684.   
  685.   if(m_ordered){
  686.     ((NdbIndexScanOperation*)this)->fix_get_values();
  687.   }
  688.   
  689.   theCurrentATTRINFO->setLength(theAI_LenInCurrAI);
  690.   /**
  691.    * Prepare all receivers
  692.    */
  693.   theReceiver.prepareSend();
  694.   bool keyInfo = m_keyInfo;
  695.   Uint32 key_size = keyInfo ? m_currentTable->m_keyLenInWords : 0;
  696.   /**
  697.    * The number of records sent by each LQH is calculated and the kernel
  698.    * is informed of this number by updating the SCAN_TABREQ signal
  699.    */
  700.   Uint32 batch_size, batch_byte_size, first_batch_size;
  701.   theReceiver.calculate_batch_size(key_size,
  702.                                    theParallelism,
  703.                                    batch_size,
  704.                                    batch_byte_size,
  705.                                    first_batch_size);
  706.   ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
  707.   ScanTabReq::setScanBatch(req->requestInfo, batch_size);
  708.   req->batch_byte_size= batch_byte_size;
  709.   req->first_batch_size= first_batch_size;
  710.   /**
  711.    * Set keyinfo flag
  712.    *  (Always keyinfo when using blobs)
  713.    */
  714.   Uint32 reqInfo = req->requestInfo;
  715.   ScanTabReq::setKeyinfoFlag(reqInfo, keyInfo);
  716.   req->requestInfo = reqInfo;
  717.   
  718.   for(Uint32 i = 0; i<theParallelism; i++){
  719.     m_receivers[i]->do_get_value(&theReceiver, batch_size, key_size);
  720.   }
  721.   return 0;
  722. }
  723. /*****************************************************************************
  724. int doSend()
  725. Return Value:   Return >0 : send was succesful, returns number of signals sent
  726.                 Return -1: In all other case.   
  727. Parameters:     aProcessorId: Receiving processor node
  728. Remark:         Sends the ATTRINFO signal(s)
  729. *****************************************************************************/
  730. int
  731. NdbScanOperation::doSendScan(int aProcessorId)
  732. {
  733.   Uint32 tSignalCount = 0;
  734.   NdbApiSignal* tSignal;
  735.  
  736.   if (theInterpretIndicator != 1 ||
  737.       (theOperationType != OpenScanRequest &&
  738.        theOperationType != OpenRangeScanRequest)) {
  739.       setErrorCodeAbort(4005);
  740.       return -1;
  741.   }
  742.   
  743.   assert(theSCAN_TABREQ != NULL);
  744.   tSignal = theSCAN_TABREQ;
  745.   if (tSignal->setSignal(GSN_SCAN_TABREQ) == -1) {
  746.     setErrorCode(4001);
  747.     return -1;
  748.   }
  749.   
  750.   Uint32 tupKeyLen = theTupKeyLen;
  751.   Uint32 len = theTotalNrOfKeyWordInSignal;
  752.   Uint32 aTC_ConnectPtr = theNdbCon->theTCConPtr;
  753.   Uint64 transId = theNdbCon->theTransactionId;
  754.   
  755.   // Update the "attribute info length in words" in SCAN_TABREQ before 
  756.   // sending it. This could not be done in openScan because 
  757.   // we created the ATTRINFO signals after the SCAN_TABREQ signal.
  758.   ScanTabReq * const req = CAST_PTR(ScanTabReq, tSignal->getDataPtrSend());
  759.   req->attrLenKeyLen = (tupKeyLen << 16) | theTotalCurrAI_Len;
  760.   
  761.   TransporterFacade *tp = TransporterFacade::instance();
  762.   LinearSectionPtr ptr[3];
  763.   ptr[0].p = m_prepared_receivers;
  764.   ptr[0].sz = theParallelism;
  765.   if (tp->sendSignal(tSignal, aProcessorId, ptr, 1) == -1) {
  766.     setErrorCode(4002);
  767.     return -1;
  768.   } 
  769.   if (tupKeyLen > 0){
  770.     // must have at least one signal since it contains attrLen for bounds
  771.     assert(theLastKEYINFO != NULL);
  772.     tSignal = theLastKEYINFO;
  773.     tSignal->setLength(KeyInfo::HeaderLength + theTotalNrOfKeyWordInSignal);
  774.     
  775.     assert(theFirstKEYINFO != NULL);
  776.     tSignal = theFirstKEYINFO;
  777.     
  778.     NdbApiSignal* last;
  779.     do {
  780.       KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
  781.       keyInfo->connectPtr = aTC_ConnectPtr;
  782.       keyInfo->transId[0] = Uint32(transId);
  783.       keyInfo->transId[1] = Uint32(transId >> 32);
  784.       
  785.       if (tp->sendSignal(tSignal,aProcessorId) == -1){
  786. setErrorCode(4002);
  787. return -1;
  788.       }
  789.       
  790.       tSignalCount++;
  791.       last = tSignal;
  792.       tSignal = tSignal->next();
  793.     } while(last != theLastKEYINFO);
  794.   }
  795.   
  796.   tSignal = theFirstATTRINFO;
  797.   while (tSignal != NULL) {
  798.     AttrInfo * attrInfo = CAST_PTR(AttrInfo, tSignal->getDataPtrSend());
  799.     attrInfo->connectPtr = aTC_ConnectPtr;
  800.     attrInfo->transId[0] = Uint32(transId);
  801.     attrInfo->transId[1] = Uint32(transId >> 32);
  802.     
  803.     if (tp->sendSignal(tSignal,aProcessorId) == -1){
  804.       setErrorCode(4002);
  805.       return -1;
  806.     }
  807.     tSignalCount++;
  808.     tSignal = tSignal->next();
  809.   }    
  810.   theStatus = WaitResponse;  
  811.   m_sent_receivers_count = theParallelism;
  812.   if(m_ordered)
  813.   {
  814.     m_current_api_receiver = theParallelism;
  815.     m_api_receivers_count = theParallelism;
  816.   }
  817.   
  818.   return tSignalCount;
  819. }//NdbOperation::doSendScan()
  820. /*****************************************************************************
  821.  * NdbOperation* takeOverScanOp(NdbConnection* updateTrans);
  822.  *
  823.  * Parameters:     The update transactions NdbConnection pointer.
  824.  * Return Value:   A reference to the transferred operation object 
  825.  *                   or NULL if no success.
  826.  * Remark:         Take over the scanning transactions NdbOperation 
  827.  *                 object for a tuple to an update transaction, 
  828.  *                 which is the last operation read in nextScanResult()
  829.  *    (theNdbCon->thePreviousScanRec)
  830.  *
  831.  *     FUTURE IMPLEMENTATION:   (This note was moved from header file.)
  832.  *     In the future, it will even be possible to transfer 
  833.  *     to a NdbConnection on another Ndb-object.  
  834.  *     In this case the receiving NdbConnection-object must call 
  835.  *     a method receiveOpFromScan to actually receive the information.  
  836.  *     This means that the updating transactions can be placed
  837.  *     in separate threads and thus increasing the parallelism during
  838.  *     the scan process. 
  839.  ****************************************************************************/
  840. int
  841. NdbScanOperation::getKeyFromKEYINFO20(Uint32* data, unsigned size)
  842. {
  843.   Uint32 idx = m_current_api_receiver;
  844.   Uint32 last = m_api_receivers_count;
  845.   Uint32 row;
  846.   NdbReceiver * tRec;
  847.   NdbRecAttr * tRecAttr;
  848.   if(idx < last && (tRec = m_api_receivers[idx]) 
  849.      && ((row = tRec->m_current_row) <= tRec->m_defined_rows)
  850.      && (tRecAttr = tRec->m_rows[row-1])){
  851.     const Uint32 * src = (Uint32*)tRecAttr->aRef();
  852.     memcpy(data, src, 4*size);
  853.     return 0;
  854.   }
  855.   return -1;
  856. }
  857. NdbOperation*
  858. NdbScanOperation::takeOverScanOp(OperationType opType, NdbConnection* pTrans){
  859.   
  860.   Uint32 idx = m_current_api_receiver;
  861.   Uint32 last = m_api_receivers_count;
  862.   Uint32 row;
  863.   NdbReceiver * tRec;
  864.   NdbRecAttr * tRecAttr;
  865.   if(idx < last && (tRec = m_api_receivers[idx]) 
  866.      && ((row = tRec->m_current_row) <= tRec->m_defined_rows)
  867.      && (tRecAttr = tRec->m_rows[row-1])){
  868.     
  869.     NdbOperation * newOp = pTrans->getNdbOperation(m_currentTable);
  870.     if (newOp == NULL){
  871.       return NULL;
  872.     }
  873.     pTrans->theSimpleState = 0;
  874.     
  875.     const Uint32 len = (tRecAttr->attrSize() * tRecAttr->arraySize() + 3)/4-1;
  876.     newOp->theTupKeyLen = len;
  877.     newOp->theOperationType = opType;
  878.     if (opType == DeleteRequest) {
  879.       newOp->theStatus = GetValue;  
  880.     } else {
  881.       newOp->theStatus = SetValue;  
  882.     }
  883.     
  884.     const Uint32 * src = (Uint32*)tRecAttr->aRef();
  885.     const Uint32 tScanInfo = src[len] & 0x3FFFF;
  886.     const Uint32 tTakeOverNode = src[len] >> 20;
  887.     {
  888.       UintR scanInfo = 0;
  889.       TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
  890.       TcKeyReq::setTakeOverScanNode(scanInfo, tTakeOverNode);
  891.       TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
  892.       newOp->theScanInfo = scanInfo;
  893.     }
  894.     // Copy the first 8 words of key info from KEYINF20 into TCKEYREQ
  895.     TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq,newOp->theTCREQ->getDataPtrSend());
  896.     Uint32 i = 0;
  897.     for (i = 0; i < TcKeyReq::MaxKeyInfo && i < len; i++) {
  898.       tcKeyReq->keyInfo[i] = * src++;
  899.     }
  900.     
  901.     if(i < len){
  902.       NdbApiSignal* tSignal = theNdb->getSignal();
  903.       newOp->theFirstKEYINFO = tSignal;      
  904.       
  905.       Uint32 left = len - i;
  906.       while(tSignal && left > KeyInfo::DataLength){
  907. tSignal->setSignal(GSN_KEYINFO);
  908. KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
  909. memcpy(keyInfo->keyData, src, 4 * KeyInfo::DataLength);
  910. src += KeyInfo::DataLength;
  911. left -= KeyInfo::DataLength;
  912. tSignal->next(theNdb->getSignal());
  913. tSignal = tSignal->next();
  914.       }
  915.       if(tSignal && left > 0){
  916. tSignal->setSignal(GSN_KEYINFO);
  917. KeyInfo * keyInfo = CAST_PTR(KeyInfo, tSignal->getDataPtrSend());
  918. memcpy(keyInfo->keyData, src, 4 * left);
  919.       }      
  920.     }
  921.     // create blob handles automatically
  922.     if (opType == DeleteRequest && m_currentTable->m_noOfBlobs != 0) {
  923.       for (unsigned i = 0; i < m_currentTable->m_columns.size(); i++) {
  924. NdbColumnImpl* c = m_currentTable->m_columns[i];
  925. assert(c != 0);
  926. if (c->getBlobType()) {
  927.   if (newOp->getBlobHandle(pTrans, c) == NULL)
  928.     return NULL;
  929. }
  930.       }
  931.     }
  932.     
  933.     return newOp;
  934.   }
  935.   return 0;
  936. }
  937. NdbBlob*
  938. NdbScanOperation::getBlobHandle(const char* anAttrName)
  939. {
  940.   m_keyInfo = 1;
  941.   return NdbOperation::getBlobHandle(m_transConnection, 
  942.      m_currentTable->getColumn(anAttrName));
  943. }
  944. NdbBlob*
  945. NdbScanOperation::getBlobHandle(Uint32 anAttrId)
  946. {
  947.   m_keyInfo = 1;
  948.   return NdbOperation::getBlobHandle(m_transConnection, 
  949.      m_currentTable->getColumn(anAttrId));
  950. }
  951. NdbIndexScanOperation::NdbIndexScanOperation(Ndb* aNdb)
  952.   : NdbScanOperation(aNdb)
  953. {
  954. }
  955. NdbIndexScanOperation::~NdbIndexScanOperation(){
  956. }
  957. int
  958. NdbIndexScanOperation::setBound(const char* anAttrName, int type, 
  959. const void* aValue, Uint32 len)
  960. {
  961.   return setBound(m_accessTable->getColumn(anAttrName), type, aValue, len);
  962. }
  963. int
  964. NdbIndexScanOperation::setBound(Uint32 anAttrId, int type, 
  965. const void* aValue, Uint32 len)
  966. {
  967.   return setBound(m_accessTable->getColumn(anAttrId), type, aValue, len);
  968. }
  969. int
  970. NdbIndexScanOperation::equal_impl(const NdbColumnImpl* anAttrObject, 
  971.   const char* aValue, 
  972.   Uint32 len){
  973.   return setBound(anAttrObject, BoundEQ, aValue, len);
  974. }
  975. NdbRecAttr*
  976. NdbIndexScanOperation::getValue_impl(const NdbColumnImpl* attrInfo, 
  977.      char* aValue){
  978.   if(!m_ordered){
  979.     return NdbScanOperation::getValue_impl(attrInfo, aValue);
  980.   }
  981.   
  982.   int id = attrInfo->m_attrId;                // In "real" table
  983.   assert(m_accessTable->m_index);
  984.   int sz = (int)m_accessTable->m_index->m_key_ids.size();
  985.   if(id >= sz || (id = m_accessTable->m_index->m_key_ids[id]) == -1){
  986.     return NdbScanOperation::getValue_impl(attrInfo, aValue);
  987.   }
  988.   
  989.   assert(id < NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
  990.   Uint32 marker = theTupleKeyDefined[id][0];
  991.   
  992.   if(marker == SETBOUND_EQ){
  993.     return NdbScanOperation::getValue_impl(attrInfo, aValue);
  994.   } else if(marker == API_PTR){
  995.     return NdbScanOperation::getValue_impl(attrInfo, aValue);
  996.   }
  997.   
  998.   assert(marker == FAKE_PTR);
  999.   
  1000.   UintPtr oldVal;
  1001.   oldVal = theTupleKeyDefined[id][1];
  1002. #if (SIZEOF_CHARP == 8)
  1003.   oldVal = oldVal | (((UintPtr)theTupleKeyDefined[id][2]) << 32);
  1004. #endif
  1005.   theTupleKeyDefined[id][0] = API_PTR;
  1006.   NdbRecAttr* tmp = (NdbRecAttr*)oldVal;
  1007.   tmp->setup(attrInfo, aValue);
  1008.   return tmp;
  1009. }
  1010. #include <AttributeHeader.hpp>
  1011. /*
  1012.  * Define bound on index column in range scan.
  1013.  */
  1014. int
  1015. NdbIndexScanOperation::setBound(const NdbColumnImpl* tAttrInfo, 
  1016. int type, const void* aValue, Uint32 len)
  1017. {
  1018.   if (theOperationType == OpenRangeScanRequest &&
  1019.       (0 <= type && type <= 4) &&
  1020.       len <= 8000) {
  1021.     // insert bound type
  1022.     Uint32 currLen = theTotalNrOfKeyWordInSignal;
  1023.     Uint32 remaining = KeyInfo::DataLength - currLen;
  1024.     Uint32 sizeInBytes = tAttrInfo->m_attrSize * tAttrInfo->m_arraySize;
  1025.     // normalize char bound
  1026.     CHARSET_INFO* cs = tAttrInfo->m_cs;
  1027.     Uint32 xfrmData[2000];
  1028.     if (cs != NULL && aValue != NULL) {
  1029.       // current limitation: strxfrm does not increase length
  1030.       assert(cs->strxfrm_multiply <= 1);
  1031.       unsigned n =
  1032.       (*cs->coll->strnxfrm)(cs,
  1033.                             (uchar*)xfrmData, sizeof(xfrmData),
  1034.                             (const uchar*)aValue, sizeInBytes);
  1035.       while (n < sizeInBytes)
  1036.         ((uchar*)xfrmData)[n++] = 0x20;
  1037.       aValue = (char*)xfrmData;
  1038.     }
  1039.     if (len != sizeInBytes && (len != 0)) {
  1040.       setErrorCodeAbort(4209);
  1041.       return -1;
  1042.     }
  1043.     // insert attribute header
  1044.     len = aValue != NULL ? sizeInBytes : 0;
  1045.     Uint32 tIndexAttrId = tAttrInfo->m_attrId;
  1046.     Uint32 sizeInWords = (len + 3) / 4;
  1047.     AttributeHeader ah(tIndexAttrId, sizeInWords);
  1048.     const Uint32 ahValue = ah.m_value;
  1049.     const bool aligned = (UintPtr(aValue) & 3) == 0;
  1050.     const bool nobytes = (len & 0x3) == 0;
  1051.     const Uint32 totalLen = 2 + sizeInWords;
  1052.     Uint32 tupKeyLen = theTupKeyLen;
  1053.     if(remaining > totalLen &&  aligned && nobytes){
  1054.       Uint32 * dst = theKEYINFOptr + currLen;
  1055.       * dst ++ = type;
  1056.       * dst ++ = ahValue;
  1057.       memcpy(dst, aValue, 4 * sizeInWords);
  1058.       theTotalNrOfKeyWordInSignal = currLen + totalLen;
  1059.     } else {
  1060.       if(!aligned || !nobytes){
  1061. Uint32 tempData[2002];
  1062. tempData[0] = type;
  1063. tempData[1] = ahValue;
  1064.         memcpy(tempData+2, aValue, len);
  1065.         while ((len & 0x3) != 0)
  1066.           ((char*)&tempData[2])[len++] = 0;
  1067. insertBOUNDS(tempData, 2+sizeInWords);
  1068.       } else {
  1069. Uint32 buf[2] = { type, ahValue };
  1070. insertBOUNDS(buf, 2);
  1071. insertBOUNDS((Uint32*)aValue, sizeInWords);
  1072.       }
  1073.     }
  1074.     theTupKeyLen = tupKeyLen + totalLen;
  1075.     /**
  1076.      * Do sorted stuff
  1077.      */
  1078.     /**
  1079.      * The primary keys for an ordered index is defined in the beginning
  1080.      * so it's safe to use [tIndexAttrId] 
  1081.      * (instead of looping as is NdbOperation::equal_impl)
  1082.      */
  1083.     if(type == BoundEQ && !theTupleKeyDefined[tIndexAttrId][0]){
  1084.       theNoOfTupKeyDefined++;
  1085.       theTupleKeyDefined[tIndexAttrId][0] = SETBOUND_EQ;
  1086.     }
  1087.     
  1088.     return 0;
  1089.   } else {
  1090.     setErrorCodeAbort(4228);    // XXX wrong code
  1091.     return -1;
  1092.   }
  1093. }
  1094. int
  1095. NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
  1096.   Uint32 len;
  1097.   Uint32 remaining = KeyInfo::DataLength - theTotalNrOfKeyWordInSignal;
  1098.   Uint32 * dst = theKEYINFOptr + theTotalNrOfKeyWordInSignal;
  1099.   do {
  1100.     len = (sz < remaining ? sz : remaining);
  1101.     memcpy(dst, data, 4 * len);
  1102.     
  1103.     if(sz >= remaining){
  1104.       NdbApiSignal* tCurr = theLastKEYINFO;
  1105.       tCurr->setLength(KeyInfo::MaxSignalLength);
  1106.       NdbApiSignal* tSignal = tCurr->next();
  1107.       if(tSignal)
  1108. ;
  1109.       else if((tSignal = theNdb->getSignal()) != 0)
  1110.       {
  1111. tCurr->next(tSignal);
  1112. tSignal->setSignal(GSN_KEYINFO);
  1113.       } else {
  1114. goto error;
  1115.       }
  1116.       theLastKEYINFO = tSignal;
  1117.       theKEYINFOptr = dst = ((KeyInfo*)tSignal->getDataPtrSend())->keyData;
  1118.       remaining = KeyInfo::DataLength;
  1119.       sz -= len;
  1120.       data += len;
  1121.     } else {
  1122.       len = (KeyInfo::DataLength - remaining) + len;
  1123.       break;
  1124.     }
  1125.   } while(true);   
  1126.   theTotalNrOfKeyWordInSignal = len;
  1127.   return 0;
  1128. error:
  1129.   setErrorCodeAbort(4228);    // XXX wrong code
  1130.   return -1;
  1131. }
  1132. NdbResultSet*
  1133. NdbIndexScanOperation::readTuples(LockMode lm,
  1134.   Uint32 batch,
  1135.   Uint32 parallel,
  1136.   bool order_by){
  1137.   NdbResultSet * rs = NdbScanOperation::readTuples(lm, batch, 0);
  1138.   if(rs && order_by){
  1139.     m_ordered = 1;
  1140.     Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
  1141.     m_sort_columns = cnt; // -1 for NDB$NODE
  1142.     m_current_api_receiver = m_sent_receivers_count;
  1143.     m_api_receivers_count = m_sent_receivers_count;
  1144.     
  1145.     m_sort_columns = cnt;
  1146.     for(Uint32 i = 0; i<cnt; i++){
  1147.       const NdbColumnImpl* key = m_accessTable->m_index->m_columns[i];
  1148.       const NdbColumnImpl* col = m_currentTable->getColumn(key->m_keyInfoPos);
  1149.       NdbRecAttr* tmp = NdbScanOperation::getValue_impl(col, (char*)-1);
  1150.       UintPtr newVal = UintPtr(tmp);
  1151.       theTupleKeyDefined[i][0] = FAKE_PTR;
  1152.       theTupleKeyDefined[i][1] = (newVal & 0xFFFFFFFF);
  1153. #if (SIZEOF_CHARP == 8)
  1154.       theTupleKeyDefined[i][2] = (newVal >> 32);
  1155. #endif
  1156.     }
  1157.   }
  1158.   return rs;
  1159. }
  1160. void
  1161. NdbIndexScanOperation::fix_get_values(){
  1162.   /**
  1163.    * Loop through all getValues and set buffer pointer to "API" pointer
  1164.    */
  1165.   NdbRecAttr * curr = theReceiver.theFirstRecAttr;
  1166.   Uint32 cnt = m_accessTable->getNoOfColumns() - 1;
  1167.   assert(cnt <  NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY);
  1168.   
  1169.   const NdbIndexImpl * idx = m_accessTable->m_index;
  1170.   const NdbTableImpl * tab = m_currentTable;
  1171.   for(Uint32 i = 0; i<cnt; i++){
  1172.     Uint32 val = theTupleKeyDefined[i][0];
  1173.     switch(val){
  1174.     case FAKE_PTR:
  1175.       curr->setup(curr->m_column, 0);
  1176.     case API_PTR:
  1177.       curr = curr->next();
  1178.       break;
  1179.     case SETBOUND_EQ:
  1180.       break;
  1181. #ifdef VM_TRACE
  1182.     default:
  1183.       abort();
  1184. #endif
  1185.     }
  1186.   }
  1187. }
  1188. int
  1189. NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, 
  1190.        const NdbReceiver* t1, 
  1191.        const NdbReceiver* t2){
  1192.   NdbRecAttr * r1 = t1->m_rows[t1->m_current_row];
  1193.   NdbRecAttr * r2 = t2->m_rows[t2->m_current_row];
  1194.   r1 = (skip ? r1->next() : r1);
  1195.   r2 = (skip ? r2->next() : r2);
  1196.   
  1197.   while(cols > 0){
  1198.     Uint32 * d1 = (Uint32*)r1->aRef();
  1199.     Uint32 * d2 = (Uint32*)r2->aRef();
  1200.     unsigned r1_null = r1->isNULL();
  1201.     if((r1_null ^ (unsigned)r2->isNULL())){
  1202.       return (r1_null ? -1 : 1);
  1203.     }
  1204.     const NdbColumnImpl & col = NdbColumnImpl::getImpl(* r1->m_column);
  1205.     Uint32 size = (r1->theAttrSize * r1->theArraySize + 3) / 4;
  1206.     if(!r1_null){
  1207.       const NdbSqlUtil::Type& sqlType = NdbSqlUtil::getType(col.m_extType);
  1208.       int r = (*sqlType.m_cmp)(col.m_cs, d1, d2, size, size);
  1209.       if(r){
  1210. assert(r != NdbSqlUtil::CmpUnknown);
  1211. return r;
  1212.       }
  1213.     }
  1214.     cols--;
  1215.     r1 = r1->next();
  1216.     r2 = r2->next();
  1217.   }
  1218.   return 0;
  1219. }
  1220. int
  1221. NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
  1222.    bool forceSend){
  1223.   
  1224.   Uint32 u_idx = 0, u_last = 0;
  1225.   Uint32 s_idx   = m_current_api_receiver; // first sorted
  1226.   Uint32 s_last  = theParallelism;         // last sorted
  1227.   NdbReceiver** arr = m_api_receivers;
  1228.   NdbReceiver* tRec = arr[s_idx];
  1229.   
  1230.   if(DEBUG_NEXT_RESULT) ndbout_c("nextOrderedResult(%d) nextResult: %d",
  1231.  fetchAllowed, 
  1232.  (s_idx < s_last ? tRec->nextResult() : 0));
  1233.   
  1234.   if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
  1235.  u_idx, u_last,
  1236.  s_idx, s_last);
  1237.   
  1238.   bool fetchNeeded = (s_idx == s_last) || !tRec->nextResult();
  1239.   
  1240.   if(fetchNeeded){
  1241.     if(fetchAllowed){
  1242.       if(DEBUG_NEXT_RESULT) ndbout_c("performing fetch...");
  1243.       TransporterFacade* tp = TransporterFacade::instance();
  1244.       Guard guard(tp->theMutexPtr);
  1245.       if(theError.code)
  1246. return -1;
  1247.       Uint32 seq = theNdbCon->theNodeSequence;
  1248.       Uint32 nodeId = theNdbCon->theDBnode;
  1249.       if(seq == tp->getNodeSequence(nodeId) &&
  1250.  !send_next_scan_ordered(s_idx, forceSend)){
  1251. Uint32 tmp = m_sent_receivers_count;
  1252. s_idx = m_current_api_receiver; 
  1253. while(m_sent_receivers_count > 0 && !theError.code){
  1254.   theNdb->theImpl->theWaiter.m_node = nodeId;
  1255.   theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
  1256.   int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
  1257.   if (return_code == 0 && seq == tp->getNodeSequence(nodeId)) {
  1258.     continue;
  1259.   }
  1260.   if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
  1261.   setErrorCode(4028);
  1262.   return -1;
  1263. }
  1264. if(theError.code){
  1265.   setErrorCode(theError.code);
  1266.   if(DEBUG_NEXT_RESULT) ndbout_c("return -1");
  1267.   return -1;
  1268. }
  1269. u_idx = 0;
  1270. u_last = m_conf_receivers_count;
  1271. m_conf_receivers_count = 0;
  1272. memcpy(arr, m_conf_receivers, u_last * sizeof(char*));
  1273. if(DEBUG_NEXT_RESULT) ndbout_c("sent: %d recv: %d", tmp, u_last);
  1274.       } else {
  1275. setErrorCode(4028);
  1276. return -1;
  1277.       }
  1278.     } else {
  1279.       if(DEBUG_NEXT_RESULT) ndbout_c("return 2");
  1280.       return 2;
  1281.     }
  1282.   } else {
  1283.     u_idx = s_idx;
  1284.     u_last = s_idx + 1;
  1285.     s_idx++;
  1286.   }
  1287.   
  1288.   if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
  1289.  u_idx, u_last,
  1290.  s_idx, s_last);
  1291.   Uint32 cols = m_sort_columns;
  1292.   Uint32 skip = m_keyInfo;
  1293.   while(u_idx < u_last){
  1294.     u_last--;
  1295.     tRec = arr[u_last];
  1296.     
  1297.     // Do binary search instead to find place
  1298.     Uint32 place = s_idx;
  1299.     for(; place < s_last; place++){
  1300.       if(compare(skip, cols, tRec, arr[place]) <= 0){
  1301. break;
  1302.       }
  1303.     }
  1304.     
  1305.     if(place != s_idx){
  1306.       if(DEBUG_NEXT_RESULT) 
  1307. ndbout_c("memmove(%d, %d, %d)", s_idx-1, s_idx, (place - s_idx));
  1308.       memmove(arr+s_idx-1, arr+s_idx, sizeof(char*)*(place - s_idx));
  1309.     }
  1310.     
  1311.     if(DEBUG_NEXT_RESULT) ndbout_c("putting %d @ %d", u_last, place - 1);
  1312.     m_api_receivers[place-1] = tRec;
  1313.     s_idx--;
  1314.   }
  1315.   if(DEBUG_NEXT_RESULT) ndbout_c("u=[%d %d] s=[%d %d]", 
  1316.  u_idx, u_last,
  1317.  s_idx, s_last);
  1318.   
  1319.   m_current_api_receiver = s_idx;
  1320.   
  1321.   if(DEBUG_NEXT_RESULT)
  1322.     for(Uint32 i = s_idx; i<s_last; i++)
  1323.       ndbout_c("%p", arr[i]);
  1324.   
  1325.   tRec = m_api_receivers[s_idx];    
  1326.   if(s_idx < s_last && tRec->nextResult()){
  1327.     tRec->copyout(theReceiver);      
  1328.     if(DEBUG_NEXT_RESULT) ndbout_c("return 0");
  1329.     return 0;
  1330.   }
  1331.   theError.code = -1;
  1332.   if(DEBUG_NEXT_RESULT) ndbout_c("return 1");
  1333.   return 1;
  1334. }
  1335. int
  1336. NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){  
  1337.   if(idx == theParallelism)
  1338.     return 0;
  1339.   
  1340.   NdbReceiver* tRec = m_api_receivers[idx];
  1341.   NdbApiSignal tSignal(theNdb->theMyRef);
  1342.   tSignal.setSignal(GSN_SCAN_NEXTREQ);
  1343.   
  1344.   Uint32 last = m_sent_receivers_count;
  1345.   Uint32* theData = tSignal.getDataPtrSend();
  1346.   Uint32* prep_array = theData + 4;
  1347.   
  1348.   m_current_api_receiver = idx + 1;
  1349.   if((prep_array[0] = tRec->m_tcPtrI) == RNIL)
  1350.   {
  1351.     if(DEBUG_NEXT_RESULT)
  1352.       ndbout_c("receiver completed, don't send");
  1353.     return 0;
  1354.   }
  1355.   
  1356.   theData[0] = theNdbCon->theTCConPtr;
  1357.   theData[1] = 0;
  1358.   Uint64 transId = theNdbCon->theTransactionId;
  1359.   theData[2] = transId;
  1360.   theData[3] = (Uint32) (transId >> 32);
  1361.   
  1362.   /**
  1363.    * Prepare ops
  1364.    */
  1365.   m_sent_receivers[last] = tRec;
  1366.   tRec->m_list_index = last;
  1367.   tRec->prepareSend();
  1368.   m_sent_receivers_count = last + 1;
  1369.   
  1370.   Uint32 nodeId = theNdbCon->theDBnode;
  1371.   TransporterFacade * tp = TransporterFacade::instance();
  1372.   tSignal.setLength(4+1);
  1373.   int ret= tp->sendSignal(&tSignal, nodeId);
  1374.   if (!ret) checkForceSend(forceSend);
  1375.   return ret;
  1376. }
  1377. int
  1378. NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
  1379.   Uint32 seq = theNdbCon->theNodeSequence;
  1380.   Uint32 nodeId = theNdbCon->theDBnode;
  1381.   
  1382.   if(seq != tp->getNodeSequence(nodeId))
  1383.   {
  1384.     theNdbCon->theReleaseOnClose = true;
  1385.     return -1;
  1386.   }
  1387.   
  1388.   /**
  1389.    * Wait for outstanding
  1390.    */
  1391.   while(theError.code == 0 && m_sent_receivers_count)
  1392.   {
  1393.     theNdb->theImpl->theWaiter.m_node = nodeId;
  1394.     theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
  1395.     int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
  1396.     switch(return_code){
  1397.     case 0:
  1398.       break;
  1399.     case -1:
  1400.       setErrorCode(4008);
  1401.     case -2:
  1402.       m_api_receivers_count = 0;
  1403.       m_conf_receivers_count = 0;
  1404.       m_sent_receivers_count = 0;
  1405.       theNdbCon->theReleaseOnClose = true;
  1406.       return -1;
  1407.     }
  1408.   }
  1409.   if(theError.code)
  1410.   {
  1411.     m_api_receivers_count = 0;
  1412.     m_current_api_receiver = m_ordered ? theParallelism : 0;
  1413.   }
  1414.   /**
  1415.    * move all conf'ed into api
  1416.    *   so that send_next_scan can check if they needs to be closed
  1417.    */
  1418.   Uint32 api = m_api_receivers_count;
  1419.   Uint32 conf = m_conf_receivers_count;
  1420.   if(m_ordered)
  1421.   {
  1422.     /**
  1423.      * Ordered scan, keep the m_api_receivers "to the right"
  1424.      */
  1425.     memmove(m_api_receivers, m_api_receivers+m_current_api_receiver, 
  1426.     (theParallelism - m_current_api_receiver) * sizeof(char*));
  1427.     api = (theParallelism - m_current_api_receiver);
  1428.     m_api_receivers_count = api;
  1429.   }
  1430.   
  1431.   if(DEBUG_NEXT_RESULT)
  1432.     ndbout_c("close_impl: [order api conf sent curr parr] %d %d %d %d %d %d",
  1433.      m_ordered, api, conf, 
  1434.      m_sent_receivers_count, m_current_api_receiver, theParallelism);
  1435.   
  1436.   if(api+conf)
  1437.   {
  1438.     /**
  1439.      * There's something to close
  1440.      *   setup m_api_receivers (for send_next_scan)
  1441.      */
  1442.     memcpy(m_api_receivers+api, m_conf_receivers, conf * sizeof(char*));
  1443.     m_api_receivers_count = api + conf;
  1444.     m_conf_receivers_count = 0;
  1445.   }
  1446.   
  1447.   // Send close scan
  1448.   if(send_next_scan(api+conf, true, forceSend) == -1)
  1449.   {
  1450.     theNdbCon->theReleaseOnClose = true;
  1451.     return -1;
  1452.   }
  1453.   
  1454.   /**
  1455.    * wait for close scan conf
  1456.    */
  1457.   while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
  1458.   {
  1459.     theNdb->theImpl->theWaiter.m_node = nodeId;
  1460.     theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
  1461.     int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
  1462.     switch(return_code){
  1463.     case 0:
  1464.       break;
  1465.     case -1:
  1466.       setErrorCode(4008);
  1467.     case -2:
  1468.       m_api_receivers_count = 0;
  1469.       m_conf_receivers_count = 0;
  1470.       m_sent_receivers_count = 0;
  1471.       theNdbCon->theReleaseOnClose = true;
  1472.       return -1;
  1473.     }
  1474.   }
  1475.   
  1476.   return 0;
  1477. }
  1478. void
  1479. NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
  1480.   for(Uint32 i = 0; i<parallell; i++){
  1481.     m_receivers[i]->m_list_index = i;
  1482.     m_prepared_receivers[i] = m_receivers[i]->getId();
  1483.     m_sent_receivers[i] = m_receivers[i];
  1484.     m_conf_receivers[i] = 0;
  1485.     m_api_receivers[i] = 0;
  1486.     m_receivers[i]->prepareSend();
  1487.   }
  1488.   
  1489.   m_api_receivers_count = 0;
  1490.   m_current_api_receiver = 0;
  1491.   m_sent_receivers_count = 0;
  1492.   m_conf_receivers_count = 0;
  1493. }
  1494. int
  1495. NdbScanOperation::restart(bool forceSend)
  1496. {
  1497.   
  1498.   TransporterFacade* tp = TransporterFacade::instance();
  1499.   Guard guard(tp->theMutexPtr);
  1500.   Uint32 nodeId = theNdbCon->theDBnode;
  1501.   
  1502.   {
  1503.     int res;
  1504.     if((res= close_impl(tp, forceSend)))
  1505.     {
  1506.       return res;
  1507.     }
  1508.   }
  1509.   
  1510.   /**
  1511.    * Reset receivers
  1512.    */
  1513.   reset_receivers(theParallelism, m_ordered);
  1514.   
  1515.   theError.code = 0;
  1516.   if (doSendScan(nodeId) == -1)
  1517.     return -1;
  1518.   
  1519.   return 0;
  1520. }
  1521. int
  1522. NdbIndexScanOperation::reset_bounds(bool forceSend){
  1523.   int res;
  1524.   
  1525.   {
  1526.     TransporterFacade* tp = TransporterFacade::instance();
  1527.     Guard guard(tp->theMutexPtr);
  1528.     res= close_impl(tp, forceSend);
  1529.   }
  1530.   if(!res)
  1531.   {
  1532.     theError.code = 0;
  1533.     reset_receivers(theParallelism, m_ordered);
  1534.     
  1535.     theLastKEYINFO = theFirstKEYINFO;
  1536.     theKEYINFOptr = ((KeyInfo*)theFirstKEYINFO->getDataPtrSend())->keyData;
  1537.     theTupKeyLen = 0;
  1538.     theTotalNrOfKeyWordInSignal = 0;
  1539.     m_transConnection
  1540.       ->remove_list((NdbOperation*&)m_transConnection->m_firstExecutedScanOp,
  1541.     this);
  1542.     m_transConnection->define_scan_op(this);
  1543.     return 0;
  1544.   }
  1545.   return res;
  1546. }