HftpFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.FileNotFoundException;
  20. import java.io.InputStream;
  21. import java.io.IOException;
  22. import java.net.HttpURLConnection;
  23. import java.net.InetAddress;
  24. import java.net.InetSocketAddress;
  25. import java.net.URI;
  26. import java.net.URISyntaxException;
  27. import java.net.URL;
  28. import java.net.UnknownHostException;
  29. import java.text.ParseException;
  30. import java.text.SimpleDateFormat;
  31. import java.util.ArrayList;
  32. import java.util.Random;
  33. import javax.security.auth.login.LoginException;
  34. import org.xml.sax.Attributes;
  35. import org.xml.sax.InputSource;
  36. import org.xml.sax.SAXException;
  37. import org.xml.sax.XMLReader;
  38. import org.xml.sax.helpers.DefaultHandler;
  39. import org.xml.sax.helpers.XMLReaderFactory;
  40. import org.apache.hadoop.conf.Configuration;
  41. import org.apache.hadoop.net.NetUtils;
  42. import org.apache.hadoop.fs.FileChecksum;
  43. import org.apache.hadoop.fs.FileStatus;
  44. import org.apache.hadoop.fs.FileSystem;
  45. import org.apache.hadoop.fs.FSInputStream;
  46. import org.apache.hadoop.fs.FSDataInputStream;
  47. import org.apache.hadoop.fs.FSDataOutputStream;
  48. import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
  49. import org.apache.hadoop.fs.Path;
  50. import org.apache.hadoop.fs.permission.FsPermission;
  51. import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
  52. import org.apache.hadoop.ipc.RemoteException;
  53. import org.apache.hadoop.security.*;
  54. import org.apache.hadoop.util.Progressable;
  55. import org.apache.hadoop.util.StringUtils;
  56. /** An implementation of a protocol for accessing filesystems over HTTP.
  57.  * The following implementation provides a limited, read-only interface
  58.  * to a filesystem over HTTP.
  59.  * @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
  60.  * @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
  61.  */
  62. public class HftpFileSystem extends FileSystem {
  63.   static {
  64.     HttpURLConnection.setFollowRedirects(true);
  65.   }
  66.   protected InetSocketAddress nnAddr;
  67.   protected UserGroupInformation ugi; 
  68.   protected final Random ran = new Random();
  69.   protected static final SimpleDateFormat df = ListPathsServlet.df;
  70.   @Override
  71.   public void initialize(URI name, Configuration conf) throws IOException {
  72.     super.initialize(name, conf);
  73.     setConf(conf);
  74.     try {
  75.       this.ugi = UnixUserGroupInformation.login(conf, true);
  76.     } catch (LoginException le) {
  77.       throw new IOException(StringUtils.stringifyException(le));
  78.     }
  79.     nnAddr = NetUtils.createSocketAddr(name.toString());
  80.   }
  81.   
  82.   /** randomly pick one from all available IP addresses of a given hostname */
  83.   protected String pickOneAddress(String hostname) throws UnknownHostException {
  84.     if ("localhost".equals(hostname))
  85.       return hostname;
  86.     InetAddress[] addrs = InetAddress.getAllByName(hostname);
  87.     if (addrs.length > 1)
  88.       return addrs[ran.nextInt(addrs.length)].getHostAddress();
  89.     return addrs[0].getHostAddress();
  90.   }
  91.   @Override
  92.   public URI getUri() {
  93.     try {
  94.       return new URI("hftp", null, pickOneAddress(nnAddr.getHostName()), nnAddr.getPort(),
  95.                      null, null, null);
  96.     } catch (URISyntaxException e) {
  97.       return null;
  98.     } catch (UnknownHostException e) {
  99.       return null;
  100.     }
  101.   }
  102.   /**
  103.    * Open an HTTP connection to the namenode to read file data and metadata.
  104.    * @param path The path component of the URL
  105.    * @param query The query component of the URL
  106.    */
  107.   protected HttpURLConnection openConnection(String path, String query)
  108.       throws IOException {
  109.     try {
  110.       final URL url = new URI("http", null, pickOneAddress(nnAddr.getHostName()),
  111.           nnAddr.getPort(), path, query, null).toURL();
  112.       if (LOG.isTraceEnabled()) {
  113.         LOG.trace("url=" + url);
  114.       }
  115.       return (HttpURLConnection)url.openConnection();
  116.     } catch (URISyntaxException e) {
  117.       throw (IOException)new IOException().initCause(e);
  118.     }
  119.   }
  120.   @Override
  121.   public FSDataInputStream open(Path f, int buffersize) throws IOException {
  122.     HttpURLConnection connection = null;
  123.     connection = openConnection("/data" + f.toUri().getPath(), "ugi=" + ugi);
  124.     connection.setRequestMethod("GET");
  125.     connection.connect();
  126.     final InputStream in = connection.getInputStream();
  127.     return new FSDataInputStream(new FSInputStream() {
  128.         public int read() throws IOException {
  129.           return in.read();
  130.         }
  131.         public int read(byte[] b, int off, int len) throws IOException {
  132.           return in.read(b, off, len);
  133.         }
  134.         public void close() throws IOException {
  135.           in.close();
  136.         }
  137.         public void seek(long pos) throws IOException {
  138.           throw new IOException("Can't seek!");
  139.         }
  140.         public long getPos() throws IOException {
  141.           throw new IOException("Position unknown!");
  142.         }
  143.         public boolean seekToNewSource(long targetPos) throws IOException {
  144.           return false;
  145.         }
  146.       });
  147.   }
  148.   /** Class to parse and store a listing reply from the server. */
  149.   class LsParser extends DefaultHandler {
  150.     ArrayList<FileStatus> fslist = new ArrayList<FileStatus>();
  151.     public void startElement(String ns, String localname, String qname,
  152.                 Attributes attrs) throws SAXException {
  153.       if ("listing".equals(qname)) return;
  154.       if (!"file".equals(qname) && !"directory".equals(qname)) {
  155.         if (RemoteException.class.getSimpleName().equals(qname)) {
  156.           throw new SAXException(RemoteException.valueOf(attrs));
  157.         }
  158.         throw new SAXException("Unrecognized entry: " + qname);
  159.       }
  160.       long modif;
  161.       long atime = 0;
  162.       try {
  163.         modif = df.parse(attrs.getValue("modified")).getTime();
  164.         String astr = attrs.getValue("accesstime");
  165.         if (astr != null) {
  166.           atime = df.parse(astr).getTime();
  167.         }
  168.       } catch (ParseException e) { throw new SAXException(e); }
  169.       FileStatus fs = "file".equals(qname)
  170.         ? new FileStatus(
  171.               Long.valueOf(attrs.getValue("size")).longValue(), false,
  172.               Short.valueOf(attrs.getValue("replication")).shortValue(),
  173.               Long.valueOf(attrs.getValue("blocksize")).longValue(),
  174.               modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
  175.               attrs.getValue("owner"), attrs.getValue("group"),
  176.               new Path(getUri().toString(), attrs.getValue("path"))
  177.                 .makeQualified(HftpFileSystem.this))
  178.         : new FileStatus(0L, true, 0, 0L,
  179.               modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
  180.               attrs.getValue("owner"), attrs.getValue("group"),
  181.               new Path(getUri().toString(), attrs.getValue("path"))
  182.                 .makeQualified(HftpFileSystem.this));
  183.       fslist.add(fs);
  184.     }
  185.     private void fetchList(String path, boolean recur) throws IOException {
  186.       try {
  187.         XMLReader xr = XMLReaderFactory.createXMLReader();
  188.         xr.setContentHandler(this);
  189.         HttpURLConnection connection = openConnection("/listPaths" + path,
  190.             "ugi=" + ugi + (recur? "&recursive=yes" : ""));
  191.         connection.setRequestMethod("GET");
  192.         connection.connect();
  193.         InputStream resp = connection.getInputStream();
  194.         xr.parse(new InputSource(resp));
  195.       } catch(SAXException e) {
  196.         final Exception embedded = e.getException();
  197.         if (embedded != null && embedded instanceof IOException) {
  198.           throw (IOException)embedded;
  199.         }
  200.         throw new IOException("invalid xml directory content", e);
  201.       }
  202.     }
  203.     public FileStatus getFileStatus(Path f) throws IOException {
  204.       fetchList(f.toUri().getPath(), false);
  205.       if (fslist.size() == 0) {
  206.         throw new FileNotFoundException("File does not exist: " + f);
  207.       }
  208.       return fslist.get(0);
  209.     }
  210.     public FileStatus[] listStatus(Path f, boolean recur) throws IOException {
  211.       fetchList(f.toUri().getPath(), recur);
  212.       if (fslist.size() > 0 && (fslist.size() != 1 || fslist.get(0).isDir())) {
  213.         fslist.remove(0);
  214.       }
  215.       return fslist.toArray(new FileStatus[0]);
  216.     }
  217.     public FileStatus[] listStatus(Path f) throws IOException {
  218.       return listStatus(f, false);
  219.     }
  220.   }
  221.   @Override
  222.   public FileStatus[] listStatus(Path f) throws IOException {
  223.     LsParser lsparser = new LsParser();
  224.     return lsparser.listStatus(f);
  225.   }
  226.   @Override
  227.   public FileStatus getFileStatus(Path f) throws IOException {
  228.     LsParser lsparser = new LsParser();
  229.     return lsparser.getFileStatus(f);
  230.   }
  231.   private class ChecksumParser extends DefaultHandler {
  232.     private FileChecksum filechecksum;
  233.     /** {@inheritDoc} */
  234.     public void startElement(String ns, String localname, String qname,
  235.                 Attributes attrs) throws SAXException {
  236.       if (!MD5MD5CRC32FileChecksum.class.getName().equals(qname)) {
  237.         if (RemoteException.class.getSimpleName().equals(qname)) {
  238.           throw new SAXException(RemoteException.valueOf(attrs));
  239.         }
  240.         throw new SAXException("Unrecognized entry: " + qname);
  241.       }
  242.       filechecksum = MD5MD5CRC32FileChecksum.valueOf(attrs);
  243.     }
  244.     private FileChecksum getFileChecksum(Path f) throws IOException {
  245.       final HttpURLConnection connection = openConnection(
  246.           "/fileChecksum" + f, "ugi=" + ugi);
  247.       try {
  248.         final XMLReader xr = XMLReaderFactory.createXMLReader();
  249.         xr.setContentHandler(this);
  250.         connection.setRequestMethod("GET");
  251.         connection.connect();
  252.         xr.parse(new InputSource(connection.getInputStream()));
  253.       } catch(SAXException e) {
  254.         final Exception embedded = e.getException();
  255.         if (embedded != null && embedded instanceof IOException) {
  256.           throw (IOException)embedded;
  257.         }
  258.         throw new IOException("invalid xml directory content", e);
  259.       } finally {
  260.         connection.disconnect();
  261.       }
  262.       return filechecksum;
  263.     }
  264.   }
  265.   /** {@inheritDoc} */
  266.   public FileChecksum getFileChecksum(Path f) throws IOException {
  267.     return new ChecksumParser().getFileChecksum(f);
  268.   }
  269.   @Override
  270.   public Path getWorkingDirectory() {
  271.     return new Path("/").makeQualified(this);
  272.   }
  273.   @Override
  274.   public void setWorkingDirectory(Path f) { }
  275.   /** This optional operation is not yet supported. */
  276.   public FSDataOutputStream append(Path f, int bufferSize,
  277.       Progressable progress) throws IOException {
  278.     throw new IOException("Not supported");
  279.   }
  280.   @Override
  281.   public FSDataOutputStream create(Path f, FsPermission permission,
  282.                                    boolean overwrite, int bufferSize,
  283.                                    short replication, long blockSize,
  284.                                    Progressable progress) throws IOException {
  285.     throw new IOException("Not supported");
  286.   }
  287.   @Override
  288.   public boolean rename(Path src, Path dst) throws IOException {
  289.     throw new IOException("Not supported");
  290.   }
  291.   @Override
  292.   /*
  293.    * @deprecated Use delete(path, boolean)
  294.    */
  295.   @Deprecated
  296.   public boolean delete(Path f) throws IOException {
  297.     throw new IOException("Not supported");
  298.   }
  299.   
  300.   @Override 
  301.   public boolean delete(Path f, boolean recursive) throws IOException {
  302.     throw new IOException("Not supported");
  303.   }
  304.   
  305.   @Override
  306.   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
  307.     throw new IOException("Not supported");
  308.   }
  309. }