JobQueueTaskScheduler.java
Uploaded by: quxuerui
Upload date: 2018-01-08
Resource size: 41811k
File size: 11k
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
- /**
- * A {@link TaskScheduler} that keeps jobs in a queue in priority order (FIFO
- * by default).
- */
- class JobQueueTaskScheduler extends TaskScheduler {
-
- private static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
- public static final Log LOG = LogFactory.getLog(JobQueueTaskScheduler.class);
-
- protected JobQueueJobInProgressListener jobQueueJobInProgressListener;
- protected EagerTaskInitializationListener eagerTaskInitializationListener;
- private float padFraction;
-
- public JobQueueTaskScheduler() {
- this.jobQueueJobInProgressListener = new JobQueueJobInProgressListener();
- }
-
- @Override
- public synchronized void start() throws IOException {
- super.start();
- taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
-
- eagerTaskInitializationListener.start();
- taskTrackerManager.addJobInProgressListener(
- eagerTaskInitializationListener);
- }
-
- @Override
- public synchronized void terminate() throws IOException {
- if (jobQueueJobInProgressListener != null) {
- taskTrackerManager.removeJobInProgressListener(
- jobQueueJobInProgressListener);
- }
- if (eagerTaskInitializationListener != null) {
- taskTrackerManager.removeJobInProgressListener(
- eagerTaskInitializationListener);
- eagerTaskInitializationListener.terminate();
- }
- super.terminate();
- }
-
- @Override
- public synchronized void setConf(Configuration conf) {
- super.setConf(conf);
- padFraction = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad",
- 0.01f);
- this.eagerTaskInitializationListener =
- new EagerTaskInitializationListener(conf);
- }
- @Override
- public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
- throws IOException {
- ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
- final int numTaskTrackers = clusterStatus.getTaskTrackers();
- final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
- final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();
- Collection<JobInProgress> jobQueue =
- jobQueueJobInProgressListener.getJobQueue();
- //
- // Get map + reduce counts for the current tracker.
- //
- final int trackerMapCapacity = taskTracker.getMaxMapTasks();
- final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
- final int trackerRunningMaps = taskTracker.countMapTasks();
- final int trackerRunningReduces = taskTracker.countReduceTasks();
- // Assigned tasks
- List<Task> assignedTasks = new ArrayList<Task>();
- //
- // Compute (running + pending) map and reduce task numbers across pool
- //
- int remainingReduceLoad = 0;
- int remainingMapLoad = 0;
- synchronized (jobQueue) {
- for (JobInProgress job : jobQueue) {
- if (job.getStatus().getRunState() == JobStatus.RUNNING) {
- remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
- if (job.scheduleReduces()) {
- remainingReduceLoad +=
- (job.desiredReduces() - job.finishedReduces());
- }
- }
- }
- }
- // Compute the 'load factor' for maps and reduces
- double mapLoadFactor = 0.0;
- if (clusterMapCapacity > 0) {
- mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
- }
- double reduceLoadFactor = 0.0;
- if (clusterReduceCapacity > 0) {
- reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
- }
-
- //
- // In the below steps, we allocate first map tasks (if appropriate),
- // and then reduce tasks if appropriate. We go through all jobs
- // in order of job arrival; jobs only get serviced if their
- // predecessors are serviced, too.
- //
- //
- // We assign tasks to the current taskTracker if the given machine
- // has a workload that's less than the maximum load of that kind of
- // task.
- // However, if the cluster is close to getting loaded i.e. we don't
- // have enough _padding_ for speculative executions etc., we only
- // schedule the "highest priority" task i.e. the task from the job
- // with the highest priority.
- //
-
- final int trackerCurrentMapCapacity =
- Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
- trackerMapCapacity);
- int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
- boolean exceededMapPadding = false;
- if (availableMapSlots > 0) {
- exceededMapPadding =
- exceededPadding(true, clusterStatus, trackerMapCapacity);
- }
-
- int numLocalMaps = 0;
- int numNonLocalMaps = 0;
- scheduleMaps:
- for (int i=0; i < availableMapSlots; ++i) {
- synchronized (jobQueue) {
- for (JobInProgress job : jobQueue) {
- if (job.getStatus().getRunState() != JobStatus.RUNNING) {
- continue;
- }
- Task t = null;
-
- // Try to schedule a node-local or rack-local Map task
- t =
- job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
- taskTrackerManager.getNumberOfUniqueHosts());
- if (t != null) {
- assignedTasks.add(t);
- ++numLocalMaps;
-
- // Don't assign map tasks to the hilt!
- // Leave some free slots in the cluster for future task-failures,
- // speculative tasks etc. beyond the highest priority job
- if (exceededMapPadding) {
- break scheduleMaps;
- }
-
- // Try all jobs again for the next Map task
- break;
- }
-
- // Try to schedule a node-local or rack-local Map task
- t =
- job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
- taskTrackerManager.getNumberOfUniqueHosts());
-
- if (t != null) {
- assignedTasks.add(t);
- ++numNonLocalMaps;
-
- // We assign at most 1 off-switch or speculative task
- // This is to prevent TaskTrackers from stealing local-tasks
- // from other TaskTrackers.
- break scheduleMaps;
- }
- }
- }
- }
- int assignedMaps = assignedTasks.size();
- //
- // Same thing, but for reduce tasks
- // However we _never_ assign more than 1 reduce task per heartbeat
- //
- final int trackerCurrentReduceCapacity =
- Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
- trackerReduceCapacity);
- final int availableReduceSlots =
- Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
- boolean exceededReducePadding = false;
- if (availableReduceSlots > 0) {
- exceededReducePadding = exceededPadding(false, clusterStatus,
- trackerReduceCapacity);
- synchronized (jobQueue) {
- for (JobInProgress job : jobQueue) {
- if (job.getStatus().getRunState() != JobStatus.RUNNING ||
- job.numReduceTasks == 0) {
- continue;
- }
- Task t =
- job.obtainNewReduceTask(taskTracker, numTaskTrackers,
- taskTrackerManager.getNumberOfUniqueHosts()
- );
- if (t != null) {
- assignedTasks.add(t);
- break;
- }
-
- // Don't assign reduce tasks to the hilt!
- // Leave some free slots in the cluster for future task-failures,
- // speculative tasks etc. beyond the highest priority job
- if (exceededReducePadding) {
- break;
- }
- }
- }
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
- "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
- trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
- (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
- assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
- ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
- trackerCurrentReduceCapacity + "," + trackerRunningReduces +
- "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
- ", " + (assignedTasks.size()-assignedMaps) + "]");
- }
- return assignedTasks;
- }
- private boolean exceededPadding(boolean isMapTask,
- ClusterStatus clusterStatus,
- int maxTaskTrackerSlots) {
- int numTaskTrackers = clusterStatus.getTaskTrackers();
- int totalTasks =
- (isMapTask) ? clusterStatus.getMapTasks() :
- clusterStatus.getReduceTasks();
- int totalTaskCapacity =
- isMapTask ? clusterStatus.getMaxMapTasks() :
- clusterStatus.getMaxReduceTasks();
- Collection<JobInProgress> jobQueue =
- jobQueueJobInProgressListener.getJobQueue();
- boolean exceededPadding = false;
- synchronized (jobQueue) {
- int totalNeededTasks = 0;
- for (JobInProgress job : jobQueue) {
- if (job.getStatus().getRunState() != JobStatus.RUNNING ||
- job.numReduceTasks == 0) {
- continue;
- }
- //
- // Beyond the highest-priority task, reserve a little
- // room for failures and speculative executions; don't
- // schedule tasks to the hilt.
- //
- totalNeededTasks +=
- isMapTask ? job.desiredMaps() : job.desiredReduces();
- int padding = 0;
- if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
- padding =
- Math.min(maxTaskTrackerSlots,
- (int) (totalNeededTasks * padFraction));
- }
- if (totalTasks + padding >= totalTaskCapacity) {
- exceededPadding = true;
- break;
- }
- }
- }
- return exceededPadding;
- }
- @Override
- public synchronized Collection<JobInProgress> getJobs(String queueName) {
- return jobQueueJobInProgressListener.getJobQueue();
- }
- }