dba_bulkread.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:6k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "dba_internal.hpp"
  14. struct DBA__BulkReadData {
  15.   const DBA_Binding_t * const * pBindings; // The bindings
  16.   DBA_BulkReadResultSet_t * pData;       // The data 
  17.   
  18.   int NbRows;                               // NbRows per binding
  19.   int NbBindings;                           // NbBindings
  20.   int TotalRows;                            // Total rows (NbRows*NbBindings)
  21.   
  22.   DBA_AsyncCallbackFn_t CbFunc;             // Users callback
  23.   DBA_ReqId_t RequestId;                    // Users request id
  24.   DBA_Error_t Status;                       // Request status
  25.   DBA_ErrorCode_t ErrorCode;                /**< Request error
  26.        Only valid if request is
  27.        aborted */
  28.   
  29.   int RowsSubmitted;                        // No of read sent to NDB
  30.   int RowsAcknowledged;                     // No of read responses
  31.   int OpPerTrans;                           // Operations per transaction
  32.   
  33.   struct Index {
  34.     int binding;
  35.     int row;
  36.     int datarow;
  37.     void init() { row = binding = datarow = 0;}
  38.     void next(int rows) { 
  39.       datarow++; row++; 
  40.       if(row == rows){ row = 0; binding++; } 
  41.     }
  42.   };
  43.   Index lastSend;
  44.   Index nextSend;
  45.   
  46.   /**
  47.    * If "simple" bulkread
  48.    * use this storage
  49.    */
  50.   const DBA_Binding_t    * bindings[1];
  51.   DBA__BulkReadData() {
  52.     RequestId = DBA_INVALID_REQID;
  53.   }
  54.   void ProcessBulkRead();
  55.   bool ProcessCallback(int errorCode, NdbConnection * connection);
  56. };
  57. static
  58. void 
  59. NewtonCallback(int errorCode,
  60.        NdbConnection * connection,
  61.        void * anyObject){
  62.   DBA__BulkReadData * brd = (DBA__BulkReadData*)anyObject;
  63.   
  64.   brd->ProcessCallback(errorCode, connection);
  65.   DBA__TheNdb->closeTransaction(connection);
  66.   if(brd->RowsSubmitted == brd->TotalRows){
  67.     
  68.     /**
  69.      * The entire bulk read is finished,
  70.      * call users callback
  71.      */
  72.     DBA_ReqId_t reqId = brd->RequestId;
  73.     
  74.     // Invalidate BulkReadData
  75.     brd->RequestId = DBA_INVALID_REQID;
  76.     brd->CbFunc(reqId, brd->Status, brd->ErrorCode);
  77.     return;
  78.   }
  79.   
  80.   brd->ProcessBulkRead();
  81. }
  82. /**
  83.  * The single file-scope BulkReadData; RequestId == DBA_INVALID_REQID means it is free
  84.  */
  85. static DBA__BulkReadData theBRD;
  /**
   * Validate a single binding.  If it is invalid, record an application
   * error and return DBA_INVALID_REQID from the enclosing function.
   *
   * The line continuations are required: without them the macro expands to
   * nothing and the if-statement ends up at file scope.  The do/while(0)
   * wrapper makes `CHECK_BINDINGS(x);` behave as one statement.
   */
  #define CHECK_BINDINGS(Bindings) \
    do { \
      if(!DBA__ValidBinding(Bindings)){ \
        DBA__SetLatestError(DBA_APPLICATION_ERROR, 0, "Invalid bindings"); \
        return DBA_INVALID_REQID; \
      } \
    } while(0)

  /**
   * Same as CHECK_BINDINGS, but validates an array of NbBindings bindings.
   */
  #define CHECK_BINDINGS2(Bindings, NbBindings) \
    do { \
      if(!DBA__ValidBindings(Bindings, NbBindings)){ \
        DBA__SetLatestError(DBA_APPLICATION_ERROR, 0, "Invalid bindings"); \
        return DBA_INVALID_REQID; \
      } \
    } while(0)
  96. DBA_ReqId_t
  97. DBA_BulkReadRows(const DBA_Binding_t * pBindings,
  98.  DBA_BulkReadResultSet_t pData[],
  99.  int NbRows,
  100.  DBA_AsyncCallbackFn_t CbFunc ){
  101.   CHECK_BINDINGS(pBindings);
  102.   DBA__BulkReadData * brd = &theBRD;
  103.   
  104.   NdbMutex_Lock(DBA__TheNewtonMutex);
  105.   if(brd->RequestId != DBA_INVALID_REQID){
  106.     DBA__SetLatestError(DBA_ERROR, 0,
  107. "DBA only permits 1 concurrent bulkread");
  108.     
  109.     NdbMutex_Unlock(DBA__TheNewtonMutex);
  110.     return DBA_ERROR;
  111.   }
  112.   
  113.   theBRD.RequestId = 1;
  114.   
  115.   /**
  116.    * 
  117.    */
  118.   brd->bindings[0] = pBindings;
  119.   brd->pBindings   = brd->bindings;
  120.   brd->pData       = pData;
  121.   /**
  122.    * Control data
  123.    */
  124.   brd->NbRows         = NbRows;
  125.   brd->NbBindings     = 1;
  126.   brd->TotalRows      = NbRows;
  127.   brd->CbFunc         = CbFunc;
  128.   brd->Status         = DBA_NO_ERROR;
  129.   brd->ErrorCode      = 0;
  130.   brd->OpPerTrans     = DBA__BulkReadCount;
  131.   brd->RowsSubmitted    = 0;
  132.   brd->RowsAcknowledged = 0;
  133.   brd->lastSend.init();
  134.   brd->nextSend.init();
  135.   brd->ProcessBulkRead();
  136.   NdbMutex_Unlock(DBA__TheNewtonMutex);
  137.   
  138.   return brd->RequestId;
  139. }
  140. DBA_ReqId_t
  141. DBA_BulkMultiReadRows(const DBA_Binding_t * const * pBindings,
  142.       DBA_BulkReadResultSet_t pData[],
  143.       int NbBindings,
  144.       int NbRows,
  145.       DBA_AsyncCallbackFn_t CbFunc ){
  146.   CHECK_BINDINGS2(pBindings, NbBindings);
  147.   DBA__BulkReadData * brd = &theBRD;
  148.   NdbMutex_Lock(DBA__TheNewtonMutex);
  149.   
  150.   if(brd->RequestId != DBA_INVALID_REQID){
  151.     DBA__SetLatestError(DBA_ERROR, 0,
  152. "DBA only permits 1 concurrent bulkread");
  153.     NdbMutex_Unlock(DBA__TheNewtonMutex);
  154.     return DBA_ERROR;
  155.   }
  156.   
  157.   brd->RequestId = 1;
  158.   
  159.   /**
  160.    * 
  161.    */
  162.   brd->pBindings  = pBindings;
  163.   brd->pData      = pData;
  164.   /**
  165.    * Control data
  166.    */
  167.   brd->NbRows        = NbRows;
  168.   brd->NbBindings    = NbBindings;
  169.   brd->TotalRows     = (NbRows * NbBindings);
  170.   brd->CbFunc        = CbFunc;
  171.   brd->Status        = DBA_NO_ERROR;
  172.   brd->ErrorCode     = 0;
  173.   brd->OpPerTrans    = DBA__BulkReadCount;
  174.   brd->RowsSubmitted    = 0;
  175.   brd->RowsAcknowledged = 0;
  176.   
  177.   brd->lastSend.init();
  178.   brd->nextSend.init();
  179.   
  180.   brd->ProcessBulkRead();
  181.   
  182.   NdbMutex_Unlock(DBA__TheNewtonMutex);  
  183.   return brd->RequestId;
  184. }
  185. bool
  186. DBA__BulkReadData::ProcessCallback(int errorCode, NdbConnection * con){
  187.   Index tmp = lastSend;
  188.   const NdbOperation * op = con->getNextCompletedOperation(0);
  189.   
  190.   for(int i = 0; i<OpPerTrans && RowsAcknowledged < RowsSubmitted; i++){
  191.     require(op != 0);
  192.     if(op->getNdbError().code == 0)
  193.       pData[tmp.datarow].RowFoundIndicator = 1;
  194.     else
  195.       pData[tmp.datarow].RowFoundIndicator = 0;
  196.     RowsAcknowledged++;
  197.     tmp.next(NbRows);
  198.     op = con->getNextCompletedOperation(op);
  199.   }
  200.   return true;
  201. }
  202. void
  203. DBA__BulkReadData::ProcessBulkRead(){
  204.   NdbConnection * con = DBA__TheNdb->startTransaction();
  205.   Index tmp = nextSend;
  206.   for(int i = 0; i<OpPerTrans && RowsSubmitted < TotalRows; i++){
  207.     
  208.     const DBA_Binding_t * binding = pBindings[tmp.binding];
  209.     void * data = pData[tmp.datarow].DataPtr;
  210.     
  211.     NdbOperation  * op  = con->getNdbOperation(binding->tableName);
  212.     
  213.     op->simpleRead();
  214.     
  215.     require(DBA__EqualGetValue(op, binding, data));
  216.     
  217.     RowsSubmitted++;
  218.     tmp.next(NbRows);
  219.   }
  220.   con->executeAsynchPrepare(Commit,
  221.     NewtonCallback,
  222.     (void*)this,
  223.     CommitAsMuchAsPossible);
  224.   lastSend = nextSend;
  225.   nextSend = tmp;
  226. }