FTPInputStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs.ftp;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import org.apache.commons.net.ftp.FTPClient;
  22. import org.apache.hadoop.fs.FSInputStream;
  23. import org.apache.hadoop.fs.FileSystem;
  24. public class FTPInputStream extends FSInputStream {
  25.   InputStream wrappedStream;
  26.   FTPClient client;
  27.   FileSystem.Statistics stats;
  28.   boolean closed;
  29.   long pos;
  30.   public FTPInputStream(InputStream stream, FTPClient client,
  31.       FileSystem.Statistics stats) {
  32.     if (stream == null) {
  33.       throw new IllegalArgumentException("Null InputStream");
  34.     }
  35.     if (client == null || !client.isConnected()) {
  36.       throw new IllegalArgumentException("FTP client null or not connected");
  37.     }
  38.     this.wrappedStream = stream;
  39.     this.client = client;
  40.     this.stats = stats;
  41.     this.pos = 0;
  42.     this.closed = false;
  43.   }
  44.   public long getPos() throws IOException {
  45.     return pos;
  46.   }
  47.   // We don't support seek.
  48.   public void seek(long pos) throws IOException {
  49.     throw new IOException("Seek not supported");
  50.   }
  51.   public boolean seekToNewSource(long targetPos) throws IOException {
  52.     throw new IOException("Seek not supported");
  53.   }
  54.   public synchronized int read() throws IOException {
  55.     if (closed) {
  56.       throw new IOException("Stream closed");
  57.     }
  58.     int byteRead = wrappedStream.read();
  59.     if (byteRead >= 0) {
  60.       pos++;
  61.     }
  62.     if (stats != null & byteRead >= 0) {
  63.       stats.incrementBytesRead(1);
  64.     }
  65.     return byteRead;
  66.   }
  67.   public synchronized int read(byte buf[], int off, int len) throws IOException {
  68.     if (closed) {
  69.       throw new IOException("Stream closed");
  70.     }
  71.     int result = wrappedStream.read(buf, off, len);
  72.     if (result > 0) {
  73.       pos += result;
  74.     }
  75.     if (stats != null & result > 0) {
  76.       stats.incrementBytesRead(result);
  77.     }
  78.     return result;
  79.   }
  80.   public synchronized void close() throws IOException {
  81.     if (closed) {
  82.       throw new IOException("Stream closed");
  83.     }
  84.     super.close();
  85.     closed = true;
  86.     if (!client.isConnected()) {
  87.       throw new FTPException("Client not connected");
  88.     }
  89.     boolean cmdCompleted = client.completePendingCommand();
  90.     client.logout();
  91.     client.disconnect();
  92.     if (!cmdCompleted) {
  93.       throw new FTPException("Could not complete transfer, Reply Code - "
  94.           + client.getReplyCode());
  95.     }
  96.   }
  97.   // Not supported.
  98.   public boolean markSupported() {
  99.     return false;
  100.   }
  101.   public void mark(int readLimit) {
  102.     // Do nothing
  103.   }
  104.   public void reset() throws IOException {
  105.     throw new IOException("Mark not supported");
  106.   }
  107. }