TaskLog.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:20k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.BufferedOutputStream;
  20. import java.io.BufferedReader;
  21. import java.io.DataOutputStream;
  22. import java.io.File;
  23. import java.io.FileFilter;
  24. import java.io.FileInputStream;
  25. import java.io.FileOutputStream;
  26. import java.io.IOException;
  27. import java.io.InputStream;
  28. import java.util.ArrayList;
  29. import java.util.Enumeration;
  30. import java.util.List;
  31. import org.apache.commons.logging.Log;
  32. import org.apache.commons.logging.LogFactory;
  33. import org.apache.hadoop.conf.Configuration;
  34. import org.apache.hadoop.fs.FileSystem;
  35. import org.apache.hadoop.fs.LocalFileSystem;
  36. import org.apache.hadoop.fs.FileUtil;
  37. import org.apache.hadoop.fs.Path;
  38. import org.apache.log4j.Appender;
  39. import org.apache.log4j.LogManager;
  40. import org.apache.log4j.Logger;
  41. /**
  42.  * A simple logger to handle the task-specific user logs.
  43.  * This class uses the system property <code>hadoop.log.dir</code>.
  44.  * 
  45.  */
  46. public class TaskLog {
  47.   private static final Log LOG =
  48.     LogFactory.getLog(TaskLog.class.getName());
  49.   private static final File LOG_DIR = 
  50.     new File(System.getProperty("hadoop.log.dir"), 
  51.              "userlogs").getAbsoluteFile();
  52.   
  53.   static LocalFileSystem localFS = null;
  54.   static {
  55.     try {
  56.       localFS = FileSystem.getLocal(new Configuration());
  57.     } catch (IOException ioe) {
  58.       LOG.warn("Getting local file system failed.");
  59.     }
  60.     if (!LOG_DIR.exists()) {
  61.       LOG_DIR.mkdirs();
  62.     }
  63.   }
  64.   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
  65.     return new File(getBaseDir(taskid.toString()), filter.toString());
  66.   }
  67.   public static File getRealTaskLogFileLocation(TaskAttemptID taskid, 
  68.       LogName filter) {
  69.     LogFileDetail l;
  70.     try {
  71.       l = getTaskLogFileDetail(taskid, filter);
  72.     } catch (IOException ie) {
  73.       LOG.error("getTaskLogFileDetail threw an exception " + ie);
  74.       return null;
  75.     }
  76.     return new File(getBaseDir(l.location), filter.toString());
  77.   }
  78.   private static class LogFileDetail {
  79.     final static String LOCATION = "LOG_DIR:";
  80.     String location;
  81.     long start;
  82.     long length;
  83.   }
  84.   
  85.   private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
  86.       LogName filter) throws IOException {
  87.     return getLogFileDetail(taskid, filter, false);
  88.   }
  89.   
  90.   private static LogFileDetail getLogFileDetail(TaskAttemptID taskid, 
  91.                                                 LogName filter,
  92.                                                 boolean isCleanup) 
  93.   throws IOException {
  94.     File indexFile = getIndexFile(taskid.toString(), isCleanup);
  95.     BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
  96.     //the format of the index file is
  97.     //LOG_DIR: <the dir where the task logs are really stored>
  98.     //stdout:<start-offset in the stdout file> <length>
  99.     //stderr:<start-offset in the stderr file> <length>
  100.     //syslog:<start-offset in the syslog file> <length>
  101.     LogFileDetail l = new LogFileDetail();
  102.     String str = fis.readLine();
  103.     if (str == null) { //the file doesn't have anything
  104.       throw new IOException ("Index file for the log of " + taskid+" doesn't exist.");
  105.     }
  106.     l.location = str.substring(str.indexOf(LogFileDetail.LOCATION)+
  107.         LogFileDetail.LOCATION.length());
  108.     //special cases are the debugout and profile.out files. They are guaranteed
  109.     //to be associated with each task attempt since jvm reuse is disabled
  110.     //when profiling/debugging is enabled
  111.     if (filter.equals(LogName.DEBUGOUT) || filter.equals(LogName.PROFILE)) {
  112.       l.length = new File(getBaseDir(l.location), filter.toString()).length();
  113.       l.start = 0;
  114.       fis.close();
  115.       return l;
  116.     }
  117.     str = fis.readLine();
  118.     while (str != null) {
  119.       //look for the exact line containing the logname
  120.       if (str.contains(filter.toString())) {
  121.         str = str.substring(filter.toString().length()+1);
  122.         String[] startAndLen = str.split(" ");
  123.         l.start = Long.parseLong(startAndLen[0]);
  124.         l.length = Long.parseLong(startAndLen[1]);
  125.         break;
  126.       }
  127.       str = fis.readLine();
  128.     }
  129.     fis.close();
  130.     return l;
  131.   }
  132.   
  133.   private static File getTmpIndexFile(String taskid) {
  134.     return new File(getBaseDir(taskid), "log.tmp");
  135.   }
  136.   public static File getIndexFile(String taskid) {
  137.     return getIndexFile(taskid, false);
  138.   }
  139.   
  140.   public static File getIndexFile(String taskid, boolean isCleanup) {
  141.     if (isCleanup) {
  142.       return new File(getBaseDir(taskid), "log.index.cleanup");
  143.     } else {
  144.       return new File(getBaseDir(taskid), "log.index");
  145.     }
  146.   }
  147.   
  148.   private static File getBaseDir(String taskid) {
  149.     return new File(LOG_DIR, taskid);
  150.   }
  151.   private static long prevOutLength;
  152.   private static long prevErrLength;
  153.   private static long prevLogLength;
  154.   
  155.   private static void writeToIndexFile(TaskAttemptID firstTaskid,
  156.                                        boolean isCleanup) 
  157.   throws IOException {
  158.     // To ensure atomicity of updates to index file, write to temporary index
  159.     // file first and then rename.
  160.     File tmpIndexFile = getTmpIndexFile(currentTaskid.toString());
  161.     
  162.     BufferedOutputStream bos = 
  163.       new BufferedOutputStream(new FileOutputStream(tmpIndexFile,false));
  164.     DataOutputStream dos = new DataOutputStream(bos);
  165.     //the format of the index file is
  166.     //LOG_DIR: <the dir where the task logs are really stored>
  167.     //STDOUT: <start-offset in the stdout file> <length>
  168.     //STDERR: <start-offset in the stderr file> <length>
  169.     //SYSLOG: <start-offset in the syslog file> <length>    
  170.     dos.writeBytes(LogFileDetail.LOCATION + firstTaskid.toString()+"n"+
  171.         LogName.STDOUT.toString()+":");
  172.     dos.writeBytes(Long.toString(prevOutLength)+" ");
  173.     dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDOUT)
  174.         .length() - prevOutLength)+"n"+LogName.STDERR+":");
  175.     dos.writeBytes(Long.toString(prevErrLength)+" ");
  176.     dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.STDERR)
  177.         .length() - prevErrLength)+"n"+LogName.SYSLOG.toString()+":");
  178.     dos.writeBytes(Long.toString(prevLogLength)+" ");
  179.     dos.writeBytes(Long.toString(getTaskLogFile(firstTaskid, LogName.SYSLOG)
  180.         .length() - prevLogLength)+"n");
  181.     dos.close();
  182.     File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
  183.     Path indexFilePath = new Path(indexFile.getAbsolutePath());
  184.     Path tmpIndexFilePath = new Path(tmpIndexFile.getAbsolutePath());
  185.     localFS.rename (tmpIndexFilePath, indexFilePath);
  186.   }
  187.   private static void resetPrevLengths(TaskAttemptID firstTaskid) {
  188.     prevOutLength = getTaskLogFile(firstTaskid, LogName.STDOUT).length();
  189.     prevErrLength = getTaskLogFile(firstTaskid, LogName.STDERR).length();
  190.     prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
  191.   }
  192.   private volatile static TaskAttemptID currentTaskid = null;
  193.   public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
  194.                                            TaskAttemptID taskid) 
  195.   throws IOException {
  196.     syncLogs(firstTaskid, taskid, false);
  197.   }
  198.   
  199.   @SuppressWarnings("unchecked")
  200.   public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
  201.                                            TaskAttemptID taskid,
  202.                                            boolean isCleanup) 
  203.   throws IOException {
  204.     System.out.flush();
  205.     System.err.flush();
  206.     Enumeration<Logger> allLoggers = LogManager.getCurrentLoggers();
  207.     while (allLoggers.hasMoreElements()) {
  208.       Logger l = allLoggers.nextElement();
  209.       Enumeration<Appender> allAppenders = l.getAllAppenders();
  210.       while (allAppenders.hasMoreElements()) {
  211.         Appender a = allAppenders.nextElement();
  212.         if (a instanceof TaskLogAppender) {
  213.           ((TaskLogAppender)a).flush();
  214.         }
  215.       }
  216.     }
  217.     if (currentTaskid != taskid) {
  218.       currentTaskid = taskid;
  219.       resetPrevLengths(firstTaskid);
  220.     }
  221.     writeToIndexFile(firstTaskid, isCleanup);
  222.   }
  223.   
  224.   /**
  225.    * The filter for userlogs.
  226.    */
  227.   public static enum LogName {
  228.     /** Log on the stdout of the task. */
  229.     STDOUT ("stdout"),
  230.     /** Log on the stderr of the task. */
  231.     STDERR ("stderr"),
  232.     
  233.     /** Log on the map-reduce system logs of the task. */
  234.     SYSLOG ("syslog"),
  235.     
  236.     /** The java profiler information. */
  237.     PROFILE ("profile.out"),
  238.     
  239.     /** Log the debug script's stdout  */
  240.     DEBUGOUT ("debugout");
  241.         
  242.     private String prefix;
  243.     
  244.     private LogName(String prefix) {
  245.       this.prefix = prefix;
  246.     }
  247.     
  248.     @Override
  249.     public String toString() {
  250.       return prefix;
  251.     }
  252.   }
  253.   private static class TaskLogsPurgeFilter implements FileFilter {
  254.     long purgeTimeStamp;
  255.   
  256.     TaskLogsPurgeFilter(long purgeTimeStamp) {
  257.       this.purgeTimeStamp = purgeTimeStamp;
  258.     }
  259.     public boolean accept(File file) {
  260.       LOG.debug("PurgeFilter - file: " + file + ", mtime: " + file.lastModified() + ", purge: " + purgeTimeStamp);
  261.       return file.lastModified() < purgeTimeStamp;
  262.     }
  263.   }
  264.   /**
  265.    * Purge old user logs.
  266.    * 
  267.    * @throws IOException
  268.    */
  269.   public static synchronized void cleanup(int logsRetainHours
  270.                                           ) throws IOException {
  271.     // Purge logs of tasks on this tasktracker if their  
  272.     // mtime has exceeded "mapred.task.log.retain" hours
  273.     long purgeTimeStamp = System.currentTimeMillis() - 
  274.                             (logsRetainHours*60L*60*1000);
  275.     File[] oldTaskLogs = LOG_DIR.listFiles
  276.                            (new TaskLogsPurgeFilter(purgeTimeStamp));
  277.     if (oldTaskLogs != null) {
  278.       for (int i=0; i < oldTaskLogs.length; ++i) {
  279.         FileUtil.fullyDelete(oldTaskLogs[i]);
  280.       }
  281.     }
  282.   }
  283.   static class Reader extends InputStream {
  284.     private long bytesRemaining;
  285.     private FileInputStream file;
  286.     public Reader(TaskAttemptID taskid, LogName kind, 
  287.                   long start, long end) throws IOException {
  288.       this(taskid, kind, start, end, false);
  289.     }
  290.     
  291.     /**
  292.      * Read a log file from start to end positions. The offsets may be negative,
  293.      * in which case they are relative to the end of the file. For example,
  294.      * Reader(taskid, kind, 0, -1) is the entire file and 
  295.      * Reader(taskid, kind, -4197, -1) is the last 4196 bytes. 
  296.      * @param taskid the id of the task to read the log file for
  297.      * @param kind the kind of log to read
  298.      * @param start the offset to read from (negative is relative to tail)
  299.      * @param end the offset to read upto (negative is relative to tail)
  300.      * @param isCleanup whether the attempt is cleanup attempt or not
  301.      * @throws IOException
  302.      */
  303.     public Reader(TaskAttemptID taskid, LogName kind, 
  304.                   long start, long end, boolean isCleanup) throws IOException {
  305.       // find the right log file
  306.       LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup);
  307.       // calculate the start and stop
  308.       long size = fileDetail.length;
  309.       if (start < 0) {
  310.         start += size + 1;
  311.       }
  312.       if (end < 0) {
  313.         end += size + 1;
  314.       }
  315.       start = Math.max(0, Math.min(start, size));
  316.       end = Math.max(0, Math.min(end, size));
  317.       start += fileDetail.start;
  318.       end += fileDetail.start;
  319.       bytesRemaining = end - start;
  320.       file = new FileInputStream(new File(getBaseDir(fileDetail.location), 
  321.           kind.toString()));
  322.       // skip upto start
  323.       long pos = 0;
  324.       while (pos < start) {
  325.         long result = file.skip(start - pos);
  326.         if (result < 0) {
  327.           bytesRemaining = 0;
  328.           break;
  329.         }
  330.         pos += result;
  331.       }
  332.     }
  333.     
  334.     @Override
  335.     public int read() throws IOException {
  336.       int result = -1;
  337.       if (bytesRemaining > 0) {
  338.         bytesRemaining -= 1;
  339.         result = file.read();
  340.       }
  341.       return result;
  342.     }
  343.     
  344.     @Override
  345.     public int read(byte[] buffer, int offset, int length) throws IOException {
  346.       length = (int) Math.min(length, bytesRemaining);
  347.       int bytes = file.read(buffer, offset, length);
  348.       if (bytes > 0) {
  349.         bytesRemaining -= bytes;
  350.       }
  351.       return bytes;
  352.     }
  353.     
  354.     @Override
  355.     public int available() throws IOException {
  356.       return (int) Math.min(bytesRemaining, file.available());
  357.     }
  358.     @Override
  359.     public void close() throws IOException {
  360.       file.close();
  361.     }
  362.   }
  363.   private static final String bashCommand = "bash";
  364.   private static final String tailCommand = "tail";
  365.   
  366.   /**
  367.    * Get the desired maximum length of task's logs.
  368.    * @param conf the job to look in
  369.    * @return the number of bytes to cap the log files at
  370.    */
  371.   public static long getTaskLogLength(JobConf conf) {
  372.     return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
  373.   }
  374.   /**
  375.    * Wrap a command in a shell to capture stdout and stderr to files.
  376.    * If the tailLength is 0, the entire output will be saved.
  377.    * @param cmd The command and the arguments that should be run
  378.    * @param stdoutFilename The filename that stdout should be saved to
  379.    * @param stderrFilename The filename that stderr should be saved to
  380.    * @param tailLength The length of the tail to be saved.
  381.    * @return the modified command that should be run
  382.    */
  383.   public static List<String> captureOutAndError(List<String> cmd, 
  384.                                                 File stdoutFilename,
  385.                                                 File stderrFilename,
  386.                                                 long tailLength
  387.                                                ) throws IOException {
  388.     return captureOutAndError(null, cmd, stdoutFilename,
  389.                               stderrFilename, tailLength, null );
  390.   }
  391.   /**
  392.    * Wrap a command in a shell to capture stdout and stderr to files.
  393.    * Setup commands such as setting memory limit can be passed which 
  394.    * will be executed before exec.
  395.    * If the tailLength is 0, the entire output will be saved.
  396.    * @param setup The setup commands for the execed process.
  397.    * @param cmd The command and the arguments that should be run
  398.    * @param stdoutFilename The filename that stdout should be saved to
  399.    * @param stderrFilename The filename that stderr should be saved to
  400.    * @param tailLength The length of the tail to be saved.
  401.    * @return the modified command that should be run
  402.    */
  403.   public static List<String> captureOutAndError(List<String> setup,
  404.                                                 List<String> cmd, 
  405.                                                 File stdoutFilename,
  406.                                                 File stderrFilename,
  407.                                                 long tailLength
  408.                                                ) throws IOException {
  409.     return captureOutAndError(setup, cmd, stdoutFilename, stderrFilename,
  410.         tailLength, null);
  411.   }
  412.   /**
  413.    * Wrap a command in a shell to capture stdout and stderr to files.
  414.    * Setup commands such as setting memory limit can be passed which 
  415.    * will be executed before exec.
  416.    * If the tailLength is 0, the entire output will be saved.
  417.    * @param setup The setup commands for the execed process.
  418.    * @param cmd The command and the arguments that should be run
  419.    * @param stdoutFilename The filename that stdout should be saved to
  420.    * @param stderrFilename The filename that stderr should be saved to
  421.    * @param tailLength The length of the tail to be saved.
  422.    * @param pidFileName The name of the pid-file
  423.    * @return the modified command that should be run
  424.    */
  425.   public static List<String> captureOutAndError(List<String> setup,
  426.                                                 List<String> cmd, 
  427.                                                 File stdoutFilename,
  428.                                                 File stderrFilename,
  429.                                                 long tailLength,
  430.                                                 String pidFileName
  431.                                                ) throws IOException {
  432.     String stdout = FileUtil.makeShellPath(stdoutFilename);
  433.     String stderr = FileUtil.makeShellPath(stderrFilename);
  434.     List<String> result = new ArrayList<String>(3);
  435.     result.add(bashCommand);
  436.     result.add("-c");
  437.     StringBuffer mergedCmd = new StringBuffer();
  438.     
  439.     // Spit out the pid to pidFileName
  440.     if (pidFileName != null) {
  441.       mergedCmd.append("echo $$ > ");
  442.       mergedCmd.append(pidFileName);
  443.       mergedCmd.append(" ;");
  444.     }
  445.     if (setup != null && setup.size() > 0) {
  446.       mergedCmd.append(addCommand(setup, false));
  447.       mergedCmd.append(";");
  448.     }
  449.     if (tailLength > 0) {
  450.       mergedCmd.append("(");
  451.     } else {
  452.       mergedCmd.append("exec ");
  453.     }
  454.     mergedCmd.append(addCommand(cmd, true));
  455.     mergedCmd.append(" < /dev/null ");
  456.     if (tailLength > 0) {
  457.       mergedCmd.append(" | ");
  458.       mergedCmd.append(tailCommand);
  459.       mergedCmd.append(" -c ");
  460.       mergedCmd.append(tailLength);
  461.       mergedCmd.append(" >> ");
  462.       mergedCmd.append(stdout);
  463.       mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
  464.       mergedCmd.append(tailCommand);
  465.       mergedCmd.append(" -c ");
  466.       mergedCmd.append(tailLength);
  467.       mergedCmd.append(" >> ");
  468.       mergedCmd.append(stderr);
  469.       mergedCmd.append(" ; exit $PIPESTATUS");
  470.     } else {
  471.       mergedCmd.append(" 1>> ");
  472.       mergedCmd.append(stdout);
  473.       mergedCmd.append(" 2>> ");
  474.       mergedCmd.append(stderr);
  475.     }
  476.     result.add(mergedCmd.toString());
  477.     return result;
  478.   }
  479.   /**
  480.    * Add quotes to each of the command strings and
  481.    * return as a single string 
  482.    * @param cmd The command to be quoted
  483.    * @param isExecutable makes shell path if the first 
  484.    * argument is executable
  485.    * @return returns The quoted string. 
  486.    * @throws IOException
  487.    */
  488.   public static String addCommand(List<String> cmd, boolean isExecutable) 
  489.   throws IOException {
  490.     StringBuffer command = new StringBuffer();
  491.     for(String s: cmd) {
  492.      command.append(''');
  493.       if (isExecutable) {
  494.         // the executable name needs to be expressed as a shell path for the  
  495.         // shell to find it.
  496.        command.append(FileUtil.makeShellPath(new File(s)));
  497.         isExecutable = false; 
  498.       } else {
  499.        command.append(s);
  500.       }
  501.       command.append(''');
  502.       command.append(" ");
  503.     }
  504.     return command.toString();
  505.   }
  506.   
  507.   /**
  508.    * Wrap a command in a shell to capture debug script's 
  509.    * stdout and stderr to debugout.
  510.    * @param cmd The command and the arguments that should be run
  511.    * @param debugoutFilename The filename that stdout and stderr
  512.    *  should be saved to.
  513.    * @return the modified command that should be run
  514.    * @throws IOException
  515.    */
  516.   public static List<String> captureDebugOut(List<String> cmd, 
  517.                                              File debugoutFilename
  518.                                             ) throws IOException {
  519.     String debugout = FileUtil.makeShellPath(debugoutFilename);
  520.     List<String> result = new ArrayList<String>(3);
  521.     result.add(bashCommand);
  522.     result.add("-c");
  523.     StringBuffer mergedCmd = new StringBuffer();
  524.     mergedCmd.append("exec ");
  525.     boolean isExecutable = true;
  526.     for(String s: cmd) {
  527.       if (isExecutable) {
  528.         // the executable name needs to be expressed as a shell path for the  
  529.         // shell to find it.
  530.         mergedCmd.append(FileUtil.makeShellPath(new File(s)));
  531.         isExecutable = false; 
  532.       } else {
  533.         mergedCmd.append(s);
  534.       }
  535.       mergedCmd.append(" ");
  536.     }
  537.     mergedCmd.append(" < /dev/null ");
  538.     mergedCmd.append(" >");
  539.     mergedCmd.append(debugout);
  540.     mergedCmd.append(" 2>&1 ");
  541.     result.add(mergedCmd.toString());
  542.     return result;
  543.   }
  544.   
  545. } // TaskLog