Logalyzer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.tools;
  19. import java.io.ByteArrayInputStream;
  20. import java.io.DataInputStream;
  21. import java.io.IOException;
  22. import java.util.Random;
  23. import java.util.regex.Matcher;
  24. import java.util.regex.Pattern;
  25. import org.apache.commons.logging.Log;
  26. import org.apache.commons.logging.LogFactory;
  27. import org.apache.hadoop.conf.Configurable;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.io.LongWritable;
  32. import org.apache.hadoop.io.Text;
  33. import org.apache.hadoop.io.WritableComparable;
  34. import org.apache.hadoop.io.WritableComparator;
  35. import org.apache.hadoop.mapred.FileInputFormat;
  36. import org.apache.hadoop.mapred.FileOutputFormat;
  37. import org.apache.hadoop.mapred.JobClient;
  38. import org.apache.hadoop.mapred.JobConf;
  39. import org.apache.hadoop.mapred.MapReduceBase;
  40. import org.apache.hadoop.mapred.Mapper;
  41. import org.apache.hadoop.mapred.OutputCollector;
  42. import org.apache.hadoop.mapred.Reporter;
  43. import org.apache.hadoop.mapred.TextInputFormat;
  44. import org.apache.hadoop.mapred.TextOutputFormat;
  45. import org.apache.hadoop.mapred.lib.LongSumReducer;
  46. /**
  47.  * Logalyzer: A utility tool for archiving and analyzing hadoop logs.
  48.  * <p>
  49.  * This tool supports archiving and anaylzing (sort/grep) of log-files.
  50.  * It takes as input
  51.  *  a) Input uri which will serve uris of the logs to be archived.
  52.  *  b) Output directory (not mandatory).
  53.  *  b) Directory on dfs to archive the logs. 
  54.  *  c) The sort/grep patterns for analyzing the files and separator for boundaries.
  55.  * Usage: 
  56.  * Logalyzer -archive -archiveDir <directory to archive logs> -analysis <directory> -logs <log-list uri> -grep <pattern> -sort <col1, col2> -separator <separator>   
  57.  * <p>
  58.  */
  59. public class Logalyzer {
  60.   // Constants
  61.   private static Configuration fsConfig = new Configuration();
  62.   
  63.   /** A {@link Mapper} that extracts text matching a regular expression. */
  64.   public static class LogRegexMapper<K extends WritableComparable>
  65.     extends MapReduceBase
  66.     implements Mapper<K, Text, Text, LongWritable> {
  67.     
  68.     private Pattern pattern;
  69.     
  70.     public void configure(JobConf job) {
  71.       pattern = Pattern.compile(job.get("mapred.mapper.regex"));
  72.     }
  73.     
  74.     public void map(K key, Text value,
  75.                     OutputCollector<Text, LongWritable> output,
  76.                     Reporter reporter)
  77.       throws IOException {
  78.       String text = value.toString();
  79.       Matcher matcher = pattern.matcher(text);
  80.       while (matcher.find()) {
  81.         output.collect(value, new LongWritable(1));
  82.       }
  83.     }
  84.     
  85.   }
  86.   
  87.   /** A WritableComparator optimized for UTF8 keys of the logs. */
  88.   public static class LogComparator extends Text.Comparator implements Configurable {
  89.     
  90.     private static Log LOG = LogFactory.getLog(Logalyzer.class);
  91.     private JobConf conf = null;
  92.     private String[] sortSpec = null;
  93.     private String columnSeparator = null;
  94.     
  95.     public void setConf(Configuration conf) {
  96.       if (conf instanceof JobConf) {
  97.         this.conf = (JobConf) conf;
  98.       } else {
  99.         this.conf = new JobConf(conf);
  100.       }
  101.       
  102.       //Initialize the specification for *comparision*
  103.       String sortColumns = this.conf.get("mapred.reducer.sort", null);
  104.       if (sortColumns != null) {
  105.         sortSpec = sortColumns.split(",");
  106.       }
  107.       
  108.       //Column-separator
  109.       columnSeparator = this.conf.get("mapred.reducer.separator", "");
  110.     }
  111.     
  112.     public Configuration getConf() {
  113.       return conf;
  114.     }
  115.     
  116.     public int compare(byte[] b1, int s1, int l1,
  117.                        byte[] b2, int s2, int l2) {
  118.       
  119.       if (sortSpec == null) {
  120.         return super.compare(b1, s1, l1, b2, s2, l2);
  121.       }
  122.       
  123.       try {
  124.         Text logline1 = new Text(); 
  125.         logline1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
  126.         String line1 = logline1.toString();
  127.         String[] logColumns1 = line1.split(columnSeparator);
  128.         
  129.         Text logline2 = new Text(); 
  130.         logline2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
  131.         String line2 = logline2.toString();
  132.         String[] logColumns2 = line2.split(columnSeparator);
  133.         
  134.         if (logColumns1 == null || logColumns2 == null) {
  135.           return super.compare(b1, s1, l1, b2, s2, l2);
  136.         }
  137.         
  138.         //Compare column-wise according to *sortSpec*
  139.         for(int i=0; i < sortSpec.length; ++i) {
  140.           int column = (Integer.valueOf(sortSpec[i]).intValue());
  141.           String c1 = logColumns1[column]; 
  142.           String c2 = logColumns2[column];
  143.           
  144.           //Compare columns
  145.           int comparision = super.compareBytes(
  146.                                                c1.getBytes(), 0, c1.length(),
  147.                                                c2.getBytes(), 0, c2.length()
  148.                                                );
  149.           
  150.           //They differ!
  151.           if (comparision != 0) {
  152.             return comparision;
  153.           }
  154.         }
  155.         
  156.       } catch (IOException ioe) {
  157.         LOG.fatal("Caught " + ioe);
  158.         return 0;
  159.       }
  160.       
  161.       return 0;
  162.     }
  163.     
  164.     static {                                        
  165.       // register this comparator
  166.       WritableComparator.define(Text.class, new LogComparator());
  167.     }
  168.   }
  169.   
  170.   /**
  171.    * doArchive: Workhorse function to archive log-files.
  172.    * @param logListURI : The uri which will serve list of log-files to archive.
  173.    * @param archiveDirectory : The directory to store archived logfiles.
  174.    * @throws IOException
  175.    */
  176.   public void
  177.     doArchive(String logListURI, String archiveDirectory)
  178.     throws IOException
  179.   {
  180.     String destURL = FileSystem.getDefaultUri(fsConfig) + archiveDirectory;
  181.     DistCp.copy(new JobConf(fsConfig), logListURI, destURL, null, true, false);
  182.   }
  183.   
  184.   /**
  185.    * doAnalyze: 
  186.    * @param inputFilesDirectory : Directory containing the files to be analyzed.
  187.    * @param outputDirectory : Directory to store analysis (output).
  188.    * @param grepPattern : Pattern to *grep* for.
  189.    * @param sortColumns : Sort specification for output.
  190.    * @param columnSeparator : Column separator.
  191.    * @throws IOException
  192.    */
  193.   public void
  194.     doAnalyze(String inputFilesDirectory, String outputDirectory,
  195.               String grepPattern, String sortColumns, String columnSeparator)
  196.     throws IOException
  197.   {
  198.     Path grepInput = new Path(inputFilesDirectory);
  199.     
  200.     Path analysisOutput = null;
  201.     if (outputDirectory.equals("")) {
  202.       analysisOutput =  new Path(inputFilesDirectory, "logalyzer_" + 
  203.                                  Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  204.     } else {
  205.       analysisOutput = new Path(outputDirectory);
  206.     }
  207.     
  208.     JobConf grepJob = new JobConf(fsConfig);
  209.     grepJob.setJobName("logalyzer-grep-sort");
  210.     
  211.     FileInputFormat.setInputPaths(grepJob, grepInput);
  212.     grepJob.setInputFormat(TextInputFormat.class);
  213.     
  214.     grepJob.setMapperClass(LogRegexMapper.class);
  215.     grepJob.set("mapred.mapper.regex", grepPattern);
  216.     grepJob.set("mapred.reducer.sort", sortColumns);
  217.     grepJob.set("mapred.reducer.separator", columnSeparator);
  218.     
  219.     grepJob.setCombinerClass(LongSumReducer.class);
  220.     grepJob.setReducerClass(LongSumReducer.class);
  221.     
  222.     FileOutputFormat.setOutputPath(grepJob, analysisOutput);
  223.     grepJob.setOutputFormat(TextOutputFormat.class);
  224.     grepJob.setOutputKeyClass(Text.class);
  225.     grepJob.setOutputValueClass(LongWritable.class);
  226.     grepJob.setOutputKeyComparatorClass(LogComparator.class);
  227.     
  228.     grepJob.setNumReduceTasks(1);                 // write a single file
  229.     
  230.     JobClient.runJob(grepJob);
  231.   }
  232.   
  233.   public static void main(String[] args) {
  234.     
  235.     Log LOG = LogFactory.getLog(Logalyzer.class);
  236.     
  237.     String version = "Logalyzer.0.0.1";
  238.     String usage = "Usage: Logalyzer [-archive -logs <urlsFile>] " +
  239.       "-archiveDir <archiveDirectory> " +
  240.       "-grep <pattern> -sort <column1,column2,...> -separator <separator> " +
  241.       "-analysis <outputDirectory>";
  242.     
  243.     System.out.println(version);
  244.     if (args.length == 0) {
  245.       System.err.println(usage);
  246.       System.exit(-1);
  247.     }
  248.     
  249.     //Command line arguments
  250.     boolean archive = false;
  251.     boolean grep = false;
  252.     boolean sort = false;
  253.     
  254.     String archiveDir = "";
  255.     String logListURI = "";
  256.     String grepPattern = ".*";
  257.     String sortColumns = "";
  258.     String columnSeparator = " ";
  259.     String outputDirectory = "";
  260.     
  261.     for (int i = 0; i < args.length; i++) { // parse command line
  262.       if (args[i].equals("-archive")) {
  263.         archive = true;
  264.       } else if (args[i].equals("-archiveDir")) {
  265.         archiveDir = args[++i];
  266.       } else if (args[i].equals("-grep")) {
  267.         grep = true;
  268.         grepPattern = args[++i];
  269.       } else if (args[i].equals("-logs")) {
  270.         logListURI = args[++i];
  271.       } else if (args[i].equals("-sort")) {
  272.         sort = true;
  273.         sortColumns = args[++i];
  274.       } else if (args[i].equals("-separator")) {
  275.         columnSeparator = args[++i];
  276.       } else if (args[i].equals("-analysis")) {
  277.         outputDirectory = args[++i];
  278.       }
  279.     }
  280.     
  281.     LOG.info("analysisDir = " + outputDirectory);
  282.     LOG.info("archiveDir = " + archiveDir);
  283.     LOG.info("logListURI = " + logListURI);
  284.     LOG.info("grepPattern = " + grepPattern);
  285.     LOG.info("sortColumns = " + sortColumns);
  286.     LOG.info("separator = " + columnSeparator);
  287.     
  288.     try {
  289.       Logalyzer logalyzer = new Logalyzer();
  290.       
  291.       // Archive?
  292.       if (archive) {
  293.         logalyzer.doArchive(logListURI, archiveDir);
  294.       }
  295.       
  296.       // Analyze?
  297.       if (grep || sort) {
  298.         logalyzer.doAnalyze(archiveDir, outputDirectory, grepPattern, sortColumns, columnSeparator);
  299.       }
  300.     } catch (IOException ioe) {
  301.       ioe.printStackTrace();
  302.       System.exit(-1);
  303.     }
  304.     
  305.   } //main
  306.   
  307. } //class Logalyzer