JobTracker.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:127k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.net.BindException;
  21. import java.net.InetSocketAddress;
  22. import java.net.UnknownHostException;
  23. import java.text.ParseException;
  24. import java.text.SimpleDateFormat;
  25. import java.util.ArrayList;
  26. import java.util.Collection;
  27. import java.util.Collections;
  28. import java.util.Comparator;
  29. import java.util.Date;
  30. import java.util.HashMap;
  31. import java.util.HashSet;
  32. import java.util.Iterator;
  33. import java.util.LinkedHashMap;
  34. import java.util.List;
  35. import java.util.Map;
  36. import java.util.Properties;
  37. import java.util.Set;
  38. import java.util.TreeMap;
  39. import java.util.TreeSet;
  40. import java.util.Vector;
  41. import java.util.concurrent.ConcurrentHashMap;
  42. import java.util.concurrent.CopyOnWriteArrayList;
  43. import org.apache.commons.logging.Log;
  44. import org.apache.commons.logging.LogFactory;
  45. import org.apache.hadoop.conf.Configuration;
  46. import org.apache.hadoop.fs.FSDataInputStream;
  47. import org.apache.hadoop.fs.FSDataOutputStream;
  48. import org.apache.hadoop.fs.FileStatus;
  49. import org.apache.hadoop.fs.FileSystem;
  50. import org.apache.hadoop.fs.Path;
  51. import org.apache.hadoop.fs.permission.FsPermission;
  52. import org.apache.hadoop.http.HttpServer;
  53. import org.apache.hadoop.ipc.RPC;
  54. import org.apache.hadoop.ipc.RemoteException;
  55. import org.apache.hadoop.ipc.Server;
  56. import org.apache.hadoop.ipc.RPC.VersionMismatch;
  57. import org.apache.hadoop.mapred.JobHistory.Keys;
  58. import org.apache.hadoop.mapred.JobHistory.Listener;
  59. import org.apache.hadoop.mapred.JobHistory.Values;
  60. import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
  61. import org.apache.hadoop.net.DNSToSwitchMapping;
  62. import org.apache.hadoop.net.NetUtils;
  63. import org.apache.hadoop.net.NetworkTopology;
  64. import org.apache.hadoop.net.Node;
  65. import org.apache.hadoop.net.NodeBase;
  66. import org.apache.hadoop.net.ScriptBasedMapping;
  67. import org.apache.hadoop.security.AccessControlException;
  68. import org.apache.hadoop.security.SecurityUtil;
  69. import org.apache.hadoop.security.UserGroupInformation;
  70. import org.apache.hadoop.security.authorize.AuthorizationException;
  71. import org.apache.hadoop.security.authorize.ConfiguredPolicy;
  72. import org.apache.hadoop.security.authorize.PolicyProvider;
  73. import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
  74. import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
  75. import org.apache.hadoop.util.HostsFileReader;
  76. import org.apache.hadoop.util.ReflectionUtils;
  77. import org.apache.hadoop.util.StringUtils;
  78. import org.apache.hadoop.util.VersionInfo;
  79. /*******************************************************
  80.  * JobTracker is the central location for submitting and 
  81.  * tracking MR jobs in a network environment.
  82.  *
  83.  *******************************************************/
  84. public class JobTracker implements MRConstants, InterTrackerProtocol,
  85.     JobSubmissionProtocol, TaskTrackerManager, RefreshAuthorizationPolicyProtocol {
  86.   static{
  87.     Configuration.addDefaultResource("mapred-default.xml");
  88.     Configuration.addDefaultResource("mapred-site.xml");
  89.   }
  90.   static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
  91.   static long RETIRE_JOB_INTERVAL;
  92.   static long RETIRE_JOB_CHECK_INTERVAL;
  93.   // The interval after which one fault of a tracker will be discarded,
  94.   // if there are no faults during this. 
  95.   private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000;
  96.   // The maximum percentage of trackers in cluster added 
  97.   // to the 'blacklist' across all the jobs.
  98.   private static double MAX_BLACKLIST_PERCENT = 0.50;
  99.   // A tracker is blacklisted across jobs only if number of 
  100.   // blacklists are X% above the average number of blacklists.
  101.   // X is the blacklist threshold here.
  102.   private double AVERAGE_BLACKLIST_THRESHOLD = 0.50;
  103.   // The maximum number of blacklists for a tracker after which the 
  104.   // tracker could be blacklisted across all jobs
  105.   private int MAX_BLACKLISTS_PER_TRACKER = 4;
  106.   public static enum State { INITIALIZING, RUNNING }
  107.   State state = State.INITIALIZING;
  108.   private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
  109.   private DNSToSwitchMapping dnsToSwitchMapping;
  110.   private NetworkTopology clusterMap = new NetworkTopology();
  111.   private int numTaskCacheLevels; // the max level to which we cache tasks
  112.   private Set<Node> nodesAtMaxLevel = new HashSet<Node>();
  113.   private final TaskScheduler taskScheduler;
  114.   private final List<JobInProgressListener> jobInProgressListeners =
  115.     new CopyOnWriteArrayList<JobInProgressListener>();
  116.   // system directories are world-wide readable and owner readable
  117.   final static FsPermission SYSTEM_DIR_PERMISSION =
  118.     FsPermission.createImmutable((short) 0733); // rwx-wx-wx
  119.   // system files should have 700 permission
  120.   final static FsPermission SYSTEM_FILE_PERMISSION =
  121.     FsPermission.createImmutable((short) 0700); // rwx------
  122.   /**
  123.    * A client tried to submit a job before the Job Tracker was ready.
  124.    */
  125.   public static class IllegalStateException extends IOException {
  126.     public IllegalStateException(String msg) {
  127.       super(msg);
  128.     }
  129.   }
  130.   /**
  131.    * The maximum no. of 'completed' (successful/failed/killed)
  132.    * jobs kept in memory per-user. 
  133.    */
  134.   final int MAX_COMPLETE_USER_JOBS_IN_MEMORY;
  135.    /**
  136.     * The minimum time (in ms) that a job's information has to remain
  137.     * in the JobTracker's memory before it is retired.
  138.     */
  139.   static final int MIN_TIME_BEFORE_RETIRE = 60000;
  140.   private int nextJobId = 1;
  141.   public static final Log LOG = LogFactory.getLog(JobTracker.class);
  142.     
  143.   /**
  144.    * Start the JobTracker with given configuration.
  145.    * 
  146.    * The conf will be modified to reflect the actual ports on which 
  147.    * the JobTracker is up and running if the user passes the port as
  148.    * <code>zero</code>.
  149.    *   
  150.    * @param conf configuration for the JobTracker.
  151.    * @throws IOException
  152.    */
  153.   public static JobTracker startTracker(JobConf conf
  154.                                         ) throws IOException,
  155.                                                  InterruptedException {
  156.     JobTracker result = null;
  157.     while (true) {
  158.       try {
  159.         result = new JobTracker(conf);
  160.         result.taskScheduler.setTaskTrackerManager(result);
  161.         break;
  162.       } catch (VersionMismatch e) {
  163.         throw e;
  164.       } catch (BindException e) {
  165.         throw e;
  166.       } catch (UnknownHostException e) {
  167.         throw e;
  168.       } catch (IOException e) {
  169.         LOG.warn("Error starting tracker: " + 
  170.                  StringUtils.stringifyException(e));
  171.       }
  172.       Thread.sleep(1000);
  173.     }
  174.     if (result != null) {
  175.       JobEndNotifier.startNotifier();
  176.     }
  177.     return result;
  178.   }
  179.   public void stopTracker() throws IOException {
  180.     JobEndNotifier.stopNotifier();
  181.     close();
  182.   }
  183.     
  184.   public long getProtocolVersion(String protocol, 
  185.                                  long clientVersion) throws IOException {
  186.     if (protocol.equals(InterTrackerProtocol.class.getName())) {
  187.       return InterTrackerProtocol.versionID;
  188.     } else if (protocol.equals(JobSubmissionProtocol.class.getName())){
  189.       return JobSubmissionProtocol.versionID;
  190.     } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
  191.       return RefreshAuthorizationPolicyProtocol.versionID;
  192.     } else {
  193.       throw new IOException("Unknown protocol to job tracker: " + protocol);
  194.     }
  195.   }
  196.   
  197.   /**
  198.    * A thread to timeout tasks that have been assigned to task trackers,
  199.    * but that haven't reported back yet.
  200.    * Note that I included a stop() method, even though there is no place
  201.    * where JobTrackers are cleaned up.
  202.    */
  203.   private class ExpireLaunchingTasks implements Runnable {
  204.     /**
  205.      * This is a map of the tasks that have been assigned to task trackers,
  206.      * but that have not yet been seen in a status report.
  207.      * map: task-id -> time-assigned 
  208.      */
  209.     private Map<TaskAttemptID, Long> launchingTasks =
  210.       new LinkedHashMap<TaskAttemptID, Long>();
  211.       
  212.     public void run() {
  213.       while (true) {
  214.         try {
  215.           // Every 3 minutes check for any tasks that are overdue
  216.           Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
  217.           long now = System.currentTimeMillis();
  218.           LOG.debug("Starting launching task sweep");
  219.           synchronized (JobTracker.this) {
  220.             synchronized (launchingTasks) {
  221.               Iterator<Map.Entry<TaskAttemptID, Long>> itr =
  222.                 launchingTasks.entrySet().iterator();
  223.               while (itr.hasNext()) {
  224.                 Map.Entry<TaskAttemptID, Long> pair = itr.next();
  225.                 TaskAttemptID taskId = pair.getKey();
  226.                 long age = now - (pair.getValue()).longValue();
  227.                 LOG.info(taskId + " is " + age + " ms debug.");
  228.                 if (age > TASKTRACKER_EXPIRY_INTERVAL) {
  229.                   LOG.info("Launching task " + taskId + " timed out.");
  230.                   TaskInProgress tip = null;
  231.                   tip = taskidToTIPMap.get(taskId);
  232.                   if (tip != null) {
  233.                     JobInProgress job = tip.getJob();
  234.                     String trackerName = getAssignedTracker(taskId);
  235.                     TaskTrackerStatus trackerStatus = 
  236.                       getTaskTracker(trackerName);
  237.                     // This might happen when the tasktracker has already
  238.                     // expired and this thread tries to call failedtask
  239.                     // again. expire tasktracker should have called failed
  240.                     // task!
  241.                     if (trackerStatus != null)
  242.                       job.failedTask(tip, taskId, "Error launching task", 
  243.                                      tip.isMapTask()? TaskStatus.Phase.MAP:
  244.                                      TaskStatus.Phase.STARTING,
  245.                                      TaskStatus.State.FAILED,
  246.                                      trackerName);
  247.                   }
  248.                   itr.remove();
  249.                 } else {
  250.                   // the tasks are sorted by start time, so once we find
  251.                   // one that we want to keep, we are done for this cycle.
  252.                   break;
  253.                 }
  254.               }
  255.             }
  256.           }
  257.         } catch (InterruptedException ie) {
  258.           // all done
  259.           break;
  260.         } catch (Exception e) {
  261.           LOG.error("Expire Launching Task Thread got exception: " +
  262.                     StringUtils.stringifyException(e));
  263.         }
  264.       }
  265.     }
  266.       
  267.     public void addNewTask(TaskAttemptID taskName) {
  268.       synchronized (launchingTasks) {
  269.         launchingTasks.put(taskName, 
  270.                            System.currentTimeMillis());
  271.       }
  272.     }
  273.       
  274.     public void removeTask(TaskAttemptID taskName) {
  275.       synchronized (launchingTasks) {
  276.         launchingTasks.remove(taskName);
  277.       }
  278.     }
  279.   }
  280.     
  281.   ///////////////////////////////////////////////////////
  282.   // Used to expire TaskTrackers that have gone down
  283.   ///////////////////////////////////////////////////////
  284.   class ExpireTrackers implements Runnable {
  285.     public ExpireTrackers() {
  286.     }
  287.     /**
  288.      * The run method lives for the life of the JobTracker, and removes TaskTrackers
  289.      * that have not checked in for some time.
  290.      */
  291.     public void run() {
  292.       while (true) {
  293.         try {
  294.           //
  295.           // Thread runs periodically to check whether trackers should be expired.
  296.           // The sleep interval must be no more than half the maximum expiry time
  297.           // for a task tracker.
  298.           //
  299.           Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL / 3);
  300.           //
  301.           // Loop through all expired items in the queue
  302.           //
  303.           // Need to lock the JobTracker here since we are
  304.           // manipulating it's data-structures via
  305.           // ExpireTrackers.run -> JobTracker.lostTaskTracker ->
  306.           // JobInProgress.failedTask -> JobTracker.markCompleteTaskAttempt
  307.           // Also need to lock JobTracker before locking 'taskTracker' &
  308.           // 'trackerExpiryQueue' to prevent deadlock:
  309.           // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)} 
  310.           synchronized (JobTracker.this) {
  311.             synchronized (taskTrackers) {
  312.               synchronized (trackerExpiryQueue) {
  313.                 long now = System.currentTimeMillis();
  314.                 TaskTrackerStatus leastRecent = null;
  315.                 while ((trackerExpiryQueue.size() > 0) &&
  316.                        ((leastRecent = trackerExpiryQueue.first()) != null) &&
  317.                        (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
  318.                         
  319.                   // Remove profile from head of queue
  320.                   trackerExpiryQueue.remove(leastRecent);
  321.                   String trackerName = leastRecent.getTrackerName();
  322.                         
  323.                   // Figure out if last-seen time should be updated, or if tracker is dead
  324.                   TaskTrackerStatus newProfile = taskTrackers.get(leastRecent.getTrackerName());
  325.                   // Items might leave the taskTracker set through other means; the
  326.                   // status stored in 'taskTrackers' might be null, which means the
  327.                   // tracker has already been destroyed.
  328.                   if (newProfile != null) {
  329.                     if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
  330.                       // Remove completely after marking the tasks as 'KILLED'
  331.                       lostTaskTracker(leastRecent.getTrackerName());
  332.                       // tracker is lost, and if it is blacklisted, remove 
  333.                       // it from the count of blacklisted trackers in the cluster
  334.                       if (isBlacklisted(trackerName)) {
  335.                         faultyTrackers.numBlacklistedTrackers -= 1;
  336.                       }
  337.                       updateTaskTrackerStatus(trackerName, null);
  338.                     } else {
  339.                       // Update time by inserting latest profile
  340.                       trackerExpiryQueue.add(newProfile);
  341.                     }
  342.                   }
  343.                 }
  344.               }
  345.             }
  346.           }
  347.         } catch (InterruptedException iex) {
  348.           break;
  349.         } catch (Exception t) {
  350.           LOG.error("Tracker Expiry Thread got exception: " +
  351.                     StringUtils.stringifyException(t));
  352.         }
  353.       }
  354.     }
  355.         
  356.   }
  357.   ///////////////////////////////////////////////////////
  358.   // Used to remove old finished Jobs that have been around for too long
  359.   ///////////////////////////////////////////////////////
  360.   class RetireJobs implements Runnable {
  361.     public RetireJobs() {
  362.     }
  363.     /**
  364.      * The run method lives for the life of the JobTracker,
  365.      * and removes Jobs that are not still running, but which
  366.      * finished a long time ago.
  367.      */
  368.     public void run() {
  369.       while (true) {
  370.         try {
  371.           Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
  372.           List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
  373.           long now = System.currentTimeMillis();
  374.           long retireBefore = now - RETIRE_JOB_INTERVAL;
  375.           synchronized (jobs) {
  376.             for(JobInProgress job: jobs.values()) {
  377.               if (job.getStatus().getRunState() != JobStatus.RUNNING &&
  378.                   job.getStatus().getRunState() != JobStatus.PREP &&
  379.                   (job.getFinishTime() + MIN_TIME_BEFORE_RETIRE < now) &&
  380.                   (job.getFinishTime()  < retireBefore)) {
  381.                 retiredJobs.add(job);
  382.               }
  383.             }
  384.           }
  385.           if (!retiredJobs.isEmpty()) {
  386.             synchronized (JobTracker.this) {
  387.               synchronized (jobs) {
  388.                 synchronized (taskScheduler) {
  389.                   for (JobInProgress job: retiredJobs) {
  390.                     removeJobTasks(job);
  391.                     jobs.remove(job.getProfile().getJobID());
  392.                     for (JobInProgressListener l : jobInProgressListeners) {
  393.                       l.jobRemoved(job);
  394.                     }
  395.                     String jobUser = job.getProfile().getUser();
  396.                     synchronized (userToJobsMap) {
  397.                       ArrayList<JobInProgress> userJobs =
  398.                         userToJobsMap.get(jobUser);
  399.                       synchronized (userJobs) {
  400.                         userJobs.remove(job);
  401.                       }
  402.                       if (userJobs.isEmpty()) {
  403.                         userToJobsMap.remove(jobUser);
  404.                       }
  405.                     }
  406.                     LOG.info("Retired job with id: '" + 
  407.                              job.getProfile().getJobID() + "' of user '" +
  408.                              jobUser + "'");
  409.                   }
  410.                 }
  411.               }
  412.             }
  413.           }
  414.         } catch (InterruptedException t) {
  415.           break;
  416.         } catch (Throwable t) {
  417.           LOG.error("Error in retiring job:n" +
  418.                     StringUtils.stringifyException(t));
  419.         }
  420.       }
  421.     }
  422.   }
  423.   
  424.   // The FaultInfo which indicates the number of faults of a tracker
  425.   // and when the last fault occurred
  426.   // and whether the tracker is blacklisted across all jobs or not
  427.   private static class FaultInfo {
  428.     int numFaults = 0;
  429.     long lastUpdated;
  430.     boolean blacklisted; 
  431.     FaultInfo() {
  432.       numFaults = 0;
  433.       lastUpdated = System.currentTimeMillis();
  434.       blacklisted = false;
  435.     }
  436.     void setFaultCount(int num) {
  437.       numFaults = num;
  438.     }
  439.     void setLastUpdated(long timeStamp) {
  440.       lastUpdated = timeStamp;
  441.     }
  442.     int getFaultCount() {
  443.       return numFaults;
  444.     }
  445.     long getLastUpdated() {
  446.       return lastUpdated;
  447.     }
  448.     
  449.     boolean isBlacklisted() {
  450.       return blacklisted;
  451.     }
  452.     
  453.     void setBlacklist(boolean blacklist) {
  454.       blacklisted = blacklist;
  455.     }
  456.   }
  457.   private class FaultyTrackersInfo {
  458.     // A map from hostName to its faults
  459.     private Map<String, FaultInfo> potentiallyFaultyTrackers = 
  460.               new HashMap<String, FaultInfo>();
  461.     // This count gives the number of blacklisted trackers in the cluster 
  462.     // at any time. This is maintained to avoid iteration over 
  463.     // the potentiallyFaultyTrackers to get blacklisted trackers. And also
  464.     // this count doesn't include blacklisted trackers which are lost, 
  465.     // although the fault info is maintained for lost trackers.  
  466.     private volatile int numBlacklistedTrackers = 0;
  467.     /**
  468.      * Increments faults(blacklist by job) for the tracker by one.
  469.      * 
  470.      * Adds the tracker to the potentially faulty list. 
  471.      * 
  472.      * @param hostName 
  473.      */
  474.     void incrementFaults(String hostName) {
  475.       synchronized (potentiallyFaultyTrackers) {
  476.         FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
  477.         if (fi == null) {
  478.           fi = new FaultInfo();
  479.           potentiallyFaultyTrackers.put(hostName, fi);
  480.         }
  481.         int numFaults = fi.getFaultCount();
  482.         ++numFaults;
  483.         fi.setFaultCount(numFaults);
  484.         fi.setLastUpdated(System.currentTimeMillis());
  485.         if (!fi.isBlacklisted()) {
  486.           if (shouldBlacklist(hostName, numFaults)) {
  487.             LOG.info("Adding " + hostName + " to the blacklist" +
  488.                      " across all jobs");
  489.             removeHostCapacity(hostName);
  490.             fi.setBlacklist(true);
  491.           }
  492.         }
  493.       }        
  494.     }
  495.     /**
  496.      * Blacklists the tracker across all jobs if
  497.      * <ol>
  498.      * <li>#faults are more than 
  499.      *     MAX_BLACKLISTS_PER_TRACKER (configurable) blacklists</li>
  500.      * <li>#faults is 50% (configurable) above the average #faults</li>
  501.      * <li>50% the cluster is not blacklisted yet </li>
  502.      */
  503.     private boolean shouldBlacklist(String hostName, int numFaults) {
  504.       if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) {
  505.         // calculate avgBlackLists
  506.         long clusterSize = getClusterStatus().getTaskTrackers();
  507.         long sum = 0;
  508.         for (FaultInfo f : potentiallyFaultyTrackers.values()) {
  509.           sum += f.getFaultCount();
  510.         }
  511.         double avg = (double) sum / clusterSize;
  512.             
  513.         long totalCluster = clusterSize + numBlacklistedTrackers;
  514.         if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
  515.             numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) {
  516.           return true;
  517.         }
  518.       }
  519.       return false;
  520.     }
  521.     
  522.     /**
  523.      * Removes the tracker from blacklist and
  524.      * from potentially faulty list, when it is restarted.
  525.      * 
  526.      * @param hostName
  527.      */
  528.     void markTrackerHealthy(String hostName) {
  529.       synchronized (potentiallyFaultyTrackers) {
  530.         FaultInfo fi = potentiallyFaultyTrackers.remove(hostName);
  531.         if (fi != null && fi.isBlacklisted()) {
  532.           LOG.info("Removing " + hostName + " from blacklist");
  533.           addHostCapacity(hostName);
  534.         }
  535.       }
  536.     }
  537.     /**
  538.      * Check whether tasks can be assigned to the tracker.
  539.      *
  540.      * One fault of the tracker is discarded if there
  541.      * are no faults during one day. So, the tracker will get a 
  542.      * chance again to run tasks of a job.
  543.      * 
  544.      * @param hostName The tracker name
  545.      * @param now The current time
  546.      * 
  547.      * @return true if the tracker is blacklisted 
  548.      *         false otherwise
  549.      */
  550.     boolean shouldAssignTasksToTracker(String hostName, long now) {
  551.       synchronized (potentiallyFaultyTrackers) {
  552.         FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
  553.         if (fi != null &&
  554.             (now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) {
  555.           int numFaults = fi.getFaultCount() - 1;
  556.           if (fi.isBlacklisted()) {
  557.             LOG.info("Removing " + hostName + " from blacklist");
  558.             addHostCapacity(hostName);
  559.             fi.setBlacklist(false);
  560.           }
  561.           if (numFaults > 0) {
  562.             fi.setFaultCount(numFaults);
  563.             fi.setLastUpdated(now);
  564.           } else {
  565.             potentiallyFaultyTrackers.remove(hostName);
  566.           }
  567.         }
  568.         return (fi != null && fi.isBlacklisted());
  569.       }
  570.     }
  571.     private void removeHostCapacity(String hostName) {
  572.       synchronized (taskTrackers) {
  573.         // remove the capacity of trackers on this host
  574.         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
  575.           totalMapTaskCapacity -= status.getMaxMapTasks();
  576.           totalReduceTaskCapacity -= status.getMaxReduceTasks();
  577.         }
  578.         numBlacklistedTrackers +=
  579.           uniqueHostsMap.remove(hostName);
  580.       }
  581.     }
  582.     
  583.     // This is called on tracker's restart or after a day of blacklist.
  584.     private void addHostCapacity(String hostName) {
  585.       synchronized (taskTrackers) {
  586.         int numTrackersOnHost = 0;
  587.         // add the capacity of trackers on the host
  588.         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
  589.           totalMapTaskCapacity += status.getMaxMapTasks();
  590.           totalReduceTaskCapacity += status.getMaxReduceTasks();
  591.           numTrackersOnHost++;
  592.         }
  593.         uniqueHostsMap.put(hostName,
  594.                            numTrackersOnHost);
  595.         numBlacklistedTrackers -= numTrackersOnHost;
  596.       }
  597.     }
  598.     /**
  599.      * Whether a host is blacklisted across all the jobs. 
  600.      * 
  601.      * @param hostName
  602.      * @return
  603.      */
  604.     boolean isBlacklisted(String hostName) {
  605.       synchronized (potentiallyFaultyTrackers) {
  606.         FaultInfo fi = null;
  607.         if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
  608.           return fi.isBlacklisted();
  609.         }
  610.       }
  611.       return false;
  612.     }
  613.     
  614.     int getFaultCount(String hostName) {
  615.       synchronized (potentiallyFaultyTrackers) {
  616.         FaultInfo fi = null;
  617.         if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
  618.           return fi.getFaultCount();
  619.         }
  620.       }
  621.       return 0;
  622.     }
  623.   }
  624.   
  625.   /**
  626.    * Get all task tracker statuses on given host
  627.    * 
  628.    * @param hostName
  629.    * @return {@link java.util.List} of {@link TaskTrackerStatus}
  630.    */
  631.   private List<TaskTrackerStatus> getStatusesOnHost(String hostName) {
  632.     List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
  633.     synchronized (taskTrackers) {
  634.       for (TaskTrackerStatus status : taskTrackers.values()) {
  635.         if (hostName.equals(status.getHost())) {
  636.           statuses.add(status);
  637.         }
  638.       }
  639.     }
  640.     return statuses;
  641.   }
  642.   
  643.   ///////////////////////////////////////////////////////
  644.   // Used to recover the jobs upon restart
  645.   ///////////////////////////////////////////////////////
  646.   class RecoveryManager {
  647.     Set<JobID> jobsToRecover; // set of jobs to be recovered
  648.     
  649.     private int totalEventsRecovered = 0;
  650.     private int restartCount = 0;
  651.     private boolean shouldRecover = false;
  652.     Set<String> recoveredTrackers = 
  653.       Collections.synchronizedSet(new HashSet<String>());
  654.     
  655.     /** A custom listener that replays the events in the order in which the 
  656.      * events (task attempts) occurred. 
  657.      */
  658.     class JobRecoveryListener implements Listener {
  659.       // The owner job
  660.       private JobInProgress jip;
  661.       
  662.       private JobHistory.JobInfo job; // current job's info object
  663.       
  664.       // Maintain the count of the (attempt) events recovered
  665.       private int numEventsRecovered = 0;
  666.       
  667.       // Maintains open transactions
  668.       private Map<String, String> hangingAttempts = 
  669.         new HashMap<String, String>();
  670.       
  671.       // Whether there are any updates for this job
  672.       private boolean hasUpdates = false;
  673.       
  674.       public JobRecoveryListener(JobInProgress jip) {
  675.         this.jip = jip;
  676.         this.job = new JobHistory.JobInfo(jip.getJobID().toString());
  677.       }
  678.       /**
  679.        * Process a task. Note that a task might commit a previously pending 
  680.        * transaction.
  681.        */
  682.       private void processTask(String taskId, JobHistory.Task task) {
  683.         // Any TASK info commits the previous transaction
  684.         boolean hasHanging = hangingAttempts.remove(taskId) != null;
  685.         if (hasHanging) {
  686.           numEventsRecovered += 2;
  687.         }
  688.         
  689.         TaskID id = TaskID.forName(taskId);
  690.         TaskInProgress tip = getTip(id);
  691.         
  692.         updateTip(tip, task);
  693.       }
  694.       /**
  695.        * Adds a task-attempt in the listener
  696.        */
  697.       private void processTaskAttempt(String taskAttemptId, 
  698.                                       JobHistory.TaskAttempt attempt) {
  699.         TaskAttemptID id = TaskAttemptID.forName(taskAttemptId);
  700.         
  701.         // Check if the transaction for this attempt can be committed
  702.         String taskStatus = attempt.get(Keys.TASK_STATUS);
  703.         
  704.         if (taskStatus.length() > 0) {
  705.           // This means this is an update event
  706.           if (taskStatus.equals(Values.SUCCESS.name())) {
  707.             // Mark this attempt as hanging
  708.             hangingAttempts.put(id.getTaskID().toString(), taskAttemptId);
  709.             addSuccessfulAttempt(jip, id, attempt);
  710.           } else {
  711.             addUnsuccessfulAttempt(jip, id, attempt);
  712.             numEventsRecovered += 2;
  713.           }
  714.         } else {
  715.           createTaskAttempt(jip, id, attempt);
  716.         }
  717.       }
  718.       public void handle(JobHistory.RecordTypes recType, Map<Keys, 
  719.                          String> values) throws IOException {
  720.         if (recType == JobHistory.RecordTypes.Job) {
  721.           // Update the meta-level job information
  722.           job.handle(values);
  723.           
  724.           // Forcefully init the job as we have some updates for it
  725.           checkAndInit();
  726.         } else if (recType.equals(JobHistory.RecordTypes.Task)) {
  727.           String taskId = values.get(Keys.TASKID);
  728.           
  729.           // Create a task
  730.           JobHistory.Task task = new JobHistory.Task();
  731.           task.handle(values);
  732.           
  733.           // Ignore if its a cleanup task
  734.           if (isCleanup(task)) {
  735.             return;
  736.           }
  737.             
  738.           // Process the task i.e update the tip state
  739.           processTask(taskId, task);
  740.         } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
  741.           String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
  742.           
  743.           // Create a task attempt
  744.           JobHistory.MapAttempt attempt = new JobHistory.MapAttempt();
  745.           attempt.handle(values);
  746.           
  747.           // Ignore if its a cleanup task
  748.           if (isCleanup(attempt)) {
  749.             return;
  750.           }
  751.           
  752.           // Process the attempt i.e update the attempt state via job
  753.           processTaskAttempt(attemptId, attempt);
  754.         } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
  755.           String attemptId = values.get(Keys.TASK_ATTEMPT_ID);
  756.           
  757.           // Create a task attempt
  758.           JobHistory.ReduceAttempt attempt = new JobHistory.ReduceAttempt();
  759.           attempt.handle(values);
  760.           
  761.           // Ignore if its a cleanup task
  762.           if (isCleanup(attempt)) {
  763.             return;
  764.           }
  765.           
  766.           // Process the attempt i.e update the job state via job
  767.           processTaskAttempt(attemptId, attempt);
  768.         }
  769.       }
  770.       // Check if the task is of type CLEANUP
  771.       private boolean isCleanup(JobHistory.Task task) {
  772.         String taskType = task.get(Keys.TASK_TYPE);
  773.         return Values.CLEANUP.name().equals(taskType);
  774.       }
  775.       
  776.       // Init the job if its ready for init. Also make sure that the scheduler
  777.       // is updated
  778.       private void checkAndInit() throws IOException {
  779.         String jobStatus = this.job.get(Keys.JOB_STATUS);
  780.         if (Values.PREP.name().equals(jobStatus)) {
  781.           hasUpdates = true;
  782.           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
  783.           try {
  784.             jip.initTasks();
  785.           } catch (Throwable t) {
  786.             LOG.error("Job initialization failed : n" 
  787.                       + StringUtils.stringifyException(t));
  788.             jip.fail(); // fail the job
  789.             throw new IOException(t);
  790.           }
  791.         }
  792.       }
  793.       
  794.       void close() {
  795.         if (hasUpdates) {
  796.           // Apply the final (job-level) updates
  797.           JobStatusChangeEvent event = updateJob(jip, job);
  798.           
  799.           synchronized (JobTracker.this) {
  800.             // Update the job listeners
  801.             updateJobInProgressListeners(event);
  802.           }
  803.         }
  804.       }
  805.       
  806.       public int getNumEventsRecovered() {
  807.         return numEventsRecovered;
  808.       }
  809.     }
  810.     
  811.     public RecoveryManager() {
  812.       jobsToRecover = new TreeSet<JobID>();
  813.     }
  814.     public boolean contains(JobID id) {
  815.       return jobsToRecover.contains(id);
  816.     }
  817.     void addJobForRecovery(JobID id) {
  818.       jobsToRecover.add(id);
  819.     }
  820.     public boolean shouldRecover() {
  821.       return shouldRecover;
  822.     }
  823.     public boolean shouldSchedule() {
  824.       return recoveredTrackers.isEmpty();
  825.     }
  826.     private void markTracker(String trackerName) {
  827.       recoveredTrackers.add(trackerName);
  828.     }
  829.     void unMarkTracker(String trackerName) {
  830.       recoveredTrackers.remove(trackerName);
  831.     }
  832.     Set<JobID> getJobsToRecover() {
  833.       return jobsToRecover;
  834.     }
  835.     /** Check if the given string represents a job-id or not 
  836.      */
  837.     private boolean isJobNameValid(String str) {
  838.       if(str == null) {
  839.         return false;
  840.       }
  841.       String[] parts = str.split("_");
  842.       if(parts.length == 3) {
  843.         if(parts[0].equals("job")) {
  844.             // other 2 parts should be parseable
  845.             return JobTracker.validateIdentifier(parts[1])
  846.                    && JobTracker.validateJobNumber(parts[2]);
  847.         }
  848.       }
  849.       return false;
  850.     }
  851.     
  852.     // checks if the job dir has the required files
  853.     public void checkAndAddJob(FileStatus status) throws IOException {
  854.       String fileName = status.getPath().getName();
  855.       if (isJobNameValid(fileName)) {
  856.         if (JobClient.isJobDirValid(status.getPath(), fs)) {
  857.           recoveryManager.addJobForRecovery(JobID.forName(fileName));
  858.           shouldRecover = true; // enable actual recovery if num-files > 1
  859.         } else {
  860.           LOG.info("Found an incomplete job directory " + fileName + "." 
  861.                    + " Deleting it!!");
  862.           fs.delete(status.getPath(), true);
  863.         }
  864.       }
  865.     }
  866.     
  867.     private JobStatusChangeEvent updateJob(JobInProgress jip, 
  868.                                            JobHistory.JobInfo job) {
  869.       // Change the job priority
  870.       String jobpriority = job.get(Keys.JOB_PRIORITY);
  871.       JobPriority priority = JobPriority.valueOf(jobpriority);
  872.       // It's important to update this via the jobtracker's api as it will 
  873.       // take care of updating the event listeners too
  874.       setJobPriority(jip.getJobID(), priority);
  875.       // Save the previous job status
  876.       JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
  877.       
  878.       // Set the start/launch time only if there are recovered tasks
  879.       // Increment the job's restart count
  880.       jip.updateJobInfo(job.getLong(JobHistory.Keys.SUBMIT_TIME), 
  881.                         job.getLong(JobHistory.Keys.LAUNCH_TIME));
  882.       // Save the new job status
  883.       JobStatus newStatus = (JobStatus)jip.getStatus().clone();
  884.       
  885.       return new JobStatusChangeEvent(jip, EventType.START_TIME_CHANGED, oldStatus, 
  886.                                       newStatus);
  887.     }
  888.     
  889.     private void updateTip(TaskInProgress tip, JobHistory.Task task) {
  890.       long startTime = task.getLong(Keys.START_TIME);
  891.       if (startTime != 0) {
  892.         tip.setExecStartTime(startTime);
  893.       }
  894.       
  895.       long finishTime = task.getLong(Keys.FINISH_TIME);
  896.       // For failed tasks finish-time will be missing
  897.       if (finishTime != 0) {
  898.         tip.setExecFinishTime(finishTime);
  899.       }
  900.       
  901.       String cause = task.get(Keys.TASK_ATTEMPT_ID);
  902.       if (cause.length() > 0) {
  903.         // This means that the this is a FAILED events
  904.         TaskAttemptID id = TaskAttemptID.forName(cause);
  905.         TaskStatus status = tip.getTaskStatus(id);
  906.         synchronized (JobTracker.this) {
  907.           // This will add the tip failed event in the new log
  908.           tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(), 
  909.                                   status.getPhase(), status.getRunState(), 
  910.                                   status.getTaskTracker());
  911.         }
  912.       }
  913.     }
  914.     
  915.     private void createTaskAttempt(JobInProgress job, 
  916.                                    TaskAttemptID attemptId, 
  917.                                    JobHistory.TaskAttempt attempt) {
  918.       TaskID id = attemptId.getTaskID();
  919.       String type = attempt.get(Keys.TASK_TYPE);
  920.       TaskInProgress tip = job.getTaskInProgress(id);
  921.       
  922.       //    I. Get the required info
  923.       TaskStatus taskStatus = null;
  924.       String trackerName = attempt.get(Keys.TRACKER_NAME);
  925.       String trackerHostName = 
  926.         JobInProgress.convertTrackerNameToHostName(trackerName);
  927.       // recover the port information.
  928.       int port = 0; // default to 0
  929.       String hport = attempt.get(Keys.HTTP_PORT);
  930.       if (hport != null && hport.length() > 0) {
  931.         port = attempt.getInt(Keys.HTTP_PORT);
  932.       }
  933.       
  934.       long attemptStartTime = attempt.getLong(Keys.START_TIME);
  935.       // II. Create the (appropriate) task status
  936.       if (type.equals(Values.MAP.name())) {
  937.         taskStatus = 
  938.           new MapTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, 
  939.                             "", "", trackerName, TaskStatus.Phase.MAP, 
  940.                             new Counters());
  941.       } else {
  942.         taskStatus = 
  943.           new ReduceTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, 
  944.                                "", "", trackerName, TaskStatus.Phase.REDUCE, 
  945.                                new Counters());
  946.       }
  947.       // Set the start time
  948.       taskStatus.setStartTime(attemptStartTime);
  949.       List<TaskStatus> ttStatusList = new ArrayList<TaskStatus>();
  950.       ttStatusList.add(taskStatus);
  951.       
  952.       // III. Create the dummy tasktracker status
  953.       TaskTrackerStatus ttStatus = 
  954.         new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
  955.                               0 , 0, 0);
  956.       ttStatus.setLastSeen(System.currentTimeMillis());
  957.       synchronized (JobTracker.this) {
  958.         synchronized (taskTrackers) {
  959.           synchronized (trackerExpiryQueue) {
  960.             // IV. Register a new tracker
  961.             boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
  962.             if (!isTrackerRegistered) {
  963.               markTracker(trackerName); // add the tracker to recovery-manager
  964.               addNewTracker(ttStatus);
  965.             }
  966.       
  967.             // V. Update the tracker status
  968.             // This will update the meta info of the jobtracker and also add the
  969.             // tracker status if missing i.e register it
  970.             updateTaskTrackerStatus(trackerName, ttStatus);
  971.           }
  972.         }
  973.         // Register the attempt with job and tip, under JobTracker lock. 
  974.         // Since, as of today they are atomic through heartbeat.
  975.         // VI. Register the attempt
  976.         //   a) In the job
  977.         job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
  978.         //   b) In the tip
  979.         tip.updateStatus(taskStatus);
  980.       }
  981.       
  982.       // VII. Make an entry in the launched tasks
  983.       expireLaunchingTasks.addNewTask(attemptId);
  984.     }
  985.     
  986.     private void addSuccessfulAttempt(JobInProgress job, 
  987.                                       TaskAttemptID attemptId, 
  988.                                       JobHistory.TaskAttempt attempt) {
  989.       // I. Get the required info
  990.       TaskID taskId = attemptId.getTaskID();
  991.       String type = attempt.get(Keys.TASK_TYPE);
  992.       TaskInProgress tip = job.getTaskInProgress(taskId);
  993.       long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
  994.       // Get the task status and the tracker name and make a copy of it
  995.       TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
  996.       taskStatus.setFinishTime(attemptFinishTime);
  997.       String stateString = attempt.get(Keys.STATE_STRING);
  998.       // Update the basic values
  999.       taskStatus.setStateString(stateString);
  1000.       taskStatus.setProgress(1.0f);
  1001.       taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
  1002.       // Set the shuffle/sort finished times
  1003.       if (type.equals(Values.REDUCE.name())) {
  1004.         long shuffleTime = 
  1005.           Long.parseLong(attempt.get(Keys.SHUFFLE_FINISHED));
  1006.         long sortTime = 
  1007.           Long.parseLong(attempt.get(Keys.SORT_FINISHED));
  1008.         taskStatus.setShuffleFinishTime(shuffleTime);
  1009.         taskStatus.setSortFinishTime(sortTime);
  1010.       }
  1011.       // Add the counters
  1012.       String counterString = attempt.get(Keys.COUNTERS);
  1013.       Counters counter = null;
  1014.       //TODO Check if an exception should be thrown
  1015.       try {
  1016.         counter = Counters.fromEscapedCompactString(counterString);
  1017.       } catch (ParseException pe) { 
  1018.         counter = new Counters(); // Set it to empty counter
  1019.       }
  1020.       taskStatus.setCounters(counter);
  1021.       
  1022.       synchronized (JobTracker.this) {
  1023.         // II. Replay the status
  1024.         job.updateTaskStatus(tip, taskStatus);
  1025.       }
  1026.       
  1027.       // III. Prevent the task from expiry
  1028.       expireLaunchingTasks.removeTask(attemptId);
  1029.     }
  1030.     
  1031.     private void addUnsuccessfulAttempt(JobInProgress job,
  1032.                                         TaskAttemptID attemptId,
  1033.                                         JobHistory.TaskAttempt attempt) {
  1034.       // I. Get the required info
  1035.       TaskID taskId = attemptId.getTaskID();
  1036.       TaskInProgress tip = job.getTaskInProgress(taskId);
  1037.       long attemptFinishTime = attempt.getLong(Keys.FINISH_TIME);
  1038.       TaskStatus taskStatus = (TaskStatus)tip.getTaskStatus(attemptId).clone();
  1039.       taskStatus.setFinishTime(attemptFinishTime);
  1040.       // Reset the progress
  1041.       taskStatus.setProgress(0.0f);
  1042.       
  1043.       String stateString = attempt.get(Keys.STATE_STRING);
  1044.       taskStatus.setStateString(stateString);
  1045.       boolean hasFailed = 
  1046.         attempt.get(Keys.TASK_STATUS).equals(Values.FAILED.name());
  1047.       // Set the state failed/killed
  1048.       if (hasFailed) {
  1049.         taskStatus.setRunState(TaskStatus.State.FAILED);
  1050.       } else {
  1051.         taskStatus.setRunState(TaskStatus.State.KILLED);
  1052.       }
  1053.       // Get/Set the error msg
  1054.       String diagInfo = attempt.get(Keys.ERROR);
  1055.       taskStatus.setDiagnosticInfo(diagInfo); // diag info
  1056.       synchronized (JobTracker.this) {
  1057.         // II. Update the task status
  1058.         job.updateTaskStatus(tip, taskStatus);
  1059.       }
  1060.      // III. Prevent the task from expiry
  1061.      expireLaunchingTasks.removeTask(attemptId);
  1062.     }
  1063.   
  1064.     Path getRestartCountFile() {
  1065.       return new Path(getSystemDir(), "jobtracker.info");
  1066.     }
  1067.     Path getTempRestartCountFile() {
  1068.       return new Path(getSystemDir(), "jobtracker.info.recover");
  1069.     }
  1070.     /**
  1071.      * Initialize the recovery process. It simply creates a jobtracker.info file
  1072.      * in the jobtracker's system directory and writes its restart count in it.
  1073.      * For the first start, the jobtracker writes '0' in it. Upon subsequent 
  1074.      * restarts the jobtracker replaces the count with its current count which 
  1075.      * is (old count + 1). The whole purpose of this api is to obtain restart 
  1076.      * counts across restarts to avoid attempt-id clashes.
  1077.      * 
  1078.      * Note that in between if the jobtracker.info files goes missing then the
  1079.      * jobtracker will disable recovery and continue. 
  1080.      *  
  1081.      */
  1082.     void updateRestartCount() throws IOException {
  1083.       Path restartFile = getRestartCountFile();
  1084.       Path tmpRestartFile = getTempRestartCountFile();
  1085.       FileSystem fs = restartFile.getFileSystem(conf);
  1086.       FsPermission filePerm = new FsPermission(SYSTEM_FILE_PERMISSION);
  1087.       // read the count from the jobtracker info file
  1088.       if (fs.exists(restartFile)) {
  1089.         fs.delete(tmpRestartFile, false); // delete the tmp file
  1090.       } else if (fs.exists(tmpRestartFile)) {
  1091.         // if .rec exists then delete the main file and rename the .rec to main
  1092.         fs.rename(tmpRestartFile, restartFile); // rename .rec to main file
  1093.       } else {
  1094.         // For the very first time the jobtracker will create a jobtracker.info
  1095.         // file. If the jobtracker has restarted then disable recovery as files'
  1096.         // needed for recovery are missing.
  1097.         // disable recovery if this is a restart
  1098.         shouldRecover = false;
  1099.         // write the jobtracker.info file
  1100.         FSDataOutputStream out = FileSystem.create(fs, restartFile, filePerm);
  1101.         out.writeInt(0);
  1102.         out.close();
  1103.         return;
  1104.       }
  1105.       FSDataInputStream in = fs.open(restartFile);
  1106.       // read the old count
  1107.       restartCount = in.readInt();
  1108.       ++restartCount; // increment the restart count
  1109.       in.close();
  1110.       // Write back the new restart count and rename the old info file
  1111.       //TODO This is similar to jobhistory recovery, maybe this common code
  1112.       //      can be factored out.
  1113.       
  1114.       // write to the tmp file
  1115.       FSDataOutputStream out = FileSystem.create(fs, tmpRestartFile, filePerm);
  1116.       out.writeInt(restartCount);
  1117.       out.close();
  1118.       // delete the main file
  1119.       fs.delete(restartFile, false);
  1120.       
  1121.       // rename the .rec to main file
  1122.       fs.rename(tmpRestartFile, restartFile);
  1123.     }
  1124.     public void recover() {
  1125.       if (!shouldRecover()) {
  1126.         // clean up jobs structure
  1127.         jobsToRecover.clear();
  1128.         return;
  1129.       }
  1130.       LOG.info("Restart count of the jobtracker : " + restartCount);
  1131.       // I. Init the jobs and cache the recovered job history filenames
  1132.       Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();
  1133.       Iterator<JobID> idIter = jobsToRecover.iterator();
  1134.       while (idIter.hasNext()) {
  1135.         JobID id = idIter.next();
  1136.         LOG.info("Trying to recover details of job " + id);
  1137.         try {
  1138.           // 1. Create the job object
  1139.           JobInProgress job = 
  1140.             new JobInProgress(id, JobTracker.this, conf, restartCount);
  1141.           // 2. Check if the user has appropriate access
  1142.           // Get the user group info for the job's owner
  1143.           UserGroupInformation ugi =
  1144.             UserGroupInformation.readFrom(job.getJobConf());
  1145.           LOG.info("Submitting job " + id + " on behalf of user "
  1146.                    + ugi.getUserName() + " in groups : "
  1147.                    + StringUtils.arrayToString(ugi.getGroupNames()));
  1148.           // check the access
  1149.           try {
  1150.             checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB, ugi);
  1151.           } catch (Throwable t) {
  1152.             LOG.warn("Access denied for user " + ugi.getUserName() 
  1153.                      + " in groups : [" 
  1154.                      + StringUtils.arrayToString(ugi.getGroupNames()) + "]");
  1155.             throw t;
  1156.           }
  1157.           // 3. Get the log file and the file path
  1158.           String logFileName = 
  1159.             JobHistory.JobInfo.getJobHistoryFileName(job.getJobConf(), id);
  1160.           Path jobHistoryFilePath = 
  1161.             JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);
  1162.           // 4. Recover the history file. This involved
  1163.           //     - deleting file.recover if file exists
  1164.           //     - renaming file.recover to file if file doesnt exist
  1165.           // This makes sure that the (master) file exists
  1166.           JobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), 
  1167.                                                    jobHistoryFilePath);
  1168.           
  1169.           // 5. Cache the history file name as it costs one dfs access
  1170.           jobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);
  1171.           // 6. Sumbit the job to the jobtracker
  1172.           addJob(id, job);
  1173.         } catch (Throwable t) {
  1174.           LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);
  1175.           idIter.remove();
  1176.           continue;
  1177.         }
  1178.       }
  1179.       long recoveryStartTime = System.currentTimeMillis();
  1180.       // II. Recover each job
  1181.       idIter = jobsToRecover.iterator();
  1182.       while (idIter.hasNext()) {
  1183.         JobID id = idIter.next();
  1184.         JobInProgress pJob = getJob(id);
  1185.         // 1. Get the required info
  1186.         // Get the recovered history file
  1187.         Path jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());
  1188.         String logFileName = jobHistoryFilePath.getName();
  1189.         FileSystem fs;
  1190.         try {
  1191.           fs = jobHistoryFilePath.getFileSystem(conf);
  1192.         } catch (IOException ioe) {
  1193.           LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",
  1194.                    ioe);
  1195.           continue;
  1196.         }
  1197.         // 2. Parse the history file
  1198.         // Note that this also involves job update
  1199.         JobRecoveryListener listener = new JobRecoveryListener(pJob);
  1200.         try {
  1201.           JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), 
  1202.                                         listener, fs);
  1203.         } catch (Throwable t) {
  1204.           LOG.info("Error reading history file of job " + pJob.getJobID() 
  1205.                    + ". Ignoring the error and continuing.", t);
  1206.         }
  1207.         // 3. Close the listener
  1208.         listener.close();
  1209.         
  1210.         // 4. Update the recovery metric
  1211.         totalEventsRecovered += listener.getNumEventsRecovered();
  1212.         // 5. Cleanup history
  1213.         // Delete the master log file as an indication that the new file
  1214.         // should be used in future
  1215.         try {
  1216.           synchronized (pJob) {
  1217.             JobHistory.JobInfo.checkpointRecovery(logFileName, 
  1218.                                                   pJob.getJobConf());
  1219.           }
  1220.         } catch (Throwable t) {
  1221.           LOG.warn("Failed to delete log file (" + logFileName + ") for job " 
  1222.                    + id + ". Continuing.", t);
  1223.         }
  1224.         if (pJob.isComplete()) {
  1225.           idIter.remove(); // no need to keep this job info as its successful
  1226.         }
  1227.       }
  1228.       recoveryDuration = System.currentTimeMillis() - recoveryStartTime;
  1229.       hasRecovered = true;
  1230.       // III. Finalize the recovery
  1231.       synchronized (trackerExpiryQueue) {
  1232.         // Make sure that the tracker statuses in the expiry-tracker queue
  1233.         // are updated
  1234.         long now = System.currentTimeMillis();
  1235.         int size = trackerExpiryQueue.size();
  1236.         for (int i = 0; i < size ; ++i) {
  1237.           // Get the first status
  1238.           TaskTrackerStatus status = trackerExpiryQueue.first();
  1239.           // Remove it
  1240.           trackerExpiryQueue.remove(status);
  1241.           // Set the new time
  1242.           status.setLastSeen(now);
  1243.           // Add back to get the sorted list
  1244.           trackerExpiryQueue.add(status);
  1245.         }
  1246.       }
  1247.       LOG.info("Restoration complete");
  1248.     }
  1249.     
  1250.     int totalEventsRecovered() {
  1251.       return totalEventsRecovered;
  1252.     }
  1253.   }
  1254.   private final JobTrackerInstrumentation myInstrumentation;
  1255.     
  1256.   /////////////////////////////////////////////////////////////////
  1257.   // The real JobTracker
  1258.   ////////////////////////////////////////////////////////////////
  1259.   int port;
  1260.   String localMachine;
  1261.   private String trackerIdentifier;
  1262.   long startTime;
  1263.   int totalSubmissions = 0;
  1264.   private int totalMapTaskCapacity;
  1265.   private int totalReduceTaskCapacity;
  1266.   private HostsFileReader hostsReader;
  1267.   
  1268.   // JobTracker recovery variables
  1269.   private volatile boolean hasRestarted = false;
  1270.   private volatile boolean hasRecovered = false;
  1271.   private volatile long recoveryDuration;
  1272.   //
  1273.   // Properties to maintain while running Jobs and Tasks:
  1274.   //
  1275.   // 1.  Each Task is always contained in a single Job.  A Job succeeds when all its 
  1276.   //     Tasks are complete.
  1277.   //
  1278.   // 2.  Every running or successful Task is assigned to a Tracker.  Idle Tasks are not.
  1279.   //
  1280.   // 3.  When a Tracker fails, all of its assigned Tasks are marked as failures.
  1281.   //
  1282.   // 4.  A Task might need to be reexecuted if it (or the machine it's hosted on) fails
  1283.   //     before the Job is 100% complete.  Sometimes an upstream Task can fail without
  1284.   //     reexecution if all downstream Tasks that require its output have already obtained
  1285.   //     the necessary files.
  1286.   //
  1287.   // All the known jobs.  (jobid->JobInProgress)
  1288.   Map<JobID, JobInProgress> jobs = new TreeMap<JobID, JobInProgress>();
  1289.   // (user -> list of JobInProgress)
  1290.   TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
  1291.     new TreeMap<String, ArrayList<JobInProgress>>();
  1292.     
  1293.   // (trackerID --> list of jobs to cleanup)
  1294.   Map<String, Set<JobID>> trackerToJobsToCleanup = 
  1295.     new HashMap<String, Set<JobID>>();
  1296.   
  1297.   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
  1298.   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
  1299.     new TreeMap<TaskAttemptID, TaskInProgress>();
  1300.   // (taskid --> trackerID) 
  1301.   TreeMap<TaskAttemptID, String> taskidToTrackerMap = new TreeMap<TaskAttemptID, String>();
  1302.   // (trackerID->TreeSet of taskids running at that tracker)
  1303.   TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap =
  1304.     new TreeMap<String, Set<TaskAttemptID>>();
  1305.   // (trackerID -> TreeSet of completed taskids running at that tracker)
  1306.   TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap =
  1307.     new TreeMap<String, Set<TaskAttemptID>>();
  1308.   // (trackerID --> last sent HeartBeatResponse)
  1309.   Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap = 
  1310.     new TreeMap<String, HeartbeatResponse>();
  1311.   // (hostname --> Node (NetworkTopology))
  1312.   Map<String, Node> hostnameToNodeMap = 
  1313.     Collections.synchronizedMap(new TreeMap<String, Node>());
  1314.   
  1315.   // Number of resolved entries
  1316.   int numResolved;
  1317.     
  1318.   private FaultyTrackersInfo faultyTrackers = new FaultyTrackersInfo();
  1319.   
  1320.   //
  1321.   // Watch and expire TaskTracker objects using these structures.
  1322.   // We can map from Name->TaskTrackerStatus, or we can expire by time.
  1323.   //
  1324.   int totalMaps = 0;
  1325.   int totalReduces = 0;
  1326.   private HashMap<String, TaskTrackerStatus> taskTrackers =
  1327.     new HashMap<String, TaskTrackerStatus>();
  1328.   Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
  1329.   ExpireTrackers expireTrackers = new ExpireTrackers();
  1330.   Thread expireTrackersThread = null;
  1331.   RetireJobs retireJobs = new RetireJobs();
  1332.   Thread retireJobsThread = null;
  1333.   ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
  1334.   Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
  1335.                                                 "expireLaunchingTasks");
  1336.   CompletedJobStatusStore completedJobStatusStore = null;
  1337.   Thread completedJobsStoreThread = null;
  1338.   RecoveryManager recoveryManager;
  1339.   /**
  1340.    * It might seem like a bug to maintain a TreeSet of status objects,
  1341.    * which can be updated at any time.  But that's not what happens!  We
  1342.    * only update status objects in the taskTrackers table.  Status objects
  1343.    * are never updated once they enter the expiry queue.  Instead, we wait
  1344.    * for them to expire and remove them from the expiry queue.  If a status
  1345.    * object has been updated in the taskTracker table, the latest status is 
  1346.    * reinserted.  Otherwise, we assume the tracker has expired.
  1347.    */
  1348.   TreeSet<TaskTrackerStatus> trackerExpiryQueue =
  1349.     new TreeSet<TaskTrackerStatus>(
  1350.                                    new Comparator<TaskTrackerStatus>() {
  1351.                                      public int compare(TaskTrackerStatus p1, TaskTrackerStatus p2) {
  1352.                                        if (p1.getLastSeen() < p2.getLastSeen()) {
  1353.                                          return -1;
  1354.                                        } else if (p1.getLastSeen() > p2.getLastSeen()) {
  1355.                                          return 1;
  1356.                                        } else {
  1357.                                          return (p1.getTrackerName().compareTo(p2.getTrackerName()));
  1358.                                        }
  1359.                                      }
  1360.                                    }
  1361.                                    );
  1362.   // Used to provide an HTML view on Job, Task, and TaskTracker structures
  1363.   final HttpServer infoServer;
  1364.   int infoPort;
  1365.   Server interTrackerServer;
  1366.   // Some jobs are stored in a local system directory.  We can delete
  1367.   // the files when we're done with the job.
  1368.   static final String SUBDIR = "jobTracker";
  1369.   FileSystem fs = null;
  1370.   Path systemDir = null;
  1371.   private JobConf conf;
  1372.   private QueueManager queueManager;
  1373.   /**
  1374.    * Start the JobTracker process, listen on the indicated port
  1375.    */
  1376.   JobTracker(JobConf conf) throws IOException, InterruptedException {
  1377.     //
  1378.     // Grab some static constants
  1379.     //
  1380.     TASKTRACKER_EXPIRY_INTERVAL = 
  1381.       conf.getLong("mapred.tasktracker.expiry.interval", 10 * 60 * 1000);
  1382.     RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
  1383.     RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
  1384.     MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
  1385.     MAX_BLACKLISTS_PER_TRACKER = 
  1386.         conf.getInt("mapred.max.tracker.blacklists", 4);
  1387.     //This configuration is there solely for tuning purposes and 
  1388.     //once this feature has been tested in real clusters and an appropriate
  1389.     //value for the threshold has been found, this config might be taken out.
  1390.     AVERAGE_BLACKLIST_THRESHOLD = 
  1391.       conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f); 
  1392.     // This is a directory of temporary submission files.  We delete it
  1393.     // on startup, and can delete any files that we're done with
  1394.     this.conf = conf;
  1395.     JobConf jobConf = new JobConf(conf);
  1396.     // Read the hosts/exclude files to restrict access to the jobtracker.
  1397.     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
  1398.                                            conf.get("mapred.hosts.exclude", ""));
  1399.     
  1400.     queueManager = new QueueManager(this.conf);
  1401.     
  1402.     // Create the scheduler
  1403.     Class<? extends TaskScheduler> schedulerClass
  1404.       = conf.getClass("mapred.jobtracker.taskScheduler",
  1405.           JobQueueTaskScheduler.class, TaskScheduler.class);
  1406.     taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);
  1407.                                            
  1408.     // Set ports, start RPC servers, setup security policy etc.
  1409.     InetSocketAddress addr = getAddress(conf);
  1410.     this.localMachine = addr.getHostName();
  1411.     this.port = addr.getPort();
  1412.     
  1413.     // Set service-level authorization security policy
  1414.     if (conf.getBoolean(
  1415.           ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
  1416.       PolicyProvider policyProvider = 
  1417.         (PolicyProvider)(ReflectionUtils.newInstance(
  1418.             conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
  1419.                 MapReducePolicyProvider.class, PolicyProvider.class), 
  1420.             conf));
  1421.       SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
  1422.     }
  1423.     
  1424.     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
  1425.     this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
  1426.     if (LOG.isDebugEnabled()) {
  1427.       Properties p = System.getProperties();
  1428.       for (Iterator it = p.keySet().iterator(); it.hasNext();) {
  1429.         String key = (String) it.next();
  1430.         String val = p.getProperty(key);
  1431.         LOG.debug("Property '" + key + "' is " + val);
  1432.       }
  1433.     }
  1434.     String infoAddr = 
  1435.       NetUtils.getServerAddress(conf, "mapred.job.tracker.info.bindAddress",
  1436.                                 "mapred.job.tracker.info.port",
  1437.                                 "mapred.job.tracker.http.address");
  1438.     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
  1439.     String infoBindAddress = infoSocAddr.getHostName();
  1440.     int tmpInfoPort = infoSocAddr.getPort();
  1441.     this.startTime = System.currentTimeMillis();
  1442.     infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
  1443.         tmpInfoPort == 0, conf);
  1444.     infoServer.setAttribute("job.tracker", this);
  1445.     // initialize history parameters.
  1446.     boolean historyInitialized = JobHistory.init(conf, this.localMachine,
  1447.                                                  this.startTime);
  1448.     String historyLogDir = null;
  1449.     FileSystem historyFS = null;
  1450.     if (historyInitialized) {
  1451.       historyLogDir = conf.get("hadoop.job.history.location");
  1452.       infoServer.setAttribute("historyLogDir", historyLogDir);
  1453.       historyFS = new Path(historyLogDir).getFileSystem(conf);
  1454.       infoServer.setAttribute("fileSys", historyFS);
  1455.     }
  1456.     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
  1457.     infoServer.start();
  1458.     
  1459.     trackerIdentifier = getDateFormat().format(new Date());
  1460.     // Initialize instrumentation
  1461.     JobTrackerInstrumentation tmp;
  1462.     Class<? extends JobTrackerInstrumentation> metricsInst =
  1463.       getInstrumentationClass(jobConf);
  1464.     try {
  1465.       java.lang.reflect.Constructor<? extends JobTrackerInstrumentation> c =
  1466.         metricsInst.getConstructor(new Class[] {JobTracker.class, JobConf.class} );
  1467.       tmp = c.newInstance(this, jobConf);
  1468.     } catch(Exception e) {
  1469.       //Reflection can throw lots of exceptions -- handle them all by 
  1470.       //falling back on the default.
  1471.       LOG.error("failed to initialize job tracker metrics", e);
  1472.       tmp = new JobTrackerMetricsInst(this, jobConf);
  1473.     }
  1474.     myInstrumentation = tmp;
  1475.     
  1476.     // The rpc/web-server ports can be ephemeral ports... 
  1477.     // ... ensure we have the correct info
  1478.     this.port = interTrackerServer.getListenerAddress().getPort();
  1479.     this.conf.set("mapred.job.tracker", (this.localMachine + ":" + this.port));
  1480.     LOG.info("JobTracker up at: " + this.port);
  1481.     this.infoPort = this.infoServer.getPort();
  1482.     this.conf.set("mapred.job.tracker.http.address", 
  1483.         infoBindAddress + ":" + this.infoPort); 
  1484.     LOG.info("JobTracker webserver: " + this.infoServer.getPort());
  1485.     
  1486.     // start the recovery manager
  1487.     recoveryManager = new RecoveryManager();
  1488.     
  1489.     while (true) {
  1490.       try {
  1491.         // if we haven't contacted the namenode go ahead and do it
  1492.         if (fs == null) {
  1493.           fs = FileSystem.get(conf);
  1494.         }
  1495.         // clean up the system dir, which will only work if hdfs is out of 
  1496.         // safe mode
  1497.         if(systemDir == null) {
  1498.           systemDir = new Path(getSystemDir());    
  1499.         }
  1500.         // Make sure that the backup data is preserved
  1501.         FileStatus[] systemDirData = fs.listStatus(this.systemDir);
  1502.         // Check if the history is enabled .. as we cant have persistence with 
  1503.         // history disabled
  1504.         if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
  1505.             && !JobHistory.isDisableHistory()
  1506.             && systemDirData != null) {
  1507.           for (FileStatus status : systemDirData) {
  1508.             try {
  1509.               recoveryManager.checkAndAddJob(status);
  1510.             } catch (Throwable t) {
  1511.               LOG.warn("Failed to add the job " + status.getPath().getName(), 
  1512.                        t);
  1513.             }
  1514.           }
  1515.           
  1516.           // Check if there are jobs to be recovered
  1517.           hasRestarted = recoveryManager.shouldRecover();
  1518.           if (hasRestarted) {
  1519.             break; // if there is something to recover else clean the sys dir
  1520.           }
  1521.         }
  1522.         LOG.info("Cleaning up the system directory");
  1523.         fs.delete(systemDir, true);
  1524.         if (FileSystem.mkdirs(fs, systemDir, 
  1525.             new FsPermission(SYSTEM_DIR_PERMISSION))) {
  1526.           break;
  1527.         }
  1528.         LOG.error("Mkdirs failed to create " + systemDir);
  1529.       } catch (IOException ie) {
  1530.         if (ie instanceof RemoteException && 
  1531.             AccessControlException.class.getName().equals(
  1532.                 ((RemoteException)ie).getClassName())) {
  1533.           throw ie;
  1534.         }
  1535.         LOG.info("problem cleaning system directory: " + systemDir, ie);
  1536.       }
  1537.       Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
  1538.     }
  1539.     // Prepare for recovery. This is done irrespective of the status of restart
  1540.     // flag.
  1541.     try {
  1542.       recoveryManager.updateRestartCount();
  1543.     } catch (IOException ioe) {
  1544.       LOG.warn("Failed to initialize recovery manager. The Recovery manager "
  1545.                + "failed to access the system files in the system dir (" 
  1546.                + getSystemDir() + ")."); 
  1547.       LOG.warn("It might be because the JobTracker failed to read/write system"
  1548.                + " files (" + recoveryManager.getRestartCountFile() + " / " 
  1549.                + recoveryManager.getTempRestartCountFile() + ") or the system "
  1550.                + " file " + recoveryManager.getRestartCountFile() 
  1551.                + " is missing!");
  1552.       LOG.warn("Bailing out...");
  1553.       throw ioe;
  1554.     }
  1555.     
  1556.     // Same with 'localDir' except it's always on the local disk.
  1557.     jobConf.deleteLocalFiles(SUBDIR);
  1558.     // Initialize history again if it is not initialized
  1559.     // because history was on dfs and namenode was in safemode.
  1560.     if (!historyInitialized) {
  1561.       JobHistory.init(conf, this.localMachine, this.startTime); 
  1562.       historyLogDir = conf.get("hadoop.job.history.location");
  1563.       infoServer.setAttribute("historyLogDir", historyLogDir);
  1564.       historyFS = new Path(historyLogDir).getFileSystem(conf);
  1565.       infoServer.setAttribute("fileSys", historyFS);
  1566.     }
  1567.     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
  1568.         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
  1569.             DNSToSwitchMapping.class), conf);
  1570.     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
  1571.         NetworkTopology.DEFAULT_HOST_LEVEL);
  1572.     //initializes the job status store
  1573.     completedJobStatusStore = new CompletedJobStatusStore(conf,fs);
  1574.   }
  1575.   private static SimpleDateFormat getDateFormat() {
  1576.     return new SimpleDateFormat("yyyyMMddHHmm");
  1577.   }
  1578.   static boolean validateIdentifier(String id) {
  1579.     try {
  1580.       // the jobtracker id should be 'date' parseable
  1581.       getDateFormat().parse(id);
  1582.       return true;
  1583.     } catch (ParseException pe) {}
  1584.     return false;
  1585.   }
  1586.   static boolean validateJobNumber(String id) {
  1587.     try {
  1588.       // the job number should be integer parseable
  1589.       Integer.parseInt(id);
  1590.       return true;
  1591.     } catch (IllegalArgumentException pe) {}
  1592.     return false;
  1593.   }
  1594.   /**
  1595.    * Whether the JT has restarted
  1596.    */
  1597.   public boolean hasRestarted() {
  1598.     return hasRestarted;
  1599.   }
  1600.   /**
  1601.    * Whether the JT has recovered upon restart
  1602.    */
  1603.   public boolean hasRecovered() {
  1604.     return hasRecovered;
  1605.   }
  1606.   /**
  1607.    * How long the jobtracker took to recover from restart.
  1608.    */
  1609.   public long getRecoveryDuration() {
  1610.     return hasRestarted() 
  1611.            ? recoveryDuration
  1612.            : 0;
  1613.   }
  1614.   public static Class<? extends JobTrackerInstrumentation> getInstrumentationClass(Configuration conf) {
  1615.     return conf.getClass("mapred.jobtracker.instrumentation",
  1616.         JobTrackerMetricsInst.class, JobTrackerInstrumentation.class);
  1617.   }
  1618.   
  1619.   public static void setInstrumentationClass(Configuration conf, Class<? extends JobTrackerInstrumentation> t) {
  1620.     conf.setClass("mapred.jobtracker.instrumentation",
  1621.         t, JobTrackerInstrumentation.class);
  1622.   }
  1623.   JobTrackerInstrumentation getInstrumentation() {
  1624.     return myInstrumentation;
  1625.   }
  1626.   public static InetSocketAddress getAddress(Configuration conf) {
  1627.     String jobTrackerStr =
  1628.       conf.get("mapred.job.tracker", "localhost:8012");
  1629.     return NetUtils.createSocketAddr(jobTrackerStr);
  1630.   }
  1631.   /**
  1632.    * Run forever
  1633.    */
  1634.   public void offerService() throws InterruptedException, IOException {
  1635.     taskScheduler.start();
  1636.     
  1637.     //  Start the recovery after starting the scheduler
  1638.     try {
  1639.       recoveryManager.recover();
  1640.     } catch (Throwable t) {
  1641.       LOG.warn("Recovery manager crashed! Ignoring.", t);
  1642.     }
  1643.     
  1644.     this.expireTrackersThread = new Thread(this.expireTrackers,
  1645.                                           "expireTrackers");
  1646.     this.expireTrackersThread.start();
  1647.     this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
  1648.     this.retireJobsThread.start();
  1649.     expireLaunchingTaskThread.start();
  1650.     if (completedJobStatusStore.isActive()) {
  1651.       completedJobsStoreThread = new Thread(completedJobStatusStore,
  1652.                                             "completedjobsStore-housekeeper");
  1653.       completedJobsStoreThread.start();
  1654.     }
  1655.     // start the inter-tracker server once the jt is ready
  1656.     this.interTrackerServer.start();
  1657.     
  1658.     synchronized (this) {
  1659.       state = State.RUNNING;
  1660.     }
  1661.     LOG.info("Starting RUNNING");
  1662.     
  1663.     this.interTrackerServer.join();
  1664.     LOG.info("Stopped interTrackerServer");
  1665.   }
  1666.   void close() throws IOException {
  1667.     if (this.infoServer != null) {
  1668.       LOG.info("Stopping infoServer");
  1669.       try {
  1670.         this.infoServer.stop();
  1671.       } catch (Exception ex) {
  1672.         LOG.warn("Exception shutting down JobTracker", ex);
  1673.       }
  1674.     }
  1675.     if (this.interTrackerServer != null) {
  1676.       LOG.info("Stopping interTrackerServer");
  1677.       this.interTrackerServer.stop();
  1678.     }
  1679.     if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
  1680.       LOG.info("Stopping expireTrackers");
  1681.       this.expireTrackersThread.interrupt();
  1682.       try {
  1683.         this.expireTrackersThread.join();
  1684.       } catch (InterruptedException ex) {
  1685.         ex.printStackTrace();
  1686.       }
  1687.     }
  1688.     if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
  1689.       LOG.info("Stopping retirer");
  1690.       this.retireJobsThread.interrupt();
  1691.       try {
  1692.         this.retireJobsThread.join();
  1693.       } catch (InterruptedException ex) {
  1694.         ex.printStackTrace();
  1695.       }
  1696.     }
  1697.     if (taskScheduler != null) {
  1698.       taskScheduler.terminate();
  1699.     }
  1700.     if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
  1701.       LOG.info("Stopping expireLaunchingTasks");
  1702.       this.expireLaunchingTaskThread.interrupt();
  1703.       try {
  1704.         this.expireLaunchingTaskThread.join();
  1705.       } catch (InterruptedException ex) {
  1706.         ex.printStackTrace();
  1707.       }
  1708.     }
  1709.     if (this.completedJobsStoreThread != null &&
  1710.         this.completedJobsStoreThread.isAlive()) {
  1711.       LOG.info("Stopping completedJobsStore thread");
  1712.       this.completedJobsStoreThread.interrupt();
  1713.       try {
  1714.         this.completedJobsStoreThread.join();
  1715.       } catch (InterruptedException ex) {
  1716.         ex.printStackTrace();
  1717.       }
  1718.     }
  1719.     LOG.info("stopped all jobtracker services");
  1720.     return;
  1721.   }
  1722.     
  1723.   ///////////////////////////////////////////////////////
  1724.   // Maintain lookup tables; called by JobInProgress
  1725.   // and TaskInProgress
  1726.   ///////////////////////////////////////////////////////
  1727.   void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) {
  1728.     LOG.info("Adding task " + 
  1729.       (tip.isCleanupAttempt(taskid) ? "(cleanup)" : "") + 
  1730.       "'"  + taskid + "' to tip " + 
  1731.       tip.getTIPId() + ", for tracker '" + taskTracker + "'");
  1732.     // taskid --> tracker
  1733.     taskidToTrackerMap.put(taskid, taskTracker);
  1734.     // tracker --> taskid
  1735.     Set<TaskAttemptID> taskset = trackerToTaskMap.get(taskTracker);
  1736.     if (taskset == null) {
  1737.       taskset = new TreeSet<TaskAttemptID>();
  1738.       trackerToTaskMap.put(taskTracker, taskset);
  1739.     }
  1740.     taskset.add(taskid);
  1741.     // taskid --> TIP
  1742.     taskidToTIPMap.put(taskid, tip);
  1743.     
  1744.   }
  1745.     
  1746.   void removeTaskEntry(TaskAttemptID taskid) {
  1747.     // taskid --> tracker
  1748.     String tracker = taskidToTrackerMap.remove(taskid);
  1749.     // tracker --> taskid
  1750.     if (tracker != null) {
  1751.       Set<TaskAttemptID> trackerSet = trackerToTaskMap.get(tracker);
  1752.       if (trackerSet != null) {
  1753.         trackerSet.remove(taskid);
  1754.       }
  1755.     }
  1756.     // taskid --> TIP
  1757.     taskidToTIPMap.remove(taskid);
  1758.         
  1759.     LOG.debug("Removing task '" + taskid + "'");
  1760.   }
  1761.     
  1762.   /**
  1763.    * Mark a 'task' for removal later.
  1764.    * This function assumes that the JobTracker is locked on entry.
  1765.    * 
  1766.    * @param taskTracker the tasktracker at which the 'task' was running
  1767.    * @param taskid completed (success/failure/killed) task
  1768.    */
  1769.   void markCompletedTaskAttempt(String taskTracker, TaskAttemptID taskid) {
  1770.     // tracker --> taskid
  1771.     Set<TaskAttemptID> taskset = trackerToMarkedTasksMap.get(taskTracker);
  1772.     if (taskset == null) {
  1773.       taskset = new TreeSet<TaskAttemptID>();
  1774.       trackerToMarkedTasksMap.put(taskTracker, taskset);
  1775.     }
  1776.     taskset.add(taskid);
  1777.       
  1778.     LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'");
  1779.   }
  1780.   /**
  1781.    * Mark all 'non-running' jobs of the job for pruning.
  1782.    * This function assumes that the JobTracker is locked on entry.
  1783.    * 
  1784.    * @param job the completed job
  1785.    */
  1786.   void markCompletedJob(JobInProgress job) {
  1787.     for (TaskInProgress tip : job.getSetupTasks()) {
  1788.       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
  1789.         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
  1790.             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
  1791.             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
  1792.           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
  1793.                                    taskStatus.getTaskID());
  1794.         }
  1795.       }
  1796.     }
  1797.     for (TaskInProgress tip : job.getMapTasks()) {
  1798.       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
  1799.         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
  1800.             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
  1801.             taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
  1802.             taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
  1803.             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
  1804.           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
  1805.                                    taskStatus.getTaskID());
  1806.         }
  1807.       }
  1808.     }
  1809.     for (TaskInProgress tip : job.getReduceTasks()) {
  1810.       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
  1811.         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
  1812.             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
  1813.             taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
  1814.             taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
  1815.             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
  1816.           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
  1817.                                    taskStatus.getTaskID());
  1818.         }
  1819.       }
  1820.     }
  1821.   }
  1822.     
  1823.   /**
  1824.    * Remove all 'marked' tasks running on a given {@link TaskTracker}
  1825.    * from the {@link JobTracker}'s data-structures.
  1826.    * This function assumes that the JobTracker is locked on entry.
  1827.    * 
  1828.    * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
  1829.    */
  1830.   private void removeMarkedTasks(String taskTracker) {
  1831.     // Purge all the 'marked' tasks which were running at taskTracker
  1832.     Set<TaskAttemptID> markedTaskSet = 
  1833.       trackerToMarkedTasksMap.get(taskTracker);
  1834.     if (markedTaskSet != null) {
  1835.       for (TaskAttemptID taskid : markedTaskSet) {
  1836.         removeTaskEntry(taskid);
  1837.         LOG.info("Removed completed task '" + taskid + "' from '" + 
  1838.                  taskTracker + "'");
  1839.       }
  1840.       // Clear
  1841.       trackerToMarkedTasksMap.remove(taskTracker);
  1842.     }
  1843.   }
  1844.     
  1845.   /**
  1846.    * Call {@link #removeTaskEntry(String)} for each of the
  1847.    * job's tasks.
  1848.    * When the JobTracker is retiring the long-completed
  1849.    * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
  1850.    * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs 
  1851.    * has been reached, we can afford to nuke all it's tasks; a little
  1852.    * unsafe, but practically feasible. 
  1853.    * 
  1854.    * @param job the job about to be 'retired'
  1855.    */
  1856.   synchronized private void removeJobTasks(JobInProgress job) { 
  1857.     for (TaskInProgress tip : job.getMapTasks()) {
  1858.       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
  1859.         removeTaskEntry(taskStatus.getTaskID());
  1860.       }
  1861.     }
  1862.     for (TaskInProgress tip : job.getReduceTasks()) {
  1863.       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
  1864.         removeTaskEntry(taskStatus.getTaskID());
  1865.       }
  1866.     }
  1867.   }
  1868.     
  1869.   /**
  1870.    * Safe clean-up all data structures at the end of the 
  1871.    * job (success/failure/killed).
  1872.    * Here we also ensure that for a given user we maintain 
  1873.    * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs 
  1874.    * on the JobTracker.
  1875.    *  
  1876.    * @param job completed job.
  1877.    */
  1878.   synchronized void finalizeJob(JobInProgress job) {
  1879.     // Mark the 'non-running' tasks for pruning
  1880.     markCompletedJob(job);
  1881.     
  1882.     JobEndNotifier.registerNotification(job.getJobConf(), job.getStatus());
  1883.     // start the merge of log files
  1884.     JobID id = job.getStatus().getJobID();
  1885.     try {
  1886.       JobHistory.JobInfo.finalizeRecovery(id, job.getJobConf());
  1887.     } catch (IOException ioe) {
  1888.       LOG.info("Failed to finalize the log file recovery for job " + id, ioe);
  1889.     }
  1890.     final JobTrackerInstrumentation metrics = getInstrumentation();
  1891.     metrics.finalizeJob(conf, id);
  1892.     
  1893.     long now = System.currentTimeMillis();
  1894.     
  1895.     // mark the job for cleanup at all the trackers
  1896.     addJobForCleanup(id);
  1897.     // add the blacklisted trackers to potentially faulty list
  1898.     if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
  1899.       if (job.getNoOfBlackListedTrackers() > 0) {
  1900.         for (String hostName : job.getBlackListedTrackers()) {
  1901.           faultyTrackers.incrementFaults(hostName);
  1902.         }
  1903.       }
  1904.     }
  1905.     
  1906.     // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
  1907.     // in memory; information about the purged jobs is available via
  1908.     // JobHistory.
  1909.     synchronized (jobs) {
  1910.       synchronized (taskScheduler) {
  1911.         synchronized (userToJobsMap) {
  1912.           String jobUser = job.getProfile().getUser();
  1913.           if (!userToJobsMap.containsKey(jobUser)) {
  1914.             userToJobsMap.put(jobUser, 
  1915.                               new ArrayList<JobInProgress>());
  1916.           }
  1917.           ArrayList<JobInProgress> userJobs = 
  1918.             userToJobsMap.get(jobUser);
  1919.           synchronized (userJobs) {
  1920.             // Add the currently completed 'job'
  1921.             userJobs.add(job);
  1922.             // Check if we need to retire some jobs of this user
  1923.             while (userJobs.size() > 
  1924.                    MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
  1925.               JobInProgress rjob = userJobs.get(0);
  1926.                 
  1927.               // Do not delete 'current'
  1928.               // finished job just yet.
  1929.               if (rjob == job) {
  1930.                 break;
  1931.               }
  1932.               // do not retire jobs that finished in the very recent past.
  1933.               if (rjob.getFinishTime() + MIN_TIME_BEFORE_RETIRE > now) {
  1934.                 break;
  1935.               }
  1936.                 
  1937.               // Cleanup all datastructures
  1938.               int rjobRunState = 
  1939.                 rjob.getStatus().getRunState();
  1940.               if (rjobRunState == JobStatus.SUCCEEDED || 
  1941.                   rjobRunState == JobStatus.FAILED ||
  1942.                   rjobRunState == JobStatus.KILLED) {
  1943.                 // Ok, this call to removeTaskEntries
  1944.                 // is dangerous is some very very obscure
  1945.                 // cases; e.g. when rjob completed, hit
  1946.                 // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
  1947.                 // limit and yet some task (taskid)
  1948.                 // wasn't complete!
  1949.                 removeJobTasks(rjob);
  1950.                   
  1951.                 userJobs.remove(0);
  1952.                 jobs.remove(rjob.getProfile().getJobID());
  1953.                 for (JobInProgressListener listener : jobInProgressListeners) {
  1954.                   listener.jobRemoved(rjob);
  1955.                 }
  1956.                   
  1957.                 LOG.info("Retired job with id: '" + 
  1958.                          rjob.getProfile().getJobID() + "' of user: '" +
  1959.                          jobUser + "'");
  1960.               } else {
  1961.                 // Do not remove jobs that aren't complete.
  1962.                 // Stop here, and let the next pass take
  1963.                 // care of purging jobs.
  1964.                 break;
  1965.               }
  1966.             }
  1967.           }
  1968.           if (userJobs.isEmpty()) {
  1969.             userToJobsMap.remove(jobUser);
  1970.           }
  1971.         }
  1972.       }
  1973.     }
  1974.   }
  1975.   ///////////////////////////////////////////////////////
  1976.   // Accessors for objects that want info on jobs, tasks,
  1977.   // trackers, etc.
  1978.   ///////////////////////////////////////////////////////
  1979.   public int getTotalSubmissions() {
  1980.     return totalSubmissions;
  1981.   }
  1982.   public String getJobTrackerMachine() {
  1983.     return localMachine;
  1984.   }
  1985.   
  1986.   /**
  1987.    * Get the unique identifier (ie. timestamp) of this job tracker start.
  1988.    * @return a string with a unique identifier
  1989.    */
  1990.   public String getTrackerIdentifier() {
  1991.     return trackerIdentifier;
  1992.   }
  1993.   public int getTrackerPort() {
  1994.     return port;
  1995.   }
  1996.   public int getInfoPort() {
  1997.     return infoPort;
  1998.   }
  1999.   public long getStartTime() {
  2000.     return startTime;
  2001.   }
  2002.   public Vector<JobInProgress> runningJobs() {
  2003.     Vector<JobInProgress> v = new Vector<JobInProgress>();
  2004.     for (Iterator it = jobs.values().iterator(); it.hasNext();) {
  2005.       JobInProgress jip = (JobInProgress) it.next();
  2006.       JobStatus status = jip.getStatus();
  2007.       if (status.getRunState() == JobStatus.RUNNING) {
  2008.         v.add(jip);
  2009.       }
  2010.     }
  2011.     return v;
  2012.   }
  2013.   /**
  2014.    * Version that is called from a timer thread, and therefore needs to be
  2015.    * careful to synchronize.
  2016.    */
  2017.   public synchronized List<JobInProgress> getRunningJobs() {
  2018.     synchronized (jobs) {
  2019.       return runningJobs();
  2020.     }
  2021.   }
  2022.   public Vector<JobInProgress> failedJobs() {
  2023.     Vector<JobInProgress> v = new Vector<JobInProgress>();
  2024.     for (Iterator it = jobs.values().iterator(); it.hasNext();) {
  2025.       JobInProgress jip = (JobInProgress) it.next();
  2026.       JobStatus status = jip.getStatus();
  2027.       if ((status.getRunState() == JobStatus.FAILED)
  2028.           || (status.getRunState() == JobStatus.KILLED)) {
  2029.         v.add(jip);
  2030.       }
  2031.     }
  2032.     return v;
  2033.   }
  2034.   public Vector<JobInProgress> completedJobs() {
  2035.     Vector<JobInProgress> v = new Vector<JobInProgress>();
  2036.     for (Iterator it = jobs.values().iterator(); it.hasNext();) {
  2037.       JobInProgress jip = (JobInProgress) it.next();
  2038.       JobStatus status = jip.getStatus();
  2039.       if (status.getRunState() == JobStatus.SUCCEEDED) {
  2040.         v.add(jip);
  2041.       }
  2042.     }
  2043.     return v;
  2044.   }
  2045.   /**
  2046.    * Get all the task trackers in the cluster
  2047.    * 
  2048.    * @return {@link Collection} of {@link TaskTrackerStatus} 
  2049.    */
  2050.   public Collection<TaskTrackerStatus> taskTrackers() {
  2051.     synchronized (taskTrackers) {
  2052.       return taskTrackers.values();
  2053.     }
  2054.   }
  2055.   
  2056.   /**
  2057.    * Get the active task tracker statuses in the cluster
  2058.    *  
  2059.    * @return {@link Collection} of active {@link TaskTrackerStatus} 
  2060.    */
  2061.   public Collection<TaskTrackerStatus> activeTaskTrackers() {
  2062.     Collection<TaskTrackerStatus> activeTrackers = 
  2063.       new ArrayList<TaskTrackerStatus>();
  2064.     synchronized (taskTrackers) {
  2065.       for (TaskTrackerStatus status : taskTrackers.values()) {
  2066.         if (!faultyTrackers.isBlacklisted(status.getHost())) {
  2067.           activeTrackers.add(status);
  2068.         }
  2069.       }
  2070.     }
  2071.     return activeTrackers;
  2072.   }
  2073.   
  2074.   /**
  2075.    * Get the active and blacklisted task tracker names in the cluster. The first
  2076.    * element in the returned list contains the list of active tracker names.
  2077.    * The second element in the returned list contains the list of blacklisted
  2078.    * tracker names. 
  2079.    */
  2080.   public List<List<String>> taskTrackerNames() {
  2081.     List<String> activeTrackers = 
  2082.       new ArrayList<String>();
  2083.     List<String> blacklistedTrackers = 
  2084.       new ArrayList<String>();
  2085.     synchronized (taskTrackers) {
  2086.       for (TaskTrackerStatus status : taskTrackers.values()) {
  2087.         if (!faultyTrackers.isBlacklisted(status.getHost())) {
  2088.           activeTrackers.add(status.getTrackerName());
  2089.         } else {
  2090.           blacklistedTrackers.add(status.getTrackerName());
  2091.         }
  2092.       }
  2093.     }
  2094.     List<List<String>> result = new ArrayList<List<String>>(2);
  2095.     result.add(activeTrackers);
  2096.     result.add(blacklistedTrackers);
  2097.     return result;
  2098.   }
  2099.   
  2100.   /**
  2101.    * Get the blacklisted task tracker statuses in the cluster
  2102.    *  
  2103.    * @return {@link Collection} of blacklisted {@link TaskTrackerStatus} 
  2104.    */
  2105.   public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
  2106.     Collection<TaskTrackerStatus> blacklistedTrackers = 
  2107.       new ArrayList<TaskTrackerStatus>();
  2108.     synchronized (taskTrackers) {
  2109.       for (TaskTrackerStatus status : taskTrackers.values()) {
  2110.         if (faultyTrackers.isBlacklisted(status.getHost())) {
  2111.           blacklistedTrackers.add(status);
  2112.         }
  2113.       }
  2114.     }    
  2115.     return blacklistedTrackers;
  2116.   }
  2117.   int getFaultCount(String hostName) {
  2118.     return faultyTrackers.getFaultCount(hostName);
  2119.   }
  2120.   
  2121.   /**
  2122.    * Get the number of blacklisted trackers across all the jobs
  2123.    * 
  2124.    * @return
  2125.    */
  2126.   int getBlacklistedTrackerCount() {
  2127.     return faultyTrackers.numBlacklistedTrackers;
  2128.   }
  2129.   /**
  2130.    * Whether the tracker is blacklisted or not
  2131.    * 
  2132.    * @param trackerID
  2133.    * 
  2134.    * @return true if blacklisted, false otherwise
  2135.    */
  2136.   public boolean isBlacklisted(String trackerID) {
  2137.     TaskTrackerStatus status = getTaskTracker(trackerID);
  2138.     if (status != null) {
  2139.       return faultyTrackers.isBlacklisted(status.getHost());
  2140.     }
  2141.     return false;
  2142.   }
  2143.   
  2144.   public TaskTrackerStatus getTaskTracker(String trackerID) {
  2145.     synchronized (taskTrackers) {
  2146.       return taskTrackers.get(trackerID);
  2147.     }
  2148.   }
  2149.   /**
  2150.    * Adds a new node to the jobtracker. It involves adding it to the expiry
  2151.    * thread and adding it for resolution
  2152.    * 
  2153.    * Assuming trackerExpiryQueue is locked on entry
  2154.    * 
  2155.    * @param status Task Tracker's status
  2156.    */
  2157.   private void addNewTracker(TaskTrackerStatus status) {
  2158.     trackerExpiryQueue.add(status);
  2159.     //  Register the tracker if its not registered
  2160.     if (getNode(status.getTrackerName()) == null) {
  2161.       // Making the network location resolution inline .. 
  2162.       resolveAndAddToTopology(status.getHost());
  2163.     }
  2164.   }
  2165.   public Node resolveAndAddToTopology(String name) {
  2166.     List <String> tmpList = new ArrayList<String>(1);
  2167.     tmpList.add(name);
  2168.     List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
  2169.     String rName = rNameList.get(0);
  2170.     String networkLoc = NodeBase.normalize(rName);
  2171.     return addHostToNodeMapping(name, networkLoc);
  2172.   }
  2173.   
  2174.   private Node addHostToNodeMapping(String host, String networkLoc) {
  2175.     Node node;
  2176.     if ((node = clusterMap.getNode(networkLoc+"/"+host)) == null) {
  2177.       node = new NodeBase(host, networkLoc);
  2178.       clusterMap.add(node);
  2179.       if (node.getLevel() < getNumTaskCacheLevels()) {
  2180.         LOG.fatal("Got a host whose level is: " + node.getLevel() + "." 
  2181.                   + " Should get at least a level of value: " 
  2182.                   + getNumTaskCacheLevels());
  2183.         try {
  2184.           stopTracker();
  2185.         } catch (IOException ie) {
  2186.           LOG.warn("Exception encountered during shutdown: " 
  2187.                    + StringUtils.stringifyException(ie));
  2188.           System.exit(-1);
  2189.         }
  2190.       }
  2191.       hostnameToNodeMap.put(host, node);
  2192.       // Make an entry for the node at the max level in the cache
  2193.       nodesAtMaxLevel.add(getParentNode(node, getNumTaskCacheLevels() - 1));
  2194.     }
  2195.     return node;
  2196.   }
  2197.   /**
  2198.    * Returns a collection of nodes at the max level
  2199.    */
  2200.   public Collection<Node> getNodesAtMaxLevel() {
  2201.     return nodesAtMaxLevel;
  2202.   }
  2203.   public static Node getParentNode(Node node, int level) {
  2204.     for (int i = 0; i < level; ++i) {
  2205.       node = node.getParent();
  2206.     }
  2207.     return node;
  2208.   }
  2209.   /**
  2210.    * Return the Node in the network topology that corresponds to the hostname
  2211.    */
  2212.   public Node getNode(String name) {
  2213.     return hostnameToNodeMap.get(name);
  2214.   }
  2215.   public int getNumTaskCacheLevels() {
  2216.     return numTaskCacheLevels;
  2217.   }
  2218.   public int getNumResolvedTaskTrackers() {
  2219.     return numResolved;
  2220.   }
  2221.   
  2222.   public int getNumberOfUniqueHosts() {
  2223.     return uniqueHostsMap.size();
  2224.   }
  2225.   
  2226.   public void addJobInProgressListener(JobInProgressListener listener) {
  2227.     jobInProgressListeners.add(listener);
  2228.   }
  2229.   public void removeJobInProgressListener(JobInProgressListener listener) {
  2230.     jobInProgressListeners.remove(listener);
  2231.   }
  2232.   
  2233.   // Update the listeners about the job
  2234.   // Assuming JobTracker is locked on entry.
  2235.   private void updateJobInProgressListeners(JobChangeEvent event) {
  2236.     for (JobInProgressListener listener : jobInProgressListeners) {
  2237.       listener.jobUpdated(event);
  2238.     }
  2239.   }
  2240.   
  2241.   /**
  2242.    * Return the {@link QueueManager} associated with the JobTracker.
  2243.    */
  2244.   public QueueManager getQueueManager() {
  2245.     return queueManager;
  2246.   }
  2247.   
  2248.   ////////////////////////////////////////////////////
  2249.   // InterTrackerProtocol
  2250.   ////////////////////////////////////////////////////
  2251.   
  2252.   public String getBuildVersion() throws IOException{
  2253.     return VersionInfo.getBuildVersion();
  2254.   }
  2255.   /**
  2256.    * The periodic heartbeat mechanism between the {@link TaskTracker} and
  2257.    * the {@link JobTracker}.
  2258.    * 
  2259.    * The {@link JobTracker} processes the status information sent by the 
  2260.    * {@link TaskTracker} and responds with instructions to start/stop 
  2261.    * tasks or jobs, and also 'reset' instructions during contingencies. 
  2262.    */
  2263.   public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
  2264.                                                   boolean restarted,
  2265.                                                   boolean initialContact,
  2266.                                                   boolean acceptNewTasks, 
  2267.                                                   short responseId) 
  2268.     throws IOException {
  2269.     LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
  2270.               " (restarted: " + restarted + 
  2271.               " initialContact: " + initialContact + 
  2272.               " acceptNewTasks: " + acceptNewTasks + ")" +
  2273.               " with responseId: " + responseId);
  2274.     // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
  2275.     if (!acceptTaskTracker(status)) {
  2276.       throw new DisallowedTaskTrackerException(status);
  2277.     }
  2278.     // First check if the last heartbeat response got through
  2279.     String trackerName = status.getTrackerName();
  2280.     long now = System.currentTimeMillis();
  2281.     boolean isBlacklisted = false;
  2282.     if (restarted) {
  2283.       faultyTrackers.markTrackerHealthy(status.getHost());
  2284.     } else {
  2285.       isBlacklisted = 
  2286.         faultyTrackers.shouldAssignTasksToTracker(status.getHost(), now);
  2287.     }
  2288.     
  2289.     HeartbeatResponse prevHeartbeatResponse =
  2290.       trackerToHeartbeatResponseMap.get(trackerName);
  2291.     boolean addRestartInfo = false;
  2292.     if (initialContact != true) {
  2293.       // If this isn't the 'initial contact' from the tasktracker,
  2294.       // there is something seriously wrong if the JobTracker has
  2295.       // no record of the 'previous heartbeat'; if so, ask the 
  2296.       // tasktracker to re-initialize itself.
  2297.       if (prevHeartbeatResponse == null) {
  2298.         // This is the first heartbeat from the old tracker to the newly 
  2299.         // started JobTracker
  2300.         if (hasRestarted()) {
  2301.           addRestartInfo = true;
  2302.           // inform the recovery manager about this tracker joining back
  2303.           recoveryManager.unMarkTracker(trackerName);
  2304.         } else {
  2305.           // Jobtracker might have restarted but no recovery is needed
  2306.           // otherwise this code should not be reached
  2307.           LOG.warn("Serious problem, cannot find record of 'previous' " +
  2308.                    "heartbeat for '" + trackerName + 
  2309.                    "'; reinitializing the tasktracker");
  2310.           return new HeartbeatResponse(responseId, 
  2311.               new TaskTrackerAction[] {new ReinitTrackerAction()});
  2312.         }
  2313.       } else {
  2314.                 
  2315.         // It is completely safe to not process a 'duplicate' heartbeat from a 
  2316.         // {@link TaskTracker} since it resends the heartbeat when rpcs are 
  2317.         // lost see {@link TaskTracker.transmitHeartbeat()};
  2318.         // acknowledge it by re-sending the previous response to let the 
  2319.         // {@link TaskTracker} go forward. 
  2320.         if (prevHeartbeatResponse.getResponseId() != responseId) {
  2321.           LOG.info("Ignoring 'duplicate' heartbeat from '" + 
  2322.               trackerName + "'; resending the previous 'lost' response");
  2323.           return prevHeartbeatResponse;
  2324.         }
  2325.       }
  2326.     }
  2327.       
  2328.     // Process this heartbeat 
  2329.     short newResponseId = (short)(responseId + 1);
  2330.     status.setLastSeen(now);
  2331.     if (!processHeartbeat(status, initialContact)) {
  2332.       if (prevHeartbeatResponse != null) {
  2333.         trackerToHeartbeatResponseMap.remove(trackerName);
  2334.       }
  2335.       return new HeartbeatResponse(newResponseId, 
  2336.                    new TaskTrackerAction[] {new ReinitTrackerAction()});
  2337.     }
  2338.       
  2339.     // Initialize the response to be sent for the heartbeat
  2340.     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  2341.     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  2342.       
  2343.     // Check for new tasks to be executed on the tasktracker
  2344.     if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
  2345.       TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
  2346.       if (taskTrackerStatus == null) {
  2347.         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
  2348.       } else {
  2349.         List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
  2350.         if (tasks == null ) {
  2351.           tasks = taskScheduler.assignTasks(taskTrackerStatus);
  2352.         }
  2353.         if (tasks != null) {
  2354.           for (Task task : tasks) {
  2355.             expireLaunchingTasks.addNewTask(task.getTaskID());
  2356.             LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
  2357.             actions.add(new LaunchTaskAction(task));
  2358.           }
  2359.         }
  2360.       }
  2361.     }
  2362.       
  2363.     // Check for tasks to be killed
  2364.     List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
  2365.     if (killTasksList != null) {
  2366.       actions.addAll(killTasksList);
  2367.     }
  2368.      
  2369.     // Check for jobs to be killed/cleanedup
  2370.     List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
  2371.     if (killJobsList != null) {
  2372.       actions.addAll(killJobsList);
  2373.     }
  2374.     // Check for tasks whose outputs can be saved
  2375.     List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
  2376.     if (commitTasksList != null) {
  2377.       actions.addAll(commitTasksList);
  2378.     }
  2379.     // calculate next heartbeat interval and put in heartbeat response
  2380.     int nextInterval = getNextHeartbeatInterval();
  2381.     response.setHeartbeatInterval(nextInterval);
  2382.     response.setActions(
  2383.                         actions.toArray(new TaskTrackerAction[actions.size()]));
  2384.     
  2385.     // check if the restart info is req
  2386.     if (addRestartInfo) {
  2387.       response.setRecoveredJobs(recoveryManager.getJobsToRecover());
  2388.     }
  2389.         
  2390.     // Update the trackerToHeartbeatResponseMap
  2391.     trackerToHeartbeatResponseMap.put(trackerName, response);
  2392.     // Done processing the hearbeat, now remove 'marked' tasks
  2393.     removeMarkedTasks(trackerName);
  2394.         
  2395.     return response;
  2396.   }
  2397.   
  2398.   /**
  2399.    * Calculates next heartbeat interval using cluster size.
  2400.    * Heartbeat interval is incremented 1second for every 50 nodes. 
  2401.    * @return next heartbeat interval.
  2402.    */
  2403.   public int getNextHeartbeatInterval() {
  2404.     // get the no of task trackers
  2405.     int clusterSize = getClusterStatus().getTaskTrackers();
  2406.     int heartbeatInterval =  Math.max(
  2407.                                 (int)(1000 * Math.ceil((double)clusterSize / 
  2408.                                                        CLUSTER_INCREMENT)),
  2409.                                 HEARTBEAT_INTERVAL_MIN) ;
  2410.     return heartbeatInterval;
  2411.   }
  2412.   /**
  2413.    * Return if the specified tasktracker is in the hosts list, 
  2414.    * if one was configured.  If none was configured, then this 
  2415.    * returns true.
  2416.    */
  2417.   private boolean inHostsList(TaskTrackerStatus status) {
  2418.     Set<String> hostsList = hostsReader.getHosts();
  2419.     return (hostsList.isEmpty() || hostsList.contains(status.getHost()));
  2420.   }
  2421.   /**
  2422.    * Return if the specified tasktracker is in the exclude list.
  2423.    */
  2424.   private boolean inExcludedHostsList(TaskTrackerStatus status) {
  2425.     Set<String> excludeList = hostsReader.getExcludedHosts();
  2426.     return excludeList.contains(status.getHost());
  2427.   }
  2428.   /**
  2429.    * Returns true if the tasktracker is in the hosts list and 
  2430.    * not in the exclude list. 
  2431.    */
  2432.   private boolean acceptTaskTracker(TaskTrackerStatus status) {
  2433.     return (inHostsList(status) && !inExcludedHostsList(status));
  2434.   }
  2435.     
  2436.   /**
  2437.    * Update the last recorded status for the given task tracker.
  2438.    * It assumes that the taskTrackers are locked on entry.
  2439.    * @param trackerName The name of the tracker
  2440.    * @param status The new status for the task tracker
  2441.    * @return Was an old status found?
  2442.    */
  2443.   private boolean updateTaskTrackerStatus(String trackerName,
  2444.                                           TaskTrackerStatus status) {
  2445.     TaskTrackerStatus oldStatus = taskTrackers.get(trackerName);
  2446.     if (oldStatus != null) {
  2447.       totalMaps -= oldStatus.countMapTasks();
  2448.       totalReduces -= oldStatus.countReduceTasks();
  2449.       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
  2450.         totalMapTaskCapacity -= oldStatus.getMaxMapTasks();
  2451.         totalReduceTaskCapacity -= oldStatus.getMaxReduceTasks();
  2452.       }
  2453.       if (status == null) {
  2454.         taskTrackers.remove(trackerName);
  2455.         Integer numTaskTrackersInHost = 
  2456.           uniqueHostsMap.get(oldStatus.getHost());
  2457.         numTaskTrackersInHost --;
  2458.         if (numTaskTrackersInHost > 0)  {
  2459.           uniqueHostsMap.put(oldStatus.getHost(), numTaskTrackersInHost);
  2460.         }
  2461.         else {
  2462.           uniqueHostsMap.remove(oldStatus.getHost());
  2463.         }
  2464.       }
  2465.     }
  2466.     if (status != null) {
  2467.       totalMaps += status.countMapTasks();
  2468.       totalReduces += status.countReduceTasks();
  2469.       if (!faultyTrackers.isBlacklisted(status.getHost())) {
  2470.         totalMapTaskCapacity += status.getMaxMapTasks();
  2471.         totalReduceTaskCapacity += status.getMaxReduceTasks();
  2472.       }
  2473.       boolean alreadyPresent = false;
  2474.       if (taskTrackers.containsKey(trackerName)) {
  2475.         alreadyPresent = true;
  2476.       }
  2477.       taskTrackers.put(trackerName, status);
  2478.       if (!alreadyPresent)  {
  2479.         Integer numTaskTrackersInHost = 
  2480.           uniqueHostsMap.get(status.getHost());
  2481.         if (numTaskTrackersInHost == null) {
  2482.           numTaskTrackersInHost = 0;
  2483.         }
  2484.         numTaskTrackersInHost ++;
  2485.         uniqueHostsMap.put(status.getHost(), numTaskTrackersInHost);
  2486.       }
  2487.     }
  2488.     return oldStatus != null;
  2489.   }
  2490.     
  2491.   /**
  2492.    * Process incoming heartbeat messages from the task trackers.
  2493.    */
  2494.   private synchronized boolean processHeartbeat(
  2495.                                  TaskTrackerStatus trackerStatus, 
  2496.                                  boolean initialContact) {
  2497.     String trackerName = trackerStatus.getTrackerName();
  2498.     synchronized (taskTrackers) {
  2499.       synchronized (trackerExpiryQueue) {
  2500.         boolean seenBefore = updateTaskTrackerStatus(trackerName,
  2501.                                                      trackerStatus);
  2502.         if (initialContact) {
  2503.           // If it's first contact, then clear out 
  2504.           // any state hanging around
  2505.           if (seenBefore) {
  2506.             lostTaskTracker(trackerName);
  2507.           }
  2508.         } else {
  2509.           // If not first contact, there should be some record of the tracker
  2510.           if (!seenBefore) {
  2511.             LOG.warn("Status from unknown Tracker : " + trackerName);
  2512.             updateTaskTrackerStatus(trackerName, null);
  2513.             return false;
  2514.           }
  2515.         }
  2516.         if (initialContact) {
  2517.           // if this is lost tracker that came back now, and if it blacklisted
  2518.           // increment the count of blacklisted trackers in the cluster
  2519.           if (isBlacklisted(trackerName)) {
  2520.             faultyTrackers.numBlacklistedTrackers += 1;
  2521.           }
  2522.           addNewTracker(trackerStatus);
  2523.         }
  2524.       }
  2525.     }
  2526.     updateTaskStatuses(trackerStatus);
  2527.     
  2528.     return true;
  2529.   }
  2530.   /**
  2531.    * A tracker wants to know if any of its Tasks have been
  2532.    * closed (because the job completed, whether successfully or not)
  2533.    */
  2534.   private synchronized List<TaskTrackerAction> getTasksToKill(
  2535.                                                               String taskTracker) {
  2536.     
  2537.     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
  2538.     if (taskIds != null) {
  2539.       List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
  2540.       for (TaskAttemptID killTaskId : taskIds) {
  2541.         TaskInProgress tip = taskidToTIPMap.get(killTaskId);
  2542.         if (tip == null) {
  2543.           continue;
  2544.         }
  2545.         if (tip.shouldClose(killTaskId)) {
  2546.           // 
  2547.           // This is how the JobTracker ends a task at the TaskTracker.
  2548.           // It may be successfully completed, or may be killed in
  2549.           // mid-execution.
  2550.           //
  2551.           if (!tip.getJob().isComplete()) {
  2552.             killList.add(new KillTaskAction(killTaskId));
  2553.             LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
  2554.           }
  2555.         }
  2556.       }
  2557.             
  2558.       return killList;
  2559.     }
  2560.     return null;
  2561.   }
  2562.   /**
  2563.    * Add a job to cleanup for the tracker.
  2564.    */
  2565.   private void addJobForCleanup(JobID id) {
  2566.     for (String taskTracker : taskTrackers.keySet()) {
  2567.       LOG.debug("Marking job " + id + " for cleanup by tracker " + taskTracker);
  2568.       synchronized (trackerToJobsToCleanup) {
  2569.         Set<JobID> jobsToKill = trackerToJobsToCleanup.get(taskTracker);
  2570.         if (jobsToKill == null) {
  2571.           jobsToKill = new HashSet<JobID>();
  2572.           trackerToJobsToCleanup.put(taskTracker, jobsToKill);
  2573.         }
  2574.         jobsToKill.add(id);
  2575.       }
  2576.     }
  2577.   }
  2578.   
  2579.   /**
  2580.    * A tracker wants to know if any job needs cleanup because the job completed.
  2581.    */
  2582.   private List<TaskTrackerAction> getJobsForCleanup(String taskTracker) {
  2583.     Set<JobID> jobs = null;
  2584.     synchronized (trackerToJobsToCleanup) {
  2585.       jobs = trackerToJobsToCleanup.remove(taskTracker);
  2586.     }
  2587.     if (jobs != null) {
  2588.       // prepare the actions list
  2589.       List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
  2590.       for (JobID killJobId : jobs) {
  2591.         killList.add(new KillJobAction(killJobId));
  2592.         LOG.debug(taskTracker + " -> KillJobAction: " + killJobId);
  2593.       }
  2594.       return killList;
  2595.     }
  2596.     return null;
  2597.   }
  2598.   /**
  2599.    * A tracker wants to know if any of its Tasks can be committed 
  2600.    */
  2601.   private synchronized List<TaskTrackerAction> getTasksToSave(
  2602.                                                  TaskTrackerStatus tts) {
  2603.     List<TaskStatus> taskStatuses = tts.getTaskReports();
  2604.     if (taskStatuses != null) {
  2605.       List<TaskTrackerAction> saveList = new ArrayList<TaskTrackerAction>();
  2606.       for (TaskStatus taskStatus : taskStatuses) {
  2607.         if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
  2608.           TaskAttemptID taskId = taskStatus.getTaskID();
  2609.           TaskInProgress tip = taskidToTIPMap.get(taskId);
  2610.           if (tip == null) {
  2611.             continue;
  2612.           }
  2613.           if (tip.shouldCommit(taskId)) {
  2614.             saveList.add(new CommitTaskAction(taskId));
  2615.             LOG.debug(tts.getTrackerName() + 
  2616.                       " -> CommitTaskAction: " + taskId);
  2617.           }
  2618.         }
  2619.       }
  2620.       return saveList;
  2621.     }
  2622.     return null;
  2623.   }
  2624.   
  2625.   // returns cleanup tasks first, then setup tasks.
  2626.   private synchronized List<Task> getSetupAndCleanupTasks(
  2627.     TaskTrackerStatus taskTracker) throws IOException {
  2628.     int maxMapTasks = taskTracker.getMaxMapTasks();
  2629.     int maxReduceTasks = taskTracker.getMaxReduceTasks();
  2630.     int numMaps = taskTracker.countMapTasks();
  2631.     int numReduces = taskTracker.countReduceTasks();
  2632.     int numTaskTrackers = getClusterStatus().getTaskTrackers();
  2633.     int numUniqueHosts = getNumberOfUniqueHosts();
  2634.     Task t = null;
  2635.     synchronized (jobs) {
  2636.       if (numMaps < maxMapTasks) {
  2637.         for (Iterator<JobInProgress> it = jobs.values().iterator();
  2638.              it.hasNext();) {
  2639.           JobInProgress job = it.next();
  2640.           t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
  2641.                                     numUniqueHosts, true);
  2642.           if (t != null) {
  2643.             return Collections.singletonList(t);
  2644.           }
  2645.         }
  2646.         for (Iterator<JobInProgress> it = jobs.values().iterator();
  2647.              it.hasNext();) {
  2648.           JobInProgress job = it.next();
  2649.           t = job.obtainTaskCleanupTask(taskTracker, true);
  2650.           if (t != null) {
  2651.             return Collections.singletonList(t);
  2652.           }
  2653.         }
  2654.         for (Iterator<JobInProgress> it = jobs.values().iterator();
  2655.              it.hasNext();) {
  2656.           JobInProgress job = it.next();
  2657.           t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
  2658.                                   numUniqueHosts, true);
  2659.           if (t != null) {
  2660.             return Collections.singletonList(t);
  2661.           }
  2662.         }
  2663.       }
  2664.       if (numReduces < maxReduceTasks) {
  2665.         for (Iterator<JobInProgress> it = jobs.values().iterator();
  2666.              it.hasNext();) {
  2667.           JobInProgress job = it.next();
  2668.           t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
  2669.                                     numUniqueHosts, false);
  2670.           if (t != null) {
  2671.             return Collections.singletonList(t);
  2672.           }
  2673.         }
  2674.         for (Iterator<JobInProgress> it = jobs.values().iterator();
  2675.              it.hasNext();) {
  2676.           JobInProgress job = it.next();
  2677.           t = job.obtainTaskCleanupTask(taskTracker, false);
  2678.           if (t != null) {
  2679.             return Collections.singletonList(t);
  2680.           }
  2681.         }
  2682.         for (Iterator<JobInProgress> it = jobs.values().iterator();
  2683.              it.hasNext();) {
  2684.           JobInProgress job = it.next();
  2685.           t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
  2686.                                     numUniqueHosts, false);
  2687.           if (t != null) {
  2688.             return Collections.singletonList(t);
  2689.           }
  2690.         }
  2691.       }
  2692.     }
  2693.     return null;
  2694.   }
  2695.   /**
  2696.    * Grab the local fs name
  2697.    */
  2698.   public synchronized String getFilesystemName() throws IOException {
  2699.     if (fs == null) {
  2700.       throw new IllegalStateException("FileSystem object not available yet");
  2701.     }
  2702.     return fs.getUri().toString();
  2703.   }
  2704.   public void reportTaskTrackerError(String taskTracker,
  2705.                                      String errorClass,
  2706.                                      String errorMessage) throws IOException {
  2707.     LOG.warn("Report from " + taskTracker + ": " + errorMessage);        
  2708.   }
  2709.   /**
  2710.    * Remove the job_ from jobids to get the unique string.
  2711.    */
  2712.   static String getJobUniqueString(String jobid) {
  2713.     return jobid.substring(4);
  2714.   }
  2715.   ////////////////////////////////////////////////////
  2716.   // JobSubmissionProtocol
  2717.   ////////////////////////////////////////////////////
  2718.   /**
  2719.    * Allocates a new JobId string.
  2720.    */
  2721.   public synchronized JobID getNewJobId() throws IOException {
  2722.     return new JobID(getTrackerIdentifier(), nextJobId++);
  2723.   }
  2724.   /**
  2725.    * JobTracker.submitJob() kicks off a new job.  
  2726.    *
  2727.    * Create a 'JobInProgress' object, which contains both JobProfile
  2728.    * and JobStatus.  Those two sub-objects are sometimes shipped outside
  2729.    * of the JobTracker.  But JobInProgress adds info that's useful for
  2730.    * the JobTracker alone.
  2731.    */
  2732.   public synchronized JobStatus submitJob(JobID jobId) throws IOException {
  2733.     if(jobs.containsKey(jobId)) {
  2734.       //job already running, don't start twice
  2735.       return jobs.get(jobId).getStatus();
  2736.     }
  2737.     
  2738.     JobInProgress job = new JobInProgress(jobId, this, this.conf);
  2739.     
  2740.     String queue = job.getProfile().getQueueName();
  2741.     if(!(queueManager.getQueues().contains(queue))) {      
  2742.       new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
  2743.       throw new IOException("Queue "" + queue + "" does not exist");        
  2744.     }
  2745.     // check for access
  2746.     try {
  2747.       checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
  2748.     } catch (IOException ioe) {
  2749.        LOG.warn("Access denied for user " + job.getJobConf().getUser() 
  2750.                 + ". Ignoring job " + jobId, ioe);
  2751.       new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
  2752.       throw ioe;
  2753.     }
  2754.    return addJob(jobId, job); 
  2755.   }
  2756.   /**
  2757.    * Adds a job to the jobtracker. Make sure that the checks are inplace before
  2758.    * adding a job. This is the core job submission logic
  2759.    * @param jobId The id for the job submitted which needs to be added
  2760.    */
  2761.   private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
  2762.     totalSubmissions++;
  2763.     synchronized (jobs) {
  2764.       synchronized (taskScheduler) {
  2765.         jobs.put(job.getProfile().getJobID(), job);
  2766.         for (JobInProgressListener listener : jobInProgressListeners) {
  2767.           try {
  2768.             listener.jobAdded(job);
  2769.           } catch (IOException ioe) {
  2770.             LOG.warn("Failed to add and so skipping the job : "
  2771.                 + job.getJobID() + ". Exception : " + ioe);
  2772.           }
  2773.         }
  2774.       }
  2775.     }
  2776.     myInstrumentation.submitJob(job.getJobConf(), jobId);
  2777.     return job.getStatus();
  2778.   }
  2779.   // Check whether the specified operation can be performed
  2780.   // related to the job.
  2781.   private void checkAccess(JobInProgress job, 
  2782.                                 QueueManager.QueueOperation oper) 
  2783.                                   throws IOException {
  2784.     // get the user group info
  2785.     UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
  2786.     checkAccess(job, oper, ugi);
  2787.   }
  2788.   // use the passed ugi for checking the access
  2789.   private void checkAccess(JobInProgress job, QueueManager.QueueOperation oper,
  2790.                            UserGroupInformation ugi) throws IOException {
  2791.     // get the queue
  2792.     String queue = job.getProfile().getQueueName();
  2793.     if (!queueManager.hasAccess(queue, job, oper, ugi)) {
  2794.       throw new AccessControlException("User " 
  2795.                             + ugi.getUserName() 
  2796.                             + " cannot perform "
  2797.                             + "operation " + oper + " on queue " + queue);
  2798.     }
  2799.   }
  2800.   /**@deprecated use {@link #getClusterStatus(boolean)}*/
  2801.   @Deprecated
  2802.   public synchronized ClusterStatus getClusterStatus() {
  2803.     return getClusterStatus(false);
  2804.   }
  2805.   public synchronized ClusterStatus getClusterStatus(boolean detailed) {
  2806.     synchronized (taskTrackers) {
  2807.       if (detailed) {
  2808.         List<List<String>> trackerNames = taskTrackerNames();
  2809.         return new ClusterStatus(trackerNames.get(0),
  2810.             trackerNames.get(1),
  2811.             TASKTRACKER_EXPIRY_INTERVAL,
  2812.             totalMaps,
  2813.             totalReduces,
  2814.             totalMapTaskCapacity,
  2815.             totalReduceTaskCapacity, 
  2816.             state);
  2817.       } else {
  2818.         return new ClusterStatus(taskTrackers.size() - 
  2819.             getBlacklistedTrackerCount(),
  2820.             getBlacklistedTrackerCount(),
  2821.             TASKTRACKER_EXPIRY_INTERVAL,
  2822.             totalMaps,
  2823.             totalReduces,
  2824.             totalMapTaskCapacity,
  2825.             totalReduceTaskCapacity, 
  2826.             state);          
  2827.       }
  2828.     }
  2829.   }
  2830.     
  2831.   public synchronized void killJob(JobID jobid) throws IOException {
  2832.     if (null == jobid) {
  2833.       LOG.info("Null jobid object sent to JobTracker.killJob()");
  2834.       return;
  2835.     }
  2836.     
  2837.     JobInProgress job = jobs.get(jobid);
  2838.     
  2839.     if (null == job) {
  2840.       LOG.info("killJob(): JobId " + jobid.toString() + " is not a valid job");
  2841.       return;
  2842.     }
  2843.         
  2844.     JobStatus prevStatus = (JobStatus)job.getStatus().clone();
  2845.     checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
  2846.     job.kill();
  2847.     
  2848.     // Inform the listeners if the job is killed
  2849.     // Note : 
  2850.     //   If the job is killed in the PREP state then the listeners will be 
  2851.     //   invoked
  2852.     //   If the job is killed in the RUNNING state then cleanup tasks will be 
  2853.     //   launched and the updateTaskStatuses() will take care of it
  2854.     JobStatus newStatus = (JobStatus)job.getStatus().clone();
  2855.     if (prevStatus.getRunState() != newStatus.getRunState()
  2856.         && newStatus.getRunState() == JobStatus.KILLED) {
  2857.       JobStatusChangeEvent event = 
  2858.         new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
  2859.             newStatus);
  2860.       updateJobInProgressListeners(event);
  2861.     }
  2862.   }
  2863.   /**
  2864.    * Set the priority of a job
  2865.    * @param jobid id of the job
  2866.    * @param priority new priority of the job
  2867.    */
  2868.   public synchronized void setJobPriority(JobID jobid, 
  2869.                                               String priority)
  2870.                                                 throws IOException {
  2871.     JobInProgress job = jobs.get(jobid);
  2872.     if (null == job) {
  2873.         LOG.info("setJobPriority(): JobId " + jobid.toString()
  2874.             + " is not a valid job");
  2875.         return;
  2876.     }
  2877.     checkAccess(job, QueueManager.QueueOperation.ADMINISTER_JOBS);
  2878.     JobPriority newPriority = JobPriority.valueOf(priority);
  2879.     setJobPriority(jobid, newPriority);
  2880.   }
  2881.                            
  2882.   void storeCompletedJob(JobInProgress job) {
  2883.     //persists the job info in DFS
  2884.     completedJobStatusStore.store(job);
  2885.   }
  2886.   public JobProfile getJobProfile(JobID jobid) {
  2887.     synchronized (this) {
  2888.       JobInProgress job = jobs.get(jobid);
  2889.       if (job != null) {
  2890.         return job.getProfile();
  2891.       } 
  2892.     }
  2893.     return completedJobStatusStore.readJobProfile(jobid);
  2894.   }
  2895.   public JobStatus getJobStatus(JobID jobid) {
  2896.     if (null == jobid) {
  2897.       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
  2898.       return null;
  2899.     }
  2900.     synchronized (this) {
  2901.       JobInProgress job = jobs.get(jobid);
  2902.       if (job != null) {
  2903.         return job.getStatus();
  2904.       } 
  2905.     }
  2906.     return completedJobStatusStore.readJobStatus(jobid);
  2907.   }
  2908.   public Counters getJobCounters(JobID jobid) {
  2909.     synchronized (this) {
  2910.       JobInProgress job = jobs.get(jobid);
  2911.       if (job != null) {
  2912.         return job.getCounters();
  2913.       } 
  2914.     }
  2915.     return completedJobStatusStore.readCounters(jobid);
  2916.   }
  2917.   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
  2918.     JobInProgress job = jobs.get(jobid);
  2919.     if (job == null) {
  2920.       return new TaskReport[0];
  2921.     } else {
  2922.       Vector<TaskReport> reports = new Vector<TaskReport>();
  2923.       Vector<TaskInProgress> completeMapTasks =
  2924.         job.reportTasksInProgress(true, true);
  2925.       for (Iterator it = completeMapTasks.iterator(); it.hasNext();) {
  2926.         TaskInProgress tip = (TaskInProgress) it.next();
  2927.         reports.add(tip.generateSingleReport());
  2928.       }
  2929.       Vector<TaskInProgress> incompleteMapTasks =
  2930.         job.reportTasksInProgress(true, false);
  2931.       for (Iterator it = incompleteMapTasks.iterator(); it.hasNext();) {
  2932.         TaskInProgress tip = (TaskInProgress) it.next();
  2933.         reports.add(tip.generateSingleReport());
  2934.       }
  2935.       return reports.toArray(new TaskReport[reports.size()]);
  2936.     }
  2937.   }
  2938.   public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
  2939.     JobInProgress job = jobs.get(jobid);
  2940.     if (job == null) {
  2941.       return new TaskReport[0];
  2942.     } else {
  2943.       Vector<TaskReport> reports = new Vector<TaskReport>();
  2944.       Vector completeReduceTasks = job.reportTasksInProgress(false, true);
  2945.       for (Iterator it = completeReduceTasks.iterator(); it.hasNext();) {
  2946.         TaskInProgress tip = (TaskInProgress) it.next();
  2947.         reports.add(tip.generateSingleReport());
  2948.       }
  2949.       Vector incompleteReduceTasks = job.reportTasksInProgress(false, false);
  2950.       for (Iterator it = incompleteReduceTasks.iterator(); it.hasNext();) {
  2951.         TaskInProgress tip = (TaskInProgress) it.next();
  2952.         reports.add(tip.generateSingleReport());
  2953.       }
  2954.       return reports.toArray(new TaskReport[reports.size()]);
  2955.     }
  2956.   }
  2957.   public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
  2958.     JobInProgress job = jobs.get(jobid);
  2959.     if (job == null) {
  2960.       return new TaskReport[0];
  2961.     } else {
  2962.       Vector<TaskReport> reports = new Vector<TaskReport>();
  2963.       Vector<TaskInProgress> completeTasks = job.reportCleanupTIPs(true);
  2964.       for (Iterator<TaskInProgress> it = completeTasks.iterator();
  2965.            it.hasNext();) {
  2966.         TaskInProgress tip = (TaskInProgress) it.next();
  2967.         reports.add(tip.generateSingleReport());
  2968.       }
  2969.       Vector<TaskInProgress> incompleteTasks = job.reportCleanupTIPs(false);
  2970.       for (Iterator<TaskInProgress> it = incompleteTasks.iterator(); 
  2971.            it.hasNext();) {
  2972.         TaskInProgress tip = (TaskInProgress) it.next();
  2973.         reports.add(tip.generateSingleReport());
  2974.       }
  2975.       return reports.toArray(new TaskReport[reports.size()]);
  2976.     }
  2977.   
  2978.   }
  2979.   
  2980.   public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
  2981.     JobInProgress job = jobs.get(jobid);
  2982.     if (job == null) {
  2983.       return new TaskReport[0];
  2984.     } else {
  2985.       Vector<TaskReport> reports = new Vector<TaskReport>();
  2986.       Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
  2987.       for (Iterator<TaskInProgress> it = completeTasks.iterator();
  2988.            it.hasNext();) {
  2989.         TaskInProgress tip = (TaskInProgress) it.next();
  2990.         reports.add(tip.generateSingleReport());
  2991.       }
  2992.       Vector<TaskInProgress> incompleteTasks = job.reportSetupTIPs(false);
  2993.       for (Iterator<TaskInProgress> it = incompleteTasks.iterator(); 
  2994.            it.hasNext();) {
  2995.         TaskInProgress tip = (TaskInProgress) it.next();
  2996.         reports.add(tip.generateSingleReport());
  2997.       }
  2998.       return reports.toArray(new TaskReport[reports.size()]);
  2999.     }
  3000.   }
  3001.   
  3002.   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
  3003.   
  3004.   /* 
  3005.    * Returns a list of TaskCompletionEvent for the given job, 
  3006.    * starting from fromEventId.
  3007.    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
  3008.    */
  3009.   public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
  3010.       JobID jobid, int fromEventId, int maxEvents) throws IOException{
  3011.     synchronized (this) {
  3012.       JobInProgress job = this.jobs.get(jobid);
  3013.       if (null != job) {
  3014.         if (job.inited()) {
  3015.           return job.getTaskCompletionEvents(fromEventId, maxEvents);
  3016.         } else {
  3017.           return EMPTY_EVENTS;
  3018.         }
  3019.       }
  3020.     }
  3021.     return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
  3022.   }
  3023.   /**
  3024.    * Get the diagnostics for a given task
  3025.    * @param taskId the id of the task
  3026.    * @return an array of the diagnostic messages
  3027.    */
  3028.   public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)  
  3029.     throws IOException {
  3030.     
  3031.     JobID jobId = taskId.getJobID();
  3032.     TaskID tipId = taskId.getTaskID();
  3033.     JobInProgress job = jobs.get(jobId);
  3034.     if (job == null) {
  3035.       throw new IllegalArgumentException("Job " + jobId + " not found.");
  3036.     }
  3037.     TaskInProgress tip = job.getTaskInProgress(tipId);
  3038.     if (tip == null) {
  3039.       throw new IllegalArgumentException("TIP " + tipId + " not found.");
  3040.     }
  3041.     List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
  3042.     return ((taskDiagnosticInfo == null) ? null 
  3043.             : taskDiagnosticInfo.toArray(new String[0]));
  3044.   }
  3045.     
  3046.   /** Get all the TaskStatuses from the tipid. */
  3047.   TaskStatus[] getTaskStatuses(TaskID tipid) {
  3048.     TaskInProgress tip = getTip(tipid);
  3049.     return (tip == null ? new TaskStatus[0] 
  3050.             : tip.getTaskStatuses());
  3051.   }
  3052.   /** Returns the TaskStatus for a particular taskid. */
  3053.   TaskStatus getTaskStatus(TaskAttemptID taskid) {
  3054.     TaskInProgress tip = getTip(taskid.getTaskID());
  3055.     return (tip == null ? null 
  3056.             : tip.getTaskStatus(taskid));
  3057.   }
  3058.     
  3059.   /**
  3060.    * Returns the counters for the specified task in progress.
  3061.    */
  3062.   Counters getTipCounters(TaskID tipid) {
  3063.     TaskInProgress tip = getTip(tipid);
  3064.     return (tip == null ? null : tip.getCounters());
  3065.   }
  3066.   /**
  3067.    * Returns the configured task scheduler for this job tracker.
  3068.    * @return the configured task scheduler
  3069.    */
  3070.   TaskScheduler getTaskScheduler() {
  3071.     return taskScheduler;
  3072.   }
  3073.   
  3074.   /**
  3075.    * Returns specified TaskInProgress, or null.
  3076.    */
  3077.   public TaskInProgress getTip(TaskID tipid) {
  3078.     JobInProgress job = jobs.get(tipid.getJobID());
  3079.     return (job == null ? null : job.getTaskInProgress(tipid));
  3080.   }
  3081.     
  3082.   /** Mark a Task to be killed */
  3083.   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException{
  3084.     TaskInProgress tip = taskidToTIPMap.get(taskid);
  3085.     if(tip != null) {
  3086.       checkAccess(tip.getJob(), QueueManager.QueueOperation.ADMINISTER_JOBS);
  3087.       return tip.killTask(taskid, shouldFail);
  3088.     }
  3089.     else {
  3090.       LOG.info("Kill task attempt failed since task " + taskid + " was not found");
  3091.       return false;
  3092.     }
  3093.   }
  3094.   
  3095.   /**
  3096.    * Get tracker name for a given task id.
  3097.    * @param taskId the name of the task
  3098.    * @return The name of the task tracker
  3099.    */
  3100.   public synchronized String getAssignedTracker(TaskAttemptID taskId) {
  3101.     return taskidToTrackerMap.get(taskId);
  3102.   }
  3103.     
  3104.   public JobStatus[] jobsToComplete() {
  3105.     return getJobStatus(jobs.values(), true);
  3106.   } 
  3107.   
  3108.   public JobStatus[] getAllJobs() {
  3109.     return getJobStatus(jobs.values(),false);
  3110.   }
  3111.     
  3112.   /**
  3113.    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
  3114.    */
  3115.   public String getSystemDir() {
  3116.     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system"));  
  3117.     return fs.makeQualified(sysDir).toString();
  3118.   }
  3119.   
  3120.   ///////////////////////////////////////////////////////////////
  3121.   // JobTracker methods
  3122.   ///////////////////////////////////////////////////////////////
  3123.   public JobInProgress getJob(JobID jobid) {
  3124.     return jobs.get(jobid);
  3125.   }
  3126.   // Get the job directory in system directory
  3127.   Path getSystemDirectoryForJob(JobID id) {
  3128.     return new Path(getSystemDir(), id.toString());
  3129.   }
  3130.   /**
  3131.    * Change the run-time priority of the given job.
  3132.    * @param jobId job id
  3133.    * @param priority new {@link JobPriority} for the job
  3134.    */
  3135.   synchronized void setJobPriority(JobID jobId, JobPriority priority) {
  3136.     JobInProgress job = jobs.get(jobId);
  3137.     if (job != null) {
  3138.       synchronized (taskScheduler) {
  3139.         JobStatus oldStatus = (JobStatus)job.getStatus().clone();
  3140.         job.setPriority(priority);
  3141.         JobStatus newStatus = (JobStatus)job.getStatus().clone();
  3142.         JobStatusChangeEvent event = 
  3143.           new JobStatusChangeEvent(job, EventType.PRIORITY_CHANGED, oldStatus, 
  3144.                                    newStatus);
  3145.         updateJobInProgressListeners(event);
  3146.       }
  3147.     } else {
  3148.       LOG.warn("Trying to change the priority of an unknown job: " + jobId);
  3149.     }
  3150.   }
  3151.   
  3152.   ////////////////////////////////////////////////////
  3153.   // Methods to track all the TaskTrackers
  3154.   ////////////////////////////////////////////////////
  3155.   /**
  3156.    * Accept and process a new TaskTracker profile.  We might
  3157.    * have known about the TaskTracker previously, or it might
  3158.    * be brand-new.  All task-tracker structures have already
  3159.    * been updated.  Just process the contained tasks and any
  3160.    * jobs that might be affected.
  3161.    */
  3162.   void updateTaskStatuses(TaskTrackerStatus status) {
  3163.     String trackerName = status.getTrackerName();
  3164.     for (TaskStatus report : status.getTaskReports()) {
  3165.       report.setTaskTracker(trackerName);
  3166.       TaskAttemptID taskId = report.getTaskID();
  3167.       
  3168.       // expire it
  3169.       expireLaunchingTasks.removeTask(taskId);
  3170.       
  3171.       JobInProgress job = getJob(taskId.getJobID());
  3172.       if (job == null) {
  3173.         // if job is not there in the cleanup list ... add it
  3174.         synchronized (trackerToJobsToCleanup) {
  3175.           Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName);
  3176.           if (jobs == null) {
  3177.             jobs = new HashSet<JobID>();
  3178.             trackerToJobsToCleanup.put(trackerName, jobs);
  3179.           }
  3180.           jobs.add(taskId.getJobID());
  3181.         }
  3182.         continue;
  3183.       }
  3184.       
  3185.       TaskInProgress tip = taskidToTIPMap.get(taskId);
  3186.       // Check if the tip is known to the jobtracker. In case of a restarted
  3187.       // jt, some tasks might join in later
  3188.       if (tip != null || hasRestarted()) {
  3189.         if (tip == null) {
  3190.           tip = job.getTaskInProgress(taskId.getTaskID());
  3191.           job.addRunningTaskToTIP(tip, taskId, status, false);
  3192.         }
  3193.         
  3194.         // Update the job and inform the listeners if necessary
  3195.         JobStatus prevStatus = (JobStatus)job.getStatus().clone();
  3196.         // Clone TaskStatus object here, because JobInProgress
  3197.         // or TaskInProgress can modify this object and
  3198.         // the changes should not get reflected in TaskTrackerStatus.
  3199.         // An old TaskTrackerStatus is used later in countMapTasks, etc.
  3200.         job.updateTaskStatus(tip, (TaskStatus)report.clone());
  3201.         JobStatus newStatus = (JobStatus)job.getStatus().clone();
  3202.         
  3203.         // Update the listeners if an incomplete job completes
  3204.         if (prevStatus.getRunState() != newStatus.getRunState()) {
  3205.           JobStatusChangeEvent event = 
  3206.             new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, 
  3207.                                      prevStatus, newStatus);
  3208.           updateJobInProgressListeners(event);
  3209.         }
  3210.       } else {
  3211.         LOG.info("Serious problem.  While updating status, cannot find taskid " 
  3212.                  + report.getTaskID());
  3213.       }
  3214.       
  3215.       // Process 'failed fetch' notifications 
  3216.       List<TaskAttemptID> failedFetchMaps = report.getFetchFailedMaps();
  3217.       if (failedFetchMaps != null) {
  3218.         for (TaskAttemptID mapTaskId : failedFetchMaps) {
  3219.           TaskInProgress failedFetchMap = taskidToTIPMap.get(mapTaskId);
  3220.           
  3221.           if (failedFetchMap != null) {
  3222.             // Gather information about the map which has to be failed, if need be
  3223.             String failedFetchTrackerName = getAssignedTracker(mapTaskId);
  3224.             if (failedFetchTrackerName == null) {
  3225.               failedFetchTrackerName = "Lost task tracker";
  3226.             }
  3227.             failedFetchMap.getJob().fetchFailureNotification(failedFetchMap, 
  3228.                                                              mapTaskId, 
  3229.                                                              failedFetchTrackerName);
  3230.           }
  3231.         }
  3232.       }
  3233.     }
  3234.   }
  3235.   /**
  3236.    * We lost the task tracker!  All task-tracker structures have 
  3237.    * already been updated.  Just process the contained tasks and any
  3238.    * jobs that might be affected.
  3239.    */
  3240.   void lostTaskTracker(String trackerName) {
  3241.     LOG.info("Lost tracker '" + trackerName + "'");
  3242.     
  3243.     // remove the tracker from the local structures
  3244.     synchronized (trackerToJobsToCleanup) {
  3245.       trackerToJobsToCleanup.remove(trackerName);
  3246.     }
  3247.     
  3248.     // Inform the recovery manager
  3249.     recoveryManager.unMarkTracker(trackerName);
  3250.     
  3251.     Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
  3252.     trackerToTaskMap.remove(trackerName);
  3253.     if (lostTasks != null) {
  3254.       // List of jobs which had any of their tasks fail on this tracker
  3255.       Set<JobInProgress> jobsWithFailures = new HashSet<JobInProgress>(); 
  3256.       for (TaskAttemptID taskId : lostTasks) {
  3257.         TaskInProgress tip = taskidToTIPMap.get(taskId);
  3258.         JobInProgress job = tip.getJob();
  3259.         // Completed reduce tasks never need to be failed, because 
  3260.         // their outputs go to dfs
  3261.         // And completed maps with zero reducers of the job 
  3262.         // never need to be failed. 
  3263.         if (!tip.isComplete() || 
  3264.             (tip.isMapTask() && !tip.isJobSetupTask() && 
  3265.              job.desiredReduces() != 0)) {
  3266.           // if the job is done, we don't want to change anything
  3267.           if (job.getStatus().getRunState() == JobStatus.RUNNING ||
  3268.               job.getStatus().getRunState() == JobStatus.PREP) {
  3269.             // the state will be KILLED_UNCLEAN, if the task(map or reduce) 
  3270.             // was RUNNING on the tracker
  3271.             TaskStatus.State killState = (tip.isRunningTask(taskId) && 
  3272.               !tip.isJobSetupTask() && !tip.isJobCleanupTask()) ? 
  3273.               TaskStatus.State.KILLED_UNCLEAN : TaskStatus.State.KILLED;
  3274.             job.failedTask(tip, taskId, ("Lost task tracker: " + trackerName), 
  3275.                            (tip.isMapTask() ? 
  3276.                                TaskStatus.Phase.MAP : 
  3277.                                TaskStatus.Phase.REDUCE), 
  3278.                             killState,
  3279.                             trackerName);
  3280.             jobsWithFailures.add(job);
  3281.           }
  3282.         } else {
  3283.           // Completed 'reduce' task and completed 'maps' with zero 
  3284.           // reducers of the job, not failed;
  3285.           // only removed from data-structures.
  3286.           markCompletedTaskAttempt(trackerName, taskId);
  3287.         }
  3288.       }
  3289.       
  3290.       // Penalize this tracker for each of the jobs which   
  3291.       // had any tasks running on it when it was 'lost' 
  3292.       for (JobInProgress job : jobsWithFailures) {
  3293.         job.addTrackerTaskFailure(trackerName);
  3294.       }
  3295.       
  3296.       // Purge 'marked' tasks, needs to be done  
  3297.       // here to prevent hanging references!
  3298.       removeMarkedTasks(trackerName);
  3299.     }
  3300.   }
  3301.   
  3302.   /**
  3303.    * Get the localized job file path on the job trackers local file system
  3304.    * @param jobId id of the job
  3305.    * @return the path of the job conf file on the local file system
  3306.    */
  3307.   public static String getLocalJobFilePath(JobID jobId){
  3308.     return JobHistory.JobInfo.getLocalJobFilePath(jobId);
  3309.   }
  3310.   ////////////////////////////////////////////////////////////
  3311.   // main()
  3312.   ////////////////////////////////////////////////////////////
  3313.   /**
  3314.    * Start the JobTracker process.  This is used only for debugging.  As a rule,
  3315.    * JobTracker should be run as part of the DFS Namenode process.
  3316.    */
  3317.   public static void main(String argv[]
  3318.                           ) throws IOException, InterruptedException {
  3319.     StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
  3320.     if (argv.length != 0) {
  3321.       System.out.println("usage: JobTracker");
  3322.       System.exit(-1);
  3323.     }
  3324.       
  3325.     try {
  3326.       JobTracker tracker = startTracker(new JobConf());
  3327.       tracker.offerService();
  3328.     } catch (Throwable e) {
  3329.       LOG.fatal(StringUtils.stringifyException(e));
  3330.       System.exit(-1);
  3331.     }
  3332.   }
  3333.   @Override
  3334.   public JobQueueInfo[] getQueues() throws IOException {
  3335.     return queueManager.getJobQueueInfos();
  3336.   }
  3337.   @Override
  3338.   public JobQueueInfo getQueueInfo(String queue) throws IOException {
  3339.     return queueManager.getJobQueueInfo(queue);
  3340.   }
  3341.   @Override
  3342.   public JobStatus[] getJobsFromQueue(String queue) throws IOException {
  3343.     Collection<JobInProgress> jips = taskScheduler.getJobs(queue);
  3344.     return getJobStatus(jips,false);
  3345.   }
  3346.   
  3347.   private synchronized JobStatus[] getJobStatus(Collection<JobInProgress> jips,
  3348.       boolean toComplete) {
  3349.     if(jips == null || jips.isEmpty()) {
  3350.       return new JobStatus[]{};
  3351.     }
  3352.     ArrayList<JobStatus> jobStatusList = new ArrayList<JobStatus>();
  3353.     for(JobInProgress jip : jips) {
  3354.       JobStatus status = jip.getStatus();
  3355.       status.setStartTime(jip.getStartTime());
  3356.       status.setUsername(jip.getProfile().getUser());
  3357.       if(toComplete) {
  3358.         if(status.getRunState() == JobStatus.RUNNING || 
  3359.             status.getRunState() == JobStatus.PREP) {
  3360.           jobStatusList.add(status);
  3361.         }
  3362.       }else {
  3363.         jobStatusList.add(status);
  3364.       }
  3365.     }
  3366.     return (JobStatus[]) jobStatusList.toArray(
  3367.         new JobStatus[jobStatusList.size()]);
  3368.   }
  3369.   /**
  3370.    * Returns the confgiured maximum number of tasks for a single job
  3371.    */
  3372.   int getMaxTasksPerJob() {
  3373.     return conf.getInt("mapred.jobtracker.maxtasks.per.job", -1);
  3374.   }
  3375.   
  3376.   @Override
  3377.   public void refreshServiceAcl() throws IOException {
  3378.     if (!conf.getBoolean(
  3379.             ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
  3380.       throw new AuthorizationException("Service Level Authorization not enabled!");
  3381.     }
  3382.     SecurityUtil.getPolicy().refresh();
  3383.   }
  3384. }