TestRecoveryManager.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import junit.framework.TestCase;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.fs.FSDataOutputStream;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
  28. import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
  29. import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
  30. import org.apache.hadoop.security.UserGroupInformation;
  31. /**
  32.  * Test whether the {@link RecoveryManager} is able to tolerate job-recovery 
  33.  * failures and the jobtracker is able to tolerate {@link RecoveryManager}
  34.  * failure.
  35.  */
  36. public class TestRecoveryManager extends TestCase {
  37.   private static final Log LOG = 
  38.     LogFactory.getLog(TestRecoveryManager.class);
  39.   private static final Path TEST_DIR = 
  40.     new Path(System.getProperty("test.build.data", "/tmp"), 
  41.              "test-recovery-manager");
  42.   
  43.   /**
  44.    * Tests the {@link JobTracker} against the exceptions thrown in 
  45.    * {@link JobTracker.RecoveryManager}. It does the following :
  46.    *  - submits 2 jobs
  47.    *  - kills the jobtracker
  48.    *  - Garble job.xml for one job causing it to fail in constructor 
  49.    *    and job.split for another causing it to fail in init.
  50.    *  - restarts the jobtracker
  51.    *  - checks if the jobtraker starts normally
  52.    */
  53.   public void testJobTracker() throws Exception {
  54.     LOG.info("Testing jobtracker restart with faulty job");
  55.     String signalFile = new Path(TEST_DIR, "signal").toString();
  56.     JobConf conf = new JobConf();
  57.     
  58.     FileSystem fs = FileSystem.get(new Configuration());
  59.     fs.delete(TEST_DIR, true); // cleanup
  60.     
  61.     conf.set("mapred.jobtracker.job.history.block.size", "1024");
  62.     conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
  63.     
  64.     MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
  65.     
  66.     JobConf job1 = mr.createJobConf();
  67.     
  68.     UtilsForTests.configureWaitingJobConf(job1, 
  69.         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, 
  70.         "test-recovery-manager", signalFile, signalFile);
  71.     
  72.     // submit the faulty job
  73.     RunningJob rJob1 = (new JobClient(job1)).submitJob(job1);
  74.     LOG.info("Submitted job " + rJob1.getID());
  75.     
  76.     while (rJob1.mapProgress() < 0.5f) {
  77.       LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
  78.       UtilsForTests.waitFor(100);
  79.     }
  80.     
  81.     JobConf job2 = mr.createJobConf();
  82.     
  83.     UtilsForTests.configureWaitingJobConf(job2, 
  84.         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0, 
  85.         "test-recovery-manager", signalFile, signalFile);
  86.     
  87.     // submit the faulty job
  88.     RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
  89.     LOG.info("Submitted job " + rJob2.getID());
  90.     
  91.     while (rJob2.mapProgress() < 0.5f) {
  92.       LOG.info("Waiting for job " + rJob2.getID() + " to be 50% done");
  93.       UtilsForTests.waitFor(100);
  94.     }
  95.     
  96.     // kill the jobtracker
  97.     LOG.info("Stopping jobtracker");
  98.     String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
  99.     mr.stopJobTracker();
  100.     
  101.     // delete the job.xml of job #1 causing the job to fail in constructor
  102.     Path jobFile = 
  103.       new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
  104.     LOG.info("Deleting job.xml file : " + jobFile.toString());
  105.     fs.delete(jobFile, false); // delete the job.xml file
  106.     
  107.     // create the job.xml file with 0 bytes
  108.     FSDataOutputStream out = fs.create(jobFile);
  109.     out.write(1);
  110.     out.close();
  111.     // delete the job.split of job #2 causing the job to fail in initTasks
  112.     Path jobSplitFile = 
  113.       new Path(sysDir, rJob2.getID().toString() + Path.SEPARATOR + "job.split");
  114.     LOG.info("Deleting job.split file : " + jobSplitFile.toString());
  115.     fs.delete(jobSplitFile, false); // delete the job.split file
  116.     
  117.     // create the job.split file with 0 bytes
  118.     out = fs.create(jobSplitFile);
  119.     out.write(1);
  120.     out.close();
  121.     // make sure that the jobtracker is in recovery mode
  122.     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
  123.                                       true);
  124.     // start the jobtracker
  125.     LOG.info("Starting jobtracker");
  126.     mr.startJobTracker();
  127.     ClusterStatus status = 
  128.       mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
  129.     
  130.     // check if the jobtracker came up or not
  131.     assertEquals("JobTracker crashed!", 
  132.                  JobTracker.State.RUNNING, status.getJobTrackerState());
  133.     
  134.     mr.shutdown();
  135.   }
  136.   
  137.   /**
  138.    * Tests the {@link JobTracker.RecoveryManager} against the exceptions thrown 
  139.    * during recovery. It does the following :
  140.    *  - submits a job with HIGH priority and x tasks
  141.    *  - allows it to complete 50%
  142.    *  - submits another job with normal priority and y tasks
  143.    *  - kills the jobtracker
  144.    *  - restarts the jobtracker with max-tasks-per-job such that 
  145.    *        y < max-tasks-per-job < x
  146.    *  - checks if the jobtraker starts normally and job#2 is recovered while 
  147.    *    job#1 is failed.
  148.    */
  149.   public void testRecoveryManager() throws Exception {
  150.     LOG.info("Testing recovery-manager");
  151.     String signalFile = new Path(TEST_DIR, "signal").toString();
  152.     
  153.     // clean up
  154.     FileSystem fs = FileSystem.get(new Configuration());
  155.     fs.delete(TEST_DIR, true);
  156.     
  157.     JobConf conf = new JobConf();
  158.     conf.set("mapred.jobtracker.job.history.block.size", "1024");
  159.     conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
  160.     
  161.     MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
  162.     JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
  163.     
  164.     JobConf job1 = mr.createJobConf();
  165.     //  set the high priority
  166.     job1.setJobPriority(JobPriority.HIGH);
  167.     
  168.     UtilsForTests.configureWaitingJobConf(job1, 
  169.         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 30, 0, 
  170.         "test-recovery-manager", signalFile, signalFile);
  171.     
  172.     // submit the faulty job
  173.     JobClient jc = new JobClient(job1);
  174.     RunningJob rJob1 = jc.submitJob(job1);
  175.     LOG.info("Submitted first job " + rJob1.getID());
  176.     
  177.     while (rJob1.mapProgress() < 0.5f) {
  178.       LOG.info("Waiting for job " + rJob1.getID() + " to be 50% done");
  179.       UtilsForTests.waitFor(100);
  180.     }
  181.     
  182.     // now submit job2
  183.     JobConf job2 = mr.createJobConf();
  184.     String signalFile1 = new Path(TEST_DIR, "signal1").toString();
  185.     UtilsForTests.configureWaitingJobConf(job2, 
  186.         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, 
  187.         "test-recovery-manager", signalFile1, signalFile1);
  188.     
  189.     // submit the job
  190.     RunningJob rJob2 = (new JobClient(job2)).submitJob(job2);
  191.     LOG.info("Submitted job " + rJob2.getID());
  192.     
  193.     // wait for it to init
  194.     JobInProgress jip = jobtracker.getJob(rJob2.getID());
  195.     
  196.     while (!jip.inited()) {
  197.       LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
  198.       UtilsForTests.waitFor(100);
  199.     }
  200.     
  201.     // now submit job3 with inappropriate acls
  202.     JobConf job3 = mr.createJobConf();
  203.     job3.set("hadoop.job.ugi","abc,users");
  204.     UtilsForTests.configureWaitingJobConf(job3, 
  205.         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 1, 0, 
  206.         "test-recovery-manager", signalFile, signalFile);
  207.     
  208.     // submit the job
  209.     RunningJob rJob3 = (new JobClient(job3)).submitJob(job3);
  210.     LOG.info("Submitted job " + rJob3.getID() + " with different user");
  211.     
  212.     jip = jobtracker.getJob(rJob3.getID());
  213.     while (!jip.inited()) {
  214.       LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
  215.       UtilsForTests.waitFor(100);
  216.     }
  217.     // kill the jobtracker
  218.     LOG.info("Stopping jobtracker");
  219.     mr.stopJobTracker();
  220.     
  221.     // make sure that the jobtracker is in recovery mode
  222.     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
  223.                                       true);
  224.     mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
  225.     
  226.     mr.getJobTrackerConf().setBoolean("mapred.acls.enabled" , true);
  227.     UserGroupInformation ugi = UserGroupInformation.readFrom(job1);
  228.     mr.getJobTrackerConf().set("mapred.queue.default.acl-submit-job", 
  229.                                ugi.getUserName());
  230.     // start the jobtracker
  231.     LOG.info("Starting jobtracker");
  232.     mr.startJobTracker();
  233.     UtilsForTests.waitForJobTracker(jc);
  234.     
  235.     jobtracker = mr.getJobTrackerRunner().getJobTracker();
  236.     
  237.     // assert that job2 is recovered by the jobtracker as job1 would fail
  238.     assertEquals("Recovery manager failed to tolerate job failures",
  239.                  2, jobtracker.getAllJobs().length);
  240.     
  241.     // check if the job#1 has failed
  242.     JobStatus status = jobtracker.getJobStatus(rJob1.getID());
  243.     assertEquals("Faulty job not failed", 
  244.                  JobStatus.FAILED, status.getRunState());
  245.     
  246.     jip = jobtracker.getJob(rJob2.getID());
  247.     assertFalse("Job should be running", jip.isComplete());
  248.     
  249.     status = jobtracker.getJobStatus(rJob3.getID());
  250.     assertNull("Job should be missing", status);
  251.     
  252.     mr.shutdown();
  253.   }
  254.   
  255.   /**
  256.    * Test if restart count of the jobtracker is correctly managed.
  257.    * Steps are as follows :
  258.    *   - start the jobtracker and check if the info file gets created.
  259.    *   - stops the jobtracker, deletes the jobtracker.info file and checks if
  260.    *     upon restart the recovery is 'off'
  261.    *   - submit a job to the jobtracker.
  262.    *   - restart the jobtracker k times and check if the restart count on ith 
  263.    *     iteration is i.
  264.    *   - submit a new job and check if its restart count is 0.
  265.    *   - garble the jobtracker.info file and restart he jobtracker, the 
  266.    *     jobtracker should crash.
  267.    */
  268.   public void testRestartCount() throws Exception {
  269.     LOG.info("Testing restart-count");
  270.     String signalFile = new Path(TEST_DIR, "signal").toString();
  271.     
  272.     // clean up
  273.     FileSystem fs = FileSystem.get(new Configuration());
  274.     fs.delete(TEST_DIR, true);
  275.     
  276.     JobConf conf = new JobConf();
  277.     conf.set("mapred.jobtracker.job.history.block.size", "1024");
  278.     conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
  279.     conf.setBoolean("mapred.jobtracker.restart.recover", true);
  280.     // since there is no need for initing
  281.     conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
  282.                   TaskScheduler.class);
  283.     
  284.     MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
  285.     JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
  286.     JobClient jc = new JobClient(mr.createJobConf());
  287.     // check if the jobtracker info file exists
  288.     Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
  289.     assertTrue("Jobtracker infomation is missing", fs.exists(infoFile));
  290.     // check if garbling the system files disables the recovery process
  291.     LOG.info("Stopping jobtracker for testing with system files deleted");
  292.     mr.stopJobTracker();
  293.     
  294.     // delete the info file
  295.     Path rFile = jobtracker.recoveryManager.getRestartCountFile();
  296.     fs.delete(rFile,false);
  297.     
  298.     // start the jobtracker
  299.     LOG.info("Stopping jobtracker with system files deleted");
  300.     mr.startJobTracker();
  301.     
  302.     UtilsForTests.waitForJobTracker(jc);
  303.     jobtracker = mr.getJobTrackerRunner().getJobTracker();
  304.     // check if the recovey is disabled
  305.     assertFalse("Recovery is not disabled upon missing system files", 
  306.                 jobtracker.recoveryManager.shouldRecover());
  307.     // check if the system dir is sane
  308.     assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
  309.     Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
  310.     assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
  311.     // submit a job
  312.     JobConf job = mr.createJobConf();
  313.     
  314.     UtilsForTests.configureWaitingJobConf(job, 
  315.         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0, 
  316.         "test-recovery-manager", signalFile, signalFile);
  317.     
  318.     // submit the faulty job
  319.     RunningJob rJob = jc.submitJob(job);
  320.     LOG.info("Submitted first job " + rJob.getID());
  321.     // kill the jobtracker multiple times and check if the count is correct
  322.     for (int i = 1; i <= 5; ++i) {
  323.       LOG.info("Stopping jobtracker for " + i + " time");
  324.       mr.stopJobTracker();
  325.       
  326.       // start the jobtracker
  327.       LOG.info("Starting jobtracker for " + i + " time");
  328.       mr.startJobTracker();
  329.       
  330.       UtilsForTests.waitForJobTracker(jc);
  331.       
  332.       // check if the system dir is sane
  333.       assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
  334.       assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
  335.       
  336.       jobtracker = mr.getJobTrackerRunner().getJobTracker();
  337.       JobInProgress jip = jobtracker.getJob(rJob.getID());
  338.       
  339.       // assert if restart count is correct
  340.       assertEquals("Recovery manager failed to recover restart count",
  341.                    i, jip.getNumRestarts());
  342.     }
  343.     
  344.     // kill the old job
  345.     rJob.killJob();
  346.     // II. Submit a new job and check if the restart count is 0
  347.     JobConf job1 = mr.createJobConf();
  348.     
  349.     UtilsForTests.configureWaitingJobConf(job1, 
  350.         new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, 
  351.         "test-recovery-manager", signalFile, signalFile);
  352.     
  353.     // make sure that the job id's dont clash
  354.     jobtracker.getNewJobId();
  355.     // submit a new job
  356.     rJob = jc.submitJob(job1);
  357.     LOG.info("Submitted first job after restart" + rJob.getID());
  358.     // assert if restart count is correct
  359.     JobInProgress jip = jobtracker.getJob(rJob.getID());
  360.     assertEquals("Restart count for new job is incorrect",
  361.                  0, jip.getNumRestarts());
  362.     LOG.info("Stopping jobtracker for testing the fs errors");
  363.     mr.stopJobTracker();
  364.     // check if system.dir problems in recovery kills the jobtracker
  365.     fs.delete(rFile, false);
  366.     FSDataOutputStream out = fs.create(rFile);
  367.     out.writeBoolean(true);
  368.     out.close();
  369.     // start the jobtracker
  370.     LOG.info("Starting jobtracker with fs errors");
  371.     mr.startJobTracker();
  372.     JobTrackerRunner runner = mr.getJobTrackerRunner();
  373.     assertFalse("Restart count for new job is incorrect", runner.isActive());
  374.     mr.shutdown();
  375.   } 
  376. }