hashpage.c
上传用户:blenddy
上传日期:2007-01-07
资源大小:6495k
文件大小:19k
源码类别:

数据库系统

开发平台:

Unix_Linux

  1. /*-------------------------------------------------------------------------
  2.  *
  3.  * hashpage.c
  4.  *   Hash table page management code for the Postgres hash access method
  5.  *
  6.  * Copyright (c) 1994, Regents of the University of California
  7.  *
  8.  *
  9.  * IDENTIFICATION
  10.  *   $Header: /usr/local/cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.20.2.1 1999/08/02 05:56:34 scrappy Exp $
  11.  *
  12.  * NOTES
  13.  *   Postgres hash pages look like ordinary relation pages.  The opaque
  14.  *   data at high addresses includes information about the page including
  15.  *   whether a page is an overflow page or a true bucket, the block
  16.  *   numbers of the preceding and following pages, and the overflow
  17.  *   address of the page if it is an overflow page.
  18.  *
  19.  *   The first page in a hash relation, page zero, is special -- it stores
  20.  *   information describing the hash table; it is referred to as teh
  21.  *   "meta page." Pages one and higher store the actual data.
  22.  *
  23.  *-------------------------------------------------------------------------
  24.  */
  25. #include "postgres.h"
  26. #include "access/genam.h"
  27. #include "access/hash.h"
  28. #include "miscadmin.h"
  29. static void _hash_setpagelock(Relation rel, BlockNumber blkno, int access);
  30. static void _hash_unsetpagelock(Relation rel, BlockNumber blkno, int access);
  31. static void _hash_splitpage(Relation rel, Buffer metabuf, Bucket obucket, Bucket nbucket);
  32. /*
  33.  * We use high-concurrency locking on hash indices.  There are two cases in
  34.  * which we don't do locking.  One is when we're building the index.
  35.  * Since the creating transaction has not committed, no one can see
  36.  * the index, and there's no reason to share locks.  The second case
  37.  * is when we're just starting up the database system.  We use some
  38.  * special-purpose initialization code in the relation cache manager
  39.  * (see utils/cache/relcache.c) to allow us to do indexed scans on
  40.  * the system catalogs before we'd normally be able to.  This happens
  41.  * before the lock table is fully initialized, so we can't use it.
  42.  * Strictly speaking, this violates 2pl, but we don't do 2pl on the
  43.  * system catalogs anyway.
  44.  */
  45. #define USELOCKING (!BuildingHash && !IsInitProcessingMode())
  46. /*
  47.  * _hash_metapinit() -- Initialize the metadata page of a hash index,
  48.  * the two buckets that we begin with and the initial
  49.  * bitmap page.
  50.  */
  51. void
  52. _hash_metapinit(Relation rel)
  53. {
  54. HashMetaPage metap;
  55. HashPageOpaque pageopaque;
  56. Buffer metabuf;
  57. Buffer buf;
  58. Page pg;
  59. int nbuckets;
  60. uint32 nelem; /* number elements */
  61. uint32 lg2nelem; /* _hash_log2(nelem)   */
  62. uint32 nblocks;
  63. uint16 i;
  64. /* can't be sharing this with anyone, now... */
  65. if (USELOCKING)
  66. LockRelation(rel, AccessExclusiveLock);
  67. if ((nblocks = RelationGetNumberOfBlocks(rel)) != 0)
  68. {
  69. elog(ERROR, "Cannot initialize non-empty hash table %s",
  70.  RelationGetRelationName(rel));
  71. }
  72. metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
  73. pg = BufferGetPage(metabuf);
  74. metap = (HashMetaPage) pg;
  75. _hash_pageinit(pg, BufferGetPageSize(metabuf));
  76. metap->hashm_magic = HASH_MAGIC;
  77. metap->hashm_version = HASH_VERSION;
  78. metap->hashm_nkeys = 0;
  79. metap->hashm_nmaps = 0;
  80. metap->hashm_ffactor = DEFAULT_FFACTOR;
  81. metap->hashm_bsize = BufferGetPageSize(metabuf);
  82. metap->hashm_bshift = _hash_log2(metap->hashm_bsize);
  83. for (i = metap->hashm_bshift; i > 0; --i)
  84. {
  85. if ((1 << i) < (metap->hashm_bsize -
  86. (MAXALIGN(sizeof(PageHeaderData)) +
  87.  MAXALIGN(sizeof(HashPageOpaqueData)))))
  88. break;
  89. }
  90. Assert(i);
  91. metap->hashm_bmsize = 1 << i;
  92. metap->hashm_procid = index_getprocid(rel, 1, HASHPROC);
  93. /*
  94.  * Make nelem = 2 rather than 0 so that we end up allocating space for
  95.  * the next greater power of two number of buckets.
  96.  */
  97. nelem = 2;
  98. lg2nelem = 1; /* _hash_log2(MAX(nelem, 2)) */
  99. nbuckets = 2; /* 1 << lg2nelem */
  100. MemSet((char *) metap->hashm_spares, 0, sizeof(metap->hashm_spares));
  101. MemSet((char *) metap->hashm_mapp, 0, sizeof(metap->hashm_mapp));
  102. metap->hashm_spares[lg2nelem] = 2; /* lg2nelem + 1 */
  103. metap->hashm_spares[lg2nelem + 1] = 2; /* lg2nelem + 1 */
  104. metap->hashm_ovflpoint = 1; /* lg2nelem */
  105. metap->hashm_lastfreed = 2;
  106. metap->hashm_maxbucket = metap->hashm_lowmask = 1; /* nbuckets - 1 */
  107. metap->hashm_highmask = 3; /* (nbuckets << 1) - 1 */
  108. pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
  109. pageopaque->hasho_oaddr = InvalidOvflAddress;
  110. pageopaque->hasho_prevblkno = InvalidBlockNumber;
  111. pageopaque->hasho_nextblkno = InvalidBlockNumber;
  112. pageopaque->hasho_flag = LH_META_PAGE;
  113. pageopaque->hasho_bucket = -1;
  114. /*
  115.  * First bitmap page is at: splitpoint lg2nelem page offset 1 which
  116.  * turns out to be page 3. Couldn't initialize page 3  until we
  117.  * created the first two buckets above.
  118.  */
  119. if (_hash_initbitmap(rel, metap, OADDR_OF(lg2nelem, 1), lg2nelem + 1, 0))
  120. elog(ERROR, "Problem with _hash_initbitmap.");
  121. /* all done */
  122. _hash_wrtnorelbuf(rel, metabuf);
  123. /*
  124.  * initialize the first two buckets
  125.  */
  126. for (i = 0; i <= 1; i++)
  127. {
  128. buf = _hash_getbuf(rel, BUCKET_TO_BLKNO(i), HASH_WRITE);
  129. pg = BufferGetPage(buf);
  130. _hash_pageinit(pg, BufferGetPageSize(buf));
  131. pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
  132. pageopaque->hasho_oaddr = InvalidOvflAddress;
  133. pageopaque->hasho_prevblkno = InvalidBlockNumber;
  134. pageopaque->hasho_nextblkno = InvalidBlockNumber;
  135. pageopaque->hasho_flag = LH_BUCKET_PAGE;
  136. pageopaque->hasho_bucket = i;
  137. _hash_wrtbuf(rel, buf);
  138. }
  139. _hash_relbuf(rel, metabuf, HASH_WRITE);
  140. if (USELOCKING)
  141. UnlockRelation(rel, AccessExclusiveLock);
  142. }
  143. /*
  144.  * _hash_getbuf() -- Get a buffer by block number for read or write.
  145.  *
  146.  * When this routine returns, the appropriate lock is set on the
  147.  * requested buffer its reference count is correct.
  148.  *
  149.  * XXX P_NEW is not used because, unlike the tree structures, we
  150.  * need the bucket blocks to be at certain block numbers. we must
  151.  * depend on the caller to call _hash_pageinit on the block if it
  152.  * knows that this is a new block.
  153.  */
  154. Buffer
  155. _hash_getbuf(Relation rel, BlockNumber blkno, int access)
  156. {
  157. Buffer buf;
  158. if (blkno == P_NEW)
  159. elog(ERROR, "_hash_getbuf: internal error: hash AM does not use P_NEW");
  160. switch (access)
  161. {
  162. case HASH_WRITE:
  163. case HASH_READ:
  164. _hash_setpagelock(rel, blkno, access);
  165. break;
  166. default:
  167. elog(ERROR, "_hash_getbuf: invalid access (%d) on new blk: %s",
  168.  access, RelationGetRelationName(rel));
  169. break;
  170. }
  171. buf = ReadBuffer(rel, blkno);
  172. /* ref count and lock type are correct */
  173. return buf;
  174. }
  175. /*
  176.  * _hash_relbuf() -- release a locked buffer.
  177.  */
  178. void
  179. _hash_relbuf(Relation rel, Buffer buf, int access)
  180. {
  181. BlockNumber blkno;
  182. blkno = BufferGetBlockNumber(buf);
  183. switch (access)
  184. {
  185. case HASH_WRITE:
  186. case HASH_READ:
  187. _hash_unsetpagelock(rel, blkno, access);
  188. break;
  189. default:
  190. elog(ERROR, "_hash_relbuf: invalid access (%d) on blk %x: %s",
  191.  access, blkno, RelationGetRelationName(rel));
  192. }
  193. ReleaseBuffer(buf);
  194. }
  195. /*
  196.  * _hash_wrtbuf() -- write a hash page to disk.
  197.  *
  198.  * This routine releases the lock held on the buffer and our reference
  199.  * to it. It is an error to call _hash_wrtbuf() without a write lock
  200.  * or a reference to the buffer.
  201.  */
  202. void
  203. _hash_wrtbuf(Relation rel, Buffer buf)
  204. {
  205. BlockNumber blkno;
  206. blkno = BufferGetBlockNumber(buf);
  207. WriteBuffer(buf);
  208. _hash_unsetpagelock(rel, blkno, HASH_WRITE);
  209. }
  210. /*
  211.  * _hash_wrtnorelbuf() -- write a hash page to disk, but do not release
  212.  *  our reference or lock.
  213.  *
  214.  * It is an error to call _hash_wrtnorelbuf() without a write lock
  215.  * or a reference to the buffer.
  216.  */
  217. void
  218. _hash_wrtnorelbuf(Relation rel, Buffer buf)
  219. {
  220. BlockNumber blkno;
  221. blkno = BufferGetBlockNumber(buf);
  222. WriteNoReleaseBuffer(buf);
  223. }
  224. Page
  225. _hash_chgbufaccess(Relation rel,
  226.    Buffer *bufp,
  227.    int from_access,
  228.    int to_access)
  229. {
  230. BlockNumber blkno;
  231. blkno = BufferGetBlockNumber(*bufp);
  232. switch (from_access)
  233. {
  234. case HASH_WRITE:
  235. _hash_wrtbuf(rel, *bufp);
  236. break;
  237. case HASH_READ:
  238. _hash_relbuf(rel, *bufp, from_access);
  239. break;
  240. default:
  241. elog(ERROR, "_hash_chgbufaccess: invalid access (%d) on blk %x: %s",
  242.  from_access, blkno, RelationGetRelationName(rel));
  243. break;
  244. }
  245. *bufp = _hash_getbuf(rel, blkno, to_access);
  246. return BufferGetPage(*bufp);
  247. }
  248. /*
  249.  * _hash_pageinit() -- Initialize a new page.
  250.  */
  251. void
  252. _hash_pageinit(Page page, Size size)
  253. {
  254. Assert(((PageHeader) page)->pd_lower == 0);
  255. Assert(((PageHeader) page)->pd_upper == 0);
  256. Assert(((PageHeader) page)->pd_special == 0);
  257. /*
  258.  * Cargo-cult programming -- don't really need this to be zero, but
  259.  * creating new pages is an infrequent occurrence and it makes me feel
  260.  * good when I know they're empty.
  261.  */
  262. MemSet(page, 0, size);
  263. PageInit(page, size, sizeof(HashPageOpaqueData));
  264. }
  265. static void
  266. _hash_setpagelock(Relation rel,
  267.   BlockNumber blkno,
  268.   int access)
  269. {
  270. if (USELOCKING)
  271. {
  272. switch (access)
  273. {
  274. case HASH_WRITE:
  275. LockPage(rel, blkno, ExclusiveLock);
  276. break;
  277. case HASH_READ:
  278. LockPage(rel, blkno, ShareLock);
  279. break;
  280. default:
  281. elog(ERROR, "_hash_setpagelock: invalid access (%d) on blk %x: %s",
  282.  access, blkno, RelationGetRelationName(rel));
  283. break;
  284. }
  285. }
  286. }
  287. static void
  288. _hash_unsetpagelock(Relation rel,
  289. BlockNumber blkno,
  290. int access)
  291. {
  292. if (USELOCKING)
  293. {
  294. switch (access)
  295. {
  296. case HASH_WRITE:
  297. UnlockPage(rel, blkno, ExclusiveLock);
  298. break;
  299. case HASH_READ:
  300. UnlockPage(rel, blkno, ShareLock);
  301. break;
  302. default:
  303. elog(ERROR, "_hash_unsetpagelock: invalid access (%d) on blk %x: %s",
  304.  access, blkno, RelationGetRelationName(rel));
  305. break;
  306. }
  307. }
  308. }
  309. void
  310. _hash_pagedel(Relation rel, ItemPointer tid)
  311. {
  312. Buffer buf;
  313. Buffer metabuf;
  314. Page page;
  315. BlockNumber blkno;
  316. OffsetNumber offno;
  317. HashMetaPage metap;
  318. HashPageOpaque opaque;
  319. blkno = ItemPointerGetBlockNumber(tid);
  320. offno = ItemPointerGetOffsetNumber(tid);
  321. buf = _hash_getbuf(rel, blkno, HASH_WRITE);
  322. page = BufferGetPage(buf);
  323. _hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
  324. opaque = (HashPageOpaque) PageGetSpecialPointer(page);
  325. PageIndexTupleDelete(page, offno);
  326. _hash_wrtnorelbuf(rel, buf);
  327. if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE))
  328. {
  329. buf = _hash_freeovflpage(rel, buf);
  330. if (BufferIsValid(buf))
  331. _hash_relbuf(rel, buf, HASH_WRITE);
  332. }
  333. else
  334. _hash_relbuf(rel, buf, HASH_WRITE);
  335. metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
  336. metap = (HashMetaPage) BufferGetPage(metabuf);
  337. _hash_checkpage((Page) metap, LH_META_PAGE);
  338. ++metap->hashm_nkeys;
  339. _hash_wrtbuf(rel, metabuf);
  340. }
  341. void
  342. _hash_expandtable(Relation rel, Buffer metabuf)
  343. {
  344. HashMetaPage metap;
  345. Bucket old_bucket;
  346. Bucket new_bucket;
  347. uint32 spare_ndx;
  348. /*   elog(DEBUG, "_hash_expandtable: expanding..."); */
  349. metap = (HashMetaPage) BufferGetPage(metabuf);
  350. _hash_checkpage((Page) metap, LH_META_PAGE);
  351. metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
  352. new_bucket = ++metap->MAX_BUCKET;
  353. metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
  354. old_bucket = (metap->MAX_BUCKET & metap->LOW_MASK);
  355. /*
  356.  * If the split point is increasing (MAX_BUCKET's log base 2 *
  357.  * increases), we need to copy the current contents of the spare split
  358.  * bucket to the next bucket.
  359.  */
  360. spare_ndx = _hash_log2(metap->MAX_BUCKET + 1);
  361. if (spare_ndx > metap->OVFL_POINT)
  362. {
  363. metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
  364. metap->SPARES[spare_ndx] = metap->SPARES[metap->OVFL_POINT];
  365. metap->OVFL_POINT = spare_ndx;
  366. metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
  367. }
  368. if (new_bucket > metap->HIGH_MASK)
  369. {
  370. /* Starting a new doubling */
  371. metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_READ, HASH_WRITE);
  372. metap->LOW_MASK = metap->HIGH_MASK;
  373. metap->HIGH_MASK = new_bucket | metap->LOW_MASK;
  374. metap = (HashMetaPage) _hash_chgbufaccess(rel, &metabuf, HASH_WRITE, HASH_READ);
  375. }
  376. /* Relocate records to the new bucket */
  377. _hash_splitpage(rel, metabuf, old_bucket, new_bucket);
  378. }
  379. /*
  380.  * _hash_splitpage -- split 'obucket' into 'obucket' and 'nbucket'
  381.  *
  382.  * this routine is actually misnamed -- we are splitting a bucket that
  383.  * consists of a base bucket page and zero or more overflow (bucket
  384.  * chain) pages.
  385.  */
  386. static void
  387. _hash_splitpage(Relation rel,
  388. Buffer metabuf,
  389. Bucket obucket,
  390. Bucket nbucket)
  391. {
  392. Bucket bucket;
  393. Buffer obuf;
  394. Buffer nbuf;
  395. Buffer ovflbuf;
  396. BlockNumber oblkno;
  397. BlockNumber nblkno;
  398. bool null;
  399. Datum datum;
  400. HashItem hitem;
  401. HashPageOpaque oopaque;
  402. HashPageOpaque nopaque;
  403. HashMetaPage metap;
  404. IndexTuple itup;
  405. int itemsz;
  406. OffsetNumber ooffnum;
  407. OffsetNumber noffnum;
  408. OffsetNumber omaxoffnum;
  409. Page opage;
  410. Page npage;
  411. TupleDesc itupdesc;
  412. /*   elog(DEBUG, "_hash_splitpage: splitting %d into %d,%d",
  413.  obucket, obucket, nbucket);
  414. */
  415. metap = (HashMetaPage) BufferGetPage(metabuf);
  416. _hash_checkpage((Page) metap, LH_META_PAGE);
  417. /* get the buffers & pages */
  418. oblkno = BUCKET_TO_BLKNO(obucket);
  419. nblkno = BUCKET_TO_BLKNO(nbucket);
  420. obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
  421. nbuf = _hash_getbuf(rel, nblkno, HASH_WRITE);
  422. opage = BufferGetPage(obuf);
  423. npage = BufferGetPage(nbuf);
  424. /* initialize the new bucket */
  425. _hash_pageinit(npage, BufferGetPageSize(nbuf));
  426. nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
  427. nopaque->hasho_prevblkno = InvalidBlockNumber;
  428. nopaque->hasho_nextblkno = InvalidBlockNumber;
  429. nopaque->hasho_flag = LH_BUCKET_PAGE;
  430. nopaque->hasho_oaddr = InvalidOvflAddress;
  431. nopaque->hasho_bucket = nbucket;
  432. _hash_wrtnorelbuf(rel, nbuf);
  433. /*
  434.  * make sure the old bucket isn't empty.  advance 'opage' and friends
  435.  * through the overflow bucket chain until we find a non-empty page.
  436.  *
  437.  * XXX we should only need this once, if we are careful to preserve the
  438.  * invariant that overflow pages are never empty.
  439.  */
  440. _hash_checkpage(opage, LH_BUCKET_PAGE);
  441. oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
  442. if (PageIsEmpty(opage))
  443. {
  444. oblkno = oopaque->hasho_nextblkno;
  445. _hash_relbuf(rel, obuf, HASH_WRITE);
  446. if (!BlockNumberIsValid(oblkno))
  447. {
  448. /*
  449.  * the old bucket is completely empty; of course, the new
  450.  * bucket will be as well, but since it's a base bucket page
  451.  * we don't care.
  452.  */
  453. _hash_relbuf(rel, nbuf, HASH_WRITE);
  454. return;
  455. }
  456. obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
  457. opage = BufferGetPage(obuf);
  458. _hash_checkpage(opage, LH_OVERFLOW_PAGE);
  459. if (PageIsEmpty(opage))
  460. elog(ERROR, "_hash_splitpage: empty overflow page %d", oblkno);
  461. oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
  462. }
  463. /*
  464.  * we are now guaranteed that 'opage' is not empty.  partition the
  465.  * tuples in the old bucket between the old bucket and the new bucket,
  466.  * advancing along their respective overflow bucket chains and adding
  467.  * overflow pages as needed.
  468.  */
  469. ooffnum = FirstOffsetNumber;
  470. omaxoffnum = PageGetMaxOffsetNumber(opage);
  471. for (;;)
  472. {
  473. /*
  474.  * at each iteration through this loop, each of these variables
  475.  * should be up-to-date: obuf opage oopaque ooffnum omaxoffnum
  476.  */
  477. /* check if we're at the end of the page */
  478. if (ooffnum > omaxoffnum)
  479. {
  480. /* at end of page, but check for overflow page */
  481. oblkno = oopaque->hasho_nextblkno;
  482. if (BlockNumberIsValid(oblkno))
  483. {
  484. /*
  485.  * we ran out of tuples on this particular page, but we
  486.  * have more overflow pages; re-init values.
  487.  */
  488. _hash_wrtbuf(rel, obuf);
  489. obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
  490. opage = BufferGetPage(obuf);
  491. _hash_checkpage(opage, LH_OVERFLOW_PAGE);
  492. oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
  493. /* we're guaranteed that an ovfl page has at least 1 tuple */
  494. if (PageIsEmpty(opage))
  495. {
  496. elog(ERROR, "_hash_splitpage: empty ovfl page %d!",
  497.  oblkno);
  498. }
  499. ooffnum = FirstOffsetNumber;
  500. omaxoffnum = PageGetMaxOffsetNumber(opage);
  501. }
  502. else
  503. {
  504. /*
  505.  * we're at the end of the bucket chain, so now we're
  506.  * really done with everything.  before quitting, call
  507.  * _hash_squeezebucket to ensure the tuples in the bucket
  508.  * (including the overflow pages) are packed as tightly as
  509.  * possible.
  510.  */
  511. _hash_wrtbuf(rel, obuf);
  512. _hash_wrtbuf(rel, nbuf);
  513. _hash_squeezebucket(rel, metap, obucket);
  514. return;
  515. }
  516. }
  517. /* hash on the tuple */
  518. hitem = (HashItem) PageGetItem(opage, PageGetItemId(opage, ooffnum));
  519. itup = &(hitem->hash_itup);
  520. itupdesc = RelationGetDescr(rel);
  521. datum = index_getattr(itup, 1, itupdesc, &null);
  522. bucket = _hash_call(rel, metap, datum);
  523. if (bucket == nbucket)
  524. {
  525. /*
  526.  * insert the tuple into the new bucket.  if it doesn't fit on
  527.  * the current page in the new bucket, we must allocate a new
  528.  * overflow page and place the tuple on that page instead.
  529.  */
  530. itemsz = IndexTupleDSize(hitem->hash_itup)
  531. + (sizeof(HashItemData) - sizeof(IndexTupleData));
  532. itemsz = MAXALIGN(itemsz);
  533. if (PageGetFreeSpace(npage) < itemsz)
  534. {
  535. ovflbuf = _hash_addovflpage(rel, &metabuf, nbuf);
  536. _hash_wrtbuf(rel, nbuf);
  537. nbuf = ovflbuf;
  538. npage = BufferGetPage(nbuf);
  539. _hash_checkpage(npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
  540. }
  541. noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
  542. PageAddItem(npage, (Item) hitem, itemsz, noffnum, LP_USED);
  543. _hash_wrtnorelbuf(rel, nbuf);
  544. /*
  545.  * now delete the tuple from the old bucket.  after this
  546.  * section of code, 'ooffnum' will actually point to the
  547.  * ItemId to which we would point if we had advanced it before
  548.  * the deletion (PageIndexTupleDelete repacks the ItemId
  549.  * array). this also means that 'omaxoffnum' is exactly one
  550.  * less than it used to be, so we really can just decrement it
  551.  * instead of calling PageGetMaxOffsetNumber.
  552.  */
  553. PageIndexTupleDelete(opage, ooffnum);
  554. _hash_wrtnorelbuf(rel, obuf);
  555. omaxoffnum = OffsetNumberPrev(omaxoffnum);
  556. /*
  557.  * tidy up.  if the old page was an overflow page and it is
  558.  * now empty, we must free it (we want to preserve the
  559.  * invariant that overflow pages cannot be empty).
  560.  */
  561. if (PageIsEmpty(opage) &&
  562. (oopaque->hasho_flag & LH_OVERFLOW_PAGE))
  563. {
  564. obuf = _hash_freeovflpage(rel, obuf);
  565. /* check that we're not through the bucket chain */
  566. if (BufferIsInvalid(obuf))
  567. {
  568. _hash_wrtbuf(rel, nbuf);
  569. _hash_squeezebucket(rel, metap, obucket);
  570. return;
  571. }
  572. /*
  573.  * re-init. again, we're guaranteed that an ovfl page has
  574.  * at least one tuple.
  575.  */
  576. opage = BufferGetPage(obuf);
  577. _hash_checkpage(opage, LH_OVERFLOW_PAGE);
  578. oblkno = BufferGetBlockNumber(obuf);
  579. oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
  580. if (PageIsEmpty(opage))
  581. {
  582. elog(ERROR, "_hash_splitpage: empty overflow page %d",
  583.  oblkno);
  584. }
  585. ooffnum = FirstOffsetNumber;
  586. omaxoffnum = PageGetMaxOffsetNumber(opage);
  587. }
  588. }
  589. else
  590. {
  591. /*
  592.  * the tuple stays on this page.  we didn't move anything, so
  593.  * we didn't delete anything and therefore we don't have to
  594.  * change 'omaxoffnum'.
  595.  *
  596.  * XXX any hash value from [0, nbucket-1] will map to this
  597.  * bucket, which doesn't make sense to me.
  598.  */
  599. ooffnum = OffsetNumberNext(ooffnum);
  600. }
  601. }
  602. /* NOTREACHED */
  603. }