TestTaskFail.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataOutputStream;
  20. import java.io.IOException;
  21. import junit.framework.TestCase;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.hdfs.MiniDFSCluster;
  26. import org.apache.hadoop.io.IntWritable;
  27. import org.apache.hadoop.io.LongWritable;
  28. import org.apache.hadoop.io.Text;
  29. import org.apache.hadoop.mapred.lib.IdentityReducer;
  30. public class TestTaskFail extends TestCase {
  31.   private static String taskLog = "Task attempt log";
  32.   private static String cleanupLog = "cleanup attempt log";
  33.   public static class MapperClass extends MapReduceBase
  34.   implements Mapper<LongWritable, Text, Text, IntWritable> {
  35.     String taskid;
  36.     public void configure(JobConf job) {
  37.       taskid = job.get("mapred.task.id");
  38.     }
  39.     public void map (LongWritable key, Text value, 
  40.                      OutputCollector<Text, IntWritable> output, 
  41.                      Reporter reporter) throws IOException {
  42.       System.err.println(taskLog);
  43.       if (taskid.endsWith("_0")) {
  44.         throw new IOException();
  45.       } else if (taskid.endsWith("_1")) {
  46.         System.exit(-1);
  47.       } 
  48.     }
  49.   }
  50.   static class CommitterWithLogs extends FileOutputCommitter {
  51.     public void abortTask(TaskAttemptContext context) throws IOException {
  52.       System.err.println(cleanupLog);
  53.       super.abortTask(context);
  54.     }
  55.   }
  56.   static class CommitterWithFailTaskCleanup extends FileOutputCommitter {
  57.     public void abortTask(TaskAttemptContext context) throws IOException {
  58.       System.err.println(cleanupLog);
  59.       System.exit(-1);
  60.     }
  61.   }
  62.   static class CommitterWithFailTaskCleanup2 extends FileOutputCommitter {
  63.     public void abortTask(TaskAttemptContext context) throws IOException {
  64.       System.err.println(cleanupLog);
  65.       throw new IOException();
  66.     }
  67.   }
  68.   public RunningJob launchJob(JobConf conf,
  69.                               Path inDir,
  70.                               Path outDir,
  71.                               String input) 
  72.   throws IOException {
  73.     // set up the input file system and write input text.
  74.     FileSystem inFs = inDir.getFileSystem(conf);
  75.     FileSystem outFs = outDir.getFileSystem(conf);
  76.     outFs.delete(outDir, true);
  77.     if (!inFs.mkdirs(inDir)) {
  78.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  79.     }
  80.     {
  81.       // write input into input file
  82.       DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
  83.       file.writeBytes(input);
  84.       file.close();
  85.     }
  86.     // configure the mapred Job
  87.     conf.setMapperClass(MapperClass.class);        
  88.     conf.setReducerClass(IdentityReducer.class);
  89.     FileInputFormat.setInputPaths(conf, inDir);
  90.     FileOutputFormat.setOutputPath(conf, outDir);
  91.     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
  92.                                     "/tmp")).toString().replace(' ', '+');
  93.     conf.set("test.build.data", TEST_ROOT_DIR);
  94.     // return the RunningJob handle.
  95.     return new JobClient(conf).submitJob(conf);
  96.   }
  97.   
  98.   private void validateJob(RunningJob job, MiniMRCluster mr) 
  99.   throws IOException {
  100.     assertEquals(JobStatus.SUCCEEDED, job.getJobState());
  101.     
  102.     JobID jobId = job.getID();
  103.     // construct the task id of first map task
  104.     TaskAttemptID attemptId = 
  105.       new TaskAttemptID(new TaskID(jobId, true, 0), 0);
  106.     TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
  107.                             getTip(attemptId.getTaskID());
  108.     // this should not be cleanup attempt since the first attempt 
  109.     // fails with an exception
  110.     assertTrue(!tip.isCleanupAttempt(attemptId));
  111.     TaskStatus ts = 
  112.       mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
  113.     assertTrue(ts != null);
  114.     assertEquals(TaskStatus.State.FAILED, ts.getRunState());
  115.     // validate task logs: tasklog should contain both task logs
  116.     // and cleanup logs
  117.     String log = TestMiniMRMapRedDebugScript.readTaskLog(
  118.                       TaskLog.LogName.STDERR, attemptId, false);
  119.     assertTrue(log.contains(taskLog));
  120.     assertTrue(log.contains(cleanupLog));
  121.     
  122.     attemptId =  new TaskAttemptID(new TaskID(jobId, true, 0), 1);
  123.     // this should be cleanup attempt since the second attempt fails
  124.     // with System.exit
  125.     assertTrue(tip.isCleanupAttempt(attemptId));
  126.     ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
  127.     assertTrue(ts != null);
  128.     assertEquals(TaskStatus.State.FAILED, ts.getRunState());
  129.     // validate tasklogs for task attempt
  130.     log = TestMiniMRMapRedDebugScript.readTaskLog(
  131.                TaskLog.LogName.STDERR, attemptId, false);
  132.     assertTrue(log.contains(taskLog));
  133.     // validate tasklogs for cleanup attempt
  134.     log = TestMiniMRMapRedDebugScript.readTaskLog(
  135.                TaskLog.LogName.STDERR, attemptId, true);
  136.     assertTrue(log.contains(cleanupLog));
  137.   }
  138.   
  139.   public void testWithDFS() throws IOException {
  140.     MiniDFSCluster dfs = null;
  141.     MiniMRCluster mr = null;
  142.     FileSystem fileSys = null;
  143.     try {
  144.       final int taskTrackers = 4;
  145.       Configuration conf = new Configuration();
  146.       dfs = new MiniDFSCluster(conf, 4, true, null);
  147.       fileSys = dfs.getFileSystem();
  148.       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
  149.       final Path inDir = new Path("./input");
  150.       final Path outDir = new Path("./output");
  151.       String input = "The quick brown foxnhas many sillynred fox soxn";
  152.       // launch job with fail tasks
  153.       JobConf jobConf = mr.createJobConf();
  154.       jobConf.setOutputCommitter(CommitterWithLogs.class);
  155.       RunningJob rJob = launchJob(jobConf, inDir, outDir, input);
  156.       rJob.waitForCompletion();
  157.       validateJob(rJob, mr);
  158.       // launch job with fail tasks and fail-cleanups
  159.       fileSys.delete(outDir, true);
  160.       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup.class);
  161.       rJob = launchJob(jobConf, inDir, outDir, input);
  162.       rJob.waitForCompletion();
  163.       validateJob(rJob, mr);
  164.       fileSys.delete(outDir, true);
  165.       jobConf.setOutputCommitter(CommitterWithFailTaskCleanup2.class);
  166.       rJob = launchJob(jobConf, inDir, outDir, input);
  167.       rJob.waitForCompletion();
  168.       validateJob(rJob, mr);
  169.     } finally {
  170.       if (dfs != null) { dfs.shutdown(); }
  171.       if (mr != null) { mr.shutdown(); }
  172.     }
  173.   }
  174.   public static void main(String[] argv) throws Exception {
  175.     TestTaskFail td = new TestTaskFail();
  176.     td.testWithDFS();
  177.   }
  178. }