BlockCompressorStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io.compress;
  19. import java.io.IOException;
  20. import java.io.OutputStream;
  21. /**
  22.  * A {@link org.apache.hadoop.io.compress.CompressorStream} which works
  23.  * with 'block-based' based compression algorithms, as opposed to 
  24.  * 'stream-based' compression algorithms.
  25.  *
  26.  * It should be noted that this wrapper does not guarantee that blocks will
  27.  * be sized for the compressor. If the
  28.  * {@link org.apache.hadoop.io.compress.Compressor} requires buffering to
  29.  * effect meaningful compression, it is responsible for it.
  30.  */
  31. public class BlockCompressorStream extends CompressorStream {
  32.   // The 'maximum' size of input data to be compressed, to account
  33.   // for the overhead of the compression algorithm.
  34.   private final int MAX_INPUT_SIZE;
  35.   /**
  36.    * Create a {@link BlockCompressorStream}.
  37.    * 
  38.    * @param out stream
  39.    * @param compressor compressor to be used
  40.    * @param bufferSize size of buffer
  41.    * @param compressionOverhead maximum 'overhead' of the compression 
  42.    *                            algorithm with given bufferSize
  43.    */
  44.   public BlockCompressorStream(OutputStream out, Compressor compressor, 
  45.                                int bufferSize, int compressionOverhead) {
  46.     super(out, compressor, bufferSize);
  47.     MAX_INPUT_SIZE = bufferSize - compressionOverhead;
  48.   }
  49.   /**
  50.    * Create a {@link BlockCompressorStream} with given output-stream and 
  51.    * compressor.
  52.    * Use default of 512 as bufferSize and compressionOverhead of 
  53.    * (1% of bufferSize + 12 bytes) =  18 bytes (zlib algorithm).
  54.    * 
  55.    * @param out stream
  56.    * @param compressor compressor to be used
  57.    */
  58.   public BlockCompressorStream(OutputStream out, Compressor compressor) {
  59.     this(out, compressor, 512, 18);
  60.   }
  61.   /**
  62.    * Write the data provided to the compression codec, compressing no more
  63.    * than the buffer size less the compression overhead as specified during
  64.    * construction for each block.
  65.    *
  66.    * Each block contains the uncompressed length for the block, followed by
  67.    * one or more length-prefixed blocks of compressed data.
  68.    */
  69.   public void write(byte[] b, int off, int len) throws IOException {
  70.     // Sanity checks
  71.     if (compressor.finished()) {
  72.       throw new IOException("write beyond end of stream");
  73.     }
  74.     if (b == null) {
  75.       throw new NullPointerException();
  76.     } else if ((off < 0) || (off > b.length) || (len < 0) ||
  77.                ((off + len) > b.length)) {
  78.       throw new IndexOutOfBoundsException();
  79.     } else if (len == 0) {
  80.       return;
  81.     }
  82.     long limlen = compressor.getBytesRead();
  83.     if (len + limlen > MAX_INPUT_SIZE && limlen > 0) {
  84.       // Adding this segment would exceed the maximum size.
  85.       // Flush data if we have it.
  86.       finish();
  87.       compressor.reset();
  88.     }
  89.     if (len > MAX_INPUT_SIZE) {
  90.       // The data we're given exceeds the maximum size. Any data
  91.       // we had have been flushed, so we write out this chunk in segments
  92.       // not exceeding the maximum size until it is exhausted.
  93.       rawWriteInt(len);
  94.       do {
  95.         int bufLen = Math.min(len, MAX_INPUT_SIZE);
  96.         
  97.         compressor.setInput(b, off, bufLen);
  98.         compressor.finish();
  99.         while (!compressor.finished()) {
  100.           compress();
  101.         }
  102.         compressor.reset();
  103.         off += bufLen;
  104.         len -= bufLen;
  105.       } while (len > 0);
  106.       return;
  107.     }
  108.     // Give data to the compressor
  109.     compressor.setInput(b, off, len);
  110.     if (!compressor.needsInput()) {
  111.       // compressor buffer size might be smaller than the maximum
  112.       // size, so we permit it to flush if required.
  113.       rawWriteInt((int)compressor.getBytesRead());
  114.       do {
  115.         compress();
  116.       } while (!compressor.needsInput());
  117.     }
  118.   }
  119.   public void finish() throws IOException {
  120.     if (!compressor.finished()) {
  121.       rawWriteInt((int)compressor.getBytesRead());
  122.       compressor.finish();
  123.       while (!compressor.finished()) {
  124.         compress();
  125.       }
  126.     }
  127.   }
  128.   protected void compress() throws IOException {
  129.     int len = compressor.compress(buffer, 0, buffer.length);
  130.     if (len > 0) {
  131.       // Write out the compressed chunk
  132.       rawWriteInt(len);
  133.       out.write(buffer, 0, len);
  134.     }
  135.   }
  136.   
  137.   private void rawWriteInt(int v) throws IOException {
  138.     out.write((v >>> 24) & 0xFF);
  139.     out.write((v >>> 16) & 0xFF);
  140.     out.write((v >>>  8) & 0xFF);
  141.     out.write((v >>>  0) & 0xFF);
  142.   }
  143. }