S3FileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.s3;
  19. import java.io.FileNotFoundException;
  20. import java.io.IOException;
  21. import java.net.URI;
  22. import java.util.ArrayList;
  23. import java.util.HashMap;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.concurrent.TimeUnit;
  27. import org.apache.hadoop.conf.Configuration;
  28. import org.apache.hadoop.fs.FSDataInputStream;
  29. import org.apache.hadoop.fs.FSDataOutputStream;
  30. import org.apache.hadoop.fs.FileStatus;
  31. import org.apache.hadoop.fs.FileSystem;
  32. import org.apache.hadoop.fs.Path;
  33. import org.apache.hadoop.fs.permission.FsPermission;
  34. import org.apache.hadoop.fs.s3native.NativeS3FileSystem;
  35. import org.apache.hadoop.io.retry.RetryPolicies;
  36. import org.apache.hadoop.io.retry.RetryPolicy;
  37. import org.apache.hadoop.io.retry.RetryProxy;
  38. import org.apache.hadoop.util.Progressable;
  39. /**
  40.  * <p>
  41.  * A block-based {@link FileSystem} backed by
  42.  * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
  43.  * </p>
  44.  * @see NativeS3FileSystem
  45.  */
  46. public class S3FileSystem extends FileSystem {
  47.   private URI uri;
  48.   private FileSystemStore store;
  49.   private Path workingDir;
  50.   public S3FileSystem() {
  51.     // set store in initialize()
  52.   }
  53.   
  54.   public S3FileSystem(FileSystemStore store) {
  55.     this.store = store;
  56.   }
  57.   @Override
  58.   public URI getUri() {
  59.     return uri;
  60.   }
  61.   @Override
  62.   public void initialize(URI uri, Configuration conf) throws IOException {
  63.     super.initialize(uri, conf);
  64.     if (store == null) {
  65.       store = createDefaultStore(conf);
  66.     }
  67.     store.initialize(uri, conf);
  68.     setConf(conf);
  69.     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());    
  70.     this.workingDir =
  71.       new Path("/user", System.getProperty("user.name")).makeQualified(this);
  72.   }  
  73.   private static FileSystemStore createDefaultStore(Configuration conf) {
  74.     FileSystemStore store = new Jets3tFileSystemStore();
  75.     
  76.     RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
  77.                                                                                conf.getInt("fs.s3.maxRetries", 4),
  78.                                                                                conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
  79.     Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
  80.       new HashMap<Class<? extends Exception>, RetryPolicy>();
  81.     exceptionToPolicyMap.put(IOException.class, basePolicy);
  82.     exceptionToPolicyMap.put(S3Exception.class, basePolicy);
  83.     
  84.     RetryPolicy methodPolicy = RetryPolicies.retryByException(
  85.                                                               RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
  86.     Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
  87.     methodNameToPolicyMap.put("storeBlock", methodPolicy);
  88.     methodNameToPolicyMap.put("retrieveBlock", methodPolicy);
  89.     
  90.     return (FileSystemStore) RetryProxy.create(FileSystemStore.class,
  91.                                                store, methodNameToPolicyMap);
  92.   }
  93.   
  94.   @Override
  95.   public String getName() {
  96.     return getUri().toString();
  97.   }
  98.   @Override
  99.   public Path getWorkingDirectory() {
  100.     return workingDir;
  101.   }
  102.   @Override
  103.   public void setWorkingDirectory(Path dir) {
  104.     workingDir = makeAbsolute(dir);
  105.   }
  106.   private Path makeAbsolute(Path path) {
  107.     if (path.isAbsolute()) {
  108.       return path;
  109.     }
  110.     return new Path(workingDir, path);
  111.   }
  112.   /**
  113.    * @param permission Currently ignored.
  114.    */
  115.   @Override
  116.   public boolean mkdirs(Path path, FsPermission permission) throws IOException {
  117.     Path absolutePath = makeAbsolute(path);
  118.     List<Path> paths = new ArrayList<Path>();
  119.     do {
  120.       paths.add(0, absolutePath);
  121.       absolutePath = absolutePath.getParent();
  122.     } while (absolutePath != null);
  123.     
  124.     boolean result = true;
  125.     for (Path p : paths) {
  126.       result &= mkdir(p);
  127.     }
  128.     return result;
  129.   }
  130.   
  131.   private boolean mkdir(Path path) throws IOException {
  132.     Path absolutePath = makeAbsolute(path);
  133.     INode inode = store.retrieveINode(absolutePath);
  134.     if (inode == null) {
  135.       store.storeINode(absolutePath, INode.DIRECTORY_INODE);
  136.     } else if (inode.isFile()) {
  137.       throw new IOException(String.format(
  138.           "Can't make directory for path %s since it is a file.",
  139.           absolutePath));
  140.     }
  141.     return true;
  142.   }
  143.   @Override
  144.   public boolean isFile(Path path) throws IOException {
  145.     INode inode = store.retrieveINode(makeAbsolute(path));
  146.     if (inode == null) {
  147.       return false;
  148.     }
  149.     return inode.isFile();
  150.   }
  151.   private INode checkFile(Path path) throws IOException {
  152.     INode inode = store.retrieveINode(makeAbsolute(path));
  153.     if (inode == null) {
  154.       throw new IOException("No such file.");
  155.     }
  156.     if (inode.isDirectory()) {
  157.       throw new IOException("Path " + path + " is a directory.");
  158.     }
  159.     return inode;
  160.   }
  161.   @Override
  162.   public FileStatus[] listStatus(Path f) throws IOException {
  163.     Path absolutePath = makeAbsolute(f);
  164.     INode inode = store.retrieveINode(absolutePath);
  165.     if (inode == null) {
  166.       return null;
  167.     }
  168.     if (inode.isFile()) {
  169.       return new FileStatus[] {
  170.         new S3FileStatus(f.makeQualified(this), inode)
  171.       };
  172.     }
  173.     ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
  174.     for (Path p : store.listSubPaths(absolutePath)) {
  175.       ret.add(getFileStatus(p.makeQualified(this)));
  176.     }
  177.     return ret.toArray(new FileStatus[0]);
  178.   }
  179.   /** This optional operation is not yet supported. */
  180.   public FSDataOutputStream append(Path f, int bufferSize,
  181.       Progressable progress) throws IOException {
  182.     throw new IOException("Not supported");
  183.   }
  184.   /**
  185.    * @param permission Currently ignored.
  186.    */
  187.   @Override
  188.   public FSDataOutputStream create(Path file, FsPermission permission,
  189.       boolean overwrite, int bufferSize,
  190.       short replication, long blockSize, Progressable progress)
  191.     throws IOException {
  192.     INode inode = store.retrieveINode(makeAbsolute(file));
  193.     if (inode != null) {
  194.       if (overwrite) {
  195.         delete(file);
  196.       } else {
  197.         throw new IOException("File already exists: " + file);
  198.       }
  199.     } else {
  200.       Path parent = file.getParent();
  201.       if (parent != null) {
  202.         if (!mkdirs(parent)) {
  203.           throw new IOException("Mkdirs failed to create " + parent.toString());
  204.         }
  205.       }      
  206.     }
  207.     return new FSDataOutputStream
  208.         (new S3OutputStream(getConf(), store, makeAbsolute(file),
  209.                             blockSize, progress, bufferSize),
  210.          statistics);
  211.   }
  212.   @Override
  213.   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
  214.     INode inode = checkFile(path);
  215.     return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
  216.                                                    statistics));
  217.   }
  218.   @Override
  219.   public boolean rename(Path src, Path dst) throws IOException {
  220.     Path absoluteSrc = makeAbsolute(src);
  221.     INode srcINode = store.retrieveINode(absoluteSrc);
  222.     if (srcINode == null) {
  223.       // src path doesn't exist
  224.       return false; 
  225.     }
  226.     Path absoluteDst = makeAbsolute(dst);
  227.     INode dstINode = store.retrieveINode(absoluteDst);
  228.     if (dstINode != null && dstINode.isDirectory()) {
  229.       absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
  230.       dstINode = store.retrieveINode(absoluteDst);
  231.     }
  232.     if (dstINode != null) {
  233.       // dst path already exists - can't overwrite
  234.       return false;
  235.     }
  236.     Path dstParent = absoluteDst.getParent();
  237.     if (dstParent != null) {
  238.       INode dstParentINode = store.retrieveINode(dstParent);
  239.       if (dstParentINode == null || dstParentINode.isFile()) {
  240.         // dst parent doesn't exist or is a file
  241.         return false;
  242.       }
  243.     }
  244.     return renameRecursive(absoluteSrc, absoluteDst);
  245.   }
  246.   
  247.   private boolean renameRecursive(Path src, Path dst) throws IOException {
  248.     INode srcINode = store.retrieveINode(src);
  249.     store.storeINode(dst, srcINode);
  250.     store.deleteINode(src);
  251.     if (srcINode.isDirectory()) {
  252.       for (Path oldSrc : store.listDeepSubPaths(src)) {
  253.         INode inode = store.retrieveINode(oldSrc);
  254.         if (inode == null) {
  255.           return false;
  256.         }
  257.         String oldSrcPath = oldSrc.toUri().getPath();
  258.         String srcPath = src.toUri().getPath();
  259.         String dstPath = dst.toUri().getPath();
  260.         Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
  261.         store.storeINode(newDst, inode);
  262.         store.deleteINode(oldSrc);
  263.       }
  264.     }
  265.     return true;
  266.   }
  267.   public boolean delete(Path path, boolean recursive) throws IOException {
  268.    Path absolutePath = makeAbsolute(path);
  269.    INode inode = store.retrieveINode(absolutePath);
  270.    if (inode == null) {
  271.      return false;
  272.    }
  273.    if (inode.isFile()) {
  274.      store.deleteINode(absolutePath);
  275.      for (Block block: inode.getBlocks()) {
  276.        store.deleteBlock(block);
  277.      }
  278.    } else {
  279.      FileStatus[] contents = listStatus(absolutePath);
  280.      if (contents == null) {
  281.        return false;
  282.      }
  283.      if ((contents.length !=0) && (!recursive)) {
  284.        throw new IOException("Directory " + path.toString() 
  285.            + " is not empty.");
  286.      }
  287.      for (FileStatus p:contents) {
  288.        if (!delete(p.getPath(), recursive)) {
  289.          return false;
  290.        }
  291.      }
  292.      store.deleteINode(absolutePath);
  293.    }
  294.    return true;
  295.   }
  296.   
  297.   @Override
  298.   @Deprecated
  299.   public boolean delete(Path path) throws IOException {
  300.     return delete(path, true);
  301.   }
  302.   /**
  303.    * FileStatus for S3 file systems. 
  304.    */
  305.   @Override
  306.   public FileStatus getFileStatus(Path f)  throws IOException {
  307.     INode inode = store.retrieveINode(makeAbsolute(f));
  308.     if (inode == null) {
  309.       throw new FileNotFoundException(f + ": No such file or directory.");
  310.     }
  311.     return new S3FileStatus(f.makeQualified(this), inode);
  312.   }
  313.   // diagnostic methods
  314.   void dump() throws IOException {
  315.     store.dump();
  316.   }
  317.   void purge() throws IOException {
  318.     store.purge();
  319.   }
  320.   private static class S3FileStatus extends FileStatus {
  321.     S3FileStatus(Path f, INode inode) throws IOException {
  322.       super(findLength(inode), inode.isDirectory(), 1,
  323.             findBlocksize(inode), 0, f);
  324.     }
  325.     private static long findLength(INode inode) {
  326.       if (!inode.isDirectory()) {
  327.         long length = 0L;
  328.         for (Block block : inode.getBlocks()) {
  329.           length += block.getLength();
  330.         }
  331.         return length;
  332.       }
  333.       return 0;
  334.     }
  335.     private static long findBlocksize(INode inode) {
  336.       final Block[] ret = inode.getBlocks();
  337.       return ret == null ? 0L : ret[0].getLength();
  338.     }
  339.   }
  340. }