KosmosFileSystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  *
  3.  * Licensed under the Apache License, Version 2.0
  4.  * (the "License"); you may not use this file except in compliance with
  5.  * the License. You may obtain a copy of the License at
  6.  *
  7.  * http://www.apache.org/licenses/LICENSE-2.0
  8.  *
  9.  * Unless required by applicable law or agreed to in writing, software
  10.  * distributed under the License is distributed on an "AS IS" BASIS,
  11.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12.  * implied. See the License for the specific language governing
  13.  * permissions and limitations under the License.
  14.  *
  15.  * @author: Sriram Rao (Kosmix Corp.)
  16.  * 
  17.  * Implements the Hadoop FS interfaces to allow applications to store
  18.  *files in Kosmos File System (KFS).
  19.  */
  20. package org.apache.hadoop.fs.kfs;
  21. import java.io.*;
  22. import java.net.*;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.fs.FSDataInputStream;
  25. import org.apache.hadoop.fs.FSDataOutputStream;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.FileStatus;
  28. import org.apache.hadoop.fs.FileUtil;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.fs.permission.FsPermission;
  31. import org.apache.hadoop.util.Progressable;
  32. import org.apache.hadoop.fs.BlockLocation;
  33. /**
  34.  * A FileSystem backed by KFS.
  35.  *
  36.  */
  37. public class KosmosFileSystem extends FileSystem {
  38.     private FileSystem localFs;
  39.     private IFSImpl kfsImpl = null;
  40.     private URI uri;
  41.     private Path workingDir = new Path("/");
  42.     public KosmosFileSystem() {
  43.     }
  44.     KosmosFileSystem(IFSImpl fsimpl) {
  45.         this.kfsImpl = fsimpl;
  46.     }
  47.     public URI getUri() {
  48. return uri;
  49.     }
  50.     public void initialize(URI uri, Configuration conf) throws IOException {
  51.       super.initialize(uri, conf);
  52.       try {
  53.         if (kfsImpl == null) {
  54.           if (uri.getHost() == null) {
  55.             kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
  56.                                   conf.getInt("fs.kfs.metaServerPort", -1),
  57.                                   statistics);
  58.           } else {
  59.             kfsImpl = new KFSImpl(uri.getHost(), uri.getPort(), statistics);
  60.           }
  61.         }
  62.         this.localFs = FileSystem.getLocal(conf);
  63.         this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
  64.         this.workingDir = new Path("/user", System.getProperty("user.name")
  65.                                    ).makeQualified(this);
  66.         setConf(conf);
  67.       } catch (Exception e) {
  68.         e.printStackTrace();
  69.         System.out.println("Unable to initialize KFS");
  70.         System.exit(-1);
  71.       }
  72.     }
  73.     @Deprecated
  74.     public String getName() {
  75. return getUri().toString();
  76.     }
  77.     public Path getWorkingDirectory() {
  78. return workingDir;
  79.     }
  80.     public void setWorkingDirectory(Path dir) {
  81. workingDir = makeAbsolute(dir);
  82.     }
  83.     private Path makeAbsolute(Path path) {
  84. if (path.isAbsolute()) {
  85.     return path;
  86. }
  87. return new Path(workingDir, path);
  88.     }
  89.     public boolean mkdirs(Path path, FsPermission permission
  90.         ) throws IOException {
  91. Path absolute = makeAbsolute(path);
  92.         String srep = absolute.toUri().getPath();
  93. int res;
  94. // System.out.println("Calling mkdirs on: " + srep);
  95. res = kfsImpl.mkdirs(srep);
  96. return res == 0;
  97.     }
  98.     @Deprecated
  99.     public boolean isDirectory(Path path) throws IOException {
  100. Path absolute = makeAbsolute(path);
  101.         String srep = absolute.toUri().getPath();
  102. // System.out.println("Calling isdir on: " + srep);
  103.         return kfsImpl.isDirectory(srep);
  104.     }
  105.     @Deprecated
  106.     public boolean isFile(Path path) throws IOException {
  107. Path absolute = makeAbsolute(path);
  108.         String srep = absolute.toUri().getPath();
  109.         return kfsImpl.isFile(srep);
  110.     }
  111.     public FileStatus[] listStatus(Path path) throws IOException {
  112.         Path absolute = makeAbsolute(path);
  113.         String srep = absolute.toUri().getPath();
  114.         if (kfsImpl.isFile(srep))
  115.                 return new FileStatus[] { getFileStatus(path) } ;
  116.         return kfsImpl.readdirplus(absolute);
  117.     }
  118.     public FileStatus getFileStatus(Path path) throws IOException {
  119. Path absolute = makeAbsolute(path);
  120.         String srep = absolute.toUri().getPath();
  121.         if (!kfsImpl.exists(srep)) {
  122.           throw new FileNotFoundException("File " + path + " does not exist.");
  123.         }
  124.         if (kfsImpl.isDirectory(srep)) {
  125.             // System.out.println("Status of path: " + path + " is dir");
  126.             return new FileStatus(0, true, 1, 0, kfsImpl.getModificationTime(srep), 
  127.                                   path.makeQualified(this));
  128.         } else {
  129.             // System.out.println("Status of path: " + path + " is file");
  130.             return new FileStatus(kfsImpl.filesize(srep), false, 
  131.                                   kfsImpl.getReplication(srep),
  132.                                   getDefaultBlockSize(),
  133.                                   kfsImpl.getModificationTime(srep),
  134.                                   path.makeQualified(this));
  135.         }
  136.     }
  137.     
  138.     /** This optional operation is not yet supported. */
  139.     public FSDataOutputStream append(Path f, int bufferSize,
  140.         Progressable progress) throws IOException {
  141.       throw new IOException("Not supported");
  142.     }
  143.     public FSDataOutputStream create(Path file, FsPermission permission,
  144.                                      boolean overwrite, int bufferSize,
  145.      short replication, long blockSize, Progressable progress)
  146. throws IOException {
  147.         if (exists(file)) {
  148.             if (overwrite) {
  149.                 delete(file);
  150.             } else {
  151.                 throw new IOException("File already exists: " + file);
  152.             }
  153.         }
  154. Path parent = file.getParent();
  155. if (parent != null && !mkdirs(parent)) {
  156.     throw new IOException("Mkdirs failed to create " + parent);
  157. }
  158.         Path absolute = makeAbsolute(file);
  159.         String srep = absolute.toUri().getPath();
  160.         return kfsImpl.create(srep, replication, bufferSize);
  161.     }
  162.     public FSDataInputStream open(Path path, int bufferSize) throws IOException {
  163.         if (!exists(path))
  164.             throw new IOException("File does not exist: " + path);
  165.         Path absolute = makeAbsolute(path);
  166.         String srep = absolute.toUri().getPath();
  167.         return kfsImpl.open(srep, bufferSize);
  168.     }
  169.     public boolean rename(Path src, Path dst) throws IOException {
  170. Path absoluteS = makeAbsolute(src);
  171.         String srepS = absoluteS.toUri().getPath();
  172. Path absoluteD = makeAbsolute(dst);
  173.         String srepD = absoluteD.toUri().getPath();
  174.         // System.out.println("Calling rename on: " + srepS + " -> " + srepD);
  175.         return kfsImpl.rename(srepS, srepD) == 0;
  176.     }
  177.     // recursively delete the directory and its contents
  178.     public boolean delete(Path path, boolean recursive) throws IOException {
  179.       Path absolute = makeAbsolute(path);
  180.       String srep = absolute.toUri().getPath();
  181.       if (kfsImpl.isFile(srep))
  182.         return kfsImpl.remove(srep) == 0;
  183.       FileStatus[] dirEntries = listStatus(absolute);
  184.       if ((!recursive) && (dirEntries != null) && 
  185.             (dirEntries.length != 0)) {
  186.         throw new IOException("Directory " + path.toString() + 
  187.         " is not empty.");
  188.       }
  189.       if (dirEntries != null) {
  190.         for (int i = 0; i < dirEntries.length; i++) {
  191.           delete(new Path(absolute, dirEntries[i].getPath()), recursive);
  192.         }
  193.       }
  194.       return kfsImpl.rmdir(srep) == 0;
  195.     }
  196.     
  197.     @Deprecated
  198.     public boolean delete(Path path) throws IOException {
  199.       return delete(path, true);
  200.     }
  201.     
  202.     @Deprecated
  203.     public long getLength(Path path) throws IOException {
  204. Path absolute = makeAbsolute(path);
  205.         String srep = absolute.toUri().getPath();
  206.         return kfsImpl.filesize(srep);
  207.     }
  208.     @Deprecated
  209.     public short getReplication(Path path) throws IOException {
  210. Path absolute = makeAbsolute(path);
  211.         String srep = absolute.toUri().getPath();
  212.         return kfsImpl.getReplication(srep);
  213.     }
  214.     public short getDefaultReplication() {
  215. return 3;
  216.     }
  217.     public boolean setReplication(Path path, short replication)
  218. throws IOException {
  219. Path absolute = makeAbsolute(path);
  220.         String srep = absolute.toUri().getPath();
  221.         int res = kfsImpl.setReplication(srep, replication);
  222.         return res >= 0;
  223.     }
  224.     // 64MB is the KFS block size
  225.     public long getDefaultBlockSize() {
  226. return 1 << 26;
  227.     }
  228.     @Deprecated            
  229.     public void lock(Path path, boolean shared) throws IOException {
  230.     }
  231.     @Deprecated            
  232.     public void release(Path path) throws IOException {
  233.     }
  234.     /**
  235.      * Return null if the file doesn't exist; otherwise, get the
  236.      * locations of the various chunks of the file file from KFS.
  237.      */
  238.     @Override
  239.     public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
  240.         long len) throws IOException {
  241.       if (file == null) {
  242.         return null;
  243.       }
  244.       String srep = makeAbsolute(file.getPath()).toUri().getPath();
  245.       String[][] hints = kfsImpl.getDataLocation(srep, start, len);
  246.       if (hints == null) {
  247.         return null;
  248.       }
  249.       BlockLocation[] result = new BlockLocation[hints.length];
  250.       long blockSize = getDefaultBlockSize();
  251.       long length = len;
  252.       long blockStart = start;
  253.       for(int i=0; i < result.length; ++i) {
  254.         result[i] = new BlockLocation(null, hints[i], blockStart, 
  255.                                       length < blockSize ? length : blockSize);
  256.         blockStart += blockSize;
  257.         length -= blockSize;
  258.       }
  259.       return result;
  260.     }
  261.     public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
  262. FileUtil.copy(localFs, src, this, dst, delSrc, getConf());
  263.     }
  264.     public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws IOException {
  265. FileUtil.copy(this, src, localFs, dst, delSrc, getConf());
  266.     }
  267.     public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
  268. throws IOException {
  269. return tmpLocalFile;
  270.     }
  271.     public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
  272. throws IOException {
  273. moveFromLocalFile(tmpLocalFile, fsOutputFile);
  274.     }
  275. }