TestFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:19k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.DataInputStream;
  20. import java.io.IOException;
  21. import java.io.OutputStream;
  22. import java.util.Arrays;
  23. import java.util.Random;
  24. import java.util.List;
  25. import java.util.ArrayList;
  26. import java.util.Set;
  27. import java.util.HashSet;
  28. import java.util.Map;
  29. import java.util.HashMap;
  30. import java.net.InetSocketAddress;
  31. import java.net.URI;
  32. import junit.framework.TestCase;
  33. import org.apache.commons.logging.Log;
  34. import org.apache.hadoop.conf.Configuration;
  35. import org.apache.hadoop.conf.Configured;
  36. import org.apache.hadoop.hdfs.MiniDFSCluster;
  37. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  38. import org.apache.hadoop.fs.shell.CommandFormat;
  39. import org.apache.hadoop.io.LongWritable;
  40. import org.apache.hadoop.io.SequenceFile;
  41. import org.apache.hadoop.io.UTF8;
  42. import org.apache.hadoop.io.WritableComparable;
  43. import org.apache.hadoop.io.SequenceFile.CompressionType;
  44. import org.apache.hadoop.mapred.FileInputFormat;
  45. import org.apache.hadoop.mapred.FileOutputFormat;
  46. import org.apache.hadoop.mapred.JobClient;
  47. import org.apache.hadoop.mapred.JobConf;
  48. import org.apache.hadoop.mapred.Mapper;
  49. import org.apache.hadoop.mapred.OutputCollector;
  50. import org.apache.hadoop.mapred.Reporter;
  51. import org.apache.hadoop.mapred.SequenceFileInputFormat;
  52. import org.apache.hadoop.mapred.lib.LongSumReducer;
  53. import org.apache.hadoop.security.UnixUserGroupInformation;
  54. public class TestFileSystem extends TestCase {
  55.   private static final Log LOG = FileSystem.LOG;
  56.   private static Configuration conf = new Configuration();
  57.   private static int BUFFER_SIZE = conf.getInt("io.file.buffer.size", 4096);
  58.   private static final long MEGA = 1024 * 1024;
  59.   private static final int SEEKS_PER_FILE = 4;
  60.   private static String ROOT = System.getProperty("test.build.data","fs_test");
  61.   private static Path CONTROL_DIR = new Path(ROOT, "fs_control");
  62.   private static Path WRITE_DIR = new Path(ROOT, "fs_write");
  63.   private static Path READ_DIR = new Path(ROOT, "fs_read");
  64.   private static Path DATA_DIR = new Path(ROOT, "fs_data");
  65.   public void testFs() throws Exception {
  66.     testFs(10 * MEGA, 100, 0);
  67.   }
  68.   public static void testFs(long megaBytes, int numFiles, long seed)
  69.     throws Exception {
  70.     FileSystem fs = FileSystem.get(conf);
  71.     if (seed == 0)
  72.       seed = new Random().nextLong();
  73.     LOG.info("seed = "+seed);
  74.     createControlFile(fs, megaBytes, numFiles, seed);
  75.     writeTest(fs, false);
  76.     readTest(fs, false);
  77.     seekTest(fs, false);
  78.     fs.delete(CONTROL_DIR, true);
  79.     fs.delete(DATA_DIR, true);
  80.     fs.delete(WRITE_DIR, true);
  81.     fs.delete(READ_DIR, true);
  82.   }
  83.   public static void testCommandFormat() throws Exception {
  84.     // This should go to TestFsShell.java when it is added.
  85.     CommandFormat cf;
  86.     cf= new CommandFormat("copyToLocal", 2,2,"crc","ignoreCrc");
  87.     assertEquals(cf.parse(new String[] {"-get","file", "-"}, 1).get(1), "-");
  88.     assertEquals(cf.parse(new String[] {"-get","file","-ignoreCrc","/foo"}, 1).get(1),"/foo");
  89.     cf = new CommandFormat("tail", 1, 1, "f");
  90.     assertEquals(cf.parse(new String[] {"-tail","fileName"}, 1).get(0),"fileName");
  91.     assertEquals(cf.parse(new String[] {"-tail","-f","fileName"}, 1).get(0),"fileName");
  92.     cf = new CommandFormat("setrep", 2, 2, "R", "w");
  93.     assertEquals(cf.parse(new String[] {"-setrep","-R","2","/foo/bar"}, 1).get(1), "/foo/bar");
  94.     cf = new CommandFormat("put", 2, 10000);
  95.     assertEquals(cf.parse(new String[] {"-put", "-", "dest"}, 1).get(1), "dest"); 
  96.   }
  97.   public static void createControlFile(FileSystem fs,
  98.                                        long megaBytes, int numFiles,
  99.                                        long seed) throws Exception {
  100.     LOG.info("creating control file: "+megaBytes+" bytes, "+numFiles+" files");
  101.     Path controlFile = new Path(CONTROL_DIR, "files");
  102.     fs.delete(controlFile, true);
  103.     Random random = new Random(seed);
  104.     SequenceFile.Writer writer =
  105.       SequenceFile.createWriter(fs, conf, controlFile, 
  106.                                 UTF8.class, LongWritable.class, CompressionType.NONE);
  107.     long totalSize = 0;
  108.     long maxSize = ((megaBytes / numFiles) * 2) + 1;
  109.     try {
  110.       while (totalSize < megaBytes) {
  111.         UTF8 name = new UTF8(Long.toString(random.nextLong()));
  112.         long size = random.nextLong();
  113.         if (size < 0)
  114.           size = -size;
  115.         size = size % maxSize;
  116.         //LOG.info(" adding: name="+name+" size="+size);
  117.         writer.append(name, new LongWritable(size));
  118.         totalSize += size;
  119.       }
  120.     } finally {
  121.       writer.close();
  122.     }
  123.     LOG.info("created control file for: "+totalSize+" bytes");
  124.   }
  125.   public static class WriteMapper extends Configured
  126.       implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
  127.     
  128.     private Random random = new Random();
  129.     private byte[] buffer = new byte[BUFFER_SIZE];
  130.     private FileSystem fs;
  131.     private boolean fastCheck;
  132.     // a random suffix per task
  133.     private String suffix = "-"+random.nextLong();
  134.     
  135.     {
  136.       try {
  137.         fs = FileSystem.get(conf);
  138.       } catch (IOException e) {
  139.         throw new RuntimeException(e);
  140.       }
  141.     }
  142.     public WriteMapper() { super(null); }
  143.     
  144.     public WriteMapper(Configuration conf) { super(conf); }
  145.     public void configure(JobConf job) {
  146.       setConf(job);
  147.       fastCheck = job.getBoolean("fs.test.fastCheck", false);
  148.     }
  149.     public void map(UTF8 key, LongWritable value,
  150.                     OutputCollector<UTF8, LongWritable> collector,
  151.                     Reporter reporter)
  152.       throws IOException {
  153.       
  154.       String name = key.toString();
  155.       long size = value.get();
  156.       long seed = Long.parseLong(name);
  157.       random.setSeed(seed);
  158.       reporter.setStatus("creating " + name);
  159.       // write to temp file initially to permit parallel execution
  160.       Path tempFile = new Path(DATA_DIR, name+suffix);
  161.       OutputStream out = fs.create(tempFile);
  162.       long written = 0;
  163.       try {
  164.         while (written < size) {
  165.           if (fastCheck) {
  166.             Arrays.fill(buffer, (byte)random.nextInt(Byte.MAX_VALUE));
  167.           } else {
  168.             random.nextBytes(buffer);
  169.           }
  170.           long remains = size - written;
  171.           int length = (remains<=buffer.length) ? (int)remains : buffer.length;
  172.           out.write(buffer, 0, length);
  173.           written += length;
  174.           reporter.setStatus("writing "+name+"@"+written+"/"+size);
  175.         }
  176.       } finally {
  177.         out.close();
  178.       }
  179.       // rename to final location
  180.       fs.rename(tempFile, new Path(DATA_DIR, name));
  181.       collector.collect(new UTF8("bytes"), new LongWritable(written));
  182.       reporter.setStatus("wrote " + name);
  183.     }
  184.     
  185.     public void close() {
  186.     }
  187.     
  188.   }
  189.   public static void writeTest(FileSystem fs, boolean fastCheck)
  190.     throws Exception {
  191.     fs.delete(DATA_DIR, true);
  192.     fs.delete(WRITE_DIR, true);
  193.     
  194.     JobConf job = new JobConf(conf, TestFileSystem.class);
  195.     job.setBoolean("fs.test.fastCheck", fastCheck);
  196.     FileInputFormat.setInputPaths(job, CONTROL_DIR);
  197.     job.setInputFormat(SequenceFileInputFormat.class);
  198.     job.setMapperClass(WriteMapper.class);
  199.     job.setReducerClass(LongSumReducer.class);
  200.     FileOutputFormat.setOutputPath(job, WRITE_DIR);
  201.     job.setOutputKeyClass(UTF8.class);
  202.     job.setOutputValueClass(LongWritable.class);
  203.     job.setNumReduceTasks(1);
  204.     JobClient.runJob(job);
  205.   }
  206.   public static class ReadMapper extends Configured
  207.       implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
  208.     
  209.     private Random random = new Random();
  210.     private byte[] buffer = new byte[BUFFER_SIZE];
  211.     private byte[] check  = new byte[BUFFER_SIZE];
  212.     private FileSystem fs;
  213.     private boolean fastCheck;
  214.     {
  215.       try {
  216.         fs = FileSystem.get(conf);
  217.       } catch (IOException e) {
  218.         throw new RuntimeException(e);
  219.       }
  220.     }
  221.     public ReadMapper() { super(null); }
  222.     
  223.     public ReadMapper(Configuration conf) { super(conf); }
  224.     public void configure(JobConf job) {
  225.       setConf(job);
  226.       fastCheck = job.getBoolean("fs.test.fastCheck", false);
  227.     }
  228.     public void map(UTF8 key, LongWritable value,
  229.                     OutputCollector<UTF8, LongWritable> collector,
  230.                     Reporter reporter)
  231.       throws IOException {
  232.       
  233.       String name = key.toString();
  234.       long size = value.get();
  235.       long seed = Long.parseLong(name);
  236.       random.setSeed(seed);
  237.       reporter.setStatus("opening " + name);
  238.       DataInputStream in =
  239.         new DataInputStream(fs.open(new Path(DATA_DIR, name)));
  240.       long read = 0;
  241.       try {
  242.         while (read < size) {
  243.           long remains = size - read;
  244.           int n = (remains<=buffer.length) ? (int)remains : buffer.length;
  245.           in.readFully(buffer, 0, n);
  246.           read += n;
  247.           if (fastCheck) {
  248.             Arrays.fill(check, (byte)random.nextInt(Byte.MAX_VALUE));
  249.           } else {
  250.             random.nextBytes(check);
  251.           }
  252.           if (n != buffer.length) {
  253.             Arrays.fill(buffer, n, buffer.length, (byte)0);
  254.             Arrays.fill(check, n, check.length, (byte)0);
  255.           }
  256.           assertTrue(Arrays.equals(buffer, check));
  257.           reporter.setStatus("reading "+name+"@"+read+"/"+size);
  258.         }
  259.       } finally {
  260.         in.close();
  261.       }
  262.       collector.collect(new UTF8("bytes"), new LongWritable(read));
  263.       reporter.setStatus("read " + name);
  264.     }
  265.     
  266.     public void close() {
  267.     }
  268.     
  269.   }
  270.   public static void readTest(FileSystem fs, boolean fastCheck)
  271.     throws Exception {
  272.     fs.delete(READ_DIR, true);
  273.     JobConf job = new JobConf(conf, TestFileSystem.class);
  274.     job.setBoolean("fs.test.fastCheck", fastCheck);
  275.     FileInputFormat.setInputPaths(job, CONTROL_DIR);
  276.     job.setInputFormat(SequenceFileInputFormat.class);
  277.     job.setMapperClass(ReadMapper.class);
  278.     job.setReducerClass(LongSumReducer.class);
  279.     FileOutputFormat.setOutputPath(job, READ_DIR);
  280.     job.setOutputKeyClass(UTF8.class);
  281.     job.setOutputValueClass(LongWritable.class);
  282.     job.setNumReduceTasks(1);
  283.     JobClient.runJob(job);
  284.   }
  285.   public static class SeekMapper<K> extends Configured
  286.     implements Mapper<WritableComparable, LongWritable, K, LongWritable> {
  287.     
  288.     private Random random = new Random();
  289.     private byte[] check  = new byte[BUFFER_SIZE];
  290.     private FileSystem fs;
  291.     private boolean fastCheck;
  292.     {
  293.       try {
  294.         fs = FileSystem.get(conf);
  295.       } catch (IOException e) {
  296.         throw new RuntimeException(e);
  297.       }
  298.     }
  299.     public SeekMapper() { super(null); }
  300.     
  301.     public SeekMapper(Configuration conf) { super(conf); }
  302.     public void configure(JobConf job) {
  303.       setConf(job);
  304.       fastCheck = job.getBoolean("fs.test.fastCheck", false);
  305.     }
  306.     public void map(WritableComparable key, LongWritable value,
  307.                     OutputCollector<K, LongWritable> collector,
  308.                     Reporter reporter)
  309.       throws IOException {
  310.       String name = key.toString();
  311.       long size = value.get();
  312.       long seed = Long.parseLong(name);
  313.       if (size == 0) return;
  314.       reporter.setStatus("opening " + name);
  315.       FSDataInputStream in = fs.open(new Path(DATA_DIR, name));
  316.         
  317.       try {
  318.         for (int i = 0; i < SEEKS_PER_FILE; i++) {
  319.           // generate a random position
  320.           long position = Math.abs(random.nextLong()) % size;
  321.           
  322.           // seek file to that position
  323.           reporter.setStatus("seeking " + name);
  324.           in.seek(position);
  325.           byte b = in.readByte();
  326.           
  327.           // check that byte matches
  328.           byte checkByte = 0;
  329.           // advance random state to that position
  330.           random.setSeed(seed);
  331.           for (int p = 0; p <= position; p+= check.length) {
  332.             reporter.setStatus("generating data for " + name);
  333.             if (fastCheck) {
  334.               checkByte = (byte)random.nextInt(Byte.MAX_VALUE);
  335.             } else {
  336.               random.nextBytes(check);
  337.               checkByte = check[(int)(position % check.length)];
  338.             }
  339.           }
  340.           assertEquals(b, checkByte);
  341.         }
  342.       } finally {
  343.         in.close();
  344.       }
  345.     }
  346.     
  347.     public void close() {
  348.     }
  349.     
  350.   }
  351.   public static void seekTest(FileSystem fs, boolean fastCheck)
  352.     throws Exception {
  353.     fs.delete(READ_DIR, true);
  354.     JobConf job = new JobConf(conf, TestFileSystem.class);
  355.     job.setBoolean("fs.test.fastCheck", fastCheck);
  356.     FileInputFormat.setInputPaths(job,CONTROL_DIR);
  357.     job.setInputFormat(SequenceFileInputFormat.class);
  358.     job.setMapperClass(SeekMapper.class);
  359.     job.setReducerClass(LongSumReducer.class);
  360.     FileOutputFormat.setOutputPath(job, READ_DIR);
  361.     job.setOutputKeyClass(UTF8.class);
  362.     job.setOutputValueClass(LongWritable.class);
  363.     job.setNumReduceTasks(1);
  364.     JobClient.runJob(job);
  365.   }
  366.   public static void main(String[] args) throws Exception {
  367.     int megaBytes = 10;
  368.     int files = 100;
  369.     boolean noRead = false;
  370.     boolean noWrite = false;
  371.     boolean noSeek = false;
  372.     boolean fastCheck = false;
  373.     long seed = new Random().nextLong();
  374.     String usage = "Usage: TestFileSystem -files N -megaBytes M [-noread] [-nowrite] [-noseek] [-fastcheck]";
  375.     
  376.     if (args.length == 0) {
  377.       System.err.println(usage);
  378.       System.exit(-1);
  379.     }
  380.     for (int i = 0; i < args.length; i++) {       // parse command line
  381.       if (args[i].equals("-files")) {
  382.         files = Integer.parseInt(args[++i]);
  383.       } else if (args[i].equals("-megaBytes")) {
  384.         megaBytes = Integer.parseInt(args[++i]);
  385.       } else if (args[i].equals("-noread")) {
  386.         noRead = true;
  387.       } else if (args[i].equals("-nowrite")) {
  388.         noWrite = true;
  389.       } else if (args[i].equals("-noseek")) {
  390.         noSeek = true;
  391.       } else if (args[i].equals("-fastcheck")) {
  392.         fastCheck = true;
  393.       }
  394.     }
  395.     LOG.info("seed = "+seed);
  396.     LOG.info("files = " + files);
  397.     LOG.info("megaBytes = " + megaBytes);
  398.   
  399.     FileSystem fs = FileSystem.get(conf);
  400.     if (!noWrite) {
  401.       createControlFile(fs, megaBytes*MEGA, files, seed);
  402.       writeTest(fs, fastCheck);
  403.     }
  404.     if (!noRead) {
  405.       readTest(fs, fastCheck);
  406.     }
  407.     if (!noSeek) {
  408.       seekTest(fs, fastCheck);
  409.     }
  410.   }
  411.   static Configuration createConf4Testing(String username) throws Exception {
  412.     Configuration conf = new Configuration();
  413.     UnixUserGroupInformation.saveToConf(conf,
  414.         UnixUserGroupInformation.UGI_PROPERTY_NAME,
  415.         new UnixUserGroupInformation(username, new String[]{"group"}));
  416.     return conf;    
  417.   }
  418.   public void testFsCache() throws Exception {
  419.     {
  420.       long now = System.currentTimeMillis();
  421.       Configuration[] conf = {new Configuration(),
  422.           createConf4Testing("foo" + now), createConf4Testing("bar" + now)};
  423.       FileSystem[] fs = new FileSystem[conf.length];
  424.   
  425.       for(int i = 0; i < conf.length; i++) {
  426.         fs[i] = FileSystem.get(conf[i]);
  427.         assertEquals(fs[i], FileSystem.get(conf[i]));
  428.         for(int j = 0; j < i; j++) {
  429.           assertFalse(fs[j] == fs[i]);
  430.         }
  431.       }
  432.       FileSystem.closeAll();
  433.     }
  434.     
  435.     {
  436.       try {
  437.         runTestCache(NameNode.DEFAULT_PORT);
  438.       } catch(java.net.BindException be) {
  439.         LOG.warn("Cannot test NameNode.DEFAULT_PORT (="
  440.             + NameNode.DEFAULT_PORT + ")", be);
  441.       }
  442.       runTestCache(0);
  443.     }
  444.   }
  445.   
  446.   static void runTestCache(int port) throws Exception {
  447.     Configuration conf = new Configuration();
  448.     MiniDFSCluster cluster = null;
  449.     try {
  450.       cluster = new MiniDFSCluster(port, conf, 2, true, true, null, null);
  451.       URI uri = cluster.getFileSystem().getUri();
  452.       LOG.info("uri=" + uri);
  453.       {
  454.         FileSystem fs = FileSystem.get(uri, new Configuration());
  455.         checkPath(cluster, fs);
  456.         for(int i = 0; i < 100; i++) {
  457.           assertTrue(fs == FileSystem.get(uri, new Configuration()));
  458.         }
  459.       }
  460.       
  461.       if (port == NameNode.DEFAULT_PORT) {
  462.         //test explicit default port
  463.         URI uri2 = new URI(uri.getScheme(), uri.getUserInfo(),
  464.             uri.getHost(), NameNode.DEFAULT_PORT, uri.getPath(),
  465.             uri.getQuery(), uri.getFragment());  
  466.         LOG.info("uri2=" + uri2);
  467.         FileSystem fs = FileSystem.get(uri2, conf);
  468.         checkPath(cluster, fs);
  469.         for(int i = 0; i < 100; i++) {
  470.           assertTrue(fs == FileSystem.get(uri2, new Configuration()));
  471.         }
  472.       }
  473.     } finally {
  474.       if (cluster != null) cluster.shutdown(); 
  475.     }
  476.   }
  477.   
  478.   static void checkPath(MiniDFSCluster cluster, FileSystem fileSys) throws IOException {
  479.     InetSocketAddress add = cluster.getNameNode().getNameNodeAddress();
  480.     // Test upper/lower case
  481.     fileSys.checkPath(new Path("hdfs://" + add.getHostName().toUpperCase() + ":" + add.getPort()));
  482.   }
  483.   public void testFsClose() throws Exception {
  484.     {
  485.       Configuration conf = new Configuration();
  486.       new Path("file:///").getFileSystem(conf);
  487.       UnixUserGroupInformation.login(conf, true);
  488.       FileSystem.closeAll();
  489.     }
  490.     {
  491.       Configuration conf = new Configuration();
  492.       new Path("hftp://localhost:12345/").getFileSystem(conf);
  493.       UnixUserGroupInformation.login(conf, true);
  494.       FileSystem.closeAll();
  495.     }
  496.     {
  497.       Configuration conf = new Configuration();
  498.       FileSystem fs = new Path("hftp://localhost:12345/").getFileSystem(conf);
  499.       UnixUserGroupInformation.login(fs.getConf(), true);
  500.       FileSystem.closeAll();
  501.     }
  502.   }
  503.   public void testCacheKeysAreCaseInsensitive()
  504.     throws Exception
  505.   {
  506.     Configuration conf = new Configuration();
  507.     
  508.     // check basic equality
  509.     FileSystem.Cache.Key lowercaseCachekey1 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf);
  510.     FileSystem.Cache.Key lowercaseCachekey2 = new FileSystem.Cache.Key(new URI("hftp://localhost:12345/"), conf);
  511.     assertEquals( lowercaseCachekey1, lowercaseCachekey2 );
  512.     // check insensitive equality    
  513.     FileSystem.Cache.Key uppercaseCachekey = new FileSystem.Cache.Key(new URI("HFTP://Localhost:12345/"), conf);
  514.     assertEquals( lowercaseCachekey2, uppercaseCachekey );
  515.     // check behaviour with collections
  516.     List<FileSystem.Cache.Key> list = new ArrayList<FileSystem.Cache.Key>();
  517.     list.add(uppercaseCachekey);
  518.     assertTrue(list.contains(uppercaseCachekey));
  519.     assertTrue(list.contains(lowercaseCachekey2));
  520.     Set<FileSystem.Cache.Key> set = new HashSet<FileSystem.Cache.Key>();
  521.     set.add(uppercaseCachekey);
  522.     assertTrue(set.contains(uppercaseCachekey));
  523.     assertTrue(set.contains(lowercaseCachekey2));
  524.     Map<FileSystem.Cache.Key, String> map = new HashMap<FileSystem.Cache.Key, String>();
  525.     map.put(uppercaseCachekey, "");
  526.     assertTrue(map.containsKey(uppercaseCachekey));
  527.     assertTrue(map.containsKey(lowercaseCachekey2));    
  528.   }
  529. }