qam.c
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:36k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /*-
  2.  * See the file LICENSE for redistribution information.
  3.  *
  4.  * Copyright (c) 1999-2002
  5.  * Sleepycat Software.  All rights reserved.
  6.  */
  7. #include "db_config.h"
  8. #ifndef lint
  9. static const char revid[] = "$Id: qam.c,v 11.134 2002/08/13 20:46:08 ubell Exp $";
  10. #endif /* not lint */
  11. #ifndef NO_SYSTEM_INCLUDES
  12. #include <sys/types.h>
  13. #include <string.h>
  14. #endif
  15. #include "db_int.h"
  16. #include "dbinc/db_page.h"
  17. #include "dbinc/db_shash.h"
  18. #include "dbinc/btree.h"
  19. #include "dbinc/lock.h"
  20. #include "dbinc/log.h"
  21. #include "dbinc/qam.h"
  22. static int __qam_bulk __P((DBC *, DBT *, u_int32_t));
  23. static int __qam_c_close __P((DBC *, db_pgno_t, int *));
  24. static int __qam_c_del __P((DBC *));
  25. static int __qam_c_destroy __P((DBC *));
  26. static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
  27. static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
  28. static int __qam_consume __P((DBC *, QMETA *, db_recno_t));
  29. static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
  30. /*
  31.  * __qam_position --
  32.  * Position a queued access method cursor at a record.  This returns
  33.  * the page locked.  *exactp will be set if the record is valid.
  34.  * PUBLIC: int __qam_position
  35.  * PUBLIC:       __P((DBC *, db_recno_t *, qam_position_mode, int *));
  36.  */
  37. int
  38. __qam_position(dbc, recnop, mode, exactp)
  39. DBC *dbc; /* open cursor */
  40. db_recno_t *recnop; /* pointer to recno to find */
  41. qam_position_mode mode;/* locking: read or write */
  42. int *exactp; /* indicate if it was found */
  43. {
  44. QUEUE_CURSOR *cp;
  45. DB *dbp;
  46. QAMDATA  *qp;
  47. db_pgno_t pg;
  48. int ret;
  49. dbp = dbc->dbp;
  50. cp = (QUEUE_CURSOR *)dbc->internal;
  51. /* Fetch the page for this recno. */
  52. pg = QAM_RECNO_PAGE(dbp, *recnop);
  53. if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ?
  54.     DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0)
  55. return (ret);
  56. cp->page = NULL;
  57. *exactp = 0;
  58. if ((ret = __qam_fget(dbp, &pg,
  59.     mode == QAM_WRITE ? DB_MPOOL_CREATE : 0, &cp->page)) != 0) {
  60. /* We did not fetch it, we can release the lock. */
  61. (void)__LPUT(dbc, cp->lock);
  62. if (mode != QAM_WRITE &&
  63.     (ret == DB_PAGE_NOTFOUND || ret == ENOENT))
  64. return (0);
  65. return (ret);
  66. }
  67. cp->pgno = pg;
  68. cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
  69. if (PGNO(cp->page) == 0) {
  70. if (F_ISSET(dbp, DB_AM_RDONLY)) {
  71. *exactp = 0;
  72. return (0);
  73. }
  74. PGNO(cp->page) = pg;
  75. TYPE(cp->page) = P_QAMDATA;
  76. }
  77. qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
  78. *exactp = F_ISSET(qp, QAM_VALID) ? 1 : 0;
  79. return (ret);
  80. }
  81. /*
  82.  * __qam_pitem --
  83.  * Put an item on a queue page.  Copy the data to the page and set the
  84.  * VALID and SET bits.  If logging and the record was previously set,
  85.  * log that data, otherwise just log the new data.
  86.  *
  87.  *   pagep must be write locked
  88.  *
  89.  * PUBLIC: int __qam_pitem
  90.  * PUBLIC:     __P((DBC *,  QPAGE *, u_int32_t, db_recno_t, DBT *));
  91.  */
  92. int
  93. __qam_pitem(dbc, pagep, indx, recno, data)
  94. DBC *dbc;
  95. QPAGE *pagep;
  96. u_int32_t indx;
  97. db_recno_t recno;
  98. DBT *data;
  99. {
  100. DB *dbp;
  101. DBT olddata, pdata, *datap;
  102. QAMDATA *qp;
  103. QUEUE *t;
  104. u_int32_t alloced;
  105. u_int8_t *dest, *p;
  106. int ret;
  107. alloced = ret = 0;
  108. dbp = dbc->dbp;
  109. t = (QUEUE *)dbp->q_internal;
  110. if (data->size > t->re_len)
  111. goto len_err;
  112. qp = QAM_GET_RECORD(dbp, pagep, indx);
  113. p = qp->data;
  114. datap = data;
  115. if (F_ISSET(data, DB_DBT_PARTIAL)) {
  116. if (data->doff + data->dlen > t->re_len) {
  117. alloced = data->dlen;
  118. goto len_err;
  119. }
  120. if (data->size != data->dlen) {
  121. len_err: __db_err(dbp->dbenv,
  122.     "Length improper for fixed length record %lu",
  123.     (u_long)(alloced ? alloced : data->size));
  124. return (EINVAL);
  125. }
  126. if (data->size == t->re_len)
  127. goto no_partial;
  128. /*
  129.  * If we are logging, then we have to build the record
  130.  * first, otherwise, we can simply drop the change
  131.  * directly on the page.  After this clause, make
  132.  * sure that datap and p are set up correctly so that
  133.  * copying datap into p does the right thing.
  134.  *
  135.  * Note, I am changing this so that if the existing
  136.  * record is not valid, we create a complete record
  137.  * to log so that both this and the recovery code is simpler.
  138.  */
  139. if (DBC_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
  140. datap = &pdata;
  141. memset(datap, 0, sizeof(*datap));
  142. if ((ret = __os_malloc(dbp->dbenv,
  143.     t->re_len, &datap->data)) != 0)
  144. return (ret);
  145. alloced = 1;
  146. datap->size = t->re_len;
  147. /*
  148.  * Construct the record if it's valid, otherwise set it
  149.  * all to the pad character.
  150.  */
  151. dest = datap->data;
  152. if (F_ISSET(qp, QAM_VALID))
  153. memcpy(dest, p, t->re_len);
  154. else
  155. memset(dest, t->re_pad, t->re_len);
  156. dest += data->doff;
  157. memcpy(dest, data->data, data->size);
  158. } else {
  159. datap = data;
  160. p += data->doff;
  161. }
  162. }
  163. no_partial:
  164. if (DBC_LOGGING(dbc)) {
  165. olddata.size = 0;
  166. if (F_ISSET(qp, QAM_SET)) {
  167. olddata.data = qp->data;
  168. olddata.size = t->re_len;
  169. }
  170. if ((ret = __qam_add_log(dbp, dbc->txn, &LSN(pagep),
  171.     0, &LSN(pagep), pagep->pgno,
  172.     indx, recno, datap, qp->flags,
  173.     olddata.size == 0 ? NULL : &olddata)) != 0)
  174. goto err;
  175. }
  176. F_SET(qp, QAM_VALID | QAM_SET);
  177. memcpy(p, datap->data, datap->size);
  178. if (!F_ISSET(data, DB_DBT_PARTIAL))
  179. memset(p + datap->size,  t->re_pad, t->re_len - datap->size);
  180. err: if (alloced)
  181. __os_free(dbp->dbenv, datap->data);
  182. return (ret);
  183. }
  184. /*
  185.  * __qam_c_put
  186.  * Cursor put for queued access method.
  187.  * BEFORE and AFTER cannot be specified.
  188.  */
  189. static int
  190. __qam_c_put(dbc, key, data, flags, pgnop)
  191. DBC *dbc;
  192. DBT *key, *data;
  193. u_int32_t flags;
  194. db_pgno_t *pgnop;
  195. {
  196. DB *dbp;
  197. DB_LOCK lock;
  198. DB_MPOOLFILE *mpf;
  199. QMETA *meta;
  200. QUEUE_CURSOR *cp;
  201. db_pgno_t pg;
  202. db_recno_t new_cur, new_first;
  203. u_int32_t opcode;
  204. int exact, ret, t_ret;
  205. dbp = dbc->dbp;
  206. mpf = dbp->mpf;
  207. if (pgnop != NULL)
  208. *pgnop = PGNO_INVALID;
  209. cp = (QUEUE_CURSOR *)dbc->internal;
  210. switch (flags) {
  211. case DB_KEYFIRST:
  212. case DB_KEYLAST:
  213. if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
  214. return (ret);
  215. /* FALLTHROUGH */
  216. case DB_CURRENT:
  217. break;
  218. default:
  219. /* The interface shouldn't let anything else through. */
  220. DB_ASSERT(0);
  221. return (__db_ferr(dbp->dbenv, "__qam_c_put", flags));
  222. }
  223. /* Write lock the record. */
  224. if ((ret = __db_lget(dbc,
  225.     0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
  226. return (ret);
  227. if ((ret = __qam_position(dbc,
  228.     &cp->recno, QAM_WRITE, &exact)) != 0) {
  229. /* We could not get the page, we can release the record lock. */
  230. __LPUT(dbc, lock);
  231. return (ret);
  232. }
  233. /* Put the item on the page. */
  234. ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);
  235. /* Doing record locking, release the page lock */
  236. if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
  237. ret = t_ret;
  238. if ((t_ret = __qam_fput(
  239.     dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  240. ret = t_ret;
  241. cp->page = NULL;
  242. cp->lock = lock;
  243. cp->lock_mode = DB_LOCK_WRITE;
  244. if (ret != 0)
  245. return (ret);
  246. /* We may need to reset the head or tail of the queue. */
  247. pg = ((QUEUE *)dbp->q_internal)->q_meta;
  248. /*
  249.  * Get the meta page first, we don't want to write lock it while
  250.  * trying to pin it.
  251.  */
  252. if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
  253. return (ret);
  254. if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0) {
  255. (void)mpf->put(mpf, meta, 0);
  256. return (ret);
  257. }
  258. opcode = 0;
  259. new_cur = new_first = 0;
  260. /*
  261.  * If the put address is outside the queue, adjust the head and
  262.  * tail of the queue.  If the order is inverted we move
  263.  * the one which is closer.  The first case is when the
  264.  * queue is empty, move first and current to where the new
  265.  * insert is.
  266.  */
  267. if (meta->first_recno == meta->cur_recno) {
  268. new_first = cp->recno;
  269. new_cur = cp->recno + 1;
  270. if (new_cur == RECNO_OOB)
  271. new_cur++;
  272. opcode |= QAM_SETFIRST;
  273. opcode |= QAM_SETCUR;
  274. } else {
  275. if (QAM_BEFORE_FIRST(meta, cp->recno) &&
  276.     (meta->first_recno <= meta->cur_recno ||
  277.     meta->first_recno - cp->recno <
  278.     cp->recno - meta->cur_recno)) {
  279. new_first = cp->recno;
  280. opcode |= QAM_SETFIRST;
  281. }
  282. if (meta->cur_recno == cp->recno ||
  283.     (QAM_AFTER_CURRENT(meta, cp->recno) &&
  284.     (meta->first_recno <= meta->cur_recno ||
  285.     cp->recno - meta->cur_recno <=
  286.     meta->first_recno - cp->recno))) {
  287. new_cur = cp->recno + 1;
  288. if (new_cur == RECNO_OOB)
  289. new_cur++;
  290. opcode |= QAM_SETCUR;
  291. }
  292. }
  293. if (opcode != 0 && DBC_LOGGING(dbc)) {
  294. ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn,
  295.     0, opcode, meta->first_recno, new_first,
  296.     meta->cur_recno, new_cur, &meta->dbmeta.lsn, PGNO_BASE_MD);
  297. if (ret != 0)
  298. opcode = 0;
  299. }
  300. if (opcode & QAM_SETCUR)
  301. meta->cur_recno = new_cur;
  302. if (opcode & QAM_SETFIRST)
  303. meta->first_recno = new_first;
  304. if ((t_ret = mpf->put(
  305.     mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
  306. ret = t_ret;
  307. /* Don't hold the meta page long term. */
  308. if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
  309. ret = t_ret;
  310. return (ret);
  311. }
  312. /*
  313.  * __qam_append --
  314.  * Perform a put(DB_APPEND) in queue.
  315.  *
  316.  * PUBLIC: int __qam_append __P((DBC *, DBT *, DBT *));
  317.  */
  318. int
  319. __qam_append(dbc, key, data)
  320. DBC *dbc;
  321. DBT *key, *data;
  322. {
  323. DB *dbp;
  324. DB_LOCK lock;
  325. DB_MPOOLFILE *mpf;
  326. QMETA *meta;
  327. QPAGE *page;
  328. QUEUE *qp;
  329. QUEUE_CURSOR *cp;
  330. db_pgno_t pg;
  331. db_recno_t recno;
  332. int ret, t_ret;
  333. dbp = dbc->dbp;
  334. mpf = dbp->mpf;
  335. cp = (QUEUE_CURSOR *)dbc->internal;
  336. pg = ((QUEUE *)dbp->q_internal)->q_meta;
  337. /*
  338.  * Get the meta page first, we don't want to write lock it while
  339.  * trying to pin it.
  340.  */
  341. if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
  342. return (ret);
  343. /* Write lock the meta page. */
  344. if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0) {
  345. (void)mpf->put(mpf, meta, 0);
  346. return (ret);
  347. }
  348. /* Get the next record number. */
  349. recno = meta->cur_recno;
  350. meta->cur_recno++;
  351. if (meta->cur_recno == RECNO_OOB)
  352. meta->cur_recno++;
  353. if (meta->cur_recno == meta->first_recno) {
  354. meta->cur_recno--;
  355. if (meta->cur_recno == RECNO_OOB)
  356. meta->cur_recno--;
  357. (void)__LPUT(dbc, lock);
  358. ret = EFBIG;
  359. goto err;
  360. }
  361. if (QAM_BEFORE_FIRST(meta, recno))
  362. meta->first_recno = recno;
  363. /* Lock the record and release meta page lock. */
  364. if ((ret = __db_lget(dbc, LCK_COUPLE_ALWAYS,
  365.     recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0) {
  366. (void)__LPUT(dbc, lock);
  367. goto err;
  368. }
  369. /*
  370.  * The application may modify the data based on the selected record
  371.  * number.
  372.  */
  373. if (dbc->dbp->db_append_recno != NULL &&
  374.     (ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0) {
  375. (void)__LPUT(dbc, lock);
  376. goto err;
  377. }
  378. cp->lock = lock;
  379. cp->lock_mode = DB_LOCK_WRITE;
  380. pg = QAM_RECNO_PAGE(dbp, recno);
  381. /* Fetch and write lock the data page. */
  382. if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
  383. goto err;
  384. if ((ret = __qam_fget(dbp, &pg, DB_MPOOL_CREATE, &page)) != 0) {
  385. /* We did not fetch it, we can release the lock. */
  386. (void)__LPUT(dbc, lock);
  387. goto err;
  388. }
  389. /* See if this is a new page. */
  390. if (page->pgno == 0) {
  391. page->pgno = pg;
  392. page->type = P_QAMDATA;
  393. }
  394. /* Put the item on the page and log it. */
  395. ret = __qam_pitem(dbc, page,
  396.     QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
  397. /* Doing record locking, release the page lock */
  398. if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
  399. ret = t_ret;
  400. if ((t_ret
  401.     = __qam_fput(dbp, pg, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  402. ret = t_ret;
  403. /* Return the record number to the user. */
  404. if (ret == 0)
  405. ret = __db_retcopy(dbp->dbenv, key,
  406.     &recno, sizeof(recno), &dbc->rkey->data, &dbc->rkey->ulen);
  407. /* Position the cursor on this record. */
  408. cp->recno = recno;
  409. /* See if we are leaving the extent. */
  410. qp = (QUEUE *) dbp->q_internal;
  411. if (qp->page_ext != 0 &&
  412.     (recno % (qp->page_ext * qp->rec_page) == 0 ||
  413.     recno == UINT32_T_MAX)) {
  414. if ((ret = __db_lget(dbc,
  415.     0, ((QUEUE *)dbp->q_internal)->q_meta,
  416.     DB_LOCK_WRITE, 0, &lock)) != 0)
  417. goto err;
  418. if (!QAM_AFTER_CURRENT(meta, recno))
  419. ret = __qam_fclose(dbp, pg);
  420. (void)__LPUT(dbc, lock);
  421. }
  422. err:
  423. /* Release the meta page. */
  424. if ((t_ret = mpf->put(mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  425. ret = t_ret;
  426. return (ret);
  427. }
  428. /*
  429.  * __qam_c_del --
  430.  * Qam cursor->am_del function
  431.  */
  432. static int
  433. __qam_c_del(dbc)
  434. DBC *dbc;
  435. {
  436. DB *dbp;
  437. DBT data;
  438. DB_LOCK lock;
  439. DB_MPOOLFILE *mpf;
  440. PAGE *pagep;
  441. QAMDATA *qp;
  442. QMETA *meta;
  443. QUEUE_CURSOR *cp;
  444. db_pgno_t pg;
  445. db_recno_t first;
  446. int exact, ret, t_ret;
  447. dbp = dbc->dbp;
  448. mpf = dbp->mpf;
  449. cp = (QUEUE_CURSOR *)dbc->internal;
  450. pg = ((QUEUE *)dbp->q_internal)->q_meta;
  451. /*
  452.  * Get the meta page first, we don't want to write lock it while
  453.  * trying to pin it.
  454.  */
  455. if ((ret = mpf->get(mpf, &pg, 0, &meta)) != 0)
  456. return (ret);
  457. /* Write lock the meta page. */
  458. if ((ret = __db_lget(dbc, 0, pg,  DB_LOCK_READ, 0, &lock)) != 0) {
  459. (void)mpf->put(mpf, meta, 0);
  460. return (ret);
  461. }
  462. if (QAM_NOT_VALID(meta, cp->recno))
  463. ret = DB_NOTFOUND;
  464. first = meta->first_recno;
  465. /* Don't hold the meta page long term. */
  466. if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
  467. ret = t_ret;
  468. if (ret != 0)
  469. goto err1;
  470. if ((ret = __db_lget(dbc,
  471.     0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
  472. goto err1;
  473. cp->lock_mode = DB_LOCK_WRITE;
  474. /* Find the record ; delete only deletes exact matches. */
  475. if ((ret = __qam_position(dbc,
  476.     &cp->recno, QAM_WRITE, &exact)) != 0) {
  477. cp->lock = lock;
  478. goto err1;
  479. }
  480. if (!exact) {
  481. ret = DB_NOTFOUND;
  482. goto err1;
  483. }
  484. pagep = cp->page;
  485. qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
  486. if (DBC_LOGGING(dbc)) {
  487. if (((QUEUE *)dbp->q_internal)->page_ext == 0 ||
  488.     ((QUEUE *)dbp->q_internal)->re_len == 0) {
  489. if ((ret = __qam_del_log(dbp,
  490.     dbc->txn, &LSN(pagep), 0, &LSN(pagep),
  491.     pagep->pgno, cp->indx, cp->recno)) != 0)
  492. goto err1;
  493. } else {
  494. data.size = ((QUEUE *)dbp->q_internal)->re_len;
  495. data.data = qp->data;
  496. if ((ret = __qam_delext_log(dbp,
  497.     dbc->txn, &LSN(pagep), 0, &LSN(pagep),
  498.     pagep->pgno, cp->indx, cp->recno, &data)) != 0)
  499. goto err1;
  500. }
  501. }
  502. F_CLR(qp, QAM_VALID);
  503. if (cp->recno == first) {
  504. pg = ((QUEUE *)dbp->q_internal)->q_meta;
  505. if ((ret =
  506.     __db_lget(dbc, 0, pg,  DB_LOCK_WRITE, 0, &lock)) != 0)
  507. goto err1;
  508. ret = __qam_consume(dbc, meta, first);
  509. if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
  510. ret = t_ret;
  511. }
  512. err1:
  513. if ((t_ret = mpf->put(mpf, meta, 0)) != 0 && ret == 0)
  514. ret = t_ret;
  515. if (cp->page != NULL && (t_ret = __qam_fput(dbp, cp->pgno,
  516.     cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0 && ret == 0)
  517. ret = t_ret;
  518. cp->page = NULL;
  519. /* Doing record locking, release the page lock */
  520. if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
  521. ret = t_ret;
  522. cp->lock = lock;
  523. return (ret);
  524. }
  525. #ifdef DEBUG_WOP
  526. #define QDEBUG
  527. #endif
  528. /*
  529.  * __qam_c_get --
  530.  * Queue cursor->c_get function.
  531.  */
  532. static int
  533. __qam_c_get(dbc, key, data, flags, pgnop)
  534. DBC *dbc;
  535. DBT *key, *data;
  536. u_int32_t flags;
  537. db_pgno_t *pgnop;
  538. {
  539. DB *dbp;
  540. DBC *dbcdup;
  541. DBT tmp;
  542. DB_ENV *dbenv;
  543. DB_LOCK lock, pglock, metalock;
  544. DB_MPOOLFILE *mpf;
  545. PAGE *pg;
  546. QAMDATA *qp;
  547. QMETA *meta;
  548. QUEUE *t;
  549. QUEUE_CURSOR *cp;
  550. db_lockmode_t lock_mode;
  551. db_pgno_t metapno;
  552. db_recno_t first;
  553. qam_position_mode mode;
  554. int exact, is_first, locked, ret, t_ret, wait, with_delete;
  555. int put_mode, meta_dirty, retrying;
  556. dbp = dbc->dbp;
  557. dbenv = dbp->dbenv;
  558. mpf = dbp->mpf;
  559. cp = (QUEUE_CURSOR *)dbc->internal;
  560. PANIC_CHECK(dbenv);
  561. wait = 0;
  562. with_delete = 0;
  563. retrying = 0;
  564. lock_mode = DB_LOCK_READ;
  565. put_mode = 0;
  566. t_ret = 0;
  567. *pgnop = 0;
  568. pg = NULL;
  569. mode = QAM_READ;
  570. if (F_ISSET(dbc, DBC_RMW)) {
  571. lock_mode = DB_LOCK_WRITE;
  572. mode = QAM_WRITE;
  573. }
  574. if (flags == DB_CONSUME_WAIT) {
  575. wait = 1;
  576. flags = DB_CONSUME;
  577. }
  578. if (flags == DB_CONSUME) {
  579. if ((ret = __db_check_txn(dbp, dbc->txn, dbc->locker, 0)) != 0)
  580. return (ret);
  581. with_delete = 1;
  582. flags = DB_FIRST;
  583. lock_mode = DB_LOCK_WRITE;
  584. mode = QAM_CONSUME;
  585. }
  586. DEBUG_LREAD(dbc, dbc->txn, "qam_c_get",
  587.     flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
  588. /* Make lint and friends happy. */
  589. meta_dirty = 0;
  590. locked = 0;
  591. is_first = 0;
  592. t = (QUEUE *)dbp->q_internal;
  593. metapno = t->q_meta;
  594. /*
  595.  * Get the meta page first, we don't want to write lock it while
  596.  * trying to pin it.  This is because someone my have it pinned
  597.  * but not locked.
  598.  */
  599. if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0)
  600. return (ret);
  601. if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
  602. goto err;
  603. locked = 1;
  604. first = 0;
  605. /* Release any previous lock if not in a transaction. */
  606. (void)__TLPUT(dbc, cp->lock);
  607. retry: /* Update the record number. */
  608. switch (flags) {
  609. case DB_CURRENT:
  610. break;
  611. case DB_NEXT_DUP:
  612. ret = DB_NOTFOUND;
  613. goto err;
  614. /* NOTREACHED */
  615. case DB_NEXT:
  616. case DB_NEXT_NODUP:
  617. if (cp->recno != RECNO_OOB) {
  618. ++cp->recno;
  619. /* Wrap around, skipping zero. */
  620. if (cp->recno == RECNO_OOB)
  621. cp->recno++;
  622. break;
  623. }
  624. /* FALLTHROUGH */
  625. case DB_FIRST:
  626. flags = DB_NEXT;
  627. is_first = 1;
  628. /* get the first record number */
  629. cp->recno = first = meta->first_recno;
  630. break;
  631. case DB_PREV:
  632. case DB_PREV_NODUP:
  633. if (cp->recno != RECNO_OOB) {
  634. if (QAM_BEFORE_FIRST(meta, cp->recno) ||
  635.     cp->recno == meta->first_recno) {
  636. ret = DB_NOTFOUND;
  637. goto err;
  638. }
  639. --cp->recno;
  640. /* Wrap around, skipping zero. */
  641. if (cp->recno == RECNO_OOB)
  642. --cp->recno;
  643. break;
  644. }
  645. /* FALLTHROUGH */
  646. case DB_LAST:
  647. if (meta->first_recno == meta->cur_recno) {
  648. ret = DB_NOTFOUND;
  649. goto err;
  650. }
  651. cp->recno = meta->cur_recno - 1;
  652. if (cp->recno == RECNO_OOB)
  653. cp->recno--;
  654. break;
  655. case DB_SET:
  656. case DB_SET_RANGE:
  657. case DB_GET_BOTH:
  658. case DB_GET_BOTH_RANGE:
  659. if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
  660. goto err;
  661. break;
  662. default:
  663. ret = __db_unknown_flag(dbenv, "__qam_c_get", flags);
  664. goto err;
  665. }
  666. /*
  667.  * Check to see if we are out of data.  Current points to
  668.  * the first free slot.
  669.  */
  670. if (cp->recno == meta->cur_recno ||
  671.     QAM_AFTER_CURRENT(meta, cp->recno)) {
  672. ret = DB_NOTFOUND;
  673. pg = NULL;
  674. if (wait) {
  675. flags = DB_FIRST;
  676. /*
  677.  * If first is not set, then we skipped a
  678.  * locked record, go back and find it.
  679.  * If we find a locked record again
  680.  * wait for it.
  681.  */
  682. if (first == 0) {
  683. retrying = 1;
  684. goto retry;
  685. }
  686. if (CDB_LOCKING(dbenv)) {
  687. if ((ret = dbenv->lock_get(
  688.     dbenv, dbc->locker,
  689.     DB_LOCK_SWITCH, &dbc->lock_dbt,
  690.     DB_LOCK_WAIT, &dbc->mylock)) != 0)
  691. goto err;
  692. if ((ret = dbenv->lock_get(
  693.     dbenv, dbc->locker,
  694.     DB_LOCK_UPGRADE, &dbc->lock_dbt,
  695.     DB_LOCK_WRITE, &dbc->mylock)) != 0)
  696. goto err;
  697. goto retry;
  698. }
  699. /*
  700.  * Wait for someone to update the meta page.
  701.  * This will probably mean there is something
  702.  * in the queue.  We then go back up and
  703.  * try again.
  704.  */
  705. if (locked == 0) {
  706. if ((ret = __db_lget( dbc,
  707.     0, metapno, lock_mode, 0, &metalock)) != 0)
  708. goto err;
  709. locked = 1;
  710. if (cp->recno != RECNO_OOB &&
  711.     !QAM_AFTER_CURRENT(meta, cp->recno))
  712. goto retry;
  713. }
  714. if ((ret = __db_lget(dbc, 0, metapno,
  715.     DB_LOCK_WAIT, DB_LOCK_SWITCH, &metalock)) != 0)
  716. goto err;
  717. if ((ret = dbenv->lock_get(dbenv, dbc->locker,
  718.     DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
  719.     &metalock)) != 0)
  720. goto err;
  721. locked = 1;
  722. goto retry;
  723. }
  724. goto err;
  725. }
  726. /* Don't hold the meta page long term. */
  727. if (locked) {
  728. if ((ret = __LPUT(dbc, metalock)) != 0)
  729. goto err;
  730. locked = 0;
  731. }
  732. /* Lock the record. */
  733. if ((ret = __db_lget(dbc, 0, cp->recno, lock_mode,
  734.     (with_delete && !retrying) ?
  735.     DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
  736.     &lock)) == DB_LOCK_NOTGRANTED && with_delete) {
  737. #ifdef QDEBUG
  738. __db_logmsg(dbenv,
  739.     dbc->txn, "Queue S", 0, "%x %d %d %d",
  740.     dbc->locker, cp->recno, first, meta->first_recno);
  741. #endif
  742. first = 0;
  743. if ((ret =
  744.     __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
  745. goto err;
  746. locked = 1;
  747. goto retry;
  748. }
  749. if (ret != 0)
  750. goto err;
  751. /*
  752.  * In the DB_FIRST or DB_LAST cases we must wait and then start over
  753.  * since the first/last may have moved while we slept.
  754.  * We release our locks and try again.
  755.  */
  756. if ((!with_delete && is_first) || flags == DB_LAST) {
  757. if ((ret =
  758.     __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
  759. goto err;
  760. if (cp->recno !=
  761.     (is_first ? meta->first_recno : (meta->cur_recno - 1))) {
  762. __LPUT(dbc, lock);
  763. if (is_first)
  764. flags = DB_FIRST;
  765. locked = 1;
  766. goto retry;
  767. }
  768. /* Don't hold the meta page long term. */
  769. if ((ret = __LPUT(dbc, metalock)) != 0)
  770. goto err;
  771. }
  772. /* Position the cursor on the record. */
  773. if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) {
  774. /* We cannot get the page, release the record lock. */
  775. (void)__LPUT(dbc, lock);
  776. goto err;
  777. }
  778. pg = cp->page;
  779. pglock = cp->lock;
  780. cp->lock = lock;
  781. cp->lock_mode = lock_mode;
  782. if (!exact) {
  783. if (flags == DB_NEXT || flags == DB_NEXT_NODUP ||
  784.     flags == DB_PREV || flags == DB_PREV_NODUP ||
  785.     flags == DB_LAST) {
  786. /* Release locks and try again. */
  787. if (pg != NULL)
  788. (void)__qam_fput(dbp, cp->pgno, pg, 0);
  789. cp->page = pg = NULL;
  790. (void)__LPUT(dbc, pglock);
  791. (void)__LPUT(dbc, cp->lock);
  792. if (flags == DB_LAST)
  793. flags = DB_PREV;
  794. if (!with_delete)
  795. is_first = 0;
  796. retrying = 0;
  797. goto retry;
  798. }
  799. /* this is for the SET and SET_RANGE cases */
  800. ret = DB_KEYEMPTY;
  801. goto err1;
  802. }
  803. /* Return the key if the user didn't give us one. */
  804. if (key != NULL) {
  805. if (flags != DB_GET_BOTH && flags != DB_GET_BOTH_RANGE &&
  806.     flags != DB_SET && flags != DB_SET_RANGE &&
  807.     (ret = __db_retcopy(dbp->dbenv,
  808.     key, &cp->recno, sizeof(cp->recno),
  809.     &dbc->rkey->data, &dbc->rkey->ulen)) != 0)
  810. goto err1;
  811. F_SET(key, DB_DBT_ISSET);
  812. }
  813. qp = QAM_GET_RECORD(dbp, pg, cp->indx);
  814. /* Return the data item. */
  815. if (flags == DB_GET_BOTH || flags == DB_GET_BOTH_RANGE) {
  816. /*
  817.  * Need to compare
  818.  */
  819. tmp.data = qp->data;
  820. tmp.size = t->re_len;
  821. if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
  822. ret = DB_NOTFOUND;
  823. goto err1;
  824. }
  825. }
  826. if (data != NULL &&
  827.     !F_ISSET(dbc, DBC_MULTIPLE|DBC_MULTIPLE_KEY) &&
  828.     (ret = __db_retcopy(dbp->dbenv, data,
  829.     qp->data, t->re_len, &dbc->rdata->data, &dbc->rdata->ulen)) != 0)
  830. goto err1;
  831. if (data != NULL)
  832. F_SET(data, DB_DBT_ISSET);
  833. /* Finally, if we are doing DB_CONSUME mark the record. */
  834. if (with_delete) {
  835. /*
  836.  * Assert that we're not a secondary index.  Doing a DB_CONSUME
  837.  * on a secondary makes very little sense, since one can't
  838.  * DB_APPEND there;  attempting one should be forbidden by
  839.  * the interface.
  840.  */
  841. DB_ASSERT(!F_ISSET(dbp, DB_AM_SECONDARY));
  842. /*
  843.  * Check and see if we *have* any secondary indices.
  844.  * If we do, we're a primary, so call __db_c_del_primary
  845.  * to delete the references to the item we're about to
  846.  * delete.
  847.  *
  848.  * Note that we work on a duplicated cursor, since the
  849.  * __db_ret work has already been done, so it's not safe
  850.  * to perform any additional ops on this cursor.
  851.  */
  852. if (LIST_FIRST(&dbp->s_secondaries) != NULL) {
  853. if ((ret = __db_c_idup(dbc,
  854.     &dbcdup, DB_POSITIONI)) != 0)
  855. goto err1;
  856. if ((ret = __db_c_del_primary(dbcdup)) != 0) {
  857. /*
  858.  * The __db_c_del_primary return is more
  859.  * interesting.
  860.  */
  861. (void)dbcdup->c_close(dbcdup);
  862. goto err1;
  863. }
  864. if ((ret = dbcdup->c_close(dbcdup)) != 0)
  865. goto err1;
  866. }
  867. if (DBC_LOGGING(dbc)) {
  868. if (t->page_ext == 0 || t->re_len == 0) {
  869. if ((ret = __qam_del_log(dbp, dbc->txn,
  870.     &LSN(pg), 0, &LSN(pg),
  871.     pg->pgno, cp->indx, cp->recno)) != 0)
  872. goto err1;
  873. } else {
  874. tmp.data = qp->data;
  875. tmp.size = t->re_len;
  876. if ((ret = __qam_delext_log(dbp,
  877.    dbc->txn, &LSN(pg), 0, &LSN(pg),
  878.    pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
  879. goto err1;
  880. }
  881. }
  882. F_CLR(qp, QAM_VALID);
  883. put_mode = DB_MPOOL_DIRTY;
  884. if ((ret = __LPUT(dbc, pglock)) != 0)
  885. goto err1;
  886. /*
  887.  * Now we need to update the metapage
  888.  * first pointer. If we have deleted
  889.  * the record that is pointed to by
  890.  * first_recno then we move it as far
  891.  * forward as we can without blocking.
  892.  * The metapage lock must be held for
  893.  * the whole scan otherwise someone could
  894.  * do a random insert behind where we are
  895.  * looking.
  896.  */
  897. if (locked == 0 && (ret = __db_lget(
  898.     dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
  899. goto err1;
  900. locked = 1;
  901. #ifdef QDEBUG
  902. __db_logmsg(dbenv,
  903.     dbc->txn, "Queue D", 0, "%x %d %d %d",
  904.     dbc->locker, cp->recno, first, meta->first_recno);
  905. #endif
  906. /*
  907.  * See if we deleted the "first" record.  If
  908.  * first is zero then we skipped something,
  909.  * see if first_recno has been move passed
  910.  * that to the record that we deleted.
  911.  */
  912. if (first == 0)
  913. first = cp->recno;
  914. if (first != meta->first_recno)
  915. goto done;
  916. if ((ret = __qam_consume(dbc, meta, first)) != 0)
  917. goto err1;
  918. }
  919. done:
  920. err1: if (cp->page != NULL) {
  921. t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode);
  922. if (!ret)
  923. ret = t_ret;
  924. /* Doing record locking, release the page lock */
  925. t_ret = __LPUT(dbc, pglock);
  926. cp->page = NULL;
  927. }
  928. err: if (!ret)
  929. ret = t_ret;
  930. if (meta) {
  931. /* release the meta page */
  932. t_ret = mpf->put(mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0);
  933. if (!ret)
  934. ret = t_ret;
  935. /* Don't hold the meta page long term. */
  936. if (locked)
  937. t_ret = __LPUT(dbc, metalock);
  938. }
  939. DB_ASSERT(!LOCK_ISSET(metalock));
  940. /*
  941.  * There is no need to keep the record locked if we are
  942.  * not in a transaction.
  943.  */
  944. if (t_ret == 0)
  945. t_ret = __TLPUT(dbc, cp->lock);
  946. return (ret ? ret : t_ret);
  947. }
  948. /*
  949.  * __qam_consume -- try to reset the head of the queue.
  950.  *
  951.  */
  952. static int
  953. __qam_consume(dbc, meta, first)
  954. DBC *dbc;
  955. QMETA *meta;
  956. db_recno_t first;
  957. {
  958. DB *dbp;
  959. DB_LOCK lock, save_lock;
  960. DB_MPOOLFILE *mpf;
  961. QUEUE_CURSOR *cp;
  962. db_indx_t save_indx;
  963. db_pgno_t save_page;
  964. db_recno_t current, save_recno;
  965. u_int32_t rec_extent;
  966. int exact, put_mode, ret, t_ret, wrapped;
  967. dbp = dbc->dbp;
  968. mpf = dbp->mpf;
  969. cp = (QUEUE_CURSOR *)dbc->internal;
  970. put_mode = DB_MPOOL_DIRTY;
  971. ret = t_ret = 0;
  972. save_page = cp->pgno;
  973. save_indx = cp->indx;
  974. save_recno = cp->recno;
  975. save_lock = cp->lock;
  976. /*
  977.  * If we skipped some deleted records, we need to
  978.  * reposition on the first one.  Get a lock
  979.  * in case someone is trying to put it back.
  980.  */
  981. if (first != cp->recno) {
  982. ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
  983.     DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
  984. if (ret == DB_LOCK_NOTGRANTED) {
  985. ret = 0;
  986. goto done;
  987. }
  988. if (ret != 0)
  989. goto done;
  990. if ((ret =
  991.     __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
  992. goto done;
  993. cp->page = NULL;
  994. put_mode = 0;
  995. if ((ret = __qam_position(dbc,
  996.     &first, QAM_READ, &exact)) != 0 || exact != 0) {
  997. (void)__LPUT(dbc, lock);
  998. goto done;
  999. }
  1000. if ((ret =__LPUT(dbc, lock)) != 0)
  1001. goto done;
  1002. if ((ret = __LPUT(dbc, cp->lock)) != 0)
  1003. goto done;
  1004. }
  1005. current = meta->cur_recno;
  1006. wrapped = 0;
  1007. if (first > current)
  1008. wrapped = 1;
  1009. rec_extent = meta->page_ext * meta->rec_page;
  1010. /* Loop until we find a record or hit current */
  1011. for (;;) {
  1012. /*
  1013.  * Check to see if we are moving off the extent
  1014.  * and remove the extent.
  1015.  * If we are moving off a page we need to
  1016.  * get rid of the buffer.
  1017.  * Wait for the lagging readers to move off the
  1018.  * page.
  1019.  */
  1020. if (cp->page != NULL && rec_extent != 0 &&
  1021.     ((exact = (first % rec_extent == 0)) ||
  1022.     first % meta->rec_page == 0 ||
  1023.     first == UINT32_T_MAX)) {
  1024. if (exact == 1 && (ret = __db_lget(dbc,
  1025.     0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
  1026. break;
  1027. #ifdef QDEBUG
  1028. __db_logmsg(dbp->dbenv,
  1029.     dbc->txn, "Queue R", 0, "%x %d %d %d",
  1030.     dbc->locker, cp->pgno, first, meta->first_recno);
  1031. #endif
  1032. put_mode |= DB_MPOOL_DISCARD;
  1033. if ((ret = __qam_fput(dbp,
  1034.     cp->pgno, cp->page, put_mode)) != 0)
  1035. break;
  1036. cp->page = NULL;
  1037. if (exact == 1) {
  1038. ret = __qam_fremove(dbp, cp->pgno);
  1039. t_ret = __LPUT(dbc, cp->lock);
  1040. }
  1041. if (ret != 0)
  1042. break;
  1043. if (t_ret != 0) {
  1044. ret = t_ret;
  1045. break;
  1046. }
  1047. } else if (cp->page != NULL && (ret =
  1048.     __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
  1049. break;
  1050. cp->page = NULL;
  1051. first++;
  1052. if (first == RECNO_OOB) {
  1053. wrapped = 0;
  1054. first++;
  1055. }
  1056. /*
  1057.  * LOOP EXIT when we come move to the current
  1058.  * pointer.
  1059.  */
  1060. if (!wrapped && first >= current)
  1061. break;
  1062. ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
  1063.     DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
  1064. if (ret == DB_LOCK_NOTGRANTED) {
  1065. ret = 0;
  1066. break;
  1067. }
  1068. if (ret != 0)
  1069. break;
  1070. if ((ret = __qam_position(dbc,
  1071.     &first, QAM_READ, &exact)) != 0) {
  1072. (void)__LPUT(dbc, lock);
  1073. break;
  1074. }
  1075. put_mode = 0;
  1076. if ((ret =__LPUT(dbc, lock)) != 0 ||
  1077.     (ret = __LPUT(dbc, cp->lock)) != 0 || exact) {
  1078. if ((t_ret = __qam_fput(dbp, cp->pgno,
  1079.     cp->page, put_mode)) != 0 && ret == 0)
  1080. ret = t_ret;
  1081. cp->page = NULL;
  1082. break;
  1083. }
  1084. }
  1085. cp->pgno = save_page;
  1086. cp->indx = save_indx;
  1087. cp->recno = save_recno;
  1088. cp->lock = save_lock;
  1089. /*
  1090.  * We have advanced as far as we can.
  1091.  * Advance first_recno to this point.
  1092.  */
  1093. if (ret == 0 && meta->first_recno != first) {
  1094. #ifdef QDEBUG
  1095. __db_logmsg(dbp->dbenv, dbc->txn, "Queue M",
  1096.     0, "%x %d %d %d", dbc->locker, cp->recno,
  1097.     first, meta->first_recno);
  1098. #endif
  1099. if (DBC_LOGGING(dbc))
  1100. if ((ret = __qam_incfirst_log(dbp,
  1101.     dbc->txn, &meta->dbmeta.lsn, 0,
  1102.     cp->recno, PGNO_BASE_MD)) != 0)
  1103. goto done;
  1104. meta->first_recno = first;
  1105. (void)mpf->set(mpf, meta, DB_MPOOL_DIRTY);
  1106. }
  1107. done:
  1108. return (ret);
  1109. }
  1110. static int
  1111. __qam_bulk(dbc, data, flags)
  1112. DBC *dbc;
  1113. DBT *data;
  1114. u_int32_t flags;
  1115. {
  1116. DB *dbp;
  1117. DB_LOCK metalock;
  1118. DB_MPOOLFILE *mpf;
  1119. PAGE *pg;
  1120. QMETA *meta;
  1121. QAMDATA *qp;
  1122. QUEUE_CURSOR *cp;
  1123. db_indx_t indx;
  1124. db_pgno_t metapno;
  1125. qam_position_mode mode;
  1126. int32_t  *endp, *offp;
  1127. u_int8_t *dbuf, *dp, *np;
  1128. int exact, recs, re_len, ret, t_ret, valid;
  1129. int is_key, need_pg, pagesize, size, space;
  1130. dbp = dbc->dbp;
  1131. mpf = dbp->mpf;
  1132. cp = (QUEUE_CURSOR *)dbc->internal;
  1133. mode = QAM_READ;
  1134. if (F_ISSET(dbc, DBC_RMW))
  1135. mode = QAM_WRITE;
  1136. pagesize = dbp->pgsize;
  1137. re_len = ((QUEUE *)dbp->q_internal)->re_len;
  1138. recs = ((QUEUE *)dbp->q_internal)->rec_page;
  1139. metapno = ((QUEUE *)dbp->q_internal)->q_meta;
  1140. is_key = LF_ISSET(DB_MULTIPLE_KEY) ? 1 : 0;
  1141. size = 0;
  1142. if ((ret = __db_lget(dbc, 0, metapno, DB_LOCK_READ, 0, &metalock)) != 0)
  1143. return (ret);
  1144. if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) {
  1145. /* We did not fetch it, we can release the lock. */
  1146. (void)__LPUT(dbc, metalock);
  1147. return (ret);
  1148. }
  1149. dbuf = data->data;
  1150. np = dp = dbuf;
  1151. /* Keep track of space that is left.  There is an termination entry */
  1152. space = data->ulen;
  1153. space -= sizeof(*offp);
  1154. /* Build the offset/size table form the end up. */
  1155. endp = (int32_t *) ((u_int8_t *)dbuf + data->ulen);
  1156. endp--;
  1157. offp = endp;
  1158. next_pg:
  1159. if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0)
  1160. goto done;
  1161. pg = cp->page;
  1162. indx = cp->indx;
  1163. need_pg = 1;
  1164. do {
  1165. /*
  1166.  * If this page is a nonexistent page at the end of an
  1167.  * extent, pg may be NULL.  A NULL page has no valid records,
  1168.  * so just keep looping as though qp exists and isn't QAM_VALID;
  1169.  * calling QAM_GET_RECORD is unsafe.
  1170.  */
  1171. valid = 0;
  1172. /* Wrap around, skipping zero. */
  1173. if (cp->recno == RECNO_OOB)
  1174. cp->recno++;
  1175. if (pg != NULL) {
  1176. qp = QAM_GET_RECORD(dbp, pg, indx);
  1177. if (F_ISSET(qp, QAM_VALID)) {
  1178. valid = 1;
  1179. space -= (is_key ? 3 : 2) * sizeof(*offp);
  1180. if (space < 0)
  1181. goto get_space;
  1182. if (need_pg) {
  1183. dp = np;
  1184. size = pagesize - QPAGE_SZ(dbp);
  1185. if (space < size) {
  1186. get_space:
  1187. if (offp == endp) {
  1188. data->size =
  1189.     ALIGN(size +
  1190.     pagesize,
  1191.     sizeof(u_int32_t));
  1192. ret = ENOMEM;
  1193. break;
  1194. }
  1195. if (indx != 0)
  1196. indx--;
  1197. cp->recno--;
  1198. break;
  1199. }
  1200. memcpy(dp,
  1201.     (char *)pg + QPAGE_SZ(dbp), size);
  1202. need_pg = 0;
  1203. space -= size;
  1204. np += size;
  1205. }
  1206. if (is_key)
  1207. *offp-- = cp->recno;
  1208. *offp-- = (int32_t)((u_int8_t*)qp -
  1209.     (u_int8_t*)pg - QPAGE_SZ(dbp) +
  1210.     dp - dbuf + SSZA(QAMDATA, data));
  1211. *offp-- = re_len;
  1212. }
  1213. }
  1214. if (!valid && is_key == 0) {
  1215. *offp-- = 0;
  1216. *offp-- = 0;
  1217. }
  1218. cp->recno++;
  1219. } while (++indx < recs && indx != RECNO_OOB
  1220.     && cp->recno != meta->cur_recno
  1221.     && !QAM_AFTER_CURRENT(meta, cp->recno));
  1222. if ((t_ret = __TLPUT(dbc, cp->lock)) != 0 && ret == 0)
  1223. ret = t_ret;
  1224. if (cp->page != NULL) {
  1225. if ((t_ret =
  1226.     __qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0)
  1227. ret = t_ret;
  1228. cp->page = NULL;
  1229. }
  1230. if (ret == 0
  1231.     && (indx >= recs || indx == RECNO_OOB)
  1232.     && cp->recno != meta->cur_recno
  1233.     && !QAM_AFTER_CURRENT(meta, cp->recno))
  1234. goto next_pg;
  1235. if (is_key == 1)
  1236. *offp = RECNO_OOB;
  1237. else
  1238. *offp = -1;
  1239. done:
  1240. /* release the meta page */
  1241. t_ret = mpf->put(mpf, meta, 0);
  1242. if (!ret)
  1243. ret = t_ret;
  1244. t_ret = __LPUT(dbc, metalock);
  1245. return (ret);
  1246. }
  1247. /*
  1248.  * __qam_c_close --
  1249.  * Close down the cursor from a single use.
  1250.  */
  1251. static int
  1252. __qam_c_close(dbc, root_pgno, rmroot)
  1253. DBC *dbc;
  1254. db_pgno_t root_pgno;
  1255. int *rmroot;
  1256. {
  1257. QUEUE_CURSOR *cp;
  1258. COMPQUIET(root_pgno, 0);
  1259. COMPQUIET(rmroot, NULL);
  1260. cp = (QUEUE_CURSOR *)dbc->internal;
  1261. /* Discard any locks not acquired inside of a transaction. */
  1262. (void)__TLPUT(dbc, cp->lock);
  1263. LOCK_INIT(cp->lock);
  1264. cp->page = NULL;
  1265. cp->pgno = PGNO_INVALID;
  1266. cp->indx = 0;
  1267. cp->lock_mode = DB_LOCK_NG;
  1268. cp->recno = RECNO_OOB;
  1269. cp->flags = 0;
  1270. return (0);
  1271. }
  1272. /*
  1273.  * __qam_c_dup --
  1274.  * Duplicate a queue cursor, such that the new one holds appropriate
  1275.  * locks for the position of the original.
  1276.  *
  1277.  * PUBLIC: int __qam_c_dup __P((DBC *, DBC *));
  1278.  */
  1279. int
  1280. __qam_c_dup(orig_dbc, new_dbc)
  1281. DBC *orig_dbc, *new_dbc;
  1282. {
  1283. QUEUE_CURSOR *orig, *new;
  1284. orig = (QUEUE_CURSOR *)orig_dbc->internal;
  1285. new = (QUEUE_CURSOR *)new_dbc->internal;
  1286. new->recno = orig->recno;
  1287. /* reget the long term lock if we are not in a xact */
  1288. if (orig_dbc->txn != NULL ||
  1289.     !STD_LOCKING(orig_dbc) || !LOCK_ISSET(orig->lock))
  1290. return (0);
  1291. return (__db_lget(new_dbc,
  1292.     0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));
  1293. }
  1294. /*
  1295.  * __qam_c_init
  1296.  *
  1297.  * PUBLIC: int __qam_c_init __P((DBC *));
  1298.  */
  1299. int
  1300. __qam_c_init(dbc)
  1301. DBC *dbc;
  1302. {
  1303. QUEUE_CURSOR *cp;
  1304. DB *dbp;
  1305. int ret;
  1306. dbp = dbc->dbp;
  1307. /* Allocate the internal structure. */
  1308. cp = (QUEUE_CURSOR *)dbc->internal;
  1309. if (cp == NULL) {
  1310. if ((ret =
  1311.     __os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
  1312. return (ret);
  1313. dbc->internal = (DBC_INTERNAL *)cp;
  1314. }
  1315. /* Initialize methods. */
  1316. dbc->c_close = __db_c_close;
  1317. dbc->c_count = __db_c_count;
  1318. dbc->c_del = __db_c_del;
  1319. dbc->c_dup = __db_c_dup;
  1320. dbc->c_get = dbc->c_real_get = __db_c_get;
  1321. dbc->c_pget = __db_c_pget;
  1322. dbc->c_put = __db_c_put;
  1323. dbc->c_am_bulk = __qam_bulk;
  1324. dbc->c_am_close = __qam_c_close;
  1325. dbc->c_am_del = __qam_c_del;
  1326. dbc->c_am_destroy = __qam_c_destroy;
  1327. dbc->c_am_get = __qam_c_get;
  1328. dbc->c_am_put = __qam_c_put;
  1329. dbc->c_am_writelock = NULL;
  1330. return (0);
  1331. }
  1332. /*
  1333.  * __qam_c_destroy --
  1334.  * Close a single cursor -- internal version.
  1335.  */
  1336. static int
  1337. __qam_c_destroy(dbc)
  1338. DBC *dbc;
  1339. {
  1340. /* Discard the structures. */
  1341. __os_free(dbc->dbp->dbenv, dbc->internal);
  1342. return (0);
  1343. }
  1344. /*
  1345.  * __qam_getno --
  1346.  * Check the user's record number.
  1347.  */
  1348. static int
  1349. __qam_getno(dbp, key, rep)
  1350. DB *dbp;
  1351. const DBT *key;
  1352. db_recno_t *rep;
  1353. {
  1354. if ((*rep = *(db_recno_t *)key->data) == 0) {
  1355. __db_err(dbp->dbenv, "illegal record number of 0");
  1356. return (EINVAL);
  1357. }
  1358. return (0);
  1359. }
  1360. /*
  1361.  * __qam_truncate --
  1362.  * Truncate a queue database
  1363.  *
  1364.  * PUBLIC: int __qam_truncate __P((DB *, DB_TXN *, u_int32_t *));
  1365.  */
  1366. int
  1367. __qam_truncate(dbp, txn, countp)
  1368. DB *dbp;
  1369. DB_TXN *txn;
  1370. u_int32_t *countp;
  1371. {
  1372. DBC *dbc;
  1373. DB_LOCK metalock;
  1374. DB_MPOOLFILE *mpf;
  1375. QMETA *meta;
  1376. db_pgno_t metapno;
  1377. int count, ret, t_ret;
  1378. mpf = dbp->mpf;
  1379. /* Acquire a cursor. */
  1380. if ((ret = dbp->cursor(dbp, txn, &dbc, 0)) != 0)
  1381. return (ret);
  1382. /* Walk the queue, counting rows. */
  1383. count = 0;
  1384. while ((ret = __qam_c_get(dbc, NULL, NULL, DB_CONSUME, &metapno)) == 0)
  1385. count++;
  1386. if (ret == DB_NOTFOUND)
  1387. ret = 0;
  1388. /* Discard the cursor. */
  1389. if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
  1390. ret = t_ret;
  1391. if (ret != 0)
  1392. return (ret);
  1393. /* update the meta page */
  1394. /* get the meta page */
  1395. metapno = ((QUEUE *)dbp->q_internal)->q_meta;
  1396. if ((ret =
  1397.     __db_lget(dbc, 0, metapno, DB_LOCK_WRITE, 0, &metalock)) != 0)
  1398. return (ret);
  1399. if ((ret = mpf->get(mpf, &metapno, 0, &meta)) != 0) {
  1400. /* We did not fetch it, we can release the lock. */
  1401. (void)__LPUT(dbc, metalock);
  1402. return (ret);
  1403. }
  1404. if (DBC_LOGGING(dbc)) {
  1405. ret = __qam_mvptr_log(dbp, dbc->txn, &meta->dbmeta.lsn, 0,
  1406.     QAM_SETCUR | QAM_SETFIRST | QAM_TRUNCATE, meta->first_recno,
  1407.     1, meta->cur_recno, 1, &meta->dbmeta.lsn, PGNO_BASE_MD);
  1408. }
  1409. if (ret == 0)
  1410. meta->first_recno = meta->cur_recno = 1;
  1411. if ((t_ret =
  1412.     mpf->put(mpf, meta, ret == 0 ? DB_MPOOL_DIRTY: 0)) != 0 && ret == 0)
  1413. ret = t_ret;
  1414. if ((t_ret = __LPUT(dbc, metalock)) != 0 && ret == 0)
  1415. ret = t_ret;
  1416. *countp = count;
  1417. return (ret);
  1418. }