LoadGenerator.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:17k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.loadGenerator;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.net.InetAddress;
  22. import java.net.UnknownHostException;
  23. import java.util.ArrayList;
  24. import java.util.Random;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.conf.Configured;
  27. import org.apache.hadoop.fs.FSDataOutputStream;
  28. import org.apache.hadoop.fs.FileStatus;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.util.Tool;
  32. import org.apache.hadoop.util.ToolRunner;
  33. /** The load generator is a tool for testing NameNode behavior under
  34.  * different client loads.
  35.  * It allows the user to generate different mixes of read, write,
  36.  * and list requests by specifying the probabilities of read and
  37.  * write. The user controls the intensity of the load by
  38.  * adjusting parameters for the number of worker threads and the delay
  39.  * between operations. While load generators are running, the user
  40.  * can profile and monitor the running of the NameNode. When a load
  41.  * generator exits, it print some NameNode statistics like the average
  42.  * execution time of each kind of operations and the NameNode
  43.  * throughput.
  44.  * 
  45.  * After command line argument parsing and data initialization,
  46.  * the load generator spawns the number of worker threads 
  47.  * as specified by the user.
  48.  * Each thread sends a stream of requests to the NameNode.
  49.  * For each iteration, it first decides if it is going to read a file,
  50.  * create a file, or listing a directory following the read and write 
  51.  * probabilities specified by the user.
  52.  * When reading, it randomly picks a file in the test space and reads
  53.  * the entire file. When writing, it randomly picks a directory in the
  54.  * test space and creates a file whose name consists of the current 
  55.  * machine's host name and the thread id. The length of the file
  56.  * follows Gaussian distribution with an average size of 2 blocks and
  57.  * the standard deviation of 1 block. The new file is filled with 'a'.
  58.  * Immediately after the file creation completes, the file is deleted
  59.  * from the test space.
  60.  * While listing, it randomly picks a directory in the test space and
  61.  * list the directory content.
  62.  * Between two consecutive operations, the thread pauses for a random
  63.  * amount of time in the range of [0, maxDelayBetweenOps] 
  64.  * if the specified max delay is not zero.
  65.  * All threads are stopped when the specified elapsed time is passed.
  66.  * Before exiting, the program prints the average execution for 
  67.  * each kind of NameNode operations, and the number of requests
  68.  * served by the NameNode.
  69.  *
  70.  * The synopsis of the command is
  71.  * java LoadGenerator
  72.  *   -readProbability <read probability>: read probability [0, 1]
  73.  *                                        with a default value of 0.3333. 
  74.  *   -writeProbability <write probability>: write probability [0, 1]
  75.  *                                         with a default value of 0.3333.
  76.  *   -root <root>: test space with a default value of /testLoadSpace
  77.  *   -maxDelayBetweenOps <maxDelayBetweenOpsInMillis>: 
  78.  *      Max delay in the unit of milliseconds between two operations with a 
  79.  *      default value of 0 indicating no delay.
  80.  *   -numOfThreads <numOfThreads>: 
  81.  *      number of threads to spawn with a default value of 200.
  82.  *   -elapsedTime <elapsedTimeInSecs>: 
  83.  *      the elapsed time of program with a default value of 0 
  84.  *      indicating running forever
  85.  *   -startTime <startTimeInMillis> : when the threads start to run.
  86.  */
  87. public class LoadGenerator extends Configured implements Tool {
  88.   private volatile boolean shouldRun = true;
  89.   private Path root = DataGenerator.DEFAULT_ROOT;
  90.   private FileSystem fs;
  91.   private int maxDelayBetweenOps = 0;
  92.   private int numOfThreads = 200;
  93.   private double readPr = 0.3333;
  94.   private double writePr = 0.3333;
  95.   private long elapsedTime = 0;
  96.   private long startTime = System.currentTimeMillis()+10000;
  97.   final static private int BLOCK_SIZE = 10;
  98.   private ArrayList<String> files = new ArrayList<String>();  // a table of file names
  99.   private ArrayList<String> dirs = new ArrayList<String>(); // a table of directory names
  100.   private Random r = null;
  101.   final private static String USAGE = "java LoadGeneratorn" +
  102.    "-readProbability <read probability>n" +
  103.     "-writeProbability <write probability>n" +
  104.     "-root <root>n" +
  105.     "-maxDelayBetweenOps <maxDelayBetweenOpsInMillis>n" +
  106.     "-numOfThreads <numOfThreads>n" +
  107.     "-elapsedTime <elapsedTimeInSecs>n" +
  108.     "-startTime <startTimeInMillis>";
  109.   final private String hostname;
  110.   
  111.   /** Constructor */
  112.   public LoadGenerator() throws IOException, UnknownHostException {
  113.     InetAddress addr = InetAddress.getLocalHost();
  114.     hostname = addr.getHostName();
  115.   }
  116.   private final static int OPEN = 0;
  117.   private final static int LIST = 1;
  118.   private final static int CREATE = 2;
  119.   private final static int WRITE_CLOSE = 3;
  120.   private final static int DELETE = 4;
  121.   private final static int TOTAL_OP_TYPES =5;
  122.   private long [] executionTime = new long[TOTAL_OP_TYPES];
  123.   private long [] totalNumOfOps = new long[TOTAL_OP_TYPES];
  124.   
  125.   /** A thread sends a stream of requests to the NameNode.
  126.    * At each iteration, it first decides if it is going to read a file,
  127.    * create a file, or listing a directory following the read
  128.    * and write probabilities.
  129.    * When reading, it randomly picks a file in the test space and reads
  130.    * the entire file. When writing, it randomly picks a directory in the
  131.    * test space and creates a file whose name consists of the current 
  132.    * machine's host name and the thread id. The length of the file
  133.    * follows Gaussian distribution with an average size of 2 blocks and
  134.    * the standard deviation of 1 block. The new file is filled with 'a'.
  135.    * Immediately after the file creation completes, the file is deleted
  136.    * from the test space.
  137.    * While listing, it randomly picks a directory in the test space and
  138.    * list the directory content.
  139.    * Between two consecutive operations, the thread pauses for a random
  140.    * amount of time in the range of [0, maxDelayBetweenOps] 
  141.    * if the specified max delay is not zero.
  142.    * A thread runs for the specified elapsed time if the time isn't zero.
  143.    * Otherwise, it runs forever.
  144.    */
  145.   private class DFSClientThread extends Thread {
  146.     private int id;
  147.     private long [] executionTime = new long[TOTAL_OP_TYPES];
  148.     private long [] totalNumOfOps = new long[TOTAL_OP_TYPES];
  149.     private byte[] buffer = new byte[1024];
  150.     
  151.     private DFSClientThread(int id) {
  152.       this.id = id;
  153.     }
  154.     
  155.     /** Main loop
  156.      * Each iteration decides what's the next operation and then pauses.
  157.      */
  158.     public void run() {
  159.       try {
  160.         while (shouldRun) {
  161.           nextOp();
  162.           delay();
  163.         }
  164.       } catch (Exception ioe) {
  165.         System.err.println(ioe.getLocalizedMessage());
  166.         ioe.printStackTrace();
  167.       }
  168.     }
  169.     
  170.     /** Let the thread pause for a random amount of time in the range of
  171.      * [0, maxDelayBetweenOps] if the delay is not zero. Otherwise, no pause.
  172.      */
  173.     private void delay() throws InterruptedException {
  174.       if (maxDelayBetweenOps>0) {
  175.         int delay = r.nextInt(maxDelayBetweenOps);
  176.         Thread.sleep(delay);
  177.       }
  178.     }
  179.     
  180.     /** Perform the next operation. 
  181.      * 
  182.      * Depending on the read and write probabilities, the next
  183.      * operation could be either read, write, or list.
  184.      */
  185.     private void nextOp() throws IOException {
  186.       double rn = r.nextDouble();
  187.       if (rn < readPr) {
  188.         read();
  189.       } else if (rn < readPr+writePr) {
  190.         write();
  191.       } else {
  192.         list();
  193.       }
  194.     }
  195.     
  196.     /** Read operation randomly picks a file in the test space and reads
  197.      * the entire file */
  198.     private void read() throws IOException {
  199.       String fileName = files.get(r.nextInt(files.size()));
  200.       long startTime = System.currentTimeMillis();
  201.       InputStream in = fs.open(new Path(fileName));
  202.       executionTime[OPEN] += (System.currentTimeMillis()-startTime);
  203.       totalNumOfOps[OPEN]++;
  204.       while (in.read(buffer) != -1) {}
  205.       in.close();
  206.     }
  207.     
  208.     /** The write operation randomly picks a directory in the
  209.      * test space and creates a file whose name consists of the current 
  210.      * machine's host name and the thread id. The length of the file
  211.      * follows Gaussian distribution with an average size of 2 blocks and
  212.      * the standard deviation of 1 block. The new file is filled with 'a'.
  213.      * Immediately after the file creation completes, the file is deleted
  214.      * from the test space.
  215.      */
  216.     private void write() throws IOException {
  217.       String dirName = dirs.get(r.nextInt(dirs.size()));
  218.       Path file = new Path(dirName, hostname+id);
  219.       double fileSize = 0;
  220.       while ((fileSize = r.nextGaussian()+2)<=0) {}
  221.       genFile(file, (long)(fileSize*BLOCK_SIZE));
  222.       long startTime = System.currentTimeMillis();
  223.       fs.delete(file, true);
  224.       executionTime[DELETE] += (System.currentTimeMillis()-startTime);
  225.       totalNumOfOps[DELETE]++;
  226.     }
  227.     
  228.     /** The list operation randomly picks a directory in the test space and
  229.      * list the directory content.
  230.      */
  231.     private void list() throws IOException {
  232.       String dirName = dirs.get(r.nextInt(dirs.size()));
  233.       long startTime = System.currentTimeMillis();
  234.       fs.listStatus(new Path(dirName));
  235.       executionTime[LIST] += (System.currentTimeMillis()-startTime);
  236.       totalNumOfOps[LIST]++;
  237.     }
  238.   }
  239.   
  240.   /** Main function:
  241.    * It first initializes data by parsing the command line arguments.
  242.    * It then starts the number of DFSClient threads as specified by
  243.    * the user.
  244.    * It stops all the threads when the specified elapsed time is passed.
  245.    * Before exiting, it prints the average execution for 
  246.    * each operation and operation throughput.
  247.    */
  248.   public int run(String[] args) throws Exception {
  249.     int exitCode = init(args);
  250.     if (exitCode != 0) {
  251.       return exitCode;
  252.     }
  253.     
  254.     barrier();
  255.     
  256.     DFSClientThread[] threads = new DFSClientThread[numOfThreads];
  257.     for (int i=0; i<numOfThreads; i++) {
  258.       threads[i] = new DFSClientThread(i); 
  259.       threads[i].start();
  260.     }
  261.     if (elapsedTime>0) {
  262.       Thread.sleep(elapsedTime*1000);
  263.       shouldRun = false;
  264.     } 
  265.     for (DFSClientThread thread : threads) {
  266.       thread.join();
  267.       for (int i=0; i<TOTAL_OP_TYPES; i++) {
  268.         executionTime[i] += thread.executionTime[i];
  269.         totalNumOfOps[i] += thread.totalNumOfOps[i];
  270.       }
  271.     }
  272.     long totalOps = 0;
  273.     for (int i=0; i<TOTAL_OP_TYPES; i++) {
  274.       totalOps += totalNumOfOps[i];
  275.     }
  276.     
  277.     if (totalNumOfOps[OPEN] != 0) {
  278.       System.out.println("Average open execution time: " + 
  279.           (double)executionTime[OPEN]/totalNumOfOps[OPEN] + "ms");
  280.     }
  281.     if (totalNumOfOps[LIST] != 0) {
  282.       System.out.println("Average list execution time: " + 
  283.           (double)executionTime[LIST]/totalNumOfOps[LIST] + "ms");
  284.     }
  285.     if (totalNumOfOps[DELETE] != 0) {
  286.       System.out.println("Average deletion execution time: " + 
  287.           (double)executionTime[DELETE]/totalNumOfOps[DELETE] + "ms");
  288.       System.out.println("Average create execution time: " + 
  289.           (double)executionTime[CREATE]/totalNumOfOps[CREATE] + "ms");
  290.       System.out.println("Average write_close execution time: " + 
  291.           (double)executionTime[WRITE_CLOSE]/totalNumOfOps[WRITE_CLOSE] + "ms");
  292.     }
  293.     if (elapsedTime != 0) { 
  294.       System.out.println("Average operations per second: " + 
  295.           (double)totalOps/elapsedTime +"ops/s");
  296.     }
  297.     System.out.println();
  298.     return exitCode;
  299.   }
  300.   /** Parse the command line arguments and initialize the data */
  301.   private int init(String[] args) throws IOException {
  302.     try {
  303.       fs = FileSystem.get(getConf());
  304.     } catch (IOException ioe) {
  305.       System.err.println("Can not initialize the file system: " + 
  306.           ioe.getLocalizedMessage());
  307.       return -1;
  308.     }
  309.     int hostHashCode = hostname.hashCode();
  310.     try {
  311.       for (int i = 0; i < args.length; i++) { // parse command line
  312.         if (args[i].equals("-readProbability")) {
  313.           readPr = Double.parseDouble(args[++i]);
  314.           if (readPr<0 || readPr>1) {
  315.             System.err.println( 
  316.                 "The read probability must be [0, 1]: " + readPr);
  317.             return -1;
  318.           }
  319.         } else if (args[i].equals("-writeProbability")) {
  320.           writePr = Double.parseDouble(args[++i]);
  321.           if (writePr<0 || writePr>1) {
  322.             System.err.println( 
  323.                 "The write probability must be [0, 1]: " + writePr);
  324.             return -1;
  325.           }
  326.         } else if (args[i].equals("-root")) {
  327.           root = new Path(args[++i]);
  328.         } else if (args[i].equals("-maxDelayBetweenOps")) {
  329.           maxDelayBetweenOps = Integer.parseInt(args[++i]); // in milliseconds
  330.         } else if (args[i].equals("-numOfThreads")) {
  331.           numOfThreads = Integer.parseInt(args[++i]);
  332.           if (numOfThreads <= 0) {
  333.             System.err.println(
  334.                 "Number of threads must be positive: " + numOfThreads);
  335.             return -1;
  336.           }
  337.         } else if (args[i].equals("-startTime")) {
  338.           startTime = Long.parseLong(args[++i]);
  339.         } else if (args[i].equals("-elapsedTime")) {
  340.           elapsedTime = Long.parseLong(args[++i]);
  341.         } else if (args[i].equals("-seed")) {
  342.           r = new Random(Long.parseLong(args[++i])+hostHashCode);
  343.         } else {
  344.           System.err.println(USAGE);
  345.           ToolRunner.printGenericCommandUsage(System.err);
  346.           return -1;
  347.         }
  348.       }
  349.     } catch (NumberFormatException e) {
  350.       System.err.println("Illegal parameter: " + e.getLocalizedMessage());
  351.       System.err.println(USAGE);
  352.       return -1;
  353.     }
  354.     if (readPr+writePr <0 || readPr+writePr>1) {
  355.       System.err.println(
  356.           "The sum of read probability and write probability must be [0, 1]: " +
  357.           readPr + " "+writePr);
  358.       return -1;
  359.     }
  360.     
  361.     if (r==null) {
  362.       r = new Random(System.currentTimeMillis()+hostHashCode);
  363.     }
  364.     
  365.     return initFileDirTables();
  366.   }
  367.   
  368.   /** Create a table that contains all directories under root and
  369.    * another table that contains all files under root.
  370.    */
  371.   private int initFileDirTables() {
  372.     try {
  373.       initFileDirTables(root);
  374.     } catch (IOException e) {
  375.       System.err.println(e.getLocalizedMessage());
  376.       e.printStackTrace();
  377.       return -1;
  378.     }
  379.     if (dirs.isEmpty()) {
  380.       System.err.println("The test space " + root + " is empty");
  381.       return -1;
  382.     }
  383.     if (files.isEmpty()) {
  384.       System.err.println("The test space " + root + 
  385.           " does not have any file");
  386.       return -1;
  387.     }
  388.     return 0;
  389.   }
  390.   
  391.   /** Create a table that contains all directories under the specified path and
  392.    * another table that contains all files under the specified path and
  393.    * whose name starts with "_file_".
  394.    */
  395.   private void initFileDirTables(Path path) throws IOException {
  396.     FileStatus[] stats = fs.listStatus(path);
  397.     if (stats != null) { 
  398.       for (FileStatus stat : stats) {
  399.         if (stat.isDir()) {
  400.           dirs.add(stat.getPath().toString());
  401.           initFileDirTables(stat.getPath());
  402.         } else {
  403.           Path filePath = stat.getPath();
  404.           if (filePath.getName().startsWith(StructureGenerator.FILE_NAME_PREFIX)) {
  405.             files.add(filePath.toString());
  406.           }
  407.         }
  408.       }
  409.     }
  410.   }
  411.   
  412.   /** Returns when the current number of seconds from the epoch equals
  413.    * the command line argument given by <code>-startTime</code>.
  414.    * This allows multiple instances of this program, running on clock
  415.    * synchronized nodes, to start at roughly the same time.
  416.    */
  417.   private void barrier() {
  418.     long sleepTime;
  419.     while ((sleepTime = startTime - System.currentTimeMillis()) > 0) {
  420.       try {
  421.         Thread.sleep(sleepTime);
  422.       } catch (InterruptedException ex) {
  423.       }
  424.     }
  425.   }
  426.   /** Create a file with a length of <code>fileSize</code>.
  427.    * The file is filled with 'a'.
  428.    */
  429.   private void genFile(Path file, long fileSize) throws IOException {
  430.     long startTime = System.currentTimeMillis();
  431.     FSDataOutputStream out = fs.create(file, true, 
  432.         getConf().getInt("io.file.buffer.size", 4096),
  433.         (short)getConf().getInt("dfs.replication", 3),
  434.         fs.getDefaultBlockSize());
  435.     executionTime[CREATE] += (System.currentTimeMillis()-startTime);
  436.     totalNumOfOps[CREATE]++;
  437.     for (long i=0; i<fileSize; i++) {
  438.       out.writeByte('a');
  439.     }
  440.     startTime = System.currentTimeMillis();
  441.     out.close();
  442.     executionTime[WRITE_CLOSE] += (System.currentTimeMillis()-startTime);
  443.     totalNumOfOps[WRITE_CLOSE]++;
  444.   }
  445.   
  446.   /** Main program
  447.    * 
  448.    * @param args command line arguments
  449.    * @throws Exception
  450.    */
  451.   public static void main(String[] args) throws Exception {
  452.     int res = ToolRunner.run(new Configuration(),
  453.         new LoadGenerator(), args);
  454.     System.exit(res);
  455.   }
  456. }