usr0sess.c
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:27k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /******************************************************
  2. Sessions
  3. (c) 1996 Innobase Oy
  4. Created 6/25/1996 Heikki Tuuri
  5. *******************************************************/
  6. #include "usr0sess.h"
  7. #ifdef UNIV_NONINL
  8. #include "usr0sess.ic"
  9. #endif
  10. #include "ut0rnd.h"
  11. #include "mach0data.h"
  12. #include "ha0ha.h"
  13. #include "trx0trx.h"
  14. #include "que0que.h"
  15. #include "pars0pars.h"
  16. #include "pars0sym.h"
  17. #include "dict0dict.h"
  18. #include "dict0mem.h"
  19. #include "odbc0odbc.h"
  20. #define SESS_ERR_BUF_SIZE 8192
  21. /* The session system global data structure */
  22. sess_sys_t* sess_sys = NULL;
  23. /*************************************************************************
  24. Communicates an error message to the client. If sess->client_waits is not
  25. TRUE, puts the session to error state and does not try to send the error
  26. message. */
  27. static
  28. void
  29. sess_srv_msg_send_error(
  30. /*====================*/
  31. sess_t* sess); /* in: session object */
  32. /*************************************************************************
  33. Copies error info to a session. Sends to the transaction a signal which will
  34. rollback the latest incomplete SQL statement and then send the error message
  35. to the client. NOTE: This function will take care of the freeing of the error
  36. string, thus the caller must supply a copy of the error string. */
  37. static
  38. void
  39. sess_error_low(
  40. /*===========*/
  41. sess_t* sess, /* in: session object */
  42. ulint err_no, /* in: error number */
  43. char* err_str);/* in, own: error string or NULL;
  44. NOTE: the function will take care of freeing of the
  45. string! */
  46. /*************************************************************************
  47. Folds a session id to a ulint. Because this function is used also in
  48. calculating a checksum for the id to write in the message, it is performs
  49. also a XOR operation to mix the values more thoroughly. */
  50. UNIV_INLINE
  51. ulint
  52. sess_id_fold(
  53. /*=========*/
  54. /* out: folded value; can be used also as the checksum
  55. for id */
  56. dulint id) /* in: session id */
  57. {
  58. return(ut_fold_dulint(id) ^ 2945794411U);
  59. }
  60. /*************************************************************************
  61. Sets the session id in a client message. */
  62. void
  63. sess_cli_msg_set_sess(
  64. /*==================*/
  65. byte* str, /* in/out: message string */
  66. dulint sess_id)/* in: session id */
  67. {
  68. ulint fold;
  69. mach_write_to_8(str + SESS_CLI_MSG_SESS_ID, sess_id);
  70. fold = sess_id_fold(sess_id);
  71. mach_write_to_4(str + SESS_CLI_MSG_SESS_ID_CHECK, fold);
  72. }
  73. /*************************************************************************
  74. Returns the session to which a message from a client is addressed.
  75. NOTE: this function does not assume that the message is uncorrupted. */
  76. static
  77. sess_t*
  78. sess_cli_msg_get_sess(
  79. /*==================*/
  80. /* out: session, NULL if not found */
  81. byte* str, /* in: message string */
  82. ulint len) /* in: message string length */
  83. {
  84. sess_t* sess;
  85. ulint fold;
  86. dulint id;
  87. ut_ad(mutex_own(&kernel_mutex));
  88. if (len < SESS_CLI_MSG_SESS_ID_CHECK + 4) {
  89. return(NULL);
  90. }
  91. id = mach_read_from_8(str + SESS_CLI_MSG_SESS_ID);
  92. fold = sess_id_fold(id);
  93. if (fold != mach_read_from_4(str + SESS_CLI_MSG_SESS_ID_CHECK)) {
  94. return(NULL);
  95. }
  96. HASH_SEARCH(hash, sess_sys->hash, fold, sess,
  97. UT_DULINT_EQ(id, sess->id));
  98. return(sess);
  99. }
  100. /***************************************************************************
  101. Decrements the reference count of a session and closes it, if desired. */
  102. UNIV_INLINE
  103. void
  104. sess_refer_count_dec(
  105. /*=================*/
  106. sess_t* sess) /* in: session */
  107. {
  108. ut_ad(mutex_own(&kernel_mutex));
  109. ut_ad(sess->refer_count > 0);
  110. sess->refer_count--;
  111. if (sess->disconnecting && (sess->refer_count == 0)) {
  112.      sess_close(sess);
  113. }
  114. }
  115. /***************************************************************************
  116. Increments the reference count of a session. */
  117. UNIV_INLINE
  118. void
  119. sess_refer_count_inc(
  120. /*=================*/
  121. sess_t* sess) /* in: session */
  122. {
  123. ut_ad(mutex_own(&kernel_mutex));
  124. sess->refer_count++;
  125. }
  126. /***************************************************************************
  127. Creates a session system at a database start. */
  128. void
  129. sess_sys_init_at_db_start(void)
  130. /*===========================*/
  131. {
  132. sess_sys = mem_alloc(sizeof(sess_sys_t));
  133. sess_sys->state = SESS_SYS_RUNNING;
  134. sess_sys->free_sess_id = ut_dulint_create(0, 1);
  135. sess_sys->hash = hash_create(SESS_HASH_SIZE);
  136. }
  137. /***************************************************************************
  138. Gets the message type of a message from client. */
  139. UNIV_INLINE
  140. ulint
  141. sess_cli_msg_get_type(
  142. /*==================*/
  143. /* out: message type */
  144. byte* str) /* in: message string */
  145. {
  146. ut_ad(mutex_own(&kernel_mutex));
  147. return(mach_read_from_4(str + SESS_CLI_MSG_TYPE));
  148. }
  149. /***************************************************************************
  150. Gets the message number of a message from client. */
  151. UNIV_INLINE
  152. dulint
  153. sess_cli_msg_get_msg_no(
  154. /*====================*/
  155. /* out: message number */
  156. byte* str) /* in: message string */
  157. {
  158. ut_ad(mutex_own(&kernel_mutex));
  159. return(mach_read_from_8(str + SESS_CLI_MSG_NO));
  160. }
  161. /***************************************************************************
  162. Gets the continue field of a message from client. */
  163. UNIV_INLINE
  164. ulint
  165. sess_cli_msg_get_continue(
  166. /*======================*/
  167. /* out: SESS_MSG_SINGLE_PART, ... */
  168. byte* str) /* in: message string */
  169. {
  170. ut_ad(mutex_own(&kernel_mutex));
  171. return(mach_read_from_4(str + SESS_CLI_MSG_CONTINUE));
  172. }
  173. /***************************************************************************
  174. Gets the size of a big message in kilobytes. */
  175. UNIV_INLINE
  176. ulint
  177. sess_cli_msg_get_cont_size(
  178. /*=======================*/
  179. /* out: size in kilobytes */
  180. byte* str) /* in: message string */
  181. {
  182. ut_ad(mutex_own(&kernel_mutex));
  183. return(mach_read_from_4(str + SESS_CLI_MSG_CONT_SIZE));
  184. }
  185. /*************************************************************************
  186. Checks the consistency of a message from a client. */
  187. UNIV_INLINE
  188. ibool
  189. sess_cli_msg_check_consistency(
  190. /*===========================*/
  191. /* out: TRUE if ok */
  192. byte* str, /* in: message string */
  193. ulint len) /* in: message string length */
  194. {
  195. ulint fold;
  196. ut_ad(mutex_own(&kernel_mutex));
  197. if (len < SESS_CLI_MSG_DATA) {
  198. return(FALSE);
  199. }
  200. ut_ad(SESS_CLI_MSG_CHECKSUM == 0);
  201. fold = ut_fold_binary(str + 4, len - 4);
  202. if (mach_read_from_4(str + SESS_CLI_MSG_CHECKSUM) != fold) {
  203. return(FALSE);
  204. }
  205. return(TRUE);
  206. }
  207. /*************************************************************************
  208. Opens a session. */
  209. sess_t*
  210. sess_open(
  211. /*======*/
  212. /* out, own: session object */
  213. com_endpoint_t* endpoint, /* in: communication endpoint used
  214. for receiving messages from the client,
  215. or NULL if no client */
  216. byte* addr_buf, /* in: client address (= user name) */
  217. ulint addr_len) /* in: client address length */
  218. {
  219. sess_t* sess;
  220. ulint fold;
  221. ut_ad(mutex_own(&kernel_mutex));
  222. sess = mem_alloc(sizeof(sess_t));
  223. sess->id = sess_sys->free_sess_id;
  224. UT_DULINT_INC(sess_sys->free_sess_id);
  225. sess->state = SESS_ACTIVE;
  226. sess->disconnecting = FALSE;
  227. sess->msgs_sent = ut_dulint_zero;
  228. sess->msgs_recv = ut_dulint_zero;
  229. sess->client_waits = TRUE;
  230. sess->err_no = 0;
  231. sess->err_str = NULL;
  232. sess->error_count = ut_dulint_zero;
  233. sess->big_msg = NULL;
  234. sess->trx = trx_create(sess);
  235. sess->next_graph_id = 0;
  236. UT_LIST_INIT(sess->graphs);
  237. fold = sess_id_fold(sess->id);
  238. HASH_INSERT(sess_t, hash, sess_sys->hash, fold, sess);
  239. sess->endpoint = endpoint;
  240. sess->addr_buf = mem_alloc(addr_len);
  241. ut_memcpy(sess->addr_buf, addr_buf, addr_len);
  242. sess->addr_len = addr_len;
  243. return(sess);
  244. }
  245. /*************************************************************************
  246. Closes a session, freeing the memory occupied by it. */
  247. void
  248. sess_close(
  249. /*=======*/
  250. sess_t* sess) /* in, own: session object */
  251. {
  252. ulint fold;
  253. ut_ad(mutex_own(&kernel_mutex));
  254. ut_ad(sess->disconnecting);
  255. ut_ad(sess->trx == NULL);
  256. ut_ad(sess->refer_count == 0);
  257. fold = ut_fold_dulint(sess->id);
  258. HASH_DELETE(sess_t, hash, sess_sys->hash, fold, sess);
  259. /* sess_reply_to_client_rel_kernel(sess); */
  260. if (sess->err_str != NULL) {
  261. mem_free(sess->err_str);
  262. }
  263. mem_free(sess->addr_buf);
  264. mem_free(sess);
  265. }
  266. /*************************************************************************
  267. Closes a session, freeing the memory occupied by it, if it is in a state
  268. where it should be closed. */
  269. ibool
  270. sess_try_close(
  271. /*===========*/
  272. /* out: TRUE if closed */
  273. sess_t* sess) /* in, own: session object */
  274. {
  275. ut_ad(mutex_own(&kernel_mutex));
  276. if (sess->disconnecting && (UT_LIST_GET_LEN(sess->graphs) == 0)
  277. && (sess->refer_count == 0)) {
  278. sess_close(sess);
  279. return(TRUE);
  280. }
  281. return(FALSE);
  282. }
  283. /*************************************************************************
  284. Initializes the first fields of a message to client. */
  285. void
  286. sess_srv_msg_init(
  287. /*==============*/
  288. sess_t* sess, /* in: session object */
  289. byte* buf, /* in: message buffer, must be at least of size
  290. SESS_SRV_MSG_DATA */
  291. ulint type) /* in: message type */
  292. {
  293. ut_ad(mutex_own(&kernel_mutex));
  294. sess->msgs_sent = ut_dulint_add(sess->msgs_sent, 1);
  295. mach_write_to_8(buf + SESS_SRV_MSG_SESS_ID, sess->id);
  296. mach_write_to_4(buf + SESS_SRV_MSG_TYPE, type);
  297. mach_write_to_8(buf + SESS_SRV_MSG_NO, sess->msgs_sent);
  298. ut_ad(com_endpoint_get_max_size(sess->endpoint) >= SESS_SRV_MSG_DATA);
  299. }
  300. /*************************************************************************
  301. Sends a message to the client. */
  302. static
  303. ulint
  304. sess_srv_msg_send_low(
  305. /*==================*/
  306. /* out: 0 if success, else error number */
  307. sess_t* sess, /* in: session object */
  308. byte* buf, /* in: message buffer */
  309. ulint len, /* in: message length */
  310. ulint rel_ker)/* in: SESS_RELEASE_KERNEL if the kernel mutex should
  311. be temporarily released in the call; otherwise
  312. SESS_NOT_RELEASE_KERNEL */
  313. {
  314. ulint ret;
  315. ut_ad((rel_ker == SESS_NOT_RELEASE_KERNEL)
  316. || (rel_ker == SESS_RELEASE_KERNEL));
  317. ut_ad(mutex_own(&kernel_mutex));
  318. ut_ad(len <= com_endpoint_get_max_size(sess->endpoint));
  319. ut_ad(len >= SESS_SRV_MSG_DATA);
  320. if (sess->client_waits == FALSE) {
  321. sess_error_low(sess, SESS_ERR_EXTRANEOUS_SRV_MSG, NULL);
  322. return(1);
  323. }
  324. /* The client will now receive an error message: if the session is
  325. in the error state, we can reset it to the normal state */
  326. if (sess->state == SESS_ERROR) {
  327. sess->state = SESS_ACTIVE;
  328. }
  329. /* We reset the client_waits flag to FALSE, regardless of whether the
  330. message gets delivered to the client or not. This convention makes
  331. things simpler. */
  332. sess->client_waits = FALSE;
  333. if (rel_ker == SESS_RELEASE_KERNEL) {
  334. mutex_exit(&kernel_mutex);
  335. }
  336. ret = com_sendto(sess->endpoint, buf, len, sess->addr_buf,
  337. sess->addr_len);
  338. if (rel_ker == SESS_RELEASE_KERNEL) {
  339. mutex_enter(&kernel_mutex);
  340. }
  341. if (ret != 0) {
  342. sess_error_low(sess, SESS_ERR_REPLY_FAILED, NULL);
  343. }
  344. return(ret);
  345. }
  346. /*************************************************************************
  347. Sends a message to the client. If the session is in the error state, sends
  348. the error message instead of buf. */
  349. static
  350. ulint
  351. sess_srv_msg_send(
  352. /*==============*/
  353. /* out: 0 if success, else error number */
  354. sess_t* sess, /* in: session object */
  355. byte* buf, /* in: message buffer */
  356. ulint len, /* in: message length */
  357. ulint rel_ker)/* in: SESS_RELEASE_KERNEL if the kernel mutex should
  358. be temporarily released in the call; otherwise
  359. SESS_NOT_RELEASE_KERNEL */
  360. {
  361. ulint ret;
  362. ut_ad(mutex_own(&kernel_mutex));
  363. if (sess->state == SESS_ERROR) {
  364. sess_srv_msg_send_error(sess);
  365. return(2);
  366. }
  367. ret = sess_srv_msg_send_low(sess, buf, len, rel_ker);
  368. return(ret);
  369. }
  370. /*************************************************************************
  371. Sends a simple message to client. */
  372. void
  373. sess_srv_msg_send_simple(
  374. /*=====================*/
  375. sess_t* sess, /* in: session object */
  376. ulint type, /* in: message type */
  377. ulint rel_kernel) /* in: SESS_RELEASE_KERNEL or
  378. SESS_NOT_RELEASE_KERNEL */
  379. {
  380. byte buf[SESS_SRV_MSG_DATA];
  381. ut_ad(mutex_own(&kernel_mutex));
  382. sess_srv_msg_init(sess, buf, type);
  383. sess_srv_msg_send(sess, buf, SESS_SRV_MSG_DATA, rel_kernel);
  384. }
  385. /*************************************************************************
  386. Communicates an error message to the client. If sess->client_waits is not
  387. TRUE, puts the session to error state and does not try to send the error
  388. message. */
  389. static
  390. void
  391. sess_srv_msg_send_error(
  392. /*====================*/
  393. sess_t* sess) /* in: session object */
  394. {
  395. ulint err_no;
  396. byte* err_str;
  397. ulint err_len;
  398. ulint max_len;
  399. byte buf[SESS_ERR_BUF_SIZE];
  400. ulint ret;
  401. ut_ad(sess->client_waits);
  402. ut_ad(mutex_own(&kernel_mutex));
  403. ut_ad(sess->state == SESS_ERROR);
  404. ut_ad(!UT_LIST_GET_FIRST((sess->trx)->signals));
  405. if (!sess->client_waits) {
  406. /* Cannot send the error message now: leave the session to
  407. the error state and send it later */
  408. return;
  409. }
  410. err_no = sess->err_no;
  411. err_str = (byte*)sess->err_str;
  412. err_len = sess->err_len;
  413. max_len = ut_min(SESS_ERR_BUF_SIZE,
  414. com_endpoint_get_max_size(sess->endpoint));
  415. sess_srv_msg_init(sess, buf, SESS_SRV_ERROR);
  416. if (err_len + SESS_SRV_MSG_DATA > max_len) {
  417. err_len = max_len - SESS_SRV_MSG_DATA;
  418. }
  419. ut_memcpy(buf + SESS_SRV_MSG_DATA, err_str, err_len);
  420. ret = sess_srv_msg_send_low(sess, buf, SESS_SRV_MSG_DATA + err_len,
  421. SESS_NOT_RELEASE_KERNEL);
  422. }
  423. /*************************************************************************
  424. Copies error info to a session. Sends to the transaction a signal which will
  425. rollback the latest incomplete SQL statement and then send the error message
  426. to the client. NOTE: This function will take care of the freeing of the error
  427. string, thus the caller must supply a copy of the error string. */
  428. static
  429. void
  430. sess_error_low(
  431. /*===========*/
  432. sess_t* sess, /* in: session object */
  433. ulint err_no, /* in: error number */
  434. char* err_str)/* in, own: error string or NULL;
  435. NOTE: the function will take care of freeing of the
  436. string! */
  437. {
  438. ut_ad(mutex_own(&kernel_mutex));
  439. UT_DULINT_INC(sess->error_count);
  440. printf("Error string::: %sn", err_str);
  441. if (sess->state == SESS_ERROR) {
  442. /* Ignore the error because the session is already in the
  443. error state */
  444. if (err_str) {
  445. mem_free(err_str);
  446. }
  447. return;
  448. }
  449. sess->err_no = err_no;
  450. if (sess->err_str) {
  451. mem_free(sess->err_str);
  452. }
  453. sess->err_str = err_str;
  454. sess->err_len = ut_strlen(err_str);
  455. sess->state = SESS_ERROR;
  456. if (sess->big_msg) {
  457. mem_free(sess->big_msg);
  458. }
  459. /* Send a signal which will roll back the latest incomplete SQL
  460. statement: the error message will be sent to the client by the error
  461. handling mechanism after the rollback is completed. */
  462. trx_sig_send(sess->trx, TRX_SIG_ERROR_OCCURRED, TRX_SIG_SELF, FALSE,
  463. NULL, NULL, NULL);
  464. }
  465. /*************************************************************************
  466. Raises an SQL error. */
  467. void
  468. sess_raise_error_low(
  469. /*=================*/
  470. trx_t* trx, /* in: transaction */
  471. ulint err_no, /* in: error number */
  472. ulint type, /* in: more info of the error, or 0 */
  473. dict_table_t* table, /* in: dictionary table or NULL */
  474. dict_index_t* index, /* in: table index or NULL */
  475. dtuple_t* tuple, /* in: tuple to insert or NULL */
  476. rec_t* rec, /* in: record or NULL */
  477. char* err_str)/* in: arbitrary null-terminated error string,
  478. or NULL */
  479. {
  480. char* str;
  481. ulint len;
  482. ut_ad(mutex_own(&kernel_mutex));
  483. str = mem_alloc(64000);
  484. len = 0;
  485. len += sprintf(str + len, "Error number: %lu", err_no);
  486. if (type) {
  487. len += sprintf(str + len, ", type: %lu", type);
  488. }
  489. if (table) {
  490. len += sprintf(str + len, ", table: %s", table->name);
  491. }
  492. if (index) {
  493. len += sprintf(str + len, ", index: %s", index->name);
  494. }
  495. if (tuple) {
  496. len += sprintf(str + len, ", tuple:");
  497. len += dtuple_sprintf(str + len, 8192, tuple);
  498. }
  499. if (rec) {
  500. len += sprintf(str + len, ", record:");
  501. len += rec_sprintf(str + len, 8192, rec);
  502. }
  503. if (err_str) {
  504. len += sprintf(str + len, ", %s", err_str);
  505. }
  506. str[len] = '';
  507. ut_a(len < 64000);
  508. if (trx->sess) {
  509. sess_error_low(trx->sess, err_no, str);
  510. } else {
  511. mem_free(str);
  512. }
  513. }
  514. /***************************************************************************
  515. Processes a client message which is part of a bigger message. */
  516. static
  517. ibool
  518. sess_receive_msg_part(
  519. /*==================*/
  520. /* TRUE if message completed */
  521. sess_t* sess, /* in: session */
  522. byte* str, /* in: message string */
  523. ulint len) /* in: message length */
  524. {
  525. ulint cont;
  526. cont = sess_cli_msg_get_continue(str);
  527. ut_ad(cont != SESS_MSG_SINGLE_PART);
  528. if (cont == SESS_MSG_FIRST_PART) {
  529. if (sess->big_msg) {
  530. sess_error_low(sess, SESS_ERR_MSG_LOST, NULL);
  531. return(FALSE);
  532. }
  533. sess->big_msg_size = 1024 * sess_cli_msg_get_cont_size(str);
  534. sess->big_msg = mem_alloc(sess->big_msg_size);
  535. if (sess->big_msg == NULL) {
  536. sess_error_low(sess, SESS_ERR_OUT_OF_MEMORY, NULL);
  537. return(FALSE);
  538. }
  539. ut_memcpy(sess->big_msg, str, len);
  540. sess->big_msg_len = len;
  541. return(FALSE);
  542. } else {
  543. if (sess->big_msg == NULL) {
  544. sess_error_low(sess, SESS_ERR_MSG_LOST, NULL);
  545. return(FALSE);
  546. }
  547. ut_memcpy(sess->big_msg + sess->big_msg_len,
  548.   str + SESS_CLI_MSG_DATA, len - SESS_CLI_MSG_DATA);
  549. sess->big_msg_len += len - SESS_CLI_MSG_DATA;
  550. if (cont == SESS_MSG_MIDDLE_PART) {
  551. return(FALSE);
  552. }
  553. return(TRUE);
  554. }
  555. }
  556. /***************************************************************************
  557. Processes a client message which requires SQL parsing. This function decodes
  558. the client message built in SQLPrepare. NOTE: The kernel mutex is temporarily
  559. released within this function. */
  560. static
  561. void
  562. sess_receive_prepare(
  563. /*=================*/
  564. sess_t* sess, /* in: session */
  565. byte* cli_msg,/* in: client message */
  566. ulint len) /* in: message length */
  567. {
  568. dulint error_count;
  569. que_t* graph;
  570. byte msg[ODBC_DATAGRAM_SIZE];
  571. UT_NOT_USED(len);
  572. ut_ad(mutex_own(&kernel_mutex));
  573. error_count = sess->error_count;
  574. /* Make sure the session object is not freed during the parsing */
  575. sess_refer_count_inc(sess);
  576. /* We release the kernel mutex before parsing the command: this is
  577. to reduce contention on the kernel mutex */
  578. mutex_exit(&kernel_mutex);
  579. /* printf("To parse query %sn", (char*)(cli_msg + SESS_CLI_MSG_DATA)); */
  580. graph = pars_sql((char*)(cli_msg + SESS_CLI_MSG_DATA));
  581. mutex_enter(&kernel_mutex);
  582. if (graph == NULL) {
  583. /* Error in parsing */
  584. sess_error_low(sess, SESS_ERR_SQL_ERROR, NULL);
  585. sess_refer_count_dec(sess);
  586. ut_error;
  587. return;
  588. }
  589. if (!UT_DULINT_EQ(error_count, sess->error_count)) {
  590. /* An error, or an asyncronous signal on the session happened
  591. when the kernel mutex was not reserved: discard graph */
  592. graph->state = QUE_FORK_INVALID;
  593. que_graph_try_free(graph);
  594. sess_refer_count_dec(sess);
  595. ut_error;
  596. return;
  597. }
  598. UT_LIST_ADD_LAST(graphs, sess->graphs, graph);
  599. graph->id = sess->next_graph_id;
  600. sess->next_graph_id++;
  601. /* Tell the client that the preparation succeeded and communicate info
  602. about the possible query parameters: the message will be decoded in
  603. SQLPrepare */ 
  604. ut_ad(sess->client_waits);
  605. sess_srv_msg_init(sess, msg, SESS_SRV_SUCCESS);
  606. mach_write_to_4(msg + SESS_SRV_MSG_DATA, graph->id);
  607. mutex_exit(&kernel_mutex);
  608. len = pars_write_query_param_info(msg + SESS_SRV_MSG_DATA + 4, graph);
  609. mutex_enter(&kernel_mutex);
  610. sess_srv_msg_send(sess, msg, SESS_SRV_MSG_DATA + 4 + len,
  611. SESS_RELEASE_KERNEL);
  612. sess_refer_count_dec(sess);
  613. }
  614. /***************************************************************************
  615. Processes a client message which does not require SQL parsing. This function
  616. decodes the client message built in SQLExecute. */
  617. static
  618. void
  619. sess_receive_command(
  620. /*=================*/
  621. sess_t* sess, /* in: session */
  622. byte* cli_msg,/* in: client message */
  623. ulint len, /* in: message length */
  624. ulint type) /* in: message type */
  625. {
  626. proc_node_t* proc_node;
  627. call_node_t* call_node;
  628. dict_proc_t* dict_proc;
  629. que_thr_t* thr;
  630. que_t* graph;
  631. ulint stat_id;
  632. UT_NOT_USED(len);
  633. UT_NOT_USED(type);
  634. ut_ad(mutex_own(&kernel_mutex));
  635. sess->client_waits = TRUE;
  636. stat_id = mach_read_from_4(cli_msg + SESS_CLI_MSG_DATA);
  637. /* Look for the statement from the list of query graphs */
  638. graph = UT_LIST_GET_FIRST(sess->graphs);
  639. while (graph != NULL) {
  640. if (graph->id == stat_id) {
  641. break;
  642. }
  643. graph = UT_LIST_GET_NEXT(graphs, graph);
  644. }
  645. if (graph == NULL) {
  646. /* Could not find the right graph: error */
  647. sess_error_low(sess, SESS_ERR_STMT_NOT_FOUND, NULL);
  648. return;
  649. }
  650. if (graph->state != QUE_FORK_COMMAND_WAIT) {
  651. sess_error_low(sess, SESS_ERR_STMT_NOT_READY, NULL);
  652. return;
  653. }
  654. /* printf("To execute stat %lun", stat_id); */
  655. if (graph->fork_type == QUE_FORK_PROCEDURE_CALL) {
  656. /* It is a stored procedure call: retrieve a parsed copy of
  657. the procedure from the dictionary cache */
  658. mutex_exit(&kernel_mutex);
  659. call_node = que_fork_get_child(graph);
  660. graph = dict_procedure_reserve_parsed_copy(
  661. call_node->procedure_def);
  662. graph->trx = sess->trx;
  663. /* Retrieve the procedure input parameters from the message */
  664. pars_proc_read_input_params_from_buf(graph,
  665. cli_msg + SESS_CLI_MSG_DATA + 4);
  666. mutex_enter(&kernel_mutex);
  667. } else {
  668. /* It is a create procedure command: add the procedure to the
  669. dictionary cache */
  670. ut_ad(graph->fork_type == QUE_FORK_PROCEDURE);
  671. mutex_exit(&kernel_mutex);
  672. proc_node = que_fork_get_child(graph);
  673. dict_proc = dict_mem_procedure_create(proc_node->proc_id->name,
  674. proc_node->sym_tab->sql_string,
  675. graph);
  676. dict_procedure_add_to_cache(dict_proc);
  677. mutex_enter(&kernel_mutex);
  678. sess_srv_msg_send_simple(sess, SESS_SRV_SUCCESS,
  679. SESS_RELEASE_KERNEL);
  680. return;
  681. }
  682. /* Choose a query thread for execution */
  683. thr = que_fork_start_command(graph, SESS_COMM_EXECUTE, 0);
  684. ut_ad(thr);
  685. sess->trx->graph = graph;
  686. mutex_exit(&kernel_mutex);
  687. /* Run query threads with the kernel mutex released */
  688. que_run_threads(thr);
  689. mutex_enter(&kernel_mutex);
  690. }
  691. /***************************************************************************
  692. When a command has been completed, this function sends the message about it
  693. to the client. */
  694. void
  695. sess_command_completed_message(
  696. /*===========================*/
  697. sess_t* sess, /* in: session */
  698. byte* msg, /* in: message buffer */
  699. ulint len) /* in: message data length */
  700. {
  701. mutex_enter(&kernel_mutex);
  702. sess_srv_msg_send(sess, msg, SESS_SRV_MSG_DATA + len,
  703. SESS_RELEASE_KERNEL);
  704. mutex_exit(&kernel_mutex);
  705. }
  706. /***************************************************************************
  707. Processes a break message from the client. */
  708. static
  709. void
  710. sess_receive_break(
  711. /*===============*/
  712. sess_t* sess) /* in: session */
  713. {
  714. ut_ad(mutex_own(&kernel_mutex));
  715. /* Rollback the latest incomplete SQL statement */
  716. sess_error_low(sess, SESS_ERR_BREAK_BY_CLIENT, NULL);
  717. }
  718. /***************************************************************************
  719. Processes a message from a client. NOTE: Releases the kernel mutex temporarily
  720. when parsing an SQL string. */
  721. void
  722. sess_receive_msg_rel_kernel(
  723. /*========================*/
  724. sess_t* sess, /* in: session */
  725. byte* str, /* in: message string */
  726. ulint len) /* in: message length */
  727. {
  728. dulint msg_no;
  729. ulint msg_type;
  730. ulint cont;
  731. ibool is_big_msg = FALSE;
  732. ibool client_waited;
  733. ut_ad(mutex_own(&kernel_mutex));
  734. ut_ad(!sess->disconnecting);
  735. client_waited = sess->client_waits;
  736. sess->client_waits = TRUE;
  737. if (sess->state == SESS_ERROR) {
  738. /* Send a buffered error message */
  739. sess_srv_msg_send_error(sess);
  740. return;
  741. }
  742. if (FALSE == sess_cli_msg_check_consistency(str, len)) {
  743. /* Message from the client was corrupted */
  744. sess_error_low(sess, SESS_ERR_MSG_CORRUPTED, NULL);
  745. return;
  746. }
  747. msg_no = sess_cli_msg_get_msg_no(str);
  748. UT_DULINT_INC(sess->msgs_recv);
  749. if (!UT_DULINT_EQ(msg_no, sess->msgs_recv)) {
  750. sess_error_low(sess, SESS_ERR_MSG_LOST, NULL);
  751. sess->msgs_recv = msg_no;
  752. return;
  753. }
  754. msg_type = sess_cli_msg_get_type(str);
  755. if (msg_type == SESS_CLI_BREAK_EXECUTION) {
  756. sess_receive_break(sess);
  757. return;
  758. }
  759. if (client_waited) {
  760. /* Client sent an extraneous message which is not a break
  761. command: an error */
  762. sess_error_low(sess, SESS_ERR_EXTRANEOUS_MSG, NULL);
  763. return;
  764. }
  765. /*-----------------------------------------------------------*/
  766. /* Handle big messages */
  767. cont = sess_cli_msg_get_continue(str);
  768. if (cont == SESS_MSG_SINGLE_PART) {
  769. if (sess->big_msg) {
  770. sess_error_low(sess, SESS_ERR_MSG_LOST, NULL);
  771. return;
  772. }
  773. } else {
  774. ut_error; /* Not in use */
  775. is_big_msg = sess_receive_msg_part(sess, str, len);
  776. if (is_big_msg) {
  777. str = sess->big_msg;
  778. len = sess->big_msg_len;
  779. sess->big_msg = NULL;
  780. } else {
  781. return;
  782. }
  783. }
  784.   /*-----------------------------------------------------------*/
  785.   /* The session has received a complete message from the client */
  786. ut_ad(!UT_LIST_GET_FIRST((sess->trx)->signals));
  787. if (msg_type == SESS_CLI_PREPARE) {
  788. /* Note that the kernel mutex is temporarily released when
  789. the SQL string is parsed */
  790. sess_receive_prepare(sess, str, len);
  791. } else {
  792. /* Note that the kernel mutex is temporarily released when the
  793. command is executed */
  794. sess_receive_command(sess, str, len, msg_type);
  795. }
  796. if (is_big_msg) {
  797. mem_free(str);
  798. }
  799. }
  800. /***********************************************************************
  801. Opens a new connection and creates a session. */
  802. static
  803. ibool
  804. sess_open_connection(
  805. /*=================*/
  806. byte* str, /* in: message string */
  807. ulint len, /* in: string length */
  808. byte* addr, /* in: user address string */
  809. ulint alen) /* in: user address length */
  810. {
  811. dulint sess_id;
  812. sess_t* sess;
  813. sess_id = mach_read_from_8(str + SESS_CLI_MSG_SESS_ID);
  814. if (!(UT_DULINT_EQ(sess_id, ut_dulint_zero))
  815. || !(sess_cli_msg_get_type(str) == SESS_CLI_CONNECT)) {
  816. /* It is not a valid connect message */
  817. return(FALSE);
  818. }
  819. ut_a(len == SESS_CLI_MSG_DATA);
  820. sess = sess_open(srv_sys->endpoint, addr, alen);
  821. sess_srv_msg_send_simple(sess, SESS_SRV_ACCEPT_CONNECT,
  822. SESS_NOT_RELEASE_KERNEL);
  823. return(TRUE);
  824. }
  825. /***********************************************************************
  826. Starts a new connection and a session, or starts a query based on a client
  827. message. This is called by a SRV_COM thread. */
  828. void
  829. sess_process_cli_msg(
  830. /*=================*/
  831. byte* str, /* in: message string */
  832. ulint len, /* in: string length */
  833. byte* addr, /* in: address string */
  834. ulint alen) /* in: address length */
  835. {
  836. sess_t* sess;
  837. ibool success;
  838. UT_NOT_USED(addr);
  839. UT_NOT_USED(alen);
  840. mutex_enter(&kernel_mutex);
  841.   sess = sess_cli_msg_get_sess(str, len);
  842. if (sess == NULL) {
  843. /* There was no matching session */
  844. if (sess_cli_msg_check_consistency(str, len)) {
  845. /* As the message is consistent, it may be a connect
  846. message */
  847. /* printf("%sn", addr); */
  848. success = sess_open_connection(str, len, addr, alen);
  849. if (success) {
  850. mutex_exit(&kernel_mutex);
  851. return;
  852. }
  853. }
  854. /* Could not make sense of the message: write an error entry
  855. to the system error log */
  856. /* srv_err_log_insert(
  857. "MESSAGE SENT TO AN UNKNOWN SESSION");*/
  858. ut_error;
  859. mutex_exit(&kernel_mutex);
  860. return;
  861. }
  862. if (sess->disconnecting) {
  863. /* srv_err_log_insert(
  864. "MESSAGE SENT TO A DISCONNECTING SESSION");*/
  865. ut_error;
  866. mutex_exit(&kernel_mutex);
  867. return;
  868. }
  869. sess_receive_msg_rel_kernel(sess, str, len);
  870. mutex_exit(&kernel_mutex);
  871. }