TestReduceFetch.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.Arrays;
  21. import junit.framework.Test;
  22. import junit.framework.TestCase;
  23. import junit.framework.TestSuite;
  24. import junit.extensions.TestSetup;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.hdfs.MiniDFSCluster;
  29. import org.apache.hadoop.io.NullWritable;
  30. import org.apache.hadoop.io.Text;
  31. import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
  32. import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
  33. import org.apache.hadoop.mapred.lib.IdentityReducer;
  34. public class TestReduceFetch extends TestCase {
  35.   private static MiniMRCluster mrCluster = null;
  36.   private static MiniDFSCluster dfsCluster = null;
  37.   public static Test suite() {
  38.     TestSetup setup = new TestSetup(new TestSuite(TestReduceFetch.class)) {
  39.       protected void setUp() throws Exception {
  40.         Configuration conf = new Configuration();
  41.         dfsCluster = new MiniDFSCluster(conf, 2, true, null);
  42.         mrCluster = new MiniMRCluster(2,
  43.             dfsCluster.getFileSystem().getUri().toString(), 1);
  44.       }
  45.       protected void tearDown() throws Exception {
  46.         if (dfsCluster != null) { dfsCluster.shutdown(); }
  47.         if (mrCluster != null) { mrCluster.shutdown(); }
  48.       }
  49.     };
  50.     return setup;
  51.   }
  52.   public static class MapMB
  53.       implements Mapper<NullWritable,NullWritable,Text,Text> {
  54.     public void map(NullWritable nk, NullWritable nv,
  55.         OutputCollector<Text, Text> output, Reporter reporter)
  56.         throws IOException {
  57.       Text key = new Text();
  58.       Text val = new Text();
  59.       key.set("KEYKEYKEYKEYKEYKEYKEYKEY");
  60.       byte[] b = new byte[1000];
  61.       Arrays.fill(b, (byte)'V');
  62.       val.set(b);
  63.       b = null;
  64.       for (int i = 0; i < 4 * 1024; ++i) {
  65.         output.collect(key, val);
  66.       }
  67.     }
  68.     public void configure(JobConf conf) { }
  69.     public void close() throws IOException { }
  70.   }
  71.   public static Counters runJob(JobConf conf) throws Exception {
  72.     conf.setMapperClass(MapMB.class);
  73.     conf.setReducerClass(IdentityReducer.class);
  74.     conf.setOutputKeyClass(Text.class);
  75.     conf.setOutputValueClass(Text.class);
  76.     conf.setNumReduceTasks(1);
  77.     conf.setInputFormat(FakeIF.class);
  78.     FileInputFormat.setInputPaths(conf, new Path("/in"));
  79.     final Path outp = new Path("/out");
  80.     FileOutputFormat.setOutputPath(conf, outp);
  81.     RunningJob job = null;
  82.     try {
  83.       job = JobClient.runJob(conf);
  84.       assertTrue(job.isSuccessful());
  85.     } finally {
  86.       FileSystem fs = dfsCluster.getFileSystem();
  87.       if (fs.exists(outp)) {
  88.         fs.delete(outp, true);
  89.       }
  90.     }
  91.     return job.getCounters();
  92.   }
  93.   public void testReduceFromDisk() throws Exception {
  94.     JobConf job = mrCluster.createJobConf();
  95.     job.set("mapred.job.reduce.input.buffer.percent", "0.0");
  96.     job.setNumMapTasks(3);
  97.     Counters c = runJob(job);
  98.     final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
  99.         Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
  100.     final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
  101.         Task.getFileSystemCounterNames("file")[0]).getCounter();
  102.     assertTrue("Expected more bytes read from local (" +
  103.         localRead + ") than written to HDFS (" + hdfsWritten + ")",
  104.         hdfsWritten <= localRead);
  105.   }
  106.   public void testReduceFromPartialMem() throws Exception {
  107.     JobConf job = mrCluster.createJobConf();
  108.     job.setNumMapTasks(5);
  109.     job.setInt("mapred.inmem.merge.threshold", 0);
  110.     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
  111.     job.setInt("mapred.reduce.parallel.copies", 1);
  112.     job.setInt("io.sort.mb", 10);
  113.     job.set("mapred.child.java.opts", "-Xmx128m");
  114.     job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
  115.     job.setNumTasksToExecutePerJvm(1);
  116.     job.set("mapred.job.shuffle.merge.percent", "1.0");
  117.     Counters c = runJob(job);
  118.     final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
  119.         Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
  120.     final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
  121.         Task.getFileSystemCounterNames("file")[0]).getCounter();
  122.     assertTrue("Expected at least 1MB fewer bytes read from local (" +
  123.         localRead + ") than written to HDFS (" + hdfsWritten + ")",
  124.         hdfsWritten >= localRead + 1024 * 1024);
  125.   }
  126.   public void testReduceFromMem() throws Exception {
  127.     JobConf job = mrCluster.createJobConf();
  128.     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
  129.     job.setNumMapTasks(3);
  130.     Counters c = runJob(job);
  131.     final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
  132.         Task.getFileSystemCounterNames("file")[0]).getCounter();
  133.     assertTrue("Non-zero read from local: " + localRead, localRead == 0);
  134.   }
  135. }