S3InputStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.s3;
  19. import java.io.DataInputStream;
  20. import java.io.File;
  21. import java.io.FileInputStream;
  22. import java.io.IOException;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.fs.FSInputStream;
  25. import org.apache.hadoop.fs.FileSystem;
  26. class S3InputStream extends FSInputStream {
  27.   private FileSystemStore store;
  28.   private Block[] blocks;
  29.   private boolean closed;
  30.   private long fileLength;
  31.   private long pos = 0;
  32.   private File blockFile;
  33.   
  34.   private DataInputStream blockStream;
  35.   private long blockEnd = -1;
  36.   
  37.   private FileSystem.Statistics stats;
  38.   @Deprecated
  39.   public S3InputStream(Configuration conf, FileSystemStore store,
  40.                        INode inode) {
  41.     this(conf, store, inode, null);
  42.   }
  43.   public S3InputStream(Configuration conf, FileSystemStore store,
  44.                        INode inode, FileSystem.Statistics stats) {
  45.     
  46.     this.store = store;
  47.     this.stats = stats;
  48.     this.blocks = inode.getBlocks();
  49.     for (Block block : blocks) {
  50.       this.fileLength += block.getLength();
  51.     }
  52.   }
  53.   @Override
  54.   public synchronized long getPos() throws IOException {
  55.     return pos;
  56.   }
  57.   @Override
  58.   public synchronized int available() throws IOException {
  59.     return (int) (fileLength - pos);
  60.   }
  61.   @Override
  62.   public synchronized void seek(long targetPos) throws IOException {
  63.     if (targetPos > fileLength) {
  64.       throw new IOException("Cannot seek after EOF");
  65.     }
  66.     pos = targetPos;
  67.     blockEnd = -1;
  68.   }
  69.   @Override
  70.   public synchronized boolean seekToNewSource(long targetPos) throws IOException {
  71.     return false;
  72.   }
  73.   @Override
  74.   public synchronized int read() throws IOException {
  75.     if (closed) {
  76.       throw new IOException("Stream closed");
  77.     }
  78.     int result = -1;
  79.     if (pos < fileLength) {
  80.       if (pos > blockEnd) {
  81.         blockSeekTo(pos);
  82.       }
  83.       result = blockStream.read();
  84.       if (result >= 0) {
  85.         pos++;
  86.       }
  87.     }
  88.     if (stats != null & result >= 0) {
  89.       stats.incrementBytesRead(1);
  90.     }
  91.     return result;
  92.   }
  93.   @Override
  94.   public synchronized int read(byte buf[], int off, int len) throws IOException {
  95.     if (closed) {
  96.       throw new IOException("Stream closed");
  97.     }
  98.     if (pos < fileLength) {
  99.       if (pos > blockEnd) {
  100.         blockSeekTo(pos);
  101.       }
  102.       int realLen = Math.min(len, (int) (blockEnd - pos + 1));
  103.       int result = blockStream.read(buf, off, realLen);
  104.       if (result >= 0) {
  105.         pos += result;
  106.       }
  107.       if (stats != null && result > 0) {
  108.         stats.incrementBytesRead(result);
  109.       }
  110.       return result;
  111.     }
  112.     return -1;
  113.   }
  114.   private synchronized void blockSeekTo(long target) throws IOException {
  115.     //
  116.     // Compute desired block
  117.     //
  118.     int targetBlock = -1;
  119.     long targetBlockStart = 0;
  120.     long targetBlockEnd = 0;
  121.     for (int i = 0; i < blocks.length; i++) {
  122.       long blockLength = blocks[i].getLength();
  123.       targetBlockEnd = targetBlockStart + blockLength - 1;
  124.       if (target >= targetBlockStart && target <= targetBlockEnd) {
  125.         targetBlock = i;
  126.         break;
  127.       } else {
  128.         targetBlockStart = targetBlockEnd + 1;
  129.       }
  130.     }
  131.     if (targetBlock < 0) {
  132.       throw new IOException(
  133.                             "Impossible situation: could not find target position " + target);
  134.     }
  135.     long offsetIntoBlock = target - targetBlockStart;
  136.     // read block blocks[targetBlock] from position offsetIntoBlock
  137.     this.blockFile = store.retrieveBlock(blocks[targetBlock], offsetIntoBlock);
  138.     this.pos = target;
  139.     this.blockEnd = targetBlockEnd;
  140.     this.blockStream = new DataInputStream(new FileInputStream(blockFile));
  141.   }
  142.   @Override
  143.   public void close() throws IOException {
  144.     if (closed) {
  145.       return;
  146.     }
  147.     if (blockStream != null) {
  148.       blockStream.close();
  149.       blockStream = null;
  150.     }
  151.     if (blockFile != null) {
  152.       blockFile.delete();
  153.     }
  154.     super.close();
  155.     closed = true;
  156.   }
  157.   /**
  158.    * We don't support marks.
  159.    */
  160.   @Override
  161.   public boolean markSupported() {
  162.     return false;
  163.   }
  164.   @Override
  165.   public void mark(int readLimit) {
  166.     // Do nothing
  167.   }
  168.   @Override
  169.   public void reset() throws IOException {
  170.     throw new IOException("Mark not supported");
  171.   }
  172. }