TestRecordMR.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:17k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.record;
  19. import org.apache.hadoop.mapred.*;
  20. import org.apache.hadoop.fs.*;
  21. import org.apache.hadoop.io.*;
  22. import org.apache.hadoop.io.SequenceFile.CompressionType;
  23. import org.apache.hadoop.conf.*;
  24. import junit.framework.TestCase;
  25. import java.io.*;
  26. import java.util.*;
  27. /**********************************************************
  28.  * MapredLoadTest generates a bunch of work that exercises
  29.  * a Hadoop Map-Reduce system (and DFS, too).  It goes through
  30.  * the following steps:
  31.  *
  32.  * 1) Take inputs 'range' and 'counts'.
  33.  * 2) Generate 'counts' random integers between 0 and range-1.
  34.  * 3) Create a file that lists each integer between 0 and range-1,
  35.  *    and lists the number of times that integer was generated.
  36.  * 4) Emit a (very large) file that contains all the integers
  37.  *    in the order generated.
  38.  * 5) After the file has been generated, read it back and count
  39.  *    how many times each int was generated.
  40.  * 6) Compare this big count-map against the original one.  If
  41.  *    they match, then SUCCESS!  Otherwise, FAILURE!
  42.  *
  43.  * OK, that's how we can think about it.  What are the map-reduce
  44.  * steps that get the job done?
  45.  *
  46.  * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
  47.  * 2) In a non-mapread thread, generate the answer-key and write to disk.
  48.  * 3) In a mapred job, divide the answer key into K jobs.
  49.  * 4) A mapred 'generator' task consists of K map jobs.  Each reads
  50.  *    an individual "sub-key", and generates integers according to
  51.  *    to it (though with a random ordering).
  52.  * 5) The generator's reduce task agglomerates all of those files
  53.  *    into a single one.
  54.  * 6) A mapred 'reader' task consists of M map jobs.  The output
  55.  *    file is cut into M pieces. Each of the M jobs counts the 
  56.  *    individual ints in its chunk and creates a map of all seen ints.
  57.  * 7) A mapred job integrates all the count files into a single one.
  58.  *
  59.  **********************************************************/
  60. public class TestRecordMR extends TestCase {
  61.   /**
  62.    * Modified to make it a junit test.
  63.    * The RandomGen Job does the actual work of creating
  64.    * a huge file of assorted numbers.  It receives instructions
  65.    * as to how many times each number should be counted.  Then
  66.    * it emits those numbers in a crazy order.
  67.    *
  68.    * The map() function takes a key/val pair that describes
  69.    * a value-to-be-emitted (the key) and how many times it 
  70.    * should be emitted (the value), aka "numtimes".  map() then
  71.    * emits a series of intermediate key/val pairs.  It emits
  72.    * 'numtimes' of these.  The key is a random number and the
  73.    * value is the 'value-to-be-emitted'.
  74.    *
  75.    * The system collates and merges these pairs according to
  76.    * the random number.  reduce() function takes in a key/value
  77.    * pair that consists of a crazy random number and a series
  78.    * of values that should be emitted.  The random number key
  79.    * is now dropped, and reduce() emits a pair for every intermediate value.
  80.    * The emitted key is an intermediate value.  The emitted value
  81.    * is just a blank string.  Thus, we've created a huge file
  82.    * of numbers in random order, but where each number appears
  83.    * as many times as we were instructed.
  84.    */
  85.   static public class RandomGenMapper implements Mapper<RecInt, RecInt,
  86.                                                         RecInt, RecString> {
  87.     Random r = new Random();
  88.     public void configure(JobConf job) {
  89.     }
  90.     public void map(RecInt key,
  91.                     RecInt val,
  92.                     OutputCollector<RecInt, RecString> out,
  93.                     Reporter reporter) throws IOException {
  94.       int randomVal = key.getData();
  95.       int randomCount = val.getData();
  96.       for (int i = 0; i < randomCount; i++) {
  97.         out.collect(new RecInt(Math.abs(r.nextInt())),
  98.                     new RecString(Integer.toString(randomVal)));
  99.       }
  100.     }
  101.     public void close() {
  102.     }
  103.   }
  104.   /**
  105.    */
  106.   static public class RandomGenReducer implements Reducer<RecInt, RecString,
  107.                                                           RecInt, RecString> {
  108.     public void configure(JobConf job) {
  109.     }
  110.     public void reduce(RecInt key,
  111.                        Iterator<RecString> it,
  112.                        OutputCollector<RecInt, RecString> out,
  113.                        Reporter reporter) throws IOException {
  114.       int keyint = key.getData();
  115.       while (it.hasNext()) {
  116.         String val = it.next().getData();
  117.         out.collect(new RecInt(Integer.parseInt(val)),
  118.                     new RecString(""));
  119.       }
  120.     }
  121.     public void close() {
  122.     }
  123.   }
  124.   /**
  125.    * The RandomCheck Job does a lot of our work.  It takes
  126.    * in a num/string keyspace, and transforms it into a
  127.    * key/count(int) keyspace.
  128.    *
  129.    * The map() function just emits a num/1 pair for every
  130.    * num/string input pair.
  131.    *
  132.    * The reduce() function sums up all the 1s that were
  133.    * emitted for a single key.  It then emits the key/total
  134.    * pair.
  135.    *
  136.    * This is used to regenerate the random number "answer key".
  137.    * Each key here is a random number, and the count is the
  138.    * number of times the number was emitted.
  139.    */
  140.   static public class RandomCheckMapper implements Mapper<RecInt, RecString,
  141.                                                           RecInt, RecString> {
  142.     public void configure(JobConf job) {
  143.     }
  144.     public void map(RecInt key,
  145.                     RecString val,
  146.                     OutputCollector<RecInt, RecString> out,
  147.                     Reporter reporter) throws IOException {
  148.       int pos = key.getData();
  149.       String str = val.getData();
  150.       out.collect(new RecInt(pos), new RecString("1"));
  151.     }
  152.     public void close() {
  153.     }
  154.   }
  155.   /**
  156.    */
  157.   static public class RandomCheckReducer implements Reducer<RecInt, RecString,
  158.                                                             RecInt, RecString> {
  159.     public void configure(JobConf job) {
  160.     }
  161.         
  162.     public void reduce(RecInt key,
  163.                        Iterator<RecString> it,
  164.                        OutputCollector<RecInt, RecString> out,
  165.                        Reporter reporter) throws IOException {
  166.       int keyint = key.getData();
  167.       int count = 0;
  168.       while (it.hasNext()) {
  169.         it.next();
  170.         count++;
  171.       }
  172.       out.collect(new RecInt(keyint), new RecString(Integer.toString(count)));
  173.     }
  174.     public void close() {
  175.     }
  176.   }
  177.   /**
  178.    * The Merge Job is a really simple one.  It takes in
  179.    * an int/int key-value set, and emits the same set.
  180.    * But it merges identical keys by adding their values.
  181.    *
  182.    * Thus, the map() function is just the identity function
  183.    * and reduce() just sums.  Nothing to see here!
  184.    */
  185.   static public class MergeMapper implements Mapper<RecInt, RecString,
  186.                                                     RecInt, RecInt> {
  187.     public void configure(JobConf job) {
  188.     }
  189.     public void map(RecInt key,
  190.                     RecString val,
  191.                     OutputCollector<RecInt, RecInt> out,
  192.                     Reporter reporter) throws IOException {
  193.       int keyint = key.getData();
  194.       String valstr = val.getData();
  195.       out.collect(new RecInt(keyint), new RecInt(Integer.parseInt(valstr)));
  196.     }
  197.     public void close() {
  198.     }
  199.   }
  200.   static public class MergeReducer implements Reducer<RecInt, RecInt,
  201.                                                       RecInt, RecInt> {
  202.     public void configure(JobConf job) {
  203.     }
  204.         
  205.     public void reduce(RecInt key,
  206.                        Iterator<RecInt> it,
  207.                        OutputCollector<RecInt, RecInt> out,
  208.                        Reporter reporter) throws IOException {
  209.       int keyint = key.getData();
  210.       int total = 0;
  211.       while (it.hasNext()) {
  212.         total += it.next().getData();
  213.       }
  214.       out.collect(new RecInt(keyint), new RecInt(total));
  215.     }
  216.     public void close() {
  217.     }
  218.   }
  219.   private static int range = 10;
  220.   private static int counts = 100;
  221.   private static Random r = new Random();
  222.   private static Configuration conf = new Configuration();
  223.   public void testMapred() throws Exception {
  224.     launch();
  225.   }
  226.   /**
  227.    * 
  228.    */
  229.   public static void launch() throws Exception {
  230.     //
  231.     // Generate distribution of ints.  This is the answer key.
  232.     //
  233.     int countsToGo = counts;
  234.     int dist[] = new int[range];
  235.     for (int i = 0; i < range; i++) {
  236.       double avgInts = (1.0 * countsToGo) / (range - i);
  237.       dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
  238.       countsToGo -= dist[i];
  239.     }
  240.     if (countsToGo > 0) {
  241.       dist[dist.length-1] += countsToGo;
  242.     }
  243.     //
  244.     // Write the answer key to a file.  
  245.     //
  246.     FileSystem fs = FileSystem.get(conf);
  247.     Path testdir = new Path("mapred.loadtest");
  248.     if (!fs.mkdirs(testdir)) {
  249.       throw new IOException("Mkdirs failed to create directory " + testdir.toString());
  250.     }
  251.     Path randomIns = new Path(testdir, "genins");
  252.     if (!fs.mkdirs(randomIns)) {
  253.       throw new IOException("Mkdirs failed to create directory " + randomIns.toString());
  254.     }
  255.     Path answerkey = new Path(randomIns, "answer.key");
  256.     SequenceFile.Writer out = SequenceFile.createWriter(fs, conf, 
  257.                                                         answerkey, RecInt.class, RecInt.class, 
  258.                                                         CompressionType.NONE);
  259.     try {
  260.       for (int i = 0; i < range; i++) {
  261.         RecInt k = new RecInt();
  262.         RecInt v = new RecInt();
  263.         k.setData(i);
  264.         v.setData(dist[i]);
  265.         out.append(k, v);
  266.       }
  267.     } finally {
  268.       out.close();
  269.     }
  270.     //
  271.     // Now we need to generate the random numbers according to
  272.     // the above distribution.
  273.     //
  274.     // We create a lot of map tasks, each of which takes at least
  275.     // one "line" of the distribution.  (That is, a certain number
  276.     // X is to be generated Y number of times.)
  277.     //
  278.     // A map task emits Y key/val pairs.  The val is X.  The key
  279.     // is a randomly-generated number.
  280.     //
  281.     // The reduce task gets its input sorted by key.  That is, sorted
  282.     // in random order.  It then emits a single line of text that
  283.     // for the given values.  It does not emit the key.
  284.     //
  285.     // Because there's just one reduce task, we emit a single big
  286.     // file of random numbers.
  287.     //
  288.     Path randomOuts = new Path(testdir, "genouts");
  289.     fs.delete(randomOuts, true);
  290.     JobConf genJob = new JobConf(conf, TestRecordMR.class);
  291.     FileInputFormat.setInputPaths(genJob, randomIns);
  292.     genJob.setInputFormat(SequenceFileInputFormat.class);
  293.     genJob.setMapperClass(RandomGenMapper.class);
  294.     FileOutputFormat.setOutputPath(genJob, randomOuts);
  295.     genJob.setOutputKeyClass(RecInt.class);
  296.     genJob.setOutputValueClass(RecString.class);
  297.     genJob.setOutputFormat(SequenceFileOutputFormat.class);
  298.     genJob.setReducerClass(RandomGenReducer.class);
  299.     genJob.setNumReduceTasks(1);
  300.     JobClient.runJob(genJob);
  301.     //
  302.     // Next, we read the big file in and regenerate the 
  303.     // original map.  It's split into a number of parts.
  304.     // (That number is 'intermediateReduces'.)
  305.     //
  306.     // We have many map tasks, each of which read at least one
  307.     // of the output numbers.  For each number read in, the
  308.     // map task emits a key/value pair where the key is the
  309.     // number and the value is "1".
  310.     //
  311.     // We have a single reduce task, which receives its input
  312.     // sorted by the key emitted above.  For each key, there will
  313.     // be a certain number of "1" values.  The reduce task sums
  314.     // these values to compute how many times the given key was
  315.     // emitted.
  316.     //
  317.     // The reduce task then emits a key/val pair where the key
  318.     // is the number in question, and the value is the number of
  319.     // times the key was emitted.  This is the same format as the
  320.     // original answer key (except that numbers emitted zero times
  321.     // will not appear in the regenerated key.)  The answer set
  322.     // is split into a number of pieces.  A final MapReduce job
  323.     // will merge them.
  324.     //
  325.     // There's not really a need to go to 10 reduces here 
  326.     // instead of 1.  But we want to test what happens when
  327.     // you have multiple reduces at once.
  328.     //
  329.     int intermediateReduces = 10;
  330.     Path intermediateOuts = new Path(testdir, "intermediateouts");
  331.     fs.delete(intermediateOuts, true);
  332.     JobConf checkJob = new JobConf(conf, TestRecordMR.class);
  333.     FileInputFormat.setInputPaths(checkJob, randomOuts);
  334.     checkJob.setInputFormat(SequenceFileInputFormat.class);
  335.     checkJob.setMapperClass(RandomCheckMapper.class);
  336.     FileOutputFormat.setOutputPath(checkJob, intermediateOuts);
  337.     checkJob.setOutputKeyClass(RecInt.class);
  338.     checkJob.setOutputValueClass(RecString.class);
  339.     checkJob.setOutputFormat(SequenceFileOutputFormat.class);
  340.     checkJob.setReducerClass(RandomCheckReducer.class);
  341.     checkJob.setNumReduceTasks(intermediateReduces);
  342.     JobClient.runJob(checkJob);
  343.     //
  344.     // OK, now we take the output from the last job and
  345.     // merge it down to a single file.  The map() and reduce()
  346.     // functions don't really do anything except reemit tuples.
  347.     // But by having a single reduce task here, we end up merging
  348.     // all the files.
  349.     //
  350.     Path finalOuts = new Path(testdir, "finalouts");        
  351.     fs.delete(finalOuts, true);
  352.     JobConf mergeJob = new JobConf(conf, TestRecordMR.class);
  353.     FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
  354.     mergeJob.setInputFormat(SequenceFileInputFormat.class);
  355.     mergeJob.setMapperClass(MergeMapper.class);
  356.         
  357.     FileOutputFormat.setOutputPath(mergeJob, finalOuts);
  358.     mergeJob.setOutputKeyClass(RecInt.class);
  359.     mergeJob.setOutputValueClass(RecInt.class);
  360.     mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
  361.     mergeJob.setReducerClass(MergeReducer.class);
  362.     mergeJob.setNumReduceTasks(1);
  363.         
  364.     JobClient.runJob(mergeJob);
  365.         
  366.  
  367.     //
  368.     // Finally, we compare the reconstructed answer key with the
  369.     // original one.  Remember, we need to ignore zero-count items
  370.     // in the original key.
  371.     //
  372.     boolean success = true;
  373.     Path recomputedkey = new Path(finalOuts, "part-00000");
  374.     SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
  375.     int totalseen = 0;
  376.     try {
  377.       RecInt key = new RecInt();
  378.       RecInt val = new RecInt();            
  379.       for (int i = 0; i < range; i++) {
  380.         if (dist[i] == 0) {
  381.           continue;
  382.         }
  383.         if (!in.next(key, val)) {
  384.           System.err.println("Cannot read entry " + i);
  385.           success = false;
  386.           break;
  387.         } else {
  388.           if (!((key.getData() == i) && (val.getData() == dist[i]))) {
  389.             System.err.println("Mismatch!  Pos=" + key.getData() + ", i=" + i + ", val=" + val.getData() + ", dist[i]=" + dist[i]);
  390.             success = false;
  391.           }
  392.           totalseen += val.getData();
  393.         }
  394.       }
  395.       if (success) {
  396.         if (in.next(key, val)) {
  397.           System.err.println("Unnecessary lines in recomputed key!");
  398.           success = false;
  399.         }
  400.       }
  401.     } finally {
  402.       in.close();
  403.     }
  404.     int originalTotal = 0;
  405.     for (int i = 0; i < dist.length; i++) {
  406.       originalTotal += dist[i];
  407.     }
  408.     System.out.println("Original sum: " + originalTotal);
  409.     System.out.println("Recomputed sum: " + totalseen);
  410.     //
  411.     // Write to "results" whether the test succeeded or not.
  412.     //
  413.     Path resultFile = new Path(testdir, "results");
  414.     BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
  415.     try {
  416.       bw.write("Success=" + success + "n");
  417.       System.out.println("Success=" + success);            
  418.     } finally {
  419.       bw.close();
  420.     }
  421.     fs.delete(testdir, true);
  422.   }
  423.   /**
  424.    * Launches all the tasks in order.
  425.    */
  426.   public static void main(String[] argv) throws Exception {
  427.     if (argv.length < 2) {
  428.       System.err.println("Usage: TestRecordMR <range> <counts>");
  429.       System.err.println();
  430.       System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
  431.       return;
  432.     }
  433.     int i = 0;
  434.     int range = Integer.parseInt(argv[i++]);
  435.     int counts = Integer.parseInt(argv[i++]);
  436.     launch();
  437.   }
  438. }