TransporterRegistry.hpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:9k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. //****************************************************************************
  14. //
  15. //  NAME
  16. //      TransporterRegistry
  17. //
  18. //  DESCRIPTION
  19. //      TransporterRegistry (singelton) is the interface to the 
  20. //      transporter layer. It handles transporter states and 
  21. //      holds the transporter arrays.
  22. //
  23. //***************************************************************************/
  24. #ifndef TransporterRegistry_H
  25. #define TransporterRegistry_H
  26. #include "TransporterDefinitions.hpp"
  27. #include <SocketServer.hpp>
  28. #include <NdbTCP.h>
  29. // A transporter is always in an IOState.
  30. // NoHalt is used initially and as long as it is no restrictions on
  31. // sending or receiving.
  32. enum IOState {
  33.   NoHalt     = 0,
  34.   HaltInput  = 1,
  35.   HaltOutput = 2,
  36.   HaltIO     = 3
  37. };
  38. enum TransporterType {
  39.   tt_TCP_TRANSPORTER = 1,
  40.   tt_SCI_TRANSPORTER = 2,
  41.   tt_SHM_TRANSPORTER = 3,
  42.   tt_OSE_TRANSPORTER = 4
  43. };
  44. static const char *performStateString[] = 
  45.   { "is connected",
  46.     "is trying to connect",
  47.     "does nothing",
  48.     "is trying to disconnect" };
  49. class Transporter;
  50. class TCP_Transporter;
  51. class SCI_Transporter;
  52. class SHM_Transporter;
  53. class OSE_Transporter;
  54. class TransporterRegistry;
  55. class SocketAuthenticator;
  56. class TransporterService : public SocketServer::Service {
  57.   SocketAuthenticator * m_auth;
  58.   TransporterRegistry * m_transporter_registry;
  59. public:
  60.   TransporterService(SocketAuthenticator *auth= 0)
  61.   {
  62.     m_auth= auth;
  63.     m_transporter_registry= 0;
  64.   }
  65.   void setTransporterRegistry(TransporterRegistry *t)
  66.   {
  67.     m_transporter_registry= t;
  68.   }
  69.   SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
  70. };
  71. /**
  72.  * @class TransporterRegistry
  73.  * @brief ...
  74.  */
  75. class TransporterRegistry {
  76.   friend class OSE_Receiver;
  77.   friend class SHM_Transporter;
  78.   friend class Transporter;
  79.   friend class TransporterService;
  80. public:
  81.  /**
  82.   * Constructor
  83.   */
  84.   TransporterRegistry(void * callback = 0 , 
  85.       unsigned maxTransporters = MAX_NTRANSPORTERS, 
  86.       unsigned sizeOfLongSignalMemory = 100);
  87.   
  88.   bool init(NodeId localNodeId);
  89.   /**
  90.    * after a connect from client, perform connection using correct transporter
  91.    */
  92.   bool connect_server(NDB_SOCKET_TYPE sockfd);
  93.   /**
  94.    * Remove all transporters
  95.    */
  96.   void removeAll();
  97.   
  98.   /**
  99.    * Disconnect all transporters
  100.    */
  101.   void disconnectAll();
  102.   /**
  103.    * Stops the server, disconnects all the transporter 
  104.    * and deletes them and remove it from the transporter arrays
  105.    */
  106.   ~TransporterRegistry();
  107.   bool start_service(SocketServer& server);
  108.   bool start_clients();
  109.   bool stop_clients();
  110.   void start_clients_thread();
  111.   void update_connections();
  112.   /**
  113.    * Start/Stop receiving
  114.    */
  115.   void startReceiving();
  116.   void stopReceiving();
  117.   
  118.   /**
  119.    * Start/Stop sending
  120.    */
  121.   void startSending();
  122.   void stopSending();
  123.   // A transporter is always in a PerformState.
  124.   // PerformIO is used initially and as long as any of the events 
  125.   // PerformConnect, ... 
  126.   enum PerformState {
  127.     CONNECTED         = 0,
  128.     CONNECTING        = 1,
  129.     DISCONNECTED      = 2,
  130.     DISCONNECTING     = 3
  131.   };
  132.   const char *getPerformStateString(NodeId nodeId) const
  133.   { return performStateString[(unsigned)performStates[nodeId]]; };
  134.   /**
  135.    * Get and set methods for PerformState
  136.    */
  137.   void do_connect(NodeId node_id);
  138.   void do_disconnect(NodeId node_id);
  139.   bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; };
  140.   void report_connect(NodeId node_id);
  141.   void report_disconnect(NodeId node_id, int errnum);
  142.   
  143.   /**
  144.    * Get and set methods for IOState
  145.    */
  146.   IOState ioState(NodeId nodeId);
  147.   void setIOState(NodeId nodeId, IOState state);
  148.   /** 
  149.    * createTransporter
  150.    *
  151.    * If the config object indicates that the transporter
  152.    * to be created will act as a server and no server is
  153.    * started, startServer is called. A transporter of the selected kind
  154.    * is created and it is put in the transporter arrays.
  155.    */
  156.   bool createTransporter(struct TCP_TransporterConfiguration * config);
  157.   bool createTransporter(struct SCI_TransporterConfiguration * config);
  158.   bool createTransporter(struct SHM_TransporterConfiguration * config);
  159.   bool createTransporter(struct OSE_TransporterConfiguration * config);
  160.   /**
  161.    * Get free buffer space
  162.    *
  163.    *   Get #free bytes in send buffer for <em>node</node>
  164.    */
  165.   Uint32 get_free_buffer(Uint32 node) const ;
  166.   
  167.   /**
  168.    * prepareSend
  169.    *
  170.    * When IOState is HaltOutput or HaltIO do not send or insert any 
  171.    * signals in the SendBuffer, unless it is intended for the remote 
  172.    * CMVMI block (blockno 252)
  173.    * Perform prepareSend on the transporter. 
  174.    *
  175.    * NOTE signalHeader->xxxBlockRef should contain block numbers and 
  176.    *                                not references
  177.    */
  178.   SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
  179.  const Uint32 * const signalData,
  180.  NodeId nodeId, 
  181.  const LinearSectionPtr ptr[3]);
  182.   SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
  183.  const Uint32 * const signalData,
  184.  NodeId nodeId, 
  185.  class SectionSegmentPool & pool,
  186.  const SegmentedSectionPtr ptr[3]);
  187.   
  188.   /**
  189.    * external_IO
  190.    *
  191.    * Equal to: poll(...); perform_IO()
  192.    *
  193.    */
  194.   void external_IO(Uint32 timeOutMillis);
  195.   
  196.   Uint32 pollReceive(Uint32 timeOutMillis);
  197.   void performReceive();
  198.   void performSend();
  199.   
  200.   /**
  201.    * Force sending if more than or equal to sendLimit
  202.    * number have asked for send. Returns 0 if not sending
  203.    * and 1 if sending.
  204.    */
  205.   int forceSendCheck(int sendLimit);
  206.   
  207. #ifdef DEBUG_TRANSPORTER
  208.   void printState();
  209. #endif
  210.   
  211.   class Transporter_interface {
  212.   public:
  213.     unsigned short m_service_port;
  214.     const char *m_interface;
  215.   };
  216.   Vector<Transporter_interface> m_transporter_interface;
  217.   void add_transporter_interface(const char *interf, unsigned short port);
  218.   struct in_addr get_connect_address(NodeId node_id) const;
  219. protected:
  220.   
  221. private:
  222.   void * callbackObj;
  223.   struct NdbThread   *m_start_clients_thread;
  224.   bool                m_run_start_clients_thread;
  225.   int sendCounter;
  226.   NodeId localNodeId;
  227.   bool nodeIdSpecified;
  228.   unsigned maxTransporters;
  229.   int nTransporters;
  230.   int nTCPTransporters;
  231.   int nSCITransporters;
  232.   int nSHMTransporters;
  233.   int nOSETransporters;
  234.   /**
  235.    * Arrays holding all transporters in the order they are created
  236.    */
  237.   TCP_Transporter** theTCPTransporters;
  238.   SCI_Transporter** theSCITransporters;
  239.   SHM_Transporter** theSHMTransporters;
  240.   OSE_Transporter** theOSETransporters;
  241.   
  242.   /**
  243.    * Array, indexed by nodeId, holding all transporters
  244.    */
  245.   TransporterType* theTransporterTypes;
  246.   Transporter**    theTransporters;
  247.   /**
  248.    * OSE Receiver
  249.    */
  250.   class OSE_Receiver * theOSEReceiver;
  251.   /**
  252.    * In OSE you for some bizar reason needs to create a socket
  253.    *  the first thing you do when using inet functions.
  254.    *
  255.    * Furthermore a process doing select has to "own" a socket
  256.    * 
  257.    */  
  258.   int theOSEJunkSocketSend;
  259.   int theOSEJunkSocketRecv;
  260. #if defined NDB_OSE || defined NDB_SOFTOSE
  261.   PROCESS theReceiverPid;
  262. #endif
  263.   
  264.   /** 
  265.    * State arrays, index by host id
  266.    */
  267.   PerformState* performStates;
  268.   IOState*      ioStates;
  269.  
  270.   /**
  271.    * Unpack signal data
  272.    */
  273.   Uint32 unpack(Uint32 * readPtr,
  274. Uint32 bufferSize,
  275. NodeId remoteNodeId, 
  276. IOState state);
  277.   
  278.   Uint32 * unpack(Uint32 * readPtr,
  279.   Uint32 * eodPtr,
  280.   NodeId remoteNodeId,
  281.   IOState state);
  282.   /** 
  283.    * Disconnect the transporter and remove it from 
  284.    * theTransporters array. Do not allow any holes 
  285.    * in theTransporters. Delete the transporter 
  286.    * and remove it from theIndexedTransporters array
  287.    */
  288.   void removeTransporter(NodeId nodeId);
  289.   
  290.   /**
  291.    * Used in polling if exists TCP_Transporter
  292.    */
  293.   int tcpReadSelectReply;
  294.   fd_set tcpReadset;
  295.   
  296.   Uint32 poll_OSE(Uint32 timeOutMillis);
  297.   Uint32 poll_TCP(Uint32 timeOutMillis);
  298.   Uint32 poll_SCI(Uint32 timeOutMillis);
  299.   Uint32 poll_SHM(Uint32 timeOutMillis);
  300.   int m_shm_own_pid;
  301. };
  302. #endif // Define of TransporterRegistry_H