SocketIOWithTimeout.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.net;
- import java.io.IOException;
- import java.io.InterruptedIOException;
- import java.net.SocketAddress;
- import java.net.SocketTimeoutException;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectableChannel;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.nio.channels.spi.SelectorProvider;
- import java.util.Iterator;
- import java.util.LinkedList;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.util.StringUtils;
- /**
- * This supports input and output streams for a socket channels.
- * These streams can have a timeout.
- */
- abstract class SocketIOWithTimeout {
- // This is intentionally package private.
- static final Log LOG = LogFactory.getLog(SocketIOWithTimeout.class);
-
- private SelectableChannel channel;
- private long timeout;
- private boolean closed = false;
-
- private static SelectorPool selector = new SelectorPool();
-
- /* A timeout value of 0 implies wait for ever.
- * We should have a value of timeout that implies zero wait.. i.e.
- * read or write returns immediately.
- *
- * This will set channel to non-blocking.
- */
- SocketIOWithTimeout(SelectableChannel channel, long timeout)
- throws IOException {
- checkChannelValidity(channel);
-
- this.channel = channel;
- this.timeout = timeout;
- // Set non-blocking
- channel.configureBlocking(false);
- }
-
- void close() {
- closed = true;
- }
- boolean isOpen() {
- return !closed && channel.isOpen();
- }
- SelectableChannel getChannel() {
- return channel;
- }
-
- /**
- * Utility function to check if channel is ok.
- * Mainly to throw IOException instead of runtime exception
- * in case of mismatch. This mismatch can occur for many runtime
- * reasons.
- */
- static void checkChannelValidity(Object channel) throws IOException {
- if (channel == null) {
- /* Most common reason is that original socket does not have a channel.
- * So making this an IOException rather than a RuntimeException.
- */
- throw new IOException("Channel is null. Check " +
- "how the channel or socket is created.");
- }
-
- if (!(channel instanceof SelectableChannel)) {
- throw new IOException("Channel should be a SelectableChannel");
- }
- }
-
- /**
- * Performs actual IO operations. This is not expected to block.
- *
- * @param buf
- * @return number of bytes (or some equivalent). 0 implies underlying
- * channel is drained completely. We will wait if more IO is
- * required.
- * @throws IOException
- */
- abstract int performIO(ByteBuffer buf) throws IOException;
-
- /**
- * Performs one IO and returns number of bytes read or written.
- * It waits up to the specified timeout. If the channel is
- * not read before the timeout, SocketTimeoutException is thrown.
- *
- * @param buf buffer for IO
- * @param ops Selection Ops used for waiting. Suggested values:
- * SelectionKey.OP_READ while reading and SelectionKey.OP_WRITE while
- * writing.
- *
- * @return number of bytes read or written. negative implies end of stream.
- * @throws IOException
- */
- int doIO(ByteBuffer buf, int ops) throws IOException {
-
- /* For now only one thread is allowed. If user want to read or write
- * from multiple threads, multiple streams could be created. In that
- * case multiple threads work as well as underlying channel supports it.
- */
- if (!buf.hasRemaining()) {
- throw new IllegalArgumentException("Buffer has no data left.");
- //or should we just return 0?
- }
- while (buf.hasRemaining()) {
- if (closed) {
- return -1;
- }
- try {
- int n = performIO(buf);
- if (n != 0) {
- // successful io or an error.
- return n;
- }
- } catch (IOException e) {
- if (!channel.isOpen()) {
- closed = true;
- }
- throw e;
- }
- //now wait for socket to be ready.
- int count = 0;
- try {
- count = selector.select(channel, ops, timeout);
- } catch (IOException e) { //unexpected IOException.
- closed = true;
- throw e;
- }
- if (count == 0) {
- throw new SocketTimeoutException(timeoutExceptionString(channel,
- timeout, ops));
- }
- // otherwise the socket should be ready for io.
- }
-
- return 0; // does not reach here.
- }
-
- /**
- * The contract is similar to {@link SocketChannel#connect(SocketAddress)}
- * with a timeout.
- *
- * @see SocketChannel#connect(SocketAddress)
- *
- * @param channel - this should be a {@link SelectableChannel}
- * @param endpoint
- * @throws IOException
- */
- static void connect(SocketChannel channel,
- SocketAddress endpoint, int timeout) throws IOException {
-
- boolean blockingOn = channel.isBlocking();
- if (blockingOn) {
- channel.configureBlocking(false);
- }
-
- try {
- if (channel.connect(endpoint)) {
- return;
- }
- long timeoutLeft = timeout;
- long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
-
- while (true) {
- // we might have to call finishConnect() more than once
- // for some channels (with user level protocols)
-
- int ret = selector.select((SelectableChannel)channel,
- SelectionKey.OP_CONNECT, timeoutLeft);
-
- if (ret > 0 && channel.finishConnect()) {
- return;
- }
-
- if (ret == 0 ||
- (timeout > 0 &&
- (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
- throw new SocketTimeoutException(
- timeoutExceptionString(channel, timeout,
- SelectionKey.OP_CONNECT));
- }
- }
- } catch (IOException e) {
- // javadoc for SocketChannel.connect() says channel should be closed.
- try {
- channel.close();
- } catch (IOException ignored) {}
- throw e;
- } finally {
- if (blockingOn && channel.isOpen()) {
- channel.configureBlocking(true);
- }
- }
- }
- /**
- * This is similar to {@link #doIO(ByteBuffer, int)} except that it
- * does not perform any I/O. It just waits for the channel to be ready
- * for I/O as specified in ops.
- *
- * @param ops Selection Ops used for waiting
- *
- * @throws SocketTimeoutException
- * if select on the channel times out.
- * @throws IOException
- * if any other I/O error occurs.
- */
- void waitForIO(int ops) throws IOException {
-
- if (selector.select(channel, ops, timeout) == 0) {
- throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
- ops));
- }
- }
-
- private static String timeoutExceptionString(SelectableChannel channel,
- long timeout, int ops) {
-
- String waitingFor;
- switch(ops) {
-
- case SelectionKey.OP_READ :
- waitingFor = "read"; break;
-
- case SelectionKey.OP_WRITE :
- waitingFor = "write"; break;
-
- case SelectionKey.OP_CONNECT :
- waitingFor = "connect"; break;
-
- default :
- waitingFor = "" + ops;
- }
-
- return timeout + " millis timeout while " +
- "waiting for channel to be ready for " +
- waitingFor + ". ch : " + channel;
- }
-
- /**
- * This maintains a pool of selectors. These selectors are closed
- * once they are idle (unused) for a few seconds.
- */
- private static class SelectorPool {
-
- private static class SelectorInfo {
- Selector selector;
- long lastActivityTime;
- LinkedList<SelectorInfo> queue;
-
- void close() {
- if (selector != null) {
- try {
- selector.close();
- } catch (IOException e) {
- LOG.warn("Unexpected exception while closing selector : " +
- StringUtils.stringifyException(e));
- }
- }
- }
- }
-
- private static class ProviderInfo {
- SelectorProvider provider;
- LinkedList<SelectorInfo> queue; // lifo
- ProviderInfo next;
- }
-
- private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
-
- private ProviderInfo providerList = null;
-
- /**
- * Waits on the channel with the given timeout using one of the
- * cached selectors. It also removes any cached selectors that are
- * idle for a few seconds.
- *
- * @param channel
- * @param ops
- * @param timeout
- * @return
- * @throws IOException
- */
- int select(SelectableChannel channel, int ops, long timeout)
- throws IOException {
-
- SelectorInfo info = get(channel);
-
- SelectionKey key = null;
- int ret = 0;
-
- try {
- while (true) {
- long start = (timeout == 0) ? 0 : System.currentTimeMillis();
- key = channel.register(info.selector, ops);
- ret = info.selector.select(timeout);
-
- if (ret != 0) {
- return ret;
- }
-
- /* Sometimes select() returns 0 much before timeout for
- * unknown reasons. So select again if required.
- */
- if (timeout > 0) {
- timeout -= System.currentTimeMillis() - start;
- if (timeout <= 0) {
- return 0;
- }
- }
-
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedIOException("Interruped while waiting for " +
- "IO on channel " + channel +
- ". " + timeout +
- " millis timeout left.");
- }
- }
- } finally {
- if (key != null) {
- key.cancel();
- }
-
- //clear the canceled key.
- try {
- info.selector.selectNow();
- } catch (IOException e) {
- LOG.info("Unexpected Exception while clearing selector : " +
- StringUtils.stringifyException(e));
- // don't put the selector back.
- info.close();
- return ret;
- }
-
- release(info);
- }
- }
-
- /**
- * Takes one selector from end of LRU list of free selectors.
- * If there are no selectors awailable, it creates a new selector.
- * Also invokes trimIdleSelectors().
- *
- * @param channel
- * @return
- * @throws IOException
- */
- private synchronized SelectorInfo get(SelectableChannel channel)
- throws IOException {
- SelectorInfo selInfo = null;
-
- SelectorProvider provider = channel.provider();
-
- // pick the list : rarely there is more than one provider in use.
- ProviderInfo pList = providerList;
- while (pList != null && pList.provider != provider) {
- pList = pList.next;
- }
- if (pList == null) {
- //LOG.info("Creating new ProviderInfo : " + provider.toString());
- pList = new ProviderInfo();
- pList.provider = provider;
- pList.queue = new LinkedList<SelectorInfo>();
- pList.next = providerList;
- providerList = pList;
- }
-
- LinkedList<SelectorInfo> queue = pList.queue;
-
- if (queue.isEmpty()) {
- Selector selector = provider.openSelector();
- selInfo = new SelectorInfo();
- selInfo.selector = selector;
- selInfo.queue = queue;
- } else {
- selInfo = queue.removeLast();
- }
-
- trimIdleSelectors(System.currentTimeMillis());
- return selInfo;
- }
-
- /**
- * puts selector back at the end of LRU list of free selectos.
- * Also invokes trimIdleSelectors().
- *
- * @param info
- */
- private synchronized void release(SelectorInfo info) {
- long now = System.currentTimeMillis();
- trimIdleSelectors(now);
- info.lastActivityTime = now;
- info.queue.addLast(info);
- }
-
- /**
- * Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
- * traverse the whole list, just over the one that have crossed
- * the timeout.
- */
- private void trimIdleSelectors(long now) {
- long cutoff = now - IDLE_TIMEOUT;
-
- for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
- if (pList.queue.isEmpty()) {
- continue;
- }
- for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
- SelectorInfo info = it.next();
- if (info.lastActivityTime > cutoff) {
- break;
- }
- it.remove();
- info.close();
- }
- }
- }
- }
- }