ChecksumFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:18k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.*;
  20. import java.util.Arrays;
  21. import java.util.zip.CRC32;
  22. import org.apache.commons.logging.Log;
  23. import org.apache.commons.logging.LogFactory;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.permission.FsPermission;
  26. import org.apache.hadoop.util.Progressable;
  27. import org.apache.hadoop.util.StringUtils;
  28. /****************************************************************
  29.  * Abstract Checksumed FileSystem.
  30.  * It provide a basice implementation of a Checksumed FileSystem,
  31.  * which creates a checksum file for each raw file.
  32.  * It generates & verifies checksums at the client side.
  33.  *
  34.  *****************************************************************/
  35. public abstract class ChecksumFileSystem extends FilterFileSystem {
  36.   private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
  37.   private int bytesPerChecksum = 512;
  38.   private boolean verifyChecksum = true;
  39.   public static double getApproxChkSumLength(long size) {
  40.     return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
  41.   }
  42.   
  43.   public ChecksumFileSystem(FileSystem fs) {
  44.     super(fs);
  45.   }
  46.   public void setConf(Configuration conf) {
  47.     super.setConf(conf);
  48.     if (conf != null) {
  49.       bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
  50.     }
  51.   }
  52.   
  53.   /**
  54.    * Set whether to verify checksum.
  55.    */
  56.   public void setVerifyChecksum(boolean verifyChecksum) {
  57.     this.verifyChecksum = verifyChecksum;
  58.   }
  59.   /** get the raw file system */
  60.   public FileSystem getRawFileSystem() {
  61.     return fs;
  62.   }
  63.   /** Return the name of the checksum file associated with a file.*/
  64.   public Path getChecksumFile(Path file) {
  65.     return new Path(file.getParent(), "." + file.getName() + ".crc");
  66.   }
  67.   /** Return true iff file is a checksum file name.*/
  68.   public static boolean isChecksumFile(Path file) {
  69.     String name = file.getName();
  70.     return name.startsWith(".") && name.endsWith(".crc");
  71.   }
  72.   /** Return the length of the checksum file given the size of the 
  73.    * actual file.
  74.    **/
  75.   public long getChecksumFileLength(Path file, long fileSize) {
  76.     return getChecksumLength(fileSize, getBytesPerSum());
  77.   }
  78.   /** Return the bytes Per Checksum */
  79.   public int getBytesPerSum() {
  80.     return bytesPerChecksum;
  81.   }
  82.   private int getSumBufferSize(int bytesPerSum, int bufferSize) {
  83.     int defaultBufferSize = getConf().getInt("io.file.buffer.size", 4096);
  84.     int proportionalBufferSize = bufferSize / bytesPerSum;
  85.     return Math.max(bytesPerSum,
  86.                     Math.max(proportionalBufferSize, defaultBufferSize));
  87.   }
  88.   /*******************************************************
  89.    * For open()'s FSInputStream
  90.    * It verifies that data matches checksums.
  91.    *******************************************************/
  92.   private static class ChecksumFSInputChecker extends FSInputChecker {
  93.     public static final Log LOG 
  94.       = LogFactory.getLog(FSInputChecker.class);
  95.     
  96.     private ChecksumFileSystem fs;
  97.     private FSDataInputStream datas;
  98.     private FSDataInputStream sums;
  99.     
  100.     private static final int HEADER_LENGTH = 8;
  101.     
  102.     private int bytesPerSum = 1;
  103.     private long fileLen = -1L;
  104.     
  105.     public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
  106.       throws IOException {
  107.       this(fs, file, fs.getConf().getInt("io.file.buffer.size", 4096));
  108.     }
  109.     
  110.     public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
  111.       throws IOException {
  112.       super( file, fs.getFileStatus(file).getReplication() );
  113.       this.datas = fs.getRawFileSystem().open(file, bufferSize);
  114.       this.fs = fs;
  115.       Path sumFile = fs.getChecksumFile(file);
  116.       try {
  117.         int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
  118.         sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
  119.         byte[] version = new byte[CHECKSUM_VERSION.length];
  120.         sums.readFully(version);
  121.         if (!Arrays.equals(version, CHECKSUM_VERSION))
  122.           throw new IOException("Not a checksum file: "+sumFile);
  123.         this.bytesPerSum = sums.readInt();
  124.         set(fs.verifyChecksum, new CRC32(), bytesPerSum, 4);
  125.       } catch (FileNotFoundException e) {         // quietly ignore
  126.         set(fs.verifyChecksum, null, 1, 0);
  127.       } catch (IOException e) {                   // loudly ignore
  128.         LOG.warn("Problem opening checksum file: "+ file + 
  129.                  ".  Ignoring exception: " + 
  130.                  StringUtils.stringifyException(e));
  131.         set(fs.verifyChecksum, null, 1, 0);
  132.       }
  133.     }
  134.     
  135.     private long getChecksumFilePos( long dataPos ) {
  136.       return HEADER_LENGTH + 4*(dataPos/bytesPerSum);
  137.     }
  138.     
  139.     protected long getChunkPosition( long dataPos ) {
  140.       return dataPos/bytesPerSum*bytesPerSum;
  141.     }
  142.     
  143.     public int available() throws IOException {
  144.       return datas.available() + super.available();
  145.     }
  146.     
  147.     public int read(long position, byte[] b, int off, int len)
  148.       throws IOException {
  149.       // parameter check
  150.       if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
  151.         throw new IndexOutOfBoundsException();
  152.       } else if (len == 0) {
  153.         return 0;
  154.       }
  155.       if( position<0 ) {
  156.         throw new IllegalArgumentException(
  157.             "Parameter position can not to be negative");
  158.       }
  159.       ChecksumFSInputChecker checker = new ChecksumFSInputChecker(fs, file);
  160.       checker.seek(position);
  161.       int nread = checker.read(b, off, len);
  162.       checker.close();
  163.       return nread;
  164.     }
  165.     
  166.     public void close() throws IOException {
  167.       datas.close();
  168.       if( sums != null ) {
  169.         sums.close();
  170.       }
  171.       set(fs.verifyChecksum, null, 1, 0);
  172.     }
  173.     
  174.     @Override
  175.     public boolean seekToNewSource(long targetPos) throws IOException {
  176.       long sumsPos = getChecksumFilePos(targetPos);
  177.       fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
  178.       boolean newDataSource = datas.seekToNewSource(targetPos);
  179.       return sums.seekToNewSource(sumsPos) || newDataSource;
  180.     }
  181.     @Override
  182.     protected int readChunk(long pos, byte[] buf, int offset, int len,
  183.         byte[] checksum) throws IOException {
  184.       boolean eof = false;
  185.       if(needChecksum()) {
  186.         try {
  187.           long checksumPos = getChecksumFilePos(pos); 
  188.           if(checksumPos != sums.getPos()) {
  189.             sums.seek(checksumPos);
  190.           }
  191.           sums.readFully(checksum);
  192.         } catch (EOFException e) {
  193.           eof = true;
  194.         }
  195.         len = bytesPerSum;
  196.       }
  197.       if(pos != datas.getPos()) {
  198.         datas.seek(pos);
  199.       }
  200.       int nread = readFully(datas, buf, offset, len);
  201.       if( eof && nread > 0) {
  202.         throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
  203.       }
  204.       return nread;
  205.     }
  206.     
  207.     /* Return the file length */
  208.     private long getFileLength() throws IOException {
  209.       if( fileLen==-1L ) {
  210.         fileLen = fs.getContentSummary(file).getLength();
  211.       }
  212.       return fileLen;
  213.     }
  214.     
  215.     /**
  216.      * Skips over and discards <code>n</code> bytes of data from the
  217.      * input stream.
  218.      *
  219.      *The <code>skip</code> method skips over some smaller number of bytes
  220.      * when reaching end of file before <code>n</code> bytes have been skipped.
  221.      * The actual number of bytes skipped is returned.  If <code>n</code> is
  222.      * negative, no bytes are skipped.
  223.      *
  224.      * @param      n   the number of bytes to be skipped.
  225.      * @return     the actual number of bytes skipped.
  226.      * @exception  IOException  if an I/O error occurs.
  227.      *             ChecksumException if the chunk to skip to is corrupted
  228.      */
  229.     public synchronized long skip(long n) throws IOException {
  230.       long curPos = getPos();
  231.       long fileLength = getFileLength();
  232.       if( n+curPos > fileLength ) {
  233.         n = fileLength - curPos;
  234.       }
  235.       return super.skip(n);
  236.     }
  237.     
  238.     /**
  239.      * Seek to the given position in the stream.
  240.      * The next read() will be from that position.
  241.      * 
  242.      * <p>This method does not allow seek past the end of the file.
  243.      * This produces IOException.
  244.      *
  245.      * @param      pos   the postion to seek to.
  246.      * @exception  IOException  if an I/O error occurs or seeks after EOF
  247.      *             ChecksumException if the chunk to seek to is corrupted
  248.      */
  249.     public synchronized void seek(long pos) throws IOException {
  250.       if(pos>getFileLength()) {
  251.         throw new IOException("Cannot seek after EOF");
  252.       }
  253.       super.seek(pos);
  254.     }
  255.   }
  256.   /**
  257.    * Opens an FSDataInputStream at the indicated Path.
  258.    * @param f the file name to open
  259.    * @param bufferSize the size of the buffer to be used.
  260.    */
  261.   @Override
  262.   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  263.     return new FSDataInputStream(
  264.         new ChecksumFSInputChecker(this, f, bufferSize));
  265.   }
  266.   /** {@inheritDoc} */
  267.   public FSDataOutputStream append(Path f, int bufferSize,
  268.       Progressable progress) throws IOException {
  269.     throw new IOException("Not supported");
  270.   }
  271.   /**
  272.    * Calculated the length of the checksum file in bytes.
  273.    * @param size the length of the data file in bytes
  274.    * @param bytesPerSum the number of bytes in a checksum block
  275.    * @return the number of bytes in the checksum file
  276.    */
  277.   public static long getChecksumLength(long size, int bytesPerSum) {
  278.     //the checksum length is equal to size passed divided by bytesPerSum +
  279.     //bytes written in the beginning of the checksum file.  
  280.     return ((size + bytesPerSum - 1) / bytesPerSum) * 4 +
  281.              CHECKSUM_VERSION.length + 4;  
  282.   }
  283.   /** This class provides an output stream for a checksummed file.
  284.    * It generates checksums for data. */
  285.   private static class ChecksumFSOutputSummer extends FSOutputSummer {
  286.     private FSDataOutputStream datas;    
  287.     private FSDataOutputStream sums;
  288.     private static final float CHKSUM_AS_FRACTION = 0.01f;
  289.     
  290.     public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
  291.                           Path file, 
  292.                           boolean overwrite, 
  293.                           short replication,
  294.                           long blockSize,
  295.                           Configuration conf)
  296.       throws IOException {
  297.       this(fs, file, overwrite, 
  298.            conf.getInt("io.file.buffer.size", 4096),
  299.            replication, blockSize, null);
  300.     }
  301.     
  302.     public ChecksumFSOutputSummer(ChecksumFileSystem fs, 
  303.                           Path file, 
  304.                           boolean overwrite,
  305.                           int bufferSize,
  306.                           short replication,
  307.                           long blockSize,
  308.                           Progressable progress)
  309.       throws IOException {
  310.       super(new CRC32(), fs.getBytesPerSum(), 4);
  311.       int bytesPerSum = fs.getBytesPerSum();
  312.       this.datas = fs.getRawFileSystem().create(file, overwrite, bufferSize, 
  313.                                          replication, blockSize, progress);
  314.       int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
  315.       this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file), true, 
  316.                                                sumBufferSize, replication,
  317.                                                blockSize);
  318.       sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
  319.       sums.writeInt(bytesPerSum);
  320.     }
  321.     
  322.     public void close() throws IOException {
  323.       flushBuffer();
  324.       sums.close();
  325.       datas.close();
  326.     }
  327.     
  328.     @Override
  329.     protected void writeChunk(byte[] b, int offset, int len, byte[] checksum)
  330.     throws IOException {
  331.       datas.write(b, offset, len);
  332.       sums.write(checksum);
  333.     }
  334.   }
  335.   /** {@inheritDoc} */
  336.   @Override
  337.   public FSDataOutputStream create(Path f, FsPermission permission,
  338.       boolean overwrite, int bufferSize, short replication, long blockSize,
  339.       Progressable progress) throws IOException {
  340.     Path parent = f.getParent();
  341.     if (parent != null && !mkdirs(parent)) {
  342.       throw new IOException("Mkdirs failed to create " + parent);
  343.     }
  344.     final FSDataOutputStream out = new FSDataOutputStream(
  345.         new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
  346.             blockSize, progress), null);
  347.     if (permission != null) {
  348.       setPermission(f, permission);
  349.     }
  350.     return out;
  351.   }
  352.   /**
  353.    * Set replication for an existing file.
  354.    * Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
  355.    * @param src file name
  356.    * @param replication new replication
  357.    * @throws IOException
  358.    * @return true if successful;
  359.    *         false if file does not exist or is a directory
  360.    */
  361.   public boolean setReplication(Path src, short replication) throws IOException {
  362.     boolean value = fs.setReplication(src, replication);
  363.     if (!value)
  364.       return false;
  365.     Path checkFile = getChecksumFile(src);
  366.     if (exists(checkFile))
  367.       fs.setReplication(checkFile, replication);
  368.     return true;
  369.   }
  370.   /**
  371.    * Rename files/dirs
  372.    */
  373.   public boolean rename(Path src, Path dst) throws IOException {
  374.     if (fs.isDirectory(src)) {
  375.       return fs.rename(src, dst);
  376.     } else {
  377.       boolean value = fs.rename(src, dst);
  378.       if (!value)
  379.         return false;
  380.       Path checkFile = getChecksumFile(src);
  381.       if (fs.exists(checkFile)) { //try to rename checksum
  382.         if (fs.isDirectory(dst)) {
  383.           value = fs.rename(checkFile, dst);
  384.         } else {
  385.           value = fs.rename(checkFile, getChecksumFile(dst));
  386.         }
  387.       }
  388.       return value;
  389.     }
  390.   }
  391.   /**
  392.    * Implement the delete(Path, boolean) in checksum
  393.    * file system.
  394.    */
  395.   public boolean delete(Path f, boolean recursive) throws IOException{
  396.     FileStatus fstatus = null;
  397.     try {
  398.       fstatus = fs.getFileStatus(f);
  399.     } catch(FileNotFoundException e) {
  400.       return false;
  401.     }
  402.     if(fstatus.isDir()) {
  403.       //this works since the crcs are in the same
  404.       //directories and the files. so we just delete
  405.       //everything in the underlying filesystem
  406.       return fs.delete(f, recursive);
  407.     } else {
  408.       Path checkFile = getChecksumFile(f);
  409.       if (fs.exists(checkFile)) {
  410.         fs.delete(checkFile, true);
  411.       }
  412.       return fs.delete(f, true);
  413.     }
  414.   }
  415.     
  416.   final private static PathFilter DEFAULT_FILTER = new PathFilter() {
  417.     public boolean accept(Path file) {
  418.       return !isChecksumFile(file);
  419.     }
  420.   };
  421.   /**
  422.    * List the statuses of the files/directories in the given path if the path is
  423.    * a directory.
  424.    * 
  425.    * @param f
  426.    *          given path
  427.    * @return the statuses of the files/directories in the given patch
  428.    * @throws IOException
  429.    */
  430.   @Override
  431.   public FileStatus[] listStatus(Path f) throws IOException {
  432.     return fs.listStatus(f, DEFAULT_FILTER);
  433.   }
  434.   
  435.   @Override
  436.   public boolean mkdirs(Path f) throws IOException {
  437.     return fs.mkdirs(f);
  438.   }
  439.   @Override
  440.   public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
  441.     throws IOException {
  442.     Configuration conf = getConf();
  443.     FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
  444.   }
  445.   /**
  446.    * The src file is under FS, and the dst is on the local disk.
  447.    * Copy it from FS control to the local dst name.
  448.    */
  449.   @Override
  450.   public void copyToLocalFile(boolean delSrc, Path src, Path dst)
  451.     throws IOException {
  452.     Configuration conf = getConf();
  453.     FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
  454.   }
  455.   /**
  456.    * The src file is under FS, and the dst is on the local disk.
  457.    * Copy it from FS control to the local dst name.
  458.    * If src and dst are directories, the copyCrc parameter
  459.    * determines whether to copy CRC files.
  460.    */
  461.   public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
  462.     throws IOException {
  463.     if (!fs.isDirectory(src)) { // source is a file
  464.       fs.copyToLocalFile(src, dst);
  465.       FileSystem localFs = getLocal(getConf()).getRawFileSystem();
  466.       if (localFs.isDirectory(dst)) {
  467.         dst = new Path(dst, src.getName());
  468.       }
  469.       dst = getChecksumFile(dst);
  470.       if (localFs.exists(dst)) { //remove old local checksum file
  471.         localFs.delete(dst, true);
  472.       }
  473.       Path checksumFile = getChecksumFile(src);
  474.       if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
  475.         fs.copyToLocalFile(checksumFile, dst);
  476.       }
  477.     } else {
  478.       FileStatus[] srcs = listStatus(src);
  479.       for (FileStatus srcFile : srcs) {
  480.         copyToLocalFile(srcFile.getPath(), 
  481.                         new Path(dst, srcFile.getPath().getName()), copyCrc);
  482.       }
  483.     }
  484.   }
  485.   @Override
  486.   public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
  487.     throws IOException {
  488.     return tmpLocalFile;
  489.   }
  490.   @Override
  491.   public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
  492.     throws IOException {
  493.     moveFromLocalFile(tmpLocalFile, fsOutputFile);
  494.   }
  495.   /**
  496.    * Report a checksum error to the file system.
  497.    * @param f the file name containing the error
  498.    * @param in the stream open on the file
  499.    * @param inPos the position of the beginning of the bad data in the file
  500.    * @param sums the stream open on the checksum file
  501.    * @param sumsPos the position of the beginning of the bad data in the checksum file
  502.    * @return if retry is neccessary
  503.    */
  504.   public boolean reportChecksumFailure(Path f, FSDataInputStream in,
  505.                                        long inPos, FSDataInputStream sums, long sumsPos) {
  506.     return false;
  507.   }
  508. }