trx0trx.c
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:29k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /******************************************************
  2. The transaction
  3. (c) 1996 Innobase Oy
  4. Created 3/26/1996 Heikki Tuuri
  5. *******************************************************/
  6. #include "trx0trx.h"
  7. #ifdef UNIV_NONINL
  8. #include "trx0trx.ic"
  9. #endif
  10. #include "trx0undo.h"
  11. #include "trx0rseg.h"
  12. #include "log0log.h"
  13. #include "que0que.h"
  14. #include "lock0lock.h"
  15. #include "trx0roll.h"
  16. #include "usr0sess.h"
  17. #include "read0read.h"
  18. #include "srv0srv.h"
  19. #include "thr0loc.h"
  20. /* Dummy session used currently in MySQL interface */
  21. sess_t* trx_dummy_sess = NULL;
  22. /* Number of transactions currently allocated for MySQL: protected by
  23. the kernel mutex */
  24. ulint trx_n_mysql_transactions = 0;
  25. /********************************************************************
  26. Creates and initializes a transaction object. */
  27. trx_t*
  28. trx_create(
  29. /*=======*/
  30. /* out, own: the transaction */
  31. sess_t* sess) /* in: session or NULL */
  32. {
  33. trx_t* trx;
  34. ut_ad(mutex_own(&kernel_mutex));
  35. trx = mem_alloc(sizeof(trx_t));
  36. trx->type = TRX_USER;
  37. trx->conc_state = TRX_NOT_STARTED;
  38. trx->dict_operation = FALSE;
  39. trx->n_mysql_tables_in_use = 0;
  40. mutex_create(&(trx->undo_mutex));
  41. mutex_set_level(&(trx->undo_mutex), SYNC_TRX_UNDO);
  42. trx->rseg = NULL;
  43. trx->undo_no = ut_dulint_zero;
  44. trx->last_sql_stat_start.least_undo_no = ut_dulint_zero;
  45. trx->insert_undo = NULL;
  46. trx->update_undo = NULL;
  47. trx->undo_no_arr = NULL;
  48. trx->error_state = DB_SUCCESS;
  49. trx->sess = sess;
  50. trx->que_state = TRX_QUE_RUNNING;
  51. trx->n_active_thrs = 0;
  52. trx->handling_signals = FALSE;
  53. UT_LIST_INIT(trx->signals);
  54. UT_LIST_INIT(trx->reply_signals);
  55. trx->graph = NULL;
  56. trx->wait_lock = NULL;
  57. UT_LIST_INIT(trx->wait_thrs);
  58. trx->lock_heap = mem_heap_create_in_buffer(256);
  59. UT_LIST_INIT(trx->trx_locks);
  60. trx->read_view_heap = mem_heap_create(256);
  61. trx->read_view = NULL;
  62. return(trx);
  63. }
  64. /************************************************************************
  65. Creates a transaction object for MySQL. */
  66. trx_t*
  67. trx_allocate_for_mysql(void)
  68. /*========================*/
  69. /* out, own: transaction object */
  70. {
  71. trx_t* trx;
  72. mutex_enter(&kernel_mutex);
  73. /* Open a dummy session */
  74. if (!trx_dummy_sess) {
  75. trx_dummy_sess = sess_open(NULL, (byte*)"Dummy sess",
  76. ut_strlen("Dummy sess"));
  77. }
  78. trx = trx_create(trx_dummy_sess);
  79. trx_n_mysql_transactions++;
  80. mutex_exit(&kernel_mutex);
  81. trx->mysql_thread_id = os_thread_get_curr_id();
  82. return(trx);
  83. }
  84. /************************************************************************
  85. Frees a transaction object. */
  86. void
  87. trx_free(
  88. /*=====*/
  89. trx_t* trx) /* in, own: trx object */
  90. {
  91. ut_ad(mutex_own(&kernel_mutex));
  92. ut_a(trx->conc_state == TRX_NOT_STARTED);
  93. mutex_free(&(trx->undo_mutex));
  94. ut_a(trx->insert_undo == NULL); 
  95. ut_a(trx->update_undo == NULL); 
  96. ut_a(trx->n_mysql_tables_in_use == 0);
  97. if (trx->undo_no_arr) {
  98. trx_undo_arr_free(trx->undo_no_arr);
  99. }
  100. ut_a(UT_LIST_GET_LEN(trx->signals) == 0);
  101. ut_a(UT_LIST_GET_LEN(trx->reply_signals) == 0);
  102. ut_a(trx->wait_lock == NULL);
  103. ut_a(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
  104. if (trx->lock_heap) {
  105. mem_heap_free(trx->lock_heap);
  106. }
  107. ut_a(UT_LIST_GET_LEN(trx->trx_locks) == 0);
  108. if (trx->read_view_heap) {
  109. mem_heap_free(trx->read_view_heap);
  110. }
  111. ut_a(trx->read_view == NULL);
  112. mem_free(trx);
  113. }
  114. /************************************************************************
  115. Frees a transaction object for MySQL. */
  116. void
  117. trx_free_for_mysql(
  118. /*===============*/
  119. trx_t* trx) /* in, own: trx object */
  120. {
  121. thr_local_free(trx->mysql_thread_id);
  122. mutex_enter(&kernel_mutex);
  123. trx_free(trx);
  124. ut_a(trx_n_mysql_transactions > 0);
  125. trx_n_mysql_transactions--;
  126. mutex_exit(&kernel_mutex);
  127. }
  128. /********************************************************************
  129. Inserts the trx handle in the trx system trx list in the right position.
  130. The list is sorted on the trx id so that the biggest id is at the list
  131. start. This function is used at the database startup to insert incomplete
  132. transactions to the list. */
  133. static
  134. void
  135. trx_list_insert_ordered(
  136. /*====================*/
  137. trx_t* trx) /* in: trx handle */
  138. {
  139. trx_t* trx2;
  140. ut_ad(mutex_own(&kernel_mutex));
  141. trx2 = UT_LIST_GET_FIRST(trx_sys->trx_list);
  142. while (trx2 != NULL) {
  143. if (ut_dulint_cmp(trx->id, trx2->id) >= 0) {
  144. ut_ad(ut_dulint_cmp(trx->id, trx2->id) == 1);
  145. break;
  146. }
  147. trx2 = UT_LIST_GET_NEXT(trx_list, trx2);
  148. }
  149. if (trx2 != NULL) {
  150. trx2 = UT_LIST_GET_PREV(trx_list, trx2);
  151. if (trx2 == NULL) {
  152. UT_LIST_ADD_FIRST(trx_list, trx_sys->trx_list, trx);
  153. } else {
  154. UT_LIST_INSERT_AFTER(trx_list, trx_sys->trx_list,
  155. trx2, trx);
  156. }
  157. } else {
  158. UT_LIST_ADD_LAST(trx_list, trx_sys->trx_list, trx);
  159. }
  160. }
  161. /********************************************************************
  162. Creates trx objects for transactions and initializes the trx list of
  163. trx_sys at database start. Rollback segment and undo log lists must
  164. already exist when this function is called, because the lists of
  165. transactions to be rolled back or cleaned up are built based on the
  166. undo log lists. */
  167. void
  168. trx_lists_init_at_db_start(void)
  169. /*============================*/
  170. {
  171. trx_rseg_t* rseg;
  172. trx_undo_t* undo;
  173. trx_t* trx;
  174. UT_LIST_INIT(trx_sys->trx_list);
  175. /* Look from the rollback segments if there exist undo logs for
  176. transactions */
  177. rseg = UT_LIST_GET_FIRST(trx_sys->rseg_list);
  178. while (rseg != NULL) {
  179. undo = UT_LIST_GET_FIRST(rseg->insert_undo_list);
  180. while (undo != NULL) {
  181. trx = trx_create(NULL); 
  182. if (undo->state != TRX_UNDO_ACTIVE) {
  183. trx->conc_state = TRX_COMMITTED_IN_MEMORY;
  184. } else {
  185. trx->conc_state = TRX_ACTIVE;
  186. }
  187. trx->id = undo->trx_id;
  188. trx->insert_undo = undo;
  189. trx->rseg = rseg;
  190. if (undo->dict_operation) {
  191. trx->dict_operation = undo->dict_operation;
  192. trx->table_id = undo->table_id;
  193. }
  194. if (!undo->empty) {
  195. trx->undo_no = ut_dulint_add(undo->top_undo_no,
  196. 1);
  197. }
  198. trx_list_insert_ordered(trx);
  199. undo = UT_LIST_GET_NEXT(undo_list, undo);
  200. }
  201. undo = UT_LIST_GET_FIRST(rseg->update_undo_list);
  202. while (undo != NULL) {
  203. trx = trx_get_on_id(undo->trx_id);
  204. if (NULL == trx) {
  205. trx = trx_create(NULL); 
  206. if (undo->state != TRX_UNDO_ACTIVE) {
  207. trx->conc_state =
  208. TRX_COMMITTED_IN_MEMORY;
  209. } else {
  210. trx->conc_state = TRX_ACTIVE;
  211. }
  212. trx->id = undo->trx_id;
  213. trx->rseg = rseg;
  214. trx_list_insert_ordered(trx);
  215. if (undo->dict_operation) {
  216. trx->dict_operation =
  217. undo->dict_operation;
  218. trx->table_id = undo->table_id;
  219. }
  220. }
  221. trx->update_undo = undo;
  222. if ((!undo->empty)
  223.     && (ut_dulint_cmp(undo->top_undo_no, trx->undo_no)
  224.         >= 0)) {
  225. trx->undo_no = ut_dulint_add(undo->top_undo_no,
  226. 1);
  227. }
  228. undo = UT_LIST_GET_NEXT(undo_list, undo);
  229. }
  230. rseg = UT_LIST_GET_NEXT(rseg_list, rseg);
  231. }
  232. }
  233. /**********************************************************************
  234. Assigns a rollback segment to a transaction in a round-robin fashion.
  235. Skips the SYSTEM rollback segment if another is available. */
  236. UNIV_INLINE
  237. ulint
  238. trx_assign_rseg(void)
  239. /*=================*/
  240. /* out: assigned rollback segment id */
  241. {
  242. trx_rseg_t* rseg = trx_sys->latest_rseg;
  243. ut_ad(mutex_own(&kernel_mutex));
  244. loop:
  245. /* Get next rseg in a round-robin fashion */
  246. rseg = UT_LIST_GET_NEXT(rseg_list, rseg);
  247. if (rseg == NULL) {
  248. rseg = UT_LIST_GET_FIRST(trx_sys->rseg_list);
  249. }
  250. /* If it is the SYSTEM rollback segment, and there exist others, skip
  251. it */
  252. if ((rseg->id == TRX_SYS_SYSTEM_RSEG_ID) 
  253. && (UT_LIST_GET_LEN(trx_sys->rseg_list) > 1)) {
  254. goto loop;
  255. }
  256. trx_sys->latest_rseg = rseg;
  257. return(rseg->id);
  258. }
  259. /********************************************************************
  260. Starts a new transaction. */
  261. ibool
  262. trx_start_low(
  263. /*==========*/
  264. /* out: TRUE */
  265. trx_t*  trx, /* in: transaction */
  266. ulint rseg_id)/* in: rollback segment id; if ULINT_UNDEFINED
  267. is passed, the system chooses the rollback segment
  268. automatically in a round-robin fashion */
  269. {
  270. trx_rseg_t* rseg;
  271. ut_ad(mutex_own(&kernel_mutex));
  272. ut_ad(trx->rseg == NULL);
  273. if (trx->type == TRX_PURGE) {
  274. trx->id = ut_dulint_zero;
  275. trx->conc_state = TRX_ACTIVE;
  276. return(TRUE);
  277. }
  278. ut_ad(trx->conc_state != TRX_ACTIVE);
  279. if (rseg_id == ULINT_UNDEFINED) {
  280. rseg_id = trx_assign_rseg();
  281. }
  282. rseg = trx_sys_get_nth_rseg(trx_sys, rseg_id);
  283. trx->id = trx_sys_get_new_trx_id();
  284. /* The initial value for trx->no: ut_dulint_max is used in
  285. read_view_open_now: */
  286. trx->no = ut_dulint_max;
  287. trx->rseg = rseg;
  288. trx->conc_state = TRX_ACTIVE;
  289. UT_LIST_ADD_FIRST(trx_list, trx_sys->trx_list, trx);
  290. return(TRUE);
  291. }
  292. /********************************************************************
  293. Starts a new transaction. */
  294. ibool
  295. trx_start(
  296. /*======*/
  297. /* out: TRUE */
  298. trx_t*  trx, /* in: transaction */
  299. ulint rseg_id)/* in: rollback segment id; if ULINT_UNDEFINED
  300. is passed, the system chooses the rollback segment
  301. automatically in a round-robin fashion */
  302. {
  303. ibool ret;
  304. mutex_enter(&kernel_mutex);
  305. ret = trx_start_low(trx, rseg_id);
  306. mutex_exit(&kernel_mutex);
  307. return(ret);
  308. }
  309. /********************************************************************
  310. Commits a transaction. */
  311. void
  312. trx_commit_off_kernel(
  313. /*==================*/
  314. trx_t* trx) /* in: transaction */
  315. {
  316. page_t* update_hdr_page;
  317. dulint lsn;
  318. trx_rseg_t* rseg;
  319. trx_undo_t* undo;
  320. ibool must_flush_log = FALSE;
  321. mtr_t mtr;
  322. ut_ad(mutex_own(&kernel_mutex));
  323. rseg = trx->rseg;
  324. if ((trx->insert_undo != NULL) || (trx->update_undo != NULL)) {
  325. mutex_exit(&kernel_mutex);
  326. mtr_start(&mtr);
  327. must_flush_log = TRUE;
  328. /* Change the undo log segment states from TRX_UNDO_ACTIVE
  329. to some other state: these modifications to the file data
  330. structure define the transaction as committed in the file
  331. based world, at the serialization point of the log sequence
  332. number lsn obtained below. */
  333. mutex_enter(&(rseg->mutex));
  334. if (trx->insert_undo != NULL) {
  335. trx_undo_set_state_at_finish(trx, trx->insert_undo,
  336. &mtr);
  337. }
  338. undo = trx->update_undo;
  339. if (undo) {
  340. mutex_enter(&kernel_mutex);
  341. #ifdef TRX_UPDATE_UNDO_OPT
  342. if (!undo->del_marks && (undo->size == 1)
  343.     && (UT_LIST_GET_LEN(trx_sys->view_list) == 1)) {
  344.      /* There is no need to save the update undo
  345.      log: discard it; note that &mtr gets committed
  346.      while we must hold the kernel mutex and
  347. therefore this optimization may add to the
  348. contention of the kernel mutex. */
  349.      lsn = trx_undo_update_cleanup_by_discard(trx,
  350. &mtr);
  351. mutex_exit(&(rseg->mutex));
  352.      goto shortcut;
  353. }
  354. #endif
  355. trx->no = trx_sys_get_new_trx_no();
  356. mutex_exit(&kernel_mutex);
  357. /* It is not necessary to obtain trx->undo_mutex here
  358. because only a single OS thread is allowed to do the
  359. transaction commit for this transaction. */
  360. update_hdr_page = trx_undo_set_state_at_finish(trx,
  361. undo, &mtr);
  362. /* We have to do the cleanup for the update log while
  363. holding the rseg mutex because update log headers
  364. have to be put to the history list in the order of
  365. the trx number. */
  366. trx_undo_update_cleanup(trx, update_hdr_page, &mtr);
  367. }
  368. mutex_exit(&(rseg->mutex));
  369. /* If we did not take the shortcut, the following call
  370. commits the mini-transaction, making the whole transaction
  371. committed in the file-based world at this log sequence number;
  372. otherwise, we get the commit lsn from the call of
  373. trx_undo_update_cleanup_by_discard above.
  374. NOTE that transaction numbers, which are assigned only to
  375. transactions with an update undo log, do not necessarily come
  376. in exactly the same order as commit lsn's, if the transactions
  377. have different rollback segments. To get exactly the same
  378. order we should hold the kernel mutex up to this point,
  379. adding to to the contention of the kernel mutex. However, if
  380. a transaction T2 is able to see modifications made by
  381. a transaction T1, T2 will always get a bigger transaction
  382. number and a bigger commit lsn than T1. */
  383. /*--------------*/
  384.   mtr_commit(&mtr);
  385.   /*--------------*/
  386.   lsn = mtr.end_lsn;
  387. mutex_enter(&kernel_mutex);
  388. }
  389. #ifdef TRX_UPDATE_UNDO_OPT
  390. shortcut:
  391. #endif
  392. ut_ad(trx->conc_state == TRX_ACTIVE);
  393. ut_ad(mutex_own(&kernel_mutex));
  394. /* The following assignment makes the transaction committed in memory
  395. and makes its changes to data visible to other transactions.
  396. NOTE that there is a small discrepancy from the strict formal
  397. visibility rules here: a human user of the database can see
  398. modifications made by another transaction T even before the necessary
  399. log segment has been flushed to the disk. If the database happens to
  400. crash before the flush, the user has seen modifications from T which
  401. will never be a committed transaction. However, any transaction T2
  402. which sees the modifications of the committing transaction T, and
  403. which also itself makes modifications to the database, will get an lsn
  404. larger than the committing transaction T. In the case where the log
  405. flush fails, and T never gets committed, also T2 will never get
  406. committed. */
  407. /*--------------------------------------*/
  408. trx->conc_state = TRX_COMMITTED_IN_MEMORY;
  409. /*--------------------------------------*/
  410. lock_release_off_kernel(trx);
  411. if (trx->read_view) {
  412. read_view_close(trx->read_view);
  413. mem_heap_empty(trx->read_view_heap);
  414. trx->read_view = NULL;
  415. }
  416. /* printf("Trx %lu commit finishedn", ut_dulint_get_low(trx->id)); */
  417. if (must_flush_log) {
  418. mutex_exit(&kernel_mutex);
  419. if (trx->insert_undo != NULL) {
  420. trx_undo_insert_cleanup(trx);
  421. }
  422. /* NOTE that we could possibly make a group commit more
  423. efficient here: call os_thread_yield here to allow also other
  424. trxs to come to commit! */
  425. /* We now flush the log, as the transaction made changes to
  426. the database, making the transaction committed on disk. It is
  427. enough that any one of the log groups gets written to disk. */
  428. /*-------------------------------------*/
  429. /* Only in some performance tests the variable srv_flush..
  430. will be set to FALSE: */
  431. if (srv_flush_log_at_trx_commit) {
  432.   log_flush_up_to(lsn, LOG_WAIT_ONE_GROUP);
  433.   }
  434. /*-------------------------------------*/
  435. mutex_enter(&kernel_mutex);
  436. }
  437. trx->conc_state = TRX_NOT_STARTED;
  438. trx->rseg = NULL;
  439. trx->undo_no = ut_dulint_zero;
  440. trx->last_sql_stat_start.least_undo_no = ut_dulint_zero;
  441. ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
  442. ut_ad(UT_LIST_GET_LEN(trx->trx_locks) == 0);
  443. UT_LIST_REMOVE(trx_list, trx_sys->trx_list, trx);
  444. }
  445. /************************************************************************
  446. Assigns a read view for a consistent read query. All the consistent reads
  447. within the same transaction will get the same read view, which is created
  448. when this function is first called for a new started transaction. */
  449. read_view_t*
  450. trx_assign_read_view(
  451. /*=================*/
  452. /* out: consistent read view */
  453. trx_t* trx) /* in: active transaction */
  454. {
  455. ut_ad(trx->conc_state == TRX_ACTIVE);
  456. if (trx->read_view) {
  457. return(trx->read_view);
  458. }
  459. mutex_enter(&kernel_mutex);
  460. if (!trx->read_view) {
  461. trx->read_view = read_view_open_now(trx, trx->read_view_heap);
  462. }
  463. mutex_exit(&kernel_mutex);
  464. return(trx->read_view);
  465. }
  466. /********************************************************************
  467. Commits a transaction. NOTE that the kernel mutex is temporarily released. */
  468. static
  469. void
  470. trx_handle_commit_sig_off_kernel(
  471. /*=============================*/
  472. trx_t* trx, /* in: transaction */
  473. que_thr_t** next_thr) /* in/out: next query thread to run;
  474. if the value which is passed in is
  475. a pointer to a NULL pointer, then the
  476. calling function can start running
  477. a new query thread */
  478. {
  479. trx_sig_t* sig;
  480. trx_sig_t* next_sig;
  481. ut_ad(mutex_own(&kernel_mutex));
  482. trx->que_state = TRX_QUE_COMMITTING;
  483. trx_commit_off_kernel(trx);
  484. ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
  485. /* Remove all TRX_SIG_COMMIT signals from the signal queue and send
  486. reply messages to them */
  487. sig = UT_LIST_GET_FIRST(trx->signals);
  488. while (sig != NULL) {
  489. next_sig = UT_LIST_GET_NEXT(signals, sig);
  490. if (sig->type == TRX_SIG_COMMIT) {
  491. trx_sig_reply(trx, sig, next_thr);
  492. trx_sig_remove(trx, sig);
  493. }
  494. sig = next_sig;
  495. }
  496. trx->que_state = TRX_QUE_RUNNING;
  497. }
  498. /***************************************************************
  499. The transaction must be in the TRX_QUE_LOCK_WAIT state. Puts it to
  500. the TRX_QUE_RUNNING state and releases query threads which were
  501. waiting for a lock in the wait_thrs list. */
  502. void
  503. trx_end_lock_wait(
  504. /*==============*/
  505. trx_t* trx) /* in: transaction */
  506. {
  507. que_thr_t* thr;
  508. ut_ad(mutex_own(&kernel_mutex));
  509. ut_ad(trx->que_state == TRX_QUE_LOCK_WAIT);
  510. thr = UT_LIST_GET_FIRST(trx->wait_thrs);
  511. while (thr != NULL) {
  512. que_thr_end_wait_no_next_thr(thr);
  513. UT_LIST_REMOVE(trx_thrs, trx->wait_thrs, thr);
  514. thr = UT_LIST_GET_FIRST(trx->wait_thrs);
  515. }
  516. trx->que_state = TRX_QUE_RUNNING;
  517. }
  518. /***************************************************************
  519. Moves the query threads in the lock wait list to the SUSPENDED state and puts
  520. the transaction to the TRX_QUE_RUNNING state. */
  521. static
  522. void
  523. trx_lock_wait_to_suspended(
  524. /*=======================*/
  525. trx_t* trx) /* in: transaction in the TRX_QUE_LOCK_WAIT state */
  526. {
  527. que_thr_t* thr;
  528. ut_ad(mutex_own(&kernel_mutex));
  529. ut_ad(trx->que_state == TRX_QUE_LOCK_WAIT);
  530. thr = UT_LIST_GET_FIRST(trx->wait_thrs);
  531. while (thr != NULL) {
  532. thr->state = QUE_THR_SUSPENDED;
  533. UT_LIST_REMOVE(trx_thrs, trx->wait_thrs, thr);
  534. thr = UT_LIST_GET_FIRST(trx->wait_thrs);
  535. }
  536. trx->que_state = TRX_QUE_RUNNING;
  537. }
  538. /***************************************************************
  539. Moves the query threads in the sig reply wait list of trx to the SUSPENDED
  540. state. */
  541. static
  542. void
  543. trx_sig_reply_wait_to_suspended(
  544. /*============================*/
  545. trx_t* trx) /* in: transaction */
  546. {
  547. trx_sig_t* sig;
  548. que_thr_t* thr;
  549. ut_ad(mutex_own(&kernel_mutex));
  550. sig = UT_LIST_GET_FIRST(trx->reply_signals);
  551. while (sig != NULL) {
  552. thr = sig->receiver;
  553. ut_ad(thr->state == QUE_THR_SIG_REPLY_WAIT);
  554. thr->state = QUE_THR_SUSPENDED;
  555. sig->receiver = NULL;
  556. sig->reply = FALSE;
  557. UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig);
  558. sig = UT_LIST_GET_FIRST(trx->reply_signals);
  559. }
  560. }
  561. /*********************************************************************
  562. Checks the compatibility of a new signal with the other signals in the
  563. queue. */
  564. static
  565. ibool
  566. trx_sig_is_compatible(
  567. /*==================*/
  568. /* out: TRUE if the signal can be queued */
  569. trx_t* trx, /* in: trx handle */
  570. ulint type, /* in: signal type */
  571. ulint sender) /* in: TRX_SIG_SELF or TRX_SIG_OTHER_SESS */
  572. {
  573. trx_sig_t* sig;
  574. ut_ad(mutex_own(&kernel_mutex));
  575. if (UT_LIST_GET_LEN(trx->signals) == 0) {
  576. return(TRUE);
  577. }
  578. if (sender == TRX_SIG_SELF) {
  579. if (type == TRX_SIG_ERROR_OCCURRED) {
  580. return(TRUE);
  581. } else if (type == TRX_SIG_BREAK_EXECUTION) {
  582. return(TRUE);
  583. } else {
  584. return(FALSE);
  585. }
  586. }
  587. ut_ad(sender == TRX_SIG_OTHER_SESS);
  588. sig = UT_LIST_GET_FIRST(trx->signals);
  589. if (type == TRX_SIG_COMMIT) {
  590. while (sig != NULL) {
  591. if (sig->type == TRX_SIG_TOTAL_ROLLBACK) {
  592. return(FALSE);
  593. }
  594. sig = UT_LIST_GET_NEXT(signals, sig);
  595. }
  596.   return(TRUE);
  597. } else if (type == TRX_SIG_TOTAL_ROLLBACK) {
  598. while (sig != NULL) {
  599. if (sig->type == TRX_SIG_COMMIT) {
  600. return(FALSE);
  601. }
  602. sig = UT_LIST_GET_NEXT(signals, sig);
  603. }
  604. return(TRUE);
  605. } else if (type == TRX_SIG_BREAK_EXECUTION) {
  606. return(TRUE);
  607. } else {
  608. ut_error;
  609. return(FALSE);
  610. }
  611. }
  612. /********************************************************************
  613. Sends a signal to a trx object. */
  614. ibool
  615. trx_sig_send(
  616. /*=========*/
  617. /* out: TRUE if the signal was
  618. successfully delivered */
  619. trx_t* trx, /* in: trx handle */
  620. ulint type, /* in: signal type */
  621. ulint sender, /* in: TRX_SIG_SELF or
  622. TRX_SIG_OTHER_SESS */
  623. ibool reply, /* in: TRUE if the sender of the signal
  624. wants reply after the operation induced
  625. by the signal is completed; if type
  626. is TRX_SIG_END_WAIT, this must be
  627. FALSE */
  628. que_thr_t* receiver_thr, /* in: query thread which wants the
  629. reply, or NULL */
  630. trx_savept_t*  savept, /* in: possible rollback savepoint, or
  631. NULL */
  632. que_thr_t** next_thr) /* in/out: next query thread to run;
  633. if the value which is passed in is
  634. a pointer to a NULL pointer, then the
  635. calling function can start running
  636. a new query thread; if the parameter
  637. is NULL, it is ignored */
  638. {
  639. trx_sig_t* sig;
  640. trx_t* receiver_trx;
  641. ut_ad(trx);
  642. ut_ad(mutex_own(&kernel_mutex));
  643. if (!trx_sig_is_compatible(trx, type, sender)) {
  644. /* The signal is not compatible with the other signals in
  645. the queue: do nothing */
  646. ut_a(0);
  647. /* sess_raise_error_low(trx, 0, 0, NULL, NULL, NULL, NULL,
  648. "Incompatible signal"); */
  649. return(FALSE);
  650. }
  651. /* Queue the signal object */
  652. if (UT_LIST_GET_LEN(trx->signals) == 0) {
  653. /* The signal list is empty: the 'sig' slot must be unused
  654. (we improve performance a bit by avoiding mem_alloc) */
  655. sig = &(trx->sig);
  656.   } else {
  657. /* It might be that the 'sig' slot is unused also in this
  658. case, but we choose the easy way of using mem_alloc */
  659.  
  660. sig = mem_alloc(sizeof(trx_sig_t));
  661. }
  662. UT_LIST_ADD_LAST(signals, trx->signals, sig);
  663. sig->type = type;
  664. sig->state = TRX_SIG_WAITING;
  665. sig->sender = sender;
  666. sig->reply = reply;
  667. sig->receiver = receiver_thr;
  668. if (savept) {
  669. sig->savept = *savept;
  670. }
  671. if (receiver_thr) {
  672. receiver_trx = thr_get_trx(receiver_thr);
  673. UT_LIST_ADD_LAST(reply_signals, receiver_trx->reply_signals,
  674. sig);
  675. }
  676. if (trx->sess->state == SESS_ERROR) {
  677. trx_sig_reply_wait_to_suspended(trx);
  678. }
  679. if ((sender != TRX_SIG_SELF) || (type == TRX_SIG_BREAK_EXECUTION)) {
  680. /* The following call will add a TRX_SIG_ERROR_OCCURRED
  681. signal to the end of the queue, if the session is not yet
  682. in the error state: */
  683. ut_a(0);
  684. sess_raise_error_low(trx, 0, 0, NULL, NULL, NULL, NULL,
  685.   "Signal from another session, or a break execution signal");
  686. }
  687. /* If there were no other signals ahead in the queue, try to start
  688. handling of the signal */
  689. if (UT_LIST_GET_FIRST(trx->signals) == sig) {
  690. trx_sig_start_handle(trx, next_thr);
  691. }
  692. return(TRUE);
  693. }
  694. /********************************************************************
  695. Ends signal handling. If the session is in the error state, and
  696. trx->graph_before_signal_handling != NULL, then returns control to the error
  697. handling routine of the graph (currently just returns the control to the
  698. graph root which then will send an error message to the client). */
  699. void
  700. trx_end_signal_handling(
  701. /*====================*/
  702. trx_t* trx) /* in: trx */
  703. {
  704. ut_ad(mutex_own(&kernel_mutex));
  705. ut_ad(trx->handling_signals == TRUE);
  706. trx->handling_signals = FALSE;
  707. trx->graph = trx->graph_before_signal_handling;
  708. if (trx->graph && (trx->sess->state == SESS_ERROR)) {
  709. que_fork_error_handle(trx, trx->graph);
  710. }
  711. }
  712. /********************************************************************
  713. Starts handling of a trx signal. */
  714. void
  715. trx_sig_start_handle(
  716. /*=================*/
  717. trx_t* trx, /* in: trx handle */
  718. que_thr_t** next_thr) /* in/out: next query thread to run;
  719. if the value which is passed in is
  720. a pointer to a NULL pointer, then the
  721. calling function can start running
  722. a new query thread; if the parameter
  723. is NULL, it is ignored */
  724. {
  725. trx_sig_t* sig;
  726. ulint type;
  727. loop:
  728. /* We loop in this function body as long as there are queued signals
  729. we can process immediately */
  730. ut_ad(trx);
  731. ut_ad(mutex_own(&kernel_mutex));
  732. if (trx->handling_signals && (UT_LIST_GET_LEN(trx->signals) == 0)) {
  733. trx_end_signal_handling(trx);
  734. return;
  735. }
  736. if (trx->conc_state == TRX_NOT_STARTED) {
  737. trx_start_low(trx, ULINT_UNDEFINED);
  738. }
  739. /* If the trx is in a lock wait state, moves the waiting query threads
  740. to the suspended state */
  741. if (trx->que_state == TRX_QUE_LOCK_WAIT) {
  742. trx_lock_wait_to_suspended(trx);
  743. }
  744. /* If the session is in the error state and this trx has threads
  745. waiting for reply from signals, moves these threads to the suspended
  746. state, canceling wait reservations; note that if the transaction has
  747. sent a commit or rollback signal to itself, and its session is not in
  748. the error state, then nothing is done here. */
  749. if (trx->sess->state == SESS_ERROR) {
  750. trx_sig_reply_wait_to_suspended(trx);
  751. }
  752. /* If there are no running query threads, we can start processing of a
  753. signal, otherwise we have to wait until all query threads of this
  754. transaction are aware of the arrival of the signal. */
  755. if (trx->n_active_thrs > 0) {
  756. return;
  757. }
  758. if (trx->handling_signals == FALSE) {
  759. trx->graph_before_signal_handling = trx->graph;
  760. trx->handling_signals = TRUE;
  761. }
  762. sig = UT_LIST_GET_FIRST(trx->signals);
  763. type = sig->type;
  764. if (type == TRX_SIG_COMMIT) {
  765. trx_handle_commit_sig_off_kernel(trx, next_thr);
  766. } else if ((type == TRX_SIG_TOTAL_ROLLBACK)
  767. || (type == TRX_SIG_ROLLBACK_TO_SAVEPT)) { 
  768. trx_rollback(trx, sig, next_thr);
  769. /* No further signals can be handled until the rollback
  770. completes, therefore we return */
  771. return;
  772. } else if (type == TRX_SIG_ERROR_OCCURRED) {
  773. trx_rollback(trx, sig, next_thr);
  774. /* No further signals can be handled until the rollback
  775. completes, therefore we return */
  776. return;
  777. } else if (type == TRX_SIG_BREAK_EXECUTION) {
  778. trx_sig_reply(trx, sig, next_thr);
  779. trx_sig_remove(trx, sig);
  780. } else {
  781. ut_error;
  782. }
  783. goto loop;
  784. }
  785. /********************************************************************
  786. Send the reply message when a signal in the queue of the trx has been
  787. handled. */
  788. void
  789. trx_sig_reply(
  790. /*==========*/
  791. trx_t* trx, /* in: trx handle */
  792. trx_sig_t* sig, /* in: signal */
  793. que_thr_t** next_thr) /* in/out: next query thread to run;
  794. if the value which is passed in is
  795. a pointer to a NULL pointer, then the
  796. calling function can start running
  797. a new query thread */
  798. {
  799. trx_t* receiver_trx;
  800. ut_ad(trx && sig);
  801. ut_ad(mutex_own(&kernel_mutex));
  802. if (sig->reply && (sig->receiver != NULL)) {
  803. ut_ad((sig->receiver)->state == QUE_THR_SIG_REPLY_WAIT);
  804. receiver_trx = thr_get_trx(sig->receiver);
  805. UT_LIST_REMOVE(reply_signals, receiver_trx->reply_signals,
  806. sig);
  807. ut_ad(receiver_trx->sess->state != SESS_ERROR);
  808. que_thr_end_wait(sig->receiver, next_thr);
  809. sig->reply = FALSE;
  810. sig->receiver = NULL;
  811. } else if (sig->reply) {
  812. /* In this case the reply should be sent to the client of
  813. the session of the transaction */
  814. sig->reply = FALSE;
  815. sig->receiver = NULL;
  816. sess_srv_msg_send_simple(trx->sess, SESS_SRV_SUCCESS,
  817. SESS_NOT_RELEASE_KERNEL);
  818. }
  819. }
  820. /********************************************************************
  821. Removes a signal object from the trx signal queue. */
  822. void
  823. trx_sig_remove(
  824. /*===========*/
  825. trx_t* trx, /* in: trx handle */
  826. trx_sig_t* sig) /* in, own: signal */
  827. {
  828. ut_ad(trx && sig);
  829. ut_ad(mutex_own(&kernel_mutex));
  830. ut_ad(sig->reply == FALSE);
  831. ut_ad(sig->receiver == NULL);
  832. UT_LIST_REMOVE(signals, trx->signals, sig);
  833. sig->type = 0; /* reset the field to catch possible bugs */
  834. if (sig != &(trx->sig)) {
  835. mem_free(sig);
  836. }
  837. }
  838. /*************************************************************************
  839. Creates a commit command node struct. */
  840. commit_node_t*
  841. commit_node_create(
  842. /*===============*/
  843. /* out, own: commit node struct */
  844. mem_heap_t* heap) /* in: mem heap where created */
  845. {
  846. commit_node_t* node;
  847. node = mem_heap_alloc(heap, sizeof(commit_node_t));
  848. node->common.type  = QUE_NODE_COMMIT;
  849. node->state = COMMIT_NODE_SEND;
  850. return(node);
  851. }
  852. /***************************************************************
  853. Performs an execution step for a commit type node in a query graph. */
  854. que_thr_t*
  855. trx_commit_step(
  856. /*============*/
  857. /* out: query thread to run next, or NULL */
  858. que_thr_t* thr) /* in: query thread */
  859. {
  860. commit_node_t* node;
  861. que_thr_t* next_thr;
  862. ibool success;
  863. node = thr->run_node;
  864. ut_ad(que_node_get_type(node) == QUE_NODE_COMMIT);
  865. if (thr->prev_node == que_node_get_parent(node)) {
  866. node->state = COMMIT_NODE_SEND;
  867. }
  868. if (node->state == COMMIT_NODE_SEND) {
  869. mutex_enter(&kernel_mutex);
  870. node->state = COMMIT_NODE_WAIT;
  871. next_thr = NULL;
  872. thr->state = QUE_THR_SIG_REPLY_WAIT;
  873. /* Send the commit signal to the transaction */
  874. success = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT,
  875. TRX_SIG_SELF, TRUE, thr, NULL,
  876. &next_thr);
  877. mutex_exit(&kernel_mutex);
  878. if (!success) {
  879. /* Error in delivering the commit signal */
  880. que_thr_handle_error(thr, DB_ERROR, NULL, 0);
  881. }
  882. return(next_thr);
  883. }
  884. ut_ad(node->state == COMMIT_NODE_WAIT);
  885. node->state = COMMIT_NODE_SEND;
  886. thr->run_node = que_node_get_parent(node);
  887. return(thr);
  888. }
  889. /**************************************************************************
  890. Does the transaction commit for MySQL. */
  891. ulint
  892. trx_commit_for_mysql(
  893. /*=================*/
  894. /* out: 0 or error number */
  895. trx_t* trx) /* in: trx handle */
  896. {
  897. /* Because we do not do the commit by sending an Innobase
  898. sig to the transaction, we must here make sure that trx has been
  899. started. */
  900. trx_start_if_not_started(trx);
  901. mutex_enter(&kernel_mutex);
  902. trx_commit_off_kernel(trx);
  903. mutex_exit(&kernel_mutex);
  904. return(0);
  905. }
  906. /**************************************************************************
  907. Marks the latest SQL statement ended. */
  908. void
  909. trx_mark_sql_stat_end(
  910. /*==================*/
  911. trx_t* trx) /* in: trx handle */
  912. {
  913. trx_start_if_not_started(trx);
  914. mutex_enter(&kernel_mutex);
  915. trx->last_sql_stat_start.least_undo_no = trx->undo_no;
  916. mutex_exit(&kernel_mutex);
  917. }