ha_ndbcluster.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:136k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1.   DBUG_RETURN(res);
  2. }
  3. inline
  4. int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
  5.    const key_range *end_key,
  6.    bool eq_range, bool sorted,
  7.    byte* buf)
  8. {
  9.   KEY* key_info;
  10.   int error= 1; 
  11.   DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
  12.   DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted));
  13.   switch (get_index_type(active_index)){
  14.   case PRIMARY_KEY_ORDERED_INDEX:
  15.   case PRIMARY_KEY_INDEX:
  16.     key_info= table->key_info + active_index;
  17.     if (start_key && 
  18. start_key->length == key_info->key_length &&
  19. start_key->flag == HA_READ_KEY_EXACT)
  20.     {
  21.       if(m_active_cursor && (error= close_scan()))
  22. DBUG_RETURN(error);
  23.       error= pk_read(start_key->key, start_key->length, buf);      
  24.       DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
  25.     }
  26.     break;
  27.   case UNIQUE_ORDERED_INDEX:
  28.   case UNIQUE_INDEX:
  29.     key_info= table->key_info + active_index;
  30.     if (start_key && start_key->length == key_info->key_length &&
  31. start_key->flag == HA_READ_KEY_EXACT && 
  32. !check_null_in_key(key_info, start_key->key, start_key->length))
  33.     {
  34.       if(m_active_cursor && (error= close_scan()))
  35. DBUG_RETURN(error);
  36.       error= unique_index_read(start_key->key, start_key->length, buf);
  37.       DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
  38.     }
  39.     break;
  40.   default:
  41.     break;
  42.   }
  43.   // Start the ordered index scan and fetch the first row
  44.   error= ordered_index_scan(start_key, end_key, sorted, buf);
  45.   DBUG_RETURN(error);
  46. }
  47. int ha_ndbcluster::read_range_first(const key_range *start_key,
  48.     const key_range *end_key,
  49.     bool eq_range, bool sorted)
  50. {
  51.   byte* buf= table->record[0];
  52.   DBUG_ENTER("ha_ndbcluster::read_range_first");
  53.   
  54.   DBUG_RETURN(read_range_first_to_buf(start_key,
  55.       end_key,
  56.       eq_range, 
  57.       sorted,
  58.       buf));
  59. }
  60. int ha_ndbcluster::read_range_next()
  61. {
  62.   DBUG_ENTER("ha_ndbcluster::read_range_next");
  63.   DBUG_RETURN(next_result(table->record[0]));
  64. }
  65. int ha_ndbcluster::rnd_init(bool scan)
  66. {
  67.   NdbResultSet *cursor= m_active_cursor;
  68.   DBUG_ENTER("rnd_init");
  69.   DBUG_PRINT("enter", ("scan: %d", scan));
  70.   // Check if scan is to be restarted
  71.   if (cursor)
  72.   {
  73.     if (!scan)
  74.       DBUG_RETURN(1);
  75.     int res= cursor->restart(m_force_send);
  76.     DBUG_ASSERT(res == 0);
  77.   }
  78.   index_init(table->primary_key);
  79.   DBUG_RETURN(0);
  80. }
  81. int ha_ndbcluster::close_scan()
  82. {
  83.   NdbResultSet *cursor= m_active_cursor;
  84.   NdbConnection *trans= m_active_trans;
  85.   DBUG_ENTER("close_scan");
  86.   if (!cursor)
  87.     DBUG_RETURN(1);
  88.   
  89.   if (m_ops_pending)
  90.   {
  91.     /*
  92.       Take over any pending transactions to the 
  93.       deleteing/updating transaction before closing the scan    
  94.     */
  95.     DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));    
  96.     if (execute_no_commit(this,trans) != 0) {
  97.       no_uncommitted_rows_execute_failure();
  98.       DBUG_RETURN(ndb_err(trans));
  99.     }
  100.     m_ops_pending= 0;
  101.   }
  102.   
  103.   cursor->close(m_force_send);
  104.   m_active_cursor= NULL;
  105.   DBUG_RETURN(0);
  106. }
  107. int ha_ndbcluster::rnd_end()
  108. {
  109.   DBUG_ENTER("rnd_end");
  110.   DBUG_RETURN(close_scan());
  111. }
  112. int ha_ndbcluster::rnd_next(byte *buf)
  113. {
  114.   DBUG_ENTER("rnd_next");
  115.   statistic_increment(ha_read_rnd_next_count, &LOCK_status);
  116.   if (!m_active_cursor)
  117.     DBUG_RETURN(full_table_scan(buf));
  118.   DBUG_RETURN(next_result(buf));
  119. }
  120. /*
  121.   An "interesting" record has been found and it's pk 
  122.   retrieved by calling position
  123.   Now it's time to read the record from db once 
  124.   again
  125. */
  126. int ha_ndbcluster::rnd_pos(byte *buf, byte *pos)
  127. {
  128.   DBUG_ENTER("rnd_pos");
  129.   statistic_increment(ha_read_rnd_count,&LOCK_status);
  130.   // The primary key for the record is stored in pos
  131.   // Perform a pk_read using primary key "index"
  132.   DBUG_RETURN(pk_read(pos, ref_length, buf));  
  133. }
  134. /*
  135.   Store the primary key of this record in ref 
  136.   variable, so that the row can be retrieved again later
  137.   using "reference" in rnd_pos
  138. */
  139. void ha_ndbcluster::position(const byte *record)
  140. {
  141.   KEY *key_info;
  142.   KEY_PART_INFO *key_part;
  143.   KEY_PART_INFO *end;
  144.   byte *buff;
  145.   DBUG_ENTER("position");
  146.   if (table->primary_key != MAX_KEY) 
  147.   {
  148.     key_info= table->key_info + table->primary_key;
  149.     key_part= key_info->key_part;
  150.     end= key_part + key_info->key_parts;
  151.     buff= ref;
  152.     
  153.     for (; key_part != end; key_part++) 
  154.     {
  155.       if (key_part->null_bit) {
  156.         /* Store 0 if the key part is a NULL part */      
  157.         if (record[key_part->null_offset]
  158.             & key_part->null_bit) {
  159.           *buff++= 1;
  160.           continue;
  161.         }      
  162.         *buff++= 0;
  163.       }
  164.       memcpy(buff, record + key_part->offset, key_part->length);
  165.       buff += key_part->length;
  166.     }
  167.   } 
  168.   else 
  169.   {
  170.     // No primary key, get hidden key
  171.     DBUG_PRINT("info", ("Getting hidden key"));
  172.     int hidden_no= table->fields;
  173.     NdbRecAttr* rec= m_value[hidden_no].rec;
  174.     const NDBTAB *tab= (const NDBTAB *) m_table;  
  175.     const NDBCOL *hidden_col= tab->getColumn(hidden_no);
  176.     DBUG_ASSERT(hidden_col->getPrimaryKey() && 
  177.                 hidden_col->getAutoIncrement() &&
  178.                 rec != NULL && 
  179.                 ref_length == NDB_HIDDEN_PRIMARY_KEY_LENGTH);
  180.     memcpy(ref, (const void*)rec->aRef(), ref_length);
  181.   }
  182.   
  183.   DBUG_DUMP("ref", (char*)ref, ref_length);
  184.   DBUG_VOID_RETURN;
  185. }
  186. void ha_ndbcluster::info(uint flag)
  187. {
  188.   DBUG_ENTER("info");
  189.   DBUG_PRINT("enter", ("flag: %d", flag));
  190.   
  191.   if (flag & HA_STATUS_POS)
  192.     DBUG_PRINT("info", ("HA_STATUS_POS"));
  193.   if (flag & HA_STATUS_NO_LOCK)
  194.     DBUG_PRINT("info", ("HA_STATUS_NO_LOCK"));
  195.   if (flag & HA_STATUS_TIME)
  196.     DBUG_PRINT("info", ("HA_STATUS_TIME"));
  197.   if (flag & HA_STATUS_VARIABLE)
  198.   {
  199.     DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
  200.     if (m_table_info)
  201.     {
  202.       if (m_ha_not_exact_count)
  203. records= 100;
  204.       else
  205. records_update();
  206.     }
  207.     else
  208.     {
  209.       if ((my_errno= check_ndb_connection()))
  210.         DBUG_VOID_RETURN;
  211.       Ndb *ndb= get_ndb();
  212.       Uint64 rows= 100;
  213.       if (current_thd->variables.ndb_use_exact_count)
  214. ndb_get_table_statistics(ndb, m_tabname, &rows, 0);
  215.       records= rows;
  216.     }
  217.   }
  218.   if (flag & HA_STATUS_CONST)
  219.   {
  220.     DBUG_PRINT("info", ("HA_STATUS_CONST"));
  221.     set_rec_per_key();
  222.   }
  223.   if (flag & HA_STATUS_ERRKEY)
  224.   {
  225.     DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
  226.     errkey= m_dupkey;
  227.   }
  228.   if (flag & HA_STATUS_AUTO)
  229.   {
  230.     DBUG_PRINT("info", ("HA_STATUS_AUTO"));
  231.     if (m_table)
  232.     {
  233.       Ndb *ndb= get_ndb();
  234.       
  235.       auto_increment_value= 
  236.         ndb->readAutoIncrementValue((const NDBTAB *) m_table);
  237.     }
  238.   }
  239.   DBUG_VOID_RETURN;
  240. }
  241. int ha_ndbcluster::extra(enum ha_extra_function operation)
  242. {
  243.   DBUG_ENTER("extra");
  244.   switch (operation) {
  245.   case HA_EXTRA_NORMAL:              /* Optimize for space (def) */
  246.     DBUG_PRINT("info", ("HA_EXTRA_NORMAL"));
  247.     break;
  248.   case HA_EXTRA_QUICK:                 /* Optimize for speed */
  249.     DBUG_PRINT("info", ("HA_EXTRA_QUICK"));
  250.     break;
  251.   case HA_EXTRA_RESET:                 /* Reset database to after open */
  252.     DBUG_PRINT("info", ("HA_EXTRA_RESET"));
  253.     break;
  254.   case HA_EXTRA_CACHE:                 /* Cash record in HA_rrnd() */
  255.     DBUG_PRINT("info", ("HA_EXTRA_CACHE"));
  256.     break;
  257.   case HA_EXTRA_NO_CACHE:              /* End cacheing of records (def) */
  258.     DBUG_PRINT("info", ("HA_EXTRA_NO_CACHE"));
  259.     break;
  260.   case HA_EXTRA_NO_READCHECK:          /* No readcheck on update */
  261.     DBUG_PRINT("info", ("HA_EXTRA_NO_READCHECK"));
  262.     break;
  263.   case HA_EXTRA_READCHECK:             /* Use readcheck (def) */
  264.     DBUG_PRINT("info", ("HA_EXTRA_READCHECK"));
  265.     break;
  266.   case HA_EXTRA_KEYREAD:               /* Read only key to database */
  267.     DBUG_PRINT("info", ("HA_EXTRA_KEYREAD"));
  268.     break;
  269.   case HA_EXTRA_NO_KEYREAD:            /* Normal read of records (def) */
  270.     DBUG_PRINT("info", ("HA_EXTRA_NO_KEYREAD"));
  271.     break;
  272.   case HA_EXTRA_NO_USER_CHANGE:        /* No user is allowed to write */
  273.     DBUG_PRINT("info", ("HA_EXTRA_NO_USER_CHANGE"));
  274.     break;
  275.   case HA_EXTRA_KEY_CACHE:
  276.     DBUG_PRINT("info", ("HA_EXTRA_KEY_CACHE"));
  277.     break;
  278.   case HA_EXTRA_NO_KEY_CACHE:
  279.     DBUG_PRINT("info", ("HA_EXTRA_NO_KEY_CACHE"));
  280.     break;
  281.   case HA_EXTRA_WAIT_LOCK:            /* Wait until file is avalably (def) */
  282.     DBUG_PRINT("info", ("HA_EXTRA_WAIT_LOCK"));
  283.     break;
  284.   case HA_EXTRA_NO_WAIT_LOCK:         /* If file is locked, return quickly */
  285.     DBUG_PRINT("info", ("HA_EXTRA_NO_WAIT_LOCK"));
  286.     break;
  287.   case HA_EXTRA_WRITE_CACHE:           /* Use write cache in ha_write() */
  288.     DBUG_PRINT("info", ("HA_EXTRA_WRITE_CACHE"));
  289.     break;
  290.   case HA_EXTRA_FLUSH_CACHE:           /* flush write_record_cache */
  291.     DBUG_PRINT("info", ("HA_EXTRA_FLUSH_CACHE"));
  292.     break;
  293.   case HA_EXTRA_NO_KEYS:               /* Remove all update of keys */
  294.     DBUG_PRINT("info", ("HA_EXTRA_NO_KEYS"));
  295.     break;
  296.   case HA_EXTRA_KEYREAD_CHANGE_POS:         /* Keyread, but change pos */
  297.     DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_CHANGE_POS")); /* xxxxchk -r must be used */
  298.     break;                                  
  299.   case HA_EXTRA_REMEMBER_POS:          /* Remember pos for next/prev */
  300.     DBUG_PRINT("info", ("HA_EXTRA_REMEMBER_POS"));
  301.     break;
  302.   case HA_EXTRA_RESTORE_POS:
  303.     DBUG_PRINT("info", ("HA_EXTRA_RESTORE_POS"));
  304.     break;
  305.   case HA_EXTRA_REINIT_CACHE:          /* init cache from current record */
  306.     DBUG_PRINT("info", ("HA_EXTRA_REINIT_CACHE"));
  307.     break;
  308.   case HA_EXTRA_FORCE_REOPEN:          /* Datafile have changed on disk */
  309.     DBUG_PRINT("info", ("HA_EXTRA_FORCE_REOPEN"));
  310.     break;
  311.   case HA_EXTRA_FLUSH:                 /* Flush tables to disk */
  312.     DBUG_PRINT("info", ("HA_EXTRA_FLUSH"));
  313.     break;
  314.   case HA_EXTRA_NO_ROWS:               /* Don't write rows */
  315.     DBUG_PRINT("info", ("HA_EXTRA_NO_ROWS"));
  316.     break;
  317.   case HA_EXTRA_RESET_STATE:           /* Reset positions */
  318.     DBUG_PRINT("info", ("HA_EXTRA_RESET_STATE"));
  319.     break;
  320.   case HA_EXTRA_IGNORE_DUP_KEY:       /* Dup keys don't rollback everything*/
  321.     DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
  322.     if (current_thd->lex->sql_command == SQLCOM_REPLACE)
  323.     {
  324.       DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
  325.       m_use_write= TRUE;
  326.     } else 
  327.     {
  328.       DBUG_PRINT("info", ("Ignoring duplicate key"));
  329.       m_ignore_dup_key= TRUE;
  330.     }
  331.     break;
  332.   case HA_EXTRA_NO_IGNORE_DUP_KEY:
  333.     DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY"));
  334.     DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
  335.     m_use_write= FALSE;
  336.     m_ignore_dup_key= FALSE;
  337.     break;
  338.   case HA_EXTRA_RETRIEVE_ALL_COLS:    /* Retrieve all columns, not just those
  339.  where field->query_id is the same as
  340.  the current query id */
  341.     DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS"));
  342.     m_retrieve_all_fields= TRUE;
  343.     break;
  344.   case HA_EXTRA_PREPARE_FOR_DELETE:
  345.     DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE"));
  346.     break;
  347.   case HA_EXTRA_PREPARE_FOR_UPDATE:     /* Remove read cache if problems */
  348.     DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_UPDATE"));
  349.     break;
  350.   case HA_EXTRA_PRELOAD_BUFFER_SIZE: 
  351.     DBUG_PRINT("info", ("HA_EXTRA_PRELOAD_BUFFER_SIZE"));
  352.     break;
  353.   case HA_EXTRA_RETRIEVE_PRIMARY_KEY: 
  354.     DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_PRIMARY_KEY"));
  355.     m_retrieve_primary_key= TRUE;
  356.     break;
  357.   case HA_EXTRA_CHANGE_KEY_TO_UNIQUE: 
  358.     DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_UNIQUE"));
  359.     break;
  360.   case HA_EXTRA_CHANGE_KEY_TO_DUP: 
  361.     DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP"));
  362.     break;
  363.   }
  364.   
  365.   DBUG_RETURN(0);
  366. }
  367. /* 
  368.    Start of an insert, remember number of rows to be inserted, it will
  369.    be used in write_row and get_autoincrement to send an optimal number
  370.    of rows in each roundtrip to the server
  371.    SYNOPSIS
  372.    rows     number of rows to insert, 0 if unknown
  373. */
  374. void ha_ndbcluster::start_bulk_insert(ha_rows rows)
  375. {
  376.   int bytes, batch;
  377.   const NDBTAB *tab= (const NDBTAB *) m_table;    
  378.   DBUG_ENTER("start_bulk_insert");
  379.   DBUG_PRINT("enter", ("rows: %d", (int)rows));
  380.   
  381.   m_rows_inserted= (ha_rows) 0;
  382.   if (rows == (ha_rows) 0)
  383.   {
  384.     /* We don't know how many will be inserted, guess */
  385.     m_rows_to_insert= m_autoincrement_prefetch;
  386.   }
  387.   else
  388.     m_rows_to_insert= rows; 
  389.   /* 
  390.     Calculate how many rows that should be inserted
  391.     per roundtrip to NDB. This is done in order to minimize the 
  392.     number of roundtrips as much as possible. However performance will 
  393.     degrade if too many bytes are inserted, thus it's limited by this 
  394.     calculation.   
  395.   */
  396.   const int bytesperbatch= 8192;
  397.   bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
  398.   batch= bytesperbatch/bytes;
  399.   batch= batch == 0 ? 1 : batch;
  400.   DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes));
  401.   m_bulk_insert_rows= batch;
  402.   DBUG_VOID_RETURN;
  403. }
  404. /*
  405.   End of an insert
  406.  */
  407. int ha_ndbcluster::end_bulk_insert()
  408. {
  409.   int error= 0;
  410.   DBUG_ENTER("end_bulk_insert");
  411.   // Check if last inserts need to be flushed
  412.   if (m_bulk_insert_not_flushed)
  413.   {
  414.     NdbConnection *trans= m_active_trans;
  415.     // Send rows to NDB
  416.     DBUG_PRINT("info", ("Sending inserts to NDB, "
  417.                         "rows_inserted:%d, bulk_insert_rows: %d", 
  418.                         (int) m_rows_inserted, (int) m_bulk_insert_rows)); 
  419.     m_bulk_insert_not_flushed= FALSE;
  420.     if (execute_no_commit(this,trans) != 0) {
  421.       no_uncommitted_rows_execute_failure();
  422.       my_errno= error= ndb_err(trans);
  423.     }
  424.   }
  425.   m_rows_inserted= (ha_rows) 0;
  426.   m_rows_to_insert= (ha_rows) 1;
  427.   DBUG_RETURN(error);
  428. }
  429. int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
  430. {
  431.   DBUG_ENTER("extra_opt");
  432.   DBUG_PRINT("enter", ("cache_size: %lu", cache_size));
  433.   DBUG_RETURN(extra(operation));
  434. }
  435. int ha_ndbcluster::reset()
  436. {
  437.   DBUG_ENTER("reset");
  438.   // Reset what?
  439.   DBUG_RETURN(1);
  440. }
  441. static const char *ha_ndb_bas_exts[]= { ha_ndb_ext, NullS };
  442. const char **ha_ndbcluster::bas_ext() const
  443. { return ha_ndb_bas_exts; }
  444. /*
  445.   How many seeks it will take to read through the table
  446.   This is to be comparable to the number returned by records_in_range so
  447.   that we can decide if we should scan the table or use keys.
  448. */
  449. double ha_ndbcluster::scan_time()
  450. {
  451.   DBUG_ENTER("ha_ndbcluster::scan_time()");
  452.   double res= rows2double(records*1000);
  453.   DBUG_PRINT("exit", ("table: %s value: %f", 
  454.       m_tabname, res));
  455.   DBUG_RETURN(res);
  456. }
  457. /*
  458.   Convert MySQL table locks into locks supported by Ndb Cluster.
  459.   Note that MySQL Cluster does currently not support distributed
  460.   table locks, so to be safe one should set cluster in Single
  461.   User Mode, before relying on table locks when updating tables
  462.   from several MySQL servers
  463. */
  464. THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
  465.                                           THR_LOCK_DATA **to,
  466.                                           enum thr_lock_type lock_type)
  467. {
  468.   DBUG_ENTER("store_lock");
  469.   if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK) 
  470.   {
  471.     /* If we are not doing a LOCK TABLE, then allow multiple
  472.        writers */
  473.     
  474.     /* Since NDB does not currently have table locks
  475.        this is treated as a ordinary lock */
  476.     if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
  477.          lock_type <= TL_WRITE) && !thd->in_lock_tables)      
  478.       lock_type= TL_WRITE_ALLOW_WRITE;
  479.     
  480.     /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
  481.        MySQL would use the lock TL_READ_NO_INSERT on t2, and that
  482.        would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
  483.        to t2. Convert the lock to a normal read lock to allow
  484.        concurrent inserts to t2. */
  485.     
  486.     if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
  487.       lock_type= TL_READ;
  488.     
  489.     m_lock.type=lock_type;
  490.   }
  491.   *to++= &m_lock;
  492.   DBUG_PRINT("exit", ("lock_type: %d", lock_type));
  493.   
  494.   DBUG_RETURN(to);
  495. }
  496. #ifndef DBUG_OFF
  497. #define PRINT_OPTION_FLAGS(t) { 
  498.       if (t->options & OPTION_NOT_AUTOCOMMIT) 
  499.         DBUG_PRINT("thd->options", ("OPTION_NOT_AUTOCOMMIT")); 
  500.       if (t->options & OPTION_BEGIN) 
  501.         DBUG_PRINT("thd->options", ("OPTION_BEGIN")); 
  502.       if (t->options & OPTION_TABLE_LOCK) 
  503.         DBUG_PRINT("thd->options", ("OPTION_TABLE_LOCK")); 
  504. }
  505. #else
  506. #define PRINT_OPTION_FLAGS(t)
  507. #endif
  508. /*
  509.   As MySQL will execute an external lock for every new table it uses
  510.   we can use this to start the transactions.
  511.   If we are in auto_commit mode we just need to start a transaction
  512.   for the statement, this will be stored in transaction.stmt.
  513.   If not, we have to start a master transaction if there doesn't exist
  514.   one from before, this will be stored in transaction.all
  515.  
  516.   When a table lock is held one transaction will be started which holds
  517.   the table lock and for each statement a hupp transaction will be started  
  518.   If we are locking the table then:
  519.   - save the NdbDictionary::Table for easy access
  520.   - save reference to table statistics
  521.   - refresh list of the indexes for the table if needed (if altered)
  522.  */
  523. int ha_ndbcluster::external_lock(THD *thd, int lock_type)
  524. {
  525.   int error=0;
  526.   NdbConnection* trans= NULL;
  527.   DBUG_ENTER("external_lock");
  528.   /*
  529.     Check that this handler instance has a connection
  530.     set up to the Ndb object of thd
  531.    */
  532.   if (check_ndb_connection())
  533.     DBUG_RETURN(1);
  534.  
  535.   Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
  536.   Ndb *ndb= thd_ndb->ndb;
  537.   DBUG_PRINT("enter", ("transaction.thd_ndb->lock_count: %d", 
  538.                        thd_ndb->lock_count));
  539.   if (lock_type != F_UNLCK)
  540.   {
  541.     DBUG_PRINT("info", ("lock_type != F_UNLCK"));
  542.     if (!thd_ndb->lock_count++)
  543.     {
  544.       PRINT_OPTION_FLAGS(thd);
  545.       if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) 
  546.       {
  547.         // Autocommit transaction
  548.         DBUG_ASSERT(!thd->transaction.stmt.ndb_tid);
  549.         DBUG_PRINT("trans",("Starting transaction stmt"));      
  550.         trans= ndb->startTransaction();
  551.         if (trans == NULL)
  552.           ERR_RETURN(ndb->getNdbError());
  553. no_uncommitted_rows_reset(thd);
  554.         thd->transaction.stmt.ndb_tid= trans;
  555.       } 
  556.       else 
  557.       { 
  558.         if (!thd->transaction.all.ndb_tid)
  559. {
  560.           // Not autocommit transaction
  561.           // A "master" transaction ha not been started yet
  562.           DBUG_PRINT("trans",("starting transaction, all"));
  563.           
  564.           trans= ndb->startTransaction();
  565.           if (trans == NULL)
  566.             ERR_RETURN(ndb->getNdbError());
  567.   no_uncommitted_rows_reset(thd);
  568.           /*
  569.             If this is the start of a LOCK TABLE, a table look 
  570.             should be taken on the table in NDB
  571.            
  572.             Check if it should be read or write lock
  573.            */
  574.           if (thd->options & (OPTION_TABLE_LOCK))
  575.   {
  576.             //lockThisTable();
  577.             DBUG_PRINT("info", ("Locking the table..." ));
  578.           }
  579.           thd->transaction.all.ndb_tid= trans; 
  580.         }
  581.       }
  582.     }
  583.     /*
  584.       This is the place to make sure this handler instance
  585.       has a started transaction.
  586.      
  587.       The transaction is started by the first handler on which 
  588.       MySQL Server calls external lock
  589.      
  590.       Other handlers in the same stmt or transaction should use 
  591.       the same NDB transaction. This is done by setting up the m_active_trans
  592.       pointer to point to the NDB transaction. 
  593.      */
  594.     // store thread specific data first to set the right context
  595.     m_force_send=          thd->variables.ndb_force_send;
  596.     m_ha_not_exact_count= !thd->variables.ndb_use_exact_count;
  597.     m_autoincrement_prefetch= 
  598.       (ha_rows) thd->variables.ndb_autoincrement_prefetch_sz;
  599.     if (!thd->transaction.on)
  600.       m_transaction_on= FALSE;
  601.     else
  602.       m_transaction_on= thd->variables.ndb_use_transactions;
  603.     //     m_use_local_query_cache= thd->variables.ndb_use_local_query_cache;
  604.     m_active_trans= thd->transaction.all.ndb_tid ? 
  605.       (NdbConnection*)thd->transaction.all.ndb_tid:
  606.       (NdbConnection*)thd->transaction.stmt.ndb_tid;
  607.     DBUG_ASSERT(m_active_trans);
  608.     // Start of transaction
  609.     m_retrieve_all_fields= FALSE;
  610.     m_retrieve_primary_key= FALSE;
  611.     m_ops_pending= 0;    
  612.     {
  613.       NDBDICT *dict= ndb->getDictionary();
  614.       const NDBTAB *tab;
  615.       void *tab_info;
  616.       if (!(tab= dict->getTable(m_tabname, &tab_info)))
  617. ERR_RETURN(dict->getNdbError());
  618.       DBUG_PRINT("info", ("Table schema version: %d", 
  619.                           tab->getObjectVersion()));
  620.       // Check if thread has stale local cache
  621.       // New transaction must not use old tables... (trans != 0)
  622.       // Running might...
  623.       if ((trans && tab->getObjectStatus() != NdbDictionary::Object::Retrieved)
  624.   || tab->getObjectStatus() == NdbDictionary::Object::Invalid)
  625.       {
  626.         invalidate_dictionary_cache(FALSE);
  627.         if (!(tab= dict->getTable(m_tabname, &tab_info)))
  628.           ERR_RETURN(dict->getNdbError());
  629.         DBUG_PRINT("info", ("Table schema version: %d", 
  630.                             tab->getObjectVersion()));
  631.       }
  632.       if (m_table_version < tab->getObjectVersion())
  633.       {
  634.         /*
  635.           The table has been altered, caller has to retry
  636.         */
  637.         NdbError err= ndb->getNdbError(NDB_INVALID_SCHEMA_OBJECT);
  638.         DBUG_RETURN(ndb_to_mysql_error(&err));
  639.       }
  640.       if (m_table != (void *)tab)
  641.       {
  642.         m_table= (void *)tab;
  643.         m_table_version = tab->getObjectVersion();
  644.         if (!(my_errno= build_index_list(ndb, table, ILBP_OPEN)))
  645.           DBUG_RETURN(my_errno);
  646.       }
  647.       m_table_info= tab_info;
  648.     }
  649.     no_uncommitted_rows_init(thd);
  650.   } 
  651.   else 
  652.   {
  653.     DBUG_PRINT("info", ("lock_type == F_UNLCK"));
  654.     if (!--thd_ndb->lock_count)
  655.     {
  656.       DBUG_PRINT("trans", ("Last external_lock"));
  657.       PRINT_OPTION_FLAGS(thd);
  658.       if (thd->transaction.stmt.ndb_tid)
  659.       {
  660.         /*
  661.           Unlock is done without a transaction commit / rollback.
  662.           This happens if the thread didn't update any rows
  663.           We must in this case close the transaction to release resources
  664.         */
  665.         DBUG_PRINT("trans",("ending non-updating transaction"));
  666.         ndb->closeTransaction(m_active_trans);
  667.         thd->transaction.stmt.ndb_tid= 0;
  668.       }
  669.     }
  670.     m_table_info= NULL;
  671.     /*
  672.       This is the place to make sure this handler instance
  673.       no longer are connected to the active transaction.
  674.       And since the handler is no longer part of the transaction 
  675.       it can't have open cursors, ops or blobs pending.
  676.     */
  677.     m_active_trans= NULL;    
  678.     if (m_active_cursor)
  679.       DBUG_PRINT("warning", ("m_active_cursor != NULL"));
  680.     m_active_cursor= NULL;
  681.     if (m_blobs_pending)
  682.       DBUG_PRINT("warning", ("blobs_pending != 0"));
  683.     m_blobs_pending= 0;
  684.     
  685.     if (m_ops_pending)
  686.       DBUG_PRINT("warning", ("ops_pending != 0L"));
  687.     m_ops_pending= 0;
  688.   }
  689.   DBUG_RETURN(error);
  690. }
  691. /*
  692.   Start a transaction for running a statement if one is not
  693.   already running in a transaction. This will be the case in
  694.   a BEGIN; COMMIT; block
  695.   When using LOCK TABLE's external_lock will start a transaction
  696.   since ndb does not currently does not support table locking
  697. */
  698. int ha_ndbcluster::start_stmt(THD *thd)
  699. {
  700.   int error=0;
  701.   DBUG_ENTER("start_stmt");
  702.   PRINT_OPTION_FLAGS(thd);
  703.   NdbConnection *trans= 
  704.     (thd->transaction.stmt.ndb_tid)
  705.     ? (NdbConnection *)(thd->transaction.stmt.ndb_tid)
  706.     : (NdbConnection *)(thd->transaction.all.ndb_tid);
  707.   if (!trans){
  708.     Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
  709.     DBUG_PRINT("trans",("Starting transaction stmt"));  
  710.     trans= ndb->startTransaction();
  711.     if (trans == NULL)
  712.       ERR_RETURN(ndb->getNdbError());
  713.     no_uncommitted_rows_reset(thd);
  714.     thd->transaction.stmt.ndb_tid= trans;
  715.   }
  716.   m_active_trans= trans;
  717.   // Start of statement
  718.   m_retrieve_all_fields= FALSE;
  719.   m_retrieve_primary_key= FALSE;
  720.   m_ops_pending= 0;    
  721.   
  722.   DBUG_RETURN(error);
  723. }
  724. /*
  725.   Commit a transaction started in NDB 
  726.  */
  727. int ndbcluster_commit(THD *thd, void *ndb_transaction)
  728. {
  729.   int res= 0;
  730.   Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
  731.   NdbConnection *trans= (NdbConnection*)ndb_transaction;
  732.   DBUG_ENTER("ndbcluster_commit");
  733.   DBUG_PRINT("transaction",("%s",
  734.                             trans == thd->transaction.stmt.ndb_tid ? 
  735.                             "stmt" : "all"));
  736.   DBUG_ASSERT(ndb && trans);
  737.   if (execute_commit(thd,trans) != 0)
  738.   {
  739.     const NdbError err= trans->getNdbError();
  740.     const NdbOperation *error_op= trans->getNdbErrorOperation();
  741.     ERR_PRINT(err);     
  742.     res= ndb_to_mysql_error(&err);
  743.     if (res != -1) 
  744.       ndbcluster_print_error(res, error_op);
  745.   }
  746.   ndb->closeTransaction(trans);
  747.   DBUG_RETURN(res);
  748. }
  749. /*
  750.   Rollback a transaction started in NDB
  751.  */
  752. int ndbcluster_rollback(THD *thd, void *ndb_transaction)
  753. {
  754.   int res= 0;
  755.   Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
  756.   NdbConnection *trans= (NdbConnection*)ndb_transaction;
  757.   DBUG_ENTER("ndbcluster_rollback");
  758.   DBUG_PRINT("transaction",("%s",
  759.                             trans == thd->transaction.stmt.ndb_tid ? 
  760.                             "stmt" : "all"));
  761.   DBUG_ASSERT(ndb && trans);
  762.   if (trans->execute(Rollback) != 0)
  763.   {
  764.     const NdbError err= trans->getNdbError();
  765.     const NdbOperation *error_op= trans->getNdbErrorOperation();
  766.     ERR_PRINT(err);     
  767.     res= ndb_to_mysql_error(&err);
  768.     if (res != -1) 
  769.       ndbcluster_print_error(res, error_op);
  770.   }
  771.   ndb->closeTransaction(trans);
  772.   DBUG_RETURN(0);
  773. }
  774. /*
  775.   Define NDB column based on Field.
  776.   Returns 0 or mysql error code.
  777.   Not member of ha_ndbcluster because NDBCOL cannot be declared.
  778.  */
  779. static int create_ndb_column(NDBCOL &col,
  780.                              Field *field,
  781.                              HA_CREATE_INFO *info)
  782. {
  783.   // Set name
  784.   {
  785.     char truncated_field_name[NDB_MAX_ATTR_NAME_SIZE];
  786.     strnmov(truncated_field_name,field->field_name,sizeof(truncated_field_name));
  787.     truncated_field_name[sizeof(truncated_field_name)-1]= '';
  788.     col.setName(truncated_field_name);
  789.   }
  790.   // Get char set
  791.   CHARSET_INFO *cs= field->charset();
  792.   // Set type and sizes
  793.   const enum enum_field_types mysql_type= field->real_type();
  794.   switch (mysql_type) {
  795.   // Numeric types
  796.   case MYSQL_TYPE_TINY:        
  797.     if (field->flags & UNSIGNED_FLAG)
  798.       col.setType(NDBCOL::Tinyunsigned);
  799.     else
  800.       col.setType(NDBCOL::Tinyint);
  801.     col.setLength(1);
  802.     break;
  803.   case MYSQL_TYPE_SHORT:
  804.     if (field->flags & UNSIGNED_FLAG)
  805.       col.setType(NDBCOL::Smallunsigned);
  806.     else
  807.       col.setType(NDBCOL::Smallint);
  808.     col.setLength(1);
  809.     break;
  810.   case MYSQL_TYPE_LONG:
  811.     if (field->flags & UNSIGNED_FLAG)
  812.       col.setType(NDBCOL::Unsigned);
  813.     else
  814.       col.setType(NDBCOL::Int);
  815.     col.setLength(1);
  816.     break;
  817.   case MYSQL_TYPE_INT24:       
  818.     if (field->flags & UNSIGNED_FLAG)
  819.       col.setType(NDBCOL::Mediumunsigned);
  820.     else
  821.       col.setType(NDBCOL::Mediumint);
  822.     col.setLength(1);
  823.     break;
  824.   case MYSQL_TYPE_LONGLONG:
  825.     if (field->flags & UNSIGNED_FLAG)
  826.       col.setType(NDBCOL::Bigunsigned);
  827.     else
  828.       col.setType(NDBCOL::Bigint);
  829.     col.setLength(1);
  830.     break;
  831.   case MYSQL_TYPE_FLOAT:
  832.     col.setType(NDBCOL::Float);
  833.     col.setLength(1);
  834.     break;
  835.   case MYSQL_TYPE_DOUBLE:
  836.     col.setType(NDBCOL::Double);
  837.     col.setLength(1);
  838.     break;
  839.   case MYSQL_TYPE_DECIMAL:    
  840.     {
  841.       Field_decimal *f= (Field_decimal*)field;
  842.       uint precision= f->pack_length();
  843.       uint scale= f->decimals();
  844.       if (field->flags & UNSIGNED_FLAG)
  845.       {
  846.         col.setType(NDBCOL::Olddecimalunsigned);
  847.         precision-= (scale > 0);
  848.       }
  849.       else
  850.       {
  851.         col.setType(NDBCOL::Olddecimal);
  852.         precision-= 1 + (scale > 0);
  853.       }
  854.       col.setPrecision(precision);
  855.       col.setScale(scale);
  856.       col.setLength(1);
  857.     }
  858.     break;
  859.   // Date types
  860.   case MYSQL_TYPE_DATETIME:    
  861.     col.setType(NDBCOL::Datetime);
  862.     col.setLength(1);
  863.     break;
  864.   case MYSQL_TYPE_DATE: // ?
  865.     col.setType(NDBCOL::Char);
  866.     col.setLength(field->pack_length());
  867.     break;
  868.   case MYSQL_TYPE_NEWDATE:
  869.     col.setType(NDBCOL::Date);
  870.     col.setLength(1);
  871.     break;
  872.   case MYSQL_TYPE_TIME:        
  873.     col.setType(NDBCOL::Time);
  874.     col.setLength(1);
  875.     break;
  876.   case MYSQL_TYPE_YEAR:
  877.     col.setType(NDBCOL::Year);
  878.     col.setLength(1);
  879.     break;
  880.   case MYSQL_TYPE_TIMESTAMP:
  881.     col.setType(NDBCOL::Timestamp);
  882.     col.setLength(1);
  883.     break;
  884.   // Char types
  885.   case MYSQL_TYPE_STRING:      
  886.     if (field->flags & BINARY_FLAG)
  887.       col.setType(NDBCOL::Binary);
  888.     else {
  889.       col.setType(NDBCOL::Char);
  890.       col.setCharset(cs);
  891.     }
  892.     if (field->pack_length() == 0)
  893.       col.setLength(1); // currently ndb does not support size 0
  894.     else
  895.       col.setLength(field->pack_length());
  896.     break;
  897.   case MYSQL_TYPE_VAR_STRING:
  898.     if (field->flags & BINARY_FLAG)
  899.       col.setType(NDBCOL::Varbinary);
  900.     else {
  901.       col.setType(NDBCOL::Varchar);
  902.       col.setCharset(cs);
  903.     }
  904.     col.setLength(field->pack_length());
  905.     break;
  906.   // Blob types (all come in as MYSQL_TYPE_BLOB)
  907.   mysql_type_tiny_blob:
  908.   case MYSQL_TYPE_TINY_BLOB:
  909.     if (field->flags & BINARY_FLAG)
  910.       col.setType(NDBCOL::Blob);
  911.     else {
  912.       col.setType(NDBCOL::Text);
  913.       col.setCharset(cs);
  914.     }
  915.     col.setInlineSize(256);
  916.     // No parts
  917.     col.setPartSize(0);
  918.     col.setStripeSize(0);
  919.     break;
  920.   mysql_type_blob:
  921.   case MYSQL_TYPE_BLOB:    
  922.     if (field->flags & BINARY_FLAG)
  923.       col.setType(NDBCOL::Blob);
  924.     else {
  925.       col.setType(NDBCOL::Text);
  926.       col.setCharset(cs);
  927.     }
  928.     // Use "<=" even if "<" is the exact condition
  929.     if (field->max_length() <= (1 << 8))
  930.       goto mysql_type_tiny_blob;
  931.     else if (field->max_length() <= (1 << 16))
  932.     {
  933.       col.setInlineSize(256);
  934.       col.setPartSize(2000);
  935.       col.setStripeSize(16);
  936.     }
  937.     else if (field->max_length() <= (1 << 24))
  938.       goto mysql_type_medium_blob;
  939.     else
  940.       goto mysql_type_long_blob;
  941.     break;
  942.   mysql_type_medium_blob:
  943.   case MYSQL_TYPE_MEDIUM_BLOB:   
  944.     if (field->flags & BINARY_FLAG)
  945.       col.setType(NDBCOL::Blob);
  946.     else {
  947.       col.setType(NDBCOL::Text);
  948.       col.setCharset(cs);
  949.     }
  950.     col.setInlineSize(256);
  951.     col.setPartSize(4000);
  952.     col.setStripeSize(8);
  953.     break;
  954.   mysql_type_long_blob:
  955.   case MYSQL_TYPE_LONG_BLOB:  
  956.     if (field->flags & BINARY_FLAG)
  957.       col.setType(NDBCOL::Blob);
  958.     else {
  959.       col.setType(NDBCOL::Text);
  960.       col.setCharset(cs);
  961.     }
  962.     col.setInlineSize(256);
  963.     col.setPartSize(8000);
  964.     col.setStripeSize(4);
  965.     break;
  966.   // Other types
  967.   case MYSQL_TYPE_ENUM:
  968.     col.setType(NDBCOL::Char);
  969.     col.setLength(field->pack_length());
  970.     break;
  971.   case MYSQL_TYPE_SET:         
  972.     col.setType(NDBCOL::Char);
  973.     col.setLength(field->pack_length());
  974.     break;
  975.   case MYSQL_TYPE_NULL:        
  976.   case MYSQL_TYPE_GEOMETRY:
  977.     goto mysql_type_unsupported;
  978.   mysql_type_unsupported:
  979.   default:
  980.     return HA_ERR_UNSUPPORTED;
  981.   }
  982.   // Set nullable and pk
  983.   col.setNullable(field->maybe_null());
  984.   col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
  985.   // Set autoincrement
  986.   if (field->flags & AUTO_INCREMENT_FLAG) 
  987.   {
  988.     col.setAutoIncrement(TRUE);
  989.     ulonglong value= info->auto_increment_value ?
  990.       info->auto_increment_value : (ulonglong) 1;
  991.     DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
  992.     col.setAutoIncrementInitialValue(value);
  993.   }
  994.   else
  995.     col.setAutoIncrement(FALSE);
  996.   return 0;
  997. }
  998. /*
  999.   Create a table in NDB Cluster
  1000.  */
  1001. static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length)
  1002. {
  1003.   if (form->max_rows == (ha_rows) 0) /* default setting, don't set fragmentation */
  1004.     return;
  1005.   /**
  1006.    * get the number of fragments right
  1007.    */
  1008.   uint no_fragments;
  1009.   {
  1010. #if MYSQL_VERSION_ID >= 50000
  1011.     uint acc_row_size= 25 + /*safety margin*/ 2;
  1012. #else
  1013.     uint acc_row_size= pk_length*4;
  1014.     /* add acc overhead */
  1015.     if (pk_length <= 8)  /* main page will set the limit */
  1016.       acc_row_size+= 25 + /*safety margin*/ 2;
  1017.     else                /* overflow page will set the limit */
  1018.       acc_row_size+= 4 + /*safety margin*/ 4;
  1019. #endif
  1020.     ulonglong acc_fragment_size= 512*1024*1024;
  1021.     ulonglong max_rows= form->max_rows;
  1022. #if MYSQL_VERSION_ID >= 50100
  1023.     no_fragments= (max_rows*acc_row_size)/acc_fragment_size+1;
  1024. #else
  1025.     no_fragments= ((max_rows*acc_row_size)/acc_fragment_size+1
  1026.    +1/*correct rounding*/)/2;
  1027. #endif
  1028.   }
  1029.   {
  1030.     uint no_nodes= g_ndb_cluster_connection->no_db_nodes();
  1031.     NDBTAB::FragmentType ftype;
  1032.     if (no_fragments > 2*no_nodes)
  1033.     {
  1034.       ftype= NDBTAB::FragAllLarge;
  1035.       if (no_fragments > 4*no_nodes)
  1036. push_warning(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
  1037.      "Ndb might have problems storing the max amount of rows specified");
  1038.     }
  1039.     else if (no_fragments > no_nodes)
  1040.       ftype= NDBTAB::FragAllMedium;
  1041.     else
  1042.       ftype= NDBTAB::FragAllSmall;
  1043.     tab.setFragmentType(ftype);
  1044.   }
  1045. }
  1046. int ha_ndbcluster::create(const char *name, 
  1047.   TABLE *form, 
  1048.   HA_CREATE_INFO *info)
  1049. {
  1050.   NDBTAB tab;
  1051.   NDBCOL col;
  1052.   uint pack_length, length, i, pk_length= 0;
  1053.   const void *data, *pack_data;
  1054.   const char **key_names= form->keynames.type_names;
  1055.   char name2[FN_HEADLEN];
  1056.   bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE);
  1057.    
  1058.   DBUG_ENTER("create");
  1059.   DBUG_PRINT("enter", ("name: %s", name));
  1060.   fn_format(name2, name, "", "",2);       // Remove the .frm extension
  1061.   set_dbname(name2);
  1062.   set_tabname(name2);    
  1063.   if (create_from_engine)
  1064.   {
  1065.     /*
  1066.       Table alreay exists in NDB and frm file has been created by 
  1067.       caller.
  1068.       Do Ndb specific stuff, such as create a .ndb file
  1069.     */
  1070.     my_errno= write_ndb_file();
  1071.     DBUG_RETURN(my_errno);
  1072.   }
  1073.   DBUG_PRINT("table", ("name: %s", m_tabname));  
  1074.   tab.setName(m_tabname);
  1075.   tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));    
  1076.    
  1077.   // Save frm data for this table
  1078.   if (readfrm(name, &data, &length))
  1079.     DBUG_RETURN(1);
  1080.   if (packfrm(data, length, &pack_data, &pack_length))
  1081.     DBUG_RETURN(2);
  1082.   
  1083.   DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
  1084.   tab.setFrm(pack_data, pack_length);      
  1085.   my_free((char*)data, MYF(0));
  1086.   my_free((char*)pack_data, MYF(0));
  1087.   
  1088.   for (i= 0; i < form->fields; i++) 
  1089.   {
  1090.     Field *field= form->field[i];
  1091.     DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d", 
  1092.                         field->field_name, field->real_type(),
  1093. field->pack_length()));
  1094.     if ((my_errno= create_ndb_column(col, field, info)))
  1095.       DBUG_RETURN(my_errno);
  1096.     tab.addColumn(col);
  1097.     if(col.getPrimaryKey())
  1098.       pk_length += (field->pack_length() + 3) / 4;
  1099.   }
  1100.   
  1101.   // No primary key, create shadow key as 64 bit, auto increment  
  1102.   if (form->primary_key == MAX_KEY) 
  1103.   {
  1104.     DBUG_PRINT("info", ("Generating shadow key"));
  1105.     col.setName("$PK");
  1106.     col.setType(NdbDictionary::Column::Bigunsigned);
  1107.     col.setLength(1);
  1108.     col.setNullable(FALSE);
  1109.     col.setPrimaryKey(TRUE);
  1110.     col.setAutoIncrement(TRUE);
  1111.     tab.addColumn(col);
  1112.     pk_length += 2;
  1113.   }
  1114.   
  1115.   // Make sure that blob tables don't have to big part size
  1116.   for (i= 0; i < form->fields; i++) 
  1117.   {
  1118.     /**
  1119.      * The extra +7 concists
  1120.      * 2 - words from pk in blob table
  1121.      * 5 - from extra words added by tup/dict??
  1122.      */
  1123.     switch (form->field[i]->real_type()) {
  1124.     case MYSQL_TYPE_BLOB:    
  1125.     case MYSQL_TYPE_MEDIUM_BLOB:   
  1126.     case MYSQL_TYPE_LONG_BLOB: 
  1127.     {
  1128.       NdbDictionary::Column * col = tab.getColumn(i);
  1129.       int size = pk_length + (col->getPartSize()+3)/4 + 7;
  1130.       if(size > NDB_MAX_TUPLE_SIZE_IN_WORDS && 
  1131.  (pk_length+7) < NDB_MAX_TUPLE_SIZE_IN_WORDS)
  1132.       {
  1133. size = NDB_MAX_TUPLE_SIZE_IN_WORDS - pk_length - 7;
  1134. col->setPartSize(4*size);
  1135.       }
  1136.       /**
  1137.        * If size > NDB_MAX and pk_length+7 >= NDB_MAX
  1138.        *   then the table can't be created anyway, so skip
  1139.        *   changing part size, and have error later
  1140.        */ 
  1141.     }
  1142.     default:
  1143.       break;
  1144.     }
  1145.   }
  1146.   ndb_set_fragmentation(tab, form, pk_length);
  1147.   if ((my_errno= check_ndb_connection()))
  1148.     DBUG_RETURN(my_errno);
  1149.   
  1150.   // Create the table in NDB     
  1151.   Ndb *ndb= get_ndb();
  1152.   NDBDICT *dict= ndb->getDictionary();
  1153.   if (dict->createTable(tab) != 0) 
  1154.   {
  1155.     const NdbError err= dict->getNdbError();
  1156.     ERR_PRINT(err);
  1157.     my_errno= ndb_to_mysql_error(&err);
  1158.     DBUG_RETURN(my_errno);
  1159.   }
  1160.   DBUG_PRINT("info", ("Table %s/%s created successfully", 
  1161.                       m_dbname, m_tabname));
  1162.   // Create secondary indexes
  1163.   my_errno= build_index_list(ndb, form, ILBP_CREATE);
  1164.   if (!my_errno)
  1165.     my_errno= write_ndb_file();
  1166.   DBUG_RETURN(my_errno);
  1167. }
  1168. int ha_ndbcluster::create_ordered_index(const char *name, 
  1169. KEY *key_info)
  1170. {
  1171.   DBUG_ENTER("create_ordered_index");
  1172.   DBUG_RETURN(create_index(name, key_info, FALSE));
  1173. }
  1174. int ha_ndbcluster::create_unique_index(const char *name, 
  1175.        KEY *key_info)
  1176. {
  1177.   DBUG_ENTER("create_unique_index");
  1178.   DBUG_RETURN(create_index(name, key_info, TRUE));
  1179. }
  1180. /*
  1181.   Create an index in NDB Cluster
  1182.  */
  1183. int ha_ndbcluster::create_index(const char *name, 
  1184. KEY *key_info,
  1185. bool unique)
  1186. {
  1187.   Ndb *ndb= get_ndb();
  1188.   NdbDictionary::Dictionary *dict= ndb->getDictionary();
  1189.   KEY_PART_INFO *key_part= key_info->key_part;
  1190.   KEY_PART_INFO *end= key_part + key_info->key_parts;
  1191.   
  1192.   DBUG_ENTER("create_index");
  1193.   DBUG_PRINT("enter", ("name: %s ", name));
  1194.   NdbDictionary::Index ndb_index(name);
  1195.   if (unique)
  1196.     ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
  1197.   else 
  1198.   {
  1199.     ndb_index.setType(NdbDictionary::Index::OrderedIndex);
  1200.     // TODO Only temporary ordered indexes supported
  1201.     ndb_index.setLogging(FALSE); 
  1202.   }
  1203.   ndb_index.setTable(m_tabname);
  1204.   for (; key_part != end; key_part++) 
  1205.   {
  1206.     Field *field= key_part->field;
  1207.     DBUG_PRINT("info", ("attr: %s", field->field_name));
  1208.     {
  1209.       char truncated_field_name[NDB_MAX_ATTR_NAME_SIZE];
  1210.       strnmov(truncated_field_name,field->field_name,sizeof(truncated_field_name));
  1211.       truncated_field_name[sizeof(truncated_field_name)-1]= '';
  1212.       ndb_index.addColumnName(truncated_field_name);
  1213.     }
  1214.   }
  1215.   
  1216.   if (dict->createIndex(ndb_index))
  1217.     ERR_RETURN(dict->getNdbError());
  1218.   // Success
  1219.   DBUG_PRINT("info", ("Created index %s", name));
  1220.   DBUG_RETURN(0);  
  1221. }
  1222. /*
  1223.   Rename a table in NDB Cluster
  1224. */
  1225. int ha_ndbcluster::rename_table(const char *from, const char *to)
  1226. {
  1227.   NDBDICT *dict;
  1228.   char new_tabname[FN_HEADLEN];
  1229.   const NDBTAB *orig_tab;
  1230.   int result;
  1231.   DBUG_ENTER("ha_ndbcluster::rename_table");
  1232.   DBUG_PRINT("info", ("Renaming %s to %s", from, to));
  1233.   set_dbname(from);
  1234.   set_tabname(from);
  1235.   set_tabname(to, new_tabname);
  1236.   if (check_ndb_connection())
  1237.     DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
  1238.   
  1239.   Ndb *ndb= get_ndb();
  1240.   dict= ndb->getDictionary();
  1241.   if (!(orig_tab= dict->getTable(m_tabname)))
  1242.     ERR_RETURN(dict->getNdbError());
  1243.   // Check if thread has stale local cache
  1244.   if (orig_tab->getObjectStatus() == NdbDictionary::Object::Invalid)
  1245.   {
  1246.     dict->removeCachedTable(m_tabname);
  1247.     if (!(orig_tab= dict->getTable(m_tabname)))
  1248.       ERR_RETURN(dict->getNdbError());
  1249.   }
  1250.   m_table= (void *)orig_tab;
  1251.   // Change current database to that of target table
  1252.   set_dbname(to);
  1253.   ndb->setDatabaseName(m_dbname);
  1254.   if (!(result= alter_table_name(new_tabname)))
  1255.   {
  1256.     // Rename .ndb file
  1257.     result= handler::rename_table(from, to);
  1258.   }
  1259.   DBUG_RETURN(result);
  1260. }
  1261. /*
  1262.   Rename a table in NDB Cluster using alter table
  1263.  */
  1264. int ha_ndbcluster::alter_table_name(const char *to)
  1265. {
  1266.   Ndb *ndb= get_ndb();
  1267.   NDBDICT *dict= ndb->getDictionary();
  1268.   const NDBTAB *orig_tab= (const NDBTAB *) m_table;
  1269.   int ret;
  1270.   DBUG_ENTER("alter_table_name_table");
  1271.   NdbDictionary::Table new_tab= *orig_tab;
  1272.   new_tab.setName(to);
  1273.   if (dict->alterTable(new_tab) != 0)
  1274.     ERR_RETURN(dict->getNdbError());
  1275.   m_table= NULL;
  1276.   m_table_info= NULL;
  1277.                                                                              
  1278.   DBUG_RETURN(0);
  1279. }
  1280. /*
  1281.   Delete a table from NDB Cluster
  1282.  */
  1283. int ha_ndbcluster::delete_table(const char *name)
  1284. {
  1285.   DBUG_ENTER("delete_table");
  1286.   DBUG_PRINT("enter", ("name: %s", name));
  1287.   set_dbname(name);
  1288.   set_tabname(name);
  1289.   
  1290.   if (check_ndb_connection())
  1291.     DBUG_RETURN(HA_ERR_NO_CONNECTION);
  1292.   // Remove .ndb file
  1293.   handler::delete_table(name);
  1294.   DBUG_RETURN(drop_table());
  1295. }
  1296. /*
  1297.   Drop a table in NDB Cluster
  1298.  */
  1299. int ha_ndbcluster::drop_table()
  1300. {
  1301.   Ndb *ndb= get_ndb();
  1302.   NdbDictionary::Dictionary *dict= ndb->getDictionary();
  1303.   
  1304.   DBUG_ENTER("drop_table");
  1305.   DBUG_PRINT("enter", ("Deleting %s", m_tabname));
  1306.   
  1307.   if (dict->dropTable(m_tabname)) 
  1308.   {
  1309.     const NdbError err= dict->getNdbError();
  1310.     if (err.code == 709)
  1311.       ; // 709: No such table existed
  1312.     else 
  1313.       ERR_RETURN(dict->getNdbError());
  1314.   }  
  1315.   release_metadata();
  1316.   DBUG_RETURN(0);
  1317. }
  1318. longlong ha_ndbcluster::get_auto_increment()
  1319. {  
  1320.   DBUG_ENTER("get_auto_increment");
  1321.   DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
  1322.   Ndb *ndb= get_ndb();
  1323.   
  1324.   if (m_rows_inserted > m_rows_to_insert)
  1325.   {
  1326.     /* We guessed too low */
  1327.     m_rows_to_insert+= m_autoincrement_prefetch;
  1328.   }
  1329.   int cache_size= 
  1330.     (int)
  1331.     (m_rows_to_insert - m_rows_inserted < m_autoincrement_prefetch) ?
  1332.     m_rows_to_insert - m_rows_inserted 
  1333.     : (m_rows_to_insert > m_autoincrement_prefetch) ? 
  1334.     m_rows_to_insert 
  1335.     : m_autoincrement_prefetch;
  1336.   Uint64 auto_value= NDB_FAILED_AUTO_INCREMENT;
  1337.   uint retries= NDB_AUTO_INCREMENT_RETRIES;
  1338.   do {
  1339.     auto_value=
  1340.       (m_skip_auto_increment) ? 
  1341.       ndb->readAutoIncrementValue((const NDBTAB *) m_table)
  1342.       : ndb->getAutoIncrementValue((const NDBTAB *) m_table, cache_size);
  1343.   } while (auto_value == NDB_FAILED_AUTO_INCREMENT && 
  1344.            --retries &&
  1345.            ndb->getNdbError().status == NdbError::TemporaryError);
  1346.   if (auto_value == NDB_FAILED_AUTO_INCREMENT)
  1347.   {
  1348.     const NdbError err= ndb->getNdbError();
  1349.     sql_print_error("Error %lu in ::get_auto_increment(): %s",
  1350.                     (ulong) err.code, err.message);
  1351.     DBUG_RETURN(~(ulonglong) 0);
  1352.   }
  1353.   DBUG_RETURN((longlong)auto_value);
  1354. }
  1355. /*
  1356.   Constructor for the NDB Cluster table handler 
  1357.  */
  1358. ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
  1359.   handler(table_arg),
  1360.   m_active_trans(NULL),
  1361.   m_active_cursor(NULL),
  1362.   m_table(NULL),
  1363.   m_table_version(-1),
  1364.   m_table_info(NULL),
  1365.   m_table_flags(HA_REC_NOT_IN_SEQ |
  1366. HA_NULL_IN_KEY |
  1367. HA_AUTO_PART_KEY |
  1368. HA_NO_PREFIX_CHAR_KEYS),
  1369.   m_share(0),
  1370.   m_use_write(FALSE),
  1371.   m_ignore_dup_key(FALSE),
  1372.   m_primary_key_update(FALSE),
  1373.   m_retrieve_all_fields(FALSE),
  1374.   m_retrieve_primary_key(FALSE),
  1375.   m_rows_to_insert((ha_rows) 1),
  1376.   m_rows_inserted((ha_rows) 0),
  1377.   m_bulk_insert_rows((ha_rows) 1024),
  1378.   m_bulk_insert_not_flushed(FALSE),
  1379.   m_ops_pending(0),
  1380.   m_skip_auto_increment(TRUE),
  1381.   m_blobs_pending(0),
  1382.   m_blobs_buffer(0),
  1383.   m_blobs_buffer_size(0),
  1384.   m_dupkey((uint) -1),
  1385.   m_ha_not_exact_count(FALSE),
  1386.   m_force_send(TRUE),
  1387.   m_autoincrement_prefetch((ha_rows) 32),
  1388.   m_transaction_on(TRUE),
  1389.   m_use_local_query_cache(FALSE)
  1390.   int i;
  1391.   
  1392.   DBUG_ENTER("ha_ndbcluster");
  1393.   m_tabname[0]= '';
  1394.   m_dbname[0]= '';
  1395.   records= ~(ha_rows)0; // uninitialized
  1396.   block_size= 1024;
  1397.   for (i= 0; i < MAX_KEY; i++)
  1398.   {
  1399.     m_index[i].type= UNDEFINED_INDEX;
  1400.     m_index[i].unique_index= NULL;
  1401.     m_index[i].index= NULL;
  1402.     m_index[i].unique_index_attrid_map= NULL;
  1403.   }
  1404.   DBUG_VOID_RETURN;
  1405. }
  1406. /*
  1407.   Destructor for NDB Cluster table handler
  1408.  */
  1409. ha_ndbcluster::~ha_ndbcluster() 
  1410. {
  1411.   DBUG_ENTER("~ha_ndbcluster");
  1412.   if (m_share)
  1413.     free_share(m_share);
  1414.   release_metadata();
  1415.   my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
  1416.   m_blobs_buffer= 0;
  1417.   // Check for open cursor/transaction
  1418.   if (m_active_cursor) {
  1419.   }
  1420.   DBUG_ASSERT(m_active_cursor == NULL);
  1421.   if (m_active_trans) {
  1422.   }
  1423.   DBUG_ASSERT(m_active_trans == NULL);
  1424.   DBUG_VOID_RETURN;
  1425. }
  1426. /*
  1427.   Open a table for further use
  1428.   - fetch metadata for this table from NDB
  1429.   - check that table exists
  1430. */
  1431. int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
  1432. {
  1433.   int res;
  1434.   KEY *key;
  1435.   DBUG_ENTER("open");
  1436.   DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
  1437.                        name, mode, test_if_locked));
  1438.   
  1439.   // Setup ref_length to make room for the whole 
  1440.   // primary key to be written in the ref variable
  1441.   
  1442.   if (table->primary_key != MAX_KEY) 
  1443.   {
  1444.     key= table->key_info+table->primary_key;
  1445.     ref_length= key->key_length;
  1446.     DBUG_PRINT("info", (" ref_length: %d", ref_length));
  1447.   }
  1448.   // Init table lock structure 
  1449.   if (!(m_share=get_share(name)))
  1450.     DBUG_RETURN(1);
  1451.   thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
  1452.   
  1453.   set_dbname(name);
  1454.   set_tabname(name);
  1455.   
  1456.   if (check_ndb_connection()) {
  1457.     free_share(m_share); m_share= 0;
  1458.     DBUG_RETURN(HA_ERR_NO_CONNECTION);
  1459.   }
  1460.   
  1461.   res= get_metadata(name);
  1462.   if (!res)
  1463.     info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
  1464.   DBUG_RETURN(res);
  1465. }
  1466. /*
  1467.   Close the table
  1468.   - release resources setup by open()
  1469.  */
  1470. int ha_ndbcluster::close(void)
  1471. {
  1472.   DBUG_ENTER("close");  
  1473.   free_share(m_share); m_share= 0;
  1474.   release_metadata();
  1475.   DBUG_RETURN(0);
  1476. }
  1477. Thd_ndb* ha_ndbcluster::seize_thd_ndb()
  1478. {
  1479.   Thd_ndb *thd_ndb;
  1480.   DBUG_ENTER("seize_thd_ndb");
  1481.   thd_ndb= new Thd_ndb();
  1482.   thd_ndb->ndb->getDictionary()->set_local_table_data_size(
  1483.     sizeof(Ndb_local_table_statistics)
  1484.     );
  1485.   if (thd_ndb->ndb->init(max_transactions) != 0)
  1486.   {
  1487.     ERR_PRINT(thd_ndb->ndb->getNdbError());
  1488.     /*
  1489.       TODO 
  1490.       Alt.1 If init fails because to many allocated Ndb 
  1491.       wait on condition for a Ndb object to be released.
  1492.       Alt.2 Seize/release from pool, wait until next release 
  1493.     */
  1494.     delete thd_ndb;
  1495.     thd_ndb= NULL;
  1496.   }
  1497.   DBUG_RETURN(thd_ndb);
  1498. }
  1499. void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
  1500. {
  1501.   DBUG_ENTER("release_thd_ndb");
  1502.   delete thd_ndb;
  1503.   DBUG_VOID_RETURN;
  1504. }
  1505. /*
  1506.   If this thread already has a Thd_ndb object allocated
  1507.   in current THD, reuse it. Otherwise
  1508.   seize a Thd_ndb object, assign it to current THD and use it.
  1509.  
  1510. */
  1511. Ndb* check_ndb_in_thd(THD* thd)
  1512. {
  1513.   DBUG_ENTER("check_ndb_in_thd");
  1514.   Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
  1515.   
  1516.   if (!thd_ndb)
  1517.   {
  1518.     if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
  1519.       DBUG_RETURN(NULL);
  1520.     thd->transaction.thd_ndb= thd_ndb;
  1521.   }
  1522.   DBUG_RETURN(thd_ndb->ndb);
  1523. }
  1524. int ha_ndbcluster::check_ndb_connection()
  1525. {
  1526.   THD* thd= current_thd;
  1527.   Ndb *ndb;
  1528.   DBUG_ENTER("check_ndb_connection");
  1529.   
  1530.   if (!(ndb= check_ndb_in_thd(thd)))
  1531.     DBUG_RETURN(HA_ERR_NO_CONNECTION);
  1532.   ndb->setDatabaseName(m_dbname);
  1533.   DBUG_RETURN(0);
  1534. }
  1535. void ndbcluster_close_connection(THD *thd)
  1536. {
  1537.   Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
  1538.   DBUG_ENTER("ndbcluster_close_connection");
  1539.   if (thd_ndb)
  1540.   {
  1541.     ha_ndbcluster::release_thd_ndb(thd_ndb);
  1542.     thd->transaction.thd_ndb= NULL;
  1543.   }
  1544.   DBUG_VOID_RETURN;
  1545. }
  1546. /*
  1547.   Try to discover one table from NDB
  1548.  */
  1549. int ndbcluster_discover(THD* thd, const char *db, const char *name,
  1550. const void** frmblob, uint* frmlen)
  1551. {
  1552.   uint len;
  1553.   const void* data;
  1554.   const NDBTAB* tab;
  1555.   Ndb* ndb;
  1556.   DBUG_ENTER("ndbcluster_discover");
  1557.   DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); 
  1558.   if (!(ndb= check_ndb_in_thd(thd)))
  1559.     DBUG_RETURN(HA_ERR_NO_CONNECTION);  
  1560.   ndb->setDatabaseName(db);
  1561.   NDBDICT* dict= ndb->getDictionary();
  1562.   dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
  1563.   dict->invalidateTable(name);
  1564.   if (!(tab= dict->getTable(name)))
  1565.   {    
  1566.     const NdbError err= dict->getNdbError();
  1567.     if (err.code == 709)
  1568.       DBUG_RETURN(-1);
  1569.     ERR_RETURN(err);
  1570.   }
  1571.   DBUG_PRINT("info", ("Found table %s", tab->getName()));
  1572.   
  1573.   len= tab->getFrmLength();  
  1574.   if (len == 0 || tab->getFrmData() == NULL)
  1575.   {
  1576.     DBUG_PRINT("error", ("No frm data found."));
  1577.     DBUG_RETURN(1);
  1578.   }
  1579.   
  1580.   if (unpackfrm(&data, &len, tab->getFrmData()))
  1581.   {
  1582.     DBUG_PRINT("error", ("Could not unpack table"));
  1583.     DBUG_RETURN(1);
  1584.   }
  1585.   *frmlen= len;
  1586.   *frmblob= data;
  1587.   
  1588.   DBUG_RETURN(0);
  1589. }
  1590. /*
  1591.   Check if a table exists in NDB
  1592.    
  1593.  */
  1594. int ndbcluster_table_exists_in_engine(THD* thd, const char *db, const char *name)
  1595. {
  1596.   uint len;
  1597.   const void* data;
  1598.   const NDBTAB* tab;
  1599.   Ndb* ndb;
  1600.   DBUG_ENTER("ndbcluster_table_exists_in_engine");
  1601.   DBUG_PRINT("enter", ("db: %s, name: %s", db, name)); 
  1602.   if (!(ndb= check_ndb_in_thd(thd)))
  1603.     DBUG_RETURN(HA_ERR_NO_CONNECTION);  
  1604.   ndb->setDatabaseName(db);
  1605.   NDBDICT* dict= ndb->getDictionary();
  1606.   dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
  1607.   dict->invalidateTable(name);
  1608.   if (!(tab= dict->getTable(name)))
  1609.   {    
  1610.     const NdbError err= dict->getNdbError();
  1611.     if (err.code == 709)
  1612.       DBUG_RETURN(0);
  1613.     ERR_RETURN(err);
  1614.   }
  1615.   
  1616.   DBUG_PRINT("info", ("Found table %s", tab->getName()));
  1617.   DBUG_RETURN(1);
  1618. }
  1619. extern "C" byte* tables_get_key(const char *entry, uint *length,
  1620. my_bool not_used __attribute__((unused)))
  1621. {
  1622.   *length= strlen(entry);
  1623.   return (byte*) entry;
  1624. }
  1625. /*
  1626.   Drop a database in NDB Cluster
  1627.  */
  1628. int ndbcluster_drop_database(const char *path)
  1629. {
  1630.   DBUG_ENTER("ndbcluster_drop_database");
  1631.   THD *thd= current_thd;
  1632.   char dbname[FN_HEADLEN];
  1633.   Ndb* ndb;
  1634.   NdbDictionary::Dictionary::List list;
  1635.   uint i;
  1636.   char *tabname;
  1637.   List<char> drop_list;
  1638.   int ret= 0;
  1639.   ha_ndbcluster::set_dbname(path, (char *)&dbname);
  1640.   DBUG_PRINT("enter", ("db: %s", dbname));
  1641.   
  1642.   if (!(ndb= check_ndb_in_thd(thd)))
  1643.     DBUG_RETURN(HA_ERR_NO_CONNECTION);
  1644.   
  1645.   // List tables in NDB
  1646.   NDBDICT *dict= ndb->getDictionary();
  1647.   if (dict->listObjects(list, 
  1648.                         NdbDictionary::Object::UserTable) != 0)
  1649.     ERR_RETURN(dict->getNdbError());
  1650.   for (i= 0 ; i < list.count ; i++)
  1651.   {
  1652.     NdbDictionary::Dictionary::List::Element& t= list.elements[i];
  1653.     DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));     
  1654.     
  1655.     // Add only tables that belongs to db
  1656.     if (my_strcasecmp(system_charset_info, t.database, dbname))
  1657.       continue;
  1658.     DBUG_PRINT("info", ("%s must be dropped", t.name));     
  1659.     drop_list.push_back(thd->strdup(t.name));
  1660.   }
  1661.   // Drop any tables belonging to database
  1662.   ndb->setDatabaseName(dbname);
  1663.   List_iterator_fast<char> it(drop_list);
  1664.   while ((tabname=it++))
  1665.   {
  1666.     if (dict->dropTable(tabname))
  1667.     {
  1668.       const NdbError err= dict->getNdbError();
  1669.       if (err.code != 709)
  1670.       {
  1671.         ERR_PRINT(err);
  1672.         ret= ndb_to_mysql_error(&err);
  1673.       }
  1674.     }
  1675.   }
  1676.   DBUG_RETURN(ret);      
  1677. }
  1678. int ndbcluster_find_files(THD *thd,const char *db,const char *path,
  1679.   const char *wild, bool dir, List<char> *files)
  1680. {
  1681.   DBUG_ENTER("ndbcluster_find_files");
  1682.   DBUG_PRINT("enter", ("db: %s", db));
  1683.   { // extra bracket to avoid gcc 2.95.3 warning
  1684.   uint i;
  1685.   Ndb* ndb;
  1686.   char name[FN_REFLEN];
  1687.   HASH ndb_tables, ok_tables;
  1688.   NdbDictionary::Dictionary::List list;
  1689.   if (!(ndb= check_ndb_in_thd(thd)))
  1690.     DBUG_RETURN(HA_ERR_NO_CONNECTION);
  1691.   if (dir)
  1692.     DBUG_RETURN(0); // Discover of databases not yet supported
  1693.   // List tables in NDB
  1694.   NDBDICT *dict= ndb->getDictionary();
  1695.   if (dict->listObjects(list, 
  1696. NdbDictionary::Object::UserTable) != 0)
  1697.     ERR_RETURN(dict->getNdbError());
  1698.   if (hash_init(&ndb_tables, system_charset_info,list.count,0,0,
  1699. (hash_get_key)tables_get_key,0,0))
  1700.   {
  1701.     DBUG_PRINT("error", ("Failed to init HASH ndb_tables"));
  1702.     DBUG_RETURN(-1);
  1703.   }
  1704.   if (hash_init(&ok_tables, system_charset_info,32,0,0,
  1705. (hash_get_key)tables_get_key,0,0))
  1706.   {
  1707.     DBUG_PRINT("error", ("Failed to init HASH ok_tables"));
  1708.     hash_free(&ndb_tables);
  1709.     DBUG_RETURN(-1);
  1710.   }  
  1711.   for (i= 0 ; i < list.count ; i++)
  1712.   {
  1713.     NdbDictionary::Dictionary::List::Element& t= list.elements[i];
  1714.     DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));     
  1715.     // Add only tables that belongs to db
  1716.     if (my_strcasecmp(system_charset_info, t.database, db))
  1717.       continue;
  1718.     // Apply wildcard to list of tables in NDB
  1719.     if (wild)
  1720.     {
  1721.       if (lower_case_table_names)
  1722.       {
  1723. if (wild_case_compare(files_charset_info, t.name, wild))
  1724.   continue;
  1725.       }
  1726.       else if (wild_compare(t.name,wild,0))
  1727. continue;
  1728.     }
  1729.     DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));     
  1730.     my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
  1731.   }
  1732.   char *file_name;
  1733.   List_iterator<char> it(*files);
  1734.   List<char> delete_list;
  1735.   while ((file_name=it++))
  1736.   {
  1737.     DBUG_PRINT("info", ("%s", file_name));     
  1738.     if (hash_search(&ndb_tables, file_name, strlen(file_name)))
  1739.     {
  1740.       DBUG_PRINT("info", ("%s existed in NDB _and_ on disk ", file_name));
  1741.       // File existed in NDB and as frm file, put in ok_tables list
  1742.       my_hash_insert(&ok_tables, (byte*)file_name);
  1743.       continue;
  1744.     }
  1745.     
  1746.     // File is not in NDB, check for .ndb file with this name
  1747.     (void)strxnmov(name, FN_REFLEN, 
  1748.    mysql_data_home,"/",db,"/",file_name,ha_ndb_ext,NullS);
  1749.     DBUG_PRINT("info", ("Check access for %s", name));
  1750.     if (access(name, F_OK))
  1751.     {
  1752.       DBUG_PRINT("info", ("%s did not exist on disk", name));     
  1753.       // .ndb file did not exist on disk, another table type
  1754.       continue;
  1755.     }
  1756.     DBUG_PRINT("info", ("%s existed on disk", name));     
  1757.     // The .ndb file exists on disk, but it's not in list of tables in ndb
  1758.     // Verify that handler agrees table is gone.
  1759.     if (ndbcluster_table_exists_in_engine(thd, db, file_name) == 0)    
  1760.     {
  1761.       DBUG_PRINT("info", ("NDB says %s does not exists", file_name));     
  1762.       it.remove();
  1763.       // Put in list of tables to remove from disk
  1764.       delete_list.push_back(thd->strdup(file_name));
  1765.     }
  1766.   }
  1767.   // Check for new files to discover
  1768.   DBUG_PRINT("info", ("Checking for new files to discover"));       
  1769.   List<char> create_list;
  1770.   for (i= 0 ; i < ndb_tables.records ; i++)
  1771.   {
  1772.     file_name= hash_element(&ndb_tables, i);
  1773.     if (!hash_search(&ok_tables, file_name, strlen(file_name)))
  1774.     {
  1775.       DBUG_PRINT("info", ("%s must be discovered", file_name));       
  1776.       // File is in list of ndb tables and not in ok_tables
  1777.       // This table need to be created
  1778.       create_list.push_back(thd->strdup(file_name));
  1779.     }
  1780.   }
  1781.   // Lock mutex before deleting and creating frm files
  1782.   pthread_mutex_lock(&LOCK_open);
  1783.   if (!global_read_lock)
  1784.   {
  1785.     // Delete old files
  1786.     List_iterator_fast<char> it3(delete_list);
  1787.     while ((file_name=it3++))
  1788.     {  
  1789.       DBUG_PRINT("info", ("Remove table %s/%s",db, file_name ));
  1790.       // Delete the table and all related files
  1791.       TABLE_LIST table_list;
  1792.       bzero((char*) &table_list,sizeof(table_list));
  1793.       table_list.db= (char*) db;
  1794.       table_list.alias=table_list.real_name=(char*)file_name;
  1795.       (void)mysql_rm_table_part2(thd, &table_list, 
  1796.  /* if_exists */ TRUE, 
  1797.  /* drop_temporary */ FALSE, 
  1798.  /* dont_log_query*/ TRUE);
  1799.     }
  1800.   }
  1801.   // Create new files
  1802.   List_iterator_fast<char> it2(create_list);
  1803.   while ((file_name=it2++))
  1804.   {  
  1805.     DBUG_PRINT("info", ("Table %s need discovery", name));
  1806.     if (ha_create_table_from_engine(thd, db, file_name) == 0)
  1807.       files->push_back(thd->strdup(file_name)); 
  1808.   }
  1809.   pthread_mutex_unlock(&LOCK_open);      
  1810.   
  1811.   hash_free(&ok_tables);
  1812.   hash_free(&ndb_tables);
  1813.   } // extra bracket to avoid gcc 2.95.3 warning
  1814.   DBUG_RETURN(0);    
  1815. }
  1816. /*
  1817.   Initialise all gloal variables before creating 
  1818.   a NDB Cluster table handler
  1819.  */
  1820. bool ndbcluster_init()
  1821. {
  1822.   int res;
  1823.   DBUG_ENTER("ndbcluster_init");
  1824.   // Set connectstring if specified
  1825.   if (opt_ndbcluster_connectstring != 0)
  1826.     DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));     
  1827.   if ((g_ndb_cluster_connection=
  1828.        new Ndb_cluster_connection(opt_ndbcluster_connectstring)) == 0)
  1829.   {
  1830.     DBUG_PRINT("error",("Ndb_cluster_connection(%s)",
  1831. opt_ndbcluster_connectstring));
  1832.     goto ndbcluster_init_error;
  1833.   }
  1834.   g_ndb_cluster_connection->set_optimized_node_selection
  1835.     (opt_ndb_optimized_node_selection);
  1836.   // Create a Ndb object to open the connection  to NDB
  1837.   g_ndb= new Ndb(g_ndb_cluster_connection, "sys");
  1838.   g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
  1839.   if (g_ndb->init() != 0)
  1840.   {
  1841.     ERR_PRINT (g_ndb->getNdbError());
  1842.     goto ndbcluster_init_error;
  1843.   }
  1844.   if ((res= g_ndb_cluster_connection->connect(0,0,0)) == 0)
  1845.   {
  1846.     DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d",
  1847.        g_ndb_cluster_connection->get_connected_host(),
  1848.        g_ndb_cluster_connection->get_connected_port()));
  1849.     g_ndb_cluster_connection->wait_until_ready(10,3);
  1850.   } 
  1851.   else if(res == 1)
  1852.   {
  1853.     if (g_ndb_cluster_connection->start_connect_thread()) {
  1854.       DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
  1855.       goto ndbcluster_init_error;
  1856.     }
  1857.     {
  1858.       char buf[1024];
  1859.       DBUG_PRINT("info",("NDBCLUSTER storage engine not started, will connect using %s",
  1860.  g_ndb_cluster_connection->get_connectstring(buf,sizeof(buf))));
  1861.     }
  1862.   }
  1863.   else
  1864.   {
  1865.     DBUG_ASSERT(res == -1);
  1866.     DBUG_PRINT("error", ("permanent error"));
  1867.     goto ndbcluster_init_error;
  1868.   }
  1869.   
  1870.   (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
  1871.                    (hash_get_key) ndbcluster_get_key,0,0);
  1872.   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
  1873.   ndbcluster_inited= 1;
  1874.   DBUG_RETURN(FALSE);
  1875.  ndbcluster_init_error:
  1876.   ndbcluster_end();
  1877.   DBUG_RETURN(TRUE);
  1878. }
  1879. /*
  1880.   End use of the NDB Cluster table handler
  1881.   - free all global variables allocated by 
  1882.     ndcluster_init()
  1883. */
  1884. bool ndbcluster_end()
  1885. {
  1886.   DBUG_ENTER("ndbcluster_end");
  1887.   if(g_ndb)
  1888.   {
  1889. #ifndef DBUG_OFF
  1890.     Ndb::Free_list_usage tmp; tmp.m_name= 0;
  1891.     while (g_ndb->get_free_list_usage(&tmp))
  1892.     {
  1893.       uint leaked= (uint) tmp.m_created - tmp.m_free;
  1894.       if (leaked)
  1895.         fprintf(stderr, "NDB: Found %u %s%s that %s not been releasedn",
  1896.                 leaked, tmp.m_name,
  1897.                 (leaked == 1)?"":"'s",
  1898.                 (leaked == 1)?"has":"have");
  1899.     }
  1900. #endif
  1901.     delete g_ndb;
  1902.   }
  1903.   g_ndb= NULL;
  1904.   if (g_ndb_cluster_connection)
  1905.     delete g_ndb_cluster_connection;
  1906.   g_ndb_cluster_connection= NULL;
  1907.   if (!ndbcluster_inited)
  1908.     DBUG_RETURN(0);
  1909.   hash_free(&ndbcluster_open_tables);
  1910.   pthread_mutex_destroy(&ndbcluster_mutex);
  1911.   ndbcluster_inited= 0;
  1912.   DBUG_RETURN(0);
  1913. }
  1914. /*
  1915.   Static error print function called from
  1916.   static handler method ndbcluster_commit
  1917.   and ndbcluster_rollback
  1918. */
  1919. void ndbcluster_print_error(int error, const NdbOperation *error_op)
  1920. {
  1921.   DBUG_ENTER("ndbcluster_print_error");
  1922.   TABLE tab;
  1923.   const char *tab_name= (error_op) ? error_op->getTableName() : "";
  1924.   tab.table_name= (char *) tab_name;
  1925.   ha_ndbcluster error_handler(&tab);
  1926.   tab.file= &error_handler;
  1927.   error_handler.print_error(error, MYF(0));
  1928.   DBUG_VOID_RETURN;
  1929. }
  1930. /**
  1931.  * Set a given location from full pathname to database name
  1932.  *
  1933.  */
  1934. void ha_ndbcluster::set_dbname(const char *path_name, char *dbname)
  1935. {
  1936.   char *end, *ptr;
  1937.   
  1938.   /* Scan name from the end */
  1939.   ptr= strend(path_name)-1;
  1940.   while (ptr >= path_name && *ptr != '\' && *ptr != '/') {
  1941.     ptr--;
  1942.   }
  1943.   ptr--;
  1944.   end= ptr;
  1945.   while (ptr >= path_name && *ptr != '\' && *ptr != '/') {
  1946.     ptr--;
  1947.   }
  1948.   uint name_len= end - ptr;
  1949.   memcpy(dbname, ptr + 1, name_len);
  1950.   dbname[name_len]= '';
  1951. #ifdef __WIN__
  1952.   /* Put to lower case */
  1953.   
  1954.   ptr= dbname;
  1955.   
  1956.   while (*ptr != '') {
  1957.     *ptr= tolower(*ptr);
  1958.     ptr++;
  1959.   }
  1960. #endif
  1961. }
  1962. /*
  1963.   Set m_dbname from full pathname to table file
  1964.  */
  1965. void ha_ndbcluster::set_dbname(const char *path_name)
  1966. {
  1967.   set_dbname(path_name, m_dbname);
  1968. }
  1969. /**
  1970.  * Set a given location from full pathname to table file
  1971.  *
  1972.  */
  1973. void
  1974. ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
  1975. {
  1976.   char *end, *ptr;
  1977.   
  1978.   /* Scan name from the end */
  1979.   end= strend(path_name)-1;
  1980.   ptr= end;
  1981.   while (ptr >= path_name && *ptr != '\' && *ptr != '/') {
  1982.     ptr--;
  1983.   }
  1984.   uint name_len= end - ptr;
  1985.   memcpy(tabname, ptr + 1, end - ptr);
  1986.   tabname[name_len]= '';
  1987. #ifdef __WIN__
  1988.   /* Put to lower case */
  1989.   ptr= tabname;
  1990.   
  1991.   while (*ptr != '') {
  1992.     *ptr= tolower(*ptr);
  1993.     ptr++;
  1994.   }
  1995. #endif
  1996. }
  1997. /*
  1998.   Set m_tabname from full pathname to table file 
  1999.  */
  2000. void ha_ndbcluster::set_tabname(const char *path_name)
  2001. {
  2002.   set_tabname(path_name, m_tabname);
  2003. }
  2004. ha_rows 
  2005. ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
  2006.                                 key_range *max_key)
  2007. {
  2008.   KEY *key_info= table->key_info + inx;
  2009.   uint key_length= key_info->key_length;
  2010.   NDB_INDEX_TYPE idx_type= get_index_type(inx);  
  2011.   DBUG_ENTER("records_in_range");
  2012.   // Prevent partial read of hash indexes by returning HA_POS_ERROR
  2013.   if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
  2014.       ((min_key && min_key->length < key_length) ||
  2015.        (max_key && max_key->length < key_length)))
  2016.     DBUG_RETURN(HA_POS_ERROR);
  2017.   
  2018.   // Read from hash index with full key
  2019.   // This is a "const" table which returns only one record!      
  2020.   if ((idx_type != ORDERED_INDEX) &&
  2021.       ((min_key && min_key->length == key_length) || 
  2022.        (max_key && max_key->length == key_length)))
  2023.     DBUG_RETURN(1);
  2024.   
  2025.   DBUG_RETURN(10); /* Good guess when you don't know anything */
  2026. }
  2027. ulong ha_ndbcluster::table_flags(void) const
  2028. {
  2029.   if (m_ha_not_exact_count)
  2030.     return m_table_flags | HA_NOT_EXACT_COUNT;
  2031.   else
  2032.     return m_table_flags;
  2033. }
  2034. const char * ha_ndbcluster::table_type() const 
  2035. {
  2036.   return("ndbcluster");
  2037. }
  2038. uint ha_ndbcluster::max_supported_record_length() const
  2039.   return NDB_MAX_TUPLE_SIZE;
  2040. }
  2041. uint ha_ndbcluster::max_supported_keys() const
  2042. {
  2043.   return MAX_KEY;
  2044. }
  2045. uint ha_ndbcluster::max_supported_key_parts() const 
  2046. {
  2047.   return NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY;
  2048. }
  2049. uint ha_ndbcluster::max_supported_key_length() const
  2050. {
  2051.   return NDB_MAX_KEY_SIZE;
  2052. }
  2053. bool ha_ndbcluster::low_byte_first() const
  2054. #ifdef WORDS_BIGENDIAN
  2055.   return FALSE;
  2056. #else
  2057.   return TRUE;
  2058. #endif
  2059. }
  2060. bool ha_ndbcluster::has_transactions()
  2061. {
  2062.   return m_transaction_on;
  2063. }
  2064. const char* ha_ndbcluster::index_type(uint key_number)
  2065. {
  2066.   switch (get_index_type(key_number)) {
  2067.   case ORDERED_INDEX:
  2068.   case UNIQUE_ORDERED_INDEX:
  2069.   case PRIMARY_KEY_ORDERED_INDEX:
  2070.     return "BTREE";
  2071.   case UNIQUE_INDEX:
  2072.   case PRIMARY_KEY_INDEX:
  2073.   default:
  2074.     return "HASH";
  2075.   }
  2076. }
  2077. uint8 ha_ndbcluster::table_cache_type()
  2078. {
  2079.   if (m_use_local_query_cache)
  2080.     return HA_CACHE_TBL_TRANSACT;
  2081.   else
  2082.     return HA_CACHE_TBL_NOCACHE;
  2083. }
  2084. /*
  2085.   Handling the shared NDB_SHARE structure that is needed to 
  2086.   provide table locking.
  2087.   It's also used for sharing data with other NDB handlers
  2088.   in the same MySQL Server. There is currently not much
  2089.   data we want to or can share.
  2090.  */
  2091. static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
  2092. my_bool not_used __attribute__((unused)))
  2093. {
  2094.   *length=share->table_name_length;
  2095.   return (byte*) share->table_name;
  2096. }
  2097. static NDB_SHARE* get_share(const char *table_name)
  2098. {
  2099.   NDB_SHARE *share;
  2100.   pthread_mutex_lock(&ndbcluster_mutex);
  2101.   uint length=(uint) strlen(table_name);
  2102.   if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
  2103.                                        (byte*) table_name,
  2104.                                        length)))
  2105.   {
  2106.     if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
  2107.                                        MYF(MY_WME | MY_ZEROFILL))))
  2108.     {
  2109.       share->table_name_length=length;
  2110.       share->table_name=(char*) (share+1);
  2111.       strmov(share->table_name,table_name);
  2112.       if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
  2113.       {
  2114.         pthread_mutex_unlock(&ndbcluster_mutex);
  2115.         my_free((gptr) share,0);
  2116.         return 0;
  2117.       }
  2118.       thr_lock_init(&share->lock);
  2119.       pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
  2120.     }
  2121.   }
  2122.   share->use_count++;
  2123.   pthread_mutex_unlock(&ndbcluster_mutex);
  2124.   return share;
  2125. }
  2126. static void free_share(NDB_SHARE *share)
  2127. {
  2128.   pthread_mutex_lock(&ndbcluster_mutex);
  2129.   if (!--share->use_count)
  2130.   {
  2131.     hash_delete(&ndbcluster_open_tables, (byte*) share);
  2132.     thr_lock_delete(&share->lock);
  2133.     pthread_mutex_destroy(&share->mutex);
  2134.     my_free((gptr) share, MYF(0));
  2135.   }
  2136.   pthread_mutex_unlock(&ndbcluster_mutex);
  2137. }
  2138. /*
  2139.   Internal representation of the frm blob
  2140.    
  2141. */
  2142. struct frm_blob_struct 
  2143. {
  2144.   struct frm_blob_header 
  2145.   {
  2146.     uint ver;      // Version of header
  2147.     uint orglen;   // Original length of compressed data
  2148.     uint complen;  // Compressed length of data, 0=uncompressed
  2149.   } head;
  2150.   char data[1];  
  2151. };
  2152. static int packfrm(const void *data, uint len, 
  2153.    const void **pack_data, uint *pack_len)
  2154. {
  2155.   int error;
  2156.   ulong org_len, comp_len;
  2157.   uint blob_len;
  2158.   frm_blob_struct* blob;
  2159.   DBUG_ENTER("packfrm");
  2160.   DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
  2161.   
  2162.   error= 1;
  2163.   org_len= len;
  2164.   if (my_compress((byte*)data, &org_len, &comp_len))
  2165.     goto err;
  2166.   
  2167.   DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
  2168.   DBUG_DUMP("compressed", (char*)data, org_len);
  2169.   
  2170.   error= 2;
  2171.   blob_len= sizeof(frm_blob_struct::frm_blob_header)+org_len;
  2172.   if (!(blob= (frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME))))
  2173.     goto err;
  2174.   
  2175.   // Store compressed blob in machine independent format
  2176.   int4store((char*)(&blob->head.ver), 1);
  2177.   int4store((char*)(&blob->head.orglen), comp_len);
  2178.   int4store((char*)(&blob->head.complen), org_len);
  2179.   
  2180.   // Copy frm data into blob, already in machine independent format
  2181.   memcpy(blob->data, data, org_len);  
  2182.   
  2183.   *pack_data= blob;
  2184.   *pack_len= blob_len;
  2185.   error= 0;
  2186.   
  2187.   DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
  2188. err:
  2189.   DBUG_RETURN(error);
  2190.   
  2191. }
  2192. static int unpackfrm(const void **unpack_data, uint *unpack_len,
  2193.     const void *pack_data)
  2194. {
  2195.    const frm_blob_struct *blob= (frm_blob_struct*)pack_data;
  2196.    byte *data;
  2197.    ulong complen, orglen, ver;
  2198.    DBUG_ENTER("unpackfrm");
  2199.    DBUG_PRINT("enter", ("pack_data: %x", pack_data));
  2200.    complen= uint4korr((char*)&blob->head.complen);
  2201.    orglen= uint4korr((char*)&blob->head.orglen);
  2202.    ver= uint4korr((char*)&blob->head.ver);
  2203.  
  2204.    DBUG_PRINT("blob",("ver: %d complen: %d orglen: %d",
  2205.        ver,complen,orglen));
  2206.    DBUG_DUMP("blob->data", (char*) blob->data, complen);
  2207.  
  2208.    if (ver != 1)
  2209.      DBUG_RETURN(1);
  2210.    if (!(data= my_malloc(max(orglen, complen), MYF(MY_WME))))
  2211.      DBUG_RETURN(2);
  2212.    memcpy(data, blob->data, complen);
  2213.  
  2214.    if (my_uncompress(data, &complen, &orglen))
  2215.    {
  2216.      my_free((char*)data, MYF(0));
  2217.      DBUG_RETURN(3);
  2218.    }
  2219.    *unpack_data= data;
  2220.    *unpack_len= complen;
  2221.    DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
  2222.    DBUG_RETURN(0);
  2223. }
  2224. static 
  2225. int
  2226. ndb_get_table_statistics(Ndb* ndb, const char * table, 
  2227.  Uint64* row_count, Uint64* commit_count)
  2228. {
  2229.   DBUG_ENTER("ndb_get_table_statistics");
  2230.   DBUG_PRINT("enter", ("table: %s", table));
  2231.   NdbConnection* pTrans= ndb->startTransaction();
  2232.   do 
  2233.   {
  2234.     if (pTrans == NULL)
  2235.       break;
  2236.       
  2237.     NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
  2238.     if (pOp == NULL)
  2239.       break;
  2240.     
  2241.     NdbResultSet* rs= pOp->readTuples(NdbOperation::LM_CommittedRead); 
  2242.     if (rs == 0)
  2243.       break;
  2244.     
  2245.     int check= pOp->interpret_exit_last_row();
  2246.     if (check == -1)
  2247.       break;
  2248.     
  2249.     Uint64 rows, commits;
  2250.     pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
  2251.     pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
  2252.     
  2253.     check= pTrans->execute(NoCommit, AbortOnError, TRUE);
  2254.     if (check == -1)
  2255.       break;
  2256.     
  2257.     Uint64 sum_rows= 0;
  2258.     Uint64 sum_commits= 0;
  2259.     while((check= rs->nextResult(TRUE, TRUE)) == 0)
  2260.     {
  2261.       sum_rows+= rows;
  2262.       sum_commits+= commits;
  2263.     }
  2264.     
  2265.     if (check == -1)
  2266.       break;
  2267.     rs->close(TRUE);
  2268.     ndb->closeTransaction(pTrans);
  2269.     if(row_count)
  2270.       * row_count= sum_rows;
  2271.     if(commit_count)
  2272.       * commit_count= sum_commits;
  2273.     DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits));
  2274.     DBUG_RETURN(0);
  2275.   } while(0);
  2276.   ndb->closeTransaction(pTrans);
  2277.   DBUG_PRINT("exit", ("failed"));
  2278.   DBUG_RETURN(-1);
  2279. }
  2280. /*
  2281.   Create a .ndb file to serve as a placeholder indicating 
  2282.   that the table with this name is a ndb table
  2283. */
  2284. int ha_ndbcluster::write_ndb_file()
  2285. {
  2286.   File file;
  2287.   bool error=1;
  2288.   char path[FN_REFLEN];
  2289.   
  2290.   DBUG_ENTER("write_ndb_file");
  2291.   DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));
  2292.   (void)strxnmov(path, FN_REFLEN, 
  2293.  mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);
  2294.   if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
  2295.   {
  2296.     // It's an empty file
  2297.     error=0;
  2298.     my_close(file,MYF(0));
  2299.   }
  2300.   DBUG_RETURN(error);
  2301. }
  2302. int
  2303. ndbcluster_show_status(THD* thd)
  2304. {
  2305.   Protocol *protocol= thd->protocol;
  2306.   
  2307.   DBUG_ENTER("ndbcluster_show_status");
  2308.   
  2309.   if (have_ndbcluster != SHOW_OPTION_YES) 
  2310.   {
  2311.     my_message(ER_NOT_SUPPORTED_YET,
  2312.        "Cannot call SHOW NDBCLUSTER STATUS because skip-ndbcluster is defined",
  2313.        MYF(0));
  2314.     DBUG_RETURN(TRUE);
  2315.   }
  2316.   
  2317.   List<Item> field_list;
  2318.   field_list.push_back(new Item_empty_string("free_list", 255));
  2319.   field_list.push_back(new Item_return_int("created", 10,MYSQL_TYPE_LONG));
  2320.   field_list.push_back(new Item_return_int("free", 10,MYSQL_TYPE_LONG));
  2321.   field_list.push_back(new Item_return_int("sizeof", 10,MYSQL_TYPE_LONG));
  2322.   if (protocol->send_fields(&field_list, 1))
  2323.     DBUG_RETURN(TRUE);
  2324.   
  2325.   if (thd->transaction.thd_ndb &&
  2326.       ((Thd_ndb*)thd->transaction.thd_ndb)->ndb)
  2327.   {
  2328.     Ndb* ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
  2329.     Ndb::Free_list_usage tmp; tmp.m_name= 0;
  2330.     while (ndb->get_free_list_usage(&tmp))
  2331.     {
  2332.       protocol->prepare_for_resend();
  2333.       
  2334.       protocol->store(tmp.m_name, &my_charset_bin);
  2335.       protocol->store((uint)tmp.m_created);
  2336.       protocol->store((uint)tmp.m_free);
  2337.       protocol->store((uint)tmp.m_sizeof);
  2338.       if (protocol->write())
  2339. DBUG_RETURN(TRUE);
  2340.     }
  2341.   }
  2342.   send_eof(thd);
  2343.   
  2344.   DBUG_RETURN(FALSE);
  2345. }
  2346. #endif /* HAVE_NDBCLUSTER_DB */