TestMapRed.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:28k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.BufferedReader;
  20. import java.io.BufferedWriter;
  21. import java.io.DataInputStream;
  22. import java.io.DataOutputStream;
  23. import java.io.IOException;
  24. import java.io.InputStreamReader;
  25. import java.io.OutputStreamWriter;
  26. import java.util.EnumSet;
  27. import java.util.Iterator;
  28. import java.util.Random;
  29. import junit.framework.TestCase;
  30. import org.apache.hadoop.conf.Configuration;
  31. import org.apache.hadoop.fs.FileStatus;
  32. import org.apache.hadoop.fs.FileSystem;
  33. import org.apache.hadoop.fs.Path;
  34. import org.apache.hadoop.io.IntWritable;
  35. import org.apache.hadoop.io.LongWritable;
  36. import org.apache.hadoop.io.NullWritable;
  37. import org.apache.hadoop.io.SequenceFile;
  38. import org.apache.hadoop.io.Text;
  39. import org.apache.hadoop.io.WritableComparable;
  40. import org.apache.hadoop.io.SequenceFile.CompressionType;
  41. import org.apache.hadoop.mapred.lib.IdentityMapper;
  42. import org.apache.hadoop.mapred.lib.IdentityReducer;
  43. /**********************************************************
  44.  * MapredLoadTest generates a bunch of work that exercises
  45.  * a Hadoop Map-Reduce system (and DFS, too).  It goes through
  46.  * the following steps:
  47.  *
  48.  * 1) Take inputs 'range' and 'counts'.
  49.  * 2) Generate 'counts' random integers between 0 and range-1.
  50.  * 3) Create a file that lists each integer between 0 and range-1,
  51.  *    and lists the number of times that integer was generated.
  52.  * 4) Emit a (very large) file that contains all the integers
  53.  *    in the order generated.
  54.  * 5) After the file has been generated, read it back and count
  55.  *    how many times each int was generated.
  56.  * 6) Compare this big count-map against the original one.  If
  57.  *    they match, then SUCCESS!  Otherwise, FAILURE!
  58.  *
  59.  * OK, that's how we can think about it.  What are the map-reduce
  60.  * steps that get the job done?
  61.  *
  62.  * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
  63.  * 2) In a non-mapread thread, generate the answer-key and write to disk.
  64.  * 3) In a mapred job, divide the answer key into K jobs.
  65.  * 4) A mapred 'generator' task consists of K map jobs.  Each reads
  66.  *    an individual "sub-key", and generates integers according to
  67.  *    to it (though with a random ordering).
  68.  * 5) The generator's reduce task agglomerates all of those files
  69.  *    into a single one.
  70.  * 6) A mapred 'reader' task consists of M map jobs.  The output
  71.  *    file is cut into M pieces. Each of the M jobs counts the 
  72.  *    individual ints in its chunk and creates a map of all seen ints.
  73.  * 7) A mapred job integrates all the count files into a single one.
  74.  *
  75.  **********************************************************/
  76. public class TestMapRed extends TestCase {
  77.   /**
  78.    * Modified to make it a junit test.
  79.    * The RandomGen Job does the actual work of creating
  80.    * a huge file of assorted numbers.  It receives instructions
  81.    * as to how many times each number should be counted.  Then
  82.    * it emits those numbers in a crazy order.
  83.    *
  84.    * The map() function takes a key/val pair that describes
  85.    * a value-to-be-emitted (the key) and how many times it 
  86.    * should be emitted (the value), aka "numtimes".  map() then
  87.    * emits a series of intermediate key/val pairs.  It emits
  88.    * 'numtimes' of these.  The key is a random number and the
  89.    * value is the 'value-to-be-emitted'.
  90.    *
  91.    * The system collates and merges these pairs according to
  92.    * the random number.  reduce() function takes in a key/value
  93.    * pair that consists of a crazy random number and a series
  94.    * of values that should be emitted.  The random number key
  95.    * is now dropped, and reduce() emits a pair for every intermediate value.
  96.    * The emitted key is an intermediate value.  The emitted value
  97.    * is just a blank string.  Thus, we've created a huge file
  98.    * of numbers in random order, but where each number appears
  99.    * as many times as we were instructed.
  100.    */
  101.   static class RandomGenMapper
  102.     implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
  103.     
  104.     public void configure(JobConf job) {
  105.     }
  106.     public void map(IntWritable key, IntWritable val,
  107.                     OutputCollector<IntWritable, IntWritable> out,
  108.                     Reporter reporter) throws IOException {
  109.       int randomVal = key.get();
  110.       int randomCount = val.get();
  111.       for (int i = 0; i < randomCount; i++) {
  112.         out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
  113.       }
  114.     }
  115.     public void close() {
  116.     }
  117.   }
  118.   /**
  119.    */
  120.   static class RandomGenReducer
  121.     implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  122.     
  123.     public void configure(JobConf job) {
  124.     }
  125.     public void reduce(IntWritable key, Iterator<IntWritable> it,
  126.                        OutputCollector<IntWritable, IntWritable> out,
  127.                        Reporter reporter) throws IOException {
  128.       while (it.hasNext()) {
  129.         out.collect(it.next(), null);
  130.       }
  131.     }
  132.     public void close() {
  133.     }
  134.   }
  135.   /**
  136.    * The RandomCheck Job does a lot of our work.  It takes
  137.    * in a num/string keyspace, and transforms it into a
  138.    * key/count(int) keyspace.
  139.    *
  140.    * The map() function just emits a num/1 pair for every
  141.    * num/string input pair.
  142.    *
  143.    * The reduce() function sums up all the 1s that were
  144.    * emitted for a single key.  It then emits the key/total
  145.    * pair.
  146.    *
  147.    * This is used to regenerate the random number "answer key".
  148.    * Each key here is a random number, and the count is the
  149.    * number of times the number was emitted.
  150.    */
  151.   static class RandomCheckMapper
  152.     implements Mapper<WritableComparable, Text, IntWritable, IntWritable> {
  153.     
  154.     public void configure(JobConf job) {
  155.     }
  156.     public void map(WritableComparable key, Text val,
  157.                     OutputCollector<IntWritable, IntWritable> out,
  158.                     Reporter reporter) throws IOException {
  159.       out.collect(new IntWritable(Integer.parseInt(val.toString().trim())), new IntWritable(1));
  160.     }
  161.     public void close() {
  162.     }
  163.   }
  164.   /**
  165.    */
  166.   static class RandomCheckReducer
  167.       implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  168.     public void configure(JobConf job) {
  169.     }
  170.         
  171.     public void reduce(IntWritable key, Iterator<IntWritable> it,
  172.                        OutputCollector<IntWritable, IntWritable> out,
  173.                        Reporter reporter) throws IOException {
  174.       int keyint = key.get();
  175.       int count = 0;
  176.       while (it.hasNext()) {
  177.         it.next();
  178.         count++;
  179.       }
  180.       out.collect(new IntWritable(keyint), new IntWritable(count));
  181.     }
  182.     public void close() {
  183.     }
  184.   }
  185.   /**
  186.    * The Merge Job is a really simple one.  It takes in
  187.    * an int/int key-value set, and emits the same set.
  188.    * But it merges identical keys by adding their values.
  189.    *
  190.    * Thus, the map() function is just the identity function
  191.    * and reduce() just sums.  Nothing to see here!
  192.    */
  193.   static class MergeMapper
  194.     implements Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
  195.     
  196.     public void configure(JobConf job) {
  197.     }
  198.     public void map(IntWritable key, IntWritable val,
  199.                     OutputCollector<IntWritable, IntWritable> out,
  200.                     Reporter reporter) throws IOException {
  201.       int keyint = key.get();
  202.       int valint = val.get();
  203.       out.collect(new IntWritable(keyint), new IntWritable(valint));
  204.     }
  205.     public void close() {
  206.     }
  207.   }
  208.   static class MergeReducer
  209.     implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  210.     public void configure(JobConf job) {
  211.     }
  212.         
  213.     public void reduce(IntWritable key, Iterator<IntWritable> it,
  214.                        OutputCollector<IntWritable, IntWritable> out,
  215.                        Reporter reporter) throws IOException {
  216.       int keyint = key.get();
  217.       int total = 0;
  218.       while (it.hasNext()) {
  219.         total += it.next().get();
  220.       }
  221.       out.collect(new IntWritable(keyint), new IntWritable(total));
  222.     }
  223.     public void close() {
  224.     }
  225.   }
  226.   private static int range = 10;
  227.   private static int counts = 100;
  228.   private static Random r = new Random();
  229.   /**
  230.      public TestMapRed(int range, int counts, Configuration conf) throws IOException {
  231.      this.range = range;
  232.      this.counts = counts;
  233.      this.conf = conf;
  234.      }
  235.   **/
  236.   public void testMapred() throws Exception {
  237.     launch();
  238.   }
  239.   private static class MyMap
  240.     implements Mapper<WritableComparable, Text, Text, Text> {
  241.       
  242.     public void configure(JobConf conf) {
  243.     }
  244.       
  245.     public void map(WritableComparable key, Text value,
  246.                     OutputCollector<Text, Text> output,
  247.                     Reporter reporter) throws IOException {
  248.       String str = value.toString().toLowerCase();
  249.       output.collect(new Text(str), value);
  250.     }
  251.     public void close() throws IOException {
  252.     }
  253.   }
  254.     
  255.   private static class MyReduce extends IdentityReducer {
  256.     private JobConf conf;
  257.     private boolean compressInput;
  258.     private TaskAttemptID taskId;
  259.     private boolean first = true;
  260.       
  261.     @Override
  262.     public void configure(JobConf conf) {
  263.       this.conf = conf;
  264.       compressInput = conf.getCompressMapOutput();
  265.       taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
  266.     }
  267.       
  268.     public void reduce(WritableComparable key, Iterator values,
  269.                        OutputCollector output, Reporter reporter
  270.                        ) throws IOException {
  271.       if (first) {
  272.         first = false;
  273.         MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
  274.         mapOutputFile.setConf(conf);
  275.         Path input = mapOutputFile.getInputFile(0, taskId);
  276.         FileSystem fs = FileSystem.get(conf);
  277.         assertTrue("reduce input exists " + input, fs.exists(input));
  278.         SequenceFile.Reader rdr = 
  279.           new SequenceFile.Reader(fs, input, conf);
  280.         assertEquals("is reduce input compressed " + input, 
  281.                      compressInput, 
  282.                      rdr.isCompressed());
  283.         rdr.close();          
  284.       }
  285.     }
  286.       
  287.   }
  288.   private static class BadPartitioner
  289.       implements Partitioner<LongWritable,Text> {
  290.     boolean low;
  291.     public void configure(JobConf conf) {
  292.       low = conf.getBoolean("test.testmapred.badpartition", true);
  293.     }
  294.     public int getPartition(LongWritable k, Text v, int numPartitions) {
  295.       return low ? -1 : numPartitions;
  296.     }
  297.   }
  298.   public void testPartitioner() throws Exception {
  299.     JobConf conf = new JobConf(TestMapRed.class);
  300.     conf.setPartitionerClass(BadPartitioner.class);
  301.     FileSystem fs = FileSystem.getLocal(conf);
  302.     Path testdir = new Path(
  303.         System.getProperty("test.build.data","/tmp")).makeQualified(fs);
  304.     Path inFile = new Path(testdir, "blah/blah");
  305.     DataOutputStream f = fs.create(inFile);
  306.     f.writeBytes("blah blah blahn");
  307.     f.close();
  308.     FileInputFormat.setInputPaths(conf, inFile);
  309.     FileOutputFormat.setOutputPath(conf, new Path(testdir, "out"));
  310.     conf.setMapperClass(IdentityMapper.class);
  311.     conf.setReducerClass(IdentityReducer.class);
  312.     conf.setOutputKeyClass(LongWritable.class);
  313.     conf.setOutputValueClass(Text.class);
  314.     // partition too low
  315.     conf.setBoolean("test.testmapred.badpartition", true);
  316.     boolean pass = true;
  317.     try {
  318.       JobClient.runJob(conf);
  319.     } catch (IOException e) {
  320.       pass = false;
  321.     }
  322.     assertFalse("should fail for partition < 0", pass);
  323.     // partition too high
  324.     conf.setBoolean("test.testmapred.badpartition", false);
  325.     pass = true;
  326.     try {
  327.       JobClient.runJob(conf);
  328.     } catch (IOException e) {
  329.       pass = false;
  330.     }
  331.     assertFalse("should fail for partition >= numPartitions", pass);
  332.   }
  333.   public static class NullMapper
  334.       implements Mapper<NullWritable,Text,NullWritable,Text> {
  335.     public void map(NullWritable key, Text val,
  336.         OutputCollector<NullWritable,Text> output, Reporter reporter)
  337.         throws IOException {
  338.       output.collect(NullWritable.get(), val);
  339.     }
  340.     public void configure(JobConf conf) { }
  341.     public void close() { }
  342.   }
  343.   public void testNullKeys() throws Exception {
  344.     JobConf conf = new JobConf(TestMapRed.class);
  345.     FileSystem fs = FileSystem.getLocal(conf);
  346.     Path testdir = new Path(
  347.         System.getProperty("test.build.data","/tmp")).makeQualified(fs);
  348.     fs.delete(testdir, true);
  349.     Path inFile = new Path(testdir, "nullin/blah");
  350.     SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, inFile,
  351.         NullWritable.class, Text.class, SequenceFile.CompressionType.NONE);
  352.     Text t = new Text();
  353.     t.set("AAAAAAAAAAAAAA"); w.append(NullWritable.get(), t);
  354.     t.set("BBBBBBBBBBBBBB"); w.append(NullWritable.get(), t);
  355.     t.set("CCCCCCCCCCCCCC"); w.append(NullWritable.get(), t);
  356.     t.set("DDDDDDDDDDDDDD"); w.append(NullWritable.get(), t);
  357.     t.set("EEEEEEEEEEEEEE"); w.append(NullWritable.get(), t);
  358.     t.set("FFFFFFFFFFFFFF"); w.append(NullWritable.get(), t);
  359.     t.set("GGGGGGGGGGGGGG"); w.append(NullWritable.get(), t);
  360.     t.set("HHHHHHHHHHHHHH"); w.append(NullWritable.get(), t);
  361.     w.close();
  362.     FileInputFormat.setInputPaths(conf, inFile);
  363.     FileOutputFormat.setOutputPath(conf, new Path(testdir, "nullout"));
  364.     conf.setMapperClass(NullMapper.class);
  365.     conf.setReducerClass(IdentityReducer.class);
  366.     conf.setOutputKeyClass(NullWritable.class);
  367.     conf.setOutputValueClass(Text.class);
  368.     conf.setInputFormat(SequenceFileInputFormat.class);
  369.     conf.setOutputFormat(SequenceFileOutputFormat.class);
  370.     conf.setNumReduceTasks(1);
  371.     JobClient.runJob(conf);
  372.     SequenceFile.Reader r = new SequenceFile.Reader(fs,
  373.         new Path(testdir, "nullout/part-00000"), conf);
  374.     String m = "AAAAAAAAAAAAAA";
  375.     for (int i = 1; r.next(NullWritable.get(), t); ++i) {
  376.       assertTrue(t.toString() + " doesn't match " + m, m.equals(t.toString()));
  377.       m = m.replace((char)('A' + i - 1), (char)('A' + i));
  378.     }
  379.   }
  380.   private void checkCompression(boolean compressMapOutputs,
  381.                                 CompressionType redCompression,
  382.                                 boolean includeCombine
  383.                                 ) throws Exception {
  384.     JobConf conf = new JobConf(TestMapRed.class);
  385.     Path testdir = new Path("build/test/test.mapred.compress");
  386.     Path inDir = new Path(testdir, "in");
  387.     Path outDir = new Path(testdir, "out");
  388.     FileSystem fs = FileSystem.get(conf);
  389.     fs.delete(testdir, true);
  390.     FileInputFormat.setInputPaths(conf, inDir);
  391.     FileOutputFormat.setOutputPath(conf, outDir);
  392.     conf.setMapperClass(MyMap.class);
  393.     conf.setReducerClass(MyReduce.class);
  394.     conf.setOutputKeyClass(Text.class);
  395.     conf.setOutputValueClass(Text.class);
  396.     conf.setOutputFormat(SequenceFileOutputFormat.class);
  397.     if (includeCombine) {
  398.       conf.setCombinerClass(IdentityReducer.class);
  399.     }
  400.     conf.setCompressMapOutput(compressMapOutputs);
  401.     SequenceFileOutputFormat.setOutputCompressionType(conf, redCompression);
  402.     try {
  403.       if (!fs.mkdirs(testdir)) {
  404.         throw new IOException("Mkdirs failed to create " + testdir.toString());
  405.       }
  406.       if (!fs.mkdirs(inDir)) {
  407.         throw new IOException("Mkdirs failed to create " + inDir.toString());
  408.       }
  409.       Path inFile = new Path(inDir, "part0");
  410.       DataOutputStream f = fs.create(inFile);
  411.       f.writeBytes("Owen was heren");
  412.       f.writeBytes("Hadoop is funn");
  413.       f.writeBytes("Is this done, yet?n");
  414.       f.close();
  415.       RunningJob rj = JobClient.runJob(conf);
  416.       assertTrue("job was complete", rj.isComplete());
  417.       assertTrue("job was successful", rj.isSuccessful());
  418.       Path output = new Path(outDir,
  419.                              Task.getOutputName(0));
  420.       assertTrue("reduce output exists " + output, fs.exists(output));
  421.       SequenceFile.Reader rdr = 
  422.         new SequenceFile.Reader(fs, output, conf);
  423.       assertEquals("is reduce output compressed " + output, 
  424.                    redCompression != CompressionType.NONE, 
  425.                    rdr.isCompressed());
  426.       rdr.close();
  427.     } finally {
  428.       fs.delete(testdir, true);
  429.     }
  430.   }
  431.     
  432.   public void testCompression() throws Exception {
  433.     EnumSet<SequenceFile.CompressionType> seq =
  434.       EnumSet.allOf(SequenceFile.CompressionType.class);
  435.     for (CompressionType redCompression : seq) {
  436.       for(int combine=0; combine < 2; ++combine) {
  437.         checkCompression(false, redCompression, combine == 1);
  438.         checkCompression(true, redCompression, combine == 1);
  439.       }
  440.     }
  441.   }
  442.     
  443.     
  444.   /**
  445.    * 
  446.    */
  447.   public static void launch() throws Exception {
  448.     //
  449.     // Generate distribution of ints.  This is the answer key.
  450.     //
  451.     JobConf conf = new JobConf(TestMapRed.class);
  452.     int countsToGo = counts;
  453.     int dist[] = new int[range];
  454.     for (int i = 0; i < range; i++) {
  455.       double avgInts = (1.0 * countsToGo) / (range - i);
  456.       dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
  457.       countsToGo -= dist[i];
  458.     }
  459.     if (countsToGo > 0) {
  460.       dist[dist.length-1] += countsToGo;
  461.     }
  462.     //
  463.     // Write the answer key to a file.  
  464.     //
  465.     FileSystem fs = FileSystem.get(conf);
  466.     Path testdir = new Path("mapred.loadtest");
  467.     if (!fs.mkdirs(testdir)) {
  468.       throw new IOException("Mkdirs failed to create " + testdir.toString());
  469.     }
  470.     Path randomIns = new Path(testdir, "genins");
  471.     if (!fs.mkdirs(randomIns)) {
  472.       throw new IOException("Mkdirs failed to create " + randomIns.toString());
  473.     }
  474.     Path answerkey = new Path(randomIns, "answer.key");
  475.     SequenceFile.Writer out = 
  476.       SequenceFile.createWriter(fs, conf, answerkey, IntWritable.class,
  477.                                 IntWritable.class, 
  478.                                 SequenceFile.CompressionType.NONE);
  479.     try {
  480.       for (int i = 0; i < range; i++) {
  481.         out.append(new IntWritable(i), new IntWritable(dist[i]));
  482.       }
  483.     } finally {
  484.       out.close();
  485.     }
  486.     //printFiles(randomIns, conf);
  487.     //
  488.     // Now we need to generate the random numbers according to
  489.     // the above distribution.
  490.     //
  491.     // We create a lot of map tasks, each of which takes at least
  492.     // one "line" of the distribution.  (That is, a certain number
  493.     // X is to be generated Y number of times.)
  494.     //
  495.     // A map task emits Y key/val pairs.  The val is X.  The key
  496.     // is a randomly-generated number.
  497.     //
  498.     // The reduce task gets its input sorted by key.  That is, sorted
  499.     // in random order.  It then emits a single line of text that
  500.     // for the given values.  It does not emit the key.
  501.     //
  502.     // Because there's just one reduce task, we emit a single big
  503.     // file of random numbers.
  504.     //
  505.     Path randomOuts = new Path(testdir, "genouts");
  506.     fs.delete(randomOuts, true);
  507.     JobConf genJob = new JobConf(conf, TestMapRed.class);
  508.     FileInputFormat.setInputPaths(genJob, randomIns);
  509.     genJob.setInputFormat(SequenceFileInputFormat.class);
  510.     genJob.setMapperClass(RandomGenMapper.class);
  511.     FileOutputFormat.setOutputPath(genJob, randomOuts);
  512.     genJob.setOutputKeyClass(IntWritable.class);
  513.     genJob.setOutputValueClass(IntWritable.class);
  514.     genJob.setOutputFormat(TextOutputFormat.class);
  515.     genJob.setReducerClass(RandomGenReducer.class);
  516.     genJob.setNumReduceTasks(1);
  517.     JobClient.runJob(genJob);
  518.     //printFiles(randomOuts, conf);
  519.     //
  520.     // Next, we read the big file in and regenerate the 
  521.     // original map.  It's split into a number of parts.
  522.     // (That number is 'intermediateReduces'.)
  523.     //
  524.     // We have many map tasks, each of which read at least one
  525.     // of the output numbers.  For each number read in, the
  526.     // map task emits a key/value pair where the key is the
  527.     // number and the value is "1".
  528.     //
  529.     // We have a single reduce task, which receives its input
  530.     // sorted by the key emitted above.  For each key, there will
  531.     // be a certain number of "1" values.  The reduce task sums
  532.     // these values to compute how many times the given key was
  533.     // emitted.
  534.     //
  535.     // The reduce task then emits a key/val pair where the key
  536.     // is the number in question, and the value is the number of
  537.     // times the key was emitted.  This is the same format as the
  538.     // original answer key (except that numbers emitted zero times
  539.     // will not appear in the regenerated key.)  The answer set
  540.     // is split into a number of pieces.  A final MapReduce job
  541.     // will merge them.
  542.     //
  543.     // There's not really a need to go to 10 reduces here 
  544.     // instead of 1.  But we want to test what happens when
  545.     // you have multiple reduces at once.
  546.     //
  547.     int intermediateReduces = 10;
  548.     Path intermediateOuts = new Path(testdir, "intermediateouts");
  549.     fs.delete(intermediateOuts, true);
  550.     JobConf checkJob = new JobConf(conf, TestMapRed.class);
  551.     FileInputFormat.setInputPaths(checkJob, randomOuts);
  552.     checkJob.setInputFormat(TextInputFormat.class);
  553.     checkJob.setMapperClass(RandomCheckMapper.class);
  554.     FileOutputFormat.setOutputPath(checkJob, intermediateOuts);
  555.     checkJob.setOutputKeyClass(IntWritable.class);
  556.     checkJob.setOutputValueClass(IntWritable.class);
  557.     checkJob.setOutputFormat(MapFileOutputFormat.class);
  558.     checkJob.setReducerClass(RandomCheckReducer.class);
  559.     checkJob.setNumReduceTasks(intermediateReduces);
  560.     JobClient.runJob(checkJob);
  561.     //printFiles(intermediateOuts, conf); 
  562.     //
  563.     // OK, now we take the output from the last job and
  564.     // merge it down to a single file.  The map() and reduce()
  565.     // functions don't really do anything except reemit tuples.
  566.     // But by having a single reduce task here, we end up merging
  567.     // all the files.
  568.     //
  569.     Path finalOuts = new Path(testdir, "finalouts");
  570.     fs.delete(finalOuts, true);
  571.     JobConf mergeJob = new JobConf(conf, TestMapRed.class);
  572.     FileInputFormat.setInputPaths(mergeJob, intermediateOuts);
  573.     mergeJob.setInputFormat(SequenceFileInputFormat.class);
  574.     mergeJob.setMapperClass(MergeMapper.class);
  575.         
  576.     FileOutputFormat.setOutputPath(mergeJob, finalOuts);
  577.     mergeJob.setOutputKeyClass(IntWritable.class);
  578.     mergeJob.setOutputValueClass(IntWritable.class);
  579.     mergeJob.setOutputFormat(SequenceFileOutputFormat.class);
  580.     mergeJob.setReducerClass(MergeReducer.class);
  581.     mergeJob.setNumReduceTasks(1);
  582.         
  583.     JobClient.runJob(mergeJob);
  584.     //printFiles(finalOuts, conf); 
  585.  
  586.     //
  587.     // Finally, we compare the reconstructed answer key with the
  588.     // original one.  Remember, we need to ignore zero-count items
  589.     // in the original key.
  590.     //
  591.     boolean success = true;
  592.     Path recomputedkey = new Path(finalOuts, "part-00000");
  593.     SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey, conf);
  594.     int totalseen = 0;
  595.     try {
  596.       IntWritable key = new IntWritable();
  597.       IntWritable val = new IntWritable();            
  598.       for (int i = 0; i < range; i++) {
  599.         if (dist[i] == 0) {
  600.           continue;
  601.         }
  602.         if (!in.next(key, val)) {
  603.           System.err.println("Cannot read entry " + i);
  604.           success = false;
  605.           break;
  606.         } else {
  607.           if (!((key.get() == i) && (val.get() == dist[i]))) {
  608.             System.err.println("Mismatch!  Pos=" + key.get() + ", i=" + i + 
  609.                                ", val=" + val.get() + ", dist[i]=" + dist[i]);
  610.             success = false;
  611.           }
  612.           totalseen += val.get();
  613.         }
  614.       }
  615.       if (success) {
  616.         if (in.next(key, val)) {
  617.           System.err.println("Unnecessary lines in recomputed key!");
  618.           success = false;
  619.         }
  620.       }
  621.     } finally {
  622.       in.close();
  623.     }
  624.     int originalTotal = 0;
  625.     for (int i = 0; i < dist.length; i++) {
  626.       originalTotal += dist[i];
  627.     }
  628.     System.out.println("Original sum: " + originalTotal);
  629.     System.out.println("Recomputed sum: " + totalseen);
  630.     //
  631.     // Write to "results" whether the test succeeded or not.
  632.     //
  633.     Path resultFile = new Path(testdir, "results");
  634.     BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
  635.     try {
  636.       bw.write("Success=" + success + "n");
  637.       System.out.println("Success=" + success);
  638.     } finally {
  639.       bw.close();
  640.     }
  641.     assertTrue("testMapRed failed", success);
  642.     fs.delete(testdir, true);
  643.   }
  644.   private static void printTextFile(FileSystem fs, Path p) throws IOException {
  645.     BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(p)));
  646.     String line;
  647.     while ((line = in.readLine()) != null) {
  648.       System.out.println("  Row: " + line);
  649.     }
  650.     in.close();
  651.   }
  652.   private static void printSequenceFile(FileSystem fs, Path p, 
  653.                                         Configuration conf) throws IOException {
  654.     SequenceFile.Reader r = new SequenceFile.Reader(fs, p, conf);
  655.     Object key = null;
  656.     Object value = null;
  657.     while ((key = r.next(key)) != null) {
  658.       value = r.getCurrentValue(value);
  659.       System.out.println("  Row: " + key + ", " + value);
  660.     }
  661.     r.close();    
  662.   }
  663.   private static boolean isSequenceFile(FileSystem fs,
  664.                                         Path f) throws IOException {
  665.     DataInputStream in = fs.open(f);
  666.     byte[] seq = "SEQ".getBytes();
  667.     for(int i=0; i < seq.length; ++i) {
  668.       if (seq[i] != in.read()) {
  669.         return false;
  670.       }
  671.     }
  672.     return true;
  673.   }
  674.   private static void printFiles(Path dir, 
  675.                                  Configuration conf) throws IOException {
  676.     FileSystem fs = dir.getFileSystem(conf);
  677.     for(FileStatus f: fs.listStatus(dir)) {
  678.       System.out.println("Reading " + f.getPath() + ": ");
  679.       if (f.isDir()) {
  680.         System.out.println("  it is a map file.");
  681.         printSequenceFile(fs, new Path(f.getPath(), "data"), conf);
  682.       } else if (isSequenceFile(fs, f.getPath())) {
  683.         System.out.println("  it is a sequence file.");
  684.         printSequenceFile(fs, f.getPath(), conf);
  685.       } else {
  686.         System.out.println("  it is a text file.");
  687.         printTextFile(fs, f.getPath());
  688.       }
  689.     }
  690.   }
  691.   /**
  692.    * Launches all the tasks in order.
  693.    */
  694.   public static void main(String[] argv) throws Exception {
  695.     if (argv.length < 2) {
  696.       System.err.println("Usage: TestMapRed <range> <counts>");
  697.       System.err.println();
  698.       System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
  699.       return;
  700.     }
  701.     int i = 0;
  702.     range = Integer.parseInt(argv[i++]);
  703.     counts = Integer.parseInt(argv[i++]);
  704.     launch();
  705.   }
  706.     
  707.   public void testSmallInput(){
  708.     runJob(100);
  709.   }
  710.   public void testBiggerInput(){
  711.     runJob(1000);
  712.   }
  713.   public void runJob(int items) {
  714.     try {
  715.       JobConf conf = new JobConf(TestMapRed.class);
  716.       Path testdir = new Path("build/test/test.mapred.spill");
  717.       Path inDir = new Path(testdir, "in");
  718.       Path outDir = new Path(testdir, "out");
  719.       FileSystem fs = FileSystem.get(conf);
  720.       fs.delete(testdir, true);
  721.       conf.setInt("io.sort.mb", 1);
  722.       conf.setInputFormat(SequenceFileInputFormat.class);
  723.       FileInputFormat.setInputPaths(conf, inDir);
  724.       FileOutputFormat.setOutputPath(conf, outDir);
  725.       conf.setMapperClass(IdentityMapper.class);
  726.       conf.setReducerClass(IdentityReducer.class);
  727.       conf.setOutputKeyClass(Text.class);
  728.       conf.setOutputValueClass(Text.class);
  729.       conf.setOutputFormat(SequenceFileOutputFormat.class);
  730.       if (!fs.mkdirs(testdir)) {
  731.         throw new IOException("Mkdirs failed to create " + testdir.toString());
  732.       }
  733.       if (!fs.mkdirs(inDir)) {
  734.         throw new IOException("Mkdirs failed to create " + inDir.toString());
  735.       }
  736.       Path inFile = new Path(inDir, "part0");
  737.       SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile,
  738.                                                              Text.class, Text.class);
  739.       StringBuffer content = new StringBuffer();
  740.       for (int i = 0; i < 1000; i++) {
  741.         content.append(i).append(": This is one more line of contentn");
  742.       }
  743.       Text text = new Text(content.toString());
  744.       for (int i = 0; i < items; i++) {
  745.         writer.append(new Text("rec:" + i), text);
  746.       }
  747.       writer.close();
  748.       JobClient.runJob(conf);
  749.     } catch (Exception e) {
  750.       fail("Threw exception:" + e);
  751.     }
  752.   }
  753. }