DataXceiverServer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.IOException;
  20. import java.net.ServerSocket;
  21. import java.net.Socket;
  22. import java.net.SocketTimeoutException;
  23. import java.util.Collections;
  24. import java.util.HashMap;
  25. import java.util.Iterator;
  26. import java.util.Map;
  27. import org.apache.commons.logging.Log;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.hdfs.protocol.FSConstants;
  30. import org.apache.hadoop.hdfs.server.balancer.Balancer;
  31. import org.apache.hadoop.util.Daemon;
  32. import org.apache.hadoop.util.StringUtils;
  33. /**
  34.  * Server used for receiving/sending a block of data.
  35.  * This is created to listen for requests from clients or 
  36.  * other DataNodes.  This small server does not use the 
  37.  * Hadoop IPC mechanism.
  38.  */
  39. class DataXceiverServer implements Runnable, FSConstants {
  40.   public static final Log LOG = DataNode.LOG;
  41.   
  42.   ServerSocket ss;
  43.   DataNode datanode;
  44.   // Record all sockets opend for data transfer
  45.   Map<Socket, Socket> childSockets = Collections.synchronizedMap(
  46.                                        new HashMap<Socket, Socket>());
  47.   
  48.   /**
  49.    * Maximal number of concurrent xceivers per node.
  50.    * Enforcing the limit is required in order to avoid data-node
  51.    * running out of memory.
  52.    */
  53.   static final int MAX_XCEIVER_COUNT = 256;
  54.   int maxXceiverCount = MAX_XCEIVER_COUNT;
  55.   /** A manager to make sure that cluster balancing does not
  56.    * take too much resources.
  57.    * 
  58.    * It limits the number of block moves for balancing and
  59.    * the total amount of bandwidth they can use.
  60.    */
  61.   static class BlockBalanceThrottler extends BlockTransferThrottler {
  62.    private int numThreads;
  63.    
  64.    /**Constructor
  65.     * 
  66.     * @param bandwidth Total amount of bandwidth can be used for balancing 
  67.     */
  68.    private BlockBalanceThrottler(long bandwidth) {
  69.      super(bandwidth);
  70.      LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
  71.    }
  72.    
  73.    /** Check if the block move can start. 
  74.     * 
  75.     * Return true if the thread quota is not exceeded and 
  76.     * the counter is incremented; False otherwise.
  77.     */
  78.    synchronized boolean acquire() {
  79.      if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
  80.        return false;
  81.      }
  82.      numThreads++;
  83.      return true;
  84.    }
  85.    
  86.    /** Mark that the move is completed. The thread counter is decremented. */
  87.    synchronized void release() {
  88.      numThreads--;
  89.    }
  90.   }
  91.   BlockBalanceThrottler balanceThrottler;
  92.   
  93.   /**
  94.    * We need an estimate for block size to check if the disk partition has
  95.    * enough space. For now we set it to be the default block size set
  96.    * in the server side configuration, which is not ideal because the
  97.    * default block size should be a client-size configuration. 
  98.    * A better solution is to include in the header the estimated block size,
  99.    * i.e. either the actual block size or the default block size.
  100.    */
  101.   long estimateBlockSize;
  102.   
  103.   
  104.   DataXceiverServer(ServerSocket ss, Configuration conf, 
  105.       DataNode datanode) {
  106.     
  107.     this.ss = ss;
  108.     this.datanode = datanode;
  109.     
  110.     this.maxXceiverCount = conf.getInt("dfs.datanode.max.xcievers",
  111.         MAX_XCEIVER_COUNT);
  112.     
  113.     this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
  114.     
  115.     //set up parameter for cluster balancing
  116.     this.balanceThrottler = new BlockBalanceThrottler(
  117.       conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
  118.   }
  119.   /**
  120.    */
  121.   public void run() {
  122.     while (datanode.shouldRun) {
  123.       try {
  124.         Socket s = ss.accept();
  125.         s.setTcpNoDelay(true);
  126.         new Daemon(datanode.threadGroup, 
  127.             new DataXceiver(s, datanode, this)).start();
  128.       } catch (SocketTimeoutException ignored) {
  129.         // wake up to see if should continue to run
  130.       } catch (IOException ie) {
  131.         LOG.warn(datanode.dnRegistration + ":DataXceiveServer: " 
  132.                                 + StringUtils.stringifyException(ie));
  133.       } catch (Throwable te) {
  134.         LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:" 
  135.                                  + StringUtils.stringifyException(te));
  136.         datanode.shouldRun = false;
  137.       }
  138.     }
  139.     try {
  140.       ss.close();
  141.     } catch (IOException ie) {
  142.       LOG.warn(datanode.dnRegistration + ":DataXceiveServer: " 
  143.                               + StringUtils.stringifyException(ie));
  144.     }
  145.   }
  146.   
  147.   void kill() {
  148.     assert datanode.shouldRun == false :
  149.       "shoudRun should be set to false before killing";
  150.     try {
  151.       this.ss.close();
  152.     } catch (IOException ie) {
  153.       LOG.warn(datanode.dnRegistration + ":DataXceiveServer.kill(): " 
  154.                               + StringUtils.stringifyException(ie));
  155.     }
  156.     // close all the sockets that were accepted earlier
  157.     synchronized (childSockets) {
  158.       for (Iterator<Socket> it = childSockets.values().iterator();
  159.            it.hasNext();) {
  160.         Socket thissock = it.next();
  161.         try {
  162.           thissock.close();
  163.         } catch (IOException e) {
  164.         }
  165.       }
  166.     }
  167.   }
  168. }