TestJobTrackerRestart.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.security.UserGroupInformation;

import junit.framework.TestCase;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
/**
 * TestJobTrackerRestart checks if the jobtracker can restart. The JobTracker
 * should be able to continue running the previously running jobs and also
 * recover previously submitted jobs.
 */
public class TestJobTrackerRestart extends TestCase {
  final Path testDir =
    new Path(System.getProperty("test.build.data", "/tmp"),
             "jt-restart-testing");
  final Path inDir = new Path(testDir, "input");
  final Path shareDir = new Path(testDir, "share");
  final Path outputDir = new Path(testDir, "output");
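  // Bumped once for every job built by getJobs() so that each submitted job
  // writes to its own output sub-directory (outputDir + counter).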
  private static int numJobsSubmitted = 0;

  /**
   * Returns the job confs configured with the priorities and the number of
   * maps/reduces as passed.
   * @param conf the default conf
   * @param priorities priorities for the jobs
   * @param numMaps number of maps for the jobs
   * @param numReds number of reducers for the jobs
   * @param outputDir output dir
   * @param inDir input dir
   * @param mapSignalFile filename that acts as a signal for the maps
   * @param reduceSignalFile filename that acts as a signal for the reducers
   * @return an array of job confs configured as needed
   * @throws IOException
   */
  private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities,
                                   int[] numMaps, int[] numReds,
                                   Path outputDir, Path inDir,
                                   String mapSignalFile, String reduceSignalFile)
      throws IOException {
    JobConf[] jobs = new JobConf[priorities.length];
    for (int i = 0; i < jobs.length; ++i) {
      jobs[i] = new JobConf(conf);
      Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
      UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir,
          numMaps[i], numReds[i], "jt restart test job", mapSignalFile,
          reduceSignalFile);
      jobs[i].setJobPriority(priorities[i]);
    }
    return jobs;
  }
  /**
   * Clean up the signals.
   */
  private static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
    // Delete the map signal file
    fileSys.delete(new Path(getMapSignalFile(dir)), false);
    // Delete the reduce signal file
    fileSys.delete(new Path(getReduceSignalFile(dir)), false);
  }

  /**
   * Tests the jobtracker with restart-recovery turned off.
   * Submit a job with normal priority, maps = 2, reducers = 0.
   *
   * Wait for the job to complete 50%.
   *
   * Restart the jobtracker with recovery turned off.
   *
   * Check that the job is missing.
   */
  public void testRestartWithoutRecovery(MiniDFSCluster dfs,
                                         MiniMRCluster mr)
      throws IOException {
    // III. Test a job with waiting mapper and recovery turned off

    FileSystem fileSys = dfs.getFileSystem();

    cleanUp(fileSys, shareDir);

    JobConf newConf = getJobs(mr.createJobConf(),
                              new JobPriority[] {JobPriority.NORMAL},
                              new int[] {2}, new int[] {0},
                              outputDir, inDir,
                              getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir))[0];

    JobClient jobClient = new JobClient(newConf);
    RunningJob job = jobClient.submitJob(newConf);
    JobID id = job.getID();

    // make sure that the job is 50% completed
    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
      UtilsForTests.waitFor(100);
    }

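    // Bring the jobtracker down while the job is still running; with
    // recovery disabled below, the half-finished job should not survive
    // the restart (verified by the final assertion of this test).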
    mr.stopJobTracker();

    // Turn off the recovery
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
                                      false);

    // Wait for a minute before restarting the jobtracker
    UtilsForTests.waitFor(60 * 1000);

    mr.startJobTracker();

    // Signal the tasks
    UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir));

    // Wait for the JT to be ready
    UtilsForTests.waitForJobTracker(jobClient);

    UtilsForTests.waitTillDone(jobClient);

    // The submitted job should not exist
    assertTrue("Submitted job was detected with recovery disabled",
               UtilsForTests.getJobStatus(jobClient, id) == null);
  }
  /** Tests a job on a jobtracker with restart-recovery turned on.
   * Preparation :
   *    - Configure a job with
   *       - num-maps : 50
   *       - num-reducers : 1
   *    - Configure the cluster to run 1 reducer
   *    - Lower the history file block size and buffer
   *
   * Wait for the job to complete 50%. Note that the job is configured to
   * use {@link HalfWaitingMapper} and {@link WaitingReducer}, so the job
   * will eventually wait at 50%.
   *
   * Make a note of the following things
   *    - Task completion events
   *    - Cluster status
   *    - Task Reports
   *    - Job start time
   *
   * Restart the jobtracker
   *
   * Wait for the job to finish all the maps and note the TaskCompletion
   * events at the tracker.
   *
   * Wait for all the jobs to finish and note the following
   *    - New task completion events at the jobtracker
   *    - Task reports
   *    - Cluster status
   *
   * Check for the following
   *    - Task completion events for recovered tasks should match
   *    - Task completion events at the tasktracker and the restarted
   *      jobtracker should be the same
   *    - Cluster status should be fine
   *    - Task reports for recovered tasks should match
   *      Checks
   *        - start time
   *        - finish time
   *        - counters
   *        - http-location
   *        - task-id
   *    - Job start time should match
   *    - Check if the counters can be accessed
   *    - Check if the history files are (re)named properly
   */
  public void testTaskEventsAndReportsWithRecovery(MiniDFSCluster dfs,
                                                   MiniMRCluster mr)
      throws IOException {
    // II. Test a tasktracker with waiting mapper and recovery turned on.
    //     Ideally the tracker should SYNC with the new/restarted jobtracker

    FileSystem fileSys = dfs.getFileSystem();
    final int numMaps = 50;
    final int numReducers = 1;

    cleanUp(fileSys, shareDir);

    JobConf newConf = getJobs(mr.createJobConf(),
                              new JobPriority[] {JobPriority.NORMAL},
                              new int[] {numMaps}, new int[] {numReducers},
                              outputDir, inDir,
                              getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir))[0];

    JobClient jobClient = new JobClient(newConf);
    RunningJob job = jobClient.submitJob(newConf);
    JobID id = job.getID();

    // change the job priority
    mr.setJobPriority(id, JobPriority.HIGH);

    mr.initializeJob(id);

    // make sure that at least one reducer is spawned
    while (jobClient.getClusterStatus().getReduceTasks() == 0) {
      UtilsForTests.waitFor(100);
    }

    while (true) {
      // Since we are using a half waiting mapper, maps should be stuck at 50%
      TaskCompletionEvent[] trackerEvents =
        mr.getMapTaskCompletionEventsUpdates(0, id, numMaps)
          .getMapTaskCompletionEvents();
      if (trackerEvents.length < numMaps / 2) {
        UtilsForTests.waitFor(1000);
      } else {
        break;
      }
    }

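    // Snapshot the pre-restart state (completion events, task reports and
    // cluster status) so it can be compared with what the restarted
    // jobtracker recovers.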
    TaskCompletionEvent[] prevEvents =
      mr.getTaskCompletionEvents(id, 0, numMaps);
    TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
    TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
    ClusterStatus prevStatus = jobClient.getClusterStatus();

    mr.stopJobTracker();

    // Turn on the recovery
    mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
                                      true);

    // Wait for a minute before restarting the jobtracker
    UtilsForTests.waitFor(60 * 1000);

    mr.startJobTracker();

    // Signal the map tasks
    UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir));

    // Wait for the JT to be ready
    UtilsForTests.waitForJobTracker(jobClient);

    int numToMatch = mr.getNumEventsRecovered() / 2;

    // make sure that the maps are completed
    while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) {
      UtilsForTests.waitFor(100);
    }

    // Get the new jobtracker's events
    TaskCompletionEvent[] jtEvents =
      mr.getTaskCompletionEvents(id, 0, 2 * numMaps);

    // Test if all the events that were recovered match exactly
    testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);

    // Check the task reports
    // The reports should match exactly if the attempts are the same
    TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
    TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
    testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
    testTaskReports(prevSetupReports, afterSetupReports, 1);

    // check the job priority
    assertEquals("Job priority change is not reflected",
                 JobPriority.HIGH, mr.getJobPriority(id));

    List<TaskCompletionEvent> jtMapEvents =
      new ArrayList<TaskCompletionEvent>();
    for (TaskCompletionEvent tce : jtEvents) {
      if (tce.isMapTask()) {
        jtMapEvents.add(tce);
      }
    }

    TaskCompletionEvent[] trackerEvents;
    while (true) {
      // Wait for the tracker to pull all the map events
      trackerEvents =
        mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size())
          .getMapTaskCompletionEvents();
      if (trackerEvents.length < jtMapEvents.size()) {
        UtilsForTests.waitFor(1000);
      } else {
        break;
      }
    }

    // Signal the reduce tasks
    UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir),
                              getReduceSignalFile(shareDir));

    UtilsForTests.waitTillDone(jobClient);

    testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]),
                             trackerEvents, true, -1);

    // validate the history file
    TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);
    TestJobHistory.validateJobHistoryFileContent(mr, job, newConf);

    // check that the cluster status is sane
    ClusterStatus status = jobClient.getClusterStatus();
    assertTrue("Cluster status is insane",
               checkClusterStatusOnCompletion(status, prevStatus));
  }

  /**
   * Checks if the history files are as expected
   * @param id job id
   * @param conf job conf
   */
  private void testJobHistoryFiles(JobID id, JobConf conf)
      throws IOException {
    // Get the history files for users
    String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
    String tempLogFileName =
      JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);

    // I. User files
    Path logFile =
      JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf);
    FileSystem fileSys = logFile.getFileSystem(conf);

    // Check if the history file exists
    assertTrue("User log file does not exist", fileSys.exists(logFile));

    // Check if the temporary file is deleted
    Path tempLogFile =
      JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName,
                                                         conf);
    assertFalse("User temporary log file exists", fileSys.exists(tempLogFile));

    // II. Framework files
    // Get the history file
    logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
    fileSys = logFile.getFileSystem(conf);

    // Check if the history file exists
    assertTrue("Log file does not exist", fileSys.exists(logFile));

    // Check if the temporary file is deleted
    tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName);
    assertFalse("Temporary log file exists", fileSys.exists(tempLogFile));
  }

  /**
   * Matches the specified number of task reports.
   * @param source the reports to be matched
   * @param target reports to match with
   * @param numToMatch number of reports to match
   */
  private void testTaskReports(TaskReport[] source, TaskReport[] target,
                               int numToMatch) {
    for (int i = 0; i < numToMatch; ++i) {
      // Check if the task reports were recovered correctly
      assertTrue("Task report for the same attempt has changed",
                 source[i].equals(target[i]));
    }
  }

  /**
   * Matches the task completion events.
   * @param source the events to be matched
   * @param target events to match with
   * @param fullMatch whether to match the events completely or partially
   * @param numToMatch number of events to match in case a full match is not
   *                   desired
   */
  private void testTaskCompletionEvents(TaskCompletionEvent[] source,
                                        TaskCompletionEvent[] target,
                                        boolean fullMatch,
                                        int numToMatch) {
    // Check if the event list size matches
    // The lengths should match only in the case of a full match
    if (fullMatch) {
      assertEquals("Map task completion events mismatch",
                   source.length, target.length);
      numToMatch = source.length;
    }
    // Check if the events match
    for (int i = 0; i < numToMatch; ++i) {
      if (source[i].getTaskAttemptId().equals(target[i].getTaskAttemptId())) {
        assertTrue("Map task completion events ordering mismatch",
                   source[i].equals(target[i]));
      }
    }
  }

  private boolean checkClusterStatusOnCompletion(ClusterStatus status,
                                                 ClusterStatus prevStatus) {
    return status.getJobTrackerState() == prevStatus.getJobTrackerState()
           && status.getMapTasks() == 0
           && status.getReduceTasks() == 0;
  }

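  // Driver for the scenarios above: brings up a one-node DFS and MR cluster,
  // runs the recovery test first and the no-recovery test second, and shuts
  // both clusters down at the end.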
  public void testJobTrackerRestart() throws IOException {
    String namenode = null;
    MiniDFSCluster dfs = null;
    MiniMRCluster mr = null;
    FileSystem fileSys = null;
    try {
      Configuration conf = new Configuration();
      conf.setBoolean("dfs.replication.considerLoad", false);
      dfs = new MiniDFSCluster(conf, 1, true, null, null);
      dfs.waitActive();
      fileSys = dfs.getFileSystem();

      // clean up
      fileSys.delete(testDir, true);

      if (!fileSys.mkdirs(inDir)) {
        throw new IOException("Mkdirs failed to create " + inDir.toString());
      }

      // Write the input file
      UtilsForTests.writeFile(dfs.getNameNode(), conf,
                              new Path(inDir + "/file"), (short) 1);

      dfs.startDataNodes(conf, 1, true, null, null, null, null);
      dfs.waitActive();

      namenode = (dfs.getFileSystem()).getUri().getHost() + ":"
                 + (dfs.getFileSystem()).getUri().getPort();

      // Make sure that jobhistory leads to a proper job restart
      // So keep the blocksize and the buffer size small
      JobConf jtConf = new JobConf();
      jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
      jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
      jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
      jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
      jtConf.setBoolean("mapred.acls.enabled", true);
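      // With ACLs enabled, job submission to the default queue is restricted,
      // so the current user is added to its acl-submit-job list below.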
      // get the user group info
      UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
      jtConf.set("mapred.queue.default.acl-submit-job", ugi.getUserName());

      mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);

      // Test the tasktracker SYNC
      testTaskEventsAndReportsWithRecovery(dfs, mr);

      // Test jobtracker with restart-recovery turned off
      testRestartWithoutRecovery(dfs, mr);
    } finally {
      if (mr != null) {
        try {
          mr.shutdown();
        } catch (Exception e) {}
      }
      if (dfs != null) {
        try {
          dfs.shutdown();
        } catch (Exception e) {}
      }
    }
  }

  private static String getMapSignalFile(Path dir) {
    return (new Path(dir, "jt-restart-map-signal")).toString();
  }

  private static String getReduceSignalFile(Path dir) {
    return (new Path(dir, "jt-restart-reduce-signal")).toString();
  }

  public static void main(String[] args) throws IOException {
    new TestJobTrackerRestart().testJobTrackerRestart();
  }
}