JobQueueJobInProgressListener.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.util.Collection;
  20. import java.util.Collections;
  21. import java.util.Comparator;
  22. import java.util.Map;
  23. import java.util.TreeMap;
  24. import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
  25. /**
  26.  * A {@link JobInProgressListener} that maintains the jobs being managed in
  27.  * a queue. By default the queue is FIFO, but it is possible to use custom
  28.  * queue ordering by using the
  29.  * {@link #JobQueueJobInProgressListener(Collection)} constructor.
  30.  */
  31. class JobQueueJobInProgressListener extends JobInProgressListener {
  32.   /** A class that groups all the information from a {@link JobInProgress} that 
  33.    * is necessary for scheduling a job.
  34.    */ 
  35.   static class JobSchedulingInfo {
  36.     private JobPriority priority;
  37.     private long startTime;
  38.     private JobID id;
  39.     
  40.     public JobSchedulingInfo(JobInProgress jip) {
  41.       this(jip.getStatus());
  42.     }
  43.     
  44.     public JobSchedulingInfo(JobStatus status) {
  45.       priority = status.getJobPriority();
  46.       startTime = status.getStartTime();
  47.       id = status.getJobID();
  48.     }
  49.     
  50.     JobPriority getPriority() {return priority;}
  51.     long getStartTime() {return startTime;}
  52.     JobID getJobID() {return id;}
  53.   }
  54.   
  55.   static final Comparator<JobSchedulingInfo> FIFO_JOB_QUEUE_COMPARATOR
  56.     = new Comparator<JobSchedulingInfo>() {
  57.     public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
  58.       int res = o1.getPriority().compareTo(o2.getPriority());
  59.       if (res == 0) {
  60.         if (o1.getStartTime() < o2.getStartTime()) {
  61.           res = -1;
  62.         } else {
  63.           res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1);
  64.         }
  65.       }
  66.       if (res == 0) {
  67.         res = o1.getJobID().compareTo(o2.getJobID());
  68.       }
  69.       return res;
  70.     }
  71.   };
  72.   
  73.   private Map<JobSchedulingInfo, JobInProgress> jobQueue;
  74.   
  75.   public JobQueueJobInProgressListener() {
  76.     this(new TreeMap<JobSchedulingInfo, 
  77.                      JobInProgress>(FIFO_JOB_QUEUE_COMPARATOR));
  78.   }
  79.   /**
  80.    * For clients that want to provide their own job priorities.
  81.    * @param jobQueue A collection whose iterator returns jobs in priority order.
  82.    */
  83.   protected JobQueueJobInProgressListener(Map<JobSchedulingInfo, 
  84.                                           JobInProgress> jobQueue) {
  85.     this.jobQueue = Collections.synchronizedMap(jobQueue);
  86.   }
  87.   /**
  88.    * Returns a synchronized view of the job queue.
  89.    */
  90.   public Collection<JobInProgress> getJobQueue() {
  91.     return jobQueue.values();
  92.   }
  93.   
  94.   @Override
  95.   public void jobAdded(JobInProgress job) {
  96.     jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);
  97.   }
  98.   // Job will be removed once the job completes
  99.   @Override
  100.   public void jobRemoved(JobInProgress job) {}
  101.   
  102.   private void jobCompleted(JobSchedulingInfo oldInfo) {
  103.     jobQueue.remove(oldInfo);
  104.   }
  105.   
  106.   @Override
  107.   public synchronized void jobUpdated(JobChangeEvent event) {
  108.     JobInProgress job = event.getJobInProgress();
  109.     if (event instanceof JobStatusChangeEvent) {
  110.       // Check if the ordering of the job has changed
  111.       // For now priority and start-time can change the job ordering
  112.       JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
  113.       JobSchedulingInfo oldInfo =  
  114.         new JobSchedulingInfo(statusEvent.getOldStatus());
  115.       if (statusEvent.getEventType() == EventType.PRIORITY_CHANGED 
  116.           || statusEvent.getEventType() == EventType.START_TIME_CHANGED) {
  117.         // Make a priority change
  118.         reorderJobs(job, oldInfo);
  119.       } else if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
  120.         // Check if the job is complete
  121.         int runState = statusEvent.getNewStatus().getRunState();
  122.         if (runState == JobStatus.SUCCEEDED
  123.             || runState == JobStatus.FAILED
  124.             || runState == JobStatus.KILLED) {
  125.           jobCompleted(oldInfo);
  126.         }
  127.       }
  128.     }
  129.   }
  130.   
  131.   private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo) {
  132.     synchronized (jobQueue) {
  133.       jobQueue.remove(oldInfo);
  134.       jobQueue.put(new JobSchedulingInfo(job), job);
  135.     }
  136.   }
  137. }