psort.c
上传用户:blenddy
上传日期:2007-01-07
资源大小:6495k
文件大小:26k
源码类别:

数据库系统

开发平台:

Unix_Linux

  1. /*
  2.  * psort.c
  3.  *   Polyphase merge sort.
  4.  *
  5.  * Copyright (c) 1994, Regents of the University of California
  6.  *
  7.  *   $Id: psort.c,v 1.53.2.1 1999/08/02 05:25:18 scrappy Exp $
  8.  *
  9.  * NOTES
  10.  * Sorts the first relation into the second relation.
  11.  *
  12.  * The old psort.c's routines formed a temporary relation from the merged
  13.  * sort files. This version keeps the files around instead of generating the
  14.  * relation from them, and provides interface functions to the file so that
  15.  * you can grab tuples, mark a position in the file, restore a position in the
  16.  * file. You must now explicitly call an interface function to end the sort,
  17.  * psort_end, when you are done.
  18.  * Now most of the global variables are stuck in the Sort nodes, and
  19.  * accessed from there (they are passed to all the psort routines) so that
  20.  * each sort running has its own separate state. This is facilitated by having
  21.  * the Sort nodes passed in to all the interface functions.
  22.  * The one global variable that all the sorts still share is SortMemory.
  23.  * You should now be allowed to run two or more psorts concurrently,
  24.  * so long as the memory they eat up is not greater than SORTMEM, the initial
  25.  * value of SortMemory. -Rex 2.15.1995
  26.  *
  27.  *   Use the tape-splitting method (Knuth, Vol. III, pp281-86) in the future.
  28.  *
  29.  * Arguments? Variables?
  30.  * MAXMERGE, MAXTAPES
  31.  *
  32.  */
  33. #include <math.h>
  34. #include <sys/types.h>
  35. #include <unistd.h>
  36. #include "postgres.h"
  37. #include "access/heapam.h"
  38. #include "executor/execdebug.h"
  39. #include "executor/executor.h"
  40. #include "miscadmin.h"
  41. #include "utils/psort.h"
  42. static bool createfirstrun(Sort *node);
  43. static bool createrun(Sort *node, BufFile *file);
  44. static void destroytape(BufFile *file);
  45. static void dumptuples(BufFile *file, Sort *node);
  46. static BufFile *gettape(void);
  47. static void initialrun(Sort *node);
  48. static void inittapes(Sort *node);
  49. static void merge(Sort *node, struct tape * dest);
  50. static BufFile *mergeruns(Sort *node);
  51. static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);
  52. /*
  53.  * tlenzero used to delimit runs; both vars below must have
  54.  * the same size as HeapTuple->t_len
  55.  */
  56. static unsigned int tlenzero = 0;
  57. static unsigned int tlendummy;
  58. /* these are used by _psort_cmp, and are set just before calling qsort() */
  59. static TupleDesc PsortTupDesc;
  60. static ScanKey PsortKeys;
  61. static int PsortNkeys;
  62. /*
  63.  * old psort global variables
  64.  *
  65.  * (These are the global variables from the old psort. They are still used,
  66.  * but are now accessed from Sort nodes using the PS macro. Note that while
  67.  * these variables will be accessed by PS(node)->whatever, they will still
  68.  * be called by their original names within the comments! -Rex 2.10.1995)
  69.  *
  70.  * LeftistContextData treeContext;
  71.  *
  72.  * static int TapeRange; number of tapes - 1 (T)
  73.  * static int Level; (l)
  74.  * static int TotalDummy; summation of tp_dummy
  75.  * static struct tape *Tape;
  76.  *
  77.  * static int BytesRead; to keep track of # of IO
  78.  * static int BytesWritten;
  79.  *
  80.  * struct leftist *Tuples; current tuples in memory
  81.  *
  82.  * BufFile *psort_grab_file; this holds tuples grabbed
  83.  *    from merged sort runs
  84.  * long psort_current; current file position
  85.  * long psort_saved; file position saved for
  86.  *    mark and restore
  87.  */
  88. /*
  89.  * PS - Macro to access and cast psortstate from a Sort node
  90.  */
  91. #define PS(N) ((Psortstate *)N->psortstate)
  92. /*
  93.  * psort_begin - polyphase merge sort entry point. Sorts the subplan
  94.  *   into a temporary file psort_grab_file. After
  95.  *   this is called, calling the interface function
  96.  *   psort_grabtuple iteratively will get you the sorted
  97.  *   tuples. psort_end then finishes the sort off, after
  98.  *   all the tuples have been grabbed.
  99.  *
  100.  *   Allocates and initializes sort node's psort state.
  101.  */
  102. bool
  103. psort_begin(Sort *node, int nkeys, ScanKey key)
  104. {
  105. node->psortstate = (struct Psortstate *) palloc(sizeof(struct Psortstate));
  106. AssertArg(nkeys >= 1);
  107. AssertArg(key[0].sk_attno != 0);
  108. AssertArg(key[0].sk_procedure != 0);
  109. PS(node)->BytesRead = 0;
  110. PS(node)->BytesWritten = 0;
  111. PS(node)->treeContext.tupDesc = ExecGetTupType(outerPlan((Plan *) node));
  112. PS(node)->treeContext.nKeys = nkeys;
  113. PS(node)->treeContext.scanKeys = key;
  114. PS(node)->treeContext.sortMem = SortMem * 1024;
  115. PS(node)->Tuples = NULL;
  116. PS(node)->tupcount = 0;
  117. PS(node)->using_tape_files = false;
  118. PS(node)->all_fetched = false;
  119. PS(node)->psort_grab_file = NULL;
  120. PS(node)->memtuples = NULL;
  121. initialrun(node);
  122. if (PS(node)->tupcount == 0)
  123. return false;
  124. if (PS(node)->using_tape_files && PS(node)->psort_grab_file == NULL)
  125. PS(node)->psort_grab_file = mergeruns(node);
  126. PS(node)->psort_current = 0;
  127. PS(node)->psort_saved = 0;
  128. return true;
  129. }
  130. /*
  131.  * inittapes - initializes the tapes
  132.  * - (polyphase merge Alg.D(D1)--Knuth, Vol.3, p.270)
  133.  * Returns:
  134.  * number of allocated tapes
  135.  */
  136. static void
  137. inittapes(Sort *node)
  138. {
  139. int i;
  140. struct tape *tp;
  141. Assert(node != (Sort *) NULL);
  142. Assert(PS(node) != (Psortstate *) NULL);
  143. /*
  144.  * ASSERT(ntapes >= 3 && ntapes <= MAXTAPES, "inittapes: Invalid
  145.  * number of tapes to initialize.n");
  146.  */
  147. tp = PS(node)->Tape;
  148. for (i = 0; i < MAXTAPES && (tp->tp_file = gettape()) != NULL; i++)
  149. {
  150. tp->tp_dummy = 1;
  151. tp->tp_fib = 1;
  152. tp->tp_prev = tp - 1;
  153. tp++;
  154. }
  155. PS(node)->TapeRange = --tp - PS(node)->Tape;
  156. tp->tp_dummy = 0;
  157. tp->tp_fib = 0;
  158. PS(node)->Tape[0].tp_prev = tp;
  159. if (PS(node)->TapeRange <= 1)
  160. elog(ERROR, "inittapes: Could only allocate %d < 3 tapesn",
  161.  PS(node)->TapeRange + 1);
  162. PS(node)->Level = 1;
  163. PS(node)->TotalDummy = PS(node)->TapeRange;
  164. PS(node)->using_tape_files = true;
  165. }
  166. /*
  167.  * PUTTUP - writes the next tuple
  168.  * ENDRUN - mark end of run
  169.  * GETLEN - reads the length of the next tuple
  170.  * ALLOCTUP - returns space for the new tuple
  171.  * SETTUPLEN - stores the length into the tuple
  172.  * GETTUP - reads the tuple
  173.  *
  174.  * Note:
  175.  * LEN field must be as HeapTuple->t_len; FP is a stream
  176.  */
  177. #define PUTTUP(NODE, TUP, FP) 
  178. (TUP)->t_len += HEAPTUPLESIZE, 
  179. ((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len, 
  180. BufFileWrite(FP, (char *)TUP, (TUP)->t_len), 
  181. BufFileWrite(FP, (char *)&((TUP)->t_len), sizeof(tlendummy)), 
  182. (TUP)->t_len -= HEAPTUPLESIZE 
  183. )
  184. #define ENDRUN(FP) BufFileWrite(FP, (char *)&tlenzero, sizeof(tlenzero))
  185. #define GETLEN(LEN, FP) BufFileRead(FP, (char *)&(LEN), sizeof(tlenzero))
  186. #define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))
  187. #define FREE(x) pfree((char *) x)
  188. #define GETTUP(NODE, TUP, LEN, FP) 
  189. IncrProcessed(), 
  190. ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof(tlenzero), 
  191. BufFileRead(FP, (char *)(TUP) + sizeof(tlenzero), (LEN) - sizeof(tlenzero)), 
  192. (TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE), 
  193. BufFileRead(FP, (char *)&tlendummy, sizeof(tlendummy)) 
  194. )
  195. #define SETTUPLEN(TUP, LEN) ((TUP)->t_len = (LEN) - HEAPTUPLESIZE)
  196. #define rewind(FP) BufFileSeek(FP, 0L, SEEK_SET)
  197.  /*
  198.   * USEMEM - record use of memory FREEMEM    - record
  199.   * freeing of memory FULLMEM   - 1 iff a tuple will fit
  200.   */
  201. #define USEMEM(NODE,AMT) PS(node)->treeContext.sortMem -= (AMT)
  202. #define FREEMEM(NODE,AMT) PS(node)->treeContext.sortMem += (AMT)
  203. #define LACKMEM(NODE) (PS(node)->treeContext.sortMem <= BLCKSZ) /* not accurate */
  204. #define TRACEMEM(FUNC)
  205. #define TRACEOUT(FUNC, TUP)
  206. /*
  207.  * initialrun - distributes tuples from the relation
  208.  * - (replacement selection(R2-R3)--Knuth, Vol.3, p.257)
  209.  * - (polyphase merge Alg.D(D2-D4)--Knuth, Vol.3, p.271)
  210.  *
  211.  * Explanation:
  212.  * Tuples are distributed to the tapes as in Algorithm D.
  213.  * A "tuple" with t_size == 0 is used to mark the end of a run.
  214.  *
  215.  * Note:
  216.  * The replacement selection algorithm has been modified
  217.  * to go from R1 directly to R3 skipping R2 the first time.
  218.  *
  219.  * Maybe should use closer(rdesc) before return
  220.  * Perhaps should adjust the number of tapes if less than n.
  221.  * used--v. likely to have problems in mergeruns().
  222.  * Must know if should open/close files before each
  223.  * call to  psort()? If should--messy??
  224.  *
  225.  * Possible optimization:
  226.  * put the first xxx runs in quickly--problem here since
  227.  * I (perhaps prematurely) combined the 2 algorithms.
  228.  * Also, perhaps allocate tapes when needed. Split into 2 funcs.
  229.  */
  230. static void
  231. initialrun(Sort *node)
  232. {
  233. /* struct tuple   *tup; */
  234. struct tape *tp;
  235. int baseruns; /* D:(a) */
  236. int extrapasses; /* EOF */
  237. Assert(node != (Sort *) NULL);
  238. Assert(PS(node) != (Psortstate *) NULL);
  239. tp = PS(node)->Tape;
  240. if (createfirstrun(node))
  241. {
  242. Assert(PS(node)->using_tape_files);
  243. extrapasses = 0;
  244. }
  245. else
  246. /* all tuples fetched */
  247. {
  248. if (!PS(node)->using_tape_files) /* empty or sorted in
  249.  * memory */
  250. return;
  251. /*
  252.  * if PS(node)->Tuples == NULL then we have single (sorted) run
  253.  * which can be used as result grab file! So, we may avoid
  254.  * mergeruns - it will just copy this run to new file.
  255.  */
  256. if (PS(node)->Tuples == NULL)
  257. {
  258. PS(node)->psort_grab_file = PS(node)->Tape->tp_file;
  259. rewind(PS(node)->psort_grab_file);
  260. return;
  261. }
  262. extrapasses = 2;
  263. }
  264. for (;;)
  265. {
  266. tp->tp_dummy--;
  267. PS(node)->TotalDummy--;
  268. if (tp->tp_dummy < (tp + 1)->tp_dummy)
  269. tp++;
  270. else
  271. {
  272. if (tp->tp_dummy != 0)
  273. tp = PS(node)->Tape;
  274. else
  275. {
  276. PS(node)->Level++;
  277. baseruns = PS(node)->Tape[0].tp_fib;
  278. for (tp = PS(node)->Tape;
  279.  tp - PS(node)->Tape < PS(node)->TapeRange; tp++)
  280. {
  281. PS(node)->TotalDummy += (tp->tp_dummy = baseruns
  282.  + (tp + 1)->tp_fib
  283.  - tp->tp_fib);
  284. tp->tp_fib = baseruns
  285. + (tp + 1)->tp_fib;
  286. }
  287. tp = PS(node)->Tape; /* D4 */
  288. } /* D3 */
  289. }
  290. if (extrapasses)
  291. {
  292. if (--extrapasses)
  293. {
  294. dumptuples(tp->tp_file, node);
  295. ENDRUN(tp->tp_file);
  296. continue;
  297. }
  298. else
  299. break;
  300. }
  301. if ((bool) createrun(node, tp->tp_file) == false)
  302. extrapasses = 1 + (PS(node)->Tuples != NULL);
  303. /* D2 */
  304. }
  305. for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
  306. rewind(tp->tp_file); /* D. */
  307. }
  308. /*
  309.  * createfirstrun - tries to sort tuples in memory using qsort
  310.  * until LACKMEM; if not enough memory then switches
  311.  * to tape method
  312.  *
  313.  * Returns:
  314.  * FALSE iff process through end of relation
  315.  * Tuples contains the tuples for the following run upon exit
  316.  */
  317. static bool
  318. createfirstrun(Sort *node)
  319. {
  320. HeapTuple tup;
  321. bool foundeor = false;
  322. HeapTuple  *memtuples;
  323. int t_last = -1;
  324. int t_free = 1000;
  325. TupleTableSlot *cr_slot;
  326. Assert(node != (Sort *) NULL);
  327. Assert(PS(node) != (Psortstate *) NULL);
  328. Assert(!PS(node)->using_tape_files);
  329. Assert(PS(node)->memtuples == NULL);
  330. Assert(PS(node)->tupcount == 0);
  331. if (LACKMEM(node))
  332. elog(ERROR, "psort: LACKMEM in createfirstrun");
  333. memtuples = palloc(t_free * sizeof(HeapTuple));
  334. for (;;)
  335. {
  336. if (LACKMEM(node))
  337. break;
  338. /*
  339.  * About to call ExecProcNode, it can mess up the state if it
  340.  * eventually calls another Sort node. So must stow it away here
  341.  * for the meantime. -Rex
  342.  * 2.2.1995
  343.  */
  344. cr_slot = ExecProcNode(outerPlan((Plan *) node), (Plan *) node);
  345. if (TupIsNull(cr_slot))
  346. {
  347. foundeor = true;
  348. break;
  349. }
  350. tup = heap_copytuple(cr_slot->val);
  351. ExecClearTuple(cr_slot);
  352. IncrProcessed();
  353. USEMEM(node, tup->t_len);
  354. TRACEMEM(createfirstrun);
  355. if (t_free <= 0)
  356. {
  357. t_free = 1000;
  358. memtuples = repalloc(memtuples,
  359.   (t_last + t_free + 1) * sizeof(HeapTuple));
  360. }
  361. t_last++;
  362. t_free--;
  363. memtuples[t_last] = tup;
  364. }
  365. if (t_last < 0) /* empty */
  366. {
  367. Assert(foundeor);
  368. pfree(memtuples);
  369. return false;
  370. }
  371. t_last++;
  372. PS(node)->tupcount = t_last;
  373. PsortTupDesc = PS(node)->treeContext.tupDesc;
  374. PsortKeys = PS(node)->treeContext.scanKeys;
  375. PsortNkeys = PS(node)->treeContext.nKeys;
  376. qsort(memtuples, t_last, sizeof(HeapTuple),
  377.   (int (*) (const void *, const void *)) _psort_cmp);
  378. if (LACKMEM(node)) /* in-memory sort is impossible */
  379. {
  380. int t;
  381. Assert(!foundeor);
  382. inittapes(node);
  383. /* put tuples into leftist tree for createrun */
  384. for (t = t_last - 1; t >= 0; t--)
  385. puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext);
  386. pfree(memtuples);
  387. foundeor = !createrun(node, PS(node)->Tape->tp_file);
  388. }
  389. else
  390. {
  391. Assert(foundeor);
  392. PS(node)->memtuples = memtuples;
  393. }
  394. return !foundeor;
  395. }
  396. /*
  397.  * createrun - places the next run on file, grabbing the tuples by
  398.  * executing the subplan passed in
  399.  *
  400.  * Uses:
  401.  * Tuples, which should contain any tuples for this run
  402.  *
  403.  * Returns:
  404.  * FALSE iff process through end of relation
  405.  * Tuples contains the tuples for the following run upon exit
  406.  */
  407. static bool
  408. createrun(Sort *node, BufFile *file)
  409. {
  410. HeapTuple lasttuple;
  411. HeapTuple tup;
  412. TupleTableSlot *cr_slot;
  413. HeapTuple  *memtuples;
  414. int t_last = -1;
  415. int t_free = 1000;
  416. bool foundeor = false;
  417. short junk;
  418. Assert(node != (Sort *) NULL);
  419. Assert(PS(node) != (Psortstate *) NULL);
  420. Assert(PS(node)->using_tape_files);
  421. lasttuple = NULL;
  422. memtuples = palloc(t_free * sizeof(HeapTuple));
  423. for (;;)
  424. {
  425. while (LACKMEM(node) && PS(node)->Tuples != NULL)
  426. {
  427. if (lasttuple != NULL)
  428. {
  429. FREEMEM(node, lasttuple->t_len);
  430. FREE(lasttuple);
  431. TRACEMEM(createrun);
  432. }
  433. lasttuple = gettuple(&PS(node)->Tuples, &junk,
  434.  &PS(node)->treeContext);
  435. PUTTUP(node, lasttuple, file);
  436. TRACEOUT(createrun, lasttuple);
  437. }
  438. if (LACKMEM(node))
  439. break;
  440. /*
  441.  * About to call ExecProcNode, it can mess up the state if it
  442.  * eventually calls another Sort node. So must stow it away here
  443.  * for the meantime. -Rex
  444.  * 2.2.1995
  445.  */
  446. cr_slot = ExecProcNode(outerPlan((Plan *) node), (Plan *) node);
  447. if (TupIsNull(cr_slot))
  448. {
  449. foundeor = true;
  450. break;
  451. }
  452. else
  453. {
  454. tup = heap_copytuple(cr_slot->val);
  455. ExecClearTuple(cr_slot);
  456. PS(node)->tupcount++;
  457. }
  458. IncrProcessed();
  459. USEMEM(node, tup->t_len);
  460. TRACEMEM(createrun);
  461. if (lasttuple != NULL && tuplecmp(tup, lasttuple,
  462.   &PS(node)->treeContext))
  463. {
  464. if (t_free <= 0)
  465. {
  466. t_free = 1000;
  467. memtuples = repalloc(memtuples,
  468.   (t_last + t_free + 1) * sizeof(HeapTuple));
  469. }
  470. t_last++;
  471. t_free--;
  472. memtuples[t_last] = tup;
  473. }
  474. else
  475. puttuple(&PS(node)->Tuples, tup, 0, &PS(node)->treeContext);
  476. }
  477. if (lasttuple != NULL)
  478. {
  479. FREEMEM(node, lasttuple->t_len);
  480. FREE(lasttuple);
  481. TRACEMEM(createrun);
  482. }
  483. dumptuples(file, node);
  484. ENDRUN(file); /* delimit the end of the run */
  485. t_last++;
  486. /* put tuples for the next run into leftist tree */
  487. if (t_last >= 1)
  488. {
  489. int t;
  490. PsortTupDesc = PS(node)->treeContext.tupDesc;
  491. PsortKeys = PS(node)->treeContext.scanKeys;
  492. PsortNkeys = PS(node)->treeContext.nKeys;
  493. qsort(memtuples, t_last, sizeof(HeapTuple),
  494.   (int (*) (const void *, const void *)) _psort_cmp);
  495. for (t = t_last - 1; t >= 0; t--)
  496. puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext);
  497. }
  498. pfree(memtuples);
  499. return !foundeor;
  500. }
  501. /*
  502.  * mergeruns - merges all runs from input tapes
  503.  *   (polyphase merge Alg.D(D6)--Knuth, Vol.3, p271)
  504.  *
  505.  * Returns:
  506.  * file of tuples in order
  507.  */
  508. static BufFile *
  509. mergeruns(Sort *node)
  510. {
  511. struct tape *tp;
  512. Assert(node != (Sort *) NULL);
  513. Assert(PS(node) != (Psortstate *) NULL);
  514. Assert(PS(node)->using_tape_files == true);
  515. tp = PS(node)->Tape + PS(node)->TapeRange;
  516. merge(node, tp);
  517. rewind(tp->tp_file);
  518. while (--PS(node)->Level != 0)
  519. {
  520. tp = tp->tp_prev;
  521. rewind(tp->tp_file);
  522. merge(node, tp);
  523. rewind(tp->tp_file);
  524. }
  525. return tp->tp_file;
  526. }
  527. /*
  528.  * merge - handles a single merge of the tape
  529.  *   (polyphase merge Alg.D(D5)--Knuth, Vol.3, p271)
  530.  */
  531. static void
  532. merge(Sort *node, struct tape * dest)
  533. {
  534. HeapTuple tup;
  535. struct tape *lasttp; /* (TAPE[P]) */
  536. struct tape *tp;
  537. struct leftist *tuples;
  538. BufFile    *destfile;
  539. int times; /* runs left to merge */
  540. int outdummy; /* complete dummy runs */
  541. short fromtape;
  542. unsigned int tuplen;
  543. Assert(node != (Sort *) NULL);
  544. Assert(PS(node) != (Psortstate *) NULL);
  545. Assert(PS(node)->using_tape_files == true);
  546. lasttp = dest->tp_prev;
  547. times = lasttp->tp_fib;
  548. for (tp = lasttp; tp != dest; tp = tp->tp_prev)
  549. tp->tp_fib -= times;
  550. tp->tp_fib += times;
  551. /* Tape[].tp_fib (A[]) is set to proper exit values */
  552. if (PS(node)->TotalDummy < PS(node)->TapeRange) /* no complete dummy
  553.  * runs */
  554. outdummy = 0;
  555. else
  556. {
  557. outdummy = PS(node)->TotalDummy; /* a large positive number */
  558. for (tp = lasttp; tp != dest; tp = tp->tp_prev)
  559. if (outdummy > tp->tp_dummy)
  560. outdummy = tp->tp_dummy;
  561. for (tp = lasttp; tp != dest; tp = tp->tp_prev)
  562. tp->tp_dummy -= outdummy;
  563. tp->tp_dummy += outdummy;
  564. PS(node)->TotalDummy -= outdummy * PS(node)->TapeRange;
  565. /* do not add the outdummy runs yet */
  566. times -= outdummy;
  567. }
  568. destfile = dest->tp_file;
  569. while (times-- != 0)
  570. { /* merge one run */
  571. tuples = NULL;
  572. if (PS(node)->TotalDummy == 0)
  573. for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev)
  574. {
  575. GETLEN(tuplen, tp->tp_file);
  576. tup = ALLOCTUP(tuplen);
  577. USEMEM(node, tuplen);
  578. TRACEMEM(merge);
  579. SETTUPLEN(tup, tuplen);
  580. GETTUP(node, tup, tuplen, tp->tp_file);
  581. puttuple(&tuples, tup, tp - PS(node)->Tape,
  582.  &PS(node)->treeContext);
  583. }
  584. else
  585. {
  586. for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev)
  587. {
  588. if (tp->tp_dummy != 0)
  589. {
  590. tp->tp_dummy--;
  591. PS(node)->TotalDummy--;
  592. }
  593. else
  594. {
  595. GETLEN(tuplen, tp->tp_file);
  596. tup = ALLOCTUP(tuplen);
  597. USEMEM(node, tuplen);
  598. TRACEMEM(merge);
  599. SETTUPLEN(tup, tuplen);
  600. GETTUP(node, tup, tuplen, tp->tp_file);
  601. puttuple(&tuples, tup, tp - PS(node)->Tape,
  602.  &PS(node)->treeContext);
  603. }
  604. }
  605. }
  606. while (tuples != NULL)
  607. {
  608. /* possible optimization by using count in tuples */
  609. tup = gettuple(&tuples, &fromtape, &PS(node)->treeContext);
  610. PUTTUP(node, tup, destfile);
  611. FREEMEM(node, tup->t_len);
  612. FREE(tup);
  613. TRACEMEM(merge);
  614. GETLEN(tuplen, PS(node)->Tape[fromtape].tp_file);
  615. if (tuplen == 0)
  616. ;
  617. else
  618. {
  619. tup = ALLOCTUP(tuplen);
  620. USEMEM(node, tuplen);
  621. TRACEMEM(merge);
  622. SETTUPLEN(tup, tuplen);
  623. GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_file);
  624. puttuple(&tuples, tup, fromtape, &PS(node)->treeContext);
  625. }
  626. }
  627. ENDRUN(destfile);
  628. }
  629. PS(node)->TotalDummy += outdummy;
  630. }
  631. /*
  632.  * dumptuples - stores all the tuples in tree into file
  633.  */
  634. static void
  635. dumptuples(BufFile *file, Sort *node)
  636. {
  637. struct leftist *tp;
  638. struct leftist *newp;
  639. struct leftist **treep = &PS(node)->Tuples;
  640. LeftistContext context = &PS(node)->treeContext;
  641. HeapTuple tup;
  642. Assert(PS(node)->using_tape_files);
  643. tp = *treep;
  644. while (tp != NULL)
  645. {
  646. tup = tp->lt_tuple;
  647. if (tp->lt_dist == 1) /* lt_right == NULL */
  648. newp = tp->lt_left;
  649. else
  650. newp = lmerge(tp->lt_left, tp->lt_right, context);
  651. pfree(tp);
  652. PUTTUP(node, tup, file);
  653. FREEMEM(node, tup->t_len);
  654. FREE(tup);
  655. tp = newp;
  656. }
  657. *treep = NULL;
  658. }
  659. /*
  660.  * psort_grabtuple - gets a tuple from the sorted file and returns it.
  661.  *   If there are no tuples left, returns NULL.
  662.  *   Should not call psort_end unless this has returned
  663.  *   a NULL indicating the last tuple has been processed.
  664.  */
  665. HeapTuple
  666. psort_grabtuple(Sort *node, bool *should_free)
  667. {
  668. HeapTuple tup;
  669. Assert(node != (Sort *) NULL);
  670. Assert(PS(node) != (Psortstate *) NULL);
  671. if (PS(node)->using_tape_files == true)
  672. {
  673. unsigned int tuplen;
  674. *should_free = true;
  675. if (ScanDirectionIsForward(node->plan.state->es_direction))
  676. {
  677. if (PS(node)->all_fetched)
  678. return NULL;
  679. if (GETLEN(tuplen, PS(node)->psort_grab_file) && tuplen != 0)
  680. {
  681. tup = ALLOCTUP(tuplen);
  682. SETTUPLEN(tup, tuplen);
  683. GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
  684. /* Update current merged sort file position */
  685. PS(node)->psort_current += tuplen + sizeof(tlendummy);
  686. return tup;
  687. }
  688. else
  689. {
  690. PS(node)->all_fetched = true;
  691. return NULL;
  692. }
  693. }
  694. /* Backward */
  695. if (PS(node)->psort_current <= sizeof(tlendummy))
  696. return NULL;
  697. /*
  698.  * if all tuples are fetched already then we return last tuple,
  699.  * else - tuple before last returned.
  700.  */
  701. if (PS(node)->all_fetched)
  702. {
  703. /*
  704.  * psort_current is pointing to the zero tuplen at the end of
  705.  * file
  706.  */
  707. BufFileSeek(PS(node)->psort_grab_file,
  708.   PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
  709. GETLEN(tuplen, PS(node)->psort_grab_file);
  710. if (PS(node)->psort_current < tuplen)
  711. elog(ERROR, "psort_grabtuple: too big last tuple len in backward scan");
  712. PS(node)->all_fetched = false;
  713. }
  714. else
  715. {
  716. /* move to position of end tlen of prev tuple */
  717. PS(node)->psort_current -= sizeof(tlendummy);
  718. BufFileSeek(PS(node)->psort_grab_file,
  719. PS(node)->psort_current, SEEK_SET);
  720. GETLEN(tuplen, PS(node)->psort_grab_file); /* get tlen of prev
  721.  * tuple */
  722. if (tuplen == 0)
  723. elog(ERROR, "psort_grabtuple: tuplen is 0 in backward scan");
  724. if (PS(node)->psort_current <= tuplen + sizeof(tlendummy))
  725. { /* prev tuple should be first one */
  726. if (PS(node)->psort_current != tuplen)
  727. elog(ERROR, "psort_grabtuple: first tuple expected in backward scan");
  728. PS(node)->psort_current = 0;
  729. BufFileSeek(PS(node)->psort_grab_file,
  730. PS(node)->psort_current, SEEK_SET);
  731. return NULL;
  732. }
  733. /*
  734.  * Get position of prev tuple. This tuple becomes current
  735.  * tuple now and we have to return previous one.
  736.  */
  737. PS(node)->psort_current -= tuplen;
  738. /* move to position of end tlen of prev tuple */
  739. BufFileSeek(PS(node)->psort_grab_file,
  740.   PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
  741. GETLEN(tuplen, PS(node)->psort_grab_file);
  742. if (PS(node)->psort_current < tuplen + sizeof(tlendummy))
  743. elog(ERROR, "psort_grabtuple: too big tuple len in backward scan");
  744. }
  745. /*
  746.  * move to prev (or last) tuple start position + sizeof(t_len)
  747.  */
  748. BufFileSeek(PS(node)->psort_grab_file,
  749. PS(node)->psort_current - tuplen, SEEK_SET);
  750. tup = ALLOCTUP(tuplen);
  751. SETTUPLEN(tup, tuplen);
  752. GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
  753. return tup; /* file position is equal to psort_current */
  754. }
  755. else
  756. {
  757. *should_free = false;
  758. if (ScanDirectionIsForward(node->plan.state->es_direction))
  759. {
  760. if (PS(node)->psort_current < PS(node)->tupcount)
  761. return PS(node)->memtuples[PS(node)->psort_current++];
  762. else
  763. {
  764. PS(node)->all_fetched = true;
  765. return NULL;
  766. }
  767. }
  768. /* Backward */
  769. if (PS(node)->psort_current <= 0)
  770. return NULL;
  771. /*
  772.  * if all tuples are fetched already then we return last tuple,
  773.  * else - tuple before last returned.
  774.  */
  775. if (PS(node)->all_fetched)
  776. PS(node)->all_fetched = false;
  777. else
  778. {
  779. PS(node)->psort_current--; /* last returned tuple */
  780. if (PS(node)->psort_current <= 0)
  781. return NULL;
  782. }
  783. return PS(node)->memtuples[PS(node)->psort_current - 1];
  784. }
  785. }
  786. /*
  787.  * psort_markpos - saves current position in the merged sort file
  788.  */
  789. void
  790. psort_markpos(Sort *node)
  791. {
  792. Assert(node != (Sort *) NULL);
  793. Assert(PS(node) != (Psortstate *) NULL);
  794. PS(node)->psort_saved = PS(node)->psort_current;
  795. }
  796. /*
  797.  * psort_restorepos- restores current position in merged sort file to
  798.  *   last saved position
  799.  */
  800. void
  801. psort_restorepos(Sort *node)
  802. {
  803. Assert(node != (Sort *) NULL);
  804. Assert(PS(node) != (Psortstate *) NULL);
  805. if (PS(node)->using_tape_files == true)
  806. BufFileSeek(PS(node)->psort_grab_file,
  807. PS(node)->psort_saved, SEEK_SET);
  808. PS(node)->psort_current = PS(node)->psort_saved;
  809. }
  810. /*
  811.  * psort_end - unlinks the tape files, and cleans up. Should not be
  812.  *   called unless psort_grabtuple has returned a NULL.
  813.  */
  814. void
  815. psort_end(Sort *node)
  816. {
  817. struct tape *tp;
  818. if (!node->cleaned)
  819. {
  820. /*
  821.  * I'm changing this because if we are sorting a relation with no
  822.  * tuples, psortstate is NULL.
  823.  */
  824. if (PS(node) != (Psortstate *) NULL)
  825. {
  826. if (PS(node)->using_tape_files == true)
  827. for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
  828. destroytape(tp->tp_file);
  829. else if (PS(node)->memtuples)
  830. pfree(PS(node)->memtuples);
  831. NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ);
  832. NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ);
  833. pfree((void *) node->psortstate);
  834. node->psortstate = NULL;
  835. node->cleaned = TRUE;
  836. }
  837. }
  838. }
  839. void
  840. psort_rescan(Sort *node)
  841. {
  842. /*
  843.  * If subnode is to be rescanned then free our previous results
  844.  */
  845. if (((Plan *) node)->lefttree->chgParam != NULL)
  846. {
  847. psort_end(node);
  848. node->cleaned = false;
  849. }
  850. else if (PS(node) != (Psortstate *) NULL)
  851. {
  852. PS(node)->all_fetched = false;
  853. PS(node)->psort_current = 0;
  854. PS(node)->psort_saved = 0;
  855. if (PS(node)->using_tape_files == true)
  856. rewind(PS(node)->psort_grab_file);
  857. }
  858. }
  859. /*
  860.  * gettape - returns an open stream for writing/reading
  861.  *
  862.  * Returns:
  863.  * Open stream for writing/reading.
  864.  * NULL if unable to open temporary file.
  865.  *
  866.  * There used to be a lot of cruft here to try to ensure that we destroyed
  867.  * all the tape files; but it didn't really work.  Now we rely on fd.c to
  868.  * clean up temp files if an error occurs.
  869.  */
  870. static BufFile *
  871. gettape()
  872. {
  873. File tfile;
  874. tfile = OpenTemporaryFile();
  875. Assert(tfile >= 0);
  876. return BufFileCreate(tfile);
  877. }
  878. /*
  879.  * destroytape - unlinks the tape
  880.  */
  881. static void
  882. destroytape(BufFile *file)
  883. {
  884. BufFileClose(file);
  885. }
  886. static int
  887. _psort_cmp(HeapTuple *ltup, HeapTuple *rtup)
  888. {
  889. Datum lattr,
  890. rattr;
  891. int nkey;
  892. int result = 0;
  893. bool isnull1,
  894. isnull2;
  895. for (nkey = 0; nkey < PsortNkeys && !result; nkey++)
  896. {
  897. lattr = heap_getattr(*ltup,
  898.  PsortKeys[nkey].sk_attno,
  899.  PsortTupDesc,
  900.  &isnull1);
  901. rattr = heap_getattr(*rtup,
  902.  PsortKeys[nkey].sk_attno,
  903.  PsortTupDesc,
  904.  &isnull2);
  905. if (isnull1)
  906. {
  907. if (!isnull2)
  908. result = 1;
  909. }
  910. else if (isnull2)
  911. result = -1;
  912. else if (PsortKeys[nkey].sk_flags & SK_COMMUTE)
  913. {
  914. if (!(result = -(long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (rattr, lattr)))
  915. result = (long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (lattr, rattr);
  916. }
  917. else if (!(result = -(long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (lattr, rattr)))
  918. result = (long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (rattr, lattr);
  919. }
  920. return result;
  921. }