FileSystemDirectory.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.index.lucene;
  19. import java.io.IOException;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.fs.FSDataInputStream;
  22. import org.apache.hadoop.fs.FSDataOutputStream;
  23. import org.apache.hadoop.fs.FileStatus;
  24. import org.apache.hadoop.fs.FileSystem;
  25. import org.apache.hadoop.fs.Path;
  26. import org.apache.lucene.store.BufferedIndexInput;
  27. import org.apache.lucene.store.BufferedIndexOutput;
  28. import org.apache.lucene.store.Directory;
  29. import org.apache.lucene.store.IndexInput;
  30. import org.apache.lucene.store.IndexOutput;
  31. import org.apache.lucene.store.Lock;
  32. /**
  33.  * This class implements a Lucene Directory on top of a general FileSystem.
  34.  * Currently it does not support locking.
  35.  */
  36. public class FileSystemDirectory extends Directory {
  37.   private final FileSystem fs;
  38.   private final Path directory;
  39.   private final int ioFileBufferSize;
  40.   /**
  41.    * Constructor
  42.    * @param fs
  43.    * @param directory
  44.    * @param create
  45.    * @param conf
  46.    * @throws IOException
  47.    */
  48.   public FileSystemDirectory(FileSystem fs, Path directory, boolean create,
  49.       Configuration conf) throws IOException {
  50.     this.fs = fs;
  51.     this.directory = directory;
  52.     this.ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);
  53.     if (create) {
  54.       create();
  55.     }
  56.     boolean isDir = false;
  57.     try {
  58.       FileStatus status = fs.getFileStatus(directory);
  59.       if (status != null) {
  60.         isDir = status.isDir();
  61.       }
  62.     } catch (IOException e) {
  63.       // file does not exist, isDir already set to false
  64.     }
  65.     if (!isDir) {
  66.       throw new IOException(directory + " is not a directory");
  67.     }
  68.   }
  69.   private void create() throws IOException {
  70.     if (!fs.exists(directory)) {
  71.       fs.mkdirs(directory);
  72.     }
  73.     boolean isDir = false;
  74.     try {
  75.       FileStatus status = fs.getFileStatus(directory);
  76.       if (status != null) {
  77.         isDir = status.isDir();
  78.       }
  79.     } catch (IOException e) {
  80.       // file does not exist, isDir already set to false
  81.     }
  82.     if (!isDir) {
  83.       throw new IOException(directory + " is not a directory");
  84.     }
  85.     // clear old index files
  86.     FileStatus[] fileStatus =
  87.         fs.listStatus(directory, LuceneIndexFileNameFilter.getFilter());
  88.     for (int i = 0; i < fileStatus.length; i++) {
  89.       if (!fs.delete(fileStatus[i].getPath())) {
  90.         throw new IOException("Cannot delete index file "
  91.             + fileStatus[i].getPath());
  92.       }
  93.     }
  94.   }
  95.   /* (non-Javadoc)
  96.    * @see org.apache.lucene.store.Directory#list()
  97.    */
  98.   public String[] list() throws IOException {
  99.     FileStatus[] fileStatus =
  100.         fs.listStatus(directory, LuceneIndexFileNameFilter.getFilter());
  101.     String[] result = new String[fileStatus.length];
  102.     for (int i = 0; i < fileStatus.length; i++) {
  103.       result[i] = fileStatus[i].getPath().getName();
  104.     }
  105.     return result;
  106.   }
  107.   /* (non-Javadoc)
  108.    * @see org.apache.lucene.store.Directory#fileExists(java.lang.String)
  109.    */
  110.   public boolean fileExists(String name) throws IOException {
  111.     return fs.exists(new Path(directory, name));
  112.   }
  113.   /* (non-Javadoc)
  114.    * @see org.apache.lucene.store.Directory#fileModified(java.lang.String)
  115.    */
  116.   public long fileModified(String name) {
  117.     throw new UnsupportedOperationException();
  118.   }
  119.   /* (non-Javadoc)
  120.    * @see org.apache.lucene.store.Directory#touchFile(java.lang.String)
  121.    */
  122.   public void touchFile(String name) {
  123.     throw new UnsupportedOperationException();
  124.   }
  125.   /* (non-Javadoc)
  126.    * @see org.apache.lucene.store.Directory#fileLength(java.lang.String)
  127.    */
  128.   public long fileLength(String name) throws IOException {
  129.     return fs.getFileStatus(new Path(directory, name)).getLen();
  130.   }
  131.   /* (non-Javadoc)
  132.    * @see org.apache.lucene.store.Directory#deleteFile(java.lang.String)
  133.    */
  134.   public void deleteFile(String name) throws IOException {
  135.     if (!fs.delete(new Path(directory, name))) {
  136.       throw new IOException("Cannot delete index file " + name);
  137.     }
  138.   }
  139.   /* (non-Javadoc)
  140.    * @see org.apache.lucene.store.Directory#renameFile(java.lang.String, java.lang.String)
  141.    */
  142.   public void renameFile(String from, String to) throws IOException {
  143.     fs.rename(new Path(directory, from), new Path(directory, to));
  144.   }
  145.   /* (non-Javadoc)
  146.    * @see org.apache.lucene.store.Directory#createOutput(java.lang.String)
  147.    */
  148.   public IndexOutput createOutput(String name) throws IOException {
  149.     Path file = new Path(directory, name);
  150.     if (fs.exists(file) && !fs.delete(file)) {
  151.       // delete the existing one if applicable
  152.       throw new IOException("Cannot overwrite index file " + file);
  153.     }
  154.     return new FileSystemIndexOutput(file, ioFileBufferSize);
  155.   }
  156.   /* (non-Javadoc)
  157.    * @see org.apache.lucene.store.Directory#openInput(java.lang.String)
  158.    */
  159.   public IndexInput openInput(String name) throws IOException {
  160.     return openInput(name, ioFileBufferSize);
  161.   }
  162.   /* (non-Javadoc)
  163.    * @see org.apache.lucene.store.Directory#openInput(java.lang.String, int)
  164.    */
  165.   public IndexInput openInput(String name, int bufferSize) throws IOException {
  166.     return new FileSystemIndexInput(new Path(directory, name), bufferSize);
  167.   }
  168.   /* (non-Javadoc)
  169.    * @see org.apache.lucene.store.Directory#makeLock(java.lang.String)
  170.    */
  171.   public Lock makeLock(final String name) {
  172.     return new Lock() {
  173.       public boolean obtain() {
  174.         return true;
  175.       }
  176.       public void release() {
  177.       }
  178.       public boolean isLocked() {
  179.         throw new UnsupportedOperationException();
  180.       }
  181.       public String toString() {
  182.         return "Lock@" + new Path(directory, name);
  183.       }
  184.     };
  185.   }
  186.   /* (non-Javadoc)
  187.    * @see org.apache.lucene.store.Directory#close()
  188.    */
  189.   public void close() throws IOException {
  190.     // do not close the file system
  191.   }
  192.   /* (non-Javadoc)
  193.    * @see java.lang.Object#toString()
  194.    */
  195.   public String toString() {
  196.     return this.getClass().getName() + "@" + directory;
  197.   }
  198.   private class FileSystemIndexInput extends BufferedIndexInput {
  199.     // shared by clones
  200.     private class Descriptor {
  201.       public final FSDataInputStream in;
  202.       public long position; // cache of in.getPos()
  203.       public Descriptor(Path file, int ioFileBufferSize) throws IOException {
  204.         this.in = fs.open(file, ioFileBufferSize);
  205.       }
  206.     }
  207.     private final Path filePath; // for debugging
  208.     private final Descriptor descriptor;
  209.     private final long length;
  210.     private boolean isOpen;
  211.     private boolean isClone;
  212.     public FileSystemIndexInput(Path path, int ioFileBufferSize)
  213.         throws IOException {
  214.       filePath = path;
  215.       descriptor = new Descriptor(path, ioFileBufferSize);
  216.       length = fs.getFileStatus(path).getLen();
  217.       isOpen = true;
  218.     }
  219.     protected void readInternal(byte[] b, int offset, int len)
  220.         throws IOException {
  221.       synchronized (descriptor) {
  222.         long position = getFilePointer();
  223.         if (position != descriptor.position) {
  224.           descriptor.in.seek(position);
  225.           descriptor.position = position;
  226.         }
  227.         int total = 0;
  228.         do {
  229.           int i = descriptor.in.read(b, offset + total, len - total);
  230.           if (i == -1) {
  231.             throw new IOException("Read past EOF");
  232.           }
  233.           descriptor.position += i;
  234.           total += i;
  235.         } while (total < len);
  236.       }
  237.     }
  238.     public void close() throws IOException {
  239.       if (!isClone) {
  240.         if (isOpen) {
  241.           descriptor.in.close();
  242.           isOpen = false;
  243.         } else {
  244.           throw new IOException("Index file " + filePath + " already closed");
  245.         }
  246.       }
  247.     }
  248.     protected void seekInternal(long position) {
  249.       // handled in readInternal()
  250.     }
  251.     public long length() {
  252.       return length;
  253.     }
  254.     protected void finalize() throws IOException {
  255.       if (!isClone && isOpen) {
  256.         close(); // close the file
  257.       }
  258.     }
  259.     public Object clone() {
  260.       FileSystemIndexInput clone = (FileSystemIndexInput) super.clone();
  261.       clone.isClone = true;
  262.       return clone;
  263.     }
  264.   }
  265.   private class FileSystemIndexOutput extends BufferedIndexOutput {
  266.     private final Path filePath; // for debugging
  267.     private final FSDataOutputStream out;
  268.     private boolean isOpen;
  269.     public FileSystemIndexOutput(Path path, int ioFileBufferSize)
  270.         throws IOException {
  271.       filePath = path;
  272.       // overwrite is true by default
  273.       out = fs.create(path, true, ioFileBufferSize);
  274.       isOpen = true;
  275.     }
  276.     public void flushBuffer(byte[] b, int offset, int size) throws IOException {
  277.       out.write(b, offset, size);
  278.     }
  279.     public void close() throws IOException {
  280.       if (isOpen) {
  281.         super.close();
  282.         out.close();
  283.         isOpen = false;
  284.       } else {
  285.         throw new IOException("Index file " + filePath + " already closed");
  286.       }
  287.     }
  288.     public void seek(long pos) throws IOException {
  289.       throw new UnsupportedOperationException();
  290.     }
  291.     public long length() throws IOException {
  292.       return out.getPos();
  293.     }
  294.     protected void finalize() throws IOException {
  295.       if (isOpen) {
  296.         close(); // close the file
  297.       }
  298.     }
  299.   }
  300. }