RPC.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:20k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.ipc;
  19. import java.lang.reflect.Proxy;
  20. import java.lang.reflect.Method;
  21. import java.lang.reflect.Array;
  22. import java.lang.reflect.InvocationHandler;
  23. import java.lang.reflect.InvocationTargetException;
  24. import java.net.ConnectException;
  25. import java.net.InetSocketAddress;
  26. import java.net.SocketTimeoutException;
  27. import java.io.*;
  28. import java.util.Map;
  29. import java.util.HashMap;
  30. import javax.net.SocketFactory;
  31. import javax.security.auth.Subject;
  32. import javax.security.auth.login.LoginException;
  33. import org.apache.commons.logging.*;
  34. import org.apache.hadoop.io.*;
  35. import org.apache.hadoop.net.NetUtils;
  36. import org.apache.hadoop.security.UserGroupInformation;
  37. import org.apache.hadoop.security.authorize.AuthorizationException;
  38. import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
  39. import org.apache.hadoop.conf.*;
  40. import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
  41. /** A simple RPC mechanism.
  42.  *
  43.  * A <i>protocol</i> is a Java interface.  All parameters and return types must
  44.  * be one of:
  45.  *
  46.  * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
  47.  * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
  48.  * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
  49.  *
  50.  * <li>a {@link String}; or</li>
  51.  *
  52.  * <li>a {@link Writable}; or</li>
  53.  *
  54.  * <li>an array of the above types</li> </ul>
  55.  *
  56.  * All methods in the protocol should throw only IOException.  No field data of
  57.  * the protocol instance is transmitted.
  58.  */
  59. public class RPC {
  60.   private static final Log LOG =
  61.     LogFactory.getLog(RPC.class);
  62.   private RPC() {}                                  // no public ctor
  63.   /** A method invocation, including the method name and its parameters.*/
  64.   private static class Invocation implements Writable, Configurable {
  65.     private String methodName;
  66.     private Class[] parameterClasses;
  67.     private Object[] parameters;
  68.     private Configuration conf;
  69.     public Invocation() {}
  70.     public Invocation(Method method, Object[] parameters) {
  71.       this.methodName = method.getName();
  72.       this.parameterClasses = method.getParameterTypes();
  73.       this.parameters = parameters;
  74.     }
  75.     /** The name of the method invoked. */
  76.     public String getMethodName() { return methodName; }
  77.     /** The parameter classes. */
  78.     public Class[] getParameterClasses() { return parameterClasses; }
  79.     /** The parameter instances. */
  80.     public Object[] getParameters() { return parameters; }
  81.     public void readFields(DataInput in) throws IOException {
  82.       methodName = UTF8.readString(in);
  83.       parameters = new Object[in.readInt()];
  84.       parameterClasses = new Class[parameters.length];
  85.       ObjectWritable objectWritable = new ObjectWritable();
  86.       for (int i = 0; i < parameters.length; i++) {
  87.         parameters[i] = ObjectWritable.readObject(in, objectWritable, this.conf);
  88.         parameterClasses[i] = objectWritable.getDeclaredClass();
  89.       }
  90.     }
  91.     public void write(DataOutput out) throws IOException {
  92.       UTF8.writeString(out, methodName);
  93.       out.writeInt(parameterClasses.length);
  94.       for (int i = 0; i < parameterClasses.length; i++) {
  95.         ObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
  96.                                    conf);
  97.       }
  98.     }
  99.     public String toString() {
  100.       StringBuffer buffer = new StringBuffer();
  101.       buffer.append(methodName);
  102.       buffer.append("(");
  103.       for (int i = 0; i < parameters.length; i++) {
  104.         if (i != 0)
  105.           buffer.append(", ");
  106.         buffer.append(parameters[i]);
  107.       }
  108.       buffer.append(")");
  109.       return buffer.toString();
  110.     }
  111.     public void setConf(Configuration conf) {
  112.       this.conf = conf;
  113.     }
  114.     public Configuration getConf() {
  115.       return this.conf;
  116.     }
  117.   }
  118.   /* Cache a client using its socket factory as the hash key */
  119.   static private class ClientCache {
  120.     private Map<SocketFactory, Client> clients =
  121.       new HashMap<SocketFactory, Client>();
  122.     /**
  123.      * Construct & cache an IPC client with the user-provided SocketFactory 
  124.      * if no cached client exists.
  125.      * 
  126.      * @param conf Configuration
  127.      * @return an IPC client
  128.      */
  129.     private synchronized Client getClient(Configuration conf,
  130.         SocketFactory factory) {
  131.       // Construct & cache client.  The configuration is only used for timeout,
  132.       // and Clients have connection pools.  So we can either (a) lose some
  133.       // connection pooling and leak sockets, or (b) use the same timeout for all
  134.       // configurations.  Since the IPC is usually intended globally, not
  135.       // per-job, we choose (a).
  136.       Client client = clients.get(factory);
  137.       if (client == null) {
  138.         client = new Client(ObjectWritable.class, conf, factory);
  139.         clients.put(factory, client);
  140.       } else {
  141.         client.incCount();
  142.       }
  143.       return client;
  144.     }
  145.     /**
  146.      * Construct & cache an IPC client with the default SocketFactory 
  147.      * if no cached client exists.
  148.      * 
  149.      * @param conf Configuration
  150.      * @return an IPC client
  151.      */
  152.     private synchronized Client getClient(Configuration conf) {
  153.       return getClient(conf, SocketFactory.getDefault());
  154.     }
  155.     /**
  156.      * Stop a RPC client connection 
  157.      * A RPC client is closed only when its reference count becomes zero.
  158.      */
  159.     private void stopClient(Client client) {
  160.       synchronized (this) {
  161.         client.decCount();
  162.         if (client.isZeroReference()) {
  163.           clients.remove(client.getSocketFactory());
  164.         }
  165.       }
  166.       if (client.isZeroReference()) {
  167.         client.stop();
  168.       }
  169.     }
  170.   }
  171.   private static ClientCache CLIENTS=new ClientCache();
  172.   
  173.   private static class Invoker implements InvocationHandler {
  174.     private InetSocketAddress address;
  175.     private UserGroupInformation ticket;
  176.     private Client client;
  177.     private boolean isClosed = false;
  178.     public Invoker(InetSocketAddress address, UserGroupInformation ticket, 
  179.                    Configuration conf, SocketFactory factory) {
  180.       this.address = address;
  181.       this.ticket = ticket;
  182.       this.client = CLIENTS.getClient(conf, factory);
  183.     }
  184.     public Object invoke(Object proxy, Method method, Object[] args)
  185.       throws Throwable {
  186.       final boolean logDebug = LOG.isDebugEnabled();
  187.       long startTime = 0;
  188.       if (logDebug) {
  189.         startTime = System.currentTimeMillis();
  190.       }
  191.       ObjectWritable value = (ObjectWritable)
  192.         client.call(new Invocation(method, args), address, 
  193.                     method.getDeclaringClass(), ticket);
  194.       if (logDebug) {
  195.         long callTime = System.currentTimeMillis() - startTime;
  196.         LOG.debug("Call: " + method.getName() + " " + callTime);
  197.       }
  198.       return value.get();
  199.     }
  200.     
  201.     /* close the IPC client that's responsible for this invoker's RPCs */ 
  202.     synchronized private void close() {
  203.       if (!isClosed) {
  204.         isClosed = true;
  205.         CLIENTS.stopClient(client);
  206.       }
  207.     }
  208.   }
  209.   /**
  210.    * A version mismatch for the RPC protocol.
  211.    */
  212.   public static class VersionMismatch extends IOException {
  213.     private String interfaceName;
  214.     private long clientVersion;
  215.     private long serverVersion;
  216.     
  217.     /**
  218.      * Create a version mismatch exception
  219.      * @param interfaceName the name of the protocol mismatch
  220.      * @param clientVersion the client's version of the protocol
  221.      * @param serverVersion the server's version of the protocol
  222.      */
  223.     public VersionMismatch(String interfaceName, long clientVersion,
  224.                            long serverVersion) {
  225.       super("Protocol " + interfaceName + " version mismatch. (client = " +
  226.             clientVersion + ", server = " + serverVersion + ")");
  227.       this.interfaceName = interfaceName;
  228.       this.clientVersion = clientVersion;
  229.       this.serverVersion = serverVersion;
  230.     }
  231.     
  232.     /**
  233.      * Get the interface name
  234.      * @return the java class name 
  235.      *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
  236.      */
  237.     public String getInterfaceName() {
  238.       return interfaceName;
  239.     }
  240.     
  241.     /**
  242.      * Get the client's preferred version
  243.      */
  244.     public long getClientVersion() {
  245.       return clientVersion;
  246.     }
  247.     
  248.     /**
  249.      * Get the server's agreed to version.
  250.      */
  251.     public long getServerVersion() {
  252.       return serverVersion;
  253.     }
  254.   }
  255.   
  256.   public static VersionedProtocol waitForProxy(Class protocol,
  257.       long clientVersion,
  258.       InetSocketAddress addr,
  259.       Configuration conf
  260.       ) throws IOException {
  261.     return waitForProxy(protocol, clientVersion, addr, conf, Long.MAX_VALUE);
  262.   }
  263.   /**
  264.    * Get a proxy connection to a remote server
  265.    * @param protocol protocol class
  266.    * @param clientVersion client version
  267.    * @param addr remote address
  268.    * @param conf configuration to use
  269.    * @param timeout time in milliseconds before giving up
  270.    * @return the proxy
  271.    * @throws IOException if the far end through a RemoteException
  272.    */
  273.   static VersionedProtocol waitForProxy(Class protocol,
  274.                                                long clientVersion,
  275.                                                InetSocketAddress addr,
  276.                                                Configuration conf,
  277.                                                long timeout
  278.                                                ) throws IOException { 
  279.     long startTime = System.currentTimeMillis();
  280.     IOException ioe;
  281.     while (true) {
  282.       try {
  283.         return getProxy(protocol, clientVersion, addr, conf);
  284.       } catch(ConnectException se) {  // namenode has not been started
  285.         LOG.info("Server at " + addr + " not available yet, Zzzzz...");
  286.         ioe = se;
  287.       } catch(SocketTimeoutException te) {  // namenode is busy
  288.         LOG.info("Problem connecting to server: " + addr);
  289.         ioe = te;
  290.       }
  291.       // check if timed out
  292.       if (System.currentTimeMillis()-timeout >= startTime) {
  293.         throw ioe;
  294.       }
  295.       // wait for retry
  296.       try {
  297.         Thread.sleep(1000);
  298.       } catch (InterruptedException ie) {
  299.         // IGNORE
  300.       }
  301.     }
  302.   }
  303.   /** Construct a client-side proxy object that implements the named protocol,
  304.    * talking to a server at the named address. */
  305.   public static VersionedProtocol getProxy(Class<?> protocol,
  306.       long clientVersion, InetSocketAddress addr, Configuration conf,
  307.       SocketFactory factory) throws IOException {
  308.     UserGroupInformation ugi = null;
  309.     try {
  310.       ugi = UserGroupInformation.login(conf);
  311.     } catch (LoginException le) {
  312.       throw new RuntimeException("Couldn't login!");
  313.     }
  314.     return getProxy(protocol, clientVersion, addr, ugi, conf, factory);
  315.   }
  316.   
  317.   /** Construct a client-side proxy object that implements the named protocol,
  318.    * talking to a server at the named address. */
  319.   public static VersionedProtocol getProxy(Class<?> protocol,
  320.       long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
  321.       Configuration conf, SocketFactory factory) throws IOException {    
  322.     VersionedProtocol proxy =
  323.         (VersionedProtocol) Proxy.newProxyInstance(
  324.             protocol.getClassLoader(), new Class[] { protocol },
  325.             new Invoker(addr, ticket, conf, factory));
  326.     long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
  327.                                                   clientVersion);
  328.     if (serverVersion == clientVersion) {
  329.       return proxy;
  330.     } else {
  331.       throw new VersionMismatch(protocol.getName(), clientVersion, 
  332.                                 serverVersion);
  333.     }
  334.   }
  335.   /**
  336.    * Construct a client-side proxy object with the default SocketFactory
  337.    * 
  338.    * @param protocol
  339.    * @param clientVersion
  340.    * @param addr
  341.    * @param conf
  342.    * @return a proxy instance
  343.    * @throws IOException
  344.    */
  345.   public static VersionedProtocol getProxy(Class<?> protocol,
  346.       long clientVersion, InetSocketAddress addr, Configuration conf)
  347.       throws IOException {
  348.     return getProxy(protocol, clientVersion, addr, conf, NetUtils
  349.         .getDefaultSocketFactory(conf));
  350.   }
  351.   /**
  352.    * Stop this proxy and release its invoker's resource
  353.    * @param proxy the proxy to be stopped
  354.    */
  355.   public static void stopProxy(VersionedProtocol proxy) {
  356.     if (proxy!=null) {
  357.       ((Invoker)Proxy.getInvocationHandler(proxy)).close();
  358.     }
  359.   }
  360.   /** 
  361.    * Expert: Make multiple, parallel calls to a set of servers.
  362.    * @deprecated Use {@link #call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration)} instead 
  363.    */
  364.   public static Object[] call(Method method, Object[][] params,
  365.                               InetSocketAddress[] addrs, Configuration conf)
  366.     throws IOException {
  367.     return call(method, params, addrs, null, conf);
  368.   }
  369.   
  370.   /** Expert: Make multiple, parallel calls to a set of servers. */
  371.   public static Object[] call(Method method, Object[][] params,
  372.                               InetSocketAddress[] addrs, 
  373.                               UserGroupInformation ticket, Configuration conf)
  374.     throws IOException {
  375.     Invocation[] invocations = new Invocation[params.length];
  376.     for (int i = 0; i < params.length; i++)
  377.       invocations[i] = new Invocation(method, params[i]);
  378.     Client client = CLIENTS.getClient(conf);
  379.     try {
  380.     Writable[] wrappedValues = 
  381.       client.call(invocations, addrs, method.getDeclaringClass(), ticket);
  382.     
  383.     if (method.getReturnType() == Void.TYPE) {
  384.       return null;
  385.     }
  386.     Object[] values =
  387.       (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
  388.     for (int i = 0; i < values.length; i++)
  389.       if (wrappedValues[i] != null)
  390.         values[i] = ((ObjectWritable)wrappedValues[i]).get();
  391.     
  392.     return values;
  393.     } finally {
  394.       CLIENTS.stopClient(client);
  395.     }
  396.   }
  397.   /** Construct a server for a protocol implementation instance listening on a
  398.    * port and address. */
  399.   public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
  400.     throws IOException {
  401.     return getServer(instance, bindAddress, port, 1, false, conf);
  402.   }
  403.   /** Construct a server for a protocol implementation instance listening on a
  404.    * port and address. */
  405.   public static Server getServer(final Object instance, final String bindAddress, final int port,
  406.                                  final int numHandlers,
  407.                                  final boolean verbose, Configuration conf) 
  408.     throws IOException {
  409.     return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
  410.   }
  411.   /** An RPC Server. */
  412.   public static class Server extends org.apache.hadoop.ipc.Server {
  413.     private Object instance;
  414.     private boolean verbose;
  415.     private boolean authorize = false;
  416.     /** Construct an RPC server.
  417.      * @param instance the instance whose methods will be called
  418.      * @param conf the configuration to use
  419.      * @param bindAddress the address to bind on to listen for connection
  420.      * @param port the port to listen for connections on
  421.      */
  422.     public Server(Object instance, Configuration conf, String bindAddress, int port) 
  423.       throws IOException {
  424.       this(instance, conf,  bindAddress, port, 1, false);
  425.     }
  426.     
  427.     private static String classNameBase(String className) {
  428.       String[] names = className.split("\.", -1);
  429.       if (names == null || names.length == 0) {
  430.         return className;
  431.       }
  432.       return names[names.length-1];
  433.     }
  434.     
  435.     /** Construct an RPC server.
  436.      * @param instance the instance whose methods will be called
  437.      * @param conf the configuration to use
  438.      * @param bindAddress the address to bind on to listen for connection
  439.      * @param port the port to listen for connections on
  440.      * @param numHandlers the number of method handler threads to run
  441.      * @param verbose whether each call should be logged
  442.      */
  443.     public Server(Object instance, Configuration conf, String bindAddress,  int port,
  444.                   int numHandlers, boolean verbose) throws IOException {
  445.       super(bindAddress, port, Invocation.class, numHandlers, conf, classNameBase(instance.getClass().getName()));
  446.       this.instance = instance;
  447.       this.verbose = verbose;
  448.       this.authorize = 
  449.         conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, 
  450.                         false);
  451.     }
  452.     public Writable call(Class<?> protocol, Writable param, long receivedTime) 
  453.     throws IOException {
  454.       try {
  455.         Invocation call = (Invocation)param;
  456.         if (verbose) log("Call: " + call);
  457.         Method method =
  458.           protocol.getMethod(call.getMethodName(),
  459.                                    call.getParameterClasses());
  460.         method.setAccessible(true);
  461.         long startTime = System.currentTimeMillis();
  462.         Object value = method.invoke(instance, call.getParameters());
  463.         int processingTime = (int) (System.currentTimeMillis() - startTime);
  464.         int qTime = (int) (startTime-receivedTime);
  465.         if (LOG.isDebugEnabled()) {
  466.           LOG.debug("Served: " + call.getMethodName() +
  467.                     " queueTime= " + qTime +
  468.                     " procesingTime= " + processingTime);
  469.         }
  470.         rpcMetrics.rpcQueueTime.inc(qTime);
  471.         rpcMetrics.rpcProcessingTime.inc(processingTime);
  472.         MetricsTimeVaryingRate m =
  473.          (MetricsTimeVaryingRate) rpcMetrics.registry.get(call.getMethodName());
  474.        if (m == null) {
  475.          try {
  476.            m = new MetricsTimeVaryingRate(call.getMethodName(),
  477.                                                rpcMetrics.registry);
  478.          } catch (IllegalArgumentException iae) {
  479.            // the metrics has been registered; re-fetch the handle
  480.            LOG.info("Error register " + call.getMethodName(), iae);
  481.            m = (MetricsTimeVaryingRate) rpcMetrics.registry.get(
  482.                call.getMethodName());
  483.          }
  484.        }
  485.         m.inc(processingTime);
  486.         if (verbose) log("Return: "+value);
  487.         return new ObjectWritable(method.getReturnType(), value);
  488.       } catch (InvocationTargetException e) {
  489.         Throwable target = e.getTargetException();
  490.         if (target instanceof IOException) {
  491.           throw (IOException)target;
  492.         } else {
  493.           IOException ioe = new IOException(target.toString());
  494.           ioe.setStackTrace(target.getStackTrace());
  495.           throw ioe;
  496.         }
  497.       } catch (Throwable e) {
  498.         IOException ioe = new IOException(e.toString());
  499.         ioe.setStackTrace(e.getStackTrace());
  500.         throw ioe;
  501.       }
  502.     }
  503.     @Override
  504.     public void authorize(Subject user, ConnectionHeader connection) 
  505.     throws AuthorizationException {
  506.       if (authorize) {
  507.         Class<?> protocol = null;
  508.         try {
  509.           protocol = getProtocolClass(connection.getProtocol(), getConf());
  510.         } catch (ClassNotFoundException cfne) {
  511.           throw new AuthorizationException("Unknown protocol: " + 
  512.                                            connection.getProtocol());
  513.         }
  514.         ServiceAuthorizationManager.authorize(user, protocol);
  515.       }
  516.     }
  517.   }
  518.   private static void log(String value) {
  519.     if (value!= null && value.length() > 55)
  520.       value = value.substring(0, 55)+"...";
  521.     LOG.info(value);
  522.   }
  523. }