JobQueueJobInProgressListener.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:
网格计算
开发平台:
Java
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred;
- import java.util.Collection;
- import java.util.Collections;
- import java.util.Comparator;
- import java.util.Map;
- import java.util.TreeMap;
- import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
- /**
- * A {@link JobInProgressListener} that maintains the jobs being managed in
- * a queue. By default the queue is FIFO, but it is possible to use custom
- * queue ordering by using the
- * {@link #JobQueueJobInProgressListener(Collection)} constructor.
- */
- class JobQueueJobInProgressListener extends JobInProgressListener {
- /** A class that groups all the information from a {@link JobInProgress} that
- * is necessary for scheduling a job.
- */
- static class JobSchedulingInfo {
- private JobPriority priority;
- private long startTime;
- private JobID id;
- public JobSchedulingInfo(JobInProgress jip) {
- this(jip.getStatus());
- }
- public JobSchedulingInfo(JobStatus status) {
- priority = status.getJobPriority();
- startTime = status.getStartTime();
- id = status.getJobID();
- }
- JobPriority getPriority() {return priority;}
- long getStartTime() {return startTime;}
- JobID getJobID() {return id;}
- }
- static final Comparator<JobSchedulingInfo> FIFO_JOB_QUEUE_COMPARATOR
- = new Comparator<JobSchedulingInfo>() {
- public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
- int res = o1.getPriority().compareTo(o2.getPriority());
- if (res == 0) {
- if (o1.getStartTime() < o2.getStartTime()) {
- res = -1;
- } else {
- res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
- }
- }
- if (res == 0) {
- res = o1.getJobID().compareTo(o2.getJobID());
- }
- return res;
- }
- };
- private Map<JobSchedulingInfo, JobInProgress> jobQueue;
- public JobQueueJobInProgressListener() {
- this(new TreeMap<JobSchedulingInfo,
- JobInProgress>(FIFO_JOB_QUEUE_COMPARATOR));
- }
- /**
- * For clients that want to provide their own job priorities.
- * @param jobQueue A collection whose iterator returns jobs in priority order.
- */
- protected JobQueueJobInProgressListener(Map<JobSchedulingInfo,
- JobInProgress> jobQueue) {
- this.jobQueue = Collections.synchronizedMap(jobQueue);
- }
- /**
- * Returns a synchronized view of the job queue.
- */
- public Collection<JobInProgress> getJobQueue() {
- return jobQueue.values();
- }
- @Override
- public void jobAdded(JobInProgress job) {
- jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);
- }
- // Job will be removed once the job completes
- @Override
- public void jobRemoved(JobInProgress job) {}
- private void jobCompleted(JobSchedulingInfo oldInfo) {
- jobQueue.remove(oldInfo);
- }
- @Override
- public synchronized void jobUpdated(JobChangeEvent event) {
- JobInProgress job = event.getJobInProgress();
- if (event instanceof JobStatusChangeEvent) {
- // Check if the ordering of the job has changed
- // For now priority and start-time can change the job ordering
- JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
- JobSchedulingInfo oldInfo =
- new JobSchedulingInfo(statusEvent.getOldStatus());
- if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED
- || statusEvent.getEventType() == EventType.START_TIME_CHANGED) {
- // Make a priority change
- reorderJobs(job, oldInfo);
- } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
- // Check if the job is complete
- int runState = statusEvent.getNewStatus().getRunState();
- if (runState == JobStatus.SUCCEEDED
- || runState == JobStatus.FAILED
- || runState == JobStatus.KILLED) {
- jobCompleted(oldInfo);
- }
- }
- }
- }
- private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo) {
- synchronized (jobQueue) {
- jobQueue.remove(oldInfo);
- jobQueue.put(new JobSchedulingInfo(job), job);
- }
- }
- }