que0que.c
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:33k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /******************************************************
  2. Query graph
  3. (c) 1996 Innobase Oy
  4. Created 5/27/1996 Heikki Tuuri
  5. *******************************************************/
  6. #include "que0que.h"
  7. #ifdef UNIV_NONINL
  8. #include "que0que.ic"
  9. #endif
  10. #include "srv0que.h"
  11. #include "usr0sess.h"
  12. #include "trx0trx.h"
  13. #include "trx0roll.h"
  14. #include "row0undo.h"
  15. #include "row0ins.h"
  16. #include "row0upd.h"
  17. #include "row0sel.h"
  18. #include "row0purge.h"
  19. #include "dict0crea.h"
  20. #include "log0log.h"
  21. #include "eval0proc.h"
  22. #include "eval0eval.h"
  23. #include "odbc0odbc.h"
  24. #define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256)
  25. #define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256)
  26. #define QUE_MAX_LOOPS_WITHOUT_CHECK 16
  27. /* If the following flag is set TRUE, the module will print trace info
  28. of SQL execution in the UNIV_SQL_DEBUG version */
  29. ibool que_trace_on = FALSE;
  30. ibool que_always_false = FALSE;
  31. /* How a stored procedure containing COMMIT or ROLLBACK commands
  32. is executed?
  33. The commit or rollback can be seen as a subprocedure call.
  34. The problem is that if there are several query threads
  35. currently running within the transaction, their action could
  36. mess the commit or rollback operation. Or, at the least, the
  37. operation would be difficult to visualize and keep in control.
  38. Therefore the query thread requesting a commit or a rollback
  39. sends to the transaction a signal, which moves the transaction
  40. to TRX_QUE_SIGNALED state. All running query threads of the
  41. transaction will eventually notice that the transaction is now in
  42. this state and voluntarily suspend themselves. Only the last
  43. query thread which suspends itself will trigger handling of
  44. the signal.
  45. When the transaction starts to handle a rollback or commit
  46. signal, it builds a query graph which, when executed, will
  47. roll back or commit the incomplete transaction. The transaction
  48. is moved to the TRX_QUE_ROLLING_BACK or TRX_QUE_COMMITTING state.
  49. If specified, the SQL cursors opened by the transaction are closed.
  50. When the execution of the graph completes, it is like returning
  51. from a subprocedure: the query thread which requested the operation
  52. starts running again. */
  53. /**************************************************************************
  54. Moves a thread from another state to the QUE_THR_RUNNING state. Increments
  55. the n_active_thrs counters of the query graph and transaction.
  56. ***NOTE***: This is the only function in which such a transition is allowed
  57. to happen! */
  58. static
  59. void
  60. que_thr_move_to_run_state(
  61. /*======================*/
  62. que_thr_t* thr); /* in: an query thread */
  63. /**************************************************************************
  64. Tries to parallelize query if it is not parallel enough yet. */
  65. static
  66. que_thr_t*
  67. que_try_parallelize(
  68. /*================*/
  69. /* out: next thread to execute */
  70. que_thr_t* thr); /* in: query thread */
  71. #ifdef notdefined
  72. /********************************************************************
  73. Adds info about the number of inserted rows etc. to the message to the
  74. client. */
  75. static
  76. void
  77. que_thr_add_update_info(
  78. /*====================*/
  79. que_thr_t* thr) /* in: query thread */
  80. {
  81. que_fork_t* graph;
  82. graph = thr->graph;
  83. mach_write_to_8(thr->msg_buf + SESS_SRV_MSG_N_INSERTS,
  84. graph->n_inserts);
  85. mach_write_to_8(thr->msg_buf + SESS_SRV_MSG_N_UPDATES,
  86. graph->n_updates);
  87. mach_write_to_8(thr->msg_buf + SESS_SRV_MSG_N_DELETES,
  88. graph->n_deletes);
  89. }
  90. #endif
  91. /***************************************************************************
  92. Adds a query graph to the session's list of graphs. */
  93. void
  94. que_graph_publish(
  95. /*==============*/
  96. que_t* graph, /* in: graph */
  97. sess_t* sess) /* in: session */
  98. {
  99. ut_ad(mutex_own(&kernel_mutex));
  100. UT_LIST_ADD_LAST(graphs, sess->graphs, graph);
  101. }
  102. /***************************************************************************
  103. Creates a query graph fork node. */
  104. que_fork_t*
  105. que_fork_create(
  106. /*============*/
  107. /* out, own: fork node */
  108. que_t* graph, /* in: graph, if NULL then this
  109. fork node is assumed to be the
  110. graph root */
  111. que_node_t* parent, /* in: parent node */
  112. ulint fork_type, /* in: fork type */
  113. mem_heap_t* heap) /* in: memory heap where created */
  114. {
  115. que_fork_t* fork;
  116. ut_ad(heap);
  117. fork = mem_heap_alloc(heap, sizeof(que_fork_t));
  118. fork->common.type = QUE_NODE_FORK;
  119. fork->n_active_thrs = 0;
  120. fork->state = QUE_FORK_COMMAND_WAIT;
  121. if (graph != NULL) {
  122. fork->graph = graph;
  123. } else {
  124. fork->graph = fork;
  125. }
  126. fork->common.parent = parent;
  127. fork->fork_type = fork_type;
  128. fork->caller = NULL;
  129. UT_LIST_INIT(fork->thrs);
  130. fork->sym_tab = NULL;
  131. fork->heap = heap;
  132. return(fork);
  133. }
  134. /***************************************************************************
  135. Creates a query graph thread node. */
  136. que_thr_t*
  137. que_thr_create(
  138. /*===========*/
  139. /* out, own: query thread node */
  140. que_fork_t* parent, /* in: parent node, i.e., a fork node */
  141. mem_heap_t* heap) /* in: memory heap where created */
  142. {
  143. que_thr_t* thr;
  144. ut_ad(parent && heap);
  145. thr = mem_heap_alloc(heap, sizeof(que_thr_t));
  146. thr->common.type = QUE_NODE_THR;
  147. thr->common.parent = parent;
  148. thr->graph = parent->graph;
  149. thr->state = QUE_THR_COMMAND_WAIT;
  150. thr->is_active = FALSE;
  151. thr->run_node = NULL;
  152. thr->resource = 0;
  153. UT_LIST_ADD_LAST(thrs, parent->thrs, thr);
  154. return(thr);
  155. }
  156. /**************************************************************************
  157. Moves a suspended query thread to the QUE_THR_RUNNING state and may release
  158. a single worker thread to execute it. This function should be used to end
  159. the wait state of a query thread waiting for a lock or a stored procedure
  160. completion. */
  161. void
  162. que_thr_end_wait(
  163. /*=============*/
  164. que_thr_t* thr, /* in: query thread in the
  165. QUE_THR_LOCK_WAIT,
  166. or QUE_THR_PROCEDURE_WAIT, or
  167. QUE_THR_SIG_REPLY_WAIT state */
  168. que_thr_t** next_thr) /* in/out: next query thread to run;
  169. if the value which is passed in is
  170. a pointer to a NULL pointer, then the
  171. calling function can start running
  172. a new query thread; if NULL is passed
  173. as the parameter, it is ignored */
  174. {
  175. ibool was_active;
  176. ut_ad(mutex_own(&kernel_mutex));
  177. ut_ad(thr);
  178. ut_ad((thr->state == QUE_THR_LOCK_WAIT)
  179.       || (thr->state == QUE_THR_PROCEDURE_WAIT)
  180.       || (thr->state == QUE_THR_SIG_REPLY_WAIT));
  181. ut_ad(thr->run_node);
  182. thr->prev_node = thr->run_node;
  183. was_active = thr->is_active;
  184. que_thr_move_to_run_state(thr);
  185. if (was_active) {
  186. return;
  187. }
  188. if (next_thr && *next_thr == NULL) {
  189. *next_thr = thr;
  190. } else {
  191. srv_que_task_enqueue_low(thr);
  192. }
  193. }
  194. /**************************************************************************
  195. Same as que_thr_end_wait, but no parameter next_thr available. */
  196. void
  197. que_thr_end_wait_no_next_thr(
  198. /*=========================*/
  199. que_thr_t* thr) /* in: query thread in the QUE_THR_LOCK_WAIT,
  200. or QUE_THR_PROCEDURE_WAIT, or
  201. QUE_THR_SIG_REPLY_WAIT state */
  202. {
  203. ibool was_active;
  204. ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the
  205. only possible state here */
  206. ut_ad(mutex_own(&kernel_mutex));
  207. ut_ad(thr);
  208. ut_ad((thr->state == QUE_THR_LOCK_WAIT)
  209.       || (thr->state == QUE_THR_PROCEDURE_WAIT)
  210.       || (thr->state == QUE_THR_SIG_REPLY_WAIT));
  211. was_active = thr->is_active;
  212. que_thr_move_to_run_state(thr);
  213. if (was_active) {
  214. return;
  215. }
  216. /* In MySQL we let the OS thread (not just the query thread) to wait
  217. for the lock to be released: */
  218. srv_release_mysql_thread_if_suspended(thr);
  219. /* srv_que_task_enqueue_low(thr); */
  220. }
  221. /**************************************************************************
  222. Inits a query thread for a command. */
  223. UNIV_INLINE
  224. void
  225. que_thr_init_command(
  226. /*=================*/
  227. que_thr_t* thr) /* in: query thread */
  228. {
  229. thr->run_node = thr;
  230. thr->prev_node = thr->common.parent;
  231. que_thr_move_to_run_state(thr);
  232. }
  233. /**************************************************************************
  234. Starts execution of a command in a query fork. Picks a query thread which
  235. is not in the QUE_THR_RUNNING state and moves it to that state. If none
  236. can be chosen, a situation which may arise in parallelized fetches, NULL
  237. is returned. */
  238. que_thr_t*
  239. que_fork_start_command(
  240. /*===================*/
  241. /* out: a query thread of the graph moved to
  242. QUE_THR_RUNNING state, or NULL; the query
  243. thread should be executed by que_run_threads
  244. by the caller */
  245. que_fork_t*  fork, /* in: a query fork */
  246. ulint command,/* in: command SESS_COMM_FETCH_NEXT, ... */
  247. ulint param) /* in: possible parameter to the command */
  248. {
  249. que_thr_t* thr;
  250. /* Set the command parameters in the fork root */
  251. fork->command = command;
  252. fork->param = param;
  253. fork->state = QUE_FORK_ACTIVE;
  254. fork->last_sel_node = NULL;
  255. /* Choose the query thread to run: usually there is just one thread,
  256. but in a parallelized select, which necessarily is non-scrollable,
  257. there may be several to choose from */
  258. /*---------------------------------------------------------------
  259. First we try to find a query thread in the QUE_THR_COMMAND_WAIT state */
  260. thr = UT_LIST_GET_FIRST(fork->thrs);
  261. while (thr != NULL) {
  262. if (thr->state == QUE_THR_COMMAND_WAIT) {
  263. /* We have to send the initial message to query thread
  264. to start it */
  265. que_thr_init_command(thr);
  266. return(thr);
  267. }
  268. ut_ad(thr->state != QUE_THR_LOCK_WAIT);
  269. thr = UT_LIST_GET_NEXT(thrs, thr);
  270. }
  271. /*----------------------------------------------------------------
  272. Then we try to find a query thread in the QUE_THR_SUSPENDED state */
  273. thr = UT_LIST_GET_FIRST(fork->thrs);
  274. while (thr != NULL) {
  275. if (thr->state == QUE_THR_SUSPENDED) {
  276. /* In this case the execution of the thread was
  277. suspended: no initial message is needed because
  278. execution can continue from where it was left */
  279. que_thr_move_to_run_state(thr);
  280. return(thr);
  281. }
  282. thr = UT_LIST_GET_NEXT(thrs, thr);
  283. }
  284. /*-----------------------------------------------------------------
  285. Then we try to find a query thread in the QUE_THR_COMPLETED state */
  286. thr = UT_LIST_GET_FIRST(fork->thrs);
  287. while (thr != NULL) {
  288. if (thr->state == QUE_THR_COMPLETED) {
  289. que_thr_init_command(thr);
  290. return(thr);
  291. }
  292. thr = UT_LIST_GET_NEXT(thrs, thr);
  293. }
  294. /* Else we return NULL */
  295. return(NULL);
  296. }
  297. /**************************************************************************
  298. After signal handling is finished, returns control to a query graph error
  299. handling routine. (Currently, just returns the control to the root of the
  300. graph so that the graph can communicate an error message to the client.) */
  301. void
  302. que_fork_error_handle(
  303. /*==================*/
  304. trx_t* trx, /* in: trx */
  305. que_t* fork) /* in: query graph which was run before signal
  306. handling started, NULL not allowed */
  307. {
  308. que_thr_t* thr;
  309. ut_ad(mutex_own(&kernel_mutex));
  310. ut_ad(trx->sess->state == SESS_ERROR);
  311. ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
  312. ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
  313. thr = UT_LIST_GET_FIRST(fork->thrs);
  314. while (thr != NULL) {
  315. ut_ad(!thr->is_active);
  316. ut_ad(thr->state != QUE_THR_SIG_REPLY_WAIT);
  317. ut_ad(thr->state != QUE_THR_LOCK_WAIT);
  318. thr->run_node = thr;
  319. thr->prev_node = thr->child;
  320. thr->state = QUE_THR_COMPLETED;
  321. thr = UT_LIST_GET_NEXT(thrs, thr);
  322. }
  323. thr = UT_LIST_GET_FIRST(fork->thrs);
  324. que_thr_move_to_run_state(thr);
  325. srv_que_task_enqueue_low(thr);
  326. }
  327. /********************************************************************
  328. Tests if all the query threads in the same fork have a given state. */
  329. UNIV_INLINE
  330. ibool
  331. que_fork_all_thrs_in_state(
  332. /*=======================*/
  333. /* out: TRUE if all the query threads in the
  334. same fork were in the given state */
  335. que_fork_t* fork, /* in: query fork */
  336. ulint state) /* in: state */
  337. {
  338. que_thr_t* thr_node;
  339. thr_node = UT_LIST_GET_FIRST(fork->thrs);
  340. while (thr_node != NULL) {
  341. if (thr_node->state != state) {
  342. return(FALSE);
  343. }
  344. thr_node = UT_LIST_GET_NEXT(thrs, thr_node);
  345. }
  346. return(TRUE);
  347. }
  348. /**************************************************************************
  349. Calls que_graph_free_recursive for statements in a statement list. */
  350. static
  351. void
  352. que_graph_free_stat_list(
  353. /*=====================*/
  354. que_node_t* node) /* in: first query graph node in the list */
  355. {
  356. while (node) {
  357. que_graph_free_recursive(node);
  358. node = que_node_get_next(node);
  359. }
  360. }
  361. /**************************************************************************
  362. Frees a query graph, but not the heap where it was created. Does not free
  363. explicit cursor declarations, they are freed in que_graph_free. */
  364. void
  365. que_graph_free_recursive(
  366. /*=====================*/
  367. que_node_t* node) /* in: query graph node */
  368. {
  369. que_fork_t* fork;
  370. que_thr_t* thr;
  371. undo_node_t* undo;
  372. sel_node_t* sel;
  373. ins_node_t* ins;
  374. upd_node_t* upd;
  375. tab_node_t* cre_tab;
  376. ind_node_t* cre_ind;
  377. if (node == NULL) {
  378. return;
  379. }
  380. switch (que_node_get_type(node)) {
  381. case QUE_NODE_FORK:
  382. fork = node;
  383. thr = UT_LIST_GET_FIRST(fork->thrs);
  384. while (thr) {
  385. que_graph_free_recursive(thr);
  386. thr = UT_LIST_GET_NEXT(thrs, thr);
  387. }
  388. break;
  389. case QUE_NODE_THR:
  390. thr = node;
  391. que_graph_free_recursive(thr->child);
  392. break;
  393. case QUE_NODE_UNDO:
  394. undo = node;
  395. mem_heap_free(undo->heap);
  396. break;
  397. case QUE_NODE_SELECT:
  398. sel = node;
  399. sel_node_free_private(sel);
  400. break;
  401. case QUE_NODE_INSERT:
  402. ins = node;
  403. que_graph_free_recursive(ins->select);
  404. mem_heap_free(ins->entry_sys_heap);
  405. break;
  406. case QUE_NODE_UPDATE:
  407. upd = node;
  408. if (upd->in_mysql_interface) {
  409. btr_pcur_free_for_mysql(upd->pcur);
  410. }
  411. que_graph_free_recursive(upd->select);
  412. mem_heap_free(upd->heap);
  413. break;
  414. case QUE_NODE_CREATE_TABLE:
  415. cre_tab = node;
  416. que_graph_free_recursive(cre_tab->tab_def);
  417. que_graph_free_recursive(cre_tab->col_def);
  418. que_graph_free_recursive(cre_tab->commit_node);
  419. mem_heap_free(cre_tab->heap);
  420. break;
  421. case QUE_NODE_CREATE_INDEX:
  422. cre_ind = node;
  423. que_graph_free_recursive(cre_ind->ind_def);
  424. que_graph_free_recursive(cre_ind->field_def);
  425. que_graph_free_recursive(cre_ind->commit_node);
  426. mem_heap_free(cre_ind->heap);
  427. break;
  428. case QUE_NODE_PROC:
  429. que_graph_free_stat_list(((proc_node_t*)node)->stat_list);
  430. break;
  431. case QUE_NODE_IF:
  432. que_graph_free_stat_list(((if_node_t*)node)->stat_list);
  433. que_graph_free_stat_list(((if_node_t*)node)->else_part);
  434. que_graph_free_stat_list(((if_node_t*)node)->elsif_list);
  435. break;
  436. case QUE_NODE_ELSIF:
  437. que_graph_free_stat_list(((elsif_node_t*)node)->stat_list);
  438. break;
  439. case QUE_NODE_WHILE:
  440. que_graph_free_stat_list(((while_node_t*)node)->stat_list);
  441. break;
  442. case QUE_NODE_FOR:
  443. que_graph_free_stat_list(((for_node_t*)node)->stat_list);
  444. break;
  445. case QUE_NODE_ASSIGNMENT:
  446. case QUE_NODE_RETURN:
  447. case QUE_NODE_COMMIT:
  448. case QUE_NODE_ROLLBACK:
  449. case QUE_NODE_LOCK:
  450. case QUE_NODE_FUNC:
  451. case QUE_NODE_ORDER:
  452. case QUE_NODE_ROW_PRINTF:
  453. case QUE_NODE_OPEN:
  454. case QUE_NODE_FETCH:
  455. /* No need to do anything */
  456. break;
  457. default:
  458. ut_a(0);
  459. }
  460. }
  461. /**************************************************************************
  462. Frees a query graph. */
  463. void
  464. que_graph_free(
  465. /*===========*/
  466. que_t* graph) /* in: query graph; we assume that the memory
  467. heap where this graph was created is private
  468. to this graph: if not, then use
  469. que_graph_free_recursive and free the heap
  470. afterwards! */
  471. {
  472. ut_ad(graph);
  473. if (graph->sym_tab) {
  474. /* The following call frees dynamic memory allocated
  475. for variables etc. during execution. Frees also explicit
  476. cursor definitions. */
  477. sym_tab_free_private(graph->sym_tab);
  478. }
  479. que_graph_free_recursive(graph);
  480. mem_heap_free(graph->heap);
  481. }
  482. /**************************************************************************
  483. Checks if the query graph is in a state where it should be freed, and
  484. frees it in that case. If the session is in a state where it should be
  485. closed, also this is done. */
  486. ibool
  487. que_graph_try_free(
  488. /*===============*/
  489. /* out: TRUE if freed */
  490. que_t* graph) /* in: query graph */
  491. {
  492. sess_t* sess;
  493. ut_ad(mutex_own(&kernel_mutex));
  494. sess = (graph->trx)->sess;
  495. if ((graph->state == QUE_FORK_BEING_FREED)
  496. && (graph->n_active_thrs == 0)) {
  497. UT_LIST_REMOVE(graphs, sess->graphs, graph);
  498. que_graph_free(graph);
  499. sess_try_close(sess);
  500. return(TRUE);
  501. }
  502. return(FALSE);
  503. }
  504. /**************************************************************************
  505. Handles an SQL error noticed during query thread execution. Currently,
  506. does nothing! */
  507. void
  508. que_thr_handle_error(
  509. /*=================*/
  510. que_thr_t* thr, /* in: query thread */
  511. ulint err_no, /* in: error number */
  512. byte* err_str,/* in, own: error string or NULL; NOTE: the
  513. function will take care of freeing of the
  514. string! */
  515. ulint err_len)/* in: error string length */
  516. {
  517. UT_NOT_USED(thr);
  518. UT_NOT_USED(err_no);
  519. UT_NOT_USED(err_str);
  520. UT_NOT_USED(err_len);
  521. /* Does nothing */
  522. }
  523. /**************************************************************************
  524. Tries to parallelize query if it is not parallel enough yet. */
  525. static
  526. que_thr_t*
  527. que_try_parallelize(
  528. /*================*/
  529. /* out: next thread to execute */
  530. que_thr_t* thr) /* in: query thread */
  531. {
  532. ut_ad(thr);
  533. /* Does nothing yet */
  534. return(thr);
  535. }
  536. /********************************************************************
  537. Builds a command completed-message to the client. */
  538. static
  539. ulint
  540. que_build_srv_msg(
  541. /*==============*/
  542. /* out: message data length */
  543. byte* buf, /* in: message buffer */
  544. que_fork_t* fork, /* in: query graph where execution completed */
  545. sess_t* sess) /* in: session */
  546. {
  547. ulint len;
  548. /* Currently, we only support stored procedures: */
  549. ut_ad(fork->fork_type == QUE_FORK_PROCEDURE);
  550. if (sess->state == SESS_ERROR) {
  551. return(0);
  552. }
  553.    sess_srv_msg_init(sess, buf, SESS_SRV_SUCCESS);
  554. len = pars_proc_write_output_params_to_buf(buf + SESS_SRV_MSG_DATA,
  555. fork);
  556. return(len);
  557. }
  558. /********************************************************************
  559. Performs an execution step on a thr node. */
  560. static
  561. que_thr_t*
  562. que_thr_node_step(
  563. /*==============*/
  564. /* out: query thread to run next, or NULL
  565. if none */
  566. que_thr_t* thr) /* in: query thread where run_node must
  567. be the thread node itself */
  568. {
  569. ut_ad(thr->run_node == thr);
  570. if (thr->prev_node == thr->common.parent) {
  571. /* If control to the node came from above, it is just passed
  572. on */
  573. thr->run_node = thr->child;
  574. return(thr);
  575. }
  576. mutex_enter(&kernel_mutex);
  577. if (que_thr_peek_stop(thr)) {
  578. mutex_exit(&kernel_mutex);
  579. return(thr);
  580. }
  581. /* Thread execution completed */
  582. thr->state = QUE_THR_COMPLETED;
  583. mutex_exit(&kernel_mutex);
  584. return(NULL);
  585. }
  586. /**************************************************************************
  587. Moves a thread from another state to the QUE_THR_RUNNING state. Increments
  588. the n_active_thrs counters of the query graph and transaction if thr was
  589. not active.
  590. ***NOTE***: This and ..._mysql are  the only functions in which such a
  591. transition is allowed to happen! */
  592. static
  593. void
  594. que_thr_move_to_run_state(
  595. /*======================*/
  596. que_thr_t* thr) /* in: an query thread */
  597. {
  598. trx_t* trx;
  599. ut_ad(thr->state != QUE_THR_RUNNING);
  600. trx = thr_get_trx(thr);
  601. if (!thr->is_active) {
  602. (thr->graph)->n_active_thrs++;
  603. trx->n_active_thrs++;
  604. thr->is_active = TRUE;
  605. ut_ad((thr->graph)->n_active_thrs == 1);
  606. ut_ad(trx->n_active_thrs == 1);
  607. }
  608. thr->state = QUE_THR_RUNNING;
  609. }
  610. /**************************************************************************
  611. Decrements the query thread reference counts in the query graph and the
  612. transaction. May start signal handling, e.g., a rollback.
  613. *** NOTE ***:
  614. This and que_thr_stop_for_mysql are
  615. the only functions where the reference count can be decremented and
  616. this function may only be called from inside que_run_threads or
  617. que_thr_check_if_switch! These restrictions exist to make the rollback code
  618. easier to maintain. */
  619. static
  620. void
  621. que_thr_dec_refer_count(
  622. /*====================*/
  623. que_thr_t* thr, /* in: query thread */
  624. que_thr_t** next_thr) /* in/out: next query thread to run;
  625. if the value which is passed in is
  626. a pointer to a NULL pointer, then the
  627. calling function can start running
  628. a new query thread */ 
  629. {
  630. que_fork_t* fork;
  631. trx_t* trx;
  632. sess_t* sess;
  633. ibool send_srv_msg = FALSE;
  634. ibool release_stored_proc = FALSE;
  635. ulint msg_len;
  636. byte msg_buf[ODBC_DATAGRAM_SIZE];
  637. ulint fork_type;
  638. ibool stopped;
  639. fork = thr->common.parent;
  640. trx = thr->graph->trx;
  641. sess = trx->sess;
  642. mutex_enter(&kernel_mutex);
  643. ut_a(thr->is_active);
  644. if (thr->state == QUE_THR_RUNNING) {
  645. stopped = que_thr_stop(thr);
  646. if (!stopped) {
  647. /* The reason for the thr suspension or wait was
  648. already canceled before we came here: continue
  649. running the thread */
  650. /* printf(
  651. "!!!!!!!!!! Wait already ended: continue thrn"); */
  652. if (next_thr && *next_thr == NULL) {
  653. *next_thr = thr;
  654. } else {
  655. srv_que_task_enqueue_low(thr);
  656. }
  657. mutex_exit(&kernel_mutex);
  658. return;
  659. }
  660. }
  661. ut_ad(fork->n_active_thrs == 1);
  662. ut_ad(trx->n_active_thrs == 1);
  663. fork->n_active_thrs--;
  664. trx->n_active_thrs--;
  665. thr->is_active = FALSE;
  666. if (trx->n_active_thrs > 0) {
  667. mutex_exit(&kernel_mutex);
  668. return;
  669. }
  670. fork_type = fork->fork_type;
  671. /* Check if all query threads in the same fork are completed */
  672. if (que_fork_all_thrs_in_state(fork, QUE_THR_COMPLETED)) {
  673. if (fork_type == QUE_FORK_ROLLBACK) {
  674. /* This is really the undo graph used in rollback,
  675. no roll_node in this graph */
  676. ut_ad(UT_LIST_GET_LEN(trx->signals) > 0);
  677. ut_ad(trx->handling_signals == TRUE);
  678. trx_finish_rollback_off_kernel(fork, trx, next_thr);
  679. } else if (fork_type == QUE_FORK_PURGE) {
  680. /* Do nothing */
  681. } else if (fork_type == QUE_FORK_RECOVERY) {
  682. /* Do nothing */
  683. } else if (fork_type == QUE_FORK_MYSQL_INTERFACE) {
  684. /* Do nothing */
  685. } else if (fork->common.parent == NULL
  686. && fork->caller == NULL
  687.   && UT_LIST_GET_LEN(trx->signals) == 0) {
  688. ut_a(0); /* not used in MySQL */
  689. /* Reply to the client */ 
  690. /* que_thr_add_update_info(thr); */
  691. fork->state = QUE_FORK_COMMAND_WAIT;
  692. msg_len = que_build_srv_msg(msg_buf, fork, sess);
  693. send_srv_msg = TRUE;
  694. if (fork->fork_type == QUE_FORK_PROCEDURE) {
  695. release_stored_proc = TRUE;
  696. }
  697. ut_ad(trx->graph == fork);
  698. trx->graph = NULL;
  699. } else {
  700. /* Subprocedure calls not implemented yet */
  701. ut_a(0);
  702. }
  703. }
  704. if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) {
  705. ut_ad(!send_srv_msg);
  706.      /* If the trx is signaled and its query thread count drops to
  707. zero, then we start processing a signal; from it we may get
  708. a new query thread to run */
  709. trx_sig_start_handle(trx, next_thr);
  710. }
  711. if (trx->handling_signals && UT_LIST_GET_LEN(trx->signals) == 0) {
  712. trx_end_signal_handling(trx);
  713. }
  714. mutex_exit(&kernel_mutex);
  715. if (send_srv_msg) {
  716. /* Note that, as we do not own the kernel mutex at this point,
  717. and neither do we own it all the time when doing the actual
  718. communication operation within the next function, it is
  719. possible that the messages will not get delivered in the right
  720. sequential order. This is possible if the client communicates
  721. an extra message to the server while the message below is still
  722. undelivered. But then the client should notice that there
  723. is an error in the order numbers of the messages. */
  724. sess_command_completed_message(sess, msg_buf, msg_len);
  725. }
  726. if (release_stored_proc) {
  727. /* Return the stored procedure graph to the dictionary cache */
  728. dict_procedure_release_parsed_copy(fork);
  729. }
  730. }
  731. /**************************************************************************
  732. Stops a query thread if graph or trx is in a state requiring it. The
  733. conditions are tested in the order (1) graph, (2) trx. The kernel mutex has
  734. to be reserved. */
  735. ibool
  736. que_thr_stop(
  737. /*=========*/
  738. /* out: TRUE if stopped */
  739. que_thr_t* thr) /* in: query thread */
  740. {
  741. trx_t* trx;
  742. que_t* graph;
  743. ibool ret = TRUE;
  744. ut_ad(mutex_own(&kernel_mutex));
  745. graph = thr->graph;
  746. trx = graph->trx;
  747. if (graph->state == QUE_FORK_COMMAND_WAIT) {
  748. thr->state = QUE_THR_SUSPENDED;
  749. } else if (trx->que_state == TRX_QUE_LOCK_WAIT) {
  750. UT_LIST_ADD_FIRST(trx_thrs, trx->wait_thrs, thr);
  751. thr->state = QUE_THR_LOCK_WAIT;
  752. } else if (trx->error_state != DB_SUCCESS
  753. && trx->error_state != DB_LOCK_WAIT) {
  754. /* Error handling built for the MySQL interface */
  755. thr->state = QUE_THR_COMPLETED;
  756. } else if (UT_LIST_GET_LEN(trx->signals) > 0
  757. && graph->fork_type != QUE_FORK_ROLLBACK) {
  758. thr->state = QUE_THR_SUSPENDED;
  759. } else {
  760. ut_ad(graph->state == QUE_FORK_ACTIVE);
  761. ret = FALSE;
  762. }         
  763. return(ret);
  764. }
  765. /**************************************************************************
  766. A patch for MySQL used to 'stop' a dummy query thread used in MySQL. */
  767. void
  768. que_thr_stop_for_mysql(
  769. /*===================*/
  770. que_thr_t* thr) /* in: query thread */
  771. {
  772. ibool stopped  = FALSE;
  773. trx_t* trx;
  774. trx = thr_get_trx(thr);
  775. mutex_enter(&kernel_mutex);
  776. if (thr->state == QUE_THR_RUNNING) {
  777. if (trx->error_state != DB_SUCCESS
  778.     && trx->error_state != DB_LOCK_WAIT) {
  779. /* Error handling built for the MySQL interface */
  780. thr->state = QUE_THR_COMPLETED;
  781. stopped = TRUE;
  782. }
  783. if (!stopped) {
  784. /* It must have been a lock wait but the
  785. lock was already released */
  786. mutex_exit(&kernel_mutex);
  787. return;
  788. }
  789. }
  790. thr->is_active = FALSE;
  791. (thr->graph)->n_active_thrs--;
  792. trx->n_active_thrs--;
  793. mutex_exit(&kernel_mutex);
  794. }
  795. /**************************************************************************
  796. Prints info of an SQL query graph node. */
  797. void
  798. que_node_print_info(
  799. /*================*/
  800. que_node_t* node) /* in: query graph node */
  801. {
  802. ulint type;
  803. char* str;
  804. ulint addr;
  805. type = que_node_get_type(node);
  806. addr = (ulint)node;
  807. if (type == QUE_NODE_SELECT) {
  808. str = "SELECT";
  809. } else if (type == QUE_NODE_INSERT) {
  810. str = "INSERT";
  811. } else if (type == QUE_NODE_UPDATE) {
  812. str = "UPDATE";
  813. } else if (type == QUE_NODE_WHILE) {
  814. str = "WHILE";
  815. } else if (type == QUE_NODE_ASSIGNMENT) {
  816. str = "ASSIGNMENT";
  817. } else if (type == QUE_NODE_IF) {
  818. str = "IF";
  819. } else if (type == QUE_NODE_FETCH) {
  820. str = "FETCH";
  821. } else if (type == QUE_NODE_OPEN) {
  822. str = "OPEN";
  823. } else if (type == QUE_NODE_PROC) {
  824. str = "STORED PROCEDURE";
  825. } else if (type == QUE_NODE_FUNC) {
  826. str = "FUNCTION";
  827. } else if (type == QUE_NODE_LOCK) {
  828. str = "LOCK";
  829. } else if (type == QUE_NODE_THR) {
  830. str = "QUERY THREAD";
  831. } else if (type == QUE_NODE_COMMIT) {
  832. str = "COMMIT";
  833. } else if (type == QUE_NODE_UNDO) {
  834. str = "UNDO ROW";
  835. } else if (type == QUE_NODE_PURGE) {
  836. str = "PURGE ROW";
  837. } else if (type == QUE_NODE_ROLLBACK) {
  838. str = "ROLLBACK";
  839. } else if (type == QUE_NODE_CREATE_TABLE) {
  840. str = "CREATE TABLE";
  841. } else if (type == QUE_NODE_CREATE_INDEX) {
  842. str = "CREATE INDEX";
  843. } else if (type == QUE_NODE_FOR) {
  844. str = "FOR LOOP";
  845. } else if (type == QUE_NODE_RETURN) {
  846. str = "RETURN";
  847. } else {
  848. str = "UNKNOWN NODE TYPE";
  849. }
  850. printf("Node type %lu: %s, address %lxn", type, str, addr);
  851. }
  852. /**************************************************************************
  853. Performs an execution step on a query thread. */
  854. UNIV_INLINE
  855. que_thr_t*
  856. que_thr_step(
  857. /*=========*/
  858. /* out: query thread to run next: it may
  859. differ from the input parameter if, e.g., a
  860. subprocedure call is made */ 
  861. que_thr_t* thr) /* in: query thread */
  862. {
  863. que_node_t* node;
  864. que_thr_t* old_thr;
  865. trx_t* trx;
  866. ulint type;
  867. ut_ad(thr->state == QUE_THR_RUNNING);
  868. thr->resource++;
  869. type = que_node_get_type(thr->run_node);
  870. node = thr->run_node;
  871. old_thr = thr;
  872. #ifdef UNIV_DEBUG
  873. if (que_trace_on) {
  874. printf("To execute: ");
  875. que_node_print_info(node);
  876. }
  877. #endif
  878. if (type & QUE_NODE_CONTROL_STAT) {
  879. if ((thr->prev_node != que_node_get_parent(node))
  880. && que_node_get_next(thr->prev_node)) {
  881. /* The control statements, like WHILE, always pass the
  882. control to the next child statement if there is any
  883. child left */
  884. thr->run_node = que_node_get_next(thr->prev_node);
  885. } else if (type == QUE_NODE_IF) {
  886. if_step(thr);
  887. } else if (type == QUE_NODE_FOR) {
  888. for_step(thr);
  889. } else if (type == QUE_NODE_PROC) {
  890. /* We can access trx->undo_no without reserving
  891. trx->undo_mutex, because there cannot be active query
  892. threads doing updating or inserting at the moment! */
  893. if (thr->prev_node == que_node_get_parent(node)) {
  894. trx = thr_get_trx(thr);
  895. trx->last_sql_stat_start.least_undo_no
  896. = trx->undo_no;
  897. }
  898. proc_step(thr);
  899. } else if (type == QUE_NODE_WHILE) {
  900. while_step(thr);
  901. }
  902. } else if (type == QUE_NODE_ASSIGNMENT) {
  903. assign_step(thr);
  904. } else if (type == QUE_NODE_SELECT) {
  905. thr = row_sel_step(thr);
  906. } else if (type == QUE_NODE_INSERT) {
  907. thr = row_ins_step(thr);
  908. } else if (type == QUE_NODE_UPDATE) {
  909. thr = row_upd_step(thr);
  910. } else if (type == QUE_NODE_FETCH) {
  911. thr = fetch_step(thr);
  912. } else if (type == QUE_NODE_OPEN) {
  913. thr = open_step(thr);
  914. } else if (type == QUE_NODE_FUNC) {
  915. proc_eval_step(thr);
  916. } else if (type == QUE_NODE_LOCK) {
  917. ut_error;
  918. /*
  919. thr = que_lock_step(thr);
  920. */
  921. } else if (type == QUE_NODE_THR) {
  922. thr = que_thr_node_step(thr);
  923. } else if (type == QUE_NODE_COMMIT) {
  924. thr = trx_commit_step(thr);
  925. } else if (type == QUE_NODE_UNDO) {
  926. thr = row_undo_step(thr);
  927. } else if (type == QUE_NODE_PURGE) {
  928. thr = row_purge_step(thr);
  929. } else if (type == QUE_NODE_RETURN) {
  930. thr = return_step(thr);
  931. } else if (type == QUE_NODE_ROLLBACK) {
  932. thr = trx_rollback_step(thr);
  933. } else if (type == QUE_NODE_CREATE_TABLE) {
  934. thr = dict_create_table_step(thr);
  935. } else if (type == QUE_NODE_CREATE_INDEX) {
  936. thr = dict_create_index_step(thr);
  937. } else if (type == QUE_NODE_ROW_PRINTF) {
  938. thr = row_printf_step(thr);
  939. } else {
  940. ut_error;
  941. }
  942. old_thr->prev_node = node;
  943. return(thr);
  944. }
  945. /***********************************************************************
  946. Checks if there is a need for a query thread switch or stopping the current
  947. thread. */
  948. que_thr_t*
  949. que_thr_check_if_switch(
  950. /*====================*/
  951. que_thr_t* thr, /* in: current query thread */
  952. ulint* cumul_resource) /* in: amount of resources used
  953. by the current call of que_run_threads
  954. (resources used by the OS thread!) */
  955. {
  956. que_thr_t* next_thr;
  957. ibool stopped;
  958. if (que_thr_peek_stop(thr)) {
  959. mutex_enter(&kernel_mutex);
  960. stopped = que_thr_stop(thr);
  961. mutex_exit(&kernel_mutex);
  962. if (stopped) {
  963. /* If a signal is processed, we may get a new query
  964. thread next_thr to run */
  965. next_thr = NULL;
  966. que_thr_dec_refer_count(thr, &next_thr);
  967. if (next_thr == NULL) {
  968. return(NULL);
  969. }
  970. thr = next_thr;
  971. }
  972. }
  973. if (thr->resource > QUE_PARALLELIZE_LIMIT) { 
  974. /* Try parallelization of the query thread */
  975. thr = que_try_parallelize(thr);
  976. thr->resource = 0;
  977. }
  978. (*cumul_resource)++;
  979. if (*cumul_resource > QUE_ROUND_ROBIN_LIMIT) {
  980. /* It is time to round-robin query threads in the
  981. server task queue */
  982. if (srv_get_thread_type() == SRV_COM) {
  983. /* This OS thread is a SRV_COM thread: we put
  984. the query thread to the task queue and return
  985. to allow the OS thread to receive more
  986. messages from clients */
  987. ut_ad(thr->is_active);
  988.     
  989. srv_que_task_enqueue(thr);
  990. return(NULL);
  991. } else {
  992. /* Change the query thread if there is another
  993. in the server task queue */
  994. thr = srv_que_round_robin(thr);
  995. }
  996. *cumul_resource = 0;
  997. }
  998. return(thr);
  999. }
  1000. /**************************************************************************
  1001. Runs query threads. Note that the individual query thread which is run
  1002. within this function may change if, e.g., the OS thread executing this
  1003. function uses a threshold amount of resources. */
  1004. void
  1005. que_run_threads(
  1006. /*============*/
  1007. que_thr_t* thr) /* in: query thread which is run initially */
  1008. {
  1009. que_thr_t* next_thr;
  1010. ulint cumul_resource;
  1011. ulint loop_count;
  1012. ut_ad(thr->state == QUE_THR_RUNNING);
  1013. ut_ad(!mutex_own(&kernel_mutex));
  1014. /* cumul_resource counts how much resources the OS thread (NOT the
  1015. query thread) has spent in this function */
  1016. loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
  1017. cumul_resource = 0;
  1018. loop:
  1019. if (loop_count >= QUE_MAX_LOOPS_WITHOUT_CHECK) {
  1020. /* In MySQL this thread switch is never needed! 
  1021. loop_count = 0;
  1022. next_thr = que_thr_check_if_switch(thr, &cumul_resource);
  1023. if (next_thr != thr) {
  1024. if (next_thr == NULL) {
  1025. return;
  1026. }
  1027. loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
  1028. }
  1029. thr = next_thr;
  1030. */
  1031. }
  1032. /* Check that there is enough space in the log to accommodate
  1033. possible log entries by this query step; if the operation can touch
  1034. more than about 4 pages, checks must be made also within the query
  1035. step! */
  1036. log_free_check();
  1037. /* Perform the actual query step: note that the query thread
  1038. may change if, e.g., a subprocedure call is made */
  1039. /*-------------------------*/
  1040. next_thr = que_thr_step(thr);
  1041. /*-------------------------*/
  1042. /* Test the effect on performance of adding extra mutex
  1043. reservations */
  1044. /* if (srv_test_extra_mutexes) {
  1045. mutex_enter(&kernel_mutex);
  1046. mutex_exit(&kernel_mutex);
  1047. }
  1048. */
  1049. /* TRUE below denotes that the thread is allowed to own the dictionary
  1050. mutex, though */
  1051. ut_ad(sync_thread_levels_empty_gen(TRUE));
  1052. loop_count++;
  1053. if (next_thr != thr) {
  1054. que_thr_dec_refer_count(thr, &next_thr);
  1055. if (next_thr == NULL) {
  1056. return;
  1057. }
  1058. loop_count = QUE_MAX_LOOPS_WITHOUT_CHECK;
  1059. thr = next_thr;
  1060. }
  1061. goto loop;
  1062. }