DecompressorStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io.compress;
  19. import java.io.EOFException;
  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import org.apache.hadoop.io.compress.Decompressor;
  23. public class DecompressorStream extends CompressionInputStream {
  24.   protected Decompressor decompressor = null;
  25.   protected byte[] buffer;
  26.   protected boolean eof = false;
  27.   protected boolean closed = false;
  28.   
  29.   public DecompressorStream(InputStream in, Decompressor decompressor, int bufferSize) {
  30.     super(in);
  31.     if (in == null || decompressor == null) {
  32.       throw new NullPointerException();
  33.     } else if (bufferSize <= 0) {
  34.       throw new IllegalArgumentException("Illegal bufferSize");
  35.     }
  36.     this.decompressor = decompressor;
  37.     buffer = new byte[bufferSize];
  38.   }
  39.   public DecompressorStream(InputStream in, Decompressor decompressor) {
  40.     this(in, decompressor, 512);
  41.   }
  42.   /**
  43.    * Allow derived classes to directly set the underlying stream.
  44.    * 
  45.    * @param in Underlying input stream.
  46.    */
  47.   protected DecompressorStream(InputStream in) {
  48.     super(in);
  49.   }
  50.   
  51.   private byte[] oneByte = new byte[1];
  52.   public int read() throws IOException {
  53.     checkStream();
  54.     return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
  55.   }
  56.   public int read(byte[] b, int off, int len) throws IOException {
  57.     checkStream();
  58.     
  59.     if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
  60.       throw new IndexOutOfBoundsException();
  61.     } else if (len == 0) {
  62.       return 0;
  63.     }
  64.     return decompress(b, off, len);
  65.   }
  66.   protected int decompress(byte[] b, int off, int len) throws IOException {
  67.     int n = 0;
  68.     
  69.     while ((n = decompressor.decompress(b, off, len)) == 0) {
  70.       if (decompressor.finished() || decompressor.needsDictionary()) {
  71.         eof = true;
  72.         return -1;
  73.       }
  74.       if (decompressor.needsInput()) {
  75.         getCompressedData();
  76.       }
  77.     }
  78.     
  79.     return n;
  80.   }
  81.   
  82.   protected void getCompressedData() throws IOException {
  83.     checkStream();
  84.   
  85.     int n = in.read(buffer, 0, buffer.length);
  86.     if (n == -1) {
  87.       throw new EOFException("Unexpected end of input stream");
  88.     }
  89.     decompressor.setInput(buffer, 0, n);
  90.   }
  91.   
  92.   protected void checkStream() throws IOException {
  93.     if (closed) {
  94.       throw new IOException("Stream closed");
  95.     }
  96.   }
  97.   
  98.   public void resetState() throws IOException {
  99.     decompressor.reset();
  100.   }
  101.   private byte[] skipBytes = new byte[512];
  102.   public long skip(long n) throws IOException {
  103.     // Sanity checks
  104.     if (n < 0) {
  105.       throw new IllegalArgumentException("negative skip length");
  106.     }
  107.     checkStream();
  108.     
  109.     // Read 'n' bytes
  110.     int skipped = 0;
  111.     while (skipped < n) {
  112.       int len = Math.min(((int)n - skipped), skipBytes.length);
  113.       len = read(skipBytes, 0, len);
  114.       if (len == -1) {
  115.         eof = true;
  116.         break;
  117.       }
  118.       skipped += len;
  119.     }
  120.     return skipped;
  121.   }
  122.   public int available() throws IOException {
  123.     checkStream();
  124.     return (eof) ? 0 : 1;
  125.   }
  126.   public void close() throws IOException {
  127.     if (!closed) {
  128.       in.close();
  129.       closed = true;
  130.     }
  131.   }
  132.   public boolean markSupported() {
  133.     return false;
  134.   }
  135.   public synchronized void mark(int readlimit) {
  136.   }
  137.   public synchronized void reset() throws IOException {
  138.     throw new IOException("mark/reset not supported");
  139.   }
  140. }