ClusterWithCapacityScheduler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.io.OutputStream;
  22. import java.net.MalformedURLException;
  23. import java.net.URL;
  24. import java.util.Enumeration;
  25. import java.util.Properties;
  26. import junit.framework.TestCase;
  27. import org.apache.commons.logging.Log;
  28. import org.apache.commons.logging.LogFactory;
  29. import org.apache.hadoop.conf.Configuration;
  30. import org.apache.hadoop.fs.FileSystem;
  31. import org.apache.hadoop.fs.LocalFileSystem;
  32. import org.apache.hadoop.fs.Path;
  33. import org.apache.hadoop.hdfs.MiniDFSCluster;
  34. /**
  35.  * A test-cluster based on {@link MiniMRCluster} that is started with
  36.  * CapacityTaskScheduler. It provides knobs to configure both the cluster as
  37.  * well as the scheduler. Any test that intends to test capacity-scheduler
  38.  * should extend this.
  39.  * 
  40.  */
  41. public class ClusterWithCapacityScheduler extends TestCase {
  42.   static final Log LOG = LogFactory.getLog(ClusterWithCapacityScheduler.class);
  43.   private MiniMRCluster mrCluster;
  44.   private MiniDFSCluster dfsCluster;
  45.   private JobConf jobConf;
  46.   static final String MY_SCHEDULER_CONF_PATH_PROPERTY = "my.resource.path";
  47.   protected void startCluster()
  48.       throws IOException {
  49.     startCluster(null, null);
  50.   }
  51.   /**
  52.    * Start the cluster with two TaskTrackers and two DataNodes and configure the
  53.    * cluster with clusterProperties and the scheduler with schedulerProperties.
  54.    * Uses default configuration whenever user provided properties are missing
  55.    * (null/empty)
  56.    * 
  57.    * @param clusterProperties
  58.    * @param schedulerProperties
  59.    * @throws IOException
  60.    */
  61.   protected void startCluster(Properties clusterProperties,
  62.       Properties schedulerProperties)
  63.       throws IOException {
  64.     startCluster(2, 2, clusterProperties, schedulerProperties);
  65.   }
  66.   /**
  67.    * Start the cluster with numTaskTrackers TaskTrackers and numDataNodes
  68.    * DataNodes and configure the cluster with clusterProperties and the
  69.    * scheduler with schedulerProperties. Uses default configuration whenever
  70.    * user provided properties are missing (null/empty)
  71.    * 
  72.    * @param numTaskTrackers
  73.    * @param numDataNodes
  74.    * @param clusterProperties
  75.    * @param schedulerProperties
  76.    * @throws IOException
  77.    */
  78.   protected void startCluster(int numTaskTrackers, int numDataNodes,
  79.       Properties clusterProperties, Properties schedulerProperties)
  80.       throws IOException {
  81.     Thread.currentThread().setContextClassLoader(
  82.         new ClusterWithCapacityScheduler.MyClassLoader());
  83.     JobConf clusterConf = new JobConf();
  84.     if (clusterProperties != null) {
  85.       for (Enumeration<?> e = clusterProperties.propertyNames(); e
  86.           .hasMoreElements();) {
  87.         String key = (String) e.nextElement();
  88.         clusterConf.set(key, (String) clusterProperties.get(key));
  89.       }
  90.     }
  91.     dfsCluster = new MiniDFSCluster(clusterConf, numDataNodes, true, null);
  92.     if (schedulerProperties != null) {
  93.       setUpSchedulerConfigFile(schedulerProperties);
  94.     }
  95.     clusterConf.set("mapred.jobtracker.taskScheduler",
  96.         CapacityTaskScheduler.class.getName());
  97.     mrCluster =
  98.         new MiniMRCluster(numTaskTrackers, dfsCluster.getFileSystem().getUri()
  99.             .toString(), 1, null, null, clusterConf);
  100.     this.jobConf = mrCluster.createJobConf(clusterConf);
  101.   }
  102.   private void setUpSchedulerConfigFile(Properties schedulerConfProps)
  103.       throws IOException {
  104.     Configuration config = new Configuration(false);
  105.     LocalFileSystem fs = FileSystem.getLocal(config);
  106.     String myResourcePath = System.getProperty("test.build.data");
  107.     Path schedulerConfigFilePath =
  108.         new Path(myResourcePath, CapacitySchedulerConf.SCHEDULER_CONF_FILE);
  109.     OutputStream out = fs.create(schedulerConfigFilePath);
  110.     for (Enumeration<?> e = schedulerConfProps.propertyNames(); e
  111.         .hasMoreElements();) {
  112.       String key = (String) e.nextElement();
  113.       LOG.debug("Adding " + key + schedulerConfProps.getProperty(key));
  114.       config.set(key, schedulerConfProps.getProperty(key));
  115.     }
  116.     config.writeXml(out);
  117.     out.close();
  118.     LOG.info("setting resource path where capacity-scheduler's config file "
  119.         + "is placed to " + myResourcePath);
  120.     System.setProperty(MY_SCHEDULER_CONF_PATH_PROPERTY, myResourcePath);
  121.   }
  122.   private void cleanUpSchedulerConfigFile() throws IOException {
  123.     Configuration config = new Configuration(false);
  124.     LocalFileSystem fs = FileSystem.getLocal(config);
  125.     String myResourcePath = System.getProperty("test.build.data");
  126.     Path schedulerConfigFilePath =
  127.         new Path(myResourcePath, CapacitySchedulerConf.SCHEDULER_CONF_FILE);
  128.     fs.delete(schedulerConfigFilePath, false);
  129.   }
  130.   protected JobConf getJobConf() {
  131.     return this.jobConf;
  132.   }
  133.   protected JobTracker getJobTracker() {
  134.     return this.mrCluster.getJobTrackerRunner().getJobTracker();
  135.   }
  136.   @Override
  137.   protected void tearDown()
  138.       throws Exception {
  139.     cleanUpSchedulerConfigFile();
  140.     
  141.     if (mrCluster != null) {
  142.       mrCluster.shutdown();
  143.     }
  144.     if (dfsCluster != null) {
  145.       dfsCluster.shutdown();
  146.     }
  147.   }
  148.   /**
  149.    * Wait till all the slots in the cluster are occupied with respect to the
  150.    * tasks of type specified isMap.
  151.    * 
  152.    * <p>
  153.    * 
  154.    * <b>Also, it is assumed that the tasks won't finish any time soon, like in
  155.    * the case of tasks of {@link ControlledMapReduceJob}</b>.
  156.    * 
  157.    * @param isMap
  158.    */
  159.   protected void waitTillAllSlotsAreOccupied(boolean isMap)
  160.       throws InterruptedException {
  161.     JobTracker jt = this.mrCluster.getJobTrackerRunner().getJobTracker();
  162.     ClusterStatus clusterStatus = jt.getClusterStatus();
  163.     int currentTasks =
  164.         (isMap ? clusterStatus.getMapTasks() : clusterStatus.getReduceTasks());
  165.     int maxTasks =
  166.         (isMap ? clusterStatus.getMaxMapTasks() : clusterStatus
  167.             .getMaxReduceTasks());
  168.     while (currentTasks != maxTasks) {
  169.       Thread.sleep(1000);
  170.       clusterStatus = jt.getClusterStatus();
  171.       currentTasks =
  172.           (isMap ? clusterStatus.getMapTasks() : clusterStatus
  173.               .getReduceTasks());
  174.       maxTasks =
  175.           (isMap ? clusterStatus.getMaxMapTasks() : clusterStatus
  176.               .getMaxReduceTasks());
  177.       LOG.info("Waiting till cluster reaches steady state. currentTasks : "
  178.           + currentTasks + " total cluster capacity : " + maxTasks);
  179.     }
  180.   }
  181.   static class MyClassLoader extends ClassLoader {
  182.     @Override
  183.     public URL getResource(String name) {
  184.       if (!name.equals(CapacitySchedulerConf.SCHEDULER_CONF_FILE)) {
  185.         return super.getResource(name);
  186.       }
  187.       return findResource(name);
  188.     }
  189.     @Override
  190.     protected URL findResource(String name) {
  191.       try {
  192.         String resourcePath =
  193.             System
  194.                 .getProperty(ClusterWithCapacityScheduler.MY_SCHEDULER_CONF_PATH_PROPERTY);
  195.         // Check the resourcePath directory
  196.         File file = new File(resourcePath, name);
  197.         if (file.exists()) {
  198.           return new URL("file://" + file.getAbsolutePath());
  199.         }
  200.       } catch (MalformedURLException mue) {
  201.         LOG.warn("exception : " + mue);
  202.       }
  203.       return super.findResource(name);
  204.     }
  205.   }
  206. }