ndb_cluster_connection.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:15k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <my_pthread.h>
  15. #include <my_sys.h>
  16. #include "ndb_cluster_connection_impl.hpp"
  17. #include <mgmapi_configuration.hpp>
  18. #include <mgmapi_config_parameters.h>
  19. #include <TransporterFacade.hpp>
  20. #include <NdbOut.hpp>
  21. #include <NdbSleep.h>
  22. #include <NdbThread.h>
  23. #include <ndb_limits.h>
  24. #include <ConfigRetriever.hpp>
  25. #include <ndb_version.h>
  26. #include <Vector.hpp>
  27. #include <md5_hash.hpp>
  28. #include <EventLogger.hpp>
  29. EventLogger g_eventLogger;
  30. static int g_run_connect_thread= 0;
  31. #include <NdbMutex.h>
  32. NdbMutex *ndb_global_event_buffer_mutex= NULL;
  33. #ifdef VM_TRACE
  34. NdbMutex *ndb_print_state_mutex= NULL;
  35. #endif
  36. /*
  37.  * Ndb_cluster_connection
  38.  */
  39. Ndb_cluster_connection::Ndb_cluster_connection(const char *connect_string)
  40.   : m_impl(* new Ndb_cluster_connection_impl(connect_string))
  41. {
  42. }
  43. Ndb_cluster_connection::Ndb_cluster_connection
  44. (Ndb_cluster_connection_impl& impl) : m_impl(impl)
  45. {
  46. }
  47. Ndb_cluster_connection::~Ndb_cluster_connection()
  48. {
  49.   Ndb_cluster_connection_impl *tmp = &m_impl;
  50.   if (this != tmp)
  51.     delete tmp;
  52. }
  53. int Ndb_cluster_connection::get_connected_port() const
  54. {
  55.   if (m_impl.m_config_retriever)
  56.     return m_impl.m_config_retriever->get_mgmd_port();
  57.   return -1;
  58. }
  59. const char *Ndb_cluster_connection::get_connected_host() const
  60. {
  61.   if (m_impl.m_config_retriever)
  62.     return m_impl.m_config_retriever->get_mgmd_host();
  63.   return 0;
  64. }
  65. const char *Ndb_cluster_connection::get_connectstring(char *buf,
  66.       int buf_sz) const
  67. {
  68.   if (m_impl.m_config_retriever)
  69.     return m_impl.m_config_retriever->get_connectstring(buf,buf_sz);
  70.   return 0;
  71. }
  72. extern "C" pthread_handler_decl(run_ndb_cluster_connection_connect_thread, me)
  73. {
  74.   g_run_connect_thread= 1;
  75.   ((Ndb_cluster_connection_impl*) me)->connect_thread();
  76.   return me;
  77. }
  78. int Ndb_cluster_connection::start_connect_thread(int (*connect_callback)(void))
  79. {
  80.   int r;
  81.   DBUG_ENTER("Ndb_cluster_connection::start_connect_thread");
  82.   m_impl.m_connect_callback= connect_callback;
  83.   if ((r = connect(0,0,0)) == 1)
  84.   {
  85.     DBUG_PRINT("info",("starting thread"));
  86.     m_impl.m_connect_thread= 
  87.       NdbThread_Create(run_ndb_cluster_connection_connect_thread,
  88.        (void**)&m_impl, 32768, "ndb_cluster_connection",
  89.        NDB_THREAD_PRIO_LOW);
  90.   }
  91.   else if (r < 0)
  92.   {
  93.     DBUG_RETURN(-1);
  94.   }
  95.   else if (m_impl.m_connect_callback)
  96.   { 
  97.     (*m_impl.m_connect_callback)();
  98.   }
  99.   DBUG_RETURN(0);
  100. }
  101. void Ndb_cluster_connection::set_optimized_node_selection(int val)
  102. {
  103.   m_impl.m_optimized_node_selection= val;
  104. }
  105. void
  106. Ndb_cluster_connection_impl::init_get_next_node
  107. (Ndb_cluster_connection_node_iter &iter)
  108. {
  109.   if (iter.scan_state != (Uint8)~0)
  110.     iter.cur_pos= iter.scan_state;
  111.   if (iter.cur_pos >= no_db_nodes())
  112.     iter.cur_pos= 0;
  113.   iter.init_pos= iter.cur_pos;
  114.   iter.scan_state= 0;
  115.   //  fprintf(stderr,"[init %d]",iter.init_pos);
  116.   return;
  117. }
  118. Uint32
  119. Ndb_cluster_connection_impl::get_next_node(Ndb_cluster_connection_node_iter &iter)
  120. {
  121.   Uint32 cur_pos= iter.cur_pos;
  122.   if (cur_pos >= no_db_nodes())
  123.     return 0;
  124.   Ndb_cluster_connection_impl::Node *nodes= m_impl.m_all_nodes.getBase();
  125.   Ndb_cluster_connection_impl::Node &node=  nodes[cur_pos];
  126.   if (iter.scan_state != (Uint8)~0)
  127.   {
  128.     assert(iter.scan_state < no_db_nodes());
  129.     if (nodes[iter.scan_state].group == node.group)
  130.       iter.scan_state= ~0;
  131.     else
  132.       return nodes[iter.scan_state++].id;
  133.   }
  134.   //  fprintf(stderr,"[%d]",node.id);
  135.   cur_pos++;
  136.   Uint32 init_pos= iter.init_pos;
  137.   if (cur_pos == node.next_group)
  138.   {
  139.     cur_pos= nodes[init_pos].this_group;
  140.   }
  141.   //  fprintf(stderr,"[cur_pos %d]",cur_pos);
  142.   if (cur_pos != init_pos)
  143.     iter.cur_pos= cur_pos;
  144.   else
  145.   {
  146.     iter.cur_pos= node.next_group;
  147.     iter.init_pos= node.next_group;
  148.   }
  149.   return node.id;
  150. }
  151. unsigned
  152. Ndb_cluster_connection::no_db_nodes()
  153. {
  154.   return m_impl.m_all_nodes.size();
  155. }
  156. int
  157. Ndb_cluster_connection::wait_until_ready(int timeout,
  158.  int timeout_after_first_alive)
  159. {
  160.   DBUG_ENTER("Ndb_cluster_connection::wait_until_ready");
  161.   TransporterFacade *tp = TransporterFacade::instance();
  162.   if (tp == 0)
  163.   {
  164.     DBUG_RETURN(-1);
  165.   }
  166.   if (tp->ownId() == 0)
  167.   {
  168.     DBUG_RETURN(-1);
  169.   }
  170.   int secondsCounter = 0;
  171.   int milliCounter = 0;
  172.   int noChecksSinceFirstAliveFound = 0;
  173.   do {
  174.     unsigned int foundAliveNode = 0;
  175.     tp->lock_mutex();
  176.     for(unsigned i= 0; i < no_db_nodes(); i++)
  177.     {
  178.       //************************************************
  179.       // If any node is answering, ndb is answering
  180.       //************************************************
  181.       if (tp->get_node_alive(m_impl.m_all_nodes[i].id) != 0) {
  182. foundAliveNode++;
  183.       }
  184.     }
  185.     tp->unlock_mutex();
  186.     if (foundAliveNode == no_db_nodes())
  187.     {
  188.       DBUG_RETURN(0);
  189.     }
  190.     else if (foundAliveNode > 0)
  191.     {
  192.       noChecksSinceFirstAliveFound++;
  193.       // 100 ms delay -> 10*
  194.       if (noChecksSinceFirstAliveFound > 10*timeout_after_first_alive)
  195. DBUG_RETURN(1);
  196.     }
  197.     else if (secondsCounter >= timeout)
  198.     { // no alive nodes and timed out
  199.       DBUG_RETURN(-1);
  200.     }
  201.     NdbSleep_MilliSleep(100);
  202.     milliCounter += 100;
  203.     if (milliCounter >= 1000) {
  204.       secondsCounter++;
  205.       milliCounter = 0;
  206.     }//if
  207.   } while (1);
  208. }
  209. /*
  210.  * Ndb_cluster_connection_impl
  211.  */
  212. Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
  213.  connect_string)
  214.   : Ndb_cluster_connection(*this),
  215.     m_optimized_node_selection(1)
  216. {
  217.   DBUG_ENTER("Ndb_cluster_connection");
  218.   DBUG_PRINT("enter",("Ndb_cluster_connection this=0x%x", this));
  219.   g_eventLogger.createConsoleHandler();
  220.   g_eventLogger.setCategory("NdbApi");
  221.   g_eventLogger.enable(Logger::LL_ON, Logger::LL_ERROR);
  222.   
  223.   m_transporter_facade=
  224.     TransporterFacade::theFacadeInstance= new TransporterFacade();
  225.   m_connect_thread= 0;
  226.   m_connect_callback= 0;
  227.   if (ndb_global_event_buffer_mutex == NULL)
  228.   {
  229.     ndb_global_event_buffer_mutex= NdbMutex_Create();
  230.   }
  231. #ifdef VM_TRACE
  232.   if (ndb_print_state_mutex == NULL)
  233.   {
  234.     ndb_print_state_mutex= NdbMutex_Create();
  235.   }
  236. #endif
  237.   m_config_retriever=
  238.     new ConfigRetriever(connect_string, NDB_VERSION, NODE_TYPE_API);
  239.   if (m_config_retriever->hasError())
  240.   {
  241.     printf("Could not connect initialize handle to management server: %s",
  242.    m_config_retriever->getErrorString());
  243.     delete m_config_retriever;
  244.     m_config_retriever= 0;
  245.   }
  246.   DBUG_VOID_RETURN;
  247. }
  248. Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
  249. {
  250.   DBUG_ENTER("~Ndb_cluster_connection");
  251.   DBUG_PRINT("enter",("~Ndb_cluster_connection this=0x%x", this));
  252.   TransporterFacade::stop_instance();
  253.   if (m_connect_thread)
  254.   {
  255.     void *status;
  256.     g_run_connect_thread= 0;
  257.     NdbThread_WaitFor(m_connect_thread, &status);
  258.     NdbThread_Destroy(&m_connect_thread);
  259.     m_connect_thread= 0;
  260.   }
  261.   if (m_transporter_facade != 0)
  262.   {
  263.     delete m_transporter_facade;
  264.     if (m_transporter_facade != TransporterFacade::theFacadeInstance)
  265.       abort();
  266.     TransporterFacade::theFacadeInstance= 0;
  267.   }
  268.   if (m_config_retriever)
  269.     delete m_config_retriever;
  270.   //  fragmentToNodeMap.release();
  271.   DBUG_VOID_RETURN;
  272. }
  273. void
  274. Ndb_cluster_connection_impl::init_nodes_vector(Uint32 nodeid,
  275.        const ndb_mgm_configuration 
  276.        &config)
  277. {
  278.   DBUG_ENTER("Ndb_cluster_connection_impl::init_nodes_vector");
  279.   ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
  280.   
  281.   for(iter.first(); iter.valid(); iter.next())
  282.   {
  283.     Uint32 nodeid1, nodeid2, remoteNodeId, group= 5;
  284.     const char * remoteHostName= 0, * localHostName= 0;
  285.     if(iter.get(CFG_CONNECTION_NODE_1, &nodeid1)) continue;
  286.     if(iter.get(CFG_CONNECTION_NODE_2, &nodeid2)) continue;
  287.     if(nodeid1 != nodeid && nodeid2 != nodeid) continue;
  288.     remoteNodeId = (nodeid == nodeid1 ? nodeid2 : nodeid1);
  289.     iter.get(CFG_CONNECTION_GROUP, &group);
  290.     {
  291.       const char * host1= 0, * host2= 0;
  292.       iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
  293.       iter.get(CFG_CONNECTION_HOSTNAME_2, &host2);
  294.       localHostName  = (nodeid == nodeid1 ? host1 : host2);
  295.       remoteHostName = (nodeid == nodeid1 ? host2 : host1);
  296.     }
  297.     Uint32 type = ~0;
  298.     if(iter.get(CFG_TYPE_OF_SECTION, &type)) continue;
  299.     switch(type){
  300.     case CONNECTION_TYPE_SHM:{
  301.       break;
  302.     }
  303.     case CONNECTION_TYPE_SCI:{
  304.       break;
  305.     }
  306.     case CONNECTION_TYPE_TCP:{
  307.       // connecting through localhost
  308.       // check if config_hostname is local
  309.       if (SocketServer::tryBind(0,remoteHostName))
  310. group--; // upgrade group value
  311.       break;
  312.     }
  313.     case CONNECTION_TYPE_OSE:{
  314.       break;
  315.     }
  316.     }
  317.     m_impl.m_all_nodes.push_back(Node(group,remoteNodeId));
  318.     DBUG_PRINT("info",("saved %d %d", group,remoteNodeId));
  319.     for (int i= m_impl.m_all_nodes.size()-2;
  320.  i >= 0 && m_impl.m_all_nodes[i].group > m_impl.m_all_nodes[i+1].group;
  321.  i--)
  322.     {
  323.       Node tmp= m_impl.m_all_nodes[i];
  324.       m_impl.m_all_nodes[i]= m_impl.m_all_nodes[i+1];
  325.       m_impl.m_all_nodes[i+1]= tmp;
  326.     }
  327.   }
  328.   int i;
  329.   Uint32 cur_group, i_group= 0;
  330.   cur_group= ~0;
  331.   for (i= (int)m_impl.m_all_nodes.size()-1; i >= 0; i--)
  332.   {
  333.     if (m_impl.m_all_nodes[i].group != cur_group)
  334.     {
  335.       cur_group= m_impl.m_all_nodes[i].group;
  336.       i_group= i+1;
  337.     }
  338.     m_impl.m_all_nodes[i].next_group= i_group;
  339.   }
  340.   cur_group= ~0;
  341.   for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
  342.   {
  343.     if (m_impl.m_all_nodes[i].group != cur_group)
  344.     {
  345.       cur_group= m_impl.m_all_nodes[i].group;
  346.       i_group= i;
  347.     }
  348.     m_impl.m_all_nodes[i].this_group= i_group;
  349.   }
  350. #if 0
  351.   for (i= 0; i < (int)m_impl.m_all_nodes.size(); i++)
  352.   {
  353.     fprintf(stderr, "[%d] %d %d %d %dn",
  354.    i,
  355.    m_impl.m_all_nodes[i].id,
  356.    m_impl.m_all_nodes[i].group,
  357.    m_impl.m_all_nodes[i].this_group,
  358.    m_impl.m_all_nodes[i].next_group);
  359.   }
  360.   do_test();
  361. #endif
  362.   DBUG_VOID_RETURN;
  363. }
  364. void
  365. Ndb_cluster_connection_impl::do_test()
  366. {
  367.   Ndb_cluster_connection_node_iter iter;
  368.   int n= no_db_nodes()+5;
  369.   Uint32 *nodes= new Uint32[n+1];
  370.   for (int g= 0; g < n; g++)
  371.   {
  372.     for (int h= 0; h < n; h++)
  373.     {
  374.       Uint32 id;
  375.       Ndb_cluster_connection_node_iter iter2;
  376.       {
  377. for (int j= 0; j < g; j++)
  378. {
  379.   nodes[j]= get_next_node(iter2);
  380. }
  381.       }
  382.       for (int i= 0; i < n; i++)
  383.       {
  384. init_get_next_node(iter);
  385. fprintf(stderr, "%d dead:(", g);
  386. id= 0;
  387. while (id == 0)
  388. {
  389.   if ((id= get_next_node(iter)) == 0)
  390.     break;
  391.   for (int j= 0; j < g; j++)
  392.   {
  393.     if (nodes[j] == id)
  394.     {
  395.       fprintf(stderr, " %d", id);
  396.       id= 0;
  397.       break;
  398.     }
  399.   }
  400. }
  401. fprintf(stderr, ")");
  402. if (id == 0)
  403. {
  404.   break;
  405. }
  406. fprintf(stderr, " %dn", id);
  407.       }
  408.       fprintf(stderr, "n");
  409.     }
  410.   }
  411.   delete [] nodes;
  412. }
  413. int Ndb_cluster_connection::connect(int no_retries, int retry_delay_in_seconds,
  414.     int verbose)
  415. {
  416.   DBUG_ENTER("Ndb_cluster_connection::connect");
  417.   const char* error = 0;
  418.   do {
  419.     if (m_impl.m_config_retriever == 0)
  420.       DBUG_RETURN(-1);
  421.     if (m_impl.m_config_retriever->do_connect(no_retries,
  422.       retry_delay_in_seconds,
  423.       verbose))
  424.       DBUG_RETURN(1); // mgmt server not up yet
  425.     Uint32 nodeId = m_impl.m_config_retriever->allocNodeId(4/*retries*/,
  426.    3/*delay*/);
  427.     if(nodeId == 0)
  428.       break;
  429.     ndb_mgm_configuration * props = m_impl.m_config_retriever->getConfig();
  430.     if(props == 0)
  431.       break;
  432.     m_impl.m_transporter_facade->start_instance(nodeId, props);
  433.     m_impl.init_nodes_vector(nodeId, *props);
  434.     ndb_mgm_destroy_configuration(props);
  435.     m_impl.m_transporter_facade->connected();
  436.     DBUG_RETURN(0);
  437.   } while(0);
  438.   
  439.   ndbout << "Configuration error: ";
  440.   const char* erString = m_impl.m_config_retriever->getErrorString();
  441.   if (erString == 0) {
  442.     erString = "No error specified!";
  443.   }
  444.   ndbout << erString << endl;
  445.   DBUG_RETURN(-1);
  446. }
  447. void Ndb_cluster_connection_impl::connect_thread()
  448. {
  449.   DBUG_ENTER("Ndb_cluster_connection_impl::connect_thread");
  450.   int r;
  451.   do {
  452.     NdbSleep_SecSleep(1);
  453.     if ((r = connect(0,0,0)) == 0)
  454.       break;
  455.     if (r == -1) {
  456.       printf("Ndb_cluster_connection::connect_thread errorn");
  457.       DBUG_ASSERT(false);
  458.       g_run_connect_thread= 0;
  459.     } else {
  460.       // Wait before making a new connect attempt
  461.       NdbSleep_SecSleep(1);
  462.     }
  463.   } while (g_run_connect_thread);
  464.   if (m_connect_callback)
  465.     (*m_connect_callback)();
  466.   DBUG_VOID_RETURN;
  467. }
  468. /*
  469.  * Hint handling to select node
  470.  * ToDo: fix this
  471.  */
  472. void
  473. Ndb_cluster_connection_impl::FragmentToNodeMap::init(Uint32 noOfNodes,
  474.      Uint8 nodeIds[])
  475. {
  476.   kValue           = 6;
  477.   noOfFragments    = 2 * noOfNodes;
  478.   /**
  479.    * Compute hashValueMask and hashpointerValue
  480.    */
  481.   {
  482.     Uint32 topBit = (1 << 31);
  483.     for(int i = 31; i>=0; i--){
  484.       if((noOfFragments & topBit) != 0)
  485. break;
  486.       topBit >>= 1;
  487.     }
  488.     hashValueMask    = topBit - 1;
  489.     hashpointerValue = noOfFragments - (hashValueMask + 1);
  490.   }
  491.   
  492.   /**
  493.    * This initialization depends on
  494.    * the fact that:
  495.    *  primary node for fragment i = i % noOfNodes
  496.    *
  497.    * This algorithm should be implemented in Dbdih
  498.    */
  499.   {
  500.     if (fragment2PrimaryNodeMap != 0)
  501.       abort();
  502.     fragment2PrimaryNodeMap = new Uint32[noOfFragments];
  503.     Uint32 i;  
  504.     for(i = 0; i<noOfNodes; i++){
  505.       fragment2PrimaryNodeMap[i] = nodeIds[i];
  506.     }
  507.     
  508.     // Sort them (bubble sort)
  509.     for(i = 0; i<noOfNodes-1; i++)
  510.       for(Uint32 j = i+1; j<noOfNodes; j++)
  511. if(fragment2PrimaryNodeMap[i] > fragment2PrimaryNodeMap[j]){
  512.   Uint32 tmp = fragment2PrimaryNodeMap[i];
  513.   fragment2PrimaryNodeMap[i] = fragment2PrimaryNodeMap[j];
  514.   fragment2PrimaryNodeMap[j] = tmp;
  515. }
  516.     
  517.     for(i = 0; i<noOfNodes; i++){
  518.       fragment2PrimaryNodeMap[i+noOfNodes] = fragment2PrimaryNodeMap[i];
  519.     }
  520.   }
  521. }
  522. void
  523. Ndb_cluster_connection_impl::FragmentToNodeMap::release(){
  524.   delete [] fragment2PrimaryNodeMap;
  525.   fragment2PrimaryNodeMap = 0;
  526. }
  527. static const Uint32 MAX_KEY_LEN_64_WORDS = 4;
  528. Uint32
  529. Ndb_cluster_connection_impl::guess_primary_node(const char *keyData,
  530. Uint32 keyLen)
  531. {
  532.   Uint64 tempData[MAX_KEY_LEN_64_WORDS];
  533.   
  534.   const Uint32 usedKeyLen = (keyLen + 3) >> 2; // In words
  535.   const char * usedKeyData = 0;
  536.   
  537.   /**
  538.    * If   key data buffer is not aligned (on 64 bit boundary)
  539.    *   or key len is not a multiple of 4
  540.    * Use temp data
  541.    */
  542.   if(((((UintPtr)keyData) & 7) == 0) && ((keyLen & 3) == 0)) {
  543.     usedKeyData = keyData;
  544.   } else {
  545.     memcpy(&tempData[0], keyData, keyLen);
  546.     const int slack = keyLen & 3;
  547.     if(slack > 0) {
  548.       memset(&((char *)&tempData[0])[keyLen], 0, (4 - slack));
  549.     }//if
  550.     usedKeyData = (char *)&tempData[0];
  551.   }//if
  552.   
  553.   Uint32 hashValue = md5_hash((Uint64 *)usedKeyData, usedKeyLen);
  554.   hashValue >>= fragmentToNodeMap.kValue;
  555.   Uint32 fragmentId = hashValue &
  556.     fragmentToNodeMap.hashValueMask;
  557.   if(fragmentId < fragmentToNodeMap.hashpointerValue) {
  558.     fragmentId = hashValue &
  559.                  ((fragmentToNodeMap.hashValueMask << 1) + 1);
  560.   }//if
  561.   return fragmentId;
  562. }
  563. template class Vector<Ndb_cluster_connection_impl::Node>;