InputSampler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Arrays;
  22. import java.util.Random;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.io.NullWritable;
  29. import org.apache.hadoop.io.RawComparator;
  30. import org.apache.hadoop.io.SequenceFile;
  31. import org.apache.hadoop.io.WritableComparable;
  32. import org.apache.hadoop.mapred.FileInputFormat;
  33. import org.apache.hadoop.mapred.InputFormat;
  34. import org.apache.hadoop.mapred.InputSplit;
  35. import org.apache.hadoop.mapred.JobConf;
  36. import org.apache.hadoop.mapred.RecordReader;
  37. import org.apache.hadoop.mapred.Reporter;
  38. import org.apache.hadoop.util.Tool;
  39. import org.apache.hadoop.util.ToolRunner;
  40. /**
  41.  * Utility for collecting samples and writing a partition file for
  42.  * {@link org.apache.hadoop.mapred.lib.TotalOrderPartitioner}.
  43.  */
  44. public class InputSampler<K,V> implements Tool {
  45.   private static final Log LOG = LogFactory.getLog(InputSampler.class);
  46.   static int printUsage() {
  47.     System.out.println("sampler -r <reduces>n" +
  48.                        "      [-inFormat <input format class>]n" +
  49.                        "      [-keyClass <map input & output key class>]n" +
  50.                        "      [-splitRandom <double pcnt> <numSamples> <maxsplits> | " +
  51.                        "// Sample from random splits at random (general)n" +
  52.                        "       -splitSample <numSamples> <maxsplits> | " +
  53.                        "             // Sample from first records in splits (random data)n"+
  54.                        "       -splitInterval <double pcnt> <maxsplits>]" +
  55.                        "             // Sample from splits at intervals (sorted data)");
  56.     System.out.println("Default sampler: -splitRandom 0.1 10000 10");
  57.     ToolRunner.printGenericCommandUsage(System.out);
  58.     return -1;
  59.   }
  60.   private JobConf conf;
  61.   public InputSampler(JobConf conf) {
  62.     this.conf = conf;
  63.   }
  64.   public Configuration getConf() {
  65.     return conf;
  66.   }
  67.   public void setConf(Configuration conf) {
  68.     if (!(conf instanceof JobConf)) {
  69.       this.conf = new JobConf(conf);
  70.     } else {
  71.       this.conf = (JobConf) conf;
  72.     }
  73.   }
  74.   /**
  75.    * Interface to sample using an {@link org.apache.hadoop.mapred.InputFormat}.
  76.    */
  77.   public interface Sampler<K,V> {
  78.     /**
  79.      * For a given job, collect and return a subset of the keys from the
  80.      * input data.
  81.      */
  82.     K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
  83.   }
  84.   /**
  85.    * Samples the first n records from s splits.
  86.    * Inexpensive way to sample random data.
  87.    */
  88.   public static class SplitSampler<K,V> implements Sampler<K,V> {
  89.     private final int numSamples;
  90.     private final int maxSplitsSampled;
  91.     /**
  92.      * Create a SplitSampler sampling <em>all</em> splits.
  93.      * Takes the first numSamples / numSplits records from each split.
  94.      * @param numSamples Total number of samples to obtain from all selected
  95.      *                   splits.
  96.      */
  97.     public SplitSampler(int numSamples) {
  98.       this(numSamples, Integer.MAX_VALUE);
  99.     }
  100.     /**
  101.      * Create a new SplitSampler.
  102.      * @param numSamples Total number of samples to obtain from all selected
  103.      *                   splits.
  104.      * @param maxSplitsSampled The maximum number of splits to examine.
  105.      */
  106.     public SplitSampler(int numSamples, int maxSplitsSampled) {
  107.       this.numSamples = numSamples;
  108.       this.maxSplitsSampled = maxSplitsSampled;
  109.     }
  110.     /**
  111.      * From each split sampled, take the first numSamples / numSplits records.
  112.      */
  113.     @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
  114.     public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  115.       InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  116.       ArrayList<K> samples = new ArrayList<K>(numSamples);
  117.       int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  118.       int splitStep = splits.length / splitsToSample;
  119.       int samplesPerSplit = numSamples / splitsToSample;
  120.       long records = 0;
  121.       for (int i = 0; i < splitsToSample; ++i) {
  122.         RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
  123.             job, Reporter.NULL);
  124.         K key = reader.createKey();
  125.         V value = reader.createValue();
  126.         while (reader.next(key, value)) {
  127.           samples.add(key);
  128.           key = reader.createKey();
  129.           ++records;
  130.           if ((i+1) * samplesPerSplit <= records) {
  131.             break;
  132.           }
  133.         }
  134.         reader.close();
  135.       }
  136.       return (K[])samples.toArray();
  137.     }
  138.   }
  139.   /**
  140.    * Sample from random points in the input.
  141.    * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
  142.    * each split.
  143.    */
  144.   public static class RandomSampler<K,V> implements Sampler<K,V> {
  145.     private double freq;
  146.     private final int numSamples;
  147.     private final int maxSplitsSampled;
  148.     /**
  149.      * Create a new RandomSampler sampling <em>all</em> splits.
  150.      * This will read every split at the client, which is very expensive.
  151.      * @param freq Probability with which a key will be chosen.
  152.      * @param numSamples Total number of samples to obtain from all selected
  153.      *                   splits.
  154.      */
  155.     public RandomSampler(double freq, int numSamples) {
  156.       this(freq, numSamples, Integer.MAX_VALUE);
  157.     }
  158.     /**
  159.      * Create a new RandomSampler.
  160.      * @param freq Probability with which a key will be chosen.
  161.      * @param numSamples Total number of samples to obtain from all selected
  162.      *                   splits.
  163.      * @param maxSplitsSampled The maximum number of splits to examine.
  164.      */
  165.     public RandomSampler(double freq, int numSamples, int maxSplitsSampled) {
  166.       this.freq = freq;
  167.       this.numSamples = numSamples;
  168.       this.maxSplitsSampled = maxSplitsSampled;
  169.     }
  170.     /**
  171.      * Randomize the split order, then take the specified number of keys from
  172.      * each split sampled, where each key is selected with the specified
  173.      * probability and possibly replaced by a subsequently selected key when
  174.      * the quota of keys from that split is satisfied.
  175.      */
  176.     @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
  177.     public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  178.       InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  179.       ArrayList<K> samples = new ArrayList<K>(numSamples);
  180.       int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  181.       Random r = new Random();
  182.       long seed = r.nextLong();
  183.       r.setSeed(seed);
  184.       LOG.debug("seed: " + seed);
  185.       // shuffle splits
  186.       for (int i = 0; i < splits.length; ++i) {
  187.         InputSplit tmp = splits[i];
  188.         int j = r.nextInt(splits.length);
  189.         splits[i] = splits[j];
  190.         splits[j] = tmp;
  191.       }
  192.       // our target rate is in terms of the maximum number of sample splits,
  193.       // but we accept the possibility of sampling additional splits to hit
  194.       // the target sample keyset
  195.       for (int i = 0; i < splitsToSample ||
  196.                      (i < splits.length && samples.size() < numSamples); ++i) {
  197.         RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
  198.             Reporter.NULL);
  199.         K key = reader.createKey();
  200.         V value = reader.createValue();
  201.         while (reader.next(key, value)) {
  202.           if (r.nextDouble() <= freq) {
  203.             if (samples.size() < numSamples) {
  204.               samples.add(key);
  205.             } else {
  206.               // When exceeding the maximum number of samples, replace a
  207.               // random element with this one, then adjust the frequency
  208.               // to reflect the possibility of existing elements being
  209.               // pushed out
  210.               int ind = r.nextInt(numSamples);
  211.               if (ind != numSamples) {
  212.                 samples.set(ind, key);
  213.               }
  214.               freq *= (numSamples - 1) / (double) numSamples;
  215.             }
  216.             key = reader.createKey();
  217.           }
  218.         }
  219.         reader.close();
  220.       }
  221.       return (K[])samples.toArray();
  222.     }
  223.   }
  224.   /**
  225.    * Sample from s splits at regular intervals.
  226.    * Useful for sorted data.
  227.    */
  228.   public static class IntervalSampler<K,V> implements Sampler<K,V> {
  229.     private final double freq;
  230.     private final int maxSplitsSampled;
  231.     /**
  232.      * Create a new IntervalSampler sampling <em>all</em> splits.
  233.      * @param freq The frequency with which records will be emitted.
  234.      */
  235.     public IntervalSampler(double freq) {
  236.       this(freq, Integer.MAX_VALUE);
  237.     }
  238.     /**
  239.      * Create a new IntervalSampler.
  240.      * @param freq The frequency with which records will be emitted.
  241.      * @param maxSplitsSampled The maximum number of splits to examine.
  242.      * @see #getSample
  243.      */
  244.     public IntervalSampler(double freq, int maxSplitsSampled) {
  245.       this.freq = freq;
  246.       this.maxSplitsSampled = maxSplitsSampled;
  247.     }
  248.     /**
  249.      * For each split sampled, emit when the ratio of the number of records
  250.      * retained to the total record count is less than the specified
  251.      * frequency.
  252.      */
  253.     @SuppressWarnings("unchecked") // ArrayList::toArray doesn't preserve type
  254.     public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
  255.       InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
  256.       ArrayList<K> samples = new ArrayList<K>();
  257.       int splitsToSample = Math.min(maxSplitsSampled, splits.length);
  258.       int splitStep = splits.length / splitsToSample;
  259.       long records = 0;
  260.       long kept = 0;
  261.       for (int i = 0; i < splitsToSample; ++i) {
  262.         RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
  263.             job, Reporter.NULL);
  264.         K key = reader.createKey();
  265.         V value = reader.createValue();
  266.         while (reader.next(key, value)) {
  267.           ++records;
  268.           if ((double) kept / records < freq) {
  269.             ++kept;
  270.             samples.add(key);
  271.             key = reader.createKey();
  272.           }
  273.         }
  274.         reader.close();
  275.       }
  276.       return (K[])samples.toArray();
  277.     }
  278.   }
  279.   /**
  280.    * Write a partition file for the given job, using the Sampler provided.
  281.    * Queries the sampler for a sample keyset, sorts by the output key
  282.    * comparator, selects the keys for each rank, and writes to the destination
  283.    * returned from {@link
  284.      org.apache.hadoop.mapred.lib.TotalOrderPartitioner#getPartitionFile}.
  285.    */
  286.   @SuppressWarnings("unchecked") // getInputFormat, getOutputKeyComparator
  287.   public static <K,V> void writePartitionFile(JobConf job,
  288.       Sampler<K,V> sampler) throws IOException {
  289.     final InputFormat<K,V> inf = (InputFormat<K,V>) job.getInputFormat();
  290.     int numPartitions = job.getNumReduceTasks();
  291.     K[] samples = sampler.getSample(inf, job);
  292.     LOG.info("Using " + samples.length + " samples");
  293.     RawComparator<K> comparator =
  294.       (RawComparator<K>) job.getOutputKeyComparator();
  295.     Arrays.sort(samples, comparator);
  296.     Path dst = new Path(TotalOrderPartitioner.getPartitionFile(job));
  297.     FileSystem fs = dst.getFileSystem(job);
  298.     if (fs.exists(dst)) {
  299.       fs.delete(dst, false);
  300.     }
  301.     SequenceFile.Writer writer = SequenceFile.createWriter(fs, job, dst,
  302.         job.getMapOutputKeyClass(), NullWritable.class);
  303.     NullWritable nullValue = NullWritable.get();
  304.     float stepSize = samples.length / (float) numPartitions;
  305.     int last = -1;
  306.     for(int i = 1; i < numPartitions; ++i) {
  307.       int k = Math.round(stepSize * i);
  308.       while (last >= k && comparator.compare(samples[last], samples[k]) == 0) {
  309.         ++k;
  310.       }
  311.       writer.append(samples[k], nullValue);
  312.       last = k;
  313.     }
  314.     writer.close();
  315.   }
  316.   /**
  317.    * Driver for InputSampler from the command line.
  318.    * Configures a JobConf instance and calls {@link #writePartitionFile}.
  319.    */
  320.   public int run(String[] args) throws Exception {
  321.     JobConf job = (JobConf) getConf();
  322.     ArrayList<String> otherArgs = new ArrayList<String>();
  323.     Sampler<K,V> sampler = null;
  324.     for(int i=0; i < args.length; ++i) {
  325.       try {
  326.         if ("-r".equals(args[i])) {
  327.           job.setNumReduceTasks(Integer.parseInt(args[++i]));
  328.         } else if ("-inFormat".equals(args[i])) {
  329.           job.setInputFormat(
  330.               Class.forName(args[++i]).asSubclass(InputFormat.class));
  331.         } else if ("-keyClass".equals(args[i])) {
  332.           job.setMapOutputKeyClass(
  333.               Class.forName(args[++i]).asSubclass(WritableComparable.class));
  334.         } else if ("-splitSample".equals(args[i])) {
  335.           int numSamples = Integer.parseInt(args[++i]);
  336.           int maxSplits = Integer.parseInt(args[++i]);
  337.           if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
  338.           sampler = new SplitSampler<K,V>(numSamples, maxSplits);
  339.         } else if ("-splitRandom".equals(args[i])) {
  340.           double pcnt = Double.parseDouble(args[++i]);
  341.           int numSamples = Integer.parseInt(args[++i]);
  342.           int maxSplits = Integer.parseInt(args[++i]);
  343.           if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
  344.           sampler = new RandomSampler<K,V>(pcnt, numSamples, maxSplits);
  345.         } else if ("-splitInterval".equals(args[i])) {
  346.           double pcnt = Double.parseDouble(args[++i]);
  347.           int maxSplits = Integer.parseInt(args[++i]);
  348.           if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
  349.           sampler = new IntervalSampler<K,V>(pcnt, maxSplits);
  350.         } else {
  351.           otherArgs.add(args[i]);
  352.         }
  353.       } catch (NumberFormatException except) {
  354.         System.out.println("ERROR: Integer expected instead of " + args[i]);
  355.         return printUsage();
  356.       } catch (ArrayIndexOutOfBoundsException except) {
  357.         System.out.println("ERROR: Required parameter missing from " +
  358.             args[i-1]);
  359.         return printUsage();
  360.       }
  361.     }
  362.     if (job.getNumReduceTasks() <= 1) {
  363.       System.err.println("Sampler requires more than one reducer");
  364.       return printUsage();
  365.     }
  366.     if (otherArgs.size() < 2) {
  367.       System.out.println("ERROR: Wrong number of parameters: ");
  368.       return printUsage();
  369.     }
  370.     if (null == sampler) {
  371.       sampler = new RandomSampler<K,V>(0.1, 10000, 10);
  372.     }
  373.     Path outf = new Path(otherArgs.remove(otherArgs.size() - 1));
  374.     TotalOrderPartitioner.setPartitionFile(job, outf);
  375.     for (String s : otherArgs) {
  376.       FileInputFormat.addInputPath(job, new Path(s));
  377.     }
  378.     InputSampler.<K,V>writePartitionFile(job, sampler);
  379.     return 0;
  380.   }
  381.   public static void main(String[] args) throws Exception {
  382.     JobConf job = new JobConf(InputSampler.class);
  383.     InputSampler<?,?> sampler = new InputSampler(job);
  384.     int res = ToolRunner.run(sampler, args);
  385.     System.exit(res);
  386.   }
  387. }