nodeAgg.c
上传用户:blenddy
上传日期:2007-01-07
资源大小:6495k
文件大小:17k
源码类别:

数据库系统

开发平台:

Unix_Linux

  1. /*-------------------------------------------------------------------------
  2.  *
  3.  * nodeAgg.c
  4.  *   Routines to handle aggregate nodes.
  5.  *
  6.  * Copyright (c) 1994, Regents of the University of California
  7.  *
  8.  *
  9.  * NOTE
  10.  *   The implementation of Agg node has been reworked to handle legal
  11.  *   SQL aggregates. (Do not expect POSTQUEL semantics.)  -- ay 2/95
  12.  *
  13.  * IDENTIFICATION
  14.  *   /usr/local/devel/pglite/cvs/src/backend/executor/nodeAgg.c,v 1.13 1995/08/01 20:19:07 jolly Exp
  15.  *
  16.  *-------------------------------------------------------------------------
  17.  */
  18. #include <string.h>
  19. #include "postgres.h"
  20. #include "fmgr.h"
  21. #include "access/heapam.h"
  22. #include "catalog/pg_aggregate.h"
  23. #include "catalog/catalog.h"
  24. #include "parser/parse_type.h"
  25. #include "executor/executor.h"
  26. #include "executor/nodeAgg.h"
  27. #include "storage/bufmgr.h"
  28. #include "utils/palloc.h"
  29. #include "utils/syscache.h"
  30. #include "optimizer/clauses.h"
  31. /*
  32.  * AggFuncInfo -
  33.  *   keeps the transition functions information around
  34.  */
  35. typedef struct AggFuncInfo
  36. {
  37. Oid xfn1_oid;
  38. Oid xfn2_oid;
  39. Oid finalfn_oid;
  40. FmgrInfo xfn1;
  41. FmgrInfo xfn2;
  42. FmgrInfo finalfn;
  43. } AggFuncInfo;
  44. static Datum aggGetAttr(TupleTableSlot *tuple, Aggref *aggref, bool *isNull);
  45. /* ---------------------------------------
  46.  *
  47.  * ExecAgg -
  48.  *
  49.  *   ExecAgg receives tuples from its outer subplan and aggregates over
  50.  *   the appropriate attribute for each (unique) aggregate in the target
  51.  *   list. (The number of tuples to aggregate over depends on whether a
  52.  *   GROUP BY clause is present. It might be the number of tuples in a
  53.  *   group or all the tuples that satisfy the qualifications.) The value of
  54.  *   each aggregate is stored in the expression context for ExecProject to
  55.  *   evaluate the result tuple.
  56.  *
  57.  *   ExecAgg evaluates each aggregate in the following steps: (initcond1,
  58.  *   initcond2 are the initial values and sfunc1, sfunc2, and finalfunc are
  59.  *   the transition functions.)
  60.  *
  61.  *  value1[i] = initcond1
  62.  *  value2[i] = initcond2
  63.  *  forall tuples do
  64.  * value1[i] = sfunc1(value1[i], aggregated_value)
  65.  * value2[i] = sfunc2(value2[i])
  66.  *  value1[i] = finalfunc(value1[i], value2[i])
  67.  *
  68.  *   If initcond1 is NULL then the first non-NULL aggregated_value is
  69.  *   assigned directly to value1[i].  sfunc1 isn't applied until value1[i]
  70.  *   is non-NULL.
  71.  *
  72.  *   If the outer subplan is a Group node, ExecAgg returns as many tuples
  73.  *   as there are groups.
  74.  *
  75.  *   XXX handling of NULL doesn't work
  76.  *
  77.  *   OLD COMMENTS
  78.  *
  79.  * XXX Aggregates should probably have another option: what to do
  80.  * with transfn2 if we hit a null value.  "count" (transfn1 = null,
  81.  * transfn2 = increment) will want to have transfn2 called; "avg"
  82.  * (transfn1 = add, transfn2 = increment) will not. -pma 1/3/93
  83.  *
  84.  * ------------------------------------------
  85.  */
  86. TupleTableSlot *
  87. ExecAgg(Agg *node)
  88. {
  89. AggState   *aggstate;
  90. EState    *estate;
  91. Plan    *outerPlan;
  92. int aggno,
  93. nagg;
  94. Datum    *value1,
  95.    *value2;
  96. int    *noInitValue;
  97. AggFuncInfo *aggFuncInfo;
  98. long nTuplesAgged = 0;
  99. ExprContext *econtext;
  100. ProjectionInfo *projInfo;
  101. TupleTableSlot *resultSlot;
  102. HeapTuple oneTuple;
  103. List    *alist;
  104. char    *nulls;
  105. bool isDone;
  106. bool isNull = FALSE,
  107. isNull1 = FALSE,
  108. isNull2 = FALSE;
  109. bool qual_result;
  110. /* ---------------------
  111.  * get state info from node
  112.  * ---------------------
  113.  */
  114. /*
  115.  * We loop retrieving groups until we find one matching
  116.  * node->plan.qual
  117.  */
  118. do
  119. {
  120. aggstate = node->aggstate;
  121. if (aggstate->agg_done)
  122. return NULL;
  123. estate = node->plan.state;
  124. econtext = aggstate->csstate.cstate.cs_ExprContext;
  125. nagg = length(node->aggs);
  126. value1 = node->aggstate->csstate.cstate.cs_ExprContext->ecxt_values;
  127. nulls = node->aggstate->csstate.cstate.cs_ExprContext->ecxt_nulls;
  128. value2 = (Datum *) palloc(sizeof(Datum) * nagg);
  129. MemSet(value2, 0, sizeof(Datum) * nagg);
  130. aggFuncInfo = (AggFuncInfo *) palloc(sizeof(AggFuncInfo) * nagg);
  131. MemSet(aggFuncInfo, 0, sizeof(AggFuncInfo) * nagg);
  132. noInitValue = (int *) palloc(sizeof(int) * nagg);
  133. MemSet(noInitValue, 0, sizeof(int) * nagg);
  134. outerPlan = outerPlan(node);
  135. oneTuple = NULL;
  136. projInfo = aggstate->csstate.cstate.cs_ProjInfo;
  137. aggno = -1;
  138. foreach(alist, node->aggs)
  139. {
  140. Aggref    *aggref = lfirst(alist);
  141. char    *aggname;
  142. HeapTuple aggTuple;
  143. Form_pg_aggregate aggp;
  144. Oid xfn1_oid,
  145. xfn2_oid,
  146. finalfn_oid;
  147. aggref->aggno = ++aggno;
  148. /* ---------------------
  149.  * find transfer functions of all the aggregates and initialize
  150.  * their initial values
  151.  * ---------------------
  152.  */
  153. aggname = aggref->aggname;
  154. aggTuple = SearchSysCacheTuple(AGGNAME,
  155.    PointerGetDatum(aggname),
  156.   ObjectIdGetDatum(aggref->basetype),
  157.    0, 0);
  158. if (!HeapTupleIsValid(aggTuple))
  159. elog(ERROR, "ExecAgg: cache lookup failed for aggregate "%s"(%s)",
  160.  aggname,
  161.  typeidTypeName(aggref->basetype));
  162. aggp = (Form_pg_aggregate) GETSTRUCT(aggTuple);
  163. xfn1_oid = aggp->aggtransfn1;
  164. xfn2_oid = aggp->aggtransfn2;
  165. finalfn_oid = aggp->aggfinalfn;
  166. if (OidIsValid(finalfn_oid))
  167. {
  168. fmgr_info(finalfn_oid, &aggFuncInfo[aggno].finalfn);
  169. aggFuncInfo[aggno].finalfn_oid = finalfn_oid;
  170. }
  171. if (OidIsValid(xfn2_oid))
  172. {
  173. fmgr_info(xfn2_oid, &aggFuncInfo[aggno].xfn2);
  174. aggFuncInfo[aggno].xfn2_oid = xfn2_oid;
  175. value2[aggno] = (Datum) AggNameGetInitVal((char *) aggname,
  176.    aggp->aggbasetype,
  177.   2,
  178.   &isNull2);
  179. /* ------------------------------------------
  180.  * If there is a second transition function, its initial
  181.  * value must exist -- as it does not depend on data values,
  182.  * we have no other way of determining an initial value.
  183.  * ------------------------------------------
  184.  */
  185. if (isNull2)
  186. elog(ERROR, "ExecAgg: agginitval2 is null");
  187. }
  188. if (OidIsValid(xfn1_oid))
  189. {
  190. fmgr_info(xfn1_oid, &aggFuncInfo[aggno].xfn1);
  191. aggFuncInfo[aggno].xfn1_oid = xfn1_oid;
  192. value1[aggno] = (Datum) AggNameGetInitVal((char *) aggname,
  193.    aggp->aggbasetype,
  194.   1,
  195.   &isNull1);
  196. /* ------------------------------------------
  197.  * If the initial value for the first transition function
  198.  * doesn't exist in the pg_aggregate table then we let
  199.  * the first value returned from the outer procNode become
  200.  * the initial value. (This is useful for aggregates like
  201.  * max{} and min{}.)
  202.  * ------------------------------------------
  203.  */
  204. if (isNull1)
  205. {
  206. noInitValue[aggno] = 1;
  207. nulls[aggno] = 1;
  208. }
  209. }
  210. }
  211. /* ----------------
  212.  *  for each tuple from the the outer plan, apply all the aggregates
  213.  * ----------------
  214.  */
  215. for (;;)
  216. {
  217. TupleTableSlot *outerslot;
  218. isNull = isNull1 = isNull2 = 0;
  219. outerslot = ExecProcNode(outerPlan, (Plan *) node);
  220. if (TupIsNull(outerslot))
  221. {
  222. /*
  223.  * when the outerplan doesn't return a single tuple,
  224.  * create a dummy heaptuple anyway because we still need
  225.  * to return a valid aggregate value. The value returned
  226.  * will be the initial values of the transition functions
  227.  */
  228. if (nTuplesAgged == 0)
  229. {
  230. TupleDesc tupType;
  231. Datum    *tupValue;
  232. char    *null_array;
  233. AttrNumber attnum;
  234. tupType = aggstate->csstate.css_ScanTupleSlot->ttc_tupleDescriptor;
  235. tupValue = projInfo->pi_tupValue;
  236. /* initially, set all the values to NULL */
  237. null_array = palloc(sizeof(char) * tupType->natts);
  238. for (attnum = 0; attnum < tupType->natts; attnum++)
  239. null_array[attnum] = 'n';
  240. oneTuple = heap_formtuple(tupType, tupValue, null_array);
  241. pfree(null_array);
  242. }
  243. break;
  244. }
  245. aggno = -1;
  246. foreach(alist, node->aggs)
  247. {
  248. Aggref    *aggref = lfirst(alist);
  249. AggFuncInfo *aggfns = &aggFuncInfo[++aggno];
  250. Datum newVal;
  251. Datum args[2];
  252. /* Do we really need the special case for Var here? */
  253. if (IsA(aggref->target, Var))
  254. {
  255. newVal = aggGetAttr(outerslot, aggref,
  256. &isNull);
  257. }
  258. else
  259. {
  260. econtext->ecxt_scantuple = outerslot;
  261. newVal = ExecEvalExpr(aggref->target, econtext,
  262.   &isNull, &isDone);
  263. }
  264. if (isNull && !aggref->usenulls)
  265. continue; /* ignore this tuple for this agg */
  266. if (aggfns->xfn1.fn_addr != NULL)
  267. {
  268. if (noInitValue[aggno])
  269. {
  270. /*
  271.  * value1 has not been initialized. This is the
  272.  * first non-NULL input value. We use it as the
  273.  * initial value for value1.
  274.  *
  275.  * But we can't just use it straight, we have to make
  276.  * a copy of it since the tuple from which it came
  277.  * will be freed on the next iteration of the
  278.  * scan.  This requires finding out how to copy
  279.  * the Datum.  We assume the datum is of the agg's
  280.  * basetype, or at least binary compatible with
  281.  * it.
  282.  */
  283. Type aggBaseType = typeidType(aggref->basetype);
  284. int attlen = typeLen(aggBaseType);
  285. bool byVal = typeByVal(aggBaseType);
  286. if (byVal)
  287. value1[aggno] = newVal;
  288. else
  289. {
  290. if (attlen == -1) /* variable length */
  291. attlen = VARSIZE((struct varlena *) newVal);
  292. value1[aggno] = (Datum) palloc(attlen);
  293. memcpy((char *) (value1[aggno]), (char *) newVal,
  294.    attlen);
  295. }
  296. noInitValue[aggno] = 0;
  297. nulls[aggno] = 0;
  298. }
  299. else
  300. {
  301. /*
  302.  * apply the transition functions.
  303.  */
  304. args[0] = value1[aggno];
  305. args[1] = newVal;
  306. value1[aggno] = (Datum) fmgr_c(&aggfns->xfn1,
  307.   (FmgrValues *) args, &isNull1);
  308. Assert(!isNull1);
  309. }
  310. }
  311. if (aggfns->xfn2.fn_addr != NULL)
  312. {
  313. args[0] = value2[aggno];
  314. value2[aggno] = (Datum) fmgr_c(&aggfns->xfn2,
  315.   (FmgrValues *) args, &isNull2);
  316. Assert(!isNull2);
  317. }
  318. }
  319. /*
  320.  * keep this for the projection (we only need one of these -
  321.  * all the tuples we aggregate over share the same group
  322.  * column)
  323.  */
  324. if (!oneTuple)
  325. oneTuple = heap_copytuple(outerslot->val);
  326. nTuplesAgged++;
  327. }
  328. /* --------------
  329.  * finalize the aggregate (if necessary), and get the resultant value
  330.  * --------------
  331.  */
  332. aggno = -1;
  333. foreach(alist, node->aggs)
  334. {
  335. char    *args[2];
  336. AggFuncInfo *aggfns = &aggFuncInfo[++aggno];
  337. if (noInitValue[aggno])
  338. {
  339. /*
  340.  * No values found for this agg; return current state.
  341.  * This seems to fix behavior for avg() aggregate. -tgl
  342.  * 12/96
  343.  */
  344. }
  345. else if (aggfns->finalfn.fn_addr != NULL && nTuplesAgged > 0)
  346. {
  347. if (aggfns->finalfn.fn_nargs > 1)
  348. {
  349. args[0] = (char *) value1[aggno];
  350. args[1] = (char *) value2[aggno];
  351. }
  352. else if (aggfns->xfn1.fn_addr != NULL)
  353. args[0] = (char *) value1[aggno];
  354. else if (aggfns->xfn2.fn_addr != NULL)
  355. args[0] = (char *) value2[aggno];
  356. else
  357. elog(NOTICE, "ExecAgg: no valid transition functions??");
  358. value1[aggno] = (Datum) fmgr_c(&aggfns->finalfn,
  359.    (FmgrValues *) args, &(nulls[aggno]));
  360. }
  361. else if (aggfns->xfn1.fn_addr != NULL)
  362. {
  363. /*
  364.  * value in the right place, ignore. (If you remove this
  365.  * case, fix the else part. -ay 2/95)
  366.  */
  367. }
  368. else if (aggfns->xfn2.fn_addr != NULL)
  369. value1[aggno] = value2[aggno];
  370. else
  371. elog(ERROR, "ExecAgg: no valid transition functions??");
  372. }
  373. /*
  374.  * whether the aggregation is done depends on whether we are doing
  375.  * aggregation over groups or the entire table
  376.  */
  377. if (nodeTag(outerPlan) == T_Group)
  378. {
  379. /* aggregation over groups */
  380. aggstate->agg_done = ((Group *) outerPlan)->grpstate->grp_done;
  381. }
  382. else
  383. aggstate->agg_done = TRUE;
  384. /* ----------------
  385.  * form a projection tuple, store it in the result tuple
  386.  * slot and return it.
  387.  * ----------------
  388.  */
  389. ExecStoreTuple(oneTuple,
  390.    aggstate->csstate.css_ScanTupleSlot,
  391.    InvalidBuffer,
  392.    false);
  393. econtext->ecxt_scantuple = aggstate->csstate.css_ScanTupleSlot;
  394. resultSlot = ExecProject(projInfo, &isDone);
  395. /*
  396.  * As long as the retrieved group does not match the
  397.  * qualifications it is ignored and the next group is fetched
  398.  */
  399. if (node->plan.qual != NULL)
  400. qual_result = ExecQual(fix_opids(node->plan.qual), econtext);
  401. else
  402. qual_result = false;
  403. if (oneTuple)
  404. pfree(oneTuple);
  405. }
  406. while (node->plan.qual != NULL && qual_result != true);
  407. return resultSlot;
  408. }
  409. /* -----------------
  410.  * ExecInitAgg
  411.  *
  412.  * Creates the run-time information for the agg node produced by the
  413.  * planner and initializes its outer subtree
  414.  * -----------------
  415.  */
  416. bool
  417. ExecInitAgg(Agg *node, EState *estate, Plan *parent)
  418. {
  419. AggState   *aggstate;
  420. Plan    *outerPlan;
  421. ExprContext *econtext;
  422. /*
  423.  * assign the node's execution state
  424.  */
  425. node->plan.state = estate;
  426. /*
  427.  * create state structure
  428.  */
  429. aggstate = makeNode(AggState);
  430. node->aggstate = aggstate;
  431. aggstate->agg_done = FALSE;
  432. /*
  433.  * assign node's base id and create expression context
  434.  */
  435. ExecAssignNodeBaseInfo(estate, &aggstate->csstate.cstate, (Plan *) parent);
  436. ExecAssignExprContext(estate, &aggstate->csstate.cstate);
  437. #define AGG_NSLOTS 2
  438. /*
  439.  * tuple table initialization
  440.  */
  441. ExecInitScanTupleSlot(estate, &aggstate->csstate);
  442. ExecInitResultTupleSlot(estate, &aggstate->csstate.cstate);
  443. econtext = aggstate->csstate.cstate.cs_ExprContext;
  444. econtext->ecxt_values = (Datum *) palloc(sizeof(Datum) * length(node->aggs));
  445. MemSet(econtext->ecxt_values, 0, sizeof(Datum) * length(node->aggs));
  446. econtext->ecxt_nulls = (char *) palloc(sizeof(char) * length(node->aggs));
  447. MemSet(econtext->ecxt_nulls, 0, sizeof(char) * length(node->aggs));
  448. /*
  449.  * initializes child nodes
  450.  */
  451. outerPlan = outerPlan(node);
  452. ExecInitNode(outerPlan, estate, (Plan *) node);
  453. /*
  454.  * Result runs in its own context, but make it use our aggregates fix
  455.  * for 'select sum(2+2)'
  456.  */
  457. if (nodeTag(outerPlan) == T_Result)
  458. {
  459. ((Result *) outerPlan)->resstate->cstate.cs_ProjInfo->pi_exprContext->ecxt_values =
  460. econtext->ecxt_values;
  461. ((Result *) outerPlan)->resstate->cstate.cs_ProjInfo->pi_exprContext->ecxt_nulls =
  462. econtext->ecxt_nulls;
  463. }
  464. /* ----------------
  465.  * initialize tuple type.
  466.  * ----------------
  467.  */
  468. ExecAssignScanTypeFromOuterPlan((Plan *) node, &aggstate->csstate);
  469. /*
  470.  * Initialize tuple type for both result and scan. This node does no
  471.  * projection
  472.  */
  473. ExecAssignResultTypeFromTL((Plan *) node, &aggstate->csstate.cstate);
  474. ExecAssignProjectionInfo((Plan *) node, &aggstate->csstate.cstate);
  475. return TRUE;
  476. }
  477. int
  478. ExecCountSlotsAgg(Agg *node)
  479. {
  480. return ExecCountSlotsNode(outerPlan(node)) +
  481. ExecCountSlotsNode(innerPlan(node)) +
  482. AGG_NSLOTS;
  483. }
  484. /* ------------------------
  485.  * ExecEndAgg(node)
  486.  *
  487.  * -----------------------
  488.  */
  489. void
  490. ExecEndAgg(Agg *node)
  491. {
  492. AggState   *aggstate;
  493. Plan    *outerPlan;
  494. aggstate = node->aggstate;
  495. ExecFreeProjectionInfo(&aggstate->csstate.cstate);
  496. outerPlan = outerPlan(node);
  497. ExecEndNode(outerPlan, (Plan *) node);
  498. /* clean up tuple table */
  499. ExecClearTuple(aggstate->csstate.css_ScanTupleSlot);
  500. }
  501. /*****************************************************************************
  502.  * Support Routines
  503.  *****************************************************************************/
  504. /*
  505.  * aggGetAttr -
  506.  *   get the attribute (specified in the Var node in agg) to aggregate
  507.  *   over from the tuple
  508.  */
  509. static Datum
  510. aggGetAttr(TupleTableSlot *slot,
  511.    Aggref *aggref,
  512.    bool *isNull)
  513. {
  514. Datum result;
  515. AttrNumber attnum;
  516. HeapTuple heapTuple;
  517. TupleDesc tuple_type;
  518. Buffer buffer;
  519. /* ----------------
  520.  *  extract tuple information from the slot
  521.  * ----------------
  522.  */
  523. heapTuple = slot->val;
  524. tuple_type = slot->ttc_tupleDescriptor;
  525. buffer = slot->ttc_buffer;
  526. attnum = ((Var *) aggref->target)->varattno;
  527. /*
  528.  * If the attribute number is invalid, then we are supposed to return
  529.  * the entire tuple, we give back a whole slot so that callers know
  530.  * what the tuple looks like.
  531.  */
  532. if (attnum == InvalidAttrNumber)
  533. {
  534. TupleTableSlot *tempSlot;
  535. TupleDesc td;
  536. HeapTuple tup;
  537. tempSlot = makeNode(TupleTableSlot);
  538. tempSlot->ttc_shouldFree = false;
  539. tempSlot->ttc_descIsNew = true;
  540. tempSlot->ttc_tupleDescriptor = (TupleDesc) NULL;
  541. tempSlot->ttc_buffer = InvalidBuffer;
  542. tempSlot->ttc_whichplan = -1;
  543. tup = heap_copytuple(heapTuple);
  544. td = CreateTupleDescCopy(slot->ttc_tupleDescriptor);
  545. ExecSetSlotDescriptor(tempSlot, td);
  546. ExecStoreTuple(tup, tempSlot, InvalidBuffer, true);
  547. return (Datum) tempSlot;
  548. }
  549. result = heap_getattr(heapTuple, /* tuple containing attribute */
  550.   attnum, /* attribute number of desired
  551.  * attribute */
  552.   tuple_type, /* tuple descriptor of tuple */
  553.   isNull); /* return: is attribute null? */
  554. /* ----------------
  555.  * return null if att is null
  556.  * ----------------
  557.  */
  558. if (*isNull)
  559. return (Datum) NULL;
  560. return result;
  561. }
  562. void
  563. ExecReScanAgg(Agg *node, ExprContext *exprCtxt, Plan *parent)
  564. {
  565. AggState   *aggstate = node->aggstate;
  566. ExprContext *econtext = aggstate->csstate.cstate.cs_ExprContext;
  567. aggstate->agg_done = FALSE;
  568. MemSet(econtext->ecxt_values, 0, sizeof(Datum) * length(node->aggs));
  569. MemSet(econtext->ecxt_nulls, 0, sizeof(char) * length(node->aggs));
  570. /*
  571.  * if chgParam of subnode is not null then plan will be re-scanned by
  572.  * first ExecProcNode.
  573.  */
  574. if (((Plan *) node)->lefttree->chgParam == NULL)
  575. ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
  576. }