FSOutputSummer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.fs;
- import java.io.IOException;
- import java.io.OutputStream;
- import java.util.zip.Checksum;
- /**
- * This is a generic output stream for generating checksums for
- * data before it is written to the underlying stream
- */
- abstract public class FSOutputSummer extends OutputStream {
- // data checksum
- private Checksum sum;
- // internal buffer for storing data before it is checksumed
- private byte buf[];
- // internal buffer for storing checksum
- private byte checksum[];
- // The number of valid bytes in the buffer.
- private int count;
-
- protected FSOutputSummer(Checksum sum, int maxChunkSize, int checksumSize) {
- this.sum = sum;
- this.buf = new byte[maxChunkSize];
- this.checksum = new byte[checksumSize];
- this.count = 0;
- }
-
- /* write the data chunk in <code>b</code> staring at <code>offset</code> with
- * a length of <code>len</code>, and its checksum
- */
- protected abstract void writeChunk(byte[] b, int offset, int len, byte[] checksum)
- throws IOException;
- /** Write one byte */
- public synchronized void write(int b) throws IOException {
- sum.update(b);
- buf[count++] = (byte)b;
- if(count == buf.length) {
- flushBuffer();
- }
- }
- /**
- * Writes <code>len</code> bytes from the specified byte array
- * starting at offset <code>off</code> and generate a checksum for
- * each data chunk.
- *
- * <p> This method stores bytes from the given array into this
- * stream's buffer before it gets checksumed. The buffer gets checksumed
- * and flushed to the underlying output stream when all data
- * in a checksum chunk are in the buffer. If the buffer is empty and
- * requested length is at least as large as the size of next checksum chunk
- * size, this method will checksum and write the chunk directly
- * to the underlying output stream. Thus it avoids uneccessary data copy.
- *
- * @param b the data.
- * @param off the start offset in the data.
- * @param len the number of bytes to write.
- * @exception IOException if an I/O error occurs.
- */
- public synchronized void write(byte b[], int off, int len)
- throws IOException {
- if (off < 0 || len < 0 || off > b.length - len) {
- throw new ArrayIndexOutOfBoundsException();
- }
- for (int n=0;n<len;n+=write1(b, off+n, len-n)) {
- }
- }
-
- /**
- * Write a portion of an array, flushing to the underlying
- * stream at most once if necessary.
- */
- private int write1(byte b[], int off, int len) throws IOException {
- if(count==0 && len>=buf.length) {
- // local buffer is empty and user data has one chunk
- // checksum and output data
- final int length = buf.length;
- sum.update(b, off, length);
- writeChecksumChunk(b, off, length, false);
- return length;
- }
-
- // copy user data to local buffer
- int bytesToCopy = buf.length-count;
- bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy;
- sum.update(b, off, bytesToCopy);
- System.arraycopy(b, off, buf, count, bytesToCopy);
- count += bytesToCopy;
- if (count == buf.length) {
- // local buffer is full
- flushBuffer();
- }
- return bytesToCopy;
- }
- /* Forces any buffered output bytes to be checksumed and written out to
- * the underlying output stream.
- */
- protected synchronized void flushBuffer() throws IOException {
- flushBuffer(false);
- }
- /* Forces any buffered output bytes to be checksumed and written out to
- * the underlying output stream. If keep is true, then the state of
- * this object remains intact.
- */
- protected synchronized void flushBuffer(boolean keep) throws IOException {
- if (count != 0) {
- int chunkLen = count;
- count = 0;
- writeChecksumChunk(buf, 0, chunkLen, keep);
- if (keep) {
- count = chunkLen;
- }
- }
- }
-
- /** Generate checksum for the data chunk and output data chunk & checksum
- * to the underlying output stream. If keep is true then keep the
- * current checksum intact, do not reset it.
- */
- private void writeChecksumChunk(byte b[], int off, int len, boolean keep)
- throws IOException {
- int tempChecksum = (int)sum.getValue();
- if (!keep) {
- sum.reset();
- }
- int2byte(tempChecksum, checksum);
- writeChunk(b, off, len, checksum);
- }
- /**
- * Converts a checksum integer value to a byte stream
- */
- static public byte[] convertToByteStream(Checksum sum, int checksumSize) {
- return int2byte((int)sum.getValue(), new byte[checksumSize]);
- }
- static byte[] int2byte(int integer, byte[] bytes) {
- bytes[0] = (byte)((integer >>> 24) & 0xFF);
- bytes[1] = (byte)((integer >>> 16) & 0xFF);
- bytes[2] = (byte)((integer >>> 8) & 0xFF);
- bytes[3] = (byte)((integer >>> 0) & 0xFF);
- return bytes;
- }
- /**
- * Resets existing buffer with a new one of the specified size.
- */
- protected synchronized void resetChecksumChunk(int size) {
- sum.reset();
- this.buf = new byte[size];
- this.count = 0;
- }
- }