RawLocalFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.*;
  20. import java.net.URI;
  21. import java.nio.ByteBuffer;
  22. import java.nio.channels.FileLock;
  23. import java.util.*;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.permission.*;
  26. import org.apache.hadoop.util.Progressable;
  27. import org.apache.hadoop.util.StringUtils;
  28. import org.apache.hadoop.util.Shell;
  29. /****************************************************************
  30.  * Implement the FileSystem API for the raw local filesystem.
  31.  *
  32.  *****************************************************************/
  33. public class RawLocalFileSystem extends FileSystem {
  34.   static final URI NAME = URI.create("file:///");
  35.   private Path workingDir;
  36.   
  37.   public RawLocalFileSystem() {
  38.     workingDir = new Path(System.getProperty("user.dir")).makeQualified(this);
  39.   }
  40.   
  41.   /** Convert a path to a File. */
  42.   public File pathToFile(Path path) {
  43.     checkPath(path);
  44.     if (!path.isAbsolute()) {
  45.       path = new Path(getWorkingDirectory(), path);
  46.     }
  47.     return new File(path.toUri().getPath());
  48.   }
  49.   public URI getUri() { return NAME; }
  50.   
  51.   public void initialize(URI uri, Configuration conf) throws IOException {
  52.     super.initialize(uri, conf);
  53.     setConf(conf);
  54.   }
  55.   
  56.   class TrackingFileInputStream extends FileInputStream {
  57.     public TrackingFileInputStream(File f) throws IOException {
  58.       super(f);
  59.     }
  60.     
  61.     public int read() throws IOException {
  62.       int result = super.read();
  63.       if (result != -1) {
  64.         statistics.incrementBytesRead(1);
  65.       }
  66.       return result;
  67.     }
  68.     
  69.     public int read(byte[] data) throws IOException {
  70.       int result = super.read(data);
  71.       if (result != -1) {
  72.         statistics.incrementBytesRead(result);
  73.       }
  74.       return result;
  75.     }
  76.     
  77.     public int read(byte[] data, int offset, int length) throws IOException {
  78.       int result = super.read(data, offset, length);
  79.       if (result != -1) {
  80.         statistics.incrementBytesRead(result);
  81.       }
  82.       return result;
  83.     }
  84.   }
  85.   /*******************************************************
  86.    * For open()'s FSInputStream
  87.    *******************************************************/
  88.   class LocalFSFileInputStream extends FSInputStream {
  89.     FileInputStream fis;
  90.     private long position;
  91.     public LocalFSFileInputStream(Path f) throws IOException {
  92.       this.fis = new TrackingFileInputStream(pathToFile(f));
  93.     }
  94.     
  95.     public void seek(long pos) throws IOException {
  96.       fis.getChannel().position(pos);
  97.       this.position = pos;
  98.     }
  99.     
  100.     public long getPos() throws IOException {
  101.       return this.position;
  102.     }
  103.     
  104.     public boolean seekToNewSource(long targetPos) throws IOException {
  105.       return false;
  106.     }
  107.     
  108.     /*
  109.      * Just forward to the fis
  110.      */
  111.     public int available() throws IOException { return fis.available(); }
  112.     public void close() throws IOException { fis.close(); }
  113.     public boolean markSupport() { return false; }
  114.     
  115.     public int read() throws IOException {
  116.       try {
  117.         int value = fis.read();
  118.         if (value >= 0) {
  119.           this.position++;
  120.         }
  121.         return value;
  122.       } catch (IOException e) {                 // unexpected exception
  123.         throw new FSError(e);                   // assume native fs error
  124.       }
  125.     }
  126.     
  127.     public int read(byte[] b, int off, int len) throws IOException {
  128.       try {
  129.         int value = fis.read(b, off, len);
  130.         if (value > 0) {
  131.           this.position += value;
  132.         }
  133.         return value;
  134.       } catch (IOException e) {                 // unexpected exception
  135.         throw new FSError(e);                   // assume native fs error
  136.       }
  137.     }
  138.     
  139.     public int read(long position, byte[] b, int off, int len)
  140.       throws IOException {
  141.       ByteBuffer bb = ByteBuffer.wrap(b, off, len);
  142.       try {
  143.         return fis.getChannel().read(bb, position);
  144.       } catch (IOException e) {
  145.         throw new FSError(e);
  146.       }
  147.     }
  148.     
  149.     public long skip(long n) throws IOException {
  150.       long value = fis.skip(n);
  151.       if (value > 0) {
  152.         this.position += value;
  153.       }
  154.       return value;
  155.     }
  156.   }
  157.   
  158.   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  159.     if (!exists(f)) {
  160.       throw new FileNotFoundException(f.toString());
  161.     }
  162.     return new FSDataInputStream(new BufferedFSInputStream(
  163.         new LocalFSFileInputStream(f), bufferSize));
  164.   }
  165.   
  166.   /*********************************************************
  167.    * For create()'s FSOutputStream.
  168.    *********************************************************/
  169.   class LocalFSFileOutputStream extends OutputStream implements Syncable {
  170.     FileOutputStream fos;
  171.     
  172.     private LocalFSFileOutputStream(Path f, boolean append) throws IOException {
  173.       this.fos = new FileOutputStream(pathToFile(f), append);
  174.     }
  175.     
  176.     /*
  177.      * Just forward to the fos
  178.      */
  179.     public void close() throws IOException { fos.close(); }
  180.     public void flush() throws IOException { fos.flush(); }
  181.     public void write(byte[] b, int off, int len) throws IOException {
  182.       try {
  183.         fos.write(b, off, len);
  184.       } catch (IOException e) {                // unexpected exception
  185.         throw new FSError(e);                  // assume native fs error
  186.       }
  187.     }
  188.     
  189.     public void write(int b) throws IOException {
  190.       try {
  191.         fos.write(b);
  192.       } catch (IOException e) {              // unexpected exception
  193.         throw new FSError(e);                // assume native fs error
  194.       }
  195.     }
  196.     /** {@inheritDoc} */
  197.     public void sync() throws IOException {
  198.       fos.getFD().sync();      
  199.     }
  200.   }
  201.   
  202.   /** {@inheritDoc} */
  203.   public FSDataOutputStream append(Path f, int bufferSize,
  204.       Progressable progress) throws IOException {
  205.     if (!exists(f)) {
  206.       throw new FileNotFoundException("File " + f + " not found.");
  207.     }
  208.     if (getFileStatus(f).isDir()) {
  209.       throw new IOException("Cannot append to a diretory (=" + f + " ).");
  210.     }
  211.     return new FSDataOutputStream(new BufferedOutputStream(
  212.         new LocalFSFileOutputStream(f, true), bufferSize), statistics);
  213.   }
  214.   /** {@inheritDoc} */
  215.   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize,
  216.                                    short replication, long blockSize, Progressable progress)
  217.     throws IOException {
  218.     if (exists(f) && !overwrite) {
  219.       throw new IOException("File already exists:"+f);
  220.     }
  221.     Path parent = f.getParent();
  222.     if (parent != null && !mkdirs(parent)) {
  223.       throw new IOException("Mkdirs failed to create " + parent.toString());
  224.     }
  225.     return new FSDataOutputStream(new BufferedOutputStream(
  226.         new LocalFSFileOutputStream(f, false), bufferSize), statistics);
  227.   }
  228.   /** {@inheritDoc} */
  229.   @Override
  230.   public FSDataOutputStream create(Path f, FsPermission permission,
  231.       boolean overwrite, int bufferSize, short replication, long blockSize,
  232.       Progressable progress) throws IOException {
  233.     FSDataOutputStream out = create(f,
  234.         overwrite, bufferSize, replication, blockSize, progress);
  235.     setPermission(f, permission);
  236.     return out;
  237.   }
  238.   
  239.   public boolean rename(Path src, Path dst) throws IOException {
  240.     if (pathToFile(src).renameTo(pathToFile(dst))) {
  241.       return true;
  242.     }
  243.     return FileUtil.copy(this, src, this, dst, true, getConf());
  244.   }
  245.   
  246.   @Deprecated
  247.   public boolean delete(Path p) throws IOException {
  248.     return delete(p, true);
  249.   }
  250.   
  251.   public boolean delete(Path p, boolean recursive) throws IOException {
  252.     File f = pathToFile(p);
  253.     if (f.isFile()) {
  254.       return f.delete();
  255.     } else if ((!recursive) && f.isDirectory() && 
  256.         (f.listFiles().length != 0)) {
  257.       throw new IOException("Directory " + f.toString() + " is not empty");
  258.     }
  259.     return FileUtil.fullyDelete(f);
  260.   }
  261.  
  262.   public FileStatus[] listStatus(Path f) throws IOException {
  263.     File localf = pathToFile(f);
  264.     FileStatus[] results;
  265.     if (!localf.exists()) {
  266.       return null;
  267.     }
  268.     if (localf.isFile()) {
  269.       return new FileStatus[] {
  270.           new RawLocalFileStatus(localf, getDefaultBlockSize(), this) };
  271.     }
  272.     String[] names = localf.list();
  273.     if (names == null) {
  274.       return null;
  275.     }
  276.     results = new FileStatus[names.length];
  277.     for (int i = 0; i < names.length; i++) {
  278.       results[i] = getFileStatus(new Path(f, names[i]));
  279.     }
  280.     return results;
  281.   }
  282.   /**
  283.    * Creates the specified directory hierarchy. Does not
  284.    * treat existence as an error.
  285.    */
  286.   public boolean mkdirs(Path f) throws IOException {
  287.     Path parent = f.getParent();
  288.     File p2f = pathToFile(f);
  289.     return (parent == null || mkdirs(parent)) &&
  290.       (p2f.mkdir() || p2f.isDirectory());
  291.   }
  292.   /** {@inheritDoc} */
  293.   @Override
  294.   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
  295.     boolean b = mkdirs(f);
  296.     setPermission(f, permission);
  297.     return b;
  298.   }
  299.   
  300.   @Override
  301.   public Path getHomeDirectory() {
  302.     return new Path(System.getProperty("user.home")).makeQualified(this);
  303.   }
  304.   /**
  305.    * Set the working directory to the given directory.
  306.    */
  307.   @Override
  308.   public void setWorkingDirectory(Path newDir) {
  309.     workingDir = newDir;
  310.   }
  311.   
  312.   @Override
  313.   public Path getWorkingDirectory() {
  314.     return workingDir;
  315.   }
  316.   
  317.   // In the case of the local filesystem, we can just rename the file.
  318.   public void moveFromLocalFile(Path src, Path dst) throws IOException {
  319.     rename(src, dst);
  320.   }
  321.   
  322.   // We can write output directly to the final location
  323.   public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
  324.     throws IOException {
  325.     return fsOutputFile;
  326.   }
  327.   
  328.   // It's in the right place - nothing to do.
  329.   public void completeLocalOutput(Path fsWorkingFile, Path tmpLocalFile)
  330.     throws IOException {
  331.   }
  332.   
  333.   public void close() throws IOException {
  334.     super.close();
  335.   }
  336.   
  337.   public String toString() {
  338.     return "LocalFS";
  339.   }
  340.   
  341.   public FileStatus getFileStatus(Path f) throws IOException {
  342.     File path = pathToFile(f);
  343.     if (path.exists()) {
  344.       return new RawLocalFileStatus(pathToFile(f), getDefaultBlockSize(), this);
  345.     } else {
  346.       throw new FileNotFoundException( "File " + f + " does not exist.");
  347.     }
  348.   }
  349.   static class RawLocalFileStatus extends FileStatus {
  350.     /* We can add extra fields here. It breaks at least CopyFiles.FilePair().
  351.      * We recognize if the information is already loaded by check if
  352.      * onwer.equals("").
  353.      */
  354.     private boolean isPermissionLoaded() {
  355.       return !super.getOwner().equals(""); 
  356.     }
  357.     
  358.     RawLocalFileStatus(File f, long defaultBlockSize, FileSystem fs) {
  359.       super(f.length(), f.isDirectory(), 1, defaultBlockSize,
  360.             f.lastModified(), new Path(f.getPath()).makeQualified(fs));
  361.     }
  362.     
  363.     @Override
  364.     public FsPermission getPermission() {
  365.       if (!isPermissionLoaded()) {
  366.         loadPermissionInfo();
  367.       }
  368.       return super.getPermission();
  369.     }
  370.     @Override
  371.     public String getOwner() {
  372.       if (!isPermissionLoaded()) {
  373.         loadPermissionInfo();
  374.       }
  375.       return super.getOwner();
  376.     }
  377.     @Override
  378.     public String getGroup() {
  379.       if (!isPermissionLoaded()) {
  380.         loadPermissionInfo();
  381.       }
  382.       return super.getGroup();
  383.     }
  384.     /// loads permissions, owner, and group from `ls -ld`
  385.     private void loadPermissionInfo() {
  386.       IOException e = null;
  387.       try {
  388.         StringTokenizer t = new StringTokenizer(
  389.             execCommand(new File(getPath().toUri()), 
  390.                         Shell.getGET_PERMISSION_COMMAND()));
  391.         //expected format
  392.         //-rw-------    1 username groupname ...
  393.         String permission = t.nextToken();
  394.         if (permission.length() > 10) { //files with ACLs might have a '+'
  395.           permission = permission.substring(0, 10);
  396.         }
  397.         setPermission(FsPermission.valueOf(permission));
  398.         t.nextToken();
  399.         setOwner(t.nextToken());
  400.         setGroup(t.nextToken());
  401.       } catch (Shell.ExitCodeException ioe) {
  402.         if (ioe.getExitCode() != 1) {
  403.           e = ioe;
  404.         } else {
  405.           setPermission(null);
  406.           setOwner(null);
  407.           setGroup(null);
  408.         }
  409.       } catch (IOException ioe) {
  410.         e = ioe;
  411.       } finally {
  412.         if (e != null) {
  413.           throw new RuntimeException("Error while running command to get " +
  414.                                      "file permissions : " + 
  415.                                      StringUtils.stringifyException(e));
  416.         }
  417.       }
  418.     }
  419.     @Override
  420.     public void write(DataOutput out) throws IOException {
  421.       if (!isPermissionLoaded()) {
  422.         loadPermissionInfo();
  423.       }
  424.       super.write(out);
  425.     }
  426.   }
  427.   /**
  428.    * Use the command chown to set owner.
  429.    */
  430.   @Override
  431.   public void setOwner(Path p, String username, String groupname
  432.       ) throws IOException {
  433.     if (username == null && groupname == null) {
  434.       throw new IOException("username == null && groupname == null");
  435.     }
  436.     if (username == null) {
  437.       execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname); 
  438.     } else {
  439.       //OWNER[:[GROUP]]
  440.       String s = username + (groupname == null? "": ":" + groupname);
  441.       execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
  442.     }
  443.   }
  444.   /**
  445.    * Use the command chmod to set permission.
  446.    */
  447.   @Override
  448.   public void setPermission(Path p, FsPermission permission
  449.       ) throws IOException {
  450.     execCommand(pathToFile(p), Shell.SET_PERMISSION_COMMAND,
  451.         String.format("%04o", permission.toShort()));
  452.   }
  453.   private static String execCommand(File f, String... cmd) throws IOException {
  454.     String[] args = new String[cmd.length + 1];
  455.     System.arraycopy(cmd, 0, args, 0, cmd.length);
  456.     args[cmd.length] = f.getCanonicalPath();
  457.     String output = Shell.execCommand(args);
  458.     return output;
  459.   }
  460. }