MigrationTool.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.s3;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.UnsupportedEncodingException;
  22. import java.net.URI;
  23. import java.net.URLDecoder;
  24. import java.net.URLEncoder;
  25. import java.util.Set;
  26. import java.util.TreeSet;
  27. import org.apache.hadoop.conf.Configured;
  28. import org.apache.hadoop.fs.Path;
  29. import org.apache.hadoop.util.Tool;
  30. import org.apache.hadoop.util.ToolRunner;
  31. import org.jets3t.service.S3Service;
  32. import org.jets3t.service.S3ServiceException;
  33. import org.jets3t.service.impl.rest.httpclient.RestS3Service;
  34. import org.jets3t.service.model.S3Bucket;
  35. import org.jets3t.service.model.S3Object;
  36. import org.jets3t.service.security.AWSCredentials;
  37. /**
  38.  * <p>
  39.  * This class is a tool for migrating data from an older to a newer version
  40.  * of an S3 filesystem.
  41.  * </p>
  42.  * <p>
  43.  * All files in the filesystem are migrated by re-writing the block metadata
  44.  * - no datafiles are touched.
  45.  * </p>
  46.  */
  47. public class MigrationTool extends Configured implements Tool {
  48.   
  49.   private S3Service s3Service;
  50.   private S3Bucket bucket;
  51.   
  52.   public static void main(String[] args) throws Exception {
  53.     int res = ToolRunner.run(new MigrationTool(), args);
  54.     System.exit(res);
  55.   }
  56.   
  57.   public int run(String[] args) throws Exception {
  58.     
  59.     if (args.length == 0) {
  60.       System.err.println("Usage: MigrationTool <S3 file system URI>");
  61.       System.err.println("t<S3 file system URI>tfilesystem to migrate");
  62.       ToolRunner.printGenericCommandUsage(System.err);
  63.       return -1;
  64.     }
  65.     
  66.     URI uri = URI.create(args[0]);
  67.     
  68.     initialize(uri);
  69.     
  70.     FileSystemStore newStore = new Jets3tFileSystemStore();
  71.     newStore.initialize(uri, getConf());
  72.     
  73.     if (get("%2F") != null) { 
  74.       System.err.println("Current version number is [unversioned].");
  75.       System.err.println("Target version number is " +
  76.           newStore.getVersion() + ".");
  77.       Store oldStore = new UnversionedStore();
  78.       migrate(oldStore, newStore);
  79.       return 0;
  80.     } else {
  81.       S3Object root = get("/");
  82.       if (root != null) {
  83.         String version = (String) root.getMetadata("fs-version");
  84.         if (version == null) {
  85.           System.err.println("Can't detect version - exiting.");
  86.         } else {
  87.           String newVersion = newStore.getVersion();
  88.           System.err.println("Current version number is " + version + ".");
  89.           System.err.println("Target version number is " + newVersion + ".");
  90.           if (version.equals(newStore.getVersion())) {
  91.             System.err.println("No migration required.");
  92.             return 0;
  93.           }
  94.           // use version number to create Store
  95.           //Store oldStore = ... 
  96.           //migrate(oldStore, newStore);
  97.           System.err.println("Not currently implemented.");
  98.           return 0;
  99.         }
  100.       }
  101.       System.err.println("Can't detect version - exiting.");
  102.       return 0;
  103.     }
  104.     
  105.   }
  106.   
  107.   public void initialize(URI uri) throws IOException {
  108.     
  109.     
  110.     
  111.     try {
  112.       String accessKey = null;
  113.       String secretAccessKey = null;
  114.       String userInfo = uri.getUserInfo();
  115.       if (userInfo != null) {
  116.         int index = userInfo.indexOf(':');
  117.         if (index != -1) {
  118.           accessKey = userInfo.substring(0, index);
  119.           secretAccessKey = userInfo.substring(index + 1);
  120.         } else {
  121.           accessKey = userInfo;
  122.         }
  123.       }
  124.       if (accessKey == null) {
  125.         accessKey = getConf().get("fs.s3.awsAccessKeyId");
  126.       }
  127.       if (secretAccessKey == null) {
  128.         secretAccessKey = getConf().get("fs.s3.awsSecretAccessKey");
  129.       }
  130.       if (accessKey == null && secretAccessKey == null) {
  131.         throw new IllegalArgumentException("AWS " +
  132.                                            "Access Key ID and Secret Access Key " +
  133.                                            "must be specified as the username " +
  134.                                            "or password (respectively) of a s3 URL, " +
  135.                                            "or by setting the " +
  136.                                            "fs.s3.awsAccessKeyId or " +                         
  137.                                            "fs.s3.awsSecretAccessKey properties (respectively).");
  138.       } else if (accessKey == null) {
  139.         throw new IllegalArgumentException("AWS " +
  140.                                            "Access Key ID must be specified " +
  141.                                            "as the username of a s3 URL, or by setting the " +
  142.                                            "fs.s3.awsAccessKeyId property.");
  143.       } else if (secretAccessKey == null) {
  144.         throw new IllegalArgumentException("AWS " +
  145.                                            "Secret Access Key must be specified " +
  146.                                            "as the password of a s3 URL, or by setting the " +
  147.                                            "fs.s3.awsSecretAccessKey property.");         
  148.       }
  149.       AWSCredentials awsCredentials =
  150.         new AWSCredentials(accessKey, secretAccessKey);
  151.       this.s3Service = new RestS3Service(awsCredentials);
  152.     } catch (S3ServiceException e) {
  153.       if (e.getCause() instanceof IOException) {
  154.         throw (IOException) e.getCause();
  155.       }
  156.       throw new S3Exception(e);
  157.     }
  158.     bucket = new S3Bucket(uri.getHost());
  159.   }
  160.   
  161.   private void migrate(Store oldStore, FileSystemStore newStore)
  162.       throws IOException {
  163.     for (Path path : oldStore.listAllPaths()) {
  164.       INode inode = oldStore.retrieveINode(path);
  165.       oldStore.deleteINode(path);
  166.       newStore.storeINode(path, inode);
  167.     }
  168.   }
  169.   
  170.   private S3Object get(String key) {
  171.     try {
  172.       return s3Service.getObject(bucket, key);
  173.     } catch (S3ServiceException e) {
  174.       if ("NoSuchKey".equals(e.getS3ErrorCode())) {
  175.         return null;
  176.       }
  177.     }
  178.     return null;
  179.   }
  180.   
  181.   interface Store {
  182.     Set<Path> listAllPaths() throws IOException;
  183.     INode retrieveINode(Path path) throws IOException;
  184.     void deleteINode(Path path) throws IOException;
  185.     
  186.   }
  187.   
  188.   class UnversionedStore implements Store {
  189.     public Set<Path> listAllPaths() throws IOException {
  190.       try {
  191.         String prefix = urlEncode(Path.SEPARATOR);
  192.         S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
  193.         Set<Path> prefixes = new TreeSet<Path>();
  194.         for (int i = 0; i < objects.length; i++) {
  195.           prefixes.add(keyToPath(objects[i].getKey()));
  196.         }
  197.         return prefixes;
  198.       } catch (S3ServiceException e) {
  199.         if (e.getCause() instanceof IOException) {
  200.           throw (IOException) e.getCause();
  201.         }
  202.         throw new S3Exception(e);
  203.       }   
  204.     }
  205.     public void deleteINode(Path path) throws IOException {
  206.       delete(pathToKey(path));
  207.     }
  208.     
  209.     private void delete(String key) throws IOException {
  210.       try {
  211.         s3Service.deleteObject(bucket, key);
  212.       } catch (S3ServiceException e) {
  213.         if (e.getCause() instanceof IOException) {
  214.           throw (IOException) e.getCause();
  215.         }
  216.         throw new S3Exception(e);
  217.       }
  218.     }
  219.     
  220.     public INode retrieveINode(Path path) throws IOException {
  221.       return INode.deserialize(get(pathToKey(path)));
  222.     }
  223.     private InputStream get(String key) throws IOException {
  224.       try {
  225.         S3Object object = s3Service.getObject(bucket, key);
  226.         return object.getDataInputStream();
  227.       } catch (S3ServiceException e) {
  228.         if ("NoSuchKey".equals(e.getS3ErrorCode())) {
  229.           return null;
  230.         }
  231.         if (e.getCause() instanceof IOException) {
  232.           throw (IOException) e.getCause();
  233.         }
  234.         throw new S3Exception(e);
  235.       }
  236.     }
  237.     
  238.     private String pathToKey(Path path) {
  239.       if (!path.isAbsolute()) {
  240.         throw new IllegalArgumentException("Path must be absolute: " + path);
  241.       }
  242.       return urlEncode(path.toUri().getPath());
  243.     }
  244.     
  245.     private Path keyToPath(String key) {
  246.       return new Path(urlDecode(key));
  247.     }
  248.     private String urlEncode(String s) {
  249.       try {
  250.         return URLEncoder.encode(s, "UTF-8");
  251.       } catch (UnsupportedEncodingException e) {
  252.         // Should never happen since every implementation of the Java Platform
  253.         // is required to support UTF-8.
  254.         // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
  255.         throw new IllegalStateException(e);
  256.       }
  257.     }
  258.     
  259.     private String urlDecode(String s) {
  260.       try {
  261.         return URLDecoder.decode(s, "UTF-8");
  262.       } catch (UnsupportedEncodingException e) {
  263.         // Should never happen since every implementation of the Java Platform
  264.         // is required to support UTF-8.
  265.         // See http://java.sun.com/j2se/1.5.0/docs/api/java/nio/charset/Charset.html
  266.         throw new IllegalStateException(e);
  267.       }
  268.     }
  269.     
  270.   }
  271.   
  272. }