DelegatingInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.HashMap;
  22. import java.util.LinkedList;
  23. import java.util.List;
  24. import java.util.Map;
  25. import java.util.Map.Entry;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.mapred.FileInputFormat;
  28. import org.apache.hadoop.mapred.InputFormat;
  29. import org.apache.hadoop.mapred.InputSplit;
  30. import org.apache.hadoop.mapred.JobConf;
  31. import org.apache.hadoop.mapred.Mapper;
  32. import org.apache.hadoop.mapred.RecordReader;
  33. import org.apache.hadoop.mapred.Reporter;
  34. import org.apache.hadoop.util.ReflectionUtils;
  35. /**
  36.  * An {@link InputFormat} that delegates behaviour of paths to multiple other
  37.  * InputFormats.
  38.  * 
  39.  * @see MultipleInputs#addInputPath(JobConf, Path, Class, Class)
  40.  */
  41. public class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
  42.   public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
  43.     JobConf confCopy = new JobConf(conf);
  44.     List<InputSplit> splits = new ArrayList<InputSplit>();
  45.     Map<Path, InputFormat> formatMap = MultipleInputs.getInputFormatMap(conf);
  46.     Map<Path, Class<? extends Mapper>> mapperMap = MultipleInputs
  47.        .getMapperTypeMap(conf);
  48.     Map<Class<? extends InputFormat>, List<Path>> formatPaths
  49.         = new HashMap<Class<? extends InputFormat>, List<Path>>();
  50.     // First, build a map of InputFormats to Paths
  51.     for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
  52.       if (!formatPaths.containsKey(entry.getValue().getClass())) {
  53.        formatPaths.put(entry.getValue().getClass(), new LinkedList<Path>());
  54.       }
  55.       formatPaths.get(entry.getValue().getClass()).add(entry.getKey());
  56.     }
  57.     for (Entry<Class<? extends InputFormat>, List<Path>> formatEntry : 
  58.         formatPaths.entrySet()) {
  59.       Class<? extends InputFormat> formatClass = formatEntry.getKey();
  60.       InputFormat format = (InputFormat) ReflectionUtils.newInstance(
  61.          formatClass, conf);
  62.       List<Path> paths = formatEntry.getValue();
  63.       Map<Class<? extends Mapper>, List<Path>> mapperPaths
  64.           = new HashMap<Class<? extends Mapper>, List<Path>>();
  65.       // Now, for each set of paths that have a common InputFormat, build
  66.       // a map of Mappers to the paths they're used for
  67.       for (Path path : paths) {
  68.        Class<? extends Mapper> mapperClass = mapperMap.get(path);
  69.        if (!mapperPaths.containsKey(mapperClass)) {
  70.          mapperPaths.put(mapperClass, new LinkedList<Path>());
  71.        }
  72.        mapperPaths.get(mapperClass).add(path);
  73.       }
  74.       // Now each set of paths that has a common InputFormat and Mapper can
  75.       // be added to the same job, and split together.
  76.       for (Entry<Class<? extends Mapper>, List<Path>> mapEntry : mapperPaths
  77.          .entrySet()) {
  78.        paths = mapEntry.getValue();
  79.        Class<? extends Mapper> mapperClass = mapEntry.getKey();
  80.        if (mapperClass == null) {
  81.          mapperClass = conf.getMapperClass();
  82.        }
  83.        FileInputFormat.setInputPaths(confCopy, paths.toArray(new Path[paths
  84.            .size()]));
  85.        // Get splits for each input path and tag with InputFormat
  86.        // and Mapper types by wrapping in a TaggedInputSplit.
  87.        InputSplit[] pathSplits = format.getSplits(confCopy, numSplits);
  88.        for (InputSplit pathSplit : pathSplits) {
  89.          splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(),
  90.              mapperClass));
  91.        }
  92.       }
  93.     }
  94.     return splits.toArray(new InputSplit[splits.size()]);
  95.   }
  96.   @SuppressWarnings("unchecked")
  97.   public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
  98.       Reporter reporter) throws IOException {
  99.     // Find the InputFormat and then the RecordReader from the
  100.     // TaggedInputSplit.
  101.     TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
  102.     InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
  103.        .newInstance(taggedInputSplit.getInputFormatClass(), conf);
  104.     return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
  105.        reporter);
  106.   }
  107. }