os0file.c
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:46k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /******************************************************
  2. The interface to the operating system file i/o primitives
  3. (c) 1995 Innobase Oy
  4. Created 10/21/1995 Heikki Tuuri
  5. *******************************************************/
  6. #include "os0file.h"
  7. #include "os0sync.h"
  8. #include "ut0mem.h"
  9. #ifdef POSIX_ASYNC_IO
  10. /* We assume in this case that the OS has standard Posix aio (at least SunOS
  11. 2.6, HP-UX 11i and AIX 4.3 have) */
  12. #undef __USE_FILE_OFFSET64
  13. #include <aio.h>
  14. #endif
  15. /* We use these mutexes to protect lseek + file i/o operation, if the
  16. OS does not provide an atomic pread or pwrite, or similar */
  17. #define OS_FILE_N_SEEK_MUTEXES 16
  18. os_mutex_t os_file_seek_mutexes[OS_FILE_N_SEEK_MUTEXES];
  19. /* In simulated aio, merge at most this many consecutive i/os */
  20. #define OS_AIO_MERGE_N_CONSECUTIVE 32
  21. /* If this flag is TRUE, then we will use the native aio of the
  22. OS (provided we compiled Innobase with it in), otherwise we will
  23. use simulated aio we build below with threads */
  24. ibool os_aio_use_native_aio = FALSE;
  25. /* The aio array slot structure */
  26. typedef struct os_aio_slot_struct os_aio_slot_t;
  27. struct os_aio_slot_struct{
  28. ibool is_read; /* TRUE if a read operation */
  29. ulint pos; /* index of the slot in the aio
  30. array */
  31. ibool reserved; /* TRUE if this slot is reserved */
  32. ulint len; /* length of the block to read or
  33. write */
  34. byte* buf; /* buffer used in i/o */
  35. ulint type; /* OS_FILE_READ or OS_FILE_WRITE */
  36. ulint offset; /* 32 low bits of file offset in
  37. bytes */
  38. ulint offset_high; /* 32 high bits of file offset */
  39. os_file_t file; /* file where to read or write */
  40. char* name; /* file name or path */
  41. ibool io_already_done;/* used only in simulated aio:
  42. TRUE if the physical i/o already
  43. made and only the slot message
  44. needs to be passed to the caller
  45. of os_aio_simulated_handle */
  46. void* message1; /* message which is given by the */
  47. void* message2; /* the requester of an aio operation
  48. and which can be used to identify
  49. which pending aio operation was
  50. completed */
  51. #ifdef WIN_ASYNC_IO
  52. OVERLAPPED control; /* Windows control block for the
  53. aio request */
  54. #elif defined(POSIX_ASYNC_IO)
  55. struct aiocb control; /* Posix control block for aio
  56. request */
  57. #endif
  58. };
  59. /* The aio array structure */
  60. typedef struct os_aio_array_struct os_aio_array_t;
  61. struct os_aio_array_struct{
  62. os_mutex_t mutex;   /* the mutex protecting the aio array */
  63. os_event_t not_full; /* The event which is set to signaled
  64.   state when there is space in the aio
  65.   outside the ibuf segment */
  66. ulint n_slots;  /* Total number of slots in the aio array.
  67.   This must be divisible by n_threads. */
  68. ulint n_segments;/* Number of segments in the aio array of
  69.   pending aio requests. A thread can wait
  70.   separately for any one of the segments. */
  71. ulint n_reserved;/* Number of reserved slots in the
  72.   aio array outside the ibuf segment */
  73. os_aio_slot_t*  slots;   /* Pointer to the slots in the array */
  74. os_event_t* events;   /* Pointer to an array of event handles
  75.   where we copied the handles from slots,
  76.   in the same order. This can be used in
  77.   WaitForMultipleObjects; used only in
  78.   Windows */
  79. };
  80. /* Array of events used in simulated aio */
  81. os_event_t* os_aio_segment_wait_events = NULL;
  82. /* The aio arrays for non-ibuf i/o and ibuf i/o, as well as sync aio. These
  83. are NULL when the module has not yet been initialized. */
  84. os_aio_array_t* os_aio_read_array = NULL;
  85. os_aio_array_t* os_aio_write_array = NULL;
  86. os_aio_array_t* os_aio_ibuf_array = NULL;
  87. os_aio_array_t* os_aio_log_array = NULL;
  88. os_aio_array_t* os_aio_sync_array = NULL;
  89. ulint os_aio_n_segments = ULINT_UNDEFINED;
  90. /***************************************************************************
  91. Retrieves the last error number if an error occurs in a file io function.
  92. The number should be retrieved before any other OS calls (because they may
  93. overwrite the error number). If the number is not known to this program,
  94. the OS error number + 100 is returned. */
  95. ulint
  96. os_file_get_last_error(void)
  97. /*========================*/
  98. /* out: error number, or OS error number + 100 */
  99. {
  100. ulint err;
  101. #ifdef __WIN__
  102. err = (ulint) GetLastError();
  103. if (err == ERROR_FILE_NOT_FOUND) {
  104. return(OS_FILE_NOT_FOUND);
  105. } else if (err == ERROR_DISK_FULL) {
  106. return(OS_FILE_DISK_FULL);
  107. } else if (err == ERROR_FILE_EXISTS) {
  108. return(OS_FILE_ALREADY_EXISTS);
  109. } else {
  110. return(100 + err);
  111. }
  112. #else
  113. err = (ulint) errno;
  114. if (err == ENOSPC ) {
  115. return(OS_FILE_DISK_FULL);
  116. #ifdef POSIX_ASYNC_IO
  117. } else if (err == EAGAIN) {
  118. return(OS_FILE_AIO_RESOURCES_RESERVED);
  119. #endif
  120. } else if (err == ENOENT) {
  121. return(OS_FILE_NOT_FOUND);
  122. } else if (err == EEXIST) {
  123. return(OS_FILE_ALREADY_EXISTS);
  124. } else {
  125. return(100 + err);
  126. }
  127. #endif
  128. }
  129. /********************************************************************
  130. Does error handling when a file operation fails. If we have run out
  131. of disk space, then the user can clean the disk. If we do not find
  132. a specified file, then the user can copy it to disk. */
  133. static
  134. ibool
  135. os_file_handle_error(
  136. /*=================*/
  137. /* out: TRUE if we should retry the operation */
  138. os_file_t file, /* in: file pointer */
  139. char* name) /* in: name of a file or NULL */
  140. {
  141. int input_char;
  142. ulint err;
  143. UT_NOT_USED(file);
  144. err = os_file_get_last_error();
  145. if (err == OS_FILE_DISK_FULL) {
  146. ask_again:
  147. printf("n");
  148. if (name) {
  149. printf(
  150. "Innobase encountered a problem with file %s.n",
  151. name);
  152. }
  153. printf("Disk is full. Try to clean the disk to free spacen");
  154. printf("before answering the following: How to continue?n");
  155. printf("(Y == freed some space: try again)n");
  156. printf("(N == crash the database: will restart it)?n");
  157. ask_with_no_question:
  158. input_char = getchar();
  159. if (input_char == (int) 'N') {
  160. ut_error;
  161. return(FALSE);
  162. } else if (input_char == (int) 'Y') {
  163. return(TRUE);
  164. } else if (input_char == (int) 'n') {
  165. goto ask_with_no_question;
  166. } else {
  167. goto ask_again;
  168. }
  169. } else if (err == OS_FILE_AIO_RESOURCES_RESERVED) {
  170. return(TRUE);
  171. } else {
  172. ut_error;
  173. }
  174. return(FALSE);
  175. }
  176. /********************************************************************
  177. Opens an existing file or creates a new. */
  178. os_file_t
  179. os_file_create(
  180. /*===========*/
  181. /* out, own: handle to the file, not defined if error,
  182. error number can be retrieved with os_get_last_error */
  183. char* name, /* in: name of the file or path as a null-terminated
  184. string */
  185. ulint create_mode, /* in: OS_FILE_OPEN if an existing file is opened
  186. (if does not exist, error), or OS_FILE_CREATE if a new
  187. file is created (if exists, error), OS_FILE_OVERWRITE
  188. if a new is created or an old overwritten */
  189. ulint purpose,/* in: OS_FILE_AIO, if asynchronous, non-buffered i/o
  190. is desired, OS_FILE_NORMAL, if any normal file */
  191. ibool* success)/* out: TRUE if succeed, FALSE if error */
  192. {
  193. #ifdef __WIN__
  194. os_file_t file;
  195. DWORD create_flag;
  196. DWORD attributes;
  197. ibool retry;
  198. try_again:
  199. ut_a(name);
  200. if (create_mode == OS_FILE_OPEN) {
  201. create_flag = OPEN_EXISTING;
  202. } else if (create_mode == OS_FILE_CREATE) {
  203. create_flag = CREATE_NEW;
  204. } else if (create_mode == OS_FILE_OVERWRITE) {
  205. create_flag = CREATE_ALWAYS;
  206. } else {
  207. create_flag = 0;
  208. ut_error;
  209. }
  210. if (purpose == OS_FILE_AIO) {
  211. /* use asynchronous (overlapped) io and no buffering
  212. of writes in the OS */
  213. attributes = 0;
  214. #ifdef WIN_ASYNC_IO
  215. if (os_aio_use_native_aio) {
  216. attributes = attributes | FILE_FLAG_OVERLAPPED;
  217. }
  218. #endif
  219. #ifdef UNIV_NON_BUFFERED_IO
  220. attributes = attributes | FILE_FLAG_NO_BUFFERING;
  221. #endif
  222. } else if (purpose == OS_FILE_NORMAL) {
  223. attributes = 0
  224. #ifdef UNIV_NON_BUFFERED_IO
  225.  | FILE_FLAG_NO_BUFFERING
  226. #endif
  227. ;
  228. } else {
  229. attributes = 0;
  230. ut_error;
  231. }
  232. file = CreateFile(name,
  233. GENERIC_READ | GENERIC_WRITE, /* read and write
  234. access */
  235. FILE_SHARE_READ,/* file can be read by other
  236. processes */
  237. NULL, /* default security attributes */
  238. create_flag,
  239. attributes,
  240. NULL); /* no template file */
  241. if (file == INVALID_HANDLE_VALUE) {
  242. *success = FALSE;
  243. if (create_mode != OS_FILE_OPEN
  244.     && os_file_get_last_error() == OS_FILE_DISK_FULL) {
  245. retry = os_file_handle_error(file, name);
  246. if (retry) {
  247. goto try_again;
  248. }
  249. }
  250. } else {
  251. *success = TRUE;
  252. }
  253. return(file);
  254. #else
  255. os_file_t file;
  256. int create_flag;
  257. ibool retry;
  258. try_again:
  259. ut_a(name);
  260. if (create_mode == OS_FILE_OPEN) {
  261. create_flag = O_RDWR;
  262. } else if (create_mode == OS_FILE_CREATE) {
  263. create_flag = O_RDWR | O_CREAT | O_EXCL;
  264. } else if (create_mode == OS_FILE_OVERWRITE) {
  265. create_flag = O_RDWR | O_CREAT | O_TRUNC;
  266. } else {
  267. create_flag = 0;
  268. ut_error;
  269. }
  270. UT_NOT_USED(purpose);
  271. if (create_mode == OS_FILE_CREATE) {
  272. #ifndef S_IRWXU
  273.                 file = open(name, create_flag);
  274. #else
  275.         file = open(name, create_flag, S_IRWXU | S_IRWXG | S_IRWXO);
  276. #endif
  277.         } else {
  278.                 file = open(name, create_flag);
  279.         }
  280. if (file == -1) {
  281. *success = FALSE;
  282. if (create_mode != OS_FILE_OPEN
  283.     && errno == ENOSPC) {
  284. retry = os_file_handle_error(file, name);
  285. if (retry) {
  286. goto try_again;
  287. }
  288. }
  289. } else {
  290. *success = TRUE;
  291. }
  292. return(file);
  293. #endif
  294. }
  295. /***************************************************************************
  296. Closes a file handle. In case of error, error number can be retrieved with
  297. os_file_get_last_error. */
  298. ibool
  299. os_file_close(
  300. /*==========*/
  301. /* out: TRUE if success */
  302. os_file_t file) /* in, own: handle to a file */
  303. {
  304. #ifdef __WIN__
  305. BOOL ret;
  306. ut_a(file);
  307. ret = CloseHandle(file);
  308. if (ret) {
  309. return(TRUE);
  310. }
  311. return(FALSE);
  312. #else
  313. int ret;
  314. ret = close(file);
  315. if (ret == -1) {
  316. return(FALSE);
  317. }
  318. return(TRUE);
  319. #endif
  320. }
  321. /***************************************************************************
  322. Gets a file size. */
  323. ibool
  324. os_file_get_size(
  325. /*=============*/
  326. /* out: TRUE if success */
  327. os_file_t file, /* in: handle to a file */
  328. ulint* size, /* out: least significant 32 bits of file
  329. size */
  330. ulint* size_high)/* out: most significant 32 bits of size */
  331. {
  332. #ifdef __WIN__
  333. DWORD high;
  334. DWORD low;
  335. low = GetFileSize(file, &high);
  336. if ((low == 0xFFFFFFFF) && (GetLastError() != NO_ERROR)) {
  337. return(FALSE);
  338. }
  339. *size = low;
  340. *size_high = high;
  341. return(TRUE);
  342. #else
  343. *size = (ulint) lseek(file, 0, SEEK_END);
  344. *size_high = 0;
  345. return(TRUE);
  346. #endif
  347. }
  348. /***************************************************************************
  349. Sets a file size. This function can be used to extend or truncate a file. */
  350. ibool
  351. os_file_set_size(
  352. /*=============*/
  353. /* out: TRUE if success */
  354. char* name, /* in: name of the file or path as a
  355. null-terminated string */
  356. os_file_t file, /* in: handle to a file */
  357. ulint size, /* in: least significant 32 bits of file
  358. size */
  359. ulint size_high)/* in: most significant 32 bits of size */
  360. {
  361. ulint   offset;
  362. ulint   n_bytes;
  363. ulint low;
  364. ibool ret;
  365. ibool retry;
  366. ulint   i;
  367. byte*   buf;
  368. try_again:
  369. buf = ut_malloc(UNIV_PAGE_SIZE * 64);
  370. /* Write buffer full of zeros */
  371. for (i = 0; i < UNIV_PAGE_SIZE * 64; i++) {
  372.         buf[i] = '';
  373. }
  374. offset = 0;
  375. low = size;
  376. #if (UNIV_WORD_SIZE == 8)
  377. low = low + (size_high << 32);
  378. #else
  379. UT_NOT_USED(size_high);
  380. #endif
  381. while (offset < low) {
  382.         if (low - offset < UNIV_PAGE_SIZE * 64) {
  383.                  n_bytes = low - offset;
  384.         } else {
  385.                  n_bytes = UNIV_PAGE_SIZE * 64;
  386.         }
  387.   
  388.         ret = os_file_write(name, file, buf, offset, 0, n_bytes);
  389.         if (!ret) {
  390. ut_free(buf);
  391.           goto error_handling;
  392.         }
  393.         offset += n_bytes;
  394. }
  395. ut_free(buf);
  396. ret = os_file_flush(file);
  397. if (ret) {
  398.         return(TRUE);
  399. }
  400. error_handling:
  401. retry = os_file_handle_error(file, name); 
  402. if (retry) {
  403. goto try_again;
  404. }
  405. ut_error;
  406. return(FALSE);
  407. }
  408. /***************************************************************************
  409. Flushes the write buffers of a given file to the disk. */
  410. ibool
  411. os_file_flush(
  412. /*==========*/
  413. /* out: TRUE if success */
  414. os_file_t file) /* in, own: handle to a file */
  415. {
  416. #ifdef __WIN__
  417. BOOL ret;
  418. ut_a(file);
  419. ret = FlushFileBuffers(file);
  420. if (ret) {
  421. return(TRUE);
  422. }
  423. return(FALSE);
  424. #else
  425. int ret;
  426. ret = fsync(file);
  427. if (ret == 0) {
  428. return(TRUE);
  429. }
  430. return(FALSE);
  431. #endif
  432. }
  433. #ifndef __WIN__
  434. /***********************************************************************
  435. Does a synchronous read operation in Posix. */
  436. static
  437. ssize_t
  438. os_file_pread(
  439. /*==========*/
  440. /* out: number of bytes read, -1 if error */
  441. os_file_t file, /* in: handle to a file */
  442. void* buf, /* in: buffer where to read */
  443. ulint n, /* in: number of bytes to read */
  444. ulint offset) /* in: offset from where to read */
  445. {
  446. #ifdef HAVE_PREAD
  447. return(pread(file, buf, n, (off_t) offset));
  448. #else
  449. ssize_t ret;
  450. ulint i;
  451. /* Protect the seek / read operation with a mutex */
  452. i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
  453. os_mutex_enter(os_file_seek_mutexes[i]);
  454. ret = lseek(file, (off_t) offset, 0);
  455. if (ret < 0) {
  456. os_mutex_exit(os_file_seek_mutexes[i]);
  457. return(ret);
  458. }
  459. ret = read(file, buf, n);
  460. os_mutex_exit(os_file_seek_mutexes[i]);
  461. return(ret);
  462. #endif
  463. }
  464. /***********************************************************************
  465. Does a synchronous write operation in Posix. */
  466. static
  467. ssize_t
  468. os_file_pwrite(
  469. /*===========*/
  470. /* out: number of bytes written, -1 if error */
  471. os_file_t file, /* in: handle to a file */
  472. void* buf, /* in: buffer from where to write */
  473. ulint n, /* in: number of bytes to write */
  474. ulint offset) /* in: offset where to write */
  475. {
  476. #ifdef HAVE_PWRITE
  477. return(pwrite(file, buf, n, (off_t) offset));
  478. #else
  479. ssize_t ret;
  480. ulint i;
  481. /* Protect the seek / write operation with a mutex */
  482. i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
  483. os_mutex_enter(os_file_seek_mutexes[i]);
  484. ret = lseek(file, (off_t) offset, 0);
  485. if (ret < 0) {
  486. os_mutex_exit(os_file_seek_mutexes[i]);
  487. return(ret);
  488. }
  489. ret = write(file, buf, n);
  490. os_mutex_exit(os_file_seek_mutexes[i]);
  491. return(ret);
  492. #endif
  493. }
  494. #endif
  495. /***********************************************************************
  496. Requests a synchronous positioned read operation. */
  497. ibool
  498. os_file_read(
  499. /*=========*/
  500. /* out: TRUE if request was
  501. successful, FALSE if fail */
  502. os_file_t file, /* in: handle to a file */
  503. void* buf, /* in: buffer where to read */
  504. ulint offset, /* in: least significant 32 bits of file
  505. offset where to read */
  506. ulint offset_high, /* in: most significant 32 bits of
  507. offset */
  508. ulint n) /* in: number of bytes to read */
  509. {
  510. #ifdef __WIN__
  511. BOOL ret;
  512. DWORD len;
  513. DWORD ret2;
  514. DWORD err;
  515. DWORD low;
  516. DWORD high;
  517. ibool retry;
  518. ulint i;
  519. try_again:
  520. ut_ad(file);
  521. ut_ad(buf);
  522. ut_ad(n > 0);
  523. low = offset;
  524. high = offset_high;
  525. /* Protect the seek / read operation with a mutex */
  526. i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
  527. os_mutex_enter(os_file_seek_mutexes[i]);
  528. ret2 = SetFilePointer(file, low, &high, FILE_BEGIN);
  529. if (ret2 == 0xFFFFFFFF && GetLastError() != NO_ERROR) {
  530. err = GetLastError();
  531. os_mutex_exit(os_file_seek_mutexes[i]);
  532. goto error_handling;
  533. ret = ReadFile(file, buf, n, &len, NULL);
  534. os_mutex_exit(os_file_seek_mutexes[i]);
  535. if (ret && len == n) {
  536. return(TRUE);
  537. }
  538. err = GetLastError();
  539. #else
  540. ibool retry;
  541. ssize_t ret;
  542. ulint   i;
  543. #if (UNIV_WORD_SIZE == 8)
  544. offset = offset + (offset_high << 32);
  545. #else
  546. UT_NOT_USED(offset_high);
  547. #endif
  548. try_again:
  549. /* Protect the seek / read operation with a mutex */
  550. i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
  551. os_mutex_enter(os_file_seek_mutexes[i]);
  552. ret = os_file_pread(file, buf, n, (off_t) offset);
  553. if ((ulint)ret == n) {
  554. os_mutex_exit(os_file_seek_mutexes[i]);
  555. return(TRUE);
  556. }
  557. #endif
  558. error_handling:
  559. retry = os_file_handle_error(file, NULL); 
  560. if (retry) {
  561. goto try_again;
  562. }
  563. ut_error;
  564. return(FALSE);
  565. }
  566. /***********************************************************************
  567. Requests a synchronous write operation. */
  568. ibool
  569. os_file_write(
  570. /*==========*/
  571. /* out: TRUE if request was
  572. successful, FALSE if fail */
  573. char* name, /* in: name of the file or path as a
  574. null-terminated string */
  575. os_file_t file, /* in: handle to a file */
  576. void* buf, /* in: buffer from which to write */
  577. ulint offset, /* in: least significant 32 bits of file
  578. offset where to write */
  579. ulint offset_high, /* in: most significant 32 bits of
  580. offset */
  581. ulint n) /* in: number of bytes to write */
  582. {
  583. #ifdef __WIN__
  584. BOOL ret;
  585. DWORD len;
  586. DWORD ret2;
  587. DWORD err;
  588. DWORD low;
  589. DWORD high;
  590. ibool retry;
  591. ulint i;
  592. try_again:
  593. ut_ad(file);
  594. ut_ad(buf);
  595. ut_ad(n > 0);
  596. low = offset;
  597. high = offset_high;
  598. /* Protect the seek / write operation with a mutex */
  599. i = ((ulint) file) % OS_FILE_N_SEEK_MUTEXES;
  600. os_mutex_enter(os_file_seek_mutexes[i]);
  601. ret2 = SetFilePointer(file, low, &high, FILE_BEGIN);
  602. if (ret2 == 0xFFFFFFFF && GetLastError() != NO_ERROR) {
  603. err = GetLastError();
  604. os_mutex_exit(os_file_seek_mutexes[i]);
  605. goto error_handling;
  606. ret = WriteFile(file, buf, n, &len, NULL);
  607. os_mutex_exit(os_file_seek_mutexes[i]);
  608. if (ret && len == n) {
  609. return(TRUE);
  610. }
  611. #else
  612. ibool retry;
  613. ssize_t ret;
  614. #if (UNIV_WORD_SIZE == 8)
  615. offset = offset + (offset_high << 32);
  616. #else
  617. UT_NOT_USED(offset_high);
  618. #endif
  619. try_again:
  620. ret = pwrite(file, buf, n, (off_t) offset);
  621. if ((ulint)ret == n) {
  622. return(TRUE);
  623. }
  624. #endif
  625. error_handling:
  626. retry = os_file_handle_error(file, name); 
  627. if (retry) {
  628. goto try_again;
  629. }
  630. ut_error;
  631. return(FALSE);
  632. }
  633. /********************************************************************
  634. Returns a pointer to the nth slot in the aio array. */
  635. static
  636. os_aio_slot_t*
  637. os_aio_array_get_nth_slot(
  638. /*======================*/
  639. /* out: pointer to slot */
  640. os_aio_array_t* array, /* in: aio array */
  641. ulint index) /* in: index of the slot */
  642. {
  643. ut_a(index < array->n_slots);
  644. return((array->slots) + index);
  645. }
  646. /****************************************************************************
  647. Creates an aio wait array. */
  648. static
  649. os_aio_array_t*
  650. os_aio_array_create(
  651. /*================*/
  652. /* out, own: aio array */
  653. ulint n, /* in: maximum number of pending aio operations
  654. allowed; n must be divisible by n_segments */
  655. ulint n_segments)  /* in: number of segments in the aio array */
  656. {
  657. os_aio_array_t* array;
  658. ulint i;
  659. os_aio_slot_t* slot;
  660. #ifdef WIN_ASYNC_IO
  661. OVERLAPPED* over;
  662. #endif
  663. ut_a(n > 0);
  664. ut_a(n_segments > 0);
  665. ut_a(n % n_segments == 0);
  666. array = ut_malloc(sizeof(os_aio_array_t));
  667. array->mutex  = os_mutex_create(NULL);
  668. array->not_full = os_event_create(NULL);
  669. array->n_slots   = n;
  670. array->n_segments = n_segments;
  671. array->n_reserved = 0;
  672. array->slots = ut_malloc(n * sizeof(os_aio_slot_t));
  673. array->events = ut_malloc(n * sizeof(os_event_t));
  674. for (i = 0; i < n; i++) {
  675. slot = os_aio_array_get_nth_slot(array, i);
  676. slot->pos = i;
  677. slot->reserved = FALSE;
  678. #ifdef WIN_ASYNC_IO
  679. over = &(slot->control);
  680. over->hEvent = os_event_create(NULL);
  681. *((array->events) + i) = over->hEvent;
  682. #endif
  683. }
  684. return(array);
  685. }
  686. /****************************************************************************
  687. Initializes the asynchronous io system. Creates separate aio array for
  688. non-ibuf read and write, a third aio array for the ibuf i/o, with just one
  689. segment, two aio arrays for log reads and writes with one segment, and a
  690. synchronous aio array of the specified size. The combined number of segments
  691. in the three first aio arrays is the parameter n_segments given to the
  692. function. The caller must create an i/o handler thread for each segment in
  693. the four first arrays, but not for the sync aio array. */
  694. void
  695. os_aio_init(
  696. /*========*/
  697. ulint n, /* in: maximum number of pending aio operations
  698. allowed; n must be divisible by n_segments */
  699. ulint n_segments, /* in: combined number of segments in the four
  700. first aio arrays; must be >= 4 */
  701. ulint n_slots_sync) /* in: number of slots in the sync aio array */
  702. {
  703. ulint n_read_segs;
  704. ulint n_write_segs;
  705. ulint n_per_seg;
  706. ulint i;
  707. #ifdef POSIX_ASYNC_IO
  708. sigset_t   sigset;
  709. #endif
  710. ut_ad(n % n_segments == 0);
  711. ut_ad(n_segments >= 4);
  712. n_per_seg = n / n_segments;
  713. n_write_segs = (n_segments - 2) / 2;
  714. n_read_segs = n_segments - 2 - n_write_segs;
  715. /* printf("Array n per seg %lun", n_per_seg); */
  716. os_aio_read_array = os_aio_array_create(n_read_segs * n_per_seg,
  717. n_read_segs);
  718. os_aio_write_array = os_aio_array_create(n_write_segs * n_per_seg,
  719. n_write_segs);
  720. os_aio_ibuf_array = os_aio_array_create(n_per_seg, 1);
  721. os_aio_log_array = os_aio_array_create(n_per_seg, 1);
  722. os_aio_sync_array = os_aio_array_create(n_slots_sync, 1);
  723. os_aio_n_segments = n_segments;
  724. os_aio_validate();
  725. for (i = 0; i < OS_FILE_N_SEEK_MUTEXES; i++) {
  726. os_file_seek_mutexes[i] = os_mutex_create(NULL);
  727. }
  728. os_aio_segment_wait_events = ut_malloc(n_segments * sizeof(void*));
  729. for (i = 0; i < n_segments; i++) {
  730. os_aio_segment_wait_events[i] = os_event_create(NULL);
  731. }
  732. #ifdef POSIX_ASYNC_IO
  733. /* Block aio signals from the current thread and its children:
  734. for this to work, the current thread must be the first created
  735. in the database, so that all its children will inherit its
  736. signal mask */
  737.         sigemptyset(&sigset);
  738. sigaddset(&sigset, SIGRTMIN + 1 + 0);
  739. sigaddset(&sigset, SIGRTMIN + 1 + 1);
  740. sigaddset(&sigset, SIGRTMIN + 1 + 2);
  741. sigaddset(&sigset, SIGRTMIN + 1 + 3);
  742. pthread_sigmask(SIG_BLOCK, &sigset, NULL);
  743. #endif
  744. }
  745. /**************************************************************************
  746. Calculates segment number for a slot. */
  747. static
  748. ulint
  749. os_aio_get_segment_no_from_slot(
  750. /*============================*/
  751. /* out: segment number (which is the number
  752. used by, for example, i/o-handler threads) */
  753. os_aio_array_t* array, /* in: aio wait array */
  754. os_aio_slot_t* slot) /* in: slot in this array */
  755. {
  756. ulint segment;
  757. ulint seg_len;
  758. if (array == os_aio_ibuf_array) {
  759. segment = 0;
  760. } else if (array == os_aio_log_array) {
  761. segment = 1;
  762. } else if (array == os_aio_read_array) {
  763. seg_len = os_aio_read_array->n_slots /
  764. os_aio_read_array->n_segments;
  765. segment = 2 + slot->pos / seg_len;
  766. } else {
  767. ut_a(array == os_aio_write_array);
  768. seg_len = os_aio_write_array->n_slots /
  769. os_aio_write_array->n_segments;
  770. segment = os_aio_read_array->n_segments + 2
  771. + slot->pos / seg_len;
  772. }
  773. return(segment);
  774. }
  775. /**************************************************************************
  776. Calculates local segment number and aio array from global segment number. */
  777. static
  778. ulint
  779. os_aio_get_array_and_local_segment(
  780. /*===============================*/
  781. /* out: local segment number within
  782. the aio array */
  783. os_aio_array_t** array, /* out: aio wait array */
  784. ulint  global_segment)/* in: global segment number */
  785. {
  786. ulint segment;
  787. ut_a(global_segment < os_aio_n_segments);
  788. if (global_segment == 0) {
  789. *array = os_aio_ibuf_array;
  790. segment = 0;
  791. } else if (global_segment == 1) {
  792. *array = os_aio_log_array;
  793. segment = 0;
  794. } else if (global_segment < os_aio_read_array->n_segments + 2) {
  795. *array = os_aio_read_array;
  796. segment = global_segment - 2;
  797. } else {
  798. *array = os_aio_write_array;
  799. segment = global_segment - (os_aio_read_array->n_segments + 2);
  800. }
  801. return(segment);
  802. }
  803. /***********************************************************************
  804. Gets an integer value designating a specified aio array. This is used
  805. to give numbers to signals in Posix aio. */
  806. static
  807. ulint
  808. os_aio_get_array_no(
  809. /*================*/
  810. os_aio_array_t* array) /* in: aio array */
  811. {
  812. if (array == os_aio_ibuf_array) {
  813. return(0);
  814. } else if (array == os_aio_log_array) {
  815. return(1);
  816. } else if (array == os_aio_read_array) {
  817. return(2);
  818. } else if (array == os_aio_write_array) {
  819. return(3);
  820. } else {
  821. ut_a(0);
  822. return(0);
  823. }
  824. }
  825. /***********************************************************************
  826. Gets the aio array for its number. */
  827. static
  828. os_aio_array_t*
  829. os_aio_get_array_from_no(
  830. /*=====================*/
  831. /* out: aio array */
  832. ulint n) /* in: array number */
  833. {
  834. if (n == 0) {
  835. return(os_aio_ibuf_array);
  836. } else if (n == 1) {
  837. return(os_aio_log_array);
  838. } else if (n == 2) {
  839. return(os_aio_read_array);
  840. } else if (n == 3) {
  841. return(os_aio_write_array);
  842. } else {
  843. ut_a(0);
  844. return(NULL);
  845. }
  846. }
  847. /***********************************************************************
  848. Requests for a slot in the aio array. If no slot is available, waits until
  849. not_full-event becomes signaled. */
  850. static
  851. os_aio_slot_t*
  852. os_aio_array_reserve_slot(
  853. /*======================*/
  854. /* out: pointer to slot */
  855. ulint type, /* in: OS_FILE_READ or OS_FILE_WRITE */
  856. os_aio_array_t* array, /* in: aio array */
  857. void* message1,/* in: message to be passed along with
  858. the aio operation */
  859. void* message2,/* in: message to be passed along with
  860. the aio operation */
  861. os_file_t file, /* in: file handle */
  862. char* name, /* in: name of the file or path as a
  863. null-terminated string */
  864. void* buf, /* in: buffer where to read or from which
  865. to write */
  866. ulint offset, /* in: least significant 32 bits of file
  867. offset */
  868. ulint offset_high, /* in: most significant 32 bits of
  869. offset */
  870. ulint len) /* in: length of the block to read or write */
  871. {
  872. os_aio_slot_t* slot;
  873. #ifdef WIN_ASYNC_IO
  874. OVERLAPPED* control;
  875. #elif defined(POSIX_ASYNC_IO)
  876. struct aiocb* control;
  877. #endif
  878. ulint i;
  879. loop:
  880. os_mutex_enter(array->mutex);
  881. if (array->n_reserved == array->n_slots) {
  882. os_mutex_exit(array->mutex);
  883. if (!os_aio_use_native_aio) {
  884. /* If the handler threads are suspended, wake them
  885. so that we get more slots */
  886. os_aio_simulated_wake_handler_threads();
  887. }
  888. os_event_wait(array->not_full);
  889. goto loop;
  890. }
  891. for (i = 0;; i++) {
  892. slot = os_aio_array_get_nth_slot(array, i);
  893. if (slot->reserved == FALSE) {
  894. break;
  895. }
  896. }
  897. array->n_reserved++;
  898. if (array->n_reserved == array->n_slots) {
  899. os_event_reset(array->not_full);
  900. }
  901. slot->reserved = TRUE;
  902. slot->message1 = message1;
  903. slot->message2 = message2;
  904. slot->file     = file;
  905. slot->name     = name;
  906. slot->len      = len;
  907. slot->type     = type;
  908. slot->buf      = buf;
  909. slot->offset   = offset;
  910. slot->offset_high = offset_high;
  911. slot->io_already_done = FALSE;
  912. #ifdef WIN_ASYNC_IO
  913. control = &(slot->control);
  914. control->Offset = (DWORD)offset;
  915. control->OffsetHigh = (DWORD)offset_high;
  916. os_event_reset(control->hEvent);
  917. #elif defined(POSIX_ASYNC_IO)
  918. #if (UNIV_WORD_SIZE == 8)
  919. offset = offset + (offset_high << 32);
  920. #else
  921. ut_a(offset_high == 0);
  922. #endif 
  923. control = &(slot->control);
  924. control->aio_fildes = file;
  925. control->aio_buf = buf;
  926. control->aio_nbytes = len;
  927. control->aio_offset = offset;
  928. control->aio_reqprio = 0;
  929. control->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
  930. control->aio_sigevent.sigev_signo =
  931. SIGRTMIN + 1 + os_aio_get_array_no(array);
  932. /* TODO: How to choose the signal numbers? */
  933. /*
  934. printf("AIO signal number %lun", (ulint) control->aio_sigevent.sigev_signo);
  935. */
  936. control->aio_sigevent.sigev_value.sival_ptr = slot;
  937. #endif
  938. os_mutex_exit(array->mutex);
  939. return(slot);
  940. }
  941. /***********************************************************************
  942. Frees a slot in the aio array. */
  943. static
  944. void
  945. os_aio_array_free_slot(
  946. /*===================*/
  947. os_aio_array_t* array, /* in: aio array */
  948. os_aio_slot_t* slot) /* in: pointer to slot */
  949. {
  950. ut_ad(array);
  951. ut_ad(slot);
  952. os_mutex_enter(array->mutex);
  953. ut_ad(slot->reserved);
  954. slot->reserved = FALSE;
  955. array->n_reserved--;
  956. if (array->n_reserved == array->n_slots - 1) {
  957. os_event_set(array->not_full);
  958. }
  959. #ifdef WIN_ASYNC_IO
  960. os_event_reset(slot->control.hEvent);
  961. #endif
  962. os_mutex_exit(array->mutex);
  963. }
  964. /**************************************************************************
  965. Wakes up a simulated aio i/o-handler thread if it has something to do. */
  966. static
  967. void
  968. os_aio_simulated_wake_handler_thread(
  969. /*=================================*/
  970. ulint global_segment) /* in: the number of the segment in the aio
  971. arrays */
  972. {
  973. os_aio_array_t* array;
  974. ulint segment;
  975. os_aio_slot_t* slot;
  976. ulint n;
  977. ulint i;
  978. ut_ad(!os_aio_use_native_aio);
  979. segment = os_aio_get_array_and_local_segment(&array, global_segment);
  980. n = array->n_slots / array->n_segments;
  981. /* Look through n slots after the segment * n'th slot */
  982. os_mutex_enter(array->mutex);
  983. for (i = 0; i < n; i++) {
  984. slot = os_aio_array_get_nth_slot(array, i + segment * n);
  985. if (slot->reserved) {
  986. /* Found an i/o request */
  987. break;
  988. }
  989. }
  990. os_mutex_exit(array->mutex);
  991. if (i < n) {
  992. os_event_set(os_aio_segment_wait_events[global_segment]);
  993. }
  994. }
  995. /**************************************************************************
  996. Wakes up simulated aio i/o-handler threads if they have something to do. */
  997. void
  998. os_aio_simulated_wake_handler_threads(void)
  999. /*=======================================*/
  1000. {
  1001. ulint i;
  1002. if (os_aio_use_native_aio) {
  1003. /* We do not use simulated aio: do nothing */
  1004. return;
  1005. }
  1006. for (i = 0; i < os_aio_n_segments; i++) {
  1007. os_aio_simulated_wake_handler_thread(i);
  1008. }
  1009. }
  1010. /***********************************************************************
  1011. Requests an asynchronous i/o operation. */
  1012. ibool
  1013. os_aio(
  1014. /*===*/
  1015. /* out: TRUE if request was queued
  1016. successfully, FALSE if fail */
  1017. ulint type, /* in: OS_FILE_READ or OS_FILE_WRITE */
  1018. ulint mode, /* in: OS_AIO_NORMAL, ..., possibly ORed
  1019. to OS_AIO_SIMULATED_WAKE_LATER: the
  1020. last flag advises this function not to wake
  1021. i/o-handler threads, but the caller will
  1022. do the waking explicitly later, in this
  1023. way the caller can post several requests in
  1024. a batch; NOTE that the batch must not be
  1025. so big that it exhausts the slots in aio
  1026. arrays! NOTE that a simulated batch
  1027. may introduce hidden chances of deadlocks,
  1028. because i/os are not actually handled until
  1029. all have been posted: use with great
  1030. caution! */
  1031. char* name, /* in: name of the file or path as a
  1032. null-terminated string */
  1033. os_file_t file, /* in: handle to a file */
  1034. void* buf, /* in: buffer where to read or from which
  1035. to write */
  1036. ulint offset, /* in: least significant 32 bits of file
  1037. offset where to read or write */
  1038. ulint offset_high, /* in: most significant 32 bits of
  1039. offset */
  1040. ulint n, /* in: number of bytes to read or write */
  1041. void* message1,/* in: messages for the aio handler (these
  1042. can be used to identify a completed aio
  1043. operation); if mode is OS_AIO_SYNC, these
  1044. are ignored */
  1045. void* message2)
  1046. {
  1047. os_aio_array_t* array;
  1048. os_aio_slot_t* slot;
  1049. #ifdef WIN_ASYNC_IO
  1050. BOOL ret = TRUE;
  1051. DWORD len = n;
  1052. void* dummy_mess1;
  1053. void* dummy_mess2;
  1054. #endif
  1055. ulint err = 0;
  1056. ibool retry;
  1057. ulint wake_later;
  1058. ut_ad(file);
  1059. ut_ad(buf);
  1060. ut_ad(n > 0);
  1061. ut_ad(n % OS_FILE_LOG_BLOCK_SIZE == 0);
  1062. ut_ad((ulint)buf % OS_FILE_LOG_BLOCK_SIZE == 0)
  1063. ut_ad(offset % OS_FILE_LOG_BLOCK_SIZE == 0);
  1064. ut_ad(os_aio_validate());
  1065. wake_later = mode & OS_AIO_SIMULATED_WAKE_LATER;
  1066. mode = mode & (~OS_AIO_SIMULATED_WAKE_LATER);
  1067. if (mode == OS_AIO_SYNC
  1068. #ifdef WIN_ASYNC_IO
  1069. && !os_aio_use_native_aio
  1070. #endif
  1071. ) {
  1072. /* This is actually an ordinary synchronous read or write:
  1073. no need to use an i/o-handler thread. NOTE that if we use
  1074. Windows async i/o, Windows does not allow us to use
  1075. ordinary synchronous os_file_read etc. on the same file,
  1076. therefore we have built a special mechanism for synchronous
  1077. wait in the Windows case. */
  1078. if (type == OS_FILE_READ) {
  1079. return(os_file_read(file, buf, offset, offset_high, n));
  1080. }
  1081. ut_a(type == OS_FILE_WRITE);
  1082. return(os_file_write(name, file, buf, offset, offset_high, n));
  1083. }
  1084. try_again:
  1085. if (mode == OS_AIO_NORMAL) {
  1086. if (type == OS_FILE_READ) {
  1087. array = os_aio_read_array;
  1088. } else {
  1089. array = os_aio_write_array;
  1090. }
  1091. } else if (mode == OS_AIO_IBUF) {
  1092. ut_ad(type == OS_FILE_READ);
  1093. array = os_aio_ibuf_array;
  1094. } else if (mode == OS_AIO_LOG) {
  1095. array = os_aio_log_array;
  1096. } else if (mode == OS_AIO_SYNC) {
  1097. array = os_aio_sync_array;
  1098. } else {
  1099. ut_error;
  1100. }
  1101. slot = os_aio_array_reserve_slot(type, array, message1, message2, file,
  1102. name, buf, offset, offset_high, n);
  1103. if (type == OS_FILE_READ) {
  1104. if (os_aio_use_native_aio) {
  1105. #ifdef WIN_ASYNC_IO
  1106. ret = ReadFile(file, buf, (DWORD)n, &len,
  1107. &(slot->control));
  1108. #elif defined(POSIX_ASYNC_IO)
  1109. slot->control.aio_lio_opcode = LIO_READ;
  1110. err = (ulint) aio_read(&(slot->control));
  1111. printf("Starting Posix aio read %lun", err);
  1112. #endif
  1113. } else {
  1114. if (!wake_later) {
  1115. os_aio_simulated_wake_handler_thread(
  1116.  os_aio_get_segment_no_from_slot(array, slot));
  1117. }
  1118. }
  1119. } else if (type == OS_FILE_WRITE) {
  1120. if (os_aio_use_native_aio) {
  1121. #ifdef WIN_ASYNC_IO
  1122. ret = WriteFile(file, buf, (DWORD)n, &len,
  1123. &(slot->control));
  1124. #elif defined(POSIX_ASYNC_IO)
  1125. slot->control.aio_lio_opcode = LIO_WRITE;
  1126. err = (ulint) aio_write(&(slot->control));
  1127. printf("Starting Posix aio write %lun", err);
  1128. #endif
  1129. } else {
  1130. if (!wake_later) {
  1131. os_aio_simulated_wake_handler_thread(
  1132.  os_aio_get_segment_no_from_slot(array, slot));
  1133. }
  1134. }
  1135. } else {
  1136. ut_error;
  1137. }
  1138. #ifdef WIN_ASYNC_IO
  1139. if (os_aio_use_native_aio) {
  1140. if ((ret && len == n)
  1141. || (!ret && GetLastError() == ERROR_IO_PENDING)) {
  1142. /* aio was queued successfully! */
  1143.      if (mode == OS_AIO_SYNC) {
  1144.          /* We want a synchronous i/o operation on a file
  1145.          where we also use async i/o: in Windows we must
  1146.          use the same wait mechanism as for async i/o */
  1147.     
  1148.          return(os_aio_windows_handle(ULINT_UNDEFINED,
  1149. slot->pos,
  1150.      &dummy_mess1, &dummy_mess2));
  1151.      }
  1152. return(TRUE);
  1153. }
  1154. goto error_handling;
  1155. }
  1156. #endif
  1157. if (err == 0) {
  1158. /* aio was queued successfully! */
  1159. return(TRUE);
  1160. }
  1161. os_aio_array_free_slot(array, slot);
  1162. retry = os_file_handle_error(file, name);
  1163. if (retry) {
  1164. goto try_again;
  1165. }
  1166. ut_error;
  1167. return(FALSE);
  1168. }
  1169. #ifdef WIN_ASYNC_IO
  1170. /**************************************************************************
  1171. This function is only used in Windows asynchronous i/o.
  1172. Waits for an aio operation to complete. This function is used to wait the
  1173. for completed requests. The aio array of pending requests is divided
  1174. into segments. The thread specifies which segment or slot it wants to wait
  1175. for. NOTE: this function will also take care of freeing the aio slot,
  1176. therefore no other thread is allowed to do the freeing! */
  1177. ibool
  1178. os_aio_windows_handle(
  1179. /*==================*/
  1180. /* out: TRUE if the aio operation succeeded */
  1181. ulint segment, /* in: the number of the segment in the aio
  1182. arrays to wait for; segment 0 is the ibuf
  1183. i/o thread, segment 1 the log i/o thread,
  1184. then follow the non-ibuf read threads, and as
  1185. the last are the non-ibuf write threads; if
  1186. this is ULINT_UNDEFINED, then it means that
  1187. sync aio is used, and this parameter is
  1188. ignored */
  1189. ulint pos, /* this parameter is used only in sync aio:
  1190. wait for the aio slot at this position */  
  1191. void** message1, /* out: the messages passed with the aio
  1192. request; note that also in the case where
  1193. the aio operation failed, these output
  1194. parameters are valid and can be used to
  1195. restart the operation, for example */
  1196. void** message2)
  1197. {
  1198. os_aio_array_t* array;
  1199. os_aio_slot_t* slot;
  1200. ulint n;
  1201. ulint i;
  1202. ibool ret_val;
  1203. ulint err;
  1204. BOOL ret;
  1205. DWORD len;
  1206. if (segment == ULINT_UNDEFINED) {
  1207. array = os_aio_sync_array;
  1208. segment = 0;
  1209. } else {
  1210. segment = os_aio_get_array_and_local_segment(&array, segment);
  1211. }
  1212. /* NOTE! We only access constant fields in os_aio_array. Therefore
  1213. we do not have to acquire the protecting mutex yet */
  1214. ut_ad(os_aio_validate());
  1215. ut_ad(segment < array->n_segments);
  1216. n = array->n_slots / array->n_segments;
  1217. if (array == os_aio_sync_array) {
  1218. ut_ad(pos < array->n_slots); 
  1219. os_event_wait(array->events[pos]);
  1220. i = pos;
  1221. } else {
  1222. i = os_event_wait_multiple(n, (array->events) + segment * n);
  1223. }
  1224. os_mutex_enter(array->mutex);
  1225. slot = os_aio_array_get_nth_slot(array, i + segment * n);
  1226. ut_a(slot->reserved);
  1227. ret = GetOverlappedResult(slot->file, &(slot->control), &len, TRUE);
  1228. *message1 = slot->message1;
  1229. *message2 = slot->message2;
  1230. if (ret && len == slot->len) {
  1231. ret_val = TRUE;
  1232. } else {
  1233. err = GetLastError();
  1234. ut_error;
  1235. ret_val = FALSE;
  1236. }   
  1237. os_mutex_exit(array->mutex);
  1238. os_aio_array_free_slot(array, slot);
  1239. return(ret_val);
  1240. }
  1241. #endif
  1242. #ifdef POSIX_ASYNC_IO
  1243. /**************************************************************************
  1244. This function is only used in Posix asynchronous i/o. Waits for an aio
  1245. operation to complete. */
  1246. ibool
  1247. os_aio_posix_handle(
  1248. /*================*/
  1249. /* out: TRUE if the aio operation succeeded */
  1250. ulint array_no, /* in: array number 0 - 3 */
  1251. void** message1, /* out: the messages passed with the aio
  1252. request; note that also in the case where
  1253. the aio operation failed, these output
  1254. parameters are valid and can be used to
  1255. restart the operation, for example */
  1256. void** message2)
  1257. {
  1258. os_aio_array_t* array;
  1259. os_aio_slot_t* slot;
  1260. siginfo_t info;
  1261. sigset_t sigset;
  1262. sigset_t        proc_sigset;
  1263. sigset_t        thr_sigset;
  1264. int ret;
  1265. int             i;
  1266. int             sig;
  1267.         sigemptyset(&sigset);
  1268. sigaddset(&sigset, SIGRTMIN + 1 + array_no);
  1269. pthread_sigmask(SIG_UNBLOCK, &sigset, NULL);
  1270. /*
  1271. sigprocmask(0, NULL, &proc_sigset);
  1272. pthread_sigmask(0, NULL, &thr_sigset);
  1273. for (i = 32 ; i < 40; i++) {
  1274.   printf("%lu : %lu %lun", (ulint)i,
  1275.  (ulint)sigismember(&proc_sigset, i),
  1276.  (ulint)sigismember(&thr_sigset, i));
  1277. }
  1278. */
  1279. ret = sigwaitinfo(&sigset, &info);
  1280. if (sig != SIGRTMIN + 1 + array_no) {
  1281. ut_a(0);
  1282. return(FALSE);
  1283. }
  1284. printf("Handling Posix aion");
  1285. array = os_aio_get_array_from_no(array_no);
  1286. os_mutex_enter(array->mutex);
  1287. slot = info.si_value.sival_ptr;
  1288. ut_a(slot->reserved);
  1289. *message1 = slot->message1;
  1290. *message2 = slot->message2;
  1291. os_mutex_exit(array->mutex);
  1292. os_aio_array_free_slot(array, slot);
  1293. return(TRUE);
  1294. }
  1295. #endif
  1296. /**************************************************************************
  1297. Does simulated aio. This function should be called by an i/o-handler
  1298. thread. */
  1299. ibool
  1300. os_aio_simulated_handle(
  1301. /*====================*/
  1302. /* out: TRUE if the aio operation succeeded */
  1303. ulint global_segment, /* in: the number of the segment in the aio
  1304. arrays to wait for; segment 0 is the ibuf
  1305. i/o thread, segment 1 the log i/o thread,
  1306. then follow the non-ibuf read threads, and as
  1307. the last are the non-ibuf write threads */
  1308. void** message1, /* out: the messages passed with the aio
  1309. request; note that also in the case where
  1310. the aio operation failed, these output
  1311. parameters are valid and can be used to
  1312. restart the operation, for example */
  1313. void** message2)
  1314. {
  1315. os_aio_array_t* array;
  1316. ulint segment;
  1317. os_aio_slot_t* slot;
  1318. os_aio_slot_t* slot2;
  1319. os_aio_slot_t* consecutive_ios[OS_AIO_MERGE_N_CONSECUTIVE];
  1320. ulint n_consecutive;
  1321. ulint total_len;
  1322. ulint offs;
  1323. ulint lowest_offset;
  1324. byte* combined_buf;
  1325. ibool ret;
  1326. ulint n;
  1327. ulint i;
  1328. segment = os_aio_get_array_and_local_segment(&array, global_segment);
  1329. restart:
  1330. /* Give other threads chance to add several i/os to the array
  1331. at once */
  1332. os_thread_yield();
  1333. /* NOTE! We only access constant fields in os_aio_array. Therefore
  1334. we do not have to acquire the protecting mutex yet */
  1335. ut_ad(os_aio_validate());
  1336. ut_ad(segment < array->n_segments);
  1337. n = array->n_slots / array->n_segments;
  1338. /* Look through n slots after the segment * n'th slot */
  1339. os_mutex_enter(array->mutex);
  1340. /* Check if there is a slot for which the i/o has already been
  1341. done */
  1342. for (i = 0; i < n; i++) {
  1343. slot = os_aio_array_get_nth_slot(array, i + segment * n);
  1344. if (slot->reserved && slot->io_already_done) {
  1345. ret = TRUE;
  1346. goto slot_io_done;
  1347. }
  1348. }
  1349. n_consecutive = 0;
  1350. /* Look for an i/o request at the lowest offset in the array */
  1351. lowest_offset = ULINT_MAX;
  1352. for (i = 0; i < n; i++) {
  1353. slot = os_aio_array_get_nth_slot(array, i + segment * n);
  1354. if (slot->reserved && slot->offset < lowest_offset) {
  1355. /* Found an i/o request */
  1356. consecutive_ios[0] = slot;
  1357. n_consecutive = 1;
  1358. lowest_offset = slot->offset;
  1359. }
  1360. }
  1361. if (n_consecutive == 0) {
  1362. /* No i/o requested at the moment */
  1363. goto wait_for_io;
  1364. }
  1365. slot = consecutive_ios[0];
  1366. /* Check if there are several consecutive blocks to read or write */
  1367. consecutive_loop:
  1368. for (i = 0; i < n; i++) {
  1369. slot2 = os_aio_array_get_nth_slot(array, i + segment * n);
  1370. if (slot2->reserved && slot2 != slot
  1371.     && slot2->offset == slot->offset + slot->len
  1372.     && slot->offset + slot->len > slot->offset /* check that
  1373. sum does not wrap over */
  1374.     && slot2->offset_high == slot->offset_high
  1375.     && slot2->type == slot->type
  1376.     && slot2->file == slot->file) {
  1377. /* Found a consecutive i/o request */
  1378. consecutive_ios[n_consecutive] = slot2;
  1379. n_consecutive++;
  1380. slot = slot2;
  1381. if (n_consecutive < OS_AIO_MERGE_N_CONSECUTIVE) {
  1382. goto consecutive_loop;
  1383. } else {
  1384. break;
  1385. }
  1386. }
  1387. }
  1388. /* We have now collected n_consecutive i/o requests in the array;
  1389. allocate a single buffer which can hold all data, and perform the
  1390. i/o */
  1391. total_len = 0;
  1392. slot = consecutive_ios[0];
  1393. for (i = 0; i < n_consecutive; i++) {
  1394. total_len += consecutive_ios[i]->len;
  1395. }
  1396. if (n_consecutive == 1) {
  1397. /* We can use the buffer of the i/o request */
  1398. combined_buf = slot->buf;
  1399. } else {
  1400. combined_buf = ut_malloc(total_len);
  1401. ut_a(combined_buf);
  1402. }
  1403. /* We release the array mutex for the time of the i/o: NOTE that
  1404. this assumes that there is just one i/o-handler thread serving
  1405. a single segment of slots! */
  1406. os_mutex_exit(array->mutex);
  1407. if (slot->type == OS_FILE_WRITE && n_consecutive > 1) {
  1408. /* Copy the buffers to the combined buffer */
  1409. offs = 0;
  1410. for (i = 0; i < n_consecutive; i++) {
  1411. ut_memcpy(combined_buf + offs, consecutive_ios[i]->buf,
  1412. consecutive_ios[i]->len);
  1413. offs += consecutive_ios[i]->len;
  1414. }
  1415. }
  1416. /* Do the i/o with ordinary, synchronous i/o functions: */
  1417. if (slot->type == OS_FILE_WRITE) {
  1418. ret = os_file_write(slot->name, slot->file, combined_buf,
  1419. slot->offset, slot->offset_high, total_len);
  1420. } else {
  1421. ret = os_file_read(slot->file, combined_buf,
  1422. slot->offset, slot->offset_high, total_len);
  1423. }
  1424. ut_a(ret);
  1425. /* printf("aio: %lu consecutive %lu:th segment, first offs %lu blocksn",
  1426. n_consecutive, global_segment, slot->offset
  1427. / UNIV_PAGE_SIZE); */
  1428. if (slot->type == OS_FILE_READ && n_consecutive > 1) {
  1429. /* Copy the combined buffer to individual buffers */
  1430. offs = 0;
  1431. for (i = 0; i < n_consecutive; i++) {
  1432. ut_memcpy(consecutive_ios[i]->buf, combined_buf + offs, 
  1433. consecutive_ios[i]->len);
  1434. offs += consecutive_ios[i]->len;
  1435. }
  1436. }
  1437. if (n_consecutive > 1) {
  1438. ut_free(combined_buf);
  1439. }
  1440. os_mutex_enter(array->mutex);
  1441. /* Mark the i/os done in slots */
  1442. for (i = 0; i < n_consecutive; i++) {
  1443. consecutive_ios[i]->io_already_done = TRUE;
  1444. }
  1445. /* We return the messages for the first slot now, and if there were
  1446. several slots, the messages will be returned with subsequent calls
  1447. of this function */
  1448. slot_io_done:
  1449. ut_a(slot->reserved);
  1450. *message1 = slot->message1;
  1451. *message2 = slot->message2;
  1452. os_mutex_exit(array->mutex);
  1453. os_aio_array_free_slot(array, slot);
  1454. return(ret);
  1455. wait_for_io:
  1456. /* We wait here until there again can be i/os in the segment
  1457. of this thread */
  1458. os_event_reset(os_aio_segment_wait_events[global_segment]);
  1459. os_mutex_exit(array->mutex);
  1460. os_event_wait(os_aio_segment_wait_events[global_segment]);
  1461. goto restart;
  1462. }
  1463. /**************************************************************************
  1464. Validates the consistency of an aio array. */
  1465. static
  1466. ibool
  1467. os_aio_array_validate(
  1468. /*==================*/
  1469. /* out: TRUE if ok */
  1470. os_aio_array_t* array) /* in: aio wait array */
  1471. {
  1472. os_aio_slot_t* slot;
  1473. ulint n_reserved = 0;
  1474. ulint i;
  1475. ut_a(array);
  1476. os_mutex_enter(array->mutex);
  1477. ut_a(array->n_slots > 0);
  1478. ut_a(array->n_segments > 0);
  1479. for (i = 0; i < array->n_slots; i++) {
  1480. slot = os_aio_array_get_nth_slot(array, i);
  1481. if (slot->reserved) {
  1482. n_reserved++;
  1483. ut_a(slot->len > 0);
  1484. }
  1485. }
  1486. ut_a(array->n_reserved == n_reserved);
  1487. os_mutex_exit(array->mutex);
  1488. return(TRUE);
  1489. }
  1490. /**************************************************************************
  1491. Validates the consistency the aio system. */
  1492. ibool
  1493. os_aio_validate(void)
  1494. /*=================*/
  1495. /* out: TRUE if ok */
  1496. {
  1497. os_aio_array_validate(os_aio_read_array);
  1498. os_aio_array_validate(os_aio_write_array);
  1499. os_aio_array_validate(os_aio_ibuf_array);
  1500. os_aio_array_validate(os_aio_log_array);
  1501. os_aio_array_validate(os_aio_sync_array);
  1502. return(TRUE);
  1503. }
  1504. /**************************************************************************
  1505. Prints info of the aio arrays. */
  1506. void
  1507. os_aio_print(void)
  1508. /*==============*/
  1509. {
  1510. os_aio_array_t* array;
  1511. os_aio_slot_t* slot;
  1512. ulint n_reserved;
  1513. ulint i;
  1514. array = os_aio_read_array;
  1515. loop:
  1516. ut_a(array);
  1517. printf("INFO OF AN AIO ARRAYn");
  1518. os_mutex_enter(array->mutex);
  1519. ut_a(array->n_slots > 0);
  1520. ut_a(array->n_segments > 0);
  1521. n_reserved = 0;
  1522. for (i = 0; i < array->n_slots; i++) {
  1523. slot = os_aio_array_get_nth_slot(array, i);
  1524. if (slot->reserved) {
  1525. n_reserved++;
  1526. printf("Reserved slot, messages %lx %lxn",
  1527. (ulint)slot->message1,
  1528. (ulint)slot->message2);
  1529. ut_a(slot->len > 0);
  1530. }
  1531. }
  1532. ut_a(array->n_reserved == n_reserved);
  1533. printf("Total of %lu reserved aio slotsn", n_reserved);
  1534. os_mutex_exit(array->mutex);
  1535. if (array == os_aio_read_array) {
  1536. array = os_aio_write_array;
  1537. goto loop;
  1538. }
  1539. if (array == os_aio_write_array) {
  1540. array = os_aio_ibuf_array;
  1541. goto loop;
  1542. }
  1543. if (array == os_aio_ibuf_array) {
  1544. array = os_aio_log_array;
  1545. goto loop;
  1546. }
  1547. if (array == os_aio_log_array) {
  1548. array = os_aio_sync_array;
  1549. goto loop;
  1550. }
  1551. }
  1552. /**************************************************************************
  1553. Checks that all slots in the system have been freed, that is, there are
  1554. no pending io operations. */
  1555. ibool
  1556. os_aio_all_slots_free(void)
  1557. /*=======================*/
  1558. /* out: TRUE if all free */
  1559. {
  1560. os_aio_array_t* array;
  1561. ulint n_res = 0;
  1562. array = os_aio_read_array;
  1563. os_mutex_enter(array->mutex);
  1564. n_res += array->n_reserved; 
  1565. os_mutex_exit(array->mutex);
  1566. array = os_aio_write_array;
  1567. os_mutex_enter(array->mutex);
  1568. n_res += array->n_reserved; 
  1569. os_mutex_exit(array->mutex);
  1570. array = os_aio_ibuf_array;
  1571. os_mutex_enter(array->mutex);
  1572. n_res += array->n_reserved; 
  1573. os_mutex_exit(array->mutex);
  1574. array = os_aio_log_array;
  1575. os_mutex_enter(array->mutex);
  1576. n_res += array->n_reserved; 
  1577. os_mutex_exit(array->mutex);
  1578. array = os_aio_sync_array;
  1579. os_mutex_enter(array->mutex);
  1580. n_res += array->n_reserved; 
  1581. os_mutex_exit(array->mutex);
  1582. if (n_res == 0) {
  1583. return(TRUE);
  1584. }
  1585. return(FALSE);
  1586. }