TestMultithreadedMapRunner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import org.apache.hadoop.fs.FileSystem;
  20. import org.apache.hadoop.fs.Path;
  21. import org.apache.hadoop.io.LongWritable;
  22. import org.apache.hadoop.io.Text;
  23. import org.apache.hadoop.mapred.*;
  24. import java.io.DataOutputStream;
  25. import java.io.IOException;
  26. import java.util.Iterator;
  27. public class TestMultithreadedMapRunner extends HadoopTestCase {
  28.   public TestMultithreadedMapRunner() throws IOException {
  29.     super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
  30.   }
  31.   public void testOKRun() throws Exception {
  32.     run(false, false);
  33.   }
  34.   public void testIOExRun() throws Exception {
  35.     run(true, false);
  36.   }
  37.   public void testRuntimeExRun() throws Exception {
  38.     run(false, true);
  39.   }
  40.   private void run(boolean ioEx, boolean rtEx) throws Exception {
  41.     Path inDir = new Path("testing/mt/input");
  42.     Path outDir = new Path("testing/mt/output");
  43.     // Hack for local FS that does not have the concept of a 'mounting point'
  44.     if (isLocalFS()) {
  45.       String localPathRoot = System.getProperty("test.build.data", "/tmp")
  46.               .replace(' ', '+');
  47.       inDir = new Path(localPathRoot, inDir);
  48.       outDir = new Path(localPathRoot, outDir);
  49.     }
  50.     JobConf conf = createJobConf();
  51.     FileSystem fs = FileSystem.get(conf);
  52.     fs.delete(outDir, true);
  53.     if (!fs.mkdirs(inDir)) {
  54.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  55.     }
  56.     {
  57.       DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  58.       file.writeBytes("anbnncndne");
  59.       file.close();
  60.     }
  61.     conf.setJobName("mt");
  62.     conf.setInputFormat(TextInputFormat.class);
  63.     conf.setOutputKeyClass(LongWritable.class);
  64.     conf.setOutputValueClass(Text.class);
  65.     conf.setMapOutputKeyClass(LongWritable.class);
  66.     conf.setMapOutputValueClass(Text.class);
  67.     conf.setOutputFormat(TextOutputFormat.class);
  68.     conf.setOutputKeyClass(LongWritable.class);
  69.     conf.setOutputValueClass(Text.class);
  70.     conf.setMapperClass(IDMap.class);
  71.     conf.setReducerClass(IDReduce.class);
  72.     FileInputFormat.setInputPaths(conf, inDir);
  73.     FileOutputFormat.setOutputPath(conf, outDir);
  74.     conf.setMapRunnerClass(MultithreadedMapRunner.class);
  75.     
  76.     conf.setInt("mapred.map.multithreadedrunner.threads", 2);
  77.     if (ioEx) {
  78.       conf.setBoolean("multithreaded.ioException", true);
  79.     }
  80.     if (rtEx) {
  81.       conf.setBoolean("multithreaded.runtimeException", true);
  82.     }
  83.     JobClient jc = new JobClient(conf);
  84.     RunningJob job =jc.submitJob(conf);
  85.     while (!job.isComplete()) {
  86.       Thread.sleep(100);
  87.     }
  88.     if (job.isSuccessful()) {
  89.       assertFalse(ioEx || rtEx);
  90.     }
  91.     else {
  92.       assertTrue(ioEx || rtEx);
  93.     }
  94.   }
  95.   public static class IDMap implements Mapper<LongWritable, Text,
  96.                                               LongWritable, Text> {
  97.     private boolean ioEx = false;
  98.     private boolean rtEx = false;
  99.     public void configure(JobConf job) {
  100.       ioEx = job.getBoolean("multithreaded.ioException", false);
  101.       rtEx = job.getBoolean("multithreaded.runtimeException", false);
  102.     }
  103.     public void map(LongWritable key, Text value,
  104.                     OutputCollector<LongWritable, Text> output,
  105.                     Reporter reporter)
  106.             throws IOException {
  107.       if (ioEx) {
  108.         throw new IOException();
  109.       }
  110.       if (rtEx) {
  111.         throw new RuntimeException();
  112.       }
  113.       output.collect(key, value);
  114.       try {
  115.         Thread.sleep(100);
  116.       } catch (InterruptedException ex) {
  117.         throw new RuntimeException(ex);
  118.       }
  119.     }
  120.     public void close() throws IOException {
  121.     }
  122.   }
  123.   public static class IDReduce implements Reducer<LongWritable, Text,
  124.                                                   LongWritable, Text> {
  125.     public void configure(JobConf job) {
  126.     }
  127.     public void reduce(LongWritable key, Iterator<Text> values,
  128.                        OutputCollector<LongWritable, Text> output,
  129.                        Reporter reporter)
  130.             throws IOException {
  131.       while (values.hasNext()) {
  132.         output.collect(key, values.next());
  133.       }
  134.     }
  135.     public void close() throws IOException {
  136.     }
  137.   }
  138. }