JobStatistics.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:23k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.vaidya.statistics.job;
  19. import java.util.ArrayList;
  20. import org.apache.hadoop.mapred.JobConf;
  21. import org.apache.hadoop.mapred.JobHistory;
  22. import org.apache.hadoop.mapred.JobHistory.JobInfo;
  23. import org.apache.hadoop.mapred.JobHistory.Keys;
  24. import org.apache.hadoop.mapred.Counters;
  25. import org.apache.hadoop.mapred.Counters.Counter;
  26. import java.text.ParseException;
  27. //import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys;
  28. import java.util.Hashtable;
  29. import java.util.Map;
  30. import java.util.regex.Pattern;
  31. import java.util.regex.Matcher;
  32. import java.util.Arrays;
  33. import java.util.Comparator;
  34. import java.util.List;
  35. import java.util.Collections;
  36. /**
  37.  *
  38.  */
  39. public class JobStatistics implements JobStatisticsInterface {
  40.   
  41.   
  42.   /*
  43.    * Pattern for parsing the COUNTERS
  44.    */
  45.   private static final Pattern _pattern = Pattern.compile("[[^,]?]+");  //"[[^,]?]+"
  46.   
  47.   /*
  48.    * Job configuration
  49.    */
  50.   private JobConf _jobConf;
  51.   
  52.   /**
  53.    * @param jobConf the jobConf to set
  54.    */
  55.   void setJobConf(JobConf jobConf) {
  56.     this._jobConf = jobConf;
  57.     // TODO: Add job conf to _job array 
  58.   }
  59.   /*
  60.    * Aggregated Job level counters 
  61.    */
  62.   private JobHistory.JobInfo _jobInfo;
  63.   
  64.   /*
  65.    * Job stats 
  66.    */
  67.   private java.util.Hashtable<Enum, String> _job;
  68.   /**
  69.    * @param jobConf the jobConf to set
  70.    */
  71.   public JobConf getJobConf() {
  72.     return this._jobConf;
  73.   }
  74.   
  75.   /*
  76.    * Get Job Counters of type long
  77.    */
  78.   public long getLongValue(Enum key) {
  79.     return Long.parseLong(this._job.get(key));
  80.   }
  81.   
  82.   /*
  83.    * Get job Counters of type Double
  84.    */
  85.   public double getDoubleValue(Enum key) {
  86.     return Double.parseDouble(this._job.get(key));
  87.   }
  88.   
  89.   /* 
  90.    * Get Job Counters of type String
  91.    */
  92.   public String getStringValue(Enum key) {
  93.     return this._job.get(key);
  94.   }
  95.   
  96.   /*
  97.    * Set key value of type long
  98.    */
  99.   public void setValue(Enum key, long value) {
  100.     this._job.put(key, Long.toString(value));
  101.   }
  102.   
  103.   /*
  104.    * Set key value of type double
  105.    */
  106.   public void setValue(Enum key, double value) {
  107.     this._job.put(key, Double.toString(value));
  108.   }
  109.   
  110.   /*
  111.    * Set key value of type String
  112.    */
  113.   public void setValue(Enum key, String value) {
  114.     this._job.put(key, value);
  115.   }
  116.   /*
  117.    * Map Task List (Sorted by task id)
  118.    */
  119.   private ArrayList<MapTaskStatistics> _mapTaskList = new ArrayList<MapTaskStatistics>();
  120.   
  121.   /*
  122.    * Reduce Task List (Sorted by task id)
  123.    */
  124.   private ArrayList<ReduceTaskStatistics> _reduceTaskList = new ArrayList<ReduceTaskStatistics>();
  125.   
  126.   /* 
  127.    * Ctor:
  128.    */
  129.   public JobStatistics (JobConf jobConf, JobInfo jobInfo) throws ParseException {
  130.     this._jobConf = jobConf;
  131.     this._jobInfo = jobInfo;
  132.     this._job = new Hashtable<Enum, String>();
  133.     populate_Job(this._job, this._jobInfo.getValues());
  134.     populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList, this._jobInfo.getAllTasks());
  135.   }
  136.   
  137.   /*
  138.    * 
  139.    */
  140.   private void populate_MapReduceTaskLists (ArrayList<MapTaskStatistics> mapTaskList, 
  141.                               ArrayList<ReduceTaskStatistics> reduceTaskList, 
  142.                               java.util.Map<String, JobHistory.Task> taskMap) throws ParseException {
  143.     /*
  144.      * 
  145.      */
  146.     int num_tasks = taskMap.entrySet().size();
  147.     java.util.Iterator<Map.Entry<String, JobHistory.Task>> ti = taskMap.entrySet().iterator();
  148.     for (int i = 0; i < num_tasks; i++)
  149.     {
  150.       Map.Entry<String, JobHistory.Task> entry = (Map.Entry<String, JobHistory.Task>) ti.next();
  151.       JobHistory.Task task = entry.getValue();
  152.       if (task.get(Keys.TASK_TYPE).equals("MAP")) {
  153.       MapTaskStatistics mapT = new MapTaskStatistics();
  154.       java.util.Map<JobHistory.Keys, String> mapTask = task.getValues();
  155.       java.util.Map<JobHistory.Keys, String> successTaskAttemptMap  =  getLastSuccessfulTaskAttempt(task);
  156.       // NOTE: Following would lead to less number of actual tasks collected in the tasklist array
  157.       if (successTaskAttemptMap != null) {
  158.         mapTask.putAll(successTaskAttemptMap);
  159.       } else {
  160.         System.out.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING");
  161.       }
  162.       int size = mapTask.size();
  163.       java.util.Iterator<Map.Entry<JobHistory.Keys, String>> kv = mapTask.entrySet().iterator();
  164.       for (int j = 0; j < size; j++)
  165.       {
  166.         Map.Entry<JobHistory.Keys, String> mtc = kv.next();
  167.         JobHistory.Keys key = mtc.getKey();
  168.         String value = mtc.getValue();
  169.         switch (key) {
  170.         case TASKID: mapT.setValue(MapTaskKeys.TASK_ID, value); break;
  171.         case TASK_ATTEMPT_ID: mapT.setValue(MapTaskKeys.ATTEMPT_ID, value); break;
  172.         case HOSTNAME: mapT.setValue(MapTaskKeys.HOSTNAME, value); break;
  173.         case TASK_TYPE: mapT.setValue(MapTaskKeys.TASK_TYPE, value); break;
  174.         case TASK_STATUS: mapT.setValue(MapTaskKeys.STATUS, value); break;
  175.         case START_TIME: mapT.setValue(MapTaskKeys.START_TIME, value); break;
  176.         case FINISH_TIME: mapT.setValue(MapTaskKeys.FINISH_TIME, value); break;
  177.         case SPLITS: mapT.setValue(MapTaskKeys.SPLITS, value); break;
  178.         case COUNTERS:
  179.           value.concat(",");
  180.           parseAndAddMapTaskCounters(mapT, value);
  181.           mapTaskList.add(mapT);
  182.           break;
  183.         default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
  184.           break;
  185.         }
  186.       }
  187.       
  188.       // Add number of task attempts
  189.       mapT.setValue(MapTaskKeys.NUM_ATTEMPTS, (new Integer(task.getTaskAttempts().size())).toString());
  190.       
  191.       }else if (task.get(Keys.TASK_TYPE).equals("REDUCE")) {
  192.       ReduceTaskStatistics reduceT = new ReduceTaskStatistics();
  193.       java.util.Map<JobHistory.Keys, String> reduceTask = task.getValues();
  194.       java.util.Map<JobHistory.Keys, String> successTaskAttemptMap  =  getLastSuccessfulTaskAttempt(task);
  195.       // NOTE: Following would lead to less number of actual tasks collected in the tasklist array
  196.       if (successTaskAttemptMap != null) {
  197.         reduceTask.putAll(successTaskAttemptMap);
  198.       } else {
  199.         System.out.println("Task:<"+task.get(Keys.TASKID)+"> is not successful - SKIPPING");
  200.       }
  201.       int size = reduceTask.size();
  202.       java.util.Iterator<Map.Entry<JobHistory.Keys, String>> kv = reduceTask.entrySet().iterator();
  203.       for (int j = 0; j < size; j++)
  204.       {
  205.         Map.Entry<JobHistory.Keys, String> rtc = kv.next();
  206.         JobHistory.Keys key = rtc.getKey();
  207.         String value = rtc.getValue();
  208.         switch (key) {
  209.         case TASKID: reduceT.setValue(ReduceTaskKeys.TASK_ID, value); break;
  210.         case TASK_ATTEMPT_ID: reduceT.setValue(ReduceTaskKeys.ATTEMPT_ID, value); break;
  211.         case HOSTNAME: reduceT.setValue(ReduceTaskKeys.HOSTNAME, value); break;
  212.         case TASK_TYPE: reduceT.setValue(ReduceTaskKeys.TASK_TYPE, value); break;
  213.         case TASK_STATUS: reduceT.setValue(ReduceTaskKeys.STATUS, value); break;
  214.         case START_TIME: reduceT.setValue(ReduceTaskKeys.START_TIME, value); break;
  215.         case FINISH_TIME: reduceT.setValue(ReduceTaskKeys.FINISH_TIME, value); break;
  216.         case SHUFFLE_FINISHED: reduceT.setValue(ReduceTaskKeys.SHUFFLE_FINISH_TIME, value); break;
  217.         case SORT_FINISHED: reduceT.setValue(ReduceTaskKeys.SORT_FINISH_TIME, value); break;
  218.         case COUNTERS:
  219.           value.concat(",");
  220.           parseAndAddReduceTaskCounters(reduceT, value);
  221.           reduceTaskList.add(reduceT);
  222.           break;
  223.         default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
  224.           break;
  225.         }
  226.         
  227.         // Add number of task attempts
  228.         reduceT.setValue(ReduceTaskKeys.NUM_ATTEMPTS, (new Integer(task.getTaskAttempts().size())).toString());
  229.       }
  230.       } else if (task.get(Keys.TASK_TYPE).equals("CLEANUP")) {
  231.         //System.out.println("INFO: IGNORING TASK TYPE : "+task.get(Keys.TASK_TYPE));
  232.       } else {
  233.         System.out.println("UNKNOWN TASK TYPE : "+task.get(Keys.TASK_TYPE));
  234.       }
  235.     }
  236.   }
  237.   
  238.   /*
  239.    * Get last successful task attempt to be added in the stats
  240.    */
  241.   private java.util.Map<JobHistory.Keys, String> getLastSuccessfulTaskAttempt(JobHistory.Task task) {
  242.     
  243.     Map<String, JobHistory.TaskAttempt> taskAttempts = task.getTaskAttempts();
  244.     int size = taskAttempts.size();
  245.     java.util.Iterator<Map.Entry<String, JobHistory.TaskAttempt>> kv = taskAttempts.entrySet().iterator();
  246.     for (int i=0; i<size; i++) {
  247.       // CHECK_IT: Only one SUCCESSFUL TASK ATTEMPT
  248.       Map.Entry<String, JobHistory.TaskAttempt> tae = kv.next();
  249.       JobHistory.TaskAttempt attempt = tae.getValue();
  250.       if (attempt.getValues().get(JobHistory.Keys.TASK_STATUS).equals("SUCCESS")) {
  251.         return attempt.getValues();
  252.       }
  253.     }
  254.     
  255.     return null;
  256.   }
  257.   
  258.   /*
  259.    * Popuate the job stats 
  260.    */
  261.   private void populate_Job (Hashtable<Enum, String> job, java.util.Map<JobHistory.Keys, String> jobC) throws ParseException {
  262.     int size = jobC.size(); 
  263.     java.util.Iterator<Map.Entry<JobHistory.Keys, String>> kv = jobC.entrySet().iterator();
  264.     for (int i = 0; i < size; i++)
  265.     {
  266.       Map.Entry<JobHistory.Keys, String> entry = (Map.Entry<JobHistory.Keys, String>) kv.next();
  267.       JobHistory.Keys key = entry.getKey();
  268.       String value = entry.getValue();
  269.       switch (key) {
  270.       case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break;
  271.       //case START_TIME: job.put(JobKeys., value); break;
  272.       case FINISH_TIME: job.put(JobKeys.FINISH_TIME, value); break;
  273.       case JOBID: job.put(JobKeys.JOBID, value); break;
  274.       case JOBNAME: job.put(JobKeys.JOBNAME, value); break;
  275.       case USER: job.put(JobKeys.USER, value); break;
  276.       case JOBCONF: job.put(JobKeys.JOBCONF, value); break;
  277.       case SUBMIT_TIME: job.put(JobKeys.SUBMIT_TIME, value); break;
  278.       case LAUNCH_TIME: job.put(JobKeys.LAUNCH_TIME, value); break;
  279.       case TOTAL_MAPS: job.put(JobKeys.TOTAL_MAPS, value); break;
  280.       case TOTAL_REDUCES: job.put(JobKeys.TOTAL_REDUCES, value); break;
  281.       case FAILED_MAPS: job.put(JobKeys.FAILED_MAPS, value); break;
  282.       case FAILED_REDUCES: job.put(JobKeys.FAILED_REDUCES, value); break;
  283.       case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS, value); break;
  284.       case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES, value); break;
  285.       case JOB_STATUS: job.put(JobKeys.STATUS, value); break;
  286.       case COUNTERS:
  287.         value.concat(",");
  288.         parseAndAddJobCounters(job, value);
  289.         break;
  290.       default:   System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR COUNTERS");
  291.                break;
  292.       }
  293.     }
  294.   }
  295.   
  296.   /*
  297.    * Parse and add the job counters
  298.    */
  299.   private void parseAndAddJobCounters(Hashtable<Enum, String> job, String counters) throws ParseException {
  300.     Matcher m = _pattern.matcher(counters);
  301.     while(m.find()){
  302.       String ctuple = m.group(0);
  303.       //String ctuple = c1tuple.substring(0, c1tuple.length()-1);
  304.       String []parts = ctuple.split(":");
  305.       if (parts[0].equals("File Systems.Local bytes read")) {
  306.         job.put(JobKeys.LOCAL_BYTES_READ, parts[1]);
  307.       } else if (parts[0].equals("File Systems.Local bytes written")) {
  308.         job.put(JobKeys.LOCAL_BYTES_WRITTEN, parts[1]);
  309.       } else if (parts[0].equals("File Systems.HDFS bytes read")) {
  310.         job.put(JobKeys.HDFS_BYTES_READ, parts[1]);
  311.       } else if (parts[0].equals("File Systems.HDFS bytes written")) {
  312.         job.put(JobKeys.HDFS_BYTES_WRITTEN, parts[1]);
  313.       } else if (parts[0].equals("Job Counters .Launched map tasks")) {
  314.         job.put(JobKeys.LAUNCHED_MAPS, parts[1]);
  315.       } else if (parts[0].equals("Job Counters .Launched reduce tasks")) {
  316.         job.put(JobKeys.LAUNCHED_REDUCES, parts[1]);
  317.       } else if (parts[0].equals("Job Counters .Data-local map tasks")) {
  318.         job.put(JobKeys.DATALOCAL_MAPS, parts[1]);
  319.       } else if (parts[0].equals("Job Counters .Rack-local map tasks")) {
  320.         job.put(JobKeys.RACKLOCAL_MAPS, parts[1]);
  321.       } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
  322.         job.put(JobKeys.MAP_INPUT_RECORDS, parts[1]);
  323.       } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
  324.         job.put(JobKeys.MAP_OUTPUT_RECORDS, parts[1]);
  325.       } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
  326.         job.put(JobKeys.MAP_INPUT_BYTES, parts[1]);
  327.       } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
  328.         job.put(JobKeys.MAP_OUTPUT_BYTES, parts[1]);
  329.       } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
  330.         job.put(JobKeys.COMBINE_INPUT_RECORDS, parts[1]);
  331.       } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
  332.         job.put(JobKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
  333.       } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
  334.         job.put(JobKeys.REDUCE_INPUT_GROUPS, parts[1]);
  335.       } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
  336.         job.put(JobKeys.REDUCE_INPUT_RECORDS, parts[1]);
  337.       } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
  338.         job.put(JobKeys.REDUCE_OUTPUT_RECORDS, parts[1]);
  339.       } else {
  340.         System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
  341.       }
  342.     }  
  343.   }
  344.   
  345.   /*
  346.    * Parse and add the Map task counters
  347.    */
  348.   private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) {
  349.     Matcher m = _pattern.matcher(counters);
  350.     while(m.find()){
  351.       String ctuple = m.group(0);
  352.       //String ctuple = c1tuple.substring(0, c1tuple.length()-1);
  353.       String []parts = ctuple.split(":");
  354.       if (parts[0].equals("File Systems.Local bytes read")) {
  355.         mapTask.setValue(MapTaskKeys.LOCAL_BYTES_READ, parts[1]);
  356.       } else if (parts[0].equals("File Systems.Local bytes written")) {
  357.         mapTask.setValue(MapTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
  358.       } else if (parts[0].equals("File Systems.HDFS bytes read")) {
  359.         mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, parts[1]);
  360.       } else if (parts[0].equals("File Systems.HDFS bytes written")) {
  361.         mapTask.setValue(MapTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
  362.       } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
  363.         mapTask.setValue(MapTaskKeys.INPUT_RECORDS, parts[1]);
  364.       } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
  365.         mapTask.setValue(MapTaskKeys.OUTPUT_RECORDS, parts[1]);
  366.       } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
  367.         mapTask.setValue(MapTaskKeys.INPUT_BYTES, parts[1]);
  368.       } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
  369.         mapTask.setValue(MapTaskKeys.OUTPUT_BYTES, parts[1]);
  370.       } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
  371.         mapTask.setValue(MapTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
  372.       } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
  373.         mapTask.setValue(MapTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
  374.       } else {
  375.         System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
  376.       }
  377.     }    
  378.   }
  379.   
  380.   /*
  381.    * Parse and add the reduce task counters
  382.    */
  383.   private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) {
  384.     Matcher m = _pattern.matcher(counters);
  385.     while(m.find()){
  386.       String ctuple = m.group(0);
  387.       //String ctuple = c1tuple.substring(0, c1tuple.length()-1);
  388.       String []parts = ctuple.split(":");
  389.       if (parts[0].equals("File Systems.Local bytes read")) {
  390.         reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_READ, parts[1]);
  391.       } else if (parts[0].equals("File Systems.Local bytes written")) {
  392.         reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
  393.       } else if (parts[0].equals("File Systems.HDFS bytes read")) {
  394.         reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, parts[1]);
  395.       } else if (parts[0].equals("File Systems.HDFS bytes written")) {
  396.         reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
  397.       } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
  398.         reduceTask.setValue(ReduceTaskKeys.INPUT_RECORDS, parts[1]);
  399.       } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
  400.         reduceTask.setValue(ReduceTaskKeys.OUTPUT_RECORDS, parts[1]);
  401.       } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
  402.         reduceTask.setValue(ReduceTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
  403.       } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
  404.         reduceTask.setValue(ReduceTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
  405.       } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
  406.         reduceTask.setValue(ReduceTaskKeys.INPUT_GROUPS, parts[1]);
  407.       } else {
  408.         System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
  409.       }
  410.     }    
  411.   }
  412.   
  413.   /*
  414.    * Print the Job Execution Statistics
  415.    * TODO: split to pring job, map/reduce task list and individual map/reduce task stats
  416.    */
  417.   public void printJobExecutionStatistics() {
  418.     /*
  419.      * Print Job Counters
  420.      */
  421.     System.out.println("JOB COUNTERS *********************************************");
  422.     int size = this._job.size();
  423.     java.util.Iterator<Map.Entry<Enum, String>> kv = this._job.entrySet().iterator();
  424.     for (int i = 0; i < size; i++)
  425.     {
  426.       Map.Entry<Enum, String> entry = (Map.Entry<Enum, String>) kv.next();
  427.       Enum key = entry.getKey();
  428.       String value = entry.getValue();
  429.       System.out.println("Key:<" + key.name() + ">, value:<"+ value +">"); 
  430.     }
  431.     /*
  432.      * 
  433.      */
  434.     System.out.println("MAP COUNTERS *********************************************");
  435.     int size1 = this._mapTaskList.size();
  436.     for (int i = 0; i < size1; i++)
  437.     {
  438.       System.out.println("MAP TASK *********************************************");
  439.       this._mapTaskList.get(i).printKeys();
  440.     }
  441.     /*
  442.      * 
  443.      */
  444.     System.out.println("REDUCE COUNTERS *********************************************");
  445.     int size2 = this._mapTaskList.size();
  446.     for (int i = 0; i < size2; i++)
  447.     {
  448.       System.out.println("REDUCE TASK *********************************************");
  449.       this._reduceTaskList.get(i).printKeys();
  450.     }
  451.   }
  452.   
  453.   /*
  454.    * Hash table keeping sorted lists of map tasks based on the specific map task key
  455.    */
  456.   private Hashtable <Enum, ArrayList<MapTaskStatistics>> _sortedMapTaskListsByKey = new Hashtable<Enum, ArrayList<MapTaskStatistics>>();
  457.   
  458.   /*
  459.    * @return mapTaskList : ArrayList of MapTaskStatistics
  460.    * @param mapTaskSortKey : Specific counter key used for sorting the task list
  461.    * @param datatype : indicates the data type of the counter key used for sorting
  462.    * If sort key is null then by default map tasks are sorted using map task ids.
  463.    */
  464.   public synchronized ArrayList<MapTaskStatistics> 
  465.           getMapTaskList(Enum mapTaskSortKey, KeyDataType dataType) {
  466.     
  467.     /* 
  468.      * If mapTaskSortKey is null then use the task id as a key.
  469.      */
  470.     if (mapTaskSortKey == null) {
  471.       mapTaskSortKey = MapTaskKeys.TASK_ID;
  472.     }
  473.     
  474.     if (this._sortedMapTaskListsByKey.get(mapTaskSortKey) == null) {
  475.       ArrayList<MapTaskStatistics> newList = (ArrayList<MapTaskStatistics>)this._mapTaskList.clone();
  476.       this._sortedMapTaskListsByKey.put(mapTaskSortKey, this.sortMapTasksByKey(newList, mapTaskSortKey, dataType));
  477.     } 
  478.     return this._sortedMapTaskListsByKey.get(mapTaskSortKey);
  479.   }
  480.   
  481.   private ArrayList<MapTaskStatistics> sortMapTasksByKey (ArrayList<MapTaskStatistics> mapTasks, 
  482.                          Enum key, Enum dataType) {
  483.     MapCounterComparator mcc = new MapCounterComparator(key, dataType);
  484.     Collections.sort (mapTasks, mcc);
  485.     return mapTasks;
  486.   }
  487.   
  488.   private class MapCounterComparator implements Comparator<MapTaskStatistics> {
  489.     public Enum _sortKey;
  490.     public Enum _dataType;
  491.     
  492.     public MapCounterComparator(Enum key, Enum dataType) {
  493.       this._sortKey = key;
  494.       this._dataType = dataType;
  495.     }
  496.     
  497.     // Comparator interface requires defining compare method.
  498.     public int compare(MapTaskStatistics a, MapTaskStatistics b) {
  499.       if (this._dataType == KeyDataType.LONG) {
  500.         long aa = a.getLongValue(this._sortKey);
  501.         long bb = b.getLongValue(this._sortKey);
  502.         if (aa<bb) return -1; if (aa==bb) return 0; if (aa>bb) return 1;
  503.       } else {
  504.         return a.getStringValue(this._sortKey).compareToIgnoreCase(b.getStringValue(this._sortKey));
  505.       }
  506.       
  507.       return 0;
  508.     }
  509.   }
  510.   
  511.   /*
  512.    * Reduce Array List sorting
  513.    */
  514.     private Hashtable <Enum, ArrayList<ReduceTaskStatistics>> _sortedReduceTaskListsByKey = new Hashtable<Enum,ArrayList<ReduceTaskStatistics>>();
  515.   
  516.     /*
  517.      * @return reduceTaskList : ArrayList of ReduceTaskStatistics
  518.    * @param reduceTaskSortKey : Specific counter key used for sorting the task list
  519.    * @param dataType : indicates the data type of the counter key used for sorting
  520.    * If sort key is null then, by default reduce tasks are sorted using task ids.
  521.      */
  522.   public synchronized ArrayList<ReduceTaskStatistics> 
  523.                                 getReduceTaskList (Enum reduceTaskSortKey, KeyDataType dataType) {
  524.     
  525.     /* 
  526.      * If reduceTaskSortKey is null then use the task id as a key.
  527.      */
  528.     if (reduceTaskSortKey == null) {
  529.       reduceTaskSortKey = ReduceTaskKeys.TASK_ID;
  530.     }
  531.     
  532.     if (this._sortedReduceTaskListsByKey.get(reduceTaskSortKey) == null) {
  533.       ArrayList<ReduceTaskStatistics> newList = (ArrayList<ReduceTaskStatistics>)this._reduceTaskList.clone();
  534.       this._sortedReduceTaskListsByKey.put(reduceTaskSortKey, this.sortReduceTasksByKey(newList, reduceTaskSortKey, dataType));
  535.     } 
  536.     
  537.     return this._sortedReduceTaskListsByKey.get(reduceTaskSortKey);  
  538.   }
  539.   
  540.   private ArrayList<ReduceTaskStatistics> sortReduceTasksByKey (ArrayList<ReduceTaskStatistics> reduceTasks, 
  541.                                 Enum key, Enum dataType) {
  542.     ReduceCounterComparator rcc = new ReduceCounterComparator(key, dataType);
  543.     Collections.sort (reduceTasks, rcc);
  544.     return reduceTasks;
  545.   }
  546.   
  547.   private class ReduceCounterComparator implements Comparator<ReduceTaskStatistics> {
  548.     public Enum _sortKey;
  549.     public Enum _dataType;  //either long or string
  550.     
  551.     public ReduceCounterComparator(Enum key, Enum dataType) {
  552.       this._sortKey = key;
  553.       this._dataType = dataType;
  554.     }
  555.     
  556.     // Comparator interface requires defining compare method.
  557.     public int compare(ReduceTaskStatistics a, ReduceTaskStatistics b) {
  558.       if (this._dataType == KeyDataType.LONG) {
  559.         long aa = a.getLongValue(this._sortKey);
  560.         long bb = b.getLongValue(this._sortKey);
  561.         if (aa<bb) return -1; if (aa==bb) return 0; if (aa>bb) return 1;
  562.       } else {
  563.         return a.getStringValue(this._sortKey).compareToIgnoreCase(b.getStringValue(this._sortKey));
  564.       }
  565.       
  566.       return 0;
  567.     }
  568.   }
  569. }