TestSequenceFile.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:20k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io;
  19. import java.io.*;
  20. import java.util.*;
  21. import junit.framework.TestCase;
  22. import org.apache.commons.logging.*;
  23. import org.apache.hadoop.fs.*;
  24. import org.apache.hadoop.io.SequenceFile.CompressionType;
  25. import org.apache.hadoop.io.compress.CompressionCodec;
  26. import org.apache.hadoop.io.compress.DefaultCodec;
  27. import org.apache.hadoop.util.ReflectionUtils;
  28. import org.apache.hadoop.conf.*;
  29. /** Support for flat files of binary key/value pairs. */
  30. public class TestSequenceFile extends TestCase {
  31.   private static final Log LOG = LogFactory.getLog(TestSequenceFile.class);
  32.   private static Configuration conf = new Configuration();
  33.   
  34.   public TestSequenceFile(String name) { super(name); }
  35.   /** Unit tests for SequenceFile. */
  36.   public void testZlibSequenceFile() throws Exception {
  37.     LOG.info("Testing SequenceFile with DefaultCodec");
  38.     compressedSeqFileTest(new DefaultCodec());
  39.     LOG.info("Successfully tested SequenceFile with DefaultCodec");
  40.   }
  41.   
  42.   public void compressedSeqFileTest(CompressionCodec codec) throws Exception {
  43.     int count = 1024 * 10;
  44.     int megabytes = 1;
  45.     int factor = 5;
  46.     Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq");
  47.     Path recordCompressedFile = 
  48.       new Path(System.getProperty("test.build.data",".")+"/test.rc.seq");
  49.     Path blockCompressedFile = 
  50.       new Path(System.getProperty("test.build.data",".")+"/test.bc.seq");
  51.  
  52.     int seed = new Random().nextInt();
  53.     LOG.info("Seed = " + seed);
  54.     FileSystem fs = FileSystem.getLocal(conf);
  55.     try {
  56.       // SequenceFile.Writer
  57.       writeTest(fs, count, seed, file, CompressionType.NONE, null);
  58.       readTest(fs, count, seed, file);
  59.       sortTest(fs, count, megabytes, factor, false, file);
  60.       checkSort(fs, count, seed, file);
  61.       sortTest(fs, count, megabytes, factor, true, file);
  62.       checkSort(fs, count, seed, file);
  63.       mergeTest(fs, count, seed, file, CompressionType.NONE, false, 
  64.                 factor, megabytes);
  65.       checkSort(fs, count, seed, file);
  66.       mergeTest(fs, count, seed, file, CompressionType.NONE, true, 
  67.                 factor, megabytes);
  68.       checkSort(fs, count, seed, file);
  69.         
  70.       // SequenceFile.RecordCompressWriter
  71.       writeTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD, 
  72.                 codec);
  73.       readTest(fs, count, seed, recordCompressedFile);
  74.       sortTest(fs, count, megabytes, factor, false, recordCompressedFile);
  75.       checkSort(fs, count, seed, recordCompressedFile);
  76.       sortTest(fs, count, megabytes, factor, true, recordCompressedFile);
  77.       checkSort(fs, count, seed, recordCompressedFile);
  78.       mergeTest(fs, count, seed, recordCompressedFile, 
  79.                 CompressionType.RECORD, false, factor, megabytes);
  80.       checkSort(fs, count, seed, recordCompressedFile);
  81.       mergeTest(fs, count, seed, recordCompressedFile, 
  82.                 CompressionType.RECORD, true, factor, megabytes);
  83.       checkSort(fs, count, seed, recordCompressedFile);
  84.         
  85.       // SequenceFile.BlockCompressWriter
  86.       writeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK,
  87.                 codec);
  88.       readTest(fs, count, seed, blockCompressedFile);
  89.       sortTest(fs, count, megabytes, factor, false, blockCompressedFile);
  90.       checkSort(fs, count, seed, blockCompressedFile);
  91.       sortTest(fs, count, megabytes, factor, true, blockCompressedFile);
  92.       checkSort(fs, count, seed, blockCompressedFile);
  93.       mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK, 
  94.                 false, factor, megabytes);
  95.       checkSort(fs, count, seed, blockCompressedFile);
  96.       mergeTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK, 
  97.                 true, factor, megabytes);
  98.       checkSort(fs, count, seed, blockCompressedFile);
  99.     } finally {
  100.       fs.close();
  101.     }
  102.   }
  103.   private static void writeTest(FileSystem fs, int count, int seed, Path file, 
  104.                                 CompressionType compressionType, CompressionCodec codec)
  105.     throws IOException {
  106.     fs.delete(file, true);
  107.     LOG.info("creating " + count + " records with " + compressionType +
  108.              " compression");
  109.     SequenceFile.Writer writer = 
  110.       SequenceFile.createWriter(fs, conf, file, 
  111.                                 RandomDatum.class, RandomDatum.class, compressionType, codec);
  112.     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  113.     for (int i = 0; i < count; i++) {
  114.       generator.next();
  115.       RandomDatum key = generator.getKey();
  116.       RandomDatum value = generator.getValue();
  117.       writer.append(key, value);
  118.     }
  119.     writer.close();
  120.   }
  121.   private static void readTest(FileSystem fs, int count, int seed, Path file)
  122.     throws IOException {
  123.     LOG.debug("reading " + count + " records");
  124.     SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
  125.     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  126.     RandomDatum k = new RandomDatum();
  127.     RandomDatum v = new RandomDatum();
  128.     DataOutputBuffer rawKey = new DataOutputBuffer();
  129.     SequenceFile.ValueBytes rawValue = reader.createValueBytes();
  130.     
  131.     for (int i = 0; i < count; i++) {
  132.       generator.next();
  133.       RandomDatum key = generator.getKey();
  134.       RandomDatum value = generator.getValue();
  135.       try {
  136.         if ((i%5) == 0) {
  137.           // Testing 'raw' apis
  138.           rawKey.reset();
  139.           reader.nextRaw(rawKey, rawValue);
  140.         } else {
  141.           // Testing 'non-raw' apis 
  142.           if ((i%2) == 0) {
  143.             reader.next(k);
  144.             reader.getCurrentValue(v);
  145.           } else {
  146.             reader.next(k, v);
  147.           }
  148.           
  149.           // Check
  150.           if (!k.equals(key))
  151.             throw new RuntimeException("wrong key at " + i);
  152.           if (!v.equals(value))
  153.             throw new RuntimeException("wrong value at " + i);
  154.         }
  155.       } catch (IOException ioe) {
  156.         LOG.info("Problem on row " + i);
  157.         LOG.info("Expected key = " + key);
  158.         LOG.info("Expected len = " + key.getLength());
  159.         LOG.info("Actual key = " + k);
  160.         LOG.info("Actual len = " + k.getLength());
  161.         LOG.info("Expected value = " + value);
  162.         LOG.info("Expected len = " + value.getLength());
  163.         LOG.info("Actual value = " + v);
  164.         LOG.info("Actual len = " + v.getLength());
  165.         LOG.info("Key equals: " + k.equals(key));
  166.         LOG.info("value equals: " + v.equals(value));
  167.         throw ioe;
  168.       }
  169.     }
  170.     reader.close();
  171.   }
  172.   private static void sortTest(FileSystem fs, int count, int megabytes, 
  173.                                int factor, boolean fast, Path file)
  174.     throws IOException {
  175.     fs.delete(new Path(file+".sorted"), true);
  176.     SequenceFile.Sorter sorter = newSorter(fs, fast, megabytes, factor);
  177.     LOG.debug("sorting " + count + " records");
  178.     sorter.sort(file, file.suffix(".sorted"));
  179.     LOG.info("done sorting " + count + " debug");
  180.   }
  181.   private static void checkSort(FileSystem fs, int count, int seed, Path file)
  182.     throws IOException {
  183.     LOG.info("sorting " + count + " records in memory for debug");
  184.     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  185.     SortedMap<RandomDatum, RandomDatum> map =
  186.       new TreeMap<RandomDatum, RandomDatum>();
  187.     for (int i = 0; i < count; i++) {
  188.       generator.next();
  189.       RandomDatum key = generator.getKey();
  190.       RandomDatum value = generator.getValue();
  191.       map.put(key, value);
  192.     }
  193.     LOG.debug("checking order of " + count + " records");
  194.     RandomDatum k = new RandomDatum();
  195.     RandomDatum v = new RandomDatum();
  196.     Iterator<Map.Entry<RandomDatum, RandomDatum>> iterator =
  197.       map.entrySet().iterator();
  198.     SequenceFile.Reader reader =
  199.       new SequenceFile.Reader(fs, file.suffix(".sorted"), conf);
  200.     for (int i = 0; i < count; i++) {
  201.       Map.Entry<RandomDatum, RandomDatum> entry = iterator.next();
  202.       RandomDatum key = entry.getKey();
  203.       RandomDatum value = entry.getValue();
  204.       reader.next(k, v);
  205.       if (!k.equals(key))
  206.         throw new RuntimeException("wrong key at " + i);
  207.       if (!v.equals(value))
  208.         throw new RuntimeException("wrong value at " + i);
  209.     }
  210.     reader.close();
  211.     LOG.debug("sucessfully checked " + count + " records");
  212.   }
  213.   private static void mergeTest(FileSystem fs, int count, int seed, Path file, 
  214.                                 CompressionType compressionType,
  215.                                 boolean fast, int factor, int megabytes)
  216.     throws IOException {
  217.     LOG.debug("creating "+factor+" files with "+count/factor+" records");
  218.     SequenceFile.Writer[] writers = new SequenceFile.Writer[factor];
  219.     Path[] names = new Path[factor];
  220.     Path[] sortedNames = new Path[factor];
  221.     
  222.     for (int i = 0; i < factor; i++) {
  223.       names[i] = file.suffix("."+i);
  224.       sortedNames[i] = names[i].suffix(".sorted");
  225.       fs.delete(names[i], true);
  226.       fs.delete(sortedNames[i], true);
  227.       writers[i] = SequenceFile.createWriter(fs, conf, names[i], 
  228.                                              RandomDatum.class, RandomDatum.class, compressionType);
  229.     }
  230.     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  231.     for (int i = 0; i < count; i++) {
  232.       generator.next();
  233.       RandomDatum key = generator.getKey();
  234.       RandomDatum value = generator.getValue();
  235.       writers[i%factor].append(key, value);
  236.     }
  237.     for (int i = 0; i < factor; i++)
  238.       writers[i].close();
  239.     for (int i = 0; i < factor; i++) {
  240.       LOG.debug("sorting file " + i + " with " + count/factor + " records");
  241.       newSorter(fs, fast, megabytes, factor).sort(names[i], sortedNames[i]);
  242.     }
  243.     LOG.info("merging " + factor + " files with " + count/factor + " debug");
  244.     fs.delete(new Path(file+".sorted"), true);
  245.     newSorter(fs, fast, megabytes, factor)
  246.       .merge(sortedNames, file.suffix(".sorted"));
  247.   }
  248.   private static SequenceFile.Sorter newSorter(FileSystem fs, 
  249.                                                boolean fast,
  250.                                                int megabytes, int factor) {
  251.     SequenceFile.Sorter sorter = 
  252.       fast
  253.       ? new SequenceFile.Sorter(fs, new RandomDatum.Comparator(),
  254.                                 RandomDatum.class, RandomDatum.class, conf)
  255.       : new SequenceFile.Sorter(fs, RandomDatum.class, RandomDatum.class, conf);
  256.     sorter.setMemory(megabytes * 1024*1024);
  257.     sorter.setFactor(factor);
  258.     return sorter;
  259.   }
  260.   /** Unit tests for SequenceFile metadata. */
  261.   public void testSequenceFileMetadata() throws Exception {
  262.     LOG.info("Testing SequenceFile with metadata");
  263.     int count = 1024 * 10;
  264.     int megabytes = 1;
  265.     int factor = 5;
  266.     CompressionCodec codec = new DefaultCodec();
  267.     Path file = new Path(System.getProperty("test.build.data",".")+"/test.seq.metadata");
  268.     Path recordCompressedFile = 
  269.       new Path(System.getProperty("test.build.data",".")+"/test.rc.seq.metadata");
  270.     Path blockCompressedFile = 
  271.       new Path(System.getProperty("test.build.data",".")+"/test.bc.seq.metadata");
  272.  
  273.     FileSystem fs = FileSystem.getLocal(conf);
  274.     SequenceFile.Metadata theMetadata = new SequenceFile.Metadata();
  275.     theMetadata.set(new Text("name_1"), new Text("value_1"));
  276.     theMetadata.set(new Text("name_2"), new Text("value_2"));
  277.     theMetadata.set(new Text("name_3"), new Text("value_3"));
  278.     theMetadata.set(new Text("name_4"), new Text("value_4"));
  279.     
  280.     int seed = new Random().nextInt();
  281.     
  282.     try {
  283.       // SequenceFile.Writer
  284.       writeMetadataTest(fs, count, seed, file, CompressionType.NONE, null, theMetadata);
  285.       SequenceFile.Metadata aMetadata = readMetadata(fs, file);
  286.       if (!theMetadata.equals(aMetadata)) {
  287.         LOG.info("The original metadata:n" + theMetadata.toString());
  288.         LOG.info("The retrieved metadata:n" + aMetadata.toString());
  289.         throw new RuntimeException("metadata not match:  " + 1);
  290.       }
  291.       // SequenceFile.RecordCompressWriter
  292.       writeMetadataTest(fs, count, seed, recordCompressedFile, CompressionType.RECORD, 
  293.                         codec, theMetadata);
  294.       aMetadata = readMetadata(fs, recordCompressedFile);
  295.       if (!theMetadata.equals(aMetadata)) {
  296.         LOG.info("The original metadata:n" + theMetadata.toString());
  297.         LOG.info("The retrieved metadata:n" + aMetadata.toString());
  298.         throw new RuntimeException("metadata not match:  " + 2);
  299.       }
  300.       // SequenceFile.BlockCompressWriter
  301.       writeMetadataTest(fs, count, seed, blockCompressedFile, CompressionType.BLOCK,
  302.                         codec, theMetadata);
  303.       aMetadata =readMetadata(fs, blockCompressedFile);
  304.       if (!theMetadata.equals(aMetadata)) {
  305.         LOG.info("The original metadata:n" + theMetadata.toString());
  306.         LOG.info("The retrieved metadata:n" + aMetadata.toString());
  307.         throw new RuntimeException("metadata not match:  " + 3);
  308.       }
  309.     } finally {
  310.       fs.close();
  311.     }
  312.     LOG.info("Successfully tested SequenceFile with metadata");
  313.   }
  314.   
  315.   
  316.   private static SequenceFile.Metadata readMetadata(FileSystem fs, Path file)
  317.     throws IOException {
  318.     LOG.info("reading file: " + file.toString() + "n");
  319.     SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
  320.     SequenceFile.Metadata meta = reader.getMetadata(); 
  321.     reader.close();
  322.     return meta;
  323.   }
  324.   private static void writeMetadataTest(FileSystem fs, int count, int seed, Path file, 
  325.                                         CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata)
  326.     throws IOException {
  327.     fs.delete(file, true);
  328.     LOG.info("creating " + count + " records with metadata and with" + compressionType +
  329.              " compression");
  330.     SequenceFile.Writer writer = 
  331.       SequenceFile.createWriter(fs, conf, file, 
  332.                                 RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata);
  333.     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  334.     for (int i = 0; i < count; i++) {
  335.       generator.next();
  336.       RandomDatum key = generator.getKey();
  337.       RandomDatum value = generator.getValue();
  338.       writer.append(key, value);
  339.     }
  340.     writer.close();
  341.   }
  342.   public void testClose() throws IOException {
  343.     Configuration conf = new Configuration();
  344.     LocalFileSystem fs = FileSystem.getLocal(conf);
  345.   
  346.     // create a sequence file 1
  347.     Path path1 = new Path(System.getProperty("test.build.data",".")+"/test1.seq");
  348.     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path1,
  349.         Text.class, NullWritable.class, CompressionType.BLOCK);
  350.     writer.append(new Text("file1-1"), NullWritable.get());
  351.     writer.append(new Text("file1-2"), NullWritable.get());
  352.     writer.close();
  353.   
  354.     Path path2 = new Path(System.getProperty("test.build.data",".")+"/test2.seq");
  355.     writer = SequenceFile.createWriter(fs, conf, path2, Text.class,
  356.         NullWritable.class, CompressionType.BLOCK);
  357.     writer.append(new Text("file2-1"), NullWritable.get());
  358.     writer.append(new Text("file2-2"), NullWritable.get());
  359.     writer.close();
  360.   
  361.     // Create a reader which uses 4 BuiltInZLibInflater instances
  362.     SequenceFile.Reader reader = new SequenceFile.Reader(fs, path1, conf);
  363.     // Returns the 4 BuiltInZLibInflater instances to the CodecPool
  364.     reader.close();
  365.     // The second close _could_ erroneously returns the same 
  366.     // 4 BuiltInZLibInflater instances to the CodecPool again
  367.     reader.close();
  368.   
  369.     // The first reader gets 4 BuiltInZLibInflater instances from the CodecPool
  370.     SequenceFile.Reader reader1 = new SequenceFile.Reader(fs, path1, conf);
  371.     // read first value from reader1
  372.     Text text = new Text();
  373.     reader1.next(text);
  374.     assertEquals("file1-1", text.toString());
  375.     
  376.     // The second reader _could_ get the same 4 BuiltInZLibInflater 
  377.     // instances from the CodePool as reader1
  378.     SequenceFile.Reader reader2 = new SequenceFile.Reader(fs, path2, conf);
  379.     
  380.     // read first value from reader2
  381.     reader2.next(text);
  382.     assertEquals("file2-1", text.toString());
  383.     // read second value from reader1
  384.     reader1.next(text);
  385.     assertEquals("file1-2", text.toString());
  386.     // read second value from reader2 (this throws an exception)
  387.     reader2.next(text);
  388.     assertEquals("file2-2", text.toString());
  389.   
  390.     assertFalse(reader1.next(text));
  391.     assertFalse(reader2.next(text));
  392.   }
  393.   /** For debugging and testing. */
  394.   public static void main(String[] args) throws Exception {
  395.     int count = 1024 * 1024;
  396.     int megabytes = 1;
  397.     int factor = 10;
  398.     boolean create = true;
  399.     boolean rwonly = false;
  400.     boolean check = false;
  401.     boolean fast = false;
  402.     boolean merge = false;
  403.     String compressType = "NONE";
  404.     String compressionCodec = "org.apache.hadoop.io.compress.DefaultCodec";
  405.     Path file = null;
  406.     int seed = new Random().nextInt();
  407.     String usage = "Usage: SequenceFile " +
  408.       "[-count N] " + 
  409.       "[-seed #] [-check] [-compressType <NONE|RECORD|BLOCK>] " + 
  410.       "-codec <compressionCodec> " + 
  411.       "[[-rwonly] | {[-megabytes M] [-factor F] [-nocreate] [-fast] [-merge]}] " +
  412.       " file";
  413.     if (args.length == 0) {
  414.       System.err.println(usage);
  415.       System.exit(-1);
  416.     }
  417.     
  418.     FileSystem fs = null;
  419.     try {
  420.       for (int i=0; i < args.length; ++i) {       // parse command line
  421.         if (args[i] == null) {
  422.           continue;
  423.         } else if (args[i].equals("-count")) {
  424.           count = Integer.parseInt(args[++i]);
  425.         } else if (args[i].equals("-megabytes")) {
  426.           megabytes = Integer.parseInt(args[++i]);
  427.         } else if (args[i].equals("-factor")) {
  428.           factor = Integer.parseInt(args[++i]);
  429.         } else if (args[i].equals("-seed")) {
  430.           seed = Integer.parseInt(args[++i]);
  431.         } else if (args[i].equals("-rwonly")) {
  432.           rwonly = true;
  433.         } else if (args[i].equals("-nocreate")) {
  434.           create = false;
  435.         } else if (args[i].equals("-check")) {
  436.           check = true;
  437.         } else if (args[i].equals("-fast")) {
  438.           fast = true;
  439.         } else if (args[i].equals("-merge")) {
  440.           merge = true;
  441.         } else if (args[i].equals("-compressType")) {
  442.           compressType = args[++i];
  443.         } else if (args[i].equals("-codec")) {
  444.           compressionCodec = args[++i];
  445.         } else {
  446.           // file is required parameter
  447.           file = new Path(args[i]);
  448.         }
  449.       }
  450.         
  451.       fs = file.getFileSystem(conf);
  452.       LOG.info("count = " + count);
  453.       LOG.info("megabytes = " + megabytes);
  454.       LOG.info("factor = " + factor);
  455.       LOG.info("create = " + create);
  456.       LOG.info("seed = " + seed);
  457.       LOG.info("rwonly = " + rwonly);
  458.       LOG.info("check = " + check);
  459.       LOG.info("fast = " + fast);
  460.       LOG.info("merge = " + merge);
  461.       LOG.info("compressType = " + compressType);
  462.       LOG.info("compressionCodec = " + compressionCodec);
  463.       LOG.info("file = " + file);
  464.       if (rwonly && (!create || merge || fast)) {
  465.         System.err.println(usage);
  466.         System.exit(-1);
  467.       }
  468.       CompressionType compressionType = 
  469.         CompressionType.valueOf(compressType);
  470.       CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(
  471.                                                                              conf.getClassByName(compressionCodec), 
  472.                                                                              conf);
  473.       if (rwonly || (create && !merge)) {
  474.         writeTest(fs, count, seed, file, compressionType, codec);
  475.         readTest(fs, count, seed, file);
  476.       }
  477.       if (!rwonly) {
  478.         if (merge) {
  479.           mergeTest(fs, count, seed, file, compressionType, 
  480.                     fast, factor, megabytes);
  481.         } else {
  482.           sortTest(fs, count, megabytes, factor, fast, file);
  483.         }
  484.       }
  485.     
  486.       if (check) {
  487.         checkSort(fs, count, seed, file);
  488.       }
  489.     } finally {
  490.       fs.close();
  491.     }
  492.   }
  493. }