Packer.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:14k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include "Packer.hpp"
  15. #include <TransporterRegistry.hpp>
  16. #include <TransporterCallback.hpp>
  17. #include <RefConvert.hpp>
  18. #define MAX_RECEIVED_SIGNALS 1024
  19. Uint32
  20. TransporterRegistry::unpack(Uint32 * readPtr,
  21.     Uint32 sizeOfData,
  22.     NodeId remoteNodeId,
  23.     IOState state) {
  24.   SignalHeader signalHeader;
  25.   LinearSectionPtr ptr[3];
  26.   
  27.   Uint32 usedData   = 0;
  28.   Uint32 loop_count = 0; 
  29.  
  30.   if(state == NoHalt || state == HaltOutput){
  31.     while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
  32.            (loop_count < MAX_RECEIVED_SIGNALS)) {
  33.       Uint32 word1 = readPtr[0];
  34.       Uint32 word2 = readPtr[1];
  35.       Uint32 word3 = readPtr[2];
  36.       loop_count++;
  37.       
  38. #if 0
  39.       if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
  40. //Do funky stuff
  41.       }
  42. #endif
  43.       const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
  44.       const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
  45.       if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){
  46.         DEBUG("Message Size = " << messageLenBytes);
  47. reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
  48.         return usedData;
  49.       }//if
  50.       
  51.       if (sizeOfData < messageLenBytes) {
  52. break;
  53.       }//if
  54.       
  55.       if(Protocol6::getCheckSumIncluded(word1)){
  56. const Uint32 tmpLen = messageLen32 - 1;
  57. const Uint32 checkSumSent     = readPtr[tmpLen];
  58. const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
  59. if(checkSumComputed != checkSumSent){
  60.   reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM);
  61.           return usedData;
  62. }//if
  63.       }//if
  64.       
  65. #if 0
  66.       if(Protocol6::getCompressed(word1)){
  67. //Do funky stuff
  68.       }//if
  69. #endif
  70.       
  71.       Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
  72.       
  73.       Uint32 sBlockNum = signalHeader.theSendersBlockRef;
  74.       sBlockNum = numberToRef(sBlockNum, remoteNodeId);
  75.       signalHeader.theSendersBlockRef = sBlockNum;
  76.       
  77.       Uint8 prio = Protocol6::getPrio(word1);
  78.       
  79.       Uint32 * signalData = &readPtr[3];
  80.       
  81.       if(Protocol6::getSignalIdIncluded(word1) == 0){
  82. signalHeader.theSendersSignalId = ~0;
  83.       } else {
  84. signalHeader.theSendersSignalId = * signalData;
  85. signalData ++;
  86.       }//if
  87.       
  88.       Uint32 * sectionPtr = signalData + signalHeader.theLength;
  89.       Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
  90.       for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
  91. Uint32 sz = * sectionPtr;
  92. ptr[i].sz = sz;
  93. ptr[i].p = sectionData;
  94. sectionPtr ++;
  95. sectionData += sz;
  96.       }
  97.       execute(callbackObj, &signalHeader, prio, signalData, ptr);
  98.       
  99.       readPtr     += messageLen32;
  100.       sizeOfData  -= messageLenBytes;
  101.       usedData    += messageLenBytes;
  102.     }//while
  103.     
  104.     return usedData;
  105.   } else {
  106.     /** state = HaltIO || state == HaltInput */
  107.     while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
  108.            (loop_count < MAX_RECEIVED_SIGNALS)) {
  109.       Uint32 word1 = readPtr[0];
  110.       Uint32 word2 = readPtr[1];
  111.       Uint32 word3 = readPtr[2];
  112.       loop_count++;
  113.       
  114. #if 0
  115.       if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
  116. //Do funky stuff
  117.       }//if
  118. #endif
  119.       
  120.       const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
  121.       const Uint32 messageLenBytes = ((Uint32)messageLen32) << 2;
  122.       if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){
  123. DEBUG("Message Size = " << messageLenBytes);
  124. reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
  125.         return usedData;
  126.       }//if
  127.       
  128.       if (sizeOfData < messageLenBytes) {
  129. break;
  130.       }//if
  131.       
  132.       if(Protocol6::getCheckSumIncluded(word1)){
  133. const Uint32 tmpLen = messageLen32 - 1;
  134. const Uint32 checkSumSent     = readPtr[tmpLen];
  135. const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
  136. if(checkSumComputed != checkSumSent){
  137.   
  138.   //theTransporters[remoteNodeId]->disconnect();
  139.   reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM);
  140.           return usedData;
  141. }//if
  142.       }//if
  143.       
  144. #if 0
  145.       if(Protocol6::getCompressed(word1)){
  146. //Do funky stuff
  147.       }//if
  148. #endif
  149.       
  150.       Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
  151.       
  152.       Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
  153.       if(rBlockNum == 252){
  154. Uint32 sBlockNum = signalHeader.theSendersBlockRef;
  155. sBlockNum = numberToRef(sBlockNum, remoteNodeId);
  156. signalHeader.theSendersBlockRef = sBlockNum;
  157. Uint8 prio = Protocol6::getPrio(word1);
  158. Uint32 * signalData = &readPtr[3];
  159. if(Protocol6::getSignalIdIncluded(word1) == 0){
  160.   signalHeader.theSendersSignalId = ~0;
  161. } else {
  162.   signalHeader.theSendersSignalId = * signalData;
  163.   signalData ++;
  164. }//if
  165. Uint32 * sectionPtr = signalData + signalHeader.theLength;
  166. Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
  167. for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
  168.   Uint32 sz = * sectionPtr;
  169.   ptr[i].sz = sz;
  170.   ptr[i].p = sectionData;
  171.   
  172.   sectionPtr ++;
  173.   sectionData += sz;
  174. }
  175. execute(callbackObj, &signalHeader, prio, signalData, ptr);
  176.       } else {
  177. DEBUG("prepareReceive(...) - Discarding message to block: "
  178.       << rBlockNum << " from Node: " << remoteNodeId);
  179.       }//if
  180.       
  181.       readPtr     += messageLen32;
  182.       sizeOfData  -= messageLenBytes;
  183.       usedData    += messageLenBytes;
  184.     }//while
  185.     
  186.     return usedData;
  187.   }//if
  188. }
  189. Uint32 *
  190. TransporterRegistry::unpack(Uint32 * readPtr,
  191.     Uint32 * eodPtr,
  192.     NodeId remoteNodeId,
  193.     IOState state) {
  194.   static SignalHeader signalHeader;
  195.   static LinearSectionPtr ptr[3];
  196.   Uint32 loop_count = 0;
  197.   if(state == NoHalt || state == HaltOutput){
  198.     while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
  199.       Uint32 word1 = readPtr[0];
  200.       Uint32 word2 = readPtr[1];
  201.       Uint32 word3 = readPtr[2];
  202.       loop_count++; 
  203. #if 0
  204.       if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
  205. //Do funky stuff
  206.       }
  207. #endif
  208.       
  209.       const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
  210.       
  211.       if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){
  212.         DEBUG("Message Size(words) = " << messageLen32);
  213. reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
  214.         return readPtr;
  215.       }//if
  216.       
  217.       if(Protocol6::getCheckSumIncluded(word1)){
  218. const Uint32 tmpLen = messageLen32 - 1;
  219. const Uint32 checkSumSent     = readPtr[tmpLen];
  220. const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
  221. if(checkSumComputed != checkSumSent){
  222.   reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM);
  223.   return readPtr;
  224. }//if
  225.       }//if
  226.       
  227. #if 0
  228.       if(Protocol6::getCompressed(word1)){
  229. //Do funky stuff
  230.       }//if
  231. #endif
  232.       
  233.       Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
  234.       
  235.       Uint32 sBlockNum = signalHeader.theSendersBlockRef;
  236.       sBlockNum = numberToRef(sBlockNum, remoteNodeId);
  237.       signalHeader.theSendersBlockRef = sBlockNum;
  238.       
  239.       Uint8 prio = Protocol6::getPrio(word1);
  240.       
  241.       Uint32 * signalData = &readPtr[3];
  242.       
  243.       if(Protocol6::getSignalIdIncluded(word1) == 0){
  244. signalHeader.theSendersSignalId = ~0;
  245.       } else {
  246. signalHeader.theSendersSignalId = * signalData;
  247. signalData ++;
  248.       }//if
  249.       Uint32 * sectionPtr = signalData + signalHeader.theLength;
  250.       Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
  251.       for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
  252. Uint32 sz = * sectionPtr;
  253. ptr[i].sz = sz;
  254. ptr[i].p = sectionData;
  255. sectionPtr ++;
  256. sectionData += sz;
  257.       }
  258.       
  259.       execute(callbackObj, &signalHeader, prio, signalData, ptr);
  260.       
  261.       readPtr += messageLen32;
  262.     }//while
  263.   } else {
  264.     /** state = HaltIO || state == HaltInput */
  265.     while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
  266.       Uint32 word1 = readPtr[0];
  267.       Uint32 word2 = readPtr[1];
  268.       Uint32 word3 = readPtr[2];
  269.       loop_count++; 
  270. #if 0
  271.       if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
  272. //Do funky stuff
  273.       }//if
  274. #endif
  275.       
  276.       const Uint16 messageLen32    = Protocol6::getMessageLength(word1);
  277.       if(messageLen32 == 0 || messageLen32 > MAX_MESSAGE_SIZE){
  278. DEBUG("Message Size(words) = " << messageLen32);
  279. reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
  280.         return readPtr;
  281.       }//if
  282.       
  283.       if(Protocol6::getCheckSumIncluded(word1)){
  284. const Uint32 tmpLen = messageLen32 - 1;
  285. const Uint32 checkSumSent     = readPtr[tmpLen];
  286. const Uint32 checkSumComputed = computeChecksum(&readPtr[0], tmpLen);
  287. if(checkSumComputed != checkSumSent){
  288.   
  289.   //theTransporters[remoteNodeId]->disconnect();
  290.   reportError(callbackObj, remoteNodeId, TE_INVALID_CHECKSUM);
  291.   return readPtr;
  292. }//if
  293.       }//if
  294.       
  295. #if 0
  296.       if(Protocol6::getCompressed(word1)){
  297. //Do funky stuff
  298.       }//if
  299. #endif
  300.       
  301.       Protocol6::createSignalHeader(&signalHeader, word1, word2, word3);
  302.       
  303.       Uint32 rBlockNum = signalHeader.theReceiversBlockNumber;
  304.       
  305.       if(rBlockNum == 252){
  306. Uint32 sBlockNum = signalHeader.theSendersBlockRef;
  307. sBlockNum = numberToRef(sBlockNum, remoteNodeId);
  308. signalHeader.theSendersBlockRef = sBlockNum;
  309. Uint8 prio = Protocol6::getPrio(word1);
  310. Uint32 * signalData = &readPtr[3];
  311. if(Protocol6::getSignalIdIncluded(word1) == 0){
  312.   signalHeader.theSendersSignalId = ~0;
  313. } else {
  314.   signalHeader.theSendersSignalId = * signalData;
  315.   signalData ++;
  316. }//if
  317. Uint32 * sectionPtr = signalData + signalHeader.theLength;
  318. Uint32 * sectionData = sectionPtr + signalHeader.m_noOfSections;
  319. for(Uint32 i = 0; i<signalHeader.m_noOfSections; i++){
  320.   Uint32 sz = * sectionPtr;
  321.   ptr[i].sz = sz;
  322.   ptr[i].p = sectionData;
  323.   
  324.   sectionPtr ++;
  325.   sectionData += sz;
  326. }
  327. execute(callbackObj, &signalHeader, prio, signalData, ptr);
  328.       } else {
  329. DEBUG("prepareReceive(...) - Discarding message to block: "
  330.       << rBlockNum << " from Node: " << remoteNodeId);
  331.       }//if
  332.       
  333.       readPtr += messageLen32;
  334.     }//while
  335.   }//if
  336.   return readPtr;
  337. }
  338. Packer::Packer(bool signalId, bool checksum) {
  339.   
  340.   checksumUsed    = (checksum ? 1 : 0);
  341.   signalIdUsed    = (signalId ? 1 : 0);
  342.   
  343.   // Set the priority
  344.   preComputedWord1 = 0;
  345.   Protocol6::setByteOrder(preComputedWord1, 0);
  346.   Protocol6::setSignalIdIncluded(preComputedWord1, signalIdUsed);
  347.   Protocol6::setCheckSumIncluded(preComputedWord1, checksumUsed);
  348.   Protocol6::setCompressed(preComputedWord1, 0);
  349. }
  350. inline
  351. void
  352. import(Uint32 * & insertPtr, const LinearSectionPtr & ptr){
  353.   const Uint32 sz = ptr.sz;
  354.   memcpy(insertPtr, ptr.p, 4 * sz);
  355.   insertPtr += sz;
  356. }
  357. void copy(Uint32 * & insertPtr, 
  358.   class SectionSegmentPool &, const SegmentedSectionPtr & ptr);
  359. void
  360. Packer::pack(Uint32 * insertPtr, 
  361.      Uint32 prio, 
  362.      const SignalHeader * header, 
  363.      const Uint32 * theData,
  364.      const LinearSectionPtr ptr[3]) const {
  365.   Uint32 i;
  366.   
  367.   Uint32 dataLen32 = header->theLength;
  368.   Uint32 no_segs = header->m_noOfSections;
  369.   Uint32 len32 = 
  370.     dataLen32 + no_segs + 
  371.     checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
  372.   
  373.   for(i = 0; i<no_segs; i++){
  374.     len32 += ptr[i].sz;
  375.   }
  376.   
  377.   /**
  378.    * Do insert of data
  379.    */
  380.   Uint32 word1 = preComputedWord1;
  381.   Uint32 word2 = 0;
  382.   Uint32 word3 = 0;
  383.   
  384.   Protocol6::setPrio(word1, prio);
  385.   Protocol6::setMessageLength(word1, len32);
  386.   Protocol6::createProtocol6Header(word1, word2, word3, header);
  387.   insertPtr[0] = word1;
  388.   insertPtr[1] = word2;
  389.   insertPtr[2] = word3;
  390.   
  391.   Uint32 * tmpInserPtr = &insertPtr[3];
  392.   
  393.   if(signalIdUsed){
  394.     * tmpInserPtr = header->theSignalId;
  395.     tmpInserPtr++;
  396.   }
  397.   
  398.   memcpy(tmpInserPtr, theData, 4 * dataLen32);
  399.   tmpInserPtr += dataLen32;
  400.   for(i = 0; i<no_segs; i++){
  401.     tmpInserPtr[i] = ptr[i].sz;
  402.   }
  403.   tmpInserPtr += no_segs;
  404.   for(i = 0; i<no_segs; i++){
  405.     import(tmpInserPtr, ptr[i]);
  406.   }
  407.   
  408.   if(checksumUsed){
  409.     * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
  410.   }
  411. }
  412. void
  413. Packer::pack(Uint32 * insertPtr, 
  414.      Uint32 prio, 
  415.      const SignalHeader * header, 
  416.      const Uint32 * theData,
  417.      class SectionSegmentPool & thePool,
  418.      const SegmentedSectionPtr ptr[3]) const {
  419.   Uint32 i;
  420.   
  421.   Uint32 dataLen32 = header->theLength;
  422.   Uint32 no_segs = header->m_noOfSections;
  423.   Uint32 len32 = 
  424.     dataLen32 + no_segs + 
  425.     checksumUsed + signalIdUsed + (sizeof(Protocol6)/4);
  426.   
  427.   for(i = 0; i<no_segs; i++){
  428.     len32 += ptr[i].sz;
  429.   }
  430.   
  431.   /**
  432.    * Do insert of data
  433.    */
  434.   Uint32 word1 = preComputedWord1;
  435.   Uint32 word2 = 0;
  436.   Uint32 word3 = 0;
  437.   
  438.   Protocol6::setPrio(word1, prio);
  439.   Protocol6::setMessageLength(word1, len32);
  440.   Protocol6::createProtocol6Header(word1, word2, word3, header);
  441.   insertPtr[0] = word1;
  442.   insertPtr[1] = word2;
  443.   insertPtr[2] = word3;
  444.   
  445.   Uint32 * tmpInserPtr = &insertPtr[3];
  446.   
  447.   if(signalIdUsed){
  448.     * tmpInserPtr = header->theSignalId;
  449.     tmpInserPtr++;
  450.   }
  451.   
  452.   memcpy(tmpInserPtr, theData, 4 * dataLen32);
  453.   
  454.   tmpInserPtr += dataLen32;
  455.   for(i = 0; i<no_segs; i++){
  456.     tmpInserPtr[i] = ptr[i].sz;
  457.   }
  458.   tmpInserPtr += no_segs;
  459.   for(i = 0; i<no_segs; i++){
  460.     copy(tmpInserPtr, thePool, ptr[i]);
  461.   }
  462.   
  463.   if(checksumUsed){
  464.     * tmpInserPtr = computeChecksum(&insertPtr[0], len32-1);
  465.   }
  466. }