TestFsck.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:13k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import java.io.ByteArrayOutputStream;
  20. import java.io.PrintStream;
  21. import java.net.InetSocketAddress;
  22. import java.io.File;
  23. import java.io.RandomAccessFile;
  24. import java.lang.Exception;
  25. import java.io.IOException;
  26. import java.nio.channels.FileChannel;
  27. import java.util.Random;
  28. import junit.framework.TestCase;
  29. import org.apache.commons.logging.impl.Log4JLogger;
  30. import org.apache.log4j.Level;
  31. import org.apache.hadoop.conf.Configuration;
  32. import org.apache.hadoop.fs.FileSystem;
  33. import org.apache.hadoop.fs.Path;
  34. import org.apache.hadoop.fs.FSDataOutputStream;
  35. import org.apache.hadoop.util.ToolRunner;
  36. import org.apache.hadoop.hdfs.DFSClient;
  37. import org.apache.hadoop.hdfs.DFSTestUtil;
  38. import org.apache.hadoop.hdfs.MiniDFSCluster;
  39. import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
  40. import org.apache.hadoop.hdfs.tools.DFSck;
  41. import org.apache.hadoop.io.IOUtils;
  42. /**
  43.  * A JUnit test for doing fsck
  44.  */
  45. public class TestFsck extends TestCase {
  46.   static String runFsck(Configuration conf, int expectedErrCode, 
  47.                         boolean checkErrorCode,String... path) 
  48.                         throws Exception {
  49.     PrintStream oldOut = System.out;
  50.     ByteArrayOutputStream bStream = new ByteArrayOutputStream();
  51.     PrintStream newOut = new PrintStream(bStream, true);
  52.     System.setOut(newOut);
  53.     ((Log4JLogger)PermissionChecker.LOG).getLogger().setLevel(Level.ALL);
  54.     int errCode = ToolRunner.run(new DFSck(conf), path);
  55.     if (checkErrorCode)
  56.       assertEquals(expectedErrCode, errCode);
  57.     ((Log4JLogger)PermissionChecker.LOG).getLogger().setLevel(Level.INFO);
  58.     System.setOut(oldOut);
  59.     return bStream.toString();
  60.   }
  61.   /** do fsck */
  62.   public void testFsck() throws Exception {
  63.     DFSTestUtil util = new DFSTestUtil("TestFsck", 20, 3, 8*1024);
  64.     MiniDFSCluster cluster = null;
  65.     FileSystem fs = null;
  66.     try {
  67.       Configuration conf = new Configuration();
  68.       conf.setLong("dfs.blockreport.intervalMsec", 10000L);
  69.       cluster = new MiniDFSCluster(conf, 4, true, null);
  70.       fs = cluster.getFileSystem();
  71.       util.createFiles(fs, "/srcdat");
  72.       util.waitReplication(fs, "/srcdat", (short)3);
  73.       String outStr = runFsck(conf, 0, true, "/");
  74.       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
  75.       System.out.println(outStr);
  76.       if (fs != null) {try{fs.close();} catch(Exception e){}}
  77.       cluster.shutdown();
  78.       
  79.       // restart the cluster; bring up namenode but not the data nodes
  80.       cluster = new MiniDFSCluster(conf, 0, false, null);
  81.       outStr = runFsck(conf, 1, true, "/");
  82.       // expect the result is corrupt
  83.       assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
  84.       System.out.println(outStr);
  85.       
  86.       // bring up data nodes & cleanup cluster
  87.       cluster.startDataNodes(conf, 4, true, null, null);
  88.       cluster.waitActive();
  89.       cluster.waitClusterUp();
  90.       fs = cluster.getFileSystem();
  91.       util.cleanup(fs, "/srcdat");
  92.     } finally {
  93.       if (fs != null) {try{fs.close();} catch(Exception e){}}
  94.       if (cluster != null) { cluster.shutdown(); }
  95.     }
  96.   }
  97.   public void testFsckNonExistent() throws Exception {
  98.     DFSTestUtil util = new DFSTestUtil("TestFsck", 20, 3, 8*1024);
  99.     MiniDFSCluster cluster = null;
  100.     FileSystem fs = null;
  101.     try {
  102.       Configuration conf = new Configuration();
  103.       conf.setLong("dfs.blockreport.intervalMsec", 10000L);
  104.       cluster = new MiniDFSCluster(conf, 4, true, null);
  105.       fs = cluster.getFileSystem();
  106.       util.createFiles(fs, "/srcdat");
  107.       util.waitReplication(fs, "/srcdat", (short)3);
  108.       String outStr = runFsck(conf, 0, true, "/non-existent");
  109.       assertEquals(-1, outStr.indexOf(NamenodeFsck.HEALTHY_STATUS));
  110.       System.out.println(outStr);
  111.       util.cleanup(fs, "/srcdat");
  112.     } finally {
  113.       if (fs != null) {try{fs.close();} catch(Exception e){}}
  114.       if (cluster != null) { cluster.shutdown(); }
  115.     }
  116.   }
  117.   public void testFsckMove() throws Exception {
  118.     DFSTestUtil util = new DFSTestUtil("TestFsck", 5, 3, 8*1024);
  119.     MiniDFSCluster cluster = null;
  120.     FileSystem fs = null;
  121.     try {
  122.       Configuration conf = new Configuration();
  123.       conf.setLong("dfs.blockreport.intervalMsec", 10000L);
  124.       cluster = new MiniDFSCluster(conf, 4, true, null);
  125.       String topDir = "/srcdat";
  126.       fs = cluster.getFileSystem();
  127.       cluster.waitActive();
  128.       util.createFiles(fs, topDir);
  129.       util.waitReplication(fs, topDir, (short)3);
  130.       String outStr = runFsck(conf, 0, true, "/");
  131.       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
  132.       
  133.       // Corrupt a block by deleting it
  134.       String[] fileNames = util.getFileNames(topDir);
  135.       DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
  136.                                           cluster.getNameNodePort()), conf);
  137.       String block = dfsClient.namenode.
  138.                       getBlockLocations(fileNames[0], 0, Long.MAX_VALUE).
  139.                       get(0).getBlock().getBlockName();
  140.       File baseDir = new File(System.getProperty("test.build.data",
  141.                                                  "build/test/data"),"dfs/data");
  142.       for (int i=0; i<8; i++) {
  143.         File blockFile = new File(baseDir, "data" +(i+1)+ "/current/" + block);
  144.         if(blockFile.exists()) {
  145.           assertTrue(blockFile.delete());
  146.         }
  147.       }
  148.       // We excpect the filesystem to be corrupted
  149.       outStr = runFsck(conf, 1, false, "/");
  150.       while (!outStr.contains(NamenodeFsck.CORRUPT_STATUS)) {
  151.         try {
  152.           Thread.sleep(100);
  153.         } catch (InterruptedException ignore) {
  154.         }
  155.         outStr = runFsck(conf, 1, false, "/");
  156.       } 
  157.       
  158.       // Fix the filesystem by moving corrupted files to lost+found
  159.       outStr = runFsck(conf, 1, true, "/", "-move");
  160.       assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
  161.       
  162.       // Check to make sure we have healthy filesystem
  163.       outStr = runFsck(conf, 0, true, "/");
  164.       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS)); 
  165.       util.cleanup(fs, topDir);
  166.       if (fs != null) {try{fs.close();} catch(Exception e){}}
  167.       cluster.shutdown();
  168.     } finally {
  169.       if (fs != null) {try{fs.close();} catch(Exception e){}}
  170.       if (cluster != null) { cluster.shutdown(); }
  171.     }
  172.   }
  173.   
  174.   public void testFsckOpenFiles() throws Exception {
  175.     DFSTestUtil util = new DFSTestUtil("TestFsck", 4, 3, 8*1024); 
  176.     MiniDFSCluster cluster = null;
  177.     FileSystem fs = null;
  178.     try {
  179.       Configuration conf = new Configuration();
  180.       conf.setLong("dfs.blockreport.intervalMsec", 10000L);
  181.       cluster = new MiniDFSCluster(conf, 4, true, null);
  182.       String topDir = "/srcdat";
  183.       String randomString = "HADOOP  ";
  184.       fs = cluster.getFileSystem();
  185.       cluster.waitActive();
  186.       util.createFiles(fs, topDir);
  187.       util.waitReplication(fs, topDir, (short)3);
  188.       String outStr = runFsck(conf, 0, true, "/");
  189.       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
  190.       // Open a file for writing and do not close for now
  191.       Path openFile = new Path(topDir + "/openFile");
  192.       FSDataOutputStream out = fs.create(openFile);
  193.       int writeCount = 0;
  194.       while (writeCount != 100) {
  195.         out.write(randomString.getBytes());
  196.         writeCount++;                  
  197.       }
  198.       // We expect the filesystem to be HEALTHY and show one open file
  199.       outStr = runFsck(conf, 0, true, topDir);
  200.       System.out.println(outStr);
  201.       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
  202.       assertFalse(outStr.contains("OPENFORWRITE")); 
  203.       // Use -openforwrite option to list open files
  204.       outStr = runFsck(conf, 0, true, topDir, "-openforwrite");
  205.       System.out.println(outStr);
  206.       assertTrue(outStr.contains("OPENFORWRITE"));
  207.       assertTrue(outStr.contains("openFile"));
  208.       // Close the file
  209.       out.close(); 
  210.       // Now, fsck should show HEALTHY fs and should not show any open files
  211.       outStr = runFsck(conf, 0, true, topDir);
  212.       System.out.println(outStr);
  213.       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
  214.       assertFalse(outStr.contains("OPENFORWRITE"));
  215.       util.cleanup(fs, topDir);
  216.       if (fs != null) {try{fs.close();} catch(Exception e){}}
  217.       cluster.shutdown();
  218.     } finally {
  219.       if (fs != null) {try{fs.close();} catch(Exception e){}}
  220.       if (cluster != null) { cluster.shutdown(); }
  221.     }
  222.   }
  223.   public void testCorruptBlock() throws Exception {
  224.     Configuration conf = new Configuration();
  225.     conf.setLong("dfs.blockreport.intervalMsec", 1000);
  226.     FileSystem fs = null;
  227.     DFSClient dfsClient = null;
  228.     LocatedBlocks blocks = null;
  229.     int replicaCount = 0;
  230.     Random random = new Random();
  231.     String outStr = null;
  232.     MiniDFSCluster cluster = null;
  233.     try {
  234.     cluster = new MiniDFSCluster(conf, 3, true, null);
  235.     cluster.waitActive();
  236.     fs = cluster.getFileSystem();
  237.     Path file1 = new Path("/testCorruptBlock");
  238.     DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
  239.     // Wait until file replication has completed
  240.     DFSTestUtil.waitReplication(fs, file1, (short)3);
  241.     String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
  242.     // Make sure filesystem is in healthy state
  243.     outStr = runFsck(conf, 0, true, "/");
  244.     System.out.println(outStr);
  245.     assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
  246.     
  247.     // corrupt replicas 
  248.     File baseDir = new File(System.getProperty("test.build.data",
  249.                                                "build/test/data"),"dfs/data");
  250.     for (int i=0; i < 6; i++) {
  251.       File blockFile = new File(baseDir, "data" + (i+1) + "/current/" +
  252.                                 block);
  253.       if (blockFile.exists()) {
  254.         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
  255.         FileChannel channel = raFile.getChannel();
  256.         String badString = "BADBAD";
  257.         int rand = random.nextInt((int)channel.size()/2);
  258.         raFile.seek(rand);
  259.         raFile.write(badString.getBytes());
  260.         raFile.close();
  261.       }
  262.     }
  263.     // Read the file to trigger reportBadBlocks
  264.     try {
  265.       IOUtils.copyBytes(fs.open(file1), new IOUtils.NullOutputStream(), conf,
  266.                         true);
  267.     } catch (IOException ie) {
  268.       // Ignore exception
  269.     }
  270.     dfsClient = new DFSClient(new InetSocketAddress("localhost",
  271.                                cluster.getNameNodePort()), conf);
  272.     blocks = dfsClient.namenode.
  273.                getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  274.     replicaCount = blocks.get(0).getLocations().length;
  275.     while (replicaCount != 3) {
  276.       try {
  277.         Thread.sleep(100);
  278.       } catch (InterruptedException ignore) {
  279.       }
  280.       blocks = dfsClient.namenode.
  281.                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  282.       replicaCount = blocks.get(0).getLocations().length;
  283.     }
  284.     assertTrue (blocks.get(0).isCorrupt());
  285.     // Check if fsck reports the same
  286.     outStr = runFsck(conf, 1, true, "/");
  287.     System.out.println(outStr);
  288.     assertTrue(outStr.contains(NamenodeFsck.CORRUPT_STATUS));
  289.     assertTrue(outStr.contains("testCorruptBlock"));
  290.     } finally {
  291.       if (cluster != null) {cluster.shutdown();}
  292.     }
  293.   }
  294.   
  295.   /** Test if fsck can return -1 in case of failure
  296.    * 
  297.    * @throws Exception
  298.    */
  299.   public void testFsckError() throws Exception {
  300.     MiniDFSCluster cluster = null;
  301.     try {
  302.       // bring up a one-node cluster
  303.       Configuration conf = new Configuration();
  304.       cluster = new MiniDFSCluster(conf, 1, true, null);
  305.       String fileName = "/test.txt";
  306.       Path filePath = new Path(fileName);
  307.       FileSystem fs = cluster.getFileSystem();
  308.       
  309.       // create a one-block file
  310.       DFSTestUtil.createFile(fs, filePath, 1L, (short)1, 1L);
  311.       DFSTestUtil.waitReplication(fs, filePath, (short)1);
  312.       
  313.       // intentionally corrupt NN data structure
  314.       INodeFile node = (INodeFile)cluster.getNameNode().namesystem.dir.rootDir.getNode(fileName);
  315.       assertEquals(node.blocks.length, 1);
  316.       node.blocks[0].setNumBytes(-1L);  // set the block length to be negative
  317.       
  318.       // run fsck and expect a failure with -1 as the error code
  319.       String outStr = runFsck(conf, -1, true, fileName);
  320.       System.out.println(outStr);
  321.       assertTrue(outStr.contains(NamenodeFsck.FAILURE_STATUS));
  322.       
  323.       // clean up file system
  324.       fs.delete(filePath, true);
  325.     } finally {
  326.       if (cluster != null) {cluster.shutdown();}
  327.     }
  328.   }
  329. }