TeraGen.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples.terasort;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import org.apache.hadoop.conf.Configured;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.io.LongWritable;
  25. import org.apache.hadoop.io.NullWritable;
  26. import org.apache.hadoop.io.Text;
  27. import org.apache.hadoop.io.WritableUtils;
  28. import org.apache.hadoop.mapred.FileOutputFormat;
  29. import org.apache.hadoop.mapred.InputFormat;
  30. import org.apache.hadoop.mapred.InputSplit;
  31. import org.apache.hadoop.mapred.JobClient;
  32. import org.apache.hadoop.mapred.JobConf;
  33. import org.apache.hadoop.mapred.MapReduceBase;
  34. import org.apache.hadoop.mapred.Mapper;
  35. import org.apache.hadoop.mapred.OutputCollector;
  36. import org.apache.hadoop.mapred.RecordReader;
  37. import org.apache.hadoop.mapred.Reporter;
  38. import org.apache.hadoop.util.Tool;
  39. import org.apache.hadoop.util.ToolRunner;
  40. /**
  41.  * Generate the official terasort input data set.
  42.  * The user specifies the number of rows and the output directory and this
  43.  * class runs a map/reduce program to generate the data.
  44.  * The format of the data is:
  45.  * <ul>
  46.  * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) r n
  47.  * <li>The keys are random characters from the set ' ' .. '~'.
  48.  * <li>The rowid is the right justified row id as a int.
  49.  * <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'.
  50.  * </ul>
  51.  *
  52.  * <p>
  53.  * To run the program: 
  54.  * <b>bin/hadoop jar hadoop-*-examples.jar teragen 10000000000 in-dir</b>
  55.  */
  56. public class TeraGen extends Configured implements Tool {
  57.   /**
  58.    * An input format that assigns ranges of longs to each mapper.
  59.    */
  60.   static class RangeInputFormat 
  61.        implements InputFormat<LongWritable, NullWritable> {
  62.     
  63.     /**
  64.      * An input split consisting of a range on numbers.
  65.      */
  66.     static class RangeInputSplit implements InputSplit {
  67.       long firstRow;
  68.       long rowCount;
  69.       public RangeInputSplit() { }
  70.       public RangeInputSplit(long offset, long length) {
  71.         firstRow = offset;
  72.         rowCount = length;
  73.       }
  74.       public long getLength() throws IOException {
  75.         return 0;
  76.       }
  77.       public String[] getLocations() throws IOException {
  78.         return new String[]{};
  79.       }
  80.       public void readFields(DataInput in) throws IOException {
  81.         firstRow = WritableUtils.readVLong(in);
  82.         rowCount = WritableUtils.readVLong(in);
  83.       }
  84.       public void write(DataOutput out) throws IOException {
  85.         WritableUtils.writeVLong(out, firstRow);
  86.         WritableUtils.writeVLong(out, rowCount);
  87.       }
  88.     }
  89.     
  90.     /**
  91.      * A record reader that will generate a range of numbers.
  92.      */
  93.     static class RangeRecordReader 
  94.           implements RecordReader<LongWritable, NullWritable> {
  95.       long startRow;
  96.       long finishedRows;
  97.       long totalRows;
  98.       public RangeRecordReader(RangeInputSplit split) {
  99.         startRow = split.firstRow;
  100.         finishedRows = 0;
  101.         totalRows = split.rowCount;
  102.       }
  103.       public void close() throws IOException {
  104.         // NOTHING
  105.       }
  106.       public LongWritable createKey() {
  107.         return new LongWritable();
  108.       }
  109.       public NullWritable createValue() {
  110.         return NullWritable.get();
  111.       }
  112.       public long getPos() throws IOException {
  113.         return finishedRows;
  114.       }
  115.       public float getProgress() throws IOException {
  116.         return finishedRows / (float) totalRows;
  117.       }
  118.       public boolean next(LongWritable key, 
  119.                           NullWritable value) {
  120.         if (finishedRows < totalRows) {
  121.           key.set(startRow + finishedRows);
  122.           finishedRows += 1;
  123.           return true;
  124.         } else {
  125.           return false;
  126.         }
  127.       }
  128.       
  129.     }
  130.     public RecordReader<LongWritable, NullWritable> 
  131.       getRecordReader(InputSplit split, JobConf job,
  132.                       Reporter reporter) throws IOException {
  133.       return new RangeRecordReader((RangeInputSplit) split);
  134.     }
  135.     /**
  136.      * Create the desired number of splits, dividing the number of rows
  137.      * between the mappers.
  138.      */
  139.     public InputSplit[] getSplits(JobConf job, 
  140.                                   int numSplits) {
  141.       long totalRows = getNumberOfRows(job);
  142.       long rowsPerSplit = totalRows / numSplits;
  143.       System.out.println("Generating " + totalRows + " using " + numSplits + 
  144.                          " maps with step of " + rowsPerSplit);
  145.       InputSplit[] splits = new InputSplit[numSplits];
  146.       long currentRow = 0;
  147.       for(int split=0; split < numSplits-1; ++split) {
  148.         splits[split] = new RangeInputSplit(currentRow, rowsPerSplit);
  149.         currentRow += rowsPerSplit;
  150.       }
  151.       splits[numSplits-1] = new RangeInputSplit(currentRow, 
  152.                                                 totalRows - currentRow);
  153.       return splits;
  154.     }
  155.   }
  156.   
  157.   static long getNumberOfRows(JobConf job) {
  158.     return job.getLong("terasort.num-rows", 0);
  159.   }
  160.   
  161.   static void setNumberOfRows(JobConf job, long numRows) {
  162.     job.setLong("terasort.num-rows", numRows);
  163.   }
  164.   static class RandomGenerator {
  165.     private long seed = 0;
  166.     private static final long mask32 = (1l<<32) - 1;
  167.     /**
  168.      * The number of iterations separating the precomputed seeds.
  169.      */
  170.     private static final int seedSkip = 128 * 1024 * 1024;
  171.     /**
  172.      * The precomputed seed values after every seedSkip iterations.
  173.      * There should be enough values so that a 2**32 iterations are 
  174.      * covered.
  175.      */
  176.     private static final long[] seeds = new long[]{0L,
  177.                                                    4160749568L,
  178.                                                    4026531840L,
  179.                                                    3892314112L,
  180.                                                    3758096384L,
  181.                                                    3623878656L,
  182.                                                    3489660928L,
  183.                                                    3355443200L,
  184.                                                    3221225472L,
  185.                                                    3087007744L,
  186.                                                    2952790016L,
  187.                                                    2818572288L,
  188.                                                    2684354560L,
  189.                                                    2550136832L,
  190.                                                    2415919104L,
  191.                                                    2281701376L,
  192.                                                    2147483648L,
  193.                                                    2013265920L,
  194.                                                    1879048192L,
  195.                                                    1744830464L,
  196.                                                    1610612736L,
  197.                                                    1476395008L,
  198.                                                    1342177280L,
  199.                                                    1207959552L,
  200.                                                    1073741824L,
  201.                                                    939524096L,
  202.                                                    805306368L,
  203.                                                    671088640L,
  204.                                                    536870912L,
  205.                                                    402653184L,
  206.                                                    268435456L,
  207.                                                    134217728L,
  208.                                                   };
  209.     /**
  210.      * Start the random number generator on the given iteration.
  211.      * @param initalIteration the iteration number to start on
  212.      */
  213.     RandomGenerator(long initalIteration) {
  214.       int baseIndex = (int) ((initalIteration & mask32) / seedSkip);
  215.       seed = seeds[baseIndex];
  216.       for(int i=0; i < initalIteration % seedSkip; ++i) {
  217.         next();
  218.       }
  219.     }
  220.     RandomGenerator() {
  221.       this(0);
  222.     }
  223.     long next() {
  224.       seed = (seed * 3141592621l + 663896637) & mask32;
  225.       return seed;
  226.     }
  227.   }
  228.   /**
  229.    * The Mapper class that given a row number, will generate the appropriate 
  230.    * output line.
  231.    */
  232.   public static class SortGenMapper extends MapReduceBase 
  233.       implements Mapper<LongWritable, NullWritable, Text, Text> {
  234.     private Text key = new Text();
  235.     private Text value = new Text();
  236.     private RandomGenerator rand;
  237.     private byte[] keyBytes = new byte[12];
  238.     private byte[] spaces = "          ".getBytes();
  239.     private byte[][] filler = new byte[26][];
  240.     {
  241.       for(int i=0; i < 26; ++i) {
  242.         filler[i] = new byte[10];
  243.         for(int j=0; j<10; ++j) {
  244.           filler[i][j] = (byte) ('A' + i);
  245.         }
  246.       }
  247.     }
  248.     
  249.     /**
  250.      * Add a random key to the text
  251.      * @param rowId
  252.      */
  253.     private void addKey() {
  254.       for(int i=0; i<3; i++) {
  255.         long temp = rand.next() / 52;
  256.         keyBytes[3 + 4*i] = (byte) (' ' + (temp % 95));
  257.         temp /= 95;
  258.         keyBytes[2 + 4*i] = (byte) (' ' + (temp % 95));
  259.         temp /= 95;
  260.         keyBytes[1 + 4*i] = (byte) (' ' + (temp % 95));
  261.         temp /= 95;
  262.         keyBytes[4*i] = (byte) (' ' + (temp % 95));
  263.       }
  264.       key.set(keyBytes, 0, 10);
  265.     }
  266.     
  267.     /**
  268.      * Add the rowid to the row.
  269.      * @param rowId
  270.      */
  271.     private void addRowId(long rowId) {
  272.       byte[] rowid = Integer.toString((int) rowId).getBytes();
  273.       int padSpace = 10 - rowid.length;
  274.       if (padSpace > 0) {
  275.         value.append(spaces, 0, 10 - rowid.length);
  276.       }
  277.       value.append(rowid, 0, Math.min(rowid.length, 10));
  278.     }
  279.     /**
  280.      * Add the required filler bytes. Each row consists of 7 blocks of
  281.      * 10 characters and 1 block of 8 characters.
  282.      * @param rowId the current row number
  283.      */
  284.     private void addFiller(long rowId) {
  285.       int base = (int) ((rowId * 8) % 26);
  286.       for(int i=0; i<7; ++i) {
  287.         value.append(filler[(base+i) % 26], 0, 10);
  288.       }
  289.       value.append(filler[(base+7) % 26], 0, 8);
  290.     }
  291.     public void map(LongWritable row, NullWritable ignored,
  292.                     OutputCollector<Text, Text> output,
  293.                     Reporter reporter) throws IOException {
  294.       long rowId = row.get();
  295.       if (rand == null) {
  296.         // we use 3 random numbers per a row
  297.         rand = new RandomGenerator(rowId*3);
  298.       }
  299.       addKey();
  300.       value.clear();
  301.       addRowId(rowId);
  302.       addFiller(rowId);
  303.       output.collect(key, value);
  304.     }
  305.   }
  306.   /**
  307.    * @param args the cli arguments
  308.    */
  309.   public int run(String[] args) throws IOException {
  310.     JobConf job = (JobConf) getConf();
  311.     setNumberOfRows(job, Long.parseLong(args[0]));
  312.     FileOutputFormat.setOutputPath(job, new Path(args[1]));
  313.     job.setJobName("TeraGen");
  314.     job.setJarByClass(TeraGen.class);
  315.     job.setMapperClass(SortGenMapper.class);
  316.     job.setNumReduceTasks(0);
  317.     job.setOutputKeyClass(Text.class);
  318.     job.setOutputValueClass(Text.class);
  319.     job.setInputFormat(RangeInputFormat.class);
  320.     job.setOutputFormat(TeraOutputFormat.class);
  321.     JobClient.runJob(job);
  322.     return 0;
  323.   }
  324.   public static void main(String[] args) throws Exception {
  325.     int res = ToolRunner.run(new JobConf(), new TeraGen(), args);
  326.     System.exit(res);
  327.   }
  328. }