TCP_Transporter.hpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:5k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #ifndef TCP_TRANSPORTER_HPP
  14. #define TCP_TRANSPORTER_HPP
  15. #include "Transporter.hpp"
  16. #include "SendBuffer.hpp"
  17. #include <NdbTCP.h>
  18. struct ReceiveBuffer {
  19.   Uint32 *startOfBuffer;    // Pointer to start of the receive buffer 
  20.   Uint32 *readPtr;          // Pointer to start reading data
  21.   
  22.   char   *insertPtr;        // Pointer to first position in the receiveBuffer
  23.                             // in which to insert received data. Earlier
  24.                             // received incomplete messages (slack) are 
  25.                             // copied into the first part of the receiveBuffer
  26.   Uint32 sizeOfData;        // In bytes
  27.   Uint32 sizeOfBuffer;
  28.   
  29.   bool init(int bytes);
  30.   void destroy();
  31.   
  32.   void clear();
  33.   void incompleteMessage();
  34. };
  35. class TCP_Transporter : public Transporter {
  36.   friend class TransporterRegistry;
  37. private:
  38.   // Initialize member variables
  39.   TCP_Transporter(TransporterRegistry&,
  40.   int sendBufferSize, int maxReceiveSize,
  41.   const char *lHostName,
  42.   const char *rHostName, 
  43.   int r_port, 
  44.   NodeId lHostId,
  45.   NodeId rHostId,
  46.   bool checksum, bool signalId,
  47.   Uint32 reportFreq = 4096);
  48.   
  49.   // Disconnect, delete send buffers and receive buffer
  50.   virtual ~TCP_Transporter();
  51.   /**
  52.    * Allocate buffers for sending and receiving
  53.    */
  54.   bool initTransporter();
  55.   Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio);
  56.   void updateWritePtr(Uint32 lenBytes, Uint32 prio);
  57.   
  58.   bool hasDataToSend() const ;
  59.   /**
  60.    * Retrieves the contents of the send buffers and writes it on 
  61.    * the external TCP/IP interface until the send buffers are empty
  62.    * and as long as write is possible.
  63.    */
  64.   bool doSend();
  65.   
  66.   /**
  67.    * It reads the external TCP/IP interface once 
  68.    * and puts the data in the receiveBuffer
  69.    */
  70.   int doReceive(); 
  71.   /**
  72.    * Returns socket (used for select)
  73.    */
  74.   NDB_SOCKET_TYPE getSocket() const;
  75.   /**
  76.    * Get Receive Data
  77.    *
  78.    *  Returns - no of bytes to read
  79.    *            and set ptr
  80.    */
  81.   virtual Uint32 getReceiveData(Uint32 ** ptr);
  82.   
  83.   /**
  84.    * Update receive data ptr
  85.    */
  86.   virtual void updateReceiveDataPtr(Uint32 bytesRead);
  87.   virtual Uint32 get_free_buffer() const;
  88. protected:
  89.   /**
  90.    * Setup client/server and perform connect/accept
  91.    * Is used both by clients and servers
  92.    * A client connects to the remote server
  93.    * A server accepts any new connections
  94.    */
  95.   virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
  96.   virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
  97.   bool connect_common(NDB_SOCKET_TYPE sockfd);
  98.   
  99.   /**
  100.    * Disconnects a TCP/IP node. Empty send and receivebuffer.
  101.    */
  102.   virtual void disconnectImpl();
  103.   
  104. private:
  105.   /**
  106.    * Send buffers
  107.    */
  108.   SendBuffer m_sendBuffer;
  109.   
  110.   // Sending/Receiving socket used by both client and server
  111.   NDB_SOCKET_TYPE theSocket;   
  112.   
  113.   Uint32 maxReceiveSize;
  114.   
  115.   /**
  116.    * Socket options
  117.    */
  118.   int sockOptRcvBufSize;
  119.   int sockOptSndBufSize;
  120.   int sockOptNodelay;
  121.   int sockOptTcpMaxSeg;
  122.   void setSocketOptions();
  123.   static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket);
  124.   
  125.   bool sendIsPossible(struct timeval * timeout);
  126.   /**
  127.    * Statistics
  128.    */
  129.   Uint32 reportFreq;
  130.   Uint32 receiveCount;
  131.   Uint64 receiveSize;
  132.   Uint32 sendCount;
  133.   Uint64 sendSize;
  134.   ReceiveBuffer receiveBuffer;
  135. #if defined NDB_OSE || defined NDB_SOFTOSE
  136.   PROCESS theReceiverPid;
  137. #endif
  138. };
  139. inline
  140. NDB_SOCKET_TYPE
  141. TCP_Transporter::getSocket() const {
  142.   return theSocket;
  143. }
  144. inline
  145. Uint32
  146. TCP_Transporter::getReceiveData(Uint32 ** ptr){
  147.   (* ptr) = receiveBuffer.readPtr;
  148.   return receiveBuffer.sizeOfData;
  149. }
  150. inline
  151. void
  152. TCP_Transporter::updateReceiveDataPtr(Uint32 bytesRead){
  153.   char * ptr = (char *)receiveBuffer.readPtr;
  154.   ptr += bytesRead;
  155.   receiveBuffer.readPtr = (Uint32*)ptr;
  156.   receiveBuffer.sizeOfData -= bytesRead;
  157.   receiveBuffer.incompleteMessage();
  158. }
  159. inline
  160. bool
  161. TCP_Transporter::hasDataToSend() const {
  162.   return m_sendBuffer.dataSize > 0;
  163. }
  164. inline
  165. bool
  166. ReceiveBuffer::init(int bytes){
  167. #ifdef DEBUG_TRANSPORTER
  168.   ndbout << "Allocating " << bytes << " bytes as receivebuffer" << endl;
  169. #endif
  170.   startOfBuffer = new Uint32[((bytes + 0) >> 2) + 1];
  171.   sizeOfBuffer  = bytes + sizeof(Uint32);
  172.   clear();
  173.   return true;
  174. }
  175. inline
  176. void
  177. ReceiveBuffer::destroy(){
  178.   delete[] startOfBuffer;
  179.   sizeOfBuffer  = 0;
  180.   startOfBuffer = 0;
  181.   clear();
  182. }
  183. inline
  184. void
  185. ReceiveBuffer::clear(){
  186.   readPtr    = startOfBuffer;
  187.   insertPtr  = (char *)startOfBuffer;
  188.   sizeOfData = 0;
  189. }
  190. inline
  191. void
  192. ReceiveBuffer::incompleteMessage() {
  193.   if(startOfBuffer != readPtr){
  194.     if(sizeOfData != 0)
  195.       memmove(startOfBuffer, readPtr, sizeOfData);
  196.     readPtr   = startOfBuffer;
  197.     insertPtr = ((char *)startOfBuffer) + sizeOfData;
  198.   }
  199. }
  200. #endif // Define of TCP_Transporter_H