HftpFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.hdfs;
- import java.io.FileNotFoundException;
- import java.io.InputStream;
- import java.io.IOException;
- import java.net.HttpURLConnection;
- import java.net.InetAddress;
- import java.net.InetSocketAddress;
- import java.net.URI;
- import java.net.URISyntaxException;
- import java.net.URL;
- import java.net.UnknownHostException;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.Random;
- import javax.security.auth.login.LoginException;
- import org.xml.sax.Attributes;
- import org.xml.sax.InputSource;
- import org.xml.sax.SAXException;
- import org.xml.sax.XMLReader;
- import org.xml.sax.helpers.DefaultHandler;
- import org.xml.sax.helpers.XMLReaderFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.net.NetUtils;
- import org.apache.hadoop.fs.FileChecksum;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.FSInputStream;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.permission.FsPermission;
- import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
- import org.apache.hadoop.ipc.RemoteException;
- import org.apache.hadoop.security.*;
- import org.apache.hadoop.util.Progressable;
- import org.apache.hadoop.util.StringUtils;
- /** An implementation of a protocol for accessing filesystems over HTTP.
- * The following implementation provides a limited, read-only interface
- * to a filesystem over HTTP.
- * @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
- * @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
- */
- public class HftpFileSystem extends FileSystem {
// Class-wide setup: ask HttpURLConnection to follow HTTP redirects.
// setFollowRedirects is a static setter on HttpURLConnection, so it is not
// scoped to this filesystem's connections.
static {
  HttpURLConnection.setFollowRedirects(true);
}

/**
 * Date format used to parse the "modified" and "accesstime" attributes of
 * listing entries. This is the very same instance as
 * {@link ListPathsServlet#df}.
 * NOTE(review): SimpleDateFormat instances are not synchronized; confirm
 * whether concurrent use of this shared instance is possible.
 */
protected static final SimpleDateFormat df = ListPathsServlet.df;

/** Namenode address, derived from the URI handed to {@code initialize}. */
protected InetSocketAddress nnAddr;

/** Caller identity; sent to the server as the {@code ugi=} query value. */
protected UserGroupInformation ugi;

/** Random source used by {@code pickOneAddress} to choose among addresses. */
protected final Random ran = new Random();
- @Override
- public void initialize(URI name, Configuration conf) throws IOException {
- super.initialize(name, conf);
- setConf(conf);
- try {
- this.ugi = UnixUserGroupInformation.login(conf, true);
- } catch (LoginException le) {
- throw new IOException(StringUtils.stringifyException(le));
- }
- nnAddr = NetUtils.createSocketAddr(name.toString());
- }
-
- /** randomly pick one from all available IP addresses of a given hostname */
- protected String pickOneAddress(String hostname) throws UnknownHostException {
- if ("localhost".equals(hostname))
- return hostname;
- InetAddress[] addrs = InetAddress.getAllByName(hostname);
- if (addrs.length > 1)
- return addrs[ran.nextInt(addrs.length)].getHostAddress();
- return addrs[0].getHostAddress();
- }
- @Override
- public URI getUri() {
- try {
- return new URI("hftp", null, pickOneAddress(nnAddr.getHostName()), nnAddr.getPort(),
- null, null, null);
- } catch (URISyntaxException e) {
- return null;
- } catch (UnknownHostException e) {
- return null;
- }
- }
- /**
- * Open an HTTP connection to the namenode to read file data and metadata.
- * @param path The path component of the URL
- * @param query The query component of the URL
- */
- protected HttpURLConnection openConnection(String path, String query)
- throws IOException {
- try {
- final URL url = new URI("http", null, pickOneAddress(nnAddr.getHostName()),
- nnAddr.getPort(), path, query, null).toURL();
- if (LOG.isTraceEnabled()) {
- LOG.trace("url=" + url);
- }
- return (HttpURLConnection)url.openConnection();
- } catch (URISyntaxException e) {
- throw (IOException)new IOException().initCause(e);
- }
- }
- @Override
- public FSDataInputStream open(Path f, int buffersize) throws IOException {
- HttpURLConnection connection = null;
- connection = openConnection("/data" + f.toUri().getPath(), "ugi=" + ugi);
- connection.setRequestMethod("GET");
- connection.connect();
- final InputStream in = connection.getInputStream();
- return new FSDataInputStream(new FSInputStream() {
- public int read() throws IOException {
- return in.read();
- }
- public int read(byte[] b, int off, int len) throws IOException {
- return in.read(b, off, len);
- }
- public void close() throws IOException {
- in.close();
- }
- public void seek(long pos) throws IOException {
- throw new IOException("Can't seek!");
- }
- public long getPos() throws IOException {
- throw new IOException("Position unknown!");
- }
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
- });
- }
- /** Class to parse and store a listing reply from the server. */
- class LsParser extends DefaultHandler {
- ArrayList<FileStatus> fslist = new ArrayList<FileStatus>();
- public void startElement(String ns, String localname, String qname,
- Attributes attrs) throws SAXException {
- if ("listing".equals(qname)) return;
- if (!"file".equals(qname) && !"directory".equals(qname)) {
- if (RemoteException.class.getSimpleName().equals(qname)) {
- throw new SAXException(RemoteException.valueOf(attrs));
- }
- throw new SAXException("Unrecognized entry: " + qname);
- }
- long modif;
- long atime = 0;
- try {
- modif = df.parse(attrs.getValue("modified")).getTime();
- String astr = attrs.getValue("accesstime");
- if (astr != null) {
- atime = df.parse(astr).getTime();
- }
- } catch (ParseException e) { throw new SAXException(e); }
- FileStatus fs = "file".equals(qname)
- ? new FileStatus(
- Long.valueOf(attrs.getValue("size")).longValue(), false,
- Short.valueOf(attrs.getValue("replication")).shortValue(),
- Long.valueOf(attrs.getValue("blocksize")).longValue(),
- modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
- attrs.getValue("owner"), attrs.getValue("group"),
- new Path(getUri().toString(), attrs.getValue("path"))
- .makeQualified(HftpFileSystem.this))
- : new FileStatus(0L, true, 0, 0L,
- modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
- attrs.getValue("owner"), attrs.getValue("group"),
- new Path(getUri().toString(), attrs.getValue("path"))
- .makeQualified(HftpFileSystem.this));
- fslist.add(fs);
- }
- private void fetchList(String path, boolean recur) throws IOException {
- try {
- XMLReader xr = XMLReaderFactory.createXMLReader();
- xr.setContentHandler(this);
- HttpURLConnection connection = openConnection("/listPaths" + path,
- "ugi=" + ugi + (recur? "&recursive=yes" : ""));
- connection.setRequestMethod("GET");
- connection.connect();
- InputStream resp = connection.getInputStream();
- xr.parse(new InputSource(resp));
- } catch(SAXException e) {
- final Exception embedded = e.getException();
- if (embedded != null && embedded instanceof IOException) {
- throw (IOException)embedded;
- }
- throw new IOException("invalid xml directory content", e);
- }
- }
- public FileStatus getFileStatus(Path f) throws IOException {
- fetchList(f.toUri().getPath(), false);
- if (fslist.size() == 0) {
- throw new FileNotFoundException("File does not exist: " + f);
- }
- return fslist.get(0);
- }
- public FileStatus[] listStatus(Path f, boolean recur) throws IOException {
- fetchList(f.toUri().getPath(), recur);
- if (fslist.size() > 0 && (fslist.size() != 1 || fslist.get(0).isDir())) {
- fslist.remove(0);
- }
- return fslist.toArray(new FileStatus[0]);
- }
- public FileStatus[] listStatus(Path f) throws IOException {
- return listStatus(f, false);
- }
- }
- @Override
- public FileStatus[] listStatus(Path f) throws IOException {
- LsParser lsparser = new LsParser();
- return lsparser.listStatus(f);
- }
- @Override
- public FileStatus getFileStatus(Path f) throws IOException {
- LsParser lsparser = new LsParser();
- return lsparser.getFileStatus(f);
- }
- private class ChecksumParser extends DefaultHandler {
- private FileChecksum filechecksum;
- /** {@inheritDoc} */
- public void startElement(String ns, String localname, String qname,
- Attributes attrs) throws SAXException {
- if (!MD5MD5CRC32FileChecksum.class.getName().equals(qname)) {
- if (RemoteException.class.getSimpleName().equals(qname)) {
- throw new SAXException(RemoteException.valueOf(attrs));
- }
- throw new SAXException("Unrecognized entry: " + qname);
- }
- filechecksum = MD5MD5CRC32FileChecksum.valueOf(attrs);
- }
- private FileChecksum getFileChecksum(Path f) throws IOException {
- final HttpURLConnection connection = openConnection(
- "/fileChecksum" + f, "ugi=" + ugi);
- try {
- final XMLReader xr = XMLReaderFactory.createXMLReader();
- xr.setContentHandler(this);
- connection.setRequestMethod("GET");
- connection.connect();
- xr.parse(new InputSource(connection.getInputStream()));
- } catch(SAXException e) {
- final Exception embedded = e.getException();
- if (embedded != null && embedded instanceof IOException) {
- throw (IOException)embedded;
- }
- throw new IOException("invalid xml directory content", e);
- } finally {
- connection.disconnect();
- }
- return filechecksum;
- }
- }
- /** {@inheritDoc} */
- public FileChecksum getFileChecksum(Path f) throws IOException {
- return new ChecksumParser().getFileChecksum(f);
- }
- @Override
- public Path getWorkingDirectory() {
- return new Path("/").makeQualified(this);
- }
- @Override
- public void setWorkingDirectory(Path f) { }
- /** This optional operation is not yet supported. */
- public FSDataOutputStream append(Path f, int bufferSize,
- Progressable progress) throws IOException {
- throw new IOException("Not supported");
- }
- @Override
- public FSDataOutputStream create(Path f, FsPermission permission,
- boolean overwrite, int bufferSize,
- short replication, long blockSize,
- Progressable progress) throws IOException {
- throw new IOException("Not supported");
- }
- @Override
- public boolean rename(Path src, Path dst) throws IOException {
- throw new IOException("Not supported");
- }
- @Override
- /*
- * @deprecated Use delete(path, boolean)
- */
- @Deprecated
- public boolean delete(Path f) throws IOException {
- throw new IOException("Not supported");
- }
-
- @Override
- public boolean delete(Path f, boolean recursive) throws IOException {
- throw new IOException("Not supported");
- }
-
- @Override
- public boolean mkdirs(Path f, FsPermission permission) throws IOException {
- throw new IOException("Not supported");
- }
- }