TestBlockReplacement.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.DataInputStream;
  20. import java.io.DataOutputStream;
  21. import java.io.IOException;
  22. import java.net.InetSocketAddress;
  23. import java.net.Socket;
  24. import java.util.ArrayList;
  25. import java.util.Arrays;
  26. import java.util.List;
  27. import java.util.Random;
  28. import junit.framework.TestCase;
  29. import org.apache.commons.logging.Log;
  30. import org.apache.commons.logging.LogFactory;
  31. import org.apache.hadoop.conf.Configuration;
  32. import org.apache.hadoop.fs.FileSystem;
  33. import org.apache.hadoop.fs.Path;
  34. import org.apache.hadoop.hdfs.DFSClient;
  35. import org.apache.hadoop.hdfs.DFSTestUtil;
  36. import org.apache.hadoop.hdfs.MiniDFSCluster;
  37. import org.apache.hadoop.hdfs.protocol.Block;
  38. import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
  39. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  40. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  41. import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
  42. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  43. import org.apache.hadoop.hdfs.server.common.Util;
  44. import org.apache.hadoop.hdfs.server.datanode.BlockTransferThrottler;
  45. import org.apache.hadoop.io.Text;
  46. import org.apache.hadoop.net.NetUtils;
  47. /**
  48.  * This class tests if block replacement request to data nodes work correctly.
  49.  */
  50. public class TestBlockReplacement extends TestCase {
  51.   private static final Log LOG = LogFactory.getLog(
  52.   "org.apache.hadoop.hdfs.TestBlockReplacement");
  53.   MiniDFSCluster cluster;
  54.   public void testThrottler() throws IOException {
  55.     Configuration conf = new Configuration();
  56.     FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
  57.     long bandwidthPerSec = 1024*1024L;
  58.     final long TOTAL_BYTES =6*bandwidthPerSec; 
  59.     long bytesToSend = TOTAL_BYTES; 
  60.     long start = Util.now();
  61.     BlockTransferThrottler throttler = new BlockTransferThrottler(bandwidthPerSec);
  62.     long totalBytes = 0L;
  63.     long bytesSent = 1024*512L; // 0.5MB
  64.     throttler.throttle(bytesSent);
  65.     bytesToSend -= bytesSent;
  66.     bytesSent = 1024*768L; // 0.75MB
  67.     throttler.throttle(bytesSent);
  68.     bytesToSend -= bytesSent;
  69.     try {
  70.       Thread.sleep(1000);
  71.     } catch (InterruptedException ignored) {}
  72.     throttler.throttle(bytesToSend);
  73.     long end = Util.now();
  74.     assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec);
  75.   }
  76.   
  77.   public void testBlockReplacement() throws IOException {
  78.     final Configuration CONF = new Configuration();
  79.     final String[] INITIAL_RACKS = {"/RACK0", "/RACK1", "/RACK2"};
  80.     final String[] NEW_RACKS = {"/RACK2"};
  81.     final short REPLICATION_FACTOR = (short)3;
  82.     final int DEFAULT_BLOCK_SIZE = 1024;
  83.     final Random r = new Random();
  84.     
  85.     CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
  86.     CONF.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE/2);
  87.     CONF.setLong("dfs.blockreport.intervalMsec",500);
  88.     cluster = new MiniDFSCluster(
  89.           CONF, REPLICATION_FACTOR, true, INITIAL_RACKS );
  90.     try {
  91.       cluster.waitActive();
  92.       
  93.       FileSystem fs = cluster.getFileSystem();
  94.       Path fileName = new Path("/tmp.txt");
  95.       
  96.       // create a file with one block
  97.       DFSTestUtil.createFile(fs, fileName,
  98.           DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, r.nextLong());
  99.       DFSTestUtil.waitReplication(fs,fileName, REPLICATION_FACTOR);
  100.       
  101.       // get all datanodes
  102.       InetSocketAddress addr = new InetSocketAddress("localhost",
  103.           cluster.getNameNodePort());
  104.       DFSClient client = new DFSClient(addr, CONF);
  105.       List<LocatedBlock> locatedBlocks = client.namenode.
  106.         getBlockLocations("/tmp.txt", 0, DEFAULT_BLOCK_SIZE).getLocatedBlocks();
  107.       assertEquals(1, locatedBlocks.size());
  108.       LocatedBlock block = locatedBlocks.get(0);
  109.       DatanodeInfo[]  oldNodes = block.getLocations();
  110.       assertEquals(oldNodes.length, 3);
  111.       Block b = block.getBlock();
  112.       
  113.       // add a new datanode to the cluster
  114.       cluster.startDataNodes(CONF, 1, true, null, NEW_RACKS);
  115.       cluster.waitActive();
  116.       
  117.       DatanodeInfo[] datanodes = client.datanodeReport(DatanodeReportType.ALL);
  118.       // find out the new node
  119.       DatanodeInfo newNode=null;
  120.       for(DatanodeInfo node:datanodes) {
  121.         Boolean isNewNode = true;
  122.         for(DatanodeInfo oldNode:oldNodes) {
  123.           if(node.equals(oldNode)) {
  124.             isNewNode = false;
  125.             break;
  126.           }
  127.         }
  128.         if(isNewNode) {
  129.           newNode = node;
  130.           break;
  131.         }
  132.       }
  133.       
  134.       assertTrue(newNode!=null);
  135.       DatanodeInfo source=null;
  136.       ArrayList<DatanodeInfo> proxies = new ArrayList<DatanodeInfo>(2);
  137.       for(DatanodeInfo node:datanodes) {
  138.         if(node != newNode) {
  139.           if( node.getNetworkLocation().equals(newNode.getNetworkLocation())) {
  140.             source = node;
  141.           } else {
  142.             proxies.add( node );
  143.           }
  144.         }
  145.       }
  146.       assertTrue(source!=null && proxies.size()==2);
  147.       
  148.       // start to replace the block
  149.       // case 1: proxySource does not contain the block
  150.       LOG.info("Testcase 1: Proxy " + newNode.getName() 
  151.           + " does not contain the block " + b.getBlockName() );
  152.       assertFalse(replaceBlock(b, source, newNode, proxies.get(0)));
  153.       // case 2: destination contains the block
  154.       LOG.info("Testcase 2: Destination " + proxies.get(1).getName() 
  155.           + " contains the block " + b.getBlockName() );
  156.       assertFalse(replaceBlock(b, source, proxies.get(0), proxies.get(1)));
  157.       // case 3: correct case
  158.       LOG.info("Testcase 3: Proxy=" + source.getName() + " source=" + 
  159.           proxies.get(0).getName() + " destination=" + newNode.getName() );
  160.       assertTrue(replaceBlock(b, source, proxies.get(0), newNode));
  161.       // block locations should contain two proxies and newNode
  162.       checkBlocks(new DatanodeInfo[]{newNode, proxies.get(0), proxies.get(1)},
  163.           fileName.toString(), 
  164.           DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
  165.       // case 4: proxies.get(0) is not a valid del hint
  166.       LOG.info("Testcase 4: invalid del hint " + proxies.get(0).getName() );
  167.       assertTrue(replaceBlock(b, proxies.get(1), proxies.get(0), source));
  168.       /* block locations should contain two proxies,
  169.        * and either of source or newNode
  170.        */
  171.       checkBlocks(proxies.toArray(new DatanodeInfo[proxies.size()]), 
  172.           fileName.toString(), 
  173.           DEFAULT_BLOCK_SIZE, REPLICATION_FACTOR, client);
  174.     } finally {
  175.       cluster.shutdown();
  176.     }
  177.   }
  178.   
  179.   /* check if file's blocks exist at includeNodes */
  180.   private void checkBlocks(DatanodeInfo[] includeNodes, String fileName, 
  181.       long fileLen, short replFactor, DFSClient client) throws IOException {
  182.     Boolean notDone;
  183.     do {
  184.       try {
  185.         Thread.sleep(100);
  186.       } catch(InterruptedException e) {
  187.       }
  188.       List<LocatedBlock> blocks = client.namenode.
  189.       getBlockLocations(fileName, 0, fileLen).getLocatedBlocks();
  190.       assertEquals(1, blocks.size());
  191.       DatanodeInfo[] nodes = blocks.get(0).getLocations();
  192.       notDone = (nodes.length != replFactor);
  193.       if (notDone) {
  194.         LOG.info("Expected replication factor is " + replFactor +
  195.             " but the real replication factor is " + nodes.length );
  196.       } else {
  197.         List<DatanodeInfo> nodeLocations = Arrays.asList(nodes);
  198.         for (DatanodeInfo node : includeNodes) {
  199.           if (!nodeLocations.contains(node) ) {
  200.             notDone=true; 
  201.             LOG.info("Block is not located at " + node.getName() );
  202.             break;
  203.           }
  204.         }
  205.       }
  206.     } while(notDone);
  207.   }
  208.   /* Copy a block from sourceProxy to destination. If the block becomes
  209.    * over-replicated, preferably remove it from source.
  210.    * 
  211.    * Return true if a block is successfully copied; otherwise false.
  212.    */
  213.   private boolean replaceBlock( Block block, DatanodeInfo source,
  214.       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
  215.     Socket sock = new Socket();
  216.     sock.connect(NetUtils.createSocketAddr(
  217.         destination.getName()), HdfsConstants.READ_TIMEOUT);
  218.     sock.setKeepAlive(true);
  219.     // sendRequest
  220.     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
  221.     out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
  222.     out.writeByte(DataTransferProtocol.OP_REPLACE_BLOCK);
  223.     out.writeLong(block.getBlockId());
  224.     out.writeLong(block.getGenerationStamp());
  225.     Text.writeString(out, source.getStorageID());
  226.     sourceProxy.write(out);
  227.     out.flush();
  228.     // receiveResponse
  229.     DataInputStream reply = new DataInputStream(sock.getInputStream());
  230.     short status = reply.readShort();
  231.     if(status == DataTransferProtocol.OP_STATUS_SUCCESS) {
  232.       return true;
  233.     }
  234.     return false;
  235.   }
  236.   /**
  237.    * @param args
  238.    */
  239.   public static void main(String[] args) throws Exception {
  240.     (new TestBlockReplacement()).testBlockReplacement();
  241.   }
  242. }