FileInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapreduce.lib.input;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.List;
  22. import org.apache.commons.logging.Log;
  23. import org.apache.commons.logging.LogFactory;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.FileStatus;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.fs.PathFilter;
  29. import org.apache.hadoop.fs.BlockLocation;
  30. import org.apache.hadoop.mapreduce.InputFormat;
  31. import org.apache.hadoop.mapreduce.InputSplit;
  32. import org.apache.hadoop.mapreduce.Job;
  33. import org.apache.hadoop.mapreduce.JobContext;
  34. import org.apache.hadoop.mapreduce.Mapper;
  35. import org.apache.hadoop.util.ReflectionUtils;
  36. import org.apache.hadoop.util.StringUtils;
  37. /** 
  38.  * A base class for file-based {@link InputFormat}s.
  39.  * 
  40.  * <p><code>FileInputFormat</code> is the base class for all file-based 
  41.  * <code>InputFormat</code>s. This provides a generic implementation of
  42.  * {@link #getSplits(JobContext)}.
  43.  * Subclasses of <code>FileInputFormat</code> can also override the 
  44.  * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
  45.  * not split-up and are processed as a whole by {@link Mapper}s.
  46.  */
  47. public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
  48.   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
  49.   private static final double SPLIT_SLOP = 1.1;   // 10% slop
  50.   private static final PathFilter hiddenFileFilter = new PathFilter(){
  51.       public boolean accept(Path p){
  52.         String name = p.getName(); 
  53.         return !name.startsWith("_") && !name.startsWith("."); 
  54.       }
  55.     }; 
  56.   /**
  57.    * Proxy PathFilter that accepts a path only if all filters given in the
  58.    * constructor do. Used by the listPaths() to apply the built-in
  59.    * hiddenFileFilter together with a user provided one (if any).
  60.    */
  61.   private static class MultiPathFilter implements PathFilter {
  62.     private List<PathFilter> filters;
  63.     public MultiPathFilter(List<PathFilter> filters) {
  64.       this.filters = filters;
  65.     }
  66.     public boolean accept(Path path) {
  67.       for (PathFilter filter : filters) {
  68.         if (!filter.accept(path)) {
  69.           return false;
  70.         }
  71.       }
  72.       return true;
  73.     }
  74.   }
  75.   /**
  76.    * Get the lower bound on split size imposed by the format.
  77.    * @return the number of bytes of the minimal split for this format
  78.    */
  79.   protected long getFormatMinSplitSize() {
  80.     return 1;
  81.   }
  82.   /**
  83.    * Is the given filename splitable? Usually, true, but if the file is
  84.    * stream compressed, it will not be.
  85.    * 
  86.    * <code>FileInputFormat</code> implementations can override this and return
  87.    * <code>false</code> to ensure that individual input files are never split-up
  88.    * so that {@link Mapper}s process entire files.
  89.    * 
  90.    * @param context the job context
  91.    * @param filename the file name to check
  92.    * @return is this file splitable?
  93.    */
  94.   protected boolean isSplitable(JobContext context, Path filename) {
  95.     return true;
  96.   }
  97.   /**
  98.    * Set a PathFilter to be applied to the input paths for the map-reduce job.
  99.    * @param job the job to modify
  100.    * @param filter the PathFilter class use for filtering the input paths.
  101.    */
  102.   public static void setInputPathFilter(Job job,
  103.                                         Class<? extends PathFilter> filter) {
  104.     job.getConfiguration().setClass("mapred.input.pathFilter.class", filter, 
  105.                                     PathFilter.class);
  106.   }
  107.   /**
  108.    * Set the minimum input split size
  109.    * @param job the job to modify
  110.    * @param size the minimum size
  111.    */
  112.   public static void setMinInputSplitSize(Job job,
  113.                                           long size) {
  114.     job.getConfiguration().setLong("mapred.min.split.size", size);
  115.   }
  116.   /**
  117.    * Get the minimum split size
  118.    * @param job the job
  119.    * @return the minimum number of bytes that can be in a split
  120.    */
  121.   public static long getMinSplitSize(JobContext job) {
  122.     return job.getConfiguration().getLong("mapred.min.split.size", 1L);
  123.   }
  124.   /**
  125.    * Set the maximum split size
  126.    * @param job the job to modify
  127.    * @param size the maximum split size
  128.    */
  129.   public static void setMaxInputSplitSize(Job job,
  130.                                           long size) {
  131.     job.getConfiguration().setLong("mapred.max.split.size", size);
  132.   }
  133.   /**
  134.    * Get the maximum split size.
  135.    * @param context the job to look at.
  136.    * @return the maximum number of bytes a split can include
  137.    */
  138.   public static long getMaxSplitSize(JobContext context) {
  139.     return context.getConfiguration().getLong("mapred.max.split.size", 
  140.                                               Long.MAX_VALUE);
  141.   }
  142.   /**
  143.    * Get a PathFilter instance of the filter set for the input paths.
  144.    *
  145.    * @return the PathFilter instance set for the job, NULL if none has been set.
  146.    */
  147.   public static PathFilter getInputPathFilter(JobContext context) {
  148.     Configuration conf = context.getConfiguration();
  149.     Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
  150.         PathFilter.class);
  151.     return (filterClass != null) ?
  152.         (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  153.   }
  154.   /** List input directories.
  155.    * Subclasses may override to, e.g., select only files matching a regular
  156.    * expression. 
  157.    * 
  158.    * @param job the job to list input paths for
  159.    * @return array of FileStatus objects
  160.    * @throws IOException if zero items.
  161.    */
  162.   protected List<FileStatus> listStatus(JobContext job
  163.                                         ) throws IOException {
  164.     List<FileStatus> result = new ArrayList<FileStatus>();
  165.     Path[] dirs = getInputPaths(job);
  166.     if (dirs.length == 0) {
  167.       throw new IOException("No input paths specified in job");
  168.     }
  169.     List<IOException> errors = new ArrayList<IOException>();
  170.     
  171.     // creates a MultiPathFilter with the hiddenFileFilter and the
  172.     // user provided one (if any).
  173.     List<PathFilter> filters = new ArrayList<PathFilter>();
  174.     filters.add(hiddenFileFilter);
  175.     PathFilter jobFilter = getInputPathFilter(job);
  176.     if (jobFilter != null) {
  177.       filters.add(jobFilter);
  178.     }
  179.     PathFilter inputFilter = new MultiPathFilter(filters);
  180.     
  181.     for (int i=0; i < dirs.length; ++i) {
  182.       Path p = dirs[i];
  183.       FileSystem fs = p.getFileSystem(job.getConfiguration()); 
  184.       FileStatus[] matches = fs.globStatus(p, inputFilter);
  185.       if (matches == null) {
  186.         errors.add(new IOException("Input path does not exist: " + p));
  187.       } else if (matches.length == 0) {
  188.         errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
  189.       } else {
  190.         for (FileStatus globStat: matches) {
  191.           if (globStat.isDir()) {
  192.             for(FileStatus stat: fs.listStatus(globStat.getPath(),
  193.                 inputFilter)) {
  194.               result.add(stat);
  195.             }          
  196.           } else {
  197.             result.add(globStat);
  198.           }
  199.         }
  200.       }
  201.     }
  202.     if (!errors.isEmpty()) {
  203.       throw new InvalidInputException(errors);
  204.     }
  205.     LOG.info("Total input paths to process : " + result.size()); 
  206.     return result;
  207.   }
  208.   
  209.   /** 
  210.    * Generate the list of files and make them into FileSplits.
  211.    */ 
  212.   public List<InputSplit> getSplits(JobContext job
  213.                                     ) throws IOException {
  214.     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  215.     long maxSize = getMaxSplitSize(job);
  216.     // generate splits
  217.     List<InputSplit> splits = new ArrayList<InputSplit>();
  218.     for (FileStatus file: listStatus(job)) {
  219.       Path path = file.getPath();
  220.       FileSystem fs = path.getFileSystem(job.getConfiguration());
  221.       long length = file.getLen();
  222.       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
  223.       if ((length != 0) && isSplitable(job, path)) { 
  224.         long blockSize = file.getBlockSize();
  225.         long splitSize = computeSplitSize(blockSize, minSize, maxSize);
  226.         long bytesRemaining = length;
  227.         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  228.           int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  229.           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
  230.                                    blkLocations[blkIndex].getHosts()));
  231.           bytesRemaining -= splitSize;
  232.         }
  233.         
  234.         if (bytesRemaining != 0) {
  235.           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
  236.                      blkLocations[blkLocations.length-1].getHosts()));
  237.         }
  238.       } else if (length != 0) {
  239.         splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
  240.       } else { 
  241.         //Create empty hosts array for zero length files
  242.         splits.add(new FileSplit(path, 0, length, new String[0]));
  243.       }
  244.     }
  245.     LOG.debug("Total # of splits: " + splits.size());
  246.     return splits;
  247.   }
  248.   protected long computeSplitSize(long blockSize, long minSize,
  249.                                   long maxSize) {
  250.     return Math.max(minSize, Math.min(maxSize, blockSize));
  251.   }
  252.   protected int getBlockIndex(BlockLocation[] blkLocations, 
  253.                               long offset) {
  254.     for (int i = 0 ; i < blkLocations.length; i++) {
  255.       // is the offset inside this block?
  256.       if ((blkLocations[i].getOffset() <= offset) &&
  257.           (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
  258.         return i;
  259.       }
  260.     }
  261.     BlockLocation last = blkLocations[blkLocations.length -1];
  262.     long fileLength = last.getOffset() + last.getLength() -1;
  263.     throw new IllegalArgumentException("Offset " + offset + 
  264.                                        " is outside of file (0.." +
  265.                                        fileLength + ")");
  266.   }
  267.   /**
  268.    * Sets the given comma separated paths as the list of inputs 
  269.    * for the map-reduce job.
  270.    * 
  271.    * @param job the job
  272.    * @param commaSeparatedPaths Comma separated paths to be set as 
  273.    *        the list of inputs for the map-reduce job.
  274.    */
  275.   public static void setInputPaths(Job job, 
  276.                                    String commaSeparatedPaths
  277.                                    ) throws IOException {
  278.     setInputPaths(job, StringUtils.stringToPath(
  279.                         getPathStrings(commaSeparatedPaths)));
  280.   }
  281.   /**
  282.    * Add the given comma separated paths to the list of inputs for
  283.    *  the map-reduce job.
  284.    * 
  285.    * @param job The job to modify
  286.    * @param commaSeparatedPaths Comma separated paths to be added to
  287.    *        the list of inputs for the map-reduce job.
  288.    */
  289.   public static void addInputPaths(Job job, 
  290.                                    String commaSeparatedPaths
  291.                                    ) throws IOException {
  292.     for (String str : getPathStrings(commaSeparatedPaths)) {
  293.       addInputPath(job, new Path(str));
  294.     }
  295.   }
  296.   /**
  297.    * Set the array of {@link Path}s as the list of inputs
  298.    * for the map-reduce job.
  299.    * 
  300.    * @param job The job to modify 
  301.    * @param inputPaths the {@link Path}s of the input directories/files 
  302.    * for the map-reduce job.
  303.    */ 
  304.   public static void setInputPaths(Job job, 
  305.                                    Path... inputPaths) throws IOException {
  306.     Configuration conf = job.getConfiguration();
  307.     FileSystem fs = FileSystem.get(conf);
  308.     Path path = inputPaths[0].makeQualified(fs);
  309.     StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
  310.     for(int i = 1; i < inputPaths.length;i++) {
  311.       str.append(StringUtils.COMMA_STR);
  312.       path = inputPaths[i].makeQualified(fs);
  313.       str.append(StringUtils.escapeString(path.toString()));
  314.     }
  315.     conf.set("mapred.input.dir", str.toString());
  316.   }
  317.   /**
  318.    * Add a {@link Path} to the list of inputs for the map-reduce job.
  319.    * 
  320.    * @param job The {@link Job} to modify
  321.    * @param path {@link Path} to be added to the list of inputs for 
  322.    *            the map-reduce job.
  323.    */
  324.   public static void addInputPath(Job job, 
  325.                                   Path path) throws IOException {
  326.     Configuration conf = job.getConfiguration();
  327.     FileSystem fs = FileSystem.get(conf);
  328.     path = path.makeQualified(fs);
  329.     String dirStr = StringUtils.escapeString(path.toString());
  330.     String dirs = conf.get("mapred.input.dir");
  331.     conf.set("mapred.input.dir", dirs == null ? dirStr : dirs + "," + dirStr);
  332.   }
  333.   
  334.   // This method escapes commas in the glob pattern of the given paths.
  335.   private static String[] getPathStrings(String commaSeparatedPaths) {
  336.     int length = commaSeparatedPaths.length();
  337.     int curlyOpen = 0;
  338.     int pathStart = 0;
  339.     boolean globPattern = false;
  340.     List<String> pathStrings = new ArrayList<String>();
  341.     
  342.     for (int i=0; i<length; i++) {
  343.       char ch = commaSeparatedPaths.charAt(i);
  344.       switch(ch) {
  345.         case '{' : {
  346.           curlyOpen++;
  347.           if (!globPattern) {
  348.             globPattern = true;
  349.           }
  350.           break;
  351.         }
  352.         case '}' : {
  353.           curlyOpen--;
  354.           if (curlyOpen == 0 && globPattern) {
  355.             globPattern = false;
  356.           }
  357.           break;
  358.         }
  359.         case ',' : {
  360.           if (!globPattern) {
  361.             pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
  362.             pathStart = i + 1 ;
  363.           }
  364.           break;
  365.         }
  366.       }
  367.     }
  368.     pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
  369.     
  370.     return pathStrings.toArray(new String[0]);
  371.   }
  372.   
  373.   /**
  374.    * Get the list of input {@link Path}s for the map-reduce job.
  375.    * 
  376.    * @param context The job
  377.    * @return the list of input {@link Path}s for the map-reduce job.
  378.    */
  379.   public static Path[] getInputPaths(JobContext context) {
  380.     String dirs = context.getConfiguration().get("mapred.input.dir", "");
  381.     String [] list = StringUtils.split(dirs);
  382.     Path[] result = new Path[list.length];
  383.     for (int i = 0; i < list.length; i++) {
  384.       result[i] = new Path(StringUtils.unEscapeString(list[i]));
  385.     }
  386.     return result;
  387.   }
  388. }