NNBench.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:34k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import java.util.Date;
  21. import java.io.DataInputStream;
  22. import java.io.FileOutputStream;
  23. import java.io.InputStreamReader;
  24. import java.io.PrintStream;
  25. import java.io.File;
  26. import java.io.BufferedReader;
  27. import java.util.StringTokenizer;
  28. import java.net.InetAddress;
  29. import java.text.SimpleDateFormat;
  30. import java.util.Iterator;
  31. import org.apache.commons.logging.LogFactory;
  32. import org.apache.commons.logging.Log;
  33. import org.apache.hadoop.conf.Configuration;
  34. import org.apache.hadoop.conf.Configured;
  35. import org.apache.hadoop.fs.Path;
  36. import org.apache.hadoop.fs.FSDataOutputStream;
  37. import org.apache.hadoop.fs.FSDataInputStream;
  38. import org.apache.hadoop.fs.FileSystem;
  39. import org.apache.hadoop.io.Text;
  40. import org.apache.hadoop.io.LongWritable;
  41. import org.apache.hadoop.io.SequenceFile.CompressionType;
  42. import org.apache.hadoop.io.SequenceFile;
  43. import org.apache.hadoop.mapred.FileInputFormat;
  44. import org.apache.hadoop.mapred.FileOutputFormat;
  45. import org.apache.hadoop.mapred.Mapper;
  46. import org.apache.hadoop.mapred.SequenceFileInputFormat;
  47. import org.apache.hadoop.mapred.JobClient;
  48. import org.apache.hadoop.mapred.MapReduceBase;
  49. import org.apache.hadoop.mapred.Reporter;
  50. import org.apache.hadoop.mapred.OutputCollector;
  51. import org.apache.hadoop.mapred.JobConf;
  52. import org.apache.hadoop.mapred.Reducer;
  53. /**
  54.  * This program executes a specified operation that applies load to 
  55.  * the NameNode.
  56.  * 
  57.  * When run simultaneously on multiple nodes, this program functions 
  58.  * as a stress-test and benchmark for namenode, especially when 
  59.  * the number of bytes written to each file is small.
  60.  * 
  61.  * Valid operations are:
  62.  *   create_write
  63.  *   open_read
  64.  *   rename
  65.  *   delete
  66.  * 
  67.  * NOTE: The open_read, rename and delete operations assume that the files
  68.  *       they operate on are already available. The create_write operation 
  69.  *       must be run before running the other operations.
  70.  */
  71. public class NNBench {
  72.   private static final Log LOG = LogFactory.getLog(
  73.           "org.apache.hadoop.hdfs.NNBench");
  74.   
  75.   protected static String CONTROL_DIR_NAME = "control";
  76.   protected static String OUTPUT_DIR_NAME = "output";
  77.   protected static String DATA_DIR_NAME = "data";
  78.   protected static final String DEFAULT_RES_FILE_NAME = "NNBench_results.log";
  79.   protected static final String NNBENCH_VERSION = "NameNode Benchmark 0.4";
  80.   
  81.   public static String operation = "none";
  82.   public static long numberOfMaps = 1l; // default is 1
  83.   public static long numberOfReduces = 1l; // default is 1
  84.   public static long startTime = 
  85.           System.currentTimeMillis() + (120 * 1000); // default is 'now' + 2min
  86.   public static long blockSize = 1l; // default is 1
  87.   public static int bytesToWrite = 0; // default is 0
  88.   public static long bytesPerChecksum = 1l; // default is 1
  89.   public static long numberOfFiles = 1l; // default is 1
  90.   public static short replicationFactorPerFile = 1; // default is 1
  91.   public static String baseDir = "/benchmarks/NNBench";  // default
  92.   public static boolean readFileAfterOpen = false; // default is to not read
  93.   
  94.   // Supported operations
  95.   private static final String OP_CREATE_WRITE = "create_write";
  96.   private static final String OP_OPEN_READ = "open_read";
  97.   private static final String OP_RENAME = "rename";
  98.   private static final String OP_DELETE = "delete";
  99.   
  100.   // To display in the format that matches the NN and DN log format
  101.   // Example: 2007-10-26 00:01:19,853
  102.   static SimpleDateFormat sdf = 
  103.           new SimpleDateFormat("yyyy-MM-dd' 'HH:mm:ss','S");
  104.   private static Configuration config = new Configuration();
  105.   
  106.   /**
  107.    * Clean up the files before a test run
  108.    * 
  109.    * @throws IOException on error
  110.    */
  111.   private static void cleanupBeforeTestrun() throws IOException {
  112.     FileSystem tempFS = FileSystem.get(config);
  113.     
  114.     // Delete the data directory only if it is the create/write operation
  115.     if (operation.equals(OP_CREATE_WRITE)) {
  116.       LOG.info("Deleting data directory");
  117.       tempFS.delete(new Path(baseDir, DATA_DIR_NAME), true);
  118.     }
  119.     tempFS.delete(new Path(baseDir, CONTROL_DIR_NAME), true);
  120.     tempFS.delete(new Path(baseDir, OUTPUT_DIR_NAME), true);
  121.   }
  122.   
  123.   /**
  124.    * Create control files before a test run.
  125.    * Number of files created is equal to the number of maps specified
  126.    * 
  127.    * @throws IOException on error
  128.    */
  129.   private static void createControlFiles() throws IOException {
  130.     FileSystem tempFS = FileSystem.get(config);
  131.     LOG.info("Creating " + numberOfMaps + " control files");
  132.     for (int i = 0; i < numberOfMaps; i++) {
  133.       String strFileName = "NNBench_Controlfile_" + i;
  134.       Path filePath = new Path(new Path(baseDir, CONTROL_DIR_NAME),
  135.               strFileName);
  136.       SequenceFile.Writer writer = null;
  137.       try {
  138.         writer = SequenceFile.createWriter(tempFS, config, filePath, Text.class, 
  139.                 LongWritable.class, CompressionType.NONE);
  140.         writer.append(new Text(strFileName), new LongWritable(0l));
  141.       } catch(Exception e) {
  142.         throw new IOException(e.getLocalizedMessage());
  143.       } finally {
  144.         if (writer != null) {
  145.           writer.close();
  146.         }
  147.         writer = null;
  148.       }
  149.     }
  150.   }
  151.   /**
  152.    * Display version
  153.    */
  154.   private static void displayVersion() {
  155.     System.out.println(NNBENCH_VERSION);
  156.   }
  157.   
  158.   /**
  159.    * Display usage
  160.    */
  161.   private static void displayUsage() {
  162.     String usage =
  163.       "Usage: nnbench <options>n" +
  164.       "Options:n" +
  165.       "t-operation <Available operations are " + OP_CREATE_WRITE + " " +
  166.       OP_OPEN_READ + " " + OP_RENAME + " " + OP_DELETE + ". " +
  167.       "This option is mandatory>n" +
  168.       "t * NOTE: The open_read, rename and delete operations assume " +
  169.       "that the files they operate on, are already available. " +
  170.       "The create_write operation must be run before running the " +
  171.       "other operations.n" +
  172.       "t-maps <number of maps. default is 1. This is not mandatory>n" +
  173.       "t-reduces <number of reduces. default is 1. This is not mandatory>n" +
  174.       "t-startTime <time to start, given in seconds from the epoch. " +
  175.       "Make sure this is far enough into the future, so all maps " +
  176.       "(operations) will start at the same time>. " +
  177.       "default is launch time + 2 mins. This is not mandatory n" +
  178.       "t-blockSize <Block size in bytes. default is 1. " + 
  179.       "This is not mandatory>n" +
  180.       "t-bytesToWrite <Bytes to write. default is 0. " + 
  181.       "This is not mandatory>n" +
  182.       "t-bytesPerChecksum <Bytes per checksum for the files. default is 1. " + 
  183.       "This is not mandatory>n" +
  184.       "t-numberOfFiles <number of files to create. default is 1. " +
  185.       "This is not mandatory>n" +
  186.       "t-replicationFactorPerFile <Replication factor for the files." +
  187.         " default is 1. This is not mandatory>n" +
  188.       "t-baseDir <base DFS path. default is /becnhmarks/NNBench. " +
  189.       "This is not mandatory>n" +
  190.       "t-readFileAfterOpen <true or false. if true, it reads the file and " +
  191.       "reports the average time to read. This is valid with the open_read " +
  192.       "operation. default is false. This is not mandatory>n" +
  193.       "t-help: Display the help statementn";
  194.       
  195.     
  196.     System.out.println(usage);
  197.   }
  198.   /**
  199.    * check for arguments and fail if the values are not specified
  200.    */
  201.   public static void checkArgs(final int index, final int length) {
  202.     if (index == length) {
  203.       displayUsage();
  204.       System.exit(-1);
  205.     }
  206.   }
  207.   
  208.   /**
  209.    * Parse input arguments
  210.    * 
  211.    * @params args Command line inputs
  212.    */
  213.   public static void parseInputs(final String[] args) {
  214.     // If there are no command line arguments, exit
  215.     if (args.length == 0) {
  216.       displayUsage();
  217.       System.exit(-1);
  218.     }
  219.     
  220.     // Parse command line args
  221.     for (int i = 0; i < args.length; i++) {
  222.       if (args[i].equals("-operation")) {
  223.         operation = args[++i];
  224.       } else if (args[i].equals("-maps")) {
  225.         checkArgs(i + 1, args.length);
  226.         numberOfMaps = Long.parseLong(args[++i]);
  227.       } else if (args[i].equals("-reduces")) {
  228.         checkArgs(i + 1, args.length);
  229.         numberOfReduces = Long.parseLong(args[++i]);
  230.       } else if (args[i].equals("-startTime")) {
  231.         checkArgs(i + 1, args.length);
  232.         startTime = Long.parseLong(args[++i]) * 1000;
  233.       } else if (args[i].equals("-blockSize")) {
  234.         checkArgs(i + 1, args.length);
  235.         blockSize = Long.parseLong(args[++i]);
  236.       } else if (args[i].equals("-bytesToWrite")) {
  237.         checkArgs(i + 1, args.length);
  238.         bytesToWrite = Integer.parseInt(args[++i]);
  239.       } else if (args[i].equals("-bytesPerChecksum")) {
  240.         checkArgs(i + 1, args.length);
  241.         bytesPerChecksum = Long.parseLong(args[++i]);
  242.       } else if (args[i].equals("-numberOfFiles")) {
  243.         checkArgs(i + 1, args.length);
  244.         numberOfFiles = Long.parseLong(args[++i]);
  245.       } else if (args[i].equals("-replicationFactorPerFile")) {
  246.         checkArgs(i + 1, args.length);
  247.         replicationFactorPerFile = Short.parseShort(args[++i]);
  248.       } else if (args[i].equals("-baseDir")) {
  249.         checkArgs(i + 1, args.length);
  250.         baseDir = args[++i];
  251.       } else if (args[i].equals("-readFileAfterOpen")) {
  252.         checkArgs(i + 1, args.length);
  253.         readFileAfterOpen = Boolean.parseBoolean(args[++i]);
  254.       } else if (args[i].equals("-help")) {
  255.         displayUsage();
  256.         System.exit(-1);
  257.       }
  258.     }
  259.     
  260.     LOG.info("Test Inputs: ");
  261.     LOG.info("           Test Operation: " + operation);
  262.     LOG.info("               Start time: " + sdf.format(new Date(startTime)));
  263.     LOG.info("           Number of maps: " + numberOfMaps);
  264.     LOG.info("        Number of reduces: " + numberOfReduces);
  265.     LOG.info("               Block Size: " + blockSize);
  266.     LOG.info("           Bytes to write: " + bytesToWrite);
  267.     LOG.info("       Bytes per checksum: " + bytesPerChecksum);
  268.     LOG.info("          Number of files: " + numberOfFiles);
  269.     LOG.info("       Replication factor: " + replicationFactorPerFile);
  270.     LOG.info("                 Base dir: " + baseDir);
  271.     LOG.info("     Read file after open: " + readFileAfterOpen);
  272.     
  273.     // Set user-defined parameters, so the map method can access the values
  274.     config.set("test.nnbench.operation", operation);
  275.     config.setLong("test.nnbench.maps", numberOfMaps);
  276.     config.setLong("test.nnbench.reduces", numberOfReduces);
  277.     config.setLong("test.nnbench.starttime", startTime);
  278.     config.setLong("test.nnbench.blocksize", blockSize);
  279.     config.setInt("test.nnbench.bytestowrite", bytesToWrite);
  280.     config.setLong("test.nnbench.bytesperchecksum", bytesPerChecksum);
  281.     config.setLong("test.nnbench.numberoffiles", numberOfFiles);
  282.     config.setInt("test.nnbench.replicationfactor", 
  283.             (int) replicationFactorPerFile);
  284.     config.set("test.nnbench.basedir", baseDir);
  285.     config.setBoolean("test.nnbench.readFileAfterOpen", readFileAfterOpen);
  286.     config.set("test.nnbench.datadir.name", DATA_DIR_NAME);
  287.     config.set("test.nnbench.outputdir.name", OUTPUT_DIR_NAME);
  288.     config.set("test.nnbench.controldir.name", CONTROL_DIR_NAME);
  289.   }
  290.   
  291.   /**
  292.    * Analyze the results
  293.    * 
  294.    * @throws IOException on error
  295.    */
  296.   private static void analyzeResults() throws IOException {
  297.     final FileSystem fs = FileSystem.get(config);
  298.     Path reduceFile = new Path(new Path(baseDir, OUTPUT_DIR_NAME),
  299.             "part-00000");
  300.     DataInputStream in;
  301.     in = new DataInputStream(fs.open(reduceFile));
  302.     BufferedReader lines;
  303.     lines = new BufferedReader(new InputStreamReader(in));
  304.     long totalTimeAL1 = 0l;
  305.     long totalTimeAL2 = 0l;
  306.     long totalTimeTPmS = 0l;
  307.     long lateMaps = 0l;
  308.     long numOfExceptions = 0l;
  309.     long successfulFileOps = 0l;
  310.     
  311.     long mapStartTimeTPmS = 0l;
  312.     long mapEndTimeTPmS = 0l;
  313.     
  314.     String resultTPSLine1 = null;
  315.     String resultTPSLine2 = null;
  316.     String resultALLine1 = null;
  317.     String resultALLine2 = null;
  318.     
  319.     String line;
  320.     while((line = lines.readLine()) != null) {
  321.       StringTokenizer tokens = new StringTokenizer(line, " tnrf%;");
  322.       String attr = tokens.nextToken();
  323.       if (attr.endsWith(":totalTimeAL1")) {
  324.         totalTimeAL1 = Long.parseLong(tokens.nextToken());
  325.       } else if (attr.endsWith(":totalTimeAL2")) {
  326.         totalTimeAL2 = Long.parseLong(tokens.nextToken());
  327.       } else if (attr.endsWith(":totalTimeTPmS")) {
  328.         totalTimeTPmS = Long.parseLong(tokens.nextToken());
  329.       } else if (attr.endsWith(":latemaps")) {
  330.         lateMaps = Long.parseLong(tokens.nextToken());
  331.       } else if (attr.endsWith(":numOfExceptions")) {
  332.         numOfExceptions = Long.parseLong(tokens.nextToken());
  333.       } else if (attr.endsWith(":successfulFileOps")) {
  334.         successfulFileOps = Long.parseLong(tokens.nextToken());
  335.       } else if (attr.endsWith(":mapStartTimeTPmS")) {
  336.         mapStartTimeTPmS = Long.parseLong(tokens.nextToken());
  337.       } else if (attr.endsWith(":mapEndTimeTPmS")) {
  338.         mapEndTimeTPmS = Long.parseLong(tokens.nextToken());
  339.       }
  340.     }
  341.     
  342.     // Average latency is the average time to perform 'n' number of
  343.     // operations, n being the number of files
  344.     double avgLatency1 = (double) totalTimeAL1 / (double) successfulFileOps;
  345.     double avgLatency2 = (double) totalTimeAL2 / (double) successfulFileOps;
  346.     
  347.     // The time it takes for the longest running map is measured. Using that,
  348.     // cluster transactions per second is calculated. It includes time to 
  349.     // retry any of the failed operations
  350.     double longestMapTimeTPmS = (double) (mapEndTimeTPmS - mapStartTimeTPmS);
  351.     double totalTimeTPS = (longestMapTimeTPmS == 0) ?
  352.             (1000 * successfulFileOps) :
  353.             (double) (1000 * successfulFileOps) / (double) longestMapTimeTPmS;
  354.             
  355.     // The time it takes to perform 'n' operations is calculated (in ms),
  356.     // n being the number of files. Using that time, the average execution 
  357.     // time is calculated. It includes time to retry any of the
  358.     // failed operations
  359.     double AverageExecutionTime = (totalTimeTPmS == 0) ?
  360.         (double) successfulFileOps : 
  361.         (double) (totalTimeTPmS / successfulFileOps);
  362.             
  363.     if (operation.equals(OP_CREATE_WRITE)) {
  364.       // For create/write/close, it is treated as two transactions,
  365.       // since a file create from a client perspective involves create and close
  366.       resultTPSLine1 = "               TPS: Create/Write/Close: " + 
  367.         (int) (totalTimeTPS * 2);
  368.       resultTPSLine2 = "Avg exec time (ms): Create/Write/Close: " + 
  369.         (double) AverageExecutionTime;
  370.       resultALLine1 = "            Avg Lat (ms): Create/Write: " + avgLatency1;
  371.       resultALLine2 = "                   Avg Lat (ms): Close: " + avgLatency2;
  372.     } else if (operation.equals(OP_OPEN_READ)) {
  373.       resultTPSLine1 = "                        TPS: Open/Read: " + 
  374.         (int) totalTimeTPS;
  375.       resultTPSLine2 = "         Avg Exec time (ms): Open/Read: " + 
  376.         (double) AverageExecutionTime;
  377.       resultALLine1 = "                    Avg Lat (ms): Open: " + avgLatency1;
  378.       if (readFileAfterOpen) {
  379.         resultALLine2 = "                  Avg Lat (ms): Read: " + avgLatency2;
  380.       }
  381.     } else if (operation.equals(OP_RENAME)) {
  382.       resultTPSLine1 = "                           TPS: Rename: " + 
  383.         (int) totalTimeTPS;
  384.       resultTPSLine2 = "            Avg Exec time (ms): Rename: " + 
  385.         (double) AverageExecutionTime;
  386.       resultALLine1 = "                  Avg Lat (ms): Rename: " + avgLatency1;
  387.     } else if (operation.equals(OP_DELETE)) {
  388.       resultTPSLine1 = "                           TPS: Delete: " + 
  389.         (int) totalTimeTPS;
  390.       resultTPSLine2 = "            Avg Exec time (ms): Delete: " + 
  391.         (double) AverageExecutionTime;
  392.       resultALLine1 = "                  Avg Lat (ms): Delete: " + avgLatency1;
  393.     }
  394.     
  395.     String resultLines[] = {
  396.     "-------------- NNBench -------------- : ",
  397.     "                               Version: " + NNBENCH_VERSION,
  398.     "                           Date & time: " + sdf.format(new Date(
  399.             System.currentTimeMillis())),
  400.     "",
  401.     "                        Test Operation: " + operation,
  402.     "                            Start time: " + 
  403.       sdf.format(new Date(startTime)),
  404.     "                           Maps to run: " + numberOfMaps,
  405.     "                        Reduces to run: " + numberOfReduces,
  406.     "                    Block Size (bytes): " + blockSize,
  407.     "                        Bytes to write: " + bytesToWrite,
  408.     "                    Bytes per checksum: " + bytesPerChecksum,
  409.     "                       Number of files: " + numberOfFiles,
  410.     "                    Replication factor: " + replicationFactorPerFile,
  411.     "            Successful file operations: " + successfulFileOps,
  412.     "",
  413.     "        # maps that missed the barrier: " + lateMaps,
  414.     "                          # exceptions: " + numOfExceptions,
  415.     "",
  416.     resultTPSLine1,
  417.     resultTPSLine2,
  418.     resultALLine1,
  419.     resultALLine2,
  420.     "",
  421.     "                 RAW DATA: AL Total #1: " + totalTimeAL1,
  422.     "                 RAW DATA: AL Total #2: " + totalTimeAL2,
  423.     "              RAW DATA: TPS Total (ms): " + totalTimeTPmS,
  424.     "       RAW DATA: Longest Map Time (ms): " + longestMapTimeTPmS,
  425.     "                   RAW DATA: Late maps: " + lateMaps,
  426.     "             RAW DATA: # of exceptions: " + numOfExceptions,
  427.     "" };
  428.     PrintStream res = new PrintStream(new FileOutputStream(
  429.             new File(DEFAULT_RES_FILE_NAME), true));
  430.     
  431.     // Write to a file and also dump to log
  432.     for(int i = 0; i < resultLines.length; i++) {
  433.       LOG.info(resultLines[i]);
  434.       res.println(resultLines[i]);
  435.     }
  436.   }
  437.   
  438.   /**
  439.    * Run the test
  440.    * 
  441.    * @throws IOException on error
  442.    */
  443.   public static void runTests() throws IOException {
  444.     config.setLong("io.bytes.per.checksum", bytesPerChecksum);
  445.     
  446.     JobConf job = new JobConf(config, NNBench.class);
  447.     job.setJobName("NNBench-" + operation);
  448.     FileInputFormat.setInputPaths(job, new Path(baseDir, CONTROL_DIR_NAME));
  449.     job.setInputFormat(SequenceFileInputFormat.class);
  450.     
  451.     // Explicitly set number of max map attempts to 1.
  452.     job.setMaxMapAttempts(1);
  453.     
  454.     // Explicitly turn off speculative execution
  455.     job.setSpeculativeExecution(false);
  456.     job.setMapperClass(NNBenchMapper.class);
  457.     job.setReducerClass(NNBenchReducer.class);
  458.     FileOutputFormat.setOutputPath(job, new Path(baseDir, OUTPUT_DIR_NAME));
  459.     job.setOutputKeyClass(Text.class);
  460.     job.setOutputValueClass(Text.class);
  461.     job.setNumReduceTasks((int) numberOfReduces);
  462.     JobClient.runJob(job);
  463.   }
  464.   
  465.   /**
  466.    * Validate the inputs
  467.    */
  468.   public static void validateInputs() {
  469.     // If it is not one of the four operations, then fail
  470.     if (!operation.equals(OP_CREATE_WRITE) &&
  471.             !operation.equals(OP_OPEN_READ) &&
  472.             !operation.equals(OP_RENAME) &&
  473.             !operation.equals(OP_DELETE)) {
  474.       System.err.println("Error: Unknown operation: " + operation);
  475.       displayUsage();
  476.       System.exit(-1);
  477.     }
  478.     
  479.     // If number of maps is a negative number, then fail
  480.     // Hadoop allows the number of maps to be 0
  481.     if (numberOfMaps < 0) {
  482.       System.err.println("Error: Number of maps must be a positive number");
  483.       displayUsage();
  484.       System.exit(-1);
  485.     }
  486.     
  487.     // If number of reduces is a negative number or 0, then fail
  488.     if (numberOfReduces <= 0) {
  489.       System.err.println("Error: Number of reduces must be a positive number");
  490.       displayUsage();
  491.       System.exit(-1);
  492.     }
  493.     // If blocksize is a negative number or 0, then fail
  494.     if (blockSize <= 0) {
  495.       System.err.println("Error: Block size must be a positive number");
  496.       displayUsage();
  497.       System.exit(-1);
  498.     }
  499.     
  500.     // If bytes to write is a negative number, then fail
  501.     if (bytesToWrite < 0) {
  502.       System.err.println("Error: Bytes to write must be a positive number");
  503.       displayUsage();
  504.       System.exit(-1);
  505.     }
  506.     
  507.     // If bytes per checksum is a negative number, then fail
  508.     if (bytesPerChecksum < 0) {
  509.       System.err.println("Error: Bytes per checksum must be a positive number");
  510.       displayUsage();
  511.       System.exit(-1);
  512.     }
  513.     
  514.     // If number of files is a negative number, then fail
  515.     if (numberOfFiles < 0) {
  516.       System.err.println("Error: Number of files must be a positive number");
  517.       displayUsage();
  518.       System.exit(-1);
  519.     }
  520.     
  521.     // If replication factor is a negative number, then fail
  522.     if (replicationFactorPerFile < 0) {
  523.       System.err.println("Error: Replication factor must be a positive number");
  524.       displayUsage();
  525.       System.exit(-1);
  526.     }
  527.     
  528.     // If block size is not a multiple of bytesperchecksum, fail
  529.     if (blockSize % bytesPerChecksum != 0) {
  530.       System.err.println("Error: Block Size in bytes must be a multiple of " +
  531.               "bytes per checksum: ");
  532.       displayUsage();
  533.       System.exit(-1);
  534.     }
  535.   }
  536.   /**
  537.   * Main method for running the NNBench benchmarks
  538.   *
  539.   * @throws IOException indicates a problem with test startup
  540.   */
  541.   public static void main(String[] args) throws IOException {
  542.     // Display the application version string
  543.     displayVersion();
  544.     // Parse the inputs
  545.     parseInputs(args);
  546.     
  547.     // Validate inputs
  548.     validateInputs();
  549.     
  550.     // Clean up files before the test run
  551.     cleanupBeforeTestrun();
  552.     
  553.     // Create control files before test run
  554.     createControlFiles();
  555.     // Run the tests as a map reduce job
  556.     runTests();
  557.     
  558.     // Analyze results
  559.     analyzeResults();
  560.   }
  561.   
  562.   /**
  563.    * Mapper class
  564.    */
  565.   static class NNBenchMapper extends Configured 
  566.           implements Mapper<Text, LongWritable, Text, Text> {
  567.     FileSystem filesystem = null;
  568.     private String hostName = null;
  569.     long numberOfFiles = 1l;
  570.     long blkSize = 1l;
  571.     short replFactor = 1;
  572.     int bytesToWrite = 0;
  573.     String baseDir = null;
  574.     String dataDirName = null;
  575.     String op = null;
  576.     boolean readFile = false;
  577.     final int MAX_OPERATION_EXCEPTIONS = 1000;
  578.     
  579.     // Data to collect from the operation
  580.     int numOfExceptions = 0;
  581.     long startTimeAL = 0l;
  582.     long totalTimeAL1 = 0l;
  583.     long totalTimeAL2 = 0l;
  584.     long successfulFileOps = 0l;
  585.     
  586.     /**
  587.      * Constructor
  588.      */
  589.     public NNBenchMapper() {
  590.     }
  591.     
  592.     /**
  593.      * Mapper base implementation
  594.      */
  595.     public void configure(JobConf conf) {
  596.       setConf(conf);
  597.       
  598.       try {
  599.         filesystem = FileSystem.get(conf);
  600.       } catch(Exception e) {
  601.         throw new RuntimeException("Cannot get file system.", e);
  602.       }
  603.       
  604.       try {
  605.         hostName = InetAddress.getLocalHost().getHostName();
  606.       } catch(Exception e) {
  607.         throw new RuntimeException("Error getting hostname", e);
  608.       }
  609.     }
  610.     
  611.     /**
  612.      * Mapper base implementation
  613.      */
  614.     public void close() throws IOException {
  615.     }
  616.     
  617.     /**
  618.     * Returns when the current number of seconds from the epoch equals
  619.     * the command line argument given by <code>-startTime</code>.
  620.     * This allows multiple instances of this program, running on clock
  621.     * synchronized nodes, to start at roughly the same time.
  622.     */
  623.     private boolean barrier() {
  624.       long startTime = getConf().getLong("test.nnbench.starttime", 0l);
  625.       long currentTime = System.currentTimeMillis();
  626.       long sleepTime = startTime - currentTime;
  627.       boolean retVal = false;
  628.       
  629.       // If the sleep time is greater than 0, then sleep and return
  630.       if (sleepTime > 0) {
  631.         LOG.info("Waiting in barrier for: " + sleepTime + " ms");
  632.       
  633.         try {
  634.           Thread.sleep(sleepTime);
  635.           retVal = true;
  636.         } catch (Exception e) {
  637.           retVal = false;
  638.         }
  639.       }
  640.       
  641.       return retVal;
  642.     }
  643.     
  644.     /**
  645.      * Map method
  646.      */ 
  647.     public void map(Text key, 
  648.             LongWritable value,
  649.             OutputCollector<Text, Text> output,
  650.             Reporter reporter) throws IOException {
  651.       Configuration conf = filesystem.getConf();
  652.       
  653.       numberOfFiles = conf.getLong("test.nnbench.numberoffiles", 1l);
  654.       blkSize = conf.getLong("test.nnbench.blocksize", 1l);
  655.       replFactor = (short) (conf.getInt("test.nnbench.replicationfactor", 1));
  656.       bytesToWrite = conf.getInt("test.nnbench.bytestowrite", 0);
  657.       baseDir = conf.get("test.nnbench.basedir");
  658.       dataDirName = conf.get("test.nnbench.datadir.name");
  659.       op = conf.get("test.nnbench.operation");
  660.       readFile = conf.getBoolean("test.nnbench.readFileAfterOpen", false);
  661.       
  662.       long totalTimeTPmS = 0l;
  663.       long startTimeTPmS = 0l;
  664.       long endTimeTPms = 0l;
  665.       
  666.       numOfExceptions = 0;
  667.       startTimeAL = 0l;
  668.       totalTimeAL1 = 0l;
  669.       totalTimeAL2 = 0l;
  670.       successfulFileOps = 0l;
  671.       
  672.       if (barrier()) {
  673.         if (op.equals(OP_CREATE_WRITE)) {
  674.           startTimeTPmS = System.currentTimeMillis();
  675.           doCreateWriteOp("file_" + hostName + "_", output, reporter);
  676.         } else if (op.equals(OP_OPEN_READ)) {
  677.           startTimeTPmS = System.currentTimeMillis();
  678.           doOpenReadOp("file_" + hostName + "_", output, reporter);
  679.         } else if (op.equals(OP_RENAME)) {
  680.           startTimeTPmS = System.currentTimeMillis();
  681.           doRenameOp("file_" + hostName + "_", output, reporter);
  682.         } else if (op.equals(OP_DELETE)) {
  683.           startTimeTPmS = System.currentTimeMillis();
  684.           doDeleteOp("file_" + hostName + "_", output, reporter);
  685.         }
  686.         
  687.         endTimeTPms = System.currentTimeMillis();
  688.         totalTimeTPmS = endTimeTPms - startTimeTPmS;
  689.       } else {
  690.         output.collect(new Text("l:latemaps"), new Text("1"));
  691.       }
  692.       
  693.       // collect after the map end time is measured
  694.       output.collect(new Text("l:totalTimeAL1"), 
  695.           new Text(String.valueOf(totalTimeAL1)));
  696.       output.collect(new Text("l:totalTimeAL2"), 
  697.           new Text(String.valueOf(totalTimeAL2)));
  698.       output.collect(new Text("l:numOfExceptions"), 
  699.           new Text(String.valueOf(numOfExceptions)));
  700.       output.collect(new Text("l:successfulFileOps"), 
  701.           new Text(String.valueOf(successfulFileOps)));
  702.       output.collect(new Text("l:totalTimeTPmS"), 
  703.               new Text(String.valueOf(totalTimeTPmS)));
  704.       output.collect(new Text("min:mapStartTimeTPmS"), 
  705.           new Text(String.valueOf(startTimeTPmS)));
  706.       output.collect(new Text("max:mapEndTimeTPmS"), 
  707.           new Text(String.valueOf(endTimeTPms)));
  708.     }
  709.     
  710.     /**
  711.      * Create and Write operation.
  712.      */
  713.     private void doCreateWriteOp(String name,
  714.             OutputCollector<Text, Text> output,
  715.             Reporter reporter) {
  716.       FSDataOutputStream out = null;
  717.       byte[] buffer = new byte[bytesToWrite];
  718.       
  719.       for (long l = 0l; l < numberOfFiles; l++) {
  720.         Path filePath = new Path(new Path(baseDir, dataDirName), 
  721.                 name + "_" + l);
  722.         boolean successfulOp = false;
  723.         while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
  724.           try {
  725.             // Set up timer for measuring AL (transaction #1)
  726.             startTimeAL = System.currentTimeMillis();
  727.             // Create the file
  728.             // Use a buffer size of 512
  729.             out = filesystem.create(filePath, 
  730.                     true, 
  731.                     512, 
  732.                     replFactor, 
  733.                     blkSize);
  734.             out.write(buffer);
  735.             totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
  736.             // Close the file / file output stream
  737.             // Set up timers for measuring AL (transaction #2)
  738.             startTimeAL = System.currentTimeMillis();
  739.             out.close();
  740.             
  741.             totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
  742.             successfulOp = true;
  743.             successfulFileOps ++;
  744.             reporter.setStatus("Finish "+ l + " files");
  745.           } catch (IOException e) {
  746.             LOG.info("Exception recorded in op: " +
  747.                     "Create/Write/Close");
  748.  
  749.             numOfExceptions++;
  750.           }
  751.         }
  752.       }
  753.     }
  754.     
  755.     /**
  756.      * Open operation
  757.      */
  758.     private void doOpenReadOp(String name,
  759.             OutputCollector<Text, Text> output,
  760.             Reporter reporter) {
  761.       FSDataInputStream input = null;
  762.       byte[] buffer = new byte[bytesToWrite];
  763.       
  764.       for (long l = 0l; l < numberOfFiles; l++) {
  765.         Path filePath = new Path(new Path(baseDir, dataDirName), 
  766.                 name + "_" + l);
  767.         boolean successfulOp = false;
  768.         while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
  769.           try {
  770.             // Set up timer for measuring AL
  771.             startTimeAL = System.currentTimeMillis();
  772.             input = filesystem.open(filePath);
  773.             totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
  774.             
  775.             // If the file needs to be read (specified at command line)
  776.             if (readFile) {
  777.               startTimeAL = System.currentTimeMillis();
  778.               input.readFully(buffer);
  779.               totalTimeAL2 += (System.currentTimeMillis() - startTimeAL);
  780.             }
  781.             input.close();
  782.             successfulOp = true;
  783.             successfulFileOps ++;
  784.             reporter.setStatus("Finish "+ l + " files");
  785.           } catch (IOException e) {
  786.             LOG.info("Exception recorded in op: OpenRead " + e);
  787.             numOfExceptions++;
  788.           }
  789.         }
  790.       }
  791.     }
  792.     
  793.     /**
  794.      * Rename operation
  795.      */
  796.     private void doRenameOp(String name,
  797.             OutputCollector<Text, Text> output,
  798.             Reporter reporter) {
  799.       for (long l = 0l; l < numberOfFiles; l++) {
  800.         Path filePath = new Path(new Path(baseDir, dataDirName), 
  801.                 name + "_" + l);
  802.         Path filePathR = new Path(new Path(baseDir, dataDirName), 
  803.                 name + "_r_" + l);
  804.         boolean successfulOp = false;
  805.         while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
  806.           try {
  807.             // Set up timer for measuring AL
  808.             startTimeAL = System.currentTimeMillis();
  809.             filesystem.rename(filePath, filePathR);
  810.             totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
  811.             
  812.             successfulOp = true;
  813.             successfulFileOps ++;
  814.             reporter.setStatus("Finish "+ l + " files");
  815.           } catch (IOException e) {
  816.             LOG.info("Exception recorded in op: Rename");
  817.             numOfExceptions++;
  818.           }
  819.         }
  820.       }
  821.     }
  822.     
  823.     /**
  824.      * Delete operation
  825.      */
  826.     private void doDeleteOp(String name,
  827.             OutputCollector<Text, Text> output,
  828.             Reporter reporter) {
  829.       for (long l = 0l; l < numberOfFiles; l++) {
  830.         Path filePath = new Path(new Path(baseDir, dataDirName), 
  831.                 name + "_" + l);
  832.         
  833.         boolean successfulOp = false;
  834.         while (! successfulOp && numOfExceptions < MAX_OPERATION_EXCEPTIONS) {
  835.           try {
  836.             // Set up timer for measuring AL
  837.             startTimeAL = System.currentTimeMillis();
  838.             filesystem.delete(filePath, true);
  839.             totalTimeAL1 += (System.currentTimeMillis() - startTimeAL);
  840.             
  841.             successfulOp = true;
  842.             successfulFileOps ++;
  843.             reporter.setStatus("Finish "+ l + " files");
  844.           } catch (IOException e) {
  845.             LOG.info("Exception in recorded op: Delete");
  846.             numOfExceptions++;
  847.           }
  848.         }
  849.       }
  850.     }
  851.   }
  852.   
  853.   /**
  854.    * Reducer class
  855.    */
  856.   static class NNBenchReducer extends MapReduceBase
  857.       implements Reducer<Text, Text, Text, Text> {
  858.     protected String hostName;
  859.     public NNBenchReducer () {
  860.       LOG.info("Starting NNBenchReducer !!!");
  861.       try {
  862.         hostName = java.net.InetAddress.getLocalHost().getHostName();
  863.       } catch(Exception e) {
  864.         hostName = "localhost";
  865.       }
  866.       LOG.info("Starting NNBenchReducer on " + hostName);
  867.     }
  868.     /**
  869.      * Reduce method
  870.      */
  871.     public void reduce(Text key, 
  872.                        Iterator<Text> values,
  873.                        OutputCollector<Text, Text> output, 
  874.                        Reporter reporter
  875.                        ) throws IOException {
  876.       String field = key.toString();
  877.       
  878.       reporter.setStatus("starting " + field + " ::host = " + hostName);
  879.       
  880.       // sum long values
  881.       if (field.startsWith("l:")) {
  882.         long lSum = 0;
  883.         while (values.hasNext()) {
  884.           lSum += Long.parseLong(values.next().toString());
  885.         }
  886.         output.collect(key, new Text(String.valueOf(lSum)));
  887.       }
  888.       
  889.       if (field.startsWith("min:")) {
  890.         long minVal = -1;
  891.         while (values.hasNext()) {
  892.           long value = Long.parseLong(values.next().toString());
  893.           
  894.           if (minVal == -1) {
  895.             minVal = value;
  896.           } else {
  897.             if (value != 0 && value < minVal) {
  898.               minVal = value;
  899.             }
  900.           }
  901.         }
  902.         output.collect(key, new Text(String.valueOf(minVal)));
  903.       }
  904.       
  905.       if (field.startsWith("max:")) {
  906.         long maxVal = -1;
  907.         while (values.hasNext()) {
  908.           long value = Long.parseLong(values.next().toString());
  909.           
  910.           if (maxVal == -1) {
  911.             maxVal = value;
  912.           } else {
  913.             if (value > maxVal) {
  914.               maxVal = value;
  915.             }
  916.           }
  917.         }
  918.         output.collect(key, new Text(String.valueOf(maxVal)));
  919.       }
  920.       
  921.       reporter.setStatus("finished " + field + " ::host = " + hostName);
  922.     }
  923.   }
  924. }