FileInputFormat.java
Uploaded by: quxuerui
Upload date: 2018-01-08
Resource size: 41811k
File size: 14k
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p><code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}.
 * Subclasses of <code>FileInputFormat</code> can also override the
 * {@link #isSplitable(JobContext, Path)} method to ensure input-files are
 * not split-up and are processed as a whole by {@link Mapper}s.
 */
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {

  private static final Log LOG = LogFactory.getLog(FileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1;   // 10% slop

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };
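
  // Quick behaviour sketch (illustrative only, not part of the original
  // source; the paths are hypothetical):
  //
  //   hiddenFileFilter.accept(new Path("/out/_SUCCESS"));    // false
  //   hiddenFileFilter.accept(new Path("/out/.part.crc"));   // false
  //   hiddenFileFilter.accept(new Path("/out/part-00000"));  // true
  //
  // This is how framework artifacts such as "_logs", "_SUCCESS" and
  // checksum files are kept out of the job input.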
  /**
   * Proxy PathFilter that accepts a path only if all filters given in the
   * constructor do. Used by listStatus() to apply the built-in
   * hiddenFileFilter together with a user-provided one (if any).
   */
  private static class MultiPathFilter implements PathFilter {
    private List<PathFilter> filters;

    public MultiPathFilter(List<PathFilter> filters) {
      this.filters = filters;
    }

    public boolean accept(Path path) {
      for (PathFilter filter : filters) {
        if (!filter.accept(path)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Get the lower bound on split size imposed by the format.
   * @return the number of bytes of the minimal split for this format
   */
  protected long getFormatMinSplitSize() {
    return 1;
  }

  /**
   * Is the given filename splitable? Usually true, but if the file is
   * stream compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never split-up
   * so that {@link Mapper}s process entire files.
   *
   * @param context the job context
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }
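
  // Usage sketch (illustrative only, not part of this file;
  // "WholeFileTextInputFormat" is a hypothetical name): a subclass that
  // disables splitting so each Mapper processes an entire file.
  //
  //   public class WholeFileTextInputFormat extends TextInputFormat {
  //     @Override
  //     protected boolean isSplitable(JobContext context, Path filename) {
  //       return false;  // never split: one FileSplit per input file
  //     }
  //   }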
  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   * @param job the job to modify
   * @param filter the PathFilter class to use for filtering the input paths
   */
  public static void setInputPathFilter(Job job,
                                        Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass("mapred.input.pathFilter.class", filter,
                                    PathFilter.class);
  }
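
  // Usage sketch (illustrative only, not part of this file; "LogFileFilter"
  // is a hypothetical name): restrict the input to ".log" files. The filter
  // class must be instantiable via ReflectionUtils, i.e. public with a
  // no-arg constructor. Note that listStatus() applies the filter to
  // directory paths matched by the input pattern as well, so a real filter
  // should also accept any directories it is expected to descend into.
  //
  //   public static class LogFileFilter implements PathFilter {
  //     public boolean accept(Path path) {
  //       return path.getName().endsWith(".log");
  //     }
  //   }
  //
  //   FileInputFormat.setInputPathFilter(job, LogFileFilter.class);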
  /**
   * Set the minimum input split size.
   * @param job the job to modify
   * @param size the minimum size in bytes
   */
  public static void setMinInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong("mapred.min.split.size", size);
  }

  /**
   * Get the minimum split size.
   * @param job the job
   * @return the minimum number of bytes that can be in a split
   */
  public static long getMinSplitSize(JobContext job) {
    return job.getConfiguration().getLong("mapred.min.split.size", 1L);
  }

  /**
   * Set the maximum input split size.
   * @param job the job to modify
   * @param size the maximum split size in bytes
   */
  public static void setMaxInputSplitSize(Job job,
                                          long size) {
    job.getConfiguration().setLong("mapred.max.split.size", size);
  }

  /**
   * Get the maximum split size.
   * @param context the job to look at
   * @return the maximum number of bytes a split can include
   */
  public static long getMaxSplitSize(JobContext context) {
    return context.getConfiguration().getLong("mapred.max.split.size",
                                              Long.MAX_VALUE);
  }
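
  // Driver-side sketch (illustrative only, not part of this file): forcing
  // splits of roughly 32MB-64MB regardless of the HDFS block size.
  //
  //   FileInputFormat.setMinInputSplitSize(job, 32 * 1024 * 1024L);
  //   FileInputFormat.setMaxInputSplitSize(job, 64 * 1024 * 1024L);
  //
  // computeSplitSize() below clamps the block size into this range, so a
  // 128MB block would yield 64MB splits under these settings.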
  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @param context the job context
   * @return the PathFilter instance set for the job, null if none has been set
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
                                         PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }

  /**
   * List input directories.
   * Subclasses may override to, e.g., select only files matching a regular
   * expression.
   *
   * @param job the job to list input paths for
   * @return array of FileStatus objects
   * @throws IOException if no input paths are specified, or if any of them
   *         does not exist or matches zero files
   */
  protected List<FileStatus> listStatus(JobContext job
                                        ) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    Path[] dirs = getInputPaths(job);
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }
    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user-provided one (if any)
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat : matches) {
          if (globStat.isDir()) {
            for (FileStatus stat : fs.listStatus(globStat.getPath(),
                                                 inputFilter)) {
              result.add(stat);
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }
    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("Total input paths to process : " + result.size());
    return result;
  }
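
  // Behaviour note (illustrative): for an input path "/data" containing
  // "a.txt", "_SUCCESS" and a subdirectory "day1", globStatus() matches the
  // directory itself, which is then expanded one level to its immediate
  // children: "a.txt" is kept, "_SUCCESS" is rejected by hiddenFileFilter,
  // and "day1" is added as-is without being recursed into.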

  /**
   * Generate the list of files and make them into FileSplits.
   */
  public List<InputSplit> getSplits(JobContext job
                                    ) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    for (FileStatus file : listStatus(job)) {
      Path path = file.getPath();
      FileSystem fs = path.getFileSystem(job.getConfiguration());
      long length = file.getLen();
      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
      if ((length != 0) && isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                                   blkLocations[blkIndex].getHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                                   blkLocations[blkLocations.length - 1].getHosts()));
        }
      } else if (length != 0) {
        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
      } else {
        // create empty hosts array for zero-length files
        splits.add(new FileSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }
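
  // Worked example (illustrative): with the defaults minSize = 1 and
  // maxSize = Long.MAX_VALUE, computeSplitSize() reduces to the block size.
  // For a 130MB file stored in 64MB blocks:
  //
  //   splitSize = max(1, min(Long.MAX_VALUE, 64MB)) = 64MB
  //   130MB / 64MB ~ 2.03 > SPLIT_SLOP(1.1)  -> emit split [0MB, 64MB)
  //   66MB  / 64MB ~ 1.03 <= SPLIT_SLOP      -> loop exits
  //
  // The remaining 66MB becomes a single final split; the 10% slop avoids
  // producing a tiny 2MB tail split.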
  protected int getBlockIndex(BlockLocation[] blkLocations,
                              long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
                                       " is outside of file (0.." +
                                       fileLength + ")");
  }

  /**
   * Sets the given comma separated paths as the list of inputs
   * for the map-reduce job.
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as
   *        the list of inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    setInputPaths(job, StringUtils.stringToPath(
                           getPathStrings(commaSeparatedPaths)));
  }

  /**
   * Add the given comma separated paths to the list of inputs for
   * the map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to
   *        the list of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job,
                                   String commaSeparatedPaths
                                   ) throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }
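
  // Usage sketch (illustrative only, not part of this file; the paths are
  // hypothetical):
  //
  //   FileInputFormat.setInputPaths(job, "/data/2018/01,/data/2018/02");
  //   FileInputFormat.addInputPaths(job, "/extra/day{01,02}");
  //
  // setInputPaths() replaces any previously configured inputs, while
  // addInputPaths() appends. The brace glob stays intact because
  // getPathStrings() below treats commas inside "{...}" as part of the
  // pattern, not as path separators.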
  /**
   * Set the array of {@link Path}s as the list of inputs
   * for the map-reduce job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files
   *        for the map-reduce job.
   */
  public static void setInputPaths(Job job,
                                   Path... inputPaths) throws IOException {
    Configuration conf = job.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    Path path = inputPaths[0].makeQualified(fs);
    StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].makeQualified(fs);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set("mapred.input.dir", str.toString());
  }

  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for
   *        the map-reduce job.
   */
  public static void addInputPath(Job job,
                                  Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    FileSystem fs = FileSystem.get(conf);
    path = path.makeQualified(fs);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get("mapred.input.dir");
    conf.set("mapred.input.dir", dirs == null ? dirStr : dirs + "," + dirStr);
  }

  // Splits the given string on commas, treating commas inside a curly-brace
  // glob pattern (e.g. "{a,b}") as part of the pattern rather than as path
  // separators.
  private static String[] getPathStrings(String commaSeparatedPaths) {
    int length = commaSeparatedPaths.length();
    int curlyOpen = 0;
    int pathStart = 0;
    boolean globPattern = false;
    List<String> pathStrings = new ArrayList<String>();

    for (int i = 0; i < length; i++) {
      char ch = commaSeparatedPaths.charAt(i);
      switch (ch) {
        case '{': {
          curlyOpen++;
          if (!globPattern) {
            globPattern = true;
          }
          break;
        }
        case '}': {
          curlyOpen--;
          if (curlyOpen == 0 && globPattern) {
            globPattern = false;
          }
          break;
        }
        case ',': {
          if (!globPattern) {
            pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
            pathStart = i + 1;
          }
          break;
        }
      }
    }
    pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

    return pathStrings.toArray(new String[0]);
  }
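
  // Worked example (illustrative): for the input string
  //
  //   "/logs/app,/data/{jan,feb},/tmp/x"
  //
  // the comma inside "{jan,feb}" is skipped while globPattern is true, so
  // the result is ["/logs/app", "/data/{jan,feb}", "/tmp/x"] rather than
  // four fragments.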

  /**
   * Get the list of input {@link Path}s for the map-reduce job.
   *
   * @param context The job
   * @return the list of input {@link Path}s for the map-reduce job
   */
  public static Path[] getInputPaths(JobContext context) {
    String dirs = context.getConfiguration().get("mapred.input.dir", "");
    String[] list = StringUtils.split(dirs);
    Path[] result = new Path[list.length];
    for (int i = 0; i < list.length; i++) {
      result[i] = new Path(StringUtils.unEscapeString(list[i]));
    }
    return result;
  }
}
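
// End-to-end driver sketch (illustrative only, not part of this file;
// "MyDriver" and "MyMapper" are hypothetical), wiring a concrete subclass
// such as TextInputFormat into a job:
//
//   Configuration conf = new Configuration();
//   Job job = new Job(conf, "example");
//   job.setJarByClass(MyDriver.class);
//   job.setMapperClass(MyMapper.class);
//   job.setInputFormatClass(TextInputFormat.class);
//   FileInputFormat.addInputPath(job, new Path("/in"));
//   FileOutputFormat.setOutputPath(job, new Path("/out"));
//   System.exit(job.waitForCompletion(true) ? 0 : 1);
//
// getSplits() above then runs on the client when the job is submitted,
// producing one map task per InputSplit.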