TestMiniMRMapRedDebugScript.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataOutputStream;
  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.net.URI;
  23. import junit.framework.TestCase;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.hdfs.MiniDFSCluster;
  28. import org.apache.hadoop.filecache.DistributedCache;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.io.IntWritable;
  32. import org.apache.hadoop.io.LongWritable;
  33. import org.apache.hadoop.io.Text;
  34. import org.apache.hadoop.mapred.lib.IdentityReducer;
  35. /**
  36.  * Class to test mapred debug Script
  37.  */
  38. public class TestMiniMRMapRedDebugScript extends TestCase {
  39.   private static final Log LOG =
  40.     LogFactory.getLog(TestMiniMRMapRedDebugScript.class.getName());
  41.   private MiniMRCluster mr;
  42.   private MiniDFSCluster dfs;
  43.   private FileSystem fileSys;
  44.   
  45.   /**
  46.    * Fail map class 
  47.    */
  48.   public static class MapClass extends MapReduceBase
  49.   implements Mapper<LongWritable, Text, Text, IntWritable> {
  50.      public void map (LongWritable key, Text value, 
  51.                      OutputCollector<Text, IntWritable> output, 
  52.                      Reporter reporter) throws IOException {
  53.        System.err.println("Bailing out");
  54.        throw new IOException();
  55.      }
  56.   }
  57.   /**
  58.    * Reads tasklog and returns it as string after trimming it.
  59.    * @param filter Task log filter; can be STDOUT, STDERR,
  60.    *                SYSLOG, DEBUGOUT, DEBUGERR
  61.    * @param taskId The task id for which the log has to collected
  62.    * @param isCleanup whether the task is a cleanup attempt or not.
  63.    * @return task log as string
  64.    * @throws IOException
  65.    */
  66.   public static String readTaskLog(TaskLog.LogName  filter, 
  67.                                    TaskAttemptID taskId, 
  68.                                    boolean isCleanup)
  69.   throws IOException {
  70.     // string buffer to store task log
  71.     StringBuffer result = new StringBuffer();
  72.     int res;
  73.     // reads the whole tasklog into inputstream
  74.     InputStream taskLogReader = new TaskLog.Reader(taskId, filter, 0, -1, isCleanup);
  75.     // construct string log from inputstream.
  76.     byte[] b = new byte[65536];
  77.     while (true) {
  78.       res = taskLogReader.read(b);
  79.       if (res > 0) {
  80.         result.append(new String(b));
  81.       } else {
  82.         break;
  83.       }
  84.     }
  85.     taskLogReader.close();
  86.     
  87.     // trim the string and return it
  88.     String str = result.toString();
  89.     str = str.trim();
  90.     return str;
  91.   }
  92.   /**
  93.    * Launches failed map task and debugs the failed task
  94.    * @param conf configuration for the mapred job
  95.    * @param inDir input path
  96.    * @param outDir output path
  97.    * @param debugDir debug directory where script is present
  98.    * @param debugCommand The command to execute script
  99.    * @param input Input text
  100.    * @return the output of debug script 
  101.    * @throws IOException
  102.    */
  103.   public String launchFailMapAndDebug(JobConf conf,
  104.                                       Path inDir,
  105.                                       Path outDir,
  106.                                       Path debugDir,
  107.                                       String debugScript,
  108.                                       String input)
  109.   throws IOException {
  110.     // set up the input file system and write input text.
  111.     FileSystem inFs = inDir.getFileSystem(conf);
  112.     FileSystem outFs = outDir.getFileSystem(conf);
  113.     outFs.delete(outDir, true);
  114.     if (!inFs.mkdirs(inDir)) {
  115.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  116.     }
  117.     {
  118.       // write input into input file
  119.       DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
  120.       file.writeBytes(input);
  121.       file.close();
  122.     }
  123.     // configure the mapred Job for failing map task.
  124.     conf.setJobName("failmap");
  125.     conf.setMapperClass(MapClass.class);        
  126.     conf.setReducerClass(IdentityReducer.class);
  127.     conf.setNumMapTasks(1);
  128.     conf.setNumReduceTasks(0);
  129.     conf.setMapDebugScript(debugScript);
  130.     FileInputFormat.setInputPaths(conf, inDir);
  131.     FileOutputFormat.setOutputPath(conf, outDir);
  132.     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
  133.                                       "/tmp")).toString().replace(' ', '+');
  134.     conf.set("test.build.data", TEST_ROOT_DIR);
  135.     // copy debug script to cache from local file system.
  136.     FileSystem debugFs = debugDir.getFileSystem(conf);
  137.     Path scriptPath = new Path(debugDir,"testscript.txt");
  138.     Path cachePath = new Path("/cacheDir");
  139.     if (!debugFs.mkdirs(cachePath)) {
  140.       throw new IOException("Mkdirs failed to create " + cachePath.toString());
  141.     }
  142.     debugFs.copyFromLocalFile(scriptPath,cachePath);
  143.     
  144.     URI uri = debugFs.getUri().resolve(cachePath+"/testscript.txt#testscript");
  145.     DistributedCache.createSymlink(conf);
  146.     DistributedCache.addCacheFile(uri, conf);
  147.     RunningJob job =null;
  148.     // run the job. It will fail with IOException.
  149.     try {
  150.       job = new JobClient(conf).submitJob(conf);
  151.     } catch (IOException e) {
  152.      LOG.info("Running Job failed", e);
  153.     }
  154.     JobID jobId = job.getID();
  155.     // construct the task id of first map task of failmap
  156.     TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId,true, 0), 0);
  157.     // wait for the job to finish.
  158.     while (!job.isComplete()) ;
  159.     
  160.     // return the output of debugout log.
  161.     return readTaskLog(TaskLog.LogName.DEBUGOUT,taskId, false);
  162.   }
  163.   /**
  164.    * Tests Map task's debug script
  165.    * 
  166.    * In this test, we launch a mapreduce program which 
  167.    * writes 'Bailing out' to stderr and throws an exception.
  168.    * We will run the script when tsk fails and validate 
  169.    * the output of debug out log. 
  170.    *
  171.    */
  172.   public void testMapDebugScript() throws Exception {
  173.     try {
  174.       
  175.       // create configuration, dfs, file system and mapred cluster 
  176.       Configuration cnf = new Configuration();
  177.       dfs = new MiniDFSCluster(cnf, 1, true, null);
  178.       fileSys = dfs.getFileSystem();
  179.       mr = new MiniMRCluster(2, fileSys.getName(), 1);
  180.       JobConf conf = mr.createJobConf();
  181.       
  182.       // intialize input, output and debug directories
  183.       final Path debugDir = new Path("build/test/debug");
  184.       Path inDir = new Path("testing/wc/input");
  185.       Path outDir = new Path("testing/wc/output");
  186.       
  187.       // initialize debug command and input text
  188.       String debugScript = "./testscript";
  189.       String input = "The input";
  190.       
  191.       // Launch failed map task and run debug script
  192.       String result = launchFailMapAndDebug(conf,inDir, 
  193.                                outDir,debugDir, debugScript, input);
  194.       
  195.       // Assert the output of debug script.
  196.       assertEquals("Test ScriptnBailing out", result);
  197.     } finally {  
  198.       // close file system and shut down dfs and mapred cluster
  199.       try {
  200.         if (fileSys != null) {
  201.           fileSys.close();
  202.         }
  203.         if (dfs != null) {
  204.           dfs.shutdown();
  205.         }
  206.         if (mr != null) {
  207.           mr.shutdown();
  208.         }
  209.       } catch (IOException ioe) {
  210.         LOG.info("IO exception in closing file system:"+ioe.getMessage(), ioe);
  211.       }
  212.     }
  213.   }
  214.   public static void main(String args[]){
  215.     TestMiniMRMapRedDebugScript tmds = new TestMiniMRMapRedDebugScript();
  216.     try {
  217.       tmds.testMapDebugScript();
  218.     } catch (Exception e) {
  219.       LOG.error("Exception in test: "+e.getMessage(), e);
  220.     }
  221.   }
  222.   
  223. }