Balancer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:55k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.balancer;
  19. import java.io.BufferedInputStream;
  20. import java.io.BufferedOutputStream;
  21. import java.io.DataInput;
  22. import java.io.DataInputStream;
  23. import java.io.DataOutput;
  24. import java.io.DataOutputStream;
  25. import java.io.IOException;
  26. import java.io.OutputStream;
  27. import java.net.InetAddress;
  28. import java.net.InetSocketAddress;
  29. import java.net.Socket;
  30. import java.text.DateFormat;
  31. import java.util.ArrayList;
  32. import java.util.Arrays;
  33. import java.util.Collection;
  34. import java.util.Date;
  35. import java.util.HashMap;
  36. import java.util.HashSet;
  37. import java.util.Iterator;
  38. import java.util.Formatter;
  39. import java.util.LinkedList;
  40. import java.util.List;
  41. import java.util.Map;
  42. import java.util.Random;
  43. import java.util.concurrent.ExecutionException;
  44. import java.util.concurrent.ExecutorService;
  45. import java.util.concurrent.Executors;
  46. import java.util.concurrent.Future;
  47. import java.util.concurrent.TimeUnit;
  48. import org.apache.commons.logging.Log;
  49. import org.apache.commons.logging.LogFactory;
  50. import org.apache.hadoop.conf.Configuration;
  51. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  52. import org.apache.hadoop.hdfs.server.common.Util;
  53. import org.apache.hadoop.hdfs.DFSClient;
  54. import org.apache.hadoop.hdfs.protocol.*;
  55. import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
  56. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  57. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  58. import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
  59. import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
  60. import org.apache.hadoop.fs.FileSystem;
  61. import org.apache.hadoop.fs.Path;
  62. import org.apache.hadoop.io.IOUtils;
  63. import org.apache.hadoop.io.Text;
  64. import org.apache.hadoop.io.Writable;
  65. import org.apache.hadoop.io.retry.RetryPolicies;
  66. import org.apache.hadoop.io.retry.RetryPolicy;
  67. import org.apache.hadoop.io.retry.RetryProxy;
  68. import org.apache.hadoop.ipc.RPC;
  69. import org.apache.hadoop.ipc.RemoteException;
  70. import org.apache.hadoop.net.NetUtils;
  71. import org.apache.hadoop.net.NetworkTopology;
  72. import org.apache.hadoop.security.UnixUserGroupInformation;
  73. import org.apache.hadoop.security.UserGroupInformation;
  74. import org.apache.hadoop.util.StringUtils;
  75. import org.apache.hadoop.util.Tool;
  76. import org.apache.hadoop.util.ToolRunner;
  77. /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
  78.  * when some datanodes become full or when new empty nodes join the cluster.
  79.  * The tool is deployed as an application program that can be run by the 
  80.  * cluster administrator on a live HDFS cluster while applications
  81.  * adding and deleting files.
  82.  * 
  83.  * <p>SYNOPSIS
  84.  * <pre>
  85.  * To start:
  86.  *      bin/start-balancer.sh [-threshold <threshold>]
  87.  *      Example: bin/ start-balancer.sh 
  88.  *                     start the balancer with a default threshold of 10%
  89.  *               bin/ start-balancer.sh -threshold 5
  90.  *                     start the balancer with a threshold of 5%
  91.  * To stop:
  92.  *      bin/ stop-balancer.sh
  93.  * </pre>
  94.  * 
  95.  * <p>DESCRIPTION
  96.  * <p>The threshold parameter is a fraction in the range of (0%, 100%) with a 
  97.  * default value of 10%. The threshold sets a target for whether the cluster 
  98.  * is balanced. A cluster is balanced if for each datanode, the utilization 
  99.  * of the node (ratio of used space at the node to total capacity of the node) 
  100.  * differs from the utilization of the (ratio of used space in the cluster 
  101.  * to total capacity of the cluster) by no more than the threshold value. 
  102.  * The smaller the threshold, the more balanced a cluster will become. 
  103.  * It takes more time to run the balancer for small threshold values. 
  104.  * Also for a very small threshold the cluster may not be able to reach the 
  105.  * balanced state when applications write and delete files concurrently.
  106.  * 
  107.  * <p>The tool moves blocks from highly utilized datanodes to poorly 
  108.  * utilized datanodes iteratively. In each iteration a datanode moves or 
  109.  * receives no more than the lesser of 10G bytes or the threshold fraction 
  110.  * of its capacity. Each iteration runs no more than 20 minutes.
  111.  * At the end of each iteration, the balancer obtains updated datanodes
  112.  * information from the namenode.
  113.  * 
  114.  * <p>A system property that limits the balancer's use of bandwidth is 
  115.  * defined in the default configuration file:
  116.  * <pre>
  117.  * <property>
  118.  *   <name>dfs.balance.bandwidthPerSec</name>
  119.  *   <value>1048576</value>
  120.  * <description>  Specifies the maximum bandwidth that each datanode 
  121.  * can utilize for the balancing purpose in term of the number of bytes 
  122.  * per second. </description>
  123.  * </property>
  124.  * </pre>
  125.  * 
  126.  * <p>This property determines the maximum speed at which a block will be 
  127.  * moved from one datanode to another. The default value is 1MB/s. The higher 
  128.  * the bandwidth, the faster a cluster can reach the balanced state, 
  129.  * but with greater competition with application processes. If an 
  130.  * administrator changes the value of this property in the configuration 
  131.  * file, the change is observed when HDFS is next restarted.
  132.  * 
  133.  * <p>MONITERING BALANCER PROGRESS
  134.  * <p>After the balancer is started, an output file name where the balancer 
  135.  * progress will be recorded is printed on the screen.  The administrator 
  136.  * can monitor the running of the balancer by reading the output file. 
  137.  * The output shows the balancer's status iteration by iteration. In each 
  138.  * iteration it prints the starting time, the iteration number, the total 
  139.  * number of bytes that have been moved in the previous iterations, 
  140.  * the total number of bytes that are left to move in order for the cluster 
  141.  * to be balanced, and the number of bytes that are being moved in this 
  142.  * iteration. Normally "Bytes Already Moved" is increasing while "Bytes Left 
  143.  * To Move" is decreasing.
  144.  * 
  145.  * <p>Running multiple instances of the balancer in an HDFS cluster is 
  146.  * prohibited by the tool.
  147.  * 
  148.  * <p>The balancer automatically exits when any of the following five 
  149.  * conditions is satisfied:
  150.  * <ol>
  151.  * <li>The cluster is balanced;
  152.  * <li>No block can be moved;
  153.  * <li>No block has been moved for five consecutive iterations;
  154.  * <li>An IOException occurs while communicating with the namenode;
  155.  * <li>Another balancer is running.
  156.  * </ol>
  157.  * 
  158.  * <p>Upon exit, a balancer returns an exit code and prints one of the 
  159.  * following messages to the output file in corresponding to the above exit 
  160.  * reasons:
  161.  * <ol>
  162.  * <li>The cluster is balanced. Exiting
  163.  * <li>No block can be moved. Exiting...
  164.  * <li>No block has been moved for 3 iterations. Exiting...
  165.  * <li>Received an IO exception: failure reason. Exiting...
  166.  * <li>Another balancer is running. Exiting...
  167.  * </ol>
  168.  * 
  169.  * <p>The administrator can interrupt the execution of the balancer at any 
  170.  * time by running the command "stop-balancer.sh" on the machine where the 
  171.  * balancer is running.
  172.  */
  173. public class Balancer implements Tool {
  174.   private static final Log LOG = 
  175.     LogFactory.getLog(Balancer.class.getName());
  176.   final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
  177.   /** The maximum number of concurrent blocks moves for 
  178.    * balancing purpose at a datanode
  179.    */
  180.   public static final int MAX_NUM_CONCURRENT_MOVES = 5;
  181.   
  182.   private Configuration conf;
  183.   private double threshold = 10D;
  184.   private NamenodeProtocol namenode;
  185.   private ClientProtocol client;
  186.   private FileSystem fs;
  187.   private final static Random rnd = new Random();
  188.   
  189.   // all data node lists
  190.   private Collection<Source> overUtilizedDatanodes
  191.                                = new LinkedList<Source>();
  192.   private Collection<Source> aboveAvgUtilizedDatanodes
  193.                                = new LinkedList<Source>();
  194.   private Collection<BalancerDatanode> belowAvgUtilizedDatanodes
  195.                                = new LinkedList<BalancerDatanode>();
  196.   private Collection<BalancerDatanode> underUtilizedDatanodes
  197.                                = new LinkedList<BalancerDatanode>();
  198.   
  199.   private Collection<Source> sources
  200.                                = new HashSet<Source>();
  201.   private Collection<BalancerDatanode> targets
  202.                                = new HashSet<BalancerDatanode>();
  203.   
  204.   private Map<Block, BalancerBlock> globalBlockList
  205.                  = new HashMap<Block, BalancerBlock>();
  206.   private MovedBlocks movedBlocks = new MovedBlocks();
  207.   private Map<String, BalancerDatanode> datanodes
  208.                  = new HashMap<String, BalancerDatanode>();
  209.   
  210.   private NetworkTopology cluster = new NetworkTopology();
  211.   
  212.   private double avgUtilization = 0.0D;
  213.   
  214.   final static private int MOVER_THREAD_POOL_SIZE = 1000;
  215.   final private ExecutorService moverExecutor = 
  216.     Executors.newFixedThreadPool(MOVER_THREAD_POOL_SIZE);
  217.   final static private int DISPATCHER_THREAD_POOL_SIZE = 200;
  218.   final private ExecutorService dispatcherExecutor =
  219.     Executors.newFixedThreadPool(DISPATCHER_THREAD_POOL_SIZE);
  220.   
  221.   /* This class keeps track of a scheduled block move */
  222.   private class PendingBlockMove {
  223.     private BalancerBlock block;
  224.     private Source source;
  225.     private BalancerDatanode proxySource;
  226.     private BalancerDatanode target;
  227.     
  228.     /** constructor */
  229.     private PendingBlockMove() {
  230.     }
  231.     
  232.     /* choose a block & a proxy source for this pendingMove 
  233.      * whose source & target have already been chosen.
  234.      * 
  235.      * Return true if a block and its proxy are chosen; false otherwise
  236.      */
  237.     private boolean chooseBlockAndProxy() {
  238.       // iterate all source's blocks until find a good one    
  239.       for (Iterator<BalancerBlock> blocks=
  240.         source.getBlockIterator(); blocks.hasNext();) {
  241.         if (markMovedIfGoodBlock(blocks.next())) {
  242.           blocks.remove();
  243.           return true;
  244.         }
  245.       }
  246.       return false;
  247.     }
  248.     
  249.     /* Return true if the given block is good for the tentative move;
  250.      * If it is good, add it to the moved list to marked as "Moved".
  251.      * A block is good if
  252.      * 1. it is a good candidate; see isGoodBlockCandidate
  253.      * 2. can find a proxy source that's not busy for this move
  254.      */
  255.     private boolean markMovedIfGoodBlock(BalancerBlock block) {
  256.       synchronized(block) {
  257.         synchronized(movedBlocks) {
  258.           if (isGoodBlockCandidate(source, target, block)) {
  259.             this.block = block;
  260.             if ( chooseProxySource() ) {
  261.               movedBlocks.add(block);
  262.               if (LOG.isDebugEnabled()) {
  263.                 LOG.debug("Decided to move block "+ block.getBlockId()
  264.                     +" with a length of "+StringUtils.byteDesc(block.getNumBytes())
  265.                     + " bytes from " + source.getName() 
  266.                     + " to " + target.getName()
  267.                     + " using proxy source " + proxySource.getName() );
  268.               }
  269.               return true;
  270.             }
  271.           }
  272.         }
  273.       }
  274.       return false;
  275.     }
  276.     
  277.     /* Now we find out source, target, and block, we need to find a proxy
  278.      * 
  279.      * @return true if a proxy is found; otherwise false
  280.      */
  281.     private boolean chooseProxySource() {
  282.       // check if there is replica which is on the same rack with the target
  283.       for (BalancerDatanode loc : block.getLocations()) {
  284.         if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
  285.           if (loc.addPendingBlock(this)) {
  286.             proxySource = loc;
  287.             return true;
  288.           }
  289.         }
  290.       }
  291.       // find out a non-busy replica
  292.       for (BalancerDatanode loc : block.getLocations()) {
  293.         if (loc.addPendingBlock(this)) {
  294.           proxySource = loc;
  295.           return true;
  296.         }
  297.       }
  298.       return false;
  299.     }
  300.     
  301.     /* Dispatch the block move task to the proxy source & wait for the response
  302.      */
  303.     private void dispatch() {
  304.       Socket sock = new Socket();
  305.       DataOutputStream out = null;
  306.       DataInputStream in = null;
  307.       try {
  308.         sock.connect(NetUtils.createSocketAddr(
  309.             target.datanode.getName()), HdfsConstants.READ_TIMEOUT);
  310.         sock.setKeepAlive(true);
  311.         out = new DataOutputStream( new BufferedOutputStream(
  312.             sock.getOutputStream(), FSConstants.BUFFER_SIZE));
  313.         sendRequest(out);
  314.         in = new DataInputStream( new BufferedInputStream(
  315.             sock.getInputStream(), FSConstants.BUFFER_SIZE));
  316.         receiveResponse(in);
  317.         bytesMoved.inc(block.getNumBytes());
  318.         LOG.info( "Moving block " + block.getBlock().getBlockId() +
  319.               " from "+ source.getName() + " to " +
  320.               target.getName() + " through " +
  321.               proxySource.getName() +
  322.               " is succeeded." );
  323.       } catch (IOException e) {
  324.         LOG.warn("Error moving block "+block.getBlockId()+
  325.             " from " + source.getName() + " to " +
  326.             target.getName() + " through " +
  327.             proxySource.getName() +
  328.             ": "+e.getMessage());
  329.       } finally {
  330.         IOUtils.closeStream(out);
  331.         IOUtils.closeStream(in);
  332.         IOUtils.closeSocket(sock);
  333.         
  334.         proxySource.removePendingBlock(this);
  335.         synchronized(target) {
  336.           target.removePendingBlock(this);
  337.         }
  338.         synchronized (this ) {
  339.           reset();
  340.         }
  341.         synchronized (Balancer.this) {
  342.           Balancer.this.notifyAll();
  343.         }
  344.       }
  345.     }
  346.     
  347.     /* Send a block replace request to the output stream*/
  348.     private void sendRequest(DataOutputStream out) throws IOException {
  349.       out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
  350.       out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
  351.       out.writeLong(block.getBlock().getBlockId());
  352.       out.writeLong(block.getBlock().getGenerationStamp());
  353.       Text.writeString(out, source.getStorageID());
  354.       proxySource.write(out);
  355.       out.flush();
  356.     }
  357.     
  358.     /* Receive a block copy response from the input stream */ 
  359.     private void receiveResponse(DataInputStream in) throws IOException {
  360.       short status = in.readShort();
  361.       if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
  362.         throw new IOException("block move is failed");
  363.       }
  364.     }
  365.     /* reset the object */
  366.     private void reset() {
  367.       block = null;
  368.       source = null;
  369.       proxySource = null;
  370.       target = null;
  371.     }
  372.     
  373.     /* start a thread to dispatch the block move */
  374.     private void scheduleBlockMove() {
  375.       moverExecutor.execute(new Runnable() {
  376.         public void run() {
  377.           if (LOG.isDebugEnabled()) {
  378.             LOG.debug("Starting moving "+ block.getBlockId() +
  379.                 " from " + proxySource.getName() + " to " + target.getName());
  380.           }
  381.           dispatch();
  382.         }
  383.       });
  384.     }
  385.   }
  386.   
  387.   /* A class for keeping track of blocks in the Balancer */
  388.   static private class BalancerBlock {
  389.     private Block block; // the block
  390.     private List<BalancerDatanode> locations
  391.             = new ArrayList<BalancerDatanode>(3); // its locations
  392.     
  393.     /* Constructor */
  394.     private BalancerBlock(Block block) {
  395.       this.block = block;
  396.     }
  397.     
  398.     /* clean block locations */
  399.     private synchronized void clearLocations() {
  400.       locations.clear();
  401.     }
  402.     
  403.     /* add a location */
  404.     private synchronized void addLocation(BalancerDatanode datanode) {
  405.       if (!locations.contains(datanode)) {
  406.         locations.add(datanode);
  407.       }
  408.     }
  409.     
  410.     /* Return if the block is located on <code>datanode</code> */
  411.     private synchronized boolean isLocatedOnDatanode(
  412.         BalancerDatanode datanode) {
  413.       return locations.contains(datanode);
  414.     }
  415.     
  416.     /* Return its locations */
  417.     private synchronized List<BalancerDatanode> getLocations() {
  418.       return locations;
  419.     }
  420.     
  421.     /* Return the block */
  422.     private Block getBlock() {
  423.       return block;
  424.     }
  425.     
  426.     /* Return the block id */
  427.     private long getBlockId() {
  428.       return block.getBlockId();
  429.     }
  430.     
  431.     /* Return the length of the block */
  432.     private long getNumBytes() {
  433.       return block.getNumBytes();
  434.     }
  435.   }
  436.   
  437.   /* The class represents a desired move of bytes between two nodes 
  438.    * and the target.
  439.    * An object of this class is stored in a source node. 
  440.    */
  441.   static private class NodeTask {
  442.     private BalancerDatanode datanode; //target node
  443.     private long size;  //bytes scheduled to move
  444.     
  445.     /* constructor */
  446.     private NodeTask(BalancerDatanode datanode, long size) {
  447.       this.datanode = datanode;
  448.       this.size = size;
  449.     }
  450.     
  451.     /* Get the node */
  452.     private BalancerDatanode getDatanode() {
  453.       return datanode;
  454.     }
  455.     
  456.     /* Get the number of bytes that need to be moved */
  457.     private long getSize() {
  458.       return size;
  459.     }
  460.   }
  461.   
  462.   /* Return the utilization of a datanode */
  463.   static private double getUtilization(DatanodeInfo datanode) {
  464.     return ((double)datanode.getDfsUsed())/datanode.getCapacity()*100;
  465.   }
  466.   
  467.   /* A class that keeps track of a datanode in Balancer */
  468.   private static class BalancerDatanode implements Writable {
  469.     final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB
  470.     protected DatanodeInfo datanode;
  471.     private double utilization;
  472.     protected long maxSizeToMove;
  473.     protected long scheduledSize = 0L;
  474.     //  blocks being moved but not confirmed yet
  475.     private List<PendingBlockMove> pendingBlocks = 
  476.       new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); 
  477.     
  478.     /* Constructor 
  479.      * Depending on avgutil & threshold, calculate maximum bytes to move 
  480.      */
  481.     private BalancerDatanode(
  482.         DatanodeInfo node, double avgUtil, double threshold) {
  483.       datanode = node;
  484.       utilization = Balancer.getUtilization(node);
  485.         
  486.       if (utilization >= avgUtil+threshold
  487.           || utilization <= avgUtil-threshold) { 
  488.         maxSizeToMove = (long)(threshold*datanode.getCapacity()/100);
  489.       } else {
  490.         maxSizeToMove = 
  491.           (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100);
  492.       }
  493.       if (utilization < avgUtil ) {
  494.         maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
  495.       }
  496.       maxSizeToMove = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
  497.     }
  498.     
  499.     /** Get the datanode */
  500.     protected DatanodeInfo getDatanode() {
  501.       return datanode;
  502.     }
  503.     
  504.     /** Get the name of the datanode */
  505.     protected String getName() {
  506.       return datanode.getName();
  507.     }
  508.     
  509.     /* Get the storage id of the datanode */
  510.     protected String getStorageID() {
  511.       return datanode.getStorageID();
  512.     }
  513.     
  514.     /** Decide if still need to move more bytes */
  515.     protected boolean isMoveQuotaFull() {
  516.       return scheduledSize<maxSizeToMove;
  517.     }
  518.     /** Return the total number of bytes that need to be moved */
  519.     protected long availableSizeToMove() {
  520.       return maxSizeToMove-scheduledSize;
  521.     }
  522.     
  523.     /* increment scheduled size */
  524.     protected void incScheduledSize(long size) {
  525.       scheduledSize += size;
  526.     }
  527.     
  528.     /* Check if the node can schedule more blocks to move */
  529.     synchronized private boolean isPendingQNotFull() {
  530.       if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
  531.         return true;
  532.       }
  533.       return false;
  534.     }
  535.     
  536.     /* Check if all the dispatched moves are done */
  537.     synchronized private boolean isPendingQEmpty() {
  538.       return pendingBlocks.isEmpty();
  539.     }
  540.     
  541.     /* Add a scheduled block move to the node */
  542.     private synchronized boolean addPendingBlock(
  543.         PendingBlockMove pendingBlock) {
  544.       if (isPendingQNotFull()) {
  545.         return pendingBlocks.add(pendingBlock);
  546.       }
  547.       return false;
  548.     }
  549.     
  550.     /* Remove a scheduled block move from the node */
  551.     private synchronized boolean  removePendingBlock(
  552.         PendingBlockMove pendingBlock) {
  553.       return pendingBlocks.remove(pendingBlock);
  554.     }
  555.     /** The following two methods support the Writable interface */
  556.     /** Deserialize */
  557.     public void readFields(DataInput in) throws IOException {
  558.       datanode.readFields(in);
  559.     }
  560.     /** Serialize */
  561.     public void write(DataOutput out) throws IOException {
  562.       datanode.write(out);
  563.     }
  564.   }
  565.   
  566.   /** A node that can be the sources of a block move */
  567.   private class Source extends BalancerDatanode {
  568.     
  569.     /* A thread that initiates a block move 
  570.      * and waits for block move to complete */
  571.     private class BlockMoveDispatcher implements Runnable {
  572.       public void run() {
  573.         dispatchBlocks();
  574.       }
  575.     }
  576.     
  577.     private ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2);
  578.     private long blocksToReceive = 0L;
  579.     /* source blocks point to balancerBlocks in the global list because
  580.      * we want to keep one copy of a block in balancer and be aware that
  581.      * the locations are changing over time.
  582.      */
  583.     private List<BalancerBlock> srcBlockList
  584.             = new ArrayList<BalancerBlock>();
  585.     
  586.     /* constructor */
  587.     private Source(DatanodeInfo node, double avgUtil, double threshold) {
  588.       super(node, avgUtil, threshold);
  589.     }
  590.     
  591.     /** Add a node task */
  592.     private void addNodeTask(NodeTask task) {
  593.       assert (task.datanode != this) :
  594.         "Source and target are the same " + datanode.getName();
  595.       incScheduledSize(task.getSize());
  596.       nodeTasks.add(task);
  597.     }
  598.     
  599.     /* Return an iterator to this source's blocks */
  600.     private Iterator<BalancerBlock> getBlockIterator() {
  601.       return srcBlockList.iterator();
  602.     }
  603.     
  604.     /* fetch new blocks of this source from namenode and
  605.      * update this source's block list & the global block list
  606.      * Return the total size of the received blocks in the number of bytes.
  607.      */
  608.     private long getBlockList() throws IOException {
  609.       BlockWithLocations[] newBlocks = namenode.getBlocks(datanode, 
  610.         (long)Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks();
  611.       long bytesReceived = 0;
  612.       for (BlockWithLocations blk : newBlocks) {
  613.         bytesReceived += blk.getBlock().getNumBytes();
  614.         BalancerBlock block;
  615.         synchronized(globalBlockList) {
  616.           block = globalBlockList.get(blk.getBlock());
  617.           if (block==null) {
  618.             block = new BalancerBlock(blk.getBlock());
  619.             globalBlockList.put(blk.getBlock(), block);
  620.           } else {
  621.             block.clearLocations();
  622.           }
  623.         
  624.           synchronized (block) {
  625.             // update locations
  626.             for ( String location : blk.getDatanodes() ) {
  627.               BalancerDatanode datanode = datanodes.get(location);
  628.               if (datanode != null) { // not an unknown datanode
  629.                 block.addLocation(datanode);
  630.               }
  631.             }
  632.           }
  633.           if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) {
  634.             // filter bad candidates
  635.             srcBlockList.add(block);
  636.           }
  637.         }
  638.       }
  639.       return bytesReceived;
  640.     }
  641.     /* Decide if the given block is a good candidate to move or not */
  642.     private boolean isGoodBlockCandidate(BalancerBlock block) {
  643.       for (NodeTask nodeTask : nodeTasks) {
  644.         if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) {
  645.           return true;
  646.         }
  647.       }
  648.       return false;
  649.     }
  650.     /* Return a block that's good for the source thread to dispatch immediately
  651.      * The block's source, target, and proxy source are determined too.
  652.      * When choosing proxy and target, source & target throttling
  653.      * has been considered. They are chosen only when they have the capacity
  654.      * to support this block move.
  655.      * The block should be dispatched immediately after this method is returned.
  656.      */
  657.     private PendingBlockMove chooseNextBlockToMove() {
  658.       for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) {
  659.         NodeTask task = tasks.next();
  660.         BalancerDatanode target = task.getDatanode();
  661.         PendingBlockMove pendingBlock = new PendingBlockMove();
  662.         if ( target.addPendingBlock(pendingBlock) ) { 
  663.           // target is not busy, so do a tentative block allocation
  664.           pendingBlock.source = this;
  665.           pendingBlock.target = target;
  666.           if ( pendingBlock.chooseBlockAndProxy() ) {
  667.             long blockSize = pendingBlock.block.getNumBytes(); 
  668.             scheduledSize -= blockSize;
  669.             task.size -= blockSize;
  670.             if (task.size == 0) {
  671.               tasks.remove();
  672.             }
  673.             return pendingBlock;
  674.           } else {
  675.             // cancel the tentative move
  676.             target.removePendingBlock(pendingBlock);
  677.           }
  678.         }
  679.       }
  680.       return null;
  681.     }
  682.     /* iterate all source's blocks to remove moved ones */    
  683.     private void filterMovedBlocks() {
  684.       for (Iterator<BalancerBlock> blocks=getBlockIterator();
  685.             blocks.hasNext();) {
  686.         if (movedBlocks.contains(blocks.next())) {
  687.           blocks.remove();
  688.         }
  689.       }
  690.     }
  691.     
  692.     private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5;
  693.     /* Return if should fetch more blocks from namenode */
  694.     private boolean shouldFetchMoreBlocks() {
  695.       return srcBlockList.size()<SOURCE_BLOCK_LIST_MIN_SIZE &&
  696.                  blocksToReceive>0;
  697.     }
  698.     
  699.     /* This method iteratively does the following:
  700.      * it first selects a block to move,
  701.      * then sends a request to the proxy source to start the block move
  702.      * when the source's block list falls below a threshold, it asks
  703.      * the namenode for more blocks.
  704.      * It terminates when it has dispatch enough block move tasks or
  705.      * it has received enough blocks from the namenode, or 
  706.      * the elapsed time of the iteration has exceeded the max time limit.
  707.      */ 
  708.     private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
  709.     private void dispatchBlocks() {
  710.       long startTime = Util.now();
  711.       this.blocksToReceive = 2*scheduledSize;
  712.       boolean isTimeUp = false;
  713.       while(!isTimeUp && scheduledSize>0 &&
  714.           (!srcBlockList.isEmpty() || blocksToReceive>0)) {
  715.         PendingBlockMove pendingBlock = chooseNextBlockToMove();
  716.         if (pendingBlock != null) {
  717.           // move the block
  718.           pendingBlock.scheduleBlockMove();
  719.           continue;
  720.         }
  721.         
  722.         /* Since we can not schedule any block to move,
  723.          * filter any moved blocks from the source block list and
  724.          * check if we should fetch more blocks from the namenode
  725.          */
  726.         filterMovedBlocks(); // filter already moved blocks
  727.         if (shouldFetchMoreBlocks()) {
  728.           // fetch new blocks
  729.           try {
  730.             blocksToReceive -= getBlockList();
  731.             continue;
  732.           } catch (IOException e) {
  733.             LOG.warn(StringUtils.stringifyException(e));
  734.             return;
  735.           }
  736.         } 
  737.         
  738.         // check if time is up or not
  739.         if (Util.now()-startTime > MAX_ITERATION_TIME) {
  740.           isTimeUp = true;
  741.           continue;
  742.         }
  743.         
  744.         /* Now we can not schedule any block to move and there are
  745.          * no new blocks added to the source block list, so we wait. 
  746.          */
  747.         try {
  748.           synchronized(Balancer.this) {
  749.             Balancer.this.wait(1000);  // wait for targets/sources to be idle
  750.           }
  751.         } catch (InterruptedException ignored) {
  752.         }
  753.       }
  754.     }
  755.   }
  756.   
  757.   /** Default constructor */
  758.   Balancer() {
  759.   }
  760.   
  761.   /** Construct a balancer from the given configuration */
  762.   Balancer(Configuration conf) {
  763.     setConf(conf);
  764.   } 
  765.   /** Construct a balancer from the given configuration and threshold */
  766.   Balancer(Configuration conf, double threshold) {
  767.     setConf(conf);
  768.     this.threshold = threshold;
  769.   }
  770.   /**
  771.    * Run a balancer
  772.    * @param args
  773.    */
  774.   public static void main(String[] args) {
  775.     try {
  776.       System.exit( ToolRunner.run(null, new Balancer(), args) );
  777.     } catch (Throwable e) {
  778.       LOG.error(StringUtils.stringifyException(e));
  779.       System.exit(-1);
  780.     }
  781.   }
  782.   private static void printUsage() {
  783.     System.out.println("Usage: java Balancer");
  784.     System.out.println("          [-threshold <threshold>]t" 
  785.         +"percentage of disk capacity");
  786.   }
  787.   /* parse argument to get the threshold */
  788.   private double parseArgs(String[] args) {
  789.     double threshold=0;
  790.     int argsLen = (args == null) ? 0 : args.length;
  791.     if (argsLen==0) {
  792.       threshold = 10;
  793.     } else {
  794.       if (argsLen != 2 || !"-threshold".equalsIgnoreCase(args[0])) {
  795.         printUsage();
  796.         throw new IllegalArgumentException(Arrays.toString(args));
  797.       } else {
  798.         try {
  799.           threshold = Double.parseDouble(args[1]);
  800.           if (threshold < 0 || threshold >100) {
  801.             throw new NumberFormatException();
  802.           }
  803.           LOG.info( "Using a threshold of " + threshold );
  804.         } catch(NumberFormatException e) {
  805.           System.err.println(
  806.               "Expect a double parameter in the range of [0, 100]: "+ args[1]);
  807.           printUsage();
  808.           throw e;
  809.         }
  810.       }
  811.     }
  812.     return threshold;
  813.   }
  814.   
  815.   /* Initialize balancer. It sets the value of the threshold, and 
  816.    * builds the communication proxies to
  817.    * namenode as a client and a secondary namenode and retry proxies
  818.    * when connection fails.
  819.    */
  820.   private void init(double threshold) throws IOException {
  821.     this.threshold = threshold;
  822.     this.namenode = createNamenode(conf);
  823.     this.client = DFSClient.createNamenode(conf);
  824.     this.fs = FileSystem.get(conf);
  825.   }
  826.   
  827.   /* Build a NamenodeProtocol connection to the namenode and
  828.    * set up the retry policy */ 
  829.   private static NamenodeProtocol createNamenode(Configuration conf)
  830.     throws IOException {
  831.     InetSocketAddress nameNodeAddr = NameNode.getAddress(conf);
  832.     RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
  833.         5, 200, TimeUnit.MILLISECONDS);
  834.     Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
  835.       new HashMap<Class<? extends Exception>, RetryPolicy>();
  836.     RetryPolicy methodPolicy = RetryPolicies.retryByException(
  837.         timeoutPolicy, exceptionToPolicyMap);
  838.     Map<String,RetryPolicy> methodNameToPolicyMap =
  839.         new HashMap<String, RetryPolicy>();
  840.     methodNameToPolicyMap.put("getBlocks", methodPolicy);
  841.     UserGroupInformation ugi;
  842.     try {
  843.       ugi = UnixUserGroupInformation.login(conf);
  844.     } catch (javax.security.auth.login.LoginException e) {
  845.       throw new IOException(StringUtils.stringifyException(e));
  846.     }
  847.     return (NamenodeProtocol) RetryProxy.create(
  848.         NamenodeProtocol.class,
  849.         RPC.getProxy(NamenodeProtocol.class,
  850.             NamenodeProtocol.versionID,
  851.             nameNodeAddr,
  852.             ugi,
  853.             conf,
  854.             NetUtils.getDefaultSocketFactory(conf)),
  855.         methodNameToPolicyMap);
  856.   }
  857.   
  858.   /* Shuffle datanode array */
  859.   static private void shuffleArray(DatanodeInfo[] datanodes) {
  860.     for (int i=datanodes.length; i>1; i--) {
  861.       int randomIndex = rnd.nextInt(i);
  862.       DatanodeInfo tmp = datanodes[randomIndex];
  863.       datanodes[randomIndex] = datanodes[i-1];
  864.       datanodes[i-1] = tmp;
  865.     }
  866.   }
  867.   
  868.   /* get all live datanodes of a cluster and their disk usage
  869.    * decide the number of bytes need to be moved
  870.    */
  871.   private long initNodes() throws IOException {
  872.     return initNodes(client.getDatanodeReport(DatanodeReportType.LIVE));
  873.   }
  874.   
  875.   /* Given a data node set, build a network topology and decide
  876.    * over-utilized datanodes, above average utilized datanodes, 
  877.    * below average utilized datanodes, and underutilized datanodes. 
  878.    * The input data node set is shuffled before the datanodes 
  879.    * are put into the over-utilized datanodes, above average utilized
  880.    * datanodes, below average utilized datanodes, and
  881.    * underutilized datanodes lists. This will add some randomness
  882.    * to the node matching later on.
  883.    * 
  884.    * @return the total number of bytes that are 
  885.    *                needed to move to make the cluster balanced.
  886.    * @param datanodes a set of datanodes
  887.    */
  888.   private long initNodes(DatanodeInfo[] datanodes) {
  889.     // compute average utilization
  890.     long totalCapacity=0L, totalUsedSpace=0L;
  891.     for (DatanodeInfo datanode : datanodes) {
  892.       if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
  893.         continue; // ignore decommissioning or decommissioned nodes
  894.       }
  895.       totalCapacity += datanode.getCapacity();
  896.       totalUsedSpace += datanode.getDfsUsed();
  897.     }
  898.     this.avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
  899.     /*create network topology and all data node lists: 
  900.      * overloaded, above-average, below-average, and underloaded
  901.      * we alternates the accessing of the given datanodes array either by
  902.      * an increasing order or a decreasing order.
  903.      */  
  904.     long overLoadedBytes = 0L, underLoadedBytes = 0L;
  905.     shuffleArray(datanodes);
  906.     for (DatanodeInfo datanode : datanodes) {
  907.       if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
  908.         continue; // ignore decommissioning or decommissioned nodes
  909.       }
  910.       cluster.add(datanode);
  911.       BalancerDatanode datanodeS;
  912.       if (getUtilization(datanode) > avgUtilization) {
  913.         datanodeS = new Source(datanode, avgUtilization, threshold);
  914.         if (isAboveAvgUtilized(datanodeS)) {
  915.           this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
  916.         } else {
  917.           assert(isOverUtilized(datanodeS)) :
  918.             datanodeS.getName()+ "is not an overUtilized node";
  919.           this.overUtilizedDatanodes.add((Source)datanodeS);
  920.           overLoadedBytes += (long)((datanodeS.utilization-avgUtilization
  921.               -threshold)*datanodeS.datanode.getCapacity()/100.0);
  922.         }
  923.       } else {
  924.         datanodeS = new BalancerDatanode(datanode, avgUtilization, threshold);
  925.         if ( isBelowAvgUtilized(datanodeS)) {
  926.           this.belowAvgUtilizedDatanodes.add(datanodeS);
  927.         } else {
  928.           assert (isUnderUtilized(datanodeS)) :
  929.             datanodeS.getName()+ "is not an underUtilized node"; 
  930.           this.underUtilizedDatanodes.add(datanodeS);
  931.           underLoadedBytes += (long)((avgUtilization-threshold-
  932.               datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
  933.         }
  934.       }
  935.       this.datanodes.put(datanode.getStorageID(), datanodeS);
  936.     }
  937.     //logging
  938.     logImbalancedNodes();
  939.     
  940.     assert (this.datanodes.size() == 
  941.       overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
  942.       aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
  943.       : "Mismatched number of datanodes";
  944.     
  945.     // return number of bytes to be moved in order to make the cluster balanced
  946.     return Math.max(overLoadedBytes, underLoadedBytes);
  947.   }
  948.   /* log the over utilized & under utilized nodes */
  949.   private void logImbalancedNodes() {
  950.     StringBuilder msg = new StringBuilder();
  951.     msg.append(overUtilizedDatanodes.size());
  952.     msg.append(" over utilized nodes:");
  953.     for (Source node : overUtilizedDatanodes) {
  954.       msg.append( " " );
  955.       msg.append( node.getName() );
  956.     }
  957.     LOG.info(msg);
  958.     msg = new StringBuilder();
  959.     msg.append(underUtilizedDatanodes.size());
  960.     msg.append(" under utilized nodes: ");
  961.     for (BalancerDatanode node : underUtilizedDatanodes) {
  962.       msg.append( " " );
  963.       msg.append( node.getName() );
  964.     }
  965.     LOG.info(msg);
  966.   }
  967.   
  968.   /* Decide all <source, target> pairs and
  969.    * the number of bytes to move from a source to a target
  970.    * Maximum bytes to be moved per node is
  971.    * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
  972.    * Return total number of bytes to move in this iteration
  973.    */
  974.   private long chooseNodes() {
  975.     // Match nodes on the same rack first
  976.     chooseNodes(true);
  977.     // Then match nodes on different racks
  978.     chooseNodes(false);
  979.     
  980.     assert (datanodes.size() == 
  981.       overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
  982.       aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()+
  983.       sources.size()+targets.size())
  984.       : "Mismatched number of datanodes";
  985.     long bytesToMove = 0L;
  986.     for (Source src : sources) {
  987.       bytesToMove += src.scheduledSize;
  988.     }
  989.     return bytesToMove;
  990.   }
  991.   /* if onRack is true, decide all <source, target> pairs
  992.    * where source and target are on the same rack; Otherwise
  993.    * decide all <source, target> pairs where source and target are
  994.    * on different racks
  995.    */
  996.   private void chooseNodes(boolean onRack) {
  997.     /* first step: match each overUtilized datanode (source) to
  998.      * one or more underUtilized datanodes (targets).
  999.      */
  1000.     chooseTargets(underUtilizedDatanodes.iterator(), onRack);
  1001.     
  1002.     /* match each remaining overutilized datanode (source) to 
  1003.      * below average utilized datanodes (targets).
  1004.      * Note only overutilized datanodes that haven't had that max bytes to move
  1005.      * satisfied in step 1 are selected
  1006.      */
  1007.     chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
  1008.     /* match each remaining underutilized datanode to 
  1009.      * above average utilized datanodes.
  1010.      * Note only underutilized datanodes that have not had that max bytes to
  1011.      * move satisfied in step 1 are selected.
  1012.      */
  1013.     chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
  1014.   }
  1015.    
  1016.   /* choose targets from the target candidate list for each over utilized
  1017.    * source datanode. OnRackTarget determines if the chosen target 
  1018.    * should be on the same rack as the source
  1019.    */
  1020.   private void chooseTargets(  
  1021.       Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
  1022.     for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
  1023.         srcIterator.hasNext();) {
  1024.       Source source = srcIterator.next();
  1025.       while (chooseTarget(source, targetCandidates, onRackTarget)) {
  1026.       }
  1027.       if (!source.isMoveQuotaFull()) {
  1028.         srcIterator.remove();
  1029.       }
  1030.     }
  1031.     return;
  1032.   }
  1033.   
  1034.   /* choose sources from the source candidate list for each under utilized
  1035.    * target datanode. onRackSource determines if the chosen source 
  1036.    * should be on the same rack as the target
  1037.    */
  1038.   private void chooseSources(
  1039.       Iterator<Source> sourceCandidates, boolean onRackSource) {
  1040.     for (Iterator<BalancerDatanode> targetIterator = 
  1041.       underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
  1042.       BalancerDatanode target = targetIterator.next();
  1043.       while (chooseSource(target, sourceCandidates, onRackSource)) {
  1044.       }
  1045.       if (!target.isMoveQuotaFull()) {
  1046.         targetIterator.remove();
  1047.       }
  1048.     }
  1049.     return;
  1050.   }
  1051.   /* For the given source, choose targets from the target candidate list.
  1052.    * OnRackTarget determines if the chosen target 
  1053.    * should be on the same rack as the source
  1054.    */
  1055.   private boolean chooseTarget(Source source,
  1056.       Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget) {
  1057.     if (!source.isMoveQuotaFull()) {
  1058.       return false;
  1059.     }
  1060.     boolean foundTarget = false;
  1061.     BalancerDatanode target = null;
  1062.     while (!foundTarget && targetCandidates.hasNext()) {
  1063.       target = targetCandidates.next();
  1064.       if (!target.isMoveQuotaFull()) {
  1065.         targetCandidates.remove();
  1066.         continue;
  1067.       }
  1068.       if (onRackTarget) {
  1069.         // choose from on-rack nodes
  1070.         if (cluster.isOnSameRack(source.datanode, target.datanode)) {
  1071.           foundTarget = true;
  1072.         }
  1073.       } else {
  1074.         // choose from off-rack nodes
  1075.         if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
  1076.           foundTarget = true;
  1077.         }
  1078.       }
  1079.     }
  1080.     if (foundTarget) {
  1081.       assert(target != null):"Choose a null target";
  1082.       long size = Math.min(source.availableSizeToMove(),
  1083.           target.availableSizeToMove());
  1084.       NodeTask nodeTask = new NodeTask(target, size);
  1085.       source.addNodeTask(nodeTask);
  1086.       target.incScheduledSize(nodeTask.getSize());
  1087.       sources.add(source);
  1088.       targets.add(target);
  1089.       if (!target.isMoveQuotaFull()) {
  1090.         targetCandidates.remove();
  1091.       }
  1092.       LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
  1093.           +source.datanode.getName() + " to " + target.datanode.getName());
  1094.       return true;
  1095.     }
  1096.     return false;
  1097.   }
  1098.   
  1099.   /* For the given target, choose sources from the source candidate list.
  1100.    * OnRackSource determines if the chosen source 
  1101.    * should be on the same rack as the target
  1102.    */
  1103.   private boolean chooseSource(BalancerDatanode target,
  1104.       Iterator<Source> sourceCandidates, boolean onRackSource) {
  1105.     if (!target.isMoveQuotaFull()) {
  1106.       return false;
  1107.     }
  1108.     boolean foundSource = false;
  1109.     Source source = null;
  1110.     while (!foundSource && sourceCandidates.hasNext()) {
  1111.       source = sourceCandidates.next();
  1112.       if (!source.isMoveQuotaFull()) {
  1113.         sourceCandidates.remove();
  1114.         continue;
  1115.       }
  1116.       if (onRackSource) {
  1117.         // choose from on-rack nodes
  1118.         if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
  1119.           foundSource = true;
  1120.         }
  1121.       } else {
  1122.         // choose from off-rack nodes
  1123.         if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
  1124.           foundSource = true;
  1125.         }
  1126.       }
  1127.     }
  1128.     if (foundSource) {
  1129.       assert(source != null):"Choose a null source";
  1130.       long size = Math.min(source.availableSizeToMove(),
  1131.           target.availableSizeToMove());
  1132.       NodeTask nodeTask = new NodeTask(target, size);
  1133.       source.addNodeTask(nodeTask);
  1134.       target.incScheduledSize(nodeTask.getSize());
  1135.       sources.add(source);
  1136.       targets.add(target);
  1137.       if ( !source.isMoveQuotaFull()) {
  1138.         sourceCandidates.remove();
  1139.       }
  1140.       LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
  1141.           +source.datanode.getName() + " to " + target.datanode.getName());
  1142.       return true;
  1143.     }
  1144.     return false;
  1145.   }
  1146.   private static class BytesMoved {
  1147.     private long bytesMoved = 0L;;
  1148.     private synchronized void inc( long bytes ) {
  1149.       bytesMoved += bytes;
  1150.     }
  1151.     private long get() {
  1152.       return bytesMoved;
  1153.     }
  1154.   };
  1155.   private BytesMoved bytesMoved = new BytesMoved();
  1156.   private int notChangedIterations = 0;
  1157.   
  1158.   /* Start a thread to dispatch block moves for each source. 
  1159.    * The thread selects blocks to move & sends request to proxy source to
  1160.    * initiate block move. The process is flow controlled. Block selection is
  1161.    * blocked if there are too many un-confirmed block moves.
  1162.    * Return the total number of bytes successfully moved in this iteration.
  1163.    */
  1164.   private long dispatchBlockMoves() throws InterruptedException {
  1165.     long bytesLastMoved = bytesMoved.get();
  1166.     Future<?>[] futures = new Future<?>[sources.size()];
  1167.     int i=0;
  1168.     for (Source source : sources) {
  1169.       futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher());
  1170.     }
  1171.     
  1172.     // wait for all dispatcher threads to finish
  1173.     for (Future<?> future : futures) {
  1174.       try {
  1175.         future.get();
  1176.       } catch (ExecutionException e) {
  1177.         LOG.warn("Dispatcher thread failed", e.getCause());
  1178.       }
  1179.     }
  1180.     
  1181.     // wait for all block moving to be done
  1182.     waitForMoveCompletion();
  1183.     
  1184.     return bytesMoved.get()-bytesLastMoved;
  1185.   }
  1186.   
  1187.   // The sleeping period before checking if block move is completed again
  1188.   static private long blockMoveWaitTime = 30000L;
  1189.   
  1190.   /** set the sleeping period for block move completion check */
  1191.   static void setBlockMoveWaitTime(long time) {
  1192.     blockMoveWaitTime = time;
  1193.   }
  1194.   
  1195.   /* wait for all block move confirmations 
  1196.    * by checking each target's pendingMove queue 
  1197.    */
  1198.   private void waitForMoveCompletion() {
  1199.     boolean shouldWait;
  1200.     do {
  1201.       shouldWait = false;
  1202.       for (BalancerDatanode target : targets) {
  1203.         if (!target.isPendingQEmpty()) {
  1204.           shouldWait = true;
  1205.         }
  1206.       }
  1207.       if (shouldWait) {
  1208.         try {
  1209.           Thread.sleep(blockMoveWaitTime);
  1210.         } catch (InterruptedException ignored) {
  1211.         }
  1212.       }
  1213.     } while (shouldWait);
  1214.   }
  1215.   /** This window makes sure to keep blocks that have been moved within 1.5 hour.
  1216.    * Old window has blocks that are older;
  1217.    * Current window has blocks that are more recent;
  1218.    * Cleanup method triggers the check if blocks in the old window are
  1219.    * more than 1.5 hour old. If yes, purge the old window and then
  1220.    * move blocks in current window to old window.
  1221.    */ 
  1222.   private static class MovedBlocks {
  1223.     private long lastCleanupTime = System.currentTimeMillis();
  1224.     private static long winWidth = 5400*1000L; // 1.5 hour
  1225.     final private static int CUR_WIN = 0;
  1226.     final private static int OLD_WIN = 1;
  1227.     final private static int NUM_WINS = 2;
  1228.     final private List<HashMap<Block, BalancerBlock>> movedBlocks = 
  1229.       new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS);
  1230.     
  1231.     /* initialize the moved blocks collection */
  1232.     private MovedBlocks() {
  1233.       movedBlocks.add(new HashMap<Block,BalancerBlock>());
  1234.       movedBlocks.add(new HashMap<Block,BalancerBlock>());
  1235.     }
  1236.     /* set the win width */
  1237.     private void setWinWidth(Configuration conf) {
  1238.       winWidth = conf.getLong(
  1239.           "dfs.balancer.movedWinWidth", 5400*1000L);
  1240.     }
  1241.     
  1242.     /* add a block thus marking a block to be moved */
  1243.     synchronized private void add(BalancerBlock block) {
  1244.       movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
  1245.     }
  1246.     /* check if a block is marked as moved */
  1247.     synchronized private boolean contains(BalancerBlock block) {
  1248.       return contains(block.getBlock());
  1249.     }
  1250.     /* check if a block is marked as moved */
  1251.     synchronized private boolean contains(Block block) {
  1252.       return movedBlocks.get(CUR_WIN).containsKey(block) ||
  1253.         movedBlocks.get(OLD_WIN).containsKey(block);
  1254.     }
  1255.     /* remove old blocks */
  1256.     synchronized private void cleanup() {
  1257.       long curTime = System.currentTimeMillis();
  1258.       // check if old win is older than winWidth
  1259.       if (lastCleanupTime + winWidth <= curTime) {
  1260.         // purge the old window
  1261.         movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
  1262.         movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
  1263.         lastCleanupTime = curTime;
  1264.       }
  1265.     }
  1266.   }
  1267.   /* Decide if it is OK to move the given block from source to target
  1268.    * A block is a good candidate if
  1269.    * 1. the block is not in the process of being moved/has not been moved;
  1270.    * 2. the block does not have a replica on the target;
  1271.    * 3. doing the move does not reduce the number of racks that the block has
  1272.    */
  1273.   private boolean isGoodBlockCandidate(Source source, 
  1274.       BalancerDatanode target, BalancerBlock block) {
  1275.     // check if the block is moved or not
  1276.     if (movedBlocks.contains(block)) {
  1277.         return false;
  1278.     }
  1279.     if (block.isLocatedOnDatanode(target)) {
  1280.       return false;
  1281.     }
  1282.     boolean goodBlock = false;
  1283.     if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
  1284.       // good if source and target are on the same rack
  1285.       goodBlock = true;
  1286.     } else {
  1287.       boolean notOnSameRack = true;
  1288.       synchronized (block) {
  1289.         for (BalancerDatanode loc : block.locations) {
  1290.           if (cluster.isOnSameRack(loc.datanode, target.datanode)) {
  1291.             notOnSameRack = false;
  1292.             break;
  1293.           }
  1294.         }
  1295.       }
  1296.       if (notOnSameRack) {
  1297.         // good if target is target is not on the same rack as any replica
  1298.         goodBlock = true;
  1299.       } else {
  1300.         // good if source is on the same rack as on of the replicas
  1301.         for (BalancerDatanode loc : block.locations) {
  1302.           if (loc != source && 
  1303.               cluster.isOnSameRack(loc.datanode, source.datanode)) {
  1304.             goodBlock = true;
  1305.             break;
  1306.           }
  1307.         }
  1308.       }
  1309.     }
  1310.     return goodBlock;
  1311.   }
  1312.   
  1313.   /* reset all fields in a balancer preparing for the next iteration */
  1314.   private void resetData() {
  1315.     this.cluster = new NetworkTopology();
  1316.     this.overUtilizedDatanodes.clear();
  1317.     this.aboveAvgUtilizedDatanodes.clear();
  1318.     this.belowAvgUtilizedDatanodes.clear();
  1319.     this.underUtilizedDatanodes.clear();
  1320.     this.datanodes.clear();
  1321.     this.sources.clear();
  1322.     this.targets.clear();  
  1323.     this.avgUtilization = 0.0D;
  1324.     cleanGlobalBlockList();
  1325.     this.movedBlocks.cleanup();
  1326.   }
  1327.   
  1328.   /* Remove all blocks from the global block list except for the ones in the
  1329.    * moved list.
  1330.    */
  1331.   private void cleanGlobalBlockList() {
  1332.     for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator();
  1333.     globalBlockListIterator.hasNext();) {
  1334.       Block block = globalBlockListIterator.next();
  1335.       if(!movedBlocks.contains(block)) {
  1336.         globalBlockListIterator.remove();
  1337.       }
  1338.     }
  1339.   }
  1340.   
  1341.   /* Return true if the given datanode is overUtilized */
  1342.   private boolean isOverUtilized(BalancerDatanode datanode) {
  1343.     return datanode.utilization > (avgUtilization+threshold);
  1344.   }
  1345.   
  1346.   /* Return true if the given datanode is above average utilized
  1347.    * but not overUtilized */
  1348.   private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
  1349.     return (datanode.utilization <= (avgUtilization+threshold))
  1350.         && (datanode.utilization > avgUtilization);
  1351.   }
  1352.   
  1353.   /* Return true if the given datanode is underUtilized */
  1354.   private boolean isUnderUtilized(BalancerDatanode datanode) {
  1355.     return datanode.utilization < (avgUtilization-threshold);
  1356.   }
  1357.   /* Return true if the given datanode is below average utilized 
  1358.    * but not underUtilized */
  1359.   private boolean isBelowAvgUtilized(BalancerDatanode datanode) {
  1360.         return (datanode.utilization >= (avgUtilization-threshold))
  1361.                  && (datanode.utilization < avgUtilization);
  1362.   }
  1363.   // Exit status
  1364.   final public static int SUCCESS = 1;
  1365.   final public static int ALREADY_RUNNING = -1;
  1366.   final public static int NO_MOVE_BLOCK = -2;
  1367.   final public static int NO_MOVE_PROGRESS = -3;
  1368.   final public static int IO_EXCEPTION = -4;
  1369.   final public static int ILLEGAL_ARGS = -5;
  1370.   /** main method of Balancer
  1371.    * @param args arguments to a Balancer
  1372.    * @exception any exception occurs during datanode balancing
  1373.    */
  1374.   public int run(String[] args) throws Exception {
  1375.     long startTime = Util.now();
  1376.     OutputStream out = null;
  1377.     try {
  1378.       // initialize a balancer
  1379.       init(parseArgs(args));
  1380.       
  1381.       /* Check if there is another balancer running.
  1382.        * Exit if there is another one running.
  1383.        */
  1384.       out = checkAndMarkRunningBalancer(); 
  1385.       if (out == null) {
  1386.         System.out.println("Another balancer is running. Exiting...");
  1387.         return ALREADY_RUNNING;
  1388.       }
  1389.       Formatter formatter = new Formatter(System.out);
  1390.       System.out.println("Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved");
  1391.       int iterations = 0;
  1392.       while (true ) {
  1393.         /* get all live datanodes of a cluster and their disk usage
  1394.          * decide the number of bytes need to be moved
  1395.          */
  1396.         long bytesLeftToMove = initNodes();
  1397.         if (bytesLeftToMove == 0) {
  1398.           System.out.println("The cluster is balanced. Exiting...");
  1399.           return SUCCESS;
  1400.         } else {
  1401.           LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
  1402.               +" bytes to make the cluster balanced." );
  1403.         }
  1404.         
  1405.         /* Decide all the nodes that will participate in the block move and
  1406.          * the number of bytes that need to be moved from one node to another
  1407.          * in this iteration. Maximum bytes to be moved per node is
  1408.          * Min(1 Band worth of bytes,  MAX_SIZE_TO_MOVE).
  1409.          */
  1410.         long bytesToMove = chooseNodes();
  1411.         if (bytesToMove == 0) {
  1412.           System.out.println("No block can be moved. Exiting...");
  1413.           return NO_MOVE_BLOCK;
  1414.         } else {
  1415.           LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
  1416.               "bytes in this iteration");
  1417.         }
  1418.    
  1419.         formatter.format("%-24s %10d  %19s  %18s  %17sn", 
  1420.             DateFormat.getDateTimeInstance().format(new Date()),
  1421.             iterations,
  1422.             StringUtils.byteDesc(bytesMoved.get()),
  1423.             StringUtils.byteDesc(bytesLeftToMove),
  1424.             StringUtils.byteDesc(bytesToMove)
  1425.             );
  1426.         
  1427.         /* For each pair of <source, target>, start a thread that repeatedly 
  1428.          * decide a block to be moved and its proxy source, 
  1429.          * then initiates the move until all bytes are moved or no more block
  1430.          * available to move.
  1431.          * Exit no byte has been moved for 5 consecutive iterations.
  1432.          */
  1433.         if (dispatchBlockMoves() > 0) {
  1434.           notChangedIterations = 0;
  1435.         } else {
  1436.           notChangedIterations++;
  1437.           if (notChangedIterations >= 5) {
  1438.             System.out.println(
  1439.                 "No block has been moved for 5 iterations. Exiting...");
  1440.             return NO_MOVE_PROGRESS;
  1441.           }
  1442.         }
  1443.         // clean all lists
  1444.         resetData();
  1445.         
  1446.         try {
  1447.           Thread.sleep(2*conf.getLong("dfs.heartbeat.interval", 3));
  1448.         } catch (InterruptedException ignored) {
  1449.         }
  1450.         
  1451.         iterations++;
  1452.       }
  1453.     } catch (IllegalArgumentException ae) {
  1454.       return ILLEGAL_ARGS;
  1455.     } catch (IOException e) {
  1456.       System.out.println("Received an IO exception: " + e.getMessage() +
  1457.           " . Exiting...");
  1458.       return IO_EXCEPTION;
  1459.     } finally {
  1460.       // shutdown thread pools
  1461.       dispatcherExecutor.shutdownNow();
  1462.       moverExecutor.shutdownNow();
  1463.       // close the output file
  1464.       IOUtils.closeStream(out); 
  1465.       if (fs != null) {
  1466.         try {
  1467.           fs.delete(BALANCER_ID_PATH, true);
  1468.         } catch(IOException ignored) {
  1469.         }
  1470.       }
  1471.       System.out.println("Balancing took " + 
  1472.           time2Str(Util.now()-startTime));
  1473.     }
  1474.   }
  1475.   private Path BALANCER_ID_PATH = new Path("/system/balancer.id");
  1476.   /* The idea for making sure that there is no more than one balancer
  1477.    * running in an HDFS is to create a file in the HDFS, writes the IP address
  1478.    * of the machine on which the balancer is running to the file, but did not
  1479.    * close the file until the balancer exits. 
  1480.    * This prevents the second balancer from running because it can not
  1481.    * creates the file while the first one is running.
  1482.    * 
  1483.    * This method checks if there is any running balancer and 
  1484.    * if no, mark yes if no.
  1485.    * Note that this is an atomic operation.
  1486.    * 
  1487.    * Return null if there is a running balancer; otherwise the output stream
  1488.    * to the newly created file.
  1489.    */
  1490.   private OutputStream checkAndMarkRunningBalancer() throws IOException {
  1491.     try {
  1492.       DataOutputStream out = fs.create(BALANCER_ID_PATH);
  1493.       out. writeBytes(InetAddress.getLocalHost().getHostName());
  1494.       out.flush();
  1495.       return out;
  1496.     } catch(RemoteException e) {
  1497.       if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
  1498.         return null;
  1499.       } else {
  1500.         throw e;
  1501.       }
  1502.     }
  1503.   }
  1504.   
  1505.   /* Given elaspedTime in ms, return a printable string */
  1506.   private static String time2Str(long elapsedTime) {
  1507.     String unit;
  1508.     double time = elapsedTime;
  1509.     if (elapsedTime < 1000) {
  1510.       unit = "milliseconds";
  1511.     } else if (elapsedTime < 60*1000) {
  1512.       unit = "seconds";
  1513.       time = time/1000;
  1514.     } else if (elapsedTime < 3600*1000) {
  1515.       unit = "minutes";
  1516.       time = time/(60*1000);
  1517.     } else {
  1518.       unit = "hours";
  1519.       time = time/(3600*1000);
  1520.     }
  1521.     return time+" "+unit;
  1522.   }
  1523.   /** return this balancer's configuration */
  1524.   public Configuration getConf() {
  1525.     return conf;
  1526.   }
  1527.   /** set this balancer's configuration */
  1528.   public void setConf(Configuration conf) {
  1529.     this.conf = conf;
  1530.     movedBlocks.setWinWidth(conf);
  1531.   }
  1532. }