JobQueuesManager.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.Collection;
  21. import java.util.Collections;
  22. import java.util.Comparator;
  23. import java.util.HashMap;
  24. import java.util.LinkedList;
  25. import java.util.Map;
  26. import java.util.TreeMap;
  27. import org.apache.commons.logging.Log;
  28. import org.apache.commons.logging.LogFactory;
  29. import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
  30. import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
  31. /**
  32.  * A {@link JobInProgressListener} that maintains the jobs being managed in
  33.  * one or more queues. 
  34.  */
  35. class JobQueuesManager extends JobInProgressListener {
  36.   /* 
  37.    * If a queue supports priorities, jobs must be 
  38.    * sorted on priorities, and then on their start times (technically, 
  39.    * their insertion time.  
  40.    * If a queue doesn't support priorities, jobs are
  41.    * sorted based on their start time.  
  42.    */
  43.   
  44.   // comparator for jobs in queues that don't support priorities
  45.   private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
  46.     = new Comparator<JobSchedulingInfo>() {
  47.     public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
  48.       // the job that started earlier wins
  49.       if (o1.getStartTime() < o2.getStartTime()) {
  50.         return -1;
  51.       } else {
  52.         return (o1.getStartTime() == o2.getStartTime() 
  53.                 ? o1.getJobID().compareTo(o2.getJobID()) 
  54.                 : 1);
  55.       }
  56.     }
  57.   };
  58.   
  59.   // class to store queue info
  60.   private static class QueueInfo {
  61.     // whether the queue supports priorities
  62.     boolean supportsPriorities;
  63.     Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
  64.     Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
  65.     
  66.     public Comparator<JobSchedulingInfo> comparator;
  67.     
  68.     QueueInfo(boolean prio) {
  69.       this.supportsPriorities = prio;
  70.       if (supportsPriorities) {
  71.         // use the default priority-aware comparator
  72.         comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
  73.       }
  74.       else {
  75.         comparator = STARTTIME_JOB_COMPARATOR;
  76.       }
  77.       waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
  78.       runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
  79.     }
  80.     
  81.     Collection<JobInProgress> getWaitingJobs() {
  82.       synchronized (waitingJobs) {
  83.         return Collections.unmodifiableCollection(
  84.             new LinkedList<JobInProgress>(waitingJobs.values()));
  85.       }
  86.     }
  87.     
  88.     Collection<JobInProgress> getRunningJobs() {
  89.       synchronized (runningJobs) {
  90.        return Collections.unmodifiableCollection(
  91.            new LinkedList<JobInProgress>(runningJobs.values())); 
  92.       }
  93.     }
  94.     
  95.     void addRunningJob(JobInProgress job) {
  96.       synchronized (runningJobs) {
  97.        runningJobs.put(new JobSchedulingInfo(job),job); 
  98.       }
  99.     }
  100.     
  101.     JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
  102.       synchronized (runningJobs) {
  103.         return runningJobs.remove(jobInfo); 
  104.       }
  105.     }
  106.     
  107.     JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
  108.       synchronized (waitingJobs) {
  109.         return waitingJobs.remove(schedInfo);
  110.       }
  111.     }
  112.     
  113.     void addWaitingJob(JobInProgress job) {
  114.       synchronized (waitingJobs) {
  115.         waitingJobs.put(new JobSchedulingInfo(job), job);
  116.       }
  117.     }
  118.     
  119.     int getWaitingJobCount() {
  120.       synchronized (waitingJobs) {
  121.        return waitingJobs.size(); 
  122.       }
  123.     }
  124.     
  125.   }
  126.   
  127.   // we maintain a hashmap of queue-names to queue info
  128.   private Map<String, QueueInfo> jobQueues = 
  129.     new HashMap<String, QueueInfo>();
  130.   private static final Log LOG = LogFactory.getLog(JobQueuesManager.class);
  131.   private CapacityTaskScheduler scheduler;
  132.   
  133.   JobQueuesManager(CapacityTaskScheduler s) {
  134.     this.scheduler = s;
  135.   }
  136.   
  137.   /**
  138.    * create an empty queue with the default comparator
  139.    * @param queueName The name of the queue
  140.    * @param supportsPriotities whether the queue supports priorities
  141.    */
  142.   public void createQueue(String queueName, boolean supportsPriotities) {
  143.     jobQueues.put(queueName, new QueueInfo(supportsPriotities));
  144.   }
  145.   
  146.   /**
  147.    * Returns the queue of running jobs associated with the name
  148.    */
  149.   public Collection<JobInProgress> getRunningJobQueue(String queueName) {
  150.     return jobQueues.get(queueName).getRunningJobs();
  151.   }
  152.   
  153.   /**
  154.    * Returns the queue of waiting jobs associated with queue name.
  155.    * 
  156.    */
  157.   Collection<JobInProgress> getWaitingJobs(String queueName) {
  158.     return jobQueues.get(queueName).getWaitingJobs();
  159.   }
  160.   
  161.   @Override
  162.   public void jobAdded(JobInProgress job) throws IOException {
  163.     LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
  164.     // add job to the right queue
  165.     QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
  166.     if (null == qi) {
  167.       // job was submitted to a queue we're not aware of
  168.       LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
  169.           " specified for job" + job.getProfile().getJobID() + 
  170.           ". Ignoring job.");
  171.       return;
  172.     }
  173.     // add job to waiting queue. It will end up in the right place, 
  174.     // based on priority. 
  175.     qi.addWaitingJob(job);
  176.     // let scheduler know. 
  177.     scheduler.jobAdded(job);
  178.   }
  179.   /*
  180.    * Method removes the jobs from both running and waiting job queue in 
  181.    * job queue manager.
  182.    */
  183.   private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, 
  184.                             QueueInfo qi) {
  185.     LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
  186.         + job.getProfile().getQueueName() + " has completed");
  187.     //remove jobs from both queue's a job can be in
  188.     //running and waiting queue at the same time.
  189.     qi.removeRunningJob(oldInfo);
  190.     qi.removeWaitingJob(oldInfo);
  191.     // let scheduler know
  192.     scheduler.jobCompleted(job);
  193.   }
  194.   
  195.   // Note that job is removed when the job completes i.e in jobUpated()
  196.   @Override
  197.   public void jobRemoved(JobInProgress job) {}
  198.   
  199.   // This is used to reposition a job in the queue. A job can get repositioned 
  200.   // because of the change in the job priority or job start-time.
  201.   private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, 
  202.                            QueueInfo qi) {
  203.     
  204.     if(qi.removeWaitingJob(oldInfo) != null) {
  205.       qi.addWaitingJob(job);
  206.     }
  207.     if(qi.removeRunningJob(oldInfo) != null) {
  208.       qi.addRunningJob(job);
  209.     }
  210.   }
  211.   
  212.   // This is used to move a job from the waiting queue to the running queue.
  213.   private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, 
  214.                               QueueInfo qi) {
  215.     // Removing of the job from job list is responsibility of the
  216.     //initialization poller.
  217.     // Add the job to the running queue
  218.     qi.addRunningJob(job);
  219.   }
  220.   
  221.   // Update the scheduler as job's state has changed
  222.   private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
  223.     JobInProgress job = event.getJobInProgress();
  224.     JobSchedulingInfo oldJobStateInfo = 
  225.       new JobSchedulingInfo(event.getOldStatus());
  226.     // Check if the ordering of the job has changed
  227.     // For now priority and start-time can change the job ordering
  228.     if (event.getEventType() == EventType.PRIORITY_CHANGED 
  229.         || event.getEventType() == EventType.START_TIME_CHANGED) {
  230.       // Make a priority change
  231.       reorderJobs(job, oldJobStateInfo, qi);
  232.     } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
  233.       // Check if the job is complete
  234.       int runState = job.getStatus().getRunState();
  235.       if (runState == JobStatus.SUCCEEDED
  236.           || runState == JobStatus.FAILED
  237.           || runState == JobStatus.KILLED) {
  238.         jobCompleted(job, oldJobStateInfo, qi);
  239.       } else if (runState == JobStatus.RUNNING) {
  240.         makeJobRunning(job, oldJobStateInfo, qi);
  241.       }
  242.     }
  243.   }
  244.   
  245.   @Override
  246.   public void jobUpdated(JobChangeEvent event) {
  247.     JobInProgress job = event.getJobInProgress();
  248.     QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
  249.     if (null == qi) {
  250.       // can't find queue for job. Shouldn't happen. 
  251.       LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
  252.           " when updating job " + job.getProfile().getJobID());
  253.       return;
  254.     }
  255.     
  256.     // Check if this is the status change
  257.     if (event instanceof JobStatusChangeEvent) {
  258.       jobStateChanged((JobStatusChangeEvent)event, qi);
  259.     }
  260.   }
  261.   
  262.   void removeJobFromWaitingQueue(JobInProgress job) {
  263.     String queue = job.getProfile().getQueueName();
  264.     QueueInfo qi = jobQueues.get(queue);
  265.     qi.removeWaitingJob(new JobSchedulingInfo(job));
  266.   }
  267.   
  268.   Comparator<JobSchedulingInfo> getComparator(String queue) {
  269.     return jobQueues.get(queue).comparator;
  270.   }
  271.   
  272.   int getWaitingJobCount(String queue) {
  273.     QueueInfo qi = jobQueues.get(queue);
  274.     return qi.getWaitingJobCount();
  275.   }
  276.   boolean doesQueueSupportPriorities(String queueName) {
  277.     return jobQueues.get(queueName).supportsPriorities;
  278.   }
  279. }