TestNodeCount.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. package org.apache.hadoop.hdfs.server.namenode;
  2. import java.util.Collection;
  3. import java.util.Iterator;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.FileSystem;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.hdfs.DFSTestUtil;
  8. import org.apache.hadoop.hdfs.MiniDFSCluster;
  9. import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
  10. import org.apache.hadoop.hdfs.protocol.Block;
  11. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NumberReplicas;
  12. import junit.framework.TestCase;
  13. /**
  14.  * Test if live nodes count per node is correct 
  15.  * so NN makes right decision for under/over-replicated blocks
  16.  */
  17. public class TestNodeCount extends TestCase {
  18.   public void testNodeCount() throws Exception {
  19.     // start a mini dfs cluster of 2 nodes
  20.     final Configuration conf = new Configuration();
  21.     final short REPLICATION_FACTOR = (short)2;
  22.     final MiniDFSCluster cluster = 
  23.       new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
  24.     try {
  25.       final FSNamesystem namesystem = cluster.getNameNode().namesystem;
  26.       final FileSystem fs = cluster.getFileSystem();
  27.       
  28.       // populate the cluster with a one block file
  29.       final Path FILE_PATH = new Path("/testfile");
  30.       DFSTestUtil.createFile(fs, FILE_PATH, 1L, REPLICATION_FACTOR, 1L);
  31.       DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
  32.       Block block = DFSTestUtil.getFirstBlock(fs, FILE_PATH);
  33.       // keep a copy of all datanode descriptor
  34.       DatanodeDescriptor[] datanodes = (DatanodeDescriptor[])
  35.          namesystem.heartbeats.toArray(new DatanodeDescriptor[REPLICATION_FACTOR]);
  36.       
  37.       // start two new nodes
  38.       cluster.startDataNodes(conf, 2, true, null, null);
  39.       cluster.waitActive();
  40.       
  41.       // bring down first datanode
  42.       DatanodeDescriptor datanode = datanodes[0];
  43.       DataNodeProperties dnprop = cluster.stopDataNode(datanode.getName());
  44.       // make sure that NN detects that the datanode is down
  45.       synchronized (namesystem.heartbeats) {
  46.         datanode.setLastUpdate(0); // mark it dead
  47.         namesystem.heartbeatCheck();
  48.       }
  49.       // the block will be replicated
  50.       DFSTestUtil.waitReplication(fs, FILE_PATH, REPLICATION_FACTOR);
  51.       // restart the first datanode
  52.       cluster.restartDataNode(dnprop);
  53.       cluster.waitActive();
  54.       
  55.       // check if excessive replica is detected
  56.       NumberReplicas num = null;
  57.       do {
  58.        synchronized (namesystem) {
  59.          num = namesystem.countNodes(block);
  60.        }
  61.       } while (num.excessReplicas() == 0);
  62.       
  63.       // find out a non-excess node
  64.       Iterator<DatanodeDescriptor> iter = namesystem.blocksMap.nodeIterator(block);
  65.       DatanodeDescriptor nonExcessDN = null;
  66.       while (iter.hasNext()) {
  67.         DatanodeDescriptor dn = iter.next();
  68.         Collection<Block> blocks = namesystem.excessReplicateMap.get(dn.getStorageID());
  69.         if (blocks == null || !blocks.contains(block) ) {
  70.           nonExcessDN = dn;
  71.           break;
  72.         }
  73.       }
  74.       assertTrue(nonExcessDN!=null);
  75.       
  76.       // bring down non excessive datanode
  77.       dnprop = cluster.stopDataNode(nonExcessDN.getName());
  78.       // make sure that NN detects that the datanode is down
  79.       synchronized (namesystem.heartbeats) {
  80.         nonExcessDN.setLastUpdate(0); // mark it dead
  81.         namesystem.heartbeatCheck();
  82.       }
  83.       
  84.       // The block should be replicated
  85.       do {
  86.         num = namesystem.countNodes(block);
  87.       } while (num.liveReplicas() != REPLICATION_FACTOR);
  88.       
  89.       // restart the first datanode
  90.       cluster.restartDataNode(dnprop);
  91.       cluster.waitActive();
  92.       
  93.       // check if excessive replica is detected
  94.       do {
  95.        num = namesystem.countNodes(block);
  96.       } while (num.excessReplicas() == 2);
  97.     } finally {
  98.       cluster.shutdown();
  99.     }
  100.   }
  101. }