ha_ndbcluster.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:136k
源码类别:
MySQL数据库
开发平台:
Visual C++
- DBUG_RETURN(res);
- }
- inline
- int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
- const key_range *end_key,
- bool eq_range, bool sorted,
- byte* buf)
- {
- KEY* key_info;
- int error= 1;
- DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
- DBUG_PRINT("info", ("eq_range: %d, sorted: %d", eq_range, sorted));
- switch (get_index_type(active_index)){
- case PRIMARY_KEY_ORDERED_INDEX:
- case PRIMARY_KEY_INDEX:
- key_info= table->key_info + active_index;
- if (start_key &&
- start_key->length == key_info->key_length &&
- start_key->flag == HA_READ_KEY_EXACT)
- {
- if(m_active_cursor && (error= close_scan()))
- DBUG_RETURN(error);
- error= pk_read(start_key->key, start_key->length, buf);
- DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
- }
- break;
- case UNIQUE_ORDERED_INDEX:
- case UNIQUE_INDEX:
- key_info= table->key_info + active_index;
- if (start_key && start_key->length == key_info->key_length &&
- start_key->flag == HA_READ_KEY_EXACT &&
- !check_null_in_key(key_info, start_key->key, start_key->length))
- {
- if(m_active_cursor && (error= close_scan()))
- DBUG_RETURN(error);
- error= unique_index_read(start_key->key, start_key->length, buf);
- DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
- }
- break;
- default:
- break;
- }
- // Start the ordered index scan and fetch the first row
- error= ordered_index_scan(start_key, end_key, sorted, buf);
- DBUG_RETURN(error);
- }
- int ha_ndbcluster::read_range_first(const key_range *start_key,
- const key_range *end_key,
- bool eq_range, bool sorted)
- {
- byte* buf= table->record[0];
- DBUG_ENTER("ha_ndbcluster::read_range_first");
- DBUG_RETURN(read_range_first_to_buf(start_key,
- end_key,
- eq_range,
- sorted,
- buf));
- }
- int ha_ndbcluster::read_range_next()
- {
- DBUG_ENTER("ha_ndbcluster::read_range_next");
- DBUG_RETURN(next_result(table->record[0]));
- }
- int ha_ndbcluster::rnd_init(bool scan)
- {
- NdbResultSet *cursor= m_active_cursor;
- DBUG_ENTER("rnd_init");
- DBUG_PRINT("enter", ("scan: %d", scan));
- // Check if scan is to be restarted
- if (cursor)
- {
- if (!scan)
- DBUG_RETURN(1);
- int res= cursor->restart(m_force_send);
- DBUG_ASSERT(res == 0);
- }
- index_init(table->primary_key);
- DBUG_RETURN(0);
- }
- int ha_ndbcluster::close_scan()
- {
- NdbResultSet *cursor= m_active_cursor;
- NdbConnection *trans= m_active_trans;
- DBUG_ENTER("close_scan");
- if (!cursor)
- DBUG_RETURN(1);
- if (m_ops_pending)
- {
- /*
- Take over any pending transactions to the
- deleteing/updating transaction before closing the scan
- */
- DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));
- if (execute_no_commit(this,trans) != 0) {
- no_uncommitted_rows_execute_failure();
- DBUG_RETURN(ndb_err(trans));
- }
- m_ops_pending= 0;
- }
- cursor->close(m_force_send);
- m_active_cursor= NULL;
- DBUG_RETURN(0);
- }
- int ha_ndbcluster::rnd_end()
- {
- DBUG_ENTER("rnd_end");
- DBUG_RETURN(close_scan());
- }
- int ha_ndbcluster::rnd_next(byte *buf)
- {
- DBUG_ENTER("rnd_next");
- statistic_increment(ha_read_rnd_next_count, &LOCK_status);
- if (!m_active_cursor)
- DBUG_RETURN(full_table_scan(buf));
- DBUG_RETURN(next_result(buf));
- }
- /*
- An "interesting" record has been found and it's pk
- retrieved by calling position
- Now it's time to read the record from db once
- again
- */
- int ha_ndbcluster::rnd_pos(byte *buf, byte *pos)
- {
- DBUG_ENTER("rnd_pos");
- statistic_increment(ha_read_rnd_count,&LOCK_status);
- // The primary key for the record is stored in pos
- // Perform a pk_read using primary key "index"
- DBUG_RETURN(pk_read(pos, ref_length, buf));
- }
- /*
- Store the primary key of this record in ref
- variable, so that the row can be retrieved again later
- using "reference" in rnd_pos
- */
- void ha_ndbcluster::position(const byte *record)
- {
- KEY *key_info;
- KEY_PART_INFO *key_part;
- KEY_PART_INFO *end;
- byte *buff;
- DBUG_ENTER("position");
- if (table->primary_key != MAX_KEY)
- {
- key_info= table->key_info + table->primary_key;
- key_part= key_info->key_part;
- end= key_part + key_info->key_parts;
- buff= ref;
- for (; key_part != end; key_part++)
- {
- if (key_part->null_bit) {
- /* Store 0 if the key part is a NULL part */
- if (record[key_part->null_offset]
- & key_part->null_bit) {
- *buff++= 1;
- continue;
- }
- *buff++= 0;
- }
- memcpy(buff, record + key_part->offset, key_part->length);
- buff += key_part->length;
- }
- }
- else
- {
- // No primary key, get hidden key
- DBUG_PRINT("info", ("Getting hidden key"));
- int hidden_no= table->fields;
- NdbRecAttr* rec= m_value[hidden_no].rec;
- const NDBTAB *tab= (const NDBTAB *) m_table;
- const NDBCOL *hidden_col= tab->getColumn(hidden_no);
- DBUG_ASSERT(hidden_col->getPrimaryKey() &&
- hidden_col->getAutoIncrement() &&
- rec != NULL &&
- ref_length == NDB_HIDDEN_PRIMARY_KEY_LENGTH);
- memcpy(ref, (const void*)rec->aRef(), ref_length);
- }
- DBUG_DUMP("ref", (char*)ref, ref_length);
- DBUG_VOID_RETURN;
- }
- void ha_ndbcluster::info(uint flag)
- {
- DBUG_ENTER("info");
- DBUG_PRINT("enter", ("flag: %d", flag));
- if (flag & HA_STATUS_POS)
- DBUG_PRINT("info", ("HA_STATUS_POS"));
- if (flag & HA_STATUS_NO_LOCK)
- DBUG_PRINT("info", ("HA_STATUS_NO_LOCK"));
- if (flag & HA_STATUS_TIME)
- DBUG_PRINT("info", ("HA_STATUS_TIME"));
- if (flag & HA_STATUS_VARIABLE)
- {
- DBUG_PRINT("info", ("HA_STATUS_VARIABLE"));
- if (m_table_info)
- {
- if (m_ha_not_exact_count)
- records= 100;
- else
- records_update();
- }
- else
- {
- if ((my_errno= check_ndb_connection()))
- DBUG_VOID_RETURN;
- Ndb *ndb= get_ndb();
- Uint64 rows= 100;
- if (current_thd->variables.ndb_use_exact_count)
- ndb_get_table_statistics(ndb, m_tabname, &rows, 0);
- records= rows;
- }
- }
- if (flag & HA_STATUS_CONST)
- {
- DBUG_PRINT("info", ("HA_STATUS_CONST"));
- set_rec_per_key();
- }
- if (flag & HA_STATUS_ERRKEY)
- {
- DBUG_PRINT("info", ("HA_STATUS_ERRKEY"));
- errkey= m_dupkey;
- }
- if (flag & HA_STATUS_AUTO)
- {
- DBUG_PRINT("info", ("HA_STATUS_AUTO"));
- if (m_table)
- {
- Ndb *ndb= get_ndb();
- auto_increment_value=
- ndb->readAutoIncrementValue((const NDBTAB *) m_table);
- }
- }
- DBUG_VOID_RETURN;
- }
- int ha_ndbcluster::extra(enum ha_extra_function operation)
- {
- DBUG_ENTER("extra");
- switch (operation) {
- case HA_EXTRA_NORMAL: /* Optimize for space (def) */
- DBUG_PRINT("info", ("HA_EXTRA_NORMAL"));
- break;
- case HA_EXTRA_QUICK: /* Optimize for speed */
- DBUG_PRINT("info", ("HA_EXTRA_QUICK"));
- break;
- case HA_EXTRA_RESET: /* Reset database to after open */
- DBUG_PRINT("info", ("HA_EXTRA_RESET"));
- break;
- case HA_EXTRA_CACHE: /* Cash record in HA_rrnd() */
- DBUG_PRINT("info", ("HA_EXTRA_CACHE"));
- break;
- case HA_EXTRA_NO_CACHE: /* End cacheing of records (def) */
- DBUG_PRINT("info", ("HA_EXTRA_NO_CACHE"));
- break;
- case HA_EXTRA_NO_READCHECK: /* No readcheck on update */
- DBUG_PRINT("info", ("HA_EXTRA_NO_READCHECK"));
- break;
- case HA_EXTRA_READCHECK: /* Use readcheck (def) */
- DBUG_PRINT("info", ("HA_EXTRA_READCHECK"));
- break;
- case HA_EXTRA_KEYREAD: /* Read only key to database */
- DBUG_PRINT("info", ("HA_EXTRA_KEYREAD"));
- break;
- case HA_EXTRA_NO_KEYREAD: /* Normal read of records (def) */
- DBUG_PRINT("info", ("HA_EXTRA_NO_KEYREAD"));
- break;
- case HA_EXTRA_NO_USER_CHANGE: /* No user is allowed to write */
- DBUG_PRINT("info", ("HA_EXTRA_NO_USER_CHANGE"));
- break;
- case HA_EXTRA_KEY_CACHE:
- DBUG_PRINT("info", ("HA_EXTRA_KEY_CACHE"));
- break;
- case HA_EXTRA_NO_KEY_CACHE:
- DBUG_PRINT("info", ("HA_EXTRA_NO_KEY_CACHE"));
- break;
- case HA_EXTRA_WAIT_LOCK: /* Wait until file is avalably (def) */
- DBUG_PRINT("info", ("HA_EXTRA_WAIT_LOCK"));
- break;
- case HA_EXTRA_NO_WAIT_LOCK: /* If file is locked, return quickly */
- DBUG_PRINT("info", ("HA_EXTRA_NO_WAIT_LOCK"));
- break;
- case HA_EXTRA_WRITE_CACHE: /* Use write cache in ha_write() */
- DBUG_PRINT("info", ("HA_EXTRA_WRITE_CACHE"));
- break;
- case HA_EXTRA_FLUSH_CACHE: /* flush write_record_cache */
- DBUG_PRINT("info", ("HA_EXTRA_FLUSH_CACHE"));
- break;
- case HA_EXTRA_NO_KEYS: /* Remove all update of keys */
- DBUG_PRINT("info", ("HA_EXTRA_NO_KEYS"));
- break;
- case HA_EXTRA_KEYREAD_CHANGE_POS: /* Keyread, but change pos */
- DBUG_PRINT("info", ("HA_EXTRA_KEYREAD_CHANGE_POS")); /* xxxxchk -r must be used */
- break;
- case HA_EXTRA_REMEMBER_POS: /* Remember pos for next/prev */
- DBUG_PRINT("info", ("HA_EXTRA_REMEMBER_POS"));
- break;
- case HA_EXTRA_RESTORE_POS:
- DBUG_PRINT("info", ("HA_EXTRA_RESTORE_POS"));
- break;
- case HA_EXTRA_REINIT_CACHE: /* init cache from current record */
- DBUG_PRINT("info", ("HA_EXTRA_REINIT_CACHE"));
- break;
- case HA_EXTRA_FORCE_REOPEN: /* Datafile have changed on disk */
- DBUG_PRINT("info", ("HA_EXTRA_FORCE_REOPEN"));
- break;
- case HA_EXTRA_FLUSH: /* Flush tables to disk */
- DBUG_PRINT("info", ("HA_EXTRA_FLUSH"));
- break;
- case HA_EXTRA_NO_ROWS: /* Don't write rows */
- DBUG_PRINT("info", ("HA_EXTRA_NO_ROWS"));
- break;
- case HA_EXTRA_RESET_STATE: /* Reset positions */
- DBUG_PRINT("info", ("HA_EXTRA_RESET_STATE"));
- break;
- case HA_EXTRA_IGNORE_DUP_KEY: /* Dup keys don't rollback everything*/
- DBUG_PRINT("info", ("HA_EXTRA_IGNORE_DUP_KEY"));
- if (current_thd->lex->sql_command == SQLCOM_REPLACE)
- {
- DBUG_PRINT("info", ("Turning ON use of write instead of insert"));
- m_use_write= TRUE;
- } else
- {
- DBUG_PRINT("info", ("Ignoring duplicate key"));
- m_ignore_dup_key= TRUE;
- }
- break;
- case HA_EXTRA_NO_IGNORE_DUP_KEY:
- DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY"));
- DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
- m_use_write= FALSE;
- m_ignore_dup_key= FALSE;
- break;
- case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those
- where field->query_id is the same as
- the current query id */
- DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_ALL_COLS"));
- m_retrieve_all_fields= TRUE;
- break;
- case HA_EXTRA_PREPARE_FOR_DELETE:
- DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_DELETE"));
- break;
- case HA_EXTRA_PREPARE_FOR_UPDATE: /* Remove read cache if problems */
- DBUG_PRINT("info", ("HA_EXTRA_PREPARE_FOR_UPDATE"));
- break;
- case HA_EXTRA_PRELOAD_BUFFER_SIZE:
- DBUG_PRINT("info", ("HA_EXTRA_PRELOAD_BUFFER_SIZE"));
- break;
- case HA_EXTRA_RETRIEVE_PRIMARY_KEY:
- DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_PRIMARY_KEY"));
- m_retrieve_primary_key= TRUE;
- break;
- case HA_EXTRA_CHANGE_KEY_TO_UNIQUE:
- DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_UNIQUE"));
- break;
- case HA_EXTRA_CHANGE_KEY_TO_DUP:
- DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_DUP"));
- break;
- }
- DBUG_RETURN(0);
- }
- /*
- Start of an insert, remember number of rows to be inserted, it will
- be used in write_row and get_autoincrement to send an optimal number
- of rows in each roundtrip to the server
- SYNOPSIS
- rows number of rows to insert, 0 if unknown
- */
- void ha_ndbcluster::start_bulk_insert(ha_rows rows)
- {
- int bytes, batch;
- const NDBTAB *tab= (const NDBTAB *) m_table;
- DBUG_ENTER("start_bulk_insert");
- DBUG_PRINT("enter", ("rows: %d", (int)rows));
- m_rows_inserted= (ha_rows) 0;
- if (rows == (ha_rows) 0)
- {
- /* We don't know how many will be inserted, guess */
- m_rows_to_insert= m_autoincrement_prefetch;
- }
- else
- m_rows_to_insert= rows;
- /*
- Calculate how many rows that should be inserted
- per roundtrip to NDB. This is done in order to minimize the
- number of roundtrips as much as possible. However performance will
- degrade if too many bytes are inserted, thus it's limited by this
- calculation.
- */
- const int bytesperbatch= 8192;
- bytes= 12 + tab->getRowSizeInBytes() + 4 * tab->getNoOfColumns();
- batch= bytesperbatch/bytes;
- batch= batch == 0 ? 1 : batch;
- DBUG_PRINT("info", ("batch: %d, bytes: %d", batch, bytes));
- m_bulk_insert_rows= batch;
- DBUG_VOID_RETURN;
- }
- /*
- End of an insert
- */
- int ha_ndbcluster::end_bulk_insert()
- {
- int error= 0;
- DBUG_ENTER("end_bulk_insert");
- // Check if last inserts need to be flushed
- if (m_bulk_insert_not_flushed)
- {
- NdbConnection *trans= m_active_trans;
- // Send rows to NDB
- DBUG_PRINT("info", ("Sending inserts to NDB, "
- "rows_inserted:%d, bulk_insert_rows: %d",
- (int) m_rows_inserted, (int) m_bulk_insert_rows));
- m_bulk_insert_not_flushed= FALSE;
- if (execute_no_commit(this,trans) != 0) {
- no_uncommitted_rows_execute_failure();
- my_errno= error= ndb_err(trans);
- }
- }
- m_rows_inserted= (ha_rows) 0;
- m_rows_to_insert= (ha_rows) 1;
- DBUG_RETURN(error);
- }
- int ha_ndbcluster::extra_opt(enum ha_extra_function operation, ulong cache_size)
- {
- DBUG_ENTER("extra_opt");
- DBUG_PRINT("enter", ("cache_size: %lu", cache_size));
- DBUG_RETURN(extra(operation));
- }
- int ha_ndbcluster::reset()
- {
- DBUG_ENTER("reset");
- // Reset what?
- DBUG_RETURN(1);
- }
- static const char *ha_ndb_bas_exts[]= { ha_ndb_ext, NullS };
- const char **ha_ndbcluster::bas_ext() const
- { return ha_ndb_bas_exts; }
- /*
- How many seeks it will take to read through the table
- This is to be comparable to the number returned by records_in_range so
- that we can decide if we should scan the table or use keys.
- */
- double ha_ndbcluster::scan_time()
- {
- DBUG_ENTER("ha_ndbcluster::scan_time()");
- double res= rows2double(records*1000);
- DBUG_PRINT("exit", ("table: %s value: %f",
- m_tabname, res));
- DBUG_RETURN(res);
- }
- /*
- Convert MySQL table locks into locks supported by Ndb Cluster.
- Note that MySQL Cluster does currently not support distributed
- table locks, so to be safe one should set cluster in Single
- User Mode, before relying on table locks when updating tables
- from several MySQL servers
- */
- THR_LOCK_DATA **ha_ndbcluster::store_lock(THD *thd,
- THR_LOCK_DATA **to,
- enum thr_lock_type lock_type)
- {
- DBUG_ENTER("store_lock");
- if (lock_type != TL_IGNORE && m_lock.type == TL_UNLOCK)
- {
- /* If we are not doing a LOCK TABLE, then allow multiple
- writers */
- /* Since NDB does not currently have table locks
- this is treated as a ordinary lock */
- if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
- lock_type <= TL_WRITE) && !thd->in_lock_tables)
- lock_type= TL_WRITE_ALLOW_WRITE;
- /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
- MySQL would use the lock TL_READ_NO_INSERT on t2, and that
- would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
- to t2. Convert the lock to a normal read lock to allow
- concurrent inserts to t2. */
- if (lock_type == TL_READ_NO_INSERT && !thd->in_lock_tables)
- lock_type= TL_READ;
- m_lock.type=lock_type;
- }
- *to++= &m_lock;
- DBUG_PRINT("exit", ("lock_type: %d", lock_type));
- DBUG_RETURN(to);
- }
- #ifndef DBUG_OFF
- #define PRINT_OPTION_FLAGS(t) {
- if (t->options & OPTION_NOT_AUTOCOMMIT)
- DBUG_PRINT("thd->options", ("OPTION_NOT_AUTOCOMMIT"));
- if (t->options & OPTION_BEGIN)
- DBUG_PRINT("thd->options", ("OPTION_BEGIN"));
- if (t->options & OPTION_TABLE_LOCK)
- DBUG_PRINT("thd->options", ("OPTION_TABLE_LOCK"));
- }
- #else
- #define PRINT_OPTION_FLAGS(t)
- #endif
- /*
- As MySQL will execute an external lock for every new table it uses
- we can use this to start the transactions.
- If we are in auto_commit mode we just need to start a transaction
- for the statement, this will be stored in transaction.stmt.
- If not, we have to start a master transaction if there doesn't exist
- one from before, this will be stored in transaction.all
- When a table lock is held one transaction will be started which holds
- the table lock and for each statement a hupp transaction will be started
- If we are locking the table then:
- - save the NdbDictionary::Table for easy access
- - save reference to table statistics
- - refresh list of the indexes for the table if needed (if altered)
- */
- int ha_ndbcluster::external_lock(THD *thd, int lock_type)
- {
- int error=0;
- NdbConnection* trans= NULL;
- DBUG_ENTER("external_lock");
- /*
- Check that this handler instance has a connection
- set up to the Ndb object of thd
- */
- if (check_ndb_connection())
- DBUG_RETURN(1);
- Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
- Ndb *ndb= thd_ndb->ndb;
- DBUG_PRINT("enter", ("transaction.thd_ndb->lock_count: %d",
- thd_ndb->lock_count));
- if (lock_type != F_UNLCK)
- {
- DBUG_PRINT("info", ("lock_type != F_UNLCK"));
- if (!thd_ndb->lock_count++)
- {
- PRINT_OPTION_FLAGS(thd);
- if (!(thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)))
- {
- // Autocommit transaction
- DBUG_ASSERT(!thd->transaction.stmt.ndb_tid);
- DBUG_PRINT("trans",("Starting transaction stmt"));
- trans= ndb->startTransaction();
- if (trans == NULL)
- ERR_RETURN(ndb->getNdbError());
- no_uncommitted_rows_reset(thd);
- thd->transaction.stmt.ndb_tid= trans;
- }
- else
- {
- if (!thd->transaction.all.ndb_tid)
- {
- // Not autocommit transaction
- // A "master" transaction ha not been started yet
- DBUG_PRINT("trans",("starting transaction, all"));
- trans= ndb->startTransaction();
- if (trans == NULL)
- ERR_RETURN(ndb->getNdbError());
- no_uncommitted_rows_reset(thd);
- /*
- If this is the start of a LOCK TABLE, a table look
- should be taken on the table in NDB
- Check if it should be read or write lock
- */
- if (thd->options & (OPTION_TABLE_LOCK))
- {
- //lockThisTable();
- DBUG_PRINT("info", ("Locking the table..." ));
- }
- thd->transaction.all.ndb_tid= trans;
- }
- }
- }
- /*
- This is the place to make sure this handler instance
- has a started transaction.
- The transaction is started by the first handler on which
- MySQL Server calls external lock
- Other handlers in the same stmt or transaction should use
- the same NDB transaction. This is done by setting up the m_active_trans
- pointer to point to the NDB transaction.
- */
- // store thread specific data first to set the right context
- m_force_send= thd->variables.ndb_force_send;
- m_ha_not_exact_count= !thd->variables.ndb_use_exact_count;
- m_autoincrement_prefetch=
- (ha_rows) thd->variables.ndb_autoincrement_prefetch_sz;
- if (!thd->transaction.on)
- m_transaction_on= FALSE;
- else
- m_transaction_on= thd->variables.ndb_use_transactions;
- // m_use_local_query_cache= thd->variables.ndb_use_local_query_cache;
- m_active_trans= thd->transaction.all.ndb_tid ?
- (NdbConnection*)thd->transaction.all.ndb_tid:
- (NdbConnection*)thd->transaction.stmt.ndb_tid;
- DBUG_ASSERT(m_active_trans);
- // Start of transaction
- m_retrieve_all_fields= FALSE;
- m_retrieve_primary_key= FALSE;
- m_ops_pending= 0;
- {
- NDBDICT *dict= ndb->getDictionary();
- const NDBTAB *tab;
- void *tab_info;
- if (!(tab= dict->getTable(m_tabname, &tab_info)))
- ERR_RETURN(dict->getNdbError());
- DBUG_PRINT("info", ("Table schema version: %d",
- tab->getObjectVersion()));
- // Check if thread has stale local cache
- // New transaction must not use old tables... (trans != 0)
- // Running might...
- if ((trans && tab->getObjectStatus() != NdbDictionary::Object::Retrieved)
- || tab->getObjectStatus() == NdbDictionary::Object::Invalid)
- {
- invalidate_dictionary_cache(FALSE);
- if (!(tab= dict->getTable(m_tabname, &tab_info)))
- ERR_RETURN(dict->getNdbError());
- DBUG_PRINT("info", ("Table schema version: %d",
- tab->getObjectVersion()));
- }
- if (m_table_version < tab->getObjectVersion())
- {
- /*
- The table has been altered, caller has to retry
- */
- NdbError err= ndb->getNdbError(NDB_INVALID_SCHEMA_OBJECT);
- DBUG_RETURN(ndb_to_mysql_error(&err));
- }
- if (m_table != (void *)tab)
- {
- m_table= (void *)tab;
- m_table_version = tab->getObjectVersion();
- if (!(my_errno= build_index_list(ndb, table, ILBP_OPEN)))
- DBUG_RETURN(my_errno);
- }
- m_table_info= tab_info;
- }
- no_uncommitted_rows_init(thd);
- }
- else
- {
- DBUG_PRINT("info", ("lock_type == F_UNLCK"));
- if (!--thd_ndb->lock_count)
- {
- DBUG_PRINT("trans", ("Last external_lock"));
- PRINT_OPTION_FLAGS(thd);
- if (thd->transaction.stmt.ndb_tid)
- {
- /*
- Unlock is done without a transaction commit / rollback.
- This happens if the thread didn't update any rows
- We must in this case close the transaction to release resources
- */
- DBUG_PRINT("trans",("ending non-updating transaction"));
- ndb->closeTransaction(m_active_trans);
- thd->transaction.stmt.ndb_tid= 0;
- }
- }
- m_table_info= NULL;
- /*
- This is the place to make sure this handler instance
- no longer are connected to the active transaction.
- And since the handler is no longer part of the transaction
- it can't have open cursors, ops or blobs pending.
- */
- m_active_trans= NULL;
- if (m_active_cursor)
- DBUG_PRINT("warning", ("m_active_cursor != NULL"));
- m_active_cursor= NULL;
- if (m_blobs_pending)
- DBUG_PRINT("warning", ("blobs_pending != 0"));
- m_blobs_pending= 0;
- if (m_ops_pending)
- DBUG_PRINT("warning", ("ops_pending != 0L"));
- m_ops_pending= 0;
- }
- DBUG_RETURN(error);
- }
- /*
- Start a transaction for running a statement if one is not
- already running in a transaction. This will be the case in
- a BEGIN; COMMIT; block
- When using LOCK TABLE's external_lock will start a transaction
- since ndb does not currently does not support table locking
- */
- int ha_ndbcluster::start_stmt(THD *thd)
- {
- int error=0;
- DBUG_ENTER("start_stmt");
- PRINT_OPTION_FLAGS(thd);
- NdbConnection *trans=
- (thd->transaction.stmt.ndb_tid)
- ? (NdbConnection *)(thd->transaction.stmt.ndb_tid)
- : (NdbConnection *)(thd->transaction.all.ndb_tid);
- if (!trans){
- Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
- DBUG_PRINT("trans",("Starting transaction stmt"));
- trans= ndb->startTransaction();
- if (trans == NULL)
- ERR_RETURN(ndb->getNdbError());
- no_uncommitted_rows_reset(thd);
- thd->transaction.stmt.ndb_tid= trans;
- }
- m_active_trans= trans;
- // Start of statement
- m_retrieve_all_fields= FALSE;
- m_retrieve_primary_key= FALSE;
- m_ops_pending= 0;
- DBUG_RETURN(error);
- }
- /*
- Commit a transaction started in NDB
- */
- int ndbcluster_commit(THD *thd, void *ndb_transaction)
- {
- int res= 0;
- Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
- NdbConnection *trans= (NdbConnection*)ndb_transaction;
- DBUG_ENTER("ndbcluster_commit");
- DBUG_PRINT("transaction",("%s",
- trans == thd->transaction.stmt.ndb_tid ?
- "stmt" : "all"));
- DBUG_ASSERT(ndb && trans);
- if (execute_commit(thd,trans) != 0)
- {
- const NdbError err= trans->getNdbError();
- const NdbOperation *error_op= trans->getNdbErrorOperation();
- ERR_PRINT(err);
- res= ndb_to_mysql_error(&err);
- if (res != -1)
- ndbcluster_print_error(res, error_op);
- }
- ndb->closeTransaction(trans);
- DBUG_RETURN(res);
- }
- /*
- Rollback a transaction started in NDB
- */
- int ndbcluster_rollback(THD *thd, void *ndb_transaction)
- {
- int res= 0;
- Ndb *ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
- NdbConnection *trans= (NdbConnection*)ndb_transaction;
- DBUG_ENTER("ndbcluster_rollback");
- DBUG_PRINT("transaction",("%s",
- trans == thd->transaction.stmt.ndb_tid ?
- "stmt" : "all"));
- DBUG_ASSERT(ndb && trans);
- if (trans->execute(Rollback) != 0)
- {
- const NdbError err= trans->getNdbError();
- const NdbOperation *error_op= trans->getNdbErrorOperation();
- ERR_PRINT(err);
- res= ndb_to_mysql_error(&err);
- if (res != -1)
- ndbcluster_print_error(res, error_op);
- }
- ndb->closeTransaction(trans);
- DBUG_RETURN(0);
- }
- /*
- Define NDB column based on Field.
- Returns 0 or mysql error code.
- Not member of ha_ndbcluster because NDBCOL cannot be declared.
- */
- static int create_ndb_column(NDBCOL &col,
- Field *field,
- HA_CREATE_INFO *info)
- {
- // Set name
- {
- char truncated_field_name[NDB_MAX_ATTR_NAME_SIZE];
- strnmov(truncated_field_name,field->field_name,sizeof(truncated_field_name));
- truncated_field_name[sizeof(truncated_field_name)-1]= ' ';
- col.setName(truncated_field_name);
- }
- // Get char set
- CHARSET_INFO *cs= field->charset();
- // Set type and sizes
- const enum enum_field_types mysql_type= field->real_type();
- switch (mysql_type) {
- // Numeric types
- case MYSQL_TYPE_TINY:
- if (field->flags & UNSIGNED_FLAG)
- col.setType(NDBCOL::Tinyunsigned);
- else
- col.setType(NDBCOL::Tinyint);
- col.setLength(1);
- break;
- case MYSQL_TYPE_SHORT:
- if (field->flags & UNSIGNED_FLAG)
- col.setType(NDBCOL::Smallunsigned);
- else
- col.setType(NDBCOL::Smallint);
- col.setLength(1);
- break;
- case MYSQL_TYPE_LONG:
- if (field->flags & UNSIGNED_FLAG)
- col.setType(NDBCOL::Unsigned);
- else
- col.setType(NDBCOL::Int);
- col.setLength(1);
- break;
- case MYSQL_TYPE_INT24:
- if (field->flags & UNSIGNED_FLAG)
- col.setType(NDBCOL::Mediumunsigned);
- else
- col.setType(NDBCOL::Mediumint);
- col.setLength(1);
- break;
- case MYSQL_TYPE_LONGLONG:
- if (field->flags & UNSIGNED_FLAG)
- col.setType(NDBCOL::Bigunsigned);
- else
- col.setType(NDBCOL::Bigint);
- col.setLength(1);
- break;
- case MYSQL_TYPE_FLOAT:
- col.setType(NDBCOL::Float);
- col.setLength(1);
- break;
- case MYSQL_TYPE_DOUBLE:
- col.setType(NDBCOL::Double);
- col.setLength(1);
- break;
- case MYSQL_TYPE_DECIMAL:
- {
- Field_decimal *f= (Field_decimal*)field;
- uint precision= f->pack_length();
- uint scale= f->decimals();
- if (field->flags & UNSIGNED_FLAG)
- {
- col.setType(NDBCOL::Olddecimalunsigned);
- precision-= (scale > 0);
- }
- else
- {
- col.setType(NDBCOL::Olddecimal);
- precision-= 1 + (scale > 0);
- }
- col.setPrecision(precision);
- col.setScale(scale);
- col.setLength(1);
- }
- break;
- // Date types
- case MYSQL_TYPE_DATETIME:
- col.setType(NDBCOL::Datetime);
- col.setLength(1);
- break;
- case MYSQL_TYPE_DATE: // ?
- col.setType(NDBCOL::Char);
- col.setLength(field->pack_length());
- break;
- case MYSQL_TYPE_NEWDATE:
- col.setType(NDBCOL::Date);
- col.setLength(1);
- break;
- case MYSQL_TYPE_TIME:
- col.setType(NDBCOL::Time);
- col.setLength(1);
- break;
- case MYSQL_TYPE_YEAR:
- col.setType(NDBCOL::Year);
- col.setLength(1);
- break;
- case MYSQL_TYPE_TIMESTAMP:
- col.setType(NDBCOL::Timestamp);
- col.setLength(1);
- break;
- // Char types
- case MYSQL_TYPE_STRING:
- if (field->flags & BINARY_FLAG)
- col.setType(NDBCOL::Binary);
- else {
- col.setType(NDBCOL::Char);
- col.setCharset(cs);
- }
- if (field->pack_length() == 0)
- col.setLength(1); // currently ndb does not support size 0
- else
- col.setLength(field->pack_length());
- break;
- case MYSQL_TYPE_VAR_STRING:
- if (field->flags & BINARY_FLAG)
- col.setType(NDBCOL::Varbinary);
- else {
- col.setType(NDBCOL::Varchar);
- col.setCharset(cs);
- }
- col.setLength(field->pack_length());
- break;
- // Blob types (all come in as MYSQL_TYPE_BLOB)
- mysql_type_tiny_blob:
- case MYSQL_TYPE_TINY_BLOB:
- if (field->flags & BINARY_FLAG)
- col.setType(NDBCOL::Blob);
- else {
- col.setType(NDBCOL::Text);
- col.setCharset(cs);
- }
- col.setInlineSize(256);
- // No parts
- col.setPartSize(0);
- col.setStripeSize(0);
- break;
- mysql_type_blob:
- case MYSQL_TYPE_BLOB:
- if (field->flags & BINARY_FLAG)
- col.setType(NDBCOL::Blob);
- else {
- col.setType(NDBCOL::Text);
- col.setCharset(cs);
- }
- // Use "<=" even if "<" is the exact condition
- if (field->max_length() <= (1 << 8))
- goto mysql_type_tiny_blob;
- else if (field->max_length() <= (1 << 16))
- {
- col.setInlineSize(256);
- col.setPartSize(2000);
- col.setStripeSize(16);
- }
- else if (field->max_length() <= (1 << 24))
- goto mysql_type_medium_blob;
- else
- goto mysql_type_long_blob;
- break;
- mysql_type_medium_blob:
- case MYSQL_TYPE_MEDIUM_BLOB:
- if (field->flags & BINARY_FLAG)
- col.setType(NDBCOL::Blob);
- else {
- col.setType(NDBCOL::Text);
- col.setCharset(cs);
- }
- col.setInlineSize(256);
- col.setPartSize(4000);
- col.setStripeSize(8);
- break;
- mysql_type_long_blob:
- case MYSQL_TYPE_LONG_BLOB:
- if (field->flags & BINARY_FLAG)
- col.setType(NDBCOL::Blob);
- else {
- col.setType(NDBCOL::Text);
- col.setCharset(cs);
- }
- col.setInlineSize(256);
- col.setPartSize(8000);
- col.setStripeSize(4);
- break;
- // Other types
- case MYSQL_TYPE_ENUM:
- col.setType(NDBCOL::Char);
- col.setLength(field->pack_length());
- break;
- case MYSQL_TYPE_SET:
- col.setType(NDBCOL::Char);
- col.setLength(field->pack_length());
- break;
- case MYSQL_TYPE_NULL:
- case MYSQL_TYPE_GEOMETRY:
- goto mysql_type_unsupported;
- mysql_type_unsupported:
- default:
- return HA_ERR_UNSUPPORTED;
- }
- // Set nullable and pk
- col.setNullable(field->maybe_null());
- col.setPrimaryKey(field->flags & PRI_KEY_FLAG);
- // Set autoincrement
- if (field->flags & AUTO_INCREMENT_FLAG)
- {
- col.setAutoIncrement(TRUE);
- ulonglong value= info->auto_increment_value ?
- info->auto_increment_value : (ulonglong) 1;
- DBUG_PRINT("info", ("Autoincrement key, initial: %llu", value));
- col.setAutoIncrementInitialValue(value);
- }
- else
- col.setAutoIncrement(FALSE);
- return 0;
- }
- /*
- Create a table in NDB Cluster
- */
- static void ndb_set_fragmentation(NDBTAB &tab, TABLE *form, uint pk_length)
- {
- if (form->max_rows == (ha_rows) 0) /* default setting, don't set fragmentation */
- return;
- /**
- * get the number of fragments right
- */
- uint no_fragments;
- {
- #if MYSQL_VERSION_ID >= 50000
- uint acc_row_size= 25 + /*safety margin*/ 2;
- #else
- uint acc_row_size= pk_length*4;
- /* add acc overhead */
- if (pk_length <= 8) /* main page will set the limit */
- acc_row_size+= 25 + /*safety margin*/ 2;
- else /* overflow page will set the limit */
- acc_row_size+= 4 + /*safety margin*/ 4;
- #endif
- ulonglong acc_fragment_size= 512*1024*1024;
- ulonglong max_rows= form->max_rows;
- #if MYSQL_VERSION_ID >= 50100
- no_fragments= (max_rows*acc_row_size)/acc_fragment_size+1;
- #else
- no_fragments= ((max_rows*acc_row_size)/acc_fragment_size+1
- +1/*correct rounding*/)/2;
- #endif
- }
- {
- uint no_nodes= g_ndb_cluster_connection->no_db_nodes();
- NDBTAB::FragmentType ftype;
- if (no_fragments > 2*no_nodes)
- {
- ftype= NDBTAB::FragAllLarge;
- if (no_fragments > 4*no_nodes)
- push_warning(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
- "Ndb might have problems storing the max amount of rows specified");
- }
- else if (no_fragments > no_nodes)
- ftype= NDBTAB::FragAllMedium;
- else
- ftype= NDBTAB::FragAllSmall;
- tab.setFragmentType(ftype);
- }
- }
- int ha_ndbcluster::create(const char *name,
- TABLE *form,
- HA_CREATE_INFO *info)
- {
- NDBTAB tab;
- NDBCOL col;
- uint pack_length, length, i, pk_length= 0;
- const void *data, *pack_data;
- const char **key_names= form->keynames.type_names;
- char name2[FN_HEADLEN];
- bool create_from_engine= (info->table_options & HA_OPTION_CREATE_FROM_ENGINE);
- DBUG_ENTER("create");
- DBUG_PRINT("enter", ("name: %s", name));
- fn_format(name2, name, "", "",2); // Remove the .frm extension
- set_dbname(name2);
- set_tabname(name2);
- if (create_from_engine)
- {
- /*
- Table alreay exists in NDB and frm file has been created by
- caller.
- Do Ndb specific stuff, such as create a .ndb file
- */
- my_errno= write_ndb_file();
- DBUG_RETURN(my_errno);
- }
- DBUG_PRINT("table", ("name: %s", m_tabname));
- tab.setName(m_tabname);
- tab.setLogging(!(info->options & HA_LEX_CREATE_TMP_TABLE));
- // Save frm data for this table
- if (readfrm(name, &data, &length))
- DBUG_RETURN(1);
- if (packfrm(data, length, &pack_data, &pack_length))
- DBUG_RETURN(2);
- DBUG_PRINT("info", ("setFrm data=%x, len=%d", pack_data, pack_length));
- tab.setFrm(pack_data, pack_length);
- my_free((char*)data, MYF(0));
- my_free((char*)pack_data, MYF(0));
- for (i= 0; i < form->fields; i++)
- {
- Field *field= form->field[i];
- DBUG_PRINT("info", ("name: %s, type: %u, pack_length: %d",
- field->field_name, field->real_type(),
- field->pack_length()));
- if ((my_errno= create_ndb_column(col, field, info)))
- DBUG_RETURN(my_errno);
- tab.addColumn(col);
- if(col.getPrimaryKey())
- pk_length += (field->pack_length() + 3) / 4;
- }
- // No primary key, create shadow key as 64 bit, auto increment
- if (form->primary_key == MAX_KEY)
- {
- DBUG_PRINT("info", ("Generating shadow key"));
- col.setName("$PK");
- col.setType(NdbDictionary::Column::Bigunsigned);
- col.setLength(1);
- col.setNullable(FALSE);
- col.setPrimaryKey(TRUE);
- col.setAutoIncrement(TRUE);
- tab.addColumn(col);
- pk_length += 2;
- }
- // Make sure that blob tables don't have to big part size
- for (i= 0; i < form->fields; i++)
- {
- /**
- * The extra +7 concists
- * 2 - words from pk in blob table
- * 5 - from extra words added by tup/dict??
- */
- switch (form->field[i]->real_type()) {
- case MYSQL_TYPE_BLOB:
- case MYSQL_TYPE_MEDIUM_BLOB:
- case MYSQL_TYPE_LONG_BLOB:
- {
- NdbDictionary::Column * col = tab.getColumn(i);
- int size = pk_length + (col->getPartSize()+3)/4 + 7;
- if(size > NDB_MAX_TUPLE_SIZE_IN_WORDS &&
- (pk_length+7) < NDB_MAX_TUPLE_SIZE_IN_WORDS)
- {
- size = NDB_MAX_TUPLE_SIZE_IN_WORDS - pk_length - 7;
- col->setPartSize(4*size);
- }
- /**
- * If size > NDB_MAX and pk_length+7 >= NDB_MAX
- * then the table can't be created anyway, so skip
- * changing part size, and have error later
- */
- }
- default:
- break;
- }
- }
- ndb_set_fragmentation(tab, form, pk_length);
- if ((my_errno= check_ndb_connection()))
- DBUG_RETURN(my_errno);
- // Create the table in NDB
- Ndb *ndb= get_ndb();
- NDBDICT *dict= ndb->getDictionary();
- if (dict->createTable(tab) != 0)
- {
- const NdbError err= dict->getNdbError();
- ERR_PRINT(err);
- my_errno= ndb_to_mysql_error(&err);
- DBUG_RETURN(my_errno);
- }
- DBUG_PRINT("info", ("Table %s/%s created successfully",
- m_dbname, m_tabname));
- // Create secondary indexes
- my_errno= build_index_list(ndb, form, ILBP_CREATE);
- if (!my_errno)
- my_errno= write_ndb_file();
- DBUG_RETURN(my_errno);
- }
- int ha_ndbcluster::create_ordered_index(const char *name,
- KEY *key_info)
- {
- DBUG_ENTER("create_ordered_index");
- DBUG_RETURN(create_index(name, key_info, FALSE));
- }
- int ha_ndbcluster::create_unique_index(const char *name,
- KEY *key_info)
- {
- DBUG_ENTER("create_unique_index");
- DBUG_RETURN(create_index(name, key_info, TRUE));
- }
- /*
- Create an index in NDB Cluster
- */
- int ha_ndbcluster::create_index(const char *name,
- KEY *key_info,
- bool unique)
- {
- Ndb *ndb= get_ndb();
- NdbDictionary::Dictionary *dict= ndb->getDictionary();
- KEY_PART_INFO *key_part= key_info->key_part;
- KEY_PART_INFO *end= key_part + key_info->key_parts;
- DBUG_ENTER("create_index");
- DBUG_PRINT("enter", ("name: %s ", name));
- NdbDictionary::Index ndb_index(name);
- if (unique)
- ndb_index.setType(NdbDictionary::Index::UniqueHashIndex);
- else
- {
- ndb_index.setType(NdbDictionary::Index::OrderedIndex);
- // TODO Only temporary ordered indexes supported
- ndb_index.setLogging(FALSE);
- }
- ndb_index.setTable(m_tabname);
- for (; key_part != end; key_part++)
- {
- Field *field= key_part->field;
- DBUG_PRINT("info", ("attr: %s", field->field_name));
- {
- char truncated_field_name[NDB_MAX_ATTR_NAME_SIZE];
- strnmov(truncated_field_name,field->field_name,sizeof(truncated_field_name));
- truncated_field_name[sizeof(truncated_field_name)-1]= ' ';
- ndb_index.addColumnName(truncated_field_name);
- }
- }
- if (dict->createIndex(ndb_index))
- ERR_RETURN(dict->getNdbError());
- // Success
- DBUG_PRINT("info", ("Created index %s", name));
- DBUG_RETURN(0);
- }
- /*
- Rename a table in NDB Cluster
- */
- int ha_ndbcluster::rename_table(const char *from, const char *to)
- {
- NDBDICT *dict;
- char new_tabname[FN_HEADLEN];
- const NDBTAB *orig_tab;
- int result;
- DBUG_ENTER("ha_ndbcluster::rename_table");
- DBUG_PRINT("info", ("Renaming %s to %s", from, to));
- set_dbname(from);
- set_tabname(from);
- set_tabname(to, new_tabname);
- if (check_ndb_connection())
- DBUG_RETURN(my_errno= HA_ERR_NO_CONNECTION);
- Ndb *ndb= get_ndb();
- dict= ndb->getDictionary();
- if (!(orig_tab= dict->getTable(m_tabname)))
- ERR_RETURN(dict->getNdbError());
- // Check if thread has stale local cache
- if (orig_tab->getObjectStatus() == NdbDictionary::Object::Invalid)
- {
- dict->removeCachedTable(m_tabname);
- if (!(orig_tab= dict->getTable(m_tabname)))
- ERR_RETURN(dict->getNdbError());
- }
- m_table= (void *)orig_tab;
- // Change current database to that of target table
- set_dbname(to);
- ndb->setDatabaseName(m_dbname);
- if (!(result= alter_table_name(new_tabname)))
- {
- // Rename .ndb file
- result= handler::rename_table(from, to);
- }
- DBUG_RETURN(result);
- }
- /*
- Rename a table in NDB Cluster using alter table
- */
- int ha_ndbcluster::alter_table_name(const char *to)
- {
- Ndb *ndb= get_ndb();
- NDBDICT *dict= ndb->getDictionary();
- const NDBTAB *orig_tab= (const NDBTAB *) m_table;
- int ret;
- DBUG_ENTER("alter_table_name_table");
- NdbDictionary::Table new_tab= *orig_tab;
- new_tab.setName(to);
- if (dict->alterTable(new_tab) != 0)
- ERR_RETURN(dict->getNdbError());
- m_table= NULL;
- m_table_info= NULL;
- DBUG_RETURN(0);
- }
- /*
- Delete a table from NDB Cluster
- */
- int ha_ndbcluster::delete_table(const char *name)
- {
- DBUG_ENTER("delete_table");
- DBUG_PRINT("enter", ("name: %s", name));
- set_dbname(name);
- set_tabname(name);
- if (check_ndb_connection())
- DBUG_RETURN(HA_ERR_NO_CONNECTION);
- // Remove .ndb file
- handler::delete_table(name);
- DBUG_RETURN(drop_table());
- }
- /*
- Drop a table in NDB Cluster
- */
- int ha_ndbcluster::drop_table()
- {
- Ndb *ndb= get_ndb();
- NdbDictionary::Dictionary *dict= ndb->getDictionary();
- DBUG_ENTER("drop_table");
- DBUG_PRINT("enter", ("Deleting %s", m_tabname));
- if (dict->dropTable(m_tabname))
- {
- const NdbError err= dict->getNdbError();
- if (err.code == 709)
- ; // 709: No such table existed
- else
- ERR_RETURN(dict->getNdbError());
- }
- release_metadata();
- DBUG_RETURN(0);
- }
- longlong ha_ndbcluster::get_auto_increment()
- {
- DBUG_ENTER("get_auto_increment");
- DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
- Ndb *ndb= get_ndb();
- if (m_rows_inserted > m_rows_to_insert)
- {
- /* We guessed too low */
- m_rows_to_insert+= m_autoincrement_prefetch;
- }
- int cache_size=
- (int)
- (m_rows_to_insert - m_rows_inserted < m_autoincrement_prefetch) ?
- m_rows_to_insert - m_rows_inserted
- : (m_rows_to_insert > m_autoincrement_prefetch) ?
- m_rows_to_insert
- : m_autoincrement_prefetch;
- Uint64 auto_value= NDB_FAILED_AUTO_INCREMENT;
- uint retries= NDB_AUTO_INCREMENT_RETRIES;
- do {
- auto_value=
- (m_skip_auto_increment) ?
- ndb->readAutoIncrementValue((const NDBTAB *) m_table)
- : ndb->getAutoIncrementValue((const NDBTAB *) m_table, cache_size);
- } while (auto_value == NDB_FAILED_AUTO_INCREMENT &&
- --retries &&
- ndb->getNdbError().status == NdbError::TemporaryError);
- if (auto_value == NDB_FAILED_AUTO_INCREMENT)
- {
- const NdbError err= ndb->getNdbError();
- sql_print_error("Error %lu in ::get_auto_increment(): %s",
- (ulong) err.code, err.message);
- DBUG_RETURN(~(ulonglong) 0);
- }
- DBUG_RETURN((longlong)auto_value);
- }
- /*
- Constructor for the NDB Cluster table handler
- */
- ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
- handler(table_arg),
- m_active_trans(NULL),
- m_active_cursor(NULL),
- m_table(NULL),
- m_table_version(-1),
- m_table_info(NULL),
- m_table_flags(HA_REC_NOT_IN_SEQ |
- HA_NULL_IN_KEY |
- HA_AUTO_PART_KEY |
- HA_NO_PREFIX_CHAR_KEYS),
- m_share(0),
- m_use_write(FALSE),
- m_ignore_dup_key(FALSE),
- m_primary_key_update(FALSE),
- m_retrieve_all_fields(FALSE),
- m_retrieve_primary_key(FALSE),
- m_rows_to_insert((ha_rows) 1),
- m_rows_inserted((ha_rows) 0),
- m_bulk_insert_rows((ha_rows) 1024),
- m_bulk_insert_not_flushed(FALSE),
- m_ops_pending(0),
- m_skip_auto_increment(TRUE),
- m_blobs_pending(0),
- m_blobs_buffer(0),
- m_blobs_buffer_size(0),
- m_dupkey((uint) -1),
- m_ha_not_exact_count(FALSE),
- m_force_send(TRUE),
- m_autoincrement_prefetch((ha_rows) 32),
- m_transaction_on(TRUE),
- m_use_local_query_cache(FALSE)
- {
- int i;
- DBUG_ENTER("ha_ndbcluster");
- m_tabname[0]= ' ';
- m_dbname[0]= ' ';
- records= ~(ha_rows)0; // uninitialized
- block_size= 1024;
- for (i= 0; i < MAX_KEY; i++)
- {
- m_index[i].type= UNDEFINED_INDEX;
- m_index[i].unique_index= NULL;
- m_index[i].index= NULL;
- m_index[i].unique_index_attrid_map= NULL;
- }
- DBUG_VOID_RETURN;
- }
- /*
- Destructor for NDB Cluster table handler
- */
- ha_ndbcluster::~ha_ndbcluster()
- {
- DBUG_ENTER("~ha_ndbcluster");
- if (m_share)
- free_share(m_share);
- release_metadata();
- my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
- m_blobs_buffer= 0;
- // Check for open cursor/transaction
- if (m_active_cursor) {
- }
- DBUG_ASSERT(m_active_cursor == NULL);
- if (m_active_trans) {
- }
- DBUG_ASSERT(m_active_trans == NULL);
- DBUG_VOID_RETURN;
- }
- /*
- Open a table for further use
- - fetch metadata for this table from NDB
- - check that table exists
- */
- int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
- {
- int res;
- KEY *key;
- DBUG_ENTER("open");
- DBUG_PRINT("enter", ("name: %s mode: %d test_if_locked: %d",
- name, mode, test_if_locked));
- // Setup ref_length to make room for the whole
- // primary key to be written in the ref variable
- if (table->primary_key != MAX_KEY)
- {
- key= table->key_info+table->primary_key;
- ref_length= key->key_length;
- DBUG_PRINT("info", (" ref_length: %d", ref_length));
- }
- // Init table lock structure
- if (!(m_share=get_share(name)))
- DBUG_RETURN(1);
- thr_lock_data_init(&m_share->lock,&m_lock,(void*) 0);
- set_dbname(name);
- set_tabname(name);
- if (check_ndb_connection()) {
- free_share(m_share); m_share= 0;
- DBUG_RETURN(HA_ERR_NO_CONNECTION);
- }
- res= get_metadata(name);
- if (!res)
- info(HA_STATUS_VARIABLE | HA_STATUS_CONST);
- DBUG_RETURN(res);
- }
- /*
- Close the table
- - release resources setup by open()
- */
- int ha_ndbcluster::close(void)
- {
- DBUG_ENTER("close");
- free_share(m_share); m_share= 0;
- release_metadata();
- DBUG_RETURN(0);
- }
- Thd_ndb* ha_ndbcluster::seize_thd_ndb()
- {
- Thd_ndb *thd_ndb;
- DBUG_ENTER("seize_thd_ndb");
- thd_ndb= new Thd_ndb();
- thd_ndb->ndb->getDictionary()->set_local_table_data_size(
- sizeof(Ndb_local_table_statistics)
- );
- if (thd_ndb->ndb->init(max_transactions) != 0)
- {
- ERR_PRINT(thd_ndb->ndb->getNdbError());
- /*
- TODO
- Alt.1 If init fails because to many allocated Ndb
- wait on condition for a Ndb object to be released.
- Alt.2 Seize/release from pool, wait until next release
- */
- delete thd_ndb;
- thd_ndb= NULL;
- }
- DBUG_RETURN(thd_ndb);
- }
- void ha_ndbcluster::release_thd_ndb(Thd_ndb* thd_ndb)
- {
- DBUG_ENTER("release_thd_ndb");
- delete thd_ndb;
- DBUG_VOID_RETURN;
- }
- /*
- If this thread already has a Thd_ndb object allocated
- in current THD, reuse it. Otherwise
- seize a Thd_ndb object, assign it to current THD and use it.
- */
- Ndb* check_ndb_in_thd(THD* thd)
- {
- DBUG_ENTER("check_ndb_in_thd");
- Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
- if (!thd_ndb)
- {
- if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
- DBUG_RETURN(NULL);
- thd->transaction.thd_ndb= thd_ndb;
- }
- DBUG_RETURN(thd_ndb->ndb);
- }
- int ha_ndbcluster::check_ndb_connection()
- {
- THD* thd= current_thd;
- Ndb *ndb;
- DBUG_ENTER("check_ndb_connection");
- if (!(ndb= check_ndb_in_thd(thd)))
- DBUG_RETURN(HA_ERR_NO_CONNECTION);
- ndb->setDatabaseName(m_dbname);
- DBUG_RETURN(0);
- }
- void ndbcluster_close_connection(THD *thd)
- {
- Thd_ndb *thd_ndb= (Thd_ndb*)thd->transaction.thd_ndb;
- DBUG_ENTER("ndbcluster_close_connection");
- if (thd_ndb)
- {
- ha_ndbcluster::release_thd_ndb(thd_ndb);
- thd->transaction.thd_ndb= NULL;
- }
- DBUG_VOID_RETURN;
- }
- /*
- Try to discover one table from NDB
- */
- int ndbcluster_discover(THD* thd, const char *db, const char *name,
- const void** frmblob, uint* frmlen)
- {
- uint len;
- const void* data;
- const NDBTAB* tab;
- Ndb* ndb;
- DBUG_ENTER("ndbcluster_discover");
- DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
- if (!(ndb= check_ndb_in_thd(thd)))
- DBUG_RETURN(HA_ERR_NO_CONNECTION);
- ndb->setDatabaseName(db);
- NDBDICT* dict= ndb->getDictionary();
- dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
- dict->invalidateTable(name);
- if (!(tab= dict->getTable(name)))
- {
- const NdbError err= dict->getNdbError();
- if (err.code == 709)
- DBUG_RETURN(-1);
- ERR_RETURN(err);
- }
- DBUG_PRINT("info", ("Found table %s", tab->getName()));
- len= tab->getFrmLength();
- if (len == 0 || tab->getFrmData() == NULL)
- {
- DBUG_PRINT("error", ("No frm data found."));
- DBUG_RETURN(1);
- }
- if (unpackfrm(&data, &len, tab->getFrmData()))
- {
- DBUG_PRINT("error", ("Could not unpack table"));
- DBUG_RETURN(1);
- }
- *frmlen= len;
- *frmblob= data;
- DBUG_RETURN(0);
- }
- /*
- Check if a table exists in NDB
- */
- int ndbcluster_table_exists_in_engine(THD* thd, const char *db, const char *name)
- {
- uint len;
- const void* data;
- const NDBTAB* tab;
- Ndb* ndb;
- DBUG_ENTER("ndbcluster_table_exists_in_engine");
- DBUG_PRINT("enter", ("db: %s, name: %s", db, name));
- if (!(ndb= check_ndb_in_thd(thd)))
- DBUG_RETURN(HA_ERR_NO_CONNECTION);
- ndb->setDatabaseName(db);
- NDBDICT* dict= ndb->getDictionary();
- dict->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
- dict->invalidateTable(name);
- if (!(tab= dict->getTable(name)))
- {
- const NdbError err= dict->getNdbError();
- if (err.code == 709)
- DBUG_RETURN(0);
- ERR_RETURN(err);
- }
- DBUG_PRINT("info", ("Found table %s", tab->getName()));
- DBUG_RETURN(1);
- }
- extern "C" byte* tables_get_key(const char *entry, uint *length,
- my_bool not_used __attribute__((unused)))
- {
- *length= strlen(entry);
- return (byte*) entry;
- }
- /*
- Drop a database in NDB Cluster
- */
- int ndbcluster_drop_database(const char *path)
- {
- DBUG_ENTER("ndbcluster_drop_database");
- THD *thd= current_thd;
- char dbname[FN_HEADLEN];
- Ndb* ndb;
- NdbDictionary::Dictionary::List list;
- uint i;
- char *tabname;
- List<char> drop_list;
- int ret= 0;
- ha_ndbcluster::set_dbname(path, (char *)&dbname);
- DBUG_PRINT("enter", ("db: %s", dbname));
- if (!(ndb= check_ndb_in_thd(thd)))
- DBUG_RETURN(HA_ERR_NO_CONNECTION);
- // List tables in NDB
- NDBDICT *dict= ndb->getDictionary();
- if (dict->listObjects(list,
- NdbDictionary::Object::UserTable) != 0)
- ERR_RETURN(dict->getNdbError());
- for (i= 0 ; i < list.count ; i++)
- {
- NdbDictionary::Dictionary::List::Element& t= list.elements[i];
- DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
- // Add only tables that belongs to db
- if (my_strcasecmp(system_charset_info, t.database, dbname))
- continue;
- DBUG_PRINT("info", ("%s must be dropped", t.name));
- drop_list.push_back(thd->strdup(t.name));
- }
- // Drop any tables belonging to database
- ndb->setDatabaseName(dbname);
- List_iterator_fast<char> it(drop_list);
- while ((tabname=it++))
- {
- if (dict->dropTable(tabname))
- {
- const NdbError err= dict->getNdbError();
- if (err.code != 709)
- {
- ERR_PRINT(err);
- ret= ndb_to_mysql_error(&err);
- }
- }
- }
- DBUG_RETURN(ret);
- }
- int ndbcluster_find_files(THD *thd,const char *db,const char *path,
- const char *wild, bool dir, List<char> *files)
- {
- DBUG_ENTER("ndbcluster_find_files");
- DBUG_PRINT("enter", ("db: %s", db));
- { // extra bracket to avoid gcc 2.95.3 warning
- uint i;
- Ndb* ndb;
- char name[FN_REFLEN];
- HASH ndb_tables, ok_tables;
- NdbDictionary::Dictionary::List list;
- if (!(ndb= check_ndb_in_thd(thd)))
- DBUG_RETURN(HA_ERR_NO_CONNECTION);
- if (dir)
- DBUG_RETURN(0); // Discover of databases not yet supported
- // List tables in NDB
- NDBDICT *dict= ndb->getDictionary();
- if (dict->listObjects(list,
- NdbDictionary::Object::UserTable) != 0)
- ERR_RETURN(dict->getNdbError());
- if (hash_init(&ndb_tables, system_charset_info,list.count,0,0,
- (hash_get_key)tables_get_key,0,0))
- {
- DBUG_PRINT("error", ("Failed to init HASH ndb_tables"));
- DBUG_RETURN(-1);
- }
- if (hash_init(&ok_tables, system_charset_info,32,0,0,
- (hash_get_key)tables_get_key,0,0))
- {
- DBUG_PRINT("error", ("Failed to init HASH ok_tables"));
- hash_free(&ndb_tables);
- DBUG_RETURN(-1);
- }
- for (i= 0 ; i < list.count ; i++)
- {
- NdbDictionary::Dictionary::List::Element& t= list.elements[i];
- DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
- // Add only tables that belongs to db
- if (my_strcasecmp(system_charset_info, t.database, db))
- continue;
- // Apply wildcard to list of tables in NDB
- if (wild)
- {
- if (lower_case_table_names)
- {
- if (wild_case_compare(files_charset_info, t.name, wild))
- continue;
- }
- else if (wild_compare(t.name,wild,0))
- continue;
- }
- DBUG_PRINT("info", ("Inserting %s into ndb_tables hash", t.name));
- my_hash_insert(&ndb_tables, (byte*)thd->strdup(t.name));
- }
- char *file_name;
- List_iterator<char> it(*files);
- List<char> delete_list;
- while ((file_name=it++))
- {
- DBUG_PRINT("info", ("%s", file_name));
- if (hash_search(&ndb_tables, file_name, strlen(file_name)))
- {
- DBUG_PRINT("info", ("%s existed in NDB _and_ on disk ", file_name));
- // File existed in NDB and as frm file, put in ok_tables list
- my_hash_insert(&ok_tables, (byte*)file_name);
- continue;
- }
- // File is not in NDB, check for .ndb file with this name
- (void)strxnmov(name, FN_REFLEN,
- mysql_data_home,"/",db,"/",file_name,ha_ndb_ext,NullS);
- DBUG_PRINT("info", ("Check access for %s", name));
- if (access(name, F_OK))
- {
- DBUG_PRINT("info", ("%s did not exist on disk", name));
- // .ndb file did not exist on disk, another table type
- continue;
- }
- DBUG_PRINT("info", ("%s existed on disk", name));
- // The .ndb file exists on disk, but it's not in list of tables in ndb
- // Verify that handler agrees table is gone.
- if (ndbcluster_table_exists_in_engine(thd, db, file_name) == 0)
- {
- DBUG_PRINT("info", ("NDB says %s does not exists", file_name));
- it.remove();
- // Put in list of tables to remove from disk
- delete_list.push_back(thd->strdup(file_name));
- }
- }
- // Check for new files to discover
- DBUG_PRINT("info", ("Checking for new files to discover"));
- List<char> create_list;
- for (i= 0 ; i < ndb_tables.records ; i++)
- {
- file_name= hash_element(&ndb_tables, i);
- if (!hash_search(&ok_tables, file_name, strlen(file_name)))
- {
- DBUG_PRINT("info", ("%s must be discovered", file_name));
- // File is in list of ndb tables and not in ok_tables
- // This table need to be created
- create_list.push_back(thd->strdup(file_name));
- }
- }
- // Lock mutex before deleting and creating frm files
- pthread_mutex_lock(&LOCK_open);
- if (!global_read_lock)
- {
- // Delete old files
- List_iterator_fast<char> it3(delete_list);
- while ((file_name=it3++))
- {
- DBUG_PRINT("info", ("Remove table %s/%s",db, file_name ));
- // Delete the table and all related files
- TABLE_LIST table_list;
- bzero((char*) &table_list,sizeof(table_list));
- table_list.db= (char*) db;
- table_list.alias=table_list.real_name=(char*)file_name;
- (void)mysql_rm_table_part2(thd, &table_list,
- /* if_exists */ TRUE,
- /* drop_temporary */ FALSE,
- /* dont_log_query*/ TRUE);
- }
- }
- // Create new files
- List_iterator_fast<char> it2(create_list);
- while ((file_name=it2++))
- {
- DBUG_PRINT("info", ("Table %s need discovery", name));
- if (ha_create_table_from_engine(thd, db, file_name) == 0)
- files->push_back(thd->strdup(file_name));
- }
- pthread_mutex_unlock(&LOCK_open);
- hash_free(&ok_tables);
- hash_free(&ndb_tables);
- } // extra bracket to avoid gcc 2.95.3 warning
- DBUG_RETURN(0);
- }
- /*
- Initialise all gloal variables before creating
- a NDB Cluster table handler
- */
- bool ndbcluster_init()
- {
- int res;
- DBUG_ENTER("ndbcluster_init");
- // Set connectstring if specified
- if (opt_ndbcluster_connectstring != 0)
- DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));
- if ((g_ndb_cluster_connection=
- new Ndb_cluster_connection(opt_ndbcluster_connectstring)) == 0)
- {
- DBUG_PRINT("error",("Ndb_cluster_connection(%s)",
- opt_ndbcluster_connectstring));
- goto ndbcluster_init_error;
- }
- g_ndb_cluster_connection->set_optimized_node_selection
- (opt_ndb_optimized_node_selection);
- // Create a Ndb object to open the connection to NDB
- g_ndb= new Ndb(g_ndb_cluster_connection, "sys");
- g_ndb->getDictionary()->set_local_table_data_size(sizeof(Ndb_local_table_statistics));
- if (g_ndb->init() != 0)
- {
- ERR_PRINT (g_ndb->getNdbError());
- goto ndbcluster_init_error;
- }
- if ((res= g_ndb_cluster_connection->connect(0,0,0)) == 0)
- {
- DBUG_PRINT("info",("NDBCLUSTER storage engine at %s on port %d",
- g_ndb_cluster_connection->get_connected_host(),
- g_ndb_cluster_connection->get_connected_port()));
- g_ndb_cluster_connection->wait_until_ready(10,3);
- }
- else if(res == 1)
- {
- if (g_ndb_cluster_connection->start_connect_thread()) {
- DBUG_PRINT("error", ("g_ndb_cluster_connection->start_connect_thread()"));
- goto ndbcluster_init_error;
- }
- {
- char buf[1024];
- DBUG_PRINT("info",("NDBCLUSTER storage engine not started, will connect using %s",
- g_ndb_cluster_connection->get_connectstring(buf,sizeof(buf))));
- }
- }
- else
- {
- DBUG_ASSERT(res == -1);
- DBUG_PRINT("error", ("permanent error"));
- goto ndbcluster_init_error;
- }
- (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
- (hash_get_key) ndbcluster_get_key,0,0);
- pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
- ndbcluster_inited= 1;
- DBUG_RETURN(FALSE);
- ndbcluster_init_error:
- ndbcluster_end();
- DBUG_RETURN(TRUE);
- }
- /*
- End use of the NDB Cluster table handler
- - free all global variables allocated by
- ndcluster_init()
- */
- bool ndbcluster_end()
- {
- DBUG_ENTER("ndbcluster_end");
- if(g_ndb)
- {
- #ifndef DBUG_OFF
- Ndb::Free_list_usage tmp; tmp.m_name= 0;
- while (g_ndb->get_free_list_usage(&tmp))
- {
- uint leaked= (uint) tmp.m_created - tmp.m_free;
- if (leaked)
- fprintf(stderr, "NDB: Found %u %s%s that %s not been releasedn",
- leaked, tmp.m_name,
- (leaked == 1)?"":"'s",
- (leaked == 1)?"has":"have");
- }
- #endif
- delete g_ndb;
- }
- g_ndb= NULL;
- if (g_ndb_cluster_connection)
- delete g_ndb_cluster_connection;
- g_ndb_cluster_connection= NULL;
- if (!ndbcluster_inited)
- DBUG_RETURN(0);
- hash_free(&ndbcluster_open_tables);
- pthread_mutex_destroy(&ndbcluster_mutex);
- ndbcluster_inited= 0;
- DBUG_RETURN(0);
- }
- /*
- Static error print function called from
- static handler method ndbcluster_commit
- and ndbcluster_rollback
- */
- void ndbcluster_print_error(int error, const NdbOperation *error_op)
- {
- DBUG_ENTER("ndbcluster_print_error");
- TABLE tab;
- const char *tab_name= (error_op) ? error_op->getTableName() : "";
- tab.table_name= (char *) tab_name;
- ha_ndbcluster error_handler(&tab);
- tab.file= &error_handler;
- error_handler.print_error(error, MYF(0));
- DBUG_VOID_RETURN;
- }
- /**
- * Set a given location from full pathname to database name
- *
- */
- void ha_ndbcluster::set_dbname(const char *path_name, char *dbname)
- {
- char *end, *ptr;
- /* Scan name from the end */
- ptr= strend(path_name)-1;
- while (ptr >= path_name && *ptr != '\' && *ptr != '/') {
- ptr--;
- }
- ptr--;
- end= ptr;
- while (ptr >= path_name && *ptr != '\' && *ptr != '/') {
- ptr--;
- }
- uint name_len= end - ptr;
- memcpy(dbname, ptr + 1, name_len);
- dbname[name_len]= ' ';
- #ifdef __WIN__
- /* Put to lower case */
- ptr= dbname;
- while (*ptr != ' ') {
- *ptr= tolower(*ptr);
- ptr++;
- }
- #endif
- }
- /*
- Set m_dbname from full pathname to table file
- */
- void ha_ndbcluster::set_dbname(const char *path_name)
- {
- set_dbname(path_name, m_dbname);
- }
- /**
- * Set a given location from full pathname to table file
- *
- */
- void
- ha_ndbcluster::set_tabname(const char *path_name, char * tabname)
- {
- char *end, *ptr;
- /* Scan name from the end */
- end= strend(path_name)-1;
- ptr= end;
- while (ptr >= path_name && *ptr != '\' && *ptr != '/') {
- ptr--;
- }
- uint name_len= end - ptr;
- memcpy(tabname, ptr + 1, end - ptr);
- tabname[name_len]= ' ';
- #ifdef __WIN__
- /* Put to lower case */
- ptr= tabname;
- while (*ptr != ' ') {
- *ptr= tolower(*ptr);
- ptr++;
- }
- #endif
- }
- /*
- Set m_tabname from full pathname to table file
- */
- void ha_ndbcluster::set_tabname(const char *path_name)
- {
- set_tabname(path_name, m_tabname);
- }
- ha_rows
- ha_ndbcluster::records_in_range(uint inx, key_range *min_key,
- key_range *max_key)
- {
- KEY *key_info= table->key_info + inx;
- uint key_length= key_info->key_length;
- NDB_INDEX_TYPE idx_type= get_index_type(inx);
- DBUG_ENTER("records_in_range");
- // Prevent partial read of hash indexes by returning HA_POS_ERROR
- if ((idx_type == UNIQUE_INDEX || idx_type == PRIMARY_KEY_INDEX) &&
- ((min_key && min_key->length < key_length) ||
- (max_key && max_key->length < key_length)))
- DBUG_RETURN(HA_POS_ERROR);
- // Read from hash index with full key
- // This is a "const" table which returns only one record!
- if ((idx_type != ORDERED_INDEX) &&
- ((min_key && min_key->length == key_length) ||
- (max_key && max_key->length == key_length)))
- DBUG_RETURN(1);
- DBUG_RETURN(10); /* Good guess when you don't know anything */
- }
- ulong ha_ndbcluster::table_flags(void) const
- {
- if (m_ha_not_exact_count)
- return m_table_flags | HA_NOT_EXACT_COUNT;
- else
- return m_table_flags;
- }
- const char * ha_ndbcluster::table_type() const
- {
- return("ndbcluster");
- }
- uint ha_ndbcluster::max_supported_record_length() const
- {
- return NDB_MAX_TUPLE_SIZE;
- }
- uint ha_ndbcluster::max_supported_keys() const
- {
- return MAX_KEY;
- }
- uint ha_ndbcluster::max_supported_key_parts() const
- {
- return NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY;
- }
- uint ha_ndbcluster::max_supported_key_length() const
- {
- return NDB_MAX_KEY_SIZE;
- }
- bool ha_ndbcluster::low_byte_first() const
- {
- #ifdef WORDS_BIGENDIAN
- return FALSE;
- #else
- return TRUE;
- #endif
- }
- bool ha_ndbcluster::has_transactions()
- {
- return m_transaction_on;
- }
- const char* ha_ndbcluster::index_type(uint key_number)
- {
- switch (get_index_type(key_number)) {
- case ORDERED_INDEX:
- case UNIQUE_ORDERED_INDEX:
- case PRIMARY_KEY_ORDERED_INDEX:
- return "BTREE";
- case UNIQUE_INDEX:
- case PRIMARY_KEY_INDEX:
- default:
- return "HASH";
- }
- }
- uint8 ha_ndbcluster::table_cache_type()
- {
- if (m_use_local_query_cache)
- return HA_CACHE_TBL_TRANSACT;
- else
- return HA_CACHE_TBL_NOCACHE;
- }
- /*
- Handling the shared NDB_SHARE structure that is needed to
- provide table locking.
- It's also used for sharing data with other NDB handlers
- in the same MySQL Server. There is currently not much
- data we want to or can share.
- */
- static byte* ndbcluster_get_key(NDB_SHARE *share,uint *length,
- my_bool not_used __attribute__((unused)))
- {
- *length=share->table_name_length;
- return (byte*) share->table_name;
- }
- static NDB_SHARE* get_share(const char *table_name)
- {
- NDB_SHARE *share;
- pthread_mutex_lock(&ndbcluster_mutex);
- uint length=(uint) strlen(table_name);
- if (!(share=(NDB_SHARE*) hash_search(&ndbcluster_open_tables,
- (byte*) table_name,
- length)))
- {
- if ((share=(NDB_SHARE *) my_malloc(sizeof(*share)+length+1,
- MYF(MY_WME | MY_ZEROFILL))))
- {
- share->table_name_length=length;
- share->table_name=(char*) (share+1);
- strmov(share->table_name,table_name);
- if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
- {
- pthread_mutex_unlock(&ndbcluster_mutex);
- my_free((gptr) share,0);
- return 0;
- }
- thr_lock_init(&share->lock);
- pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST);
- }
- }
- share->use_count++;
- pthread_mutex_unlock(&ndbcluster_mutex);
- return share;
- }
- static void free_share(NDB_SHARE *share)
- {
- pthread_mutex_lock(&ndbcluster_mutex);
- if (!--share->use_count)
- {
- hash_delete(&ndbcluster_open_tables, (byte*) share);
- thr_lock_delete(&share->lock);
- pthread_mutex_destroy(&share->mutex);
- my_free((gptr) share, MYF(0));
- }
- pthread_mutex_unlock(&ndbcluster_mutex);
- }
- /*
- Internal representation of the frm blob
- */
- struct frm_blob_struct
- {
- struct frm_blob_header
- {
- uint ver; // Version of header
- uint orglen; // Original length of compressed data
- uint complen; // Compressed length of data, 0=uncompressed
- } head;
- char data[1];
- };
- static int packfrm(const void *data, uint len,
- const void **pack_data, uint *pack_len)
- {
- int error;
- ulong org_len, comp_len;
- uint blob_len;
- frm_blob_struct* blob;
- DBUG_ENTER("packfrm");
- DBUG_PRINT("enter", ("data: %x, len: %d", data, len));
- error= 1;
- org_len= len;
- if (my_compress((byte*)data, &org_len, &comp_len))
- goto err;
- DBUG_PRINT("info", ("org_len: %d, comp_len: %d", org_len, comp_len));
- DBUG_DUMP("compressed", (char*)data, org_len);
- error= 2;
- blob_len= sizeof(frm_blob_struct::frm_blob_header)+org_len;
- if (!(blob= (frm_blob_struct*) my_malloc(blob_len,MYF(MY_WME))))
- goto err;
- // Store compressed blob in machine independent format
- int4store((char*)(&blob->head.ver), 1);
- int4store((char*)(&blob->head.orglen), comp_len);
- int4store((char*)(&blob->head.complen), org_len);
- // Copy frm data into blob, already in machine independent format
- memcpy(blob->data, data, org_len);
- *pack_data= blob;
- *pack_len= blob_len;
- error= 0;
- DBUG_PRINT("exit", ("pack_data: %x, pack_len: %d", *pack_data, *pack_len));
- err:
- DBUG_RETURN(error);
- }
- static int unpackfrm(const void **unpack_data, uint *unpack_len,
- const void *pack_data)
- {
- const frm_blob_struct *blob= (frm_blob_struct*)pack_data;
- byte *data;
- ulong complen, orglen, ver;
- DBUG_ENTER("unpackfrm");
- DBUG_PRINT("enter", ("pack_data: %x", pack_data));
- complen= uint4korr((char*)&blob->head.complen);
- orglen= uint4korr((char*)&blob->head.orglen);
- ver= uint4korr((char*)&blob->head.ver);
- DBUG_PRINT("blob",("ver: %d complen: %d orglen: %d",
- ver,complen,orglen));
- DBUG_DUMP("blob->data", (char*) blob->data, complen);
- if (ver != 1)
- DBUG_RETURN(1);
- if (!(data= my_malloc(max(orglen, complen), MYF(MY_WME))))
- DBUG_RETURN(2);
- memcpy(data, blob->data, complen);
- if (my_uncompress(data, &complen, &orglen))
- {
- my_free((char*)data, MYF(0));
- DBUG_RETURN(3);
- }
- *unpack_data= data;
- *unpack_len= complen;
- DBUG_PRINT("exit", ("frmdata: %x, len: %d", *unpack_data, *unpack_len));
- DBUG_RETURN(0);
- }
- static
- int
- ndb_get_table_statistics(Ndb* ndb, const char * table,
- Uint64* row_count, Uint64* commit_count)
- {
- DBUG_ENTER("ndb_get_table_statistics");
- DBUG_PRINT("enter", ("table: %s", table));
- NdbConnection* pTrans= ndb->startTransaction();
- do
- {
- if (pTrans == NULL)
- break;
- NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
- if (pOp == NULL)
- break;
- NdbResultSet* rs= pOp->readTuples(NdbOperation::LM_CommittedRead);
- if (rs == 0)
- break;
- int check= pOp->interpret_exit_last_row();
- if (check == -1)
- break;
- Uint64 rows, commits;
- pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
- pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
- check= pTrans->execute(NoCommit, AbortOnError, TRUE);
- if (check == -1)
- break;
- Uint64 sum_rows= 0;
- Uint64 sum_commits= 0;
- while((check= rs->nextResult(TRUE, TRUE)) == 0)
- {
- sum_rows+= rows;
- sum_commits+= commits;
- }
- if (check == -1)
- break;
- rs->close(TRUE);
- ndb->closeTransaction(pTrans);
- if(row_count)
- * row_count= sum_rows;
- if(commit_count)
- * commit_count= sum_commits;
- DBUG_PRINT("exit", ("records: %u commits: %u", sum_rows, sum_commits));
- DBUG_RETURN(0);
- } while(0);
- ndb->closeTransaction(pTrans);
- DBUG_PRINT("exit", ("failed"));
- DBUG_RETURN(-1);
- }
- /*
- Create a .ndb file to serve as a placeholder indicating
- that the table with this name is a ndb table
- */
- int ha_ndbcluster::write_ndb_file()
- {
- File file;
- bool error=1;
- char path[FN_REFLEN];
- DBUG_ENTER("write_ndb_file");
- DBUG_PRINT("enter", ("db: %s, name: %s", m_dbname, m_tabname));
- (void)strxnmov(path, FN_REFLEN,
- mysql_data_home,"/",m_dbname,"/",m_tabname,ha_ndb_ext,NullS);
- if ((file=my_create(path, CREATE_MODE,O_RDWR | O_TRUNC,MYF(MY_WME))) >= 0)
- {
- // It's an empty file
- error=0;
- my_close(file,MYF(0));
- }
- DBUG_RETURN(error);
- }
- int
- ndbcluster_show_status(THD* thd)
- {
- Protocol *protocol= thd->protocol;
- DBUG_ENTER("ndbcluster_show_status");
- if (have_ndbcluster != SHOW_OPTION_YES)
- {
- my_message(ER_NOT_SUPPORTED_YET,
- "Cannot call SHOW NDBCLUSTER STATUS because skip-ndbcluster is defined",
- MYF(0));
- DBUG_RETURN(TRUE);
- }
- List<Item> field_list;
- field_list.push_back(new Item_empty_string("free_list", 255));
- field_list.push_back(new Item_return_int("created", 10,MYSQL_TYPE_LONG));
- field_list.push_back(new Item_return_int("free", 10,MYSQL_TYPE_LONG));
- field_list.push_back(new Item_return_int("sizeof", 10,MYSQL_TYPE_LONG));
- if (protocol->send_fields(&field_list, 1))
- DBUG_RETURN(TRUE);
- if (thd->transaction.thd_ndb &&
- ((Thd_ndb*)thd->transaction.thd_ndb)->ndb)
- {
- Ndb* ndb= ((Thd_ndb*)thd->transaction.thd_ndb)->ndb;
- Ndb::Free_list_usage tmp; tmp.m_name= 0;
- while (ndb->get_free_list_usage(&tmp))
- {
- protocol->prepare_for_resend();
- protocol->store(tmp.m_name, &my_charset_bin);
- protocol->store((uint)tmp.m_created);
- protocol->store((uint)tmp.m_free);
- protocol->store((uint)tmp.m_sizeof);
- if (protocol->write())
- DBUG_RETURN(TRUE);
- }
- }
- send_eof(thd);
- DBUG_RETURN(FALSE);
- }
- #endif /* HAVE_NDBCLUSTER_DB */