JobHistory.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:68k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.BufferedReader;
  20. import java.io.File;
  21. import java.io.FileFilter;
  22. import java.io.FileOutputStream;
  23. import java.io.IOException;
  24. import java.io.InputStreamReader;
  25. import java.io.PrintWriter;
  26. import java.io.UnsupportedEncodingException;
  27. import java.net.URLDecoder;
  28. import java.net.URLEncoder;
  29. import java.util.ArrayList;
  30. import java.util.HashMap;
  31. import java.util.Map;
  32. import java.util.TreeMap;
  33. import java.util.concurrent.ConcurrentHashMap;
  34. import java.util.regex.Matcher;
  35. import java.util.regex.Pattern;
  36. import org.apache.commons.logging.Log;
  37. import org.apache.commons.logging.LogFactory;
  38. import org.apache.hadoop.fs.FSDataInputStream;
  39. import org.apache.hadoop.fs.FSDataOutputStream;
  40. import org.apache.hadoop.fs.FileStatus;
  41. import org.apache.hadoop.fs.FileSystem;
  42. import org.apache.hadoop.fs.Path;
  43. import org.apache.hadoop.fs.PathFilter;
  44. import org.apache.hadoop.fs.permission.FsPermission;
  45. import org.apache.hadoop.util.StringUtils;
  46. /**
  47.  * Provides methods for writing to and reading from job history. 
  48.  * Job History works in an append mode, JobHistory and its inner classes provide methods 
  49.  * to log job events. 
  50.  * 
  51.  * JobHistory is split into multiple files, format of each file is plain text where each line 
  52.  * is of the format [type (key=value)*], where type identifies the type of the record. 
  53.  * Type maps to UID of one of the inner classes of this class. 
  54.  * 
  55.  * Job history is maintained in a master index which contains star/stop times of all jobs with
  56.  * a few other job level properties. Apart from this each job's history is maintained in a seperate history 
  57.  * file. name of job history files follows the format jobtrackerId_jobid
  58.  *  
  59.  * For parsing the job history it supports a listener based interface where each line is parsed
  60.  * and passed to listener. The listener can create an object model of history or look for specific 
  61.  * events and discard rest of the history.  
  62.  * 
  63.  * CHANGE LOG :
  64.  * Version 0 : The history has the following format : 
  65.  *             TAG KEY1="VALUE1" KEY2="VALUE2" and so on. 
  66.                TAG can be Job, Task, MapAttempt or ReduceAttempt. 
  67.                Note that a '"' is the line delimiter.
  68.  * Version 1 : Changes the line delimiter to '.'
  69.                Values are now escaped for unambiguous parsing. 
  70.                Added the Meta tag to store version info.
  71.  */
  72. public class JobHistory {
  73.   
  74.   static final long VERSION = 1L;
  75.   public static final Log LOG = LogFactory.getLog(JobHistory.class);
  76.   private static final String DELIMITER = " ";
  77.   static final char LINE_DELIMITER_CHAR = '.';
  78.   static final char[] charsToEscape = new char[] {'"', '=', 
  79.                                                 LINE_DELIMITER_CHAR};
  80.   static final String DIGITS = "[0-9]+";
  81.   static final String KEY = "(\w+)";
  82.   // value is any character other than quote, but escaped quotes can be there
  83.   static final String VALUE = "[^"\\]*(?:\\.[^"\\]*)*"; 
  84.   
  85.   static final Pattern pattern = Pattern.compile(KEY + "=" + """ + VALUE + """);
  86.   
  87.   public static final int JOB_NAME_TRIM_LENGTH = 50;
  88.   private static String JOBTRACKER_UNIQUE_STRING = null;
  89.   private static String LOG_DIR = null;
  90.   private static Map<String, ArrayList<PrintWriter>> openJobs = 
  91.                      new ConcurrentHashMap<String, ArrayList<PrintWriter>>();
  92.   private static boolean disableHistory = false; 
  93.   private static final String SECONDARY_FILE_SUFFIX = ".recover";
  94.   private static long jobHistoryBlockSize = 0;
  95.   private static String jobtrackerHostname;
  96.   final static FsPermission HISTORY_DIR_PERMISSION =
  97.     FsPermission.createImmutable((short) 0750); // rwxr-x---
  98.   final static FsPermission HISTORY_FILE_PERMISSION =
  99.     FsPermission.createImmutable((short) 0740); // rwxr-----
  100.   private static JobConf jtConf;
  101.   /**
  102.    * Record types are identifiers for each line of log in history files. 
  103.    * A record type appears as the first token in a single line of log. 
  104.    */
  105.   public static enum RecordTypes {
  106.     Jobtracker, Job, Task, MapAttempt, ReduceAttempt, Meta
  107.   }
  108.   /**
  109.    * Job history files contain key="value" pairs, where keys belong to this enum. 
  110.    * It acts as a global namespace for all keys. 
  111.    */
  112.   public static enum Keys { 
  113.     JOBTRACKERID,
  114.     START_TIME, FINISH_TIME, JOBID, JOBNAME, USER, JOBCONF, SUBMIT_TIME, 
  115.     LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
  116.     FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
  117.     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
  118.     SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
  119.     TRACKER_NAME, STATE_STRING, VERSION
  120.   }
  121.   /**
  122.    * This enum contains some of the values commonly used by history log events. 
  123.    * since values in history can only be strings - Values.name() is used in 
  124.    * most places in history file. 
  125.    */
  126.   public static enum Values {
  127.     SUCCESS, FAILED, KILLED, MAP, REDUCE, CLEANUP, RUNNING, PREP, SETUP
  128.   }
  129.   // temp buffer for parsed dataa
  130.   private static Map<Keys,String> parseBuffer = new HashMap<Keys, String>(); 
  131.   /**
  132.    * Initialize JobHistory files. 
  133.    * @param conf Jobconf of the job tracker.
  134.    * @param hostname jobtracker's hostname
  135.    * @param jobTrackerStartTime jobtracker's start time
  136.    * @return true if intialized properly
  137.    *         false otherwise
  138.    */
  139.   public static boolean init(JobConf conf, String hostname, 
  140.                               long jobTrackerStartTime){
  141.     try {
  142.       LOG_DIR = conf.get("hadoop.job.history.location" ,
  143.         "file:///" + new File(
  144.         System.getProperty("hadoop.log.dir")).getAbsolutePath()
  145.         + File.separator + "history");
  146.       JOBTRACKER_UNIQUE_STRING = hostname + "_" + 
  147.                                     String.valueOf(jobTrackerStartTime) + "_";
  148.       jobtrackerHostname = hostname;
  149.       Path logDir = new Path(LOG_DIR);
  150.       FileSystem fs = logDir.getFileSystem(conf);
  151.       if (!fs.exists(logDir)){
  152.         if (!fs.mkdirs(logDir, new FsPermission(HISTORY_DIR_PERMISSION))) {
  153.           throw new IOException("Mkdirs failed to create " + logDir.toString());
  154.         }
  155.       }
  156.       conf.set("hadoop.job.history.location", LOG_DIR);
  157.       disableHistory = false;
  158.       // set the job history block size (default is 3MB)
  159.       jobHistoryBlockSize = 
  160.         conf.getLong("mapred.jobtracker.job.history.block.size", 
  161.                      3 * 1024 * 1024);
  162.       jtConf = conf;
  163.     } catch(IOException e) {
  164.         LOG.error("Failed to initialize JobHistory log file", e); 
  165.         disableHistory = true;
  166.     }
  167.     return !(disableHistory);
  168.   }
  169.   /**
  170.    * Manages job-history's meta information such as version etc.
  171.    * Helps in logging version information to the job-history and recover
  172.    * version information from the history. 
  173.    */
  174.   static class MetaInfoManager implements Listener {
  175.     private long version = 0L;
  176.     private KeyValuePair pairs = new KeyValuePair();
  177.     
  178.     // Extract the version of the history that was used to write the history
  179.     public MetaInfoManager(String line) throws IOException {
  180.       if (null != line) {
  181.         // Parse the line
  182.         parseLine(line, this, false);
  183.       }
  184.     }
  185.     
  186.     // Get the line delimiter
  187.     char getLineDelim() {
  188.       if (version == 0) {
  189.         return '"';
  190.       } else {
  191.         return LINE_DELIMITER_CHAR;
  192.       }
  193.     }
  194.     
  195.     // Checks if the values are escaped or not
  196.     boolean isValueEscaped() {
  197.       // Note that the values are not escaped in version 0
  198.       return version != 0;
  199.     }
  200.     
  201.     public void handle(RecordTypes recType, Map<Keys, String> values) 
  202.     throws IOException {
  203.       // Check if the record is of type META
  204.       if (RecordTypes.Meta == recType) {
  205.         pairs.handle(values);
  206.         version = pairs.getLong(Keys.VERSION); // defaults to 0
  207.       }
  208.     }
  209.     
  210.     /**
  211.      * Logs history meta-info to the history file. This needs to be called once
  212.      * per history file. 
  213.      * @param jobId job id, assigned by jobtracker. 
  214.      */
  215.     static void logMetaInfo(ArrayList<PrintWriter> writers){
  216.       if (!disableHistory){
  217.         if (null != writers){
  218.           JobHistory.log(writers, RecordTypes.Meta, 
  219.               new Keys[] {Keys.VERSION},
  220.               new String[] {String.valueOf(VERSION)}); 
  221.         }
  222.       }
  223.     }
  224.   }
  225.   
  226.   /** Escapes the string especially for {@link JobHistory}
  227.    */
  228.   static String escapeString(String data) {
  229.     return StringUtils.escapeString(data, StringUtils.ESCAPE_CHAR, 
  230.                                     charsToEscape);
  231.   }
  232.   
  233.   /**
  234.    * Parses history file and invokes Listener.handle() for 
  235.    * each line of history. It can be used for looking through history
  236.    * files for specific items without having to keep whole history in memory. 
  237.    * @param path path to history file
  238.    * @param l Listener for history events 
  239.    * @param fs FileSystem where history file is present
  240.    * @throws IOException
  241.    */
  242.   public static void parseHistoryFromFS(String path, Listener l, FileSystem fs)
  243.   throws IOException{
  244.     FSDataInputStream in = fs.open(new Path(path));
  245.     BufferedReader reader = new BufferedReader(new InputStreamReader (in));
  246.     try {
  247.       String line = null; 
  248.       StringBuffer buf = new StringBuffer(); 
  249.       
  250.       // Read the meta-info line. Note that this might a jobinfo line for files
  251.       // written with older format
  252.       line = reader.readLine();
  253.       
  254.       // Check if the file is empty
  255.       if (line == null) {
  256.         return;
  257.       }
  258.       
  259.       // Get the information required for further processing
  260.       MetaInfoManager mgr = new MetaInfoManager(line);
  261.       boolean isEscaped = mgr.isValueEscaped();
  262.       String lineDelim = String.valueOf(mgr.getLineDelim());  
  263.       String escapedLineDelim = 
  264.         StringUtils.escapeString(lineDelim, StringUtils.ESCAPE_CHAR, 
  265.                                  mgr.getLineDelim());
  266.       
  267.       do {
  268.         buf.append(line); 
  269.         if (!line.trim().endsWith(lineDelim) 
  270.             || line.trim().endsWith(escapedLineDelim)) {
  271.           buf.append("n");
  272.           continue; 
  273.         }
  274.         parseLine(buf.toString(), l, isEscaped);
  275.         buf = new StringBuffer(); 
  276.       } while ((line = reader.readLine())!= null);
  277.     } finally {
  278.       try { reader.close(); } catch (IOException ex) {}
  279.     }
  280.   }
  281.   /**
  282.    * Parse a single line of history. 
  283.    * @param line
  284.    * @param l
  285.    * @throws IOException
  286.    */
  287.   private static void parseLine(String line, Listener l, boolean isEscaped) 
  288.   throws IOException{
  289.     // extract the record type 
  290.     int idx = line.indexOf(' '); 
  291.     String recType = line.substring(0, idx);
  292.     String data = line.substring(idx+1, line.length());
  293.     
  294.     Matcher matcher = pattern.matcher(data); 
  295.     while(matcher.find()){
  296.       String tuple = matcher.group(0);
  297.       String []parts = StringUtils.split(tuple, StringUtils.ESCAPE_CHAR, '=');
  298.       String value = parts[1].substring(1, parts[1].length() -1);
  299.       if (isEscaped) {
  300.         value = StringUtils.unEscapeString(value, StringUtils.ESCAPE_CHAR,
  301.                                            charsToEscape);
  302.       }
  303.       parseBuffer.put(Keys.valueOf(parts[0]), value);
  304.     }
  305.     l.handle(RecordTypes.valueOf(recType), parseBuffer); 
  306.     
  307.     parseBuffer.clear(); 
  308.   }
  309.   
  310.   
  311.   /**
  312.    * Log a raw record type with keys and values. This is method is generally not used directly. 
  313.    * @param recordType type of log event
  314.    * @param key key
  315.    * @param value value
  316.    */
  317.   
  318.   static void log(PrintWriter out, RecordTypes recordType, Keys key, 
  319.                   String value){
  320.     value = escapeString(value);
  321.     out.println(recordType.name() + DELIMITER + key + "="" + value + """
  322.                 + DELIMITER + LINE_DELIMITER_CHAR); 
  323.   }
  324.   
  325.   /**
  326.    * Log a number of keys and values with record. the array length of keys and values
  327.    * should be same. 
  328.    * @param recordType type of log event
  329.    * @param keys type of log event
  330.    * @param values type of log event
  331.    */
  332.   static void log(ArrayList<PrintWriter> writers, RecordTypes recordType, 
  333.                   Keys[] keys, String[] values) {
  334.     StringBuffer buf = new StringBuffer(recordType.name()); 
  335.     buf.append(DELIMITER); 
  336.     for(int i =0; i< keys.length; i++){
  337.       buf.append(keys[i]);
  338.       buf.append("="");
  339.       values[i] = escapeString(values[i]);
  340.       buf.append(values[i]);
  341.       buf.append(""");
  342.       buf.append(DELIMITER); 
  343.     }
  344.     buf.append(LINE_DELIMITER_CHAR);
  345.     
  346.     for (PrintWriter out : writers) {
  347.       out.println(buf.toString());
  348.     }
  349.   }
  350.   
  351.   /**
  352.    * Returns history disable status. by default history is enabled so this
  353.    * method returns false. 
  354.    * @return true if history logging is disabled, false otherwise. 
  355.    */
  356.   public static boolean isDisableHistory() {
  357.     return disableHistory;
  358.   }
  359.   /**
  360.    * Enable/disable history logging. Default value is false, so history 
  361.    * is enabled by default. 
  362.    * @param disableHistory true if history should be disabled, false otherwise. 
  363.    */
  364.   public static void setDisableHistory(boolean disableHistory) {
  365.     JobHistory.disableHistory = disableHistory;
  366.   }
  367.   
  368.   /**
  369.    * Base class contais utility stuff to manage types key value pairs with enums. 
  370.    */
  371.   static class KeyValuePair{
  372.     private Map<Keys, String> values = new HashMap<Keys, String>(); 
  373.     /**
  374.      * Get 'String' value for given key. Most of the places use Strings as 
  375.      * values so the default get' method returns 'String'.  This method never returns 
  376.      * null to ease on GUIs. if no value is found it returns empty string ""
  377.      * @param k 
  378.      * @return if null it returns empty string - "" 
  379.      */
  380.     public String get(Keys k){
  381.       String s = values.get(k); 
  382.       return s == null ? "" : s; 
  383.     }
  384.     /**
  385.      * Convert value from history to int and return. 
  386.      * if no value is found it returns 0.
  387.      * @param k key 
  388.      */
  389.     public int getInt(Keys k){
  390.       String s = values.get(k); 
  391.       if (null != s){
  392.         return Integer.parseInt(s);
  393.       }
  394.       return 0; 
  395.     }
  396.     /**
  397.      * Convert value from history to int and return. 
  398.      * if no value is found it returns 0.
  399.      * @param k
  400.      */
  401.     public long getLong(Keys k){
  402.       String s = values.get(k); 
  403.       if (null != s){
  404.         return Long.parseLong(s);
  405.       }
  406.       return 0; 
  407.     }
  408.     /**
  409.      * Set value for the key. 
  410.      * @param k
  411.      * @param s
  412.      */
  413.     public void set(Keys k, String s){
  414.       values.put(k, s); 
  415.     }
  416.     /**
  417.      * Adds all values in the Map argument to its own values. 
  418.      * @param m
  419.      */
  420.     public void set(Map<Keys, String> m){
  421.       values.putAll(m);
  422.     }
  423.     /**
  424.      * Reads values back from the history, input is same Map as passed to Listener by parseHistory().  
  425.      * @param values
  426.      */
  427.     public synchronized void handle(Map<Keys, String> values){
  428.       set(values); 
  429.     }
  430.     /**
  431.      * Returns Map containing all key-values. 
  432.      */
  433.     public Map<Keys, String> getValues(){
  434.       return values; 
  435.     }
  436.   }
  437.   
  438.   /**
  439.    * Helper class for logging or reading back events related to job start, finish or failure. 
  440.    */
  441.   public static class JobInfo extends KeyValuePair{
  442.     
  443.     private Map<String, Task> allTasks = new TreeMap<String, Task>();
  444.     
  445.     /** Create new JobInfo */
  446.     public JobInfo(String jobId){ 
  447.       set(Keys.JOBID, jobId);  
  448.     }
  449.     /**
  450.      * Returns all map and reduce tasks <taskid-Task>. 
  451.      */
  452.     public Map<String, Task> getAllTasks() { return allTasks; }
  453.     
  454.     /**
  455.      * Get the path of the locally stored job file
  456.      * @param jobId id of the job
  457.      * @return the path of the job file on the local file system 
  458.      */
  459.     public static String getLocalJobFilePath(JobID jobId){
  460.       return System.getProperty("hadoop.log.dir") + File.separator +
  461.                jobId + "_conf.xml";
  462.     }
  463.     
  464.     /**
  465.      * Helper function to encode the URL of the path of the job-history
  466.      * log file. 
  467.      * 
  468.      * @param logFile path of the job-history file
  469.      * @return URL encoded path
  470.      * @throws IOException
  471.      */
  472.     public static String encodeJobHistoryFilePath(String logFile)
  473.     throws IOException {
  474.       Path rawPath = new Path(logFile);
  475.       String encodedFileName = null;
  476.       try {
  477.         encodedFileName = URLEncoder.encode(rawPath.getName(), "UTF-8");
  478.       } catch (UnsupportedEncodingException uee) {
  479.         IOException ioe = new IOException();
  480.         ioe.initCause(uee);
  481.         ioe.setStackTrace(uee.getStackTrace());
  482.         throw ioe;
  483.       }
  484.       
  485.       Path encodedPath = new Path(rawPath.getParent(), encodedFileName);
  486.       return encodedPath.toString();
  487.     }
  488.     
  489.     /**
  490.      * Helper function to encode the URL of the filename of the job-history 
  491.      * log file.
  492.      * 
  493.      * @param logFileName file name of the job-history file
  494.      * @return URL encoded filename
  495.      * @throws IOException
  496.      */
  497.     public static String encodeJobHistoryFileName(String logFileName)
  498.     throws IOException {
  499.       String encodedFileName = null;
  500.       try {
  501.         encodedFileName = URLEncoder.encode(logFileName, "UTF-8");
  502.       } catch (UnsupportedEncodingException uee) {
  503.         IOException ioe = new IOException();
  504.         ioe.initCause(uee);
  505.         ioe.setStackTrace(uee.getStackTrace());
  506.         throw ioe;
  507.       }
  508.       return encodedFileName;
  509.     }
  510.     
  511.     /**
  512.      * Helper function to decode the URL of the filename of the job-history 
  513.      * log file.
  514.      * 
  515.      * @param logFileName file name of the job-history file
  516.      * @return URL decoded filename
  517.      * @throws IOException
  518.      */
  519.     public static String decodeJobHistoryFileName(String logFileName)
  520.     throws IOException {
  521.       String decodedFileName = null;
  522.       try {
  523.         decodedFileName = URLDecoder.decode(logFileName, "UTF-8");
  524.       } catch (UnsupportedEncodingException uee) {
  525.         IOException ioe = new IOException();
  526.         ioe.initCause(uee);
  527.         ioe.setStackTrace(uee.getStackTrace());
  528.         throw ioe;
  529.       }
  530.       return decodedFileName;
  531.     }
  532.     
  533.     /**
  534.      * Get the job name from the job conf
  535.      */
  536.     static String getJobName(JobConf jobConf) {
  537.       String jobName = jobConf.getJobName();
  538.       if (jobName == null || jobName.length() == 0) {
  539.         jobName = "NA";
  540.       }
  541.       return jobName;
  542.     }
  543.     
  544.     /**
  545.      * Get the user name from the job conf
  546.      */
  547.     public static String getUserName(JobConf jobConf) {
  548.       String user = jobConf.getUser();
  549.       if (user == null || user.length() == 0) {
  550.         user = "NA";
  551.       }
  552.       return user;
  553.     }
  554.     
  555.     /**
  556.      * Get the job history file path given the history filename
  557.      */
  558.     public static Path getJobHistoryLogLocation(String logFileName)
  559.     {
  560.       return LOG_DIR == null ? null : new Path(LOG_DIR, logFileName);
  561.     }
  562.     /**
  563.      * Get the user job history file path
  564.      */
  565.     public static Path getJobHistoryLogLocationForUser(String logFileName, 
  566.                                                        JobConf jobConf) {
  567.       // find user log directory 
  568.       Path userLogFile = null;
  569.       Path outputPath = FileOutputFormat.getOutputPath(jobConf);
  570.       String userLogDir = jobConf.get("hadoop.job.history.user.location",
  571.                                       outputPath == null 
  572.                                       ? null 
  573.                                       : outputPath.toString());
  574.       if ("none".equals(userLogDir)) {
  575.         userLogDir = null;
  576.       }
  577.       if (userLogDir != null) {
  578.         userLogDir = userLogDir + Path.SEPARATOR + "_logs" + Path.SEPARATOR 
  579.                      + "history";
  580.         userLogFile = new Path(userLogDir, logFileName);
  581.       }
  582.       return userLogFile;
  583.     }
  584.     /**
  585.      * Generates the job history filename for a new job
  586.      */
  587.     private static String getNewJobHistoryFileName(JobConf jobConf, JobID id) {
  588.       return JOBTRACKER_UNIQUE_STRING
  589.              + id.toString() + "_" + getUserName(jobConf) + "_" 
  590.              + trimJobName(getJobName(jobConf));
  591.     }
  592.     
  593.     /**
  594.      * Trims the job-name if required
  595.      */
  596.     private static String trimJobName(String jobName) {
  597.       if (jobName.length() > JOB_NAME_TRIM_LENGTH) {
  598.         jobName = jobName.substring(0, JOB_NAME_TRIM_LENGTH);
  599.       }
  600.       return jobName;
  601.     }
  602.     
  603.     private static String escapeRegexChars( String string ) {
  604.       return "\Q"+string.replaceAll("\\E", "\\E\\\\E\\Q")+"\E";
  605.     }
  606.     /**
  607.      * Recover the job history filename from the history folder. 
  608.      * Uses the following pattern
  609.      *    $jt-hostname_[0-9]*_$job-id_$user-$job-name*
  610.      * @param jobConf the job conf
  611.      * @param id job id
  612.      */
  613.     public static synchronized String getJobHistoryFileName(JobConf jobConf, 
  614.                                                             JobID id) 
  615.     throws IOException {
  616.       String user = getUserName(jobConf);
  617.       String jobName = trimJobName(getJobName(jobConf));
  618.       
  619.       FileSystem fs = new Path(LOG_DIR).getFileSystem(jobConf);
  620.       if (LOG_DIR == null) {
  621.         return null;
  622.       }
  623.       // Make the pattern matching the job's history file
  624.       final Pattern historyFilePattern = 
  625.         Pattern.compile(jobtrackerHostname + "_" + DIGITS + "_" 
  626.                         + id.toString() + "_" + user + "_" 
  627.                         + escapeRegexChars(jobName) + "+");
  628.       // a path filter that matches 4 parts of the filenames namely
  629.       //  - jt-hostname
  630.       //  - job-id
  631.       //  - username
  632.       //  - jobname
  633.       PathFilter filter = new PathFilter() {
  634.         public boolean accept(Path path) {
  635.           String fileName = path.getName();
  636.           try {
  637.             fileName = decodeJobHistoryFileName(fileName);
  638.           } catch (IOException ioe) {
  639.             LOG.info("Error while decoding history file " + fileName + "."
  640.                      + " Ignoring file.", ioe);
  641.             return false;
  642.           }
  643.           return historyFilePattern.matcher(fileName).find();
  644.         }
  645.       };
  646.       
  647.       FileStatus[] statuses = fs.listStatus(new Path(LOG_DIR), filter);
  648.       String filename;
  649.       if (statuses.length == 0) {
  650.         filename = 
  651.           encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, id));
  652.         LOG.info("Nothing to recover! Generating a new filename " + filename 
  653.                  + " for job " + id);
  654.       } else {
  655.         // return filename considering that fact the name can be a 
  656.         // secondary filename like filename.recover
  657.         filename = decodeJobHistoryFileName(statuses[0].getPath().getName());
  658.         // Remove the '.recover' suffix if it exists
  659.         if (filename.endsWith(jobName + SECONDARY_FILE_SUFFIX)) {
  660.           int newLength = filename.length() - SECONDARY_FILE_SUFFIX.length();
  661.           filename = filename.substring(0, newLength);
  662.         }
  663.         filename = encodeJobHistoryFileName(filename);
  664.         LOG.info("Recovered job history filename for job " + id + " is " 
  665.                  + filename);
  666.       }
  667.       return filename;
  668.     }
  669.     
  670.     /** Since there was a restart, there should be a master file and 
  671.      * a recovery file. Once the recovery is complete, the master should be 
  672.      * deleted as an indication that the recovery file should be treated as the 
  673.      * master upon completion or next restart.
  674.      * @param fileName the history filename that needs checkpointing
  675.      * @param conf Job conf
  676.      * @throws IOException
  677.      */
  678.     static synchronized void checkpointRecovery(String fileName, JobConf conf) 
  679.     throws IOException {
  680.       Path logPath = JobHistory.JobInfo.getJobHistoryLogLocation(fileName);
  681.       if (logPath != null) {
  682.         FileSystem fs = logPath.getFileSystem(conf);
  683.         LOG.info("Deleting job history file " + logPath.getName());
  684.         fs.delete(logPath, false);
  685.       }
  686.       // do the same for the user file too
  687.       logPath = JobHistory.JobInfo.getJobHistoryLogLocationForUser(fileName, 
  688.                                                                    conf);
  689.       if (logPath != null) {
  690.         FileSystem fs = logPath.getFileSystem(conf);
  691.         fs.delete(logPath, false);
  692.       }
  693.     }
  694.     
  695.     static String getSecondaryJobHistoryFile(String filename) 
  696.     throws IOException {
  697.       return encodeJobHistoryFileName(
  698.           decodeJobHistoryFileName(filename) + SECONDARY_FILE_SUFFIX);
  699.     }
  700.     
  701.     /** Selects one of the two files generated as a part of recovery. 
  702.      * The thumb rule is that always select the oldest file. 
  703.      * This call makes sure that only one file is left in the end. 
  704.      * @param conf job conf
  705.      * @param logFilePath Path of the log file
  706.      * @throws IOException 
  707.      */
  708.     public synchronized static Path recoverJobHistoryFile(JobConf conf, 
  709.                                                           Path logFilePath) 
  710.     throws IOException {
  711.       Path ret;
  712.       FileSystem fs = logFilePath.getFileSystem(conf);
  713.       String logFileName = logFilePath.getName();
  714.       String tmpFilename = getSecondaryJobHistoryFile(logFileName);
  715.       Path logDir = logFilePath.getParent();
  716.       Path tmpFilePath = new Path(logDir, tmpFilename);
  717.       if (fs.exists(logFilePath)) {
  718.         LOG.info(logFileName + " exists!");
  719.         if (fs.exists(tmpFilePath)) {
  720.           LOG.info("Deleting " + tmpFilename 
  721.                    + "  and using " + logFileName + " for recovery.");
  722.           fs.delete(tmpFilePath, false);
  723.         }
  724.         ret = tmpFilePath;
  725.       } else {
  726.         LOG.info(logFileName + " doesnt exist! Using " 
  727.                  + tmpFilename + " for recovery.");
  728.         if (fs.exists(tmpFilePath)) {
  729.           LOG.info("Renaming " + tmpFilename + " to " + logFileName);
  730.           fs.rename(tmpFilePath, logFilePath);
  731.           ret = tmpFilePath;
  732.         } else {
  733.           ret = logFilePath;
  734.         }
  735.       }
  736.       // do the same for the user files too
  737.       logFilePath = getJobHistoryLogLocationForUser(logFileName, conf);
  738.       if (logFilePath != null) {
  739.         fs = logFilePath.getFileSystem(conf);
  740.         logDir = logFilePath.getParent();
  741.         tmpFilePath = new Path(logDir, tmpFilename);
  742.         if (fs.exists(logFilePath)) {
  743.           LOG.info(logFileName + " exists!");
  744.           if (fs.exists(tmpFilePath)) {
  745.             LOG.info("Deleting " + tmpFilename + "  and making " + logFileName 
  746.                      + " as the master history file for user.");
  747.             fs.delete(tmpFilePath, false);
  748.           }
  749.         } else {
  750.           LOG.info(logFileName + " doesnt exist! Using " 
  751.                    + tmpFilename + " as the master history file for user.");
  752.           if (fs.exists(tmpFilePath)) {
  753.             LOG.info("Renaming " + tmpFilename + " to " + logFileName 
  754.                      + " in user directory");
  755.             fs.rename(tmpFilePath, logFilePath);
  756.           }
  757.         }
  758.       }
  759.       
  760.       return ret;
  761.     }
  762.     /** Finalize the recovery and make one file in the end. 
  763.      * This invloves renaming the recover file to the master file.
  764.      * @param id Job id  
  765.      * @param conf the job conf
  766.      * @throws IOException
  767.      */
  768.     static synchronized void finalizeRecovery(JobID id, JobConf conf) 
  769.     throws IOException {
  770.       String masterLogFileName = 
  771.         JobHistory.JobInfo.getJobHistoryFileName(conf, id);
  772.       Path masterLogPath = 
  773.         JobHistory.JobInfo.getJobHistoryLogLocation(masterLogFileName);
  774.       String tmpLogFileName = getSecondaryJobHistoryFile(masterLogFileName);
  775.       Path tmpLogPath = 
  776.         JobHistory.JobInfo.getJobHistoryLogLocation(tmpLogFileName);
  777.       if (masterLogPath != null) {
  778.         FileSystem fs = masterLogPath.getFileSystem(conf);
  779.         // rename the tmp file to the master file. Note that this should be 
  780.         // done only when the file is closed and handles are released.
  781.         if(fs.exists(tmpLogPath)) {
  782.           LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName);
  783.           fs.rename(tmpLogPath, masterLogPath);
  784.         }
  785.       }
  786.       
  787.       // do the same for the user file too
  788.       masterLogPath = 
  789.         JobHistory.JobInfo.getJobHistoryLogLocationForUser(masterLogFileName,
  790.                                                            conf);
  791.       tmpLogPath = 
  792.         JobHistory.JobInfo.getJobHistoryLogLocationForUser(tmpLogFileName, 
  793.                                                            conf);
  794.       if (masterLogPath != null) {
  795.         FileSystem fs = masterLogPath.getFileSystem(conf);
  796.         if (fs.exists(tmpLogPath)) {
  797.           LOG.info("Renaming " + tmpLogFileName + " to " + masterLogFileName
  798.                    + " in user directory");
  799.           fs.rename(tmpLogPath, masterLogPath);
  800.         }
  801.       }
  802.     }
  803.     /**
  804.      * Log job submitted event to history. Creates a new file in history 
  805.      * for the job. if history file creation fails, it disables history 
  806.      * for all other events. 
  807.      * @param jobId job id assigned by job tracker.
  808.      * @param jobConf job conf of the job
  809.      * @param jobConfPath path to job conf xml file in HDFS.
  810.      * @param submitTime time when job tracker received the job
  811.      * @throws IOException
  812.      */
  813.     public static void logSubmitted(JobID jobId, JobConf jobConf, 
  814.                                     String jobConfPath, long submitTime) 
  815.     throws IOException {
  816.       FileSystem fs = null;
  817.       String userLogDir = null;
  818.       String jobUniqueString = JOBTRACKER_UNIQUE_STRING + jobId;
  819.       if (!disableHistory){
  820.         // Get the username and job name to be used in the actual log filename;
  821.         // sanity check them too        
  822.         String jobName = getJobName(jobConf);
  823.         String user = getUserName(jobConf);
  824.         
  825.         // get the history filename
  826.         String logFileName = 
  827.           getJobHistoryFileName(jobConf, jobId);
  828.         // setup the history log file for this job
  829.         Path logFile = getJobHistoryLogLocation(logFileName);
  830.         
  831.         // find user log directory
  832.         Path userLogFile = 
  833.           getJobHistoryLogLocationForUser(logFileName, jobConf);
  834.         try{
  835.           ArrayList<PrintWriter> writers = new ArrayList<PrintWriter>();
  836.           FSDataOutputStream out = null;
  837.           PrintWriter writer = null;
  838.           if (LOG_DIR != null) {
  839.             // create output stream for logging in hadoop.job.history.location
  840.             fs = new Path(LOG_DIR).getFileSystem(jobConf);
  841.             
  842.             logFile = recoverJobHistoryFile(jobConf, logFile);
  843.             logFileName = logFile.getName();
  844.             
  845.             int defaultBufferSize = 
  846.               fs.getConf().getInt("io.file.buffer.size", 4096);
  847.             out = fs.create(logFile, 
  848.                             new FsPermission(HISTORY_FILE_PERMISSION),
  849.                             true, 
  850.                             defaultBufferSize, 
  851.                             fs.getDefaultReplication(), 
  852.                             jobHistoryBlockSize, null);
  853.             writer = new PrintWriter(out);
  854.             writers.add(writer);
  855.           }
  856.           if (userLogFile != null) {
  857.             // Get the actual filename as recoverJobHistoryFile() might return
  858.             // a different filename
  859.             userLogDir = userLogFile.getParent().toString();
  860.             userLogFile = new Path(userLogDir, logFileName);
  861.             
  862.             // create output stream for logging 
  863.             // in hadoop.job.history.user.location
  864.             fs = userLogFile.getFileSystem(jobConf);
  865.  
  866.             out = fs.create(userLogFile, true, 4096);
  867.             writer = new PrintWriter(out);
  868.             writers.add(writer);
  869.           }
  870.           openJobs.put(jobUniqueString, writers);
  871.           
  872.           // Log the history meta info
  873.           JobHistory.MetaInfoManager.logMetaInfo(writers);
  874.           //add to writer as well 
  875.           JobHistory.log(writers, RecordTypes.Job, 
  876.                          new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
  877.                          new String[]{jobId.toString(), jobName, user, 
  878.                                       String.valueOf(submitTime) , jobConfPath}
  879.                         ); 
  880.              
  881.         }catch(IOException e){
  882.           LOG.error("Failed creating job history log file, disabling history", e);
  883.           disableHistory = true; 
  884.         }
  885.       }
  886.       // Always store job conf on local file system 
  887.       String localJobFilePath =  JobInfo.getLocalJobFilePath(jobId); 
  888.       File localJobFile = new File(localJobFilePath);
  889.       FileOutputStream jobOut = null;
  890.       try {
  891.         jobOut = new FileOutputStream(localJobFile);
  892.         jobConf.writeXml(jobOut);
  893.         if (LOG.isDebugEnabled()) {
  894.           LOG.debug("Job conf for " + jobId + " stored at " 
  895.                     + localJobFile.getAbsolutePath());
  896.         }
  897.       } catch (IOException ioe) {
  898.         LOG.error("Failed to store job conf on the local filesystem ", ioe);
  899.       } finally {
  900.         if (jobOut != null) {
  901.           try {
  902.             jobOut.close();
  903.           } catch (IOException ie) {
  904.             LOG.info("Failed to close the job configuration file " 
  905.                        + StringUtils.stringifyException(ie));
  906.           }
  907.         }
  908.       }
  909.       /* Storing the job conf on the log dir */
  910.       Path jobFilePath = null;
  911.       if (LOG_DIR != null) {
  912.         jobFilePath = new Path(LOG_DIR + File.separator + 
  913.                                jobUniqueString + "_conf.xml");
  914.       }
  915.       Path userJobFilePath = null;
  916.       if (userLogDir != null) {
  917.         userJobFilePath = new Path(userLogDir + File.separator +
  918.                                    jobUniqueString + "_conf.xml");
  919.       }
  920.       FSDataOutputStream jobFileOut = null;
  921.       try {
  922.         if (LOG_DIR != null) {
  923.           fs = new Path(LOG_DIR).getFileSystem(jobConf);
  924.           int defaultBufferSize = 
  925.               fs.getConf().getInt("io.file.buffer.size", 4096);
  926.           if (!fs.exists(jobFilePath)) {
  927.             jobFileOut = fs.create(jobFilePath, 
  928.                                    new FsPermission(HISTORY_FILE_PERMISSION),
  929.                                    true, 
  930.                                    defaultBufferSize, 
  931.                                    fs.getDefaultReplication(), 
  932.                                    fs.getDefaultBlockSize(), null);
  933.             jobConf.writeXml(jobFileOut);
  934.             jobFileOut.close();
  935.           }
  936.         } 
  937.         if (userLogDir != null) {
  938.           fs = new Path(userLogDir).getFileSystem(jobConf);
  939.           jobFileOut = fs.create(userJobFilePath);
  940.           jobConf.writeXml(jobFileOut);
  941.         }
  942.         if (LOG.isDebugEnabled()) {
  943.           LOG.debug("Job conf for " + jobId + " stored at " 
  944.                     + jobFilePath + "and" + userJobFilePath );
  945.         }
  946.       } catch (IOException ioe) {
  947.         LOG.error("Failed to store job conf on the local filesystem ", ioe);
  948.       } finally {
  949.         if (jobFileOut != null) {
  950.           try {
  951.             jobFileOut.close();
  952.           } catch (IOException ie) {
  953.             LOG.info("Failed to close the job configuration file " 
  954.                      + StringUtils.stringifyException(ie));
  955.           }
  956.         }
  957.       } 
  958.     }
  959.     /**
  960.      * Logs launch time of job. 
  961.      * 
  962.      * @param jobId job id, assigned by jobtracker. 
  963.      * @param startTime start time of job. 
  964.      * @param totalMaps total maps assigned by jobtracker. 
  965.      * @param totalReduces total reduces. 
  966.      */
  967.     public static void logInited(JobID jobId, long startTime, 
  968.                                  int totalMaps, int totalReduces) {
  969.       if (!disableHistory){
  970.         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
  971.         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
  972.         if (null != writer){
  973.           JobHistory.log(writer, RecordTypes.Job, 
  974.               new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, 
  975.                           Keys.TOTAL_REDUCES, Keys.JOB_STATUS},
  976.               new String[] {jobId.toString(), String.valueOf(startTime), 
  977.                             String.valueOf(totalMaps), 
  978.                             String.valueOf(totalReduces), 
  979.                             Values.PREP.name()}); 
  980.         }
  981.       }
  982.     }
  983.     
  984.    /**
  985.      * Logs the job as RUNNING. 
  986.      *
  987.      * @param jobId job id, assigned by jobtracker. 
  988.      * @param startTime start time of job. 
  989.      * @param totalMaps total maps assigned by jobtracker. 
  990.      * @param totalReduces total reduces. 
  991.      * @deprecated Use {@link #logInited(JobID, long, int, int)} and 
  992.      * {@link #logStarted(JobID)}
  993.      */
  994.     @Deprecated
  995.     public static void logStarted(JobID jobId, long startTime, 
  996.                                   int totalMaps, int totalReduces) {
  997.       logStarted(jobId);
  998.     }
  999.     
  1000.     /**
  1001.      * Logs job as running 
  1002.      * @param jobId job id, assigned by jobtracker. 
  1003.      */
  1004.     public static void logStarted(JobID jobId){
  1005.       if (!disableHistory){
  1006.         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
  1007.         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
  1008.         if (null != writer){
  1009.           JobHistory.log(writer, RecordTypes.Job, 
  1010.               new Keys[] {Keys.JOBID, Keys.JOB_STATUS},
  1011.               new String[] {jobId.toString(),  
  1012.                             Values.RUNNING.name()}); 
  1013.         }
  1014.       }
  1015.     }
  1016.     
  1017.     /**
  1018.      * Log job finished. closes the job file in history. 
  1019.      * @param jobId job id, assigned by jobtracker. 
  1020.      * @param finishTime finish time of job in ms. 
  1021.      * @param finishedMaps no of maps successfully finished. 
  1022.      * @param finishedReduces no of reduces finished sucessfully. 
  1023.      * @param failedMaps no of failed map tasks. 
  1024.      * @param failedReduces no of failed reduce tasks. 
  1025.      * @param counters the counters from the job
  1026.      */ 
  1027.     public static void logFinished(JobID jobId, long finishTime, 
  1028.                                    int finishedMaps, int finishedReduces,
  1029.                                    int failedMaps, int failedReduces,
  1030.                                    Counters counters){
  1031.       if (!disableHistory){
  1032.         // close job file for this job
  1033.         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
  1034.         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
  1035.         if (null != writer){
  1036.           JobHistory.log(writer, RecordTypes.Job,          
  1037.                          new Keys[] {Keys.JOBID, Keys.FINISH_TIME, 
  1038.                                      Keys.JOB_STATUS, Keys.FINISHED_MAPS, 
  1039.                                      Keys.FINISHED_REDUCES,
  1040.                                      Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
  1041.                                      Keys.COUNTERS},
  1042.                          new String[] {jobId.toString(),  Long.toString(finishTime), 
  1043.                                        Values.SUCCESS.name(), 
  1044.                                        String.valueOf(finishedMaps), 
  1045.                                        String.valueOf(finishedReduces),
  1046.                                        String.valueOf(failedMaps), 
  1047.                                        String.valueOf(failedReduces),
  1048.                                        counters.makeEscapedCompactString()});
  1049.           for (PrintWriter out : writer) {
  1050.             out.close();
  1051.           }
  1052.           openJobs.remove(logFileKey); 
  1053.         }
  1054.         Thread historyCleaner  = new Thread(new HistoryCleaner());
  1055.         historyCleaner.start(); 
  1056.       }
  1057.     }
  1058.     /**
  1059.      * Logs job failed event. Closes the job history log file. 
  1060.      * @param jobid job id
  1061.      * @param timestamp time when job failure was detected in ms.  
  1062.      * @param finishedMaps no finished map tasks. 
  1063.      * @param finishedReduces no of finished reduce tasks. 
  1064.      */
  1065.     public static void logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces){
  1066.       if (!disableHistory){
  1067.         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
  1068.         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
  1069.         if (null != writer){
  1070.           JobHistory.log(writer, RecordTypes.Job,
  1071.                          new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
  1072.                          new String[] {jobid.toString(),  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
  1073.                                        String.valueOf(finishedReduces)}); 
  1074.           for (PrintWriter out : writer) {
  1075.             out.close();
  1076.           }
  1077.           openJobs.remove(logFileKey); 
  1078.         }
  1079.       }
  1080.     }
  1081.     /**
  1082.      * Logs job killed event. Closes the job history log file.
  1083.      * 
  1084.      * @param jobid
  1085.      *          job id
  1086.      * @param timestamp
  1087.      *          time when job killed was issued in ms.
  1088.      * @param finishedMaps
  1089.      *          no finished map tasks.
  1090.      * @param finishedReduces
  1091.      *          no of finished reduce tasks.
  1092.      */
  1093.     public static void logKilled(JobID jobid, long timestamp, int finishedMaps,
  1094.         int finishedReduces) {
  1095.       if (!disableHistory) {
  1096.         String logFileKey = JOBTRACKER_UNIQUE_STRING + jobid;
  1097.         ArrayList<PrintWriter> writer = openJobs.get(logFileKey);
  1098.         if (null != writer) {
  1099.           JobHistory.log(writer, RecordTypes.Job, new Keys[] { Keys.JOBID,
  1100.               Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS,
  1101.               Keys.FINISHED_REDUCES }, new String[] { jobid.toString(),
  1102.               String.valueOf(timestamp), Values.KILLED.name(),
  1103.               String.valueOf(finishedMaps), String.valueOf(finishedReduces) });
  1104.           for (PrintWriter out : writer) {
  1105.             out.close();
  1106.           }
  1107.           openJobs.remove(logFileKey);
  1108.         }
  1109.       }
  1110.     }
  1111.     /**
  1112.      * Log job's priority. 
  1113.      * @param jobid job id
  1114.      * @param priority Jobs priority 
  1115.      */
  1116.     public static void logJobPriority(JobID jobid, JobPriority priority){
  1117.       if (!disableHistory){
  1118.         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
  1119.         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
  1120.         if (null != writer){
  1121.           JobHistory.log(writer, RecordTypes.Job,
  1122.                          new Keys[] {Keys.JOBID, Keys.JOB_PRIORITY},
  1123.                          new String[] {jobid.toString(), priority.toString()});
  1124.         }
  1125.       }
  1126.     }
  1127.     /**
  1128.      * Log job's submit-time/launch-time 
  1129.      * @param jobid job id
  1130.      * @param submitTime job's submit time
  1131.      * @param launchTime job's launch time
  1132.      * @param restartCount number of times the job got restarted
  1133.      * @deprecated Use {@link #logJobInfo(JobID, long, long)} instead.
  1134.      */
  1135.     public static void logJobInfo(JobID jobid, long submitTime, long launchTime,
  1136.                                   int restartCount){
  1137.       logJobInfo(jobid, submitTime, launchTime);
  1138.     }
  1139.     public static void logJobInfo(JobID jobid, long submitTime, long launchTime)
  1140.     {
  1141.       if (!disableHistory){
  1142.         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
  1143.         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
  1144.         if (null != writer){
  1145.           JobHistory.log(writer, RecordTypes.Job,
  1146.                          new Keys[] {Keys.JOBID, Keys.SUBMIT_TIME, 
  1147.                                      Keys.LAUNCH_TIME},
  1148.                          new String[] {jobid.toString(), 
  1149.                                        String.valueOf(submitTime), 
  1150.                                        String.valueOf(launchTime)});
  1151.         }
  1152.       }
  1153.     }
  1154.   }
  1155.   
  1156.   /**
  1157.    * Helper class for logging or reading back events related to Task's start, finish or failure. 
  1158.    * All events logged by this class are logged in a separate file per job in 
  1159.    * job tracker history. These events map to TIPs in jobtracker. 
  1160.    */
  1161.   public static class Task extends KeyValuePair{
  1162.     private Map <String, TaskAttempt> taskAttempts = new TreeMap<String, TaskAttempt>(); 
  1163.     /**
  1164.      * Log start time of task (TIP).
  1165.      * @param taskId task id
  1166.      * @param taskType MAP or REDUCE
  1167.      * @param startTime startTime of tip. 
  1168.      */
  1169.     public static void logStarted(TaskID taskId, String taskType, 
  1170.                                   long startTime, String splitLocations) {
  1171.       if (!disableHistory){
  1172.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1173.                                                      + taskId.getJobID()); 
  1174.         if (null != writer){
  1175.           JobHistory.log(writer, RecordTypes.Task, 
  1176.                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE ,
  1177.                                     Keys.START_TIME, Keys.SPLITS}, 
  1178.                          new String[]{taskId.toString(), taskType,
  1179.                                       String.valueOf(startTime),
  1180.                                       splitLocations});
  1181.         }
  1182.       }
  1183.     }
  1184.     /**
  1185.      * Log finish time of task. 
  1186.      * @param taskId task id
  1187.      * @param taskType MAP or REDUCE
  1188.      * @param finishTime finish timeof task in ms
  1189.      */
  1190.     public static void logFinished(TaskID taskId, String taskType, 
  1191.                                    long finishTime, Counters counters){
  1192.       if (!disableHistory){
  1193.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1194.                                                      + taskId.getJobID()); 
  1195.         if (null != writer){
  1196.           JobHistory.log(writer, RecordTypes.Task, 
  1197.                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
  1198.                                     Keys.TASK_STATUS, Keys.FINISH_TIME,
  1199.                                     Keys.COUNTERS}, 
  1200.                          new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(), 
  1201.                                        String.valueOf(finishTime),
  1202.                                        counters.makeEscapedCompactString()});
  1203.         }
  1204.       }
  1205.     }
  1206.     /**
  1207.      * Update the finish time of task. 
  1208.      * @param taskId task id
  1209.      * @param finishTime finish time of task in ms
  1210.      */
  1211.     public static void logUpdates(TaskID taskId, long finishTime){
  1212.       if (!disableHistory){
  1213.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1214.                                                      + taskId.getJobID()); 
  1215.         if (null != writer){
  1216.           JobHistory.log(writer, RecordTypes.Task, 
  1217.                          new Keys[]{Keys.TASKID, Keys.FINISH_TIME}, 
  1218.                          new String[]{ taskId.toString(), 
  1219.                                        String.valueOf(finishTime)});
  1220.         }
  1221.       }
  1222.     }
  1223.     /**
  1224.      * Log job failed event.
  1225.      * @param taskId task id
  1226.      * @param taskType MAP or REDUCE.
  1227.      * @param time timestamp when job failed detected. 
  1228.      * @param error error message for failure. 
  1229.      */
  1230.     public static void logFailed(TaskID taskId, String taskType, long time, String error){
  1231.       logFailed(taskId, taskType, time, error, null);
  1232.     }
  1233.     
  1234.     /**
  1235.      * @param failedDueToAttempt The attempt that caused the failure, if any
  1236.      */
  1237.     public static void logFailed(TaskID taskId, String taskType, long time,
  1238.                                  String error, 
  1239.                                  TaskAttemptID failedDueToAttempt){
  1240.       if (!disableHistory){
  1241.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1242.                                                      + taskId.getJobID()); 
  1243.         if (null != writer){
  1244.           String failedAttempt = failedDueToAttempt == null
  1245.                                  ? ""
  1246.                                  : failedDueToAttempt.toString();
  1247.           JobHistory.log(writer, RecordTypes.Task, 
  1248.                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
  1249.                                     Keys.TASK_STATUS, Keys.FINISH_TIME, 
  1250.                                     Keys.ERROR, Keys.TASK_ATTEMPT_ID}, 
  1251.                          new String[]{ taskId.toString(),  taskType, 
  1252.                                       Values.FAILED.name(), 
  1253.                                       String.valueOf(time) , error, 
  1254.                                       failedAttempt});
  1255.         }
  1256.       }
  1257.     }
  1258.     /**
  1259.      * Returns all task attempts for this task. <task attempt id - TaskAttempt>
  1260.      */
  1261.     public Map<String, TaskAttempt> getTaskAttempts(){
  1262.       return this.taskAttempts;
  1263.     }
  1264.   }
  1265.   /**
  1266.    * Base class for Map and Reduce TaskAttempts. 
  1267.    */
  1268.   public static class TaskAttempt extends Task{} 
  1269.   /**
  1270.    * Helper class for logging or reading back events related to start, finish or failure of 
  1271.    * a Map Attempt on a node.
  1272.    */
  1273.   public static class MapAttempt extends TaskAttempt{
  1274.     /**
  1275.      * Log start time of this map task attempt. 
  1276.      * @param taskAttemptId task attempt id
  1277.      * @param startTime start time of task attempt as reported by task tracker. 
  1278.      * @param hostName host name of the task attempt. 
  1279.      * @deprecated Use 
  1280.      *             {@link #logStarted(TaskAttemptID, long, String, int, String)}
  1281.      */
  1282.     @Deprecated
  1283.     public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
  1284.       logStarted(taskAttemptId, startTime, hostName, -1, Values.MAP.name());
  1285.     }
  1286.     
  1287.     /**
  1288.      * Log start time of this map task attempt.
  1289.      *  
  1290.      * @param taskAttemptId task attempt id
  1291.      * @param startTime start time of task attempt as reported by task tracker. 
  1292.      * @param trackerName name of the tracker executing the task attempt.
  1293.      * @param httpPort http port of the task tracker executing the task attempt
  1294.      * @param taskType Whether the attempt is cleanup or setup or map 
  1295.      */
  1296.     public static void logStarted(TaskAttemptID taskAttemptId, long startTime,
  1297.                                   String trackerName, int httpPort, 
  1298.                                   String taskType) {
  1299.       if (!disableHistory){
  1300.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1301.                                                    + taskAttemptId.getJobID()); 
  1302.         if (null != writer){
  1303.           JobHistory.log(writer, RecordTypes.MapAttempt, 
  1304.                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
  1305.                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
  1306.                                      Keys.TRACKER_NAME, Keys.HTTP_PORT},
  1307.                          new String[]{taskType,
  1308.                                       taskAttemptId.getTaskID().toString(), 
  1309.                                       taskAttemptId.toString(), 
  1310.                                       String.valueOf(startTime), trackerName,
  1311.                                       httpPort == -1 ? "" : 
  1312.                                         String.valueOf(httpPort)}); 
  1313.         }
  1314.       }
  1315.     }
  1316.     
  1317.     /**
  1318.      * Log finish time of map task attempt. 
  1319.      * @param taskAttemptId task attempt id 
  1320.      * @param finishTime finish time
  1321.      * @param hostName host name 
  1322.      * @deprecated Use 
  1323.      * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
  1324.      */
  1325.     @Deprecated
  1326.     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
  1327.                                    String hostName){
  1328.       logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "", 
  1329.                   new Counters());
  1330.     }
  1331.     /**
  1332.      * Log finish time of map task attempt. 
  1333.      * 
  1334.      * @param taskAttemptId task attempt id 
  1335.      * @param finishTime finish time
  1336.      * @param hostName host name 
  1337.      * @param taskType Whether the attempt is cleanup or setup or map 
  1338.      * @param stateString state string of the task attempt
  1339.      * @param counter counters of the task attempt
  1340.      */
  1341.     public static void logFinished(TaskAttemptID taskAttemptId, 
  1342.                                    long finishTime, 
  1343.                                    String hostName,
  1344.                                    String taskType,
  1345.                                    String stateString, 
  1346.                                    Counters counter) {
  1347.       if (!disableHistory){
  1348.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1349.                                                    + taskAttemptId.getJobID()); 
  1350.         if (null != writer){
  1351.           JobHistory.log(writer, RecordTypes.MapAttempt, 
  1352.                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
  1353.                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
  1354.                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
  1355.                                      Keys.STATE_STRING, Keys.COUNTERS},
  1356.                          new String[]{taskType, 
  1357.                                       taskAttemptId.getTaskID().toString(),
  1358.                                       taskAttemptId.toString(), 
  1359.                                       Values.SUCCESS.name(),  
  1360.                                       String.valueOf(finishTime), hostName, 
  1361.                                       stateString, 
  1362.                                       counter.makeEscapedCompactString()}); 
  1363.         }
  1364.       }
  1365.     }
  1366.     /**
  1367.      * Log task attempt failed event.  
  1368.      * @param taskAttemptId task attempt id
  1369.      * @param timestamp timestamp
  1370.      * @param hostName hostname of this task attempt.
  1371.      * @param error error message if any for this task attempt.
  1372.      * @deprecated Use
  1373.      * {@link #logFailed(TaskAttemptID, long, String, String, String)} 
  1374.      */
  1375.     @Deprecated
  1376.     public static void logFailed(TaskAttemptID taskAttemptId, 
  1377.                                  long timestamp, String hostName, 
  1378.                                  String error) {
  1379.       logFailed(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
  1380.     }
  1381.     /**
  1382.      * Log task attempt failed event. 
  1383.      *  
  1384.      * @param taskAttemptId task attempt id
  1385.      * @param timestamp timestamp
  1386.      * @param hostName hostname of this task attempt.
  1387.      * @param error error message if any for this task attempt. 
  1388.      * @param taskType Whether the attempt is cleanup or setup or map 
  1389.      */
  1390.     public static void logFailed(TaskAttemptID taskAttemptId, 
  1391.                                  long timestamp, String hostName, 
  1392.                                  String error, String taskType) {
  1393.       if (!disableHistory){
  1394.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1395.                                                    + taskAttemptId.getJobID()); 
  1396.         if (null != writer){
  1397.           JobHistory.log(writer, RecordTypes.MapAttempt, 
  1398.                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, 
  1399.                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
  1400.                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
  1401.                          new String[]{ taskType, 
  1402.                                        taskAttemptId.getTaskID().toString(),
  1403.                                        taskAttemptId.toString(), 
  1404.                                        Values.FAILED.name(),
  1405.                                        String.valueOf(timestamp), 
  1406.                                        hostName, error}); 
  1407.         }
  1408.       }
  1409.     }
  1410.     
  1411.     /**
  1412.      * Log task attempt killed event.  
  1413.      * @param taskAttemptId task attempt id
  1414.      * @param timestamp timestamp
  1415.      * @param hostName hostname of this task attempt.
  1416.      * @param error error message if any for this task attempt. 
  1417.      * @deprecated Use 
  1418.      * {@link #logKilled(TaskAttemptID, long, String, String, String)}
  1419.      */
  1420.     @Deprecated
  1421.     public static void logKilled(TaskAttemptID taskAttemptId, 
  1422.                                  long timestamp, String hostName, String error){
  1423.       logKilled(taskAttemptId, timestamp, hostName, error, Values.MAP.name());
  1424.     } 
  1425.     
  1426.     /**
  1427.      * Log task attempt killed event.  
  1428.      * 
  1429.      * @param taskAttemptId task attempt id
  1430.      * @param timestamp timestamp
  1431.      * @param hostName hostname of this task attempt.
  1432.      * @param error error message if any for this task attempt. 
  1433.      * @param taskType Whether the attempt is cleanup or setup or map 
  1434.      */
  1435.     public static void logKilled(TaskAttemptID taskAttemptId, 
  1436.                                  long timestamp, String hostName,
  1437.                                  String error, String taskType) {
  1438.       if (!disableHistory){
  1439.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1440.                                                    + taskAttemptId.getJobID()); 
  1441.         if (null != writer){
  1442.           JobHistory.log(writer, RecordTypes.MapAttempt, 
  1443.                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID,
  1444.                                     Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
  1445.                                     Keys.FINISH_TIME, Keys.HOSTNAME,
  1446.                                     Keys.ERROR},
  1447.                          new String[]{ taskType, 
  1448.                                        taskAttemptId.getTaskID().toString(), 
  1449.                                        taskAttemptId.toString(),
  1450.                                        Values.KILLED.name(),
  1451.                                        String.valueOf(timestamp), 
  1452.                                        hostName, error}); 
  1453.         }
  1454.       }
  1455.     } 
  1456.   }
  1457.   /**
  1458.    * Helper class for logging or reading back events related to start, finish or failure of 
  1459.    * a Map Attempt on a node.
  1460.    */
  1461.   public static class ReduceAttempt extends TaskAttempt{
  1462.     /**
  1463.      * Log start time of  Reduce task attempt. 
  1464.      * @param taskAttemptId task attempt id
  1465.      * @param startTime start time
  1466.      * @param hostName host name 
  1467.      * @deprecated Use 
  1468.      * {@link #logStarted(TaskAttemptID, long, String, int, String)}
  1469.      */
  1470.     @Deprecated
  1471.     public static void logStarted(TaskAttemptID taskAttemptId, 
  1472.                                   long startTime, String hostName){
  1473.       logStarted(taskAttemptId, startTime, hostName, -1, Values.REDUCE.name());
  1474.     }
  1475.     
  1476.     /**
  1477.      * Log start time of  Reduce task attempt. 
  1478.      * 
  1479.      * @param taskAttemptId task attempt id
  1480.      * @param startTime start time
  1481.      * @param trackerName tracker name 
  1482.      * @param httpPort the http port of the tracker executing the task attempt
  1483.      * @param taskType Whether the attempt is cleanup or setup or reduce 
  1484.      */
  1485.     public static void logStarted(TaskAttemptID taskAttemptId, 
  1486.                                   long startTime, String trackerName, 
  1487.                                   int httpPort, 
  1488.                                   String taskType) {
  1489.       if (!disableHistory){
  1490.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1491.                                                    + taskAttemptId.getJobID()); 
  1492.         if (null != writer){
  1493.           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
  1494.                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
  1495.                                       Keys.TASK_ATTEMPT_ID, Keys.START_TIME,
  1496.                                       Keys.TRACKER_NAME, Keys.HTTP_PORT},
  1497.                          new String[]{taskType,
  1498.                                       taskAttemptId.getTaskID().toString(), 
  1499.                                       taskAttemptId.toString(), 
  1500.                                       String.valueOf(startTime), trackerName,
  1501.                                       httpPort == -1 ? "" : 
  1502.                                         String.valueOf(httpPort)}); 
  1503.         }
  1504.       }
  1505.     }
  1506.     
  1507.     /**
  1508.      * Log finished event of this task. 
  1509.      * @param taskAttemptId task attempt id
  1510.      * @param shuffleFinished shuffle finish time
  1511.      * @param sortFinished sort finish time
  1512.      * @param finishTime finish time of task
  1513.      * @param hostName host name where task attempt executed
  1514.      * @deprecated Use 
  1515.      * {@link #logFinished(TaskAttemptID, long, long, long, String, String, String, Counters)}
  1516.      */
  1517.     @Deprecated
  1518.     public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
  1519.                                    long sortFinished, long finishTime, 
  1520.                                    String hostName){
  1521.       logFinished(taskAttemptId, shuffleFinished, sortFinished, 
  1522.                   finishTime, hostName, Values.REDUCE.name(),
  1523.                   "", new Counters());
  1524.     }
  1525.     
  1526.     /**
  1527.      * Log finished event of this task. 
  1528.      * 
  1529.      * @param taskAttemptId task attempt id
  1530.      * @param shuffleFinished shuffle finish time
  1531.      * @param sortFinished sort finish time
  1532.      * @param finishTime finish time of task
  1533.      * @param hostName host name where task attempt executed
  1534.      * @param taskType Whether the attempt is cleanup or setup or reduce 
  1535.      * @param stateString the state string of the attempt
  1536.      * @param counter counters of the attempt
  1537.      */
  1538.     public static void logFinished(TaskAttemptID taskAttemptId, 
  1539.                                    long shuffleFinished, 
  1540.                                    long sortFinished, long finishTime, 
  1541.                                    String hostName, String taskType,
  1542.                                    String stateString, Counters counter) {
  1543.       if (!disableHistory){
  1544.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1545.                                                    + taskAttemptId.getJobID()); 
  1546.         if (null != writer){
  1547.           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
  1548.                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
  1549.                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
  1550.                                      Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
  1551.                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
  1552.                                      Keys.STATE_STRING, Keys.COUNTERS},
  1553.                          new String[]{taskType,
  1554.                                       taskAttemptId.getTaskID().toString(), 
  1555.                                       taskAttemptId.toString(), 
  1556.                                       Values.SUCCESS.name(), 
  1557.                                       String.valueOf(shuffleFinished), 
  1558.                                       String.valueOf(sortFinished),
  1559.                                       String.valueOf(finishTime), hostName,
  1560.                                       stateString, 
  1561.                                       counter.makeEscapedCompactString()}); 
  1562.         }
  1563.       }
  1564.     }
  1565.     
  1566.     /**
  1567.      * Log failed reduce task attempt. 
  1568.      * @param taskAttemptId task attempt id
  1569.      * @param timestamp time stamp when task failed
  1570.      * @param hostName host name of the task attempt.  
  1571.      * @param error error message of the task.
  1572.      * @deprecated Use 
  1573.      * {@link #logFailed(TaskAttemptID, long, String, String, String)} 
  1574.      */
  1575.     @Deprecated
  1576.     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
  1577.                                  String hostName, String error){
  1578.       logFailed(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
  1579.     }
  1580.     
  1581.     /**
  1582.      * Log failed reduce task attempt.
  1583.      *  
  1584.      * @param taskAttemptId task attempt id
  1585.      * @param timestamp time stamp when task failed
  1586.      * @param hostName host name of the task attempt.  
  1587.      * @param error error message of the task. 
  1588.      * @param taskType Whether the attempt is cleanup or setup or reduce 
  1589.      */
  1590.     public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
  1591.                                  String hostName, String error, 
  1592.                                  String taskType) {
  1593.       if (!disableHistory){
  1594.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1595.                                                    + taskAttemptId.getJobID()); 
  1596.         if (null != writer){
  1597.           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
  1598.                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
  1599.                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
  1600.                                       Keys.FINISH_TIME, Keys.HOSTNAME,
  1601.                                       Keys.ERROR },
  1602.                          new String[]{ taskType, 
  1603.                                        taskAttemptId.getTaskID().toString(), 
  1604.                                        taskAttemptId.toString(), 
  1605.                                        Values.FAILED.name(), 
  1606.                                        String.valueOf(timestamp), hostName, error }); 
  1607.         }
  1608.       }
  1609.     }
  1610.     
  1611.     /**
  1612.      * Log killed reduce task attempt. 
  1613.      * @param taskAttemptId task attempt id
  1614.      * @param timestamp time stamp when task failed
  1615.      * @param hostName host name of the task attempt.  
  1616.      * @param error error message of the task.
  1617.      * @deprecated Use 
  1618.      * {@link #logKilled(TaskAttemptID, long, String, String, String)} 
  1619.      */
  1620.     @Deprecated
  1621.     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
  1622.                                  String hostName, String error) {
  1623.       logKilled(taskAttemptId, timestamp, hostName, error, Values.REDUCE.name());
  1624.     }
  1625.     
  1626.     /**
  1627.      * Log killed reduce task attempt. 
  1628.      * 
  1629.      * @param taskAttemptId task attempt id
  1630.      * @param timestamp time stamp when task failed
  1631.      * @param hostName host name of the task attempt.  
  1632.      * @param error error message of the task. 
  1633.      * @param taskType Whether the attempt is cleanup or setup or reduce 
  1634.     */
  1635.     public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
  1636.                                  String hostName, String error, 
  1637.                                  String taskType) {
  1638.       if (!disableHistory){
  1639.         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
  1640.                                                    + taskAttemptId.getJobID()); 
  1641.         if (null != writer){
  1642.           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
  1643.                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
  1644.                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
  1645.                                       Keys.FINISH_TIME, Keys.HOSTNAME, 
  1646.                                       Keys.ERROR },
  1647.                          new String[]{ taskType,
  1648.                                        taskAttemptId.getTaskID().toString(), 
  1649.                                        taskAttemptId.toString(), 
  1650.                                        Values.KILLED.name(), 
  1651.                                        String.valueOf(timestamp), 
  1652.                                        hostName, error }); 
  1653.         }
  1654.       }
  1655.     }
  1656.   }
  1657.   /**
  1658.    * Callback interface for reading back log events from JobHistory. This interface 
  1659.    * should be implemented and passed to JobHistory.parseHistory() 
  1660.    *
  1661.    */
  1662.   public static interface Listener{
  1663.     /**
  1664.      * Callback method for history parser. 
  1665.      * @param recType type of record, which is the first entry in the line. 
  1666.      * @param values a map of key-value pairs as thry appear in history.
  1667.      * @throws IOException
  1668.      */
  1669.     public void handle(RecordTypes recType, Map<Keys, String> values) throws IOException; 
  1670.   }
  1671.   
  1672.   /**
  1673.    * Delete history files older than one month. Update master index and remove all 
  1674.    * jobs older than one month. Also if a job tracker has no jobs in last one month
  1675.    * remove reference to the job tracker. 
  1676.    *
  1677.    */
  1678.   public static class HistoryCleaner implements Runnable{
  1679.     static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
  1680.     static final long THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS;
  1681.     private long now; 
  1682.     private static boolean isRunning = false; 
  1683.     private static long lastRan = 0; 
  1684.     /**
  1685.      * Cleans up history data. 
  1686.      */
  1687.     public void run(){
  1688.       if (isRunning){
  1689.         return; 
  1690.       }
  1691.       now = System.currentTimeMillis();
  1692.       // clean history only once a day at max
  1693.       if (lastRan != 0 && (now - lastRan) < ONE_DAY_IN_MS) {
  1694.         return; 
  1695.       }
  1696.       lastRan = now;  
  1697.       isRunning = true; 
  1698.       try {
  1699.         Path logDir = new Path(LOG_DIR);
  1700.         FileSystem fs = logDir.getFileSystem(jtConf);
  1701.         FileStatus[] historyFiles = fs.listStatus(logDir);
  1702.         // delete if older than 30 days
  1703.         if (historyFiles != null) {
  1704.           for (FileStatus f : historyFiles) {
  1705.             if (now - f.getModificationTime() > THIRTY_DAYS_IN_MS) {
  1706.               fs.delete(f.getPath(), true); 
  1707.               LOG.info("Deleting old history file : " + f.getPath());
  1708.             }
  1709.           }
  1710.         }
  1711.       } catch (IOException ie) {
  1712.         LOG.info("Error cleaning up history directory" + 
  1713.                  StringUtils.stringifyException(ie));
  1714.       }
  1715.       isRunning = false; 
  1716.     }
  1717.     
  1718.     static long getLastRan() {
  1719.       return lastRan;
  1720.     }
  1721.   }
  1722.   /**
  1723.    * Return the TaskLogsUrl of a particular TaskAttempt
  1724.    * 
  1725.    * @param attempt
  1726.    * @return the taskLogsUrl. null if http-port or tracker-name or
  1727.    *         task-attempt-id are unavailable.
  1728.    */
  1729.   public static String getTaskLogsUrl(JobHistory.TaskAttempt attempt) {
  1730.     if (attempt.get(Keys.HTTP_PORT).equals("")
  1731.         || attempt.get(Keys.TRACKER_NAME).equals("")
  1732.         || attempt.get(Keys.TASK_ATTEMPT_ID).equals("")) {
  1733.       return null;
  1734.     }
  1735.     String taskTrackerName =
  1736.       JobInProgress.convertTrackerNameToHostName(
  1737.         attempt.get(Keys.TRACKER_NAME));
  1738.     return TaskLogServlet.getTaskLogUrl(taskTrackerName, attempt
  1739.         .get(Keys.HTTP_PORT), attempt.get(Keys.TASK_ATTEMPT_ID));
  1740.   }
  1741. }