ThreadedMapBenchmark.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.io.File;
  21. import java.util.Random;
  22. import org.apache.commons.logging.Log;
  23. import org.apache.commons.logging.LogFactory;
  24. import org.apache.hadoop.conf.Configured;
  25. import org.apache.hadoop.examples.RandomWriter;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.io.BytesWritable;
  29. import org.apache.hadoop.io.Text;
  30. import org.apache.hadoop.io.Writable;
  31. import org.apache.hadoop.io.WritableComparable;
  32. import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
  33. import org.apache.hadoop.mapred.lib.IdentityMapper;
  34. import org.apache.hadoop.mapred.lib.IdentityReducer;
  35. import org.apache.hadoop.util.Tool;
  36. import org.apache.hadoop.util.ToolRunner;
  37. /**
  38.  * Distributed threaded map benchmark.
  39.  * <p>
  40.  * This benchmark generates random data per map and tests the performance 
  41.  * of having multiple spills (using multiple threads) over having just one 
  42.  * spill. Following are the parameters that can be specified
  43.  * <li>File size per map.
  44.  * <li>Number of spills per map. 
  45.  * <li>Number of maps per host.
  46.  * <p>
  47.  * Sort is used for benchmarking the performance. 
  48.  */
  49. public class ThreadedMapBenchmark extends Configured implements Tool {
  50.   private static final Log LOG = LogFactory.getLog(ThreadedMapBenchmark.class);
  51.   private static Path BASE_DIR =
  52.     new Path(System.getProperty("test.build.data", 
  53.                                 File.separator + "benchmarks" + File.separator 
  54.                                 + "ThreadedMapBenchmark"));
  55.   private static Path INPUT_DIR = new Path(BASE_DIR, "input");
  56.   private static Path OUTPUT_DIR = new Path(BASE_DIR, "output");
  57.   private static final float FACTOR = 2.3f; // io.sort.mb set to 
  58.                                             // (FACTOR * data_size) should 
  59.                                             // result in only 1 spill
  60.   static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
  61.   
  62.   /**
  63.    * Generates random input data of given size with keys and values of given 
  64.    * sizes. By default it generates 128mb input data with 10 byte keys and 10 
  65.    * byte values.
  66.    */
  67.   public static class Map extends MapReduceBase
  68.   implements Mapper<WritableComparable, Writable,
  69.                     BytesWritable, BytesWritable> {
  70.   
  71.   private long numBytesToWrite;
  72.   private int minKeySize;
  73.   private int keySizeRange;
  74.   private int minValueSize;
  75.   private int valueSizeRange;
  76.   private Random random = new Random();
  77.   private BytesWritable randomKey = new BytesWritable();
  78.   private BytesWritable randomValue = new BytesWritable();
  79.   
  80.   private void randomizeBytes(byte[] data, int offset, int length) {
  81.     for(int i = offset + length - 1; i >= offset; --i) {
  82.       data[i] = (byte) random.nextInt(256);
  83.     }
  84.   }
  85.   
  86.   public void map(WritableComparable key, 
  87.                   Writable value,
  88.                   OutputCollector<BytesWritable, BytesWritable> output, 
  89.                   Reporter reporter) throws IOException {
  90.     int itemCount = 0;
  91.     while (numBytesToWrite > 0) {
  92.       int keyLength = minKeySize 
  93.                       + (keySizeRange != 0 
  94.                          ? random.nextInt(keySizeRange) 
  95.                          : 0);
  96.       randomKey.setSize(keyLength);
  97.       randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
  98.       int valueLength = minValueSize 
  99.                         + (valueSizeRange != 0 
  100.                            ? random.nextInt(valueSizeRange) 
  101.                            : 0);
  102.       randomValue.setSize(valueLength);
  103.       randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
  104.       output.collect(randomKey, randomValue);
  105.       numBytesToWrite -= keyLength + valueLength;
  106.       reporter.incrCounter(Counters.BYTES_WRITTEN, 1);
  107.       reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
  108.       if (++itemCount % 200 == 0) {
  109.         reporter.setStatus("wrote record " + itemCount + ". " 
  110.                            + numBytesToWrite + " bytes left.");
  111.       }
  112.     }
  113.     reporter.setStatus("done with " + itemCount + " records.");
  114.   }
  115.   
  116.   @Override
  117.   public void configure(JobConf job) {
  118.     numBytesToWrite = job.getLong("test.tmb.bytes_per_map",
  119.                                   128 * 1024 * 1024);
  120.     minKeySize = job.getInt("test.tmb.min_key", 10);
  121.     keySizeRange = job.getInt("test.tmb.max_key", 10) - minKeySize;
  122.     minValueSize = job.getInt("test.tmb.min_value", 10);
  123.     valueSizeRange = job.getInt("test.tmb.max_value", 10) - minValueSize;
  124.   }
  125. }
  126.   /**
  127.    * Generate input data for the benchmark
  128.    */
  129.   public static void generateInputData(int dataSizePerMap, 
  130.                                        int numSpillsPerMap, 
  131.                                        int numMapsPerHost, 
  132.                                        JobConf masterConf) 
  133.   throws Exception { 
  134.     JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
  135.     job.setJobName("threaded-map-benchmark-random-writer");
  136.     job.setJarByClass(ThreadedMapBenchmark.class);
  137.     job.setInputFormat(UtilsForTests.RandomInputFormat.class);
  138.     job.setOutputFormat(SequenceFileOutputFormat.class);
  139.     
  140.     job.setMapperClass(Map.class);
  141.     job.setReducerClass(IdentityReducer.class);
  142.     
  143.     job.setOutputKeyClass(BytesWritable.class);
  144.     job.setOutputValueClass(BytesWritable.class);
  145.     
  146.     JobClient client = new JobClient(job);
  147.     ClusterStatus cluster = client.getClusterStatus();
  148.     long totalDataSize = dataSizePerMap * numMapsPerHost 
  149.                          * cluster.getTaskTrackers();
  150.     job.set("test.tmb.bytes_per_map", 
  151.             String.valueOf(dataSizePerMap * 1024 * 1024));
  152.     job.setNumReduceTasks(0); // none reduce
  153.     job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
  154.     FileOutputFormat.setOutputPath(job, INPUT_DIR);
  155.     
  156.     FileSystem fs = FileSystem.get(job);
  157.     fs.delete(BASE_DIR, true);
  158.     
  159.     LOG.info("Generating random input for the benchmark");
  160.     LOG.info("Total data : " + totalDataSize + " mb");
  161.     LOG.info("Data per map: " + dataSizePerMap + " mb");
  162.     LOG.info("Number of spills : " + numSpillsPerMap);
  163.     LOG.info("Number of maps per host : " + numMapsPerHost);
  164.     LOG.info("Number of hosts : " + cluster.getTaskTrackers());
  165.     
  166.     JobClient.runJob(job); // generates the input for the benchmark
  167.   }
  168.   /**
  169.    * This is the main routine for launching the benchmark. It generates random 
  170.    * input data. The input is non-splittable. Sort is used for benchmarking. 
  171.    * This benchmark reports the effect of having multiple sort and spill 
  172.    * cycles over a single sort and spill. 
  173.    * 
  174.    * @throws IOException 
  175.    */
  176.   public int run (String[] args) throws Exception {
  177.     LOG.info("Starting the benchmark for threaded spills");
  178.     String version = "ThreadedMapBenchmark.0.0.1";
  179.     System.out.println(version);
  180.     
  181.     String usage = 
  182.       "Usage: threadedmapbenchmark " +
  183.       "[-dataSizePerMap <data size (in mb) per map, default is 128 mb>] " + 
  184.       "[-numSpillsPerMap <number of spills per map, default is 2>] " +
  185.       "[-numMapsPerHost <number of maps per host, default is 1>]";
  186.     
  187.     int dataSizePerMap = 128; // in mb
  188.     int numSpillsPerMap = 2;
  189.     int numMapsPerHost = 1;
  190.     JobConf masterConf = new JobConf(getConf());
  191.     
  192.     for (int i = 0; i < args.length; i++) { // parse command line
  193.       if (args[i].equals("-dataSizePerMap")) {
  194.         dataSizePerMap = Integer.parseInt(args[++i]);
  195.       } else if (args[i].equals("-numSpillsPerMap")) {
  196.         numSpillsPerMap = Integer.parseInt(args[++i]);
  197.       } else if (args[i].equals("-numMapsPerHost")) {
  198.         numMapsPerHost = Integer.parseInt(args[++i]);
  199.       } else {
  200.         System.err.println(usage);
  201.         System.exit(-1);
  202.       }
  203.     }
  204.     
  205.     if (dataSizePerMap <  1 ||  // verify arguments
  206.         numSpillsPerMap < 1 ||
  207.         numMapsPerHost < 1)
  208.       {
  209.         System.err.println(usage);
  210.         System.exit(-1);
  211.       }
  212.     
  213.     FileSystem fs = null;
  214.     try {
  215.       // using random-writer to generate the input data
  216.       generateInputData(dataSizePerMap, numSpillsPerMap, numMapsPerHost, 
  217.                         masterConf);
  218.       
  219.       // configure job for sorting
  220.       JobConf job = new JobConf(masterConf, ThreadedMapBenchmark.class);
  221.       job.setJobName("threaded-map-benchmark-unspilled");
  222.       job.setJarByClass(ThreadedMapBenchmark.class);
  223.       job.setInputFormat(NonSplitableSequenceFileInputFormat.class);
  224.       job.setOutputFormat(SequenceFileOutputFormat.class);
  225.       
  226.       job.setOutputKeyClass(BytesWritable.class);
  227.       job.setOutputValueClass(BytesWritable.class);
  228.       
  229.       job.setMapperClass(IdentityMapper.class);        
  230.       job.setReducerClass(IdentityReducer.class);
  231.       
  232.       FileInputFormat.addInputPath(job, INPUT_DIR);
  233.       FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
  234.       
  235.       JobClient client = new JobClient(job);
  236.       ClusterStatus cluster = client.getClusterStatus();
  237.       job.setNumMapTasks(numMapsPerHost * cluster.getTaskTrackers());
  238.       job.setNumReduceTasks(1);
  239.       
  240.       // set io.sort.mb to avoid spill
  241.       int ioSortMb = (int)Math.ceil(FACTOR * dataSizePerMap);
  242.       job.set("io.sort.mb", String.valueOf(ioSortMb));
  243.       fs = FileSystem.get(job);
  244.       
  245.       LOG.info("Running sort with 1 spill per map");
  246.       long startTime = System.currentTimeMillis();
  247.       JobClient.runJob(job);
  248.       long endTime = System.currentTimeMillis();
  249.       
  250.       LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
  251.                + " millisec");
  252.       fs.delete(OUTPUT_DIR, true);
  253.       
  254.       // set io.sort.mb to have multiple spills
  255.       JobConf spilledJob = new JobConf(job, ThreadedMapBenchmark.class);
  256.       ioSortMb = (int)Math.ceil(FACTOR 
  257.                                 * Math.ceil((double)dataSizePerMap 
  258.                                             / numSpillsPerMap));
  259.       spilledJob.set("io.sort.mb", String.valueOf(ioSortMb));
  260.       spilledJob.setJobName("threaded-map-benchmark-spilled");
  261.       spilledJob.setJarByClass(ThreadedMapBenchmark.class);
  262.       
  263.       LOG.info("Running sort with " + numSpillsPerMap + " spills per map");
  264.       startTime = System.currentTimeMillis();
  265.       JobClient.runJob(spilledJob);
  266.       endTime = System.currentTimeMillis();
  267.       
  268.       LOG.info("Total time taken : " + String.valueOf(endTime - startTime) 
  269.                + " millisec");
  270.     } finally {
  271.       if (fs != null) {
  272.         fs.delete(BASE_DIR, true);
  273.       }
  274.     }
  275.     return 0;
  276.   }
  277.   public static void main(String[] args) throws Exception {
  278.     int res = ToolRunner.run(new ThreadedMapBenchmark(), args);
  279.     System.exit(res);
  280.   }
  281. }