TestJobTrackerRestart.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:18k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.conf.Configuration;
  20. import org.apache.hadoop.fs.*;
  21. import org.apache.hadoop.hdfs.MiniDFSCluster;
  22. import org.apache.hadoop.mapred.UtilsForTests;
  23. import org.apache.hadoop.security.UserGroupInformation;
  24. import junit.framework.TestCase;
  25. import java.io.*;
  26. import java.util.ArrayList;
  27. import java.util.List;
  28. /** 
  29.  * TestJobTrackerRestart checks if the jobtracker can restart. JobTracker 
  30.  * should be able to continue running the previously running jobs and also 
  31.  * recover previosuly submitted jobs.
  32.  */
  33. public class TestJobTrackerRestart extends TestCase {
  34.   final Path testDir = 
  35.     new Path(System.getProperty("test.build.data","/tmp"), 
  36.              "jt-restart-testing");
  37.   final Path inDir = new Path(testDir, "input");
  38.   final Path shareDir = new Path(testDir, "share");
  39.   final Path outputDir = new Path(testDir, "output");
  40.   private static int numJobsSubmitted = 0;
  41.   
  42.   /**
  43.    * Return the job conf configured with the priorities and mappers as passed.
  44.    * @param conf The default conf
  45.    * @param priorities priorities for the jobs
  46.    * @param numMaps number of maps for the jobs
  47.    * @param numReds number of reducers for the jobs
  48.    * @param outputDir output dir
  49.    * @param inDir input dir
  50.    * @param mapSignalFile filename thats acts as a signal for maps
  51.    * @param reduceSignalFile filename thats acts as a signal for reducers
  52.    * @return a array of jobconfs configured as needed
  53.    * @throws IOException
  54.    */
  55.   private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities, 
  56.                            int[] numMaps, int[] numReds,
  57.                            Path outputDir, Path inDir,
  58.                            String mapSignalFile, String reduceSignalFile) 
  59.   throws IOException {
  60.     JobConf[] jobs = new JobConf[priorities.length];
  61.     for (int i = 0; i < jobs.length; ++i) {
  62.       jobs[i] = new JobConf(conf);
  63.       Path newOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
  64.       UtilsForTests.configureWaitingJobConf(jobs[i], inDir, newOutputDir, 
  65.           numMaps[i], numReds[i], "jt restart test job", mapSignalFile, 
  66.           reduceSignalFile);
  67.       jobs[i].setJobPriority(priorities[i]);
  68.     }
  69.     return jobs;
  70.   }
  71.   /**
  72.    * Clean up the signals.
  73.    */
  74.   private static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
  75.     // Delete the map signal file
  76.     fileSys.delete(new Path(getMapSignalFile(dir)), false);
  77.     // Delete the reduce signal file
  78.     fileSys.delete(new Path(getReduceSignalFile(dir)), false);
  79.   }
  80.   
  81.  /**
  82.    * Tests the jobtracker with restart-recovery turned off.
  83.    * Submit a job with normal priority, maps = 2, reducers = 0}
  84.    * 
  85.    * Wait for the job to complete 50%
  86.    * 
  87.    * Restart the jobtracker with recovery turned off
  88.    * 
  89.    * Check if the job is missing
  90.    */
  91.   public void testRestartWithoutRecovery(MiniDFSCluster dfs, 
  92.                                          MiniMRCluster mr) 
  93.   throws IOException {
  94.     // III. Test a job with waiting mapper and recovery turned off
  95.     
  96.     FileSystem fileSys = dfs.getFileSystem();
  97.     
  98.     cleanUp(fileSys, shareDir);
  99.     
  100.     JobConf newConf = getJobs(mr.createJobConf(), 
  101.                               new JobPriority[] {JobPriority.NORMAL}, 
  102.                               new int[] {2}, new int[] {0},
  103.                               outputDir, inDir, 
  104.                               getMapSignalFile(shareDir), 
  105.                               getReduceSignalFile(shareDir))[0];
  106.     
  107.     JobClient jobClient = new JobClient(newConf);
  108.     RunningJob job = jobClient.submitJob(newConf);
  109.     JobID id = job.getID();
  110.     
  111.     //  make sure that the job is 50% completed
  112.     while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
  113.       UtilsForTests.waitFor(100);
  114.     }
  115.     
  116.     mr.stopJobTracker();
  117.     
  118.     // Turn off the recovery
  119.     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
  120.                                       false);
  121.     
  122.     // Wait for a minute before submitting a job
  123.     UtilsForTests.waitFor(60 * 1000);
  124.     
  125.     mr.startJobTracker();
  126.     
  127.     // Signal the tasks
  128.     UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
  129.                               getReduceSignalFile(shareDir));
  130.     
  131.     // Wait for the JT to be ready
  132.     UtilsForTests.waitForJobTracker(jobClient);
  133.     
  134.     UtilsForTests.waitTillDone(jobClient);
  135.     
  136.     // The submitted job should not exist
  137.     assertTrue("Submitted job was detected with recovery disabled", 
  138.                UtilsForTests.getJobStatus(jobClient, id) == null);
  139.   }
  140.   /** Tests a job on jobtracker with restart-recovery turned on.
  141.    * Preparation :
  142.    *    - Configure a job with
  143.    *       - num-maps : 50
  144.    *       - num-reducers : 1
  145.    *    - Configure the cluster to run 1 reducer
  146.    *    - Lower the history file block size and buffer
  147.    *    
  148.    * Wait for the job to complete 50%. Note that all the job is configured to 
  149.    * use {@link HalfWaitingMapper} and {@link WaitingReducer}. So job will 
  150.    * eventually wait on 50%
  151.    * 
  152.    * Make a note of the following things
  153.    *    - Task completion events
  154.    *    - Cluster status
  155.    *    - Task Reports
  156.    *    - Job start time
  157.    *    
  158.    * Restart the jobtracker
  159.    * 
  160.    * Wait for job to finish all the maps and note the TaskCompletion events at
  161.    * the tracker.
  162.    * 
  163.    * Wait for all the jobs to finish and note the following
  164.    *    - New task completion events at the jobtracker
  165.    *    - Task reports
  166.    *    - Cluster status
  167.    * 
  168.    * Check for the following
  169.    *    - Task completion events for recovered tasks should match 
  170.    *    - Task completion events at the tasktracker and the restarted 
  171.    *      jobtracker should be same
  172.    *    - Cluster status should be fine.
  173.    *    - Task Reports for recovered tasks should match
  174.    *      Checks
  175.    *        - start time
  176.    *        - finish time
  177.    *        - counters
  178.    *        - http-location
  179.    *        - task-id
  180.    *    - Job start time should match
  181.    *    - Check if the counters can be accessed
  182.    *    - Check if the history files are (re)named properly
  183.    */
  184.   public void testTaskEventsAndReportsWithRecovery(MiniDFSCluster dfs, 
  185.                                                    MiniMRCluster mr) 
  186.   throws IOException {
  187.     // II. Test a tasktracker with waiting mapper and recovery turned on.
  188.     //     Ideally the tracker should SYNC with the new/restarted jobtracker
  189.     
  190.     FileSystem fileSys = dfs.getFileSystem();
  191.     final int numMaps = 50;
  192.     final int numReducers = 1;
  193.     
  194.     
  195.     cleanUp(fileSys, shareDir);
  196.     
  197.     JobConf newConf = getJobs(mr.createJobConf(), 
  198.                               new JobPriority[] {JobPriority.NORMAL}, 
  199.                               new int[] {numMaps}, new int[] {numReducers},
  200.                               outputDir, inDir, 
  201.                               getMapSignalFile(shareDir), 
  202.                               getReduceSignalFile(shareDir))[0];
  203.     
  204.     JobClient jobClient = new JobClient(newConf);
  205.     RunningJob job = jobClient.submitJob(newConf);
  206.     JobID id = job.getID();
  207.     
  208.     // change the job priority
  209.     mr.setJobPriority(id, JobPriority.HIGH);
  210.     
  211.     mr.initializeJob(id);
  212.     
  213.     //  make sure that atleast on reducer is spawned
  214.     while (jobClient.getClusterStatus().getReduceTasks() == 0) {
  215.       UtilsForTests.waitFor(100);
  216.     }
  217.     
  218.     while(true) {
  219.       // Since we are using a half waiting mapper, maps should be stuck at 50%
  220.       TaskCompletionEvent[] trackerEvents = 
  221.         mr.getMapTaskCompletionEventsUpdates(0, id, numMaps)
  222.           .getMapTaskCompletionEvents();
  223.       if (trackerEvents.length < numMaps / 2) {
  224.         UtilsForTests.waitFor(1000);
  225.       } else {
  226.         break;
  227.       }
  228.     }
  229.     
  230.     TaskCompletionEvent[] prevEvents = 
  231.       mr.getTaskCompletionEvents(id, 0, numMaps);
  232.     TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
  233.     TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
  234.     ClusterStatus prevStatus = jobClient.getClusterStatus();
  235.     
  236.     mr.stopJobTracker();
  237.     
  238.     // Turn off the recovery
  239.     mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", 
  240.                                       true);
  241.     
  242.     //  Wait for a minute before submitting a job
  243.     UtilsForTests.waitFor(60 * 1000);
  244.     
  245.     mr.startJobTracker();
  246.     
  247.     // Signal the map tasks
  248.     UtilsForTests.signalTasks(dfs, fileSys, true, getMapSignalFile(shareDir), 
  249.                               getReduceSignalFile(shareDir));
  250.     
  251.     // Wait for the JT to be ready
  252.     UtilsForTests.waitForJobTracker(jobClient);
  253.     
  254.     int numToMatch = mr.getNumEventsRecovered() / 2;
  255.     
  256.     //  make sure that the maps are completed
  257.     while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) {
  258.       UtilsForTests.waitFor(100);
  259.     }
  260.     
  261.     // Get the new jobtrackers events
  262.     TaskCompletionEvent[] jtEvents =  
  263.       mr.getTaskCompletionEvents(id, 0, 2 * numMaps);
  264.     
  265.     // Test if all the events that were recovered match exactly
  266.     testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);
  267.     
  268.     // Check the task reports
  269.     // The reports should match exactly if the attempts are same
  270.     TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
  271.     TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
  272.     testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
  273.     testTaskReports(prevSetupReports, afterSetupReports, 1);
  274.     
  275.     // check the job priority
  276.     assertEquals("Job priority change is not reflected", 
  277.                  JobPriority.HIGH, mr.getJobPriority(id));
  278.     
  279.     List<TaskCompletionEvent> jtMapEvents =
  280.       new ArrayList<TaskCompletionEvent>();
  281.     for (TaskCompletionEvent tce : jtEvents) {
  282.       if (tce.isMapTask()) {
  283.         jtMapEvents.add(tce);
  284.       }
  285.     }
  286.    
  287.     TaskCompletionEvent[] trackerEvents; 
  288.     while(true) {
  289.      // Wait for the tracker to pull all the map events
  290.      trackerEvents =
  291.        mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size())
  292.          .getMapTaskCompletionEvents();
  293.      if (trackerEvents.length < jtMapEvents.size()) {
  294.        UtilsForTests.waitFor(1000);
  295.      } else {
  296.        break;
  297.      }
  298.    }
  299.     //  Signal the reduce tasks
  300.     UtilsForTests.signalTasks(dfs, fileSys, false, getMapSignalFile(shareDir), 
  301.                               getReduceSignalFile(shareDir));
  302.     
  303.     UtilsForTests.waitTillDone(jobClient);
  304.     
  305.     testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]), 
  306.                               trackerEvents, true, -1);
  307.     
  308.     // validate the history file
  309.     TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);
  310.     TestJobHistory.validateJobHistoryFileContent(mr, job, newConf);
  311.     
  312.     // check if the cluster status is insane
  313.     ClusterStatus status = jobClient.getClusterStatus();
  314.     assertTrue("Cluster status is insane", 
  315.                checkClusterStatusOnCompletion(status, prevStatus));
  316.   }
  317.   
  318.   /**
  319.    * Checks if the history files are as expected
  320.    * @param id job id
  321.    * @param conf job conf
  322.    */
  323.   private void testJobHistoryFiles(JobID id, JobConf conf) 
  324.   throws IOException  {
  325.     // Get the history files for users
  326.     String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
  327.     String tempLogFileName = 
  328.       JobHistory.JobInfo.getSecondaryJobHistoryFile(logFileName);
  329.     
  330.     // I. User files
  331.     Path logFile = 
  332.       JobHistory.JobInfo.getJobHistoryLogLocationForUser(logFileName, conf);
  333.     FileSystem fileSys = logFile.getFileSystem(conf);
  334.     
  335.     // Check if the history file exists
  336.     assertTrue("User log file does not exist", fileSys.exists(logFile));
  337.     
  338.     // Check if the temporary file is deleted
  339.     Path tempLogFile = 
  340.       JobHistory.JobInfo.getJobHistoryLogLocationForUser(tempLogFileName, 
  341.                                                          conf);
  342.     assertFalse("User temporary log file exists", fileSys.exists(tempLogFile));
  343.     
  344.     // II. Framework files
  345.     // Get the history file
  346.     logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
  347.     fileSys = logFile.getFileSystem(conf);
  348.     
  349.     // Check if the history file exists
  350.     assertTrue("Log file does not exist", fileSys.exists(logFile));
  351.     
  352.     // Check if the temporary file is deleted
  353.     tempLogFile = JobHistory.JobInfo.getJobHistoryLogLocation(tempLogFileName);
  354.     assertFalse("Temporary log file exists", fileSys.exists(tempLogFile));
  355.   }
  356.   
  357.   /**
  358.    * Matches specified number of task reports.
  359.    * @param source the reports to be matched
  360.    * @param target reports to match with
  361.    * @param numToMatch num reports to match
  362.    * @param mismatchSet reports that should not match
  363.    */
  364.   private void testTaskReports(TaskReport[] source, TaskReport[] target, 
  365.                                int numToMatch) {
  366.     for (int i = 0; i < numToMatch; ++i) {
  367.       // Check if the task reports was recovered correctly
  368.       assertTrue("Task reports for same attempt has changed", 
  369.                  source[i].equals(target[i]));
  370.     }
  371.   }
  372.   
  373.   /**
  374.    * Matches the task completion events.
  375.    * @param source the events to be matched
  376.    * @param target events to match with
  377.    * @param fullMatch whether to match the events completely or partially
  378.    * @param numToMatch number of events to match in case full match is not 
  379.    *        desired
  380.    * @param ignoreSet a set of taskids to ignore
  381.    */
  382.   private void testTaskCompletionEvents(TaskCompletionEvent[] source, 
  383.                                        TaskCompletionEvent[] target, 
  384.                                        boolean fullMatch,
  385.                                        int numToMatch) {
  386.     //  Check if the event list size matches
  387.     // The lengths should match only incase of full match
  388.     if (fullMatch) {
  389.       assertEquals("Map task completion events mismatch", 
  390.                    source.length, target.length);
  391.       numToMatch = source.length;
  392.     }
  393.     // Check if the events match
  394.     for (int i = 0; i < numToMatch; ++i) {
  395.       if (source[i].getTaskAttemptId().equals(target[i].getTaskAttemptId())){
  396.         assertTrue("Map task completion events ordering mismatch", 
  397.                    source[i].equals(target[i]));
  398.       }
  399.     }
  400.   }
  401.   
  402.   private boolean checkClusterStatusOnCompletion(ClusterStatus status, 
  403.                                                  ClusterStatus prevStatus) {
  404.     return status.getJobTrackerState() == prevStatus.getJobTrackerState()
  405.            && status.getMapTasks() == 0
  406.            && status.getReduceTasks() == 0;
  407.   }
  408.   
  409.   public void testJobTrackerRestart() throws IOException {
  410.     String namenode = null;
  411.     MiniDFSCluster dfs = null;
  412.     MiniMRCluster mr = null;
  413.     FileSystem fileSys = null;
  414.     try {
  415.       Configuration conf = new Configuration();
  416.       conf.setBoolean("dfs.replication.considerLoad", false);
  417.       dfs = new MiniDFSCluster(conf, 1, true, null, null);
  418.       dfs.waitActive();
  419.       fileSys = dfs.getFileSystem();
  420.       
  421.       // clean up
  422.       fileSys.delete(testDir, true);
  423.       
  424.       if (!fileSys.mkdirs(inDir)) {
  425.         throw new IOException("Mkdirs failed to create " + inDir.toString());
  426.       }
  427.       // Write the input file
  428.       UtilsForTests.writeFile(dfs.getNameNode(), conf, 
  429.                               new Path(inDir + "/file"), (short)1);
  430.       dfs.startDataNodes(conf, 1, true, null, null, null, null);
  431.       dfs.waitActive();
  432.       namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
  433.                  + (dfs.getFileSystem()).getUri().getPort();
  434.       // Make sure that jobhistory leads to a proper job restart
  435.       // So keep the blocksize and the buffer size small
  436.       JobConf jtConf = new JobConf();
  437.       jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
  438.       jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
  439.       jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
  440.       jtConf.setLong("mapred.tasktracker.expiry.interval", 25 * 1000);
  441.       jtConf.setBoolean("mapred.acls.enabled", true);
  442.       // get the user group info
  443.       UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
  444.       jtConf.set("mapred.queue.default.acl-submit-job", ugi.getUserName());
  445.       
  446.       mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
  447.       
  448.       // Test the tasktracker SYNC
  449.       testTaskEventsAndReportsWithRecovery(dfs, mr);
  450.       
  451.       // Test jobtracker with restart-recovery turned off
  452.       testRestartWithoutRecovery(dfs, mr);
  453.     } finally {
  454.       if (mr != null) {
  455.         try {
  456.           mr.shutdown();
  457.         } catch (Exception e) {}
  458.       }
  459.       if (dfs != null) {
  460.         try {
  461.           dfs.shutdown();
  462.         } catch (Exception e) {}
  463.       }
  464.     }
  465.   }
  466.   private static String getMapSignalFile(Path dir) {
  467.     return (new Path(dir, "jt-restart-map-signal")).toString();
  468.   }
  469.   private static String getReduceSignalFile(Path dir) {
  470.     return (new Path(dir, "jt-restart-reduce-signal")).toString();
  471.   }
  472.   
  473.   public static void main(String[] args) throws IOException {
  474.     new TestJobTrackerRestart().testJobTrackerRestart();
  475.   }
  476. }