MRBench.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.io.PrintStream;
  21. import java.util.ArrayList;
  22. import java.util.Iterator;
  23. import java.util.Random;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.io.UTF8;
  29. import org.apache.hadoop.io.WritableComparable;
  30. import org.apache.hadoop.io.Text;
  31. /**
  32.  * Runs a job multiple times and takes average of all runs.
  33.  */
  34. public class MRBench {
  35.   
  36.   private static final Log LOG = LogFactory.getLog(MRBench.class);
  37.   private static Path BASE_DIR =
  38.     new Path(System.getProperty("test.build.data","/benchmarks/MRBench"));
  39.   private static Path INPUT_DIR = new Path(BASE_DIR, "mr_input");
  40.   private static Path OUTPUT_DIR = new Path(BASE_DIR, "mr_output");
  41.   
  42.   public static enum Order {RANDOM, ASCENDING, DESCENDING}; 
  43.   
  44.   /**
  45.    * Takes input format as text lines, runs some processing on it and 
  46.    * writes out data as text again. 
  47.    */
  48.   public static class Map extends MapReduceBase
  49.     implements Mapper<WritableComparable, Text, UTF8, UTF8> {
  50.     
  51.     public void map(WritableComparable key, Text value,
  52.                     OutputCollector<UTF8, UTF8> output,
  53.                     Reporter reporter) throws IOException 
  54.     {
  55.       String line = value.toString();
  56.       output.collect(new UTF8(process(line)), new UTF8(""));
  57.     }
  58.     public String process(String line) {
  59.       return line; 
  60.     }
  61.   }
  62.   /**
  63.    * Ignores the key and writes values to the output. 
  64.    */
  65.   public static class Reduce extends MapReduceBase
  66.     implements Reducer<UTF8, UTF8, UTF8, UTF8> {
  67.     
  68.     public void reduce(UTF8 key, Iterator<UTF8> values,
  69.                        OutputCollector<UTF8, UTF8> output, Reporter reporter) throws IOException 
  70.     {
  71.       while(values.hasNext()) {
  72.         output.collect(key, new UTF8(values.next().toString()));
  73.       }
  74.     }
  75.   }
  76.   /**
  77.    * Generate a text file on the given filesystem with the given path name.
  78.    * The text file will contain the given number of lines of generated data.
  79.    * The generated data are string representations of numbers.  Each line
  80.    * is the same length, which is achieved by padding each number with
  81.    * an appropriate number of leading '0' (zero) characters.  The order of
  82.    * generated data is one of ascending, descending, or random.
  83.    */
  84.   public static void generateTextFile(FileSystem fs, Path inputFile, 
  85.                                       long numLines, Order sortOrder) throws IOException 
  86.   {
  87.     LOG.info("creating control file: "+numLines+" numLines, "+sortOrder+" sortOrder");
  88.     PrintStream output = null;
  89.     try {
  90.       output = new PrintStream(fs.create(inputFile));
  91.       int padding = String.valueOf(numLines).length();
  92.       switch(sortOrder) {
  93.       case RANDOM:
  94.         for (long l = 0; l < numLines; l++) {
  95.           output.println(pad((new Random()).nextLong(), padding));
  96.         }
  97.         break; 
  98.       case ASCENDING: 
  99.         for (long l = 0; l < numLines; l++) {
  100.           output.println(pad(l, padding));
  101.         }
  102.         break;
  103.       case DESCENDING: 
  104.         for (long l = numLines; l > 0; l--) {
  105.           output.println(pad(l, padding));
  106.         }
  107.         break;
  108.       }
  109.     } finally {
  110.       if (output != null)
  111.         output.close();
  112.     }
  113.     LOG.info("created control file: " + inputFile);
  114.   }
  115.   
  116.   /**
  117.    * Convert the given number to a string and pad the number with 
  118.    * leading '0' (zero) characters so that the string is exactly
  119.    * the given length.
  120.    */
  121.   private static String pad(long number, int length) {
  122.     String str = String.valueOf(number);
  123.     StringBuffer value = new StringBuffer(); 
  124.     for (int i = str.length(); i < length; i++) {
  125.       value.append("0"); 
  126.     }
  127.     value.append(str); 
  128.     return value.toString();
  129.   }
  130.   
  131.   /**
  132.    * Create the job configuration.
  133.    */
  134.   private static JobConf setupJob(int numMaps, int numReduces, String jarFile) {
  135.     JobConf jobConf = new JobConf(MRBench.class);
  136.     FileInputFormat.addInputPath(jobConf, INPUT_DIR);
  137.     
  138.     jobConf.setInputFormat(TextInputFormat.class);
  139.     jobConf.setOutputFormat(TextOutputFormat.class);
  140.     
  141.     jobConf.setOutputValueClass(UTF8.class);
  142.     
  143.     jobConf.setMapOutputKeyClass(UTF8.class);
  144.     jobConf.setMapOutputValueClass(UTF8.class);
  145.     
  146.     if (null != jarFile) {
  147.       jobConf.setJar(jarFile);
  148.     }
  149.     jobConf.setMapperClass(Map.class);
  150.     jobConf.setReducerClass(Reduce.class);
  151.     
  152.     jobConf.setNumMapTasks(numMaps);
  153.     jobConf.setNumReduceTasks(numReduces);
  154.     
  155.     return jobConf; 
  156.   }
  157.   
  158.   /**
  159.    * Runs a MapReduce task, given number of times. The input to each run
  160.    * is the same file.
  161.    */
  162.   private static ArrayList<Long> runJobInSequence(JobConf masterJobConf, int numRuns) throws IOException {
  163.     Path intrimData = null; 
  164.     Random rand = new Random();
  165.     ArrayList<Long> execTimes = new ArrayList<Long>(); 
  166.     
  167.     for (int i = 0; i < numRuns; i++) {
  168.       // create a new job conf every time, reusing same object does not work 
  169.       JobConf jobConf = new JobConf(masterJobConf);
  170.       // reset the job jar because the copy constructor doesn't
  171.       jobConf.setJar(masterJobConf.getJar());
  172.       // give a new random name to output of the mapred tasks
  173.       FileOutputFormat.setOutputPath(jobConf, 
  174.                          new Path(OUTPUT_DIR, "output_" + rand.nextInt()));
  175.       LOG.info("Running job " + i + ":" +
  176.                " input=" + FileInputFormat.getInputPaths(jobConf)[0] + 
  177.                " output=" + FileOutputFormat.getOutputPath(jobConf));
  178.       
  179.       // run the mapred task now 
  180.       long curTime = System.currentTimeMillis();
  181.       JobClient.runJob(jobConf);
  182.       execTimes.add(new Long(System.currentTimeMillis() - curTime));
  183.     }
  184.     return execTimes;
  185.   }
  186.   
  187.   /**
  188.    * <pre>
  189.    * Usage: mrbench
  190.    *    [-baseDir <base DFS path for output/input, default is /benchmarks/MRBench>]
  191.    *    [-jar <local path to job jar file containing Mapper and Reducer implementations, default is current jar file>]
  192.    *    [-numRuns <number of times to run the job, default is 1>]
  193.    *    [-maps <number of maps for each run, default is 2>]
  194.    *    [-reduces <number of reduces for each run, default is 1>]
  195.    *    [-inputLines <number of input lines to generate, default is 1>]
  196.    *    [-inputType <type of input to generate, one of ascending (default), descending, random>]
  197.    *    [-verbose]
  198.    * </pre>
  199.    */
  200.   public static void main (String[] args) throws IOException {
  201.     String version = "MRBenchmark.0.0.2";
  202.     System.out.println(version);
  203.     String usage = 
  204.       "Usage: mrbench " +
  205.       "[-baseDir <base DFS path for output/input, default is /benchmarks/MRBench>] " + 
  206.       "[-jar <local path to job jar file containing Mapper and Reducer implementations, default is current jar file>] " + 
  207.       "[-numRuns <number of times to run the job, default is 1>] " +
  208.       "[-maps <number of maps for each run, default is 2>] " +
  209.       "[-reduces <number of reduces for each run, default is 1>] " +
  210.       "[-inputLines <number of input lines to generate, default is 1>] " +
  211.       "[-inputType <type of input to generate, one of ascending (default), descending, random>] " + 
  212.       "[-verbose]";
  213.     
  214.     String jarFile = null;
  215.     int inputLines = 1; 
  216.     int numRuns = 1;
  217.     int numMaps = 2; 
  218.     int numReduces = 1;
  219.     boolean verbose = false;         
  220.     Order inputSortOrder = Order.ASCENDING;     
  221.     for (int i = 0; i < args.length; i++) { // parse command line
  222.       if (args[i].equals("-jar")) {
  223.         jarFile = args[++i];
  224.       } else if (args[i].equals("-numRuns")) {
  225.         numRuns = Integer.parseInt(args[++i]);
  226.       } else if (args[i].equals("-baseDir")) {
  227.         BASE_DIR = new Path(args[++i]);
  228.       } else if (args[i].equals("-maps")) {
  229.         numMaps = Integer.parseInt(args[++i]);
  230.       } else if (args[i].equals("-reduces")) {
  231.         numReduces = Integer.parseInt(args[++i]);
  232.       } else if (args[i].equals("-inputLines")) {
  233.         inputLines = Integer.parseInt(args[++i]);
  234.       } else if (args[i].equals("-inputType")) {
  235.         String s = args[++i]; 
  236.         if (s.equalsIgnoreCase("ascending")) {
  237.           inputSortOrder = Order.ASCENDING;
  238.         } else if (s.equalsIgnoreCase("descending")) {
  239.           inputSortOrder = Order.DESCENDING; 
  240.         } else if (s.equalsIgnoreCase("random")) {
  241.           inputSortOrder = Order.RANDOM;
  242.         } else {
  243.           inputSortOrder = null;
  244.         }
  245.       } else if (args[i].equals("-verbose")) {
  246.         verbose = true;
  247.       } else {
  248.         System.err.println(usage);
  249.         System.exit(-1);
  250.       }
  251.     }
  252.     
  253.     if (numRuns < 1 ||  // verify args
  254.         numMaps < 1 ||
  255.         numReduces < 1 ||
  256.         inputLines < 0 ||
  257.         inputSortOrder == null)
  258.       {
  259.         System.err.println(usage);
  260.         System.exit(-1);
  261.       }
  262.     JobConf jobConf = setupJob(numMaps, numReduces, jarFile);
  263.     FileSystem fs = FileSystem.get(jobConf);
  264.     Path inputFile = new Path(INPUT_DIR, "input_" + (new Random()).nextInt() + ".txt");
  265.     generateTextFile(fs, inputFile, inputLines, inputSortOrder);
  266.     // setup test output directory
  267.     fs.mkdirs(BASE_DIR); 
  268.     ArrayList<Long> execTimes = new ArrayList<Long>();
  269.     try {
  270.       execTimes = runJobInSequence(jobConf, numRuns);
  271.     } finally {
  272.       // delete output -- should we really do this?
  273.       fs.delete(BASE_DIR, true);
  274.     }
  275.     
  276.     if (verbose) {
  277.       // Print out a report 
  278.       System.out.println("Total MapReduce jobs executed: " + numRuns);
  279.       System.out.println("Total lines of data per job: " + inputLines);
  280.       System.out.println("Maps per job: " + numMaps);
  281.       System.out.println("Reduces per job: " + numReduces);
  282.     }
  283.     int i = 0;
  284.     long totalTime = 0; 
  285.     for (Long time : execTimes) {
  286.       totalTime += time.longValue(); 
  287.       if (verbose) {
  288.         System.out.println("Total milliseconds for task: " + (++i) + 
  289.                            " = " +  time);
  290.       }
  291.     }
  292.     long avgTime = totalTime / numRuns;    
  293.     System.out.println("DataLinestMapstReducestAvgTime (milliseconds)");
  294.     System.out.println(inputLines + "tt" + numMaps + "t" + 
  295.                        numReduces + "t" + avgTime);
  296.   }
  297.   
  298. }