nodeTee.c
上传用户:blenddy
上传日期:2007-01-07
资源大小:6495k
文件大小:13k
源码类别:

数据库系统

开发平台:

Unix_Linux

  1. /*-------------------------------------------------------------------------
  2.  *
  3.  * nodeTee.c
  4.  *
  5.  *
  6.  * Copyright (c) 1994, Regents of the University of California
  7.  *
  8.  *  DESCRIPTION
  9.  * This code provides support for a tee node, which allows
  10.  * multiple parent in a megaplan.
  11.  *
  12.  *  INTERFACE ROUTINES
  13.  * ExecTee
  14.  * ExecInitTee
  15.  * ExecEndTee
  16.  *
  17.  * $Id: nodeTee.c,v 1.3 1999/05/25 22:41:04 momjian Exp $
  18.  *
  19.  *-------------------------------------------------------------------------
  20.  */
  21. #include <sys/types.h>
  22. #include <sys/file.h>
  23. #include "postgres.h"
  24. #include "utils/palloc.h"
  25. #include "utils/relcache.h"
  26. #include "utils/mcxt.h"
  27. #include "storage/bufmgr.h"
  28. #include "storage/smgr.h"
  29. #include "optimizer/internal.h"
  30. #include "executor/executor.h"
  31. #include "executor/nodeTee.h"
  32. #include "catalog/catalog.h"
  33. #include "catalog/heap.h"
  34. #include "tcop/pquery.h"
  35. #include "access/heapam.h"
  36. /* ------------------------------------------------------------------
  37.  * ExecInitTee
  38.  *
  39.  * Create tee state
  40.  *
  41.  * ------------------------------------------------------------------
  42.  */
  43. bool
  44. ExecInitTee(Tee * node, EState *currentEstate, Plan *parent)
  45. {
  46. TeeState   *teeState;
  47. Plan    *outerPlan;
  48. int len;
  49. Relation bufferRel;
  50. TupleDesc tupType;
  51. EState    *estate;
  52. /*
  53.  * it is possible that the Tee has already been initialized since it
  54.  * can be reached by multiple parents. If it is already initialized,
  55.  * simply return and do not initialize the children nodes again
  56.  */
  57. if (node->plan.state)
  58. return TRUE;
  59. /* ----------------
  60.  * assign the node's execution state
  61.  * ----------------
  62.  */
  63. /*
  64.  * make a new executor state, because we have a different
  65.  * es_range_table
  66.  */
  67. /*    node->plan.state = estate;*/
  68. estate = CreateExecutorState();
  69. estate->es_direction = currentEstate->es_direction;
  70. estate->es_BaseId = currentEstate->es_BaseId;
  71. estate->es_BaseId = currentEstate->es_BaseId;
  72. estate->es_tupleTable = currentEstate->es_tupleTable;
  73. estate->es_refcount = currentEstate->es_refcount;
  74. estate->es_junkFilter = currentEstate->es_junkFilter;
  75. estate->es_snapshot = currentEstate->es_snapshot;
  76. /*
  77.  * use the range table for Tee subplan since the range tables for the
  78.  * two parents may be different
  79.  */
  80. if (node->rtentries)
  81. estate->es_range_table = node->rtentries;
  82. else
  83. estate->es_range_table = currentEstate->es_range_table;
  84. node->plan.state = estate;
  85. /* ----------------
  86.  * create teeState structure
  87.  * ----------------
  88.  */
  89. teeState = makeNode(TeeState);
  90. teeState->tee_leftPlace = 0;
  91. teeState->tee_rightPlace = 0;
  92. teeState->tee_lastPlace = 0;
  93. teeState->tee_bufferRel = NULL;
  94. teeState->tee_leftScanDesc = NULL;
  95. teeState->tee_rightScanDesc = NULL;
  96. node->teestate = teeState;
  97. /* ----------------
  98.  * Miscellanious initialization
  99.  *
  100.  *  + assign node's base_id
  101.  *  + assign debugging hooks and
  102.  *  + create expression context for node
  103.  * ----------------
  104.  */
  105. ExecAssignNodeBaseInfo(estate, &(teeState->cstate), parent);
  106. ExecAssignExprContext(estate, &(teeState->cstate));
  107. #define TEE_NSLOTS 2
  108. /* ----------------
  109.  * initialize tuple slots
  110.  * ----------------
  111.  */
  112. ExecInitResultTupleSlot(estate, &(teeState->cstate));
  113. /* initialize child nodes */
  114. outerPlan = outerPlan((Plan *) node);
  115. ExecInitNode(outerPlan, estate, (Plan *) node);
  116. /* ----------------
  117.  * the tuple type info is from the outer plan of this node
  118.  * the result type is also the same as the outerplan
  119.  */
  120. ExecAssignResultTypeFromOuterPlan((Plan *) node, &(teeState->cstate));
  121. ExecAssignProjectionInfo((Plan *) node, &teeState->cstate);
  122. /* ---------------------------------------
  123.    initialize temporary relation to buffer tuples
  124. */
  125. tupType = ExecGetResultType(&(teeState->cstate));
  126. len = ExecTargetListLength(((Plan *) node)->targetlist);
  127. /*
  128.  * create a catalogued relation even though this is a temporary
  129.  * relation
  130.  */
  131. /* cleanup of catalogued relations is easier to do */
  132. if (node->teeTableName[0] != '')
  133. {
  134. Relation r;
  135. teeState->tee_bufferRelname = pstrdup(node->teeTableName);
  136. /*
  137.  * we are given an tee table name, if a relation by that name
  138.  * exists, then we open it, else we create it and then open it
  139.  */
  140. r = RelationNameGetRelation(teeState->tee_bufferRelname);
  141. if (RelationIsValid(r))
  142. bufferRel = heap_openr(teeState->tee_bufferRelname);
  143. else
  144. bufferRel = heap_open(
  145. heap_create_with_catalog(teeState->tee_bufferRelname,
  146.   tupType, RELKIND_RELATION, false));
  147. }
  148. else
  149. {
  150. sprintf(teeState->tee_bufferRelname,
  151. "ttemp_%d", /* 'ttemp' for 'tee' temporary */
  152. newoid());
  153. bufferRel = heap_open(
  154. heap_create_with_catalog(teeState->tee_bufferRelname,
  155.   tupType, RELKIND_RELATION, false));
  156. }
  157. teeState->tee_bufferRel = bufferRel;
  158. /*
  159.  * initialize a memory context for allocating thing like scan
  160.  * descriptors
  161.  */
  162. /*
  163.  * we do this so that on cleanup of the tee, we can free things. if we
  164.  * didn't have our own memory context, we would be in the memory
  165.  * context of the portal that we happen to be using at the moment
  166.  */
  167. teeState->tee_mcxt = (MemoryContext) CreateGlobalMemory(teeState->tee_bufferRelname);
  168. /*
  169.  * don't initialize the scan descriptors here because it's not good to
  170.  * initialize scan descriptors on empty rels. Wait until the scan
  171.  * descriptors are needed before initializing them.
  172.  */
  173. teeState->tee_leftScanDesc = NULL;
  174. teeState->tee_rightScanDesc = NULL;
  175. return TRUE;
  176. }
  177. int
  178. ExecCountSlotsTee(Tee * node)
  179. {
  180. /* Tee nodes can't have innerPlans */
  181. return ExecCountSlotsNode(outerPlan(node)) + TEE_NSLOTS;
  182. }
  183. /* ----------------------------------------------------------------
  184.    initTeeScanDescs
  185.   initializes the left and right scandescs on the temporary
  186.   relation of a Tee node
  187.   must open two separate scan descriptors,
  188.   because the left and right scans may be at different points
  189. * ----------------------------------------------------------------
  190. */
  191. static void
  192. initTeeScanDescs(Tee * node)
  193. {
  194. TeeState   *teeState;
  195. Relation bufferRel;
  196. ScanDirection dir;
  197. Snapshot snapshot;
  198. MemoryContext orig;
  199. teeState = node->teestate;
  200. if (teeState->tee_leftScanDesc && teeState->tee_rightScanDesc)
  201. return;
  202. orig = CurrentMemoryContext;
  203. MemoryContextSwitchTo(teeState->tee_mcxt);
  204. bufferRel = teeState->tee_bufferRel;
  205. dir = ((Plan *) node)->state->es_direction; /* backwards not handled
  206.  * yet XXX */
  207. snapshot = ((Plan *) node)->state->es_snapshot;
  208. if (teeState->tee_leftScanDesc == NULL)
  209. {
  210. teeState->tee_leftScanDesc = heap_beginscan(bufferRel,
  211. ScanDirectionIsBackward(dir),
  212. snapshot,
  213. 0, /* num scan keys */
  214. NULL /* scan keys */
  215. );
  216. }
  217. if (teeState->tee_rightScanDesc == NULL)
  218. {
  219. teeState->tee_rightScanDesc = heap_beginscan(bufferRel,
  220. ScanDirectionIsBackward(dir),
  221.  snapshot,
  222.  0, /* num scan keys */
  223.  NULL /* scan keys */
  224. );
  225. }
  226. MemoryContextSwitchTo(orig);
  227. }
  228. /* ----------------------------------------------------------------
  229.  * ExecTee(node)
  230.  *
  231.  *
  232.  * A Tee serves to connect a subplan to multiple parents.
  233.  * the subplan is always the outplan of the Tee node.
  234.  *
  235.  * The Tee gets requests from either leftParent or rightParent,
  236.  * fetches the result tuple from the child, and then
  237.  * stored the result into a temporary relation (serving as a queue).
  238.  * leftPlace and rightPlace keep track of where the left and rightParents
  239.  * are.
  240.  * If a parent requests a tuple and that parent is not at the end
  241.  * of the temporary relation, then the request is satisfied from
  242.  * the queue instead of by executing the child plan
  243.  *
  244.  * ----------------------------------------------------------------
  245.  */
  246. TupleTableSlot *
  247. ExecTee(Tee * node, Plan *parent)
  248. {
  249. EState    *estate;
  250. TeeState   *teeState;
  251. int leftPlace,
  252. rightPlace,
  253. lastPlace;
  254. int branch;
  255. TupleTableSlot *result;
  256. TupleTableSlot *slot;
  257. Plan    *childNode;
  258. ScanDirection dir;
  259. HeapTuple heapTuple;
  260. Relation bufferRel;
  261. HeapScanDesc scanDesc;
  262. estate = ((Plan *) node)->state;
  263. teeState = node->teestate;
  264. leftPlace = teeState->tee_leftPlace;
  265. rightPlace = teeState->tee_rightPlace;
  266. lastPlace = teeState->tee_lastPlace;
  267. bufferRel = teeState->tee_bufferRel;
  268. childNode = outerPlan(node);
  269. dir = estate->es_direction;
  270. /* XXX doesn't handle backwards direction yet */
  271. if (parent == node->leftParent)
  272. branch = leftPlace;
  273. else if ((parent == node->rightParent) || (parent == (Plan *) node))
  274. /*
  275.  * the tee node could be the root node of the plan, in which case,
  276.  * we treat it like a right-parent pull
  277.  */
  278. branch = rightPlace;
  279. else
  280. {
  281. elog(ERROR, "A Tee node can only be executed from its left or right parentn");
  282. return NULL;
  283. }
  284. if (branch == lastPlace)
  285. { /* we're at the end of the queue already,
  286.  * - get a new tuple from the child plan,
  287.  * - store it in the queue, - increment
  288.  * lastPlace, - increment leftPlace or
  289.  * rightPlace as appropriate, - and return
  290.  * result */
  291. slot = ExecProcNode(childNode, (Plan *) node);
  292. if (!TupIsNull(slot))
  293. {
  294. /*
  295.  * heap_insert changes something...
  296.  */
  297. if (slot->ttc_buffer != InvalidBuffer)
  298. heapTuple = heap_copytuple(slot->val);
  299. else
  300. heapTuple = slot->val;
  301. /* insert into temporary relation */
  302. heap_insert(bufferRel, heapTuple);
  303. if (slot->ttc_buffer != InvalidBuffer)
  304. pfree(heapTuple);
  305. /*
  306.  * once there is data in the temporary relation, ensure that
  307.  * the left and right scandescs are initialized
  308.  */
  309. initTeeScanDescs(node);
  310. scanDesc = (parent == node->leftParent) ?
  311. teeState->tee_leftScanDesc : teeState->tee_rightScanDesc;
  312. {
  313. /*
  314.  * move the scandesc forward so we don't re-read this
  315.  * tuple later
  316.  */
  317. HeapTuple throwAway;
  318. /* Buffer buffer; */
  319. throwAway = heap_getnext(scanDesc, ScanDirectionIsBackward(dir));
  320. }
  321. /*
  322.  * set the shouldFree field of the child's slot so that when
  323.  * the child's slot is free'd, this tuple isn't free'd also
  324.  */
  325. /*
  326.  * does this mean this tuple has to be garbage collected
  327.  * later??
  328.  */
  329. slot->ttc_shouldFree = false;
  330. teeState->tee_lastPlace = lastPlace + 1;
  331. }
  332. result = slot;
  333. }
  334. else
  335. { /* the desired data already exists in the
  336.  * temporary relation */
  337. scanDesc = (parent == node->leftParent) ?
  338. teeState->tee_leftScanDesc : teeState->tee_rightScanDesc;
  339. heapTuple = heap_getnext(scanDesc, ScanDirectionIsBackward(dir));
  340. /*
  341.  * Increase the pin count on the buffer page, because the tuple
  342.  * stored in the slot also points to it (as well as the scan
  343.  * descriptor). If we don't, ExecStoreTuple will decrease the pin
  344.  * count on the next iteration.
  345.  */
  346. if (scanDesc->rs_cbuf != InvalidBuffer)
  347. IncrBufferRefCount(scanDesc->rs_cbuf);
  348. slot = teeState->cstate.cs_ResultTupleSlot;
  349. slot->ttc_tupleDescriptor = RelationGetDescr(bufferRel);
  350. result = ExecStoreTuple(heapTuple, /* tuple to store */
  351. slot, /* slot to store in */
  352. scanDesc->rs_cbuf, /* this tuple's buffer */
  353. false); /* don't free stuff from
  354.  * heap_getnext */
  355. }
  356. if (parent == node->leftParent)
  357. teeState->tee_leftPlace = leftPlace + 1;
  358. else
  359. teeState->tee_rightPlace = rightPlace + 1;
  360. return result;
  361. }
  362. /* ---------------------------------------------------------------------
  363.  * ExecEndTee
  364.  *
  365.  *  End the Tee node, and free up any storage
  366.  * since a Tee node can be downstream of multiple parent nodes,
  367.  * only free when both parents are done
  368.  * --------------------------------------------------------------------
  369.  */
  370. void
  371. ExecEndTee(Tee * node, Plan *parent)
  372. {
  373. EState    *estate;
  374. TeeState   *teeState;
  375. int leftPlace,
  376. rightPlace,
  377. lastPlace;
  378. Relation bufferRel;
  379. MemoryContext orig;
  380. estate = ((Plan *) node)->state;
  381. teeState = node->teestate;
  382. leftPlace = teeState->tee_leftPlace;
  383. rightPlace = teeState->tee_rightPlace;
  384. lastPlace = teeState->tee_lastPlace;
  385. if (!node->leftParent || parent == node->leftParent)
  386. leftPlace = -1;
  387. if (!node->rightParent || parent == node->rightParent)
  388. rightPlace = -1;
  389. if (parent == (Plan *) node)
  390. rightPlace = leftPlace = -1;
  391. teeState->tee_leftPlace = leftPlace;
  392. teeState->tee_rightPlace = rightPlace;
  393. if ((leftPlace == -1) && (rightPlace == -1))
  394. {
  395. /* remove the temporary relations */
  396. /* and close the scan descriptors */
  397. bufferRel = teeState->tee_bufferRel;
  398. if (bufferRel)
  399. {
  400. heap_destroy(bufferRel);
  401. teeState->tee_bufferRel = NULL;
  402. if (teeState->tee_mcxt)
  403. {
  404. orig = CurrentMemoryContext;
  405. MemoryContextSwitchTo(teeState->tee_mcxt);
  406. }
  407. else
  408. orig = 0;
  409. if (teeState->tee_leftScanDesc)
  410. {
  411. heap_endscan(teeState->tee_leftScanDesc);
  412. teeState->tee_leftScanDesc = NULL;
  413. }
  414. if (teeState->tee_rightScanDesc)
  415. {
  416. heap_endscan(teeState->tee_rightScanDesc);
  417. teeState->tee_rightScanDesc = NULL;
  418. }
  419. if (teeState->tee_mcxt)
  420. {
  421. MemoryContextSwitchTo(orig);
  422. teeState->tee_mcxt = NULL;
  423. }
  424. }
  425. }
  426. }