Client.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:30k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.ipc;
  19. import java.net.Socket;
  20. import java.net.InetSocketAddress;
  21. import java.net.SocketTimeoutException;
  22. import java.net.UnknownHostException;
  23. import java.net.ConnectException;
  24. import java.io.IOException;
  25. import java.io.DataInputStream;
  26. import java.io.DataOutputStream;
  27. import java.io.BufferedInputStream;
  28. import java.io.BufferedOutputStream;
  29. import java.io.FilterInputStream;
  30. import java.io.InputStream;
  31. import java.util.Hashtable;
  32. import java.util.Iterator;
  33. import java.util.Map.Entry;
  34. import java.util.concurrent.atomic.AtomicBoolean;
  35. import java.util.concurrent.atomic.AtomicLong;
  36. import javax.net.SocketFactory;
  37. import org.apache.commons.logging.*;
  38. import org.apache.hadoop.conf.Configuration;
  39. import org.apache.hadoop.io.IOUtils;
  40. import org.apache.hadoop.io.Writable;
  41. import org.apache.hadoop.io.WritableUtils;
  42. import org.apache.hadoop.io.DataOutputBuffer;
  43. import org.apache.hadoop.net.NetUtils;
  44. import org.apache.hadoop.security.UserGroupInformation;
  45. import org.apache.hadoop.util.ReflectionUtils;
  46. /** A client for an IPC service.  IPC calls take a single {@link Writable} as a
  47.  * parameter, and return a {@link Writable} as their value.  A service runs on
  48.  * a port and is defined by a parameter class and a value class.
  49.  * 
  50.  * @see Server
  51.  */
  52. public class Client {
  53.   
  54.   public static final Log LOG =
  55.     LogFactory.getLog(Client.class);
  56.   private Hashtable<ConnectionId, Connection> connections =
  57.     new Hashtable<ConnectionId, Connection>();
  58.   private Class<? extends Writable> valueClass;   // class of call values
  59.   private int counter;                            // counter for call ids
  60.   private AtomicBoolean running = new AtomicBoolean(true); // if client runs
  61.   final private Configuration conf;
  62.   final private int maxIdleTime; //connections will be culled if it was idle for 
  63.                            //maxIdleTime msecs
  64.   final private int maxRetries; //the max. no. of retries for socket connections
  65.   private boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  66.   private int pingInterval; // how often sends ping to the server in msecs
  67.   private SocketFactory socketFactory;           // how to create sockets
  68.   private int refCount = 1;
  69.   
  70.   final private static String PING_INTERVAL_NAME = "ipc.ping.interval";
  71.   final static int DEFAULT_PING_INTERVAL = 60000; // 1 min
  72.   final static int PING_CALL_ID = -1;
  73.   
  74.   /**
  75.    * set the ping interval value in configuration
  76.    * 
  77.    * @param conf Configuration
  78.    * @param pingInterval the ping interval
  79.    */
  80.   final public static void setPingInterval(Configuration conf, int pingInterval) {
  81.     conf.setInt(PING_INTERVAL_NAME, pingInterval);
  82.   }
  83.   /**
  84.    * Get the ping interval from configuration;
  85.    * If not set in the configuration, return the default value.
  86.    * 
  87.    * @param conf Configuration
  88.    * @return the ping interval
  89.    */
  90.   final static int getPingInterval(Configuration conf) {
  91.     return conf.getInt(PING_INTERVAL_NAME, DEFAULT_PING_INTERVAL);
  92.   }
  93.   
  94.   /**
  95.    * Increment this client's reference count
  96.    *
  97.    */
  98.   synchronized void incCount() {
  99.     refCount++;
  100.   }
  101.   
  102.   /**
  103.    * Decrement this client's reference count
  104.    *
  105.    */
  106.   synchronized void decCount() {
  107.     refCount--;
  108.   }
  109.   
  110.   /**
  111.    * Return if this client has no reference
  112.    * 
  113.    * @return true if this client has no reference; false otherwise
  114.    */
  115.   synchronized boolean isZeroReference() {
  116.     return refCount==0;
  117.   }
  118.   /** A call waiting for a value. */
  119.   private class Call {
  120.     int id;                                       // call id
  121.     Writable param;                               // parameter
  122.     Writable value;                               // value, null if error
  123.     IOException error;                            // exception, null if value
  124.     boolean done;                                 // true when call is done
  125.     protected Call(Writable param) {
  126.       this.param = param;
  127.       synchronized (Client.this) {
  128.         this.id = counter++;
  129.       }
  130.     }
  131.     /** Indicate when the call is complete and the
  132.      * value or error are available.  Notifies by default.  */
  133.     protected synchronized void callComplete() {
  134.       this.done = true;
  135.       notify();                                 // notify caller
  136.     }
  137.     /** Set the exception when there is an error.
  138.      * Notify the caller the call is done.
  139.      * 
  140.      * @param error exception thrown by the call; either local or remote
  141.      */
  142.     public synchronized void setException(IOException error) {
  143.       this.error = error;
  144.       callComplete();
  145.     }
  146.     
  147.     /** Set the return value when there is no error. 
  148.      * Notify the caller the call is done.
  149.      * 
  150.      * @param value return value of the call.
  151.      */
  152.     public synchronized void setValue(Writable value) {
  153.       this.value = value;
  154.       callComplete();
  155.     }
  156.   }
  157.   /** Thread that reads responses and notifies callers.  Each connection owns a
  158.    * socket connected to a remote address.  Calls are multiplexed through this
  159.    * socket: responses may be delivered out of order. */
  160.   private class Connection extends Thread {
  161.     private InetSocketAddress server;             // server ip:port
  162.     private ConnectionHeader header;              // connection header
  163.     private ConnectionId remoteId;                // connection id
  164.     
  165.     private Socket socket = null;                 // connected socket
  166.     private DataInputStream in;
  167.     private DataOutputStream out;
  168.     
  169.     // currently active calls
  170.     private Hashtable<Integer, Call> calls = new Hashtable<Integer, Call>();
  171.     private AtomicLong lastActivity = new AtomicLong();// last I/O activity time
  172.     private AtomicBoolean shouldCloseConnection = new AtomicBoolean();  // indicate if the connection is closed
  173.     private IOException closeException; // close reason
  174.     public Connection(ConnectionId remoteId) throws IOException {
  175.       this.remoteId = remoteId;
  176.       this.server = remoteId.getAddress();
  177.       if (server.isUnresolved()) {
  178.         throw new UnknownHostException("unknown host: " + 
  179.                                        remoteId.getAddress().getHostName());
  180.       }
  181.       
  182.       UserGroupInformation ticket = remoteId.getTicket();
  183.       Class<?> protocol = remoteId.getProtocol();
  184.       header = 
  185.         new ConnectionHeader(protocol == null ? null : protocol.getName(), ticket);
  186.       
  187.       this.setName("IPC Client (" + socketFactory.hashCode() +") connection to " +
  188.           remoteId.getAddress().toString() +
  189.           " from " + ((ticket==null)?"an unknown user":ticket.getUserName()));
  190.       this.setDaemon(true);
  191.     }
  192.     /** Update lastActivity with the current time. */
  193.     private void touch() {
  194.       lastActivity.set(System.currentTimeMillis());
  195.     }
  196.     /**
  197.      * Add a call to this connection's call queue and notify
  198.      * a listener; synchronized.
  199.      * Returns false if called during shutdown.
  200.      * @param call to add
  201.      * @return true if the call was added.
  202.      */
  203.     private synchronized boolean addCall(Call call) {
  204.       if (shouldCloseConnection.get())
  205.         return false;
  206.       calls.put(call.id, call);
  207.       notify();
  208.       return true;
  209.     }
  210.     /** This class sends a ping to the remote side when timeout on
  211.      * reading. If no failure is detected, it retries until at least
  212.      * a byte is read.
  213.      */
  214.     private class PingInputStream extends FilterInputStream {
  215.       /* constructor */
  216.       protected PingInputStream(InputStream in) {
  217.         super(in);
  218.       }
  219.       /* Process timeout exception
  220.        * if the connection is not going to be closed, send a ping.
  221.        * otherwise, throw the timeout exception.
  222.        */
  223.       private void handleTimeout(SocketTimeoutException e) throws IOException {
  224.         if (shouldCloseConnection.get() || !running.get()) {
  225.           throw e;
  226.         } else {
  227.           sendPing();
  228.         }
  229.       }
  230.       
  231.       /** Read a byte from the stream.
  232.        * Send a ping if timeout on read. Retries if no failure is detected
  233.        * until a byte is read.
  234.        * @throws IOException for any IO problem other than socket timeout
  235.        */
  236.       public int read() throws IOException {
  237.         do {
  238.           try {
  239.             return super.read();
  240.           } catch (SocketTimeoutException e) {
  241.             handleTimeout(e);
  242.           }
  243.         } while (true);
  244.       }
  245.       /** Read bytes into a buffer starting from offset <code>off</code>
  246.        * Send a ping if timeout on read. Retries if no failure is detected
  247.        * until a byte is read.
  248.        * 
  249.        * @return the total number of bytes read; -1 if the connection is closed.
  250.        */
  251.       public int read(byte[] buf, int off, int len) throws IOException {
  252.         do {
  253.           try {
  254.             return super.read(buf, off, len);
  255.           } catch (SocketTimeoutException e) {
  256.             handleTimeout(e);
  257.           }
  258.         } while (true);
  259.       }
  260.     }
  261.     
  262.     /** Connect to the server and set up the I/O streams. It then sends
  263.      * a header to the server and starts
  264.      * the connection thread that waits for responses.
  265.      */
  266.     private synchronized void setupIOstreams() {
  267.       if (socket != null || shouldCloseConnection.get()) {
  268.         return;
  269.       }
  270.       
  271.       short ioFailures = 0;
  272.       short timeoutFailures = 0;
  273.       try {
  274.         if (LOG.isDebugEnabled()) {
  275.           LOG.debug("Connecting to "+server);
  276.         }
  277.         while (true) {
  278.           try {
  279.             this.socket = socketFactory.createSocket();
  280.             this.socket.setTcpNoDelay(tcpNoDelay);
  281.             // connection time out is 20s
  282.             NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
  283.             this.socket.setSoTimeout(pingInterval);
  284.             break;
  285.           } catch (SocketTimeoutException toe) {
  286.             /* The max number of retries is 45,
  287.              * which amounts to 20s*45 = 15 minutes retries.
  288.              */
  289.             handleConnectionFailure(timeoutFailures++, 45, toe);
  290.           } catch (IOException ie) {
  291.             handleConnectionFailure(ioFailures++, maxRetries, ie);
  292.           }
  293.         }
  294.         this.in = new DataInputStream(new BufferedInputStream
  295.             (new PingInputStream(NetUtils.getInputStream(socket))));
  296.         this.out = new DataOutputStream
  297.             (new BufferedOutputStream(NetUtils.getOutputStream(socket)));
  298.         writeHeader();
  299.         // update last activity time
  300.         touch();
  301.         // start the receiver thread after the socket connection has been set up
  302.         start();
  303.       } catch (IOException e) {
  304.         markClosed(e);
  305.         close();
  306.       }
  307.     }
  308.     /* Handle connection failures
  309.      *
  310.      * If the current number of retries is equal to the max number of retries,
  311.      * stop retrying and throw the exception; Otherwise backoff 1 second and
  312.      * try connecting again.
  313.      *
  314.      * This Method is only called from inside setupIOstreams(), which is
  315.      * synchronized. Hence the sleep is synchronized; the locks will be retained.
  316.      *
  317.      * @param curRetries current number of retries
  318.      * @param maxRetries max number of retries allowed
  319.      * @param ioe failure reason
  320.      * @throws IOException if max number of retries is reached
  321.      */
  322.     private void handleConnectionFailure(
  323.         int curRetries, int maxRetries, IOException ioe) throws IOException {
  324.       // close the current connection
  325.       try {
  326.         socket.close();
  327.       } catch (IOException e) {
  328.         LOG.warn("Not able to close a socket", e);
  329.       }
  330.       // set socket to null so that the next call to setupIOstreams
  331.       // can start the process of connect all over again.
  332.       socket = null;
  333.       // throw the exception if the maximum number of retries is reached
  334.       if (curRetries >= maxRetries) {
  335.         throw ioe;
  336.       }
  337.       // otherwise back off and retry
  338.       try {
  339.         Thread.sleep(1000);
  340.       } catch (InterruptedException ignored) {}
  341.       
  342.       LOG.info("Retrying connect to server: " + server + 
  343.           ". Already tried " + curRetries + " time(s).");
  344.     }
  345.     /* Write the header for each connection
  346.      * Out is not synchronized because only the first thread does this.
  347.      */
  348.     private void writeHeader() throws IOException {
  349.       // Write out the header and version
  350.       out.write(Server.HEADER.array());
  351.       out.write(Server.CURRENT_VERSION);
  352.       // Write out the ConnectionHeader
  353.       DataOutputBuffer buf = new DataOutputBuffer();
  354.       header.write(buf);
  355.       
  356.       // Write out the payload length
  357.       int bufLen = buf.getLength();
  358.       out.writeInt(bufLen);
  359.       out.write(buf.getData(), 0, bufLen);
  360.     }
  361.     
  362.     /* wait till someone signals us to start reading RPC response or
  363.      * it is idle too long, it is marked as to be closed, 
  364.      * or the client is marked as not running.
  365.      * 
  366.      * Return true if it is time to read a response; false otherwise.
  367.      */
  368.     private synchronized boolean waitForWork() {
  369.       if (calls.isEmpty() && !shouldCloseConnection.get()  && running.get())  {
  370.         long timeout = maxIdleTime-
  371.               (System.currentTimeMillis()-lastActivity.get());
  372.         if (timeout>0) {
  373.           try {
  374.             wait(timeout);
  375.           } catch (InterruptedException e) {}
  376.         }
  377.       }
  378.       
  379.       if (!calls.isEmpty() && !shouldCloseConnection.get() && running.get()) {
  380.         return true;
  381.       } else if (shouldCloseConnection.get()) {
  382.         return false;
  383.       } else if (calls.isEmpty()) { // idle connection closed or stopped
  384.         markClosed(null);
  385.         return false;
  386.       } else { // get stopped but there are still pending requests 
  387.         markClosed((IOException)new IOException().initCause(
  388.             new InterruptedException()));
  389.         return false;
  390.       }
  391.     }
  392.     public InetSocketAddress getRemoteAddress() {
  393.       return server;
  394.     }
  395.     /* Send a ping to the server if the time elapsed 
  396.      * since last I/O activity is equal to or greater than the ping interval
  397.      */
  398.     private synchronized void sendPing() throws IOException {
  399.       long curTime = System.currentTimeMillis();
  400.       if ( curTime - lastActivity.get() >= pingInterval) {
  401.         lastActivity.set(curTime);
  402.         synchronized (out) {
  403.           out.writeInt(PING_CALL_ID);
  404.           out.flush();
  405.         }
  406.       }
  407.     }
  408.     public void run() {
  409.       if (LOG.isDebugEnabled())
  410.         LOG.debug(getName() + ": starting, having connections " 
  411.             + connections.size());
  412.       while (waitForWork()) {//wait here for work - read or close connection
  413.         receiveResponse();
  414.       }
  415.       
  416.       close();
  417.       
  418.       if (LOG.isDebugEnabled())
  419.         LOG.debug(getName() + ": stopped, remaining connections "
  420.             + connections.size());
  421.     }
  422.     /** Initiates a call by sending the parameter to the remote server.
  423.      * Note: this is not called from the Connection thread, but by other
  424.      * threads.
  425.      */
  426.     public void sendParam(Call call) {
  427.       if (shouldCloseConnection.get()) {
  428.         return;
  429.       }
  430.       DataOutputBuffer d=null;
  431.       try {
  432.         synchronized (this.out) {
  433.           if (LOG.isDebugEnabled())
  434.             LOG.debug(getName() + " sending #" + call.id);
  435.           
  436.           //for serializing the
  437.           //data to be written
  438.           d = new DataOutputBuffer();
  439.           d.writeInt(call.id);
  440.           call.param.write(d);
  441.           byte[] data = d.getData();
  442.           int dataLength = d.getLength();
  443.           out.writeInt(dataLength);      //first put the data length
  444.           out.write(data, 0, dataLength);//write the data
  445.           out.flush();
  446.         }
  447.       } catch(IOException e) {
  448.         markClosed(e);
  449.       } finally {
  450.         //the buffer is just an in-memory buffer, but it is still polite to
  451.         // close early
  452.         IOUtils.closeStream(d);
  453.       }
  454.     }  
  455.     /* Receive a response.
  456.      * Because only one receiver, so no synchronization on in.
  457.      */
  458.     private void receiveResponse() {
  459.       if (shouldCloseConnection.get()) {
  460.         return;
  461.       }
  462.       touch();
  463.       
  464.       try {
  465.         int id = in.readInt();                    // try to read an id
  466.         if (LOG.isDebugEnabled())
  467.           LOG.debug(getName() + " got value #" + id);
  468.         Call call = calls.remove(id);
  469.         int state = in.readInt();     // read call status
  470.         if (state == Status.SUCCESS.state) {
  471.           Writable value = ReflectionUtils.newInstance(valueClass, conf);
  472.           value.readFields(in);                 // read value
  473.           call.setValue(value);
  474.         } else if (state == Status.ERROR.state) {
  475.           call.setException(new RemoteException(WritableUtils.readString(in),
  476.                                                 WritableUtils.readString(in)));
  477.         } else if (state == Status.FATAL.state) {
  478.           // Close the connection
  479.           markClosed(new RemoteException(WritableUtils.readString(in), 
  480.                                          WritableUtils.readString(in)));
  481.         }
  482.       } catch (IOException e) {
  483.         markClosed(e);
  484.       }
  485.     }
  486.     
  487.     private synchronized void markClosed(IOException e) {
  488.       if (shouldCloseConnection.compareAndSet(false, true)) {
  489.         closeException = e;
  490.         notifyAll();
  491.       }
  492.     }
  493.     
  494.     /** Close the connection. */
  495.     private synchronized void close() {
  496.       if (!shouldCloseConnection.get()) {
  497.         LOG.error("The connection is not in the closed state");
  498.         return;
  499.       }
  500.       // release the resources
  501.       // first thing to do;take the connection out of the connection list
  502.       synchronized (connections) {
  503.         if (connections.get(remoteId) == this) {
  504.           connections.remove(remoteId);
  505.         }
  506.       }
  507.       // close the streams and therefore the socket
  508.       IOUtils.closeStream(out);
  509.       IOUtils.closeStream(in);
  510.       // clean up all calls
  511.       if (closeException == null) {
  512.         if (!calls.isEmpty()) {
  513.           LOG.warn(
  514.               "A connection is closed for no cause and calls are not empty");
  515.           // clean up calls anyway
  516.           closeException = new IOException("Unexpected closed connection");
  517.           cleanupCalls();
  518.         }
  519.       } else {
  520.         // log the info
  521.         if (LOG.isDebugEnabled()) {
  522.           LOG.debug("closing ipc connection to " + server + ": " +
  523.               closeException.getMessage(),closeException);
  524.         }
  525.         // cleanup calls
  526.         cleanupCalls();
  527.       }
  528.       if (LOG.isDebugEnabled())
  529.         LOG.debug(getName() + ": closed");
  530.     }
  531.     
  532.     /* Cleanup all calls and mark them as done */
  533.     private void cleanupCalls() {
  534.       Iterator<Entry<Integer, Call>> itor = calls.entrySet().iterator() ;
  535.       while (itor.hasNext()) {
  536.         Call c = itor.next().getValue(); 
  537.         c.setException(closeException); // local exception
  538.         itor.remove();         
  539.       }
  540.     }
  541.   }
  542.   /** Call implementation used for parallel calls. */
  543.   private class ParallelCall extends Call {
  544.     private ParallelResults results;
  545.     private int index;
  546.     
  547.     public ParallelCall(Writable param, ParallelResults results, int index) {
  548.       super(param);
  549.       this.results = results;
  550.       this.index = index;
  551.     }
  552.     /** Deliver result to result collector. */
  553.     protected void callComplete() {
  554.       results.callComplete(this);
  555.     }
  556.   }
  557.   /** Result collector for parallel calls. */
  558.   private static class ParallelResults {
  559.     private Writable[] values;
  560.     private int size;
  561.     private int count;
  562.     public ParallelResults(int size) {
  563.       this.values = new Writable[size];
  564.       this.size = size;
  565.     }
  566.     /** Collect a result. */
  567.     public synchronized void callComplete(ParallelCall call) {
  568.       values[call.index] = call.value;            // store the value
  569.       count++;                                    // count it
  570.       if (count == size)                          // if all values are in
  571.         notify();                                 // then notify waiting caller
  572.     }
  573.   }
  574.   /** Construct an IPC client whose values are of the given {@link Writable}
  575.    * class. */
  576.   public Client(Class<? extends Writable> valueClass, Configuration conf, 
  577.       SocketFactory factory) {
  578.     this.valueClass = valueClass;
  579.     this.maxIdleTime = 
  580.       conf.getInt("ipc.client.connection.maxidletime", 10000); //10s
  581.     this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
  582.     this.tcpNoDelay = conf.getBoolean("ipc.client.tcpnodelay", false);
  583.     this.pingInterval = getPingInterval(conf);
  584.     if (LOG.isDebugEnabled()) {
  585.       LOG.debug("The ping interval is" + this.pingInterval + "ms.");
  586.     }
  587.     this.conf = conf;
  588.     this.socketFactory = factory;
  589.   }
  590.   /**
  591.    * Construct an IPC client with the default SocketFactory
  592.    * @param valueClass
  593.    * @param conf
  594.    */
  595.   public Client(Class<? extends Writable> valueClass, Configuration conf) {
  596.     this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
  597.   }
  598.  
  599.   /** Return the socket factory of this client
  600.    *
  601.    * @return this client's socket factory
  602.    */
  603.   SocketFactory getSocketFactory() {
  604.     return socketFactory;
  605.   }
  606.   /** Stop all threads related to this client.  No further calls may be made
  607.    * using this client. */
  608.   public void stop() {
  609.     if (LOG.isDebugEnabled()) {
  610.       LOG.debug("Stopping client");
  611.     }
  612.     if (!running.compareAndSet(true, false)) {
  613.       return;
  614.     }
  615.     
  616.     // wake up all connections
  617.     synchronized (connections) {
  618.       for (Connection conn : connections.values()) {
  619.         conn.interrupt();
  620.       }
  621.     }
  622.     
  623.     // wait until all connections are closed
  624.     while (!connections.isEmpty()) {
  625.       try {
  626.         Thread.sleep(100);
  627.       } catch (InterruptedException e) {
  628.       }
  629.     }
  630.   }
  631.   /** Make a call, passing <code>param</code>, to the IPC server running at
  632.    * <code>address</code>, returning the value.  Throws exceptions if there are
  633.    * network problems or if the remote code threw an exception.
  634.    * @deprecated Use {@link #call(Writable, InetSocketAddress, Class, UserGroupInformation)} instead 
  635.    */
  636.   @Deprecated
  637.   public Writable call(Writable param, InetSocketAddress address)
  638.   throws InterruptedException, IOException {
  639.       return call(param, address, null);
  640.   }
  641.   
  642.   /** Make a call, passing <code>param</code>, to the IPC server running at
  643.    * <code>address</code> with the <code>ticket</code> credentials, returning 
  644.    * the value.  
  645.    * Throws exceptions if there are network problems or if the remote code 
  646.    * threw an exception.
  647.    * @deprecated Use {@link #call(Writable, InetSocketAddress, Class, UserGroupInformation)} instead 
  648.    */
  649.   @Deprecated
  650.   public Writable call(Writable param, InetSocketAddress addr, 
  651.       UserGroupInformation ticket)  
  652.       throws InterruptedException, IOException {
  653.     return call(param, addr, null, ticket);
  654.   }
  655.   
  656.   /** Make a call, passing <code>param</code>, to the IPC server running at
  657.    * <code>address</code> which is servicing the <code>protocol</code> protocol, 
  658.    * with the <code>ticket</code> credentials, returning the value.  
  659.    * Throws exceptions if there are network problems or if the remote code 
  660.    * threw an exception. */
  661.   public Writable call(Writable param, InetSocketAddress addr, 
  662.                        Class<?> protocol, UserGroupInformation ticket)  
  663.                        throws InterruptedException, IOException {
  664.     Call call = new Call(param);
  665.     Connection connection = getConnection(addr, protocol, ticket, call);
  666.     connection.sendParam(call);                 // send the parameter
  667.     boolean interrupted = false;
  668.     synchronized (call) {
  669.       while (!call.done) {
  670.         try {
  671.           call.wait();                           // wait for the result
  672.         } catch (InterruptedException ie) {
  673.           // save the fact that we were interrupted
  674.           interrupted = true;
  675.         }
  676.       }
  677.       if (interrupted) {
  678.         // set the interrupt flag now that we are done waiting
  679.         Thread.currentThread().interrupt();
  680.       }
  681.       if (call.error != null) {
  682.         if (call.error instanceof RemoteException) {
  683.           call.error.fillInStackTrace();
  684.           throw call.error;
  685.         } else { // local exception
  686.           throw wrapException(addr, call.error);
  687.         }
  688.       } else {
  689.         return call.value;
  690.       }
  691.     }
  692.   }
  693.   /**
  694.    * Take an IOException and the address we were trying to connect to
  695.    * and return an IOException with the input exception as the cause.
  696.    * The new exception provides the stack trace of the place where 
  697.    * the exception is thrown and some extra diagnostics information.
  698.    * If the exception is ConnectException or SocketTimeoutException, 
  699.    * return a new one of the same type; Otherwise return an IOException.
  700.    * 
  701.    * @param addr target address
  702.    * @param exception the relevant exception
  703.    * @return an exception to throw
  704.    */
  705.   private IOException wrapException(InetSocketAddress addr,
  706.                                          IOException exception) {
  707.     if (exception instanceof ConnectException) {
  708.       //connection refused; include the host:port in the error
  709.       return (ConnectException)new ConnectException(
  710.            "Call to " + addr + " failed on connection exception: " + exception)
  711.                     .initCause(exception);
  712.     } else if (exception instanceof SocketTimeoutException) {
  713.       return (SocketTimeoutException)new SocketTimeoutException(
  714.            "Call to " + addr + " failed on socket timeout exception: "
  715.                       + exception).initCause(exception);
  716.     } else {
  717.       return (IOException)new IOException(
  718.            "Call to " + addr + " failed on local exception: " + exception)
  719.                                  .initCause(exception);
  720.     }
  721.   }
  722.   /** 
  723.    * Makes a set of calls in parallel.  Each parameter is sent to the
  724.    * corresponding address.  When all values are available, or have timed out
  725.    * or errored, the collected results are returned in an array.  The array
  726.    * contains nulls for calls that timed out or errored.
  727.    * @deprecated Use {@link #call(Writable[], InetSocketAddress[], Class, UserGroupInformation)} instead 
  728.    */
  729.   @Deprecated
  730.   public Writable[] call(Writable[] params, InetSocketAddress[] addresses)
  731.     throws IOException {
  732.     return call(params, addresses, null, null);
  733.   }
  734.   
  735.   /** Makes a set of calls in parallel.  Each parameter is sent to the
  736.    * corresponding address.  When all values are available, or have timed out
  737.    * or errored, the collected results are returned in an array.  The array
  738.    * contains nulls for calls that timed out or errored.  */
  739.   public Writable[] call(Writable[] params, InetSocketAddress[] addresses, 
  740.                          Class<?> protocol, UserGroupInformation ticket)
  741.     throws IOException {
  742.     if (addresses.length == 0) return new Writable[0];
  743.     ParallelResults results = new ParallelResults(params.length);
  744.     synchronized (results) {
  745.       for (int i = 0; i < params.length; i++) {
  746.         ParallelCall call = new ParallelCall(params[i], results, i);
  747.         try {
  748.           Connection connection = 
  749.             getConnection(addresses[i], protocol, ticket, call);
  750.           connection.sendParam(call);             // send each parameter
  751.         } catch (IOException e) {
  752.           // log errors
  753.           LOG.info("Calling "+addresses[i]+" caught: " + 
  754.                    e.getMessage(),e);
  755.           results.size--;                         //  wait for one fewer result
  756.         }
  757.       }
  758.       while (results.count != results.size) {
  759.         try {
  760.           results.wait();                    // wait for all results
  761.         } catch (InterruptedException e) {}
  762.       }
  763.       return results.values;
  764.     }
  765.   }
  766.   /** Get a connection from the pool, or create a new one and add it to the
  767.    * pool.  Connections to a given host/port are reused. */
  768.   private Connection getConnection(InetSocketAddress addr,
  769.                                    Class<?> protocol,
  770.                                    UserGroupInformation ticket,
  771.                                    Call call)
  772.                                    throws IOException {
  773.     if (!running.get()) {
  774.       // the client is stopped
  775.       throw new IOException("The client is stopped");
  776.     }
  777.     Connection connection;
  778.     /* we could avoid this allocation for each RPC by having a  
  779.      * connectionsId object and with set() method. We need to manage the
  780.      * refs for keys in HashMap properly. For now its ok.
  781.      */
  782.     ConnectionId remoteId = new ConnectionId(addr, protocol, ticket);
  783.     do {
  784.       synchronized (connections) {
  785.         connection = connections.get(remoteId);
  786.         if (connection == null) {
  787.           connection = new Connection(remoteId);
  788.           connections.put(remoteId, connection);
  789.         }
  790.       }
  791.     } while (!connection.addCall(call));
  792.     
  793.     //we don't invoke the method below inside "synchronized (connections)"
  794.     //block above. The reason for that is if the server happens to be slow,
  795.     //it will take longer to establish a connection and that will slow the
  796.     //entire system down.
  797.     connection.setupIOstreams();
  798.     return connection;
  799.   }
  800.   /**
  801.    * This class holds the address and the user ticket. The client connections
  802.    * to servers are uniquely identified by <remoteAddress, protocol, ticket>
  803.    */
  804.   private static class ConnectionId {
  805.     InetSocketAddress address;
  806.     UserGroupInformation ticket;
  807.     Class<?> protocol;
  808.     private static final int PRIME = 16777619;
  809.     
  810.     ConnectionId(InetSocketAddress address, Class<?> protocol, 
  811.                  UserGroupInformation ticket) {
  812.       this.protocol = protocol;
  813.       this.address = address;
  814.       this.ticket = ticket;
  815.     }
  816.     
  817.     InetSocketAddress getAddress() {
  818.       return address;
  819.     }
  820.     
  821.     Class<?> getProtocol() {
  822.       return protocol;
  823.     }
  824.     
  825.     UserGroupInformation getTicket() {
  826.       return ticket;
  827.     }
  828.     
  829.     
  830.     @Override
  831.     public boolean equals(Object obj) {
  832.      if (obj instanceof ConnectionId) {
  833.        ConnectionId id = (ConnectionId) obj;
  834.        return address.equals(id.address) && protocol == id.protocol && 
  835.               ticket == id.ticket;
  836.        //Note : ticket is a ref comparision.
  837.      }
  838.      return false;
  839.     }
  840.     
  841.     @Override
  842.     public int hashCode() {
  843.       return (address.hashCode() + PRIME * System.identityHashCode(protocol)) ^ 
  844.              System.identityHashCode(ticket);
  845.     }
  846.   }  
  847. }