TestDatanodeBlockScanner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:17k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import java.net.InetSocketAddress;
  21. import java.net.URL;
  22. import java.util.regex.Matcher;
  23. import java.util.regex.Pattern;
  24. import java.io.*;
  25. import java.nio.channels.FileChannel;
  26. import java.util.Random;
  27. import org.apache.commons.logging.Log;
  28. import org.apache.commons.logging.LogFactory;
  29. import org.apache.hadoop.conf.Configuration;
  30. import org.apache.hadoop.hdfs.protocol.Block;
  31. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  32. import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
  33. import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
  34. import org.apache.hadoop.fs.FileSystem;
  35. import org.apache.hadoop.fs.Path;
  36. import org.apache.hadoop.io.IOUtils;
  37. import junit.framework.TestCase;
  38. /**
  39.  * This test verifies that block verification occurs on the datanode
  40.  */
  41. public class TestDatanodeBlockScanner extends TestCase {
  42.   
  43.   private static final Log LOG = 
  44.                  LogFactory.getLog(TestDatanodeBlockScanner.class);
  45.   
  46.   private static Pattern pattern = 
  47.              Pattern.compile(".*?(blk_[-]*\d+).*?scan time\s*:\s*(\d+)");
  48.   /**
  49.    * This connects to datanode and fetches block verification data.
  50.    * It repeats this until the given block has a verification time > 0.
  51.    */
  52.   private static long waitForVerification(DatanodeInfo dn, FileSystem fs, 
  53.                                           Path file) throws IOException {
  54.     URL url = new URL("http://localhost:" + dn.getInfoPort() +
  55.                       "/blockScannerReport?listblocks");
  56.     long lastWarnTime = System.currentTimeMillis();
  57.     long verificationTime = 0;
  58.     
  59.     String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
  60.     
  61.     while (verificationTime <= 0) {
  62.       String response = DFSTestUtil.urlGet(url);
  63.       for(Matcher matcher = pattern.matcher(response); matcher.find();) {
  64.         if (block.equals(matcher.group(1))) {
  65.           verificationTime = Long.parseLong(matcher.group(2));
  66.           break;
  67.         }
  68.       }
  69.       
  70.       if (verificationTime <= 0) {
  71.         long now = System.currentTimeMillis();
  72.         if ((now - lastWarnTime) >= 5*1000) {
  73.           LOG.info("Waiting for verification of " + block);
  74.           lastWarnTime = now; 
  75.         }
  76.         try {
  77.           Thread.sleep(500);
  78.         } catch (InterruptedException ignored) {}
  79.       }
  80.     }
  81.     
  82.     return verificationTime;
  83.   }
  84.   public void testDatanodeBlockScanner() throws IOException {
  85.     
  86.     long startTime = System.currentTimeMillis();
  87.     
  88.     Configuration conf = new Configuration();
  89.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  90.     cluster.waitActive();
  91.     
  92.     FileSystem fs = cluster.getFileSystem();
  93.     Path file1 = new Path("/tmp/testBlockVerification/file1");
  94.     Path file2 = new Path("/tmp/testBlockVerification/file2");
  95.     
  96.     /*
  97.      * Write the first file and restart the cluster.
  98.      */
  99.     DFSTestUtil.createFile(fs, file1, 10, (short)1, 0);
  100.     cluster.shutdown();
  101.     cluster = new MiniDFSCluster(conf, 1, false, null);
  102.     cluster.waitActive();
  103.     
  104.     DFSClient dfsClient =  new DFSClient(new InetSocketAddress("localhost", 
  105.                                          cluster.getNameNodePort()), conf);
  106.     fs = cluster.getFileSystem();
  107.     DatanodeInfo dn = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
  108.     
  109.     /*
  110.      * The cluster restarted. The block should be verified by now.
  111.      */
  112.     assertTrue(waitForVerification(dn, fs, file1) > startTime);
  113.     
  114.     /*
  115.      * Create a new file and read the block. The block should be marked 
  116.      * verified since the client reads the block and verifies checksum. 
  117.      */
  118.     DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
  119.     IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(), 
  120.                       conf, true); 
  121.     assertTrue(waitForVerification(dn, fs, file2) > startTime);
  122.     
  123.     cluster.shutdown();
  124.   }
  125.   public static boolean corruptReplica(String blockName, int replica) throws IOException {
  126.     Random random = new Random();
  127.     File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
  128.     boolean corrupted = false;
  129.     for (int i=replica*2; i<replica*2+2; i++) {
  130.       File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
  131.                                blockName);
  132.       if (blockFile.exists()) {
  133.         // Corrupt replica by writing random bytes into replica
  134.         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
  135.         FileChannel channel = raFile.getChannel();
  136.         String badString = "BADBAD";
  137.         int rand = random.nextInt((int)channel.size()/2);
  138.         raFile.seek(rand);
  139.         raFile.write(badString.getBytes());
  140.         raFile.close();
  141.         corrupted = true;
  142.       }
  143.     }
  144.     return corrupted;
  145.   }
  146.   public void testBlockCorruptionPolicy() throws IOException {
  147.     Configuration conf = new Configuration();
  148.     conf.setLong("dfs.blockreport.intervalMsec", 1000L);
  149.     Random random = new Random();
  150.     FileSystem fs = null;
  151.     DFSClient dfsClient = null;
  152.     LocatedBlocks blocks = null;
  153.     int blockCount = 0;
  154.     int rand = random.nextInt(3);
  155.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
  156.     cluster.waitActive();
  157.     fs = cluster.getFileSystem();
  158.     Path file1 = new Path("/tmp/testBlockVerification/file1");
  159.     DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
  160.     String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
  161.     
  162.     dfsClient = new DFSClient(new InetSocketAddress("localhost", 
  163.                                         cluster.getNameNodePort()), conf);
  164.     do {
  165.       blocks = dfsClient.namenode.
  166.                    getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  167.       blockCount = blocks.get(0).getLocations().length;
  168.       try {
  169.         LOG.info("Looping until expected blockCount of 3 is received");
  170.         Thread.sleep(1000);
  171.       } catch (InterruptedException ignore) {
  172.       }
  173.     } while (blockCount != 3);
  174.     assertTrue(blocks.get(0).isCorrupt() == false);
  175.     // Corrupt random replica of block 
  176.     corruptReplica(block, rand);
  177.     // Restart the datanode hoping the corrupt block to be reported
  178.     cluster.restartDataNode(rand);
  179.     // We have 2 good replicas and block is not corrupt
  180.     do {
  181.       blocks = dfsClient.namenode.
  182.                    getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  183.       blockCount = blocks.get(0).getLocations().length;
  184.       try {
  185.         LOG.info("Looping until expected blockCount of 2 is received");
  186.         Thread.sleep(1000);
  187.       } catch (InterruptedException ignore) {
  188.       }
  189.     } while (blockCount != 2);
  190.     assertTrue(blocks.get(0).isCorrupt() == false);
  191.   
  192.     // Corrupt all replicas. Now, block should be marked as corrupt
  193.     // and we should get all the replicas 
  194.     corruptReplica(block, 0);
  195.     corruptReplica(block, 1);
  196.     corruptReplica(block, 2);
  197.     // Read the file to trigger reportBadBlocks by client
  198.     try {
  199.       IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
  200.                         conf, true);
  201.     } catch (IOException e) {
  202.       // Ignore exception
  203.     }
  204.     // We now have the blocks to be marked as corrupt and we get back all
  205.     // its replicas
  206.     do {
  207.       blocks = dfsClient.namenode.
  208.                    getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  209.       blockCount = blocks.get(0).getLocations().length;
  210.       try {
  211.         LOG.info("Looping until expected blockCount of 3 is received");
  212.         Thread.sleep(1000);
  213.       } catch (InterruptedException ignore) {
  214.       }
  215.     } while (blockCount != 3);
  216.     assertTrue(blocks.get(0).isCorrupt() == true);
  217.     cluster.shutdown();
  218.   }
  219.   
  220.   /**
  221.    * testBlockCorruptionRecoveryPolicy.
  222.    * This tests recovery of corrupt replicas, first for one corrupt replica
  223.    * then for two. The test invokes blockCorruptionRecoveryPolicy which
  224.    * 1. Creates a block with desired number of replicas
  225.    * 2. Corrupts the desired number of replicas and restarts the datanodes
  226.    *    containing the corrupt replica. Additionaly we also read the block
  227.    *    in case restarting does not report corrupt replicas.
  228.    *    Restarting or reading from the datanode would trigger reportBadBlocks 
  229.    *    to namenode.
  230.    *    NameNode adds it to corruptReplicasMap and neededReplication
  231.    * 3. Test waits until all corrupt replicas are reported, meanwhile
  232.    *    Re-replciation brings the block back to healthy state
  233.    * 4. Test again waits until the block is reported with expected number
  234.    *    of good replicas.
  235.    */
  236.   public void testBlockCorruptionRecoveryPolicy() throws IOException {
  237.     // Test recovery of 1 corrupt replica
  238.     LOG.info("Testing corrupt replica recovery for one corrupt replica");
  239.     blockCorruptionRecoveryPolicy(4, (short)3, 1);
  240.     // Test recovery of 2 corrupt replicas
  241.     LOG.info("Testing corrupt replica recovery for two corrupt replicas");
  242.     blockCorruptionRecoveryPolicy(5, (short)3, 2);
  243.   }
  244.   
  245.   private void blockCorruptionRecoveryPolicy(int numDataNodes, 
  246.                                              short numReplicas,
  247.                                              int numCorruptReplicas) 
  248.                                              throws IOException {
  249.     Configuration conf = new Configuration();
  250.     conf.setLong("dfs.blockreport.intervalMsec", 30L);
  251.     conf.setLong("dfs.replication.interval", 30);
  252.     conf.setLong("dfs.heartbeat.interval", 30L);
  253.     conf.setBoolean("dfs.replication.considerLoad", false);
  254.     Random random = new Random();
  255.     FileSystem fs = null;
  256.     DFSClient dfsClient = null;
  257.     LocatedBlocks blocks = null;
  258.     int replicaCount = 0;
  259.     int rand = random.nextInt(numDataNodes);
  260.     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
  261.     cluster.waitActive();
  262.     fs = cluster.getFileSystem();
  263.     Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
  264.     DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
  265.     Block blk = DFSTestUtil.getFirstBlock(fs, file1);
  266.     String block = blk.getBlockName();
  267.     
  268.     dfsClient = new DFSClient(new InetSocketAddress("localhost", 
  269.                                         cluster.getNameNodePort()), conf);
  270.     blocks = dfsClient.namenode.
  271.                getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  272.     replicaCount = blocks.get(0).getLocations().length;
  273.     // Wait until block is replicated to numReplicas
  274.     while (replicaCount != numReplicas) {
  275.       try {
  276.         LOG.info("Looping until expected replicaCount of " + numReplicas +
  277.                   "is reached");
  278.         Thread.sleep(1000);
  279.       } catch (InterruptedException ignore) {
  280.       }
  281.       blocks = dfsClient.namenode.
  282.                    getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  283.       replicaCount = blocks.get(0).getLocations().length;
  284.     }
  285.     assertTrue(blocks.get(0).isCorrupt() == false);
  286.     // Corrupt numCorruptReplicas replicas of block 
  287.     int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
  288.     for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
  289.       if (corruptReplica(block, i)) 
  290.         corruptReplicasDNIDs[j++] = i;
  291.     }
  292.     
  293.     // Restart the datanodes containing corrupt replicas 
  294.     // so they would be reported to namenode and re-replicated
  295.     for (int i =0; i < numCorruptReplicas; i++) 
  296.      cluster.restartDataNode(corruptReplicasDNIDs[i]);
  297.     // Loop until all corrupt replicas are reported
  298.     int corruptReplicaSize = cluster.getNameNode().namesystem.
  299.                               corruptReplicas.numCorruptReplicas(blk);
  300.     while (corruptReplicaSize != numCorruptReplicas) {
  301.       try {
  302.         IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), 
  303.                           conf, true);
  304.       } catch (IOException e) {
  305.       }
  306.       try {
  307.         LOG.info("Looping until expected " + numCorruptReplicas + " are " +
  308.                  "reported. Current reported " + corruptReplicaSize);
  309.         Thread.sleep(1000);
  310.       } catch (InterruptedException ignore) {
  311.       }
  312.       corruptReplicaSize = cluster.getNameNode().namesystem.
  313.                               corruptReplicas.numCorruptReplicas(blk);
  314.     }
  315.     
  316.     // Loop until the block recovers after replication
  317.     blocks = dfsClient.namenode.
  318.                getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  319.     replicaCount = blocks.get(0).getLocations().length;
  320.     while (replicaCount != numReplicas) {
  321.       try {
  322.         LOG.info("Looping until block gets rereplicated to " + numReplicas);
  323.         Thread.sleep(1000);
  324.       } catch (InterruptedException ignore) {
  325.       }
  326.       blocks = dfsClient.namenode.
  327.                  getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  328.       replicaCount = blocks.get(0).getLocations().length;
  329.     }
  330.     // Make sure the corrupt replica is invalidated and removed from
  331.     // corruptReplicasMap
  332.     corruptReplicaSize = cluster.getNameNode().namesystem.
  333.                           corruptReplicas.numCorruptReplicas(blk);
  334.     while (corruptReplicaSize != 0 || replicaCount != numReplicas) {
  335.       try {
  336.         LOG.info("Looping until corrupt replica is invalidated");
  337.         Thread.sleep(1000);
  338.       } catch (InterruptedException ignore) {
  339.       }
  340.       corruptReplicaSize = cluster.getNameNode().namesystem.
  341.                             corruptReplicas.numCorruptReplicas(blk);
  342.       blocks = dfsClient.namenode.
  343.                  getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  344.       replicaCount = blocks.get(0).getLocations().length;
  345.     }
  346.     // Make sure block is healthy 
  347.     assertTrue(corruptReplicaSize == 0);
  348.     assertTrue(replicaCount == numReplicas);
  349.     assertTrue(blocks.get(0).isCorrupt() == false);
  350.     cluster.shutdown();
  351.   }
  352.   
  353.   /** Test if NameNode handles truncated blocks in block report */
  354.   public void testTruncatedBlockReport() throws Exception {
  355.     final Configuration conf = new Configuration();
  356.     final short REPLICATION_FACTOR = (short)2;
  357.     MiniDFSCluster cluster = new MiniDFSCluster(conf, REPLICATION_FACTOR, true, null);
  358.     cluster.waitActive();
  359.     FileSystem fs = cluster.getFileSystem();
  360.     try {
  361.       final Path fileName = new Path("/file1");
  362.       DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
  363.       DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
  364.       String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
  365.       // Truncate replica of block
  366.       changeReplicaLength(block, 0, -1);
  367.       cluster.shutdown();
  368.       // restart the cluster
  369.       cluster = new MiniDFSCluster(
  370.           0, conf, REPLICATION_FACTOR, false, true, null, null, null);
  371.       cluster.startDataNodes(conf, 1, true, null, null);
  372.       cluster.waitActive();  // now we have 3 datanodes
  373.       // wait for truncated block be detected and the block to be replicated
  374.       DFSTestUtil.waitReplication(
  375.           cluster.getFileSystem(), fileName, REPLICATION_FACTOR);
  376.       
  377.       // Make sure that truncated block will be deleted
  378.       waitForBlockDeleted(block, 0);
  379.     } finally {
  380.       cluster.shutdown();
  381.     }
  382.   }
  383.   
  384.   /**
  385.    * Change the length of a block at datanode dnIndex
  386.    */
  387.   static boolean changeReplicaLength(String blockName, int dnIndex, int lenDelta) throws IOException {
  388.     File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
  389.     for (int i=dnIndex*2; i<dnIndex*2+2; i++) {
  390.       File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" + 
  391.                                blockName);
  392.       if (blockFile.exists()) {
  393.         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
  394.         raFile.setLength(raFile.length()+lenDelta);
  395.         raFile.close();
  396.         return true;
  397.       }
  398.     }
  399.     return false;
  400.   }
  401.   
  402.   private static void waitForBlockDeleted(String blockName, int dnIndex) 
  403.   throws IOException, InterruptedException {
  404.     File baseDir = new File(System.getProperty("test.build.data"), "dfs/data");
  405.     File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1)+ "/current/" + 
  406.         blockName);
  407.     File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2)+ "/current/" + 
  408.         blockName);
  409.     while (blockFile1.exists() || blockFile2.exists()) {
  410.       Thread.sleep(100);
  411.     }
  412.   }
  413. }