row0ins.c
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:61k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /******************************************************
  2. Insert into a table
  3. (c) 1996 Innobase Oy
  4. Created 4/20/1996 Heikki Tuuri
  5. *******************************************************/
  6. #include "row0ins.h"
  7. #ifdef UNIV_NONINL
  8. #include "row0ins.ic"
  9. #endif
  10. #include "dict0dict.h"
  11. #include "dict0boot.h"
  12. #include "trx0undo.h"
  13. #include "btr0btr.h"
  14. #include "btr0cur.h"
  15. #include "mach0data.h"
  16. #include "que0que.h"
  17. #include "row0upd.h"
  18. #include "row0sel.h"
  19. #include "row0row.h"
  20. #include "rem0cmp.h"
  21. #include "lock0lock.h"
  22. #include "log0log.h"
  23. #include "eval0eval.h"
  24. #include "data0data.h"
  25. #include "usr0sess.h"
  26. #define ROW_INS_PREV 1
  27. #define ROW_INS_NEXT 2
  28. /*********************************************************************
  29. This prototype is copied from /mysql/sql/ha_innodb.cc.
  30. Invalidates the MySQL query cache for the table.
  31. NOTE that the exact prototype of this function has to be in
  32. /innobase/row/row0ins.c! */
  33. extern
  34. void
  35. innobase_invalidate_query_cache(
  36. /*============================*/
  37. trx_t* trx, /* in: transaction which modifies the table */
  38. char* full_name, /* in: concatenation of database name, null
  39. char '', table name, null char'';
  40. NOTE that in Windows this is always
  41. in LOWER CASE! */
  42. ulint full_name_len); /* in: full name length where also the null
  43. chars count */
  44. /**********************************************************************
  45. This function returns true if 
  46. 1) SQL-query in the current thread
  47. is either REPLACE or LOAD DATA INFILE REPLACE. 
  48. 2) SQL-query in the current thread
  49. is INSERT ON DUPLICATE KEY UPDATE.
  50. NOTE that /mysql/innobase/row/row0ins.c must contain the 
  51. prototype for this function ! */
  52. ibool
  53. innobase_query_is_update(void);
  54. /*************************************************************************
  55. Creates an insert node struct. */
  56. ins_node_t*
  57. ins_node_create(
  58. /*============*/
  59. /* out, own: insert node struct */
  60. ulint ins_type, /* in: INS_VALUES, ... */
  61. dict_table_t* table,  /* in: table where to insert */
  62. mem_heap_t* heap) /* in: mem heap where created */
  63. {
  64. ins_node_t* node;
  65. node = mem_heap_alloc(heap, sizeof(ins_node_t));
  66. node->common.type = QUE_NODE_INSERT;
  67. node->ins_type = ins_type;
  68. node->state = INS_NODE_SET_IX_LOCK;
  69. node->table = table;
  70. node->index = NULL;
  71. node->entry = NULL;
  72. node->select = NULL;
  73. node->trx_id = ut_dulint_zero;
  74. node->entry_sys_heap = mem_heap_create(128);
  75. node->magic_n = INS_NODE_MAGIC_N;
  76. return(node);
  77. }
  78. /***************************************************************
  79. Creates an entry template for each index of a table. */
  80. static
  81. void
  82. ins_node_create_entry_list(
  83. /*=======================*/
  84. ins_node_t* node) /* in: row insert node */
  85. {
  86. dict_index_t* index;
  87. dtuple_t* entry;
  88. ut_ad(node->entry_sys_heap);
  89. UT_LIST_INIT(node->entry_list);
  90. index = dict_table_get_first_index(node->table);
  91. while (index != NULL) {
  92. entry = row_build_index_entry(node->row, index,
  93. node->entry_sys_heap);
  94. UT_LIST_ADD_LAST(tuple_list, node->entry_list, entry);
  95. index = dict_table_get_next_index(index);
  96. }
  97. }
  98. /*********************************************************************
  99. Adds system field buffers to a row. */
  100. static
  101. void
  102. row_ins_alloc_sys_fields(
  103. /*=====================*/
  104. ins_node_t* node) /* in: insert node */
  105. {
  106. dtuple_t* row;
  107. dict_table_t* table;
  108. mem_heap_t* heap;
  109. dict_col_t* col;
  110. dfield_t* dfield;
  111. ulint len;
  112. byte* ptr;
  113. row = node->row;
  114. table = node->table;
  115. heap = node->entry_sys_heap;
  116. ut_ad(row && table && heap);
  117. ut_ad(dtuple_get_n_fields(row) == dict_table_get_n_cols(table));
  118. /* 1. Allocate buffer for row id */
  119. col = dict_table_get_sys_col(table, DATA_ROW_ID);
  120. dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
  121. ptr = mem_heap_alloc(heap, DATA_ROW_ID_LEN);
  122. dfield_set_data(dfield, ptr, DATA_ROW_ID_LEN);
  123. node->row_id_buf = ptr;
  124. if (table->type == DICT_TABLE_CLUSTER_MEMBER) {
  125. /* 2. Fill in the dfield for mix id */
  126. col = dict_table_get_sys_col(table, DATA_MIX_ID);
  127. dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
  128. len = mach_dulint_get_compressed_size(table->mix_id);
  129. ptr = mem_heap_alloc(heap, DATA_MIX_ID_LEN);
  130. mach_dulint_write_compressed(ptr, table->mix_id);
  131. dfield_set_data(dfield, ptr, len);
  132. }
  133. /* 3. Allocate buffer for trx id */
  134. col = dict_table_get_sys_col(table, DATA_TRX_ID);
  135. dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
  136. ptr = mem_heap_alloc(heap, DATA_TRX_ID_LEN);
  137. dfield_set_data(dfield, ptr, DATA_TRX_ID_LEN);
  138. node->trx_id_buf = ptr;
  139. /* 4. Allocate buffer for roll ptr */
  140. col = dict_table_get_sys_col(table, DATA_ROLL_PTR);
  141. dfield = dtuple_get_nth_field(row, dict_col_get_no(col));
  142. ptr = mem_heap_alloc(heap, DATA_ROLL_PTR_LEN);
  143. dfield_set_data(dfield, ptr, DATA_ROLL_PTR_LEN);
  144. }
  145. /*************************************************************************
  146. Sets a new row to insert for an INS_DIRECT node. This function is only used
  147. if we have constructed the row separately, which is a rare case; this
  148. function is quite slow. */
  149. void
  150. ins_node_set_new_row(
  151. /*=================*/
  152. ins_node_t* node, /* in: insert node */
  153. dtuple_t* row) /* in: new row (or first row) for the node */
  154. {
  155. node->state = INS_NODE_SET_IX_LOCK;
  156. node->index = NULL;
  157. node->entry = NULL;
  158. node->row = row;
  159. mem_heap_empty(node->entry_sys_heap);
  160. /* Create templates for index entries */
  161. ins_node_create_entry_list(node);
  162. /* Allocate from entry_sys_heap buffers for sys fields */
  163. row_ins_alloc_sys_fields(node);
  164. /* As we allocated a new trx id buf, the trx id should be written
  165. there again: */
  166. node->trx_id = ut_dulint_zero;
  167. }
  168. /***********************************************************************
  169. Does an insert operation by updating a delete-marked existing record
  170. in the index. This situation can occur if the delete-marked record is
  171. kept in the index for consistent reads. */
  172. static
  173. ulint
  174. row_ins_sec_index_entry_by_modify(
  175. /*==============================*/
  176. /* out: DB_SUCCESS or error code */
  177. ulint mode, /* in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
  178. depending on whether mtr holds just a leaf
  179. latch or also a tree latch */
  180. btr_cur_t* cursor, /* in: B-tree cursor */
  181. dtuple_t* entry, /* in: index entry to insert */
  182. que_thr_t* thr, /* in: query thread */
  183. mtr_t* mtr) /* in: mtr */
  184. {
  185. big_rec_t* dummy_big_rec;
  186. mem_heap_t* heap;
  187. upd_t* update;
  188. rec_t* rec;
  189. ulint err;
  190. rec = btr_cur_get_rec(cursor);
  191. ut_ad((cursor->index->type & DICT_CLUSTERED) == 0);
  192. ut_ad(rec_get_deleted_flag(rec));
  193. /* We know that in the alphabetical ordering, entry and rec are
  194. identified. But in their binary form there may be differences if
  195. there are char fields in them. Therefore we have to calculate the
  196. difference. */
  197. heap = mem_heap_create(1024);
  198. update = row_upd_build_sec_rec_difference_binary(cursor->index,
  199. entry, rec, thr_get_trx(thr), heap);
  200. if (mode == BTR_MODIFY_LEAF) {
  201. /* Try an optimistic updating of the record, keeping changes
  202. within the page */
  203. err = btr_cur_optimistic_update(BTR_KEEP_SYS_FLAG, cursor,
  204. update, 0, thr, mtr);
  205. if (err == DB_OVERFLOW || err == DB_UNDERFLOW) {
  206. err = DB_FAIL;
  207. }
  208. } else  {
  209. ut_a(mode == BTR_MODIFY_TREE);
  210. err = btr_cur_pessimistic_update(BTR_KEEP_SYS_FLAG, cursor,
  211. &dummy_big_rec, update, 0, thr, mtr);
  212. }
  213. mem_heap_free(heap);
  214. return(err);
  215. }
  216. /***********************************************************************
  217. Does an insert operation by delete unmarking and updating a delete marked
  218. existing record in the index. This situation can occur if the delete marked
  219. record is kept in the index for consistent reads. */
  220. static
  221. ulint
  222. row_ins_clust_index_entry_by_modify(
  223. /*================================*/
  224. /* out: DB_SUCCESS, DB_FAIL, or error code */
  225. ulint mode, /* in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
  226. depending on whether mtr holds just a leaf
  227. latch or also a tree latch */
  228. btr_cur_t* cursor, /* in: B-tree cursor */
  229. big_rec_t** big_rec,/* out: possible big rec vector of fields
  230. which have to be stored externally by the
  231. caller */
  232. dtuple_t* entry, /* in: index entry to insert */
  233. ulint* ext_vec,/* in: array containing field numbers of
  234. externally stored fields in entry, or NULL */
  235. ulint n_ext_vec,/* in: number of fields in ext_vec */
  236. que_thr_t* thr, /* in: query thread */
  237. mtr_t* mtr) /* in: mtr */
  238. {
  239. mem_heap_t* heap;
  240. rec_t* rec;
  241. upd_t* update;
  242. ulint err;
  243. ut_ad(cursor->index->type & DICT_CLUSTERED);
  244. *big_rec = NULL;
  245. rec = btr_cur_get_rec(cursor);
  246. ut_ad(rec_get_deleted_flag(rec));
  247. heap = mem_heap_create(1024);
  248. /* Build an update vector containing all the fields to be modified;
  249. NOTE that this vector may NOT contain system columns trx_id or
  250. roll_ptr */
  251. update = row_upd_build_difference_binary(cursor->index, entry, ext_vec,
  252. n_ext_vec, rec, thr_get_trx(thr), heap);
  253. if (mode == BTR_MODIFY_LEAF) {
  254. /* Try optimistic updating of the record, keeping changes
  255. within the page */
  256. err = btr_cur_optimistic_update(0, cursor, update, 0, thr,
  257.    mtr);
  258. if (err == DB_OVERFLOW || err == DB_UNDERFLOW) {
  259. err = DB_FAIL;
  260. }
  261. } else  {
  262. ut_a(mode == BTR_MODIFY_TREE);
  263. err = btr_cur_pessimistic_update(0, cursor, big_rec, update,
  264. 0, thr, mtr);
  265. }
  266. mem_heap_free(heap);
  267. return(err);
  268. }
  269. /*************************************************************************
  270. Returns TRUE if in a cascaded update/delete an ancestor node of node
  271. updates (not DELETE, but UPDATE) table. */
  272. static
  273. ibool
  274. row_ins_cascade_ancestor_updates_table(
  275. /*===================================*/
  276. /* out: TRUE if an ancestor updates table */
  277. que_node_t* node, /* in: node in a query graph */
  278. dict_table_t* table) /* in: table */
  279. {
  280. que_node_t* parent;
  281. upd_node_t* upd_node;
  282. parent = que_node_get_parent(node);
  283. while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
  284. upd_node = parent;
  285. if (upd_node->table == table && upd_node->is_delete == FALSE) {
  286. return(TRUE);
  287. }
  288. parent = que_node_get_parent(parent);
  289. ut_a(parent);
  290. }
  291. return(FALSE);
  292. }
  293. /*************************************************************************
  294. Returns the number of ancestor UPDATE or DELETE nodes of a
  295. cascaded update/delete node. */
  296. static
  297. ulint
  298. row_ins_cascade_n_ancestors(
  299. /*========================*/
  300. /* out: number of ancestors */
  301. que_node_t* node) /* in: node in a query graph */
  302. {
  303. que_node_t* parent;
  304. ulint n_ancestors = 0;
  305. parent = que_node_get_parent(node);
  306. while (que_node_get_type(parent) == QUE_NODE_UPDATE) {
  307. n_ancestors++;
  308. parent = que_node_get_parent(parent);
  309. ut_a(parent);
  310. }
  311. return(n_ancestors);
  312. }
  313. /**********************************************************************
  314. Calculates the update vector node->cascade->update for a child table in
  315. a cascaded update. */
  316. static
  317. ulint
  318. row_ins_cascade_calc_update_vec(
  319. /*============================*/
  320. /* out: number of fields in the
  321. calculated update vector; the value
  322. can also be 0 if no foreign key
  323. fields changed; the returned value
  324. is ULINT_UNDEFINED if the column
  325. type in the child table is too short
  326. to fit the new value in the parent
  327. table: that means the update fails */
  328. upd_node_t* node, /* in: update node of the parent
  329. table */
  330. dict_foreign_t* foreign, /* in: foreign key constraint whose
  331. type is != 0 */
  332. mem_heap_t* heap) /* in: memory heap to use as
  333. temporary storage */
  334. {
  335. upd_node_t* cascade = node->cascade_node;
  336. dict_table_t* table = foreign->foreign_table;
  337. dict_index_t* index = foreign->foreign_index;
  338. upd_t* update;
  339. upd_field_t* ufield;
  340. dict_table_t* parent_table;
  341. dict_index_t* parent_index;
  342. upd_t* parent_update;
  343. upd_field_t* parent_ufield;
  344. ulint n_fields_updated;
  345. ulint           parent_field_no;
  346. dtype_t* type;
  347. ulint i;
  348. ulint j;
  349.     
  350. ut_a(node && foreign && cascade && table && index);
  351. /* Calculate the appropriate update vector which will set the fields
  352. in the child index record to the same value (possibly padded with 
  353. spaces if the column is a fixed length CHAR or FIXBINARY column) as
  354. the referenced index record will get in the update. */
  355. parent_table = node->table;
  356. ut_a(parent_table == foreign->referenced_table);
  357. parent_index = foreign->referenced_index;
  358. parent_update = node->update;
  359. update = cascade->update;
  360. update->info_bits = 0;
  361. update->n_fields = foreign->n_fields;
  362. n_fields_updated = 0;
  363. for (i = 0; i < foreign->n_fields; i++) {
  364. parent_field_no = dict_table_get_nth_col_pos(
  365. parent_table,
  366. dict_index_get_nth_col_no(
  367. parent_index, i));
  368. for (j = 0; j < parent_update->n_fields; j++) {
  369. parent_ufield = parent_update->fields + j;
  370. if (parent_ufield->field_no == parent_field_no) {
  371. /* A field in the parent index record is
  372. updated. Let us make the update vector
  373. field for the child table. */
  374.   ufield = update->fields + n_fields_updated;
  375. ufield->field_no =
  376. dict_table_get_nth_col_pos(table,
  377. dict_index_get_nth_col_no(index, i));
  378. ufield->exp = NULL;
  379. ufield->new_val = parent_ufield->new_val;
  380. type = dict_index_get_nth_type(index, i);
  381. /* Do not allow a NOT NULL column to be
  382. updated as NULL */
  383. if (ufield->new_val.len == UNIV_SQL_NULL
  384.     && (type->prtype & DATA_NOT_NULL)) {
  385.         return(ULINT_UNDEFINED);
  386. }
  387. /* If the new value would not fit in the
  388. column, do not allow the update */
  389. if (ufield->new_val.len != UNIV_SQL_NULL
  390.     && ufield->new_val.len
  391.        > dtype_get_len(type)) {
  392.         return(ULINT_UNDEFINED);
  393. }
  394. /* If the parent column type has a different
  395. length than the child column type, we may
  396. need to pad with spaces the new value of the
  397. child column */
  398. if (dtype_is_fixed_size(type)
  399.     && ufield->new_val.len != UNIV_SQL_NULL
  400.     && ufield->new_val.len
  401.        < dtype_get_fixed_size(type)) {
  402.                                         ulint cset;
  403.         ufield->new_val.data =
  404. mem_heap_alloc(heap,
  405.   dtype_get_fixed_size(type));
  406. ufield->new_val.len = 
  407. dtype_get_fixed_size(type);
  408.                                         /* Handle UCS2 strings differently.
  409.                                         As no new collations will be
  410.                                         introduced in 4.1, we hardcode the
  411.                                         charset-collation codes here.
  412.                                         In 5.0, the logic is based on
  413.                                         mbminlen. */
  414.                                         cset = dtype_get_charset_coll(
  415.                                           dtype_get_prtype(type));
  416.                                         if (cset == 35/*ucs2_general_ci*/
  417.                                             || cset == 90/*ucs2_bin*/
  418.                                             || (cset >= 128/*ucs2_unicode_ci*/
  419.                                           && cset <= 144
  420.                                           /*ucs2_persian_ci*/)) {
  421.                                           /* space=0x0020 */
  422.                                           ulint i;
  423.                                           for (i = 0;
  424.                                                i < ufield->new_val.len;
  425.                                                i += 2) {
  426.                                             mach_write_to_2(((byte*)
  427.                                               ufield->new_val.data)
  428.                                               + i, 0x0020);
  429.                                           }
  430.                                         } else {
  431.                                           ut_a(dtype_get_pad_char(type)
  432.                                               != ULINT_UNDEFINED);
  433.                                           memset(ufield->new_val.data,
  434.                                               (byte)dtype_get_pad_char(
  435.                                                 type),
  436.                                               ufield->new_val.len);
  437.                                         }
  438.                                         memcpy(ufield->new_val.data,
  439.                                           parent_ufield->new_val.data,
  440.                                           parent_ufield->new_val.len);
  441. }
  442. ufield->extern_storage = FALSE;
  443. n_fields_updated++;
  444. }
  445. }
  446. }
  447. update->n_fields = n_fields_updated;
  448. return(n_fields_updated);
  449. }
  450. /*************************************************************************
  451. Reports a foreign key error associated with an update or a delete of a
  452. parent table index entry. */
  453. static
  454. void
  455. row_ins_foreign_report_err(
  456. /*=======================*/
  457. const char* errstr, /* in: error string from the viewpoint
  458. of the parent table */
  459. que_thr_t* thr, /* in: query thread whose run_node
  460. is an update node */
  461. dict_foreign_t* foreign, /* in: foreign key constraint */
  462. rec_t* rec, /* in: a matching index record in the
  463. child table */
  464. dtuple_t* entry) /* in: index entry in the parent
  465. table */
  466. {
  467. FILE* ef = dict_foreign_err_file;
  468. trx_t* trx = thr_get_trx(thr);
  469. mutex_enter(&dict_foreign_err_mutex);
  470. rewind(ef);
  471. ut_print_timestamp(ef);
  472. fputs(" Transaction:n", ef);
  473. trx_print(ef, trx);
  474. fputs("Foreign key constraint fails for table ", ef);
  475. ut_print_name(ef, trx, foreign->foreign_table_name);
  476. fputs(":n", ef);
  477. dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign);
  478. putc('n', ef);
  479. fputs(errstr, ef);
  480. fputs(" in parent table, in index ", ef);
  481. ut_print_name(ef, trx, foreign->referenced_index->name);
  482. if (entry) {
  483. fputs(" tuple:n", ef);
  484. dtuple_print(ef, entry);
  485. }
  486. fputs("nBut in child table ", ef);
  487. ut_print_name(ef, trx, foreign->foreign_table_name);
  488. fputs(", in index ", ef);
  489. ut_print_name(ef, trx, foreign->foreign_index->name);
  490. if (rec) {
  491. fputs(", there is a record:n", ef);
  492. rec_print(ef, rec);
  493. } else {
  494. fputs(", the record is not availablen", ef);
  495. }
  496. putc('n', ef);
  497. mutex_exit(&dict_foreign_err_mutex);
  498. }
  499. /*************************************************************************
  500. Reports a foreign key error to dict_foreign_err_buf when we are trying
  501. to add an index entry to a child table. Note that the adding may be the result
  502. of an update, too. */
  503. static
  504. void
  505. row_ins_foreign_report_add_err(
  506. /*===========================*/
  507. trx_t* trx, /* in: transaction */
  508. dict_foreign_t* foreign, /* in: foreign key constraint */
  509. rec_t* rec, /* in: a record in the parent table:
  510. it does not match entry because we
  511. have an error! */
  512. dtuple_t* entry) /* in: index entry to insert in the
  513. child table */
  514. {
  515. FILE* ef = dict_foreign_err_file;
  516. mutex_enter(&dict_foreign_err_mutex);
  517. rewind(ef);
  518. ut_print_timestamp(ef);
  519. fputs(" Transaction:n", ef);
  520. trx_print(ef, trx);
  521. fputs("Foreign key constraint fails for table ", ef);
  522. ut_print_name(ef, trx, foreign->foreign_table_name);
  523. fputs(":n", ef);
  524. dict_print_info_on_foreign_key_in_create_format(ef, trx, foreign);
  525. fputs("nTrying to add in child table, in index ", ef);
  526. ut_print_name(ef, trx, foreign->foreign_index->name);
  527. if (entry) {
  528. fputs(" tuple:n", ef);
  529. dtuple_print(ef, entry);
  530. }
  531. fputs("nBut in parent table ", ef);
  532. ut_print_name(ef, trx, foreign->referenced_table_name);
  533. fputs(", in index ", ef);
  534. ut_print_name(ef, trx, foreign->referenced_index->name);
  535. fputs(",nthe closest match we can find is record:n", ef);
  536. if (rec && page_rec_is_supremum(rec)) {
  537. /* If the cursor ended on a supremum record, it is better
  538. to report the previous record in the error message, so that
  539. the user gets a more descriptive error message. */
  540. rec = page_rec_get_prev(rec);
  541. }
  542. if (rec) {
  543. rec_print(ef, rec);
  544. }
  545. putc('n', ef);
  546. mutex_exit(&dict_foreign_err_mutex);
  547. }
  548. /*************************************************************************
  549. Invalidate the query cache for the given table. */
  550. static
  551. void
  552. row_ins_invalidate_query_cache(
  553. /*===========================*/
  554. que_thr_t* thr, /* in: query thread whose run_node
  555. is an update node */
  556. const char* name) /* in: table name prefixed with
  557. database name and a '/' character */
  558. {
  559. char* buf;
  560. char* ptr;
  561. ulint len = strlen(name) + 1;
  562. buf = mem_strdupl(name, len);
  563. ptr = strchr(buf, '/');
  564. ut_a(ptr);
  565. *ptr = '';
  566. /* We call a function in ha_innodb.cc */
  567. #ifndef UNIV_HOTBACKUP
  568. innobase_invalidate_query_cache(thr_get_trx(thr), buf, len);
  569. #endif
  570. mem_free(buf);
  571. }
  572. /*************************************************************************
  573. Perform referential actions or checks when a parent row is deleted or updated
  574. and the constraint had an ON DELETE or ON UPDATE condition which was not
  575. RESTRICT. */
  576. static
  577. ulint
  578. row_ins_foreign_check_on_constraint(
  579. /*================================*/
  580. /* out: DB_SUCCESS, DB_LOCK_WAIT,
  581. or error code */
  582. que_thr_t* thr, /* in: query thread whose run_node
  583. is an update node */
  584. dict_foreign_t* foreign, /* in: foreign key constraint whose
  585. type is != 0 */
  586. btr_pcur_t* pcur, /* in: cursor placed on a matching
  587. index record in the child table */
  588. dtuple_t* entry, /* in: index entry in the parent
  589. table */
  590. mtr_t* mtr) /* in: mtr holding the latch of pcur
  591. page */
  592. {
  593. upd_node_t* node;
  594. upd_node_t* cascade;
  595. dict_table_t* table = foreign->foreign_table;
  596. dict_index_t* index;
  597. dict_index_t* clust_index;
  598. dtuple_t* ref;
  599. mem_heap_t* tmp_heap;
  600. mem_heap_t* upd_vec_heap = NULL;
  601. rec_t* rec;
  602. rec_t* clust_rec;
  603. upd_t* update;
  604. ulint n_to_update;
  605. ulint err;
  606. ulint i;
  607. trx_t* trx;
  608. ut_a(thr && foreign && pcur && mtr);
  609. trx = thr_get_trx(thr);
  610. /* Since we are going to delete or update a row, we have to invalidate
  611. the MySQL query cache for table */
  612. row_ins_invalidate_query_cache(thr, table->name);
  613. node = thr->run_node;
  614. if (node->is_delete && 0 == (foreign->type &
  615. (DICT_FOREIGN_ON_DELETE_CASCADE
  616.  | DICT_FOREIGN_ON_DELETE_SET_NULL))) {
  617. row_ins_foreign_report_err("Trying to delete",
  618. thr, foreign,
  619. btr_pcur_get_rec(pcur), entry);
  620.         return(DB_ROW_IS_REFERENCED);
  621. }
  622. if (!node->is_delete && 0 == (foreign->type &
  623. (DICT_FOREIGN_ON_UPDATE_CASCADE
  624.  | DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
  625. /* This is an UPDATE */
  626.  
  627. row_ins_foreign_report_err("Trying to update",
  628. thr, foreign,
  629. btr_pcur_get_rec(pcur), entry);
  630.         return(DB_ROW_IS_REFERENCED);
  631. }
  632. if (node->cascade_node == NULL) {
  633. /* Extend our query graph by creating a child to current
  634. update node. The child is used in the cascade or set null
  635. operation. */
  636. node->cascade_heap = mem_heap_create(128);
  637. node->cascade_node = row_create_update_node_for_mysql(
  638. table, node->cascade_heap);
  639. que_node_set_parent(node->cascade_node, node);
  640. }
  641. /* Initialize cascade_node to do the operation we want. Note that we
  642. use the SAME cascade node to do all foreign key operations of the
  643. SQL DELETE: the table of the cascade node may change if there are
  644. several child tables to the table where the delete is done! */
  645. cascade = node->cascade_node;
  646. cascade->table = table;
  647. cascade->foreign = foreign;
  648. if (node->is_delete
  649.     && (foreign->type & DICT_FOREIGN_ON_DELETE_CASCADE)) {
  650. cascade->is_delete = TRUE;
  651. } else {
  652. cascade->is_delete = FALSE;
  653. if (foreign->n_fields > cascade->update_n_fields) {
  654. /* We have to make the update vector longer */
  655. cascade->update = upd_create(foreign->n_fields,
  656. node->cascade_heap);
  657. cascade->update_n_fields = foreign->n_fields;
  658. }
  659. }
  660. /* We do not allow cyclic cascaded updating (DELETE is allowed,
  661. but not UPDATE) of the same table, as this can lead to an infinite
  662. cycle. Check that we are not updating the same table which is
  663. already being modified in this cascade chain. We have to check
  664. this also because the modification of the indexes of a 'parent'
  665. table may still be incomplete, and we must avoid seeing the indexes
  666. of the parent table in an inconsistent state! */
  667. if (!cascade->is_delete
  668.     && row_ins_cascade_ancestor_updates_table(cascade, table)) {
  669.         /* We do not know if this would break foreign key
  670.         constraints, but play safe and return an error */
  671.         err = DB_ROW_IS_REFERENCED;
  672. row_ins_foreign_report_err(
  673. "Trying an update, possibly causing a cyclic cascaded updaten"
  674. "in the child table,", thr, foreign, btr_pcur_get_rec(pcur), entry);
  675. goto nonstandard_exit_func;
  676. }
  677. if (row_ins_cascade_n_ancestors(cascade) >= 15) {
  678. err = DB_ROW_IS_REFERENCED;
  679. row_ins_foreign_report_err(
  680. (char*)"Trying a too deep cascaded delete or updaten",
  681. thr, foreign, btr_pcur_get_rec(pcur), entry);
  682. goto nonstandard_exit_func;
  683. }
  684. index = btr_pcur_get_btr_cur(pcur)->index;
  685. ut_a(index == foreign->foreign_index);
  686. rec = btr_pcur_get_rec(pcur);
  687. if (index->type & DICT_CLUSTERED) {
  688. /* pcur is already positioned in the clustered index of
  689. the child table */
  690. clust_index = index;
  691. clust_rec = rec;
  692. } else {
  693. /* We have to look for the record in the clustered index
  694. in the child table */
  695. clust_index = dict_table_get_first_index(table);
  696. tmp_heap = mem_heap_create(256);
  697. ref = row_build_row_ref(ROW_COPY_POINTERS, index, rec,
  698. tmp_heap);
  699. btr_pcur_open_with_no_init(clust_index, ref,
  700. PAGE_CUR_LE, BTR_SEARCH_LEAF,
  701. cascade->pcur, 0, mtr);
  702. mem_heap_free(tmp_heap);
  703. clust_rec = btr_pcur_get_rec(cascade->pcur);
  704. if (!page_rec_is_user_rec(clust_rec)
  705.     || btr_pcur_get_low_match(cascade->pcur)
  706.        < dict_index_get_n_unique(clust_index)) {
  707. fputs(
  708. "InnoDB: error in cascade of a foreign key opn"
  709. "InnoDB: ", stderr);
  710. dict_index_name_print(stderr, trx, index);
  711. fputs("n"
  712. "InnoDB: record ", stderr);
  713. rec_print(stderr, rec);
  714. fputs("n"
  715. "InnoDB: clustered record ", stderr);
  716. rec_print(stderr, clust_rec);
  717. fputs("n"
  718. "InnoDB: Submit a detailed bug report to http://bugs.mysql.comn", stderr);
  719. err = DB_SUCCESS;
  720. goto nonstandard_exit_func;
  721. }
  722. }
  723. /* Set an X-lock on the row to delete or update in the child table */
  724. err = lock_table(0, table, LOCK_IX, thr);
  725. if (err == DB_SUCCESS) {
  726. /* Here it suffices to use a LOCK_REC_NOT_GAP type lock;
  727. we already have a normal shared lock on the appropriate
  728. gap if the search criterion was not unique */
  729. err = lock_clust_rec_read_check_and_lock(0, clust_rec,
  730. clust_index, LOCK_X, LOCK_REC_NOT_GAP, thr);
  731. }
  732. if (err != DB_SUCCESS) {
  733. goto nonstandard_exit_func;
  734. }
  735. if (rec_get_deleted_flag(clust_rec)) {
  736. /* This can happen if there is a circular reference of
  737. rows such that cascading delete comes to delete a row
  738. already in the process of being delete marked */
  739. err = DB_SUCCESS;
  740. goto nonstandard_exit_func;
  741. }
  742. if ((node->is_delete
  743.     && (foreign->type & DICT_FOREIGN_ON_DELETE_SET_NULL))
  744.    || (!node->is_delete
  745.     && (foreign->type & DICT_FOREIGN_ON_UPDATE_SET_NULL))) {
  746.     
  747. /* Build the appropriate update vector which sets
  748. foreign->n_fields first fields in rec to SQL NULL */
  749. update = cascade->update;
  750. update->info_bits = 0;
  751. update->n_fields = foreign->n_fields;
  752. for (i = 0; i < foreign->n_fields; i++) {
  753. (update->fields + i)->field_no
  754. = dict_table_get_nth_col_pos(table,
  755. dict_index_get_nth_col_no(index, i));
  756. (update->fields + i)->exp = NULL;
  757. (update->fields + i)->new_val.len = UNIV_SQL_NULL;
  758. (update->fields + i)->new_val.data = NULL;
  759. (update->fields + i)->extern_storage = FALSE;
  760. }
  761. }
  762. if (!node->is_delete
  763.     && (foreign->type & DICT_FOREIGN_ON_UPDATE_CASCADE)) {
  764. /* Build the appropriate update vector which sets changing
  765. foreign->n_fields first fields in rec to new values */
  766. upd_vec_heap = mem_heap_create(256);
  767. n_to_update = row_ins_cascade_calc_update_vec(node, foreign,
  768.       upd_vec_heap);
  769. if (n_to_update == ULINT_UNDEFINED) {
  770.         err = DB_ROW_IS_REFERENCED;
  771. row_ins_foreign_report_err(
  772. "Trying a cascaded update where the updated value in the childn"
  773. "table would not fit in the length of the column, or the value wouldn"
  774. "be NULL and the column is declared as not NULL in the child table,",
  775. thr, foreign, btr_pcur_get_rec(pcur), entry);
  776.        goto nonstandard_exit_func;
  777. }
  778. if (cascade->update->n_fields == 0) {
  779. /* The update does not change any columns referred
  780. to in this foreign key constraint: no need to do
  781. anything */
  782. err = DB_SUCCESS;
  783. goto nonstandard_exit_func;
  784. }
  785. }
  786. /* Store pcur position and initialize or store the cascade node
  787. pcur stored position */
  788. btr_pcur_store_position(pcur, mtr);
  789. if (index == clust_index) {
  790. btr_pcur_copy_stored_position(cascade->pcur, pcur);
  791. } else {
  792. btr_pcur_store_position(cascade->pcur, mtr);
  793. }
  794. mtr_commit(mtr);
  795. ut_a(cascade->pcur->rel_pos == BTR_PCUR_ON);
  796. cascade->state = UPD_NODE_UPDATE_CLUSTERED;
  797. err = row_update_cascade_for_mysql(thr, cascade,
  798. foreign->foreign_table);
  799. if (foreign->foreign_table->n_foreign_key_checks_running == 0) {
  800. fprintf(stderr,
  801. "InnoDB: error: table %s has the counter 0 though there isn"
  802. "InnoDB: a FOREIGN KEY check running on it.n",
  803. foreign->foreign_table->name);
  804. }
  805. /* Release the data dictionary latch for a while, so that we do not
  806. starve other threads from doing CREATE TABLE etc. if we have a huge
  807. cascaded operation running. The counter n_foreign_key_checks_running
  808. will prevent other users from dropping or ALTERing the table when we
  809. release the latch. */
  810. row_mysql_unfreeze_data_dictionary(thr_get_trx(thr));
  811. row_mysql_freeze_data_dictionary(thr_get_trx(thr));
  812. mtr_start(mtr);
  813. /* Restore pcur position */
  814. btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
  815. if (upd_vec_heap) {
  816.         mem_heap_free(upd_vec_heap);
  817. }
  818. return(err);
  819. nonstandard_exit_func:
  820. if (upd_vec_heap) {
  821.         mem_heap_free(upd_vec_heap);
  822. }
  823. btr_pcur_store_position(pcur, mtr);
  824. mtr_commit(mtr);
  825. mtr_start(mtr);
  826. btr_pcur_restore_position(BTR_SEARCH_LEAF, pcur, mtr);
  827. return(err);
  828. }
  829. /*************************************************************************
  830. Sets a shared lock on a record. Used in locking possible duplicate key
  831. records and also in checking foreign key constraints. */
  832. static
  833. ulint
  834. row_ins_set_shared_rec_lock(
  835. /*========================*/
  836. /* out: DB_SUCCESS or error code */
  837. ulint type,  /* in: LOCK_ORDINARY, LOCK_GAP, or
  838. LOCK_REC_NOT_GAP type lock */
  839. rec_t* rec, /* in: record */
  840. dict_index_t* index, /* in: index */
  841. que_thr_t* thr) /* in: query thread */
  842. {
  843. ulint err;
  844. if (index->type & DICT_CLUSTERED) {
  845. err = lock_clust_rec_read_check_and_lock(0, rec, index, LOCK_S,
  846. type, thr);
  847. } else {
  848. err = lock_sec_rec_read_check_and_lock(0, rec, index, LOCK_S,
  849. type, thr);
  850. }
  851. return(err);
  852. }
  853. /*************************************************************************
  854. Sets a exclusive lock on a record. Used in locking possible duplicate key
  855. records */
  856. static
  857. ulint
  858. row_ins_set_exclusive_rec_lock(
  859. /*============================*/
  860. /* out: DB_SUCCESS or error code */
  861. ulint type,  /* in: LOCK_ORDINARY, LOCK_GAP, or
  862. LOCK_REC_NOT_GAP type lock */
  863. rec_t* rec, /* in: record */
  864. dict_index_t* index, /* in: index */
  865. que_thr_t* thr) /* in: query thread */
  866. {
  867. ulint err;
  868. if (index->type & DICT_CLUSTERED) {
  869. err = lock_clust_rec_read_check_and_lock(0, rec, index, LOCK_X,
  870. type, thr);
  871. } else {
  872. err = lock_sec_rec_read_check_and_lock(0, rec, index, LOCK_X,
  873. type, thr);
  874. }
  875. return(err);
  876. }
  877. /*******************************************************************
  878. Checks if foreign key constraint fails for an index entry. Sets shared locks
  879. which lock either the success or the failure of the constraint. NOTE that
  880. the caller must have a shared latch on dict_operation_lock. */
  881. ulint
  882. row_ins_check_foreign_constraint(
  883. /*=============================*/
  884. /* out: DB_SUCCESS,
  885. DB_NO_REFERENCED_ROW,
  886. or DB_ROW_IS_REFERENCED */
  887. ibool check_ref,/* in: TRUE if we want to check that
  888. the referenced table is ok, FALSE if we
  889. want to to check the foreign key table */
  890. dict_foreign_t* foreign,/* in: foreign constraint; NOTE that the
  891. tables mentioned in it must be in the
  892. dictionary cache if they exist at all */
  893. dict_table_t* table, /* in: if check_ref is TRUE, then the foreign
  894. table, else the referenced table */
  895. dtuple_t* entry, /* in: index entry for index */
  896. que_thr_t* thr) /* in: query thread */
  897. {
  898.    upd_node_t*   upd_node;
  899. dict_table_t* check_table;
  900. dict_index_t* check_index;
  901. ulint n_fields_cmp;
  902. rec_t* rec;
  903. btr_pcur_t pcur;
  904. ibool moved;
  905. int cmp;
  906. ulint err;
  907. ulint i;
  908. mtr_t mtr;
  909. trx_t* trx = thr_get_trx(thr);
  910. run_again:
  911. #ifdef UNIV_SYNC_DEBUG
  912. ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
  913. #endif /* UNIV_SYNC_DEBUG */
  914. err = DB_SUCCESS;
  915. if (trx->check_foreigns == FALSE) {
  916. /* The user has suppressed foreign key checks currently for
  917. this session */
  918. return(DB_SUCCESS);
  919. }
  920. /* If any of the foreign key fields in entry is SQL NULL, we
  921. suppress the foreign key check: this is compatible with Oracle,
  922. for example */
  923. for (i = 0; i < foreign->n_fields; i++) {
  924. if (UNIV_SQL_NULL == dfield_get_len(
  925.                                          dtuple_get_nth_field(entry, i))) {
  926. return(DB_SUCCESS);
  927. }
  928. }
  929. if (que_node_get_type(thr->run_node) == QUE_NODE_UPDATE) {
  930.         upd_node = thr->run_node;
  931.         if (!(upd_node->is_delete) && upd_node->foreign == foreign) {
  932.         /* If a cascaded update is done as defined by a 
  933. foreign key constraint, do not check that
  934. constraint for the child row. In ON UPDATE CASCADE
  935. the update of the parent row is only half done when
  936. we come here: if we would check the constraint here
  937. for the child row it would fail.
  938. A QUESTION remains: if in the child table there are
  939. several constraints which refer to the same parent
  940. table, we should merge all updates to the child as
  941. one update? And the updates can be contradictory!
  942. Currently we just perform the update associated
  943. with each foreign key constraint, one after
  944. another, and the user has problems predicting in
  945. which order they are performed. */
  946.         return(DB_SUCCESS);
  947. }
  948. }
  949. if (check_ref) {
  950. check_table = foreign->referenced_table;
  951. check_index = foreign->referenced_index;
  952. } else {
  953. check_table = foreign->foreign_table;
  954. check_index = foreign->foreign_index;
  955. }
  956. if (check_table == NULL || check_table->ibd_file_missing) {
  957. if (check_ref) {
  958. FILE* ef = dict_foreign_err_file;
  959. mutex_enter(&dict_foreign_err_mutex);
  960. rewind(ef);
  961. ut_print_timestamp(ef);
  962. fputs(" Transaction:n", ef);
  963. trx_print(ef, trx);
  964. fputs("Foreign key constraint fails for table ", ef);
  965. ut_print_name(ef, trx, foreign->foreign_table_name);
  966. fputs(":n", ef);
  967. dict_print_info_on_foreign_key_in_create_format(ef,
  968. trx, foreign);
  969. fputs("nTrying to add to index ", ef);
  970. ut_print_name(ef, trx, foreign->foreign_index->name);
  971. fputs(" tuple:n", ef);
  972. dtuple_print(ef, entry);
  973. fputs("nBut the parent table ", ef);
  974. ut_print_name(ef, trx, foreign->referenced_table_name);
  975. fputs("nor its .ibd file does not currently exist!n", ef);
  976. mutex_exit(&dict_foreign_err_mutex);
  977. return(DB_NO_REFERENCED_ROW);
  978. }
  979. return(DB_SUCCESS);
  980. }
  981. ut_a(check_table && check_index);
  982. if (check_table != table) {
  983. /* We already have a LOCK_IX on table, but not necessarily
  984. on check_table */
  985. err = lock_table(0, check_table, LOCK_IS, thr);
  986. if (err != DB_SUCCESS) {
  987. goto do_possible_lock_wait;
  988. }
  989. }
  990. mtr_start(&mtr);
  991. /* Store old value on n_fields_cmp */
  992. n_fields_cmp = dtuple_get_n_fields_cmp(entry);
  993. dtuple_set_n_fields_cmp(entry, foreign->n_fields);
  994. btr_pcur_open(check_index, entry, PAGE_CUR_GE,
  995. BTR_SEARCH_LEAF, &pcur, &mtr);
  996. /* Scan index records and check if there is a matching record */
  997. for (;;) {
  998. rec = btr_pcur_get_rec(&pcur);
  999. if (rec == page_get_infimum_rec(buf_frame_align(rec))) {
  1000. goto next_rec;
  1001. }
  1002. if (rec == page_get_supremum_rec(buf_frame_align(rec))) {
  1003. err = row_ins_set_shared_rec_lock(LOCK_ORDINARY, rec,
  1004. check_index, thr);
  1005. if (err != DB_SUCCESS) {
  1006. break;
  1007. }
  1008. goto next_rec;
  1009. }
  1010. cmp = cmp_dtuple_rec(entry, rec);
  1011. if (cmp == 0) {
  1012. if (rec_get_deleted_flag(rec)) {
  1013. err = row_ins_set_shared_rec_lock(
  1014. LOCK_ORDINARY,
  1015. rec, check_index, thr);
  1016. if (err != DB_SUCCESS) {
  1017. break;
  1018. }
  1019. } else {
  1020. /* Found a matching record. Lock only
  1021. a record because we can allow inserts
  1022. into gaps */
  1023. err = row_ins_set_shared_rec_lock(
  1024. LOCK_REC_NOT_GAP,
  1025. rec, check_index, thr);
  1026. if (err != DB_SUCCESS) {
  1027. break;
  1028. }
  1029. if (check_ref) {
  1030. err = DB_SUCCESS;
  1031. break;
  1032. } else if (foreign->type != 0) {
  1033. /* There is an ON UPDATE or ON DELETE
  1034. condition: check them in a separate
  1035. function */
  1036. err =
  1037.   row_ins_foreign_check_on_constraint(
  1038. thr, foreign, &pcur, entry,
  1039. &mtr);
  1040. if (err != DB_SUCCESS) {
  1041. break;
  1042. }
  1043. } else {
  1044. row_ins_foreign_report_err(
  1045. "Trying to delete or update",
  1046. thr, foreign, rec, entry);
  1047. err = DB_ROW_IS_REFERENCED;
  1048. break;
  1049. }
  1050. }
  1051. }
  1052. if (cmp < 0) {
  1053. err = row_ins_set_shared_rec_lock(LOCK_GAP,
  1054. rec, check_index, thr);
  1055. if (err != DB_SUCCESS) {
  1056. break;
  1057. }
  1058. if (check_ref) {
  1059. err = DB_NO_REFERENCED_ROW;
  1060. row_ins_foreign_report_add_err(
  1061. trx, foreign, rec, entry);
  1062. } else {
  1063. err = DB_SUCCESS;
  1064. }
  1065. break;
  1066. }
  1067. ut_a(cmp == 0);
  1068. next_rec:
  1069. moved = btr_pcur_move_to_next(&pcur, &mtr);
  1070. if (!moved) {
  1071. if (check_ref) {
  1072. rec = btr_pcur_get_rec(&pcur);
  1073. row_ins_foreign_report_add_err(
  1074. trx, foreign, rec, entry);
  1075. err = DB_NO_REFERENCED_ROW;
  1076. } else {
  1077. err = DB_SUCCESS;
  1078. }
  1079. break;
  1080. }
  1081. }
  1082. btr_pcur_close(&pcur);
  1083. mtr_commit(&mtr);
  1084. /* Restore old value */
  1085. dtuple_set_n_fields_cmp(entry, n_fields_cmp);
  1086. do_possible_lock_wait:
  1087. if (err == DB_LOCK_WAIT) {
  1088. trx->error_state = err;
  1089. que_thr_stop_for_mysql(thr);
  1090. srv_suspend_mysql_thread(thr);
  1091. if (trx->error_state == DB_SUCCESS) {
  1092.         goto run_again;
  1093. }
  1094. err = trx->error_state;
  1095. }
  1096. return(err);
  1097. }
  1098. /*******************************************************************
  1099. Checks if foreign key constraints fail for an index entry. If index
  1100. is not mentioned in any constraint, this function does nothing,
  1101. Otherwise does searches to the indexes of referenced tables and
  1102. sets shared locks which lock either the success or the failure of
  1103. a constraint. */
  1104. static
  1105. ulint
  1106. row_ins_check_foreign_constraints(
  1107. /*==============================*/
  1108. /* out: DB_SUCCESS or error code */
  1109. dict_table_t* table, /* in: table */
  1110. dict_index_t* index, /* in: index */
  1111. dtuple_t* entry, /* in: index entry for index */
  1112. que_thr_t* thr) /* in: query thread */
  1113. {
  1114. dict_foreign_t* foreign;
  1115. ulint err;
  1116. trx_t* trx;
  1117. ibool got_s_lock = FALSE;
  1118. trx = thr_get_trx(thr);
  1119. foreign = UT_LIST_GET_FIRST(table->foreign_list);
  1120. while (foreign) {
  1121. if (foreign->foreign_index == index) {
  1122. if (foreign->referenced_table == NULL) {
  1123. dict_table_get(foreign->referenced_table_name,
  1124. trx);
  1125. }
  1126. if (0 == trx->dict_operation_lock_mode) {
  1127. got_s_lock = TRUE;
  1128. row_mysql_freeze_data_dictionary(trx);
  1129. }
  1130. if (foreign->referenced_table) {
  1131. mutex_enter(&(dict_sys->mutex));
  1132. (foreign->referenced_table
  1133. ->n_foreign_key_checks_running)++;
  1134. mutex_exit(&(dict_sys->mutex));
  1135. }
  1136. /* NOTE that if the thread ends up waiting for a lock
  1137. we will release dict_operation_lock temporarily!
  1138. But the counter on the table protects the referenced
  1139. table from being dropped while the check is running. */
  1140. err = row_ins_check_foreign_constraint(TRUE, foreign,
  1141. table, entry, thr);
  1142. if (foreign->referenced_table) {
  1143. mutex_enter(&(dict_sys->mutex));
  1144. ut_a(foreign->referenced_table
  1145. ->n_foreign_key_checks_running > 0);
  1146. (foreign->referenced_table
  1147. ->n_foreign_key_checks_running)--;
  1148. mutex_exit(&(dict_sys->mutex));
  1149. }
  1150. if (got_s_lock) {
  1151. row_mysql_unfreeze_data_dictionary(trx);
  1152. }
  1153. if (err != DB_SUCCESS) {
  1154. return(err);
  1155. }
  1156. }
  1157. foreign = UT_LIST_GET_NEXT(foreign_list, foreign);
  1158. }
  1159. return(DB_SUCCESS);
  1160. }
  1161. /*******************************************************************
  1162. Checks if a unique key violation to rec would occur at the index entry
  1163. insert. */
  1164. static
  1165. ibool
  1166. row_ins_dupl_error_with_rec(
  1167. /*========================*/
  1168. /* out: TRUE if error */
  1169. rec_t* rec, /* in: user record; NOTE that we assume
  1170. that the caller already has a record lock on
  1171. the record! */
  1172. dtuple_t* entry, /* in: entry to insert */
  1173. dict_index_t* index) /* in: index */
  1174. {
  1175. ulint matched_fields;
  1176. ulint matched_bytes;
  1177. ulint n_unique;
  1178. ulint   i;
  1179. n_unique = dict_index_get_n_unique(index);
  1180. matched_fields = 0;
  1181. matched_bytes = 0;
  1182. cmp_dtuple_rec_with_match(entry, rec, &matched_fields, &matched_bytes);
  1183. if (matched_fields < n_unique) {
  1184.         return(FALSE);
  1185. }
  1186. /* In a unique secondary index we allow equal key values if they
  1187. contain SQL NULLs */
  1188. if (!(index->type & DICT_CLUSTERED)) {
  1189.         for (i = 0; i < n_unique; i++) {
  1190.                 if (UNIV_SQL_NULL == dfield_get_len(
  1191.                                          dtuple_get_nth_field(entry, i))) {
  1192.                         return(FALSE);
  1193.                 }
  1194.         }
  1195. }
  1196. if (!rec_get_deleted_flag(rec)) {
  1197.         return(TRUE);
  1198. }
  1199. return(FALSE);
  1200. }
  1201. /*******************************************************************
  1202. Scans a unique non-clustered index at a given index entry to determine
  1203. whether a uniqueness violation has occurred for the key value of the entry.
  1204. Set shared locks on possible duplicate records. */
  1205. static
  1206. ulint
  1207. row_ins_scan_sec_index_for_duplicate(
  1208. /*=================================*/
  1209. /* out: DB_SUCCESS, DB_DUPLICATE_KEY, or
  1210. DB_LOCK_WAIT */
  1211. dict_index_t* index, /* in: non-clustered unique index */
  1212. dtuple_t* entry, /* in: index entry */
  1213. que_thr_t* thr) /* in: query thread */
  1214. {
  1215. ulint n_unique;
  1216. ulint i;
  1217. int cmp;
  1218. ulint n_fields_cmp;
  1219. rec_t* rec;
  1220. btr_pcur_t pcur;
  1221. ulint err = DB_SUCCESS;
  1222. ibool moved;
  1223. mtr_t mtr;
  1224. trx_t* trx;
  1225. n_unique = dict_index_get_n_unique(index);
  1226. /* If the secondary index is unique, but one of the fields in the
  1227. n_unique first fields is NULL, a unique key violation cannot occur,
  1228. since we define NULL != NULL in this case */
  1229. for (i = 0; i < n_unique; i++) {
  1230. if (UNIV_SQL_NULL == dfield_get_len(
  1231.                                          dtuple_get_nth_field(entry, i))) {
  1232. return(DB_SUCCESS);
  1233. }
  1234. }
  1235. mtr_start(&mtr);
  1236. /* Store old value on n_fields_cmp */
  1237. n_fields_cmp = dtuple_get_n_fields_cmp(entry);
  1238. dtuple_set_n_fields_cmp(entry, dict_index_get_n_unique(index));
  1239. btr_pcur_open(index, entry, PAGE_CUR_GE, BTR_SEARCH_LEAF, &pcur, &mtr);
  1240. /* Scan index records and check if there is a duplicate */
  1241. for (;;) {
  1242. rec = btr_pcur_get_rec(&pcur);
  1243. if (rec == page_get_infimum_rec(buf_frame_align(rec))) {
  1244. goto next_rec;
  1245. }
  1246. /* Try to place a lock on the index record */
  1247. trx = thr_get_trx(thr);      
  1248. ut_ad(trx);
  1249. if (innobase_query_is_update()) {
  1250. /* If the SQL-query will update or replace
  1251. duplicate key we will take X-lock for 
  1252. duplicates ( REPLACE, LOAD DATAFILE REPLACE, 
  1253. INSERT ON DUPLICATE KEY UPDATE). */
  1254. err = row_ins_set_exclusive_rec_lock(
  1255. LOCK_ORDINARY,rec,index,thr);
  1256. } else {
  1257. err = row_ins_set_shared_rec_lock(
  1258. LOCK_ORDINARY, rec, index,thr);
  1259. }
  1260. if (err != DB_SUCCESS) {
  1261. break;
  1262. }
  1263. if (rec == page_get_supremum_rec(buf_frame_align(rec))) {
  1264. goto next_rec;
  1265. }
  1266. cmp = cmp_dtuple_rec(entry, rec);
  1267. if (cmp == 0) {
  1268. if (row_ins_dupl_error_with_rec(rec, entry, index)) {
  1269. err = DB_DUPLICATE_KEY;
  1270. thr_get_trx(thr)->error_info = index;
  1271. break;
  1272. }
  1273. }
  1274. if (cmp < 0) {
  1275. break;
  1276. }
  1277. ut_a(cmp == 0);
  1278. next_rec:
  1279. moved = btr_pcur_move_to_next(&pcur, &mtr);
  1280. if (!moved) {
  1281. break;
  1282. }
  1283. }
  1284. mtr_commit(&mtr);
  1285. /* Restore old value */
  1286. dtuple_set_n_fields_cmp(entry, n_fields_cmp);
  1287. return(err);
  1288. }
  1289. /*******************************************************************
  1290. Checks if a unique key violation error would occur at an index entry
  1291. insert. Sets shared locks on possible duplicate records. Works only
  1292. for a clustered index! */
  1293. static
  1294. ulint
  1295. row_ins_duplicate_error_in_clust(
  1296. /*=============================*/
  1297. /* out: DB_SUCCESS if no error,
  1298. DB_DUPLICATE_KEY if error, DB_LOCK_WAIT if we
  1299. have to wait for a lock on a possible
  1300. duplicate record */
  1301. btr_cur_t* cursor, /* in: B-tree cursor */
  1302. dtuple_t* entry, /* in: entry to insert */
  1303. que_thr_t* thr, /* in: query thread */
  1304. mtr_t* mtr) /* in: mtr */
  1305. {
  1306. ulint err;
  1307. rec_t* rec;
  1308. page_t* page;
  1309. ulint n_unique;
  1310. trx_t* trx = thr_get_trx(thr);
  1311. UT_NOT_USED(mtr);
  1312. ut_a(cursor->index->type & DICT_CLUSTERED);
  1313. ut_ad(cursor->index->type & DICT_UNIQUE);
  1314. /* NOTE: For unique non-clustered indexes there may be any number
  1315. of delete marked records with the same value for the non-clustered
  1316. index key (remember multiversioning), and which differ only in
  1317. the row refererence part of the index record, containing the
  1318. clustered index key fields. For such a secondary index record,
  1319. to avoid race condition, we must FIRST do the insertion and after
  1320. that check that the uniqueness condition is not breached! */
  1321. /* NOTE: A problem is that in the B-tree node pointers on an
  1322. upper level may match more to the entry than the actual existing
  1323. user records on the leaf level. So, even if low_match would suggest
  1324. that a duplicate key violation may occur, this may not be the case. */
  1325. n_unique = dict_index_get_n_unique(cursor->index);
  1326. if (cursor->low_match >= n_unique) {
  1327. rec = btr_cur_get_rec(cursor);
  1328. page = buf_frame_align(rec);
  1329. if (rec != page_get_infimum_rec(page)) {
  1330. /* We set a lock on the possible duplicate: this
  1331. is needed in logical logging of MySQL to make
  1332. sure that in roll-forward we get the same duplicate
  1333. errors as in original execution */
  1334. if (innobase_query_is_update()) {
  1335. /* If the SQL-query will update or replace
  1336. duplicate key we will take X-lock for 
  1337. duplicates ( REPLACE, LOAD DATAFILE REPLACE, 
  1338. INSERT ON DUPLICATE KEY UPDATE). */
  1339. err = row_ins_set_exclusive_rec_lock(
  1340. LOCK_REC_NOT_GAP,rec,cursor->index,
  1341. thr);
  1342. } else {
  1343. err = row_ins_set_shared_rec_lock(
  1344. LOCK_REC_NOT_GAP,rec, cursor->index, 
  1345. thr);
  1346. if (err != DB_SUCCESS) {
  1347. return(err);
  1348. }
  1349. if (row_ins_dupl_error_with_rec(rec, entry,
  1350. cursor->index)) {
  1351. trx->error_info = cursor->index;
  1352. return(DB_DUPLICATE_KEY);
  1353. }
  1354. }
  1355. }
  1356. if (cursor->up_match >= n_unique) {
  1357. rec = page_rec_get_next(btr_cur_get_rec(cursor));
  1358. page = buf_frame_align(rec);
  1359. if (rec != page_get_supremum_rec(page)) {
  1360. if (innobase_query_is_update()) {
  1361. /* If the SQL-query will update or replace
  1362. duplicate key we will take X-lock for 
  1363. duplicates ( REPLACE, LOAD DATAFILE REPLACE, 
  1364. INSERT ON DUPLICATE KEY UPDATE). */
  1365. err = row_ins_set_exclusive_rec_lock(
  1366. LOCK_REC_NOT_GAP,
  1367. rec,cursor->index,thr);
  1368. } else {
  1369. err = row_ins_set_shared_rec_lock(
  1370. LOCK_REC_NOT_GAP,rec, 
  1371. cursor->index, thr);
  1372. }
  1373. if (err != DB_SUCCESS) {
  1374. return(err);
  1375. }
  1376. if (row_ins_dupl_error_with_rec(rec, entry,
  1377. cursor->index)) {
  1378. trx->error_info = cursor->index;
  1379. return(DB_DUPLICATE_KEY);
  1380. }
  1381. }
  1382. ut_a(!(cursor->index->type & DICT_CLUSTERED));
  1383. /* This should never happen */
  1384. }
  1385. return(DB_SUCCESS);
  1386. }
  1387. /*******************************************************************
  1388. Checks if an index entry has long enough common prefix with an existing
  1389. record so that the intended insert of the entry must be changed to a modify of
  1390. the existing record. In the case of a clustered index, the prefix must be
  1391. n_unique fields long, and in the case of a secondary index, all fields must be
  1392. equal. */
  1393. UNIV_INLINE
  1394. ulint
  1395. row_ins_must_modify(
  1396. /*================*/
  1397. /* out: 0 if no update, ROW_INS_PREV if
  1398. previous should be updated; currently we
  1399. do the search so that only the low_match
  1400. record can match enough to the search tuple,
  1401. not the next record */
  1402. btr_cur_t* cursor) /* in: B-tree cursor */
  1403. {
  1404. ulint enough_match;
  1405. rec_t* rec;
  1406. page_t* page;
  1407. /* NOTE: (compare to the note in row_ins_duplicate_error) Because node
  1408. pointers on upper levels of the B-tree may match more to entry than
  1409. to actual user records on the leaf level, we have to check if the
  1410. candidate record is actually a user record. In a clustered index
  1411. node pointers contain index->n_unique first fields, and in the case
  1412. of a secondary index, all fields of the index. */
  1413. enough_match = dict_index_get_n_unique_in_tree(cursor->index);
  1414. if (cursor->low_match >= enough_match) {
  1415. rec = btr_cur_get_rec(cursor);
  1416. page = buf_frame_align(rec);
  1417. if (rec != page_get_infimum_rec(page)) {
  1418. return(ROW_INS_PREV);
  1419. }
  1420. }
  1421. return(0);
  1422. }
  1423. /*******************************************************************
  1424. Tries to insert an index entry to an index. If the index is clustered
  1425. and a record with the same unique key is found, the other record is
  1426. necessarily marked deleted by a committed transaction, or a unique key
  1427. violation error occurs. The delete marked record is then updated to an
  1428. existing record, and we must write an undo log record on the delete
  1429. marked record. If the index is secondary, and a record with exactly the
  1430. same fields is found, the other record is necessarily marked deleted.
  1431. It is then unmarked. Otherwise, the entry is just inserted to the index. */
  1432. ulint
  1433. row_ins_index_entry_low(
  1434. /*====================*/
  1435. /* out: DB_SUCCESS, DB_LOCK_WAIT, DB_FAIL
  1436. if pessimistic retry needed, or error code */
  1437. ulint mode, /* in: BTR_MODIFY_LEAF or BTR_MODIFY_TREE,
  1438. depending on whether we wish optimistic or
  1439. pessimistic descent down the index tree */
  1440. dict_index_t* index, /* in: index */
  1441. dtuple_t* entry, /* in: index entry to insert */
  1442. ulint* ext_vec,/* in: array containing field numbers of
  1443. externally stored fields in entry, or NULL */
  1444. ulint n_ext_vec,/* in: number of fields in ext_vec */
  1445. que_thr_t* thr) /* in: query thread */
  1446. {
  1447. btr_cur_t cursor;
  1448. ulint ignore_sec_unique = 0;
  1449. ulint modify = 0; /* remove warning */
  1450. rec_t* insert_rec;
  1451. rec_t* rec;
  1452. rec_t* first_rec;
  1453. ulint err;
  1454. ulint n_unique;
  1455. big_rec_t* big_rec = NULL;
  1456. mtr_t mtr;
  1457. log_free_check();
  1458. mtr_start(&mtr);
  1459. cursor.thr = thr;
  1460. /* Note that we use PAGE_CUR_LE as the search mode, because then
  1461. the function will return in both low_match and up_match of the
  1462. cursor sensible values */
  1463. if (!(thr_get_trx(thr)->check_unique_secondary)) {
  1464. ignore_sec_unique = BTR_IGNORE_SEC_UNIQUE;
  1465. }
  1466. btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
  1467. mode | BTR_INSERT | ignore_sec_unique,
  1468. &cursor, 0, &mtr);
  1469. if (cursor.flag == BTR_CUR_INSERT_TO_IBUF) {
  1470. /* The insertion was made to the insert buffer already during
  1471. the search: we are done */
  1472. err = DB_SUCCESS;
  1473. goto function_exit;
  1474. }
  1475. first_rec = page_rec_get_next(page_get_infimum_rec(
  1476. buf_frame_align(btr_cur_get_rec(&cursor))));
  1477. if (!page_rec_is_supremum(first_rec)) {
  1478. ut_a((rec_get_n_fields(first_rec))
  1479. == dtuple_get_n_fields(entry));
  1480. }
  1481. n_unique = dict_index_get_n_unique(index);
  1482. if (index->type & DICT_UNIQUE && (cursor.up_match >= n_unique
  1483.  || cursor.low_match >= n_unique)) {
  1484. if (index->type & DICT_CLUSTERED) {  
  1485. /* Note that the following may return also
  1486. DB_LOCK_WAIT */
  1487. err = row_ins_duplicate_error_in_clust(&cursor,
  1488. entry, thr, &mtr);
  1489. if (err != DB_SUCCESS) {
  1490. goto function_exit;
  1491. }
  1492. } else {
  1493. mtr_commit(&mtr);
  1494. err = row_ins_scan_sec_index_for_duplicate(index,
  1495. entry, thr);
  1496. mtr_start(&mtr);
  1497. if (err != DB_SUCCESS) {
  1498. goto function_exit;
  1499. }
  1500. /* We did not find a duplicate and we have now
  1501. locked with s-locks the necessary records to
  1502. prevent any insertion of a duplicate by another
  1503. transaction. Let us now reposition the cursor and
  1504. continue the insertion. */
  1505. btr_cur_search_to_nth_level(index, 0, entry,
  1506. PAGE_CUR_LE, mode | BTR_INSERT,
  1507. &cursor, 0, &mtr);
  1508. }
  1509. }
  1510. modify = row_ins_must_modify(&cursor);
  1511. if (modify != 0) {
  1512. /* There is already an index entry with a long enough common
  1513. prefix, we must convert the insert into a modify of an
  1514. existing record */
  1515. if (modify == ROW_INS_NEXT) {
  1516. rec = page_rec_get_next(btr_cur_get_rec(&cursor));
  1517. btr_cur_position(index, rec, &cursor);
  1518. }
  1519. if (index->type & DICT_CLUSTERED) {
  1520. err = row_ins_clust_index_entry_by_modify(mode,
  1521. &cursor, &big_rec,
  1522. entry,
  1523. ext_vec, n_ext_vec,
  1524. thr, &mtr);
  1525. } else {
  1526. err = row_ins_sec_index_entry_by_modify(mode, &cursor,
  1527. entry,
  1528. thr, &mtr);
  1529. }
  1530. } else {
  1531. if (mode == BTR_MODIFY_LEAF) {
  1532. err = btr_cur_optimistic_insert(0, &cursor, entry,
  1533. &insert_rec, &big_rec, thr, &mtr);
  1534. } else {
  1535. ut_a(mode == BTR_MODIFY_TREE);
  1536. err = btr_cur_pessimistic_insert(0, &cursor, entry,
  1537. &insert_rec, &big_rec, thr, &mtr);
  1538. }
  1539. if (err == DB_SUCCESS) {
  1540. if (ext_vec) {
  1541. rec_set_field_extern_bits(insert_rec,
  1542. ext_vec, n_ext_vec, &mtr);
  1543. }
  1544. }
  1545. }
  1546. function_exit:
  1547. mtr_commit(&mtr);
  1548. if (big_rec) {
  1549. mtr_start(&mtr);
  1550. btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
  1551. BTR_MODIFY_TREE, &cursor, 0, &mtr);
  1552. err = btr_store_big_rec_extern_fields(index,
  1553. btr_cur_get_rec(&cursor), 
  1554. big_rec, &mtr);
  1555. if (modify) {
  1556. dtuple_big_rec_free(big_rec);
  1557. } else {
  1558. dtuple_convert_back_big_rec(index, entry, big_rec);
  1559. }
  1560. mtr_commit(&mtr);
  1561. }
  1562. return(err);
  1563. }
  1564. /*******************************************************************
  1565. Inserts an index entry to index. Tries first optimistic, then pessimistic
  1566. descent down the tree. If the entry matches enough to a delete marked record,
  1567. performs the insert by updating or delete unmarking the delete marked
  1568. record. */
  1569. ulint
  1570. row_ins_index_entry(
  1571. /*================*/
  1572. /* out: DB_SUCCESS, DB_LOCK_WAIT,
  1573. DB_DUPLICATE_KEY, or some other error code */
  1574. dict_index_t* index, /* in: index */
  1575. dtuple_t* entry, /* in: index entry to insert */
  1576. ulint* ext_vec,/* in: array containing field numbers of
  1577. externally stored fields in entry, or NULL */
  1578. ulint n_ext_vec,/* in: number of fields in ext_vec */
  1579. que_thr_t* thr) /* in: query thread */
  1580. {
  1581. ulint err;
  1582. if (UT_LIST_GET_FIRST(index->table->foreign_list)) {
  1583. err = row_ins_check_foreign_constraints(index->table, index,
  1584. entry, thr);
  1585. if (err != DB_SUCCESS) {
  1586. return(err);
  1587. }
  1588. }
  1589. /* Try first optimistic descent to the B-tree */
  1590. err = row_ins_index_entry_low(BTR_MODIFY_LEAF, index, entry,
  1591. ext_vec, n_ext_vec, thr);
  1592. if (err != DB_FAIL) {
  1593. return(err);
  1594. }
  1595. /* Try then pessimistic descent to the B-tree */
  1596. err = row_ins_index_entry_low(BTR_MODIFY_TREE, index, entry,
  1597. ext_vec, n_ext_vec, thr);
  1598. return(err);
  1599. }
  1600. /***************************************************************
  1601. Sets the values of the dtuple fields in entry from the values of appropriate
  1602. columns in row. */
  1603. static
  1604. void
  1605. row_ins_index_entry_set_vals(
  1606. /*=========================*/
  1607. dict_index_t* index, /* in: index */
  1608. dtuple_t* entry, /* in: index entry to make */
  1609. dtuple_t* row) /* in: row */
  1610. {
  1611. dict_field_t* ind_field;
  1612. dfield_t* field;
  1613. dfield_t* row_field;
  1614. ulint n_fields;
  1615. ulint i;
  1616. dtype_t*        cur_type;
  1617. ut_ad(entry && row);
  1618. n_fields = dtuple_get_n_fields(entry);
  1619. for (i = 0; i < n_fields; i++) {
  1620. field = dtuple_get_nth_field(entry, i);
  1621. ind_field = dict_index_get_nth_field(index, i);
  1622. row_field = dtuple_get_nth_field(row, ind_field->col->ind);
  1623. /* Check column prefix indexes */
  1624. if (ind_field->prefix_len > 0
  1625.     && dfield_get_len(row_field) != UNIV_SQL_NULL) {
  1626. cur_type = dict_col_get_type(
  1627. dict_field_get_col(ind_field));
  1628. field->len = dtype_get_at_most_n_mbchars(cur_type,
  1629.   ind_field->prefix_len,
  1630.   dfield_get_len(row_field), row_field->data);
  1631. } else {
  1632.         field->len = row_field->len;
  1633. }
  1634. field->data = row_field->data;
  1635. }
  1636. }
  1637. /***************************************************************
  1638. Inserts a single index entry to the table. */
  1639. static
  1640. ulint
  1641. row_ins_index_entry_step(
  1642. /*=====================*/
  1643. /* out: DB_SUCCESS if operation successfully
  1644. completed, else error code or DB_LOCK_WAIT */
  1645. ins_node_t* node, /* in: row insert node */
  1646. que_thr_t* thr) /* in: query thread */
  1647. {
  1648. ulint err;
  1649. ut_ad(dtuple_check_typed(node->row));
  1650. row_ins_index_entry_set_vals(node->index, node->entry, node->row);
  1651. ut_ad(dtuple_check_typed(node->entry));
  1652. err = row_ins_index_entry(node->index, node->entry, NULL, 0, thr);
  1653. return(err);
  1654. }
  1655. /***************************************************************
  1656. Allocates a row id for row and inits the node->index field. */
  1657. UNIV_INLINE
  1658. void
  1659. row_ins_alloc_row_id_step(
  1660. /*======================*/
  1661. ins_node_t* node) /* in: row insert node */
  1662. {
  1663. dulint row_id;
  1664. ut_ad(node->state == INS_NODE_ALLOC_ROW_ID);
  1665. if (dict_table_get_first_index(node->table)->type & DICT_UNIQUE) {
  1666. /* No row id is stored if the clustered index is unique */
  1667. return;
  1668. }
  1669. /* Fill in row id value to row */
  1670. row_id = dict_sys_get_new_row_id();
  1671. dict_sys_write_row_id(node->row_id_buf, row_id);
  1672. }
  1673. /***************************************************************
  1674. Gets a row to insert from the values list. */
  1675. UNIV_INLINE
  1676. void
  1677. row_ins_get_row_from_values(
  1678. /*========================*/
  1679. ins_node_t* node) /* in: row insert node */
  1680. {
  1681. que_node_t* list_node;
  1682. dfield_t* dfield;
  1683. dtuple_t* row;
  1684. ulint i;
  1685. /* The field values are copied in the buffers of the select node and
  1686. it is safe to use them until we fetch from select again: therefore
  1687. we can just copy the pointers */
  1688. row = node->row; 
  1689. i = 0;
  1690. list_node = node->values_list;
  1691. while (list_node) {
  1692. eval_exp(list_node);
  1693. dfield = dtuple_get_nth_field(row, i);
  1694. dfield_copy_data(dfield, que_node_get_val(list_node));
  1695. i++;
  1696. list_node = que_node_get_next(list_node);
  1697. }
  1698. }
  1699. /***************************************************************
  1700. Gets a row to insert from the select list. */
  1701. UNIV_INLINE
  1702. void
  1703. row_ins_get_row_from_select(
  1704. /*========================*/
  1705. ins_node_t* node) /* in: row insert node */
  1706. {
  1707. que_node_t* list_node;
  1708. dfield_t* dfield;
  1709. dtuple_t* row;
  1710. ulint i;
  1711. /* The field values are copied in the buffers of the select node and
  1712. it is safe to use them until we fetch from select again: therefore
  1713. we can just copy the pointers */
  1714. row = node->row; 
  1715. i = 0;
  1716. list_node = node->select->select_list;
  1717. while (list_node) {
  1718. dfield = dtuple_get_nth_field(row, i);
  1719. dfield_copy_data(dfield, que_node_get_val(list_node));
  1720. i++;
  1721. list_node = que_node_get_next(list_node);
  1722. }
  1723. }
  1724. /***************************************************************
  1725. Inserts a row to a table. */
  1726. ulint
  1727. row_ins(
  1728. /*====*/
  1729. /* out: DB_SUCCESS if operation successfully
  1730. completed, else error code or DB_LOCK_WAIT */
  1731. ins_node_t* node, /* in: row insert node */
  1732. que_thr_t* thr) /* in: query thread */
  1733. {
  1734. ulint err;
  1735. ut_ad(node && thr);
  1736. if (node->state == INS_NODE_ALLOC_ROW_ID) {
  1737. row_ins_alloc_row_id_step(node);
  1738. node->index = dict_table_get_first_index(node->table);
  1739. node->entry = UT_LIST_GET_FIRST(node->entry_list);
  1740. if (node->ins_type == INS_SEARCHED) {
  1741. row_ins_get_row_from_select(node);
  1742. } else if (node->ins_type == INS_VALUES) {
  1743. row_ins_get_row_from_values(node);
  1744. }
  1745. node->state = INS_NODE_INSERT_ENTRIES;
  1746. }
  1747. ut_ad(node->state == INS_NODE_INSERT_ENTRIES);
  1748. while (node->index != NULL) {
  1749. err = row_ins_index_entry_step(node, thr);
  1750. if (err != DB_SUCCESS) {
  1751. return(err);
  1752. }
  1753. node->index = dict_table_get_next_index(node->index);
  1754. node->entry = UT_LIST_GET_NEXT(tuple_list, node->entry);
  1755. }
  1756. ut_ad(node->entry == NULL);
  1757. node->state = INS_NODE_ALLOC_ROW_ID;
  1758. return(DB_SUCCESS);
  1759. }
  1760. /***************************************************************
  1761. Inserts a row to a table. This is a high-level function used in SQL execution
  1762. graphs. */
  1763. que_thr_t*
  1764. row_ins_step(
  1765. /*=========*/
  1766. /* out: query thread to run next or NULL */
  1767. que_thr_t* thr) /* in: query thread */
  1768. {
  1769. ins_node_t* node;
  1770. que_node_t* parent;
  1771. sel_node_t* sel_node;
  1772. trx_t* trx;
  1773. ulint err;
  1774. ut_ad(thr);
  1775. trx = thr_get_trx(thr);
  1776. trx_start_if_not_started(trx);
  1777. node = thr->run_node;
  1778. ut_ad(que_node_get_type(node) == QUE_NODE_INSERT);
  1779. parent = que_node_get_parent(node);
  1780. sel_node = node->select;
  1781. if (thr->prev_node == parent) {
  1782. node->state = INS_NODE_SET_IX_LOCK;
  1783. }
  1784. /* If this is the first time this node is executed (or when
  1785. execution resumes after wait for the table IX lock), set an
  1786. IX lock on the table and reset the possible select node. */
  1787. if (node->state == INS_NODE_SET_IX_LOCK) {
  1788. /* It may be that the current session has not yet started
  1789. its transaction, or it has been committed: */
  1790. if (UT_DULINT_EQ(trx->id, node->trx_id)) {
  1791. /* No need to do IX-locking or write trx id to buf */
  1792. goto same_trx;
  1793. }
  1794. trx_write_trx_id(node->trx_id_buf, trx->id);
  1795. err = lock_table(0, node->table, LOCK_IX, thr);
  1796. if (err != DB_SUCCESS) {
  1797. goto error_handling;
  1798. }
  1799. node->trx_id = trx->id;
  1800. same_trx:
  1801. node->state = INS_NODE_ALLOC_ROW_ID;
  1802. if (node->ins_type == INS_SEARCHED) {
  1803. /* Reset the cursor */
  1804. sel_node->state = SEL_NODE_OPEN;
  1805.  
  1806. /* Fetch a row to insert */
  1807. thr->run_node = sel_node;
  1808. return(thr);
  1809. }
  1810. }
  1811. if ((node->ins_type == INS_SEARCHED)
  1812. && (sel_node->state != SEL_NODE_FETCH)) {
  1813. ut_ad(sel_node->state == SEL_NODE_NO_MORE_ROWS);
  1814. /* No more rows to insert */
  1815. thr->run_node = parent;
  1816. return(thr);
  1817. }
  1818. /* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
  1819. err = row_ins(node, thr);
  1820. error_handling:
  1821. trx->error_state = err;
  1822. if (err != DB_SUCCESS) {
  1823. /* err == DB_LOCK_WAIT or SQL error detected */
  1824. return(NULL);
  1825. }
  1826. /* DO THE TRIGGER ACTIONS HERE */
  1827. if (node->ins_type == INS_SEARCHED) {
  1828. /* Fetch a row to insert */
  1829. thr->run_node = sel_node;
  1830. } else {
  1831. thr->run_node = que_node_get_parent(node);
  1832. }
  1833. return(thr);
  1834. }