SocketRegistry.hpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:7k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #ifndef SocketClientRegistry_H
  14. #define SocketClientRegistry_H
  15. #include <NdbTCP.h>
  16. #include <NdbOut.hpp>
  17. #include "SocketClient.hpp" 
  18. template<class T>
  19. class SocketRegistry {
  20. public:
  21.   SocketRegistry(Uint32 maxSocketClients);
  22.   ~SocketRegistry();
  23.   /**
  24.    * creates and adds a SocketClient to m_socketClients[]
  25.    * @param host - host name
  26.    * @param port - port to connect to
  27.    */
  28.   bool createSocketClient(const char * host, const Uint16 port);
  29.   /**
  30.    * performReceive reads from sockets should do more stuff 
  31.    */
  32.   int performReceive(T &);
  33.   /**
  34.    * performReceive reads from sockets should do more stuff 
  35.    */
  36.   int syncPerformReceive(const char* ,T &, Uint32);
  37.   /**
  38.    * performSend sends a command to a host
  39.    */
  40.   bool performSend(const char * buf, Uint32 len, const char * remotehost);
  41.   /**
  42.    * pollSocketClients performs a select (for a max. of timeoutmillis) or
  43.    * until there is data to be read from any SocketClient
  44.    * @param timeOutMillis - select timeout
  45.    */
  46.   int pollSocketClients(Uint32 timeOutMillis);
  47.   /**
  48.    * reconnect tries to reconnect to a cpcd given its hostname
  49.    * @param host - name of host to reconnect to.
  50.    */
  51.   bool reconnect(const char * host);
  52.   /**
  53.    * removeSocketClient
  54.    * @param host - name of host for which to remove the SocketConnection
  55.    */
  56.   bool removeSocketClient(const char * host);
  57. private:
  58.   SocketClient** m_socketClients;
  59.   Uint32 m_maxSocketClients;
  60.   Uint32 m_nSocketClients;
  61.   int tcpReadSelectReply;
  62.   fd_set tcpReadset;
  63.   
  64. };
  65. template<class T>
  66. inline
  67. SocketRegistry<T>::SocketRegistry(Uint32 maxSocketClients) {
  68.   m_maxSocketClients = maxSocketClients;
  69.   m_socketClients  = new SocketClient * [m_maxSocketClients];
  70.   m_nSocketClients = 0;
  71. }
  72. template<class T>
  73. inline
  74. SocketRegistry<T>::~SocketRegistry() {
  75.   delete [] m_socketClients;
  76. }
  77. template<class T>
  78. inline
  79. bool 
  80. SocketRegistry<T>::createSocketClient(const char * host, Uint16 port) {
  81.   if(port == 0)
  82.     return false;
  83.   if(host==NULL)
  84.     return false;
  85.   
  86.   SocketClient * socketClient = new SocketClient(host, port);
  87.   if(socketClient->openSocket() < 0 || socketClient == NULL) {
  88.     ndbout << "could not connect" << endl;
  89.     delete socketClient;
  90.     return false;
  91.   }
  92.   else {
  93.     m_socketClients[m_nSocketClients] = socketClient;
  94.     m_nSocketClients++;
  95.   }
  96.   return true;
  97. }
  98. template<class T>
  99. inline
  100. int 
  101. SocketRegistry<T>::pollSocketClients(Uint32 timeOutMillis) {
  102.   // Return directly if there are no TCP transporters configured
  103.   if (m_nSocketClients == 0){
  104.     tcpReadSelectReply = 0;
  105.     return 0;
  106.   }
  107.   struct timeval timeout;
  108.   timeout.tv_sec  = timeOutMillis / 1000;
  109.   timeout.tv_usec = (timeOutMillis % 1000) * 1000;
  110.   NDB_SOCKET_TYPE maxSocketValue = 0;
  111.   
  112.   // Needed for TCP/IP connections
  113.   // The read- and writeset are used by select
  114.   
  115.   FD_ZERO(&tcpReadset);
  116.   // Prepare for sending and receiving
  117.   for (Uint32 i = 0; i < m_nSocketClients; i++) {
  118.     SocketClient * t = m_socketClients[i];
  119.     
  120.     // If the socketclient is connected
  121.     if (t->isConnected()) {
  122.       
  123.       const NDB_SOCKET_TYPE socket = t->getSocket();
  124.       // Find the highest socket value. It will be used by select
  125.       if (socket > maxSocketValue)
  126. maxSocketValue = socket;
  127.       
  128.       // Put the connected transporters in the socket read-set 
  129.       FD_SET(socket, &tcpReadset);
  130.     }
  131.   }
  132.   
  133.   // The highest socket value plus one
  134.   maxSocketValue++; 
  135.   
  136.   tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);  
  137. #ifdef NDB_WIN32
  138.   if(tcpReadSelectReply == SOCKET_ERROR)
  139.   {
  140.     NdbSleep_MilliSleep(timeOutMillis);
  141.   }
  142. #endif
  143.   return tcpReadSelectReply;
  144. }
  145. template<class T>
  146. inline
  147. bool 
  148. SocketRegistry<T>::performSend(const char * buf, Uint32 len, const char * remotehost)
  149. {
  150.   SocketClient * socketClient;
  151.   for(Uint32 i=0; i < m_nSocketClients; i++) {
  152.     socketClient = m_socketClients[i];
  153.     if(strcmp(socketClient->gethostname(), remotehost)==0) {
  154.       if(socketClient->isConnected()) {
  155. if(socketClient->writeSocket(buf, len)>0)
  156.   return true;
  157. else
  158.   return false;
  159.       }
  160.     }
  161.   }
  162.   return false;
  163. }
  164. template<class T>
  165. inline
  166. int 
  167. SocketRegistry<T>::performReceive(T & t) {  
  168.   char buf[255] ; //temp. just for testing. must fix better
  169.   if(tcpReadSelectReply > 0){
  170.     for (Uint32 i=0; i<m_nSocketClients; i++) {
  171.       SocketClient *sc = m_socketClients[i];
  172.       const NDB_SOCKET_TYPE socket    = sc->getSocket();
  173.       if(sc->isConnected() && FD_ISSET(socket, &tcpReadset)) {
  174. t->runSession(socket,t);
  175.       }
  176.     }
  177.     return 1;
  178.   }
  179.   return 0;
  180.   
  181. }
  182. template<class T>
  183. inline
  184. int 
  185. SocketRegistry<T>::syncPerformReceive(const char * remotehost,
  186.       T & t,
  187.       Uint32 timeOutMillis) {  
  188.   char buf[255] ; //temp. just for testing. must fix better
  189.   struct timeval timeout;
  190.   timeout.tv_sec  = timeOutMillis / 1000;
  191.   timeout.tv_usec = (timeOutMillis % 1000) * 1000;
  192.   int reply;
  193.   SocketClient * sc;
  194.   for(Uint32 i=0; i < m_nSocketClients; i++) {
  195.     sc = m_socketClients[i];
  196.     if(strcmp(sc->gethostname(), remotehost)==0) {
  197.       if(sc->isConnected()) {
  198. /*FD_ZERO(&tcpReadset);
  199.   reply = select(sc->getSocket()+1, 0, 0, 0, &timeout);
  200. reply=1;
  201. if(reply > 0) {*/
  202.   t.runSession(sc->getSocket(), t);
  203.   //}
  204.       }
  205.       
  206.     }
  207.   }  
  208. }
  209. template<class T>
  210. inline
  211. bool 
  212. SocketRegistry<T>::reconnect(const char * host){
  213.   for(Uint32 i=0; i < m_nSocketClients; i++) {
  214.     SocketClient * socketClient = m_socketClients[i];
  215.     if(strcmp(socketClient->gethostname(), host)==0) {
  216.       if(!socketClient->isConnected()) {
  217. if(socketClient->openSocket() > 0)
  218.   return true;
  219. else return false;
  220.       }
  221.     }
  222.   }
  223.   return false;
  224. }
  225. template<class T>
  226. inline
  227. bool 
  228. SocketRegistry<T>::removeSocketClient(const char * host){
  229.   for(Uint32 i=0; i < m_nSocketClients; i++) {
  230.     SocketClient * socketClient = m_socketClients[i];
  231.     if(strcmp(socketClient->gethostname(), host)==0) {
  232.       if(!socketClient->isConnected()) {
  233. if(socketClient->closeSocket() > 0) {
  234.   delete socketClient;
  235.   return true;
  236. }
  237. else return false;
  238.       }
  239.     }
  240.   }
  241.   return false;
  242. }
  243. #endif // Define of SocketRegistry