nbtinsert.c
上传用户:blenddy
上传日期:2007-01-07
资源大小:6495k
文件大小:49k
源码类别:

数据库系统

开发平台:

Unix_Linux

  1. /*-------------------------------------------------------------------------
  2.  *
  3.  * btinsert.c
  4.  *   Item insertion in Lehman and Yao btrees for Postgres.
  5.  *
  6.  * Copyright (c) 1994, Regents of the University of California
  7.  *
  8.  *
  9.  * IDENTIFICATION
  10.  *   $Header: /usr/local/cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.42.2.2 1999/09/01 17:54:00 scrappy Exp $
  11.  *
  12.  *-------------------------------------------------------------------------
  13.  */
  14. #include "postgres.h"
  15. #include "access/heapam.h"
  16. #include "access/nbtree.h"
  17. static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem);
  18. static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright);
  19. static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit);
  20. static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
  21. static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
  22. static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
  23. static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem);
  24. static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);
  25. /*
  26.  * _bt_doinsert() -- Handle insertion of a single btitem in the tree.
  27.  *
  28.  * This routine is called by the public interface routines, btbuild
  29.  * and btinsert.  By here, btitem is filled in, and has a unique
  30.  * (xid, seqno) pair.
  31.  */
  32. InsertIndexResult
  33. _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel)
  34. {
  35. ScanKey itup_scankey;
  36. IndexTuple itup;
  37. BTStack stack;
  38. Buffer buf;
  39. BlockNumber blkno;
  40. int natts = rel->rd_rel->relnatts;
  41. InsertIndexResult res;
  42. Buffer buffer;
  43. itup = &(btitem->bti_itup);
  44. /* we need a scan key to do our search, so build one */
  45. itup_scankey = _bt_mkscankey(rel, itup);
  46. /* find the page containing this key */
  47. stack = _bt_search(rel, natts, itup_scankey, &buf);
  48. /* trade in our read lock for a write lock */
  49. LockBuffer(buf, BUFFER_LOCK_UNLOCK);
  50. LockBuffer(buf, BT_WRITE);
  51. l1:
  52. /*
  53.  * If the page was split between the time that we surrendered our read
  54.  * lock and acquired our write lock, then this page may no longer be
  55.  * the right place for the key we want to insert.  In this case, we
  56.  * need to move right in the tree. See Lehman and Yao for an
  57.  * excruciatingly precise description.
  58.  */
  59. buf = _bt_moveright(rel, buf, natts, itup_scankey, BT_WRITE);
  60. blkno = BufferGetBlockNumber(buf);
  61. /* if we're not allowing duplicates, make sure the key isn't */
  62. /* already in the node */
  63. if (index_is_unique)
  64. {
  65. OffsetNumber offset,
  66. maxoff;
  67. Page page;
  68. page = BufferGetPage(buf);
  69. maxoff = PageGetMaxOffsetNumber(page);
  70. offset = _bt_binsrch(rel, buf, natts, itup_scankey, BT_DESCENT);
  71. /* make sure the offset we're given points to an actual */
  72. /* key on the page before trying to compare it */
  73. if (!PageIsEmpty(page) && offset <= maxoff)
  74. {
  75. TupleDesc itupdesc;
  76. BTItem cbti;
  77. HeapTupleData htup;
  78. BTPageOpaque opaque;
  79. Buffer nbuf;
  80. BlockNumber nblkno;
  81. bool chtup = true;
  82. itupdesc = RelationGetDescr(rel);
  83. nbuf = InvalidBuffer;
  84. opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  85. /*
  86.  * _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
  87.  * how we handling NULLs - and so we must not use _bt_compare
  88.  * in real comparison, but only for ordering/finding items on
  89.  * pages. - vadim 03/24/97
  90.  *
  91.  * while ( !_bt_compare (rel, itupdesc, page, natts,
  92.  * itup_scankey, offset) )
  93.  */
  94. while (_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
  95. { /* they're equal */
  96. /*
  97.  * Have to check is inserted heap tuple deleted one (i.e.
  98.  * just moved to another place by vacuum)!
  99.  */
  100. if (chtup)
  101. {
  102. htup.t_self = btitem->bti_itup.t_tid;
  103. heap_fetch(heapRel, SnapshotDirty, &htup, &buffer);
  104. if (htup.t_data == NULL) /* YES! */
  105. break;
  106. /* Live tuple was inserted */
  107. ReleaseBuffer(buffer);
  108. chtup = false;
  109. }
  110. cbti = (BTItem) PageGetItem(page, PageGetItemId(page, offset));
  111. htup.t_self = cbti->bti_itup.t_tid;
  112. heap_fetch(heapRel, SnapshotDirty, &htup, &buffer);
  113. if (htup.t_data != NULL) /* it is a duplicate */
  114. {
  115. TransactionId xwait =
  116. (TransactionIdIsValid(SnapshotDirty->xmin)) ?
  117. SnapshotDirty->xmin : SnapshotDirty->xmax;
  118. /*
  119.  * If this tuple is being updated by other transaction
  120.  * then we have to wait for its commit/abort.
  121.  */
  122. ReleaseBuffer(buffer);
  123. if (TransactionIdIsValid(xwait))
  124. {
  125. if (nbuf != InvalidBuffer)
  126. _bt_relbuf(rel, nbuf, BT_READ);
  127. _bt_relbuf(rel, buf, BT_WRITE);
  128. XactLockTableWait(xwait);
  129. buf = _bt_getbuf(rel, blkno, BT_WRITE);
  130. goto l1;/* continue from the begin */
  131. }
  132. elog(ERROR, "Cannot insert a duplicate key into a unique index");
  133. }
  134. /* htup null so no buffer to release */
  135. /* get next offnum */
  136. if (offset < maxoff)
  137. offset = OffsetNumberNext(offset);
  138. else
  139. { /* move right ? */
  140. if (P_RIGHTMOST(opaque))
  141. break;
  142. if (!_bt_isequal(itupdesc, page, P_HIKEY,
  143.  natts, itup_scankey))
  144. break;
  145. /*
  146.  * min key of the right page is the same, ooh - so
  147.  * many dead duplicates...
  148.  */
  149. nblkno = opaque->btpo_next;
  150. if (nbuf != InvalidBuffer)
  151. _bt_relbuf(rel, nbuf, BT_READ);
  152. for (nbuf = InvalidBuffer;;)
  153. {
  154. nbuf = _bt_getbuf(rel, nblkno, BT_READ);
  155. page = BufferGetPage(nbuf);
  156. maxoff = PageGetMaxOffsetNumber(page);
  157. opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  158. offset = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY;
  159. if (!PageIsEmpty(page) && offset <= maxoff)
  160. { /* Found some key */
  161. break;
  162. }
  163. else
  164. { /* Empty or "pseudo"-empty page - get next */
  165. nblkno = opaque->btpo_next;
  166. _bt_relbuf(rel, nbuf, BT_READ);
  167. nbuf = InvalidBuffer;
  168. if (nblkno == P_NONE)
  169. break;
  170. }
  171. }
  172. if (nbuf == InvalidBuffer)
  173. break;
  174. }
  175. }
  176. if (nbuf != InvalidBuffer)
  177. _bt_relbuf(rel, nbuf, BT_READ);
  178. }
  179. }
  180. /* do the insertion */
  181. res = _bt_insertonpg(rel, buf, stack, natts, itup_scankey,
  182.  btitem, (BTItem) NULL);
  183. /* be tidy */
  184. _bt_freestack(stack);
  185. _bt_freeskey(itup_scankey);
  186. return res;
  187. }
  188. /*
  189.  * _bt_insertonpg() -- Insert a tuple on a particular page in the index.
  190.  *
  191.  * This recursive procedure does the following things:
  192.  *
  193.  * +  if necessary, splits the target page.
  194.  * +  finds the right place to insert the tuple (taking into
  195.  *    account any changes induced by a split).
  196.  * +  inserts the tuple.
  197.  * +  if the page was split, pops the parent stack, and finds the
  198.  *    right place to insert the new child pointer (by walking
  199.  *    right using information stored in the parent stack).
  200.  * +  invoking itself with the appropriate tuple for the right
  201.  *    child page on the parent.
  202.  *
  203.  * On entry, we must have the right buffer on which to do the
  204.  * insertion, and the buffer must be pinned and locked.  On return,
  205.  * we will have dropped both the pin and the write lock on the buffer.
  206.  *
  207.  * The locking interactions in this code are critical.  You should
  208.  * grok Lehman and Yao's paper before making any changes.  In addition,
  209.  * you need to understand how we disambiguate duplicate keys in this
  210.  * implementation, in order to be able to find our location using
  211.  * L&Y "move right" operations.  Since we may insert duplicate user
  212.  * keys, and since these dups may propogate up the tree, we use the
  213.  * 'afteritem' parameter to position ourselves correctly for the
  214.  * insertion on internal pages.
  215.  */
  216. static InsertIndexResult
  217. _bt_insertonpg(Relation rel,
  218.    Buffer buf,
  219.    BTStack stack,
  220.    int keysz,
  221.    ScanKey scankey,
  222.    BTItem btitem,
  223.    BTItem afteritem)
  224. {
  225. InsertIndexResult res;
  226. Page page;
  227. BTPageOpaque lpageop;
  228. BlockNumber itup_blkno;
  229. OffsetNumber itup_off;
  230. OffsetNumber firstright = InvalidOffsetNumber;
  231. int itemsz;
  232. bool do_split = false;
  233. bool keys_equal = false;
  234. page = BufferGetPage(buf);
  235. lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
  236. itemsz = IndexTupleDSize(btitem->bti_itup)
  237. + (sizeof(BTItemData) - sizeof(IndexTupleData));
  238. itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do
  239.  * this but we need to be
  240.  * consistent */
  241. /*
  242.  * If we have to insert item on the leftmost page which is the first
  243.  * page in the chain of duplicates then: 1. if scankey == hikey (i.e.
  244.  * - new duplicate item) then insert it here; 2. if scankey < hikey
  245.  * then: 2.a if there is duplicate key(s) here - we force splitting;
  246.  * 2.b else - we may "eat" this page from duplicates chain.
  247.  */
  248. if (lpageop->btpo_flags & BTP_CHAIN)
  249. {
  250. OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
  251. ItemId hitemid;
  252. BTItem hitem;
  253. Assert(!P_RIGHTMOST(lpageop));
  254. hitemid = PageGetItemId(page, P_HIKEY);
  255. hitem = (BTItem) PageGetItem(page, hitemid);
  256. if (maxoff > P_HIKEY &&
  257. !_bt_itemcmp(rel, keysz, hitem,
  258.  (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)),
  259.  BTEqualStrategyNumber))
  260. elog(FATAL, "btree: bad key on the page in the chain of duplicates");
  261. if (!_bt_skeycmp(rel, keysz, scankey, page, hitemid,
  262.  BTEqualStrategyNumber))
  263. {
  264. if (!P_LEFTMOST(lpageop))
  265. elog(FATAL, "btree: attempt to insert bad key on the non-leftmost page in the chain of duplicates");
  266. if (!_bt_skeycmp(rel, keysz, scankey, page, hitemid,
  267.  BTLessStrategyNumber))
  268. elog(FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates");
  269. if (maxoff > P_HIKEY) /* have duplicate(s) */
  270. {
  271. firstright = P_FIRSTKEY;
  272. do_split = true;
  273. }
  274. else
  275. /* "eat" page */
  276. {
  277. Buffer pbuf;
  278. Page ppage;
  279. itup_blkno = BufferGetBlockNumber(buf);
  280. itup_off = PageAddItem(page, (Item) btitem, itemsz,
  281.    P_FIRSTKEY, LP_USED);
  282. if (itup_off == InvalidOffsetNumber)
  283. elog(FATAL, "btree: failed to add item");
  284. lpageop->btpo_flags &= ~BTP_CHAIN;
  285. pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
  286. ppage = BufferGetPage(pbuf);
  287. PageIndexTupleDelete(ppage, stack->bts_offset);
  288. pfree(stack->bts_btitem);
  289. stack->bts_btitem = _bt_formitem(&(btitem->bti_itup));
  290. ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
  291.    itup_blkno, P_HIKEY);
  292. _bt_wrtbuf(rel, buf);
  293. res = _bt_insertonpg(rel, pbuf, stack->bts_parent,
  294.  keysz, scankey, stack->bts_btitem,
  295.  NULL);
  296. ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
  297. return res;
  298. }
  299. }
  300. else
  301. {
  302. keys_equal = true;
  303. if (PageGetFreeSpace(page) < itemsz)
  304. do_split = true;
  305. }
  306. }
  307. else if (PageGetFreeSpace(page) < itemsz)
  308. do_split = true;
  309. else if (PageGetFreeSpace(page) < 3 * itemsz + 2 * sizeof(ItemIdData))
  310. {
  311. OffsetNumber offnum = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY;
  312. OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
  313. ItemId itid;
  314. BTItem previtem,
  315. chkitem;
  316. Size maxsize;
  317. Size currsize;
  318. itid = PageGetItemId(page, offnum);
  319. previtem = (BTItem) PageGetItem(page, itid);
  320. maxsize = currsize = (ItemIdGetLength(itid) + sizeof(ItemIdData));
  321. for (offnum = OffsetNumberNext(offnum);
  322.  offnum <= maxoff; offnum = OffsetNumberNext(offnum))
  323. {
  324. itid = PageGetItemId(page, offnum);
  325. chkitem = (BTItem) PageGetItem(page, itid);
  326. if (!_bt_itemcmp(rel, keysz, previtem, chkitem,
  327.  BTEqualStrategyNumber))
  328. {
  329. if (currsize > maxsize)
  330. maxsize = currsize;
  331. currsize = 0;
  332. previtem = chkitem;
  333. }
  334. currsize += (ItemIdGetLength(itid) + sizeof(ItemIdData));
  335. }
  336. if (currsize > maxsize)
  337. maxsize = currsize;
  338. maxsize += sizeof(PageHeaderData) +
  339. MAXALIGN(sizeof(BTPageOpaqueData));
  340. if (maxsize >= PageGetPageSize(page) / 2)
  341. do_split = true;
  342. }
  343. if (do_split)
  344. {
  345. Buffer rbuf;
  346. Page rpage;
  347. BTItem ritem;
  348. BlockNumber rbknum;
  349. BTPageOpaque rpageop;
  350. Buffer pbuf;
  351. Page ppage;
  352. BTPageOpaque ppageop;
  353. BlockNumber bknum = BufferGetBlockNumber(buf);
  354. BTItem lowLeftItem;
  355. OffsetNumber maxoff;
  356. bool shifted = false;
  357. bool left_chained = (lpageop->btpo_flags & BTP_CHAIN) ? true : false;
  358. bool is_root = lpageop->btpo_flags & BTP_ROOT;
  359. /*
  360.  * Instead of splitting leaf page in the chain of duplicates 
  361.  * by new duplicate, insert it into some right page.
  362.  */
  363. if ((lpageop->btpo_flags & BTP_CHAIN) &&
  364. (lpageop->btpo_flags & BTP_LEAF) && keys_equal)
  365. {
  366. rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
  367. rpage = BufferGetPage(rbuf);
  368. rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
  369. /* 
  370.  * some checks 
  371.  */
  372. if (!P_RIGHTMOST(rpageop)) /* non-rightmost page */
  373. { /* If we have the same hikey here then
  374.  * it's yet another page in chain. */
  375. if (_bt_skeycmp(rel, keysz, scankey, rpage,
  376. PageGetItemId(rpage, P_HIKEY),
  377. BTEqualStrategyNumber))
  378. {
  379. if (!(rpageop->btpo_flags & BTP_CHAIN))
  380. elog(FATAL, "btree: lost page in the chain of duplicates");
  381. }
  382. else if (_bt_skeycmp(rel, keysz, scankey, rpage,
  383.  PageGetItemId(rpage, P_HIKEY),
  384.  BTGreaterStrategyNumber))
  385. elog(FATAL, "btree: hikey is out of order");
  386. else if (rpageop->btpo_flags & BTP_CHAIN)
  387. /*
  388.  * If hikey > scankey then it's last page in chain and
  389.  * BTP_CHAIN must be OFF
  390.  */
  391. elog(FATAL, "btree: lost last page in the chain of duplicates");
  392. }
  393. else
  394. /* rightmost page */
  395. {
  396. Assert(!(rpageop->btpo_flags & BTP_CHAIN));
  397. }
  398. _bt_relbuf(rel, buf, BT_WRITE);
  399. return (_bt_insertonpg(rel, rbuf, stack, keysz,
  400.    scankey, btitem, afteritem));
  401. }
  402. /*
  403.  * If after splitting un-chained page we'll got chain of pages
  404.  * with duplicates then we want to know 1. on which of two pages
  405.  * new btitem will go (current _bt_findsplitloc is quite bad); 2.
  406.  * what parent (if there's one) thinking about it (remember about
  407.  * deletions)
  408.  */
  409. else if (!(lpageop->btpo_flags & BTP_CHAIN))
  410. {
  411. OffsetNumber start = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY;
  412. Size llimit;
  413. maxoff = PageGetMaxOffsetNumber(page);
  414. llimit = PageGetPageSize(page) - sizeof(PageHeaderData) -
  415. MAXALIGN(sizeof(BTPageOpaqueData))
  416. +sizeof(ItemIdData);
  417. llimit /= 2;
  418. firstright = _bt_findsplitloc(rel, page, start, maxoff, llimit);
  419. if (_bt_itemcmp(rel, keysz,
  420.   (BTItem) PageGetItem(page, PageGetItemId(page, start)),
  421.  (BTItem) PageGetItem(page, PageGetItemId(page, firstright)),
  422. BTEqualStrategyNumber))
  423. {
  424. if (_bt_skeycmp(rel, keysz, scankey, page,
  425. PageGetItemId(page, firstright),
  426. BTLessStrategyNumber))
  427. /*
  428.  * force moving current items to the new page: new
  429.  * item will go on the current page.
  430.  */
  431. firstright = start;
  432. else
  433. /*
  434.  * new btitem >= firstright, start item == firstright
  435.  * - new chain of duplicates: if this non-leftmost
  436.  * leaf page and parent item < start item then force
  437.  * moving all items to the new page - current page
  438.  * will be "empty" after it.
  439.  */
  440. {
  441. if (!P_LEFTMOST(lpageop) &&
  442. (lpageop->btpo_flags & BTP_LEAF))
  443. {
  444. ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
  445.    bknum, P_HIKEY);
  446. pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
  447. if (_bt_itemcmp(rel, keysz, stack->bts_btitem,
  448. (BTItem) PageGetItem(page,
  449.  PageGetItemId(page, start)),
  450. BTLessStrategyNumber))
  451. {
  452. firstright = start;
  453. shifted = true;
  454. }
  455. _bt_relbuf(rel, pbuf, BT_WRITE);
  456. }
  457. }
  458. } /* else - no new chain if start item <
  459.  * firstright one */
  460. }
  461. /* split the buffer into left and right halves */
  462. rbuf = _bt_split(rel, buf, firstright);
  463. /* which new page (left half or right half) gets the tuple? */
  464. if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem))
  465. {
  466. /* left page */
  467. itup_off = _bt_pgaddtup(rel, buf, keysz, scankey,
  468. itemsz, btitem, afteritem);
  469. itup_blkno = BufferGetBlockNumber(buf);
  470. }
  471. else
  472. {
  473. /* right page */
  474. itup_off = _bt_pgaddtup(rel, rbuf, keysz, scankey,
  475. itemsz, btitem, afteritem);
  476. itup_blkno = BufferGetBlockNumber(rbuf);
  477. }
  478. maxoff = PageGetMaxOffsetNumber(page);
  479. if (shifted)
  480. {
  481. if (maxoff > P_FIRSTKEY)
  482. elog(FATAL, "btree: shifted page is not empty");
  483. lowLeftItem = (BTItem) NULL;
  484. }
  485. else
  486. {
  487. if (maxoff < P_FIRSTKEY)
  488. elog(FATAL, "btree: un-shifted page is empty");
  489. lowLeftItem = (BTItem) PageGetItem(page,
  490. PageGetItemId(page, P_FIRSTKEY));
  491. if (_bt_itemcmp(rel, keysz, lowLeftItem,
  492. (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)),
  493. BTEqualStrategyNumber))
  494. lpageop->btpo_flags |= BTP_CHAIN;
  495. }
  496. /*
  497.  * By here,
  498.  *
  499.  * +  our target page has been split; +  the original tuple has been
  500.  * inserted; + we have write locks on both the old (left half)
  501.  * and new (right half) buffers, after the split; and +  we have
  502.  * the key we want to insert into the parent.
  503.  *
  504.  * Do the parent insertion.  We need to hold onto the locks for the
  505.  * child pages until we locate the parent, but we can release them
  506.  * before doing the actual insertion (see Lehman and Yao for the
  507.  * reasoning).
  508.  */
  509. l_spl: ;
  510. if (stack == (BTStack) NULL)
  511. {
  512. if (!is_root) /* if this page was not root page */
  513. {
  514. elog(DEBUG, "btree: concurrent ROOT page split");
  515. stack = (BTStack) palloc(sizeof(BTStackData));
  516. stack->bts_blkno = lpageop->btpo_parent;
  517. stack->bts_offset = InvalidOffsetNumber;
  518. stack->bts_btitem = (BTItem) palloc(sizeof(BTItemData));
  519. /* bts_btitem will be initialized below */
  520. stack->bts_parent = NULL;
  521. goto l_spl;
  522. }
  523. /* create a new root node and release the split buffers */
  524. _bt_newroot(rel, buf, rbuf);
  525. }
  526. else
  527. {
  528. ScanKey newskey;
  529. InsertIndexResult newres;
  530. BTItem new_item;
  531. OffsetNumber upditem_offset = P_HIKEY;
  532. bool do_update = false;
  533. bool update_in_place = true;
  534. bool parent_chained;
  535. /* form a index tuple that points at the new right page */
  536. rbknum = BufferGetBlockNumber(rbuf);
  537. rpage = BufferGetPage(rbuf);
  538. rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
  539. /*
  540.  * By convention, the first entry (1) on every non-rightmost
  541.  * page is the high key for that page. In order to get the
  542.  * lowest key on the new right page, we actually look at its
  543.  * second (2) entry.
  544.  */
  545. if (!P_RIGHTMOST(rpageop))
  546. {
  547. ritem = (BTItem) PageGetItem(rpage,
  548.    PageGetItemId(rpage, P_FIRSTKEY));
  549. if (_bt_itemcmp(rel, keysz, ritem,
  550. (BTItem) PageGetItem(rpage,
  551.   PageGetItemId(rpage, P_HIKEY)),
  552. BTEqualStrategyNumber))
  553. rpageop->btpo_flags |= BTP_CHAIN;
  554. }
  555. else
  556. ritem = (BTItem) PageGetItem(rpage,
  557.   PageGetItemId(rpage, P_HIKEY));
  558. /* get a unique btitem for this key */
  559. new_item = _bt_formitem(&(ritem->bti_itup));
  560. ItemPointerSet(&(new_item->bti_itup.t_tid), rbknum, P_HIKEY);
  561. /*
  562.  * Find the parent buffer and get the parent page.
  563.  *
  564.  * Oops - if we were moved right then we need to change stack
  565.  * item! We want to find parent pointing to where we are,
  566.  * right ?   - vadim 05/27/97
  567.  */
  568. ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
  569.    bknum, P_HIKEY);
  570. pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
  571. ppage = BufferGetPage(pbuf);
  572. ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
  573. parent_chained = ((ppageop->btpo_flags & BTP_CHAIN)) ? true : false;
  574. if (parent_chained && !left_chained)
  575. elog(FATAL, "nbtree: unexpected chained parent of unchained page");
  576. /*
  577.  * If the key of new_item is < than the key of the item in the
  578.  * parent page pointing to the left page (stack->bts_btitem),
  579.  * we have to update the latter key; otherwise the keys on the
  580.  * parent page wouldn't be monotonically increasing after we
  581.  * inserted the new pointer to the right page (new_item). This
  582.  * only happens if our left page is the leftmost page and a
  583.  * new minimum key had been inserted before, which is not
  584.  * reflected in the parent page but didn't matter so far. If
  585.  * there are duplicate keys and this new minimum key spills
  586.  * over to our new right page, we get an inconsistency if we
  587.  * don't update the left key in the parent page.
  588.  *
  589.  * Also, new duplicates handling code require us to update parent
  590.  * item if some smaller items left on the left page (which is
  591.  * possible in splitting leftmost page) and current parent
  592.  * item == new_item. - vadim 05/27/97
  593.  */
  594. if (_bt_itemcmp(rel, keysz, stack->bts_btitem, new_item,
  595. BTGreaterStrategyNumber) ||
  596. (!shifted &&
  597.  _bt_itemcmp(rel, keysz, stack->bts_btitem,
  598.  new_item, BTEqualStrategyNumber) &&
  599.  _bt_itemcmp(rel, keysz, lowLeftItem,
  600.  new_item, BTLessStrategyNumber)))
  601. {
  602. do_update = true;
  603. /*
  604.  * figure out which key is leftmost (if the parent page is
  605.  * rightmost, too, it must be the root)
  606.  */
  607. if (P_RIGHTMOST(ppageop))
  608. upditem_offset = P_HIKEY;
  609. else
  610. upditem_offset = P_FIRSTKEY;
  611. if (!P_LEFTMOST(lpageop) ||
  612. stack->bts_offset != upditem_offset)
  613. elog(FATAL, "btree: items are out of order (leftmost %d, stack %u, update %u)",
  614.  P_LEFTMOST(lpageop), stack->bts_offset, upditem_offset);
  615. }
  616. if (do_update)
  617. {
  618. if (shifted)
  619. elog(FATAL, "btree: attempt to update parent for shifted page");
  620. /*
  621.  * Try to update in place. If out parent page is chained
  622.  * then we must forse insertion.
  623.  */
  624. if (!parent_chained &&
  625. MAXALIGN(IndexTupleDSize(lowLeftItem->bti_itup)) ==
  626. MAXALIGN(IndexTupleDSize(stack->bts_btitem->bti_itup)))
  627. {
  628. _bt_updateitem(rel, keysz, pbuf,
  629.    stack->bts_btitem, lowLeftItem);
  630. _bt_wrtbuf(rel, buf);
  631. _bt_wrtbuf(rel, rbuf);
  632. }
  633. else
  634. {
  635. update_in_place = false;
  636. PageIndexTupleDelete(ppage, upditem_offset);
  637. /*
  638.  * don't write anything out yet--we still have the
  639.  * write lock, and now we call another _bt_insertonpg
  640.  * to insert the correct key. First, make a new item,
  641.  * using the tuple data from lowLeftItem. Point it to
  642.  * the left child. Update it on the stack at the same
  643.  * time.
  644.  */
  645. pfree(stack->bts_btitem);
  646. stack->bts_btitem = _bt_formitem(&(lowLeftItem->bti_itup));
  647. ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
  648.    bknum, P_HIKEY);
  649. /*
  650.  * Unlock the children before doing this
  651.  */
  652. _bt_wrtbuf(rel, buf);
  653. _bt_wrtbuf(rel, rbuf);
  654. /*
  655.  * A regular _bt_binsrch should find the right place
  656.  * to put the new entry, since it should be lower than
  657.  * any other key on the page. Therefore set afteritem
  658.  * to NULL.
  659.  */
  660. newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup));
  661. newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
  662.    keysz, newskey, stack->bts_btitem,
  663. NULL);
  664. pfree(newres);
  665. pfree(newskey);
  666. /*
  667.  * we have now lost our lock on the parent buffer, and
  668.  * need to get it back.
  669.  */
  670. pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
  671. }
  672. }
  673. else
  674. {
  675. _bt_wrtbuf(rel, buf);
  676. _bt_wrtbuf(rel, rbuf);
  677. }
  678. newskey = _bt_mkscankey(rel, &(new_item->bti_itup));
  679. afteritem = stack->bts_btitem;
  680. if (parent_chained && !update_in_place)
  681. {
  682. ppage = BufferGetPage(pbuf);
  683. ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
  684. if (ppageop->btpo_flags & BTP_CHAIN)
  685. elog(FATAL, "btree: unexpected BTP_CHAIN flag in parent after update");
  686. if (P_RIGHTMOST(ppageop))
  687. elog(FATAL, "btree: chained parent is RIGHTMOST after update");
  688. maxoff = PageGetMaxOffsetNumber(ppage);
  689. if (maxoff != P_FIRSTKEY)
  690. elog(FATAL, "btree: FIRSTKEY was unexpected in parent after update");
  691. if (_bt_skeycmp(rel, keysz, newskey, ppage,
  692. PageGetItemId(ppage, P_FIRSTKEY),
  693. BTLessEqualStrategyNumber))
  694. elog(FATAL, "btree: parent FIRSTKEY is >= duplicate key after update");
  695. if (!_bt_skeycmp(rel, keysz, newskey, ppage,
  696.  PageGetItemId(ppage, P_HIKEY),
  697.  BTEqualStrategyNumber))
  698. elog(FATAL, "btree: parent HIGHKEY is not equal duplicate key after update");
  699. afteritem = (BTItem) NULL;
  700. }
  701. else if (left_chained && !update_in_place)
  702. {
  703. ppage = BufferGetPage(pbuf);
  704. ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
  705. if (!P_RIGHTMOST(ppageop) &&
  706. _bt_skeycmp(rel, keysz, newskey, ppage,
  707. PageGetItemId(ppage, P_HIKEY),
  708. BTGreaterStrategyNumber))
  709. afteritem = (BTItem) NULL;
  710. }
  711. if (afteritem == (BTItem) NULL)
  712. {
  713. rbuf = _bt_getbuf(rel, ppageop->btpo_next, BT_WRITE);
  714. _bt_relbuf(rel, pbuf, BT_WRITE);
  715. pbuf = rbuf;
  716. }
  717. newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
  718. keysz, newskey, new_item,
  719. afteritem);
  720. /* be tidy */
  721. pfree(newres);
  722. pfree(newskey);
  723. pfree(new_item);
  724. }
  725. }
  726. else
  727. {
  728. itup_off = _bt_pgaddtup(rel, buf, keysz, scankey,
  729. itemsz, btitem, afteritem);
  730. itup_blkno = BufferGetBlockNumber(buf);
  731. _bt_relbuf(rel, buf, BT_WRITE);
  732. }
  733. /* by here, the new tuple is inserted */
  734. res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
  735. ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
  736. return res;
  737. }
  738. /*
  739.  * _bt_split() -- split a page in the btree.
  740.  *
  741.  * On entry, buf is the page to split, and is write-locked and pinned.
  742.  * Returns the new right sibling of buf, pinned and write-locked. The
  743.  * pin and lock on buf are maintained.
  744.  */
  745. static Buffer
  746. _bt_split(Relation rel, Buffer buf, OffsetNumber firstright)
  747. {
  748. Buffer rbuf;
  749. Page origpage;
  750. Page leftpage,
  751. rightpage;
  752. BTPageOpaque ropaque,
  753. lopaque,
  754. oopaque;
  755. Buffer sbuf;
  756. Page spage;
  757. BTPageOpaque sopaque;
  758. Size itemsz;
  759. ItemId itemid;
  760. BTItem item;
  761. OffsetNumber leftoff,
  762. rightoff;
  763. OffsetNumber start;
  764. OffsetNumber maxoff;
  765. OffsetNumber i;
  766. rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
  767. origpage = BufferGetPage(buf);
  768. leftpage = PageGetTempPage(origpage, sizeof(BTPageOpaqueData));
  769. rightpage = BufferGetPage(rbuf);
  770. _bt_pageinit(rightpage, BufferGetPageSize(rbuf));
  771. _bt_pageinit(leftpage, BufferGetPageSize(buf));
  772. /* init btree private data */
  773. oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
  774. lopaque = (BTPageOpaque) PageGetSpecialPointer(leftpage);
  775. ropaque = (BTPageOpaque) PageGetSpecialPointer(rightpage);
  776. /* if we're splitting this page, it won't be the root when we're done */
  777. oopaque->btpo_flags &= ~BTP_ROOT;
  778. oopaque->btpo_flags &= ~BTP_CHAIN;
  779. lopaque->btpo_flags = ropaque->btpo_flags = oopaque->btpo_flags;
  780. lopaque->btpo_prev = oopaque->btpo_prev;
  781. ropaque->btpo_prev = BufferGetBlockNumber(buf);
  782. lopaque->btpo_next = BufferGetBlockNumber(rbuf);
  783. ropaque->btpo_next = oopaque->btpo_next;
  784. lopaque->btpo_parent = ropaque->btpo_parent = oopaque->btpo_parent;
  785. /*
  786.  * If the page we're splitting is not the rightmost page at its level
  787.  * in the tree, then the first (0) entry on the page is the high key
  788.  * for the page.  We need to copy that to the right half.  Otherwise
  789.  * (meaning the rightmost page case), we should treat the line
  790.  * pointers beginning at zero as user data.
  791.  *
  792.  * We leave a blank space at the start of the line table for the left
  793.  * page.  We'll come back later and fill it in with the high key item
  794.  * we get from the right key.
  795.  */
  796. leftoff = P_FIRSTKEY;
  797. ropaque->btpo_next = oopaque->btpo_next;
  798. if (!P_RIGHTMOST(oopaque))
  799. {
  800. /* splitting a non-rightmost page, start at the first data item */
  801. start = P_FIRSTKEY;
  802. itemid = PageGetItemId(origpage, P_HIKEY);
  803. itemsz = ItemIdGetLength(itemid);
  804. item = (BTItem) PageGetItem(origpage, itemid);
  805. if (PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
  806. elog(FATAL, "btree: failed to add hikey to the right sibling");
  807. rightoff = P_FIRSTKEY;
  808. }
  809. else
  810. {
  811. /* splitting a rightmost page, "high key" is the first data item */
  812. start = P_HIKEY;
  813. /* the new rightmost page will not have a high key */
  814. rightoff = P_HIKEY;
  815. }
  816. maxoff = PageGetMaxOffsetNumber(origpage);
  817. if (firstright == InvalidOffsetNumber)
  818. {
  819. Size llimit = PageGetFreeSpace(leftpage) / 2;
  820. firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit);
  821. }
  822. for (i = start; i <= maxoff; i = OffsetNumberNext(i))
  823. {
  824. itemid = PageGetItemId(origpage, i);
  825. itemsz = ItemIdGetLength(itemid);
  826. item = (BTItem) PageGetItem(origpage, itemid);
  827. /* decide which page to put it on */
  828. if (i < firstright)
  829. {
  830. if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
  831. LP_USED) == InvalidOffsetNumber)
  832. elog(FATAL, "btree: failed to add item to the left sibling");
  833. leftoff = OffsetNumberNext(leftoff);
  834. }
  835. else
  836. {
  837. if (PageAddItem(rightpage, (Item) item, itemsz, rightoff,
  838. LP_USED) == InvalidOffsetNumber)
  839. elog(FATAL, "btree: failed to add item to the right sibling");
  840. rightoff = OffsetNumberNext(rightoff);
  841. }
  842. }
  843. /*
  844.  * Okay, page has been split, high key on right page is correct.  Now
  845.  * set the high key on the left page to be the min key on the right
  846.  * page.
  847.  */
  848. if (P_RIGHTMOST(ropaque))
  849. itemid = PageGetItemId(rightpage, P_HIKEY);
  850. else
  851. itemid = PageGetItemId(rightpage, P_FIRSTKEY);
  852. itemsz = ItemIdGetLength(itemid);
  853. item = (BTItem) PageGetItem(rightpage, itemid);
  854. /*
  855.  * We left a hole for the high key on the left page; fill it.  The
  856.  * modal crap is to tell the page manager to put the new item on the
  857.  * page and not screw around with anything else.  Whoever designed
  858.  * this interface has presumably crawled back into the dung heap they
  859.  * came from.  No one here will admit to it.
  860.  */
  861. PageManagerModeSet(OverwritePageManagerMode);
  862. if (PageAddItem(leftpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
  863. elog(FATAL, "btree: failed to add hikey to the left sibling");
  864. PageManagerModeSet(ShufflePageManagerMode);
  865. /*
  866.  * By here, the original data page has been split into two new halves,
  867.  * and these are correct.  The algorithm requires that the left page
  868.  * never move during a split, so we copy the new left page back on top
  869.  * of the original.  Note that this is not a waste of time, since we
  870.  * also require (in the page management code) that the center of a
  871.  * page always be clean, and the most efficient way to guarantee this
  872.  * is just to compact the data by reinserting it into a new left page.
  873.  */
  874. PageRestoreTempPage(leftpage, origpage);
  875. /* write these guys out */
  876. _bt_wrtnorelbuf(rel, rbuf);
  877. _bt_wrtnorelbuf(rel, buf);
  878. /*
  879.  * Finally, we need to grab the right sibling (if any) and fix the
  880.  * prev pointer there. We are guaranteed that this is deadlock-free
  881.  * since no other writer will be moving holding a lock on that page
  882.  * and trying to move left, and all readers release locks on a page
  883.  * before trying to fetch its neighbors.
  884.  */
  885. if (!P_RIGHTMOST(ropaque))
  886. {
  887. sbuf = _bt_getbuf(rel, ropaque->btpo_next, BT_WRITE);
  888. spage = BufferGetPage(sbuf);
  889. sopaque = (BTPageOpaque) PageGetSpecialPointer(spage);
  890. sopaque->btpo_prev = BufferGetBlockNumber(rbuf);
  891. /* write and release the old right sibling */
  892. _bt_wrtbuf(rel, sbuf);
  893. }
  894. /* split's done */
  895. return rbuf;
  896. }
  897. /*
  898.  * _bt_findsplitloc() -- find a safe place to split a page.
  899.  *
  900.  * In order to guarantee the proper handling of searches for duplicate
  901.  * keys, the first duplicate in the chain must either be the first
  902.  * item on the page after the split, or the entire chain must be on
  903.  * one of the two pages.  That is,
  904.  * [1 2 2 2 3 4 5]
  905.  * must become
  906.  * [1] [2 2 2 3 4 5]
  907.  * or
  908.  * [1 2 2 2] [3 4 5]
  909.  * but not
  910.  * [1 2 2] [2 3 4 5].
  911.  * However,
  912.  * [2 2 2 2 2 3 4]
  913.  * may be split as
  914.  * [2 2 2 2] [2 3 4].
  915.  */
  916. static OffsetNumber
  917. _bt_findsplitloc(Relation rel,
  918.  Page page,
  919.  OffsetNumber start,
  920.  OffsetNumber maxoff,
  921.  Size llimit)
  922. {
  923. OffsetNumber i;
  924. OffsetNumber saferight;
  925. ItemId nxtitemid,
  926. safeitemid;
  927. BTItem safeitem,
  928. nxtitem;
  929. Size nbytes;
  930. int natts;
  931. if (start >= maxoff)
  932. elog(FATAL, "btree: cannot split if start (%d) >= maxoff (%d)",
  933.  start, maxoff);
  934. natts = rel->rd_rel->relnatts;
  935. saferight = start;
  936. safeitemid = PageGetItemId(page, saferight);
  937. nbytes = ItemIdGetLength(safeitemid) + sizeof(ItemIdData);
  938. safeitem = (BTItem) PageGetItem(page, safeitemid);
  939. i = OffsetNumberNext(start);
  940. while (nbytes < llimit)
  941. {
  942. /* check the next item on the page */
  943. nxtitemid = PageGetItemId(page, i);
  944. nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData));
  945. nxtitem = (BTItem) PageGetItem(page, nxtitemid);
  946. /*
  947.  * Test against last known safe item: if the tuple we're looking
  948.  * at isn't equal to the last safe one we saw, then it's our new
  949.  * safe tuple.
  950.  */
  951. if (!_bt_itemcmp(rel, natts,
  952.  safeitem, nxtitem, BTEqualStrategyNumber))
  953. {
  954. safeitem = nxtitem;
  955. saferight = i;
  956. }
  957. if (i < maxoff)
  958. i = OffsetNumberNext(i);
  959. else
  960. break;
  961. }
  962. /*
  963.  * If the chain of dups starts at the beginning of the page and
  964.  * extends past the halfway mark, we can split it in the middle.
  965.  */
  966. if (saferight == start)
  967. saferight = i;
  968. if (saferight == maxoff && (maxoff - start) > 1)
  969. saferight = start + (maxoff - start) / 2;
  970. return saferight;
  971. }
  972. /*
  973.  * _bt_newroot() -- Create a new root page for the index.
  974.  *
  975.  * We've just split the old root page and need to create a new one.
  976.  * In order to do this, we add a new root page to the file, then lock
  977.  * the metadata page and update it.  This is guaranteed to be deadlock-
  978.  * free, because all readers release their locks on the metadata page
  979.  * before trying to lock the root, and all writers lock the root before
  980.  * trying to lock the metadata page.  We have a write lock on the old
  981.  * root page, so we have not introduced any cycles into the waits-for
  982.  * graph.
  983.  *
  984.  * On entry, lbuf (the old root) and rbuf (its new peer) are write-
  985.  * locked.  We don't drop the locks in this routine; that's done by
  986.  * the caller.  On exit, a new root page exists with entries for the
  987.  * two new children.  The new root page is neither pinned nor locked.
  988.  */
  989. static void
  990. _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
  991. {
  992. Buffer rootbuf;
  993. Page lpage,
  994. rpage,
  995. rootpage;
  996. BlockNumber lbkno,
  997. rbkno;
  998. BlockNumber rootbknum;
  999. BTPageOpaque rootopaque;
  1000. ItemId itemid;
  1001. BTItem item;
  1002. Size itemsz;
  1003. BTItem new_item;
  1004. /* get a new root page */
  1005. rootbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
  1006. rootpage = BufferGetPage(rootbuf);
  1007. rootbknum = BufferGetBlockNumber(rootbuf);
  1008. _bt_pageinit(rootpage, BufferGetPageSize(rootbuf));
  1009. /* set btree special data */
  1010. rootopaque = (BTPageOpaque) PageGetSpecialPointer(rootpage);
  1011. rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
  1012. rootopaque->btpo_flags |= BTP_ROOT;
  1013. /*
  1014.  * Insert the internal tuple pointers.
  1015.  */
  1016. lbkno = BufferGetBlockNumber(lbuf);
  1017. rbkno = BufferGetBlockNumber(rbuf);
  1018. lpage = BufferGetPage(lbuf);
  1019. rpage = BufferGetPage(rbuf);
  1020. ((BTPageOpaque) PageGetSpecialPointer(lpage))->btpo_parent =
  1021. ((BTPageOpaque) PageGetSpecialPointer(rpage))->btpo_parent =
  1022. rootbknum;
  1023. /*
  1024.  * step over the high key on the left page while building the left
  1025.  * page pointer.
  1026.  */
  1027. itemid = PageGetItemId(lpage, P_FIRSTKEY);
  1028. itemsz = ItemIdGetLength(itemid);
  1029. item = (BTItem) PageGetItem(lpage, itemid);
  1030. new_item = _bt_formitem(&(item->bti_itup));
  1031. ItemPointerSet(&(new_item->bti_itup.t_tid), lbkno, P_HIKEY);
  1032. /*
  1033.  * insert the left page pointer into the new root page.  the root page
  1034.  * is the rightmost page on its level so the "high key" item is the
  1035.  * first data item.
  1036.  */
  1037. if (PageAddItem(rootpage, (Item) new_item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
  1038. elog(FATAL, "btree: failed to add leftkey to new root page");
  1039. pfree(new_item);
  1040. /*
  1041.  * the right page is the rightmost page on the second level, so the
  1042.  * "high key" item is the first data item on that page as well.
  1043.  */
  1044. itemid = PageGetItemId(rpage, P_HIKEY);
  1045. itemsz = ItemIdGetLength(itemid);
  1046. item = (BTItem) PageGetItem(rpage, itemid);
  1047. new_item = _bt_formitem(&(item->bti_itup));
  1048. ItemPointerSet(&(new_item->bti_itup.t_tid), rbkno, P_HIKEY);
  1049. /*
  1050.  * insert the right page pointer into the new root page.
  1051.  */
  1052. if (PageAddItem(rootpage, (Item) new_item, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
  1053. elog(FATAL, "btree: failed to add rightkey to new root page");
  1054. pfree(new_item);
  1055. /* write and let go of the root buffer */
  1056. _bt_wrtbuf(rel, rootbuf);
  1057. /* update metadata page with new root block number */
  1058. _bt_metaproot(rel, rootbknum, 0);
  1059. _bt_wrtbuf(rel, lbuf);
  1060. _bt_wrtbuf(rel, rbuf);
  1061. }
  1062. /*
  1063.  * _bt_pgaddtup() -- add a tuple to a particular page in the index.
  1064.  *
  1065.  * This routine adds the tuple to the page as requested, and keeps the
  1066.  * write lock and reference associated with the page's buffer.  It is
  1067.  * an error to call pgaddtup() without a write lock and reference.  If
  1068.  * afteritem is non-null, it's the item that we expect our new item
  1069.  * to follow. Otherwise, we do a binary search for the correct place
  1070.  * and insert the new item there.
  1071.  */
  1072. static OffsetNumber
  1073. _bt_pgaddtup(Relation rel,
  1074.  Buffer buf,
  1075.  int keysz,
  1076.  ScanKey itup_scankey,
  1077.  Size itemsize,
  1078.  BTItem btitem,
  1079.  BTItem afteritem)
  1080. {
  1081. OffsetNumber itup_off;
  1082. OffsetNumber first;
  1083. Page page;
  1084. BTPageOpaque opaque;
  1085. BTItem chkitem;
  1086. page = BufferGetPage(buf);
  1087. opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  1088. first = P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY;
  1089. if (afteritem == (BTItem) NULL)
  1090. itup_off = _bt_binsrch(rel, buf, keysz, itup_scankey, BT_INSERTION);
  1091. else
  1092. {
  1093. itup_off = first;
  1094. do
  1095. {
  1096. chkitem = (BTItem) PageGetItem(page, PageGetItemId(page, itup_off));
  1097. itup_off = OffsetNumberNext(itup_off);
  1098. } while (!BTItemSame(chkitem, afteritem));
  1099. }
  1100. if (PageAddItem(page, (Item) btitem, itemsize, itup_off, LP_USED) == InvalidOffsetNumber)
  1101. elog(FATAL, "btree: failed to add item to the page");
  1102. /* write the buffer, but hold our lock */
  1103. _bt_wrtnorelbuf(rel, buf);
  1104. return itup_off;
  1105. }
  1106. /*
  1107.  * _bt_goesonpg() -- Does a new tuple belong on this page?
  1108.  *
  1109.  * This is part of the complexity introduced by allowing duplicate
  1110.  * keys into the index.  The tuple belongs on this page if:
  1111.  *
  1112.  * + there is no page to the right of this one; or
  1113.  * + it is less than the high key on the page; or
  1114.  * + the item it is to follow ("afteritem") appears on this
  1115.  *   page.
  1116.  */
  1117. static bool
  1118. _bt_goesonpg(Relation rel,
  1119.  Buffer buf,
  1120.  Size keysz,
  1121.  ScanKey scankey,
  1122.  BTItem afteritem)
  1123. {
  1124. Page page;
  1125. ItemId hikey;
  1126. BTPageOpaque opaque;
  1127. BTItem chkitem;
  1128. OffsetNumber offnum,
  1129. maxoff;
  1130. bool found;
  1131. page = BufferGetPage(buf);
  1132. /* no right neighbor? */
  1133. opaque = (BTPageOpaque) PageGetSpecialPointer(page);
  1134. if (P_RIGHTMOST(opaque))
  1135. return true;
  1136. /*
  1137.  * this is a non-rightmost page, so it must have a high key item.
  1138.  *
  1139.  * If the scan key is < the high key (the min key on the next page), then
  1140.  * it for sure belongs here.
  1141.  */
  1142. hikey = PageGetItemId(page, P_HIKEY);
  1143. if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTLessStrategyNumber))
  1144. return true;
  1145. /*
  1146.  * If the scan key is > the high key, then it for sure doesn't belong
  1147.  * here.
  1148.  */
  1149. if (_bt_skeycmp(rel, keysz, scankey, page, hikey, BTGreaterStrategyNumber))
  1150. return false;
  1151. /*
  1152.  * If we have no adjacency information, and the item is equal to the
  1153.  * high key on the page (by here it is), then the item does not belong
  1154.  * on this page.
  1155.  *
  1156.  * Now it's not true in all cases. - vadim 06/10/97
  1157.  */
  1158. if (afteritem == (BTItem) NULL)
  1159. {
  1160. if (opaque->btpo_flags & BTP_LEAF)
  1161. return false;
  1162. if (opaque->btpo_flags & BTP_CHAIN)
  1163. return true;
  1164. if (_bt_skeycmp(rel, keysz, scankey, page,
  1165. PageGetItemId(page, P_FIRSTKEY),
  1166. BTEqualStrategyNumber))
  1167. return true;
  1168. return false;
  1169. }
  1170. /* damn, have to work for it.  i hate that. */
  1171. maxoff = PageGetMaxOffsetNumber(page);
  1172. /*
  1173.  * Search the entire page for the afteroid.  We need to do this,
  1174.  * rather than doing a binary search and starting from there, because
  1175.  * if the key we're searching for is the leftmost key in the tree at
  1176.  * this level, then a binary search will do the wrong thing.  Splits
  1177.  * are pretty infrequent, so the cost isn't as bad as it could be.
  1178.  */
  1179. found = false;
  1180. for (offnum = P_FIRSTKEY;
  1181.  offnum <= maxoff;
  1182.  offnum = OffsetNumberNext(offnum))
  1183. {
  1184. chkitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
  1185. if (BTItemSame(chkitem, afteritem))
  1186. {
  1187. found = true;
  1188. break;
  1189. }
  1190. }
  1191. return found;
  1192. }
  1193. /*
  1194.  * _bt_itemcmp() -- compare item1 to item2 using a requested
  1195.  *  strategy (<, <=, =, >=, >)
  1196.  *
  1197.  */
  1198. bool
  1199. _bt_itemcmp(Relation rel,
  1200. Size keysz,
  1201. BTItem item1,
  1202. BTItem item2,
  1203. StrategyNumber strat)
  1204. {
  1205. TupleDesc tupDes;
  1206. IndexTuple indexTuple1,
  1207. indexTuple2;
  1208. Datum attrDatum1,
  1209. attrDatum2;
  1210. int i;
  1211. bool isFirstNull,
  1212. isSecondNull;
  1213. bool compare;
  1214. bool useEqual = false;
  1215. if (strat == BTLessEqualStrategyNumber)
  1216. {
  1217. useEqual = true;
  1218. strat = BTLessStrategyNumber;
  1219. }
  1220. else if (strat == BTGreaterEqualStrategyNumber)
  1221. {
  1222. useEqual = true;
  1223. strat = BTGreaterStrategyNumber;
  1224. }
  1225. tupDes = RelationGetDescr(rel);
  1226. indexTuple1 = &(item1->bti_itup);
  1227. indexTuple2 = &(item2->bti_itup);
  1228. for (i = 1; i <= keysz; i++)
  1229. {
  1230. attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isFirstNull);
  1231. attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isSecondNull);
  1232. /* see comments about NULLs handling in btbuild */
  1233. if (isFirstNull) /* attr in item1 is NULL */
  1234. {
  1235. if (isSecondNull) /* attr in item2 is NULL too */
  1236. compare = (strat == BTEqualStrategyNumber) ? true : false;
  1237. else
  1238. compare = (strat == BTGreaterStrategyNumber) ? true : false;
  1239. }
  1240. else if (isSecondNull) /* attr in item1 is NOT_NULL and */
  1241. { /* and attr in item2 is NULL */
  1242. compare = (strat == BTLessStrategyNumber) ? true : false;
  1243. }
  1244. else
  1245. compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2);
  1246. if (compare) /* true for one of ">, <, =" */
  1247. {
  1248. if (strat != BTEqualStrategyNumber)
  1249. return true;
  1250. }
  1251. else
  1252. /* false for one of ">, <, =" */
  1253. {
  1254. if (strat == BTEqualStrategyNumber)
  1255. return false;
  1256. /*
  1257.  * if original strat was "<=, >=" OR "<, >" but some
  1258.  * attribute(s) left - need to test for Equality
  1259.  */
  1260. if (useEqual || i < keysz)
  1261. {
  1262. if (isFirstNull || isSecondNull)
  1263. compare = (isFirstNull && isSecondNull) ? true : false;
  1264. else
  1265. compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber,
  1266.   attrDatum1, attrDatum2);
  1267. if (compare) /* item1' and item2' attributes are equal */
  1268. continue; /* - try to compare next attributes */
  1269. }
  1270. return false;
  1271. }
  1272. }
  1273. return true;
  1274. }
  1275. /*
  1276.  * _bt_updateitem() -- updates the key of the item identified by the
  1277.  * oid with the key of newItem (done in place if
  1278.  * possible)
  1279.  *
  1280.  */
  1281. static void
  1282. _bt_updateitem(Relation rel,
  1283.    Size keysz,
  1284.    Buffer buf,
  1285.    BTItem oldItem,
  1286.    BTItem newItem)
  1287. {
  1288. Page page;
  1289. OffsetNumber maxoff;
  1290. OffsetNumber i;
  1291. ItemPointerData itemPtrData;
  1292. BTItem item;
  1293. IndexTuple oldIndexTuple,
  1294. newIndexTuple;
  1295. int first;
  1296. page = BufferGetPage(buf);
  1297. maxoff = PageGetMaxOffsetNumber(page);
  1298. /* locate item on the page */
  1299. first = P_RIGHTMOST((BTPageOpaque) PageGetSpecialPointer(page))
  1300. ? P_HIKEY : P_FIRSTKEY;
  1301. i = first;
  1302. do
  1303. {
  1304. item = (BTItem) PageGetItem(page, PageGetItemId(page, i));
  1305. i = OffsetNumberNext(i);
  1306. } while (i <= maxoff && !BTItemSame(item, oldItem));
  1307. /* this should never happen (in theory) */
  1308. if (!BTItemSame(item, oldItem))
  1309. elog(FATAL, "_bt_getstackbuf was lying!!");
  1310. /*
  1311.  * It's  defined by caller (_bt_insertonpg)
  1312.  */
  1313. /*
  1314.  * if(IndexTupleDSize(newItem->bti_itup) >
  1315.  * IndexTupleDSize(item->bti_itup)) { elog(NOTICE, "trying to
  1316.  * overwrite a smaller value with a bigger one in _bt_updateitem");
  1317.  * elog(ERROR, "this is not good."); }
  1318.  */
  1319. oldIndexTuple = &(item->bti_itup);
  1320. newIndexTuple = &(newItem->bti_itup);
  1321. /* keep the original item pointer */
  1322. ItemPointerCopy(&(oldIndexTuple->t_tid), &itemPtrData);
  1323. CopyIndexTuple(newIndexTuple, &oldIndexTuple);
  1324. ItemPointerCopy(&itemPtrData, &(oldIndexTuple->t_tid));
  1325. }
  1326. /*
  1327.  * _bt_isequal - used in _bt_doinsert in check for duplicates.
  1328.  *
  1329.  * Rule is simple: NOT_NULL not equal NULL, NULL not_equal NULL too.
  1330.  */
  1331. static bool
  1332. _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
  1333. int keysz, ScanKey scankey)
  1334. {
  1335. Datum datum;
  1336. BTItem btitem;
  1337. IndexTuple itup;
  1338. ScanKey entry;
  1339. AttrNumber attno;
  1340. long result;
  1341. int i;
  1342. bool null;
  1343. btitem = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
  1344. itup = &(btitem->bti_itup);
  1345. for (i = 1; i <= keysz; i++)
  1346. {
  1347. entry = &scankey[i - 1];
  1348. attno = entry->sk_attno;
  1349. Assert(attno == i);
  1350. datum = index_getattr(itup, attno, itupdesc, &null);
  1351. /* NULLs are not equal */
  1352. if (entry->sk_flags & SK_ISNULL || null)
  1353. return false;
  1354. result = (long) FMGR_PTR2(&entry->sk_func, entry->sk_argument, datum);
  1355. if (result != 0)
  1356. return false;
  1357. }
  1358. /* by here, the keys are equal */
  1359. return true;
  1360. }
  1361. #ifdef NOT_USED
  1362. /*
  1363.  * _bt_shift - insert btitem on the passed page after shifting page
  1364.  *    to the right in the tree.
  1365.  *
  1366.  * NOTE: tested for shifting leftmost page only, having btitem < hikey.
  1367.  */
  1368. static InsertIndexResult
  1369. _bt_shift(Relation rel, Buffer buf, BTStack stack, int keysz,
  1370.   ScanKey scankey, BTItem btitem, BTItem hikey)
  1371. {
  1372. InsertIndexResult res;
  1373. int itemsz;
  1374. Page page;
  1375. BlockNumber bknum;
  1376. BTPageOpaque pageop;
  1377. Buffer rbuf;
  1378. Page rpage;
  1379. BTPageOpaque rpageop;
  1380. Buffer pbuf;
  1381. Page ppage;
  1382. BTPageOpaque ppageop;
  1383. Buffer nbuf;
  1384. Page npage;
  1385. BTPageOpaque npageop;
  1386. BlockNumber nbknum;
  1387. BTItem nitem;
  1388. OffsetNumber afteroff;
  1389. btitem = _bt_formitem(&(btitem->bti_itup));
  1390. hikey = _bt_formitem(&(hikey->bti_itup));
  1391. page = BufferGetPage(buf);
  1392. /* grab new page */
  1393. nbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
  1394. nbknum = BufferGetBlockNumber(nbuf);
  1395. npage = BufferGetPage(nbuf);
  1396. _bt_pageinit(npage, BufferGetPageSize(nbuf));
  1397. npageop = (BTPageOpaque) PageGetSpecialPointer(npage);
  1398. /* copy content of the passed page */
  1399. memmove((char *) npage, (char *) page, BufferGetPageSize(buf));
  1400. /* re-init old (passed) page */
  1401. _bt_pageinit(page, BufferGetPageSize(buf));
  1402. pageop = (BTPageOpaque) PageGetSpecialPointer(page);
  1403. /* init old page opaque */
  1404. pageop->btpo_flags = npageop->btpo_flags; /* restore flags */
  1405. pageop->btpo_flags &= ~BTP_CHAIN;
  1406. if (_bt_itemcmp(rel, keysz, hikey, btitem, BTEqualStrategyNumber))
  1407. pageop->btpo_flags |= BTP_CHAIN;
  1408. pageop->btpo_prev = npageop->btpo_prev; /* restore prev */
  1409. pageop->btpo_next = nbknum; /* next points to the new page */
  1410. pageop->btpo_parent = npageop->btpo_parent;
  1411. /* init shifted page opaque */
  1412. npageop->btpo_prev = bknum = BufferGetBlockNumber(buf);
  1413. /* shifted page is ok, populate old page */
  1414. /* add passed hikey */
  1415. itemsz = IndexTupleDSize(hikey->bti_itup)
  1416. + (sizeof(BTItemData) - sizeof(IndexTupleData));
  1417. itemsz = MAXALIGN(itemsz);
  1418. if (PageAddItem(page, (Item) hikey, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber)
  1419. elog(FATAL, "btree: failed to add hikey in _bt_shift");
  1420. pfree(hikey);
  1421. /* add btitem */
  1422. itemsz = IndexTupleDSize(btitem->bti_itup)
  1423. + (sizeof(BTItemData) - sizeof(IndexTupleData));
  1424. itemsz = MAXALIGN(itemsz);
  1425. if (PageAddItem(page, (Item) btitem, itemsz, P_FIRSTKEY, LP_USED) == InvalidOffsetNumber)
  1426. elog(FATAL, "btree: failed to add firstkey in _bt_shift");
  1427. pfree(btitem);
  1428. nitem = (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY));
  1429. btitem = _bt_formitem(&(nitem->bti_itup));
  1430. ItemPointerSet(&(btitem->bti_itup.t_tid), bknum, P_HIKEY);
  1431. /* ok, write them out */
  1432. _bt_wrtnorelbuf(rel, nbuf);
  1433. _bt_wrtnorelbuf(rel, buf);
  1434. /* fix btpo_prev on right sibling of old page */
  1435. if (!P_RIGHTMOST(npageop))
  1436. {
  1437. rbuf = _bt_getbuf(rel, npageop->btpo_next, BT_WRITE);
  1438. rpage = BufferGetPage(rbuf);
  1439. rpageop = (BTPageOpaque) PageGetSpecialPointer(rpage);
  1440. rpageop->btpo_prev = nbknum;
  1441. _bt_wrtbuf(rel, rbuf);
  1442. }
  1443. /* get parent pointing to the old page */
  1444. ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
  1445.    bknum, P_HIKEY);
  1446. pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
  1447. ppage = BufferGetPage(pbuf);
  1448. ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
  1449. _bt_relbuf(rel, nbuf, BT_WRITE);
  1450. _bt_relbuf(rel, buf, BT_WRITE);
  1451. /* re-set parent' pointer - we shifted our page to the right ! */
  1452. nitem = (BTItem) PageGetItem(ppage,
  1453.  PageGetItemId(ppage, stack->bts_offset));
  1454. ItemPointerSet(&(nitem->bti_itup.t_tid), nbknum, P_HIKEY);
  1455. ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), nbknum, P_HIKEY);
  1456. _bt_wrtnorelbuf(rel, pbuf);
  1457. /*
  1458.  * Now we want insert into the parent pointer to our old page. It has
  1459.  * to be inserted before the pointer to new page. You may get problems
  1460.  * here (in the _bt_goesonpg and/or _bt_pgaddtup), but may be not - I
  1461.  * don't know. It works if old page is leftmost (nitem is NULL) and
  1462.  * btitem < hikey and it's all what we need currently. - vadim
  1463.  * 05/30/97
  1464.  */
  1465. nitem = NULL;
  1466. afteroff = P_FIRSTKEY;
  1467. if (!P_RIGHTMOST(ppageop))
  1468. afteroff = OffsetNumberNext(afteroff);
  1469. if (stack->bts_offset >= afteroff)
  1470. {
  1471. afteroff = OffsetNumberPrev(stack->bts_offset);
  1472. nitem = (BTItem) PageGetItem(ppage, PageGetItemId(ppage, afteroff));
  1473. nitem = _bt_formitem(&(nitem->bti_itup));
  1474. }
  1475. res = _bt_insertonpg(rel, pbuf, stack->bts_parent,
  1476.  keysz, scankey, btitem, nitem);
  1477. pfree(btitem);
  1478. ItemPointerSet(&(res->pointerData), nbknum, P_HIKEY);
  1479. return res;
  1480. }
  1481. #endif