WrappedRecordReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.join;
  19. import java.io.IOException;
  20. import org.apache.hadoop.io.Writable;
  21. import org.apache.hadoop.io.WritableComparable;
  22. import org.apache.hadoop.io.WritableComparator;
  23. import org.apache.hadoop.io.WritableUtils;
  24. import org.apache.hadoop.mapred.RecordReader;
  25. /**
  26.  * Proxy class for a RecordReader participating in the join framework.
  27.  * This class keeps track of the "head" key-value pair for the
  28.  * provided RecordReader and keeps a store of values matching a key when
  29.  * this source is participating in a join.
  30.  */
  31. public class WrappedRecordReader<K extends WritableComparable,
  32.                           U extends Writable>
  33.     implements ComposableRecordReader<K,U> {
  34.   private boolean empty = false;
  35.   private RecordReader<K,U> rr;
  36.   private int id;  // index at which values will be inserted in collector
  37.   private K khead; // key at the top of this RR
  38.   private U vhead; // value assoc with khead
  39.   private WritableComparator cmp;
  40.   private ResetableIterator<U> vjoin;
  41.   /**
  42.    * For a given RecordReader rr, occupy position id in collector.
  43.    */
  44.   WrappedRecordReader(int id, RecordReader<K,U> rr,
  45.       Class<? extends WritableComparator> cmpcl) throws IOException {
  46.     this.id = id;
  47.     this.rr = rr;
  48.     khead = rr.createKey();
  49.     vhead = rr.createValue();
  50.     try {
  51.       cmp = (null == cmpcl)
  52.         ? WritableComparator.get(khead.getClass())
  53.         : cmpcl.newInstance();
  54.     } catch (InstantiationException e) {
  55.       throw (IOException)new IOException().initCause(e);
  56.     } catch (IllegalAccessException e) {
  57.       throw (IOException)new IOException().initCause(e);
  58.     }
  59.     vjoin = new StreamBackedIterator<U>();
  60.     next();
  61.   }
  62.   /** {@inheritDoc} */
  63.   public int id() {
  64.     return id;
  65.   }
  66.   /**
  67.    * Return the key at the head of this RR.
  68.    */
  69.   public K key() {
  70.     return khead;
  71.   }
  72.   /**
  73.    * Clone the key at the head of this RR into the object supplied.
  74.    */
  75.   public void key(K qkey) throws IOException {
  76.     WritableUtils.cloneInto(qkey, khead);
  77.   }
  78.   /**
  79.    * Return true if the RR- including the k,v pair stored in this object-
  80.    * is exhausted.
  81.    */
  82.   public boolean hasNext() {
  83.     return !empty;
  84.   }
  85.   /**
  86.    * Skip key-value pairs with keys less than or equal to the key provided.
  87.    */
  88.   public void skip(K key) throws IOException {
  89.     if (hasNext()) {
  90.       while (cmp.compare(khead, key) <= 0 && next());
  91.     }
  92.   }
  93.   /**
  94.    * Read the next k,v pair into the head of this object; return true iff
  95.    * the RR and this are exhausted.
  96.    */
  97.   protected boolean next() throws IOException {
  98.     empty = !rr.next(khead, vhead);
  99.     return hasNext();
  100.   }
  101.   /**
  102.    * Add an iterator to the collector at the position occupied by this
  103.    * RecordReader over the values in this stream paired with the key
  104.    * provided (ie register a stream of values from this source matching K
  105.    * with a collector).
  106.    */
  107.                                  // JoinCollector comes from parent, which has
  108.   @SuppressWarnings("unchecked") // no static type for the slot this sits in
  109.   public void accept(CompositeRecordReader.JoinCollector i, K key)
  110.       throws IOException {
  111.     vjoin.clear();
  112.     if (0 == cmp.compare(key, khead)) {
  113.       do {
  114.         vjoin.add(vhead);
  115.       } while (next() && 0 == cmp.compare(key, khead));
  116.     }
  117.     i.add(id, vjoin);
  118.   }
  119.   /**
  120.    * Write key-value pair at the head of this stream to the objects provided;
  121.    * get next key-value pair from proxied RR.
  122.    */
  123.   public boolean next(K key, U value) throws IOException {
  124.     if (hasNext()) {
  125.       WritableUtils.cloneInto(key, khead);
  126.       WritableUtils.cloneInto(value, vhead);
  127.       next();
  128.       return true;
  129.     }
  130.     return false;
  131.   }
  132.   /**
  133.    * Request new key from proxied RR.
  134.    */
  135.   public K createKey() {
  136.     return rr.createKey();
  137.   }
  138.   /**
  139.    * Request new value from proxied RR.
  140.    */
  141.   public U createValue() {
  142.     return rr.createValue();
  143.   }
  144.   /**
  145.    * Request progress from proxied RR.
  146.    */
  147.   public float getProgress() throws IOException {
  148.     return rr.getProgress();
  149.   }
  150.   /**
  151.    * Request position from proxied RR.
  152.    */
  153.   public long getPos() throws IOException {
  154.     return rr.getPos();
  155.   }
  156.   /**
  157.    * Forward close request to proxied RR.
  158.    */
  159.   public void close() throws IOException {
  160.     rr.close();
  161.   }
  162.   /**
  163.    * Implement Comparable contract (compare key at head of proxied RR
  164.    * with that of another).
  165.    */
  166.   public int compareTo(ComposableRecordReader<K,?> other) {
  167.     return cmp.compare(key(), other.key());
  168.   }
  169.   /**
  170.    * Return true iff compareTo(other) retn true.
  171.    */
  172.   @SuppressWarnings("unchecked") // Explicit type check prior to cast
  173.   public boolean equals(Object other) {
  174.     return other instanceof ComposableRecordReader
  175.         && 0 == compareTo((ComposableRecordReader)other);
  176.   }
  177.   public int hashCode() {
  178.     assert false : "hashCode not designed";
  179.     return 42;
  180.   }
  181. }