BenchmarkThroughput.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.File;
  20. import java.io.FileInputStream;
  21. import java.io.FileOutputStream;
  22. import java.io.IOException;
  23. import java.io.InputStream;
  24. import java.io.OutputStream;
  25. import org.apache.commons.logging.Log;
  26. import org.apache.commons.logging.LogFactory;
  27. import org.apache.commons.logging.impl.Log4JLogger;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.conf.Configured;
  30. import org.apache.hadoop.fs.ChecksumFileSystem;
  31. import org.apache.hadoop.fs.FileSystem;
  32. import org.apache.hadoop.fs.LocalDirAllocator;
  33. import org.apache.hadoop.fs.Path;
  34. import org.apache.hadoop.util.Tool;
  35. import org.apache.hadoop.util.ToolRunner;
  36. import org.apache.log4j.Level;
  37. /**
  38.  * This class benchmarks the performance of the local file system, raw local
  39.  * file system and HDFS at reading and writing files. The user should invoke
  40.  * the main of this class and optionally include a repetition count.
  41.  */
  42. public class BenchmarkThroughput extends Configured implements Tool {
  43.   // the property in the config that specifies a working directory
  44.   private LocalDirAllocator dir;
  45.   private long startTime;
  46.   // the size of the buffer to use
  47.   private int BUFFER_SIZE;
  48.   private void resetMeasurements() {
  49.     startTime = System.currentTimeMillis();
  50.   }
  51.   private void printMeasurements() {
  52.     System.out.println(" time: " +
  53.                        ((System.currentTimeMillis() - startTime)/1000));
  54.   }
  55.   private Path writeLocalFile(String name, Configuration conf,
  56.                                      long total) throws IOException {
  57.     Path path = dir.getLocalPathForWrite(name, total, conf);
  58.     System.out.print("Writing " + name);
  59.     resetMeasurements();
  60.     OutputStream out = new FileOutputStream(new File(path.toString()));
  61.     byte[] data = new byte[BUFFER_SIZE];
  62.     for(long size=0; size < total; size += BUFFER_SIZE) {
  63.       out.write(data);
  64.     }
  65.     out.close();
  66.     printMeasurements();
  67.     return path;
  68.   }
  69.   private void readLocalFile(Path path,
  70.                                     String name,
  71.                                     Configuration conf) throws IOException {
  72.     System.out.print("Reading " + name);
  73.     resetMeasurements();
  74.     InputStream in = new FileInputStream(new File(path.toString()));
  75.     byte[] data = new byte[BUFFER_SIZE];
  76.     long size = 0;
  77.     while (size >= 0) {
  78.       size = in.read(data);
  79.     }
  80.     in.close();
  81.     printMeasurements();
  82.   }
  83.   private void writeAndReadLocalFile(String name,
  84.                                             Configuration conf,
  85.                                             long size
  86.                                            ) throws IOException {
  87.     Path f = null;
  88.     try {
  89.       f = writeLocalFile(name, conf, size);
  90.       readLocalFile(f, name, conf);
  91.     } finally {
  92.       if (f != null) {
  93.         new File(f.toString()).delete();
  94.       }
  95.     }
  96.   }
  97.   private Path writeFile(FileSystem fs,
  98.                                 String name,
  99.                                 Configuration conf,
  100.                                 long total
  101.                                 ) throws IOException {
  102.     Path f = dir.getLocalPathForWrite(name, total, conf);
  103.     System.out.print("Writing " + name);
  104.     resetMeasurements();
  105.     OutputStream out = fs.create(f);
  106.     byte[] data = new byte[BUFFER_SIZE];
  107.     for(long size = 0; size < total; size += BUFFER_SIZE) {
  108.       out.write(data);
  109.     }
  110.     out.close();
  111.     printMeasurements();
  112.     return f;
  113.   }
  114.   private void readFile(FileSystem fs,
  115.                                Path f,
  116.                                String name,
  117.                                Configuration conf
  118.                                ) throws IOException {
  119.     System.out.print("Reading " + name);
  120.     resetMeasurements();
  121.     InputStream in = fs.open(f);
  122.     byte[] data = new byte[BUFFER_SIZE];
  123.     long val = 0;
  124.     while (val >= 0) {
  125.       val = in.read(data);
  126.     }
  127.     in.close();
  128.     printMeasurements();
  129.   }
  130.   private void writeAndReadFile(FileSystem fs,
  131.                                        String name,
  132.                                        Configuration conf,
  133.                                        long size
  134.                                        ) throws IOException {
  135.     Path f = null;
  136.     try {
  137.       f = writeFile(fs, name, conf, size);
  138.       readFile(fs, f, name, conf);
  139.     } finally {
  140.       try {
  141.         if (f != null) {
  142.           fs.delete(f, true);
  143.         }
  144.       } catch (IOException ie) {
  145.         // IGNORE
  146.       }
  147.     }
  148.   }
  149.   private static void printUsage() {
  150.     ToolRunner.printGenericCommandUsage(System.err);
  151.     System.err.println("Usage: dfsthroughput [#reps]");
  152.     System.err.println("Config properties:n" +
  153.       "  dfsthroughput.file.size:tsize of each write/read (10GB)n" +
  154.       "  dfsthroughput.buffer.size:tbuffer size for write/read (4k)n");
  155.   }
  156.   public int run(String[] args) throws IOException {
  157.     // silence the minidfs cluster
  158.     Log hadoopLog = LogFactory.getLog("org");
  159.     if (hadoopLog instanceof Log4JLogger) {
  160.       ((Log4JLogger) hadoopLog).getLogger().setLevel(Level.WARN);
  161.     }
  162.     int reps = 1;
  163.     if (args.length == 1) {
  164.       try {
  165.         reps = Integer.parseInt(args[0]);
  166.       } catch (NumberFormatException e) {
  167.         printUsage();
  168.         return -1;
  169.       }
  170.     } else if (args.length > 1) {
  171.       printUsage();
  172.       return -1;
  173.     }
  174.     Configuration conf = getConf();
  175.     // the size of the file to write
  176.     long SIZE = conf.getLong("dfsthroughput.file.size",
  177.         10L * 1024 * 1024 * 1024);
  178.     BUFFER_SIZE = conf.getInt("dfsthroughput.buffer.size", 4 * 1024);
  179.     String localDir = conf.get("mapred.temp.dir");
  180.     dir = new LocalDirAllocator("mapred.temp.dir");
  181.     System.setProperty("test.build.data", localDir);
  182.     System.out.println("Local = " + localDir);
  183.     ChecksumFileSystem checkedLocal = FileSystem.getLocal(conf);
  184.     FileSystem rawLocal = checkedLocal.getRawFileSystem();
  185.     for(int i=0; i < reps; ++i) {
  186.       writeAndReadLocalFile("local", conf, SIZE);
  187.       writeAndReadFile(rawLocal, "raw", conf, SIZE);
  188.       writeAndReadFile(checkedLocal, "checked", conf, SIZE);
  189.     }
  190.     MiniDFSCluster cluster = null;
  191.     try {
  192.       cluster = new MiniDFSCluster(conf, 1, true, new String[]{"/foo"});
  193.       cluster.waitActive();
  194.       FileSystem dfs = cluster.getFileSystem();
  195.       for(int i=0; i < reps; ++i) {
  196.         writeAndReadFile(dfs, "dfs", conf, SIZE);
  197.       }
  198.     } finally {
  199.       if (cluster != null) {
  200.         cluster.shutdown();
  201.         // clean up minidfs junk
  202.         rawLocal.delete(new Path(localDir, "dfs"), true);
  203.       }
  204.     }
  205.     return 0;
  206.   }
  207.   /**
  208.    * @param args
  209.    */
  210.   public static void main(String[] args) throws Exception {
  211.     int res = ToolRunner.run(new Configuration(),
  212.         new BenchmarkThroughput(), args);
  213.     System.exit(res);
  214.   }
  215. }