JobClient.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:62k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.BufferedReader;
  20. import java.io.BufferedWriter;
  21. import java.io.DataInput;
  22. import java.io.DataOutput;
  23. import java.io.DataOutputStream;
  24. import java.io.FileNotFoundException;
  25. import java.io.FileOutputStream;
  26. import java.io.IOException;
  27. import java.io.InputStream;
  28. import java.io.InputStreamReader;
  29. import java.io.OutputStream;
  30. import java.io.OutputStreamWriter;
  31. import java.net.InetAddress;
  32. import java.net.InetSocketAddress;
  33. import java.net.URI;
  34. import java.net.URISyntaxException;
  35. import java.net.URL;
  36. import java.net.URLConnection;
  37. import java.net.UnknownHostException;
  38. import java.util.Arrays;
  39. import java.util.Collection;
  40. import java.util.Comparator;
  41. import java.util.List;
  42. import javax.security.auth.login.LoginException;
  43. import org.apache.commons.logging.Log;
  44. import org.apache.commons.logging.LogFactory;
  45. import org.apache.hadoop.conf.Configuration;
  46. import org.apache.hadoop.conf.Configured;
  47. import org.apache.hadoop.filecache.DistributedCache;
  48. import org.apache.hadoop.fs.FSDataOutputStream;
  49. import org.apache.hadoop.fs.FileStatus;
  50. import org.apache.hadoop.fs.FileSystem;
  51. import org.apache.hadoop.fs.FileUtil;
  52. import org.apache.hadoop.fs.Path;
  53. import org.apache.hadoop.fs.permission.FsPermission;
  54. import org.apache.hadoop.io.BytesWritable;
  55. import org.apache.hadoop.io.DataOutputBuffer;
  56. import org.apache.hadoop.io.IOUtils;
  57. import org.apache.hadoop.io.Text;
  58. import org.apache.hadoop.io.Writable;
  59. import org.apache.hadoop.io.WritableUtils;
  60. import org.apache.hadoop.io.serializer.SerializationFactory;
  61. import org.apache.hadoop.io.serializer.Serializer;
  62. import org.apache.hadoop.ipc.RPC;
  63. import org.apache.hadoop.mapred.Counters.Counter;
  64. import org.apache.hadoop.mapred.Counters.Group;
  65. import org.apache.hadoop.net.NetUtils;
  66. import org.apache.hadoop.security.UnixUserGroupInformation;
  67. import org.apache.hadoop.util.ReflectionUtils;
  68. import org.apache.hadoop.util.StringUtils;
  69. import org.apache.hadoop.util.Tool;
  70. import org.apache.hadoop.util.ToolRunner;
  71. /**
  72.  * <code>JobClient</code> is the primary interface for the user-job to interact
  73.  * with the {@link JobTracker}.
  74.  * 
  75.  * <code>JobClient</code> provides facilities to submit jobs, track their 
  76.  * progress, access component-tasks' reports/logs, get the Map-Reduce cluster
  77.  * status information etc.
  78.  * 
  79.  * <p>The job submission process involves:
  80.  * <ol>
  81.  *   <li>
  82.  *   Checking the input and output specifications of the job.
  83.  *   </li>
  84.  *   <li>
  85.  *   Computing the {@link InputSplit}s for the job.
  86.  *   </li>
  87.  *   <li>
  88.  *   Setup the requisite accounting information for the {@link DistributedCache} 
  89.  *   of the job, if necessary.
  90.  *   </li>
  91.  *   <li>
  92.  *   Copying the job's jar and configuration to the map-reduce system directory 
  93.  *   on the distributed file-system. 
  94.  *   </li>
  95.  *   <li>
  96.  *   Submitting the job to the <code>JobTracker</code> and optionally monitoring
  97.  *   it's status.
  98.  *   </li>
  99.  * </ol></p>
  100.  *  
  101.  * Normally the user creates the application, describes various facets of the
  102.  * job via {@link JobConf} and then uses the <code>JobClient</code> to submit 
  103.  * the job and monitor its progress.
  104.  * 
  105.  * <p>Here is an example on how to use <code>JobClient</code>:</p>
  106.  * <p><blockquote><pre>
  107.  *     // Create a new JobConf
  108.  *     JobConf job = new JobConf(new Configuration(), MyJob.class);
  109.  *     
  110.  *     // Specify various job-specific parameters     
  111.  *     job.setJobName("myjob");
  112.  *     
  113.  *     job.setInputPath(new Path("in"));
  114.  *     job.setOutputPath(new Path("out"));
  115.  *     
  116.  *     job.setMapperClass(MyJob.MyMapper.class);
  117.  *     job.setReducerClass(MyJob.MyReducer.class);
  118.  *
  119.  *     // Submit the job, then poll for progress until the job is complete
  120.  *     JobClient.runJob(job);
  121.  * </pre></blockquote></p>
  122.  * 
  123.  * <h4 id="JobControl">Job Control</h4>
  124.  * 
  125.  * <p>At times clients would chain map-reduce jobs to accomplish complex tasks 
  126.  * which cannot be done via a single map-reduce job. This is fairly easy since 
  127.  * the output of the job, typically, goes to distributed file-system and that 
  128.  * can be used as the input for the next job.</p>
  129.  * 
  130.  * <p>However, this also means that the onus on ensuring jobs are complete 
  131.  * (success/failure) lies squarely on the clients. In such situations the 
  132.  * various job-control options are:
  133.  * <ol>
  134.  *   <li>
  135.  *   {@link #runJob(JobConf)} : submits the job and returns only after 
  136.  *   the job has completed.
  137.  *   </li>
  138.  *   <li>
  139.  *   {@link #submitJob(JobConf)} : only submits the job, then poll the 
  140.  *   returned handle to the {@link RunningJob} to query status and make 
  141.  *   scheduling decisions.
  142.  *   </li>
  143.  *   <li>
  144.  *   {@link JobConf#setJobEndNotificationURI(String)} : setup a notification
  145.  *   on job-completion, thus avoiding polling.
  146.  *   </li>
  147.  * </ol></p>
  148.  * 
  149.  * @see JobConf
  150.  * @see ClusterStatus
  151.  * @see Tool
  152.  * @see DistributedCache
  153.  */
  154. public class JobClient extends Configured implements MRConstants, Tool  {
  155.   private static final Log LOG = LogFactory.getLog(JobClient.class);
  156.   public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL }
  157.   private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; 
  158.   private static final long MAX_JOBPROFILE_AGE = 1000 * 2;
  159.   static{
  160.     Configuration.addDefaultResource("mapred-default.xml");
  161.     Configuration.addDefaultResource("mapred-site.xml");
  162.   }
  163.   /**
  164.    * A NetworkedJob is an implementation of RunningJob.  It holds
  165.    * a JobProfile object to provide some info, and interacts with the
  166.    * remote service to provide certain functionality.
  167.    */
  168.   class NetworkedJob implements RunningJob {
  169.     JobProfile profile;
  170.     JobStatus status;
  171.     long statustime;
  172.     /**
  173.      * We store a JobProfile and a timestamp for when we last
  174.      * acquired the job profile.  If the job is null, then we cannot
  175.      * perform any of the tasks.  The job might be null if the JobTracker
  176.      * has completely forgotten about the job.  (eg, 24 hours after the
  177.      * job completes.)
  178.      */
  179.     public NetworkedJob(JobStatus job) throws IOException {
  180.       this.status = job;
  181.       this.profile = jobSubmitClient.getJobProfile(job.getJobID());
  182.       this.statustime = System.currentTimeMillis();
  183.     }
  184.     /**
  185.      * Some methods rely on having a recent job profile object.  Refresh
  186.      * it, if necessary
  187.      */
  188.     synchronized void ensureFreshStatus() throws IOException {
  189.       if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
  190.         updateStatus();
  191.       }
  192.     }
  193.     
  194.     /** Some methods need to update status immediately. So, refresh
  195.      * immediately
  196.      * @throws IOException
  197.      */
  198.     synchronized void updateStatus() throws IOException {
  199.       this.status = jobSubmitClient.getJobStatus(profile.getJobID());
  200.       this.statustime = System.currentTimeMillis();
  201.     }
  202.     /**
  203.      * An identifier for the job
  204.      */
  205.     public JobID getID() {
  206.       return profile.getJobID();
  207.     }
  208.     
  209.     /** @deprecated This method is deprecated and will be removed. Applications should 
  210.      * rather use {@link #getID()}.*/
  211.     @Deprecated
  212.     public String getJobID() {
  213.       return profile.getJobID().toString();
  214.     }
  215.     
  216.     /**
  217.      * The user-specified job name
  218.      */
  219.     public String getJobName() {
  220.       return profile.getJobName();
  221.     }
  222.     /**
  223.      * The name of the job file
  224.      */
  225.     public String getJobFile() {
  226.       return profile.getJobFile();
  227.     }
  228.     /**
  229.      * A URL where the job's status can be seen
  230.      */
  231.     public String getTrackingURL() {
  232.       return profile.getURL().toString();
  233.     }
  234.     /**
  235.      * A float between 0.0 and 1.0, indicating the % of map work
  236.      * completed.
  237.      */
  238.     public float mapProgress() throws IOException {
  239.       ensureFreshStatus();
  240.       return status.mapProgress();
  241.     }
  242.     /**
  243.      * A float between 0.0 and 1.0, indicating the % of reduce work
  244.      * completed.
  245.      */
  246.     public float reduceProgress() throws IOException {
  247.       ensureFreshStatus();
  248.       return status.reduceProgress();
  249.     }
  250.     /**
  251.      * A float between 0.0 and 1.0, indicating the % of cleanup work
  252.      * completed.
  253.      */
  254.     public float cleanupProgress() throws IOException {
  255.       ensureFreshStatus();
  256.       return status.cleanupProgress();
  257.     }
  258.     /**
  259.      * A float between 0.0 and 1.0, indicating the % of setup work
  260.      * completed.
  261.      */
  262.     public float setupProgress() throws IOException {
  263.       ensureFreshStatus();
  264.       return status.setupProgress();
  265.     }
  266.     /**
  267.      * Returns immediately whether the whole job is done yet or not.
  268.      */
  269.     public synchronized boolean isComplete() throws IOException {
  270.       updateStatus();
  271.       return (status.getRunState() == JobStatus.SUCCEEDED ||
  272.               status.getRunState() == JobStatus.FAILED ||
  273.               status.getRunState() == JobStatus.KILLED);
  274.     }
  275.     /**
  276.      * True iff job completed successfully.
  277.      */
  278.     public synchronized boolean isSuccessful() throws IOException {
  279.       updateStatus();
  280.       return status.getRunState() == JobStatus.SUCCEEDED;
  281.     }
  282.     /**
  283.      * Blocks until the job is finished
  284.      */
  285.     public void waitForCompletion() throws IOException {
  286.       while (!isComplete()) {
  287.         try {
  288.           Thread.sleep(5000);
  289.         } catch (InterruptedException ie) {
  290.         }
  291.       }
  292.     }
  293.     /**
  294.      * Tells the service to get the state of the current job.
  295.      */
  296.     public synchronized int getJobState() throws IOException {
  297.       updateStatus();
  298.       return status.getRunState();
  299.     }
  300.     
  301.     /**
  302.      * Tells the service to terminate the current job.
  303.      */
  304.     public synchronized void killJob() throws IOException {
  305.       jobSubmitClient.killJob(getID());
  306.     }
  307.    
  308.     
  309.     /** Set the priority of the job.
  310.     * @param priority new priority of the job. 
  311.     */
  312.     public synchronized void setJobPriority(String priority) 
  313.                                                 throws IOException {
  314.       jobSubmitClient.setJobPriority(getID(), priority);
  315.     }
  316.     
  317.     /**
  318.      * Kill indicated task attempt.
  319.      * @param taskId the id of the task to kill.
  320.      * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
  321.      * it is just killed, w/o affecting job failure status.
  322.      */
  323.     public synchronized void killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
  324.       jobSubmitClient.killTask(taskId, shouldFail);
  325.     }
  326.     /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
  327.     @Deprecated
  328.     public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
  329.       killTask(TaskAttemptID.forName(taskId), shouldFail);
  330.     }
  331.     
  332.     /**
  333.      * Fetch task completion events from jobtracker for this job. 
  334.      */
  335.     public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
  336.                                                                       int startFrom) throws IOException{
  337.       return jobSubmitClient.getTaskCompletionEvents(
  338.                                                      getID(), startFrom, 10); 
  339.     }
  340.     /**
  341.      * Dump stats to screen
  342.      */
  343.     @Override
  344.     public String toString() {
  345.       try {
  346.         updateStatus();
  347.       } catch (IOException e) {
  348.       }
  349.       return "Job: " + profile.getJobID() + "n" + 
  350.         "file: " + profile.getJobFile() + "n" + 
  351.         "tracking URL: " + profile.getURL() + "n" + 
  352.         "map() completion: " + status.mapProgress() + "n" + 
  353.         "reduce() completion: " + status.reduceProgress();
  354.     }
  355.         
  356.     /**
  357.      * Returns the counters for this job
  358.      */
  359.     public Counters getCounters() throws IOException {
  360.       return jobSubmitClient.getJobCounters(getID());
  361.     }
  362.     
  363.     @Override
  364.     public String[] getTaskDiagnostics(TaskAttemptID id) throws IOException {
  365.       return jobSubmitClient.getTaskDiagnostics(id);
  366.     }
  367.   }
  368.   private JobSubmissionProtocol jobSubmitClient;
  369.   private Path sysDir = null;
  370.   
  371.   private FileSystem fs = null;
  372.   /**
  373.    * Create a job client.
  374.    */
  375.   public JobClient() {
  376.   }
  377.     
  378.   /**
  379.    * Build a job client with the given {@link JobConf}, and connect to the 
  380.    * default {@link JobTracker}.
  381.    * 
  382.    * @param conf the job configuration.
  383.    * @throws IOException
  384.    */
  385.   public JobClient(JobConf conf) throws IOException {
  386.     setConf(conf);
  387.     init(conf);
  388.   }
  389.   /**
  390.    * Connect to the default {@link JobTracker}.
  391.    * @param conf the job configuration.
  392.    * @throws IOException
  393.    */
  394.   public void init(JobConf conf) throws IOException {
  395.     String tracker = conf.get("mapred.job.tracker", "local");
  396.     if ("local".equals(tracker)) {
  397.       this.jobSubmitClient = new LocalJobRunner(conf);
  398.     } else {
  399.       this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
  400.     }        
  401.   }
  402.   private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
  403.       Configuration conf) throws IOException {
  404.     return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
  405.         JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
  406.         NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
  407.   }
  408.   /**
  409.    * Build a job client, connect to the indicated job tracker.
  410.    * 
  411.    * @param jobTrackAddr the job tracker to connect to.
  412.    * @param conf configuration.
  413.    */
  414.   public JobClient(InetSocketAddress jobTrackAddr, 
  415.                    Configuration conf) throws IOException {
  416.     jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
  417.   }
  418.   /**
  419.    * Close the <code>JobClient</code>.
  420.    */
  421.   public synchronized void close() throws IOException {
  422.     if (!(jobSubmitClient instanceof LocalJobRunner)) {
  423.       RPC.stopProxy(jobSubmitClient);
  424.     }
  425.   }
  426.   /**
  427.    * Get a filesystem handle.  We need this to prepare jobs
  428.    * for submission to the MapReduce system.
  429.    * 
  430.    * @return the filesystem handle.
  431.    */
  432.   public synchronized FileSystem getFs() throws IOException {
  433.     if (this.fs == null) {
  434.       Path sysDir = getSystemDir();
  435.       this.fs = sysDir.getFileSystem(getConf());
  436.     }
  437.     return fs;
  438.   }
  439.   
  440.   /* see if two file systems are the same or not
  441.    *
  442.    */
  443.   private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
  444.     URI srcUri = srcFs.getUri();
  445.     URI dstUri = destFs.getUri();
  446.     if (srcUri.getScheme() == null) {
  447.       return false;
  448.     }
  449.     if (!srcUri.getScheme().equals(dstUri.getScheme())) {
  450.       return false;
  451.     }
  452.     String srcHost = srcUri.getHost();    
  453.     String dstHost = dstUri.getHost();
  454.     if ((srcHost != null) && (dstHost != null)) {
  455.       try {
  456.         srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
  457.         dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
  458.       } catch(UnknownHostException ue) {
  459.         return false;
  460.       }
  461.       if (!srcHost.equals(dstHost)) {
  462.         return false;
  463.       }
  464.     }
  465.     else if (srcHost == null && dstHost != null) {
  466.       return false;
  467.     }
  468.     else if (srcHost != null && dstHost == null) {
  469.       return false;
  470.     }
  471.     //check for ports
  472.     if (srcUri.getPort() != dstUri.getPort()) {
  473.       return false;
  474.     }
  475.     return true;
  476.   }
  477.   // copies a file to the jobtracker filesystem and returns the path where it
  478.   // was copied to
  479.   private Path copyRemoteFiles(FileSystem jtFs, Path parentDir, Path originalPath, 
  480.                                JobConf job, short replication) throws IOException {
  481.     //check if we do not need to copy the files
  482.     // is jt using the same file system.
  483.     // just checking for uri strings... doing no dns lookups 
  484.     // to see if the filesystems are the same. This is not optimal.
  485.     // but avoids name resolution.
  486.     
  487.     FileSystem remoteFs = null;
  488.     remoteFs = originalPath.getFileSystem(job);
  489.     if (compareFs(remoteFs, jtFs)) {
  490.       return originalPath;
  491.     }
  492.     // this might have name collisions. copy will throw an exception
  493.     //parse the original path to create new path
  494.     Path newPath = new Path(parentDir, originalPath.getName());
  495.     FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job);
  496.     jtFs.setReplication(newPath, replication);
  497.     return newPath;
  498.   }
  499.  
  500.   /**
  501.    * configure the jobconf of the user with the command line options of 
  502.    * -libjars, -files, -archives
  503.    * @param conf
  504.    * @throws IOException
  505.    */
  506.   private void configureCommandLineOptions(JobConf job, Path submitJobDir, Path submitJarFile) 
  507.     throws IOException {
  508.     
  509.     if (!(job.getBoolean("mapred.used.genericoptionsparser", false))) {
  510.       LOG.warn("Use GenericOptionsParser for parsing the arguments. " +
  511.                "Applications should implement Tool for the same.");
  512.     }
  513.     // get all the command line arguments into the 
  514.     // jobconf passed in by the user conf
  515.     String files = null;
  516.     String libjars = null;
  517.     String archives = null;
  518.     files = job.get("tmpfiles");
  519.     libjars = job.get("tmpjars");
  520.     archives = job.get("tmparchives");
  521.     /*
  522.      * set this user's id in job configuration, so later job files can be
  523.      * accessed using this user's id
  524.      */
  525.     UnixUserGroupInformation ugi = getUGI(job);
  526.       
  527.     //
  528.     // Figure out what fs the JobTracker is using.  Copy the
  529.     // job to it, under a temporary name.  This allows DFS to work,
  530.     // and under the local fs also provides UNIX-like object loading 
  531.     // semantics.  (that is, if the job file is deleted right after
  532.     // submission, we can still run the submission to completion)
  533.     //
  534.     // Create a number of filenames in the JobTracker's fs namespace
  535.     FileSystem fs = getFs();
  536.     LOG.debug("default FileSystem: " + fs.getUri());
  537.     fs.delete(submitJobDir, true);
  538.     submitJobDir = fs.makeQualified(submitJobDir);
  539.     submitJobDir = new Path(submitJobDir.toUri().getPath());
  540.     FsPermission mapredSysPerms = new FsPermission(JOB_DIR_PERMISSION);
  541.     FileSystem.mkdirs(fs, submitJobDir, mapredSysPerms);
  542.     Path filesDir = new Path(submitJobDir, "files");
  543.     Path archivesDir = new Path(submitJobDir, "archives");
  544.     Path libjarsDir = new Path(submitJobDir, "libjars");
  545.     short replication = (short)job.getInt("mapred.submit.replication", 10);
  546.     // add all the command line files/ jars and archive
  547.     // first copy them to jobtrackers filesystem 
  548.     
  549.     if (files != null) {
  550.       FileSystem.mkdirs(fs, filesDir, mapredSysPerms);
  551.       String[] fileArr = files.split(",");
  552.       for (String tmpFile: fileArr) {
  553.         Path tmp = new Path(tmpFile);
  554.         Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication);
  555.         try {
  556.           URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
  557.           DistributedCache.addCacheFile(pathURI, job);
  558.         } catch(URISyntaxException ue) {
  559.           //should not throw a uri exception 
  560.           throw new IOException("Failed to create uri for " + tmpFile);
  561.         }
  562.         DistributedCache.createSymlink(job);
  563.       }
  564.     }
  565.     
  566.     if (libjars != null) {
  567.       FileSystem.mkdirs(fs, libjarsDir, mapredSysPerms);
  568.       String[] libjarsArr = libjars.split(",");
  569.       for (String tmpjars: libjarsArr) {
  570.         Path tmp = new Path(tmpjars);
  571.         Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
  572.         DistributedCache.addArchiveToClassPath(newPath, job);
  573.       }
  574.     }
  575.     
  576.     
  577.     if (archives != null) {
  578.      FileSystem.mkdirs(fs, archivesDir, mapredSysPerms); 
  579.      String[] archivesArr = archives.split(",");
  580.      for (String tmpArchives: archivesArr) {
  581.        Path tmp = new Path(tmpArchives);
  582.        Path newPath = copyRemoteFiles(fs, archivesDir, tmp, job, replication);
  583.        try {
  584.          URI pathURI = new URI(newPath.toUri().toString() + "#" + newPath.getName());
  585.          DistributedCache.addCacheArchive(pathURI, job);
  586.        } catch(URISyntaxException ue) {
  587.          //should not throw an uri excpetion
  588.          throw new IOException("Failed to create uri for " + tmpArchives);
  589.        }
  590.        DistributedCache.createSymlink(job);
  591.      }
  592.     }
  593.     
  594.     //  set the timestamps of the archives and files
  595.     URI[] tarchives = DistributedCache.getCacheArchives(job);
  596.     if (tarchives != null) {
  597.       StringBuffer archiveTimestamps = 
  598.         new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tarchives[0])));
  599.       for (int i = 1; i < tarchives.length; i++) {
  600.         archiveTimestamps.append(",");
  601.         archiveTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tarchives[i])));
  602.       }
  603.       DistributedCache.setArchiveTimestamps(job, archiveTimestamps.toString());
  604.     }
  605.     URI[] tfiles = DistributedCache.getCacheFiles(job);
  606.     if (tfiles != null) {
  607.       StringBuffer fileTimestamps = 
  608.         new StringBuffer(String.valueOf(DistributedCache.getTimestamp(job, tfiles[0])));
  609.       for (int i = 1; i < tfiles.length; i++) {
  610.         fileTimestamps.append(",");
  611.         fileTimestamps.append(String.valueOf(DistributedCache.getTimestamp(job, tfiles[i])));
  612.       }
  613.       DistributedCache.setFileTimestamps(job, fileTimestamps.toString());
  614.     }
  615.        
  616.     String originalJarPath = job.getJar();
  617.     if (originalJarPath != null) {           // copy jar to JobTracker's fs
  618.       // use jar name if job is not named. 
  619.       if ("".equals(job.getJobName())){
  620.         job.setJobName(new Path(originalJarPath).getName());
  621.       }
  622.       job.setJar(submitJarFile.toString());
  623.       fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
  624.       fs.setReplication(submitJarFile, replication);
  625.       fs.setPermission(submitJarFile, new FsPermission(JOB_FILE_PERMISSION));
  626.     } else {
  627.       LOG.warn("No job jar file set.  User classes may not be found. "+
  628.                "See JobConf(Class) or JobConf#setJar(String).");
  629.     }
  630.     // Set the user's name and working directory
  631.     job.setUser(ugi.getUserName());
  632.     if (ugi.getGroupNames().length > 0) {
  633.       job.set("group.name", ugi.getGroupNames()[0]);
  634.     }
  635.     if (job.getWorkingDirectory() == null) {
  636.       job.setWorkingDirectory(fs.getWorkingDirectory());          
  637.     }
  638.   }
  639.   private UnixUserGroupInformation getUGI(Configuration job) throws IOException {
  640.     UnixUserGroupInformation ugi = null;
  641.     try {
  642.       ugi = UnixUserGroupInformation.login(job, true);
  643.     } catch (LoginException e) {
  644.       throw (IOException)(new IOException(
  645.           "Failed to get the current user's information.").initCause(e));
  646.     }
  647.     return ugi;
  648.   }
  649.   
  650.   /**
  651.    * Submit a job to the MR system.
  652.    * 
  653.    * This returns a handle to the {@link RunningJob} which can be used to track
  654.    * the running-job.
  655.    * 
  656.    * @param jobFile the job configuration.
  657.    * @return a handle to the {@link RunningJob} which can be used to track the
  658.    *         running-job.
  659.    * @throws FileNotFoundException
  660.    * @throws InvalidJobConfException
  661.    * @throws IOException
  662.    */
  663.   public RunningJob submitJob(String jobFile) throws FileNotFoundException, 
  664.                                                      InvalidJobConfException, 
  665.                                                      IOException {
  666.     // Load in the submitted job details
  667.     JobConf job = new JobConf(jobFile);
  668.     return submitJob(job);
  669.   }
  670.     
  671.   // job files are world-wide readable and owner writable
  672.   final private static FsPermission JOB_FILE_PERMISSION = 
  673.     FsPermission.createImmutable((short) 0644); // rw-r--r--
  674.   // job submission directory is world readable/writable/executable
  675.   final static FsPermission JOB_DIR_PERMISSION =
  676.     FsPermission.createImmutable((short) 0777); // rwx-rwx-rwx
  677.    
  678.   /**
  679.    * Submit a job to the MR system.
  680.    * This returns a handle to the {@link RunningJob} which can be used to track
  681.    * the running-job.
  682.    * 
  683.    * @param job the job configuration.
  684.    * @return a handle to the {@link RunningJob} which can be used to track the
  685.    *         running-job.
  686.    * @throws FileNotFoundException
  687.    * @throws IOException
  688.    */
  689.   public RunningJob submitJob(JobConf job) throws FileNotFoundException,
  690.                                                   IOException {
  691.     try {
  692.       return submitJobInternal(job);
  693.     } catch (InterruptedException ie) {
  694.       throw new IOException("interrupted", ie);
  695.     } catch (ClassNotFoundException cnfe) {
  696.       throw new IOException("class not found", cnfe);
  697.     }
  698.   }
  699.   /**
  700.    * Internal method for submitting jobs to the system.
  701.    * @param job the configuration to submit
  702.    * @return a proxy object for the running job
  703.    * @throws FileNotFoundException
  704.    * @throws ClassNotFoundException
  705.    * @throws InterruptedException
  706.    * @throws IOException
  707.    */
  708.   public 
  709.   RunningJob submitJobInternal(JobConf job
  710.                                ) throws FileNotFoundException, 
  711.                                         ClassNotFoundException,
  712.                                         InterruptedException,
  713.                                         IOException {
  714.     /*
  715.      * configure the command line options correctly on the submitting dfs
  716.      */
  717.     
  718.     JobID jobId = jobSubmitClient.getNewJobId();
  719.     Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  720.     Path submitJarFile = new Path(submitJobDir, "job.jar");
  721.     Path submitSplitFile = new Path(submitJobDir, "job.split");
  722.     configureCommandLineOptions(job, submitJobDir, submitJarFile);
  723.     Path submitJobFile = new Path(submitJobDir, "job.xml");
  724.     int reduces = job.getNumReduceTasks();
  725.     JobContext context = new JobContext(job, jobId);
  726.     
  727.     // Check the output specification
  728.     if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
  729.       org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
  730.         ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
  731.       output.checkOutputSpecs(context);
  732.     } else {
  733.       job.getOutputFormat().checkOutputSpecs(fs, job);
  734.     }
  735.     // Create the splits for the job
  736.     LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
  737.     int maps;
  738.     if (job.getUseNewMapper()) {
  739.       maps = writeNewSplits(context, submitSplitFile);
  740.     } else {
  741.       maps = writeOldSplits(job, submitSplitFile);
  742.     }
  743.     job.set("mapred.job.split.file", submitSplitFile.toString());
  744.     job.setNumMapTasks(maps);
  745.         
  746.     // Write job file to JobTracker's fs        
  747.     FSDataOutputStream out = 
  748.       FileSystem.create(fs, submitJobFile,
  749.                         new FsPermission(JOB_FILE_PERMISSION));
  750.     try {
  751.       job.writeXml(out);
  752.     } finally {
  753.       out.close();
  754.     }
  755.     //
  756.     // Now, actually submit the job (using the submit name)
  757.     //
  758.     JobStatus status = jobSubmitClient.submitJob(jobId);
  759.     if (status != null) {
  760.       return new NetworkedJob(status);
  761.     } else {
  762.       throw new IOException("Could not launch job");
  763.     }
  764.   }
  765.   private int writeOldSplits(JobConf job, 
  766.                              Path submitSplitFile) throws IOException {
  767.     InputSplit[] splits = 
  768.       job.getInputFormat().getSplits(job, job.getNumMapTasks());
  769.     // sort the splits into order based on size, so that the biggest
  770.     // go first
  771.     Arrays.sort(splits, new Comparator<InputSplit>() {
  772.       public int compare(InputSplit a, InputSplit b) {
  773.         try {
  774.           long left = a.getLength();
  775.           long right = b.getLength();
  776.           if (left == right) {
  777.             return 0;
  778.           } else if (left < right) {
  779.             return 1;
  780.           } else {
  781.             return -1;
  782.           }
  783.         } catch (IOException ie) {
  784.           throw new RuntimeException("Problem getting input split size",
  785.                                      ie);
  786.         }
  787.       }
  788.     });
  789.     DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length);
  790.     
  791.     try {
  792.       DataOutputBuffer buffer = new DataOutputBuffer();
  793.       RawSplit rawSplit = new RawSplit();
  794.       for(InputSplit split: splits) {
  795.         rawSplit.setClassName(split.getClass().getName());
  796.         buffer.reset();
  797.         split.write(buffer);
  798.         rawSplit.setDataLength(split.getLength());
  799.         rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
  800.         rawSplit.setLocations(split.getLocations());
  801.         rawSplit.write(out);
  802.       }
  803.     } finally {
  804.       out.close();
  805.     }
  806.     return splits.length;
  807.   }
  808.   private static class NewSplitComparator 
  809.     implements Comparator<org.apache.hadoop.mapreduce.InputSplit>{
  810.     @Override
  811.     public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
  812.                        org.apache.hadoop.mapreduce.InputSplit o2) {
  813.       try {
  814.         long len1 = o1.getLength();
  815.         long len2 = o2.getLength();
  816.         if (len1 < len2) {
  817.           return 1;
  818.         } else if (len1 == len2) {
  819.           return 0;
  820.         } else {
  821.           return -1;
  822.         }
  823.       } catch (IOException ie) {
  824.         throw new RuntimeException("exception in compare", ie);
  825.       } catch (InterruptedException ie) {
  826.         throw new RuntimeException("exception in compare", ie);        
  827.       }
  828.     }
  829.   }
  830.   @SuppressWarnings("unchecked")
  831.   private <T extends org.apache.hadoop.mapreduce.InputSplit> 
  832.   int writeNewSplits(JobContext job, Path submitSplitFile
  833.                      ) throws IOException, InterruptedException, 
  834.                               ClassNotFoundException {
  835.     JobConf conf = job.getJobConf();
  836.     org.apache.hadoop.mapreduce.InputFormat<?,?> input =
  837.       ReflectionUtils.newInstance(job.getInputFormatClass(), job.getJobConf());
  838.     
  839.     List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);
  840.     T[] array = (T[])
  841.       splits.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits.size()]);
  842.     // sort the splits into order based on size, so that the biggest
  843.     // go first
  844.     Arrays.sort(array, new NewSplitComparator());
  845.     DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile, 
  846.                                                  array.length);
  847.     try {
  848.       if (array.length != 0) {
  849.         DataOutputBuffer buffer = new DataOutputBuffer();
  850.         RawSplit rawSplit = new RawSplit();
  851.         SerializationFactory factory = new SerializationFactory(conf);
  852.         Serializer<T> serializer = 
  853.           factory.getSerializer((Class<T>) array[0].getClass());
  854.         serializer.open(buffer);
  855.         for(T split: array) {
  856.           rawSplit.setClassName(split.getClass().getName());
  857.           buffer.reset();
  858.           serializer.serialize(split);
  859.           rawSplit.setDataLength(split.getLength());
  860.           rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
  861.           rawSplit.setLocations(split.getLocations());
  862.           rawSplit.write(out);
  863.         }
  864.         serializer.close();
  865.       }
  866.     } finally {
  867.       out.close();
  868.     }
  869.     return array.length;
  870.   }
  871.   /** 
  872.    * Checks if the job directory is clean and has all the required components 
  873.    * for (re) starting the job
  874.    */
  875.   public static boolean isJobDirValid(Path jobDirPath, FileSystem fs) 
  876.   throws IOException {
  877.     FileStatus[] contents = fs.listStatus(jobDirPath);
  878.     int matchCount = 0;
  879.     if (contents != null && contents.length >=3) {
  880.       for (FileStatus status : contents) {
  881.         if ("job.xml".equals(status.getPath().getName())) {
  882.           ++matchCount;
  883.         }
  884.         if ("job.jar".equals(status.getPath().getName())) {
  885.           ++matchCount;
  886.         }
  887.         if ("job.split".equals(status.getPath().getName())) {
  888.           ++matchCount;
  889.         }
  890.       }
  891.       if (matchCount == 3) {
  892.         return true;
  893.       }
  894.     }
  895.     return false;
  896.   }
  897.   static class RawSplit implements Writable {
  898.     private String splitClass;
  899.     private BytesWritable bytes = new BytesWritable();
  900.     private String[] locations;
  901.     long dataLength;
  902.     public void setBytes(byte[] data, int offset, int length) {
  903.       bytes.set(data, offset, length);
  904.     }
  905.     public void setClassName(String className) {
  906.       splitClass = className;
  907.     }
  908.       
  909.     public String getClassName() {
  910.       return splitClass;
  911.     }
  912.       
  913.     public BytesWritable getBytes() {
  914.       return bytes;
  915.     }
  916.     public void clearBytes() {
  917.       bytes = null;
  918.     }
  919.       
  920.     public void setLocations(String[] locations) {
  921.       this.locations = locations;
  922.     }
  923.       
  924.     public String[] getLocations() {
  925.       return locations;
  926.     }
  927.       
  928.     public void readFields(DataInput in) throws IOException {
  929.       splitClass = Text.readString(in);
  930.       dataLength = in.readLong();
  931.       bytes.readFields(in);
  932.       int len = WritableUtils.readVInt(in);
  933.       locations = new String[len];
  934.       for(int i=0; i < len; ++i) {
  935.         locations[i] = Text.readString(in);
  936.       }
  937.     }
  938.       
  939.     public void write(DataOutput out) throws IOException {
  940.       Text.writeString(out, splitClass);
  941.       out.writeLong(dataLength);
  942.       bytes.write(out);
  943.       WritableUtils.writeVInt(out, locations.length);
  944.       for(int i = 0; i < locations.length; i++) {
  945.         Text.writeString(out, locations[i]);
  946.       }        
  947.     }
  948.     public long getDataLength() {
  949.       return dataLength;
  950.     }
  951.     public void setDataLength(long l) {
  952.       dataLength = l;
  953.     }
  954.     
  955.   }
  956.     
  957.   private static final int CURRENT_SPLIT_FILE_VERSION = 0;
  958.   private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
  959.   private DataOutputStream writeSplitsFileHeader(Configuration conf,
  960.                                                  Path filename,
  961.                                                  int length
  962.                                                  ) throws IOException {
  963.     // write the splits to a file for the job tracker
  964.     FileSystem fs = filename.getFileSystem(conf);
  965.     FSDataOutputStream out = 
  966.       FileSystem.create(fs, filename, new FsPermission(JOB_FILE_PERMISSION));
  967.     out.write(SPLIT_FILE_HEADER);
  968.     WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);
  969.     WritableUtils.writeVInt(out, length);
  970.     return out;
  971.   }
  972.   /** Create the list of input splits and write them out in a file for
  973.    *the JobTracker. The format is:
  974.    * <format version>
  975.    * <numSplits>
  976.    * for each split:
  977.    *    <RawSplit>
  978.    * @param splits the input splits to write out
  979.    * @param out the stream to write to
  980.    */
  981.   private void writeOldSplitsFile(InputSplit[] splits, 
  982.                                   FSDataOutputStream out) throws IOException {
  983.   }
  984.   /**
  985.    * Read a splits file into a list of raw splits
  986.    * @param in the stream to read from
  987.    * @return the complete list of splits
  988.    * @throws IOException
  989.    */
  990.   static RawSplit[] readSplitFile(DataInput in) throws IOException {
  991.     byte[] header = new byte[SPLIT_FILE_HEADER.length];
  992.     in.readFully(header);
  993.     if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
  994.       throw new IOException("Invalid header on split file");
  995.     }
  996.     int vers = WritableUtils.readVInt(in);
  997.     if (vers != CURRENT_SPLIT_FILE_VERSION) {
  998.       throw new IOException("Unsupported split version " + vers);
  999.     }
  1000.     int len = WritableUtils.readVInt(in);
  1001.     RawSplit[] result = new RawSplit[len];
  1002.     for(int i=0; i < len; ++i) {
  1003.       result[i] = new RawSplit();
  1004.       result[i].readFields(in);
  1005.     }
  1006.     return result;
  1007.   }
  1008.     
  1009.   /**
  1010.    * Get an {@link RunningJob} object to track an ongoing job.  Returns
  1011.    * null if the id does not correspond to any known job.
  1012.    * 
  1013.    * @param jobid the jobid of the job.
  1014.    * @return the {@link RunningJob} handle to track the job, null if the 
  1015.    *         <code>jobid</code> doesn't correspond to any known job.
  1016.    * @throws IOException
  1017.    */
  1018.   public RunningJob getJob(JobID jobid) throws IOException {
  1019.     JobStatus status = jobSubmitClient.getJobStatus(jobid);
  1020.     if (status != null) {
  1021.       return new NetworkedJob(status);
  1022.     } else {
  1023.       return null;
  1024.     }
  1025.   }
  1026.   /**@deprecated Applications should rather use {@link #getJob(JobID)}. 
  1027.    */
  1028.   @Deprecated
  1029.   public RunningJob getJob(String jobid) throws IOException {
  1030.     return getJob(JobID.forName(jobid));
  1031.   }
  1032.   
  1033.   /**
  1034.    * Get the information of the current state of the map tasks of a job.
  1035.    * 
  1036.    * @param jobId the job to query.
  1037.    * @return the list of all of the map tips.
  1038.    * @throws IOException
  1039.    */
  1040.   public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
  1041.     return jobSubmitClient.getMapTaskReports(jobId);
  1042.   }
  1043.   
  1044.   /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
  1045.   @Deprecated
  1046.   public TaskReport[] getMapTaskReports(String jobId) throws IOException {
  1047.     return getMapTaskReports(JobID.forName(jobId));
  1048.   }
  1049.   
  1050.   /**
  1051.    * Get the information of the current state of the reduce tasks of a job.
  1052.    * 
  1053.    * @param jobId the job to query.
  1054.    * @return the list of all of the reduce tips.
  1055.    * @throws IOException
  1056.    */    
  1057.   public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
  1058.     return jobSubmitClient.getReduceTaskReports(jobId);
  1059.   }
  1060.   /**
  1061.    * Get the information of the current state of the cleanup tasks of a job.
  1062.    * 
  1063.    * @param jobId the job to query.
  1064.    * @return the list of all of the cleanup tips.
  1065.    * @throws IOException
  1066.    */    
  1067.   public TaskReport[] getCleanupTaskReports(JobID jobId) throws IOException {
  1068.     return jobSubmitClient.getCleanupTaskReports(jobId);
  1069.   }
  1070.   /**
  1071.    * Get the information of the current state of the setup tasks of a job.
  1072.    * 
  1073.    * @param jobId the job to query.
  1074.    * @return the list of all of the setup tips.
  1075.    * @throws IOException
  1076.    */    
  1077.   public TaskReport[] getSetupTaskReports(JobID jobId) throws IOException {
  1078.     return jobSubmitClient.getSetupTaskReports(jobId);
  1079.   }
  1080.   /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
  1081.   @Deprecated
  1082.   public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
  1083.     return getReduceTaskReports(JobID.forName(jobId));
  1084.   }
  1085.   
  1086.   /**
  1087.    * Display the information about a job's tasks, of a particular type and
  1088.    * in a particular state
  1089.    * 
  1090.    * @param jobId the ID of the job
  1091.    * @param type the type of the task (map/reduce/setup/cleanup)
  1092.    * @param state the state of the task 
  1093.    * (pending/running/completed/failed/killed)
  1094.    */
  1095.   public void displayTasks(JobID jobId, String type, String state) 
  1096.   throws IOException {
  1097.     TaskReport[] reports = new TaskReport[0];
  1098.     if (type.equals("map")) {
  1099.       reports = getMapTaskReports(jobId);
  1100.     } else if (type.equals("reduce")) {
  1101.       reports = getReduceTaskReports(jobId);
  1102.     } else if (type.equals("setup")) {
  1103.       reports = getSetupTaskReports(jobId);
  1104.     } else if (type.equals("cleanup")) {
  1105.       reports = getCleanupTaskReports(jobId);
  1106.     }
  1107.     for (TaskReport report : reports) {
  1108.       TIPStatus status = report.getCurrentStatus();
  1109.       if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
  1110.           (state.equals("running") && status ==TIPStatus.RUNNING) ||
  1111.           (state.equals("completed") && status == TIPStatus.COMPLETE) ||
  1112.           (state.equals("failed") && status == TIPStatus.FAILED) ||
  1113.           (state.equals("killed") && status == TIPStatus.KILLED)) {
  1114.         printTaskAttempts(report);
  1115.       }
  1116.     }
  1117.   }
  1118.   private void printTaskAttempts(TaskReport report) {
  1119.     if (report.getCurrentStatus() == TIPStatus.COMPLETE) {
  1120.       System.out.println(report.getSuccessfulTaskAttempt());
  1121.     } else if (report.getCurrentStatus() == TIPStatus.RUNNING) {
  1122.       for (TaskAttemptID t : 
  1123.         report.getRunningTaskAttempts()) {
  1124.         System.out.println(t);
  1125.       }
  1126.     }
  1127.   }
  1128.   /**
  1129.    * Get status information about the Map-Reduce cluster.
  1130.    *  
  1131.    * @return the status information about the Map-Reduce cluster as an object
  1132.    *         of {@link ClusterStatus}.
  1133.    * @throws IOException
  1134.    */
  1135.   public ClusterStatus getClusterStatus() throws IOException {
  1136.     return getClusterStatus(false);
  1137.   }
  1138.   /**
  1139.    * Get status information about the Map-Reduce cluster.
  1140.    *  
  1141.    * @param  detailed if true then get a detailed status including the
  1142.    *         tracker names
  1143.    * @return the status information about the Map-Reduce cluster as an object
  1144.    *         of {@link ClusterStatus}.
  1145.    * @throws IOException
  1146.    */
  1147.   public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
  1148.     return jobSubmitClient.getClusterStatus(detailed);
  1149.   }
  1150.     
  1151.   /** 
  1152.    * Get the jobs that are not completed and not failed.
  1153.    * 
  1154.    * @return array of {@link JobStatus} for the running/to-be-run jobs.
  1155.    * @throws IOException
  1156.    */
  1157.   public JobStatus[] jobsToComplete() throws IOException {
  1158.     return jobSubmitClient.jobsToComplete();
  1159.   }
  1160.   private static void downloadProfile(TaskCompletionEvent e
  1161.                                       ) throws IOException  {
  1162.     URLConnection connection = 
  1163.       new URL(getTaskLogURL(e.getTaskAttemptId(), e.getTaskTrackerHttp()) + 
  1164.               "&filter=profile").openConnection();
  1165.     InputStream in = connection.getInputStream();
  1166.     OutputStream out = new FileOutputStream(e.getTaskAttemptId() + ".profile");
  1167.     IOUtils.copyBytes(in, out, 64 * 1024, true);
  1168.   }
  1169.   /** 
  1170.    * Get the jobs that are submitted.
  1171.    * 
  1172.    * @return array of {@link JobStatus} for the submitted jobs.
  1173.    * @throws IOException
  1174.    */
  1175.   public JobStatus[] getAllJobs() throws IOException {
  1176.     return jobSubmitClient.getAllJobs();
  1177.   }
  1178.   
  1179.   /** 
  1180.    * Utility that submits a job, then polls for progress until the job is
  1181.    * complete.
  1182.    * 
  1183.    * @param job the job configuration.
  1184.    * @throws IOException if the job fails
  1185.    */
  1186.   public static RunningJob runJob(JobConf job) throws IOException {
  1187.     JobClient jc = new JobClient(job);
  1188.     RunningJob rj = jc.submitJob(job);
  1189.     try {
  1190.       if (!jc.monitorAndPrintJob(job, rj)) {
  1191.         throw new IOException("Job failed!");
  1192.       }
  1193.     } catch (InterruptedException ie) {
  1194.       Thread.currentThread().interrupt();
  1195.     }
  1196.     return rj;
  1197.   }
  1198.   
  1199.   /**
  1200.    * Monitor a job and print status in real-time as progress is made and tasks 
  1201.    * fail.
  1202.    * @param conf the job's configuration
  1203.    * @param job the job to track
  1204.    * @return true if the job succeeded
  1205.    * @throws IOException if communication to the JobTracker fails
  1206.    */
  1207.   public boolean monitorAndPrintJob(JobConf conf, 
  1208.                                     RunningJob job
  1209.   ) throws IOException, InterruptedException {
  1210.     String lastReport = null;
  1211.     TaskStatusFilter filter;
  1212.     filter = getTaskOutputFilter(conf);
  1213.     JobID jobId = job.getID();
  1214.     LOG.info("Running job: " + jobId);
  1215.     int eventCounter = 0;
  1216.     boolean profiling = conf.getProfileEnabled();
  1217.     Configuration.IntegerRanges mapRanges = conf.getProfileTaskRange(true);
  1218.     Configuration.IntegerRanges reduceRanges = conf.getProfileTaskRange(false);
  1219.     while (!job.isComplete()) {
  1220.       Thread.sleep(1000);
  1221.       String report = 
  1222.         (" map " + StringUtils.formatPercent(job.mapProgress(), 0)+
  1223.             " reduce " + 
  1224.             StringUtils.formatPercent(job.reduceProgress(), 0));
  1225.       if (!report.equals(lastReport)) {
  1226.         LOG.info(report);
  1227.         lastReport = report;
  1228.       }
  1229.       TaskCompletionEvent[] events = 
  1230.         job.getTaskCompletionEvents(eventCounter); 
  1231.       eventCounter += events.length;
  1232.       for(TaskCompletionEvent event : events){
  1233.         TaskCompletionEvent.Status status = event.getTaskStatus();
  1234.         if (profiling && 
  1235.             (status == TaskCompletionEvent.Status.SUCCEEDED ||
  1236.                 status == TaskCompletionEvent.Status.FAILED) &&
  1237.                 (event.isMap ? mapRanges : reduceRanges).
  1238.                 isIncluded(event.idWithinJob())) {
  1239.           downloadProfile(event);
  1240.         }
  1241.         switch(filter){
  1242.         case NONE:
  1243.           break;
  1244.         case SUCCEEDED:
  1245.           if (event.getTaskStatus() == 
  1246.             TaskCompletionEvent.Status.SUCCEEDED){
  1247.             LOG.info(event.toString());
  1248.             displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
  1249.           }
  1250.           break; 
  1251.         case FAILED:
  1252.           if (event.getTaskStatus() == 
  1253.             TaskCompletionEvent.Status.FAILED){
  1254.             LOG.info(event.toString());
  1255.             // Displaying the task diagnostic information
  1256.             TaskAttemptID taskId = event.getTaskAttemptId();
  1257.             String[] taskDiagnostics = 
  1258.               jobSubmitClient.getTaskDiagnostics(taskId); 
  1259.             if (taskDiagnostics != null) {
  1260.               for(String diagnostics : taskDiagnostics){
  1261.                 System.err.println(diagnostics);
  1262.               }
  1263.             }
  1264.             // Displaying the task logs
  1265.             displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
  1266.           }
  1267.           break; 
  1268.         case KILLED:
  1269.           if (event.getTaskStatus() == TaskCompletionEvent.Status.KILLED){
  1270.             LOG.info(event.toString());
  1271.           }
  1272.           break; 
  1273.         case ALL:
  1274.           LOG.info(event.toString());
  1275.           displayTaskLogs(event.getTaskAttemptId(), event.getTaskTrackerHttp());
  1276.           break;
  1277.         }
  1278.       }
  1279.     }
  1280.     LOG.info("Job complete: " + jobId);
  1281.     job.getCounters().log(LOG);
  1282.     return job.isSuccessful();
  1283.   }
  1284.   static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
  1285.     return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
  1286.   }
  1287.   
  1288.   private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
  1289.     throws IOException {
  1290.     // The tasktracker for a 'failed/killed' job might not be around...
  1291.     if (baseUrl != null) {
  1292.       // Construct the url for the tasklogs
  1293.       String taskLogUrl = getTaskLogURL(taskId, baseUrl);
  1294.       
  1295.       // Copy tasks's stdout of the JobClient
  1296.       getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
  1297.         
  1298.       // Copy task's stderr to stderr of the JobClient 
  1299.       getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stderr"), System.err);
  1300.     }
  1301.   }
  1302.     
  1303.   private static void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, 
  1304.                                   OutputStream out) {
  1305.     try {
  1306.       URLConnection connection = taskLogUrl.openConnection();
  1307.       BufferedReader input = 
  1308.         new BufferedReader(new InputStreamReader(connection.getInputStream()));
  1309.       BufferedWriter output = 
  1310.         new BufferedWriter(new OutputStreamWriter(out));
  1311.       try {
  1312.         String logData = null;
  1313.         while ((logData = input.readLine()) != null) {
  1314.           if (logData.length() > 0) {
  1315.             output.write(taskId + ": " + logData + "n");
  1316.             output.flush();
  1317.           }
  1318.         }
  1319.       } finally {
  1320.         input.close();
  1321.       }
  1322.     }catch(IOException ioe){
  1323.       LOG.warn("Error reading task output" + ioe.getMessage()); 
  1324.     }
  1325.   }    
  1326.   static Configuration getConfiguration(String jobTrackerSpec)
  1327.   {
  1328.     Configuration conf = new Configuration();
  1329.     if (jobTrackerSpec != null) {        
  1330.       if (jobTrackerSpec.indexOf(":") >= 0) {
  1331.         conf.set("mapred.job.tracker", jobTrackerSpec);
  1332.       } else {
  1333.         String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
  1334.         URL validate = conf.getResource(classpathFile);
  1335.         if (validate == null) {
  1336.           throw new RuntimeException(classpathFile + " not found on CLASSPATH");
  1337.         }
  1338.         conf.addResource(classpathFile);
  1339.       }
  1340.     }
  1341.     return conf;
  1342.   }
  1343.   /**
  1344.    * Sets the output filter for tasks. only those tasks are printed whose
  1345.    * output matches the filter. 
  1346.    * @param newValue task filter.
  1347.    */
  1348.   @Deprecated
  1349.   public void setTaskOutputFilter(TaskStatusFilter newValue){
  1350.     this.taskOutputFilter = newValue;
  1351.   }
  1352.     
  1353.   /**
  1354.    * Get the task output filter out of the JobConf.
  1355.    * 
  1356.    * @param job the JobConf to examine.
  1357.    * @return the filter level.
  1358.    */
  1359.   public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
  1360.     return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", 
  1361.                                             "FAILED"));
  1362.   }
  1363.     
  1364.   /**
  1365.    * Modify the JobConf to set the task output filter.
  1366.    * 
  1367.    * @param job the JobConf to modify.
  1368.    * @param newValue the value to set.
  1369.    */
  1370.   public static void setTaskOutputFilter(JobConf job, 
  1371.                                          TaskStatusFilter newValue) {
  1372.     job.set("jobclient.output.filter", newValue.toString());
  1373.   }
  1374.     
  1375.   /**
  1376.    * Returns task output filter.
  1377.    * @return task filter. 
  1378.    */
  1379.   @Deprecated
  1380.   public TaskStatusFilter getTaskOutputFilter(){
  1381.     return this.taskOutputFilter; 
  1382.   }
  1383.   private String getJobPriorityNames() {
  1384.     StringBuffer sb = new StringBuffer();
  1385.     for (JobPriority p : JobPriority.values()) {
  1386.       sb.append(p.name()).append(" ");
  1387.     }
  1388.     return sb.substring(0, sb.length()-1);
  1389.   }
  1390.   
  1391.   /**
  1392.    * Display usage of the command-line tool and terminate execution
  1393.    */
  1394.   private void displayUsage(String cmd) {
  1395.     String prefix = "Usage: JobClient ";
  1396.     String jobPriorityValues = getJobPriorityNames();
  1397.     String taskTypes = "map, reduce, setup, cleanup";
  1398.     String taskStates = "running, completed";
  1399.     if("-submit".equals(cmd)) {
  1400.       System.err.println(prefix + "[" + cmd + " <job-file>]");
  1401.     } else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
  1402.       System.err.println(prefix + "[" + cmd + " <job-id>]");
  1403.     } else if ("-counter".equals(cmd)) {
  1404.       System.err.println(prefix + "[" + cmd + " <job-id> <group-name> <counter-name>]");
  1405.     } else if ("-events".equals(cmd)) {
  1406.       System.err.println(prefix + "[" + cmd + " <job-id> <from-event-#> <#-of-events>]");
  1407.     } else if ("-history".equals(cmd)) {
  1408.       System.err.println(prefix + "[" + cmd + " <jobOutputDir>]");
  1409.     } else if ("-list".equals(cmd)) {
  1410.       System.err.println(prefix + "[" + cmd + " [all]]");
  1411.     } else if ("-kill-task".equals(cmd) || "-fail-task".equals(cmd)) {
  1412.       System.err.println(prefix + "[" + cmd + " <task-id>]");
  1413.     } else if ("-set-priority".equals(cmd)) {
  1414.       System.err.println(prefix + "[" + cmd + " <job-id> <priority>]. " +
  1415.           "Valid values for priorities are: " 
  1416.           + jobPriorityValues); 
  1417.     } else if ("-list-active-trackers".equals(cmd)) {
  1418.       System.err.println(prefix + "[" + cmd + "]");
  1419.     } else if ("-list-blacklisted-trackers".equals(cmd)) {
  1420.       System.err.println(prefix + "[" + cmd + "]");
  1421.     } else if ("-list-attempt-ids".equals(cmd)) {
  1422.       System.err.println(prefix + "[" + cmd + 
  1423.           " <job-id> <task-type> <task-state>]. " +
  1424.           "Valid values for <task-type> are " + taskTypes + ". " +
  1425.           "Valid values for <task-state> are " + taskStates);
  1426.     } else {
  1427.       System.err.printf(prefix + "<command> <args>n");
  1428.       System.err.printf("t[-submit <job-file>]n");
  1429.       System.err.printf("t[-status <job-id>]n");
  1430.       System.err.printf("t[-counter <job-id> <group-name> <counter-name>]n");
  1431.       System.err.printf("t[-kill <job-id>]n");
  1432.       System.err.printf("t[-set-priority <job-id> <priority>]. " +
  1433.                                       "Valid values for priorities are: " +
  1434.                                       jobPriorityValues + "n");
  1435.       System.err.printf("t[-events <job-id> <from-event-#> <#-of-events>]n");
  1436.       System.err.printf("t[-history <jobOutputDir>]n");
  1437.       System.err.printf("t[-list [all]]n");
  1438.       System.err.printf("t[-list-active-trackers]n");
  1439.       System.err.printf("t[-list-blacklisted-trackers]n");
  1440.       System.err.println("t[-list-attempt-ids <job-id> <task-type> " +
  1441.        "<task-state>]n");
  1442.       System.err.printf("t[-kill-task <task-id>]n");
  1443.       System.err.printf("t[-fail-task <task-id>]nn");
  1444.       ToolRunner.printGenericCommandUsage(System.out);
  1445.     }
  1446.   }
  1447.     
  1448.   public int run(String[] argv) throws Exception {
  1449.     int exitCode = -1;
  1450.     if (argv.length < 1) {
  1451.       displayUsage("");
  1452.       return exitCode;
  1453.     }    
  1454.     // process arguments
  1455.     String cmd = argv[0];
  1456.     String submitJobFile = null;
  1457.     String jobid = null;
  1458.     String taskid = null;
  1459.     String outputDir = null;
  1460.     String counterGroupName = null;
  1461.     String counterName = null;
  1462.     String newPriority = null;
  1463.     String taskType = null;
  1464.     String taskState = null;
  1465.     int fromEvent = 0;
  1466.     int nEvents = 0;
  1467.     boolean getStatus = false;
  1468.     boolean getCounter = false;
  1469.     boolean killJob = false;
  1470.     boolean listEvents = false;
  1471.     boolean viewHistory = false;
  1472.     boolean viewAllHistory = false;
  1473.     boolean listJobs = false;
  1474.     boolean listAllJobs = false;
  1475.     boolean listActiveTrackers = false;
  1476.     boolean listBlacklistedTrackers = false;
  1477.     boolean displayTasks = false;
  1478.     boolean killTask = false;
  1479.     boolean failTask = false;
  1480.     boolean setJobPriority = false;
  1481.     if ("-submit".equals(cmd)) {
  1482.       if (argv.length != 2) {
  1483.         displayUsage(cmd);
  1484.         return exitCode;
  1485.       }
  1486.       submitJobFile = argv[1];
  1487.     } else if ("-status".equals(cmd)) {
  1488.       if (argv.length != 2) {
  1489.         displayUsage(cmd);
  1490.         return exitCode;
  1491.       }
  1492.       jobid = argv[1];
  1493.       getStatus = true;
  1494.     } else if("-counter".equals(cmd)) {
  1495.       if (argv.length != 4) {
  1496.         displayUsage(cmd);
  1497.         return exitCode;
  1498.       }
  1499.       getCounter = true;
  1500.       jobid = argv[1];
  1501.       counterGroupName = argv[2];
  1502.       counterName = argv[3];
  1503.     } else if ("-kill".equals(cmd)) {
  1504.       if (argv.length != 2) {
  1505.         displayUsage(cmd);
  1506.         return exitCode;
  1507.       }
  1508.       jobid = argv[1];
  1509.       killJob = true;
  1510.     } else if ("-set-priority".equals(cmd)) {
  1511.       if (argv.length != 3) {
  1512.         displayUsage(cmd);
  1513.         return exitCode;
  1514.       }
  1515.       jobid = argv[1];
  1516.       newPriority = argv[2];
  1517.       try {
  1518.         JobPriority jp = JobPriority.valueOf(newPriority); 
  1519.       } catch (IllegalArgumentException iae) {
  1520.         displayUsage(cmd);
  1521.         return exitCode;
  1522.       }
  1523.       setJobPriority = true; 
  1524.     } else if ("-events".equals(cmd)) {
  1525.       if (argv.length != 4) {
  1526.         displayUsage(cmd);
  1527.         return exitCode;
  1528.       }
  1529.       jobid = argv[1];
  1530.       fromEvent = Integer.parseInt(argv[2]);
  1531.       nEvents = Integer.parseInt(argv[3]);
  1532.       listEvents = true;
  1533.     } else if ("-history".equals(cmd)) {
  1534.       if (argv.length != 2 && !(argv.length == 3 && "all".equals(argv[1]))) {
  1535.          displayUsage(cmd);
  1536.          return exitCode;
  1537.       }
  1538.       viewHistory = true;
  1539.       if (argv.length == 3 && "all".equals(argv[1])) {
  1540.          viewAllHistory = true;
  1541.          outputDir = argv[2];
  1542.       } else {
  1543.          outputDir = argv[1];
  1544.       }
  1545.     } else if ("-list".equals(cmd)) {
  1546.       if (argv.length != 1 && !(argv.length == 2 && "all".equals(argv[1]))) {
  1547.         displayUsage(cmd);
  1548.         return exitCode;
  1549.       }
  1550.       if (argv.length == 2 && "all".equals(argv[1])) {
  1551.         listAllJobs = true;
  1552.       } else {
  1553.         listJobs = true;
  1554.       }
  1555.     } else if("-kill-task".equals(cmd)) {
  1556.       if(argv.length != 2) {
  1557.         displayUsage(cmd);
  1558.         return exitCode;
  1559.       }
  1560.       killTask = true;
  1561.       taskid = argv[1];
  1562.     } else if("-fail-task".equals(cmd)) {
  1563.       if(argv.length != 2) {
  1564.         displayUsage(cmd);
  1565.         return exitCode;
  1566.       }
  1567.       failTask = true;
  1568.       taskid = argv[1];
  1569.     } else if ("-list-active-trackers".equals(cmd)) {
  1570.       if (argv.length != 1) {
  1571.         displayUsage(cmd);
  1572.         return exitCode;
  1573.       }
  1574.       listActiveTrackers = true;
  1575.     } else if ("-list-blacklisted-trackers".equals(cmd)) {
  1576.       if (argv.length != 1) {
  1577.         displayUsage(cmd);
  1578.         return exitCode;
  1579.       }
  1580.       listBlacklistedTrackers = true;
  1581.     } else if ("-list-attempt-ids".equals(cmd)) {
  1582.       if (argv.length != 4) {
  1583.         displayUsage(cmd);
  1584.         return exitCode;
  1585.       }
  1586.       jobid = argv[1];
  1587.       taskType = argv[2];
  1588.       taskState = argv[3];
  1589.       displayTasks = true;
  1590.     } else {
  1591.       displayUsage(cmd);
  1592.       return exitCode;
  1593.     }
  1594.     // initialize JobClient
  1595.     JobConf conf = null;
  1596.     if (submitJobFile != null) {
  1597.       conf = new JobConf(submitJobFile);
  1598.     } else {
  1599.       conf = new JobConf(getConf());
  1600.     }
  1601.     init(conf);
  1602.         
  1603.     // Submit the request
  1604.     try {
  1605.       if (submitJobFile != null) {
  1606.         RunningJob job = submitJob(conf);
  1607.         System.out.println("Created job " + job.getID());
  1608.         exitCode = 0;
  1609.       } else if (getStatus) {
  1610.         RunningJob job = getJob(JobID.forName(jobid));
  1611.         if (job == null) {
  1612.           System.out.println("Could not find job " + jobid);
  1613.         } else {
  1614.           System.out.println();
  1615.           System.out.println(job);
  1616.           System.out.println(job.getCounters());
  1617.           exitCode = 0;
  1618.         }
  1619.       } else if (getCounter) {
  1620.         RunningJob job = getJob(JobID.forName(jobid));
  1621.         if (job == null) {
  1622.           System.out.println("Could not find job " + jobid);
  1623.         } else {
  1624.           Counters counters = job.getCounters();
  1625.           Group group = counters.getGroup(counterGroupName);
  1626.           Counter counter = group.getCounterForName(counterName);
  1627.           System.out.println(counter.getCounter());
  1628.           exitCode = 0;
  1629.         }
  1630.       } else if (killJob) {
  1631.         RunningJob job = getJob(JobID.forName(jobid));
  1632.         if (job == null) {
  1633.           System.out.println("Could not find job " + jobid);
  1634.         } else {
  1635.           job.killJob();
  1636.           System.out.println("Killed job " + jobid);
  1637.           exitCode = 0;
  1638.         }
  1639.       } else if (setJobPriority) {
  1640.         RunningJob job = getJob(JobID.forName(jobid));
  1641.         if (job == null) {
  1642.           System.out.println("Could not find job " + jobid);
  1643.         } else {
  1644.           job.setJobPriority(newPriority);
  1645.           System.out.println("Changed job priority.");
  1646.           exitCode = 0;
  1647.         } 
  1648.       } else if (viewHistory) {
  1649.         viewHistory(outputDir, viewAllHistory);
  1650.         exitCode = 0;
  1651.       } else if (listEvents) {
  1652.         listEvents(JobID.forName(jobid), fromEvent, nEvents);
  1653.         exitCode = 0;
  1654.       } else if (listJobs) {
  1655.         listJobs();
  1656.         exitCode = 0;
  1657.       } else if (listAllJobs) {
  1658.         listAllJobs();
  1659.         exitCode = 0;
  1660.       } else if (listActiveTrackers) {
  1661.         listActiveTrackers();
  1662.         exitCode = 0;
  1663.       } else if (listBlacklistedTrackers) {
  1664.         listBlacklistedTrackers();
  1665.         exitCode = 0;
  1666.       } else if (displayTasks) {
  1667.         displayTasks(JobID.forName(jobid), taskType, taskState);
  1668.       } else if(killTask) {
  1669.         if(jobSubmitClient.killTask(TaskAttemptID.forName(taskid), false)) {
  1670.           System.out.println("Killed task " + taskid);
  1671.           exitCode = 0;
  1672.         } else {
  1673.           System.out.println("Could not kill task " + taskid);
  1674.           exitCode = -1;
  1675.         }
  1676.       } else if(failTask) {
  1677.         if(jobSubmitClient.killTask(TaskAttemptID.forName(taskid), true)) {
  1678.           System.out.println("Killed task " + taskid + " by failing it");
  1679.           exitCode = 0;
  1680.         } else {
  1681.           System.out.println("Could not fail task " + taskid);
  1682.           exitCode = -1;
  1683.         }
  1684.       }
  1685.     } finally {
  1686.       close();
  1687.     }
  1688.     return exitCode;
  1689.   }
  1690.   private void viewHistory(String outputDir, boolean all) 
  1691.     throws IOException {
  1692.     HistoryViewer historyViewer = new HistoryViewer(outputDir,
  1693.                                         getConf(), all);
  1694.     historyViewer.print();
  1695.   }
  1696.   
  1697.   /**
  1698.    * List the events for the given job
  1699.    * @param jobId the job id for the job's events to list
  1700.    * @throws IOException
  1701.    */
  1702.   private void listEvents(JobID jobId, int fromEventId, int numEvents)
  1703.     throws IOException {
  1704.     TaskCompletionEvent[] events = 
  1705.       jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
  1706.     System.out.println("Task completion events for " + jobId);
  1707.     System.out.println("Number of events (from " + fromEventId + 
  1708.                        ") are: " + events.length);
  1709.     for(TaskCompletionEvent event: events) {
  1710.       System.out.println(event.getTaskStatus() + " " + event.getTaskAttemptId() + " " + 
  1711.                          getTaskLogURL(event.getTaskAttemptId(), 
  1712.                                        event.getTaskTrackerHttp()));
  1713.     }
  1714.   }
  1715.   /**
  1716.    * Dump a list of currently running jobs
  1717.    * @throws IOException
  1718.    */
  1719.   private void listJobs() throws IOException {
  1720.     JobStatus[] jobs = jobsToComplete();
  1721.     if (jobs == null)
  1722.       jobs = new JobStatus[0];
  1723.     System.out.printf("%d jobs currently runningn", jobs.length);
  1724.     displayJobList(jobs);
  1725.   }
  1726.     
  1727.   /**
  1728.    * Dump a list of all jobs submitted.
  1729.    * @throws IOException
  1730.    */
  1731.   private void listAllJobs() throws IOException {
  1732.     JobStatus[] jobs = getAllJobs();
  1733.     if (jobs == null)
  1734.       jobs = new JobStatus[0];
  1735.     System.out.printf("%d jobs submittedn", jobs.length);
  1736.     System.out.printf("States are:ntRunning : 1tSucceded : 2" +
  1737.     "tFailed : 3tPrep : 4n");
  1738.     displayJobList(jobs);
  1739.   }
  1740.   
  1741.   /**
  1742.    * Display the list of active trackers
  1743.    */
  1744.   private void listActiveTrackers() throws IOException {
  1745.     ClusterStatus c = jobSubmitClient.getClusterStatus(true);
  1746.     Collection<String> trackers = c.getActiveTrackerNames();
  1747.     for (String trackerName : trackers) {
  1748.       System.out.println(trackerName);
  1749.     }
  1750.   }
  1751.   /**
  1752.    * Display the list of blacklisted trackers
  1753.    */
  1754.   private void listBlacklistedTrackers() throws IOException {
  1755.     ClusterStatus c = jobSubmitClient.getClusterStatus(true);
  1756.     Collection<String> trackers = c.getBlacklistedTrackerNames();
  1757.     for (String trackerName : trackers) {
  1758.       System.out.println(trackerName);
  1759.     }
  1760.   }
  1761.   void displayJobList(JobStatus[] jobs) {
  1762.     System.out.printf("JobIdtStatetStartTimetUserNametPrioritytSchedulingInfon");
  1763.     for (JobStatus job : jobs) {
  1764.       System.out.printf("%st%dt%dt%st%st%sn", job.getJobID(), job.getRunState(),
  1765.           job.getStartTime(), job.getUsername(), 
  1766.           job.getJobPriority().name(), job.getSchedulingInfo());
  1767.     }
  1768.   }
  1769.   /**
  1770.    * Get status information about the max available Maps in the cluster.
  1771.    *  
  1772.    * @return the max available Maps in the cluster
  1773.    * @throws IOException
  1774.    */
  1775.   public int getDefaultMaps() throws IOException {
  1776.     return getClusterStatus().getMaxMapTasks();
  1777.   }
  1778.   /**
  1779.    * Get status information about the max available Reduces in the cluster.
  1780.    *  
  1781.    * @return the max available Reduces in the cluster
  1782.    * @throws IOException
  1783.    */
  1784.   public int getDefaultReduces() throws IOException {
  1785.     return getClusterStatus().getMaxReduceTasks();
  1786.   }
  1787.   /**
  1788.    * Grab the jobtracker system directory path where job-specific files are to be placed.
  1789.    * 
  1790.    * @return the system directory where job-specific files are to be placed.
  1791.    */
  1792.   public Path getSystemDir() {
  1793.     if (sysDir == null) {
  1794.       sysDir = new Path(jobSubmitClient.getSystemDir());
  1795.     }
  1796.     return sysDir;
  1797.   }
  1798.   
  1799.   
  1800.   /**
  1801.    * Return an array of queue information objects about all the Job Queues
  1802.    * configured.
  1803.    * 
  1804.    * @return Array of JobQueueInfo objects
  1805.    * @throws IOException
  1806.    */
  1807.   public JobQueueInfo[] getQueues() throws IOException {
  1808.     return jobSubmitClient.getQueues();
  1809.   }
  1810.   
  1811.   /**
  1812.    * Gets all the jobs which were added to particular Job Queue
  1813.    * 
  1814.    * @param queueName name of the Job Queue
  1815.    * @return Array of jobs present in the job queue
  1816.    * @throws IOException
  1817.    */
  1818.   
  1819.   public JobStatus[] getJobsFromQueue(String queueName) throws IOException {
  1820.     return jobSubmitClient.getJobsFromQueue(queueName);
  1821.   }
  1822.   
  1823.   /**
  1824.    * Gets the queue information associated to a particular Job Queue
  1825.    * 
  1826.    * @param queueName name of the job queue.
  1827.    * @return Queue information associated to particular queue.
  1828.    * @throws IOException
  1829.    */
  1830.   public JobQueueInfo getQueueInfo(String queueName) throws IOException {
  1831.     return jobSubmitClient.getQueueInfo(queueName);
  1832.   }
  1833.   
  1834.   /**
  1835.    */
  1836.   public static void main(String argv[]) throws Exception {
  1837.     int res = ToolRunner.run(new JobClient(), argv);
  1838.     System.exit(res);
  1839.   }
  1840. }