JobControl.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.jobcontrol;
  19. import java.util.ArrayList;
  20. import java.util.Collection;
  21. import java.util.Hashtable;
  22. import java.util.Map;
  23. /** This class encapsulates a set of MapReduce jobs and its dependency. It tracks 
  24.  *  the states of the jobs by placing them into different tables according to their 
  25.  *  states. 
  26.  *  
  27.  *  This class provides APIs for the client app to add a job to the group and to get 
  28.  *  the jobs in the group in different states. When a 
  29.  *  job is added, an ID unique to the group is assigned to the job. 
  30.  *  
  31.  *  This class has a thread that submits jobs when they become ready, monitors the
  32.  *  states of the running jobs, and updates the states of jobs based on the state changes 
  33.  *  of their depending jobs states. The class provides APIs for suspending/resuming
  34.  *  the thread,and for stopping the thread.
  35.  *  
  36.  */
  37. public class JobControl implements Runnable{
  38.   // The thread can be in one of the following state
  39.   private static final int RUNNING = 0;
  40.   private static final int SUSPENDED = 1;
  41.   private static final int STOPPED = 2;
  42.   private static final int STOPPING = 3;
  43.   private static final int READY = 4;
  44.   private int runnerState; // the thread state
  45.   private Map<String, Job> waitingJobs;
  46.   private Map<String, Job> readyJobs;
  47.   private Map<String, Job> runningJobs;
  48.   private Map<String, Job> successfulJobs;
  49.   private Map<String, Job> failedJobs;
  50.   private long nextJobID;
  51.   private String groupName;
  52.   /** 
  53.    * Construct a job control for a group of jobs.
  54.    * @param groupName a name identifying this group
  55.    */
  56.   public JobControl(String groupName) {
  57.     this.waitingJobs = new Hashtable<String, Job>();
  58.     this.readyJobs = new Hashtable<String, Job>();
  59.     this.runningJobs = new Hashtable<String, Job>();
  60.     this.successfulJobs = new Hashtable<String, Job>();
  61.     this.failedJobs = new Hashtable<String, Job>();
  62.     this.nextJobID = -1;
  63.     this.groupName = groupName;
  64.     this.runnerState = JobControl.READY;
  65.   }
  66.   private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
  67.     ArrayList<Job> retv = new ArrayList<Job>();
  68.     synchronized (jobs) {
  69.       for (Job job : jobs.values()) {
  70.         retv.add(job);
  71.       }
  72.     }
  73.     return retv;
  74.   }
  75.   /**
  76.    * @return the jobs in the waiting state
  77.    */
  78.   public ArrayList<Job> getWaitingJobs() {
  79.     return JobControl.toArrayList(this.waitingJobs);
  80.   }
  81.   /**
  82.    * @return the jobs in the running state
  83.    */
  84.   public ArrayList<Job> getRunningJobs() {
  85.     return JobControl.toArrayList(this.runningJobs);
  86.   }
  87.   /**
  88.    * @return the jobs in the ready state
  89.    */
  90.   public ArrayList<Job> getReadyJobs() {
  91.     return JobControl.toArrayList(this.readyJobs);
  92.   }
  93.   /**
  94.    * @return the jobs in the success state
  95.    */
  96.   public ArrayList<Job> getSuccessfulJobs() {
  97.     return JobControl.toArrayList(this.successfulJobs);
  98.   }
  99.   public ArrayList<Job> getFailedJobs() {
  100.     return JobControl.toArrayList(this.failedJobs);
  101.   }
  102.   private String getNextJobID() {
  103.     nextJobID += 1;
  104.     return this.groupName + this.nextJobID;
  105.   }
  106.   private static void addToQueue(Job aJob, Map<String, Job> queue) {
  107.     synchronized(queue) {
  108.       queue.put(aJob.getJobID(), aJob);
  109.     }
  110.   }
  111.   private void addToQueue(Job aJob) {
  112.     Map<String, Job> queue = getQueue(aJob.getState());
  113.     addToQueue(aJob, queue);
  114.   }
  115.   private Map<String, Job> getQueue(int state) {
  116.     Map<String, Job> retv = null;
  117.     if (state == Job.WAITING) {
  118.       retv = this.waitingJobs;
  119.     } else if (state == Job.READY) {
  120.       retv = this.readyJobs;
  121.     } else if (state == Job.RUNNING) {
  122.       retv = this.runningJobs;
  123.     } else if (state == Job.SUCCESS) {
  124.       retv = this.successfulJobs;
  125.     } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
  126.       retv = this.failedJobs;
  127.     } 
  128.     return retv;
  129.   }
  130.   /**
  131.    * Add a new job.
  132.    * @param aJob the new job
  133.    */
  134.   synchronized public String addJob(Job aJob) {
  135.     String id = this.getNextJobID();
  136.     aJob.setJobID(id);
  137.     aJob.setState(Job.WAITING);
  138.     this.addToQueue(aJob);
  139.     return id;
  140.   }
  141.   /**
  142.    * Add a collection of jobs
  143.    * 
  144.    * @param jobs
  145.    */
  146.   public void addJobs(Collection<Job> jobs) {
  147.     for (Job job : jobs) {
  148.       addJob(job);
  149.     }
  150.   }
  151.   /**
  152.    * @return the thread state
  153.    */
  154.   public int getState() {
  155.     return this.runnerState;
  156.   }
  157.   /**
  158.    * set the thread state to STOPPING so that the 
  159.    * thread will stop when it wakes up.
  160.    */
  161.   public void stop() {
  162.     this.runnerState = JobControl.STOPPING;
  163.   }
  164.   /**
  165.    * suspend the running thread
  166.    */
  167.   public void suspend () {
  168.     if (this.runnerState == JobControl.RUNNING) {
  169.       this.runnerState = JobControl.SUSPENDED;
  170.     }
  171.   }
  172.   /**
  173.    * resume the suspended thread
  174.    */
  175.   public void resume () {
  176.     if (this.runnerState == JobControl.SUSPENDED) {
  177.       this.runnerState = JobControl.RUNNING;
  178.     }
  179.   }
  180.   synchronized private void checkRunningJobs() {
  181.     Map<String, Job> oldJobs = null;
  182.     oldJobs = this.runningJobs;
  183.     this.runningJobs = new Hashtable<String, Job>();
  184.     for (Job nextJob : oldJobs.values()) {
  185.       int state = nextJob.checkState();
  186.       /*
  187.         if (state != Job.RUNNING) {
  188.         System.out.println("The state of the running job " +
  189.         nextJob.getJobName() + " has changed to: " + nextJob.getState());
  190.         }
  191.       */
  192.       this.addToQueue(nextJob);
  193.     }
  194.   }
  195.   synchronized private void checkWaitingJobs() {
  196.     Map<String, Job> oldJobs = null;
  197.     oldJobs = this.waitingJobs;
  198.     this.waitingJobs = new Hashtable<String, Job>();
  199.     for (Job nextJob : oldJobs.values()) {
  200.       int state = nextJob.checkState();
  201.       /*
  202.         if (state != Job.WAITING) {
  203.         System.out.println("The state of the waiting job " +
  204.         nextJob.getJobName() + " has changed to: " + nextJob.getState());
  205.         }
  206.       */
  207.       this.addToQueue(nextJob);
  208.     }
  209.   }
  210.   synchronized private void startReadyJobs() {
  211.     Map<String, Job> oldJobs = null;
  212.     oldJobs = this.readyJobs;
  213.     this.readyJobs = new Hashtable<String, Job>();
  214.     for (Job nextJob : oldJobs.values()) {
  215.       //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
  216.       nextJob.submit();
  217.       //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
  218.       this.addToQueue(nextJob);
  219.     }
  220.   }
  221.   synchronized public boolean allFinished() {
  222.     return this.waitingJobs.size() == 0 &&
  223.       this.readyJobs.size() == 0 &&
  224.       this.runningJobs.size() == 0;
  225.   }
  226.   /**
  227.    *  The main loop for the thread.
  228.    *  The loop does the following:
  229.    *   Check the states of the running jobs
  230.    *   Update the states of waiting jobs
  231.    *   Submit the jobs in ready state
  232.    */
  233.   public void run() {
  234.     this.runnerState = JobControl.RUNNING;
  235.     while (true) {
  236.       while (this.runnerState == JobControl.SUSPENDED) {
  237.         try {
  238.           Thread.sleep(5000);
  239.         }
  240.         catch (Exception e) {
  241.         }
  242.       }
  243.       checkRunningJobs();
  244.       checkWaitingJobs();
  245.       startReadyJobs();
  246.       if (this.runnerState != JobControl.RUNNING && 
  247.           this.runnerState != JobControl.SUSPENDED) {
  248.         break;
  249.       }
  250.       try {
  251.         Thread.sleep(5000);
  252.       }
  253.       catch (Exception e) {
  254.       }
  255.       if (this.runnerState != JobControl.RUNNING && 
  256.           this.runnerState != JobControl.SUSPENDED) {
  257.         break;
  258.       }
  259.     }
  260.     this.runnerState = JobControl.STOPPED;
  261.   }
  262. }