LimitTasksPerJobTaskScheduler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.Collections;
  23. import java.util.List;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.conf.Configuration;
  27. /**
  28.  * A {@link TaskScheduler} that limits the maximum number of tasks
  29.  * running for a job. The limit is set by means of the
  30.  * <code>mapred.jobtracker.scheduler.maxRunningTasksPerJob</code>
  31.  * property.
  32.  */
  33. class LimitTasksPerJobTaskScheduler extends JobQueueTaskScheduler {
  34.   
  35.   private static final Log LOG = LogFactory.getLog(
  36.     "org.apache.hadoop.mapred.TaskLimitedJobQueueTaskScheduler");
  37.   
  38.   public static final String MAX_TASKS_PER_JOB_PROPERTY = 
  39.     "mapred.jobtracker.taskScheduler.maxRunningTasksPerJob";
  40.   
  41.   private long maxTasksPerJob;
  42.   
  43.   public LimitTasksPerJobTaskScheduler() {
  44.     super();
  45.   }
  46.   
  47.   @Override
  48.   public synchronized void start() throws IOException {
  49.     super.start();
  50.     QueueManager queueManager = taskTrackerManager.getQueueManager();
  51.     String queueName = queueManager.getJobQueueInfos()[0].getQueueName();
  52.     queueManager.setSchedulerInfo(queueName
  53.         ,"Maximum Tasks Per Job :: " + String.valueOf(maxTasksPerJob));
  54.   }
  55.   
  56.   @Override
  57.   public synchronized void setConf(Configuration conf) {
  58.     super.setConf(conf);
  59.     maxTasksPerJob = conf.getLong(MAX_TASKS_PER_JOB_PROPERTY ,Long.MAX_VALUE);
  60.     if (maxTasksPerJob <= 0) {
  61.       String msg = MAX_TASKS_PER_JOB_PROPERTY +
  62.         " is set to zero or a negative value. Aborting.";
  63.       LOG.fatal(msg);
  64.       throw new RuntimeException (msg);
  65.     }
  66.   }
  67.   @Override
  68.   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
  69.       throws IOException {
  70.     final int numTaskTrackers =
  71.         taskTrackerManager.getClusterStatus().getTaskTrackers();
  72.     Collection<JobInProgress> jobQueue =
  73.       jobQueueJobInProgressListener.getJobQueue();
  74.     Task task;
  75.     /* Stats about the current taskTracker */
  76.     final int mapTasksNumber = taskTracker.countMapTasks();
  77.     final int reduceTasksNumber = taskTracker.countReduceTasks();
  78.     final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
  79.     final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
  80.     /*
  81.      * Statistics about the whole cluster. Most are approximate because of
  82.      * concurrency
  83.      */
  84.     final int[] maxMapAndReduceLoad = getMaxMapAndReduceLoad(
  85.         maximumMapTasksNumber, maximumReduceTasksNumber);
  86.     final int maximumMapLoad = maxMapAndReduceLoad[0];
  87.     final int maximumReduceLoad = maxMapAndReduceLoad[1];
  88.     
  89.     final int beginAtStep;
  90.     /*
  91.      * When step == 0, this loop starts as many map tasks it can wrt
  92.      * maxTasksPerJob
  93.      * When step == 1, this loop starts as many reduce tasks it can wrt
  94.      * maxTasksPerJob
  95.      * When step == 2, this loop starts as many map tasks it can
  96.      * When step == 3, this loop starts as many reduce tasks it can
  97.      *
  98.      * It may seem that we would improve this loop by queuing jobs we cannot
  99.      * start in steps 0 and 1 because of maxTasksPerJob, and using that queue
  100.      * in step 2 and 3.
  101.      * A first thing to notice is that the time with the current algorithm is
  102.      * logarithmic, because it is the sum of (p^k) for k from 1 to N, were
  103.      * N is the number of jobs and p is the probability for a job to not exceed
  104.      * limits The probability for the cache to be useful would be similar to
  105.      * p^N, that is 1/(e^N), whereas its size and the time spent to manage it
  106.      * would be in ln(N).
  107.      * So it is not a good idea.
  108.      */
  109.     if (maxTasksPerJob != Long.MAX_VALUE) {
  110.       beginAtStep = 0;
  111.     }
  112.     else {
  113.       beginAtStep = 2;
  114.     }
  115.     List<Task> assignedTasks = new ArrayList<Task>();
  116.     scheduleTasks:
  117.     for (int step = beginAtStep; step <= 3; ++step) {
  118.       /* If we reached the maximum load for this step, go to the next */
  119.       if ((step == 0 || step == 2) && mapTasksNumber >= maximumMapLoad ||
  120.           (step == 1 || step == 3) && reduceTasksNumber >= maximumReduceLoad) {
  121.         continue;
  122.       }
  123.       /* For each job, start its tasks */
  124.       synchronized (jobQueue) {
  125.         for (JobInProgress job : jobQueue) {
  126.           /* Ignore non running jobs */
  127.           if (job.getStatus().getRunState() != JobStatus.RUNNING) {
  128.             continue;
  129.           }
  130.           /* Check that we're not exceeding the global limits */
  131.           if ((step == 0 || step == 1)
  132.               && (job.runningMaps() + job.runningReduces() >= maxTasksPerJob)) {
  133.             continue;
  134.           }
  135.           if (step == 0 || step == 2) {
  136.             task = job.obtainNewMapTask(taskTracker, numTaskTrackers,
  137.                 taskTrackerManager.getNumberOfUniqueHosts());
  138.           }
  139.           else {
  140.             task = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
  141.                 taskTrackerManager.getNumberOfUniqueHosts());
  142.           }
  143.           if (task != null) {
  144.             assignedTasks.add(task);
  145.             break scheduleTasks;
  146.           }
  147.         }
  148.       }
  149.     }
  150.     return assignedTasks;
  151.   }
  152.   /**
  153.    * Determine the maximum number of maps or reduces that we are willing to run
  154.    * on a taskTracker which accept a maximum of localMaxMapLoad maps and
  155.    * localMaxReduceLoad reduces
  156.    * @param localMaxMapLoad The local maximum number of map tasks for a host
  157.    * @param localMaxReduceLoad The local maximum number of reduce tasks for a
  158.    * host
  159.    * @return An array of the two maximums: map then reduce.
  160.    */
  161.   protected synchronized int[] getMaxMapAndReduceLoad(int localMaxMapLoad,
  162.       int localMaxReduceLoad) {
  163.     // Approximate because of concurrency
  164.     final int numTaskTrackers =
  165.       taskTrackerManager.getClusterStatus().getTaskTrackers();
  166.     /* Hold the result */
  167.     int maxMapLoad = 0;
  168.     int maxReduceLoad = 0;
  169.     int neededMaps = 0;
  170.     int neededReduces = 0;
  171.     Collection<JobInProgress> jobQueue =
  172.       jobQueueJobInProgressListener.getJobQueue();
  173.     synchronized (jobQueue) {
  174.       for (JobInProgress job : jobQueue) {
  175.         if (job.getStatus().getRunState() == JobStatus.RUNNING) {
  176.           neededMaps += job.desiredMaps() - job.finishedMaps();
  177.           neededReduces += job.desiredReduces() - job.finishedReduces();
  178.         }
  179.       }
  180.     }
  181.     if (numTaskTrackers > 0) {
  182.       maxMapLoad = Math.min(localMaxMapLoad, (int) Math
  183.           .ceil((double) neededMaps / numTaskTrackers));
  184.       maxReduceLoad = Math.min(localMaxReduceLoad, (int) Math
  185.           .ceil((double) neededReduces / numTaskTrackers));
  186.     }
  187.     return new int[] { maxMapLoad, maxReduceLoad };
  188.   }
  189. }