KFSInputStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  *
  3.  * Licensed under the Apache License, Version 2.0
  4.  * (the "License"); you may not use this file except in compliance with
  5.  * the License. You may obtain a copy of the License at
  6.  *
  7.  * http://www.apache.org/licenses/LICENSE-2.0
  8.  *
  9.  * Unless required by applicable law or agreed to in writing, software
  10.  * distributed under the License is distributed on an "AS IS" BASIS,
  11.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12.  * implied. See the License for the specific language governing
  13.  * permissions and limitations under the License.
  14.  *
  15.  * @author: Sriram Rao (Kosmix Corp.)
  16.  * 
  17.  * Implements the Hadoop FSInputStream interfaces to allow applications to read
  18.  * files in Kosmos File System (KFS).
  19.  */
  20. package org.apache.hadoop.fs.kfs;
  21. import java.io.*;
  22. import java.nio.ByteBuffer;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.FSInputStream;
  25. import org.kosmix.kosmosfs.access.KfsAccess;
  26. import org.kosmix.kosmosfs.access.KfsInputChannel;
  27. class KFSInputStream extends FSInputStream {
  28.     private KfsInputChannel kfsChannel;
  29.     private FileSystem.Statistics statistics;
  30.     private long fsize;
  31.     @Deprecated
  32.     public KFSInputStream(KfsAccess kfsAccess, String path) {
  33.       this(kfsAccess, path, null);
  34.     }
  35.     public KFSInputStream(KfsAccess kfsAccess, String path,
  36.                             FileSystem.Statistics stats) {
  37.         this.statistics = stats;
  38.         this.kfsChannel = kfsAccess.kfs_open(path);
  39.         if (this.kfsChannel != null)
  40.             this.fsize = kfsAccess.kfs_filesize(path);
  41.         else
  42.             this.fsize = 0;
  43.     }
  44.     public long getPos() throws IOException {
  45.         if (kfsChannel == null) {
  46.             throw new IOException("File closed");
  47.         }
  48.         return kfsChannel.tell();
  49.     }
  50.     public synchronized int available() throws IOException {
  51.         if (kfsChannel == null) {
  52.             throw new IOException("File closed");
  53.         }
  54.         return (int) (this.fsize - getPos());
  55.     }
  56.     public synchronized void seek(long targetPos) throws IOException {
  57.         if (kfsChannel == null) {
  58.             throw new IOException("File closed");
  59.         }
  60.         kfsChannel.seek(targetPos);
  61.     }
  62.     public synchronized boolean seekToNewSource(long targetPos) throws IOException {
  63.         return false;
  64.     }
  65.     public synchronized int read() throws IOException {
  66.         if (kfsChannel == null) {
  67.             throw new IOException("File closed");
  68.         }
  69.         byte b[] = new byte[1];
  70.         int res = read(b, 0, 1);
  71.         if (res == 1) {
  72.           if (statistics != null) {
  73.             statistics.incrementBytesRead(1);
  74.           }
  75.           return ((int) (b[0] & 0xff));
  76.         }
  77.         return -1;
  78.     }
  79.     public synchronized int read(byte b[], int off, int len) throws IOException {
  80.         if (kfsChannel == null) {
  81.             throw new IOException("File closed");
  82.         }
  83. int res;
  84. res = kfsChannel.read(ByteBuffer.wrap(b, off, len));
  85. // Use -1 to signify EOF
  86. if (res == 0)
  87.     return -1;
  88. if (statistics != null) {
  89.   statistics.incrementBytesRead(res);
  90. }
  91. return res;
  92.     }
  93.     public synchronized void close() throws IOException {
  94.         if (kfsChannel == null) {
  95.             return;
  96.         }
  97.         kfsChannel.close();
  98.         kfsChannel = null;
  99.     }
  100.     public boolean markSupported() {
  101.         return false;
  102.     }
  103.     public void mark(int readLimit) {
  104.         // Do nothing
  105.     }
  106.     public void reset() throws IOException {
  107.         throw new IOException("Mark not supported");
  108.     }
  109. }