// TestJobTrackerSafeMode.java
// Uploaded by: quxuerui
// Upload date: 2018-01-08
// Resource size: 41811k
// File size: 10k
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.*;
import java.util.HashSet;
import java.util.Set;

import junit.framework.TestCase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
/**
 * This test checks the JobTracker in safe mode. In safe mode the JobTracker,
 * upon restart, doesn't schedule any new tasks and waits for the (old)
 * trackers to join back.
 */
public class TestJobTrackerSafeMode extends TestCase {
  // Root directory for all test data on the test filesystem.
  final Path testDir =
    new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
  // Job input directory.
  final Path inDir = new Path(testDir, "input");
  // Directory holding the signal files that release the waiting map/reduce
  // tasks created by UtilsForTests.configureWaitingJobConf().
  final Path shareDir = new Path(testDir, "share");
  // Job output directory.
  final Path outputDir = new Path(testDir, "output");
  // Number of local directories per task tracker.
  final int numDir = 1;
  // Number of task trackers the MiniMR cluster is started with.
  final int numTrackers = 2;

  private static final Log LOG =
    LogFactory.getLog(TestJobTrackerSafeMode.class);

  /**
   * Configures {@code conf} as a "waiting" job: its tasks block until the
   * corresponding signal file appears in the shared directory.
   *
   * @param conf      job configuration to populate (modified in place)
   * @param maps      number of map tasks
   * @param reduces   number of reduce tasks
   * @param mapSignal signal-file path that releases the maps
   * @param redSignal signal-file path that releases the reduces
   * @return the same {@code conf} instance, for call chaining
   * @throws IOException if the helper fails to set up the job
   */
  private JobConf configureJob(JobConf conf, int maps, int reduces,
                               String mapSignal, String redSignal)
  throws IOException {
    UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir,
        maps, reduces, "test-jobtracker-safemode",
        mapSignal, redSignal);
    return conf;
  }

  /**
   * Tests the jobtracker's safemode. The test is as follows :
   *   - starts a cluster with 2 trackers
   *   - submits a job with large (40) maps to make sure that all the trackers
   *     are logged to the job history
   *   - wait for the job to be 50% done
   *   - stop the jobtracker
   *   - wait for the trackers to be done with all the tasks
   *   - kill a task tracker
   *   - start the jobtracker
   *   - start 2 more trackers
   *   - now check that while all the tracker are detected (or lost) the
   *     scheduling window is closed
   *   - check that after all the trackers are recovered, scheduling is opened
   */
  private void testSafeMode(MiniDFSCluster dfs, MiniMRCluster mr)
  throws IOException {
    FileSystem fileSys = dfs.getFileSystem();
    JobConf jobConf = mr.createJobConf();
    String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
    String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
    JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
    // Snapshot of how many trackers the JT currently knows about.
    int numTracker = jobtracker.getClusterStatus(false).getTaskTrackers();

    // Configure the jobs: 40 maps, 0 reduces, all waiting on signal files.
    JobConf job = configureJob(jobConf, 40, 0, mapSignalFile, redSignalFile);

    // Make sure no stale signal files are lying around from a previous run.
    fileSys.delete(shareDir, true);

    // Submit a master job
    JobClient jobClient = new JobClient(job);
    RunningJob rJob = jobClient.submitJob(job);
    JobID id = rJob.getID();

    // wait for the job to be inited
    mr.initializeJob(id);

    // Make sure that the master job is 50% completed
    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress()
           < 0.5f) {
      LOG.info("Waiting for the job to be 50% done");
      UtilsForTests.waitFor(100);
    }

    // Kill the jobtracker
    mr.stopJobTracker();

    // Enable recovery on restart, so the JT replays job history when it
    // comes back up instead of starting with a clean slate.
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
                                      true);

    // Signal the maps to complete
    UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);

    // Signal the reducers to complete
    UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile,
                              redSignalFile);

    // wait for the tasks to complete at the tracker
    Set<String> trackers = new HashSet<String>();
    for (int i = 0 ; i < numTracker; ++i) {
      TaskTracker t = mr.getTaskTrackerRunner(i).getTaskTracker();
      trackers.add(t.getName());
      int runningCount = t.getRunningTaskStatuses().size();
      while (runningCount != 0) {
        LOG.info("Waiting for tracker " + t.getName() + " to stabilize");
        UtilsForTests.waitFor(100);
        // Recount only map tasks still unassigned/running; everything else
        // (completed, failed, reduces) is treated as settled.
        runningCount = 0;
        for (TaskStatus status : t.getRunningTaskStatuses()) {
          if (status.getIsMap()
              && (status.getRunState() == TaskStatus.State.UNASSIGNED
                  || status.getRunState() == TaskStatus.State.RUNNING)) {
            ++runningCount;
          }
        }
      }
    }
    LOG.info("Trackers have stabilized");

    // Kill a tasktracker (the last one; numTracker now indexes it directly).
    int trackerToKill = --numTracker;
    TaskTracker t = mr.getTaskTrackerRunner(trackerToKill).getTaskTracker();

    trackers.remove(t.getName()); // remove this from the set to check

    // The killed tracker must eventually be reported lost by the JT.
    Set<String> lostTrackers = new HashSet<String>();
    lostTrackers.add(t.getName());

    // get the attempt-id's to ignore
    // stop the tracker
    LOG.info("Stopping tracker : " + t.getName());
    mr.getTaskTrackerRunner(trackerToKill).getTaskTracker().shutdown();
    mr.stopTaskTracker(trackerToKill);

    // Restart the jobtracker
    mr.startJobTracker();

    // Wait for the JT to be ready
    UtilsForTests.waitForJobTracker(jobClient);

    // Re-fetch the JT handle: the old reference is stale after the restart.
    jobtracker = mr.getJobTrackerRunner().getJobTracker();

    // Start a tracker
    LOG.info("Start a new tracker");
    mr.startTaskTracker(null, null, ++numTracker, numDir);

    // Start a tracker
    LOG.info("Start a new tracker");
    mr.startTaskTracker(null, null, ++numTracker, numDir);

    // Check if the jobs are still running

    // Wait for the tracker to be lost. While the surviving trackers are still
    // re-joining (and the dead one not yet declared lost), the JT must keep
    // the scheduling window closed.
    boolean shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
    while (!checkTrackers(jobtracker, trackers, lostTrackers)) {
      assertFalse("JobTracker has opened up scheduling before all the"
                  + " trackers were recovered", shouldSchedule);
      UtilsForTests.waitFor(100);

      // snapshot jobtracker's scheduling status
      shouldSchedule = jobtracker.recoveryManager.shouldSchedule();
    }

    // Once every live tracker has rejoined and the dead one is gone,
    // scheduling must be open again.
    assertTrue("JobTracker hasn't opened up scheduling even though all the"
               + " trackers were recovered"
               == null ? "" : "JobTracker hasnt opened up scheduling even all the"
               + " trackers were recovered",
               jobtracker.recoveryManager.shouldSchedule());

    assertEquals("Recovery manager is in inconsistent state",
                 0, jobtracker.recoveryManager.recoveredTrackers.size());

    // wait for the job to be complete
    UtilsForTests.waitTillDone(jobClient);
  }

  /**
   * Checks whether the JobTracker's view of the cluster matches expectations
   * after a restart.
   *
   * @param jobtracker the (restarted) JobTracker to query
   * @param present    trackers that must be known AND must have reported in
   *                   after recovery finished
   * @param absent     trackers that must NOT be known (e.g. the killed one)
   * @return true only when every expected tracker has checked in post-recovery
   *         and every absent tracker has disappeared
   */
  private boolean checkTrackers(JobTracker jobtracker, Set<String> present,
                                Set<String> absent) {
    // A heartbeat older than this predates the end of recovery, so it does
    // not prove the tracker has contacted the restarted JT.
    long jobtrackerRecoveryFinishTime =
      jobtracker.getStartTime() + jobtracker.getRecoveryDuration();
    for (String trackerName : present) {
      TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
      // check if the status is present and also the tracker has contacted back
      // after restart
      if (status == null
          || status.getLastSeen() < jobtrackerRecoveryFinishTime) {
        return false;
      }
    }
    for (String trackerName : absent) {
      TaskTrackerStatus status = jobtracker.getTaskTracker(trackerName);
      // check if the status is still present
      if ( status != null) {
        return false;
      }
    }
    return true;
  }

  /**
   * Test {@link JobTracker}'s safe mode. Sets up a single-node DFS and a
   * 2-tracker MiniMR cluster, then drives the scenario in
   * {@link #testSafeMode(MiniDFSCluster, MiniMRCluster)}.
   */
  public void testJobTrackerSafeMode() throws IOException {
    String namenode = null;
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;
    try {
      Configuration conf = new Configuration();
      conf.setBoolean("dfs.replication.considerLoad", false);
      dfs = new MiniDFSCluster(conf, 1, true, null, null);
      dfs.waitActive();
      fileSys = dfs.getFileSystem();

      // clean up
      fileSys.delete(testDir, true);

      if (!fileSys.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }

      // Write the input file
      UtilsForTests.writeFile(dfs.getNameNode(), conf,
                              new Path(inDir + "/file"), (short)1);

      dfs.startDataNodes(conf, 1, true, null, null, null, null);
      dfs.waitActive();

      namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
                 + (dfs.getFileSystem()).getUri().getPort();

      // Make sure that jobhistory leads to a proper job restart
      // So keep the blocksize and the buffer size small
      JobConf jtConf = new JobConf();
      jtConf.set("mapred.jobtracker.job.history.block.size", "512");
      jtConf.set("mapred.jobtracker.job.history.buffer.size", "512");
      jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
      // Short expiry so a dead tracker is declared lost quickly.
      jtConf.setLong("mapred.tasktracker.expiry.interval", 5000);
      jtConf.setInt("mapred.reduce.copy.backoff", 4);
      jtConf.setLong("mapred.job.reuse.jvm.num.tasks", -1);

      mr = new MiniMRCluster(numTrackers, namenode, numDir, null, null, jtConf);

      // Test Lost tracker case
      testSafeMode(dfs, mr);
    } finally {
      // Best-effort teardown: shutdown failures must not mask a test failure.
      if (mr != null) {
        try {
          mr.shutdown();
        } catch (Exception e) {}
      }
      if (dfs != null) {
        try {
          dfs.shutdown();
        } catch (Exception e) {}
      }
    }
  }

  /** Allows running this test stand-alone from the command line. */
  public static void main(String[] args) throws IOException {
    new TestJobTrackerSafeMode().testJobTrackerSafeMode();
  }
}