TestJobHistory.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:41k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.text.ParseException;
  22. import java.util.ArrayList;
  23. import java.util.List;
  24. import java.util.HashMap;
  25. import java.util.Map;
  26. import java.util.Iterator;
  27. import java.util.regex.Matcher;
  28. import java.util.regex.Pattern;
  29. import junit.framework.TestCase;
  30. import org.apache.hadoop.fs.FileSystem;
  31. import org.apache.hadoop.fs.Path;
  32. import org.apache.hadoop.fs.permission.FsPermission;
  33. import org.apache.hadoop.mapred.JobHistory.*;
  34. import org.apache.commons.logging.Log;
  35. import org.apache.commons.logging.LogFactory;
  36. /**
  37.  * Tests the JobHistory files - to catch any changes to JobHistory that can
  38.  * cause issues for the execution of JobTracker.RecoveryManager, HistoryViewer.
  39.  *
  40.  * testJobHistoryFile
  41.  * Run a job that will be succeeded and validate its history file format and
  42.  * content.
  43.  *
  44.  * testJobHistoryUserLogLocation
  45.  * Run jobs with the given values of hadoop.job.history.user.location as
  46.  *   (1)null(default case), (2)"none", and (3)some dir like "/tmp".
  47.  *   Validate user history file location in each case.
  48.  *
  49.  * testJobHistoryJobStatus
  50.  * Run jobs that will be (1) succeeded (2) failed (3) killed.
  51.  *   Validate job status read from history file in each case.
  52.  *
  53.  * Future changes to job history are to be reflected here in this file.
  54.  */
  55. public class TestJobHistory extends TestCase {
  56.    private static final Log LOG = LogFactory.getLog(TestJobHistory.class);
  57.  
  58.   private static String TEST_ROOT_DIR = new File(System.getProperty(
  59.       "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
  60.   private static final Pattern digitsPattern =
  61.                                      Pattern.compile(JobHistory.DIGITS);
  62.   // hostname like   /default-rack/host1.foo.com OR host1.foo.com
  63.   private static final Pattern hostNamePattern = Pattern.compile(
  64.                                        "(/(([\w\-\.]+)/)+)?([\w\-\.]+)");
  65.   private static final String IP_ADDR =
  66.                        "\d\d?\d?\.\d\d?\d?\.\d\d?\d?\.\d\d?\d?";
  67.   // hostname like   /default-rack/host1.foo.com OR host1.foo.com
  68.   private static final Pattern trackerNamePattern = Pattern.compile(
  69.                          "tracker_" + hostNamePattern + ":([\w\-\.]+)/" +
  70.                          IP_ADDR + ":" + JobHistory.DIGITS);
  71.   private static final Pattern splitsPattern = Pattern.compile(
  72.                               hostNamePattern + "(," + hostNamePattern + ")*");
  73.   private static Map<String, List<String>> taskIDsToAttemptIDs =
  74.                                      new HashMap<String, List<String>>();
  75.   //Each Task End seen from history file is added here
  76.   private static List<String> taskEnds = new ArrayList<String>();
  77.   // List of tasks that appear in history file after JT reatart. This is to
  78.   // allow START_TIME=0 for these tasks.
  79.   private static List<String> ignoreStartTimeOfTasks = new ArrayList<String>();
  80.   // List of potential tasks whose start time can be 0 because of JT restart
  81.   private static List<String> tempIgnoreStartTimeOfTasks = new ArrayList<String>();
  82.   /**
  83.    * Listener for history log file, it populates JobHistory.JobInfo
  84.    * object with data from log file and validates the data.
  85.    */
  86.   static class TestListener
  87.                     extends DefaultJobHistoryParser.JobTasksParseListener {
  88.     int lineNum;//line number of history log file
  89.     boolean isJobLaunched;
  90.     boolean isJTRestarted;
  91.     TestListener(JobInfo job) {
  92.       super(job);
  93.       lineNum = 0;
  94.       isJobLaunched = false;
  95.       isJTRestarted = false;
  96.     }
  97.     // TestListener implementation
  98.     public void handle(RecordTypes recType, Map<Keys, String> values)
  99.     throws IOException {
  100.       lineNum++;
  101.       // Check if the record is of type Meta
  102.       if (recType == JobHistory.RecordTypes.Meta) {
  103.         long version = Long.parseLong(values.get(Keys.VERSION));
  104.         assertTrue("Unexpected job history version ",
  105.                    (version >= 0 && version <= JobHistory.VERSION));
  106.       }
  107.       else if (recType.equals(RecordTypes.Job)) {
  108.         String jobid = values.get(Keys.JOBID);
  109.         assertTrue("record type 'Job' is seen without JOBID key" +
  110.          " in history file at line " + lineNum, jobid != null);
  111.         JobID id = JobID.forName(jobid);
  112.         assertTrue("JobID in history file is in unexpected format " +
  113.                   "at line " + lineNum, id != null);
  114.         String time = values.get(Keys.LAUNCH_TIME);
  115.         if (time != null) {
  116.           if (isJobLaunched) {
  117.             // We assume that if we see LAUNCH_TIME again, it is because of JT restart
  118.             isJTRestarted = true;
  119.           }
  120.           else {// job launched first time
  121.             isJobLaunched = true;
  122.           }
  123.         }
  124.         time = values.get(Keys.FINISH_TIME);
  125.         if (time != null) {
  126.           assertTrue ("Job FINISH_TIME is seen in history file at line " +
  127.                       lineNum + " before LAUNCH_TIME is seen", isJobLaunched);
  128.         }
  129.       }
  130.       else if (recType.equals(RecordTypes.Task)) {
  131.         String taskid = values.get(Keys.TASKID);
  132.         assertTrue("record type 'Task' is seen without TASKID key" +
  133.          " in history file at line " + lineNum, taskid != null);
  134.         TaskID id = TaskID.forName(taskid);
  135.         assertTrue("TaskID in history file is in unexpected format " +
  136.                   "at line " + lineNum, id != null);
  137.         
  138.         String time = values.get(Keys.START_TIME);
  139.         if (time != null) {
  140.           List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
  141.           assertTrue("Duplicate START_TIME seen for task " + taskid +
  142.                      " in history file at line " + lineNum, attemptIDs == null);
  143.           attemptIDs = new ArrayList<String>();
  144.           taskIDsToAttemptIDs.put(taskid, attemptIDs);
  145.           if (isJTRestarted) {
  146.             // This maintains a potential ignoreStartTimeTasks list
  147.             tempIgnoreStartTimeOfTasks.add(taskid);
  148.           }
  149.         }
  150.         time = values.get(Keys.FINISH_TIME);
  151.         if (time != null) {
  152.           String s = values.get(Keys.TASK_STATUS);
  153.           if (s != null) {
  154.             List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
  155.             assertTrue ("Task FINISH_TIME is seen in history file at line " +
  156.                     lineNum + " before START_TIME is seen", attemptIDs != null);
  157.             // Check if all the attemptIDs of this task are finished
  158.             assertTrue("TaskId " + taskid + " is finished at line " +
  159.                        lineNum + " but its attemptID is not finished.",
  160.                        (attemptIDs.size() <= 1));
  161.             // Check if at least 1 attempt of this task is seen
  162.             assertTrue("TaskId " + taskid + " is finished at line " +
  163.                        lineNum + " but no attemptID is seen before this.",
  164.                        attemptIDs.size() == 1);
  165.             if (s.equals("KILLED") || s.equals("FAILED")) {
  166.               // Task End with KILLED/FAILED status in history file is
  167.               // considered as TaskEnd, TaskStart. This is useful in checking
  168.               // the order of history lines.
  169.               attemptIDs = new ArrayList<String>();
  170.               taskIDsToAttemptIDs.put(taskid, attemptIDs);
  171.             }
  172.             else {
  173.               taskEnds.add(taskid);
  174.             }
  175.           }
  176.           else {
  177.             // This line of history file could be just an update to finish time
  178.           }
  179.         }
  180.       }
  181.       else if (recType.equals(RecordTypes.MapAttempt) ||
  182.                  recType.equals(RecordTypes.ReduceAttempt)) {
  183.         String taskid =  values.get(Keys.TASKID);
  184.         assertTrue("record type " + recType + " is seen without TASKID key" +
  185.          " in history file at line " + lineNum, taskid != null);
  186.         
  187.         String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
  188.         TaskAttemptID id = TaskAttemptID.forName(attemptId);
  189.         assertTrue("AttemptID in history file is in unexpected format " +
  190.                    "at line " + lineNum, id != null);
  191.         
  192.         String time = values.get(Keys.START_TIME);
  193.         if (time != null) {
  194.           List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
  195.           assertTrue ("TaskAttempt is seen in history file at line " + lineNum +
  196.                       " before Task is seen", attemptIDs != null);
  197.           assertFalse ("Duplicate TaskAttempt START_TIME is seen in history " +
  198.                       "file at line " + lineNum, attemptIDs.remove(attemptId));
  199.           if (attemptIDs.isEmpty()) {
  200.             //just a boolean whether any attempt is seen or not
  201.             attemptIDs.add("firstAttemptIsSeen");
  202.           }
  203.           attemptIDs.add(attemptId);
  204.           if (tempIgnoreStartTimeOfTasks.contains(taskid) &&
  205.               (id.getId() < 1000)) {
  206.             // If Task line of this attempt is seen in history file after
  207.             // JT restart and if this attempt is < 1000(i.e. attempt is noti
  208.             // started after JT restart) - assuming single JT restart happened
  209.             ignoreStartTimeOfTasks.add(taskid);
  210.           }
  211.         }
  212.         time = values.get(Keys.FINISH_TIME);
  213.         if (time != null) {
  214.           List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
  215.           assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
  216.                       + lineNum + " before Task is seen", attemptIDs != null);
  217.           assertTrue ("TaskAttempt FINISH_TIME is seen in history file at line "
  218.                       + lineNum + " before TaskAttempt START_TIME is seen",
  219.                       attemptIDs.remove(attemptId));
  220.         }
  221.       }
  222.       super.handle(recType, values);
  223.     }
  224.   }
  225.   // Check if the time is in the expected format
  226.   private static boolean isTimeValid(String time) {
  227.     Matcher m = digitsPattern.matcher(time);
  228.     return m.matches() && (Long.parseLong(time) > 0);
  229.   }
  230.   private static boolean areTimesInOrder(String time1, String time2) {
  231.     return (Long.parseLong(time1) <= Long.parseLong(time2));
  232.   }
  233.   // Validate Format of Job Level Keys, Values read from history file
  234.   private static void validateJobLevelKeyValuesFormat(Map<Keys, String> values,
  235.                                                       String status) {
  236.     String time = values.get(Keys.SUBMIT_TIME);
  237.     assertTrue("Job SUBMIT_TIME is in unexpected format:" + time +
  238.                " in history file", isTimeValid(time));
  239.     time = values.get(Keys.LAUNCH_TIME);
  240.     assertTrue("Job LAUNCH_TIME is in unexpected format:" + time +
  241.                " in history file", isTimeValid(time));
  242.     String time1 = values.get(Keys.FINISH_TIME);
  243.     assertTrue("Job FINISH_TIME is in unexpected format:" + time1 +
  244.                " in history file", isTimeValid(time1));
  245.     assertTrue("Job FINISH_TIME is < LAUNCH_TIME in history file",
  246.                areTimesInOrder(time, time1));
  247.     String stat = values.get(Keys.JOB_STATUS);
  248.     assertTrue("Unexpected JOB_STATUS "" + stat + "" is seen in" +
  249.                " history file", (status.equals(stat)));
  250.     String priority = values.get(Keys.JOB_PRIORITY);
  251.     assertTrue("Unknown priority for the job in history file",
  252.                (priority.equals("HIGH") ||
  253.                 priority.equals("LOW")  || priority.equals("NORMAL") ||
  254.                 priority.equals("VERY_HIGH") || priority.equals("VERY_LOW")));
  255.   }
  256.   // Validate Format of Task Level Keys, Values read from history file
  257.   private static void validateTaskLevelKeyValuesFormat(JobInfo job,
  258.                                   boolean splitsCanBeEmpty) {
  259.     Map<String, JobHistory.Task> tasks = job.getAllTasks();
  260.     // validate info of each task
  261.     for (JobHistory.Task task : tasks.values()) {
  262.       String tid = task.get(Keys.TASKID);
  263.       String time = task.get(Keys.START_TIME);
  264.       // We allow START_TIME=0 for tasks seen in history after JT restart
  265.       if (!ignoreStartTimeOfTasks.contains(tid) || (Long.parseLong(time) != 0)) {
  266.         assertTrue("Task START_TIME of " + tid + " is in unexpected format:" +
  267.                  time + " in history file", isTimeValid(time));
  268.       }
  269.       String time1 = task.get(Keys.FINISH_TIME);
  270.       assertTrue("Task FINISH_TIME of " + tid + " is in unexpected format:" +
  271.                  time1 + " in history file", isTimeValid(time1));
  272.       assertTrue("Task FINISH_TIME is < START_TIME in history file",
  273.                  areTimesInOrder(time, time1));
  274.       // Make sure that the Task type exists and it is valid
  275.       String type = task.get(Keys.TASK_TYPE);
  276.       assertTrue("Unknown Task type "" + type + "" is seen in " +
  277.                  "history file for task " + tid,
  278.                  (type.equals("MAP") || type.equals("REDUCE") ||
  279.                   type.equals("SETUP") || type.equals("CLEANUP")));
  280.       if (type.equals("MAP")) {
  281.         String splits = task.get(Keys.SPLITS);
  282.         //order in the condition OR check is important here
  283.         if (!splitsCanBeEmpty || splits.length() != 0) {
  284.           Matcher m = splitsPattern.matcher(splits);
  285.           assertTrue("Unexpected format of SPLITS "" + splits + "" is seen" +
  286.                      " in history file for task " + tid, m.matches());
  287.         }
  288.       }
  289.       // Validate task status
  290.       String status = task.get(Keys.TASK_STATUS);
  291.       assertTrue("Unexpected TASK_STATUS "" + status + "" is seen in" +
  292.                  " history file for task " + tid, (status.equals("SUCCESS") ||
  293.                  status.equals("FAILED") || status.equals("KILLED")));
  294.     }
  295.   }
  296.   // Validate foramt of Task Attempt Level Keys, Values read from history file
  297.   private static void validateTaskAttemptLevelKeyValuesFormat(JobInfo job) {
  298.     Map<String, JobHistory.Task> tasks = job.getAllTasks();
  299.     // For each task
  300.     for (JobHistory.Task task : tasks.values()) {
  301.       // validate info of each attempt
  302.       for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
  303.         String id = attempt.get(Keys.TASK_ATTEMPT_ID);
  304.         String time = attempt.get(Keys.START_TIME);
  305.         assertTrue("START_TIME of task attempt " + id +
  306.                    " is in unexpected format:" + time +
  307.                    " in history file", isTimeValid(time));
  308.         String time1 = attempt.get(Keys.FINISH_TIME);
  309.         assertTrue("FINISH_TIME of task attempt " + id +
  310.                    " is in unexpected format:" + time1 +
  311.                    " in history file", isTimeValid(time1));
  312.         assertTrue("Task FINISH_TIME is < START_TIME in history file",
  313.                    areTimesInOrder(time, time1));
  314.         // Make sure that the Task type exists and it is valid
  315.         String type = attempt.get(Keys.TASK_TYPE);
  316.         assertTrue("Unknown Task type "" + type + "" is seen in " +
  317.                    "history file for task attempt " + id,
  318.                    (type.equals("MAP") || type.equals("REDUCE") ||
  319.                     type.equals("SETUP") || type.equals("CLEANUP")));
  320.         // Validate task status
  321.         String status = attempt.get(Keys.TASK_STATUS);
  322.         assertTrue("Unexpected TASK_STATUS "" + status + "" is seen in" +
  323.                    " history file for task attempt " + id,
  324.                    (status.equals("SUCCESS") || status.equals("FAILED") ||
  325.                     status.equals("KILLED")));
  326.         // Reduce Task Attempts should have valid SHUFFLE_FINISHED time and
  327.         // SORT_FINISHED time
  328.         if (type.equals("REDUCE") && status.equals("SUCCESS")) {
  329.           time1 = attempt.get(Keys.SHUFFLE_FINISHED);
  330.           assertTrue("SHUFFLE_FINISHED time of task attempt " + id +
  331.                      " is in unexpected format:" + time1 +
  332.                      " in history file", isTimeValid(time1));
  333.           assertTrue("Reduce Task SHUFFLE_FINISHED time is < START_TIME " +
  334.                      "in history file", areTimesInOrder(time, time1));
  335.           time = attempt.get(Keys.SORT_FINISHED);
  336.           assertTrue("SORT_FINISHED of task attempt " + id +
  337.                      " is in unexpected format:" + time +
  338.                      " in history file", isTimeValid(time));
  339.           assertTrue("Reduce Task SORT_FINISHED time is < SORT_FINISHED time" +
  340.                      " in history file", areTimesInOrder(time1, time));
  341.         }
  342.         // check if hostname is valid
  343.         String hostname = attempt.get(Keys.HOSTNAME);
  344.         Matcher m = hostNamePattern.matcher(hostname);
  345.         assertTrue("Unexpected Host name of task attempt " + id, m.matches());
  346.         // check if trackername is valid
  347.         String trackerName = attempt.get(Keys.TRACKER_NAME);
  348.         m = trackerNamePattern.matcher(trackerName);
  349.         assertTrue("Unexpected tracker name of task attempt " + id,
  350.                    m.matches());
  351.         if (!status.equals("KILLED")) {
  352.           // check if http port is valid
  353.           String httpPort = attempt.get(Keys.HTTP_PORT);
  354.           m = digitsPattern.matcher(httpPort);
  355.           assertTrue("Unexpected http port of task attempt " + id, m.matches());
  356.         }
  357.         
  358.         // check if counters are parsable
  359.         String counters = attempt.get(Keys.COUNTERS);
  360.         try {
  361.           Counters readCounters = Counters.fromEscapedCompactString(counters);
  362.           assertTrue("Counters of task attempt " + id + " are not parsable",
  363.                      readCounters != null);
  364.         } catch (ParseException pe) {
  365.           LOG.warn("While trying to parse counters of task attempt " + id +
  366.                    ", " + pe);
  367.         }
  368.       }
  369.     }
  370.   }
  371.   /**
  372.    *  Validates the format of contents of history file
  373.    *  (1) history file exists and in correct location
  374.    *  (2) Verify if the history file is parsable
  375.    *  (3) Validate the contents of history file
  376.    *     (a) Format of all TIMEs are checked against a regex
  377.    *     (b) validate legality/format of job level key, values
  378.    *     (c) validate legality/format of task level key, values
  379.    *     (d) validate legality/format of attempt level key, values
  380.    *     (e) check if all the TaskAttempts, Tasks started are finished.
  381.    *         Check finish of each TaskAttemptID against its start to make sure
  382.    *         that all TaskAttempts, Tasks started are indeed finished and the
  383.    *         history log lines are in the proper order.
  384.    *         We want to catch ordering of history lines like
  385.    *            Task START
  386.    *            Attempt START
  387.    *            Task FINISH
  388.    *            Attempt FINISH
  389.    *         (speculative execution is turned off for this).
  390.    * @param id job id
  391.    * @param conf job conf
  392.    */
  393.   static void validateJobHistoryFileFormat(JobID id, JobConf conf,
  394.                  String status, boolean splitsCanBeEmpty) throws IOException  {
  395.     // Get the history file name
  396.     String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
  397.     // Framework history log file location
  398.     Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
  399.     FileSystem fileSys = logFile.getFileSystem(conf);
  400.  
  401.     // Check if the history file exists
  402.     assertTrue("History file does not exist", fileSys.exists(logFile));
  403.     // check if the history file is parsable
  404.     String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
  405.                                         logFileName).split("_");
  406.     String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
  407.     JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
  408.     TestListener l = new TestListener(jobInfo);
  409.     JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
  410.     // validate format of job level key, values
  411.     validateJobLevelKeyValuesFormat(jobInfo.getValues(), status);
  412.     // validate format of task level key, values
  413.     validateTaskLevelKeyValuesFormat(jobInfo, splitsCanBeEmpty);
  414.     // validate format of attempt level key, values
  415.     validateTaskAttemptLevelKeyValuesFormat(jobInfo);
  416.     // check if all the TaskAttempts, Tasks started are finished for
  417.     // successful jobs
  418.     if (status.equals("SUCCESS")) {
  419.       // Make sure that the lists in taskIDsToAttemptIDs are empty.
  420.       for(Iterator<String> it = taskIDsToAttemptIDs.keySet().iterator();it.hasNext();) {
  421.         String taskid = it.next();
  422.         assertTrue("There are some Tasks which are not finished in history " +
  423.                    "file.", taskEnds.contains(taskid));
  424.         List<String> attemptIDs = taskIDsToAttemptIDs.get(taskid);
  425.         if(attemptIDs != null) {
  426.           assertTrue("Unexpected. TaskID " + taskid + " has task attempt(s)" +
  427.                      " that are not finished.", (attemptIDs.size() == 1));
  428.         }
  429.       }
  430.     }
  431.   }
  432.   // Validate Job Level Keys, Values read from history file by
  433.   // comparing them with the actual values from JT.
  434.   private static void validateJobLevelKeyValues(MiniMRCluster mr,
  435.           RunningJob job, JobInfo jobInfo, JobConf conf) throws IOException  {
  436.     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
  437.     JobInProgress jip = jt.getJob(job.getID());
  438.     Map<Keys, String> values = jobInfo.getValues();
  439.     assertTrue("SUBMIT_TIME of job obtained from history file did not " +
  440.                "match the expected value", jip.getStartTime() ==
  441.                Long.parseLong(values.get(Keys.SUBMIT_TIME)));
  442.     assertTrue("LAUNCH_TIME of job obtained from history file did not " +
  443.                "match the expected value", jip.getLaunchTime() ==
  444.                Long.parseLong(values.get(Keys.LAUNCH_TIME)));
  445.     assertTrue("FINISH_TIME of job obtained from history file did not " +
  446.                "match the expected value", jip.getFinishTime() ==
  447.                Long.parseLong(values.get(Keys.FINISH_TIME)));
  448.     assertTrue("Job Status of job obtained from history file did not " +
  449.                "match the expected value",
  450.                values.get(Keys.JOB_STATUS).equals("SUCCESS"));
  451.     assertTrue("Job Priority of job obtained from history file did not " +
  452.                "match the expected value", jip.getPriority().toString().equals(
  453.                values.get(Keys.JOB_PRIORITY)));
  454.     assertTrue("Job Name of job obtained from history file did not " +
  455.                "match the expected value", JobInfo.getJobName(conf).equals(
  456.                values.get(Keys.JOBNAME)));
  457.     assertTrue("User Name of job obtained from history file did not " +
  458.                "match the expected value", JobInfo.getUserName(conf).equals(
  459.                values.get(Keys.USER)));
  460.     // Validate job counters
  461.     Counters c = jip.getCounters();
  462.     assertTrue("Counters of job obtained from history file did not " +
  463.                "match the expected value",
  464.                c.makeEscapedCompactString().equals(values.get(Keys.COUNTERS)));
  465.     // Validate number of total maps, total reduces, finished maps,
  466.     // finished reduces, failed maps, failed recudes
  467.     String totalMaps = values.get(Keys.TOTAL_MAPS);
  468.     assertTrue("Unexpected number of total maps in history file",
  469.                Integer.parseInt(totalMaps) == jip.desiredMaps());
  470.     String totalReduces = values.get(Keys.TOTAL_REDUCES);
  471.     assertTrue("Unexpected number of total reduces in history file",
  472.                Integer.parseInt(totalReduces) == jip.desiredReduces());
  473.     String finMaps = values.get(Keys.FINISHED_MAPS);
  474.     assertTrue("Unexpected number of finished maps in history file",
  475.                Integer.parseInt(finMaps) == jip.finishedMaps());
  476.     String finReduces = values.get(Keys.FINISHED_REDUCES);
  477.     assertTrue("Unexpected number of finished reduces in history file",
  478.                Integer.parseInt(finReduces) == jip.finishedReduces());
  479.     String failedMaps = values.get(Keys.FAILED_MAPS);
  480.     assertTrue("Unexpected number of failed maps in history file",
  481.                Integer.parseInt(failedMaps) == jip.failedMapTasks);
  482.     String failedReduces = values.get(Keys.FAILED_REDUCES);
  483.     assertTrue("Unexpected number of failed reduces in history file",
  484.                Integer.parseInt(failedReduces) == jip.failedReduceTasks);
  485.   }
  486.   // Validate Task Level Keys, Values read from history file by
  487.   // comparing them with the actual values from JT.
  488.   private static void validateTaskLevelKeyValues(MiniMRCluster mr,
  489.                       RunningJob job, JobInfo jobInfo) throws IOException  {
  490.     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
  491.     JobInProgress jip = jt.getJob(job.getID());
  492.     // Get the 1st map, 1st reduce, cleanup & setup taskIDs and
  493.     // validate their history info
  494.     TaskID mapTaskId = new TaskID(job.getID(), true, 0);
  495.     TaskID reduceTaskId = new TaskID(job.getID(), false, 0);
  496.     TaskInProgress cleanups[] = jip.getCleanupTasks();
  497.     TaskID cleanupTaskId;
  498.     if (cleanups[0].isComplete()) {
  499.       cleanupTaskId = cleanups[0].getTIPId();
  500.     }
  501.     else {
  502.       cleanupTaskId = cleanups[1].getTIPId();
  503.     }
  504.     TaskInProgress setups[] = jip.getSetupTasks();
  505.     TaskID setupTaskId;
  506.     if (setups[0].isComplete()) {
  507.       setupTaskId = setups[0].getTIPId();
  508.     }
  509.     else {
  510.       setupTaskId = setups[1].getTIPId();
  511.     }
  512.     Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
  513.     // validate info of the 4 tasks(cleanup, setup, 1st map, 1st reduce)
  514.     for (JobHistory.Task task : tasks.values()) {
  515.       String tid = task.get(Keys.TASKID);
  516.       if (tid.equals(mapTaskId.toString()) ||
  517.           tid.equals(reduceTaskId.toString()) ||
  518.           tid.equals(cleanupTaskId.toString()) ||
  519.           tid.equals(setupTaskId.toString())) {
  520.         TaskID taskId = null;
  521.         if (tid.equals(mapTaskId.toString())) {
  522.           taskId = mapTaskId;
  523.         }
  524.         else if (tid.equals(reduceTaskId.toString())) {
  525.           taskId = reduceTaskId;
  526.         }
  527.         else if (tid.equals(cleanupTaskId.toString())) {
  528.           taskId = cleanupTaskId;
  529.         }
  530.         else if (tid.equals(setupTaskId.toString())) {
  531.           taskId = setupTaskId;
  532.         }
  533.         TaskInProgress tip = jip.getTaskInProgress(taskId);
  534.         assertTrue("START_TIME of Task " + tid + " obtained from history " +
  535.              "file did not match the expected value", tip.getExecStartTime() ==
  536.              Long.parseLong(task.get(Keys.START_TIME)));
  537.         assertTrue("FINISH_TIME of Task " + tid + " obtained from history " +
  538.              "file did not match the expected value", tip.getExecFinishTime() ==
  539.              Long.parseLong(task.get(Keys.FINISH_TIME)));
  540.         if (taskId == mapTaskId) {//check splits only for map task
  541.           assertTrue("Splits of Task " + tid + " obtained from history file " +
  542.                      " did not match the expected value",
  543.                      tip.getSplitNodes().equals(task.get(Keys.SPLITS)));
  544.         }
  545.         TaskAttemptID attemptId = tip.getSuccessfulTaskid();
  546.         TaskStatus ts = tip.getTaskStatus(attemptId);
  547.         // Validate task counters
  548.         Counters c = ts.getCounters();
  549.         assertTrue("Counters of Task " + tid + " obtained from history file " +
  550.                    " did not match the expected value",
  551.                   c.makeEscapedCompactString().equals(task.get(Keys.COUNTERS)));
  552.       }
  553.     }
  554.   }
  555.   // Validate Task Attempt Level Keys, Values read from history file by
  556.   // comparing them with the actual values from JT.
  557.   private static void validateTaskAttemptLevelKeyValues(MiniMRCluster mr,
  558.                       RunningJob job, JobInfo jobInfo) throws IOException  {
  559.     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
  560.     JobInProgress jip = jt.getJob(job.getID());
  561.     Map<String, JobHistory.Task> tasks = jobInfo.getAllTasks();
  562.     // For each task
  563.     for (JobHistory.Task task : tasks.values()) {
  564.       // validate info of each attempt
  565.       for (JobHistory.TaskAttempt attempt : task.getTaskAttempts().values()) {
  566.         String idStr = attempt.get(Keys.TASK_ATTEMPT_ID);
  567.         TaskAttemptID attemptId = TaskAttemptID.forName(idStr);
  568.         TaskID tid = attemptId.getTaskID();
  569.         // Validate task id
  570.         assertTrue("Task id of Task Attempt " + idStr + " obtained from " +
  571.                    "history file did not match the expected value",
  572.                    tid.toString().equals(attempt.get(Keys.TASKID)));
  573.         TaskInProgress tip = jip.getTaskInProgress(tid);
  574.         TaskStatus ts = tip.getTaskStatus(attemptId);
  575.         // Validate task attempt start time
  576.         assertTrue("START_TIME of Task attempt " + idStr + " obtained from " +
  577.                    "history file did not match the expected value",
  578.             ts.getStartTime() == Long.parseLong(attempt.get(Keys.START_TIME)));
  579.         // Validate task attempt finish time
  580.         assertTrue("FINISH_TIME of Task attempt " + idStr + " obtained from " +
  581.                    "history file did not match the expected value",
  582.             ts.getFinishTime() == Long.parseLong(attempt.get(Keys.FINISH_TIME)));
  583.         TaskTrackerStatus ttStatus = jt.getTaskTracker(ts.getTaskTracker());
  584.         if (ttStatus != null) {
  585.           assertTrue("http port of task attempt " + idStr + " obtained from " +
  586.                      "history file did not match the expected value",
  587.                      ttStatus.getHttpPort() ==
  588.                      Integer.parseInt(attempt.get(Keys.HTTP_PORT)));
  589.           if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
  590.             String ttHostname = jt.getNode(ttStatus.getHost()).toString();
  591.             // check if hostname is valid
  592.             assertTrue("Host name of task attempt " + idStr + " obtained from" +
  593.                        " history file did not match the expected value",
  594.                        ttHostname.equals(attempt.get(Keys.HOSTNAME)));
  595.           }
  596.         }
  597.         if (attempt.get(Keys.TASK_STATUS).equals("SUCCESS")) {
  598.           // Validate SHUFFLE_FINISHED time and SORT_FINISHED time of
  599.           // Reduce Task Attempts
  600.           if (attempt.get(Keys.TASK_TYPE).equals("REDUCE")) {
  601.             assertTrue("SHUFFLE_FINISHED time of task attempt " + idStr +
  602.                      " obtained from history file did not match the expected" +
  603.                      " value", ts.getShuffleFinishTime() ==
  604.                      Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED)));
  605.             assertTrue("SORT_FINISHED time of task attempt " + idStr +
  606.                      " obtained from history file did not match the expected" +
  607.                      " value", ts.getSortFinishTime() ==
  608.                      Long.parseLong(attempt.get(Keys.SORT_FINISHED)));
  609.           }
  610.           //Validate task counters
  611.           Counters c = ts.getCounters();
  612.           assertTrue("Counters of Task Attempt " + idStr + " obtained from " +
  613.                      "history file did not match the expected value",
  614.                c.makeEscapedCompactString().equals(attempt.get(Keys.COUNTERS)));
  615.         }
  616.         
  617.         // check if tracker name is valid
  618.         assertTrue("Tracker name of task attempt " + idStr + " obtained from " +
  619.                    "history file did not match the expected value",
  620.                    ts.getTaskTracker().equals(attempt.get(Keys.TRACKER_NAME)));
  621.       }
  622.     }
  623.   }
  624.   /**
  625.    * Checks if the history file content is as expected comparing with the
  626.    * actual values obtained from JT.
  627.    * Job Level, Task Level and Task Attempt Level Keys, Values are validated.
  628.    * @param job RunningJob object of the job whose history is to be validated
  629.    * @param conf job conf
  630.    */
  631.   static void validateJobHistoryFileContent(MiniMRCluster mr,
  632.                               RunningJob job, JobConf conf) throws IOException  {
  633.     JobID id = job.getID();
  634.     // Get the history file name
  635.     String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
  636.     // Framework history log file location
  637.     Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
  638.     FileSystem fileSys = logFile.getFileSystem(conf);
  639.  
  640.     // Check if the history file exists
  641.     assertTrue("History file does not exist", fileSys.exists(logFile));
  642.     // check if the history file is parsable
  643.     String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
  644.                                         logFileName).split("_");
  645.     String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
  646.     JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
  647.     DefaultJobHistoryParser.JobTasksParseListener l =
  648.                    new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
  649.     JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
  650.     // Now the history file contents are available in jobInfo. Let us compare
  651.     // them with the actual values from JT.
  652.     validateJobLevelKeyValues(mr, job, jobInfo, conf);
  653.     validateTaskLevelKeyValues(mr, job, jobInfo);
  654.     validateTaskAttemptLevelKeyValues(mr, job, jobInfo);
  655.   }
  656.   /** Run a job that will be succeeded and validate its history file format
  657.    *  and its content.
  658.    */
  659.   public void testJobHistoryFile() throws IOException {
  660.     MiniMRCluster mr = null;
  661.     try {
  662.       mr = new MiniMRCluster(2, "file:///", 3);
  663.       // run the TCs
  664.       JobConf conf = mr.createJobConf();
  665.       FileSystem fs = FileSystem.get(conf);
  666.       // clean up
  667.       fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true);
  668.       Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input");
  669.       Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output");
  670.       //Disable speculative execution
  671.       conf.setSpeculativeExecution(false);
  672.       // Make sure that the job is not removed from memory until we do finish
  673.       // the validation of history file content
  674.       conf.setInt("mapred.jobtracker.completeuserjobs.maximum", 10);
  675.       // Run a job that will be succeeded and validate its history file
  676.       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
  677.       validateJobHistoryFileFormat(job.getID(), conf, "SUCCESS", false);
  678.       validateJobHistoryFileContent(mr, job, conf);
  679.     } finally {
  680.       if (mr != null) {
  681.         mr.shutdown();
  682.       }
  683.     }
  684.   }
  685.   // Returns the output path where user history log file is written to with
  686.   // default configuration setting for hadoop.job.history.user.location
  687.   private static Path getLogLocationInOutputPath(String logFileName,
  688.                                                       JobConf conf) {
  689.     JobConf jobConf = new JobConf(true);//default JobConf
  690.     FileOutputFormat.setOutputPath(jobConf,
  691.                      FileOutputFormat.getOutputPath(conf));
  692.     return JobHistory.JobInfo.getJobHistoryLogLocationForUser(
  693.                                              logFileName, jobConf);
  694.   }
  695.   /**
  696.    * Checks if the user history file exists in the correct dir
  697.    * @param id job id
  698.    * @param conf job conf
  699.    */
  700.   private static void validateJobHistoryUserLogLocation(JobID id, JobConf conf) 
  701.           throws IOException  {
  702.     // Get the history file name
  703.     String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
  704.     // User history log file location
  705.     Path logFile = JobHistory.JobInfo.getJobHistoryLogLocationForUser(
  706.                                                      logFileName, conf);
  707.     if(logFile == null) {
  708.       // get the output path where history file is written to when
  709.       // hadoop.job.history.user.location is not set
  710.       logFile = getLogLocationInOutputPath(logFileName, conf);
  711.     }
  712.     FileSystem fileSys = null;
  713.     fileSys = logFile.getFileSystem(conf);
  714.     // Check if the user history file exists in the correct dir
  715.     if (conf.get("hadoop.job.history.user.location") == null) {
  716.       assertTrue("User log file " + logFile + " does not exist",
  717.                  fileSys.exists(logFile));
  718.     }
  719.     else if (conf.get("hadoop.job.history.user.location") == "none") {
  720.       // history file should not exist in the output path
  721.       assertFalse("Unexpected. User log file exists in output dir when " +
  722.                  "hadoop.job.history.user.location is set to "none"",
  723.                  fileSys.exists(logFile));
  724.     }
  725.     else {
  726.       //hadoop.job.history.user.location is set to a specific location.
  727.       // User log file should exist in that location
  728.       assertTrue("User log file " + logFile + " does not exist",
  729.                  fileSys.exists(logFile));
  730.       // User log file should not exist in output path.
  731.       // get the output path where history file is written to when
  732.       // hadoop.job.history.user.location is not set
  733.       Path logFile1 = getLogLocationInOutputPath(logFileName, conf);
  734.       
  735.       if (logFile != logFile1) {
  736.         fileSys = logFile1.getFileSystem(conf);
  737.         assertFalse("Unexpected. User log file exists in output dir when " +
  738.               "hadoop.job.history.user.location is set to a different location",
  739.               fileSys.exists(logFile1));
  740.       }
  741.     }
  742.   }
  743.   // Validate user history file location for the given values of
  744.   // hadoop.job.history.user.location as
  745.   // (1)null(default case), (2)"none", and (3)some dir "/tmp"
  746.   public void testJobHistoryUserLogLocation() throws IOException {
  747.     MiniMRCluster mr = null;
  748.     try {
  749.       mr = new MiniMRCluster(2, "file:///", 3);
  750.       // run the TCs
  751.       JobConf conf = mr.createJobConf();
  752.       FileSystem fs = FileSystem.get(conf);
  753.       // clean up
  754.       fs.delete(new Path(TEST_ROOT_DIR + "/succeed"), true);
  755.       Path inDir = new Path(TEST_ROOT_DIR + "/succeed/input1");
  756.       Path outDir = new Path(TEST_ROOT_DIR + "/succeed/output1");
  757.       // validate for the case of null(default)
  758.       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
  759.       validateJobHistoryUserLogLocation(job.getID(), conf);
  760.       inDir = new Path(TEST_ROOT_DIR + "/succeed/input2");
  761.       outDir = new Path(TEST_ROOT_DIR + "/succeed/output2");
  762.       // validate for the case of "none"
  763.       conf.set("hadoop.job.history.user.location", "none");
  764.       job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
  765.       validateJobHistoryUserLogLocation(job.getID(), conf);
  766.  
  767.       inDir = new Path(TEST_ROOT_DIR + "/succeed/input3");
  768.       outDir = new Path(TEST_ROOT_DIR + "/succeed/output3");
  769.       // validate for the case of any dir
  770.       conf.set("hadoop.job.history.user.location", "/tmp");
  771.       job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
  772.       validateJobHistoryUserLogLocation(job.getID(), conf);
  773.     } finally {
  774.       if (mr != null) {
  775.         mr.shutdown();
  776.       }
  777.     }
  778.   }
  779.   /**
  780.    * Checks if the history file has expected job status
  781.    * @param id job id
  782.    * @param conf job conf
  783.    */
  784.   private static void validateJobHistoryJobStatus(JobID id, JobConf conf,
  785.           String status) throws IOException  {
  786.     // Get the history file name
  787.     String logFileName = JobHistory.JobInfo.getJobHistoryFileName(conf, id);
  788.     // Framework history log file location
  789.     Path logFile = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
  790.     FileSystem fileSys = logFile.getFileSystem(conf);
  791.  
  792.     // Check if the history file exists
  793.     assertTrue("History file does not exist", fileSys.exists(logFile));
  794.     // check history file permission
  795.     assertTrue("History file permissions does not match", 
  796.     fileSys.getFileStatus(logFile).getPermission().equals(
  797.        new FsPermission(JobHistory.HISTORY_FILE_PERMISSION)));
  798.     
  799.     // check if the history file is parsable
  800.     String[] jobDetails = JobHistory.JobInfo.decodeJobHistoryFileName(
  801.                                         logFileName).split("_");
  802.     String jobId = jobDetails[2] + "_" + jobDetails[3] + "_" + jobDetails[4];
  803.     JobHistory.JobInfo jobInfo = new JobHistory.JobInfo(jobId);
  804.     DefaultJobHistoryParser.JobTasksParseListener l =
  805.                   new DefaultJobHistoryParser.JobTasksParseListener(jobInfo);
  806.     JobHistory.parseHistoryFromFS(logFile.toString().substring(5), l, fileSys);
  807.     assertTrue("Job Status read from job history file is not the expected" +
  808.          " status", status.equals(jobInfo.getValues().get(Keys.JOB_STATUS)));
  809.   }
  810.   // run jobs that will be (1) succeeded (2) failed (3) killed
  811.   // and validate job status read from history file in each case
  812.   public void testJobHistoryJobStatus() throws IOException {
  813.     MiniMRCluster mr = null;
  814.     try {
  815.       mr = new MiniMRCluster(2, "file:///", 3);
  816.       // run the TCs
  817.       JobConf conf = mr.createJobConf();
  818.       FileSystem fs = FileSystem.get(conf);
  819.       // clean up
  820.       fs.delete(new Path(TEST_ROOT_DIR + "/succeedfailkilljob"), true);
  821.       Path inDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/input");
  822.       Path outDir = new Path(TEST_ROOT_DIR + "/succeedfailkilljob/output");
  823.       // Run a job that will be succeeded and validate its job status
  824.       // existing in history file
  825.       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
  826.       validateJobHistoryJobStatus(job.getID(), conf, "SUCCESS");
  827.       long historyCleanerRanAt = JobHistory.HistoryCleaner.getLastRan();
  828.       assertTrue(historyCleanerRanAt != 0);
  829.       
  830.       // Run a job that will be failed and validate its job status
  831.       // existing in history file
  832.       job = UtilsForTests.runJobFail(conf, inDir, outDir);
  833.       validateJobHistoryJobStatus(job.getID(), conf, "FAILED");
  834.       assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
  835.       
  836.       // Run a job that will be killed and validate its job status
  837.       // existing in history file
  838.       job = UtilsForTests.runJobKill(conf, inDir, outDir);
  839.       validateJobHistoryJobStatus(job.getID(), conf, "KILLED");
  840.       assertTrue(historyCleanerRanAt == JobHistory.HistoryCleaner.getLastRan());
  841.       
  842.     } finally {
  843.       if (mr != null) {
  844.         mr.shutdown();
  845.       }
  846.     }
  847.   }
  848. }