TestJobClient.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.BufferedReader;
  20. import java.io.ByteArrayOutputStream;
  21. import java.io.IOException;
  22. import java.io.InputStreamReader;
  23. import java.io.OutputStream;
  24. import java.io.OutputStreamWriter;
  25. import java.io.PipedInputStream;
  26. import java.io.PipedOutputStream;
  27. import java.io.PrintStream;
  28. import java.io.Writer;
  29. import org.apache.commons.logging.Log;
  30. import org.apache.commons.logging.LogFactory;
  31. import org.apache.hadoop.conf.Configuration;
  32. import org.apache.hadoop.fs.Path;
  33. import org.apache.hadoop.io.LongWritable;
  34. import org.apache.hadoop.io.Text;
  35. import org.apache.hadoop.util.Tool;
  36. import org.apache.hadoop.util.ToolRunner;
  37. public class TestJobClient extends ClusterMapReduceTestCase {
  38.   
  39.   private static final Log LOG = LogFactory.getLog(TestJobClient.class);
  40.   
  41.   private String runJob() throws Exception {
  42.     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
  43.     Writer wr = new OutputStreamWriter(os);
  44.     wr.write("hello1n");
  45.     wr.write("hello2n");
  46.     wr.write("hello3n");
  47.     wr.close();
  48.     JobConf conf = createJobConf();
  49.     conf.setJobName("mr");
  50.     conf.setJobPriority(JobPriority.HIGH);
  51.     
  52.     conf.setInputFormat(TextInputFormat.class);
  53.     conf.setMapOutputKeyClass(LongWritable.class);
  54.     conf.setMapOutputValueClass(Text.class);
  55.     conf.setOutputFormat(TextOutputFormat.class);
  56.     conf.setOutputKeyClass(LongWritable.class);
  57.     conf.setOutputValueClass(Text.class);
  58.     conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
  59.     conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
  60.     FileInputFormat.setInputPaths(conf, getInputDir());
  61.     FileOutputFormat.setOutputPath(conf, getOutputDir());
  62.     return JobClient.runJob(conf).getID().toString();
  63.   }
  64.   
  65.   private int runTool(Configuration conf, Tool tool, String[] args, OutputStream out) throws Exception {
  66.     PrintStream oldOut = System.out;
  67.     PrintStream newOut = new PrintStream(out, true);
  68.     try {
  69.       System.setOut(newOut);
  70.       return ToolRunner.run(conf, tool, args);
  71.     } finally {
  72.       System.setOut(oldOut);
  73.     }
  74.   }
  75.   public void testGetCounter() throws Exception {
  76.     String jobId = runJob();
  77.     ByteArrayOutputStream out = new ByteArrayOutputStream();
  78.     int exitCode = runTool(createJobConf(), new JobClient(),
  79.         new String[] { "-counter", jobId,
  80.         "org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS" },
  81.         out);
  82.     assertEquals("Exit code", 0, exitCode);
  83.     assertEquals("Counter", "3", out.toString().trim());
  84.   }
  85.   public void testJobList() throws Exception {
  86.     String jobId = runJob();
  87.     verifyJobPriority(jobId, "HIGH");
  88.   }
  89.   private void verifyJobPriority(String jobId, String priority)
  90.                             throws Exception {
  91.     PipedInputStream pis = new PipedInputStream();
  92.     PipedOutputStream pos = new PipedOutputStream(pis);
  93.     int exitCode = runTool(createJobConf(), new JobClient(),
  94.         new String[] { "-list", "all" },
  95.         pos);
  96.     assertEquals("Exit code", 0, exitCode);
  97.     BufferedReader br = new BufferedReader(new InputStreamReader(pis));
  98.     String line = null;
  99.     while ((line=br.readLine()) != null) {
  100.       LOG.info("line = " + line);
  101.       if (!line.startsWith(jobId)) {
  102.         continue;
  103.       }
  104.       assertTrue(line.contains(priority));
  105.       break;
  106.     }
  107.     pis.close();
  108.   }
  109.   
  110.   public void testChangingJobPriority() throws Exception {
  111.     String jobId = runJob();
  112.     int exitCode = runTool(createJobConf(), new JobClient(),
  113.         new String[] { "-set-priority", jobId, "VERY_LOW" },
  114.         new ByteArrayOutputStream());
  115.     assertEquals("Exit code", 0, exitCode);
  116.     verifyJobPriority(jobId, "VERY_LOW");
  117.   }
  118. }