TestStreamDataProtocol.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import junit.framework.TestCase;
  20. import java.io.*;
  21. import java.util.*;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
  26. /**
  27.  * This class tests hadoopStreaming in MapReduce local mode.
  28.  */
  29. public class TestStreamDataProtocol extends TestCase
  30. {
  31.   // "map" command: grep -E (red|green|blue)
  32.   // reduce command: uniq
  33.   protected File INPUT_FILE = new File("input_for_data_protocol_test.txt");
  34.   protected File OUTPUT_DIR = new File("out_for_data_protocol_test");
  35.   protected String input = "roses.smell.goodnroses.look.goodnroses.need.carenroses.attract.beesnroses.are.rednroses.are.not.bluenbunnies.are.pinknbunnies.run.fastnbunnies.have.short.tailnbunnies.have.long.earsn";
  36.   // map behaves like "/usr/bin/cat"; 
  37.   protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{".", "."});
  38.   // reduce counts the number of values for each key
  39.   protected String reduce = "org.apache.hadoop.streaming.ValueCountReduce";
  40.   protected String outputExpect = "bunnies.aret1nbunnies.havet2nbunnies.runt1nroses.aret2nroses.attractt1nroses.lookt1nroses.needt1nroses.smellt1n";
  41.   private StreamJob job;
  42.   public TestStreamDataProtocol() throws IOException
  43.   {
  44.     UtilTest utilTest = new UtilTest(getClass().getName());
  45.     utilTest.checkUserDir();
  46.     utilTest.redirectIfAntJunit();
  47.   }
  48.   protected void createInput() throws IOException
  49.   {
  50.     DataOutputStream out = new DataOutputStream(
  51.                                                 new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
  52.     out.write(input.getBytes("UTF-8"));
  53.     out.close();
  54.   }
  55.   protected String[] genArgs() {
  56.     return new String[] {
  57.       "-input", INPUT_FILE.getAbsolutePath(),
  58.       "-output", OUTPUT_DIR.getAbsolutePath(),
  59.       "-mapper", map,
  60.       "-reducer", reduce,
  61.       "-partitioner", KeyFieldBasedPartitioner.class.getCanonicalName(),
  62.       //"-verbose",
  63.       "-jobconf", "stream.map.output.field.separator=.",
  64.       "-jobconf", "stream.num.map.output.key.fields=2",
  65.       "-jobconf", "map.output.key.field.separator=.",
  66.       "-jobconf", "num.key.fields.for.partition=1",
  67.       "-jobconf", "mapred.reduce.tasks=2",
  68.       "-jobconf", "keep.failed.task.files=true",
  69.       "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
  70.     };
  71.   }
  72.   
  73.   public void testCommandLine()
  74.   {
  75.     try {
  76.       try {
  77.         OUTPUT_DIR.getAbsoluteFile().delete();
  78.       } catch (Exception e) {
  79.       }
  80.       createInput();
  81.       boolean mayExit = false;
  82.       // During tests, the default Configuration will use a local mapred
  83.       // So don't specify -config or -cluster
  84.       job = new StreamJob(genArgs(), mayExit);      
  85.       job.go();
  86.       File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
  87.       String output = StreamUtil.slurp(outFile);
  88.       outFile.delete();
  89.       System.err.println("outEx1=" + outputExpect);
  90.       System.err.println("  out1=" + output);
  91.       System.err.println("  equals=" + outputExpect.compareTo(output));
  92.       assertEquals(outputExpect, output);
  93.     } catch(Exception e) {
  94.       failTrace(e);
  95.     } finally {
  96.       File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
  97.       INPUT_FILE.delete();
  98.       outFileCRC.delete();
  99.       OUTPUT_DIR.getAbsoluteFile().delete();
  100.     }
  101.   }
  102.   private void failTrace(Exception e)
  103.   {
  104.     StringWriter sw = new StringWriter();
  105.     e.printStackTrace(new PrintWriter(sw));
  106.     fail(sw.toString());
  107.   }
  108.   public static void main(String[]args) throws Exception
  109.   {
  110.     new TestStreamDataProtocol().testCommandLine();
  111.   }
  112. }