TaskInProgress.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:36k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Arrays;
  22. import java.util.Comparator;
  23. import java.util.Iterator;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.TreeMap;
  27. import java.util.TreeSet;
  28. import org.apache.commons.logging.Log;
  29. import org.apache.commons.logging.LogFactory;
  30. import org.apache.hadoop.mapred.JobClient.RawSplit;
  31. import org.apache.hadoop.mapred.SortedRanges.Range;
  32. import org.apache.hadoop.net.Node;
  33. /*************************************************************
  34.  * TaskInProgress maintains all the info needed for a
  35.  * Task in the lifetime of its owning Job.  A given Task
  36.  * might be speculatively executed or reexecuted, so we
  37.  * need a level of indirection above the running-id itself.
  38.  * <br>
  39.  * A given TaskInProgress contains multiple taskids,
  40.  * 0 or more of which might be executing at any one time.
  41.  * (That's what allows speculative execution.)  A taskid
  42.  * is now *never* recycled.  A TIP allocates enough taskids
  43.  * to account for all the speculation and failures it will
  44.  * ever have to handle.  Once those are up, the TIP is dead.
  45.  * **************************************************************
  46.  */
  47. class TaskInProgress {
  48.   static final int MAX_TASK_EXECS = 1;
  49.   int maxTaskAttempts = 4;    
  50.   static final double SPECULATIVE_GAP = 0.2;
  51.   static final long SPECULATIVE_LAG = 60 * 1000;
  52.   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
  53.   public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
  54.   // Defines the TIP
  55.   private String jobFile = null;
  56.   private RawSplit rawSplit;
  57.   private int numMaps;
  58.   private int partition;
  59.   private JobTracker jobtracker;
  60.   private TaskID id;
  61.   private JobInProgress job;
  62.   // Status of the TIP
  63.   private int successEventNumber = -1;
  64.   private int numTaskFailures = 0;
  65.   private int numKilledTasks = 0;
  66.   private double progress = 0;
  67.   private String state = "";
  68.   private long startTime = 0;
  69.   private long execStartTime = 0;
  70.   private long execFinishTime = 0;
  71.   private int completes = 0;
  72.   private boolean failed = false;
  73.   private boolean killed = false;
  74.   private long maxSkipRecords = 0;
  75.   private FailedRanges failedRanges = new FailedRanges();
  76.   private volatile boolean skipping = false;
  77.   private boolean jobCleanup = false; 
  78.   private boolean jobSetup = false;
  79.    
  80.   // The 'next' usable taskid of this tip
  81.   int nextTaskId = 0;
  82.     
  83.   // The taskid that took this TIP to SUCCESS
  84.   private TaskAttemptID successfulTaskId;
  85.   // The first taskid of this tip
  86.   private TaskAttemptID firstTaskId;
  87.   
  88.   // Map from task Id -> TaskTracker Id, contains tasks that are
  89.   // currently runnings
  90.   private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
  91.   // All attempt Ids of this TIP
  92.   private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
  93.   private JobConf conf;
  94.   private Map<TaskAttemptID,List<String>> taskDiagnosticData =
  95.     new TreeMap<TaskAttemptID,List<String>>();
  96.   /**
  97.    * Map from taskId -> TaskStatus
  98.    */
  99.   private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
  100.     new TreeMap<TaskAttemptID,TaskStatus>();
  101.   // Map from taskId -> TaskTracker Id, 
  102.   // contains cleanup attempts and where they ran, if any
  103.   private TreeMap<TaskAttemptID, String> cleanupTasks =
  104.     new TreeMap<TaskAttemptID, String>();
  105.   private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
  106.   private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
  107.   
  108.   //list of tasks to kill, <taskid> -> <shouldFail> 
  109.   private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
  110.   
  111.   //task to commit, <taskattemptid>  
  112.   private TaskAttemptID taskToCommit;
  113.   
  114.   private Counters counters = new Counters();
  115.   
  116.   /**
  117.    * Constructor for MapTask
  118.    */
  119.   public TaskInProgress(JobID jobid, String jobFile, 
  120.                         RawSplit rawSplit, 
  121.                         JobTracker jobtracker, JobConf conf, 
  122.                         JobInProgress job, int partition) {
  123.     this.jobFile = jobFile;
  124.     this.rawSplit = rawSplit;
  125.     this.jobtracker = jobtracker;
  126.     this.job = job;
  127.     this.conf = conf;
  128.     this.partition = partition;
  129.     this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
  130.     setMaxTaskAttempts();
  131.     init(jobid);
  132.   }
  133.         
  134.   /**
  135.    * Constructor for ReduceTask
  136.    */
  137.   public TaskInProgress(JobID jobid, String jobFile, 
  138.                         int numMaps, 
  139.                         int partition, JobTracker jobtracker, JobConf conf,
  140.                         JobInProgress job) {
  141.     this.jobFile = jobFile;
  142.     this.numMaps = numMaps;
  143.     this.partition = partition;
  144.     this.jobtracker = jobtracker;
  145.     this.job = job;
  146.     this.conf = conf;
  147.     this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
  148.     setMaxTaskAttempts();
  149.     init(jobid);
  150.   }
  151.   
  152.   /**
  153.    * Set the max number of attempts before we declare a TIP as "failed"
  154.    */
  155.   private void setMaxTaskAttempts() {
  156.     if (isMapTask()) {
  157.       this.maxTaskAttempts = conf.getMaxMapAttempts();
  158.     } else {
  159.       this.maxTaskAttempts = conf.getMaxReduceAttempts();
  160.     }
  161.   }
  162.     
  163.   /**
  164.    * Return the index of the tip within the job, so 
  165.    * "task_200707121733_1313_0002_m_012345" would return 12345;
  166.    * @return int the tip index
  167.    */
  168.   public int idWithinJob() {
  169.     return partition;
  170.   }    
  171.   public boolean isJobCleanupTask() {
  172.    return jobCleanup;
  173.   }
  174.   
  175.   public void setJobCleanupTask() {
  176.     jobCleanup = true;
  177.   }
  178.   public boolean isJobSetupTask() {
  179.     return jobSetup;
  180.   }
  181.   
  182.   public void setJobSetupTask() {
  183.     jobSetup = true;
  184.   }
  185.   public boolean isOnlyCommitPending() {
  186.     for (TaskStatus t : taskStatuses.values()) {
  187.       if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
  188.         return true;
  189.       }
  190.     }
  191.     return false;
  192.   }
  193.  
  194.   public boolean isCommitPending(TaskAttemptID taskId) {
  195.     TaskStatus t = taskStatuses.get(taskId);
  196.     if (t == null) {
  197.       return false;
  198.     }
  199.     return t.getRunState() ==  TaskStatus.State.COMMIT_PENDING;
  200.   }
  201.   
  202.   /**
  203.    * Initialization common to Map and Reduce
  204.    */
  205.   void init(JobID jobId) {
  206.     this.startTime = System.currentTimeMillis();
  207.     this.id = new TaskID(jobId, isMapTask(), partition);
  208.     this.skipping = startSkipping();
  209.   }
  210.   ////////////////////////////////////
  211.   // Accessors, info, profiles, etc.
  212.   ////////////////////////////////////
  213.   /**
  214.    * Return the start time
  215.    */
  216.   public long getStartTime() {
  217.     return startTime;
  218.   }
  219.   
  220.   /**
  221.    * Return the exec start time
  222.    */
  223.   public long getExecStartTime() {
  224.     return execStartTime;
  225.   }
  226.   
  227.   /**
  228.    * Set the exec start time
  229.    */
  230.   public void setExecStartTime(long startTime) {
  231.     execStartTime = startTime;
  232.   }
  233.   
  234.   /**
  235.    * Return the exec finish time
  236.    */
  237.   public long getExecFinishTime() {
  238.     return execFinishTime;
  239.   }
  240.   /**
  241.    * Set the exec finish time
  242.    */
  243.   public void setExecFinishTime(long finishTime) {
  244.     execFinishTime = finishTime;
  245.     JobHistory.Task.logUpdates(id, execFinishTime); // log the update
  246.   }
  247.   
  248.   /**
  249.    * Return the parent job
  250.    */
  251.   public JobInProgress getJob() {
  252.     return job;
  253.   }
  254.   /**
  255.    * Return an ID for this task, not its component taskid-threads
  256.    */
  257.   public TaskID getTIPId() {
  258.     return this.id;
  259.   }
  260.   /**
  261.    * Whether this is a map task
  262.    */
  263.   public boolean isMapTask() {
  264.     return rawSplit != null;
  265.   }
  266.     
  267.   /**
  268.    * Is the Task associated with taskid is the first attempt of the tip? 
  269.    * @param taskId
  270.    * @return Returns true if the Task is the first attempt of the tip
  271.    */  
  272.   public boolean isFirstAttempt(TaskAttemptID taskId) {
  273.     return firstTaskId == null ? false : firstTaskId.equals(taskId); 
  274.   }
  275.   
  276.   /**
  277.    * Is this tip currently running any tasks?
  278.    * @return true if any tasks are running
  279.    */
  280.   public boolean isRunning() {
  281.     return !activeTasks.isEmpty();
  282.   }
  283.     
  284.   TaskAttemptID getSuccessfulTaskid() {
  285.     return successfulTaskId;
  286.   }
  287.   
  288.   private void setSuccessfulTaskid(TaskAttemptID successfulTaskId) {
  289.     this.successfulTaskId = successfulTaskId; 
  290.   }
  291.   
  292.   private void resetSuccessfulTaskid() {
  293.     this.successfulTaskId = null; 
  294.   }
  295.   
  296.   /**
  297.    * Is this tip complete?
  298.    * 
  299.    * @return <code>true</code> if the tip is complete, else <code>false</code>
  300.    */
  301.   public synchronized boolean isComplete() {
  302.     return (completes > 0);
  303.   }
  304.   /**
  305.    * Is the given taskid the one that took this tip to completion?
  306.    * 
  307.    * @param taskid taskid of attempt to check for completion
  308.    * @return <code>true</code> if taskid is complete, else <code>false</code>
  309.    */
  310.   public boolean isComplete(TaskAttemptID taskid) {
  311.     return ((completes > 0) 
  312.             && taskid.equals(getSuccessfulTaskid()));
  313.   }
  314.   /**
  315.    * Is the tip a failure?
  316.    * 
  317.    * @return <code>true</code> if tip has failed, else <code>false</code>
  318.    */
  319.   public boolean isFailed() {
  320.     return failed;
  321.   }
  322.   /**
  323.    * Number of times the TaskInProgress has failed.
  324.    */
  325.   public int numTaskFailures() {
  326.     return numTaskFailures;
  327.   }
  328.   /**
  329.    * Number of times the TaskInProgress has been killed by the framework.
  330.    */
  331.   public int numKilledTasks() {
  332.     return numKilledTasks;
  333.   }
  334.   /**
  335.    * Get the overall progress (from 0 to 1.0) for this TIP
  336.    */
  337.   public double getProgress() {
  338.     return progress;
  339.   }
  340.     
  341.   /**
  342.    * Get the task's counters
  343.    */
  344.   public Counters getCounters() {
  345.     return counters;
  346.   }
  347.   /**
  348.    * Returns whether a component task-thread should be 
  349.    * closed because the containing JobInProgress has completed
  350.    * or the task is killed by the user
  351.    */
  352.   public boolean shouldClose(TaskAttemptID taskid) {
  353.     /**
  354.      * If the task hasn't been closed yet, and it belongs to a completed
  355.      * TaskInProgress close it.
  356.      * 
  357.      * However, for completed map tasks we do not close the task which
  358.      * actually was the one responsible for _completing_ the TaskInProgress. 
  359.      */
  360.     boolean close = false;
  361.     TaskStatus ts = taskStatuses.get(taskid);
  362.     if ((ts != null) &&
  363.         (!tasksReportedClosed.contains(taskid)) &&
  364.         ((this.failed) ||
  365.         ((job.getStatus().getRunState() != JobStatus.RUNNING &&
  366.          (job.getStatus().getRunState() != JobStatus.PREP))))) {
  367.       tasksReportedClosed.add(taskid);
  368.       close = true;
  369.     } else if (isComplete() && 
  370.                !(isMapTask() && !jobSetup && 
  371.                    !jobCleanup && isComplete(taskid)) &&
  372.                !tasksReportedClosed.contains(taskid)) {
  373.       tasksReportedClosed.add(taskid);
  374.       close = true; 
  375.     } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
  376.                !tasksReportedClosed.contains(taskid)) {
  377.       tasksReportedClosed.add(taskid);
  378.       close = true; 
  379.     } else {
  380.       close = tasksToKill.keySet().contains(taskid);
  381.     }   
  382.     return close;
  383.   }
  384.   /**
  385.    * Commit this task attempt for the tip. 
  386.    * @param taskid
  387.    */
  388.   public void doCommit(TaskAttemptID taskid) {
  389.     taskToCommit = taskid;
  390.   }
  391.   /**
  392.    * Returns whether the task attempt should be committed or not 
  393.    */
  394.   public boolean shouldCommit(TaskAttemptID taskid) {
  395.     return !isComplete() && isCommitPending(taskid) && 
  396.            taskToCommit.equals(taskid);
  397.   }
  398.   /**
  399.    * Creates a "status report" for this task.  Includes the
  400.    * task ID and overall status, plus reports for all the
  401.    * component task-threads that have ever been started.
  402.    */
  403.   synchronized TaskReport generateSingleReport() {
  404.     ArrayList<String> diagnostics = new ArrayList<String>();
  405.     for (List<String> l : taskDiagnosticData.values()) {
  406.       diagnostics.addAll(l);
  407.     }
  408.     TIPStatus currentStatus = null;
  409.     if (isRunning() && !isComplete()) {
  410.       currentStatus = TIPStatus.RUNNING;
  411.     } else if (isComplete()) {
  412.       currentStatus = TIPStatus.COMPLETE;
  413.     } else if (wasKilled()) {
  414.       currentStatus = TIPStatus.KILLED;
  415.     } else if (isFailed()) {
  416.       currentStatus = TIPStatus.FAILED;
  417.     } else if (!(isComplete() || isRunning() || wasKilled())) {
  418.       currentStatus = TIPStatus.PENDING;
  419.     }
  420.     
  421.     TaskReport report = new TaskReport
  422.       (getTIPId(), (float)progress, state,
  423.        diagnostics.toArray(new String[diagnostics.size()]),
  424.        currentStatus, execStartTime, execFinishTime, counters);
  425.     if (currentStatus == TIPStatus.RUNNING) {
  426.       report.setRunningTaskAttempts(activeTasks.keySet());
  427.     } else if (currentStatus == TIPStatus.COMPLETE) {
  428.       report.setSuccessfulAttempt(getSuccessfulTaskid());
  429.     }
  430.     return report;
  431.   }
  432.   /**
  433.    * Get the diagnostic messages for a given task within this tip.
  434.    * 
  435.    * @param taskId the id of the required task
  436.    * @return the list of diagnostics for that task
  437.    */
  438.   synchronized List<String> getDiagnosticInfo(TaskAttemptID taskId) {
  439.     return taskDiagnosticData.get(taskId);
  440.   }
  441.     
  442.   ////////////////////////////////////////////////
  443.   // Update methods, usually invoked by the owning
  444.   // job.
  445.   ////////////////////////////////////////////////
  446.   
  447.   /**
  448.    * Save diagnostic information for a given task.
  449.    * 
  450.    * @param taskId id of the task 
  451.    * @param diagInfo diagnostic information for the task
  452.    */
  453.   public void addDiagnosticInfo(TaskAttemptID taskId, String diagInfo) {
  454.     List<String> diagHistory = taskDiagnosticData.get(taskId);
  455.     if (diagHistory == null) {
  456.       diagHistory = new ArrayList<String>();
  457.       taskDiagnosticData.put(taskId, diagHistory);
  458.     }
  459.     diagHistory.add(diagInfo);
  460.   }
  461.   
  462.   /**
  463.    * A status message from a client has arrived.
  464.    * It updates the status of a single component-thread-task,
  465.    * which might result in an overall TaskInProgress status update.
  466.    * @return has the task changed its state noticably?
  467.    */
  468.   synchronized boolean updateStatus(TaskStatus status) {
  469.     TaskAttemptID taskid = status.getTaskID();
  470.     String diagInfo = status.getDiagnosticInfo();
  471.     TaskStatus oldStatus = taskStatuses.get(taskid);
  472.     boolean changed = true;
  473.     if (diagInfo != null && diagInfo.length() > 0) {
  474.       LOG.info("Error from "+taskid+": "+diagInfo);
  475.       addDiagnosticInfo(taskid, diagInfo);
  476.     }
  477.     
  478.     if(skipping) {
  479.       failedRanges.updateState(status);
  480.     }
  481.     
  482.     if (oldStatus != null) {
  483.       TaskStatus.State oldState = oldStatus.getRunState();
  484.       TaskStatus.State newState = status.getRunState();
  485.           
  486.       // We should never recieve a duplicate success/failure/killed
  487.       // status update for the same taskid! This is a safety check, 
  488.       // and is addressed better at the TaskTracker to ensure this.
  489.       // @see {@link TaskTracker.transmitHeartbeat()}
  490.       if ((newState != TaskStatus.State.RUNNING && 
  491.            newState != TaskStatus.State.COMMIT_PENDING && 
  492.            newState != TaskStatus.State.FAILED_UNCLEAN && 
  493.            newState != TaskStatus.State.KILLED_UNCLEAN && 
  494.            newState != TaskStatus.State.UNASSIGNED) && 
  495.           (oldState == newState)) {
  496.         LOG.warn("Recieved duplicate status update of '" + newState + 
  497.                  "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
  498.         return false;
  499.       }
  500.       // The task is not allowed to move from completed back to running.
  501.       // We have seen out of order status messagesmoving tasks from complete
  502.       // to running. This is a spot fix, but it should be addressed more
  503.       // globally.
  504.       if ((newState == TaskStatus.State.RUNNING || 
  505.           newState == TaskStatus.State.UNASSIGNED) &&
  506.           (oldState == TaskStatus.State.FAILED || 
  507.            oldState == TaskStatus.State.KILLED || 
  508.            oldState == TaskStatus.State.FAILED_UNCLEAN || 
  509.            oldState == TaskStatus.State.KILLED_UNCLEAN || 
  510.            oldState == TaskStatus.State.SUCCEEDED ||
  511.            oldState == TaskStatus.State.COMMIT_PENDING)) {
  512.         return false;
  513.       }
  514.       
  515.       //Do not accept any status once the task is marked FAILED/KILLED
  516.       //This is to handle the case of the JobTracker timing out a task
  517.       //due to launch delay, but the TT comes back with any state or 
  518.       //TT got expired
  519.       if (oldState == TaskStatus.State.FAILED ||
  520.           oldState == TaskStatus.State.KILLED) {
  521.         tasksToKill.put(taskid, true);
  522.         return false;   
  523.       }
  524.           
  525.       changed = oldState != newState;
  526.     }
  527.     // if task is a cleanup attempt, do not replace the complete status,
  528.     // update only specific fields.
  529.     // For example, startTime should not be updated, 
  530.     // but finishTime has to be updated.
  531.     if (!isCleanupAttempt(taskid)) {
  532.       taskStatuses.put(taskid, status);
  533.     } else {
  534.       taskStatuses.get(taskid).statusUpdate(status.getRunState(),
  535.         status.getProgress(), status.getStateString(), status.getPhase(),
  536.         status.getFinishTime());
  537.     }
  538.     // Recompute progress
  539.     recomputeProgress();
  540.     return changed;
  541.   }
  542.   /**
  543.    * Indicate that one of the taskids in this TaskInProgress
  544.    * has failed.
  545.    */
  546.   public void incompleteSubTask(TaskAttemptID taskid, 
  547.                                 JobStatus jobStatus) {
  548.     //
  549.     // Note the failure and its location
  550.     //
  551.     TaskStatus status = taskStatuses.get(taskid);
  552.     String trackerName;
  553.     String trackerHostName = null;
  554.     TaskStatus.State taskState = TaskStatus.State.FAILED;
  555.     if (status != null) {
  556.       trackerName = status.getTaskTracker();
  557.       trackerHostName = 
  558.         JobInProgress.convertTrackerNameToHostName(trackerName);
  559.       // Check if the user manually KILLED/FAILED this task-attempt...
  560.       Boolean shouldFail = tasksToKill.remove(taskid);
  561.       if (shouldFail != null) {
  562.         if (status.getRunState() == TaskStatus.State.FAILED ||
  563.             status.getRunState() == TaskStatus.State.KILLED) {
  564.           taskState = (shouldFail) ? TaskStatus.State.FAILED :
  565.                                      TaskStatus.State.KILLED;
  566.         } else {
  567.           taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN :
  568.                                      TaskStatus.State.KILLED_UNCLEAN;
  569.           
  570.         }
  571.         status.setRunState(taskState);
  572.         addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
  573.       }
  574.  
  575.       taskState = status.getRunState();
  576.       if (taskState != TaskStatus.State.FAILED && 
  577.           taskState != TaskStatus.State.KILLED &&
  578.           taskState != TaskStatus.State.FAILED_UNCLEAN &&
  579.           taskState != TaskStatus.State.KILLED_UNCLEAN) {
  580.         LOG.info("Task '" + taskid + "' running on '" + trackerName + 
  581.                 "' in state: '" + taskState + "' being failed!");
  582.         status.setRunState(TaskStatus.State.FAILED);
  583.         taskState = TaskStatus.State.FAILED;
  584.       }
  585.       // tasktracker went down and failed time was not reported. 
  586.       if (0 == status.getFinishTime()){
  587.         status.setFinishTime(System.currentTimeMillis());
  588.       }
  589.     }
  590.     this.activeTasks.remove(taskid);
  591.     
  592.     // Since we do not fail completed reduces (whose outputs go to hdfs), we 
  593.     // should note this failure only for completed maps, only if this taskid;
  594.     // completed this map. however if the job is done, there is no need to 
  595.     // manipulate completed maps
  596.     if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) && 
  597.         jobStatus.getRunState() != JobStatus.SUCCEEDED) {
  598.       this.completes--;
  599.       
  600.       // Reset the successfulTaskId since we don't have a SUCCESSFUL task now
  601.       resetSuccessfulTaskid();
  602.     }
  603.     // Note that there can be failures of tasks that are hosted on a machine 
  604.     // that has not yet registered with restarted jobtracker
  605.     // recalculate the counts only if its a genuine failure
  606.     if (tasks.contains(taskid)) {
  607.       if (taskState == TaskStatus.State.FAILED) {
  608.         numTaskFailures++;
  609.         machinesWhereFailed.add(trackerHostName);
  610.         if(maxSkipRecords>0) {
  611.           //skipping feature enabled
  612.           LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
  613.           failedRanges.add(status.getNextRecordRange());
  614.           skipping = startSkipping();
  615.         }
  616.       } else if (taskState == TaskStatus.State.KILLED) {
  617.         numKilledTasks++;
  618.       }
  619.     }
  620.     if (numTaskFailures >= maxTaskAttempts) {
  621.       LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
  622.       kill();
  623.     }
  624.   }
  625.   
  626.   /**
  627.    * Get whether to start skipping mode. 
  628.    */
  629.   private boolean startSkipping() {
  630.     if(maxSkipRecords>0 && 
  631.         numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
  632.       return true;
  633.     }
  634.     return false;
  635.   }
  636.   /**
  637.    * Finalize the <b>completed</b> task; note that this might not be the first 
  638.    * task-attempt of the {@link TaskInProgress} and hence might be declared 
  639.    * {@link TaskStatus.State.SUCCEEDED} or {@link TaskStatus.State.KILLED}
  640.    * 
  641.    * @param taskId id of the completed task-attempt
  642.    * @param finalTaskState final {@link TaskStatus.State} of the task-attempt
  643.    */
  644.   private void completedTask(TaskAttemptID taskId, TaskStatus.State finalTaskState) {
  645.     TaskStatus status = taskStatuses.get(taskId);
  646.     status.setRunState(finalTaskState);
  647.     activeTasks.remove(taskId);
  648.   }
  649.   
  650.   /**
  651.    * Indicate that one of the taskids in this already-completed
  652.    * TaskInProgress has successfully completed; hence we mark this
  653.    * taskid as {@link TaskStatus.State.KILLED}. 
  654.    */
  655.   void alreadyCompletedTask(TaskAttemptID taskid) {
  656.     // 'KILL' the task 
  657.     completedTask(taskid, TaskStatus.State.KILLED);
  658.     
  659.     // Note the reason for the task being 'KILLED'
  660.     addDiagnosticInfo(taskid, "Already completed TIP");
  661.     
  662.     LOG.info("Already complete TIP " + getTIPId() + 
  663.              " has completed task " + taskid);
  664.   }
  665.   /**
  666.    * Indicate that one of the taskids in this TaskInProgress
  667.    * has successfully completed!
  668.    */
  669.   public void completed(TaskAttemptID taskid) {
  670.     //
  671.     // Record that this taskid is complete
  672.     //
  673.     completedTask(taskid, TaskStatus.State.SUCCEEDED);
  674.         
  675.     // Note the successful taskid
  676.     setSuccessfulTaskid(taskid);
  677.     
  678.     //
  679.     // Now that the TIP is complete, the other speculative 
  680.     // subtasks will be closed when the owning tasktracker 
  681.     // reports in and calls shouldClose() on this object.
  682.     //
  683.     this.completes++;
  684.     this.execFinishTime = System.currentTimeMillis();
  685.     recomputeProgress();
  686.     
  687.   }
  688.   /**
  689.    * Get the split locations 
  690.    */
  691.   public String[] getSplitLocations() {
  692.     return rawSplit.getLocations();
  693.   }
  694.   
  695.   /**
  696.    * Get the Status of the tasks managed by this TIP
  697.    */
  698.   public TaskStatus[] getTaskStatuses() {
  699.     return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]);
  700.   }
  701.   /**
  702.    * Get the status of the specified task
  703.    * @param taskid
  704.    * @return
  705.    */
  706.   public TaskStatus getTaskStatus(TaskAttemptID taskid) {
  707.     return taskStatuses.get(taskid);
  708.   }
  709.   /**
  710.    * The TIP's been ordered kill()ed.
  711.    */
  712.   public void kill() {
  713.     if (isComplete() || failed) {
  714.       return;
  715.     }
  716.     this.failed = true;
  717.     killed = true;
  718.     this.execFinishTime = System.currentTimeMillis();
  719.     recomputeProgress();
  720.   }
  721.   /**
  722.    * Was the task killed?
  723.    * @return true if the task killed
  724.    */
  725.   public boolean wasKilled() {
  726.     return killed;
  727.   }
  728.   
  729.   /**
  730.    * Kill the given task
  731.    */
  732.   boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
  733.     TaskStatus st = taskStatuses.get(taskId);
  734.     if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
  735.         || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
  736.         st.inTaskCleanupPhase() ||
  737.         st.getRunState() == TaskStatus.State.UNASSIGNED)
  738.         && tasksToKill.put(taskId, shouldFail) == null ) {
  739.       String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 
  740.                       + " task '" + taskId + "' by user";
  741.       addDiagnosticInfo(taskId, logStr);
  742.       LOG.info(logStr);
  743.       return true;
  744.     }
  745.     return false;
  746.   }
  747.   /**
  748.    * This method is called whenever there's a status change
  749.    * for one of the TIP's sub-tasks.  It recomputes the overall 
  750.    * progress for the TIP.  We examine all sub-tasks and find 
  751.    * the one that's most advanced (and non-failed).
  752.    */
  753.   void recomputeProgress() {
  754.     if (isComplete()) {
  755.       this.progress = 1;
  756.       // update the counters and the state
  757.       TaskStatus completedStatus = taskStatuses.get(getSuccessfulTaskid());
  758.       this.counters = completedStatus.getCounters();
  759.       this.state = completedStatus.getStateString();
  760.     } else if (failed) {
  761.       this.progress = 0;
  762.       // reset the counters and the state
  763.       this.state = "";
  764.       this.counters = new Counters();
  765.     } else {
  766.       double bestProgress = 0;
  767.       String bestState = "";
  768.       Counters bestCounters = new Counters();
  769.       for (Iterator<TaskAttemptID> it = taskStatuses.keySet().iterator(); it.hasNext();) {
  770.         TaskAttemptID taskid = it.next();
  771.         TaskStatus status = taskStatuses.get(taskid);
  772.         if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
  773.           bestProgress = 1;
  774.           bestState = status.getStateString();
  775.           bestCounters = status.getCounters();
  776.           break;
  777.         } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) {
  778.           //for COMMIT_PENDING, we take the last state that we recorded
  779.           //when the task was RUNNING
  780.           bestProgress = this.progress;
  781.           bestState = this.state;
  782.           bestCounters = this.counters;
  783.         } else if (status.getRunState() == TaskStatus.State.RUNNING) {
  784.           if (status.getProgress() >= bestProgress) {
  785.             bestProgress = status.getProgress();
  786.             bestState = status.getStateString();
  787.             if (status.getIncludeCounters()) {
  788.               bestCounters = status.getCounters();
  789.             } else {
  790.               bestCounters = this.counters;
  791.             }
  792.           }
  793.         }
  794.       }
  795.       this.progress = bestProgress;
  796.       this.state = bestState;
  797.       this.counters = bestCounters;
  798.     }
  799.   }
  800.   /////////////////////////////////////////////////
  801.   // "Action" methods that actually require the TIP
  802.   // to do something.
  803.   /////////////////////////////////////////////////
  804.   /**
  805.    * Return whether this TIP still needs to run
  806.    */
  807.   boolean isRunnable() {
  808.     return !failed && (completes == 0);
  809.   }
  810.     
  811.   /**
  812.    * Return whether the TIP has a speculative task to run.  We
  813.    * only launch a speculative task if the current TIP is really
  814.    * far behind, and has been behind for a non-trivial amount of 
  815.    * time.
  816.    */
  817.   boolean hasSpeculativeTask(long currentTime, double averageProgress) {
  818.     //
  819.     // REMIND - mjc - these constants should be examined
  820.     // in more depth eventually...
  821.     //
  822.       
  823.     if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
  824.         (averageProgress - progress >= SPECULATIVE_GAP) &&
  825.         (currentTime - startTime >= SPECULATIVE_LAG) 
  826.         && completes == 0 && !isOnlyCommitPending()) {
  827.       return true;
  828.     }
  829.     return false;
  830.   }
  831.     
  832.   /**
  833.    * Return a Task that can be sent to a TaskTracker for execution.
  834.    */
  835.   public Task getTaskToRun(String taskTracker) throws IOException {
  836.     if (0 == execStartTime){
  837.       // assume task starts running now
  838.       execStartTime = System.currentTimeMillis();
  839.     }
  840.     // Create the 'taskid'; do not count the 'killed' tasks against the job!
  841.     TaskAttemptID taskid = null;
  842.     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
  843.       // Make sure that the attempts are unqiue across restarts
  844.       int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
  845.       taskid = new TaskAttemptID( id, attemptId);
  846.       ++nextTaskId;
  847.     } else {
  848.       LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
  849.               " (plus " + numKilledTasks + " killed)"  + 
  850.               " attempts for the tip '" + getTIPId() + "'");
  851.       return null;
  852.     }
  853.     return addRunningTask(taskid, taskTracker);
  854.   }
  855.   
  856.   public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
  857.     return addRunningTask(taskid, taskTracker, false);
  858.   }
  859.   
  860.   /**
  861.    * Adds a previously running task to this tip. This is used in case of 
  862.    * jobtracker restarts.
  863.    */
  864.   public Task addRunningTask(TaskAttemptID taskid, 
  865.                              String taskTracker,
  866.                              boolean taskCleanup) {
  867.     // create the task
  868.     Task t = null;
  869.     if (isMapTask()) {
  870.       LOG.debug("attempt "+  numTaskFailures   +
  871.           " sending skippedRecords "+failedRanges.getIndicesCount());
  872.       t = new MapTask(jobFile, taskid, partition, 
  873.           rawSplit.getClassName(), rawSplit.getBytes());
  874.     } else {
  875.       t = new ReduceTask(jobFile, taskid, partition, numMaps);
  876.     }
  877.     if (jobCleanup) {
  878.       t.setJobCleanupTask();
  879.     }
  880.     if (jobSetup) {
  881.       t.setJobSetupTask();
  882.     }
  883.     if (taskCleanup) {
  884.       t.setTaskCleanupTask();
  885.       t.setState(taskStatuses.get(taskid).getRunState());
  886.       cleanupTasks.put(taskid, taskTracker);
  887.     }
  888.     t.setConf(conf);
  889.     LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
  890.     t.setSkipRanges(failedRanges.getSkipRanges());
  891.     t.setSkipping(skipping);
  892.     if(failedRanges.isTestAttempt()) {
  893.       t.setWriteSkipRecs(false);
  894.     }
  895.     activeTasks.put(taskid, taskTracker);
  896.     tasks.add(taskid);
  897.     // Ask JobTracker to note that the task exists
  898.     jobtracker.createTaskEntry(taskid, taskTracker, this);
  899.     // check and set the first attempt
  900.     if (firstTaskId == null) {
  901.       firstTaskId = taskid;
  902.     }
  903.     return t;
  904.   }
  905.   boolean isRunningTask(TaskAttemptID taskid) {
  906.     TaskStatus status = taskStatuses.get(taskid);
  907.     return status != null && status.getRunState() == TaskStatus.State.RUNNING;
  908.   }
  909.   
  910.   boolean isCleanupAttempt(TaskAttemptID taskid) {
  911.     return cleanupTasks.containsKey(taskid);
  912.   }
  913.   
  914.   String machineWhereCleanupRan(TaskAttemptID taskid) {
  915.     return cleanupTasks.get(taskid);
  916.   }
  917.   
  918.   String machineWhereTaskRan(TaskAttemptID taskid) {
  919.     return taskStatuses.get(taskid).getTaskTracker();
  920.   }
  921.     
  922.   boolean wasKilled(TaskAttemptID taskid) {
  923.     return tasksToKill.containsKey(taskid);
  924.   }
  925.   
  926.   /**
  927.    * Has this task already failed on this machine?
  928.    * @param trackerHost The task tracker hostname
  929.    * @return Has it failed?
  930.    */
  931.   public boolean hasFailedOnMachine(String trackerHost) {
  932.     return machinesWhereFailed.contains(trackerHost);
  933.   }
  934.     
  935.   /**
  936.    * Was this task ever scheduled to run on this machine?
  937.    * @param trackerHost The task tracker hostname 
  938.    * @param trackerName The tracker name
  939.    * @return Was task scheduled on the tracker?
  940.    */
  941.   public boolean hasRunOnMachine(String trackerHost, String trackerName) {
  942.     return this.activeTasks.values().contains(trackerName) || 
  943.       hasFailedOnMachine(trackerHost);
  944.   }
  945.   /**
  946.    * Get the number of machines where this task has failed.
  947.    * @return the size of the failed machine set
  948.    */
  949.   public int getNumberOfFailedMachines() {
  950.     return machinesWhereFailed.size();
  951.   }
  952.     
  953.   /**
  954.    * Get the id of this map or reduce task.
  955.    * @return The index of this tip in the maps/reduces lists.
  956.    */
  957.   public int getIdWithinJob() {
  958.     return partition;
  959.   }
  960.     
  961.   /**
  962.    * Set the event number that was raised for this tip
  963.    */
  964.   public void setSuccessEventNumber(int eventNumber) {
  965.     successEventNumber = eventNumber;
  966.   }
  967.        
  968.   /**
  969.    * Get the event number that was raised for this tip
  970.    */
  971.   public int getSuccessEventNumber() {
  972.     return successEventNumber;
  973.   }
  974.   
  975.   /** 
  976.    * Gets the Node list of input split locations sorted in rack order.
  977.    */ 
  978.   public String getSplitNodes() {
  979.     if ( rawSplit == null) {
  980.       return "";
  981.     }
  982.     String[] splits = rawSplit.getLocations();
  983.     Node[] nodes = new Node[splits.length];
  984.     for (int i = 0; i < splits.length; i++) {
  985.       nodes[i] = jobtracker.getNode(splits[i]);
  986.     }
  987.     // sort nodes on rack location
  988.     Arrays.sort(nodes, new Comparator<Node>() {
  989.       public int compare(Node a, Node b) {
  990.         String left = a.getNetworkLocation();
  991.         String right = b.getNetworkLocation();
  992.         return left.compareTo(right);
  993.       }
  994.     }); 
  995.     return nodeToString(nodes);
  996.   }
  997.   private static String nodeToString(Node[] nodes) {
  998.     if (nodes == null || nodes.length == 0) {
  999.       return "";
  1000.     }
  1001.     StringBuffer ret = new StringBuffer(nodes[0].toString());
  1002.     for(int i = 1; i < nodes.length;i++) {
  1003.       ret.append(",");
  1004.       ret.append(nodes[i].toString());
  1005.     }
  1006.     return ret.toString();
  1007.   }
  1008.   public long getMapInputSize() {
  1009.     if(isMapTask() && !jobSetup && !jobCleanup) {
  1010.       return rawSplit.getDataLength();
  1011.     } else {
  1012.       return 0;
  1013.     }
  1014.   }
  1015.   
  1016.   public void clearSplit() {
  1017.     rawSplit.clearBytes();
  1018.   }
  1019.   
  1020.   /**
  1021.    * This class keeps the records to be skipped during further executions 
  1022.    * based on failed records from all the previous attempts.
  1023.    * It also narrow down the skip records if it is more than the 
  1024.    * acceptable value by dividing the failed range into half. In this case one 
  1025.    * half is executed in the next attempt (test attempt). 
  1026.    * In the test attempt, only the test range gets executed, others get skipped. 
  1027.    * Based on the success/failure of the test attempt, the range is divided 
  1028.    * further.
  1029.    */
  1030.   private class FailedRanges {
  1031.     private SortedRanges skipRanges = new SortedRanges();
  1032.     private Divide divide;
  1033.     
  1034.     synchronized SortedRanges getSkipRanges() {
  1035.       if(divide!=null) {
  1036.         return divide.skipRange;
  1037.       }
  1038.       return skipRanges;
  1039.     }
  1040.     
  1041.     synchronized boolean isTestAttempt() {
  1042.       return divide!=null;
  1043.     }
  1044.     
  1045.     synchronized long getIndicesCount() {
  1046.       if(isTestAttempt()) {
  1047.         return divide.skipRange.getIndicesCount();
  1048.       }
  1049.       return skipRanges.getIndicesCount();
  1050.     }
  1051.     
  1052.     synchronized void updateState(TaskStatus status){
  1053.       if (isTestAttempt() && 
  1054.           (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
  1055.         divide.testPassed = true;
  1056.         //since it was the test attempt we need to set it to failed
  1057.         //as it worked only on the test range
  1058.         status.setRunState(TaskStatus.State.FAILED);
  1059.         
  1060.       }
  1061.     }
  1062.     
  1063.     synchronized void add(Range failedRange) {
  1064.       LOG.warn("FailedRange:"+ failedRange);
  1065.       if(divide!=null) {
  1066.         LOG.warn("FailedRange:"+ failedRange +"  test:"+divide.test +
  1067.             "  pass:"+divide.testPassed);
  1068.         if(divide.testPassed) {
  1069.           //test range passed
  1070.           //other range would be bad. test it
  1071.           failedRange = divide.other;
  1072.         }
  1073.         else {
  1074.           //test range failed
  1075.           //other range would be good.
  1076.           failedRange = divide.test;
  1077.         }
  1078.         //reset
  1079.         divide = null;
  1080.       }
  1081.       
  1082.       if(maxSkipRecords==0 || failedRange.getLength()<=maxSkipRecords) {
  1083.         skipRanges.add(failedRange);
  1084.       } else {
  1085.         //start dividing the range to narrow down the skipped
  1086.         //records until maxSkipRecords are met OR all attempts
  1087.         //get exhausted
  1088.         divide = new Divide(failedRange);
  1089.       }
  1090.     }
  1091.     
  1092.     class Divide {
  1093.       private final SortedRanges skipRange;
  1094.       private final Range test;
  1095.       private final Range other;
  1096.       private boolean testPassed;
  1097.       Divide(Range range){
  1098.         long half = range.getLength()/2;
  1099.         test = new Range(range.getStartIndex(), half);
  1100.         other = new Range(test.getEndIndex(), range.getLength()-half);
  1101.         //construct the skip range from the skipRanges
  1102.         skipRange = new SortedRanges();
  1103.         for(Range r : skipRanges.getRanges()) {
  1104.           skipRange.add(r);
  1105.         }
  1106.         skipRange.add(new Range(0,test.getStartIndex()));
  1107.         skipRange.add(new Range(test.getEndIndex(), 
  1108.             (Long.MAX_VALUE-test.getEndIndex())));
  1109.       }
  1110.     }
  1111.     
  1112.   }
  1113.   TreeMap<TaskAttemptID, String> getActiveTasks() {
  1114.     return activeTasks;
  1115.   }
  1116. }