NNBenchWithoutMR.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import java.util.Date;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.fs.FSDataInputStream;
  25. import org.apache.hadoop.fs.FSDataOutputStream;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.mapred.JobConf;
  29. import org.apache.hadoop.util.StringUtils;
  30. /**
  31.  * This program executes a specified operation that applies load to 
  32.  * the NameNode. Possible operations include create/writing files,
  33.  * opening/reading files, renaming files, and deleting files.
  34.  * 
  35.  * When run simultaneously on multiple nodes, this program functions 
  36.  * as a stress-test and benchmark for namenode, especially when 
  37.  * the number of bytes written to each file is small.
  38.  * 
  39.  * This version does not use the map reduce framework
  40.  * 
  41.  */
  42. public class NNBenchWithoutMR {
  43.   
  44.   private static final Log LOG = LogFactory.getLog(
  45.                                             "org.apache.hadoop.hdfs.NNBench");
  46.   
  47.   // variable initialzed from command line arguments
  48.   private static long startTime = 0;
  49.   private static int numFiles = 0;
  50.   private static long bytesPerBlock = 1;
  51.   private static long blocksPerFile = 0;
  52.   private static long bytesPerFile = 1;
  53.   private static Path baseDir = null;
  54.     
  55.   // variables initialized in main()
  56.   private static FileSystem fileSys = null;
  57.   private static Path taskDir = null;
  58.   private static String uniqueId = null;
  59.   private static byte[] buffer;
  60.   private static long maxExceptionsPerFile = 200;
  61.     
  62.   /**
  63.    * Returns when the current number of seconds from the epoch equals
  64.    * the command line argument given by <code>-startTime</code>.
  65.    * This allows multiple instances of this program, running on clock
  66.    * synchronized nodes, to start at roughly the same time.
  67.    */
  68.   static void barrier() {
  69.     long sleepTime;
  70.     while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
  71.       try {
  72.         Thread.sleep(sleepTime);
  73.       } catch (InterruptedException ex) {
  74.       }
  75.     }
  76.   }
  77.     
  78.   static private void handleException(String operation, Throwable e, 
  79.                                       int singleFileExceptions) {
  80.     LOG.warn("Exception while " + operation + ": " +
  81.              StringUtils.stringifyException(e));
  82.     if (singleFileExceptions >= maxExceptionsPerFile) {
  83.       throw new RuntimeException(singleFileExceptions + 
  84.         " exceptions for a single file exceeds threshold. Aborting");
  85.     }
  86.   }
  87.   
  88.   /**
  89.    * Create and write to a given number of files.  Repeat each remote
  90.    * operation until is suceeds (does not throw an exception).
  91.    *
  92.    * @return the number of exceptions caught
  93.    */
  94.   static int createWrite() {
  95.     int totalExceptions = 0;
  96.     FSDataOutputStream out = null;
  97.     boolean success = false;
  98.     for (int index = 0; index < numFiles; index++) {
  99.       int singleFileExceptions = 0;
  100.       do { // create file until is succeeds or max exceptions reached
  101.         try {
  102.           out = fileSys.create(
  103.                                new Path(taskDir, "" + index), false, 512, (short)1, bytesPerBlock);
  104.           success = true;
  105.         } catch (IOException ioe) { 
  106.           success=false; 
  107.           totalExceptions++;
  108.           handleException("creating file #" + index, ioe, ++singleFileExceptions);
  109.         }
  110.       } while (!success);
  111.       long toBeWritten = bytesPerFile;
  112.       while (toBeWritten > 0) {
  113.         int nbytes = (int) Math.min(buffer.length, toBeWritten);
  114.         toBeWritten -= nbytes;
  115.         try { // only try once
  116.           out.write(buffer, 0, nbytes);
  117.         } catch (IOException ioe) {
  118.           totalExceptions++;
  119.           handleException("writing to file #" + index, ioe, ++singleFileExceptions);
  120.         }
  121.       }
  122.       do { // close file until is succeeds
  123.         try {
  124.           out.close();
  125.           success = true;
  126.         } catch (IOException ioe) {
  127.           success=false; 
  128.           totalExceptions++;
  129.           handleException("closing file #" + index, ioe, ++singleFileExceptions);
  130.         }
  131.       } while (!success);
  132.     }
  133.     return totalExceptions;
  134.   }
  135.     
  136.   /**
  137.    * Open and read a given number of files.
  138.    *
  139.    * @return the number of exceptions caught
  140.    */
  141.   static int openRead() {
  142.     int totalExceptions = 0;
  143.     FSDataInputStream in = null;
  144.     for (int index = 0; index < numFiles; index++) {
  145.       int singleFileExceptions = 0;
  146.       try {
  147.         in = fileSys.open(new Path(taskDir, "" + index), 512);
  148.         long toBeRead = bytesPerFile;
  149.         while (toBeRead > 0) {
  150.           int nbytes = (int) Math.min(buffer.length, toBeRead);
  151.           toBeRead -= nbytes;
  152.           try { // only try once
  153.             in.read(buffer, 0, nbytes);
  154.           } catch (IOException ioe) {
  155.             totalExceptions++;
  156.             handleException("reading from file #" + index, ioe, ++singleFileExceptions);
  157.           }
  158.         }
  159.         in.close();
  160.       } catch (IOException ioe) { 
  161.         totalExceptions++;
  162.         handleException("opening file #" + index, ioe, ++singleFileExceptions);
  163.       }
  164.     }
  165.     return totalExceptions;
  166.   }
  167.     
  168.   /**
  169.    * Rename a given number of files.  Repeat each remote
  170.    * operation until is suceeds (does not throw an exception).
  171.    *
  172.    * @return the number of exceptions caught
  173.    */
  174.   static int rename() {
  175.     int totalExceptions = 0;
  176.     boolean success = false;
  177.     for (int index = 0; index < numFiles; index++) {
  178.       int singleFileExceptions = 0;
  179.       do { // rename file until is succeeds
  180.         try {
  181.           boolean result = fileSys.rename(
  182.                                           new Path(taskDir, "" + index), new Path(taskDir, "A" + index));
  183.           success = true;
  184.         } catch (IOException ioe) { 
  185.           success=false; 
  186.           totalExceptions++;
  187.           handleException("creating file #" + index, ioe, ++singleFileExceptions);
  188.        }
  189.       } while (!success);
  190.     }
  191.     return totalExceptions;
  192.   }
  193.     
  194.   /**
  195.    * Delete a given number of files.  Repeat each remote
  196.    * operation until is suceeds (does not throw an exception).
  197.    *
  198.    * @return the number of exceptions caught
  199.    */
  200.   static int delete() {
  201.     int totalExceptions = 0;
  202.     boolean success = false;
  203.     for (int index = 0; index < numFiles; index++) {
  204.       int singleFileExceptions = 0;
  205.       do { // delete file until is succeeds
  206.         try {
  207.           boolean result = fileSys.delete(new Path(taskDir, "A" + index), true);
  208.           success = true;
  209.         } catch (IOException ioe) { 
  210.           success=false; 
  211.           totalExceptions++;
  212.           handleException("creating file #" + index, ioe, ++singleFileExceptions);
  213.         }
  214.       } while (!success);
  215.     }
  216.     return totalExceptions;
  217.   }
  218.     
  219.   /**
  220.    * This launches a given namenode operation (<code>-operation</code>),
  221.    * starting at a given time (<code>-startTime</code>).  The files used
  222.    * by the openRead, rename, and delete operations are the same files
  223.    * created by the createWrite operation.  Typically, the program
  224.    * would be run four times, once for each operation in this order:
  225.    * createWrite, openRead, rename, delete.
  226.    *
  227.    * <pre>
  228.    * Usage: nnbench 
  229.    *          -operation <one of createWrite, openRead, rename, or delete>
  230.    *          -baseDir <base output/input DFS path>
  231.    *          -startTime <time to start, given in seconds from the epoch>
  232.    *          -numFiles <number of files to create, read, rename, or delete>
  233.    *          -blocksPerFile <number of blocks to create per file>
  234.    *         [-bytesPerBlock <number of bytes to write to each block, default is 1>]
  235.    *         [-bytesPerChecksum <value for io.bytes.per.checksum>]
  236.    * </pre>
  237.    *
  238.    * @throws IOException indicates a problem with test startup
  239.    */
  240.   public static void main(String[] args) throws IOException {
  241.     String version = "NameNodeBenchmark.0.3";
  242.     System.out.println(version);
  243.     int bytesPerChecksum = -1;
  244.     
  245.     String usage =
  246.       "Usage: nnbench " +
  247.       "  -operation <one of createWrite, openRead, rename, or delete> " +
  248.       "  -baseDir <base output/input DFS path> " +
  249.       "  -startTime <time to start, given in seconds from the epoch> " +
  250.       "  -numFiles <number of files to create> " +
  251.       "  -blocksPerFile <number of blocks to create per file> " +
  252.       "  [-bytesPerBlock <number of bytes to write to each block, default is 1>] " +
  253.       "  [-bytesPerChecksum <value for io.bytes.per.checksum>]" +
  254.       "Note: bytesPerBlock MUST be a multiple of bytesPerChecksum";
  255.     
  256.     String operation = null;
  257.     for (int i = 0; i < args.length; i++) { // parse command line
  258.       if (args[i].equals("-baseDir")) {
  259.         baseDir = new Path(args[++i]);
  260.       } else if (args[i].equals("-numFiles")) {
  261.         numFiles = Integer.parseInt(args[++i]);
  262.       } else if (args[i].equals("-blocksPerFile")) {
  263.         blocksPerFile = Integer.parseInt(args[++i]);
  264.       } else if (args[i].equals("-bytesPerBlock")) {
  265.         bytesPerBlock = Long.parseLong(args[++i]);
  266.       } else if (args[i].equals("-bytesPerChecksum")) {
  267.         bytesPerChecksum = Integer.parseInt(args[++i]);        
  268.       } else if (args[i].equals("-startTime")) {
  269.         startTime = Long.parseLong(args[++i]) * 1000;
  270.       } else if (args[i].equals("-operation")) {
  271.         operation = args[++i];
  272.       } else {
  273.         System.out.println(usage);
  274.         System.exit(-1);
  275.       }
  276.     }
  277.     bytesPerFile = bytesPerBlock * blocksPerFile;
  278.     
  279.     JobConf jobConf = new JobConf(new Configuration(), NNBench.class);
  280.     
  281.     if ( bytesPerChecksum < 0 ) { // if it is not set in cmdline
  282.       bytesPerChecksum = jobConf.getInt("io.bytes.per.checksum", 512);
  283.     }
  284.     jobConf.set("io.bytes.per.checksum", Integer.toString(bytesPerChecksum));
  285.     
  286.     System.out.println("Inputs: ");
  287.     System.out.println("   operation: " + operation);
  288.     System.out.println("   baseDir: " + baseDir);
  289.     System.out.println("   startTime: " + startTime);
  290.     System.out.println("   numFiles: " + numFiles);
  291.     System.out.println("   blocksPerFile: " + blocksPerFile);
  292.     System.out.println("   bytesPerBlock: " + bytesPerBlock);
  293.     System.out.println("   bytesPerChecksum: " + bytesPerChecksum);
  294.     
  295.     if (operation == null ||  // verify args
  296.         baseDir == null ||
  297.         numFiles < 1 ||
  298.         blocksPerFile < 1 ||
  299.         bytesPerBlock < 0 ||
  300.         bytesPerBlock % bytesPerChecksum != 0)
  301.       {
  302.         System.err.println(usage);
  303.         System.exit(-1);
  304.       }
  305.     
  306.     fileSys = FileSystem.get(jobConf);
  307.     uniqueId = java.net.InetAddress.getLocalHost().getHostName();
  308.     taskDir = new Path(baseDir, uniqueId);
  309.     // initialize buffer used for writing/reading file
  310.     buffer = new byte[(int) Math.min(bytesPerFile, 32768L)];
  311.     
  312.     Date execTime;
  313.     Date endTime;
  314.     long duration;
  315.     int exceptions = 0;
  316.     barrier(); // wait for coordinated start time
  317.     execTime = new Date();
  318.     System.out.println("Job started: " + startTime);
  319.     if (operation.equals("createWrite")) {
  320.       if (!fileSys.mkdirs(taskDir)) {
  321.         throw new IOException("Mkdirs failed to create " + taskDir.toString());
  322.       }
  323.       exceptions = createWrite();
  324.     } else if (operation.equals("openRead")) {
  325.       exceptions = openRead();
  326.     } else if (operation.equals("rename")) {
  327.       exceptions = rename();
  328.     } else if (operation.equals("delete")) {
  329.       exceptions = delete();
  330.     } else {
  331.       System.err.println(usage);
  332.       System.exit(-1);
  333.     }
  334.     endTime = new Date();
  335.     System.out.println("Job ended: " + endTime);
  336.     duration = (endTime.getTime() - execTime.getTime()) /1000;
  337.     System.out.println("The " + operation + " job took " + duration + " seconds.");
  338.     System.out.println("The job recorded " + exceptions + " exceptions.");
  339.   }
  340. }