TestMultiFileInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.BitSet;
  21. import java.util.HashMap;
  22. import java.util.Random;
  23. import junit.framework.TestCase;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.fs.FSDataOutputStream;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.Path;
  29. import org.apache.hadoop.io.Text;
  30. public class TestMultiFileInputFormat extends TestCase{
  31.   private static JobConf job = new JobConf();
  32.   private static final Log LOG = LogFactory.getLog(TestMultiFileInputFormat.class);
  33.   
  34.   private static final int MAX_SPLIT_COUNT  = 10000;
  35.   private static final int SPLIT_COUNT_INCR = 6000;
  36.   private static final int MAX_BYTES = 1024;
  37.   private static final int MAX_NUM_FILES = 10000;
  38.   private static final int NUM_FILES_INCR = 8000;
  39.   
  40.   private Random rand = new Random(System.currentTimeMillis());
  41.   private HashMap<String, Long> lengths = new HashMap<String, Long>();
  42.   
  43.   /** Dummy class to extend MultiFileInputFormat*/
  44.   private class DummyMultiFileInputFormat extends MultiFileInputFormat<Text, Text> {
  45.     @Override
  46.     public RecordReader<Text,Text> getRecordReader(InputSplit split, JobConf job
  47.         , Reporter reporter) throws IOException {
  48.       return null;
  49.     }
  50.   }
  51.   
  52.   private Path initFiles(FileSystem fs, int numFiles, int numBytes) throws IOException{
  53.     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
  54.     Path multiFileDir = new Path(dir, "test.multifile");
  55.     fs.delete(multiFileDir, true);
  56.     fs.mkdirs(multiFileDir);
  57.     LOG.info("Creating " + numFiles + " file(s) in " + multiFileDir);
  58.     for(int i=0; i<numFiles ;i++) {
  59.       Path path = new Path(multiFileDir, "file_" + i);
  60.        FSDataOutputStream out = fs.create(path);
  61.        if (numBytes == -1) {
  62.          numBytes = rand.nextInt(MAX_BYTES);
  63.        }
  64.        for(int j=0; j< numBytes; j++) {
  65.          out.write(rand.nextInt());
  66.        }
  67.        out.close();
  68.        if(LOG.isDebugEnabled()) {
  69.          LOG.debug("Created file " + path + " with length " + numBytes);
  70.        }
  71.        lengths.put(path.getName(), new Long(numBytes));
  72.     }
  73.     FileInputFormat.setInputPaths(job, multiFileDir);
  74.     return multiFileDir;
  75.   }
  76.   
  77.   public void testFormat() throws IOException {
  78.     if(LOG.isInfoEnabled()) {
  79.       LOG.info("Test started");
  80.       LOG.info("Max split count           = " + MAX_SPLIT_COUNT);
  81.       LOG.info("Split count increment     = " + SPLIT_COUNT_INCR);
  82.       LOG.info("Max bytes per file        = " + MAX_BYTES);
  83.       LOG.info("Max number of files       = " + MAX_NUM_FILES);
  84.       LOG.info("Number of files increment = " + NUM_FILES_INCR);
  85.     }
  86.     
  87.     MultiFileInputFormat<Text,Text> format = new DummyMultiFileInputFormat();
  88.     FileSystem fs = FileSystem.getLocal(job);
  89.     
  90.     for(int numFiles = 1; numFiles< MAX_NUM_FILES ; 
  91.       numFiles+= (NUM_FILES_INCR / 2) + rand.nextInt(NUM_FILES_INCR / 2)) {
  92.       
  93.       Path dir = initFiles(fs, numFiles, -1);
  94.       BitSet bits = new BitSet(numFiles);
  95.       for(int i=1;i< MAX_SPLIT_COUNT ;i+= rand.nextInt(SPLIT_COUNT_INCR) + 1) {
  96.         LOG.info("Running for Num Files=" + numFiles + ", split count=" + i);
  97.         
  98.         MultiFileSplit[] splits = (MultiFileSplit[])format.getSplits(job, i);
  99.         bits.clear();
  100.         
  101.         for(MultiFileSplit split : splits) {
  102.           long splitLength = 0;
  103.           for(Path p : split.getPaths()) {
  104.             long length = fs.getContentSummary(p).getLength();
  105.             assertEquals(length, lengths.get(p.getName()).longValue());
  106.             splitLength += length;
  107.             String name = p.getName();
  108.             int index = Integer.parseInt(
  109.                 name.substring(name.lastIndexOf("file_") + 5));
  110.             assertFalse(bits.get(index));
  111.             bits.set(index);
  112.           }
  113.           assertEquals(splitLength, split.getLength());
  114.         }
  115.       }
  116.       assertEquals(bits.cardinality(), numFiles);
  117.       fs.delete(dir, true);
  118.     }
  119.     LOG.info("Test Finished");
  120.   }
  121.   
  122.   public void testFormatWithLessPathsThanSplits() throws Exception {
  123.     MultiFileInputFormat<Text,Text> format = new DummyMultiFileInputFormat();
  124.     FileSystem fs = FileSystem.getLocal(job);     
  125.     
  126.     // Test with no path
  127.     initFiles(fs, 0, -1);    
  128.     assertEquals(0, format.getSplits(job, 2).length);
  129.     
  130.     // Test with 2 path and 4 splits
  131.     initFiles(fs, 2, 500);
  132.     assertEquals(2, format.getSplits(job, 4).length);
  133.   }
  134.   
  135.   public static void main(String[] args) throws Exception{
  136.     TestMultiFileInputFormat test = new TestMultiFileInputFormat();
  137.     test.testFormat();
  138.   }
  139. }