TestJobTrackerSafeMode.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.commons.logging.Log;
  20. import org.apache.commons.logging.LogFactory;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.*;
  23. import org.apache.hadoop.hdfs.MiniDFSCluster;
  24. import junit.framework.TestCase;
  25. import java.io.*;
  26. import java.util.HashSet;
  27. import java.util.Set;
  28. /** 
  29.  * This test checks jobtracker in safe mode. In safe mode the jobtracker upon 
  30.  * restart doesnt schedule any new tasks and waits for the (old) trackers to 
  31.  * join back.
  32.  */
  33. public class TestJobTrackerSafeMode extends TestCase {
  34.   final Path testDir = 
  35.     new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
  36.   final Path inDir = new Path(testDir, "input");
  37.   final Path shareDir = new Path(testDir, "share");
  38.   final Path outputDir = new Path(testDir, "output");
  39.   final int numDir = 1;
  40.   final int numTrackers = 2;
  41.   
  42.   private static final Log LOG = 
  43.     LogFactory.getLog(TestJobTrackerSafeMode.class);
  44.   
  45.   private JobConf configureJob(JobConf conf, int maps, int reduces,
  46.                                String mapSignal, String redSignal) 
  47.   throws IOException {
  48.     UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
  49.         maps, reduces, "test-jobtracker-safemode", 
  50.         mapSignal, redSignal);
  51.     return conf;
  52.   }
  53.   
  54.   /**
  55.    * Tests the jobtracker's safemode. The test is as follows : 
  56.    *   - starts a cluster with 2 trackers
  57.    *   - submits a job with large (40) maps to make sure that all the trackers 
  58.    *     are logged to the job history
  59.    *   - wait for the job to be 50% done
  60.    *   - stop the jobtracker
  61.    *   - wait for the trackers to be done with all the tasks
  62.    *   - kill a task tracker
  63.    *   - start the jobtracker
  64.    *   - start 2 more trackers
  65.    *   - now check that while all the tracker are detected (or lost) the 
  66.    *     scheduling window is closed
  67.    *   - check that after all the trackers are recovered, scheduling is opened 
  68.    */
  69.   private void testSafeMode(MiniDFSCluster dfs, MiniMRCluster mr) 
  70.   throws IOException {
  71.     FileSystem fileSys = dfs.getFileSystem();
  72.     JobConf jobConf = mr.createJobConf();
  73.     String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
  74.     String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
  75.     JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
  76.     int numTracker = jobtracker.getClusterStatus(false).getTaskTrackers();
  77.     
  78.     // Configure the jobs
  79.     JobConf job = configureJob(jobConf, 40, 0, mapSignalFile, redSignalFile);
  80.       
  81.     fileSys.delete(shareDir, true);
  82.     
  83.     // Submit a master job   
  84.     JobClient jobClient = new JobClient(job);
  85.     RunningJob rJob = jobClient.submitJob(job);
  86.     JobID id = rJob.getID();
  87.     
  88.     // wait for the job to be inited
  89.     mr.initializeJob(id);
  90.     
  91.     // Make sure that the master job is 50% completed
  92.     while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
  93.            < 0.5f) {
  94.       LOG.info("Waiting for the job to be 50% done");
  95.       UtilsForTests.waitFor(100);
  96.     }
  97.     // Kill the jobtracker
  98.     mr.stopJobTracker();
  99.     // Enable recovery on restart
  100.     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
  101.                                       true);
  102.     
  103.     // Signal the maps to complete
  104.     UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
  105.     
  106.     // Signal the reducers to complete
  107.     UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
  108.                               redSignalFile);
  109.     
  110.     // wait for the tasks to complete at the tracker
  111.     Set<String> trackers = new HashSet<String>();
  112.     for (int i = 0 ; i < numTracker; ++i) {
  113.       TaskTracker t = mr.getTaskTrackerRunner(i).getTaskTracker();
  114.       trackers.add(t.getName());
  115.       int runningCount = t.getRunningTaskStatuses().size();
  116.       while (runningCount != 0) {
  117.         LOG.info("Waiting for tracker " + t.getName() + " to stabilize");
  118.         UtilsForTests.waitFor(100);
  119.         runningCount = 0;
  120.         for (TaskStatus status : t.getRunningTaskStatuses()) {
  121.           if (status.getIsMap() 
  122.               && (status.getRunState() == TaskStatus.State.UNASSIGNED 
  123.                   || status.getRunState() == TaskStatus.State.RUNNING)) {
  124.             ++runningCount;
  125.           }
  126.         }
  127.       }
  128.     }
  129.     LOG.info("Trackers have stabilized");
  130.     
  131.     // Kill a tasktracker
  132.     int trackerToKill = --numTracker;
  133.     TaskTracker t = mr.getTaskTrackerRunner(trackerToKill).getTaskTracker();
  134.     
  135.     trackers.remove(t.getName()); // remove this from the set to check
  136.     
  137.     Set<String> lostTrackers = new HashSet<String>();
  138.     lostTrackers.add(t.getName());
  139.     
  140.     // get the attempt-id's to ignore
  141.     // stop the tracker
  142.     LOG.info("Stopping tracker : " + t.getName());
  143.     mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown();
  144.     mr.stopTaskTracker(trackerToKill);
  145.     // Restart the jobtracker
  146.     mr.startJobTracker();
  147.     // Wait for the JT to be ready
  148.     UtilsForTests.waitForJobTracker(jobClient);
  149.     jobtracker = mr.getJobTrackerRunner().getJobTracker();
  150.     // Start a tracker
  151.     LOG.info("Start a new tracker");
  152.     mr.startTaskTracker(null, null, ++numTracker, numDir);
  153.     
  154.     // Start a tracker
  155.     LOG.info("Start a new tracker");
  156.     mr.startTaskTracker(null, null, ++numTracker, numDir);
  157.     // Check if the jobs are still running
  158.     
  159.     // Wait for the tracker to be lost
  160.     boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
  161.     while (!checkTrackers(jobtracker, trackers, lostTrackers)) {
  162.       assertFalse("JobTracker has opened up scheduling before all the" 
  163.                   + " trackers were recovered", shouldSchedule);
  164.       UtilsForTests.waitFor(100);
  165.       
  166.       // snapshot jobtracker's scheduling status
  167.       shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
  168.     }
  169.     assertTrue("JobTracker hasnt opened up scheduling even all the" 
  170.                + " trackers were recovered", 
  171.                jobtracker.recoveryManager.shouldSchedule());
  172.     
  173.     assertEquals("Recovery manager is in inconsistent state", 
  174.                  0, jobtracker.recoveryManager.recoveredTrackers.size());
  175.     
  176.     // wait for the job to be complete
  177.     UtilsForTests.waitTillDone(jobClient);
  178.   }
  179.   private boolean checkTrackers(JobTracker jobtracker, Set<String> present, 
  180.                                 Set<String> absent) {
  181.     long jobtrackerRecoveryFinishTime = 
  182.       jobtracker.getStartTime() + jobtracker.getRecoveryDuration();
  183.     for (String trackerName : present) {
  184.       TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
  185.       // check if the status is present and also the tracker has contacted back
  186.       // after restart
  187.       if (status == null 
  188.           || status.getLastSeen() < jobtrackerRecoveryFinishTime) {
  189.         return false;
  190.       }
  191.     }
  192.     for (String trackerName : absent) {
  193.       TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
  194.       // check if the status is still present
  195.       if ( status != null) {
  196.         return false;
  197.       }
  198.     }
  199.     return true;
  200.   }
  201.   /**
  202.    * Test {@link JobTracker}'s safe mode.
  203.    */
  204.   public void testJobTrackerSafeMode() throws IOException {
  205.     String namenode = null;
  206.     MiniDFSCluster dfs = null;
  207.     MiniMRCluster mr = null;
  208.     FileSystem fileSys = null;
  209.     try {
  210.       Configuration conf = new Configuration();
  211.       conf.setBoolean("dfs.replication.considerLoad", false);
  212.       dfs = new MiniDFSCluster(conf, 1, true, null, null);
  213.       dfs.waitActive();
  214.       fileSys = dfs.getFileSystem();
  215.       
  216.       // clean up
  217.       fileSys.delete(testDir, true);
  218.       
  219.       if (!fileSys.mkdirs(inDir)) {
  220.         throw new IOException("Mkdirs failed to create " + inDir.toString());
  221.       }
  222.       // Write the input file
  223.       UtilsForTests.writeFile(dfs.getNameNode(), conf, 
  224.                               new Path(inDir + "/file"), (short)1);
  225.       dfs.startDataNodes(conf, 1, true, null, null, null, null);
  226.       dfs.waitActive();
  227.       namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
  228.                  + (dfs.getFileSystem()).getUri().getPort();
  229.       // Make sure that jobhistory leads to a proper job restart
  230.       // So keep the blocksize and the buffer size small
  231.       JobConf jtConf = new JobConf();
  232.       jtConf.set("mapred.jobtracker.job.history.block.size", "512");
  233.       jtConf.set("mapred.jobtracker.job.history.buffer.size", "512");
  234.       jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
  235.       jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
  236.       jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
  237.       jtConf.setInt("mapred.reduce.copy.backoff", 4);
  238.       jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);
  239.       
  240.       mr = new MiniMRCluster(numTrackers, namenode, numDir, null, null, jtConf);
  241.       
  242.       // Test Lost tracker case
  243.       testSafeMode(dfs, mr);
  244.     } finally {
  245.       if (mr != null) {
  246.         try {
  247.           mr.shutdown();
  248.         } catch (Exception e) {}
  249.       }
  250.       if (dfs != null) {
  251.         try {
  252.           dfs.shutdown();
  253.         } catch (Exception e) {}
  254.       }
  255.     }
  256.   }
  257.   public static void main(String[] args) throws IOException {
  258.     new TestJobTrackerSafeMode().testJobTrackerSafeMode();
  259.   }
  260. }