TestMapOutputType.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.fs.*;
  20. import org.apache.hadoop.io.*;
  21. import org.apache.hadoop.mapred.lib.*;
  22. import junit.framework.TestCase;
  23. import java.io.*;
  24. import java.util.*;
  25. /** 
  26.  * TestMapOutputType checks whether the Map task handles type mismatch
  27.  * between mapper output and the type specified in
  28.  * JobConf.MapOutputKeyType and JobConf.MapOutputValueType.
  29.  */
  30. public class TestMapOutputType extends TestCase 
  31. {
  32.   JobConf conf = new JobConf(TestMapOutputType.class);
  33.   JobClient jc;
  34.   /** 
  35.    * TextGen is a Mapper that generates a Text key-value pair. The
  36.    * type specified in conf will be anything but.
  37.    */
  38.    
  39.   static class TextGen
  40.     implements Mapper<WritableComparable, Writable, Text, Text> {
  41.     
  42.     public void configure(JobConf job) {
  43.     }
  44.     
  45.     public void map(WritableComparable key, Writable val,
  46.                     OutputCollector<Text, Text> out,
  47.                     Reporter reporter) throws IOException {
  48.       out.collect(new Text("Hello"), new Text("World"));
  49.     }
  50.     
  51.     public void close() {
  52.     }
  53.   }
  54.   
  55.   /** A do-nothing reducer class. We won't get this far, really.
  56.    *
  57.    */
  58.   static class TextReduce
  59.     implements Reducer<Text, Text, Text, Text> {
  60.     
  61.     public void configure(JobConf job) {
  62.     }
  63.     public void reduce(Text key,
  64.                        Iterator<Text> values,
  65.                        OutputCollector<Text, Text> out,
  66.                        Reporter reporter) throws IOException {
  67.       out.collect(new Text("Test"), new Text("Me"));
  68.     }
  69.     public void close() {
  70.     }
  71.   }
  72.   public void configure() throws Exception {
  73.     Path testdir = new Path("build/test/test.mapred.spill");
  74.     Path inDir = new Path(testdir, "in");
  75.     Path outDir = new Path(testdir, "out");
  76.     FileSystem fs = FileSystem.get(conf);
  77.     fs.delete(testdir, true);
  78.     conf.setInt("io.sort.mb", 1);
  79.     conf.setInputFormat(SequenceFileInputFormat.class);
  80.     FileInputFormat.setInputPaths(conf, inDir);
  81.     FileOutputFormat.setOutputPath(conf, outDir);
  82.     conf.setMapperClass(TextGen.class);
  83.     conf.setReducerClass(TextReduce.class);
  84.     conf.setOutputKeyClass(Text.class);
  85.     conf.setOutputValueClass(Text.class); 
  86.     
  87.     conf.setOutputFormat(SequenceFileOutputFormat.class);
  88.     if (!fs.mkdirs(testdir)) {
  89.       throw new IOException("Mkdirs failed to create " + testdir.toString());
  90.     }
  91.     if (!fs.mkdirs(inDir)) {
  92.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  93.     }
  94.     Path inFile = new Path(inDir, "part0");
  95.     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, inFile, 
  96.                                                            Text.class, Text.class);
  97.     writer.append(new Text("rec: 1"), new Text("Hello"));
  98.     writer.close();
  99.     
  100.     jc = new JobClient(conf);
  101.   }
  102.   
  103.   public void testKeyMismatch() throws Exception {
  104.     configure();
  105.     
  106.     //  Set bad MapOutputKeyClass and MapOutputValueClass
  107.     conf.setMapOutputKeyClass(IntWritable.class);
  108.     conf.setMapOutputValueClass(IntWritable.class);
  109.     
  110.     RunningJob r_job = jc.submitJob(conf);
  111.     while (!r_job.isComplete()) {
  112.       Thread.sleep(1000);
  113.     }
  114.     
  115.     if (r_job.isSuccessful()) {
  116.       fail("Oops! The job was supposed to break due to an exception");
  117.     }
  118.   }
  119.   
  120.   public void testValueMismatch() throws Exception {
  121.     configure();
  122.   
  123.     // Set good MapOutputKeyClass, bad MapOutputValueClass    
  124.     conf.setMapOutputKeyClass(Text.class);
  125.     conf.setMapOutputValueClass(IntWritable.class);
  126.     
  127.     RunningJob r_job = jc.submitJob(conf);
  128.     while (!r_job.isComplete()) {
  129.       Thread.sleep(1000);
  130.     }
  131.     
  132.     if (r_job.isSuccessful()) {
  133.       fail("Oops! The job was supposed to break due to an exception");
  134.     }
  135.   }
  136.   
  137.   public void testNoMismatch() throws Exception{ 
  138.     configure();
  139.     
  140.     //  Set good MapOutputKeyClass and MapOutputValueClass    
  141.     conf.setMapOutputKeyClass(Text.class);
  142.     conf.setMapOutputValueClass(Text.class);
  143.      
  144.     RunningJob r_job = jc.submitJob(conf);
  145.     while (!r_job.isComplete()) {
  146.       Thread.sleep(1000);
  147.     }
  148.      
  149.     if (!r_job.isSuccessful()) {
  150.       fail("Oops! The job broke due to an unexpected error");
  151.     }
  152.   }
  153. }