TestKillCompletedJob.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.*;
  20. import java.net.*;
  21. import junit.framework.TestCase;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.hdfs.MiniDFSCluster;
  24. import org.apache.hadoop.fs.FileSystem;
  25. import org.apache.hadoop.fs.Path;
  26. import org.apache.hadoop.io.IntWritable;
  27. import org.apache.hadoop.io.Text;
  28. /**
  29.  * A JUnit test to test that killing completed jobs does not move them
  30.  * to the failed sate - See JIRA HADOOP-2132
  31.  */
  32. public class TestKillCompletedJob extends TestCase {
  33.   
  34.   
  35.   static Boolean launchWordCount(String fileSys,
  36.                                 String jobTracker,
  37.                                 JobConf conf,
  38.                                 String input,
  39.                                 int numMaps,
  40.                                 int numReduces) throws IOException {
  41.     final Path inDir = new Path("/testing/wc/input");
  42.     final Path outDir = new Path("/testing/wc/output");
  43.     FileSystem fs = FileSystem.get(URI.create(fileSys), conf);
  44.     fs.delete(outDir, true);
  45.     if (!fs.mkdirs(inDir)) {
  46.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  47.     }
  48.     {
  49.       DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  50.       file.writeBytes(input);
  51.       file.close();
  52.     }
  53.     FileSystem.setDefaultUri(conf, fileSys);
  54.     conf.set("mapred.job.tracker", jobTracker);
  55.     conf.setJobName("wordcount");
  56.     conf.setInputFormat(TextInputFormat.class);
  57.     
  58.     // the keys are words (strings)
  59.     conf.setOutputKeyClass(Text.class);
  60.     // the values are counts (ints)
  61.     conf.setOutputValueClass(IntWritable.class);
  62.     
  63.     conf.setMapperClass(WordCount.MapClass.class);
  64.     conf.setCombinerClass(WordCount.Reduce.class);
  65.     conf.setReducerClass(WordCount.Reduce.class);
  66.     
  67.     FileInputFormat.setInputPaths(conf, inDir);
  68.     FileOutputFormat.setOutputPath(conf, outDir);
  69.     conf.setNumMapTasks(numMaps);
  70.     conf.setNumReduceTasks(numReduces);
  71.     RunningJob rj = JobClient.runJob(conf);
  72.     JobID jobId = rj.getID();
  73.     
  74.     // Kill the job after it is successful
  75.     if (rj.isSuccessful())
  76.     {
  77.       System.out.println("Job Id:" + jobId + 
  78.         " completed successfully. Killing it now");
  79.       rj.killJob();
  80.     }
  81.     
  82.        
  83.     return rj.isSuccessful();
  84.       
  85.   }
  86.      
  87.   public void testKillCompJob() throws IOException {
  88.     String namenode = null;
  89.     MiniDFSCluster dfs = null;
  90.     MiniMRCluster mr = null;
  91.     FileSystem fileSys = null;
  92.     try {
  93.       final int taskTrackers = 1;
  94.       Configuration conf = new Configuration();
  95.       dfs = new MiniDFSCluster(conf, 1, true, null);
  96.       fileSys = dfs.getFileSystem();
  97.       namenode = fileSys.getUri().toString();
  98.       mr = new MiniMRCluster(taskTrackers, namenode, 3);
  99.       JobConf jobConf = new JobConf();
  100.     
  101.       Boolean result;
  102.       final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
  103.       result = launchWordCount(namenode, jobTrackerName, jobConf, 
  104.                                "Small textn",
  105.                                1, 0);
  106.       assertTrue(result);
  107.           
  108.     } finally {
  109.       if (dfs != null) { dfs.shutdown(); }
  110.       if (mr != null) { mr.shutdown();
  111.       }
  112.     }
  113.   }
  114.   
  115. }