transproxy.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:9k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <NdbTCP.h>
  15. #include <NdbOut.hpp>
  16. #include <NdbThread.h>
  17. #include <NdbSleep.h>
  18. #include <Properties.hpp>
  19. #include <LocalConfig.hpp>
  20. #include <Config.hpp>
  21. #include <InitConfigFileParser.hpp>
  22. #include <IPCConfig.hpp>
  23. static void
  24. fatal(char const* fmt, ...)
  25. {
  26.     va_list ap;
  27.     char buf[200];
  28.     va_start(ap, fmt);
  29.     BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);
  30.     va_end(ap);
  31.     ndbout << "FATAL: " << buf << endl;
  32.     sleep(1);
  33.     exit(1);
  34. }
  35. static void
  36. debug(char const* fmt, ...)
  37. {
  38.     va_list ap;
  39.     char buf[200];
  40.     va_start(ap, fmt);
  41.     BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);
  42.     va_end(ap);
  43.     ndbout << buf << endl;
  44. }
  45. // node
  46. struct Node {
  47.     enum Type { MGM = 1, DB = 2, API = 3 };
  48.     Type type;
  49.     unsigned id; // node id
  50.     static Node* list;
  51.     static unsigned count;
  52.     static Node* find(unsigned n) {
  53. for (unsigned i = 0; i < count; i++) {
  54.     if (list[i].id == n)
  55. return &list[i];
  56. }
  57. return 0;
  58.     }
  59. };
  60. unsigned Node::count = 0;
  61. Node* Node::list = 0;
  62. struct Copy {
  63.     int rfd; // read from
  64.     int wfd; // write to
  65.     unsigned char* buf;
  66.     unsigned bufsiz;
  67.     NdbThread* thread;
  68.     void run();
  69.     char info[20];
  70. };
  71. // connection between nodes 0-server side 1-client side
  72. // we are client to 0 and server to 1
  73. struct Conn {
  74.     Node* node[2]; // the nodes
  75.     unsigned port; // server port
  76.     unsigned proxy; // proxy port
  77.     static unsigned count;
  78.     static unsigned proxycount;
  79.     static Conn* list;
  80.     NdbThread* thread; // thread handling this connection
  81.     void run(); // run the connection
  82.     int sockfd[2]; // socket 0-on server side 1-client side
  83.     void conn0(); // connect to side 0
  84.     void conn1(); // connect to side 0
  85.     char info[20];
  86.     Copy copy[2]; // 0-to-1 and 1-to-0
  87. };
  88. unsigned Conn::count = 0;
  89. unsigned Conn::proxycount = 0;
  90. Conn* Conn::list = 0;
  91. // global data
  92. static char* hostname = 0;
  93. static struct sockaddr_in hostaddr;
  94. static char* localcfgfile = 0;
  95. static char* initcfgfile = 0;
  96. static unsigned ownnodeid = 0;
  97. static void
  98. properr(const Properties* props, const char* name, int i = -1)
  99. {
  100.     if (i < 0) {
  101. fatal("get %s failed: errno = %d", name, props->getPropertiesErrno());
  102.     } else {
  103. fatal("get %s_%d failed: errno = %d", name, i, props->getPropertiesErrno());
  104.     }
  105. }
  106. // read config and load it into our structs
  107. static void
  108. getcfg()
  109. {
  110.     LocalConfig lcfg;
  111.     if (! lcfg.read(localcfgfile)) {
  112. fatal("read %s failed", localcfgfile);
  113.     }
  114.     ownnodeid = lcfg._ownNodeId;
  115.     debug("ownnodeid = %d", ownnodeid);
  116.     InitConfigFileParser pars(initcfgfile);
  117.     Config icfg;
  118.     if (! pars.getConfig(icfg)) {
  119. fatal("parse %s failed", initcfgfile);
  120.     }
  121.     Properties* ccfg = icfg.getConfig(ownnodeid);
  122.     if (ccfg == 0) {
  123. const char* err = "unknown error";
  124. fatal("getConfig: %s", err);
  125.     }
  126.     ccfg->put("NodeId", ownnodeid);
  127.     ccfg->put("NodeType", "MGM");
  128.     if (! ccfg->get("NoOfNodes", &Node::count)) {
  129. properr(ccfg, "NoOfNodes", -1);
  130.     }
  131.     debug("Node::count = %d", Node::count);
  132.     Node::list = new Node[Node::count];
  133.     for (unsigned i = 0; i < Node::count; i++) {
  134. Node& node = Node::list[i];
  135. const Properties* nodecfg;
  136. if (! ccfg->get("Node", 1+i, &nodecfg)) {
  137.     properr(ccfg, "Node", 1+i);
  138. }
  139. const char* type;
  140. if (! nodecfg->get("Type", &type)) {
  141.     properr(nodecfg, "Type");
  142. }
  143. if (strcmp(type, "MGM") == 0) {
  144.     node.type = Node::MGM;
  145. } else if (strcmp(type, "DB") == 0) {
  146.     node.type = Node::DB;
  147. } else if (strcmp(type, "API") == 0) {
  148.     node.type = Node::API;
  149. } else {
  150.     fatal("prop %s_%d bad Type = %s", "Node", 1+i, type);
  151. }
  152. if (! nodecfg->get("NodeId", &node.id)) {
  153.     properr(nodecfg, "NodeId");
  154. }
  155. debug("node id=%d type=%d", node.id, node.type);
  156.     }
  157.     IPCConfig ipccfg(ccfg);
  158.     if (ipccfg.init() != 0) {
  159. fatal("ipccfg init failed");
  160.     }
  161.     if (! ccfg->get("NoOfConnections", &Conn::count)) {
  162. properr(ccfg, "NoOfConnections");
  163.     }
  164.     debug("Conn::count = %d", Conn::count);
  165.     Conn::list = new Conn[Conn::count];
  166.     for (unsigned i = 0; i < Conn::count; i++) {
  167. Conn& conn = Conn::list[i];
  168. const Properties* conncfg;
  169. if (! ccfg->get("Connection", i, &conncfg)) {
  170.     properr(ccfg, "Connection", i);
  171. }
  172. unsigned n;
  173. if (! conncfg->get("NodeId1", &n)) {
  174.     properr(conncfg, "NodeId1");
  175. }
  176. if ((conn.node[0] = Node::find(n)) == 0) {
  177.     fatal("node %d not found", n);
  178. }
  179. if (! conncfg->get("NodeId2", &n)) {
  180.     properr(conncfg, "NodeId2");
  181. }
  182. if ((conn.node[1] = Node::find(n)) == 0) {
  183.     fatal("node %d not found", n);
  184. }
  185. if (! conncfg->get("PortNumber", &conn.port)) {
  186.     properr(conncfg, "PortNumber");
  187. }
  188. conn.proxy = 0;
  189. const char* proxy;
  190. if (conncfg->get("Proxy", &proxy)) {
  191.     conn.proxy = atoi(proxy);
  192.     if (conn.proxy > 0) {
  193. Conn::proxycount++;
  194.     }
  195. }
  196. sprintf(conn.info, "conn %d-%d", conn.node[0]->id, conn.node[1]->id);
  197.     }
  198. }
  199. void
  200. Conn::conn0()
  201. {
  202.     int fd;
  203.     while (1) {
  204. if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
  205.     fatal("%s: create client socket failed: %s", info, strerror(errno));
  206. }
  207. struct sockaddr_in servaddr;
  208. memset(&servaddr, 0, sizeof(servaddr));
  209. servaddr.sin_family = AF_INET;
  210. servaddr.sin_port = htons(port);
  211. servaddr.sin_addr = hostaddr.sin_addr;
  212. #if 0 // coredump
  213. if (Ndb_getInAddr(&servaddr.sin_addr, hostname) != 0) {
  214.     fatal("%s: hostname %s lookup failed", info, hostname);
  215. }
  216. #endif
  217. if (connect(fd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == 0)
  218.     break;
  219. if (errno != ECONNREFUSED) {
  220.     fatal("%s: connect failed: %s", info, strerror(errno));
  221. }
  222. close(fd);
  223. NdbSleep_MilliSleep(100);
  224.     }
  225.     sockfd[0] = fd;
  226.     debug("%s: side 0 connected", info);
  227. }
  228. void
  229. Conn::conn1()
  230. {
  231.     int servfd;
  232.     if ((servfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
  233. fatal("%s: create server socket failed: %s", info, strerror(errno));
  234.     }
  235.     struct sockaddr_in servaddr;
  236.     memset(&servaddr, 0, sizeof(servaddr));
  237.     servaddr.sin_family = AF_INET;
  238.     servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
  239.     servaddr.sin_port = htons(proxy);
  240.     const int on = 1;
  241.     setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on));
  242.     if (bind(servfd, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {
  243. fatal("%s: bind %d failed: %s", info, proxy, strerror(errno));
  244.     }
  245.     if (listen(servfd, 1) == -1) {
  246. fatal("%s: listen %d failed: %s", info, proxy, strerror(errno));
  247.     }
  248.     int fd;
  249.     if ((fd = accept(servfd, 0, 0)) == -1) {
  250. fatal("%s: accept failed: %s", info, strerror(errno));
  251.     }
  252.     sockfd[1] = fd;
  253.     close(servfd);
  254.     debug("%s: side 1 connected", info);
  255. }
  256. void
  257. Copy::run()
  258. {
  259.     debug("%s: start", info);
  260.     int n, m;
  261.     while (1) {
  262. n = read(rfd, buf, sizeof(buf));
  263. if (n < 0)
  264.     fatal("read error: %s", strerror(errno));
  265. m = write(wfd, buf, n);
  266. if (m != n)
  267.     fatal("write error: %s", strerror(errno));
  268.     }
  269.     debug("%s: stop", info);
  270. }
  271. extern "C" void*
  272. copyrun_C(void* copy)
  273. {
  274.     ((Copy*) copy)->run();
  275.     return 0;
  276. }
  277. void
  278. Conn::run()
  279. {
  280.     debug("%s: start", info);
  281.     conn1();
  282.     conn0();
  283.     const unsigned siz = 32 * 1024;
  284.     for (int i = 0; i < 2; i++) {
  285. Copy& copy = this->copy[i];
  286. copy.rfd = sockfd[i];
  287. copy.wfd = sockfd[1-i];
  288. copy.buf = new unsigned char[siz];
  289. copy.bufsiz = siz;
  290. sprintf(copy.info, "copy %d-%d", this->node[i]->id, this->node[1-i]->id);
  291. copy.thread = NdbThread_Create(copyrun_C, (void**)&copy,
  292.     8192, "copyrun", NDB_THREAD_PRIO_LOW);
  293. if (copy.thread == 0) {
  294.     fatal("%s: create thread %d failed errno=%d", i, errno);
  295. }
  296.     }
  297.     debug("%s: stop", info);
  298. }
  299. extern "C" void*
  300. connrun_C(void* conn)
  301. {
  302.     ((Conn*) conn)->run();
  303.     return 0;
  304. }
  305. static void
  306. start()
  307. {
  308.     NdbThread_SetConcurrencyLevel(3 * Conn::proxycount + 2);
  309.     for (unsigned i = 0; i < Conn::count; i++) {
  310. Conn& conn = Conn::list[i];
  311. if (! conn.proxy)
  312.     continue;
  313. conn.thread = NdbThread_Create(connrun_C, (void**)&conn,
  314.     8192, "connrun", NDB_THREAD_PRIO_LOW);
  315. if (conn.thread == 0) {
  316.     fatal("create thread %d failed errno=%d", i, errno);
  317. }
  318.     }
  319.     sleep(3600);
  320. }
  321. int
  322. main(int av, char** ac)
  323. {
  324.     ndb_init();
  325.     debug("start");
  326.     hostname = "ndb-srv7";
  327.     if (Ndb_getInAddr(&hostaddr.sin_addr, hostname) != 0) {
  328. fatal("hostname %s lookup failed", hostname);
  329.     }
  330.     localcfgfile = "Ndb.cfg";
  331.     initcfgfile = "config.txt";
  332.     getcfg();
  333.     start();
  334.     debug("done");
  335.     return 0;
  336. }
  337. // vim: set sw=4 noet: