que0que.c
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:30k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /******************************************************
  2. Query graph
  3. (c) 1996 Innobase Oy
  4. Created 5/27/1996 Heikki Tuuri
  5. *******************************************************/
  6. #include "que0que.h"
  7.  
  8. #ifdef UNIV_NONINL
  9. #include "que0que.ic"
  10. #endif
  11. #include "srv0que.h"
  12. #include "usr0sess.h"
  13. #include "trx0trx.h"
  14. #include "trx0roll.h"
  15. #include "row0undo.h"
  16. #include "row0ins.h"
  17. #include "row0upd.h"
  18. #include "row0sel.h"
  19. #include "row0purge.h"
  20. #include "dict0crea.h"
  21. #include "log0log.h"
  22. #include "eval0proc.h"
  23. #include "eval0eval.h"
  24. #define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256)
  25. #define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256)
  26. #define QUE_MAX_LOOPS_WITHOUT_CHECK 16
  27. /* If the following flag is set TRUE, the module will print trace info
  28. of SQL execution in the UNIV_SQL_DEBUG version */
  29. ibool que_trace_on = FALSE;
  30. ibool que_always_false = FALSE;
  31. /* How a stored procedure containing COMMIT or ROLLBACK commands
  32. is executed?
  33. The commit or rollback can be seen as a subprocedure call.
  34. The problem is that if there are several query threads
  35. currently running within the transaction, their action could
  36. mess the commit or rollback operation. Or, at the least, the
  37. operation would be difficult to visualize and keep in control.
  38. Therefore the query thread requesting a commit or a rollback
  39. sends to the transaction a signal, which moves the transaction
  40. to TRX_QUE_SIGNALED state. All running query threads of the
  41. transaction will eventually notice that the transaction is now in
  42. this state and voluntarily suspend themselves. Only the last
  43. query thread which suspends itself will trigger handling of
  44. the signal.
  45. When the transaction starts to handle a rollback or commit
  46. signal, it builds a query graph which, when executed, will
  47. roll back or commit the incomplete transaction. The transaction
  48. is moved to the TRX_QUE_ROLLING_BACK or TRX_QUE_COMMITTING state.
  49. If specified, the SQL cursors opened by the transaction are closed.
  50. When the execution of the graph completes, it is like returning
  51. from a subprocedure: the query thread which requested the operation
  52. starts running again. */
  53. /**************************************************************************
  54. Moves a thread from another state to the QUE_THR_RUNNING state. Increments
  55. the n_active_thrs counters of the query graph and transaction.
  56. ***NOTE***: This is the only function in which such a transition is allowed
  57. to happen! */
  58. static
  59. void
  60. que_thr_move_to_run_state(
  61. /*======================*/
  62. que_thr_t* thr); /* in: an query thread */
  63. /***************************************************************************
  64. Adds a query graph to the session's list of graphs. */
  65. void
  66. que_graph_publish(
  67. /*==============*/
  68. que_t* graph, /* in: graph */
  69. sess_t* sess) /* in: session */
  70. {
  71. #ifdef UNIV_SYNC_DEBUG
  72. ut_ad(mutex_own(&kernel_mutex));
  73. #endif /* UNIV_SYNC_DEBUG */
  74. UT_LIST_ADD_LAST(graphs, sess->graphs, graph);
  75. }
  76. /***************************************************************************
  77. Creates a query graph fork node. */
  78. que_fork_t*
  79. que_fork_create(
  80. /*============*/
  81. /* out, own: fork node */
  82. que_t* graph, /* in: graph, if NULL then this
  83. fork node is assumed to be the
  84. graph root */
  85. que_node_t* parent, /* in: parent node */
  86. ulint fork_type, /* in: fork type */
  87. mem_heap_t* heap) /* in: memory heap where created */
  88. {
  89. que_fork_t* fork;
  90. ut_ad(heap);
  91. fork = mem_heap_alloc(heap, sizeof(que_fork_t));
  92. fork->common.type = QUE_NODE_FORK;
  93. fork->n_active_thrs = 0;
  94. fork->state = QUE_FORK_COMMAND_WAIT;
  95. if (graph != NULL) {
  96. fork->graph = graph;
  97. } else {
  98. fork->graph = fork;
  99. }
  100. fork->common.parent = parent;
  101. fork->fork_type = fork_type;
  102. fork->caller = NULL;
  103. UT_LIST_INIT(fork->thrs);
  104. fork->sym_tab = NULL;
  105. fork->heap = heap;
  106. return(fork);
  107. }
  108. /***************************************************************************
  109. Creates a query graph thread node. */
  110. que_thr_t*
  111. que_thr_create(
  112. /*===========*/
  113. /* out, own: query thread node */
  114. que_fork_t* parent, /* in: parent node, i.e., a fork node */
  115. mem_heap_t* heap) /* in: memory heap where created */
  116. {
  117. que_thr_t* thr;
  118. ut_ad(parent && heap);
  119. thr = mem_heap_alloc(heap, sizeof(que_thr_t));
  120. thr->common.type = QUE_NODE_THR;
  121. thr->common.parent = parent;
  122. thr->magic_n = QUE_THR_MAGIC_N;
  123. thr->graph = parent->graph;
  124. thr->state = QUE_THR_COMMAND_WAIT;
  125. thr->is_active = FALSE;
  126. thr->run_node = NULL;
  127. thr->resource = 0;
  128. UT_LIST_ADD_LAST(thrs, parent->thrs, thr);
  129. return(thr);
  130. }
  131. /**************************************************************************
  132. Moves a suspended query thread to the QUE_THR_RUNNING state and may release
  133. a single worker thread to execute it. This function should be used to end
  134. the wait state of a query thread waiting for a lock or a stored procedure
  135. completion. */
  136. void
  137. que_thr_end_wait(
  138. /*=============*/
  139. que_thr_t* thr, /* in: query thread in the
  140. QUE_THR_LOCK_WAIT,
  141. or QUE_THR_PROCEDURE_WAIT, or
  142. QUE_THR_SIG_REPLY_WAIT state */
  143. que_thr_t** next_thr) /* in/out: next query thread to run;
  144. if the value which is passed in is
  145. a pointer to a NULL pointer, then the
  146. calling function can start running
  147. a new query thread; if NULL is passed
  148. as the parameter, it is ignored */
  149. {
  150. ibool was_active;
  151. #ifdef UNIV_SYNC_DEBUG
  152. ut_ad(mutex_own(&kernel_mutex));
  153. #endif /* UNIV_SYNC_DEBUG */
  154. ut_ad(thr);
  155. ut_ad((thr->state == QUE_THR_LOCK_WAIT)
  156.       || (thr->state == QUE_THR_PROCEDURE_WAIT)
  157.       || (thr->state == QUE_THR_SIG_REPLY_WAIT));
  158. ut_ad(thr->run_node);
  159. thr->prev_node = thr->run_node;
  160. was_active = thr->is_active;
  161. que_thr_move_to_run_state(thr);
  162. if (was_active) {
  163. return;
  164. }
  165. if (next_thr && *next_thr == NULL) {
  166. *next_thr = thr;
  167. } else {
  168. ut_a(0);
  169. srv_que_task_enqueue_low(thr);
  170. }
  171. }
  172. /**************************************************************************
  173. Same as que_thr_end_wait, but no parameter next_thr available. */
  174. void
  175. que_thr_end_wait_no_next_thr(
  176. /*=========================*/
  177. que_thr_t* thr) /* in: query thread in the QUE_THR_LOCK_WAIT,
  178. or QUE_THR_PROCEDURE_WAIT, or
  179. QUE_THR_SIG_REPLY_WAIT state */
  180. {
  181. ibool was_active;
  182. ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the
  183. only possible state here */
  184. #ifdef UNIV_SYNC_DEBUG
  185. ut_ad(mutex_own(&kernel_mutex));
  186. #endif /* UNIV_SYNC_DEBUG */
  187. ut_ad(thr);
  188. ut_ad((thr->state == QUE_THR_LOCK_WAIT)
  189.       || (thr->state == QUE_THR_PROCEDURE_WAIT)
  190.       || (thr->state == QUE_THR_SIG_REPLY_WAIT));
  191. was_active = thr->is_active;
  192. que_thr_move_to_run_state(thr);
  193. if (was_active) {
  194. return;
  195. }
  196. /* In MySQL we let the OS thread (not just the query thread) to wait
  197. for the lock to be released: */
  198. srv_release_mysql_thread_if_suspended(thr);
  199. /* srv_que_task_enqueue_low(thr); */
  200. }
  201. /**************************************************************************
  202. Inits a query thread for a command. */
  203. UNIV_INLINE
  204. void
  205. que_thr_init_command(
  206. /*=================*/
  207. que_thr_t* thr) /* in: query thread */
  208. {
  209. thr->run_node = thr;
  210. thr->prev_node = thr->common.parent;
  211. que_thr_move_to_run_state(thr);
  212. }
  213. /**************************************************************************
  214. Starts execution of a command in a query fork. Picks a query thread which
  215. is not in the QUE_THR_RUNNING state and moves it to that state. If none
  216. can be chosen, a situation which may arise in parallelized fetches, NULL
  217. is returned. */
  218. que_thr_t*
  219. que_fork_start_command(
  220. /*===================*/
  221. /* out: a query thread of the graph moved to
  222. QUE_THR_RUNNING state, or NULL; the query
  223. thread should be executed by que_run_threads
  224. by the caller */
  225. que_fork_t*  fork) /* in: a query fork */
  226. {
  227. que_thr_t* thr;
  228. fork->state = QUE_FORK_ACTIVE;
  229. fork->last_sel_node = NULL;
  230. /* Choose the query thread to run: usually there is just one thread,
  231. but in a parallelized select, which necessarily is non-scrollable,
  232. there may be several to choose from */
  233. /*---------------------------------------------------------------
  234. First we try to find a query thread in the QUE_THR_COMMAND_WAIT state */
  235. thr = UT_LIST_GET_FIRST(fork->thrs);
  236. while (thr != NULL) {
  237. if (thr->state == QUE_THR_COMMAND_WAIT) {
  238. /* We have to send the initial message to query thread
  239. to start it */
  240. que_thr_init_command(thr);
  241. return(thr);
  242. }
  243. ut_ad(thr->state != QUE_THR_LOCK_WAIT);
  244. thr = UT_LIST_GET_NEXT(thrs, thr);
  245. }
  246. /*----------------------------------------------------------------
  247. Then we try to find a query thread in the QUE_THR_SUSPENDED state */
  248. thr = UT_LIST_GET_FIRST(fork->thrs);
  249. while (thr != NULL) {
  250. if (thr->state == QUE_THR_SUSPENDED) {
  251. /* In this case the execution of the thread was
  252. suspended: no initial message is needed because
  253. execution can continue from where it was left */
  254. que_thr_move_to_run_state(thr);
  255. return(thr);
  256. }
  257. thr = UT_LIST_GET_NEXT(thrs, thr);
  258. }
  259. /*-----------------------------------------------------------------
  260. Then we try to find a query thread in the QUE_THR_COMPLETED state */
  261. thr = UT_LIST_GET_FIRST(fork->thrs);
  262. while (thr != NULL) {
  263. if (thr->state == QUE_THR_COMPLETED) {
  264. que_thr_init_command(thr);
  265. return(thr);
  266. }
  267. thr = UT_LIST_GET_NEXT(thrs, thr);
  268. }
  269. /* Else we return NULL */
  270. return(NULL);
  271. }
  272. /**************************************************************************
  273. After signal handling is finished, returns control to a query graph error
  274. handling routine. (Currently, just returns the control to the root of the
  275. graph so that the graph can communicate an error message to the client.) */
  276. void
  277. que_fork_error_handle(
  278. /*==================*/
  279. trx_t* trx __attribute__((unused)), /* in: trx */
  280. que_t* fork) /* in: query graph which was run before signal
  281. handling started, NULL not allowed */
  282. {
  283. que_thr_t* thr;
  284. #ifdef UNIV_SYNC_DEBUG
  285. ut_ad(mutex_own(&kernel_mutex));
  286. #endif /* UNIV_SYNC_DEBUG */
  287. ut_ad(trx->sess->state == SESS_ERROR);
  288. ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
  289. ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
  290. thr = UT_LIST_GET_FIRST(fork->thrs);
  291. while (thr != NULL) {
  292. ut_ad(!thr->is_active);
  293. ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT);
  294. ut_ad(thr->state != QUE_THR_LOCK_WAIT);
  295. thr->run_node = thr;
  296. thr->prev_node = thr->child;
  297. thr->state = QUE_THR_COMPLETED;
  298. thr = UT_LIST_GET_NEXT(thrs, thr);
  299. }
  300. thr = UT_LIST_GET_FIRST(fork->thrs);
  301. que_thr_move_to_run_state(thr);
  302. ut_a(0);
  303. srv_que_task_enqueue_low(thr);
  304. }
  305. /********************************************************************
  306. Tests if all the query threads in the same fork have a given state. */
  307. UNIV_INLINE
  308. ibool
  309. que_fork_all_thrs_in_state(
  310. /*=======================*/
  311. /* out: TRUE if all the query threads in the
  312. same fork were in the given state */
  313. que_fork_t* fork, /* in: query fork */
  314. ulint state) /* in: state */
  315. {
  316. que_thr_t* thr_node;
  317. thr_node = UT_LIST_GET_FIRST(fork->thrs);
  318. while (thr_node != NULL) {
  319. if (thr_node->state != state) {
  320. return(FALSE);
  321. }
  322. thr_node = UT_LIST_GET_NEXT(thrs, thr_node);
  323. }
  324. return(TRUE);
  325. }
  326. /**************************************************************************
  327. Calls que_graph_free_recursive for statements in a statement list. */
  328. static
  329. void
  330. que_graph_free_stat_list(
  331. /*=====================*/
  332. que_node_t* node) /* in: first query graph node in the list */
  333. {
  334. while (node) {
  335. que_graph_free_recursive(node);
  336. node = que_node_get_next(node);
  337. }
  338. }
  339. /**************************************************************************
  340. Frees a query graph, but not the heap where it was created. Does not free
  341. explicit cursor declarations, they are freed in que_graph_free. */
  342. void
  343. que_graph_free_recursive(
  344. /*=====================*/
  345. que_node_t* node) /* in: query graph node */
  346. {
  347. que_fork_t* fork;
  348. que_thr_t* thr;
  349. undo_node_t* undo;
  350. sel_node_t* sel;
  351. ins_node_t* ins;
  352. upd_node_t* upd;
  353. tab_node_t* cre_tab;
  354. ind_node_t* cre_ind;
  355. if (node == NULL) {
  356. return;
  357. }
  358. switch (que_node_get_type(node)) {
  359. case QUE_NODE_FORK:
  360. fork = node;
  361. thr = UT_LIST_GET_FIRST(fork->thrs);
  362. while (thr) {
  363. que_graph_free_recursive(thr);
  364. thr = UT_LIST_GET_NEXT(thrs, thr);
  365. }
  366. break;
  367. case QUE_NODE_THR:
  368. thr = node;
  369. if (thr->magic_n != QUE_THR_MAGIC_N) {
  370. fprintf(stderr,
  371. "que_thr struct appears corrupt; magic n %lun",
  372. (unsigned long) thr->magic_n);
  373. mem_analyze_corruption((byte*)thr);
  374. ut_error;
  375. }
  376. thr->magic_n = QUE_THR_MAGIC_FREED;
  377. que_graph_free_recursive(thr->child);
  378. break;
  379. case QUE_NODE_UNDO:
  380. undo = node;
  381. mem_heap_free(undo->heap);
  382. break;
  383. case QUE_NODE_SELECT:
  384. sel = node;
  385. sel_node_free_private(sel);
  386. break;
  387. case QUE_NODE_INSERT:
  388. ins = node;
  389. que_graph_free_recursive(ins->select);
  390. mem_heap_free(ins->entry_sys_heap);
  391. break;
  392. case QUE_NODE_UPDATE:
  393. upd = node;
  394. if (upd->in_mysql_interface) {
  395. btr_pcur_free_for_mysql(upd->pcur);
  396. }
  397. que_graph_free_recursive(upd->cascade_node);
  398. if (upd->cascade_heap) {
  399. mem_heap_free(upd->cascade_heap);
  400. }
  401. que_graph_free_recursive(upd->select);
  402. mem_heap_free(upd->heap);
  403. break;
  404. case QUE_NODE_CREATE_TABLE:
  405. cre_tab = node;
  406. que_graph_free_recursive(cre_tab->tab_def);
  407. que_graph_free_recursive(cre_tab->col_def);
  408. que_graph_free_recursive(cre_tab->commit_node);
  409. mem_heap_free(cre_tab->heap);
  410. break;
  411. case QUE_NODE_CREATE_INDEX:
  412. cre_ind = node;
  413. que_graph_free_recursive(cre_ind->ind_def);
  414. que_graph_free_recursive(cre_ind->field_def);
  415. que_graph_free_recursive(cre_ind->commit_node);
  416. mem_heap_free(cre_ind->heap);
  417. break;
  418. case QUE_NODE_PROC:
  419. que_graph_free_stat_list(((proc_node_t*)node)->stat_list);
  420. break;
  421. case QUE_NODE_IF:
  422. que_graph_free_stat_list(((if_node_t*)node)->stat_list);
  423. que_graph_free_stat_list(((if_node_t*)node)->else_part);
  424. que_graph_free_stat_list(((if_node_t*)node)->elsif_list);
  425. break;
  426. case QUE_NODE_ELSIF:
  427. que_graph_free_stat_list(((elsif_node_t*)node)->stat_list);
  428. break;
  429. case QUE_NODE_WHILE:
  430. que_graph_free_stat_list(((while_node_t*)node)->stat_list);
  431. break;
  432. case QUE_NODE_FOR:
  433. que_graph_free_stat_list(((for_node_t*)node)->stat_list);
  434. break;
  435. case QUE_NODE_ASSIGNMENT:
  436. case QUE_NODE_RETURN:
  437. case QUE_NODE_COMMIT:
  438. case QUE_NODE_ROLLBACK:
  439. case QUE_NODE_LOCK:
  440. case QUE_NODE_FUNC:
  441. case QUE_NODE_ORDER:
  442. case QUE_NODE_ROW_PRINTF:
  443. case QUE_NODE_OPEN:
  444. case QUE_NODE_FETCH:
  445. /* No need to do anything */
  446. break;
  447. default:
  448. fprintf(stderr,
  449. "que_node struct appears corrupt; type %lun",
  450. (unsigned long) que_node_get_type(node));
  451. mem_analyze_corruption((byte*)node);
  452. ut_error;
  453. }
  454. }
  455. /**************************************************************************
  456. Frees a query graph. */
  457. void
  458. que_graph_free(
  459. /*===========*/
  460. que_t* graph) /* in: query graph; we assume that the memory
  461. heap where this graph was created is private
  462. to this graph: if not, then use
  463. que_graph_free_recursive and free the heap
  464. afterwards! */
  465. {
  466. ut_ad(graph);
  467. if (graph->sym_tab) {
  468. /* The following call frees dynamic memory allocated
  469. for variables etc. during execution. Frees also explicit
  470. cursor definitions. */
  471. sym_tab_free_private(graph->sym_tab);
  472. }
  473. que_graph_free_recursive(graph);
  474. mem_heap_free(graph->heap);
  475. }
  476. /**************************************************************************
  477. Checks if the query graph is in a state where it should be freed, and
  478. frees it in that case. If the session is in a state where it should be
  479. closed, also this is done. */
  480. ibool
  481. que_graph_try_free(
  482. /*===============*/
  483. /* out: TRUE if freed */
  484. que_t* graph) /* in: query graph */
  485. {
  486. sess_t* sess;
  487. #ifdef UNIV_SYNC_DEBUG
  488. ut_ad(mutex_own(&kernel_mutex));
  489. #endif /* UNIV_SYNC_DEBUG */
  490. sess = (graph->trx)->sess;
  491. if ((graph->state == QUE_FORK_BEING_FREED)
  492. && (graph->n_active_thrs == 0)) {
  493. UT_LIST_REMOVE(graphs, sess->graphs, graph);
  494. que_graph_free(graph);
  495. sess_try_close(sess);
  496. return(TRUE);
  497. }
  498. return(FALSE);
  499. }
  500. /**************************************************************************
  501. Handles an SQL error noticed during query thread execution. Currently,
  502. does nothing! */
  503. void
  504. que_thr_handle_error(
  505. /*=================*/
  506. que_thr_t* thr __attribute__((unused)),
  507. /* in: query thread */
  508. ulint err_no __attribute__((unused)),
  509. /* in: error number */
  510. byte* err_str __attribute__((unused)),
  511. /* in, own: error string or NULL; NOTE: the
  512. function will take care of freeing of the
  513. string! */
  514. ulint err_len __attribute__((unused)))
  515. /* in: error string length */
  516. {
  517. /* Does nothing */
  518. }
  519. /********************************************************************
  520. Performs an execution step on a thr node. */
  521. static
  522. que_thr_t*
  523. que_thr_node_step(
  524. /*==============*/
  525. /* out: query thread to run next, or NULL
  526. if none */
  527. que_thr_t* thr) /* in: query thread where run_node must
  528. be the thread node itself */
  529. {
  530. ut_ad(thr->run_node == thr);
  531. if (thr->prev_node == thr->common.parent) {
  532. /* If control to the node came from above, it is just passed
  533. on */
  534. thr->run_node = thr->child;
  535. return(thr);
  536. }
  537. mutex_enter(&kernel_mutex);
  538. if (que_thr_peek_stop(thr)) {
  539. mutex_exit(&kernel_mutex);
  540. return(thr);
  541. }
  542. /* Thread execution completed */
  543. thr->state = QUE_THR_COMPLETED;
  544. mutex_exit(&kernel_mutex);
  545. return(NULL);
  546. }
  547. /**************************************************************************
  548. Moves a thread from another state to the QUE_THR_RUNNING state. Increments
  549. the n_active_thrs counters of the query graph and transaction if thr was
  550. not active.
  551. ***NOTE***: This and ..._mysql are  the only functions in which such a
  552. transition is allowed to happen! */
  553. static
  554. void
  555. que_thr_move_to_run_state(
  556. /*======================*/
  557. que_thr_t* thr) /* in: an query thread */
  558. {
  559. trx_t* trx;
  560. ut_ad(thr->state != QUE_THR_RUNNING);
  561. trx = thr_get_trx(thr);
  562. if (!thr->is_active) {
  563. (thr->graph)->n_active_thrs++;
  564. trx->n_active_thrs++;
  565. thr->is_active = TRUE;
  566. ut_ad((thr->graph)->n_active_thrs == 1);
  567. ut_ad(trx->n_active_thrs == 1);
  568. }
  569. thr->state = QUE_THR_RUNNING;
  570. }
  571. /**************************************************************************
  572. Decrements the query thread reference counts in the query graph and the
  573. transaction. May start signal handling, e.g., a rollback.
  574. *** NOTE ***:
  575. This and que_thr_stop_for_mysql are
  576. the only functions where the reference count can be decremented and
  577. this function may only be called from inside que_run_threads or
  578. que_thr_check_if_switch! These restrictions exist to make the rollback code
  579. easier to maintain. */
  580. static
  581. void
  582. que_thr_dec_refer_count(
  583. /*====================*/
  584. que_thr_t* thr, /* in: query thread */
  585. que_thr_t** next_thr) /* in/out: next query thread to run;
  586. if the value which is passed in is
  587. a pointer to a NULL pointer, then the
  588. calling function can start running
  589. a new query thread */ 
  590. {
  591. que_fork_t* fork;
  592. trx_t* trx;
  593. sess_t* sess;
  594. ulint fork_type;
  595. ibool stopped;
  596. fork = thr->common.parent;
  597. trx = thr->graph->trx;
  598. sess = trx->sess;
  599. mutex_enter(&kernel_mutex);
  600. ut_a(thr->is_active);
  601. if (thr->state == QUE_THR_RUNNING) {
  602. stopped = que_thr_stop(thr);
  603. if (!stopped) {
  604. /* The reason for the thr suspension or wait was
  605. already canceled before we came here: continue
  606. running the thread */
  607. /* fputs("!!!!!!!! Wait already ended: continue thrn",
  608. stderr); */
  609. if (next_thr && *next_thr == NULL) {
  610. *next_thr = thr;
  611. } else {
  612. ut_a(0);
  613. srv_que_task_enqueue_low(thr);
  614. }
  615. mutex_exit(&kernel_mutex);
  616. return;
  617. }
  618. }
  619. ut_ad(fork->n_active_thrs == 1);
  620. ut_ad(trx->n_active_thrs == 1);
  621. fork->n_active_thrs--;
  622. trx->n_active_thrs--;
  623. thr->is_active = FALSE;
  624. if (trx->n_active_thrs > 0) {
  625. mutex_exit(&kernel_mutex);
  626. return;
  627. }
  628. fork_type = fork->fork_type;
  629. /* Check if all query threads in the same fork are completed */
  630. if (que_fork_all_thrs_in_state(fork, QUE_THR_COMPLETED)) {
  631. if (fork_type == QUE_FORK_ROLLBACK) {
  632. /* This is really the undo graph used in rollback,
  633. no roll_node in this graph */
  634. ut_ad(UT_LIST_GET_LEN(trx->signals) > 0);
  635. ut_ad(trx->handling_signals == TRUE);
  636. trx_finish_rollback_off_kernel(fork, trx, next_thr);
  637. } else if (fork_type == QUE_FORK_PURGE) {
  638. /* Do nothing */
  639. } else if (fork_type == QUE_FORK_RECOVERY) {
  640. /* Do nothing */
  641. } else if (fork_type == QUE_FORK_MYSQL_INTERFACE) {
  642. /* Do nothing */
  643. } else {
  644. ut_error; /* not used in MySQL */
  645. }
  646. }
  647. if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) {
  648.      /* If the trx is signaled and its query thread count drops to
  649. zero, then we start processing a signal; from it we may get
  650. a new query thread to run */
  651. trx_sig_start_handle(trx, next_thr);
  652. }
  653. if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) {
  654. trx_end_signal_handling(trx);
  655. }
  656. mutex_exit(&kernel_mutex);
  657. }
  658. /**************************************************************************
  659. Stops a query thread if graph or trx is in a state requiring it. The
  660. conditions are tested in the order (1) graph, (2) trx. The kernel mutex has
  661. to be reserved. */
  662. ibool
  663. que_thr_stop(
  664. /*=========*/
  665. /* out: TRUE if stopped */
  666. que_thr_t* thr) /* in: query thread */
  667. {
  668. trx_t* trx;
  669. que_t* graph;
  670. ibool ret = TRUE;
  671. #ifdef UNIV_SYNC_DEBUG
  672. ut_ad(mutex_own(&kernel_mutex));
  673. #endif /* UNIV_SYNC_DEBUG */
  674. graph = thr->graph;
  675. trx = graph->trx;
  676. if (graph->state == QUE_FORK_COMMAND_WAIT) {
  677. thr->state = QUE_THR_SUSPENDED;
  678. } else if (trx->que_state == TRX_QUE_LOCK_WAIT) {
  679. UT_LIST_ADD_FIRST(trx_thrs, trx->wait_thrs, thr);
  680. thr->state = QUE_THR_LOCK_WAIT;
  681. } else if (trx->error_state != DB_SUCCESS
  682. && trx->error_state != DB_LOCK_WAIT) {
  683. /* Error handling built for the MySQL interface */
  684. thr->state = QUE_THR_COMPLETED;
  685. } else if (UT_LIST_GET_LEN(trx->signals) > 0
  686. && graph->fork_type != QUE_FORK_ROLLBACK) {
  687. thr->state = QUE_THR_SUSPENDED;
  688. } else {
  689. ut_ad(graph->state == QUE_FORK_ACTIVE);
  690. ret = FALSE;
  691. }         
  692. return(ret);
  693. }
  694. /**************************************************************************
  695. A patch for MySQL used to 'stop' a dummy query thread used in MySQL. The
  696. query thread is stopped and made inactive, except in the case where
  697. it was put to the lock wait state in lock0lock.c, but the lock has already
  698. been granted or the transaction chosen as a victim in deadlock resolution. */
  699. void
  700. que_thr_stop_for_mysql(
  701. /*===================*/
  702. que_thr_t* thr) /* in: query thread */
  703. {
  704. trx_t* trx;
  705. trx = thr_get_trx(thr);
  706. mutex_enter(&kernel_mutex);
  707. if (thr->state == QUE_THR_RUNNING) {
  708. if (trx->error_state != DB_SUCCESS
  709.     && trx->error_state != DB_LOCK_WAIT) {
  710. /* Error handling built for the MySQL interface */
  711. thr->state = QUE_THR_COMPLETED;
  712. } else {
  713. /* It must have been a lock wait but the lock was
  714. already released, or this transaction was chosen
  715. as a victim in selective deadlock resolution */
  716. mutex_exit(&kernel_mutex);
  717. return;
  718. }
  719. }
  720. ut_ad(thr->is_active == TRUE);
  721. ut_ad(trx->n_active_thrs == 1);
  722. ut_ad(thr->graph->n_active_thrs == 1);
  723. thr->is_active = FALSE;
  724. (thr->graph)->n_active_thrs--;
  725. trx->n_active_thrs--;
  726. mutex_exit(&kernel_mutex);
  727. }
  728. /**************************************************************************
  729. Moves a thread from another state to the QUE_THR_RUNNING state. Increments
  730. the n_active_thrs counters of the query graph and transaction if thr was
  731. not active. */
  732. void
  733. que_thr_move_to_run_state_for_mysql(
  734. /*================================*/
  735. que_thr_t* thr, /* in: an query thread */
  736. trx_t* trx) /* in: transaction */
  737. {
  738. if (thr->magic_n != QUE_THR_MAGIC_N) {
  739. fprintf(stderr,
  740. "que_thr struct appears corrupt; magic n %lun",
  741. (unsigned long) thr->magic_n);
  742. mem_analyze_corruption((byte*)thr);
  743. ut_error;
  744. }
  745. if (!thr->is_active) {
  746. thr->graph->n_active_thrs++;
  747. trx->n_active_thrs++;
  748. thr->is_active = TRUE;
  749. }
  750. thr->state = QUE_THR_RUNNING;
  751. }
  752. /**************************************************************************
  753. A patch for MySQL used to 'stop' a dummy query thread used in MySQL
  754. select, when there is no error or lock wait. */
  755. void
  756. que_thr_stop_for_mysql_no_error(
  757. /*============================*/
  758. que_thr_t* thr, /* in: query thread */
  759. trx_t* trx) /* in: transaction */
  760. {
  761. ut_ad(thr->state == QUE_THR_RUNNING);
  762. ut_ad(thr->is_active == TRUE);
  763. ut_ad(trx->n_active_thrs == 1);
  764. ut_ad(thr->graph->n_active_thrs == 1);
  765. if (thr->magic_n != QUE_THR_MAGIC_N) {
  766. fprintf(stderr,
  767. "que_thr struct appears corrupt; magic n %lun",
  768. (unsigned long) thr->magic_n);
  769. mem_analyze_corruption((byte*)thr);
  770. ut_error;
  771. }
  772. thr->state = QUE_THR_COMPLETED;
  773. thr->is_active = FALSE;
  774. (thr->graph)->n_active_thrs--;
  775. trx->n_active_thrs--;
  776. }
  777. /**************************************************************************
  778. Prints info of an SQL query graph node. */
  779. void
  780. que_node_print_info(
  781. /*================*/
  782. que_node_t* node) /* in: query graph node */
  783. {
  784. ulint type;
  785. const char* str;
  786. type = que_node_get_type(node);
  787. if (type == QUE_NODE_SELECT) {
  788. str = "SELECT";
  789. } else if (type == QUE_NODE_INSERT) {
  790. str = "INSERT";
  791. } else if (type == QUE_NODE_UPDATE) {
  792. str = "UPDATE";
  793. } else if (type == QUE_NODE_WHILE) {
  794. str = "WHILE";
  795. } else if (type == QUE_NODE_ASSIGNMENT) {
  796. str = "ASSIGNMENT";
  797. } else if (type == QUE_NODE_IF) {
  798. str = "IF";
  799. } else if (type == QUE_NODE_FETCH) {
  800. str = "FETCH";
  801. } else if (type == QUE_NODE_OPEN) {
  802. str = "OPEN";
  803. } else if (type == QUE_NODE_PROC) {
  804. str = "STORED PROCEDURE";
  805. } else if (type == QUE_NODE_FUNC) {
  806. str = "FUNCTION";
  807. } else if (type == QUE_NODE_LOCK) {
  808. str = "LOCK";
  809. } else if (type == QUE_NODE_THR) {
  810. str = "QUERY THREAD";
  811. } else if (type == QUE_NODE_COMMIT) {
  812. str = "COMMIT";
  813. } else if (type == QUE_NODE_UNDO) {
  814. str = "UNDO ROW";
  815. } else if (type == QUE_NODE_PURGE) {
  816. str = "PURGE ROW";
  817. } else if (type == QUE_NODE_ROLLBACK) {
  818. str = "ROLLBACK";
  819. } else if (type == QUE_NODE_CREATE_TABLE) {
  820. str = "CREATE TABLE";
  821. } else if (type == QUE_NODE_CREATE_INDEX) {
  822. str = "CREATE INDEX";
  823. } else if (type == QUE_NODE_FOR) {
  824. str = "FOR LOOP";
  825. } else if (type == QUE_NODE_RETURN) {
  826. str = "RETURN";
  827. } else {
  828. str = "UNKNOWN NODE TYPE";
  829. }
  830. fprintf(stderr, "Node type %lu: %s, address %pn", (ulong) type, str, node);
  831. }
  832. /**************************************************************************
  833. Performs an execution step on a query thread. */
  834. UNIV_INLINE
  835. que_thr_t*
  836. que_thr_step(
  837. /*=========*/
  838. /* out: query thread to run next: it may
  839. differ from the input parameter if, e.g., a
  840. subprocedure call is made */ 
  841. que_thr_t* thr) /* in: query thread */
  842. {
  843. que_node_t* node;
  844. que_thr_t* old_thr;
  845. trx_t* trx;
  846. ulint type;
  847. ut_ad(thr->state == QUE_THR_RUNNING);
  848. thr->resource++;
  849. type = que_node_get_type(thr->run_node);
  850. node = thr->run_node;
  851. old_thr = thr;
  852. #ifdef UNIV_DEBUG
  853. if (que_trace_on) {
  854. fputs("To execute: ", stderr);
  855. que_node_print_info(node);
  856. }
  857. #endif
  858. if (type & QUE_NODE_CONTROL_STAT) {
  859. if ((thr->prev_node != que_node_get_parent(node))
  860. && que_node_get_next(thr->prev_node)) {
  861. /* The control statements, like WHILE, always pass the
  862. control to the next child statement if there is any
  863. child left */
  864. thr->run_node = que_node_get_next(thr->prev_node);
  865. } else if (type == QUE_NODE_IF) {
  866. if_step(thr);
  867. } else if (type == QUE_NODE_FOR) {
  868. for_step(thr);
  869. } else if (type == QUE_NODE_PROC) {
  870. /* We can access trx->undo_no without reserving
  871. trx->undo_mutex, because there cannot be active query
  872. threads doing updating or inserting at the moment! */
  873. if (thr->prev_node == que_node_get_parent(node)) {
  874. trx = thr_get_trx(thr);
  875. trx->last_sql_stat_start.least_undo_no
  876. = trx->undo_no;
  877. }
  878. proc_step(thr);
  879. } else if (type == QUE_NODE_WHILE) {
  880. while_step(thr);
  881. }
  882. } else if (type == QUE_NODE_ASSIGNMENT) {
  883. assign_step(thr);
  884. } else if (type == QUE_NODE_SELECT) {
  885. thr = row_sel_step(thr);
  886. } else if (type == QUE_NODE_INSERT) {
  887. thr = row_ins_step(thr);
  888. } else if (type == QUE_NODE_UPDATE) {
  889. thr = row_upd_step(thr);
  890. } else if (type == QUE_NODE_FETCH) {
  891. thr = fetch_step(thr);
  892. } else if (type == QUE_NODE_OPEN) {
  893. thr = open_step(thr);
  894. } else if (type == QUE_NODE_FUNC) {
  895. proc_eval_step(thr);
  896. } else if (type == QUE_NODE_LOCK) {
  897. ut_error;
  898. /*
  899. thr = que_lock_step(thr);
  900. */
  901. } else if (type == QUE_NODE_THR) {
  902. thr = que_thr_node_step(thr);
  903. } else if (type == QUE_NODE_COMMIT) {
  904. thr = trx_commit_step(thr);
  905. } else if (type == QUE_NODE_UNDO) {
  906. thr = row_undo_step(thr);
  907. } else if (type == QUE_NODE_PURGE) {
  908. thr = row_purge_step(thr);
  909. } else if (type == QUE_NODE_RETURN) {
  910. thr = return_step(thr);
  911. } else if (type == QUE_NODE_ROLLBACK) {
  912. thr = trx_rollback_step(thr);
  913. } else if (type == QUE_NODE_CREATE_TABLE) {
  914. thr = dict_create_table_step(thr);
  915. } else if (type == QUE_NODE_CREATE_INDEX) {
  916. thr = dict_create_index_step(thr);
  917. } else if (type == QUE_NODE_ROW_PRINTF) {
  918. thr = row_printf_step(thr);
  919. } else {
  920. ut_error;
  921. }
  922. old_thr->prev_node = node;
  923. return(thr);
  924. }
  925. /**************************************************************************
  926. Runs query threads. Note that the individual query thread which is run
  927. within this function may change if, e.g., the OS thread executing this
  928. function uses a threshold amount of resources. */
  929. void
  930. que_run_threads(
  931. /*============*/
  932. que_thr_t* thr) /* in: query thread which is run initially */
  933. {
  934. que_thr_t* next_thr;
  935. ulint cumul_resource;
  936. ulint loop_count;
  937. ut_ad(thr->state == QUE_THR_RUNNING);
  938. #ifdef UNIV_SYNC_DEBUG
  939. ut_ad(!mutex_own(&kernel_mutex));
  940. #endif /* UNIV_SYNC_DEBUG */
  941. /* cumul_resource counts how much resources the OS thread (NOT the
  942. query thread) has spent in this function */
  943. loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
  944. cumul_resource = 0;
  945. loop:
  946. /* Check that there is enough space in the log to accommodate
  947. possible log entries by this query step; if the operation can touch
  948. more than about 4 pages, checks must be made also within the query
  949. step! */
  950. log_free_check();
  951. /* Perform the actual query step: note that the query thread
  952. may change if, e.g., a subprocedure call is made */
  953. /*-------------------------*/
  954. next_thr = que_thr_step(thr);
  955. /*-------------------------*/
  956. /* Test the effect on performance of adding extra mutex
  957. reservations */
  958. /* if (srv_test_extra_mutexes) {
  959. mutex_enter(&kernel_mutex);
  960. mutex_exit(&kernel_mutex);
  961. }
  962. */
  963. loop_count++;
  964. if (next_thr != thr) {
  965. ut_a(next_thr == NULL);
  966. que_thr_dec_refer_count(thr, &next_thr);
  967. if (next_thr == NULL) {
  968. return;
  969. }
  970. loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
  971. thr = next_thr;
  972. }
  973. goto loop;
  974. }