ha_ndbcluster.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:136k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1.  /* Copyright (C) 2000-2003 MySQL AB
  2.   This program is free software; you can redistribute it and/or modify
  3.   it under the terms of the GNU General Public License as published by
  4.   the Free Software Foundation; either version 2 of the License, or
  5.   (at your option) any later version.
  6.   This program is distributed in the hope that it will be useful,
  7.   but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.   GNU General Public License for more details.
  10.   You should have received a copy of the GNU General Public License
  11.   along with this program; if not, write to the Free Software
  12.   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
  13. */
  14. /*
  15.   This file defines the NDB Cluster handler: the interface between MySQL and
  16.   NDB Cluster
  17. */
  18. #ifdef USE_PRAGMA_IMPLEMENTATION
  19. #pragma implementation // gcc: Class implementation
  20. #endif
  21. #include "mysql_priv.h"
  22. #ifdef HAVE_NDBCLUSTER_DB
  23. #include <my_dir.h>
  24. #include "ha_ndbcluster.h"
  25. #include <ndbapi/NdbApi.hpp>
  26. #include <ndbapi/NdbScanFilter.hpp>
  27. // options from from mysqld.cc
  28. extern my_bool opt_ndb_optimized_node_selection;
  29. extern const char *opt_ndbcluster_connectstring;
  30. // Default value for parallelism
  31. static const int parallelism= 240;
  32. // Default value for max number of transactions
  33. // createable against NDB from this handler
  34. static const int max_transactions= 256;
  35. static const char *ha_ndb_ext=".ndb";
  36. #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
  37. #define NDB_FAILED_AUTO_INCREMENT ~(Uint64)0
  38. #define NDB_AUTO_INCREMENT_RETRIES 10
  39. #define NDB_INVALID_SCHEMA_OBJECT 241
  40. #define ERR_PRINT(err) 
  41.   DBUG_PRINT("error", ("%d  message: %s", err.code, err.message))
  42. #define ERR_RETURN(err)          
  43. {          
  44.   ERR_PRINT(err);          
  45.   DBUG_RETURN(ndb_to_mysql_error(&err)); 
  46. }
  47. // Typedefs for long names
  48. typedef NdbDictionary::Column NDBCOL;
  49. typedef NdbDictionary::Table NDBTAB;
  50. typedef NdbDictionary::Index  NDBINDEX;
  51. typedef NdbDictionary::Dictionary  NDBDICT;
  52. bool ndbcluster_inited= FALSE;
  53. static Ndb* g_ndb= NULL;
  54. static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
  55. // Handler synchronization
  56. pthread_mutex_t ndbcluster_mutex;
  57. // Table lock handling
  58. static HASH ndbcluster_open_tables;
  59. static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
  60.                                 my_bool not_used __attribute__((unused)));
  61. static NDB_SHARE *get_share(const char *table_name);
  62. static void free_share(NDB_SHARE *share);
  63. static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
  64. static int unpackfrm(const void **data, uint *len,
  65.      const void* pack_data);
  66. static int ndb_get_table_statistics(Ndb*, const char *, 
  67.     Uint64* rows, Uint64* commits);
  68. /*
  69.   Dummy buffer to read zero pack_length fields
  70.   which are mapped to 1 char
  71. */
  72. static byte dummy_buf[1];
  73. /*
  74.   Error handling functions
  75. */
  76. struct err_code_mapping
  77. {
  78.   int ndb_err;
  79.   int my_err;
  80.   int show_warning;
  81. };
  82. static const err_code_mapping err_map[]= 
  83. {
  84.   { 626, HA_ERR_KEY_NOT_FOUND, 0 },
  85.   { 630, HA_ERR_FOUND_DUPP_KEY, 0 },
  86.   { 893, HA_ERR_FOUND_DUPP_KEY, 0 },
  87.   { 721, HA_ERR_TABLE_EXIST, 1 },
  88.   { 4244, HA_ERR_TABLE_EXIST, 1 },
  89.   { 709, HA_ERR_NO_SUCH_TABLE, 1 },
  90.   { 266, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
  91.   { 274, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
  92.   { 296, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
  93.   { 297, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
  94.   { 237, HA_ERR_LOCK_WAIT_TIMEOUT, 1 },
  95.   { 623, HA_ERR_RECORD_FILE_FULL, 1 },
  96.   { 624, HA_ERR_RECORD_FILE_FULL, 1 },
  97.   { 625, HA_ERR_RECORD_FILE_FULL, 1 },
  98.   { 826, HA_ERR_RECORD_FILE_FULL, 1 },
  99.   { 827, HA_ERR_RECORD_FILE_FULL, 1 },
  100.   { 832, HA_ERR_RECORD_FILE_FULL, 1 },
  101.   { 0, 1, 0 },
  102.   { -1, -1, 1 }
  103. };
  104. static int ndb_to_mysql_error(const NdbError *err)
  105. {
  106.   uint i;
  107.   for (i=0; err_map[i].ndb_err != err->code && err_map[i].my_err != -1; i++);
  108.   if (err_map[i].show_warning)
  109.   {
  110.     // Push the NDB error message as warning
  111.     push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
  112. ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
  113. err->code, err->message, "NDB");
  114.   }
  115.   if (err_map[i].my_err == -1)
  116.     return err->code;
  117.   return err_map[i].my_err;
  118. }
  119. inline
  120. int execute_no_commit(ha_ndbcluster *h, NdbConnection *trans)
  121. {
  122.   int m_batch_execute= 0;
  123. #ifdef NOT_USED
  124.   if (m_batch_execute)
  125.     return 0;
  126. #endif
  127.   return trans->execute(NoCommit,AbortOnError,h->m_force_send);
  128. }
  129. inline
  130. int execute_commit(ha_ndbcluster *h, NdbConnection *trans)
  131. {
  132.   int m_batch_execute= 0;
  133. #ifdef NOT_USED
  134.   if (m_batch_execute)
  135.     return 0;
  136. #endif
  137.   return trans->execute(Commit,AbortOnError,h->m_force_send);
  138. }
  139. inline
  140. int execute_commit(THD *thd, NdbConnection *trans)
  141. {
  142.   int m_batch_execute= 0;
  143. #ifdef NOT_USED
  144.   if (m_batch_execute)
  145.     return 0;
  146. #endif
  147.   return trans->execute(Commit,AbortOnError,thd->variables.ndb_force_send);
  148. }
  149. inline
  150. int execute_no_commit_ie(ha_ndbcluster *h, NdbConnection *trans)
  151. {
  152.   int m_batch_execute= 0;
  153. #ifdef NOT_USED
  154.   if (m_batch_execute)
  155.     return 0;
  156. #endif
  157.   return trans->execute(NoCommit, AO_IgnoreError,h->m_force_send);
  158. }
  159. /*
  160.   Place holder for ha_ndbcluster thread specific data
  161. */
  162. Thd_ndb::Thd_ndb()
  163. {
  164.   ndb= new Ndb(g_ndb_cluster_connection, "");
  165.   lock_count= 0;
  166.   count= 0;
  167.   error= 0;
  168. }
  169. Thd_ndb::~Thd_ndb()
  170. {
  171.   if (ndb)
  172.   {
  173. #ifndef DBUG_OFF
  174.     Ndb::Free_list_usage tmp; tmp.m_name= 0;
  175.     while (ndb->get_free_list_usage(&tmp))
  176.     {
  177.       uint leaked= (uint) tmp.m_created - tmp.m_free;
  178.       if (leaked)
  179.         fprintf(stderr, "NDB: Found %u %s%s that %s not been releasedn",
  180.                 leaked, tmp.m_name,
  181.                 (leaked == 1)?"":"'s",
  182.                 (leaked == 1)?"has":"have");
  183.     }
  184. #endif
  185.     delete ndb;
  186.   }
  187.   ndb= 0;
  188. }
  189. inline
  190. Ndb *ha_ndbcluster::get_ndb()
  191. {
  192.   return ((Thd_ndb*)current_thd->transaction.thd_ndb)->ndb;
  193. }
  194. /*
  195.  * manage uncommitted insert/deletes during transactio to get records correct
  196.  */
  197. struct Ndb_local_table_statistics {
  198.   int no_uncommitted_rows_count;
  199.   ulong last_count;
  200.   ha_rows records;
  201. };
  202. void ha_ndbcluster::set_rec_per_key()
  203. {
  204.   DBUG_ENTER("ha_ndbcluster::get_status_const");
  205.   for (uint i=0 ; i < table->keys ; i++)
  206.   {
  207.     table->key_info[i].rec_per_key[table->key_info[i].key_parts-1]= 1;
  208.   }
  209.   DBUG_VOID_RETURN;
  210. }
  211. void ha_ndbcluster::records_update()
  212. {
  213.   if (m_ha_not_exact_count)
  214.     return;
  215.   DBUG_ENTER("ha_ndbcluster::records_update");
  216.   struct Ndb_local_table_statistics *info= 
  217.     (struct Ndb_local_table_statistics *)m_table_info;
  218.   DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
  219.       ((const NDBTAB *)m_table)->getTableId(),
  220.       info->no_uncommitted_rows_count));
  221.   //  if (info->records == ~(ha_rows)0)
  222.   {
  223.     Ndb *ndb= get_ndb();
  224.     Uint64 rows;
  225.     if(ndb_get_table_statistics(ndb, m_tabname, &rows, 0) == 0){
  226.       info->records= rows;
  227.     }
  228.   }
  229.   {
  230.     THD *thd= current_thd;
  231.     if (((Thd_ndb*)(thd->transaction.thd_ndb))->error)
  232.       info->no_uncommitted_rows_count= 0;
  233.   }
  234.   records= info->records+ info->no_uncommitted_rows_count;
  235.   DBUG_VOID_RETURN;
  236. }
  237. void ha_ndbcluster::no_uncommitted_rows_execute_failure()
  238. {
  239.   if (m_ha_not_exact_count)
  240.     return;
  241.   DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_execute_failure");
  242.   THD *thd= current_thd;
  243.   ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 1;
  244.   DBUG_VOID_RETURN;
  245. }
  246. void ha_ndbcluster::no_uncommitted_rows_init(THD *thd)
  247. {
  248.   if (m_ha_not_exact_count)
  249.     return;
  250.   DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_init");
  251.   struct Ndb_local_table_statistics *info= 
  252.     (struct Ndb_local_table_statistics *)m_table_info;
  253.   Thd_ndb *thd_ndb= (Thd_ndb *)thd->transaction.thd_ndb;
  254.   if (info->last_count != thd_ndb->count)
  255.   {
  256.     info->last_count = thd_ndb->count;
  257.     info->no_uncommitted_rows_count= 0;
  258.     info->records= ~(ha_rows)0;
  259.     DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
  260. ((const NDBTAB *)m_table)->getTableId(),
  261. info->no_uncommitted_rows_count));
  262.   }
  263.   DBUG_VOID_RETURN;
  264. }
  265. void ha_ndbcluster::no_uncommitted_rows_update(int c)
  266. {
  267.   if (m_ha_not_exact_count)
  268.     return;
  269.   DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_update");
  270.   struct Ndb_local_table_statistics *info=
  271.     (struct Ndb_local_table_statistics *)m_table_info;
  272.   info->no_uncommitted_rows_count+= c;
  273.   DBUG_PRINT("info", ("id=%d, no_uncommitted_rows_count=%d",
  274.       ((const NDBTAB *)m_table)->getTableId(),
  275.       info->no_uncommitted_rows_count));
  276.   DBUG_VOID_RETURN;
  277. }
  278. void ha_ndbcluster::no_uncommitted_rows_reset(THD *thd)
  279. {
  280.   if (m_ha_not_exact_count)
  281.     return;
  282.   DBUG_ENTER("ha_ndbcluster::no_uncommitted_rows_reset");
  283.   ((Thd_ndb*)(thd->transaction.thd_ndb))->count++;
  284.   ((Thd_ndb*)(thd->transaction.thd_ndb))->error= 0;
  285.   DBUG_VOID_RETURN;
  286. }
  287. /*
  288.   Take care of the error that occured in NDB
  289.   
  290.   RETURN
  291.     0 No error
  292.     #   The mapped error code
  293. */
  294. void ha_ndbcluster::invalidate_dictionary_cache(bool global)
  295. {
  296.   NDBDICT *dict= get_ndb()->getDictionary();
  297.   DBUG_ENTER("invalidate_dictionary_cache");
  298.   DBUG_PRINT("info", ("invalidating %s", m_tabname));
  299.   if (global)
  300.   {
  301.     const NDBTAB *tab= dict->getTable(m_tabname);
  302.     if (!tab)
  303.       DBUG_VOID_RETURN;
  304.     if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
  305.     {
  306.       // Global cache has already been invalidated
  307.       dict->removeCachedTable(m_tabname);
  308.       global= FALSE;
  309.     }
  310.     else
  311.       dict->invalidateTable(m_tabname);
  312.   }
  313.   else
  314.     dict->removeCachedTable(m_tabname);
  315.   table->version=0L; /* Free when thread is ready */
  316.   /* Invalidate indexes */
  317.   for (uint i= 0; i < table->keys; i++)
  318.   {
  319.     NDBINDEX *index = (NDBINDEX *) m_index[i].index;
  320.     NDBINDEX *unique_index = (NDBINDEX *) m_index[i].unique_index;
  321.     NDB_INDEX_TYPE idx_type= m_index[i].type;
  322.     switch(idx_type) {
  323.     case(PRIMARY_KEY_ORDERED_INDEX):
  324.     case(ORDERED_INDEX):
  325.       if (global)
  326.         dict->invalidateIndex(index->getName(), m_tabname);
  327.       else
  328.         dict->removeCachedIndex(index->getName(), m_tabname);
  329.       break;      
  330.     case(UNIQUE_ORDERED_INDEX):
  331.       if (global)
  332.         dict->invalidateIndex(index->getName(), m_tabname);
  333.       else
  334.         dict->removeCachedIndex(index->getName(), m_tabname);
  335.     case(UNIQUE_INDEX):
  336.       if (global)
  337.         dict->invalidateIndex(unique_index->getName(), m_tabname);
  338.       else
  339.         dict->removeCachedIndex(unique_index->getName(), m_tabname);
  340.       break;
  341.     case(PRIMARY_KEY_INDEX):
  342.     case(UNDEFINED_INDEX):
  343.       break;
  344.     }
  345.   }
  346.   DBUG_VOID_RETURN;
  347. }
  348. int ha_ndbcluster::ndb_err(NdbConnection *trans)
  349. {
  350.   int res;
  351.   NdbError err= trans->getNdbError();
  352.   DBUG_ENTER("ndb_err");
  353.   
  354.   ERR_PRINT(err);
  355.   switch (err.classification) {
  356.   case NdbError::SchemaError:
  357.   {
  358.     invalidate_dictionary_cache(TRUE);
  359.     if (err.code==284)
  360.     {
  361.       /*
  362.          Check if the table is _really_ gone or if the table has
  363.          been alterend and thus changed table id
  364.        */
  365.       NDBDICT *dict= get_ndb()->getDictionary();
  366.       DBUG_PRINT("info", ("Check if table %s is really gone", m_tabname));
  367.       if (!(dict->getTable(m_tabname)))
  368.       {
  369.         err= dict->getNdbError();
  370.         DBUG_PRINT("info", ("Table not found, error: %d", err.code));
  371.         if (err.code != 709)
  372.           DBUG_RETURN(1);
  373.       }
  374.       else
  375.       {
  376.         DBUG_PRINT("info", ("Table exist but must have changed"));
  377.         /* In 5.0, this should be replaced with a mapping to a mysql error */
  378.         my_printf_error(ER_UNKNOWN_ERROR,
  379.                         "Table definition has changed, "
  380.                         "please retry transaction",
  381.                         MYF(0));
  382.         DBUG_RETURN(1);
  383.       }
  384.     }
  385.     break;
  386.   }
  387.   default:
  388.     break;
  389.   }
  390.   res= ndb_to_mysql_error(&err);
  391.   DBUG_PRINT("info", ("transformed ndbcluster error %d to mysql error %d", 
  392.       err.code, res));
  393.   if (res == HA_ERR_FOUND_DUPP_KEY)
  394.   {
  395.     if (m_rows_to_insert == 1)
  396.       m_dupkey= table->primary_key;
  397.     else
  398.     {
  399.       /* We are batching inserts, offending key is not available */
  400.       m_dupkey= (uint) -1;
  401.     }
  402.   }
  403.   DBUG_RETURN(res);
  404. }
  405. /*
  406.   Override the default get_error_message in order to add the 
  407.   error message of NDB 
  408.  */
  409. bool ha_ndbcluster::get_error_message(int error, 
  410.       String *buf)
  411. {
  412.   DBUG_ENTER("ha_ndbcluster::get_error_message");
  413.   DBUG_PRINT("enter", ("error: %d", error));
  414.   Ndb *ndb= get_ndb();
  415.   if (!ndb)
  416.     DBUG_RETURN(FALSE);
  417.   const NdbError err= ndb->getNdbError(error);
  418.   bool temporary= err.status==NdbError::TemporaryError;
  419.   buf->set(err.message, strlen(err.message), &my_charset_bin);
  420.   DBUG_PRINT("exit", ("message: %s, temporary: %d", buf->ptr(), temporary));
  421.   DBUG_RETURN(temporary);
  422. }
  423. #ifndef DBUG_OFF
  424. /*
  425.   Check if type is supported by NDB.
  426. */
  427. static bool ndb_supported_type(enum_field_types type)
  428. {
  429.   switch (type) {
  430.   case MYSQL_TYPE_DECIMAL:    
  431.   case MYSQL_TYPE_TINY:        
  432.   case MYSQL_TYPE_SHORT:
  433.   case MYSQL_TYPE_LONG:
  434.   case MYSQL_TYPE_INT24:       
  435.   case MYSQL_TYPE_LONGLONG:
  436.   case MYSQL_TYPE_FLOAT:
  437.   case MYSQL_TYPE_DOUBLE:
  438.   case MYSQL_TYPE_TIMESTAMP:
  439.   case MYSQL_TYPE_DATETIME:    
  440.   case MYSQL_TYPE_DATE:
  441.   case MYSQL_TYPE_NEWDATE:
  442.   case MYSQL_TYPE_TIME:        
  443.   case MYSQL_TYPE_YEAR:        
  444.   case MYSQL_TYPE_STRING:      
  445.   case MYSQL_TYPE_VAR_STRING:
  446.   case MYSQL_TYPE_TINY_BLOB:
  447.   case MYSQL_TYPE_BLOB:    
  448.   case MYSQL_TYPE_MEDIUM_BLOB:   
  449.   case MYSQL_TYPE_LONG_BLOB:  
  450.   case MYSQL_TYPE_ENUM:
  451.   case MYSQL_TYPE_SET:         
  452.     return TRUE;
  453.   case MYSQL_TYPE_NULL:   
  454.   case MYSQL_TYPE_GEOMETRY:
  455.     break;
  456.   }
  457.   return FALSE;
  458. }
  459. #endif /* !DBUG_OFF */
  460. /*
  461.   Instruct NDB to set the value of the hidden primary key
  462. */
  463. bool ha_ndbcluster::set_hidden_key(NdbOperation *ndb_op,
  464.    uint fieldnr, const byte *field_ptr)
  465. {
  466.   DBUG_ENTER("set_hidden_key");
  467.   DBUG_RETURN(ndb_op->equal(fieldnr, (char*)field_ptr,
  468.     NDB_HIDDEN_PRIMARY_KEY_LENGTH) != 0);
  469. }
  470. /*
  471.   Instruct NDB to set the value of one primary key attribute
  472. */
  473. int ha_ndbcluster::set_ndb_key(NdbOperation *ndb_op, Field *field,
  474.                                uint fieldnr, const byte *field_ptr)
  475. {
  476.   uint32 pack_len= field->pack_length();
  477.   DBUG_ENTER("set_ndb_key");
  478.   DBUG_PRINT("enter", ("%d: %s, ndb_type: %u, len=%d", 
  479.                        fieldnr, field->field_name, field->type(),
  480.                        pack_len));
  481.   DBUG_DUMP("key", (char*)field_ptr, pack_len);
  482.   
  483.   DBUG_ASSERT(ndb_supported_type(field->type()));
  484.   DBUG_ASSERT(! (field->flags & BLOB_FLAG));
  485.   // Common implementation for most field types
  486.   DBUG_RETURN(ndb_op->equal(fieldnr, (char*) field_ptr, pack_len) != 0);
  487. }
  488. /*
  489.  Instruct NDB to set the value of one attribute
  490. */
  491. int ha_ndbcluster::set_ndb_value(NdbOperation *ndb_op, Field *field, 
  492.                                  uint fieldnr, bool *set_blob_value)
  493. {
  494.   const byte* field_ptr= field->ptr;
  495.   uint32 pack_len=  field->pack_length();
  496.   DBUG_ENTER("set_ndb_value");
  497.   DBUG_PRINT("enter", ("%d: %s, type: %u, len=%d, is_null=%s", 
  498.                        fieldnr, field->field_name, field->type(), 
  499.                        pack_len, field->is_null()?"Y":"N"));
  500.   DBUG_DUMP("value", (char*) field_ptr, pack_len);
  501.   DBUG_ASSERT(ndb_supported_type(field->type()));
  502.   {
  503.     // ndb currently does not support size 0
  504.     const byte *empty_field= "";
  505.     if (pack_len == 0)
  506.     {
  507.       pack_len= 1;
  508.       field_ptr= empty_field;
  509.     }
  510.     if (! (field->flags & BLOB_FLAG))
  511.     {
  512.       if (field->is_null())
  513.         // Set value to NULL
  514.         DBUG_RETURN((ndb_op->setValue(fieldnr, (char*)NULL, pack_len) != 0));
  515.       // Common implementation for most field types
  516.       DBUG_RETURN(ndb_op->setValue(fieldnr, (char*)field_ptr, pack_len) != 0);
  517.     }
  518.     // Blob type
  519.     NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
  520.     if (ndb_blob != NULL)
  521.     {
  522.       if (field->is_null())
  523.         DBUG_RETURN(ndb_blob->setNull() != 0);
  524.       Field_blob *field_blob= (Field_blob*)field;
  525.       // Get length and pointer to data
  526.       uint32 blob_len= field_blob->get_length(field_ptr);
  527.       char* blob_ptr= NULL;
  528.       field_blob->get_ptr(&blob_ptr);
  529.       // Looks like NULL ptr signals length 0 blob
  530.       if (blob_ptr == NULL) {
  531.         DBUG_ASSERT(blob_len == 0);
  532.         blob_ptr= (char*)"";
  533.       }
  534.       DBUG_PRINT("value", ("set blob ptr=%p len=%u",
  535.    blob_ptr, blob_len));
  536.       DBUG_DUMP("value", (char*)blob_ptr, min(blob_len, 26));
  537.       if (set_blob_value)
  538. *set_blob_value= TRUE;
  539.       // No callback needed to write value
  540.       DBUG_RETURN(ndb_blob->setValue(blob_ptr, blob_len) != 0);
  541.     }
  542.     DBUG_RETURN(1);
  543.   }
  544. }
  545. /*
  546.   Callback to read all blob values.
  547.   - not done in unpack_record because unpack_record is valid
  548.     after execute(Commit) but reading blobs is not
  549.   - may only generate read operations; they have to be executed
  550.     somewhere before the data is available
  551.   - due to single buffer for all blobs, we let the last blob
  552.     process all blobs (last so that all are active)
  553.   - null bit is still set in unpack_record
  554.   - TODO allocate blob part aligned buffers
  555. */
  556. NdbBlob::ActiveHook g_get_ndb_blobs_value;
  557. int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
  558. {
  559.   DBUG_ENTER("g_get_ndb_blobs_value");
  560.   if (ndb_blob->blobsNextBlob() != NULL)
  561.     DBUG_RETURN(0);
  562.   ha_ndbcluster *ha= (ha_ndbcluster *)arg;
  563.   DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
  564. }
  565. int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
  566. {
  567.   DBUG_ENTER("get_ndb_blobs_value");
  568.   // Field has no field number so cannot use TABLE blob_field
  569.   // Loop twice, first only counting total buffer size
  570.   for (int loop= 0; loop <= 1; loop++)
  571.   {
  572.     uint32 offset= 0;
  573.     for (uint i= 0; i < table->fields; i++)
  574.     {
  575.       Field *field= table->field[i];
  576.       NdbValue value= m_value[i];
  577.       if (value.ptr != NULL && (field->flags & BLOB_FLAG))
  578.       {
  579.         Field_blob *field_blob= (Field_blob *)field;
  580.         NdbBlob *ndb_blob= value.blob;
  581.         Uint64 blob_len= 0;
  582.         if (ndb_blob->getLength(blob_len) != 0)
  583.           DBUG_RETURN(-1);
  584.         // Align to Uint64
  585.         uint32 blob_size= blob_len;
  586.         if (blob_size % 8 != 0)
  587.           blob_size+= 8 - blob_size % 8;
  588.         if (loop == 1)
  589.         {
  590.           char *buf= m_blobs_buffer + offset;
  591.           uint32 len= 0xffffffff;  // Max uint32
  592.           DBUG_PRINT("value", ("read blob ptr=%x len=%u",
  593.                                (UintPtr)buf, (uint)blob_len));
  594.           if (ndb_blob->readData(buf, len) != 0)
  595.             DBUG_RETURN(-1);
  596.           DBUG_ASSERT(len == blob_len);
  597.           field_blob->set_ptr(len, buf);
  598.         }
  599.         offset+= blob_size;
  600.       }
  601.     }
  602.     if (loop == 0 && offset > m_blobs_buffer_size)
  603.     {
  604.       my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
  605.       m_blobs_buffer_size= 0;
  606.       DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
  607.       m_blobs_buffer= my_malloc(offset, MYF(MY_WME));
  608.       if (m_blobs_buffer == NULL)
  609.         DBUG_RETURN(-1);
  610.       m_blobs_buffer_size= offset;
  611.     }
  612.   }
  613.   DBUG_RETURN(0);
  614. }
  615. /*
  616.   Instruct NDB to fetch one field
  617.   - data is read directly into buffer provided by field
  618.     if field is NULL, data is read into memory provided by NDBAPI
  619. */
  620. int ha_ndbcluster::get_ndb_value(NdbOperation *ndb_op, Field *field,
  621.                                  uint fieldnr, byte* buf)
  622. {
  623.   DBUG_ENTER("get_ndb_value");
  624.   DBUG_PRINT("enter", ("fieldnr: %d flags: %o", fieldnr,
  625.                        (int)(field != NULL ? field->flags : 0)));
  626.   if (field != NULL)
  627.   {
  628.       DBUG_ASSERT(buf);
  629.       DBUG_ASSERT(ndb_supported_type(field->type()));
  630.       DBUG_ASSERT(field->ptr != NULL);
  631.       if (! (field->flags & BLOB_FLAG))
  632.       {
  633. byte *field_buf;
  634. if (field->pack_length() != 0)
  635.   field_buf= buf + (field->ptr - table->record[0]);
  636. else
  637.   field_buf= dummy_buf;
  638.         m_value[fieldnr].rec= ndb_op->getValue(fieldnr, 
  639.        field_buf);
  640.         DBUG_RETURN(m_value[fieldnr].rec == NULL);
  641.       }
  642.       // Blob type
  643.       NdbBlob *ndb_blob= ndb_op->getBlobHandle(fieldnr);
  644.       m_value[fieldnr].blob= ndb_blob;
  645.       if (ndb_blob != NULL)
  646.       {
  647.         // Set callback
  648.         void *arg= (void *)this;
  649.         DBUG_RETURN(ndb_blob->setActiveHook(g_get_ndb_blobs_value, arg) != 0);
  650.       }
  651.       DBUG_RETURN(1);
  652.   }
  653.   // Used for hidden key only
  654.   m_value[fieldnr].rec= ndb_op->getValue(fieldnr, NULL);
  655.   DBUG_RETURN(m_value[fieldnr].rec == NULL);
  656. }
  657. /*
  658.   Check if any set or get of blob value in current query.
  659. */
  660. bool ha_ndbcluster::uses_blob_value(bool all_fields)
  661. {
  662.   if (table->blob_fields == 0)
  663.     return FALSE;
  664.   if (all_fields)
  665.     return TRUE;
  666.   {
  667.     uint no_fields= table->fields;
  668.     int i;
  669.     THD *thd= table->in_use;
  670.     // They always put blobs at the end..
  671.     for (i= no_fields - 1; i >= 0; i--)
  672.     {
  673.       Field *field= table->field[i];
  674.       if (thd->query_id == field->query_id)
  675.       {
  676.         return TRUE;
  677.       }
  678.     }
  679.   }
  680.   return FALSE;
  681. }
  682. /*
  683.   Get metadata for this table from NDB 
  684.   IMPLEMENTATION
  685.     - check that frm-file on disk is equal to frm-file
  686.       of table accessed in NDB
  687. */
  688. int ha_ndbcluster::get_metadata(const char *path)
  689. {
  690.   Ndb *ndb= get_ndb();
  691.   NDBDICT *dict= ndb->getDictionary();
  692.   const NDBTAB *tab;
  693.   int error;
  694.   bool invalidating_ndb_table= FALSE;
  695.   DBUG_ENTER("get_metadata");
  696.   DBUG_PRINT("enter", ("m_tabname: %s, path: %s", m_tabname, path));
  697.   do {
  698.     const void *data, *pack_data;
  699.     uint length, pack_length;
  700.     if (!(tab= dict->getTable(m_tabname)))
  701.       ERR_RETURN(dict->getNdbError());
  702.     // Check if thread has stale local cache
  703.     if (tab->getObjectStatus() == NdbDictionary::Object::Invalid)
  704.     {
  705.       invalidate_dictionary_cache(FALSE);
  706.       if (!(tab= dict->getTable(m_tabname)))
  707.          ERR_RETURN(dict->getNdbError());
  708.       DBUG_PRINT("info", ("Table schema version: %d", tab->getObjectVersion()));
  709.     }
  710.     /*
  711.       Compare FrmData in NDB with frm file from disk.
  712.     */
  713.     error= 0;
  714.     if (readfrm(path, &data, &length) ||
  715. packfrm(data, length, &pack_data, &pack_length))
  716.     {
  717.       my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
  718.       my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
  719.       DBUG_RETURN(1);
  720.     }
  721.     
  722.     if ((pack_length != tab->getFrmLength()) || 
  723. (memcmp(pack_data, tab->getFrmData(), pack_length)))
  724.     {
  725.       if (!invalidating_ndb_table)
  726.       {
  727. DBUG_PRINT("info", ("Invalidating table"));
  728.         invalidate_dictionary_cache(TRUE);
  729. invalidating_ndb_table= TRUE;
  730.       }
  731.       else
  732.       {
  733. DBUG_PRINT("error", 
  734.    ("metadata, pack_length: %d getFrmLength: %d memcmp: %d", 
  735.     pack_length, tab->getFrmLength(),
  736.     memcmp(pack_data, tab->getFrmData(), pack_length)));      
  737. DBUG_DUMP("pack_data", (char*)pack_data, pack_length);
  738. DBUG_DUMP("frm", (char*)tab->getFrmData(), tab->getFrmLength());
  739. error= 3;
  740. invalidating_ndb_table= FALSE;
  741.       }
  742.     }
  743.     else
  744.     {
  745.       invalidating_ndb_table= FALSE;
  746.     }
  747.     my_free((char*)data, MYF(0));
  748.     my_free((char*)pack_data, MYF(0));
  749.   } while (invalidating_ndb_table);
  750.   if (error)
  751.     DBUG_RETURN(error);
  752.   
  753.   m_table_version= tab->getObjectVersion();
  754.   m_table= (void *)tab; 
  755.   m_table_info= NULL; // Set in external lock
  756.   
  757.   DBUG_RETURN(build_index_list(ndb, table, ILBP_OPEN));
  758. }
  759. static int fix_unique_index_attr_order(NDB_INDEX_DATA &data,
  760.        const NDBINDEX *index,
  761.        KEY *key_info)
  762. {
  763.   DBUG_ENTER("fix_unique_index_attr_order");
  764.   unsigned sz= index->getNoOfIndexColumns();
  765.   if (data.unique_index_attrid_map)
  766.     my_free((char*)data.unique_index_attrid_map, MYF(0));
  767.   data.unique_index_attrid_map= (unsigned char*)my_malloc(sz,MYF(MY_WME));
  768.   KEY_PART_INFO* key_part= key_info->key_part;
  769.   KEY_PART_INFO* end= key_part+key_info->key_parts;
  770.   DBUG_ASSERT(key_info->key_parts == sz);
  771.   for (unsigned i= 0; key_part != end; key_part++, i++) 
  772.   {
  773.     const char *field_name= key_part->field->field_name;
  774.     unsigned name_sz= strlen(field_name);
  775.     if (name_sz >= NDB_MAX_ATTR_NAME_SIZE)
  776.       name_sz= NDB_MAX_ATTR_NAME_SIZE-1;
  777. #ifndef DBUG_OFF
  778.    data.unique_index_attrid_map[i]= 255;
  779. #endif
  780.     for (unsigned j= 0; j < sz; j++)
  781.     {
  782.       const NDBCOL *c= index->getColumn(j);
  783.       if (strncmp(field_name, c->getName(), name_sz) == 0)
  784.       {
  785. data.unique_index_attrid_map[i]= j;
  786. break;
  787.       }
  788.     }
  789.     DBUG_ASSERT(data.unique_index_attrid_map[i] != 255);
  790.   }
  791.   DBUG_RETURN(0);
  792. }
  793. int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
  794. {
  795.   uint i;
  796.   int error= 0;
  797.   const char *name, *index_name;
  798.   char unique_index_name[FN_LEN];
  799.   static const char* unique_suffix= "$unique";
  800.   KEY* key_info= tab->key_info;
  801.   const char **key_name= tab->keynames.type_names;
  802.   NDBDICT *dict= ndb->getDictionary();
  803.   DBUG_ENTER("build_index_list");
  804.   
  805.   // Save information about all known indexes
  806.   for (i= 0; i < tab->keys; i++, key_info++, key_name++)
  807.   {
  808.     index_name= *key_name;
  809.     NDB_INDEX_TYPE idx_type= get_index_type_from_table(i);
  810.     m_index[i].type= idx_type;
  811.     if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
  812.     {
  813.       strxnmov(unique_index_name, FN_LEN, index_name, unique_suffix, NullS);
  814.       DBUG_PRINT("info", ("Created unique index name '%s' for index %d",
  815.   unique_index_name, i));
  816.     }
  817.     // Create secondary indexes if in create phase
  818.     if (phase == ILBP_CREATE)
  819.     {
  820.       DBUG_PRINT("info", ("Creating index %u: %s", i, index_name));      
  821.       switch (idx_type){
  822.       case PRIMARY_KEY_INDEX:
  823. // Do nothing, already created
  824. break;
  825.       case PRIMARY_KEY_ORDERED_INDEX:
  826. error= create_ordered_index(index_name, key_info);
  827. break;
  828.       case UNIQUE_ORDERED_INDEX:
  829. if (!(error= create_ordered_index(index_name, key_info)))
  830.   error= create_unique_index(unique_index_name, key_info);
  831. break;
  832.       case UNIQUE_INDEX:
  833. if (!(error= check_index_fields_not_null(i)))
  834.   error= create_unique_index(unique_index_name, key_info);
  835. break;
  836.       case ORDERED_INDEX:
  837. error= create_ordered_index(index_name, key_info);
  838. break;
  839.       default:
  840. DBUG_ASSERT(FALSE);
  841. break;
  842.       }
  843.       if (error)
  844.       {
  845. DBUG_PRINT("error", ("Failed to create index %u", i));
  846. drop_table();
  847. break;
  848.       }
  849.     }
  850.     // Add handles to index objects
  851.     if (idx_type != PRIMARY_KEY_INDEX && idx_type != UNIQUE_INDEX)
  852.     {
  853.       DBUG_PRINT("info", ("Get handle to index %s", index_name));
  854.       const NDBINDEX *index= dict->getIndex(index_name, m_tabname);
  855.       if (!index) DBUG_RETURN(1);
  856.       m_index[i].index= (void *) index;
  857.     }
  858.     if (idx_type == UNIQUE_ORDERED_INDEX || idx_type == UNIQUE_INDEX)
  859.     {
  860.       DBUG_PRINT("info", ("Get handle to unique_index %s", unique_index_name));
  861.       const NDBINDEX *index= dict->getIndex(unique_index_name, m_tabname);
  862.       if (!index) DBUG_RETURN(1);
  863.       m_index[i].unique_index= (void *) index;
  864.       error= fix_unique_index_attr_order(m_index[i], index, key_info);
  865.     }
  866.   }
  867.   
  868.   DBUG_RETURN(error);
  869. }
  870. /*
  871.   Decode the type of an index from information 
  872.   provided in table object
  873. */
  874. NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const
  875. {
  876.   bool is_hash_index=  (table->key_info[inx].algorithm == HA_KEY_ALG_HASH);
  877.   if (inx == table->primary_key)
  878.     return is_hash_index ? PRIMARY_KEY_INDEX : PRIMARY_KEY_ORDERED_INDEX;
  879.   else
  880.     return ((table->key_info[inx].flags & HA_NOSAME) ? 
  881.     (is_hash_index ? UNIQUE_INDEX : UNIQUE_ORDERED_INDEX) :
  882.     ORDERED_INDEX);
  883. int ha_ndbcluster::check_index_fields_not_null(uint inx)
  884. {
  885.   KEY* key_info= table->key_info + inx;
  886.   KEY_PART_INFO* key_part= key_info->key_part;
  887.   KEY_PART_INFO* end= key_part+key_info->key_parts;
  888.   DBUG_ENTER("check_index_fields_not_null");
  889.   
  890.   for (; key_part != end; key_part++) 
  891.     {
  892.       Field* field= key_part->field;
  893.       if (field->maybe_null())
  894.       {
  895. my_printf_error(ER_NULL_COLUMN_IN_INDEX,ER(ER_NULL_COLUMN_IN_INDEX),
  896. MYF(0),field->field_name);
  897. DBUG_RETURN(ER_NULL_COLUMN_IN_INDEX);
  898.       }
  899.     }
  900.   
  901.   DBUG_RETURN(0);
  902. }
  903. void ha_ndbcluster::release_metadata()
  904. {
  905.   uint i;
  906.   DBUG_ENTER("release_metadata");
  907.   DBUG_PRINT("enter", ("m_tabname: %s", m_tabname));
  908.   m_table= NULL;
  909.   m_table_info= NULL;
  910.   // Release index list 
  911.   for (i= 0; i < MAX_KEY; i++)
  912.   {
  913.     m_index[i].unique_index= NULL;      
  914.     m_index[i].index= NULL;      
  915.     if (m_index[i].unique_index_attrid_map)
  916.     {
  917.       my_free((char *)m_index[i].unique_index_attrid_map, MYF(0));
  918.       m_index[i].unique_index_attrid_map= NULL;
  919.     }
  920.   }
  921.   DBUG_VOID_RETURN;
  922. }
  923. int ha_ndbcluster::get_ndb_lock_type(enum thr_lock_type type)
  924. {
  925.   if (type >= TL_WRITE_ALLOW_WRITE)
  926.     return NdbOperation::LM_Exclusive;
  927.   else if (uses_blob_value(m_retrieve_all_fields))
  928.     return NdbOperation::LM_Read;
  929.   else
  930.     return NdbOperation::LM_CommittedRead;
  931. }
  932. static const ulong index_type_flags[]=
  933. {
  934.   /* UNDEFINED_INDEX */
  935.   0,                         
  936.   /* PRIMARY_KEY_INDEX */
  937.   HA_ONLY_WHOLE_INDEX, 
  938.   /* PRIMARY_KEY_ORDERED_INDEX */
  939.   /* 
  940.      Enable HA_KEYREAD_ONLY when "sorted" indexes are supported, 
  941.      thus ORDERD BY clauses can be optimized by reading directly 
  942.      through the index.
  943.   */
  944.   // HA_KEYREAD_ONLY | 
  945.   HA_READ_NEXT |
  946.   HA_READ_RANGE |
  947.   HA_READ_ORDER,
  948.   /* UNIQUE_INDEX */
  949.   HA_ONLY_WHOLE_INDEX,
  950.   /* UNIQUE_ORDERED_INDEX */
  951.   HA_READ_NEXT |
  952.   HA_READ_RANGE |
  953.   HA_READ_ORDER,
  954.   /* ORDERED_INDEX */
  955.   HA_READ_NEXT |
  956.   HA_READ_RANGE |
  957.   HA_READ_ORDER
  958. };
  959. static const int index_flags_size= sizeof(index_type_flags)/sizeof(ulong);
  960. inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
  961. {
  962.   DBUG_ASSERT(idx_no < MAX_KEY);
  963.   return m_index[idx_no].type;
  964. }
  965. /*
  966.   Get the flags for an index
  967.   RETURN
  968.     flags depending on the type of the index.
  969. */
  970. inline ulong ha_ndbcluster::index_flags(uint idx_no, uint part,
  971.                                         bool all_parts) const 
  972.   DBUG_ENTER("index_flags");
  973.   DBUG_PRINT("info", ("idx_no: %d", idx_no));
  974.   DBUG_ASSERT(get_index_type_from_table(idx_no) < index_flags_size);
  975.   DBUG_RETURN(index_type_flags[get_index_type_from_table(idx_no)]);
  976. }
  977. int ha_ndbcluster::set_primary_key(NdbOperation *op, const byte *key)
  978. {
  979.   KEY* key_info= table->key_info + table->primary_key;
  980.   KEY_PART_INFO* key_part= key_info->key_part;
  981.   KEY_PART_INFO* end= key_part+key_info->key_parts;
  982.   DBUG_ENTER("set_primary_key");
  983.   for (; key_part != end; key_part++) 
  984.   {
  985.     Field* field= key_part->field;
  986.     if (set_ndb_key(op, field, 
  987.     key_part->fieldnr-1, key))
  988.       ERR_RETURN(op->getNdbError());
  989.     key += key_part->length;
  990.   }
  991.   DBUG_RETURN(0);
  992. }
  993. int ha_ndbcluster::set_primary_key_from_record(NdbOperation *op, const byte *record)
  994. {
  995.   KEY* key_info= table->key_info + table->primary_key;
  996.   KEY_PART_INFO* key_part= key_info->key_part;
  997.   KEY_PART_INFO* end= key_part+key_info->key_parts;
  998.   DBUG_ENTER("set_primary_key_from_record");
  999.   for (; key_part != end; key_part++) 
  1000.   {
  1001.     Field* field= key_part->field;
  1002.     if (set_ndb_key(op, field, 
  1003.     key_part->fieldnr-1, record+key_part->offset))
  1004.       ERR_RETURN(op->getNdbError());
  1005.   }
  1006.   DBUG_RETURN(0);
  1007. }
  1008. /*
  1009.   Read one record from NDB using primary key
  1010. */
  1011. int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf) 
  1012. {
  1013.   uint no_fields= table->fields, i;
  1014.   NdbConnection *trans= m_active_trans;
  1015.   NdbOperation *op;
  1016.   THD *thd= current_thd;
  1017.   DBUG_ENTER("pk_read");
  1018.   DBUG_PRINT("enter", ("key_len: %u", key_len));
  1019.   DBUG_DUMP("key", (char*)key, key_len);
  1020.   NdbOperation::LockMode lm=
  1021.     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  1022.   if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || 
  1023.       op->readTuple(lm) != 0)
  1024.     ERR_RETURN(trans->getNdbError());
  1025.   if (table->primary_key == MAX_KEY) 
  1026.   {
  1027.     // This table has no primary key, use "hidden" primary key
  1028.     DBUG_PRINT("info", ("Using hidden key"));
  1029.     DBUG_DUMP("key", (char*)key, 8);    
  1030.     if (set_hidden_key(op, no_fields, key))
  1031.       ERR_RETURN(trans->getNdbError());
  1032.     // Read key at the same time, for future reference
  1033.     if (get_ndb_value(op, NULL, no_fields, NULL))
  1034.       ERR_RETURN(trans->getNdbError());
  1035.   } 
  1036.   else 
  1037.   {
  1038.     int res;
  1039.     if ((res= set_primary_key(op, key)))
  1040.       return res;
  1041.   }
  1042.   
  1043.   // Read all wanted non-key field(s) unless HA_EXTRA_RETRIEVE_ALL_COLS
  1044.   for (i= 0; i < no_fields; i++) 
  1045.   {
  1046.     Field *field= table->field[i];
  1047.     if ((thd->query_id == field->query_id) ||
  1048. m_retrieve_all_fields ||
  1049. (field->flags & PRI_KEY_FLAG) && m_retrieve_primary_key)
  1050.     {
  1051.       if (get_ndb_value(op, field, i, buf))
  1052. ERR_RETURN(trans->getNdbError());
  1053.     }
  1054.     else
  1055.     {
  1056.       // Attribute was not to be read
  1057.       m_value[i].ptr= NULL;
  1058.     }
  1059.   }
  1060.   
  1061.   if (execute_no_commit_ie(this,trans) != 0) 
  1062.   {
  1063.     table->status= STATUS_NOT_FOUND;
  1064.     DBUG_RETURN(ndb_err(trans));
  1065.   }
  1066.   // The value have now been fetched from NDB  
  1067.   unpack_record(buf);
  1068.   table->status= 0;     
  1069.   DBUG_RETURN(0);
  1070. }
  1071. /*
  1072.   Read one complementing record from NDB using primary key from old_data
  1073. */
  1074. int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
  1075. {
  1076.   uint no_fields= table->fields, i;
  1077.   NdbConnection *trans= m_active_trans;
  1078.   NdbOperation *op;
  1079.   THD *thd= current_thd;
  1080.   DBUG_ENTER("complemented_pk_read");
  1081.   if (m_retrieve_all_fields)
  1082.     // We have allready retrieved all fields, nothing to complement
  1083.     DBUG_RETURN(0);
  1084.   NdbOperation::LockMode lm=
  1085.     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  1086.   if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) || 
  1087.       op->readTuple(lm) != 0)
  1088.     ERR_RETURN(trans->getNdbError());
  1089.   int res;
  1090.   if ((res= set_primary_key_from_record(op, old_data)))
  1091.     ERR_RETURN(trans->getNdbError());
  1092.     
  1093.   // Read all unreferenced non-key field(s)
  1094.   for (i= 0; i < no_fields; i++) 
  1095.   {
  1096.     Field *field= table->field[i];
  1097.     if (!(field->flags & PRI_KEY_FLAG) &&
  1098. (thd->query_id != field->query_id))
  1099.     {
  1100.       if (get_ndb_value(op, field, i, new_data))
  1101. ERR_RETURN(trans->getNdbError());
  1102.     }
  1103.   }
  1104.   
  1105.   if (execute_no_commit(this,trans) != 0) 
  1106.   {
  1107.     table->status= STATUS_NOT_FOUND;
  1108.     DBUG_RETURN(ndb_err(trans));
  1109.   }
  1110.   // The value have now been fetched from NDB  
  1111.   unpack_record(new_data);
  1112.   table->status= 0;     
  1113.   DBUG_RETURN(0);
  1114. }
  1115. /*
  1116.   Peek to check if a particular row already exists
  1117. */
  1118. int ha_ndbcluster::peek_row(const byte *record)
  1119. {
  1120.   NdbConnection *trans= m_active_trans;
  1121.   NdbOperation *op;
  1122.   THD *thd= current_thd;
  1123.   DBUG_ENTER("peek_row");
  1124.                                                                                 
  1125.   NdbOperation::LockMode lm=
  1126.     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  1127.   if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
  1128.       op->readTuple(lm) != 0)
  1129.     ERR_RETURN(trans->getNdbError());
  1130.                                                                                 
  1131.   int res;
  1132.   if ((res= set_primary_key_from_record(op, record)))
  1133.     ERR_RETURN(trans->getNdbError());
  1134.                                                                                 
  1135.   if (execute_no_commit_ie(this,trans) != 0)
  1136.     {
  1137.       table->status= STATUS_NOT_FOUND;
  1138.       DBUG_RETURN(ndb_err(trans));
  1139.     }                                                                                
  1140.   DBUG_RETURN(0);
  1141. }
  1142. /*
  1143.   Read one record from NDB using unique secondary index
  1144. */
  1145. int ha_ndbcluster::unique_index_read(const byte *key,
  1146.      uint key_len, byte *buf)
  1147. {
  1148.   NdbConnection *trans= m_active_trans;
  1149.   NdbIndexOperation *op;
  1150.   THD *thd= current_thd;
  1151.   byte *key_ptr;
  1152.   KEY* key_info;
  1153.   KEY_PART_INFO *key_part, *end;
  1154.   uint i;
  1155.   DBUG_ENTER("unique_index_read");
  1156.   DBUG_PRINT("enter", ("key_len: %u, index: %u", key_len, active_index));
  1157.   DBUG_DUMP("key", (char*)key, key_len);
  1158.   
  1159.   NdbOperation::LockMode lm=
  1160.     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  1161.   if (!(op= trans->getNdbIndexOperation((NDBINDEX *) 
  1162. m_index[active_index].unique_index, 
  1163.                                         (const NDBTAB *) m_table)) ||
  1164.       op->readTuple(lm) != 0)
  1165.     ERR_RETURN(trans->getNdbError());
  1166.   
  1167.   // Set secondary index key(s)
  1168.   key_ptr= (byte *) key;
  1169.   key_info= table->key_info + active_index;
  1170.   DBUG_ASSERT(key_info->key_length == key_len);
  1171.   end= (key_part= key_info->key_part) + key_info->key_parts;
  1172.   for (i= 0; key_part != end; key_part++, i++) 
  1173.   {
  1174.     if (set_ndb_key(op, key_part->field,
  1175.     m_index[active_index].unique_index_attrid_map[i], 
  1176.     key_part->null_bit ? key_ptr + 1 : key_ptr))
  1177.       ERR_RETURN(trans->getNdbError());
  1178.     key_ptr+= key_part->store_length;
  1179.   }
  1180.   // Get non-index attribute(s)
  1181.   for (i= 0; i < table->fields; i++) 
  1182.   {
  1183.     Field *field= table->field[i];
  1184.     if ((thd->query_id == field->query_id) ||
  1185.         (field->flags & PRI_KEY_FLAG)) // && m_retrieve_primary_key ??
  1186.     {
  1187.       if (get_ndb_value(op, field, i, buf))
  1188.         ERR_RETURN(op->getNdbError());
  1189.     }
  1190.     else
  1191.     {
  1192.       // Attribute was not to be read
  1193.       m_value[i].ptr= NULL;
  1194.     }
  1195.   }
  1196.   if (execute_no_commit_ie(this,trans) != 0) 
  1197.   {
  1198.     table->status= STATUS_NOT_FOUND;
  1199.     DBUG_RETURN(ndb_err(trans));
  1200.   }
  1201.   // The value have now been fetched from NDB
  1202.   unpack_record(buf);
  1203.   table->status= 0;
  1204.   DBUG_RETURN(0);
  1205. }
  1206. /*
  1207.   Get the next record of a started scan. Try to fetch
  1208.   it locally from NdbApi cached records if possible, 
  1209.   otherwise ask NDB for more.
  1210.   NOTE
  1211.   If this is a update/delete make sure to not contact 
  1212.   NDB before any pending ops have been sent to NDB.
  1213. */
  1214. inline int ha_ndbcluster::next_result(byte *buf)
  1215. {  
  1216.   int check;
  1217.   NdbConnection *trans= m_active_trans;
  1218.   NdbResultSet *cursor= m_active_cursor; 
  1219.   DBUG_ENTER("next_result");
  1220.   if (!cursor)
  1221.     DBUG_RETURN(HA_ERR_END_OF_FILE);
  1222.     
  1223.   /* 
  1224.      If this an update or delete, call nextResult with false
  1225.      to process any records already cached in NdbApi
  1226.   */
  1227.   bool contact_ndb= m_lock.type < TL_WRITE_ALLOW_WRITE;
  1228.   do {
  1229.     DBUG_PRINT("info", ("Call nextResult, contact_ndb: %d", contact_ndb));
  1230.     /*
  1231.       We can only handle one tuple with blobs at a time.
  1232.     */
  1233.     if (m_ops_pending && m_blobs_pending)
  1234.     {
  1235.       if (execute_no_commit(this,trans) != 0)
  1236. DBUG_RETURN(ndb_err(trans));
  1237.       m_ops_pending= 0;
  1238.       m_blobs_pending= FALSE;
  1239.     }
  1240.     check= cursor->nextResult(contact_ndb, m_force_send);
  1241.     if (check == 0)
  1242.     {
  1243.       // One more record found
  1244.       DBUG_PRINT("info", ("One more record found"));    
  1245.       unpack_record(buf);
  1246.       table->status= 0;
  1247.       DBUG_RETURN(0);
  1248.     } 
  1249.     else if (check == 1 || check == 2)
  1250.     {
  1251.       // 1: No more records
  1252.       // 2: No more cached records
  1253.       /*
  1254. Before fetching more rows and releasing lock(s),
  1255. all pending update or delete operations should 
  1256. be sent to NDB
  1257.       */
  1258.       DBUG_PRINT("info", ("ops_pending: %d", m_ops_pending));    
  1259.       if (m_ops_pending)
  1260.       {
  1261. // if (current_thd->transaction.on)
  1262. if (m_transaction_on)
  1263. {
  1264.   if (execute_no_commit(this,trans) != 0)
  1265.     DBUG_RETURN(ndb_err(trans));
  1266. }
  1267. else
  1268. {
  1269.   if  (execute_commit(this,trans) != 0)
  1270.     DBUG_RETURN(ndb_err(trans));
  1271.   int res= trans->restart();
  1272.   DBUG_ASSERT(res == 0);
  1273. }
  1274. m_ops_pending= 0;
  1275.       }
  1276.       
  1277.       contact_ndb= (check == 2);
  1278.     }
  1279.   } while (check == 2);
  1280.     
  1281.   table->status= STATUS_NOT_FOUND;
  1282.   if (check == -1)
  1283.     DBUG_RETURN(ndb_err(trans));
  1284.   // No more records
  1285.   DBUG_PRINT("info", ("No more records"));
  1286.   DBUG_RETURN(HA_ERR_END_OF_FILE);
  1287. }
  1288. /*
  1289.   Set bounds for ordered index scan.
  1290. */
  1291. int ha_ndbcluster::set_bounds(NdbIndexScanOperation *op,
  1292.       const key_range *keys[2])
  1293. {
  1294.   const KEY *const key_info= table->key_info + active_index;
  1295.   const uint key_parts= key_info->key_parts;
  1296.   uint key_tot_len[2];
  1297.   uint tot_len;
  1298.   uint i, j;
  1299.   DBUG_ENTER("set_bounds");
  1300.   DBUG_PRINT("info", ("key_parts=%d", key_parts));
  1301.   for (j= 0; j <= 1; j++)
  1302.   {
  1303.     const key_range *key= keys[j];
  1304.     if (key != NULL)
  1305.     {
  1306.       // for key->flag see ha_rkey_function
  1307.       DBUG_PRINT("info", ("key %d length=%d flag=%d",
  1308.                           j, key->length, key->flag));
  1309.       key_tot_len[j]= key->length;
  1310.     }
  1311.     else
  1312.     {
  1313.       DBUG_PRINT("info", ("key %d not present", j));
  1314.       key_tot_len[j]= 0;
  1315.     }
  1316.   }
  1317.   tot_len= 0;
  1318.   for (i= 0; i < key_parts; i++)
  1319.   {
  1320.     KEY_PART_INFO *key_part= &key_info->key_part[i];
  1321.     Field *field= key_part->field;
  1322.     uint part_len= key_part->length;
  1323.     uint part_store_len= key_part->store_length;
  1324.     // Info about each key part
  1325.     struct part_st {
  1326.       bool part_last;
  1327.       const key_range *key;
  1328.       const byte *part_ptr;
  1329.       bool part_null;
  1330.       int bound_type;
  1331.       const char* bound_ptr;
  1332.     };
  1333.     struct part_st part[2];
  1334.     for (j= 0; j <= 1; j++)
  1335.     {
  1336.       struct part_st &p = part[j];
  1337.       p.key= NULL;
  1338.       p.bound_type= -1;
  1339.       if (tot_len < key_tot_len[j])
  1340.       {
  1341.         p.part_last= (tot_len + part_store_len >= key_tot_len[j]);
  1342.         p.key= keys[j];
  1343.         p.part_ptr= &p.key->key[tot_len];
  1344.         p.part_null= key_part->null_bit && *p.part_ptr;
  1345.         p.bound_ptr= (const char *)
  1346.           p.part_null ? 0 : key_part->null_bit ? p.part_ptr + 1 : p.part_ptr;
  1347.         if (j == 0)
  1348.         {
  1349.           switch (p.key->flag)
  1350.           {
  1351.             case HA_READ_KEY_EXACT:
  1352.               p.bound_type= NdbIndexScanOperation::BoundEQ;
  1353.               break;
  1354.             case HA_READ_KEY_OR_NEXT:
  1355.               p.bound_type= NdbIndexScanOperation::BoundLE;
  1356.               break;
  1357.             case HA_READ_AFTER_KEY:
  1358.               if (! p.part_last)
  1359.                 p.bound_type= NdbIndexScanOperation::BoundLE;
  1360.               else
  1361.                 p.bound_type= NdbIndexScanOperation::BoundLT;
  1362.               break;
  1363.             default:
  1364.               break;
  1365.           }
  1366.         }
  1367.         if (j == 1) {
  1368.           switch (p.key->flag)
  1369.           {
  1370.             case HA_READ_BEFORE_KEY:
  1371.               if (! p.part_last)
  1372.                 p.bound_type= NdbIndexScanOperation::BoundGE;
  1373.               else
  1374.                 p.bound_type= NdbIndexScanOperation::BoundGT;
  1375.               break;
  1376.             case HA_READ_AFTER_KEY:     // weird
  1377.               p.bound_type= NdbIndexScanOperation::BoundGE;
  1378.               break;
  1379.             default:
  1380.               break;
  1381.           }
  1382.         }
  1383.         if (p.bound_type == -1)
  1384.         {
  1385.           DBUG_PRINT("error", ("key %d unknown flag %d", j, p.key->flag));
  1386.           DBUG_ASSERT(false);
  1387.           // Stop setting bounds but continue with what we have
  1388.           DBUG_RETURN(0);
  1389.         }
  1390.       }
  1391.     }
  1392.     // Seen with e.g. b = 1 and c > 1
  1393.     if (part[0].bound_type == NdbIndexScanOperation::BoundLE &&
  1394.         part[1].bound_type == NdbIndexScanOperation::BoundGE &&
  1395.         memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
  1396.     {
  1397.       DBUG_PRINT("info", ("replace LE/GE pair by EQ"));
  1398.       part[0].bound_type= NdbIndexScanOperation::BoundEQ;
  1399.       part[1].bound_type= -1;
  1400.     }
  1401.     // Not seen but was in previous version
  1402.     if (part[0].bound_type == NdbIndexScanOperation::BoundEQ &&
  1403.         part[1].bound_type == NdbIndexScanOperation::BoundGE &&
  1404.         memcmp(part[0].part_ptr, part[1].part_ptr, part_store_len) == 0)
  1405.     {
  1406.       DBUG_PRINT("info", ("remove GE from EQ/GE pair"));
  1407.       part[1].bound_type= -1;
  1408.     }
  1409.     for (j= 0; j <= 1; j++)
  1410.     {
  1411.       struct part_st &p = part[j];
  1412.       // Set bound if not done with this key
  1413.       if (p.key != NULL)
  1414.       {
  1415.         DBUG_PRINT("info", ("key %d:%d offset=%d length=%d last=%d bound=%d",
  1416.                             j, i, tot_len, part_len, p.part_last, p.bound_type));
  1417.         DBUG_DUMP("info", (const char*)p.part_ptr, part_store_len);
  1418.         // Set bound if not cancelled via type -1
  1419.         if (p.bound_type != -1)
  1420. {
  1421.           if (op->setBound(i, p.bound_type, p.bound_ptr))
  1422.             ERR_RETURN(op->getNdbError());
  1423. }
  1424.       }
  1425.     }
  1426.     tot_len+= part_store_len;
  1427.   }
  1428.   DBUG_RETURN(0);
  1429. }
  1430. inline 
  1431. int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
  1432. {
  1433.   uint i;
  1434.   THD *thd= current_thd;
  1435.   NdbConnection *trans= m_active_trans;
  1436.   DBUG_ENTER("define_read_attrs");  
  1437.   // Define attributes to read
  1438.   for (i= 0; i < table->fields; i++) 
  1439.   {
  1440.     Field *field= table->field[i];
  1441.     if ((thd->query_id == field->query_id) ||
  1442. (field->flags & PRI_KEY_FLAG) || 
  1443. m_retrieve_all_fields)
  1444.     {      
  1445.       if (get_ndb_value(op, field, i, buf))
  1446. ERR_RETURN(op->getNdbError());
  1447.     } 
  1448.     else 
  1449.     {
  1450.       m_value[i].ptr= NULL;
  1451.     }
  1452.   }
  1453.     
  1454.   if (table->primary_key == MAX_KEY) 
  1455.   {
  1456.     DBUG_PRINT("info", ("Getting hidden key"));
  1457.     // Scanning table with no primary key
  1458.     int hidden_no= table->fields;      
  1459. #ifndef DBUG_OFF
  1460.     const NDBTAB *tab= (const NDBTAB *) m_table;    
  1461.     if (!tab->getColumn(hidden_no))
  1462.       DBUG_RETURN(1);
  1463. #endif
  1464.     if (get_ndb_value(op, NULL, hidden_no, NULL))
  1465.       ERR_RETURN(op->getNdbError());
  1466.   }
  1467.   if (execute_no_commit(this,trans) != 0)
  1468.     DBUG_RETURN(ndb_err(trans));
  1469.   DBUG_PRINT("exit", ("Scan started successfully"));
  1470.   DBUG_RETURN(next_result(buf));
  1471. /*
  1472.   Start ordered index scan in NDB
  1473. */
  1474. int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
  1475.       const key_range *end_key,
  1476.       bool sorted, byte* buf)
  1477. {  
  1478.   bool restart;
  1479.   NdbConnection *trans= m_active_trans;
  1480.   NdbResultSet *cursor;
  1481.   NdbIndexScanOperation *op;
  1482.   DBUG_ENTER("ordered_index_scan");
  1483.   DBUG_PRINT("enter", ("index: %u, sorted: %d", active_index, sorted));  
  1484.   DBUG_PRINT("enter", ("Starting new ordered scan on %s", m_tabname));
  1485.   // Check that sorted seems to be initialised
  1486.   DBUG_ASSERT(sorted == 0 || sorted == 1);
  1487.   
  1488.   if (m_active_cursor == 0)
  1489.   {
  1490.     restart= false;
  1491.     NdbOperation::LockMode lm=
  1492.       (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  1493.     if (!(op= trans->getNdbIndexScanOperation((NDBINDEX *)
  1494.       m_index[active_index].index, 
  1495.       (const NDBTAB *) m_table)) ||
  1496. !(cursor= op->readTuples(lm, 0, parallelism, sorted)))
  1497.       ERR_RETURN(trans->getNdbError());
  1498.     m_active_cursor= cursor;
  1499.   } else {
  1500.     restart= true;
  1501.     op= (NdbIndexScanOperation*)m_active_cursor->getOperation();
  1502.     
  1503.     DBUG_ASSERT(op->getSorted() == sorted);
  1504.     DBUG_ASSERT(op->getLockMode() == 
  1505. (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
  1506.     if(op->reset_bounds(m_force_send))
  1507.       DBUG_RETURN(ndb_err(m_active_trans));
  1508.   }
  1509.   {
  1510.     const key_range *keys[2]= { start_key, end_key };
  1511.     int ret= set_bounds(op, keys);
  1512.     if (ret)
  1513.       DBUG_RETURN(ret);
  1514.   }
  1515.   if (!restart)
  1516.   {
  1517.     DBUG_RETURN(define_read_attrs(buf, op));
  1518.   }
  1519.   else
  1520.   {
  1521.     if (execute_no_commit(this,trans) != 0)
  1522.       DBUG_RETURN(ndb_err(trans));
  1523.     
  1524.     DBUG_RETURN(next_result(buf));
  1525.   }
  1526. /*
  1527.   Start a filtered scan in NDB.
  1528.   NOTE
  1529.   This function is here as an example of how to start a
  1530.   filtered scan. It should be possible to replace full_table_scan 
  1531.   with this function and make a best effort attempt 
  1532.   at filtering out the irrelevant data by converting the "items" 
  1533.   into interpreted instructions.
  1534.   This would speed up table scans where there is a limiting WHERE clause
  1535.   that doesn't match any index in the table.
  1536.  */
  1537. int ha_ndbcluster::filtered_scan(const byte *key, uint key_len, 
  1538.  byte *buf,
  1539.  enum ha_rkey_function find_flag)
  1540. {  
  1541.   NdbConnection *trans= m_active_trans;
  1542.   NdbResultSet *cursor;
  1543.   NdbScanOperation *op;
  1544.   DBUG_ENTER("filtered_scan");
  1545.   DBUG_PRINT("enter", ("key_len: %u, index: %u", 
  1546.                        key_len, active_index));
  1547.   DBUG_DUMP("key", (char*)key, key_len);  
  1548.   DBUG_PRINT("info", ("Starting a new filtered scan on %s",
  1549.       m_tabname));
  1550.   NdbOperation::LockMode lm=
  1551.     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  1552.   if (!(op= trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
  1553.       !(cursor= op->readTuples(lm, 0, parallelism)))
  1554.     ERR_RETURN(trans->getNdbError());
  1555.   m_active_cursor= cursor;
  1556.   
  1557.   {
  1558.     // Start scan filter
  1559.     NdbScanFilter sf(op);
  1560.     sf.begin();
  1561.       
  1562.     // Set filter using the supplied key data
  1563.     byte *key_ptr= (byte *) key;    
  1564.     uint tot_len= 0;
  1565.     KEY* key_info= table->key_info + active_index;
  1566.     for (uint k= 0; k < key_info->key_parts; k++) 
  1567.     {
  1568.       KEY_PART_INFO* key_part= key_info->key_part+k;
  1569.       Field* field= key_part->field;
  1570.       uint ndb_fieldnr= key_part->fieldnr-1;
  1571.       DBUG_PRINT("key_part", ("fieldnr: %d", ndb_fieldnr));
  1572.       //const NDBCOL *col= ((const NDBTAB *) m_table)->getColumn(ndb_fieldnr);
  1573.       uint32 field_len=  field->pack_length();
  1574.       DBUG_DUMP("key", (char*)key, field_len);
  1575.       DBUG_PRINT("info", ("Column %s, type: %d, len: %d", 
  1576.   field->field_name, field->real_type(), field_len));
  1577.       // Define scan filter
  1578.       if (field->real_type() == MYSQL_TYPE_STRING)
  1579. sf.eq(ndb_fieldnr, key_ptr, field_len);
  1580.       else 
  1581.       {
  1582. if (field_len == 8)
  1583.   sf.eq(ndb_fieldnr, (Uint64)*key_ptr);
  1584. else if (field_len <= 4)
  1585.   sf.eq(ndb_fieldnr, (Uint32)*key_ptr);
  1586. else 
  1587.   DBUG_RETURN(1);
  1588.       }
  1589.       key_ptr += field_len;
  1590.       tot_len += field_len;
  1591.       if (tot_len >= key_len)
  1592. break;
  1593.     }
  1594.     // End scan filter
  1595.     sf.end();
  1596.   }
  1597.   DBUG_RETURN(define_read_attrs(buf, op));
  1598. /*
  1599.   Start full table scan in NDB
  1600.  */
  1601. int ha_ndbcluster::full_table_scan(byte *buf)
  1602. {
  1603.   uint i;
  1604.   NdbResultSet *cursor;
  1605.   NdbScanOperation *op;
  1606.   NdbConnection *trans= m_active_trans;
  1607.   DBUG_ENTER("full_table_scan");  
  1608.   DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
  1609.   NdbOperation::LockMode lm=
  1610.     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
  1611.   if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
  1612.       !(cursor= op->readTuples(lm, 0, parallelism)))
  1613.     ERR_RETURN(trans->getNdbError());
  1614.   m_active_cursor= cursor;
  1615.   DBUG_RETURN(define_read_attrs(buf, op));
  1616. }
  1617. /*
  1618.   Insert one record into NDB
  1619. */
  1620. int ha_ndbcluster::write_row(byte *record)
  1621. {
  1622.   bool has_auto_increment;
  1623.   uint i;
  1624.   NdbConnection *trans= m_active_trans;
  1625.   NdbOperation *op;
  1626.   int res;
  1627.   DBUG_ENTER("write_row");
  1628.   if(m_ignore_dup_key && table->primary_key != MAX_KEY)
  1629.   {
  1630.     int peek_res= peek_row(record);
  1631.     
  1632.     if (!peek_res) 
  1633.     {
  1634.       m_dupkey= table->primary_key;
  1635.       DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
  1636.     }
  1637.     if (peek_res != HA_ERR_KEY_NOT_FOUND)
  1638.       DBUG_RETURN(peek_res);
  1639.   }
  1640.   
  1641.   statistic_increment(ha_write_count,&LOCK_status);
  1642.   if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
  1643.     table->timestamp_field->set_time();
  1644.   has_auto_increment= (table->next_number_field && record == table->record[0]);
  1645.   if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)))
  1646.     ERR_RETURN(trans->getNdbError());
  1647.   res= (m_use_write) ? op->writeTuple() :op->insertTuple(); 
  1648.   if (res != 0)
  1649.     ERR_RETURN(trans->getNdbError());  
  1650.  
  1651.   if (table->primary_key == MAX_KEY) 
  1652.   {
  1653.     // Table has hidden primary key
  1654.     Ndb *ndb= get_ndb();
  1655.     Uint64 auto_value= NDB_FAILED_AUTO_INCREMENT;
  1656.     uint retries= NDB_AUTO_INCREMENT_RETRIES;
  1657.     do {
  1658.       auto_value= ndb->getAutoIncrementValue((const NDBTAB *) m_table);
  1659.     } while (auto_value == NDB_FAILED_AUTO_INCREMENT && 
  1660.              --retries &&
  1661.              ndb->getNdbError().status == NdbError::TemporaryError);
  1662.     if (auto_value == NDB_FAILED_AUTO_INCREMENT)
  1663.       ERR_RETURN(ndb->getNdbError());
  1664.     if (set_hidden_key(op, table->fields, (const byte*)&auto_value))
  1665.       ERR_RETURN(op->getNdbError());
  1666.   } 
  1667.   else 
  1668.   {
  1669.     int res;
  1670.     if (has_auto_increment) 
  1671.     {
  1672.       m_skip_auto_increment= FALSE;
  1673.       update_auto_increment();
  1674.       m_skip_auto_increment= !auto_increment_column_changed;
  1675.     }
  1676.     if ((res= set_primary_key_from_record(op, record)))
  1677.       return res;  
  1678.   }
  1679.   // Set non-key attribute(s)
  1680.   bool set_blob_value= FALSE;
  1681.   for (i= 0; i < table->fields; i++) 
  1682.   {
  1683.     Field *field= table->field[i];
  1684.     if (!(field->flags & PRI_KEY_FLAG) &&
  1685. set_ndb_value(op, field, i, &set_blob_value))
  1686.     {
  1687.       m_skip_auto_increment= TRUE;
  1688.       ERR_RETURN(op->getNdbError());
  1689.     }
  1690.   }
  1691.   /*
  1692.     Execute write operation
  1693.     NOTE When doing inserts with many values in 
  1694.     each INSERT statement it should not be necessary
  1695.     to NoCommit the transaction between each row.
  1696.     Find out how this is detected!
  1697.   */
  1698.   m_rows_inserted++;
  1699.   no_uncommitted_rows_update(1);
  1700.   m_bulk_insert_not_flushed= TRUE;
  1701.   if ((m_rows_to_insert == (ha_rows) 1) || 
  1702.       ((m_rows_inserted % m_bulk_insert_rows) == 0) ||
  1703.       m_primary_key_update ||
  1704.       set_blob_value)
  1705.   {
  1706.     THD *thd= current_thd;
  1707.     // Send rows to NDB
  1708.     DBUG_PRINT("info", ("Sending inserts to NDB, "
  1709. "rows_inserted:%d, bulk_insert_rows: %d", 
  1710. (int)m_rows_inserted, (int)m_bulk_insert_rows));
  1711.     m_bulk_insert_not_flushed= FALSE;
  1712.     //    if (thd->transaction.on)
  1713.     if (m_transaction_on)
  1714.     {
  1715.       if (execute_no_commit(this,trans) != 0)
  1716.       {
  1717. m_skip_auto_increment= TRUE;
  1718. no_uncommitted_rows_execute_failure();
  1719. DBUG_RETURN(ndb_err(trans));
  1720.       }
  1721.     }
  1722.     else
  1723.     {
  1724.       if (execute_commit(this,trans) != 0)
  1725.       {
  1726. m_skip_auto_increment= TRUE;
  1727. no_uncommitted_rows_execute_failure();
  1728. DBUG_RETURN(ndb_err(trans));
  1729.       }
  1730.       int res= trans->restart();
  1731.       DBUG_ASSERT(res == 0);
  1732.     }
  1733.   }
  1734.   if ((has_auto_increment) && (m_skip_auto_increment))
  1735.   {
  1736.     Ndb *ndb= get_ndb();
  1737.     Uint64 next_val= (Uint64) table->next_number_field->val_int() + 1;
  1738.     DBUG_PRINT("info", 
  1739.        ("Trying to set next auto increment value to %lu",
  1740.                 (ulong) next_val));
  1741.     if (ndb->setAutoIncrementValue((const NDBTAB *) m_table, next_val, TRUE))
  1742.       DBUG_PRINT("info", 
  1743.  ("Setting next auto increment value to %u", next_val));  
  1744.   }
  1745.   m_skip_auto_increment= TRUE;
  1746.   DBUG_RETURN(0);
  1747. }
  1748. /* Compare if a key in a row has changed */
  1749. int ha_ndbcluster::key_cmp(uint keynr, const byte * old_row,
  1750.    const byte * new_row)
  1751. {
  1752.   KEY_PART_INFO *key_part=table->key_info[keynr].key_part;
  1753.   KEY_PART_INFO *end=key_part+table->key_info[keynr].key_parts;
  1754.   for (; key_part != end ; key_part++)
  1755.   {
  1756.     if (key_part->null_bit)
  1757.     {
  1758.       if ((old_row[key_part->null_offset] & key_part->null_bit) !=
  1759.   (new_row[key_part->null_offset] & key_part->null_bit))
  1760. return 1;
  1761.     }
  1762.     if (key_part->key_part_flag & (HA_BLOB_PART | HA_VAR_LENGTH))
  1763.     {
  1764.       if (key_part->field->cmp_binary((char*) (old_row + key_part->offset),
  1765.       (char*) (new_row + key_part->offset),
  1766.       (ulong) key_part->length))
  1767. return 1;
  1768.     }
  1769.     else
  1770.     {
  1771.       if (memcmp(old_row+key_part->offset, new_row+key_part->offset,
  1772.  key_part->length))
  1773. return 1;
  1774.     }
  1775.   }
  1776.   return 0;
  1777. }
  1778. /*
  1779.   Update one record in NDB using primary key
  1780. */
  1781. int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
  1782. {
  1783.   THD *thd= current_thd;
  1784.   NdbConnection *trans= m_active_trans;
  1785.   NdbResultSet* cursor= m_active_cursor;
  1786.   NdbOperation *op;
  1787.   uint i;
  1788.   DBUG_ENTER("update_row");
  1789.   
  1790.   statistic_increment(ha_update_count,&LOCK_status);
  1791.   if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
  1792.   {
  1793.     table->timestamp_field->set_time();
  1794.     // Set query_id so that field is really updated
  1795.     table->timestamp_field->query_id= thd->query_id;
  1796.   }
  1797.   /* Check for update of primary key for special handling */  
  1798.   if ((table->primary_key != MAX_KEY) &&
  1799.       (key_cmp(table->primary_key, old_data, new_data)))
  1800.   {
  1801.     int read_res, insert_res, delete_res, undo_res;
  1802.     DBUG_PRINT("info", ("primary key update, doing pk read+delete+insert"));
  1803.     // Get all old fields, since we optimize away fields not in query
  1804.     read_res= complemented_pk_read(old_data, new_data);
  1805.     if (read_res)
  1806.     {
  1807.       DBUG_PRINT("info", ("pk read failed"));
  1808.       DBUG_RETURN(read_res);
  1809.     }
  1810.     // Delete old row
  1811.     m_primary_key_update= TRUE;
  1812.     delete_res= delete_row(old_data);
  1813.     m_primary_key_update= FALSE;
  1814.     if (delete_res)
  1815.     {
  1816.       DBUG_PRINT("info", ("delete failed"));
  1817.       DBUG_RETURN(delete_res);
  1818.     }     
  1819.     // Insert new row
  1820.     DBUG_PRINT("info", ("delete succeded"));
  1821.     m_primary_key_update= TRUE;
  1822.     insert_res= write_row(new_data);
  1823.     m_primary_key_update= FALSE;
  1824.     if (insert_res)
  1825.     {
  1826.       DBUG_PRINT("info", ("insert failed"));
  1827.       if (trans->commitStatus() == NdbConnection::Started)
  1828.       {
  1829.         // Undo delete_row(old_data)
  1830.         m_primary_key_update= TRUE;
  1831.         undo_res= write_row((byte *)old_data);
  1832.         if (undo_res)
  1833.           push_warning(current_thd, 
  1834.                        MYSQL_ERROR::WARN_LEVEL_WARN, 
  1835.                        undo_res, 
  1836.                        "NDB failed undoing delete at primary key update");
  1837.         m_primary_key_update= FALSE;
  1838.       }
  1839.       DBUG_RETURN(insert_res);
  1840.     }
  1841.     DBUG_PRINT("info", ("delete+insert succeeded"));
  1842.     DBUG_RETURN(0);
  1843.   }
  1844.   if (cursor)
  1845.   {
  1846.     /*
  1847.       We are scanning records and want to update the record
  1848.       that was just found, call updateTuple on the cursor 
  1849.       to take over the lock to a new update operation
  1850.       And thus setting the primary key of the record from 
  1851.       the active record in cursor
  1852.     */
  1853.     DBUG_PRINT("info", ("Calling updateTuple on cursor"));
  1854.     if (!(op= cursor->updateTuple()))
  1855.       ERR_RETURN(trans->getNdbError());
  1856.     m_ops_pending++;
  1857.     if (uses_blob_value(FALSE))
  1858.       m_blobs_pending= TRUE;
  1859.   }
  1860.   else
  1861.   {  
  1862.     if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
  1863. op->updateTuple() != 0)
  1864.       ERR_RETURN(trans->getNdbError());  
  1865.     
  1866.     if (table->primary_key == MAX_KEY) 
  1867.     {
  1868.       // This table has no primary key, use "hidden" primary key
  1869.       DBUG_PRINT("info", ("Using hidden key"));
  1870.       
  1871.       // Require that the PK for this record has previously been 
  1872.       // read into m_value
  1873.       uint no_fields= table->fields;
  1874.       NdbRecAttr* rec= m_value[no_fields].rec;
  1875.       DBUG_ASSERT(rec);
  1876.       DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
  1877.       
  1878.       if (set_hidden_key(op, no_fields, rec->aRef()))
  1879. ERR_RETURN(op->getNdbError());
  1880.     } 
  1881.     else 
  1882.     {
  1883.       int res;
  1884.       if ((res= set_primary_key_from_record(op, old_data)))
  1885. DBUG_RETURN(res);
  1886.     }
  1887.   }
  1888.   // Set non-key attribute(s)
  1889.   for (i= 0; i < table->fields; i++) 
  1890.   {
  1891.     Field *field= table->field[i];
  1892.     if (((thd->query_id == field->query_id) || m_retrieve_all_fields) &&
  1893.         (!(field->flags & PRI_KEY_FLAG)) &&
  1894. set_ndb_value(op, field, i))
  1895.       ERR_RETURN(op->getNdbError());
  1896.   }
  1897.   // Execute update operation
  1898.   if (!cursor && execute_no_commit(this,trans) != 0) {
  1899.     no_uncommitted_rows_execute_failure();
  1900.     DBUG_RETURN(ndb_err(trans));
  1901.   }
  1902.   
  1903.   DBUG_RETURN(0);
  1904. }
  1905. /*
  1906.   Delete one record from NDB, using primary key 
  1907. */
  1908. int ha_ndbcluster::delete_row(const byte *record)
  1909. {
  1910.   NdbConnection *trans= m_active_trans;
  1911.   NdbResultSet* cursor= m_active_cursor;
  1912.   NdbOperation *op;
  1913.   DBUG_ENTER("delete_row");
  1914.   statistic_increment(ha_delete_count,&LOCK_status);
  1915.   if (cursor)
  1916.   {
  1917.     /*
  1918.       We are scanning records and want to delete the record
  1919.       that was just found, call deleteTuple on the cursor 
  1920.       to take over the lock to a new delete operation
  1921.       And thus setting the primary key of the record from 
  1922.       the active record in cursor
  1923.     */
  1924.     DBUG_PRINT("info", ("Calling deleteTuple on cursor"));
  1925.     if (cursor->deleteTuple() != 0)
  1926.       ERR_RETURN(trans->getNdbError());     
  1927.     m_ops_pending++;
  1928.     no_uncommitted_rows_update(-1);
  1929.     if (!m_primary_key_update)
  1930.       // If deleting from cursor, NoCommit will be handled in next_result
  1931.       DBUG_RETURN(0);
  1932.   }
  1933.   else
  1934.   {
  1935.     
  1936.     if (!(op=trans->getNdbOperation((const NDBTAB *) m_table)) || 
  1937. op->deleteTuple() != 0)
  1938.       ERR_RETURN(trans->getNdbError());
  1939.     
  1940.     no_uncommitted_rows_update(-1);
  1941.     
  1942.     if (table->primary_key == MAX_KEY) 
  1943.     {
  1944.       // This table has no primary key, use "hidden" primary key
  1945.       DBUG_PRINT("info", ("Using hidden key"));
  1946.       uint no_fields= table->fields;
  1947.       NdbRecAttr* rec= m_value[no_fields].rec;
  1948.       DBUG_ASSERT(rec != NULL);
  1949.       
  1950.       if (set_hidden_key(op, no_fields, rec->aRef()))
  1951. ERR_RETURN(op->getNdbError());
  1952.     } 
  1953.     else 
  1954.     {
  1955.       int res;
  1956.       if ((res= set_primary_key_from_record(op, record)))
  1957.         return res;  
  1958.     }
  1959.   }
  1960.   
  1961.   // Execute delete operation
  1962.   if (execute_no_commit(this,trans) != 0) {
  1963.     no_uncommitted_rows_execute_failure();
  1964.     DBUG_RETURN(ndb_err(trans));
  1965.   }
  1966.   DBUG_RETURN(0);
  1967. }
  1968.   
  1969. /*
  1970.   Unpack a record read from NDB 
  1971.   SYNOPSIS
  1972.     unpack_record()
  1973.     buf Buffer to store read row
  1974.   NOTE
  1975.     The data for each row is read directly into the
  1976.     destination buffer. This function is primarily 
  1977.     called in order to check if any fields should be 
  1978.     set to null.
  1979. */
  1980. void ha_ndbcluster::unpack_record(byte* buf)
  1981. {
  1982.   uint row_offset= (uint) (buf - table->record[0]);
  1983.   Field **field, **end;
  1984.   NdbValue *value= m_value;
  1985.   DBUG_ENTER("unpack_record");
  1986.   
  1987.   // Set null flag(s)
  1988.   bzero(buf, table->null_bytes);
  1989.   for (field= table->field, end= field+table->fields;
  1990.        field < end;
  1991.        field++, value++)
  1992.   {
  1993.     if ((*value).ptr)
  1994.     {
  1995.       if (! ((*field)->flags & BLOB_FLAG))
  1996.       {
  1997.         if ((*value).rec->isNULL())
  1998.          (*field)->set_null(row_offset);
  1999.       }
  2000.       else
  2001.       {
  2002.         NdbBlob* ndb_blob= (*value).blob;
  2003.         bool isNull= TRUE;
  2004.         int ret= ndb_blob->getNull(isNull);
  2005.         DBUG_ASSERT(ret == 0);
  2006.         if (isNull)
  2007.          (*field)->set_null(row_offset);
  2008.       }
  2009.     }
  2010.   }
  2011. #ifndef DBUG_OFF
  2012.   // Read and print all values that was fetched
  2013.   if (table->primary_key == MAX_KEY)
  2014.   {
  2015.     // Table with hidden primary key
  2016.     int hidden_no= table->fields;
  2017.     const NDBTAB *tab= (const NDBTAB *) m_table;
  2018.     const NDBCOL *hidden_col= tab->getColumn(hidden_no);
  2019.     NdbRecAttr* rec= m_value[hidden_no].rec;
  2020.     DBUG_ASSERT(rec);
  2021.     DBUG_PRINT("hidden", ("%d: %s "%llu"", hidden_no, 
  2022.                           hidden_col->getName(), rec->u_64_value()));
  2023.   } 
  2024.   print_results();
  2025. #endif
  2026.   DBUG_VOID_RETURN;
  2027. }
  2028. /*
  2029.   Utility function to print/dump the fetched field
  2030.  */
  2031. void ha_ndbcluster::print_results()
  2032. {
  2033.   const NDBTAB *tab= (const NDBTAB*) m_table;
  2034.   DBUG_ENTER("print_results");
  2035. #ifndef DBUG_OFF
  2036.   if (!_db_on_)
  2037.     DBUG_VOID_RETURN;
  2038.   
  2039.   for (uint f=0; f<table->fields;f++)
  2040.   {
  2041.     Field *field;
  2042.     const NDBCOL *col;
  2043.     NdbValue value;
  2044.     if (!(value= m_value[f]).ptr)
  2045.     {
  2046.       fprintf(DBUG_FILE, "Field %d was not readn", f);
  2047.       continue;
  2048.     }
  2049.     field= table->field[f];
  2050.     DBUG_DUMP("field->ptr", (char*)field->ptr, field->pack_length());
  2051.     col= tab->getColumn(f);
  2052.     fprintf(DBUG_FILE, "%d: %st", f, col->getName());
  2053.     NdbBlob *ndb_blob= NULL;
  2054.     if (! (field->flags & BLOB_FLAG))
  2055.     {
  2056.       if (value.rec->isNULL())
  2057.       {
  2058.         fprintf(DBUG_FILE, "NULLn");
  2059.         continue;
  2060.       }
  2061.     }
  2062.     else
  2063.     {
  2064.       ndb_blob= value.blob;
  2065.       bool isNull= TRUE;
  2066.       ndb_blob->getNull(isNull);
  2067.       if (isNull) {
  2068.         fprintf(DBUG_FILE, "NULLn");
  2069.         continue;
  2070.       }
  2071.     }
  2072.     switch (col->getType()) {
  2073.     case NdbDictionary::Column::Tinyint: {
  2074.       char value= *field->ptr;
  2075.       fprintf(DBUG_FILE, "Tinyintt%d", value);
  2076.       break;
  2077.     }
  2078.     case NdbDictionary::Column::Tinyunsigned: {
  2079.       unsigned char value= *field->ptr;
  2080.       fprintf(DBUG_FILE, "Tinyunsignedt%u", value);
  2081.       break;
  2082.     }
  2083.     case NdbDictionary::Column::Smallint: {
  2084.       short value= *field->ptr;
  2085.       fprintf(DBUG_FILE, "Smallintt%d", value);
  2086.       break;
  2087.     }
  2088.     case NdbDictionary::Column::Smallunsigned: {
  2089.       unsigned short value= *field->ptr;
  2090.       fprintf(DBUG_FILE, "Smallunsignedt%u", value);
  2091.       break;
  2092.     }
  2093.     case NdbDictionary::Column::Mediumint: {
  2094.       byte value[3];
  2095.       memcpy(value, field->ptr, 3);
  2096.       fprintf(DBUG_FILE, "Mediumintt%d,%d,%d", value[0], value[1], value[2]);
  2097.       break;
  2098.     }
  2099.     case NdbDictionary::Column::Mediumunsigned: {
  2100.       byte value[3];
  2101.       memcpy(value, field->ptr, 3);
  2102.       fprintf(DBUG_FILE, "Mediumunsignedt%u,%u,%u", value[0], value[1], value[2]);
  2103.       break;
  2104.     }
  2105.     case NdbDictionary::Column::Int: {
  2106.       fprintf(DBUG_FILE, "Intt%lld", field->val_int());
  2107.       break;
  2108.     }
  2109.     case NdbDictionary::Column::Unsigned: {
  2110.       Uint32 value= (Uint32) *field->ptr;
  2111.       fprintf(DBUG_FILE, "Unsignedt%u", value);
  2112.       break;
  2113.     }
  2114.     case NdbDictionary::Column::Bigint: {
  2115.       Int64 value= (Int64) *field->ptr;
  2116.       fprintf(DBUG_FILE, "Bigintt%lld", value);
  2117.       break;
  2118.     }
  2119.     case NdbDictionary::Column::Bigunsigned: {
  2120.       Uint64 value= (Uint64) *field->ptr;
  2121.       fprintf(DBUG_FILE, "Bigunsignedt%llu", value);
  2122.       break;
  2123.     }
  2124.     case NdbDictionary::Column::Float: {
  2125.       float value= (float) *field->ptr;
  2126.       fprintf(DBUG_FILE, "Floatt%f", value);
  2127.       break;
  2128.     }
  2129.     case NdbDictionary::Column::Double: {
  2130.       double value= (double) *field->ptr;
  2131.       fprintf(DBUG_FILE, "Doublet%f", value);
  2132.       break;
  2133.     }
  2134.     case NdbDictionary::Column::Olddecimal: {
  2135.       char *value= field->ptr;
  2136.       fprintf(DBUG_FILE, "Olddecimalt'%-*s'", field->pack_length(), value);
  2137.       break;
  2138.     }
  2139.     case NdbDictionary::Column::Olddecimalunsigned: {
  2140.       char *value= field->ptr;
  2141.       fprintf(DBUG_FILE, "Olddecimalunsignedt'%-*s'", field->pack_length(), value);
  2142.       break;
  2143.     }
  2144.     case NdbDictionary::Column::Char:{
  2145.       const char *value= (char *) field->ptr;
  2146.       fprintf(DBUG_FILE, "Chart'%.*s'", field->pack_length(), value);
  2147.       break;
  2148.     }
  2149.     case NdbDictionary::Column::Varchar:
  2150.     case NdbDictionary::Column::Binary:
  2151.     case NdbDictionary::Column::Varbinary: {
  2152.       const char *value= (char *) field->ptr;
  2153.       fprintf(DBUG_FILE, "Vart'%.*s'", field->pack_length(), value);
  2154.       break;
  2155.     }
  2156.     case NdbDictionary::Column::Datetime: {
  2157.       Uint64 value= (Uint64) *field->ptr;
  2158.       fprintf(DBUG_FILE, "Datetimet%llu", value);
  2159.       break;
  2160.     }
  2161.     case NdbDictionary::Column::Date: {
  2162.       Uint64 value= (Uint64) *field->ptr;
  2163.       fprintf(DBUG_FILE, "Datet%llu", value);
  2164.       break;
  2165.     }
  2166.     case NdbDictionary::Column::Time: {
  2167.       Uint64 value= (Uint64) *field->ptr;
  2168.       fprintf(DBUG_FILE, "Timet%llu", value);
  2169.       break;
  2170.     }
  2171.     case NdbDictionary::Column::Blob: {
  2172.       Uint64 len= 0;
  2173.       ndb_blob->getLength(len);
  2174.       fprintf(DBUG_FILE, "Blobt[len=%u]", (unsigned)len);
  2175.       break;
  2176.     }
  2177.     case NdbDictionary::Column::Text: {
  2178.       Uint64 len= 0;
  2179.       ndb_blob->getLength(len);
  2180.       fprintf(DBUG_FILE, "Textt[len=%u]", (unsigned)len);
  2181.       break;
  2182.     }
  2183.     case NdbDictionary::Column::Undefined:
  2184.     default:
  2185.       fprintf(DBUG_FILE, "Unknown type: %d", col->getType());
  2186.       break;
  2187.     }
  2188.     fprintf(DBUG_FILE, "n");
  2189.     
  2190.   }
  2191. #endif
  2192.   DBUG_VOID_RETURN;
  2193. }
  2194. int ha_ndbcluster::index_init(uint index)
  2195. {
  2196.   DBUG_ENTER("index_init");
  2197.   DBUG_PRINT("enter", ("index: %u", index));
  2198.   DBUG_RETURN(handler::index_init(index));
  2199. }
  2200. int ha_ndbcluster::index_end()
  2201. {
  2202.   DBUG_ENTER("index_end");
  2203.   DBUG_RETURN(close_scan());
  2204. }
  2205. /**
  2206.  * Check if key contains null
  2207.  */
  2208. static
  2209. int
  2210. check_null_in_key(const KEY* key_info, const byte *key, uint key_len)
  2211. {
  2212.   KEY_PART_INFO *curr_part, *end_part;
  2213.   const byte* end_ptr = key + key_len;
  2214.   curr_part= key_info->key_part;
  2215.   end_part= curr_part + key_info->key_parts;
  2216.   
  2217.   for (; curr_part != end_part && key < end_ptr; curr_part++)
  2218.   {
  2219.     if(curr_part->null_bit && *key)
  2220.       return 1;
  2221.     key += curr_part->store_length;
  2222.   }
  2223.   return 0;
  2224. }
  2225. int ha_ndbcluster::index_read(byte *buf,
  2226.       const byte *key, uint key_len, 
  2227.       enum ha_rkey_function find_flag)
  2228. {
  2229.   DBUG_ENTER("index_read");
  2230.   DBUG_PRINT("enter", ("active_index: %u, key_len: %u, find_flag: %d", 
  2231.                        active_index, key_len, find_flag));
  2232.   int error;
  2233.   ndb_index_type type = get_index_type(active_index);
  2234.   const KEY* key_info = table->key_info+active_index;
  2235.   switch (type){
  2236.   case PRIMARY_KEY_ORDERED_INDEX:
  2237.   case PRIMARY_KEY_INDEX:
  2238.     if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len)
  2239.     {
  2240.       if(m_active_cursor && (error= close_scan()))
  2241. DBUG_RETURN(error);
  2242.       DBUG_RETURN(pk_read(key, key_len, buf));
  2243.     }
  2244.     else if (type == PRIMARY_KEY_INDEX)
  2245.     {
  2246.       DBUG_RETURN(1);
  2247.     }
  2248.     break;
  2249.   case UNIQUE_ORDERED_INDEX:
  2250.   case UNIQUE_INDEX:
  2251.     if (find_flag == HA_READ_KEY_EXACT && key_info->key_length == key_len &&
  2252. !check_null_in_key(key_info, key, key_len))
  2253.     {
  2254.       if(m_active_cursor && (error= close_scan()))
  2255. DBUG_RETURN(error);
  2256.       DBUG_RETURN(unique_index_read(key, key_len, buf));
  2257.     }
  2258.     else if (type == UNIQUE_INDEX)
  2259.     {
  2260.       DBUG_RETURN(1);
  2261.     }
  2262.     break;
  2263.   case ORDERED_INDEX:
  2264.     break;
  2265.   default:
  2266.   case UNDEFINED_INDEX:
  2267.     DBUG_ASSERT(FALSE);
  2268.     DBUG_RETURN(1);
  2269.     break;
  2270.   }
  2271.   
  2272.   key_range start_key;
  2273.   start_key.key = key;
  2274.   start_key.length = key_len;
  2275.   start_key.flag = find_flag;
  2276.   error= ordered_index_scan(&start_key, 0, TRUE, buf);  
  2277.   DBUG_RETURN(error == HA_ERR_END_OF_FILE ? HA_ERR_KEY_NOT_FOUND : error);
  2278. }
  2279. int ha_ndbcluster::index_read_idx(byte *buf, uint index_no, 
  2280.       const byte *key, uint key_len, 
  2281.       enum ha_rkey_function find_flag)
  2282. {
  2283.   statistic_increment(ha_read_key_count,&LOCK_status);
  2284.   DBUG_ENTER("index_read_idx");
  2285.   DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));  
  2286.   index_init(index_no);  
  2287.   DBUG_RETURN(index_read(buf, key, key_len, find_flag));
  2288. }
  2289. int ha_ndbcluster::index_next(byte *buf)
  2290. {
  2291.   DBUG_ENTER("index_next");
  2292.   int error= 1;
  2293.   statistic_increment(ha_read_next_count,&LOCK_status);
  2294.   DBUG_RETURN(next_result(buf));
  2295. }
  2296. int ha_ndbcluster::index_prev(byte *buf)
  2297. {
  2298.   DBUG_ENTER("index_prev");
  2299.   statistic_increment(ha_read_prev_count,&LOCK_status);
  2300.   DBUG_RETURN(1);
  2301. }
  2302. int ha_ndbcluster::index_first(byte *buf)
  2303. {
  2304.   DBUG_ENTER("index_first");
  2305.   statistic_increment(ha_read_first_count,&LOCK_status);
  2306.   // Start the ordered index scan and fetch the first row
  2307.   // Only HA_READ_ORDER indexes get called by index_first
  2308.   DBUG_RETURN(ordered_index_scan(0, 0, TRUE, buf));
  2309. }
  2310. int ha_ndbcluster::index_last(byte *buf)
  2311. {
  2312.   DBUG_ENTER("index_last");
  2313.   statistic_increment(ha_read_last_count,&LOCK_status);
  2314.   int res;
  2315.   if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){
  2316.     NdbResultSet *cursor= m_active_cursor; 
  2317.     while((res= cursor->nextResult(TRUE, m_force_send)) == 0);
  2318.     if(res == 1){
  2319.       unpack_record(buf);
  2320.       table->status= 0;     
  2321.       DBUG_RETURN(0);
  2322.     }
  2323.   }