ha_ndbcluster.cpp
- /* Copyright (C) 2000-2003 MySQL AB
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- */
- /*
- This file defines the NDB Cluster handler: the interface between MySQL and
- NDB Cluster
- */
- #ifdef USE_PRAGMA_IMPLEMENTATION
- #pragma implementation // gcc: Class implementation
- #endif
- #include "mysql_priv.h"
- #ifdef HAVE_NDBCLUSTER_DB
- #include <my_dir.h>
- #include "ha_ndbcluster.h"
- #include <ndbapi/NdbApi.hpp>
- #include <ndbapi/NdbScanFilter.hpp>
- // options from mysqld.cc
- extern my_bool opt_ndb_optimized_node_selection;
- extern const char *opt_ndbcluster_connectstring;
- // Default value for parallelism
- static const int parallelism= 240;
- // Default value for max number of transactions
- // creatable against NDB from this handler
- static const int max_transactions= 256;
- static const char *ha_ndb_ext=".ndb";
- #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
- #define NDB_FAILED_AUTO_INCREMENT ~(Uint64)0
- #define NDB_AUTO_INCREMENT_RETRIES 10
- #define NDB_INVALID_SCHEMA_OBJECT 241
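- // The hidden primary key is the 8-byte (Uint64) auto-generated key used
- // for tables without an explicit primary key; NDB_INVALID_SCHEMA_OBJECT
- // is the NDB error code signalling a stale/invalid schema object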
- #define ERR_PRINT(err) \
- DBUG_PRINT("error", ("%d message: %s", err.code, err.message))
- #define ERR_RETURN(err) \
- { \
- ERR_PRINT(err); \
- DBUG_RETURN(ndb_to_mysql_error(&err)); \
- }
- // Typedefs for long names
- typedef NdbDictionary::Column NDBCOL;
- typedef NdbDictionary::Table NDBTAB;
- typedef NdbDictionary::Index NDBINDEX;
- typedef NdbDictionary::Dictionary NDBDICT;
- bool ndbcluster_inited= FALSE;
- static Ndb* g_ndb= NULL;
- static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
- // Handler synchronization
- pthread_mutex_t ndbcluster_mutex;
- // Table lock handling
- static HASH ndbcluster_open_tables;
- static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
- my_bool not_used __attribute__((unused)));
- static NDB_SHARE *get_share(const char *table_name);
- static void free_share(NDB_SHARE *share);
- static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
- static int unpackfrm(const void **data, uint *len,
- const void* pack_data);
- static int ndb_get_table_statistics(Ndb*, const char *,
- Uint64* rows, Uint64* commits);
- /*
- Dummy buffer to read zero pack_length fields
- which are mapped to 1 char
- */
- static byte dummy_buf[1];
- /*
- Error handling functions
- */
- struct err_code_mapping
- {
- int ndb_err;
- int my_err;
- int show_warning;
- };
- static const err_code_mapping err_map[]=
- {
- { 626, HA_ERR_KEY_NOT_FOUND, 0 },
- { 630, HA_ERR_FOUND_DUPP_KEY, 0 },
- { 893, HA_ERR_FOUND_DUPP_KEY, 0 },
- { 721, HA_ERR_TABLE_EXIST, 1 },
- { 4244, HA_ERR_TABLE_EXIST, 1 },
- { 709, HA_ERR_NO_SUCH_TABLE, 1 },
- { 266, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 274, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 296, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 297, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 237, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
- { 623, HA_ERR_RECORD_FILE_FULL, 1 },
- { 624, HA_ERR_RECORD_FILE_FULL, 1 },
- { 625, HA_ERR_RECORD_FILE_FULL, 1 },
- { 826, HA_ERR_RECORD_FILE_FULL, 1 },
- { 827, HA_ERR_RECORD_FILE_FULL, 1 },
- { 832, HA_ERR_RECORD_FILE_FULL, 1 },
- { 0, 1, 0 },
- { -1, -1, 1 }
- };
- static int ndb_to_mysql_error(const NdbError *err)
- {
- uint i;
- for (i=0; err_map[i].ndb_err != err->code && err_map[i].my_err != -1; i++);
- if (err_map[i].show_warning)
- {
- // Push the NDB error message as warning
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- err->code, err->message, "NDB");
- }
- if (err_map[i].my_err == -1)
- return err->code;
- return err_map[i].my_err;
- }
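- /*
-   Usage sketch for the mapping above: after a failed operation,
-     const NdbError &err= trans->getNdbError();
-     int res= ndb_to_mysql_error(&err);  // e.g. 626 -> HA_ERR_KEY_NOT_FOUND
-   Codes not found in err_map (my_err == -1 sentinel) are returned as-is.
- */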
- inline
- int execute_no_commit(ha_ndbcluster *h, NdbConnection *trans)
- {
- int m_batch_execute= 0;
- #ifdef NOT_USED
- if (m_batch_execute)
- return 0;
- #endif
- return trans->execute(NoCommit,AbortOnError,h->m_force_send);
- }
- inline
- int execute_commit(ha_ndbcluster *h, NdbConnection *trans)
- {
- int m_batch_execute= 0;
- #ifdef NOT_USED
- if (m_batch_execute)
- return 0;
- #endif
- return trans->execute(Commit,AbortOnError,h->m_force_send);
- }
- inline
- int execute_commit(THD *thd, NdbConnection *trans)
- {
- int m_batch_execute= 0;
- #ifdef NOT_USED
- if (m_batch_execute)
- return 0;
- #endif
- return trans->execute(Commit,AbortOnError,thd->variables.ndb_force_send);
- }
- inline
- int execute_no_commit_ie(ha_ndbcluster *h, NdbConnection *trans)
- {
- int m_batch_execute= 0;
- #ifdef NOT_USED
- if (m_batch_execute)
- return 0;
- #endif
- return trans->execute(NoCommit, AO_IgnoreError,h->m_force_send);
- }
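- /*
-   The execute_* wrappers above flush operations batched on the
-   transaction: NoCommit sends pending operations while keeping the
-   transaction open, Commit ends it, and the _ie (AO_IgnoreError) variant
-   lets individual operations fail (e.g. key not found) without
-   aborting the whole transaction.
- */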
- /*
- Place holder for ha_ndbcluster thread specific data
- */
- Thd_ndb::Thd_ndb()
- {
- ndb= new Ndb(g_ndb_cluster_connection, "");
- lock_count= 0;
- count= 0;
- error= 0;
- }
- Thd_ndb::~Thd_ndb()
- {
- if (ndb)
- {
- #ifndef DBUG_OFF
- Ndb::Free_list_usage tmp; tmp.m_name= 0;
- while (ndb->get_free_list_usage(&tmp))
- {
- uint leaked= (uint) tmp.m_created - tmp.m_free;
- if (leaked)
- fprintf(stderr, "NDB: Found %u %s%s that %s not been released\n",
- leaked, tmp.m_name,
- (leaked == 1)?"":"'s",
- (leaked == 1)?"has":"have");
- }
- #endif
- delete ndb;
- }
- ndb= 0;
- }
- inline
- Ndb *ha_ndbcluster::get_ndb()
- {
- return ((Thd_ndb*)current_thd->transaction.thd_ndb)->ndb;
- }
- /*
- * manage uncommitted inserts/deletes during a transaction to keep the record count correct
- */
- struct Ndb_local_table_statistics {
- int no_uncommitted_rows_count;
- ulong last_count;
- ha_rows records;
- };
- void ha_ndbcluster::set_rec_per_key()
- {
- DBUG_ENTER("ha_ndbcluster::set_rec_per_key");
- for (uint i=0 ; i < table->keys ; i++)
- {
- table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
- }
- DBUG_VOID_RETURN;
- }
- void ha_ndbcluster::records_update()
- {
- if (m_ha_not_exact_count)
- return;
- DBUG_ENTER("ha_ndbcluster::records_update");
- struct Ndb_local_table_statistics *info=
- (struct Ndb_local_table_statistics *)m_table_info;
- DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
- ((const NDBTAB *)m_table)->getTableId(),
- info->no_uncommitted_rows_count));
- // if (info->records == ~(ha_rows)0)
- {
- Ndb *ndb= get_ndb();
- Uint64 rows;
- if(ndb_get_table_statistics(ndb, m_tabname, &rows, 0) == 0){
- info->records= rows;
- }
- }
- {
- THD *thd= current_thd;
- if (((Thd_ndb*)(thd->transaction.thd_ndb))->error)
- info->no_uncommitted_rows_count= 0;
- }
- records= info->records+ info->no_uncommitted_rows_count;
- DBUG_VOID_RETURN;
- }
- void ha_ndbcluster::no_uncommitted_rows_execute_failure()
- {
- if (m_ha_not_exact_count)
- return;
- DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_execute_failure");
- THD *thd= current_thd;
- ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 1;
- DBUG_VOID_RETURN;
- }
- void ha_ndbcluster::no_uncommitted_rows_init(THD *thd)
- {
- if (m_ha_not_exact_count)
- return;
- DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init");
- struct Ndb_local_table_statistics *info=
- (struct Ndb_local_table_statistics *)m_table_info;
- Thd_ndb *thd_ndb= (Thd_ndb *)thd->transaction.thd_ndb;
- if (info->last_count != thd_ndb->count)
- {
- info->last_count = thd_ndb->count;
- info->no_uncommitted_rows_count= 0;
- info->records= ~(ha_rows)0;
- DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
- ((const NDBTAB *)m_table)->getTableId(),
- info->no_uncommitted_rows_count));
- }
- DBUG_VOID_RETURN;
- }
- void ha_ndbcluster::no_uncommitted_rows_update(int c)
- {
- if (m_ha_not_exact_count)
- return;
- DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
- struct Ndb_local_table_statistics *info=
- (struct Ndb_local_table_statistics *)m_table_info;
- info->no_uncommitted_rows_count+= c;
- DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
- ((const NDBTAB *)m_table)->getTableId(),
- info->no_uncommitted_rows_count));
- DBUG_VOID_RETURN;
- }
- void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
- {
- if (m_ha_not_exact_count)
- return;
- DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
- ((Thd_ndb*)(thd->transaction.thd_ndb))->count++;
- ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 0;
- DBUG_VOID_RETURN;
- }
- /*
- Invalidate the dictionary cache entries for this table and its
- indexes, locally and, if global is set, in the global NdbApi cache
- */
- void ha_ndbcluster::invalidate_dictionary_cache(bool global)
- {
- NDBDICT *dict= get_ndb()->getDictionary();
- DBUG_ENTER("invalidate_dictionary_cache");
- DBUG_PRINT("info", ("invalidating %s", m_tabname));
- if (global)
- {
- const NDBTAB *tab= dict->getTable(m_tabname);
- if (!tab)
- DBUG_VOID_RETURN;
- if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
- {
- // Global cache has already been invalidated
- dict->removeCachedTable(m_tabname);
- global= FALSE;
- }
- else
- dict->invalidateTable(m_tabname);
- }
- else
- dict->removeCachedTable(m_tabname);
- table->version=0L; /* Free when thread is ready */
- /* Invalidate indexes */
- for (uint i= 0; i < table->keys; i++)
- {
- NDBINDEX *index = (NDBINDEX *) m_index[i].index;
- NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
- NDB_INDEX_TYPE idx_type= m_index[i].type;
- switch(idx_type) {
- case(PRIMARY_KEY_ORDERED_INDEX):
- case(ORDERED_INDEX):
- if (global)
- dict->invalidateIndex(index->getName(), m_tabname);
- else
- dict->removeCachedIndex(index->getName(), m_tabname);
- break;
- case(UNIQUE_ORDERED_INDEX):
- if (global)
- dict->invalidateIndex(index->getName(), m_tabname);
- else
- dict->removeCachedIndex(index->getName(), m_tabname);
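- // Fall through: a unique ordered index also has a separate unique
- // index ($unique) that must be invalidated/removed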
- case(UNIQUE_INDEX):
- if (global)
- dict->invalidateIndex(unique_index->getName(), m_tabname);
- else
- dict->removeCachedIndex(unique_index->getName(), m_tabname);
- break;
- case(PRIMARY_KEY_INDEX):
- case(UNDEFINED_INDEX):
- break;
- }
- }
- DBUG_VOID_RETURN;
- }
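- /*
- Take care of the error that occurred in NDB
- RETURN
- 0 No error
- # The mapped error code
- */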
- int ha_ndbcluster::ndb_err(NdbConnection *trans)
- {
- int res;
- NdbError err= trans->getNdbError();
- DBUG_ENTER("ndb_err");
- ERR_PRINT(err);
- switch (err.classification) {
- case NdbError::SchemaError:
- {
- invalidate_dictionary_cache(TRUE);
- if (err.code==284)
- {
- /*
- Check if the table is _really_ gone or if the table has
- been altered and thus changed table id
- */
- NDBDICT *dict= get_ndb()->getDictionary();
- DBUG_PRINT("info", ("Check if table %s is really gone", m_tabname));
- if (!(dict->getTable(m_tabname)))
- {
- err= dict->getNdbError();
- DBUG_PRINT("info", ("Table not found, error: %d", err.code));
- if (err.code != 709)
- DBUG_RETURN(1);
- }
- else
- {
- DBUG_PRINT("info", ("Table exist but must have changed"));
- /* In 5.0, this should be replaced with a mapping to a mysql error */
- my_printf_error(ER_UNKNOWN_ERROR,
- "Table definition has changed, "
- "please retry transaction",
- MYF(0));
- DBUG_RETURN(1);
- }
- }
- break;
- }
- default:
- break;
- }
- res= ndb_to_mysql_error(&err);
- DBUG_PRINT("info", ("transformed ndbcluster error %d to mysql error %d",
- err.code, res));
- if (res == HA_ERR_FOUND_DUPP_KEY)
- {
- if (m_rows_to_insert == 1)
- m_dupkey= table->primary_key;
- else
- {
- /* We are batching inserts, offending key is not available */
- m_dupkey= (uint) -1;
- }
- }
- DBUG_RETURN(res);
- }
- /*
- Override the default get_error_message in order to add the
- error message of NDB
- */
- bool ha_ndbcluster::get_error_message(int error,
- String *buf)
- {
- DBUG_ENTER("ha_ndbcluster::get_error_message");
- DBUG_PRINT("enter", ("error: %d", error));
- Ndb *ndb= get_ndb();
- if (!ndb)
- DBUG_RETURN(FALSE);
- const NdbError err= ndb->getNdbError(error);
- bool temporary= err.status==NdbError::TemporaryError;
- buf->set(err.message, strlen(err.message), &my_charset_bin);
- DBUG_PRINT("exit", ("message: %s, temporary: %d", buf->ptr(), temporary));
- DBUG_RETURN(temporary);
- }
- #ifndef DBUG_OFF
- /*
- Check if type is supported by NDB.
- */
- static bool ndb_supported_type(enum_field_types type)
- {
- switch (type) {
- case MYSQL_TYPE_DECIMAL:
- case MYSQL_TYPE_TINY:
- case MYSQL_TYPE_SHORT:
- case MYSQL_TYPE_LONG:
- case MYSQL_TYPE_INT24:
- case MYSQL_TYPE_LONGLONG:
- case MYSQL_TYPE_FLOAT:
- case MYSQL_TYPE_DOUBLE:
- case MYSQL_TYPE_TIMESTAMP:
- case MYSQL_TYPE_DATETIME:
- case MYSQL_TYPE_DATE:
- case MYSQL_TYPE_NEWDATE:
- case MYSQL_TYPE_TIME:
- case MYSQL_TYPE_YEAR:
- case MYSQL_TYPE_STRING:
- case MYSQL_TYPE_VAR_STRING:
- case MYSQL_TYPE_TINY_BLOB:
- case MYSQL_TYPE_BLOB:
- case MYSQL_TYPE_MEDIUM_BLOB:
- case MYSQL_TYPE_LONG_BLOB:
- case MYSQL_TYPE_ENUM:
- case MYSQL_TYPE_SET:
- return TRUE;
- case MYSQL_TYPE_NULL:
- case MYSQL_TYPE_GEOMETRY:
- break;
- }
- return FALSE;
- }
- #endif /* !DBUG_OFF */
- /*
- Instruct NDB to set the value of the hidden primary key
- */
- bool ha_ndbcluster::set_hidden_key(NdbOperation *ndb_op,
- uint fieldnr, const byte *field_ptr)
- {
- DBUG_ENTER("set_hidden_key");
- DBUG_RETURN(ndb_op->equal(fieldnr, (char*)field_ptr,
- NDB_HIDDEN_PRIMARY_KEY_LENGTH) != 0);
- }
- /*
- Instruct NDB to set the value of one primary key attribute
- */
- int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
- uint fieldnr, const byte *field_ptr)
- {
- uint32 pack_len= field->pack_length();
- DBUG_ENTER("set_ndb_key");
- DBUG_PRINT("enter", ("%d: %s, ndb_type: %u, len=%d",
- fieldnr, field->field_name, field->type(),
- pack_len));
- DBUG_DUMP("key", (char*)field_ptr, pack_len);
- DBUG_ASSERT(ndb_supported_type(field->type()));
- DBUG_ASSERT(! (field->flags & BLOB_FLAG));
- // Common implementation for most field types
- DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
- }
- /*
- Instruct NDB to set the value of one attribute
- */
- int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field,
- uint fieldnr, bool *set_blob_value)
- {
- const byte* field_ptr= field->ptr;
- uint32 pack_len= field->pack_length();
- DBUG_ENTER("set_ndb_value");
- DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s",
- fieldnr, field->field_name, field->type(),
- pack_len, field->is_null()?"Y":"N"));
- DBUG_DUMP("value", (char*) field_ptr, pack_len);
- DBUG_ASSERT(ndb_supported_type(field->type()));
- {
- // ndb currently does not support size 0
- const byte *empty_field= "";
- if (pack_len == 0)
- {
- pack_len= 1;
- field_ptr= empty_field;
- }
- if (! (field->flags & BLOB_FLAG))
- {
- if (field->is_null())
- // Set value to NULL
- DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
- // Common implementation for most field types
- DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
- }
- // Blob type
- NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
- if (ndb_blob != NULL)
- {
- if (field->is_null())
- DBUG_RETURN(ndb_blob->setNull() != 0);
- Field_blob *field_blob= (Field_blob*)field;
- // Get length and pointer to data
- uint32 blob_len= field_blob->get_length(field_ptr);
- char* blob_ptr= NULL;
- field_blob->get_ptr(&blob_ptr);
- // Looks like NULL ptr signals length 0 blob
- if (blob_ptr == NULL) {
- DBUG_ASSERT(blob_len == 0);
- blob_ptr= (char*)"";
- }
- DBUG_PRINT("value", ("set blob ptr=%p len=%u",
- blob_ptr, blob_len));
- DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
- if (set_blob_value)
- *set_blob_value= TRUE;
- // No callback needed to write value
- DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
- }
- DBUG_RETURN(1);
- }
- }
- /*
- Callback to read all blob values.
- - not done in unpack_record because unpack_record is valid
- after execute(Commit) but reading blobs is not
- - may only generate read operations; they have to be executed
- somewhere before the data is available
- - due to single buffer for all blobs, we let the last blob
- process all blobs (last so that all are active)
- - null bit is still set in unpack_record
- - TODO allocate blob part aligned buffers
- */
- NdbBlob::ActiveHook g_get_ndb_blobs_value;
- int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
- {
- DBUG_ENTER("g_get_ndb_blobs_value");
- if (ndb_blob->blobsNextBlob() != NULL)
- DBUG_RETURN(0);
- ha_ndbcluster *ha= (ha_ndbcluster *)arg;
- DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
- }
- int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
- {
- DBUG_ENTER("get_ndb_blobs_value");
- // Field has no field number so cannot use TABLE blob_field
- // Loop twice, first only counting total buffer size
- for (int loop= 0; loop <= 1; loop++)
- {
- uint32 offset= 0;
- for (uint i= 0; i < table->fields; i++)
- {
- Field *field= table->field[i];
- NdbValue value= m_value[i];
- if (value.ptr != NULL && (field->flags & BLOB_FLAG))
- {
- Field_blob *field_blob= (Field_blob *)field;
- NdbBlob *ndb_blob= value.blob;
- Uint64 blob_len= 0;
- if (ndb_blob->getLength(blob_len) != 0)
- DBUG_RETURN(-1);
- // Align to Uint64
- uint32 blob_size= blob_len;
- if (blob_size % 8 != 0)
- blob_size+= 8 - blob_size % 8;
- if (loop == 1)
- {
- char *buf= m_blobs_buffer + offset;
- uint32 len= 0xffffffff; // Max uint32
- DBUG_PRINT("value", ("read blob ptr=%x len=%u",
- (UintPtr)buf, (uint)blob_len));
- if (ndb_blob->readData(buf, len) != 0)
- DBUG_RETURN(-1);
- DBUG_ASSERT(len == blob_len);
- field_blob->set_ptr(len, buf);
- }
- offset+= blob_size;
- }
- }
- if (loop == 0 && offset > m_blobs_buffer_size)
- {
- my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
- m_blobs_buffer_size= 0;
- DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
- m_blobs_buffer= my_malloc(offset, MYF(MY_WME));
- if (m_blobs_buffer == NULL)
- DBUG_RETURN(-1);
- m_blobs_buffer_size= offset;
- }
- }
- DBUG_RETURN(0);
- }
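- // The two passes above let a single contiguous buffer be sized (loop 0)
- // to hold all blob values, Uint64-aligned, before any data is read
- // into it (loop 1)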
- /*
- Instruct NDB to fetch one field
- - data is read directly into buffer provided by field
- if field is NULL, data is read into memory provided by NDBAPI
- */
- int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
- uint fieldnr, byte* buf)
- {
- DBUG_ENTER("get_ndb_value");
- DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
- (int)(field != NULL ? field->flags : 0)));
- if (field != NULL)
- {
- DBUG_ASSERT(buf);
- DBUG_ASSERT(ndb_supported_type(field->type()));
- DBUG_ASSERT(field->ptr != NULL);
- if (! (field->flags & BLOB_FLAG))
- {
- byte *field_buf;
- if (field->pack_length() != 0)
- field_buf= buf + (field->ptr - table->record[0]);
- else
- field_buf= dummy_buf;
- m_value[fieldnr].rec= ndb_op->getValue(fieldnr,
- field_buf);
- DBUG_RETURN(m_value[fieldnr].rec == NULL);
- }
- // Blob type
- NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
- m_value[fieldnr].blob= ndb_blob;
- if (ndb_blob != NULL)
- {
- // Set callback
- void *arg= (void *)this;
- DBUG_RETURN(ndb_blob->setActiveHook(g_get_ndb_blobs_value, arg) != 0);
- }
- DBUG_RETURN(1);
- }
- // Used for hidden key only
- m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL);
- DBUG_RETURN(m_value[fieldnr].rec == NULL);
- }
- /*
- Check if any set or get of blob value in current query.
- */
- bool ha_ndbcluster::uses_blob_value(bool all_fields)
- {
- if (table->blob_fields == 0)
- return FALSE;
- if (all_fields)
- return TRUE;
- {
- uint no_fields= table->fields;
- int i;
- THD *thd= table->in_use;
- // They always put blobs at the end..
- for (i= no_fields - 1; i >= 0; i--)
- {
- Field *field= table->field[i];
- if (thd->query_id == field->query_id)
- {
- return TRUE;
- }
- }
- }
- return FALSE;
- }
- /*
- Get metadata for this table from NDB
- IMPLEMENTATION
- - check that frm-file on disk is equal to frm-file
- of table accessed in NDB
- */
- int ha_ndbcluster::get_metadata(const char *path)
- {
- Ndb *ndb= get_ndb();
- NDBDICT *dict= ndb->getDictionary();
- const NDBTAB *tab;
- int error;
- bool invalidating_ndb_table= FALSE;
- DBUG_ENTER("get_metadata");
- DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
- do {
- const void *data, *pack_data;
- uint length, pack_length;
- if (!(tab= dict->getTable(m_tabname)))
- ERR_RETURN(dict->getNdbError());
- // Check if thread has stale local cache
- if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
- {
- invalidate_dictionary_cache(FALSE);
- if (!(tab= dict->getTable(m_tabname)))
- ERR_RETURN(dict->getNdbError());
- DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
- }
- /*
- Compare FrmData in NDB with frm file from disk.
- */
- error= 0;
- if (readfrm(path, &data, &length) ||
- packfrm(data, length, &pack_data, &pack_length))
- {
- my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
- my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
- DBUG_RETURN(1);
- }
- if ((pack_length != tab->getFrmLength()) ||
- (memcmp(pack_data, tab->getFrmData(), pack_length)))
- {
- if (!invalidating_ndb_table)
- {
- DBUG_PRINT("info", ("Invalidating table"));
- invalidate_dictionary_cache(TRUE);
- invalidating_ndb_table= TRUE;
- }
- else
- {
- DBUG_PRINT("error",
- ("metadata, pack_length: %d getFrmLength: %d memcmp: %d",
- pack_length, tab->getFrmLength(),
- memcmp(pack_data, tab->getFrmData(), pack_length)));
- DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
- DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
- error= 3;
- invalidating_ndb_table= FALSE;
- }
- }
- else
- {
- invalidating_ndb_table= FALSE;
- }
- my_free((char*)data, MYF(0));
- my_free((char*)pack_data, MYF(0));
- } while (invalidating_ndb_table);
- if (error)
- DBUG_RETURN(error);
- m_table_version= tab->getObjectVersion();
- m_table= (void *)tab;
- m_table_info= NULL; // Set in external lock
- DBUG_RETURN(build_index_list(ndb, table, ILBP_OPEN));
- }
- static int fix_unique_index_attr_order(NDB_INDEX_DATA &data,
- const NDBINDEX *index,
- KEY *key_info)
- {
- DBUG_ENTER("fix_unique_index_attr_order");
- unsigned sz= index->getNoOfIndexColumns();
- if (data.unique_index_attrid_map)
- my_free((char*)data.unique_index_attrid_map, MYF(0));
- data.unique_index_attrid_map= (unsigned char*)my_malloc(sz,MYF(MY_WME));
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
- DBUG_ASSERT(key_info->key_parts == sz);
- for (unsigned i= 0; key_part != end; key_part++, i++)
- {
- const char *field_name= key_part->field->field_name;
- unsigned name_sz= strlen(field_name);
- if (name_sz >= NDB_MAX_ATTR_NAME_SIZE)
- name_sz= NDB_MAX_ATTR_NAME_SIZE-1;
- #ifndef DBUG_OFF
- data.unique_index_attrid_map[i]= 255;
- #endif
- for (unsigned j= 0; j < sz; j++)
- {
- const NDBCOL *c= index->getColumn(j);
- if (strncmp(field_name, c->getName(), name_sz) == 0)
- {
- data.unique_index_attrid_map[i]= j;
- break;
- }
- }
- DBUG_ASSERT(data.unique_index_attrid_map[i] != 255);
- }
- DBUG_RETURN(0);
- }
- int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
- {
- uint i;
- int error= 0;
- const char *name, *index_name;
- char unique_index_name[FN_LEN];
- static const char* unique_suffix= "$unique";
- KEY* key_info= tab->key_info;
- const char **key_name= tab->keynames.type_names;
- NDBDICT *dict= ndb->getDictionary();
- DBUG_ENTER("build_index_list");
- // Save information about all known indexes
- for (i= 0; i < tab->keys; i++, key_info++, key_name++)
- {
- index_name= *key_name;
- NDB_INDEX_TYPE idx_type= get_index_type_from_table(i);
- m_index[i].type= idx_type;
- if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
- {
- strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
- DBUG_PRINT("info", ("Created unique index name '%s' for index %d",
- unique_index_name, i));
- }
- // Create secondary indexes if in create phase
- if (phase == ILBP_CREATE)
- {
- DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));
- switch (idx_type){
- case PRIMARY_KEY_INDEX:
- // Do nothing, already created
- break;
- case PRIMARY_KEY_ORDERED_INDEX:
- error= create_ordered_index(index_name, key_info);
- break;
- case UNIQUE_ORDERED_INDEX:
- if (!(error= create_ordered_index(index_name, key_info)))
- error= create_unique_index(unique_index_name, key_info);
- break;
- case UNIQUE_INDEX:
- if (!(error= check_index_fields_not_null(i)))
- error= create_unique_index(unique_index_name, key_info);
- break;
- case ORDERED_INDEX:
- error= create_ordered_index(index_name, key_info);
- break;
- default:
- DBUG_ASSERT(FALSE);
- break;
- }
- if (error)
- {
- DBUG_PRINT("error", ("Failed to create index %u", i));
- drop_table();
- break;
- }
- }
- // Add handles to index objects
- if (idx_type != PRIMARY_KEY_INDEX && idx_type != UNIQUE_INDEX)
- {
- DBUG_PRINT("info", ("Get handle to index %s", index_name));
- const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
- if (!index) DBUG_RETURN(1);
- m_index[i].index= (void *) index;
- }
- if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
- {
- DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name));
- const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname);
- if (!index) DBUG_RETURN(1);
- m_index[i].unique_index= (void *) index;
- error= fix_unique_index_attr_order(m_index[i], index, key_info);
- }
- }
- DBUG_RETURN(error);
- }
- /*
- Decode the type of an index from information
- provided in table object
- */
- NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const
- {
- bool is_hash_index= (table->key_info[inx].algorithm == HA_KEY_ALG_HASH);
- if (inx == table->primary_key)
- return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX;
- else
- return ((table->key_info[inx].flags & HA_NOSAME) ?
- (is_hash_index ? UNIQUE_INDEX : UNIQUE_ORDERED_INDEX) :
- ORDERED_INDEX);
- }
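- // E.g. a primary key declared USING HASH maps to PRIMARY_KEY_INDEX, the
- // default primary key to PRIMARY_KEY_ORDERED_INDEX (hash + ordered), and
- // a non-unique key to ORDERED_INDEX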
- int ha_ndbcluster::check_index_fields_not_null(uint inx)
- {
- KEY* key_info= table->key_info + inx;
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
- DBUG_ENTER("check_index_fields_not_null");
- for (; key_part != end; key_part++)
- {
- Field* field= key_part->field;
- if (field->maybe_null())
- {
- my_printf_error(ER_NULL_COLUMN_IN_INDEX,ER(ER_NULL_COLUMN_IN_INDEX),
- MYF(0),field->field_name);
- DBUG_RETURN(ER_NULL_COLUMN_IN_INDEX);
- }
- }
- DBUG_RETURN(0);
- }
- void ha_ndbcluster::release_metadata()
- {
- uint i;
- DBUG_ENTER("release_metadata");
- DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
- m_table= NULL;
- m_table_info= NULL;
- // Release index list
- for (i= 0; i < MAX_KEY; i++)
- {
- m_index[i].unique_index= NULL;
- m_index[i].index= NULL;
- if (m_index[i].unique_index_attrid_map)
- {
- my_free((char *)m_index[i].unique_index_attrid_map, MYF(0));
- m_index[i].unique_index_attrid_map= NULL;
- }
- }
- DBUG_VOID_RETURN;
- }
- int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
- {
- if (type >= TL_WRITE_ALLOW_WRITE)
- return NdbOperation::LM_Exclusive;
- else if (uses_blob_value(m_retrieve_all_fields))
- return NdbOperation::LM_Read;
- else
- return NdbOperation::LM_CommittedRead;
- }
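- // Rationale (as implemented above): writes take an exclusive lock; reads
- // that touch blob values use LM_Read, a shared lock held by the
- // transaction, presumably because blob parts are fetched in separate
- // operations; all other reads use the cheaper LM_CommittedRead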
- static const ulong index_type_flags[]=
- {
- /* UNDEFINED_INDEX */
- 0,
- /* PRIMARY_KEY_INDEX */
- HA_ONLY_WHOLE_INDEX,
- /* PRIMARY_KEY_ORDERED_INDEX */
- /*
- Enable HA_KEYREAD_ONLY when "sorted" indexes are supported,
- thus ORDER BY clauses can be optimized by reading directly
- through the index.
- */
- // HA_KEYREAD_ONLY |
- HA_READ_NEXT |
- HA_READ_RANGE |
- HA_READ_ORDER,
- /* UNIQUE_INDEX */
- HA_ONLY_WHOLE_INDEX,
- /* UNIQUE_ORDERED_INDEX */
- HA_READ_NEXT |
- HA_READ_RANGE |
- HA_READ_ORDER,
- /* ORDERED_INDEX */
- HA_READ_NEXT |
- HA_READ_RANGE |
- HA_READ_ORDER
- };
- static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
- inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
- {
- DBUG_ASSERT(idx_no < MAX_KEY);
- return m_index[idx_no].type;
- }
- /*
- Get the flags for an index
- RETURN
- flags depending on the type of the index.
- */
- inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part,
- bool all_parts) const
- {
- DBUG_ENTER("index_flags");
- DBUG_PRINT("info", ("idx_no: %d", idx_no));
- DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
- DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
- }
- int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
- {
- KEY* key_info= table->key_info + table->primary_key;
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
- DBUG_ENTER("set_primary_key");
- for (; key_part != end; key_part++)
- {
- Field* field= key_part->field;
- if (set_ndb_key(op, field,
- key_part->fieldnr-1, key))
- ERR_RETURN(op->getNdbError());
- key += key_part->length;
- }
- DBUG_RETURN(0);
- }
- int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *record)
- {
- KEY* key_info= table->key_info + table->primary_key;
- KEY_PART_INFO* key_part= key_info->key_part;
- KEY_PART_INFO* end= key_part+key_info->key_parts;
- DBUG_ENTER("set_primary_key_from_record");
- for (; key_part != end; key_part++)
- {
- Field* field= key_part->field;
- if (set_ndb_key(op, field,
- key_part->fieldnr-1, record+key_part->offset))
- ERR_RETURN(op->getNdbError());
- }
- DBUG_RETURN(0);
- }
- /*
- Read one record from NDB using primary key
- */
- int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
- {
- uint no_fields= table->fields, i;
- NdbConnection *trans= m_active_trans;
- NdbOperation *op;
- THD *thd= current_thd;
- DBUG_ENTER("pk_read");
- DBUG_PRINT("enter", ("key_len: %u", key_len));
- DBUG_DUMP("key", (char*)key, key_len);
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
- op->readTuple(lm) != 0)
- ERR_RETURN(trans->getNdbError());
- if (table->primary_key == MAX_KEY)
- {
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
- DBUG_DUMP("key", (char*)key, 8);
- if (set_hidden_key(op, no_fields, key))
- ERR_RETURN(trans->getNdbError());
- // Read key at the same time, for future reference
- if (get_ndb_value(op, NULL, no_fields, NULL))
- ERR_RETURN(trans->getNdbError());
- }
- else
- {
- int res;
- if ((res= set_primary_key(op, key)))
- return res;
- }
- // Read all wanted non-key field(s) unless HA_EXTRA_RETRIEVE_ALL_COLS
- for (i= 0; i < no_fields; i++)
- {
- Field *field= table->field[i];
- if ((thd->query_id == field->query_id) ||
- m_retrieve_all_fields ||
- ((field->flags & PRI_KEY_FLAG) && m_retrieve_primary_key))
- {
- if (get_ndb_value(op, field, i, buf))
- ERR_RETURN(trans->getNdbError());
- }
- else
- {
- // Attribute was not to be read
- m_value[i].ptr= NULL;
- }
- }
- if (execute_no_commit_ie(this,trans) != 0)
- {
- table->status= STATUS_NOT_FOUND;
- DBUG_RETURN(ndb_err(trans));
- }
- // The values have now been fetched from NDB
- unpack_record(buf);
- table->status= 0;
- DBUG_RETURN(0);
- }
- /*
- Read one complementing record from NDB using primary key from old_data
- */
- int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
- {
- uint no_fields= table->fields, i;
- NdbConnection *trans= m_active_trans;
- NdbOperation *op;
- THD *thd= current_thd;
- DBUG_ENTER("complemented_pk_read");
- if (m_retrieve_all_fields)
- // We have already retrieved all fields, nothing to complement
- DBUG_RETURN(0);
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
- op->readTuple(lm) != 0)
- ERR_RETURN(trans->getNdbError());
- int res;
- if ((res= set_primary_key_from_record(op, old_data)))
- ERR_RETURN(trans->getNdbError());
- // Read all unreferenced non-key field(s)
- for (i= 0; i < no_fields; i++)
- {
- Field *field= table->field[i];
- if (!(field->flags & PRI_KEY_FLAG) &&
- (thd->query_id != field->query_id))
- {
- if (get_ndb_value(op, field, i, new_data))
- ERR_RETURN(trans->getNdbError());
- }
- }
- if (execute_no_commit(this,trans) != 0)
- {
- table->status= STATUS_NOT_FOUND;
- DBUG_RETURN(ndb_err(trans));
- }
- // The values have now been fetched from NDB
- unpack_record(new_data);
- table->status= 0;
- DBUG_RETURN(0);
- }
- /*
- Peek to check if a particular row already exists
- */
- int ha_ndbcluster::peek_row(const byte *record)
- {
- NdbConnection *trans= m_active_trans;
- NdbOperation *op;
- THD *thd= current_thd;
- DBUG_ENTER("peek_row");
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
- op->readTuple(lm) != 0)
- ERR_RETURN(trans->getNdbError());
- int res;
- if ((res= set_primary_key_from_record(op, record)))
- ERR_RETURN(trans->getNdbError());
- if (execute_no_commit_ie(this,trans) != 0)
- {
- table->status= STATUS_NOT_FOUND;
- DBUG_RETURN(ndb_err(trans));
- }
- DBUG_RETURN(0);
- }
- /*
- Read one record from NDB using unique secondary index
- */
- int ha_ndbcluster::unique_index_read(const byte *key,
- uint key_len, byte *buf)
- {
- NdbConnection *trans= m_active_trans;
- NdbIndexOperation *op;
- THD *thd= current_thd;
- byte *key_ptr;
- KEY* key_info;
- KEY_PART_INFO *key_part, *end;
- uint i;
- DBUG_ENTER("unique_index_read");
- DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
- DBUG_DUMP("key", (char*)key, key_len);
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbIndexOperation((NDBINDEX *)
- m_index[active_index].unique_index,
- (const NDBTAB *) m_table)) ||
- op->readTuple(lm) != 0)
- ERR_RETURN(trans->getNdbError());
- // Set secondary index key(s)
- key_ptr= (byte *) key;
- key_info= table->key_info + active_index;
- DBUG_ASSERT(key_info->key_length == key_len);
- end= (key_part= key_info->key_part) + key_info->key_parts;
- for (i= 0; key_part != end; key_part++, i++)
- {
- if (set_ndb_key(op, key_part->field,
- m_index[active_index].unique_index_attrid_map[i],
- key_part->null_bit ? key_ptr + 1 : key_ptr))
- ERR_RETURN(trans->getNdbError());
- key_ptr+= key_part->store_length;
- }
- // Get non-index attribute(s)
- for (i= 0; i < table->fields; i++)
- {
- Field *field= table->field[i];
- if ((thd->query_id == field->query_id) ||
- (field->flags & PRI_KEY_FLAG)) // && m_retrieve_primary_key ??
- {
- if (get_ndb_value(op, field, i, buf))
- ERR_RETURN(op->getNdbError());
- }
- else
- {
- // Attribute was not to be read
- m_value[i].ptr= NULL;
- }
- }
- if (execute_no_commit_ie(this,trans) != 0)
- {
- table->status= STATUS_NOT_FOUND;
- DBUG_RETURN(ndb_err(trans));
- }
- // The values have now been fetched from NDB
- unpack_record(buf);
- table->status= 0;
- DBUG_RETURN(0);
- }
- /*
- Get the next record of a started scan. Try to fetch
- it locally from NdbApi cached records if possible,
- otherwise ask NDB for more.
- NOTE
- If this is an update/delete, make sure not to contact
- NDB before any pending ops have been sent to NDB.
- */
- inline int ha_ndbcluster::next_result(byte *buf)
- {
- int check;
- NdbConnection *trans= m_active_trans;
- NdbResultSet *cursor= m_active_cursor;
- DBUG_ENTER("next_result");
- if (!cursor)
- DBUG_RETURN(HA_ERR_END_OF_FILE);
- /*
- If this is an update or delete, call nextResult with false
- to process any records already cached in NdbApi
- */
- bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
- do {
- DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
- /*
- We can only handle one tuple with blobs at a time.
- */
- if (m_ops_pending && m_blobs_pending)
- {
- if (execute_no_commit(this,trans) != 0)
- DBUG_RETURN(ndb_err(trans));
- m_ops_pending= 0;
- m_blobs_pending= FALSE;
- }
- check= cursor->nextResult(contact_ndb, m_force_send);
- if (check == 0)
- {
- // One more record found
- DBUG_PRINT("info", ("One more record found"));
- unpack_record(buf);
- table->status= 0;
- DBUG_RETURN(0);
- }
- else if (check == 1 || check == 2)
- {
- // 1: No more records
- // 2: No more cached records
- /*
- Before fetching more rows and releasing lock(s),
- all pending update or delete operations should
- be sent to NDB
- */
- DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));
- if (m_ops_pending)
- {
- // if (current_thd->transaction.on)
- if (m_transaction_on)
- {
- if (execute_no_commit(this,trans) != 0)
- DBUG_RETURN(ndb_err(trans));
- }
- else
- {
- if (execute_commit(this,trans) != 0)
- DBUG_RETURN(ndb_err(trans));
- int res= trans->restart();
- DBUG_ASSERT(res == 0);
- }
- m_ops_pending= 0;
- }
- contact_ndb= (check == 2);
- }
- } while (check == 2);
- table->status= STATUS_NOT_FOUND;
- if (check == -1)
- DBUG_RETURN(ndb_err(trans));
- // No more records
- DBUG_PRINT("info", ("No more records"));
- DBUG_RETURN(HA_ERR_END_OF_FILE);
- }
- /*
- Set bounds for ordered index scan.
- */
- int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
- const key_range *keys[2])
- {
- const KEY *const key_info= table->key_info + active_index;
- const uint key_parts= key_info->key_parts;
- uint key_tot_len[2];
- uint tot_len;
- uint i, j;
- DBUG_ENTER("set_bounds");
- DBUG_PRINT("info", ("key_parts=%d", key_parts));
- for (j= 0; j <= 1; j++)
- {
- const key_range *key= keys[j];
- if (key != NULL)
- {
- // for key->flag see ha_rkey_function
- DBUG_PRINT("info", ("key %d length=%d flag=%d",
- j, key->length, key->flag));
- key_tot_len[j]= key->length;
- }
- else
- {
- DBUG_PRINT("info", ("key %d not present", j));
- key_tot_len[j]= 0;
- }
- }
- tot_len= 0;
- for (i= 0; i < key_parts; i++)
- {
- KEY_PART_INFO *key_part= &key_info->key_part[i];
- Field *field= key_part->field;
- uint part_len= key_part->length;
- uint part_store_len= key_part->store_length;
- // Info about each key part
- struct part_st {
- bool part_last;
- const key_range *key;
- const byte *part_ptr;
- bool part_null;
- int bound_type;
- const char* bound_ptr;
- };
- struct part_st part[2];
- for (j= 0; j <= 1; j++)
- {
- struct part_st &p = part[j];
- p.key= NULL;
- p.bound_type= -1;
- if (tot_len < key_tot_len[j])
- {
- p.part_last= (tot_len + part_store_len >= key_tot_len[j]);
- p.key= keys[j];
- p.part_ptr= &p.key->key[tot_len];
- p.part_null= key_part->null_bit && *p.part_ptr;
- p.bound_ptr= (const char *)
- p.part_null ? 0 : key_part->null_bit ? p.part_ptr + 1 : p.part_ptr;
- if (j == 0)
- {
- switch (p.key->flag)
- {
- case HA_READ_KEY_EXACT:
- p.bound_type= NdbIndexScanOperation::BoundEQ;
- break;
- case HA_READ_KEY_OR_NEXT:
- p.bound_type= NdbIndexScanOperation::BoundLE;
- break;
- case HA_READ_AFTER_KEY:
- if (! p.part_last)
- p.bound_type= NdbIndexScanOperation::BoundLE;
- else
- p.bound_type= NdbIndexScanOperation::BoundLT;
- break;
- default:
- break;
- }
- }
- if (j == 1) {
- switch (p.key->flag)
- {
- case HA_READ_BEFORE_KEY:
- if (! p.part_last)
- p.bound_type= NdbIndexScanOperation::BoundGE;
- else
- p.bound_type= NdbIndexScanOperation::BoundGT;
- break;
- case HA_READ_AFTER_KEY: // weird
- p.bound_type= NdbIndexScanOperation::BoundGE;
- break;
- default:
- break;
- }
- }
- if (p.bound_type == -1)
- {
- DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag));
- DBUG_ASSERT(false);
- // Stop setting bounds but continue with what we have
- DBUG_RETURN(0);
- }
- }
- }
- // Seen with e.g. b = 1 and c > 1
- if (part[0].bound_type == NdbIndexScanOperation::BoundLE &&
- part[1].bound_type == NdbIndexScanOperation::BoundGE &&
- memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
- {
- DBUG_PRINT("info", ("replace LE/GE pair by EQ"));
- part[0].bound_type= NdbIndexScanOperation::BoundEQ;
- part[1].bound_type= -1;
- }
- // Not seen but was in previous version
- if (part[0].bound_type == NdbIndexScanOperation::BoundEQ &&
- part[1].bound_type == NdbIndexScanOperation::BoundGE &&
- memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
- {
- DBUG_PRINT("info", ("remove GE from EQ/GE pair"));
- part[1].bound_type= -1;
- }
- for (j= 0; j <= 1; j++)
- {
- struct part_st &p = part[j];
- // Set bound if not done with this key
- if (p.key != NULL)
- {
- DBUG_PRINT("info", ("key %d:%d offset=%d length=%d last=%d bound=%d",
- j, i, tot_len, part_len, p.part_last, p.bound_type));
- DBUG_DUMP("info", (const char*)p.part_ptr, part_store_len);
- // Set bound if not cancelled via type -1
- if (p.bound_type != -1)
- {
- if (op->setBound(i, p.bound_type, p.bound_ptr))
- ERR_RETURN(op->getNdbError());
- }
- }
- }
- tot_len+= part_store_len;
- }
- DBUG_RETURN(0);
- }
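- /*
-   Note on the bound types used above: NDB names them from the bound
-   value's point of view, so BoundLE/BoundLT (bound <=/< column) start
-   a range and BoundGE/BoundGT end it. A range such as "a >= 1 AND
-   a < 10" thus becomes (sketch) setBound(0, BoundLE, &one) for the
-   start key and setBound(0, BoundGT, &ten) for the end key.
- */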
- inline
- int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
- {
- uint i;
- THD *thd= current_thd;
- NdbConnection *trans= m_active_trans;
- DBUG_ENTER("define_read_attrs");
- // Define attributes to read
- for (i= 0; i < table->fields; i++)
- {
- Field *field= table->field[i];
- if ((thd->query_id == field->query_id) ||
- (field->flags & PRI_KEY_FLAG) ||
- m_retrieve_all_fields)
- {
- if (get_ndb_value(op, field, i, buf))
- ERR_RETURN(op->getNdbError());
- }
- else
- {
- m_value[i].ptr= NULL;
- }
- }
- if (table->primary_key == MAX_KEY)
- {
- DBUG_PRINT("info", ("Getting hidden key"));
- // Scanning table with no primary key
- int hidden_no= table->fields;
- #ifndef DBUG_OFF
- const NDBTAB *tab= (const NDBTAB *) m_table;
- if (!tab->getColumn(hidden_no))
- DBUG_RETURN(1);
- #endif
- if (get_ndb_value(op, NULL, hidden_no, NULL))
- ERR_RETURN(op->getNdbError());
- }
- if (execute_no_commit(this,trans) != 0)
- DBUG_RETURN(ndb_err(trans));
- DBUG_PRINT("exit", ("Scan started successfully"));
- DBUG_RETURN(next_result(buf));
- }
- /*
- Start ordered index scan in NDB
- */
- int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
- const key_range *end_key,
- bool sorted, byte* buf)
- {
- bool restart;
- NdbConnection *trans= m_active_trans;
- NdbResultSet *cursor;
- NdbIndexScanOperation *op;
- DBUG_ENTER("ordered_index_scan");
- DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));
- DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
- // Check that sorted seems to be initialised
- DBUG_ASSERT(sorted == 0 || sorted == 1);
- if (m_active_cursor == 0)
- {
- restart= false;
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
- m_index[active_index].index,
- (const NDBTAB *) m_table)) ||
- !(cursor= op->readTuples(lm, 0, parallelism, sorted)))
- ERR_RETURN(trans->getNdbError());
- m_active_cursor= cursor;
- } else {
- restart= true;
- op= (NdbIndexScanOperation*)m_active_cursor->getOperation();
- DBUG_ASSERT(op->getSorted() == sorted);
- DBUG_ASSERT(op->getLockMode() ==
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
- if(op->reset_bounds(m_force_send))
- DBUG_RETURN(ndb_err(m_active_trans));
- }
- {
- const key_range *keys[2]= { start_key, end_key };
- int ret= set_bounds(op, keys);
- if (ret)
- DBUG_RETURN(ret);
- }
- if (!restart)
- {
- DBUG_RETURN(define_read_attrs(buf, op));
- }
- else
- {
- if (execute_no_commit(this,trans) != 0)
- DBUG_RETURN(ndb_err(trans));
- DBUG_RETURN(next_result(buf));
- }
- }
- /*
- Start a filtered scan in NDB.
- NOTE
- This function is here as an example of how to start a
- filtered scan. It should be possible to replace full_table_scan
- with this function and make a best effort attempt
- at filtering out the irrelevant data by converting the "items"
- into interpreted instructions.
- This would speed up table scans where there is a limiting WHERE clause
- that doesn't match any index in the table.
- */
- int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
- byte *buf,
- enum ha_rkey_function find_flag)
- {
- NdbConnection *trans= m_active_trans;
- NdbResultSet *cursor;
- NdbScanOperation *op;
- DBUG_ENTER("filtered_scan");
- DBUG_PRINT("enter", ("key_len: %u, index: %u",
- key_len, active_index));
- DBUG_DUMP("key", (char*)key, key_len);
- DBUG_PRINT("info", ("Starting a new filtered scan on %s",
- m_tabname));
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op= trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
- !(cursor= op->readTuples(lm, 0, parallelism)))
- ERR_RETURN(trans->getNdbError());
- m_active_cursor= cursor;
- {
- // Start scan filter
- NdbScanFilter sf(op);
- sf.begin();
- // Set filter using the supplied key data
- byte *key_ptr= (byte *) key;
- uint tot_len= 0;
- KEY* key_info= table->key_info + active_index;
- for (uint k= 0; k < key_info->key_parts; k++)
- {
- KEY_PART_INFO* key_part= key_info->key_part+k;
- Field* field= key_part->field;
- uint ndb_fieldnr= key_part->fieldnr-1;
- DBUG_PRINT("key_part", ("fieldnr: %d", ndb_fieldnr));
- //const NDBCOL *col= ((const NDBTAB *) m_table)->getColumn(ndb_fieldnr);
- uint32 field_len= field->pack_length();
- DBUG_DUMP("key", (char*)key, field_len);
- DBUG_PRINT("info", ("Column %s, type: %d, len: %d",
- field->field_name, field->real_type(), field_len));
- // Define scan filter
- if (field->real_type() == MYSQL_TYPE_STRING)
- sf.eq(ndb_fieldnr, key_ptr, field_len);
- else
- {
- if (field_len == 8)
- sf.eq(ndb_fieldnr, (Uint64)*key_ptr);
- else if (field_len <= 4)
- sf.eq(ndb_fieldnr, (Uint32)*key_ptr);
- else
- DBUG_RETURN(1);
- }
- key_ptr += field_len;
- tot_len += field_len;
- if (tot_len >= key_len)
- break;
- }
- // End scan filter
- sf.end();
- }
- DBUG_RETURN(define_read_attrs(buf, op));
- }
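- // E.g. an equality predicate "a = 7" on a 4-byte column would become
- // sf.eq(a_fieldnr, (Uint32)7) above, so rows are filtered inside NDB
- // rather than after being returned to the MySQL layer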
- /*
- Start full table scan in NDB
- */
- int ha_ndbcluster::full_table_scan(byte *buf)
- {
- uint i;
- NdbResultSet *cursor;
- NdbScanOperation *op;
- NdbConnection *trans= m_active_trans;
- DBUG_ENTER("full_table_scan");
- DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
- NdbOperation::LockMode lm=
- (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
- if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
- !(cursor= op->readTuples(lm, 0, parallelism)))
- ERR_RETURN(trans->getNdbError());
- m_active_cursor= cursor;
- DBUG_RETURN(define_read_attrs(buf, op));
- }
- /*
- Insert one record into NDB
- */
- int ha_ndbcluster::write_row(byte *record)
- {
- bool has_auto_increment;
- uint i;
- NdbConnection *trans= m_active_trans;
- NdbOperation *op;
- int res;
- DBUG_ENTER("write_row");
- if(m_ignore_dup_key && table->primary_key != MAX_KEY)
- {
- int peek_res= peek_row(record);
- if (!peek_res)
- {
- m_dupkey= table->primary_key;
- DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
- }
- if (peek_res != HA_ERR_KEY_NOT_FOUND)
- DBUG_RETURN(peek_res);
- }
- statistic_increment(ha_write_count,&LOCK_status);
- if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
- table->timestamp_field->set_time();
- has_auto_increment= (table->next_number_field && record == table->record[0]);
- if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)))
- ERR_RETURN(trans->getNdbError());
- res= (m_use_write) ? op->writeTuple() :op->insertTuple();
- if (res != 0)
- ERR_RETURN(trans->getNdbError());
- if (table->primary_key == MAX_KEY)
- {
- // Table has hidden primary key
- Ndb *ndb= get_ndb();
- Uint64 auto_value= NDB_FAILED_AUTO_INCREMENT;
- uint retries= NDB_AUTO_INCREMENT_RETRIES;
- do {
- auto_value= ndb->getAutoIncrementValue((const NDBTAB *) m_table);
- } while (auto_value == NDB_FAILED_AUTO_INCREMENT &&
- --retries &&
- ndb->getNdbError().status == NdbError::TemporaryError);
- if (auto_value == NDB_FAILED_AUTO_INCREMENT)
- ERR_RETURN(ndb->getNdbError());
- if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
- ERR_RETURN(op->getNdbError());
- }
- else
- {
- int res;
- if (has_auto_increment)
- {
- m_skip_auto_increment= FALSE;
- update_auto_increment();
- m_skip_auto_increment= !auto_increment_column_changed;
- }
- if ((res= set_primary_key_from_record(op, record)))
- return res;
- }
- // Set non-key attribute(s)
- bool set_blob_value= FALSE;
- for (i= 0; i < table->fields; i++)
- {
- Field *field= table->field[i];
- if (!(field->flags & PRI_KEY_FLAG) &&
- set_ndb_value(op, field, i, &set_blob_value))
- {
- m_skip_auto_increment= TRUE;
- ERR_RETURN(op->getNdbError());
- }
- }
- /*
- Execute write operation
- NOTE When doing inserts with many values in
- each INSERT statement it should not be necessary
- to NoCommit the transaction between each row.
- Find out how this is detected!
- */
- m_rows_inserted++;
- no_uncommitted_rows_update(1);
- m_bulk_insert_not_flushed= TRUE;
- if ((m_rows_to_insert == (ha_rows) 1) ||
- ((m_rows_inserted % m_bulk_insert_rows) == 0) ||
- m_primary_key_update ||
- set_blob_value)
- {
- THD *thd= current_thd;
- // Send rows to NDB
- DBUG_PRINT("info", ("Sending inserts to NDB, "
- "rows_inserted:%d, bulk_insert_rows: %d",
- (int)m_rows_inserted, (int)m_bulk_insert_rows));
- m_bulk_insert_not_flushed= FALSE;
- // if (thd->transaction.on)
- if (m_transaction_on)
- {
- if (execute_no_commit(this,trans) != 0)
- {
- m_skip_auto_increment= TRUE;
- no_uncommitted_rows_execute_failure();
- DBUG_RETURN(ndb_err(trans));
- }
- }
- else
- {
- if (execute_commit(this,trans) != 0)
- {
- m_skip_auto_increment= TRUE;
- no_uncommitted_rows_execute_failure();
- DBUG_RETURN(ndb_err(trans));
- }
- int res= trans->restart();
- DBUG_ASSERT(res == 0);
- }
- }
- if ((has_auto_increment) && (m_skip_auto_increment))
- {
- Ndb *ndb= get_ndb();
- Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
- DBUG_PRINT("info",
- ("Trying to set next auto increment value to %lu",
- (ulong) next_val));
- if (ndb->setAutoIncrementValue((const NDBTAB *) m_table, next_val, TRUE))
- DBUG_PRINT("info",
- ("Setting next auto increment value to %u", next_val));
- }
- m_skip_auto_increment= TRUE;
- DBUG_RETURN(0);
- }
- /* Compare if a key in a row has changed */
- int ha_ndbcluster::key_cmp(uint keynr, const byte * old_row,
- const byte * new_row)
- {
- KEY_PART_INFO *key_part=table->key_info[keynr].key_part;
- KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts;
- for (; key_part != end ; key_part++)
- {
- if (key_part->null_bit)
- {
- if ((old_row[key_part->null_offset] & key_part->null_bit) !=
- (new_row[key_part->null_offset] & key_part->null_bit))
- return 1;
- }
- if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH))
- {
- if (key_part->field->cmp_binary((char*) (old_row + key_part->offset),
- (char*) (new_row + key_part->offset),
- (ulong) key_part->length))
- return 1;
- }
- else
- {
- if (memcmp(old_row+key_part->offset, new_row+key_part->offset,
- key_part->length))
- return 1;
- }
- }
- return 0;
- }
- /*
- Update one record in NDB using primary key
- */
- int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
- {
- THD *thd= current_thd;
- NdbConnection *trans= m_active_trans;
- NdbResultSet* cursor= m_active_cursor;
- NdbOperation *op;
- uint i;
- DBUG_ENTER("update_row");
- statistic_increment(ha_update_count,&LOCK_status);
- if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
- {
- table->timestamp_field->set_time();
- // Set query_id so that field is really updated
- table->timestamp_field->query_id= thd->query_id;
- }
- /* Check for update of primary key for special handling */
- if ((table->primary_key != MAX_KEY) &&
- (key_cmp(table->primary_key, old_data, new_data)))
- {
- int read_res, insert_res, delete_res, undo_res;
- DBUG_PRINT("info", ("primary key update, doing pk read+delete+insert"));
- // Get all old fields, since we optimize away fields not in query
- read_res= complemented_pk_read(old_data, new_data);
- if (read_res)
- {
- DBUG_PRINT("info", ("pk read failed"));
- DBUG_RETURN(read_res);
- }
- // Delete old row
- m_primary_key_update= TRUE;
- delete_res= delete_row(old_data);
- m_primary_key_update= FALSE;
- if (delete_res)
- {
- DBUG_PRINT("info", ("delete failed"));
- DBUG_RETURN(delete_res);
- }
- // Insert new row
- DBUG_PRINT("info", ("delete succeded"));
- m_primary_key_update= TRUE;
- insert_res= write_row(new_data);
- m_primary_key_update= FALSE;
- if (insert_res)
- {
- DBUG_PRINT("info", ("insert failed"));
- if (trans->commitStatus() == NdbConnection::Started)
- {
- // Undo delete_row(old_data)
- m_primary_key_update= TRUE;
- undo_res= write_row((byte *)old_data);
- if (undo_res)
- push_warning(current_thd,
- MYSQL_ERROR::WARN_LEVEL_WARN,
- undo_res,
- "NDB failed undoing delete at primary key update");
- m_primary_key_update= FALSE;
- }
- DBUG_RETURN(insert_res);
- }
- DBUG_PRINT("info", ("delete+insert succeeded"));
- DBUG_RETURN(0);
- }
- if (cursor)
- {
- /*
- We are scanning records and want to update the record
- that was just found: call updateTuple on the cursor
- to take over the lock to a new update operation,
- which also sets the primary key of the operation from
- the active record in the cursor
- */
- DBUG_PRINT("info", ("Calling updateTuple on cursor"));
- if (!(op= cursor->updateTuple()))
- ERR_RETURN(trans->getNdbError());
- m_ops_pending++;
- if (uses_blob_value(FALSE))
- m_blobs_pending= TRUE;
- }
- else
- {
- if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
- op->updateTuple() != 0)
- ERR_RETURN(trans->getNdbError());
- if (table->primary_key == MAX_KEY)
- {
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
- // Require that the PK for this record has previously been
- // read into m_value
- uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields].rec;
- DBUG_ASSERT(rec);
- DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
- if (set_hidden_key(op, no_fields, rec->aRef()))
- ERR_RETURN(op->getNdbError());
- }
- else
- {
- int res;
- if ((res= set_primary_key_from_record(op, old_data)))
- DBUG_RETURN(res);
- }
- }
- // Set non-key attribute(s)
- for (i= 0; i < table->fields; i++)
- {
- Field *field= table->field[i];
- if (((thd->query_id == field->query_id) || m_retrieve_all_fields) &&
- (!(field->flags & PRI_KEY_FLAG)) &&
- set_ndb_value(op, field, i))
- ERR_RETURN(op->getNdbError());
- }
- // Execute update operation
- if (!cursor && execute_no_commit(this,trans) != 0) {
- no_uncommitted_rows_execute_failure();
- DBUG_RETURN(ndb_err(trans));
- }
- DBUG_RETURN(0);
- }
- /*
- Delete one record from NDB, using primary key
- */
- int ha_ndbcluster::delete_row(const byte *record)
- {
- NdbConnection *trans= m_active_trans;
- NdbResultSet* cursor= m_active_cursor;
- NdbOperation *op;
- DBUG_ENTER("delete_row");
- statistic_increment(ha_delete_count,&LOCK_status);
- if (cursor)
- {
- /*
- We are scanning records and want to delete the record
- that was just found: call deleteTuple on the cursor
- to take over the lock to a new delete operation,
- which also sets the primary key of the operation from
- the active record in the cursor
- */
- DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
- if (cursor->deleteTuple() != 0)
- ERR_RETURN(trans->getNdbError());
- m_ops_pending++;
- no_uncommitted_rows_update(-1);
- if (!m_primary_key_update)
- // If deleting from cursor, NoCommit will be handled in next_result
- DBUG_RETURN(0);
- }
- else
- {
- if (!(op=trans->getNdbOperation((const NDBTAB *) m_table)) ||
- op->deleteTuple() != 0)
- ERR_RETURN(trans->getNdbError());
- no_uncommitted_rows_update(-1);
- if (table->primary_key == MAX_KEY)
- {
- // This table has no primary key, use "hidden" primary key
- DBUG_PRINT("info", ("Using hidden key"));
- uint no_fields= table->fields;
- NdbRecAttr* rec= m_value[no_fields].rec;
- DBUG_ASSERT(rec != NULL);
- if (set_hidden_key(op, no_fields, rec->aRef()))
- ERR_RETURN(op->getNdbError());
- }
- else
- {
- int res;
- if ((res= set_primary_key_from_record(op, record)))
- return res;
- }
- }
- // Execute delete operation
- if (execute_no_commit(this,trans) != 0) {
- no_uncommitted_rows_execute_failure();
- DBUG_RETURN(ndb_err(trans));
- }
- DBUG_RETURN(0);
- }
- /*
- Unpack a record read from NDB
- SYNOPSIS
- unpack_record()
- buf Buffer to store read row
- NOTE
- The data for each row is read directly into the
- destination buffer. This function is primarily
- called in order to check if any fields should be
- set to null.
- */
- void ha_ndbcluster::unpack_record(byte* buf)
- {
- uint row_offset= (uint) (buf - table->record[0]);
- Field **field, **end;
- NdbValue *value= m_value;
- DBUG_ENTER("unpack_record");
- // Set null flag(s)
- bzero(buf, table->null_bytes);
- for (field= table->field, end= field+table->fields;
- field < end;
- field++, value++)
- {
- if ((*value).ptr)
- {
- if (! ((*field)->flags & BLOB_FLAG))
- {
- if ((*value).rec->isNULL())
- (*field)->set_null(row_offset);
- }
- else
- {
- NdbBlob* ndb_blob= (*value).blob;
- bool isNull= TRUE;
- int ret= ndb_blob->getNull(isNull);
- DBUG_ASSERT(ret == 0);
- if (isNull)
- (*field)->set_null(row_offset);
- }
- }
- }
- #ifndef DBUG_OFF
- // Read and print all values that were fetched
- if (table->primary_key == MAX_KEY)
- {
- // Table with hidden primary key
- int hidden_no= table->fields;
- const NDBTAB *tab= (const NDBTAB *) m_table;
- const NDBCOL *hidden_col= tab->getColumn(hidden_no);
- NdbRecAttr* rec= m_value[hidden_no].rec;
- DBUG_ASSERT(rec);
- DBUG_PRINT("hidden", ("%d: %s "%llu"", hidden_no,
- hidden_col->getName(), rec->u_64_value()));
- }
- print_results();
- #endif
- DBUG_VOID_RETURN;
- }
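- /*
-   Why unpack_record barely touches the data: when the read operation
-   is defined, each selected column is bound to the MySQL record
-   buffer with getValue, so NDB writes the value in place and only the
-   null indicators remain to be set. A sketch (hypothetical names):
- */
- #if 0
- // At operation definition time, bind the destination buffer:
- NdbRecAttr* rec= op->getValue("ATTR2", (char*) field->ptr);
- // After execute(), field->ptr already holds the fetched value;
- // only rec->isNULL() needs to be consulted to set the null bit.
- #endif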
- /*
- Utility function to print/dump the fetched fields
- */
- void ha_ndbcluster::print_results()
- {
- const NDBTAB *tab= (const NDBTAB*) m_table;
- DBUG_ENTER("print_results");
- #ifndef DBUG_OFF
- if (!_db_on_)
- DBUG_VOID_RETURN;
- for (uint f=0; f<table->fields;f++)
- {
- Field *field;
- const NDBCOL *col;
- NdbValue value;
- if (!(value= m_value[f]).ptr)
- {
- fprintf(DBUG_FILE, "Field %d was not readn", f);
- continue;
- }
- field= table->field[f];
- DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
- col= tab->getColumn(f);
- fprintf(DBUG_FILE, "%d: %st", f, col->getName());
- NdbBlob *ndb_blob= NULL;
- if (! (field->flags & BLOB_FLAG))
- {
- if (value.rec->isNULL())
- {
- fprintf(DBUG_FILE, "NULLn");
- continue;
- }
- }
- else
- {
- ndb_blob= value.blob;
- bool isNull= TRUE;
- ndb_blob->getNull(isNull);
- if (isNull) {
- fprintf(DBUG_FILE, "NULLn");
- continue;
- }
- }
- switch (col->getType()) {
- case NdbDictionary::Column::Tinyint: {
- char value= *field->ptr;
- fprintf(DBUG_FILE, "Tinyint\t%d", value);
- break;
- }
- case NdbDictionary::Column::Tinyunsigned: {
- unsigned char value= *field->ptr;
- fprintf(DBUG_FILE, "Tinyunsigned\t%u", value);
- break;
- }
- case NdbDictionary::Column::Smallint: {
- short value= sint2korr(field->ptr); // read both bytes, not just the first
- fprintf(DBUG_FILE, "Smallint\t%d", value);
- break;
- }
- case NdbDictionary::Column::Smallunsigned: {
- unsigned short value= uint2korr(field->ptr);
- fprintf(DBUG_FILE, "Smallunsigned\t%u", value);
- break;
- }
- case NdbDictionary::Column::Mediumint: {
- long value= sint3korr(field->ptr);
- fprintf(DBUG_FILE, "Mediumint\t%ld", value);
- break;
- }
- case NdbDictionary::Column::Mediumunsigned: {
- ulong value= uint3korr(field->ptr);
- fprintf(DBUG_FILE, "Mediumunsigned\t%lu", value);
- break;
- }
- case NdbDictionary::Column::Int: {
- fprintf(DBUG_FILE, "Int\t%lld", field->val_int());
- break;
- }
- case NdbDictionary::Column::Unsigned: {
- Uint32 value= uint4korr(field->ptr);
- fprintf(DBUG_FILE, "Unsigned\t%u", value);
- break;
- }
- case NdbDictionary::Column::Bigint: {
- Int64 value= sint8korr(field->ptr);
- fprintf(DBUG_FILE, "Bigint\t%lld", value);
- break;
- }
- case NdbDictionary::Column::Bigunsigned: {
- Uint64 value= uint8korr(field->ptr);
- fprintf(DBUG_FILE, "Bigunsigned\t%llu", value);
- break;
- }
- case NdbDictionary::Column::Float: {
- float value;
- float4get(value, field->ptr);
- fprintf(DBUG_FILE, "Float\t%f", (double) value);
- break;
- }
- case NdbDictionary::Column::Double: {
- double value;
- float8get(value, field->ptr);
- fprintf(DBUG_FILE, "Double\t%f", value);
- break;
- }
- case NdbDictionary::Column::Olddecimal: {
- char *value= field->ptr;
- fprintf(DBUG_FILE, "Olddecimal\t'%-*s'", field->pack_length(), value);
- break;
- }
- case NdbDictionary::Column::Olddecimalunsigned: {
- char *value= field->ptr;
- fprintf(DBUG_FILE, "Olddecimalunsigned\t'%-*s'", field->pack_length(), value);
- break;
- }
- case NdbDictionary::Column::Char: {
- const char *value= (char *) field->ptr;
- fprintf(DBUG_FILE, "Char\t'%.*s'", field->pack_length(), value);
- break;
- }
- case NdbDictionary::Column::Varchar:
- case NdbDictionary::Column::Binary:
- case NdbDictionary::Column::Varbinary: {
- const char *value= (char *) field->ptr;
- fprintf(DBUG_FILE, "Var\t'%.*s'", field->pack_length(), value);
- break;
- }
- case NdbDictionary::Column::Datetime: {
- // Print the interpreted value; the raw bytes are not a plain integer
- fprintf(DBUG_FILE, "Datetime\t%llu", (Uint64) field->val_int());
- break;
- }
- case NdbDictionary::Column::Date: {
- fprintf(DBUG_FILE, "Date\t%llu", (Uint64) field->val_int());
- break;
- }
- case NdbDictionary::Column::Time: {
- fprintf(DBUG_FILE, "Time\t%llu", (Uint64) field->val_int());
- break;
- }
- case NdbDictionary::Column::Blob: {
- Uint64 len= 0;
- ndb_blob->getLength(len);
- fprintf(DBUG_FILE, "Blob\t[len=%u]", (unsigned) len);
- break;
- }
- case NdbDictionary::Column::Text: {
- Uint64 len= 0;
- ndb_blob->getLength(len);
- fprintf(DBUG_FILE, "Text\t[len=%u]", (unsigned) len);
- break;
- }
- case NdbDictionary::Column::Undefined:
- default:
- fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
- break;
- }
- fprintf(DBUG_FILE, "\n");
- }
- #endif
- DBUG_VOID_RETURN;
- }
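- /*
-   Note: the output of print_results only appears in debug builds with
-   the DBUG trace enabled, e.g. by starting the server with a switch
-   like --debug=d:t:o,/tmp/mysqld.trace (trace file path hypothetical).
- */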
- int ha_ndbcluster::index_init(uint index)
- {
- DBUG_ENTER("index_init");
- DBUG_PRINT("enter", ("index: %u", index));
- DBUG_RETURN(handler::index_init(index));
- }
- int ha_ndbcluster::index_end()
- {
- DBUG_ENTER("index_end");
- DBUG_RETURN(close_scan());
- }
- /**
- * Check if key contains null
- */
- static
- int
- check_null_in_key(const KEY* key_info, const byte *key, uint key_len)
- {
- KEY_PART_INFO *curr_part, *end_part;
- const byte* end_ptr = key + key_len;
- curr_part= key_info->key_part;
- end_part= curr_part + key_info->key_parts;
- for (; curr_part != end_part && key < end_ptr; curr_part++)
- {
- if(curr_part->null_bit && *key)
- return 1;
- key += curr_part->store_length;
- }
- return 0;
- }
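- /*
-   Key buffer layout assumed by check_null_in_key: a nullable key part
-   is stored as one null-indicator byte followed by the value, taking
-   store_length bytes in total. For UNIQUE(a INT), a key for a=5 looks
-   like [0x00][05 00 00 00] while a key for a IS NULL starts with
-   [0x01]; the latter makes the function return 1, which below forces
-   index_read to fall back to an ordered index scan instead of a
-   unique index lookup.
- */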
- int ha_ndbcluster::index_read(byte *buf,
- const byte *key, uint key_len,
- enum ha_rkey_function find_flag)
- {
- DBUG_ENTER("index_read");
- DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d",
- active_index, key_len, find_flag));
- int error;
- ndb_index_type type = get_index_type(active_index);
- const KEY* key_info = table->key_info+active_index;
- switch (type){
- case PRIMARY_KEY_ORDERED_INDEX:
- case PRIMARY_KEY_INDEX:
- if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len)
- {
- if(m_active_cursor && (error= close_scan()))
- DBUG_RETURN(error);
- DBUG_RETURN(pk_read(key, key_len, buf));
- }
- else if (type == PRIMARY_KEY_INDEX)
- {
- DBUG_RETURN(1);
- }
- break;
- case UNIQUE_ORDERED_INDEX:
- case UNIQUE_INDEX:
- if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len &&
- !check_null_in_key(key_info, key, key_len))
- {
- if(m_active_cursor && (error= close_scan()))
- DBUG_RETURN(error);
- DBUG_RETURN(unique_index_read(key, key_len, buf));
- }
- else if (type == UNIQUE_INDEX)
- {
- DBUG_RETURN(1);
- }
- break;
- case ORDERED_INDEX:
- break;
- default:
- case UNDEFINED_INDEX:
- DBUG_ASSERT(FALSE);
- DBUG_RETURN(1);
- break;
- }
- key_range start_key;
- start_key.key = key;
- start_key.length = key_len;
- start_key.flag = find_flag;
- error= ordered_index_scan(&start_key, 0, TRUE, buf);
- DBUG_RETURN(error == HA_ERR_END_OF_FILE ? HA_ERR_KEY_NOT_FOUND : error);
- }
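- /*
-   A sketch of what ordered_index_scan does with the key_range built
-   above: the find_flag is translated into bounds on an
-   NdbIndexScanOperation. Not part of the handler; the index/table
-   names and the start_key variable are hypothetical.
- */
- #if 0
- NdbIndexScanOperation* op=
-   trans->getNdbIndexScanOperation("my_index", "my_table");
- NdbResultSet* cursor= op->readTuples(NdbScanOperation::LM_Read, 0,
-                                      parallelism, TRUE /* sorted */);
- // HA_READ_KEY_EXACT   -> BoundEQ  (bound == column)
- // HA_READ_KEY_OR_NEXT -> BoundLE  (bound <= column, inclusive lower bound)
- // HA_READ_AFTER_KEY   -> BoundLT  (bound <  column, exclusive lower bound)
- op->setBound("a", NdbIndexScanOperation::BoundEQ,
-              start_key->key, start_key->length);
- #endif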
- int ha_ndbcluster::index_read_idx(byte *buf, uint index_no,
- const byte *key, uint key_len,
- enum ha_rkey_function find_flag)
- {
- statistic_increment(ha_read_key_count,&LOCK_status);
- DBUG_ENTER("index_read_idx");
- DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));
- index_init(index_no);
- DBUG_RETURN(index_read(buf, key, key_len, find_flag));
- }
- int ha_ndbcluster::index_next(byte *buf)
- {
- DBUG_ENTER("index_next");
- statistic_increment(ha_read_next_count,&LOCK_status);
- DBUG_RETURN(next_result(buf));
- }
- int ha_ndbcluster::index_prev(byte *buf)
- {
- DBUG_ENTER("index_prev");
- statistic_increment(ha_read_prev_count,&LOCK_status);
- // Reading the previous row is not supported; ordered index scans only go forward
- DBUG_RETURN(1);
- }
- int ha_ndbcluster::index_first(byte *buf)
- {
- DBUG_ENTER("index_first");
- statistic_increment(ha_read_first_count,&LOCK_status);
- // Start the ordered index scan and fetch the first row;
- // index_first is only called for indexes with the HA_READ_ORDER flag
- DBUG_RETURN(ordered_index_scan(0, 0, TRUE, buf));
- }
- int ha_ndbcluster::index_last(byte *buf)
- {
- DBUG_ENTER("index_last");
- statistic_increment(ha_read_last_count,&LOCK_status);
- int res;
- // There is no backward scan; scan in index order and keep the last row
- if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){
- NdbResultSet *cursor= m_active_cursor;
- // Iterate to the end of the scan; buf retains the last fetched row
- while((res= cursor->nextResult(TRUE, m_force_send)) == 0);
- if(res == 1){
- unpack_record(buf);
- table->status= 0;
- DBUG_RETURN(0);
- }
- }