NNThroughputBenchmark.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:40k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import java.io.File;
  20. import java.io.FileOutputStream;
  21. import java.io.IOException;
  22. import java.util.ArrayList;
  23. import java.util.Arrays;
  24. import java.util.List;
  25. import javax.security.auth.login.LoginException;
  26. import org.apache.commons.logging.Log;
  27. import org.apache.commons.logging.LogFactory;
  28. import org.apache.commons.logging.impl.Log4JLogger;
  29. import org.apache.hadoop.conf.Configuration;
  30. import org.apache.hadoop.fs.permission.FsPermission;
  31. import org.apache.hadoop.hdfs.protocol.Block;
  32. import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
  33. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  34. import org.apache.hadoop.hdfs.protocol.FSConstants;
  35. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  36. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  37. import org.apache.hadoop.hdfs.server.datanode.DataStorage;
  38. import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
  39. import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
  40. import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
  41. import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
  42. import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
  43. import org.apache.hadoop.net.DNS;
  44. import org.apache.hadoop.net.NetworkTopology;
  45. import org.apache.hadoop.security.UnixUserGroupInformation;
  46. import org.apache.hadoop.security.UserGroupInformation;
  47. import org.apache.hadoop.util.StringUtils;
  48. import org.apache.log4j.Level;
  49. /**
  50.  * Main class for a series of name-node benchmarks.
  51.  * 
  52.  * Each benchmark measures throughput and average execution time 
  53.  * of a specific name-node operation, e.g. file creation or block reports.
  54.  * 
  55.  * The benchmark does not involve any other hadoop components
  56.  * except for the name-node. Each operation is executed
  57.  * by calling directly the respective name-node method.
  58.  * The name-node here is real all other components are simulated.
  59.  * 
  60.  * Command line arguments for the benchmark include:<br>
  61.  * 1) total number of operations to be performed,<br>
  62.  * 2) number of threads to run these operations,<br>
  63.  * 3) followed by operation specific input parameters.
  64.  * 
  65.  * Then the benchmark generates inputs for each thread so that the
  66.  * input generation overhead does not effect the resulting statistics.
  67.  * The number of operations performed by threads practically is the same. 
  68.  * Precisely, the difference between the number of operations 
  69.  * performed by any two threads does not exceed 1.
  70.  * 
  71.  * Then the benchmark executes the specified number of operations using 
  72.  * the specified number of threads and outputs the resulting stats.
  73.  */
  74. public class NNThroughputBenchmark {
  75.   private static final Log LOG = LogFactory.getLog(NNThroughputBenchmark.class);
  76.   private static final int BLOCK_SIZE = 16;
  77.   static Configuration config;
  78.   static NameNode nameNode;
  79.   private final UserGroupInformation ugi;
  80.   NNThroughputBenchmark(Configuration conf) throws IOException, LoginException {
  81.     config = conf;
  82.     ugi = UnixUserGroupInformation.login(config);
  83.     UserGroupInformation.setCurrentUser(ugi);
  84.     // We do not need many handlers, since each thread simulates a handler
  85.     // by calling name-node methods directly
  86.     config.setInt("dfs.namenode.handler.count", 1);
  87.     // set exclude file
  88.     config.set("dfs.hosts.exclude", "${hadoop.tmp.dir}/dfs/hosts/exclude");
  89.     File excludeFile = new File(config.get("dfs.hosts.exclude", "exclude"));
  90.     if(! excludeFile.exists()) {
  91.       if(!excludeFile.getParentFile().mkdirs())
  92.         throw new IOException("NNThroughputBenchmark: cannot mkdir " + excludeFile);
  93.     }
  94.     new FileOutputStream(excludeFile).close();
  95.     // Start the NameNode
  96.     String[] argv = new String[] {};
  97.     nameNode = NameNode.createNameNode(argv, config);
  98.   }
  99.   void close() throws IOException {
  100.     nameNode.stop();
  101.   }
  102.   static void turnOffNameNodeLogging() {
  103.     // change log level to ERROR: NameNode.LOG & NameNode.stateChangeLog
  104.     ((Log4JLogger)NameNode.LOG).getLogger().setLevel(Level.ERROR);
  105.     ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ERROR);
  106.     ((Log4JLogger)NetworkTopology.LOG).getLogger().setLevel(Level.ERROR);
  107.     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ERROR);
  108.     ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.ERROR);
  109.     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ERROR);
  110.   }
  111.   /**
  112.    * Base class for collecting operation statistics.
  113.    * 
  114.    * Overload this class in order to run statistics for a 
  115.    * specific name-node operation.
  116.    */
  117.   abstract class OperationStatsBase {
  118.     protected static final String BASE_DIR_NAME = "/nnThroughputBenchmark";
  119.     protected static final String OP_ALL_NAME = "all";
  120.     protected static final String OP_ALL_USAGE = "-op all " +
  121.                                   "<other ops options> [-keepResults]";
  122.     protected String baseDir;
  123.     protected short replication;
  124.     protected int  numThreads = 0;        // number of threads
  125.     protected int  numOpsRequired = 0;    // number of operations requested
  126.     protected int  numOpsExecuted = 0;    // number of operations executed
  127.     protected long cumulativeTime = 0;    // sum of times for each op
  128.     protected long elapsedTime = 0;       // time from start to finish
  129.     protected boolean keepResults = false;// don't clean base directory on exit
  130.     protected List<StatsDaemon> daemons;
  131.     /**
  132.      * Operation name.
  133.      */
  134.     abstract String getOpName();
  135.     /**
  136.      * Parse command line arguments.
  137.      * 
  138.      * @param args arguments
  139.      * @throws IOException
  140.      */
  141.     abstract void parseArguments(List<String> args) throws IOException;
  142.     /**
  143.      * Generate inputs for each daemon thread.
  144.      * 
  145.      * @param opsPerThread number of inputs for each thread.
  146.      * @throws IOException
  147.      */
  148.     abstract void generateInputs(int[] opsPerThread) throws IOException;
  149.     /**
  150.      * This corresponds to the arg1 argument of 
  151.      * {@link #executeOp(int, int, String)}, which can have different meanings
  152.      * depending on the operation performed.
  153.      * 
  154.      * @param daemonId
  155.      * @return the argument
  156.      */
  157.     abstract String getExecutionArgument(int daemonId);
  158.     /**
  159.      * Execute name-node operation.
  160.      * 
  161.      * @param daemonId id of the daemon calling this method.
  162.      * @param inputIdx serial index of the operation called by the deamon.
  163.      * @param arg1 operation specific argument.
  164.      * @return time of the individual name-node call.
  165.      * @throws IOException
  166.      */
  167.     abstract long executeOp(int daemonId, int inputIdx, String arg1) throws IOException;
  168.     /**
  169.      * Print the results of the benchmarking.
  170.      */
  171.     abstract void printResults();
  172.     OperationStatsBase() {
  173.       baseDir = BASE_DIR_NAME + "/" + getOpName();
  174.       replication = (short) config.getInt("dfs.replication", 3);
  175.       numOpsRequired = 10;
  176.       numThreads = 3;
  177.     }
  178.     void benchmark() throws IOException {
  179.       daemons = new ArrayList<StatsDaemon>();
  180.       long start = 0;
  181.       try {
  182.         numOpsExecuted = 0;
  183.         cumulativeTime = 0;
  184.         if(numThreads < 1)
  185.           return;
  186.         int tIdx = 0; // thread index < nrThreads
  187.         int opsPerThread[] = new int[numThreads];
  188.         for(int opsScheduled = 0; opsScheduled < numOpsRequired; 
  189.                                   opsScheduled += opsPerThread[tIdx++]) {
  190.           // execute  in a separate thread
  191.           opsPerThread[tIdx] = (numOpsRequired-opsScheduled)/(numThreads-tIdx);
  192.           if(opsPerThread[tIdx] == 0)
  193.             opsPerThread[tIdx] = 1;
  194.         }
  195.         // if numThreads > numOpsRequired then the remaining threads will do nothing
  196.         for(; tIdx < numThreads; tIdx++)
  197.           opsPerThread[tIdx] = 0;
  198.         turnOffNameNodeLogging();
  199.         generateInputs(opsPerThread);
  200.         for(tIdx=0; tIdx < numThreads; tIdx++)
  201.           daemons.add(new StatsDaemon(tIdx, opsPerThread[tIdx], this));
  202.         start = System.currentTimeMillis();
  203.         LOG.info("Starting " + numOpsRequired + " " + getOpName() + "(s).");
  204.         for(StatsDaemon d : daemons)
  205.           d.start();
  206.       } finally {
  207.         while(isInPorgress()) {
  208.           // try {Thread.sleep(500);} catch (InterruptedException e) {}
  209.         }
  210.         elapsedTime = System.currentTimeMillis() - start;
  211.         for(StatsDaemon d : daemons) {
  212.           incrementStats(d.localNumOpsExecuted, d.localCumulativeTime);
  213.           // System.out.println(d.toString() + ": ops Exec = " + d.localNumOpsExecuted);
  214.         }
  215.       }
  216.     }
  217.     private boolean isInPorgress() {
  218.       for(StatsDaemon d : daemons)
  219.         if(d.isInProgress())
  220.           return true;
  221.       return false;
  222.     }
  223.     void cleanUp() throws IOException {
  224.       nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
  225.       if(!keepResults)
  226.         nameNode.delete(getBaseDir(), true);
  227.     }
  228.     int getNumOpsExecuted() {
  229.       return numOpsExecuted;
  230.     }
  231.     long getCumulativeTime() {
  232.       return cumulativeTime;
  233.     }
  234.     long getElapsedTime() {
  235.       return elapsedTime;
  236.     }
  237.     long getAverageTime() {
  238.       return numOpsExecuted == 0 ? 0 : cumulativeTime / numOpsExecuted;
  239.     }
  240.     double getOpsPerSecond() {
  241.       return elapsedTime == 0 ? 0 : 1000*(double)numOpsExecuted / elapsedTime;
  242.     }
  243.     String getBaseDir() {
  244.       return baseDir;
  245.     }
  246.     String getClientName(int idx) {
  247.       return getOpName() + "-client-" + idx;
  248.     }
  249.     void incrementStats(int ops, long time) {
  250.       numOpsExecuted += ops;
  251.       cumulativeTime += time;
  252.     }
  253.     /**
  254.      * Parse first 2 arguments, corresponding to the "-op" option.
  255.      * 
  256.      * @param args
  257.      * @return true if operation is all, which means that options not related
  258.      * to this operation should be ignored, or false otherwise, meaning
  259.      * that usage should be printed when an unrelated option is encountered.
  260.      * @throws IOException
  261.      */
  262.     protected boolean verifyOpArgument(List<String> args) {
  263.       if(args.size() < 2 || ! args.get(0).startsWith("-op"))
  264.         printUsage();
  265.       int krIndex = args.indexOf("-keepResults");
  266.       keepResults = (krIndex >= 0);
  267.       if(keepResults) {
  268.         args.remove(krIndex);
  269.       }
  270.       String type = args.get(1);
  271.       if(OP_ALL_NAME.equals(type)) {
  272.         type = getOpName();
  273.         return true;
  274.       }
  275.       if(!getOpName().equals(type))
  276.         printUsage();
  277.       return false;
  278.     }
  279.     void printStats() {
  280.       LOG.info("--- " + getOpName() + " stats  ---");
  281.       LOG.info("# operations: " + getNumOpsExecuted());
  282.       LOG.info("Elapsed Time: " + getElapsedTime());
  283.       LOG.info(" Ops per sec: " + getOpsPerSecond());
  284.       LOG.info("Average Time: " + getAverageTime());
  285.     }
  286.   }
  287.   /**
  288.    * One of the threads that perform stats operations.
  289.    */
  290.   private class StatsDaemon extends Thread {
  291.     private int daemonId;
  292.     private int opsPerThread;
  293.     private String arg1;      // argument passed to executeOp()
  294.     private volatile int  localNumOpsExecuted = 0;
  295.     private volatile long localCumulativeTime = 0;
  296.     private OperationStatsBase statsOp;
  297.     StatsDaemon(int daemonId, int nrOps, OperationStatsBase op) {
  298.       this.daemonId = daemonId;
  299.       this.opsPerThread = nrOps;
  300.       this.statsOp = op;
  301.       setName(toString());
  302.     }
  303.     public void run() {
  304.       UserGroupInformation.setCurrentUser(ugi);
  305.       localNumOpsExecuted = 0;
  306.       localCumulativeTime = 0;
  307.       arg1 = statsOp.getExecutionArgument(daemonId);
  308.       try {
  309.         benchmarkOne();
  310.       } catch(IOException ex) {
  311.         LOG.error("StatsDaemon " + daemonId + " failed: n" 
  312.             + StringUtils.stringifyException(ex));
  313.       }
  314.     }
  315.     public String toString() {
  316.       return "StatsDaemon-" + daemonId;
  317.     }
  318.     void benchmarkOne() throws IOException {
  319.       for(int idx = 0; idx < opsPerThread; idx++) {
  320.         long stat = statsOp.executeOp(daemonId, idx, arg1);
  321.         localNumOpsExecuted++;
  322.         localCumulativeTime += stat;
  323.       }
  324.     }
  325.     boolean isInProgress() {
  326.       return localNumOpsExecuted < opsPerThread;
  327.     }
  328.     /**
  329.      * Schedule to stop this daemon.
  330.      */
  331.     void terminate() {
  332.       opsPerThread = localNumOpsExecuted;
  333.     }
  334.   }
  335.   /**
  336.    * Clean all benchmark result directories.
  337.    */
  338.   class CleanAllStats extends OperationStatsBase {
  339.     // Operation types
  340.     static final String OP_CLEAN_NAME = "clean";
  341.     static final String OP_CLEAN_USAGE = "-op clean";
  342.     CleanAllStats(List<String> args) {
  343.       super();
  344.       parseArguments(args);
  345.       numOpsRequired = 1;
  346.       numThreads = 1;
  347.       keepResults = true;
  348.     }
  349.     String getOpName() {
  350.       return OP_CLEAN_NAME;
  351.     }
  352.     void parseArguments(List<String> args) {
  353.       boolean ignoreUnrelatedOptions = verifyOpArgument(args);
  354.       if(args.size() > 2 && !ignoreUnrelatedOptions)
  355.         printUsage();
  356.     }
  357.     void generateInputs(int[] opsPerThread) throws IOException {
  358.       // do nothing
  359.     }
  360.     /**
  361.      * Does not require the argument
  362.      */
  363.     String getExecutionArgument(int daemonId) {
  364.       return null;
  365.     }
  366.     /**
  367.      * Remove entire benchmark directory.
  368.      */
  369.     long executeOp(int daemonId, int inputIdx, String ignore) 
  370.     throws IOException {
  371.       nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
  372.       long start = System.currentTimeMillis();
  373.       nameNode.delete(BASE_DIR_NAME, true);
  374.       long end = System.currentTimeMillis();
  375.       return end-start;
  376.     }
  377.     void printResults() {
  378.       LOG.info("--- " + getOpName() + " inputs ---");
  379.       LOG.info("Remove directory " + BASE_DIR_NAME);
  380.       printStats();
  381.     }
  382.   }
  383.   /**
  384.    * File creation statistics.
  385.    * 
  386.    * Each thread creates the same (+ or -1) number of files.
  387.    * File names are pre-generated during initialization.
  388.    * The created files do not have blocks.
  389.    */
  390.   class CreateFileStats extends OperationStatsBase {
  391.     // Operation types
  392.     static final String OP_CREATE_NAME = "create";
  393.     static final String OP_CREATE_USAGE = 
  394.       "-op create [-threads T] [-files N] [-filesPerDir P] [-close]";
  395.     protected FileNameGenerator nameGenerator;
  396.     protected String[][] fileNames;
  397.     private boolean closeUponCreate;
  398.     CreateFileStats(List<String> args) {
  399.       super();
  400.       parseArguments(args);
  401.     }
  402.     String getOpName() {
  403.       return OP_CREATE_NAME;
  404.     }
  405.     void parseArguments(List<String> args) {
  406.       boolean ignoreUnrelatedOptions = verifyOpArgument(args);
  407.       int nrFilesPerDir = 4;
  408.       closeUponCreate = false;
  409.       for (int i = 2; i < args.size(); i++) {       // parse command line
  410.         if(args.get(i).equals("-files")) {
  411.           if(i+1 == args.size())  printUsage();
  412.           numOpsRequired = Integer.parseInt(args.get(++i));
  413.         } else if(args.get(i).equals("-threads")) {
  414.           if(i+1 == args.size())  printUsage();
  415.           numThreads = Integer.parseInt(args.get(++i));
  416.         } else if(args.get(i).equals("-filesPerDir")) {
  417.           if(i+1 == args.size())  printUsage();
  418.           nrFilesPerDir = Integer.parseInt(args.get(++i));
  419.         } else if(args.get(i).equals("-close")) {
  420.           closeUponCreate = true;
  421.         } else if(!ignoreUnrelatedOptions)
  422.           printUsage();
  423.       }
  424.       nameGenerator = new FileNameGenerator(getBaseDir(), nrFilesPerDir);
  425.     }
  426.     void generateInputs(int[] opsPerThread) throws IOException {
  427.       assert opsPerThread.length == numThreads : "Error opsPerThread.length"; 
  428.       nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
  429.       // int generatedFileIdx = 0;
  430.       LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
  431.       fileNames = new String[numThreads][];
  432.       for(int idx=0; idx < numThreads; idx++) {
  433.         int threadOps = opsPerThread[idx];
  434.         fileNames[idx] = new String[threadOps];
  435.         for(int jdx=0; jdx < threadOps; jdx++)
  436.           fileNames[idx][jdx] = nameGenerator.
  437.                                   getNextFileName("ThroughputBench");
  438.       }
  439.     }
  440.     void dummyActionNoSynch(int daemonId, int fileIdx) {
  441.       for(int i=0; i < 2000; i++)
  442.         fileNames[daemonId][fileIdx].contains(""+i);
  443.     }
  444.     /**
  445.      * returns client name
  446.      */
  447.     String getExecutionArgument(int daemonId) {
  448.       return getClientName(daemonId);
  449.     }
  450.     /**
  451.      * Do file create.
  452.      */
  453.     long executeOp(int daemonId, int inputIdx, String clientName) 
  454.     throws IOException {
  455.       long start = System.currentTimeMillis();
  456.       // dummyActionNoSynch(fileIdx);
  457.       nameNode.create(fileNames[daemonId][inputIdx], FsPermission.getDefault(),
  458.                       clientName, true, replication, BLOCK_SIZE);
  459.       long end = System.currentTimeMillis();
  460.       for(boolean written = !closeUponCreate; !written; 
  461.         written = nameNode.complete(fileNames[daemonId][inputIdx], clientName));
  462.       return end-start;
  463.     }
  464.     void printResults() {
  465.       LOG.info("--- " + getOpName() + " inputs ---");
  466.       LOG.info("nrFiles = " + numOpsRequired);
  467.       LOG.info("nrThreads = " + numThreads);
  468.       LOG.info("nrFilesPerDir = " + nameGenerator.getFilesPerDirectory());
  469.       printStats();
  470.     }
  471.   }
  472.   /**
  473.    * Open file statistics.
  474.    * 
  475.    * Measure how many open calls (getBlockLocations()) 
  476.    * the name-node can handle per second.
  477.    */
  478.   class OpenFileStats extends CreateFileStats {
  479.     // Operation types
  480.     static final String OP_OPEN_NAME = "open";
  481.     static final String OP_USAGE_ARGS = 
  482.       " [-threads T] [-files N] [-filesPerDir P] [-useExisting]";
  483.     static final String OP_OPEN_USAGE = 
  484.       "-op " + OP_OPEN_NAME + OP_USAGE_ARGS;
  485.     private boolean useExisting;  // do not generate files, use existing ones
  486.     OpenFileStats(List<String> args) {
  487.       super(args);
  488.     }
  489.     String getOpName() {
  490.       return OP_OPEN_NAME;
  491.     }
  492.     void parseArguments(List<String> args) {
  493.       int ueIndex = args.indexOf("-useExisting");
  494.       useExisting = (ueIndex >= 0);
  495.       if(useExisting) {
  496.         args.remove(ueIndex);
  497.       }
  498.       super.parseArguments(args);
  499.     }
  500.     void generateInputs(int[] opsPerThread) throws IOException {
  501.       // create files using opsPerThread
  502.       String[] createArgs = new String[] {
  503.               "-op", "create", 
  504.               "-threads", String.valueOf(this.numThreads), 
  505.               "-files", String.valueOf(numOpsRequired),
  506.               "-filesPerDir", 
  507.               String.valueOf(nameGenerator.getFilesPerDirectory()),
  508.               "-close"};
  509.       CreateFileStats opCreate =  new CreateFileStats(Arrays.asList(createArgs));
  510.       if(!useExisting) {  // create files if they were not created before
  511.         opCreate.benchmark();
  512.         LOG.info("Created " + numOpsRequired + " files.");
  513.       } else {
  514.         LOG.info("useExisting = true. Assuming " 
  515.             + numOpsRequired + " files have been created before.");
  516.       }
  517.       // use the same files for open
  518.       super.generateInputs(opsPerThread);
  519.       if(nameNode.getFileInfo(opCreate.getBaseDir()) != null
  520.           && nameNode.getFileInfo(getBaseDir()) == null) {
  521.         nameNode.rename(opCreate.getBaseDir(), getBaseDir());
  522.       }
  523.       if(nameNode.getFileInfo(getBaseDir()) == null) {
  524.         throw new IOException(getBaseDir() + " does not exist.");
  525.       }
  526.     }
  527.     /**
  528.      * Do file open.
  529.      */
  530.     long executeOp(int daemonId, int inputIdx, String ignore) 
  531.     throws IOException {
  532.       long start = System.currentTimeMillis();
  533.       nameNode.getBlockLocations(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
  534.       long end = System.currentTimeMillis();
  535.       return end-start;
  536.     }
  537.   }
  538.   /**
  539.    * Delete file statistics.
  540.    * 
  541.    * Measure how many delete calls the name-node can handle per second.
  542.    */
  543.   class DeleteFileStats extends OpenFileStats {
  544.     // Operation types
  545.     static final String OP_DELETE_NAME = "delete";
  546.     static final String OP_DELETE_USAGE = 
  547.       "-op " + OP_DELETE_NAME + OP_USAGE_ARGS;
  548.     DeleteFileStats(List<String> args) {
  549.       super(args);
  550.     }
  551.     String getOpName() {
  552.       return OP_DELETE_NAME;
  553.     }
  554.     long executeOp(int daemonId, int inputIdx, String ignore) 
  555.     throws IOException {
  556.       long start = System.currentTimeMillis();
  557.       nameNode.delete(fileNames[daemonId][inputIdx], false);
  558.       long end = System.currentTimeMillis();
  559.       return end-start;
  560.     }
  561.   }
  562.   /**
  563.    * Rename file statistics.
  564.    * 
  565.    * Measure how many rename calls the name-node can handle per second.
  566.    */
  567.   class RenameFileStats extends OpenFileStats {
  568.     // Operation types
  569.     static final String OP_RENAME_NAME = "rename";
  570.     static final String OP_RENAME_USAGE = 
  571.       "-op " + OP_RENAME_NAME + OP_USAGE_ARGS;
  572.     protected String[][] destNames;
  573.     RenameFileStats(List<String> args) {
  574.       super(args);
  575.     }
  576.     String getOpName() {
  577.       return OP_RENAME_NAME;
  578.     }
  579.     void generateInputs(int[] opsPerThread) throws IOException {
  580.       super.generateInputs(opsPerThread);
  581.       destNames = new String[fileNames.length][];
  582.       for(int idx=0; idx < numThreads; idx++) {
  583.         int nrNames = fileNames[idx].length;
  584.         destNames[idx] = new String[nrNames];
  585.         for(int jdx=0; jdx < nrNames; jdx++)
  586.           destNames[idx][jdx] = fileNames[idx][jdx] + ".r";
  587.       }
  588.     }
  589.     long executeOp(int daemonId, int inputIdx, String ignore) 
  590.     throws IOException {
  591.       long start = System.currentTimeMillis();
  592.       nameNode.rename(fileNames[daemonId][inputIdx],
  593.                       destNames[daemonId][inputIdx]);
  594.       long end = System.currentTimeMillis();
  595.       return end-start;
  596.     }
  597.   }
  598.   /**
  599.    * Minimal data-node simulator.
  600.    */
  601.   private static class TinyDatanode implements Comparable<String> {
  602.     private static final long DF_CAPACITY = 100*1024*1024;
  603.     private static final long DF_USED = 0;
  604.     
  605.     NamespaceInfo nsInfo;
  606.     DatanodeRegistration dnRegistration;
  607.     Block[] blocks;
  608.     int nrBlocks; // actual number of blocks
  609.     /**
  610.      * Get data-node in the form 
  611.      * <host name> : <port>
  612.      * where port is a 6 digit integer.
  613.      * This is necessary in order to provide lexocographic ordering.
  614.      * Host names are all the same, the ordering goes by port numbers.
  615.      */
  616.     private static String getNodeName(int port) throws IOException {
  617.       String machineName = DNS.getDefaultHost("default", "default");
  618.       String sPort = String.valueOf(100000 + port);
  619.       if(sPort.length() > 6)
  620.         throw new IOException("Too many data-nodes.");
  621.       return machineName + ":" + sPort;
  622.     }
  623.     TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
  624.       dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
  625.       this.blocks = new Block[blockCapacity];
  626.       this.nrBlocks = 0;
  627.     }
  628.     String getName() {
  629.       return dnRegistration.getName();
  630.     }
  631.     void register() throws IOException {
  632.       // get versions from the namenode
  633.       nsInfo = nameNode.versionRequest();
  634.       dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
  635.       DataNode.setNewStorageID(dnRegistration);
  636.       // register datanode
  637.       dnRegistration = nameNode.register(dnRegistration);
  638.     }
  639.     /**
  640.      * Send a heartbeat to the name-node.
  641.      * Ignore reply commands.
  642.      */
  643.     void sendHeartbeat() throws IOException {
  644.       // register datanode
  645.       DatanodeCommand[] cmds = nameNode.sendHeartbeat(
  646.           dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
  647.       if(cmds != null) {
  648.         for (DatanodeCommand cmd : cmds ) {
  649.           LOG.debug("sendHeartbeat Name-node reply: " + cmd.getAction());
  650.         }
  651.       }
  652.     }
  653.     boolean addBlock(Block blk) {
  654.       if(nrBlocks == blocks.length) {
  655.         LOG.debug("Cannot add block: datanode capacity = " + blocks.length);
  656.         return false;
  657.       }
  658.       blocks[nrBlocks] = blk;
  659.       nrBlocks++;
  660.       return true;
  661.     }
  662.     void formBlockReport() {
  663.       // fill remaining slots with blocks that do not exist
  664.       for(int idx = blocks.length-1; idx >= nrBlocks; idx--)
  665.         blocks[idx] = new Block(blocks.length - idx, 0, 0);
  666.     }
  667.     public int compareTo(String name) {
  668.       return getName().compareTo(name);
  669.     }
  670.     /**
  671.      * Send a heartbeat to the name-node and replicate blocks if requested.
  672.      */
  673.     int replicateBlocks() throws IOException {
  674.       // register datanode
  675.       DatanodeCommand[] cmds = nameNode.sendHeartbeat(
  676.           dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
  677.       if (cmds != null) {
  678.         for (DatanodeCommand cmd : cmds) {
  679.           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
  680.             // Send a copy of a block to another datanode
  681.             BlockCommand bcmd = (BlockCommand)cmd;
  682.             return transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
  683.           }
  684.         }
  685.       }
  686.       return 0;
  687.     }
  688.     /**
  689.      * Transfer blocks to another data-node.
  690.      * Just report on behalf of the other data-node
  691.      * that the blocks have been received.
  692.      */
  693.     private int transferBlocks( Block blocks[], 
  694.                                 DatanodeInfo xferTargets[][] 
  695.                               ) throws IOException {
  696.       for(int i = 0; i < blocks.length; i++) {
  697.         DatanodeInfo blockTargets[] = xferTargets[i];
  698.         for(int t = 0; t < blockTargets.length; t++) {
  699.           DatanodeInfo dnInfo = blockTargets[t];
  700.           DatanodeRegistration receivedDNReg;
  701.           receivedDNReg = new DatanodeRegistration(dnInfo.getName());
  702.           receivedDNReg.setStorageInfo(
  703.                           new DataStorage(nsInfo, dnInfo.getStorageID()));
  704.           receivedDNReg.setInfoPort(dnInfo.getInfoPort());
  705.           nameNode.blockReceived( receivedDNReg, 
  706.                                   new Block[] {blocks[i]},
  707.                                   new String[] {DataNode.EMPTY_DEL_HINT});
  708.         }
  709.       }
  710.       return blocks.length;
  711.     }
  712.   }
  713.   /**
  714.    * Block report statistics.
  715.    * 
  716.    * Each thread here represents its own data-node.
  717.    * Data-nodes send the same block report each time.
  718.    * The block report may contain missing or non-existing blocks.
  719.    */
  720.   class BlockReportStats extends OperationStatsBase {
  721.     static final String OP_BLOCK_REPORT_NAME = "blockReport";
  722.     static final String OP_BLOCK_REPORT_USAGE = 
  723.       "-op blockReport [-datanodes T] [-reports N] " +
  724.       "[-blocksPerReport B] [-blocksPerFile F]";
  725.     private int blocksPerReport;
  726.     private int blocksPerFile;
  727.     private TinyDatanode[] datanodes; // array of data-nodes sorted by name
  728.     BlockReportStats(List<String> args) {
  729.       super();
  730.       this.blocksPerReport = 100;
  731.       this.blocksPerFile = 10;
  732.       // set heartbeat interval to 3 min, so that expiration were 40 min
  733.       config.setLong("dfs.heartbeat.interval", 3 * 60);
  734.       parseArguments(args);
  735.       // adjust replication to the number of data-nodes
  736.       this.replication = (short)Math.min((int)replication, getNumDatanodes());
  737.     }
  738.     /**
  739.      * Each thread pretends its a data-node here.
  740.      */
  741.     private int getNumDatanodes() {
  742.       return numThreads;
  743.     }
  744.     String getOpName() {
  745.       return OP_BLOCK_REPORT_NAME;
  746.     }
  747.     void parseArguments(List<String> args) {
  748.       boolean ignoreUnrelatedOptions = verifyOpArgument(args);
  749.       for (int i = 2; i < args.size(); i++) {       // parse command line
  750.         if(args.get(i).equals("-reports")) {
  751.           if(i+1 == args.size())  printUsage();
  752.           numOpsRequired = Integer.parseInt(args.get(++i));
  753.         } else if(args.get(i).equals("-datanodes")) {
  754.           if(i+1 == args.size())  printUsage();
  755.           numThreads = Integer.parseInt(args.get(++i));
  756.         } else if(args.get(i).equals("-blocksPerReport")) {
  757.           if(i+1 == args.size())  printUsage();
  758.           blocksPerReport = Integer.parseInt(args.get(++i));
  759.         } else if(args.get(i).equals("-blocksPerFile")) {
  760.           if(i+1 == args.size())  printUsage();
  761.           blocksPerFile = Integer.parseInt(args.get(++i));
  762.         } else if(!ignoreUnrelatedOptions)
  763.           printUsage();
  764.       }
  765.     }
  766.     void generateInputs(int[] ignore) throws IOException {
  767.       int nrDatanodes = getNumDatanodes();
  768.       int nrBlocks = (int)Math.ceil((double)blocksPerReport * nrDatanodes 
  769.                                     / replication);
  770.       int nrFiles = (int)Math.ceil((double)nrBlocks / blocksPerFile);
  771.       datanodes = new TinyDatanode[nrDatanodes];
  772.       // create data-nodes
  773.       String prevDNName = "";
  774.       for(int idx=0; idx < nrDatanodes; idx++) {
  775.         datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
  776.         datanodes[idx].register();
  777.         assert datanodes[idx].getName().compareTo(prevDNName) > 0
  778.           : "Data-nodes must be sorted lexicographically.";
  779.         datanodes[idx].sendHeartbeat();
  780.         prevDNName = datanodes[idx].getName();
  781.       }
  782.       // create files 
  783.       LOG.info("Creating " + nrFiles + " with " + blocksPerFile + " blocks each.");
  784.       FileNameGenerator nameGenerator;
  785.       nameGenerator = new FileNameGenerator(getBaseDir(), 100);
  786.       String clientName = getClientName(007);
  787.       nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
  788.       for(int idx=0; idx < nrFiles; idx++) {
  789.         String fileName = nameGenerator.getNextFileName("ThroughputBench");
  790.         nameNode.create(fileName, FsPermission.getDefault(),
  791.                         clientName, true, replication, BLOCK_SIZE);
  792.         addBlocks(fileName, clientName);
  793.         nameNode.complete(fileName, clientName);
  794.       }
  795.       // prepare block reports
  796.       for(int idx=0; idx < nrDatanodes; idx++) {
  797.         datanodes[idx].formBlockReport();
  798.       }
  799.     }
  800.     private void addBlocks(String fileName, String clientName) throws IOException {
  801.       for(int jdx = 0; jdx < blocksPerFile; jdx++) {
  802.         LocatedBlock loc = nameNode.addBlock(fileName, clientName);
  803.         for(DatanodeInfo dnInfo : loc.getLocations()) {
  804.           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
  805.           datanodes[dnIdx].addBlock(loc.getBlock());
  806.           nameNode.blockReceived(
  807.               datanodes[dnIdx].dnRegistration, 
  808.               new Block[] {loc.getBlock()},
  809.               new String[] {""});
  810.         }
  811.       }
  812.     }
  813.     /**
  814.      * Does not require the argument
  815.      */
  816.     String getExecutionArgument(int daemonId) {
  817.       return null;
  818.     }
  819.     long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
  820.       assert daemonId < numThreads : "Wrong daemonId.";
  821.       TinyDatanode dn = datanodes[daemonId];
  822.       long start = System.currentTimeMillis();
  823.       nameNode.blockReport(dn.dnRegistration,
  824.           BlockListAsLongs.convertToArrayLongs(dn.blocks));
  825.       long end = System.currentTimeMillis();
  826.       return end-start;
  827.     }
  828.     void printResults() {
  829.       String blockDistribution = "";
  830.       String delim = "(";
  831.       for(int idx=0; idx < getNumDatanodes(); idx++) {
  832.         blockDistribution += delim + datanodes[idx].nrBlocks;
  833.         delim = ", ";
  834.       }
  835.       blockDistribution += ")";
  836.       LOG.info("--- " + getOpName() + " inputs ---");
  837.       LOG.info("reports = " + numOpsRequired);
  838.       LOG.info("datanodes = " + numThreads + " " + blockDistribution);
  839.       LOG.info("blocksPerReport = " + blocksPerReport);
  840.       LOG.info("blocksPerFile = " + blocksPerFile);
  841.       printStats();
  842.     }
  843.   }   // end BlockReportStats
  844.   /**
  845.    * Measures how fast replication monitor can compute data-node work.
  846.    * 
  847.    * It runs only one thread until no more work can be scheduled.
  848.    */
  849.   class ReplicationStats extends OperationStatsBase {
  850.     static final String OP_REPLICATION_NAME = "replication";
  851.     static final String OP_REPLICATION_USAGE = 
  852.       "-op replication [-datanodes T] [-nodesToDecommission D] " +
  853.       "[-nodeReplicationLimit C] [-totalBlocks B] [-replication R]";
  854.     private BlockReportStats blockReportObject;
  855.     private int numDatanodes;
  856.     private int nodesToDecommission;
  857.     private int nodeReplicationLimit;
  858.     private int totalBlocks;
  859.     private int numDecommissionedBlocks;
  860.     private int numPendingBlocks;
  861.     ReplicationStats(List<String> args) {
  862.       super();
  863.       numThreads = 1;
  864.       numDatanodes = 3;
  865.       nodesToDecommission = 1;
  866.       nodeReplicationLimit = 100;
  867.       totalBlocks = 100;
  868.       parseArguments(args);
  869.       // number of operations is 4 times the number of decommissioned
  870.       // blocks divided by the number of needed replications scanned 
  871.       // by the replication monitor in one iteration
  872.       numOpsRequired = (totalBlocks*replication*nodesToDecommission*2)
  873.             / (numDatanodes*numDatanodes);
  874.       String[] blkReportArgs = {
  875.         "-op", "blockReport",
  876.         "-datanodes", String.valueOf(numDatanodes),
  877.         "-blocksPerReport", String.valueOf(totalBlocks*replication/numDatanodes),
  878.         "-blocksPerFile", String.valueOf(numDatanodes)};
  879.       blockReportObject = new BlockReportStats(Arrays.asList(blkReportArgs));
  880.       numDecommissionedBlocks = 0;
  881.       numPendingBlocks = 0;
  882.     }
  883.     String getOpName() {
  884.       return OP_REPLICATION_NAME;
  885.     }
  886.     void parseArguments(List<String> args) {
  887.       boolean ignoreUnrelatedOptions = verifyOpArgument(args);
  888.       for (int i = 2; i < args.size(); i++) {       // parse command line
  889.         if(args.get(i).equals("-datanodes")) {
  890.           if(i+1 == args.size())  printUsage();
  891.           numDatanodes = Integer.parseInt(args.get(++i));
  892.         } else if(args.get(i).equals("-nodesToDecommission")) {
  893.           if(i+1 == args.size())  printUsage();
  894.           nodesToDecommission = Integer.parseInt(args.get(++i));
  895.         } else if(args.get(i).equals("-nodeReplicationLimit")) {
  896.           if(i+1 == args.size())  printUsage();
  897.           nodeReplicationLimit = Integer.parseInt(args.get(++i));
  898.         } else if(args.get(i).equals("-totalBlocks")) {
  899.           if(i+1 == args.size())  printUsage();
  900.           totalBlocks = Integer.parseInt(args.get(++i));
  901.         } else if(args.get(i).equals("-replication")) {
  902.           if(i+1 == args.size())  printUsage();
  903.           replication = Short.parseShort(args.get(++i));
  904.         } else if(!ignoreUnrelatedOptions)
  905.           printUsage();
  906.       }
  907.     }
  908.     void generateInputs(int[] ignore) throws IOException {
  909.       // start data-nodes; create a bunch of files; generate block reports.
  910.       blockReportObject.generateInputs(ignore);
  911.       // stop replication monitor
  912.       nameNode.namesystem.replthread.interrupt();
  913.       try {
  914.         nameNode.namesystem.replthread.join();
  915.       } catch(InterruptedException ei) {
  916.         return;
  917.       }
  918.       // report blocks once
  919.       int nrDatanodes = blockReportObject.getNumDatanodes();
  920.       for(int idx=0; idx < nrDatanodes; idx++) {
  921.         blockReportObject.executeOp(idx, 0, null);
  922.       }
  923.       // decommission data-nodes
  924.       decommissionNodes();
  925.       // set node replication limit
  926.       nameNode.namesystem.setNodeReplicationLimit(nodeReplicationLimit);
  927.     }
  928.     private void decommissionNodes() throws IOException {
  929.       String excludeFN = config.get("dfs.hosts.exclude", "exclude");
  930.       FileOutputStream excludeFile = new FileOutputStream(excludeFN);
  931.       excludeFile.getChannel().truncate(0L);
  932.       int nrDatanodes = blockReportObject.getNumDatanodes();
  933.       numDecommissionedBlocks = 0;
  934.       for(int i=0; i < nodesToDecommission; i++) {
  935.         TinyDatanode dn = blockReportObject.datanodes[nrDatanodes-1-i];
  936.         numDecommissionedBlocks += dn.nrBlocks;
  937.         excludeFile.write(dn.getName().getBytes());
  938.         excludeFile.write('n');
  939.         LOG.info("Datanode " + dn.getName() + " is decommissioned.");
  940.       }
  941.       excludeFile.close();
  942.       nameNode.refreshNodes();
  943.     }
  944.     /**
  945.      * Does not require the argument
  946.      */
  947.     String getExecutionArgument(int daemonId) {
  948.       return null;
  949.     }
  950.     long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
  951.       assert daemonId < numThreads : "Wrong daemonId.";
  952.       long start = System.currentTimeMillis();
  953.       // compute data-node work
  954.       int work = nameNode.namesystem.computeDatanodeWork();
  955.       long end = System.currentTimeMillis();
  956.       numPendingBlocks += work;
  957.       if(work == 0)
  958.         daemons.get(daemonId).terminate();
  959.       return end-start;
  960.     }
  961.     void printResults() {
  962.       String blockDistribution = "";
  963.       String delim = "(";
  964.       int totalReplicas = 0;
  965.       for(int idx=0; idx < blockReportObject.getNumDatanodes(); idx++) {
  966.         totalReplicas += blockReportObject.datanodes[idx].nrBlocks;
  967.         blockDistribution += delim + blockReportObject.datanodes[idx].nrBlocks;
  968.         delim = ", ";
  969.       }
  970.       blockDistribution += ")";
  971.       LOG.info("--- " + getOpName() + " inputs ---");
  972.       LOG.info("numOpsRequired = " + numOpsRequired);
  973.       LOG.info("datanodes = " + numDatanodes + " " + blockDistribution);
  974.       LOG.info("decommissioned datanodes = " + nodesToDecommission);
  975.       LOG.info("datanode replication limit = " + nodeReplicationLimit);
  976.       LOG.info("total blocks = " + totalBlocks);
  977.       printStats();
  978.       LOG.info("decommissioned blocks = " + numDecommissionedBlocks);
  979.       LOG.info("pending replications = " + numPendingBlocks);
  980.       LOG.info("replications per sec: " + getBlocksPerSecond());
  981.     }
  982.     private double getBlocksPerSecond() {
  983.       return elapsedTime == 0 ? 0 : 1000*(double)numPendingBlocks / elapsedTime;
  984.     }
  985.   }   // end ReplicationStats
  986.   static void printUsage() {
  987.     System.err.println("Usage: NNThroughputBenchmark"
  988.         + "nt"    + OperationStatsBase.OP_ALL_USAGE
  989.         + " | nt" + CreateFileStats.OP_CREATE_USAGE
  990.         + " | nt" + OpenFileStats.OP_OPEN_USAGE
  991.         + " | nt" + DeleteFileStats.OP_DELETE_USAGE
  992.         + " | nt" + RenameFileStats.OP_RENAME_USAGE
  993.         + " | nt" + BlockReportStats.OP_BLOCK_REPORT_USAGE
  994.         + " | nt" + ReplicationStats.OP_REPLICATION_USAGE
  995.         + " | nt" + CleanAllStats.OP_CLEAN_USAGE
  996.     );
  997.     System.exit(-1);
  998.   }
  999.   /**
  1000.    * Main method of the benchmark.
  1001.    * @param args command line parameters
  1002.    */
  1003.   public static void runBenchmark(Configuration conf, List<String> args) throws Exception {
  1004.     if(args.size() < 2 || ! args.get(0).startsWith("-op"))
  1005.       printUsage();
  1006.     String type = args.get(1);
  1007.     boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type);
  1008.     NNThroughputBenchmark bench = null;
  1009.     List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
  1010.     OperationStatsBase opStat = null;
  1011.     try {
  1012.       bench = new NNThroughputBenchmark(conf);
  1013.       if(runAll || CreateFileStats.OP_CREATE_NAME.equals(type)) {
  1014.         opStat = bench.new CreateFileStats(args);
  1015.         ops.add(opStat);
  1016.       }
  1017.       if(runAll || OpenFileStats.OP_OPEN_NAME.equals(type)) {
  1018.         opStat = bench.new OpenFileStats(args);
  1019.         ops.add(opStat);
  1020.       }
  1021.       if(runAll || DeleteFileStats.OP_DELETE_NAME.equals(type)) {
  1022.         opStat = bench.new DeleteFileStats(args);
  1023.         ops.add(opStat);
  1024.       }
  1025.       if(runAll || RenameFileStats.OP_RENAME_NAME.equals(type)) {
  1026.         opStat = bench.new RenameFileStats(args);
  1027.         ops.add(opStat);
  1028.       }
  1029.       if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) {
  1030.         opStat = bench.new BlockReportStats(args);
  1031.         ops.add(opStat);
  1032.       }
  1033.       if(runAll || ReplicationStats.OP_REPLICATION_NAME.equals(type)) {
  1034.         opStat = bench.new ReplicationStats(args);
  1035.         ops.add(opStat);
  1036.       }
  1037.       if(runAll || CleanAllStats.OP_CLEAN_NAME.equals(type)) {
  1038.         opStat = bench.new CleanAllStats(args);
  1039.         ops.add(opStat);
  1040.       }
  1041.       if(ops.size() == 0)
  1042.         printUsage();
  1043.       // run each benchmark
  1044.       for(OperationStatsBase op : ops) {
  1045.         LOG.info("Starting benchmark: " + op.getOpName());
  1046.         op.benchmark();
  1047.         op.cleanUp();
  1048.       }
  1049.       // print statistics
  1050.       for(OperationStatsBase op : ops) {
  1051.         LOG.info("");
  1052.         op.printResults();
  1053.       }
  1054.     } catch(Exception e) {
  1055.       LOG.error(StringUtils.stringifyException(e));
  1056.       throw e;
  1057.     } finally {
  1058.       if(bench != null)
  1059.         bench.close();
  1060.     }
  1061.   }
  1062.   public static void main(String[] args) throws Exception {
  1063.     runBenchmark(new Configuration(), 
  1064.                   new ArrayList<String>(Arrays.asList(args)));
  1065.   }
  1066. }