NetUtils.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:16k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.net;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.OutputStream;
  22. import java.net.InetAddress;
  23. import java.net.InetSocketAddress;
  24. import java.net.Socket;
  25. import java.net.SocketAddress;
  26. import java.net.URI;
  27. import java.net.UnknownHostException;
  28. import java.nio.channels.SocketChannel;
  29. import java.util.Map.Entry;
  30. import java.util.*;
  31. import javax.net.SocketFactory;
  32. import org.apache.commons.logging.Log;
  33. import org.apache.commons.logging.LogFactory;
  34. import org.apache.hadoop.conf.Configuration;
  35. import org.apache.hadoop.fs.Path;
  36. import org.apache.hadoop.ipc.Server;
  37. import org.apache.hadoop.ipc.VersionedProtocol;
  38. import org.apache.hadoop.util.ReflectionUtils;
  39. public class NetUtils {
  40.   private static final Log LOG = LogFactory.getLog(NetUtils.class);
  41.   
  42.   private static Map<String, String> hostToResolved = 
  43.                                      new HashMap<String, String>();
  44.   /**
  45.    * Get the socket factory for the given class according to its
  46.    * configuration parameter
  47.    * <tt>hadoop.rpc.socket.factory.class.&lt;ClassName&gt;</tt>. When no
  48.    * such parameter exists then fall back on the default socket factory as
  49.    * configured by <tt>hadoop.rpc.socket.factory.class.default</tt>. If
  50.    * this default socket factory is not configured, then fall back on the JVM
  51.    * default socket factory.
  52.    * 
  53.    * @param conf the configuration
  54.    * @param clazz the class (usually a {@link VersionedProtocol})
  55.    * @return a socket factory
  56.    */
  57.   public static SocketFactory getSocketFactory(Configuration conf,
  58.       Class<?> clazz) {
  59.     SocketFactory factory = null;
  60.     String propValue =
  61.         conf.get("hadoop.rpc.socket.factory.class." + clazz.getSimpleName());
  62.     if ((propValue != null) && (propValue.length() > 0))
  63.       factory = getSocketFactoryFromProperty(conf, propValue);
  64.     if (factory == null)
  65.       factory = getDefaultSocketFactory(conf);
  66.     return factory;
  67.   }
  68.   /**
  69.    * Get the default socket factory as specified by the configuration
  70.    * parameter <tt>hadoop.rpc.socket.factory.default</tt>
  71.    * 
  72.    * @param conf the configuration
  73.    * @return the default socket factory as specified in the configuration or
  74.    *         the JVM default socket factory if the configuration does not
  75.    *         contain a default socket factory property.
  76.    */
  77.   public static SocketFactory getDefaultSocketFactory(Configuration conf) {
  78.     String propValue = conf.get("hadoop.rpc.socket.factory.class.default");
  79.     if ((propValue == null) || (propValue.length() == 0))
  80.       return SocketFactory.getDefault();
  81.     return getSocketFactoryFromProperty(conf, propValue);
  82.   }
  83.   /**
  84.    * Get the socket factory corresponding to the given proxy URI. If the
  85.    * given proxy URI corresponds to an absence of configuration parameter,
  86.    * returns null. If the URI is malformed raises an exception.
  87.    * 
  88.    * @param propValue the property which is the class name of the
  89.    *        SocketFactory to instantiate; assumed non null and non empty.
  90.    * @return a socket factory as defined in the property value.
  91.    */
  92.   public static SocketFactory getSocketFactoryFromProperty(
  93.       Configuration conf, String propValue) {
  94.     try {
  95.       Class<?> theClass = conf.getClassByName(propValue);
  96.       return (SocketFactory) ReflectionUtils.newInstance(theClass, conf);
  97.     } catch (ClassNotFoundException cnfe) {
  98.       throw new RuntimeException("Socket Factory class not found: " + cnfe);
  99.     }
  100.   }
  101.   /**
  102.    * Util method to build socket addr from either:
  103.    *   <host>:<post>
  104.    *   <fs>://<host>:<port>/<path>
  105.    */
  106.   public static InetSocketAddress createSocketAddr(String target) {
  107.     return createSocketAddr(target, -1);
  108.   }
  109.   /**
  110.    * Util method to build socket addr from either:
  111.    *   <host>
  112.    *   <host>:<post>
  113.    *   <fs>://<host>:<port>/<path>
  114.    */
  115.   public static InetSocketAddress createSocketAddr(String target,
  116.                                                    int defaultPort) {
  117.     int colonIndex = target.indexOf(':');
  118.     if (colonIndex < 0 && defaultPort == -1) {
  119.       throw new RuntimeException("Not a host:port pair: " + target);
  120.     }
  121.     String hostname;
  122.     int port = -1;
  123.     if (!target.contains("/")) {
  124.       if (colonIndex == -1) {
  125.         hostname = target;
  126.       } else {
  127.         // must be the old style <host>:<port>
  128.         hostname = target.substring(0, colonIndex);
  129.         port = Integer.parseInt(target.substring(colonIndex + 1));
  130.       }
  131.     } else {
  132.       // a new uri
  133.       URI addr = new Path(target).toUri();
  134.       hostname = addr.getHost();
  135.       port = addr.getPort();
  136.     }
  137.     if (port == -1) {
  138.       port = defaultPort;
  139.     }
  140.   
  141.     if (getStaticResolution(hostname) != null) {
  142.       hostname = getStaticResolution(hostname);
  143.     }
  144.     return new InetSocketAddress(hostname, port);
  145.   }
  146.   /**
  147.    * Handle the transition from pairs of attributes specifying a host and port
  148.    * to a single colon separated one.
  149.    * @param conf the configuration to check
  150.    * @param oldBindAddressName the old address attribute name
  151.    * @param oldPortName the old port attribute name
  152.    * @param newBindAddressName the new combined name
  153.    * @return the complete address from the configuration
  154.    */
  155.   @Deprecated
  156.   public static String getServerAddress(Configuration conf,
  157.                                         String oldBindAddressName,
  158.                                         String oldPortName,
  159.                                         String newBindAddressName) {
  160.     String oldAddr = conf.get(oldBindAddressName);
  161.     String oldPort = conf.get(oldPortName);
  162.     String newAddrPort = conf.get(newBindAddressName);
  163.     if (oldAddr == null && oldPort == null) {
  164.       return newAddrPort;
  165.     }
  166.     String[] newAddrPortParts = newAddrPort.split(":",2);
  167.     if (newAddrPortParts.length != 2) {
  168.       throw new IllegalArgumentException("Invalid address/port: " + 
  169.                                          newAddrPort);
  170.     }
  171.     if (oldAddr == null) {
  172.       oldAddr = newAddrPortParts[0];
  173.     } else {
  174.       LOG.warn("Configuration parameter " + oldBindAddressName +
  175.                " is deprecated. Use " + newBindAddressName + " instead.");
  176.     }
  177.     if (oldPort == null) {
  178.       oldPort = newAddrPortParts[1];
  179.     } else {
  180.       LOG.warn("Configuration parameter " + oldPortName +
  181.                " is deprecated. Use " + newBindAddressName + " instead.");      
  182.     }
  183.     return oldAddr + ":" + oldPort;
  184.   }
  185.   
  186.   /**
  187.    * Adds a static resolution for host. This can be used for setting up
  188.    * hostnames with names that are fake to point to a well known host. For e.g.
  189.    * in some testcases we require to have daemons with different hostnames
  190.    * running on the same machine. In order to create connections to these
  191.    * daemons, one can set up mappings from those hostnames to "localhost".
  192.    * {@link NetUtils#getStaticResolution(String)} can be used to query for
  193.    * the actual hostname. 
  194.    * @param host
  195.    * @param resolvedName
  196.    */
  197.   public static void addStaticResolution(String host, String resolvedName) {
  198.     synchronized (hostToResolved) {
  199.       hostToResolved.put(host, resolvedName);
  200.     }
  201.   }
  202.   
  203.   /**
  204.    * Retrieves the resolved name for the passed host. The resolved name must
  205.    * have been set earlier using 
  206.    * {@link NetUtils#addStaticResolution(String, String)}
  207.    * @param host
  208.    * @return the resolution
  209.    */
  210.   public static String getStaticResolution(String host) {
  211.     synchronized (hostToResolved) {
  212.       return hostToResolved.get(host);
  213.     }
  214.   }
  215.   
  216.   /**
  217.    * This is used to get all the resolutions that were added using
  218.    * {@link NetUtils#addStaticResolution(String, String)}. The return
  219.    * value is a List each element of which contains an array of String 
  220.    * of the form String[0]=hostname, String[1]=resolved-hostname
  221.    * @return the list of resolutions
  222.    */
  223.   public static List <String[]> getAllStaticResolutions() {
  224.     synchronized (hostToResolved) {
  225.       Set <Entry <String, String>>entries = hostToResolved.entrySet();
  226.       if (entries.size() == 0) {
  227.         return null;
  228.       }
  229.       List <String[]> l = new ArrayList<String[]>(entries.size());
  230.       for (Entry<String, String> e : entries) {
  231.         l.add(new String[] {e.getKey(), e.getValue()});
  232.       }
  233.     return l;
  234.     }
  235.   }
  236.   
  237.   /**
  238.    * Returns InetSocketAddress that a client can use to 
  239.    * connect to the server. Server.getListenerAddress() is not correct when
  240.    * the server binds to "0.0.0.0". This returns "127.0.0.1:port" when
  241.    * the getListenerAddress() returns "0.0.0.0:port".
  242.    * 
  243.    * @param server
  244.    * @return socket address that a client can use to connect to the server.
  245.    */
  246.   public static InetSocketAddress getConnectAddress(Server server) {
  247.     InetSocketAddress addr = server.getListenerAddress();
  248.     if (addr.getAddress().getHostAddress().equals("0.0.0.0")) {
  249.       addr = new InetSocketAddress("127.0.0.1", addr.getPort());
  250.     }
  251.     return addr;
  252.   }
  253.   
  254.   /**
  255.    * Same as getInputStream(socket, socket.getSoTimeout()).<br><br>
  256.    * 
  257.    * From documentation for {@link #getInputStream(Socket, long)}:<br>
  258.    * Returns InputStream for the socket. If the socket has an associated
  259.    * SocketChannel then it returns a 
  260.    * {@link SocketInputStream} with the given timeout. If the socket does not
  261.    * have a channel, {@link Socket#getInputStream()} is returned. In the later
  262.    * case, the timeout argument is ignored and the timeout set with 
  263.    * {@link Socket#setSoTimeout(int)} applies for reads.<br><br>
  264.    *
  265.    * Any socket created using socket factories returned by {@link #NetUtils},
  266.    * must use this interface instead of {@link Socket#getInputStream()}.
  267.    *     
  268.    * @see #getInputStream(Socket, long)
  269.    * 
  270.    * @param socket
  271.    * @return InputStream for reading from the socket.
  272.    * @throws IOException
  273.    */
  274.   public static InputStream getInputStream(Socket socket) 
  275.                                            throws IOException {
  276.     return getInputStream(socket, socket.getSoTimeout());
  277.   }
  278.   
  279.   /**
  280.    * Returns InputStream for the socket. If the socket has an associated
  281.    * SocketChannel then it returns a 
  282.    * {@link SocketInputStream} with the given timeout. If the socket does not
  283.    * have a channel, {@link Socket#getInputStream()} is returned. In the later
  284.    * case, the timeout argument is ignored and the timeout set with 
  285.    * {@link Socket#setSoTimeout(int)} applies for reads.<br><br>
  286.    * 
  287.    * Any socket created using socket factories returned by {@link #NetUtils},
  288.    * must use this interface instead of {@link Socket#getInputStream()}.
  289.    *     
  290.    * @see Socket#getChannel()
  291.    * 
  292.    * @param socket
  293.    * @param timeout timeout in milliseconds. This may not always apply. zero
  294.    *        for waiting as long as necessary.
  295.    * @return InputStream for reading from the socket.
  296.    * @throws IOException
  297.    */
  298.   public static InputStream getInputStream(Socket socket, long timeout) 
  299.                                            throws IOException {
  300.     return (socket.getChannel() == null) ? 
  301.           socket.getInputStream() : new SocketInputStream(socket, timeout);
  302.   }
  303.   
  304.   /**
  305.    * Same as getOutputStream(socket, 0). Timeout of zero implies write will
  306.    * wait until data is available.<br><br>
  307.    * 
  308.    * From documentation for {@link #getOutputStream(Socket, long)} : <br>
  309.    * Returns OutputStream for the socket. If the socket has an associated
  310.    * SocketChannel then it returns a 
  311.    * {@link SocketOutputStream} with the given timeout. If the socket does not
  312.    * have a channel, {@link Socket#getOutputStream()} is returned. In the later
  313.    * case, the timeout argument is ignored and the write will wait until 
  314.    * data is available.<br><br>
  315.    * 
  316.    * Any socket created using socket factories returned by {@link #NetUtils},
  317.    * must use this interface instead of {@link Socket#getOutputStream()}.
  318.    * 
  319.    * @see #getOutputStream(Socket, long)
  320.    * 
  321.    * @param socket
  322.    * @return OutputStream for writing to the socket.
  323.    * @throws IOException
  324.    */  
  325.   public static OutputStream getOutputStream(Socket socket) 
  326.                                              throws IOException {
  327.     return getOutputStream(socket, 0);
  328.   }
  329.   
  330.   /**
  331.    * Returns OutputStream for the socket. If the socket has an associated
  332.    * SocketChannel then it returns a 
  333.    * {@link SocketOutputStream} with the given timeout. If the socket does not
  334.    * have a channel, {@link Socket#getOutputStream()} is returned. In the later
  335.    * case, the timeout argument is ignored and the write will wait until 
  336.    * data is available.<br><br>
  337.    * 
  338.    * Any socket created using socket factories returned by {@link #NetUtils},
  339.    * must use this interface instead of {@link Socket#getOutputStream()}.
  340.    * 
  341.    * @see Socket#getChannel()
  342.    * 
  343.    * @param socket
  344.    * @param timeout timeout in milliseconds. This may not always apply. zero
  345.    *        for waiting as long as necessary.
  346.    * @return OutputStream for writing to the socket.
  347.    * @throws IOException   
  348.    */
  349.   public static OutputStream getOutputStream(Socket socket, long timeout) 
  350.                                              throws IOException {
  351.     return (socket.getChannel() == null) ? 
  352.             socket.getOutputStream() : new SocketOutputStream(socket, timeout);            
  353.   }
  354.   
  355.   /**
  356.    * This is a drop-in replacement for 
  357.    * {@link Socket#connect(SocketAddress, int)}.
  358.    * In the case of normal sockets that don't have associated channels, this 
  359.    * just invokes <code>socket.connect(endpoint, timeout)</code>. If 
  360.    * <code>socket.getChannel()</code> returns a non-null channel,
  361.    * connect is implemented using Hadoop's selectors. This is done mainly
  362.    * to avoid Sun's connect implementation from creating thread-local 
  363.    * selectors, since Hadoop does not have control on when these are closed
  364.    * and could end up taking all the available file descriptors.
  365.    * 
  366.    * @see java.net.Socket#connect(java.net.SocketAddress, int)
  367.    * 
  368.    * @param socket
  369.    * @param endpoint 
  370.    * @param timeout - timeout in milliseconds
  371.    */
  372.   public static void connect(Socket socket, 
  373.                              SocketAddress endpoint, 
  374.                              int timeout) throws IOException {
  375.     if (socket == null || endpoint == null || timeout < 0) {
  376.       throw new IllegalArgumentException("Illegal argument for connect()");
  377.     }
  378.     
  379.     SocketChannel ch = socket.getChannel();
  380.     
  381.     if (ch == null) {
  382.       // let the default implementation handle it.
  383.       socket.connect(endpoint, timeout);
  384.     } else {
  385.       SocketIOWithTimeout.connect(ch, endpoint, timeout);
  386.     }
  387.   }
  388.   
  389.   /** 
  390.    * Given a string representation of a host, return its ip address
  391.    * in textual presentation.
  392.    * 
  393.    * @param name a string representation of a host:
  394.    *             either a textual representation its IP address or its host name
  395.    * @return its IP address in the string format
  396.    */
  397.   public static String normalizeHostName(String name) {
  398.     if (Character.digit(name.charAt(0), 16) != -1) { // it is an IP
  399.       return name;
  400.     } else {
  401.       try {
  402.         InetAddress ipAddress = InetAddress.getByName(name);
  403.         return ipAddress.getHostAddress();
  404.       } catch (UnknownHostException e) {
  405.         return name;
  406.       }
  407.     }
  408.   }
  409.   
  410.   /** 
  411.    * Given a collection of string representation of hosts, return a list of
  412.    * corresponding IP addresses in the textual representation.
  413.    * 
  414.    * @param names a collection of string representations of hosts
  415.    * @return a list of corresponding IP addresses in the string format
  416.    * @see #normalizeHostName(String)
  417.    */
  418.   public static List<String> normalizeHostNames(Collection<String> names) {
  419.     List<String> hostNames = new ArrayList<String>(names.size());
  420.     for (String name : names) {
  421.       hostNames.add(normalizeHostName(name));
  422.     }
  423.     return hostNames;
  424.   }
  425. }