TestComparators.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:16k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.fs.*;
  20. import org.apache.hadoop.io.*;
  21. import org.apache.hadoop.io.BooleanWritable.Comparator;
  22. import junit.framework.TestCase;
  23. import java.io.*;
  24. import java.util.*;
  25. /**
  26.  * Two different types of comparators can be used in MapReduce. One is used
  27.  * during the Map and Reduce phases, to sort/merge key-value pairs. Another
  28.  * is used to group values for a particular key, when calling the user's 
  29.  * reducer. A user can override both of these two. 
  30.  * This class has tests for making sure we use the right comparators at the 
  31.  * right places. See Hadoop issues 485 and 1535. Our tests: 
  32.  * 1. Test that the same comparator is used for all sort/merge operations 
  33.  * during the Map and Reduce phases.  
  34.  * 2. Test the common use case where values are grouped by keys but values 
  35.  * within each key are grouped by a secondary key (a timestamp, for example). 
  36.  */
  37. public class TestComparators extends TestCase 
  38. {
  39.   JobConf conf = new JobConf(TestMapOutputType.class);
  40.   JobClient jc;
  41.   static Random rng = new Random();
  42.   /** 
  43.    * RandomGen is a mapper that generates 5 random values for each key
  44.    * in the input. The values are in the range [0-4]. The mapper also
  45.    * generates a composite key. If the input key is x and the generated
  46.    * value is y, the composite key is x0y (x-zero-y). Therefore, the inter-
  47.    * mediate key value pairs are ordered by {input key, value}.
  48.    * Think of the random value as a timestamp associated with the record. 
  49.    */
  50.   static class RandomGenMapper
  51.     implements Mapper<IntWritable, Writable, IntWritable, IntWritable> {
  52.     
  53.     public void configure(JobConf job) {
  54.     }
  55.     
  56.     public void map(IntWritable key, Writable value,
  57.                     OutputCollector<IntWritable, IntWritable> out,
  58.                     Reporter reporter) throws IOException {
  59.       int num_values = 5;
  60.       for(int i = 0; i < num_values; ++i) {
  61.         int val = rng.nextInt(num_values);
  62.         int compositeKey = key.get() * 100 + val;
  63.         out.collect(new IntWritable(compositeKey), new IntWritable(val));
  64.       }
  65.     }
  66.     
  67.     public void close() {
  68.     }
  69.   }
  70.   
  71.   /** 
  72.    * Your basic identity mapper. 
  73.    */
  74.   static class IdentityMapper
  75.     implements Mapper<WritableComparable, Writable,
  76.                       WritableComparable, Writable> {
  77.     
  78.     public void configure(JobConf job) {
  79.     }
  80.     
  81.     public void map(WritableComparable key, Writable value,
  82.                     OutputCollector<WritableComparable, Writable> out,
  83.                     Reporter reporter) throws IOException {
  84.       out.collect(key, value);
  85.     }
  86.     
  87.     public void close() {
  88.     }
  89.   }
  90.   
  91.   /** 
  92.    * Checks whether keys are in ascending order.  
  93.    */
  94.   static class AscendingKeysReducer
  95.     implements Reducer<IntWritable, Writable, IntWritable, Text> {
  96.     
  97.     public void configure(JobConf job) {}
  98.     // keep track of the last key we've seen
  99.     private int lastKey = Integer.MIN_VALUE;
  100.     public void reduce(IntWritable key, Iterator<Writable> values, 
  101.                        OutputCollector<IntWritable, Text> out,
  102.                        Reporter reporter) throws IOException {
  103.       int currentKey = key.get();
  104.       // keys should be in ascending order
  105.       if (currentKey < lastKey) {
  106.         fail("Keys not in sorted ascending order");
  107.       }
  108.       lastKey = currentKey;
  109.       out.collect(key, new Text("success"));
  110.     }
  111.     
  112.     public void close() {}
  113.   }
  114.   
  115.   /** 
  116.    * Checks whether keys are in ascending order.  
  117.    */
  118.   static class DescendingKeysReducer
  119.     implements Reducer<IntWritable, Writable, IntWritable, Text> {
  120.     public void configure(JobConf job) {}
  121.     // keep track of the last key we've seen
  122.     private int lastKey = Integer.MAX_VALUE;
  123.     public void reduce(IntWritable key, Iterator<Writable> values, 
  124.                        OutputCollector<IntWritable, Text> out,
  125.                        Reporter reporter) throws IOException {
  126.       int currentKey = ((IntWritable)(key)).get();
  127.       // keys should be in descending order
  128.       if (currentKey > lastKey) {
  129.         fail("Keys not in sorted descending order");
  130.       }
  131.       lastKey = currentKey;
  132.       out.collect(key, new Text("success"));
  133.     }
  134.     
  135.     public void close() {}
  136.   }
  137.   
  138.   /** The reducer checks whether the input values are in ascending order and
  139.    * whether they are correctly grouped by key (i.e. each call to reduce
  140.    * should have 5 values if the grouping is correct). It also checks whether
  141.    * the keys themselves are in ascending order.
  142.    */
  143.   static class AscendingGroupReducer
  144.     implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
  145.     
  146.     public void configure(JobConf job) {
  147.     }
  148.     // keep track of the last key we've seen
  149.     private int lastKey = Integer.MIN_VALUE;
  150.     public void reduce(IntWritable key,
  151.                        Iterator<IntWritable> values,
  152.                        OutputCollector<IntWritable, Text> out,
  153.                        Reporter reporter) throws IOException {
  154.       // check key order
  155.       int currentKey = key.get();
  156.       if (currentKey < lastKey) {
  157.         fail("Keys not in sorted ascending order");
  158.       }
  159.       lastKey = currentKey;
  160.       // check order of values
  161.       IntWritable previous = new IntWritable(Integer.MIN_VALUE);
  162.       int valueCount = 0;
  163.       while (values.hasNext()) {
  164.         IntWritable current = values.next();
  165.         
  166.         // Check that the values are sorted
  167.         if (current.compareTo(previous) < 0)
  168.           fail("Values generated by Mapper not in order");
  169.         previous = current;
  170.         ++valueCount;
  171.       }
  172.       if (valueCount != 5) {
  173.         fail("Values not grouped by primary key");
  174.       }
  175.       out.collect(key, new Text("success"));
  176.     }
  177.     public void close() {
  178.     }
  179.   }
  180.   
  181.   /** The reducer checks whether the input values are in descending order and
  182.    * whether they are correctly grouped by key (i.e. each call to reduce
  183.    * should have 5 values if the grouping is correct). 
  184.    */
  185.   static class DescendingGroupReducer
  186.     implements Reducer<IntWritable, IntWritable, IntWritable, Text> {
  187.     
  188.     public void configure(JobConf job) {
  189.     }
  190.     // keep track of the last key we've seen
  191.     private int lastKey = Integer.MAX_VALUE;
  192.     public void reduce(IntWritable key,
  193.                        Iterator<IntWritable> values,
  194.                        OutputCollector<IntWritable, Text> out,
  195.                        Reporter reporter) throws IOException {
  196.       // check key order
  197.       int currentKey = key.get();
  198.       if (currentKey > lastKey) {
  199.         fail("Keys not in sorted descending order");
  200.       }
  201.       lastKey = currentKey;
  202.       // check order of values
  203.       IntWritable previous = new IntWritable(Integer.MAX_VALUE);
  204.       int valueCount = 0;
  205.       while (values.hasNext()) {
  206.         IntWritable current = values.next();
  207.         
  208.         // Check that the values are sorted
  209.         if (current.compareTo(previous) > 0)
  210.           fail("Values generated by Mapper not in order");
  211.         previous = current;
  212.         ++valueCount;
  213.       }
  214.       if (valueCount != 5) {
  215.         fail("Values not grouped by primary key");
  216.       }
  217.       out.collect(key, new Text("success"));
  218.     }
  219.     public void close() {
  220.     }
  221.   }
  222.   
  223.   /** 
  224.    * A decreasing Comparator for IntWritable 
  225.    */ 
  226.   public static class DecreasingIntComparator extends IntWritable.Comparator {
  227.     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  228.       return -super.compare(b1, s1, l1, b2, s2, l2);
  229.     }
  230.     static {                    // register this comparator
  231.       WritableComparator.define(DecreasingIntComparator.class, new Comparator());
  232.     }
  233.   }
  234.   /** Grouping function for values based on the composite key. This
  235.    * comparator strips off the secondary key part from the x0y composite
  236.    * and only compares the primary key value (x).
  237.    */
  238.   public static class CompositeIntGroupFn extends WritableComparator {
  239.     public CompositeIntGroupFn() {
  240.       super(IntWritable.class);
  241.     }
  242.     public int compare (WritableComparable v1, WritableComparable v2) {
  243.       int val1 = ((IntWritable)(v1)).get() / 100;
  244.       int val2 = ((IntWritable)(v2)).get() / 100;
  245.       if (val1 < val2)
  246.         return 1;
  247.       else if (val1 > val2)
  248.         return -1;
  249.       else
  250.         return 0;
  251.     }
  252.     
  253.     public boolean equals (IntWritable v1, IntWritable v2) {
  254.       int val1 = v1.get();
  255.       int val2 = v2.get();
  256.       
  257.       return (val1/100) == (val2/100);
  258.     }
  259.     
  260.     static {
  261.       WritableComparator.define(CompositeIntGroupFn.class, new Comparator());
  262.     }
  263.   }
  264.   /** Reverse grouping function for values based on the composite key. 
  265.    */
  266.   public static class CompositeIntReverseGroupFn extends CompositeIntGroupFn {
  267.     public int compare (WritableComparable v1, WritableComparable v2) {
  268.       return -super.compare(v1, v2);
  269.     }
  270.     
  271.     public boolean equals (IntWritable v1, IntWritable v2) {
  272.       return !(super.equals(v1, v2));
  273.     }
  274.     
  275.     static {
  276.       WritableComparator.define(CompositeIntReverseGroupFn.class, new Comparator());
  277.     }
  278.   }
  279.   public void configure() throws Exception {
  280.     Path testdir = new Path("build/test/test.mapred.spill");
  281.     Path inDir = new Path(testdir, "in");
  282.     Path outDir = new Path(testdir, "out");
  283.     FileSystem fs = FileSystem.get(conf);
  284.     fs.delete(testdir, true);
  285.     conf.setInputFormat(SequenceFileInputFormat.class);
  286.     FileInputFormat.setInputPaths(conf, inDir);
  287.     FileOutputFormat.setOutputPath(conf, outDir);
  288.     conf.setOutputKeyClass(IntWritable.class);
  289.     conf.setOutputValueClass(Text.class);
  290.     conf.setMapOutputValueClass(IntWritable.class);
  291.     // set up two map jobs, so we can test merge phase in Reduce also
  292.     conf.setNumMapTasks(2);
  293.     
  294.     conf.setOutputFormat(SequenceFileOutputFormat.class);
  295.     if (!fs.mkdirs(testdir)) {
  296.       throw new IOException("Mkdirs failed to create " + testdir.toString());
  297.     }
  298.     if (!fs.mkdirs(inDir)) {
  299.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  300.     }
  301.     // set up input data in 2 files 
  302.     Path inFile = new Path(inDir, "part0");
  303.     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, 
  304.         IntWritable.class, IntWritable.class);
  305.     writer.append(new IntWritable(11), new IntWritable(999));
  306.     writer.append(new IntWritable(23), new IntWritable(456));
  307.     writer.append(new IntWritable(10), new IntWritable(780));
  308.     writer.close();
  309.     inFile = new Path(inDir, "part1");
  310.     writer = SequenceFile.createWriter(fs, conf, inFile, 
  311.         IntWritable.class, IntWritable.class);
  312.     writer.append(new IntWritable(45), new IntWritable(100));
  313.     writer.append(new IntWritable(18), new IntWritable(200));
  314.     writer.append(new IntWritable(27), new IntWritable(300));
  315.     writer.close();
  316.     
  317.     jc = new JobClient(conf);
  318.   }
  319.   
  320.   /**
  321.    * Test the default comparator for Map/Reduce. 
  322.    * Use the identity mapper and see if the keys are sorted at the end
  323.    * @throws Exception
  324.    */
  325.   public void testDefaultMRComparator() throws Exception { 
  326.     configure();
  327.     conf.setMapperClass(IdentityMapper.class);
  328.     conf.setReducerClass(AscendingKeysReducer.class);
  329.     
  330.     RunningJob r_job = jc.submitJob(conf);
  331.     while (!r_job.isComplete()) {
  332.       Thread.sleep(1000);
  333.     }
  334.     
  335.     if (!r_job.isSuccessful()) {
  336.       fail("Oops! The job broke due to an unexpected error");
  337.     }
  338.   }
  339.   
  340.   /**
  341.    * Test user-defined comparator for Map/Reduce.
  342.    * We provide our own comparator that is the reverse of the default int 
  343.    * comparator. Keys should be sorted in reverse order in the reducer. 
  344.    * @throws Exception
  345.    */
  346.   public void testUserMRComparator() throws Exception { 
  347.     configure();
  348.     conf.setMapperClass(IdentityMapper.class);
  349.     conf.setReducerClass(DescendingKeysReducer.class);
  350.     conf.setOutputKeyComparatorClass(DecreasingIntComparator.class);
  351.     
  352.     RunningJob r_job = jc.submitJob(conf);
  353.     while (!r_job.isComplete()) {
  354.       Thread.sleep(1000);
  355.     }
  356.     
  357.     if (!r_job.isSuccessful()) {
  358.       fail("Oops! The job broke due to an unexpected error");
  359.     }
  360.   }
  361.   
  362.   /**
  363.    * Test user-defined grouping comparator for grouping values in Reduce.
  364.    * We generate composite keys that contain a random number, which acts
  365.    * as a timestamp associated with the record. In our Reduce function, 
  366.    * values for a key should be sorted by the 'timestamp'. 
  367.    * @throws Exception
  368.    */
  369.   public void testUserValueGroupingComparator() throws Exception { 
  370.     configure();
  371.     conf.setMapperClass(RandomGenMapper.class);
  372.     conf.setReducerClass(AscendingGroupReducer.class);
  373.     conf.setOutputValueGroupingComparator(CompositeIntGroupFn.class);
  374.     
  375.     RunningJob r_job = jc.submitJob(conf);
  376.     while (!r_job.isComplete()) {
  377.       Thread.sleep(1000);
  378.     }
  379.     
  380.     if (!r_job.isSuccessful()) {
  381.       fail("Oops! The job broke due to an unexpected error");
  382.     }
  383.   }
  384.   
  385.   /**
  386.    * Test all user comparators. Super-test of all tests here. 
  387.    * We generate composite keys that contain a random number, which acts
  388.    * as a timestamp associated with the record. In our Reduce function, 
  389.    * values for a key should be sorted by the 'timestamp'.
  390.    * We also provide our own comparators that reverse the default sorting 
  391.    * order. This lets us make sure that the right comparators are used. 
  392.    * @throws Exception
  393.    */
  394.   public void testAllUserComparators() throws Exception { 
  395.     configure();
  396.     conf.setMapperClass(RandomGenMapper.class);
  397.     // use a decreasing comparator so keys are sorted in reverse order
  398.     conf.setOutputKeyComparatorClass(DecreasingIntComparator.class);
  399.     conf.setReducerClass(DescendingGroupReducer.class);
  400.     conf.setOutputValueGroupingComparator(CompositeIntReverseGroupFn.class);
  401.     RunningJob r_job = jc.submitJob(conf);
  402.     while (!r_job.isComplete()) {
  403.       Thread.sleep(1000);
  404.     }
  405.     
  406.     if (!r_job.isSuccessful()) {
  407.       fail("Oops! The job broke due to an unexpected error");
  408.     }
  409.   }
  410.   /**
  411.    * Test a user comparator that relies on deserializing both arguments
  412.    * for each compare.
  413.    */
  414.   public void testBakedUserComparator() throws Exception {
  415.     MyWritable a = new MyWritable(8, 8);
  416.     MyWritable b = new MyWritable(7, 9);
  417.     assertTrue(a.compareTo(b) > 0);
  418.     assertTrue(WritableComparator.get(MyWritable.class).compare(a, b) < 0);
  419.   }
  420.   public static class MyWritable implements WritableComparable<MyWritable> {
  421.     int i, j;
  422.     public MyWritable() { }
  423.     public MyWritable(int i, int j) {
  424.       this.i = i;
  425.       this.j = j;
  426.     }
  427.     public void readFields(DataInput in) throws IOException {
  428.       i = in.readInt();
  429.       j = in.readInt();
  430.     }
  431.     public void write(DataOutput out) throws IOException {
  432.       out.writeInt(i);
  433.       out.writeInt(j);
  434.     }
  435.     public int compareTo(MyWritable b) {
  436.       return this.i - b.i;
  437.     }
  438.     static {
  439.       WritableComparator.define(MyWritable.class, new MyCmp());
  440.     }
  441.   }
  442.   public static class MyCmp extends WritableComparator {
  443.     public MyCmp() { super(MyWritable.class, true); }
  444.     public int compare(WritableComparable a, WritableComparable b) {
  445.       MyWritable aa = (MyWritable)a;
  446.       MyWritable bb = (MyWritable)b;
  447.       return aa.j - bb.j;
  448.     }
  449.   }
  450. }