IPCConfig.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:15k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <ndb_opt_defaults.h>
  15. #include <IPCConfig.hpp>
  16. #include <NdbOut.hpp>
  17. #include <NdbHost.h>
  18. #include <TransporterDefinitions.hpp>
  19. #include <TransporterRegistry.hpp>
  20. #include <Properties.hpp>
  21. #include <mgmapi_configuration.hpp>
  22. #include <mgmapi_config_parameters.h>
  23. #if defined DEBUG_TRANSPORTER
  24. #define DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
  25. #else
  26. #define DEBUG(t)
  27. #endif
  28. IPCConfig::IPCConfig(Properties * p)
  29. {
  30.   theNoOfRemoteNodes = 0;
  31.   the_ownId = 0;
  32.   if(p != 0)
  33.     props = new Properties(* p);
  34.   else
  35.     props = 0;
  36. }
  37. IPCConfig::~IPCConfig()
  38. {
  39.   if(props != 0){
  40.     delete props;
  41.   }
  42. }
  43. int
  44. IPCConfig::init(){
  45.   Uint32 nodeId;
  46.   if(props == 0) return -1;
  47.   if(!props->get("LocalNodeId", &nodeId)) {
  48.     DEBUG( "Did not find local node id." );
  49.     return -1;
  50.   }
  51.   the_ownId = nodeId;
  52.   
  53.   Uint32 noOfConnections;
  54.   if(!props->get("NoOfConnections", &noOfConnections)) {
  55.     DEBUG( "Did not find noOfConnections." );
  56.     return -1;
  57.   }
  58.   
  59.   for(Uint32 i = 0; i<noOfConnections; i++){
  60.     const Properties * tmp;
  61.     Uint32 node1, node2;
  62.     if(!props->get("Connection", i, &tmp)) {
  63.       DEBUG( "Did not find Connection." );
  64.       return -1;
  65.     }
  66.     if(!tmp->get("NodeId1", &node1)) {
  67.       DEBUG( "Did not find NodeId1." );
  68.       return -1;
  69.     }
  70.     if(!tmp->get("NodeId2", &node2)) {
  71.       DEBUG( "Did not find NodeId2." );
  72.       return -1;
  73.     }
  74.     if(node1 == the_ownId && node2 != the_ownId)
  75.       if(!addRemoteNodeId(node2)) {
  76. DEBUG( "addRemoteNodeId(node2) failed." );
  77. return -1;
  78.       }
  79.     if(node1 != the_ownId && node2 == the_ownId)
  80.       if(!addRemoteNodeId(node1)) {
  81. DEBUG( "addRemoteNodeId(node2) failed." );
  82. return -1;
  83.       }
  84.   }
  85.   return 0;
  86. }
  87. bool
  88. IPCConfig::addRemoteNodeId(NodeId nodeId){
  89.   for(int i = 0; i<theNoOfRemoteNodes; i++)
  90.     if(theRemoteNodeIds[i] == nodeId)
  91.       return false;
  92.   theRemoteNodeIds[theNoOfRemoteNodes] = nodeId;
  93.   theNoOfRemoteNodes++;
  94.   return true;
  95. }
  96. /**
  97.  * Returns no of transporters configured
  98.  */
  99. int
  100. IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry)
  101. {
  102.   DBUG_ENTER("IPCConfig::configureTransporters");
  103.   int noOfTransportersCreated = 0;
  104.   Uint32 noOfConnections;
  105.   if(!props->get("NoOfConnections", &noOfConnections)) return -1;
  106.   
  107.   for (Uint32 i = 0; i < noOfConnections; i++){
  108.     const Properties * tmp;
  109.     Uint32 nodeId1, nodeId2;
  110.     const char * host1;
  111.     const char * host2;
  112.     if(!props->get("Connection", i, &tmp)) continue;
  113.     if(!tmp->get("NodeId1", &nodeId1)) continue;
  114.     if(!tmp->get("NodeId2", &nodeId2)) continue;
  115.     if(nodeId1 != the_ownId && nodeId2 != the_ownId) continue;
  116.     Uint32 sendSignalId;
  117.     Uint32 compression;
  118.     Uint32 checksum;
  119.     if(!tmp->get("SendSignalId", &sendSignalId)) continue;
  120.     if(!tmp->get("Checksum", &checksum)) continue;
  121.     
  122.     const char * type;
  123.     if(!tmp->get("Type", &type)) continue;
  124.     if(strcmp("SHM", type) == 0){
  125.       SHM_TransporterConfiguration conf;
  126.       conf.localNodeId  = the_ownId;
  127.       conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
  128.       conf.checksum     = checksum;
  129.       conf.signalId     = sendSignalId;
  130.       if(!tmp->get("ShmKey", &conf.shmKey)) continue;
  131.       if(!tmp->get("ShmSize", &conf.shmSize)) continue;
  132.       if(!theTransporterRegistry->createTransporter(&conf)){
  133. ndbout << "Failed to create SHM Transporter from: " 
  134.        << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
  135. continue;
  136.       } else {
  137. noOfTransportersCreated++;
  138. continue;
  139.       }
  140.     } else if(strcmp("SCI", type) == 0){
  141.       SCI_TransporterConfiguration conf;
  142.       conf.localNodeId  = the_ownId;
  143.       conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
  144.       conf.checksum     = checksum;
  145.       conf.signalId     = sendSignalId;
  146.     
  147.       if(!tmp->get("SendLimit", &conf.sendLimit)) continue;
  148.       if(!tmp->get("SharedBufferSize", &conf.bufferSize)) continue;
  149.       if(the_ownId == nodeId1){
  150. if(!tmp->get("Node1_NoOfAdapters", &conf.nLocalAdapters)) continue;
  151. if(!tmp->get("Node2_Adapter", 0, &conf.remoteSciNodeId0)) continue;
  152. if(conf.nLocalAdapters > 1){
  153.   if(!tmp->get("Node2_Adapter", 1, &conf.remoteSciNodeId1)) continue;
  154. }
  155.       } else {
  156. if(!tmp->get("Node2_NoOfAdapters", &conf.nLocalAdapters)) continue;
  157. if(!tmp->get("Node1_Adapter", 0, &conf.remoteSciNodeId0)) continue;
  158. if(conf.nLocalAdapters > 1){
  159.   if(!tmp->get("Node1_Adapter", 1, &conf.remoteSciNodeId1)) continue;
  160. }
  161.       }
  162.       if(!theTransporterRegistry->createTransporter(&conf)){
  163. ndbout << "Failed to create SCI Transporter from: " 
  164.        << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
  165. continue;
  166.       } else {
  167. noOfTransportersCreated++;
  168. continue;
  169.       }
  170.     }
  171.     
  172.     if(!tmp->get("HostName1", &host1)) continue;
  173.     if(!tmp->get("HostName2", &host2)) continue;
  174.     Uint32 ownNodeId;
  175.     Uint32 remoteNodeId;
  176.     const char * ownHostName;
  177.     const char * remoteHostName;
  178.     if(nodeId1 == the_ownId){
  179.       ownNodeId      = nodeId1;
  180.       ownHostName    = host1;
  181.       remoteNodeId   = nodeId2;
  182.       remoteHostName = host2;
  183.     } else if(nodeId2 == the_ownId){
  184.       ownNodeId      = nodeId2;
  185.       ownHostName    = host2;
  186.       remoteNodeId   = nodeId1;
  187.       remoteHostName = host1;
  188.     } else {
  189.       continue;
  190.     }
  191.     
  192.     if(strcmp("TCP", type) == 0){
  193.       TCP_TransporterConfiguration conf;
  194.       
  195.       if(!tmp->get("PortNumber", &conf.port)) continue;
  196.       if(!tmp->get("SendBufferSize", &conf.sendBufferSize)) continue;
  197.       if(!tmp->get("MaxReceiveSize", &conf.maxReceiveSize)) continue;
  198.       
  199.       const char * proxy;
  200.       if (tmp->get("Proxy", &proxy)) {
  201. if (strlen(proxy) > 0 && nodeId2 == the_ownId) {
  202.   // TODO handle host:port
  203.   conf.port = atoi(proxy);
  204. }
  205.       }
  206.       conf.sendBufferSize *= MAX_MESSAGE_SIZE;
  207.       conf.maxReceiveSize *= MAX_MESSAGE_SIZE;
  208.       
  209.       conf.remoteHostName = remoteHostName;
  210.       conf.localHostName  = ownHostName;
  211.       conf.remoteNodeId   = remoteNodeId;
  212.       conf.localNodeId    = ownNodeId;
  213.       conf.checksum       = checksum;
  214.       conf.signalId       = sendSignalId;
  215.       if(!theTransporterRegistry->createTransporter(&conf)){
  216. ndbout << "Failed to create TCP Transporter from: " 
  217.        << ownNodeId << " to: " << remoteNodeId << endl;
  218.       } else {
  219. noOfTransportersCreated++;
  220.       }
  221.     } else if(strcmp("OSE", type) == 0){
  222.       OSE_TransporterConfiguration conf;
  223.       if(!tmp->get("PrioASignalSize", &conf.prioASignalSize))
  224. continue;
  225.       if(!tmp->get("PrioBSignalSize", &conf.prioBSignalSize))
  226. continue;
  227.       if(!tmp->get("ReceiveArraySize", &conf.receiveBufferSize))
  228. continue;
  229.       
  230.       conf.remoteHostName = remoteHostName;
  231.       conf.localHostName  = ownHostName;
  232.       conf.remoteNodeId   = remoteNodeId;
  233.       conf.localNodeId    = ownNodeId;
  234.       conf.checksum       = checksum;
  235.       conf.signalId       = sendSignalId;
  236.       if(!theTransporterRegistry->createTransporter(&conf)){
  237. ndbout << "Failed to create OSE Transporter from: " 
  238.        << ownNodeId << " to: " << remoteNodeId << endl;
  239.       } else {
  240. noOfTransportersCreated++;
  241.       }
  242.     } else {
  243.       continue;
  244.     }
  245.   }
  246.   DBUG_RETURN(noOfTransportersCreated);
  247. }
  248. /**
  249.  * Supply a nodeId,
  250.  *  and get next higher node id
  251.  * Returns false if none found
  252.  */
  253. bool
  254. IPCConfig::getNextRemoteNodeId(NodeId & nodeId) const {
  255.   NodeId returnNode = MAX_NODES + 1;
  256.   for(int i = 0; i<theNoOfRemoteNodes; i++)
  257.     if(theRemoteNodeIds[i] > nodeId){
  258.       if(theRemoteNodeIds[i] < returnNode){
  259. returnNode = theRemoteNodeIds[i];
  260.       }
  261.     }
  262.   if(returnNode == (MAX_NODES + 1))
  263.     return false;
  264.   nodeId = returnNode;
  265.   return true;
  266. }
  267. Uint32 
  268. IPCConfig::getREPHBFrequency(NodeId id) const {
  269.   const Properties * tmp;
  270.   Uint32 out;
  271.   /**
  272.    *  Todo: Fix correct heartbeat
  273.    */
  274.   if (!props->get("Node", id, &tmp) || 
  275.       !tmp->get("HeartbeatIntervalRepRep", &out)) {
  276.     DEBUG("Illegal Node or HeartbeatIntervalRepRep in config.");    
  277.     out = 10000;
  278.   }
  279.   
  280.   return out;
  281. }
  282. const char* 
  283. IPCConfig::getNodeType(NodeId id) const {
  284.   const char * out;
  285.   const Properties * tmp;
  286.   if (!props->get("Node", id, &tmp) || !tmp->get("Type", &out)) {
  287.     DEBUG("Illegal Node or NodeType in config.");
  288.     out = "Unknown";
  289.   }
  290.   return out;
  291. }
  292. #include <mgmapi.h>
  293. Uint32
  294. IPCConfig::configureTransporters(Uint32 nodeId,
  295.  const class ndb_mgm_configuration & config,
  296.  class TransporterRegistry & tr){
  297.   DBUG_ENTER("IPCConfig::configureTransporters");
  298.   Uint32 noOfTransportersCreated= 0;
  299.   ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
  300.   
  301.   for(iter.first(); iter.valid(); iter.next()){
  302.     
  303.     Uint32 nodeId1, nodeId2, remoteNodeId;
  304.     const char * remoteHostName= 0, * localHostName= 0;
  305.     if(iter.get(CFG_CONNECTION_NODE_1, &nodeId1)) continue;
  306.     if(iter.get(CFG_CONNECTION_NODE_2, &nodeId2)) continue;
  307.     if(nodeId1 != nodeId && nodeId2 != nodeId) continue;
  308.     remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
  309.     {
  310.       const char * host1= 0, * host2= 0;
  311.       iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
  312.       iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
  313.       localHostName  = (nodeId == nodeId1 ? host1 : host2);
  314.       remoteHostName = (nodeId == nodeId1 ? host2 : host1);
  315.     }
  316.     Uint32 sendSignalId = 1;
  317.     Uint32 checksum = 1;
  318.     if(iter.get(CFG_CONNECTION_SEND_SIGNAL_ID, &sendSignalId)) continue;
  319.     if(iter.get(CFG_CONNECTION_CHECKSUM, &checksum)) continue;
  320.     Uint32 type = ~0;
  321.     if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
  322.     Uint32 server_port= 0;
  323.     if(iter.get(CFG_CONNECTION_SERVER_PORT, &server_port)) break;
  324.     if (nodeId <= nodeId1 && nodeId <= nodeId2) {
  325.       tr.add_transporter_interface(localHostName, server_port);
  326.     }
  327.     DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d",
  328.                nodeId, remoteNodeId, server_port, sendSignalId, checksum));
  329.     switch(type){
  330.     case CONNECTION_TYPE_SHM:{
  331.       SHM_TransporterConfiguration conf;
  332.       conf.localNodeId  = nodeId;
  333.       conf.remoteNodeId = remoteNodeId;
  334.       conf.checksum     = checksum;
  335.       conf.signalId     = sendSignalId;
  336.       
  337.       if(iter.get(CFG_SHM_KEY, &conf.shmKey)) break;
  338.       if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shmSize)) break;
  339.       {
  340. Uint32 tmp;
  341. if(iter.get(CFG_SHM_SIGNUM, &tmp)) break;
  342. conf.signum= tmp;
  343.       }
  344.       conf.port= server_port;
  345.       conf.localHostName  = localHostName;
  346.       conf.remoteHostName = remoteHostName;
  347.       if(!tr.createTransporter(&conf)){
  348.         DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d",
  349.            conf.localNodeId, conf.remoteNodeId));
  350. ndbout << "Failed to create SHM Transporter from: " 
  351.        << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
  352.       } else {
  353. noOfTransportersCreated++;
  354.       }
  355.       DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, buf size = %d",
  356.                  conf.shmKey, conf.shmSize));
  357.       break;
  358.     }
  359.     case CONNECTION_TYPE_SCI:{
  360.       SCI_TransporterConfiguration conf;
  361.       conf.localNodeId  = nodeId;
  362.       conf.remoteNodeId = remoteNodeId;
  363.       conf.checksum     = checksum;
  364.       conf.signalId     = sendSignalId;
  365.       conf.port= server_port;
  366.       
  367.       conf.localHostName  = localHostName;
  368.       conf.remoteHostName = remoteHostName;
  369.       if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sendLimit)) break;
  370.       if(iter.get(CFG_SCI_BUFFER_MEM, &conf.bufferSize)) break;
  371.       if (nodeId == nodeId1) {
  372.         if(iter.get(CFG_SCI_HOST2_ID_0, &conf.remoteSciNodeId0)) break;
  373.         if(iter.get(CFG_SCI_HOST2_ID_1, &conf.remoteSciNodeId1)) break;
  374.       } else {
  375.         if(iter.get(CFG_SCI_HOST1_ID_0, &conf.remoteSciNodeId0)) break;
  376.         if(iter.get(CFG_SCI_HOST1_ID_1, &conf.remoteSciNodeId1)) break;
  377.       }
  378.       if (conf.remoteSciNodeId1 == 0) {
  379.         conf.nLocalAdapters = 1;
  380.       } else {
  381.         conf.nLocalAdapters = 2;
  382.       }
  383.      if(!tr.createTransporter(&conf)){
  384.         DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
  385.            conf.localNodeId, conf.remoteNodeId));
  386. ndbout << "Failed to create SCI Transporter from: " 
  387.        << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
  388.       } else {
  389.         DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, remote SCI node id %d",
  390.                    conf.nLocalAdapters, conf.remoteSciNodeId0));
  391.         DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, buf size = %d",
  392.                    conf.localHostName, conf.remoteHostName, conf.sendLimit, conf.bufferSize));
  393.         if (conf.nLocalAdapters > 1) {
  394.           DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, second remote SCI node id = %d",
  395.                      conf.remoteSciNodeId1)); 
  396.         }
  397. noOfTransportersCreated++;
  398. continue;
  399.       }
  400.     }
  401.     case CONNECTION_TYPE_TCP:{
  402.       TCP_TransporterConfiguration conf;
  403.       
  404.       if(iter.get(CFG_TCP_SEND_BUFFER_SIZE, &conf.sendBufferSize)) break;
  405.       if(iter.get(CFG_TCP_RECEIVE_BUFFER_SIZE, &conf.maxReceiveSize)) break;
  406.       
  407.       conf.port= server_port;
  408.       const char * proxy;
  409.       if (!iter.get(CFG_TCP_PROXY, &proxy)) {
  410. if (strlen(proxy) > 0 && nodeId2 == nodeId) {
  411.   // TODO handle host:port
  412.   conf.port = atoi(proxy);
  413. }
  414.       }
  415.       
  416.       conf.localNodeId    = nodeId;
  417.       conf.remoteNodeId   = remoteNodeId;
  418.       conf.localHostName  = localHostName;
  419.       conf.remoteHostName = remoteHostName;
  420.       conf.checksum       = checksum;
  421.       conf.signalId       = sendSignalId;
  422.       
  423.       if(!tr.createTransporter(&conf)){
  424. ndbout << "Failed to create TCP Transporter from: " 
  425.        << nodeId << " to: " << remoteNodeId << endl;
  426.       } else {
  427. noOfTransportersCreated++;
  428.       }
  429.       DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, maxReceiveSize = %d",
  430.                  conf.sendBufferSize, conf.maxReceiveSize));
  431.       break;
  432.     case CONNECTION_TYPE_OSE:{
  433.       OSE_TransporterConfiguration conf;
  434.       if(iter.get(CFG_OSE_PRIO_A_SIZE, &conf.prioASignalSize)) break;
  435.       if(iter.get(CFG_OSE_PRIO_B_SIZE, &conf.prioBSignalSize)) break;
  436.       if(iter.get(CFG_OSE_RECEIVE_ARRAY_SIZE, &conf.receiveBufferSize)) break;
  437.       
  438.       conf.localNodeId    = nodeId;
  439.       conf.remoteNodeId   = remoteNodeId;
  440.       conf.localHostName  = localHostName;
  441.       conf.remoteHostName = remoteHostName;
  442.       conf.checksum       = checksum;
  443.       conf.signalId       = sendSignalId;
  444.       
  445.       if(!tr.createTransporter(&conf)){
  446. ndbout << "Failed to create OSE Transporter from: " 
  447.        << nodeId << " to: " << remoteNodeId << endl;
  448.       } else {
  449. noOfTransportersCreated++;
  450.       }
  451.     }
  452.     default:
  453.       ndbout << "Unknown transporter type from: " << nodeId << 
  454. " to: " << remoteNodeId << endl;
  455.       break;
  456.     }
  457.     }
  458.   }
  459.   DBUG_RETURN(noOfTransportersCreated);
  460. }
  461.