TestEmptyJobWithDFS.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import junit.framework.TestCase;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.hdfs.MiniDFSCluster;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.io.IntWritable;
  28. import org.apache.hadoop.io.Text;
  29. import org.apache.hadoop.mapred.lib.IdentityMapper;
  30. import org.apache.hadoop.mapred.lib.IdentityReducer;
  31. /**
  32.  * A JUnit test to test Map-Reduce empty jobs Mini-DFS.
  33.  */
  34. public class TestEmptyJobWithDFS extends TestCase {
  35.   private static final Log LOG =
  36.     LogFactory.getLog(TestEmptyJobWithDFS.class.getName());
  37.   
  38.   /**
  39.    * Simple method running a MapReduce job with no input data. Used
  40.    * to test that such a job is successful.
  41.    * @param fileSys
  42.    * @param jobTracker
  43.    * @param conf
  44.    * @param numMaps
  45.    * @param numReduces
  46.    * @return true if the MR job is successful, otherwise false
  47.    * @throws IOException
  48.    */
  49.   public static boolean launchEmptyJob(String fileSys,
  50.                                        String jobTracker,
  51.                                        JobConf conf,
  52.                                        int numMaps,
  53.                                        int numReduces) throws IOException {
  54.     // create an empty input dir
  55.     final Path inDir = new Path("/testing/empty/input");
  56.     final Path outDir = new Path("/testing/empty/output");
  57.     FileSystem fs = FileSystem.getNamed(fileSys, conf);
  58.     fs.delete(outDir, true);
  59.     if (!fs.mkdirs(inDir)) {
  60.       LOG.warn("Can't create " + inDir);
  61.       return false;
  62.     }
  63.     // use WordCount example
  64.     FileSystem.setDefaultUri(conf, fileSys);
  65.     conf.set("mapred.job.tracker", jobTracker);
  66.     conf.setJobName("empty");
  67.     // use an InputFormat which returns no split
  68.     conf.setInputFormat(EmptyInputFormat.class);
  69.     conf.setOutputKeyClass(Text.class);
  70.     conf.setOutputValueClass(IntWritable.class);
  71.     conf.setMapperClass(IdentityMapper.class);        
  72.     conf.setReducerClass(IdentityReducer.class);
  73.     FileInputFormat.setInputPaths(conf, inDir);
  74.     FileOutputFormat.setOutputPath(conf, outDir);
  75.     conf.setNumMapTasks(numMaps);
  76.     conf.setNumReduceTasks(numReduces);
  77.       
  78.     // run job and wait for completion
  79.     JobClient jc = new JobClient(conf);
  80.     RunningJob runningJob = jc.submitJob(conf);
  81.     while (true) {
  82.       try {
  83.         Thread.sleep(1000);
  84.       } catch (InterruptedException e) {}
  85.       if (runningJob.isComplete()) {
  86.         break;
  87.       }
  88.     }
  89.       
  90.     try {
  91.       assertTrue(runningJob.isComplete());
  92.       assertTrue(runningJob.isSuccessful());
  93.     } catch (NullPointerException npe) {
  94.       // This NPE should no more happens
  95.       fail("A NPE should not have happened.");
  96.     }
  97.           
  98.     // return job result
  99.     LOG.info("job is complete: " + runningJob.isSuccessful());
  100.     return (runningJob.isSuccessful());
  101.   }
  102.   
  103.   /**
  104.    * Test that a job with no input data (and thus with no input split and
  105.    * no map task to execute) is successful.
  106.    * @throws IOException
  107.    */
  108.   public void testEmptyJobWithDFS() throws IOException {
  109.     String namenode = null;
  110.     MiniDFSCluster dfs = null;
  111.     MiniMRCluster mr = null;
  112.     FileSystem fileSys = null;
  113.     try {
  114.       final int taskTrackers = 4;
  115.       final int jobTrackerPort = 60050;
  116.       Configuration conf = new Configuration();
  117.       dfs = new MiniDFSCluster(conf, 1, true, null);
  118.       fileSys = dfs.getFileSystem();
  119.       namenode = fileSys.getName();
  120.       mr = new MiniMRCluster(taskTrackers, namenode, 2);
  121.       final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
  122.       JobConf jobConf = new JobConf();
  123.       boolean result;
  124.       result = launchEmptyJob(namenode, jobTrackerName, jobConf, 
  125.                               3, 1);
  126.       assertTrue(result);
  127.           
  128.     } finally {
  129.       if (fileSys != null) { fileSys.close(); }
  130.       if (dfs != null) { dfs.shutdown(); }
  131.       if (mr != null) { mr.shutdown(); }
  132.     }
  133.   }
  134.   
  135. }