JobQueueTaskScheduler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.List;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.conf.Configuration;
  26. /**
  27.  * A {@link TaskScheduler} that keeps jobs in a queue in priority order (FIFO
  28.  * by default).
  29.  */
  30. class JobQueueTaskScheduler extends TaskScheduler {
  31.   
  32.   private static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
  33.   public static final Log LOG = LogFactory.getLog(JobQueueTaskScheduler.class);
  34.   
  35.   protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
  36.   protected EagerTaskInitializationListener eagerTaskInitializationListener;
  37.   private float padFraction;
  38.   
  39.   public JobQueueTaskScheduler() {
  40.     this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
  41.   }
  42.   
  43.   @Override
  44.   public synchronized void start() throws IOException {
  45.     super.start();
  46.     taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
  47.     
  48.     eagerTaskInitializationListener.start();
  49.     taskTrackerManager.addJobInProgressListener(
  50.         eagerTaskInitializationListener);
  51.   }
  52.   
  53.   @Override
  54.   public synchronized void terminate() throws IOException {
  55.     if (jobQueueJobInProgressListener != null) {
  56.       taskTrackerManager.removeJobInProgressListener(
  57.           jobQueueJobInProgressListener);
  58.     }
  59.     if (eagerTaskInitializationListener != null) {
  60.       taskTrackerManager.removeJobInProgressListener(
  61.           eagerTaskInitializationListener);
  62.       eagerTaskInitializationListener.terminate();
  63.     }
  64.     super.terminate();
  65.   }
  66.   
  67.   @Override
  68.   public synchronized void setConf(Configuration conf) {
  69.     super.setConf(conf);
  70.     padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 
  71.                                  0.01f);
  72.     this.eagerTaskInitializationListener =
  73.       new EagerTaskInitializationListener(conf);
  74.   }
  75.   @Override
  76.   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
  77.       throws IOException {
  78.     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  79.     final int numTaskTrackers = clusterStatus.getTaskTrackers();
  80.     final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
  81.     final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
  82.     Collection<JobInProgress> jobQueue =
  83.       jobQueueJobInProgressListener.getJobQueue();
  84.     //
  85.     // Get map + reduce counts for the current tracker.
  86.     //
  87.     final int trackerMapCapacity = taskTracker.getMaxMapTasks();
  88.     final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
  89.     final int trackerRunningMaps = taskTracker.countMapTasks();
  90.     final int trackerRunningReduces = taskTracker.countReduceTasks();
  91.     // Assigned tasks
  92.     List<Task> assignedTasks = new ArrayList<Task>();
  93.     //
  94.     // Compute (running + pending) map and reduce task numbers across pool
  95.     //
  96.     int remainingReduceLoad = 0;
  97.     int remainingMapLoad = 0;
  98.     synchronized (jobQueue) {
  99.       for (JobInProgress job : jobQueue) {
  100.         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
  101.           remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
  102.           if (job.scheduleReduces()) {
  103.             remainingReduceLoad += 
  104.               (job.desiredReduces() - job.finishedReduces());
  105.           }
  106.         }
  107.       }
  108.     }
  109.     // Compute the 'load factor' for maps and reduces
  110.     double mapLoadFactor = 0.0;
  111.     if (clusterMapCapacity > 0) {
  112.       mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
  113.     }
  114.     double reduceLoadFactor = 0.0;
  115.     if (clusterReduceCapacity > 0) {
  116.       reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
  117.     }
  118.         
  119.     //
  120.     // In the below steps, we allocate first map tasks (if appropriate),
  121.     // and then reduce tasks if appropriate.  We go through all jobs
  122.     // in order of job arrival; jobs only get serviced if their 
  123.     // predecessors are serviced, too.
  124.     //
  125.     //
  126.     // We assign tasks to the current taskTracker if the given machine 
  127.     // has a workload that's less than the maximum load of that kind of
  128.     // task.
  129.     // However, if the cluster is close to getting loaded i.e. we don't
  130.     // have enough _padding_ for speculative executions etc., we only 
  131.     // schedule the "highest priority" task i.e. the task from the job 
  132.     // with the highest priority.
  133.     //
  134.     
  135.     final int trackerCurrentMapCapacity = 
  136.       Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
  137.                               trackerMapCapacity);
  138.     int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
  139.     boolean exceededMapPadding = false;
  140.     if (availableMapSlots > 0) {
  141.       exceededMapPadding = 
  142.         exceededPadding(true, clusterStatus, trackerMapCapacity);
  143.     }
  144.     
  145.     int numLocalMaps = 0;
  146.     int numNonLocalMaps = 0;
  147.     scheduleMaps:
  148.     for (int i=0; i < availableMapSlots; ++i) {
  149.       synchronized (jobQueue) {
  150.         for (JobInProgress job : jobQueue) {
  151.           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
  152.             continue;
  153.           }
  154.           Task t = null;
  155.           
  156.           // Try to schedule a node-local or rack-local Map task
  157.           t = 
  158.             job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
  159.                                       taskTrackerManager.getNumberOfUniqueHosts());
  160.           if (t != null) {
  161.             assignedTasks.add(t);
  162.             ++numLocalMaps;
  163.             
  164.             // Don't assign map tasks to the hilt!
  165.             // Leave some free slots in the cluster for future task-failures,
  166.             // speculative tasks etc. beyond the highest priority job
  167.             if (exceededMapPadding) {
  168.               break scheduleMaps;
  169.             }
  170.            
  171.             // Try all jobs again for the next Map task 
  172.             break;
  173.           }
  174.           
  175.           // Try to schedule a node-local or rack-local Map task
  176.           t = 
  177.             job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
  178.                                    taskTrackerManager.getNumberOfUniqueHosts());
  179.           
  180.           if (t != null) {
  181.             assignedTasks.add(t);
  182.             ++numNonLocalMaps;
  183.             
  184.             // We assign at most 1 off-switch or speculative task
  185.             // This is to prevent TaskTrackers from stealing local-tasks
  186.             // from other TaskTrackers.
  187.             break scheduleMaps;
  188.           }
  189.         }
  190.       }
  191.     }
  192.     int assignedMaps = assignedTasks.size();
  193.     //
  194.     // Same thing, but for reduce tasks
  195.     // However we _never_ assign more than 1 reduce task per heartbeat
  196.     //
  197.     final int trackerCurrentReduceCapacity = 
  198.       Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), 
  199.                trackerReduceCapacity);
  200.     final int availableReduceSlots = 
  201.       Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
  202.     boolean exceededReducePadding = false;
  203.     if (availableReduceSlots > 0) {
  204.       exceededReducePadding = exceededPadding(false, clusterStatus, 
  205.                                               trackerReduceCapacity);
  206.       synchronized (jobQueue) {
  207.         for (JobInProgress job : jobQueue) {
  208.           if (job.getStatus().getRunState() != JobStatus.RUNNING ||
  209.               job.numReduceTasks == 0) {
  210.             continue;
  211.           }
  212.           Task t = 
  213.             job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
  214.                                     taskTrackerManager.getNumberOfUniqueHosts()
  215.                                     );
  216.           if (t != null) {
  217.             assignedTasks.add(t);
  218.             break;
  219.           }
  220.           
  221.           // Don't assign reduce tasks to the hilt!
  222.           // Leave some free slots in the cluster for future task-failures,
  223.           // speculative tasks etc. beyond the highest priority job
  224.           if (exceededReducePadding) {
  225.             break;
  226.           }
  227.         }
  228.       }
  229.     }
  230.     
  231.     if (LOG.isDebugEnabled()) {
  232.       LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
  233.                 "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + 
  234.                 trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + 
  235.                 (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
  236.                 assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + 
  237.                 ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + 
  238.                 trackerCurrentReduceCapacity + "," + trackerRunningReduces + 
  239.                 "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + 
  240.                 ", " + (assignedTasks.size()-assignedMaps) + "]");
  241.     }
  242.     return assignedTasks;
  243.   }
  244.   private boolean exceededPadding(boolean isMapTask, 
  245.                                   ClusterStatus clusterStatus, 
  246.                                   int maxTaskTrackerSlots) { 
  247.     int numTaskTrackers = clusterStatus.getTaskTrackers();
  248.     int totalTasks = 
  249.       (isMapTask) ? clusterStatus.getMapTasks() : 
  250.         clusterStatus.getReduceTasks();
  251.     int totalTaskCapacity = 
  252.       isMapTask ? clusterStatus.getMaxMapTasks() : 
  253.                   clusterStatus.getMaxReduceTasks();
  254.     Collection<JobInProgress> jobQueue =
  255.       jobQueueJobInProgressListener.getJobQueue();
  256.     boolean exceededPadding = false;
  257.     synchronized (jobQueue) {
  258.       int totalNeededTasks = 0;
  259.       for (JobInProgress job : jobQueue) {
  260.         if (job.getStatus().getRunState() != JobStatus.RUNNING ||
  261.             job.numReduceTasks == 0) {
  262.           continue;
  263.         }
  264.         //
  265.         // Beyond the highest-priority task, reserve a little 
  266.         // room for failures and speculative executions; don't 
  267.         // schedule tasks to the hilt.
  268.         //
  269.         totalNeededTasks += 
  270.           isMapTask ? job.desiredMaps() : job.desiredReduces();
  271.         int padding = 0;
  272.         if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
  273.           padding = 
  274.             Math.min(maxTaskTrackerSlots,
  275.                      (int) (totalNeededTasks * padFraction));
  276.         }
  277.         if (totalTasks + padding >= totalTaskCapacity) {
  278.           exceededPadding = true;
  279.           break;
  280.         }
  281.       }
  282.     }
  283.     return exceededPadding;
  284.   }
  285.   @Override
  286.   public synchronized Collection<JobInProgress> getJobs(String queueName) {
  287.     return jobQueueJobInProgressListener.getJobQueue();
  288.   }  
  289. }