TransporterFacade.hpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:9k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #ifndef TransporterFacade_H
  14. #define TransporterFacade_H
  15. #include <kernel_types.h>
  16. #include <ndb_limits.h>
  17. #include <NdbThread.h>
  18. #include <TransporterRegistry.hpp>
  19. #include <NdbMutex.h>
  20. #include "DictCache.hpp"
  21. #include <BlockNumbers.h>
  22. class ClusterMgr;
  23. class ArbitMgr;
  24. class IPCConfig;
  25. struct ndb_mgm_configuration;
  26. class ConfigRetriever;
  27. class Ndb;
  28. class NdbApiSignal;
  29. typedef void (* ExecuteFunction)(void *, NdbApiSignal *, LinearSectionPtr ptr[3]);
  30. typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
  31. extern "C" {
  32.   void* runSendRequest_C(void*);
  33.   void* runReceiveResponse_C(void*);
  34.   void atexit_stop_instance();
  35. }
  36. /**
  37.  * Max number of Ndb objects in different threads.  
  38.  * (Ndb objects should not be shared by different threads.)
  39.  */
  40. class TransporterFacade
  41. {
  42. public:
  43.   TransporterFacade();
  44.   virtual ~TransporterFacade();
  45.   bool init(Uint32, const ndb_mgm_configuration *);
  46.   static TransporterFacade* instance();
  47.   int start_instance(int, const ndb_mgm_configuration*);
  48.   static void stop_instance();
  49.   
  50.   /**
  51.    * Register this block for sending/receiving signals
  52.    * @return BlockNumber or -1 for failure
  53.    */
  54.   int open(void* objRef, ExecuteFunction, NodeStatusFunction);
  55.   
  56.   // Close this block number
  57.   int close(BlockNumber blockNumber, Uint64 trans_id);
  58.   // Only sends to nodes which are alive
  59.   int sendSignal(NdbApiSignal * signal, NodeId nodeId);
  60.   int sendSignal(NdbApiSignal*, NodeId, 
  61.  LinearSectionPtr ptr[3], Uint32 secs);
  62.   int sendFragmentedSignal(NdbApiSignal*, NodeId, 
  63.    LinearSectionPtr ptr[3], Uint32 secs);
  64.   // Is node available for running transactions
  65.   bool   get_node_alive(NodeId nodeId) const;
  66.   bool   get_node_stopping(NodeId nodeId) const;
  67.   bool   getIsDbNode(NodeId nodeId) const;
  68.   bool   getIsNodeSendable(NodeId nodeId) const;
  69.   Uint32 getNodeGrp(NodeId nodeId) const;
  70.   Uint32 getNodeSequence(NodeId nodeId) const;
  71.   // Is there space in sendBuffer to send messages
  72.   bool   check_send_size(Uint32 node_id, Uint32 send_size);
  73.   // My own processor id
  74.   NodeId ownId() const;
  75.   void connected();
  76.   void doConnect(int NodeId);
  77.   void reportConnected(int NodeId);
  78.   void doDisconnect(int NodeId);
  79.   void reportDisconnected(int NodeId);
  80.   NodeId get_an_alive_node();
  81.   void ReportNodeAlive(NodeId nodeId);
  82.   void ReportNodeDead(NodeId nodeId);
  83.   void ReportNodeFailureComplete(NodeId nodeId);
  84.   
  85.   void lock_mutex();
  86.   void unlock_mutex();
  87.   // Improving the API performance
  88.   void forceSend(Uint32 block_number);
  89.   void checkForceSend(Uint32 block_number);
  90.   // Close this block number
  91.   int close_local(BlockNumber blockNumber);
  92.   // Scan batch configuration parameters
  93.   Uint32 get_scan_batch_size();
  94.   Uint32 get_batch_byte_size();
  95.   Uint32 get_batch_size();
  96. private:
  97.   /**
  98.    * Send a signal unconditional of node status (used by ClusterMgr)
  99.    */
  100.   friend class ClusterMgr;
  101.   friend class ArbitMgr;
  102.   friend class MgmtSrvr;
  103.   friend class SignalSender;
  104.   friend class GrepPS;
  105.   friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
  106.   friend class GrepSS;
  107.   friend class Ndb;
  108.   friend class Ndb_cluster_connection_impl;
  109.   int sendSignalUnCond(NdbApiSignal *, NodeId nodeId);
  110.   bool isConnected(NodeId aNodeId);
  111.   void doStop();
  112.   
  113.   TransporterRegistry* theTransporterRegistry;
  114.   SocketServer m_socket_server;
  115.   int sendPerformedLastInterval;
  116.   int theOwnId;
  117.   NodeId theStartNodeId;
  118.   ClusterMgr* theClusterMgr;
  119.   ArbitMgr* theArbitMgr;
  120.   
  121.   // Improving the API response time
  122.   int checkCounter;
  123.   Uint32 currentSendLimit;
  124.   
  125.   void calculateSendLimit();
  126.   // Scan batch configuration parameters
  127.   Uint32 m_scan_batch_size;
  128.   Uint32 m_batch_byte_size;
  129.   Uint32 m_batch_size;
  130.   // Declarations for the receive and send thread
  131.   int  theStopReceive;
  132.   void threadMainSend(void);
  133.   NdbThread* theSendThread;
  134.   void threadMainReceive(void);
  135.   NdbThread* theReceiveThread;
  136.   friend void* runSendRequest_C(void*);
  137.   friend void* runReceiveResponse_C(void*);
  138.   friend void atexit_stop_instance();
  139.   /**
  140.    * Block number handling
  141.    */
  142. public:
  143.   STATIC_CONST( MAX_NO_THREADS = 4711 );
  144. private:
  145.   struct ThreadData {
  146.     STATIC_CONST( ACTIVE = (1 << 16) | 1 );
  147.     STATIC_CONST( INACTIVE = (1 << 16) );
  148.     STATIC_CONST( END_OF_LIST = MAX_NO_THREADS + 1 );
  149.     
  150.     ThreadData(Uint32 initialSize = 32);
  151.     
  152.     /**
  153.      * Split "object" into 3 list
  154.      *   This to improve locality
  155.      *   when iterating over lists
  156.      */
  157.     struct Object_Execute {
  158.       void * m_object;
  159.       ExecuteFunction m_executeFunction;
  160.     };
  161.     struct NodeStatus_NextFree {
  162.       NodeStatusFunction m_statusFunction;
  163.     };
  164.     Uint32 m_firstFree;
  165.     Vector<Uint32> m_statusNext;
  166.     Vector<Object_Execute> m_objectExecute;
  167.     Vector<NodeStatusFunction> m_statusFunction;
  168.     
  169.     int open(void* objRef, ExecuteFunction, NodeStatusFunction);
  170.     int close(int number);
  171.     void expand(Uint32 size);
  172.     inline Object_Execute get(Uint16 blockNo) const {
  173.       blockNo -= MIN_API_BLOCK_NO;
  174.       if(blockNo < m_objectExecute.size()){
  175. return m_objectExecute[blockNo];
  176.       }
  177.       Object_Execute oe = { 0, 0 };
  178.       return oe;
  179.     }
  180.     /**
  181.      * Is the block number used currently
  182.      */
  183.     inline bool getInUse(Uint16 index) const {
  184.       return (m_statusNext[index] & (1 << 16)) != 0;
  185.     }
  186.   } m_threads;
  187.   
  188.   Uint32 m_max_trans_id;
  189.   Uint32 m_fragmented_signal_id;
  190.   /**
  191.    * execute function
  192.    */
  193.   friend void execute(void * callbackObj, SignalHeader * const header, 
  194.                       Uint8 prio, 
  195.                       Uint32 * const theData, LinearSectionPtr ptr[3]);
  196.   
  197. public:
  198.   NdbMutex* theMutexPtr;
  199. private:
  200.   static TransporterFacade* theFacadeInstance;
  201. public:
  202.   GlobalDictCache m_globalDictCache;
  203. };
  204. inline
  205. TransporterFacade*
  206. TransporterFacade::instance()
  207. {
  208.   return theFacadeInstance;
  209. }
  210. inline
  211. void 
  212. TransporterFacade::lock_mutex()
  213. {
  214.   NdbMutex_Lock(theMutexPtr);
  215. }
  216. inline
  217. void 
  218. TransporterFacade::unlock_mutex()
  219. {
  220.   NdbMutex_Unlock(theMutexPtr);
  221. }
  222. #include "ClusterMgr.hpp"
  223. inline
  224. bool
  225. TransporterFacade::check_send_size(Uint32 node_id, Uint32 send_size)
  226. {
  227.   return true;
  228. }
  229. inline
  230. bool
  231. TransporterFacade::getIsDbNode(NodeId n) const {
  232.   return 
  233.     theClusterMgr->getNodeInfo(n).defined && 
  234.     theClusterMgr->getNodeInfo(n).m_info.m_type == NodeInfo::DB;
  235. }
  236. inline
  237. Uint32
  238. TransporterFacade::getNodeGrp(NodeId n) const {
  239.   return theClusterMgr->getNodeInfo(n).m_state.nodeGroup;
  240. }
  241. inline
  242. bool
  243. TransporterFacade::get_node_alive(NodeId n) const {
  244.   const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);
  245.   return node.m_alive;
  246. }
  247. inline
  248. bool
  249. TransporterFacade::get_node_stopping(NodeId n) const {
  250.   const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);
  251.   return ((node.m_state.startLevel == NodeState::SL_STOPPING_1) ||
  252.           (node.m_state.startLevel == NodeState::SL_STOPPING_2));
  253. }
  254. inline
  255. bool
  256. TransporterFacade::getIsNodeSendable(NodeId n) const {
  257.   const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(n);
  258.   const Uint32 startLevel = node.m_state.startLevel;
  259.   if (node.m_info.m_type == NodeInfo::DB) {
  260.     if(node.m_state.singleUserMode && 
  261.        ownId() == node.m_state.singleUserApi) {
  262.       return (node.compatible && 
  263.               (node.m_state.startLevel == NodeState::SL_STOPPING_1 ||
  264.                node.m_state.startLevel == NodeState::SL_STARTED ||
  265.                node.m_state.startLevel == NodeState::SL_SINGLEUSER));
  266.       }
  267.       else
  268.         return node.compatible && (startLevel == NodeState::SL_STARTED ||
  269.                                  startLevel == NodeState::SL_STOPPING_1);
  270.   } else if (node.m_info.m_type == NodeInfo::REP) {
  271.     /**
  272.      * @todo Check that REP node actually has received API_REG_REQ
  273.      */
  274.     return node.compatible;
  275.   } else {
  276.     ndbout_c("TransporterFacade::getIsNodeSendable: Illegal node type: "
  277.              "%d of node: %d", 
  278.              node.m_info.m_type, n);
  279.     abort();
  280.     return false; // to remove compiler warning
  281.   }
  282. }
  283. inline
  284. Uint32
  285. TransporterFacade::getNodeSequence(NodeId n) const {
  286.   return theClusterMgr->getNodeInfo(n).m_info.m_connectCount;
  287. }
  288. inline
  289. Uint32
  290. TransporterFacade::get_scan_batch_size() {
  291.   return m_scan_batch_size;
  292. }
  293. inline
  294. Uint32
  295. TransporterFacade::get_batch_byte_size() {
  296.   return m_batch_byte_size;
  297. }
  298. inline
  299. Uint32
  300. TransporterFacade::get_batch_size() {
  301.   return m_batch_size;
  302. }
  303. #endif // TransporterFacade_H