DefaultJobHistoryParser.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.util.*;
  20. import java.io.*;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.mapred.JobHistory.Keys; 
  23. import org.apache.hadoop.mapred.JobHistory.Values;
  24. /**
  25.  * Default parser for job history files. It creates object model from 
  26.  * job history file. 
  27.  * 
  28.  */
  29. public class DefaultJobHistoryParser {
  30.   // This class is required to work around the Java compiler's lack of
  31.   // run-time information on generic classes. In particular, we need to be able
  32.   // to cast to this type without generating compiler warnings, which is only
  33.   // possible if it is a non-generic class.
  34.   /**
  35.    * Populates a JobInfo object from the job's history log file. 
  36.    * @param jobHistoryFile history file for this job. 
  37.    * @param job a precreated JobInfo object, should be non-null. 
  38.    * @param fs FileSystem where historyFile is present. 
  39.    * @throws IOException
  40.    */
  41.   public static void parseJobTasks(String jobHistoryFile, 
  42.                        JobHistory.JobInfo job, FileSystem fs)
  43.     throws IOException {
  44.     JobHistory.parseHistoryFromFS(jobHistoryFile, 
  45.                             new JobTasksParseListener(job), fs);
  46.   }
  47.   
  48.   /**
  49.    * Listener for Job's history log file, it populates JobHistory.JobInfo 
  50.    * object with data from log file. 
  51.    */
  52.   static class JobTasksParseListener
  53.     implements JobHistory.Listener {
  54.     JobHistory.JobInfo job;
  55.     JobTasksParseListener(JobHistory.JobInfo job) {
  56.       this.job = job;
  57.     }
  58.     private JobHistory.Task getTask(String taskId) {
  59.       JobHistory.Task task = job.getAllTasks().get(taskId);
  60.       if (null == task) {
  61.         task = new JobHistory.Task();
  62.         task.set(Keys.TASKID, taskId);
  63.         job.getAllTasks().put(taskId, task);
  64.       }
  65.       return task;
  66.     }
  67.     private JobHistory.MapAttempt getMapAttempt(
  68.                                                 String jobid, String jobTrackerId, String taskId, String taskAttemptId) {
  69.       JobHistory.Task task = getTask(taskId);
  70.       JobHistory.MapAttempt mapAttempt = 
  71.         (JobHistory.MapAttempt) task.getTaskAttempts().get(taskAttemptId);
  72.       if (null == mapAttempt) {
  73.         mapAttempt = new JobHistory.MapAttempt();
  74.         mapAttempt.set(Keys.TASK_ATTEMPT_ID, taskAttemptId);
  75.         task.getTaskAttempts().put(taskAttemptId, mapAttempt);
  76.       }
  77.       return mapAttempt;
  78.     }
  79.     private JobHistory.ReduceAttempt getReduceAttempt(
  80.                                                       String jobid, String jobTrackerId, String taskId, String taskAttemptId) {
  81.       JobHistory.Task task = getTask(taskId);
  82.       JobHistory.ReduceAttempt reduceAttempt = 
  83.         (JobHistory.ReduceAttempt) task.getTaskAttempts().get(taskAttemptId);
  84.       if (null == reduceAttempt) {
  85.         reduceAttempt = new JobHistory.ReduceAttempt();
  86.         reduceAttempt.set(Keys.TASK_ATTEMPT_ID, taskAttemptId);
  87.         task.getTaskAttempts().put(taskAttemptId, reduceAttempt);
  88.       }
  89.       return reduceAttempt;
  90.     }
  91.     // JobHistory.Listener implementation 
  92.     public void handle(JobHistory.RecordTypes recType, Map<Keys, String> values)
  93.       throws IOException {
  94.       String jobTrackerId = values.get(JobHistory.Keys.JOBTRACKERID);
  95.       String jobid = values.get(Keys.JOBID);
  96.       
  97.       if (recType == JobHistory.RecordTypes.Job) {
  98.         job.handle(values);
  99.       }if (recType.equals(JobHistory.RecordTypes.Task)) {
  100.         String taskid = values.get(JobHistory.Keys.TASKID);
  101.         getTask(taskid).handle(values);
  102.       } else if (recType.equals(JobHistory.RecordTypes.MapAttempt)) {
  103.         String taskid =  values.get(Keys.TASKID);
  104.         String mapAttemptId = values.get(Keys.TASK_ATTEMPT_ID);
  105.         getMapAttempt(jobid, jobTrackerId, taskid, mapAttemptId).handle(values);
  106.       } else if (recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
  107.         String taskid = values.get(Keys.TASKID);
  108.         String reduceAttemptId = values.get(Keys.TASK_ATTEMPT_ID);
  109.         getReduceAttempt(jobid, jobTrackerId, taskid, reduceAttemptId).handle(values);
  110.       }
  111.     }
  112.   }
  113.   // call this only for jobs that succeeded for better results. 
  114.   abstract static class NodesFilter implements JobHistory.Listener {
  115.     private Map<String, Set<String>> badNodesToNumFailedTasks =
  116.       new HashMap<String, Set<String>>();
  117.     
  118.     Map<String, Set<String>> getValues(){
  119.       return badNodesToNumFailedTasks; 
  120.     }
  121.     String failureType;
  122.     
  123.     public void handle(JobHistory.RecordTypes recType, Map<Keys, String> values)
  124.       throws IOException {
  125.       if (recType.equals(JobHistory.RecordTypes.MapAttempt) || 
  126.           recType.equals(JobHistory.RecordTypes.ReduceAttempt)) {
  127.         if (failureType.equals(values.get(Keys.TASK_STATUS)) ) {
  128.           String hostName = values.get(Keys.HOSTNAME);
  129.           String taskid = values.get(Keys.TASKID); 
  130.           Set<String> tasks = badNodesToNumFailedTasks.get(hostName); 
  131.           if (null == tasks ){
  132.             tasks = new TreeSet<String>(); 
  133.             tasks.add(taskid);
  134.             badNodesToNumFailedTasks.put(hostName, tasks);
  135.           }else{
  136.             tasks.add(taskid);
  137.           }
  138.         }
  139.       }      
  140.     }
  141.     abstract void setFailureType();
  142.     String getFailureType() {
  143.       return failureType;
  144.     }
  145.     NodesFilter() {
  146.       setFailureType();
  147.     }
  148.   }
  149.  
  150.   static class FailedOnNodesFilter extends NodesFilter {
  151.     void setFailureType() {
  152.       failureType = Values.FAILED.name();
  153.     }
  154.   }
  155.   static class KilledOnNodesFilter extends NodesFilter {
  156.     void setFailureType() {
  157.       failureType = Values.KILLED.name();
  158.     }
  159.   }
  160. }