fdwatch_kqueue.c
上传用户:tany51
上传日期:2013-06-12
资源大小:1397k
文件大小:9k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /*
  2.   * Abstraction API/layer for the various ways PvPGN can inspect sockets state
  3.   * 2003 (C) dizzy@roedu.net
  4.   *
  5.   * Code is based on the ideas found in thttpd project.
  6.   *
  7.   * *BSD kqueue() based backend
  8.   *
  9.   * This program is free software; you can redistribute it and/or
  10.   * modify it under the terms of the GNU General Public License
  11.   * as published by the Free Software Foundation; either version 2
  12.   * of the License, or (at your option) any later version.
  13.   *
  14.   * This program is distributed in the hope that it will be useful,
  15.   * but WITHOUT ANY WARRANTY; without even the implied warranty of
  16.   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  17.   * GNU General Public License for more details.
  18.   *
  19.   * You should have received a copy of the GNU General Public License
  20.   * along with this program; if not, write to the Free Software
  21.   * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
  22.   */
  23. #include "common/setup_before.h"
  24. #ifdef STDC_HEADERS
  25. # include <stdlib.h>
  26. #else
  27. # ifdef HAVE_MALLOC_H
  28. #  include <malloc.h>
  29. # endif
  30. #endif
  31. #ifdef HAVE_STRING_H
  32. # include <string.h>
  33. #else
  34. # ifdef HAVE_STRINGS_H
  35. #  include <strings.h>
  36. # endif
  37. #endif
  38. #ifdef HAVE_SYS_TYPES_H
  39. # include <sys/types.h>
  40. #endif
  41. #ifdef HAVE_SYS_EVENT_H
  42. # include <sys/event.h>
  43. #endif
  44. #ifdef HAVE_SYS_TIME_H
  45. # include <sys/time.h>
  46. #endif
  47. #include "fdwatch.h"
  48. #include "common/eventlog.h"
  49. #include "common/setup_after.h"
  50. #ifdef HAVE_KQUEUE
  51. static int sr;
  52. static int kq;
  53. static struct kevent *kqchanges = NULL; /* changes to make to kqueue */
  54. static struct kevent *kqevents = NULL; /* events to investigate */
  55. static int *fdw_rridx, *fdw_wridx;
  56. static unsigned nofds;
  57. static int fdw_kqueue_init(int nfds);
  58. static int fdw_kqueue_close(void);
  59. static int fdw_kqueue_add_fd(int fd, t_fdwatch_type rw);
  60. static int fdw_kqueue_del_fd(int fd);
  61. static int fdw_kqueue_watch(long timeout_msecs);
  62. static void fdw_kqueue_handle(void);
  63. t_fdw_backend fdw_kqueue = {
  64.     fdw_kqueue_init,
  65.     fdw_kqueue_close,
  66.     fdw_kqueue_add_fd,
  67.     fdw_kqueue_del_fd,
  68.     fdw_kqueue_watch,
  69.     fdw_kqueue_handle
  70. };
  71. static int fdw_kqueue_init(int nfds)
  72. {
  73.     int i;
  74.     if ((kq = kqueue()) == -1)
  75. return -1;
  76.     kqevents = (struct kevent *) malloc(sizeof(struct kevent) * nfds);
  77.     kqchanges = (struct kevent *) malloc(sizeof(struct kevent) * nfds * 2);
  78.     fdw_rridx = (int *) malloc(sizeof(int) * nfds);
  79.     fdw_wridx = (int *) malloc(sizeof(int) * nfds);
  80.     if (fdw_rridx == NULL || fdw_wridx == NULL || kqevents == NULL || kqchanges == NULL)
  81.     {
  82. fdw_kqueue_close();
  83. return -1;
  84.     }
  85.     memset(kqchanges, 0, sizeof(struct kevent) * nfds);
  86.     for (i = 0; i < nfds; i++)
  87.     {
  88. fdw_rridx[i] = -1;
  89. fdw_wridx[i] = -1;
  90.     }
  91.     sr = 0;
  92.     nofds = 0;
  93.     eventlog(eventlog_level_info, __FUNCTION__, "fdwatch kqueue() based layer initialized (max %d sockets)", nfds);
  94.     return 0;
  95. }
  96. static int fdw_kqueue_close(void)
  97. {
  98.     if (fdw_rridx) { free((void *) fdw_rridx); fdw_rridx = NULL; }
  99.     if (fdw_wridx) { free((void *) fdw_wridx); fdw_wridx = NULL; }
  100.     if (kqchanges) { free((void *) kqchanges); kqchanges = NULL; }
  101.     if (kqevents) { free((void *) kqevents); kqevents = NULL; }
  102.     sr = 0;
  103.     nofds = 0;
  104.     return 0;
  105. }
  106. static int fdw_kqueue_add_fd(int fd, t_fdwatch_type rw)
  107. {
  108.     static int ridx;
  109. /*    eventlog(eventlog_level_trace, __FUNCTION__, "called fd: %d rw: %d", fd, rw); */
  110.     /* adding read event filter */
  111.     if (!(fdw_rw[fd] & fdwatch_type_read) && rw & fdwatch_type_read)
  112.     {
  113. if (fdw_rridx[fd] >= 0 && fdw_rridx[fd] < nofds && kqchanges[fdw_rridx[fd]].ident == fd)
  114. {
  115.     ridx = fdw_rridx[fd];
  116. /*     eventlog(eventlog_level_trace, __FUNCTION__, "updating change event (read) fd on %d", ridx); */
  117. } else {
  118.     ridx = nofds++;
  119.     fdw_rridx[fd] = ridx;
  120. /*     eventlog(eventlog_level_trace, __FUNCTION__, "adding new change event (read) fd on %d", ridx); */
  121. }
  122. EV_SET(kqchanges + ridx, fd, EVFILT_READ, EV_ADD, 0, 0, 0);
  123.     } 
  124.     else if (fdw_rw[fd] & fdwatch_type_read && !( rw & fdwatch_type_read))
  125.     {
  126. if (fdw_rridx[fd] >= 0 && fdw_rridx[fd] < nofds && kqchanges[fdw_rridx[fd]].ident == fd)
  127. {
  128.     ridx = fdw_rridx[fd];
  129. /*     eventlog(eventlog_level_trace, __FUNCTION__, "updating change event (read) fd on %d", ridx); */
  130. } else {
  131.     ridx = nofds++;
  132.     fdw_rridx[fd] = ridx;
  133. /*     eventlog(eventlog_level_trace, __FUNCTION__, "adding new change event (read) fd on %d", ridx); */
  134. }
  135. EV_SET(kqchanges + ridx, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
  136.     }
  137.     /* adding write event filter */
  138.     if (!(fdw_rw[fd] & fdwatch_type_write) && rw & fdwatch_type_write)
  139.     {
  140. if (fdw_wridx[fd] >= 0 && fdw_wridx[fd] < nofds && kqchanges[fdw_wridx[fd]].ident == fd)
  141. {
  142.     ridx = fdw_wridx[fd];
  143. /*     eventlog(eventlog_level_trace, __FUNCTION__, "updating change event (write) fd on %d", ridx); */
  144. } else {
  145.     ridx = nofds++;
  146.     fdw_wridx[fd] = ridx;
  147. /*     eventlog(eventlog_level_trace, __FUNCTION__, "adding new change event (write) fd on %d", ridx); */
  148. }
  149. EV_SET(kqchanges + ridx, fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
  150.     }
  151.     else if (fdw_rw[fd] & fdwatch_type_write && !(rw & fdwatch_type_write))
  152.     {
  153. if (fdw_wridx[fd] >= 0 && fdw_wridx[fd] < nofds && kqchanges[fdw_wridx[fd]].ident == fd)
  154. {
  155.     ridx = fdw_wridx[fd];
  156. /*     eventlog(eventlog_level_trace, __FUNCTION__, "updating change event (write) fd on %d", ridx); */
  157. } else {
  158.     ridx = nofds++;
  159.     fdw_wridx[fd] = ridx;
  160. /*     eventlog(eventlog_level_trace, __FUNCTION__, "adding new change event (write) fd on %d", ridx); */
  161. }
  162. EV_SET(kqchanges + ridx, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);
  163.     }
  164.     return 0;
  165. }
  166. static int fdw_kqueue_del_fd(int fd)
  167. {
  168. /*    eventlog(eventlog_level_trace, __FUNCTION__, "called fd: %d", fd); */
  169.     if (sr > 0) 
  170. eventlog(eventlog_level_error, __FUNCTION__, "BUG: called while still handling sockets");
  171.     /* the last event changes about this fd has not yet been sent to kernel */
  172.     if (fdw_rw[fd] & fdwatch_type_read &&
  173.         nofds && fdw_rridx[fd] >= 0 && fdw_rridx[fd] < nofds && 
  174. kqchanges[fdw_rridx[fd]].ident == fd)
  175.     {
  176. nofds--;
  177. if (fdw_rridx[fd] < nofds)
  178. {
  179.     int tmp;
  180.     tmp = kqchanges[nofds].ident;
  181.     if (kqchanges[nofds].filter == EVFILT_READ && 
  182. fdw_rridx[tmp] == nofds)
  183.     {
  184. /*     eventlog(eventlog_level_trace, __FUNCTION__, "not last, moving %d", kqchanges[rnfds].ident); */
  185. fdw_rridx[tmp] = fdw_rridx[fd];
  186. memcpy(kqchanges + fdw_rridx[fd], kqchanges + nofds, sizeof(struct kevent));
  187.     }
  188.     if (kqchanges[nofds].filter == EVFILT_WRITE &&
  189. fdw_wridx[tmp] == nofds)
  190.     {
  191. /*     eventlog(eventlog_level_trace, __FUNCTION__, "not last, moving %d", kqchanges[rnfds].ident); */
  192. fdw_wridx[tmp] = fdw_rridx[fd];
  193. memcpy(kqchanges + fdw_rridx[fd], kqchanges + nofds, sizeof(struct kevent));
  194.     }
  195. }
  196. fdw_rridx[fd] = -1;
  197.     }
  198.     if (fdw_rw[fd] & fdwatch_type_write &&
  199.         nofds && fdw_wridx[fd] >= 0 && fdw_wridx[fd] < nofds && 
  200. kqchanges[fdw_wridx[fd]].ident == fd)
  201.     {
  202. nofds--;
  203. if (fdw_wridx[fd] < nofds)
  204. {
  205.     int tmp;
  206.     tmp = kqchanges[nofds].ident;
  207.     if (kqchanges[nofds].filter == EVFILT_READ && 
  208. fdw_rridx[tmp] == nofds)
  209.     {
  210. /*     eventlog(eventlog_level_trace, __FUNCTION__, "not last, moving %d", kqchanges[rnfds].ident); */
  211. fdw_rridx[tmp] = fdw_wridx[fd];
  212. memcpy(kqchanges + fdw_wridx[fd], kqchanges + nofds, sizeof(struct kevent));
  213.     }
  214.     if (kqchanges[nofds].filter == EVFILT_WRITE &&
  215. fdw_wridx[tmp] == nofds)
  216.     {
  217. /*     eventlog(eventlog_level_trace, __FUNCTION__, "not last, moving %d", kqchanges[rnfds].ident); */
  218. fdw_wridx[tmp] = fdw_wridx[fd];
  219. memcpy(kqchanges + fdw_wridx[fd], kqchanges + nofds, sizeof(struct kevent));
  220.     }
  221. }
  222. fdw_wridx[fd] = -1;
  223.     }
  224. /* here we presume the calling code does close() on the socket and if so
  225.  * it is automatically removed from any kernel kqueues */
  226.     return 0;
  227. }
  228. static int fdw_kqueue_watch(long timeout_msec)
  229. {
  230.     static struct timespec ts;
  231.     ts.tv_sec = timeout_msec / 1000L;
  232.     ts.tv_nsec = (timeout_msec % 1000L) * 1000000L;
  233.     sr = kevent(kq, nofds > 0 ? kqchanges : NULL, nofds, kqevents, fdw_maxfd, &ts);
  234.     nofds = 0;
  235.     return sr;
  236. }
  237. static void fdw_kqueue_handle(void)
  238. {
  239.     register unsigned i;
  240. /*    eventlog(eventlog_level_trace, __FUNCTION__, "called"); */
  241.     for (i = 0; i < sr; i++)
  242.     {
  243. /*      eventlog(eventlog_level_trace, __FUNCTION__, "checking %d ident: %d read: %d write: %d", i, kqevents[i].ident, kqevents[i].filter & EVFILT_READ, kqevents[i].filter & EVFILT_WRITE); */
  244. if (fdw_rw[kqevents[i].ident] & fdwatch_type_read && kqevents[i].filter == EVFILT_READ)
  245.     if (fdw_hnd[kqevents[i].ident] (fdw_data[kqevents[i].ident], fdwatch_type_read) == -2)
  246. continue;
  247. if (fdw_rw[kqevents[i].ident] & fdwatch_type_write && kqevents[i].filter == EVFILT_WRITE)
  248.     fdw_hnd[kqevents[i].ident] (fdw_data[kqevents[i].ident], fdwatch_type_write);
  249.     }
  250.     sr = 0;
  251. }
  252. #endif /* HAVE_KQUEUE */