TCP_Transporter.hpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:5k
源码类别:
MySQL数据库
开发平台:
Visual C++
- /* Copyright (C) 2003 MySQL AB
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
- #ifndef TCP_TRANSPORTER_HPP
- #define TCP_TRANSPORTER_HPP
- #include "Transporter.hpp"
- #include "SendBuffer.hpp"
- #include <NdbTCP.h>
- struct ReceiveBuffer {
- Uint32 *startOfBuffer; // Pointer to start of the receive buffer
- Uint32 *readPtr; // Pointer to start reading data
- char *insertPtr; // Pointer to first position in the receiveBuffer
- // in which to insert received data. Earlier
- // received incomplete messages (slack) are
- // copied into the first part of the receiveBuffer
- Uint32 sizeOfData; // In bytes
- Uint32 sizeOfBuffer;
- bool init(int bytes);
- void destroy();
- void clear();
- void incompleteMessage();
- };
- class TCP_Transporter : public Transporter {
- friend class TransporterRegistry;
- private:
- // Initialize member variables
- TCP_Transporter(TransporterRegistry&,
- int sendBufferSize, int maxReceiveSize,
- const char *lHostName,
- const char *rHostName,
- int r_port,
- NodeId lHostId,
- NodeId rHostId,
- bool checksum, bool signalId,
- Uint32 reportFreq = 4096);
- // Disconnect, delete send buffers and receive buffer
- virtual ~TCP_Transporter();
- /**
- * Allocate buffers for sending and receiving
- */
- bool initTransporter();
- Uint32 * getWritePtr(Uint32 lenBytes, Uint32 prio);
- void updateWritePtr(Uint32 lenBytes, Uint32 prio);
- bool hasDataToSend() const ;
- /**
- * Retrieves the contents of the send buffers and writes it on
- * the external TCP/IP interface until the send buffers are empty
- * and as long as write is possible.
- */
- bool doSend();
- /**
- * It reads the external TCP/IP interface once
- * and puts the data in the receiveBuffer
- */
- int doReceive();
- /**
- * Returns socket (used for select)
- */
- NDB_SOCKET_TYPE getSocket() const;
- /**
- * Get Receive Data
- *
- * Returns - no of bytes to read
- * and set ptr
- */
- virtual Uint32 getReceiveData(Uint32 ** ptr);
- /**
- * Update receive data ptr
- */
- virtual void updateReceiveDataPtr(Uint32 bytesRead);
- virtual Uint32 get_free_buffer() const;
- protected:
- /**
- * Setup client/server and perform connect/accept
- * Is used both by clients and servers
- * A client connects to the remote server
- * A server accepts any new connections
- */
- virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
- virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
- bool connect_common(NDB_SOCKET_TYPE sockfd);
- /**
- * Disconnects a TCP/IP node. Empty send and receivebuffer.
- */
- virtual void disconnectImpl();
- private:
- /**
- * Send buffers
- */
- SendBuffer m_sendBuffer;
- // Sending/Receiving socket used by both client and server
- NDB_SOCKET_TYPE theSocket;
- Uint32 maxReceiveSize;
- /**
- * Socket options
- */
- int sockOptRcvBufSize;
- int sockOptSndBufSize;
- int sockOptNodelay;
- int sockOptTcpMaxSeg;
- void setSocketOptions();
- static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket);
- bool sendIsPossible(struct timeval * timeout);
- /**
- * Statistics
- */
- Uint32 reportFreq;
- Uint32 receiveCount;
- Uint64 receiveSize;
- Uint32 sendCount;
- Uint64 sendSize;
- ReceiveBuffer receiveBuffer;
- #if defined NDB_OSE || defined NDB_SOFTOSE
- PROCESS theReceiverPid;
- #endif
- };
- inline
- NDB_SOCKET_TYPE
- TCP_Transporter::getSocket() const {
- return theSocket;
- }
- inline
- Uint32
- TCP_Transporter::getReceiveData(Uint32 ** ptr){
- (* ptr) = receiveBuffer.readPtr;
- return receiveBuffer.sizeOfData;
- }
- inline
- void
- TCP_Transporter::updateReceiveDataPtr(Uint32 bytesRead){
- char * ptr = (char *)receiveBuffer.readPtr;
- ptr += bytesRead;
- receiveBuffer.readPtr = (Uint32*)ptr;
- receiveBuffer.sizeOfData -= bytesRead;
- receiveBuffer.incompleteMessage();
- }
- inline
- bool
- TCP_Transporter::hasDataToSend() const {
- return m_sendBuffer.dataSize > 0;
- }
- inline
- bool
- ReceiveBuffer::init(int bytes){
- #ifdef DEBUG_TRANSPORTER
- ndbout << "Allocating " << bytes << " bytes as receivebuffer" << endl;
- #endif
- startOfBuffer = new Uint32[((bytes + 0) >> 2) + 1];
- sizeOfBuffer = bytes + sizeof(Uint32);
- clear();
- return true;
- }
- inline
- void
- ReceiveBuffer::destroy(){
- delete[] startOfBuffer;
- sizeOfBuffer = 0;
- startOfBuffer = 0;
- clear();
- }
- inline
- void
- ReceiveBuffer::clear(){
- readPtr = startOfBuffer;
- insertPtr = (char *)startOfBuffer;
- sizeOfData = 0;
- }
- inline
- void
- ReceiveBuffer::incompleteMessage() {
- if(startOfBuffer != readPtr){
- if(sizeOfData != 0)
- memmove(startOfBuffer, readPtr, sizeOfData);
- readPtr = startOfBuffer;
- insertPtr = ((char *)startOfBuffer) + sizeOfData;
- }
- }
- #endif // Define of TCP_Transporter_H