OverrideRecordReader.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.join;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.PriorityQueue;
  22. import org.apache.hadoop.io.Writable;
  23. import org.apache.hadoop.io.WritableComparable;
  24. import org.apache.hadoop.io.WritableComparator;
  25. import org.apache.hadoop.mapred.JobConf;
  26. /**
  27.  * Prefer the "rightmost" data source for this key.
  28.  * For example, <tt>override(S1,S2,S3)</tt> will prefer values
  29.  * from S3 over S2, and values from S2 over S1 for all keys
  30.  * emitted from all sources.
  31.  */
  32. public class OverrideRecordReader<K extends WritableComparable,
  33.                                   V extends Writable>
  34.     extends MultiFilterRecordReader<K,V> {
  35.   OverrideRecordReader(int id, JobConf conf, int capacity,
  36.       Class<? extends WritableComparator> cmpcl) throws IOException {
  37.     super(id, conf, capacity, cmpcl);
  38.   }
  39.   /**
  40.    * Emit the value with the highest position in the tuple.
  41.    */
  42.   @SuppressWarnings("unchecked") // No static typeinfo on Tuples
  43.   protected V emit(TupleWritable dst) {
  44.     return (V) dst.iterator().next();
  45.   }
  46.   /**
  47.    * Instead of filling the JoinCollector with iterators from all
  48.    * data sources, fill only the rightmost for this key.
  49.    * This not only saves space by discarding the other sources, but
  50.    * it also emits the number of key-value pairs in the preferred
  51.    * RecordReader instead of repeating that stream n times, where
  52.    * n is the cardinality of the cross product of the discarded
  53.    * streams for the given key.
  54.    */
  55.   protected void fillJoinCollector(K iterkey) throws IOException {
  56.     final PriorityQueue<ComposableRecordReader<K,?>> q = getRecordReaderQueue();
  57.     if (!q.isEmpty()) {
  58.       int highpos = -1;
  59.       ArrayList<ComposableRecordReader<K,?>> list =
  60.         new ArrayList<ComposableRecordReader<K,?>>(kids.length);
  61.       q.peek().key(iterkey);
  62.       final WritableComparator cmp = getComparator();
  63.       while (0 == cmp.compare(q.peek().key(), iterkey)) {
  64.         ComposableRecordReader<K,?> t = q.poll();
  65.         if (-1 == highpos || list.get(highpos).id() < t.id()) {
  66.           highpos = list.size();
  67.         }
  68.         list.add(t);
  69.         if (q.isEmpty())
  70.           break;
  71.       }
  72.       ComposableRecordReader<K,?> t = list.remove(highpos);
  73.       t.accept(jc, iterkey);
  74.       for (ComposableRecordReader<K,?> rr : list) {
  75.         rr.skip(iterkey);
  76.       }
  77.       list.add(t);
  78.       for (ComposableRecordReader<K,?> rr : list) {
  79.         if (rr.hasNext()) {
  80.           q.add(rr);
  81.         }
  82.       }
  83.     }
  84.   }
  85. }