TestBalancer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.balancer;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.List;
  22. import java.util.Random;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.hdfs.DFSClient;
  25. import org.apache.hadoop.hdfs.DFSTestUtil;
  26. import org.apache.hadoop.hdfs.MiniDFSCluster;
  27. import org.apache.hadoop.hdfs.protocol.Block;
  28. import org.apache.hadoop.hdfs.protocol.ClientProtocol;
  29. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  30. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  31. import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
  32. import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
  33. import org.apache.hadoop.fs.FileSystem;
  34. import org.apache.hadoop.fs.Path;
  35. import junit.framework.TestCase;
  36. /**
  37.  * This class tests if a balancer schedules tasks correctly.
  38.  */
  39. public class TestBalancer extends TestCase {
  40.   private static final Configuration CONF = new Configuration();
  41.   final private static long CAPACITY = 500L;
  42.   final private static String RACK0 = "/rack0";
  43.   final private static String RACK1 = "/rack1";
  44.   final private static String RACK2 = "/rack2";
  45.   final static private String fileName = "/tmp.txt";
  46.   final static private Path filePath = new Path(fileName);
  47.   private MiniDFSCluster cluster;
  48.   ClientProtocol client;
  49.   static final int DEFAULT_BLOCK_SIZE = 10;
  50.   private Balancer balancer;
  51.   private Random r = new Random();
  52.   static {
  53.     CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
  54.     CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE);
  55.     CONF.setLong("dfs.heartbeat.interval", 1L);
  56.     CONF.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  57.     CONF.setLong("dfs.balancer.movedWinWidth", 2000L);
  58.     Balancer.setBlockMoveWaitTime(1000L) ;
  59.   }
  60.   /* create a file with a length of <code>fileLen</code> */
  61.   private void createFile(long fileLen, short replicationFactor)
  62.   throws IOException {
  63.     FileSystem fs = cluster.getFileSystem();
  64.     DFSTestUtil.createFile(fs, filePath, fileLen, 
  65.         replicationFactor, r.nextLong());
  66.     DFSTestUtil.waitReplication(fs, filePath, replicationFactor);
  67.   }
  68.   /* fill up a cluster with <code>numNodes</code> datanodes 
  69.    * whose used space to be <code>size</code>
  70.    */
  71.   private Block[] generateBlocks(long size, short numNodes) throws IOException {
  72.     cluster = new MiniDFSCluster( CONF, numNodes, true, null);
  73.     try {
  74.       cluster.waitActive();
  75.       client = DFSClient.createNamenode(CONF);
  76.       short replicationFactor = (short)(numNodes-1);
  77.       long fileLen = size/replicationFactor;
  78.       createFile(fileLen, replicationFactor);
  79.       List<LocatedBlock> locatedBlocks = client.
  80.       getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
  81.       int numOfBlocks = locatedBlocks.size();
  82.       Block[] blocks = new Block[numOfBlocks];
  83.       for(int i=0; i<numOfBlocks; i++) {
  84.         Block b = locatedBlocks.get(i).getBlock();
  85.         blocks[i] = new Block(b.getBlockId(), b.getNumBytes(), b.getGenerationStamp());
  86.       }
  87.       return blocks;
  88.     } finally {
  89.       cluster.shutdown();
  90.     }
  91.   }
  92.   /* Distribute all blocks according to the given distribution */
  93.   Block[][] distributeBlocks(Block[] blocks, short replicationFactor, 
  94.       final long[] distribution ) {
  95.     // make a copy
  96.     long[] usedSpace = new long[distribution.length];
  97.     System.arraycopy(distribution, 0, usedSpace, 0, distribution.length);
  98.     List<List<Block>> blockReports = 
  99.       new ArrayList<List<Block>>(usedSpace.length);
  100.     Block[][] results = new Block[usedSpace.length][];
  101.     for(int i=0; i<usedSpace.length; i++) {
  102.       blockReports.add(new ArrayList<Block>());
  103.     }
  104.     for(int i=0; i<blocks.length; i++) {
  105.       for(int j=0; j<replicationFactor; j++) {
  106.         boolean notChosen = true;
  107.         while(notChosen) {
  108.           int chosenIndex = r.nextInt(usedSpace.length);
  109.           if( usedSpace[chosenIndex]>0 ) {
  110.             notChosen = false;
  111.             blockReports.get(chosenIndex).add(blocks[i]);
  112.             usedSpace[chosenIndex] -= blocks[i].getNumBytes();
  113.           }
  114.         }
  115.       }
  116.     }
  117.     for(int i=0; i<usedSpace.length; i++) {
  118.       List<Block> nodeBlockList = blockReports.get(i);
  119.       results[i] = nodeBlockList.toArray(new Block[nodeBlockList.size()]);
  120.     }
  121.     return results;
  122.   }
  123.   /* we first start a cluster and fill the cluster up to a certain size.
  124.    * then redistribute blocks according the required distribution.
  125.    * Afterwards a balancer is running to balance the cluster.
  126.    */
  127.   private void testUnevenDistribution(
  128.       long distribution[], long capacities[], String[] racks) throws Exception {
  129.     int numDatanodes = distribution.length;
  130.     if (capacities.length != numDatanodes || racks.length != numDatanodes) {
  131.       throw new IllegalArgumentException("Array length is not the same");
  132.     }
  133.     // calculate total space that need to be filled
  134.     long totalUsedSpace=0L;
  135.     for(int i=0; i<distribution.length; i++) {
  136.       totalUsedSpace += distribution[i];
  137.     }
  138.     // fill the cluster
  139.     Block[] blocks = generateBlocks(totalUsedSpace, (short)numDatanodes);
  140.     // redistribute blocks
  141.     Block[][] blocksDN = distributeBlocks(
  142.         blocks, (short)(numDatanodes-1), distribution);
  143.     // restart the cluster: do NOT format the cluster
  144.     CONF.set("dfs.safemode.threshold.pct", "0.0f"); 
  145.     cluster = new MiniDFSCluster(0, CONF, numDatanodes,
  146.         false, true, null, racks, capacities);
  147.     cluster.waitActive();
  148.     client = DFSClient.createNamenode(CONF);
  149.     cluster.injectBlocks(blocksDN);
  150.     long totalCapacity = 0L;
  151.     for(long capacity:capacities) {
  152.       totalCapacity += capacity;
  153.     }
  154.     runBalancer(totalUsedSpace, totalCapacity);
  155.   }
  156.   /* wait for one heartbeat */
  157.   private void waitForHeartBeat( long expectedUsedSpace, long expectedTotalSpace )
  158.   throws IOException {
  159.     long[] status = client.getStats();
  160.     while(status[0] != expectedTotalSpace || status[1] != expectedUsedSpace ) {
  161.       try {
  162.         Thread.sleep(100L);
  163.       } catch(InterruptedException ignored) {
  164.       }
  165.       status = client.getStats();
  166.     }
  167.   }
  168.   /* This test start a one-node cluster, fill the node to be 30% full;
  169.    * It then adds an empty node and start balancing.
  170.    * @param newCapacity new node's capacity
  171.    * @param new 
  172.    */
  173.   private void test(long[] capacities, String[] racks, 
  174.       long newCapacity, String newRack) throws Exception {
  175.     int numOfDatanodes = capacities.length;
  176.     assertEquals(numOfDatanodes, racks.length);
  177.     cluster = new MiniDFSCluster(0, CONF, capacities.length, true, true, null, 
  178.         racks, capacities);
  179.     try {
  180.       cluster.waitActive();
  181.       client = DFSClient.createNamenode(CONF);
  182.       long totalCapacity=0L;
  183.       for(long capacity:capacities) {
  184.         totalCapacity += capacity;
  185.       }
  186.       // fill up the cluster to be 30% full
  187.       long totalUsedSpace = totalCapacity*3/10;
  188.       createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
  189.       // start up an empty node with the same capacity and on the same rack
  190.       cluster.startDataNodes(CONF, 1, true, null,
  191.           new String[]{newRack}, new long[]{newCapacity});
  192.       totalCapacity += newCapacity;
  193.       // run balancer and validate results
  194.       runBalancer(totalUsedSpace, totalCapacity);
  195.     } finally {
  196.       cluster.shutdown();
  197.     }
  198.   }
  199.   /* Start balancer and check if the cluster is balanced after the run */
  200.   private void runBalancer( long totalUsedSpace, long totalCapacity )
  201.   throws Exception {
  202.     waitForHeartBeat(totalUsedSpace, totalCapacity);
  203.     // start rebalancing
  204.     balancer = new Balancer(CONF);
  205.     balancer.run(new String[0]);
  206.     waitForHeartBeat(totalUsedSpace, totalCapacity);
  207.     boolean balanced;
  208.     do {
  209.       DatanodeInfo[] datanodeReport = 
  210.         client.getDatanodeReport(DatanodeReportType.ALL);
  211.       assertEquals(datanodeReport.length, cluster.getDataNodes().size());
  212.       balanced = true;
  213.       double avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
  214.       for(DatanodeInfo datanode:datanodeReport) {
  215.         if(Math.abs(avgUtilization-
  216.             ((double)datanode.getDfsUsed())/datanode.getCapacity()*100)>10) {
  217.           balanced = false;
  218.           try {
  219.             Thread.sleep(100);
  220.           } catch(InterruptedException ignored) {
  221.           }
  222.           break;
  223.         }
  224.       }
  225.     } while(!balanced);
  226.   }
  227.   /** Test a cluster with even distribution, 
  228.    * then a new empty node is added to the cluster*/
  229.   public void testBalancer0() throws Exception {
  230.     /** one-node cluster test*/
  231.     // add an empty node with half of the CAPACITY & the same rack
  232.     test(new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
  233.     /** two-node cluster test */
  234.     test(new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
  235.         CAPACITY, RACK2);
  236.   }
  237.   /** Test unevenly distributed cluster */
  238.   public void testBalancer1() throws Exception {
  239.     testUnevenDistribution(
  240.         new long[] {50*CAPACITY/100, 10*CAPACITY/100},
  241.         new long[]{CAPACITY, CAPACITY},
  242.         new String[] {RACK0, RACK1});
  243.   }
  244.   /**
  245.    * @param args
  246.    */
  247.   public static void main(String[] args) throws Exception {
  248.     TestBalancer balancerTest = new TestBalancer();
  249.     balancerTest.testBalancer0();
  250.     balancerTest.testBalancer1();
  251.   }
  252. }