TestTTMemoryReporting.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.List;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.examples.SleepJob;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.hdfs.MiniDFSCluster;
  27. import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
  28. import org.apache.hadoop.util.MemoryCalculatorPlugin;
  29. import org.apache.hadoop.util.ToolRunner;
  30. import junit.framework.TestCase;
  31. /**
  32.  * This test class tests the functionality related to configuring, reporting
  33.  * and computing memory related parameters in a Map/Reduce cluster.
  34.  * 
  35.  * Each test sets up a {@link MiniMRCluster} with a locally defined 
  36.  * {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates 
  37.  * the memory related configuration is correctly computed and reported from 
  38.  * the tasktracker in 
  39.  * {@link org.apache.hadoop.mapred.TaskScheduler#assignTasks(TaskTrackerStatus)}.
  40.  */
  41. public class TestTTMemoryReporting extends TestCase {
  42.   static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class);
  43.   
  44.   private MiniDFSCluster miniDFSCluster;
  45.   private MiniMRCluster miniMRCluster;
  46.   /**
  47.    * Fake scheduler to test the proper reporting of memory values by TT
  48.    */
  49.   public static class FakeTaskScheduler extends JobQueueTaskScheduler {
  50.     
  51.     private boolean hasPassed = true;
  52.     private String message;
  53.     
  54.     public FakeTaskScheduler() {
  55.       super();
  56.     }
  57.     
  58.     public boolean hasTestPassed() {
  59.       return hasPassed;
  60.     }
  61.     
  62.     public String getFailureMessage() {
  63.       return message;
  64.     }
  65.     
  66.     @Override
  67.     public List<Task> assignTasks(TaskTrackerStatus status)
  68.         throws IOException {
  69.       long totalVirtualMemoryOnTT =
  70.           getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
  71.       long totalPhysicalMemoryOnTT =
  72.           getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
  73.       long virtualMemoryReservedOnTT =
  74.           getConf().getLong("reservedVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
  75.       long physicalMemoryReservedOnTT =
  76.           getConf().getLong("reservedPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
  77.       long reportedTotalVirtualMemoryOnTT =
  78.           status.getResourceStatus().getTotalVirtualMemory();
  79.       long reportedTotalPhysicalMemoryOnTT =
  80.           status.getResourceStatus().getTotalPhysicalMemory();
  81.       long reportedVirtualMemoryReservedOnTT =
  82.           status.getResourceStatus().getReservedTotalMemory();
  83.       long reportedPhysicalMemoryReservedOnTT =
  84.           status.getResourceStatus().getReservedPhysicalMemory();
  85.       message =
  86.           "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
  87.               + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
  88.               + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ", "
  89.               + virtualMemoryReservedOnTT + ", " + physicalMemoryReservedOnTT
  90.               + ")";
  91.       message +=
  92.           "nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
  93.               + "virtualMemoryReservedOnTT, physicalMemoryReservedOnTT) = ("
  94.               + reportedTotalVirtualMemoryOnTT
  95.               + ", "
  96.               + reportedTotalPhysicalMemoryOnTT
  97.               + ", "
  98.               + reportedVirtualMemoryReservedOnTT
  99.               + ", "
  100.               + reportedPhysicalMemoryReservedOnTT + ")";
  101.       LOG.info(message);
  102.       if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
  103.           || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
  104.           || virtualMemoryReservedOnTT != reportedVirtualMemoryReservedOnTT
  105.           || physicalMemoryReservedOnTT != reportedPhysicalMemoryReservedOnTT) {
  106.         hasPassed = false;
  107.       }
  108.       return super.assignTasks(status);
  109.     }
  110.   }
  111.   /**
  112.    * Test that verifies default values are configured and reported correctly.
  113.    * 
  114.    * @throws Exception
  115.    */
  116.   public void testDefaultMemoryValues()
  117.       throws Exception {
  118.     JobConf conf = new JobConf();
  119.     try {
  120.       // Memory values are disabled by default.
  121.       conf.setClass(
  122.           TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
  123.           DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
  124.       setUpCluster(conf);
  125.       runSleepJob();
  126.       verifyTestResults();
  127.     } finally {
  128.       tearDownCluster();
  129.     }
  130.   }
  131.   /**
  132.    * Test that verifies that configured values are reported correctly.
  133.    * 
  134.    * @throws Exception
  135.    */
  136.   public void testConfiguredMemoryValues()
  137.       throws Exception {
  138.     JobConf conf = new JobConf();
  139.     conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
  140.     conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
  141.     conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
  142.     conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
  143.     conf.setClass(
  144.         TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
  145.         DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
  146.     conf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
  147.         4 * 1024 * 1024 * 1024L);
  148.     conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
  149.         2 * 1024 * 1024 * 1024L);
  150.     conf.setLong(
  151.         TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
  152.         1 * 1024 * 1024 * 1024L);
  153.     conf.setLong(
  154.         TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
  155.         512 * 1024 * 1024L);
  156.     try {
  157.       setUpCluster(conf);
  158.       runSleepJob();
  159.       verifyTestResults();
  160.     } finally {
  161.       tearDownCluster();
  162.     }
  163.   }
  164.   /**
  165.    * Test that verifies that total memory values are calculated and reported
  166.    * correctly.
  167.    * 
  168.    * @throws Exception
  169.    */
  170.   public void testMemoryValuesOnLinux()
  171.       throws Exception {
  172.     if (!System.getProperty("os.name").startsWith("Linux")) {
  173.       return;
  174.     }
  175.     JobConf conf = new JobConf();
  176.     LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
  177.     conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
  178.     conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
  179.     conf.setLong("reservedVmemOnTT", 1 * 1024 * 1024 * 1024L);
  180.     conf.setLong("reservedPmemOnTT", 512 * 1024 * 1024L);
  181.     conf.setLong(
  182.         TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY,
  183.         1 * 1024 * 1024 * 1024L);
  184.     conf.setLong(
  185.         TaskTracker.MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY,
  186.         512 * 1024 * 1024L);
  187.     try {
  188.       setUpCluster(conf);
  189.       runSleepJob();
  190.       verifyTestResults();
  191.     } finally {
  192.       tearDownCluster();
  193.     }
  194.   }
  195.   private void setUpCluster(JobConf conf)
  196.                                 throws Exception {
  197.     conf.setClass("mapred.jobtracker.taskScheduler", 
  198.         TestTTMemoryReporting.FakeTaskScheduler.class,
  199.         TaskScheduler.class);
  200.     miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
  201.     FileSystem fileSys = miniDFSCluster.getFileSystem();
  202.     String namenode = fileSys.getUri().toString();
  203.     miniMRCluster = new MiniMRCluster(1, namenode, 3, 
  204.                       null, null, conf);    
  205.   }
  206.   
  207.   private void runSleepJob() throws Exception {
  208.     Configuration conf = new Configuration();
  209.     conf.set("mapred.job.tracker", "localhost:"
  210.                               + miniMRCluster.getJobTrackerPort());
  211.     String[] args = { "-m", "1", "-r", "1",
  212.                       "-mt", "1000", "-rt", "1000" };
  213.     ToolRunner.run(conf, new SleepJob(), args);
  214.   }
  215.   private void verifyTestResults() {
  216.     FakeTaskScheduler scheduler = 
  217.       (FakeTaskScheduler)miniMRCluster.getJobTrackerRunner().
  218.                               getJobTracker().getTaskScheduler();
  219.     assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed());
  220.   }
  221.   
  222.   private void tearDownCluster() {
  223.     if (miniMRCluster != null) { miniMRCluster.shutdown(); }
  224.     if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
  225.   }
  226. }