DFSCIOTest.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:19k
源码类别:

网格计算

开发平台:

Java

  1.  /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.*;
  20. import junit.framework.TestCase;
  21. import java.util.Date;
  22. import java.util.StringTokenizer;
  23. import org.apache.commons.logging.*;
  24. import org.apache.hadoop.mapred.*;
  25. import org.apache.hadoop.io.*;
  26. import org.apache.hadoop.io.SequenceFile.CompressionType;
  27. import org.apache.hadoop.conf.*;
  28. /**
  29.  * Distributed i/o benchmark.
  30.  * <p>
  31.  * This test writes into or reads from a specified number of files.
  32.  * File size is specified as a parameter to the test. 
  33.  * Each file is accessed in a separate map task.
  34.  * <p>
  35.  * The reducer collects the following statistics:
  36.  * <ul>
  37.  * <li>number of tasks completed</li>
  38.  * <li>number of bytes written/read</li>
  39.  * <li>execution time</li>
  40.  * <li>io rate</li>
  41.  * <li>io rate squared</li>
  42.  * </ul>
  43.  *    
  44.  * Finally, the following information is appended to a local file
  45.  * <ul>
  46.  * <li>read or write test</li>
  47.  * <li>date and time the test finished</li>   
  48.  * <li>number of files</li>
  49.  * <li>total number of bytes processed</li>
  50.  * <li>throughput in mb/sec (total number of bytes / sum of processing times)</li>
  51.  * <li>average i/o rate in mb/sec per file</li>
  52.  * <li>standard i/o rate deviation</li>
  53.  * </ul>
  54.  */
  55. public class DFSCIOTest extends TestCase {
  56.   // Constants
  57.   private static final int TEST_TYPE_READ = 0;
  58.   private static final int TEST_TYPE_WRITE = 1;
  59.   private static final int TEST_TYPE_CLEANUP = 2;
  60.   private static final int DEFAULT_BUFFER_SIZE = 1000000;
  61.   private static final String BASE_FILE_NAME = "test_io_";
  62.   private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log";
  63.   
  64.   private static final Log LOG = FileInputFormat.LOG;
  65.   private static Configuration fsConfig = new Configuration();
  66.   private static final long MEGA = 0x100000;
  67.   private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest");
  68.   private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control");
  69.   private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write");
  70.   private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
  71.   private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data");
  72.   private static Path HDFS_TEST_DIR = new Path("/tmp/DFSCIOTest");
  73.   private static String HDFS_LIB_VERSION = System.getProperty("libhdfs.version", "1");
  74.   private static String CHMOD = new String("chmod");
  75.   private static Path HDFS_SHLIB = new Path(HDFS_TEST_DIR + "/libhdfs.so." + HDFS_LIB_VERSION);
  76.   private static Path HDFS_READ = new Path(HDFS_TEST_DIR + "/hdfs_read");
  77.   private static Path HDFS_WRITE = new Path(HDFS_TEST_DIR + "/hdfs_write");
  78.   /**
  79.    * Run the test with default parameters.
  80.    * 
  81.    * @throws Exception
  82.    */
  83.   public void testIOs() throws Exception {
  84.     testIOs(10, 10);
  85.   }
  86.   /**
  87.    * Run the test with the specified parameters.
  88.    * 
  89.    * @param fileSize file size
  90.    * @param nrFiles number of files
  91.    * @throws IOException
  92.    */
  93.   public static void testIOs(int fileSize, int nrFiles)
  94.     throws IOException {
  95.     FileSystem fs = FileSystem.get(fsConfig);
  96.     createControlFile(fs, fileSize, nrFiles);
  97.     writeTest(fs);
  98.     readTest(fs);
  99.   }
  100.   private static void createControlFile(
  101.                                         FileSystem fs,
  102.                                         int fileSize, // in MB 
  103.                                         int nrFiles
  104.                                         ) throws IOException {
  105.     LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files");
  106.     fs.delete(CONTROL_DIR, true);
  107.     for(int i=0; i < nrFiles; i++) {
  108.       String name = getFileName(i);
  109.       Path controlFile = new Path(CONTROL_DIR, "in_file_" + name);
  110.       SequenceFile.Writer writer = null;
  111.       try {
  112.         writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
  113.                                            UTF8.class, LongWritable.class,
  114.                                            CompressionType.NONE);
  115.         writer.append(new UTF8(name), new LongWritable(fileSize));
  116.       } catch(Exception e) {
  117.         throw new IOException(e.getLocalizedMessage());
  118.       } finally {
  119.      if (writer != null)
  120.           writer.close();
  121.      writer = null;
  122.       }
  123.     }
  124.     LOG.info("created control files for: "+nrFiles+" files");
  125.   }
  126.   private static String getFileName(int fIdx) {
  127.     return BASE_FILE_NAME + Integer.toString(fIdx);
  128.   }
  129.   
  130.   /**
  131.    * Write/Read mapper base class.
  132.    * <p>
  133.    * Collects the following statistics per task:
  134.    * <ul>
  135.    * <li>number of tasks completed</li>
  136.    * <li>number of bytes written/read</li>
  137.    * <li>execution time</li>
  138.    * <li>i/o rate</li>
  139.    * <li>i/o rate squared</li>
  140.    * </ul>
  141.    */
  142.   private abstract static class IOStatMapper extends IOMapperBase {
  143.     IOStatMapper() { 
  144.       super(fsConfig);
  145.     }
  146.     
  147.     void collectStats(OutputCollector<UTF8, UTF8> output, 
  148.                       String name,
  149.                       long execTime, 
  150.                       Object objSize) throws IOException {
  151.       long totalSize = ((Long)objSize).longValue();
  152.       float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA);
  153.       LOG.info("Number of bytes processed = " + totalSize);
  154.       LOG.info("Exec time = " + execTime);
  155.       LOG.info("IO rate = " + ioRateMbSec);
  156.       
  157.       output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
  158.       output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
  159.       output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
  160.       output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000)));
  161.       output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
  162.     }
  163.   }
  164.   /**
  165.    * Write mapper class.
  166.    */
  167.   public static class WriteMapper extends IOStatMapper {
  168.     public WriteMapper() { 
  169.       super(); 
  170.       for(int i=0; i < bufferSize; i++)
  171.         buffer[i] = (byte)('0' + i % 50);
  172.     }
  173.     public Object doIO(Reporter reporter, 
  174.                        String name, 
  175.                        long totalSize 
  176.                        ) throws IOException {
  177.       // create file
  178.       totalSize *= MEGA;
  179.       
  180.       // create instance of local filesystem 
  181.       FileSystem localFS = FileSystem.getLocal(fsConfig);
  182.       
  183.       try {
  184.         // native runtime
  185.         Runtime runTime = Runtime.getRuntime();
  186.           
  187.         // copy the dso and executable from dfs and chmod them
  188.         synchronized (this) {
  189.           localFS.delete(HDFS_TEST_DIR, true);
  190.           if (!(localFS.mkdirs(HDFS_TEST_DIR))) {
  191.             throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem");
  192.           }
  193.         }
  194.         
  195.         synchronized (this) {
  196.           if (!localFS.exists(HDFS_SHLIB)) {
  197.             FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig);
  198.             String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB);
  199.             Process process = runTime.exec(chmodCmd);
  200.             int exitStatus = process.waitFor();
  201.             if (exitStatus != 0) {
  202.               throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
  203.             }
  204.           }
  205.         } 
  206.         
  207.         synchronized (this) {
  208.           if (!localFS.exists(HDFS_WRITE)) {
  209.             FileUtil.copy(fs, HDFS_WRITE, localFS, HDFS_WRITE, false, fsConfig);
  210.             String chmodCmd = new String(CHMOD + " a+x " + HDFS_WRITE); 
  211.             Process process = runTime.exec(chmodCmd);
  212.             int exitStatus = process.waitFor();
  213.             if (exitStatus != 0) {
  214.               throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
  215.             }
  216.           }
  217.         }
  218.           
  219.         // exec the C program
  220.         Path outFile = new Path(DATA_DIR, name);
  221.         String writeCmd = new String(HDFS_WRITE + " " + outFile + " " + totalSize + " " + bufferSize); 
  222.         Process process = runTime.exec(writeCmd, null, new File(HDFS_TEST_DIR.toString()));
  223.         int exitStatus = process.waitFor();
  224.         if (exitStatus != 0) {
  225.           throw new IOException(writeCmd + ": Failed with exitStatus: " + exitStatus);
  226.         }
  227.       } catch (InterruptedException interruptedException) {
  228.         reporter.setStatus(interruptedException.toString());
  229.       } finally {
  230.         localFS.close();
  231.       }
  232.       return new Long(totalSize);
  233.     }
  234.   }
  235.   private static void writeTest(FileSystem fs)
  236.     throws IOException {
  237.     fs.delete(DATA_DIR, true);
  238.     fs.delete(WRITE_DIR, true);
  239.     
  240.     runIOTest(WriteMapper.class, WRITE_DIR);
  241.   }
  242.   
  243.   private static void runIOTest( Class<? extends Mapper> mapperClass, 
  244.                                  Path outputDir
  245.                                  ) throws IOException {
  246.     JobConf job = new JobConf(fsConfig, DFSCIOTest.class);
  247.     FileInputFormat.setInputPaths(job, CONTROL_DIR);
  248.     job.setInputFormat(SequenceFileInputFormat.class);
  249.     job.setMapperClass(mapperClass);
  250.     job.setReducerClass(AccumulatingReducer.class);
  251.     FileOutputFormat.setOutputPath(job, outputDir);
  252.     job.setOutputKeyClass(UTF8.class);
  253.     job.setOutputValueClass(UTF8.class);
  254.     job.setNumReduceTasks(1);
  255.     JobClient.runJob(job);
  256.   }
  257.   /**
  258.    * Read mapper class.
  259.    */
  260.   public static class ReadMapper extends IOStatMapper {
  261.     public ReadMapper() { 
  262.       super(); 
  263.     }
  264.     public Object doIO(Reporter reporter, 
  265.                        String name, 
  266.                        long totalSize 
  267.                        ) throws IOException {
  268.       totalSize *= MEGA;
  269.       
  270.       // create instance of local filesystem 
  271.       FileSystem localFS = FileSystem.getLocal(fsConfig);
  272.       
  273.       try {
  274.         // native runtime
  275.         Runtime runTime = Runtime.getRuntime();
  276.         
  277.         // copy the dso and executable from dfs
  278.         synchronized (this) {
  279.           localFS.delete(HDFS_TEST_DIR, true);
  280.           if (!(localFS.mkdirs(HDFS_TEST_DIR))) {
  281.             throw new IOException("Failed to create " + HDFS_TEST_DIR + " on local filesystem");
  282.           }
  283.         }
  284.         
  285.         synchronized (this) {
  286.           if (!localFS.exists(HDFS_SHLIB)) {
  287.             if (!FileUtil.copy(fs, HDFS_SHLIB, localFS, HDFS_SHLIB, false, fsConfig)) {
  288.               throw new IOException("Failed to copy " + HDFS_SHLIB + " to local filesystem");
  289.             }
  290.             String chmodCmd = new String(CHMOD + " a+x " + HDFS_SHLIB);
  291.             Process process = runTime.exec(chmodCmd);
  292.             int exitStatus = process.waitFor();
  293.             if (exitStatus != 0) {
  294.               throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
  295.             }
  296.           }
  297.         }
  298.         
  299.         synchronized (this) {
  300.           if (!localFS.exists(HDFS_READ)) {
  301.             if (!FileUtil.copy(fs, HDFS_READ, localFS, HDFS_READ, false, fsConfig)) {
  302.               throw new IOException("Failed to copy " + HDFS_READ + " to local filesystem");
  303.             }
  304.             String chmodCmd = new String(CHMOD + " a+x " + HDFS_READ); 
  305.             Process process = runTime.exec(chmodCmd);
  306.             int exitStatus = process.waitFor();
  307.              
  308.             if (exitStatus != 0) {
  309.               throw new IOException(chmodCmd + ": Failed with exitStatus: " + exitStatus);
  310.             }
  311.           }
  312.         }
  313.           
  314.         // exec the C program
  315.         Path inFile = new Path(DATA_DIR, name);
  316.         String readCmd = new String(HDFS_READ + " " + inFile + " " + totalSize + " " + 
  317.                                     bufferSize); 
  318.         Process process = runTime.exec(readCmd, null, new File(HDFS_TEST_DIR.toString()));
  319.         int exitStatus = process.waitFor();
  320.         
  321.         if (exitStatus != 0) {
  322.           throw new IOException(HDFS_READ + ": Failed with exitStatus: " + exitStatus);
  323.         }
  324.       } catch (InterruptedException interruptedException) {
  325.         reporter.setStatus(interruptedException.toString());
  326.       } finally {
  327.         localFS.close();
  328.       }
  329.       return new Long(totalSize);
  330.     }
  331.   }
  332.   private static void readTest(FileSystem fs) throws IOException {
  333.     fs.delete(READ_DIR, true);
  334.     runIOTest(ReadMapper.class, READ_DIR);
  335.   }
  336.   private static void sequentialTest(
  337.                                      FileSystem fs, 
  338.                                      int testType, 
  339.                                      int fileSize, 
  340.                                      int nrFiles
  341.                                      ) throws Exception {
  342.     IOStatMapper ioer = null;
  343.     if (testType == TEST_TYPE_READ)
  344.       ioer = new ReadMapper();
  345.     else if (testType == TEST_TYPE_WRITE)
  346.       ioer = new WriteMapper();
  347.     else
  348.       return;
  349.     for(int i=0; i < nrFiles; i++)
  350.       ioer.doIO(Reporter.NULL,
  351.                 BASE_FILE_NAME+Integer.toString(i), 
  352.                 MEGA*fileSize);
  353.   }
  354.   public static void main(String[] args) {
  355.     int testType = TEST_TYPE_READ;
  356.     int bufferSize = DEFAULT_BUFFER_SIZE;
  357.     int fileSize = 1;
  358.     int nrFiles = 1;
  359.     String resFileName = DEFAULT_RES_FILE_NAME;
  360.     boolean isSequential = false;
  361.     String version="DFSCIOTest.0.0.1";
  362.     String usage = "Usage: DFSCIOTest -read | -write | -clean [-nrFiles N] [-fileSize MB] [-resFile resultFileName] [-bufferSize Bytes] ";
  363.     
  364.     System.out.println(version);
  365.     if (args.length == 0) {
  366.       System.err.println(usage);
  367.       System.exit(-1);
  368.     }
  369.     for (int i = 0; i < args.length; i++) {       // parse command line
  370.       if (args[i].startsWith("-r")) {
  371.         testType = TEST_TYPE_READ;
  372.       } else if (args[i].startsWith("-w")) {
  373.         testType = TEST_TYPE_WRITE;
  374.       } else if (args[i].startsWith("-clean")) {
  375.         testType = TEST_TYPE_CLEANUP;
  376.       } else if (args[i].startsWith("-seq")) {
  377.         isSequential = true;
  378.       } else if (args[i].equals("-nrFiles")) {
  379.         nrFiles = Integer.parseInt(args[++i]);
  380.       } else if (args[i].equals("-fileSize")) {
  381.         fileSize = Integer.parseInt(args[++i]);
  382.       } else if (args[i].equals("-bufferSize")) {
  383.         bufferSize = Integer.parseInt(args[++i]);
  384.       } else if (args[i].equals("-resFile")) {
  385.         resFileName = args[++i];
  386.       }
  387.     }
  388.     LOG.info("nrFiles = " + nrFiles);
  389.     LOG.info("fileSize (MB) = " + fileSize);
  390.     LOG.info("bufferSize = " + bufferSize);
  391.   
  392.     try {
  393.       fsConfig.setInt("test.io.file.buffer.size", bufferSize);
  394.       FileSystem fs = FileSystem.get(fsConfig);
  395.       
  396.       if (testType != TEST_TYPE_CLEANUP) {
  397.         fs.delete(HDFS_TEST_DIR, true);
  398.         if (!fs.mkdirs(HDFS_TEST_DIR)) {
  399.           throw new IOException("Mkdirs failed to create " + 
  400.                                 HDFS_TEST_DIR.toString());
  401.         }
  402.         //Copy the executables over to the remote filesystem
  403.         String hadoopHome = System.getenv("HADOOP_HOME");
  404.         fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/libhdfs.so." + HDFS_LIB_VERSION),
  405.                              HDFS_SHLIB);
  406.         fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_read"), HDFS_READ);
  407.         fs.copyFromLocalFile(new Path(hadoopHome + "/libhdfs/hdfs_write"), HDFS_WRITE);
  408.       }
  409.       if (isSequential) {
  410.         long tStart = System.currentTimeMillis();
  411.         sequentialTest(fs, testType, fileSize, nrFiles);
  412.         long execTime = System.currentTimeMillis() - tStart;
  413.         String resultLine = "Seq Test exec time sec: " + (float)execTime / 1000;
  414.         LOG.info(resultLine);
  415.         return;
  416.       }
  417.       if (testType == TEST_TYPE_CLEANUP) {
  418.         cleanup(fs);
  419.         return;
  420.       }
  421.       createControlFile(fs, fileSize, nrFiles);
  422.       long tStart = System.currentTimeMillis();
  423.       if (testType == TEST_TYPE_WRITE)
  424.         writeTest(fs);
  425.       if (testType == TEST_TYPE_READ)
  426.         readTest(fs);
  427.       long execTime = System.currentTimeMillis() - tStart;
  428.     
  429.       analyzeResult(fs, testType, execTime, resFileName);
  430.     } catch(Exception e) {
  431.       System.err.print(e.getLocalizedMessage());
  432.       System.exit(-1);
  433.     }
  434.   }
  435.   
  436.   private static void analyzeResult( FileSystem fs, 
  437.                                      int testType,
  438.                                      long execTime,
  439.                                      String resFileName
  440.                                      ) throws IOException {
  441.     Path reduceFile;
  442.     if (testType == TEST_TYPE_WRITE)
  443.       reduceFile = new Path(WRITE_DIR, "part-00000");
  444.     else
  445.       reduceFile = new Path(READ_DIR, "part-00000");
  446.     DataInputStream in;
  447.     in = new DataInputStream(fs.open(reduceFile));
  448.   
  449.     BufferedReader lines;
  450.     lines = new BufferedReader(new InputStreamReader(in));
  451.     long tasks = 0;
  452.     long size = 0;
  453.     long time = 0;
  454.     float rate = 0;
  455.     float sqrate = 0;
  456.     String line;
  457.     while((line = lines.readLine()) != null) {
  458.       StringTokenizer tokens = new StringTokenizer(line, " tnrf%");
  459.       String attr = tokens.nextToken(); 
  460.       if (attr.endsWith(":tasks"))
  461.         tasks = Long.parseLong(tokens.nextToken());
  462.       else if (attr.endsWith(":size"))
  463.         size = Long.parseLong(tokens. nextToken());
  464.       else if (attr.endsWith(":time"))
  465.         time = Long.parseLong(tokens.nextToken());
  466.       else if (attr.endsWith(":rate"))
  467.         rate = Float.parseFloat(tokens.nextToken());
  468.       else if (attr.endsWith(":sqrate"))
  469.         sqrate = Float.parseFloat(tokens.nextToken());
  470.     }
  471.     
  472.     double med = rate / 1000 / tasks;
  473.     double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med));
  474.     String resultLines[] = {
  475.       "----- DFSCIOTest ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" :
  476.                                      (testType == TEST_TYPE_READ) ? "read" : 
  477.                                      "unknown"),
  478.       "           Date & time: " + new Date(System.currentTimeMillis()),
  479.       "       Number of files: " + tasks,
  480.       "Total MBytes processed: " + size/MEGA,
  481.       "     Throughput mb/sec: " + size * 1000.0 / (time * MEGA),
  482.       "Average IO rate mb/sec: " + med,
  483.       " Std IO rate deviation: " + stdDev,
  484.       "    Test exec time sec: " + (float)execTime / 1000,
  485.       "" };
  486.     PrintStream res = new PrintStream(
  487.                                       new FileOutputStream(
  488.                                                            new File(resFileName), true)); 
  489.     for(int i = 0; i < resultLines.length; i++) {
  490.       LOG.info(resultLines[i]);
  491.       res.println(resultLines[i]);
  492.     }
  493.   }
  494.   private static void cleanup(FileSystem fs) throws Exception {
  495.     LOG.info("Cleaning up test files");
  496.     fs.delete(new Path(TEST_ROOT_DIR), true);
  497.     fs.delete(HDFS_TEST_DIR, true);
  498.   }
  499. }