SocketIOWithTimeout.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.net;
  19. import java.io.IOException;
  20. import java.io.InterruptedIOException;
  21. import java.net.SocketAddress;
  22. import java.net.SocketTimeoutException;
  23. import java.nio.ByteBuffer;
  24. import java.nio.channels.SelectableChannel;
  25. import java.nio.channels.SelectionKey;
  26. import java.nio.channels.Selector;
  27. import java.nio.channels.SocketChannel;
  28. import java.nio.channels.spi.SelectorProvider;
  29. import java.util.Iterator;
  30. import java.util.LinkedList;
  31. import org.apache.commons.logging.Log;
  32. import org.apache.commons.logging.LogFactory;
  33. import org.apache.hadoop.util.StringUtils;
  34. /**
  35.  * This supports input and output streams for a socket channels. 
  36.  * These streams can have a timeout.
  37.  */
  38. abstract class SocketIOWithTimeout {
  39.   // This is intentionally package private.
  40.   static final Log LOG = LogFactory.getLog(SocketIOWithTimeout.class);    
  41.   
  42.   private SelectableChannel channel;
  43.   private long timeout;
  44.   private boolean closed = false;
  45.   
  46.   private static SelectorPool selector = new SelectorPool();
  47.   
  48.   /* A timeout value of 0 implies wait for ever. 
  49.    * We should have a value of timeout that implies zero wait.. i.e. 
  50.    * read or write returns immediately.
  51.    * 
  52.    * This will set channel to non-blocking.
  53.    */
  54.   SocketIOWithTimeout(SelectableChannel channel, long timeout) 
  55.                                                  throws IOException {
  56.     checkChannelValidity(channel);
  57.     
  58.     this.channel = channel;
  59.     this.timeout = timeout;
  60.     // Set non-blocking
  61.     channel.configureBlocking(false);
  62.   }
  63.   
  64.   void close() {
  65.     closed = true;
  66.   }
  67.   boolean isOpen() {
  68.     return !closed && channel.isOpen();
  69.   }
  70.   SelectableChannel getChannel() {
  71.     return channel;
  72.   }
  73.   
  74.   /** 
  75.    * Utility function to check if channel is ok.
  76.    * Mainly to throw IOException instead of runtime exception
  77.    * in case of mismatch. This mismatch can occur for many runtime
  78.    * reasons.
  79.    */
  80.   static void checkChannelValidity(Object channel) throws IOException {
  81.     if (channel == null) {
  82.       /* Most common reason is that original socket does not have a channel.
  83.        * So making this an IOException rather than a RuntimeException.
  84.        */
  85.       throw new IOException("Channel is null. Check " +
  86.                             "how the channel or socket is created.");
  87.     }
  88.     
  89.     if (!(channel instanceof SelectableChannel)) {
  90.       throw new IOException("Channel should be a SelectableChannel");
  91.     }    
  92.   }
  93.   
  94.   /**
  95.    * Performs actual IO operations. This is not expected to block.
  96.    *  
  97.    * @param buf
  98.    * @return number of bytes (or some equivalent). 0 implies underlying
  99.    *         channel is drained completely. We will wait if more IO is 
  100.    *         required.
  101.    * @throws IOException
  102.    */
  103.   abstract int performIO(ByteBuffer buf) throws IOException;  
  104.   
  105.   /**
  106.    * Performs one IO and returns number of bytes read or written.
  107.    * It waits up to the specified timeout. If the channel is 
  108.    * not read before the timeout, SocketTimeoutException is thrown.
  109.    * 
  110.    * @param buf buffer for IO
  111.    * @param ops Selection Ops used for waiting. Suggested values: 
  112.    *        SelectionKey.OP_READ while reading and SelectionKey.OP_WRITE while
  113.    *        writing. 
  114.    *        
  115.    * @return number of bytes read or written. negative implies end of stream.
  116.    * @throws IOException
  117.    */
  118.   int doIO(ByteBuffer buf, int ops) throws IOException {
  119.     
  120.     /* For now only one thread is allowed. If user want to read or write
  121.      * from multiple threads, multiple streams could be created. In that
  122.      * case multiple threads work as well as underlying channel supports it.
  123.      */
  124.     if (!buf.hasRemaining()) {
  125.       throw new IllegalArgumentException("Buffer has no data left.");
  126.       //or should we just return 0?
  127.     }
  128.     while (buf.hasRemaining()) {
  129.       if (closed) {
  130.         return -1;
  131.       }
  132.       try {
  133.         int n = performIO(buf);
  134.         if (n != 0) {
  135.           // successful io or an error.
  136.           return n;
  137.         }
  138.       } catch (IOException e) {
  139.         if (!channel.isOpen()) {
  140.           closed = true;
  141.         }
  142.         throw e;
  143.       }
  144.       //now wait for socket to be ready.
  145.       int count = 0;
  146.       try {
  147.         count = selector.select(channel, ops, timeout);  
  148.       } catch (IOException e) { //unexpected IOException.
  149.         closed = true;
  150.         throw e;
  151.       } 
  152.       if (count == 0) {
  153.         throw new SocketTimeoutException(timeoutExceptionString(channel,
  154.                                                                 timeout, ops));
  155.       }
  156.       // otherwise the socket should be ready for io.
  157.     }
  158.     
  159.     return 0; // does not reach here.
  160.   }
  161.   
  162.   /**
  163.    * The contract is similar to {@link SocketChannel#connect(SocketAddress)} 
  164.    * with a timeout.
  165.    * 
  166.    * @see SocketChannel#connect(SocketAddress)
  167.    * 
  168.    * @param channel - this should be a {@link SelectableChannel}
  169.    * @param endpoint
  170.    * @throws IOException
  171.    */
  172.   static void connect(SocketChannel channel, 
  173.                       SocketAddress endpoint, int timeout) throws IOException {
  174.     
  175.     boolean blockingOn = channel.isBlocking();
  176.     if (blockingOn) {
  177.       channel.configureBlocking(false);
  178.     }
  179.     
  180.     try { 
  181.       if (channel.connect(endpoint)) {
  182.         return;
  183.       }
  184.       long timeoutLeft = timeout;
  185.       long endTime = (timeout > 0) ? (System.currentTimeMillis() + timeout): 0;
  186.       
  187.       while (true) {
  188.         // we might have to call finishConnect() more than once
  189.         // for some channels (with user level protocols)
  190.         
  191.         int ret = selector.select((SelectableChannel)channel, 
  192.                                   SelectionKey.OP_CONNECT, timeoutLeft);
  193.         
  194.         if (ret > 0 && channel.finishConnect()) {
  195.           return;
  196.         }
  197.         
  198.         if (ret == 0 ||
  199.             (timeout > 0 &&  
  200.               (timeoutLeft = (endTime - System.currentTimeMillis())) <= 0)) {
  201.           throw new SocketTimeoutException(
  202.                     timeoutExceptionString(channel, timeout, 
  203.                                            SelectionKey.OP_CONNECT));
  204.         }
  205.       }
  206.     } catch (IOException e) {
  207.       // javadoc for SocketChannel.connect() says channel should be closed.
  208.       try {
  209.         channel.close();
  210.       } catch (IOException ignored) {}
  211.       throw e;
  212.     } finally {
  213.       if (blockingOn && channel.isOpen()) {
  214.         channel.configureBlocking(true);
  215.       }
  216.     }
  217.   }
  218.   /**
  219.    * This is similar to {@link #doIO(ByteBuffer, int)} except that it
  220.    * does not perform any I/O. It just waits for the channel to be ready
  221.    * for I/O as specified in ops.
  222.    * 
  223.    * @param ops Selection Ops used for waiting
  224.    * 
  225.    * @throws SocketTimeoutException 
  226.    *         if select on the channel times out.
  227.    * @throws IOException
  228.    *         if any other I/O error occurs. 
  229.    */
  230.   void waitForIO(int ops) throws IOException {
  231.     
  232.     if (selector.select(channel, ops, timeout) == 0) {
  233.       throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
  234.                                                               ops)); 
  235.     }
  236.   }
  237.     
  238.   private static String timeoutExceptionString(SelectableChannel channel,
  239.                                                long timeout, int ops) {
  240.     
  241.     String waitingFor;
  242.     switch(ops) {
  243.     
  244.     case SelectionKey.OP_READ :
  245.       waitingFor = "read"; break;
  246.       
  247.     case SelectionKey.OP_WRITE :
  248.       waitingFor = "write"; break;      
  249.       
  250.     case SelectionKey.OP_CONNECT :
  251.       waitingFor = "connect"; break;
  252.       
  253.     default :
  254.       waitingFor = "" + ops;  
  255.     }
  256.     
  257.     return timeout + " millis timeout while " +
  258.            "waiting for channel to be ready for " + 
  259.            waitingFor + ". ch : " + channel;    
  260.   }
  261.   
  262.   /**
  263.    * This maintains a pool of selectors. These selectors are closed
  264.    * once they are idle (unused) for a few seconds.
  265.    */
  266.   private static class SelectorPool {
  267.     
  268.     private static class SelectorInfo {
  269.       Selector              selector;
  270.       long                  lastActivityTime;
  271.       LinkedList<SelectorInfo> queue; 
  272.       
  273.       void close() {
  274.         if (selector != null) {
  275.           try {
  276.             selector.close();
  277.           } catch (IOException e) {
  278.             LOG.warn("Unexpected exception while closing selector : " +
  279.                      StringUtils.stringifyException(e));
  280.           }
  281.         }
  282.       }    
  283.     }
  284.     
  285.     private static class ProviderInfo {
  286.       SelectorProvider provider;
  287.       LinkedList<SelectorInfo> queue; // lifo
  288.       ProviderInfo next;
  289.     }
  290.     
  291.     private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
  292.     
  293.     private ProviderInfo providerList = null;
  294.     
  295.     /**
  296.      * Waits on the channel with the given timeout using one of the 
  297.      * cached selectors. It also removes any cached selectors that are
  298.      * idle for a few seconds.
  299.      * 
  300.      * @param channel
  301.      * @param ops
  302.      * @param timeout
  303.      * @return
  304.      * @throws IOException
  305.      */
  306.     int select(SelectableChannel channel, int ops, long timeout) 
  307.                                                    throws IOException {
  308.      
  309.       SelectorInfo info = get(channel);
  310.       
  311.       SelectionKey key = null;
  312.       int ret = 0;
  313.       
  314.       try {
  315.         while (true) {
  316.           long start = (timeout == 0) ? 0 : System.currentTimeMillis();
  317.           key = channel.register(info.selector, ops);
  318.           ret = info.selector.select(timeout);
  319.           
  320.           if (ret != 0) {
  321.             return ret;
  322.           }
  323.           
  324.           /* Sometimes select() returns 0 much before timeout for 
  325.            * unknown reasons. So select again if required.
  326.            */
  327.           if (timeout > 0) {
  328.             timeout -= System.currentTimeMillis() - start;
  329.             if (timeout <= 0) {
  330.               return 0;
  331.             }
  332.           }
  333.           
  334.           if (Thread.currentThread().isInterrupted()) {
  335.             throw new InterruptedIOException("Interruped while waiting for " +
  336.                                              "IO on channel " + channel +
  337.                                              ". " + timeout + 
  338.                                              " millis timeout left.");
  339.           }
  340.         }
  341.       } finally {
  342.         if (key != null) {
  343.           key.cancel();
  344.         }
  345.         
  346.         //clear the canceled key.
  347.         try {
  348.           info.selector.selectNow();
  349.         } catch (IOException e) {
  350.           LOG.info("Unexpected Exception while clearing selector : " +
  351.                    StringUtils.stringifyException(e));
  352.           // don't put the selector back.
  353.           info.close();
  354.           return ret; 
  355.         }
  356.         
  357.         release(info);
  358.       }
  359.     }
  360.     
  361.     /**
  362.      * Takes one selector from end of LRU list of free selectors.
  363.      * If there are no selectors awailable, it creates a new selector.
  364.      * Also invokes trimIdleSelectors(). 
  365.      * 
  366.      * @param channel
  367.      * @return 
  368.      * @throws IOException
  369.      */
  370.     private synchronized SelectorInfo get(SelectableChannel channel) 
  371.                                                          throws IOException {
  372.       SelectorInfo selInfo = null;
  373.       
  374.       SelectorProvider provider = channel.provider();
  375.       
  376.       // pick the list : rarely there is more than one provider in use.
  377.       ProviderInfo pList = providerList;
  378.       while (pList != null && pList.provider != provider) {
  379.         pList = pList.next;
  380.       }      
  381.       if (pList == null) {
  382.         //LOG.info("Creating new ProviderInfo : " + provider.toString());
  383.         pList = new ProviderInfo();
  384.         pList.provider = provider;
  385.         pList.queue = new LinkedList<SelectorInfo>();
  386.         pList.next = providerList;
  387.         providerList = pList;
  388.       }
  389.       
  390.       LinkedList<SelectorInfo> queue = pList.queue;
  391.       
  392.       if (queue.isEmpty()) {
  393.         Selector selector = provider.openSelector();
  394.         selInfo = new SelectorInfo();
  395.         selInfo.selector = selector;
  396.         selInfo.queue = queue;
  397.       } else {
  398.         selInfo = queue.removeLast();
  399.       }
  400.       
  401.       trimIdleSelectors(System.currentTimeMillis());
  402.       return selInfo;
  403.     }
  404.     
  405.     /**
  406.      * puts selector back at the end of LRU list of free selectos.
  407.      * Also invokes trimIdleSelectors().
  408.      * 
  409.      * @param info
  410.      */
  411.     private synchronized void release(SelectorInfo info) {
  412.       long now = System.currentTimeMillis();
  413.       trimIdleSelectors(now);
  414.       info.lastActivityTime = now;
  415.       info.queue.addLast(info);
  416.     }
  417.     
  418.     /**
  419.      * Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
  420.      * traverse the whole list, just over the one that have crossed 
  421.      * the timeout.
  422.      */
  423.     private void trimIdleSelectors(long now) {
  424.       long cutoff = now - IDLE_TIMEOUT;
  425.       
  426.       for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
  427.         if (pList.queue.isEmpty()) {
  428.           continue;
  429.         }
  430.         for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
  431.           SelectorInfo info = it.next();
  432.           if (info.lastActivityTime > cutoff) {
  433.             break;
  434.           }
  435.           it.remove();
  436.           info.close();
  437.         }
  438.       }
  439.     }
  440.   }
  441. }