CapacityTaskScheduler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:59k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.Collections;
  23. import java.util.Comparator;
  24. import java.util.HashMap;
  25. import java.util.HashSet;
  26. import java.util.Iterator;
  27. import java.util.LinkedList;
  28. import java.util.List;
  29. import java.util.Map;
  30. import java.util.Set;
  31. import org.apache.commons.logging.Log;
  32. import org.apache.commons.logging.LogFactory;
  33. import org.apache.hadoop.conf.Configuration;
  34. import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
  35. import org.apache.hadoop.util.StringUtils;
  36. /**
  37.  * A {@link TaskScheduler} that implements the requirements in HADOOP-3421
  38.  * and provides a HOD-less way to share large clusters. This scheduler 
  39.  * provides the following features: 
  40.  *  * support for queues, where a job is submitted to a queue. 
  41.  *  * Queues are guaranteed a fraction of the capacity of the grid (their 
  42.  *  'guaranteed capacity') in the sense that a certain capacity of resources 
  43.  *  will be at their disposal. All jobs submitted to the queues of an Org 
  44.  *  will have access to the capacity guaranteed to the Org.
  45.  *  * Free resources can be allocated to any queue beyond its guaranteed 
  46.  *  capacity. These excess allocated resources can be reclaimed and made 
  47.  *  available to another queue in order to meet its capacity guarantee.
  48.  *  * The scheduler guarantees that excess resources taken from a queue will 
  49.  *  be restored to it within N minutes of its need for them.
  50.  *  * Queues optionally support job priorities (disabled by default). 
  51.  *  * Within a queue, jobs with higher priority will have access to the 
  52.  *  queue's resources before jobs with lower priority. However, once a job 
  53.  *  is running, it will not be preempted for a higher priority job.
  54.  *  * In order to prevent one or more users from monopolizing its resources, 
  55.  *  each queue enforces a limit on the percentage of resources allocated to a 
  56.  *  user at any given time, if there is competition for them.
  57.  *  
  58.  */
  59. class CapacityTaskScheduler extends TaskScheduler {
  60.   
  61.   /** 
  62.    * For keeping track of reclaimed capacity. 
  63.    * Whenever slots need to be reclaimed, we create one of these objects. 
  64.    * As the queue gets slots, the amount to reclaim gets decremented. if 
  65.    * we haven't reclaimed enough within a certain time, we need to kill 
  66.    * tasks. This object 'expires' either if all resources are reclaimed
  67.    * before the deadline, or the deadline passes . 
  68.    */
  69.   private static class ReclaimedResource {
  70.     // how much resource to reclaim
  71.     public int originalAmount;
  72.     // how much is to be reclaimed currently
  73.     public int currentAmount;
  74.     // the time, in millisecs, when this object expires. 
  75.     // This time is equal to the time when the object was created, plus
  76.     // the reclaim-time SLA for the queue.  
  77.     public long whenToExpire;
  78.     // we also keep track of when to kill tasks, in millisecs. This is a 
  79.     // fraction of 'whenToExpire', but we store it here so we don't 
  80.     // recompute it every time. 
  81.     public long whenToKill;
  82.     
  83.     public ReclaimedResource(int amount, long expiryTime, 
  84.         long whenToKill) {
  85.       this.originalAmount = amount;
  86.       this.currentAmount = amount;
  87.       this.whenToExpire = expiryTime;
  88.       this.whenToKill = whenToKill;
  89.     }
  90.   }
  91.   /***********************************************************************
  92.    * Keeping track of scheduling information for queues
  93.    * 
  94.    * We need to maintain scheduling information relevant to a queue (its 
  95.    * name, guaranteed capacity, etc), along with information specific to 
  96.    * each kind of task, Map or Reduce (num of running tasks, pending 
  97.    * tasks etc). 
  98.    * 
  99.    * This scheduling information is used to decide how to allocate
  100.    * tasks, redistribute capacity, etc.
  101.    *  
  102.    * A QueueSchedulingInfo(QSI) object represents scheduling information for
  103.    * a queue. A TaskSchedulingInfo (TSI) object represents scheduling 
  104.    * information for a particular kind of task (Map or Reduce).
  105.    *   
  106.    **********************************************************************/
  107.   private static class TaskSchedulingInfo {
  108.     /** 
  109.      * the actual gc, which depends on how many slots are available
  110.      * in the cluster at any given time. 
  111.      */
  112.     int guaranteedCapacity = 0;
  113.     // number of running tasks
  114.     int numRunningTasks = 0;
  115.     // number of pending tasks
  116.     int numPendingTasks = 0;
  117.     /** for each user, we need to keep track of number of running tasks */
  118.     Map<String, Integer> numRunningTasksByUser = 
  119.       new HashMap<String, Integer>();
  120.     
  121.     /**
  122.      * We need to keep track of resources to reclaim. 
  123.      * Whenever a queue is under capacity and has tasks pending, we offer it 
  124.      * an SLA that gives it free slots equal to or greater than the gap in 
  125.      * its capacity, within a period of time (reclaimTime). 
  126.      * To do this, we periodically check if queues need to reclaim capacity. 
  127.      * If they do, we create a ResourceReclaim object. We also periodically
  128.      * check if a queue has received enough free slots within, say, 80% of 
  129.      * its reclaimTime. If not, we kill enough tasks to make up the 
  130.      * difference. 
  131.      * We keep two queues of ResourceReclaim objects. when an object is 
  132.      * created, it is placed in one queue. Once we kill tasks to recover 
  133.      * resources for that object, it is placed in an expiry queue. we need
  134.      * to do this to prevent creating spurious ResourceReclaim objects. We 
  135.      * keep a count of total resources that are being reclaimed. This count 
  136.      * is decremented when an object expires. 
  137.      */
  138.     
  139.     /**
  140.      * the list of resources to reclaim. This list is always sorted so that
  141.      * resources that need to be reclaimed sooner occur earlier in the list.
  142.      */
  143.     LinkedList<ReclaimedResource> reclaimList = 
  144.       new LinkedList<ReclaimedResource>();
  145.     /**
  146.      * the list of resources to expire. This list is always sorted so that
  147.      * resources that need to be expired sooner occur earlier in the list.
  148.      */
  149.     LinkedList<ReclaimedResource> reclaimExpireList = 
  150.       new LinkedList<ReclaimedResource>();
  151.     /** 
  152.      * sum of all resources that are being reclaimed. 
  153.      * We keep this to prevent unnecessary ReclaimResource objects from being
  154.      * created.  
  155.      */
  156.     int numReclaimedResources = 0;
  157.     
  158.     /**
  159.      * reset the variables associated with tasks
  160.      */
  161.     void resetTaskVars() {
  162.       numRunningTasks = 0;
  163.       numPendingTasks = 0;
  164.       for (String s: numRunningTasksByUser.keySet()) {
  165.         numRunningTasksByUser.put(s, 0);
  166.       }
  167.     }
  168.     /**
  169.      * return information about the tasks
  170.      */
  171.     public String toString(){
  172.       float runningTasksAsPercent = guaranteedCapacity!= 0 ? 
  173.           ((float)numRunningTasks * 100/guaranteedCapacity):0;
  174.       StringBuffer sb = new StringBuffer();
  175.       sb.append("Guaranteed Capacity: " + guaranteedCapacity + "n");
  176.       sb.append(String.format("Running tasks: %.1f%% of Guaranteed Capacityn",
  177.           runningTasksAsPercent));
  178.       // include info on active users
  179.       if (numRunningTasks != 0) {
  180.         sb.append("Active users:n");
  181.         for (Map.Entry<String, Integer> entry: numRunningTasksByUser.entrySet()) {
  182.           if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
  183.             // user has no tasks running
  184.             continue;
  185.           }
  186.           sb.append("User '" + entry.getKey()+ "': ");
  187.           float p = (float)entry.getValue().intValue()*100/numRunningTasks;
  188.           sb.append(String.format("%.1f%% of running tasksn", p));
  189.         }
  190.       }
  191.       return sb.toString();
  192.     }
  193.   }
  194.   
  195.   private static class QueueSchedulingInfo {
  196.     String queueName;
  197.     /** guaranteed capacity(%) is set in the config */ 
  198.     float guaranteedCapacityPercent = 0;
  199.     
  200.     /** 
  201.      * to handle user limits, we need to know how many users have jobs in 
  202.      * the queue.
  203.      */  
  204.     Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
  205.       
  206.     /** min value of user limit (same for all users) */
  207.     int ulMin;
  208.     
  209.     /**
  210.      * reclaim time limit (in msec). This time represents the SLA we offer 
  211.      * a queue - a queue gets back any lost capacity withing this period 
  212.      * of time.  
  213.      */ 
  214.     long reclaimTime;
  215.     
  216.     /**
  217.      * We keep track of the JobQueuesManager only for reporting purposes 
  218.      * (in toString()). 
  219.      */
  220.     private JobQueuesManager jobQueuesManager;
  221.     
  222.     /**
  223.      * We keep a TaskSchedulingInfo object for each kind of task we support
  224.      */
  225.     TaskSchedulingInfo mapTSI;
  226.     TaskSchedulingInfo reduceTSI;
  227.     
  228.     public QueueSchedulingInfo(String queueName, float gcPercent, 
  229.         int ulMin, long reclaimTime, JobQueuesManager jobQueuesManager) {
  230.       this.queueName = new String(queueName);
  231.       this.guaranteedCapacityPercent = gcPercent;
  232.       this.ulMin = ulMin;
  233.       this.reclaimTime = reclaimTime;
  234.       this.jobQueuesManager = jobQueuesManager;
  235.       this.mapTSI = new TaskSchedulingInfo();
  236.       this.reduceTSI = new TaskSchedulingInfo();
  237.     }
  238.     
  239.     /**
  240.      * return information about the queue
  241.      */
  242.     public String toString(){
  243.       // We print out the queue information first, followed by info
  244.       // on map and reduce tasks and job info
  245.       StringBuffer sb = new StringBuffer();
  246.       sb.append("Queue configurationn");
  247.       //sb.append("Name: " + queueName + "n");
  248.       sb.append("Guaranteed Capacity Percentage: ");
  249.       sb.append(guaranteedCapacityPercent);
  250.       sb.append("%n");
  251.       sb.append(String.format("User Limit: %d%sn",ulMin, "%"));
  252.       sb.append(String.format("Reclaim Time limit: %sn", 
  253.           StringUtils.formatTime(reclaimTime)));
  254.       sb.append(String.format("Priority Supported: %sn",
  255.           (jobQueuesManager.doesQueueSupportPriorities(queueName))?
  256.               "YES":"NO"));
  257.       sb.append("-------------n");
  258.       
  259.       sb.append("Map tasksn");
  260.       sb.append(mapTSI.toString());
  261.       sb.append("-------------n");
  262.       sb.append("Reduce tasksn");
  263.       sb.append(reduceTSI.toString());
  264.       sb.append("-------------n");
  265.       
  266.       sb.append("Job infon");
  267.       sb.append(String.format("Number of Waiting Jobs: %dn", 
  268.           jobQueuesManager.getWaitingJobCount(queueName)));
  269.       sb.append(String.format("Number of users who have submitted jobs: %dn", 
  270.           numJobsByUser.size()));
  271.       return sb.toString();
  272.     }
  273.   }
  274.   /** quick way to get qsi object given a queue name */
  275.   private Map<String, QueueSchedulingInfo> queueInfoMap = 
  276.     new HashMap<String, QueueSchedulingInfo>();
  277.   
  278.   /**
  279.    * This class captures scheduling information we want to display or log.
  280.    */
  281.   private static class SchedulingDisplayInfo {
  282.     private String queueName;
  283.     CapacityTaskScheduler scheduler;
  284.     
  285.     SchedulingDisplayInfo(String queueName, CapacityTaskScheduler scheduler) { 
  286.       this.queueName = queueName;
  287.       this.scheduler = scheduler;
  288.     }
  289.     
  290.     @Override
  291.     public String toString(){
  292.       // note that we do not call updateQSIObjects() here for performance
  293.       // reasons. This means that the data we print out may be slightly
  294.       // stale. This data is updated whenever assignTasks() is called, or
  295.       // whenever the reclaim capacity thread runs, which should be fairly
  296.       // often. If neither of these happen, the data gets stale. If we see
  297.       // this often, we may need to detect this situation and call 
  298.       // updateQSIObjects(), or just call it each time. 
  299.       return scheduler.getDisplayInfo(queueName);
  300.     }
  301.   }
  302.   // this class encapsulates the result of a task lookup
  303.   private static class TaskLookupResult {
  304.     static enum LookUpStatus {
  305.       TASK_FOUND,
  306.       NO_TASK_FOUND,
  307.       TASK_FAILING_MEMORY_REQUIREMENT,
  308.     }
  309.     // constant TaskLookupResult objects. Should not be accessed directly.
  310.     private static final TaskLookupResult NoTaskLookupResult = 
  311.       new TaskLookupResult(null, TaskLookupResult.LookUpStatus.NO_TASK_FOUND);
  312.     private static final TaskLookupResult MemFailedLookupResult = 
  313.       new TaskLookupResult(null, 
  314.           TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT);
  315.     private LookUpStatus lookUpStatus;
  316.     private Task task;
  317.     // should not call this constructor directly. use static factory methods.
  318.     private TaskLookupResult(Task t, LookUpStatus lUStatus) {
  319.       this.task = t;
  320.       this.lookUpStatus = lUStatus;
  321.     }
  322.     
  323.     static TaskLookupResult getTaskFoundResult(Task t) {
  324.       return new TaskLookupResult(t, LookUpStatus.TASK_FOUND);
  325.     }
  326.     static TaskLookupResult getNoTaskFoundResult() {
  327.       return NoTaskLookupResult;
  328.     }
  329.     static TaskLookupResult getMemFailedResult() {
  330.       return MemFailedLookupResult;
  331.     }
  332.     
  333.     Task getTask() {
  334.       return task;
  335.     }
  336.     LookUpStatus getLookUpStatus() {
  337.       return lookUpStatus;
  338.     }
  339.   }
  340.   /** 
  341.    * This class handles the scheduling algorithms. 
  342.    * The algos are the same for both Map and Reduce tasks. 
  343.    * There may be slight variations later, in which case we can make this
  344.    * an abstract base class and have derived classes for Map and Reduce.  
  345.    */
  346.   private static abstract class TaskSchedulingMgr {
  347.     /** our TaskScheduler object */
  348.     protected CapacityTaskScheduler scheduler;
  349.     // can be replaced with a global type, if we have one
  350.     protected static enum TYPE {
  351.       MAP, REDUCE
  352.     }
  353.     protected TYPE type = null;
  354.     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
  355.         JobInProgress job) throws IOException; 
  356.     abstract int getPendingTasks(JobInProgress job);
  357.     abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
  358.     abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
  359.     /**
  360.      * List of QSIs for assigning tasks.
  361.      * This list is ordered such that queues that need to reclaim capacity
  362.      * sooner, come before queues that don't. For queues that don't, they're
  363.      * ordered by a ratio of (# of running tasks)/Guaranteed capacity, which
  364.      * indicates how much 'free space' the queue has, or how much it is over
  365.      * capacity. This ordered list is iterated over, when assigning tasks.
  366.      */  
  367.     private List<QueueSchedulingInfo> qsiForAssigningTasks = 
  368.       new ArrayList<QueueSchedulingInfo>();  
  369.     /** 
  370.      * Comparator to sort queues.
  371.      * For maps, we need to sort on QueueSchedulingInfo.mapTSI. For 
  372.      * reducers, we use reduceTSI. So we'll need separate comparators.  
  373.      */ 
  374.     private static abstract class QueueComparator 
  375.       implements Comparator<QueueSchedulingInfo> {
  376.       abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
  377.       public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
  378.         TaskSchedulingInfo t1 = getTSI(q1);
  379.         TaskSchedulingInfo t2 = getTSI(q2);
  380.         // if one queue needs to reclaim something and the other one doesn't, 
  381.         // the former is first
  382.         if ((0 == t1.reclaimList.size()) && (0 != t2.reclaimList.size())) {
  383.           return 1;
  384.         }
  385.         else if ((0 != t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
  386.           return -1;
  387.         }
  388.         else if ((0 == t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
  389.           // neither needs to reclaim. 
  390.           // look at how much capacity they've filled. Treat a queue with gc=0 
  391.           // equivalent to a queue running at capacity
  392.           double r1 = (0 == t1.guaranteedCapacity)? 1.0f: 
  393.             (double)t1.numRunningTasks/(double)t1.guaranteedCapacity;
  394.           double r2 = (0 == t2.guaranteedCapacity)? 1.0f:
  395.             (double)t2.numRunningTasks/(double)t2.guaranteedCapacity;
  396.           if (r1<r2) return -1;
  397.           else if (r1>r2) return 1;
  398.           else return 0;
  399.         }
  400.         else {
  401.           // both have to reclaim. Look at which one needs to reclaim earlier
  402.           long tm1 = t1.reclaimList.get(0).whenToKill;
  403.           long tm2 = t2.reclaimList.get(0).whenToKill;
  404.           if (tm1<tm2) return -1;
  405.           else if (tm1>tm2) return 1;
  406.           else return 0;
  407.         }
  408.       }
  409.     }
  410.     // subclass for map and reduce comparators
  411.     private static final class MapQueueComparator extends QueueComparator {
  412.       TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
  413.         return qsi.mapTSI;
  414.       }
  415.     }
  416.     private static final class ReduceQueueComparator extends QueueComparator {
  417.       TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
  418.         return qsi.reduceTSI;
  419.       }
  420.     }
  421.     // these are our comparator instances
  422.     protected final static MapQueueComparator mapComparator = new MapQueueComparator();
  423.     protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator();
  424.     // and this is the comparator to use
  425.     protected QueueComparator queueComparator;
  426.    
  427.     TaskSchedulingMgr(CapacityTaskScheduler sched) {
  428.       scheduler = sched;
  429.     }
  430.     
  431.     // let the scheduling mgr know which queues are in the system
  432.     void initialize(Map<String, QueueSchedulingInfo> qsiMap) { 
  433.       // add all the qsi objects to our list and sort
  434.       qsiForAssigningTasks.addAll(qsiMap.values());
  435.       Collections.sort(qsiForAssigningTasks, queueComparator);
  436.     }
  437.     
  438.     /** 
  439.      * Periodically, we walk through our queues to do the following: 
  440.      * a. Check if a queue needs to reclaim any resources within a period
  441.      * of time (because it's running below capacity and more tasks are
  442.      * waiting)
  443.      * b. Check if a queue hasn't received enough of the resources it needed
  444.      * to be reclaimed and thus tasks need to be killed.
  445.      * The caller is responsible for ensuring that the QSI objects and the 
  446.      * collections are up-to-date.
  447.      * 
  448.      * Make sure that we do not make any calls to scheduler.taskTrackerManager
  449.      * as this can result in a deadlock (see HADOOP-4977). 
  450.      */
  451.     private synchronized void reclaimCapacity(int nextHeartbeatInterval) {
  452.       int tasksToKill = 0;
  453.       
  454.       QueueSchedulingInfo lastQsi = 
  455.         qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
  456.       TaskSchedulingInfo lastTsi = getTSI(lastQsi);
  457.       long currentTime = scheduler.clock.getTime();
  458.       for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
  459.         TaskSchedulingInfo tsi = getTSI(qsi);
  460.         if (tsi.guaranteedCapacity <= 0) {
  461.           // no capacity, hence nothing can be reclaimed.
  462.           continue;
  463.         }
  464.         // is there any resource that needs to be reclaimed? 
  465.         if ((!tsi.reclaimList.isEmpty()) &&  
  466.             (tsi.reclaimList.getFirst().whenToKill < 
  467.               currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) {
  468.           // make a note of how many tasks to kill to claim resources
  469.           tasksToKill += tsi.reclaimList.getFirst().currentAmount;
  470.           // move this to expiry list
  471.           ReclaimedResource r = tsi.reclaimList.remove();
  472.           tsi.reclaimExpireList.add(r);
  473.         }
  474.         // is there any resource that needs to be expired?
  475.         if ((!tsi.reclaimExpireList.isEmpty()) && 
  476.             (tsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) {
  477.           ReclaimedResource r = tsi.reclaimExpireList.remove();
  478.           tsi.numReclaimedResources -= r.originalAmount;
  479.         }
  480.         // do we need to reclaim a resource later? 
  481.         // if no queue is over capacity, there's nothing to reclaim
  482.         if (lastTsi.numRunningTasks <= lastTsi.guaranteedCapacity) {
  483.           continue;
  484.         }
  485.         if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
  486.           // usedCap is how much capacity is currently accounted for
  487.           int usedCap = tsi.numRunningTasks + tsi.numReclaimedResources;
  488.           // see if we have remaining capacity and if we have enough pending 
  489.           // tasks to use up remaining capacity
  490.           if ((usedCap < tsi.guaranteedCapacity) && 
  491.               ((tsi.numPendingTasks - tsi.numReclaimedResources)>0)) {
  492.             // create a request for resources to be reclaimed
  493.             int amt = Math.min((tsi.guaranteedCapacity-usedCap), 
  494.                 (tsi.numPendingTasks - tsi.numReclaimedResources));
  495.             // create a resource object that needs to be reclaimed some time
  496.             // in the future
  497.             long whenToKill = qsi.reclaimTime - 
  498.               (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING * 
  499.                   nextHeartbeatInterval);
  500.             if (whenToKill < 0) whenToKill = 0;
  501.             tsi.reclaimList.add(new ReclaimedResource(amt, 
  502.                 currentTime + qsi.reclaimTime, 
  503.                 currentTime + whenToKill));
  504.             tsi.numReclaimedResources += amt;
  505.             LOG.debug("Queue " + qsi.queueName + " needs to reclaim " + 
  506.                 amt + " resources");
  507.           }
  508.         }
  509.       }
  510.       // kill tasks to reclaim capacity
  511.       if (0 != tasksToKill) {
  512.         killTasks(tasksToKill);
  513.       }
  514.     }
  515.     // kill 'tasksToKill' tasks 
  516.     private void killTasks(int tasksToKill)
  517.     {
  518.       /* 
  519.        * There are a number of fair ways in which one can figure out how
  520.        * many tasks to kill from which queue, so that the total number of
  521.        * tasks killed is equal to 'tasksToKill'.
  522.        * Maybe the best way is to keep a global ordering of running tasks
  523.        * and kill the ones that ran last, irrespective of what queue or 
  524.        * job they belong to. 
  525.        * What we do here is look at how many tasks is each queue running
  526.        * over capacity, and use that as a weight to decide how many tasks
  527.        * to kill from that queue.
  528.        */ 
  529.       
  530.       // first, find out all queues over capacity
  531.       int loc;
  532.       for (loc=0; loc<qsiForAssigningTasks.size(); loc++) {
  533.         QueueSchedulingInfo qsi = qsiForAssigningTasks.get(loc);
  534.         if (getTSI(qsi).numRunningTasks > getTSI(qsi).guaranteedCapacity) {
  535.           // all queues from here onwards are running over cap
  536.           break;
  537.         }
  538.       }
  539.       // if some queue needs to reclaim cap, there must be at least one queue
  540.       // over cap. But check, just in case. 
  541.       if (loc == qsiForAssigningTasks.size()) {
  542.         LOG.warn("In Capacity scheduler, we need to kill " + tasksToKill + 
  543.             " tasks but there is no queue over capacity.");
  544.         return;
  545.       }
  546.       // calculate how many total tasks are over cap
  547.       int tasksOverCap = 0;
  548.       for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
  549.         QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
  550.         tasksOverCap += 
  551.           (getTSI(qsi).numRunningTasks - getTSI(qsi).guaranteedCapacity);
  552.       }
  553.       // now kill tasks from each queue
  554.       for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
  555.         QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
  556.         killTasksFromQueue(qsi, (int)Math.round(
  557.             ((double)(getTSI(qsi).numRunningTasks - 
  558.                 getTSI(qsi).guaranteedCapacity))*
  559.             tasksToKill/(double)tasksOverCap));
  560.       }
  561.     }
  562.     // kill 'tasksToKill' tasks from queue represented by qsi
  563.     private void killTasksFromQueue(QueueSchedulingInfo qsi, int tasksToKill) {
  564.       // we start killing as many tasks as possible from the jobs that started
  565.       // last. This way, we let long-running jobs complete faster.
  566.       int tasksKilled = 0;
  567.       JobInProgress jobs[] = scheduler.jobQueuesManager.
  568.         getRunningJobQueue(qsi.queueName).toArray(new JobInProgress[0]);
  569.       for (int i=jobs.length-1; i>=0; i--) {
  570.         if (jobs[i].getStatus().getRunState() != JobStatus.RUNNING) {
  571.           continue;
  572.         }
  573.         tasksKilled += killTasksFromJob(jobs[i], tasksToKill-tasksKilled);
  574.         if (tasksKilled >= tasksToKill) break;
  575.       }
  576.     }
  577.    
  578.     // return the TaskAttemptID of the running task, if any, that has made 
  579.     // the least progress.
  580.     TaskAttemptID getRunningTaskWithLeastProgress(TaskInProgress tip) {
  581.       double leastProgress = 1;
  582.       TaskAttemptID tID = null;
  583.       for (Iterator<TaskAttemptID> it = 
  584.         tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
  585.         TaskAttemptID taskid = it.next();
  586.         TaskStatus status = tip.getTaskStatus(taskid);
  587.         if (status.getRunState() == TaskStatus.State.RUNNING) {
  588.           if (status.getProgress() < leastProgress) {
  589.             leastProgress = status.getProgress();
  590.             tID = taskid;
  591.           }
  592.         }
  593.       }
  594.       return tID;
  595.     }
  596.     
  597.     // called when a task is allocated to queue represented by qsi. 
  598.     // update our info about reclaimed resources
  599.     private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
  600.       TaskSchedulingInfo tsi = getTSI(qsi);
  601.       // if we needed to reclaim resources, we have reclaimed one
  602.       if (tsi.reclaimList.isEmpty()) {
  603.         return;
  604.       }
  605.       ReclaimedResource res = tsi.reclaimList.getFirst();
  606.       res.currentAmount--;
  607.       if (0 == res.currentAmount) {
  608.         // move this resource to the expiry list
  609.         ReclaimedResource r = tsi.reclaimList.remove();
  610.         tsi.reclaimExpireList.add(r);
  611.       }
  612.     }
  613.     private synchronized void updateCollectionOfQSIs() {
  614.       Collections.sort(qsiForAssigningTasks, queueComparator);
  615.     }
  616.     private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) {
  617.       // what is our current capacity? It's GC if we're running below GC. 
  618.       // If we're running over GC, then its #running plus 1 (which is the 
  619.       // extra slot we're getting). 
  620.       int currentCapacity;
  621.       TaskSchedulingInfo tsi = getTSI(qsi);
  622.       if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
  623.         currentCapacity = tsi.guaranteedCapacity;
  624.       }
  625.       else {
  626.         currentCapacity = tsi.numRunningTasks+1;
  627.       }
  628.       int limit = Math.max((int)(Math.ceil((double)currentCapacity/
  629.           (double)qsi.numJobsByUser.size())), 
  630.           (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
  631.       if (tsi.numRunningTasksByUser.get(user) >= limit) {
  632.         LOG.debug("User " + user + " is over limit, num running tasks = " + 
  633.             tsi.numRunningTasksByUser.get(user) + ", limit = " + limit);
  634.         return true;
  635.       }
  636.       else {
  637.         return false;
  638.       }
  639.     }
  640.     /*
  641.      * This is the central scheduling method. 
  642.      * It tries to get a task from jobs in a single queue. 
  643.      * Always return a TaskLookupResult object. Don't return null. 
  644.      */
  645.     private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
  646.         QueueSchedulingInfo qsi)
  647.         throws IOException {
  648.       // we only look at jobs in the running queues, as these are the ones
  649.       // who have been potentially initialized
  650.       for (JobInProgress j : 
  651.         scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
  652.         // only look at jobs that can be run. We ignore jobs that haven't 
  653.         // initialized, or have completed but haven't been removed from the 
  654.         // running queue. 
  655.         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
  656.           continue;
  657.         }
  658.         // check if the job's user is over limit
  659.         if (isUserOverLimit(j.getProfile().getUser(), qsi)) {
  660.           continue;
  661.         }
  662.         if (getPendingTasks(j) != 0) {
  663.           // Not accurate TODO:
  664.           // check if the job's memory requirements are met
  665.           if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
  666.             // We found a suitable job. Get task from it.
  667.             Task t = obtainNewTask(taskTracker, j);
  668.             if (t != null) {
  669.               // we're successful in getting a task
  670.               return TaskLookupResult.getTaskFoundResult(t);
  671.             }
  672.           }
  673.           else {
  674.             // mem requirements not met. Rather than look at the next job, 
  675.             // we return nothing to the TT, with the hope that we improve 
  676.             // chances of finding a suitable TT for this job. This lets us
  677.             // avoid starving jobs with high mem requirements.         
  678.             return TaskLookupResult.getMemFailedResult();
  679.           }
  680.         }
  681.         // if we're here, this job has no task to run. Look at the next job.
  682.       }
  683.       // if we're here, we haven't found any task to run among all jobs in 
  684.       // the queue. This could be because there is nothing to run, or that 
  685.       // the user limit for some user is too strict, i.e., there's at least 
  686.       // one user who doesn't have enough tasks to satisfy his limit. If 
  687.       // it's the latter case, re-look at jobs without considering user 
  688.       // limits, and get a task from the first eligible job
  689.       // Note: some of the code from above is repeated here. This is on 
  690.       // purpose as it improves overall readability.  
  691.       // Note: we walk through jobs again. Some of these jobs, which weren't
  692.       // considered in the first pass, shouldn't be considered here again, 
  693.       // but we still check for their viability to keep the code simple. In
  694.       // some cases, for high mem jobs that have nothing to run, we call 
  695.       // obtainNewTask() unnecessarily. Should this be a problem, we can 
  696.       // create a list of jobs to look at (those whose users were over 
  697.       // limit) in the first pass and walk through that list only. 
  698.       for (JobInProgress j : 
  699.         scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
  700.         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
  701.           continue;
  702.         }
  703.         if (getPendingTasks(j) != 0) {
  704.           // Not accurate TODO:
  705.           // check if the job's memory requirements are met
  706.           if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
  707.             // We found a suitable job. Get task from it.
  708.             Task t = obtainNewTask(taskTracker, j);
  709.             if (t != null) {
  710.               // we're successful in getting a task
  711.               return TaskLookupResult.getTaskFoundResult(t);
  712.             }
  713.           }
  714.           else {
  715.             // mem requirements not met. 
  716.             return TaskLookupResult.getMemFailedResult();
  717.           }
  718.         }
  719.         // if we're here, this job has no task to run. Look at the next job.
  720.       }
  721.       // found nothing for this queue, look at the next one.
  722.       String msg = "Found no task from the queue " + qsi.queueName;
  723.       LOG.debug(msg);
  724.       return TaskLookupResult.getNoTaskFoundResult();
  725.     }
  726.     // Always return a TaskLookupResult object. Don't return null. 
  727.     // The caller is responsible for ensuring that the QSI objects and the 
  728.     // collections are up-to-date.
  729.     private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
  730.       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
  731.         // we may have queues with gc=0. We shouldn't look at jobs from 
  732.         // these queues
  733.         if (0 == getTSI(qsi).guaranteedCapacity) {
  734.           continue;
  735.         }
  736.         TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
  737.         TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
  738.         if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
  739.           continue; // Look in other queues.
  740.         }
  741.         // if we find a task, return
  742.         if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FOUND) {
  743.           // we have a task. Update reclaimed resource info
  744.           updateReclaimedResources(qsi);
  745.           return tlr;
  746.         }
  747.         // if there was a memory mismatch, return
  748.         else if (lookUpStatus == 
  749.           TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT) {
  750.             return tlr;
  751.         }
  752.       }
  753.       // nothing to give
  754.       return TaskLookupResult.getNoTaskFoundResult();
  755.     }
  756.     
  757.     // for debugging.
  758.     private void printQSIs() {
  759.       StringBuffer s = new StringBuffer();
  760.       for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
  761.         TaskSchedulingInfo tsi = getTSI(qsi);
  762.         Collection<JobInProgress> runJobs = 
  763.           scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
  764.         s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + 
  765.             tsi.numRunningTasks + ", gc=" + tsi.guaranteedCapacity + 
  766.             ", wait=" + tsi.numPendingTasks + ", run jobs="+ runJobs.size() + 
  767.             "*** ");
  768.       }
  769.       LOG.debug(s);
  770.     }
  771.     
  772.   }
  773.   /**
  774.    * The scheduling algorithms for map tasks. 
  775.    */
  776.   private static class MapSchedulingMgr extends TaskSchedulingMgr {
  777.     MapSchedulingMgr(CapacityTaskScheduler dad) {
  778.       super(dad);
  779.       type = TaskSchedulingMgr.TYPE.MAP;
  780.       queueComparator = mapComparator;
  781.     }
  782.     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
  783.     throws IOException {
  784.       ClusterStatus clusterStatus = 
  785.         scheduler.taskTrackerManager.getClusterStatus();
  786.       int numTaskTrackers = clusterStatus.getTaskTrackers();
  787.       return job.obtainNewMapTask(taskTracker, numTaskTrackers, 
  788.           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
  789.     }
  790.     int getClusterCapacity() {
  791.       return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks();
  792.     }
  793.     int getRunningTasks(JobInProgress job) {
  794.       return job.runningMaps();
  795.     }
  796.     int getPendingTasks(JobInProgress job) {
  797.       return job.pendingMaps();
  798.     }
  799.     int killTasksFromJob(JobInProgress job, int tasksToKill) {
  800.       /*
  801.        * We'd like to kill tasks that ran the last, or that have made the
  802.        * least progress.
  803.        * Ideally, each job would have a list of tasks, sorted by start 
  804.        * time or progress. That's a lot of state to keep, however. 
  805.        * For now, we do something a little different. We first try and kill
  806.        * non-local tasks, as these can be run anywhere. For each TIP, we 
  807.        * kill the task that has made the least progress, if the TIP has
  808.        * more than one active task. 
  809.        * We then look at tasks in runningMapCache.
  810.        */
  811.       int tasksKilled = 0;
  812.       
  813.       /* 
  814.        * For non-local running maps, we 'cheat' a bit. We know that the set
  815.        * of non-local running maps has an insertion order such that tasks 
  816.        * that ran last are at the end. So we iterate through the set in 
  817.        * reverse. This is OK because even if the implementation changes, 
  818.        * we're still using generic set iteration and are no worse of.
  819.        */ 
  820.       TaskInProgress[] tips = 
  821.         job.getNonLocalRunningMaps().toArray(new TaskInProgress[0]);
  822.       for (int i=tips.length-1; i>=0; i--) {
  823.         // pick the tast attempt that has progressed least
  824.         TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
  825.         if (null != tid) {
  826.           if (tips[i].killTask(tid, false)) {
  827.             if (++tasksKilled >= tasksToKill) {
  828.               return tasksKilled;
  829.             }
  830.           }
  831.         }
  832.       }
  833.       // now look at other running tasks
  834.       for (Set<TaskInProgress> s: job.getRunningMapCache().values()) {
  835.         for (TaskInProgress tip: s) {
  836.           TaskAttemptID tid = getRunningTaskWithLeastProgress(tip);
  837.           if (null != tid) {
  838.             if (tip.killTask(tid, false)) {
  839.               if (++tasksKilled >= tasksToKill) {
  840.                 return tasksKilled;
  841.               }
  842.             }
  843.           }
  844.         }
  845.       }
  846.       return tasksKilled;
  847.     }
  848.     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
  849.       return qsi.mapTSI;
  850.     }
  851.   }
  852.   /**
  853.    * The scheduling algorithms for reduce tasks. 
  854.    */
  855.   private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
  856.     ReduceSchedulingMgr(CapacityTaskScheduler dad) {
  857.       super(dad);
  858.       type = TaskSchedulingMgr.TYPE.REDUCE;
  859.       queueComparator = reduceComparator;
  860.     }
  861.     Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
  862.     throws IOException {
  863.       ClusterStatus clusterStatus = 
  864.         scheduler.taskTrackerManager.getClusterStatus();
  865.       int numTaskTrackers = clusterStatus.getTaskTrackers();
  866.       return job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
  867.           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
  868.     }
  869.     int getClusterCapacity() {
  870.       return scheduler.taskTrackerManager.getClusterStatus().getMaxReduceTasks();
  871.     }
  872.     int getRunningTasks(JobInProgress job) {
  873.       return job.runningReduces();
  874.     }
  875.     int getPendingTasks(JobInProgress job) {
  876.       return job.pendingReduces();
  877.     }
  878.     int killTasksFromJob(JobInProgress job, int tasksToKill) {
  879.       /* 
  880.        * For reduces, we 'cheat' a bit. We know that the set
  881.        * of running reduces has an insertion order such that tasks 
  882.        * that ran last are at the end. So we iterate through the set in 
  883.        * reverse. This is OK because even if the implementation changes, 
  884.        * we're still using generic set iteration and are no worse of.
  885.        */ 
  886.       int tasksKilled = 0;
  887.       TaskInProgress[] tips = 
  888.         job.getRunningReduces().toArray(new TaskInProgress[0]);
  889.       for (int i=tips.length-1; i>=0; i--) {
  890.         // pick the tast attempt that has progressed least
  891.         TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
  892.         if (null != tid) {
  893.           if (tips[i].killTask(tid, false)) {
  894.             if (++tasksKilled >= tasksToKill) {
  895.               return tasksKilled;
  896.             }
  897.           }
  898.         }
  899.       }
  900.       return tasksKilled;
  901.     }
  902.     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
  903.       return qsi.reduceTSI;
  904.     }
  905.   }
  906.   
  907.   /** the scheduling mgrs for Map and Reduce tasks */ 
  908.   protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
  909.   protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
  910.   MemoryMatcher memoryMatcher = new MemoryMatcher(this);
  911.   /** we keep track of the number of map/reduce slots we saw last */
  912.   private int prevMapClusterCapacity = 0;
  913.   private int prevReduceClusterCapacity = 0;
  914.   
  915.   /** name of the default queue. */ 
  916.   static final String DEFAULT_QUEUE_NAME = "default";
  917.   
  918.   /** how often does redistribution thread run (in msecs)*/
  919.   private static long RECLAIM_CAPACITY_INTERVAL;
  920.   /** we start killing tasks to reclaim capacity when we have so many 
  921.    * heartbeats left. */
  922.   private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3;
  923.   static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
  924.   protected JobQueuesManager jobQueuesManager;
  925.   protected CapacitySchedulerConf schedConf;
  926.   /** whether scheduler has started or not */
  927.   private boolean started = false;
  928.   
  929.   /**
  930.    * Used to distribute/reclaim excess capacity among queues
  931.    */ 
  932.   class ReclaimCapacity implements Runnable {
  933.     public ReclaimCapacity() {
  934.     }
  935.     public void run() {
  936.       while (true) {
  937.         try {
  938.           Thread.sleep(RECLAIM_CAPACITY_INTERVAL);
  939.           if (stopReclaim) { 
  940.             break;
  941.           }
  942.           reclaimCapacity();
  943.         } catch (InterruptedException t) {
  944.           break;
  945.         } catch (Throwable t) {
  946.           LOG.error("Error in redistributing capacity:n" +
  947.                     StringUtils.stringifyException(t));
  948.         }
  949.       }
  950.     }
  951.   }
  952.   private Thread reclaimCapacityThread = null;
  953.   /** variable to indicate that thread should stop */
  954.   private boolean stopReclaim = false;
  955.   /**
  956.    * A clock class - can be mocked out for testing.
  957.    */
  958.   static class Clock {
  959.     long getTime() {
  960.       return System.currentTimeMillis();
  961.     }
  962.   }
  963.   private Clock clock;
  964.   private JobInitializationPoller initializationPoller;
  965.   long limitMaxVmemForTasks;
  966.   long limitMaxPmemForTasks;
  967.   long defaultMaxVmPerTask;
  968.   float defaultPercentOfPmemInVmem;
  969.   public CapacityTaskScheduler() {
  970.     this(new Clock());
  971.   }
  972.   
  973.   // for testing
  974.   public CapacityTaskScheduler(Clock clock) {
  975.     this.jobQueuesManager = new JobQueuesManager(this);
  976.     this.clock = clock;
  977.   }
  978.   
  979.   /** mostly for testing purposes */
  980.   public void setResourceManagerConf(CapacitySchedulerConf conf) {
  981.     this.schedConf = conf;
  982.   }
  983.   /**
  984.    * Normalize the negative values in configuration
  985.    * 
  986.    * @param val
  987.    * @return normalized value
  988.    */
  989.   private long normalizeMemoryConfigValue(long val) {
  990.     if (val < 0) {
  991.       val = JobConf.DISABLED_MEMORY_LIMIT;
  992.     }
  993.     return val;
  994.   }
  995.   private void initializeMemoryRelatedConf() {
  996.     limitMaxVmemForTasks =
  997.         normalizeMemoryConfigValue(conf.getLong(
  998.             JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  999.             JobConf.DISABLED_MEMORY_LIMIT));
  1000.     limitMaxPmemForTasks =
  1001.         normalizeMemoryConfigValue(schedConf.getLimitMaxPmemForTasks());
  1002.     defaultMaxVmPerTask =
  1003.         normalizeMemoryConfigValue(conf.getLong(
  1004.             JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  1005.             JobConf.DISABLED_MEMORY_LIMIT));
  1006.     defaultPercentOfPmemInVmem = schedConf.getDefaultPercentOfPmemInVmem();
  1007.     if (defaultPercentOfPmemInVmem < 0) {
  1008.       defaultPercentOfPmemInVmem = JobConf.DISABLED_MEMORY_LIMIT;
  1009.     }
  1010.   }
  1011.   @Override
  1012.   public synchronized void start() throws IOException {
  1013.     if (started) return;
  1014.     super.start();
  1015.     // initialize our queues from the config settings
  1016.     if (null == schedConf) {
  1017.       schedConf = new CapacitySchedulerConf();
  1018.     }
  1019.     initializeMemoryRelatedConf();
  1020.     
  1021.     RECLAIM_CAPACITY_INTERVAL = schedConf.getReclaimCapacityInterval();
  1022.     RECLAIM_CAPACITY_INTERVAL *= 1000;
  1023.     // read queue info from config file
  1024.     QueueManager queueManager = taskTrackerManager.getQueueManager();
  1025.     Set<String> queues = queueManager.getQueues();
  1026.     // Sanity check: there should be at least one queue. 
  1027.     if (0 == queues.size()) {
  1028.       throw new IllegalStateException("System has no queue configured");
  1029.     }
  1030.     Set<String> queuesWithoutConfiguredGC = new HashSet<String>();
  1031.     float totalCapacity = 0.0f;
  1032.     for (String queueName: queues) {
  1033.       float gc = schedConf.getGuaranteedCapacity(queueName); 
  1034.       if(gc == -1.0) {
  1035.         queuesWithoutConfiguredGC.add(queueName);
  1036.       }else {
  1037.         totalCapacity += gc;
  1038.       }
  1039.       int ulMin = schedConf.getMinimumUserLimitPercent(queueName); 
  1040.       long reclaimTimeLimit = schedConf.getReclaimTimeLimit(queueName) * 1000;
  1041.       // create our QSI and add to our hashmap
  1042.       QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc, 
  1043.           ulMin, reclaimTimeLimit, jobQueuesManager);
  1044.       queueInfoMap.put(queueName, qsi);
  1045.       // create the queues of job objects
  1046.       boolean supportsPrio = schedConf.isPrioritySupported(queueName);
  1047.       jobQueuesManager.createQueue(queueName, supportsPrio);
  1048.       
  1049.       SchedulingDisplayInfo schedulingInfo = 
  1050.         new SchedulingDisplayInfo(queueName, this);
  1051.       queueManager.setSchedulerInfo(queueName, schedulingInfo);
  1052.       
  1053.     }
  1054.     float remainingQuantityToAllocate = 100 - totalCapacity;
  1055.     float quantityToAllocate = 
  1056.       remainingQuantityToAllocate/queuesWithoutConfiguredGC.size();
  1057.     for(String queue: queuesWithoutConfiguredGC) {
  1058.       QueueSchedulingInfo qsi = queueInfoMap.get(queue); 
  1059.       qsi.guaranteedCapacityPercent = quantityToAllocate;
  1060.       schedConf.setGuaranteedCapacity(queue, quantityToAllocate);
  1061.     }    
  1062.     
  1063.     // check if there's a queue with the default name. If not, we quit.
  1064.     if (!queueInfoMap.containsKey(DEFAULT_QUEUE_NAME)) {
  1065.       throw new IllegalStateException("System has no default queue configured");
  1066.     }
  1067.     if (totalCapacity > 100.0) {
  1068.       throw new IllegalArgumentException("Sum of queue capacities over 100% at "
  1069.                                          + totalCapacity);
  1070.     }    
  1071.     
  1072.     // let our mgr objects know about the queues
  1073.     mapScheduler.initialize(queueInfoMap);
  1074.     reduceScheduler.initialize(queueInfoMap);
  1075.     
  1076.     // listen to job changes
  1077.     taskTrackerManager.addJobInProgressListener(jobQueuesManager);
  1078.     //Start thread for initialization
  1079.     if (initializationPoller == null) {
  1080.       this.initializationPoller = new JobInitializationPoller(
  1081.           jobQueuesManager,schedConf,queues);
  1082.     }
  1083.     initializationPoller.init(queueManager.getQueues(), schedConf);
  1084.     initializationPoller.setDaemon(true);
  1085.     initializationPoller.start();
  1086.     // start thread for redistributing capacity if we have more than 
  1087.     // one queue
  1088.     if (queueInfoMap.size() > 1) {
  1089.       this.reclaimCapacityThread = 
  1090.         new Thread(new ReclaimCapacity(),"reclaimCapacity");
  1091.       this.reclaimCapacityThread.start();
  1092.     }
  1093.     else {
  1094.       LOG.info("Only one queue present. Reclaim capacity thread not started.");
  1095.     }
  1096.     
  1097.     started = true;
  1098.     LOG.info("Capacity scheduler initialized " + queues.size() + " queues");  }
  1099.   
  1100.   /** mostly for testing purposes */
  1101.   void setInitializationPoller(JobInitializationPoller p) {
  1102.     this.initializationPoller = p;
  1103.   }
  1104.   
  1105.   @Override
  1106.   public synchronized void terminate() throws IOException {
  1107.     if (!started) return;
  1108.     if (jobQueuesManager != null) {
  1109.       taskTrackerManager.removeJobInProgressListener(
  1110.           jobQueuesManager);
  1111.     }
  1112.     // tell the reclaim thread to stop
  1113.     stopReclaim = true;
  1114.     started = false;
  1115.     initializationPoller.terminate();
  1116.     super.terminate();
  1117.   }
  1118.   
  1119.   @Override
  1120.   public synchronized void setConf(Configuration conf) {
  1121.     super.setConf(conf);
  1122.   }
  1123.   /**
  1124.    * Reclaim capacity for both map & reduce tasks. 
  1125.    * Do not make this synchronized, since we call taskTrackerManager 
  1126.    * (see HADOOP-4977). 
  1127.    */
  1128.   void reclaimCapacity() {
  1129.     // get the cluster capacity
  1130.     ClusterStatus c = taskTrackerManager.getClusterStatus();
  1131.     int mapClusterCapacity = c.getMaxMapTasks();
  1132.     int reduceClusterCapacity = c.getMaxReduceTasks();
  1133.     int nextHeartbeatInterval = taskTrackerManager.getNextHeartbeatInterval();
  1134.     // update the QSI objects
  1135.     updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
  1136.     // update the qsi collections, since we depend on their ordering 
  1137.     mapScheduler.updateCollectionOfQSIs();
  1138.     reduceScheduler.updateCollectionOfQSIs();
  1139.     // now, reclaim
  1140.     mapScheduler.reclaimCapacity(nextHeartbeatInterval);
  1141.     reduceScheduler.reclaimCapacity(nextHeartbeatInterval);
  1142.   }
  1143.   
  1144.   /**
  1145.    * provided for the test classes
  1146.    * lets you update the QSI objects and sorted collections
  1147.    */ 
  1148.   void updateQSIInfoForTests() {
  1149.     ClusterStatus c = taskTrackerManager.getClusterStatus();
  1150.     int mapClusterCapacity = c.getMaxMapTasks();
  1151.     int reduceClusterCapacity = c.getMaxReduceTasks();
  1152.     // update the QSI objects
  1153.     updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
  1154.     mapScheduler.updateCollectionOfQSIs();
  1155.     reduceScheduler.updateCollectionOfQSIs();
  1156.   }
  1157.   /**
  1158.    * Update individual QSI objects.
  1159.    * We don't need exact information for all variables, just enough for us
  1160.    * to make scheduling decisions. For example, we don't need an exact count
  1161.    * of numRunningTasks. Once we count upto the grid capacity, any
  1162.    * number beyond that will make no difference.
  1163.    * 
  1164.    * The pending task count is only required in reclaim capacity. So 
  1165.    * if the computation becomes expensive, we can add a boolean to 
  1166.    * denote if pending task computation is required or not.
  1167.    * 
  1168.    **/
  1169.   private synchronized void updateQSIObjects(int mapClusterCapacity, 
  1170.       int reduceClusterCapacity) {
  1171.     // if # of slots have changed since last time, update. 
  1172.     // First, compute whether the total number of TT slots have changed
  1173.     for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
  1174.       // compute new GCs, if TT slots have changed
  1175.       if (mapClusterCapacity != prevMapClusterCapacity) {
  1176.         qsi.mapTSI.guaranteedCapacity =
  1177.           (int)(qsi.guaranteedCapacityPercent*mapClusterCapacity/100);
  1178.       }
  1179.       if (reduceClusterCapacity != prevReduceClusterCapacity) {
  1180.         qsi.reduceTSI.guaranteedCapacity =
  1181.           (int)(qsi.guaranteedCapacityPercent*reduceClusterCapacity/100);
  1182.       }
  1183.       // reset running/pending tasks, tasks per user
  1184.       qsi.mapTSI.resetTaskVars();
  1185.       qsi.reduceTSI.resetTaskVars();
  1186.       // update stats on running jobs
  1187.       for (JobInProgress j: 
  1188.         jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
  1189.         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
  1190.           continue;
  1191.         }
  1192.         int runningMaps = j.runningMaps();
  1193.         int runningReduces = j.runningReduces();
  1194.         qsi.mapTSI.numRunningTasks += runningMaps;
  1195.         qsi.reduceTSI.numRunningTasks += runningReduces;
  1196.         Integer i = 
  1197.           qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser());
  1198.         qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(), 
  1199.             i+runningMaps);
  1200.         i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser());
  1201.         qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(), 
  1202.             i+runningReduces);
  1203.         qsi.mapTSI.numPendingTasks += j.pendingMaps();
  1204.         qsi.reduceTSI.numPendingTasks += j.pendingReduces();
  1205.         LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
  1206.             j.runningMaps() + ", run(r) = " + j.runningReduces() + 
  1207.             ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
  1208.             j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + 
  1209.             ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + 
  1210.             j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks 
  1211.             + ", total(m) = " + j.numMapTasks + ", total(r) = " + 
  1212.             j.numReduceTasks);
  1213.         /* 
  1214.          * it's fine walking down the entire list of running jobs - there
  1215.          * probably will not be many, plus, we may need to go through the
  1216.          * list to compute numRunningTasksByUser. If this is expensive, we
  1217.          * can keep a list of running jobs per user. Then we only need to
  1218.          * consider the first few jobs per user.
  1219.          */ 
  1220.       }
  1221.       
  1222.       //update stats on waiting jobs
  1223.       for(JobInProgress j: jobQueuesManager.getWaitingJobs(qsi.queueName)) {
  1224.         // pending tasks
  1225.         if ((qsi.mapTSI.numPendingTasks > mapClusterCapacity) &&
  1226.             (qsi.reduceTSI.numPendingTasks > reduceClusterCapacity)) {
  1227.           // that's plenty. no need for more computation
  1228.           break;
  1229.         }
  1230.         /*
  1231.          * Consider only the waiting jobs in the job queue. Job queue can
  1232.          * contain:
  1233.          * 1. Jobs which are in running state but not scheduled
  1234.          * (these would also be present in running queue), the pending 
  1235.          * task count of these jobs is computed when scheduler walks
  1236.          * through running job queue.
  1237.          * 2. Jobs which are killed by user, but waiting job initialization
  1238.          * poller to walk through the job queue to clean up killed jobs.
  1239.          */
  1240.         if (j.getStatus().getRunState() == JobStatus.PREP) {
  1241.           qsi.mapTSI.numPendingTasks += j.pendingMaps();
  1242.           qsi.reduceTSI.numPendingTasks += j.pendingReduces();
  1243.         }
  1244.       }
  1245.     }
  1246.     
  1247.     prevMapClusterCapacity = mapClusterCapacity;
  1248.     prevReduceClusterCapacity = reduceClusterCapacity;
  1249.   }
  1250.   /* 
  1251.    * The grand plan for assigning a task. 
  1252.    * First, decide whether a Map or Reduce task should be given to a TT 
  1253.    * (if the TT can accept either). 
  1254.    * Next, pick a queue. We only look at queues that need a slot. Among
  1255.    * these, we first look at queues whose ac is less than gc (queues that 
  1256.    * gave up capacity in the past). Next, we look at any other queue that
  1257.    * needs a slot. 
  1258.    * Next, pick a job in a queue. we pick the job at the front of the queue
  1259.    * unless its user is over the user limit. 
  1260.    * Finally, given a job, pick a task from the job. 
  1261.    *  
  1262.    */
  1263.   @Override
  1264.   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
  1265.       throws IOException {
  1266.     
  1267.     TaskLookupResult tlr;
  1268.     /* 
  1269.      * If TT has Map and Reduce slot free, we need to figure out whether to
  1270.      * give it a Map or Reduce task.
  1271.      * Number of ways to do this. For now, base decision on how much is needed
  1272.      * versus how much is used (default to Map, if equal).
  1273.      */
  1274.     ClusterStatus c = taskTrackerManager.getClusterStatus();
  1275.     int mapClusterCapacity = c.getMaxMapTasks();
  1276.     int reduceClusterCapacity = c.getMaxReduceTasks();
  1277.     int maxMapTasks = taskTracker.getMaxMapTasks();
  1278.     int currentMapTasks = taskTracker.countMapTasks();
  1279.     int maxReduceTasks = taskTracker.getMaxReduceTasks();
  1280.     int currentReduceTasks = taskTracker.countReduceTasks();
  1281.     LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() + 
  1282.         ", run maps=" + taskTracker.countMapTasks() + ", max reds=" + 
  1283.         taskTracker.getMaxReduceTasks() + ", run reds=" + 
  1284.         taskTracker.countReduceTasks() + ", map cap=" + 
  1285.         mapClusterCapacity + ", red cap = " + 
  1286.         reduceClusterCapacity);
  1287.     /* 
  1288.      * update all our QSI objects.
  1289.      * This involves updating each qsi structure. This operation depends
  1290.      * on the number of running jobs in a queue, and some waiting jobs. If it
  1291.      * becomes expensive, do it once every few heartbeats only.
  1292.      */ 
  1293.     updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
  1294.     // make sure we get our map or reduce scheduling object to update its 
  1295.     // collection of QSI objects too. 
  1296.     if ((maxReduceTasks - currentReduceTasks) > 
  1297.     (maxMapTasks - currentMapTasks)) {
  1298.       // get a reduce task first
  1299.       reduceScheduler.updateCollectionOfQSIs();
  1300.       tlr = reduceScheduler.assignTasks(taskTracker);
  1301.       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
  1302.         tlr.getLookUpStatus()) {
  1303.         // found a task; return
  1304.         return Collections.singletonList(tlr.getTask());
  1305.       }
  1306.       else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
  1307.         tlr.getLookUpStatus()) {
  1308.         // return no task
  1309.         return null;
  1310.       }
  1311.       // if we didn't get any, look at map tasks, if TT has space
  1312.       else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
  1313.         tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
  1314.         mapScheduler.updateCollectionOfQSIs();
  1315.         tlr = mapScheduler.assignTasks(taskTracker);
  1316.         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
  1317.           tlr.getLookUpStatus()) {
  1318.           return Collections.singletonList(tlr.getTask());
  1319.         }
  1320.       }
  1321.     }
  1322.     else {
  1323.       // get a map task first
  1324.       mapScheduler.updateCollectionOfQSIs();
  1325.       tlr = mapScheduler.assignTasks(taskTracker);
  1326.       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
  1327.         tlr.getLookUpStatus()) {
  1328.         // found a task; return
  1329.         return Collections.singletonList(tlr.getTask());
  1330.       }
  1331.       else if (TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT == 
  1332.         tlr.getLookUpStatus()) {
  1333.         return null;
  1334.       }
  1335.       // if we didn't get any, look at reduce tasks, if TT has space
  1336.       else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
  1337.         tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
  1338.         reduceScheduler.updateCollectionOfQSIs();
  1339.         tlr = reduceScheduler.assignTasks(taskTracker);
  1340.         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
  1341.           tlr.getLookUpStatus()) {
  1342.           return Collections.singletonList(tlr.getTask());
  1343.         }
  1344.       }
  1345.     }
  1346.     return null;
  1347.   }
  1348.   /**
  1349.    * Kill the job if it has invalid requirements and return why it is killed
  1350.    * 
  1351.    * @param job
  1352.    * @return string mentioning why the job is killed. Null if the job has valid
  1353.    *         requirements.
  1354.    */
  1355.   private String killJobIfInvalidRequirements(JobInProgress job) {
  1356.     if (!memoryMatcher.isSchedulingBasedOnVmemEnabled()) {
  1357.       return null;
  1358.     }
  1359.     if ((job.getMaxVirtualMemoryForTask() > limitMaxVmemForTasks)
  1360.         || (memoryMatcher.isSchedulingBasedOnPmemEnabled() && (job
  1361.             .getMaxPhysicalMemoryForTask() > limitMaxPmemForTasks))) {
  1362.       String msg =
  1363.           job.getJobID() + " (" + job.getMaxVirtualMemoryForTask() + "vmem, "
  1364.               + job.getMaxPhysicalMemoryForTask()
  1365.               + "pmem) exceeds the cluster's max-memory-limits ("
  1366.               + limitMaxVmemForTasks + "vmem, " + limitMaxPmemForTasks
  1367.               + "pmem). Cannot run in this cluster, so killing it.";
  1368.       LOG.warn(msg);
  1369.       try {
  1370.         taskTrackerManager.killJob(job.getJobID());
  1371.         return msg;
  1372.       } catch (IOException ioe) {
  1373.         LOG.warn("Failed to kill the job " + job.getJobID() + ". Reason : "
  1374.             + StringUtils.stringifyException(ioe));
  1375.       }
  1376.     }
  1377.     return null;
  1378.   }
  1379.   // called when a job is added
  1380.   synchronized void jobAdded(JobInProgress job) throws IOException {
  1381.     QueueSchedulingInfo qsi = 
  1382.       queueInfoMap.get(job.getProfile().getQueueName());
  1383.     // qsi shouldn't be null
  1384.     // update user-specific info
  1385.     Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
  1386.     if (null == i) {
  1387.       i = 1;
  1388.       // set the count for running tasks to 0
  1389.       qsi.mapTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
  1390.       qsi.reduceTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0);
  1391.     }
  1392.     else {
  1393.       i++;
  1394.     }
  1395.     qsi.numJobsByUser.put(job.getProfile().getUser(), i);
  1396.     LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
  1397.               + job.getProfile().getUser() + ", user now has " + i + " jobs");
  1398.     // Kill the job if it cannot run in the cluster because of invalid
  1399.     // resource requirements.
  1400.     String statusMsg = killJobIfInvalidRequirements(job);
  1401.     if (statusMsg != null) {
  1402.       throw new IOException(statusMsg);
  1403.     }
  1404.   }
  1405.   // called when a job completes
  1406.   synchronized void jobCompleted(JobInProgress job) {
  1407.     QueueSchedulingInfo qsi = 
  1408.       queueInfoMap.get(job.getProfile().getQueueName());
  1409.     // qsi shouldn't be null
  1410.     // update numJobsByUser
  1411.     LOG.debug("JOb to be removed for user " + job.getProfile().getUser());
  1412.     Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
  1413.     i--;
  1414.     if (0 == i.intValue()) {
  1415.       qsi.numJobsByUser.remove(job.getProfile().getUser());
  1416.       // remove job footprint from our TSIs
  1417.       qsi.mapTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
  1418.       qsi.reduceTSI.numRunningTasksByUser.remove(job.getProfile().getUser());
  1419.       LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
  1420.     }
  1421.     else {
  1422.       qsi.numJobsByUser.put(job.getProfile().getUser(), i);
  1423.       LOG.debug("User still has " + i + " jobs, number of users = "
  1424.                 + qsi.numJobsByUser.size());
  1425.     }
  1426.   }
  1427.   
  1428.   @Override
  1429.   public synchronized Collection<JobInProgress> getJobs(String queueName) {
  1430.     Collection<JobInProgress> jobCollection = new ArrayList<JobInProgress>();
  1431.     Collection<JobInProgress> runningJobs = 
  1432.         jobQueuesManager.getRunningJobQueue(queueName);
  1433.     if (runningJobs != null) {
  1434.       jobCollection.addAll(runningJobs);
  1435.     }
  1436.     Collection<JobInProgress> waitingJobs = 
  1437.       jobQueuesManager.getWaitingJobs(queueName);
  1438.     Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
  1439.     if(waitingJobs != null) {
  1440.       tempCollection.addAll(waitingJobs);
  1441.     }
  1442.     tempCollection.removeAll(runningJobs);
  1443.     if(!tempCollection.isEmpty()) {
  1444.       jobCollection.addAll(tempCollection);
  1445.     }
  1446.     return jobCollection;
  1447.   }
  1448.   
  1449.   JobInitializationPoller getInitializationPoller() {
  1450.     return initializationPoller;
  1451.   }
  1452.   synchronized String getDisplayInfo(String queueName) {
  1453.     QueueSchedulingInfo qsi = queueInfoMap.get(queueName);
  1454.     if (null == qsi) { 
  1455.       return null;
  1456.     }
  1457.     return qsi.toString();
  1458.   }
  1459. }