NdbReceiver.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:8k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include "NdbImpl.hpp"
  15. #include <NdbReceiver.hpp>
  16. #include "NdbDictionaryImpl.hpp"
  17. #include <NdbRecAttr.hpp>
  18. #include <AttributeHeader.hpp>
  19. #include <NdbConnection.hpp>
  20. #include <TransporterFacade.hpp>
  21. #include <signaldata/TcKeyConf.hpp>
  22. NdbReceiver::NdbReceiver(Ndb *aNdb) :
  23.   theMagicNumber(0),
  24.   m_ndb(aNdb),
  25.   m_id(NdbObjectIdMap::InvalidId),
  26.   m_type(NDB_UNINITIALIZED),
  27.   m_owner(0)
  28. {
  29.   theCurrentRecAttr = theFirstRecAttr = 0;
  30.   m_defined_rows = 0;
  31.   m_rows = new NdbRecAttr*[0];
  32. }
  33.  
  34. NdbReceiver::~NdbReceiver()
  35. {
  36.   DBUG_ENTER("NdbReceiver::~NdbReceiver");
  37.   if (m_id != NdbObjectIdMap::InvalidId) {
  38.     m_ndb->theImpl->theNdbObjectIdMap.unmap(m_id, this);
  39.   }
  40.   delete[] m_rows;
  41.   DBUG_VOID_RETURN;
  42. }
  43. void
  44. NdbReceiver::init(ReceiverType type, void* owner)
  45. {
  46.   theMagicNumber = 0x11223344;
  47.   m_type = type;
  48.   m_owner = owner;
  49.   if (m_id == NdbObjectIdMap::InvalidId) {
  50.     if (m_ndb)
  51.       m_id = m_ndb->theImpl->theNdbObjectIdMap.map(this);
  52.   }
  53.   theFirstRecAttr = NULL;
  54.   theCurrentRecAttr = NULL;
  55. }
  56. void
  57. NdbReceiver::release(){
  58.   NdbRecAttr* tRecAttr = theFirstRecAttr;
  59.   while (tRecAttr != NULL)
  60.   {
  61.     NdbRecAttr* tSaveRecAttr = tRecAttr;
  62.     tRecAttr = tRecAttr->next();
  63.     m_ndb->releaseRecAttr(tSaveRecAttr);
  64.   }
  65.   theFirstRecAttr = NULL;
  66.   theCurrentRecAttr = NULL;
  67. }
  68.   
  69. NdbRecAttr *
  70. NdbReceiver::getValue(const NdbColumnImpl* tAttrInfo, char * user_dst_ptr){
  71.   NdbRecAttr* tRecAttr = m_ndb->getRecAttr();
  72.   if(tRecAttr && !tRecAttr->setup(tAttrInfo, user_dst_ptr)){
  73.     if (theFirstRecAttr == NULL)
  74.       theFirstRecAttr = tRecAttr;
  75.     else
  76.       theCurrentRecAttr->next(tRecAttr);
  77.     theCurrentRecAttr = tRecAttr;
  78.     tRecAttr->next(NULL);
  79.     return tRecAttr;
  80.   }
  81.   if(tRecAttr){
  82.     m_ndb->releaseRecAttr(tRecAttr);
  83.   }    
  84.   return 0;
  85. }
  86. #define KEY_ATTR_ID (~(Uint32)0)
  87. void
  88. NdbReceiver::calculate_batch_size(Uint32 key_size,
  89.                                   Uint32 parallelism,
  90.                                   Uint32& batch_size,
  91.                                   Uint32& batch_byte_size,
  92.                                   Uint32& first_batch_size)
  93. {
  94.   TransporterFacade *tp= TransporterFacade::instance();
  95.   Uint32 max_scan_batch_size= tp->get_scan_batch_size();
  96.   Uint32 max_batch_byte_size= tp->get_batch_byte_size();
  97.   Uint32 max_batch_size= tp->get_batch_size();
  98.   Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
  99.   NdbRecAttr *rec_attr= theFirstRecAttr;
  100.   while (rec_attr != NULL) {
  101.     Uint32 attr_size= rec_attr->attrSize() * rec_attr->arraySize();
  102.     attr_size= ((attr_size + 7) >> 2) << 2; //Even to word + overhead
  103.     tot_size+= attr_size;
  104.     rec_attr= rec_attr->next();
  105.   }
  106.   tot_size+= 32; //include signal overhead
  107.   /**
  108.    * Now we calculate the batch size by trying to get upto SCAN_BATCH_SIZE
  109.    * bytes sent for each batch from each node. We do however ensure that
  110.    * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
  111.    * batch.
  112.    */
  113.   batch_byte_size= max_batch_byte_size;
  114.   if (batch_byte_size * parallelism > max_scan_batch_size) {
  115.     batch_byte_size= max_scan_batch_size / parallelism;
  116.   }
  117.   batch_size= batch_byte_size / tot_size;
  118.   if (batch_size == 0) {
  119.     batch_size= 1;
  120.   } else {
  121.     if (batch_size > max_batch_size) {
  122.       batch_size= max_batch_size;
  123.     } else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
  124.       batch_size= MAX_PARALLEL_OP_PER_SCAN;
  125.     }
  126.   }
  127.   first_batch_size= batch_size;
  128.   return;
  129. }
  130. void
  131. NdbReceiver::do_get_value(NdbReceiver * org, Uint32 rows, Uint32 key_size){
  132.   if(rows > m_defined_rows){
  133.     delete[] m_rows;
  134.     m_defined_rows = rows;
  135.     m_rows = new NdbRecAttr*[rows + 1]; 
  136.   }
  137.   m_rows[rows] = 0;
  138.   
  139.   NdbColumnImpl key;
  140.   if(key_size){
  141.     key.m_attrId = KEY_ATTR_ID;
  142.     key.m_arraySize = key_size+1;
  143.     key.m_attrSize = 4;
  144.     key.m_nullable = true; // So that receive works w.r.t KEYINFO20
  145.   }
  146.   m_key_info = key_size;
  147.   
  148.   for(Uint32 i = 0; i<rows; i++){
  149.     NdbRecAttr * prev = theCurrentRecAttr;
  150.     assert(prev == 0 || i > 0);
  151.     
  152.     // Put key-recAttr fir on each row
  153.     if(key_size && !getValue(&key, (char*)0)){
  154.       abort();
  155.       return ; // -1
  156.     }
  157.     
  158.     NdbRecAttr* tRecAttr = org->theFirstRecAttr;
  159.     while(tRecAttr != 0){
  160.       if(getValue(&NdbColumnImpl::getImpl(*tRecAttr->m_column), (char*)0) != 0)
  161. tRecAttr = tRecAttr->next();
  162.       else
  163. break;
  164.     }
  165.     
  166.     if(tRecAttr){
  167.       abort();
  168.       return ;// -1;
  169.     }
  170.     // Store first recAttr for each row in m_rows[i]
  171.     if(prev){
  172.       m_rows[i] = prev->next();
  173.     } else {
  174.       m_rows[i] = theFirstRecAttr;
  175.     }
  176.   } 
  177.   prepareSend();
  178.   return;
  179. }
  180. void
  181. NdbReceiver::copyout(NdbReceiver & dstRec){
  182.   NdbRecAttr* src = m_rows[m_current_row++];
  183.   NdbRecAttr* dst = dstRec.theFirstRecAttr;
  184.   Uint32 tmp = m_key_info;
  185.   if(tmp > 0){
  186.     src = src->next();
  187.   }
  188.   
  189.   while(dst){
  190.     Uint32 len = ((src->theAttrSize * src->theArraySize)+3)/4;
  191.     dst->receive_data((Uint32*)src->aRef(),  src->isNULL() ? 0 : len);
  192.     src = src->next();
  193.     dst = dst->next();
  194.   }
  195. }
  196. int
  197. NdbReceiver::execTRANSID_AI(const Uint32* aDataPtr, Uint32 aLength)
  198. {
  199.   bool ok = true;
  200.   NdbRecAttr* currRecAttr = theCurrentRecAttr;
  201.   
  202.   for (Uint32 used = 0; used < aLength ; used++){
  203.     AttributeHeader ah(* aDataPtr++);
  204.     const Uint32 tAttrId = ah.getAttributeId();
  205.     const Uint32 tAttrSize = ah.getDataSize();
  206.     /**
  207.      * Set all results to NULL if  not found...
  208.      */
  209.     while(currRecAttr && currRecAttr->attrId() != tAttrId){
  210.       ok &= currRecAttr->setNULL();
  211.       currRecAttr = currRecAttr->next();
  212.     }
  213.     
  214.     if(ok && currRecAttr && currRecAttr->receive_data(aDataPtr, tAttrSize)){
  215.       used += tAttrSize;
  216.       aDataPtr += tAttrSize;
  217.       currRecAttr = currRecAttr->next();
  218.     } else {
  219.       ndbout_c("%p: ok: %d tAttrId: %d currRecAttr: %p", 
  220.        this,ok, tAttrId, currRecAttr);
  221.       currRecAttr = theCurrentRecAttr;
  222.       while(currRecAttr != 0){
  223. ndbout_c("%d ", currRecAttr->attrId());
  224. currRecAttr = currRecAttr->next();
  225.       }
  226.       abort();
  227.       return -1;
  228.     }
  229.   }
  230.   theCurrentRecAttr = currRecAttr;
  231.   
  232.   /**
  233.    * Update m_received_result_length
  234.    */
  235.   Uint32 exp = m_expected_result_length; 
  236.   Uint32 tmp = m_received_result_length + aLength;
  237.   m_received_result_length = tmp;
  238.   return (tmp == exp || (exp > TcKeyConf::SimpleReadBit) ? 1 : 0);
  239. }
  240. int
  241. NdbReceiver::execKEYINFO20(Uint32 info, const Uint32* aDataPtr, Uint32 aLength)
  242. {
  243.   NdbRecAttr* currRecAttr = m_rows[m_current_row++];
  244.   assert(currRecAttr->attrId() == KEY_ATTR_ID);
  245.   currRecAttr->receive_data(aDataPtr, aLength + 1);
  246.   
  247.   /**
  248.    * Save scanInfo in the end of keyinfo
  249.    */
  250.   ((Uint32*)currRecAttr->aRef())[aLength] = info;
  251.   
  252.   Uint32 tmp = m_received_result_length + aLength;
  253.   m_received_result_length = tmp;
  254.   
  255.   return (tmp == m_expected_result_length ? 1 : 0);
  256. }
  257. void
  258. NdbReceiver::setErrorCode(int code)
  259. {
  260.   theMagicNumber = 0;
  261.   NdbOperation* op = (NdbOperation*)getOwner();
  262.   op->setErrorCode(code);
  263. }