qam.c
上传用户:tsgydb
上传日期:2007-04-14
资源大小:10674k
文件大小:31k
- /*-
- * See the file LICENSE for redistribution information.
- *
- * Copyright (c) 1999, 2000
- * Sleepycat Software. All rights reserved.
- */
- #include "db_config.h"
- #ifndef lint
- static const char revid[] = "$Id: qam.c,v 11.72 2001/01/16 20:10:55 ubell Exp $";
- #endif /* not lint */
- #ifndef NO_SYSTEM_INCLUDES
- #include <sys/types.h>
- #include <string.h>
- #endif
- #include "db_int.h"
- #include "db_page.h"
- #include "db_shash.h"
- #include "db_am.h"
- #include "mp.h"
- #include "lock.h"
- #include "log.h"
- #include "btree.h"
- #include "qam.h"
- static int __qam_c_close __P((DBC *, db_pgno_t, int *));
- static int __qam_c_del __P((DBC *));
- static int __qam_c_destroy __P((DBC *));
- static int __qam_c_get __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
- static int __qam_c_put __P((DBC *, DBT *, DBT *, u_int32_t, db_pgno_t *));
- static int __qam_getno __P((DB *, const DBT *, db_recno_t *));
- /*
- * __qam_position --
- * Position a queued access method cursor at a record. This returns
- * the page locked. *exactp will be set if the record is valid.
- * PUBLIC: int __qam_position
- * PUBLIC: __P((DBC *, db_recno_t *, qam_position_mode, int *));
- */
- int
- __qam_position(dbc, recnop, mode, exactp)
- DBC *dbc; /* open cursor */
- db_recno_t *recnop; /* pointer to recno to find */
- qam_position_mode mode;/* locking: read or write */
- int *exactp; /* indicate if it was found */
- {
- QUEUE_CURSOR *cp;
- DB *dbp;
- QAMDATA *qp;
- db_pgno_t pg;
- int ret;
- dbp = dbc->dbp;
- cp = (QUEUE_CURSOR *)dbc->internal;
- /* Fetch the page for this recno. */
- pg = QAM_RECNO_PAGE(dbp, *recnop);
- if ((ret = __db_lget(dbc, 0, pg, mode == QAM_READ ?
- DB_LOCK_READ : DB_LOCK_WRITE, 0, &cp->lock)) != 0)
- return (ret);
- cp->page = NULL;
- *exactp = 0;
- if ((ret = __qam_fget(dbp, &pg,
- mode == QAM_WRITE ? DB_MPOOL_CREATE : 0,
- &cp->page)) != 0) {
- /* We did not fetch it, we can release the lock. */
- (void)__LPUT(dbc, cp->lock);
- cp->lock.off = LOCK_INVALID;
- if (mode != QAM_WRITE && (ret == EINVAL || ret == ENOENT))
- return (0);
- return (ret);
- }
- cp->pgno = pg;
- cp->indx = QAM_RECNO_INDEX(dbp, pg, *recnop);
- if (PGNO(cp->page) == 0) {
- if (F_ISSET(dbp, DB_AM_RDONLY)) {
- *exactp = 0;
- return (0);
- }
- PGNO(cp->page) = pg;
- TYPE(cp->page) = P_QAMDATA;
- }
- qp = QAM_GET_RECORD(dbp, cp->page, cp->indx);
- *exactp = F_ISSET(qp, QAM_VALID);
- return (ret);
- }
- /*
- * __qam_pitem --
- * Put an item on a queue page. Copy the data to the page and set the
- * VALID and SET bits. If logging and the record was previously set,
- * log that data, otherwise just log the new data.
- *
- * pagep must be write locked
- *
- * PUBLIC: int __qam_pitem
- * PUBLIC: __P((DBC *, QPAGE *, u_int32_t, db_recno_t, DBT *));
- */
- int
- __qam_pitem(dbc, pagep, indx, recno, data)
- DBC *dbc;
- QPAGE *pagep;
- u_int32_t indx;
- db_recno_t recno;
- DBT *data;
- {
- DB *dbp;
- DBT olddata, pdata, *datap;
- QAMDATA *qp;
- QUEUE *t;
- u_int32_t size;
- u_int8_t *dest, *p;
- int alloced, ret;
- alloced = ret = 0;
- dbp = dbc->dbp;
- t = (QUEUE *)dbp->q_internal;
- if (data->size > t->re_len)
- goto len_err;
- qp = QAM_GET_RECORD(dbp, pagep, indx);
- p = qp->data;
- size = data->size;
- datap = data;
- if (F_ISSET(data, DB_DBT_PARTIAL)) {
- if (data->doff + data->dlen > t->re_len) {
- alloced = data->dlen;
- goto len_err;
- }
- if (data->size != data->dlen) {
- len_err: __db_err(dbp->dbenv,
- "Length improper for fixed length record %lu",
- (u_long)(alloced ? alloced : data->size));
- return (EINVAL);
- }
- if (data->size == t->re_len)
- goto no_partial;
- /*
- * If we are logging, then we have to build the record
- * first, otherwise, we can simply drop the change
- * directly on the page. After this clause, make
- * sure that datap and p are set up correctly so that
- * copying datap into p does the right thing.
- *
- * Note, I am changing this so that if the existing
- * record is not valid, we create a complete record
- * to log so that both this and the recovery code is simpler.
- */
- if (DB_LOGGING(dbc) || !F_ISSET(qp, QAM_VALID)) {
- datap = &pdata;
- memset(datap, 0, sizeof(*datap));
- if ((ret = __os_malloc(dbp->dbenv,
- t->re_len, NULL, &datap->data)) != 0)
- return (ret);
- alloced = 1;
- datap->size = t->re_len;
- /*
- * Construct the record if it's valid, otherwise set it
- * all to the pad character.
- */
- dest = datap->data;
- if (F_ISSET(qp, QAM_VALID))
- memcpy(dest, p, t->re_len);
- else
- memset(dest, t->re_pad, t->re_len);
- dest += data->doff;
- memcpy(dest, data->data, data->size);
- } else {
- datap = data;
- p += data->doff;
- }
- }
- no_partial:
- if (DB_LOGGING(dbc)) {
- olddata.size = 0;
- if (F_ISSET(qp, QAM_SET)) {
- olddata.data = qp->data;
- olddata.size = t->re_len;
- }
- if ((ret = __qam_add_log(dbp->dbenv, dbc->txn, &LSN(pagep),
- 0, dbp->log_fileid, &LSN(pagep), pagep->pgno,
- indx, recno, datap, qp->flags,
- olddata.size == 0 ? NULL : &olddata)) != 0)
- goto err;
- }
- F_SET(qp, QAM_VALID | QAM_SET);
- memcpy(p, datap->data, datap->size);
- if (!F_ISSET(data, DB_DBT_PARTIAL))
- memset(p + datap->size, t->re_pad, t->re_len - datap->size);
- err: if (alloced)
- __os_free(datap->data, t->re_len);
- return (ret);
- }
- /*
- * __qam_c_put
- * Cursor put for queued access method.
- * BEFORE and AFTER cannot be specified.
- */
- static int
- __qam_c_put(dbc, key, data, flags, pgnop)
- DBC *dbc;
- DBT *key, *data;
- u_int32_t flags;
- db_pgno_t *pgnop;
- {
- QUEUE_CURSOR *cp;
- DB *dbp;
- DB_LOCK lock;
- QMETA *meta;
- db_pgno_t pg;
- db_recno_t new_cur, new_first;
- u_int32_t opcode;
- int exact, ret, t_ret;
- COMPQUIET(key, NULL);
- dbp = dbc->dbp;
- if (pgnop != NULL)
- *pgnop = PGNO_INVALID;
- cp = (QUEUE_CURSOR *)dbc->internal;
- /* Write lock the record. */
- if ((ret = __db_lget(dbc,
- 0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
- return (ret);
- if ((ret = __qam_position(dbc,
- &cp->recno, QAM_WRITE, &exact)) != 0) {
- /* We could not get the page, we can release the record lock. */
- __LPUT(dbc, lock);
- return (ret);
- }
- if (exact && flags == DB_NOOVERWRITE) {
- ret = __TLPUT(dbc, lock);
- /* Doing record locking, release the page lock */
- if ((t_ret = __LPUT(dbc, cp->lock)) == 0)
- cp->lock.off = LOCK_INVALID;
- else
- if (ret == 0)
- ret = t_ret;
- if ((t_ret =
- __qam_fput(dbp, cp->pgno, cp->page, 0)) != 0 && ret == 0)
- ret = t_ret;
- cp->page = NULL;
- return (ret == 0 ? DB_KEYEXIST : ret);
- }
- /* Put the item on the page. */
- ret = __qam_pitem(dbc, (QPAGE *)cp->page, cp->indx, cp->recno, data);
- /* Doing record locking, release the page lock */
- if ((t_ret = __LPUT(dbc, cp->lock)) != 0 && ret == 0)
- ret = t_ret;
- if ((t_ret =
- __qam_fput(dbp, cp->pgno, cp->page, DB_MPOOL_DIRTY)) && ret == 0)
- ret = t_ret;
- cp->page = NULL;
- cp->lock = lock;
- cp->lock_mode = DB_LOCK_WRITE;
- if (ret != 0)
- return (ret);
- /* We may need to reset the head or tail of the queue. */
- pg = ((QUEUE *)dbp->q_internal)->q_meta;
- if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
- return (ret);
- if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
- /* We did not fetch it, we can release the lock. */
- (void)__LPUT(dbc, lock);
- return (ret);
- }
- opcode = 0;
- new_cur = new_first = 0;
- /*
- * If the put address is outside the queue, adjust the head and
- * tail of the queue. If the order is inverted we move
- * the one which is closer. The first case is when the
- * queue is empty, move first and current to where the new
- * insert is.
- */
- if (meta->first_recno == meta->cur_recno) {
- new_first = cp->recno;
- new_cur = cp->recno + 1;
- if (new_cur == RECNO_OOB)
- new_cur++;
- opcode |= QAM_SETFIRST;
- opcode |= QAM_SETCUR;
- } else {
- if (QAM_BEFORE_FIRST(meta, cp->recno) &&
- (meta->first_recno <= meta->cur_recno ||
- meta->first_recno - cp->recno < cp->recno - meta->cur_recno)) {
- new_first = cp->recno;
- opcode |= QAM_SETFIRST;
- }
- if (meta->cur_recno == cp->recno ||
- (QAM_AFTER_CURRENT(meta, cp->recno) &&
- (meta->first_recno <= meta->cur_recno ||
- cp->recno - meta->cur_recno <= meta->first_recno - cp->recno))) {
- new_cur = cp->recno + 1;
- if (new_cur == RECNO_OOB)
- new_cur++;
- opcode |= QAM_SETCUR;
- }
- }
- if (opcode != 0 && DB_LOGGING(dbc)) {
- ret = __qam_mvptr_log(dbp->dbenv, dbc->txn, &meta->dbmeta.lsn,
- 0, opcode, dbp->log_fileid, meta->first_recno, new_first,
- meta->cur_recno, new_cur, &meta->dbmeta.lsn);
- }
- if (opcode & QAM_SETCUR)
- meta->cur_recno = new_cur;
- if (opcode & QAM_SETFIRST)
- meta->first_recno = new_first;
- if ((t_ret =
- memp_fput(dbp->mpf, meta, opcode != 0 ? DB_MPOOL_DIRTY : 0)) != 0 &&
- ret == 0)
- ret = t_ret;
- /* Don't hold the meta page long term. */
- if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
- ret = t_ret;
- return (ret);
- }
- /*
- * __qam_put --
- * Add a record to the queue.
- * If we are doing anything but appending, just call qam_c_put to do the
- * work. Otherwise we fast path things here.
- *
- * PUBLIC: int __qam_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t));
- */
- int
- __qam_put(dbp, txn, key, data, flags)
- DB *dbp;
- DB_TXN *txn;
- DBT *key, *data;
- u_int32_t flags;
- {
- QUEUE_CURSOR *cp;
- DBC *dbc;
- DB_LOCK lock;
- QMETA *meta;
- QPAGE *page;
- QUEUE *qp;
- db_pgno_t pg;
- db_recno_t recno;
- int ret, t_ret;
- PANIC_CHECK(dbp->dbenv);
- DB_CHECK_TXN(dbp, txn);
- /* Allocate a cursor. */
- if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
- return (ret);
- DEBUG_LWRITE(dbc, dbc->txn, "qam_put", key, data, flags);
- cp = (QUEUE_CURSOR *)dbc->internal;
- /* Check for invalid flags. */
- if ((ret = __db_putchk(dbp,
- key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), 0)) != 0)
- goto done;
- /* If not appending, then just call the cursor routine */
- if (flags != DB_APPEND) {
- if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
- goto done;
- ret = __qam_c_put(dbc, NULL, data, flags, NULL);
- goto done;
- }
- /* Write lock the meta page. */
- pg = ((QUEUE *)dbp->q_internal)->q_meta;
- if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
- goto done;
- if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
- /* We did not fetch it, we can release the lock. */
- (void)__LPUT(dbc, lock);
- goto done;
- }
- /* Record that we are going to allocate a record. */
- if (DB_LOGGING(dbc)) {
- __qam_inc_log(dbp->dbenv,
- dbc->txn, &meta->dbmeta.lsn,
- 0, dbp->log_fileid, &meta->dbmeta.lsn);
- }
- /* Get the next record number. */
- recno = meta->cur_recno;
- meta->cur_recno++;
- if (meta->cur_recno == RECNO_OOB)
- meta->cur_recno++;
- if (meta->cur_recno == meta->first_recno) {
- meta->cur_recno--;
- if (meta->cur_recno == RECNO_OOB)
- meta->cur_recno--;
- (void)__LPUT(dbc, lock);
- ret = EFBIG;
- goto err;
- }
- if (QAM_BEFORE_FIRST(meta, recno))
- meta->first_recno = recno;
- /* Lock the record and release meta page lock. */
- if ((ret = __db_lget(dbc,
- 1, recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
- goto err;
- /*
- * The application may modify the data based on the selected record
- * number.
- */
- if (flags == DB_APPEND && dbc->dbp->db_append_recno != NULL &&
- (ret = dbc->dbp->db_append_recno(dbc->dbp, data, recno)) != 0) {
- (void)__LPUT(dbc, lock);
- goto err;
- }
- cp->lock = lock;
- cp->lock_mode = DB_LOCK_WRITE;
- pg = QAM_RECNO_PAGE(dbp, recno);
- /* Fetch and write lock the data page. */
- if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
- goto err;
- if ((ret = __qam_fget(dbp, &pg, DB_MPOOL_CREATE, &page)) != 0) {
- /* We did not fetch it, we can release the lock. */
- (void)__LPUT(dbc, lock);
- goto err;
- }
- /* See if this is a new page. */
- if (page->pgno == 0) {
- page->pgno = pg;
- page->type = P_QAMDATA;
- }
- /* Put the item on the page and log it. */
- ret = __qam_pitem(dbc, page,
- QAM_RECNO_INDEX(dbp, pg, recno), recno, data);
- /* Doing record locking, release the page lock */
- if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
- ret = t_ret;
- if ((t_ret
- = __qam_fput(dbp, pg, page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
- ret = t_ret;
- /* Return the record number to the user. */
- if (ret == 0)
- ret = __db_retcopy(dbp, key,
- &recno, sizeof(recno), &dbc->rkey.data, &dbc->rkey.ulen);
- /* See if we are leaving the extent. */
- qp = (QUEUE *) dbp->q_internal;
- if (qp->page_ext != 0
- && (recno % (qp->page_ext * qp->rec_page) == 0
- || recno == UINT32_T_MAX)) {
- if ((ret =
- __db_lget(dbc, 0, pg, DB_LOCK_WRITE, 0, &lock)) != 0)
- goto err;
- if (!QAM_AFTER_CURRENT(meta, recno))
- ret = __qam_fclose(dbp, pg);
- (void)__LPUT(dbc, lock);
- }
- err:
- /* Release the meta page. */
- if ((t_ret
- = memp_fput(dbp->mpf, meta, DB_MPOOL_DIRTY)) != 0 && ret == 0)
- ret = t_ret;
- done:
- /* Discard the cursor. */
- if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
- ret = t_ret;
- return (ret);
- }
- /*
- * __qam_c_del --
- * Qam cursor->am_del function
- */
- static int
- __qam_c_del(dbc)
- DBC *dbc;
- {
- QUEUE_CURSOR *cp;
- DB *dbp;
- DBT data;
- DB_LOCK lock;
- PAGE *pagep;
- QAMDATA *qp;
- QMETA *meta;
- db_pgno_t pg;
- int exact, ret, t_ret;
- dbp = dbc->dbp;
- cp = (QUEUE_CURSOR *)dbc->internal;
- pg = ((QUEUE *)dbp->q_internal)->q_meta;
- if ((ret = __db_lget(dbc, 0, pg, DB_LOCK_READ, 0, &lock)) != 0)
- return (ret);
- if ((ret = memp_fget(dbp->mpf, &pg, 0, &meta)) != 0) {
- /* We did not fetch it, we can release the lock. */
- (void)__LPUT(dbc, lock);
- return (ret);
- }
- if (QAM_NOT_VALID(meta, cp->recno))
- ret = DB_NOTFOUND;
- /* Don't hold the meta page long term. */
- if ((t_ret = __LPUT(dbc, lock)) != 0 && ret == 0)
- ret = t_ret;
- if ((t_ret = memp_fput(dbp->mpf, meta, 0)) != 0 && ret == 0)
- ret = t_ret;
- if (ret != 0)
- return (ret);
- if ((ret = __db_lget(dbc,
- 0, cp->recno, DB_LOCK_WRITE, DB_LOCK_RECORD, &lock)) != 0)
- return (ret);
- cp->lock_mode = DB_LOCK_WRITE;
- /* Find the record ; delete only deletes exact matches. */
- if ((ret = __qam_position(dbc,
- &cp->recno, QAM_WRITE, &exact)) != 0) {
- cp->lock = lock;
- return (ret);
- }
- if (!exact) {
- ret = DB_NOTFOUND;
- goto err1;
- }
- pagep = cp->page;
- qp = QAM_GET_RECORD(dbp, pagep, cp->indx);
- if (DB_LOGGING(dbc)) {
- if (((QUEUE *)dbp->q_internal)->page_ext == 0
- || ((QUEUE *)dbp->q_internal)->re_len == 0) {
- if ((ret =
- __qam_del_log(dbp->dbenv,
- dbc->txn, &LSN(pagep), 0,
- dbp->log_fileid, &LSN(pagep),
- pagep->pgno, cp->indx, cp->recno)) != 0)
- goto err1;
- } else {
- data.size = ((QUEUE *)dbp->q_internal)->re_len;
- data.data = qp->data;
- if ((ret =
- __qam_delext_log(dbp->dbenv, dbc->txn,
- &LSN(pagep), 0, dbp->log_fileid, &LSN(pagep),
- pagep->pgno, cp->indx, cp->recno, &data)) != 0)
- goto err1;
- }
- }
- F_CLR(qp, QAM_VALID);
- err1:
- if ((t_ret = __qam_fput(
- dbp, cp->pgno, cp->page, ret == 0 ? DB_MPOOL_DIRTY : 0)) != 0)
- return (ret ? ret : t_ret);
- cp->page = NULL;
- /* Doing record locking, release the page lock */
- if ((t_ret = __LPUT(dbc, cp->lock)) != 0) {
- cp->lock = lock;
- return (ret ? ret : t_ret);
- }
- cp->lock = lock;
- return (ret);
- }
- /*
- * __qam_delete --
- * Queue db->del function.
- *
- * PUBLIC: int __qam_delete __P((DB *, DB_TXN *, DBT *, u_int32_t));
- */
- int
- __qam_delete(dbp, txn, key, flags)
- DB *dbp;
- DB_TXN *txn;
- DBT *key;
- u_int32_t flags;
- {
- QUEUE_CURSOR *cp;
- DBC *dbc;
- int ret, t_ret;
- PANIC_CHECK(dbp->dbenv);
- DB_CHECK_TXN(dbp, txn);
- /* Check for invalid flags. */
- if ((ret =
- __db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0)
- return (ret);
- /* Acquire a cursor. */
- if ((ret = dbp->cursor(dbp, txn, &dbc, DB_WRITELOCK)) != 0)
- return (ret);
- DEBUG_LWRITE(dbc, txn, "qam_delete", key, NULL, flags);
- cp = (QUEUE_CURSOR *)dbc->internal;
- if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
- goto err;
- ret = __qam_c_del(dbc);
- /* Release the cursor. */
- err: if ((t_ret = dbc->c_close(dbc)) != 0 && ret == 0)
- ret = t_ret;
- return (ret);
- }
- #ifdef DEBUG_WOP
- #define QDEBUG
- #endif
- /*
- * __qam_c_get --
- * Queue cursor->c_get function.
- */
- static int
- __qam_c_get(dbc, key, data, flags, pgnop)
- DBC *dbc;
- DBT *key, *data;
- u_int32_t flags;
- db_pgno_t *pgnop;
- {
- DB *dbp;
- DB_LOCK lock, pglock, metalock, save_lock;
- DBT tmp;
- PAGE *pg;
- QAMDATA *qp;
- QMETA *meta;
- QUEUE *t;
- QUEUE_CURSOR *cp;
- db_indx_t save_indx;
- db_lockmode_t lock_mode;
- db_pgno_t metapno, save_page;
- db_recno_t current, first, save_recno;
- qam_position_mode mode;
- u_int32_t rec_extent;
- int exact, is_first, locked, ret, t_ret, wait, with_delete;
- int put_mode, meta_dirty, retrying, skip_again, wrapped;
- cp = (QUEUE_CURSOR *)dbc->internal;
- dbp = dbc->dbp;
- PANIC_CHECK(dbp->dbenv);
- wait = 0;
- with_delete = 0;
- retrying = 0;
- rec_extent = 0;
- lock_mode = DB_LOCK_READ;
- mode = QAM_READ;
- put_mode = 0;
- t_ret = 0;
- *pgnop = 0;
- pg = NULL;
- skip_again = 0;
- if (F_ISSET(dbc, DBC_RMW)) {
- lock_mode = DB_LOCK_WRITE;
- mode = QAM_WRITE;
- }
- if (flags == DB_CONSUME_WAIT) {
- wait = 1;
- flags = DB_CONSUME;
- }
- if (flags == DB_CONSUME) {
- DB_CHECK_TXN(dbp, dbc->txn);
- with_delete = 1;
- flags = DB_FIRST;
- lock_mode = DB_LOCK_WRITE;
- mode = QAM_CONSUME;
- }
- DEBUG_LREAD(dbc, dbc->txn, "qam_c_get",
- flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags);
- is_first = 0;
- t = (QUEUE *)dbp->q_internal;
- /* get the meta page */
- metapno = t->q_meta;
- if ((ret = __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
- return (ret);
- locked = 1;
- if ((ret = memp_fget(dbp->mpf, &metapno, 0, &meta)) != 0) {
- /* We did not fetch it, we can release the lock. */
- (void)__LPUT(dbc, metalock);
- return (ret);
- }
- first = 0;
- /* Make lint and friends happy. */
- meta_dirty = 0;
- /* Release any previous lock if not in a transaction. */
- if (cp->lock.off != LOCK_INVALID) {
- (void)__TLPUT(dbc, cp->lock);
- cp->lock.off = LOCK_INVALID;
- }
- retry: /* Update the record number. */
- switch (flags) {
- case DB_CURRENT:
- break;
- case DB_NEXT_DUP:
- ret = DB_NOTFOUND;
- goto err;
- /* NOTREACHED */
- case DB_NEXT:
- case DB_NEXT_NODUP:
- if (cp->recno != RECNO_OOB) {
- ++cp->recno;
- /* Wrap around, skipping zero. */
- if (cp->recno == RECNO_OOB)
- cp->recno++;
- break;
- }
- /* FALLTHROUGH */
- case DB_FIRST:
- flags = DB_NEXT;
- is_first = 1;
- /* get the first record number */
- cp->recno = first = meta->first_recno;
- break;
- case DB_PREV:
- case DB_PREV_NODUP:
- if (cp->recno != RECNO_OOB) {
- if (QAM_BEFORE_FIRST(meta, cp->recno)
- || cp->recno == meta->first_recno) {
- ret = DB_NOTFOUND;
- goto err;
- }
- --cp->recno;
- /* Wrap around, skipping zero. */
- if (cp->recno == RECNO_OOB)
- --cp->recno;
- break;
- }
- /* FALLTHROUGH */
- case DB_LAST:
- if (meta->first_recno == meta->cur_recno) {
- ret = DB_NOTFOUND;
- goto err;
- }
- cp->recno = meta->cur_recno - 1;
- if (cp->recno == RECNO_OOB)
- cp->recno--;
- break;
- case DB_GET_BOTH:
- case DB_SET:
- case DB_SET_RANGE:
- if ((ret = __qam_getno(dbp, key, &cp->recno)) != 0)
- goto err;
- break;
- default:
- ret = __db_unknown_flag(dbp->dbenv, "__qam_c_get", flags);
- goto err;
- }
- /*
- * Check to see if we are out of data. Current points to
- * the first free slot.
- */
- if (cp->recno == meta->cur_recno ||
- QAM_AFTER_CURRENT(meta, cp->recno)) {
- ret = DB_NOTFOUND;
- pg = NULL;
- if (wait) {
- flags = DB_FIRST;
- /*
- * If first is not set, then we skipped a
- * locked record, go back and find it.
- * If we find a locked record again
- * wait for it.
- */
- if (first == 0) {
- retrying = 1;
- goto retry;
- }
- if (CDB_LOCKING(dbp->dbenv)) {
- if ((ret = lock_get(dbp->dbenv, dbc->locker,
- DB_LOCK_SWITCH, &dbc->lock_dbt,
- DB_LOCK_WAIT, &dbc->mylock)) != 0)
- goto err;
- if ((ret = lock_get(dbp->dbenv, dbc->locker,
- DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
- &dbc->mylock)) != 0)
- goto err;
- goto retry;
- }
- /*
- * Wait for someone to update the meta page.
- * This will probably mean there is something
- * in the queue. We then go back up and
- * try again.
- */
- if (locked == 0) {
- if ((ret = __db_lget( dbc,
- 0, metapno, lock_mode, 0, &metalock)) != 0)
- goto err;
- locked = 1;
- if (cp->recno != RECNO_OOB &&
- !QAM_AFTER_CURRENT(meta, cp->recno))
- goto retry;
- }
- if ((ret = __db_lget(dbc, 0, metapno,
- DB_LOCK_WAIT, DB_LOCK_SWITCH, &metalock)) != 0)
- goto err;
- if ((ret = lock_get(dbp->dbenv, dbc->locker,
- DB_LOCK_UPGRADE, &dbc->lock_dbt, DB_LOCK_WRITE,
- &metalock)) != 0)
- goto err;
- locked = 1;
- goto retry;
- }
- goto err;
- }
- /* Don't hold the meta page long term. */
- if (locked) {
- if ((ret = __LPUT(dbc, metalock)) != 0)
- goto err;
- locked = 0;
- }
- /* Lock the record. */
- if ((ret = __db_lget(dbc, 0, cp->recno, lock_mode,
- (with_delete && !retrying) ?
- DB_LOCK_NOWAIT | DB_LOCK_RECORD : DB_LOCK_RECORD,
- &lock)) == DB_LOCK_NOTGRANTED && with_delete) {
- #ifdef QDEBUG
- __db_logmsg(dbp->dbenv,
- dbc->txn, "Queue S", 0, "%x %d %d %d",
- dbc->locker, cp->recno, first, meta->first_recno);
- #endif
- first = 0;
- goto retry;
- }
- if (ret != 0)
- goto err;
- /*
- * In the DB_FIRST or DB_LAST cases we must wait and then start over
- * since the first/last may have moved while we slept.
- * We release our locks and try again.
- */
- if ((!with_delete && is_first) || flags == DB_LAST) {
- if ((ret =
- __db_lget(dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
- goto err;
- if (cp->recno !=
- (is_first ? meta->first_recno : (meta->cur_recno - 1))) {
- __LPUT(dbc, lock);
- if (is_first)
- flags = DB_FIRST;
- locked = 1;
- goto retry;
- }
- /* Don't hold the meta page long term. */
- if ((ret = __LPUT(dbc, metalock)) != 0)
- goto err;
- }
- /* Position the cursor on the record. */
- if ((ret = __qam_position(dbc, &cp->recno, mode, &exact)) != 0) {
- /* We cannot get the page, release the record lock. */
- (void)__LPUT(dbc, lock);
- goto err;
- }
- pg = cp->page;
- pglock = cp->lock;
- cp->lock = lock;
- cp->lock_mode = lock_mode;
- if (!exact) {
- if (flags == DB_NEXT || flags == DB_NEXT_NODUP
- || flags == DB_PREV || flags == DB_PREV_NODUP
- || flags == DB_LAST) {
- /* Release locks and try again. */
- if (pg != NULL)
- (void)__qam_fput(dbp, cp->pgno, pg, 0);
- cp->page = pg = NULL;
- (void)__LPUT(dbc, pglock);
- (void)__LPUT(dbc, cp->lock);
- if (flags == DB_LAST)
- flags = DB_PREV;
- if (!with_delete)
- is_first = 0;
- retrying = 0;
- goto retry;
- }
- /* this is for the SET and SET_RANGE cases */
- ret = DB_KEYEMPTY;
- goto err1;
- }
- /* Return the key if the user didn't give us one. */
- if (key != NULL && flags != DB_SET && flags != DB_GET_BOTH &&
- (ret = __db_retcopy(dbp, key, &cp->recno, sizeof(cp->recno),
- &dbc->rkey.data, &dbc->rkey.ulen)) != 0)
- goto err1;
- if (key != NULL)
- F_SET(key, DB_DBT_ISSET);
- qp = QAM_GET_RECORD(dbp, pg, cp->indx);
- /* Return the data item. */
- if (flags == DB_GET_BOTH) {
- /*
- * Need to compare
- */
- tmp.data = qp->data;
- tmp.size = t->re_len;
- if ((ret = __bam_defcmp(dbp, data, &tmp)) != 0) {
- ret = DB_NOTFOUND;
- goto err1;
- }
- }
- if (data != NULL && (ret = __db_retcopy(dbp, data,
- qp->data, t->re_len, &dbc->rdata.data, &dbc->rdata.ulen)) != 0)
- goto err1;
- if (data != NULL)
- F_SET(data, DB_DBT_ISSET);
- /* Finally, if we are doing DB_CONSUME mark the record. */
- if (with_delete) {
- if (DB_LOGGING(dbc)) {
- if (t->page_ext == 0 || t->re_len == 0) {
- if ((ret = __qam_del_log(dbp->dbenv, dbc->txn,
- &LSN(pg), 0, dbp->log_fileid, &LSN(pg),
- pg->pgno, cp->indx, cp->recno)) != 0)
- goto err1;
- } else {
- tmp.data = qp->data;
- tmp.size = t->re_len;
- if ((ret =
- __qam_delext_log(dbp->dbenv, dbc->txn,
- &LSN(pg), 0, dbp->log_fileid, &LSN(pg),
- pg->pgno, cp->indx, cp->recno, &tmp)) != 0)
- goto err1;
- }
- }
- F_CLR(qp, QAM_VALID);
- put_mode = DB_MPOOL_DIRTY;
- if ((ret = __LPUT(dbc, pglock)) != 0)
- goto err;
- /*
- * Now we need to update the metapage
- * first pointer. If we have deleted
- * the record that is pointed to by
- * first_recno then we move it as far
- * forward as we can without blocking.
- * The metapage lock must be held for
- * the whole scan otherwise someone could
- * do a random insert behind where we are
- * looking.
- */
- if (locked == 0 && (ret = __db_lget(
- dbc, 0, metapno, lock_mode, 0, &metalock)) != 0)
- goto err1;
- locked = 1;
- #ifdef QDEBUG
- __db_logmsg(dbp->dbenv,
- dbc->txn, "Queue D", 0, "%x %d %d %d",
- dbc->locker, cp->recno, first, meta->first_recno);
- #endif
- /*
- * See if we deleted the "first" record. If
- * first is zero then we skipped something,
- * see if first_recno has been move passed
- * that to the record that we deleted.
- */
- if (first == 0)
- first = cp->recno;
- if (first != meta->first_recno)
- goto done;
- save_page = cp->pgno;
- save_indx = cp->indx;
- save_recno = cp->recno;
- save_lock = cp->lock;
- /*
- * If we skipped some deleted records, we need to
- * reposition on the first one. Get a lock
- * in case someone is trying to put it back.
- */
- if (first != cp->recno) {
- ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
- DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
- if (ret == DB_LOCK_NOTGRANTED) {
- ret = 0;
- goto done;
- }
- if (ret != 0)
- goto err1;
- if ((ret =
- __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
- goto err1;
- cp->page = NULL;
- put_mode = 0;
- if ((ret = __qam_position(dbc,
- &first, QAM_READ, &exact)) != 0 || exact != 0) {
- (void)__LPUT(dbc, lock);
- goto err1;
- }
- if ((ret =__LPUT(dbc, lock)) != 0)
- goto err1;
- if ((ret = __LPUT(dbc, cp->lock)) != 0)
- goto err1;
- }
- current = meta->cur_recno;
- wrapped = 0;
- if (first > current)
- wrapped = 1;
- rec_extent = meta->page_ext * meta->rec_page;
- /* Loop until we find a record or hit current */
- for (;;) {
- /*
- * Check to see if we are moving off the extent
- * and remove the extent.
- * If we are moving off a page we need to
- * get rid of the buffer.
- * Wait for the lagging readers to move off the
- * page.
- */
- if (rec_extent != 0
- && ((exact = first % rec_extent == 0)
- || first % meta->rec_page == 0
- || first == UINT32_T_MAX)) {
- if (exact == 1 && (ret = __db_lget(dbc,
- 0, cp->pgno, DB_LOCK_WRITE, 0, &cp->lock)) != 0)
- break;
- #ifdef QDEBUG
- __db_logmsg(dbp->dbenv,
- dbc->txn, "Queue R", 0, "%x %d %d %d",
- dbc->locker, cp->pgno, first, meta->first_recno);
- #endif
- put_mode |= DB_MPOOL_DISCARD;
- if ((ret = __qam_fput(dbp,
- cp->pgno, cp->page, put_mode)) != 0)
- break;
- cp->page = NULL;
- if (exact == 1) {
- ret = __qam_fremove(dbp, cp->pgno);
- t_ret = __LPUT(dbc, cp->lock);
- }
- if (ret != 0)
- break;
- if (t_ret != 0) {
- ret = t_ret;
- break;
- }
- } else if ((ret =
- __qam_fput(dbp, cp->pgno, cp->page, put_mode)) != 0)
- break;
- cp->page = NULL;
- first++;
- if (first == RECNO_OOB) {
- wrapped = 0;
- first++;
- }
- /*
- * LOOP EXIT when we come move to the current
- * pointer.
- */
- if (!wrapped && first >= current)
- break;
- ret = __db_lget(dbc, 0, first, DB_LOCK_READ,
- DB_LOCK_NOWAIT | DB_LOCK_RECORD, &lock);
- if (ret == DB_LOCK_NOTGRANTED) {
- ret = 0;
- break;
- }
- if (ret != 0)
- break;
- if ((ret = __qam_position(dbc,
- &first, QAM_READ, &exact)) != 0) {
- (void)__LPUT(dbc, lock);
- break;
- }
- put_mode = 0;
- if ((ret =__LPUT(dbc, lock)) != 0
- || (ret = __LPUT(dbc, cp->lock)) != 0 ||exact) {
- if ((t_ret = __qam_fput(dbp, cp->pgno,
- cp->page, put_mode)) != 0 && ret == 0)
- ret = t_ret;
- cp->page = NULL;
- break;
- }
- }
- cp->pgno = save_page;
- cp->indx = save_indx;
- cp->recno = save_recno;
- cp->lock = save_lock;
- /*
- * We have advanced as far as we can.
- * Advance first_recno to this point.
- */
- if (meta->first_recno != first) {
- #ifdef QDEBUG
- __db_logmsg(dbp->dbenv, dbc->txn, "Queue M",
- 0, "%x %d %d %d", dbc->locker, cp->recno,
- first, meta->first_recno);
- #endif
- if (DB_LOGGING(dbc))
- if ((ret =
- __qam_incfirst_log(dbp->dbenv,
- dbc->txn, &meta->dbmeta.lsn, 0,
- dbp->log_fileid, cp->recno)) != 0)
- goto err;
- meta->first_recno = first;
- meta_dirty = 1;
- }
- }
- done:
- err1: if (cp->page != NULL) {
- t_ret = __qam_fput(dbp, cp->pgno, cp->page, put_mode);
- if (!ret)
- ret = t_ret;
- /* Doing record locking, release the page lock */
- t_ret = __LPUT(dbc, pglock);
- cp->page = NULL;
- }
- err: if (!ret)
- ret = t_ret;
- if (meta) {
- /* release the meta page */
- t_ret = memp_fput(
- dbp->mpf, meta, meta_dirty ? DB_MPOOL_DIRTY : 0);
- if (!ret)
- ret = t_ret;
- /* Don't hold the meta page long term. */
- if (locked)
- t_ret = __LPUT(dbc, metalock);
- }
- DB_ASSERT(metalock.off == LOCK_INVALID);
- /*
- * There is no need to keep the record locked if we are
- * not in a transaction.
- */
- if (t_ret == 0)
- t_ret = __TLPUT(dbc, cp->lock);
- return (ret ? ret : t_ret);
- }
- /*
- * __qam_c_close --
- * Close down the cursor from a single use.
- */
- static int
- __qam_c_close(dbc, root_pgno, rmroot)
- DBC *dbc;
- db_pgno_t root_pgno;
- int *rmroot;
- {
- QUEUE_CURSOR *cp;
- COMPQUIET(root_pgno, 0);
- COMPQUIET(rmroot, NULL);
- cp = (QUEUE_CURSOR *)dbc->internal;
- /* Discard any locks not acquired inside of a transaction. */
- if (cp->lock.off != LOCK_INVALID) {
- (void)__TLPUT(dbc, cp->lock);
- cp->lock.off = LOCK_INVALID;
- }
- cp->page = NULL;
- cp->pgno = PGNO_INVALID;
- cp->indx = 0;
- cp->lock.off = LOCK_INVALID;
- cp->lock_mode = DB_LOCK_NG;
- cp->recno = RECNO_OOB;
- cp->flags = 0;
- return (0);
- }
- /*
- * __qam_c_dup --
- * Duplicate a queue cursor, such that the new one holds appropriate
- * locks for the position of the original.
- *
- * PUBLIC: int __qam_c_dup __P((DBC *, DBC *));
- */
- int
- __qam_c_dup(orig_dbc, new_dbc)
- DBC *orig_dbc, *new_dbc;
- {
- QUEUE_CURSOR *orig, *new;
- orig = (QUEUE_CURSOR *)orig_dbc->internal;
- new = (QUEUE_CURSOR *)new_dbc->internal;
- new->recno = orig->recno;
- /* reget the long term lock if we are not in a xact */
- if (orig_dbc->txn != NULL ||
- !STD_LOCKING(orig_dbc) || orig->lock.off == LOCK_INVALID)
- return (0);
- return (__db_lget(new_dbc,
- 0, new->recno, new->lock_mode, DB_LOCK_RECORD, &new->lock));
- }
- /*
- * __qam_c_init
- *
- * PUBLIC: int __qam_c_init __P((DBC *));
- */
- int
- __qam_c_init(dbc)
- DBC *dbc;
- {
- QUEUE_CURSOR *cp;
- DB *dbp;
- int ret;
- dbp = dbc->dbp;
- /* Allocate the internal structure. */
- cp = (QUEUE_CURSOR *)dbc->internal;
- if (cp == NULL) {
- if ((ret =
- __os_calloc(dbp->dbenv, 1, sizeof(QUEUE_CURSOR), &cp)) != 0)
- return (ret);
- dbc->internal = (DBC_INTERNAL *)cp;
- }
- /* Initialize methods. */
- dbc->c_close = __db_c_close;
- dbc->c_count = __db_c_count;
- dbc->c_del = __db_c_del;
- dbc->c_dup = __db_c_dup;
- dbc->c_get = __db_c_get;
- dbc->c_put = __db_c_put;
- dbc->c_am_close = __qam_c_close;
- dbc->c_am_del = __qam_c_del;
- dbc->c_am_destroy = __qam_c_destroy;
- dbc->c_am_get = __qam_c_get;
- dbc->c_am_put = __qam_c_put;
- dbc->c_am_writelock = NULL;
- return (0);
- }
- /*
- * __qam_c_destroy --
- * Close a single cursor -- internal version.
- */
- static int
- __qam_c_destroy(dbc)
- DBC *dbc;
- {
- /* Discard the structures. */
- __os_free(dbc->internal, sizeof(QUEUE_CURSOR));
- return (0);
- }
- /*
- * __qam_getno --
- * Check the user's record number.
- */
- static int
- __qam_getno(dbp, key, rep)
- DB *dbp;
- const DBT *key;
- db_recno_t *rep;
- {
- if ((*rep = *(db_recno_t *)key->data) == 0) {
- __db_err(dbp->dbenv, "illegal record number of 0");
- return (EINVAL);
- }
- return (0);
- }