CombineFileInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:21k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.List;
  22. import java.util.HashMap;
  23. import java.util.Set;
  24. import java.util.Iterator;
  25. import java.util.Map;
  26. import java.util.Map.Entry;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.FileUtil;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.fs.BlockLocation;
  31. import org.apache.hadoop.fs.FileStatus;
  32. import org.apache.hadoop.fs.PathFilter;
  33. import org.apache.hadoop.net.NodeBase;
  34. import org.apache.hadoop.net.NetworkTopology;
  35. import org.apache.hadoop.mapred.InputSplit;
  36. import org.apache.hadoop.mapred.FileInputFormat;
  37. import org.apache.hadoop.mapred.JobConf;
  38. import org.apache.hadoop.mapred.Reporter;
  39. import org.apache.hadoop.mapred.RecordReader;
  40. /**
  41.  * An abstract {@link org.apache.hadoop.mapred.InputFormat} that returns {@link CombineFileSplit}'s
  42.  * in {@link org.apache.hadoop.mapred.InputFormat#getSplits(JobConf, int)} method. 
  43.  * Splits are constructed from the files under the input paths. 
  44.  * A split cannot have files from different pools.
  45.  * Each split returned may contain blocks from different files.
  46.  * If a maxSplitSize is specified, then blocks on the same node are
  47.  * combined to form a single split. Blocks that are left over are
  48.  * then combined with other blocks in the same rack. 
  49.  * If maxSplitSize is not specified, then blocks from the same rack
  50.  * are combined in a single split; no attempt is made to create
  51.  * node-local splits.
  52.  * If the maxSplitSize is equal to the block size, then this class
  53.  * is similar to the default spliting behaviour in Hadoop: each
  54.  * block is a locally processed split.
  55.  * Subclasses implement {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
  56.  * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
  57.  * @see CombineFileSplit
  58.  */
  59. public abstract class CombineFileInputFormat<K, V>
  60.   extends FileInputFormat<K, V> {
  61.   // ability to limit the size of a single split
  62.   private long maxSplitSize = 0;
  63.   private long minSplitSizeNode = 0;
  64.   private long minSplitSizeRack = 0;
  65.   // A pool of input paths filters. A split cannot have blocks from files
  66.   // across multiple pools.
  67.   private ArrayList<MultiPathFilter> pools = new  ArrayList<MultiPathFilter>();
  68.   /**
  69.    * Specify the maximum size (in bytes) of each split. Each split is
  70.    * approximately equal to the specified size.
  71.    */
  72.   protected void setMaxSplitSize(long maxSplitSize) {
  73.     this.maxSplitSize = maxSplitSize;
  74.   }
  75.   /**
  76.    * Specify the minimum size (in bytes) of each split per node.
  77.    * This applies to data that is left over after combining data on a single
  78.    * node into splits that are of maximum size specified by maxSplitSize.
  79.    * This leftover data will be combined into its own split if its size
  80.    * exceeds minSplitSizeNode.
  81.    */
  82.   protected void setMinSplitSizeNode(long minSplitSizeNode) {
  83.     this.minSplitSizeNode = minSplitSizeNode;
  84.   }
  85.   /**
  86.    * Specify the minimum size (in bytes) of each split per rack.
  87.    * This applies to data that is left over after combining data on a single
  88.    * rack into splits that are of maximum size specified by maxSplitSize.
  89.    * This leftover data will be combined into its own split if its size
  90.    * exceeds minSplitSizeRack.
  91.    */
  92.   protected void setMinSplitSizeRack(long minSplitSizeRack) {
  93.     this.minSplitSizeRack = minSplitSizeRack;
  94.   }
  95.   /**
  96.    * Create a new pool and add the filters to it.
  97.    * A split cannot have files from different pools.
  98.    */
  99.   protected void createPool(JobConf conf, List<PathFilter> filters) {
  100.     pools.add(new MultiPathFilter(filters));
  101.   }
  102.   /**
  103.    * Create a new pool and add the filters to it. 
  104.    * A pathname can satisfy any one of the specified filters.
  105.    * A split cannot have files from different pools.
  106.    */
  107.   protected void createPool(JobConf conf, PathFilter... filters) {
  108.     MultiPathFilter multi = new MultiPathFilter();
  109.     for (PathFilter f: filters) {
  110.       multi.add(f);
  111.     }
  112.     pools.add(multi);
  113.   }
  114.   /**
  115.    * default constructor
  116.    */
  117.   public CombineFileInputFormat() {
  118.   }
  119.   @Override
  120.   public InputSplit[] getSplits(JobConf job, int numSplits) 
  121.     throws IOException {
  122.     long minSizeNode = 0;
  123.     long minSizeRack = 0;
  124.     long maxSize = 0;
  125.     // the values specified by setxxxSplitSize() takes precedence over the
  126.     // values that might have been specified in the config
  127.     if (minSplitSizeNode != 0) {
  128.       minSizeNode = minSplitSizeNode;
  129.     } else {
  130.       minSizeNode = job.getLong("mapred.min.split.size.per.node", 0);
  131.     }
  132.     if (minSplitSizeRack != 0) {
  133.       minSizeRack = minSplitSizeRack;
  134.     } else {
  135.       minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0);
  136.     }
  137.     if (maxSplitSize != 0) {
  138.       maxSize = maxSplitSize;
  139.     } else {
  140.       maxSize = job.getLong("mapred.max.split.size", 0);
  141.     }
  142.     if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
  143.       throw new IOException("Minimum split size pernode " + minSizeNode +
  144.                             " cannot be larger than maximum split size " +
  145.                             maxSize);
  146.     }
  147.     if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
  148.       throw new IOException("Minimum split size per rack" + minSizeRack +
  149.                             " cannot be larger than maximum split size " +
  150.                             maxSize);
  151.     }
  152.     if (minSizeRack != 0 && minSizeNode > minSizeRack) {
  153.       throw new IOException("Minimum split size per node" + minSizeNode +
  154.                             " cannot be smaller than minimum split size per rack " +
  155.                             minSizeRack);
  156.     }
  157.     // all the files in input set
  158.     Path[] paths = FileUtil.stat2Paths(listStatus(job));
  159.     List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>();
  160.     if (paths.length == 0) {
  161.       return splits.toArray(new CombineFileSplit[splits.size()]);    
  162.     }
  163.     // In one single iteration, process all the paths in a single pool.
  164.     // Processing one pool at a time ensures that a split contans paths
  165.     // from a single pool only.
  166.     for (MultiPathFilter onepool : pools) {
  167.       ArrayList<Path> myPaths = new ArrayList<Path>();
  168.       
  169.       // pick one input path. If it matches all the filters in a pool,
  170.       // add it to the output set
  171.       for (int i = 0; i < paths.length; i++) {
  172.         if (paths[i] == null) {  // already processed
  173.           continue;
  174.         }
  175.         FileSystem fs = paths[i].getFileSystem(job);
  176.         Path p = new Path(paths[i].toUri().getPath());
  177.         if (onepool.accept(p)) {
  178.           myPaths.add(paths[i]); // add it to my output set
  179.           paths[i] = null;       // already processed
  180.         }
  181.       }
  182.       // create splits for all files in this pool.
  183.       getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
  184.                     maxSize, minSizeNode, minSizeRack, splits);
  185.     }
  186.     // Finally, process all paths that do not belong to any pool.
  187.     ArrayList<Path> myPaths = new ArrayList<Path>();
  188.     for (int i = 0; i < paths.length; i++) {
  189.       if (paths[i] == null) {  // already processed
  190.         continue;
  191.       }
  192.       myPaths.add(paths[i]);
  193.     }
  194.     // create splits for all files that are not in any pool.
  195.     getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]), 
  196.                   maxSize, minSizeNode, minSizeRack, splits);
  197.     return splits.toArray(new CombineFileSplit[splits.size()]);    
  198.   }
  199.   /**
  200.    * Return all the splits in the specified set of paths
  201.    */
  202.   private void getMoreSplits(JobConf job, Path[] paths, 
  203.                              long maxSize, long minSizeNode, long minSizeRack,
  204.                              List<CombineFileSplit> splits)
  205.     throws IOException {
  206.     // all blocks for all the files in input set
  207.     OneFileInfo[] files;
  208.   
  209.     // mapping from a rack name to the list of blocks it has
  210.     HashMap<String, List<OneBlockInfo>> rackToBlocks = 
  211.                               new HashMap<String, List<OneBlockInfo>>();
  212.     // mapping from a block to the nodes on which it has replicas
  213.     HashMap<OneBlockInfo, String[]> blockToNodes = 
  214.                               new HashMap<OneBlockInfo, String[]>();
  215.     // mapping from a node to the list of blocks that it contains
  216.     HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
  217.                               new HashMap<String, List<OneBlockInfo>>();
  218.     
  219.     files = new OneFileInfo[paths.length];
  220.     if (paths.length == 0) {
  221.       return; 
  222.     }
  223.     // populate all the blocks for all files
  224.     long totLength = 0;
  225.     for (int i = 0; i < paths.length; i++) {
  226.       files[i] = new OneFileInfo(paths[i], job, 
  227.                                  rackToBlocks, blockToNodes, nodeToBlocks);
  228.       totLength += files[i].getLength();
  229.     }
  230.     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
  231.     ArrayList<String> nodes = new ArrayList<String>();
  232.     long curSplitSize = 0;
  233.     // process all nodes and create splits that are local
  234.     // to a node. 
  235.     for (Iterator<Map.Entry<String, 
  236.          List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
  237.          iter.hasNext();) {
  238.       Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  239.       nodes.add(one.getKey());
  240.       List<OneBlockInfo> blocksInNode = one.getValue();
  241.       // for each block, copy it into validBlocks. Delete it from 
  242.       // blockToNodes so that the same block does not appear in 
  243.       // two different splits.
  244.       for (OneBlockInfo oneblock : blocksInNode) {
  245.         if (blockToNodes.containsKey(oneblock)) {
  246.           validBlocks.add(oneblock);
  247.           blockToNodes.remove(oneblock);
  248.           curSplitSize += oneblock.length;
  249.           // if the accumulated split size exceeds the maximum, then 
  250.           // create this split.
  251.           if (maxSize != 0 && curSplitSize >= maxSize) {
  252.             // create an input split and add it to the splits array
  253.             addCreatedSplit(job, splits, nodes, validBlocks);
  254.             curSplitSize = 0;
  255.             validBlocks.clear();
  256.           }
  257.         }
  258.       }
  259.       // if there were any blocks left over and their combined size is
  260.       // larger than minSplitNode, then combine them into one split.
  261.       // Otherwise add them back to the unprocessed pool. It is likely 
  262.       // that they will be combined with other blocks from the same rack later on.
  263.       if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
  264.         // create an input split and add it to the splits array
  265.         addCreatedSplit(job, splits, nodes, validBlocks);
  266.       } else {
  267.         for (OneBlockInfo oneblock : validBlocks) {
  268.           blockToNodes.put(oneblock, oneblock.hosts);
  269.         }
  270.       }
  271.       validBlocks.clear();
  272.       nodes.clear();
  273.       curSplitSize = 0;
  274.     }
  275.     // if blocks in a rack are below the specified minimum size, then keep them
  276.     // in 'overflow'. After the processing of all racks is complete, these overflow
  277.     // blocks will be combined into splits.
  278.     ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
  279.     ArrayList<String> racks = new ArrayList<String>();
  280.     // Process all racks over and over again until there is no more work to do.
  281.     while (blockToNodes.size() > 0) {
  282.       // Create one split for this rack before moving over to the next rack. 
  283.       // Come back to this rack after creating a single split for each of the 
  284.       // remaining racks.
  285.       // Process one rack location at a time, Combine all possible blocks that
  286.       // reside on this rack as one split. (constrained by minimum and maximum
  287.       // split size).
  288.       // iterate over all racks 
  289.       for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
  290.            rackToBlocks.entrySet().iterator(); iter.hasNext();) {
  291.         Map.Entry<String, List<OneBlockInfo>> one = iter.next();
  292.         racks.add(one.getKey());
  293.         List<OneBlockInfo> blocks = one.getValue();
  294.         // for each block, copy it into validBlocks. Delete it from 
  295.         // blockToNodes so that the same block does not appear in 
  296.         // two different splits.
  297.         boolean createdSplit = false;
  298.         for (OneBlockInfo oneblock : blocks) {
  299.           if (blockToNodes.containsKey(oneblock)) {
  300.             validBlocks.add(oneblock);
  301.             blockToNodes.remove(oneblock);
  302.             curSplitSize += oneblock.length;
  303.       
  304.             // if the accumulated split size exceeds the maximum, then 
  305.             // create this split.
  306.             if (maxSize != 0 && curSplitSize >= maxSize) {
  307.               // create an input split and add it to the splits array
  308.               addCreatedSplit(job, splits, racks, validBlocks);
  309.               createdSplit = true;
  310.               break;
  311.             }
  312.           }
  313.         }
  314.         // if we created a split, then just go to the next rack
  315.         if (createdSplit) {
  316.           curSplitSize = 0;
  317.           validBlocks.clear();
  318.           racks.clear();
  319.           continue;
  320.         }
  321.         if (!validBlocks.isEmpty()) {
  322.           if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
  323.             // if there is a mimimum size specified, then create a single split
  324.             // otherwise, store these blocks into overflow data structure
  325.             addCreatedSplit(job, splits, racks, validBlocks);
  326.           } else {
  327.             // There were a few blocks in this rack that remained to be processed.
  328.             // Keep them in 'overflow' block list. These will be combined later.
  329.             overflowBlocks.addAll(validBlocks);
  330.           }
  331.         }
  332.         curSplitSize = 0;
  333.         validBlocks.clear();
  334.         racks.clear();
  335.       }
  336.     }
  337.     assert blockToNodes.isEmpty();
  338.     assert curSplitSize == 0;
  339.     assert validBlocks.isEmpty();
  340.     assert racks.isEmpty();
  341.     // Process all overflow blocks
  342.     for (OneBlockInfo oneblock : overflowBlocks) {
  343.       validBlocks.add(oneblock);
  344.       curSplitSize += oneblock.length;
  345.       // This might cause an exiting rack location to be re-added,
  346.       // but it should be ok.
  347.       for (int i = 0; i < oneblock.racks.length; i++) {
  348.         racks.add(oneblock.racks[i]);
  349.       }
  350.       // if the accumulated split size exceeds the maximum, then 
  351.       // create this split.
  352.       if (maxSize != 0 && curSplitSize >= maxSize) {
  353.         // create an input split and add it to the splits array
  354.         addCreatedSplit(job, splits, racks, validBlocks);
  355.         curSplitSize = 0;
  356.         validBlocks.clear();
  357.         racks.clear();
  358.       }
  359.     }
  360.     // Process any remaining blocks, if any.
  361.     if (!validBlocks.isEmpty()) {
  362.       addCreatedSplit(job, splits, racks, validBlocks);
  363.     }
  364.   }
  365.   /**
  366.    * Create a single split from the list of blocks specified in validBlocks
  367.    * Add this new split into splitList.
  368.    */
  369.   private void addCreatedSplit(JobConf job,
  370.                                List<CombineFileSplit> splitList, 
  371.                                List<String> racks, 
  372.                                ArrayList<OneBlockInfo> validBlocks) {
  373.     // create an input split
  374.     Path[] fl = new Path[validBlocks.size()];
  375.     long[] offset = new long[validBlocks.size()];
  376.     long[] length = new long[validBlocks.size()];
  377.     String[] rackLocations = racks.toArray(new String[racks.size()]);
  378.     for (int i = 0; i < validBlocks.size(); i++) {
  379.       fl[i] = validBlocks.get(i).onepath; 
  380.       offset[i] = validBlocks.get(i).offset;
  381.       length[i] = validBlocks.get(i).length;
  382.     }
  383.      // add this split to the list that is returned
  384.     CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset, 
  385.                                                       length, rackLocations);
  386.     splitList.add(thissplit); 
  387.   }
  388.   /**
  389.    * This is not implemented yet. 
  390.    */
  391.   public abstract RecordReader<K, V> getRecordReader(InputSplit split,
  392.                                       JobConf job, Reporter reporter)
  393.     throws IOException;
  394.   /**
  395.    * information about one file from the File System
  396.    */
  397.   private static class OneFileInfo {
  398.     private long fileSize;               // size of the file
  399.     private OneBlockInfo[] blocks;       // all blocks in this file
  400.     OneFileInfo(Path path, JobConf job,
  401.                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
  402.                 HashMap<OneBlockInfo, String[]> blockToNodes,
  403.                 HashMap<String, List<OneBlockInfo>> nodeToBlocks)
  404.                 throws IOException {
  405.       this.fileSize = 0;
  406.       // get block locations from file system
  407.       FileSystem fs = path.getFileSystem(job);
  408.       FileStatus stat = fs.getFileStatus(path);
  409.       BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
  410.                                                            stat.getLen());
  411.       // create a list of all block and their locations
  412.       if (locations == null) {
  413.         blocks = new OneBlockInfo[0];
  414.       } else {
  415.         blocks = new OneBlockInfo[locations.length];
  416.         for (int i = 0; i < locations.length; i++) {
  417.            
  418.           fileSize += locations[i].getLength();
  419.           OneBlockInfo oneblock =  new OneBlockInfo(path, 
  420.                                        locations[i].getOffset(), 
  421.                                        locations[i].getLength(),
  422.                                        locations[i].getHosts(),
  423.                                        locations[i].getTopologyPaths()); 
  424.           blocks[i] = oneblock;
  425.           // add this block to the block --> node locations map
  426.           blockToNodes.put(oneblock, oneblock.hosts);
  427.           // add this block to the rack --> block map
  428.           for (int j = 0; j < oneblock.racks.length; j++) {
  429.             String rack = oneblock.racks[j];
  430.             List<OneBlockInfo> blklist = rackToBlocks.get(rack);
  431.             if (blklist == null) {
  432.               blklist = new ArrayList<OneBlockInfo>();
  433.               rackToBlocks.put(rack, blklist);
  434.             }
  435.             blklist.add(oneblock);
  436.           }
  437.           // add this block to the node --> block map
  438.           for (int j = 0; j < oneblock.hosts.length; j++) {
  439.             String node = oneblock.hosts[j];
  440.             List<OneBlockInfo> blklist = nodeToBlocks.get(node);
  441.             if (blklist == null) {
  442.               blklist = new ArrayList<OneBlockInfo>();
  443.               nodeToBlocks.put(node, blklist);
  444.             }
  445.             blklist.add(oneblock);
  446.           }
  447.         }
  448.       }
  449.     }
  450.     long getLength() {
  451.       return fileSize;
  452.     }
  453.     OneBlockInfo[] getBlocks() {
  454.       return blocks;
  455.     }
  456.   }
  457.   /**
  458.    * information about one block from the File System
  459.    */
  460.   private static class OneBlockInfo {
  461.     Path onepath;                // name of this file
  462.     long offset;                 // offset in file
  463.     long length;                 // length of this block
  464.     String[] hosts;              // nodes on whch this block resides
  465.     String[] racks;              // network topology of hosts
  466.     OneBlockInfo(Path path, long offset, long len, 
  467.                  String[] hosts, String[] topologyPaths) {
  468.       this.onepath = path;
  469.       this.offset = offset;
  470.       this.hosts = hosts;
  471.       this.length = len;
  472.       assert (hosts.length == topologyPaths.length ||
  473.               topologyPaths.length == 0);
  474.       // if the file ystem does not have any rack information, then
  475.       // use dummy rack location.
  476.       if (topologyPaths.length == 0) {
  477.         topologyPaths = new String[hosts.length];
  478.         for (int i = 0; i < topologyPaths.length; i++) {
  479.           topologyPaths[i] = (new NodeBase(hosts[i], NetworkTopology.DEFAULT_RACK)).
  480.                                           toString();
  481.         }
  482.       }
  483.       // The topology paths have the host name included as the last 
  484.       // component. Strip it.
  485.       this.racks = new String[topologyPaths.length];
  486.       for (int i = 0; i < topologyPaths.length; i++) {
  487.         this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
  488.       }
  489.     }
  490.   }
  491.   /**
  492.    * Accept a path only if any one of filters given in the
  493.    * constructor do. 
  494.    */
  495.   private static class MultiPathFilter implements PathFilter {
  496.     private List<PathFilter> filters;
  497.     public MultiPathFilter() {
  498.       this.filters = new ArrayList<PathFilter>();
  499.     }
  500.     public MultiPathFilter(List<PathFilter> filters) {
  501.       this.filters = filters;
  502.     }
  503.     public void add(PathFilter one) {
  504.       filters.add(one);
  505.     }
  506.     public boolean accept(Path path) {
  507.       for (PathFilter filter : filters) {
  508.         if (filter.accept(path)) {
  509.           return true;
  510.         }
  511.       }
  512.       return false;
  513.     }
  514.     public String toString() {
  515.       StringBuffer buf = new StringBuffer();
  516.       buf.append("[");
  517.       for (PathFilter f: filters) {
  518.         buf.append(f);
  519.         buf.append(",");
  520.       }
  521.       buf.append("]");
  522.       return buf.toString();
  523.     }
  524.   }
  525. }