ex_thread.c
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:12k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /*-
  2.  * See the file LICENSE for redistribution information.
  3.  *
  4.  * Copyright (c) 1997, 1998, 1999, 2000
  5.  * Sleepycat Software.  All rights reserved.
  6.  *
  7.  * $Id: ex_thread.c,v 11.9 2000/05/31 15:10:04 bostic Exp $
  8.  */
  9. #include "db_config.h"
  10. #ifndef NO_SYSTEM_INCLUDES
  11. #include <sys/types.h>
  12. #if TIME_WITH_SYS_TIME
  13. #include <sys/time.h>
  14. #include <time.h>
  15. #else
  16. #if HAVE_SYS_TIME_H
  17. #include <sys/time.h>
  18. #else
  19. #include <time.h>
  20. #endif
  21. #endif
  22. #include <errno.h>
  23. #include <pthread.h>
  24. #include <stdio.h>
  25. #include <stdlib.h>
  26. #include <string.h>
  27. #include <unistd.h>
  28. #endif
  29. #include <db.h>
  30. /*
  31.  * NB: This application is written using POSIX 1003.1b-1993 pthreads
  32.  * interfaces, which may not be portable to your system.
  33.  */
  34. extern int sched_yield __P((void)); /* Pthread yield function. */
  35. DB_ENV *db_init __P((char *));
  36. void   *deadlock __P((void *));
  37. void fatal __P((char *, int, int));
  38. int main __P((int, char *[]));
  39. int reader __P((int));
  40. void stats __P((void));
  41. void   *trickle __P((void *));
  42. void   *tstart __P((void *));
  43. void usage __P((void));
  44. void word __P((void));
  45. int writer __P((int));
  46. struct _statistics {
  47. int aborted; /* Write. */
  48. int aborts; /* Read/write. */
  49. int adds; /* Write. */
  50. int deletes; /* Write. */
  51. int txns; /* Write. */
  52. int found; /* Read. */
  53. int notfound; /* Read. */
  54. } *perf;
  55. const char
  56. *progname = "ex_thread"; /* Program name. */
  57. #define DATABASE "access.db" /* Database name. */
  58. #define WORDLIST "../test/wordlist" /* Dictionary. */
  59. /*
  60.  * We can seriously increase the number of collisions and transaction
  61.  * aborts by yielding the scheduler after every DB call.  Specify the
  62.  * -p option to do this.
  63.  */
  64. int punish; /* -p */
  65. int nlist; /* -n */
  66. int nreaders; /* -r */
  67. int verbose; /* -v */
  68. int nwriters; /* -w */
  69. DB     *dbp; /* Database handle. */
  70. DB_ENV *dbenv; /* Database environment. */
  71. int nthreads; /* Total threads. */
  72. char  **list; /* Word list. */
  73. /*
  74.  * ex_thread --
  75.  * Run a simple threaded application of some numbers of readers and
  76.  * writers competing for a set of words.
  77.  *
  78.  * Example UNIX shell script to run this program:
  79.  * % rm -rf TESTDIR
  80.  * % mkdir TESTDIR
  81.  * % ex_thread -h TESTDIR
  82.  */
  83. int
  84. main(argc, argv)
  85. int argc;
  86. char *argv[];
  87. {
  88. extern char *optarg;
  89. extern int errno, optind;
  90. pthread_t *tids;
  91. int ch, i, ret;
  92. char *home;
  93. void *retp;
  94. nlist = 1000;
  95. nreaders = nwriters = 4;
  96. home = "TESTDIR";
  97. while ((ch = getopt(argc, argv, "h:pn:r:vw:")) != EOF)
  98. switch (ch) {
  99. case 'h':
  100. home = optarg;
  101. break;
  102. case 'p':
  103. punish = 1;
  104. break;
  105. case 'n':
  106. nlist = atoi(optarg);
  107. break;
  108. case 'r':
  109. nreaders = atoi(optarg);
  110. break;
  111. case 'v':
  112. verbose = 1;
  113. break;
  114. case 'w':
  115. nwriters = atoi(optarg);
  116. break;
  117. case '?':
  118. default:
  119. usage();
  120. }
  121. argc -= optind;
  122. argv += optind;
  123. /* Initialize the random number generator. */
  124. srand(getpid() | time(NULL));
  125. /* Build the key list. */
  126. word();
  127. /* Remove the previous database. */
  128. (void)unlink(DATABASE);
  129. /* Initialize the database environment. */
  130. dbenv = db_init(home);
  131. /* Initialize the database. */
  132. if ((ret = db_create(&dbp, dbenv, 0)) != 0) {
  133. dbenv->err(dbenv, ret, "db_create");
  134. (void)dbenv->close(dbenv, 0);
  135. return (1);
  136. }
  137. if ((ret = dbp->set_pagesize(dbp, 1024)) != 0) {
  138. dbp->err(dbp, ret, "set_pagesize");
  139. goto err;
  140. }
  141. if ((ret = dbp->open(dbp,
  142.      DATABASE, NULL, DB_BTREE, DB_CREATE | DB_THREAD, 0664)) != 0) {
  143. dbp->err(dbp, ret, "%s: open", DATABASE);
  144. goto err;
  145. }
  146. nthreads = nreaders + nwriters + 2;
  147. printf("Running: readers %d, writers %dn", nreaders, nwriters);
  148. fflush(stdout);
  149. /* Create statistics structures, offset by 1. */
  150. if ((perf = calloc(nreaders + nwriters + 1, sizeof(*perf))) == NULL)
  151. fatal(NULL, errno, 1);
  152. /* Create thread ID structures. */
  153. if ((tids = malloc(nthreads * sizeof(pthread_t))) == NULL)
  154. fatal(NULL, errno, 1);
  155. /* Create reader/writer threads. */
  156. for (i = 0; i < nreaders + nwriters; ++i)
  157. if (pthread_create(&tids[i], NULL, tstart, (void *)i))
  158. fatal("pthread_create", errno, 1);
  159. /* Create buffer pool trickle thread. */
  160. if (pthread_create(&tids[i], NULL, trickle, &i))
  161. fatal("pthread_create", errno, 1);
  162. ++i;
  163. /* Create deadlock detector thread. */
  164. if (pthread_create(&tids[i], NULL, deadlock, &i))
  165. fatal("pthread_create", errno, 1);
  166. /* Wait for the threads. */
  167. for (i = 0; i < nthreads; ++i)
  168. (void)pthread_join(tids[i], &retp);
  169. err: (void)dbp->close(dbp, 0);
  170. (void)dbenv->close(dbenv, 0);
  171. return (0);
  172. }
  173. int
  174. reader(id)
  175. int id;
  176. {
  177. DBT key, data;
  178. int n, ret;
  179. char buf[64];
  180. /*
  181.  * DBT's must use local memory or malloc'd memory if the DB handle
  182.  * is accessed in a threaded fashion.
  183.  */
  184. memset(&key, 0, sizeof(DBT));
  185. memset(&data, 0, sizeof(DBT));
  186. data.flags = DB_DBT_MALLOC;
  187. /*
  188.  * Read-only threads do not require transaction protection, unless
  189.  * there's a need for repeatable reads.
  190.  */
  191. for (;;) {
  192. /* Pick a key at random, and look it up. */
  193. n = rand() % nlist;
  194. key.data = list[n];
  195. key.size = strlen(key.data);
  196. if (verbose) {
  197. sprintf(buf, "reader: %d: list entry %dn", id, n);
  198. write(STDOUT_FILENO, buf, strlen(buf));
  199. }
  200. switch (ret = dbp->get(dbp, NULL, &key, &data, 0)) {
  201. case DB_LOCK_DEADLOCK: /* Deadlock. */
  202. ++perf[id].aborts;
  203. break;
  204. case 0: /* Success. */
  205. ++perf[id].found;
  206. free(data.data);
  207. break;
  208. case DB_NOTFOUND: /* Not found. */
  209. ++perf[id].notfound;
  210. break;
  211. default:
  212. sprintf(buf,
  213.     "reader %d: dbp->get: %s", id, (char *)key.data);
  214. fatal(buf, ret, 0);
  215. }
  216. }
  217. return (0);
  218. }
  219. int
  220. writer(id)
  221. int id;
  222. {
  223. DBT key, data;
  224. DB_TXN *tid;
  225. time_t now, then;
  226. int n, ret;
  227. char buf[256], dbuf[10000];
  228. time(&now);
  229. then = now;
  230. /*
  231.  * DBT's must use local memory or malloc'd memory if the DB handle
  232.  * is accessed in a threaded fashion.
  233.  */
  234. memset(&key, 0, sizeof(DBT));
  235. memset(&data, 0, sizeof(DBT));
  236. data.data = dbuf;
  237. data.ulen = sizeof(dbuf);
  238. data.flags = DB_DBT_USERMEM;
  239. for (;;) {
  240. /* Pick a random key. */
  241. n = rand() % nlist;
  242. key.data = list[n];
  243. key.size = strlen(key.data);
  244. if (verbose) {
  245. sprintf(buf, "writer: %d: list entry %dn", id, n);
  246. write(STDOUT_FILENO, buf, strlen(buf));
  247. }
  248. /* Abort and retry. */
  249. if (0) {
  250. retry: if ((ret = txn_abort(tid)) != 0)
  251. fatal("txn_abort", ret, 1);
  252. ++perf[id].aborts;
  253. ++perf[id].aborted;
  254. }
  255. /* Thread #1 prints out the stats every 20 seconds. */
  256. if (id == 1) {
  257. time(&now);
  258. if (now - then >= 20) {
  259. stats();
  260. then = now;
  261. }
  262. }
  263. /* Begin the transaction. */
  264. if ((ret = txn_begin(dbenv, NULL, &tid, 0)) != 0)
  265. fatal("txn_begin", ret, 1);
  266. /*
  267.  * Get the key.  If it doesn't exist, add it.  If it does
  268.  * exist, delete it.
  269.  */
  270. switch (ret = dbp->get(dbp, tid, &key, &data, 0)) {
  271. case DB_LOCK_DEADLOCK:
  272. goto retry;
  273. case 0:
  274. goto delete;
  275. case DB_NOTFOUND:
  276. goto add;
  277. }
  278. sprintf(buf, "writer: %d: dbp->get", id);
  279. fatal(buf, ret, 1);
  280. /* NOTREACHED */
  281. delete: /* Delete the key. */
  282. switch (ret = dbp->del(dbp, tid, &key, 0)) {
  283. case DB_LOCK_DEADLOCK:
  284. goto retry;
  285. case 0:
  286. ++perf[id].deletes;
  287. goto commit;
  288. }
  289. sprintf(buf, "writer: %d: dbp->del", id);
  290. fatal(buf, ret, 1);
  291. /* NOTREACHED */
  292. add: /* Add the key.  1 data item in 30 is an overflow item. */
  293. data.size = 20 + rand() % 128;
  294. if (rand() % 30 == 0)
  295. data.size += 8192;
  296. switch (ret = dbp->put(dbp, tid, &key, &data, 0)) {
  297. case DB_LOCK_DEADLOCK:
  298. goto retry;
  299. case 0:
  300. ++perf[id].adds;
  301. goto commit;
  302. default:
  303. sprintf(buf, "writer: %d: dbp->put", id);
  304. fatal(buf, ret, 1);
  305. }
  306. commit: /* The transaction finished, commit it. */
  307. if ((ret = txn_commit(tid, 0)) != 0)
  308. fatal("txn_commit", ret, 1);
  309. /*
  310.  * Every time the thread completes 20 transactions, show
  311.  * our progress.
  312.  */
  313. if (++perf[id].txns % 20 == 0) {
  314. sprintf(buf,
  315. "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4dn",
  316.     id, perf[id].adds, perf[id].deletes,
  317.     perf[id].aborts, perf[id].txns);
  318. write(STDOUT_FILENO, buf, strlen(buf));
  319. }
  320. /*
  321.  * If this thread was aborted more than 5 times before
  322.  * the transaction finished, complain.
  323.  */
  324. if (perf[id].aborted > 5) {
  325. sprintf(buf,
  326. "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4d: ABORTED: %2dn",
  327.     id, perf[id].adds, perf[id].deletes,
  328.     perf[id].aborts, perf[id].txns, perf[id].aborted);
  329. write(STDOUT_FILENO, buf, strlen(buf));
  330. }
  331. perf[id].aborted = 0;
  332. }
  333. return (0);
  334. }
  335. /*
  336.  * stats --
  337.  * Display reader/writer thread statistics.  To display the statistics
  338.  * for the mpool trickle or deadlock threads, use db_stat(1).
  339.  */
  340. void
  341. stats()
  342. {
  343. int id;
  344. char *p, buf[8192];
  345. p = buf + sprintf(buf, "-------------n");
  346. for (id = 0; id < nreaders + nwriters;)
  347. if (id++ < nwriters)
  348. p += sprintf(p,
  349. "writer: %2d: adds: %4d: deletes: %4d: aborts: %4d: txns: %4dn",
  350.     id, perf[id].adds,
  351.     perf[id].deletes, perf[id].aborts, perf[id].txns);
  352. else
  353. p += sprintf(p,
  354. "reader: %2d: found: %5d: notfound: %5d: aborts: %4dn",
  355.     id, perf[id].found,
  356.     perf[id].notfound, perf[id].aborts);
  357. p += sprintf(p, "-------------n");
  358. write(STDOUT_FILENO, buf, p - buf);
  359. }
  360. /*
  361.  * db_init --
  362.  * Initialize the environment.
  363.  */
  364. DB_ENV *
  365. db_init(home)
  366. char *home;
  367. {
  368. DB_ENV *dbenv;
  369. int ret;
  370. if (punish) {
  371. (void)db_env_set_pageyield(1);
  372. (void)db_env_set_func_yield(sched_yield);
  373. }
  374. if ((ret = db_env_create(&dbenv, 0)) != 0) {
  375. fprintf(stderr,
  376.     "%s: db_env_create: %sn", progname, db_strerror(ret));
  377. exit (1);
  378. }
  379. dbenv->set_errfile(dbenv, stderr);
  380. dbenv->set_errpfx(dbenv, progname);
  381. (void)dbenv->set_cachesize(dbenv, 0, 100 * 1024, 0);
  382. (void)dbenv->set_lg_max(dbenv, 200000);
  383. if ((ret = dbenv->open(dbenv, home,
  384.     DB_CREATE | DB_INIT_LOCK | DB_INIT_LOG |
  385.     DB_INIT_MPOOL | DB_INIT_TXN | DB_THREAD, 0)) != 0) {
  386. dbenv->err(dbenv, ret, NULL);
  387. (void)dbenv->close(dbenv, 0);
  388. exit (1);
  389. }
  390. return (dbenv);
  391. }
  392. /*
  393.  * tstart --
  394.  * Thread start function for readers and writers.
  395.  */
  396. void *
  397. tstart(arg)
  398. void *arg;
  399. {
  400. pthread_t tid;
  401. u_int id;
  402. id = (u_int)arg + 1;
  403. tid = pthread_self();
  404. if (id <= (u_int)nwriters) {
  405. printf("write thread %d starting: tid: %lun", id, (u_long)tid);
  406. fflush(stdout);
  407. writer(id);
  408. } else {
  409. printf("read thread %d starting: tid: %lun", id, (u_long)tid);
  410. fflush(stdout);
  411. reader(id);
  412. }
  413. /* NOTREACHED */
  414. return (NULL);
  415. }
  416. /*
  417.  * deadlock --
  418.  * Thread start function for lock_detect().
  419.  */
  420. void *
  421. deadlock(arg)
  422. void *arg;
  423. {
  424. struct timeval t;
  425. pthread_t tid;
  426. arg = arg; /* XXX: shut the compiler up. */
  427. tid = pthread_self();
  428. printf("deadlock thread starting: tid: %lun", (u_long)tid);
  429. fflush(stdout);
  430. t.tv_sec = 0;
  431. t.tv_usec = 100000;
  432. for (;;) {
  433. (void)lock_detect(dbenv,
  434.     DB_LOCK_CONFLICT, DB_LOCK_YOUNGEST, NULL);
  435. /* Check every 100ms. */
  436. (void)select(0, NULL, NULL, NULL, &t);
  437. }
  438. /* NOTREACHED */
  439. return (NULL);
  440. }
  441. /*
  442.  * trickle --
  443.  * Thread start function for memp_trickle().
  444.  */
  445. void *
  446. trickle(arg)
  447. void *arg;
  448. {
  449. pthread_t tid;
  450. int wrote;
  451. char buf[64];
  452. arg = arg; /* XXX: shut the compiler up. */
  453. tid = pthread_self();
  454. printf("trickle thread starting: tid: %lun", (u_long)tid);
  455. fflush(stdout);
  456. for (;;) {
  457. (void)memp_trickle(dbenv, 10, &wrote);
  458. if (verbose) {
  459. sprintf(buf, "trickle: wrote %dn", wrote);
  460. write(STDOUT_FILENO, buf, strlen(buf));
  461. }
  462. if (wrote == 0) {
  463. sleep(1);
  464. sched_yield();
  465. }
  466. }
  467. /* NOTREACHED */
  468. return (NULL);
  469. }
  470. /*
  471.  * word --
  472.  * Build the dictionary word list.
  473.  */
  474. void
  475. word()
  476. {
  477. FILE *fp;
  478. int cnt;
  479. char buf[256];
  480. if ((fp = fopen(WORDLIST, "r")) == NULL)
  481. fatal(WORDLIST, errno, 1);
  482. if ((list = malloc(nlist * sizeof(char *))) == NULL)
  483. fatal(NULL, errno, 1);
  484. for (cnt = 0; cnt < nlist; ++cnt) {
  485. if (fgets(buf, sizeof(buf), fp) == NULL)
  486. break;
  487. if ((list[cnt] = strdup(buf)) == NULL)
  488. fatal(NULL, errno, 1);
  489. }
  490. nlist = cnt; /* In case nlist was larger than possible. */
  491. }
  492. /*
  493.  * fatal --
  494.  * Report a fatal error and quit.
  495.  */
  496. void
  497. fatal(msg, err, syserr)
  498. char *msg;
  499. int err, syserr;
  500. {
  501. fprintf(stderr, "%s: ", progname);
  502. if (msg != NULL) {
  503. fprintf(stderr, "%s", msg);
  504. if (syserr)
  505. fprintf(stderr, ": ");
  506. }
  507. if (syserr)
  508. fprintf(stderr, "%s", strerror(err));
  509. fprintf(stderr, "n");
  510. exit (1);
  511. /* NOTREACHED */
  512. }
  513. /*
  514.  * usage --
  515.  * Usage message.
  516.  */
  517. void
  518. usage()
  519. {
  520. (void)fprintf(stderr,
  521.     "usage: %s [-pv] [-h home] [-n words] [-r readers] [-w writers]n",
  522.     progname);
  523. exit(1);
  524. }