ReduceTaskStatus.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.util.ArrayList;
  23. import java.util.List;
  24. class ReduceTaskStatus extends TaskStatus {
  25.   private long shuffleFinishTime; 
  26.   private long sortFinishTime; 
  27.   private List<TaskAttemptID> failedFetchTasks = new ArrayList<TaskAttemptID>(1);
  28.   
  29.   public ReduceTaskStatus() {}
  30.   public ReduceTaskStatus(TaskAttemptID taskid, float progress, State runState,
  31.           String diagnosticInfo, String stateString, String taskTracker,
  32.           Phase phase, Counters counters) {
  33.     super(taskid, progress, runState, diagnosticInfo, stateString, taskTracker,
  34.             phase, counters);
  35.   }
  36.   @Override
  37.   public Object clone() {
  38.     ReduceTaskStatus myClone = (ReduceTaskStatus)super.clone();
  39.     myClone.failedFetchTasks = new ArrayList<TaskAttemptID>(failedFetchTasks);
  40.     return myClone;
  41.   }
  42.   @Override
  43.   public boolean getIsMap() {
  44.     return false;
  45.   }
  46.   @Override
  47.   void setFinishTime(long finishTime) {
  48.     if (shuffleFinishTime == 0) {
  49.       this.shuffleFinishTime = finishTime; 
  50.     }
  51.     if (sortFinishTime == 0){
  52.       this.sortFinishTime = finishTime;
  53.     }
  54.     super.setFinishTime(finishTime);
  55.   }
  56.   @Override
  57.   public long getShuffleFinishTime() {
  58.     return shuffleFinishTime;
  59.   }
  60.   @Override
  61.   void setShuffleFinishTime(long shuffleFinishTime) {
  62.     this.shuffleFinishTime = shuffleFinishTime;
  63.   }
  64.   @Override
  65.   public long getSortFinishTime() {
  66.     return sortFinishTime;
  67.   }
  68.   @Override
  69.   void setSortFinishTime(long sortFinishTime) {
  70.     this.sortFinishTime = sortFinishTime;
  71.     if (0 == this.shuffleFinishTime){
  72.       this.shuffleFinishTime = sortFinishTime;
  73.     }
  74.   }
  75.   @Override
  76.   public List<TaskAttemptID> getFetchFailedMaps() {
  77.     return failedFetchTasks;
  78.   }
  79.   
  80.   @Override
  81.   void addFetchFailedMap(TaskAttemptID mapTaskId) {
  82.     failedFetchTasks.add(mapTaskId);
  83.   }
  84.   
  85.   @Override
  86.   synchronized void statusUpdate(TaskStatus status) {
  87.     super.statusUpdate(status);
  88.     
  89.     if (status.getShuffleFinishTime() != 0) {
  90.       this.shuffleFinishTime = status.getShuffleFinishTime();
  91.     }
  92.     
  93.     if (status.getSortFinishTime() != 0) {
  94.       sortFinishTime = status.getSortFinishTime();
  95.     }
  96.     
  97.     List<TaskAttemptID> newFetchFailedMaps = status.getFetchFailedMaps();
  98.     if (failedFetchTasks == null) {
  99.       failedFetchTasks = newFetchFailedMaps;
  100.     } else if (newFetchFailedMaps != null){
  101.       failedFetchTasks.addAll(newFetchFailedMaps);
  102.     }
  103.   }
  104.   @Override
  105.   synchronized void clearStatus() {
  106.     super.clearStatus();
  107.     failedFetchTasks.clear();
  108.   }
  109.   @Override
  110.   public void readFields(DataInput in) throws IOException {
  111.     super.readFields(in);
  112.     shuffleFinishTime = in.readLong(); 
  113.     sortFinishTime = in.readLong();
  114.     int noFailedFetchTasks = in.readInt();
  115.     failedFetchTasks = new ArrayList<TaskAttemptID>(noFailedFetchTasks);
  116.     for (int i=0; i < noFailedFetchTasks; ++i) {
  117.       TaskAttemptID id = new TaskAttemptID();
  118.       id.readFields(in);
  119.       failedFetchTasks.add(id);
  120.     }
  121.   }
  122.   @Override
  123.   public void write(DataOutput out) throws IOException {
  124.     super.write(out);
  125.     out.writeLong(shuffleFinishTime);
  126.     out.writeLong(sortFinishTime);
  127.     out.writeInt(failedFetchTasks.size());
  128.     for (TaskAttemptID taskId : failedFetchTasks) {
  129.       taskId.write(out);
  130.     }
  131.   }
  132.   
  133. }