FSOutputSummer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.IOException;
  20. import java.io.OutputStream;
  21. import java.util.zip.Checksum;
  22. /**
  23.  * This is a generic output stream for generating checksums for
  24.  * data before it is written to the underlying stream
  25.  */
  26. abstract public class FSOutputSummer extends OutputStream {
  27.   // data checksum
  28.   private Checksum sum;
  29.   // internal buffer for storing data before it is checksumed
  30.   private byte buf[];
  31.   // internal buffer for storing checksum
  32.   private byte checksum[];
  33.   // The number of valid bytes in the buffer.
  34.   private int count;
  35.   
  36.   protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
  37.     this.sum = sum;
  38.     this.buf = new byte[maxChunkSize];
  39.     this.checksum = new byte[checksumSize];
  40.     this.count = 0;
  41.   }
  42.   
  43.   /* write the data chunk in <code>b</code> staring at <code>offset</code> with
  44.    * a length of <code>len</code>, and its checksum
  45.    */
  46.   protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
  47.   throws IOException;
  48.   /** Write one byte */
  49.   public synchronized void write(int b) throws IOException {
  50.     sum.update(b);
  51.     buf[count++] = (byte)b;
  52.     if(count == buf.length) {
  53.       flushBuffer();
  54.     }
  55.   }
  56.   /**
  57.    * Writes <code>len</code> bytes from the specified byte array 
  58.    * starting at offset <code>off</code> and generate a checksum for
  59.    * each data chunk.
  60.    *
  61.    * <p> This method stores bytes from the given array into this
  62.    * stream's buffer before it gets checksumed. The buffer gets checksumed 
  63.    * and flushed to the underlying output stream when all data 
  64.    * in a checksum chunk are in the buffer.  If the buffer is empty and
  65.    * requested length is at least as large as the size of next checksum chunk
  66.    * size, this method will checksum and write the chunk directly 
  67.    * to the underlying output stream.  Thus it avoids uneccessary data copy.
  68.    *
  69.    * @param      b     the data.
  70.    * @param      off   the start offset in the data.
  71.    * @param      len   the number of bytes to write.
  72.    * @exception  IOException  if an I/O error occurs.
  73.    */
  74.   public synchronized void write(byte b[], int off, int len)
  75.   throws IOException {
  76.     if (off < 0 || len < 0 || off > b.length - len) {
  77.       throw new ArrayIndexOutOfBoundsException();
  78.     }
  79.     for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
  80.     }
  81.   }
  82.   
  83.   /**
  84.    * Write a portion of an array, flushing to the underlying
  85.    * stream at most once if necessary.
  86.    */
  87.   private int write1(byte b[], int off, int len) throws IOException {
  88.     if(count==0 && len>=buf.length) {
  89.       // local buffer is empty and user data has one chunk
  90.       // checksum and output data
  91.       final int length = buf.length;
  92.       sum.update(b, off, length);
  93.       writeChecksumChunk(b, off, length, false);
  94.       return length;
  95.     }
  96.     
  97.     // copy user data to local buffer
  98.     int bytesToCopy = buf.length-count;
  99.     bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
  100.     sum.update(b, off, bytesToCopy);
  101.     System.arraycopy(b, off, buf, count, bytesToCopy);
  102.     count += bytesToCopy;
  103.     if (count == buf.length) {
  104.       // local buffer is full
  105.       flushBuffer();
  106.     } 
  107.     return bytesToCopy;
  108.   }
  109.   /* Forces any buffered output bytes to be checksumed and written out to
  110.    * the underlying output stream. 
  111.    */
  112.   protected synchronized void flushBuffer() throws IOException {
  113.     flushBuffer(false);
  114.   }
  115.   /* Forces any buffered output bytes to be checksumed and written out to
  116.    * the underlying output stream.  If keep is true, then the state of 
  117.    * this object remains intact.
  118.    */
  119.   protected synchronized void flushBuffer(boolean keep) throws IOException {
  120.     if (count != 0) {
  121.       int chunkLen = count;
  122.       count = 0;
  123.       writeChecksumChunk(buf, 0, chunkLen, keep);
  124.       if (keep) {
  125.         count = chunkLen;
  126.       }
  127.     }
  128.   }
  129.   
  130.   /** Generate checksum for the data chunk and output data chunk & checksum
  131.    * to the underlying output stream. If keep is true then keep the
  132.    * current checksum intact, do not reset it.
  133.    */
  134.   private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
  135.   throws IOException {
  136.     int tempChecksum = (int)sum.getValue();
  137.     if (!keep) {
  138.       sum.reset();
  139.     }
  140.     int2byte(tempChecksum, checksum);
  141.     writeChunk(b, off, len, checksum);
  142.   }
  143.   /**
  144.    * Converts a checksum integer value to a byte stream
  145.    */
  146.   static public byte[] convertToByteStream(Checksum sum, int checksumSize) {
  147.     return int2byte((int)sum.getValue(), new byte[checksumSize]);
  148.   }
  149.   static byte[] int2byte(int integer, byte[] bytes) {
  150.     bytes[0] = (byte)((integer >>> 24) & 0xFF);
  151.     bytes[1] = (byte)((integer >>> 16) & 0xFF);
  152.     bytes[2] = (byte)((integer >>>  8) & 0xFF);
  153.     bytes[3] = (byte)((integer >>>  0) & 0xFF);
  154.     return bytes;
  155.   }
  156.   /**
  157.    * Resets existing buffer with a new one of the specified size.
  158.    */
  159.   protected synchronized void resetChecksumChunk(int size) {
  160.     sum.reset();
  161.     this.buf = new byte[size];
  162.     this.count = 0;
  163.   }
  164. }