ReduceContext.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapreduce;
  19. import java.io.IOException;
  20. import java.util.Iterator;
  21. import java.util.NoSuchElementException;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.io.BytesWritable;
  24. import org.apache.hadoop.io.DataInputBuffer;
  25. import org.apache.hadoop.io.RawComparator;
  26. import org.apache.hadoop.io.serializer.Deserializer;
  27. import org.apache.hadoop.io.serializer.SerializationFactory;
  28. import org.apache.hadoop.mapred.RawKeyValueIterator;
  29. import org.apache.hadoop.util.Progressable;
  30. /**
  31.  * The context passed to the {@link Reducer}.
  32.  * @param <KEYIN> the class of the input keys
  33.  * @param <VALUEIN> the class of the input values
  34.  * @param <KEYOUT> the class of the output keys
  35.  * @param <VALUEOUT> the class of the output values
  36.  */
  37. public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
  38.     extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  39.   private RawKeyValueIterator input;
  40.   private Counter inputCounter;
  41.   private RawComparator<KEYIN> comparator;
  42.   private KEYIN key;                                  // current key
  43.   private VALUEIN value;                              // current value
  44.   private boolean firstValue = false;                 // first value in key
  45.   private boolean nextKeyIsSame = false;              // more w/ this key
  46.   private boolean hasMore;                            // more in file
  47.   protected Progressable reporter;
  48.   private Deserializer<KEYIN> keyDeserializer;
  49.   private Deserializer<VALUEIN> valueDeserializer;
  50.   private DataInputBuffer buffer = new DataInputBuffer();
  51.   private BytesWritable currentRawKey = new BytesWritable();
  52.   private ValueIterable iterable = new ValueIterable();
  53.   public ReduceContext(Configuration conf, TaskAttemptID taskid,
  54.                        RawKeyValueIterator input, 
  55.                        Counter inputCounter,
  56.                        RecordWriter<KEYOUT,VALUEOUT> output,
  57.                        OutputCommitter committer,
  58.                        StatusReporter reporter,
  59.                        RawComparator<KEYIN> comparator,
  60.                        Class<KEYIN> keyClass,
  61.                        Class<VALUEIN> valueClass
  62.                        ) throws InterruptedException, IOException{
  63.     super(conf, taskid, output, committer, reporter);
  64.     this.input = input;
  65.     this.inputCounter = inputCounter;
  66.     this.comparator = comparator;
  67.     SerializationFactory serializationFactory = new SerializationFactory(conf);
  68.     this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  69.     this.keyDeserializer.open(buffer);
  70.     this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
  71.     this.valueDeserializer.open(buffer);
  72.     hasMore = input.next();
  73.   }
  74.   /** Start processing next unique key. */
  75.   public boolean nextKey() throws IOException,InterruptedException {
  76.     while (hasMore && nextKeyIsSame) {
  77.       nextKeyValue();
  78.     }
  79.     if (hasMore) {
  80.       return nextKeyValue();
  81.     } else {
  82.       return false;
  83.     }
  84.   }
  85.   /**
  86.    * Advance to the next key/value pair.
  87.    */
  88.   @Override
  89.   public boolean nextKeyValue() throws IOException, InterruptedException {
  90.     if (!hasMore) {
  91.       key = null;
  92.       value = null;
  93.       return false;
  94.     }
  95.     firstValue = !nextKeyIsSame;
  96.     DataInputBuffer next = input.getKey();
  97.     currentRawKey.set(next.getData(), next.getPosition(), 
  98.                       next.getLength() - next.getPosition());
  99.     buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
  100.     key = keyDeserializer.deserialize(key);
  101.     next = input.getValue();
  102.     buffer.reset(next.getData(), next.getPosition(), next.getLength());
  103.     value = valueDeserializer.deserialize(value);
  104.     hasMore = input.next();
  105.     inputCounter.increment(1);
  106.     if (hasMore) {
  107.       next = input.getKey();
  108.       nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
  109.                                          currentRawKey.getLength(),
  110.                                          next.getData(),
  111.                                          next.getPosition(),
  112.                                          next.getLength() - next.getPosition()
  113.                                          ) == 0;
  114.     } else {
  115.       nextKeyIsSame = false;
  116.     }
  117.     return true;
  118.   }
  119.   public KEYIN getCurrentKey() {
  120.     return key;
  121.   }
  122.   @Override
  123.   public VALUEIN getCurrentValue() {
  124.     return value;
  125.   }
  126.   protected class ValueIterator implements Iterator<VALUEIN> {
  127.     @Override
  128.     public boolean hasNext() {
  129.       return firstValue || nextKeyIsSame;
  130.     }
  131.     @Override
  132.     public VALUEIN next() {
  133.       // if this is the first record, we don't need to advance
  134.       if (firstValue) {
  135.         firstValue = false;
  136.         return value;
  137.       }
  138.       // if this isn't the first record and the next key is different, they
  139.       // can't advance it here.
  140.       if (!nextKeyIsSame) {
  141.         throw new NoSuchElementException("iterate past last value");
  142.       }
  143.       // otherwise, go to the next key/value pair
  144.       try {
  145.         nextKeyValue();
  146.         return value;
  147.       } catch (IOException ie) {
  148.         throw new RuntimeException("next value iterator failed", ie);
  149.       } catch (InterruptedException ie) {
  150.         // this is bad, but we can't modify the exception list of java.util
  151.         throw new RuntimeException("next value iterator interrupted", ie);        
  152.       }
  153.     }
  154.     @Override
  155.     public void remove() {
  156.       throw new UnsupportedOperationException("remove not implemented");
  157.     }
  158.     
  159.   }
  160.   protected class ValueIterable implements Iterable<VALUEIN> {
  161.     private ValueIterator iterator = new ValueIterator();
  162.     @Override
  163.     public Iterator<VALUEIN> iterator() {
  164.       return iterator;
  165.     } 
  166.   }
  167.   
  168.   /**
  169.    * Iterate through the values for the current key, reusing the same value 
  170.    * object, which is stored in the context.
  171.    * @return the series of values associated with the current key. All of the 
  172.    * objects returned directly and indirectly from this method are reused.
  173.    */
  174.   public 
  175.   Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
  176.     return iterable;
  177.   }
  178. }