SocketInputStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.net;
- import java.io.IOException;
- import java.io.InputStream;
- import java.net.Socket;
- import java.net.SocketTimeoutException;
- import java.nio.ByteBuffer;
- import java.nio.channels.FileChannel;
- import java.nio.channels.ReadableByteChannel;
- import java.nio.channels.SelectableChannel;
- import java.nio.channels.SelectionKey;
- /**
- * This implements an input stream that can have a timeout while reading.
- * This sets non-blocking flag on the socket channel.
- * So after create this object, read() on
- * {@link Socket#getInputStream()} and write() on
- * {@link Socket#getOutputStream()} for the associated socket will throw
- * IllegalBlockingModeException.
- * Please use {@link SocketOutputStream} for writing.
- */
- public class SocketInputStream extends InputStream
- implements ReadableByteChannel {
- private Reader reader;
- private static class Reader extends SocketIOWithTimeout {
- ReadableByteChannel channel;
-
- Reader(ReadableByteChannel channel, long timeout) throws IOException {
- super((SelectableChannel)channel, timeout);
- this.channel = channel;
- }
-
- int performIO(ByteBuffer buf) throws IOException {
- return channel.read(buf);
- }
- }
-
- /**
- * Create a new input stream with the given timeout. If the timeout
- * is zero, it will be treated as infinite timeout. The socket's
- * channel will be configured to be non-blocking.
- *
- * @param channel
- * Channel for reading, should also be a {@link SelectableChannel}.
- * The channel will be configured to be non-blocking.
- * @param timeout timeout in milliseconds. must not be negative.
- * @throws IOException
- */
- public SocketInputStream(ReadableByteChannel channel, long timeout)
- throws IOException {
- SocketIOWithTimeout.checkChannelValidity(channel);
- reader = new Reader(channel, timeout);
- }
- /**
- * Same as SocketInputStream(socket.getChannel(), timeout): <br><br>
- *
- * Create a new input stream with the given timeout. If the timeout
- * is zero, it will be treated as infinite timeout. The socket's
- * channel will be configured to be non-blocking.
- *
- * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
- *
- * @param socket should have a channel associated with it.
- * @param timeout timeout timeout in milliseconds. must not be negative.
- * @throws IOException
- */
- public SocketInputStream(Socket socket, long timeout)
- throws IOException {
- this(socket.getChannel(), timeout);
- }
-
- /**
- * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
- * :<br><br>
- *
- * Create a new input stream with the given timeout. If the timeout
- * is zero, it will be treated as infinite timeout. The socket's
- * channel will be configured to be non-blocking.
- * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
- *
- * @param socket should have a channel associated with it.
- * @throws IOException
- */
- public SocketInputStream(Socket socket) throws IOException {
- this(socket.getChannel(), socket.getSoTimeout());
- }
-
- @Override
- public int read() throws IOException {
- /* Allocation can be removed if required.
- * probably no need to optimize or encourage single byte read.
- */
- byte[] buf = new byte[1];
- int ret = read(buf, 0, 1);
- if (ret > 0) {
- return (byte)buf[0];
- }
- if (ret != -1) {
- // unexpected
- throw new IOException("Could not read from stream");
- }
- return ret;
- }
- public int read(byte[] b, int off, int len) throws IOException {
- return read(ByteBuffer.wrap(b, off, len));
- }
- public synchronized void close() throws IOException {
- /* close the channel since Socket.getInputStream().close()
- * closes the socket.
- */
- reader.channel.close();
- reader.close();
- }
- /**
- * Returns underlying channel used by inputstream.
- * This is useful in certain cases like channel for
- * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)}.
- */
- public ReadableByteChannel getChannel() {
- return reader.channel;
- }
-
- //ReadableByteChannel interface
-
- public boolean isOpen() {
- return reader.isOpen();
- }
-
- public int read(ByteBuffer dst) throws IOException {
- return reader.doIO(dst, SelectionKey.OP_READ);
- }
-
- /**
- * waits for the underlying channel to be ready for reading.
- * The timeout specified for this stream applies to this wait.
- *
- * @throws SocketTimeoutException
- * if select on the channel times out.
- * @throws IOException
- * if any other I/O error occurs.
- */
- public void waitForReadable() throws IOException {
- reader.waitForIO(SelectionKey.OP_READ);
- }
- }