TestDFSIO.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.*;
  20. import junit.framework.TestCase;
  21. import java.util.Date;
  22. import java.util.StringTokenizer;
  23. import org.apache.commons.logging.*;
  24. import org.apache.hadoop.mapred.*;
  25. import org.apache.hadoop.util.StringUtils;
  26. import org.apache.hadoop.io.*;
  27. import org.apache.hadoop.io.SequenceFile.CompressionType;
  28. import org.apache.hadoop.conf.*;
  29. /**
  30.  * Distributed i/o benchmark.
  31.  * <p>
  32.  * This test writes into or reads from a specified number of files.
  33.  * File size is specified as a parameter to the test. 
  34.  * Each file is accessed in a separate map task.
  35.  * <p>
  36.  * The reducer collects the following statistics:
  37.  * <ul>
  38.  * <li>number of tasks completed</li>
  39.  * <li>number of bytes written/read</li>
  40.  * <li>execution time</li>
  41.  * <li>io rate</li>
  42.  * <li>io rate squared</li>
  43.  * </ul>
  44.  *    
  45.  * Finally, the following information is appended to a local file
  46.  * <ul>
  47.  * <li>read or write test</li>
  48.  * <li>date and time the test finished</li>   
  49.  * <li>number of files</li>
  50.  * <li>total number of bytes processed</li>
  51.  * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
  52.  * <li>average i/o rate in mb/sec per file</li>
  53.  * <li>standard deviation of i/o rate </li>
  54.  * </ul>
  55.  */
  56. public class TestDFSIO extends TestCase {
  57.   // Constants
  58.   private static final int TEST_TYPE_READ = 0;
  59.   private static final int TEST_TYPE_WRITE = 1;
  60.   private static final int TEST_TYPE_CLEANUP = 2;
  61.   private static final int DEFAULT_BUFFER_SIZE = 1000000;
  62.   private static final String BASE_FILE_NAME = "test_io_";
  63.   private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
  64.   
  65.   private static final Log LOG = FileInputFormat.LOG;
  66.   private static Configuration fsConfig = new Configuration();
  67.   private static final long MEGA = 0x100000;
  68.   private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
  69.   private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
  70.   private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
  71.   private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
  72.   private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
  73.   /**
  74.    * Run the test with default parameters.
  75.    * 
  76.    * @throws Exception
  77.    */
  78.   public void testIOs() throws Exception {
  79.     testIOs(10, 10);
  80.   }
  81.   /**
  82.    * Run the test with the specified parameters.
  83.    * 
  84.    * @param fileSize file size
  85.    * @param nrFiles number of files
  86.    * @throws IOException
  87.    */
  88.   public static void testIOs(int fileSize, int nrFiles)
  89.     throws IOException {
  90.     FileSystem fs = FileSystem.get(fsConfig);
  91.     createControlFile(fs, fileSize, nrFiles);
  92.     writeTest(fs);
  93.     readTest(fs);
  94.     cleanup(fs);
  95.   }
  96.   private static void createControlFile(
  97.                                         FileSystem fs,
  98.                                         int fileSize, // in MB 
  99.                                         int nrFiles
  100.                                         ) throws IOException {
  101.     LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
  102.     fs.delete(CONTROL_DIR, true);
  103.     for(int i=0; i < nrFiles; i++) {
  104.       String name = getFileName(i);
  105.       Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
  106.       SequenceFile.Writer writer = null;
  107.       try {
  108.         writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
  109.                                            UTF8.class, LongWritable.class,
  110.                                            CompressionType.NONE);
  111.         writer.append(new UTF8(name), new LongWritable(fileSize));
  112.       } catch(Exception e) {
  113.         throw new IOException(e.getLocalizedMessage());
  114.       } finally {
  115.      if (writer != null)
  116.           writer.close();
  117.      writer = null;
  118.       }
  119.     }
  120.     LOG.info("created control files for: "+nrFiles+" files");
  121.   }
  122.   private static String getFileName(int fIdx) {
  123.     return BASE_FILE_NAME + Integer.toString(fIdx);
  124.   }
  125.   
  126.   /**
  127.    * Write/Read mapper base class.
  128.    * <p>
  129.    * Collects the following statistics per task:
  130.    * <ul>
  131.    * <li>number of tasks completed</li>
  132.    * <li>number of bytes written/read</li>
  133.    * <li>execution time</li>
  134.    * <li>i/o rate</li>
  135.    * <li>i/o rate squared</li>
  136.    * </ul>
  137.    */
  138.   private abstract static class IOStatMapper extends IOMapperBase {
  139.     IOStatMapper() { 
  140.       super(fsConfig);
  141.     }
  142.     
  143.     void collectStats(OutputCollector<UTF8, UTF8> output, 
  144.                       String name,
  145.                       long execTime, 
  146.                       Object objSize) throws IOException {
  147.       long totalSize = ((Long)objSize).longValue();
  148.       float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
  149.       LOG.info("Number of bytes processed = " + totalSize);
  150.       LOG.info("Exec time = " + execTime);
  151.       LOG.info("IO rate = " + ioRateMbSec);
  152.       
  153.       output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
  154.       output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
  155.       output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
  156.       output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
  157.       output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
  158.     }
  159.   }
  160.   /**
  161.    * Write mapper class.
  162.    */
  163.   public static class WriteMapper extends IOStatMapper {
  164.     public WriteMapper() { 
  165.       super(); 
  166.       for(int i=0; i < bufferSize; i++)
  167.         buffer[i] = (byte)('0' + i % 50);
  168.     }
  169.     public Object doIO(Reporter reporter, 
  170.                        String name, 
  171.                        long totalSize 
  172.                        ) throws IOException {
  173.       // create file
  174.       totalSize *= MEGA;
  175.       OutputStream out;
  176.       out = fs.create(new Path(DATA_DIR, name), true, bufferSize);
  177.       
  178.       try {
  179.         // write to the file
  180.         long nrRemaining;
  181.         for (nrRemaining = totalSize; nrRemaining > 0; nrRemaining -= bufferSize) {
  182.           int curSize = (bufferSize < nrRemaining) ? bufferSize : (int)nrRemaining; 
  183.           out.write(buffer, 0, curSize);
  184.           reporter.setStatus("writing " + name + "@" + 
  185.                              (totalSize - nrRemaining) + "/" + totalSize 
  186.                              + " ::host = " + hostName);
  187.         }
  188.       } finally {
  189.         out.close();
  190.       }
  191.       return new Long(totalSize);
  192.     }
  193.   }
  194.   private static void writeTest(FileSystem fs)
  195.     throws IOException {
  196.     fs.delete(DATA_DIR, true);
  197.     fs.delete(WRITE_DIR, true);
  198.     
  199.     runIOTest(WriteMapper.class, WRITE_DIR);
  200.   }
  201.   
  202.   private static void runIOTest( Class<? extends Mapper> mapperClass, 
  203.                                  Path outputDir
  204.                                  ) throws IOException {
  205.     JobConf job = new JobConf(fsConfig, TestDFSIO.class);
  206.     FileInputFormat.setInputPaths(job, CONTROL_DIR);
  207.     job.setInputFormat(SequenceFileInputFormat.class);
  208.     job.setMapperClass(mapperClass);
  209.     job.setReducerClass(AccumulatingReducer.class);
  210.     FileOutputFormat.setOutputPath(job, outputDir);
  211.     job.setOutputKeyClass(UTF8.class);
  212.     job.setOutputValueClass(UTF8.class);
  213.     job.setNumReduceTasks(1);
  214.     JobClient.runJob(job);
  215.   }
  216.   /**
  217.    * Read mapper class.
  218.    */
  219.   public static class ReadMapper extends IOStatMapper {
  220.     public ReadMapper() { 
  221.       super(); 
  222.     }
  223.     public Object doIO(Reporter reporter, 
  224.                        String name, 
  225.                        long totalSize 
  226.                        ) throws IOException {
  227.       totalSize *= MEGA;
  228.       // open file
  229.       DataInputStream in = fs.open(new Path(DATA_DIR, name));
  230.       try {
  231.         long actualSize = 0;
  232.         for(int curSize = bufferSize; curSize == bufferSize;) {
  233.           curSize = in.read(buffer, 0, bufferSize);
  234.           actualSize += curSize;
  235.           reporter.setStatus("reading " + name + "@" + 
  236.                              actualSize + "/" + totalSize 
  237.                              + " ::host = " + hostName);
  238.         }
  239.       } finally {
  240.         in.close();
  241.       }
  242.       return new Long(totalSize);
  243.     }
  244.   }
  245.   private static void readTest(FileSystem fs) throws IOException {
  246.     fs.delete(READ_DIR, true);
  247.     runIOTest(ReadMapper.class, READ_DIR);
  248.   }
  249.   private static void sequentialTest(
  250.                                      FileSystem fs, 
  251.                                      int testType, 
  252.                                      int fileSize, 
  253.                                      int nrFiles
  254.                                      ) throws Exception {
  255.     IOStatMapper ioer = null;
  256.     if (testType == TEST_TYPE_READ)
  257.       ioer = new ReadMapper();
  258.     else if (testType == TEST_TYPE_WRITE)
  259.       ioer = new WriteMapper();
  260.     else
  261.       return;
  262.     for(int i=0; i < nrFiles; i++)
  263.       ioer.doIO(Reporter.NULL,
  264.                 BASE_FILE_NAME+Integer.toString(i), 
  265.                 MEGA*fileSize);
  266.   }
  267.   public static void main(String[] args) {
  268.     int testType = TEST_TYPE_READ;
  269.     int bufferSize = DEFAULT_BUFFER_SIZE;
  270.     int fileSize = 1;
  271.     int nrFiles = 1;
  272.     String resFileName = DEFAULT_RES_FILE_NAME;
  273.     boolean isSequential = false;
  274.     String version="TestFDSIO.0.0.4";
  275.     String usage = "Usage: TestFDSIO -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
  276.     
  277.     System.out.println(version);
  278.     if (args.length == 0) {
  279.       System.err.println(usage);
  280.       System.exit(-1);
  281.     }
  282.     for (int i = 0; i < args.length; i++) {       // parse command line
  283.       if (args[i].startsWith("-read")) {
  284.         testType = TEST_TYPE_READ;
  285.       } else if (args[i].equals("-write")) {
  286.         testType = TEST_TYPE_WRITE;
  287.       } else if (args[i].equals("-clean")) {
  288.         testType = TEST_TYPE_CLEANUP;
  289.       } else if (args[i].startsWith("-seq")) {
  290.         isSequential = true;
  291.       } else if (args[i].equals("-nrFiles")) {
  292.         nrFiles = Integer.parseInt(args[++i]);
  293.       } else if (args[i].equals("-fileSize")) {
  294.         fileSize = Integer.parseInt(args[++i]);
  295.       } else if (args[i].equals("-bufferSize")) {
  296.         bufferSize = Integer.parseInt(args[++i]);
  297.       } else if (args[i].equals("-resFile")) {
  298.         resFileName = args[++i];
  299.       }
  300.     }
  301.     LOG.info("nrFiles = " + nrFiles);
  302.     LOG.info("fileSize (MB) = " + fileSize);
  303.     LOG.info("bufferSize = " + bufferSize);
  304.   
  305.     try {
  306.       fsConfig.setInt("test.io.file.buffer.size", bufferSize);
  307.       FileSystem fs = FileSystem.get(fsConfig);
  308.       if (isSequential) {
  309.         long tStart = System.currentTimeMillis();
  310.         sequentialTest(fs, testType, fileSize, nrFiles);
  311.         long execTime = System.currentTimeMillis() - tStart;
  312.         String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
  313.         LOG.info(resultLine);
  314.         return;
  315.       }
  316.       if (testType == TEST_TYPE_CLEANUP) {
  317.         cleanup(fs);
  318.         return;
  319.       }
  320.       createControlFile(fs, fileSize, nrFiles);
  321.       long tStart = System.currentTimeMillis();
  322.       if (testType == TEST_TYPE_WRITE)
  323.         writeTest(fs);
  324.       if (testType == TEST_TYPE_READ)
  325.         readTest(fs);
  326.       long execTime = System.currentTimeMillis() - tStart;
  327.     
  328.       analyzeResult(fs, testType, execTime, resFileName);
  329.     } catch(Exception e) {
  330.       System.err.print(StringUtils.stringifyException(e));
  331.       System.exit(-1);
  332.     }
  333.   }
  334.   
  335.   private static void analyzeResult( FileSystem fs, 
  336.                                      int testType,
  337.                                      long execTime,
  338.                                      String resFileName
  339.                                      ) throws IOException {
  340.     Path reduceFile;
  341.     if (testType == TEST_TYPE_WRITE)
  342.       reduceFile = new Path(WRITE_DIR, "part-00000");
  343.     else
  344.       reduceFile = new Path(READ_DIR, "part-00000");
  345.     DataInputStream in;
  346.     in = new DataInputStream(fs.open(reduceFile));
  347.   
  348.     BufferedReader lines;
  349.     lines = new BufferedReader(new InputStreamReader(in));
  350.     long tasks = 0;
  351.     long size = 0;
  352.     long time = 0;
  353.     float rate = 0;
  354.     float sqrate = 0;
  355.     String line;
  356.     while((line = lines.readLine()) != null) {
  357.       StringTokenizer tokens = new StringTokenizer(line, " tnrf%");
  358.       String attr = tokens.nextToken(); 
  359.       if (attr.endsWith(":tasks"))
  360.         tasks = Long.parseLong(tokens.nextToken());
  361.       else if (attr.endsWith(":size"))
  362.         size = Long.parseLong(tokens.nextToken());
  363.       else if (attr.endsWith(":time"))
  364.         time = Long.parseLong(tokens.nextToken());
  365.       else if (attr.endsWith(":rate"))
  366.         rate = Float.parseFloat(tokens.nextToken());
  367.       else if (attr.endsWith(":sqrate"))
  368.         sqrate = Float.parseFloat(tokens.nextToken());
  369.     }
  370.     
  371.     double med = rate / 1000 / tasks;
  372.     double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
  373.     String resultLines[] = {
  374.       "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
  375.                                     (testType == TEST_TYPE_READ) ? "read" : 
  376.                                     "unknown"),
  377.       "           Date & time: " + new Date(System.currentTimeMillis()),
  378.       "       Number of files: " + tasks,
  379.       "Total MBytes processed: " + size/MEGA,
  380.       "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
  381.       "Average IO rate mb/sec: " + med,
  382.       " IO rate std deviation: " + stdDev,
  383.       "    Test exec time sec: " + (float)execTime / 1000,
  384.       "" };
  385.     PrintStream res = new PrintStream(
  386.                                       new FileOutputStream(
  387.                                                            new File(resFileName), true)); 
  388.     for(int i = 0; i < resultLines.length; i++) {
  389.       LOG.info(resultLines[i]);
  390.       res.println(resultLines[i]);
  391.     }
  392.   }
  393.   private static void cleanup(FileSystem fs) throws IOException {
  394.     LOG.info("Cleaning up test files");
  395.     fs.delete(new Path(TEST_ROOT_DIR), true);
  396.   }
  397. }