TestDecommission.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import java.net.InetSocketAddress;
  21. import java.util.ArrayList;
  22. import java.util.Collection;
  23. import java.util.Iterator;
  24. import java.util.Random;
  25. import junit.framework.TestCase;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.fs.BlockLocation;
  28. import org.apache.hadoop.fs.FSDataOutputStream;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  32. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  33. import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
  34. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  35. /**
  36.  * This class tests the decommissioning of nodes.
  37.  */
  38. public class TestDecommission extends TestCase {
  39.   static final long seed = 0xDEADBEEFL;
  40.   static final int blockSize = 8192;
  41.   static final int fileSize = 16384;
  42.   static final int numDatanodes = 6;
  43.   Random myrand = new Random();
  44.   Path hostsFile;
  45.   Path excludeFile;
  46.   ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
  47.   private enum NodeState {NORMAL, DECOMMISSION_INPROGRESS, DECOMMISSIONED; }
  48.   private void writeConfigFile(FileSystem fs, Path name, ArrayList<String> nodes) 
  49.     throws IOException {
  50.     // delete if it already exists
  51.     if (fs.exists(name)) {
  52.       fs.delete(name, true);
  53.     }
  54.     FSDataOutputStream stm = fs.create(name);
  55.     
  56.     if (nodes != null) {
  57.       for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
  58.         String node = it.next();
  59.         stm.writeBytes(node);
  60.         stm.writeBytes("n");
  61.       }
  62.     }
  63.     stm.close();
  64.   }
  65.   private void writeFile(FileSystem fileSys, Path name, int repl)
  66.     throws IOException {
  67.     // create and write a file that contains three blocks of data
  68.     FSDataOutputStream stm = fileSys.create(name, true, 
  69.                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
  70.                                             (short)repl, (long)blockSize);
  71.     byte[] buffer = new byte[fileSize];
  72.     Random rand = new Random(seed);
  73.     rand.nextBytes(buffer);
  74.     stm.write(buffer);
  75.     stm.close();
  76.   }
  77.   
  78.   
  79.   private void checkFile(FileSystem fileSys, Path name, int repl)
  80.     throws IOException {
  81.     DFSTestUtil.waitReplication(fileSys, name, (short) repl);
  82.   }
  83.   private void printFileLocations(FileSystem fileSys, Path name)
  84.   throws IOException {
  85.     BlockLocation[] locations = fileSys.getFileBlockLocations(
  86.         fileSys.getFileStatus(name), 0, fileSize);
  87.     for (int idx = 0; idx < locations.length; idx++) {
  88.       String[] loc = locations[idx].getHosts();
  89.       System.out.print("Block[" + idx + "] : ");
  90.       for (int j = 0; j < loc.length; j++) {
  91.         System.out.print(loc[j] + " ");
  92.       }
  93.       System.out.println("");
  94.     }
  95.   }
  96.   /**
  97.    * For blocks that reside on the nodes that are down, verify that their
  98.    * replication factor is 1 more than the specified one.
  99.    */
  100.   private void checkFile(FileSystem fileSys, Path name, int repl,
  101.                          String downnode) throws IOException {
  102.     //
  103.     // sleep an additional 10 seconds for the blockreports from the datanodes
  104.     // to arrive. 
  105.     //
  106.     // need a raw stream
  107.     assertTrue("Not HDFS:"+fileSys.getUri(), fileSys instanceof DistributedFileSystem);
  108.         
  109.     DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream) 
  110.       ((DistributedFileSystem)fileSys).open(name);
  111.     Collection<LocatedBlock> dinfo = dis.getAllBlocks();
  112.     for (LocatedBlock blk : dinfo) { // for each block
  113.       int hasdown = 0;
  114.       DatanodeInfo[] nodes = blk.getLocations();
  115.       for (int j = 0; j < nodes.length; j++) {     // for each replica
  116.         if (nodes[j].getName().equals(downnode)) {
  117.           hasdown++;
  118.           System.out.println("Block " + blk.getBlock() + " replica " +
  119.                              nodes[j].getName() + " is decommissioned.");
  120.         }
  121.       }
  122.       System.out.println("Block " + blk.getBlock() + " has " + hasdown +
  123.                          " decommissioned replica.");
  124.       assertEquals("Number of replicas for block" + blk.getBlock(),
  125.                    Math.min(numDatanodes, repl+hasdown), nodes.length);  
  126.     }
  127.   }
  128.   
  129.   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
  130.     assertTrue(fileSys.exists(name));
  131.     fileSys.delete(name, true);
  132.     assertTrue(!fileSys.exists(name));
  133.   }
  134.   private void printDatanodeReport(DatanodeInfo[] info) {
  135.     System.out.println("-------------------------------------------------");
  136.     for (int i = 0; i < info.length; i++) {
  137.       System.out.println(info[i].getDatanodeReport());
  138.       System.out.println();
  139.     }
  140.   }
  141.   /*
  142.    * decommission one random node.
  143.    */
  144.   private String decommissionNode(NameNode namenode,
  145.                                   Configuration conf,
  146.                                   DFSClient client, 
  147.                                   FileSystem localFileSys)
  148.     throws IOException {
  149.     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
  150.     //
  151.     // pick one datanode randomly.
  152.     //
  153.     int index = 0;
  154.     boolean found = false;
  155.     while (!found) {
  156.       index = myrand.nextInt(info.length);
  157.       if (!info[index].isDecommissioned()) {
  158.         found = true;
  159.       }
  160.     }
  161.     String nodename = info[index].getName();
  162.     System.out.println("Decommissioning node: " + nodename);
  163.     // write nodename into the exclude file.
  164.     ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
  165.     nodes.add(nodename);
  166.     writeConfigFile(localFileSys, excludeFile, nodes);
  167.     namenode.namesystem.refreshNodes(conf);
  168.     return nodename;
  169.   }
  170.   /*
  171.    * Check if node is in the requested state.
  172.    */
  173.   private boolean checkNodeState(FileSystem filesys, 
  174.                                  String node, 
  175.                                  NodeState state) throws IOException {
  176.     DistributedFileSystem dfs = (DistributedFileSystem) filesys;
  177.     boolean done = false;
  178.     boolean foundNode = false;
  179.     DatanodeInfo[] datanodes = dfs.getDataNodeStats();
  180.     for (int i = 0; i < datanodes.length; i++) {
  181.       DatanodeInfo dn = datanodes[i];
  182.       if (dn.getName().equals(node)) {
  183.         if (state == NodeState.DECOMMISSIONED) {
  184.           done = dn.isDecommissioned();
  185.         } else if (state == NodeState.DECOMMISSION_INPROGRESS) {
  186.           done = dn.isDecommissionInProgress();
  187.         } else {
  188.           done = (!dn.isDecommissionInProgress() && !dn.isDecommissioned());
  189.         }
  190.         System.out.println(dn.getDatanodeReport());
  191.         foundNode = true;
  192.       }
  193.     }
  194.     if (!foundNode) {
  195.       throw new IOException("Could not find node: " + node);
  196.     }
  197.     return done;
  198.   }
  199.   /* 
  200.    * Wait till node is fully decommissioned.
  201.    */
  202.   private void waitNodeState(FileSystem filesys,
  203.                              String node,
  204.                              NodeState state) throws IOException {
  205.     boolean done = checkNodeState(filesys, node, state);
  206.     while (!done) {
  207.       System.out.println("Waiting for node " + node +
  208.                          " to change state to " + state);
  209.       try {
  210.         Thread.sleep(1000);
  211.       } catch (InterruptedException e) {
  212.         // nothing
  213.       }
  214.       done = checkNodeState(filesys, node, state);
  215.     }
  216.   }
  217.   
  218.   /**
  219.    * Tests Decommission in DFS.
  220.    */
  221.   public void testDecommission() throws IOException {
  222.     Configuration conf = new Configuration();
  223.     conf.setBoolean("dfs.replication.considerLoad", false);
  224.     // Set up the hosts/exclude files.
  225.     FileSystem localFileSys = FileSystem.getLocal(conf);
  226.     Path workingDir = localFileSys.getWorkingDirectory();
  227.     Path dir = new Path(workingDir, "build/test/data/work-dir/decommission");
  228.     assertTrue(localFileSys.mkdirs(dir));
  229.     hostsFile = new Path(dir, "hosts");
  230.     excludeFile = new Path(dir, "exclude");
  231.     conf.set("dfs.hosts.exclude", excludeFile.toUri().getPath());
  232.     conf.setInt("heartbeat.recheck.interval", 2000);
  233.     conf.setInt("dfs.heartbeat.interval", 1);
  234.     conf.setInt("dfs.replication.pending.timeout.sec", 4);
  235.     writeConfigFile(localFileSys, excludeFile, null);
  236.     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
  237.     cluster.waitActive();
  238.     InetSocketAddress addr = new InetSocketAddress("localhost", 
  239.                                                    cluster.getNameNodePort());
  240.     DFSClient client = new DFSClient(addr, conf);
  241.     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
  242.     assertEquals("Number of Datanodes ", numDatanodes, info.length);
  243.     FileSystem fileSys = cluster.getFileSystem();
  244.     try {
  245.       for (int iteration = 0; iteration < numDatanodes - 1; iteration++) {
  246.         int replicas = numDatanodes - iteration - 1;
  247.         //
  248.         // Decommission one node. Verify that node is decommissioned.
  249.         // 
  250.         Path file1 = new Path("decommission.dat");
  251.         writeFile(fileSys, file1, replicas);
  252.         System.out.println("Created file decommission.dat with " +
  253.                            replicas + " replicas.");
  254.         checkFile(fileSys, file1, replicas);
  255.         printFileLocations(fileSys, file1);
  256.         String downnode = decommissionNode(cluster.getNameNode(), conf,
  257.                                            client, localFileSys);
  258.         decommissionedNodes.add(downnode);
  259.         waitNodeState(fileSys, downnode, NodeState.DECOMMISSIONED);
  260.         checkFile(fileSys, file1, replicas, downnode);
  261.         cleanupFile(fileSys, file1);
  262.         cleanupFile(localFileSys, dir);
  263.       }
  264.     } catch (IOException e) {
  265.       info = client.datanodeReport(DatanodeReportType.ALL);
  266.       printDatanodeReport(info);
  267.       throw e;
  268.     } finally {
  269.       fileSys.close();
  270.       cluster.shutdown();
  271.     }
  272.   }
  273. }