Job.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred.jobcontrol;
- import java.io.IOException;
- import java.util.ArrayList;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.mapred.FileInputFormat;
- import org.apache.hadoop.mapred.JobClient;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.JobID;
- import org.apache.hadoop.mapred.RunningJob;
- import org.apache.hadoop.util.StringUtils;
- /** This class encapsulates a MapReduce job and its dependency. It monitors
- * the states of the depending jobs and updates the state of this job.
- * A job starts in the WAITING state. If it does not have any depending jobs, or
- * all of the depending jobs are in SUCCESS state, then the job state will become
- * READY. If any depending jobs fail, the job will fail too.
- * When in READY state, the job can be submitted to Hadoop for execution, with
- * the state changing into RUNNING state. From RUNNING state, the job can get into
- * SUCCESS or FAILED state, depending the status of the job execution.
- *
- */
- public class Job {
- // A job will be in one of the following states
- final public static int SUCCESS = 0;
- final public static int WAITING = 1;
- final public static int RUNNING = 2;
- final public static int READY = 3;
- final public static int FAILED = 4;
- final public static int DEPENDENT_FAILED = 5;
-
-
- private JobConf theJobConf;
- private int state;
- private String jobID; // assigned and used by JobControl class
- private JobID mapredJobID; // the job ID assigned by map/reduce
- private String jobName; // external name, assigned/used by client app
- private String message; // some info for human consumption,
- // e.g. the reason why the job failed
- private ArrayList<Job> dependingJobs; // the jobs the current job depends on
-
- private JobClient jc = null; // the map reduce job client
-
- /**
- * Construct a job.
- * @param jobConf a mapred job configuration representing a job to be executed.
- * @param dependingJobs an array of jobs the current job depends on
- */
- public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
- this.theJobConf = jobConf;
- this.dependingJobs = dependingJobs;
- this.state = Job.WAITING;
- this.jobID = "unassigned";
- this.mapredJobID = null; //not yet assigned
- this.jobName = "unassigned";
- this.message = "just initialized";
- this.jc = new JobClient(jobConf);
- }
-
- /**
- * Construct a job.
- *
- * @param jobConf mapred job configuration representing a job to be executed.
- * @throws IOException
- */
- public Job(JobConf jobConf) throws IOException {
- this(jobConf, null);
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("job name:t").append(this.jobName).append("n");
- sb.append("job id:t").append(this.jobID).append("n");
- sb.append("job state:t").append(this.state).append("n");
- sb.append("job mapred id:t").append(this.mapredJobID==null ? "unassigned"
- : this.mapredJobID).append("n");
- sb.append("job message:t").append(this.message).append("n");
-
- if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
- sb.append("job has no depending job:t").append("n");
- } else {
- sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:n");
- for (int i = 0; i < this.dependingJobs.size(); i++) {
- sb.append("t depending job ").append(i).append(":t");
- sb.append((this.dependingJobs.get(i)).getJobName()).append("n");
- }
- }
- return sb.toString();
- }
-
- /**
- * @return the job name of this job
- */
- public String getJobName() {
- return this.jobName;
- }
-
- /**
- * Set the job name for this job.
- * @param jobName the job name
- */
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- /**
- * @return the job ID of this job assigned by JobControl
- */
- public String getJobID() {
- return this.jobID;
- }
-
- /**
- * Set the job ID for this job.
- * @param id the job ID
- */
- public void setJobID(String id) {
- this.jobID = id;
- }
-
- /**
- * @return the mapred ID of this job
- * @deprecated use {@link #getAssignedJobID()} instead
- */
- @Deprecated
- public String getMapredJobID() {
- return this.mapredJobID.toString();
- }
-
- /**
- * Set the mapred ID for this job.
- * @param mapredJobID the mapred job ID for this job.
- * @deprecated use {@link #setAssignedJobID(JobID)} instead
- */
- @Deprecated
- public void setMapredJobID(String mapredJobID) {
- this.mapredJobID = JobID.forName(mapredJobID);
- }
-
- /**
- * @return the mapred ID of this job as assigned by the
- * mapred framework.
- */
- public JobID getAssignedJobID() {
- return this.mapredJobID;
- }
-
- /**
- * Set the mapred ID for this job as assigned by the
- * mapred framework.
- * @param mapredJobID the mapred job ID for this job.
- */
- public void setAssignedJobID(JobID mapredJobID) {
- this.mapredJobID = mapredJobID;
- }
-
- /**
- * @return the mapred job conf of this job
- */
- public JobConf getJobConf() {
- return this.theJobConf;
- }
-
- /**
- * Set the mapred job conf for this job.
- * @param jobConf the mapred job conf for this job.
- */
- public void setJobConf(JobConf jobConf) {
- this.theJobConf = jobConf;
- }
-
- /**
- * @return the state of this job
- */
- public synchronized int getState() {
- return this.state;
- }
-
- /**
- * Set the state for this job.
- * @param state the new state for this job.
- */
- protected synchronized void setState(int state) {
- this.state = state;
- }
-
- /**
- * @return the message of this job
- */
- public String getMessage() {
- return this.message;
- }
-
- /**
- * Set the message for this job.
- * @param message the message for this job.
- */
- public void setMessage(String message) {
- this.message = message;
- }
-
- /**
- * @return the job client of this job
- */
- public JobClient getJobClient(){
- return this.jc;
- }
- /**
- * @return the depending jobs of this job
- */
- public ArrayList<Job> getDependingJobs() {
- return this.dependingJobs;
- }
-
- /**
- * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job
- * is waiting to run, not during or afterwards.
- *
- * @param dependingJob Job that this Job depends on.
- * @return <tt>true</tt> if the Job was added.
- */
- public synchronized boolean addDependingJob(Job dependingJob) {
- if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
- if (this.dependingJobs == null) {
- this.dependingJobs = new ArrayList<Job>();
- }
- return this.dependingJobs.add(dependingJob);
- } else {
- return false;
- }
- }
-
- /**
- * @return true if this job is in a complete state
- */
- public boolean isCompleted() {
- return this.state == Job.FAILED ||
- this.state == Job.DEPENDENT_FAILED ||
- this.state == Job.SUCCESS;
- }
-
- /**
- * @return true if this job is in READY state
- */
- public boolean isReady() {
- return this.state == Job.READY;
- }
-
- /**
- * Check the state of this running job. The state may
- * remain the same, become SUCCESS or FAILED.
- */
- private void checkRunningState() {
- RunningJob running = null;
- try {
- running = jc.getJob(this.mapredJobID);
- if (running.isComplete()) {
- if (running.isSuccessful()) {
- this.state = Job.SUCCESS;
- } else {
- this.state = Job.FAILED;
- this.message = "Job failed!";
- try {
- running.killJob();
- } catch (IOException e1) {
- }
- try {
- this.jc.close();
- } catch (IOException e2) {
- }
- }
- }
- } catch (IOException ioe) {
- this.state = Job.FAILED;
- this.message = StringUtils.stringifyException(ioe);
- try {
- if (running != null)
- running.killJob();
- } catch (IOException e1) {
- }
- try {
- this.jc.close();
- } catch (IOException e1) {
- }
- }
- }
-
- /**
- * Check and update the state of this job. The state changes
- * depending on its current state and the states of the depending jobs.
- */
- synchronized int checkState() {
- if (this.state == Job.RUNNING) {
- checkRunningState();
- }
- if (this.state != Job.WAITING) {
- return this.state;
- }
- if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
- this.state = Job.READY;
- return this.state;
- }
- Job pred = null;
- int n = this.dependingJobs.size();
- for (int i = 0; i < n; i++) {
- pred = this.dependingJobs.get(i);
- int s = pred.checkState();
- if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
- break; // a pred is still not completed, continue in WAITING
- // state
- }
- if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
- this.state = Job.DEPENDENT_FAILED;
- this.message = "depending job " + i + " with jobID "
- + pred.getJobID() + " failed. " + pred.getMessage();
- break;
- }
- // pred must be in success state
- if (i == n - 1) {
- this.state = Job.READY;
- }
- }
- return this.state;
- }
-
- /**
- * Submit this job to mapred. The state becomes RUNNING if submission
- * is successful, FAILED otherwise.
- */
- protected synchronized void submit() {
- try {
- if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
- FileSystem fs = FileSystem.get(theJobConf);
- Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
- for (int i = 0; i < inputPaths.length; i++) {
- if (!fs.exists(inputPaths[i])) {
- try {
- fs.mkdirs(inputPaths[i]);
- } catch (IOException e) {
- }
- }
- }
- }
- RunningJob running = jc.submitJob(theJobConf);
- this.mapredJobID = running.getID();
- this.state = Job.RUNNING;
- } catch (IOException ioe) {
- this.state = Job.FAILED;
- this.message = StringUtils.stringifyException(ioe);
- }
- }
-
- }