ChainReducer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import org.apache.hadoop.mapred.*;
  20. import java.io.IOException;
  21. import java.util.Iterator;
  22. /**
  23.  * The ChainReducer class allows to chain multiple Mapper classes after a
  24.  * Reducer within the Reducer task.
  25.  * <p/>
  26.  * For each record output by the Reducer, the Mapper classes are invoked in a
  27.  * chained (or piped) fashion, the output of the first becomes the input of the
  28.  * second, and so on until the last Mapper, the output of the last Mapper will
  29.  * be written to the task's output.
  30.  * <p/>
  31.  * The key functionality of this feature is that the Mappers in the chain do not
  32.  * need to be aware that they are executed after the Reducer or in a chain.
  33.  * This enables having reusable specialized Mappers that can be combined to
  34.  * perform composite operations within a single task.
  35.  * <p/>
  36.  * Special care has to be taken when creating chains that the key/values output
  37.  * by a Mapper are valid for the following Mapper in the chain. It is assumed
  38.  * all Mappers and the Reduce in the chain use maching output and input key and
  39.  * value classes as no conversion is done by the chaining code.
  40.  * <p/>
  41.  * Using the ChainMapper and the ChainReducer classes is possible to compose
  42.  * Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>. And
  43.  * immediate benefit of this pattern is a dramatic reduction in disk IO.
  44.  * <p/>
  45.  * IMPORTANT: There is no need to specify the output key/value classes for the
  46.  * ChainReducer, this is done by the setReducer or the addMapper for the last
  47.  * element in the chain.
  48.  * <p/>
  49.  * ChainReducer usage pattern:
  50.  * <p/>
  51.  * <pre>
  52.  * ...
  53.  * conf.setJobName("chain");
  54.  * conf.setInputFormat(TextInputFormat.class);
  55.  * conf.setOutputFormat(TextOutputFormat.class);
  56.  * <p/>
  57.  * JobConf mapAConf = new JobConf(false);
  58.  * ...
  59.  * ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
  60.  *   Text.class, Text.class, true, mapAConf);
  61.  * <p/>
  62.  * JobConf mapBConf = new JobConf(false);
  63.  * ...
  64.  * ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class,
  65.  *   LongWritable.class, Text.class, false, mapBConf);
  66.  * <p/>
  67.  * JobConf reduceConf = new JobConf(false);
  68.  * ...
  69.  * ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,
  70.  *   Text.class, Text.class, true, reduceConf);
  71.  * <p/>
  72.  * ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class,
  73.  *   LongWritable.class, Text.class, false, null);
  74.  * <p/>
  75.  * ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
  76.  *   LongWritable.class, LongWritable.class, true, null);
  77.  * <p/>
  78.  * FileInputFormat.setInputPaths(conf, inDir);
  79.  * FileOutputFormat.setOutputPath(conf, outDir);
  80.  * ...
  81.  * <p/>
  82.  * JobClient jc = new JobClient(conf);
  83.  * RunningJob job = jc.submitJob(conf);
  84.  * ...
  85.  * </pre>
  86.  */
  87. public class ChainReducer implements Reducer {
  88.   /**
  89.    * Sets the Reducer class to the chain job's JobConf.
  90.    * <p/>
  91.    * It has to be specified how key and values are passed from one element of
  92.    * the chain to the next, by value or by reference. If a Reducer leverages the
  93.    * assumed semantics that the key and values are not modified by the collector
  94.    * 'by value' must be used. If the Reducer does not expect this semantics, as
  95.    * an optimization to avoid serialization and deserialization 'by reference'
  96.    * can be used.
  97.    * <p/>
  98.    * For the added Reducer the configuration given for it,
  99.    * <code>reducerConf</code>, have precedence over the job's JobConf. This
  100.    * precedence is in effect when the task is running.
  101.    * <p/>
  102.    * IMPORTANT: There is no need to specify the output key/value classes for the
  103.    * ChainReducer, this is done by the setReducer or the addMapper for the last
  104.    * element in the chain.
  105.    *
  106.    * @param job              job's JobConf to add the Reducer class.
  107.    * @param klass            the Reducer class to add.
  108.    * @param inputKeyClass    reducer input key class.
  109.    * @param inputValueClass  reducer input value class.
  110.    * @param outputKeyClass   reducer output key class.
  111.    * @param outputValueClass reducer output value class.
  112.    * @param byValue          indicates if key/values should be passed by value
  113.    * to the next Mapper in the chain, if any.
  114.    * @param reducerConf      a JobConf with the configuration for the Reducer
  115.    * class. It is recommended to use a JobConf without default values using the
  116.    * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
  117.    */
  118.   public static <K1, V1, K2, V2> void setReducer(JobConf job,
  119.                            Class<? extends Reducer<K1, V1, K2, V2>> klass,
  120.                            Class<? extends K1> inputKeyClass,
  121.                            Class<? extends V1> inputValueClass,
  122.                            Class<? extends K2> outputKeyClass,
  123.                            Class<? extends V2> outputValueClass,
  124.                            boolean byValue, JobConf reducerConf) {
  125.     job.setReducerClass(ChainReducer.class);
  126.     job.setOutputKeyClass(outputKeyClass);
  127.     job.setOutputValueClass(outputValueClass);
  128.     Chain.setReducer(job, klass, inputKeyClass, inputValueClass, outputKeyClass,
  129.                      outputValueClass, byValue, reducerConf);
  130.   }
  131.   /**
  132.    * Adds a Mapper class to the chain job's JobConf.
  133.    * <p/>
  134.    * It has to be specified how key and values are passed from one element of
  135.    * the chain to the next, by value or by reference. If a Mapper leverages the
  136.    * assumed semantics that the key and values are not modified by the collector
  137.    * 'by value' must be used. If the Mapper does not expect this semantics, as
  138.    * an optimization to avoid serialization and deserialization 'by reference'
  139.    * can be used.
  140.    * <p/>
  141.    * For the added Mapper the configuration given for it,
  142.    * <code>mapperConf</code>, have precedence over the job's JobConf. This
  143.    * precedence is in effect when the task is running.
  144.    * <p/>
  145.    * IMPORTANT: There is no need to specify the output key/value classes for the
  146.    * ChainMapper, this is done by the addMapper for the last mapper in the chain
  147.    * .
  148.    *
  149.    * @param job              chain job's JobConf to add the Mapper class.
  150.    * @param klass            the Mapper class to add.
  151.    * @param inputKeyClass    mapper input key class.
  152.    * @param inputValueClass  mapper input value class.
  153.    * @param outputKeyClass   mapper output key class.
  154.    * @param outputValueClass mapper output value class.
  155.    * @param byValue          indicates if key/values should be passed by value
  156.    * to the next Mapper in the chain, if any.
  157.    * @param mapperConf       a JobConf with the configuration for the Mapper
  158.    * class. It is recommended to use a JobConf without default values using the
  159.    * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
  160.    */
  161.   public static <K1, V1, K2, V2> void addMapper(JobConf job,
  162.                            Class<? extends Mapper<K1, V1, K2, V2>> klass,
  163.                            Class<? extends K1> inputKeyClass,
  164.                            Class<? extends V1> inputValueClass,
  165.                            Class<? extends K2> outputKeyClass,
  166.                            Class<? extends V2> outputValueClass,
  167.                            boolean byValue, JobConf mapperConf) {
  168.     job.setOutputKeyClass(outputKeyClass);
  169.     job.setOutputValueClass(outputValueClass);
  170.     Chain.addMapper(false, job, klass, inputKeyClass, inputValueClass,
  171.                     outputKeyClass, outputValueClass, byValue, mapperConf);
  172.   }
  173.   private Chain chain;
  174.   /**
  175.    * Constructor.
  176.    */
  177.   public ChainReducer() {
  178.     chain = new Chain(false);
  179.   }
  180.   /**
  181.    * Configures the ChainReducer, the Reducer and all the Mappers in the chain.
  182.    * <p/>
  183.    * If this method is overriden <code>super.configure(...)</code> should be
  184.    * invoked at the beginning of the overwriter method.
  185.    */
  186.   public void configure(JobConf job) {
  187.     chain.configure(job);
  188.   }
  189.   /**
  190.    * Chains the <code>reduce(...)</code> method of the Reducer with the
  191.    * <code>map(...) </code> methods of the Mappers in the chain.
  192.    */
  193.   @SuppressWarnings({"unchecked"})
  194.   public void reduce(Object key, Iterator values, OutputCollector output,
  195.                      Reporter reporter) throws IOException {
  196.     Reducer reducer = chain.getReducer();
  197.     if (reducer != null) {
  198.       reducer.reduce(key, values, chain.getReducerCollector(output, reporter),
  199.                      reporter);
  200.     }
  201.   }
  202.   /**
  203.    * Closes  the ChainReducer, the Reducer and all the Mappers in the chain.
  204.    * <p/>
  205.    * If this method is overriden <code>super.close()</code> should be
  206.    * invoked at the end of the overwriter method.
  207.    */
  208.   public void close() throws IOException {
  209.     chain.close();
  210.   }
  211. }