S3OutputStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.s3;
  19. import java.io.File;
  20. import java.io.FileOutputStream;
  21. import java.io.IOException;
  22. import java.io.OutputStream;
  23. import java.util.ArrayList;
  24. import java.util.List;
  25. import java.util.Random;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.fs.s3.INode.FileType;
  29. import org.apache.hadoop.util.Progressable;
  30. class S3OutputStream extends OutputStream {
  31.   private Configuration conf;
  32.   
  33.   private int bufferSize;
  34.   private FileSystemStore store;
  35.   private Path path;
  36.   private long blockSize;
  37.   private File backupFile;
  38.   private OutputStream backupStream;
  39.   private Random r = new Random();
  40.   private boolean closed;
  41.   private int pos = 0;
  42.   private long filePos = 0;
  43.   private int bytesWrittenToBlock = 0;
  44.   private byte[] outBuf;
  45.   private List<Block> blocks = new ArrayList<Block>();
  46.   private Block nextBlock;
  47.   public S3OutputStream(Configuration conf, FileSystemStore store,
  48.                         Path path, long blockSize, Progressable progress,
  49.                         int buffersize) throws IOException {
  50.     
  51.     this.conf = conf;
  52.     this.store = store;
  53.     this.path = path;
  54.     this.blockSize = blockSize;
  55.     this.backupFile = newBackupFile();
  56.     this.backupStream = new FileOutputStream(backupFile);
  57.     this.bufferSize = buffersize;
  58.     this.outBuf = new byte[bufferSize];
  59.   }
  60.   private File newBackupFile() throws IOException {
  61.     File dir = new File(conf.get("fs.s3.buffer.dir"));
  62.     if (!dir.exists() && !dir.mkdirs()) {
  63.       throw new IOException("Cannot create S3 buffer directory: " + dir);
  64.     }
  65.     File result = File.createTempFile("output-", ".tmp", dir);
  66.     result.deleteOnExit();
  67.     return result;
  68.   }
  69.   public long getPos() throws IOException {
  70.     return filePos;
  71.   }
  72.   @Override
  73.   public synchronized void write(int b) throws IOException {
  74.     if (closed) {
  75.       throw new IOException("Stream closed");
  76.     }
  77.     if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
  78.       flush();
  79.     }
  80.     outBuf[pos++] = (byte) b;
  81.     filePos++;
  82.   }
  83.   @Override
  84.   public synchronized void write(byte b[], int off, int len) throws IOException {
  85.     if (closed) {
  86.       throw new IOException("Stream closed");
  87.     }
  88.     while (len > 0) {
  89.       int remaining = bufferSize - pos;
  90.       int toWrite = Math.min(remaining, len);
  91.       System.arraycopy(b, off, outBuf, pos, toWrite);
  92.       pos += toWrite;
  93.       off += toWrite;
  94.       len -= toWrite;
  95.       filePos += toWrite;
  96.       if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
  97.         flush();
  98.       }
  99.     }
  100.   }
  101.   @Override
  102.   public synchronized void flush() throws IOException {
  103.     if (closed) {
  104.       throw new IOException("Stream closed");
  105.     }
  106.     if (bytesWrittenToBlock + pos >= blockSize) {
  107.       flushData((int) blockSize - bytesWrittenToBlock);
  108.     }
  109.     if (bytesWrittenToBlock == blockSize) {
  110.       endBlock();
  111.     }
  112.     flushData(pos);
  113.   }
  114.   private synchronized void flushData(int maxPos) throws IOException {
  115.     int workingPos = Math.min(pos, maxPos);
  116.     if (workingPos > 0) {
  117.       //
  118.       // To the local block backup, write just the bytes
  119.       //
  120.       backupStream.write(outBuf, 0, workingPos);
  121.       //
  122.       // Track position
  123.       //
  124.       bytesWrittenToBlock += workingPos;
  125.       System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
  126.       pos -= workingPos;
  127.     }
  128.   }
  129.   private synchronized void endBlock() throws IOException {
  130.     //
  131.     // Done with local copy
  132.     //
  133.     backupStream.close();
  134.     //
  135.     // Send it to S3
  136.     //
  137.     // TODO: Use passed in Progressable to report progress.
  138.     nextBlockOutputStream();
  139.     store.storeBlock(nextBlock, backupFile);
  140.     internalClose();
  141.     //
  142.     // Delete local backup, start new one
  143.     //
  144.     backupFile.delete();
  145.     backupFile = newBackupFile();
  146.     backupStream = new FileOutputStream(backupFile);
  147.     bytesWrittenToBlock = 0;
  148.   }
  149.   private synchronized void nextBlockOutputStream() throws IOException {
  150.     long blockId = r.nextLong();
  151.     while (store.blockExists(blockId)) {
  152.       blockId = r.nextLong();
  153.     }
  154.     nextBlock = new Block(blockId, bytesWrittenToBlock);
  155.     blocks.add(nextBlock);
  156.     bytesWrittenToBlock = 0;
  157.   }
  158.   private synchronized void internalClose() throws IOException {
  159.     INode inode = new INode(FileType.FILE, blocks.toArray(new Block[blocks
  160.                                                                     .size()]));
  161.     store.storeINode(path, inode);
  162.   }
  163.   @Override
  164.   public synchronized void close() throws IOException {
  165.     if (closed) {
  166.       return;
  167.     }
  168.     flush();
  169.     if (filePos == 0 || bytesWrittenToBlock != 0) {
  170.       endBlock();
  171.     }
  172.     backupStream.close();
  173.     backupFile.delete();
  174.     super.close();
  175.     closed = true;
  176.   }
  177. }