ex_thread.c
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:12k
- /*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 1997, 1998, 1999, 2000
- * Sleepycat Software. All rights reserved.
- *
- * $Id: ex_thread.c,v 11.9 2000/05/31 15:10:04 bostic Exp $
- */
- #include "db_config.h"
- #ifndef NO_SYSTEM_INCLUDES
- #include <sys/types.h>
- #if TIME_WITH_SYS_TIME
- #include <sys/time.h>
- #include <time.h>
- #else
- #if HAVE_SYS_TIME_H
- #include <sys/time.h>
- #else
- #include <time.h>
- #endif
- #endif
- #include <errno.h>
- #include <pthread.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #endif
- #include <db.h>
- /*
- * NB: This application is written using POSIX 1003.1b-1993 pthreads
- * interfaces, which may not be portable to your system.
- */
- extern int sched_yield __P((void)); /* Pthread yield function. */
- DB_ENV *db_init __P((char *));
- void *deadlock __P((void *));
- void fatal __P((char *, int, int));
- int main __P((int, char *[]));
- int reader __P((int));
- void stats __P((void));
- void *trickle __P((void *));
- void *tstart __P((void *));
- void usage __P((void));
- void word __P((void));
- int writer __P((int));
- struct _statistics {
- int aborted; /* Write. */
- int aborts; /* Read/write. */
- int adds; /* Write. */
- int deletes; /* Write. */
- int txns; /* Write. */
- int found; /* Read. */
- int notfound; /* Read. */
- } *perf;
- const char
- *progname = "ex_thread"; /* Program name. */
- #define DATABASE "access.db" /* Database name. */
- #define WORDLIST "../test/wordlist" /* Dictionary. */
- /*
- * We can seriously increase the number of collisions and transaction
- * aborts by yielding the scheduler after every DB call. Specify the
- * -p option to do this.
- */
- int punish; /* -p */
- int nlist; /* -n */
- int nreaders; /* -r */
- int verbose; /* -v */
- int nwriters; /* -w */
- DB *dbp; /* Database handle. */
- DB_ENV *dbenv; /* Database environment. */
- int nthreads; /* Total threads. */
- char **list; /* Word list. */
- /*
- * ex_thread --
- * Run a simple threaded application of some numbers of readers and
- * writers competing for a set of words.
- *
- * Example UNIX shell script to run this program:
- * % rm -rf TESTDIR
- * % mkdir TESTDIR
- * % ex_thread -h TESTDIR
- */
- int
- main(argc, argv)
- int argc;
- char *argv[];
- {
- extern char *optarg;
- extern int errno, optind;
- pthread_t *tids;
- int ch, i, ret;
- char *home;
- void *retp;
- nlist = 1000;
- nreaders = nwriters = 4;
- home = "TESTDIR";
- while ((ch = getopt(argc, argv, "h:pn:r:vw:")) != EOF)
- switch (ch) {
- case 'h':
- home = optarg;
- break;
- case 'p':
- punish = 1;
- break;
- case 'n':
- nlist = atoi(optarg);
- break;
- case 'r':
- nreaders = atoi(optarg);
- break;
- case 'v':
- verbose = 1;
- break;
- case 'w':
- nwriters = atoi(optarg);
- break;
- case '?':
- default:
- usage();
- }
- argc -= optind;
- argv += optind;
- /* Initialize the random number generator. */
- srand(getpid() | time(NULL));
- /* Build the key list. */
- word();
- /* Remove the previous database. */
- (void)unlink(DATABASE);
- /* Initialize the database environment. */
- dbenv = db_init(home);
- /* Initialize the database. */
- if ((ret = db_create(&dbp, dbenv, 0)) != 0) {
- dbenv->err(dbenv, ret, "db_create");
- (void)dbenv->close(dbenv, 0);
- return (1);
- }
- if ((ret = dbp->set_pagesize(dbp, 1024)) != 0) {
- dbp->err(dbp, ret, "set_pagesize");
- goto err;
- }
- if ((ret = dbp->open(dbp,
- DATABASE, NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0664)) != 0) {
- dbp->err(dbp, ret, "%s: open", DATABASE);
- goto err;
- }
- nthreads = nreaders + nwriters + 2;
- printf("Running: readers %d, writers %dn", nreaders, nwriters);
- fflush(stdout);
- /* Create statistics structures, offset by 1. */
- if ((perf = calloc(nreaders + nwriters + 1, sizeof(*perf))) == NULL)
- fatal(NULL, errno, 1);
- /* Create thread ID structures. */
- if ((tids = malloc(nthreads * sizeof(pthread_t))) == NULL)
- fatal(NULL, errno, 1);
- /* Create reader/writer threads. */
- for (i = 0; i < nreaders + nwriters; ++i)
- if (pthread_create(&tids[i], NULL, tstart, (void *)i))
- fatal("pthread_create", errno, 1);
- /* Create buffer pool trickle thread. */
- if (pthread_create(&tids[i], NULL, trickle, &i))
- fatal("pthread_create", errno, 1);
- ++i;
- /* Create deadlock detector thread. */
- if (pthread_create(&tids[i], NULL, deadlock, &i))
- fatal("pthread_create", errno, 1);
- /* Wait for the threads. */
- for (i = 0; i < nthreads; ++i)
- (void)pthread_join(tids[i], &retp);
- err: (void)dbp->close(dbp, 0);
- (void)dbenv->close(dbenv, 0);
- return (0);
- }
- int
- reader(id)
- int id;
- {
- DBT key, data;
- int n, ret;
- char buf[64];
- /*
- * DBT's must use local memory or malloc'd memory if the DB handle
- * is accessed in a threaded fashion.
- */
- memset(&key, 0, sizeof(DBT));
- memset(&data, 0, sizeof(DBT));
- data.flags = DB_DBT_MALLOC;
- /*
- * Read-only threads do not require transaction protection, unless
- * there's a need for repeatable reads.
- */
- for (;;) {
- /* Pick a key at random, and look it up. */
- n = rand() % nlist;
- key.data = list[n];
- key.size = strlen(key.data);
- if (verbose) {
- sprintf(buf, "reader: %d: list entry %dn", id, n);
- write(STDOUT_FILENO, buf, strlen(buf));
- }
- switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
- case DB_LOCK_DEADLOCK: /* Deadlock. */
- ++perf[id].aborts;
- break;
- case 0: /* Success. */
- ++perf[id].found;
- free(data.data);
- break;
- case DB_NOTFOUND: /* Not found. */
- ++perf[id].notfound;
- break;
- default:
- sprintf(buf,
- "reader %d: dbp->get: %s", id, (char *)key.data);
- fatal(buf, ret, 0);
- }
- }
- return (0);
- }
- int
- writer(id)
- int id;
- {
- DBT key, data;
- DB_TXN *tid;
- time_t now, then;
- int n, ret;
- char buf[256], dbuf[10000];
- time(&now);
- then = now;
- /*
- * DBT's must use local memory or malloc'd memory if the DB handle
- * is accessed in a threaded fashion.
- */
- memset(&key, 0, sizeof(DBT));
- memset(&data, 0, sizeof(DBT));
- data.data = dbuf;
- data.ulen = sizeof(dbuf);
- data.flags = DB_DBT_USERMEM;
- for (;;) {
- /* Pick a random key. */
- n = rand() % nlist;
- key.data = list[n];
- key.size = strlen(key.data);
- if (verbose) {
- sprintf(buf, "writer: %d: list entry %dn", id, n);
- write(STDOUT_FILENO, buf, strlen(buf));
- }
- /* Abort and retry. */
- if (0) {
- retry: if ((ret = txn_abort(tid)) != 0)
- fatal("txn_abort", ret, 1);
- ++perf[id].aborts;
- ++perf[id].aborted;
- }
- /* Thread #1 prints out the stats every 20 seconds. */
- if (id == 1) {
- time(&now);
- if (now - then >= 20) {
- stats();
- then = now;
- }
- }
- /* Begin the transaction. */
- if ((ret = txn_begin(dbenv, NULL, &tid, 0)) != 0)
- fatal("txn_begin", ret, 1);
- /*
- * Get the key. If it doesn't exist, add it. If it does
- * exist, delete it.
- */
- switch (ret = dbp->get(dbp, tid, &key, &data, 0)) {
- case DB_LOCK_DEADLOCK:
- goto retry;
- case 0:
- goto delete;
- case DB_NOTFOUND:
- goto add;
- }
- sprintf(buf, "writer: %d: dbp->get", id);
- fatal(buf, ret, 1);
- /* NOTREACHED */
- delete: /* Delete the key. */
- switch (ret = dbp->del(dbp, tid, &key, 0)) {
- case DB_LOCK_DEADLOCK:
- goto retry;
- case 0:
- ++perf[id].deletes;
- goto commit;
- }
- sprintf(buf, "writer: %d: dbp->del", id);
- fatal(buf, ret, 1);
- /* NOTREACHED */
- add: /* Add the key. 1 data item in 30 is an overflow item. */
- data.size = 20 + rand() % 128;
- if (rand() % 30 == 0)
- data.size += 8192;
- switch (ret = dbp->put(dbp, tid, &key, &data, 0)) {
- case DB_LOCK_DEADLOCK:
- goto retry;
- case 0:
- ++perf[id].adds;
- goto commit;
- default:
- sprintf(buf, "writer: %d: dbp->put", id);
- fatal(buf, ret, 1);
- }
- commit: /* The transaction finished, commit it. */
- if ((ret = txn_commit(tid, 0)) != 0)
- fatal("txn_commit", ret, 1);
- /*
- * Every time the thread completes 20 transactions, show
- * our progress.
- */
- if (++perf[id].txns % 20 == 0) {
- sprintf(buf,
- "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4dn",
- id, perf[id].adds, perf[id].deletes,
- perf[id].aborts, perf[id].txns);
- write(STDOUT_FILENO, buf, strlen(buf));
- }
- /*
- * If this thread was aborted more than 5 times before
- * the transaction finished, complain.
- */
- if (perf[id].aborted > 5) {
- sprintf(buf,
- "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d: ABORTED: %2dn",
- id, perf[id].adds, perf[id].deletes,
- perf[id].aborts, perf[id].txns, perf[id].aborted);
- write(STDOUT_FILENO, buf, strlen(buf));
- }
- perf[id].aborted = 0;
- }
- return (0);
- }
- /*
- * stats --
- * Display reader/writer thread statistics. To display the statistics
- * for the mpool trickle or deadlock threads, use db_stat(1).
- */
- void
- stats()
- {
- int id;
- char *p, buf[8192];
- p = buf + sprintf(buf, "-------------n");
- for (id = 0; id < nreaders + nwriters;)
- if (id++ < nwriters)
- p += sprintf(p,
- "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4dn",
- id, perf[id].adds,
- perf[id].deletes, perf[id].aborts, perf[id].txns);
- else
- p += sprintf(p,
- "reader: %2d: found: %5d: notfound: %5d: aborts: %4dn",
- id, perf[id].found,
- perf[id].notfound, perf[id].aborts);
- p += sprintf(p, "-------------n");
- write(STDOUT_FILENO, buf, p - buf);
- }
- /*
- * db_init --
- * Initialize the environment.
- */
- DB_ENV *
- db_init(home)
- char *home;
- {
- DB_ENV *dbenv;
- int ret;
- if (punish) {
- (void)db_env_set_pageyield(1);
- (void)db_env_set_func_yield(sched_yield);
- }
- if ((ret = db_env_create(&dbenv, 0)) != 0) {
- fprintf(stderr,
- "%s: db_env_create: %sn", progname, db_strerror(ret));
- exit (1);
- }
- dbenv->set_errfile(dbenv, stderr);
- dbenv->set_errpfx(dbenv, progname);
- (void)dbenv->set_cachesize(dbenv, 0, 100 * 1024, 0);
- (void)dbenv->set_lg_max(dbenv, 200000);
- if ((ret = dbenv->open(dbenv, home,
- DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
- DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0) {
- dbenv->err(dbenv, ret, NULL);
- (void)dbenv->close(dbenv, 0);
- exit (1);
- }
- return (dbenv);
- }
- /*
- * tstart --
- * Thread start function for readers and writers.
- */
- void *
- tstart(arg)
- void *arg;
- {
- pthread_t tid;
- u_int id;
- id = (u_int)arg + 1;
- tid = pthread_self();
- if (id <= (u_int)nwriters) {
- printf("write thread %d starting: tid: %lun", id, (u_long)tid);
- fflush(stdout);
- writer(id);
- } else {
- printf("read thread %d starting: tid: %lun", id, (u_long)tid);
- fflush(stdout);
- reader(id);
- }
- /* NOTREACHED */
- return (NULL);
- }
- /*
- * deadlock --
- * Thread start function for lock_detect().
- */
- void *
- deadlock(arg)
- void *arg;
- {
- struct timeval t;
- pthread_t tid;
- arg = arg; /* XXX: shut the compiler up. */
- tid = pthread_self();
- printf("deadlock thread starting: tid: %lun", (u_long)tid);
- fflush(stdout);
- t.tv_sec = 0;
- t.tv_usec = 100000;
- for (;;) {
- (void)lock_detect(dbenv,
- DB_LOCK_CONFLICT, DB_LOCK_YOUNGEST, NULL);
- /* Check every 100ms. */
- (void)select(0, NULL, NULL, NULL, &t);
- }
- /* NOTREACHED */
- return (NULL);
- }
- /*
- * trickle --
- * Thread start function for memp_trickle().
- */
- void *
- trickle(arg)
- void *arg;
- {
- pthread_t tid;
- int wrote;
- char buf[64];
- arg = arg; /* XXX: shut the compiler up. */
- tid = pthread_self();
- printf("trickle thread starting: tid: %lun", (u_long)tid);
- fflush(stdout);
- for (;;) {
- (void)memp_trickle(dbenv, 10, &wrote);
- if (verbose) {
- sprintf(buf, "trickle: wrote %dn", wrote);
- write(STDOUT_FILENO, buf, strlen(buf));
- }
- if (wrote == 0) {
- sleep(1);
- sched_yield();
- }
- }
- /* NOTREACHED */
- return (NULL);
- }
- /*
- * word --
- * Build the dictionary word list.
- */
- void
- word()
- {
- FILE *fp;
- int cnt;
- char buf[256];
- if ((fp = fopen(WORDLIST, "r")) == NULL)
- fatal(WORDLIST, errno, 1);
- if ((list = malloc(nlist * sizeof(char *))) == NULL)
- fatal(NULL, errno, 1);
- for (cnt = 0; cnt < nlist; ++cnt) {
- if (fgets(buf, sizeof(buf), fp) == NULL)
- break;
- if ((list[cnt] = strdup(buf)) == NULL)
- fatal(NULL, errno, 1);
- }
- nlist = cnt; /* In case nlist was larger than possible. */
- }
- /*
- * fatal --
- * Report a fatal error and quit.
- */
- void
- fatal(msg, err, syserr)
- char *msg;
- int err, syserr;
- {
- fprintf(stderr, "%s: ", progname);
- if (msg != NULL) {
- fprintf(stderr, "%s", msg);
- if (syserr)
- fprintf(stderr, ": ");
- }
- if (syserr)
- fprintf(stderr, "%s", strerror(err));
- fprintf(stderr, "n");
- exit (1);
- /* NOTREACHED */
- }
- /*
- * usage --
- * Usage message.
- */
- void
- usage()
- {
- (void)fprintf(stderr,
- "usage: %s [-pv] [-h home] [-n words] [-r readers] [-w writers]n",
- progname);
- exit(1);
- }