fdset.c
上传用户:gzpyjq
上传日期:2013-01-31
资源大小:1852k
文件大小:13k
源码类别:

手机WAP编程

开发平台:

WINDOWS

  1. /*
  2.  * fdset.c - module for managing a large collection of file descriptors
  3.  */
  4. #include <stdlib.h>
  5. #include <unistd.h>
  6. #include <errno.h>
  7. #include "gwlib/gwlib.h"
  8. struct FDSet
  9. {
  10.     /* Thread ID of the set's internal thread, which will spend most
  11.      * of its time blocking on poll().  This is set when the thread
  12.      * is created, and not changed after that.  It's not protected
  13.      * by any lock. */
  14.     long poll_thread;
  15.     /* The following fields are for use by the polling thread only.
  16.      * No-one else may touch them.  It's not protected by any lock. */
  17.     /* Array for use with poll().  Elements 0 through size-1 are allocated.
  18.      * Elements 0 through entries-1 are in use. */
  19.     struct pollfd *pollinfo;
  20.     int size;
  21.     int entries;
  22.     /* Arrays of callback and data fields.  They are kept in sync with
  23.      * the pollinfo array, and are basically extra fields that we couldn't
  24.      * put in struct pollfd because that structure is defined externally. */
  25.     fdset_callback_t **callbacks;
  26.     void **datafields;
  27.     /* The poller function loops over the table after poll() returns,
  28.      * and calls callback functions that may modify the table that is
  29.      * being scanned.  We can't just copy the table to avoid interference,
  30.      * because fdset_unregister and fdset_listen guarantee that their
  31.      * operations are complete when they return -- that does not work
  32.      * if poller() is scanning an outdated copy of the table.
  33.      * To solve this, we have a field that marks when the table is
  34.      * being scanned.  If this field is true, fdset_unregister merely
  35.      * sets the fd to -1 instead of deleting the whole entry.
  36.      * fdset_listen will takes care to modify revents as well as
  37.      * events. fdset_register always adds to the end of the table,
  38.      * so it does not have to do anything special.
  39.      */
  40.     int scanning;
  41.     /* This field keeps track of how many fds were set to -1 by
  42.      * fdset_unregister while "scanning" is true.  That way we can
  43.      * efficiently check if we need to scan the table to really 
  44.      * delete those entries. */
  45.     int deleted_entries;
  46.     
  47.     /* The following fields are for general use, and are of types that
  48.      * have internal locks. */
  49.     /* List of struct action.  Used by other threads to make requests
  50.      * of the polling thread. */
  51.     List *actions;
  52. };
  53. /* Datatype to describe changes to the fdset fields that only the polling
  54.  * thread may touch.  Other threads use this type to submit requests to
  55.  * change those fields. */
  56. /* Action life cycle: Created, then pushed on set->actions list by
  57.  * action_submit.  Poller thread wakes up and takes it from the list,
  58.  * then calls handle_action, which performs the action and pushes it
  59.  * on the action's done list.  action_submit then takes it back and
  60.  * destroys it. */
  61. /* If no synchronization is needed, action_submit_nosync can be used.
  62.  * In that case handle_action will destroy the action itself instead
  63.  * of putting it on any list. */
  64. struct action
  65. {
  66.     enum { REGISTER, LISTEN, UNREGISTER, DESTROY } type;
  67.     int fd;                     /* Used by REGISTER, LISTEN, and UNREGISTER */
  68.     int mask;                   /* Used by LISTEN */
  69.     int events;                 /* Used by REGISTER and LISTEN */
  70.     fdset_callback_t *callback; /* Used by REGISTER */
  71.     void *data;                 /* Used by REGISTER */
  72.     /* When the request has been handled, an element is produced on this
  73.      * list, so that the submitter can synchronize.  Can be left NULL. */
  74.     List *done;                 /* Used by LISTEN, UNREGISTER, and DESTROY */
  75. };
  76. /* Return a new action structure of the given type, with all fields empty. */
  77. static struct action *action_create(int type)
  78. {
  79.     struct action *new;
  80.     new = gw_malloc(sizeof(*new));
  81.     new->type = type;
  82.     new->fd = -1;
  83.     new->mask = 0;
  84.     new->events = 0;
  85.     new->callback = NULL;
  86.     new->data = NULL;
  87.     new->done = NULL;
  88.     return new;
  89. }
  90. static void action_destroy(struct action *action)
  91. {
  92.     if (action == NULL)
  93.         return;
  94.     list_destroy(action->done, NULL);
  95.     gw_free(action);
  96. }
  97. /* For use with list_destroy */
  98. static void action_destroy_item(void *action)
  99. {
  100.     action_destroy(action);
  101. }
  102. /*
  103.  * Submit an action for this set, and wait for the polling thread to
  104.  * confirm that it's been done, by pushing the action on its done list.
  105.  */
  106. static void submit_action(FDSet *set, struct action *action)
  107. {
  108.     List *done;
  109.     void *sync;
  110.     gw_assert(set != NULL);
  111.     gw_assert(action != NULL);
  112.     done = list_create();
  113.     list_add_producer(done);
  114.     action->done = done;
  115.     list_append(set->actions, action);
  116.     gwthread_wakeup(set->poll_thread);
  117.     sync = list_consume(done);
  118.     gw_assert(sync == action);
  119.     action_destroy(action);
  120. }
  121. /* 
  122.  * As above, but don't wait for confirmation.
  123.  */
  124. static void submit_action_nosync(FDSet *set, struct action *action)
  125. {
  126.     list_append(set->actions, action);
  127.     gwthread_wakeup(set->poll_thread);
  128. }
  129. /* Do one action for this thread and confirm that it's been done by
  130.  * appending the action to its done list.  May only be called by
  131.  * the polling thread.  Returns 0 normally, and returns -1 if the
  132.  * action destroyed the set. */
  133. static int handle_action(FDSet *set, struct action *action)
  134. {
  135.     int result;
  136.     gw_assert(set != NULL);
  137.     gw_assert(set->poll_thread == gwthread_self());
  138.     gw_assert(action != NULL);
  139.     result = 0;
  140.     switch (action->type) {
  141.     case REGISTER:
  142.         fdset_register(set, action->fd, action->events,
  143.                        action->callback, action->data);
  144.         break;
  145.     case LISTEN:
  146.         fdset_listen(set, action->fd, action->mask, action->events);
  147.         break;
  148.     case UNREGISTER:
  149.         fdset_unregister(set, action->fd);
  150.         break;
  151.     case DESTROY:
  152.         fdset_destroy(set);
  153.         result = -1;
  154.         break;
  155.     default:
  156.         panic(0, "fdset: handle_action got unknown action type %d.",
  157.               action->type);
  158.     }
  159.     if (action->done == NULL)
  160. action_destroy(action);
  161.     else
  162.         list_produce(action->done, action);
  163.     return result;
  164. }
  165. /* Look up the entry number in the pollinfo array for this fd.
  166.  * Right now it's a linear search, this may have to be improved. */
  167. static int find_entry(FDSet *set, int fd)
  168. {
  169.     int i;
  170.     gw_assert(set != NULL);
  171.     gw_assert(gwthread_self() == set->poll_thread);
  172.     for (i = 0; i < set->entries; i++) {
  173.         if (set->pollinfo[i].fd == fd)
  174.             return i;
  175.     }
  176.     return -1;
  177. }
  178. static void remove_entry(FDSet *set, int entry)
  179. {
  180.     if (entry != set->entries - 1) {
  181.         /* We need to keep the array contiguous, so move the last element
  182.          * to fill in the hole. */
  183.         set->pollinfo[entry] = set->pollinfo[set->entries - 1];
  184.         set->callbacks[entry] = set->callbacks[set->entries - 1];
  185.         set->datafields[entry] = set->datafields[set->entries - 1];
  186.     }
  187.     set->entries--;
  188. }
  189. static void remove_deleted_entries(FDSet *set)
  190. {
  191.     int i;
  192.     i = 0;
  193.     while (i < set->entries && set->deleted_entries > 0) {
  194.         if (set->pollinfo[i].fd < 0) {
  195.             remove_entry(set, i);
  196.     set->deleted_entries--;
  197. } else {
  198.     i++;
  199.         }
  200.     }
  201. }
  202. /* Main function for polling thread.  Most its time is spent blocking
  203.  * in poll().  No-one else is allowed to change the fields it uses,
  204.  * so other threads just put something on the actions list and wake
  205.  * up this thread.  That's why it checks the actions list every time
  206.  * it goes through the loop.
  207.  */
  208. static void poller(void *arg)
  209. {
  210.     FDSet *set = arg;
  211.     struct action *action;
  212.     int ret;
  213.     int i;
  214.     gw_assert(set != NULL);
  215.     for (;;) {
  216.         while ((action = list_extract_first(set->actions)) != NULL) {
  217.             /* handle_action returns -1 if the set was destroyed. */
  218.             if (handle_action(set, action) < 0)
  219.                 return;
  220.         }
  221.         /* Block indefinitely, waiting for activity */
  222.         ret = gwthread_poll(set->pollinfo, set->entries, -1.0);
  223.         if (ret < 0) {
  224.     if (errno != EINTR) {
  225.                 error(0, "Poller: can't handle error; sleeping 1 second.");
  226.                 gwthread_sleep(1.0);
  227.             }
  228.             continue;
  229.         }
  230. /* Callbacks may modify the table while we scan it, so be careful. */
  231. set->scanning = 1;
  232.         for (i = 0; i < set->entries; i++) {
  233.             if (set->pollinfo[i].revents != 0)
  234.                 set->callbacks[i](set->pollinfo[i].fd,
  235.                                   set->pollinfo[i].revents,
  236.                                   set->datafields[i]);
  237.         }
  238. set->scanning = 0;
  239. if (set->deleted_entries > 0)
  240.     remove_deleted_entries(set);
  241.     }
  242. }
  243. FDSet *fdset_create(void)
  244. {
  245.     FDSet *new;
  246.     new = gw_malloc(sizeof(*new));
  247.     /* Start off with space for one element because we can't malloc 0 bytes
  248.      * and we don't want to worry about these pointers being NULL. */
  249.     new->size = 1;
  250.     new->entries = 0;
  251.     new->pollinfo = gw_malloc(sizeof(new->pollinfo[0]) * new->size);
  252.     new->callbacks = gw_malloc(sizeof(new->callbacks[0]) * new->size);
  253.     new->datafields = gw_malloc(sizeof(new->datafields[0]) * new->size);
  254.     new->scanning = 0;
  255.     new->deleted_entries = 0;
  256.     new->actions = list_create();
  257.     new->poll_thread = gwthread_create(poller, new);
  258.     if (new->poll_thread < 0) {
  259.         error(0, "Could not start internal thread for fdset.");
  260.         fdset_destroy(new);
  261.         return NULL;
  262.     }
  263.     return new;
  264. }
  265. void fdset_destroy(FDSet *set)
  266. {
  267.     if (set == NULL)
  268.         return;
  269.     if (set->poll_thread < 0 || gwthread_self() == set->poll_thread) {
  270.         if (set->entries > 0) {
  271.             warning(0, "Destroying fdset with %d active entries.",
  272.                     set->entries);
  273.         }
  274.         gw_free(set->pollinfo);
  275.         gw_free(set->callbacks);
  276.         gw_free(set->datafields);
  277.         if (list_len(set->actions) > 0) {
  278.             error(0, "Destroying fdset with %ld pending actions.",
  279.                   list_len(set->actions));
  280.         }
  281.         list_destroy(set->actions, action_destroy_item);
  282.         gw_free(set);
  283.     } else {
  284.         long thread = set->poll_thread;
  285.         submit_action(set, action_create(DESTROY));
  286. gwthread_join(thread);
  287.     }
  288. }
  289. void fdset_register(FDSet *set, int fd, int events,
  290.                     fdset_callback_t callback, void *data)
  291. {
  292.     int new;
  293.     gw_assert(set != NULL);
  294.     if (gwthread_self() != set->poll_thread) {
  295.         struct action *action;
  296.         action = action_create(REGISTER);
  297.         action->fd = fd;
  298.         action->events = events;
  299.         action->callback = callback;
  300.         action->data = data;
  301. submit_action_nosync(set, action);
  302.         return;
  303.     }
  304.     gw_assert(set->entries <= set->size);
  305.     if (set->entries >= set->size) {
  306.         int newsize = set->entries + 1;
  307.         set->pollinfo = gw_realloc(set->pollinfo,
  308.                                    sizeof(set->pollinfo[0]) * newsize);
  309.         set->callbacks = gw_realloc(set->callbacks,
  310.                                    sizeof(set->callbacks[0]) * newsize);
  311.         set->datafields = gw_realloc(set->datafields,
  312.                                    sizeof(set->datafields[0]) * newsize);
  313.         set->size = newsize;
  314.     }
  315.     /* We don't check set->scanning.  Adding new entries is not harmful
  316.      * because their revents fields are 0. */
  317.     new = set->entries++;
  318.     set->pollinfo[new].fd = fd;
  319.     set->pollinfo[new].events = events;
  320.     set->pollinfo[new].revents = 0;
  321.     set->callbacks[new] = callback;
  322.     set->datafields[new] = data;
  323. }
  324. void fdset_listen(FDSet *set, int fd, int mask, int events)
  325. {
  326.     int entry;
  327.     gw_assert(set != NULL);
  328.     if (gwthread_self() != set->poll_thread) {
  329.         struct action *action;
  330.         action = action_create(LISTEN);
  331.         action->fd = fd;
  332. action->mask = mask;
  333.         action->events = events;
  334.         submit_action(set, action);
  335.         return;
  336.     }
  337.     entry = find_entry(set, fd);   
  338.     if (entry < 0) {
  339.         warning(0, "fdset_listen called on unregistered fd %d.", fd);
  340.         return;
  341.     }
  342.     /* Copy the bits from events specified by the mask, and preserve the
  343.      * bits not specified by the mask. */
  344.     set->pollinfo[entry].events =
  345. (set->pollinfo[entry].events & ~mask) | (events & mask);
  346.     /* If poller is currently scanning the array, then change the
  347.      * revents field so that the callback function will not be called
  348.      * for events we should no longer listen for.  The idea is the
  349.      * same as for the events field, except that we only turn bits off. */
  350.     if (set->scanning) {
  351.         set->pollinfo[entry].revents =
  352.             set->pollinfo[entry].revents & (events | ~mask);
  353.     }
  354. }
  355. void fdset_unregister(FDSet *set, int fd)
  356. {
  357.     int entry;
  358.     gw_assert(set != NULL);
  359.     if (gwthread_self() != set->poll_thread) {
  360.         struct action *action;
  361.         action = action_create(UNREGISTER);
  362.         action->fd = fd;
  363.         submit_action(set, action);
  364.         return;
  365.     }
  366.     /* Remove the entry from the pollinfo array */
  367.     entry = find_entry(set, fd);
  368.     if (entry < 0) {
  369.         warning(0, "fdset_listen called on unregistered fd %d.", fd);
  370.         return;
  371.     }
  372.     if (entry == set->entries - 1) {
  373.         /* It's the last entry.  We can safely remove it even while
  374.          * the array is being scanned, because the scan checks set->entries. */
  375.         set->entries--;
  376.     } else if (set->scanning) {
  377.         /* We can't remove entries because the array is being
  378.          * scanned.  Mark it as deleted.  */
  379.         set->pollinfo[entry].fd = -1;
  380.         set->deleted_entries++;
  381.     } else {
  382.         remove_entry(set, entry);
  383.     }
  384. }