Transporter.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:5k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <TransporterRegistry.hpp>
  14. #include <TransporterCallback.hpp>
  15. #include "Transporter.hpp"
  16. #include "TransporterInternalDefinitions.hpp"
  17. #include <NdbSleep.h>
  18. #include <SocketAuthenticator.hpp>
  19. #include <InputStream.hpp>
  20. #include <OutputStream.hpp>
  21. #include <EventLogger.hpp>
  22. extern EventLogger g_eventLogger;
  23. Transporter::Transporter(TransporterRegistry &t_reg,
  24.  TransporterType _type,
  25.  const char *lHostName,
  26.  const char *rHostName, 
  27.  int r_port,
  28.  NodeId lNodeId,
  29.  NodeId rNodeId, 
  30.  int _byteorder, 
  31.  bool _compression, bool _checksum, bool _signalId)
  32.   : m_r_port(r_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
  33.     isServer(lNodeId < rNodeId),
  34.     m_packer(_signalId, _checksum),
  35.     m_type(_type),
  36.     m_transporter_registry(t_reg)
  37. {
  38.   DBUG_ENTER("Transporter::Transporter");
  39.   if (rHostName && strlen(rHostName) > 0){
  40.     strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
  41.     Ndb_getInAddr(&remoteHostAddress, rHostName);
  42.   }
  43.   else
  44.   {
  45.     if (!isServer) {
  46.       ndbout << "Unable to setup transporter. Node " << rNodeId 
  47.      << " must have hostname. Update configuration." << endl; 
  48.       exit(-1);
  49.     }
  50.     remoteHostName[0]= 0;
  51.   }
  52.   strncpy(localHostName, lHostName, sizeof(localHostName));
  53.   if (strlen(lHostName) > 0)
  54.     Ndb_getInAddr(&localHostAddress, lHostName);
  55.   DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s r_port=%d",
  56.      remoteNodeId, localNodeId, isServer,
  57.      remoteHostName, localHostName,
  58.      r_port));
  59.   byteOrder       = _byteorder;
  60.   compressionUsed = _compression;
  61.   checksumUsed    = _checksum;
  62.   signalIdUsed    = _signalId;
  63.   m_connected     = false;
  64.   m_timeOutMillis = 1000;
  65.   m_connect_address.s_addr= 0;
  66.   if (isServer)
  67.     m_socket_client= 0;
  68.   else
  69.     m_socket_client= new SocketClient(remoteHostName, r_port,
  70.       new SocketAuthSimple("ndbd",
  71.    "ndbd passwd"));
  72.   DBUG_VOID_RETURN;
  73. }
  74. Transporter::~Transporter(){
  75.   if (m_socket_client)
  76.     delete m_socket_client;
  77. }
  78. bool
  79. Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
  80.   // all initial negotiation is done in TransporterRegistry::connect_server
  81.   DBUG_ENTER("Transporter::connect_server");
  82.   if(m_connected)
  83.   {
  84.     DBUG_RETURN(true); // TODO assert(0);
  85.   }
  86.   
  87.   {
  88.     struct sockaddr addr;
  89.     SOCKET_SIZE_TYPE addrlen= sizeof(addr);
  90.     int r= getpeername(sockfd, &addr, &addrlen);
  91.     m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr;
  92.   }
  93.   bool res = connect_server_impl(sockfd);
  94.   if(res){
  95.     m_connected  = true;
  96.     m_errorCount = 0;
  97.   }
  98.   DBUG_RETURN(res);
  99. }
  100. bool
  101. Transporter::connect_client() {
  102.   if(m_connected)
  103.     return true;
  104.   NDB_SOCKET_TYPE sockfd = m_socket_client->connect();
  105.   
  106.   if (sockfd == NDB_INVALID_SOCKET)
  107.     return false;
  108.   DBUG_ENTER("Transporter::connect_client");
  109.   // send info about own id
  110.   // send info about own transporter type
  111.   SocketOutputStream s_output(sockfd);
  112.   s_output.println("%d %d", localNodeId, m_type);
  113.   // get remote id
  114.   int nodeId, remote_transporter_type= -1;
  115.   SocketInputStream s_input(sockfd);
  116.   char buf[256];
  117.   if (s_input.gets(buf, 256) == 0) {
  118.     NDB_CLOSE_SOCKET(sockfd);
  119.     DBUG_RETURN(false);
  120.   }
  121.   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
  122.   switch (r) {
  123.   case 2:
  124.     break;
  125.   case 1:
  126.     // we're running version prior to 4.1.9
  127.     // ok, but with no checks on transporter configuration compatability
  128.     break;
  129.   default:
  130.     NDB_CLOSE_SOCKET(sockfd);
  131.     DBUG_RETURN(false);
  132.   }
  133.   DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
  134.       nodeId, remote_transporter_type));
  135.   if (remote_transporter_type != -1)
  136.   {
  137.     if (remote_transporter_type != m_type)
  138.     {
  139.       DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
  140.    m_type, remote_transporter_type));
  141.       NDB_CLOSE_SOCKET(sockfd);
  142.       g_eventLogger.error("Incompatible configuration: transporter type "
  143.   "mismatch with node %d", nodeId);
  144.       DBUG_RETURN(false);
  145.     }
  146.   }
  147.   else if (m_type == tt_SHM_TRANSPORTER)
  148.   {
  149.     g_eventLogger.warning("Unable to verify transporter compatability with node %d", nodeId);
  150.   }
  151.   {
  152.     struct sockaddr addr;
  153.     SOCKET_SIZE_TYPE addrlen= sizeof(addr);
  154.     int r= getpeername(sockfd, &addr, &addrlen);
  155.     m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr;
  156.   }
  157.   bool res = connect_client_impl(sockfd);
  158.   if(res){
  159.     m_connected  = true;
  160.     m_errorCount = 0;
  161.   }
  162.   DBUG_RETURN(res);
  163. }
  164. void
  165. Transporter::doDisconnect() {
  166.   if(!m_connected)
  167.     return; //assert(0); TODO will fail
  168.   m_connected= false;
  169.   disconnectImpl();
  170. }