ClusterMapReduceTestCase.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import junit.framework.TestCase;
  20. import org.apache.hadoop.hdfs.MiniDFSCluster;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.fs.Path;
  23. import java.io.IOException;
  24. import java.util.Map;
  25. import java.util.Properties;
  26. /**
  27.  * Test case to run a MapReduce job.
  28.  * <p/>
  29.  * It runs a 2 node cluster Hadoop with a 2 node DFS.
  30.  * <p/>
  31.  * The JobConf to use must be obtained via the creatJobConf() method.
  32.  * <p/>
  33.  * It creates a temporary directory -accessible via getTestRootDir()-
  34.  * for both input and output.
  35.  * <p/>
  36.  * The input directory is accesible via getInputDir() and the output
  37.  * directory via getOutputDir()
  38.  * <p/>
  39.  * The DFS filesystem is formated before the testcase starts and after it ends.
  40.  */
  41. public abstract class ClusterMapReduceTestCase extends TestCase {
  42.   private MiniDFSCluster dfsCluster = null;
  43.   private MiniMRCluster mrCluster = null;
  44.   /**
  45.    * Creates Hadoop Cluster and DFS before a test case is run.
  46.    *
  47.    * @throws Exception
  48.    */
  49.   protected void setUp() throws Exception {
  50.     super.setUp();
  51.     startCluster(true, null);
  52.   }
  53.   /**
  54.    * Starts the cluster within a testcase.
  55.    * <p/>
  56.    * Note that the cluster is already started when the testcase method
  57.    * is invoked. This method is useful if as part of the testcase the
  58.    * cluster has to be shutdown and restarted again.
  59.    * <p/>
  60.    * If the cluster is already running this method does nothing.
  61.    *
  62.    * @param reformatDFS indicates if DFS has to be reformated
  63.    * @param props configuration properties to inject to the mini cluster
  64.    * @throws Exception if the cluster could not be started
  65.    */
  66.   protected synchronized void startCluster(boolean reformatDFS, Properties props)
  67.           throws Exception {
  68.     if (dfsCluster == null) {
  69.       JobConf conf = new JobConf();
  70.       if (props != null) {
  71.         for (Map.Entry entry : props.entrySet()) {
  72.           conf.set((String) entry.getKey(), (String) entry.getValue());
  73.         }
  74.       }
  75.       dfsCluster = new MiniDFSCluster(conf, 2, reformatDFS, null);
  76.       ConfigurableMiniMRCluster.setConfiguration(props);
  77.       //noinspection deprecation
  78.       mrCluster = new ConfigurableMiniMRCluster(2, getFileSystem().getName(), 1);
  79.     }
  80.   }
  81.   private static class ConfigurableMiniMRCluster extends MiniMRCluster {
  82.     private static Properties config;
  83.     public static void setConfiguration(Properties props) {
  84.       config = props;
  85.     }
  86.     public ConfigurableMiniMRCluster(int numTaskTrackers, String namenode,
  87.                                      int numDir) throws Exception {
  88.       super(numTaskTrackers, namenode, numDir);
  89.     }
  90.     public JobConf createJobConf() {
  91.       JobConf conf = super.createJobConf();
  92.       if (config != null) {
  93.         for (Map.Entry entry : config.entrySet()) {
  94.           conf.set((String) entry.getKey(), (String) entry.getValue());
  95.         }
  96.       }
  97.       return conf;
  98.     }
  99.   }
  100.   /**
  101.    * Stops the cluster within a testcase.
  102.    * <p/>
  103.    * Note that the cluster is already started when the testcase method
  104.    * is invoked. This method is useful if as part of the testcase the
  105.    * cluster has to be shutdown.
  106.    * <p/>
  107.    * If the cluster is already stopped this method does nothing.
  108.    *
  109.    * @throws Exception if the cluster could not be stopped
  110.    */
  111.   protected void stopCluster() throws Exception {
  112.     if (mrCluster != null) {
  113.       mrCluster.shutdown();
  114.       mrCluster = null;
  115.     }
  116.     if (dfsCluster != null) {
  117.       dfsCluster.shutdown();
  118.       dfsCluster = null;
  119.     }
  120.   }
  121.   /**
  122.    * Destroys Hadoop Cluster and DFS after a test case is run.
  123.    *
  124.    * @throws Exception
  125.    */
  126.   protected void tearDown() throws Exception {
  127.     stopCluster();
  128.     super.tearDown();
  129.   }
  130.   /**
  131.    * Returns a preconfigured Filesystem instance for test cases to read and
  132.    * write files to it.
  133.    * <p/>
  134.    * TestCases should use this Filesystem instance.
  135.    *
  136.    * @return the filesystem used by Hadoop.
  137.    * @throws IOException 
  138.    */
  139.   protected FileSystem getFileSystem() throws IOException {
  140.     return dfsCluster.getFileSystem();
  141.   }
  142.   protected MiniMRCluster getMRCluster() {
  143.     return mrCluster;
  144.   }
  145.   /**
  146.    * Returns the path to the root directory for the testcase.
  147.    *
  148.    * @return path to the root directory for the testcase.
  149.    */
  150.   protected Path getTestRootDir() {
  151.     return new Path("x").getParent();
  152.   }
  153.   /**
  154.    * Returns a path to the input directory for the testcase.
  155.    *
  156.    * @return path to the input directory for the tescase.
  157.    */
  158.   protected Path getInputDir() {
  159.     return new Path("input");
  160.   }
  161.   /**
  162.    * Returns a path to the output directory for the testcase.
  163.    *
  164.    * @return path to the output directory for the tescase.
  165.    */
  166.   protected Path getOutputDir() {
  167.     return new Path("output");
  168.   }
  169.   /**
  170.    * Returns a job configuration preconfigured to run against the Hadoop
  171.    * managed by the testcase.
  172.    *
  173.    * @return configuration that works on the testcase Hadoop instance
  174.    */
  175.   protected JobConf createJobConf() {
  176.     return mrCluster.createJobConf();
  177.   }
  178. }