MultiFileInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.List;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.FileUtil;
  24. import org.apache.hadoop.fs.Path;
  25. /**
  26.  * An abstract {@link InputFormat} that returns {@link MultiFileSplit}'s
  27.  * in {@link #getSplits(JobConf, int)} method. Splits are constructed from 
  28.  * the files under the input paths. Each split returned contains <i>nearly</i>
  29.  * equal content length. <br>  
  30.  * Subclasses implement {@link #getRecordReader(InputSplit, JobConf, Reporter)}
  31.  * to construct <code>RecordReader</code>'s for <code>MultiFileSplit</code>'s.
  32.  * @see MultiFileSplit
  33.  * @deprecated Use {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} instead
  34.  */
  35. @Deprecated
  36. public abstract class MultiFileInputFormat<K, V>
  37.   extends FileInputFormat<K, V> {
  38.   @Override
  39.   public InputSplit[] getSplits(JobConf job, int numSplits) 
  40.     throws IOException {
  41.     
  42.     Path[] paths = FileUtil.stat2Paths(listStatus(job));
  43.     List<MultiFileSplit> splits = new ArrayList<MultiFileSplit>(Math.min(numSplits, paths.length));
  44.     if (paths.length != 0) {
  45.       // HADOOP-1818: Manage splits only if there are paths
  46.       long[] lengths = new long[paths.length];
  47.       long totLength = 0;
  48.       for(int i=0; i<paths.length; i++) {
  49.         FileSystem fs = paths[i].getFileSystem(job);
  50.         lengths[i] = fs.getContentSummary(paths[i]).getLength();
  51.         totLength += lengths[i];
  52.       }
  53.       double avgLengthPerSplit = ((double)totLength) / numSplits;
  54.       long cumulativeLength = 0;
  55.       int startIndex = 0;
  56.       for(int i=0; i<numSplits; i++) {
  57.         int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
  58.             , startIndex, lengths);
  59.         if (splitSize != 0) {
  60.           // HADOOP-1818: Manage split only if split size is not equals to 0
  61.           Path[] splitPaths = new Path[splitSize];
  62.           long[] splitLengths = new long[splitSize];
  63.           System.arraycopy(paths, startIndex, splitPaths , 0, splitSize);
  64.           System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize);
  65.           splits.add(new MultiFileSplit(job, splitPaths, splitLengths));
  66.           startIndex += splitSize;
  67.           for(long l: splitLengths) {
  68.             cumulativeLength += l;
  69.           }
  70.         }
  71.       }
  72.     }
  73.     return splits.toArray(new MultiFileSplit[splits.size()]);    
  74.   }
  75.   private int findSize(int splitIndex, double avgLengthPerSplit
  76.       , long cumulativeLength , int startIndex, long[] lengths) {
  77.     
  78.     if(splitIndex == lengths.length - 1)
  79.       return lengths.length - startIndex;
  80.     
  81.     long goalLength = (long)((splitIndex + 1) * avgLengthPerSplit);
  82.     long partialLength = 0;
  83.     // accumulate till just above the goal length;
  84.     for(int i = startIndex; i < lengths.length; i++) {
  85.       partialLength += lengths[i];
  86.       if(partialLength + cumulativeLength >= goalLength) {
  87.         return i - startIndex + 1;
  88.       }
  89.     }
  90.     return lengths.length - startIndex;
  91.   }
  92.   
  93.   @Override
  94.   public abstract RecordReader<K, V> getRecordReader(InputSplit split,
  95.       JobConf job, Reporter reporter)
  96.       throws IOException;
  97. }