TestTaskTrackerMemoryManager.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.regex.Pattern;
  21. import java.util.regex.Matcher;
  22. import org.apache.commons.logging.Log;
  23. import org.apache.commons.logging.LogFactory;
  24. import org.apache.hadoop.hdfs.MiniDFSCluster;
  25. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  26. import org.apache.hadoop.examples.SleepJob;
  27. import org.apache.hadoop.util.MemoryCalculatorPlugin;
  28. import org.apache.hadoop.util.ProcfsBasedProcessTree;
  29. import org.apache.hadoop.util.StringUtils;
  30. import org.apache.hadoop.util.ToolRunner;
  31. import org.apache.hadoop.fs.FileSystem;
  32. import junit.framework.TestCase;
  33. /**
  34.  * Test class to verify memory management of tasks.
  35.  */
  36. public class TestTaskTrackerMemoryManager extends TestCase {
  37.   private static final Log LOG =
  38.       LogFactory.getLog(TestTaskTrackerMemoryManager.class);
  39.   private MiniDFSCluster miniDFSCluster;
  40.   private MiniMRCluster miniMRCluster;
  41.   private String taskOverLimitPatternString =
  42.       "TaskTree \[pid=[0-9]*,tipID=.*\] is running beyond memory-limits. "
  43.           + "Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";
  44.   private void startCluster(JobConf conf) throws Exception {
  45.     miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
  46.     FileSystem fileSys = miniDFSCluster.getFileSystem();
  47.     String namenode = fileSys.getUri().toString();
  48.     miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, conf);
  49.   }
  50.   @Override
  51.   protected void tearDown() {
  52.     if (miniMRCluster != null) {
  53.       miniMRCluster.shutdown();
  54.     }
  55.     if (miniDFSCluster != null) {
  56.       miniDFSCluster.shutdown();
  57.     }
  58.   }
  59.   private void runSleepJob(JobConf conf) throws Exception {
  60.     String[] args = { "-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000" };
  61.     ToolRunner.run(conf, new SleepJob(), args);
  62.   }
  63.   private void runAndCheckSuccessfulJob(JobConf conf)
  64.       throws IOException {
  65.     // Set up job.
  66.     JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
  67.     conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
  68.         + jt.getTrackerPort());
  69.     NameNode nn = miniDFSCluster.getNameNode();
  70.     conf.set("fs.default.name", "hdfs://"
  71.         + nn.getNameNodeAddress().getHostName() + ":"
  72.         + nn.getNameNodeAddress().getPort());
  73.     Pattern taskOverLimitPattern =
  74.         Pattern.compile(String.format(taskOverLimitPatternString, "[0-9]*"));
  75.     Matcher mat = null;
  76.     // Start the job.
  77.     boolean success = true;
  78.     try {
  79.       runSleepJob(conf);
  80.       success = true;
  81.     } catch (Exception e) {
  82.       success = false;
  83.     }
  84.     // Job has to succeed
  85.     assertTrue(success);
  86.     JobClient jClient = new JobClient(conf);
  87.     JobStatus[] jStatus = jClient.getAllJobs();
  88.     JobStatus js = jStatus[0]; // Our only job
  89.     RunningJob rj = jClient.getJob(js.getJobID());
  90.     // All events
  91.     TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
  92.     for (TaskCompletionEvent tce : taskComplEvents) {
  93.       String[] diagnostics =
  94.           rj.getTaskDiagnostics(tce.getTaskAttemptId());
  95.       if (diagnostics != null) {
  96.         for (String str : diagnostics) {
  97.           mat = taskOverLimitPattern.matcher(str);
  98.           // The error pattern shouldn't be there in any TIP's diagnostics
  99.           assertFalse(mat.find());
  100.         }
  101.       }
  102.     }
  103.   }
  104.   private boolean isProcfsBasedTreeAvailable() {
  105.     try {
  106.       if (!ProcfsBasedProcessTree.isAvailable()) {
  107.         LOG.info("Currently ProcessTree has only one implementation "
  108.             + "ProcfsBasedProcessTree, which is not available on this "
  109.             + "system. Not testing");
  110.         return false;
  111.       }
  112.     } catch (Exception e) {
  113.       LOG.info(StringUtils.stringifyException(e));
  114.       return false;
  115.     }
  116.     return true;
  117.   }
  118.   /**
  119.    * Test for verifying that nothing is killed when memory management is
  120.    * disabled on the TT, even when the tasks run over their limits.
  121.    * 
  122.    * @throws Exception
  123.    */
  124.   public void testTTLimitsDisabled()
  125.       throws Exception {
  126.     // Run the test only if memory management is enabled
  127.     if (!isProcfsBasedTreeAvailable()) {
  128.       return;
  129.     }
  130.     JobConf conf = new JobConf();
  131.     // Task-memory management disabled by default.
  132.     startCluster(conf);
  133.     long PER_TASK_LIMIT = 100L; // Doesn't matter how low.
  134.     conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
  135.     runAndCheckSuccessfulJob(conf);
  136.   }
  137.   /**
  138.    * Test for verifying that tasks with no limits, with the cumulative usage
  139.    * still under TT's limits, succeed.
  140.    * 
  141.    * @throws Exception
  142.    */
  143.   public void testTasksWithNoLimits()
  144.       throws Exception {
  145.     // Run the test only if memory management is enabled
  146.     if (!isProcfsBasedTreeAvailable()) {
  147.       return;
  148.     }
  149.     // Fairly large value for sleepJob to succeed
  150.     long ttLimit = 4 * 1024 * 1024 * 1024L;
  151.     // Start cluster with proper configuration.
  152.     JobConf fConf = new JobConf();
  153.     fConf.setClass(
  154.         TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
  155.         DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
  156.     fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
  157.         ttLimit);
  158.     fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY, ttLimit);
  159.     fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, ttLimit);
  160.     fConf.setLong(
  161.         TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
  162.     startCluster(fConf);
  163.     JobConf conf = new JobConf();
  164.     runAndCheckSuccessfulJob(conf);
  165.   }
  166.   /**
  167.    * Test for verifying that tasks within limits, with the cumulative usage also
  168.    * under TT's limits succeed.
  169.    * 
  170.    * @throws Exception
  171.    */
  172.   public void testTasksWithinLimits()
  173.       throws Exception {
  174.     // Run the test only if memory management is enabled
  175.     if (!isProcfsBasedTreeAvailable()) {
  176.       return;
  177.     }
  178.     // Large so that sleepjob goes through and fits total TT usage
  179.     long PER_TASK_LIMIT = 2 * 1024 * 1024 * 1024L;
  180.     long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L;
  181.     // Start cluster with proper configuration.
  182.     JobConf fConf = new JobConf();
  183.     fConf.setClass(
  184.         TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
  185.         DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
  186.     fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
  187.         TASK_TRACKER_LIMIT);
  188.     fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  189.         TASK_TRACKER_LIMIT);
  190.     fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  191.         TASK_TRACKER_LIMIT);
  192.     fConf.setLong(
  193.         TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
  194.     startCluster(fConf);
  195.     JobConf conf = new JobConf();
  196.     conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
  197.     runAndCheckSuccessfulJob(conf);
  198.   }
  199.   /**
  200.    * Test for verifying that tasks that go beyond limits, though the cumulative
  201.    * usage is under TT's limits, get killed.
  202.    * 
  203.    * @throws Exception
  204.    */
  205.   public void testTasksBeyondLimits()
  206.       throws Exception {
  207.     // Run the test only if memory management is enabled
  208.     if (!isProcfsBasedTreeAvailable()) {
  209.       return;
  210.     }
  211.     long PER_TASK_LIMIT = 444; // Low enough to kill off sleepJob tasks.
  212.     long TASK_TRACKER_LIMIT = 4 * 1024 * 1024 * 1024L; // Large so as to fit
  213.     // total usage
  214.     Pattern taskOverLimitPattern =
  215.         Pattern.compile(String.format(taskOverLimitPatternString, String
  216.             .valueOf(PER_TASK_LIMIT)));
  217.     Matcher mat = null;
  218.     // Start cluster with proper configuration.
  219.     JobConf fConf = new JobConf();
  220.     fConf.setClass(
  221.         TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
  222.         DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
  223.     fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
  224.         TASK_TRACKER_LIMIT);
  225.     fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  226.         TASK_TRACKER_LIMIT);
  227.     fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  228.         TASK_TRACKER_LIMIT);
  229.     fConf.setLong(
  230.         TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
  231.     // very small value, so that no task escapes to successful completion.
  232.     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
  233.         String.valueOf(300));
  234.     startCluster(fConf);
  235.     // Set up job.
  236.     JobConf conf = new JobConf();
  237.     conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
  238.     JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
  239.     conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
  240.         + jt.getTrackerPort());
  241.     NameNode nn = miniDFSCluster.getNameNode();
  242.     conf.set("fs.default.name", "hdfs://"
  243.         + nn.getNameNodeAddress().getHostName() + ":"
  244.         + nn.getNameNodeAddress().getPort());
  245.     // Start the job.
  246.     boolean success = true;
  247.     try {
  248.       runSleepJob(conf);
  249.       success = true;
  250.     } catch (Exception e) {
  251.       success = false;
  252.     }
  253.     // Job has to fail
  254.     assertFalse(success);
  255.     JobClient jClient = new JobClient(conf);
  256.     JobStatus[] jStatus = jClient.getAllJobs();
  257.     JobStatus js = jStatus[0]; // Our only job
  258.     RunningJob rj = jClient.getJob(js.getJobID());
  259.     // All events
  260.     TaskCompletionEvent[] taskComplEvents = rj.getTaskCompletionEvents(0);
  261.     for (TaskCompletionEvent tce : taskComplEvents) {
  262.       // Every task HAS to fail
  263.       assert (tce.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED || tce
  264.           .getTaskStatus() == TaskCompletionEvent.Status.FAILED);
  265.       String[] diagnostics =
  266.           rj.getTaskDiagnostics(tce.getTaskAttemptId());
  267.       // Every task HAS to spit out the out-of-memory errors
  268.       assert (diagnostics != null);
  269.       for (String str : diagnostics) {
  270.         mat = taskOverLimitPattern.matcher(str);
  271.         // Every task HAS to spit out the out-of-memory errors in the same
  272.         // format. And these are the only diagnostic messages.
  273.         assertTrue(mat.find());
  274.       }
  275.     }
  276.   }
  277.   /**
  278.    * Test for verifying that tasks causing cumulative usage to go beyond TT's
  279.    * limit get killed even though they all are under individual limits. Memory
  280.    * management for tasks with disabled task-limits also traverses the same
  281.    * code-path, so we don't need a separate testTaskLimitsDisabled.
  282.    * 
  283.    * @throws Exception
  284.    */
  285.   public void testTasksCumulativelyExceedingTTLimits()
  286.       throws Exception {
  287.     // Run the test only if memory management is enabled
  288.     if (!isProcfsBasedTreeAvailable()) {
  289.       return;
  290.     }
  291.     // Large enough for SleepJob Tasks.
  292.     long PER_TASK_LIMIT = 100000000000L;
  293.     // Very Limited TT. All tasks will be killed.
  294.     long TASK_TRACKER_LIMIT = 100L;
  295.     Pattern taskOverLimitPattern =
  296.         Pattern.compile(String.format(taskOverLimitPatternString, String
  297.             .valueOf(PER_TASK_LIMIT)));
  298.     Pattern trackerOverLimitPattern =
  299.         Pattern
  300.             .compile("Killing one of the least progress tasks - .*, as "
  301.                 + "the cumulative memory usage of all the tasks on the TaskTracker"
  302.                 + " exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
  303.     Matcher mat = null;
  304.     // Start cluster with proper configuration.
  305.     JobConf fConf = new JobConf();
  306.     fConf.setClass(
  307.         TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
  308.         DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
  309.     fConf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
  310.         TASK_TRACKER_LIMIT);
  311.     fConf.setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  312.         TASK_TRACKER_LIMIT);
  313.     fConf.setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  314.         TASK_TRACKER_LIMIT);
  315.     fConf.setLong(
  316.         TaskTracker.MAPRED_TASKTRACKER_VMEM_RESERVED_PROPERTY, 0);
  317.     // very small value, so that no task escapes to successful completion.
  318.     fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval",
  319.         String.valueOf(300));
  320.     startCluster(fConf);
  321.     // Set up job.
  322.     JobConf conf = new JobConf();
  323.     conf.setMaxVirtualMemoryForTask(PER_TASK_LIMIT);
  324.     JobTracker jt = miniMRCluster.getJobTrackerRunner().getJobTracker();
  325.     conf.set("mapred.job.tracker", jt.getJobTrackerMachine() + ":"
  326.         + jt.getTrackerPort());
  327.     NameNode nn = miniDFSCluster.getNameNode();
  328.     conf.set("fs.default.name", "hdfs://"
  329.         + nn.getNameNodeAddress().getHostName() + ":"
  330.         + nn.getNameNodeAddress().getPort());
  331.     JobClient jClient = new JobClient(conf);
  332.     SleepJob sleepJob = new SleepJob();
  333.     sleepJob.setConf(conf);
  334.     // Start the job
  335.     RunningJob job =
  336.         jClient.submitJob(sleepJob.setupJobConf(1, 1, 5000, 1, 1000, 1));
  337.     boolean TTOverFlowMsgPresent = false;
  338.     while (true) {
  339.       // Set-up tasks are the first to be launched.
  340.       TaskReport[] setUpReports = jt.getSetupTaskReports(job.getID());
  341.       for (TaskReport tr : setUpReports) {
  342.         String[] diag = tr.getDiagnostics();
  343.         for (String str : diag) {
  344.           mat = taskOverLimitPattern.matcher(str);
  345.           assertFalse(mat.find());
  346.           mat = trackerOverLimitPattern.matcher(str);
  347.           if (mat.find()) {
  348.             TTOverFlowMsgPresent = true;
  349.           }
  350.         }
  351.       }
  352.       if (TTOverFlowMsgPresent) {
  353.         break;
  354.       }
  355.       try {
  356.         Thread.sleep(1000);
  357.       } catch (InterruptedException e) {
  358.         // nothing
  359.       }
  360.     }
  361.     // If it comes here without a test-timeout, it means there was a task that
  362.     // was killed because of crossing cumulative TT limit.
  363.     // Test succeeded, kill the job.
  364.     job.killJob();
  365.   }
  366. }