RepApiService.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:8k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <Parser.hpp>
  14. #include <NdbOut.hpp>
  15. #include <Properties.hpp>
  16. #include <socket_io.h>
  17. #include "RepApiService.hpp"
  18. #include "RepApiInterpreter.hpp"
  19. #include "repapi/repapi.h"
  20. #include <NdbMutex.h>
  21. #include <OutputStream.hpp>
  22. /**
  23.    const char * name;
  24.    const char * realName;
  25.    const Type type;
  26.    const ArgType argType;
  27.    const ArgRequired argRequired;
  28.    const ArgMinMax argMinMax;
  29.    const int minVal;
  30.    const int maxVal;
  31.    void (T::* function)(const class Properties & args);
  32.    const char * description;
  33. */
  34. #define REP_CMD(name, fun, desc) 
  35.  { name, 
  36.    0, 
  37.    ParserRow<RepApiSession>::Cmd, 
  38.    ParserRow<RepApiSession>::String, 
  39.    ParserRow<RepApiSession>::Optional, 
  40.    ParserRow<RepApiSession>::IgnoreMinMax, 
  41.    0, 0, 
  42.    fun, 
  43.    desc }
  44. #define REP_ARG(name, type, opt, desc) 
  45.  { name, 
  46.    0, 
  47.    ParserRow<RepApiSession>::Arg, 
  48.    ParserRow<RepApiSession>::type, 
  49.    ParserRow<RepApiSession>::opt, 
  50.    ParserRow<RepApiSession>::IgnoreMinMax, 
  51.    0, 0, 
  52.    0, 
  53.   desc }
  54. #define REP_ARG2(name, type, opt, min, max, desc) 
  55.  { name, 
  56.    0, 
  57.    ParserRow<RepApiSession>::Arg, 
  58.    ParserRow<RepApiSession>::type, 
  59.    ParserRow<RepApiSession>::opt, 
  60.    ParserRow<RepApiSession>::IgnoreMinMax, 
  61.    min, max, 
  62.    0, 
  63.   desc }
  64. #define REP_END() 
  65.  { 0, 
  66.    0, 
  67.    ParserRow<RepApiSession>::Arg, 
  68.    ParserRow<RepApiSession>::Int, 
  69.    ParserRow<RepApiSession>::Optional, 
  70.    ParserRow<RepApiSession>::IgnoreMinMax, 
  71.    0, 0, 
  72.    0, 
  73.    0 }
  74. #define REP_CMD_ALIAS(name, realName, fun) 
  75.  { name, 
  76.    realName, 
  77.    ParserRow<RepApiSession>::CmdAlias, 
  78.    ParserRow<RepApiSession>::Int, 
  79.    ParserRow<RepApiSession>::Optional, 
  80.    ParserRow<RepApiSession>::IgnoreMinMax, 
  81.    0, 0, 
  82.    0, 
  83.    0 }
  84. #define REP_ARG_ALIAS(name, realName, fun) 
  85.  { name, 
  86.    realName, 
  87.    ParserRow<RepApiSession>::ArgAlias, 
  88.    ParserRow<RepApiSession>::Int, 
  89.    ParserRow<RepApiSession>::Optional, 
  90.    ParserRow<RepApiSession>::IgnoreMinMax, 
  91.    0, 0, 
  92.    0, 
  93.    0 }
  94. const
  95. ParserRow<RepApiSession> commands[] = 
  96. {
  97.   
  98.   REP_CMD("rep" , &RepApiSession::execCommand, ""),
  99.     REP_ARG("request",     Int, Mandatory,  "Grep::Request."),
  100.     REP_ARG("id",     Int, Mandatory,  "Replication id "),
  101.     REP_ARG("epoch",     Int, Optional,  "Epoch. Used by stop epoch ..."),
  102.   REP_CMD("rep status" , &RepApiSession::getStatus, ""),
  103.     REP_ARG("request",     Int, Optional,  "Grep::Request."),
  104.   REP_CMD("rep query" , &RepApiSession::query, ""),
  105.   REP_ARG("id",     Int, Mandatory,  "Replication Id"),
  106.   REP_ARG("counter",     Int, Mandatory,  "QueryCounter."),
  107.   REP_ARG("request",     Int, Mandatory,  "Grep::Request."),
  108.   
  109.   REP_END()
  110. };
  111. RepApiSession::RepApiSession(NDB_SOCKET_TYPE sock,
  112.        class RepApiInterpreter & rep)
  113.   : SocketServer::Session(sock)
  114.   , m_rep(rep)
  115. {
  116.   m_input = new SocketInputStream(sock);
  117.   m_output = new SocketOutputStream(sock);
  118.   m_parser = new Parser<RepApiSession>(commands, *m_input, true, true, true);
  119. }
  120. RepApiSession::RepApiSession(FILE * f, class RepApiInterpreter & rep)
  121.   : SocketServer::Session(1)
  122.   , m_rep(rep)
  123. {
  124.   m_input = new FileInputStream(f);
  125.   m_parser = new Parser<RepApiSession>(commands, *m_input, true, true, true);
  126. }
  127.   
  128. RepApiSession::~RepApiSession() 
  129. {
  130.   delete m_input;
  131.   delete m_parser;
  132. }
  133. void
  134. RepApiSession::runSession()
  135. {
  136.   Parser_t::Context ctx;
  137.   while(!m_stop){
  138.     m_parser->run(ctx, * this); 
  139.     if(ctx.m_currentToken == 0)
  140.       break;
  141.     switch(ctx.m_status){
  142.     case Parser_t::Ok:
  143.       for(size_t i = 0; i<ctx.m_aliasUsed.size(); i++)
  144. ndbout_c("Used alias: %s -> %s", 
  145.  ctx.m_aliasUsed[i]->name, ctx.m_aliasUsed[i]->realName);
  146.       break;
  147.     case Parser_t::NoLine:
  148.     case Parser_t::EmptyLine:
  149.       break;
  150.     default:
  151.       break;
  152.     }
  153.   }
  154.   NDB_CLOSE_SOCKET(m_socket);
  155. }
  156. void
  157. RepApiSession::execCommand(Parser_t::Context & /* unused */, 
  158.    const class Properties & args)
  159. {
  160.   Uint32  err;
  161.   Uint32  replicationId;
  162.   args.get("id", &replicationId);
  163.   Properties * result  = m_rep.execCommand(args);
  164.   if(result == NULL) {
  165.     m_output->println("global replication reply");
  166.     m_output->println("result: %d", -1);
  167.     m_output->println("id: %d",replicationId);
  168.     m_output->println("");
  169.     return;
  170.   }
  171.   result->get("err", &err);
  172.   m_output->println("global replication reply");
  173.   m_output->println("result: %d", err);
  174.   m_output->println("id: %d", 0);
  175.   m_output->println("");
  176.   delete result;
  177. }
  178. void
  179. RepApiSession::getStatus(Parser_t::Context & /* unused */, 
  180.  const class Properties & args)
  181. {
  182.   Uint32  err;
  183.   Properties * result  = m_rep.getStatus();
  184.   result->get("err", &err);
  185.   Uint32 subId;
  186.   result->get("subid", &subId);
  187.   Uint32 subKey;
  188.   result->get("subkey", &subKey);
  189.   Uint32 connected_rep;
  190.   result->get("connected_rep", &connected_rep);
  191.   Uint32 connected_db;
  192.   result->get("connected_db", &connected_db);
  193.   Uint32 state;
  194.   result->get("state", &state);
  195.   Uint32 state_sub;
  196.   result->get("state", &state_sub);
  197.   m_output->println("global replication status reply");
  198.   m_output->println("result: %d",0);
  199.   m_output->println("id: %d",0);
  200.   m_output->println("subid: %d", subId);
  201.   m_output->println("subkey: %d", subKey);
  202.   m_output->println("connected_rep: %d", connected_rep);
  203.   m_output->println("connected_db: %d", connected_db);
  204.   m_output->println("state_sub: %d", state_sub);
  205.   m_output->println("state: %d", state);
  206.   m_output->println("");
  207.   delete result;
  208. }
  209. void
  210. RepApiSession::query(Parser_t::Context & /* unused */, 
  211. const class Properties & args)
  212. {
  213.   Uint32  err;
  214.   Uint32 counter, replicationId;
  215.   args.get("counter", &counter);
  216.   args.get("id", &replicationId);
  217.   Properties * result  = m_rep.query(counter, replicationId);
  218.   if(result == NULL) {
  219.     m_output->println("global replication query reply");
  220.     m_output->println("result: %s","Failed");
  221.     m_output->println("id: %d",replicationId);
  222.     m_output->println("");
  223.     return;
  224.   }
  225.     
  226.   BaseString first;
  227.   BaseString last;
  228.   Uint32 subid = 0, subkey = 0, no_of_nodegroups = 0;
  229.   Uint32 connected_rep = 0, connected_db = 0;
  230.   Uint32 state = 0 , state_sub = 0;
  231.   result->get("err", &err);
  232.   result->get("no_of_nodegroups", &no_of_nodegroups);
  233.   result->get("subid", &subid);
  234.   result->get("subkey", &subkey);
  235.   result->get("connected_rep", &connected_rep);
  236.   result->get("connected_db", &connected_db);
  237.   result->get("first", first);
  238.   result->get("last", last);
  239.   result->get("state", &state);
  240.   result->get("state_sub", &state_sub);
  241.   m_output->println("global replication query reply");
  242.   m_output->println("result: %s","Ok");
  243.   m_output->println("id: %d",replicationId);
  244.   m_output->println("no_of_nodegroups: %d",no_of_nodegroups);
  245.   m_output->println("subid: %d", subid);
  246.   m_output->println("subkey: %d", subkey);
  247.   m_output->println("connected_rep: %d", connected_rep);
  248.   m_output->println("connected_db: %d", connected_db);
  249.   m_output->println("state_sub: %d", state_sub);
  250.   m_output->println("state: %d", state);
  251.   m_output->println("first: %s", first.c_str());
  252.   m_output->println("last: %s", last.c_str());
  253.   m_output->println("");
  254.   delete result;
  255. }
  256. static const char *
  257. propToString(Properties *prop, const char *key) {
  258.   static char buf[32];
  259.   const char *retval = NULL;
  260.   PropertiesType pt;
  261.   prop->getTypeOf(key, &pt);
  262.   switch(pt) {
  263.   case PropertiesType_Uint32:
  264.     Uint32 val;
  265.     prop->get(key, &val);
  266.     snprintf(buf, sizeof buf, "%d", val);
  267.     retval = buf;
  268.     break;
  269.   case PropertiesType_char:
  270.     const char *str;
  271.     prop->get(key, &str);
  272.     retval = str;
  273.     break;
  274.   default:
  275.     snprintf(buf, sizeof buf, "(unknown)");
  276.     retval = buf;
  277.   }
  278.   return retval;
  279. }
  280. void
  281. RepApiSession::printProperty(Properties *prop, const char *key) {
  282.   m_output->println("%s: %s", key, propToString(prop, key));
  283. }
  284. void
  285. RepApiSession::stopSession(){
  286. }