TCP_Transporter.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:12k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <NdbTCP.h>
  15. #include "TCP_Transporter.hpp"
  16. #include <NdbOut.hpp>
  17. #include <NdbSleep.h>
  18. // End of stuff to be moved
  19. #if defined NDB_OSE || defined NDB_SOFTOSE
  20. #define inet_send inet_send
  21. #else
  22. #define inet_send send
  23. #endif
  24. #ifdef NDB_WIN32
  25. class ndbstrerror
  26. {
  27. public:
  28.   ndbstrerror(int iError);
  29.   ~ndbstrerror(void);
  30.   operator char*(void) { return m_szError; };
  31. private:
  32.   int m_iError;
  33.   char* m_szError;
  34. };
  35. ndbstrerror::ndbstrerror(int iError)
  36. : m_iError(iError)
  37. {
  38.   FormatMessage( 
  39.     FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
  40.     0,
  41.     iError,
  42.     MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
  43.     (LPTSTR)&m_szError,
  44.     0,
  45.     0);
  46. }
  47. ndbstrerror::~ndbstrerror(void)
  48. {
  49.   LocalFree( m_szError );
  50.   m_szError = 0;
  51. }
  52. #else
  53. #define ndbstrerror strerror
  54. #endif
  55. TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
  56.  int sendBufSize, int maxRecvSize, 
  57.                                  const char *lHostName,
  58.                                  const char *rHostName, 
  59.                                  int r_port,
  60.  NodeId lNodeId,
  61.                                  NodeId rNodeId,
  62.                                  bool chksm, bool signalId,
  63.                                  Uint32 _reportFreq) :
  64.   Transporter(t_reg, tt_TCP_TRANSPORTER,
  65.       lHostName, rHostName, r_port, lNodeId, rNodeId,
  66.       0, false, chksm, signalId),
  67.   m_sendBuffer(sendBufSize)
  68. {
  69.   maxReceiveSize = maxRecvSize;
  70.   
  71.   // Initialize member variables
  72.   theSocket     = NDB_INVALID_SOCKET;
  73.   
  74.   sendCount      = receiveCount = 0;
  75.   sendSize       = receiveSize  = 0;
  76.   reportFreq     = _reportFreq;
  77.   sockOptRcvBufSize = 70080;
  78.   sockOptSndBufSize = 71540;
  79.   sockOptNodelay    = 1;
  80.   sockOptTcpMaxSeg  = 4096;
  81. }
  82. TCP_Transporter::~TCP_Transporter() {
  83.   
  84.   // Disconnect
  85.   if (theSocket != NDB_INVALID_SOCKET)
  86.     doDisconnect();
  87.   
  88.   // Delete send buffers
  89.   
  90.   // Delete receive buffer!!
  91.   receiveBuffer.destroy();
  92. }
  93. bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
  94. {
  95.   DBUG_ENTER("TCP_Transpporter::connect_server_impl");
  96.   DBUG_RETURN(connect_common(sockfd));
  97. }
  98. bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
  99. {
  100.   DBUG_ENTER("TCP_Transpporter::connect_client_impl");
  101.   DBUG_RETURN(connect_common(sockfd));
  102. }
  103. bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
  104. {
  105.   theSocket = sockfd;
  106.   setSocketOptions();
  107.   setSocketNonBlocking(theSocket);
  108.   DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d",
  109.               remoteNodeId));
  110.   return true;
  111. }
  112. bool
  113. TCP_Transporter::initTransporter() {
  114.   
  115.   // Allocate buffer for receiving
  116.   // Let it be the maximum size we receive plus 8 kB for any earlier received
  117.   // incomplete messages (slack)
  118.   Uint32 recBufSize = maxReceiveSize;
  119.   if(recBufSize < MAX_MESSAGE_SIZE){
  120.     recBufSize = MAX_MESSAGE_SIZE;
  121.   }
  122.   
  123.   if(!receiveBuffer.init(recBufSize+MAX_MESSAGE_SIZE)){
  124.     return false;
  125.   }
  126.   
  127.   // Allocate buffers for sending
  128.   if (!m_sendBuffer.initBuffer(remoteNodeId)) {
  129.     // XXX What shall be done here? 
  130.     // The same is valid for the other init-methods 
  131.     return false;
  132.   }
  133.   
  134.   return true;
  135. }
  136. void
  137. TCP_Transporter::setSocketOptions(){
  138.   if (setsockopt(theSocket, SOL_SOCKET, SO_RCVBUF,
  139.                  (char*)&sockOptRcvBufSize, sizeof(sockOptRcvBufSize)) < 0) {
  140. #ifdef DEBUG_TRANSPORTER
  141.     ndbout_c("The setsockopt SO_RCVBUF error code = %d", InetErrno);
  142. #endif
  143.   }//if
  144.   
  145.   if (setsockopt(theSocket, SOL_SOCKET, SO_SNDBUF,
  146.                  (char*)&sockOptSndBufSize, sizeof(sockOptSndBufSize)) < 0) {
  147. #ifdef DEBUG_TRANSPORTER
  148.     ndbout_c("The setsockopt SO_SNDBUF error code = %d", InetErrno);
  149. #endif
  150.   }//if
  151.   
  152.   //-----------------------------------------------
  153.   // Set the TCP_NODELAY option so also small packets are sent
  154.   // as soon as possible
  155.   //-----------------------------------------------
  156.   if (setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY, 
  157.                  (char*)&sockOptNodelay, sizeof(sockOptNodelay)) < 0) {
  158. #ifdef DEBUG_TRANSPORTER
  159.     ndbout_c("The setsockopt TCP_NODELAY error code = %d", InetErrno);
  160. #endif
  161.   }//if
  162. }
  163. #ifdef NDB_WIN32
  164. bool
  165. TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
  166.   unsigned long  ul = 1;
  167.   if(ioctlsocket(socket, FIONBIO, &ul))
  168.   {
  169. #ifdef DEBUG_TRANSPORTER
  170.     ndbout_c("Set non-blocking server error3: %d", InetErrno);
  171. #endif
  172.   }//if
  173.   return true;
  174. }
  175. #else
  176. bool
  177. TCP_Transporter::setSocketNonBlocking(NDB_SOCKET_TYPE socket){
  178.   int flags;
  179.   flags = fcntl(socket, F_GETFL, 0);
  180.   if (flags < 0) {
  181. #ifdef DEBUG_TRANSPORTER
  182.     ndbout_c("Set non-blocking server error1: %s", strerror(InetErrno));
  183. #endif
  184.   }//if
  185.   flags |= NDB_NONBLOCK;
  186.   if (fcntl(socket, F_SETFL, flags) == -1) {
  187. #ifdef DEBUG_TRANSPORTER
  188.     ndbout_c("Set non-blocking server error2: %s", strerror(InetErrno));
  189. #endif
  190.   }//if
  191.   return true;
  192. }
  193. #endif
  194. bool
  195. TCP_Transporter::sendIsPossible(struct timeval * timeout) {
  196. #ifdef NDB_OSE
  197.   /**
  198.    * In OSE you cant do select without owning a socket,
  199.    * and since this method might be called by any thread in the api
  200.    * we choose not to implementet and always return true after sleeping
  201.    * a while.
  202.    *
  203.    * Note that this only sensible as long as the sockets are non blocking
  204.    */
  205.   if(theSocket >= 0){
  206.     Uint32 timeOutMillis = timeout->tv_sec * 1000 + timeout->tv_usec / 1000;
  207.     NdbSleep_MilliSleep(timeOutMillis);
  208.     return true;
  209.   }
  210.   return false;
  211. #else
  212.   if(theSocket != NDB_INVALID_SOCKET){
  213.     fd_set   writeset;
  214.     FD_ZERO(&writeset);
  215.     FD_SET(theSocket, &writeset);
  216.     
  217.     int selectReply = select(theSocket + 1, NULL, &writeset, NULL, timeout);
  218.     if ((selectReply > 0) && FD_ISSET(theSocket, &writeset)) 
  219.       return true;
  220.     else
  221.       return false;
  222.   }
  223.   return false;
  224. #endif
  225. }
  226. Uint32
  227. TCP_Transporter::get_free_buffer() const 
  228. {
  229.   return m_sendBuffer.bufferSizeRemaining();
  230. }
  231. Uint32 *
  232. TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
  233.   
  234.   Uint32 * insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
  235.   
  236.   struct timeval timeout = {0, 10000};
  237.   if (insertPtr == 0) {
  238.     //-------------------------------------------------
  239.     // Buffer was completely full. We have severe problems.
  240.     // We will attempt to wait for a small time
  241.     //-------------------------------------------------
  242.     if(sendIsPossible(&timeout)) {
  243.       //-------------------------------------------------
  244.       // Send is possible after the small timeout.
  245.       //-------------------------------------------------
  246.       if(!doSend()){
  247. return 0;
  248.       } else {
  249. //-------------------------------------------------
  250. // Since send was successful we will make a renewed
  251. // attempt at inserting the signal into the buffer.
  252. //-------------------------------------------------
  253.         insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
  254.       }//if
  255.     } else {
  256.       return 0;
  257.     }//if
  258.   }
  259.   return insertPtr;
  260. }
  261. void
  262. TCP_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
  263.   m_sendBuffer.updateInsertPtr(lenBytes);
  264.   
  265.   const int bufsize = m_sendBuffer.bufferSize();
  266.   if(bufsize > TCP_SEND_LIMIT) {
  267.     //-------------------------------------------------
  268.     // Buffer is full and we are ready to send. We will
  269.     // not wait since the signal is already in the buffer.
  270.     // Force flag set has the same indication that we
  271.     // should always send. If it is not possible to send
  272.     // we will not worry since we will soon be back for
  273.     // a renewed trial.
  274.     //-------------------------------------------------
  275.     struct timeval no_timeout = {0,0};
  276.     if(sendIsPossible(&no_timeout)) {
  277.       //-------------------------------------------------
  278.       // Send was possible, attempt at a send.
  279.       //-------------------------------------------------
  280.       doSend();
  281.     }//if
  282.   }
  283. }
  284. #define DISCONNECT_ERRNO(e, sz) ((sz == 0) || 
  285.                (!((sz == -1) && (e == EAGAIN) || (e == EWOULDBLOCK) || (e == EINTR))))
  286. bool
  287. TCP_Transporter::doSend() {
  288.   // If no sendbuffers are used nothing is done
  289.   // Sends the contents of the SendBuffers until they are empty
  290.   // or until select does not select the socket for write.
  291.   // Before calling send, the socket must be selected for write
  292.   // using "select"
  293.   // It writes on the external TCP/IP interface until the send buffer is empty
  294.   // and as long as write is possible (test it using select)
  295.   // Empty the SendBuffers
  296.   
  297.   const char * const sendPtr = m_sendBuffer.sendPtr;
  298.   const Uint32 sizeToSend    = m_sendBuffer.sendDataSize;
  299.   if (sizeToSend > 0){
  300.     const int nBytesSent = inet_send(theSocket, sendPtr, sizeToSend, 0);
  301.     
  302.     if (nBytesSent > 0) {
  303.       m_sendBuffer.bytesSent(nBytesSent);
  304.       
  305.       sendCount ++;
  306.       sendSize  += nBytesSent;
  307.       if(sendCount == reportFreq){
  308. reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
  309. sendCount = 0;
  310. sendSize  = 0;
  311.       }
  312.     } else {
  313.       // Send failed
  314. #if defined DEBUG_TRANSPORTER
  315.       ndbout_c("Send Failure(disconnect==%d) to node = %d nBytesSent = %d "
  316.        "errno = %d strerror = %s",
  317.        DISCONNECT_ERRNO(InetErrno, nBytesSent),
  318.        remoteNodeId, nBytesSent, InetErrno, 
  319.        (char*)ndbstrerror(InetErrno));
  320. #endif   
  321.       if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
  322. doDisconnect();
  323. report_disconnect(InetErrno);
  324.       }
  325.       
  326.       return false;
  327.     }
  328.   }
  329.   return true;
  330. }
  331. int
  332. TCP_Transporter::doReceive() {
  333.   // Select-function must return the socket for read
  334.   // before this method is called
  335.   // It reads the external TCP/IP interface once
  336.   Uint32 size = receiveBuffer.sizeOfBuffer - receiveBuffer.sizeOfData;
  337.   if(size > 0){
  338.     const int nBytesRead = recv(theSocket, 
  339. receiveBuffer.insertPtr, 
  340. size < maxReceiveSize ? size : maxReceiveSize, 
  341. 0);
  342.     
  343.     if (nBytesRead > 0) {
  344.       receiveBuffer.sizeOfData += nBytesRead;
  345.       receiveBuffer.insertPtr  += nBytesRead;
  346.       
  347.       if(receiveBuffer.sizeOfData > receiveBuffer.sizeOfBuffer){
  348. #ifdef DEBUG_TRANSPORTER
  349. ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
  350.  receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
  351. ndbout_c("nBytesRead = %d", nBytesRead);
  352. #endif
  353. ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
  354.  receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
  355. report_error(TE_INVALID_MESSAGE_LENGTH);
  356. return 0;
  357.       }
  358.       
  359.       receiveCount ++;
  360.       receiveSize  += nBytesRead;
  361.       
  362.       if(receiveCount == reportFreq){
  363. reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
  364. receiveCount = 0;
  365. receiveSize  = 0;
  366.       }
  367.       return nBytesRead;
  368.     } else {
  369. #if defined DEBUG_TRANSPORTER
  370.       ndbout_c("Receive Failure(disconnect==%d) to node = %d nBytesSent = %d "
  371.        "errno = %d strerror = %s",
  372.        DISCONNECT_ERRNO(InetErrno, nBytesRead),
  373.        remoteNodeId, nBytesRead, InetErrno, 
  374.        (char*)ndbstrerror(InetErrno));
  375. #endif   
  376.       if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
  377. // The remote node has closed down
  378. doDisconnect();
  379. report_disconnect(InetErrno);
  380.       } 
  381.     }
  382.     return nBytesRead;
  383.   } else {
  384.     return 0;
  385.   }
  386. }
  387. void
  388. TCP_Transporter::disconnectImpl() {
  389.   if(theSocket != NDB_INVALID_SOCKET){
  390.     if(NDB_CLOSE_SOCKET(theSocket) < 0){
  391.       report_error(TE_ERROR_CLOSING_SOCKET);
  392.     }
  393.   }
  394.   
  395.   // Empty send och receive buffers 
  396.   receiveBuffer.clear();
  397.   m_sendBuffer.emptyBuffer();
  398.   theSocket = NDB_INVALID_SOCKET;
  399. }