CombineFileSplit.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.util.HashSet;
  23. import java.util.Set;
  24. import org.apache.hadoop.fs.FileStatus;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.fs.BlockLocation;
  28. import org.apache.hadoop.io.Text;
  29. import org.apache.hadoop.mapred.InputSplit;
  30. import org.apache.hadoop.mapred.FileInputFormat;
  31. import org.apache.hadoop.mapred.JobConf;
  32. /**
  33.  * A sub-collection of input files. Unlike {@link org.apache.hadoop.mapred.FileSplit}, 
  34.  * CombineFileSplit * class does not represent a split of a file, but a split of input files 
  35.  * into smaller sets. A split may contain blocks from different file but all 
  36.  * the blocks in the same split are probably local to some rack <br> 
  37.  * CombineFileSplit can be used to implement {@link org.apache.hadoop.mapred.RecordReader}'s, 
  38.  * with reading one record per file.
  39.  * @see org.apache.hadoop.mapred.FileSplit
  40.  * @see CombineFileInputFormat 
  41.  */
  42. public class CombineFileSplit implements InputSplit {
  43.   private Path[] paths;
  44.   private long[] startoffset;
  45.   private long[] lengths;
  46.   private String[] locations;
  47.   private long totLength;
  48.   private JobConf job;
  49.   /**
  50.    * default constructor
  51.    */
  52.   public CombineFileSplit() {}
  53.   public CombineFileSplit(JobConf job, Path[] files, long[] start, 
  54.                           long[] lengths, String[] locations) {
  55.     initSplit(job, files, start, lengths, locations);
  56.   }
  57.   public CombineFileSplit(JobConf job, Path[] files, long[] lengths) {
  58.     long[] startoffset = new long[files.length];
  59.     for (int i = 0; i < startoffset.length; i++) {
  60.       startoffset[i] = 0;
  61.     }
  62.     String[] locations = new String[files.length];
  63.     for (int i = 0; i < locations.length; i++) {
  64.       locations[i] = "";
  65.     }
  66.     initSplit(job, files, startoffset, lengths, locations);
  67.   }
  68.   
  69.   private void initSplit(JobConf job, Path[] files, long[] start, 
  70.                          long[] lengths, String[] locations) {
  71.     this.job = job;
  72.     this.startoffset = start;
  73.     this.lengths = lengths;
  74.     this.paths = files;
  75.     this.totLength = 0;
  76.     this.locations = locations;
  77.     for(long length : lengths) {
  78.       totLength += length;
  79.     }
  80.   }
  81.   /**
  82.    * Copy constructor
  83.    */
  84.   public CombineFileSplit(CombineFileSplit old) throws IOException {
  85.     this(old.getJob(), old.getPaths(), old.getStartOffsets(),
  86.          old.getLengths(), old.getLocations());
  87.   }
  88.   public JobConf getJob() {
  89.     return job;
  90.   }
  91.   public long getLength() {
  92.     return totLength;
  93.   }
  94.   /** Returns an array containing the startoffsets of the files in the split*/ 
  95.   public long[] getStartOffsets() {
  96.     return startoffset;
  97.   }
  98.   
  99.   /** Returns an array containing the lengths of the files in the split*/ 
  100.   public long[] getLengths() {
  101.     return lengths;
  102.   }
  103.   /** Returns the start offset of the i<sup>th</sup> Path */
  104.   public long getOffset(int i) {
  105.     return startoffset[i];
  106.   }
  107.   
  108.   /** Returns the length of the i<sup>th</sup> Path */
  109.   public long getLength(int i) {
  110.     return lengths[i];
  111.   }
  112.   
  113.   /** Returns the number of Paths in the split */
  114.   public int getNumPaths() {
  115.     return paths.length;
  116.   }
  117.   /** Returns the i<sup>th</sup> Path */
  118.   public Path getPath(int i) {
  119.     return paths[i];
  120.   }
  121.   
  122.   /** Returns all the Paths in the split */
  123.   public Path[] getPaths() {
  124.     return paths;
  125.   }
  126.   /** Returns all the Paths where this input-split resides */
  127.   public String[] getLocations() throws IOException {
  128.     return locations;
  129.   }
  130.   public void readFields(DataInput in) throws IOException {
  131.     totLength = in.readLong();
  132.     int arrLength = in.readInt();
  133.     lengths = new long[arrLength];
  134.     for(int i=0; i<arrLength;i++) {
  135.       lengths[i] = in.readLong();
  136.     }
  137.     int filesLength = in.readInt();
  138.     paths = new Path[filesLength];
  139.     for(int i=0; i<filesLength;i++) {
  140.       paths[i] = new Path(Text.readString(in));
  141.     }
  142.     arrLength = in.readInt();
  143.     startoffset = new long[arrLength];
  144.     for(int i=0; i<arrLength;i++) {
  145.       startoffset[i] = in.readLong();
  146.     }
  147.   }
  148.   public void write(DataOutput out) throws IOException {
  149.     out.writeLong(totLength);
  150.     out.writeInt(lengths.length);
  151.     for(long length : lengths) {
  152.       out.writeLong(length);
  153.     }
  154.     out.writeInt(paths.length);
  155.     for(Path p : paths) {
  156.       Text.writeString(out, p.toString());
  157.     }
  158.     out.writeInt(startoffset.length);
  159.     for(long length : startoffset) {
  160.       out.writeLong(length);
  161.     }
  162.   }
  163.   
  164.   @Override
  165.  public String toString() {
  166.     StringBuffer sb = new StringBuffer();
  167.     for (int i = 0; i < paths.length; i++) {
  168.       if (i == 0 ) {
  169.         sb.append("Paths:");
  170.       }
  171.       sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
  172.                 "+" + lengths[i]);
  173.       if (i < paths.length -1) {
  174.         sb.append(",");
  175.       }
  176.     }
  177.     if (locations != null) {
  178.       String locs = "";
  179.       StringBuffer locsb = new StringBuffer();
  180.       for (int i = 0; i < locations.length; i++) {
  181.         locsb.append(locations[i] + ":");
  182.       }
  183.       locs = locsb.toString();
  184.       sb.append(" Locations:" + locs + "; ");
  185.     }
  186.     return sb.toString();
  187.   }
  188. }