FTPFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:20k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.ftp;
  19. import java.io.FileNotFoundException;
  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.net.URI;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.commons.net.ftp.FTP;
  26. import org.apache.commons.net.ftp.FTPClient;
  27. import org.apache.commons.net.ftp.FTPFile;
  28. import org.apache.commons.net.ftp.FTPReply;
  29. import org.apache.hadoop.conf.Configuration;
  30. import org.apache.hadoop.fs.FSDataInputStream;
  31. import org.apache.hadoop.fs.FSDataOutputStream;
  32. import org.apache.hadoop.fs.FileStatus;
  33. import org.apache.hadoop.fs.FileSystem;
  34. import org.apache.hadoop.fs.Path;
  35. import org.apache.hadoop.fs.permission.FsAction;
  36. import org.apache.hadoop.fs.permission.FsPermission;
  37. import org.apache.hadoop.util.Progressable;
  38. /**
  39.  * <p>
  40.  * A {@link FileSystem} backed by an FTP client provided by <a
  41.  * href="http://commons.apache.org/net/">Apache Commons Net</a>.
  42.  * </p>
  43.  */
  44. public class FTPFileSystem extends FileSystem {
  45.   public static final Log LOG = LogFactory
  46.       .getLog(FTPFileSystem.class);
  47.   public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
  48.   public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
  49.   private URI uri;
  50.   @Override
  51.   public void initialize(URI uri, Configuration conf) throws IOException { // get
  52.     super.initialize(uri, conf);
  53.     // get host information from uri (overrides info in conf)
  54.     String host = uri.getHost();
  55.     host = (host == null) ? conf.get("fs.ftp.host", null) : host;
  56.     if (host == null) {
  57.       throw new IOException("Invalid host specified");
  58.     }
  59.     conf.set("fs.ftp.host", host);
  60.     // get port information from uri, (overrides info in conf)
  61.     int port = uri.getPort();
  62.     port = (port == -1) ? FTP.DEFAULT_PORT : port;
  63.     conf.setInt("fs.ftp.host.port", port);
  64.     // get user/password information from URI (overrides info in conf)
  65.     String userAndPassword = uri.getUserInfo();
  66.     if (userAndPassword == null) {
  67.       userAndPassword = (conf.get("fs.ftp.user." + host, null) + ":" + conf
  68.           .get("fs.ftp.password." + host, null));
  69.       if (userAndPassword == null) {
  70.         throw new IOException("Invalid user/passsword specified");
  71.       }
  72.     }
  73.     String[] userPasswdInfo = userAndPassword.split(":");
  74.     conf.set("fs.ftp.user." + host, userPasswdInfo[0]);
  75.     if (userPasswdInfo.length > 1) {
  76.       conf.set("fs.ftp.password." + host, userPasswdInfo[1]);
  77.     } else {
  78.       conf.set("fs.ftp.password." + host, null);
  79.     }
  80.     setConf(conf);
  81.     this.uri = uri;
  82.   }
  83.   /**
  84.    * Connect to the FTP server using configuration parameters *
  85.    * 
  86.    * @return An FTPClient instance
  87.    * @throws IOException
  88.    */
  89.   private FTPClient connect() throws IOException {
  90.     FTPClient client = null;
  91.     Configuration conf = getConf();
  92.     String host = conf.get("fs.ftp.host");
  93.     int port = conf.getInt("fs.ftp.host.port", FTP.DEFAULT_PORT);
  94.     String user = conf.get("fs.ftp.user." + host);
  95.     String password = conf.get("fs.ftp.password." + host);
  96.     client = new FTPClient();
  97.     client.connect(host, port);
  98.     int reply = client.getReplyCode();
  99.     if (!FTPReply.isPositiveCompletion(reply)) {
  100.       throw new IOException("Server - " + host
  101.           + " refused connection on port - " + port);
  102.     } else if (client.login(user, password)) {
  103.       client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
  104.       client.setFileType(FTP.BINARY_FILE_TYPE);
  105.       client.setBufferSize(DEFAULT_BUFFER_SIZE);
  106.     } else {
  107.       throw new IOException("Login failed on server - " + host + ", port - "
  108.           + port);
  109.     }
  110.     return client;
  111.   }
  112.   /**
  113.    * Logout and disconnect the given FTPClient. *
  114.    * 
  115.    * @param client
  116.    * @throws IOException
  117.    */
  118.   private void disconnect(FTPClient client) throws IOException {
  119.     if (client != null) {
  120.       if (!client.isConnected()) {
  121.         throw new FTPException("Client not connected");
  122.       }
  123.       boolean logoutSuccess = client.logout();
  124.       client.disconnect();
  125.       if (!logoutSuccess) {
  126.         LOG.warn("Logout failed while disconnecting, error code - "
  127.             + client.getReplyCode());
  128.       }
  129.     }
  130.   }
  131.   /**
  132.    * Resolve against given working directory. *
  133.    * 
  134.    * @param workDir
  135.    * @param path
  136.    * @return
  137.    */
  138.   private Path makeAbsolute(Path workDir, Path path) {
  139.     if (path.isAbsolute()) {
  140.       return path;
  141.     }
  142.     return new Path(workDir, path);
  143.   }
  144.   @Override
  145.   public FSDataInputStream open(Path file, int bufferSize) throws IOException {
  146.     FTPClient client = connect();
  147.     Path workDir = new Path(client.printWorkingDirectory());
  148.     Path absolute = makeAbsolute(workDir, file);
  149.     FileStatus fileStat = getFileStatus(client, absolute);
  150.     if (fileStat.isDir()) {
  151.       disconnect(client);
  152.       throw new IOException("Path " + file + " is a directory.");
  153.     }
  154.     client.allocate(bufferSize);
  155.     Path parent = absolute.getParent();
  156.     // Change to parent directory on the
  157.     // server. Only then can we read the
  158.     // file
  159.     // on the server by opening up an InputStream. As a side effect the working
  160.     // directory on the server is changed to the parent directory of the file.
  161.     // The FTP client connection is closed when close() is called on the
  162.     // FSDataInputStream.
  163.     client.changeWorkingDirectory(parent.toUri().getPath());
  164.     InputStream is = client.retrieveFileStream(file.getName());
  165.     FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is,
  166.         client, statistics));
  167.     if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
  168.       // The ftpClient is an inconsistent state. Must close the stream
  169.       // which in turn will logout and disconnect from FTP server
  170.       fis.close();
  171.       throw new IOException("Unable to open file: " + file + ", Aborting");
  172.     }
  173.     return fis;
  174.   }
  175.   /**
  176.    * A stream obtained via this call must be closed before using other APIs of
  177.    * this class or else the invocation will block.
  178.    */
  179.   @Override
  180.   public FSDataOutputStream create(Path file, FsPermission permission,
  181.       boolean overwrite, int bufferSize, short replication, long blockSize,
  182.       Progressable progress) throws IOException {
  183.     final FTPClient client = connect();
  184.     Path workDir = new Path(client.printWorkingDirectory());
  185.     Path absolute = makeAbsolute(workDir, file);
  186.     if (exists(client, file)) {
  187.       if (overwrite) {
  188.         delete(client, file);
  189.       } else {
  190.         disconnect(client);
  191.         throw new IOException("File already exists: " + file);
  192.       }
  193.     }
  194.     Path parent = absolute.getParent();
  195.     if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
  196.       parent = (parent == null) ? new Path("/") : parent;
  197.       disconnect(client);
  198.       throw new IOException("create(): Mkdirs failed to create: " + parent);
  199.     }
  200.     client.allocate(bufferSize);
  201.     // Change to parent directory on the server. Only then can we write to the
  202.     // file on the server by opening up an OutputStream. As a side effect the
  203.     // working directory on the server is changed to the parent directory of the
  204.     // file. The FTP client connection is closed when close() is called on the
  205.     // FSDataOutputStream.
  206.     client.changeWorkingDirectory(parent.toUri().getPath());
  207.     FSDataOutputStream fos = new FSDataOutputStream(client.storeFileStream(file
  208.         .getName()), statistics) {
  209.       @Override
  210.       public void close() throws IOException {
  211.         super.close();
  212.         if (!client.isConnected()) {
  213.           throw new FTPException("Client not connected");
  214.         }
  215.         boolean cmdCompleted = client.completePendingCommand();
  216.         disconnect(client);
  217.         if (!cmdCompleted) {
  218.           throw new FTPException("Could not complete transfer, Reply Code - "
  219.               + client.getReplyCode());
  220.         }
  221.       }
  222.     };
  223.     if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
  224.       // The ftpClient is an inconsistent state. Must close the stream
  225.       // which in turn will logout and disconnect from FTP server
  226.       fos.close();
  227.       throw new IOException("Unable to create file: " + file + ", Aborting");
  228.     }
  229.     return fos;
  230.   }
  231.   /** This optional operation is not yet supported. */
  232.   public FSDataOutputStream append(Path f, int bufferSize,
  233.       Progressable progress) throws IOException {
  234.     throw new IOException("Not supported");
  235.   }
  236.   
  237.   /**
  238.    * Convenience method, so that we don't open a new connection when using this
  239.    * method from within another method. Otherwise every API invocation incurs
  240.    * the overhead of opening/closing a TCP connection.
  241.    */
  242.   private boolean exists(FTPClient client, Path file) {
  243.     try {
  244.       return getFileStatus(client, file) != null;
  245.     } catch (FileNotFoundException fnfe) {
  246.       return false;
  247.     } catch (IOException ioe) {
  248.       throw new FTPException("Failed to get file status", ioe);
  249.     }
  250.   }
  251.   /** @deprecated Use delete(Path, boolean) instead */
  252.   @Override
  253.   @Deprecated
  254.   public boolean delete(Path file) throws IOException {
  255.     return delete(file, false);
  256.   }
  257.   @Override
  258.   public boolean delete(Path file, boolean recursive) throws IOException {
  259.     FTPClient client = connect();
  260.     try {
  261.       boolean success = delete(client, file, recursive);
  262.       return success;
  263.     } finally {
  264.       disconnect(client);
  265.     }
  266.   }
  267.   /** @deprecated Use delete(Path, boolean) instead */
  268.   @Deprecated
  269.   private boolean delete(FTPClient client, Path file) throws IOException {
  270.     return delete(client, file, false);
  271.   }
  272.   /**
  273.    * Convenience method, so that we don't open a new connection when using this
  274.    * method from within another method. Otherwise every API invocation incurs
  275.    * the overhead of opening/closing a TCP connection.
  276.    */
  277.   private boolean delete(FTPClient client, Path file, boolean recursive)
  278.       throws IOException {
  279.     Path workDir = new Path(client.printWorkingDirectory());
  280.     Path absolute = makeAbsolute(workDir, file);
  281.     String pathName = absolute.toUri().getPath();
  282.     FileStatus fileStat = getFileStatus(client, absolute);
  283.     if (!fileStat.isDir()) {
  284.       return client.deleteFile(pathName);
  285.     }
  286.     FileStatus[] dirEntries = listStatus(client, absolute);
  287.     if (dirEntries != null && dirEntries.length > 0 && !(recursive)) {
  288.       throw new IOException("Directory: " + file + " is not empty.");
  289.     }
  290.     if (dirEntries != null) {
  291.       for (int i = 0; i < dirEntries.length; i++) {
  292.         delete(client, new Path(absolute, dirEntries[i].getPath()), recursive);
  293.       }
  294.     }
  295.     return client.removeDirectory(pathName);
  296.   }
  297.   private FsAction getFsAction(int accessGroup, FTPFile ftpFile) {
  298.     FsAction action = FsAction.NONE;
  299.     if (ftpFile.hasPermission(accessGroup, FTPFile.READ_PERMISSION)) {
  300.       action.or(FsAction.READ);
  301.     }
  302.     if (ftpFile.hasPermission(accessGroup, FTPFile.WRITE_PERMISSION)) {
  303.       action.or(FsAction.WRITE);
  304.     }
  305.     if (ftpFile.hasPermission(accessGroup, FTPFile.EXECUTE_PERMISSION)) {
  306.       action.or(FsAction.EXECUTE);
  307.     }
  308.     return action;
  309.   }
  310.   private FsPermission getPermissions(FTPFile ftpFile) {
  311.     FsAction user, group, others;
  312.     user = getFsAction(FTPFile.USER_ACCESS, ftpFile);
  313.     group = getFsAction(FTPFile.GROUP_ACCESS, ftpFile);
  314.     others = getFsAction(FTPFile.WORLD_ACCESS, ftpFile);
  315.     return new FsPermission(user, group, others);
  316.   }
  317.   @Override
  318.   public URI getUri() {
  319.     return uri;
  320.   }
  321.   @Override
  322.   public FileStatus[] listStatus(Path file) throws IOException {
  323.     FTPClient client = connect();
  324.     try {
  325.       FileStatus[] stats = listStatus(client, file);
  326.       return stats;
  327.     } finally {
  328.       disconnect(client);
  329.     }
  330.   }
  331.   /**
  332.    * Convenience method, so that we don't open a new connection when using this
  333.    * method from within another method. Otherwise every API invocation incurs
  334.    * the overhead of opening/closing a TCP connection.
  335.    */
  336.   private FileStatus[] listStatus(FTPClient client, Path file)
  337.       throws IOException {
  338.     Path workDir = new Path(client.printWorkingDirectory());
  339.     Path absolute = makeAbsolute(workDir, file);
  340.     FileStatus fileStat = getFileStatus(client, absolute);
  341.     if (!fileStat.isDir()) {
  342.       return new FileStatus[] { fileStat };
  343.     }
  344.     FTPFile[] ftpFiles = client.listFiles(absolute.toUri().getPath());
  345.     FileStatus[] fileStats = new FileStatus[ftpFiles.length];
  346.     for (int i = 0; i < ftpFiles.length; i++) {
  347.       fileStats[i] = getFileStatus(ftpFiles[i], absolute);
  348.     }
  349.     return fileStats;
  350.   }
  351.   @Override
  352.   public FileStatus getFileStatus(Path file) throws IOException {
  353.     FTPClient client = connect();
  354.     try {
  355.       FileStatus status = getFileStatus(client, file);
  356.       return status;
  357.     } finally {
  358.       disconnect(client);
  359.     }
  360.   }
  361.   /**
  362.    * Convenience method, so that we don't open a new connection when using this
  363.    * method from within another method. Otherwise every API invocation incurs
  364.    * the overhead of opening/closing a TCP connection.
  365.    */
  366.   private FileStatus getFileStatus(FTPClient client, Path file)
  367.       throws IOException {
  368.     FileStatus fileStat = null;
  369.     Path workDir = new Path(client.printWorkingDirectory());
  370.     Path absolute = makeAbsolute(workDir, file);
  371.     Path parentPath = absolute.getParent();
  372.     if (parentPath == null) { // root dir
  373.       long length = -1; // Length of root dir on server not known
  374.       boolean isDir = true;
  375.       int blockReplication = 1;
  376.       long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
  377.       long modTime = -1; // Modification time of root dir not known.
  378.       Path root = new Path("/");
  379.       return new FileStatus(length, isDir, blockReplication, blockSize,
  380.           modTime, root.makeQualified(this));
  381.     }
  382.     String pathName = parentPath.toUri().getPath();
  383.     FTPFile[] ftpFiles = client.listFiles(pathName);
  384.     if (ftpFiles != null) {
  385.       for (FTPFile ftpFile : ftpFiles) {
  386.         if (ftpFile.getName().equals(file.getName())) { // file found in dir
  387.           fileStat = getFileStatus(ftpFile, parentPath);
  388.           break;
  389.         }
  390.       }
  391.       if (fileStat == null) {
  392.         throw new FileNotFoundException("File " + file + " does not exist.");
  393.       }
  394.     } else {
  395.       throw new FileNotFoundException("File " + file + " does not exist.");
  396.     }
  397.     return fileStat;
  398.   }
  399.   /**
  400.    * Convert the file information in FTPFile to a {@link FileStatus} object. *
  401.    * 
  402.    * @param ftpFile
  403.    * @param parentPath
  404.    * @return FileStatus
  405.    */
  406.   private FileStatus getFileStatus(FTPFile ftpFile, Path parentPath) {
  407.     long length = ftpFile.getSize();
  408.     boolean isDir = ftpFile.isDirectory();
  409.     int blockReplication = 1;
  410.     // Using default block size since there is no way in FTP client to know of
  411.     // block sizes on server. The assumption could be less than ideal.
  412.     long blockSize = DEFAULT_BLOCK_SIZE;
  413.     long modTime = ftpFile.getTimestamp().getTimeInMillis();
  414.     long accessTime = 0;
  415.     FsPermission permission = getPermissions(ftpFile);
  416.     String user = ftpFile.getUser();
  417.     String group = ftpFile.getGroup();
  418.     Path filePath = new Path(parentPath, ftpFile.getName());
  419.     return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
  420.         accessTime, permission, user, group, filePath.makeQualified(this));
  421.   }
  422.   @Override
  423.   public boolean mkdirs(Path file, FsPermission permission) throws IOException {
  424.     FTPClient client = connect();
  425.     try {
  426.       boolean success = mkdirs(client, file, permission);
  427.       return success;
  428.     } finally {
  429.       disconnect(client);
  430.     }
  431.   }
  432.   /**
  433.    * Convenience method, so that we don't open a new connection when using this
  434.    * method from within another method. Otherwise every API invocation incurs
  435.    * the overhead of opening/closing a TCP connection.
  436.    */
  437.   private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
  438.       throws IOException {
  439.     boolean created = true;
  440.     Path workDir = new Path(client.printWorkingDirectory());
  441.     Path absolute = makeAbsolute(workDir, file);
  442.     String pathName = absolute.getName();
  443.     if (!exists(client, absolute)) {
  444.       Path parent = absolute.getParent();
  445.       created = (parent == null || mkdirs(client, parent, FsPermission
  446.           .getDefault()));
  447.       if (created) {
  448.         String parentDir = parent.toUri().getPath();
  449.         client.changeWorkingDirectory(parentDir);
  450.         created = created & client.makeDirectory(pathName);
  451.       }
  452.     } else if (isFile(client, absolute)) {
  453.       throw new IOException(String.format(
  454.           "Can't make directory for path %s since it is a file.", absolute));
  455.     }
  456.     return created;
  457.   }
  458.   /**
  459.    * Convenience method, so that we don't open a new connection when using this
  460.    * method from within another method. Otherwise every API invocation incurs
  461.    * the overhead of opening/closing a TCP connection.
  462.    */
  463.   private boolean isFile(FTPClient client, Path file) {
  464.     try {
  465.       return !getFileStatus(client, file).isDir();
  466.     } catch (FileNotFoundException e) {
  467.       return false; // file does not exist
  468.     } catch (IOException ioe) {
  469.       throw new FTPException("File check failed", ioe);
  470.     }
  471.   }
  472.   /*
  473.    * Assuming that parent of both source and destination is the same. Is the
  474.    * assumption correct or it is suppose to work like 'move' ?
  475.    */
  476.   @Override
  477.   public boolean rename(Path src, Path dst) throws IOException {
  478.     FTPClient client = connect();
  479.     try {
  480.       boolean success = rename(client, src, dst);
  481.       return success;
  482.     } finally {
  483.       disconnect(client);
  484.     }
  485.   }
  486.   /**
  487.    * Convenience method, so that we don't open a new connection when using this
  488.    * method from within another method. Otherwise every API invocation incurs
  489.    * the overhead of opening/closing a TCP connection.
  490.    * 
  491.    * @param client
  492.    * @param src
  493.    * @param dst
  494.    * @return
  495.    * @throws IOException
  496.    */
  497.   private boolean rename(FTPClient client, Path src, Path dst)
  498.       throws IOException {
  499.     Path workDir = new Path(client.printWorkingDirectory());
  500.     Path absoluteSrc = makeAbsolute(workDir, src);
  501.     Path absoluteDst = makeAbsolute(workDir, dst);
  502.     if (!exists(client, absoluteSrc)) {
  503.       throw new IOException("Source path " + src + " does not exist");
  504.     }
  505.     if (exists(client, absoluteDst)) {
  506.       throw new IOException("Destination path " + dst
  507.           + " already exist, cannot rename!");
  508.     }
  509.     String parentSrc = absoluteSrc.getParent().toUri().toString();
  510.     String parentDst = absoluteDst.getParent().toUri().toString();
  511.     String from = src.getName();
  512.     String to = dst.getName();
  513.     if (!parentSrc.equals(parentDst)) {
  514.       throw new IOException("Cannot rename parent(source): " + parentSrc
  515.           + ", parent(destination):  " + parentDst);
  516.     }
  517.     client.changeWorkingDirectory(parentSrc);
  518.     boolean renamed = client.rename(from, to);
  519.     return renamed;
  520.   }
  521.   @Override
  522.   public Path getWorkingDirectory() {
  523.     // Return home directory always since we do not maintain state.
  524.     return getHomeDirectory();
  525.   }
  526.   @Override
  527.   public Path getHomeDirectory() {
  528.     FTPClient client = null;
  529.     try {
  530.       client = connect();
  531.       Path homeDir = new Path(client.printWorkingDirectory());
  532.       return homeDir;
  533.     } catch (IOException ioe) {
  534.       throw new FTPException("Failed to get home directory", ioe);
  535.     } finally {
  536.       try {
  537.         disconnect(client);
  538.       } catch (IOException ioe) {
  539.         throw new FTPException("Failed to disconnect", ioe);
  540.       }
  541.     }
  542.   }
  543.   @Override
  544.   public void setWorkingDirectory(Path newDir) {
  545.     // we do not maintain the working directory state
  546.   }
  547. }