CapacitySchedulerConf.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:16k
源码类别:

网格计算

开发平台:

Java

  1. /** Licensed to the Apache Software Foundation (ASF) under one
  2.  * or more contributor license agreements.  See the NOTICE file
  3.  * distributed with this work for additional information
  4.  * regarding copyright ownership.  The ASF licenses this file
  5.  * to you under the Apache License, Version 2.0 (the
  6.  * "License"); you may not use this file except in compliance
  7.  * with the License.  You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.apache.hadoop.mapred;
  18. import org.apache.hadoop.conf.Configuration;
  19. import org.apache.hadoop.fs.Path;
  20. /**
  21.  * Class providing access to resource manager configuration.
  22.  * 
  23.  * Resource manager configuration involves setting up queues, and defining
  24.  * various properties for the queues. These are typically read from a file 
  25.  * called capacity-scheduler.xml that must be in the classpath of the
  26.  * application. The class provides APIs to get/set and reload the 
  27.  * configuration for the queues.
  28.  */
  29. class CapacitySchedulerConf {
  30.   
  31.   /** Default file name from which the resource manager configuration is read. */ 
  32.   public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
  33.   
  34.   private int defaultReclaimTime;
  35.   
  36.   private int defaultUlimitMinimum;
  37.   
  38.   private boolean defaultSupportPriority;
  39.   
  40.   private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = 
  41.     "mapred.capacity-scheduler.queue.";
  42.   /**
  43.    * If {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} is set to
  44.    * {@link JobConf#DISABLED_MEMORY_LIMIT}, this configuration will be used to
  45.    * calculate job's physical memory requirements as a percentage of the job's
  46.    * virtual memory requirements set via
  47.    * {@link JobConf#setMaxVirtualMemoryForTask()}. This property thus provides
  48.    * default value of physical memory for job's that don't explicitly specify
  49.    * physical memory requirements.
  50.    * 
  51.    * It defaults to {@link JobConf#DISABLED_MEMORY_LIMIT} and if not explicitly
  52.    * set to a valid value, scheduler will not consider physical memory for
  53.    * scheduling even if virtual memory based scheduling is enabled.
  54.    */
  55.   static String DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY =
  56.       "mapred.capacity-scheduler.task.default-pmem-percentage-in-vmem";
  57.   /**
  58.    * Configuration that provides an upper limit on the maximum physical memory
  59.    * that can be specified by a job. The job configuration
  60.    * {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} should,
  61.    * by definition, be less than this value. If not, the job will be rejected
  62.    * by the scheduler. If it is set to {@link JobConf#DISABLED_MEMORY_LIMIT},
  63.    * scheduler will not consider physical memory for scheduling even if virtual
  64.    * memory based scheduling is enabled.
  65.    */
  66.   static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
  67.       "mapred.capacity-scheduler.task.limit.maxpmem";
  68.   /**
  69.    * The constant which defines the default initialization thread
  70.    * polling interval, denoted in milliseconds.
  71.    */
  72.   private static final int INITIALIZATION_THREAD_POLLING_INTERVAL = 5000;
  73.   /**
  74.    * The constant which defines the maximum number of worker threads to be
  75.    * spawned off for job initialization
  76.    */
  77.   private static final int MAX_INITIALIZATION_WORKER_THREADS = 5;
  78.   private Configuration rmConf;
  79.   private int defaultMaxJobsPerUsersToInitialize;
  80.   
  81.   /**
  82.    * Create a new ResourceManagerConf.
  83.    * This method reads from the default configuration file mentioned in
  84.    * {@link RM_CONF_FILE}, that must be present in the classpath of the
  85.    * application.
  86.    */
  87.   public CapacitySchedulerConf() {
  88.     rmConf = new Configuration(false);
  89.     rmConf.addResource(SCHEDULER_CONF_FILE);
  90.     initializeDefaults();
  91.   }
  92.   /**
  93.    * Create a new ResourceManagerConf reading the specified configuration
  94.    * file.
  95.    * 
  96.    * @param configFile {@link Path} to the configuration file containing
  97.    * the resource manager configuration.
  98.    */
  99.   public CapacitySchedulerConf(Path configFile) {
  100.     rmConf = new Configuration(false);
  101.     rmConf.addResource(configFile);
  102.     initializeDefaults();
  103.   }
  104.   
  105.   /*
  106.    * Method used to initialize the default values and the queue list
  107.    * which is used by the Capacity Scheduler.
  108.    */
  109.   private void initializeDefaults() {
  110.     defaultReclaimTime = rmConf.getInt(
  111.         "mapred.capacity-scheduler.default-reclaim-time-limit",300);
  112.     defaultUlimitMinimum = rmConf.getInt(
  113.         "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
  114.     defaultSupportPriority = rmConf.getBoolean(
  115.         "mapred.capacity-scheduler.default-supports-priority", false);
  116.     defaultMaxJobsPerUsersToInitialize = rmConf.getInt(
  117.         "mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user",
  118.         2);
  119.   }
  120.   
  121.   /**
  122.    * Get the guaranteed percentage of the cluster for the specified queue.
  123.    * 
  124.    * This method defaults to configured default Guaranteed Capacity if
  125.    * no value is specified in the configuration for this queue. 
  126.    * If the configured capacity is negative value or greater than 100 an
  127.    * {@link IllegalArgumentException} is thrown.
  128.    * 
  129.    * If default Guaranteed capacity is not configured for a queue, then
  130.    * system allocates capacity based on what is free at the time of 
  131.    * capacity scheduler start
  132.    * 
  133.    * 
  134.    * @param queue name of the queue
  135.    * @return guaranteed percent of the cluster for the queue.
  136.    */
  137.   public float getGuaranteedCapacity(String queue) {
  138.     //Check done in order to return default GC which can be negative
  139.     //In case of both GC and default GC not configured.
  140.     //Last check is if the configuration is specified and is marked as
  141.     //negative we throw exception
  142.     String raw = rmConf.getRaw(toFullPropertyName(queue, 
  143.         "guaranteed-capacity"));
  144.     if(raw == null) {
  145.       return -1;
  146.     }
  147.     float result = rmConf.getFloat(toFullPropertyName(queue, 
  148.                                    "guaranteed-capacity"), 
  149.                                    -1);
  150.     if (result < 0.0 || result > 100.0) {
  151.       throw new IllegalArgumentException("Illegal capacity for queue " + queue +
  152.                                          " of " + result);
  153.     }
  154.     return result;
  155.   }
  156.   
  157.   /**
  158.    * Sets the Guaranteed capacity of the given queue.
  159.    * 
  160.    * @param queue name of the queue
  161.    * @param gc guaranteed percent of the cluster for the queue.
  162.    */
  163.   public void setGuaranteedCapacity(String queue,float gc) {
  164.     rmConf.setFloat(toFullPropertyName(queue, "guaranteed-capacity"),gc);
  165.   }
  166.   
  167.   
  168.   /**
  169.    * Get the amount of time before which redistributed resources must be
  170.    * reclaimed for the specified queue.
  171.    * 
  172.    * The resource manager distributes spare capacity from a free queue
  173.    * to ones which are in need for more resources. However, if a job 
  174.    * submitted to the first queue requires back the resources, they must
  175.    * be reclaimed within the specified configuration time limit.
  176.    * 
  177.    * This method defaults to configured default reclaim time limit if
  178.    * no value is specified in the configuration for this queue.
  179.    * 
  180.    * Throws an {@link IllegalArgumentException} when invalid value is 
  181.    * configured.
  182.    * 
  183.    * @param queue name of the queue
  184.    * @return reclaim time limit for this queue.
  185.    */
  186.   public int getReclaimTimeLimit(String queue) {
  187.     int reclaimTimeLimit = rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"), 
  188.         defaultReclaimTime);
  189.     if(reclaimTimeLimit <= 0) {
  190.       throw new IllegalArgumentException("Invalid reclaim time limit : " 
  191.           + reclaimTimeLimit + " for queue : " + queue);
  192.     }
  193.     return reclaimTimeLimit;
  194.   }
  195.   
  196.   /**
  197.    * Set the amount of time before which redistributed resources must be
  198.    * reclaimed for the specified queue.
  199.    * @param queue Name of the queue
  200.    * @param value Amount of time before which the redistributed resources
  201.    * must be retained.
  202.    */
  203.   public void setReclaimTimeLimit(String queue, int value) {
  204.     rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
  205.   }
  206.   
  207.   /**
  208.    * Get whether priority is supported for this queue.
  209.    * 
  210.    * If this value is false, then job priorities will be ignored in 
  211.    * scheduling decisions. This method defaults to <code>false</code> if 
  212.    * the property is not configured for this queue. 
  213.    * @param queue name of the queue
  214.    * @return Whether this queue supports priority or not.
  215.    */
  216.   public boolean isPrioritySupported(String queue) {
  217.     return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
  218.         defaultSupportPriority);  
  219.   }
  220.   
  221.   /**
  222.    * Set whether priority is supported for this queue.
  223.    * 
  224.    * 
  225.    * @param queue name of the queue
  226.    * @param value true, if the queue must support priorities, false otherwise.
  227.    */
  228.   public void setPrioritySupported(String queue, boolean value) {
  229.     rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
  230.   }
  231.   
  232.   /**
  233.    * Get the minimum limit of resources for any user submitting jobs in 
  234.    * this queue, in percentage.
  235.    * 
  236.    * This method defaults to default user limit configured if
  237.    * no value is specified in the configuration for this queue.
  238.    * 
  239.    * Throws an {@link IllegalArgumentException} when invalid value is 
  240.    * configured.
  241.    * 
  242.    * @param queue name of the queue
  243.    * @return minimum limit of resources, in percentage, that will be 
  244.    * available for a user.
  245.    * 
  246.    */
  247.   public int getMinimumUserLimitPercent(String queue) {
  248.     int userLimit = rmConf.getInt(toFullPropertyName(queue,
  249.         "minimum-user-limit-percent"), defaultUlimitMinimum);
  250.     if(userLimit <= 0 || userLimit > 100) {
  251.       throw new IllegalArgumentException("Invalid user limit : "
  252.           + userLimit + " for queue : " + queue);
  253.     }
  254.     return userLimit;
  255.   }
  256.   
  257.   /**
  258.    * Set the minimum limit of resources for any user submitting jobs in
  259.    * this queue, in percentage.
  260.    * 
  261.    * @param queue name of the queue
  262.    * @param value minimum limit of resources for any user submitting jobs
  263.    * in this queue
  264.    */
  265.   public void setMinimumUserLimitPercent(String queue, int value) {
  266.     rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"), 
  267.                     value);
  268.   }
  269.   
  270.   /**
  271.    * Reload configuration by clearing the information read from the 
  272.    * underlying configuration file.
  273.    */
  274.   public synchronized void reloadConfiguration() {
  275.     rmConf.reloadConfiguration();
  276.     initializeDefaults();
  277.   }
  278.   
  279.   static final String toFullPropertyName(String queue, 
  280.                                                   String property) {
  281.       return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
  282.   }
  283.   /**
  284.    * Gets the maximum number of jobs which are allowed to initialize in the
  285.    * job queue.
  286.    * 
  287.    * @param queue queue name.
  288.    * @return maximum number of jobs allowed to be initialized per user.
  289.    * @throws IllegalArgumentException if maximum number of users is negative
  290.    * or zero.
  291.    */
  292.   public int getMaxJobsPerUserToInitialize(String queue) {
  293.     int maxJobsPerUser = rmConf.getInt(toFullPropertyName(queue,
  294.         "maximum-initialized-jobs-per-user"), 
  295.         defaultMaxJobsPerUsersToInitialize);
  296.     if(maxJobsPerUser <= 0) {
  297.       throw new IllegalArgumentException(
  298.           "Invalid maximum jobs per user configuration " + maxJobsPerUser);
  299.     }
  300.     return maxJobsPerUser;
  301.   }
  302.   
  303.   /**
  304.    * Sets the maximum number of jobs which are allowed to be initialized 
  305.    * for a user in the queue.
  306.    * 
  307.    * @param queue queue name.
  308.    * @param value maximum number of jobs allowed to be initialized per user.
  309.    */
  310.   public void setMaxJobsPerUserToInitialize(String queue, int value) {
  311.     rmConf.setInt(toFullPropertyName(queue, 
  312.         "maximum-initialized-jobs-per-user"), value);
  313.   }
  314.   /**
  315.    * Amount of time in milliseconds which poller thread and initialization
  316.    * thread would sleep before looking at the queued jobs.
  317.    * 
  318.    * The default value if no corresponding configuration is present is
  319.    * 5000 Milliseconds.
  320.    *  
  321.    * @return time in milliseconds.
  322.    * @throws IllegalArgumentException if time is negative or zero.
  323.    */
  324.   public long getSleepInterval() {
  325.     long sleepInterval = rmConf.getLong(
  326.         "mapred.capacity-scheduler.init-poll-interval", 
  327.         INITIALIZATION_THREAD_POLLING_INTERVAL);
  328.     
  329.     if(sleepInterval <= 0) {
  330.       throw new IllegalArgumentException(
  331.           "Invalid initializater poller interval " + sleepInterval);
  332.     }
  333.     
  334.     return sleepInterval;
  335.   }
  336.   /**
  337.    * Gets maximum number of threads which are spawned to initialize jobs
  338.    * in job queue in  parallel. The number of threads should be always less than
  339.    * or equal to number of job queues present.
  340.    * 
  341.    * If number of threads is configured to be more than job queues present,
  342.    * then number of job queues is used as number of threads used for initializing
  343.    * jobs.
  344.    * 
  345.    * So a given thread can have responsibility of initializing jobs from more 
  346.    * than one queue.
  347.    * 
  348.    * The default value is 5
  349.    * 
  350.    * @return maximum number of threads spawned to initialize jobs in job queue
  351.    * in parallel.
  352.    */
  353.   public int getMaxWorkerThreads() {
  354.     int maxWorkerThreads = rmConf.getInt(
  355.         "mapred.capacity-scheduler.init-worker-threads", 
  356.         MAX_INITIALIZATION_WORKER_THREADS);
  357.     if(maxWorkerThreads <= 0) {
  358.       throw new IllegalArgumentException(
  359.           "Invalid initializater worker thread number " + maxWorkerThreads);
  360.     }
  361.     return maxWorkerThreads;
  362.   }
  363.   /**
  364.    * Set the sleep interval which initialization poller would sleep before 
  365.    * it looks at the jobs in the job queue.
  366.    * 
  367.    * @param interval sleep interval
  368.    */
  369.   public void setSleepInterval(long interval) {
  370.     rmConf.setLong(
  371.         "mapred.capacity-scheduler.init-poll-interval", interval);
  372.   }
  373.   
  374.   /**
  375.    * Sets number of threads which can be spawned to initialize jobs in
  376.    * parallel.
  377.    * 
  378.    * @param poolSize number of threads to be spawned to initialize jobs
  379.    * in parallel.
  380.    */
  381.   public void setMaxWorkerThreads(int poolSize) {
  382.     rmConf.setInt(
  383.         "mapred.capacity-scheduler.init-worker-threads", poolSize);
  384.   }
  385.   /**
  386.    * Get the upper limit on the maximum physical memory that can be specified by
  387.    * a job.
  388.    * 
  389.    * @return upper limit for max pmem for tasks.
  390.    */
  391.   public long getLimitMaxPmemForTasks() {
  392.     return rmConf.getLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY,
  393.         JobConf.DISABLED_MEMORY_LIMIT);
  394.   }
  395.   /**
  396.    * Get the upper limit on the maximum physical memory that can be specified by
  397.    * a job.
  398.    * 
  399.    * @param value
  400.    */
  401.   public void setLimitMaxPmemForTasks(long value) {
  402.     rmConf.setLong(UPPER_LIMIT_ON_TASK_PMEM_PROPERTY, value);
  403.   }
  404.   /**
  405.    * Get cluster-wide default percentage of pmem in vmem.
  406.    * 
  407.    * @return cluster-wide default percentage of pmem in vmem.
  408.    */
  409.   public float getDefaultPercentOfPmemInVmem() {
  410.     return rmConf.getFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY,
  411.         JobConf.DISABLED_MEMORY_LIMIT);
  412.   }
  413.   /**
  414.    * Set cluster-wide default percentage of pmem in vmem.
  415.    * 
  416.    * @param value
  417.    */
  418.   public void setDefaultPercentOfPmemInVmem(float value) {
  419.     rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
  420.   }
  421.   
  422.   /**
  423.    * Gets the reclaim capacity thread interval.
  424.    * 
  425.    * @return reclaim capacity interval
  426.    */
  427.   public long getReclaimCapacityInterval() {
  428.     long reclaimCapacityInterval = 
  429.       rmConf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
  430.     
  431.     if(reclaimCapacityInterval <= 0) {
  432.       throw new IllegalArgumentException("Invalid reclaim capacity " +
  433.        "interval, should be greater than zero");
  434.     }
  435.     return reclaimCapacityInterval;
  436.   }
  437.   /**
  438.    * Sets the reclaim capacity thread interval.
  439.    * 
  440.    * @param value
  441.    */
  442.   public void setReclaimCapacityInterval(long value) {
  443.     rmConf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval", 
  444.         value);
  445.   }
  446. }