MultipleInputs.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.util.Collections;
  20. import java.util.HashMap;
  21. import java.util.Map;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.mapred.InputFormat;
  24. import org.apache.hadoop.mapred.JobConf;
  25. import org.apache.hadoop.mapred.Mapper;
  26. import org.apache.hadoop.util.ReflectionUtils;
  27. /**
  28.  * This class supports MapReduce jobs that have multiple input paths with
  29.  * a different {@link InputFormat} and {@link Mapper} for each path 
  30.  */
  31. public class MultipleInputs {
  32.   /**
  33.    * Add a {@link Path} with a custom {@link InputFormat} to the list of
  34.    * inputs for the map-reduce job.
  35.    * 
  36.    * @param conf The configuration of the job
  37.    * @param path {@link Path} to be added to the list of inputs for the job
  38.    * @param inputFormatClass {@link InputFormat} class to use for this path
  39.    */
  40.   public static void addInputPath(JobConf conf, Path path,
  41.       Class<? extends InputFormat> inputFormatClass) {
  42.     String inputFormatMapping = path.toString() + ";"
  43.        + inputFormatClass.getName();
  44.     String inputFormats = conf.get("mapred.input.dir.formats");
  45.     conf.set("mapred.input.dir.formats",
  46.        inputFormats == null ? inputFormatMapping : inputFormats + ","
  47.            + inputFormatMapping);
  48.     conf.setInputFormat(DelegatingInputFormat.class);
  49.   }
  50.   /**
  51.    * Add a {@link Path} with a custom {@link InputFormat} and
  52.    * {@link Mapper} to the list of inputs for the map-reduce job.
  53.    * 
  54.    * @param conf The configuration of the job
  55.    * @param path {@link Path} to be added to the list of inputs for the job
  56.    * @param inputFormatClass {@link InputFormat} class to use for this path
  57.    * @param mapperClass {@link Mapper} class to use for this path
  58.    */
  59.   public static void addInputPath(JobConf conf, Path path,
  60.       Class<? extends InputFormat> inputFormatClass,
  61.       Class<? extends Mapper> mapperClass) {
  62.     addInputPath(conf, path, inputFormatClass);
  63.     String mapperMapping = path.toString() + ";" + mapperClass.getName();
  64.     String mappers = conf.get("mapred.input.dir.mappers");
  65.     conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
  66.        : mappers + "," + mapperMapping);
  67.     conf.setMapperClass(DelegatingMapper.class);
  68.   }
  69.   /**
  70.    * Retrieves a map of {@link Path}s to the {@link InputFormat} class
  71.    * that should be used for them.
  72.    * 
  73.    * @param conf The confuration of the job
  74.    * @see #addInputPath(JobConf, Path, Class)
  75.    * @return A map of paths to inputformats for the job
  76.    */
  77.   static Map<Path, InputFormat> getInputFormatMap(JobConf conf) {
  78.     Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
  79.     String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
  80.     for (String pathMapping : pathMappings) {
  81.       String[] split = pathMapping.split(";");
  82.       InputFormat inputFormat;
  83.       try {
  84.        inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
  85.            .getClassByName(split[1]), conf);
  86.       } catch (ClassNotFoundException e) {
  87.        throw new RuntimeException(e);
  88.       }
  89.       m.put(new Path(split[0]), inputFormat);
  90.     }
  91.     return m;
  92.   }
  93.   /**
  94.    * Retrieves a map of {@link Path}s to the {@link Mapper} class that
  95.    * should be used for them.
  96.    * 
  97.    * @param conf The confuration of the job
  98.    * @see #addInputPath(JobConf, Path, Class, Class)
  99.    * @return A map of paths to mappers for the job
  100.    */
  101.   @SuppressWarnings("unchecked")
  102.   static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobConf conf) {
  103.     if (conf.get("mapred.input.dir.mappers") == null) {
  104.       return Collections.emptyMap();
  105.     }
  106.     Map<Path, Class<? extends Mapper>> m = new HashMap<Path, Class<? extends Mapper>>();
  107.     String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
  108.     for (String pathMapping : pathMappings) {
  109.       String[] split = pathMapping.split(";");
  110.       Class<? extends Mapper> mapClass;
  111.       try {
  112.        mapClass = (Class<? extends Mapper>) conf.getClassByName(split[1]);
  113.       } catch (ClassNotFoundException e) {
  114.        throw new RuntimeException(e);
  115.       }
  116.       m.put(new Path(split[0]), mapClass);
  117.     }
  118.     return m;
  119.   }
  120. }