FSInputChecker.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:13k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.util.zip.Checksum;
  22. import org.apache.commons.logging.Log;
  23. import org.apache.commons.logging.LogFactory;
  24. import org.apache.hadoop.util.StringUtils;
  25. /**
  26.  * This is a generic input stream for verifying checksums for
  27.  * data before it is read by a user.
  28.  */
  29. abstract public class FSInputChecker extends FSInputStream {
  30.   public static final Log LOG 
  31.   = LogFactory.getLog(FSInputChecker.class);
  32.   
  33.   /** The file name from which data is read from */
  34.   protected Path file;
  35.   private Checksum sum;
  36.   private boolean verifyChecksum = true;
  37.   private byte[] buf;
  38.   private byte[] checksum;
  39.   private int pos;
  40.   private int count;
  41.   
  42.   private int numOfRetries;
  43.   
  44.   // cached file position
  45.   private long chunkPos = 0;
  46.   
  47.   /** Constructor
  48.    * 
  49.    * @param file The name of the file to be read
  50.    * @param numOfRetries Number of read retries when ChecksumError occurs
  51.    */
  52.   protected FSInputChecker( Path file, int numOfRetries) {
  53.     this.file = file;
  54.     this.numOfRetries = numOfRetries;
  55.   }
  56.   
  57.   /** Constructor
  58.    * 
  59.    * @param file The name of the file to be read
  60.    * @param numOfRetries Number of read retries when ChecksumError occurs
  61.    * @param sum the type of Checksum engine
  62.    * @param chunkSize maximun chunk size
  63.    * @param checksumSize the number byte of each checksum
  64.    */
  65.   protected FSInputChecker( Path file, int numOfRetries, 
  66.       boolean verifyChecksum, Checksum sum, int chunkSize, int checksumSize ) {
  67.     this(file, numOfRetries);
  68.     set(verifyChecksum, sum, chunkSize, checksumSize);
  69.   }
  70.   
  71.   /** Reads in next checksum chunk data into <code>buf</code> at <code>offset</code>
  72.    * and checksum into <code>checksum</code>.
  73.    * The method is used for implementing read, therefore, it should be optimized
  74.    * for sequential reading
  75.    * @param pos chunkPos
  76.    * @param buf desitination buffer
  77.    * @param offset offset in buf at which to store data
  78.    * @param len maximun number of bytes to read
  79.    * @return number of bytes read
  80.    */
  81.   abstract protected int readChunk(long pos, byte[] buf, int offset, int len,
  82.       byte[] checksum) throws IOException;
  83.   /** Return position of beginning of chunk containing pos. 
  84.    *
  85.    * @param pos a postion in the file
  86.    * @return the starting position of the chunk which contains the byte
  87.    */
  88.   abstract protected long getChunkPosition(long pos);
  89.   /** Return true if there is a need for checksum verification */
  90.   protected synchronized boolean needChecksum() {
  91.     return verifyChecksum && sum != null;
  92.   }
  93.   
  94.   /**
  95.    * Read one checksum-verified byte
  96.    * 
  97.    * @return     the next byte of data, or <code>-1</code> if the end of the
  98.    *             stream is reached.
  99.    * @exception  IOException  if an I/O error occurs.
  100.    */
  101.   public synchronized int read() throws IOException {
  102.     if (pos >= count) {
  103.       fill();
  104.       if (pos >= count) {
  105.         return -1;
  106.       }
  107.     }
  108.     return buf[pos++] & 0xff;
  109.   }
  110.   
  111.   /**
  112.    * Read checksum verified bytes from this byte-input stream into 
  113.    * the specified byte array, starting at the given offset.
  114.    *
  115.    * <p> This method implements the general contract of the corresponding
  116.    * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
  117.    * the <code>{@link InputStream}</code> class.  As an additional
  118.    * convenience, it attempts to read as many bytes as possible by repeatedly
  119.    * invoking the <code>read</code> method of the underlying stream.  This
  120.    * iterated <code>read</code> continues until one of the following
  121.    * conditions becomes true: <ul>
  122.    *
  123.    *   <li> The specified number of bytes have been read,
  124.    *
  125.    *   <li> The <code>read</code> method of the underlying stream returns
  126.    *   <code>-1</code>, indicating end-of-file.
  127.    *
  128.    * </ul> If the first <code>read</code> on the underlying stream returns
  129.    * <code>-1</code> to indicate end-of-file then this method returns
  130.    * <code>-1</code>.  Otherwise this method returns the number of bytes
  131.    * actually read.
  132.    *
  133.    * @param      b     destination buffer.
  134.    * @param      off   offset at which to start storing bytes.
  135.    * @param      len   maximum number of bytes to read.
  136.    * @return     the number of bytes read, or <code>-1</code> if the end of
  137.    *             the stream has been reached.
  138.    * @exception  IOException  if an I/O error occurs.
  139.    *             ChecksumException if any checksum error occurs
  140.    */
  141.   public synchronized int read(byte[] b, int off, int len) throws IOException {
  142.     // parameter check
  143.     if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
  144.       throw new IndexOutOfBoundsException();
  145.     } else if (len == 0) {
  146.       return 0;
  147.     }
  148.     int n = 0;
  149.     for (;;) {
  150.       int nread = read1(b, off + n, len - n);
  151.       if (nread <= 0) 
  152.         return (n == 0) ? nread : n;
  153.       n += nread;
  154.       if (n >= len)
  155.         return n;
  156.     }
  157.   }
  158.   
  159.   /**
  160.    * Fills the buffer with a chunk data. 
  161.    * No mark is supported.
  162.    * This method assumes that all data in the buffer has already been read in,
  163.    * hence pos > count.
  164.    */
  165.   private void fill(  ) throws IOException {
  166.     assert(pos>=count);
  167.     // fill internal buffer
  168.     count = readChecksumChunk(buf, 0, buf.length);
  169.   }
  170.   
  171.   /*
  172.    * Read characters into a portion of an array, reading from the underlying
  173.    * stream at most once if necessary.
  174.    */
  175.   private int read1(byte b[], int off, int len)
  176.   throws IOException {
  177.     int avail = count-pos;
  178.     if( avail <= 0 ) {
  179.       if(len>=buf.length) {
  180.         // read a chunk to user buffer directly; avoid one copy
  181.         int nread = readChecksumChunk(b, off, len);
  182.         return nread;
  183.       } else {
  184.         // read a chunk into the local buffer
  185.         fill();
  186.         if( count <= 0 ) {
  187.           return -1;
  188.         } else {
  189.           avail = count;
  190.         }
  191.       }
  192.     }
  193.     
  194.     // copy content of the local buffer to the user buffer
  195.     int cnt = (avail < len) ? avail : len;
  196.     System.arraycopy(buf, pos, b, off, cnt);
  197.     pos += cnt;
  198.     return cnt;    
  199.   }
  200.   
  201.   /* Read up one checksum chunk to array <i>b</i> at pos <i>off</i>
  202.    * It requires a checksum chunk boundary
  203.    * in between <cur_pos, cur_pos+len> 
  204.    * and it stops reading at the boundary or at the end of the stream;
  205.    * Otherwise an IllegalArgumentException is thrown.
  206.    * This makes sure that all data read are checksum verified.
  207.    * 
  208.    * @param b   the buffer into which the data is read.
  209.    * @param off the start offset in array <code>b</code>
  210.    *            at which the data is written.
  211.    * @param len the maximum number of bytes to read.
  212.    * @return    the total number of bytes read into the buffer, or
  213.    *            <code>-1</code> if there is no more data because the end of
  214.    *            the stream has been reached.
  215.    * @throws IOException if an I/O error occurs.
  216.    */ 
  217.   private int readChecksumChunk(byte b[], int off, int len)
  218.   throws IOException {
  219.     // invalidate buffer
  220.     count = pos = 0;
  221.           
  222.     int read = 0;
  223.     boolean retry = true;
  224.     int retriesLeft = numOfRetries; 
  225.     do {
  226.       retriesLeft--;
  227.       try {
  228.         read = readChunk(chunkPos, b, off, len, checksum);
  229.         if( read > 0 ) {
  230.           if( needChecksum() ) {
  231.             sum.update(b, off, read);
  232.             verifySum(chunkPos);
  233.           }
  234.           chunkPos += read;
  235.         } 
  236.         retry = false;
  237.       } catch (ChecksumException ce) {
  238.           LOG.info("Found checksum error: b[" + off + ", " + (off+read) + "]="
  239.               + StringUtils.byteToHexString(b, off, off + read), ce);
  240.           if (retriesLeft == 0) {
  241.             throw ce;
  242.           }
  243.           
  244.           // try a new replica
  245.           if (seekToNewSource(chunkPos)) {
  246.             // Since at least one of the sources is different, 
  247.             // the read might succeed, so we'll retry.
  248.             seek(chunkPos);
  249.           } else {
  250.             // Neither the data stream nor the checksum stream are being read
  251.             // from different sources, meaning we'll still get a checksum error 
  252.             // if we try to do the read again.  We throw an exception instead.
  253.             throw ce;
  254.           }
  255.         }
  256.     } while (retry);
  257.     return read;
  258.   }
  259.   
  260.   /* verify checksum for the chunk.
  261.    * @throws ChecksumException if there is a mismatch
  262.    */
  263.   private void verifySum(long errPos) throws ChecksumException {
  264.     long crc = getChecksum();
  265.     long sumValue = sum.getValue();
  266.     sum.reset();
  267.     if (crc != sumValue) {
  268.       throw new ChecksumException(
  269.           "Checksum error: "+file+" at "+errPos, errPos);
  270.     }
  271.   }
  272.   
  273.   /* calculate checksum value */
  274.   private long getChecksum() {
  275.     return checksum2long(checksum);
  276.   }
  277.   /** Convert a checksum byte array to a long */
  278.   static public long checksum2long(byte[] checksum) {
  279.     long crc = 0L;
  280.     for(int i=0; i<checksum.length; i++) {
  281.       crc |= (0xffL&(long)checksum[i])<<((checksum.length-i-1)*8);
  282.     }
  283.     return crc;
  284.   }
  285.   
  286.   @Override
  287.   public synchronized long getPos() throws IOException {
  288.     return chunkPos-(count-pos);
  289.   }
  290.   @Override
  291.   public synchronized int available() throws IOException {
  292.     return count-pos;
  293.   }
  294.   
  295.   /**
  296.    * Skips over and discards <code>n</code> bytes of data from the
  297.    * input stream.
  298.    *
  299.    * <p>This method may skip more bytes than are remaining in the backing
  300.    * file. This produces no exception and the number of bytes skipped
  301.    * may include some number of bytes that were beyond the EOF of the
  302.    * backing file. Attempting to read from the stream after skipping past
  303.    * the end will result in -1 indicating the end of the file.
  304.    *
  305.    *<p>If <code>n</code> is negative, no bytes are skipped.
  306.    *
  307.    * @param      n   the number of bytes to be skipped.
  308.    * @return     the actual number of bytes skipped.
  309.    * @exception  IOException  if an I/O error occurs.
  310.    *             ChecksumException if the chunk to skip to is corrupted
  311.    */
  312.   public synchronized long skip(long n) throws IOException {
  313.     if (n <= 0) {
  314.       return 0;
  315.     }
  316.     seek(getPos()+n);
  317.     return n;
  318.   }
  319.   /**
  320.    * Seek to the given position in the stream.
  321.    * The next read() will be from that position.
  322.    * 
  323.    * <p>This method may seek past the end of the file.
  324.    * This produces no exception and an attempt to read from
  325.    * the stream will result in -1 indicating the end of the file.
  326.    *
  327.    * @param      pos   the postion to seek to.
  328.    * @exception  IOException  if an I/O error occurs.
  329.    *             ChecksumException if the chunk to seek to is corrupted
  330.    */
  331.   public synchronized void seek(long pos) throws IOException {
  332.     if( pos<0 ) {
  333.       return;
  334.     }
  335.     // optimize: check if the pos is in the buffer
  336.     long start = chunkPos - this.count;
  337.     if( pos>=start && pos<chunkPos) {
  338.       this.pos = (int)(pos-start);
  339.       return;
  340.     }
  341.     
  342.     // reset the current state
  343.     resetState();
  344.     
  345.     // seek to a checksum boundary
  346.     chunkPos = getChunkPosition(pos);
  347.     
  348.     // scan to the desired position
  349.     int delta = (int)(pos - chunkPos);
  350.     if( delta > 0) {
  351.       readFully(this, new byte[delta], 0, delta);
  352.     }
  353.   }
  354.   /**
  355.    * A utility function that tries to read up to <code>len</code> bytes from
  356.    * <code>stm</code>
  357.    * 
  358.    * @param stm    an input stream
  359.    * @param buf    destiniation buffer
  360.    * @param offset offset at which to store data
  361.    * @param len    number of bytes to read
  362.    * @return actual number of bytes read
  363.    * @throws IOException if there is any IO error
  364.    */
  365.   protected static int readFully(InputStream stm, 
  366.       byte[] buf, int offset, int len) throws IOException {
  367.     int n = 0;
  368.     for (;;) {
  369.       int nread = stm.read(buf, offset + n, len - n);
  370.       if (nread <= 0) 
  371.         return (n == 0) ? nread : n;
  372.       n += nread;
  373.       if (n >= len)
  374.         return n;
  375.     }
  376.   }
  377.   
  378.   /**
  379.    * Set the checksum related parameters
  380.    * @param verifyChecksum whether to verify checksum
  381.    * @param sum which type of checksum to use
  382.    * @param maxChunkSize maximun chunk size
  383.    * @param checksumSize checksum size
  384.    */
  385.   final protected synchronized void set(boolean verifyChecksum,
  386.       Checksum sum, int maxChunkSize, int checksumSize ) {
  387.     this.verifyChecksum = verifyChecksum;
  388.     this.sum = sum;
  389.     this.buf = new byte[maxChunkSize];
  390.     this.checksum = new byte[checksumSize];
  391.     this.count = 0;
  392.     this.pos = 0;
  393.   }
  394.   final public boolean markSupported() {
  395.     return false;
  396.   }
  397.   
  398.   final public void mark(int readlimit) {
  399.   }
  400.   
  401.   final public void reset() throws IOException {
  402.     throw new IOException("mark/reset not supported");
  403.   }
  404.   
  405.   /* reset this FSInputChecker's state */
  406.   private void resetState() {
  407.     // invalidate buffer
  408.     count = 0;
  409.     pos = 0;
  410.     // reset Checksum
  411.     if (sum != null) {
  412.       sum.reset();
  413.     }
  414.   }
  415. }