InMemoryFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.FileNotFoundException;
  20. import java.io.IOException;
  21. import java.io.OutputStream;
  22. import java.net.URI;
  23. import java.util.*;
  24. import org.apache.hadoop.fs.permission.FsPermission;
  25. import org.apache.hadoop.io.DataInputBuffer;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.util.Progressable;
  28. /** An implementation of the in-memory filesystem. This implementation assumes
  29.  * that the file lengths are known ahead of time and the total lengths of all
  30.  * the files is below a certain number (like 100 MB, configurable). Use the API
  31.  * reserveSpaceWithCheckSum(Path f, int size) (see below for a description of
  32.  * the API for reserving space in the FS. The uri of this filesystem starts with
  33.  * ramfs:// .
  34.  */
  35. @Deprecated
  36. public class InMemoryFileSystem extends ChecksumFileSystem {
  37.   private static class RawInMemoryFileSystem extends FileSystem {
  38.     private URI uri;
  39.     private long fsSize;
  40.     private volatile long totalUsed;
  41.     private Path staticWorkingDir;
  42.   
  43.     //pathToFileAttribs is the final place where a file is put after it is closed
  44.     private Map<String, FileAttributes> pathToFileAttribs =
  45.       new HashMap<String, FileAttributes>();
  46.   
  47.     //tempFileAttribs is a temp place which is updated while reserving memory for
  48.     //files we are going to create. It is read in the createRaw method and the
  49.     //temp key/value is discarded. If the file makes it to "close", then it
  50.     //ends up being in the pathToFileAttribs map.
  51.     private Map<String, FileAttributes> tempFileAttribs =
  52.       new HashMap<String, FileAttributes>();
  53.   
  54.     public RawInMemoryFileSystem() {
  55.       setConf(new Configuration());
  56.     }
  57.     public RawInMemoryFileSystem(URI uri, Configuration conf) {
  58.       initialize(uri, conf);
  59.     }
  60.   
  61.     //inherit javadoc
  62.     public void initialize(URI uri, Configuration conf) {
  63.       setConf(conf);
  64.       int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
  65.       this.fsSize = size * 1024L * 1024L;
  66.       this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
  67.       String path = this.uri.getPath();
  68.       if (path.length() == 0) {
  69.         path = Path.CUR_DIR;
  70.       }
  71.       this.staticWorkingDir = new Path(path);
  72.       LOG.info("Initialized InMemoryFileSystem: " + uri.toString() + 
  73.                " of size (in bytes): " + fsSize);
  74.     }
  75.     //inherit javadoc
  76.     public URI getUri() {
  77.       return uri;
  78.     }
  79.     private class InMemoryInputStream extends FSInputStream {
  80.       private DataInputBuffer din = new DataInputBuffer();
  81.       private FileAttributes fAttr;
  82.     
  83.       public InMemoryInputStream(Path f) throws IOException {
  84.         synchronized (RawInMemoryFileSystem.this) {
  85.           fAttr = pathToFileAttribs.get(getPath(f));
  86.           if (fAttr == null) { 
  87.             throw new FileNotFoundException("File " + f + " does not exist");
  88.           }                            
  89.           din.reset(fAttr.data, 0, fAttr.size);
  90.         }
  91.       }
  92.     
  93.       public long getPos() throws IOException {
  94.         return din.getPosition();
  95.       }
  96.     
  97.       public void seek(long pos) throws IOException {
  98.         if ((int)pos > fAttr.size)
  99.           throw new IOException("Cannot seek after EOF");
  100.         din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
  101.       }
  102.     
  103.       public boolean seekToNewSource(long targetPos) throws IOException {
  104.         return false;
  105.       }
  106.       public int available() throws IOException {
  107.         return din.available(); 
  108.       }
  109.       public boolean markSupport() { return false; }
  110.       public int read() throws IOException {
  111.         return din.read();
  112.       }
  113.       public int read(byte[] b, int off, int len) throws IOException {
  114.         return din.read(b, off, len);
  115.       }
  116.     
  117.       public long skip(long n) throws IOException { return din.skip(n); }
  118.     }
  119.     public FSDataInputStream open(Path f, int bufferSize) throws IOException {
  120.       return new FSDataInputStream(new InMemoryInputStream(f));
  121.     }
  122.     private class InMemoryOutputStream extends OutputStream {
  123.       private int count;
  124.       private FileAttributes fAttr;
  125.       private Path f;
  126.     
  127.       public InMemoryOutputStream(Path f, FileAttributes fAttr) 
  128.         throws IOException {
  129.         this.fAttr = fAttr;
  130.         this.f = f;
  131.       }
  132.     
  133.       public long getPos() throws IOException {
  134.         return count;
  135.       }
  136.     
  137.       public void close() throws IOException {
  138.         synchronized (RawInMemoryFileSystem.this) {
  139.           pathToFileAttribs.put(getPath(f), fAttr);
  140.         }
  141.       }
  142.     
  143.       public void write(byte[] b, int off, int len) throws IOException {
  144.         if ((off < 0) || (off > b.length) || (len < 0) ||
  145.             ((off + len) > b.length) || ((off + len) < 0)) {
  146.           throw new IndexOutOfBoundsException();
  147.         } else if (len == 0) {
  148.           return;
  149.         }
  150.         int newcount = count + len;
  151.         if (newcount > fAttr.size) {
  152.           throw new IOException("Insufficient space");
  153.         }
  154.         System.arraycopy(b, off, fAttr.data, count, len);
  155.         count = newcount;
  156.       }
  157.     
  158.       public void write(int b) throws IOException {
  159.         int newcount = count + 1;
  160.         if (newcount > fAttr.size) {
  161.           throw new IOException("Insufficient space");
  162.         }
  163.         fAttr.data[count] = (byte)b;
  164.         count = newcount;
  165.       }
  166.     }
  167.   
  168.     /** This optional operation is not yet supported. */
  169.     public FSDataOutputStream append(Path f, int bufferSize,
  170.         Progressable progress) throws IOException {
  171.       throw new IOException("Not supported");
  172.     }
  173.     /**
  174.      * @param permission Currently ignored.
  175.      */
  176.     public FSDataOutputStream create(Path f, FsPermission permission,
  177.                                      boolean overwrite, int bufferSize,
  178.                                      short replication, long blockSize, Progressable progress)
  179.       throws IOException {
  180.       synchronized (this) {
  181.         if (exists(f) && !overwrite) {
  182.           throw new IOException("File already exists:"+f);
  183.         }
  184.         FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
  185.         if (fAttr != null)
  186.           return create(f, fAttr);
  187.         return null;
  188.       }
  189.     }
  190.   
  191.     public FSDataOutputStream create(Path f, FileAttributes fAttr)
  192.       throws IOException {
  193.       // the path is not added into the filesystem (in the pathToFileAttribs
  194.       // map) until close is called on the outputstream that this method is
  195.       // going to return
  196.       // Create an output stream out of data byte array
  197.       return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr), 
  198.                                     statistics);
  199.     }
  200.     public void close() throws IOException {
  201.       super.close();
  202.       synchronized (this) {
  203.         if (pathToFileAttribs != null) { 
  204.           pathToFileAttribs.clear();
  205.         }
  206.         pathToFileAttribs = null;
  207.         if (tempFileAttribs != null) {
  208.           tempFileAttribs.clear();
  209.         }
  210.         tempFileAttribs = null;
  211.       }
  212.     }
  213.     public boolean setReplication(Path src, short replication)
  214.       throws IOException {
  215.       return true;
  216.     }
  217.     public boolean rename(Path src, Path dst) throws IOException {
  218.       synchronized (this) {
  219.         if (exists(dst)) {
  220.           throw new IOException ("Path " + dst + " already exists");
  221.         }
  222.         FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
  223.         if (fAttr == null) return false;
  224.         pathToFileAttribs.put(getPath(dst), fAttr);
  225.         return true;
  226.       }
  227.     }
  228.     
  229.     @Deprecated
  230.     public boolean delete(Path f) throws IOException {
  231.       return delete(f, true);
  232.     }
  233.     
  234.     public boolean delete(Path f, boolean recursive) throws IOException {
  235.       synchronized (this) {
  236.         FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
  237.         if (fAttr != null) {
  238.           fAttr.data = null;
  239.           totalUsed -= fAttr.size;
  240.           return true;
  241.         }
  242.         return false;
  243.       }
  244.     }
  245.   
  246.     /**
  247.      * Directory operations are not supported
  248.      */
  249.     public FileStatus[] listStatus(Path f) throws IOException {
  250.       return null;
  251.     }
  252.     public void setWorkingDirectory(Path new_dir) {
  253.       staticWorkingDir = new_dir;
  254.     }
  255.   
  256.     public Path getWorkingDirectory() {
  257.       return staticWorkingDir;
  258.     }
  259.     /**
  260.      * @param permission Currently ignored.
  261.      */
  262.     public boolean mkdirs(Path f, FsPermission permission) throws IOException {
  263.       return true;
  264.     }
  265.   
  266.     public FileStatus getFileStatus(Path f) throws IOException {
  267.       synchronized (this) {
  268.         FileAttributes attr = pathToFileAttribs.get(getPath(f));
  269.         if (attr==null) {
  270.           throw new FileNotFoundException("File " + f + " does not exist.");
  271.         }
  272.         return new InMemoryFileStatus(f.makeQualified(this), attr);
  273.       }
  274.     }
  275.   
  276.     /** Some APIs exclusively for InMemoryFileSystem */
  277.     /** Register a path with its size. */
  278.     public boolean reserveSpace(Path f, long size) {
  279.       synchronized (this) {
  280.         if (!canFitInMemory(size))
  281.           return false;
  282.         FileAttributes fileAttr;
  283.         try {
  284.           fileAttr = new FileAttributes((int)size);
  285.         } catch (OutOfMemoryError o) {
  286.           return false;
  287.         }
  288.         totalUsed += size;
  289.         tempFileAttribs.put(getPath(f), fileAttr);
  290.         return true;
  291.       }
  292.     }
  293.     public void unreserveSpace(Path f) {
  294.       synchronized (this) {
  295.         FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
  296.         if (fAttr != null) {
  297.           fAttr.data = null;
  298.           totalUsed -= fAttr.size;
  299.         }
  300.       }
  301.     }
  302.   
  303.     /** This API getClosedFiles could have been implemented over listPathsRaw
  304.      * but it is an overhead to maintain directory structures for this impl of
  305.      * the in-memory fs.
  306.      */
  307.     public Path[] getFiles(PathFilter filter) {
  308.       synchronized (this) {
  309.         List<String> closedFilesList = new ArrayList<String>();
  310.         synchronized (pathToFileAttribs) {
  311.           Set paths = pathToFileAttribs.keySet();
  312.           if (paths == null || paths.isEmpty()) {
  313.             return new Path[0];
  314.           }
  315.           Iterator iter = paths.iterator();
  316.           while (iter.hasNext()) {
  317.             String f = (String)iter.next();
  318.             if (filter.accept(new Path(f))) {
  319.               closedFilesList.add(f);
  320.             }
  321.           }
  322.         }
  323.         String [] names = 
  324.           closedFilesList.toArray(new String[closedFilesList.size()]);
  325.         Path [] results = new Path[names.length];
  326.         for (int i = 0; i < names.length; i++) {
  327.           results[i] = new Path(names[i]);
  328.         }
  329.         return results;
  330.       }
  331.     }
  332.   
  333.     public int getNumFiles(PathFilter filter) {
  334.       return getFiles(filter).length;
  335.     }
  336.     public long getFSSize() {
  337.       return fsSize;
  338.     }
  339.   
  340.     public float getPercentUsed() {
  341.       if (fsSize > 0)
  342.         return (float)totalUsed/fsSize;
  343.       else return 0.1f;
  344.     }
  345.  
  346.     /**
  347.      * @TODO: Fix for Java6?
  348.      * As of Java5 it is safe to assume that if the file can fit 
  349.      * in-memory then its file-size is less than Integer.MAX_VALUE.
  350.      */ 
  351.     private boolean canFitInMemory(long size) {
  352.       if ((size <= Integer.MAX_VALUE) && ((size + totalUsed) < fsSize)) {
  353.         return true;
  354.       }
  355.       return false;
  356.     }
  357.   
  358.     private String getPath(Path f) {
  359.       return f.toUri().getPath();
  360.     }
  361.   
  362.     private static class FileAttributes {
  363.       private byte[] data;
  364.       private int size;
  365.     
  366.       public FileAttributes(int size) {
  367.         this.size = size;
  368.         this.data = new byte[size];
  369.       }
  370.     }
  371.     private class InMemoryFileStatus extends FileStatus {
  372.       InMemoryFileStatus(Path f, FileAttributes attr) throws IOException {
  373.         super(attr.size, false, 1, getDefaultBlockSize(), 0, f);
  374.       }
  375.     }
  376.   }
  377.     
  378.   public InMemoryFileSystem() {
  379.     super(new RawInMemoryFileSystem());
  380.   }
  381.     
  382.   public InMemoryFileSystem(URI uri, Configuration conf) {
  383.     super(new RawInMemoryFileSystem(uri, conf));
  384.   }
  385.     
  386.   /**
  387.    * Register a file with its size. This will also register a checksum for the
  388.    * file that the user is trying to create. This is required since none of
  389.    * the FileSystem APIs accept the size of the file as argument. But since it
  390.    * is required for us to apriori know the size of the file we are going to
  391.    * create, the user must call this method for each file he wants to create
  392.    * and reserve memory for that file. We either succeed in reserving memory
  393.    * for both the main file and the checksum file and return true, or return
  394.    * false.
  395.    */
  396.   public boolean reserveSpaceWithCheckSum(Path f, long size) {
  397.     RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
  398.     synchronized(mfs) {
  399.       boolean b = mfs.reserveSpace(f, size);
  400.       if (b) {
  401.         long checksumSize = getChecksumFileLength(f, size);
  402.         b = mfs.reserveSpace(getChecksumFile(f), checksumSize);
  403.         if (!b) {
  404.           mfs.unreserveSpace(f);
  405.         }
  406.       }
  407.       return b;
  408.     }
  409.   }
  410.   public Path[] getFiles(PathFilter filter) {
  411.     return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter);
  412.   }
  413.     
  414.   public int getNumFiles(PathFilter filter) {
  415.     return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter);
  416.   }
  417.   public long getFSSize() {
  418.     return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize();
  419.   }
  420.     
  421.   public float getPercentUsed() {
  422.     return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed();
  423.   }
  424. }