DistributedFSCheck.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.*;
  20. import junit.framework.TestCase;
  21. import java.util.Date;
  22. import java.util.StringTokenizer;
  23. import java.util.TreeSet;
  24. import java.util.Vector;
  25. import org.apache.commons.logging.*;
  26. import org.apache.hadoop.mapred.*;
  27. import org.apache.hadoop.io.*;
  28. import org.apache.hadoop.io.SequenceFile.CompressionType;
  29. import org.apache.hadoop.conf.*;
  30. /**
  31.  * Distributed checkup of the file system consistency.
  32.  * <p>
  33.  * Test file system consistency by reading each block of each file
  34.  * of the specified file tree. 
  35.  * Report corrupted blocks and general file statistics.
  36.  * <p>
  37.  * Optionally displays statistics on read performance.
  38.  * 
  39.  */
  40. public class DistributedFSCheck extends TestCase {
  41.   // Constants
  42.   private static final int TEST_TYPE_READ = 0;
  43.   private static final int TEST_TYPE_CLEANUP = 2;
  44.   private static final int DEFAULT_BUFFER_SIZE = 1000000;
  45.   private static final String DEFAULT_RES_FILE_NAME = "DistributedFSCheck_results.log";
  46.   private static final long MEGA = 0x100000;
  47.   
  48.   private static Configuration fsConfig = new Configuration();
  49.   private static final Log LOG = FileInputFormat.LOG;
  50.   private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck"));
  51.   private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input");
  52.   private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
  53.   private FileSystem fs;
  54.   private long nrFiles;
  55.   
  56.   DistributedFSCheck(Configuration conf) throws Exception {
  57.     fsConfig = conf;
  58.     this.fs = FileSystem.get(conf);
  59.   }
  60.   /**
  61.    * Run distributed checkup for the entire files system.
  62.    * 
  63.    * @throws Exception
  64.    */
  65.   public void testFSBlocks() throws Exception {
  66.     testFSBlocks("/");
  67.   }
  68.   /**
  69.    * Run distributed checkup for the specified directory.
  70.    * 
  71.    * @param rootName root directory name
  72.    * @throws Exception
  73.    */
  74.   public void testFSBlocks(String rootName) throws Exception {
  75.     createInputFile(rootName);
  76.     runDistributedFSCheck();
  77.     cleanup();  // clean up after all to restore the system state
  78.   }
  79.   private void createInputFile(String rootName) throws IOException {
  80.     cleanup();  // clean up if previous run failed
  81.     Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
  82.     SequenceFile.Writer writer =
  83.       SequenceFile.createWriter(fs, fsConfig, inputFile, 
  84.                                 UTF8.class, LongWritable.class, CompressionType.NONE);
  85.     
  86.     try {
  87.       nrFiles = 0;
  88.       listSubtree(new Path(rootName), writer);
  89.     } finally {
  90.       writer.close();
  91.     }
  92.     LOG.info("Created map input files.");
  93.   }
  94.   
  95.   private void listSubtree(Path rootFile,
  96.                            SequenceFile.Writer writer
  97.                            ) throws IOException {
  98.     if (!fs.isDirectory(rootFile)) {
  99.       nrFiles++;
  100.       // For a regular file generate <fName,offset> pairs
  101.       long blockSize = fs.getDefaultBlockSize();
  102.       long fileLength = fs.getLength(rootFile);
  103.       for(long offset = 0; offset < fileLength; offset += blockSize)
  104.         writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
  105.       return;
  106.     }
  107.     
  108.     FileStatus children[] = fs.listStatus(rootFile);
  109.     if (children == null)
  110.       throw new IOException("Could not get listing for " + rootFile);
  111.     for (int i = 0; i < children.length; i++)
  112.       listSubtree(children[i].getPath(), writer);
  113.   }
  114.   /**
  115.    * DistributedFSCheck mapper class.
  116.    */
  117.   public static class DistributedFSCheckMapper extends IOMapperBase {
  118.     public DistributedFSCheckMapper() { 
  119.       super(fsConfig); 
  120.     }
  121.     public Object doIO(Reporter reporter, 
  122.                        String name, 
  123.                        long offset 
  124.                        ) throws IOException {
  125.       // open file
  126.       FSDataInputStream in = null;
  127.       try {
  128.         in = fs.open(new Path(name));
  129.       } catch(IOException e) {
  130.         return name + "@(missing)";
  131.       }
  132.       in.seek(offset);
  133.       long actualSize = 0;
  134.       try {
  135.         long blockSize = fs.getDefaultBlockSize();
  136.         reporter.setStatus("reading " + name + "@" + 
  137.                            offset + "/" + blockSize);
  138.         for( int curSize = bufferSize; 
  139.              curSize == bufferSize && actualSize < blockSize;
  140.              actualSize += curSize) {
  141.           curSize = in.read(buffer, 0, bufferSize);
  142.         }
  143.       } catch(IOException e) {
  144.         LOG.info("Corrupted block detected in "" + name + "" at " + offset);
  145.         return name + "@" + offset;
  146.       } finally {
  147.         in.close();
  148.       }
  149.       return new Long(actualSize);
  150.     }
  151.     
  152.     void collectStats(OutputCollector<UTF8, UTF8> output, 
  153.                       String name, 
  154.                       long execTime, 
  155.                       Object corruptedBlock) throws IOException {
  156.       output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1)));
  157.       if (corruptedBlock.getClass().getName().endsWith("String")) {
  158.         output.collect(new UTF8("s:badBlocks"), new UTF8((String)corruptedBlock));
  159.         return;
  160.       }
  161.       long totalSize = ((Long)corruptedBlock).longValue();
  162.       float ioRateMbSec = (float)totalSize * 1000 / (execTime * 0x100000);
  163.       LOG.info("Number of bytes processed = " + totalSize);
  164.       LOG.info("Exec time = " + execTime);
  165.       LOG.info("IO rate = " + ioRateMbSec);
  166.       
  167.       output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
  168.       output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
  169.       output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
  170.     }
  171.   }
  172.   
  173.   private void runDistributedFSCheck() throws Exception {
  174.     JobConf job = new JobConf(fs.getConf(), DistributedFSCheck.class);
  175.     FileInputFormat.setInputPaths(job, MAP_INPUT_DIR);
  176.     job.setInputFormat(SequenceFileInputFormat.class);
  177.     job.setMapperClass(DistributedFSCheckMapper.class);
  178.     job.setReducerClass(AccumulatingReducer.class);
  179.     FileOutputFormat.setOutputPath(job, READ_DIR);
  180.     job.setOutputKeyClass(UTF8.class);
  181.     job.setOutputValueClass(UTF8.class);
  182.     job.setNumReduceTasks(1);
  183.     JobClient.runJob(job);
  184.   }
  185.   public static void main(String[] args) throws Exception {
  186.     int testType = TEST_TYPE_READ;
  187.     int bufferSize = DEFAULT_BUFFER_SIZE;
  188.     String resFileName = DEFAULT_RES_FILE_NAME;
  189.     String rootName = "/";
  190.     boolean viewStats = false;
  191.     String usage = "Usage: DistributedFSCheck [-root name] [-clean] [-resFile resultFileName] [-bufferSize Bytes] [-stats] ";
  192.     
  193.     if (args.length == 1 && args[0].startsWith("-h")) {
  194.       System.err.println(usage);
  195.       System.exit(-1);
  196.     }
  197.     for(int i = 0; i < args.length; i++) {       // parse command line
  198.       if (args[i].equals("-root")) {
  199.         rootName = args[++i];
  200.       } else if (args[i].startsWith("-clean")) {
  201.         testType = TEST_TYPE_CLEANUP;
  202.       } else if (args[i].equals("-bufferSize")) {
  203.         bufferSize = Integer.parseInt(args[++i]);
  204.       } else if (args[i].equals("-resFile")) {
  205.         resFileName = args[++i];
  206.       } else if (args[i].startsWith("-stat")) {
  207.         viewStats = true;
  208.       }
  209.     }
  210.     LOG.info("root = " + rootName);
  211.     LOG.info("bufferSize = " + bufferSize);
  212.   
  213.     Configuration conf = new Configuration();  
  214.     conf.setInt("test.io.file.buffer.size", bufferSize);
  215.     DistributedFSCheck test = new DistributedFSCheck(conf);
  216.     if (testType == TEST_TYPE_CLEANUP) {
  217.       test.cleanup();
  218.       return;
  219.     }
  220.     test.createInputFile(rootName);
  221.     long tStart = System.currentTimeMillis();
  222.     test.runDistributedFSCheck();
  223.     long execTime = System.currentTimeMillis() - tStart;
  224.     
  225.     test.analyzeResult(execTime, resFileName, viewStats);
  226.     // test.cleanup();  // clean up after all to restore the system state
  227.   }
  228.   
  229.   private void analyzeResult(long execTime,
  230.                              String resFileName,
  231.                              boolean viewStats
  232.                              ) throws IOException {
  233.     Path reduceFile= new Path(READ_DIR, "part-00000");
  234.     DataInputStream in;
  235.     in = new DataInputStream(fs.open(reduceFile));
  236.   
  237.     BufferedReader lines;
  238.     lines = new BufferedReader(new InputStreamReader(in));
  239.     long blocks = 0;
  240.     long size = 0;
  241.     long time = 0;
  242.     float rate = 0;
  243.     StringTokenizer  badBlocks = null;
  244.     long nrBadBlocks = 0;
  245.     String line;
  246.     while((line = lines.readLine()) != null) {
  247.       StringTokenizer tokens = new StringTokenizer(line, " tnrf%");
  248.       String attr = tokens.nextToken(); 
  249.       if (attr.endsWith("blocks"))
  250.         blocks = Long.parseLong(tokens.nextToken());
  251.       else if (attr.endsWith("size"))
  252.         size = Long.parseLong(tokens.nextToken());
  253.       else if (attr.endsWith("time"))
  254.         time = Long.parseLong(tokens.nextToken());
  255.       else if (attr.endsWith("rate"))
  256.         rate = Float.parseFloat(tokens.nextToken());
  257.       else if (attr.endsWith("badBlocks")) {
  258.         badBlocks = new StringTokenizer(tokens.nextToken(), ";");
  259.         nrBadBlocks = badBlocks.countTokens();
  260.       }
  261.     }
  262.     
  263.     Vector<String> resultLines = new Vector<String>();
  264.     resultLines.add( "----- DistributedFSCheck ----- : ");
  265.     resultLines.add( "               Date & time: " + new Date(System.currentTimeMillis()));
  266.     resultLines.add( "    Total number of blocks: " + blocks);
  267.     resultLines.add( "    Total number of  files: " + nrFiles);
  268.     resultLines.add( "Number of corrupted blocks: " + nrBadBlocks);
  269.     
  270.     int nrBadFilesPos = resultLines.size();
  271.     TreeSet<String> badFiles = new TreeSet<String>();
  272.     long nrBadFiles = 0;
  273.     if (nrBadBlocks > 0) {
  274.       resultLines.add("");
  275.       resultLines.add("----- Corrupted Blocks (file@offset) ----- : ");
  276.       while(badBlocks.hasMoreTokens()) {
  277.         String curBlock = badBlocks.nextToken();
  278.         resultLines.add(curBlock);
  279.         badFiles.add(curBlock.substring(0, curBlock.indexOf('@')));
  280.       }
  281.       nrBadFiles = badFiles.size();
  282.     }
  283.     
  284.     resultLines.insertElementAt(" Number of corrupted files: " + nrBadFiles, nrBadFilesPos);
  285.     
  286.     if (viewStats) {
  287.       resultLines.add("");
  288.       resultLines.add("-----   Performance  ----- : ");
  289.       resultLines.add("         Total MBytes read: " + size/MEGA);
  290.       resultLines.add("         Throughput mb/sec: " + (float)size * 1000.0 / (time * MEGA));
  291.       resultLines.add("    Average IO rate mb/sec: " + rate / 1000 / blocks);
  292.       resultLines.add("        Test exec time sec: " + (float)execTime / 1000);
  293.     }
  294.     PrintStream res = new PrintStream(
  295.                                       new FileOutputStream(
  296.                                                            new File(resFileName), true)); 
  297.     for(int i = 0; i < resultLines.size(); i++) {
  298.       String cur = resultLines.get(i);
  299.       LOG.info(cur);
  300.       res.println(cur);
  301.     }
  302.   }
  303.   private void cleanup() throws IOException {
  304.     LOG.info("Cleaning up test files");
  305.     fs.delete(TEST_ROOT_DIR, true);
  306.   }
  307. }