Jets3tFileSystemStore.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.s3;
  19. import java.io.BufferedInputStream;
  20. import java.io.BufferedOutputStream;
  21. import java.io.Closeable;
  22. import java.io.File;
  23. import java.io.FileInputStream;
  24. import java.io.FileOutputStream;
  25. import java.io.IOException;
  26. import java.io.InputStream;
  27. import java.io.OutputStream;
  28. import java.net.URI;
  29. import java.util.HashMap;
  30. import java.util.Map;
  31. import java.util.Set;
  32. import java.util.TreeSet;
  33. import org.apache.hadoop.conf.Configuration;
  34. import org.apache.hadoop.fs.Path;
  35. import org.apache.hadoop.fs.s3.INode.FileType;
  36. import org.jets3t.service.S3Service;
  37. import org.jets3t.service.S3ServiceException;
  38. import org.jets3t.service.impl.rest.httpclient.RestS3Service;
  39. import org.jets3t.service.model.S3Bucket;
  40. import org.jets3t.service.model.S3Object;
  41. import org.jets3t.service.security.AWSCredentials;
  42. class Jets3tFileSystemStore implements FileSystemStore {
  43.   
  44.   private static final String FILE_SYSTEM_NAME = "fs";
  45.   private static final String FILE_SYSTEM_VALUE = "Hadoop";
  46.   private static final String FILE_SYSTEM_TYPE_NAME = "fs-type";
  47.   private static final String FILE_SYSTEM_TYPE_VALUE = "block";
  48.   private static final String FILE_SYSTEM_VERSION_NAME = "fs-version";
  49.   private static final String FILE_SYSTEM_VERSION_VALUE = "1";
  50.   
  51.   private static final Map<String, String> METADATA =
  52.     new HashMap<String, String>();
  53.   
  54.   static {
  55.     METADATA.put(FILE_SYSTEM_NAME, FILE_SYSTEM_VALUE);
  56.     METADATA.put(FILE_SYSTEM_TYPE_NAME, FILE_SYSTEM_TYPE_VALUE);
  57.     METADATA.put(FILE_SYSTEM_VERSION_NAME, FILE_SYSTEM_VERSION_VALUE);
  58.   }
  59.   private static final String PATH_DELIMITER = Path.SEPARATOR;
  60.   private static final String BLOCK_PREFIX = "block_";
  61.   private Configuration conf;
  62.   
  63.   private S3Service s3Service;
  64.   private S3Bucket bucket;
  65.   
  66.   private int bufferSize;
  67.   
  68.   public void initialize(URI uri, Configuration conf) throws IOException {
  69.     
  70.     this.conf = conf;
  71.     
  72.     S3Credentials s3Credentials = new S3Credentials();
  73.     s3Credentials.initialize(uri, conf);
  74.     try {
  75.       AWSCredentials awsCredentials =
  76.         new AWSCredentials(s3Credentials.getAccessKey(),
  77.             s3Credentials.getSecretAccessKey());
  78.       this.s3Service = new RestS3Service(awsCredentials);
  79.     } catch (S3ServiceException e) {
  80.       if (e.getCause() instanceof IOException) {
  81.         throw (IOException) e.getCause();
  82.       }
  83.       throw new S3Exception(e);
  84.     }
  85.     bucket = new S3Bucket(uri.getHost());
  86.     this.bufferSize = conf.getInt("io.file.buffer.size", 4096);
  87.   }
  88.   public String getVersion() throws IOException {
  89.     return FILE_SYSTEM_VERSION_VALUE;
  90.   }
  91.   private void delete(String key) throws IOException {
  92.     try {
  93.       s3Service.deleteObject(bucket, key);
  94.     } catch (S3ServiceException e) {
  95.       if (e.getCause() instanceof IOException) {
  96.         throw (IOException) e.getCause();
  97.       }
  98.       throw new S3Exception(e);
  99.     }
  100.   }
  101.   public void deleteINode(Path path) throws IOException {
  102.     delete(pathToKey(path));
  103.   }
  104.   public void deleteBlock(Block block) throws IOException {
  105.     delete(blockToKey(block));
  106.   }
  107.   public boolean inodeExists(Path path) throws IOException {
  108.     InputStream in = get(pathToKey(path), true);
  109.     if (in == null) {
  110.       return false;
  111.     }
  112.     in.close();
  113.     return true;
  114.   }
  115.   
  116.   public boolean blockExists(long blockId) throws IOException {
  117.     InputStream in = get(blockToKey(blockId), false);
  118.     if (in == null) {
  119.       return false;
  120.     }
  121.     in.close();
  122.     return true;
  123.   }
  124.   private InputStream get(String key, boolean checkMetadata)
  125.       throws IOException {
  126.     
  127.     try {
  128.       S3Object object = s3Service.getObject(bucket, key);
  129.       if (checkMetadata) {
  130.         checkMetadata(object);
  131.       }
  132.       return object.getDataInputStream();
  133.     } catch (S3ServiceException e) {
  134.       if ("NoSuchKey".equals(e.getS3ErrorCode())) {
  135.         return null;
  136.       }
  137.       if (e.getCause() instanceof IOException) {
  138.         throw (IOException) e.getCause();
  139.       }
  140.       throw new S3Exception(e);
  141.     }
  142.   }
  143.   private InputStream get(String key, long byteRangeStart) throws IOException {
  144.     try {
  145.       S3Object object = s3Service.getObject(bucket, key, null, null, null,
  146.                                             null, byteRangeStart, null);
  147.       return object.getDataInputStream();
  148.     } catch (S3ServiceException e) {
  149.       if ("NoSuchKey".equals(e.getS3ErrorCode())) {
  150.         return null;
  151.       }
  152.       if (e.getCause() instanceof IOException) {
  153.         throw (IOException) e.getCause();
  154.       }
  155.       throw new S3Exception(e);
  156.     }
  157.   }
  158.   private void checkMetadata(S3Object object) throws S3FileSystemException,
  159.       S3ServiceException {
  160.     
  161.     String name = (String) object.getMetadata(FILE_SYSTEM_NAME);
  162.     if (!FILE_SYSTEM_VALUE.equals(name)) {
  163.       throw new S3FileSystemException("Not a Hadoop S3 file.");
  164.     }
  165.     String type = (String) object.getMetadata(FILE_SYSTEM_TYPE_NAME);
  166.     if (!FILE_SYSTEM_TYPE_VALUE.equals(type)) {
  167.       throw new S3FileSystemException("Not a block file.");
  168.     }
  169.     String dataVersion = (String) object.getMetadata(FILE_SYSTEM_VERSION_NAME);
  170.     if (!FILE_SYSTEM_VERSION_VALUE.equals(dataVersion)) {
  171.       throw new VersionMismatchException(FILE_SYSTEM_VERSION_VALUE,
  172.           dataVersion);
  173.     }
  174.   }
  175.   public INode retrieveINode(Path path) throws IOException {
  176.     return INode.deserialize(get(pathToKey(path), true));
  177.   }
  178.   public File retrieveBlock(Block block, long byteRangeStart)
  179.     throws IOException {
  180.     File fileBlock = null;
  181.     InputStream in = null;
  182.     OutputStream out = null;
  183.     try {
  184.       fileBlock = newBackupFile();
  185.       in = get(blockToKey(block), byteRangeStart);
  186.       out = new BufferedOutputStream(new FileOutputStream(fileBlock));
  187.       byte[] buf = new byte[bufferSize];
  188.       int numRead;
  189.       while ((numRead = in.read(buf)) >= 0) {
  190.         out.write(buf, 0, numRead);
  191.       }
  192.       return fileBlock;
  193.     } catch (IOException e) {
  194.       // close output stream to file then delete file
  195.       closeQuietly(out);
  196.       out = null; // to prevent a second close
  197.       if (fileBlock != null) {
  198.         fileBlock.delete();
  199.       }
  200.       throw e;
  201.     } finally {
  202.       closeQuietly(out);
  203.       closeQuietly(in);
  204.     }
  205.   }
  206.   
  207.   private File newBackupFile() throws IOException {
  208.     File dir = new File(conf.get("fs.s3.buffer.dir"));
  209.     if (!dir.exists() && !dir.mkdirs()) {
  210.       throw new IOException("Cannot create S3 buffer directory: " + dir);
  211.     }
  212.     File result = File.createTempFile("input-", ".tmp", dir);
  213.     result.deleteOnExit();
  214.     return result;
  215.   }
  216.   public Set<Path> listSubPaths(Path path) throws IOException {
  217.     try {
  218.       String prefix = pathToKey(path);
  219.       if (!prefix.endsWith(PATH_DELIMITER)) {
  220.         prefix += PATH_DELIMITER;
  221.       }
  222.       S3Object[] objects = s3Service.listObjects(bucket, prefix, PATH_DELIMITER);
  223.       Set<Path> prefixes = new TreeSet<Path>();
  224.       for (int i = 0; i < objects.length; i++) {
  225.         prefixes.add(keyToPath(objects[i].getKey()));
  226.       }
  227.       prefixes.remove(path);
  228.       return prefixes;
  229.     } catch (S3ServiceException e) {
  230.       if (e.getCause() instanceof IOException) {
  231.         throw (IOException) e.getCause();
  232.       }
  233.       throw new S3Exception(e);
  234.     }
  235.   }
  236.   
  237.   public Set<Path> listDeepSubPaths(Path path) throws IOException {
  238.     try {
  239.       String prefix = pathToKey(path);
  240.       if (!prefix.endsWith(PATH_DELIMITER)) {
  241.         prefix += PATH_DELIMITER;
  242.       }
  243.       S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
  244.       Set<Path> prefixes = new TreeSet<Path>();
  245.       for (int i = 0; i < objects.length; i++) {
  246.         prefixes.add(keyToPath(objects[i].getKey()));
  247.       }
  248.       prefixes.remove(path);
  249.       return prefixes;
  250.     } catch (S3ServiceException e) {
  251.       if (e.getCause() instanceof IOException) {
  252.         throw (IOException) e.getCause();
  253.       }
  254.       throw new S3Exception(e);
  255.     }    
  256.   }
  257.   private void put(String key, InputStream in, long length, boolean storeMetadata)
  258.       throws IOException {
  259.     
  260.     try {
  261.       S3Object object = new S3Object(key);
  262.       object.setDataInputStream(in);
  263.       object.setContentType("binary/octet-stream");
  264.       object.setContentLength(length);
  265.       if (storeMetadata) {
  266.         object.addAllMetadata(METADATA);
  267.       }
  268.       s3Service.putObject(bucket, object);
  269.     } catch (S3ServiceException e) {
  270.       if (e.getCause() instanceof IOException) {
  271.         throw (IOException) e.getCause();
  272.       }
  273.       throw new S3Exception(e);
  274.     }
  275.   }
  276.   public void storeINode(Path path, INode inode) throws IOException {
  277.     put(pathToKey(path), inode.serialize(), inode.getSerializedLength(), true);
  278.   }
  279.   public void storeBlock(Block block, File file) throws IOException {
  280.     BufferedInputStream in = null;
  281.     try {
  282.       in = new BufferedInputStream(new FileInputStream(file));
  283.       put(blockToKey(block), in, block.getLength(), false);
  284.     } finally {
  285.       closeQuietly(in);
  286.     }    
  287.   }
  288.   private void closeQuietly(Closeable closeable) {
  289.     if (closeable != null) {
  290.       try {
  291.         closeable.close();
  292.       } catch (IOException e) {
  293.         // ignore
  294.       }
  295.     }
  296.   }
  297.   private String pathToKey(Path path) {
  298.     if (!path.isAbsolute()) {
  299.       throw new IllegalArgumentException("Path must be absolute: " + path);
  300.     }
  301.     return path.toUri().getPath();
  302.   }
  303.   private Path keyToPath(String key) {
  304.     return new Path(key);
  305.   }
  306.   
  307.   private String blockToKey(long blockId) {
  308.     return BLOCK_PREFIX + blockId;
  309.   }
  310.   private String blockToKey(Block block) {
  311.     return blockToKey(block.getId());
  312.   }
  313.   public void purge() throws IOException {
  314.     try {
  315.       S3Object[] objects = s3Service.listObjects(bucket);
  316.       for (int i = 0; i < objects.length; i++) {
  317.         s3Service.deleteObject(bucket, objects[i].getKey());
  318.       }
  319.     } catch (S3ServiceException e) {
  320.       if (e.getCause() instanceof IOException) {
  321.         throw (IOException) e.getCause();
  322.       }
  323.       throw new S3Exception(e);
  324.     }
  325.   }
  326.   public void dump() throws IOException {
  327.     StringBuilder sb = new StringBuilder("S3 Filesystem, ");
  328.     sb.append(bucket.getName()).append("n");
  329.     try {
  330.       S3Object[] objects = s3Service.listObjects(bucket, PATH_DELIMITER, null);
  331.       for (int i = 0; i < objects.length; i++) {
  332.         Path path = keyToPath(objects[i].getKey());
  333.         sb.append(path).append("n");
  334.         INode m = retrieveINode(path);
  335.         sb.append("t").append(m.getFileType()).append("n");
  336.         if (m.getFileType() == FileType.DIRECTORY) {
  337.           continue;
  338.         }
  339.         for (int j = 0; j < m.getBlocks().length; j++) {
  340.           sb.append("t").append(m.getBlocks()[j]).append("n");
  341.         }
  342.       }
  343.     } catch (S3ServiceException e) {
  344.       if (e.getCause() instanceof IOException) {
  345.         throw (IOException) e.getCause();
  346.       }
  347.       throw new S3Exception(e);
  348.     }
  349.     System.out.println(sb);
  350.   }
  351. }