gwthread-pthread.c
上传用户:gzpyjq
上传日期:2013-01-31
资源大小:1852k
文件大小:19k
源码类别:

手机WAP编程

开发平台:

WINDOWS

  1. /*
  2.  * gwthread-pthread.c - implementation of gwthread.h using POSIX threads.
  3.  *
  4.  * Richard Braakman
  5.  */
  6. #include <unistd.h>
  7. #include <errno.h>
  8. #include <pthread.h>
  9. #include <signal.h>
  10. #include <string.h>
  11. #include "gwlib/gwlib.h"
  12. #ifdef HAVE_LIBSSL
  13. #include <openssl/err.h>
  14. #endif /* HAVE_LIBSSL */
  15. /* Maximum number of live threads we can support at once.  Increasing
  16.  * this will increase the size of the threadtable.  Use powers of two
  17.  * for efficiency. */
  18. #define THREADTABLE_SIZE 1024
  19. struct threadinfo
  20. {
  21.     pthread_t self;
  22.     const char *name;
  23.     gwthread_func_t *func;
  24.     long number;
  25.     int wakefd_recv;
  26.     int wakefd_send;
  27.     /* joiners may be NULL.  It is not allocated until a thread wants
  28.      * to register.  This is safe because the thread table is always
  29.      * locked when a thread accesses this field. */
  30.     List *joiners;
  31. };
  32. struct new_thread_args
  33. {
  34.     gwthread_func_t *func;
  35.     void *arg;
  36.     struct threadinfo *ti;
  37. };
  38. /* The index is the external thread number modulo the table size; the
  39.  * thread number allocation code makes sure that there are no collisions. */
  40. static struct threadinfo *threadtable[THREADTABLE_SIZE];
  41. #define THREAD(t) (threadtable[(t) % THREADTABLE_SIZE])
  42. /* Number of threads currently in the thread table. */
  43. static long active_threads = 0;
  44. /* Number to use for the next thread created.  The actual number used
  45.  * may be higher than this, in order to avoid collisions in the threadtable.
  46.  * Specifically, (threadnumber % THREADTABLE_SIZE) must be unique for all
  47.  * live threads. */
  48. static long next_threadnumber;
  49. /* Info for the main thread is kept statically, because it should not
  50.  * be deallocated even after the thread module shuts down -- after all,
  51.  * the main thread is still running, and in practice, it can still
  52.  * output debug messages which will require the thread number. */
  53. static struct threadinfo mainthread;
  54. /* Our key for accessing the (struct gwthread *) we stash in the
  55.  * thread-specific-data area.  This is much more efficient than
  56.  * accessing a global table, which we would have to lock. */
  57. pthread_key_t tsd_key;
  58. pthread_mutex_t threadtable_lock;
  59. static void lock(void)
  60. {
  61.     int ret;
  62.     ret = pthread_mutex_lock(&threadtable_lock);
  63.     if (ret != 0) {
  64.         panic(ret, "gwthread-pthread: could not lock thread table");
  65.     }
  66. }
  67. static void unlock(void)
  68. {
  69.     int ret;
  70.     ret = pthread_mutex_unlock(&threadtable_lock);
  71.     if (ret != 0) {
  72.         panic(ret, "gwthread-pthread: could not unlock thread table");
  73.     }
  74. }
  75. /* Empty the wakeup pipe, in case we got several wakeup signals before
  76.  * noticing.  We want to wake up only once. */
  77. static void flushpipe(int fd)
  78. {
  79.     unsigned char buf[128];
  80.     ssize_t bytes;
  81.     do {
  82.         bytes = read(fd, buf, sizeof(buf));
  83.     } while (bytes > 0);
  84. }
  85. /* Allocate and fill a threadinfo structure for a new thread, and store
  86.  * it in a free slot in the thread table.  The thread table must already
  87.  * be locked by the caller.  Return the thread number chosen for this
  88.  * thread.  The caller must make sure that there is room in the table. */
  89. static long fill_threadinfo(pthread_t id, const char *name,
  90.                             gwthread_func_t *func,
  91.                             struct threadinfo *ti)
  92. {
  93.     int pipefds[2];
  94.     long first_try;
  95.     gw_assert(active_threads < THREADTABLE_SIZE);
  96.     ti->self = id;
  97.     ti->name = name;
  98.     ti->func = func;
  99.     if (pipe(pipefds) < 0) {
  100.         panic(errno, "cannot allocate wakeup pipe for new thread");
  101.     }
  102.     ti->wakefd_recv = pipefds[0];
  103.     ti->wakefd_send = pipefds[1];
  104.     socket_set_blocking(ti->wakefd_recv, 0);
  105.     socket_set_blocking(ti->wakefd_send, 0);
  106.     ti->joiners = NULL;
  107.     /* Find a free table entry and claim it. */
  108.     first_try = next_threadnumber;
  109.     do {
  110.         ti->number = next_threadnumber++;
  111.         /* Check if we looped all the way around the thread table. */
  112.         if (ti->number == first_try + THREADTABLE_SIZE) {
  113.             panic(0, "Cannot have more than %d active threads",
  114.                   THREADTABLE_SIZE);
  115.         }
  116.     } while (THREAD(ti->number) != NULL);
  117.     THREAD(ti->number) = ti;
  118.     active_threads++;
  119.     return ti->number;
  120. }
  121. /* Look up the threadinfo pointer for the current thread */
  122. static struct threadinfo *getthreadinfo(void)
  123. {
  124.     struct threadinfo *threadinfo;
  125.     threadinfo = pthread_getspecific(tsd_key);
  126.     if (threadinfo == NULL) {
  127.         panic(0, "gwthread-pthread: pthread_getspecific failed");
  128.     } else {
  129.         gw_assert(pthread_equal(threadinfo->self, pthread_self()));
  130.     }
  131.     return threadinfo;
  132. }
  133. /*
  134.  * Go through the list of threads waiting for us to exit, and tell
  135.  * them that we're exiting.  The joiner_cond entries are registered
  136.  * by those threads, and will be cleaned up by them.
  137.  */
  138. static void alert_joiners(void)
  139. {
  140.     struct threadinfo *threadinfo;
  141.     pthread_cond_t *joiner_cond;
  142.     threadinfo = getthreadinfo();
  143.     if (!threadinfo->joiners)
  144.         return;
  145.     while ((joiner_cond = list_extract_first(threadinfo->joiners))) {
  146.         pthread_cond_broadcast(joiner_cond);
  147.     }
  148. }
  149. static void delete_threadinfo(void)
  150. {
  151.     struct threadinfo *threadinfo;
  152.     threadinfo = getthreadinfo();
  153.     list_destroy(threadinfo->joiners, NULL);
  154.     close(threadinfo->wakefd_recv);
  155.     close(threadinfo->wakefd_send);
  156.     THREAD(threadinfo->number) = NULL;
  157.     active_threads--;
  158.     gw_assert(threadinfo != &mainthread);
  159.     gw_free(threadinfo);
  160. }
  161. static void create_threadinfo_main(void)
  162. {
  163.     int ret;
  164.     fill_threadinfo(pthread_self(), "main", NULL, &mainthread);
  165.     ret = pthread_setspecific(tsd_key, &mainthread);
  166.     if (ret != 0) {
  167.         panic(ret, "gwthread-pthread: pthread_setspecific failed");
  168.     }
  169. }
  170. void gwthread_init(void)
  171. {
  172.     int ret;
  173.     int i;
  174.     pthread_mutex_init(&threadtable_lock, NULL);
  175.     ret = pthread_key_create(&tsd_key, NULL);
  176.     if (ret != 0) {
  177.         panic(ret, "gwthread-pthread: pthread_key_create failed");
  178.     }
  179.     for (i = 0; i < THREADTABLE_SIZE; i++) {
  180.         threadtable[i] = NULL;
  181.     }
  182.     active_threads = 0;
  183.     create_threadinfo_main();
  184. }
  185. /* Note that the gwthread library can't shut down completely, because
  186.  * the main thread will still be running, and it may make calls to
  187.  * gwthread_self(). */
  188. void gwthread_shutdown(void)
  189. {
  190.     int ret;
  191.     int running;
  192.     int i;
  193.     /* Main thread must not have disappeared */
  194.     gw_assert(threadtable[0] != NULL);
  195.     lock();
  196.     running = 0;
  197.     /* Start i at 1 to skip the main thread, which is supposed to be
  198.      * still running. */
  199.     for (i = 1; i < THREADTABLE_SIZE; i++) {
  200.         if (threadtable[i] != NULL) {
  201.             debug("gwlib", 0, "Thread %ld (%s) still running",
  202.                   threadtable[i]->number,
  203.                   threadtable[i]->name);
  204.             running++;
  205.         }
  206.     }
  207.     unlock();
  208.     /* We can't do a full cleanup this way */
  209.     if (running)
  210.         return;
  211.     ret = pthread_mutex_destroy(&threadtable_lock);
  212.     if (ret != 0) {
  213.         warning(ret, "cannot destroy threadtable lock");
  214.     }
  215.     /* We can't delete the tsd_key here, because gwthread_self()
  216.      * still needs it to access the main thread's info. */
  217. }
  218. static void *new_thread(void *arg)
  219. {
  220.     int ret;
  221.     struct new_thread_args *p = arg;
  222.     /* Make sure we don't start until our parent has entered
  223.      * our thread info in the thread table. */
  224.     lock();
  225.     unlock();
  226.     /* This has to be done here, because pthread_setspecific cannot
  227.      * be called by our parent on our behalf.  That's why the ti
  228.      * pointer is passed in the new_thread_args structure. */
  229.     /* Synchronization is not a problem, because the only thread
  230.      * that relies on this call having been made is this one --
  231.      * no other thread can access our TSD anyway. */
  232.     ret = pthread_setspecific(tsd_key, p->ti);
  233.     if (ret != 0) {
  234.         panic(ret, "gwthread-pthread: pthread_setspecific failed");
  235.     }
  236.     (p->func)(p->arg);
  237.     lock();
  238.     debug("gwlib.gwthread", 0, "Thread %ld (%s) terminates.",
  239.           p->ti->number, p->ti->name);
  240.     alert_joiners();
  241. #ifdef HAVE_LIBSSL
  242.     /* Clear the OpenSSL thread-specific error queue to avoid
  243.      * memory leaks. */
  244.     ERR_remove_state(gwthread_self());
  245. #endif /* HAVE_LIBSSL */
  246.     /* Must free p before signaling our exit, otherwise there is
  247.      * a race with gw_check_leaks at shutdown. */
  248.     gw_free(p);
  249.     delete_threadinfo();
  250.     unlock();
  251.     return NULL;
  252. }
  253. /*
  254.  * Change this thread's signal mask to block user-visible signals
  255.  * (HUP, TERM, QUIT, INT), and store the old signal mask in
  256.  * *old_set_storage.
  257.  * Return 0 for success, or -1 if an error occurred.
  258.  */
  259.  
  260.  /* 
  261.   * This does not work in Darwin alias MacOS X alias Mach kernel,
  262.   * however. So we define a dummy function doing nothing.
  263.   */
  264. #if defined(DARWIN_OLD)
  265.     static int pthread_sigmask();
  266. #endif
  267.   
  268. static int block_user_signals(sigset_t *old_set_storage)
  269. {
  270.     int ret;
  271.     sigset_t block_signals;
  272.     ret = sigemptyset(&block_signals);
  273.     if (ret != 0) {
  274.         error(errno, "gwthread-pthread: Couldn't initialize signal set");
  275.     return -1;
  276.     }
  277.     ret = sigaddset(&block_signals, SIGHUP);
  278.     ret |= sigaddset(&block_signals, SIGTERM);
  279.     ret |= sigaddset(&block_signals, SIGQUIT);
  280.     ret |= sigaddset(&block_signals, SIGINT);
  281.     if (ret != 0) {
  282.         error(0, "gwthread-pthread: Couldn't add signal to signal set");
  283.     return -1;
  284.     }
  285.     ret = pthread_sigmask(SIG_BLOCK, &block_signals, old_set_storage);
  286.     if (ret != 0) {
  287.         error(ret, 
  288.             "gwthread-pthread: Couldn't disable signals for thread creation");
  289.         return -1;
  290.     }
  291.     return 0;
  292. }
  293. static void restore_user_signals(sigset_t *old_set)
  294. {
  295.     int ret;
  296.     ret = pthread_sigmask(SIG_SETMASK, old_set, NULL);
  297.     if (ret != 0) {
  298.         panic(ret, "gwthread-pthread: Couldn't restore signal set.");
  299.     }
  300. }
  301. static long spawn_thread(gwthread_func_t *func, const char *name, void *arg)
  302. {
  303.     int ret;
  304.     pthread_t id;
  305.     struct new_thread_args *p = NULL;
  306.     long new_thread_id;
  307.     /* We want to pass both these arguments to our wrapper function
  308.      * new_thread, but the pthread_create interface will only let
  309.      * us pass one pointer.  So we wrap them in a little struct. */
  310.     p = gw_malloc(sizeof(*p));
  311.     p->func = func;
  312.     p->arg = arg;
  313.     p->ti = gw_malloc(sizeof(*(p->ti)));
  314.     /* Lock the thread table here, so that new_thread can block
  315.      * on that lock.  That way, the new thread won't start until
  316.      * we have entered it in the thread table. */
  317.     lock();
  318.     if (active_threads >= THREADTABLE_SIZE) {
  319.         unlock();
  320.         warning(0, "Too many threads, could not create new thread.");
  321.         gw_free(p);
  322.         return -1;
  323.     }
  324.     ret = pthread_create(&id, NULL, &new_thread, p);
  325.     if (ret != 0) {
  326.         unlock();
  327.         error(ret, "Could not create new thread.");
  328.         gw_free(p);
  329.         return -1;
  330.     }
  331.     ret = pthread_detach(id);
  332.     if (ret != 0) {
  333.         warning(ret, "Could not detach new thread.");
  334.     }
  335.     new_thread_id = fill_threadinfo(id, name, func, p->ti);
  336.     unlock();
  337.     
  338.     debug("gwlib.gwthread", 0, "Started thread %ld (%s)", new_thread_id, name);
  339.     return new_thread_id;
  340. }
  341. long gwthread_create_real(gwthread_func_t *func, const char *name, void *arg)
  342. {
  343.     int sigtrick = 0;
  344.     sigset_t old_signal_set;
  345.     long thread_id;
  346.     /*
  347.      * We want to make sure that only the main thread handles signals,
  348.      * so that each signal is handled exactly once.  To do this, we
  349.      * make sure that each new thread has all the signals that we
  350.      * handle blocked.  To avoid race conditions, we block them in 
  351.      * the spawning thread first, then create the new thread (which
  352.      * inherits the settings), and then restore the old settings in
  353.      * the spawning thread.  This means that there is a brief period
  354.      * when no signals will be processed, but during that time they
  355.      * should be queued by the operating system.
  356.      */
  357.     if (gwthread_self() == MAIN_THREAD_ID)
  358.     sigtrick = block_user_signals(&old_signal_set) == 0;
  359.     thread_id = spawn_thread(func, name, arg);
  360.     /*
  361.      * Restore the old signal mask.  The new thread will have
  362.      * inherited the resticted one, but the main thread needs
  363.      * the old one back.
  364.      */
  365.     if (sigtrick)
  366.       restore_user_signals(&old_signal_set);
  367.     
  368.     return thread_id;
  369. }
  370. void gwthread_join(long thread)
  371. {
  372.     struct threadinfo *threadinfo;
  373.     pthread_cond_t exit_cond;
  374.     int ret;
  375.     gw_assert(thread >= 0);
  376.     lock();
  377.     threadinfo = THREAD(thread);
  378.     if (threadinfo == NULL || threadinfo->number != thread) {
  379.         /* The other thread has already exited */
  380.         unlock();
  381.         return;
  382.     }
  383.     /* Register our desire to be alerted when that thread exits,
  384.      * and wait for it. */
  385.     ret = pthread_cond_init(&exit_cond, NULL);
  386.     if (ret != 0) {
  387.         warning(ret, "gwthread_join: cannot create condition variable.");
  388.         unlock();
  389.         return;
  390.     }
  391.     if (!threadinfo->joiners)
  392.         threadinfo->joiners = list_create();
  393.     list_append(threadinfo->joiners, &exit_cond);
  394.     /* The wait immediately releases the lock, and reacquires it
  395.      * when the condition is satisfied.  So don't worry, we're not
  396.      * blocking while keeping the table locked. */
  397.     ret = pthread_cond_wait(&exit_cond, &threadtable_lock);
  398.     unlock();
  399.     if (ret != 0)
  400.         warning(ret, "gwthread_join: error in pthread_cond_wait");
  401.     pthread_cond_destroy(&exit_cond);
  402. }
  403. void gwthread_join_all(void)
  404. {
  405.     long i;
  406.     long our_thread = gwthread_self();
  407.     for (i = 0; i < THREADTABLE_SIZE; ++i) {
  408.         if (THREAD(our_thread) != THREAD(i))
  409.             gwthread_join(i);
  410.     }
  411. }
  412. void gwthread_wakeup_all(void)
  413. {
  414.     long i;
  415.     long our_thread = gwthread_self();
  416.     for (i = 0; i < THREADTABLE_SIZE; ++i) {
  417.         if (THREAD(our_thread) != THREAD(i))
  418.             gwthread_wakeup(i);
  419.     }
  420. }
  421. void gwthread_join_every(gwthread_func_t *func)
  422. {
  423.     struct threadinfo *ti;
  424.     pthread_cond_t exit_cond;
  425.     int ret;
  426.     long i;
  427.     ret = pthread_cond_init(&exit_cond, NULL);
  428.     if (ret != 0) {
  429.         warning(ret, "gwthread_join_every: cannot create condition variable.");
  430.         unlock();
  431.         return;
  432.     }
  433.     /*
  434.      * FIXME: To be really safe, this function should keep looping
  435.      * over the table until it does a complete run without having
  436.      * to call pthread_cond_wait.  Otherwise, new threads could
  437.      * start while we wait, and we'll miss them.
  438.      */
  439.     lock();
  440.     for (i = 0; i < THREADTABLE_SIZE; ++i) {
  441.         ti = THREAD(i);
  442.         if (ti == NULL || ti->func != func)
  443.             continue;
  444.         debug("gwlib.gwthread", 0,
  445.               "Waiting for %ld (%s) to terminate",
  446.               ti->number, ti->name);
  447.         if (!ti->joiners)
  448.             ti->joiners = list_create();
  449.         list_append(ti->joiners, &exit_cond);
  450.         ret = pthread_cond_wait(&exit_cond, &threadtable_lock);
  451.         if (ret != 0)
  452.             warning(ret, "gwthread_join_all: error in pthread_cond_wait");
  453.     }
  454.     unlock();
  455.     pthread_cond_destroy(&exit_cond);
  456. }
  457. /* Return the thread id of this thread. */
  458. long gwthread_self(void)
  459. {
  460.     struct threadinfo *threadinfo;
  461.     threadinfo = pthread_getspecific(tsd_key);
  462.     if (threadinfo)
  463.         return threadinfo->number;
  464.     else
  465.         return -1;
  466. }
  467. void gwthread_wakeup(long thread)
  468. {
  469.     unsigned char c = 0;
  470.     struct threadinfo *threadinfo;
  471.     int fd;
  472.     gw_assert(thread >= 0);
  473.     lock();
  474.     threadinfo = THREAD(thread);
  475.     if (threadinfo == NULL || threadinfo->number != thread) {
  476.         unlock();
  477.         return;
  478.     }
  479.     fd = threadinfo->wakefd_send;
  480.     unlock();
  481.     write(fd, &c, 1);
  482. }
  483. int gwthread_pollfd(int fd, int events, double timeout)
  484. {
  485.     struct pollfd pollfd[2];
  486.     struct threadinfo *threadinfo;
  487.     int milliseconds;
  488.     int ret;
  489.     threadinfo = getthreadinfo();
  490.     pollfd[0].fd = threadinfo->wakefd_recv;
  491.     pollfd[0].events = POLLIN;
  492.     pollfd[1].fd = fd;
  493.     pollfd[1].events = events;
  494.     milliseconds = timeout * 1000;
  495.     if (milliseconds < 0)
  496.         milliseconds = POLL_NOTIMEOUT;
  497.     ret = poll(pollfd, 2, milliseconds);
  498.     if (ret < 0) {
  499.         if (errno != EINTR)
  500.             error(errno, "gwthread_pollfd: error in poll");
  501.         return -1;
  502.     }
  503.     if (pollfd[0].revents)
  504.         flushpipe(pollfd[0].fd);
  505.     return pollfd[1].revents;
  506. }
  507. int gwthread_poll(struct pollfd *fds, long numfds, double timeout)
  508. {
  509.     struct pollfd *pollfds;
  510.     struct threadinfo *threadinfo;
  511.     int milliseconds;
  512.     int ret;
  513.     threadinfo = getthreadinfo();
  514.     /* Create a new pollfd array with an extra element for the
  515.      * thread wakeup fd. */
  516.     pollfds = gw_malloc((numfds + 1) * sizeof(*pollfds));
  517.     pollfds[0].fd = threadinfo->wakefd_recv;
  518.     pollfds[0].events = POLLIN;
  519.     memcpy(pollfds + 1, fds, numfds * sizeof(*pollfds));
  520.     milliseconds = timeout * 1000;
  521.     if (milliseconds < 0)
  522.         milliseconds = POLL_NOTIMEOUT;
  523.     ret = poll(pollfds, numfds + 1, milliseconds);
  524.     if (ret < 0) {
  525.         if (errno != EINTR)
  526.             error(errno, "gwthread_poll: error in poll");
  527.         gw_free(pollfds);
  528.         return -1;
  529.     }
  530.     if (pollfds[0].revents)
  531.         flushpipe(pollfds[0].fd);
  532.     /* Copy the results back to the caller */
  533.     memcpy(fds, pollfds + 1, numfds * sizeof(*pollfds));
  534.     gw_free(pollfds);
  535.     return ret;
  536. }
  537. void gwthread_sleep(double seconds)
  538. {
  539.     struct pollfd pollfd;
  540.     struct threadinfo *threadinfo;
  541.     int milliseconds;
  542.     int ret;
  543.     threadinfo = getthreadinfo();
  544.     pollfd.fd = threadinfo->wakefd_recv;
  545.     pollfd.events = POLLIN;
  546.     milliseconds = seconds * 1000;
  547.     if (milliseconds < 0)
  548.         milliseconds = POLL_NOTIMEOUT;
  549.     ret = poll(&pollfd, 1, milliseconds);
  550.     if (ret < 0) {
  551.         if (errno != EINTR && errno != EAGAIN) {
  552.             warning(errno, "gwthread_sleep: error in poll");
  553.         }
  554.     }
  555.     if (ret == 1) {
  556.         flushpipe(pollfd.fd);
  557.     }
  558. }
  559. #ifndef BROKEN_PTHREADS
  560. /* Working pthreads */
  561. int gwthread_shouldhandlesignal(int signal){
  562.     return 1;
  563. }
  564. #else
  565. /* Somewhat broken pthreads */ 
  566. int gwthread_shouldhandlesignal(int signal){
  567.     return (gwthread_self() == MAIN_THREAD_ID);
  568. }
  569. #endif
  570. int gwthread_dumpsigmask(void) {
  571.     sigset_t signal_set;
  572.     int signum;
  573.     /* Grab the signal set data from our thread */
  574.     if (pthread_sigmask(SIG_BLOCK, NULL, &signal_set) != 0) {
  575.     warning(0, "gwthread_dumpsigmask: Couldn't get signal mask.");
  576.     return -1;
  577.     }
  578.     
  579.     /* For each signal normally defined (there are usually only 32),
  580.      * print a message if we don't block it. */
  581.     for (signum = 1; signum <= 32; signum++) {
  582.      if (!sigismember(&signal_set, signum)) {
  583.          debug("gwlib", 0,
  584.      "gwthread_dumpsigmask: Signal Number %d will be caught.", 
  585.      signum);
  586.      }
  587.     }
  588.     return 0;
  589. }
  590. /* DARWIN alias MacOS X doesnt have pthread_sigmask in its pthreads implementation */
  591. #if defined(DARWIN_OLD)
  592. static int pthread_sigmask()
  593. {
  594.     return 0;
  595. }
  596. #endif