ChainMapper.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import org.apache.hadoop.mapred.JobConf;
  20. import org.apache.hadoop.mapred.Mapper;
  21. import org.apache.hadoop.mapred.OutputCollector;
  22. import org.apache.hadoop.mapred.Reporter;
  23. import java.io.IOException;
  24. /**
  25.  * The ChainMapper class allows to use multiple Mapper classes within a single
  26.  * Map task.
  27.  * <p/>
  28.  * The Mapper classes are invoked in a chained (or piped) fashion, the output of
  29.  * the first becomes the input of the second, and so on until the last Mapper,
  30.  * the output of the last Mapper will be written to the task's output.
  31.  * <p/>
  32.  * The key functionality of this feature is that the Mappers in the chain do not
  33.  * need to be aware that they are executed in a chain. This enables having
  34.  * reusable specialized Mappers that can be combined to perform composite
  35.  * operations within a single task.
  36.  * <p/>
  37.  * Special care has to be taken when creating chains that the key/values output
  38.  * by a Mapper are valid for the following Mapper in the chain. It is assumed
  39.  * all Mappers and the Reduce in the chain use maching output and input key and
  40.  * value classes as no conversion is done by the chaining code.
  41.  * <p/>
  42.  * Using the ChainMapper and the ChainReducer classes is possible to compose
  43.  * Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>. And
  44.  * immediate benefit of this pattern is a dramatic reduction in disk IO.
  45.  * <p/>
  46.  * IMPORTANT: There is no need to specify the output key/value classes for the
  47.  * ChainMapper, this is done by the addMapper for the last mapper in the chain.
  48.  * <p/>
  49.  * ChainMapper usage pattern:
  50.  * <p/>
  51.  * <pre>
  52.  * ...
  53.  * conf.setJobName("chain");
  54.  * conf.setInputFormat(TextInputFormat.class);
  55.  * conf.setOutputFormat(TextOutputFormat.class);
  56.  * <p/>
  57.  * JobConf mapAConf = new JobConf(false);
  58.  * ...
  59.  * ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
  60.  *   Text.class, Text.class, true, mapAConf);
  61.  * <p/>
  62.  * JobConf mapBConf = new JobConf(false);
  63.  * ...
  64.  * ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class,
  65.  *   LongWritable.class, Text.class, false, mapBConf);
  66.  * <p/>
  67.  * JobConf reduceConf = new JobConf(false);
  68.  * ...
  69.  * ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,
  70.  *   Text.class, Text.class, true, reduceConf);
  71.  * <p/>
  72.  * ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class,
  73.  *   LongWritable.class, Text.class, false, null);
  74.  * <p/>
  75.  * ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
  76.  *   LongWritable.class, LongWritable.class, true, null);
  77.  * <p/>
  78.  * FileInputFormat.setInputPaths(conf, inDir);
  79.  * FileOutputFormat.setOutputPath(conf, outDir);
  80.  * ...
  81.  * <p/>
  82.  * JobClient jc = new JobClient(conf);
  83.  * RunningJob job = jc.submitJob(conf);
  84.  * ...
  85.  * </pre>
  86.  */
  87. public class ChainMapper implements Mapper {
  88.   /**
  89.    * Adds a Mapper class to the chain job's JobConf.
  90.    * <p/>
  91.    * It has to be specified how key and values are passed from one element of
  92.    * the chain to the next, by value or by reference. If a Mapper leverages the
  93.    * assumed semantics that the key and values are not modified by the collector
  94.    * 'by value' must be used. If the Mapper does not expect this semantics, as
  95.    * an optimization to avoid serialization and deserialization 'by reference'
  96.    * can be used.
  97.    * <p/>
  98.    * For the added Mapper the configuration given for it,
  99.    * <code>mapperConf</code>, have precedence over the job's JobConf. This
  100.    * precedence is in effect when the task is running.
  101.    * <p/>
  102.    * IMPORTANT: There is no need to specify the output key/value classes for the
  103.    * ChainMapper, this is done by the addMapper for the last mapper in the chain
  104.    * <p/>
  105.    *
  106.    * @param job              job's JobConf to add the Mapper class.
  107.    * @param klass            the Mapper class to add.
  108.    * @param inputKeyClass    mapper input key class.
  109.    * @param inputValueClass  mapper input value class.
  110.    * @param outputKeyClass   mapper output key class.
  111.    * @param outputValueClass mapper output value class.
  112.    * @param byValue          indicates if key/values should be passed by value
  113.    * to the next Mapper in the chain, if any.
  114.    * @param mapperConf       a JobConf with the configuration for the Mapper
  115.    * class. It is recommended to use a JobConf without default values using the
  116.    * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
  117.    */
  118.   public static <K1, V1, K2, V2> void addMapper(JobConf job,
  119.                            Class<? extends Mapper<K1, V1, K2, V2>> klass,
  120.                            Class<? extends K1> inputKeyClass,
  121.                            Class<? extends V1> inputValueClass,
  122.                            Class<? extends K2> outputKeyClass,
  123.                            Class<? extends V2> outputValueClass,
  124.                            boolean byValue, JobConf mapperConf) {
  125.     job.setMapperClass(ChainMapper.class);
  126.     job.setMapOutputKeyClass(outputKeyClass);
  127.     job.setMapOutputValueClass(outputValueClass);
  128.     Chain.addMapper(true, job, klass, inputKeyClass, inputValueClass,
  129.                     outputKeyClass, outputValueClass, byValue, mapperConf);
  130.   }
  131.   private Chain chain;
  132.   /**
  133.    * Constructor.
  134.    */
  135.   public ChainMapper() {
  136.     chain = new Chain(true);
  137.   }
  138.   /**
  139.    * Configures the ChainMapper and all the Mappers in the chain.
  140.    * <p/>
  141.    * If this method is overriden <code>super.configure(...)</code> should be
  142.    * invoked at the beginning of the overwriter method.
  143.    */
  144.   public void configure(JobConf job) {
  145.     chain.configure(job);
  146.   }
  147.   /**
  148.    * Chains the <code>map(...)</code> methods of the Mappers in the chain.
  149.    */
  150.   @SuppressWarnings({"unchecked"})
  151.   public void map(Object key, Object value, OutputCollector output,
  152.                   Reporter reporter) throws IOException {
  153.     Mapper mapper = chain.getFirstMap();
  154.     if (mapper != null) {
  155.       mapper.map(key, value, chain.getMapperCollector(0, output, reporter),
  156.                  reporter);
  157.     }
  158.   }
  159.   /**
  160.    * Closes  the ChainMapper and all the Mappers in the chain.
  161.    * <p/>
  162.    * If this method is overriden <code>super.close()</code> should be
  163.    * invoked at the end of the overwriter method.
  164.    */
  165.   public void close() throws IOException {
  166.     chain.close();
  167.   }
  168. }