SocketInputStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.net;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.net.Socket;
  22. import java.net.SocketTimeoutException;
  23. import java.nio.ByteBuffer;
  24. import java.nio.channels.FileChannel;
  25. import java.nio.channels.ReadableByteChannel;
  26. import java.nio.channels.SelectableChannel;
  27. import java.nio.channels.SelectionKey;
  28. /**
  29.  * This implements an input stream that can have a timeout while reading.
  30.  * This sets non-blocking flag on the socket channel.
  31.  * So after create this object, read() on 
  32.  * {@link Socket#getInputStream()} and write() on 
  33.  * {@link Socket#getOutputStream()} for the associated socket will throw 
  34.  * IllegalBlockingModeException. 
  35.  * Please use {@link SocketOutputStream} for writing.
  36.  */
  37. public class SocketInputStream extends InputStream
  38.                                implements ReadableByteChannel {
  39.   private Reader reader;
  40.   private static class Reader extends SocketIOWithTimeout {
  41.     ReadableByteChannel channel;
  42.     
  43.     Reader(ReadableByteChannel channel, long timeout) throws IOException {
  44.       super((SelectableChannel)channel, timeout);
  45.       this.channel = channel;
  46.     }
  47.     
  48.     int performIO(ByteBuffer buf) throws IOException {
  49.       return channel.read(buf);
  50.     }
  51.   }
  52.   
  53.   /**
  54.    * Create a new input stream with the given timeout. If the timeout
  55.    * is zero, it will be treated as infinite timeout. The socket's
  56.    * channel will be configured to be non-blocking.
  57.    * 
  58.    * @param channel 
  59.    *        Channel for reading, should also be a {@link SelectableChannel}.
  60.    *        The channel will be configured to be non-blocking.
  61.    * @param timeout timeout in milliseconds. must not be negative.
  62.    * @throws IOException
  63.    */
  64.   public SocketInputStream(ReadableByteChannel channel, long timeout)
  65.                                                         throws IOException {
  66.     SocketIOWithTimeout.checkChannelValidity(channel);
  67.     reader = new Reader(channel, timeout);
  68.   }
  69.   /**
  70.    * Same as SocketInputStream(socket.getChannel(), timeout): <br><br>
  71.    * 
  72.    * Create a new input stream with the given timeout. If the timeout
  73.    * is zero, it will be treated as infinite timeout. The socket's
  74.    * channel will be configured to be non-blocking.
  75.    * 
  76.    * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
  77.    *  
  78.    * @param socket should have a channel associated with it.
  79.    * @param timeout timeout timeout in milliseconds. must not be negative.
  80.    * @throws IOException
  81.    */
  82.   public SocketInputStream(Socket socket, long timeout) 
  83.                                          throws IOException {
  84.     this(socket.getChannel(), timeout);
  85.   }
  86.   
  87.   /**
  88.    * Same as SocketInputStream(socket.getChannel(), socket.getSoTimeout())
  89.    * :<br><br>
  90.    * 
  91.    * Create a new input stream with the given timeout. If the timeout
  92.    * is zero, it will be treated as infinite timeout. The socket's
  93.    * channel will be configured to be non-blocking.
  94.    * @see SocketInputStream#SocketInputStream(ReadableByteChannel, long)
  95.    *  
  96.    * @param socket should have a channel associated with it.
  97.    * @throws IOException
  98.    */
  99.   public SocketInputStream(Socket socket) throws IOException {
  100.     this(socket.getChannel(), socket.getSoTimeout());
  101.   }
  102.   
  103.   @Override
  104.   public int read() throws IOException {
  105.     /* Allocation can be removed if required.
  106.      * probably no need to optimize or encourage single byte read.
  107.      */
  108.     byte[] buf = new byte[1];
  109.     int ret = read(buf, 0, 1);
  110.     if (ret > 0) {
  111.       return (byte)buf[0];
  112.     }
  113.     if (ret != -1) {
  114.       // unexpected
  115.       throw new IOException("Could not read from stream");
  116.     }
  117.     return ret;
  118.   }
  119.   public int read(byte[] b, int off, int len) throws IOException {
  120.     return read(ByteBuffer.wrap(b, off, len));
  121.   }
  122.   public synchronized void close() throws IOException {
  123.     /* close the channel since Socket.getInputStream().close()
  124.      * closes the socket.
  125.      */
  126.     reader.channel.close();
  127.     reader.close();
  128.   }
  129.   /**
  130.    * Returns underlying channel used by inputstream.
  131.    * This is useful in certain cases like channel for 
  132.    * {@link FileChannel#transferFrom(ReadableByteChannel, long, long)}.
  133.    */
  134.   public ReadableByteChannel getChannel() {
  135.     return reader.channel; 
  136.   }
  137.   
  138.   //ReadableByteChannel interface
  139.     
  140.   public boolean isOpen() {
  141.     return reader.isOpen();
  142.   }
  143.     
  144.   public int read(ByteBuffer dst) throws IOException {
  145.     return reader.doIO(dst, SelectionKey.OP_READ);
  146.   }
  147.   
  148.   /**
  149.    * waits for the underlying channel to be ready for reading.
  150.    * The timeout specified for this stream applies to this wait.
  151.    * 
  152.    * @throws SocketTimeoutException 
  153.    *         if select on the channel times out.
  154.    * @throws IOException
  155.    *         if any other I/O error occurs. 
  156.    */
  157.   public void waitForReadable() throws IOException {
  158.     reader.waitForIO(SelectionKey.OP_READ);
  159.   }
  160. }