TestStreamingStderr.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import junit.framework.TestCase;
  20. import java.io.*;
  21. import java.util.*;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. /**
  26.  * Test that streaming consumes stderr from the streaming process
  27.  * (before, during, and after the main processing of mapred input),
  28.  * and that stderr messages count as task progress.
  29.  */
  30. public class TestStreamingStderr extends TestCase
  31. {
  32.   public TestStreamingStderr() throws IOException {
  33.     UtilTest utilTest = new UtilTest(getClass().getName());
  34.     utilTest.checkUserDir();
  35.     utilTest.redirectIfAntJunit();
  36.   }
  37.   protected String[] genArgs(File input, File output, int preLines, int duringLines, int postLines) {
  38.     return new String[] {
  39.       "-input", input.getAbsolutePath(),
  40.       "-output", output.getAbsolutePath(),
  41.       "-mapper", StreamUtil.makeJavaCommand(StderrApp.class,
  42.                                             new String[]{Integer.toString(preLines),
  43.                                                          Integer.toString(duringLines),
  44.                                                          Integer.toString(postLines)}),
  45.       "-reducer", StreamJob.REDUCE_NONE,
  46.       "-jobconf", "keep.failed.task.files=true",
  47.       "-jobconf", "mapred.task.timeout=5000",
  48.       "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
  49.     };
  50.   }
  51.   protected File setupInput(String base, boolean hasInput) throws IOException {
  52.     File input = new File(base + "-input.txt");
  53.     UtilTest.recursiveDelete(input);
  54.     FileOutputStream in = new FileOutputStream(input.getAbsoluteFile());
  55.     if (hasInput) {
  56.       in.write("hellon".getBytes());      
  57.     }
  58.     in.close();
  59.     return input;
  60.   }
  61.   
  62.   protected File setupOutput(String base) throws IOException {
  63.     File output = new File(base + "-out");
  64.     UtilTest.recursiveDelete(output);
  65.     return output;
  66.   }
  67.   public void runStreamJob(String baseName, boolean hasInput,
  68.                            int preLines, int duringLines, int postLines) {
  69.     try {
  70.       File input = setupInput(baseName, hasInput);
  71.       File output = setupOutput(baseName);
  72.       boolean mayExit = false;
  73.       int returnStatus = 0;
  74.       StreamJob job = new StreamJob(genArgs(input, output, preLines, duringLines, postLines), mayExit);
  75.       returnStatus = job.go();
  76.       assertEquals("StreamJob success", 0, returnStatus);
  77.     } catch (Exception e) {
  78.       failTrace(e);
  79.     }
  80.   }
  81.   // This test will fail by blocking forever if the stderr isn't
  82.   // consumed by Hadoop for tasks that don't have any input.
  83.   public void testStderrNoInput() throws IOException {
  84.     runStreamJob("stderr-pre", false, 10000, 0, 0);
  85.   }
  86.   // Streaming should continue to read stderr even after all input has
  87.   // been consumed.
  88.   public void testStderrAfterOutput() throws IOException {
  89.     runStreamJob("stderr-post", false, 0, 0, 10000);
  90.   }
  91.   // This test should produce a task timeout if stderr lines aren't
  92.   // counted as progress. This won't actually work until
  93.   // LocalJobRunner supports timeouts.
  94.   public void testStderrCountsAsProgress() throws IOException {
  95.     runStreamJob("stderr-progress", true, 10, 1000, 0);
  96.   }
  97.   
  98.   protected void failTrace(Exception e) {
  99.     StringWriter sw = new StringWriter();
  100.     e.printStackTrace(new PrintWriter(sw));
  101.     fail(sw.toString());
  102.   }
  103. }