TaskStatus.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.util.List;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.io.Text;
  26. import org.apache.hadoop.io.Writable;
  27. import org.apache.hadoop.io.WritableUtils;
  28. /**************************************************
  29.  * Describes the current status of a task.  This is
  30.  * not intended to be a comprehensive piece of data.
  31.  *
  32.  **************************************************/
  33. abstract class TaskStatus implements Writable, Cloneable {
  34.   static final Log LOG =
  35.     LogFactory.getLog(TaskStatus.class.getName());
  36.   
  37.   //enumeration for reporting current phase of a task. 
  38.   public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP}
  39.   // what state is the task in?
  40.   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
  41.                             COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
  42.     
  43.   private final TaskAttemptID taskid;
  44.   private float progress;
  45.   private volatile State runState;
  46.   private String diagnosticInfo;
  47.   private String stateString;
  48.   private String taskTracker;
  49.     
  50.   private long startTime; 
  51.   private long finishTime; 
  52.   private long outputSize;
  53.     
  54.   private volatile Phase phase = Phase.STARTING; 
  55.   private Counters counters;
  56.   private boolean includeCounters;
  57.   private SortedRanges.Range nextRecordRange = new SortedRanges.Range();
  58.   public TaskStatus() {
  59.     taskid = new TaskAttemptID();
  60.   }
  61.   public TaskStatus(TaskAttemptID taskid, float progress,
  62.                     State runState, String diagnosticInfo,
  63.                     String stateString, String taskTracker,
  64.                     Phase phase, Counters counters) {
  65.     this.taskid = taskid;
  66.     this.progress = progress;
  67.     this.runState = runState;
  68.     this.diagnosticInfo = diagnosticInfo;
  69.     this.stateString = stateString;
  70.     this.taskTracker = taskTracker;
  71.     this.phase = phase;
  72.     this.counters = counters;
  73.     this.includeCounters = true;
  74.   }
  75.   
  76.   public TaskAttemptID getTaskID() { return taskid; }
  77.   public abstract boolean getIsMap();
  78.   public float getProgress() { return progress; }
  79.   public void setProgress(float progress) { this.progress = progress; } 
  80.   public State getRunState() { return runState; }
  81.   public String getTaskTracker() {return taskTracker;}
  82.   public void setTaskTracker(String tracker) { this.taskTracker = tracker;}
  83.   public void setRunState(State runState) { this.runState = runState; }
  84.   public String getDiagnosticInfo() { return diagnosticInfo; }
  85.   public void setDiagnosticInfo(String info) { 
  86.     diagnosticInfo = 
  87.       ((diagnosticInfo == null) ? info : diagnosticInfo.concat(info)); 
  88.   }
  89.   public String getStateString() { return stateString; }
  90.   public void setStateString(String stateString) { this.stateString = stateString; }
  91.   
  92.   /**
  93.    * Get the next record range which is going to be processed by Task.
  94.    * @return nextRecordRange
  95.    */
  96.   public SortedRanges.Range getNextRecordRange() {
  97.     return nextRecordRange;
  98.   }
  99.   /**
  100.    * Set the next record range which is going to be processed by Task.
  101.    * @param nextRecordRange
  102.    */
  103.   public void setNextRecordRange(SortedRanges.Range nextRecordRange) {
  104.     this.nextRecordRange = nextRecordRange;
  105.   }
  106.   
  107.   /**
  108.    * Get task finish time. if shuffleFinishTime and sortFinishTime 
  109.    * are not set before, these are set to finishTime. It takes care of 
  110.    * the case when shuffle, sort and finish are completed with in the 
  111.    * heartbeat interval and are not reported separately. if task state is 
  112.    * TaskStatus.FAILED then finish time represents when the task failed.
  113.    * @return finish time of the task. 
  114.    */
  115.   public long getFinishTime() {
  116.     return finishTime;
  117.   }
  118.   /**
  119.    * Sets finishTime. 
  120.    * @param finishTime finish time of task.
  121.    */
  122.   void setFinishTime(long finishTime) {
  123.     this.finishTime = finishTime;
  124.   }
  125.   /**
  126.    * Get shuffle finish time for the task. If shuffle finish time was 
  127.    * not set due to shuffle/sort/finish phases ending within same
  128.    * heartbeat interval, it is set to finish time of next phase i.e. sort 
  129.    * or task finish when these are set.  
  130.    * @return 0 if shuffleFinishTime, sortFinishTime and finish time are not set. else 
  131.    * it returns approximate shuffle finish time.  
  132.    */
  133.   public long getShuffleFinishTime() {
  134.     return 0;
  135.   }
  136.   /**
  137.    * Set shuffle finish time. 
  138.    * @param shuffleFinishTime 
  139.    */
  140.   void setShuffleFinishTime(long shuffleFinishTime) {}
  141.   /**
  142.    * Get sort finish time for the task,. If sort finish time was not set 
  143.    * due to sort and reduce phase finishing in same heartebat interval, it is 
  144.    * set to finish time, when finish time is set. 
  145.    * @return 0 if sort finish time and finish time are not set, else returns sort
  146.    * finish time if that is set, else it returns finish time. 
  147.    */
  148.   public long getSortFinishTime() {
  149.     return 0;
  150.   }
  151.   /**
  152.    * Sets sortFinishTime, if shuffleFinishTime is not set before 
  153.    * then its set to sortFinishTime.  
  154.    * @param sortFinishTime
  155.    */
  156.   void setSortFinishTime(long sortFinishTime) {}
  157.   /**
  158.    * Get start time of the task. 
  159.    * @return 0 is start time is not set, else returns start time. 
  160.    */
  161.   public long getStartTime() {
  162.     return startTime;
  163.   }
  164.   /**
  165.    * Set startTime of the task.
  166.    * @param startTime start time
  167.    */
  168.   void setStartTime(long startTime) {
  169.     this.startTime = startTime;
  170.   }
  171.   /**
  172.    * Get current phase of this task. Phase.Map in case of map tasks, 
  173.    * for reduce one of Phase.SHUFFLE, Phase.SORT or Phase.REDUCE. 
  174.    * @return . 
  175.    */
  176.   public Phase getPhase(){
  177.     return this.phase; 
  178.   }
  179.   /**
  180.    * Set current phase of this task.  
  181.    * @param phase phase of this task
  182.    */
  183.   void setPhase(Phase phase){
  184.     TaskStatus.Phase oldPhase = getPhase();
  185.     if (oldPhase != phase){
  186.       // sort phase started
  187.       if (phase == TaskStatus.Phase.SORT){
  188.         setShuffleFinishTime(System.currentTimeMillis());
  189.       }else if (phase == TaskStatus.Phase.REDUCE){
  190.         setSortFinishTime(System.currentTimeMillis());
  191.       }
  192.     }
  193.     this.phase = phase; 
  194.   }
  195.   boolean inTaskCleanupPhase() {
  196.     return (this.phase == TaskStatus.Phase.CLEANUP && 
  197.       (this.runState == TaskStatus.State.FAILED_UNCLEAN || 
  198.       this.runState == TaskStatus.State.KILLED_UNCLEAN));
  199.   }
  200.   
  201.   public boolean getIncludeCounters() {
  202.     return includeCounters; 
  203.   }
  204.   
  205.   public void setIncludeCounters(boolean send) {
  206.     includeCounters = send;
  207.   }
  208.   
  209.   /**
  210.    * Get task's counters.
  211.    */
  212.   public Counters getCounters() {
  213.     return counters;
  214.   }
  215.   /**
  216.    * Set the task's counters.
  217.    * @param counters
  218.    */
  219.   public void setCounters(Counters counters) {
  220.     this.counters = counters;
  221.   }
  222.   
  223.   /**
  224.    * Returns the number of bytes of output from this map.
  225.    */
  226.   public long getOutputSize() {
  227.     return outputSize;
  228.   }
  229.   
  230.   /**
  231.    * Set the size on disk of this task's output.
  232.    * @param l the number of map output bytes
  233.    */
  234.   void setOutputSize(long l)  {
  235.     outputSize = l;
  236.   }
  237.   
  238.   /**
  239.    * Get the list of maps from which output-fetches failed.
  240.    * 
  241.    * @return the list of maps from which output-fetches failed.
  242.    */
  243.   public List<TaskAttemptID> getFetchFailedMaps() {
  244.     return null;
  245.   }
  246.   
  247.   /**
  248.    * Add to the list of maps from which output-fetches failed.
  249.    *  
  250.    * @param mapTaskId map from which fetch failed
  251.    */
  252.   synchronized void addFetchFailedMap(TaskAttemptID mapTaskId) {}
  253.   /**
  254.    * Update the status of the task.
  255.    * 
  256.    * This update is done by ping thread before sending the status. 
  257.    * 
  258.    * @param progress
  259.    * @param state
  260.    * @param counters
  261.    */
  262.   synchronized void statusUpdate(float progress,
  263.                                  String state, 
  264.                                  Counters counters) {
  265.     setProgress(progress);
  266.     setStateString(state);
  267.     setCounters(counters);
  268.   }
  269.   
  270.   /**
  271.    * Update the status of the task.
  272.    * 
  273.    * @param status updated status
  274.    */
  275.   synchronized void statusUpdate(TaskStatus status) {
  276.     this.progress = status.getProgress();
  277.     this.runState = status.getRunState();
  278.     this.stateString = status.getStateString();
  279.     this.nextRecordRange = status.getNextRecordRange();
  280.     setDiagnosticInfo(status.getDiagnosticInfo());
  281.     
  282.     if (status.getStartTime() != 0) {
  283.       this.startTime = status.getStartTime(); 
  284.     }
  285.     if (status.getFinishTime() != 0) {
  286.       this.finishTime = status.getFinishTime(); 
  287.     }
  288.     
  289.     this.phase = status.getPhase();
  290.     this.counters = status.getCounters();
  291.     this.outputSize = status.outputSize;
  292.   }
  293.   /**
  294.    * Update specific fields of task status
  295.    * 
  296.    * This update is done in JobTracker when a cleanup attempt of task
  297.    * reports its status. Then update only specific fields, not all.
  298.    * 
  299.    * @param runState
  300.    * @param progress
  301.    * @param state
  302.    * @param phase
  303.    * @param finishTime
  304.    */
  305.   synchronized void statusUpdate(State runState, 
  306.                                  float progress,
  307.                                  String state, 
  308.                                  Phase phase,
  309.                                  long finishTime) {
  310.     setRunState(runState);
  311.     setProgress(progress);
  312.     setStateString(state);
  313.     setPhase(phase);
  314.     if (finishTime != 0) {
  315.       this.finishTime = finishTime; 
  316.     }
  317.   }
  318.   /**
  319.    * Clear out transient information after sending out a status-update
  320.    * from either the {@link Task} to the {@link TaskTracker} or from the
  321.    * {@link TaskTracker} to the {@link JobTracker}. 
  322.    */
  323.   synchronized void clearStatus() {
  324.     // Clear diagnosticInfo
  325.     diagnosticInfo = "";
  326.   }
  327.   @Override
  328.   public Object clone() {
  329.     try {
  330.       return super.clone();
  331.     } catch (CloneNotSupportedException cnse) {
  332.       // Shouldn't happen since we do implement Clonable
  333.       throw new InternalError(cnse.toString());
  334.     }
  335.   }
  336.   
  337.   //////////////////////////////////////////////
  338.   // Writable
  339.   //////////////////////////////////////////////
  340.   public void write(DataOutput out) throws IOException {
  341.     taskid.write(out);
  342.     out.writeFloat(progress);
  343.     WritableUtils.writeEnum(out, runState);
  344.     Text.writeString(out, diagnosticInfo);
  345.     Text.writeString(out, stateString);
  346.     WritableUtils.writeEnum(out, phase);
  347.     out.writeLong(startTime);
  348.     out.writeLong(finishTime);
  349.     out.writeBoolean(includeCounters);
  350.     out.writeLong(outputSize);
  351.     if (includeCounters) {
  352.       counters.write(out);
  353.     }
  354.     nextRecordRange.write(out);
  355.   }
  356.   public void readFields(DataInput in) throws IOException {
  357.     this.taskid.readFields(in);
  358.     this.progress = in.readFloat();
  359.     this.runState = WritableUtils.readEnum(in, State.class);
  360.     this.diagnosticInfo = Text.readString(in);
  361.     this.stateString = Text.readString(in);
  362.     this.phase = WritableUtils.readEnum(in, Phase.class); 
  363.     this.startTime = in.readLong(); 
  364.     this.finishTime = in.readLong(); 
  365.     counters = new Counters();
  366.     this.includeCounters = in.readBoolean();
  367.     this.outputSize = in.readLong();
  368.     if (includeCounters) {
  369.       counters.readFields(in);
  370.     }
  371.     nextRecordRange.readFields(in);
  372.   }
  373.   
  374.   //////////////////////////////////////////////////////////////////////////////
  375.   // Factory-like methods to create/read/write appropriate TaskStatus objects
  376.   //////////////////////////////////////////////////////////////////////////////
  377.   
  378.   static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, float progress,
  379.                                      State runState, String diagnosticInfo,
  380.                                      String stateString, String taskTracker,
  381.                                      Phase phase, Counters counters) 
  382.   throws IOException {
  383.     boolean isMap = in.readBoolean();
  384.     return createTaskStatus(isMap, taskId, progress, runState, diagnosticInfo, 
  385.                           stateString, taskTracker, phase, counters);
  386.   }
  387.   
  388.   static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, float progress,
  389.                                    State runState, String diagnosticInfo,
  390.                                    String stateString, String taskTracker,
  391.                                    Phase phase, Counters counters) { 
  392.     return (isMap) ? new MapTaskStatus(taskId, progress, runState, 
  393.                                        diagnosticInfo, stateString, taskTracker, 
  394.                                        phase, counters) :
  395.                      new ReduceTaskStatus(taskId, progress, runState, 
  396.                                           diagnosticInfo, stateString, 
  397.                                           taskTracker, phase, counters);
  398.   }
  399.   
  400.   static TaskStatus createTaskStatus(boolean isMap) {
  401.     return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus();
  402.   }
  403.   static TaskStatus readTaskStatus(DataInput in) throws IOException {
  404.     boolean isMap = in.readBoolean();
  405.     TaskStatus taskStatus = createTaskStatus(isMap);
  406.     taskStatus.readFields(in);
  407.     return taskStatus;
  408.   }
  409.   
  410.   static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) 
  411.   throws IOException {
  412.     out.writeBoolean(taskStatus.getIsMap());
  413.     taskStatus.write(out);
  414.   }
  415. }