RandomWriter.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples;
  19. import java.io.IOException;
  20. import java.util.Date;
  21. import java.util.Random;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.conf.Configured;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.io.BytesWritable;
  26. import org.apache.hadoop.io.Text;
  27. import org.apache.hadoop.io.Writable;
  28. import org.apache.hadoop.io.WritableComparable;
  29. import org.apache.hadoop.mapred.ClusterStatus;
  30. import org.apache.hadoop.mapred.FileOutputFormat;
  31. import org.apache.hadoop.mapred.FileSplit;
  32. import org.apache.hadoop.mapred.InputFormat;
  33. import org.apache.hadoop.mapred.InputSplit;
  34. import org.apache.hadoop.mapred.JobClient;
  35. import org.apache.hadoop.mapred.JobConf;
  36. import org.apache.hadoop.mapred.MapReduceBase;
  37. import org.apache.hadoop.mapred.Mapper;
  38. import org.apache.hadoop.mapred.OutputCollector;
  39. import org.apache.hadoop.mapred.RecordReader;
  40. import org.apache.hadoop.mapred.Reporter;
  41. import org.apache.hadoop.mapred.SequenceFileOutputFormat;
  42. import org.apache.hadoop.mapred.lib.IdentityReducer;
  43. import org.apache.hadoop.util.GenericOptionsParser;
  44. import org.apache.hadoop.util.Tool;
  45. import org.apache.hadoop.util.ToolRunner;
  46. /**
  47.  * This program uses map/reduce to just run a distributed job where there is
  48.  * no interaction between the tasks and each task write a large unsorted
  49.  * random binary sequence file of BytesWritable.
  50.  * In order for this program to generate data for terasort with 10-byte keys
  51.  * and 90-byte values, have the following config:
  52.  * <xmp>
  53.  * <?xml version="1.0"?>
  54.  * <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
  55.  * <configuration>
  56.  *   <property>
  57.  *     <name>test.randomwrite.min_key</name>
  58.  *     <value>10</value>
  59.  *   </property>
  60.  *   <property>
  61.  *     <name>test.randomwrite.max_key</name>
  62.  *     <value>10</value>
  63.  *   </property>
  64.  *   <property>
  65.  *     <name>test.randomwrite.min_value</name>
  66.  *     <value>90</value>
  67.  *   </property>
  68.  *   <property>
  69.  *     <name>test.randomwrite.max_value</name>
  70.  *     <value>90</value>
  71.  *   </property>
  72.  *   <property>
  73.  *     <name>test.randomwrite.total_bytes</name>
  74.  *     <value>1099511627776</value>
  75.  *   </property>
  76.  * </configuration></xmp>
  77.  * 
  78.  * Equivalently, {@link RandomWriter} also supports all the above options
  79.  * and ones supported by {@link GenericOptionsParser} via the command-line.
  80.  */
  81. public class RandomWriter extends Configured implements Tool {
  82.   
  83.   /**
  84.    * User counters
  85.    */
  86.   static enum Counters { RECORDS_WRITTEN, BYTES_WRITTEN }
  87.   
  88.   /**
  89.    * A custom input format that creates virtual inputs of a single string
  90.    * for each map.
  91.    */
  92.   static class RandomInputFormat implements InputFormat<Text, Text> {
  93.     /** 
  94.      * Generate the requested number of file splits, with the filename
  95.      * set to the filename of the output file.
  96.      */
  97.     public InputSplit[] getSplits(JobConf job, 
  98.                                   int numSplits) throws IOException {
  99.       InputSplit[] result = new InputSplit[numSplits];
  100.       Path outDir = FileOutputFormat.getOutputPath(job);
  101.       for(int i=0; i < result.length; ++i) {
  102.         result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i), 0, 1, 
  103.                                   (String[])null);
  104.       }
  105.       return result;
  106.     }
  107.     /**
  108.      * Return a single record (filename, "") where the filename is taken from
  109.      * the file split.
  110.      */
  111.     static class RandomRecordReader implements RecordReader<Text, Text> {
  112.       Path name;
  113.       public RandomRecordReader(Path p) {
  114.         name = p;
  115.       }
  116.       public boolean next(Text key, Text value) {
  117.         if (name != null) {
  118.           key.set(name.getName());
  119.           name = null;
  120.           return true;
  121.         }
  122.         return false;
  123.       }
  124.       public Text createKey() {
  125.         return new Text();
  126.       }
  127.       public Text createValue() {
  128.         return new Text();
  129.       }
  130.       public long getPos() {
  131.         return 0;
  132.       }
  133.       public void close() {}
  134.       public float getProgress() {
  135.         return 0.0f;
  136.       }
  137.     }
  138.     public RecordReader<Text, Text> getRecordReader(InputSplit split,
  139.                                         JobConf job, 
  140.                                         Reporter reporter) throws IOException {
  141.       return new RandomRecordReader(((FileSplit) split).getPath());
  142.     }
  143.   }
  144.   static class Map extends MapReduceBase
  145.     implements Mapper<WritableComparable, Writable,
  146.                       BytesWritable, BytesWritable> {
  147.     
  148.     private long numBytesToWrite;
  149.     private int minKeySize;
  150.     private int keySizeRange;
  151.     private int minValueSize;
  152.     private int valueSizeRange;
  153.     private Random random = new Random();
  154.     private BytesWritable randomKey = new BytesWritable();
  155.     private BytesWritable randomValue = new BytesWritable();
  156.     
  157.     private void randomizeBytes(byte[] data, int offset, int length) {
  158.       for(int i=offset + length - 1; i >= offset; --i) {
  159.         data[i] = (byte) random.nextInt(256);
  160.       }
  161.     }
  162.     
  163.     /**
  164.      * Given an output filename, write a bunch of random records to it.
  165.      */
  166.     public void map(WritableComparable key, 
  167.                     Writable value,
  168.                     OutputCollector<BytesWritable, BytesWritable> output, 
  169.                     Reporter reporter) throws IOException {
  170.       int itemCount = 0;
  171.       while (numBytesToWrite > 0) {
  172.         int keyLength = minKeySize + 
  173.           (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
  174.         randomKey.setSize(keyLength);
  175.         randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
  176.         int valueLength = minValueSize +
  177.           (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
  178.         randomValue.setSize(valueLength);
  179.         randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
  180.         output.collect(randomKey, randomValue);
  181.         numBytesToWrite -= keyLength + valueLength;
  182.         reporter.incrCounter(Counters.BYTES_WRITTEN, keyLength + valueLength);
  183.         reporter.incrCounter(Counters.RECORDS_WRITTEN, 1);
  184.         if (++itemCount % 200 == 0) {
  185.           reporter.setStatus("wrote record " + itemCount + ". " + 
  186.                              numBytesToWrite + " bytes left.");
  187.         }
  188.       }
  189.       reporter.setStatus("done with " + itemCount + " records.");
  190.     }
  191.     
  192.     /**
  193.      * Save the values out of the configuaration that we need to write
  194.      * the data.
  195.      */
  196.     @Override
  197.     public void configure(JobConf job) {
  198.       numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
  199.                                     1*1024*1024*1024);
  200.       minKeySize = job.getInt("test.randomwrite.min_key", 10);
  201.       keySizeRange = 
  202.         job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
  203.       minValueSize = job.getInt("test.randomwrite.min_value", 0);
  204.       valueSizeRange = 
  205.         job.getInt("test.randomwrite.max_value", 20000) - minValueSize;
  206.     }
  207.     
  208.   }
  209.   
  210.   /**
  211.    * This is the main routine for launching a distributed random write job.
  212.    * It runs 10 maps/node and each node writes 1 gig of data to a DFS file.
  213.    * The reduce doesn't do anything.
  214.    * 
  215.    * @throws IOException 
  216.    */
  217.   public int run(String[] args) throws Exception {    
  218.     if (args.length == 0) {
  219.       System.out.println("Usage: writer <out-dir>");
  220.       ToolRunner.printGenericCommandUsage(System.out);
  221.       return -1;
  222.     }
  223.     
  224.     Path outDir = new Path(args[0]);
  225.     JobConf job = new JobConf(getConf());
  226.     
  227.     job.setJarByClass(RandomWriter.class);
  228.     job.setJobName("random-writer");
  229.     FileOutputFormat.setOutputPath(job, outDir);
  230.     
  231.     job.setOutputKeyClass(BytesWritable.class);
  232.     job.setOutputValueClass(BytesWritable.class);
  233.     
  234.     job.setInputFormat(RandomInputFormat.class);
  235.     job.setMapperClass(Map.class);        
  236.     job.setReducerClass(IdentityReducer.class);
  237.     job.setOutputFormat(SequenceFileOutputFormat.class);
  238.     
  239.     JobClient client = new JobClient(job);
  240.     ClusterStatus cluster = client.getClusterStatus();
  241.     int numMapsPerHost = job.getInt("test.randomwriter.maps_per_host", 10);
  242.     long numBytesToWritePerMap = job.getLong("test.randomwrite.bytes_per_map",
  243.                                              1*1024*1024*1024);
  244.     if (numBytesToWritePerMap == 0) {
  245.       System.err.println("Cannot have test.randomwrite.bytes_per_map set to 0");
  246.       return -2;
  247.     }
  248.     long totalBytesToWrite = job.getLong("test.randomwrite.total_bytes", 
  249.          numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
  250.     int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
  251.     if (numMaps == 0 && totalBytesToWrite > 0) {
  252.       numMaps = 1;
  253.       job.setLong("test.randomwrite.bytes_per_map", totalBytesToWrite);
  254.     }
  255.     
  256.     job.setNumMapTasks(numMaps);
  257.     System.out.println("Running " + numMaps + " maps.");
  258.     
  259.     // reducer NONE
  260.     job.setNumReduceTasks(0);
  261.     
  262.     Date startTime = new Date();
  263.     System.out.println("Job started: " + startTime);
  264.     JobClient.runJob(job);
  265.     Date endTime = new Date();
  266.     System.out.println("Job ended: " + endTime);
  267.     System.out.println("The job took " + 
  268.                        (endTime.getTime() - startTime.getTime()) /1000 + 
  269.                        " seconds.");
  270.     
  271.     return 0;
  272.   }
  273.   
  274.   public static void main(String[] args) throws Exception {
  275.     int res = ToolRunner.run(new Configuration(), new RandomWriter(), args);
  276.     System.exit(res);
  277.   }
  278. }