JobConf.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:54k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.net.URL;
  21. import java.net.URLDecoder;
  22. import java.util.Enumeration;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.filecache.DistributedCache;
  26. import org.apache.hadoop.fs.FileStatus;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.Path;
  29. import org.apache.hadoop.conf.Configuration;
  30. import org.apache.hadoop.io.*;
  31. import org.apache.hadoop.io.compress.CompressionCodec;
  32. import org.apache.hadoop.mapred.lib.IdentityMapper;
  33. import org.apache.hadoop.mapred.lib.IdentityReducer;
  34. import org.apache.hadoop.mapred.lib.HashPartitioner;
  35. import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
  36. import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
  37. import org.apache.hadoop.util.ReflectionUtils;
  38. import org.apache.hadoop.util.Tool;
  39. /** 
  40.  * A map/reduce job configuration.
  41.  * 
  42.  * <p><code>JobConf</code> is the primary interface for a user to describe a 
  43.  * map-reduce job to the Hadoop framework for execution. The framework tries to
  44.  * faithfully execute the job as-is described by <code>JobConf</code>, however:
  45.  * <ol>
  46.  *   <li>
  47.  *   Some configuration parameters might have been marked as 
  48.  *   <a href="{@docRoot}/org/apache/hadoop/conf/Configuration.html#FinalParams">
  49.  *   final</a> by administrators and hence cannot be altered.
  50.  *   </li>
  51.  *   <li>
  52.  *   While some job parameters are straight-forward to set 
  53.  *   (e.g. {@link #setNumReduceTasks(int)}), some parameters interact subtly 
  54.  *   rest of the framework and/or job-configuration and is relatively more 
  55.  *   complex for the user to control finely (e.g. {@link #setNumMapTasks(int)}).
  56.  *   </li>
  57.  * </ol></p>
  58.  * 
  59.  * <p><code>JobConf</code> typically specifies the {@link Mapper}, combiner 
  60.  * (if any), {@link Partitioner}, {@link Reducer}, {@link InputFormat} and 
  61.  * {@link OutputFormat} implementations to be used etc.
  62.  *
  63.  * <p>Optionally <code>JobConf</code> is used to specify other advanced facets 
  64.  * of the job such as <code>Comparator</code>s to be used, files to be put in  
  65.  * the {@link DistributedCache}, whether or not intermediate and/or job outputs 
  66.  * are to be compressed (and how), debugability via user-provided scripts 
  67.  * ( {@link #setMapDebugScript(String)}/{@link #setReduceDebugScript(String)}),
  68.  * for doing post-processing on task logs, task's stdout, stderr, syslog. 
  69.  * and etc.</p>
  70.  * 
  71.  * <p>Here is an example on how to configure a job via <code>JobConf</code>:</p>
  72.  * <p><blockquote><pre>
  73.  *     // Create a new JobConf
  74.  *     JobConf job = new JobConf(new Configuration(), MyJob.class);
  75.  *     
  76.  *     // Specify various job-specific parameters     
  77.  *     job.setJobName("myjob");
  78.  *     
  79.  *     FileInputFormat.setInputPaths(job, new Path("in"));
  80.  *     FileOutputFormat.setOutputPath(job, new Path("out"));
  81.  *     
  82.  *     job.setMapperClass(MyJob.MyMapper.class);
  83.  *     job.setCombinerClass(MyJob.MyReducer.class);
  84.  *     job.setReducerClass(MyJob.MyReducer.class);
  85.  *     
  86.  *     job.setInputFormat(SequenceFileInputFormat.class);
  87.  *     job.setOutputFormat(SequenceFileOutputFormat.class);
  88.  * </pre></blockquote></p>
  89.  * 
  90.  * @see JobClient
  91.  * @see ClusterStatus
  92.  * @see Tool
  93.  * @see DistributedCache
  94.  * @deprecated Use {@link Configuration} instead
  95.  */
  96. @Deprecated
  97. public class JobConf extends Configuration {
  98.   
  99.   private static final Log LOG = LogFactory.getLog(JobConf.class);
  100.   static{
  101.     Configuration.addDefaultResource("mapred-default.xml");
  102.     Configuration.addDefaultResource("mapred-site.xml");
  103.   }
  104.   /**
  105.    * A value which if set for memory related configuration options,
  106.    * indicates that the options are turned off.
  107.    */
  108.   public static final long DISABLED_MEMORY_LIMIT = -1L;
  109.   
  110.   /**
  111.    * Name of the queue to which jobs will be submitted, if no queue
  112.    * name is mentioned.
  113.    */
  114.   public static final String DEFAULT_QUEUE_NAME = "default";
  115.   /**
  116.    * Cluster-wide configuration to be set by the administrators that provides
  117.    * default amount of maximum virtual memory for job's tasks. This has to be
  118.    * set on both the JobTracker node for the sake of scheduling decisions and on
  119.    * the TaskTracker nodes for the sake of memory management.
  120.    * 
  121.    * <p>
  122.    * 
  123.    * If a job doesn't specify its virtual memory requirement by setting
  124.    * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to {@link #DISABLED_MEMORY_LIMIT},
  125.    * tasks are assured a memory limit set to this property. This property is
  126.    * disabled by default, and if not explicitly set to a valid value by the
  127.    * administrators and if a job doesn't specify its virtual memory
  128.    * requirements, the job's tasks will not be assured anything and may be
  129.    * killed by a TT that intends to control the total memory usage of the tasks
  130.    * via memory management functionality.
  131.    * 
  132.    * <p>
  133.    * 
  134.    * This value should in general be less than the cluster-wide configuration
  135.    * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} . If not or if it not set,
  136.    * TaskTracker's memory management may be disabled and a scheduler's memory
  137.    * based scheduling decisions will be affected. Please refer to the
  138.    * documentation of the configured scheduler to see how this property is used.
  139.    */
  140.   public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
  141.       "mapred.task.default.maxvmem";
  142.   /**
  143.    * The maximum amount of memory any task of this job will use.
  144.    * 
  145.    * <p>
  146.    * 
  147.    * This value will be used by TaskTrackers for monitoring the memory usage of
  148.    * tasks of this jobs. If a TaskTracker's memory management functionality is
  149.    * enabled, each task of this job will be allowed to use a maximum virtual
  150.    * memory specified by this property. If the task's memory usage goes over
  151.    * this value, the task will be failed by the TT. If not set, the cluster-wide
  152.    * configuration {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is used as the
  153.    * default value for memory requirements. If this property cascaded with
  154.    * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} becomes equal to -1, job's
  155.    * tasks will not be assured anything and may be killed by a TT that intends
  156.    * to control the total memory usage of the tasks via memory management
  157.    * functionality. If the memory management functionality is disabled on a TT,
  158.    * this value is ignored.
  159.    * 
  160.    * <p>
  161.    * 
  162.    * This value should also be not more than the cluster-wide configuration
  163.    * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} which has to be set by the site
  164.    * administrators.
  165.    * 
  166.    * <p>
  167.    * 
  168.    * This value may be used by schedulers that support scheduling based on job's
  169.    * memory requirements. In general, a task of this job will be scheduled on a
  170.    * TaskTracker only if the amount of virtual memory still unoccupied on the
  171.    * TaskTracker is greater than or equal to this value. But different
  172.    * schedulers can take different decisions. Please refer to the documentation
  173.    * of the scheduler being configured to see if it does memory based scheduling
  174.    * and if it does, how this property is used by that scheduler.
  175.    * 
  176.    * @see #setMaxVirtualMemoryForTask(long)
  177.    * @see #getMaxVirtualMemoryForTask()
  178.    */
  179.   public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
  180.       "mapred.task.maxvmem";
  181.   /**
  182.    * The maximum amount of physical memory any task of a job will use.
  183.    * 
  184.    * <p>
  185.    * 
  186.    * This value may be used by schedulers that support scheduling based on job's
  187.    * memory requirements. In general, a task of this job will be scheduled on a
  188.    * TaskTracker, only if the amount of physical memory still unoccupied on the
  189.    * TaskTracker is greater than or equal to this value. But different
  190.    * schedulers can take different decisions. Please refer to the documentation
  191.    * of the scheduler being configured to see how it does memory based
  192.    * scheduling and how this variable is used by that scheduler.
  193.    * 
  194.    * @see #setMaxPhysicalMemoryForTask(long)
  195.    * @see #getMaxPhysicalMemoryForTask()
  196.    */
  197.   public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
  198.       "mapred.task.maxpmem";
  199.   /**
  200.    * Cluster-wide configuration to be set by the site administrators that
  201.    * provides an upper limit on the maximum virtual memory that can be specified
  202.    * by a job. The job configuration {@link #MAPRED_TASK_MAXVMEM_PROPERTY} and
  203.    * the cluster-wide configuration
  204.    * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} should, by definition, be
  205.    * less than this value. If the job configuration
  206.    * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is more than this value,
  207.    * depending on the scheduler being configured, the job may be rejected or the
  208.    * job configuration may just be ignored.
  209.    * 
  210.    * <p>
  211.    * 
  212.    * If it is not set on a TaskTracker, TaskTracker's memory management will be
  213.    * disabled.
  214.    */
  215.   public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
  216.       "mapred.task.limit.maxvmem";
  217.   /**
  218.    * Construct a map/reduce job configuration.
  219.    */
  220.   public JobConf() {}
  221.   /** 
  222.    * Construct a map/reduce job configuration.
  223.    * 
  224.    * @param exampleClass a class whose containing jar is used as the job's jar.
  225.    */
  226.   public JobConf(Class exampleClass) {
  227.     setJarByClass(exampleClass);
  228.   }
  229.   
  230.   /**
  231.    * Construct a map/reduce job configuration.
  232.    * 
  233.    * @param conf a Configuration whose settings will be inherited.
  234.    */
  235.   public JobConf(Configuration conf) {
  236.     super(conf);
  237.   }
  238.   /** Construct a map/reduce job configuration.
  239.    * 
  240.    * @param conf a Configuration whose settings will be inherited.
  241.    * @param exampleClass a class whose containing jar is used as the job's jar.
  242.    */
  243.   public JobConf(Configuration conf, Class exampleClass) {
  244.     this(conf);
  245.     setJarByClass(exampleClass);
  246.   }
  247.   /** Construct a map/reduce configuration.
  248.    *
  249.    * @param config a Configuration-format XML job description file.
  250.    */
  251.   public JobConf(String config) {
  252.     this(new Path(config));
  253.   }
  254.   /** Construct a map/reduce configuration.
  255.    *
  256.    * @param config a Configuration-format XML job description file.
  257.    */
  258.   public JobConf(Path config) {
  259.     super();
  260.     addResource(config);
  261.   }
  262.   /** A new map/reduce configuration where the behavior of reading from the
  263.    * default resources can be turned off.
  264.    * <p/>
  265.    * If the parameter {@code loadDefaults} is false, the new instance
  266.    * will not load resources from the default files.
  267.    *
  268.    * @param loadDefaults specifies whether to load from the default files
  269.    */
  270.   public JobConf(boolean loadDefaults) {
  271.     super(loadDefaults);
  272.   }
  273.   /**
  274.    * Get the user jar for the map-reduce job.
  275.    * 
  276.    * @return the user jar for the map-reduce job.
  277.    */
  278.   public String getJar() { return get("mapred.jar"); }
  279.   
  280.   /**
  281.    * Set the user jar for the map-reduce job.
  282.    * 
  283.    * @param jar the user jar for the map-reduce job.
  284.    */
  285.   public void setJar(String jar) { set("mapred.jar", jar); }
  286.   
  287.   /**
  288.    * Set the job's jar file by finding an example class location.
  289.    * 
  290.    * @param cls the example class.
  291.    */
  292.   public void setJarByClass(Class cls) {
  293.     String jar = findContainingJar(cls);
  294.     if (jar != null) {
  295.       setJar(jar);
  296.     }   
  297.   }
  298.   public String[] getLocalDirs() throws IOException {
  299.     return getStrings("mapred.local.dir");
  300.   }
  301.   public void deleteLocalFiles() throws IOException {
  302.     String[] localDirs = getLocalDirs();
  303.     for (int i = 0; i < localDirs.length; i++) {
  304.       FileSystem.getLocal(this).delete(new Path(localDirs[i]));
  305.     }
  306.   }
  307.   public void deleteLocalFiles(String subdir) throws IOException {
  308.     String[] localDirs = getLocalDirs();
  309.     for (int i = 0; i < localDirs.length; i++) {
  310.       FileSystem.getLocal(this).delete(new Path(localDirs[i], subdir));
  311.     }
  312.   }
  313.   /** 
  314.    * Constructs a local file name. Files are distributed among configured
  315.    * local directories.
  316.    */
  317.   public Path getLocalPath(String pathString) throws IOException {
  318.     return getLocalPath("mapred.local.dir", pathString);
  319.   }
  320.   /**
  321.    * Get the reported username for this job.
  322.    * 
  323.    * @return the username
  324.    */
  325.   public String getUser() {
  326.     return get("user.name");
  327.   }
  328.   
  329.   /**
  330.    * Set the reported username for this job.
  331.    * 
  332.    * @param user the username for this job.
  333.    */
  334.   public void setUser(String user) {
  335.     set("user.name", user);
  336.   }
  337.   
  338.   /**
  339.    * Set whether the framework should keep the intermediate files for 
  340.    * failed tasks.
  341.    * 
  342.    * @param keep <code>true</code> if framework should keep the intermediate files 
  343.    *             for failed tasks, <code>false</code> otherwise.
  344.    * 
  345.    */
  346.   public void setKeepFailedTaskFiles(boolean keep) {
  347.     setBoolean("keep.failed.task.files", keep);
  348.   }
  349.   
  350.   /**
  351.    * Should the temporary files for failed tasks be kept?
  352.    * 
  353.    * @return should the files be kept?
  354.    */
  355.   public boolean getKeepFailedTaskFiles() {
  356.     return getBoolean("keep.failed.task.files", false);
  357.   }
  358.   
  359.   /**
  360.    * Set a regular expression for task names that should be kept. 
  361.    * The regular expression ".*_m_000123_0" would keep the files
  362.    * for the first instance of map 123 that ran.
  363.    * 
  364.    * @param pattern the java.util.regex.Pattern to match against the 
  365.    *        task names.
  366.    */
  367.   public void setKeepTaskFilesPattern(String pattern) {
  368.     set("keep.task.files.pattern", pattern);
  369.   }
  370.   
  371.   /**
  372.    * Get the regular expression that is matched against the task names
  373.    * to see if we need to keep the files.
  374.    * 
  375.    * @return the pattern as a string, if it was set, othewise null.
  376.    */
  377.   public String getKeepTaskFilesPattern() {
  378.     return get("keep.task.files.pattern");
  379.   }
  380.   
  381.   /**
  382.    * Set the current working directory for the default file system.
  383.    * 
  384.    * @param dir the new current working directory.
  385.    */
  386.   public void setWorkingDirectory(Path dir) {
  387.     dir = new Path(getWorkingDirectory(), dir);
  388.     set("mapred.working.dir", dir.toString());
  389.   }
  390.   
  391.   /**
  392.    * Get the current working directory for the default file system.
  393.    * 
  394.    * @return the directory name.
  395.    */
  396.   public Path getWorkingDirectory() {
  397.     String name = get("mapred.working.dir");
  398.     if (name != null) {
  399.       return new Path(name);
  400.     } else {
  401.       try {
  402.         Path dir = FileSystem.get(this).getWorkingDirectory();
  403.         set("mapred.working.dir", dir.toString());
  404.         return dir;
  405.       } catch (IOException e) {
  406.         throw new RuntimeException(e);
  407.       }
  408.     }
  409.   }
  410.   
  411.   /**
  412.    * Sets the number of tasks that a spawned task JVM should run
  413.    * before it exits
  414.    * @param numTasks the number of tasks to execute; defaults to 1;
  415.    * -1 signifies no limit
  416.    */
  417.   public void setNumTasksToExecutePerJvm(int numTasks) {
  418.     setInt("mapred.job.reuse.jvm.num.tasks", numTasks);
  419.   }
  420.   
  421.   /**
  422.    * Get the number of tasks that a spawned JVM should execute
  423.    */
  424.   public int getNumTasksToExecutePerJvm() {
  425.     return getInt("mapred.job.reuse.jvm.num.tasks", 1);
  426.   }
  427.   
  428.   /**
  429.    * Get the {@link InputFormat} implementation for the map-reduce job,
  430.    * defaults to {@link TextInputFormat} if not specified explicity.
  431.    * 
  432.    * @return the {@link InputFormat} implementation for the map-reduce job.
  433.    */
  434.   public InputFormat getInputFormat() {
  435.     return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
  436.                                                              TextInputFormat.class,
  437.                                                              InputFormat.class),
  438.                                                     this);
  439.   }
  440.   
  441.   /**
  442.    * Set the {@link InputFormat} implementation for the map-reduce job.
  443.    * 
  444.    * @param theClass the {@link InputFormat} implementation for the map-reduce 
  445.    *                 job.
  446.    */
  447.   public void setInputFormat(Class<? extends InputFormat> theClass) {
  448.     setClass("mapred.input.format.class", theClass, InputFormat.class);
  449.   }
  450.   
  451.   /**
  452.    * Get the {@link OutputFormat} implementation for the map-reduce job,
  453.    * defaults to {@link TextOutputFormat} if not specified explicity.
  454.    * 
  455.    * @return the {@link OutputFormat} implementation for the map-reduce job.
  456.    */
  457.   public OutputFormat getOutputFormat() {
  458.     return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
  459.                                                               TextOutputFormat.class,
  460.                                                               OutputFormat.class),
  461.                                                      this);
  462.   }
  463.   /**
  464.    * Get the {@link OutputCommitter} implementation for the map-reduce job,
  465.    * defaults to {@link FileOutputCommitter} if not specified explicitly.
  466.    * 
  467.    * @return the {@link OutputCommitter} implementation for the map-reduce job.
  468.    */
  469.   public OutputCommitter getOutputCommitter() {
  470.     return (OutputCommitter)ReflectionUtils.newInstance(
  471.       getClass("mapred.output.committer.class", FileOutputCommitter.class,
  472.                OutputCommitter.class), this);
  473.   }
  474.   /**
  475.    * Set the {@link OutputCommitter} implementation for the map-reduce job.
  476.    * 
  477.    * @param theClass the {@link OutputCommitter} implementation for the map-reduce 
  478.    *                 job.
  479.    */
  480.   public void setOutputCommitter(Class<? extends OutputCommitter> theClass) {
  481.     setClass("mapred.output.committer.class", theClass, OutputCommitter.class);
  482.   }
  483.   
  484.   /**
  485.    * Set the {@link OutputFormat} implementation for the map-reduce job.
  486.    * 
  487.    * @param theClass the {@link OutputFormat} implementation for the map-reduce 
  488.    *                 job.
  489.    */
  490.   public void setOutputFormat(Class<? extends OutputFormat> theClass) {
  491.     setClass("mapred.output.format.class", theClass, OutputFormat.class);
  492.   }
  493.   /**
  494.    * Should the map outputs be compressed before transfer?
  495.    * Uses the SequenceFile compression.
  496.    * 
  497.    * @param compress should the map outputs be compressed?
  498.    */
  499.   public void setCompressMapOutput(boolean compress) {
  500.     setBoolean("mapred.compress.map.output", compress);
  501.   }
  502.   
  503.   /**
  504.    * Are the outputs of the maps be compressed?
  505.    * 
  506.    * @return <code>true</code> if the outputs of the maps are to be compressed,
  507.    *         <code>false</code> otherwise.
  508.    */
  509.   public boolean getCompressMapOutput() {
  510.     return getBoolean("mapred.compress.map.output", false);
  511.   }
  512.   /**
  513.    * Set the given class as the  {@link CompressionCodec} for the map outputs.
  514.    * 
  515.    * @param codecClass the {@link CompressionCodec} class that will compress  
  516.    *                   the map outputs.
  517.    */
  518.   public void 
  519.   setMapOutputCompressorClass(Class<? extends CompressionCodec> codecClass) {
  520.     setCompressMapOutput(true);
  521.     setClass("mapred.map.output.compression.codec", codecClass, 
  522.              CompressionCodec.class);
  523.   }
  524.   
  525.   /**
  526.    * Get the {@link CompressionCodec} for compressing the map outputs.
  527.    * 
  528.    * @param defaultValue the {@link CompressionCodec} to return if not set
  529.    * @return the {@link CompressionCodec} class that should be used to compress the 
  530.    *         map outputs.
  531.    * @throws IllegalArgumentException if the class was specified, but not found
  532.    */
  533.   public Class<? extends CompressionCodec> 
  534.   getMapOutputCompressorClass(Class<? extends CompressionCodec> defaultValue) {
  535.     Class<? extends CompressionCodec> codecClass = defaultValue;
  536.     String name = get("mapred.map.output.compression.codec");
  537.     if (name != null) {
  538.       try {
  539.         codecClass = getClassByName(name).asSubclass(CompressionCodec.class);
  540.       } catch (ClassNotFoundException e) {
  541.         throw new IllegalArgumentException("Compression codec " + name + 
  542.                                            " was not found.", e);
  543.       }
  544.     }
  545.     return codecClass;
  546.   }
  547.   
  548.   /**
  549.    * Get the key class for the map output data. If it is not set, use the
  550.    * (final) output key class. This allows the map output key class to be
  551.    * different than the final output key class.
  552.    *  
  553.    * @return the map output key class.
  554.    */
  555.   public Class<?> getMapOutputKeyClass() {
  556.     Class<?> retv = getClass("mapred.mapoutput.key.class", null, Object.class);
  557.     if (retv == null) {
  558.       retv = getOutputKeyClass();
  559.     }
  560.     return retv;
  561.   }
  562.   
  563.   /**
  564.    * Set the key class for the map output data. This allows the user to
  565.    * specify the map output key class to be different than the final output
  566.    * value class.
  567.    * 
  568.    * @param theClass the map output key class.
  569.    */
  570.   public void setMapOutputKeyClass(Class<?> theClass) {
  571.     setClass("mapred.mapoutput.key.class", theClass, Object.class);
  572.   }
  573.   
  574.   /**
  575.    * Get the value class for the map output data. If it is not set, use the
  576.    * (final) output value class This allows the map output value class to be
  577.    * different than the final output value class.
  578.    *  
  579.    * @return the map output value class.
  580.    */
  581.   public Class<?> getMapOutputValueClass() {
  582.     Class<?> retv = getClass("mapred.mapoutput.value.class", null,
  583.         Object.class);
  584.     if (retv == null) {
  585.       retv = getOutputValueClass();
  586.     }
  587.     return retv;
  588.   }
  589.   
  590.   /**
  591.    * Set the value class for the map output data. This allows the user to
  592.    * specify the map output value class to be different than the final output
  593.    * value class.
  594.    * 
  595.    * @param theClass the map output value class.
  596.    */
  597.   public void setMapOutputValueClass(Class<?> theClass) {
  598.     setClass("mapred.mapoutput.value.class", theClass, Object.class);
  599.   }
  600.   
  601.   /**
  602.    * Get the key class for the job output data.
  603.    * 
  604.    * @return the key class for the job output data.
  605.    */
  606.   public Class<?> getOutputKeyClass() {
  607.     return getClass("mapred.output.key.class",
  608.                     LongWritable.class, Object.class);
  609.   }
  610.   
  611.   /**
  612.    * Set the key class for the job output data.
  613.    * 
  614.    * @param theClass the key class for the job output data.
  615.    */
  616.   public void setOutputKeyClass(Class<?> theClass) {
  617.     setClass("mapred.output.key.class", theClass, Object.class);
  618.   }
  619.   /**
  620.    * Get the {@link RawComparator} comparator used to compare keys.
  621.    * 
  622.    * @return the {@link RawComparator} comparator used to compare keys.
  623.    */
  624.   public RawComparator getOutputKeyComparator() {
  625.     Class<? extends RawComparator> theClass = getClass("mapred.output.key.comparator.class",
  626.         null, RawComparator.class);
  627.     if (theClass != null)
  628.       return ReflectionUtils.newInstance(theClass, this);
  629.     return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
  630.   }
  631.   /**
  632.    * Set the {@link RawComparator} comparator used to compare keys.
  633.    * 
  634.    * @param theClass the {@link RawComparator} comparator used to 
  635.    *                 compare keys.
  636.    * @see #setOutputValueGroupingComparator(Class)                 
  637.    */
  638.   public void setOutputKeyComparatorClass(Class<? extends RawComparator> theClass) {
  639.     setClass("mapred.output.key.comparator.class",
  640.              theClass, RawComparator.class);
  641.   }
  642.   /**
  643.    * Set the {@link KeyFieldBasedComparator} options used to compare keys.
  644.    * 
  645.    * @param keySpec the key specification of the form -k pos1[,pos2], where,
  646.    *  pos is of the form f[.c][opts], where f is the number
  647.    *  of the key field to use, and c is the number of the first character from
  648.    *  the beginning of the field. Fields and character posns are numbered 
  649.    *  starting with 1; a character position of zero in pos2 indicates the
  650.    *  field's last character. If '.c' is omitted from pos1, it defaults to 1
  651.    *  (the beginning of the field); if omitted from pos2, it defaults to 0 
  652.    *  (the end of the field). opts are ordering options. The supported options
  653.    *  are:
  654.    *    -n, (Sort numerically)
  655.    *    -r, (Reverse the result of comparison)                 
  656.    */
  657.   public void setKeyFieldComparatorOptions(String keySpec) {
  658.     setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
  659.     set("mapred.text.key.comparator.options", keySpec);
  660.   }
  661.   
  662.   /**
  663.    * Get the {@link KeyFieldBasedComparator} options
  664.    */
  665.   public String getKeyFieldComparatorOption() {
  666.     return get("mapred.text.key.comparator.options");
  667.   }
  668.   /**
  669.    * Set the {@link KeyFieldBasedPartitioner} options used for 
  670.    * {@link Partitioner}
  671.    * 
  672.    * @param keySpec the key specification of the form -k pos1[,pos2], where,
  673.    *  pos is of the form f[.c][opts], where f is the number
  674.    *  of the key field to use, and c is the number of the first character from
  675.    *  the beginning of the field. Fields and character posns are numbered 
  676.    *  starting with 1; a character position of zero in pos2 indicates the
  677.    *  field's last character. If '.c' is omitted from pos1, it defaults to 1
  678.    *  (the beginning of the field); if omitted from pos2, it defaults to 0 
  679.    *  (the end of the field).
  680.    */
  681.   public void setKeyFieldPartitionerOptions(String keySpec) {
  682.     setPartitionerClass(KeyFieldBasedPartitioner.class);
  683.     set("mapred.text.key.partitioner.options", keySpec);
  684.   }
  685.   
  686.   /**
  687.    * Get the {@link KeyFieldBasedPartitioner} options
  688.    */
  689.   public String getKeyFieldPartitionerOption() {
  690.     return get("mapred.text.key.partitioner.options");
  691.   }
  692.   /** 
  693.    * Get the user defined {@link WritableComparable} comparator for 
  694.    * grouping keys of inputs to the reduce.
  695.    * 
  696.    * @return comparator set by the user for grouping values.
  697.    * @see #setOutputValueGroupingComparator(Class) for details.  
  698.    */
  699.   public RawComparator getOutputValueGroupingComparator() {
  700.     Class<? extends RawComparator> theClass = getClass("mapred.output.value.groupfn.class", null,
  701.         RawComparator.class);
  702.     if (theClass == null) {
  703.       return getOutputKeyComparator();
  704.     }
  705.     
  706.     return ReflectionUtils.newInstance(theClass, this);
  707.   }
  708.   /** 
  709.    * Set the user defined {@link RawComparator} comparator for 
  710.    * grouping keys in the input to the reduce.
  711.    * 
  712.    * <p>This comparator should be provided if the equivalence rules for keys
  713.    * for sorting the intermediates are different from those for grouping keys
  714.    * before each call to 
  715.    * {@link Reducer#reduce(Object, java.util.Iterator, OutputCollector, Reporter)}.</p>
  716.    *  
  717.    * <p>For key-value pairs (K1,V1) and (K2,V2), the values (V1, V2) are passed
  718.    * in a single call to the reduce function if K1 and K2 compare as equal.</p>
  719.    * 
  720.    * <p>Since {@link #setOutputKeyComparatorClass(Class)} can be used to control 
  721.    * how keys are sorted, this can be used in conjunction to simulate 
  722.    * <i>secondary sort on values</i>.</p>
  723.    *  
  724.    * <p><i>Note</i>: This is not a guarantee of the reduce sort being 
  725.    * <i>stable</i> in any sense. (In any case, with the order of available 
  726.    * map-outputs to the reduce being non-deterministic, it wouldn't make 
  727.    * that much sense.)</p>
  728.    * 
  729.    * @param theClass the comparator class to be used for grouping keys. 
  730.    *                 It should implement <code>RawComparator</code>.
  731.    * @see #setOutputKeyComparatorClass(Class)                 
  732.    */
  733.   public void setOutputValueGroupingComparator(
  734.   Class<? extends RawComparator> theClass) {
  735.     setClass("mapred.output.value.groupfn.class",
  736.              theClass, RawComparator.class);
  737.   }
  738.   /**
  739.    * Should the framework use the new context-object code for running
  740.    * the mapper?
  741.    * @return true, if the new api should be used
  742.    */
  743.   public boolean getUseNewMapper() {
  744.     return getBoolean("mapred.mapper.new-api", false);
  745.   }
  746.   /**
  747.    * Set whether the framework should use the new api for the mapper.
  748.    * This is the default for jobs submitted with the new Job api.
  749.    * @param flag true, if the new api should be used
  750.    */
  751.   public void setUseNewMapper(boolean flag) {
  752.     setBoolean("mapred.mapper.new-api", flag);
  753.   }
  754.   /**
  755.    * Should the framework use the new context-object code for running
  756.    * the reducer?
  757.    * @return true, if the new api should be used
  758.    */
  759.   public boolean getUseNewReducer() {
  760.     return getBoolean("mapred.reducer.new-api", false);
  761.   }
  762.   /**
  763.    * Set whether the framework should use the new api for the reducer. 
  764.    * This is the default for jobs submitted with the new Job api.
  765.    * @param flag true, if the new api should be used
  766.    */
  767.   public void setUseNewReducer(boolean flag) {
  768.     setBoolean("mapred.reducer.new-api", flag);
  769.   }
  770.   /**
  771.    * Get the value class for job outputs.
  772.    * 
  773.    * @return the value class for job outputs.
  774.    */
  775.   public Class<?> getOutputValueClass() {
  776.     return getClass("mapred.output.value.class", Text.class, Object.class);
  777.   }
  778.   
  779.   /**
  780.    * Set the value class for job outputs.
  781.    * 
  782.    * @param theClass the value class for job outputs.
  783.    */
  784.   public void setOutputValueClass(Class<?> theClass) {
  785.     setClass("mapred.output.value.class", theClass, Object.class);
  786.   }
  787.   /**
  788.    * Get the {@link Mapper} class for the job.
  789.    * 
  790.    * @return the {@link Mapper} class for the job.
  791.    */
  792.   public Class<? extends Mapper> getMapperClass() {
  793.     return getClass("mapred.mapper.class", IdentityMapper.class, Mapper.class);
  794.   }
  795.   
  796.   /**
  797.    * Set the {@link Mapper} class for the job.
  798.    * 
  799.    * @param theClass the {@link Mapper} class for the job.
  800.    */
  801.   public void setMapperClass(Class<? extends Mapper> theClass) {
  802.     setClass("mapred.mapper.class", theClass, Mapper.class);
  803.   }
  804.   /**
  805.    * Get the {@link MapRunnable} class for the job.
  806.    * 
  807.    * @return the {@link MapRunnable} class for the job.
  808.    */
  809.   public Class<? extends MapRunnable> getMapRunnerClass() {
  810.     return getClass("mapred.map.runner.class",
  811.                     MapRunner.class, MapRunnable.class);
  812.   }
  813.   
  814.   /**
  815.    * Expert: Set the {@link MapRunnable} class for the job.
  816.    * 
  817.    * Typically used to exert greater control on {@link Mapper}s.
  818.    * 
  819.    * @param theClass the {@link MapRunnable} class for the job.
  820.    */
  821.   public void setMapRunnerClass(Class<? extends MapRunnable> theClass) {
  822.     setClass("mapred.map.runner.class", theClass, MapRunnable.class);
  823.   }
  824.   /**
  825.    * Get the {@link Partitioner} used to partition {@link Mapper}-outputs 
  826.    * to be sent to the {@link Reducer}s.
  827.    * 
  828.    * @return the {@link Partitioner} used to partition map-outputs.
  829.    */
  830.   public Class<? extends Partitioner> getPartitionerClass() {
  831.     return getClass("mapred.partitioner.class",
  832.                     HashPartitioner.class, Partitioner.class);
  833.   }
  834.   
  835.   /**
  836.    * Set the {@link Partitioner} class used to partition 
  837.    * {@link Mapper}-outputs to be sent to the {@link Reducer}s.
  838.    * 
  839.    * @param theClass the {@link Partitioner} used to partition map-outputs.
  840.    */
  841.   public void setPartitionerClass(Class<? extends Partitioner> theClass) {
  842.     setClass("mapred.partitioner.class", theClass, Partitioner.class);
  843.   }
  844.   /**
  845.    * Get the {@link Reducer} class for the job.
  846.    * 
  847.    * @return the {@link Reducer} class for the job.
  848.    */
  849.   public Class<? extends Reducer> getReducerClass() {
  850.     return getClass("mapred.reducer.class",
  851.                     IdentityReducer.class, Reducer.class);
  852.   }
  853.   
  854.   /**
  855.    * Set the {@link Reducer} class for the job.
  856.    * 
  857.    * @param theClass the {@link Reducer} class for the job.
  858.    */
  859.   public void setReducerClass(Class<? extends Reducer> theClass) {
  860.     setClass("mapred.reducer.class", theClass, Reducer.class);
  861.   }
  862.   /**
  863.    * Get the user-defined <i>combiner</i> class used to combine map-outputs 
  864.    * before being sent to the reducers. Typically the combiner is same as the
  865.    * the {@link Reducer} for the job i.e. {@link #getReducerClass()}.
  866.    * 
  867.    * @return the user-defined combiner class used to combine map-outputs.
  868.    */
  869.   public Class<? extends Reducer> getCombinerClass() {
  870.     return getClass("mapred.combiner.class", null, Reducer.class);
  871.   }
  872.   /**
  873.    * Set the user-defined <i>combiner</i> class used to combine map-outputs 
  874.    * before being sent to the reducers. 
  875.    * 
  876.    * <p>The combiner is an application-specified aggregation operation, which
  877.    * can help cut down the amount of data transferred between the 
  878.    * {@link Mapper} and the {@link Reducer}, leading to better performance.</p>
  879.    * 
  880.    * <p>The framework may invoke the combiner 0, 1, or multiple times, in both
  881.    * the mapper and reducer tasks. In general, the combiner is called as the
  882.    * sort/merge result is written to disk. The combiner must:
  883.    * <ul>
  884.    *   <li> be side-effect free</li>
  885.    *   <li> have the same input and output key types and the same input and 
  886.    *        output value types</li>
  887.    * </ul></p>
  888.    * 
  889.    * <p>Typically the combiner is same as the <code>Reducer</code> for the  
  890.    * job i.e. {@link #setReducerClass(Class)}.</p>
  891.    * 
  892.    * @param theClass the user-defined combiner class used to combine 
  893.    *                 map-outputs.
  894.    */
  895.   public void setCombinerClass(Class<? extends Reducer> theClass) {
  896.     setClass("mapred.combiner.class", theClass, Reducer.class);
  897.   }
  898.   
  899.   /**
  900.    * Should speculative execution be used for this job? 
  901.    * Defaults to <code>true</code>.
  902.    * 
  903.    * @return <code>true</code> if speculative execution be used for this job,
  904.    *         <code>false</code> otherwise.
  905.    */
  906.   public boolean getSpeculativeExecution() { 
  907.     return (getMapSpeculativeExecution() || getReduceSpeculativeExecution());
  908.   }
  909.   
  910.   /**
  911.    * Turn speculative execution on or off for this job. 
  912.    * 
  913.    * @param speculativeExecution <code>true</code> if speculative execution 
  914.    *                             should be turned on, else <code>false</code>.
  915.    */
  916.   public void setSpeculativeExecution(boolean speculativeExecution) {
  917.     setMapSpeculativeExecution(speculativeExecution);
  918.     setReduceSpeculativeExecution(speculativeExecution);
  919.   }
  920.   /**
  921.    * Should speculative execution be used for this job for map tasks? 
  922.    * Defaults to <code>true</code>.
  923.    * 
  924.    * @return <code>true</code> if speculative execution be 
  925.    *                           used for this job for map tasks,
  926.    *         <code>false</code> otherwise.
  927.    */
  928.   public boolean getMapSpeculativeExecution() { 
  929.     return getBoolean("mapred.map.tasks.speculative.execution", true);
  930.   }
  931.   
  932.   /**
  933.    * Turn speculative execution on or off for this job for map tasks. 
  934.    * 
  935.    * @param speculativeExecution <code>true</code> if speculative execution 
  936.    *                             should be turned on for map tasks,
  937.    *                             else <code>false</code>.
  938.    */
  939.   public void setMapSpeculativeExecution(boolean speculativeExecution) {
  940.     setBoolean("mapred.map.tasks.speculative.execution", speculativeExecution);
  941.   }
  942.   /**
  943.    * Should speculative execution be used for this job for reduce tasks? 
  944.    * Defaults to <code>true</code>.
  945.    * 
  946.    * @return <code>true</code> if speculative execution be used 
  947.    *                           for reduce tasks for this job,
  948.    *         <code>false</code> otherwise.
  949.    */
  950.   public boolean getReduceSpeculativeExecution() { 
  951.     return getBoolean("mapred.reduce.tasks.speculative.execution", true);
  952.   }
  953.   
  954.   /**
  955.    * Turn speculative execution on or off for this job for reduce tasks. 
  956.    * 
  957.    * @param speculativeExecution <code>true</code> if speculative execution 
  958.    *                             should be turned on for reduce tasks,
  959.    *                             else <code>false</code>.
  960.    */
  961.   public void setReduceSpeculativeExecution(boolean speculativeExecution) {
  962.     setBoolean("mapred.reduce.tasks.speculative.execution", 
  963.                speculativeExecution);
  964.   }
  965.   /**
  966.    * Get configured the number of reduce tasks for this job.
  967.    * Defaults to <code>1</code>.
  968.    * 
  969.    * @return the number of reduce tasks for this job.
  970.    */
  971.   public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
  972.   
  973.   /**
  974.    * Set the number of map tasks for this job.
  975.    * 
  976.    * <p><i>Note</i>: This is only a <i>hint</i> to the framework. The actual 
  977.    * number of spawned map tasks depends on the number of {@link InputSplit}s 
  978.    * generated by the job's {@link InputFormat#getSplits(JobConf, int)}.
  979.    *  
  980.    * A custom {@link InputFormat} is typically used to accurately control 
  981.    * the number of map tasks for the job.</p>
  982.    * 
  983.    * <h4 id="NoOfMaps">How many maps?</h4>
  984.    * 
  985.    * <p>The number of maps is usually driven by the total size of the inputs 
  986.    * i.e. total number of blocks of the input files.</p>
  987.    *  
  988.    * <p>The right level of parallelism for maps seems to be around 10-100 maps 
  989.    * per-node, although it has been set up to 300 or so for very cpu-light map 
  990.    * tasks. Task setup takes awhile, so it is best if the maps take at least a 
  991.    * minute to execute.</p>
  992.    * 
  993.    * <p>The default behavior of file-based {@link InputFormat}s is to split the 
  994.    * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
  995.    * bytes, of input files. However, the {@link FileSystem} blocksize of the 
  996.    * input files is treated as an upper bound for input splits. A lower bound 
  997.    * on the split size can be set via 
  998.    * <a href="{@docRoot}/../mapred-default.html#mapred.min.split.size">
  999.    * mapred.min.split.size</a>.</p>
  1000.    *  
  1001.    * <p>Thus, if you expect 10TB of input data and have a blocksize of 128MB, 
  1002.    * you'll end up with 82,000 maps, unless {@link #setNumMapTasks(int)} is 
  1003.    * used to set it even higher.</p>
  1004.    * 
  1005.    * @param n the number of map tasks for this job.
  1006.    * @see InputFormat#getSplits(JobConf, int)
  1007.    * @see FileInputFormat
  1008.    * @see FileSystem#getDefaultBlockSize()
  1009.    * @see FileStatus#getBlockSize()
  1010.    */
  1011.   public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }
  1012.   /**
  1013.    * Get configured the number of reduce tasks for this job. Defaults to 
  1014.    * <code>1</code>.
  1015.    * 
  1016.    * @return the number of reduce tasks for this job.
  1017.    */
  1018.   public int getNumReduceTasks() { return getInt("mapred.reduce.tasks", 1); }
  1019.   
  1020.   /**
  1021.    * Set the requisite number of reduce tasks for this job.
  1022.    * 
  1023.    * <h4 id="NoOfReduces">How many reduces?</h4>
  1024.    * 
  1025.    * <p>The right number of reduces seems to be <code>0.95</code> or 
  1026.    * <code>1.75</code> multiplied by (&lt;<i>no. of nodes</i>&gt; * 
  1027.    * <a href="{@docRoot}/../mapred-default.html#mapred.tasktracker.reduce.tasks.maximum">
  1028.    * mapred.tasktracker.reduce.tasks.maximum</a>).
  1029.    * </p>
  1030.    * 
  1031.    * <p>With <code>0.95</code> all of the reduces can launch immediately and 
  1032.    * start transfering map outputs as the maps finish. With <code>1.75</code> 
  1033.    * the faster nodes will finish their first round of reduces and launch a 
  1034.    * second wave of reduces doing a much better job of load balancing.</p>
  1035.    * 
  1036.    * <p>Increasing the number of reduces increases the framework overhead, but 
  1037.    * increases load balancing and lowers the cost of failures.</p>
  1038.    * 
  1039.    * <p>The scaling factors above are slightly less than whole numbers to 
  1040.    * reserve a few reduce slots in the framework for speculative-tasks, failures
  1041.    * etc.</p> 
  1042.    *
  1043.    * <h4 id="ReducerNone">Reducer NONE</h4>
  1044.    * 
  1045.    * <p>It is legal to set the number of reduce-tasks to <code>zero</code>.</p>
  1046.    * 
  1047.    * <p>In this case the output of the map-tasks directly go to distributed 
  1048.    * file-system, to the path set by 
  1049.    * {@link FileOutputFormat#setOutputPath(JobConf, Path)}. Also, the 
  1050.    * framework doesn't sort the map-outputs before writing it out to HDFS.</p>
  1051.    * 
  1052.    * @param n the number of reduce tasks for this job.
  1053.    */
  1054.   public void setNumReduceTasks(int n) { setInt("mapred.reduce.tasks", n); }
  1055.   
  1056.   /** 
  1057.    * Get the configured number of maximum attempts that will be made to run a
  1058.    * map task, as specified by the <code>mapred.map.max.attempts</code>
  1059.    * property. If this property is not already set, the default is 4 attempts.
  1060.    *  
  1061.    * @return the max number of attempts per map task.
  1062.    */
  1063.   public int getMaxMapAttempts() {
  1064.     return getInt("mapred.map.max.attempts", 4);
  1065.   }
  1066.   
  1067.   /** 
  1068.    * Expert: Set the number of maximum attempts that will be made to run a
  1069.    * map task.
  1070.    * 
  1071.    * @param n the number of attempts per map task.
  1072.    */
  1073.   public void setMaxMapAttempts(int n) {
  1074.     setInt("mapred.map.max.attempts", n);
  1075.   }
  1076.   /** 
  1077.    * Get the configured number of maximum attempts  that will be made to run a
  1078.    * reduce task, as specified by the <code>mapred.reduce.max.attempts</code>
  1079.    * property. If this property is not already set, the default is 4 attempts.
  1080.    * 
  1081.    * @return the max number of attempts per reduce task.
  1082.    */
  1083.   public int getMaxReduceAttempts() {
  1084.     return getInt("mapred.reduce.max.attempts", 4);
  1085.   }
  1086.   /** 
  1087.    * Expert: Set the number of maximum attempts that will be made to run a
  1088.    * reduce task.
  1089.    * 
  1090.    * @param n the number of attempts per reduce task.
  1091.    */
  1092.   public void setMaxReduceAttempts(int n) {
  1093.     setInt("mapred.reduce.max.attempts", n);
  1094.   }
  1095.   
  1096.   /**
  1097.    * Get the user-specified job name. This is only used to identify the 
  1098.    * job to the user.
  1099.    * 
  1100.    * @return the job's name, defaulting to "".
  1101.    */
  1102.   public String getJobName() {
  1103.     return get("mapred.job.name", "");
  1104.   }
  1105.   
  1106.   /**
  1107.    * Set the user-specified job name.
  1108.    * 
  1109.    * @param name the job's new name.
  1110.    */
  1111.   public void setJobName(String name) {
  1112.     set("mapred.job.name", name);
  1113.   }
  1114.   
  1115.   /**
  1116.    * Get the user-specified session identifier. The default is the empty string.
  1117.    *
  1118.    * The session identifier is used to tag metric data that is reported to some
  1119.    * performance metrics system via the org.apache.hadoop.metrics API.  The 
  1120.    * session identifier is intended, in particular, for use by Hadoop-On-Demand 
  1121.    * (HOD) which allocates a virtual Hadoop cluster dynamically and transiently. 
  1122.    * HOD will set the session identifier by modifying the mapred-site.xml file 
  1123.    * before starting the cluster.
  1124.    *
  1125.    * When not running under HOD, this identifer is expected to remain set to 
  1126.    * the empty string.
  1127.    *
  1128.    * @return the session identifier, defaulting to "".
  1129.    */
  1130.   public String getSessionId() {
  1131.       return get("session.id", "");
  1132.   }
  1133.   
  1134.   /**
  1135.    * Set the user-specified session identifier.  
  1136.    *
  1137.    * @param sessionId the new session id.
  1138.    */
  1139.   public void setSessionId(String sessionId) {
  1140.       set("session.id", sessionId);
  1141.   }
  1142.     
  1143.   /**
  1144.    * Set the maximum no. of failures of a given job per tasktracker.
  1145.    * If the no. of task failures exceeds <code>noFailures</code>, the 
  1146.    * tasktracker is <i>blacklisted</i> for this job. 
  1147.    * 
  1148.    * @param noFailures maximum no. of failures of a given job per tasktracker.
  1149.    */
  1150.   public void setMaxTaskFailuresPerTracker(int noFailures) {
  1151.     setInt("mapred.max.tracker.failures", noFailures);
  1152.   }
  1153.   
  1154.   /**
  1155.    * Expert: Get the maximum no. of failures of a given job per tasktracker.
  1156.    * If the no. of task failures exceeds this, the tasktracker is
  1157.    * <i>blacklisted</i> for this job. 
  1158.    * 
  1159.    * @return the maximum no. of failures of a given job per tasktracker.
  1160.    */
  1161.   public int getMaxTaskFailuresPerTracker() {
  1162.     return getInt("mapred.max.tracker.failures", 4); 
  1163.   }
  1164.   /**
  1165.    * Get the maximum percentage of map tasks that can fail without 
  1166.    * the job being aborted. 
  1167.    * 
  1168.    * Each map task is executed a minimum of {@link #getMaxMapAttempts()} 
  1169.    * attempts before being declared as <i>failed</i>.
  1170.    *  
  1171.    * Defaults to <code>zero</code>, i.e. <i>any</i> failed map-task results in
  1172.    * the job being declared as {@link JobStatus#FAILED}.
  1173.    * 
  1174.    * @return the maximum percentage of map tasks that can fail without
  1175.    *         the job being aborted.
  1176.    */
  1177.   public int getMaxMapTaskFailuresPercent() {
  1178.     return getInt("mapred.max.map.failures.percent", 0);
  1179.   }
  1180.   /**
  1181.    * Expert: Set the maximum percentage of map tasks that can fail without the
  1182.    * job being aborted. 
  1183.    * 
  1184.    * Each map task is executed a minimum of {@link #getMaxMapAttempts} attempts 
  1185.    * before being declared as <i>failed</i>.
  1186.    * 
  1187.    * @param percent the maximum percentage of map tasks that can fail without 
  1188.    *                the job being aborted.
  1189.    */
  1190.   public void setMaxMapTaskFailuresPercent(int percent) {
  1191.     setInt("mapred.max.map.failures.percent", percent);
  1192.   }
  1193.   
  1194.   /**
  1195.    * Get the maximum percentage of reduce tasks that can fail without 
  1196.    * the job being aborted. 
  1197.    * 
  1198.    * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
  1199.    * attempts before being declared as <i>failed</i>.
  1200.    * 
  1201.    * Defaults to <code>zero</code>, i.e. <i>any</i> failed reduce-task results 
  1202.    * in the job being declared as {@link JobStatus#FAILED}.
  1203.    * 
  1204.    * @return the maximum percentage of reduce tasks that can fail without
  1205.    *         the job being aborted.
  1206.    */
  1207.   public int getMaxReduceTaskFailuresPercent() {
  1208.     return getInt("mapred.max.reduce.failures.percent", 0);
  1209.   }
  1210.   
  1211.   /**
  1212.    * Set the maximum percentage of reduce tasks that can fail without the job
  1213.    * being aborted.
  1214.    * 
  1215.    * Each reduce task is executed a minimum of {@link #getMaxReduceAttempts()} 
  1216.    * attempts before being declared as <i>failed</i>.
  1217.    * 
  1218.    * @param percent the maximum percentage of reduce tasks that can fail without 
  1219.    *                the job being aborted.
  1220.    */
  1221.   public void setMaxReduceTaskFailuresPercent(int percent) {
  1222.     setInt("mapred.max.reduce.failures.percent", percent);
  1223.   }
  1224.   
  1225.   /**
  1226.    * Set {@link JobPriority} for this job.
  1227.    * 
  1228.    * @param prio the {@link JobPriority} for this job.
  1229.    */
  1230.   public void setJobPriority(JobPriority prio) {
  1231.     set("mapred.job.priority", prio.toString());
  1232.   }
  1233.   
  1234.   /**
  1235.    * Get the {@link JobPriority} for this job.
  1236.    * 
  1237.    * @return the {@link JobPriority} for this job.
  1238.    */
  1239.   public JobPriority getJobPriority() {
  1240.     String prio = get("mapred.job.priority");
  1241.     if(prio == null) {
  1242.       return JobPriority.NORMAL;
  1243.     }
  1244.     
  1245.     return JobPriority.valueOf(prio);
  1246.   }
  1247.   /**
  1248.    * Get whether the task profiling is enabled.
  1249.    * @return true if some tasks will be profiled
  1250.    */
  1251.   public boolean getProfileEnabled() {
  1252.     return getBoolean("mapred.task.profile", false);
  1253.   }
  1254.   /**
  1255.    * Set whether the system should collect profiler information for some of 
  1256.    * the tasks in this job? The information is stored in the user log 
  1257.    * directory.
  1258.    * @param newValue true means it should be gathered
  1259.    */
  1260.   public void setProfileEnabled(boolean newValue) {
  1261.     setBoolean("mapred.task.profile", newValue);
  1262.   }
  1263.   /**
  1264.    * Get the profiler configuration arguments.
  1265.    *
  1266.    * The default value for this property is
  1267.    * "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s"
  1268.    * 
  1269.    * @return the parameters to pass to the task child to configure profiling
  1270.    */
  1271.   public String getProfileParams() {
  1272.     return get("mapred.task.profile.params",
  1273.                "-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y," +
  1274.                  "verbose=n,file=%s");
  1275.   }
  1276.   /**
  1277.    * Set the profiler configuration arguments. If the string contains a '%s' it
  1278.    * will be replaced with the name of the profiling output file when the task
  1279.    * runs.
  1280.    *
  1281.    * This value is passed to the task child JVM on the command line.
  1282.    *
  1283.    * @param value the configuration string
  1284.    */
  1285.   public void setProfileParams(String value) {
  1286.     set("mapred.task.profile.params", value);
  1287.   }
  1288.   /**
  1289.    * Get the range of maps or reduces to profile.
  1290.    * @param isMap is the task a map?
  1291.    * @return the task ranges
  1292.    */
  1293.   public IntegerRanges getProfileTaskRange(boolean isMap) {
  1294.     return getRange((isMap ? "mapred.task.profile.maps" : 
  1295.                        "mapred.task.profile.reduces"), "0-2");
  1296.   }
  1297.   /**
  1298.    * Set the ranges of maps or reduces to profile. setProfileEnabled(true) 
  1299.    * must also be called.
  1300.    * @param newValue a set of integer ranges of the map ids
  1301.    */
  1302.   public void setProfileTaskRange(boolean isMap, String newValue) {
  1303.     // parse the value to make sure it is legal
  1304.     new Configuration.IntegerRanges(newValue);
  1305.     set((isMap ? "mapred.task.profile.maps" : "mapred.task.profile.reduces"), 
  1306.         newValue);
  1307.   }
  1308.   /**
  1309.    * Set the debug script to run when the map tasks fail.
  1310.    * 
  1311.    * <p>The debug script can aid debugging of failed map tasks. The script is 
  1312.    * given task's stdout, stderr, syslog, jobconf files as arguments.</p>
  1313.    * 
  1314.    * <p>The debug command, run on the node where the map failed, is:</p>
  1315.    * <p><pre><blockquote> 
  1316.    * $script $stdout $stderr $syslog $jobconf.
  1317.    * </blockquote></pre></p>
  1318.    * 
  1319.    * <p> The script file is distributed through {@link DistributedCache} 
  1320.    * APIs. The script needs to be symlinked. </p>
  1321.    * 
  1322.    * <p>Here is an example on how to submit a script 
  1323.    * <p><blockquote><pre>
  1324.    * job.setMapDebugScript("./myscript");
  1325.    * DistributedCache.createSymlink(job);
  1326.    * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
  1327.    * </pre></blockquote></p>
  1328.    * 
  1329.    * @param mDbgScript the script name
  1330.    */
  1331.   public void  setMapDebugScript(String mDbgScript) {
  1332.     set("mapred.map.task.debug.script", mDbgScript);
  1333.   }
  1334.   
  1335.   /**
  1336.    * Get the map task's debug script.
  1337.    * 
  1338.    * @return the debug Script for the mapred job for failed map tasks.
  1339.    * @see #setMapDebugScript(String)
  1340.    */
  1341.   public String getMapDebugScript() {
  1342.     return get("mapred.map.task.debug.script");
  1343.   }
  1344.   
  1345.   /**
  1346.    * Set the debug script to run when the reduce tasks fail.
  1347.    * 
  1348.    * <p>The debug script can aid debugging of failed reduce tasks. The script
  1349.    * is given task's stdout, stderr, syslog, jobconf files as arguments.</p>
  1350.    * 
  1351.    * <p>The debug command, run on the node where the map failed, is:</p>
  1352.    * <p><pre><blockquote> 
  1353.    * $script $stdout $stderr $syslog $jobconf.
  1354.    * </blockquote></pre></p>
  1355.    * 
  1356.    * <p> The script file is distributed through {@link DistributedCache} 
  1357.    * APIs. The script file needs to be symlinked </p>
  1358.    * 
  1359.    * <p>Here is an example on how to submit a script 
  1360.    * <p><blockquote><pre>
  1361.    * job.setReduceDebugScript("./myscript");
  1362.    * DistributedCache.createSymlink(job);
  1363.    * DistributedCache.addCacheFile("/debug/scripts/myscript#myscript");
  1364.    * </pre></blockquote></p>
  1365.    * 
  1366.    * @param rDbgScript the script name
  1367.    */
  1368.   public void  setReduceDebugScript(String rDbgScript) {
  1369.     set("mapred.reduce.task.debug.script", rDbgScript);
  1370.   }
  1371.   
  1372.   /**
  1373.    * Get the reduce task's debug Script
  1374.    * 
  1375.    * @return the debug script for the mapred job for failed reduce tasks.
  1376.    * @see #setReduceDebugScript(String)
  1377.    */
  1378.   public String getReduceDebugScript() {
  1379.     return get("mapred.reduce.task.debug.script");
  1380.   }
  1381.   /**
  1382.    * Get the uri to be invoked in-order to send a notification after the job 
  1383.    * has completed (success/failure). 
  1384.    * 
  1385.    * @return the job end notification uri, <code>null</code> if it hasn't
  1386.    *         been set.
  1387.    * @see #setJobEndNotificationURI(String)
  1388.    */
  1389.   public String getJobEndNotificationURI() {
  1390.     return get("job.end.notification.url");
  1391.   }
  1392.   /**
  1393.    * Set the uri to be invoked in-order to send a notification after the job
  1394.    * has completed (success/failure).
  1395.    * 
  1396.    * <p>The uri can contain 2 special parameters: <tt>$jobId</tt> and 
  1397.    * <tt>$jobStatus</tt>. Those, if present, are replaced by the job's 
  1398.    * identifier and completion-status respectively.</p>
  1399.    * 
  1400.    * <p>This is typically used by application-writers to implement chaining of 
  1401.    * Map-Reduce jobs in an <i>asynchronous manner</i>.</p>
  1402.    * 
  1403.    * @param uri the job end notification uri
  1404.    * @see JobStatus
  1405.    * @see <a href="{@docRoot}/org/apache/hadoop/mapred/JobClient.html#JobCompletionAndChaining">Job Completion and Chaining</a>
  1406.    */
  1407.   public void setJobEndNotificationURI(String uri) {
  1408.     set("job.end.notification.url", uri);
  1409.   }
  1410.   /**
  1411.    * Get job-specific shared directory for use as scratch space
  1412.    * 
  1413.    * <p>
  1414.    * When a job starts, a shared directory is created at location
  1415.    * <code>
  1416.    * ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ </code>.
  1417.    * This directory is exposed to the users through 
  1418.    * <code>job.local.dir </code>.
  1419.    * So, the tasks can use this space 
  1420.    * as scratch space and share files among them. </p>
  1421.    * This value is available as System property also.
  1422.    * 
  1423.    * @return The localized job specific shared directory
  1424.    */
  1425.   public String getJobLocalDir() {
  1426.     return get("job.local.dir");
  1427.   }
  1428.   
  1429.   /**
  1430.    * The maximum amount of memory any task of this job will use. See
  1431.    * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
  1432.    * 
  1433.    * @return The maximum amount of memory any task of this job will use, in
  1434.    *         bytes.
  1435.    * @see #setMaxVirtualMemoryForTask(long)
  1436.    */
  1437.   public long getMaxVirtualMemoryForTask() {
  1438.     return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
  1439.   }
  1440.   /**
  1441.    * Set the maximum amount of memory any task of this job can use. See
  1442.    * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
  1443.    * 
  1444.    * @param vmem Maximum amount of virtual memory in bytes any task of this job
  1445.    *          can use.
  1446.    * @see #getMaxVirtualMemoryForTask()
  1447.    */
  1448.   public void setMaxVirtualMemoryForTask(long vmem) {
  1449.     setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
  1450.   }
  1451.   /**
  1452.    * The maximum amount of physical memory any task of this job will use. See
  1453.    * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
  1454.    * 
  1455.    * @return The maximum amount of physical memory any task of this job will
  1456.    *         use, in bytes.
  1457.    * @see #setMaxPhysicalMemoryForTask(long)
  1458.    */
  1459.   public long getMaxPhysicalMemoryForTask() {
  1460.     return getLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
  1461.   }
  1462.   /**
  1463.    * Set the maximum amount of physical memory any task of this job can use. See
  1464.    * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
  1465.    * 
  1466.    * @param pmem Maximum amount of physical memory in bytes any task of this job
  1467.    *          can use.
  1468.    * @see #getMaxPhysicalMemoryForTask()
  1469.    */
  1470.   public void setMaxPhysicalMemoryForTask(long pmem) {
  1471.     setLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, pmem);
  1472.   }
  1473.   /**
  1474.    * Return the name of the queue to which this job is submitted.
  1475.    * Defaults to 'default'.
  1476.    * 
  1477.    * @return name of the queue
  1478.    */
  1479.   public String getQueueName() {
  1480.     return get("mapred.job.queue.name", DEFAULT_QUEUE_NAME);
  1481.   }
  1482.   
  1483.   /**
  1484.    * Set the name of the queue to which this job should be submitted.
  1485.    * 
  1486.    * @param queueName Name of the queue
  1487.    */
  1488.   public void setQueueName(String queueName) {
  1489.     set("mapred.job.queue.name", queueName);
  1490.   }
  1491.   
  1492.   /** 
  1493.    * Find a jar that contains a class of the same name, if any.
  1494.    * It will return a jar file, even if that is not the first thing
  1495.    * on the class path that has a class with the same name.
  1496.    * 
  1497.    * @param my_class the class to find.
  1498.    * @return a jar file that contains the class, or null.
  1499.    * @throws IOException
  1500.    */
  1501.   private static String findContainingJar(Class my_class) {
  1502.     ClassLoader loader = my_class.getClassLoader();
  1503.     String class_file = my_class.getName().replaceAll("\.", "/") + ".class";
  1504.     try {
  1505.       for(Enumeration itr = loader.getResources(class_file);
  1506.           itr.hasMoreElements();) {
  1507.         URL url = (URL) itr.nextElement();
  1508.         if ("jar".equals(url.getProtocol())) {
  1509.           String toReturn = url.getPath();
  1510.           if (toReturn.startsWith("file:")) {
  1511.             toReturn = toReturn.substring("file:".length());
  1512.           }
  1513.           toReturn = URLDecoder.decode(toReturn, "UTF-8");
  1514.           return toReturn.replaceAll("!.*$", "");
  1515.         }
  1516.       }
  1517.     } catch (IOException e) {
  1518.       throw new RuntimeException(e);
  1519.     }
  1520.     return null;
  1521.   }
  1522. }