ResourceEstimator.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.commons.logging.Log;
  20. import org.apache.commons.logging.LogFactory;
  21. /**
  22.  * Class responsible for modeling the resource consumption of running tasks.
  23.  * 
  24.  * For now, we just do temp space for maps
  25.  * 
  26.  * There is one ResourceEstimator per JobInProgress
  27.  *
  28.  */
  29. class ResourceEstimator {
  30.   //Log with JobInProgress
  31.   private static final Log LOG = LogFactory.getLog(
  32.       "org.apache.hadoop.mapred.ResourceEstimator");
  33.   private long completedMapsInputSize;
  34.   private long completedMapsOutputSize;
  35.   private int completedMapsUpdates;
  36.   final private JobInProgress job;
  37.   final private int threshholdToUse;
  38.   public ResourceEstimator(JobInProgress job) {
  39.     this.job = job;
  40.     threshholdToUse = job.desiredMaps()/ 10;
  41.   }
  42.   protected synchronized void updateWithCompletedTask(TaskStatus ts, 
  43.       TaskInProgress tip) {
  44.     //-1 indicates error, which we don't average in.
  45.     if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
  46.       completedMapsUpdates++;
  47.       completedMapsInputSize+=(tip.getMapInputSize()+1);
  48.       completedMapsOutputSize+=ts.getOutputSize();
  49.       LOG.info("completedMapsUpdates:"+completedMapsUpdates+"  "+
  50.           "completedMapsInputSize:"+completedMapsInputSize+"  " +
  51.         "completedMapsOutputSize:"+completedMapsOutputSize);
  52.     }
  53.   }
  54.   /**
  55.    * @return estimated length of this job's total map output
  56.    */
  57.   protected synchronized long getEstimatedTotalMapOutputSize()  {
  58.     if(completedMapsUpdates < threshholdToUse) {
  59.       return 0;
  60.     } else {
  61.       long inputSize = job.getInputLength() + job.desiredMaps(); 
  62.       //add desiredMaps() so that randomwriter case doesn't blow up
  63.       long estimate = Math.round((inputSize * 
  64.           completedMapsOutputSize * 2.0)/completedMapsInputSize);
  65.       LOG.debug("estimate total map output will be " + estimate);
  66.       return estimate;
  67.     }
  68.   }
  69.   
  70.   /**
  71.    * @return estimated length of this job's average map output
  72.    */
  73.   long getEstimatedMapOutputSize() {
  74.     long estimate = getEstimatedTotalMapOutputSize()  / job.desiredMaps();
  75.     return estimate;
  76.   }
  77.   /**
  78.    * 
  79.    * @return estimated length of this job's average reduce input
  80.    */
  81.   long getEstimatedReduceInputSize() {
  82.     if(job.desiredReduces() == 0) {//no reduce output, so no size
  83.       return 0;
  84.     } else {
  85.       return getEstimatedTotalMapOutputSize() / job.desiredReduces();
  86.       //estimate that each reduce gets an equal share of total map output
  87.     }
  88.   }
  89.   
  90. }