MultiFilterRecordReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.join;
  19. import java.io.IOException;
  20. import java.util.PriorityQueue;
  21. import org.apache.hadoop.io.Writable;
  22. import org.apache.hadoop.io.WritableComparable;
  23. import org.apache.hadoop.io.WritableComparator;
  24. import org.apache.hadoop.io.WritableUtils;
  25. import org.apache.hadoop.util.ReflectionUtils;
  26. import org.apache.hadoop.mapred.JobConf;
  27. import org.apache.hadoop.mapred.RecordReader;
  28. /**
  29.  * Base class for Composite join returning values derived from multiple
  30.  * sources, but generally not tuples.
  31.  */
  32. public abstract class MultiFilterRecordReader<K extends WritableComparable,
  33.                                               V extends Writable>
  34.     extends CompositeRecordReader<K,V,V>
  35.     implements ComposableRecordReader<K,V> {
  36.   private Class<? extends Writable> valueclass;
  37.   private TupleWritable ivalue;
  38.   public MultiFilterRecordReader(int id, JobConf conf, int capacity,
  39.       Class<? extends WritableComparator> cmpcl) throws IOException {
  40.     super(id, capacity, cmpcl);
  41.     setConf(conf);
  42.   }
  43.   /**
  44.    * For each tuple emitted, return a value (typically one of the values
  45.    * in the tuple).
  46.    * Modifying the Writables in the tuple is permitted and unlikely to affect
  47.    * join behavior in most cases, but it is not recommended. It's safer to
  48.    * clone first.
  49.    */
  50.   protected abstract V emit(TupleWritable dst) throws IOException;
  51.   /**
  52.    * Default implementation offers {@link #emit} every Tuple from the
  53.    * collector (the outer join of child RRs).
  54.    */
  55.   protected boolean combine(Object[] srcs, TupleWritable dst) {
  56.     return true;
  57.   }
  58.   /** {@inheritDoc} */
  59.   public boolean next(K key, V value) throws IOException {
  60.     if (jc.flush(ivalue)) {
  61.       WritableUtils.cloneInto(key, jc.key());
  62.       WritableUtils.cloneInto(value, emit(ivalue));
  63.       return true;
  64.     }
  65.     jc.clear();
  66.     K iterkey = createKey();
  67.     final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
  68.     while (!q.isEmpty()) {
  69.       fillJoinCollector(iterkey);
  70.       jc.reset(iterkey);
  71.       if (jc.flush(ivalue)) {
  72.         WritableUtils.cloneInto(key, jc.key());
  73.         WritableUtils.cloneInto(value, emit(ivalue));
  74.         return true;
  75.       }
  76.       jc.clear();
  77.     }
  78.     return false;
  79.   }
  80.   /** {@inheritDoc} */
  81.   @SuppressWarnings("unchecked") // Explicit check for value class agreement
  82.   public V createValue() {
  83.     if (null == valueclass) {
  84.       final Class<?> cls = kids[0].createValue().getClass();
  85.       for (RecordReader<K,? extends V> rr : kids) {
  86.         if (!cls.equals(rr.createValue().getClass())) {
  87.           throw new ClassCastException("Child value classes fail to agree");
  88.         }
  89.       }
  90.       valueclass = cls.asSubclass(Writable.class);
  91.       ivalue = createInternalValue();
  92.     }
  93.     return (V) ReflectionUtils.newInstance(valueclass, null);
  94.   }
  95.   /**
  96.    * Return an iterator returning a single value from the tuple.
  97.    * @see MultiFilterDelegationIterator
  98.    */
  99.   protected ResetableIterator<V> getDelegate() {
  100.     return new MultiFilterDelegationIterator();
  101.   }
  102.   /**
  103.    * Proxy the JoinCollector, but include callback to emit.
  104.    */
  105.   protected class MultiFilterDelegationIterator
  106.       implements ResetableIterator<V> {
  107.     public boolean hasNext() {
  108.       return jc.hasNext();
  109.     }
  110.     public boolean next(V val) throws IOException {
  111.       boolean ret;
  112.       if (ret = jc.flush(ivalue)) {
  113.         WritableUtils.cloneInto(val, emit(ivalue));
  114.       }
  115.       return ret;
  116.     }
  117.     public boolean replay(V val) throws IOException {
  118.       WritableUtils.cloneInto(val, emit(ivalue));
  119.       return true;
  120.     }
  121.     public void reset() {
  122.       jc.reset(jc.key());
  123.     }
  124.     public void add(V item) throws IOException {
  125.       throw new UnsupportedOperationException();
  126.     }
  127.     public void close() throws IOException {
  128.       jc.close();
  129.     }
  130.     public void clear() {
  131.       jc.clear();
  132.     }
  133.   }
  134. }