FileInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:20k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collections;
  22. import java.util.Comparator;
  23. import java.util.HashSet;
  24. import java.util.IdentityHashMap;
  25. import java.util.LinkedList;
  26. import java.util.List;
  27. import java.util.Map;
  28. import java.util.Set;
  29. import org.apache.commons.logging.Log;
  30. import org.apache.commons.logging.LogFactory;
  31. import org.apache.hadoop.fs.BlockLocation;
  32. import org.apache.hadoop.fs.FileStatus;
  33. import org.apache.hadoop.fs.FileSystem;
  34. import org.apache.hadoop.fs.Path;
  35. import org.apache.hadoop.fs.PathFilter;
  36. import org.apache.hadoop.net.NetworkTopology;
  37. import org.apache.hadoop.net.Node;
  38. import org.apache.hadoop.net.NodeBase;
  39. import org.apache.hadoop.util.ReflectionUtils;
  40. import org.apache.hadoop.util.StringUtils;
  41. /** 
  42.  * A base class for file-based {@link InputFormat}.
  43.  * 
  44.  * <p><code>FileInputFormat</code> is the base class for all file-based 
  45.  * <code>InputFormat</code>s. This provides a generic implementation of
  46.  * {@link #getSplits(JobConf, int)}.
  47.  * Subclasses of <code>FileInputFormat</code> can also override the 
  48.  * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
  49.  * not split-up and are processed as a whole by {@link Mapper}s.
  50.  * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
  51.  *  instead.
  52.  */
  53. @Deprecated
  54. public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
  55.   public static final Log LOG =
  56.     LogFactory.getLog(FileInputFormat.class);
  57.   private static final double SPLIT_SLOP = 1.1;   // 10% slop
  58.   private long minSplitSize = 1;
  59.   private static final PathFilter hiddenFileFilter = new PathFilter(){
  60.       public boolean accept(Path p){
  61.         String name = p.getName(); 
  62.         return !name.startsWith("_") && !name.startsWith("."); 
  63.       }
  64.     }; 
  65.   protected void setMinSplitSize(long minSplitSize) {
  66.     this.minSplitSize = minSplitSize;
  67.   }
  68.   /**
  69.    * Proxy PathFilter that accepts a path only if all filters given in the
  70.    * constructor do. Used by the listPaths() to apply the built-in
  71.    * hiddenFileFilter together with a user provided one (if any).
  72.    */
  73.   private static class MultiPathFilter implements PathFilter {
  74.     private List<PathFilter> filters;
  75.     public MultiPathFilter(List<PathFilter> filters) {
  76.       this.filters = filters;
  77.     }
  78.     public boolean accept(Path path) {
  79.       for (PathFilter filter : filters) {
  80.         if (!filter.accept(path)) {
  81.           return false;
  82.         }
  83.       }
  84.       return true;
  85.     }
  86.   }
  87.   /**
  88.    * Is the given filename splitable? Usually, true, but if the file is
  89.    * stream compressed, it will not be.
  90.    * 
  91.    * <code>FileInputFormat</code> implementations can override this and return
  92.    * <code>false</code> to ensure that individual input files are never split-up
  93.    * so that {@link Mapper}s process entire files.
  94.    * 
  95.    * @param fs the file system that the file is on
  96.    * @param filename the file name to check
  97.    * @return is this file splitable?
  98.    */
  99.   protected boolean isSplitable(FileSystem fs, Path filename) {
  100.     return true;
  101.   }
  102.   
  103.   public abstract RecordReader<K, V> getRecordReader(InputSplit split,
  104.                                                JobConf job,
  105.                                                Reporter reporter)
  106.     throws IOException;
  107.   /**
  108.    * Set a PathFilter to be applied to the input paths for the map-reduce job.
  109.    *
  110.    * @param filter the PathFilter class use for filtering the input paths.
  111.    */
  112.   public static void setInputPathFilter(JobConf conf,
  113.                                         Class<? extends PathFilter> filter) {
  114.     conf.setClass("mapred.input.pathFilter.class", filter, PathFilter.class);
  115.   }
  116.   /**
  117.    * Get a PathFilter instance of the filter set for the input paths.
  118.    *
  119.    * @return the PathFilter instance set for the job, NULL if none has been set.
  120.    */
  121.   public static PathFilter getInputPathFilter(JobConf conf) {
  122.     Class<? extends PathFilter> filterClass = conf.getClass(
  123. "mapred.input.pathFilter.class", null, PathFilter.class);
  124.     return (filterClass != null) ?
  125.         ReflectionUtils.newInstance(filterClass, conf) : null;
  126.   }
  127.   /** List input directories.
  128.    * Subclasses may override to, e.g., select only files matching a regular
  129.    * expression. 
  130.    * 
  131.    * @param job the job to list input paths for
  132.    * @return array of FileStatus objects
  133.    * @throws IOException if zero items.
  134.    */
  135.   protected FileStatus[] listStatus(JobConf job) throws IOException {
  136.     Path[] dirs = getInputPaths(job);
  137.     if (dirs.length == 0) {
  138.       throw new IOException("No input paths specified in job");
  139.     }
  140.     List<FileStatus> result = new ArrayList<FileStatus>();
  141.     List<IOException> errors = new ArrayList<IOException>();
  142.     
  143.     // creates a MultiPathFilter with the hiddenFileFilter and the
  144.     // user provided one (if any).
  145.     List<PathFilter> filters = new ArrayList<PathFilter>();
  146.     filters.add(hiddenFileFilter);
  147.     PathFilter jobFilter = getInputPathFilter(job);
  148.     if (jobFilter != null) {
  149.       filters.add(jobFilter);
  150.     }
  151.     PathFilter inputFilter = new MultiPathFilter(filters);
  152.     for (Path p: dirs) {
  153.       FileSystem fs = p.getFileSystem(job); 
  154.       FileStatus[] matches = fs.globStatus(p, inputFilter);
  155.       if (matches == null) {
  156.         errors.add(new IOException("Input path does not exist: " + p));
  157.       } else if (matches.length == 0) {
  158.         errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
  159.       } else {
  160.         for (FileStatus globStat: matches) {
  161.           if (globStat.isDir()) {
  162.             for(FileStatus stat: fs.listStatus(globStat.getPath(),
  163.                 inputFilter)) {
  164.               result.add(stat);
  165.             }          
  166.           } else {
  167.             result.add(globStat);
  168.           }
  169.         }
  170.       }
  171.     }
  172.     if (!errors.isEmpty()) {
  173.       throw new InvalidInputException(errors);
  174.     }
  175.     LOG.info("Total input paths to process : " + result.size()); 
  176.     return result.toArray(new FileStatus[result.size()]);
  177.   }
  178.   /** Splits files returned by {@link #listStatus(JobConf)} when
  179.    * they're too big.*/ 
  180.   @SuppressWarnings("deprecation")
  181.   public InputSplit[] getSplits(JobConf job, int numSplits)
  182.     throws IOException {
  183.     FileStatus[] files = listStatus(job);
  184.     
  185.     long totalSize = 0;                           // compute total size
  186.     for (FileStatus file: files) {                // check we have valid files
  187.       if (file.isDir()) {
  188.         throw new IOException("Not a file: "+ file.getPath());
  189.       }
  190.       totalSize += file.getLen();
  191.     }
  192.     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  193.     long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
  194.                             minSplitSize);
  195.     // generate splits
  196.     ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  197.     NetworkTopology clusterMap = new NetworkTopology();
  198.     for (FileStatus file: files) {
  199.       Path path = file.getPath();
  200.       FileSystem fs = path.getFileSystem(job);
  201.       long length = file.getLen();
  202.       BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
  203.       if ((length != 0) && isSplitable(fs, path)) { 
  204.         long blockSize = file.getBlockSize();
  205.         long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  206.         long bytesRemaining = length;
  207.         while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  208.           String[] splitHosts = getSplitHosts(blkLocations, 
  209.               length-bytesRemaining, splitSize, clusterMap);
  210.           splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
  211.               splitHosts));
  212.           bytesRemaining -= splitSize;
  213.         }
  214.         
  215.         if (bytesRemaining != 0) {
  216.           splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
  217.                      blkLocations[blkLocations.length-1].getHosts()));
  218.         }
  219.       } else if (length != 0) {
  220.         String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
  221.         splits.add(new FileSplit(path, 0, length, splitHosts));
  222.       } else { 
  223.         //Create empty hosts array for zero length files
  224.         splits.add(new FileSplit(path, 0, length, new String[0]));
  225.       }
  226.     }
  227.     LOG.debug("Total # of splits: " + splits.size());
  228.     return splits.toArray(new FileSplit[splits.size()]);
  229.   }
  230.   protected long computeSplitSize(long goalSize, long minSize,
  231.                                        long blockSize) {
  232.     return Math.max(minSize, Math.min(goalSize, blockSize));
  233.   }
  234.   protected int getBlockIndex(BlockLocation[] blkLocations, 
  235.                               long offset) {
  236.     for (int i = 0 ; i < blkLocations.length; i++) {
  237.       // is the offset inside this block?
  238.       if ((blkLocations[i].getOffset() <= offset) &&
  239.           (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
  240.         return i;
  241.       }
  242.     }
  243.     BlockLocation last = blkLocations[blkLocations.length -1];
  244.     long fileLength = last.getOffset() + last.getLength() -1;
  245.     throw new IllegalArgumentException("Offset " + offset + 
  246.                                        " is outside of file (0.." +
  247.                                        fileLength + ")");
  248.   }
  249.   /**
  250.    * Sets the given comma separated paths as the list of inputs 
  251.    * for the map-reduce job.
  252.    * 
  253.    * @param conf Configuration of the job
  254.    * @param commaSeparatedPaths Comma separated paths to be set as 
  255.    *        the list of inputs for the map-reduce job.
  256.    */
  257.   public static void setInputPaths(JobConf conf, String commaSeparatedPaths) {
  258.     setInputPaths(conf, StringUtils.stringToPath(
  259.                         getPathStrings(commaSeparatedPaths)));
  260.   }
  261.   /**
  262.    * Add the given comma separated paths to the list of inputs for
  263.    *  the map-reduce job.
  264.    * 
  265.    * @param conf The configuration of the job 
  266.    * @param commaSeparatedPaths Comma separated paths to be added to
  267.    *        the list of inputs for the map-reduce job.
  268.    */
  269.   public static void addInputPaths(JobConf conf, String commaSeparatedPaths) {
  270.     for (String str : getPathStrings(commaSeparatedPaths)) {
  271.       addInputPath(conf, new Path(str));
  272.     }
  273.   }
  274.   /**
  275.    * Set the array of {@link Path}s as the list of inputs
  276.    * for the map-reduce job.
  277.    * 
  278.    * @param conf Configuration of the job. 
  279.    * @param inputPaths the {@link Path}s of the input directories/files 
  280.    * for the map-reduce job.
  281.    */ 
  282.   public static void setInputPaths(JobConf conf, Path... inputPaths) {
  283.     Path path = new Path(conf.getWorkingDirectory(), inputPaths[0]);
  284.     StringBuffer str = new StringBuffer(StringUtils.escapeString(path.toString()));
  285.     for(int i = 1; i < inputPaths.length;i++) {
  286.       str.append(StringUtils.COMMA_STR);
  287.       path = new Path(conf.getWorkingDirectory(), inputPaths[i]);
  288.       str.append(StringUtils.escapeString(path.toString()));
  289.     }
  290.     conf.set("mapred.input.dir", str.toString());
  291.   }
  292.   /**
  293.    * Add a {@link Path} to the list of inputs for the map-reduce job.
  294.    * 
  295.    * @param conf The configuration of the job 
  296.    * @param path {@link Path} to be added to the list of inputs for 
  297.    *            the map-reduce job.
  298.    */
  299.   public static void addInputPath(JobConf conf, Path path ) {
  300.     path = new Path(conf.getWorkingDirectory(), path);
  301.     String dirStr = StringUtils.escapeString(path.toString());
  302.     String dirs = conf.get("mapred.input.dir");
  303.     conf.set("mapred.input.dir", dirs == null ? dirStr :
  304.       dirs + StringUtils.COMMA_STR + dirStr);
  305.   }
  306.          
  307.   // This method escapes commas in the glob pattern of the given paths.
  308.   private static String[] getPathStrings(String commaSeparatedPaths) {
  309.     int length = commaSeparatedPaths.length();
  310.     int curlyOpen = 0;
  311.     int pathStart = 0;
  312.     boolean globPattern = false;
  313.     List<String> pathStrings = new ArrayList<String>();
  314.     
  315.     for (int i=0; i<length; i++) {
  316.       char ch = commaSeparatedPaths.charAt(i);
  317.       switch(ch) {
  318.         case '{' : {
  319.           curlyOpen++;
  320.           if (!globPattern) {
  321.             globPattern = true;
  322.           }
  323.           break;
  324.         }
  325.         case '}' : {
  326.           curlyOpen--;
  327.           if (curlyOpen == 0 && globPattern) {
  328.             globPattern = false;
  329.           }
  330.           break;
  331.         }
  332.         case ',' : {
  333.           if (!globPattern) {
  334.             pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
  335.             pathStart = i + 1 ;
  336.           }
  337.           break;
  338.         }
  339.       }
  340.     }
  341.     pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
  342.     
  343.     return pathStrings.toArray(new String[0]);
  344.   }
  345.   
  346.   /**
  347.    * Get the list of input {@link Path}s for the map-reduce job.
  348.    * 
  349.    * @param conf The configuration of the job 
  350.    * @return the list of input {@link Path}s for the map-reduce job.
  351.    */
  352.   public static Path[] getInputPaths(JobConf conf) {
  353.     String dirs = conf.get("mapred.input.dir", "");
  354.     String [] list = StringUtils.split(dirs);
  355.     Path[] result = new Path[list.length];
  356.     for (int i = 0; i < list.length; i++) {
  357.       result[i] = new Path(StringUtils.unEscapeString(list[i]));
  358.     }
  359.     return result;
  360.   }
  361.   
  362.   private void sortInDescendingOrder(List<NodeInfo> mylist) {
  363.     Collections.sort(mylist, new Comparator<NodeInfo> () {
  364.       public int compare(NodeInfo obj1, NodeInfo obj2) {
  365.         if (obj1 == null || obj2 == null)
  366.           return -1;
  367.         if (obj1.getValue() == obj2.getValue()) {
  368.           return 0;
  369.         }
  370.         else {
  371.           return ((obj1.getValue() < obj2.getValue()) ? 1 : -1);
  372.         }
  373.       }
  374.     }
  375.     );
  376.   }
  377.   /** 
  378.    * This function identifies and returns the hosts that contribute 
  379.    * most for a given split. For calculating the contribution, rack
  380.    * locality is treated on par with host locality, so hosts from racks
  381.    * that contribute the most are preferred over hosts on racks that 
  382.    * contribute less
  383.    * @param blkLocations The list of block locations
  384.    * @param offset 
  385.    * @param splitSize 
  386.    * @return array of hosts that contribute most to this split
  387.    * @throws IOException
  388.    */
  389.   protected String[] getSplitHosts(BlockLocation[] blkLocations, 
  390.       long offset, long splitSize, NetworkTopology clusterMap)
  391.   throws IOException {
  392.     int startIndex = getBlockIndex(blkLocations, offset);
  393.     long bytesInThisBlock = blkLocations[startIndex].getOffset() + 
  394.                           blkLocations[startIndex].getLength() - offset;
  395.     //If this is the only block, just return
  396.     if (bytesInThisBlock >= splitSize) {
  397.       return blkLocations[startIndex].getHosts();
  398.     }
  399.     long bytesInFirstBlock = bytesInThisBlock;
  400.     int index = startIndex + 1;
  401.     splitSize -= bytesInThisBlock;
  402.     while (splitSize > 0) {
  403.       bytesInThisBlock =
  404.         Math.min(splitSize, blkLocations[index++].getLength());
  405.       splitSize -= bytesInThisBlock;
  406.     }
  407.     long bytesInLastBlock = bytesInThisBlock;
  408.     int endIndex = index - 1;
  409.     
  410.     Map <Node,NodeInfo> hostsMap = new IdentityHashMap<Node,NodeInfo>();
  411.     Map <Node,NodeInfo> racksMap = new IdentityHashMap<Node,NodeInfo>();
  412.     String [] allTopos = new String[0];
  413.     // Build the hierarchy and aggregate the contribution of 
  414.     // bytes at each level. See TestGetSplitHosts.java 
  415.     for (index = startIndex; index <= endIndex; index++) {
  416.       // Establish the bytes in this block
  417.       if (index == startIndex) {
  418.         bytesInThisBlock = bytesInFirstBlock;
  419.       }
  420.       else if (index == endIndex) {
  421.         bytesInThisBlock = bytesInLastBlock;
  422.       }
  423.       else {
  424.         bytesInThisBlock = blkLocations[index].getLength();
  425.       }
  426.       
  427.       allTopos = blkLocations[index].getTopologyPaths();
  428.       // If no topology information is available, just
  429.       // prefix a fakeRack
  430.       if (allTopos.length == 0) {
  431.         allTopos = fakeRacks(blkLocations, index);
  432.       }
  433.       // NOTE: This code currently works only for one level of
  434.       // hierarchy (rack/host). However, it is relatively easy
  435.       // to extend this to support aggregation at different
  436.       // levels 
  437.       
  438.       for (String topo: allTopos) {
  439.         Node node, parentNode;
  440.         NodeInfo nodeInfo, parentNodeInfo;
  441.         node = clusterMap.getNode(topo);
  442.         if (node == null) {
  443.           node = new NodeBase(topo);
  444.           clusterMap.add(node);
  445.         }
  446.         
  447.         nodeInfo = hostsMap.get(node);
  448.         
  449.         if (nodeInfo == null) {
  450.           nodeInfo = new NodeInfo(node);
  451.           hostsMap.put(node,nodeInfo);
  452.           parentNode = node.getParent();
  453.           parentNodeInfo = racksMap.get(parentNode);
  454.           if (parentNodeInfo == null) {
  455.             parentNodeInfo = new NodeInfo(parentNode);
  456.             racksMap.put(parentNode,parentNodeInfo);
  457.           }
  458.           parentNodeInfo.addLeaf(nodeInfo);
  459.         }
  460.         else {
  461.           nodeInfo = hostsMap.get(node);
  462.           parentNode = node.getParent();
  463.           parentNodeInfo = racksMap.get(parentNode);
  464.         }
  465.         nodeInfo.addValue(index, bytesInThisBlock);
  466.         parentNodeInfo.addValue(index, bytesInThisBlock);
  467.       } // for all topos
  468.     
  469.     } // for all indices
  470.     return identifyHosts(allTopos.length, racksMap);
  471.   }
  472.   
  473.   private String[] identifyHosts(int replicationFactor, 
  474.                                  Map<Node,NodeInfo> racksMap) {
  475.     
  476.     String [] retVal = new String[replicationFactor];
  477.    
  478.     List <NodeInfo> rackList = new LinkedList<NodeInfo>(); 
  479.     rackList.addAll(racksMap.values());
  480.     
  481.     // Sort the racks based on their contribution to this split
  482.     sortInDescendingOrder(rackList);
  483.     
  484.     boolean done = false;
  485.     int index = 0;
  486.     
  487.     // Get the host list for all our aggregated items, sort
  488.     // them and return the top entries
  489.     for (NodeInfo ni: rackList) {
  490.       Set<NodeInfo> hostSet = ni.getLeaves();
  491.       List<NodeInfo>hostList = new LinkedList<NodeInfo>();
  492.       hostList.addAll(hostSet);
  493.     
  494.       // Sort the hosts in this rack based on their contribution
  495.       sortInDescendingOrder(hostList);
  496.       for (NodeInfo host: hostList) {
  497.         // Strip out the port number from the host name
  498.         retVal[index++] = host.node.getName().split(":")[0];
  499.         if (index == replicationFactor) {
  500.           done = true;
  501.           break;
  502.         }
  503.       }
  504.       
  505.       if (done == true) {
  506.         break;
  507.       }
  508.     }
  509.     return retVal;
  510.   }
  511.   
  512.   private String[] fakeRacks(BlockLocation[] blkLocations, int index) 
  513.   throws IOException {
  514.     String[] allHosts = blkLocations[index].getHosts();
  515.     String[] allTopos = new String[allHosts.length];
  516.     for (int i = 0; i < allHosts.length; i++) {
  517.       allTopos[i] = NetworkTopology.DEFAULT_RACK + "/" + allHosts[i];
  518.     }
  519.     return allTopos;
  520.   }
  521.   private static class NodeInfo {
  522.     final Node node;
  523.     final Set<Integer> blockIds;
  524.     final Set<NodeInfo> leaves;
  525.     private long value;
  526.     
  527.     NodeInfo(Node node) {
  528.       this.node = node;
  529.       blockIds = new HashSet<Integer>();
  530.       leaves = new HashSet<NodeInfo>();
  531.     }
  532.     long getValue() {return value;}
  533.     void addValue(int blockIndex, long value) {
  534.       if (blockIds.add(blockIndex) == true) {
  535.         this.value += value;
  536.       }
  537.     }
  538.     Set<NodeInfo> getLeaves() { return leaves;}
  539.     void addLeaf(NodeInfo nodeInfo) {
  540.       leaves.add(nodeInfo);
  541.     }
  542.   }
  543. }