TestMultipleOutputs.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import org.apache.hadoop.fs.FileStatus;
  20. import org.apache.hadoop.fs.FileSystem;
  21. import org.apache.hadoop.fs.Path;
  22. import org.apache.hadoop.io.LongWritable;
  23. import org.apache.hadoop.io.SequenceFile;
  24. import org.apache.hadoop.io.Text;
  25. import org.apache.hadoop.mapred.*;
  26. import java.io.BufferedReader;
  27. import java.io.DataOutputStream;
  28. import java.io.IOException;
  29. import java.io.InputStreamReader;
  30. import java.util.Iterator;
  31. public class TestMultipleOutputs extends HadoopTestCase {
  32.   public TestMultipleOutputs() throws IOException {
  33.     super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
  34.   }
  35.   public void testWithoutCounters() throws Exception {
  36.     _testMultipleOutputs(false);
  37.   }
  38.   public void testWithCounters() throws Exception {
  39.     _testMultipleOutputs(true);
  40.   }
  41.   private static final Path ROOT_DIR = new Path("testing/mo");
  42.   private static final Path IN_DIR = new Path(ROOT_DIR, "input");
  43.   private static final Path OUT_DIR = new Path(ROOT_DIR, "output");
  44.   private Path getDir(Path dir) {
  45.     // Hack for local FS that does not have the concept of a 'mounting point'
  46.     if (isLocalFS()) {
  47.       String localPathRoot = System.getProperty("test.build.data", "/tmp")
  48.         .replace(' ', '+');
  49.       dir = new Path(localPathRoot, dir);
  50.     }
  51.     return dir;
  52.   }
  53.   public void setUp() throws Exception {
  54.     super.setUp();
  55.     Path rootDir = getDir(ROOT_DIR);
  56.     Path inDir = getDir(IN_DIR);
  57.     JobConf conf = createJobConf();
  58.     FileSystem fs = FileSystem.get(conf);
  59.     fs.delete(rootDir, true);
  60.     if (!fs.mkdirs(inDir)) {
  61.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  62.     }
  63.   }
  64.   public void tearDown() throws Exception {
  65.     Path rootDir = getDir(ROOT_DIR);
  66.     JobConf conf = createJobConf();
  67.     FileSystem fs = FileSystem.get(conf);
  68.     fs.delete(rootDir, true);
  69.     super.tearDown();
  70.   }
  71.   protected void _testMultipleOutputs(boolean withCounters) throws Exception {
  72.     Path inDir = getDir(IN_DIR);
  73.     Path outDir = getDir(OUT_DIR);
  74.     JobConf conf = createJobConf();
  75.     FileSystem fs = FileSystem.get(conf);
  76.     DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  77.     file.writeBytes("anbnncndne");
  78.     file.close();
  79.     file = fs.create(new Path(inDir, "part-1"));
  80.     file.writeBytes("anbnncndne");
  81.     file.close();
  82.     conf.setJobName("mo");
  83.     conf.setInputFormat(TextInputFormat.class);
  84.     conf.setOutputKeyClass(LongWritable.class);
  85.     conf.setOutputValueClass(Text.class);
  86.     conf.setMapOutputKeyClass(LongWritable.class);
  87.     conf.setMapOutputValueClass(Text.class);
  88.     conf.setOutputFormat(TextOutputFormat.class);
  89.     conf.setOutputKeyClass(LongWritable.class);
  90.     conf.setOutputValueClass(Text.class);
  91.     MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
  92.       LongWritable.class, Text.class);
  93.     MultipleOutputs.addMultiNamedOutput(conf, "sequence",
  94.       SequenceFileOutputFormat.class, LongWritable.class, Text.class);
  95.     MultipleOutputs.setCountersEnabled(conf, withCounters);
  96.     conf.setMapperClass(MOMap.class);
  97.     conf.setReducerClass(MOReduce.class);
  98.     FileInputFormat.setInputPaths(conf, inDir);
  99.     FileOutputFormat.setOutputPath(conf, outDir);
  100.     JobClient jc = new JobClient(conf);
  101.     RunningJob job = jc.submitJob(conf);
  102.     while (!job.isComplete()) {
  103.       Thread.sleep(100);
  104.     }
  105.     // assert number of named output part files
  106.     int namedOutputCount = 0;
  107.     FileStatus[] statuses = fs.listStatus(outDir);
  108.     for (FileStatus status : statuses) {
  109.       if (status.getPath().getName().equals("text-m-00000") ||
  110.         status.getPath().getName().equals("text-m-00001") ||
  111.         status.getPath().getName().equals("text-r-00000") ||
  112.         status.getPath().getName().equals("sequence_A-m-00000") ||
  113.         status.getPath().getName().equals("sequence_A-m-00001") ||
  114.         status.getPath().getName().equals("sequence_B-m-00000") ||
  115.         status.getPath().getName().equals("sequence_B-m-00001") ||
  116.         status.getPath().getName().equals("sequence_B-r-00000") ||
  117.         status.getPath().getName().equals("sequence_C-r-00000")) {
  118.         namedOutputCount++;
  119.       }
  120.     }
  121.     assertEquals(9, namedOutputCount);
  122.     // assert TextOutputFormat files correctness
  123.     BufferedReader reader = new BufferedReader(
  124.       new InputStreamReader(fs.open(
  125.         new Path(FileOutputFormat.getOutputPath(conf), "text-r-00000"))));
  126.     int count = 0;
  127.     String line = reader.readLine();
  128.     while (line != null) {
  129.       assertTrue(line.endsWith("text"));
  130.       line = reader.readLine();
  131.       count++;
  132.     }
  133.     reader.close();
  134.     assertFalse(count == 0);
  135.     // assert SequenceOutputFormat files correctness
  136.     SequenceFile.Reader seqReader =
  137.       new SequenceFile.Reader(fs, new Path(FileOutputFormat.getOutputPath(conf),
  138.         "sequence_B-r-00000"), conf);
  139.     assertEquals(LongWritable.class, seqReader.getKeyClass());
  140.     assertEquals(Text.class, seqReader.getValueClass());
  141.     count = 0;
  142.     LongWritable key = new LongWritable();
  143.     Text value = new Text();
  144.     while (seqReader.next(key, value)) {
  145.       assertEquals("sequence", value.toString());
  146.       count++;
  147.     }
  148.     seqReader.close();
  149.     assertFalse(count == 0);
  150.     Counters.Group counters =
  151.       job.getCounters().getGroup(MultipleOutputs.class.getName());
  152.     if (!withCounters) {
  153.       assertEquals(0, counters.size());
  154.     }
  155.     else {
  156.       assertEquals(4, counters.size());
  157.       assertEquals(4, counters.getCounter("text"));
  158.       assertEquals(2, counters.getCounter("sequence_A"));
  159.       assertEquals(4, counters.getCounter("sequence_B"));
  160.       assertEquals(2, counters.getCounter("sequence_C"));
  161.     }
  162.   }
  163.   @SuppressWarnings({"unchecked"})
  164.   public static class MOMap implements Mapper<LongWritable, Text, LongWritable,
  165.     Text> {
  166.     private MultipleOutputs mos;
  167.     public void configure(JobConf conf) {
  168.       mos = new MultipleOutputs(conf);
  169.     }
  170.     public void map(LongWritable key, Text value,
  171.                     OutputCollector<LongWritable, Text> output,
  172.                     Reporter reporter)
  173.       throws IOException {
  174.       if (!value.toString().equals("a")) {
  175.         output.collect(key, value);
  176.       } else {
  177.         mos.getCollector("text", reporter).collect(key, new Text("text"));
  178.         mos.getCollector("sequence", "A", reporter).collect(key,
  179.           new Text("sequence"));
  180.         mos.getCollector("sequence", "B", reporter).collect(key,
  181.           new Text("sequence"));
  182.       }
  183.     }
  184.     public void close() throws IOException {
  185.       mos.close();
  186.     }
  187.   }
  188.   @SuppressWarnings({"unchecked"})
  189.   public static class MOReduce implements Reducer<LongWritable, Text,
  190.     LongWritable, Text> {
  191.     private MultipleOutputs mos;
  192.     public void configure(JobConf conf) {
  193.       mos = new MultipleOutputs(conf);
  194.     }
  195.     public void reduce(LongWritable key, Iterator<Text> values,
  196.                        OutputCollector<LongWritable, Text> output,
  197.                        Reporter reporter)
  198.       throws IOException {
  199.       while (values.hasNext()) {
  200.         Text value = values.next();
  201.         if (!value.toString().equals("b")) {
  202.           output.collect(key, value);
  203.         } else {
  204.           mos.getCollector("text", reporter).collect(key, new Text("text"));
  205.           mos.getCollector("sequence", "B", reporter).collect(key,
  206.             new Text("sequence"));
  207.           mos.getCollector("sequence", "C", reporter).collect(key,
  208.             new Text("sequence"));
  209.         }
  210.       }
  211.     }
  212.     public void close() throws IOException {
  213.       mos.close();
  214.     }
  215.   }
  216. }