QueueManager.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.util.ArrayList;
  20. import java.util.HashMap;
  21. import java.util.Set;
  22. import java.util.TreeSet;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.security.UserGroupInformation;
  27. import org.apache.hadoop.security.SecurityUtil.AccessControlList;
  28. /**
  29.  * Class that exposes information about queues maintained by the Hadoop
  30.  * Map/Reduce framework.
  31.  * 
  32.  * The Map/Reduce framework can be configured with one or more queues,
  33.  * depending on the scheduler it is configured with. While some 
  34.  * schedulers work only with one queue, some schedulers support multiple 
  35.  * queues.
  36.  *  
  37.  * Queues can be configured with various properties. Some of these
  38.  * properties are common to all schedulers, and those are handled by this
  39.  * class. Schedulers might also associate several custom properties with 
  40.  * queues. Where such a case exists, the queue name must be used to link 
  41.  * the common properties with the scheduler specific ones.  
  42.  */
  43. class QueueManager {
  44.   
  45.   private static final Log LOG = LogFactory.getLog(QueueManager.class);
  46.   
  47.   // Prefix in configuration for queue related keys
  48.   private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX 
  49.                                                         = "mapred.queue.";
  50.   // Configured queues
  51.   private Set<String> queueNames;
  52.   // Map of a queue and ACL property name with an ACL
  53.   private HashMap<String, AccessControlList> aclsMap;
  54.   // Map of a queue name to any generic object that represents 
  55.   // scheduler information 
  56.   private HashMap<String, Object> schedulerInfoObjects;
  57.   // Whether ACLs are enabled in the system or not.
  58.   private boolean aclsEnabled;
  59.   
  60.   /**
  61.    * Enum representing an operation that can be performed on a queue.
  62.    */
  63.   static enum QueueOperation {
  64.     SUBMIT_JOB ("acl-submit-job", false),
  65.     ADMINISTER_JOBS ("acl-administer-jobs", true);
  66.     // TODO: Add ACL for LIST_JOBS when we have ability to authenticate 
  67.     //       users in UI
  68.     // TODO: Add ACL for CHANGE_ACL when we have an admin tool for 
  69.     //       configuring queues.
  70.     
  71.     private final String aclName;
  72.     private final boolean jobOwnerAllowed;
  73.     
  74.     QueueOperation(String aclName, boolean jobOwnerAllowed) {
  75.       this.aclName = aclName;
  76.       this.jobOwnerAllowed = jobOwnerAllowed;
  77.     }
  78.     final String getAclName() {
  79.       return aclName;
  80.     }
  81.     
  82.     final boolean isJobOwnerAllowed() {
  83.       return jobOwnerAllowed;
  84.     }
  85.   }
  86.   
  87.   /**
  88.    * Construct a new QueueManager using configuration specified in the passed
  89.    * in {@link org.apache.hadoop.conf.Configuration} object.
  90.    * 
  91.    * @param conf Configuration object where queue configuration is specified.
  92.    */
  93.   public QueueManager(Configuration conf) {
  94.     queueNames = new TreeSet<String>();
  95.     aclsMap = new HashMap<String, AccessControlList>();
  96.     schedulerInfoObjects = new HashMap<String, Object>();
  97.     initialize(conf);
  98.   }
  99.   
  100.   /**
  101.    * Return the set of queues configured in the system.
  102.    * 
  103.    * The number of queues configured should be dependent on the Scheduler 
  104.    * configured. Note that some schedulers work with only one queue, whereas
  105.    * others can support multiple queues.
  106.    *  
  107.    * @return Set of queue names.
  108.    */
  109.   public synchronized Set<String> getQueues() {
  110.     return queueNames;
  111.   }
  112.   
  113.   /**
  114.    * Return true if the given {@link QueueManager.QueueOperation} can be 
  115.    * performed by the specified user on the given queue.
  116.    * 
  117.    * An operation is allowed if all users are provided access for this
  118.    * operation, or if either the user or any of the groups specified is
  119.    * provided access.
  120.    * 
  121.    * @param queueName Queue on which the operation needs to be performed. 
  122.    * @param oper The operation to perform
  123.    * @param ugi The user and groups who wish to perform the operation.
  124.    * 
  125.    * @return true if the operation is allowed, false otherwise.
  126.    */
  127.   public synchronized boolean hasAccess(String queueName, QueueOperation oper,
  128.                                 UserGroupInformation ugi) {
  129.     return hasAccess(queueName, null, oper, ugi);
  130.   }
  131.   
  132.   /**
  133.    * Return true if the given {@link QueueManager.QueueOperation} can be 
  134.    * performed by the specified user on the specified job in the given queue.
  135.    * 
  136.    * An operation is allowed either if the owner of the job is the user 
  137.    * performing the task, all users are provided access for this
  138.    * operation, or if either the user or any of the groups specified is
  139.    * provided access.
  140.    * 
  141.    * If the {@link QueueManager.QueueOperation} is not job specific then the 
  142.    * job parameter is ignored.
  143.    * 
  144.    * @param queueName Queue on which the operation needs to be performed.
  145.    * @param job The {@link JobInProgress} on which the operation is being
  146.    *            performed. 
  147.    * @param oper The operation to perform
  148.    * @param ugi The user and groups who wish to perform the operation.
  149.    * 
  150.    * @return true if the operation is allowed, false otherwise.
  151.    */
  152.   public synchronized boolean hasAccess(String queueName, JobInProgress job, 
  153.                                 QueueOperation oper, 
  154.                                 UserGroupInformation ugi) {
  155.     if (!aclsEnabled) {
  156.       return true;
  157.     }
  158.     
  159.     if (LOG.isDebugEnabled()) {
  160.       LOG.debug("checking access for : " + toFullPropertyName(queueName, 
  161.                                             oper.getAclName()));      
  162.     }
  163.     
  164.     if (oper.isJobOwnerAllowed()) {
  165.       if (job.getJobConf().getUser().equals(ugi.getUserName())) {
  166.         return true;
  167.       }
  168.     }
  169.     
  170.     AccessControlList acl = aclsMap.get(toFullPropertyName(queueName, oper.getAclName()));
  171.     if (acl == null) {
  172.       return false;
  173.     }
  174.     
  175.     // Check the ACL list
  176.     boolean allowed = acl.allAllowed();
  177.     if (!allowed) {
  178.       // Check the allowed users list
  179.       if (acl.getUsers().contains(ugi.getUserName())) {
  180.         allowed = true;
  181.       } else {
  182.         // Check the allowed groups list
  183.         Set<String> allowedGroups = acl.getGroups();
  184.         for (String group : ugi.getGroupNames()) {
  185.           if (allowedGroups.contains(group)) {
  186.             allowed = true;
  187.             break;
  188.           }
  189.         }
  190.       }
  191.     }
  192.     
  193.     return allowed;    
  194.   }
  195.   
  196.   /**
  197.    * Set a generic Object that represents scheduling information relevant
  198.    * to a queue.
  199.    * 
  200.    * A string representation of this Object will be used by the framework
  201.    * to display in user facing applications like the JobTracker web UI and
  202.    * the hadoop CLI.
  203.    * 
  204.    * @param queueName queue for which the scheduling information is to be set. 
  205.    * @param queueInfo scheduling information for this queue.
  206.    */
  207.   public synchronized void setSchedulerInfo(String queueName, 
  208.                                               Object queueInfo) {
  209.     schedulerInfoObjects.put(queueName, queueInfo);
  210.   }
  211.   
  212.   /**
  213.    * Return the scheduler information configured for this queue.
  214.    * 
  215.    * @param queueName queue for which the scheduling information is required.
  216.    * @return The scheduling information for this queue.
  217.    * 
  218.    * @see #setSchedulerInfo(String, Object)
  219.    */
  220.   public synchronized Object getSchedulerInfo(String queueName) {
  221.     return schedulerInfoObjects.get(queueName);
  222.   }
  223.   
  224.   /**
  225.    * Refresh information configured for queues in the system by reading
  226.    * it from the passed in {@link org.apache.hadoop.conf.Configuration}.
  227.    *
  228.    * Previously stored information about queues is removed and new
  229.    * information populated from the configuration.
  230.    * 
  231.    * @param conf New configuration for the queues. 
  232.    */
  233.   public synchronized void refresh(Configuration conf) {
  234.     queueNames.clear();
  235.     aclsMap.clear();
  236.     schedulerInfoObjects.clear();
  237.     initialize(conf);
  238.   }
  239.   
  240.   private void initialize(Configuration conf) {
  241.     aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
  242.     String[] queues = conf.getStrings("mapred.queue.names", 
  243.                                   new String[] {JobConf.DEFAULT_QUEUE_NAME});
  244.     addToSet(queueNames, queues);
  245.     
  246.     // for every queue, and every operation, get the ACL
  247.     // if any is specified and store in aclsMap.
  248.     for (String queue : queues) {
  249.       for (QueueOperation oper : QueueOperation.values()) {
  250.         String key = toFullPropertyName(queue, oper.getAclName());
  251.         String aclString = conf.get(key, "*");
  252.         aclsMap.put(key, new AccessControlList(aclString));
  253.       }
  254.     }
  255.   }
  256.   
  257.   private static final String toFullPropertyName(String queue, 
  258.       String property) {
  259.     return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
  260.   }
  261.   
  262.   private static final void addToSet(Set<String> set, String[] elems) {
  263.     for (String elem : elems) {
  264.       set.add(elem);
  265.     }
  266.   }
  267.   
  268.   synchronized JobQueueInfo[] getJobQueueInfos() {
  269.     ArrayList<JobQueueInfo> queueInfoList = new ArrayList<JobQueueInfo>();
  270.     for(String queue : queueNames) {
  271.       Object schedulerInfo = schedulerInfoObjects.get(queue);
  272.       if(schedulerInfo != null) {
  273.         queueInfoList.add(new JobQueueInfo(queue,schedulerInfo.toString()));
  274.       }else {
  275.         queueInfoList.add(new JobQueueInfo(queue,null));
  276.       }
  277.     }
  278.     return (JobQueueInfo[]) queueInfoList.toArray(new JobQueueInfo[queueInfoList
  279.         .size()]);
  280.   }
  281.   JobQueueInfo getJobQueueInfo(String queue) {
  282.     Object schedulingInfo = schedulerInfoObjects.get(queue);
  283.     if(schedulingInfo!=null){
  284.       return new JobQueueInfo(queue,schedulingInfo.toString());
  285.     }else {
  286.       return new JobQueueInfo(queue,null);
  287.     }
  288.   }
  289. }