TestUlimit.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.io.*;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.hdfs.MiniDFSCluster;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.mapred.MiniMRCluster;
  25. import org.apache.hadoop.mapred.TestMiniMRWithDFS;
  26. import junit.framework.TestCase;
  27. /**
  28.  * This tests the setting of memory limit for streaming processes.
  29.  * This will launch a streaming app which will allocate 10MB memory.
  30.  * First, program is launched with sufficient memory. And test expects
  31.  * it to succeed. Then program is launched with insufficient memory and 
  32.  * is expected to be a failure.  
  33.  */
  34. public class TestUlimit extends TestCase {
  35.   String input = "the dummy input";
  36.   Path inputPath = new Path("/testing/in");
  37.   Path outputPath = new Path("/testing/out");
  38.   String map = null;
  39.   MiniDFSCluster dfs = null;
  40.   MiniMRCluster mr = null;
  41.   FileSystem fs = null;
  42.   private static String SET_MEMORY_LIMIT = "786432"; // 768MB
  43.   String[] genArgs(String memLimit) {
  44.     return new String[] {
  45.       "-input", inputPath.toString(),
  46.       "-output", outputPath.toString(),
  47.       "-mapper", map,
  48.       "-reducer", "org.apache.hadoop.mapred.lib.IdentityReducer",
  49.       "-numReduceTasks", "0",
  50.       "-jobconf", "mapred.map.tasks=1",
  51.       "-jobconf", "mapred.child.ulimit=" + memLimit,
  52.       "-jobconf", "mapred.job.tracker=" + "localhost:" +
  53.                                            mr.getJobTrackerPort(),
  54.       "-jobconf", "fs.default.name=" + "hdfs://localhost:" 
  55.                    + dfs.getNameNodePort(),
  56.       "-jobconf", "stream.tmpdir=" + 
  57.                    System.getProperty("test.build.data","/tmp")
  58.     };
  59.   }
  60.   /**
  61.    * This tests the setting of memory limit for streaming processes.
  62.    * This will launch a streaming app which will allocate 10MB memory.
  63.    * First, program is launched with sufficient memory. And test expects
  64.    * it to succeed. Then program is launched with insufficient memory and 
  65.    * is expected to be a failure.  
  66.    */
  67.   public void testCommandLine() {
  68.     if (StreamUtil.isCygwin()) {
  69.       return;
  70.     }
  71.     try {
  72.       final int numSlaves = 2;
  73.       Configuration conf = new Configuration();
  74.       dfs = new MiniDFSCluster(conf, numSlaves, true, null);
  75.       fs = dfs.getFileSystem();
  76.       
  77.       mr = new MiniMRCluster(numSlaves, fs.getUri().toString(), 1);
  78.       writeInputFile(fs, inputPath);
  79.       map = StreamUtil.makeJavaCommand(UlimitApp.class, new String[]{});  
  80.       runProgram(SET_MEMORY_LIMIT);
  81.       fs.delete(outputPath, true);
  82.       assertFalse("output not cleaned up", fs.exists(outputPath));
  83.       mr.waitUntilIdle();
  84.     } catch(IOException e) {
  85.       fail(e.toString());
  86.     } finally {
  87.       mr.shutdown();
  88.       dfs.shutdown();
  89.     }
  90.   }
  91.   private void writeInputFile(FileSystem fs, Path dir) throws IOException {
  92.     DataOutputStream out = fs.create(new Path(dir, "part0"));
  93.     out.writeBytes(input);
  94.     out.close();
  95.   }
  96.   /**
  97.    * Runs the streaming program. and asserts the result of the program.
  98.    * @param memLimit memory limit to set for mapred child.
  99.    * @param result Expected result
  100.    * @throws IOException
  101.    */
  102.   private void runProgram(String memLimit) throws IOException {
  103.     boolean mayExit = false;
  104.     StreamJob job = new StreamJob(genArgs(memLimit), mayExit);
  105.     job.go();
  106.     String output = TestMiniMRWithDFS.readOutput(outputPath,
  107.                                         mr.createJobConf());
  108.     assertEquals("output is wrong", SET_MEMORY_LIMIT,
  109.                                     output.trim());
  110.   }
  111.   
  112.   public static void main(String[]args) throws Exception
  113.   {
  114.     new TestUlimit().testCommandLine();
  115.   }
  116. }