aiops.c
上传用户:liugui
上传日期:2007-01-04
资源大小:822k
文件大小:22k
源码类别:

代理服务器

开发平台:

Unix_Linux

  1. /*
  2.  * $Id: aiops.c,v 1.25 1999/01/24 02:26:20 wessels Exp $
  3.  *
  4.  * DEBUG: section 43    AIOPS
  5.  * AUTHOR: Stewart Forster <slf@connect.com.au>
  6.  *
  7.  * SQUID Internet Object Cache  http://squid.nlanr.net/Squid/
  8.  * ----------------------------------------------------------
  9.  *
  10.  *  Squid is the result of efforts by numerous individuals from the
  11.  *  Internet community.  Development is led by Duane Wessels of the
  12.  *  National Laboratory for Applied Network Research and funded by the
  13.  *  National Science Foundation.  Squid is Copyrighted (C) 1998 by
  14.  *  Duane Wessels and the University of California San Diego.  Please
  15.  *  see the COPYRIGHT file for full details.  Squid incorporates
  16.  *  software developed and/or copyrighted by other sources.  Please see
  17.  *  the CREDITS file for full details.
  18.  *
  19.  *  This program is free software; you can redistribute it and/or modify
  20.  *  it under the terms of the GNU General Public License as published by
  21.  *  the Free Software Foundation; either version 2 of the License, or
  22.  *  (at your option) any later version.
  23.  *  
  24.  *  This program is distributed in the hope that it will be useful,
  25.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  26.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  27.  *  GNU General Public License for more details.
  28.  *  
  29.  *  You should have received a copy of the GNU General Public License
  30.  *  along with this program; if not, write to the Free Software
  31.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  32.  *
  33.  */
  34. #include "squid.h"
  35. #include <stdio.h>
  36. #include <sys/types.h>
  37. #include <sys/stat.h>
  38. #include <fcntl.h>
  39. #include <pthread.h>
  40. #include <errno.h>
  41. #include <dirent.h>
  42. #include <signal.h>
  43. #if HAVE_SCHED_H
  44. #include <sched.h>
  45. #endif
  46. #ifndef NUMTHREADS
  47. #define NUMTHREADS 16
  48. #endif
  49. #define RIDICULOUS_LENGTH 4096
  50. enum _aio_thread_status {
  51.     _THREAD_STARTING = 0,
  52.     _THREAD_WAITING,
  53.     _THREAD_BUSY,
  54.     _THREAD_FAILED,
  55.     _THREAD_DONE
  56. };
  57. enum _aio_request_type {
  58.     _AIO_OP_NONE = 0,
  59.     _AIO_OP_OPEN,
  60.     _AIO_OP_READ,
  61.     _AIO_OP_WRITE,
  62.     _AIO_OP_CLOSE,
  63.     _AIO_OP_UNLINK,
  64.     _AIO_OP_OPENDIR,
  65.     _AIO_OP_STAT
  66. };
  67. typedef struct aio_request_t {
  68.     enum _aio_request_type request_type;
  69.     int cancelled;
  70.     char *path;
  71.     int oflag;
  72.     mode_t mode;
  73.     int fd;
  74.     char *bufferp;
  75.     char *tmpbufp;
  76.     int buflen;
  77.     off_t offset;
  78.     int whence;
  79.     int ret;
  80.     int err;
  81.     struct stat *tmpstatp;
  82.     struct stat *statp;
  83.     aio_result_t *resultp;
  84.     struct aio_request_t *next;
  85. } aio_request_t;
  86. typedef struct aio_thread_t {
  87.     pthread_t thread;
  88.     enum _aio_thread_status status;
  89.     pthread_mutex_t mutex; /* Mutex for testing condition variable */
  90.     pthread_cond_t cond; /* Condition variable */
  91.     struct aio_request_t *volatile req; /* set by main, cleared by thread */
  92.     struct aio_request_t *processed_req; /* reminder to main */
  93.     struct aio_thread_t *next;
  94. } aio_thread_t;
  95. int aio_cancel(aio_result_t *);
  96. int aio_open(const char *, int, mode_t, aio_result_t *);
  97. int aio_read(int, char *, int, off_t, int, aio_result_t *);
  98. int aio_write(int, char *, int, off_t, int, aio_result_t *);
  99. int aio_close(int, aio_result_t *);
  100. int aio_unlink(const char *, aio_result_t *);
  101. int aio_opendir(const char *, aio_result_t *);
  102. aio_result_t *aio_poll_done();
  103. static void aio_init(void);
  104. static void aio_queue_request(aio_request_t *);
  105. static void aio_process_request_queue(void);
  106. static void aio_cleanup_request(aio_request_t *);
  107. static void *aio_thread_loop(void *);
  108. static void aio_do_open(aio_request_t *);
  109. static void aio_do_read(aio_request_t *);
  110. static void aio_do_write(aio_request_t *);
  111. static void aio_do_close(aio_request_t *);
  112. static void aio_do_stat(aio_request_t *);
  113. static void aio_do_unlink(aio_request_t *);
  114. #if AIO_OPENDIR
  115. static void *aio_do_opendir(aio_request_t *);
  116. #endif
  117. static void aio_debug(aio_request_t *);
  118. static void aio_poll_threads(void);
  119. static aio_thread_t *threads;
  120. static int aio_initialised = 0;
  121. static int request_queue_len = 0;
  122. static MemPool *aio_request_pool = NULL;
  123. static aio_request_t *request_queue_head = NULL;
  124. static aio_request_t *request_queue_tail = NULL;
  125. static aio_request_t *request_done_head = NULL;
  126. static aio_request_t *request_done_tail = NULL;
  127. static aio_thread_t *wait_threads = NULL;
  128. static aio_thread_t *busy_threads_head = NULL;
  129. static aio_thread_t *busy_threads_tail = NULL;
  130. static pthread_attr_t globattr;
  131. static struct sched_param globsched;
  132. static pthread_t main_thread;
  133. static void
  134. aio_init(void)
  135. {
  136.     int i;
  137.     aio_thread_t *threadp;
  138.     if (aio_initialised)
  139. return;
  140.     pthread_attr_init(&globattr);
  141. #if HAVE_PTHREAD_ATTR_SETSCOPE
  142.     pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
  143. #endif
  144.     globsched.sched_priority = 1;
  145.     main_thread = pthread_self();
  146. #if HAVE_PTHREAD_SETSCHEDPARAM
  147.     pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
  148. #endif
  149.     globsched.sched_priority = 2;
  150. #if HAVE_PTHREAD_ATTR_SETSCHEDPARAM
  151.     pthread_attr_setschedparam(&globattr, &globsched);
  152. #endif
  153.     /* Create threads and get them to sit in their wait loop */
  154.     threads = xcalloc(NUMTHREADS, sizeof(aio_thread_t));
  155.     for (i = 0; i < NUMTHREADS; i++) {
  156. threadp = &threads[i];
  157. threadp->status = _THREAD_STARTING;
  158. if (pthread_mutex_init(&(threadp->mutex), NULL)) {
  159.     threadp->status = _THREAD_FAILED;
  160.     continue;
  161. }
  162. if (pthread_cond_init(&(threadp->cond), NULL)) {
  163.     threadp->status = _THREAD_FAILED;
  164.     continue;
  165. }
  166. threadp->req = NULL;
  167. threadp->processed_req = NULL;
  168. if (pthread_create(&(threadp->thread), &globattr, aio_thread_loop, threadp)) {
  169.     fprintf(stderr, "Thread creation failedn");
  170.     threadp->status = _THREAD_FAILED;
  171.     continue;
  172. }
  173. threadp->next = wait_threads;
  174. wait_threads = threadp;
  175. #if AIO_PROPER_MUTEX
  176. pthread_mutex_lock(&threadp->mutex);
  177. #endif
  178.     }
  179.     /* Create request pool */
  180.     aio_request_pool = memPoolCreate("aio_request", sizeof(aio_request_t));
  181.     aio_initialised = 1;
  182. }
  183. static void *
  184. aio_thread_loop(void *ptr)
  185. {
  186.     aio_thread_t *threadp = (aio_thread_t *) ptr;
  187.     aio_request_t *request;
  188.     sigset_t new;
  189. #if !AIO_PROPER_MUTEX
  190.     struct timespec wait_time;
  191. #endif
  192.     /* Make sure to ignore signals which may possibly get sent to the parent */
  193.     /* squid thread.  Causes havoc with mutex's and condition waits otherwise */
  194.     sigemptyset(&new);
  195.     sigaddset(&new, SIGPIPE);
  196.     sigaddset(&new, SIGCHLD);
  197. #ifdef _SQUID_LINUX_THREADS_
  198.     sigaddset(&new, SIGQUIT);
  199.     sigaddset(&new, SIGTRAP);
  200. #else
  201.     sigaddset(&new, SIGUSR1);
  202.     sigaddset(&new, SIGUSR2);
  203. #endif
  204.     sigaddset(&new, SIGHUP);
  205.     sigaddset(&new, SIGTERM);
  206.     sigaddset(&new, SIGINT);
  207.     sigaddset(&new, SIGALRM);
  208.     pthread_sigmask(SIG_BLOCK, &new, NULL);
  209.     pthread_mutex_lock(&threadp->mutex);
  210.     while (1) {
  211. #if AIO_PROPER_MUTEX
  212. while (threadp->req == NULL) {
  213.     threadp->status = _THREAD_WAITING;
  214.     pthread_cond_wait(&(threadp->cond), &(threadp->mutex));
  215. }
  216. #else
  217. /* The timeout is used to unlock the race condition where
  218.  * ->req is set between the check and pthread_cond_wait.
  219.  * The thread steps it's own clock on each timeout, to avoid a CPU
  220.  * spin situation if the main thread is suspended (paging), and
  221.  * squid_curtime is not being updated timely.
  222.  */
  223. wait_time.tv_sec = squid_curtime + 1; /* little quicker first time */
  224. wait_time.tv_nsec = 0;
  225. while (threadp->req == NULL) {
  226.     threadp->status = _THREAD_WAITING;
  227.     pthread_cond_timedwait(&(threadp->cond), &(threadp->mutex),
  228. &wait_time);
  229.     wait_time.tv_sec += 3; /* then wait 3 seconds between each check */
  230. }
  231. #endif
  232. request = threadp->req;
  233. errno = 0;
  234. if (!request->cancelled) {
  235.     switch (request->request_type) {
  236.     case _AIO_OP_OPEN:
  237. aio_do_open(request);
  238. break;
  239.     case _AIO_OP_READ:
  240. aio_do_read(request);
  241. break;
  242.     case _AIO_OP_WRITE:
  243. aio_do_write(request);
  244. break;
  245.     case _AIO_OP_CLOSE:
  246. aio_do_close(request);
  247. break;
  248.     case _AIO_OP_UNLINK:
  249. aio_do_unlink(request);
  250. break;
  251. #if AIO_OPENDIR /* Opendir not implemented yet */
  252.     case _AIO_OP_OPENDIR:
  253. aio_do_opendir(request);
  254. break;
  255. #endif
  256.     case _AIO_OP_STAT:
  257. aio_do_stat(request);
  258. break;
  259.     default:
  260. request->ret = -1;
  261. request->err = EINVAL;
  262. break;
  263.     }
  264. } else { /* cancelled */
  265.     request->ret = -1;
  266.     request->err = EINTR;
  267. }
  268. threadp->req = NULL; /* tells main thread that we are done */
  269.     } /* while */
  270. } /* aio_thread_loop */
  271. static void
  272. aio_do_request(aio_request_t * requestp)
  273. {
  274.     if (wait_threads == NULL && busy_threads_head == NULL) {
  275. fprintf(stderr, "PANIC: No threads to service requests with!n");
  276. exit(-1);
  277.     }
  278.     aio_queue_request(requestp);
  279. } /* aio_do_request */
  280. static void
  281. aio_queue_request(aio_request_t * requestp)
  282. {
  283.     aio_request_t *rp;
  284.     static int last_warn = 0;
  285.     static int high_start = 0;
  286.     static int queue_high, queue_low;
  287.     int i;
  288.     /* Mark it as not executed (failing result, no error) */
  289.     requestp->ret = -1;
  290.     requestp->err = 0;
  291.     /* Queue it on the request queue */
  292.     if (request_queue_head == NULL) {
  293. request_queue_head = requestp;
  294. request_queue_tail = requestp;
  295.     } else {
  296. request_queue_tail->next = requestp;
  297. request_queue_tail = requestp;
  298.     }
  299.     requestp->next = NULL;
  300.     request_queue_len += 1;
  301.     /* Poll done threads if needed */
  302.     if (wait_threads == NULL)
  303. aio_poll_threads();
  304.     /* Kick it rolling */
  305.     aio_process_request_queue();
  306.     /* Warn if out of threads */
  307.     if (request_queue_len > (NUMTHREADS >> 1)) {
  308. if (high_start == 0) {
  309.     high_start = squid_curtime;
  310.     queue_high = request_queue_len;
  311.     queue_low = request_queue_len;
  312. }
  313. if (request_queue_len > queue_high)
  314.     queue_high = request_queue_len;
  315. if (request_queue_len < queue_low)
  316.     queue_low = request_queue_len;
  317. if (squid_curtime >= (last_warn + 15) &&
  318.     squid_curtime >= (high_start + 1)) {
  319.     debug(43, 1) ("aio_queue_request: WARNING - Running out of I/O theadsn");
  320.     debug(43, 2) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%dn",
  321. request_queue_len, queue_high, queue_low, squid_curtime - high_start);
  322.     debug(43, 1) ("aio_queue_request: Perhaps you should increase NUMTHREADSn");
  323.     debug(43, 1) ("aio_queue_request: Or install more disks to share the loadn");
  324.     debug(43, 3) ("aio_queue_request: First %d items on request queuen", NUMTHREADS);
  325.     rp = request_queue_head;
  326.     for (i = 1; i <= NUMTHREADS; i++) {
  327. switch (rp->request_type) {
  328. case _AIO_OP_OPEN:
  329.     debug(43, 3) ("aio_queue_request: %d : open -> %sn", i, rp->path);
  330.     break;
  331. case _AIO_OP_READ:
  332.     debug(43, 3) ("aio_queue_request: %d : read -> FD = %dn", i, rp->fd);
  333.     break;
  334. case _AIO_OP_WRITE:
  335.     debug(43, 3) ("aio_queue_request: %d : write -> FD = %dn", i, rp->fd);
  336.     break;
  337. case _AIO_OP_CLOSE:
  338.     debug(43, 3) ("aio_queue_request: %d : close -> FD = %dn", i, rp->fd);
  339.     break;
  340. case _AIO_OP_UNLINK:
  341.     debug(43, 3) ("aio_queue_request: %d : unlink -> %sn", i, rp->path);
  342.     break;
  343. case _AIO_OP_STAT:
  344.     debug(43, 3) ("aio_queue_request: %d : stat -> %sn", i, rp->path);
  345.     break;
  346. default:
  347.     debug(43, 1) ("aio_queue_request: %d : Unimplemented request type: %dn", i, rp->request_type);
  348.     break;
  349. }
  350. if ((rp = rp->next) == NULL)
  351.     break;
  352.     }
  353.     last_warn = squid_curtime;
  354. }
  355.     } else {
  356. high_start = 0;
  357.     }
  358.     if (request_queue_len > RIDICULOUS_LENGTH) {
  359. debug(43, 0) ("aio_queue_request: Async request queue growing uncontrollably!n");
  360. debug(43, 0) ("aio_queue_request: Possible infinite loop somewhere in squid. Restarting...n");
  361. abort();
  362.     }
  363. } /* aio_queue_request */
  364. static void
  365. aio_process_request_queue()
  366. {
  367.     aio_thread_t *threadp;
  368.     aio_request_t *requestp;
  369.     for (;;) {
  370. if (wait_threads == NULL || request_queue_head == NULL)
  371.     return;
  372. requestp = request_queue_head;
  373. if ((request_queue_head = requestp->next) == NULL)
  374.     request_queue_tail = NULL;
  375. requestp->next = NULL;
  376. request_queue_len--;
  377. if (requestp->cancelled) {
  378.     aio_cleanup_request(requestp);
  379.     continue;
  380. }
  381. threadp = wait_threads;
  382. wait_threads = threadp->next;
  383. threadp->next = NULL;
  384. if (busy_threads_head != NULL)
  385.     busy_threads_tail->next = threadp;
  386. else
  387.     busy_threads_head = threadp;
  388. busy_threads_tail = threadp;
  389. threadp->status = _THREAD_BUSY;
  390. threadp->req = threadp->processed_req = requestp;
  391. pthread_cond_signal(&(threadp->cond));
  392. #if AIO_PROPER_MUTEX
  393. pthread_mutex_unlock(&threadp->mutex);
  394. #endif
  395.     }
  396. } /* aio_process_request_queue */
  397. static void
  398. aio_cleanup_request(aio_request_t * requestp)
  399. {
  400.     aio_result_t *resultp = requestp->resultp;
  401.     int cancelled = requestp->cancelled;
  402.     /* Free allocated structures and copy data back to user space if the */
  403.     /* request hasn't been cancelled */
  404.     switch (requestp->request_type) {
  405.     case _AIO_OP_STAT:
  406. if (!cancelled && requestp->ret == 0)
  407.     xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
  408. xfree(requestp->tmpstatp);
  409.     case _AIO_OP_OPEN:
  410. if (cancelled && requestp->ret >= 0)
  411.     /* The open() was cancelled but completed */
  412.     close(requestp->ret);
  413. xfree(requestp->path);
  414. break;
  415.     case _AIO_OP_CLOSE:
  416. if (cancelled && requestp->ret < 0)
  417.     /* The close() was cancelled and never got executed */
  418.     close(requestp->fd);
  419. break;
  420.     case _AIO_OP_UNLINK:
  421.     case _AIO_OP_OPENDIR:
  422. xfree(requestp->path);
  423. break;
  424.     case _AIO_OP_READ:
  425. if (!cancelled && requestp->ret > 0)
  426.     xmemcpy(requestp->bufferp, requestp->tmpbufp, requestp->ret);
  427.     case _AIO_OP_WRITE:
  428. xfree(requestp->tmpbufp);
  429. break;
  430.     default:
  431. break;
  432.     }
  433.     if (resultp != NULL && !cancelled) {
  434. resultp->aio_return = requestp->ret;
  435. resultp->aio_errno = requestp->err;
  436.     }
  437.     memPoolFree(aio_request_pool, requestp);
  438. } /* aio_cleanup_request */
  439. int
  440. aio_cancel(aio_result_t * resultp)
  441. {
  442.     aio_thread_t *threadp;
  443.     aio_request_t *requestp;
  444.     for (threadp = busy_threads_head; threadp != NULL; threadp = threadp->next)
  445. if (threadp->processed_req->resultp == resultp) {
  446.     threadp->processed_req->cancelled = 1;
  447.     threadp->processed_req->resultp = NULL;
  448.     return 0;
  449. }
  450.     for (requestp = request_queue_head; requestp != NULL; requestp = requestp->next)
  451. if (requestp->resultp == resultp) {
  452.     requestp->cancelled = 1;
  453.     requestp->resultp = NULL;
  454.     return 0;
  455. }
  456.     for (requestp = request_done_head; requestp != NULL; requestp = requestp->next)
  457. if (requestp->resultp == resultp) {
  458.     requestp->cancelled = 1;
  459.     requestp->resultp = NULL;
  460.     return 0;
  461. }
  462.     return 1;
  463. } /* aio_cancel */
  464. int
  465. aio_open(const char *path, int oflag, mode_t mode, aio_result_t * resultp)
  466. {
  467.     aio_request_t *requestp;
  468.     int len;
  469.     if (!aio_initialised)
  470. aio_init();
  471.     if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
  472. errno = ENOMEM;
  473. return -1;
  474.     }
  475.     len = strlen(path) + 1;
  476.     if ((requestp->path = (char *) xmalloc(len)) == NULL) {
  477. memPoolFree(aio_request_pool, requestp);
  478. errno = ENOMEM;
  479. return -1;
  480.     }
  481.     strncpy(requestp->path, path, len);
  482.     requestp->oflag = oflag;
  483.     requestp->mode = mode;
  484.     requestp->resultp = resultp;
  485.     requestp->request_type = _AIO_OP_OPEN;
  486.     requestp->cancelled = 0;
  487.     aio_do_request(requestp);
  488.     return 0;
  489. }
  490. static void
  491. aio_do_open(aio_request_t * requestp)
  492. {
  493.     requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
  494.     requestp->err = errno;
  495. }
  496. int
  497. aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp)
  498. {
  499.     aio_request_t *requestp;
  500.     if (!aio_initialised)
  501. aio_init();
  502.     if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
  503. errno = ENOMEM;
  504. return -1;
  505.     }
  506.     requestp->fd = fd;
  507.     requestp->bufferp = bufp;
  508.     if ((requestp->tmpbufp = (char *) xmalloc(bufs)) == NULL) {
  509. memPoolFree(aio_request_pool, requestp);
  510. errno = ENOMEM;
  511. return -1;
  512.     }
  513.     requestp->buflen = bufs;
  514.     requestp->offset = offset;
  515.     requestp->whence = whence;
  516.     requestp->resultp = resultp;
  517.     requestp->request_type = _AIO_OP_READ;
  518.     requestp->cancelled = 0;
  519.     aio_do_request(requestp);
  520.     return 0;
  521. }
  522. static void
  523. aio_do_read(aio_request_t * requestp)
  524. {
  525.     lseek(requestp->fd, requestp->offset, requestp->whence);
  526.     requestp->ret = read(requestp->fd, requestp->tmpbufp, requestp->buflen);
  527.     requestp->err = errno;
  528. }
  529. int
  530. aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp)
  531. {
  532.     aio_request_t *requestp;
  533.     if (!aio_initialised)
  534. aio_init();
  535.     if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
  536. errno = ENOMEM;
  537. return -1;
  538.     }
  539.     requestp->fd = fd;
  540.     if ((requestp->tmpbufp = (char *) xmalloc(bufs)) == NULL) {
  541. memPoolFree(aio_request_pool, requestp);
  542. errno = ENOMEM;
  543. return -1;
  544.     }
  545.     xmemcpy(requestp->tmpbufp, bufp, bufs);
  546.     requestp->buflen = bufs;
  547.     requestp->offset = offset;
  548.     requestp->whence = whence;
  549.     requestp->resultp = resultp;
  550.     requestp->request_type = _AIO_OP_WRITE;
  551.     requestp->cancelled = 0;
  552.     aio_do_request(requestp);
  553.     return 0;
  554. }
  555. static void
  556. aio_do_write(aio_request_t * requestp)
  557. {
  558.     requestp->ret = write(requestp->fd, requestp->tmpbufp, requestp->buflen);
  559.     requestp->err = errno;
  560. }
  561. int
  562. aio_close(int fd, aio_result_t * resultp)
  563. {
  564.     aio_request_t *requestp;
  565.     if (!aio_initialised)
  566. aio_init();
  567.     if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
  568. errno = ENOMEM;
  569. return -1;
  570.     }
  571.     requestp->fd = fd;
  572.     requestp->resultp = resultp;
  573.     requestp->request_type = _AIO_OP_CLOSE;
  574.     requestp->cancelled = 0;
  575.     aio_do_request(requestp);
  576.     return 0;
  577. }
  578. static void
  579. aio_do_close(aio_request_t * requestp)
  580. {
  581.     requestp->ret = close(requestp->fd);
  582.     requestp->err = errno;
  583. }
  584. int
  585. aio_stat(const char *path, struct stat *sb, aio_result_t * resultp)
  586. {
  587.     aio_request_t *requestp;
  588.     int len;
  589.     if (!aio_initialised)
  590. aio_init();
  591.     if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
  592. errno = ENOMEM;
  593. return -1;
  594.     }
  595.     len = strlen(path) + 1;
  596.     if ((requestp->path = (char *) xmalloc(len)) == NULL) {
  597. memPoolFree(aio_request_pool, requestp);
  598. errno = ENOMEM;
  599. return -1;
  600.     }
  601.     strncpy(requestp->path, path, len);
  602.     requestp->statp = sb;
  603.     if ((requestp->tmpstatp = (struct stat *) xmalloc(sizeof(struct stat))) == NULL) {
  604. xfree(requestp->path);
  605. memPoolFree(aio_request_pool, requestp);
  606. errno = ENOMEM;
  607. return -1;
  608.     }
  609.     requestp->resultp = resultp;
  610.     requestp->request_type = _AIO_OP_STAT;
  611.     requestp->cancelled = 0;
  612.     aio_do_request(requestp);
  613.     return 0;
  614. }
  615. static void
  616. aio_do_stat(aio_request_t * requestp)
  617. {
  618.     requestp->ret = stat(requestp->path, requestp->tmpstatp);
  619.     requestp->err = errno;
  620. }
  621. int
  622. aio_unlink(const char *path, aio_result_t * resultp)
  623. {
  624.     aio_request_t *requestp;
  625.     int len;
  626.     if (!aio_initialised)
  627. aio_init();
  628.     if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
  629. errno = ENOMEM;
  630. return -1;
  631.     }
  632.     len = strlen(path) + 1;
  633.     if ((requestp->path = (char *) xmalloc(len)) == NULL) {
  634. memPoolFree(aio_request_pool, requestp);
  635. errno = ENOMEM;
  636. return -1;
  637.     }
  638.     strncpy(requestp->path, path, len);
  639.     requestp->resultp = resultp;
  640.     requestp->request_type = _AIO_OP_UNLINK;
  641.     requestp->cancelled = 0;
  642.     aio_do_request(requestp);
  643.     return 0;
  644. }
  645. static void
  646. aio_do_unlink(aio_request_t * requestp)
  647. {
  648.     requestp->ret = unlink(requestp->path);
  649.     requestp->err = errno;
  650. }
  651. #if AIO_OPENDIR
  652. /* XXX aio_opendir NOT implemented? */
  653. int
  654. aio_opendir(const char *path, aio_result_t * resultp)
  655. {
  656.     aio_request_t *requestp;
  657.     int len;
  658.     if (!aio_initialised)
  659. aio_init();
  660.     if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
  661. errno = ENOMEM;
  662. return -1;
  663.     }
  664.     return -1;
  665. }
  666. static void
  667. aio_do_opendir(aio_request_t * requestp)
  668. {
  669.     /* NOT IMPLEMENTED */
  670. }
  671. #endif
  672. void
  673. aio_poll_threads(void)
  674. {
  675.     aio_thread_t *prev;
  676.     aio_thread_t *threadp;
  677.     aio_request_t *requestp;
  678.     do { /* while found completed thread */
  679. prev = NULL;
  680. threadp = busy_threads_head;
  681. while (threadp) {
  682.     debug(43, 5) ("%d: %d -> %dn",
  683. threadp->thread,
  684. threadp->processed_req->request_type,
  685. threadp->status);
  686. #if AIO_PROPER_MUTEX
  687.     if (threadp->req == NULL)
  688. if (pthread_mutex_trylock(&threadp->mutex) == 0)
  689.     break;
  690. #else
  691.     if (threadp->req == NULL)
  692. break;
  693. #endif
  694.     prev = threadp;
  695.     threadp = threadp->next;
  696. }
  697. if (threadp == NULL)
  698.     break;
  699. if (prev == NULL)
  700.     busy_threads_head = busy_threads_head->next;
  701. else
  702.     prev->next = threadp->next;
  703. if (busy_threads_tail == threadp)
  704.     busy_threads_tail = prev;
  705. requestp = threadp->processed_req;
  706. threadp->processed_req = NULL;
  707. threadp->next = wait_threads;
  708. wait_threads = threadp;
  709. if (request_done_tail != NULL)
  710.     request_done_tail->next = requestp;
  711. else
  712.     request_done_head = requestp;
  713. request_done_tail = requestp;
  714.     } while (threadp);
  715.     aio_process_request_queue();
  716. } /* aio_poll_threads */
  717. aio_result_t *
  718. aio_poll_done()
  719. {
  720.     aio_request_t *requestp;
  721.     aio_result_t *resultp;
  722.     int cancelled;
  723.   AIO_REPOLL:
  724.     aio_poll_threads();
  725.     if (request_done_head == NULL) {
  726. return NULL;
  727.     }
  728.     requestp = request_done_head;
  729.     request_done_head = requestp->next;
  730.     if (!request_done_head)
  731. request_done_tail = NULL;
  732.     resultp = requestp->resultp;
  733.     cancelled = requestp->cancelled;
  734.     aio_debug(requestp);
  735.     debug(43, 5) ("DONE: %d -> %dn", requestp->ret, requestp->err);
  736.     aio_cleanup_request(requestp);
  737.     if (cancelled)
  738. goto AIO_REPOLL;
  739.     return resultp;
  740. } /* aio_poll_done */
  741. int
  742. aio_operations_pending(void)
  743. {
  744.     if (request_done_head)
  745. return 1;
  746.     if (busy_threads_head)
  747. return 1;
  748.     return 0;
  749. }
  750. static void
  751. aio_debug(aio_request_t * requestp)
  752. {
  753.     switch (requestp->request_type) {
  754.     case _AIO_OP_OPEN:
  755. debug(43, 5) ("OPEN of %s to FD %dn", requestp->path, requestp->ret);
  756. break;
  757.     case _AIO_OP_READ:
  758. debug(43, 5) ("READ on fd: %dn", requestp->fd);
  759. break;
  760.     case _AIO_OP_WRITE:
  761. debug(43, 5) ("WRITE on fd: %dn", requestp->fd);
  762. break;
  763.     case _AIO_OP_CLOSE:
  764. debug(43, 5) ("CLOSE of fd: %dn", requestp->fd);
  765. break;
  766.     case _AIO_OP_UNLINK:
  767. debug(43, 5) ("UNLINK of %sn", requestp->path);
  768. break;
  769.     default:
  770. break;
  771.     }
  772. }