TestChainMapReduce.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import org.apache.hadoop.fs.FileSystem;
  20. import org.apache.hadoop.fs.Path;
  21. import org.apache.hadoop.io.LongWritable;
  22. import org.apache.hadoop.io.Text;
  23. import org.apache.hadoop.mapred.*;
  24. import java.io.DataOutputStream;
  25. import java.io.IOException;
  26. import java.util.Iterator;
  27. public class TestChainMapReduce extends HadoopTestCase {
  28.   private static Path getFlagDir(boolean local) {
  29.     Path flagDir = new Path("testing/chain/flags");
  30.     // Hack for local FS that does not have the concept of a 'mounting point'
  31.     if (local) {
  32.       String localPathRoot = System.getProperty("test.build.data", "/tmp")
  33.         .replace(' ', '+');
  34.       flagDir = new Path(localPathRoot, flagDir);
  35.     }
  36.     return flagDir;
  37.   }
  38.   private static void cleanFlags(JobConf conf) throws IOException {
  39.     FileSystem fs = FileSystem.get(conf);
  40.     fs.delete(getFlagDir(conf.getBoolean("localFS", true)), true);
  41.     fs.mkdirs(getFlagDir(conf.getBoolean("localFS", true)));
  42.   }
  43.   private static void writeFlag(JobConf conf, String flag) throws IOException {
  44.     FileSystem fs = FileSystem.get(conf);
  45.     if (getFlag(conf, flag)) {
  46.       fail("Flag " + flag + " already exists");
  47.     }
  48.     DataOutputStream file =
  49.       fs.create(new Path(getFlagDir(conf.getBoolean("localFS", true)), flag));
  50.     file.close();
  51.   }
  52.   private static boolean getFlag(JobConf conf, String flag) throws IOException {
  53.     FileSystem fs = FileSystem.get(conf);
  54.     return fs
  55.       .exists(new Path(getFlagDir(conf.getBoolean("localFS", true)), flag));
  56.   }
  57.   public TestChainMapReduce() throws IOException {
  58.     super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
  59.   }
  60.   public void testChain() throws Exception {
  61.     Path inDir = new Path("testing/chain/input");
  62.     Path outDir = new Path("testing/chain/output");
  63.     // Hack for local FS that does not have the concept of a 'mounting point'
  64.     if (isLocalFS()) {
  65.       String localPathRoot = System.getProperty("test.build.data", "/tmp")
  66.         .replace(' ', '+');
  67.       inDir = new Path(localPathRoot, inDir);
  68.       outDir = new Path(localPathRoot, outDir);
  69.     }
  70.     JobConf conf = createJobConf();
  71.     conf.setBoolean("localFS", isLocalFS());
  72.     cleanFlags(conf);
  73.     FileSystem fs = FileSystem.get(conf);
  74.     fs.delete(outDir, true);
  75.     if (!fs.mkdirs(inDir)) {
  76.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  77.     }
  78.     DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  79.     file.writeBytes("1n2n");
  80.     file.close();
  81.     conf.setJobName("chain");
  82.     conf.setInputFormat(TextInputFormat.class);
  83.     conf.setOutputFormat(TextOutputFormat.class);
  84.     conf.set("a", "X");
  85.     JobConf mapAConf = new JobConf(false);
  86.     mapAConf.set("a", "A");
  87.     ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
  88.                           LongWritable.class, Text.class, true, mapAConf);
  89.     ChainMapper.addMapper(conf, BMap.class, LongWritable.class, Text.class,
  90.                           LongWritable.class, Text.class, false, null);
  91.     JobConf reduceConf = new JobConf(false);
  92.     reduceConf.set("a", "C");
  93.     ChainReducer.setReducer(conf, CReduce.class, LongWritable.class, Text.class,
  94.                             LongWritable.class, Text.class, true, reduceConf);
  95.     ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
  96.                            LongWritable.class, Text.class, false, null);
  97.     JobConf mapEConf = new JobConf(false);
  98.     mapEConf.set("a", "E");
  99.     ChainReducer.addMapper(conf, EMap.class, LongWritable.class, Text.class,
  100.                            LongWritable.class, Text.class, true, mapEConf);
  101.     FileInputFormat.setInputPaths(conf, inDir);
  102.     FileOutputFormat.setOutputPath(conf, outDir);
  103.     JobClient jc = new JobClient(conf);
  104.     RunningJob job = jc.submitJob(conf);
  105.     while (!job.isComplete()) {
  106.       Thread.sleep(100);
  107.     }
  108.     assertTrue(getFlag(conf, "configure.A"));
  109.     assertTrue(getFlag(conf, "configure.B"));
  110.     assertTrue(getFlag(conf, "configure.C"));
  111.     assertTrue(getFlag(conf, "configure.D"));
  112.     assertTrue(getFlag(conf, "configure.E"));
  113.     assertTrue(getFlag(conf, "map.A.value.1"));
  114.     assertTrue(getFlag(conf, "map.A.value.2"));
  115.     assertTrue(getFlag(conf, "map.B.value.1"));
  116.     assertTrue(getFlag(conf, "map.B.value.2"));
  117.     assertTrue(getFlag(conf, "reduce.C.value.2"));
  118.     assertTrue(getFlag(conf, "reduce.C.value.1"));
  119.     assertTrue(getFlag(conf, "map.D.value.1"));
  120.     assertTrue(getFlag(conf, "map.D.value.2"));
  121.     assertTrue(getFlag(conf, "map.E.value.1"));
  122.     assertTrue(getFlag(conf, "map.E.value.2"));
  123.     assertTrue(getFlag(conf, "close.A"));
  124.     assertTrue(getFlag(conf, "close.B"));
  125.     assertTrue(getFlag(conf, "close.C"));
  126.     assertTrue(getFlag(conf, "close.D"));
  127.     assertTrue(getFlag(conf, "close.E"));
  128.   }
  129.   public static class AMap extends IDMap {
  130.     public AMap() {
  131.       super("A", "A", true);
  132.     }
  133.   }
  134.   public static class BMap extends IDMap {
  135.     public BMap() {
  136.       super("B", "X", false);
  137.     }
  138.   }
  139.   public static class CReduce extends IDReduce {
  140.     public CReduce() {
  141.       super("C", "C");
  142.     }
  143.   }
  144.   public static class DMap extends IDMap {
  145.     public DMap() {
  146.       super("D", "X", false);
  147.     }
  148.   }
  149.   public static class EMap extends IDMap {
  150.     public EMap() {
  151.       super("E", "E", true);
  152.     }
  153.   }
  154.   public static class IDMap
  155.     implements Mapper<LongWritable, Text, LongWritable, Text> {
  156.     private JobConf conf;
  157.     private String name;
  158.     private String prop;
  159.     private boolean byValue;
  160.     public IDMap(String name, String prop, boolean byValue) {
  161.       this.name = name;
  162.       this.prop = prop;
  163.       this.byValue = byValue;
  164.     }
  165.     public void configure(JobConf conf) {
  166.       this.conf = conf;
  167.       assertEquals(prop, conf.get("a"));
  168.       try {
  169.         writeFlag(conf, "configure." + name);
  170.       } catch (IOException ex) {
  171.         throw new RuntimeException(ex);
  172.       }
  173.     }
  174.     public void map(LongWritable key, Text value,
  175.                     OutputCollector<LongWritable, Text> output,
  176.                     Reporter reporter) throws IOException {
  177.       writeFlag(conf, "map." + name + ".value." + value);
  178.       key.set(10);
  179.       output.collect(key, value);
  180.       if (byValue) {
  181.         assertEquals(10, key.get());
  182.       } else {
  183.         assertNotSame(10, key.get());
  184.       }
  185.       key.set(11);
  186.     }
  187.     public void close() throws IOException {
  188.       try {
  189.         writeFlag(conf, "close." + name);
  190.       } catch (IOException ex) {
  191.         throw new RuntimeException(ex);
  192.       }
  193.     }
  194.   }
  195.   public static class IDReduce
  196.     implements Reducer<LongWritable, Text, LongWritable, Text> {
  197.     private JobConf conf;
  198.     private String name;
  199.     private String prop;
  200.     private boolean byValue = false;
  201.     public IDReduce(String name, String prop) {
  202.       this.name = name;
  203.       this.prop = prop;
  204.     }
  205.     public void configure(JobConf conf) {
  206.       this.conf = conf;
  207.       assertEquals(prop, conf.get("a"));
  208.       try {
  209.         writeFlag(conf, "configure." + name);
  210.       } catch (IOException ex) {
  211.         throw new RuntimeException(ex);
  212.       }
  213.     }
  214.     public void reduce(LongWritable key, Iterator<Text> values,
  215.                        OutputCollector<LongWritable, Text> output,
  216.                        Reporter reporter) throws IOException {
  217.       while (values.hasNext()) {
  218.         Text value = values.next();
  219.         writeFlag(conf, "reduce." + name + ".value." + value);
  220.         key.set(10);
  221.         output.collect(key, value);
  222.         if (byValue) {
  223.           assertEquals(10, key.get());
  224.         } else {
  225.           assertNotSame(10, key.get());
  226.         }
  227.         key.set(11);
  228.       }
  229.     }
  230.     public void close() throws IOException {
  231.       try {
  232.         writeFlag(conf, "close." + name);
  233.       } catch (IOException ex) {
  234.         throw new RuntimeException(ex);
  235.       }
  236.     }
  237.   }
  238. }