SortValidator.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:22k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.*;
  20. import java.net.URI;
  21. import java.util.*;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.conf.Configured;
  24. import org.apache.hadoop.io.BytesWritable;
  25. import org.apache.hadoop.io.IntWritable;
  26. import org.apache.hadoop.io.SequenceFile;
  27. import org.apache.hadoop.io.Text;
  28. import org.apache.hadoop.io.Writable;
  29. import org.apache.hadoop.io.WritableComparable;
  30. import org.apache.hadoop.io.WritableComparator;
  31. import org.apache.hadoop.io.WritableUtils;
  32. import org.apache.hadoop.mapred.lib.HashPartitioner;
  33. import org.apache.hadoop.util.Tool;
  34. import org.apache.hadoop.util.ToolRunner;
  35. import org.apache.hadoop.fs.*;
  36. /**
  37.  * A set of utilities to validate the <b>sort</b> of the map-reduce framework.
  38.  * This utility program has 2 main parts:
  39.  * 1. Checking the records' statistics
  40.  *   a) Validates the no. of bytes and records in sort's input & output. 
  41.  *   b) Validates the xor of the md5's of each key/value pair.
  42.  *   c) Ensures same key/value is present in both input and output.
  43.  * 2. Check individual records  to ensure each record is present in both
  44.  *    the input and the output of the sort (expensive on large data-sets). 
  45.  *    
  46.  * To run: bin/hadoop jar build/hadoop-examples.jar sortvalidate
  47.  *            [-m <i>maps</i>] [-r <i>reduces</i>] [-deep] 
  48.  *            -sortInput <i>sort-in-dir</i> -sortOutput <i>sort-out-dir</i> 
  49.  */
  50. public class SortValidator extends Configured implements Tool {
  51.   static private final IntWritable sortInput = new IntWritable(1); 
  52.   static private final IntWritable sortOutput = new IntWritable(2); 
  53.   static void printUsage() {
  54.     System.err.println("sortvalidate [-m <maps>] [-r <reduces>] [-deep] " +
  55.                        "-sortInput <sort-input-dir> -sortOutput <sort-output-dir>");
  56.     System.exit(1);
  57.   }
  58.   static private IntWritable deduceInputFile(JobConf job) {
  59.     Path[] inputPaths = FileInputFormat.getInputPaths(job);
  60.     Path inputFile = new Path(job.get("map.input.file"));
  61.     // value == one for sort-input; value == two for sort-output
  62.     return (inputFile.getParent().equals(inputPaths[0])) ? 
  63.         sortInput : sortOutput;
  64.   }
  65.   
  66.   static private byte[] pair(BytesWritable a, BytesWritable b) {
  67.     byte[] pairData = new byte[a.getLength()+ b.getLength()];
  68.     System.arraycopy(a.getBytes(), 0, pairData, 0, a.getLength());
  69.     System.arraycopy(b.getBytes(), 0, pairData, a.getLength(), b.getLength());
  70.     return pairData;
  71.   }
  72.   private static final PathFilter sortPathsFilter = new PathFilter() {
  73.     public boolean accept(Path path) {
  74.       return (path.getName().startsWith("part-"));
  75.     }
  76.   };
  77.   
  78.   /**
  79.    * A simple map-reduce job which checks consistency of the
  80.    * MapReduce framework's sort by checking:
  81.    * a) Records are sorted correctly
  82.    * b) Keys are partitioned correctly
  83.    * c) The input and output have same no. of bytes and records.
  84.    * d) The input and output have the correct 'checksum' by xor'ing 
  85.    *    the md5 of each record.
  86.    *    
  87.    */
  88.   public static class RecordStatsChecker {
  89.     /**
  90.      * Generic way to get <b>raw</b> data from a {@link Writable}.
  91.      */
  92.     static class Raw {
  93.       /**
  94.        * Get raw data bytes from a {@link Writable}
  95.        * @param writable {@link Writable} object from whom to get the raw data
  96.        * @return raw data of the writable
  97.        */
  98.       public byte[] getRawBytes(Writable writable) {
  99.         return writable.toString().getBytes(); 
  100.       } 
  101.       
  102.       /**
  103.        * Get number of raw data bytes of the {@link Writable}
  104.        * @param writable {@link Writable} object from whom to get the raw data
  105.        *                 length
  106.        * @return number of raw data bytes
  107.        */
  108.       public int getRawBytesLength(Writable writable) {
  109.         return writable.toString().getBytes().length; 
  110.       }
  111.     }
  112.     /**
  113.      * Specialization of {@link Raw} for {@link BytesWritable}.
  114.      */
  115.     static class RawBytesWritable extends Raw  {
  116.       public byte[] getRawBytes(Writable bw) {
  117.         return ((BytesWritable)bw).getBytes();
  118.       }
  119.       public int getRawBytesLength(Writable bw) {
  120.         return ((BytesWritable)bw).getLength(); 
  121.       }
  122.     }
  123.     
  124.     /**
  125.      * Specialization of {@link Raw} for {@link Text}.
  126.      */
  127.     static class RawText extends Raw  {
  128.       public byte[] getRawBytes(Writable text) {
  129.         return ((Text)text).getBytes();
  130.       }
  131.       public int getRawBytesLength(Writable text) {
  132.         return ((Text)text).getLength();
  133.       }
  134.     }
  135.     
  136.     private static Raw createRaw(Class rawClass) {
  137.       if (rawClass == Text.class) {
  138.         return new RawText();
  139.       } else if (rawClass == BytesWritable.class) {
  140.         System.err.println("Returning " + RawBytesWritable.class);
  141.         return new RawBytesWritable();
  142.       }      
  143.       return new Raw();
  144.     }
  145.     public static class RecordStatsWritable implements Writable {
  146.       private long bytes = 0;
  147.       private long records = 0;
  148.       private int checksum = 0;
  149.       
  150.       public RecordStatsWritable() {}
  151.       
  152.       public RecordStatsWritable(long bytes, long records, int checksum) {
  153.         this.bytes = bytes;
  154.         this.records = records;
  155.         this.checksum = checksum;
  156.       }
  157.       
  158.       public void write(DataOutput out) throws IOException {
  159.         WritableUtils.writeVLong(out, bytes);
  160.         WritableUtils.writeVLong(out, records);
  161.         WritableUtils.writeVInt(out, checksum);
  162.       }
  163.       public void readFields(DataInput in) throws IOException {
  164.         bytes = WritableUtils.readVLong(in);
  165.         records = WritableUtils.readVLong(in);
  166.         checksum = WritableUtils.readVInt(in);
  167.       }
  168.       
  169.       public long getBytes() { return bytes; }
  170.       public long getRecords() { return records; }
  171.       public int getChecksum() { return checksum; }
  172.     }
  173.     
  174.     public static class Map extends MapReduceBase
  175.       implements Mapper<WritableComparable, Writable,
  176.                         IntWritable, RecordStatsWritable> {
  177.       
  178.       private IntWritable key = null;
  179.       private WritableComparable prevKey = null;
  180.       private Class<? extends WritableComparable> keyClass;
  181.       private Partitioner<WritableComparable, Writable> partitioner = null;
  182.       private int partition = -1;
  183.       private int noSortReducers = -1;
  184.       private long recordId = -1;
  185.       
  186.       private Raw rawKey;
  187.       private Raw rawValue;
  188.       public void configure(JobConf job) {
  189.         // 'key' == sortInput for sort-input; key == sortOutput for sort-output
  190.         key = deduceInputFile(job);
  191.         
  192.         if (key == sortOutput) {
  193.           partitioner = new HashPartitioner<WritableComparable, Writable>();
  194.           
  195.           // Figure the 'current' partition and no. of reduces of the 'sort'
  196.           try {
  197.             URI inputURI = new URI(job.get("map.input.file"));
  198.             String inputFile = inputURI.getPath();
  199.             partition = Integer.valueOf(
  200.                                         inputFile.substring(inputFile.lastIndexOf("part")+5)
  201.                                         ).intValue();
  202.             noSortReducers = job.getInt("sortvalidate.sort.reduce.tasks", -1);
  203.           } catch (Exception e) {
  204.             System.err.println("Caught: " + e);
  205.             System.exit(-1);
  206.           }
  207.         }
  208.       }
  209.       
  210.       @SuppressWarnings("unchecked")
  211.       public void map(WritableComparable key, Writable value,
  212.                       OutputCollector<IntWritable, RecordStatsWritable> output, 
  213.                       Reporter reporter) throws IOException {
  214.         // Set up rawKey and rawValue on the first call to 'map'
  215.         if (recordId == -1) {
  216.          rawKey = createRaw(key.getClass());
  217.          rawValue = createRaw(value.getClass());
  218.         }
  219.         ++recordId;
  220.         
  221.         if (this.key == sortOutput) {
  222.           // Check if keys are 'sorted' if this  
  223.           // record is from sort's output
  224.           if (prevKey == null) {
  225.             prevKey = key;
  226.             keyClass = prevKey.getClass();
  227.           } else {
  228.             // Sanity check
  229.             if (keyClass != key.getClass()) {
  230.               throw new IOException("Type mismatch in key: expected " +
  231.                                     keyClass.getName() + ", recieved " +
  232.                                     key.getClass().getName());
  233.             }
  234.             
  235.             // Check if they were sorted correctly
  236.             if (prevKey.compareTo(key) > 0) {
  237.               throw new IOException("The 'map-reduce' framework wrongly" +
  238.                                     " classifed (" + prevKey + ") > (" + 
  239.                                     key + ") "+ "for record# " + recordId); 
  240.             }
  241.             prevKey = key;
  242.           }
  243.           // Check if the sorted output is 'partitioned' right
  244.           int keyPartition = 
  245.             partitioner.getPartition(key, value, noSortReducers);
  246.           if (partition != keyPartition) {
  247.             throw new IOException("Partitions do not match for record# " + 
  248.                                   recordId + " ! - '" + partition + "' v/s '" + 
  249.                                   keyPartition + "'");
  250.           }
  251.         }
  252.         // Construct the record-stats and output (this.key, record-stats)
  253.         byte[] keyBytes = rawKey.getRawBytes(key);
  254.         int keyBytesLen = rawKey.getRawBytesLength(key);
  255.         byte[] valueBytes = rawValue.getRawBytes(value);
  256.         int valueBytesLen = rawValue.getRawBytesLength(value);
  257.         
  258.         int keyValueChecksum = 
  259.           (WritableComparator.hashBytes(keyBytes, keyBytesLen) ^
  260.            WritableComparator.hashBytes(valueBytes, valueBytesLen));
  261.         output.collect(this.key, 
  262.                        new RecordStatsWritable((keyBytesLen+valueBytesLen),
  263.                        1, keyValueChecksum)
  264.                       );
  265.       }
  266.       
  267.     }
  268.     
  269.     public static class Reduce extends MapReduceBase
  270.       implements Reducer<IntWritable, RecordStatsWritable,
  271.                          IntWritable, RecordStatsWritable> {
  272.       
  273.       public void reduce(IntWritable key, Iterator<RecordStatsWritable> values,
  274.                          OutputCollector<IntWritable,
  275.                                          RecordStatsWritable> output, 
  276.                          Reporter reporter) throws IOException {
  277.         long bytes = 0;
  278.         long records = 0;
  279.         int xor = 0;
  280.         while (values.hasNext()) {
  281.           RecordStatsWritable stats = values.next();
  282.           bytes += stats.getBytes();
  283.           records += stats.getRecords();
  284.           xor ^= stats.getChecksum(); 
  285.         }
  286.         
  287.         output.collect(key, new RecordStatsWritable(bytes, records, xor));
  288.       }
  289.     }
  290.     
  291.     public static class NonSplitableSequenceFileInputFormat 
  292.       extends SequenceFileInputFormat {
  293.       protected boolean isSplitable(FileSystem fs, Path filename) {
  294.         return false;
  295.       }
  296.     }
  297.     
  298.     static void checkRecords(Configuration defaults, 
  299.                              Path sortInput, Path sortOutput) throws IOException {
  300.       FileSystem inputfs = sortInput.getFileSystem(defaults);
  301.       FileSystem outputfs = sortOutput.getFileSystem(defaults);
  302.       FileSystem defaultfs = FileSystem.get(defaults);
  303.       JobConf jobConf = new JobConf(defaults, RecordStatsChecker.class);
  304.       jobConf.setJobName("sortvalidate-recordstats-checker");
  305.       int noSortReduceTasks = 
  306.         outputfs.listStatus(sortOutput, sortPathsFilter).length;
  307.       jobConf.setInt("sortvalidate.sort.reduce.tasks", noSortReduceTasks);
  308.       int noSortInputpaths =  inputfs.listStatus(sortInput).length;
  309.       jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
  310.       jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  311.       
  312.       jobConf.setOutputKeyClass(IntWritable.class);
  313.       jobConf.setOutputValueClass(RecordStatsChecker.RecordStatsWritable.class);
  314.       
  315.       jobConf.setMapperClass(Map.class);
  316.       jobConf.setCombinerClass(Reduce.class);
  317.       jobConf.setReducerClass(Reduce.class);
  318.       
  319.       jobConf.setNumMapTasks(noSortReduceTasks);
  320.       jobConf.setNumReduceTasks(1);
  321.       FileInputFormat.setInputPaths(jobConf, sortInput);
  322.       FileInputFormat.addInputPath(jobConf, sortOutput);
  323.       Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
  324.       if (defaultfs.exists(outputPath)) {
  325.         defaultfs.delete(outputPath, true);
  326.       }
  327.       FileOutputFormat.setOutputPath(jobConf, outputPath);
  328.       
  329.       // Uncomment to run locally in a single process
  330.       //job_conf.set("mapred.job.tracker", "local");
  331.       Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
  332.       System.out.println("nSortValidator.RecordStatsChecker: Validate sort " +
  333.                          "from " + inputPaths[0] + " (" + 
  334.                          noSortInputpaths + " files), " + 
  335.                          inputPaths[1] + " (" + 
  336.                          noSortReduceTasks + 
  337.                          " files) into " + 
  338.                          FileOutputFormat.getOutputPath(jobConf) + 
  339.                          " with 1 reducer.");
  340.       Date startTime = new Date();
  341.       System.out.println("Job started: " + startTime);
  342.       JobClient.runJob(jobConf);
  343.       Date end_time = new Date();
  344.       System.out.println("Job ended: " + end_time);
  345.       System.out.println("The job took " + 
  346.                          (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
  347.       
  348.       // Check to ensure that the statistics of the 
  349.       // framework's sort-input and sort-output match
  350.       SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
  351.                                                           new Path(outputPath, "part-00000"), defaults);
  352.       IntWritable k1 = new IntWritable();
  353.       IntWritable k2 = new IntWritable();
  354.       RecordStatsWritable v1 = new RecordStatsWritable();
  355.       RecordStatsWritable v2 = new RecordStatsWritable();
  356.       if (!stats.next(k1, v1)) {
  357.         throw new IOException("Failed to read record #1 from reduce's output");
  358.       }
  359.       if (!stats.next(k2, v2)) {
  360.         throw new IOException("Failed to read record #2 from reduce's output");
  361.       }
  362.       if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) || 
  363.           v1.getChecksum() != v2.getChecksum()) {
  364.         throw new IOException("(" + 
  365.                               v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
  366.                               v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
  367.       }
  368.     }
  369.   }
  370.   
  371.   /**
  372.    * A simple map-reduce task to check if the input and the output
  373.    * of the framework's sort is consistent by ensuring each record 
  374.    * is present in both the input and the output.
  375.    * 
  376.    */
  377.   public static class RecordChecker {
  378.     
  379.     public static class Map extends MapReduceBase
  380.       implements Mapper<BytesWritable, BytesWritable,
  381.                         BytesWritable, IntWritable> {
  382.       
  383.       private IntWritable value = null;
  384.       
  385.       public void configure(JobConf job) {
  386.         // value == one for sort-input; value == two for sort-output
  387.         value = deduceInputFile(job);
  388.       }
  389.       
  390.       public void map(BytesWritable key, 
  391.                       BytesWritable value,
  392.                       OutputCollector<BytesWritable, IntWritable> output, 
  393.                       Reporter reporter) throws IOException {
  394.         // newKey = (key, value)
  395.         BytesWritable keyValue = new BytesWritable(pair(key, value));
  396.     
  397.         // output (newKey, value)
  398.         output.collect(keyValue, this.value);
  399.       }
  400.     }
  401.     
  402.     public static class Reduce extends MapReduceBase
  403.       implements Reducer<BytesWritable, IntWritable,
  404.                         BytesWritable, IntWritable> {
  405.       
  406.       public void reduce(BytesWritable key, Iterator<IntWritable> values,
  407.                          OutputCollector<BytesWritable, IntWritable> output,
  408.                          Reporter reporter) throws IOException {
  409.         int ones = 0;
  410.         int twos = 0;
  411.         while (values.hasNext()) {
  412.           IntWritable count = values.next(); 
  413.           if (count.equals(sortInput)) {
  414.             ++ones;
  415.           } else if (count.equals(sortOutput)) {
  416.             ++twos;
  417.           } else {
  418.             throw new IOException("Invalid 'value' of " + count.get() + 
  419.                                   " for (key,value): " + key.toString());
  420.           }
  421.         }
  422.         
  423.         // Check to ensure there are equal no. of ones and twos
  424.         if (ones != twos) {
  425.           throw new IOException("Illegal ('one', 'two'): (" + ones + ", " + twos +
  426.                                 ") for (key, value): " + key.toString());
  427.         }
  428.       }
  429.     }
  430.     
  431.     static void checkRecords(Configuration defaults, int noMaps, int noReduces,
  432.                              Path sortInput, Path sortOutput) throws IOException {
  433.       JobConf jobConf = new JobConf(defaults, RecordChecker.class);
  434.       jobConf.setJobName("sortvalidate-record-checker");
  435.       
  436.       jobConf.setInputFormat(SequenceFileInputFormat.class);
  437.       jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  438.       
  439.       jobConf.setOutputKeyClass(BytesWritable.class);
  440.       jobConf.setOutputValueClass(IntWritable.class);
  441.       
  442.       jobConf.setMapperClass(Map.class);        
  443.       jobConf.setReducerClass(Reduce.class);
  444.       
  445.       JobClient client = new JobClient(jobConf);
  446.       ClusterStatus cluster = client.getClusterStatus();
  447.       if (noMaps == -1) {
  448.         noMaps = cluster.getTaskTrackers() * 
  449.           jobConf.getInt("test.sortvalidate.maps_per_host", 10);
  450.       }
  451.       if (noReduces == -1) {
  452.         noReduces = (int) (cluster.getMaxReduceTasks() * 0.9);
  453.         String sortReduces = jobConf.get("test.sortvalidate.reduces_per_host");
  454.         if (sortReduces != null) {
  455.            noReduces = cluster.getTaskTrackers() * 
  456.                            Integer.parseInt(sortReduces);
  457.         }
  458.       }
  459.       jobConf.setNumMapTasks(noMaps);
  460.       jobConf.setNumReduceTasks(noReduces);
  461.       
  462.       FileInputFormat.setInputPaths(jobConf, sortInput);
  463.       FileInputFormat.addInputPath(jobConf, sortOutput);
  464.       Path outputPath = new Path("/tmp/sortvalidate/recordchecker");
  465.       FileSystem fs = FileSystem.get(defaults);
  466.       if (fs.exists(outputPath)) {
  467.         fs.delete(outputPath, true);
  468.       }
  469.       FileOutputFormat.setOutputPath(jobConf, outputPath);
  470.       
  471.       // Uncomment to run locally in a single process
  472.       //job_conf.set("mapred.job.tracker", "local");
  473.       Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
  474.       System.out.println("nSortValidator.RecordChecker: Running on " +
  475.                          cluster.getTaskTrackers() +
  476.                         " nodes to validate sort from " + 
  477.                          inputPaths[0] + ", " + 
  478.                          inputPaths[1] + " into " + 
  479.                          FileOutputFormat.getOutputPath(jobConf) + 
  480.                          " with " + noReduces + " reduces.");
  481.       Date startTime = new Date();
  482.       System.out.println("Job started: " + startTime);
  483.       JobClient.runJob(jobConf);
  484.       Date end_time = new Date();
  485.       System.out.println("Job ended: " + end_time);
  486.       System.out.println("The job took " + 
  487.                          (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
  488.     }
  489.   }
  490.   
  491.   /**
  492.    * The main driver for sort-validator program.
  493.    * Invoke this method to submit the map/reduce job.
  494.    * @throws IOException When there is communication problems with the 
  495.    *                     job tracker.
  496.    */
  497.   public int run(String[] args) throws Exception {
  498.     Configuration defaults = getConf();
  499.     
  500.     int noMaps = -1, noReduces = -1;
  501.     Path sortInput = null, sortOutput = null;
  502.     boolean deepTest = false;
  503.     for(int i=0; i < args.length; ++i) {
  504.       try {
  505.         if ("-m".equals(args[i])) {
  506.           noMaps = Integer.parseInt(args[++i]);
  507.         } else if ("-r".equals(args[i])) {
  508.           noReduces = Integer.parseInt(args[++i]);
  509.         } else if ("-sortInput".equals(args[i])){
  510.           sortInput = new Path(args[++i]);
  511.         } else if ("-sortOutput".equals(args[i])){
  512.           sortOutput = new Path(args[++i]);
  513.         } else if ("-deep".equals(args[i])) {
  514.           deepTest = true;
  515.         } else {
  516.           printUsage();
  517.           return -1;
  518.         }
  519.       } catch (NumberFormatException except) {
  520.         System.err.println("ERROR: Integer expected instead of " + args[i]);
  521.         printUsage();
  522.         return -1;
  523.       } catch (ArrayIndexOutOfBoundsException except) {
  524.         System.err.println("ERROR: Required parameter missing from " +
  525.                            args[i-1]);
  526.         printUsage();
  527.         return -1;
  528.       }
  529.     }
  530.     
  531.     // Sanity check
  532.     if (sortInput == null || sortOutput == null) {
  533.       printUsage();
  534.       return -2;
  535.     }
  536.     // Check if the records are consistent and sorted correctly
  537.     RecordStatsChecker.checkRecords(defaults, sortInput, sortOutput);
  538.     // Check if the same records are present in sort's inputs & outputs
  539.     if (deepTest) {
  540.       RecordChecker.checkRecords(defaults, noMaps, noReduces, sortInput, 
  541.                                  sortOutput);
  542.     }
  543.     
  544.     System.out.println("nSUCCESS! Validated the MapReduce framework's 'sort'" +
  545.                        " successfully.");
  546.     
  547.     return 0;
  548.   }
  549.   public static void main(String[] args) throws Exception {
  550.     int res = ToolRunner.run(new Configuration(), new SortValidator(), args);
  551.     System.exit(res);
  552.   }
  553. }