TestDatamerge.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.join;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.util.Iterator;
  23. import junit.framework.Test;
  24. import junit.framework.TestCase;
  25. import junit.framework.TestSuite;
  26. import junit.extensions.TestSetup;
  27. import org.apache.hadoop.conf.Configuration;
  28. import org.apache.hadoop.fs.FileStatus;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.hdfs.MiniDFSCluster;
  31. import org.apache.hadoop.io.IntWritable;
  32. import org.apache.hadoop.io.NullWritable;
  33. import org.apache.hadoop.io.SequenceFile;
  34. import org.apache.hadoop.io.Text;
  35. import org.apache.hadoop.io.Writable;
  36. import org.apache.hadoop.io.WritableComparable;
  37. import org.apache.hadoop.mapred.FileOutputFormat;
  38. import org.apache.hadoop.mapred.InputFormat;
  39. import org.apache.hadoop.mapred.InputSplit;
  40. import org.apache.hadoop.mapred.JobClient;
  41. import org.apache.hadoop.mapred.JobConf;
  42. import org.apache.hadoop.mapred.JobConfigurable;
  43. import org.apache.hadoop.mapred.Mapper;
  44. import org.apache.hadoop.mapred.OutputCollector;
  45. import org.apache.hadoop.mapred.RecordReader;
  46. import org.apache.hadoop.mapred.Reducer;
  47. import org.apache.hadoop.mapred.Reporter;
  48. import org.apache.hadoop.mapred.SequenceFileInputFormat;
  49. import org.apache.hadoop.mapred.SequenceFileOutputFormat;
  50. import org.apache.hadoop.mapred.lib.IdentityMapper;
  51. import org.apache.hadoop.mapred.lib.IdentityReducer;
  52. import org.apache.hadoop.util.ReflectionUtils;
  53. public class TestDatamerge extends TestCase {
  54.   private static MiniDFSCluster cluster = null;
  55.   public static Test suite() {
  56.     TestSetup setup = new TestSetup(new TestSuite(TestDatamerge.class)) {
  57.       protected void setUp() throws Exception {
  58.         Configuration conf = new Configuration();
  59.         cluster = new MiniDFSCluster(conf, 2, true, null);
  60.       }
  61.       protected void tearDown() throws Exception {
  62.         if (cluster != null) {
  63.           cluster.shutdown();
  64.         }
  65.       }
  66.     };
  67.     return setup;
  68.   }
  69.   private static SequenceFile.Writer[] createWriters(Path testdir,
  70.       Configuration conf, int srcs, Path[] src) throws IOException {
  71.     for (int i = 0; i < srcs; ++i) {
  72.       src[i] = new Path(testdir, Integer.toString(i + 10, 36));
  73.     }
  74.     SequenceFile.Writer out[] = new SequenceFile.Writer[srcs];
  75.     for (int i = 0; i < srcs; ++i) {
  76.       out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
  77.           src[i], IntWritable.class, IntWritable.class);
  78.     }
  79.     return out;
  80.   }
  81.   private static Path[] writeSimpleSrc(Path testdir, Configuration conf,
  82.       int srcs) throws IOException {
  83.     SequenceFile.Writer out[] = null;
  84.     Path[] src = new Path[srcs];
  85.     try {
  86.       out = createWriters(testdir, conf, srcs, src);
  87.       final int capacity = srcs * 2 + 1;
  88.       IntWritable key = new IntWritable();
  89.       IntWritable val = new IntWritable();
  90.       for (int k = 0; k < capacity; ++k) {
  91.         for (int i = 0; i < srcs; ++i) {
  92.           key.set(k % srcs == 0 ? k * srcs : k * srcs + i);
  93.           val.set(10 * k + i);
  94.           out[i].append(key, val);
  95.           if (i == k) {
  96.             // add duplicate key
  97.             out[i].append(key, val);
  98.           }
  99.         }
  100.       }
  101.     } finally {
  102.       if (out != null) {
  103.         for (int i = 0; i < srcs; ++i) {
  104.           if (out[i] != null)
  105.             out[i].close();
  106.         }
  107.       }
  108.     }
  109.     return src;
  110.   }
  111.   private static String stringify(IntWritable key, Writable val) {
  112.     StringBuilder sb = new StringBuilder();
  113.     sb.append("(" + key);
  114.     sb.append("," + val + ")");
  115.     return sb.toString();
  116.   }
  117.   private static abstract class SimpleCheckerBase<V extends Writable>
  118.       implements Mapper<IntWritable, V, IntWritable, IntWritable>,
  119.                  Reducer<IntWritable, IntWritable, Text, Text> {
  120.     protected final static IntWritable one = new IntWritable(1);
  121.     int srcs;
  122.     public void close() { }
  123.     public void configure(JobConf job) {
  124.       srcs = job.getInt("testdatamerge.sources", 0);
  125.       assertTrue("Invalid src count: " + srcs, srcs > 0);
  126.     }
  127.     public abstract void map(IntWritable key, V val,
  128.         OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
  129.         throws IOException;
  130.     public void reduce(IntWritable key, Iterator<IntWritable> values,
  131.                        OutputCollector<Text, Text> output,
  132.                        Reporter reporter) throws IOException {
  133.       int seen = 0;
  134.       while (values.hasNext()) {
  135.         seen += values.next().get();
  136.       }
  137.       assertTrue("Bad count for " + key.get(), verify(key.get(), seen));
  138.     }
  139.     public abstract boolean verify(int key, int occ);
  140.   }
  141.   private static class InnerJoinChecker
  142.       extends SimpleCheckerBase<TupleWritable> {
  143.     public void map(IntWritable key, TupleWritable val,
  144.         OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
  145.         throws IOException {
  146.       int k = key.get();
  147.       final String kvstr = "Unexpected tuple: " + stringify(key, val);
  148.       assertTrue(kvstr, 0 == k % (srcs * srcs));
  149.       for (int i = 0; i < val.size(); ++i) {
  150.         final int vali = ((IntWritable)val.get(i)).get();
  151.         assertTrue(kvstr, (vali - i) * srcs == 10 * k);
  152.       }
  153.       out.collect(key, one);
  154.     }
  155.     public boolean verify(int key, int occ) {
  156.       return (key == 0 && occ == 2) ||
  157.              (key != 0 && (key % (srcs * srcs) == 0) && occ == 1);
  158.     }
  159.   }
  160.   private static class OuterJoinChecker
  161.       extends SimpleCheckerBase<TupleWritable> {
  162.     public void map(IntWritable key, TupleWritable val,
  163.         OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
  164.         throws IOException {
  165.       int k = key.get();
  166.       final String kvstr = "Unexpected tuple: " + stringify(key, val);
  167.       if (0 == k % (srcs * srcs)) {
  168.         for (int i = 0; i < val.size(); ++i) {
  169.           assertTrue(kvstr, val.get(i) instanceof IntWritable);
  170.           final int vali = ((IntWritable)val.get(i)).get();
  171.           assertTrue(kvstr, (vali - i) * srcs == 10 * k);
  172.         }
  173.       } else {
  174.         for (int i = 0; i < val.size(); ++i) {
  175.           if (i == k % srcs) {
  176.             assertTrue(kvstr, val.get(i) instanceof IntWritable);
  177.             final int vali = ((IntWritable)val.get(i)).get();
  178.             assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
  179.           } else {
  180.             assertTrue(kvstr, !val.has(i));
  181.           }
  182.         }
  183.       }
  184.       out.collect(key, one);
  185.     }
  186.     public boolean verify(int key, int occ) {
  187.       if (key < srcs * srcs && (key % (srcs + 1)) == 0)
  188.         return 2 == occ;
  189.       return 1 == occ;
  190.     }
  191.   }
  192.   private static class OverrideChecker
  193.       extends SimpleCheckerBase<IntWritable> {
  194.     public void map(IntWritable key, IntWritable val,
  195.         OutputCollector<IntWritable, IntWritable> out, Reporter reporter)
  196.         throws IOException {
  197.       int k = key.get();
  198.       final int vali = val.get();
  199.       final String kvstr = "Unexpected tuple: " + stringify(key, val);
  200.       if (0 == k % (srcs * srcs)) {
  201.         assertTrue(kvstr, vali == k * 10 / srcs + srcs - 1);
  202.       } else {
  203.         final int i = k % srcs;
  204.         assertTrue(kvstr, srcs * (vali - i) == 10 * (k - i));
  205.       }
  206.       out.collect(key, one);
  207.     }
  208.     public boolean verify(int key, int occ) {
  209.       if (key < srcs * srcs && (key % (srcs + 1)) == 0 && key != 0)
  210.         return 2 == occ;
  211.       return 1 == occ;
  212.     }
  213.   }
  214.   private static void joinAs(String jointype,
  215.       Class<? extends SimpleCheckerBase> c) throws Exception {
  216.     final int srcs = 4;
  217.     Configuration conf = new Configuration();
  218.     JobConf job = new JobConf(conf, c);
  219.     Path base = cluster.getFileSystem().makeQualified(new Path("/"+jointype));
  220.     Path[] src = writeSimpleSrc(base, conf, srcs);
  221.     job.set("mapred.join.expr", CompositeInputFormat.compose(jointype,
  222.         SequenceFileInputFormat.class, src));
  223.     job.setInt("testdatamerge.sources", srcs);
  224.     job.setInputFormat(CompositeInputFormat.class);
  225.     FileOutputFormat.setOutputPath(job, new Path(base, "out"));
  226.     job.setMapperClass(c);
  227.     job.setReducerClass(c);
  228.     job.setOutputKeyClass(IntWritable.class);
  229.     job.setOutputValueClass(IntWritable.class);
  230.     JobClient.runJob(job);
  231.     base.getFileSystem(job).delete(base, true);
  232.   }
  233.   public void testSimpleInnerJoin() throws Exception {
  234.     joinAs("inner", InnerJoinChecker.class);
  235.   }
  236.   public void testSimpleOuterJoin() throws Exception {
  237.     joinAs("outer", OuterJoinChecker.class);
  238.   }
  239.   public void testSimpleOverride() throws Exception {
  240.     joinAs("override", OverrideChecker.class);
  241.   }
  242.   public void testNestedJoin() throws Exception {
  243.     // outer(inner(S1,...,Sn),outer(S1,...Sn))
  244.     final int SOURCES = 3;
  245.     final int ITEMS = (SOURCES + 1) * (SOURCES + 1);
  246.     JobConf job = new JobConf();
  247.     Path base = cluster.getFileSystem().makeQualified(new Path("/nested"));
  248.     int[][] source = new int[SOURCES][];
  249.     for (int i = 0; i < SOURCES; ++i) {
  250.       source[i] = new int[ITEMS];
  251.       for (int j = 0; j < ITEMS; ++j) {
  252.         source[i][j] = (i + 2) * (j + 1);
  253.       }
  254.     }
  255.     Path[] src = new Path[SOURCES];
  256.     SequenceFile.Writer out[] = createWriters(base, job, SOURCES, src);
  257.     IntWritable k = new IntWritable();
  258.     for (int i = 0; i < SOURCES; ++i) {
  259.       IntWritable v = new IntWritable();
  260.       v.set(i);
  261.       for (int j = 0; j < ITEMS; ++j) {
  262.         k.set(source[i][j]);
  263.         out[i].append(k, v);
  264.       }
  265.       out[i].close();
  266.     }
  267.     out = null;
  268.     StringBuilder sb = new StringBuilder();
  269.     sb.append("outer(inner(");
  270.     for (int i = 0; i < SOURCES; ++i) {
  271.       sb.append(
  272.           CompositeInputFormat.compose(SequenceFileInputFormat.class,
  273.             src[i].toString()));
  274.       if (i + 1 != SOURCES) sb.append(",");
  275.     }
  276.     sb.append("),outer(");
  277.     sb.append(CompositeInputFormat.compose(Fake_IF.class,"foobar"));
  278.     sb.append(",");
  279.     for (int i = 0; i < SOURCES; ++i) {
  280.       sb.append(
  281.           CompositeInputFormat.compose(SequenceFileInputFormat.class,
  282.             src[i].toString()));
  283.       sb.append(",");
  284.     }
  285.     sb.append(CompositeInputFormat.compose(Fake_IF.class,"raboof") + "))");
  286.     job.set("mapred.join.expr", sb.toString());
  287.     job.setInputFormat(CompositeInputFormat.class);
  288.     Path outf = new Path(base, "out");
  289.     FileOutputFormat.setOutputPath(job, outf);
  290.     Fake_IF.setKeyClass(job, IntWritable.class);
  291.     Fake_IF.setValClass(job, IntWritable.class);
  292.     job.setMapperClass(IdentityMapper.class);
  293.     job.setReducerClass(IdentityReducer.class);
  294.     job.setNumReduceTasks(0);
  295.     job.setOutputKeyClass(IntWritable.class);
  296.     job.setOutputValueClass(TupleWritable.class);
  297.     job.setOutputFormat(SequenceFileOutputFormat.class);
  298.     JobClient.runJob(job);
  299.     FileStatus[] outlist = cluster.getFileSystem().listStatus(outf);
  300.     assertEquals(1, outlist.length);
  301.     assertTrue(0 < outlist[0].getLen());
  302.     SequenceFile.Reader r =
  303.       new SequenceFile.Reader(cluster.getFileSystem(),
  304.           outlist[0].getPath(), job);
  305.     TupleWritable v = new TupleWritable();
  306.     while (r.next(k, v)) {
  307.       assertFalse(((TupleWritable)v.get(1)).has(0));
  308.       assertFalse(((TupleWritable)v.get(1)).has(SOURCES + 1));
  309.       boolean chk = true;
  310.       int ki = k.get();
  311.       for (int i = 2; i < SOURCES + 2; ++i) {
  312.         if ((ki % i) == 0 && ki <= i * ITEMS) {
  313.           assertEquals(i - 2, ((IntWritable)
  314.                               ((TupleWritable)v.get(1)).get((i - 1))).get());
  315.         } else chk = false;
  316.       }
  317.       if (chk) { // present in all sources; chk inner
  318.         assertTrue(v.has(0));
  319.         for (int i = 0; i < SOURCES; ++i)
  320.           assertTrue(((TupleWritable)v.get(0)).has(i));
  321.       } else { // should not be present in inner join
  322.         assertFalse(v.has(0));
  323.       }
  324.     }
  325.     r.close();
  326.     base.getFileSystem(job).delete(base, true);
  327.   }
  328.   public void testEmptyJoin() throws Exception {
  329.     JobConf job = new JobConf();
  330.     Path base = cluster.getFileSystem().makeQualified(new Path("/empty"));
  331.     Path[] src = { new Path(base,"i0"), new Path("i1"), new Path("i2") };
  332.     job.set("mapred.join.expr", CompositeInputFormat.compose("outer",
  333.         Fake_IF.class, src));
  334.     job.setInputFormat(CompositeInputFormat.class);
  335.     FileOutputFormat.setOutputPath(job, new Path(base, "out"));
  336.     job.setMapperClass(IdentityMapper.class);
  337.     job.setReducerClass(IdentityReducer.class);
  338.     job.setOutputKeyClass(IncomparableKey.class);
  339.     job.setOutputValueClass(NullWritable.class);
  340.     JobClient.runJob(job);
  341.     base.getFileSystem(job).delete(base, true);
  342.   }
  343.   public static class Fake_IF<K,V>
  344.       implements InputFormat<K,V>, JobConfigurable {
  345.     public static class FakeSplit implements InputSplit {
  346.       public void write(DataOutput out) throws IOException { }
  347.       public void readFields(DataInput in) throws IOException { }
  348.       public long getLength() { return 0L; }
  349.       public String[] getLocations() { return new String[0]; }
  350.     }
  351.     public static void setKeyClass(JobConf job, Class<?> k) {
  352.       job.setClass("test.fakeif.keyclass", k, WritableComparable.class);
  353.     }
  354.     public static void setValClass(JobConf job, Class<?> v) {
  355.       job.setClass("test.fakeif.valclass", v, Writable.class);
  356.     }
  357.     private Class<? extends K> keyclass;
  358.     private Class<? extends V> valclass;
  359.     @SuppressWarnings("unchecked")
  360.     public void configure(JobConf job) {
  361.       keyclass = (Class<? extends K>) job.getClass("test.fakeif.keyclass",
  362.     IncomparableKey.class, WritableComparable.class);
  363.       valclass = (Class<? extends V>) job.getClass("test.fakeif.valclass",
  364.     NullWritable.class, WritableComparable.class);
  365.     }
  366.     public Fake_IF() { }
  367.     public InputSplit[] getSplits(JobConf conf, int splits) {
  368.       return new InputSplit[] { new FakeSplit() };
  369.     }
  370.     public RecordReader<K,V> getRecordReader(
  371.         InputSplit ignored, JobConf conf, Reporter reporter) {
  372.       return new RecordReader<K,V>() {
  373.         public boolean next(K key, V value) throws IOException { return false; }
  374.         public K createKey() {
  375.           return ReflectionUtils.newInstance(keyclass, null);
  376.         }
  377.         public V createValue() {
  378.           return ReflectionUtils.newInstance(valclass, null);
  379.         }
  380.         public long getPos() throws IOException { return 0L; }
  381.         public void close() throws IOException { }
  382.         public float getProgress() throws IOException { return 0.0f; }
  383.       };
  384.     }
  385.   }
  386. }