bt_split.c
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:35k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /*-
  2.  * See the file LICENSE for redistribution information.
  3.  *
  4.  * Copyright (c) 1996-2002
  5.  * Sleepycat Software.  All rights reserved.
  6.  */
  7. /*
  8.  * Copyright (c) 1990, 1993, 1994, 1995, 1996
  9.  * Keith Bostic.  All rights reserved.
  10.  */
  11. /*
  12.  * Copyright (c) 1990, 1993, 1994, 1995
  13.  * The Regents of the University of California.  All rights reserved.
  14.  *
  15.  * Redistribution and use in source and binary forms, with or without
  16.  * modification, are permitted provided that the following conditions
  17.  * are met:
  18.  * 1. Redistributions of source code must retain the above copyright
  19.  *    notice, this list of conditions and the following disclaimer.
  20.  * 2. Redistributions in binary form must reproduce the above copyright
  21.  *    notice, this list of conditions and the following disclaimer in the
  22.  *    documentation and/or other materials provided with the distribution.
  23.  * 3. Neither the name of the University nor the names of its contributors
  24.  *    may be used to endorse or promote products derived from this software
  25.  *    without specific prior written permission.
  26.  *
  27.  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
  28.  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  29.  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  30.  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
  31.  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  32.  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  33.  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  34.  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  35.  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  36.  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  37.  * SUCH DAMAGE.
  38.  */
  39. #include "db_config.h"
  40. #ifndef lint
  41. static const char revid[] = "$Id: bt_split.c,v 11.58 2002/07/03 19:03:50 bostic Exp $";
  42. #endif /* not lint */
  43. #ifndef NO_SYSTEM_INCLUDES
  44. #include <sys/types.h>
  45. #include <limits.h>
  46. #include <string.h>
  47. #endif
  48. #include "db_int.h"
  49. #include "dbinc/db_page.h"
  50. #include "dbinc/db_shash.h"
  51. #include "dbinc/lock.h"
  52. #include "dbinc/btree.h"
  53. static int __bam_broot __P((DBC *, PAGE *, PAGE *, PAGE *));
  54. static int __bam_page __P((DBC *, EPG *, EPG *));
  55. static int __bam_pinsert __P((DBC *, EPG *, PAGE *, PAGE *, int));
  56. static int __bam_psplit __P((DBC *, EPG *, PAGE *, PAGE *, db_indx_t *));
  57. static int __bam_root __P((DBC *, EPG *));
  58. static int __ram_root __P((DBC *, PAGE *, PAGE *, PAGE *));
  59. /*
  60.  * __bam_split --
  61.  * Split a page.
  62.  *
  63.  * PUBLIC: int __bam_split __P((DBC *, void *, db_pgno_t *));
  64.  */
  65. int
  66. __bam_split(dbc, arg, root_pgnop)
  67. DBC *dbc;
  68. void *arg;
  69. db_pgno_t *root_pgnop;
  70. {
  71. BTREE_CURSOR *cp;
  72. enum { UP, DOWN } dir;
  73. db_pgno_t root_pgno;
  74. int exact, level, ret;
  75. cp = (BTREE_CURSOR *)dbc->internal;
  76. root_pgno = cp->root;
  77. /*
  78.  * The locking protocol we use to avoid deadlock to acquire locks by
  79.  * walking down the tree, but we do it as lazily as possible, locking
  80.  * the root only as a last resort.  We expect all stack pages to have
  81.  * been discarded before we're called; we discard all short-term locks.
  82.  *
  83.  * When __bam_split is first called, we know that a leaf page was too
  84.  * full for an insert.  We don't know what leaf page it was, but we
  85.  * have the key/recno that caused the problem.  We call XX_search to
  86.  * reacquire the leaf page, but this time get both the leaf page and
  87.  * its parent, locked.  We then split the leaf page and see if the new
  88.  * internal key will fit into the parent page.  If it will, we're done.
  89.  *
  90.  * If it won't, we discard our current locks and repeat the process,
  91.  * only this time acquiring the parent page and its parent, locked.
  92.  * This process repeats until we succeed in the split, splitting the
  93.  * root page as the final resort.  The entire process then repeats,
  94.  * as necessary, until we split a leaf page.
  95.  *
  96.  * XXX
  97.  * A traditional method of speeding this up is to maintain a stack of
  98.  * the pages traversed in the original search.  You can detect if the
  99.  * stack is correct by storing the page's LSN when it was searched and
  100.  * comparing that LSN with the current one when it's locked during the
  101.  * split.  This would be an easy change for this code, but I have no
  102.  * numbers that indicate it's worthwhile.
  103.  */
  104. for (dir = UP, level = LEAFLEVEL;; dir == UP ? ++level : --level) {
  105. /*
  106.  * Acquire a page and its parent, locked.
  107.  */
  108. if ((ret = (dbc->dbtype == DB_BTREE ?
  109.     __bam_search(dbc, PGNO_INVALID,
  110. arg, S_WRPAIR, level, NULL, &exact) :
  111.     __bam_rsearch(dbc,
  112. (db_recno_t *)arg, S_WRPAIR, level, &exact))) != 0)
  113. return (ret);
  114. if (root_pgnop != NULL)
  115. *root_pgnop = cp->csp[0].page->pgno == root_pgno ?
  116.     root_pgno : cp->csp[-1].page->pgno;
  117. /*
  118.  * Split the page if it still needs it (it's possible another
  119.  * thread of control has already split the page).  If we are
  120.  * guaranteed that two items will fit on the page, the split
  121.  * is no longer necessary.
  122.  */
  123. if (2 * B_MAXSIZEONPAGE(cp->ovflsize)
  124.     <= (db_indx_t)P_FREESPACE(dbc->dbp, cp->csp[0].page)) {
  125. __bam_stkrel(dbc, STK_NOLOCK);
  126. return (0);
  127. }
  128. ret = cp->csp[0].page->pgno == root_pgno ?
  129.     __bam_root(dbc, &cp->csp[0]) :
  130.     __bam_page(dbc, &cp->csp[-1], &cp->csp[0]);
  131. BT_STK_CLR(cp);
  132. switch (ret) {
  133. case 0:
  134. /* Once we've split the leaf page, we're done. */
  135. if (level == LEAFLEVEL)
  136. return (0);
  137. /* Switch directions. */
  138. if (dir == UP)
  139. dir = DOWN;
  140. break;
  141. case DB_NEEDSPLIT:
  142. /*
  143.  * It's possible to fail to split repeatedly, as other
  144.  * threads may be modifying the tree, or the page usage
  145.  * is sufficiently bad that we don't get enough space
  146.  * the first time.
  147.  */
  148. if (dir == DOWN)
  149. dir = UP;
  150. break;
  151. default:
  152. return (ret);
  153. }
  154. }
  155. /* NOTREACHED */
  156. }
  157. /*
  158.  * __bam_root --
  159.  * Split the root page of a btree.
  160.  */
  161. static int
  162. __bam_root(dbc, cp)
  163. DBC *dbc;
  164. EPG *cp;
  165. {
  166. DB *dbp;
  167. DBT log_dbt;
  168. DB_LSN log_lsn;
  169. DB_MPOOLFILE *mpf;
  170. PAGE *lp, *rp;
  171. db_indx_t split;
  172. u_int32_t opflags;
  173. int ret;
  174. dbp = dbc->dbp;
  175. mpf = dbp->mpf;
  176. /* Yeah, right. */
  177. if (cp->page->level >= MAXBTREELEVEL) {
  178. __db_err(dbp->dbenv,
  179.     "Too many btree levels: %d", cp->page->level);
  180. ret = ENOSPC;
  181. goto err;
  182. }
  183. /* Create new left and right pages for the split. */
  184. lp = rp = NULL;
  185. if ((ret = __db_new(dbc, TYPE(cp->page), &lp)) != 0 ||
  186.     (ret = __db_new(dbc, TYPE(cp->page), &rp)) != 0)
  187. goto err;
  188. P_INIT(lp, dbp->pgsize, lp->pgno,
  189.     PGNO_INVALID, ISINTERNAL(cp->page) ? PGNO_INVALID : rp->pgno,
  190.     cp->page->level, TYPE(cp->page));
  191. P_INIT(rp, dbp->pgsize, rp->pgno,
  192.     ISINTERNAL(cp->page) ?  PGNO_INVALID : lp->pgno, PGNO_INVALID,
  193.     cp->page->level, TYPE(cp->page));
  194. /* Split the page. */
  195. if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
  196. goto err;
  197. /* Log the change. */
  198. if (DBC_LOGGING(dbc)) {
  199. memset(&log_dbt, 0, sizeof(log_dbt));
  200. log_dbt.data = cp->page;
  201. log_dbt.size = dbp->pgsize;
  202. ZERO_LSN(log_lsn);
  203. opflags = F_ISSET(
  204.     (BTREE_CURSOR *)dbc->internal, C_RECNUM) ? SPL_NRECS : 0;
  205. if ((ret = __bam_split_log(dbp,
  206.     dbc->txn, &LSN(cp->page), 0, PGNO(lp), &LSN(lp), PGNO(rp),
  207.     &LSN(rp), (u_int32_t)NUM_ENT(lp), 0, &log_lsn,
  208.     dbc->internal->root, &log_dbt, opflags)) != 0)
  209. goto err;
  210. } else
  211. LSN_NOT_LOGGED(LSN(cp->page));
  212. LSN(lp) = LSN(cp->page);
  213. LSN(rp) = LSN(cp->page);
  214. /* Clean up the new root page. */
  215. if ((ret = (dbc->dbtype == DB_RECNO ?
  216.     __ram_root(dbc, cp->page, lp, rp) :
  217.     __bam_broot(dbc, cp->page, lp, rp))) != 0)
  218. goto err;
  219. /* Adjust any cursors. */
  220. if ((ret = __bam_ca_split(dbc,
  221.     cp->page->pgno, lp->pgno, rp->pgno, split, 1)) != 0)
  222. goto err;
  223. /* Success -- write the real pages back to the store. */
  224. (void)mpf->put(mpf, cp->page, DB_MPOOL_DIRTY);
  225. (void)__TLPUT(dbc, cp->lock);
  226. (void)mpf->put(mpf, lp, DB_MPOOL_DIRTY);
  227. (void)mpf->put(mpf, rp, DB_MPOOL_DIRTY);
  228. return (0);
  229. err: if (lp != NULL)
  230. (void)mpf->put(mpf, lp, 0);
  231. if (rp != NULL)
  232. (void)mpf->put(mpf, rp, 0);
  233. (void)mpf->put(mpf, cp->page, 0);
  234. (void)__TLPUT(dbc, cp->lock);
  235. return (ret);
  236. }
  237. /*
  238.  * __bam_page --
  239.  * Split the non-root page of a btree.
  240.  */
  241. static int
  242. __bam_page(dbc, pp, cp)
  243. DBC *dbc;
  244. EPG *pp, *cp;
  245. {
  246. BTREE_CURSOR *bc;
  247. DBT log_dbt;
  248. DB_LSN log_lsn;
  249. DB *dbp;
  250. DB_LOCK rplock, tplock;
  251. DB_MPOOLFILE *mpf;
  252. DB_LSN save_lsn;
  253. PAGE *lp, *rp, *alloc_rp, *tp;
  254. db_indx_t split;
  255. u_int32_t opflags;
  256. int ret, t_ret;
  257. dbp = dbc->dbp;
  258. mpf = dbp->mpf;
  259. alloc_rp = lp = rp = tp = NULL;
  260. LOCK_INIT(rplock);
  261. LOCK_INIT(tplock);
  262. ret = -1;
  263. /*
  264.  * Create a new right page for the split, and fill in everything
  265.  * except its LSN and page number.
  266.  *
  267.  * We malloc space for both the left and right pages, so we don't get
  268.  * a new page from the underlying buffer pool until we know the split
  269.  * is going to succeed.  The reason is that we can't release locks
  270.  * acquired during the get-a-new-page process because metadata page
  271.  * locks can't be discarded on failure since we may have modified the
  272.  * free list.  So, if you assume that we're holding a write lock on the
  273.  * leaf page which ran out of space and started this split (e.g., we
  274.  * have already written records to the page, or we retrieved a record
  275.  * from it with the DB_RMW flag set), failing in a split with both a
  276.  * leaf page locked and the metadata page locked can potentially lock
  277.  * up the tree badly, because we've violated the rule of always locking
  278.  * down the tree, and never up.
  279.  */
  280. if ((ret = __os_malloc(dbp->dbenv, dbp->pgsize, &rp)) != 0)
  281. goto err;
  282. P_INIT(rp, dbp->pgsize, 0,
  283.     ISINTERNAL(cp->page) ? PGNO_INVALID : PGNO(cp->page),
  284.     ISINTERNAL(cp->page) ? PGNO_INVALID : NEXT_PGNO(cp->page),
  285.     cp->page->level, TYPE(cp->page));
  286. /*
  287.  * Create new left page for the split, and fill in everything
  288.  * except its LSN and next-page page number.
  289.  */
  290. if ((ret = __os_malloc(dbp->dbenv, dbp->pgsize, &lp)) != 0)
  291. goto err;
  292. P_INIT(lp, dbp->pgsize, PGNO(cp->page),
  293.     ISINTERNAL(cp->page) ?  PGNO_INVALID : PREV_PGNO(cp->page),
  294.     ISINTERNAL(cp->page) ?  PGNO_INVALID : 0,
  295.     cp->page->level, TYPE(cp->page));
  296. /*
  297.  * Split right.
  298.  *
  299.  * Only the indices are sorted on the page, i.e., the key/data pairs
  300.  * aren't, so it's simpler to copy the data from the split page onto
  301.  * two new pages instead of copying half the data to a new right page
  302.  * and compacting the left page in place.  Since the left page can't
  303.  * change, we swap the original and the allocated left page after the
  304.  * split.
  305.  */
  306. if ((ret = __bam_psplit(dbc, cp, lp, rp, &split)) != 0)
  307. goto err;
  308. /*
  309.  * Test to see if we are going to be able to insert the new pages into
  310.  * the parent page.  The interesting failure here is that the parent
  311.  * page can't hold the new keys, and has to be split in turn, in which
  312.  * case we want to release all the locks we can.
  313.  */
  314. if ((ret = __bam_pinsert(dbc, pp, lp, rp, 1)) != 0)
  315. goto err;
  316. /*
  317.  * Fix up the previous pointer of any leaf page following the split
  318.  * page.
  319.  *
  320.  * There's interesting deadlock situations here as we try to write-lock
  321.  * a page that's not in our direct ancestry.  Consider a cursor walking
  322.  * backward through the leaf pages, that has our following page locked,
  323.  * and is waiting on a lock for the page we're splitting.  In that case
  324.  * we're going to deadlock here .  It's probably OK, stepping backward
  325.  * through the tree isn't a common operation.
  326.  */
  327. if (ISLEAF(cp->page) && NEXT_PGNO(cp->page) != PGNO_INVALID) {
  328. if ((ret = __db_lget(dbc,
  329.     0, NEXT_PGNO(cp->page), DB_LOCK_WRITE, 0, &tplock)) != 0)
  330. goto err;
  331. if ((ret = mpf->get(mpf, &NEXT_PGNO(cp->page), 0, &tp)) != 0)
  332. goto err;
  333. }
  334. /*
  335.  * We've got everything locked down we need, and we know the split
  336.  * is going to succeed.  Go and get the additional page we'll need.
  337.  */
  338. if ((ret = __db_new(dbc, TYPE(cp->page), &alloc_rp)) != 0)
  339. goto err;
  340. /*
  341.  * Lock the new page.  We need to do this because someone
  342.  * could get here through bt_lpgno if this page was recently
  343.  * dealocated.  They can't look at it before we commit.
  344.  */
  345. if ((ret = __db_lget(dbc,
  346.     0, PGNO(alloc_rp), DB_LOCK_WRITE, 0, &rplock)) != 0)
  347. goto err;
  348. /*
  349.  * Fix up the page numbers we didn't have before.  We have to do this
  350.  * before calling __bam_pinsert because it may copy a page number onto
  351.  * the parent page and it takes the page number from its page argument.
  352.  */
  353. PGNO(rp) = NEXT_PGNO(lp) = PGNO(alloc_rp);
  354. /* Actually update the parent page. */
  355. if ((ret = __bam_pinsert(dbc, pp, lp, rp, 0)) != 0)
  356. goto err;
  357. bc = (BTREE_CURSOR *)dbc->internal;
  358. /* Log the change. */
  359. if (DBC_LOGGING(dbc)) {
  360. memset(&log_dbt, 0, sizeof(log_dbt));
  361. log_dbt.data = cp->page;
  362. log_dbt.size = dbp->pgsize;
  363. if (tp == NULL)
  364. ZERO_LSN(log_lsn);
  365. opflags = F_ISSET(bc, C_RECNUM) ? SPL_NRECS : 0;
  366. if ((ret = __bam_split_log(dbp, dbc->txn, &LSN(cp->page), 0,
  367.     PGNO(cp->page), &LSN(cp->page), PGNO(alloc_rp),
  368.     &LSN(alloc_rp), (u_int32_t)NUM_ENT(lp),
  369.     tp == NULL ? 0 : PGNO(tp),
  370.     tp == NULL ? &log_lsn : &LSN(tp),
  371.     PGNO_INVALID, &log_dbt, opflags)) != 0)
  372. goto err;
  373. } else
  374. LSN_NOT_LOGGED(LSN(cp->page));
  375. /* Update the LSNs for all involved pages. */
  376. LSN(alloc_rp) = LSN(cp->page);
  377. LSN(lp) = LSN(cp->page);
  378. LSN(rp) = LSN(cp->page);
  379. if (tp != NULL)
  380. LSN(tp) = LSN(cp->page);
  381. /*
  382.  * Copy the left and right pages into place.  There are two paths
  383.  * through here.  Either we are logging and we set the LSNs in the
  384.  * logging path.  However, if we are not logging, then we do not
  385.  * have valid LSNs on lp or rp.  The correct LSNs to use are the
  386.  * ones on the page we got from __db_new or the one that was
  387.  * originally on cp->page.  In both cases, we save the LSN from the
  388.  * real database page (not a malloc'd one) and reapply it after we
  389.  * do the copy.
  390.  */
  391. save_lsn = alloc_rp->lsn;
  392. memcpy(alloc_rp, rp, LOFFSET(dbp, rp));
  393. memcpy((u_int8_t *)alloc_rp + HOFFSET(rp),
  394.     (u_int8_t *)rp + HOFFSET(rp), dbp->pgsize - HOFFSET(rp));
  395. alloc_rp->lsn = save_lsn;
  396. save_lsn = cp->page->lsn;
  397. memcpy(cp->page, lp, LOFFSET(dbp, lp));
  398. memcpy((u_int8_t *)cp->page + HOFFSET(lp),
  399.     (u_int8_t *)lp + HOFFSET(lp), dbp->pgsize - HOFFSET(lp));
  400. cp->page->lsn = save_lsn;
  401. /* Fix up the next-page link. */
  402. if (tp != NULL)
  403. PREV_PGNO(tp) = PGNO(rp);
  404. /* Adjust any cursors. */
  405. if ((ret = __bam_ca_split(dbc,
  406.     PGNO(cp->page), PGNO(cp->page), PGNO(rp), split, 0)) != 0)
  407. goto err;
  408. __os_free(dbp->dbenv, lp);
  409. __os_free(dbp->dbenv, rp);
  410. /*
  411.  * Success -- write the real pages back to the store.  As we never
  412.  * acquired any sort of lock on the new page, we release it before
  413.  * releasing locks on the pages that reference it.  We're finished
  414.  * modifying the page so it's not really necessary, but it's neater.
  415.  */
  416. if ((t_ret = mpf->put(mpf, alloc_rp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  417. ret = t_ret;
  418. (void)__TLPUT(dbc, rplock);
  419. if ((t_ret = mpf->put(mpf, pp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  420. ret = t_ret;
  421. (void)__TLPUT(dbc, pp->lock);
  422. if ((t_ret = mpf->put(mpf, cp->page, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  423. ret = t_ret;
  424. (void)__TLPUT(dbc, cp->lock);
  425. if (tp != NULL) {
  426. if ((t_ret =
  427.     mpf->put(mpf, tp, DB_MPOOL_DIRTY)) != 0 && ret == 0)
  428. ret = t_ret;
  429. (void)__TLPUT(dbc, tplock);
  430. }
  431. return (ret);
  432. err: if (lp != NULL)
  433. __os_free(dbp->dbenv, lp);
  434. if (rp != NULL)
  435. __os_free(dbp->dbenv, rp);
  436. if (alloc_rp != NULL)
  437. (void)mpf->put(mpf, alloc_rp, 0);
  438. if (tp != NULL)
  439. (void)mpf->put(mpf, tp, 0);
  440. /* We never updated the new or next pages, we can release them. */
  441. (void)__LPUT(dbc, rplock);
  442. (void)__LPUT(dbc, tplock);
  443. (void)mpf->put(mpf, pp->page, 0);
  444. if (ret == DB_NEEDSPLIT)
  445. (void)__LPUT(dbc, pp->lock);
  446. else
  447. (void)__TLPUT(dbc, pp->lock);
  448. (void)mpf->put(mpf, cp->page, 0);
  449. if (ret == DB_NEEDSPLIT)
  450. (void)__LPUT(dbc, cp->lock);
  451. else
  452. (void)__TLPUT(dbc, cp->lock);
  453. return (ret);
  454. }
  455. /*
  456.  * __bam_broot --
  457.  * Fix up the btree root page after it has been split.
  458.  */
  459. static int
  460. __bam_broot(dbc, rootp, lp, rp)
  461. DBC *dbc;
  462. PAGE *rootp, *lp, *rp;
  463. {
  464. BINTERNAL bi, *child_bi;
  465. BKEYDATA *child_bk;
  466. BTREE_CURSOR *cp;
  467. DB *dbp;
  468. DBT hdr, data;
  469. db_pgno_t root_pgno;
  470. int ret;
  471. dbp = dbc->dbp;
  472. cp = (BTREE_CURSOR *)dbc->internal;
  473. /*
  474.  * If the root page was a leaf page, change it into an internal page.
  475.  * We copy the key we split on (but not the key's data, in the case of
  476.  * a leaf page) to the new root page.
  477.  */
  478. root_pgno = cp->root;
  479. P_INIT(rootp, dbp->pgsize,
  480.     root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IBTREE);
  481. memset(&data, 0, sizeof(data));
  482. memset(&hdr, 0, sizeof(hdr));
  483. /*
  484.  * The btree comparison code guarantees that the left-most key on any
  485.  * internal btree page is never used, so it doesn't need to be filled
  486.  * in.  Set the record count if necessary.
  487.  */
  488. memset(&bi, 0, sizeof(bi));
  489. bi.len = 0;
  490. B_TSET(bi.type, B_KEYDATA, 0);
  491. bi.pgno = lp->pgno;
  492. if (F_ISSET(cp, C_RECNUM)) {
  493. bi.nrecs = __bam_total(dbp, lp);
  494. RE_NREC_SET(rootp, bi.nrecs);
  495. }
  496. hdr.data = &bi;
  497. hdr.size = SSZA(BINTERNAL, data);
  498. if ((ret =
  499.     __db_pitem(dbc, rootp, 0, BINTERNAL_SIZE(0), &hdr, NULL)) != 0)
  500. return (ret);
  501. switch (TYPE(rp)) {
  502. case P_IBTREE:
  503. /* Copy the first key of the child page onto the root page. */
  504. child_bi = GET_BINTERNAL(dbp, rp, 0);
  505. bi.len = child_bi->len;
  506. B_TSET(bi.type, child_bi->type, 0);
  507. bi.pgno = rp->pgno;
  508. if (F_ISSET(cp, C_RECNUM)) {
  509. bi.nrecs = __bam_total(dbp, rp);
  510. RE_NREC_ADJ(rootp, bi.nrecs);
  511. }
  512. hdr.data = &bi;
  513. hdr.size = SSZA(BINTERNAL, data);
  514. data.data = child_bi->data;
  515. data.size = child_bi->len;
  516. if ((ret = __db_pitem(dbc, rootp, 1,
  517.     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
  518. return (ret);
  519. /* Increment the overflow ref count. */
  520. if (B_TYPE(child_bi->type) == B_OVERFLOW)
  521. if ((ret = __db_ovref(dbc,
  522.     ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
  523. return (ret);
  524. break;
  525. case P_LDUP:
  526. case P_LBTREE:
  527. /* Copy the first key of the child page onto the root page. */
  528. child_bk = GET_BKEYDATA(dbp, rp, 0);
  529. switch (B_TYPE(child_bk->type)) {
  530. case B_KEYDATA:
  531. bi.len = child_bk->len;
  532. B_TSET(bi.type, child_bk->type, 0);
  533. bi.pgno = rp->pgno;
  534. if (F_ISSET(cp, C_RECNUM)) {
  535. bi.nrecs = __bam_total(dbp, rp);
  536. RE_NREC_ADJ(rootp, bi.nrecs);
  537. }
  538. hdr.data = &bi;
  539. hdr.size = SSZA(BINTERNAL, data);
  540. data.data = child_bk->data;
  541. data.size = child_bk->len;
  542. if ((ret = __db_pitem(dbc, rootp, 1,
  543.     BINTERNAL_SIZE(child_bk->len), &hdr, &data)) != 0)
  544. return (ret);
  545. break;
  546. case B_DUPLICATE:
  547. case B_OVERFLOW:
  548. bi.len = BOVERFLOW_SIZE;
  549. B_TSET(bi.type, child_bk->type, 0);
  550. bi.pgno = rp->pgno;
  551. if (F_ISSET(cp, C_RECNUM)) {
  552. bi.nrecs = __bam_total(dbp, rp);
  553. RE_NREC_ADJ(rootp, bi.nrecs);
  554. }
  555. hdr.data = &bi;
  556. hdr.size = SSZA(BINTERNAL, data);
  557. data.data = child_bk;
  558. data.size = BOVERFLOW_SIZE;
  559. if ((ret = __db_pitem(dbc, rootp, 1,
  560.     BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
  561. return (ret);
  562. /* Increment the overflow ref count. */
  563. if (B_TYPE(child_bk->type) == B_OVERFLOW)
  564. if ((ret = __db_ovref(dbc,
  565.     ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
  566. return (ret);
  567. break;
  568. default:
  569. return (__db_pgfmt(dbp->dbenv, rp->pgno));
  570. }
  571. break;
  572. default:
  573. return (__db_pgfmt(dbp->dbenv, rp->pgno));
  574. }
  575. return (0);
  576. }
  577. /*
  578.  * __ram_root --
  579.  * Fix up the recno root page after it has been split.
  580.  */
  581. static int
  582. __ram_root(dbc, rootp, lp, rp)
  583. DBC *dbc;
  584. PAGE *rootp, *lp, *rp;
  585. {
  586. DB *dbp;
  587. DBT hdr;
  588. RINTERNAL ri;
  589. db_pgno_t root_pgno;
  590. int ret;
  591. dbp = dbc->dbp;
  592. root_pgno = dbc->internal->root;
  593. /* Initialize the page. */
  594. P_INIT(rootp, dbp->pgsize,
  595.     root_pgno, PGNO_INVALID, PGNO_INVALID, lp->level + 1, P_IRECNO);
  596. /* Initialize the header. */
  597. memset(&hdr, 0, sizeof(hdr));
  598. hdr.data = &ri;
  599. hdr.size = RINTERNAL_SIZE;
  600. /* Insert the left and right keys, set the header information. */
  601. ri.pgno = lp->pgno;
  602. ri.nrecs = __bam_total(dbp, lp);
  603. if ((ret = __db_pitem(dbc, rootp, 0, RINTERNAL_SIZE, &hdr, NULL)) != 0)
  604. return (ret);
  605. RE_NREC_SET(rootp, ri.nrecs);
  606. ri.pgno = rp->pgno;
  607. ri.nrecs = __bam_total(dbp, rp);
  608. if ((ret = __db_pitem(dbc, rootp, 1, RINTERNAL_SIZE, &hdr, NULL)) != 0)
  609. return (ret);
  610. RE_NREC_ADJ(rootp, ri.nrecs);
  611. return (0);
  612. }
  613. /*
  614.  * __bam_pinsert --
  615.  * Insert a new key into a parent page, completing the split.
  616.  */
  617. static int
  618. __bam_pinsert(dbc, parent, lchild, rchild, space_check)
  619. DBC *dbc;
  620. EPG *parent;
  621. PAGE *lchild, *rchild;
  622. int space_check;
  623. {
  624. BINTERNAL bi, *child_bi;
  625. BKEYDATA *child_bk, *tmp_bk;
  626. BTREE *t;
  627. BTREE_CURSOR *cp;
  628. DB *dbp;
  629. DBT a, b, hdr, data;
  630. PAGE *ppage;
  631. RINTERNAL ri;
  632. db_indx_t off;
  633. db_recno_t nrecs;
  634. size_t (*func) __P((DB *, const DBT *, const DBT *));
  635. u_int32_t n, nbytes, nksize;
  636. int ret;
  637. dbp = dbc->dbp;
  638. cp = (BTREE_CURSOR *)dbc->internal;
  639. t = dbp->bt_internal;
  640. ppage = parent->page;
  641. /* If handling record numbers, count records split to the right page. */
  642. nrecs = F_ISSET(cp, C_RECNUM) &&
  643.     !space_check ? __bam_total(dbp, rchild) : 0;
  644. /*
  645.  * Now we insert the new page's first key into the parent page, which
  646.  * completes the split.  The parent points to a PAGE and a page index
  647.  * offset, where the new key goes ONE AFTER the index, because we split
  648.  * to the right.
  649.  *
  650.  * XXX
  651.  * Some btree algorithms replace the key for the old page as well as
  652.  * the new page.  We don't, as there's no reason to believe that the
  653.  * first key on the old page is any better than the key we have, and,
  654.  * in the case of a key being placed at index 0 causing the split, the
  655.  * key is unavailable.
  656.  */
  657. off = parent->indx + O_INDX;
  658. /*
  659.  * Calculate the space needed on the parent page.
  660.  *
  661.  * Prefix trees: space hack used when inserting into BINTERNAL pages.
  662.  * Retain only what's needed to distinguish between the new entry and
  663.  * the LAST entry on the page to its left.  If the keys compare equal,
  664.  * retain the entire key.  We ignore overflow keys, and the entire key
  665.  * must be retained for the next-to-leftmost key on the leftmost page
  666.  * of each level, or the search will fail.  Applicable ONLY to internal
  667.  * pages that have leaf pages as children.  Further reduction of the
  668.  * key between pairs of internal pages loses too much information.
  669.  */
  670. switch (TYPE(rchild)) {
  671. case P_IBTREE:
  672. child_bi = GET_BINTERNAL(dbp, rchild, 0);
  673. nbytes = BINTERNAL_PSIZE(child_bi->len);
  674. if (P_FREESPACE(dbp, ppage) < nbytes)
  675. return (DB_NEEDSPLIT);
  676. if (space_check)
  677. return (0);
  678. /* Add a new record for the right page. */
  679. memset(&bi, 0, sizeof(bi));
  680. bi.len = child_bi->len;
  681. B_TSET(bi.type, child_bi->type, 0);
  682. bi.pgno = rchild->pgno;
  683. bi.nrecs = nrecs;
  684. memset(&hdr, 0, sizeof(hdr));
  685. hdr.data = &bi;
  686. hdr.size = SSZA(BINTERNAL, data);
  687. memset(&data, 0, sizeof(data));
  688. data.data = child_bi->data;
  689. data.size = child_bi->len;
  690. if ((ret = __db_pitem(dbc, ppage, off,
  691.     BINTERNAL_SIZE(child_bi->len), &hdr, &data)) != 0)
  692. return (ret);
  693. /* Increment the overflow ref count. */
  694. if (B_TYPE(child_bi->type) == B_OVERFLOW)
  695. if ((ret = __db_ovref(dbc,
  696.     ((BOVERFLOW *)(child_bi->data))->pgno, 1)) != 0)
  697. return (ret);
  698. break;
  699. case P_LDUP:
  700. case P_LBTREE:
  701. child_bk = GET_BKEYDATA(dbp, rchild, 0);
  702. switch (B_TYPE(child_bk->type)) {
  703. case B_KEYDATA:
  704. /*
  705.  * We set t->bt_prefix to NULL if we have a comparison
  706.  * callback but no prefix compression callback.  But,
  707.  * if we're splitting in an off-page duplicates tree,
  708.  * we still have to do some checking.  If using the
  709.  * default off-page duplicates comparison routine we
  710.  * can use the default prefix compression callback. If
  711.  * not using the default off-page duplicates comparison
  712.  * routine, we can't do any kind of prefix compression
  713.  * as there's no way for an application to specify a
  714.  * prefix compression callback that corresponds to its
  715.  * comparison callback.
  716.  */
  717. if (F_ISSET(dbc, DBC_OPD)) {
  718. if (dbp->dup_compare == __bam_defcmp)
  719. func = __bam_defpfx;
  720. else
  721. func = NULL;
  722. } else
  723. func = t->bt_prefix;
  724. nbytes = BINTERNAL_PSIZE(child_bk->len);
  725. nksize = child_bk->len;
  726. if (func == NULL)
  727. goto noprefix;
  728. if (ppage->prev_pgno == PGNO_INVALID && off <= 1)
  729. goto noprefix;
  730. tmp_bk = GET_BKEYDATA(dbp, lchild, NUM_ENT(lchild) -
  731.     (TYPE(lchild) == P_LDUP ? O_INDX : P_INDX));
  732. if (B_TYPE(tmp_bk->type) != B_KEYDATA)
  733. goto noprefix;
  734. memset(&a, 0, sizeof(a));
  735. a.size = tmp_bk->len;
  736. a.data = tmp_bk->data;
  737. memset(&b, 0, sizeof(b));
  738. b.size = child_bk->len;
  739. b.data = child_bk->data;
  740. nksize = (u_int32_t)func(dbp, &a, &b);
  741. if ((n = BINTERNAL_PSIZE(nksize)) < nbytes)
  742. nbytes = n;
  743. else
  744. noprefix: nksize = child_bk->len;
  745. if (P_FREESPACE(dbp, ppage) < nbytes)
  746. return (DB_NEEDSPLIT);
  747. if (space_check)
  748. return (0);
  749. memset(&bi, 0, sizeof(bi));
  750. bi.len = nksize;
  751. B_TSET(bi.type, child_bk->type, 0);
  752. bi.pgno = rchild->pgno;
  753. bi.nrecs = nrecs;
  754. memset(&hdr, 0, sizeof(hdr));
  755. hdr.data = &bi;
  756. hdr.size = SSZA(BINTERNAL, data);
  757. memset(&data, 0, sizeof(data));
  758. data.data = child_bk->data;
  759. data.size = nksize;
  760. if ((ret = __db_pitem(dbc, ppage, off,
  761.     BINTERNAL_SIZE(nksize), &hdr, &data)) != 0)
  762. return (ret);
  763. break;
  764. case B_DUPLICATE:
  765. case B_OVERFLOW:
  766. nbytes = BINTERNAL_PSIZE(BOVERFLOW_SIZE);
  767. if (P_FREESPACE(dbp, ppage) < nbytes)
  768. return (DB_NEEDSPLIT);
  769. if (space_check)
  770. return (0);
  771. memset(&bi, 0, sizeof(bi));
  772. bi.len = BOVERFLOW_SIZE;
  773. B_TSET(bi.type, child_bk->type, 0);
  774. bi.pgno = rchild->pgno;
  775. bi.nrecs = nrecs;
  776. memset(&hdr, 0, sizeof(hdr));
  777. hdr.data = &bi;
  778. hdr.size = SSZA(BINTERNAL, data);
  779. memset(&data, 0, sizeof(data));
  780. data.data = child_bk;
  781. data.size = BOVERFLOW_SIZE;
  782. if ((ret = __db_pitem(dbc, ppage, off,
  783.     BINTERNAL_SIZE(BOVERFLOW_SIZE), &hdr, &data)) != 0)
  784. return (ret);
  785. /* Increment the overflow ref count. */
  786. if (B_TYPE(child_bk->type) == B_OVERFLOW)
  787. if ((ret = __db_ovref(dbc,
  788.     ((BOVERFLOW *)child_bk)->pgno, 1)) != 0)
  789. return (ret);
  790. break;
  791. default:
  792. return (__db_pgfmt(dbp->dbenv, rchild->pgno));
  793. }
  794. break;
  795. case P_IRECNO:
  796. case P_LRECNO:
  797. nbytes = RINTERNAL_PSIZE;
  798. if (P_FREESPACE(dbp, ppage) < nbytes)
  799. return (DB_NEEDSPLIT);
  800. if (space_check)
  801. return (0);
  802. /* Add a new record for the right page. */
  803. memset(&hdr, 0, sizeof(hdr));
  804. hdr.data = &ri;
  805. hdr.size = RINTERNAL_SIZE;
  806. ri.pgno = rchild->pgno;
  807. ri.nrecs = nrecs;
  808. if ((ret = __db_pitem(dbc,
  809.     ppage, off, RINTERNAL_SIZE, &hdr, NULL)) != 0)
  810. return (ret);
  811. break;
  812. default:
  813. return (__db_pgfmt(dbp->dbenv, rchild->pgno));
  814. }
  815. /*
  816.  * If a Recno or Btree with record numbers AM page, or an off-page
  817.  * duplicates tree, adjust the parent page's left page record count.
  818.  */
  819. if (F_ISSET(cp, C_RECNUM)) {
  820. /* Log the change. */
  821. if (DBC_LOGGING(dbc)) {
  822. if ((ret = __bam_cadjust_log(dbp, dbc->txn,
  823.     &LSN(ppage), 0, PGNO(ppage),
  824.     &LSN(ppage), parent->indx, -(int32_t)nrecs, 0)) != 0)
  825. return (ret);
  826. } else
  827. LSN_NOT_LOGGED(LSN(ppage));
  828. /* Update the left page count. */
  829. if (dbc->dbtype == DB_RECNO)
  830. GET_RINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
  831. else
  832. GET_BINTERNAL(dbp, ppage, parent->indx)->nrecs -= nrecs;
  833. }
  834. return (0);
  835. }
  836. /*
  837.  * __bam_psplit --
  838.  * Do the real work of splitting the page.
  839.  */
  840. static int
  841. __bam_psplit(dbc, cp, lp, rp, splitret)
  842. DBC *dbc;
  843. EPG *cp;
  844. PAGE *lp, *rp;
  845. db_indx_t *splitret;
  846. {
  847. DB *dbp;
  848. PAGE *pp;
  849. db_indx_t half, *inp, nbytes, off, splitp, top;
  850. int adjust, cnt, iflag, isbigkey, ret;
  851. dbp = dbc->dbp;
  852. pp = cp->page;
  853. inp = P_INP(dbp, pp);
  854. adjust = TYPE(pp) == P_LBTREE ? P_INDX : O_INDX;
  855. /*
  856.  * If we're splitting the first (last) page on a level because we're
  857.  * inserting (appending) a key to it, it's likely that the data is
  858.  * sorted.  Moving a single item to the new page is less work and can
  859.  * push the fill factor higher than normal.  This is trivial when we
  860.  * are splitting a new page before the beginning of the tree, all of
  861.  * the interesting tests are against values of 0.
  862.  *
  863.  * Catching appends to the tree is harder.  In a simple append, we're
  864.  * inserting an item that sorts past the end of the tree; the cursor
  865.  * will point past the last element on the page.  But, in trees with
  866.  * duplicates, the cursor may point to the last entry on the page --
  867.  * in this case, the entry will also be the last element of a duplicate
  868.  * set (the last because the search call specified the S_DUPLAST flag).
  869.  * The only way to differentiate between an insert immediately before
  870.  * the last item in a tree or an append after a duplicate set which is
  871.  * also the last item in the tree is to call the comparison function.
  872.  * When splitting internal pages during an append, the search code
  873.  * guarantees the cursor always points to the largest page item less
  874.  * than the new internal entry.  To summarize, we want to catch three
  875.  * possible index values:
  876.  *
  877.  * NUM_ENT(page) Btree/Recno leaf insert past end-of-tree
  878.  * NUM_ENT(page) - O_INDX Btree or Recno internal insert past EOT
  879.  * NUM_ENT(page) - P_INDX Btree leaf insert past EOT after a set
  880.  *     of duplicates
  881.  *
  882.  * two of which, (NUM_ENT(page) - O_INDX or P_INDX) might be an insert
  883.  * near the end of the tree, and not after the end of the tree at all.
  884.  * Do a simple test which might be wrong because calling the comparison
  885.  * functions is expensive.  Regardless, it's not a big deal if we're
  886.  * wrong, we'll do the split the right way next time.
  887.  */
  888. off = 0;
  889. if (NEXT_PGNO(pp) == PGNO_INVALID && cp->indx >= NUM_ENT(pp) - adjust)
  890. off = NUM_ENT(pp) - adjust;
  891. else if (PREV_PGNO(pp) == PGNO_INVALID && cp->indx == 0)
  892. off = adjust;
  893. if (off != 0)
  894. goto sort;
  895. /*
  896.  * Split the data to the left and right pages.  Try not to split on
  897.  * an overflow key.  (Overflow keys on internal pages will slow down
  898.  * searches.)  Refuse to split in the middle of a set of duplicates.
  899.  *
  900.  * First, find the optimum place to split.
  901.  *
  902.  * It's possible to try and split past the last record on the page if
  903.  * there's a very large record at the end of the page.  Make sure this
  904.  * doesn't happen by bounding the check at the next-to-last entry on
  905.  * the page.
  906.  *
  907.  * Note, we try and split half the data present on the page.  This is
  908.  * because another process may have already split the page and left
  909.  * it half empty.  We don't try and skip the split -- we don't know
  910.  * how much space we're going to need on the page, and we may need up
  911.  * to half the page for a big item, so there's no easy test to decide
  912.  * if we need to split or not.  Besides, if two threads are inserting
  913.  * data into the same place in the database, we're probably going to
  914.  * need more space soon anyway.
  915.  */
  916. top = NUM_ENT(pp) - adjust;
  917. half = (dbp->pgsize - HOFFSET(pp)) / 2;
  918. for (nbytes = 0, off = 0; off < top && nbytes < half; ++off)
  919. switch (TYPE(pp)) {
  920. case P_IBTREE:
  921. if (B_TYPE(
  922.     GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA)
  923. nbytes += BINTERNAL_SIZE(
  924.    GET_BINTERNAL(dbp, pp, off)->len);
  925. else
  926. nbytes += BINTERNAL_SIZE(BOVERFLOW_SIZE);
  927. break;
  928. case P_LBTREE:
  929. if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
  930.     B_KEYDATA)
  931. nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
  932.     pp, off)->len);
  933. else
  934. nbytes += BOVERFLOW_SIZE;
  935. ++off;
  936. /* FALLTHROUGH */
  937. case P_LDUP:
  938. case P_LRECNO:
  939. if (B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
  940.     B_KEYDATA)
  941. nbytes += BKEYDATA_SIZE(GET_BKEYDATA(dbp,
  942.     pp, off)->len);
  943. else
  944. nbytes += BOVERFLOW_SIZE;
  945. break;
  946. case P_IRECNO:
  947. nbytes += RINTERNAL_SIZE;
  948. break;
  949. default:
  950. return (__db_pgfmt(dbp->dbenv, pp->pgno));
  951. }
  952. sort: splitp = off;
  953. /*
  954.  * Splitp is either at or just past the optimum split point.  If the
  955.  * tree type is such that we're going to promote a key to an internal
  956.  * page, and our current choice is an overflow key, look for something
  957.  * close by that's smaller.
  958.  */
  959. switch (TYPE(pp)) {
  960. case P_IBTREE:
  961. iflag = 1;
  962. isbigkey =
  963.     B_TYPE(GET_BINTERNAL(dbp, pp, off)->type) != B_KEYDATA;
  964. break;
  965. case P_LBTREE:
  966. case P_LDUP:
  967. iflag = 0;
  968. isbigkey = B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) !=
  969.     B_KEYDATA;
  970. break;
  971. default:
  972. iflag = isbigkey = 0;
  973. }
  974. if (isbigkey)
  975. for (cnt = 1; cnt <= 3; ++cnt) {
  976. off = splitp + cnt * adjust;
  977. if (off < (db_indx_t)NUM_ENT(pp) &&
  978.     ((iflag && B_TYPE(
  979.     GET_BINTERNAL(dbp, pp,off)->type) == B_KEYDATA) ||
  980.     B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
  981.     B_KEYDATA)) {
  982. splitp = off;
  983. break;
  984. }
  985. if (splitp <= (db_indx_t)(cnt * adjust))
  986. continue;
  987. off = splitp - cnt * adjust;
  988. if (iflag ? B_TYPE(
  989.     GET_BINTERNAL(dbp, pp, off)->type) == B_KEYDATA :
  990.     B_TYPE(GET_BKEYDATA(dbp, pp, off)->type) ==
  991.     B_KEYDATA) {
  992. splitp = off;
  993. break;
  994. }
  995. }
  996. /*
  997.  * We can't split in the middle a set of duplicates.  We know that
  998.  * no duplicate set can take up more than about 25% of the page,
  999.  * because that's the point where we push it off onto a duplicate
  1000.  * page set.  So, this loop can't be unbounded.
  1001.  */
  1002. if (TYPE(pp) == P_LBTREE &&
  1003.     inp[splitp] == inp[splitp - adjust])
  1004. for (cnt = 1;; ++cnt) {
  1005. off = splitp + cnt * adjust;
  1006. if (off < NUM_ENT(pp) &&
  1007.     inp[splitp] != inp[off]) {
  1008. splitp = off;
  1009. break;
  1010. }
  1011. if (splitp <= (db_indx_t)(cnt * adjust))
  1012. continue;
  1013. off = splitp - cnt * adjust;
  1014. if (inp[splitp] != inp[off]) {
  1015. splitp = off + adjust;
  1016. break;
  1017. }
  1018. }
  1019. /* We're going to split at splitp. */
  1020. if ((ret = __bam_copy(dbp, pp, lp, 0, splitp)) != 0)
  1021. return (ret);
  1022. if ((ret = __bam_copy(dbp, pp, rp, splitp, NUM_ENT(pp))) != 0)
  1023. return (ret);
  1024. *splitret = splitp;
  1025. return (0);
  1026. }
  1027. /*
  1028.  * __bam_copy --
  1029.  * Copy a set of records from one page to another.
  1030.  *
  1031.  * PUBLIC: int __bam_copy __P((DB *, PAGE *, PAGE *, u_int32_t, u_int32_t));
  1032.  */
  1033. int
  1034. __bam_copy(dbp, pp, cp, nxt, stop)
  1035. DB *dbp;
  1036. PAGE *pp, *cp;
  1037. u_int32_t nxt, stop;
  1038. {
  1039. db_indx_t *cinp, nbytes, off, *pinp;
  1040. cinp = P_INP(dbp, cp);
  1041. pinp = P_INP(dbp, pp);
  1042. /*
  1043.  * Nxt is the offset of the next record to be placed on the target page.
  1044.  */
  1045. for (off = 0; nxt < stop; ++nxt, ++NUM_ENT(cp), ++off) {
  1046. switch (TYPE(pp)) {
  1047. case P_IBTREE:
  1048. if (B_TYPE(
  1049.     GET_BINTERNAL(dbp, pp, nxt)->type) == B_KEYDATA)
  1050. nbytes = BINTERNAL_SIZE(
  1051.     GET_BINTERNAL(dbp, pp, nxt)->len);
  1052. else
  1053. nbytes = BINTERNAL_SIZE(BOVERFLOW_SIZE);
  1054. break;
  1055. case P_LBTREE:
  1056. /*
  1057.  * If we're on a key and it's a duplicate, just copy
  1058.  * the offset.
  1059.  */
  1060. if (off != 0 && (nxt % P_INDX) == 0 &&
  1061.     pinp[nxt] == pinp[nxt - P_INDX]) {
  1062. cinp[off] = cinp[off - P_INDX];
  1063. continue;
  1064. }
  1065. /* FALLTHROUGH */
  1066. case P_LDUP:
  1067. case P_LRECNO:
  1068. if (B_TYPE(GET_BKEYDATA(dbp, pp, nxt)->type) ==
  1069.     B_KEYDATA)
  1070. nbytes = BKEYDATA_SIZE(GET_BKEYDATA(dbp,
  1071.     pp, nxt)->len);
  1072. else
  1073. nbytes = BOVERFLOW_SIZE;
  1074. break;
  1075. case P_IRECNO:
  1076. nbytes = RINTERNAL_SIZE;
  1077. break;
  1078. default:
  1079. return (__db_pgfmt(dbp->dbenv, pp->pgno));
  1080. }
  1081. cinp[off] = HOFFSET(cp) -= nbytes;
  1082. memcpy(P_ENTRY(dbp, cp, off), P_ENTRY(dbp, pp, nxt), nbytes);
  1083. }
  1084. return (0);
  1085. }