ZlibDecompressor.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io.compress.zlib;
  19. import java.io.IOException;
  20. import java.nio.Buffer;
  21. import java.nio.ByteBuffer;
  22. import org.apache.hadoop.io.compress.Decompressor;
  23. import org.apache.hadoop.util.NativeCodeLoader;
  24. /**
  25.  * A {@link Decompressor} based on the popular 
  26.  * zlib compression algorithm.
  27.  * http://www.zlib.net/
  28.  * 
  29.  */
  30. public class ZlibDecompressor implements Decompressor {
  31.   private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
  32.   
  33.   // HACK - Use this as a global lock in the JNI layer
  34.   private static Class clazz = ZlibDecompressor.class;
  35.   
  36.   private long stream;
  37.   private CompressionHeader header;
  38.   private int directBufferSize;
  39.   private Buffer compressedDirectBuf = null;
  40.   private int compressedDirectBufOff, compressedDirectBufLen;
  41.   private Buffer uncompressedDirectBuf = null;
  42.   private byte[] userBuf = null;
  43.   private int userBufOff = 0, userBufLen = 0;
  44.   private boolean finished;
  45.   private boolean needDict;
  46.   /**
  47.    * The headers to detect from compressed data.
  48.    */
  49.   public static enum CompressionHeader {
  50.     /**
  51.      * No headers/trailers/checksums.
  52.      */
  53.     NO_HEADER (-15),
  54.     
  55.     /**
  56.      * Default headers/trailers/checksums.
  57.      */
  58.     DEFAULT_HEADER (15),
  59.     
  60.     /**
  61.      * Simple gzip headers/trailers.
  62.      */
  63.     GZIP_FORMAT (31),
  64.     
  65.     /**
  66.      * Autodetect gzip/zlib headers/trailers.
  67.      */
  68.     AUTODETECT_GZIP_ZLIB (47);
  69.     private final int windowBits;
  70.     
  71.     CompressionHeader(int windowBits) {
  72.       this.windowBits = windowBits;
  73.     }
  74.     
  75.     public int windowBits() {
  76.       return windowBits;
  77.     }
  78.   }
  79.   private static boolean nativeZlibLoaded = false;
  80.   
  81.   static {
  82.     if (NativeCodeLoader.isNativeCodeLoaded()) {
  83.       try {
  84.         // Initialize the native library
  85.         initIDs();
  86.         nativeZlibLoaded = true;
  87.       } catch (Throwable t) {
  88.         // Ignore failure to load/initialize native-zlib
  89.       }
  90.     }
  91.   }
  92.   
  93.   static boolean isNativeZlibLoaded() {
  94.     return nativeZlibLoaded;
  95.   }
  96.   /**
  97.    * Creates a new decompressor.
  98.    */
  99.   public ZlibDecompressor(CompressionHeader header, int directBufferSize) {
  100.     this.header = header;
  101.     this.directBufferSize = directBufferSize;
  102.     compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
  103.     uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
  104.     uncompressedDirectBuf.position(directBufferSize);
  105.     
  106.     stream = init(this.header.windowBits());
  107.   }
  108.   
  109.   public ZlibDecompressor() {
  110.     this(CompressionHeader.DEFAULT_HEADER, DEFAULT_DIRECT_BUFFER_SIZE);
  111.   }
  112.   public synchronized void setInput(byte[] b, int off, int len) {
  113.     if (b == null) {
  114.       throw new NullPointerException();
  115.     }
  116.     if (off < 0 || len < 0 || off > b.length - len) {
  117.       throw new ArrayIndexOutOfBoundsException();
  118.     }
  119.   
  120.     this.userBuf = b;
  121.     this.userBufOff = off;
  122.     this.userBufLen = len;
  123.     
  124.     setInputFromSavedData();
  125.     
  126.     // Reinitialize zlib's output direct buffer 
  127.     uncompressedDirectBuf.limit(directBufferSize);
  128.     uncompressedDirectBuf.position(directBufferSize);
  129.   }
  130.   
  131.   synchronized void setInputFromSavedData() {
  132.     compressedDirectBufOff = 0;
  133.     compressedDirectBufLen = userBufLen;
  134.     if (compressedDirectBufLen > directBufferSize) {
  135.       compressedDirectBufLen = directBufferSize;
  136.     }
  137.     // Reinitialize zlib's input direct buffer
  138.     compressedDirectBuf.rewind();
  139.     ((ByteBuffer)compressedDirectBuf).put(userBuf, userBufOff, 
  140.                                           compressedDirectBufLen);
  141.     
  142.     // Note how much data is being fed to zlib
  143.     userBufOff += compressedDirectBufLen;
  144.     userBufLen -= compressedDirectBufLen;
  145.   }
  146.   public synchronized void setDictionary(byte[] b, int off, int len) {
  147.     if (stream == 0 || b == null) {
  148.       throw new NullPointerException();
  149.     }
  150.     if (off < 0 || len < 0 || off > b.length - len) {
  151.       throw new ArrayIndexOutOfBoundsException();
  152.     }
  153.     setDictionary(stream, b, off, len);
  154.     needDict = false;
  155.   }
  156.   public synchronized boolean needsInput() {
  157.     // Consume remanining compressed data?
  158.     if (uncompressedDirectBuf.remaining() > 0) {
  159.       return false;
  160.     }
  161.     
  162.     // Check if zlib has consumed all input
  163.     if (compressedDirectBufLen <= 0) {
  164.       // Check if we have consumed all user-input
  165.       if (userBufLen <= 0) {
  166.         return true;
  167.       } else {
  168.         setInputFromSavedData();
  169.       }
  170.     }
  171.     
  172.     return false;
  173.   }
  174.   public synchronized boolean needsDictionary() {
  175.     return needDict;
  176.   }
  177.   public synchronized boolean finished() {
  178.     // Check if 'zlib' says its 'finished' and
  179.     // all compressed data has been consumed
  180.     return (finished && uncompressedDirectBuf.remaining() == 0);
  181.   }
  182.   public synchronized int decompress(byte[] b, int off, int len) 
  183.     throws IOException {
  184.     if (b == null) {
  185.       throw new NullPointerException();
  186.     }
  187.     if (off < 0 || len < 0 || off > b.length - len) {
  188.       throw new ArrayIndexOutOfBoundsException();
  189.     }
  190.     
  191.     int n = 0;
  192.     
  193.     // Check if there is uncompressed data
  194.     n = uncompressedDirectBuf.remaining();
  195.     if (n > 0) {
  196.       n = Math.min(n, len);
  197.       ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
  198.       return n;
  199.     }
  200.     
  201.     // Re-initialize the zlib's output direct buffer
  202.     uncompressedDirectBuf.rewind();
  203.     uncompressedDirectBuf.limit(directBufferSize);
  204.     // Decompress data
  205.     n = inflateBytesDirect();
  206.     uncompressedDirectBuf.limit(n);
  207.     // Get atmost 'len' bytes
  208.     n = Math.min(n, len);
  209.     ((ByteBuffer)uncompressedDirectBuf).get(b, off, n);
  210.     return n;
  211.   }
  212.   
  213.   /**
  214.    * Returns the total number of compressed bytes output so far.
  215.    *
  216.    * @return the total (non-negative) number of compressed bytes output so far
  217.    */
  218.   public synchronized long getBytesWritten() {
  219.     checkStream();
  220.     return getBytesWritten(stream);
  221.   }
  222.   /**
  223.    * Returns the total number of uncompressed bytes input so far.</p>
  224.    *
  225.    * @return the total (non-negative) number of uncompressed bytes input so far
  226.    */
  227.   public synchronized long getBytesRead() {
  228.     checkStream();
  229.     return getBytesRead(stream);
  230.   }
  231.   public synchronized void reset() {
  232.     checkStream();
  233.     reset(stream);
  234.     finished = false;
  235.     needDict = false;
  236.     compressedDirectBufOff = compressedDirectBufLen = 0;
  237.     uncompressedDirectBuf.limit(directBufferSize);
  238.     uncompressedDirectBuf.position(directBufferSize);
  239.     userBufOff = userBufLen = 0;
  240.   }
  241.   public synchronized void end() {
  242.     if (stream != 0) {
  243.       end(stream);
  244.       stream = 0;
  245.     }
  246.   }
  247.   protected void finalize() {
  248.     end();
  249.   }
  250.   
  251.   private void checkStream() {
  252.     if (stream == 0)
  253.       throw new NullPointerException();
  254.   }
  255.   
  256.   private native static void initIDs();
  257.   private native static long init(int windowBits);
  258.   private native static void setDictionary(long strm, byte[] b, int off,
  259.                                            int len);
  260.   private native int inflateBytesDirect();
  261.   private native static long getBytesRead(long strm);
  262.   private native static long getBytesWritten(long strm);
  263.   private native static void reset(long strm);
  264.   private native static void end(long strm);
  265. }