DbtupBuffer.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:9k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #define DBTUP_C
  14. #include "Dbtup.hpp"
  15. #include <RefConvert.hpp>
  16. #include <ndb_limits.h>
  17. #include <pc.hpp>
  18. #include <signaldata/TransIdAI.hpp>
  19. #define ljam() { jamLine(2000 + __LINE__); }
  20. #define ljamEntry() { jamEntryLine(2000 + __LINE__); }
  21. void Dbtup::execSEND_PACKED(Signal* signal)
  22. {
  23.   Uint16 hostId;
  24.   Uint32 i;
  25.   Uint32 TpackedListIndex = cpackedListIndex;
  26.   ljamEntry();
  27.   for (i = 0; i < TpackedListIndex; i++) {
  28.     ljam();
  29.     hostId = cpackedList[i];
  30.     ndbrequire((hostId - 1) < (MAX_NODES - 1)); // Also check not zero
  31.     Uint32 TpacketTA = hostBuffer[hostId].noOfPacketsTA;
  32.     if (TpacketTA != 0) {
  33.       ljam();
  34.       BlockReference TBref = numberToRef(API_PACKED, hostId);
  35.       Uint32 TpacketLen = hostBuffer[hostId].packetLenTA;
  36.       MEMCOPY_NO_WORDS(&signal->theData[0],
  37.                        &hostBuffer[hostId].packetBufferTA[0],
  38.                        TpacketLen);
  39.       sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
  40.       hostBuffer[hostId].noOfPacketsTA = 0;
  41.       hostBuffer[hostId].packetLenTA = 0;
  42.     }//if
  43.     hostBuffer[hostId].inPackedList = false;
  44.   }//for
  45.   cpackedListIndex = 0;
  46. }//Dbtup::execSEND_PACKED()
  47. void Dbtup::bufferTRANSID_AI(Signal* signal, BlockReference aRef,
  48.                              Uint32 Tlen)
  49. {
  50.   if(Tlen == 3)
  51.     return;
  52.   
  53.   Uint32 hostId = refToNode(aRef);
  54.   Uint32 Theader = ((refToBlock(aRef) << 16)+(Tlen-3));
  55.   
  56.   ndbrequire(hostId < MAX_NODES);
  57.   Uint32 TpacketLen = hostBuffer[hostId].packetLenTA;
  58.   Uint32 TnoOfPackets = hostBuffer[hostId].noOfPacketsTA;
  59.   Uint32 sig0 = signal->theData[0];
  60.   Uint32 sig1 = signal->theData[1];
  61.   Uint32 sig2 = signal->theData[2];
  62.   BlockReference TBref = numberToRef(API_PACKED, hostId);
  63.   if ((Tlen + TpacketLen + 1) <= 25) {
  64. // ----------------------------------------------------------------
  65. // There is still space in the buffer. We will copy it into the
  66. // buffer.
  67. // ----------------------------------------------------------------
  68.     ljam();
  69.     updatePackedList(signal, hostId);
  70.   } else if (false && TnoOfPackets == 1) {
  71. // ----------------------------------------------------------------
  72. // The buffer is full and there was only one packet buffered. We
  73. // will send this as a normal signal.
  74. // ----------------------------------------------------------------
  75.     Uint32 TnewRef = numberToRef((hostBuffer[hostId].packetBufferTA[0] >> 16),
  76.                                  hostId);
  77.     MEMCOPY_NO_WORDS(&signal->theData[0],
  78.                      &hostBuffer[hostId].packetBufferTA[1],
  79.                      TpacketLen - 1);
  80.     sendSignal(TnewRef, GSN_TRANSID_AI, signal, (TpacketLen - 1), JBB);
  81.     TpacketLen = 0;
  82.     TnoOfPackets = 0;
  83.   } else {
  84. // ----------------------------------------------------------------
  85. // The buffer is full but at least two packets. Send those in
  86. // packed form.
  87. // ----------------------------------------------------------------
  88.     MEMCOPY_NO_WORDS(&signal->theData[0],
  89.                      &hostBuffer[hostId].packetBufferTA[0],
  90.                      TpacketLen);
  91.     sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
  92.     TpacketLen = 0;
  93.     TnoOfPackets = 0;
  94.   }//if
  95. // ----------------------------------------------------------------
  96. // Copy the signal into the buffer
  97. // ----------------------------------------------------------------
  98.   hostBuffer[hostId].packetBufferTA[TpacketLen + 0] = Theader;
  99.   hostBuffer[hostId].packetBufferTA[TpacketLen + 1] = sig0;
  100.   hostBuffer[hostId].packetBufferTA[TpacketLen + 2] = sig1;
  101.   hostBuffer[hostId].packetBufferTA[TpacketLen + 3] = sig2;
  102.   hostBuffer[hostId].noOfPacketsTA = TnoOfPackets + 1;
  103.   hostBuffer[hostId].packetLenTA = Tlen + TpacketLen + 1;
  104.   MEMCOPY_NO_WORDS(&hostBuffer[hostId].packetBufferTA[TpacketLen + 4],
  105.                    &signal->theData[25],
  106.                    Tlen - 3);
  107. }//Dbtup::bufferTRANSID_AI()
  108. void Dbtup::updatePackedList(Signal* signal, Uint16 hostId)
  109. {
  110.   if (hostBuffer[hostId].inPackedList == false) {
  111.     Uint32 TpackedListIndex = cpackedListIndex;
  112.     ljam();
  113.     hostBuffer[hostId].inPackedList = true;
  114.     cpackedList[TpackedListIndex] = hostId;
  115.     cpackedListIndex = TpackedListIndex + 1;
  116.   }//if
  117. }//Dbtup::updatePackedList()
  118. /* ---------------------------------------------------------------- */
  119. /* ----------------------- SEND READ ATTRINFO --------------------- */
  120. /* ---------------------------------------------------------------- */
  121. void Dbtup::sendReadAttrinfo(Signal* signal,
  122.                              Uint32 ToutBufIndex,
  123.                              const Operationrec * const regOperPtr)
  124. {
  125.   if(ToutBufIndex == 0)
  126.     return;
  127.   const BlockReference recBlockref = regOperPtr->recBlockref;
  128.   const Uint32 sig0 = regOperPtr->tcOperationPtr;
  129.   const Uint32 sig1 = regOperPtr->transid1;
  130.   const Uint32 sig2 = regOperPtr->transid2;
  131.   const Uint32 block = refToBlock(recBlockref);
  132.   const Uint32 nodeId = refToNode(recBlockref);
  133.   bool connectedToNode = getNodeInfo(nodeId).m_connected;
  134.   const Uint32 type = getNodeInfo(nodeId).m_type;
  135.   bool is_api = (type >= NodeInfo::API && type <= NodeInfo::REP);
  136.   bool old_dest = (getNodeInfo(nodeId).m_version < MAKE_VERSION(3,5,0));
  137.   const Uint32 TpacketTA = hostBuffer[nodeId].noOfPacketsTA;
  138.   const Uint32 TpacketLen = hostBuffer[nodeId].packetLenTA;
  139.   
  140.   if (ERROR_INSERTED(4006) && (nodeId != getOwnNodeId())){
  141.     // Use error insert to turn routing on
  142.     ljam();
  143.     connectedToNode = false;    
  144.   }
  145.   TransIdAI * transIdAI =  (TransIdAI *)signal->getDataPtrSend();
  146.   transIdAI->connectPtr = sig0;
  147.   transIdAI->transId[0] = sig1;
  148.   transIdAI->transId[1] = sig2;
  149.   if (connectedToNode){
  150.     /**
  151.      * Own node -> execute direct
  152.      */
  153.     if(nodeId != getOwnNodeId()){
  154.       ljam();
  155.     
  156.       /**
  157.        * Send long sig
  158.        */
  159.       if(ToutBufIndex >= 22 && is_api && !old_dest) {
  160. ljam();
  161. /**
  162.  * Flush buffer so that order is maintained
  163.  */
  164. if (TpacketTA != 0) {
  165.   ljam();
  166.   BlockReference TBref = numberToRef(API_PACKED, nodeId);
  167.   MEMCOPY_NO_WORDS(&signal->theData[0],
  168.    &hostBuffer[nodeId].packetBufferTA[0],
  169.    TpacketLen);
  170.   sendSignal(TBref, GSN_TRANSID_AI, signal, TpacketLen, JBB);
  171.   hostBuffer[nodeId].noOfPacketsTA = 0;
  172.   hostBuffer[nodeId].packetLenTA = 0;
  173.   transIdAI->connectPtr = sig0;
  174.   transIdAI->transId[0] = sig1;
  175.   transIdAI->transId[1] = sig2;
  176. }//if
  177. LinearSectionPtr ptr[3];
  178. ptr[0].p = &signal->theData[25];
  179. ptr[0].sz = ToutBufIndex;
  180. sendSignal(recBlockref, GSN_TRANSID_AI, signal, 3, JBB, ptr, 1);
  181. return;
  182.       }
  183.       
  184.       /**
  185.        * short sig + api -> buffer
  186.        */
  187. #ifndef NDB_NO_DROPPED_SIGNAL
  188.       if (ToutBufIndex < 22 && is_api){
  189. ljam();
  190. bufferTRANSID_AI(signal, recBlockref, 3+ToutBufIndex);
  191. return;
  192.       }//if
  193. #endif      
  194.       /**
  195.        * rest -> old send sig
  196.        */
  197.       Uint32 * src = signal->theData+25;
  198.       if(ToutBufIndex >= 22){
  199. do {
  200.   ljam();
  201.   MEMCOPY_NO_WORDS(&signal->theData[3], src, 22);
  202.   sendSignal(recBlockref, GSN_TRANSID_AI, signal, 25, JBB);
  203.   ToutBufIndex -= 22;
  204.   src += 22;
  205. } while(ToutBufIndex >= 22);
  206.       }
  207.       
  208.       if(ToutBufIndex > 0){
  209. ljam();
  210. MEMCOPY_NO_WORDS(&signal->theData[3], src, ToutBufIndex);
  211. sendSignal(recBlockref, GSN_TRANSID_AI, signal, 3+ToutBufIndex, JBB);
  212.       }
  213.       return;
  214.     }
  215.     EXECUTE_DIRECT(block, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
  216.     ljamEntry();
  217.     return;
  218.   }
  219.   /** 
  220.    * If this node does not have a direct connection 
  221.    * to the receiving node we want to send the signals 
  222.    * routed via the node that controls this read
  223.    */
  224.   Uint32 routeBlockref = regOperPtr->coordinatorTC;
  225.   
  226.   if(true){ // TODO is_api && !old_dest){
  227.     ljam();
  228.     transIdAI->attrData[0] = recBlockref;
  229.     LinearSectionPtr ptr[3];
  230.     ptr[0].p = &signal->theData[25];
  231.     ptr[0].sz = ToutBufIndex;
  232.     sendSignal(routeBlockref, GSN_TRANSID_AI_R, signal, 4, JBB, ptr, 1);
  233.     return;
  234.   }
  235.   
  236.   /**
  237.    * Fill in a TRANSID_AI signal, use last word to store
  238.    * final destination and send it to route node
  239.    * as signal TRANSID_AI_R (R as in Routed)
  240.    */ 
  241.   Uint32 tot = ToutBufIndex;
  242.   Uint32 sent = 0;
  243.   Uint32 maxLen = TransIdAI::DataLength - 1;
  244.   while (sent < tot) {
  245.     ljam();      
  246.     Uint32 dataLen = (tot - sent > maxLen) ? maxLen : tot - sent;
  247.     Uint32 sigLen = dataLen + TransIdAI::HeaderLength + 1; 
  248.     MEMCOPY_NO_WORDS(&transIdAI->attrData,
  249.      &signal->theData[25+sent],
  250.      dataLen);
  251.     // Set final destination in last word
  252.     transIdAI->attrData[dataLen] = recBlockref;
  253.     
  254.     sendSignal(routeBlockref, GSN_TRANSID_AI_R, 
  255.        signal, sigLen, JBB);
  256.     sent += dataLen;
  257.   }
  258. }//Dbtup::sendReadAttrinfo()