CompositeRecordReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:13k
源码类别:

网格计算

开发平台:

Java

  1. /** * Licensed to the Apache Software Foundation (ASF) under one
  2.  * or more contributor license agreements.  See the NOTICE file
  3.  * distributed with this work for additional information
  4.  * regarding copyright ownership.  The ASF licenses this file
  5.  * to you under the Apache License, Version 2.0 (the
  6.  * "License"); you may not use this file except in compliance
  7.  * with the License.  You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.apache.hadoop.mapred.join;
  18. import java.io.IOException;
  19. import java.util.ArrayList;
  20. import java.util.Comparator;
  21. import java.util.PriorityQueue;
  22. import org.apache.hadoop.conf.Configurable;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.io.Writable;
  25. import org.apache.hadoop.io.WritableComparable;
  26. import org.apache.hadoop.io.WritableComparator;
  27. import org.apache.hadoop.io.WritableUtils;
  28. import org.apache.hadoop.mapred.RecordReader;
  29. import org.apache.hadoop.util.ReflectionUtils;
  30. /**
  31.  * A RecordReader that can effect joins of RecordReaders sharing a common key
  32.  * type and partitioning.
  33.  */
  34. public abstract class CompositeRecordReader<
  35.     K extends WritableComparable, // key type
  36.     V extends Writable,           // accepts RecordReader<K,V> as children
  37.     X extends Writable>           // emits Writables of this type
  38.     implements Configurable {
  39.   private int id;
  40.   private Configuration conf;
  41.   private final ResetableIterator<X> EMPTY = new ResetableIterator.EMPTY<X>();
  42.   private WritableComparator cmp;
  43.   private Class<? extends WritableComparable> keyclass;
  44.   private PriorityQueue<ComposableRecordReader<K,?>> q;
  45.   protected final JoinCollector jc;
  46.   protected final ComposableRecordReader<K,? extends V>[] kids;
  47.   protected abstract boolean combine(Object[] srcs, TupleWritable value);
  48.   /**
  49.    * Create a RecordReader with <tt>capacity</tt> children to position
  50.    * <tt>id</tt> in the parent reader.
  51.    * The id of a root CompositeRecordReader is -1 by convention, but relying
  52.    * on this is not recommended.
  53.    */
  54.   @SuppressWarnings("unchecked") // Generic array assignment
  55.   public CompositeRecordReader(int id, int capacity,
  56.       Class<? extends WritableComparator> cmpcl)
  57.       throws IOException {
  58.     assert capacity > 0 : "Invalid capacity";
  59.     this.id = id;
  60.     if (null != cmpcl) {
  61.       cmp = ReflectionUtils.newInstance(cmpcl, null);
  62.       q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
  63.           new Comparator<ComposableRecordReader<K,?>>() {
  64.             public int compare(ComposableRecordReader<K,?> o1,
  65.                                ComposableRecordReader<K,?> o2) {
  66.               return cmp.compare(o1.key(), o2.key());
  67.             }
  68.           });
  69.     }
  70.     jc = new JoinCollector(capacity);
  71.     kids = new ComposableRecordReader[capacity];
  72.   }
  73.   /**
  74.    * Return the position in the collector this class occupies.
  75.    */
  76.   public int id() {
  77.     return id;
  78.   }
  79.   /**
  80.    * {@inheritDoc}
  81.    */
  82.   public void setConf(Configuration conf) {
  83.     this.conf = conf;
  84.   }
  85.   /**
  86.    * {@inheritDoc}
  87.    */
  88.   public Configuration getConf() {
  89.     return conf;
  90.   }
  91.   /**
  92.    * Return sorted list of RecordReaders for this composite.
  93.    */
  94.   protected PriorityQueue<ComposableRecordReader<K,?>> getRecordReaderQueue() {
  95.     return q;
  96.   }
  97.   /**
  98.    * Return comparator defining the ordering for RecordReaders in this
  99.    * composite.
  100.    */
  101.   protected WritableComparator getComparator() {
  102.     return cmp;
  103.   }
  104.   /**
  105.    * Add a RecordReader to this collection.
  106.    * The id() of a RecordReader determines where in the Tuple its
  107.    * entry will appear. Adding RecordReaders with the same id has
  108.    * undefined behavior.
  109.    */
  110.   public void add(ComposableRecordReader<K,? extends V> rr) throws IOException {
  111.     kids[rr.id()] = rr;
  112.     if (null == q) {
  113.       cmp = WritableComparator.get(rr.createKey().getClass());
  114.       q = new PriorityQueue<ComposableRecordReader<K,?>>(3,
  115.           new Comparator<ComposableRecordReader<K,?>>() {
  116.             public int compare(ComposableRecordReader<K,?> o1,
  117.                                ComposableRecordReader<K,?> o2) {
  118.               return cmp.compare(o1.key(), o2.key());
  119.             }
  120.           });
  121.     }
  122.     if (rr.hasNext()) {
  123.       q.add(rr);
  124.     }
  125.   }
  126.   /**
  127.    * Collector for join values.
  128.    * This accumulates values for a given key from the child RecordReaders. If
  129.    * one or more child RR contain duplicate keys, this will emit the cross
  130.    * product of the associated values until exhausted.
  131.    */
  132.   class JoinCollector {
  133.     private K key;
  134.     private ResetableIterator<X>[] iters;
  135.     private int pos = -1;
  136.     private boolean first = true;
  137.     /**
  138.      * Construct a collector capable of handling the specified number of
  139.      * children.
  140.      */
  141.     @SuppressWarnings("unchecked") // Generic array assignment
  142.     public JoinCollector(int card) {
  143.       iters = new ResetableIterator[card];
  144.       for (int i = 0; i < iters.length; ++i) {
  145.         iters[i] = EMPTY;
  146.       }
  147.     }
  148.     /**
  149.      * Register a given iterator at position id.
  150.      */
  151.     public void add(int id, ResetableIterator<X> i)
  152.         throws IOException {
  153.       iters[id] = i;
  154.     }
  155.     /**
  156.      * Return the key associated with this collection.
  157.      */
  158.     public K key() {
  159.       return key;
  160.     }
  161.     /**
  162.      * Codify the contents of the collector to be iterated over.
  163.      * When this is called, all RecordReaders registered for this
  164.      * key should have added ResetableIterators.
  165.      */
  166.     public void reset(K key) {
  167.       this.key = key;
  168.       first = true;
  169.       pos = iters.length - 1;
  170.       for (int i = 0; i < iters.length; ++i) {
  171.         iters[i].reset();
  172.       }
  173.     }
  174.     /**
  175.      * Clear all state information.
  176.      */
  177.     public void clear() {
  178.       key = null;
  179.       pos = -1;
  180.       for (int i = 0; i < iters.length; ++i) {
  181.         iters[i].clear();
  182.         iters[i] = EMPTY;
  183.       }
  184.     }
  185.     /**
  186.      * Returns false if exhausted or if reset(K) has not been called.
  187.      */
  188.     protected boolean hasNext() {
  189.       return !(pos < 0);
  190.     }
  191.     /**
  192.      * Populate Tuple from iterators.
  193.      * It should be the case that, given iterators i_1...i_n over values from
  194.      * sources s_1...s_n sharing key k, repeated calls to next should yield
  195.      * I x I.
  196.      */
  197.     @SuppressWarnings("unchecked") // No static typeinfo on Tuples
  198.     protected boolean next(TupleWritable val) throws IOException {
  199.       if (first) {
  200.         int i = -1;
  201.         for (pos = 0; pos < iters.length; ++pos) {
  202.           if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
  203.             i = pos;
  204.             val.setWritten(i);
  205.           }
  206.         }
  207.         pos = i;
  208.         first = false;
  209.         if (pos < 0) {
  210.           clear();
  211.           return false;
  212.         }
  213.         return true;
  214.       }
  215.       while (0 <= pos && !(iters[pos].hasNext() &&
  216.                            iters[pos].next((X)val.get(pos)))) {
  217.         --pos;
  218.       }
  219.       if (pos < 0) {
  220.         clear();
  221.         return false;
  222.       }
  223.       val.setWritten(pos);
  224.       for (int i = 0; i < pos; ++i) {
  225.         if (iters[i].replay((X)val.get(i))) {
  226.           val.setWritten(i);
  227.         }
  228.       }
  229.       while (pos + 1 < iters.length) {
  230.         ++pos;
  231.         iters[pos].reset();
  232.         if (iters[pos].hasNext() && iters[pos].next((X)val.get(pos))) {
  233.           val.setWritten(pos);
  234.         }
  235.       }
  236.       return true;
  237.     }
  238.     /**
  239.      * Replay the last Tuple emitted.
  240.      */
  241.     @SuppressWarnings("unchecked") // No static typeinfo on Tuples
  242.     public boolean replay(TupleWritable val) throws IOException {
  243.       // The last emitted tuple might have drawn on an empty source;
  244.       // it can't be cleared prematurely, b/c there may be more duplicate
  245.       // keys in iterator positions < pos
  246.       assert !first;
  247.       boolean ret = false;
  248.       for (int i = 0; i < iters.length; ++i) {
  249.         if (iters[i].replay((X)val.get(i))) {
  250.           val.setWritten(i);
  251.           ret = true;
  252.         }
  253.       }
  254.       return ret;
  255.     }
  256.     /**
  257.      * Close all child iterators.
  258.      */
  259.     public void close() throws IOException {
  260.       for (int i = 0; i < iters.length; ++i) {
  261.         iters[i].close();
  262.       }
  263.     }
  264.     /**
  265.      * Write the next value into key, value as accepted by the operation
  266.      * associated with this set of RecordReaders.
  267.      */
  268.     public boolean flush(TupleWritable value) throws IOException {
  269.       while (hasNext()) {
  270.         value.clearWritten();
  271.         if (next(value) && combine(kids, value)) {
  272.           return true;
  273.         }
  274.       }
  275.       return false;
  276.     }
  277.   }
  278.   /**
  279.    * Return the key for the current join or the value at the top of the
  280.    * RecordReader heap.
  281.    */
  282.   public K key() {
  283.     if (jc.hasNext()) {
  284.       return jc.key();
  285.     }
  286.     if (!q.isEmpty()) {
  287.       return q.peek().key();
  288.     }
  289.     return null;
  290.   }
  291.   /**
  292.    * Clone the key at the top of this RR into the given object.
  293.    */
  294.   public void key(K key) throws IOException {
  295.     WritableUtils.cloneInto(key, key());
  296.   }
  297.   /**
  298.    * Return true if it is possible that this could emit more values.
  299.    */
  300.   public boolean hasNext() {
  301.     return jc.hasNext() || !q.isEmpty();
  302.   }
  303.   /**
  304.    * Pass skip key to child RRs.
  305.    */
  306.   public void skip(K key) throws IOException {
  307.     ArrayList<ComposableRecordReader<K,?>> tmp =
  308.       new ArrayList<ComposableRecordReader<K,?>>();
  309.     while (!q.isEmpty() && cmp.compare(q.peek().key(), key) <= 0) {
  310.       tmp.add(q.poll());
  311.     }
  312.     for (ComposableRecordReader<K,?> rr : tmp) {
  313.       rr.skip(key);
  314.       if (rr.hasNext()) {
  315.         q.add(rr);
  316.       }
  317.     }
  318.   }
  319.   /**
  320.    * Obtain an iterator over the child RRs apropos of the value type
  321.    * ultimately emitted from this join.
  322.    */
  323.   protected abstract ResetableIterator<X> getDelegate();
  324.   /**
  325.    * If key provided matches that of this Composite, give JoinCollector
  326.    * iterator over values it may emit.
  327.    */
  328.   @SuppressWarnings("unchecked") // No values from static EMPTY class
  329.   public void accept(CompositeRecordReader.JoinCollector jc, K key)
  330.       throws IOException {
  331.     if (hasNext() && 0 == cmp.compare(key, key())) {
  332.       fillJoinCollector(createKey());
  333.       jc.add(id, getDelegate());
  334.       return;
  335.     }
  336.     jc.add(id, EMPTY);
  337.   }
  338.   /**
  339.    * For all child RRs offering the key provided, obtain an iterator
  340.    * at that position in the JoinCollector.
  341.    */
  342.   protected void fillJoinCollector(K iterkey) throws IOException {
  343.     if (!q.isEmpty()) {
  344.       q.peek().key(iterkey);
  345.       while (0 == cmp.compare(q.peek().key(), iterkey)) {
  346.         ComposableRecordReader<K,?> t = q.poll();
  347.         t.accept(jc, iterkey);
  348.         if (t.hasNext()) {
  349.           q.add(t);
  350.         } else if (q.isEmpty()) {
  351.           return;
  352.         }
  353.       }
  354.     }
  355.   }
  356.   /**
  357.    * Implement Comparable contract (compare key of join or head of heap
  358.    * with that of another).
  359.    */
  360.   public int compareTo(ComposableRecordReader<K,?> other) {
  361.     return cmp.compare(key(), other.key());
  362.   }
  363.   /**
  364.    * Create a new key value common to all child RRs.
  365.    * @throws ClassCastException if key classes differ.
  366.    */
  367.   @SuppressWarnings("unchecked") // Explicit check for key class agreement
  368.   public K createKey() {
  369.     if (null == keyclass) {
  370.       final Class<?> cls = kids[0].createKey().getClass();
  371.       for (RecordReader<K,? extends Writable> rr : kids) {
  372.         if (!cls.equals(rr.createKey().getClass())) {
  373.           throw new ClassCastException("Child key classes fail to agree");
  374.         }
  375.       }
  376.       keyclass = cls.asSubclass(WritableComparable.class);
  377.     }
  378.     return (K) ReflectionUtils.newInstance(keyclass, getConf());
  379.   }
  380.   /**
  381.    * Create a value to be used internally for joins.
  382.    */
  383.   protected TupleWritable createInternalValue() {
  384.     Writable[] vals = new Writable[kids.length];
  385.     for (int i = 0; i < vals.length; ++i) {
  386.       vals[i] = kids[i].createValue();
  387.     }
  388.     return new TupleWritable(vals);
  389.   }
  390.   /**
  391.    * Unsupported (returns zero in all cases).
  392.    */
  393.   public long getPos() throws IOException {
  394.     return 0;
  395.   }
  396.   /**
  397.    * Close all child RRs.
  398.    */
  399.   public void close() throws IOException {
  400.     if (kids != null) {
  401.       for (RecordReader<K,? extends Writable> rr : kids) {
  402.         rr.close();
  403.       }
  404.     }
  405.     if (jc != null) {
  406.       jc.close();
  407.     }
  408.   }
  409.   /**
  410.    * Report progress as the minimum of all child RR progress.
  411.    */
  412.   public float getProgress() throws IOException {
  413.     float ret = 1.0f;
  414.     for (RecordReader<K,? extends Writable> rr : kids) {
  415.       ret = Math.min(ret, rr.getProgress());
  416.     }
  417.     return ret;
  418.   }
  419. }