Reducer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapreduce;
  19. import java.io.IOException;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.io.RawComparator;
  22. import org.apache.hadoop.mapred.RawKeyValueIterator;
  23. /** 
  24.  * Reduces a set of intermediate values which share a key to a smaller set of
  25.  * values.  
  26.  * 
  27.  * <p><code>Reducer</code> implementations 
  28.  * can access the {@link Configuration} for the job via the 
  29.  * {@link JobContext#getConfiguration()} method.</p>
  30.  * <p><code>Reducer</code> has 3 primary phases:</p>
  31.  * <ol>
  32.  *   <li>
  33.  *   
  34.  *   <h4 id="Shuffle">Shuffle</h4>
  35.  *   
  36.  *   <p>The <code>Reducer</code> copies the sorted output from each 
  37.  *   {@link Mapper} using HTTP across the network.</p>
  38.  *   </li>
  39.  *   
  40.  *   <li>
  41.  *   <h4 id="Sort">Sort</h4>
  42.  *   
  43.  *   <p>The framework merge sorts <code>Reducer</code> inputs by 
  44.  *   <code>key</code>s 
  45.  *   (since different <code>Mapper</code>s may have output the same key).</p>
  46.  *   
  47.  *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
  48.  *   being fetched they are merged.</p>
  49.  *      
  50.  *   <h5 id="SecondarySort">SecondarySort</h5>
  51.  *   
  52.  *   <p>To achieve a secondary sort on the values returned by the value 
  53.  *   iterator, the application should extend the key with the secondary
  54.  *   key and define a grouping comparator. The keys will be sorted using the
  55.  *   entire key, but will be grouped using the grouping comparator to decide
  56.  *   which keys and values are sent in the same call to reduce.The grouping 
  57.  *   comparator is specified via 
  58.  *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
  59.  *   controlled by 
  60.  *   {@link Job#setSortComparatorClass(Class)}.</p>
  61.  *   
  62.  *   
  63.  *   For example, say that you want to find duplicate web pages and tag them 
  64.  *   all with the url of the "best" known example. You would set up the job 
  65.  *   like:
  66.  *   <ul>
  67.  *     <li>Map Input Key: url</li>
  68.  *     <li>Map Input Value: document</li>
  69.  *     <li>Map Output Key: document checksum, url pagerank</li>
  70.  *     <li>Map Output Value: url</li>
  71.  *     <li>Partitioner: by checksum</li>
  72.  *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
  73.  *     <li>OutputValueGroupingComparator: by checksum</li>
  74.  *   </ul>
  75.  *   </li>
  76.  *   
  77.  *   <li>   
  78.  *   <h4 id="Reduce">Reduce</h4>
  79.  *   
  80.  *   <p>In this phase the 
  81.  *   {@link #reduce(Object, Iterable, Context)}
  82.  *   method is called for each <code>&lt;key, (collection of values)></code> in
  83.  *   the sorted inputs.</p>
  84.  *   <p>The output of the reduce task is typically written to a 
  85.  *   {@link RecordWriter} via 
  86.  *   {@link Context#write(Object, Object)}.</p>
  87.  *   </li>
  88.  * </ol>
  89.  * 
  90.  * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
  91.  * 
  92.  * <p>Example:</p>
  93.  * <p><blockquote><pre>
  94.  * public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
  95.  *                                                 Key,IntWritable> {
  96.  *   private IntWritable result = new IntWritable();
  97.  * 
  98.  *   public void reduce(Key key, Iterable<IntWritable> values, 
  99.  *                      Context context) throws IOException {
  100.  *     int sum = 0;
  101.  *     for (IntWritable val : values) {
  102.  *       sum += val.get();
  103.  *     }
  104.  *     result.set(sum);
  105.  *     context.collect(key, result);
  106.  *   }
  107.  * }
  108.  * </pre></blockquote></p>
  109.  * 
  110.  * @see Mapper
  111.  * @see Partitioner
  112.  */
  113. public abstract class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  114.   public class Context 
  115.     extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  116.     public Context(Configuration conf, TaskAttemptID taskid,
  117.                    RawKeyValueIterator input, 
  118.                    Counter inputCounter,
  119.                    RecordWriter<KEYOUT,VALUEOUT> output,
  120.                    OutputCommitter committer,
  121.                    StatusReporter reporter,
  122.                    RawComparator<KEYIN> comparator,
  123.                    Class<KEYIN> keyClass,
  124.                    Class<VALUEIN> valueClass
  125.                    ) throws IOException, InterruptedException {
  126.       super(conf, taskid, input, inputCounter, output, committer, reporter, 
  127.             comparator, keyClass, valueClass);
  128.     }
  129.   }
  130.   /**
  131.    * Called once at the start of the task.
  132.    */
  133.   protected void setup(Context context
  134.                        ) throws IOException, InterruptedException {
  135.     // NOTHING
  136.   }
  137.   /**
  138.    * This method is called once for each key. Most applications will define
  139.    * their reduce class by overriding this method. The default implementation
  140.    * is an identity function.
  141.    */
  142.   @SuppressWarnings("unchecked")
  143.   protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
  144.                         ) throws IOException, InterruptedException {
  145.     for(VALUEIN value: values) {
  146.       context.write((KEYOUT) key, (VALUEOUT) value);
  147.     }
  148.   }
  149.   /**
  150.    * Called once at the end of the task.
  151.    */
  152.   protected void cleanup(Context context
  153.                          ) throws IOException, InterruptedException {
  154.     // NOTHING
  155.   }
  156.   /**
  157.    * Advanced application writers can use the 
  158.    * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
  159.    * control how the reduce task works.
  160.    */
  161.   public void run(Context context) throws IOException, InterruptedException {
  162.     setup(context);
  163.     while (context.nextKey()) {
  164.       reduce(context.getCurrentKey(), context.getValues(), context);
  165.     }
  166.     cleanup(context);
  167.   }
  168. }