repapi.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:13k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <NdbTCP.h>
  15. #include "repapi.h"
  16. //#include "mgmapi_debug.h"
  17. #include <socket_io.h>
  18. #include <NdbOut.hpp>
  19. #include <SocketServer.hpp>
  20. #include <Parser.hpp>
  21. #include <OutputStream.hpp>
  22. #include <InputStream.hpp>
  23. #if defined  VM_TRACE && !defined NO_DEBUG_MESSAGES
  24. #define DEBUG(x) ndbout << x << endl;
  25. #elif defined NO_DEBUG_MESSAGES
  26. #define DEBUG(x)
  27. #endif
  28. #ifdef NDB_WIN32
  29. #define EBADMSG EFAULT
  30. #endif
  31. class ParserDummy2 : SocketServer::Session {
  32. public:
  33.   ParserDummy2(NDB_SOCKET_TYPE sock);
  34. };
  35. ParserDummy2::ParserDummy2(NDB_SOCKET_TYPE sock) : SocketServer::Session(sock) {
  36. }
  37. typedef Parser<ParserDummy2> Parser_t;
  38. #define REP_CMD(name, fun, desc) 
  39.  { name, 
  40.    0, 
  41.    ParserRow<ParserDummy2>::Cmd, 
  42.    ParserRow<ParserDummy2>::String, 
  43.    ParserRow<ParserDummy2>::Optional, 
  44.    ParserRow<ParserDummy2>::IgnoreMinMax, 
  45.    0, 0, 
  46.    fun, 
  47.    desc, 0 }
  48. #define REP_ARG(name, type, opt, desc) 
  49.  { name, 
  50.    0, 
  51.    ParserRow<ParserDummy2>::Arg, 
  52.    ParserRow<ParserDummy2>::type, 
  53.    ParserRow<ParserDummy2>::opt, 
  54.    ParserRow<ParserDummy2>::IgnoreMinMax, 
  55.    0, 0, 
  56.    0, 
  57.   desc, 0 }
  58. #define REP_END() 
  59.  { 0, 
  60.    0, 
  61.    ParserRow<ParserDummy2>::Arg, 
  62.    ParserRow<ParserDummy2>::Int, 
  63.    ParserRow<ParserDummy2>::Optional, 
  64.    ParserRow<ParserDummy2>::IgnoreMinMax, 
  65.    0, 0, 
  66.    0, 
  67.    0, 0 }
  68. struct ndb_rep_handle {
  69.   char * hostname;
  70.   unsigned short port;
  71.   
  72.   int connected;
  73.   int last_error;
  74.   int last_error_line;
  75.   int read_timeout;
  76.   int write_timeout;
  77.   NDB_SOCKET_TYPE socket;
  78. #ifdef REPAPI_LOG
  79.   FILE* logfile;
  80. #endif
  81. };
  82. #define SET_ERROR(h, e) 
  83.   h->last_error = e;  
  84.   h->last_error_line = __LINE__;
  85. extern "C"
  86. NdbRepHandle
  87. ndb_rep_create_handle(){
  88.   NdbRepHandle h   = (NdbRepHandle)malloc(sizeof(ndb_rep_handle));
  89.   h->connected     = 0;
  90.   h->last_error    = 0;
  91.   h->last_error_line = 0;
  92.   h->hostname      = 0;
  93.   h->socket        = -1;
  94.   h->read_timeout  = 50000;
  95.   h->write_timeout = 100;
  96.   
  97. #ifdef REPAPI_LOG
  98.   h->logfile = 0;
  99. #endif
  100.   return h;
  101. }
  102. /**
  103.  * Destroy a handle
  104.  */
  105. extern "C"
  106. void
  107. ndb_rep_destroy_handle(NdbRepHandle * handle){
  108.   if(!handle)
  109.     return;
  110.   if((* handle)->connected){
  111.     ndb_rep_disconnect(* handle);
  112.   }
  113.   if((* handle)->hostname != 0){
  114.     free((* handle)->hostname);
  115.   }
  116. #ifdef REPAPI_LOG
  117.   if ((* handle)->logfile != 0){
  118.     fclose((* handle)->logfile);
  119.     (* handle)->logfile = 0;
  120.   }
  121. #endif
  122.   free(* handle);
  123.   * handle = 0;
  124. }
  125. /**
  126.  * Get latest error associated with a handle
  127.  */
  128. extern "C"
  129. int
  130. ndb_rep_get_latest_error(const NdbRepHandle h){
  131.   return h->last_error;
  132. }
  133. /**
  134.  * Get latest error line associated with a handle
  135.  */
  136. extern "C"
  137. int
  138. ndb_rep_get_latest_error_line(const NdbRepHandle h){
  139.   return h->last_error_line;
  140. }
  141. static
  142. int
  143. parse_connect_string(const char * connect_string,
  144.      NdbRepHandle handle){
  145.   if(connect_string == 0){
  146.     DEBUG("connect_string == 0");
  147.     SET_ERROR(handle, EINVAL);
  148.     return -1;
  149.   }
  150.   
  151.   char * line = strdup(connect_string);
  152.   if(line == 0){
  153.     DEBUG("line == 0");
  154.     SET_ERROR(handle, ENOMEM);
  155.     return -1;
  156.   }
  157.   
  158.   char * tmp = strchr(line, ':');
  159.   if(tmp == 0){
  160.     DEBUG("tmp == 0");
  161.     free(line);
  162.     SET_ERROR(handle, EINVAL);
  163.     return -1;
  164.   }
  165.   * tmp = 0; tmp++;
  166.   
  167.   int port = 0;
  168.   if(sscanf(tmp, "%d", &port) != 1){
  169.     DEBUG("sscanf() != 1");
  170.     free(line);
  171.     SET_ERROR(handle, EINVAL);
  172.     return -1;
  173.   }
  174.   
  175.   if(handle->hostname != 0)
  176.     free(handle->hostname);
  177.   handle->hostname = strdup(line);
  178.   handle->port = port;
  179.   free(line);
  180.   return 0;
  181. }
  182. /*
  183.  * Call an operation, and return the reply
  184.  */
  185. static const Properties *
  186. ndb_rep_call(NdbRepHandle handle,
  187.      const ParserRow<ParserDummy2> *command_reply,
  188.      const char *cmd,
  189.      const Properties *cmd_args) {
  190.   SocketOutputStream out(handle->socket);
  191.   SocketInputStream in(handle->socket, handle->read_timeout);
  192.   out.println(cmd);
  193. #ifdef REPAPI_LOG
  194.   /** 
  195.    * Print command to  log file
  196.    */
  197.   FileOutputStream f(handle->logfile);
  198.   f.println("OUT: %s", cmd);
  199. #endif
  200.   if(cmd_args != NULL) {
  201.     Properties::Iterator iter(cmd_args);
  202.     const char *name;
  203.     while((name = iter.next()) != NULL) {
  204.       PropertiesType t;
  205.       Uint32 val_i;
  206.       BaseString val_s;
  207.       cmd_args->getTypeOf(name, &t);
  208.       switch(t) {
  209.       case PropertiesType_Uint32:
  210. cmd_args->get(name, &val_i);
  211. out.println("%s: %d", name, val_i);
  212. break;
  213.       case PropertiesType_char:
  214. cmd_args->get(name, val_s);
  215. out.println("%s: %s", name, val_s.c_str());
  216. break;
  217.       default:
  218. /* Ignore */
  219. break;
  220.       }
  221.     }
  222. #ifdef REPAPI_LOG
  223.   /** 
  224.    * Print arguments to  log file
  225.    */
  226.   cmd_args->print(handle->logfile, "OUT: ");
  227. #endif
  228.   }
  229.   out.println("");
  230.   Parser_t::Context ctx;
  231.   ParserDummy2 session(handle->socket);
  232.   Parser_t parser(command_reply, in, true, true, true);
  233. #if 1
  234.   const Properties* p = parser.parse(ctx, session);
  235.   if (p == NULL){
  236.     /**
  237.      * Print some info about why the parser returns NULL
  238.      */
  239.     ndbout << " status=" << ctx.m_status << ", curr="<<ctx.m_currentToken << endl;
  240.   } 
  241. #ifdef REPAPI_LOG
  242.   else {
  243.     /** 
  244.      * Print reply to log file
  245.      */
  246.     p->print(handle->logfile, "IN: ");
  247.   }
  248. #endif
  249.   return p;
  250. #else
  251.    return parser.parse(ctx, session);
  252. #endif
  253. }
  254. /**
  255.  * Connect to a rep server
  256.  *
  257.  * Returns 0 if OK, sets ndb_rep_handle->last_error otherwise
  258.  */
  259. extern "C"
  260. int
  261. ndb_rep_connect(NdbRepHandle handle, const char * repsrv){
  262.   if(handle == 0)
  263.     return -1;
  264.   
  265.   if(parse_connect_string(repsrv, handle) != 0)
  266.     return -1;
  267. #ifdef REPAPI_LOG
  268.   /**
  269.   * Open the log file
  270.   */
  271.   char logname[64];
  272.   snprintf(logname, 64, "repapi.log");
  273.   handle->logfile = fopen(logname, "w");
  274. #endif
  275.   /**
  276.    * Do connect
  277.    */
  278.   const NDB_SOCKET_TYPE sockfd = socket(AF_INET, SOCK_STREAM, 0);
  279.   if (sockfd == NDB_INVALID_SOCKET) {
  280.     DEBUG("socket() == INVALID_SOCKET");
  281.     return -1;
  282.   }
  283.   
  284.   struct sockaddr_in servaddr;
  285.   memset(&servaddr, 0, sizeof(servaddr));
  286.   servaddr.sin_family = AF_INET;
  287.   servaddr.sin_port = htons(handle->port);
  288.   // Convert ip address presentation format to numeric format
  289.   const int res1 = Ndb_getInAddr(&servaddr.sin_addr, handle->hostname);
  290.   if (res1 != 0) {
  291.     DEBUG("Ndb_getInAddr(...) == -1");
  292.     return -1;
  293.   }
  294.   
  295.   const int res2 = connect(sockfd, (struct sockaddr*) &servaddr, 
  296.    sizeof(servaddr));
  297.   if (res2 == -1) {
  298.     DEBUG("connect() == -1");
  299.     NDB_CLOSE_SOCKET(sockfd);
  300.     return -1;
  301.   }
  302.   
  303.   handle->socket    = sockfd;
  304.   handle->connected = 1;
  305.   return 0;
  306. }
  307.   
  308. /**
  309.  * Disconnect from a rep server
  310.  */
  311. extern "C"
  312. void
  313. ndb_rep_disconnect(NdbRepHandle handle){
  314.   if(handle == 0)
  315.     return;
  316.   
  317.   if(handle->connected != 1){
  318.     return;
  319.   }
  320.   NDB_CLOSE_SOCKET(handle->socket);
  321.   handle->socket = -1;
  322.   handle->connected = 0;
  323.   return;
  324. }
  325. /******************************************************************************
  326.  * Global Replication
  327.  ******************************************************************************/
  328. extern "C"
  329. int ndb_rep_command(NdbRepHandle handle,
  330.     unsigned int request,
  331.     unsigned int* replication_id,
  332.     struct ndb_rep_reply* /*reply*/,
  333.     unsigned int epoch) {
  334.   
  335.   *replication_id = 0;
  336.   const ParserRow<ParserDummy2> replication_reply[] = {
  337.     REP_CMD("global replication reply", NULL, ""),
  338.     REP_ARG("result", Int, Mandatory, "Error message"),
  339.     REP_ARG("id", Int, Optional, "Id of global replication"),
  340.     REP_END()
  341.   };
  342.   
  343.   if (handle == 0) {
  344.     return -1;
  345.   }
  346.   
  347.   if (handle->connected != 1) {
  348.     handle->last_error = EINVAL;
  349.     return -1;
  350.   }
  351.   Properties args;
  352.   args.put("request", request);
  353.   args.put("id", *replication_id);
  354.   if(epoch > 0) 
  355.     args.put("epoch",epoch);
  356.   else
  357.     args.put("epoch",(unsigned int)0);
  358.   const Properties *reply;
  359.   reply = ndb_rep_call(handle, replication_reply, "rep", &args);
  360.   if(reply == NULL) {
  361.     handle->last_error = EIO;
  362.     return -1;
  363.   }
  364.   reply->get("id", replication_id);
  365.   Uint32 result;
  366.   reply->get("result", &result);
  367.   delete reply;
  368.   return result;
  369. }
  370. extern "C"
  371. int convert2int(char * first, char * last, unsigned int f[], unsigned int  l[])
  372. {
  373.   char * ftok = strtok(first, ",");
  374.   char * ltok = strtok(last, ",");
  375.   Uint32 i=0;
  376.   while(ftok!=NULL && ltok!=NULL) 
  377.   {
  378.     f[i] = atoi(ftok);
  379.     l[i] = atoi(ltok);
  380.     ftok = strtok(NULL, ",");
  381.     ltok = strtok(NULL, ",");
  382.     i++;
  383.   }
  384.  
  385.   return 0;
  386. }
  387. int ndb_rep_query(NdbRepHandle           handle,
  388.   QueryCounter           counter,
  389.   unsigned int*          replicationId,
  390.   struct ndb_rep_reply*  /*reply*/,
  391.   struct rep_state * state)
  392. {
  393.   *replicationId = 0; // not used currently.
  394.   if(state == 0)   
  395.     return -1;
  396.   const ParserRow<ParserDummy2> replication_reply[] = {
  397.     REP_CMD("global replication query reply", NULL, ""),
  398.     REP_ARG("result", String, Mandatory, "Error message"),
  399.     REP_ARG("id", Int, Mandatory, "replicationId"),
  400.     REP_ARG("no_of_nodegroups", Int, Optional, "number of nodegroups"),
  401.     REP_ARG("subid", Int, Optional, "Id of subscription"),
  402.     REP_ARG("subkey", Int, Optional, "Key of subscription"),
  403.     REP_ARG("connected_rep", Int, Optional, "connected to rep"),
  404.     REP_ARG("connected_db", Int, Optional, "connected to db"),
  405.     REP_ARG("first", String, Optional, ""),
  406.     REP_ARG("last", String, Optional, ""),
  407.     REP_ARG("state_sub", Int, Optional, "state of subsription"),
  408.     REP_ARG("state", Int, Optional, "state"),
  409.     REP_END()
  410.   };
  411.   
  412.   if (handle == 0) {
  413.     return -1;
  414.   }
  415.   
  416.   if (handle->connected != 1) {
  417.     handle->last_error = EINVAL;
  418.     return -1;
  419.   }
  420.   const Properties *props;
  421.   Properties args;
  422.   Uint32 request = 0;
  423.   args.put("request", request);
  424.   args.put("id", *replicationId);
  425.   args.put("counter" , (Uint32)counter);
  426.   props = ndb_rep_call(handle, replication_reply, "rep query", &args);
  427.   BaseString result;
  428.   props->get("result", result);
  429.   if(strcmp(result.c_str(), "Ok") != 0) 
  430.   {
  431.     delete props;
  432.     return 1;
  433.   }
  434.   state->queryCounter = counter;
  435.   unsigned int no_of_nodegroups;
  436.   props->get("no_of_nodegroups", &no_of_nodegroups);
  437.   state->no_of_nodegroups = no_of_nodegroups;  
  438.   if(counter >= 0) 
  439.   {
  440.     BaseString first, last;
  441.     props->get("first", first);
  442.     props->get("last", last);
  443.     convert2int((char*)first.c_str(), (char*)last.c_str(),
  444.       state->first , state->last );
  445.   } else 
  446.   {
  447.     for(Uint32 i = 0; i<REPAPI_MAX_NODE_GROUPS; i++) {
  448.     state->first[i] = 0;
  449.     state->last[i] = 0;
  450.     }
  451.   }
  452.   unsigned int connected_rep = 0;
  453.   props->get("connected_rep", &connected_rep);
  454.   state->connected_rep = connected_rep;
  455.   
  456.   unsigned int connected_db = 0;
  457.   props->get("connected_rep", &connected_db);
  458.   state->connected_db = connected_db;      
  459.   
  460.   unsigned int subid;
  461.   props->get("subid", &subid);
  462.   state->subid = subid;
  463.   unsigned int subkey;
  464.   props->get("subkey", &subkey);
  465.   state->subkey = subkey;
  466.   unsigned int _state;
  467.   props->get("state", &_state);
  468.   state->state = _state;
  469.   unsigned int state_sub;
  470.   props->get("state_sub", &state_sub);
  471.   state->state_sub = state_sub;
  472.   if(props == NULL) {
  473.     handle->last_error = EIO;
  474.     return -1;
  475.   }
  476.   delete props;
  477.   return 0;
  478. }
  479. extern "C"
  480. int  
  481. ndb_rep_get_status(NdbRepHandle handle,
  482. unsigned int* replication_id,
  483. struct ndb_rep_reply* /*reply*/,
  484. struct rep_state * repstate) {
  485.   
  486.   const ParserRow<ParserDummy2> replication_reply[] = {
  487.     REP_CMD("global replication status reply", NULL, ""),
  488.     REP_ARG("result", String, Mandatory, "Error message"),
  489.     REP_ARG("id", Int, Optional, "Error message"),
  490.     REP_ARG("subid", Int, Optional, "Id of subscription"),
  491.     REP_ARG("subkey", Int, Optional, "Key of subscription"),
  492.     REP_ARG("connected_rep", Int, Optional, "connected to rep"),
  493.     REP_ARG("connected_db", Int, Optional, "connected to db"),
  494.     REP_ARG("state_sub", Int, Optional, "state of subsription"),
  495.     REP_ARG("state", Int, Optional, "state"),
  496.     REP_END()
  497.   };
  498.   
  499.   if (handle == 0) {
  500.     return -1;
  501.   }
  502.   
  503.   if (handle->connected != 1) {
  504.     handle->last_error = EINVAL;
  505.     return -1;
  506.   }
  507.   const Properties *reply;
  508.   Properties args;
  509.   Uint32 request = 0;
  510.   args.put("request", request);
  511.   reply = ndb_rep_call(handle, replication_reply, "rep status", &args);
  512.   if(reply == NULL) {
  513.     handle->last_error = EIO;
  514.     return -1;
  515.   }
  516.   
  517.   Uint32 result;
  518.   reply->get("result", &result);
  519.   reply->get("id", replication_id);
  520.   reply->get("subid", (Uint32*)&repstate->subid);
  521.   reply->get("subkey", (Uint32*)&repstate->subkey);
  522.   reply->get("connected_rep", (Uint32*)&repstate->connected_rep);
  523.   reply->get("connected_db", (Uint32*)&repstate->connected_db);
  524.   reply->get("state", (Uint32*)&repstate->state);
  525.   reply->get("state_sub", (Uint32*)&repstate->state_sub);
  526.   delete reply;
  527.   return result;
  528. }