BlockTransferThrottler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. /** 
  20.  * a class to throttle the block transfers.
  21.  * This class is thread safe. It can be shared by multiple threads.
  22.  * The parameter bandwidthPerSec specifies the total bandwidth shared by
  23.  * threads.
  24.  */
  25. class BlockTransferThrottler {
  26.   private long period;          // period over which bw is imposed
  27.   private long periodExtension; // Max period over which bw accumulates.
  28.   private long bytesPerPeriod; // total number of bytes can be sent in each period
  29.   private long curPeriodStart; // current period starting time
  30.   private long curReserve;     // remaining bytes can be sent in the period
  31.   private long bytesAlreadyUsed;
  32.   /** Constructor 
  33.    * @param bandwidthPerSec bandwidth allowed in bytes per second. 
  34.    */
  35.   BlockTransferThrottler(long bandwidthPerSec) {
  36.     this(500, bandwidthPerSec);  // by default throttling period is 500ms 
  37.   }
  38.   /**
  39.    * Constructor
  40.    * @param period in milliseconds. Bandwidth is enforced over this
  41.    *        period.
  42.    * @param bandwidthPerSec bandwidth allowed in bytes per second. 
  43.    */
  44.   BlockTransferThrottler(long period, long bandwidthPerSec) {
  45.     this.curPeriodStart = System.currentTimeMillis();
  46.     this.period = period;
  47.     this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
  48.     this.periodExtension = period*3;
  49.   }
  50.   /**
  51.    * @return current throttle bandwidth in bytes per second.
  52.    */
  53.   synchronized long getBandwidth() {
  54.     return bytesPerPeriod*1000/period;
  55.   }
  56.   
  57.   /**
  58.    * Sets throttle bandwidth. This takes affect latest by the end of current
  59.    * period.
  60.    * 
  61.    * @param bytesPerSecond 
  62.    */
  63.   synchronized void setBandwidth(long bytesPerSecond) {
  64.     if ( bytesPerSecond <= 0 ) {
  65.       throw new IllegalArgumentException("" + bytesPerSecond);
  66.     }
  67.     bytesPerPeriod = bytesPerSecond*period/1000;
  68.   }
  69.   
  70.   /** Given the numOfBytes sent/received since last time throttle was called,
  71.    * make the current thread sleep if I/O rate is too fast
  72.    * compared to the given bandwidth.
  73.    *
  74.    * @param numOfBytes
  75.    *     number of bytes sent/received since last time throttle was called
  76.    */
  77.   synchronized void throttle(long numOfBytes) {
  78.     if ( numOfBytes <= 0 ) {
  79.       return;
  80.     }
  81.     curReserve -= numOfBytes;
  82.     bytesAlreadyUsed += numOfBytes;
  83.     while (curReserve <= 0) {
  84.       long now = System.currentTimeMillis();
  85.       long curPeriodEnd = curPeriodStart + period;
  86.       if ( now < curPeriodEnd ) {
  87.         // Wait for next period so that curReserve can be increased.
  88.         try {
  89.           wait( curPeriodEnd - now );
  90.         } catch (InterruptedException ignored) {}
  91.       } else if ( now <  (curPeriodStart + periodExtension)) {
  92.         curPeriodStart = curPeriodEnd;
  93.         curReserve += bytesPerPeriod;
  94.       } else {
  95.         // discard the prev period. Throttler might not have
  96.         // been used for a long time.
  97.         curPeriodStart = now;
  98.         curReserve = bytesPerPeriod - bytesAlreadyUsed;
  99.       }
  100.     }
  101.     bytesAlreadyUsed -= numOfBytes;
  102.   }
  103. }