KFSOutputStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:2k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  *
  3.  * Licensed under the Apache License, Version 2.0
  4.  * (the "License"); you may not use this file except in compliance with
  5.  * the License. You may obtain a copy of the License at
  6.  *
  7.  * http://www.apache.org/licenses/LICENSE-2.0
  8.  *
  9.  * Unless required by applicable law or agreed to in writing, software
  10.  * distributed under the License is distributed on an "AS IS" BASIS,
  11.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12.  * implied. See the License for the specific language governing
  13.  * permissions and limitations under the License.
  14.  *
  15.  * @author: Sriram Rao (Kosmix Corp.)
  16.  * 
  17.  * Implements the Hadoop FSOutputStream interfaces to allow applications to write to
  18.  * files in Kosmos File System (KFS).
  19.  */
  20. package org.apache.hadoop.fs.kfs;
  21. import java.io.*;
  22. import java.net.*;
  23. import java.util.*;
  24. import java.nio.ByteBuffer;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.fs.FSDataOutputStream;
  28. import org.apache.hadoop.util.Progressable;
  29. import org.kosmix.kosmosfs.access.KfsAccess;
  30. import org.kosmix.kosmosfs.access.KfsOutputChannel;
  31. class KFSOutputStream extends OutputStream {
  32.     private String path;
  33.     private KfsOutputChannel kfsChannel;
  34.     public KFSOutputStream(KfsAccess kfsAccess, String path, short replication) {
  35.         this.path = path;
  36.         this.kfsChannel = kfsAccess.kfs_create(path, replication);
  37.     }
  38.     public long getPos() throws IOException {
  39.         if (kfsChannel == null) {
  40.             throw new IOException("File closed");
  41.         }
  42.         return kfsChannel.tell();
  43.     }
  44.     public void write(int v) throws IOException {
  45.         if (kfsChannel == null) {
  46.             throw new IOException("File closed");
  47.         }
  48.         byte[] b = new byte[1];
  49.         b[0] = (byte) v;
  50.         write(b, 0, 1);
  51.     }
  52.     public void write(byte b[], int off, int len) throws IOException {
  53.         if (kfsChannel == null) {
  54.             throw new IOException("File closed");
  55.         }
  56.         kfsChannel.write(ByteBuffer.wrap(b, off, len));
  57.     }
  58.     public void flush() throws IOException {
  59.         if (kfsChannel == null) {
  60.             throw new IOException("File closed");
  61.         }
  62.         kfsChannel.sync();
  63.     }
  64.     public synchronized void close() throws IOException {
  65.         if (kfsChannel == null) {
  66.             return;
  67.         }
  68.         flush();
  69.         kfsChannel.close();
  70.         kfsChannel = null;
  71.     }
  72. }