TestDelegatingInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.DataOutputStream;
  20. import java.io.IOException;
  21. import junit.framework.TestCase;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.hdfs.MiniDFSCluster;
  25. import org.apache.hadoop.mapred.InputSplit;
  26. import org.apache.hadoop.mapred.JobConf;
  27. import org.apache.hadoop.mapred.KeyValueTextInputFormat;
  28. import org.apache.hadoop.mapred.Mapper;
  29. import org.apache.hadoop.mapred.OutputCollector;
  30. import org.apache.hadoop.mapred.Reporter;
  31. import org.apache.hadoop.mapred.TextInputFormat;
  32. public class TestDelegatingInputFormat extends TestCase {
  33.   public void testSplitting() throws Exception {
  34.     JobConf conf = new JobConf();
  35.     conf.set("fs.hdfs.impl",
  36.        "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
  37.     MiniDFSCluster dfs = null;
  38.     try {
  39.       dfs = new MiniDFSCluster(conf, 4, true, new String[] { "/rack0",
  40.          "/rack0", "/rack1", "/rack1" }, new String[] { "host0", "host1",
  41.          "host2", "host3" });
  42.       FileSystem fs = dfs.getFileSystem();
  43.       Path path = getPath("/foo/bar", fs);
  44.       Path path2 = getPath("/foo/baz", fs);
  45.       Path path3 = getPath("/bar/bar", fs);
  46.       Path path4 = getPath("/bar/baz", fs);
  47.       final int numSplits = 100;
  48.       MultipleInputs.addInputPath(conf, path, TextInputFormat.class,
  49.          MapClass.class);
  50.       MultipleInputs.addInputPath(conf, path2, TextInputFormat.class,
  51.          MapClass2.class);
  52.       MultipleInputs.addInputPath(conf, path3, KeyValueTextInputFormat.class,
  53.          MapClass.class);
  54.       MultipleInputs.addInputPath(conf, path4, TextInputFormat.class,
  55.          MapClass2.class);
  56.       DelegatingInputFormat inFormat = new DelegatingInputFormat();
  57.       InputSplit[] splits = inFormat.getSplits(conf, numSplits);
  58.       int[] bins = new int[3];
  59.       for (InputSplit split : splits) {
  60.        assertTrue(split instanceof TaggedInputSplit);
  61.        final TaggedInputSplit tis = (TaggedInputSplit) split;
  62.        int index = -1;
  63.        if (tis.getInputFormatClass().equals(KeyValueTextInputFormat.class)) {
  64.          // path3
  65.          index = 0;
  66.        } else if (tis.getMapperClass().equals(MapClass.class)) {
  67.          // path
  68.          index = 1;
  69.        } else {
  70.          // path2 and path4
  71.          index = 2;
  72.        }
  73.        bins[index]++;
  74.       }
  75.       // Each bin is a unique combination of a Mapper and InputFormat, and
  76.       // DelegatingInputFormat should split each bin into numSplits splits,
  77.       // regardless of the number of paths that use that Mapper/InputFormat
  78.       for (int count : bins) {
  79.        assertEquals(numSplits, count);
  80.       }
  81.       assertTrue(true);
  82.     } finally {
  83.       if (dfs != null) {
  84.        dfs.shutdown();
  85.       }
  86.     }
  87.   }
  88.   static Path getPath(final String location, final FileSystem fs)
  89.       throws IOException {
  90.     Path path = new Path(location);
  91.     // create a multi-block file on hdfs
  92.     DataOutputStream out = fs.create(path, true, 4096, (short) 2, 512, null);
  93.     for (int i = 0; i < 1000; ++i) {
  94.       out.writeChars("Hellon");
  95.     }
  96.     out.close();
  97.     return path;
  98.   }
  99.   static class MapClass implements Mapper<String, String, String, String> {
  100.     public void map(String key, String value,
  101.        OutputCollector<String, String> output, Reporter reporter)
  102.        throws IOException {
  103.     }
  104.     public void configure(JobConf job) {
  105.     }
  106.     public void close() throws IOException {
  107.     }
  108.   }
  109.   static class MapClass2 extends MapClass {
  110.   }
  111. }