psort.c
上传用户:blenddy
上传日期:2007-01-07
资源大小:6495k
文件大小:26k
- /*
- * psort.c
- * Polyphase merge sort.
- *
- * Copyright (c) 1994, Regents of the University of California
- *
- * $Id: psort.c,v 1.53.2.1 1999/08/02 05:25:18 scrappy Exp $
- *
- * NOTES
- * Sorts the first relation into the second relation.
- *
- * The old psort.c's routines formed a temporary relation from the merged
- * sort files. This version keeps the files around instead of generating the
- * relation from them, and provides interface functions to the file so that
- * you can grab tuples, mark a position in the file, restore a position in the
- * file. You must now explicitly call an interface function to end the sort,
- * psort_end, when you are done.
- * Now most of the global variables are stuck in the Sort nodes, and
- * accessed from there (they are passed to all the psort routines) so that
- * each sort running has its own separate state. This is facilitated by having
- * the Sort nodes passed in to all the interface functions.
- * The one global variable that all the sorts still share is SortMemory.
- * You should now be allowed to run two or more psorts concurrently,
- * so long as the memory they eat up is not greater than SORTMEM, the initial
- * value of SortMemory. -Rex 2.15.1995
- *
- * Use the tape-splitting method (Knuth, Vol. III, pp281-86) in the future.
- *
- * Arguments? Variables?
- * MAXMERGE, MAXTAPES
- *
- */
- #include <math.h>
- #include <sys/types.h>
- #include <unistd.h>
- #include "postgres.h"
- #include "access/heapam.h"
- #include "executor/execdebug.h"
- #include "executor/executor.h"
- #include "miscadmin.h"
- #include "utils/psort.h"
- static bool createfirstrun(Sort *node);
- static bool createrun(Sort *node, BufFile *file);
- static void destroytape(BufFile *file);
- static void dumptuples(BufFile *file, Sort *node);
- static BufFile *gettape(void);
- static void initialrun(Sort *node);
- static void inittapes(Sort *node);
- static void merge(Sort *node, struct tape * dest);
- static BufFile *mergeruns(Sort *node);
- static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);
- /*
- * tlenzero used to delimit runs; both vars below must have
- * the same size as HeapTuple->t_len
- */
- static unsigned int tlenzero = 0;
- static unsigned int tlendummy;
- /* these are used by _psort_cmp, and are set just before calling qsort() */
- static TupleDesc PsortTupDesc;
- static ScanKey PsortKeys;
- static int PsortNkeys;
- /*
- * old psort global variables
- *
- * (These are the global variables from the old psort. They are still used,
- * but are now accessed from Sort nodes using the PS macro. Note that while
- * these variables will be accessed by PS(node)->whatever, they will still
- * be called by their original names within the comments! -Rex 2.10.1995)
- *
- * LeftistContextData treeContext;
- *
- * static int TapeRange; number of tapes - 1 (T)
- * static int Level; (l)
- * static int TotalDummy; summation of tp_dummy
- * static struct tape *Tape;
- *
- * static int BytesRead; to keep track of # of IO
- * static int BytesWritten;
- *
- * struct leftist *Tuples; current tuples in memory
- *
- * BufFile *psort_grab_file; this holds tuples grabbed
- * from merged sort runs
- * long psort_current; current file position
- * long psort_saved; file position saved for
- * mark and restore
- */
- /*
- * PS - Macro to access and cast psortstate from a Sort node
- */
- #define PS(N) ((Psortstate *)N->psortstate)
- /*
- * psort_begin - polyphase merge sort entry point. Sorts the subplan
- * into a temporary file psort_grab_file. After
- * this is called, calling the interface function
- * psort_grabtuple iteratively will get you the sorted
- * tuples. psort_end then finishes the sort off, after
- * all the tuples have been grabbed.
- *
- * Allocates and initializes sort node's psort state.
- */
- bool
- psort_begin(Sort *node, int nkeys, ScanKey key)
- {
- node->psortstate = (struct Psortstate *) palloc(sizeof(struct Psortstate));
- AssertArg(nkeys >= 1);
- AssertArg(key[0].sk_attno != 0);
- AssertArg(key[0].sk_procedure != 0);
- PS(node)->BytesRead = 0;
- PS(node)->BytesWritten = 0;
- PS(node)->treeContext.tupDesc = ExecGetTupType(outerPlan((Plan *) node));
- PS(node)->treeContext.nKeys = nkeys;
- PS(node)->treeContext.scanKeys = key;
- PS(node)->treeContext.sortMem = SortMem * 1024;
- PS(node)->Tuples = NULL;
- PS(node)->tupcount = 0;
- PS(node)->using_tape_files = false;
- PS(node)->all_fetched = false;
- PS(node)->psort_grab_file = NULL;
- PS(node)->memtuples = NULL;
- initialrun(node);
- if (PS(node)->tupcount == 0)
- return false;
- if (PS(node)->using_tape_files && PS(node)->psort_grab_file == NULL)
- PS(node)->psort_grab_file = mergeruns(node);
- PS(node)->psort_current = 0;
- PS(node)->psort_saved = 0;
- return true;
- }
- /*
- * inittapes - initializes the tapes
- * - (polyphase merge Alg.D(D1)--Knuth, Vol.3, p.270)
- * Returns:
- * number of allocated tapes
- */
- static void
- inittapes(Sort *node)
- {
- int i;
- struct tape *tp;
- Assert(node != (Sort *) NULL);
- Assert(PS(node) != (Psortstate *) NULL);
- /*
- * ASSERT(ntapes >= 3 && ntapes <= MAXTAPES, "inittapes: Invalid
- * number of tapes to initialize.n");
- */
- tp = PS(node)->Tape;
- for (i = 0; i < MAXTAPES && (tp->tp_file = gettape()) != NULL; i++)
- {
- tp->tp_dummy = 1;
- tp->tp_fib = 1;
- tp->tp_prev = tp - 1;
- tp++;
- }
- PS(node)->TapeRange = --tp - PS(node)->Tape;
- tp->tp_dummy = 0;
- tp->tp_fib = 0;
- PS(node)->Tape[0].tp_prev = tp;
- if (PS(node)->TapeRange <= 1)
- elog(ERROR, "inittapes: Could only allocate %d < 3 tapesn",
- PS(node)->TapeRange + 1);
- PS(node)->Level = 1;
- PS(node)->TotalDummy = PS(node)->TapeRange;
- PS(node)->using_tape_files = true;
- }
- /*
- * PUTTUP - writes the next tuple
- * ENDRUN - mark end of run
- * GETLEN - reads the length of the next tuple
- * ALLOCTUP - returns space for the new tuple
- * SETTUPLEN - stores the length into the tuple
- * GETTUP - reads the tuple
- *
- * Note:
- * LEN field must be as HeapTuple->t_len; FP is a stream
- */
- #define PUTTUP(NODE, TUP, FP)
- (
- (TUP)->t_len += HEAPTUPLESIZE,
- ((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len,
- BufFileWrite(FP, (char *)TUP, (TUP)->t_len),
- BufFileWrite(FP, (char *)&((TUP)->t_len), sizeof(tlendummy)),
- (TUP)->t_len -= HEAPTUPLESIZE
- )
- #define ENDRUN(FP) BufFileWrite(FP, (char *)&tlenzero, sizeof(tlenzero))
- #define GETLEN(LEN, FP) BufFileRead(FP, (char *)&(LEN), sizeof(tlenzero))
- #define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))
- #define FREE(x) pfree((char *) x)
- #define GETTUP(NODE, TUP, LEN, FP)
- (
- IncrProcessed(),
- ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof(tlenzero),
- BufFileRead(FP, (char *)(TUP) + sizeof(tlenzero), (LEN) - sizeof(tlenzero)),
- (TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE),
- BufFileRead(FP, (char *)&tlendummy, sizeof(tlendummy))
- )
- #define SETTUPLEN(TUP, LEN) ((TUP)->t_len = (LEN) - HEAPTUPLESIZE)
- #define rewind(FP) BufFileSeek(FP, 0L, SEEK_SET)
- /*
- * USEMEM - record use of memory FREEMEM - record
- * freeing of memory FULLMEM - 1 iff a tuple will fit
- */
- #define USEMEM(NODE,AMT) PS(node)->treeContext.sortMem -= (AMT)
- #define FREEMEM(NODE,AMT) PS(node)->treeContext.sortMem += (AMT)
- #define LACKMEM(NODE) (PS(node)->treeContext.sortMem <= BLCKSZ) /* not accurate */
- #define TRACEMEM(FUNC)
- #define TRACEOUT(FUNC, TUP)
- /*
- * initialrun - distributes tuples from the relation
- * - (replacement selection(R2-R3)--Knuth, Vol.3, p.257)
- * - (polyphase merge Alg.D(D2-D4)--Knuth, Vol.3, p.271)
- *
- * Explanation:
- * Tuples are distributed to the tapes as in Algorithm D.
- * A "tuple" with t_size == 0 is used to mark the end of a run.
- *
- * Note:
- * The replacement selection algorithm has been modified
- * to go from R1 directly to R3 skipping R2 the first time.
- *
- * Maybe should use closer(rdesc) before return
- * Perhaps should adjust the number of tapes if less than n.
- * used--v. likely to have problems in mergeruns().
- * Must know if should open/close files before each
- * call to psort()? If should--messy??
- *
- * Possible optimization:
- * put the first xxx runs in quickly--problem here since
- * I (perhaps prematurely) combined the 2 algorithms.
- * Also, perhaps allocate tapes when needed. Split into 2 funcs.
- */
- static void
- initialrun(Sort *node)
- {
- /* struct tuple *tup; */
- struct tape *tp;
- int baseruns; /* D:(a) */
- int extrapasses; /* EOF */
- Assert(node != (Sort *) NULL);
- Assert(PS(node) != (Psortstate *) NULL);
- tp = PS(node)->Tape;
- if (createfirstrun(node))
- {
- Assert(PS(node)->using_tape_files);
- extrapasses = 0;
- }
- else
- /* all tuples fetched */
- {
- if (!PS(node)->using_tape_files) /* empty or sorted in
- * memory */
- return;
- /*
- * if PS(node)->Tuples == NULL then we have single (sorted) run
- * which can be used as result grab file! So, we may avoid
- * mergeruns - it will just copy this run to new file.
- */
- if (PS(node)->Tuples == NULL)
- {
- PS(node)->psort_grab_file = PS(node)->Tape->tp_file;
- rewind(PS(node)->psort_grab_file);
- return;
- }
- extrapasses = 2;
- }
- for (;;)
- {
- tp->tp_dummy--;
- PS(node)->TotalDummy--;
- if (tp->tp_dummy < (tp + 1)->tp_dummy)
- tp++;
- else
- {
- if (tp->tp_dummy != 0)
- tp = PS(node)->Tape;
- else
- {
- PS(node)->Level++;
- baseruns = PS(node)->Tape[0].tp_fib;
- for (tp = PS(node)->Tape;
- tp - PS(node)->Tape < PS(node)->TapeRange; tp++)
- {
- PS(node)->TotalDummy += (tp->tp_dummy = baseruns
- + (tp + 1)->tp_fib
- - tp->tp_fib);
- tp->tp_fib = baseruns
- + (tp + 1)->tp_fib;
- }
- tp = PS(node)->Tape; /* D4 */
- } /* D3 */
- }
- if (extrapasses)
- {
- if (--extrapasses)
- {
- dumptuples(tp->tp_file, node);
- ENDRUN(tp->tp_file);
- continue;
- }
- else
- break;
- }
- if ((bool) createrun(node, tp->tp_file) == false)
- extrapasses = 1 + (PS(node)->Tuples != NULL);
- /* D2 */
- }
- for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
- rewind(tp->tp_file); /* D. */
- }
- /*
- * createfirstrun - tries to sort tuples in memory using qsort
- * until LACKMEM; if not enough memory then switches
- * to tape method
- *
- * Returns:
- * FALSE iff process through end of relation
- * Tuples contains the tuples for the following run upon exit
- */
- static bool
- createfirstrun(Sort *node)
- {
- HeapTuple tup;
- bool foundeor = false;
- HeapTuple *memtuples;
- int t_last = -1;
- int t_free = 1000;
- TupleTableSlot *cr_slot;
- Assert(node != (Sort *) NULL);
- Assert(PS(node) != (Psortstate *) NULL);
- Assert(!PS(node)->using_tape_files);
- Assert(PS(node)->memtuples == NULL);
- Assert(PS(node)->tupcount == 0);
- if (LACKMEM(node))
- elog(ERROR, "psort: LACKMEM in createfirstrun");
- memtuples = palloc(t_free * sizeof(HeapTuple));
- for (;;)
- {
- if (LACKMEM(node))
- break;
- /*
- * About to call ExecProcNode, it can mess up the state if it
- * eventually calls another Sort node. So must stow it away here
- * for the meantime. -Rex
- * 2.2.1995
- */
- cr_slot = ExecProcNode(outerPlan((Plan *) node), (Plan *) node);
- if (TupIsNull(cr_slot))
- {
- foundeor = true;
- break;
- }
- tup = heap_copytuple(cr_slot->val);
- ExecClearTuple(cr_slot);
- IncrProcessed();
- USEMEM(node, tup->t_len);
- TRACEMEM(createfirstrun);
- if (t_free <= 0)
- {
- t_free = 1000;
- memtuples = repalloc(memtuples,
- (t_last + t_free + 1) * sizeof(HeapTuple));
- }
- t_last++;
- t_free--;
- memtuples[t_last] = tup;
- }
- if (t_last < 0) /* empty */
- {
- Assert(foundeor);
- pfree(memtuples);
- return false;
- }
- t_last++;
- PS(node)->tupcount = t_last;
- PsortTupDesc = PS(node)->treeContext.tupDesc;
- PsortKeys = PS(node)->treeContext.scanKeys;
- PsortNkeys = PS(node)->treeContext.nKeys;
- qsort(memtuples, t_last, sizeof(HeapTuple),
- (int (*) (const void *, const void *)) _psort_cmp);
- if (LACKMEM(node)) /* in-memory sort is impossible */
- {
- int t;
- Assert(!foundeor);
- inittapes(node);
- /* put tuples into leftist tree for createrun */
- for (t = t_last - 1; t >= 0; t--)
- puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext);
- pfree(memtuples);
- foundeor = !createrun(node, PS(node)->Tape->tp_file);
- }
- else
- {
- Assert(foundeor);
- PS(node)->memtuples = memtuples;
- }
- return !foundeor;
- }
- /*
- * createrun - places the next run on file, grabbing the tuples by
- * executing the subplan passed in
- *
- * Uses:
- * Tuples, which should contain any tuples for this run
- *
- * Returns:
- * FALSE iff process through end of relation
- * Tuples contains the tuples for the following run upon exit
- */
- static bool
- createrun(Sort *node, BufFile *file)
- {
- HeapTuple lasttuple;
- HeapTuple tup;
- TupleTableSlot *cr_slot;
- HeapTuple *memtuples;
- int t_last = -1;
- int t_free = 1000;
- bool foundeor = false;
- short junk;
- Assert(node != (Sort *) NULL);
- Assert(PS(node) != (Psortstate *) NULL);
- Assert(PS(node)->using_tape_files);
- lasttuple = NULL;
- memtuples = palloc(t_free * sizeof(HeapTuple));
- for (;;)
- {
- while (LACKMEM(node) && PS(node)->Tuples != NULL)
- {
- if (lasttuple != NULL)
- {
- FREEMEM(node, lasttuple->t_len);
- FREE(lasttuple);
- TRACEMEM(createrun);
- }
- lasttuple = gettuple(&PS(node)->Tuples, &junk,
- &PS(node)->treeContext);
- PUTTUP(node, lasttuple, file);
- TRACEOUT(createrun, lasttuple);
- }
- if (LACKMEM(node))
- break;
- /*
- * About to call ExecProcNode, it can mess up the state if it
- * eventually calls another Sort node. So must stow it away here
- * for the meantime. -Rex
- * 2.2.1995
- */
- cr_slot = ExecProcNode(outerPlan((Plan *) node), (Plan *) node);
- if (TupIsNull(cr_slot))
- {
- foundeor = true;
- break;
- }
- else
- {
- tup = heap_copytuple(cr_slot->val);
- ExecClearTuple(cr_slot);
- PS(node)->tupcount++;
- }
- IncrProcessed();
- USEMEM(node, tup->t_len);
- TRACEMEM(createrun);
- if (lasttuple != NULL && tuplecmp(tup, lasttuple,
- &PS(node)->treeContext))
- {
- if (t_free <= 0)
- {
- t_free = 1000;
- memtuples = repalloc(memtuples,
- (t_last + t_free + 1) * sizeof(HeapTuple));
- }
- t_last++;
- t_free--;
- memtuples[t_last] = tup;
- }
- else
- puttuple(&PS(node)->Tuples, tup, 0, &PS(node)->treeContext);
- }
- if (lasttuple != NULL)
- {
- FREEMEM(node, lasttuple->t_len);
- FREE(lasttuple);
- TRACEMEM(createrun);
- }
- dumptuples(file, node);
- ENDRUN(file); /* delimit the end of the run */
- t_last++;
- /* put tuples for the next run into leftist tree */
- if (t_last >= 1)
- {
- int t;
- PsortTupDesc = PS(node)->treeContext.tupDesc;
- PsortKeys = PS(node)->treeContext.scanKeys;
- PsortNkeys = PS(node)->treeContext.nKeys;
- qsort(memtuples, t_last, sizeof(HeapTuple),
- (int (*) (const void *, const void *)) _psort_cmp);
- for (t = t_last - 1; t >= 0; t--)
- puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext);
- }
- pfree(memtuples);
- return !foundeor;
- }
- /*
- * mergeruns - merges all runs from input tapes
- * (polyphase merge Alg.D(D6)--Knuth, Vol.3, p271)
- *
- * Returns:
- * file of tuples in order
- */
- static BufFile *
- mergeruns(Sort *node)
- {
- struct tape *tp;
- Assert(node != (Sort *) NULL);
- Assert(PS(node) != (Psortstate *) NULL);
- Assert(PS(node)->using_tape_files == true);
- tp = PS(node)->Tape + PS(node)->TapeRange;
- merge(node, tp);
- rewind(tp->tp_file);
- while (--PS(node)->Level != 0)
- {
- tp = tp->tp_prev;
- rewind(tp->tp_file);
- merge(node, tp);
- rewind(tp->tp_file);
- }
- return tp->tp_file;
- }
- /*
- * merge - handles a single merge of the tape
- * (polyphase merge Alg.D(D5)--Knuth, Vol.3, p271)
- */
- static void
- merge(Sort *node, struct tape * dest)
- {
- HeapTuple tup;
- struct tape *lasttp; /* (TAPE[P]) */
- struct tape *tp;
- struct leftist *tuples;
- BufFile *destfile;
- int times; /* runs left to merge */
- int outdummy; /* complete dummy runs */
- short fromtape;
- unsigned int tuplen;
- Assert(node != (Sort *) NULL);
- Assert(PS(node) != (Psortstate *) NULL);
- Assert(PS(node)->using_tape_files == true);
- lasttp = dest->tp_prev;
- times = lasttp->tp_fib;
- for (tp = lasttp; tp != dest; tp = tp->tp_prev)
- tp->tp_fib -= times;
- tp->tp_fib += times;
- /* Tape[].tp_fib (A[]) is set to proper exit values */
- if (PS(node)->TotalDummy < PS(node)->TapeRange) /* no complete dummy
- * runs */
- outdummy = 0;
- else
- {
- outdummy = PS(node)->TotalDummy; /* a large positive number */
- for (tp = lasttp; tp != dest; tp = tp->tp_prev)
- if (outdummy > tp->tp_dummy)
- outdummy = tp->tp_dummy;
- for (tp = lasttp; tp != dest; tp = tp->tp_prev)
- tp->tp_dummy -= outdummy;
- tp->tp_dummy += outdummy;
- PS(node)->TotalDummy -= outdummy * PS(node)->TapeRange;
- /* do not add the outdummy runs yet */
- times -= outdummy;
- }
- destfile = dest->tp_file;
- while (times-- != 0)
- { /* merge one run */
- tuples = NULL;
- if (PS(node)->TotalDummy == 0)
- for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev)
- {
- GETLEN(tuplen, tp->tp_file);
- tup = ALLOCTUP(tuplen);
- USEMEM(node, tuplen);
- TRACEMEM(merge);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, tp->tp_file);
- puttuple(&tuples, tup, tp - PS(node)->Tape,
- &PS(node)->treeContext);
- }
- else
- {
- for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev)
- {
- if (tp->tp_dummy != 0)
- {
- tp->tp_dummy--;
- PS(node)->TotalDummy--;
- }
- else
- {
- GETLEN(tuplen, tp->tp_file);
- tup = ALLOCTUP(tuplen);
- USEMEM(node, tuplen);
- TRACEMEM(merge);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, tp->tp_file);
- puttuple(&tuples, tup, tp - PS(node)->Tape,
- &PS(node)->treeContext);
- }
- }
- }
- while (tuples != NULL)
- {
- /* possible optimization by using count in tuples */
- tup = gettuple(&tuples, &fromtape, &PS(node)->treeContext);
- PUTTUP(node, tup, destfile);
- FREEMEM(node, tup->t_len);
- FREE(tup);
- TRACEMEM(merge);
- GETLEN(tuplen, PS(node)->Tape[fromtape].tp_file);
- if (tuplen == 0)
- ;
- else
- {
- tup = ALLOCTUP(tuplen);
- USEMEM(node, tuplen);
- TRACEMEM(merge);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_file);
- puttuple(&tuples, tup, fromtape, &PS(node)->treeContext);
- }
- }
- ENDRUN(destfile);
- }
- PS(node)->TotalDummy += outdummy;
- }
- /*
- * dumptuples - stores all the tuples in tree into file
- */
- static void
- dumptuples(BufFile *file, Sort *node)
- {
- struct leftist *tp;
- struct leftist *newp;
- struct leftist **treep = &PS(node)->Tuples;
- LeftistContext context = &PS(node)->treeContext;
- HeapTuple tup;
- Assert(PS(node)->using_tape_files);
- tp = *treep;
- while (tp != NULL)
- {
- tup = tp->lt_tuple;
- if (tp->lt_dist == 1) /* lt_right == NULL */
- newp = tp->lt_left;
- else
- newp = lmerge(tp->lt_left, tp->lt_right, context);
- pfree(tp);
- PUTTUP(node, tup, file);
- FREEMEM(node, tup->t_len);
- FREE(tup);
- tp = newp;
- }
- *treep = NULL;
- }
- /*
- * psort_grabtuple - gets a tuple from the sorted file and returns it.
- * If there are no tuples left, returns NULL.
- * Should not call psort_end unless this has returned
- * a NULL indicating the last tuple has been processed.
- */
- HeapTuple
- psort_grabtuple(Sort *node, bool *should_free)
- {
- HeapTuple tup;
- Assert(node != (Sort *) NULL);
- Assert(PS(node) != (Psortstate *) NULL);
- if (PS(node)->using_tape_files == true)
- {
- unsigned int tuplen;
- *should_free = true;
- if (ScanDirectionIsForward(node->plan.state->es_direction))
- {
- if (PS(node)->all_fetched)
- return NULL;
- if (GETLEN(tuplen, PS(node)->psort_grab_file) && tuplen != 0)
- {
- tup = ALLOCTUP(tuplen);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
- /* Update current merged sort file position */
- PS(node)->psort_current += tuplen + sizeof(tlendummy);
- return tup;
- }
- else
- {
- PS(node)->all_fetched = true;
- return NULL;
- }
- }
- /* Backward */
- if (PS(node)->psort_current <= sizeof(tlendummy))
- return NULL;
- /*
- * if all tuples are fetched already then we return last tuple,
- * else - tuple before last returned.
- */
- if (PS(node)->all_fetched)
- {
- /*
- * psort_current is pointing to the zero tuplen at the end of
- * file
- */
- BufFileSeek(PS(node)->psort_grab_file,
- PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
- GETLEN(tuplen, PS(node)->psort_grab_file);
- if (PS(node)->psort_current < tuplen)
- elog(ERROR, "psort_grabtuple: too big last tuple len in backward scan");
- PS(node)->all_fetched = false;
- }
- else
- {
- /* move to position of end tlen of prev tuple */
- PS(node)->psort_current -= sizeof(tlendummy);
- BufFileSeek(PS(node)->psort_grab_file,
- PS(node)->psort_current, SEEK_SET);
- GETLEN(tuplen, PS(node)->psort_grab_file); /* get tlen of prev
- * tuple */
- if (tuplen == 0)
- elog(ERROR, "psort_grabtuple: tuplen is 0 in backward scan");
- if (PS(node)->psort_current <= tuplen + sizeof(tlendummy))
- { /* prev tuple should be first one */
- if (PS(node)->psort_current != tuplen)
- elog(ERROR, "psort_grabtuple: first tuple expected in backward scan");
- PS(node)->psort_current = 0;
- BufFileSeek(PS(node)->psort_grab_file,
- PS(node)->psort_current, SEEK_SET);
- return NULL;
- }
- /*
- * Get position of prev tuple. This tuple becomes current
- * tuple now and we have to return previous one.
- */
- PS(node)->psort_current -= tuplen;
- /* move to position of end tlen of prev tuple */
- BufFileSeek(PS(node)->psort_grab_file,
- PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
- GETLEN(tuplen, PS(node)->psort_grab_file);
- if (PS(node)->psort_current < tuplen + sizeof(tlendummy))
- elog(ERROR, "psort_grabtuple: too big tuple len in backward scan");
- }
- /*
- * move to prev (or last) tuple start position + sizeof(t_len)
- */
- BufFileSeek(PS(node)->psort_grab_file,
- PS(node)->psort_current - tuplen, SEEK_SET);
- tup = ALLOCTUP(tuplen);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
- return tup; /* file position is equal to psort_current */
- }
- else
- {
- *should_free = false;
- if (ScanDirectionIsForward(node->plan.state->es_direction))
- {
- if (PS(node)->psort_current < PS(node)->tupcount)
- return PS(node)->memtuples[PS(node)->psort_current++];
- else
- {
- PS(node)->all_fetched = true;
- return NULL;
- }
- }
- /* Backward */
- if (PS(node)->psort_current <= 0)
- return NULL;
- /*
- * if all tuples are fetched already then we return last tuple,
- * else - tuple before last returned.
- */
- if (PS(node)->all_fetched)
- PS(node)->all_fetched = false;
- else
- {
- PS(node)->psort_current--; /* last returned tuple */
- if (PS(node)->psort_current <= 0)
- return NULL;
- }
- return PS(node)->memtuples[PS(node)->psort_current - 1];
- }
- }
- /*
- * psort_markpos - saves current position in the merged sort file
- */
- void
- psort_markpos(Sort *node)
- {
- Assert(node != (Sort *) NULL);
- Assert(PS(node) != (Psortstate *) NULL);
- PS(node)->psort_saved = PS(node)->psort_current;
- }
- /*
- * psort_restorepos- restores current position in merged sort file to
- * last saved position
- */
- void
- psort_restorepos(Sort *node)
- {
- Assert(node != (Sort *) NULL);
- Assert(PS(node) != (Psortstate *) NULL);
- if (PS(node)->using_tape_files == true)
- BufFileSeek(PS(node)->psort_grab_file,
- PS(node)->psort_saved, SEEK_SET);
- PS(node)->psort_current = PS(node)->psort_saved;
- }
- /*
- * psort_end - unlinks the tape files, and cleans up. Should not be
- * called unless psort_grabtuple has returned a NULL.
- */
- void
- psort_end(Sort *node)
- {
- struct tape *tp;
- if (!node->cleaned)
- {
- /*
- * I'm changing this because if we are sorting a relation with no
- * tuples, psortstate is NULL.
- */
- if (PS(node) != (Psortstate *) NULL)
- {
- if (PS(node)->using_tape_files == true)
- for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
- destroytape(tp->tp_file);
- else if (PS(node)->memtuples)
- pfree(PS(node)->memtuples);
- NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ);
- NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ);
- pfree((void *) node->psortstate);
- node->psortstate = NULL;
- node->cleaned = TRUE;
- }
- }
- }
- void
- psort_rescan(Sort *node)
- {
- /*
- * If subnode is to be rescanned then free our previous results
- */
- if (((Plan *) node)->lefttree->chgParam != NULL)
- {
- psort_end(node);
- node->cleaned = false;
- }
- else if (PS(node) != (Psortstate *) NULL)
- {
- PS(node)->all_fetched = false;
- PS(node)->psort_current = 0;
- PS(node)->psort_saved = 0;
- if (PS(node)->using_tape_files == true)
- rewind(PS(node)->psort_grab_file);
- }
- }
- /*
- * gettape - returns an open stream for writing/reading
- *
- * Returns:
- * Open stream for writing/reading.
- * NULL if unable to open temporary file.
- *
- * There used to be a lot of cruft here to try to ensure that we destroyed
- * all the tape files; but it didn't really work. Now we rely on fd.c to
- * clean up temp files if an error occurs.
- */
- static BufFile *
- gettape()
- {
- File tfile;
- tfile = OpenTemporaryFile();
- Assert(tfile >= 0);
- return BufFileCreate(tfile);
- }
- /*
- * destroytape - unlinks the tape
- */
- static void
- destroytape(BufFile *file)
- {
- BufFileClose(file);
- }
- static int
- _psort_cmp(HeapTuple *ltup, HeapTuple *rtup)
- {
- Datum lattr,
- rattr;
- int nkey;
- int result = 0;
- bool isnull1,
- isnull2;
- for (nkey = 0; nkey < PsortNkeys && !result; nkey++)
- {
- lattr = heap_getattr(*ltup,
- PsortKeys[nkey].sk_attno,
- PsortTupDesc,
- &isnull1);
- rattr = heap_getattr(*rtup,
- PsortKeys[nkey].sk_attno,
- PsortTupDesc,
- &isnull2);
- if (isnull1)
- {
- if (!isnull2)
- result = 1;
- }
- else if (isnull2)
- result = -1;
- else if (PsortKeys[nkey].sk_flags & SK_COMMUTE)
- {
- if (!(result = -(long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (rattr, lattr)))
- result = (long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (lattr, rattr);
- }
- else if (!(result = -(long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (lattr, rattr)))
- result = (long) (*fmgr_faddr(&PsortKeys[nkey].sk_func)) (rattr, lattr);
- }
- return result;
- }