NdbPoolImpl.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:14k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include "NdbPoolImpl.hpp"
  14. NdbMutex *NdbPool::pool_mutex = NULL;
  15. NdbPool *the_pool = NULL;
  16. NdbPool*
  17. NdbPool::create_instance(Uint32 max_ndb_obj,
  18.                          Uint32 no_conn_obj,
  19.                          Uint32 init_no_ndb_objects)
  20. {
  21.   if (!initPoolMutex()) {
  22.     return NULL;
  23.   }
  24.   NdbMutex_Lock(pool_mutex);
  25.   NdbPool* a_pool;
  26.   if (the_pool != NULL) {
  27.     a_pool = NULL;
  28.   } else {
  29.     the_pool = new NdbPool(max_ndb_obj, no_conn_obj);
  30.     if (!the_pool->init(init_no_ndb_objects)) {
  31.       delete the_pool;
  32.       the_pool = NULL;
  33.     }
  34.     a_pool = the_pool;
  35.   }
  36.   NdbMutex* temp = pool_mutex;
  37.   if (a_pool == NULL) {
  38.     pool_mutex = NULL;
  39.   }
  40.   NdbMutex_Unlock(pool_mutex);
  41.   if (a_pool == NULL) {
  42.     NdbMutex_Destroy(temp);
  43.   }
  44.   return a_pool;
  45. }
  46. void
  47. NdbPool::drop_instance()
  48. {
  49.   if (pool_mutex == NULL) {
  50.     return;
  51.   }
  52.   NdbMutex_Lock(pool_mutex);
  53.   the_pool->release_all();
  54.   delete the_pool;
  55.   the_pool = NULL;
  56.   NdbMutex* temp = pool_mutex;
  57.   NdbMutex_Unlock(temp);
  58.   NdbMutex_Destroy(temp);
  59. }
  60. bool
  61. NdbPool::initPoolMutex()
  62. {
  63.   bool ret_result = false;
  64.   if (pool_mutex == NULL) {
  65.     pool_mutex = NdbMutex_Create();
  66.     ret_result = ((pool_mutex == NULL) ? false : true);
  67.   }
  68.   return ret_result;
  69. }
  70. NdbPool::NdbPool(Uint32 max_no_objects,
  71.                  Uint32 no_conn_objects)
  72. {
  73.   if (no_conn_objects > 1024) {
  74.     no_conn_objects = 1024;
  75.   }
  76.   if (max_no_objects > MAX_NDB_OBJECTS) {
  77.     max_no_objects = MAX_NDB_OBJECTS;
  78.   } else if (max_no_objects == 0) {
  79.     max_no_objects = 1;
  80.   }
  81.   m_max_ndb_objects = max_no_objects;
  82.   m_no_of_conn_objects = no_conn_objects;
  83.   m_no_of_objects = 0;
  84.   m_waiting = 0;
  85.   m_pool_reference = NULL;
  86.   m_hash_entry = NULL;
  87.   m_first_free = NULL_POOL;
  88.   m_first_not_in_use = NULL_POOL;
  89.   m_last_free = NULL_POOL;
  90.   input_pool_cond = NULL;
  91.   output_pool_cond = NULL;
  92.   m_output_queue = 0;
  93.   m_input_queue = 0;
  94.   m_signal_count = 0;
  95. }
  96. NdbPool::~NdbPool()
  97. {
  98.   NdbCondition_Destroy(input_pool_cond);
  99.   NdbCondition_Destroy(output_pool_cond);
  100. }
  101. void
  102. NdbPool::release_all()
  103. {
  104.   int i;
  105.   for (i = 0; i < m_max_ndb_objects + 1; i++) {
  106.     if (m_pool_reference[i].ndb_reference != NULL) {
  107.       assert(m_pool_reference[i].in_use);
  108.       assert(m_pool_reference[i].free_entry);
  109.       delete m_pool_reference[i].ndb_reference;
  110.     }
  111.   }
  112.   delete [] m_pool_reference;
  113.   delete [] m_hash_entry;
  114.   m_pool_reference = NULL;
  115.   m_hash_entry = NULL;
  116. }
  117. bool
  118. NdbPool::init(Uint32 init_no_objects)
  119. {
  120.   bool ret_result = false;
  121.   int i;
  122.   do {
  123.     input_pool_cond = NdbCondition_Create();
  124.     output_pool_cond = NdbCondition_Create();
  125.     if (input_pool_cond == NULL || output_pool_cond == NULL) {
  126.       break;
  127.     }
  128.     if (init_no_objects > m_max_ndb_objects) {
  129.       init_no_objects = m_max_ndb_objects;
  130.     }
  131.     if (init_no_objects == 0) {
  132.       init_no_objects = 1;
  133.     }
  134.     m_pool_reference = new NdbPool::POOL_STRUCT[m_max_ndb_objects + 1];
  135.     m_hash_entry     = new Uint8[POOL_HASH_TABLE_SIZE];
  136.     if ((m_pool_reference == NULL) || (m_hash_entry == NULL)) {
  137.       delete [] m_pool_reference;
  138.       delete [] m_hash_entry;
  139.       break;
  140.     }
  141.     for (i = 0; i < m_max_ndb_objects + 1; i++) {
  142.       m_pool_reference[i].ndb_reference = NULL;
  143.       m_pool_reference[i].in_use = false;
  144.       m_pool_reference[i].next_free_object = i+1;
  145.       m_pool_reference[i].prev_free_object = i-1;
  146.       m_pool_reference[i].next_db_object = NULL_POOL;
  147.       m_pool_reference[i].prev_db_object = NULL_POOL;
  148.     }
  149.     for (i = 0; i < POOL_HASH_TABLE_SIZE; i++) {
  150.       m_hash_entry[i] = NULL_HASH;
  151.     }
  152.     m_pool_reference[m_max_ndb_objects].next_free_object = NULL_POOL;
  153.     m_pool_reference[1].prev_free_object = NULL_POOL;
  154.     m_first_not_in_use = 1;
  155.     m_no_of_objects = init_no_objects;
  156.     for (i = init_no_objects; i > 0 ; i--) {
  157.       Uint32 fake_id;
  158.       if (!allocate_ndb(fake_id, (const char*)NULL, (const char*)NULL)) {
  159.         release_all();
  160.         break;
  161.       }
  162.     }
  163.     ret_result = true;
  164.     break;
  165.   } while (1);
  166.   return ret_result;
  167. }
  168. /*
  169. Get an Ndb object.
  170. Input:
  171. hint_id: 0 = no hint, otherwise a hint of which Ndb object the thread
  172.          used the last time.
  173. a_db_name: NULL = don't check for database specific Ndb  object, otherwise
  174.            a hint of which database is preferred.
  175. Output:
  176. hint_id: Returns id of Ndb object returned
  177. Return value: Ndb object pointer
  178. */
  179. Ndb*
  180. NdbPool::get_ndb_object(Uint32 &hint_id,
  181.                         const char* a_catalog_name,
  182.                         const char* a_schema_name)
  183. {
  184.   Ndb* ret_ndb = NULL;
  185.   Uint32 hash_entry = compute_hash(a_schema_name);
  186.   NdbMutex_Lock(pool_mutex);
  187.   while (1) {
  188.     /*
  189.     We start by checking if we can use the hinted Ndb object.
  190.     */
  191.     if ((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL) {
  192.       break;
  193.     }
  194.     /*
  195.     The hinted Ndb object was not free. We need to allocate another object.
  196.     We start by checking for a free Ndb object connected to the same database.
  197.     */
  198.     if (a_schema_name && (ret_ndb = get_db_hash(hint_id,
  199.                                                 hash_entry,
  200.                                                 a_catalog_name,
  201.                                                 a_schema_name))) {
  202.       break;
  203.     }
  204.     /*
  205.     No Ndb object connected to the preferred database was found.
  206.     We look for a free Ndb object in general.
  207.     */
  208.     if ((ret_ndb = get_free_list(hint_id, hash_entry)) != NULL) {
  209.       break;
  210.     }
  211.     /*
  212.     No free Ndb object was found. If we haven't allocated objects up until the
  213.     maximum number yet then we can allocate a new Ndb object here.
  214.     */
  215.     if (m_no_of_objects < m_max_ndb_objects) {
  216.       if (allocate_ndb(hint_id, a_catalog_name, a_schema_name)) {
  217.         assert((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL);
  218.         break;
  219.       }
  220.     }
  221.     /*
  222.     We need to wait until an Ndb object becomes
  223.     available.
  224.     */
  225.     if ((ret_ndb = wait_free_ndb(hint_id)) != NULL) {
  226.       break;
  227.     }
  228.     /*
  229.     Not even after waiting were we able to get hold of an Ndb object. We 
  230.     return NULL to indicate this problem.
  231.     */
  232.     ret_ndb = NULL;
  233.     break;
  234.   }
  235.   NdbMutex_Unlock(pool_mutex);
  236.   if (ret_ndb != NULL) {
  237.     /*
  238.     We need to set the catalog and schema name of the Ndb object before
  239.     returning it to the caller.
  240.     */
  241.     ret_ndb->setCatalogName(a_catalog_name);
  242.     ret_ndb->setSchemaName(a_schema_name);
  243.   }
  244.   return ret_ndb;
  245. }
  246. void
  247. NdbPool::return_ndb_object(Ndb* returned_ndb, Uint32 id)
  248. {
  249.   NdbMutex_Lock(pool_mutex);
  250.   assert(id <= m_max_ndb_objects);
  251.   assert(id != 0);
  252.   assert(returned_ndb == m_pool_reference[id].ndb_reference);
  253.   bool wait_cond = m_waiting;
  254.   if (wait_cond) {
  255.     NdbCondition* pool_cond;
  256.     if (m_signal_count > 0) {
  257.       pool_cond = output_pool_cond;
  258.       m_signal_count--;
  259.     } else {
  260.       pool_cond = input_pool_cond;
  261.     }
  262.     add_wait_list(id);
  263.     NdbMutex_Unlock(pool_mutex);
  264.     NdbCondition_Signal(pool_cond);
  265.   } else {
  266.     add_free_list(id);
  267.     add_db_hash(id);
  268.     NdbMutex_Unlock(pool_mutex);
  269.   }
  270. }
  271. bool
  272. NdbPool::allocate_ndb(Uint32 &id,
  273.                       const char* a_catalog_name,
  274.                       const char* a_schema_name)
  275. {
  276.   Ndb* a_ndb;
  277.   if (m_first_not_in_use == NULL_POOL) {
  278.     return false;
  279.   }
  280.   if (a_schema_name) {
  281.     a_ndb = new Ndb(a_schema_name, a_catalog_name);
  282.   } else {
  283.     a_ndb = new Ndb("");
  284.   }
  285.   if (a_ndb == NULL) {
  286.     return false;
  287.   }
  288.   a_ndb->init(m_no_of_conn_objects);
  289.   m_no_of_objects++;
  290.   id = m_first_not_in_use;
  291.   Uint32 allocated_id = m_first_not_in_use;
  292.   m_first_not_in_use = m_pool_reference[allocated_id].next_free_object;
  293.   m_pool_reference[allocated_id].ndb_reference = a_ndb;
  294.   m_pool_reference[allocated_id].in_use = true;
  295.   m_pool_reference[allocated_id].free_entry = false;
  296.   add_free_list(allocated_id);
  297.   add_db_hash(allocated_id);
  298.   return true;
  299. }
  300. void
  301. NdbPool::add_free_list(Uint32 id)
  302. {
  303.   assert(!m_pool_reference[id].free_entry);
  304.   assert(m_pool_reference[id].in_use);
  305.   m_pool_reference[id].free_entry = true;
  306.   m_pool_reference[id].next_free_object = m_first_free;
  307.   m_pool_reference[id].prev_free_object = (Uint8)NULL_POOL;
  308.   m_first_free = (Uint8)id;
  309.   if (m_last_free == (Uint8)NULL_POOL) {
  310.     m_last_free = (Uint8)id;
  311.   }
  312. }
  313. void
  314. NdbPool::add_db_hash(Uint32 id)
  315. {
  316.   Ndb* t_ndb = m_pool_reference[id].ndb_reference;
  317.   const char* schema_name = t_ndb->getSchemaName();
  318.   Uint32 hash_entry = compute_hash(schema_name);
  319.   Uint8 next_db_entry = m_hash_entry[hash_entry];
  320.   m_pool_reference[id].next_db_object = next_db_entry;
  321.   m_pool_reference[id].prev_db_object = (Uint8)NULL_HASH;
  322.   m_hash_entry[hash_entry] = (Uint8)id;
  323. }
  324. Ndb*
  325. NdbPool::get_free_list(Uint32 &id, Uint32 hash_entry)
  326. {
  327.   if (m_first_free == NULL_POOL) {
  328.     return NULL;
  329.   }
  330.   id = m_first_free;
  331.   Ndb* ret_ndb = get_hint_ndb(m_first_free, hash_entry);
  332.   assert(ret_ndb != NULL);
  333.   return ret_ndb;
  334. }
  335. Ndb*
  336. NdbPool::get_db_hash(Uint32 &id,
  337.                      Uint32 hash_entry,
  338.                      const char *a_catalog_name,
  339.                      const char *a_schema_name)
  340. {
  341.   Uint32 entry_id = m_hash_entry[hash_entry];
  342.   bool found = false;
  343.   while (entry_id != NULL_HASH) {
  344.     Ndb* t_ndb = m_pool_reference[entry_id].ndb_reference;
  345.     const char *a_ndb_catalog_name = t_ndb->getCatalogName();
  346.     if (strcmp(a_catalog_name, a_ndb_catalog_name) == 0) {
  347.       const char *a_ndb_schema_name = t_ndb->getSchemaName();
  348.       if (strcmp(a_schema_name, a_ndb_schema_name) == 0) {
  349.         found = true;
  350.         break;
  351.       }
  352.     }
  353.     entry_id = m_pool_reference[entry_id].next_db_object;
  354.   }
  355.   if (found) {
  356.     id = entry_id;
  357.     Ndb* ret_ndb = get_hint_ndb(entry_id, hash_entry);
  358.     assert(ret_ndb != NULL);
  359.     return ret_ndb;
  360.   }
  361.   return NULL;
  362. }
  363. Ndb*
  364. NdbPool::get_hint_ndb(Uint32 hint_id, Uint32 hash_entry)
  365. {
  366.   Ndb* ret_ndb = NULL;
  367.   do {
  368.     if ((hint_id != 0) &&
  369.         (hint_id <= m_max_ndb_objects) &&
  370.         (m_pool_reference[hint_id].in_use) &&
  371.         (m_pool_reference[hint_id].free_entry)) {
  372.       ret_ndb = m_pool_reference[hint_id].ndb_reference;
  373.       if (ret_ndb != NULL) {
  374.         break;
  375.       } else {
  376.         assert(false);
  377.       }
  378.     }
  379.     return NULL;
  380.   } while (1);
  381.   /*
  382.   This is where we remove the entry from the free list and from the db hash
  383.   table.
  384.   */
  385.   remove_free_list(hint_id);
  386.   remove_db_hash(hint_id, hash_entry);
  387.   return ret_ndb;
  388. }
  389. void
  390. NdbPool::remove_free_list(Uint32 id)
  391. {
  392.   Uint8 next_free_entry = m_pool_reference[id].next_free_object;
  393.   Uint8 prev_free_entry = m_pool_reference[id].prev_free_object;
  394.   if (prev_free_entry == (Uint8)NULL_POOL) {
  395.     m_first_free = next_free_entry;
  396.   } else {
  397.     m_pool_reference[prev_free_entry].next_free_object = next_free_entry;
  398.   }
  399.   if (next_free_entry == (Uint8)NULL_POOL) {
  400.     m_last_free = prev_free_entry;
  401.   } else {
  402.     m_pool_reference[next_free_entry].prev_free_object = prev_free_entry;
  403.   }
  404.   m_pool_reference[id].next_free_object = NULL_POOL;
  405.   m_pool_reference[id].prev_free_object = NULL_POOL;
  406.   m_pool_reference[id].free_entry = false;
  407. }
  408. void
  409. NdbPool::remove_db_hash(Uint32 id, Uint32 hash_entry)
  410. {
  411.   Uint8 next_free_entry = m_pool_reference[id].next_db_object;
  412.   Uint8 prev_free_entry = m_pool_reference[id].prev_db_object;
  413.   if (prev_free_entry == (Uint8)NULL_HASH) {
  414.     m_hash_entry[hash_entry] = next_free_entry;
  415.   } else {
  416.     m_pool_reference[prev_free_entry].next_db_object = next_free_entry;
  417.   }
  418.   if (next_free_entry == (Uint8)NULL_HASH) {
  419.     ;
  420.   } else {
  421.     m_pool_reference[next_free_entry].prev_db_object = prev_free_entry;
  422.   }
  423.   m_pool_reference[id].next_db_object = NULL_HASH;
  424.   m_pool_reference[id].prev_db_object = NULL_HASH;
  425. }
  426. Uint32
  427. NdbPool::compute_hash(const char *a_schema_name)
  428. {
  429.   Uint32 len = strlen(a_schema_name);
  430.   Uint32 h = 147;
  431.   for (Uint32 i = 0; i < len; i++) {
  432.     Uint32 c = a_schema_name[i];
  433.     h = (h << 5) + h + c;
  434.   }
  435.   h &= (POOL_HASH_TABLE_SIZE - 1);
  436.   return h;
  437. }
  438. Ndb*
  439. NdbPool::wait_free_ndb(Uint32 &id)
  440. {
  441.   int res;
  442.   int time_out = 3500;
  443.   do {
  444.     NdbCondition* tmp = input_pool_cond;
  445.     m_waiting++;
  446.     m_input_queue++;
  447.     time_out -= 500;
  448.     res = NdbCondition_WaitTimeout(input_pool_cond, pool_mutex, time_out);
  449.     if (tmp == input_pool_cond) {
  450.       m_input_queue--;
  451.     } else {
  452.       m_output_queue--;
  453.       if (m_output_queue == 0) {
  454.         switch_condition_queue();
  455.       }
  456.     }
  457.     m_waiting--;
  458.   } while (res == 0 && m_first_wait == NULL_POOL);
  459.   if (res != 0 && m_first_wait == NULL_POOL) {
  460.     return NULL;
  461.   }
  462.   id = m_first_wait;
  463.   remove_wait_list();
  464.   assert(m_waiting != 0 || m_first_wait == NULL_POOL);
  465.   return m_pool_reference[id].ndb_reference;
  466. }
  467. void
  468. NdbPool::remove_wait_list()
  469. {
  470.   Uint32 id = m_first_wait;
  471.   m_first_wait = m_pool_reference[id].next_free_object;
  472.   m_pool_reference[id].next_free_object = NULL_POOL;
  473.   m_pool_reference[id].prev_free_object = NULL_POOL;
  474.   m_pool_reference[id].free_entry = false;
  475. }
  476. void
  477. NdbPool::add_wait_list(Uint32 id)
  478. {
  479.   m_pool_reference[id].next_free_object = m_first_wait;
  480.   m_first_wait = id;
  481. }
  482. void
  483. NdbPool::switch_condition_queue()
  484. {
  485.   m_signal_count = m_input_queue;
  486.   Uint8 move_queue = m_input_queue;
  487.   m_input_queue = m_output_queue;
  488.   m_output_queue = move_queue;
  489.   NdbCondition* move_cond = input_pool_cond;
  490.   input_pool_cond = output_pool_cond;
  491.   output_pool_cond = move_cond;
  492. }