Job.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.jobcontrol;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.mapred.FileInputFormat;
  24. import org.apache.hadoop.mapred.JobClient;
  25. import org.apache.hadoop.mapred.JobConf;
  26. import org.apache.hadoop.mapred.JobID;
  27. import org.apache.hadoop.mapred.RunningJob;
  28. import org.apache.hadoop.util.StringUtils;
  29. /** This class encapsulates a MapReduce job and its dependency. It monitors 
  30.  *  the states of the depending jobs and updates the state of this job.
  31.  *  A job starts in the WAITING state. If it does not have any depending jobs, or
  32.  *  all of the depending jobs are in SUCCESS state, then the job state will become
  33.  *  READY. If any depending jobs fail, the job will fail too. 
  34.  *  When in READY state, the job can be submitted to Hadoop for execution, with
  35.  *  the state changing into RUNNING state. From RUNNING state, the job can get into 
  36.  *  SUCCESS or FAILED state, depending the status of the job execution.
  37.  *  
  38.  */
  39. public class Job {
  40.   // A job will be in one of the following states
  41.   final public static int SUCCESS = 0;
  42.   final public static int WAITING = 1;
  43.   final public static int RUNNING = 2;
  44.   final public static int READY = 3;
  45.   final public static int FAILED = 4;
  46.   final public static int DEPENDENT_FAILED = 5;
  47.   private JobConf theJobConf;
  48.   private int state;
  49.   private String jobID;  // assigned and used by JobControl class
  50.   private JobID mapredJobID; // the job ID assigned by map/reduce
  51.   private String jobName; // external name, assigned/used by client app
  52.   private String message; // some info for human consumption, 
  53.   // e.g. the reason why the job failed
  54.   private ArrayList<Job> dependingJobs; // the jobs the current job depends on
  55.   private JobClient jc = null; // the map reduce job client
  56.   /** 
  57.    * Construct a job.
  58.    * @param jobConf a mapred job configuration representing a job to be executed.
  59.    * @param dependingJobs an array of jobs the current job depends on
  60.    */
  61.   public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
  62.     this.theJobConf = jobConf;
  63.     this.dependingJobs = dependingJobs;
  64.     this.state = Job.WAITING;
  65.     this.jobID = "unassigned";
  66.     this.mapredJobID = null; //not yet assigned 
  67.     this.jobName = "unassigned";
  68.     this.message = "just initialized";
  69.     this.jc = new JobClient(jobConf);
  70.   }
  71.   
  72.   /**
  73.    * Construct a job.
  74.    * 
  75.    * @param jobConf mapred job configuration representing a job to be executed.
  76.    * @throws IOException
  77.    */
  78.   public Job(JobConf jobConf) throws IOException {
  79.     this(jobConf, null);
  80.   }
  81.   @Override
  82.   public String toString() {
  83.     StringBuffer sb = new StringBuffer();
  84.     sb.append("job name:t").append(this.jobName).append("n");
  85.     sb.append("job id:t").append(this.jobID).append("n");
  86.     sb.append("job state:t").append(this.state).append("n");
  87.     sb.append("job mapred id:t").append(this.mapredJobID==null ? "unassigned" 
  88.         : this.mapredJobID).append("n");
  89.     sb.append("job message:t").append(this.message).append("n");
  90.     if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
  91.       sb.append("job has no depending job:t").append("n");
  92.     } else {
  93.       sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:n");
  94.       for (int i = 0; i < this.dependingJobs.size(); i++) {
  95.         sb.append("t depending job ").append(i).append(":t");
  96.         sb.append((this.dependingJobs.get(i)).getJobName()).append("n");
  97.       }
  98.     }
  99.     return sb.toString();
  100.   }
  101.   /**
  102.    * @return the job name of this job
  103.    */
  104.   public String getJobName() {
  105.     return this.jobName;
  106.   }
  107.   /**
  108.    * Set the job name for  this job.
  109.    * @param jobName the job name
  110.    */
  111.   public void setJobName(String jobName) {
  112.     this.jobName = jobName;
  113.   }
  114.   /**
  115.    * @return the job ID of this job assigned by JobControl
  116.    */
  117.   public String getJobID() {
  118.     return this.jobID;
  119.   }
  120.   /**
  121.    * Set the job ID for  this job.
  122.    * @param id the job ID
  123.    */
  124.   public void setJobID(String id) {
  125.     this.jobID = id;
  126.   }
  127.   /**
  128.    * @return the mapred ID of this job
  129.    * @deprecated use {@link #getAssignedJobID()} instead
  130.    */
  131.   @Deprecated
  132.   public String getMapredJobID() {
  133.     return this.mapredJobID.toString();
  134.   }
  135.   /**
  136.    * Set the mapred ID for this job.
  137.    * @param mapredJobID the mapred job ID for this job.
  138.    * @deprecated use {@link #setAssignedJobID(JobID)} instead
  139.    */
  140.   @Deprecated
  141.   public void setMapredJobID(String mapredJobID) {
  142.     this.mapredJobID = JobID.forName(mapredJobID);
  143.   }
  144.   /**
  145.    * @return the mapred ID of this job as assigned by the 
  146.    * mapred framework.
  147.    */
  148.   public JobID getAssignedJobID() {
  149.     return this.mapredJobID;
  150.   }
  151.   
  152.   /**
  153.    * Set the mapred ID for this job as assigned by the 
  154.    * mapred framework.
  155.    * @param mapredJobID the mapred job ID for this job.
  156.    */
  157.   public void setAssignedJobID(JobID mapredJobID) {
  158.     this.mapredJobID = mapredJobID;
  159.   }
  160.   
  161.   /**
  162.    * @return the mapred job conf of this job
  163.    */
  164.   public JobConf getJobConf() {
  165.     return this.theJobConf;
  166.   }
  167.   /**
  168.    * Set the mapred job conf for this job.
  169.    * @param jobConf the mapred job conf for this job.
  170.    */
  171.   public void setJobConf(JobConf jobConf) {
  172.     this.theJobConf = jobConf;
  173.   }
  174.   /**
  175.    * @return the state of this job
  176.    */
  177.   public synchronized int getState() {
  178.     return this.state;
  179.   }
  180.   /**
  181.    * Set the state for this job.
  182.    * @param state the new state for this job.
  183.    */
  184.   protected synchronized void setState(int state) {
  185.     this.state = state;
  186.   }
  187.   /**
  188.    * @return the message of this job
  189.    */
  190.   public String getMessage() {
  191.     return this.message;
  192.   }
  193.   /**
  194.    * Set the message for this job.
  195.    * @param message the message for this job.
  196.    */
  197.   public void setMessage(String message) {
  198.     this.message = message;
  199.   }
  200.   /**
  201.    * @return the job client of this job
  202.    */
  203.   public JobClient getJobClient(){
  204.           return this.jc;
  205.   }
  206.   /**
  207.    * @return the depending jobs of this job
  208.    */
  209.   public ArrayList<Job> getDependingJobs() {
  210.     return this.dependingJobs;
  211.   }
  212.   
  213.   /**
  214.    * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job 
  215.    * is waiting to run, not during or afterwards.
  216.    * 
  217.    * @param dependingJob Job that this Job depends on.
  218.    * @return <tt>true</tt> if the Job was added.
  219.    */
  220.   public synchronized boolean addDependingJob(Job dependingJob) {
  221.     if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
  222.       if (this.dependingJobs == null) {
  223.         this.dependingJobs = new ArrayList<Job>();
  224.       }
  225.       return this.dependingJobs.add(dependingJob);
  226.     } else {
  227.       return false;
  228.     }
  229.   }
  230.   /**
  231.    * @return true if this job is in a complete state
  232.    */
  233.   public boolean isCompleted() {
  234.     return this.state == Job.FAILED || 
  235.       this.state == Job.DEPENDENT_FAILED ||
  236.       this.state == Job.SUCCESS;
  237.   }
  238.   /**
  239.    * @return true if this job is in READY state
  240.    */
  241.   public boolean isReady() {
  242.     return this.state == Job.READY;
  243.   }
  244.   /**
  245.    * Check the state of this running job. The state may 
  246.    * remain the same, become SUCCESS or FAILED.
  247.    */
  248.   private void checkRunningState() {
  249.     RunningJob running = null;
  250.     try {
  251.       running = jc.getJob(this.mapredJobID);
  252.       if (running.isComplete()) {
  253.         if (running.isSuccessful()) {
  254.           this.state = Job.SUCCESS;
  255.         } else {
  256.           this.state = Job.FAILED;
  257.           this.message = "Job failed!";
  258.           try {
  259.             running.killJob();
  260.           } catch (IOException e1) {
  261.           }
  262.           try {
  263.             this.jc.close();
  264.           } catch (IOException e2) {
  265.           }
  266.         }
  267.       }
  268.     } catch (IOException ioe) {
  269.       this.state = Job.FAILED;
  270.       this.message = StringUtils.stringifyException(ioe);
  271.       try {
  272.         if (running != null)
  273.           running.killJob();
  274.       } catch (IOException e1) {
  275.       }
  276.       try {
  277.         this.jc.close();
  278.       } catch (IOException e1) {
  279.       }
  280.     }
  281.   }
  282.   /**
  283.    * Check and update the state of this job. The state changes  
  284.    * depending on its current state and the states of the depending jobs.
  285.    */
  286.    synchronized int checkState() {
  287.     if (this.state == Job.RUNNING) {
  288.       checkRunningState();
  289.     }
  290.     if (this.state != Job.WAITING) {
  291.       return this.state;
  292.     }
  293.     if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
  294.       this.state = Job.READY;
  295.       return this.state;
  296.     }
  297.     Job pred = null;
  298.     int n = this.dependingJobs.size();
  299.     for (int i = 0; i < n; i++) {
  300.       pred = this.dependingJobs.get(i);
  301.       int s = pred.checkState();
  302.       if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
  303.         break; // a pred is still not completed, continue in WAITING
  304.         // state
  305.       }
  306.       if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
  307.         this.state = Job.DEPENDENT_FAILED;
  308.         this.message = "depending job " + i + " with jobID "
  309.           + pred.getJobID() + " failed. " + pred.getMessage();
  310.         break;
  311.       }
  312.       // pred must be in success state
  313.       if (i == n - 1) {
  314.         this.state = Job.READY;
  315.       }
  316.     }
  317.     return this.state;
  318.   }
  319.   /**
  320.    * Submit this job to mapred. The state becomes RUNNING if submission 
  321.    * is successful, FAILED otherwise.  
  322.    */
  323.   protected synchronized void submit() {
  324.     try {
  325.       if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
  326.         FileSystem fs = FileSystem.get(theJobConf);
  327.         Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
  328.         for (int i = 0; i < inputPaths.length; i++) {
  329.           if (!fs.exists(inputPaths[i])) {
  330.             try {
  331.               fs.mkdirs(inputPaths[i]);
  332.             } catch (IOException e) {
  333.             }
  334.           }
  335.         }
  336.       }
  337.       RunningJob running = jc.submitJob(theJobConf);
  338.       this.mapredJobID = running.getID();
  339.       this.state = Job.RUNNING;
  340.     } catch (IOException ioe) {
  341.       this.state = Job.FAILED;
  342.       this.message = StringUtils.stringifyException(ioe);
  343.     }
  344.   }
  345. }