TestCombineFileInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:23k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.IOException;
  20. import java.io.DataOutputStream;
  21. import java.util.BitSet;
  22. import java.util.HashMap;
  23. import java.util.HashSet;
  24. import java.util.Random;
  25. import junit.framework.TestCase;
  26. import org.apache.commons.logging.Log;
  27. import org.apache.commons.logging.LogFactory;
  28. import org.apache.hadoop.fs.FSDataOutputStream;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.FileStatus;
  31. import org.apache.hadoop.fs.Path;
  32. import org.apache.hadoop.io.Text;
  33. import org.apache.hadoop.hdfs.MiniDFSCluster;
  34. import org.apache.hadoop.fs.BlockLocation;
  35. import org.apache.hadoop.io.BytesWritable;
  36. import org.apache.hadoop.hdfs.DFSTestUtil;
  37. import org.apache.hadoop.hdfs.DistributedFileSystem;
  38. import org.apache.hadoop.conf.Configuration;
  39. import org.apache.hadoop.io.SequenceFile;
  40. import org.apache.hadoop.io.SequenceFile.CompressionType;
  41. import org.apache.hadoop.fs.PathFilter;
  42. import org.apache.hadoop.mapred.InputSplit;
  43. import org.apache.hadoop.mapred.JobConf;
  44. import org.apache.hadoop.mapred.Reporter;
  45. import org.apache.hadoop.mapred.RecordReader;
  46. import org.apache.hadoop.mapred.MiniMRCluster;
  47. public class TestCombineFileInputFormat extends TestCase{
  48.   private static final String rack1[] = new String[] {
  49.     "/r1"
  50.   };
  51.   private static final String hosts1[] = new String[] {
  52.     "host1.rack1.com"
  53.   };
  54.   private static final String rack2[] = new String[] {
  55.     "/r2"
  56.   };
  57.   private static final String hosts2[] = new String[] {
  58.     "host2.rack2.com"
  59.   };
  60.   private static final String rack3[] = new String[] {
  61.     "/r3"
  62.   };
  63.   private static final String hosts3[] = new String[] {
  64.     "host3.rack3.com"
  65.   };
  66.   final Path inDir = new Path("/racktesting");
  67.   final Path outputPath = new Path("/output");
  68.   final Path dir1 = new Path(inDir, "/dir1");
  69.   final Path dir2 = new Path(inDir, "/dir2");
  70.   final Path dir3 = new Path(inDir, "/dir3");
  71.   final Path dir4 = new Path(inDir, "/dir4");
  72.   static final int BLOCKSIZE = 1024;
  73.   static final byte[] databuf = new byte[BLOCKSIZE];
  74.   private static final Log LOG = LogFactory.getLog(TestCombineFileInputFormat.class);
  75.   
  76.   /** Dummy class to extend CombineFileInputFormat*/
  77.   private class DummyInputFormat extends CombineFileInputFormat<Text, Text> {
  78.     @Override
  79.     public RecordReader<Text,Text> getRecordReader(InputSplit split, JobConf job
  80.         , Reporter reporter) throws IOException {
  81.       return null;
  82.     }
  83.   }
  84.   public void testSplitPlacement() throws IOException {
  85.     String namenode = null;
  86.     MiniDFSCluster dfs = null;
  87.     MiniMRCluster mr = null;
  88.     FileSystem fileSys = null;
  89.     String testName = "TestSplitPlacement";
  90.     try {
  91.       /* Start 3 datanodes, one each in rack r1, r2, r3. Create three files
  92.        * 1) file1, just after starting the datanode on r1, with 
  93.        *    a repl factor of 1, and,
  94.        * 2) file2, just after starting the datanode on r2, with 
  95.        *    a repl factor of 2, and,
  96.        * 3) file3 after starting the all three datanodes, with a repl 
  97.        *    factor of 3.
  98.        * At the end, file1 will be present on only datanode1, file2 will be
  99.        * present on datanode 1 and datanode2 and 
  100.        * file3 will be present on all datanodes. 
  101.        */
  102.       JobConf conf = new JobConf();
  103.       conf.setBoolean("dfs.replication.considerLoad", false);
  104.       dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
  105.       dfs.waitActive();
  106.       namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
  107.                  (dfs.getFileSystem()).getUri().getPort();
  108.       fileSys = dfs.getFileSystem();
  109.       if (!fileSys.mkdirs(inDir)) {
  110.         throw new IOException("Mkdirs failed to create " + inDir.toString());
  111.       }
  112.       Path file1 = new Path(dir1 + "/file1");
  113.       writeFile(conf, file1, (short)1, 1);
  114.       dfs.startDataNodes(conf, 1, true, null, rack2, hosts2, null);
  115.       dfs.waitActive();
  116.       // create file on two datanodes.
  117.       Path file2 = new Path(dir2 + "/file2");
  118.       writeFile(conf, file2, (short)2, 2);
  119.       // split it using a CombinedFile input format
  120.       DummyInputFormat inFormat = new DummyInputFormat();
  121.       inFormat.setInputPaths(conf, dir1 + "," + dir2);
  122.       inFormat.setMinSplitSizeRack(BLOCKSIZE);
  123.       InputSplit[] splits = inFormat.getSplits(conf, 1);
  124.       System.out.println("Made splits(Test1): " + splits.length);
  125.       // make sure that each split has different locations
  126.       CombineFileSplit fileSplit = null;
  127.       for (int i = 0; i < splits.length; ++i) {
  128.         fileSplit = (CombineFileSplit) splits[i];
  129.         System.out.println("File split(Test1): " + fileSplit);
  130.       }
  131.       assertEquals(splits.length, 2);
  132.       fileSplit = (CombineFileSplit) splits[0];
  133.       assertEquals(fileSplit.getNumPaths(), 2);
  134.       assertEquals(fileSplit.getLocations().length, 1);
  135.       assertEquals(fileSplit.getPath(0).getName(), file2.getName());
  136.       assertEquals(fileSplit.getOffset(0), 0);
  137.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  138.       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
  139.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  140.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  141.       assertEquals(fileSplit.getLocations()[0], "/r2");
  142.       fileSplit = (CombineFileSplit) splits[1];
  143.       assertEquals(fileSplit.getNumPaths(), 1);
  144.       assertEquals(fileSplit.getLocations().length, 1);
  145.       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
  146.       assertEquals(fileSplit.getOffset(0), 0);
  147.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  148.       assertEquals(fileSplit.getLocations()[0], "/r1");
  149.       // create another file on 3 datanodes and 3 racks.
  150.       dfs.startDataNodes(conf, 1, true, null, rack3, hosts3, null);
  151.       dfs.waitActive();
  152.       Path file3 = new Path(dir3 + "/file3");
  153.       writeFile(conf, new Path(dir3 + "/file3"), (short)3, 3);
  154.       inFormat = new DummyInputFormat();
  155.       inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3);
  156.       inFormat.setMinSplitSizeRack(BLOCKSIZE);
  157.       splits = inFormat.getSplits(conf, 1);
  158.       for (int i = 0; i < splits.length; ++i) {
  159.         fileSplit = (CombineFileSplit) splits[i];
  160.         System.out.println("File split(Test2): " + fileSplit);
  161.       }
  162.       assertEquals(splits.length, 3);
  163.       fileSplit = (CombineFileSplit) splits[0];
  164.       assertEquals(fileSplit.getNumPaths(), 3);
  165.       assertEquals(fileSplit.getLocations().length, 1);
  166.       assertEquals(fileSplit.getPath(0).getName(), file3.getName());
  167.       assertEquals(fileSplit.getOffset(0), 0);
  168.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  169.       assertEquals(fileSplit.getPath(1).getName(), file3.getName());
  170.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  171.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  172.       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
  173.       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
  174.       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
  175.       assertEquals(fileSplit.getLocations()[0], "/r3");
  176.       fileSplit = (CombineFileSplit) splits[1];
  177.       assertEquals(fileSplit.getNumPaths(), 2);
  178.       assertEquals(fileSplit.getLocations().length, 1);
  179.       assertEquals(fileSplit.getPath(0).getName(), file2.getName());
  180.       assertEquals(fileSplit.getOffset(0), 0);
  181.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  182.       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
  183.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  184.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  185.       assertEquals(fileSplit.getLocations()[0], "/r2");
  186.       fileSplit = (CombineFileSplit) splits[2];
  187.       assertEquals(fileSplit.getNumPaths(), 1);
  188.       assertEquals(fileSplit.getLocations().length, 1);
  189.       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
  190.       assertEquals(fileSplit.getOffset(0), 0);
  191.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  192.       assertEquals(fileSplit.getLocations()[0], "/r1");
  193.       // create file4 on all three racks
  194.       Path file4 = new Path(dir4 + "/file4");
  195.       writeFile(conf, file4, (short)3, 3);
  196.       inFormat = new DummyInputFormat();
  197.       inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
  198.       inFormat.setMinSplitSizeRack(BLOCKSIZE);
  199.       splits = inFormat.getSplits(conf, 1);
  200.       for (int i = 0; i < splits.length; ++i) {
  201.         fileSplit = (CombineFileSplit) splits[i];
  202.         System.out.println("File split(Test3): " + fileSplit);
  203.       }
  204.       assertEquals(splits.length, 3);
  205.       fileSplit = (CombineFileSplit) splits[0];
  206.       assertEquals(fileSplit.getNumPaths(), 6);
  207.       assertEquals(fileSplit.getLocations().length, 1);
  208.       assertEquals(fileSplit.getPath(0).getName(), file3.getName());
  209.       assertEquals(fileSplit.getOffset(0), 0);
  210.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  211.       assertEquals(fileSplit.getPath(1).getName(), file3.getName());
  212.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  213.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  214.       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
  215.       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
  216.       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
  217.       assertEquals(fileSplit.getLocations()[0], "/r3");
  218.       fileSplit = (CombineFileSplit) splits[1];
  219.       assertEquals(fileSplit.getNumPaths(), 2);
  220.       assertEquals(fileSplit.getLocations().length, 1);
  221.       assertEquals(fileSplit.getPath(0).getName(), file2.getName());
  222.       assertEquals(fileSplit.getOffset(0), 0);
  223.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  224.       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
  225.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  226.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  227.       assertEquals(fileSplit.getLocations()[0], "/r2");
  228.       fileSplit = (CombineFileSplit) splits[2];
  229.       assertEquals(fileSplit.getNumPaths(), 1);
  230.       assertEquals(fileSplit.getLocations().length, 1);
  231.       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
  232.       assertEquals(fileSplit.getOffset(0), 0);
  233.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  234.       assertEquals(fileSplit.getLocations()[0], "/r1");
  235.       // maximum split size is 2 blocks 
  236.       inFormat = new DummyInputFormat();
  237.       inFormat.setMinSplitSizeNode(BLOCKSIZE);
  238.       inFormat.setMaxSplitSize(2*BLOCKSIZE);
  239.       inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
  240.       splits = inFormat.getSplits(conf, 1);
  241.       for (int i = 0; i < splits.length; ++i) {
  242.         fileSplit = (CombineFileSplit) splits[i];
  243.         System.out.println("File split(Test4): " + fileSplit);
  244.       }
  245.       assertEquals(splits.length, 5);
  246.       fileSplit = (CombineFileSplit) splits[0];
  247.       assertEquals(fileSplit.getNumPaths(), 2);
  248.       assertEquals(fileSplit.getLocations().length, 1);
  249.       assertEquals(fileSplit.getPath(0).getName(), file3.getName());
  250.       assertEquals(fileSplit.getOffset(0), 0);
  251.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  252.       assertEquals(fileSplit.getPath(1).getName(), file3.getName());
  253.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  254.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  255.       assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
  256.       fileSplit = (CombineFileSplit) splits[1];
  257.       assertEquals(fileSplit.getPath(0).getName(), file3.getName());
  258.       assertEquals(fileSplit.getOffset(0), 2 * BLOCKSIZE);
  259.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  260.       assertEquals(fileSplit.getPath(1).getName(), file4.getName());
  261.       assertEquals(fileSplit.getOffset(1), 0);
  262.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  263.       assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
  264.       fileSplit = (CombineFileSplit) splits[2];
  265.       assertEquals(fileSplit.getNumPaths(), 2);
  266.       assertEquals(fileSplit.getLocations().length, 1);
  267.       assertEquals(fileSplit.getPath(0).getName(), file4.getName());
  268.       assertEquals(fileSplit.getOffset(0), BLOCKSIZE);
  269.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  270.       assertEquals(fileSplit.getPath(1).getName(), file4.getName());
  271.       assertEquals(fileSplit.getOffset(1), 2 * BLOCKSIZE);
  272.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  273.       assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
  274.       // maximum split size is 3 blocks 
  275.       inFormat = new DummyInputFormat();
  276.       inFormat.setMinSplitSizeNode(BLOCKSIZE);
  277.       inFormat.setMaxSplitSize(3*BLOCKSIZE);
  278.       inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
  279.       splits = inFormat.getSplits(conf, 1);
  280.       for (int i = 0; i < splits.length; ++i) {
  281.         fileSplit = (CombineFileSplit) splits[i];
  282.         System.out.println("File split(Test5): " + fileSplit);
  283.       }
  284.       assertEquals(splits.length, 4);
  285.       fileSplit = (CombineFileSplit) splits[0];
  286.       assertEquals(fileSplit.getNumPaths(), 3);
  287.       assertEquals(fileSplit.getLocations().length, 1);
  288.       assertEquals(fileSplit.getPath(0).getName(), file3.getName());
  289.       assertEquals(fileSplit.getOffset(0), 0);
  290.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  291.       assertEquals(fileSplit.getPath(1).getName(), file3.getName());
  292.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  293.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  294.       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
  295.       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
  296.       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
  297.       assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
  298.       fileSplit = (CombineFileSplit) splits[1];
  299.       assertEquals(fileSplit.getPath(0).getName(), file4.getName());
  300.       assertEquals(fileSplit.getOffset(0), 0);
  301.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  302.       assertEquals(fileSplit.getPath(1).getName(), file4.getName());
  303.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  304.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  305.       assertEquals(fileSplit.getPath(2).getName(), file4.getName());
  306.       assertEquals(fileSplit.getOffset(2),  2 * BLOCKSIZE);
  307.       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
  308.       assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
  309.       fileSplit = (CombineFileSplit) splits[2];
  310.       assertEquals(fileSplit.getNumPaths(), 2);
  311.       assertEquals(fileSplit.getLocations().length, 1);
  312.       assertEquals(fileSplit.getPath(0).getName(), file2.getName());
  313.       assertEquals(fileSplit.getOffset(0), 0);
  314.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  315.       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
  316.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  317.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  318.       assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
  319.       fileSplit = (CombineFileSplit) splits[3];
  320.       assertEquals(fileSplit.getNumPaths(), 1);
  321.       assertEquals(fileSplit.getLocations().length, 1);
  322.       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
  323.       assertEquals(fileSplit.getOffset(0), 0);
  324.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  325.       assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
  326.       // maximum split size is 4 blocks 
  327.       inFormat = new DummyInputFormat();
  328.       inFormat.setMaxSplitSize(4*BLOCKSIZE);
  329.       inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
  330.       splits = inFormat.getSplits(conf, 1);
  331.       for (int i = 0; i < splits.length; ++i) {
  332.         fileSplit = (CombineFileSplit) splits[i];
  333.         System.out.println("File split(Test6): " + fileSplit);
  334.       }
  335.       assertEquals(splits.length, 3);
  336.       fileSplit = (CombineFileSplit) splits[0];
  337.       assertEquals(fileSplit.getNumPaths(), 4);
  338.       assertEquals(fileSplit.getLocations().length, 1);
  339.       assertEquals(fileSplit.getPath(0).getName(), file3.getName());
  340.       assertEquals(fileSplit.getOffset(0), 0);
  341.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  342.       assertEquals(fileSplit.getPath(1).getName(), file3.getName());
  343.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  344.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  345.       assertEquals(fileSplit.getPath(2).getName(), file3.getName());
  346.       assertEquals(fileSplit.getOffset(2), 2 * BLOCKSIZE);
  347.       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
  348.       assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
  349.       fileSplit = (CombineFileSplit) splits[1];
  350.       assertEquals(fileSplit.getNumPaths(), 4);
  351.       assertEquals(fileSplit.getPath(0).getName(), file2.getName());
  352.       assertEquals(fileSplit.getOffset(0), 0);
  353.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  354.       assertEquals(fileSplit.getPath(1).getName(), file2.getName());
  355.       assertEquals(fileSplit.getOffset(1), BLOCKSIZE);
  356.       assertEquals(fileSplit.getLength(1), BLOCKSIZE);
  357.       assertEquals(fileSplit.getPath(2).getName(), file4.getName());
  358.       assertEquals(fileSplit.getOffset(2), BLOCKSIZE);
  359.       assertEquals(fileSplit.getLength(2), BLOCKSIZE);
  360.       assertEquals(fileSplit.getPath(3).getName(), file4.getName());
  361.       assertEquals(fileSplit.getOffset(3),  2 * BLOCKSIZE);
  362.       assertEquals(fileSplit.getLength(3), BLOCKSIZE);
  363.       assertEquals(fileSplit.getLocations()[0], "host2.rack2.com");
  364.       fileSplit = (CombineFileSplit) splits[2];
  365.       assertEquals(fileSplit.getNumPaths(), 1);
  366.       assertEquals(fileSplit.getLocations().length, 1);
  367.       assertEquals(fileSplit.getPath(0).getName(), file1.getName());
  368.       assertEquals(fileSplit.getOffset(0), 0);
  369.       assertEquals(fileSplit.getLength(0), BLOCKSIZE);
  370.       assertEquals(fileSplit.getLocations()[0], "/r1");
  371.       // maximum split size is 7 blocks and min is 3 blocks
  372.       inFormat = new DummyInputFormat();
  373.       inFormat.setMaxSplitSize(7*BLOCKSIZE);
  374.       inFormat.setMinSplitSizeNode(3*BLOCKSIZE);
  375.       inFormat.setMinSplitSizeRack(3*BLOCKSIZE);
  376.       inFormat.setInputPaths(conf, dir1 + "," + dir2 + "," + dir3 + "," + dir4);
  377.       splits = inFormat.getSplits(conf, 1);
  378.       for (int i = 0; i < splits.length; ++i) {
  379.         fileSplit = (CombineFileSplit) splits[i];
  380.         System.out.println("File split(Test7): " + fileSplit);
  381.       }
  382.       assertEquals(splits.length, 2);
  383.       fileSplit = (CombineFileSplit) splits[0];
  384.       assertEquals(fileSplit.getNumPaths(), 6);
  385.       assertEquals(fileSplit.getLocations().length, 1);
  386.       assertEquals(fileSplit.getLocations()[0], "host3.rack3.com");
  387.       fileSplit = (CombineFileSplit) splits[1];
  388.       assertEquals(fileSplit.getNumPaths(), 3);
  389.       assertEquals(fileSplit.getLocations().length, 1);
  390.       assertEquals(fileSplit.getLocations()[0], "host1.rack1.com");
  391.       // Rack 1 has file1, file2 and file3 and file4
  392.       // Rack 2 has file2 and file3 and file4
  393.       // Rack 3 has file3 and file4
  394.       file1 = new Path(conf.getWorkingDirectory(), file1);
  395.       file2 = new Path(conf.getWorkingDirectory(), file2);
  396.       file3 = new Path(conf.getWorkingDirectory(), file3);
  397.       file4 = new Path(conf.getWorkingDirectory(), file4);
  398.       // setup a filter so that only file1 and file2 can be combined
  399.       inFormat = new DummyInputFormat();
  400.       inFormat.addInputPath(conf, inDir);
  401.       inFormat.setMinSplitSizeRack(1); // everything is at least rack local
  402.       inFormat.createPool(conf, new TestFilter(dir1), 
  403.                           new TestFilter(dir2));
  404.       splits = inFormat.getSplits(conf, 1);
  405.       for (int i = 0; i < splits.length; ++i) {
  406.         fileSplit = (CombineFileSplit) splits[i];
  407.         System.out.println("File split(TestPool1): " + fileSplit);
  408.       }
  409.       assertEquals(splits.length, 3);
  410.       fileSplit = (CombineFileSplit) splits[0];
  411.       assertEquals(fileSplit.getNumPaths(), 2);
  412.       assertEquals(fileSplit.getLocations().length, 1);
  413.       assertEquals(fileSplit.getLocations()[0], "/r2");
  414.       fileSplit = (CombineFileSplit) splits[1];
  415.       assertEquals(fileSplit.getNumPaths(), 1);
  416.       assertEquals(fileSplit.getLocations().length, 1);
  417.       assertEquals(fileSplit.getLocations()[0], "/r1");
  418.       fileSplit = (CombineFileSplit) splits[2];
  419.       assertEquals(fileSplit.getNumPaths(), 6);
  420.       assertEquals(fileSplit.getLocations().length, 1);
  421.       assertEquals(fileSplit.getLocations()[0], "/r3");
  422.     } finally {
  423.       if (dfs != null) {
  424.         dfs.shutdown();
  425.       }
  426.     }
  427.   }
  428.   static void writeFile(Configuration conf, Path name,
  429.       short replication, int numBlocks) throws IOException {
  430.     FileSystem fileSys = FileSystem.get(conf);
  431.     FSDataOutputStream stm = fileSys.create(name, true,
  432.                                             conf.getInt("io.file.buffer.size", 4096),
  433.                                             replication, (long)BLOCKSIZE);
  434.     for (int i = 0; i < numBlocks; i++) {
  435.       stm.write(databuf);
  436.     }
  437.     stm.close();
  438.     DFSTestUtil.waitReplication(fileSys, name, replication);
  439.   }
  440.   static class TestFilter implements PathFilter {
  441.     private Path p;
  442.     // store a path prefix in this TestFilter
  443.     public TestFilter(Path p) {
  444.       this.p = p;
  445.     }
  446.     // returns true if the specified path matches the prefix stored
  447.     // in this TestFilter.
  448.     public boolean accept(Path path) {
  449.       if (path.toString().indexOf(p.toString()) == 0) {
  450.         return true;
  451.       }
  452.       return false;
  453.     }
  454.     public String toString() {
  455.       return "PathFilter:" + p;
  456.     }
  457.   }
  458.   /*
  459.    * Prints out the input splits for the specified files
  460.    */
  461.   private void splitRealFiles(String[] args) throws IOException {
  462.     JobConf conf = new JobConf();
  463.     FileSystem fs = FileSystem.get(conf);
  464.     if (!(fs instanceof DistributedFileSystem)) {
  465.       throw new IOException("Wrong file system: " + fs.getClass().getName());
  466.     }
  467.     int blockSize = conf.getInt("dfs.block.size", 128 * 1024 * 1024);
  468.     DummyInputFormat inFormat = new DummyInputFormat();
  469.     for (int i = 0; i < args.length; i++) {
  470.       inFormat.addInputPaths(conf, args[i]);
  471.     }
  472.     inFormat.setMinSplitSizeRack(blockSize);
  473.     inFormat.setMaxSplitSize(10 * blockSize);
  474.     InputSplit[] splits = inFormat.getSplits(conf, 1);
  475.     System.out.println("Total number of splits " + splits.length);
  476.     for (int i = 0; i < splits.length; ++i) {
  477.       CombineFileSplit fileSplit = (CombineFileSplit) splits[i];
  478.       System.out.println("Split[" + i + "] " + fileSplit);
  479.     }
  480.   }
  481.   public static void main(String[] args) throws Exception{
  482.     // if there are some parameters specified, then use those paths
  483.     if (args.length != 0) {
  484.       TestCombineFileInputFormat test = new TestCombineFileInputFormat();
  485.       test.splitRealFiles(args);
  486.     } else {
  487.       TestCombineFileInputFormat test = new TestCombineFileInputFormat();
  488.       test.testSplitPlacement();
  489.     }
  490.   }
  491. }