TestMiniMRDFSSort.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import junit.extensions.TestSetup;
  21. import junit.framework.Test;
  22. import junit.framework.TestCase;
  23. import junit.framework.TestSuite;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.hdfs.MiniDFSCluster;
  26. import org.apache.hadoop.io.BytesWritable;
  27. import org.apache.hadoop.io.Text;
  28. import org.apache.hadoop.mapred.lib.NullOutputFormat;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.util.ToolRunner;
  32. import org.apache.hadoop.examples.RandomWriter;
  33. import org.apache.hadoop.examples.Sort;
  34. /**
  35.  * A JUnit test to test the Map-Reduce framework's sort 
  36.  * with a Mini Map-Reduce Cluster with a Mini HDFS Clusters.
  37.  */
  38. public class TestMiniMRDFSSort extends TestCase {
  39.   // Input/Output paths for sort
  40.   private static final Path SORT_INPUT_PATH = new Path("/sort/input");
  41.   private static final Path SORT_OUTPUT_PATH = new Path("/sort/output");
  42.   // Knobs to control randomwriter; and hence sort
  43.   private static final int NUM_HADOOP_SLAVES = 3;
  44.   // make it big enough to cause a spill in the map
  45.   private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
  46.   private static final int RW_MAPS_PER_HOST = 2;
  47.   private static MiniMRCluster mrCluster = null;
  48.   private static MiniDFSCluster dfsCluster = null;
  49.   private static FileSystem dfs = null;
  50.   public static Test suite() {
  51.     TestSetup setup = new TestSetup(new TestSuite(TestMiniMRDFSSort.class)) {
  52.       protected void setUp() throws Exception {
  53.         Configuration conf = new Configuration();
  54.         dfsCluster = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true, null);
  55.         dfs = dfsCluster.getFileSystem();
  56.         mrCluster = new MiniMRCluster(NUM_HADOOP_SLAVES, 
  57.                                       dfs.getUri().toString(), 1);
  58.       }
  59.       protected void tearDown() throws Exception {
  60.         if (dfsCluster != null) { dfsCluster.shutdown(); }
  61.         if (mrCluster != null) { mrCluster.shutdown(); }
  62.       }
  63.     };
  64.     return setup;
  65.   }
  66.   private static void runRandomWriter(JobConf job, Path sortInput) 
  67.   throws Exception {
  68.     // Scale down the default settings for RandomWriter for the test-case
  69.     // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP
  70.     job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
  71.     job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
  72.     String[] rwArgs = {sortInput.toString()};
  73.     
  74.     // Run RandomWriter
  75.     assertEquals(ToolRunner.run(job, new RandomWriter(), rwArgs), 0);
  76.   }
  77.   
  78.   private static void runSort(JobConf job, Path sortInput, Path sortOutput) 
  79.   throws Exception {
  80.     job.setInt("mapred.job.reuse.jvm.num.tasks", -1);
  81.     job.setInt("io.sort.mb", 1);
  82.     job.setNumMapTasks(12);
  83.     // Setup command-line arguments to 'sort'
  84.     String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
  85.     
  86.     // Run Sort
  87.     Sort sort = new Sort();
  88.     assertEquals(ToolRunner.run(job, sort, sortArgs), 0);
  89.     Counters counters = sort.getResult().getCounters();
  90.     long mapInput = counters.findCounter(Task.Counter.MAP_INPUT_BYTES
  91.     ).getValue();
  92.     long hdfsRead = counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP,
  93.                                          "HDFS_BYTES_READ").getValue();
  94.     // the hdfs read should be between 100% and 110% of the map input bytes
  95.     assertTrue("map input = " + mapInput + ", hdfs read = " + hdfsRead,
  96.                (hdfsRead < (mapInput * 1.1)) &&
  97.                (hdfsRead > mapInput));  
  98.   }
  99.   
  100.   private static void runSortValidator(JobConf job, 
  101.                                        Path sortInput, Path sortOutput) 
  102.   throws Exception {
  103.     String[] svArgs = {"-sortInput", sortInput.toString(), 
  104.                        "-sortOutput", sortOutput.toString()};
  105.     // Run Sort-Validator
  106.     assertEquals(ToolRunner.run(job, new SortValidator(), svArgs), 0);
  107.   }
  108.   
  109.   private static class ReuseDetector extends MapReduceBase
  110.       implements Mapper<BytesWritable,BytesWritable, Text, Text> {
  111.     static int instances = 0;
  112.     Reporter reporter = null;
  113.     @Override
  114.     public void map(BytesWritable key, BytesWritable value,
  115.                     OutputCollector<Text, Text> output, 
  116.                     Reporter reporter) throws IOException {
  117.       this.reporter = reporter;
  118.     }
  119.     
  120.     public void close() throws IOException {
  121.       reporter.incrCounter("jvm", "use", ++instances);
  122.     }
  123.   }
  124.   private static void runJvmReuseTest(JobConf job,
  125.                                       boolean reuse) throws IOException {
  126.     // setup a map-only job that reads the input and only sets the counters
  127.     // based on how many times the jvm was reused.
  128.     job.setInt("mapred.job.reuse.jvm.num.tasks", reuse ? -1 : 1);
  129.     FileInputFormat.setInputPaths(job, SORT_INPUT_PATH);
  130.     job.setInputFormat(SequenceFileInputFormat.class);
  131.     job.setOutputFormat(NullOutputFormat.class);
  132.     job.setMapperClass(ReuseDetector.class);
  133.     job.setOutputKeyClass(Text.class);
  134.     job.setOutputValueClass(Text.class);
  135.     job.setNumMapTasks(24);
  136.     job.setNumReduceTasks(0);
  137.     RunningJob result = JobClient.runJob(job);
  138.     long uses = result.getCounters().findCounter("jvm", "use").getValue();
  139.     int maps = job.getNumMapTasks();
  140.     if (reuse) {
  141.       assertTrue("maps = " + maps + ", uses = " + uses, maps < uses);
  142.     } else {
  143.       assertEquals("uses should be number of maps", job.getNumMapTasks(), uses);
  144.     }
  145.   }
  146.   public void testMapReduceSort() throws Exception {
  147.     // Run randomwriter to generate input for 'sort'
  148.     runRandomWriter(mrCluster.createJobConf(), SORT_INPUT_PATH);
  149.     // Run sort
  150.     runSort(mrCluster.createJobConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
  151.     // Run sort-validator to check if sort worked correctly
  152.     runSortValidator(mrCluster.createJobConf(), SORT_INPUT_PATH, 
  153.                      SORT_OUTPUT_PATH);
  154.   }
  155.   
  156.   public void testJvmReuse() throws Exception {
  157.     runJvmReuseTest(mrCluster.createJobConf(), true);
  158.   }
  159.   public void testNoJvmReuse() throws Exception {
  160.     runJvmReuseTest(mrCluster.createJobConf(), false);
  161.   }
  162. }