FairScheduler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:29k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.Collections;
  23. import java.util.Comparator;
  24. import java.util.HashMap;
  25. import java.util.HashSet;
  26. import java.util.Iterator;
  27. import java.util.List;
  28. import java.util.Map;
  29. import java.util.Set;
  30. import java.util.Map.Entry;
  31. import org.apache.commons.logging.Log;
  32. import org.apache.commons.logging.LogFactory;
  33. import org.apache.hadoop.conf.Configuration;
  34. import org.apache.hadoop.http.HttpServer;
  35. import org.apache.hadoop.mapred.JobStatus;
  36. import org.apache.hadoop.util.ReflectionUtils;
  37. /**
  38.  * A {@link TaskScheduler} that implements fair sharing.
  39.  */
  40. public class FairScheduler extends TaskScheduler {
  41.   /** How often fair shares are re-calculated */
  42.   public static final long UPDATE_INTERVAL = 500;
  43.   public static final Log LOG = LogFactory.getLog(
  44.       "org.apache.hadoop.mapred.FairScheduler");
  45.   
  46.   protected PoolManager poolMgr;
  47.   protected LoadManager loadMgr;
  48.   protected TaskSelector taskSelector;
  49.   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
  50.   protected Map<JobInProgress, JobInfo> infos = // per-job scheduling variables
  51.     new HashMap<JobInProgress, JobInfo>();
  52.   protected long lastUpdateTime;           // Time when we last updated infos
  53.   protected boolean initialized;  // Are we initialized?
  54.   protected volatile boolean running; // Are we running?
  55.   protected boolean useFifo;      // Set if we want to revert to FIFO behavior
  56.   protected boolean assignMultiple; // Simultaneously assign map and reduce?
  57.   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
  58.   protected boolean waitForMapsBeforeLaunchingReduces = true;
  59.   private Clock clock;
  60.   private boolean runBackgroundUpdates; // Can be set to false for testing
  61.   private EagerTaskInitializationListener eagerInitListener;
  62.   private JobListener jobListener;
  63.   
  64.   /**
  65.    * A class for holding per-job scheduler variables. These always contain the
  66.    * values of the variables at the last update(), and are used along with a
  67.    * time delta to update the map and reduce deficits before a new update().
  68.    */
  69.   static class JobInfo {
  70.     boolean runnable = false;   // Can the job run given user/pool limits?
  71.     double mapWeight = 0;       // Weight of job in calculation of map share
  72.     double reduceWeight = 0;    // Weight of job in calculation of reduce share
  73.     long mapDeficit = 0;        // Time deficit for maps
  74.     long reduceDeficit = 0;     // Time deficit for reduces
  75.     int runningMaps = 0;        // Maps running at last update
  76.     int runningReduces = 0;     // Reduces running at last update
  77.     int neededMaps;             // Maps needed at last update
  78.     int neededReduces;          // Reduces needed at last update
  79.     int minMaps = 0;            // Minimum maps as guaranteed by pool
  80.     int minReduces = 0;         // Minimum reduces as guaranteed by pool
  81.     double mapFairShare = 0;    // Fair share of map slots at last update
  82.     double reduceFairShare = 0; // Fair share of reduce slots at last update
  83.   }
  84.   
  85.   /**
  86.    * A clock class - can be mocked out for testing.
  87.    */
  88.   static class Clock {
  89.     long getTime() {
  90.       return System.currentTimeMillis();
  91.     }
  92.   }
  93.   
  94.   public FairScheduler() {
  95.     this(new Clock(), true);
  96.   }
  97.   
  98.   /**
  99.    * Constructor used for tests, which can change the clock and disable updates.
  100.    */
  101.   protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
  102.     this.clock = clock;
  103.     this.runBackgroundUpdates = runBackgroundUpdates;
  104.     this.jobListener = new JobListener();
  105.   }
  106.   @Override
  107.   public void start() {
  108.     try {
  109.       Configuration conf = getConf();
  110.       this.eagerInitListener = new EagerTaskInitializationListener(conf);
  111.       eagerInitListener.start();
  112.       taskTrackerManager.addJobInProgressListener(eagerInitListener);
  113.       taskTrackerManager.addJobInProgressListener(jobListener);
  114.       poolMgr = new PoolManager(conf);
  115.       loadMgr = (LoadManager) ReflectionUtils.newInstance(
  116.           conf.getClass("mapred.fairscheduler.loadmanager", 
  117.               CapBasedLoadManager.class, LoadManager.class), conf);
  118.       loadMgr.setTaskTrackerManager(taskTrackerManager);
  119.       loadMgr.start();
  120.       taskSelector = (TaskSelector) ReflectionUtils.newInstance(
  121.           conf.getClass("mapred.fairscheduler.taskselector", 
  122.               DefaultTaskSelector.class, TaskSelector.class), conf);
  123.       taskSelector.setTaskTrackerManager(taskTrackerManager);
  124.       taskSelector.start();
  125.       Class<?> weightAdjClass = conf.getClass(
  126.           "mapred.fairscheduler.weightadjuster", null);
  127.       if (weightAdjClass != null) {
  128.         weightAdjuster = (WeightAdjuster) ReflectionUtils.newInstance(
  129.             weightAdjClass, conf);
  130.       }
  131.       assignMultiple = conf.getBoolean("mapred.fairscheduler.assignmultiple",
  132.           false);
  133.       sizeBasedWeight = conf.getBoolean("mapred.fairscheduler.sizebasedweight",
  134.           false);
  135.       initialized = true;
  136.       running = true;
  137.       lastUpdateTime = clock.getTime();
  138.       // Start a thread to update deficits every UPDATE_INTERVAL
  139.       if (runBackgroundUpdates)
  140.         new UpdateThread().start();
  141.       // Register servlet with JobTracker's Jetty server
  142.       if (taskTrackerManager instanceof JobTracker) {
  143.         JobTracker jobTracker = (JobTracker) taskTrackerManager;
  144.         HttpServer infoServer = jobTracker.infoServer;
  145.         infoServer.setAttribute("scheduler", this);
  146.         infoServer.addServlet("scheduler", "/scheduler",
  147.             FairSchedulerServlet.class);
  148.       }
  149.     } catch (Exception e) {
  150.       // Can't load one of the managers - crash the JobTracker now while it is
  151.       // starting up so that the user notices.
  152.       throw new RuntimeException("Failed to start FairScheduler", e);
  153.     }
  154.     LOG.info("Successfully configured FairScheduler");
  155.   }
  156.   @Override
  157.   public void terminate() throws IOException {
  158.     running = false;
  159.     if (jobListener != null)
  160.       taskTrackerManager.removeJobInProgressListener(jobListener);
  161.     if (eagerInitListener != null)
  162.       taskTrackerManager.removeJobInProgressListener(eagerInitListener);
  163.   }
  164.   
  165.   /**
  166.    * Used to listen for jobs added/removed by our {@link TaskTrackerManager}.
  167.    */
  168.   private class JobListener extends JobInProgressListener {
  169.     @Override
  170.     public void jobAdded(JobInProgress job) {
  171.       synchronized (FairScheduler.this) {
  172.         poolMgr.addJob(job);
  173.         JobInfo info = new JobInfo();
  174.         infos.put(job, info);
  175.         update();
  176.       }
  177.     }
  178.     
  179.     @Override
  180.     public void jobRemoved(JobInProgress job) {
  181.       synchronized (FairScheduler.this) {
  182.         poolMgr.removeJob(job);
  183.         infos.remove(job);
  184.       }
  185.     }
  186.   
  187.     @Override
  188.     public void jobUpdated(JobChangeEvent event) {
  189.     }
  190.   }
  191.   /**
  192.    * A thread which calls {@link FairScheduler#update()} ever
  193.    * <code>UPDATE_INTERVAL</code> milliseconds.
  194.    */
  195.   private class UpdateThread extends Thread {
  196.     private UpdateThread() {
  197.       super("FairScheduler update thread");
  198.     }
  199.     public void run() {
  200.       while (running) {
  201.         try {
  202.           Thread.sleep(UPDATE_INTERVAL);
  203.           update();
  204.         } catch (Exception e) {
  205.           LOG.error("Failed to update fair share calculations", e);
  206.         }
  207.       }
  208.     }
  209.   }
  210.   
  211.   @Override
  212.   public synchronized List<Task> assignTasks(TaskTrackerStatus tracker)
  213.       throws IOException {
  214.     if (!initialized) // Don't try to assign tasks if we haven't yet started up
  215.       return null;
  216.     
  217.     // Reload allocations file if it hasn't been loaded in a while
  218.     poolMgr.reloadAllocsIfNecessary();
  219.     
  220.     // Compute total runnable maps and reduces
  221.     int runnableMaps = 0;
  222.     int runnableReduces = 0;
  223.     for (JobInProgress job: infos.keySet()) {
  224.       runnableMaps += runnableTasks(job, TaskType.MAP);
  225.       runnableReduces += runnableTasks(job, TaskType.REDUCE);
  226.     }
  227.     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  228.     // Compute total map/reduce slots
  229.     // In the future we can precompute this if the Scheduler becomes a 
  230.     // listener of tracker join/leave events.
  231.     int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
  232.     int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
  233.     
  234.     // Scan to see whether any job needs to run a map, then a reduce
  235.     ArrayList<Task> tasks = new ArrayList<Task>();
  236.     TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
  237.     for (TaskType taskType: types) {
  238.       boolean canAssign = (taskType == TaskType.MAP) ? 
  239.           loadMgr.canAssignMap(tracker, runnableMaps, totalMapSlots) :
  240.           loadMgr.canAssignReduce(tracker, runnableReduces, totalReduceSlots);
  241.       if (canAssign) {
  242.         // Figure out the jobs that need this type of task
  243.         List<JobInProgress> candidates = new ArrayList<JobInProgress>();
  244.         for (JobInProgress job: infos.keySet()) {
  245.           if (job.getStatus().getRunState() == JobStatus.RUNNING && 
  246.               neededTasks(job, taskType) > 0) {
  247.             candidates.add(job);
  248.           }
  249.         }
  250.         // Sort jobs by deficit (for Fair Sharing) or submit time (for FIFO)
  251.         Comparator<JobInProgress> comparator = useFifo ?
  252.             new FifoJobComparator() : new DeficitComparator(taskType);
  253.         Collections.sort(candidates, comparator);
  254.         for (JobInProgress job: candidates) {
  255.           Task task = (taskType == TaskType.MAP ? 
  256.               taskSelector.obtainNewMapTask(tracker, job) :
  257.               taskSelector.obtainNewReduceTask(tracker, job));
  258.           if (task != null) {
  259.             // Update the JobInfo for this job so we account for the launched
  260.             // tasks during this update interval and don't try to launch more
  261.             // tasks than the job needed on future heartbeats
  262.             JobInfo info = infos.get(job);
  263.             if (taskType == TaskType.MAP) {
  264.               info.runningMaps++;
  265.               info.neededMaps--;
  266.             } else {
  267.               info.runningReduces++;
  268.               info.neededReduces--;
  269.             }
  270.             tasks.add(task);
  271.             if (!assignMultiple)
  272.               return tasks;
  273.             break;
  274.           }
  275.         }
  276.       }
  277.     }
  278.     
  279.     // If no tasks were found, return null
  280.     return tasks.isEmpty() ? null : tasks;
  281.   }
  282.   /**
  283.    * Compare jobs by deficit for a given task type, putting jobs whose current
  284.    * allocation is less than their minimum share always ahead of others. This is
  285.    * the default job comparator used for Fair Sharing.
  286.    */
  287.   private class DeficitComparator implements Comparator<JobInProgress> {
  288.     private final TaskType taskType;
  289.     private DeficitComparator(TaskType taskType) {
  290.       this.taskType = taskType;
  291.     }
  292.     public int compare(JobInProgress j1, JobInProgress j2) {
  293.       // Put needy jobs ahead of non-needy jobs (where needy means must receive
  294.       // new tasks to meet slot minimum), comparing among jobs of the same type
  295.       // by deficit so as to put jobs with higher deficit ahead.
  296.       JobInfo j1Info = infos.get(j1);
  297.       JobInfo j2Info = infos.get(j2);
  298.       long deficitDif;
  299.       boolean j1Needy, j2Needy;
  300.       if (taskType == TaskType.MAP) {
  301.         j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);
  302.         j2Needy = j2.runningMaps() < Math.floor(j2Info.minMaps);
  303.         deficitDif = j2Info.mapDeficit - j1Info.mapDeficit;
  304.       } else {
  305.         j1Needy = j1.runningReduces() < Math.floor(j1Info.minReduces);
  306.         j2Needy = j2.runningReduces() < Math.floor(j2Info.minReduces);
  307.         deficitDif = j2Info.reduceDeficit - j1Info.reduceDeficit;
  308.       }
  309.       if (j1Needy && !j2Needy)
  310.         return -1;
  311.       else if (j2Needy && !j1Needy)
  312.         return 1;
  313.       else // Both needy or both non-needy; compare by deficit
  314.         return (int) Math.signum(deficitDif);
  315.     }
  316.   }
  317.   
  318.   /**
  319.    * Recompute the internal variables used by the scheduler - per-job weights,
  320.    * fair shares, deficits, minimum slot allocations, and numbers of running
  321.    * and needed tasks of each type. 
  322.    */
  323.   protected void update() {
  324.     //Making more granual locking so that clusterStatus can be fetched from Jobtracker.
  325.     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  326.     // Got clusterStatus hence acquiring scheduler lock now
  327.     // Remove non-running jobs
  328.     synchronized(this){
  329.       List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
  330.       for (JobInProgress job: infos.keySet()) { 
  331.         int runState = job.getStatus().getRunState();
  332.         if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
  333.           || runState == JobStatus.KILLED) {
  334.             toRemove.add(job);
  335.         }
  336.       }
  337.       for (JobInProgress job: toRemove) {
  338.         infos.remove(job);
  339.         poolMgr.removeJob(job);
  340.       }
  341.       // Update running jobs with deficits since last update, and compute new
  342.       // slot allocations, weight, shares and task counts
  343.       long now = clock.getTime();
  344.       long timeDelta = now - lastUpdateTime;
  345.       updateDeficits(timeDelta);
  346.       updateRunnability();
  347.       updateTaskCounts();
  348.       updateWeights();
  349.       updateMinSlots();
  350.       updateFairShares(clusterStatus);
  351.       lastUpdateTime = now;
  352.     }
  353.   }
  354.   
  355.   private void updateDeficits(long timeDelta) {
  356.     for (JobInfo info: infos.values()) {
  357.       info.mapDeficit +=
  358.         (info.mapFairShare - info.runningMaps) * timeDelta;
  359.       info.reduceDeficit +=
  360.         (info.reduceFairShare - info.runningReduces) * timeDelta;
  361.     }
  362.   }
  363.   
  364.   private void updateRunnability() {
  365.     // Start by marking everything as not runnable
  366.     for (JobInfo info: infos.values()) {
  367.       info.runnable = false;
  368.     }
  369.     // Create a list of sorted jobs in order of start time and priority
  370.     List<JobInProgress> jobs = new ArrayList<JobInProgress>(infos.keySet());
  371.     Collections.sort(jobs, new FifoJobComparator());
  372.     // Mark jobs as runnable in order of start time and priority, until
  373.     // user or pool limits have been reached.
  374.     Map<String, Integer> userJobs = new HashMap<String, Integer>();
  375.     Map<String, Integer> poolJobs = new HashMap<String, Integer>();
  376.     for (JobInProgress job: jobs) {
  377.       if (job.getStatus().getRunState() == JobStatus.RUNNING) {
  378.         String user = job.getJobConf().getUser();
  379.         String pool = poolMgr.getPoolName(job);
  380.         int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
  381.         int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
  382.         if (userCount < poolMgr.getUserMaxJobs(user) && 
  383.             poolCount < poolMgr.getPoolMaxJobs(pool)) {
  384.           infos.get(job).runnable = true;
  385.           userJobs.put(user, userCount + 1);
  386.           poolJobs.put(pool, poolCount + 1);
  387.         }
  388.       }
  389.     }
  390.   }
  391.   private void updateTaskCounts() {
  392.     for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
  393.       JobInProgress job = entry.getKey();
  394.       JobInfo info = entry.getValue();
  395.       if (job.getStatus().getRunState() != JobStatus.RUNNING)
  396.         continue; // Job is still in PREP state and tasks aren't initialized
  397.       // Count maps
  398.       int totalMaps = job.numMapTasks;
  399.       int finishedMaps = 0;
  400.       int runningMaps = 0;
  401.       for (TaskInProgress tip: job.getMapTasks()) {
  402.         if (tip.isComplete()) {
  403.           finishedMaps += 1;
  404.         } else if (tip.isRunning()) {
  405.           runningMaps += tip.getActiveTasks().size();
  406.         }
  407.       }
  408.       info.runningMaps = runningMaps;
  409.       info.neededMaps = (totalMaps - runningMaps - finishedMaps
  410.           + taskSelector.neededSpeculativeMaps(job));
  411.       // Count reduces
  412.       int totalReduces = job.numReduceTasks;
  413.       int finishedReduces = 0;
  414.       int runningReduces = 0;
  415.       for (TaskInProgress tip: job.getReduceTasks()) {
  416.         if (tip.isComplete()) {
  417.           finishedReduces += 1;
  418.         } else if (tip.isRunning()) {
  419.           runningReduces += tip.getActiveTasks().size();
  420.         }
  421.       }
  422.       info.runningReduces = runningReduces;
  423.       if (enoughMapsFinishedToRunReduces(finishedMaps, totalMaps)) {
  424.         info.neededReduces = (totalReduces - runningReduces - finishedReduces 
  425.             + taskSelector.neededSpeculativeReduces(job));
  426.       } else {
  427.         info.neededReduces = 0;
  428.       }
  429.       // If the job was marked as not runnable due to its user or pool having
  430.       // too many active jobs, set the neededMaps/neededReduces to 0. We still
  431.       // count runningMaps/runningReduces however so we can give it a deficit.
  432.       if (!info.runnable) {
  433.         info.neededMaps = 0;
  434.         info.neededReduces = 0;
  435.       }
  436.     }
  437.   }
  438.   /**
  439.    * Has a job finished enough maps to allow launching its reduces?
  440.    */
  441.   protected boolean enoughMapsFinishedToRunReduces(
  442.       int finishedMaps, int totalMaps) {
  443.     if (waitForMapsBeforeLaunchingReduces) {
  444.       return finishedMaps >= Math.max(1, totalMaps * 0.05);
  445.     } else {
  446.       return true;
  447.     }
  448.   }
  449.   private void updateWeights() {
  450.     // First, calculate raw weights for each job
  451.     for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
  452.       JobInProgress job = entry.getKey();
  453.       JobInfo info = entry.getValue();
  454.       info.mapWeight = calculateRawWeight(job, TaskType.MAP);
  455.       info.reduceWeight = calculateRawWeight(job, TaskType.REDUCE);
  456.     }
  457.     // Now calculate job weight sums for each pool
  458.     Map<String, Double> mapWeightSums = new HashMap<String, Double>();
  459.     Map<String, Double> reduceWeightSums = new HashMap<String, Double>();
  460.     for (Pool pool: poolMgr.getPools()) {
  461.       double mapWeightSum = 0;
  462.       double reduceWeightSum = 0;
  463.       for (JobInProgress job: pool.getJobs()) {
  464.         if (isRunnable(job)) {
  465.           if (runnableTasks(job, TaskType.MAP) > 0) {
  466.             mapWeightSum += infos.get(job).mapWeight;
  467.           }
  468.           if (runnableTasks(job, TaskType.REDUCE) > 0) {
  469.             reduceWeightSum += infos.get(job).reduceWeight;
  470.           }
  471.         }
  472.       }
  473.       mapWeightSums.put(pool.getName(), mapWeightSum);
  474.       reduceWeightSums.put(pool.getName(), reduceWeightSum);
  475.     }
  476.     // And normalize the weights based on pool sums and pool weights
  477.     // to share fairly across pools (proportional to their weights)
  478.     for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
  479.       JobInProgress job = entry.getKey();
  480.       JobInfo info = entry.getValue();
  481.       String pool = poolMgr.getPoolName(job);
  482.       double poolWeight = poolMgr.getPoolWeight(pool);
  483.       double mapWeightSum = mapWeightSums.get(pool);
  484.       double reduceWeightSum = reduceWeightSums.get(pool);
  485.       if (mapWeightSum == 0)
  486.         info.mapWeight = 0;
  487.       else
  488.         info.mapWeight *= (poolWeight / mapWeightSum); 
  489.       if (reduceWeightSum == 0)
  490.         info.reduceWeight = 0;
  491.       else
  492.         info.reduceWeight *= (poolWeight / reduceWeightSum); 
  493.     }
  494.   }
  495.   
  496.   private void updateMinSlots() {
  497.     // Clear old minSlots
  498.     for (JobInfo info: infos.values()) {
  499.       info.minMaps = 0;
  500.       info.minReduces = 0;
  501.     }
  502.     // For each pool, distribute its task allocation among jobs in it that need
  503.     // slots. This is a little tricky since some jobs in the pool might not be
  504.     // able to use all the slots, e.g. they might have only a few tasks left.
  505.     // To deal with this, we repeatedly split up the available task slots
  506.     // between the jobs left, give each job min(its alloc, # of slots it needs),
  507.     // and redistribute any slots that are left over between jobs that still
  508.     // need slots on the next pass. If, in total, the jobs in our pool don't
  509.     // need all its allocation, we leave the leftover slots for general use.
  510.     PoolManager poolMgr = getPoolManager();
  511.     for (Pool pool: poolMgr.getPools()) {
  512.       for (final TaskType type: TaskType.values()) {
  513.         Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());
  514.         int slotsLeft = poolMgr.getAllocation(pool.getName(), type);
  515.         // Keep assigning slots until none are left
  516.         while (slotsLeft > 0) {
  517.           // Figure out total weight of jobs that still need slots
  518.           double totalWeight = 0;
  519.           for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {
  520.             JobInProgress job = it.next();
  521.             if (isRunnable(job) &&
  522.                 runnableTasks(job, type) > minTasks(job, type)) {
  523.               totalWeight += weight(job, type);
  524.             } else {
  525.               it.remove();
  526.             }
  527.           }
  528.           if (totalWeight == 0) // No jobs that can use more slots are left 
  529.             break;
  530.           // Assign slots to jobs, using the floor of their weight divided by
  531.           // total weight. This ensures that all jobs get some chance to take
  532.           // a slot. Then, if no slots were assigned this way, we do another
  533.           // pass where we use ceil, in case some slots were still left over.
  534.           int oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
  535.           for (JobInProgress job: jobs) {
  536.             double weight = weight(job, type);
  537.             int share = (int) Math.floor(oldSlots * weight / totalWeight);
  538.             slotsLeft = giveMinSlots(job, type, slotsLeft, share);
  539.           }
  540.           if (slotsLeft == oldSlots) {
  541.             // No tasks were assigned; do another pass using ceil, giving the
  542.             // extra slots to jobs in order of weight then deficit
  543.             List<JobInProgress> sortedJobs = new ArrayList<JobInProgress>(jobs);
  544.             Collections.sort(sortedJobs, new Comparator<JobInProgress>() {
  545.               public int compare(JobInProgress j1, JobInProgress j2) {
  546.                 double dif = weight(j2, type) - weight(j1, type);
  547.                 if (dif == 0) // Weights are equal, compare by deficit 
  548.                   dif = deficit(j2, type) - deficit(j1, type);
  549.                 return (int) Math.signum(dif);
  550.               }
  551.             });
  552.             for (JobInProgress job: sortedJobs) {
  553.               double weight = weight(job, type);
  554.               int share = (int) Math.ceil(oldSlots * weight / totalWeight);
  555.               slotsLeft = giveMinSlots(job, type, slotsLeft, share);
  556.             }
  557.             if (slotsLeft > 0) {
  558.               LOG.warn("Had slotsLeft = " + slotsLeft + " after the final "
  559.                   + "loop in updateMinSlots. This probably means some fair "
  560.                   + "scheduler weights are being set to NaN or Infinity.");
  561.             }
  562.             break;
  563.           }
  564.         }
  565.       }
  566.     }
  567.   }
  568.   /**
  569.    * Give up to <code>tasksToGive</code> min slots to a job (potentially fewer
  570.    * if either the job needs fewer slots or there aren't enough slots left).
  571.    * Returns the number of slots left over.
  572.    */
  573.   private int giveMinSlots(JobInProgress job, TaskType type,
  574.       int slotsLeft, int slotsToGive) {
  575.     int runnable = runnableTasks(job, type);
  576.     int curMin = minTasks(job, type);
  577.     slotsToGive = Math.min(Math.min(slotsLeft, runnable - curMin), slotsToGive);
  578.     slotsLeft -= slotsToGive;
  579.     JobInfo info = infos.get(job);
  580.     if (type == TaskType.MAP)
  581.       info.minMaps += slotsToGive;
  582.     else
  583.       info.minReduces += slotsToGive;
  584.     return slotsLeft;
  585.   }
  586.   private void updateFairShares(ClusterStatus clusterStatus) {
  587.     // Clear old fairShares
  588.     for (JobInfo info: infos.values()) {
  589.       info.mapFairShare = 0;
  590.       info.reduceFairShare = 0;
  591.     }
  592.     // Assign new shares, based on weight and minimum share. This is done
  593.     // as follows. First, we split up the available slots between all
  594.     // jobs according to weight. Then if there are any jobs whose minSlots is
  595.     // larger than their fair allocation, we give them their minSlots and
  596.     // remove them from the list, and start again with the amount of slots
  597.     // left over. This continues until all jobs' minSlots are less than their
  598.     // fair allocation, and at this point we know that we've met everyone's
  599.     // guarantee and we've split the excess capacity fairly among jobs left.
  600.     for (TaskType type: TaskType.values()) {
  601.       // Select only jobs that still need this type of task
  602.       HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();
  603.       for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
  604.         JobInProgress job = entry.getKey();
  605.         JobInfo info = entry.getValue();
  606.         if (isRunnable(job) && runnableTasks(job, type) > 0) {
  607.           jobsLeft.add(info);
  608.         }
  609.       }
  610.       double slotsLeft = getTotalSlots(type, clusterStatus);
  611.       while (!jobsLeft.isEmpty()) {
  612.         double totalWeight = 0;
  613.         for (JobInfo info: jobsLeft) {
  614.           double weight = (type == TaskType.MAP ?
  615.               info.mapWeight : info.reduceWeight);
  616.           totalWeight += weight;
  617.         }
  618.         boolean recomputeSlots = false;
  619.         double oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
  620.         for (Iterator<JobInfo> iter = jobsLeft.iterator(); iter.hasNext();) {
  621.           JobInfo info = iter.next();
  622.           double minSlots = (type == TaskType.MAP ?
  623.               info.minMaps : info.minReduces);
  624.           double weight = (type == TaskType.MAP ?
  625.               info.mapWeight : info.reduceWeight);
  626.           double fairShare = weight / totalWeight * oldSlots;
  627.           if (minSlots > fairShare) {
  628.             // Job needs more slots than its fair share; give it its minSlots,
  629.             // remove it from the list, and set recomputeSlots = true to 
  630.             // remember that we must loop again to redistribute unassigned slots
  631.             if (type == TaskType.MAP)
  632.               info.mapFairShare = minSlots;
  633.             else
  634.               info.reduceFairShare = minSlots;
  635.             slotsLeft -= minSlots;
  636.             iter.remove();
  637.             recomputeSlots = true;
  638.           }
  639.         }
  640.         if (!recomputeSlots) {
  641.           // All minimums are met. Give each job its fair share of excess slots.
  642.           for (JobInfo info: jobsLeft) {
  643.             double weight = (type == TaskType.MAP ?
  644.                 info.mapWeight : info.reduceWeight);
  645.             double fairShare = weight / totalWeight * oldSlots;
  646.             if (type == TaskType.MAP)
  647.               info.mapFairShare = fairShare;
  648.             else
  649.               info.reduceFairShare = fairShare;
  650.           }
  651.           break;
  652.         }
  653.       }
  654.     }
  655.   }
  656.   private double calculateRawWeight(JobInProgress job, TaskType taskType) {
  657.     if (!isRunnable(job)) {
  658.       return 0;
  659.     } else {
  660.       double weight = 1.0;
  661.       if (sizeBasedWeight) {
  662.         // Set weight based on runnable tasks
  663.         weight = Math.log1p(runnableTasks(job, taskType)) / Math.log(2);
  664.       }
  665.       weight *= getPriorityFactor(job.getPriority());
  666.       if (weightAdjuster != null) {
  667.         // Run weight through the user-supplied weightAdjuster
  668.         weight = weightAdjuster.adjustWeight(job, taskType, weight);
  669.       }
  670.       return weight;
  671.     }
  672.   }
  673.   private double getPriorityFactor(JobPriority priority) {
  674.     switch (priority) {
  675.     case VERY_HIGH: return 4.0;
  676.     case HIGH:      return 2.0;
  677.     case NORMAL:    return 1.0;
  678.     case LOW:       return 0.5;
  679.     default:        return 0.25; // priority = VERY_LOW
  680.     }
  681.   }
  682.   
  683.   public PoolManager getPoolManager() {
  684.     return poolMgr;
  685.   }
  686.   private int getTotalSlots(TaskType type, ClusterStatus clusterStatus) {
  687.     return (type == TaskType.MAP ?
  688.       clusterStatus.getMaxMapTasks() : clusterStatus.getMaxReduceTasks());
  689.   }
  690.   public synchronized boolean getUseFifo() {
  691.     return useFifo;
  692.   }
  693.   
  694.   public synchronized void setUseFifo(boolean useFifo) {
  695.     this.useFifo = useFifo;
  696.   }
  697.   
  698.   // Getter methods for reading JobInfo values based on TaskType, safely
  699.   // returning 0's for jobs with no JobInfo present.
  700.   protected int neededTasks(JobInProgress job, TaskType taskType) {
  701.     JobInfo info = infos.get(job);
  702.     if (info == null) return 0;
  703.     return taskType == TaskType.MAP ? info.neededMaps : info.neededReduces;
  704.   }
  705.   
  706.   protected int runningTasks(JobInProgress job, TaskType taskType) {
  707.     JobInfo info = infos.get(job);
  708.     if (info == null) return 0;
  709.     return taskType == TaskType.MAP ? info.runningMaps : info.runningReduces;
  710.   }
  711.   protected int runnableTasks(JobInProgress job, TaskType type) {
  712.     return neededTasks(job, type) + runningTasks(job, type);
  713.   }
  714.   protected int minTasks(JobInProgress job, TaskType type) {
  715.     JobInfo info = infos.get(job);
  716.     if (info == null) return 0;
  717.     return (type == TaskType.MAP) ? info.minMaps : info.minReduces;
  718.   }
  719.   protected double weight(JobInProgress job, TaskType taskType) {
  720.     JobInfo info = infos.get(job);
  721.     if (info == null) return 0;
  722.     return (taskType == TaskType.MAP ? info.mapWeight : info.reduceWeight);
  723.   }
  724.   protected double deficit(JobInProgress job, TaskType taskType) {
  725.     JobInfo info = infos.get(job);
  726.     if (info == null) return 0;
  727.     return taskType == TaskType.MAP ? info.mapDeficit : info.reduceDeficit;
  728.   }
  729.   protected boolean isRunnable(JobInProgress job) {
  730.     JobInfo info = infos.get(job);
  731.     if (info == null) return false;
  732.     return info.runnable;
  733.   }
  734.   @Override
  735.   public synchronized Collection<JobInProgress> getJobs(String queueName) {
  736.     Pool myJobPool = poolMgr.getPool(queueName);
  737.     return myJobPool.getJobs();
  738.   }
  739. }