CombineFileInputFormat.java
Uploaded by: quxuerui
Upload date: 2018-01-08
Resource size: 41811k
File size: 21k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred.lib;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.HashMap;
- import java.util.Set;
- import java.util.Iterator;
- import java.util.Map;
- import java.util.Map.Entry;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.FileUtil;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.BlockLocation;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.PathFilter;
- import org.apache.hadoop.net.NodeBase;
- import org.apache.hadoop.net.NetworkTopology;
- import org.apache.hadoop.mapred.InputSplit;
- import org.apache.hadoop.mapred.FileInputFormat;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.Reporter;
- import org.apache.hadoop.mapred.RecordReader;
- /**
- * An abstract {@link org.apache.hadoop.mapred.InputFormat} that returns {@link CombineFileSplit}'s
- * in the {@link org.apache.hadoop.mapred.InputFormat#getSplits(JobConf, int)} method.
- * Splits are constructed from the files under the input paths.
- * A split cannot have files from different pools.
- * Each split returned may contain blocks from different files.
- * If a maxSplitSize is specified, then blocks on the same node are
- * combined to form a single split. Blocks that are left over are
- * then combined with other blocks in the same rack.
- * If maxSplitSize is not specified, then blocks from the same rack
- * are combined in a single split; no attempt is made to create
- * node-local splits.
- * If the maxSplitSize is equal to the block size, then this class
- * is similar to the default splitting behaviour in Hadoop: each
- * block is a locally processed split.
- * Subclasses implement {@link org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit, JobConf, Reporter)}
- * to construct <code>RecordReader</code>'s for <code>CombineFileSplit</code>'s.
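- * An illustrative example subclass is sketched after this listing.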
- * @see CombineFileSplit
- */
- public abstract class CombineFileInputFormat<K, V>
- extends FileInputFormat<K, V> {
- // ability to limit the size of a single split
- private long maxSplitSize = 0;
- private long minSplitSizeNode = 0;
- private long minSplitSizeRack = 0;
- // A pool of input path filters. A split cannot have blocks from files
- // across multiple pools.
- private ArrayList<MultiPathFilter> pools = new ArrayList<MultiPathFilter>();
- /**
- * Specify the maximum size (in bytes) of each split. Each split is
- * approximately equal to the specified size.
- */
- protected void setMaxSplitSize(long maxSplitSize) {
- this.maxSplitSize = maxSplitSize;
- }
- /**
- * Specify the minimum size (in bytes) of each split per node.
- * This applies to data that is left over after combining data on a single
- * node into splits that are of maximum size specified by maxSplitSize.
- * This leftover data will be combined into its own split if its size
- * exceeds minSplitSizeNode.
- */
- protected void setMinSplitSizeNode(long minSplitSizeNode) {
- this.minSplitSizeNode = minSplitSizeNode;
- }
- /**
- * Specify the minimum size (in bytes) of each split per rack.
- * This applies to data that is left over after combining data on a single
- * rack into splits that are of maximum size specified by maxSplitSize.
- * This leftover data will be combined into its own split if its size
- * exceeds minSplitSizeRack.
- */
- protected void setMinSplitSizeRack(long minSplitSizeRack) {
- this.minSplitSizeRack = minSplitSizeRack;
- }
- /**
- * Create a new pool and add the filters to it.
- * A split cannot have files from different pools.
- */
- protected void createPool(JobConf conf, List<PathFilter> filters) {
- pools.add(new MultiPathFilter(filters));
- }
- /**
- * Create a new pool and add the filters to it.
- * A pathname can satisfy any one of the specified filters.
- * A split cannot have files from different pools.
- */
- protected void createPool(JobConf conf, PathFilter... filters) {
- MultiPathFilter multi = new MultiPathFilter();
- for (PathFilter f: filters) {
- multi.add(f);
- }
- pools.add(multi);
- }
- /**
- * default constructor
- */
- public CombineFileInputFormat() {
- }
- @Override
- public InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- long minSizeNode = 0;
- long minSizeRack = 0;
- long maxSize = 0;
- // the values specified by setxxxSplitSize() take precedence over the
- // values that might have been specified in the config
- if (minSplitSizeNode != 0) {
- minSizeNode = minSplitSizeNode;
- } else {
- minSizeNode = job.getLong("mapred.min.split.size.per.node", 0);
- }
- if (minSplitSizeRack != 0) {
- minSizeRack = minSplitSizeRack;
- } else {
- minSizeRack = job.getLong("mapred.min.split.size.per.rack", 0);
- }
- if (maxSplitSize != 0) {
- maxSize = maxSplitSize;
- } else {
- maxSize = job.getLong("mapred.max.split.size", 0);
- }
- if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
- throw new IOException("Minimum split size pernode " + minSizeNode +
- " cannot be larger than maximum split size " +
- maxSize);
- }
- if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
- throw new IOException("Minimum split size per rack" + minSizeRack +
- " cannot be larger than maximum split size " +
- maxSize);
- }
- if (minSizeRack != 0 && minSizeNode > minSizeRack) {
- throw new IOException("Minimum split size per node" + minSizeNode +
- " cannot be smaller than minimum split size per rack " +
- minSizeRack);
- }
- // all the files in input set
- Path[] paths = FileUtil.stat2Paths(listStatus(job));
- List<CombineFileSplit> splits = new ArrayList<CombineFileSplit>();
- if (paths.length == 0) {
- return splits.toArray(new CombineFileSplit[splits.size()]);
- }
- // In one single iteration, process all the paths in a single pool.
- // Processing one pool at a time ensures that a split contains paths
- // from a single pool only.
- for (MultiPathFilter onepool : pools) {
- ArrayList<Path> myPaths = new ArrayList<Path>();
-
- // pick one input path. If the pool accepts it (i.e. it matches at least
- // one of the pool's filters), add it to the output set
- for (int i = 0; i < paths.length; i++) {
- if (paths[i] == null) { // already processed
- continue;
- }
- FileSystem fs = paths[i].getFileSystem(job);
- Path p = new Path(paths[i].toUri().getPath());
- if (onepool.accept(p)) {
- myPaths.add(paths[i]); // add it to my output set
- paths[i] = null; // already processed
- }
- }
- // create splits for all files in this pool.
- getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
- maxSize, minSizeNode, minSizeRack, splits);
- }
- // Finally, process all paths that do not belong to any pool.
- ArrayList<Path> myPaths = new ArrayList<Path>();
- for (int i = 0; i < paths.length; i++) {
- if (paths[i] == null) { // already processed
- continue;
- }
- myPaths.add(paths[i]);
- }
- // create splits for all files that are not in any pool.
- getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
- maxSize, minSizeNode, minSizeRack, splits);
- return splits.toArray(new CombineFileSplit[splits.size()]);
- }
- /**
- * Return all the splits in the specified set of paths
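- * When maxSize is set, blocks on a single node are combined until that size
- * is reached; a node's leftover blocks become their own split if they total
- * at least minSizeNode, otherwise they are returned to the pool. Remaining
- * blocks are then combined rack by rack under the same rules (with
- * minSizeRack), and whatever is still left over is gathered into overflow
- * splits at the end.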
- */
- private void getMoreSplits(JobConf job, Path[] paths,
- long maxSize, long minSizeNode, long minSizeRack,
- List<CombineFileSplit> splits)
- throws IOException {
- // all blocks for all the files in input set
- OneFileInfo[] files;
-
- // mapping from a rack name to the list of blocks it has
- HashMap<String, List<OneBlockInfo>> rackToBlocks =
- new HashMap<String, List<OneBlockInfo>>();
- // mapping from a block to the nodes on which it has replicas
- HashMap<OneBlockInfo, String[]> blockToNodes =
- new HashMap<OneBlockInfo, String[]>();
- // mapping from a node to the list of blocks that it contains
- HashMap<String, List<OneBlockInfo>> nodeToBlocks =
- new HashMap<String, List<OneBlockInfo>>();
-
- files = new OneFileInfo[paths.length];
- if (paths.length == 0) {
- return;
- }
- // populate all the blocks for all files
- long totLength = 0;
- for (int i = 0; i < paths.length; i++) {
- files[i] = new OneFileInfo(paths[i], job,
- rackToBlocks, blockToNodes, nodeToBlocks);
- totLength += files[i].getLength();
- }
- ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
- ArrayList<String> nodes = new ArrayList<String>();
- long curSplitSize = 0;
- // process all nodes and create splits that are local
- // to a node.
- for (Iterator<Map.Entry<String,
- List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
- iter.hasNext();) {
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- nodes.add(one.getKey());
- List<OneBlockInfo> blocksInNode = one.getValue();
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- for (OneBlockInfo oneblock : blocksInNode) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, nodes, validBlocks);
- curSplitSize = 0;
- validBlocks.clear();
- }
- }
- }
- // if there were any blocks left over and their combined size is
- // larger than minSizeNode, then combine them into one split.
- // Otherwise add them back to the unprocessed pool. It is likely
- // that they will be combined with other blocks from the same rack later on.
- if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, nodes, validBlocks);
- } else {
- for (OneBlockInfo oneblock : validBlocks) {
- blockToNodes.put(oneblock, oneblock.hosts);
- }
- }
- validBlocks.clear();
- nodes.clear();
- curSplitSize = 0;
- }
- // if blocks in a rack are below the specified minimum size, then keep them
- // in 'overflow'. After the processing of all racks is complete, these overflow
- // blocks will be combined into splits.
- ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
- ArrayList<String> racks = new ArrayList<String>();
- // Process all racks over and over again until there is no more work to do.
- while (blockToNodes.size() > 0) {
- // Create one split for this rack before moving over to the next rack.
- // Come back to this rack after creating a single split for each of the
- // remaining racks.
- // Process one rack location at a time, combining all possible blocks that
- // reside on this rack into one split (constrained by the minimum and
- // maximum split size).
- // iterate over all racks
- for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
- rackToBlocks.entrySet().iterator(); iter.hasNext();) {
- Map.Entry<String, List<OneBlockInfo>> one = iter.next();
- racks.add(one.getKey());
- List<OneBlockInfo> blocks = one.getValue();
- // for each block, copy it into validBlocks. Delete it from
- // blockToNodes so that the same block does not appear in
- // two different splits.
- boolean createdSplit = false;
- for (OneBlockInfo oneblock : blocks) {
- if (blockToNodes.containsKey(oneblock)) {
- validBlocks.add(oneblock);
- blockToNodes.remove(oneblock);
- curSplitSize += oneblock.length;
-
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, racks, validBlocks);
- createdSplit = true;
- break;
- }
- }
- }
- // if we created a split, then just go to the next rack
- if (createdSplit) {
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- continue;
- }
- if (!validBlocks.isEmpty()) {
- if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
- // if there is a minimum size specified, then create a single split
- // otherwise, store these blocks into overflow data structure
- addCreatedSplit(job, splits, racks, validBlocks);
- } else {
- // There were a few blocks in this rack that remained to be processed.
- // Keep them in 'overflow' block list. These will be combined later.
- overflowBlocks.addAll(validBlocks);
- }
- }
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- }
- }
- assert blockToNodes.isEmpty();
- assert curSplitSize == 0;
- assert validBlocks.isEmpty();
- assert racks.isEmpty();
- // Process all overflow blocks
- for (OneBlockInfo oneblock : overflowBlocks) {
- validBlocks.add(oneblock);
- curSplitSize += oneblock.length;
- // This might cause an existing rack location to be re-added,
- // but it should be ok.
- for (int i = 0; i < oneblock.racks.length; i++) {
- racks.add(oneblock.racks[i]);
- }
- // if the accumulated split size exceeds the maximum, then
- // create this split.
- if (maxSize != 0 && curSplitSize >= maxSize) {
- // create an input split and add it to the splits array
- addCreatedSplit(job, splits, racks, validBlocks);
- curSplitSize = 0;
- validBlocks.clear();
- racks.clear();
- }
- }
- // Process any remaining blocks.
- if (!validBlocks.isEmpty()) {
- addCreatedSplit(job, splits, racks, validBlocks);
- }
- }
- /**
- * Create a single split from the list of blocks specified in validBlocks.
- * Add this new split into splitList.
- */
- private void addCreatedSplit(JobConf job,
- List<CombineFileSplit> splitList,
- List<String> racks,
- ArrayList<OneBlockInfo> validBlocks) {
- // create an input split
- Path[] fl = new Path[validBlocks.size()];
- long[] offset = new long[validBlocks.size()];
- long[] length = new long[validBlocks.size()];
- String[] rackLocations = racks.toArray(new String[racks.size()]);
- for (int i = 0; i < validBlocks.size(); i++) {
- fl[i] = validBlocks.get(i).onepath;
- offset[i] = validBlocks.get(i).offset;
- length[i] = validBlocks.get(i).length;
- }
- // add this split to the list that is returned
- CombineFileSplit thissplit = new CombineFileSplit(job, fl, offset,
- length, rackLocations);
- splitList.add(thissplit);
- }
- /**
- * This is not implemented here; subclasses construct a RecordReader for each CombineFileSplit.
- */
- public abstract RecordReader<K, V> getRecordReader(InputSplit split,
- JobConf job, Reporter reporter)
- throws IOException;
- /**
- * information about one file from the File System
- */
- private static class OneFileInfo {
- private long fileSize; // size of the file
- private OneBlockInfo[] blocks; // all blocks in this file
- OneFileInfo(Path path, JobConf job,
- HashMap<String, List<OneBlockInfo>> rackToBlocks,
- HashMap<OneBlockInfo, String[]> blockToNodes,
- HashMap<String, List<OneBlockInfo>> nodeToBlocks)
- throws IOException {
- this.fileSize = 0;
- // get block locations from file system
- FileSystem fs = path.getFileSystem(job);
- FileStatus stat = fs.getFileStatus(path);
- BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
- stat.getLen());
- // create a list of all blocks and their locations
- if (locations == null) {
- blocks = new OneBlockInfo[0];
- } else {
- blocks = new OneBlockInfo[locations.length];
- for (int i = 0; i < locations.length; i++) {
-
- fileSize += locations[i].getLength();
- OneBlockInfo oneblock = new OneBlockInfo(path,
- locations[i].getOffset(),
- locations[i].getLength(),
- locations[i].getHosts(),
- locations[i].getTopologyPaths());
- blocks[i] = oneblock;
- // add this block to the block --> node locations map
- blockToNodes.put(oneblock, oneblock.hosts);
- // add this block to the rack --> block map
- for (int j = 0; j < oneblock.racks.length; j++) {
- String rack = oneblock.racks[j];
- List<OneBlockInfo> blklist = rackToBlocks.get(rack);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- rackToBlocks.put(rack, blklist);
- }
- blklist.add(oneblock);
- }
- // add this block to the node --> block map
- for (int j = 0; j < oneblock.hosts.length; j++) {
- String node = oneblock.hosts[j];
- List<OneBlockInfo> blklist = nodeToBlocks.get(node);
- if (blklist == null) {
- blklist = new ArrayList<OneBlockInfo>();
- nodeToBlocks.put(node, blklist);
- }
- blklist.add(oneblock);
- }
- }
- }
- }
- long getLength() {
- return fileSize;
- }
- OneBlockInfo[] getBlocks() {
- return blocks;
- }
- }
- /**
- * information about one block from the File System
- */
- private static class OneBlockInfo {
- Path onepath; // name of this file
- long offset; // offset in file
- long length; // length of this block
- String[] hosts; // nodes on which this block resides
- String[] racks; // network topology of hosts
- OneBlockInfo(Path path, long offset, long len,
- String[] hosts, String[] topologyPaths) {
- this.onepath = path;
- this.offset = offset;
- this.hosts = hosts;
- this.length = len;
- assert (hosts.length == topologyPaths.length ||
- topologyPaths.length == 0);
- // if the file system does not have any rack information, then
- // use a dummy rack location.
- if (topologyPaths.length == 0) {
- topologyPaths = new String[hosts.length];
- for (int i = 0; i < topologyPaths.length; i++) {
- topologyPaths[i] = (new NodeBase(hosts[i], NetworkTopology.DEFAULT_RACK)).
- toString();
- }
- }
- // The topology paths have the host name included as the last
- // component. Strip it.
- this.racks = new String[topologyPaths.length];
- for (int i = 0; i < topologyPaths.length; i++) {
- this.racks[i] = (new NodeBase(topologyPaths[i])).getNetworkLocation();
- }
- }
- }
- /**
- * Accept a path only if any one of the filters given in the
- * constructor does.
- */
- private static class MultiPathFilter implements PathFilter {
- private List<PathFilter> filters;
- public MultiPathFilter() {
- this.filters = new ArrayList<PathFilter>();
- }
- public MultiPathFilter(List<PathFilter> filters) {
- this.filters = filters;
- }
- public void add(PathFilter one) {
- filters.add(one);
- }
- public boolean accept(Path path) {
- for (PathFilter filter : filters) {
- if (filter.accept(path)) {
- return true;
- }
- }
- return false;
- }
- public String toString() {
- StringBuffer buf = new StringBuffer();
- buf.append("[");
- for (PathFilter f: filters) {
- buf.append(f);
- buf.append(",");
- }
- buf.append("]");
- return buf.toString();
- }
- }
- }
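For reference, the sketch below shows one way a job might subclass the class above under the old mapred API. It is a minimal illustration, not part of the listed file: the names MyCombineTextInputFormat and MyChunkReader, and the 128 MB / 32 MB limits, are assumptions chosen for the example. It caps each combined split via setMaxSplitSize and implements the abstract getRecordReader by handing each chunk of a CombineFileSplit to a LineRecordReader through the Hadoop-bundled CombineFileRecordReader, which instantiates the per-chunk reader reflectively.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileRecordReader;
import org.apache.hadoop.mapred.lib.CombineFileSplit;

/** Illustrative subclass: packs many small text files into large combined splits. */
public class MyCombineTextInputFormat
    extends CombineFileInputFormat<LongWritable, Text> {

  public MyCombineTextInputFormat() {
    // Illustrative limits: cap each combined split at 128 MB; leftovers on a
    // node or rack are emitted as their own split once they reach 32 MB.
    setMaxSplitSize(128L * 1024 * 1024);
    setMinSplitSizeNode(32L * 1024 * 1024);
    setMinSplitSizeRack(32L * 1024 * 1024);
  }

  @Override
  public RecordReader<LongWritable, Text> getRecordReader(
      InputSplit split, JobConf job, Reporter reporter) throws IOException {
    // CombineFileRecordReader iterates over the chunks of the CombineFileSplit
    // and builds one MyChunkReader per chunk.
    @SuppressWarnings("unchecked")
    Class<RecordReader<LongWritable, Text>> chunkReaderClass =
        (Class<RecordReader<LongWritable, Text>>) (Class<?>) MyChunkReader.class;
    return new CombineFileRecordReader<LongWritable, Text>(
        job, (CombineFileSplit) split, reporter, chunkReaderClass);
  }

  /** Reads one chunk of a combined split as plain text lines. */
  public static class MyChunkReader implements RecordReader<LongWritable, Text> {
    private final LineRecordReader delegate;

    // CombineFileRecordReader instantiates the per-chunk reader reflectively
    // and requires exactly this constructor signature.
    public MyChunkReader(CombineFileSplit split, Configuration conf,
                         Reporter reporter, Integer index) throws IOException {
      delegate = new LineRecordReader(conf,
          new FileSplit(split.getPath(index), split.getOffset(index),
                        split.getLength(index), (String[]) null));
    }

    public boolean next(LongWritable key, Text value) throws IOException {
      return delegate.next(key, value);
    }
    public LongWritable createKey() { return delegate.createKey(); }
    public Text createValue() { return delegate.createValue(); }
    public long getPos() throws IOException { return delegate.getPos(); }
    public float getProgress() throws IOException { return delegate.getProgress(); }
    public void close() throws IOException { delegate.close(); }
  }
}

Instead of the setter calls in the constructor, a job may also leave these limits to the configuration keys read in getSplits above: mapred.max.split.size, mapred.min.split.size.per.node and mapred.min.split.size.per.rack.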