Reducer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.Iterator;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.io.Closeable;
  23. /** 
  24.  * Reduces a set of intermediate values which share a key to a smaller set of
  25.  * values.  
  26.  * 
  27.  * <p>The number of <code>Reducer</code>s for the job is set by the user via 
  28.  * {@link JobConf#setNumReduceTasks(int)}. <code>Reducer</code> implementations 
  29.  * can access the {@link JobConf} for the job via the 
  30.  * {@link JobConfigurable#configure(JobConf)} method and initialize themselves. 
  31.  * Similarly they can use the {@link Closeable#close()} method for
  32.  * de-initialization.</p>
  33.  * <p><code>Reducer</code> has 3 primary phases:</p>
  34.  * <ol>
  35.  *   <li>
  36.  *   
  37.  *   <h4 id="Shuffle">Shuffle</h4>
  38.  *   
  39.  *   <p><code>Reducer</code> is input the grouped output of a {@link Mapper}.
  40.  *   In the phase the framework, for each <code>Reducer</code>, fetches the 
  41.  *   relevant partition of the output of all the <code>Mapper</code>s, via HTTP. 
  42.  *   </p>
  43.  *   </li>
  44.  *   
  45.  *   <li>
  46.  *   <h4 id="Sort">Sort</h4>
  47.  *   
  48.  *   <p>The framework groups <code>Reducer</code> inputs by <code>key</code>s 
  49.  *   (since different <code>Mapper</code>s may have output the same key) in this
  50.  *   stage.</p>
  51.  *   
  52.  *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
  53.  *   being fetched they are merged.</p>
  54.  *      
  55.  *   <h5 id="SecondarySort">SecondarySort</h5>
  56.  *   
  57.  *   <p>If equivalence rules for keys while grouping the intermediates are 
  58.  *   different from those for grouping keys before reduction, then one may 
  59.  *   specify a <code>Comparator</code> via 
  60.  *   {@link JobConf#setOutputValueGroupingComparator(Class)}.Since 
  61.  *   {@link JobConf#setOutputKeyComparatorClass(Class)} can be used to 
  62.  *   control how intermediate keys are grouped, these can be used in conjunction 
  63.  *   to simulate <i>secondary sort on values</i>.</p>
  64.  *   
  65.  *   
  66.  *   For example, say that you want to find duplicate web pages and tag them 
  67.  *   all with the url of the "best" known example. You would set up the job 
  68.  *   like:
  69.  *   <ul>
  70.  *     <li>Map Input Key: url</li>
  71.  *     <li>Map Input Value: document</li>
  72.  *     <li>Map Output Key: document checksum, url pagerank</li>
  73.  *     <li>Map Output Value: url</li>
  74.  *     <li>Partitioner: by checksum</li>
  75.  *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
  76.  *     <li>OutputValueGroupingComparator: by checksum</li>
  77.  *   </ul>
  78.  *   </li>
  79.  *   
  80.  *   <li>   
  81.  *   <h4 id="Reduce">Reduce</h4>
  82.  *   
  83.  *   <p>In this phase the 
  84.  *   {@link #reduce(Object, Iterator, OutputCollector, Reporter)}
  85.  *   method is called for each <code>&lt;key, (list of values)></code> pair in
  86.  *   the grouped inputs.</p>
  87.  *   <p>The output of the reduce task is typically written to the 
  88.  *   {@link FileSystem} via 
  89.  *   {@link OutputCollector#collect(Object, Object)}.</p>
  90.  *   </li>
  91.  * </ol>
  92.  * 
  93.  * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
  94.  * 
  95.  * <p>Example:</p>
  96.  * <p><blockquote><pre>
  97.  *     public class MyReducer&lt;K extends WritableComparable, V extends Writable&gt; 
  98.  *     extends MapReduceBase implements Reducer&lt;K, V, K, V&gt; {
  99.  *     
  100.  *       static enum MyCounters { NUM_RECORDS }
  101.  *        
  102.  *       private String reduceTaskId;
  103.  *       private int noKeys = 0;
  104.  *       
  105.  *       public void configure(JobConf job) {
  106.  *         reduceTaskId = job.get("mapred.task.id");
  107.  *       }
  108.  *       
  109.  *       public void reduce(K key, Iterator&lt;V&gt; values,
  110.  *                          OutputCollector&lt;K, V&gt; output, 
  111.  *                          Reporter reporter)
  112.  *       throws IOException {
  113.  *       
  114.  *         // Process
  115.  *         int noValues = 0;
  116.  *         while (values.hasNext()) {
  117.  *           V value = values.next();
  118.  *           
  119.  *           // Increment the no. of values for this key
  120.  *           ++noValues;
  121.  *           
  122.  *           // Process the &lt;key, value&gt; pair (assume this takes a while)
  123.  *           // ...
  124.  *           // ...
  125.  *           
  126.  *           // Let the framework know that we are alive, and kicking!
  127.  *           if ((noValues%10) == 0) {
  128.  *             reporter.progress();
  129.  *           }
  130.  *         
  131.  *           // Process some more
  132.  *           // ...
  133.  *           // ...
  134.  *           
  135.  *           // Output the &lt;key, value&gt; 
  136.  *           output.collect(key, value);
  137.  *         }
  138.  *         
  139.  *         // Increment the no. of &lt;key, list of values&gt; pairs processed
  140.  *         ++noKeys;
  141.  *         
  142.  *         // Increment counters
  143.  *         reporter.incrCounter(NUM_RECORDS, 1);
  144.  *         
  145.  *         // Every 100 keys update application-level status
  146.  *         if ((noKeys%100) == 0) {
  147.  *           reporter.setStatus(reduceTaskId + " processed " + noKeys);
  148.  *         }
  149.  *       }
  150.  *     }
  151.  * </pre></blockquote></p>
  152.  * 
  153.  * @see Mapper
  154.  * @see Partitioner
  155.  * @see Reporter
  156.  * @see MapReduceBase
  157.  * @deprecated Use {@link org.apache.hadoop.mapreduce.Reducer} instead.
  158.  */
  159. @Deprecated
  160. public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
  161.   
  162.   /** 
  163.    * <i>Reduces</i> values for a given key.  
  164.    * 
  165.    * <p>The framework calls this method for each 
  166.    * <code>&lt;key, (list of values)></code> pair in the grouped inputs.
  167.    * Output values must be of the same type as input values.  Input keys must 
  168.    * not be altered. The framework will <b>reuse</b> the key and value objects
  169.    * that are passed into the reduce, therefore the application should clone
  170.    * the objects they want to keep a copy of. In many cases, all values are 
  171.    * combined into zero or one value.
  172.    * </p>
  173.    *   
  174.    * <p>Output pairs are collected with calls to  
  175.    * {@link OutputCollector#collect(Object,Object)}.</p>
  176.    *
  177.    * <p>Applications can use the {@link Reporter} provided to report progress 
  178.    * or just indicate that they are alive. In scenarios where the application 
  179.    * takes an insignificant amount of time to process individual key/value 
  180.    * pairs, this is crucial since the framework might assume that the task has 
  181.    * timed-out and kill that task. The other way of avoiding this is to set 
  182.    * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout">
  183.    * mapred.task.timeout</a> to a high-enough value (or even zero for no 
  184.    * time-outs).</p>
  185.    * 
  186.    * @param key the key.
  187.    * @param values the list of values to reduce.
  188.    * @param output to collect keys and combined values.
  189.    * @param reporter facility to report progress.
  190.    */
  191.   void reduce(K2 key, Iterator<V2> values,
  192.               OutputCollector<K3, V3> output, Reporter reporter)
  193.     throws IOException;
  194. }