TestCodec.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io.compress;
  19. import java.io.BufferedInputStream;
  20. import java.io.BufferedOutputStream;
  21. import java.io.DataInputStream;
  22. import java.io.DataOutputStream;
  23. import java.io.IOException;
  24. import java.util.Random;
  25. import junit.framework.TestCase;
  26. import org.apache.commons.logging.Log;
  27. import org.apache.commons.logging.LogFactory;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.fs.FSDataOutputStream;
  30. import org.apache.hadoop.fs.FileSystem;
  31. import org.apache.hadoop.fs.Path;
  32. import org.apache.hadoop.io.DataInputBuffer;
  33. import org.apache.hadoop.io.DataOutputBuffer;
  34. import org.apache.hadoop.io.RandomDatum;
  35. import org.apache.hadoop.io.SequenceFile;
  36. import org.apache.hadoop.io.Text;
  37. import org.apache.hadoop.io.Writable;
  38. import org.apache.hadoop.util.ReflectionUtils;
  39. import org.apache.hadoop.io.SequenceFile.CompressionType;
  40. import org.apache.hadoop.io.compress.CompressionOutputStream;
  41. import org.apache.hadoop.io.compress.zlib.ZlibFactory;
  42. public class TestCodec extends TestCase {
  43.   private static final Log LOG= 
  44.     LogFactory.getLog(TestCodec.class);
  45.   private Configuration conf = new Configuration();
  46.   private int count = 10000;
  47.   private int seed = new Random().nextInt();
  48.   
  49.   public void testDefaultCodec() throws IOException {
  50.     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DefaultCodec");
  51.   }
  52.   
  53.   public void testGzipCodec() throws IOException {
  54.     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.GzipCodec");
  55.   }
  56.   
  57.   public void testBZip2Codec() throws IOException {    
  58.     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.BZip2Codec");    
  59.   }
  60.   private static void codecTest(Configuration conf, int seed, int count, 
  61.                                 String codecClass) 
  62.     throws IOException {
  63.     
  64.     // Create the codec
  65.     CompressionCodec codec = null;
  66.     try {
  67.       codec = (CompressionCodec)
  68.         ReflectionUtils.newInstance(conf.getClassByName(codecClass), conf);
  69.     } catch (ClassNotFoundException cnfe) {
  70.       throw new IOException("Illegal codec!");
  71.     }
  72.     LOG.info("Created a Codec object of type: " + codecClass);
  73.     // Generate data
  74.     DataOutputBuffer data = new DataOutputBuffer();
  75.     RandomDatum.Generator generator = new RandomDatum.Generator(seed);
  76.     for(int i=0; i < count; ++i) {
  77.       generator.next();
  78.       RandomDatum key = generator.getKey();
  79.       RandomDatum value = generator.getValue();
  80.       
  81.       key.write(data);
  82.       value.write(data);
  83.     }
  84.     DataInputBuffer originalData = new DataInputBuffer();
  85.     DataInputStream originalIn = new DataInputStream(new BufferedInputStream(originalData));
  86.     originalData.reset(data.getData(), 0, data.getLength());
  87.     
  88.     LOG.info("Generated " + count + " records");
  89.     
  90.     // Compress data
  91.     DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
  92.     CompressionOutputStream deflateFilter = 
  93.       codec.createOutputStream(compressedDataBuffer);
  94.     DataOutputStream deflateOut = 
  95.       new DataOutputStream(new BufferedOutputStream(deflateFilter));
  96.     deflateOut.write(data.getData(), 0, data.getLength());
  97.     deflateOut.flush();
  98.     deflateFilter.finish();
  99.     LOG.info("Finished compressing data");
  100.     
  101.     // De-compress data
  102.     DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
  103.     deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0, 
  104.                                  compressedDataBuffer.getLength());
  105.     CompressionInputStream inflateFilter = 
  106.       codec.createInputStream(deCompressedDataBuffer);
  107.     DataInputStream inflateIn = 
  108.       new DataInputStream(new BufferedInputStream(inflateFilter));
  109.     // Check
  110.     for(int i=0; i < count; ++i) {
  111.       RandomDatum k1 = new RandomDatum();
  112.       RandomDatum v1 = new RandomDatum();
  113.       k1.readFields(originalIn);
  114.       v1.readFields(originalIn);
  115.       
  116.       RandomDatum k2 = new RandomDatum();
  117.       RandomDatum v2 = new RandomDatum();
  118.       k2.readFields(inflateIn);
  119.       v2.readFields(inflateIn);
  120.     }
  121.     LOG.info("SUCCESS! Completed checking " + count + " records");
  122.   }
  123.   public void testCodecPoolGzipReuse() throws Exception {
  124.     Configuration conf = new Configuration();
  125.     conf.setBoolean("hadoop.native.lib", true);
  126.     if (!ZlibFactory.isNativeZlibLoaded(conf)) {
  127.       LOG.warn("testCodecPoolGzipReuse skipped: native libs not loaded");
  128.       return;
  129.     }
  130.     GzipCodec gzc = ReflectionUtils.newInstance(GzipCodec.class, conf);
  131.     DefaultCodec dfc = ReflectionUtils.newInstance(DefaultCodec.class, conf);
  132.     Compressor c1 = CodecPool.getCompressor(gzc);
  133.     Compressor c2 = CodecPool.getCompressor(dfc);
  134.     CodecPool.returnCompressor(c1);
  135.     CodecPool.returnCompressor(c2);
  136.     assertTrue("Got mismatched ZlibCompressor", c2 != CodecPool.getCompressor(gzc));
  137.   }
  138.   public void testSequenceFileDefaultCodec() throws IOException, ClassNotFoundException, 
  139.       InstantiationException, IllegalAccessException {
  140.     sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.DefaultCodec", 100);
  141.     sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.DefaultCodec", 1000000);
  142.   }
  143.   
  144.   public void testSequenceFileBZip2Codec() throws IOException, ClassNotFoundException, 
  145.       InstantiationException, IllegalAccessException {
  146.     sequenceFileCodecTest(conf, 100, "org.apache.hadoop.io.compress.BZip2Codec", 100);    
  147.     sequenceFileCodecTest(conf, 200000, "org.apache.hadoop.io.compress.BZip2Codec", 1000000);    
  148.   }
  149.   
  150.   private static void sequenceFileCodecTest(Configuration conf, int lines, 
  151.                                 String codecClass, int blockSize) 
  152.     throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
  153.     Path filePath = new Path("SequenceFileCodecTest." + codecClass);
  154.     // Configuration
  155.     conf.setInt("io.seqfile.compress.blocksize", blockSize);
  156.     
  157.     // Create the SequenceFile
  158.     FileSystem fs = FileSystem.get(conf);
  159.     LOG.info("Creating SequenceFile with codec "" + codecClass + """);
  160.     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, filePath, 
  161.         Text.class, Text.class, CompressionType.BLOCK, 
  162.         (CompressionCodec)Class.forName(codecClass).newInstance());
  163.     
  164.     // Write some data
  165.     LOG.info("Writing to SequenceFile...");
  166.     for (int i=0; i<lines; i++) {
  167.       Text key = new Text("key" + i);
  168.       Text value = new Text("value" + i);
  169.       writer.append(key, value);
  170.     }
  171.     writer.close();
  172.     
  173.     // Read the data back and check
  174.     LOG.info("Reading from the SequenceFile...");
  175.     SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
  176.     
  177.     Writable key = (Writable)reader.getKeyClass().newInstance();
  178.     Writable value = (Writable)reader.getValueClass().newInstance();
  179.     
  180.     int lc = 0;
  181.     try {
  182.       while (reader.next(key, value)) {
  183.         assertEquals("key" + lc, key.toString());
  184.         assertEquals("value" + lc, value.toString());
  185.         lc ++;
  186.       }
  187.     } finally {
  188.       reader.close();
  189.     }
  190.     assertEquals(lines, lc);
  191.     // Delete temporary files
  192.     fs.delete(filePath, false);
  193.     LOG.info("SUCCESS! Completed SequenceFileCodecTest with codec "" + codecClass + """);
  194.   }
  195.   
  196.   public static void main(String[] args) {
  197.     int count = 10000;
  198.     String codecClass = "org.apache.hadoop.io.compress.DefaultCodec";
  199.     String usage = "TestCodec [-count N] [-codec <codec class>]";
  200.     if (args.length == 0) {
  201.       System.err.println(usage);
  202.       System.exit(-1);
  203.     }
  204.     try {
  205.       for (int i=0; i < args.length; ++i) {       // parse command line
  206.         if (args[i] == null) {
  207.           continue;
  208.         } else if (args[i].equals("-count")) {
  209.           count = Integer.parseInt(args[++i]);
  210.         } else if (args[i].equals("-codec")) {
  211.           codecClass = args[++i];
  212.         }
  213.       }
  214.       Configuration conf = new Configuration();
  215.       int seed = 0;
  216.       codecTest(conf, seed, count, codecClass);
  217.     } catch (Exception e) {
  218.       System.err.println("Caught: " + e);
  219.       e.printStackTrace();
  220.     }
  221.     
  222.   }
  223.   public TestCodec(String name) {
  224.     super(name);
  225.   }
  226. }