DistributedFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.*;
  20. import java.net.*;
  21. import org.apache.hadoop.fs.permission.FsPermission;
  22. import org.apache.hadoop.fs.*;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  25. import org.apache.hadoop.hdfs.protocol.FSConstants;
  26. import org.apache.hadoop.hdfs.protocol.Block;
  27. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  28. import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
  29. import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
  30. import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
  31. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  32. import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
  33. import org.apache.hadoop.security.AccessControlException;
  34. import org.apache.hadoop.util.Progressable;
  35. /****************************************************************
  36.  * Implementation of the abstract FileSystem for the DFS system.
  37.  * This object is the way end-user code interacts with a Hadoop
  38.  * DistributedFileSystem.
  39.  *
  40.  *****************************************************************/
  41. public class DistributedFileSystem extends FileSystem {
  42.   private Path workingDir;
  43.   private URI uri;
  44.   DFSClient dfs;
  45.   private boolean verifyChecksum = true;
  46.   
  47.   static{
  48.     Configuration.addDefaultResource("hdfs-default.xml");
  49.     Configuration.addDefaultResource("hdfs-site.xml");
  50.   }
  51.   public DistributedFileSystem() {
  52.   }
  53.   /** @deprecated */
  54.   public DistributedFileSystem(InetSocketAddress namenode,
  55.     Configuration conf) throws IOException {
  56.     initialize(NameNode.getUri(namenode), conf);
  57.   }
  58.   /** @deprecated */
  59.   public String getName() { return uri.getAuthority(); }
  60.   public URI getUri() { return uri; }
  61.   public void initialize(URI uri, Configuration conf) throws IOException {
  62.     super.initialize(uri, conf);
  63.     setConf(conf);
  64.     String host = uri.getHost();
  65.     if (host == null) {
  66.       throw new IOException("Incomplete HDFS URI, no host: "+ uri);
  67.     }
  68.     InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
  69.     this.dfs = new DFSClient(namenode, conf, statistics);
  70.     this.uri = NameNode.getUri(namenode);
  71.     this.workingDir = getHomeDirectory();
  72.   }
  73.   /** Permit paths which explicitly specify the default port. */
  74.   protected void checkPath(Path path) {
  75.     URI thisUri = this.getUri();
  76.     URI thatUri = path.toUri();
  77.     String thatAuthority = thatUri.getAuthority();
  78.     if (thatUri.getScheme() != null
  79.         && thatUri.getScheme().equalsIgnoreCase(thisUri.getScheme())
  80.         && thatUri.getPort() == NameNode.DEFAULT_PORT
  81.         && thisUri.getPort() == -1
  82.         && thatAuthority.substring(0,thatAuthority.indexOf(":"))
  83.         .equalsIgnoreCase(thisUri.getAuthority()))
  84.       return;
  85.     super.checkPath(path);
  86.   }
  87.   /** Normalize paths that explicitly specify the default port. */
  88.   public Path makeQualified(Path path) {
  89.     URI thisUri = this.getUri();
  90.     URI thatUri = path.toUri();
  91.     String thatAuthority = thatUri.getAuthority();
  92.     if (thatUri.getScheme() != null
  93.         && thatUri.getScheme().equalsIgnoreCase(thisUri.getScheme())
  94.         && thatUri.getPort() == NameNode.DEFAULT_PORT
  95.         && thisUri.getPort() == -1
  96.         && thatAuthority.substring(0,thatAuthority.indexOf(":"))
  97.         .equalsIgnoreCase(thisUri.getAuthority())) {
  98.       path = new Path(thisUri.getScheme(), thisUri.getAuthority(),
  99.                       thatUri.getPath());
  100.     }
  101.     return super.makeQualified(path);
  102.   }
  103.   public Path getWorkingDirectory() {
  104.     return workingDir;
  105.   }
  106.   public long getDefaultBlockSize() {
  107.     return dfs.getDefaultBlockSize();
  108.   }
  109.   public short getDefaultReplication() {
  110.     return dfs.getDefaultReplication();
  111.   }
  112.   private Path makeAbsolute(Path f) {
  113.     if (f.isAbsolute()) {
  114.       return f;
  115.     } else {
  116.       return new Path(workingDir, f);
  117.     }
  118.   }
  119.   public void setWorkingDirectory(Path dir) {
  120.     String result = makeAbsolute(dir).toUri().getPath();
  121.     if (!DFSUtil.isValidName(result)) {
  122.       throw new IllegalArgumentException("Invalid DFS directory name " + 
  123.                                          result);
  124.     }
  125.     workingDir = makeAbsolute(dir);
  126.   }
  127.   /** {@inheritDoc} */
  128.   public Path getHomeDirectory() {
  129.     return new Path("/user/" + dfs.ugi.getUserName()).makeQualified(this);
  130.   }
  131.   private String getPathName(Path file) {
  132.     checkPath(file);
  133.     String result = makeAbsolute(file).toUri().getPath();
  134.     if (!DFSUtil.isValidName(result)) {
  135.       throw new IllegalArgumentException("Pathname " + result + " from " +
  136.                                          file+" is not a valid DFS filename.");
  137.     }
  138.     return result;
  139.   }
  140.   
  141.   public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
  142.       long len) throws IOException {
  143.     if (file == null) {
  144.       return null;
  145.     }
  146.     return dfs.getBlockLocations(getPathName(file.getPath()), start, len);
  147.   }
  148.   public void setVerifyChecksum(boolean verifyChecksum) {
  149.     this.verifyChecksum = verifyChecksum;
  150.   }
  151.   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  152.     return new DFSClient.DFSDataInputStream(
  153.           dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
  154.   }
  155.   /** This optional operation is not yet supported. */
  156.   public FSDataOutputStream append(Path f, int bufferSize,
  157.       Progressable progress) throws IOException {
  158.     DFSOutputStream op = (DFSOutputStream)dfs.append(getPathName(f), bufferSize, progress);
  159.     return new FSDataOutputStream(op, statistics, op.getInitialLen());
  160.   }
  161.   public FSDataOutputStream create(Path f, FsPermission permission,
  162.     boolean overwrite,
  163.     int bufferSize, short replication, long blockSize,
  164.     Progressable progress) throws IOException {
  165.     return new FSDataOutputStream
  166.        (dfs.create(getPathName(f), permission,
  167.                    overwrite, replication, blockSize, progress, bufferSize),
  168.         statistics);
  169.   }
  170.   public boolean setReplication(Path src, 
  171.                                 short replication
  172.                                ) throws IOException {
  173.     return dfs.setReplication(getPathName(src), replication);
  174.   }
  175.   /**
  176.    * Rename files/dirs
  177.    */
  178.   public boolean rename(Path src, Path dst) throws IOException {
  179.     return dfs.rename(getPathName(src), getPathName(dst));
  180.   }
  181.   /**
  182.    * Get rid of Path f, whether a true file or dir.
  183.    */
  184.   @Deprecated
  185.   public boolean delete(Path f) throws IOException {
  186.     return dfs.delete(getPathName(f));
  187.   }
  188.   
  189.   /**
  190.    * requires a boolean check to delete a non 
  191.    * empty directory recursively.
  192.    */
  193.   public boolean delete(Path f, boolean recursive) throws IOException {
  194.    return dfs.delete(getPathName(f), recursive);
  195.   }
  196.   
  197.   /** {@inheritDoc} */
  198.   public ContentSummary getContentSummary(Path f) throws IOException {
  199.     return dfs.getContentSummary(getPathName(f));
  200.   }
  201.   /** Set a directory's quotas
  202.    * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long) 
  203.    */
  204.   public void setQuota(Path src, long namespaceQuota, long diskspaceQuota) 
  205.                        throws IOException {
  206.     dfs.setQuota(getPathName(src), namespaceQuota, diskspaceQuota);
  207.   }
  208.   
  209.   private FileStatus makeQualified(FileStatus f) {
  210.     return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
  211.         f.getBlockSize(), f.getModificationTime(),
  212.         f.getAccessTime(),
  213.         f.getPermission(), f.getOwner(), f.getGroup(),
  214.         f.getPath().makeQualified(this)); // fully-qualify path
  215.   }
  216.   public FileStatus[] listStatus(Path p) throws IOException {
  217.     FileStatus[] infos = dfs.listPaths(getPathName(p));
  218.     if (infos == null) return null;
  219.     FileStatus[] stats = new FileStatus[infos.length];
  220.     for (int i = 0; i < infos.length; i++) {
  221.       stats[i] = makeQualified(infos[i]);
  222.     }
  223.     return stats;
  224.   }
  225.   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
  226.     return dfs.mkdirs(getPathName(f), permission);
  227.   }
  228.   /** {@inheritDoc} */
  229.   public void close() throws IOException {
  230.     try {
  231.       super.processDeleteOnExit();
  232.       dfs.close();
  233.     } finally {
  234.       super.close();
  235.     }
  236.   }
  237.   public String toString() {
  238.     return "DFS[" + dfs + "]";
  239.   }
  240.   public DFSClient getClient() {
  241.     return dfs;
  242.   }        
  243.   
  244.   public static class DiskStatus {
  245.     private long capacity;
  246.     private long dfsUsed;
  247.     private long remaining;
  248.     public DiskStatus(long capacity, long dfsUsed, long remaining) {
  249.       this.capacity = capacity;
  250.       this.dfsUsed = dfsUsed;
  251.       this.remaining = remaining;
  252.     }
  253.     
  254.     public long getCapacity() {
  255.       return capacity;
  256.     }
  257.     public long getDfsUsed() {
  258.       return dfsUsed;
  259.     }
  260.     public long getRemaining() {
  261.       return remaining;
  262.     }
  263.   }
  264.   
  265.   /** Return the disk usage of the filesystem, including total capacity,
  266.    * used space, and remaining space */
  267.   public DiskStatus getDiskStatus() throws IOException {
  268.     return dfs.getDiskStatus();
  269.   }
  270.   
  271.   /** Return the total raw capacity of the filesystem, disregarding
  272.    * replication .*/
  273.   public long getRawCapacity() throws IOException{
  274.     return dfs.totalRawCapacity();
  275.   }
  276.   /** Return the total raw used space in the filesystem, disregarding
  277.    * replication .*/
  278.   public long getRawUsed() throws IOException{
  279.     return dfs.totalRawUsed();
  280.   }
  281.    
  282.   /**
  283.    * Returns count of blocks with no good replicas left. Normally should be
  284.    * zero.
  285.    * 
  286.    * @throws IOException
  287.    */
  288.   public long getMissingBlocksCount() throws IOException {
  289.     return dfs.getMissingBlocksCount();
  290.   }
  291.   /**
  292.    * Returns count of blocks with one of more replica missing.
  293.    * 
  294.    * @throws IOException
  295.    */
  296.   public long getUnderReplicatedBlocksCount() throws IOException {
  297.     return dfs.getUnderReplicatedBlocksCount();
  298.   }
  299.   /**
  300.    * Returns count of blocks with at least one replica marked corrupt.
  301.    * 
  302.    * @throws IOException
  303.    */
  304.   public long getCorruptBlocksCount() throws IOException {
  305.     return dfs.getCorruptBlocksCount();
  306.   }
  307.   /** Return statistics for each datanode. */
  308.   public DatanodeInfo[] getDataNodeStats() throws IOException {
  309.     return dfs.datanodeReport(DatanodeReportType.ALL);
  310.   }
  311.   /**
  312.    * Enter, leave or get safe mode.
  313.    *  
  314.    * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(
  315.    *    FSConstants.SafeModeAction)
  316.    */
  317.   public boolean setSafeMode(FSConstants.SafeModeAction action) 
  318.   throws IOException {
  319.     return dfs.setSafeMode(action);
  320.   }
  321.   /**
  322.    * Save namespace image.
  323.    * 
  324.    * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#saveNamespace()
  325.    */
  326.   public void saveNamespace() throws AccessControlException, IOException {
  327.     dfs.saveNamespace();
  328.   }
  329.   /**
  330.    * Refreshes the list of hosts and excluded hosts from the configured 
  331.    * files.  
  332.    */
  333.   public void refreshNodes() throws IOException {
  334.     dfs.refreshNodes();
  335.   }
  336.   /**
  337.    * Finalize previously upgraded files system state.
  338.    * @throws IOException
  339.    */
  340.   public void finalizeUpgrade() throws IOException {
  341.     dfs.finalizeUpgrade();
  342.   }
  343.   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
  344.   ) throws IOException {
  345.     return dfs.distributedUpgradeProgress(action);
  346.   }
  347.   /*
  348.    * Requests the namenode to dump data strcutures into specified 
  349.    * file.
  350.    */
  351.   public void metaSave(String pathname) throws IOException {
  352.     dfs.metaSave(pathname);
  353.   }
  354.   /**
  355.    * We need to find the blocks that didn't match.  Likely only one 
  356.    * is corrupt but we will report both to the namenode.  In the future,
  357.    * we can consider figuring out exactly which block is corrupt.
  358.    */
  359.   public boolean reportChecksumFailure(Path f, 
  360.     FSDataInputStream in, long inPos, 
  361.     FSDataInputStream sums, long sumsPos) {
  362.     LocatedBlock lblocks[] = new LocatedBlock[2];
  363.     // Find block in data stream.
  364.     DFSClient.DFSDataInputStream dfsIn = (DFSClient.DFSDataInputStream) in;
  365.     Block dataBlock = dfsIn.getCurrentBlock();
  366.     if (dataBlock == null) {
  367.       LOG.error("Error: Current block in data stream is null! ");
  368.       return false;
  369.     }
  370.     DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()}; 
  371.     lblocks[0] = new LocatedBlock(dataBlock, dataNode);
  372.     LOG.info("Found checksum error in data stream at block="
  373.         + dataBlock + " on datanode="
  374.         + dataNode[0].getName());
  375.     // Find block in checksum stream
  376.     DFSClient.DFSDataInputStream dfsSums = (DFSClient.DFSDataInputStream) sums;
  377.     Block sumsBlock = dfsSums.getCurrentBlock();
  378.     if (sumsBlock == null) {
  379.       LOG.error("Error: Current block in checksum stream is null! ");
  380.       return false;
  381.     }
  382.     DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()}; 
  383.     lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
  384.     LOG.info("Found checksum error in checksum stream at block="
  385.         + sumsBlock + " on datanode="
  386.         + sumsNode[0].getName());
  387.     // Ask client to delete blocks.
  388.     dfs.reportChecksumFailure(f.toString(), lblocks);
  389.     return true;
  390.   }
  391.   /**
  392.    * Returns the stat information about the file.
  393.    * @throws FileNotFoundException if the file does not exist.
  394.    */
  395.   public FileStatus getFileStatus(Path f) throws IOException {
  396.     FileStatus fi = dfs.getFileInfo(getPathName(f));
  397.     if (fi != null) {
  398.       return makeQualified(fi);
  399.     } else {
  400.       throw new FileNotFoundException("File does not exist: " + f);
  401.     }
  402.   }
  403.   /** {@inheritDoc} */
  404.   public MD5MD5CRC32FileChecksum getFileChecksum(Path f) throws IOException {
  405.     return dfs.getFileChecksum(getPathName(f));
  406.   }
  407.   /** {@inheritDoc }*/
  408.   public void setPermission(Path p, FsPermission permission
  409.       ) throws IOException {
  410.     dfs.setPermission(getPathName(p), permission);
  411.   }
  412.   /** {@inheritDoc }*/
  413.   public void setOwner(Path p, String username, String groupname
  414.       ) throws IOException {
  415.     if (username == null && groupname == null) {
  416.       throw new IOException("username == null && groupname == null");
  417.     }
  418.     dfs.setOwner(getPathName(p), username, groupname);
  419.   }
  420.   /** {@inheritDoc }*/
  421.   public void setTimes(Path p, long mtime, long atime
  422.       ) throws IOException {
  423.     dfs.setTimes(getPathName(p), mtime, atime);
  424.   }
  425.   
  426.   
  427. }