BalancedReducePartitioning.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.vaidya.postexdiagnosis.tests;
  19. import org.apache.hadoop.vaidya.statistics.job.JobStatistics;
  20. import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.JobKeys;
  21. import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.KeyDataType;
  22. import org.apache.hadoop.vaidya.statistics.job.JobStatisticsInterface.ReduceTaskKeys;
  23. import org.apache.hadoop.vaidya.statistics.job.ReduceTaskStatistics;
  24. import org.apache.hadoop.vaidya.DiagnosticTest;
  25. import org.w3c.dom.Element;
  26. import java.util.Hashtable;
  27. import java.util.List;
  28. /**
  29.  *
  30.  */
  31. public class BalancedReducePartitioning extends DiagnosticTest {
  32.   private long totalReduces;
  33.   private long busyReducers;
  34.   private long percentReduceRecordsSize;
  35.   private double percent;
  36.   private double impact;
  37.   
  38.   /**
  39.    * 
  40.    */
  41.   public BalancedReducePartitioning() {
  42.   }
  43.   /*    
  44.    */
  45.   @Override
  46.   public double evaluate(JobStatistics jobExecutionStats) {
  47.     
  48.     /*
  49.      * Read this rule specific input PercentReduceRecords
  50.      */
  51.     this.percent = getInputElementDoubleValue("PercentReduceRecords", 0.90);
  52.     
  53.     
  54.     /*
  55.      * Get the sorted reduce task list by number of INPUT_RECORDS (ascending) 
  56.      */
  57.     List<ReduceTaskStatistics> srTaskList = 
  58.                             jobExecutionStats.getReduceTaskList(ReduceTaskKeys.INPUT_RECORDS, KeyDataType.LONG);
  59.     this.percentReduceRecordsSize = (long) (this.percent * jobExecutionStats.getLongValue(JobKeys.REDUCE_INPUT_RECORDS));
  60.     this.totalReduces = jobExecutionStats.getLongValue(JobKeys.TOTAL_REDUCES);
  61.     long tempReduceRecordsCount = 0;
  62.     this.busyReducers = 0;
  63.     for (int i=srTaskList.size()-1; i>-1; i--) {
  64.       tempReduceRecordsCount += srTaskList.get(i).getLongValue(ReduceTaskKeys.INPUT_RECORDS);
  65.       this.busyReducers++;
  66.       if (tempReduceRecordsCount >= this.percentReduceRecordsSize) {
  67.         break;
  68.       }
  69.     }
  70.     
  71.     // Calculate Impact
  72.     return this.impact = (1 - (double)this.busyReducers/(double)this.totalReduces);
  73.     
  74.   }
  75.   /*
  76.    * helper function to print specific reduce counter for all reduce tasks
  77.    */
  78.   public void printReduceCounters (List<Hashtable<ReduceTaskKeys, String>> x, ReduceTaskKeys key) {
  79.     for (int i=0; i<x.size(); i++) {
  80.       System.out.println("ind:"+i+", Value:<"+x.get(i).get(key)+">");
  81.     }
  82.   }
  83.   
  84.   /* 
  85.    * 
  86.    */
  87.   @Override
  88.   public String getPrescription() {
  89.     return 
  90.     "* Use the appropriate partitioning function"+ "n" +
  91.     "* For streaming job consider following partitioner and hadoop config parametersn"+
  92.     "  * org.apache.hadoop.mapred.lib.KeyFieldBasedPartitionern" +
  93.     "  * -jobconf stream.map.output.field.separator, -jobconf stream.num.map.output.key.fields";
  94.   }
  95.   /* 
  96.    */
  97.   @Override
  98.   public String getReferenceDetails() {
  99.     String ref = 
  100.     "* TotalReduceTasks: "+this.totalReduces+"n"+
  101.     "* BusyReduceTasks processing "+this.percent+ "% of total records: " +this.busyReducers+"n"+
  102.     "* Impact: "+truncate(this.impact);
  103.     return ref;
  104.   }
  105. }