InMemoryFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.fs;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.io.OutputStream;
- import java.net.URI;
- import java.util.*;
- import org.apache.hadoop.fs.permission.FsPermission;
- import org.apache.hadoop.io.DataInputBuffer;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.util.Progressable;
- /** An implementation of the in-memory filesystem. This implementation assumes
- * that the file lengths are known ahead of time and the total lengths of all
- * the files is below a certain number (like 100 MB, configurable). Use the API
- * reserveSpaceWithCheckSum(Path f, int size) (see below for a description of
- * the API for reserving space in the FS. The uri of this filesystem starts with
- * ramfs:// .
- */
- @Deprecated
- public class InMemoryFileSystem extends ChecksumFileSystem {
- private static class RawInMemoryFileSystem extends FileSystem {
- private URI uri;
- private long fsSize;
- private volatile long totalUsed;
- private Path staticWorkingDir;
-
- //pathToFileAttribs is the final place where a file is put after it is closed
- private Map<String, FileAttributes> pathToFileAttribs =
- new HashMap<String, FileAttributes>();
-
- //tempFileAttribs is a temp place which is updated while reserving memory for
- //files we are going to create. It is read in the createRaw method and the
- //temp key/value is discarded. If the file makes it to "close", then it
- //ends up being in the pathToFileAttribs map.
- private Map<String, FileAttributes> tempFileAttribs =
- new HashMap<String, FileAttributes>();
-
- public RawInMemoryFileSystem() {
- setConf(new Configuration());
- }
- public RawInMemoryFileSystem(URI uri, Configuration conf) {
- initialize(uri, conf);
- }
-
- //inherit javadoc
- public void initialize(URI uri, Configuration conf) {
- setConf(conf);
- int size = Integer.parseInt(conf.get("fs.inmemory.size.mb", "100"));
- this.fsSize = size * 1024L * 1024L;
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
- String path = this.uri.getPath();
- if (path.length() == 0) {
- path = Path.CUR_DIR;
- }
- this.staticWorkingDir = new Path(path);
- LOG.info("Initialized InMemoryFileSystem: " + uri.toString() +
- " of size (in bytes): " + fsSize);
- }
- //inherit javadoc
- public URI getUri() {
- return uri;
- }
- private class InMemoryInputStream extends FSInputStream {
- private DataInputBuffer din = new DataInputBuffer();
- private FileAttributes fAttr;
-
- public InMemoryInputStream(Path f) throws IOException {
- synchronized (RawInMemoryFileSystem.this) {
- fAttr = pathToFileAttribs.get(getPath(f));
- if (fAttr == null) {
- throw new FileNotFoundException("File " + f + " does not exist");
- }
- din.reset(fAttr.data, 0, fAttr.size);
- }
- }
-
- public long getPos() throws IOException {
- return din.getPosition();
- }
-
- public void seek(long pos) throws IOException {
- if ((int)pos > fAttr.size)
- throw new IOException("Cannot seek after EOF");
- din.reset(fAttr.data, (int)pos, fAttr.size - (int)pos);
- }
-
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
- public int available() throws IOException {
- return din.available();
- }
- public boolean markSupport() { return false; }
- public int read() throws IOException {
- return din.read();
- }
- public int read(byte[] b, int off, int len) throws IOException {
- return din.read(b, off, len);
- }
-
- public long skip(long n) throws IOException { return din.skip(n); }
- }
- public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- return new FSDataInputStream(new InMemoryInputStream(f));
- }
- private class InMemoryOutputStream extends OutputStream {
- private int count;
- private FileAttributes fAttr;
- private Path f;
-
- public InMemoryOutputStream(Path f, FileAttributes fAttr)
- throws IOException {
- this.fAttr = fAttr;
- this.f = f;
- }
-
- public long getPos() throws IOException {
- return count;
- }
-
- public void close() throws IOException {
- synchronized (RawInMemoryFileSystem.this) {
- pathToFileAttribs.put(getPath(f), fAttr);
- }
- }
-
- public void write(byte[] b, int off, int len) throws IOException {
- if ((off < 0) || (off > b.length) || (len < 0) ||
- ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return;
- }
- int newcount = count + len;
- if (newcount > fAttr.size) {
- throw new IOException("Insufficient space");
- }
- System.arraycopy(b, off, fAttr.data, count, len);
- count = newcount;
- }
-
- public void write(int b) throws IOException {
- int newcount = count + 1;
- if (newcount > fAttr.size) {
- throw new IOException("Insufficient space");
- }
- fAttr.data[count] = (byte)b;
- count = newcount;
- }
- }
-
- /** This optional operation is not yet supported. */
- public FSDataOutputStream append(Path f, int bufferSize,
- Progressable progress) throws IOException {
- throw new IOException("Not supported");
- }
- /**
- * @param permission Currently ignored.
- */
- public FSDataOutputStream create(Path f, FsPermission permission,
- boolean overwrite, int bufferSize,
- short replication, long blockSize, Progressable progress)
- throws IOException {
- synchronized (this) {
- if (exists(f) && !overwrite) {
- throw new IOException("File already exists:"+f);
- }
- FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
- if (fAttr != null)
- return create(f, fAttr);
- return null;
- }
- }
-
- public FSDataOutputStream create(Path f, FileAttributes fAttr)
- throws IOException {
- // the path is not added into the filesystem (in the pathToFileAttribs
- // map) until close is called on the outputstream that this method is
- // going to return
- // Create an output stream out of data byte array
- return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr),
- statistics);
- }
- public void close() throws IOException {
- super.close();
- synchronized (this) {
- if (pathToFileAttribs != null) {
- pathToFileAttribs.clear();
- }
- pathToFileAttribs = null;
- if (tempFileAttribs != null) {
- tempFileAttribs.clear();
- }
- tempFileAttribs = null;
- }
- }
- public boolean setReplication(Path src, short replication)
- throws IOException {
- return true;
- }
- public boolean rename(Path src, Path dst) throws IOException {
- synchronized (this) {
- if (exists(dst)) {
- throw new IOException ("Path " + dst + " already exists");
- }
- FileAttributes fAttr = pathToFileAttribs.remove(getPath(src));
- if (fAttr == null) return false;
- pathToFileAttribs.put(getPath(dst), fAttr);
- return true;
- }
- }
-
- @Deprecated
- public boolean delete(Path f) throws IOException {
- return delete(f, true);
- }
-
- public boolean delete(Path f, boolean recursive) throws IOException {
- synchronized (this) {
- FileAttributes fAttr = pathToFileAttribs.remove(getPath(f));
- if (fAttr != null) {
- fAttr.data = null;
- totalUsed -= fAttr.size;
- return true;
- }
- return false;
- }
- }
-
- /**
- * Directory operations are not supported
- */
- public FileStatus[] listStatus(Path f) throws IOException {
- return null;
- }
- public void setWorkingDirectory(Path new_dir) {
- staticWorkingDir = new_dir;
- }
-
- public Path getWorkingDirectory() {
- return staticWorkingDir;
- }
- /**
- * @param permission Currently ignored.
- */
- public boolean mkdirs(Path f, FsPermission permission) throws IOException {
- return true;
- }
-
- public FileStatus getFileStatus(Path f) throws IOException {
- synchronized (this) {
- FileAttributes attr = pathToFileAttribs.get(getPath(f));
- if (attr==null) {
- throw new FileNotFoundException("File " + f + " does not exist.");
- }
- return new InMemoryFileStatus(f.makeQualified(this), attr);
- }
- }
-
- /** Some APIs exclusively for InMemoryFileSystem */
- /** Register a path with its size. */
- public boolean reserveSpace(Path f, long size) {
- synchronized (this) {
- if (!canFitInMemory(size))
- return false;
- FileAttributes fileAttr;
- try {
- fileAttr = new FileAttributes((int)size);
- } catch (OutOfMemoryError o) {
- return false;
- }
- totalUsed += size;
- tempFileAttribs.put(getPath(f), fileAttr);
- return true;
- }
- }
- public void unreserveSpace(Path f) {
- synchronized (this) {
- FileAttributes fAttr = tempFileAttribs.remove(getPath(f));
- if (fAttr != null) {
- fAttr.data = null;
- totalUsed -= fAttr.size;
- }
- }
- }
-
- /** This API getClosedFiles could have been implemented over listPathsRaw
- * but it is an overhead to maintain directory structures for this impl of
- * the in-memory fs.
- */
- public Path[] getFiles(PathFilter filter) {
- synchronized (this) {
- List<String> closedFilesList = new ArrayList<String>();
- synchronized (pathToFileAttribs) {
- Set paths = pathToFileAttribs.keySet();
- if (paths == null || paths.isEmpty()) {
- return new Path[0];
- }
- Iterator iter = paths.iterator();
- while (iter.hasNext()) {
- String f = (String)iter.next();
- if (filter.accept(new Path(f))) {
- closedFilesList.add(f);
- }
- }
- }
- String [] names =
- closedFilesList.toArray(new String[closedFilesList.size()]);
- Path [] results = new Path[names.length];
- for (int i = 0; i < names.length; i++) {
- results[i] = new Path(names[i]);
- }
- return results;
- }
- }
-
- public int getNumFiles(PathFilter filter) {
- return getFiles(filter).length;
- }
- public long getFSSize() {
- return fsSize;
- }
-
- public float getPercentUsed() {
- if (fsSize > 0)
- return (float)totalUsed/fsSize;
- else return 0.1f;
- }
-
- /**
- * @TODO: Fix for Java6?
- * As of Java5 it is safe to assume that if the file can fit
- * in-memory then its file-size is less than Integer.MAX_VALUE.
- */
- private boolean canFitInMemory(long size) {
- if ((size <= Integer.MAX_VALUE) && ((size + totalUsed) < fsSize)) {
- return true;
- }
- return false;
- }
-
- private String getPath(Path f) {
- return f.toUri().getPath();
- }
-
- private static class FileAttributes {
- private byte[] data;
- private int size;
-
- public FileAttributes(int size) {
- this.size = size;
- this.data = new byte[size];
- }
- }
- private class InMemoryFileStatus extends FileStatus {
- InMemoryFileStatus(Path f, FileAttributes attr) throws IOException {
- super(attr.size, false, 1, getDefaultBlockSize(), 0, f);
- }
- }
- }
-
- public InMemoryFileSystem() {
- super(new RawInMemoryFileSystem());
- }
-
- public InMemoryFileSystem(URI uri, Configuration conf) {
- super(new RawInMemoryFileSystem(uri, conf));
- }
-
- /**
- * Register a file with its size. This will also register a checksum for the
- * file that the user is trying to create. This is required since none of
- * the FileSystem APIs accept the size of the file as argument. But since it
- * is required for us to apriori know the size of the file we are going to
- * create, the user must call this method for each file he wants to create
- * and reserve memory for that file. We either succeed in reserving memory
- * for both the main file and the checksum file and return true, or return
- * false.
- */
- public boolean reserveSpaceWithCheckSum(Path f, long size) {
- RawInMemoryFileSystem mfs = (RawInMemoryFileSystem)getRawFileSystem();
- synchronized(mfs) {
- boolean b = mfs.reserveSpace(f, size);
- if (b) {
- long checksumSize = getChecksumFileLength(f, size);
- b = mfs.reserveSpace(getChecksumFile(f), checksumSize);
- if (!b) {
- mfs.unreserveSpace(f);
- }
- }
- return b;
- }
- }
- public Path[] getFiles(PathFilter filter) {
- return ((RawInMemoryFileSystem)getRawFileSystem()).getFiles(filter);
- }
-
- public int getNumFiles(PathFilter filter) {
- return ((RawInMemoryFileSystem)getRawFileSystem()).getNumFiles(filter);
- }
- public long getFSSize() {
- return ((RawInMemoryFileSystem)getRawFileSystem()).getFSSize();
- }
-
- public float getPercentUsed() {
- return ((RawInMemoryFileSystem)getRawFileSystem()).getPercentUsed();
- }
- }