JobInProgress.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:91k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataInputStream;
  20. import java.io.IOException;
  21. import java.util.ArrayList;
  22. import java.util.Collection;
  23. import java.util.IdentityHashMap;
  24. import java.util.Iterator;
  25. import java.util.LinkedHashSet;
  26. import java.util.LinkedList;
  27. import java.util.List;
  28. import java.util.Map;
  29. import java.util.Set;
  30. import java.util.TreeMap;
  31. import java.util.Vector;
  32. import java.util.concurrent.atomic.AtomicBoolean;
  33. import org.apache.commons.logging.Log;
  34. import org.apache.commons.logging.LogFactory;
  35. import org.apache.hadoop.fs.FileSystem;
  36. import org.apache.hadoop.fs.LocalFileSystem;
  37. import org.apache.hadoop.fs.Path;
  38. import org.apache.hadoop.mapred.JobHistory.Values;
  39. import org.apache.hadoop.metrics.MetricsContext;
  40. import org.apache.hadoop.metrics.MetricsRecord;
  41. import org.apache.hadoop.metrics.MetricsUtil;
  42. import org.apache.hadoop.net.NetUtils;
  43. import org.apache.hadoop.net.NetworkTopology;
  44. import org.apache.hadoop.net.Node;
  45. import org.apache.hadoop.util.StringUtils;
  46. /*************************************************************
  47.  * JobInProgress maintains all the info for keeping
  48.  * a Job on the straight and narrow.  It keeps its JobProfile
  49.  * and its latest JobStatus, plus a set of tables for 
  50.  * doing bookkeeping of its Tasks.
  51.  * ***********************************************************
  52.  */
  53. class JobInProgress {
  54.   static final Log LOG = LogFactory.getLog(JobInProgress.class);
  55.     
  56.   JobProfile profile;
  57.   JobStatus status;
  58.   Path jobFile = null;
  59.   Path localJobFile = null;
  60.   Path localJarFile = null;
  61.   TaskInProgress maps[] = new TaskInProgress[0];
  62.   TaskInProgress reduces[] = new TaskInProgress[0];
  63.   TaskInProgress cleanup[] = new TaskInProgress[0];
  64.   TaskInProgress setup[] = new TaskInProgress[0];
  65.   int numMapTasks = 0;
  66.   int numReduceTasks = 0;
  67.   
  68.   // Counters to track currently running/finished/failed Map/Reduce task-attempts
  69.   int runningMapTasks = 0;
  70.   int runningReduceTasks = 0;
  71.   int finishedMapTasks = 0;
  72.   int finishedReduceTasks = 0;
  73.   int failedMapTasks = 0; 
  74.   int failedReduceTasks = 0;
  75.   
  76.   private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
  77.   int completedMapsForReduceSlowstart = 0;
  78.   
  79.   // runningMapTasks include speculative tasks, so we need to capture 
  80.   // speculative tasks separately 
  81.   int speculativeMapTasks = 0;
  82.   int speculativeReduceTasks = 0;
  83.   
  84.   int mapFailuresPercent = 0;
  85.   int reduceFailuresPercent = 0;
  86.   int failedMapTIPs = 0;
  87.   int failedReduceTIPs = 0;
  88.   private volatile boolean launchedCleanup = false;
  89.   private volatile boolean launchedSetup = false;
  90.   private volatile boolean jobKilled = false;
  91.   private volatile boolean jobFailed = false;
  92.   JobPriority priority = JobPriority.NORMAL;
  93.   final JobTracker jobtracker;
  94.   // NetworkTopology Node to the set of TIPs
  95.   Map<Node, List<TaskInProgress>> nonRunningMapCache;
  96.   
  97.   // Map of NetworkTopology Node to set of running TIPs
  98.   Map<Node, Set<TaskInProgress>> runningMapCache;
  99.   // A list of non-local non-running maps
  100.   List<TaskInProgress> nonLocalMaps;
  101.   // A set of non-local running maps
  102.   Set<TaskInProgress> nonLocalRunningMaps;
  103.   // A list of non-running reduce TIPs
  104.   List<TaskInProgress> nonRunningReduces;
  105.   // A set of running reduce TIPs
  106.   Set<TaskInProgress> runningReduces;
  107.   
  108.   // A list of cleanup tasks for the map task attempts, to be launched
  109.   List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
  110.   
  111.   // A list of cleanup tasks for the reduce task attempts, to be launched
  112.   List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
  113.   private final int maxLevel;
  114.   /**
  115.    * A special value indicating that 
  116.    * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
  117.    * schedule any available map tasks for this job, including speculative tasks.
  118.    */
  119.   private final int anyCacheLevel;
  120.   
  121.   /**
  122.    * A special value indicating that 
  123.    * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
  124.    * schedule any only off-switch and speculative map tasks for this job.
  125.    */
  126.   private static final int NON_LOCAL_CACHE_LEVEL = -1;
  127.   private int taskCompletionEventTracker = 0; 
  128.   List<TaskCompletionEvent> taskCompletionEvents;
  129.     
  130.   // The maximum percentage of trackers in cluster added to the 'blacklist'.
  131.   private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
  132.   
  133.   // The maximum percentage of fetch failures allowed for a map 
  134.   private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
  135.   
  136.   // No. of tasktrackers in the cluster
  137.   private volatile int clusterSize = 0;
  138.   
  139.   // The no. of tasktrackers where >= conf.getMaxTaskFailuresPerTracker()
  140.   // tasks have failed
  141.   private volatile int flakyTaskTrackers = 0;
  142.   // Map of trackerHostName -> no. of task failures
  143.   private Map<String, Integer> trackerToFailuresMap = 
  144.     new TreeMap<String, Integer>();
  145.     
  146.   //Confine estimation algorithms to an "oracle" class that JIP queries.
  147.   private ResourceEstimator resourceEstimator; 
  148.   
  149.   long startTime;
  150.   long launchTime;
  151.   long finishTime;
  152.   
  153.   // Indicates how many times the job got restarted
  154.   private final int restartCount;
  155.   private JobConf conf;
  156.   AtomicBoolean tasksInited = new AtomicBoolean(false);
  157.   private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
  158.   private LocalFileSystem localFs;
  159.   private JobID jobId;
  160.   private boolean hasSpeculativeMaps;
  161.   private boolean hasSpeculativeReduces;
  162.   private long inputLength = 0;
  163.   private long maxVirtualMemoryForTask;
  164.   private long maxPhysicalMemoryForTask;
  165.   
  166.   // Per-job counters
  167.   public static enum Counter { 
  168.     NUM_FAILED_MAPS, 
  169.     NUM_FAILED_REDUCES,
  170.     TOTAL_LAUNCHED_MAPS,
  171.     TOTAL_LAUNCHED_REDUCES,
  172.     OTHER_LOCAL_MAPS,
  173.     DATA_LOCAL_MAPS,
  174.     RACK_LOCAL_MAPS
  175.   }
  176.   private Counters jobCounters = new Counters();
  177.   
  178.   private MetricsRecord jobMetrics;
  179.   
  180.   // Maximum no. of fetch-failure notifications after which
  181.   // the map task is killed
  182.   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
  183.   
  184.   // Map of mapTaskId -> no. of fetch failures
  185.   private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
  186.     new TreeMap<TaskAttemptID, Integer>();
  187.   private Object schedulingInfo;
  188.   
  189.   /**
  190.    * Create an almost empty JobInProgress, which can be used only for tests
  191.    */
  192.   protected JobInProgress(JobID jobid, JobConf conf) {
  193.     this.conf = conf;
  194.     this.jobId = jobid;
  195.     this.numMapTasks = conf.getNumMapTasks();
  196.     this.numReduceTasks = conf.getNumReduceTasks();
  197.     this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
  198.     this.anyCacheLevel = this.maxLevel+1;
  199.     this.jobtracker = null;
  200.     this.restartCount = 0;
  201.   }
  202.   
  203.   /**
  204.    * Create a JobInProgress with the given job file, plus a handle
  205.    * to the tracker.
  206.    */
  207.   public JobInProgress(JobID jobid, JobTracker jobtracker, 
  208.                        JobConf default_conf) throws IOException {
  209.     this(jobid, jobtracker, default_conf, 0);
  210.   }
  211.   
  212.   public JobInProgress(JobID jobid, JobTracker jobtracker, 
  213.                        JobConf default_conf, int rCount) throws IOException {
  214.     this.restartCount = rCount;
  215.     this.jobId = jobid;
  216.     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
  217.         + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
  218.     this.jobtracker = jobtracker;
  219.     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
  220.     this.startTime = System.currentTimeMillis();
  221.     status.setStartTime(startTime);
  222.     this.localFs = FileSystem.getLocal(default_conf);
  223.     JobConf default_job_conf = new JobConf(default_conf);
  224.     this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
  225.                                                       +"/"+jobid + ".xml");
  226.     this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
  227.                                                       +"/"+ jobid + ".jar");
  228.     Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
  229.     FileSystem fs = jobDir.getFileSystem(default_conf);
  230.     jobFile = new Path(jobDir, "job.xml");
  231.     fs.copyToLocalFile(jobFile, localJobFile);
  232.     conf = new JobConf(localJobFile);
  233.     this.priority = conf.getJobPriority();
  234.     this.status.setJobPriority(this.priority);
  235.     this.profile = new JobProfile(conf.getUser(), jobid, 
  236.                                   jobFile.toString(), url, conf.getJobName(),
  237.                                   conf.getQueueName());
  238.     String jarFile = conf.getJar();
  239.     if (jarFile != null) {
  240.       fs.copyToLocalFile(new Path(jarFile), localJarFile);
  241.       conf.setJar(localJarFile.toString());
  242.     }
  243.     this.numMapTasks = conf.getNumMapTasks();
  244.     this.numReduceTasks = conf.getNumReduceTasks();
  245.     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
  246.        (numMapTasks + numReduceTasks + 10);
  247.     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
  248.     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
  249.         
  250.     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
  251.     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
  252.     this.jobMetrics.setTag("user", conf.getUser());
  253.     this.jobMetrics.setTag("sessionId", conf.getSessionId());
  254.     this.jobMetrics.setTag("jobName", conf.getJobName());
  255.     this.jobMetrics.setTag("jobId", jobid.toString());
  256.     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
  257.     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
  258.     this.maxLevel = jobtracker.getNumTaskCacheLevels();
  259.     this.anyCacheLevel = this.maxLevel+1;
  260.     this.nonLocalMaps = new LinkedList<TaskInProgress>();
  261.     this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
  262.     this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
  263.     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
  264.     this.runningReduces = new LinkedHashSet<TaskInProgress>();
  265.     this.resourceEstimator = new ResourceEstimator(this);
  266.     setMaxVirtualMemoryForTask(conf.getMaxVirtualMemoryForTask());
  267.     setMaxPhysicalMemoryForTask(conf.getMaxPhysicalMemoryForTask());
  268.   }
  269.   /**
  270.    * Called periodically by JobTrackerMetrics to update the metrics for
  271.    * this job.
  272.    */
  273.   public void updateMetrics() {
  274.     Counters counters = getCounters();
  275.     for (Counters.Group group : counters) {
  276.       jobMetrics.setTag("group", group.getDisplayName());
  277.       for (Counters.Counter counter : group) {
  278.         jobMetrics.setTag("counter", counter.getDisplayName());
  279.         jobMetrics.setMetric("value", (float) counter.getCounter());
  280.         jobMetrics.update();
  281.       }
  282.     }
  283.   }
  284.     
  285.   /**
  286.    * Called when the job is complete
  287.    */
  288.   public void cleanUpMetrics() {
  289.     // Deletes all metric data for this job (in internal table in metrics package).
  290.     // This frees up RAM and possibly saves network bandwidth, since otherwise
  291.     // the metrics package implementation might continue to send these job metrics
  292.     // after the job has finished.
  293.     jobMetrics.removeTag("group");
  294.     jobMetrics.removeTag("counter");
  295.     jobMetrics.remove();
  296.   }
  297.     
  298.   private void printCache (Map<Node, List<TaskInProgress>> cache) {
  299.     LOG.info("The taskcache info:");
  300.     for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
  301.       List <TaskInProgress> tips = n.getValue();
  302.       LOG.info("Cached TIPs on node: " + n.getKey());
  303.       for (TaskInProgress tip : tips) {
  304.         LOG.info("tip : " + tip.getTIPId());
  305.       }
  306.     }
  307.   }
  308.   
  309.   private Map<Node, List<TaskInProgress>> createCache(
  310.                          JobClient.RawSplit[] splits, int maxLevel) {
  311.     Map<Node, List<TaskInProgress>> cache = 
  312.       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
  313.     
  314.     for (int i = 0; i < splits.length; i++) {
  315.       String[] splitLocations = splits[i].getLocations();
  316.       if (splitLocations.length == 0) {
  317.         nonLocalMaps.add(maps[i]);
  318.         continue;
  319.       }
  320.       for(String host: splitLocations) {
  321.         Node node = jobtracker.resolveAndAddToTopology(host);
  322.         LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
  323.         for (int j = 0; j < maxLevel; j++) {
  324.           List<TaskInProgress> hostMaps = cache.get(node);
  325.           if (hostMaps == null) {
  326.             hostMaps = new ArrayList<TaskInProgress>();
  327.             cache.put(node, hostMaps);
  328.             hostMaps.add(maps[i]);
  329.           }
  330.           //check whether the hostMaps already contains an entry for a TIP
  331.           //This will be true for nodes that are racks and multiple nodes in
  332.           //the rack contain the input for a tip. Note that if it already
  333.           //exists in the hostMaps, it must be the last element there since
  334.           //we process one TIP at a time sequentially in the split-size order
  335.           if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
  336.             hostMaps.add(maps[i]);
  337.           }
  338.           node = node.getParent();
  339.         }
  340.       }
  341.     }
  342.     return cache;
  343.   }
  344.   
  345.   /**
  346.    * Check if the job has been initialized.
  347.    * @return <code>true</code> if the job has been initialized, 
  348.    *         <code>false</code> otherwise
  349.    */
  350.   public boolean inited() {
  351.     return tasksInited.get();
  352.   }
  353.   
  354.   /**
  355.    * Construct the splits, etc.  This is invoked from an async
  356.    * thread so that split-computation doesn't block anyone.
  357.    */
  358.   public synchronized void initTasks() throws IOException {
  359.     if (tasksInited.get()) {
  360.       return;
  361.     }
  362.     synchronized(jobInitKillStatus){
  363.       if(jobInitKillStatus.killed) {
  364.         return;
  365.       }
  366.       jobInitKillStatus.initStarted = true;
  367.     }
  368.     LOG.debug("initializing " + this.jobId);
  369.     // log job info
  370.     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
  371.                                     this.startTime);
  372.     // log the job priority
  373.     setPriority(this.priority);
  374.     
  375.     //
  376.     // read input splits and create a map per a split
  377.     //
  378.     String jobFile = profile.getJobFile();
  379.     Path sysDir = new Path(this.jobtracker.getSystemDir());
  380.     FileSystem fs = sysDir.getFileSystem(conf);
  381.     DataInputStream splitFile =
  382.       fs.open(new Path(conf.get("mapred.job.split.file")));
  383.     JobClient.RawSplit[] splits;
  384.     try {
  385.       splits = JobClient.readSplitFile(splitFile);
  386.     } finally {
  387.       splitFile.close();
  388.     }
  389.     numMapTasks = splits.length;
  390.     // if the number of splits is larger than a configured value
  391.     // then fail the job.
  392.     int maxTasks = jobtracker.getMaxTasksPerJob();
  393.     if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
  394.       throw new IOException(
  395.                 "The number of tasks for this job " + 
  396.                 (numMapTasks + numReduceTasks) +
  397.                 " exceeds the configured limit " + maxTasks);
  398.     }
  399.     jobtracker.getInstrumentation().addWaiting(
  400.         getJobID(), numMapTasks + numReduceTasks);
  401.     maps = new TaskInProgress[numMapTasks];
  402.     for(int i=0; i < numMapTasks; ++i) {
  403.       inputLength += splits[i].getDataLength();
  404.       maps[i] = new TaskInProgress(jobId, jobFile, 
  405.                                    splits[i], 
  406.                                    jobtracker, conf, this, i);
  407.     }
  408.     LOG.info("Input size for job "+ jobId + " = " + inputLength);
  409.     if (numMapTasks > 0) { 
  410.       LOG.info("Split info for job:" + jobId + " with " + 
  411.                splits.length + " splits:");
  412.       nonRunningMapCache = createCache(splits, maxLevel);
  413.     }
  414.         
  415.     // set the launch time
  416.     this.launchTime = System.currentTimeMillis();
  417.     // if no split is returned, job is considered completed and successful
  418.     if (numMapTasks == 0) {
  419.       // Finished time need to be setted here to prevent this job to be retired
  420.       // from the job tracker jobs at the next retire iteration.
  421.       this.finishTime = this.launchTime;
  422.       status.setSetupProgress(1.0f);
  423.       status.setMapProgress(1.0f);
  424.       status.setReduceProgress(1.0f);
  425.       status.setCleanupProgress(1.0f);
  426.       status.setRunState(JobStatus.SUCCEEDED);
  427.       tasksInited.set(true);
  428.       JobHistory.JobInfo.logInited(profile.getJobID(), 
  429.                                     this.launchTime, 0, 0);
  430.       JobHistory.JobInfo.logFinished(profile.getJobID(), 
  431.                                      this.finishTime, 0, 0, 0, 0,
  432.                                      getCounters());
  433.       // Special case because the Job is not queued
  434.       JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());
  435.       return;
  436.     }
  437.     //
  438.     // Create reduce tasks
  439.     //
  440.     this.reduces = new TaskInProgress[numReduceTasks];
  441.     for (int i = 0; i < numReduceTasks; i++) {
  442.       reduces[i] = new TaskInProgress(jobId, jobFile, 
  443.                                       numMapTasks, i, 
  444.                                       jobtracker, conf, this);
  445.       nonRunningReduces.add(reduces[i]);
  446.     }
  447.     // Calculate the minimum number of maps to be complete before 
  448.     // we should start scheduling reduces
  449.     completedMapsForReduceSlowstart = 
  450.       (int)Math.ceil(
  451.           (conf.getFloat("mapred.reduce.slowstart.completed.maps", 
  452.                          DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
  453.            numMapTasks));
  454.     // create cleanup two cleanup tips, one map and one reduce.
  455.     cleanup = new TaskInProgress[2];
  456.     // cleanup map tip. This map is doesn't use split. 
  457.     // Just assign splits[0]
  458.     cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
  459.             jobtracker, conf, this, numMapTasks);
  460.     cleanup[0].setJobCleanupTask();
  461.     // cleanup reduce tip.
  462.     cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
  463.                        numReduceTasks, jobtracker, conf, this);
  464.     cleanup[1].setJobCleanupTask();
  465.     // create two setup tips, one map and one reduce.
  466.     setup = new TaskInProgress[2];
  467.     // setup map tip. This map is doesn't use split. 
  468.     // Just assign splits[0]
  469.     setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
  470.             jobtracker, conf, this, numMapTasks + 1 );
  471.     setup[0].setJobSetupTask();
  472.     // setup reduce tip.
  473.     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
  474.                        numReduceTasks + 1, jobtracker, conf, this);
  475.     setup[1].setJobSetupTask();
  476.     
  477.     synchronized(jobInitKillStatus){
  478.       jobInitKillStatus.initDone = true;
  479.       if(jobInitKillStatus.killed) {
  480.         //setup not launched so directly terminate
  481.         terminateJob(JobStatus.KILLED);
  482.         return;
  483.       }
  484.     }
  485.     
  486.     tasksInited.set(true);
  487.     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
  488.                                  numMapTasks, numReduceTasks);
  489.   }
  490.   /////////////////////////////////////////////////////
  491.   // Accessors for the JobInProgress
  492.   /////////////////////////////////////////////////////
  493.   public JobProfile getProfile() {
  494.     return profile;
  495.   }
  496.   public JobStatus getStatus() {
  497.     return status;
  498.   }
  499.   public synchronized long getLaunchTime() {
  500.     return launchTime;
  501.   }
  502.   public long getStartTime() {
  503.     return startTime;
  504.   }
  505.   public long getFinishTime() {
  506.     return finishTime;
  507.   }
  508.   public int desiredMaps() {
  509.     return numMapTasks;
  510.   }
  511.   public synchronized int finishedMaps() {
  512.     return finishedMapTasks;
  513.   }
  514.   public int desiredReduces() {
  515.     return numReduceTasks;
  516.   }
  517.   public synchronized int runningMaps() {
  518.     return runningMapTasks;
  519.   }
  520.   public synchronized int runningReduces() {
  521.     return runningReduceTasks;
  522.   }
  523.   public synchronized int finishedReduces() {
  524.     return finishedReduceTasks;
  525.   }
  526.   public synchronized int pendingMaps() {
  527.     return numMapTasks - runningMapTasks - failedMapTIPs - 
  528.     finishedMapTasks + speculativeMapTasks;
  529.   }
  530.   public synchronized int pendingReduces() {
  531.     return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
  532.     finishedReduceTasks + speculativeReduceTasks;
  533.   }
  534.   public JobPriority getPriority() {
  535.     return this.priority;
  536.   }
  537.   public void setPriority(JobPriority priority) {
  538.     if(priority == null) {
  539.       this.priority = JobPriority.NORMAL;
  540.     } else {
  541.       this.priority = priority;
  542.     }
  543.     synchronized (this) {
  544.       status.setJobPriority(priority);
  545.     }
  546.     // log and change to the job's priority
  547.     JobHistory.JobInfo.logJobPriority(jobId, priority);
  548.   }
  549.   // Accessors for resources.
  550.   long getMaxVirtualMemoryForTask() {
  551.     return maxVirtualMemoryForTask;
  552.   }
  553.   void setMaxVirtualMemoryForTask(long maxVMem) {
  554.     maxVirtualMemoryForTask = maxVMem;
  555.   }
  556.   long getMaxPhysicalMemoryForTask() {
  557.     return maxPhysicalMemoryForTask;
  558.   }
  559.   void setMaxPhysicalMemoryForTask(long maxPMem) {
  560.     maxPhysicalMemoryForTask = maxPMem;
  561.   }
  562.   // Update the job start/launch time (upon restart) and log to history
  563.   synchronized void updateJobInfo(long startTime, long launchTime) {
  564.     // log and change to the job's start/launch time
  565.     this.startTime = startTime;
  566.     this.launchTime = launchTime;
  567.     JobHistory.JobInfo.logJobInfo(jobId, startTime, launchTime);
  568.   }
  569.   /**
  570.    * Get the number of times the job has restarted
  571.    */
  572.   int getNumRestarts() {
  573.     return restartCount;
  574.   }
  575.   
  576.   long getInputLength() {
  577.     return inputLength;
  578.   }
  579.  
  580.   /**
  581.    * Get the list of map tasks
  582.    * @return the raw array of maps for this job
  583.    */
  584.   TaskInProgress[] getMapTasks() {
  585.     return maps;
  586.   }
  587.     
  588.   /**
  589.    * Get the list of cleanup tasks
  590.    * @return the array of cleanup tasks for the job
  591.    */
  592.   TaskInProgress[] getCleanupTasks() {
  593.     return cleanup;
  594.   }
  595.   
  596.   /**
  597.    * Get the list of setup tasks
  598.    * @return the array of setup tasks for the job
  599.    */
  600.   TaskInProgress[] getSetupTasks() {
  601.     return setup;
  602.   }
  603.   
  604.   /**
  605.    * Get the list of reduce tasks
  606.    * @return the raw array of reduce tasks for this job
  607.    */
  608.   TaskInProgress[] getReduceTasks() {
  609.     return reduces;
  610.   }
  611.   /**
  612.    * Return the nonLocalRunningMaps
  613.    * @return
  614.    */
  615.   Set<TaskInProgress> getNonLocalRunningMaps()
  616.   {
  617.     return nonLocalRunningMaps;
  618.   }
  619.   
  620.   /**
  621.    * Return the runningMapCache
  622.    * @return
  623.    */
  624.   Map<Node, Set<TaskInProgress>> getRunningMapCache()
  625.   {
  626.     return runningMapCache;
  627.   }
  628.   
  629.   /**
  630.    * Return runningReduces
  631.    * @return
  632.    */
  633.   Set<TaskInProgress> getRunningReduces()
  634.   {
  635.     return runningReduces;
  636.   }
  637.   
  638.   /**
  639.    * Get the job configuration
  640.    * @return the job's configuration
  641.    */
  642.   JobConf getJobConf() {
  643.     return conf;
  644.   }
  645.     
  646.   /**
  647.    * Return a vector of completed TaskInProgress objects
  648.    */
  649.   public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
  650.                                                       boolean shouldBeComplete) {
  651.     
  652.     Vector<TaskInProgress> results = new Vector<TaskInProgress>();
  653.     TaskInProgress tips[] = null;
  654.     if (shouldBeMap) {
  655.       tips = maps;
  656.     } else {
  657.       tips = reduces;
  658.     }
  659.     for (int i = 0; i < tips.length; i++) {
  660.       if (tips[i].isComplete() == shouldBeComplete) {
  661.         results.add(tips[i]);
  662.       }
  663.     }
  664.     return results;
  665.   }
  666.   
  667.   /**
  668.    * Return a vector of cleanup TaskInProgress objects
  669.    */
  670.   public synchronized Vector<TaskInProgress> reportCleanupTIPs(
  671.                                                boolean shouldBeComplete) {
  672.     
  673.     Vector<TaskInProgress> results = new Vector<TaskInProgress>();
  674.     for (int i = 0; i < cleanup.length; i++) {
  675.       if (cleanup[i].isComplete() == shouldBeComplete) {
  676.         results.add(cleanup[i]);
  677.       }
  678.     }
  679.     return results;
  680.   }
  681.   /**
  682.    * Return a vector of setup TaskInProgress objects
  683.    */
  684.   public synchronized Vector<TaskInProgress> reportSetupTIPs(
  685.                                                boolean shouldBeComplete) {
  686.     
  687.     Vector<TaskInProgress> results = new Vector<TaskInProgress>();
  688.     for (int i = 0; i < setup.length; i++) {
  689.       if (setup[i].isComplete() == shouldBeComplete) {
  690.         results.add(setup[i]);
  691.       }
  692.     }
  693.     return results;
  694.   }
  695.   ////////////////////////////////////////////////////
  696.   // Status update methods
  697.   ////////////////////////////////////////////////////
  698.   /**
  699.    * Assuming {@link JobTracker} is locked on entry.
  700.    */
  701.   public synchronized void updateTaskStatus(TaskInProgress tip, 
  702.                                             TaskStatus status) {
  703.     double oldProgress = tip.getProgress();   // save old progress
  704.     boolean wasRunning = tip.isRunning();
  705.     boolean wasComplete = tip.isComplete();
  706.     boolean wasPending = tip.isOnlyCommitPending();
  707.     TaskAttemptID taskid = status.getTaskID();
  708.     
  709.     // If the TIP is already completed and the task reports as SUCCEEDED then 
  710.     // mark the task as KILLED.
  711.     // In case of task with no promotion the task tracker will mark the task 
  712.     // as SUCCEEDED.
  713.     // User has requested to kill the task, but TT reported SUCCEEDED, 
  714.     // mark the task KILLED.
  715.     if ((wasComplete || tip.wasKilled(taskid)) && 
  716.         (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
  717.       status.setRunState(TaskStatus.State.KILLED);
  718.     }
  719.     
  720.     // If the job is complete and a task has just reported its 
  721.     // state as FAILED_UNCLEAN/KILLED_UNCLEAN, 
  722.     // make the task's state FAILED/KILLED without launching cleanup attempt.
  723.     // Note that if task is already a cleanup attempt, 
  724.     // we don't change the state to make sure the task gets a killTaskAction
  725.     if ((this.isComplete() || jobFailed || jobKilled) && 
  726.         !tip.isCleanupAttempt(taskid)) {
  727.       if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
  728.         status.setRunState(TaskStatus.State.FAILED);
  729.       } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
  730.         status.setRunState(TaskStatus.State.KILLED);
  731.       }
  732.     }
  733.     
  734.     boolean change = tip.updateStatus(status);
  735.     if (change) {
  736.       TaskStatus.State state = status.getRunState();
  737.       // get the TaskTrackerStatus where the task ran 
  738.       TaskTrackerStatus ttStatus = 
  739.         this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
  740.       String httpTaskLogLocation = null; 
  741.       if (null != ttStatus){
  742.         String host;
  743.         if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
  744.           host = NetUtils.getStaticResolution(ttStatus.getHost());
  745.         } else {
  746.           host = ttStatus.getHost();
  747.         }
  748.         httpTaskLogLocation = "http://" + host + ":" + ttStatus.getHttpPort(); 
  749.            //+ "/tasklog?plaintext=true&taskid=" + status.getTaskID();
  750.       }
  751.       TaskCompletionEvent taskEvent = null;
  752.       if (state == TaskStatus.State.SUCCEEDED) {
  753.         taskEvent = new TaskCompletionEvent(
  754.                                             taskCompletionEventTracker, 
  755.                                             taskid,
  756.                                             tip.idWithinJob(),
  757.                                             status.getIsMap() &&
  758.                                             !tip.isJobCleanupTask() &&
  759.                                             !tip.isJobSetupTask(),
  760.                                             TaskCompletionEvent.Status.SUCCEEDED,
  761.                                             httpTaskLogLocation 
  762.                                            );
  763.         taskEvent.setTaskRunTime((int)(status.getFinishTime() 
  764.                                        - status.getStartTime()));
  765.         tip.setSuccessEventNumber(taskCompletionEventTracker); 
  766.       } else if (state == TaskStatus.State.COMMIT_PENDING) {
  767.         // If it is the first attempt reporting COMMIT_PENDING
  768.         // ask the task to commit.
  769.         if (!wasComplete && !wasPending) {
  770.           tip.doCommit(taskid);
  771.         }
  772.         return;
  773.       } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
  774.                  state == TaskStatus.State.KILLED_UNCLEAN) {
  775.         tip.incompleteSubTask(taskid, this.status);
  776.         // add this task, to be rescheduled as cleanup attempt
  777.         if (tip.isMapTask()) {
  778.           mapCleanupTasks.add(taskid);
  779.         } else {
  780.           reduceCleanupTasks.add(taskid);
  781.         }
  782.         // Remove the task entry from jobtracker
  783.         jobtracker.removeTaskEntry(taskid);
  784.       }
  785.       //For a failed task update the JT datastructures. 
  786.       else if (state == TaskStatus.State.FAILED ||
  787.                state == TaskStatus.State.KILLED) {
  788.         // Get the event number for the (possibly) previously successful
  789.         // task. If there exists one, then set that status to OBSOLETE 
  790.         int eventNumber;
  791.         if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
  792.           TaskCompletionEvent t = 
  793.             this.taskCompletionEvents.get(eventNumber);
  794.           if (t.getTaskAttemptId().equals(taskid))
  795.             t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
  796.         }
  797.         
  798.         // Tell the job to fail the relevant task
  799.         failedTask(tip, taskid, status, ttStatus,
  800.                    wasRunning, wasComplete);
  801.         // Did the task failure lead to tip failure?
  802.         TaskCompletionEvent.Status taskCompletionStatus = 
  803.           (state == TaskStatus.State.FAILED ) ?
  804.               TaskCompletionEvent.Status.FAILED :
  805.               TaskCompletionEvent.Status.KILLED;
  806.         if (tip.isFailed()) {
  807.           taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
  808.         }
  809.         taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
  810.                                             taskid,
  811.                                             tip.idWithinJob(),
  812.                                             status.getIsMap() &&
  813.                                             !tip.isJobCleanupTask() &&
  814.                                             !tip.isJobSetupTask(),
  815.                                             taskCompletionStatus, 
  816.                                             httpTaskLogLocation
  817.                                            );
  818.       }          
  819.       // Add the 'complete' task i.e. successful/failed
  820.       // It _is_ safe to add the TaskCompletionEvent.Status.SUCCEEDED
  821.       // *before* calling TIP.completedTask since:
  822.       // a. One and only one task of a TIP is declared as a SUCCESS, the
  823.       //    other (speculative tasks) are marked KILLED by the TaskCommitThread
  824.       // b. TIP.completedTask *does not* throw _any_ exception at all.
  825.       if (taskEvent != null) {
  826.         this.taskCompletionEvents.add(taskEvent);
  827.         taskCompletionEventTracker++;
  828.         if (state == TaskStatus.State.SUCCEEDED) {
  829.           completedTask(tip, status);
  830.         }
  831.       }
  832.     }
  833.         
  834.     //
  835.     // Update JobInProgress status
  836.     //
  837.     if(LOG.isDebugEnabled()) {
  838.       LOG.debug("Taking progress for " + tip.getTIPId() + " from " + 
  839.                  oldProgress + " to " + tip.getProgress());
  840.     }
  841.     
  842.     if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
  843.       double progressDelta = tip.getProgress() - oldProgress;
  844.       if (tip.isMapTask()) {
  845.         if (maps.length == 0) {
  846.           this.status.setMapProgress(1.0f);
  847.         } else {
  848.           this.status.setMapProgress((float) (this.status.mapProgress() +
  849.                                               progressDelta / maps.length));
  850.         }
  851.       } else {
  852.         if (reduces.length == 0) {
  853.           this.status.setReduceProgress(1.0f);
  854.         } else {
  855.           this.status.setReduceProgress
  856.             ((float) (this.status.reduceProgress() +
  857.                       (progressDelta / reduces.length)));
  858.         }
  859.       }
  860.     }
  861.   }
  862.   /**
  863.    * Returns the job-level counters.
  864.    * 
  865.    * @return the job-level counters.
  866.    */
  867.   public synchronized Counters getJobCounters() {
  868.     return jobCounters;
  869.   }
  870.   
  871.   /**
  872.    *  Returns map phase counters by summing over all map tasks in progress.
  873.    */
  874.   public synchronized Counters getMapCounters() {
  875.     return incrementTaskCounters(new Counters(), maps);
  876.   }
  877.     
  878.   /**
  879.    *  Returns map phase counters by summing over all map tasks in progress.
  880.    */
  881.   public synchronized Counters getReduceCounters() {
  882.     return incrementTaskCounters(new Counters(), reduces);
  883.   }
  884.     
  885.   /**
  886.    *  Returns the total job counters, by adding together the job, 
  887.    *  the map and the reduce counters.
  888.    */
  889.   public synchronized Counters getCounters() {
  890.     Counters result = new Counters();
  891.     result.incrAllCounters(getJobCounters());
  892.     incrementTaskCounters(result, maps);
  893.     return incrementTaskCounters(result, reduces);
  894.   }
  895.     
  896.   /**
  897.    * Increments the counters with the counters from each task.
  898.    * @param counters the counters to increment
  899.    * @param tips the tasks to add in to counters
  900.    * @return counters the same object passed in as counters
  901.    */
  902.   private Counters incrementTaskCounters(Counters counters,
  903.                                          TaskInProgress[] tips) {
  904.     for (TaskInProgress tip : tips) {
  905.       counters.incrAllCounters(tip.getCounters());
  906.     }
  907.     return counters;
  908.   }
  909.   /////////////////////////////////////////////////////
  910.   // Create/manage tasks
  911.   /////////////////////////////////////////////////////
  912.   /**
  913.    * Return a MapTask, if appropriate, to run on the given tasktracker
  914.    */
  915.   public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
  916.                                             int clusterSize, 
  917.                                             int numUniqueHosts
  918.                                            ) throws IOException {
  919.     if (status.getRunState() != JobStatus.RUNNING) {
  920.       LOG.info("Cannot create task split for " + profile.getJobID());
  921.       return null;
  922.     }
  923.         
  924.     int target = findNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel,
  925.                                 status.mapProgress());
  926.     if (target == -1) {
  927.       return null;
  928.     }
  929.     
  930.     Task result = maps[target].getTaskToRun(tts.getTrackerName());
  931.     if (result != null) {
  932.       addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
  933.     }
  934.     return result;
  935.   }    
  936.   /*
  937.    * Return task cleanup attempt if any, to run on a given tracker
  938.    */
  939.   public Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
  940.                                                  boolean isMapSlot)
  941.   throws IOException {
  942.     if (!tasksInited.get()) {
  943.       return null;
  944.     }
  945.     synchronized (this) {
  946.       if (this.status.getRunState() != JobStatus.RUNNING || 
  947.           jobFailed || jobKilled) {
  948.         return null;
  949.       }
  950.       String taskTracker = tts.getTrackerName();
  951.       if (!shouldRunOnTaskTracker(taskTracker)) {
  952.         return null;
  953.       }
  954.       TaskAttemptID taskid = null;
  955.       TaskInProgress tip = null;
  956.       if (isMapSlot) {
  957.         if (!mapCleanupTasks.isEmpty()) {
  958.           taskid = mapCleanupTasks.remove(0);
  959.           tip = maps[taskid.getTaskID().getId()];
  960.         }
  961.       } else {
  962.         if (!reduceCleanupTasks.isEmpty()) {
  963.           taskid = reduceCleanupTasks.remove(0);
  964.           tip = reduces[taskid.getTaskID().getId()];
  965.         }
  966.       }
  967.       if (tip != null) {
  968.         return tip.addRunningTask(taskid, taskTracker, true);
  969.       }
  970.       return null;
  971.     }
  972.   }
  973.   
  974.   public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
  975.                                                      int clusterSize, 
  976.                                                      int numUniqueHosts)
  977.   throws IOException {
  978.     if (!tasksInited.get()) {
  979.       LOG.info("Cannot create task split for " + profile.getJobID());
  980.       return null;
  981.     }
  982.     int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, 
  983.                                 status.mapProgress());
  984.     if (target == -1) {
  985.       return null;
  986.     }
  987.     Task result = maps[target].getTaskToRun(tts.getTrackerName());
  988.     if (result != null) {
  989.       addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
  990.     }
  991.     return result;
  992.   }
  993.   
  994.   public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
  995.                                                     int clusterSize, 
  996.                                                     int numUniqueHosts)
  997.   throws IOException {
  998.     if (!tasksInited.get()) {
  999.       LOG.info("Cannot create task split for " + profile.getJobID());
  1000.       return null;
  1001.     }
  1002.     int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
  1003.                                 NON_LOCAL_CACHE_LEVEL, status.mapProgress());
  1004.     if (target == -1) {
  1005.       return null;
  1006.     }
  1007.     Task result = maps[target].getTaskToRun(tts.getTrackerName());
  1008.     if (result != null) {
  1009.       addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
  1010.     }
  1011.     return result;
  1012.   }
  1013.   
  1014.   /**
  1015.    * Return a CleanupTask, if appropriate, to run on the given tasktracker
  1016.    * 
  1017.    */
  1018.   public Task obtainJobCleanupTask(TaskTrackerStatus tts, 
  1019.                                              int clusterSize, 
  1020.                                              int numUniqueHosts,
  1021.                                              boolean isMapSlot
  1022.                                             ) throws IOException {
  1023.     if(!tasksInited.get()) {
  1024.       return null;
  1025.     }
  1026.     
  1027.     synchronized(this) {
  1028.       if (!canLaunchJobCleanupTask()) {
  1029.         return null;
  1030.       }
  1031.       
  1032.       String taskTracker = tts.getTrackerName();
  1033.       // Update the last-known clusterSize
  1034.       this.clusterSize = clusterSize;
  1035.       if (!shouldRunOnTaskTracker(taskTracker)) {
  1036.         return null;
  1037.       }
  1038.       
  1039.       List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
  1040.       if (isMapSlot) {
  1041.         cleanupTaskList.add(cleanup[0]);
  1042.       } else {
  1043.         cleanupTaskList.add(cleanup[1]);
  1044.       }
  1045.       TaskInProgress tip = findTaskFromList(cleanupTaskList,
  1046.                              tts, numUniqueHosts, false);
  1047.       if (tip == null) {
  1048.         return null;
  1049.       }
  1050.       
  1051.       // Now launch the cleanupTask
  1052.       Task result = tip.getTaskToRun(tts.getTrackerName());
  1053.       if (result != null) {
  1054.         addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
  1055.       }
  1056.       return result;
  1057.     }
  1058.     
  1059.   }
  1060.   
  1061.   /**
  1062.    * Check whether cleanup task can be launched for the job.
  1063.    * 
  1064.    * Cleanup task can be launched if it is not already launched
  1065.    * or job is Killed
  1066.    * or all maps and reduces are complete
  1067.    * @return true/false
  1068.    */
  1069.   private synchronized boolean canLaunchJobCleanupTask() {
  1070.     // check if the job is running
  1071.     if (status.getRunState() != JobStatus.RUNNING &&
  1072.         status.getRunState() != JobStatus.PREP) {
  1073.       return false;
  1074.     }
  1075.     // check if cleanup task has been launched already. 
  1076.     if (launchedCleanup) {
  1077.       return false;
  1078.     }
  1079.     // check if job has failed or killed
  1080.     if (jobKilled || jobFailed) {
  1081.       return true;
  1082.     }
  1083.     // Check if all maps and reducers have finished.
  1084.     boolean launchCleanupTask = 
  1085.         ((finishedMapTasks + failedMapTIPs) == (numMapTasks));
  1086.     if (launchCleanupTask) {
  1087.       launchCleanupTask = 
  1088.         ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
  1089.     }
  1090.     return launchCleanupTask;
  1091.   }
  1092.   /**
  1093.    * Return a SetupTask, if appropriate, to run on the given tasktracker
  1094.    * 
  1095.    */
  1096.   public Task obtainJobSetupTask(TaskTrackerStatus tts, 
  1097.                                              int clusterSize, 
  1098.                                              int numUniqueHosts,
  1099.                                              boolean isMapSlot
  1100.                                             ) throws IOException {
  1101.     if(!tasksInited.get()) {
  1102.       return null;
  1103.     }
  1104.     
  1105.     synchronized(this) {
  1106.       if (!canLaunchSetupTask()) {
  1107.         return null;
  1108.       }
  1109.       
  1110.       String taskTracker = tts.getTrackerName();
  1111.       // Update the last-known clusterSize
  1112.       this.clusterSize = clusterSize;
  1113.       if (!shouldRunOnTaskTracker(taskTracker)) {
  1114.         return null;
  1115.       }
  1116.       
  1117.       List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
  1118.       if (isMapSlot) {
  1119.         setupTaskList.add(setup[0]);
  1120.       } else {
  1121.         setupTaskList.add(setup[1]);
  1122.       }
  1123.       TaskInProgress tip = findTaskFromList(setupTaskList,
  1124.                              tts, numUniqueHosts, false);
  1125.       if (tip == null) {
  1126.         return null;
  1127.       }
  1128.       
  1129.       // Now launch the setupTask
  1130.       Task result = tip.getTaskToRun(tts.getTrackerName());
  1131.       if (result != null) {
  1132.         addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
  1133.       }
  1134.       return result;
  1135.     }
  1136.   }
  1137.   
  1138.   public synchronized boolean scheduleReduces() {
  1139.     return finishedMapTasks >= completedMapsForReduceSlowstart;
  1140.   }
  1141.   
  1142.   /**
  1143.    * Check whether setup task can be launched for the job.
  1144.    * 
  1145.    * Setup task can be launched after the tasks are inited
  1146.    * and Job is in PREP state
  1147.    * and if it is not already launched
  1148.    * or job is not Killed/Failed
  1149.    * @return true/false
  1150.    */
  1151.   private synchronized boolean canLaunchSetupTask() {
  1152.     return (tasksInited.get() && status.getRunState() == JobStatus.PREP && 
  1153.            !launchedSetup && !jobKilled && !jobFailed);
  1154.   }
  1155.   
  1156.   /**
  1157.    * Return a ReduceTask, if appropriate, to run on the given tasktracker.
  1158.    * We don't have cache-sensitivity for reduce tasks, as they
  1159.    *  work on temporary MapRed files.  
  1160.    */
  1161.   public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
  1162.                                                int clusterSize,
  1163.                                                int numUniqueHosts
  1164.                                               ) throws IOException {
  1165.     if (status.getRunState() != JobStatus.RUNNING) {
  1166.       LOG.info("Cannot create task split for " + profile.getJobID());
  1167.       return null;
  1168.     }
  1169.     
  1170.     // Ensure we have sufficient map outputs ready to shuffle before 
  1171.     // scheduling reduces
  1172.     if (!scheduleReduces()) {
  1173.       return null;
  1174.     }
  1175.     int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts, 
  1176.                                     status.reduceProgress());
  1177.     if (target == -1) {
  1178.       return null;
  1179.     }
  1180.     
  1181.     Task result = reduces[target].getTaskToRun(tts.getTrackerName());
  1182.     if (result != null) {
  1183.       addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
  1184.     }
  1185.     return result;
  1186.   }
  1187.   
  1188.   // returns the (cache)level at which the nodes matches
  1189.   private int getMatchingLevelForNodes(Node n1, Node n2) {
  1190.     int count = 0;
  1191.     do {
  1192.       if (n1.equals(n2)) {
  1193.         return count;
  1194.       }
  1195.       ++count;
  1196.       n1 = n1.getParent();
  1197.       n2 = n2.getParent();
  1198.     } while (n1 != null);
  1199.     return this.maxLevel;
  1200.   }
  1201.   /**
  1202.    * Populate the data structures as a task is scheduled.
  1203.    * 
  1204.    * Assuming {@link JobTracker} is locked on entry.
  1205.    * 
  1206.    * @param tip The tip for which the task is added
  1207.    * @param id The attempt-id for the task
  1208.    * @param tts task-tracker status
  1209.    * @param isScheduled Whether this task is scheduled from the JT or has 
  1210.    *        joined back upon restart
  1211.    */
  1212.   synchronized void addRunningTaskToTIP(TaskInProgress tip, TaskAttemptID id, 
  1213.                                         TaskTrackerStatus tts, 
  1214.                                         boolean isScheduled) {
  1215.     // Make an entry in the tip if the attempt is not scheduled i.e externally
  1216.     // added
  1217.     if (!isScheduled) {
  1218.       tip.addRunningTask(id, tts.getTrackerName());
  1219.     }
  1220.     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
  1221.     // keeping the earlier ordering intact
  1222.     String name;
  1223.     String splits = "";
  1224.     Enum counter = null;
  1225.     if (tip.isJobSetupTask()) {
  1226.       launchedSetup = true;
  1227.       name = Values.SETUP.name();
  1228.     } else if (tip.isJobCleanupTask()) {
  1229.       launchedCleanup = true;
  1230.       name = Values.CLEANUP.name();
  1231.     } else if (tip.isMapTask()) {
  1232.       ++runningMapTasks;
  1233.       name = Values.MAP.name();
  1234.       counter = Counter.TOTAL_LAUNCHED_MAPS;
  1235.       splits = tip.getSplitNodes();
  1236.       if (tip.getActiveTasks().size() > 1)
  1237.         speculativeMapTasks++;
  1238.       metrics.launchMap(id);
  1239.     } else {
  1240.       ++runningReduceTasks;
  1241.       name = Values.REDUCE.name();
  1242.       counter = Counter.TOTAL_LAUNCHED_REDUCES;
  1243.       if (tip.getActiveTasks().size() > 1)
  1244.         speculativeReduceTasks++;
  1245.       metrics.launchReduce(id);
  1246.     }
  1247.     // Note that the logs are for the scheduled tasks only. Tasks that join on 
  1248.     // restart has already their logs in place.
  1249.     if (tip.isFirstAttempt(id)) {
  1250.       JobHistory.Task.logStarted(tip.getTIPId(), name,
  1251.                                  tip.getExecStartTime(), splits);
  1252.     }
  1253.     if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
  1254.       jobCounters.incrCounter(counter, 1);
  1255.     }
  1256.     
  1257.     //TODO The only problem with these counters would be on restart.
  1258.     // The jobtracker updates the counter only when the task that is scheduled
  1259.     // if from a non-running tip and is local (data, rack ...). But upon restart
  1260.     // as the reports come from the task tracker, there is no good way to infer
  1261.     // when exactly to increment the locality counters. The only solution is to 
  1262.     // increment the counters for all the tasks irrespective of 
  1263.     //    - whether the tip is running or not
  1264.     //    - whether its a speculative task or not
  1265.     //
  1266.     // So to simplify, increment the data locality counter whenever there is 
  1267.     // data locality.
  1268.     if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
  1269.       // increment the data locality counter for maps
  1270.       Node tracker = jobtracker.getNode(tts.getHost());
  1271.       int level = this.maxLevel;
  1272.       // find the right level across split locations
  1273.       for (String local : maps[tip.getIdWithinJob()].getSplitLocations()) {
  1274.         Node datanode = jobtracker.getNode(local);
  1275.         int newLevel = this.maxLevel;
  1276.         if (tracker != null && datanode != null) {
  1277.           newLevel = getMatchingLevelForNodes(tracker, datanode);
  1278.         }
  1279.         if (newLevel < level) {
  1280.           level = newLevel;
  1281.           // an optimization
  1282.           if (level == 0) {
  1283.             break;
  1284.           }
  1285.         }
  1286.       }
  1287.       switch (level) {
  1288.       case 0 :
  1289.         LOG.info("Choosing data-local task " + tip.getTIPId());
  1290.         jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
  1291.         break;
  1292.       case 1:
  1293.         LOG.info("Choosing rack-local task " + tip.getTIPId());
  1294.         jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
  1295.         break;
  1296.       default :
  1297.         // check if there is any locality
  1298.         if (level != this.maxLevel) {
  1299.           LOG.info("Choosing cached task at level " + level + tip.getTIPId());
  1300.           jobCounters.incrCounter(Counter.OTHER_LOCAL_MAPS, 1);
  1301.         }
  1302.         break;
  1303.       }
  1304.     }
  1305.   }
  1306.     
  1307.   static String convertTrackerNameToHostName(String trackerName) {
  1308.     // Ugly!
  1309.     // Convert the trackerName to it's host name
  1310.     int indexOfColon = trackerName.indexOf(":");
  1311.     String trackerHostName = (indexOfColon == -1) ? 
  1312.       trackerName : 
  1313.       trackerName.substring(0, indexOfColon);
  1314.     return trackerHostName.substring("tracker_".length());
  1315.   }
  1316.     
  1317.   /**
  1318.    * Note that a task has failed on a given tracker and add the tracker  
  1319.    * to the blacklist iff too many trackers in the cluster i.e. 
  1320.    * (clusterSize * CLUSTER_BLACKLIST_PERCENT) haven't turned 'flaky' already.
  1321.    * 
  1322.    * @param trackerName task-tracker on which a task failed
  1323.    */
  1324.   void addTrackerTaskFailure(String trackerName) {
  1325.     if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { 
  1326.       String trackerHostName = convertTrackerNameToHostName(trackerName);
  1327.       Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
  1328.       if (trackerFailures == null) {
  1329.         trackerFailures = 0;
  1330.       }
  1331.       trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
  1332.       // Check if this tasktracker has turned 'flaky'
  1333.       if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
  1334.         ++flakyTaskTrackers;
  1335.         LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
  1336.       }
  1337.     }
  1338.   }
  1339.     
  1340.   private int getTrackerTaskFailures(String trackerName) {
  1341.     String trackerHostName = convertTrackerNameToHostName(trackerName);
  1342.     Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
  1343.     return (failedTasks != null) ? failedTasks.intValue() : 0; 
  1344.   }
  1345.     
  1346.   /**
  1347.    * Get the black listed trackers for the job
  1348.    * 
  1349.    * @return List of blacklisted tracker names
  1350.    */
  1351.   List<String> getBlackListedTrackers() {
  1352.     List<String> blackListedTrackers = new ArrayList<String>();
  1353.     for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
  1354.        if (e.getValue().intValue() >= conf.getMaxTaskFailuresPerTracker()) {
  1355.          blackListedTrackers.add(e.getKey());
  1356.        }
  1357.     }
  1358.     return blackListedTrackers;
  1359.   }
  1360.   
  1361.   /**
  1362.    * Get the no. of 'flaky' tasktrackers for a given job.
  1363.    * 
  1364.    * @return the no. of 'flaky' tasktrackers for a given job.
  1365.    */
  1366.   int getNoOfBlackListedTrackers() {
  1367.     return flakyTaskTrackers;
  1368.   }
  1369.     
  1370.   /**
  1371.    * Get the information on tasktrackers and no. of errors which occurred
  1372.    * on them for a given job. 
  1373.    * 
  1374.    * @return the map of tasktrackers and no. of errors which occurred
  1375.    *         on them for a given job. 
  1376.    */
  1377.   synchronized Map<String, Integer> getTaskTrackerErrors() {
  1378.     // Clone the 'trackerToFailuresMap' and return the copy
  1379.     Map<String, Integer> trackerErrors = 
  1380.       new TreeMap<String, Integer>(trackerToFailuresMap);
  1381.     return trackerErrors;
  1382.   }
  1383.   /**
  1384.    * Remove a map TIP from the lists for running maps.
  1385.    * Called when a map fails/completes (note if a map is killed,
  1386.    * it won't be present in the list since it was completed earlier)
  1387.    * @param tip the tip that needs to be retired
  1388.    */
  1389.   private synchronized void retireMap(TaskInProgress tip) {
  1390.     if (runningMapCache == null) {
  1391.       LOG.warn("Running cache for maps missing!! "
  1392.                + "Job details are missing.");
  1393.       return;
  1394.     }
  1395.     
  1396.     String[] splitLocations = tip.getSplitLocations();
  1397.     // Remove the TIP from the list for running non-local maps
  1398.     if (splitLocations.length == 0) {
  1399.       nonLocalRunningMaps.remove(tip);
  1400.       return;
  1401.     }
  1402.     // Remove from the running map caches
  1403.     for(String host: splitLocations) {
  1404.       Node node = jobtracker.getNode(host);
  1405.       for (int j = 0; j < maxLevel; ++j) {
  1406.         Set<TaskInProgress> hostMaps = runningMapCache.get(node);
  1407.         if (hostMaps != null) {
  1408.           hostMaps.remove(tip);
  1409.           if (hostMaps.size() == 0) {
  1410.             runningMapCache.remove(node);
  1411.           }
  1412.         }
  1413.         node = node.getParent();
  1414.       }
  1415.     }
  1416.   }
  1417.   /**
  1418.    * Remove a reduce TIP from the list for running-reduces
  1419.    * Called when a reduce fails/completes
  1420.    * @param tip the tip that needs to be retired
  1421.    */
  1422.   private synchronized void retireReduce(TaskInProgress tip) {
  1423.     if (runningReduces == null) {
  1424.       LOG.warn("Running list for reducers missing!! "
  1425.                + "Job details are missing.");
  1426.       return;
  1427.     }
  1428.     runningReduces.remove(tip);
  1429.   }
  1430.   /**
  1431.    * Adds a map tip to the list of running maps.
  1432.    * @param tip the tip that needs to be scheduled as running
  1433.    */
  1434.   private synchronized void scheduleMap(TaskInProgress tip) {
  1435.     
  1436.     if (runningMapCache == null) {
  1437.       LOG.warn("Running cache for maps is missing!! " 
  1438.                + "Job details are missing.");
  1439.       return;
  1440.     }
  1441.     String[] splitLocations = tip.getSplitLocations();
  1442.     // Add the TIP to the list of non-local running TIPs
  1443.     if (splitLocations.length == 0) {
  1444.       nonLocalRunningMaps.add(tip);
  1445.       return;
  1446.     }
  1447.     for(String host: splitLocations) {
  1448.       Node node = jobtracker.getNode(host);
  1449.       for (int j = 0; j < maxLevel; ++j) {
  1450.         Set<TaskInProgress> hostMaps = runningMapCache.get(node);
  1451.         if (hostMaps == null) {
  1452.           // create a cache if needed
  1453.           hostMaps = new LinkedHashSet<TaskInProgress>();
  1454.           runningMapCache.put(node, hostMaps);
  1455.         }
  1456.         hostMaps.add(tip);
  1457.         node = node.getParent();
  1458.       }
  1459.     }
  1460.   }
  1461.   
  1462.   /**
  1463.    * Adds a reduce tip to the list of running reduces
  1464.    * @param tip the tip that needs to be scheduled as running
  1465.    */
  1466.   private synchronized void scheduleReduce(TaskInProgress tip) {
  1467.     if (runningReduces == null) {
  1468.       LOG.warn("Running cache for reducers missing!! "
  1469.                + "Job details are missing.");
  1470.       return;
  1471.     }
  1472.     runningReduces.add(tip);
  1473.   }
  1474.   
  1475.   /**
  1476.    * Adds the failed TIP in the front of the list for non-running maps
  1477.    * @param tip the tip that needs to be failed
  1478.    */
  1479.   private synchronized void failMap(TaskInProgress tip) {
  1480.     if (nonRunningMapCache == null) {
  1481.       LOG.warn("Non-running cache for maps missing!! "
  1482.                + "Job details are missing.");
  1483.       return;
  1484.     }
  1485.     // 1. Its added everywhere since other nodes (having this split local)
  1486.     //    might have removed this tip from their local cache
  1487.     // 2. Give high priority to failed tip - fail early
  1488.     String[] splitLocations = tip.getSplitLocations();
  1489.     // Add the TIP in the front of the list for non-local non-running maps
  1490.     if (splitLocations.length == 0) {
  1491.       nonLocalMaps.add(0, tip);
  1492.       return;
  1493.     }
  1494.     for(String host: splitLocations) {
  1495.       Node node = jobtracker.getNode(host);
  1496.       
  1497.       for (int j = 0; j < maxLevel; ++j) {
  1498.         List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
  1499.         if (hostMaps == null) {
  1500.           hostMaps = new LinkedList<TaskInProgress>();
  1501.           nonRunningMapCache.put(node, hostMaps);
  1502.         }
  1503.         hostMaps.add(0, tip);
  1504.         node = node.getParent();
  1505.       }
  1506.     }
  1507.   }
  1508.   
  1509.   /**
  1510.    * Adds a failed TIP in the front of the list for non-running reduces
  1511.    * @param tip the tip that needs to be failed
  1512.    */
  1513.   private synchronized void failReduce(TaskInProgress tip) {
  1514.     if (nonRunningReduces == null) {
  1515.       LOG.warn("Failed cache for reducers missing!! "
  1516.                + "Job details are missing.");
  1517.       return;
  1518.     }
  1519.     nonRunningReduces.add(0, tip);
  1520.   }
  1521.   
  1522.   /**
  1523.    * Find a non-running task in the passed list of TIPs
  1524.    * @param tips a collection of TIPs
  1525.    * @param ttStatus the status of tracker that has requested a task to run
  1526.    * @param numUniqueHosts number of unique hosts that run trask trackers
  1527.    * @param removeFailedTip whether to remove the failed tips
  1528.    */
  1529.   private synchronized TaskInProgress findTaskFromList(
  1530.       Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
  1531.       int numUniqueHosts,
  1532.       boolean removeFailedTip) {
  1533.     Iterator<TaskInProgress> iter = tips.iterator();
  1534.     while (iter.hasNext()) {
  1535.       TaskInProgress tip = iter.next();
  1536.       // Select a tip if
  1537.       //   1. runnable   : still needs to be run and is not completed
  1538.       //   2. ~running   : no other node is running it
  1539.       //   3. earlier attempt failed : has not failed on this host
  1540.       //                               and has failed on all the other hosts
  1541.       // A TIP is removed from the list if 
  1542.       // (1) this tip is scheduled
  1543.       // (2) if the passed list is a level 0 (host) cache
  1544.       // (3) when the TIP is non-schedulable (running, killed, complete)
  1545.       if (tip.isRunnable() && !tip.isRunning()) {
  1546.         // check if the tip has failed on this host
  1547.         if (!tip.hasFailedOnMachine(ttStatus.getHost()) || 
  1548.              tip.getNumberOfFailedMachines() >= numUniqueHosts) {
  1549.           // check if the tip has failed on all the nodes
  1550.           iter.remove();
  1551.           return tip;
  1552.         } else if (removeFailedTip) { 
  1553.           // the case where we want to remove a failed tip from the host cache
  1554.           // point#3 in the TIP removal logic above
  1555.           iter.remove();
  1556.         }
  1557.       } else {
  1558.         // see point#3 in the comment above for TIP removal logic
  1559.         iter.remove();
  1560.       }
  1561.     }
  1562.     return null;
  1563.   }
  1564.   
  1565.   /**
  1566.    * Find a speculative task
  1567.    * @param list a list of tips
  1568.    * @param taskTracker the tracker that has requested a tip
  1569.    * @param avgProgress the average progress for speculation
  1570.    * @param currentTime current time in milliseconds
  1571.    * @param shouldRemove whether to remove the tips
  1572.    * @return a tip that can be speculated on the tracker
  1573.    */
  1574.   private synchronized TaskInProgress findSpeculativeTask(
  1575.       Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
  1576.       double avgProgress, long currentTime, boolean shouldRemove) {
  1577.     
  1578.     Iterator<TaskInProgress> iter = list.iterator();
  1579.     while (iter.hasNext()) {
  1580.       TaskInProgress tip = iter.next();
  1581.       // should never be true! (since we delete completed/failed tasks)
  1582.       if (!tip.isRunning()) {
  1583.         iter.remove();
  1584.         continue;
  1585.       }
  1586.       if (!tip.hasRunOnMachine(ttStatus.getHost(), 
  1587.                                ttStatus.getTrackerName())) {
  1588.         if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
  1589.           // In case of shared list we don't remove it. Since the TIP failed 
  1590.           // on this tracker can be scheduled on some other tracker.
  1591.           if (shouldRemove) {
  1592.             iter.remove(); //this tracker is never going to run it again
  1593.           }
  1594.           return tip;
  1595.         } 
  1596.       } else {
  1597.         // Check if this tip can be removed from the list.
  1598.         // If the list is shared then we should not remove.
  1599.         if (shouldRemove) {
  1600.           // This tracker will never speculate this tip
  1601.           iter.remove();
  1602.         }
  1603.       }
  1604.     }
  1605.     return null;
  1606.   }
  1607.   
  1608.   /**
  1609.    * Find new map task
  1610.    * @param tts The task tracker that is asking for a task
  1611.    * @param clusterSize The number of task trackers in the cluster
  1612.    * @param numUniqueHosts The number of hosts that run task trackers
  1613.    * @param avgProgress The average progress of this kind of task in this job
  1614.    * @param maxCacheLevel The maximum topology level until which to schedule
  1615.    *                      maps. 
  1616.    *                      A value of {@link #anyCacheLevel} implies any 
  1617.    *                      available task (node-local, rack-local, off-switch and 
  1618.    *                      speculative tasks).
  1619.    *                      A value of {@link #NON_LOCAL_CACHE_LEVEL} implies only
  1620.    *                      off-switch/speculative tasks should be scheduled.
  1621.    * @return the index in tasks of the selected task (or -1 for no task)
  1622.    */
  1623.   private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
  1624.                                           final int clusterSize,
  1625.                                           final int numUniqueHosts,
  1626.                                           final int maxCacheLevel,
  1627.                                           final double avgProgress) {
  1628.     String taskTracker = tts.getTrackerName();
  1629.     TaskInProgress tip = null;
  1630.     
  1631.     //
  1632.     // Update the last-known clusterSize
  1633.     //
  1634.     this.clusterSize = clusterSize;
  1635.     if (!shouldRunOnTaskTracker(taskTracker)) {
  1636.       return -1;
  1637.     }
  1638.     // Check to ensure this TaskTracker has enough resources to 
  1639.     // run tasks from this job
  1640.     long outSize = resourceEstimator.getEstimatedMapOutputSize();
  1641.     long availSpace = tts.getResourceStatus().getAvailableSpace();
  1642.     if(availSpace < outSize) {
  1643.       LOG.warn("No room for map task. Node " + tts.getHost() + 
  1644.                " has " + availSpace + 
  1645.                " bytes free; but we expect map to take " + outSize);
  1646.       return -1; //see if a different TIP might work better. 
  1647.     }
  1648.     
  1649.     
  1650.     // For scheduling a map task, we have two caches and a list (optional)
  1651.     //  I)   one for non-running task
  1652.     //  II)  one for running task (this is for handling speculation)
  1653.     //  III) a list of TIPs that have empty locations (e.g., dummy splits),
  1654.     //       the list is empty if all TIPs have associated locations
  1655.     // First a look up is done on the non-running cache and on a miss, a look 
  1656.     // up is done on the running cache. The order for lookup within the cache:
  1657.     //   1. from local node to root [bottom up]
  1658.     //   2. breadth wise for all the parent nodes at max level
  1659.     // We fall to linear scan of the list (III above) if we have misses in the 
  1660.     // above caches
  1661.     Node node = jobtracker.getNode(tts.getHost());
  1662.     
  1663.     //
  1664.     // I) Non-running TIP :
  1665.     // 
  1666.     // 1. check from local node to the root [bottom up cache lookup]
  1667.     //    i.e if the cache is available and the host has been resolved
  1668.     //    (node!=null)
  1669.     if (node != null) {
  1670.       Node key = node;
  1671.       int level = 0;
  1672.       // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
  1673.       // called to schedule any task (local, rack-local, off-switch or speculative)
  1674.       // tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is
  1675.       //  (i.e. -1) if findNewMapTask is to only schedule off-switch/speculative
  1676.       // tasks
  1677.       int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
  1678.       for (level = 0;level < maxLevelToSchedule; ++level) {
  1679.         List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
  1680.         if (cacheForLevel != null) {
  1681.           tip = findTaskFromList(cacheForLevel, tts, 
  1682.               numUniqueHosts,level == 0);
  1683.           if (tip != null) {
  1684.             // Add to running cache
  1685.             scheduleMap(tip);
  1686.             // remove the cache if its empty
  1687.             if (cacheForLevel.size() == 0) {
  1688.               nonRunningMapCache.remove(key);
  1689.             }
  1690.             return tip.getIdWithinJob();
  1691.           }
  1692.         }
  1693.         key = key.getParent();
  1694.       }
  1695.       
  1696.       // Check if we need to only schedule a local task (node-local/rack-local)
  1697.       if (level == maxCacheLevel) {
  1698.         return -1;
  1699.       }
  1700.     }
  1701.     //2. Search breadth-wise across parents at max level for non-running 
  1702.     //   TIP if
  1703.     //     - cache exists and there is a cache miss 
  1704.     //     - node information for the tracker is missing (tracker's topology
  1705.     //       info not obtained yet)
  1706.     // collection of node at max level in the cache structure
  1707.     Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
  1708.     // get the node parent at max level
  1709.     Node nodeParentAtMaxLevel = 
  1710.       (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
  1711.     
  1712.     for (Node parent : nodesAtMaxLevel) {
  1713.       // skip the parent that has already been scanned
  1714.       if (parent == nodeParentAtMaxLevel) {
  1715.         continue;
  1716.       }
  1717.       List<TaskInProgress> cache = nonRunningMapCache.get(parent);
  1718.       if (cache != null) {
  1719.         tip = findTaskFromList(cache, tts, numUniqueHosts, false);
  1720.         if (tip != null) {
  1721.           // Add to the running cache
  1722.           scheduleMap(tip);
  1723.           // remove the cache if empty
  1724.           if (cache.size() == 0) {
  1725.             nonRunningMapCache.remove(parent);
  1726.           }
  1727.           LOG.info("Choosing a non-local task " + tip.getTIPId());
  1728.           return tip.getIdWithinJob();
  1729.         }
  1730.       }
  1731.     }
  1732.     // 3. Search non-local tips for a new task
  1733.     tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
  1734.     if (tip != null) {
  1735.       // Add to the running list
  1736.       scheduleMap(tip);
  1737.       LOG.info("Choosing a non-local task " + tip.getTIPId());
  1738.       return tip.getIdWithinJob();
  1739.     }
  1740.     //
  1741.     // II) Running TIP :
  1742.     // 
  1743.  
  1744.     if (hasSpeculativeMaps) {
  1745.       long currentTime = System.currentTimeMillis();
  1746.       // 1. Check bottom up for speculative tasks from the running cache
  1747.       if (node != null) {
  1748.         Node key = node;
  1749.         for (int level = 0; level < maxLevel; ++level) {
  1750.           Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
  1751.           if (cacheForLevel != null) {
  1752.             tip = findSpeculativeTask(cacheForLevel, tts, 
  1753.                                       avgProgress, currentTime, level == 0);
  1754.             if (tip != null) {
  1755.               if (cacheForLevel.size() == 0) {
  1756.                 runningMapCache.remove(key);
  1757.               }
  1758.               return tip.getIdWithinJob();
  1759.             }
  1760.           }
  1761.           key = key.getParent();
  1762.         }
  1763.       }
  1764.       // 2. Check breadth-wise for speculative tasks
  1765.       
  1766.       for (Node parent : nodesAtMaxLevel) {
  1767.         // ignore the parent which is already scanned
  1768.         if (parent == nodeParentAtMaxLevel) {
  1769.           continue;
  1770.         }
  1771.         Set<TaskInProgress> cache = runningMapCache.get(parent);
  1772.         if (cache != null) {
  1773.           tip = findSpeculativeTask(cache, tts, avgProgress, 
  1774.                                     currentTime, false);
  1775.           if (tip != null) {
  1776.             // remove empty cache entries
  1777.             if (cache.size() == 0) {
  1778.               runningMapCache.remove(parent);
  1779.             }
  1780.             LOG.info("Choosing a non-local task " + tip.getTIPId() 
  1781.                      + " for speculation");
  1782.             return tip.getIdWithinJob();
  1783.           }
  1784.         }
  1785.       }
  1786.       // 3. Check non-local tips for speculation
  1787.       tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress, 
  1788.                                 currentTime, false);
  1789.       if (tip != null) {
  1790.         LOG.info("Choosing a non-local task " + tip.getTIPId() 
  1791.                  + " for speculation");
  1792.         return tip.getIdWithinJob();
  1793.       }
  1794.     }
  1795.     
  1796.     return -1;
  1797.   }
  1798.   /**
  1799.    * Find new reduce task
  1800.    * @param tts The task tracker that is asking for a task
  1801.    * @param clusterSize The number of task trackers in the cluster
  1802.    * @param numUniqueHosts The number of hosts that run task trackers
  1803.    * @param avgProgress The average progress of this kind of task in this job
  1804.    * @return the index in tasks of the selected task (or -1 for no task)
  1805.    */
  1806.   private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
  1807.                                              int clusterSize,
  1808.                                              int numUniqueHosts,
  1809.                                              double avgProgress) {
  1810.     String taskTracker = tts.getTrackerName();
  1811.     TaskInProgress tip = null;
  1812.     
  1813.     // Update the last-known clusterSize
  1814.     this.clusterSize = clusterSize;
  1815.     if (!shouldRunOnTaskTracker(taskTracker)) {
  1816.       return -1;
  1817.     }
  1818.     long outSize = resourceEstimator.getEstimatedReduceInputSize();
  1819.     long availSpace = tts.getResourceStatus().getAvailableSpace();
  1820.     if(availSpace < outSize) {
  1821.       LOG.warn("No room for reduce task. Node " + taskTracker + " has " +
  1822.                 availSpace + 
  1823.                " bytes free; but we expect reduce input to take " + outSize);
  1824.       return -1; //see if a different TIP might work better. 
  1825.     }
  1826.     
  1827.     // 1. check for a never-executed reduce tip
  1828.     // reducers don't have a cache and so pass -1 to explicitly call that out
  1829.     tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
  1830.     if (tip != null) {
  1831.       scheduleReduce(tip);
  1832.       return tip.getIdWithinJob();
  1833.     }
  1834.     // 2. check for a reduce tip to be speculated
  1835.     if (hasSpeculativeReduces) {
  1836.       tip = findSpeculativeTask(runningReduces, tts, avgProgress, 
  1837.                                 System.currentTimeMillis(), false);
  1838.       if (tip != null) {
  1839.         scheduleReduce(tip);
  1840.         return tip.getIdWithinJob();
  1841.       }
  1842.     }
  1843.     return -1;
  1844.   }
  1845.   
  1846.   private boolean shouldRunOnTaskTracker(String taskTracker) {
  1847.     //
  1848.     // Check if too many tasks of this job have failed on this
  1849.     // tasktracker prior to assigning it a new one.
  1850.     //
  1851.     int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
  1852.     if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
  1853.         taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
  1854.       if (LOG.isDebugEnabled()) {
  1855.         String flakyTracker = convertTrackerNameToHostName(taskTracker); 
  1856.         LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 
  1857.                   + "' for assigning a new task");
  1858.       }
  1859.       return false;
  1860.     }
  1861.     return true;
  1862.   }
  1863.   /**
  1864.    * A taskid assigned to this JobInProgress has reported in successfully.
  1865.    */
  1866.   public synchronized boolean completedTask(TaskInProgress tip, 
  1867.                                             TaskStatus status)
  1868.   {
  1869.     TaskAttemptID taskid = status.getTaskID();
  1870.     int oldNumAttempts = tip.getActiveTasks().size();
  1871.     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
  1872.         
  1873.     // Sanity check: is the TIP already complete? 
  1874.     // It _is_ safe to not decrement running{Map|Reduce}Tasks and
  1875.     // finished{Map|Reduce}Tasks variables here because one and only
  1876.     // one task-attempt of a TIP gets to completedTask. This is because
  1877.     // the TaskCommitThread in the JobTracker marks other, completed, 
  1878.     // speculative tasks as _complete_.
  1879.     if (tip.isComplete()) {
  1880.       // Mark this task as KILLED
  1881.       tip.alreadyCompletedTask(taskid);
  1882.       // Let the JobTracker cleanup this taskid if the job isn't running
  1883.       if (this.status.getRunState() != JobStatus.RUNNING) {
  1884.         jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
  1885.       }
  1886.       return false;
  1887.     } 
  1888.     LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
  1889.              " successfully.");          
  1890.     // Mark the TIP as complete
  1891.     tip.completed(taskid);
  1892.     resourceEstimator.updateWithCompletedTask(status, tip);
  1893.     // Update jobhistory 
  1894.     TaskTrackerStatus ttStatus = 
  1895.       this.jobtracker.getTaskTracker(status.getTaskTracker());
  1896.     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
  1897.     String taskType = getTaskType(tip);
  1898.     if (status.getIsMap()){
  1899.       JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
  1900.                                        status.getTaskTracker(), 
  1901.                                        ttStatus.getHttpPort(), 
  1902.                                        taskType); 
  1903.       JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
  1904.                                         trackerHostname, taskType,
  1905.                                         status.getStateString(), 
  1906.                                         status.getCounters()); 
  1907.     }else{
  1908.       JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
  1909.                                           status.getTaskTracker(),
  1910.                                           ttStatus.getHttpPort(), 
  1911.                                           taskType); 
  1912.       JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
  1913.                                            status.getSortFinishTime(), status.getFinishTime(), 
  1914.                                            trackerHostname, 
  1915.                                            taskType,
  1916.                                            status.getStateString(), 
  1917.                                            status.getCounters()); 
  1918.     }
  1919.     JobHistory.Task.logFinished(tip.getTIPId(), 
  1920.                                 taskType,
  1921.                                 tip.getExecFinishTime(),
  1922.                                 status.getCounters()); 
  1923.         
  1924.     int newNumAttempts = tip.getActiveTasks().size();
  1925.     if (tip.isJobSetupTask()) {
  1926.       // setup task has finished. kill the extra setup tip
  1927.       killSetupTip(!tip.isMapTask());
  1928.       // Job can start running now.
  1929.       this.status.setSetupProgress(1.0f);
  1930.       this.status.setRunState(JobStatus.RUNNING);
  1931.       JobHistory.JobInfo.logStarted(profile.getJobID());
  1932.     } else if (tip.isJobCleanupTask()) {
  1933.       // cleanup task has finished. Kill the extra cleanup tip
  1934.       if (tip.isMapTask()) {
  1935.         // kill the reduce tip
  1936.         cleanup[1].kill();
  1937.       } else {
  1938.         cleanup[0].kill();
  1939.       }
  1940.       //
  1941.       // The Job is done
  1942.       // if the job is failed, then mark the job failed.
  1943.       if (jobFailed) {
  1944.         terminateJob(JobStatus.FAILED);
  1945.       }
  1946.       // if the job is killed, then mark the job killed.
  1947.       if (jobKilled) {
  1948.         terminateJob(JobStatus.KILLED);
  1949.       }
  1950.       else {
  1951.         jobComplete();
  1952.       }
  1953.       // The job has been killed/failed/successful
  1954.       // JobTracker should cleanup this task
  1955.       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
  1956.     } else if (tip.isMapTask()) {
  1957.       runningMapTasks -= 1;
  1958.       // check if this was a sepculative task
  1959.       if (oldNumAttempts > 1) {
  1960.         speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
  1961.       }
  1962.       finishedMapTasks += 1;
  1963.       metrics.completeMap(taskid);
  1964.       // remove the completed map from the resp running caches
  1965.       retireMap(tip);
  1966.       if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
  1967.         this.status.setMapProgress(1.0f);
  1968.       }
  1969.     } else {
  1970.       runningReduceTasks -= 1;
  1971.       if (oldNumAttempts > 1) {
  1972.         speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
  1973.       }
  1974.       finishedReduceTasks += 1;
  1975.       metrics.completeReduce(taskid);
  1976.       // remove the completed reduces from the running reducers set
  1977.       retireReduce(tip);
  1978.       if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
  1979.         this.status.setReduceProgress(1.0f);
  1980.       }
  1981.     }
  1982.     
  1983.     return true;
  1984.   }
  1985.   /**
  1986.    * The job is done since all it's component tasks are either
  1987.    * successful or have failed.
  1988.    */
  1989.   private void jobComplete() {
  1990.     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
  1991.     //
  1992.     // All tasks are complete, then the job is done!
  1993.     //
  1994.     if (this.status.getRunState() == JobStatus.RUNNING ) {
  1995.       this.status.setRunState(JobStatus.SUCCEEDED);
  1996.       this.status.setCleanupProgress(1.0f);
  1997.       this.finishTime = System.currentTimeMillis();
  1998.       LOG.info("Job " + this.status.getJobID() + 
  1999.                " has completed successfully.");
  2000.       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
  2001.                                      this.finishedMapTasks, 
  2002.                                      this.finishedReduceTasks, failedMapTasks, 
  2003.                                      failedReduceTasks, getCounters());
  2004.       // Note that finalize will close the job history handles which garbage collect
  2005.       // might try to finalize
  2006.       garbageCollect();
  2007.       
  2008.       metrics.completeJob(this.conf, this.status.getJobID());
  2009.     }
  2010.   }
  2011.   
  2012.   private synchronized void terminateJob(int jobTerminationState) {
  2013.     if ((status.getRunState() == JobStatus.RUNNING) ||
  2014.         (status.getRunState() == JobStatus.PREP)) {
  2015.       if (jobTerminationState == JobStatus.FAILED) {
  2016.         this.status = new JobStatus(status.getJobID(),
  2017.                                     1.0f, 1.0f, 1.0f, JobStatus.FAILED,
  2018.                                     status.getJobPriority());
  2019.         this.finishTime = System.currentTimeMillis();
  2020.         JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
  2021.                                      this.finishedMapTasks, 
  2022.                                      this.finishedReduceTasks);
  2023.       } else {
  2024.         this.status = new JobStatus(status.getJobID(),
  2025.                                     1.0f, 1.0f, 1.0f, JobStatus.KILLED,
  2026.                                     status.getJobPriority());
  2027.         this.finishTime = System.currentTimeMillis();
  2028.         JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime, 
  2029.                                      this.finishedMapTasks, 
  2030.                                      this.finishedReduceTasks);
  2031.       }
  2032.       garbageCollect();
  2033.       jobtracker.getInstrumentation().terminateJob(
  2034.           this.conf, this.status.getJobID());
  2035.     }
  2036.   }
  2037.   /**
  2038.    * Terminate the job and all its component tasks.
  2039.    * Calling this will lead to marking the job as failed/killed. Cleanup 
  2040.    * tip will be launched. If the job has not inited, it will directly call 
  2041.    * terminateJob as there is no need to launch cleanup tip.
  2042.    * This method is reentrant.
  2043.    * @param jobTerminationState job termination state
  2044.    */
  2045.   private synchronized void terminate(int jobTerminationState) {
  2046.     if(!tasksInited.get()) {
  2047.      //init could not be done, we just terminate directly.
  2048.       terminateJob(jobTerminationState);
  2049.       return;
  2050.     }
  2051.     if ((status.getRunState() == JobStatus.RUNNING) ||
  2052.          (status.getRunState() == JobStatus.PREP)) {
  2053.       LOG.info("Killing job '" + this.status.getJobID() + "'");
  2054.       if (jobTerminationState == JobStatus.FAILED) {
  2055.         if(jobFailed) {//reentrant
  2056.           return;
  2057.         }
  2058.         jobFailed = true;
  2059.       } else if (jobTerminationState == JobStatus.KILLED) {
  2060.         if(jobKilled) {//reentrant
  2061.           return;
  2062.         }
  2063.         jobKilled = true;
  2064.       }
  2065.       // clear all unclean tasks
  2066.       clearUncleanTasks();
  2067.       //
  2068.       // kill all TIPs.
  2069.       //
  2070.       for (int i = 0; i < setup.length; i++) {
  2071.         setup[i].kill();
  2072.       }
  2073.       for (int i = 0; i < maps.length; i++) {
  2074.         maps[i].kill();
  2075.       }
  2076.       for (int i = 0; i < reduces.length; i++) {
  2077.         reduces[i].kill();
  2078.       }
  2079.     }
  2080.   }
  2081.   private void clearUncleanTasks() {
  2082.     TaskAttemptID taskid = null;
  2083.     TaskInProgress tip = null;
  2084.     while (!mapCleanupTasks.isEmpty()) {
  2085.       taskid = mapCleanupTasks.remove(0);
  2086.       tip = maps[taskid.getTaskID().getId()];
  2087.       updateTaskStatus(tip, tip.getTaskStatus(taskid));
  2088.     }
  2089.     while (!reduceCleanupTasks.isEmpty()) {
  2090.       taskid = reduceCleanupTasks.remove(0);
  2091.       tip = reduces[taskid.getTaskID().getId()];
  2092.       updateTaskStatus(tip, tip.getTaskStatus(taskid));
  2093.     }
  2094.   }
  2095.   /**
  2096.    * Kill the job and all its component tasks. This method is called from 
  2097.    * jobtracker and should return fast as it locks the jobtracker.
  2098.    */
  2099.   public void kill() {
  2100.     boolean killNow = false;
  2101.     synchronized(jobInitKillStatus) {
  2102.       if(jobInitKillStatus.killed) {//job is already marked for killing
  2103.         return;
  2104.       }
  2105.       jobInitKillStatus.killed = true;
  2106.       //if not in middle of init, terminate it now
  2107.       if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
  2108.         //avoiding nested locking by setting flag
  2109.         killNow = true;
  2110.       }
  2111.     }
  2112.     if(killNow) {
  2113.       terminate(JobStatus.KILLED);
  2114.     }
  2115.   }
  2116.   
  2117.   /**
  2118.    * Fails the job and all its component tasks.
  2119.    */
  2120.   synchronized void fail() {
  2121.     terminate(JobStatus.FAILED);
  2122.   }
  2123.   
  2124.   /**
  2125.    * A task assigned to this JobInProgress has reported in as failed.
  2126.    * Most of the time, we'll just reschedule execution.  However, after
  2127.    * many repeated failures we may instead decide to allow the entire 
  2128.    * job to fail or succeed if the user doesn't care about a few tasks failing.
  2129.    *
  2130.    * Even if a task has reported as completed in the past, it might later
  2131.    * be reported as failed.  That's because the TaskTracker that hosts a map
  2132.    * task might die before the entire job can complete.  If that happens,
  2133.    * we need to schedule reexecution so that downstream reduce tasks can 
  2134.    * obtain the map task's output.
  2135.    */
  2136.   private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
  2137.                           TaskStatus status, 
  2138.                           TaskTrackerStatus taskTrackerStatus,
  2139.                           boolean wasRunning, boolean wasComplete) {
  2140.     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
  2141.     // check if the TIP is already failed
  2142.     boolean wasFailed = tip.isFailed();
  2143.     // Mark the taskid as FAILED or KILLED
  2144.     tip.incompleteSubTask(taskid, this.status);
  2145.    
  2146.     boolean isRunning = tip.isRunning();
  2147.     boolean isComplete = tip.isComplete();
  2148.         
  2149.     //update running  count on task failure.
  2150.     if (wasRunning && !isRunning) {
  2151.       if (tip.isJobCleanupTask()) {
  2152.         launchedCleanup = false;
  2153.       } else if (tip.isJobSetupTask()) {
  2154.         launchedSetup = false;
  2155.       } else if (tip.isMapTask()) {
  2156.         runningMapTasks -= 1;
  2157.         metrics.failedMap(taskid);
  2158.         // remove from the running queue and put it in the non-running cache
  2159.         // if the tip is not complete i.e if the tip still needs to be run
  2160.         if (!isComplete) {
  2161.           retireMap(tip);
  2162.           failMap(tip);
  2163.         }
  2164.       } else {
  2165.         runningReduceTasks -= 1;
  2166.         metrics.failedReduce(taskid);
  2167.         // remove from the running queue and put in the failed queue if the tip
  2168.         // is not complete
  2169.         if (!isComplete) {
  2170.           retireReduce(tip);
  2171.           failReduce(tip);
  2172.         }
  2173.       }
  2174.     }
  2175.         
  2176.     // the case when the map was complete but the task tracker went down.
  2177.     if (wasComplete && !isComplete) {
  2178.       if (tip.isMapTask()) {
  2179.         // Put the task back in the cache. This will help locality for cases
  2180.         // where we have a different TaskTracker from the same rack/switch
  2181.         // asking for a task. 
  2182.         // We bother about only those TIPs that were successful
  2183.         // earlier (wasComplete and !isComplete) 
  2184.         // (since they might have been removed from the cache of other 
  2185.         // racks/switches, if the input split blocks were present there too)
  2186.         failMap(tip);
  2187.         finishedMapTasks -= 1;
  2188.       }
  2189.     }
  2190.         
  2191.     // update job history
  2192.     // get taskStatus from tip
  2193.     TaskStatus taskStatus = tip.getTaskStatus(taskid);
  2194.     String taskTrackerName = taskStatus.getTaskTracker();
  2195.     String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
  2196.     int taskTrackerPort = -1;
  2197.     if (taskTrackerStatus != null) {
  2198.       taskTrackerPort = taskTrackerStatus.getHttpPort();
  2199.     }
  2200.     long startTime = taskStatus.getStartTime();
  2201.     long finishTime = taskStatus.getFinishTime();
  2202.     List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
  2203.     String diagInfo = taskDiagnosticInfo == null ? "" :
  2204.       StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
  2205.     String taskType = getTaskType(tip);
  2206.     if (taskStatus.getIsMap()) {
  2207.       JobHistory.MapAttempt.logStarted(taskid, startTime, 
  2208.         taskTrackerName, taskTrackerPort, taskType);
  2209.       if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
  2210.         JobHistory.MapAttempt.logFailed(taskid, finishTime,
  2211.           taskTrackerHostName, diagInfo, taskType);
  2212.       } else {
  2213.         JobHistory.MapAttempt.logKilled(taskid, finishTime,
  2214.           taskTrackerHostName, diagInfo, taskType);
  2215.       }
  2216.     } else {
  2217.       JobHistory.ReduceAttempt.logStarted(taskid, startTime, 
  2218.         taskTrackerName, taskTrackerPort, taskType);
  2219.       if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
  2220.         JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
  2221.           taskTrackerHostName, diagInfo, taskType);
  2222.       } else {
  2223.         JobHistory.ReduceAttempt.logKilled(taskid, finishTime,
  2224.           taskTrackerHostName, diagInfo, taskType);
  2225.       }
  2226.     }
  2227.         
  2228.     // After this, try to assign tasks with the one after this, so that
  2229.     // the failed task goes to the end of the list.
  2230.     if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
  2231.       if (tip.isMapTask()) {
  2232.         failedMapTasks++;
  2233.       } else {
  2234.         failedReduceTasks++; 
  2235.       }
  2236.     }
  2237.             
  2238.     //
  2239.     // Note down that a task has failed on this tasktracker 
  2240.     //
  2241.     if (status.getRunState() == TaskStatus.State.FAILED) { 
  2242.       addTrackerTaskFailure(taskTrackerName);
  2243.     }
  2244.         
  2245.     //
  2246.     // Let the JobTracker know that this task has failed
  2247.     //
  2248.     jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
  2249.     //
  2250.     // Check if we need to kill the job because of too many failures or 
  2251.     // if the job is complete since all component tasks have completed
  2252.     // We do it once per TIP and that too for the task that fails the TIP
  2253.     if (!wasFailed && tip.isFailed()) {
  2254.       //
  2255.       // Allow upto 'mapFailuresPercent' of map tasks to fail or
  2256.       // 'reduceFailuresPercent' of reduce tasks to fail
  2257.       //
  2258.       boolean killJob = tip.isJobCleanupTask() || tip.isJobSetupTask() ? true :
  2259.                         tip.isMapTask() ? 
  2260.             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
  2261.             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
  2262.       
  2263.       if (killJob) {
  2264.         LOG.info("Aborting job " + profile.getJobID());
  2265.         JobHistory.Task.logFailed(tip.getTIPId(), 
  2266.                                   taskType,  
  2267.                                   finishTime, 
  2268.                                   diagInfo);
  2269.         if (tip.isJobCleanupTask()) {
  2270.           // kill the other tip
  2271.           if (tip.isMapTask()) {
  2272.             cleanup[1].kill();
  2273.           } else {
  2274.             cleanup[0].kill();
  2275.           }
  2276.           terminateJob(JobStatus.FAILED);
  2277.         } else {
  2278.           if (tip.isJobSetupTask()) {
  2279.             // kill the other tip
  2280.             killSetupTip(!tip.isMapTask());
  2281.           }
  2282.           fail();
  2283.         }
  2284.       }
  2285.       
  2286.       //
  2287.       // Update the counters
  2288.       //
  2289.       if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
  2290.         if (tip.isMapTask()) {
  2291.           jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
  2292.         } else {
  2293.           jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
  2294.         }
  2295.       }
  2296.     }
  2297.   }
  2298.   void killSetupTip(boolean isMap) {
  2299.     if (isMap) {
  2300.       setup[0].kill();
  2301.     } else {
  2302.       setup[1].kill();
  2303.     }
  2304.   }
  2305.   /**
  2306.    * Fail a task with a given reason, but without a status object.
  2307.    * 
  2308.    * Assuming {@link JobTracker} is locked on entry.
  2309.    * 
  2310.    * @param tip The task's tip
  2311.    * @param taskid The task id
  2312.    * @param reason The reason that the task failed
  2313.    * @param trackerName The task tracker the task failed on
  2314.    */
  2315.   public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason, 
  2316.                          TaskStatus.Phase phase, TaskStatus.State state, 
  2317.                          String trackerName) {
  2318.     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
  2319.                                                     taskid,
  2320.                                                     0.0f,
  2321.                                                     state,
  2322.                                                     reason,
  2323.                                                     reason,
  2324.                                                     trackerName, phase,
  2325.                                                     new Counters());
  2326.     // update the actual start-time of the attempt
  2327.     TaskStatus oldStatus = tip.getTaskStatus(taskid); 
  2328.     long startTime = oldStatus == null
  2329.                      ? System.currentTimeMillis()
  2330.                      : oldStatus.getStartTime();
  2331.     status.setStartTime(startTime);
  2332.     status.setFinishTime(System.currentTimeMillis());
  2333.     boolean wasComplete = tip.isComplete();
  2334.     updateTaskStatus(tip, status);
  2335.     boolean isComplete = tip.isComplete();
  2336.     if (wasComplete && !isComplete) { // mark a successful tip as failed
  2337.       String taskType = getTaskType(tip);
  2338.       JobHistory.Task.logFailed(tip.getTIPId(), taskType, 
  2339.                                 tip.getExecFinishTime(), reason, taskid);
  2340.     }
  2341.   }
  2342.        
  2343.                            
  2344.   /**
  2345.    * The job is dead.  We're now GC'ing it, getting rid of the job
  2346.    * from all tables.  Be sure to remove all of this job's tasks
  2347.    * from the various tables.
  2348.    */
  2349.   synchronized void garbageCollect() {
  2350.     // Let the JobTracker know that a job is complete
  2351.     jobtracker.getInstrumentation(
  2352.         ).decWaiting(getJobID(), pendingMaps() + pendingReduces());
  2353.     jobtracker.storeCompletedJob(this);
  2354.     jobtracker.finalizeJob(this);
  2355.       
  2356.     try {
  2357.       // Definitely remove the local-disk copy of the job file
  2358.       if (localJobFile != null) {
  2359.         localFs.delete(localJobFile, true);
  2360.         localJobFile = null;
  2361.       }
  2362.       if (localJarFile != null) {
  2363.         localFs.delete(localJarFile, true);
  2364.         localJarFile = null;
  2365.       }
  2366.       // clean up splits
  2367.       for (int i = 0; i < maps.length; i++) {
  2368.         maps[i].clearSplit();
  2369.       }
  2370.       // JobClient always creates a new directory with job files
  2371.       // so we remove that directory to cleanup
  2372.       // Delete temp dfs dirs created if any, like in case of 
  2373.       // speculative exn of reduces.  
  2374.       Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
  2375.       new CleanupQueue().addToQueue(conf,tempDir); 
  2376.     } catch (IOException e) {
  2377.       LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
  2378.     }
  2379.     
  2380.     cleanUpMetrics();
  2381.     // free up the memory used by the data structures
  2382.     this.nonRunningMapCache = null;
  2383.     this.runningMapCache = null;
  2384.     this.nonRunningReduces = null;
  2385.     this.runningReduces = null;
  2386.   }
  2387.   /**
  2388.    * Return the TaskInProgress that matches the tipid.
  2389.    */
  2390.   public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
  2391.     if (tipid.isMap()) {
  2392.       if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
  2393.         return cleanup[0]; 
  2394.       }
  2395.       if (tipid.equals(setup[0].getTIPId())) { //setup map tip
  2396.         return setup[0];
  2397.       }
  2398.       for (int i = 0; i < maps.length; i++) {
  2399.         if (tipid.equals(maps[i].getTIPId())){
  2400.           return maps[i];
  2401.         }
  2402.       }
  2403.     } else {
  2404.       if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
  2405.         return cleanup[1]; 
  2406.       }
  2407.       if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
  2408.         return setup[1];
  2409.       }
  2410.       for (int i = 0; i < reduces.length; i++) {
  2411.         if (tipid.equals(reduces[i].getTIPId())){
  2412.           return reduces[i];
  2413.         }
  2414.       }
  2415.     }
  2416.     return null;
  2417.   }
  2418.     
  2419.   /**
  2420.    * Find the details of someplace where a map has finished
  2421.    * @param mapId the id of the map
  2422.    * @return the task status of the completed task
  2423.    */
  2424.   public synchronized TaskStatus findFinishedMap(int mapId) {
  2425.     TaskInProgress tip = maps[mapId];
  2426.     if (tip.isComplete()) {
  2427.       TaskStatus[] statuses = tip.getTaskStatuses();
  2428.       for(int i=0; i < statuses.length; i++) {
  2429.         if (statuses[i].getRunState() == TaskStatus.State.SUCCEEDED) {
  2430.           return statuses[i];
  2431.         }
  2432.       }
  2433.     }
  2434.     return null;
  2435.   }
  2436.   
  2437.   synchronized int getNumTaskCompletionEvents() {
  2438.     return taskCompletionEvents.size();
  2439.   }
  2440.     
  2441.   synchronized public TaskCompletionEvent[] getTaskCompletionEvents(
  2442.                                                                     int fromEventId, int maxEvents) {
  2443.     TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
  2444.     if (taskCompletionEvents.size() > fromEventId) {
  2445.       int actualMax = Math.min(maxEvents, 
  2446.                                (taskCompletionEvents.size() - fromEventId));
  2447.       events = taskCompletionEvents.subList(fromEventId, actualMax + fromEventId).toArray(events);        
  2448.     }
  2449.     return events; 
  2450.   }
  2451.   
  2452.   synchronized void fetchFailureNotification(TaskInProgress tip, 
  2453.                                              TaskAttemptID mapTaskId, 
  2454.                                              String trackerName) {
  2455.     Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
  2456.     fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
  2457.     mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
  2458.     LOG.info("Failed fetch notification #" + fetchFailures + " for task " + 
  2459.             mapTaskId);
  2460.     
  2461.     float failureRate = (float)fetchFailures / runningReduceTasks;
  2462.     // declare faulty if fetch-failures >= max-allowed-failures
  2463.     boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT) 
  2464.                           ? true
  2465.                           : false;
  2466.     if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
  2467.         && isMapFaulty) {
  2468.       LOG.info("Too many fetch-failures for output of task: " + mapTaskId 
  2469.                + " ... killing it");
  2470.       
  2471.       failedTask(tip, mapTaskId, "Too many fetch-failures",                            
  2472.                  (tip.isMapTask() ? TaskStatus.Phase.MAP : 
  2473.                                     TaskStatus.Phase.REDUCE), 
  2474.                  TaskStatus.State.FAILED, trackerName);
  2475.       
  2476.       mapTaskIdToFetchFailuresMap.remove(mapTaskId);
  2477.     }
  2478.   }
  2479.   
  2480.   /**
  2481.    * @return The JobID of this JobInProgress.
  2482.    */
  2483.   public JobID getJobID() {
  2484.     return jobId;
  2485.   }
  2486.   
  2487.   public synchronized Object getSchedulingInfo() {
  2488.     return this.schedulingInfo;
  2489.   }
  2490.   
  2491.   public synchronized void setSchedulingInfo(Object schedulingInfo) {
  2492.     this.schedulingInfo = schedulingInfo;
  2493.     this.status.setSchedulingInfo(schedulingInfo.toString());
  2494.   }
  2495.   
  2496.   /**
  2497.    * To keep track of kill and initTasks status of this job. initTasks() take 
  2498.    * a lock on JobInProgress object. kill should avoid waiting on 
  2499.    * JobInProgress lock since it may take a while to do initTasks().
  2500.    */
  2501.   private static class JobInitKillStatus {
  2502.     //flag to be set if kill is called
  2503.     boolean killed;
  2504.     
  2505.     boolean initStarted;
  2506.     boolean initDone;
  2507.   }
  2508.   boolean isComplete() {
  2509.     return status.isJobComplete();
  2510.   }
  2511.   
  2512.   /**
  2513.    * Get the task type for logging it to {@link JobHistory}.
  2514.    */
  2515.   private String getTaskType(TaskInProgress tip) {
  2516.     if (tip.isJobCleanupTask()) {
  2517.       return Values.CLEANUP.name();
  2518.     } else if (tip.isJobSetupTask()) {
  2519.       return Values.SETUP.name();
  2520.     } else if (tip.isMapTask()) {
  2521.       return Values.MAP.name();
  2522.     } else {
  2523.       return Values.REDUCE.name();
  2524.     }
  2525.   }
  2526. }