NativeS3FileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:17k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.s3native;
  19. import java.io.BufferedOutputStream;
  20. import java.io.File;
  21. import java.io.FileNotFoundException;
  22. import java.io.FileOutputStream;
  23. import java.io.IOException;
  24. import java.io.InputStream;
  25. import java.io.OutputStream;
  26. import java.net.URI;
  27. import java.security.DigestOutputStream;
  28. import java.security.MessageDigest;
  29. import java.security.NoSuchAlgorithmException;
  30. import java.util.ArrayList;
  31. import java.util.HashMap;
  32. import java.util.List;
  33. import java.util.Map;
  34. import java.util.Set;
  35. import java.util.TreeSet;
  36. import java.util.concurrent.TimeUnit;
  37. import org.apache.commons.logging.Log;
  38. import org.apache.commons.logging.LogFactory;
  39. import org.apache.hadoop.conf.Configuration;
  40. import org.apache.hadoop.fs.BufferedFSInputStream;
  41. import org.apache.hadoop.fs.FSDataInputStream;
  42. import org.apache.hadoop.fs.FSDataOutputStream;
  43. import org.apache.hadoop.fs.FSInputStream;
  44. import org.apache.hadoop.fs.FileStatus;
  45. import org.apache.hadoop.fs.FileSystem;
  46. import org.apache.hadoop.fs.Path;
  47. import org.apache.hadoop.fs.permission.FsPermission;
  48. import org.apache.hadoop.fs.s3.S3Exception;
  49. import org.apache.hadoop.io.retry.RetryPolicies;
  50. import org.apache.hadoop.io.retry.RetryPolicy;
  51. import org.apache.hadoop.io.retry.RetryProxy;
  52. import org.apache.hadoop.util.Progressable;
  53. /**
  54.  * <p>
  55.  * A {@link FileSystem} for reading and writing files stored on
  56.  * <a href="http://aws.amazon.com/s3">Amazon S3</a>.
  57.  * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
  58.  * stores files on S3 in their
  59.  * native form so they can be read by other S3 tools.
  60.  * </p>
  61.  * @see org.apache.hadoop.fs.s3.S3FileSystem
  62.  */
  63. public class NativeS3FileSystem extends FileSystem {
  64.   
  65.   public static final Log LOG = 
  66.     LogFactory.getLog(NativeS3FileSystem.class);
  67.   
  68.   private static final String FOLDER_SUFFIX = "_$folder$";
  69.   private static final long MAX_S3_FILE_SIZE = 5 * 1024 * 1024 * 1024L;
  70.   static final String PATH_DELIMITER = Path.SEPARATOR;
  71.   private static final int S3_MAX_LISTING_LENGTH = 1000;
  72.   
  73.   private class NativeS3FsInputStream extends FSInputStream {
  74.     
  75.     private InputStream in;
  76.     private final String key;
  77.     private long pos = 0;
  78.     
  79.     public NativeS3FsInputStream(InputStream in, String key) {
  80.       this.in = in;
  81.       this.key = key;
  82.     }
  83.     
  84.     public synchronized int read() throws IOException {
  85.       int result = in.read();
  86.       if (result != -1) {
  87.         pos++;
  88.       }
  89.       return result;
  90.     }
  91.     public synchronized int read(byte[] b, int off, int len)
  92.       throws IOException {
  93.       
  94.       int result = in.read(b, off, len);
  95.       if (result > 0) {
  96.         pos += result;
  97.       }
  98.       return result;
  99.     }
  100.     public void close() throws IOException {
  101.       in.close();
  102.     }
  103.     public synchronized void seek(long pos) throws IOException {
  104.       in.close();
  105.       in = store.retrieve(key, pos);
  106.       this.pos = pos;
  107.     }
  108.     public synchronized long getPos() throws IOException {
  109.       return pos;
  110.     }
  111.     public boolean seekToNewSource(long targetPos) throws IOException {
  112.       return false;
  113.     }
  114.   }
  115.   
  116.   private class NativeS3FsOutputStream extends OutputStream {
  117.     
  118.     private Configuration conf;
  119.     private String key;
  120.     private File backupFile;
  121.     private OutputStream backupStream;
  122.     private MessageDigest digest;
  123.     private boolean closed;
  124.     
  125.     public NativeS3FsOutputStream(Configuration conf,
  126.         NativeFileSystemStore store, String key, Progressable progress,
  127.         int bufferSize) throws IOException {
  128.       this.conf = conf;
  129.       this.key = key;
  130.       this.backupFile = newBackupFile();
  131.       try {
  132.         this.digest = MessageDigest.getInstance("MD5");
  133.         this.backupStream = new BufferedOutputStream(new DigestOutputStream(
  134.             new FileOutputStream(backupFile), this.digest));
  135.       } catch (NoSuchAlgorithmException e) {
  136.         LOG.warn("Cannot load MD5 digest algorithm," +
  137.             "skipping message integrity check.", e);
  138.         this.backupStream = new BufferedOutputStream(
  139.             new FileOutputStream(backupFile));
  140.       }
  141.     }
  142.     private File newBackupFile() throws IOException {
  143.       File dir = new File(conf.get("fs.s3.buffer.dir"));
  144.       if (!dir.mkdirs() && !dir.exists()) {
  145.         throw new IOException("Cannot create S3 buffer directory: " + dir);
  146.       }
  147.       File result = File.createTempFile("output-", ".tmp", dir);
  148.       result.deleteOnExit();
  149.       return result;
  150.     }
  151.     
  152.     @Override
  153.     public void flush() throws IOException {
  154.       backupStream.flush();
  155.     }
  156.     
  157.     @Override
  158.     public synchronized void close() throws IOException {
  159.       if (closed) {
  160.         return;
  161.       }
  162.       backupStream.close();
  163.       
  164.       try {
  165.         byte[] md5Hash = digest == null ? null : digest.digest();
  166.         store.storeFile(key, backupFile, md5Hash);
  167.       } finally {
  168.         if (!backupFile.delete()) {
  169.           LOG.warn("Could not delete temporary s3n file: " + backupFile);
  170.         }
  171.         super.close();
  172.         closed = true;
  173.       } 
  174.     }
  175.     @Override
  176.     public void write(int b) throws IOException {
  177.       backupStream.write(b);
  178.     }
  179.     @Override
  180.     public void write(byte[] b, int off, int len) throws IOException {
  181.       backupStream.write(b, off, len);
  182.     }
  183.     
  184.     
  185.   }
  186.   
  187.   private URI uri;
  188.   private NativeFileSystemStore store;
  189.   private Path workingDir;
  190.   
  191.   public NativeS3FileSystem() {
  192.     // set store in initialize()
  193.   }
  194.   
  195.   public NativeS3FileSystem(NativeFileSystemStore store) {
  196.     this.store = store;
  197.   }
  198.   
  199.   @Override
  200.   public void initialize(URI uri, Configuration conf) throws IOException {
  201.     super.initialize(uri, conf);
  202.     if (store == null) {
  203.       store = createDefaultStore(conf);
  204.     }
  205.     store.initialize(uri, conf);
  206.     setConf(conf);
  207.     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
  208.     this.workingDir =
  209.       new Path("/user", System.getProperty("user.name")).makeQualified(this);
  210.   }
  211.   
  212.   private static NativeFileSystemStore createDefaultStore(Configuration conf) {
  213.     NativeFileSystemStore store = new Jets3tNativeFileSystemStore();
  214.     
  215.     RetryPolicy basePolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
  216.         conf.getInt("fs.s3.maxRetries", 4),
  217.         conf.getLong("fs.s3.sleepTimeSeconds", 10), TimeUnit.SECONDS);
  218.     Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
  219.       new HashMap<Class<? extends Exception>, RetryPolicy>();
  220.     exceptionToPolicyMap.put(IOException.class, basePolicy);
  221.     exceptionToPolicyMap.put(S3Exception.class, basePolicy);
  222.     
  223.     RetryPolicy methodPolicy = RetryPolicies.retryByException(
  224.         RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
  225.     Map<String, RetryPolicy> methodNameToPolicyMap =
  226.       new HashMap<String, RetryPolicy>();
  227.     methodNameToPolicyMap.put("storeFile", methodPolicy);
  228.     
  229.     return (NativeFileSystemStore)
  230.       RetryProxy.create(NativeFileSystemStore.class, store,
  231.           methodNameToPolicyMap);
  232.   }
  233.   
  234.   private static String pathToKey(Path path) {
  235.     if (!path.isAbsolute()) {
  236.       throw new IllegalArgumentException("Path must be absolute: " + path);
  237.     }
  238.     return path.toUri().getPath().substring(1); // remove initial slash
  239.   }
  240.   
  241.   private static Path keyToPath(String key) {
  242.     return new Path("/" + key);
  243.   }
  244.   
  245.   private Path makeAbsolute(Path path) {
  246.     if (path.isAbsolute()) {
  247.       return path;
  248.     }
  249.     return new Path(workingDir, path);
  250.   }
  251.   /** This optional operation is not yet supported. */
  252.   public FSDataOutputStream append(Path f, int bufferSize,
  253.       Progressable progress) throws IOException {
  254.     throw new IOException("Not supported");
  255.   }
  256.   
  257.   @Override
  258.   public FSDataOutputStream create(Path f, FsPermission permission,
  259.       boolean overwrite, int bufferSize, short replication, long blockSize,
  260.       Progressable progress) throws IOException {
  261.     if (exists(f) && !overwrite) {
  262.       throw new IOException("File already exists:"+f);
  263.     }
  264.     Path absolutePath = makeAbsolute(f);
  265.     String key = pathToKey(absolutePath);
  266.     return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
  267.         key, progress, bufferSize), statistics);
  268.   }
  269.   
  270.   @Override
  271.   @Deprecated
  272.   public boolean delete(Path path) throws IOException {
  273.     return delete(path, true);
  274.   }
  275.   @Override
  276.   public boolean delete(Path f, boolean recursive) throws IOException {
  277.     FileStatus status;
  278.     try {
  279.       status = getFileStatus(f);
  280.     } catch (FileNotFoundException e) {
  281.       return false;
  282.     }
  283.     Path absolutePath = makeAbsolute(f);
  284.     String key = pathToKey(absolutePath);
  285.     if (status.isDir()) {
  286.       FileStatus[] contents = listStatus(f);
  287.       if (!recursive && contents.length > 0) {
  288.         throw new IOException("Directory " + f.toString() + " is not empty.");
  289.       }
  290.       for (FileStatus p : contents) {
  291.         if (!delete(p.getPath(), recursive)) {
  292.           return false;
  293.         }
  294.       }
  295.       store.delete(key + FOLDER_SUFFIX);
  296.     } else {
  297.       store.delete(key);
  298.     }
  299.     return true;
  300.   }
  301.   @Override
  302.   public FileStatus getFileStatus(Path f) throws IOException {
  303.     
  304.     Path absolutePath = makeAbsolute(f);
  305.     String key = pathToKey(absolutePath);
  306.     
  307.     if (key.length() == 0) { // root always exists
  308.       return newDirectory(absolutePath);
  309.     }
  310.     
  311.     FileMetadata meta = store.retrieveMetadata(key);
  312.     if (meta != null) {
  313.       return newFile(meta, absolutePath);
  314.     }
  315.     if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
  316.       return newDirectory(absolutePath);
  317.     }
  318.     
  319.     PartialListing listing = store.list(key, 1);
  320.     if (listing.getFiles().length > 0 ||
  321.         listing.getCommonPrefixes().length > 0) {
  322.       return newDirectory(absolutePath);
  323.     }
  324.     
  325.     throw new FileNotFoundException(absolutePath +
  326.         ": No such file or directory.");
  327.     
  328.   }
  329.   @Override
  330.   public URI getUri() {
  331.     return uri;
  332.   }
  333.   /**
  334.    * <p>
  335.    * If <code>f</code> is a file, this method will make a single call to S3.
  336.    * If <code>f</code> is a directory, this method will make a maximum of
  337.    * (<i>n</i> / 1000) + 2 calls to S3, where <i>n</i> is the total number of
  338.    * files and directories contained directly in <code>f</code>.
  339.    * </p>
  340.    */
  341.   @Override
  342.   public FileStatus[] listStatus(Path f) throws IOException {
  343.     Path absolutePath = makeAbsolute(f);
  344.     String key = pathToKey(absolutePath);
  345.     
  346.     if (key.length() > 0) {
  347.       FileMetadata meta = store.retrieveMetadata(key);
  348.       if (meta != null) {
  349.         return new FileStatus[] { newFile(meta, absolutePath) };
  350.       }
  351.     }
  352.     
  353.     URI pathUri = absolutePath.toUri();
  354.     Set<FileStatus> status = new TreeSet<FileStatus>();
  355.     String priorLastKey = null;
  356.     do {
  357.       PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, 
  358.           priorLastKey);
  359.       for (FileMetadata fileMetadata : listing.getFiles()) {
  360.         Path subpath = keyToPath(fileMetadata.getKey());
  361.         String relativePath = pathUri.relativize(subpath.toUri()).getPath();
  362.         if (relativePath.endsWith(FOLDER_SUFFIX)) {
  363.           status.add(newDirectory(new Path(absolutePath,
  364.               relativePath.substring(0,
  365.                   relativePath.indexOf(FOLDER_SUFFIX)))));
  366.         } else {
  367.           status.add(newFile(fileMetadata, subpath));
  368.         }
  369.       }
  370.       for (String commonPrefix : listing.getCommonPrefixes()) {
  371.         Path subpath = keyToPath(commonPrefix);
  372.         String relativePath = pathUri.relativize(subpath.toUri()).getPath();
  373.         status.add(newDirectory(new Path(absolutePath, relativePath)));
  374.       }
  375.       priorLastKey = listing.getPriorLastKey();
  376.     } while (priorLastKey != null);
  377.     
  378.     if (status.isEmpty() &&
  379.         store.retrieveMetadata(key + FOLDER_SUFFIX) == null) {
  380.       return null;
  381.     }
  382.     
  383.     return status.toArray(new FileStatus[0]);
  384.   }
  385.   
  386.   private FileStatus newFile(FileMetadata meta, Path path) {
  387.     return new FileStatus(meta.getLength(), false, 1, MAX_S3_FILE_SIZE,
  388.         meta.getLastModified(), path.makeQualified(this));
  389.   }
  390.   
  391.   private FileStatus newDirectory(Path path) {
  392.     return new FileStatus(0, true, 1, MAX_S3_FILE_SIZE, 0,
  393.         path.makeQualified(this));
  394.   }
  395.   @Override
  396.   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
  397.     Path absolutePath = makeAbsolute(f);
  398.     List<Path> paths = new ArrayList<Path>();
  399.     do {
  400.       paths.add(0, absolutePath);
  401.       absolutePath = absolutePath.getParent();
  402.     } while (absolutePath != null);
  403.     
  404.     boolean result = true;
  405.     for (Path path : paths) {
  406.       result &= mkdir(path);
  407.     }
  408.     return result;
  409.   }
  410.   
  411.   private boolean mkdir(Path f) throws IOException {
  412.     try {
  413.       FileStatus fileStatus = getFileStatus(f);
  414.       if (!fileStatus.isDir()) {
  415.         throw new IOException(String.format(
  416.             "Can't make directory for path %s since it is a file.", f));
  417.       }
  418.     } catch (FileNotFoundException e) {
  419.       String key = pathToKey(f) + FOLDER_SUFFIX;
  420.       store.storeEmptyFile(key);    
  421.     }
  422.     return true;
  423.   }
  424.   @Override
  425.   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  426.     if (!exists(f)) {
  427.       throw new FileNotFoundException(f.toString());
  428.     }
  429.     Path absolutePath = makeAbsolute(f);
  430.     String key = pathToKey(absolutePath);
  431.     return new FSDataInputStream(new BufferedFSInputStream(
  432.         new NativeS3FsInputStream(store.retrieve(key), key), bufferSize));
  433.   }
  434.   
  435.   // rename() and delete() use this method to ensure that the parent directory
  436.   // of the source does not vanish.
  437.   private void createParent(Path path) throws IOException {
  438.       Path parent = path.getParent();
  439.       if (parent != null) {
  440.           String key = pathToKey(makeAbsolute(parent));
  441.           if (key.length() > 0) {
  442.               store.storeEmptyFile(key + FOLDER_SUFFIX);
  443.           }
  444.       }
  445.   }
  446.   
  447.   private boolean existsAndIsFile(Path f) throws IOException {
  448.     
  449.     Path absolutePath = makeAbsolute(f);
  450.     String key = pathToKey(absolutePath);
  451.     
  452.     if (key.length() == 0) {
  453.         return false;
  454.     }
  455.     
  456.     FileMetadata meta = store.retrieveMetadata(key);
  457.     if (meta != null) {
  458.         // S3 object with given key exists, so this is a file
  459.         return true;
  460.     }
  461.     
  462.     if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
  463.         // Signifies empty directory
  464.         return false;
  465.     }
  466.     
  467.     PartialListing listing = store.list(key, 1, null);
  468.     if (listing.getFiles().length > 0 ||
  469.         listing.getCommonPrefixes().length > 0) {
  470.         // Non-empty directory
  471.         return false;
  472.     }
  473.     
  474.     throw new FileNotFoundException(absolutePath +
  475.         ": No such file or directory");
  476. }
  477.   @Override
  478.   public boolean rename(Path src, Path dst) throws IOException {
  479.     String srcKey = pathToKey(makeAbsolute(src));
  480.     if (srcKey.length() == 0) {
  481.       // Cannot rename root of file system
  482.       return false;
  483.     }
  484.     // Figure out the final destination
  485.     String dstKey;
  486.     try {
  487.       boolean dstIsFile = existsAndIsFile(dst);
  488.       if (dstIsFile) {
  489.         // Attempting to overwrite a file using rename()
  490.         return false;
  491.       } else {
  492.         // Move to within the existent directory
  493.         dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
  494.       }
  495.     } catch (FileNotFoundException e) {
  496.       // dst doesn't exist, so we can proceed
  497.       dstKey = pathToKey(makeAbsolute(dst));
  498.       try {
  499.         if (!getFileStatus(dst.getParent()).isDir()) {
  500.           return false; // parent dst is a file
  501.         }
  502.       } catch (FileNotFoundException ex) {
  503.         return false; // parent dst does not exist
  504.       }
  505.     }
  506.     try {
  507.       boolean srcIsFile = existsAndIsFile(src);
  508.       if (srcIsFile) {
  509.         store.rename(srcKey, dstKey);
  510.       } else {
  511.         // Move the folder object
  512.         store.delete(srcKey + FOLDER_SUFFIX);
  513.         store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
  514.         // Move everything inside the folder
  515.         String priorLastKey = null;
  516.         do {
  517.           PartialListing listing = store.listAll(srcKey, S3_MAX_LISTING_LENGTH,
  518.               priorLastKey);
  519.           for (FileMetadata file : listing.getFiles()) {
  520.             store.rename(file.getKey(), dstKey
  521.                 + file.getKey().substring(srcKey.length()));
  522.           }
  523.           priorLastKey = listing.getPriorLastKey();
  524.         } while (priorLastKey != null);
  525.       }
  526.       createParent(src);
  527.       return true;
  528.     } catch (FileNotFoundException e) {
  529.       // Source file does not exist;
  530.       return false;
  531.     }
  532.   }
  533.   /**
  534.    * Set the working directory to the given directory.
  535.    */
  536.   @Override
  537.   public void setWorkingDirectory(Path newDir) {
  538.     workingDir = newDir;
  539.   }
  540.   
  541.   @Override
  542.   public Path getWorkingDirectory() {
  543.     return workingDir;
  544.   }
  545. }