Job.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapreduce;
  19. import java.io.IOException;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.fs.Path;
  22. import org.apache.hadoop.io.RawComparator;
  23. import org.apache.hadoop.mapreduce.TaskAttemptID;
  24. import org.apache.hadoop.mapred.JobClient;
  25. import org.apache.hadoop.mapred.JobConf;
  26. import org.apache.hadoop.mapred.RunningJob;
  27. import org.apache.hadoop.mapred.TaskCompletionEvent;
  28. /**
  29.  * The job submitter's view of the Job. It allows the user to configure the
  30.  * job, submit it, control its execution, and query the state. The set methods
  31.  * only work until the job is submitted, afterwards they will throw an 
  32.  * IllegalStateException.
  33.  */
  34. public class Job extends JobContext {  
  35.   public static enum JobState {DEFINE, RUNNING};
  36.   private JobState state = JobState.DEFINE;
  37.   private JobClient jobClient;
  38.   private RunningJob info;
  39.   public Job() throws IOException {
  40.     this(new Configuration());
  41.   }
  42.   public Job(Configuration conf) throws IOException {
  43.     super(conf, null);
  44.     jobClient = new JobClient((JobConf) getConfiguration());
  45.   }
  46.   public Job(Configuration conf, String jobName) throws IOException {
  47.     this(conf);
  48.     setJobName(jobName);
  49.   }
  50.   private void ensureState(JobState state) throws IllegalStateException {
  51.     if (state != this.state) {
  52.       throw new IllegalStateException("Job in state "+ this.state + 
  53.                                       " instead of " + state);
  54.     }
  55.   }
  56.   /**
  57.    * Set the number of reduce tasks for the job.
  58.    * @param tasks the number of reduce tasks
  59.    * @throws IllegalStateException if the job is submitted
  60.    */
  61.   public void setNumReduceTasks(int tasks) throws IllegalStateException {
  62.     ensureState(JobState.DEFINE);
  63.     conf.setNumReduceTasks(tasks);
  64.   }
  65.   /**
  66.    * Set the current working directory for the default file system.
  67.    * 
  68.    * @param dir the new current working directory.
  69.    * @throws IllegalStateException if the job is submitted
  70.    */
  71.   public void setWorkingDirectory(Path dir) throws IOException {
  72.     ensureState(JobState.DEFINE);
  73.     conf.setWorkingDirectory(dir);
  74.   }
  75.   /**
  76.    * Set the {@link InputFormat} for the job.
  77.    * @param cls the <code>InputFormat</code> to use
  78.    * @throws IllegalStateException if the job is submitted
  79.    */
  80.   public void setInputFormatClass(Class<? extends InputFormat> cls
  81.                                   ) throws IllegalStateException {
  82.     ensureState(JobState.DEFINE);
  83.     conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
  84.   }
  85.   /**
  86.    * Set the {@link OutputFormat} for the job.
  87.    * @param cls the <code>OutputFormat</code> to use
  88.    * @throws IllegalStateException if the job is submitted
  89.    */
  90.   public void setOutputFormatClass(Class<? extends OutputFormat> cls
  91.                                    ) throws IllegalStateException {
  92.     ensureState(JobState.DEFINE);
  93.     conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class);
  94.   }
  95.   /**
  96.    * Set the {@link Mapper} for the job.
  97.    * @param cls the <code>Mapper</code> to use
  98.    * @throws IllegalStateException if the job is submitted
  99.    */
  100.   public void setMapperClass(Class<? extends Mapper> cls
  101.                              ) throws IllegalStateException {
  102.     ensureState(JobState.DEFINE);
  103.     conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class);
  104.   }
  105.   /**
  106.    * Set the Jar by finding where a given class came from.
  107.    * @param cls the example class
  108.    */
  109.   public void setJarByClass(Class<?> cls) {
  110.     conf.setJarByClass(cls);
  111.   }
  112.   
  113.   /**
  114.    * Get the pathname of the job's jar.
  115.    * @return the pathname
  116.    */
  117.   public String getJar() {
  118.     return conf.getJar();
  119.   }
  120.   /**
  121.    * Set the combiner class for the job.
  122.    * @param cls the combiner to use
  123.    * @throws IllegalStateException if the job is submitted
  124.    */
  125.   public void setCombinerClass(Class<? extends Reducer> cls
  126.                                ) throws IllegalStateException {
  127.     ensureState(JobState.DEFINE);
  128.     conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  129.   }
  130.   /**
  131.    * Set the {@link Reducer} for the job.
  132.    * @param cls the <code>Reducer</code> to use
  133.    * @throws IllegalStateException if the job is submitted
  134.    */
  135.   public void setReducerClass(Class<? extends Reducer> cls
  136.                               ) throws IllegalStateException {
  137.     ensureState(JobState.DEFINE);
  138.     conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class);
  139.   }
  140.   /**
  141.    * Set the {@link Partitioner} for the job.
  142.    * @param cls the <code>Partitioner</code> to use
  143.    * @throws IllegalStateException if the job is submitted
  144.    */
  145.   public void setPartitionerClass(Class<? extends Partitioner> cls
  146.                                   ) throws IllegalStateException {
  147.     ensureState(JobState.DEFINE);
  148.     conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class);
  149.   }
  150.   /**
  151.    * Set the key class for the map output data. This allows the user to
  152.    * specify the map output key class to be different than the final output
  153.    * value class.
  154.    * 
  155.    * @param theClass the map output key class.
  156.    * @throws IllegalStateException if the job is submitted
  157.    */
  158.   public void setMapOutputKeyClass(Class<?> theClass
  159.                                    ) throws IllegalStateException {
  160.     ensureState(JobState.DEFINE);
  161.     conf.setMapOutputKeyClass(theClass);
  162.   }
  163.   /**
  164.    * Set the value class for the map output data. This allows the user to
  165.    * specify the map output value class to be different than the final output
  166.    * value class.
  167.    * 
  168.    * @param theClass the map output value class.
  169.    * @throws IllegalStateException if the job is submitted
  170.    */
  171.   public void setMapOutputValueClass(Class<?> theClass
  172.                                      ) throws IllegalStateException {
  173.     ensureState(JobState.DEFINE);
  174.     conf.setMapOutputValueClass(theClass);
  175.   }
  176.   /**
  177.    * Set the key class for the job output data.
  178.    * 
  179.    * @param theClass the key class for the job output data.
  180.    * @throws IllegalStateException if the job is submitted
  181.    */
  182.   public void setOutputKeyClass(Class<?> theClass
  183.                                 ) throws IllegalStateException {
  184.     ensureState(JobState.DEFINE);
  185.     conf.setOutputKeyClass(theClass);
  186.   }
  187.   /**
  188.    * Set the value class for job outputs.
  189.    * 
  190.    * @param theClass the value class for job outputs.
  191.    * @throws IllegalStateException if the job is submitted
  192.    */
  193.   public void setOutputValueClass(Class<?> theClass
  194.                                   ) throws IllegalStateException {
  195.     ensureState(JobState.DEFINE);
  196.     conf.setOutputValueClass(theClass);
  197.   }
  198.   /**
  199.    * Define the comparator that controls how the keys are sorted before they
  200.    * are passed to the {@link Reducer}.
  201.    * @param cls the raw comparator
  202.    * @throws IllegalStateException if the job is submitted
  203.    */
  204.   public void setSortComparatorClass(Class<? extends RawComparator> cls
  205.                                      ) throws IllegalStateException {
  206.     ensureState(JobState.DEFINE);
  207.     conf.setOutputKeyComparatorClass(cls);
  208.   }
  209.   /**
  210.    * Define the comparator that controls which keys are grouped together
  211.    * for a single call to 
  212.    * {@link Reducer#reduce(Object, Iterable, 
  213.    *                       org.apache.hadoop.mapreduce.Reducer.Context)}
  214.    * @param cls the raw comparator to use
  215.    * @throws IllegalStateException if the job is submitted
  216.    */
  217.   public void setGroupingComparatorClass(Class<? extends RawComparator> cls
  218.                                          ) throws IllegalStateException {
  219.     ensureState(JobState.DEFINE);
  220.     conf.setOutputValueGroupingComparator(cls);
  221.   }
  222.   /**
  223.    * Set the user-specified job name.
  224.    * 
  225.    * @param name the job's new name.
  226.    * @throws IllegalStateException if the job is submitted
  227.    */
  228.   public void setJobName(String name) throws IllegalStateException {
  229.     ensureState(JobState.DEFINE);
  230.     conf.setJobName(name);
  231.   }
  232.   /**
  233.    * Get the URL where some job progress information will be displayed.
  234.    * 
  235.    * @return the URL where some job progress information will be displayed.
  236.    */
  237.   public String getTrackingURL() {
  238.     ensureState(JobState.RUNNING);
  239.     return info.getTrackingURL();
  240.   }
  241.   /**
  242.    * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 
  243.    * and 1.0.  When all map tasks have completed, the function returns 1.0.
  244.    * 
  245.    * @return the progress of the job's map-tasks.
  246.    * @throws IOException
  247.    */
  248.   public float mapProgress() throws IOException {
  249.     ensureState(JobState.RUNNING);
  250.     return info.mapProgress();
  251.   }
  252.   /**
  253.    * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 
  254.    * and 1.0.  When all reduce tasks have completed, the function returns 1.0.
  255.    * 
  256.    * @return the progress of the job's reduce-tasks.
  257.    * @throws IOException
  258.    */
  259.   public float reduceProgress() throws IOException {
  260.     ensureState(JobState.RUNNING);
  261.     return info.reduceProgress();
  262.   }
  263.   /**
  264.    * Check if the job is finished or not. 
  265.    * This is a non-blocking call.
  266.    * 
  267.    * @return <code>true</code> if the job is complete, else <code>false</code>.
  268.    * @throws IOException
  269.    */
  270.   public boolean isComplete() throws IOException {
  271.     ensureState(JobState.RUNNING);
  272.     return info.isComplete();
  273.   }
  274.   /**
  275.    * Check if the job completed successfully. 
  276.    * 
  277.    * @return <code>true</code> if the job succeeded, else <code>false</code>.
  278.    * @throws IOException
  279.    */
  280.   public boolean isSuccessful() throws IOException {
  281.     ensureState(JobState.RUNNING);
  282.     return info.isSuccessful();
  283.   }
  284.   /**
  285.    * Kill the running job.  Blocks until all job tasks have been
  286.    * killed as well.  If the job is no longer running, it simply returns.
  287.    * 
  288.    * @throws IOException
  289.    */
  290.   public void killJob() throws IOException {
  291.     ensureState(JobState.RUNNING);
  292.     info.killJob();
  293.   }
  294.     
  295.   /**
  296.    * Get events indicating completion (success/failure) of component tasks.
  297.    *  
  298.    * @param startFrom index to start fetching events from
  299.    * @return an array of {@link TaskCompletionEvent}s
  300.    * @throws IOException
  301.    */
  302.   public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom
  303.                                                        ) throws IOException {
  304.     ensureState(JobState.RUNNING);
  305.     return info.getTaskCompletionEvents(startFrom);
  306.   }
  307.   
  308.   /**
  309.    * Kill indicated task attempt.
  310.    * 
  311.    * @param taskId the id of the task to be terminated.
  312.    * @throws IOException
  313.    */
  314.   public void killTask(TaskAttemptID taskId) throws IOException {
  315.     ensureState(JobState.RUNNING);
  316.     info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
  317.                   false);
  318.   }
  319.   /**
  320.    * Fail indicated task attempt.
  321.    * 
  322.    * @param taskId the id of the task to be terminated.
  323.    * @throws IOException
  324.    */
  325.   public void failTask(TaskAttemptID taskId) throws IOException {
  326.     ensureState(JobState.RUNNING);
  327.     info.killTask(org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId), 
  328.                   true);
  329.   }
  330.   /**
  331.    * Gets the counters for this job.
  332.    * 
  333.    * @return the counters for this job.
  334.    * @throws IOException
  335.    */
  336.   public Counters getCounters() throws IOException {
  337.     ensureState(JobState.RUNNING);
  338.     return new Counters(info.getCounters());
  339.   }
  340.   private void ensureNotSet(String attr, String msg) throws IOException {
  341.     if (conf.get(attr) != null) {
  342.       throw new IOException(attr + " is incompatible with " + msg + " mode.");
  343.     }    
  344.   }
  345.   /**
  346.    * Default to the new APIs unless they are explicitly set or the old mapper or
  347.    * reduce attributes are used.
  348.    * @throws IOException if the configuration is inconsistant
  349.    */
  350.   private void setUseNewAPI() throws IOException {
  351.     int numReduces = conf.getNumReduceTasks();
  352.     String oldMapperClass = "mapred.mapper.class";
  353.     String oldReduceClass = "mapred.reducer.class";
  354.     conf.setBooleanIfUnset("mapred.mapper.new-api",
  355.                            conf.get(oldMapperClass) == null);
  356.     if (conf.getUseNewMapper()) {
  357.       String mode = "new map API";
  358.       ensureNotSet("mapred.input.format.class", mode);
  359.       ensureNotSet(oldMapperClass, mode);
  360.       if (numReduces != 0) {
  361.         ensureNotSet("mapred.partitioner.class", mode);
  362.        } else {
  363.         ensureNotSet("mapred.output.format.class", mode);
  364.       }      
  365.     } else {
  366.       String mode = "map compatability";
  367.       ensureNotSet(JobContext.INPUT_FORMAT_CLASS_ATTR, mode);
  368.       ensureNotSet(JobContext.MAP_CLASS_ATTR, mode);
  369.       if (numReduces != 0) {
  370.         ensureNotSet(JobContext.PARTITIONER_CLASS_ATTR, mode);
  371.        } else {
  372.         ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
  373.       }
  374.     }
  375.     if (numReduces != 0) {
  376.       conf.setBooleanIfUnset("mapred.reducer.new-api",
  377.                              conf.get(oldReduceClass) == null);
  378.       if (conf.getUseNewReducer()) {
  379.         String mode = "new reduce API";
  380.         ensureNotSet("mapred.output.format.class", mode);
  381.         ensureNotSet(oldReduceClass, mode);   
  382.       } else {
  383.         String mode = "reduce compatability";
  384.         ensureNotSet(JobContext.OUTPUT_FORMAT_CLASS_ATTR, mode);
  385.         ensureNotSet(JobContext.REDUCE_CLASS_ATTR, mode);   
  386.       }
  387.     }   
  388.   }
  389.   /**
  390.    * Submit the job to the cluster and return immediately.
  391.    * @throws IOException
  392.    */
  393.   public void submit() throws IOException, InterruptedException, 
  394.                               ClassNotFoundException {
  395.     ensureState(JobState.DEFINE);
  396.     setUseNewAPI();
  397.     info = jobClient.submitJobInternal(conf);
  398.     state = JobState.RUNNING;
  399.    }
  400.   
  401.   /**
  402.    * Submit the job to the cluster and wait for it to finish.
  403.    * @param verbose print the progress to the user
  404.    * @return true if the job succeeded
  405.    * @throws IOException thrown if the communication with the 
  406.    *         <code>JobTracker</code> is lost
  407.    */
  408.   public boolean waitForCompletion(boolean verbose
  409.                                    ) throws IOException, InterruptedException,
  410.                                             ClassNotFoundException {
  411.     if (state == JobState.DEFINE) {
  412.       submit();
  413.     }
  414.     if (verbose) {
  415.       jobClient.monitorAndPrintJob(conf, info);
  416.     } else {
  417.       info.waitForCompletion();
  418.     }
  419.     return isSuccessful();
  420.   }
  421.   
  422. }