TaskInProgress.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:36k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.Comparator;
- import java.util.Iterator;
- import java.util.List;
- import java.util.Map;
- import java.util.TreeMap;
- import java.util.TreeSet;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.mapred.JobClient.RawSplit;
- import org.apache.hadoop.mapred.SortedRanges.Range;
- import org.apache.hadoop.net.Node;
- /*************************************************************
- * TaskInProgress maintains all the info needed for a
- * Task in the lifetime of its owning Job. A given Task
- * might be speculatively executed or reexecuted, so we
- * need a level of indirection above the running-id itself.
- * <br>
- * A given TaskInProgress contains multiple taskids,
- * 0 or more of which might be executing at any one time.
- * (That's what allows speculative execution.) A taskid
- * is now *never* recycled. A TIP allocates enough taskids
- * to account for all the speculation and failures it will
- * ever have to handle. Once those are up, the TIP is dead.
- * **************************************************************
- */
- class TaskInProgress {
- static final int MAX_TASK_EXECS = 1;
- int maxTaskAttempts = 4;
- static final double SPECULATIVE_GAP = 0.2;
- static final long SPECULATIVE_LAG = 60 * 1000;
- private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
- public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
- // Defines the TIP
- private String jobFile = null;
- private RawSplit rawSplit;
- private int numMaps;
- private int partition;
- private JobTracker jobtracker;
- private TaskID id;
- private JobInProgress job;
- // Status of the TIP
- private int successEventNumber = -1;
- private int numTaskFailures = 0;
- private int numKilledTasks = 0;
- private double progress = 0;
- private String state = "";
- private long startTime = 0;
- private long execStartTime = 0;
- private long execFinishTime = 0;
- private int completes = 0;
- private boolean failed = false;
- private boolean killed = false;
- private long maxSkipRecords = 0;
- private FailedRanges failedRanges = new FailedRanges();
- private volatile boolean skipping = false;
- private boolean jobCleanup = false;
- private boolean jobSetup = false;
-
- // The 'next' usable taskid of this tip
- int nextTaskId = 0;
-
- // The taskid that took this TIP to SUCCESS
- private TaskAttemptID successfulTaskId;
- // The first taskid of this tip
- private TaskAttemptID firstTaskId;
-
- // Map from task Id -> TaskTracker Id, contains tasks that are
- // currently runnings
- private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
- // All attempt Ids of this TIP
- private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
- private JobConf conf;
- private Map<TaskAttemptID,List<String>> taskDiagnosticData =
- new TreeMap<TaskAttemptID,List<String>>();
- /**
- * Map from taskId -> TaskStatus
- */
- private TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
- new TreeMap<TaskAttemptID,TaskStatus>();
- // Map from taskId -> TaskTracker Id,
- // contains cleanup attempts and where they ran, if any
- private TreeMap<TaskAttemptID, String> cleanupTasks =
- new TreeMap<TaskAttemptID, String>();
- private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
- private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
-
- //list of tasks to kill, <taskid> -> <shouldFail>
- private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
-
- //task to commit, <taskattemptid>
- private TaskAttemptID taskToCommit;
-
- private Counters counters = new Counters();
-
/**
 * Creates the TIP for a map task.
 *
 * @param jobid id of the owning job, used to build this TIP's id
 * @param jobFile job file handed to every task launched from this TIP
 * @param rawSplit input split of the map; a non-null split is what marks
 *        this TIP as a map (see {@link #isMapTask()})
 * @param jobtracker tracker notified of every attempt that is launched
 * @param conf job configuration (attempt limits, skip-mode settings)
 * @param job the owning job
 * @param partition index of this map within the job
 */
public TaskInProgress(JobID jobid, String jobFile,
                      RawSplit rawSplit,
                      JobTracker jobtracker, JobConf conf,
                      JobInProgress job, int partition) {
  // Identity first: the helpers called below read these fields.
  this.job = job;
  this.jobtracker = jobtracker;
  this.jobFile = jobFile;
  this.conf = conf;
  this.rawSplit = rawSplit;
  this.partition = partition;
  // Map-side skip limit from the job configuration.
  this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
  setMaxTaskAttempts();
  init(jobid);
}
-
/**
 * Creates the TIP for a reduce task.
 *
 * @param jobid id of the owning job, used to build this TIP's id
 * @param jobFile job file handed to every task launched from this TIP
 * @param numMaps number of maps in the job, passed to each ReduceTask
 * @param partition index of this reduce within the job
 * @param jobtracker tracker notified of every attempt that is launched
 * @param conf job configuration (attempt limits, skip-mode settings)
 * @param job the owning job
 */
public TaskInProgress(JobID jobid, String jobFile,
                      int numMaps,
                      int partition, JobTracker jobtracker, JobConf conf,
                      JobInProgress job) {
  // Identity first: the helpers called below read these fields.
  this.job = job;
  this.jobtracker = jobtracker;
  this.jobFile = jobFile;
  this.conf = conf;
  this.numMaps = numMaps;
  this.partition = partition;
  // Reduce-side skip limit from the job configuration.
  this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
  setMaxTaskAttempts();
  init(jobid);
}
-
- /**
- * Set the max number of attempts before we declare a TIP as "failed"
- */
- private void setMaxTaskAttempts() {
- if (isMapTask()) {
- this.maxTaskAttempts = conf.getMaxMapAttempts();
- } else {
- this.maxTaskAttempts = conf.getMaxReduceAttempts();
- }
- }
-
- /**
- * Return the index of the tip within the job, so
- * "task_200707121733_1313_0002_m_012345" would return 12345;
- * @return int the tip index
- */
- public int idWithinJob() {
- return partition;
- }
- public boolean isJobCleanupTask() {
- return jobCleanup;
- }
-
- public void setJobCleanupTask() {
- jobCleanup = true;
- }
- public boolean isJobSetupTask() {
- return jobSetup;
- }
-
- public void setJobSetupTask() {
- jobSetup = true;
- }
- public boolean isOnlyCommitPending() {
- for (TaskStatus t : taskStatuses.values()) {
- if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
- return true;
- }
- }
- return false;
- }
-
- public boolean isCommitPending(TaskAttemptID taskId) {
- TaskStatus t = taskStatuses.get(taskId);
- if (t == null) {
- return false;
- }
- return t.getRunState() == TaskStatus.State.COMMIT_PENDING;
- }
-
- /**
- * Initialization common to Map and Reduce
- */
- void init(JobID jobId) {
- this.startTime = System.currentTimeMillis();
- this.id = new TaskID(jobId, isMapTask(), partition);
- this.skipping = startSkipping();
- }
- ////////////////////////////////////
- // Accessors, info, profiles, etc.
- ////////////////////////////////////
- /**
- * Return the start time
- */
- public long getStartTime() {
- return startTime;
- }
-
- /**
- * Return the exec start time
- */
- public long getExecStartTime() {
- return execStartTime;
- }
-
- /**
- * Set the exec start time
- */
- public void setExecStartTime(long startTime) {
- execStartTime = startTime;
- }
-
- /**
- * Return the exec finish time
- */
- public long getExecFinishTime() {
- return execFinishTime;
- }
- /**
- * Set the exec finish time
- */
- public void setExecFinishTime(long finishTime) {
- execFinishTime = finishTime;
- JobHistory.Task.logUpdates(id, execFinishTime); // log the update
- }
-
- /**
- * Return the parent job
- */
- public JobInProgress getJob() {
- return job;
- }
- /**
- * Return an ID for this task, not its component taskid-threads
- */
- public TaskID getTIPId() {
- return this.id;
- }
- /**
- * Whether this is a map task
- */
- public boolean isMapTask() {
- return rawSplit != null;
- }
-
- /**
- * Is the Task associated with taskid is the first attempt of the tip?
- * @param taskId
- * @return Returns true if the Task is the first attempt of the tip
- */
- public boolean isFirstAttempt(TaskAttemptID taskId) {
- return firstTaskId == null ? false : firstTaskId.equals(taskId);
- }
-
- /**
- * Is this tip currently running any tasks?
- * @return true if any tasks are running
- */
- public boolean isRunning() {
- return !activeTasks.isEmpty();
- }
-
- TaskAttemptID getSuccessfulTaskid() {
- return successfulTaskId;
- }
-
- private void setSuccessfulTaskid(TaskAttemptID successfulTaskId) {
- this.successfulTaskId = successfulTaskId;
- }
-
- private void resetSuccessfulTaskid() {
- this.successfulTaskId = null;
- }
-
- /**
- * Is this tip complete?
- *
- * @return <code>true</code> if the tip is complete, else <code>false</code>
- */
- public synchronized boolean isComplete() {
- return (completes > 0);
- }
- /**
- * Is the given taskid the one that took this tip to completion?
- *
- * @param taskid taskid of attempt to check for completion
- * @return <code>true</code> if taskid is complete, else <code>false</code>
- */
- public boolean isComplete(TaskAttemptID taskid) {
- return ((completes > 0)
- && taskid.equals(getSuccessfulTaskid()));
- }
- /**
- * Is the tip a failure?
- *
- * @return <code>true</code> if tip has failed, else <code>false</code>
- */
- public boolean isFailed() {
- return failed;
- }
- /**
- * Number of times the TaskInProgress has failed.
- */
- public int numTaskFailures() {
- return numTaskFailures;
- }
- /**
- * Number of times the TaskInProgress has been killed by the framework.
- */
- public int numKilledTasks() {
- return numKilledTasks;
- }
- /**
- * Get the overall progress (from 0 to 1.0) for this TIP
- */
- public double getProgress() {
- return progress;
- }
-
- /**
- * Get the task's counters
- */
- public Counters getCounters() {
- return counters;
- }
- /**
- * Returns whether a component task-thread should be
- * closed because the containing JobInProgress has completed
- * or the task is killed by the user
- */
- public boolean shouldClose(TaskAttemptID taskid) {
- /**
- * If the task hasn't been closed yet, and it belongs to a completed
- * TaskInProgress close it.
- *
- * However, for completed map tasks we do not close the task which
- * actually was the one responsible for _completing_ the TaskInProgress.
- */
- boolean close = false;
- TaskStatus ts = taskStatuses.get(taskid);
- if ((ts != null) &&
- (!tasksReportedClosed.contains(taskid)) &&
- ((this.failed) ||
- ((job.getStatus().getRunState() != JobStatus.RUNNING &&
- (job.getStatus().getRunState() != JobStatus.PREP))))) {
- tasksReportedClosed.add(taskid);
- close = true;
- } else if (isComplete() &&
- !(isMapTask() && !jobSetup &&
- !jobCleanup && isComplete(taskid)) &&
- !tasksReportedClosed.contains(taskid)) {
- tasksReportedClosed.add(taskid);
- close = true;
- } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
- !tasksReportedClosed.contains(taskid)) {
- tasksReportedClosed.add(taskid);
- close = true;
- } else {
- close = tasksToKill.keySet().contains(taskid);
- }
- return close;
- }
- /**
- * Commit this task attempt for the tip.
- * @param taskid
- */
- public void doCommit(TaskAttemptID taskid) {
- taskToCommit = taskid;
- }
- /**
- * Returns whether the task attempt should be committed or not
- */
- public boolean shouldCommit(TaskAttemptID taskid) {
- return !isComplete() && isCommitPending(taskid) &&
- taskToCommit.equals(taskid);
- }
- /**
- * Creates a "status report" for this task. Includes the
- * task ID and overall status, plus reports for all the
- * component task-threads that have ever been started.
- */
- synchronized TaskReport generateSingleReport() {
- ArrayList<String> diagnostics = new ArrayList<String>();
- for (List<String> l : taskDiagnosticData.values()) {
- diagnostics.addAll(l);
- }
- TIPStatus currentStatus = null;
- if (isRunning() && !isComplete()) {
- currentStatus = TIPStatus.RUNNING;
- } else if (isComplete()) {
- currentStatus = TIPStatus.COMPLETE;
- } else if (wasKilled()) {
- currentStatus = TIPStatus.KILLED;
- } else if (isFailed()) {
- currentStatus = TIPStatus.FAILED;
- } else if (!(isComplete() || isRunning() || wasKilled())) {
- currentStatus = TIPStatus.PENDING;
- }
-
- TaskReport report = new TaskReport
- (getTIPId(), (float)progress, state,
- diagnostics.toArray(new String[diagnostics.size()]),
- currentStatus, execStartTime, execFinishTime, counters);
- if (currentStatus == TIPStatus.RUNNING) {
- report.setRunningTaskAttempts(activeTasks.keySet());
- } else if (currentStatus == TIPStatus.COMPLETE) {
- report.setSuccessfulAttempt(getSuccessfulTaskid());
- }
- return report;
- }
- /**
- * Get the diagnostic messages for a given task within this tip.
- *
- * @param taskId the id of the required task
- * @return the list of diagnostics for that task
- */
- synchronized List<String> getDiagnosticInfo(TaskAttemptID taskId) {
- return taskDiagnosticData.get(taskId);
- }
-
- ////////////////////////////////////////////////
- // Update methods, usually invoked by the owning
- // job.
- ////////////////////////////////////////////////
-
- /**
- * Save diagnostic information for a given task.
- *
- * @param taskId id of the task
- * @param diagInfo diagnostic information for the task
- */
- public void addDiagnosticInfo(TaskAttemptID taskId, String diagInfo) {
- List<String> diagHistory = taskDiagnosticData.get(taskId);
- if (diagHistory == null) {
- diagHistory = new ArrayList<String>();
- taskDiagnosticData.put(taskId, diagHistory);
- }
- diagHistory.add(diagInfo);
- }
-
- /**
- * A status message from a client has arrived.
- * It updates the status of a single component-thread-task,
- * which might result in an overall TaskInProgress status update.
- * @return has the task changed its state noticably?
- */
- synchronized boolean updateStatus(TaskStatus status) {
- TaskAttemptID taskid = status.getTaskID();
- String diagInfo = status.getDiagnosticInfo();
- TaskStatus oldStatus = taskStatuses.get(taskid);
- boolean changed = true;
- if (diagInfo != null && diagInfo.length() > 0) {
- LOG.info("Error from "+taskid+": "+diagInfo);
- addDiagnosticInfo(taskid, diagInfo);
- }
-
- if(skipping) {
- failedRanges.updateState(status);
- }
-
- if (oldStatus != null) {
- TaskStatus.State oldState = oldStatus.getRunState();
- TaskStatus.State newState = status.getRunState();
-
- // We should never recieve a duplicate success/failure/killed
- // status update for the same taskid! This is a safety check,
- // and is addressed better at the TaskTracker to ensure this.
- // @see {@link TaskTracker.transmitHeartbeat()}
- if ((newState != TaskStatus.State.RUNNING &&
- newState != TaskStatus.State.COMMIT_PENDING &&
- newState != TaskStatus.State.FAILED_UNCLEAN &&
- newState != TaskStatus.State.KILLED_UNCLEAN &&
- newState != TaskStatus.State.UNASSIGNED) &&
- (oldState == newState)) {
- LOG.warn("Recieved duplicate status update of '" + newState +
- "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
- return false;
- }
- // The task is not allowed to move from completed back to running.
- // We have seen out of order status messagesmoving tasks from complete
- // to running. This is a spot fix, but it should be addressed more
- // globally.
- if ((newState == TaskStatus.State.RUNNING ||
- newState == TaskStatus.State.UNASSIGNED) &&
- (oldState == TaskStatus.State.FAILED ||
- oldState == TaskStatus.State.KILLED ||
- oldState == TaskStatus.State.FAILED_UNCLEAN ||
- oldState == TaskStatus.State.KILLED_UNCLEAN ||
- oldState == TaskStatus.State.SUCCEEDED ||
- oldState == TaskStatus.State.COMMIT_PENDING)) {
- return false;
- }
-
- //Do not accept any status once the task is marked FAILED/KILLED
- //This is to handle the case of the JobTracker timing out a task
- //due to launch delay, but the TT comes back with any state or
- //TT got expired
- if (oldState == TaskStatus.State.FAILED ||
- oldState == TaskStatus.State.KILLED) {
- tasksToKill.put(taskid, true);
- return false;
- }
-
- changed = oldState != newState;
- }
- // if task is a cleanup attempt, do not replace the complete status,
- // update only specific fields.
- // For example, startTime should not be updated,
- // but finishTime has to be updated.
- if (!isCleanupAttempt(taskid)) {
- taskStatuses.put(taskid, status);
- } else {
- taskStatuses.get(taskid).statusUpdate(status.getRunState(),
- status.getProgress(), status.getStateString(), status.getPhase(),
- status.getFinishTime());
- }
- // Recompute progress
- recomputeProgress();
- return changed;
- }
- /**
- * Indicate that one of the taskids in this TaskInProgress
- * has failed.
- */
- public void incompleteSubTask(TaskAttemptID taskid,
- JobStatus jobStatus) {
- //
- // Note the failure and its location
- //
- TaskStatus status = taskStatuses.get(taskid);
- String trackerName;
- String trackerHostName = null;
- TaskStatus.State taskState = TaskStatus.State.FAILED;
- if (status != null) {
- trackerName = status.getTaskTracker();
- trackerHostName =
- JobInProgress.convertTrackerNameToHostName(trackerName);
- // Check if the user manually KILLED/FAILED this task-attempt...
- Boolean shouldFail = tasksToKill.remove(taskid);
- if (shouldFail != null) {
- if (status.getRunState() == TaskStatus.State.FAILED ||
- status.getRunState() == TaskStatus.State.KILLED) {
- taskState = (shouldFail) ? TaskStatus.State.FAILED :
- TaskStatus.State.KILLED;
- } else {
- taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN :
- TaskStatus.State.KILLED_UNCLEAN;
-
- }
- status.setRunState(taskState);
- addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
- }
-
- taskState = status.getRunState();
- if (taskState != TaskStatus.State.FAILED &&
- taskState != TaskStatus.State.KILLED &&
- taskState != TaskStatus.State.FAILED_UNCLEAN &&
- taskState != TaskStatus.State.KILLED_UNCLEAN) {
- LOG.info("Task '" + taskid + "' running on '" + trackerName +
- "' in state: '" + taskState + "' being failed!");
- status.setRunState(TaskStatus.State.FAILED);
- taskState = TaskStatus.State.FAILED;
- }
- // tasktracker went down and failed time was not reported.
- if (0 == status.getFinishTime()){
- status.setFinishTime(System.currentTimeMillis());
- }
- }
- this.activeTasks.remove(taskid);
-
- // Since we do not fail completed reduces (whose outputs go to hdfs), we
- // should note this failure only for completed maps, only if this taskid;
- // completed this map. however if the job is done, there is no need to
- // manipulate completed maps
- if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) &&
- jobStatus.getRunState() != JobStatus.SUCCEEDED) {
- this.completes--;
-
- // Reset the successfulTaskId since we don't have a SUCCESSFUL task now
- resetSuccessfulTaskid();
- }
- // Note that there can be failures of tasks that are hosted on a machine
- // that has not yet registered with restarted jobtracker
- // recalculate the counts only if its a genuine failure
- if (tasks.contains(taskid)) {
- if (taskState == TaskStatus.State.FAILED) {
- numTaskFailures++;
- machinesWhereFailed.add(trackerHostName);
- if(maxSkipRecords>0) {
- //skipping feature enabled
- LOG.debug("TaskInProgress adding" + status.getNextRecordRange());
- failedRanges.add(status.getNextRecordRange());
- skipping = startSkipping();
- }
- } else if (taskState == TaskStatus.State.KILLED) {
- numKilledTasks++;
- }
- }
- if (numTaskFailures >= maxTaskAttempts) {
- LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
- kill();
- }
- }
-
- /**
- * Get whether to start skipping mode.
- */
- private boolean startSkipping() {
- if(maxSkipRecords>0 &&
- numTaskFailures>=SkipBadRecords.getAttemptsToStartSkipping(conf)) {
- return true;
- }
- return false;
- }
- /**
- * Finalize the <b>completed</b> task; note that this might not be the first
- * task-attempt of the {@link TaskInProgress} and hence might be declared
- * {@link TaskStatus.State.SUCCEEDED} or {@link TaskStatus.State.KILLED}
- *
- * @param taskId id of the completed task-attempt
- * @param finalTaskState final {@link TaskStatus.State} of the task-attempt
- */
- private void completedTask(TaskAttemptID taskId, TaskStatus.State finalTaskState) {
- TaskStatus status = taskStatuses.get(taskId);
- status.setRunState(finalTaskState);
- activeTasks.remove(taskId);
- }
-
- /**
- * Indicate that one of the taskids in this already-completed
- * TaskInProgress has successfully completed; hence we mark this
- * taskid as {@link TaskStatus.State.KILLED}.
- */
- void alreadyCompletedTask(TaskAttemptID taskid) {
- // 'KILL' the task
- completedTask(taskid, TaskStatus.State.KILLED);
-
- // Note the reason for the task being 'KILLED'
- addDiagnosticInfo(taskid, "Already completed TIP");
-
- LOG.info("Already complete TIP " + getTIPId() +
- " has completed task " + taskid);
- }
- /**
- * Indicate that one of the taskids in this TaskInProgress
- * has successfully completed!
- */
- public void completed(TaskAttemptID taskid) {
- //
- // Record that this taskid is complete
- //
- completedTask(taskid, TaskStatus.State.SUCCEEDED);
-
- // Note the successful taskid
- setSuccessfulTaskid(taskid);
-
- //
- // Now that the TIP is complete, the other speculative
- // subtasks will be closed when the owning tasktracker
- // reports in and calls shouldClose() on this object.
- //
- this.completes++;
- this.execFinishTime = System.currentTimeMillis();
- recomputeProgress();
-
- }
- /**
- * Get the split locations
- */
- public String[] getSplitLocations() {
- return rawSplit.getLocations();
- }
-
- /**
- * Get the Status of the tasks managed by this TIP
- */
- public TaskStatus[] getTaskStatuses() {
- return taskStatuses.values().toArray(new TaskStatus[taskStatuses.size()]);
- }
- /**
- * Get the status of the specified task
- * @param taskid
- * @return
- */
- public TaskStatus getTaskStatus(TaskAttemptID taskid) {
- return taskStatuses.get(taskid);
- }
- /**
- * The TIP's been ordered kill()ed.
- */
- public void kill() {
- if (isComplete() || failed) {
- return;
- }
- this.failed = true;
- killed = true;
- this.execFinishTime = System.currentTimeMillis();
- recomputeProgress();
- }
- /**
- * Was the task killed?
- * @return true if the task killed
- */
- public boolean wasKilled() {
- return killed;
- }
-
- /**
- * Kill the given task
- */
- boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
- TaskStatus st = taskStatuses.get(taskId);
- if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
- || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
- st.inTaskCleanupPhase() ||
- st.getRunState() == TaskStatus.State.UNASSIGNED)
- && tasksToKill.put(taskId, shouldFail) == null ) {
- String logStr = "Request received to " + (shouldFail ? "fail" : "kill")
- + " task '" + taskId + "' by user";
- addDiagnosticInfo(taskId, logStr);
- LOG.info(logStr);
- return true;
- }
- return false;
- }
- /**
- * This method is called whenever there's a status change
- * for one of the TIP's sub-tasks. It recomputes the overall
- * progress for the TIP. We examine all sub-tasks and find
- * the one that's most advanced (and non-failed).
- */
- void recomputeProgress() {
- if (isComplete()) {
- this.progress = 1;
- // update the counters and the state
- TaskStatus completedStatus = taskStatuses.get(getSuccessfulTaskid());
- this.counters = completedStatus.getCounters();
- this.state = completedStatus.getStateString();
- } else if (failed) {
- this.progress = 0;
- // reset the counters and the state
- this.state = "";
- this.counters = new Counters();
- } else {
- double bestProgress = 0;
- String bestState = "";
- Counters bestCounters = new Counters();
- for (Iterator<TaskAttemptID> it = taskStatuses.keySet().iterator(); it.hasNext();) {
- TaskAttemptID taskid = it.next();
- TaskStatus status = taskStatuses.get(taskid);
- if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
- bestProgress = 1;
- bestState = status.getStateString();
- bestCounters = status.getCounters();
- break;
- } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) {
- //for COMMIT_PENDING, we take the last state that we recorded
- //when the task was RUNNING
- bestProgress = this.progress;
- bestState = this.state;
- bestCounters = this.counters;
- } else if (status.getRunState() == TaskStatus.State.RUNNING) {
- if (status.getProgress() >= bestProgress) {
- bestProgress = status.getProgress();
- bestState = status.getStateString();
- if (status.getIncludeCounters()) {
- bestCounters = status.getCounters();
- } else {
- bestCounters = this.counters;
- }
- }
- }
- }
- this.progress = bestProgress;
- this.state = bestState;
- this.counters = bestCounters;
- }
- }
- /////////////////////////////////////////////////
- // "Action" methods that actually require the TIP
- // to do something.
- /////////////////////////////////////////////////
- /**
- * Return whether this TIP still needs to run
- */
- boolean isRunnable() {
- return !failed && (completes == 0);
- }
-
- /**
- * Return whether the TIP has a speculative task to run. We
- * only launch a speculative task if the current TIP is really
- * far behind, and has been behind for a non-trivial amount of
- * time.
- */
- boolean hasSpeculativeTask(long currentTime, double averageProgress) {
- //
- // REMIND - mjc - these constants should be examined
- // in more depth eventually...
- //
-
- if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
- (averageProgress - progress >= SPECULATIVE_GAP) &&
- (currentTime - startTime >= SPECULATIVE_LAG)
- && completes == 0 && !isOnlyCommitPending()) {
- return true;
- }
- return false;
- }
-
- /**
- * Return a Task that can be sent to a TaskTracker for execution.
- */
- public Task getTaskToRun(String taskTracker) throws IOException {
- if (0 == execStartTime){
- // assume task starts running now
- execStartTime = System.currentTimeMillis();
- }
- // Create the 'taskid'; do not count the 'killed' tasks against the job!
- TaskAttemptID taskid = null;
- if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
- // Make sure that the attempts are unqiue across restarts
- int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
- taskid = new TaskAttemptID( id, attemptId);
- ++nextTaskId;
- } else {
- LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
- " (plus " + numKilledTasks + " killed)" +
- " attempts for the tip '" + getTIPId() + "'");
- return null;
- }
- return addRunningTask(taskid, taskTracker);
- }
-
- public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
- return addRunningTask(taskid, taskTracker, false);
- }
-
- /**
- * Adds a previously running task to this tip. This is used in case of
- * jobtracker restarts.
- */
- public Task addRunningTask(TaskAttemptID taskid,
- String taskTracker,
- boolean taskCleanup) {
- // create the task
- Task t = null;
- if (isMapTask()) {
- LOG.debug("attempt "+ numTaskFailures +
- " sending skippedRecords "+failedRanges.getIndicesCount());
- t = new MapTask(jobFile, taskid, partition,
- rawSplit.getClassName(), rawSplit.getBytes());
- } else {
- t = new ReduceTask(jobFile, taskid, partition, numMaps);
- }
- if (jobCleanup) {
- t.setJobCleanupTask();
- }
- if (jobSetup) {
- t.setJobSetupTask();
- }
- if (taskCleanup) {
- t.setTaskCleanupTask();
- t.setState(taskStatuses.get(taskid).getRunState());
- cleanupTasks.put(taskid, taskTracker);
- }
- t.setConf(conf);
- LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
- t.setSkipRanges(failedRanges.getSkipRanges());
- t.setSkipping(skipping);
- if(failedRanges.isTestAttempt()) {
- t.setWriteSkipRecs(false);
- }
- activeTasks.put(taskid, taskTracker);
- tasks.add(taskid);
- // Ask JobTracker to note that the task exists
- jobtracker.createTaskEntry(taskid, taskTracker, this);
- // check and set the first attempt
- if (firstTaskId == null) {
- firstTaskId = taskid;
- }
- return t;
- }
- boolean isRunningTask(TaskAttemptID taskid) {
- TaskStatus status = taskStatuses.get(taskid);
- return status != null && status.getRunState() == TaskStatus.State.RUNNING;
- }
-
- boolean isCleanupAttempt(TaskAttemptID taskid) {
- return cleanupTasks.containsKey(taskid);
- }
-
- String machineWhereCleanupRan(TaskAttemptID taskid) {
- return cleanupTasks.get(taskid);
- }
-
- String machineWhereTaskRan(TaskAttemptID taskid) {
- return taskStatuses.get(taskid).getTaskTracker();
- }
-
- boolean wasKilled(TaskAttemptID taskid) {
- return tasksToKill.containsKey(taskid);
- }
-
- /**
- * Has this task already failed on this machine?
- * @param trackerHost The task tracker hostname
- * @return Has it failed?
- */
- public boolean hasFailedOnMachine(String trackerHost) {
- return machinesWhereFailed.contains(trackerHost);
- }
-
- /**
- * Was this task ever scheduled to run on this machine?
- * @param trackerHost The task tracker hostname
- * @param trackerName The tracker name
- * @return Was task scheduled on the tracker?
- */
- public boolean hasRunOnMachine(String trackerHost, String trackerName) {
- return this.activeTasks.values().contains(trackerName) ||
- hasFailedOnMachine(trackerHost);
- }
- /**
- * Get the number of machines where this task has failed.
- * @return the size of the failed machine set
- */
- public int getNumberOfFailedMachines() {
- return machinesWhereFailed.size();
- }
-
- /**
- * Get the id of this map or reduce task.
- * @return The index of this tip in the maps/reduces lists.
- */
- public int getIdWithinJob() {
- return partition;
- }
-
- /**
- * Set the event number that was raised for this tip
- */
- public void setSuccessEventNumber(int eventNumber) {
- successEventNumber = eventNumber;
- }
-
- /**
- * Get the event number that was raised for this tip
- */
- public int getSuccessEventNumber() {
- return successEventNumber;
- }
-
- /**
- * Gets the Node list of input split locations sorted in rack order.
- */
- public String getSplitNodes() {
- if ( rawSplit == null) {
- return "";
- }
- String[] splits = rawSplit.getLocations();
- Node[] nodes = new Node[splits.length];
- for (int i = 0; i < splits.length; i++) {
- nodes[i] = jobtracker.getNode(splits[i]);
- }
- // sort nodes on rack location
- Arrays.sort(nodes, new Comparator<Node>() {
- public int compare(Node a, Node b) {
- String left = a.getNetworkLocation();
- String right = b.getNetworkLocation();
- return left.compareTo(right);
- }
- });
- return nodeToString(nodes);
- }
- private static String nodeToString(Node[] nodes) {
- if (nodes == null || nodes.length == 0) {
- return "";
- }
- StringBuffer ret = new StringBuffer(nodes[0].toString());
- for(int i = 1; i < nodes.length;i++) {
- ret.append(",");
- ret.append(nodes[i].toString());
- }
- return ret.toString();
- }
- public long getMapInputSize() {
- if(isMapTask() && !jobSetup && !jobCleanup) {
- return rawSplit.getDataLength();
- } else {
- return 0;
- }
- }
-
- public void clearSplit() {
- rawSplit.clearBytes();
- }
-
- /**
- * This class keeps the records to be skipped during further executions
- * based on failed records from all the previous attempts.
- * It also narrow down the skip records if it is more than the
- * acceptable value by dividing the failed range into half. In this case one
- * half is executed in the next attempt (test attempt).
- * In the test attempt, only the test range gets executed, others get skipped.
- * Based on the success/failure of the test attempt, the range is divided
- * further.
- */
- private class FailedRanges {
- private SortedRanges skipRanges = new SortedRanges();
- private Divide divide;
-
- synchronized SortedRanges getSkipRanges() {
- if(divide!=null) {
- return divide.skipRange;
- }
- return skipRanges;
- }
-
- synchronized boolean isTestAttempt() {
- return divide!=null;
- }
-
- synchronized long getIndicesCount() {
- if(isTestAttempt()) {
- return divide.skipRange.getIndicesCount();
- }
- return skipRanges.getIndicesCount();
- }
-
- synchronized void updateState(TaskStatus status){
- if (isTestAttempt() &&
- (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
- divide.testPassed = true;
- //since it was the test attempt we need to set it to failed
- //as it worked only on the test range
- status.setRunState(TaskStatus.State.FAILED);
-
- }
- }
-
- synchronized void add(Range failedRange) {
- LOG.warn("FailedRange:"+ failedRange);
- if(divide!=null) {
- LOG.warn("FailedRange:"+ failedRange +" test:"+divide.test +
- " pass:"+divide.testPassed);
- if(divide.testPassed) {
- //test range passed
- //other range would be bad. test it
- failedRange = divide.other;
- }
- else {
- //test range failed
- //other range would be good.
- failedRange = divide.test;
- }
- //reset
- divide = null;
- }
-
- if(maxSkipRecords==0 || failedRange.getLength()<=maxSkipRecords) {
- skipRanges.add(failedRange);
- } else {
- //start dividing the range to narrow down the skipped
- //records until maxSkipRecords are met OR all attempts
- //get exhausted
- divide = new Divide(failedRange);
- }
- }
-
- class Divide {
- private final SortedRanges skipRange;
- private final Range test;
- private final Range other;
- private boolean testPassed;
- Divide(Range range){
- long half = range.getLength()/2;
- test = new Range(range.getStartIndex(), half);
- other = new Range(test.getEndIndex(), range.getLength()-half);
- //construct the skip range from the skipRanges
- skipRange = new SortedRanges();
- for(Range r : skipRanges.getRanges()) {
- skipRange.add(r);
- }
- skipRange.add(new Range(0,test.getStartIndex()));
- skipRange.add(new Range(test.getEndIndex(),
- (Long.MAX_VALUE-test.getEndIndex())));
- }
- }
-
- }
- TreeMap<TaskAttemptID, String> getActiveTasks() {
- return activeTasks;
- }
- }