TestHeartbeatHandling.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. package org.apache.hadoop.hdfs.server.namenode;
  2. import java.util.ArrayList;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.hdfs.MiniDFSCluster;
  5. import org.apache.hadoop.hdfs.protocol.Block;
  6. import org.apache.hadoop.hdfs.server.common.GenerationStamp;
  7. import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
  8. import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
  9. import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
  10. import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
  11. import junit.framework.TestCase;
  12. /**
  13.  * Test if FSNamesystem handles heartbeat right
  14.  */
  15. public class TestHeartbeatHandling extends TestCase {
  16.   /**
  17.    * Test if {@link FSNamesystem#handleHeartbeat(DatanodeRegistration, long, long, long, int, int)}
  18.    * can pick up replication and/or invalidate requests and 
  19.    * observes the max limit
  20.    */
  21.   public void testHeartbeat() throws Exception {
  22.     final Configuration conf = new Configuration();
  23.     final MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  24.     try {
  25.       cluster.waitActive();
  26.       final FSNamesystem namesystem = cluster.getNameNode().getNamesystem();
  27.       final DatanodeRegistration nodeReg = cluster.getDataNodes().get(0).dnRegistration;
  28.       DatanodeDescriptor dd = namesystem.getDatanode(nodeReg);
  29.       
  30.       final int REMAINING_BLOCKS = 1;
  31.       final int MAX_REPLICATE_LIMIT = conf.getInt("dfs.max-repl-streams", 2);
  32.       final int MAX_INVALIDATE_LIMIT = FSNamesystem.BLOCK_INVALIDATE_CHUNK;
  33.       final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
  34.       final int MAX_REPLICATE_BLOCKS = 2*MAX_REPLICATE_LIMIT+REMAINING_BLOCKS;
  35.       final DatanodeDescriptor[] ONE_TARGET = new DatanodeDescriptor[1];
  36.       synchronized (namesystem.heartbeats) {
  37.       for (int i=0; i<MAX_REPLICATE_BLOCKS; i++) {
  38.         dd.addBlockToBeReplicated(
  39.             new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP), ONE_TARGET);
  40.       }
  41.       DatanodeCommand[] cmds = namesystem.handleHeartbeat(
  42.           nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
  43.       assertEquals(1, cmds.length);
  44.       assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
  45.       assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
  46.       
  47.       ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
  48.       for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
  49.         blockList.add(new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP));
  50.       }
  51.       dd.addBlocksToBeInvalidated(blockList);
  52.            
  53.       cmds = namesystem.handleHeartbeat(
  54.           nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
  55.       assertEquals(2, cmds.length);
  56.       assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
  57.       assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
  58.       assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
  59.       assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
  60.       
  61.       cmds = namesystem.handleHeartbeat(
  62.           nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
  63.       assertEquals(2, cmds.length);
  64.       assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
  65.       assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
  66.       assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
  67.       assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
  68.       
  69.       cmds = namesystem.handleHeartbeat(
  70.           nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
  71.       assertEquals(1, cmds.length);
  72.       assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[0].getAction());
  73.       assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
  74.       cmds = namesystem.handleHeartbeat(
  75.           nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
  76.       assertEquals(null, cmds);
  77.       }
  78.     } finally {
  79.       cluster.shutdown();
  80.     }
  81.   }
  82. }