PiEstimator.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples;
  19. import java.io.IOException;
  20. import java.math.BigDecimal;
  21. import java.util.Iterator;
  22. import org.apache.hadoop.conf.Configured;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.io.BooleanWritable;
  26. import org.apache.hadoop.io.LongWritable;
  27. import org.apache.hadoop.io.SequenceFile;
  28. import org.apache.hadoop.io.Writable;
  29. import org.apache.hadoop.io.WritableComparable;
  30. import org.apache.hadoop.io.SequenceFile.CompressionType;
  31. import org.apache.hadoop.mapred.FileInputFormat;
  32. import org.apache.hadoop.mapred.FileOutputFormat;
  33. import org.apache.hadoop.mapred.JobClient;
  34. import org.apache.hadoop.mapred.JobConf;
  35. import org.apache.hadoop.mapred.MapReduceBase;
  36. import org.apache.hadoop.mapred.Mapper;
  37. import org.apache.hadoop.mapred.OutputCollector;
  38. import org.apache.hadoop.mapred.Reducer;
  39. import org.apache.hadoop.mapred.Reporter;
  40. import org.apache.hadoop.mapred.SequenceFileInputFormat;
  41. import org.apache.hadoop.mapred.SequenceFileOutputFormat;
  42. import org.apache.hadoop.util.Tool;
  43. import org.apache.hadoop.util.ToolRunner;
  44. /**
  45.  * A Map-reduce program to estimate the value of Pi
  46.  * using quasi-Monte Carlo method.
  47.  *
  48.  * Mapper:
  49.  *   Generate points in a unit square
  50.  *   and then count points inside/outside of the inscribed circle of the square.
  51.  *
  52.  * Reducer:
  53.  *   Accumulate points inside/outside results from the mappers.
  54.  *
  55.  * Let numTotal = numInside + numOutside.
  56.  * The fraction numInside/numTotal is a rational approximation of
  57.  * the value (Area of the circle)/(Area of the square),
  58.  * where the area of the inscribed circle is Pi/4
  59.  * and the area of unit square is 1.
  60.  * Then, Pi is estimated value to be 4(numInside/numTotal).  
  61.  */
  62. public class PiEstimator extends Configured implements Tool {
  63.   /** tmp directory for input/output */
  64.   static private final Path TMP_DIR = new Path(
  65.       PiEstimator.class.getSimpleName() + "_TMP_3_141592654");
  66.   
  67.   /** 2-dimensional Halton sequence {H(i)},
  68.    * where H(i) is a 2-dimensional point and i >= 1 is the index.
  69.    * Halton sequence is used to generate sample points for Pi estimation. 
  70.    */
  71.   private static class HaltonSequence {
  72.     /** Bases */
  73.     static final int[] P = {2, 3}; 
  74.     /** Maximum number of digits allowed */
  75.     static final int[] K = {63, 40}; 
  76.     private long index;
  77.     private double[] x;
  78.     private double[][] q;
  79.     private int[][] d;
  80.     /** Initialize to H(startindex),
  81.      * so the sequence begins with H(startindex+1).
  82.      */
  83.     HaltonSequence(long startindex) {
  84.       index = startindex;
  85.       x = new double[K.length];
  86.       q = new double[K.length][];
  87.       d = new int[K.length][];
  88.       for(int i = 0; i < K.length; i++) {
  89.         q[i] = new double[K[i]];
  90.         d[i] = new int[K[i]];
  91.       }
  92.       for(int i = 0; i < K.length; i++) {
  93.         long k = index;
  94.         x[i] = 0;
  95.         
  96.         for(int j = 0; j < K[i]; j++) {
  97.           q[i][j] = (j == 0? 1.0: q[i][j-1])/P[i];
  98.           d[i][j] = (int)(k % P[i]);
  99.           k = (k - d[i][j])/P[i];
  100.           x[i] += d[i][j] * q[i][j];
  101.         }
  102.       }
  103.     }
  104.     /** Compute next point.
  105.      * Assume the current point is H(index).
  106.      * Compute H(index+1).
  107.      * 
  108.      * @return a 2-dimensional point with coordinates in [0,1)^2
  109.      */
  110.     double[] nextPoint() {
  111.       index++;
  112.       for(int i = 0; i < K.length; i++) {
  113.         for(int j = 0; j < K[i]; j++) {
  114.           d[i][j]++;
  115.           x[i] += q[i][j];
  116.           if (d[i][j] < P[i]) {
  117.             break;
  118.           }
  119.           d[i][j] = 0;
  120.           x[i] -= (j == 0? 1.0: q[i][j-1]);
  121.         }
  122.       }
  123.       return x;
  124.     }
  125.   }
  126.   /**
  127.    * Mapper class for Pi estimation.
  128.    * Generate points in a unit square
  129.    * and then count points inside/outside of the inscribed circle of the square.
  130.    */
  131.   public static class PiMapper extends MapReduceBase
  132.     implements Mapper<LongWritable, LongWritable, BooleanWritable, LongWritable> {
  133.     /** Map method.
  134.      * @param offset samples starting from the (offset+1)th sample.
  135.      * @param size the number of samples for this map
  136.      * @param out output {ture->numInside, false->numOutside}
  137.      * @param reporter
  138.      */
  139.     public void map(LongWritable offset,
  140.                     LongWritable size,
  141.                     OutputCollector<BooleanWritable, LongWritable> out,
  142.                     Reporter reporter) throws IOException {
  143.       final HaltonSequence haltonsequence = new HaltonSequence(offset.get());
  144.       long numInside = 0L;
  145.       long numOutside = 0L;
  146.       for(long i = 0; i < size.get(); ) {
  147.         //generate points in a unit square
  148.         final double[] point = haltonsequence.nextPoint();
  149.         //count points inside/outside of the inscribed circle of the square
  150.         final double x = point[0] - 0.5;
  151.         final double y = point[1] - 0.5;
  152.         if (x*x + y*y > 0.25) {
  153.           numOutside++;
  154.         } else {
  155.           numInside++;
  156.         }
  157.         //report status
  158.         i++;
  159.         if (i % 1000 == 0) {
  160.           reporter.setStatus("Generated " + i + " samples.");
  161.         }
  162.       }
  163.       //output map results
  164.       out.collect(new BooleanWritable(true), new LongWritable(numInside));
  165.       out.collect(new BooleanWritable(false), new LongWritable(numOutside));
  166.     }
  167.   }
  168.   /**
  169.    * Reducer class for Pi estimation.
  170.    * Accumulate points inside/outside results from the mappers.
  171.    */
  172.   public static class PiReducer extends MapReduceBase
  173.     implements Reducer<BooleanWritable, LongWritable, WritableComparable<?>, Writable> {
  174.     
  175.     private long numInside = 0;
  176.     private long numOutside = 0;
  177.     private JobConf conf; //configuration for accessing the file system
  178.       
  179.     /** Store job configuration. */
  180.     @Override
  181.     public void configure(JobConf job) {
  182.       conf = job;
  183.     }
  184.     /**
  185.      * Accumulate number of points inside/outside results from the mappers.
  186.      * @param isInside Is the points inside? 
  187.      * @param values An iterator to a list of point counts
  188.      * @param output dummy, not used here.
  189.      * @param reporter
  190.      */
  191.     public void reduce(BooleanWritable isInside,
  192.                        Iterator<LongWritable> values,
  193.                        OutputCollector<WritableComparable<?>, Writable> output,
  194.                        Reporter reporter) throws IOException {
  195.       if (isInside.get()) {
  196.         for(; values.hasNext(); numInside += values.next().get());
  197.       } else {
  198.         for(; values.hasNext(); numOutside += values.next().get());
  199.       }
  200.     }
  201.     /**
  202.      * Reduce task done, write output to a file.
  203.      */
  204.     @Override
  205.     public void close() throws IOException {
  206.       //write output to a file
  207.       Path outDir = new Path(TMP_DIR, "out");
  208.       Path outFile = new Path(outDir, "reduce-out");
  209.       FileSystem fileSys = FileSystem.get(conf);
  210.       SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
  211.           outFile, LongWritable.class, LongWritable.class, 
  212.           CompressionType.NONE);
  213.       writer.append(new LongWritable(numInside), new LongWritable(numOutside));
  214.       writer.close();
  215.     }
  216.   }
  217.   /**
  218.    * Run a map/reduce job for estimating Pi.
  219.    *
  220.    * @return the estimated value of Pi
  221.    */
  222.   public static BigDecimal estimate(int numMaps, long numPoints, JobConf jobConf
  223.       ) throws IOException {
  224.     //setup job conf
  225.     jobConf.setJobName(PiEstimator.class.getSimpleName());
  226.     jobConf.setInputFormat(SequenceFileInputFormat.class);
  227.     jobConf.setOutputKeyClass(BooleanWritable.class);
  228.     jobConf.setOutputValueClass(LongWritable.class);
  229.     jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  230.     jobConf.setMapperClass(PiMapper.class);
  231.     jobConf.setNumMapTasks(numMaps);
  232.     jobConf.setReducerClass(PiReducer.class);
  233.     jobConf.setNumReduceTasks(1);
  234.     // turn off speculative execution, because DFS doesn't handle
  235.     // multiple writers to the same file.
  236.     jobConf.setSpeculativeExecution(false);
  237.     //setup input/output directories
  238.     final Path inDir = new Path(TMP_DIR, "in");
  239.     final Path outDir = new Path(TMP_DIR, "out");
  240.     FileInputFormat.setInputPaths(jobConf, inDir);
  241.     FileOutputFormat.setOutputPath(jobConf, outDir);
  242.     final FileSystem fs = FileSystem.get(jobConf);
  243.     if (fs.exists(TMP_DIR)) {
  244.       throw new IOException("Tmp directory " + fs.makeQualified(TMP_DIR)
  245.           + " already exists.  Please remove it first.");
  246.     }
  247.     if (!fs.mkdirs(inDir)) {
  248.       throw new IOException("Cannot create input directory " + inDir);
  249.     }
  250.     try {
  251.       //generate an input file for each map task
  252.       for(int i=0; i < numMaps; ++i) {
  253.         final Path file = new Path(inDir, "part"+i);
  254.         final LongWritable offset = new LongWritable(i * numPoints);
  255.         final LongWritable size = new LongWritable(numPoints);
  256.         final SequenceFile.Writer writer = SequenceFile.createWriter(
  257.             fs, jobConf, file,
  258.             LongWritable.class, LongWritable.class, CompressionType.NONE);
  259.         try {
  260.           writer.append(offset, size);
  261.         } finally {
  262.           writer.close();
  263.         }
  264.         System.out.println("Wrote input for Map #"+i);
  265.       }
  266.   
  267.       //start a map/reduce job
  268.       System.out.println("Starting Job");
  269.       final long startTime = System.currentTimeMillis();
  270.       JobClient.runJob(jobConf);
  271.       final double duration = (System.currentTimeMillis() - startTime)/1000.0;
  272.       System.out.println("Job Finished in " + duration + " seconds");
  273.       //read outputs
  274.       Path inFile = new Path(outDir, "reduce-out");
  275.       LongWritable numInside = new LongWritable();
  276.       LongWritable numOutside = new LongWritable();
  277.       SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
  278.       try {
  279.         reader.next(numInside, numOutside);
  280.       } finally {
  281.         reader.close();
  282.       }
  283.       //compute estimated value
  284.       return BigDecimal.valueOf(4).setScale(20)
  285.           .multiply(BigDecimal.valueOf(numInside.get()))
  286.           .divide(BigDecimal.valueOf(numMaps))
  287.           .divide(BigDecimal.valueOf(numPoints));
  288.     } finally {
  289.       fs.delete(TMP_DIR, true);
  290.     }
  291.   }
  292.   /**
  293.    * Parse arguments and then runs a map/reduce job.
  294.    * Print output in standard out.
  295.    * 
  296.    * @return a non-zero if there is an error.  Otherwise, return 0.  
  297.    */
  298.   public int run(String[] args) throws Exception {
  299.     if (args.length != 2) {
  300.       System.err.println("Usage: "+getClass().getName()+" <nMaps> <nSamples>");
  301.       ToolRunner.printGenericCommandUsage(System.err);
  302.       return -1;
  303.     }
  304.     
  305.     final int nMaps = Integer.parseInt(args[0]);
  306.     final long nSamples = Long.parseLong(args[1]);
  307.         
  308.     System.out.println("Number of Maps  = " + nMaps);
  309.     System.out.println("Samples per Map = " + nSamples);
  310.         
  311.     final JobConf jobConf = new JobConf(getConf(), getClass());
  312.     System.out.println("Estimated value of Pi is "
  313.         + estimate(nMaps, nSamples, jobConf));
  314.     return 0;
  315.   }
  316.   /**
  317.    * main method for running it as a stand alone command. 
  318.    */
  319.   public static void main(String[] argv) throws Exception {
  320.     System.exit(ToolRunner.run(null, new PiEstimator(), argv));
  321.   }
  322. }