Server.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:43k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.ipc;
  19. import java.io.IOException;
  20. import java.io.DataInputStream;
  21. import java.io.DataOutputStream;
  22. import java.io.ByteArrayInputStream;
  23. import java.io.ByteArrayOutputStream;
  24. import java.nio.ByteBuffer;
  25. import java.nio.channels.CancelledKeyException;
  26. import java.nio.channels.ClosedChannelException;
  27. import java.nio.channels.ReadableByteChannel;
  28. import java.nio.channels.SelectionKey;
  29. import java.nio.channels.Selector;
  30. import java.nio.channels.ServerSocketChannel;
  31. import java.nio.channels.SocketChannel;
  32. import java.nio.channels.WritableByteChannel;
  33. import java.net.BindException;
  34. import java.net.InetAddress;
  35. import java.net.InetSocketAddress;
  36. import java.net.ServerSocket;
  37. import java.net.Socket;
  38. import java.net.SocketException;
  39. import java.net.UnknownHostException;
  40. import java.security.PrivilegedActionException;
  41. import java.security.PrivilegedExceptionAction;
  42. import java.util.ArrayList;
  43. import java.util.Collections;
  44. import java.util.LinkedList;
  45. import java.util.List;
  46. import java.util.Iterator;
  47. import java.util.Map;
  48. import java.util.Random;
  49. import java.util.concurrent.BlockingQueue;
  50. import java.util.concurrent.ConcurrentHashMap;
  51. import java.util.concurrent.LinkedBlockingQueue;
  52. import javax.security.auth.Subject;
  53. import org.apache.commons.logging.Log;
  54. import org.apache.commons.logging.LogFactory;
  55. import org.apache.hadoop.conf.Configuration;
  56. import org.apache.hadoop.security.SecurityUtil;
  57. import org.apache.hadoop.io.Writable;
  58. import org.apache.hadoop.io.WritableUtils;
  59. import org.apache.hadoop.util.ReflectionUtils;
  60. import org.apache.hadoop.util.StringUtils;
  61. import org.apache.hadoop.ipc.metrics.RpcMetrics;
  62. import org.apache.hadoop.security.authorize.AuthorizationException;
  63. /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  64.  * parameter, and return a {@link Writable} as their value.  A service runs on
  65.  * a port and is defined by a parameter class and a value class.
  66.  * 
  67.  * @see Client
  68.  */
  69. public abstract class Server {
  70.   
  71.   /**
  72.    * The first four bytes of Hadoop RPC connections
  73.    */
  74.   public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
  75.   
  76.   // 1 : Introduce ping and server does not throw away RPCs
  77.   // 3 : Introduce the protocol into the RPC connection header
  78.   public static final byte CURRENT_VERSION = 3;
  79.   
  80.   /**
  81.    * How many calls/handler are allowed in the queue.
  82.    */
  83.   private static final int MAX_QUEUE_SIZE_PER_HANDLER = 100;
  84.   
  85.   public static final Log LOG = LogFactory.getLog(Server.class);
  86.   private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();
  87.   private static final Map<String, Class<?>> PROTOCOL_CACHE = 
  88.     new ConcurrentHashMap<String, Class<?>>();
  89.   
  90.   static Class<?> getProtocolClass(String protocolName, Configuration conf) 
  91.   throws ClassNotFoundException {
  92.     Class<?> protocol = PROTOCOL_CACHE.get(protocolName);
  93.     if (protocol == null) {
  94.       protocol = conf.getClassByName(protocolName);
  95.       PROTOCOL_CACHE.put(protocolName, protocol);
  96.     }
  97.     return protocol;
  98.   }
  99.   
  100.   /** Returns the server instance called under or null.  May be called under
  101.    * {@link #call(Writable, long)} implementations, and under {@link Writable}
  102.    * methods of paramters and return values.  Permits applications to access
  103.    * the server context.*/
  104.   public static Server get() {
  105.     return SERVER.get();
  106.   }
  107.  
  108.   /** This is set to Call object before Handler invokes an RPC and reset
  109.    * after the call returns.
  110.    */
  111.   private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
  112.   
  113.   /** Returns the remote side ip address when invoked inside an RPC 
  114.    *  Returns null incase of an error.
  115.    */
  116.   public static InetAddress getRemoteIp() {
  117.     Call call = CurCall.get();
  118.     if (call != null) {
  119.       return call.connection.socket.getInetAddress();
  120.     }
  121.     return null;
  122.   }
  123.   /** Returns remote address as a string when invoked inside an RPC.
  124.    *  Returns null in case of an error.
  125.    */
  126.   public static String getRemoteAddress() {
  127.     InetAddress addr = getRemoteIp();
  128.     return (addr == null) ? null : addr.getHostAddress();
  129.   }
  130.   private String bindAddress; 
  131.   private int port;                               // port we listen on
  132.   private int handlerCount;                       // number of handler threads
  133.   private Class<? extends Writable> paramClass;   // class of call parameters
  134.   private int maxIdleTime;                        // the maximum idle time after 
  135.                                                   // which a client may be disconnected
  136.   private int thresholdIdleConnections;           // the number of idle connections
  137.                                                   // after which we will start
  138.                                                   // cleaning up idle 
  139.                                                   // connections
  140.   int maxConnectionsToNuke;                       // the max number of 
  141.                                                   // connections to nuke
  142.                                                   //during a cleanup
  143.   
  144.   protected RpcMetrics  rpcMetrics;
  145.   
  146.   private Configuration conf;
  147.   private int maxQueueSize;
  148.   private int socketSendBufferSize;
  149.   private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
  150.   volatile private boolean running = true;         // true while server runs
  151.   private BlockingQueue<Call> callQueue; // queued calls
  152.   private List<Connection> connectionList = 
  153.     Collections.synchronizedList(new LinkedList<Connection>());
  154.   //maintain a list
  155.   //of client connections
  156.   private Listener listener = null;
  157.   private Responder responder = null;
  158.   private int numConnections = 0;
  159.   private Handler[] handlers = null;
  160.   /**
  161.    * A convenience method to bind to a given address and report 
  162.    * better exceptions if the address is not a valid host.
  163.    * @param socket the socket to bind
  164.    * @param address the address to bind to
  165.    * @param backlog the number of connections allowed in the queue
  166.    * @throws BindException if the address can't be bound
  167.    * @throws UnknownHostException if the address isn't a valid host name
  168.    * @throws IOException other random errors from bind
  169.    */
  170.   public static void bind(ServerSocket socket, InetSocketAddress address, 
  171.                           int backlog) throws IOException {
  172.     try {
  173.       socket.bind(address, backlog);
  174.     } catch (BindException e) {
  175.       BindException bindException = new BindException("Problem binding to " + address
  176.                                                       + " : " + e.getMessage());
  177.       bindException.initCause(e);
  178.       throw bindException;
  179.     } catch (SocketException e) {
  180.       // If they try to bind to a different host's address, give a better
  181.       // error message.
  182.       if ("Unresolved address".equals(e.getMessage())) {
  183.         throw new UnknownHostException("Invalid hostname for server: " + 
  184.                                        address.getHostName());
  185.       } else {
  186.         throw e;
  187.       }
  188.     }
  189.   }
  190.   /** A call queued for handling. */
  191.   private static class Call {
  192.     private int id;                               // the client's call id
  193.     private Writable param;                       // the parameter passed
  194.     private Connection connection;                // connection to client
  195.     private long timestamp;     // the time received when response is null
  196.                                    // the time served when response is not null
  197.     private ByteBuffer response;                      // the response for this call
  198.     public Call(int id, Writable param, Connection connection) { 
  199.       this.id = id;
  200.       this.param = param;
  201.       this.connection = connection;
  202.       this.timestamp = System.currentTimeMillis();
  203.       this.response = null;
  204.     }
  205.     
  206.     @Override
  207.     public String toString() {
  208.       return param.toString() + " from " + connection.toString();
  209.     }
  210.     public void setResponse(ByteBuffer response) {
  211.       this.response = response;
  212.     }
  213.   }
  214.   /** Listens on the socket. Creates jobs for the handler threads*/
  215.   private class Listener extends Thread {
  216.     
  217.     private ServerSocketChannel acceptChannel = null; //the accept channel
  218.     private Selector selector = null; //the selector that we use for the server
  219.     private InetSocketAddress address; //the address we bind at
  220.     private Random rand = new Random();
  221.     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
  222.                                          //-tion (for idle connections) ran
  223.     private long cleanupInterval = 10000; //the minimum interval between 
  224.                                           //two cleanup runs
  225.     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
  226.     
  227.     public Listener() throws IOException {
  228.       address = new InetSocketAddress(bindAddress, port);
  229.       // Create a new server socket and set to non blocking mode
  230.       acceptChannel = ServerSocketChannel.open();
  231.       acceptChannel.configureBlocking(false);
  232.       // Bind the server socket to the local host and port
  233.       bind(acceptChannel.socket(), address, backlogLength);
  234.       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
  235.       // create a selector;
  236.       selector= Selector.open();
  237.       // Register accepts on the server socket with the selector.
  238.       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  239.       this.setName("IPC Server listener on " + port);
  240.       this.setDaemon(true);
  241.     }
  242.     /** cleanup connections from connectionList. Choose a random range
  243.      * to scan and also have a limit on the number of the connections
  244.      * that will be cleanedup per run. The criteria for cleanup is the time
  245.      * for which the connection was idle. If 'force' is true then all 
  246.      * connections will be looked at for the cleanup.
  247.      */
  248.     private void cleanupConnections(boolean force) {
  249.       if (force || numConnections > thresholdIdleConnections) {
  250.         long currentTime = System.currentTimeMillis();
  251.         if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
  252.           return;
  253.         }
  254.         int start = 0;
  255.         int end = numConnections - 1;
  256.         if (!force) {
  257.           start = rand.nextInt() % numConnections;
  258.           end = rand.nextInt() % numConnections;
  259.           int temp;
  260.           if (end < start) {
  261.             temp = start;
  262.             start = end;
  263.             end = temp;
  264.           }
  265.         }
  266.         int i = start;
  267.         int numNuked = 0;
  268.         while (i <= end) {
  269.           Connection c;
  270.           synchronized (connectionList) {
  271.             try {
  272.               c = connectionList.get(i);
  273.             } catch (Exception e) {return;}
  274.           }
  275.           if (c.timedOut(currentTime)) {
  276.             if (LOG.isDebugEnabled())
  277.               LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
  278.             closeConnection(c);
  279.             numNuked++;
  280.             end--;
  281.             c = null;
  282.             if (!force && numNuked == maxConnectionsToNuke) break;
  283.           }
  284.           else i++;
  285.         }
  286.         lastCleanupRunTime = System.currentTimeMillis();
  287.       }
  288.     }
  289.     @Override
  290.     public void run() {
  291.       LOG.info(getName() + ": starting");
  292.       SERVER.set(Server.this);
  293.       while (running) {
  294.         SelectionKey key = null;
  295.         try {
  296.           selector.select();
  297.           Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  298.           while (iter.hasNext()) {
  299.             key = iter.next();
  300.             iter.remove();
  301.             try {
  302.               if (key.isValid()) {
  303.                 if (key.isAcceptable())
  304.                   doAccept(key);
  305.                 else if (key.isReadable())
  306.                   doRead(key);
  307.               }
  308.             } catch (IOException e) {
  309.             }
  310.             key = null;
  311.           }
  312.         } catch (OutOfMemoryError e) {
  313.           // we can run out of memory if we have too many threads
  314.           // log the event and sleep for a minute and give 
  315.           // some thread(s) a chance to finish
  316.           LOG.warn("Out of Memory in server select", e);
  317.           closeCurrentConnection(key, e);
  318.           cleanupConnections(true);
  319.           try { Thread.sleep(60000); } catch (Exception ie) {}
  320.         } catch (InterruptedException e) {
  321.           if (running) {                          // unexpected -- log it
  322.             LOG.info(getName() + " caught: " +
  323.                      StringUtils.stringifyException(e));
  324.           }
  325.         } catch (Exception e) {
  326.           closeCurrentConnection(key, e);
  327.         }
  328.         cleanupConnections(false);
  329.       }
  330.       LOG.info("Stopping " + this.getName());
  331.       synchronized (this) {
  332.         try {
  333.           acceptChannel.close();
  334.           selector.close();
  335.         } catch (IOException e) { }
  336.         selector= null;
  337.         acceptChannel= null;
  338.         
  339.         // clean up all connections
  340.         while (!connectionList.isEmpty()) {
  341.           closeConnection(connectionList.remove(0));
  342.         }
  343.       }
  344.     }
  345.     private void closeCurrentConnection(SelectionKey key, Throwable e) {
  346.       if (key != null) {
  347.         Connection c = (Connection)key.attachment();
  348.         if (c != null) {
  349.           if (LOG.isDebugEnabled())
  350.             LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
  351.           closeConnection(c);
  352.           c = null;
  353.         }
  354.       }
  355.     }
  356.     InetSocketAddress getAddress() {
  357.       return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
  358.     }
  359.     
  360.     void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
  361.       Connection c = null;
  362.       ServerSocketChannel server = (ServerSocketChannel) key.channel();
  363.       // accept up to 10 connections
  364.       for (int i=0; i<10; i++) {
  365.         SocketChannel channel = server.accept();
  366.         if (channel==null) return;
  367.         channel.configureBlocking(false);
  368.         channel.socket().setTcpNoDelay(tcpNoDelay);
  369.         SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
  370.         c = new Connection(readKey, channel, System.currentTimeMillis());
  371.         readKey.attach(c);
  372.         synchronized (connectionList) {
  373.           connectionList.add(numConnections, c);
  374.           numConnections++;
  375.         }
  376.         if (LOG.isDebugEnabled())
  377.           LOG.debug("Server connection from " + c.toString() +
  378.               "; # active connections: " + numConnections +
  379.               "; # queued calls: " + callQueue.size());
  380.       }
  381.     }
  382.     void doRead(SelectionKey key) throws InterruptedException {
  383.       int count = 0;
  384.       Connection c = (Connection)key.attachment();
  385.       if (c == null) {
  386.         return;  
  387.       }
  388.       c.setLastContact(System.currentTimeMillis());
  389.       
  390.       try {
  391.         count = c.readAndProcess();
  392.       } catch (InterruptedException ieo) {
  393.         LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);
  394.         throw ieo;
  395.       } catch (Exception e) {
  396.         LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);
  397.         count = -1; //so that the (count < 0) block is executed
  398.       }
  399.       if (count < 0) {
  400.         if (LOG.isDebugEnabled())
  401.           LOG.debug(getName() + ": disconnecting client " + 
  402.                     c.getHostAddress() + ". Number of active connections: "+
  403.                     numConnections);
  404.         closeConnection(c);
  405.         c = null;
  406.       }
  407.       else {
  408.         c.setLastContact(System.currentTimeMillis());
  409.       }
  410.     }   
  411.     synchronized void doStop() {
  412.       if (selector != null) {
  413.         selector.wakeup();
  414.         Thread.yield();
  415.       }
  416.       if (acceptChannel != null) {
  417.         try {
  418.           acceptChannel.socket().close();
  419.         } catch (IOException e) {
  420.           LOG.info(getName() + ":Exception in closing listener socket. " + e);
  421.         }
  422.       }
  423.     }
  424.   }
  425.   // Sends responses of RPC back to clients.
  426.   private class Responder extends Thread {
  427.     private Selector writeSelector;
  428.     private int pending;         // connections waiting to register
  429.     
  430.     final static int PURGE_INTERVAL = 900000; // 15mins
  431.     Responder() throws IOException {
  432.       this.setName("IPC Server Responder");
  433.       this.setDaemon(true);
  434.       writeSelector = Selector.open(); // create a selector
  435.       pending = 0;
  436.     }
  437.     @Override
  438.     public void run() {
  439.       LOG.info(getName() + ": starting");
  440.       SERVER.set(Server.this);
  441.       long lastPurgeTime = 0;   // last check for old calls.
  442.       while (running) {
  443.         try {
  444.           waitPending();     // If a channel is being registered, wait.
  445.           writeSelector.select(PURGE_INTERVAL);
  446.           Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
  447.           while (iter.hasNext()) {
  448.             SelectionKey key = iter.next();
  449.             iter.remove();
  450.             try {
  451.               if (key.isValid() && key.isWritable()) {
  452.                   doAsyncWrite(key);
  453.               }
  454.             } catch (IOException e) {
  455.               LOG.info(getName() + ": doAsyncWrite threw exception " + e);
  456.             }
  457.           }
  458.           long now = System.currentTimeMillis();
  459.           if (now < lastPurgeTime + PURGE_INTERVAL) {
  460.             continue;
  461.           }
  462.           lastPurgeTime = now;
  463.           //
  464.           // If there were some calls that have not been sent out for a
  465.           // long time, discard them.
  466.           //
  467.           LOG.debug("Checking for old call responses.");
  468.           ArrayList<Call> calls;
  469.           
  470.           // get the list of channels from list of keys.
  471.           synchronized (writeSelector.keys()) {
  472.             calls = new ArrayList<Call>(writeSelector.keys().size());
  473.             iter = writeSelector.keys().iterator();
  474.             while (iter.hasNext()) {
  475.               SelectionKey key = iter.next();
  476.               Call call = (Call)key.attachment();
  477.               if (call != null && key.channel() == call.connection.channel) { 
  478.                 calls.add(call);
  479.               }
  480.             }
  481.           }
  482.           
  483.           for(Call call : calls) {
  484.             try {
  485.               doPurge(call, now);
  486.             } catch (IOException e) {
  487.               LOG.warn("Error in purging old calls " + e);
  488.             }
  489.           }
  490.         } catch (OutOfMemoryError e) {
  491.           //
  492.           // we can run out of memory if we have too many threads
  493.           // log the event and sleep for a minute and give
  494.           // some thread(s) a chance to finish
  495.           //
  496.           LOG.warn("Out of Memory in server select", e);
  497.           try { Thread.sleep(60000); } catch (Exception ie) {}
  498.         } catch (Exception e) {
  499.           LOG.warn("Exception in Responder " + 
  500.                    StringUtils.stringifyException(e));
  501.         }
  502.       }
  503.       LOG.info("Stopping " + this.getName());
  504.     }
  505.     private void doAsyncWrite(SelectionKey key) throws IOException {
  506.       Call call = (Call)key.attachment();
  507.       if (call == null) {
  508.         return;
  509.       }
  510.       if (key.channel() != call.connection.channel) {
  511.         throw new IOException("doAsyncWrite: bad channel");
  512.       }
  513.       synchronized(call.connection.responseQueue) {
  514.         if (processResponse(call.connection.responseQueue, false)) {
  515.           try {
  516.             key.interestOps(0);
  517.           } catch (CancelledKeyException e) {
  518.             /* The Listener/reader might have closed the socket.
  519.              * We don't explicitly cancel the key, so not sure if this will
  520.              * ever fire.
  521.              * This warning could be removed.
  522.              */
  523.             LOG.warn("Exception while changing ops : " + e);
  524.           }
  525.         }
  526.       }
  527.     }
  528.     //
  529.     // Remove calls that have been pending in the responseQueue 
  530.     // for a long time.
  531.     //
  532.     private void doPurge(Call call, long now) throws IOException {
  533.       LinkedList<Call> responseQueue = call.connection.responseQueue;
  534.       synchronized (responseQueue) {
  535.         Iterator<Call> iter = responseQueue.listIterator(0);
  536.         while (iter.hasNext()) {
  537.           call = iter.next();
  538.           if (now > call.timestamp + PURGE_INTERVAL) {
  539.             closeConnection(call.connection);
  540.             break;
  541.           }
  542.         }
  543.       }
  544.     }
  545.     // Processes one response. Returns true if there are no more pending
  546.     // data for this channel.
  547.     //
  548.     private boolean processResponse(LinkedList<Call> responseQueue,
  549.                                     boolean inHandler) throws IOException {
  550.       boolean error = true;
  551.       boolean done = false;       // there is more data for this channel.
  552.       int numElements = 0;
  553.       Call call = null;
  554.       try {
  555.         synchronized (responseQueue) {
  556.           //
  557.           // If there are no items for this channel, then we are done
  558.           //
  559.           numElements = responseQueue.size();
  560.           if (numElements == 0) {
  561.             error = false;
  562.             return true;              // no more data for this channel.
  563.           }
  564.           //
  565.           // Extract the first call
  566.           //
  567.           call = responseQueue.removeFirst();
  568.           SocketChannel channel = call.connection.channel;
  569.           if (LOG.isDebugEnabled()) {
  570.             LOG.debug(getName() + ": responding to #" + call.id + " from " +
  571.                       call.connection);
  572.           }
  573.           //
  574.           // Send as much data as we can in the non-blocking fashion
  575.           //
  576.           int numBytes = channelWrite(channel, call.response);
  577.           if (numBytes < 0) {
  578.             return true;
  579.           }
  580.           if (!call.response.hasRemaining()) {
  581.             call.connection.decRpcCount();
  582.             if (numElements == 1) {    // last call fully processes.
  583.               done = true;             // no more data for this channel.
  584.             } else {
  585.               done = false;            // more calls pending to be sent.
  586.             }
  587.             if (LOG.isDebugEnabled()) {
  588.               LOG.debug(getName() + ": responding to #" + call.id + " from " +
  589.                         call.connection + " Wrote " + numBytes + " bytes.");
  590.             }
  591.           } else {
  592.             //
  593.             // If we were unable to write the entire response out, then 
  594.             // insert in Selector queue. 
  595.             //
  596.             call.connection.responseQueue.addFirst(call);
  597.             
  598.             if (inHandler) {
  599.               // set the serve time when the response has to be sent later
  600.               call.timestamp = System.currentTimeMillis();
  601.               
  602.               incPending();
  603.               try {
  604.                 // Wakeup the thread blocked on select, only then can the call 
  605.                 // to channel.register() complete.
  606.                 writeSelector.wakeup();
  607.                 channel.register(writeSelector, SelectionKey.OP_WRITE, call);
  608.               } catch (ClosedChannelException e) {
  609.                 //Its ok. channel might be closed else where.
  610.                 done = true;
  611.               } finally {
  612.                 decPending();
  613.               }
  614.             }
  615.             if (LOG.isDebugEnabled()) {
  616.               LOG.debug(getName() + ": responding to #" + call.id + " from " +
  617.                         call.connection + " Wrote partial " + numBytes + 
  618.                         " bytes.");
  619.             }
  620.           }
  621.           error = false;              // everything went off well
  622.         }
  623.       } finally {
  624.         if (error && call != null) {
  625.           LOG.warn(getName()+", call " + call + ": output error");
  626.           done = true;               // error. no more data for this channel.
  627.           closeConnection(call.connection);
  628.         }
  629.       }
  630.       return done;
  631.     }
  632.     //
  633.     // Enqueue a response from the application.
  634.     //
  635.     void doRespond(Call call) throws IOException {
  636.       synchronized (call.connection.responseQueue) {
  637.         call.connection.responseQueue.addLast(call);
  638.         if (call.connection.responseQueue.size() == 1) {
  639.           processResponse(call.connection.responseQueue, true);
  640.         }
  641.       }
  642.     }
  643.     private synchronized void incPending() {   // call waiting to be enqueued.
  644.       pending++;
  645.     }
  646.     private synchronized void decPending() { // call done enqueueing.
  647.       pending--;
  648.       notify();
  649.     }
  650.     private synchronized void waitPending() throws InterruptedException {
  651.       while (pending > 0) {
  652.         wait();
  653.       }
  654.     }
  655.   }
  656.   /** Reads calls from a connection and queues them for handling. */
  657.   private class Connection {
  658.     private boolean versionRead = false; //if initial signature and
  659.                                          //version are read
  660.     private boolean headerRead = false;  //if the connection header that
  661.                                          //follows version is read.
  662.     private SocketChannel channel;
  663.     private ByteBuffer data;
  664.     private ByteBuffer dataLengthBuffer;
  665.     private LinkedList<Call> responseQueue;
  666.     private volatile int rpcCount = 0; // number of outstanding rpcs
  667.     private long lastContact;
  668.     private int dataLength;
  669.     private Socket socket;
  670.     // Cache the remote host & port info so that even if the socket is 
  671.     // disconnected, we can say where it used to connect to.
  672.     private String hostAddress;
  673.     private int remotePort;
  674.     
  675.     ConnectionHeader header = new ConnectionHeader();
  676.     Class<?> protocol;
  677.     
  678.     Subject user = null;
  679.     // Fake 'call' for failed authorization response
  680.     private final int AUTHROIZATION_FAILED_CALLID = -1;
  681.     private final Call authFailedCall = 
  682.       new Call(AUTHROIZATION_FAILED_CALLID, null, null);
  683.     private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
  684.     
  685.     public Connection(SelectionKey key, SocketChannel channel, 
  686.                       long lastContact) {
  687.       this.channel = channel;
  688.       this.lastContact = lastContact;
  689.       this.data = null;
  690.       this.dataLengthBuffer = ByteBuffer.allocate(4);
  691.       this.socket = channel.socket();
  692.       InetAddress addr = socket.getInetAddress();
  693.       if (addr == null) {
  694.         this.hostAddress = "*Unknown*";
  695.       } else {
  696.         this.hostAddress = addr.getHostAddress();
  697.       }
  698.       this.remotePort = socket.getPort();
  699.       this.responseQueue = new LinkedList<Call>();
  700.       if (socketSendBufferSize != 0) {
  701.         try {
  702.           socket.setSendBufferSize(socketSendBufferSize);
  703.         } catch (IOException e) {
  704.           LOG.warn("Connection: unable to set socket send buffer size to " +
  705.                    socketSendBufferSize);
  706.         }
  707.       }
  708.     }   
  709.     @Override
  710.     public String toString() {
  711.       return getHostAddress() + ":" + remotePort; 
  712.     }
  713.     
  714.     public String getHostAddress() {
  715.       return hostAddress;
  716.     }
  717.     public void setLastContact(long lastContact) {
  718.       this.lastContact = lastContact;
  719.     }
  720.     public long getLastContact() {
  721.       return lastContact;
  722.     }
  723.     /* Return true if the connection has no outstanding rpc */
  724.     private boolean isIdle() {
  725.       return rpcCount == 0;
  726.     }
  727.     
  728.     /* Decrement the outstanding RPC count */
  729.     private void decRpcCount() {
  730.       rpcCount--;
  731.     }
  732.     
  733.     /* Increment the outstanding RPC count */
  734.     private void incRpcCount() {
  735.       rpcCount++;
  736.     }
  737.     
  738.     private boolean timedOut(long currentTime) {
  739.       if (isIdle() && currentTime -  lastContact > maxIdleTime)
  740.         return true;
  741.       return false;
  742.     }
  743.     public int readAndProcess() throws IOException, InterruptedException {
  744.       while (true) {
  745.         /* Read at most one RPC. If the header is not read completely yet
  746.          * then iterate until we read first RPC or until there is no data left.
  747.          */    
  748.         int count = -1;
  749.         if (dataLengthBuffer.remaining() > 0) {
  750.           count = channelRead(channel, dataLengthBuffer);       
  751.           if (count < 0 || dataLengthBuffer.remaining() > 0) 
  752.             return count;
  753.         }
  754.       
  755.         if (!versionRead) {
  756.           //Every connection is expected to send the header.
  757.           ByteBuffer versionBuffer = ByteBuffer.allocate(1);
  758.           count = channelRead(channel, versionBuffer);
  759.           if (count <= 0) {
  760.             return count;
  761.           }
  762.           int version = versionBuffer.get(0);
  763.           
  764.           dataLengthBuffer.flip();          
  765.           if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
  766.             //Warning is ok since this is not supposed to happen.
  767.             LOG.warn("Incorrect header or version mismatch from " + 
  768.                      hostAddress + ":" + remotePort +
  769.                      " got version " + version + 
  770.                      " expected version " + CURRENT_VERSION);
  771.             return -1;
  772.           }
  773.           dataLengthBuffer.clear();
  774.           versionRead = true;
  775.           continue;
  776.         }
  777.         
  778.         if (data == null) {
  779.           dataLengthBuffer.flip();
  780.           dataLength = dataLengthBuffer.getInt();
  781.        
  782.           if (dataLength == Client.PING_CALL_ID) {
  783.             dataLengthBuffer.clear();
  784.             return 0;  //ping message
  785.           }
  786.           data = ByteBuffer.allocate(dataLength);
  787.           incRpcCount();  // Increment the rpc count
  788.         }
  789.         
  790.         count = channelRead(channel, data);
  791.         
  792.         if (data.remaining() == 0) {
  793.           dataLengthBuffer.clear();
  794.           data.flip();
  795.           if (headerRead) {
  796.             processData();
  797.             data = null;
  798.             return count;
  799.           } else {
  800.             processHeader();
  801.             headerRead = true;
  802.             data = null;
  803.             
  804.             // Authorize the connection
  805.             try {
  806.               authorize(user, header);
  807.               
  808.               if (LOG.isDebugEnabled()) {
  809.                 LOG.debug("Successfully authorized " + header);
  810.               }
  811.             } catch (AuthorizationException ae) {
  812.               authFailedCall.connection = this;
  813.               setupResponse(authFailedResponse, authFailedCall, 
  814.                             Status.FATAL, null, 
  815.                             ae.getClass().getName(), ae.getMessage());
  816.               responder.doRespond(authFailedCall);
  817.               
  818.               // Close this connection
  819.               return -1;
  820.             }
  821.             continue;
  822.           }
  823.         } 
  824.         return count;
  825.       }
  826.     }
  827.     /// Reads the connection header following version
  828.     private void processHeader() throws IOException {
  829.       DataInputStream in =
  830.         new DataInputStream(new ByteArrayInputStream(data.array()));
  831.       header.readFields(in);
  832.       try {
  833.         String protocolClassName = header.getProtocol();
  834.         if (protocolClassName != null) {
  835.           protocol = getProtocolClass(header.getProtocol(), conf);
  836.         }
  837.       } catch (ClassNotFoundException cnfe) {
  838.         throw new IOException("Unknown protocol: " + header.getProtocol());
  839.       }
  840.       
  841.       // TODO: Get the user name from the GSS API for Kerberbos-based security
  842.       // Create the user subject
  843.       user = SecurityUtil.getSubject(header.getUgi());
  844.     }
  845.     
  846.     private void processData() throws  IOException, InterruptedException {
  847.       DataInputStream dis =
  848.         new DataInputStream(new ByteArrayInputStream(data.array()));
  849.       int id = dis.readInt();                    // try to read an id
  850.         
  851.       if (LOG.isDebugEnabled())
  852.         LOG.debug(" got #" + id);
  853.       Writable param = ReflectionUtils.newInstance(paramClass, conf);           // read param
  854.       param.readFields(dis);        
  855.         
  856.       Call call = new Call(id, param, this);
  857.       callQueue.put(call);              // queue the call; maybe blocked here
  858.     }
  859.     private synchronized void close() throws IOException {
  860.       data = null;
  861.       dataLengthBuffer = null;
  862.       if (!channel.isOpen())
  863.         return;
  864.       try {socket.shutdownOutput();} catch(Exception e) {}
  865.       if (channel.isOpen()) {
  866.         try {channel.close();} catch(Exception e) {}
  867.       }
  868.       try {socket.close();} catch(Exception e) {}
  869.     }
  870.   }
  871.   /** Handles queued calls . */
  872.   private class Handler extends Thread {
  873.     public Handler(int instanceNumber) {
  874.       this.setDaemon(true);
  875.       this.setName("IPC Server handler "+ instanceNumber + " on " + port);
  876.     }
  877.     @Override
  878.     public void run() {
  879.       LOG.info(getName() + ": starting");
  880.       SERVER.set(Server.this);
  881.       ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
  882.       while (running) {
  883.         try {
  884.           final Call call = callQueue.take(); // pop the queue; maybe blocked here
  885.           if (LOG.isDebugEnabled())
  886.             LOG.debug(getName() + ": has #" + call.id + " from " +
  887.                       call.connection);
  888.           
  889.           String errorClass = null;
  890.           String error = null;
  891.           Writable value = null;
  892.           CurCall.set(call);
  893.           try {
  894.             // Make the call as the user via Subject.doAs, thus associating
  895.             // the call with the Subject
  896.             value = 
  897.               Subject.doAs(call.connection.user, 
  898.                            new PrivilegedExceptionAction<Writable>() {
  899.                               @Override
  900.                               public Writable run() throws Exception {
  901.                                 // make the call
  902.                                 return call(call.connection.protocol, 
  903.                                             call.param, call.timestamp);
  904.                               }
  905.                            }
  906.                           );
  907.               
  908.           } catch (PrivilegedActionException pae) {
  909.             Exception e = pae.getException();
  910.             LOG.info(getName()+", call "+call+": error: " + e, e);
  911.             errorClass = e.getClass().getName();
  912.             error = StringUtils.stringifyException(e);
  913.           } catch (Throwable e) {
  914.             LOG.info(getName()+", call "+call+": error: " + e, e);
  915.             errorClass = e.getClass().getName();
  916.             error = StringUtils.stringifyException(e);
  917.           }
  918.           CurCall.set(null);
  919.           setupResponse(buf, call, 
  920.                         (error == null) ? Status.SUCCESS : Status.ERROR, 
  921.                         value, errorClass, error);
  922.           responder.doRespond(call);
  923.         } catch (InterruptedException e) {
  924.           if (running) {                          // unexpected -- log it
  925.             LOG.info(getName() + " caught: " +
  926.                      StringUtils.stringifyException(e));
  927.           }
  928.         } catch (Exception e) {
  929.           LOG.info(getName() + " caught: " +
  930.                    StringUtils.stringifyException(e));
  931.         }
  932.       }
  933.       LOG.info(getName() + ": exiting");
  934.     }
  935.   }
  936.   
  937.   protected Server(String bindAddress, int port,
  938.                   Class<? extends Writable> paramClass, int handlerCount, 
  939.                   Configuration conf)
  940.     throws IOException 
  941.   {
  942.     this(bindAddress, port, paramClass, handlerCount,  conf, Integer.toString(port));
  943.   }
  944.   /** Constructs a server listening on the named port and address.  Parameters passed must
  945.    * be of the named class.  The <code>handlerCount</handlerCount> determines
  946.    * the number of handler threads that will be used to process calls.
  947.    * 
  948.    */
  949.   protected Server(String bindAddress, int port, 
  950.                   Class<? extends Writable> paramClass, int handlerCount, 
  951.                   Configuration conf, String serverName) 
  952.     throws IOException {
  953.     this.bindAddress = bindAddress;
  954.     this.conf = conf;
  955.     this.port = port;
  956.     this.paramClass = paramClass;
  957.     this.handlerCount = handlerCount;
  958.     this.socketSendBufferSize = 0;
  959.     this.maxQueueSize = handlerCount * MAX_QUEUE_SIZE_PER_HANDLER;
  960.     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
  961.     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
  962.     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
  963.     this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
  964.     
  965.     // Start the listener here and let it bind to the port
  966.     listener = new Listener();
  967.     this.port = listener.getAddress().getPort();    
  968.     this.rpcMetrics = new RpcMetrics(serverName,
  969.                           Integer.toString(this.port), this);
  970.     this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);
  971.     // Create the responder here
  972.     responder = new Responder();
  973.   }
  974.   private void closeConnection(Connection connection) {
  975.     synchronized (connectionList) {
  976.       if (connectionList.remove(connection))
  977.         numConnections--;
  978.     }
  979.     try {
  980.       connection.close();
  981.     } catch (IOException e) {
  982.     }
  983.   }
  984.   
  985.   /**
  986.    * Setup response for the IPC Call.
  987.    * 
  988.    * @param response buffer to serialize the response into
  989.    * @param call {@link Call} to which we are setting up the response
  990.    * @param status {@link Status} of the IPC call
  991.    * @param rv return value for the IPC Call, if the call was successful
  992.    * @param errorClass error class, if the the call failed
  993.    * @param error error message, if the call failed
  994.    * @throws IOException
  995.    */
  996.   private void setupResponse(ByteArrayOutputStream response, 
  997.                              Call call, Status status, 
  998.                              Writable rv, String errorClass, String error) 
  999.   throws IOException {
  1000.     response.reset();
  1001.     DataOutputStream out = new DataOutputStream(response);
  1002.     out.writeInt(call.id);                // write call id
  1003.     out.writeInt(status.state);           // write status
  1004.     if (status == Status.SUCCESS) {
  1005.       rv.write(out);
  1006.     } else {
  1007.       WritableUtils.writeString(out, errorClass);
  1008.       WritableUtils.writeString(out, error);
  1009.     }
  1010.     call.setResponse(ByteBuffer.wrap(response.toByteArray()));
  1011.   }
  1012.   
  1013.   Configuration getConf() {
  1014.     return conf;
  1015.   }
  1016.   
  1017.   /** Sets the socket buffer size used for responding to RPCs */
  1018.   public void setSocketSendBufSize(int size) { this.socketSendBufferSize = size; }
  1019.   /** Starts the service.  Must be called before any calls will be handled. */
  1020.   public synchronized void start() throws IOException {
  1021.     responder.start();
  1022.     listener.start();
  1023.     handlers = new Handler[handlerCount];
  1024.     
  1025.     for (int i = 0; i < handlerCount; i++) {
  1026.       handlers[i] = new Handler(i);
  1027.       handlers[i].start();
  1028.     }
  1029.   }
  1030.   /** Stops the service.  No new calls will be handled after this is called. */
  1031.   public synchronized void stop() {
  1032.     LOG.info("Stopping server on " + port);
  1033.     running = false;
  1034.     if (handlers != null) {
  1035.       for (int i = 0; i < handlerCount; i++) {
  1036.         if (handlers[i] != null) {
  1037.           handlers[i].interrupt();
  1038.         }
  1039.       }
  1040.     }
  1041.     listener.interrupt();
  1042.     listener.doStop();
  1043.     responder.interrupt();
  1044.     notifyAll();
  1045.     if (this.rpcMetrics != null) {
  1046.       this.rpcMetrics.shutdown();
  1047.     }
  1048.   }
  1049.   /** Wait for the server to be stopped.
  1050.    * Does not wait for all subthreads to finish.
  1051.    *  See {@link #stop()}.
  1052.    */
  1053.   public synchronized void join() throws InterruptedException {
  1054.     while (running) {
  1055.       wait();
  1056.     }
  1057.   }
  1058.   /**
  1059.    * Return the socket (ip+port) on which the RPC server is listening to.
  1060.    * @return the socket (ip+port) on which the RPC server is listening to.
  1061.    */
  1062.   public synchronized InetSocketAddress getListenerAddress() {
  1063.     return listener.getAddress();
  1064.   }
  1065.   
  1066.   /** 
  1067.    * Called for each call. 
  1068.    * @deprecated Use {@link #call(Class, Writable, long)} instead
  1069.    */
  1070.   @Deprecated
  1071.   public Writable call(Writable param, long receiveTime) throws IOException {
  1072.     return call(null, param, receiveTime);
  1073.   }
  1074.   
  1075.   /** Called for each call. */
  1076.   public abstract Writable call(Class<?> protocol,
  1077.                                Writable param, long receiveTime)
  1078.   throws IOException;
  1079.   
  1080.   /**
  1081.    * Authorize the incoming client connection.
  1082.    * 
  1083.    * @param user client user
  1084.    * @param connection incoming connection
  1085.    * @throws AuthorizationException when the client isn't authorized to talk the protocol
  1086.    */
  1087.   public void authorize(Subject user, ConnectionHeader connection) 
  1088.   throws AuthorizationException {}
  1089.   
  1090.   /**
  1091.    * The number of open RPC conections
  1092.    * @return the number of open rpc connections
  1093.    */
  1094.   public int getNumOpenConnections() {
  1095.     return numConnections;
  1096.   }
  1097.   
  1098.   /**
  1099.    * The number of rpc calls in the queue.
  1100.    * @return The number of rpc calls in the queue.
  1101.    */
  1102.   public int getCallQueueLen() {
  1103.     return callQueue.size();
  1104.   }
  1105.   
  1106.   
  1107.   /**
  1108.    * When the read or write buffer size is larger than this limit, i/o will be 
  1109.    * done in chunks of this size. Most RPC requests and responses would be
  1110.    * be smaller.
  1111.    */
  1112.   private static int NIO_BUFFER_LIMIT = 8*1024; //should not be more than 64KB.
  1113.   
  1114.   /**
  1115.    * This is a wrapper around {@link WritableByteChannel#write(ByteBuffer)}.
  1116.    * If the amount of data is large, it writes to channel in smaller chunks. 
  1117.    * This is to avoid jdk from creating many direct buffers as the size of 
  1118.    * buffer increases. This also minimizes extra copies in NIO layer
  1119.    * as a result of multiple write operations required to write a large 
  1120.    * buffer.  
  1121.    *
  1122.    * @see WritableByteChannel#write(ByteBuffer)
  1123.    */
  1124.   private static int channelWrite(WritableByteChannel channel, 
  1125.                                   ByteBuffer buffer) throws IOException {
  1126.     
  1127.     return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
  1128.            channel.write(buffer) : channelIO(null, channel, buffer);
  1129.   }
  1130.   
  1131.   
  1132.   /**
  1133.    * This is a wrapper around {@link ReadableByteChannel#read(ByteBuffer)}.
  1134.    * If the amount of data is large, it writes to channel in smaller chunks. 
  1135.    * This is to avoid jdk from creating many direct buffers as the size of 
  1136.    * ByteBuffer increases. There should not be any performance degredation.
  1137.    * 
  1138.    * @see ReadableByteChannel#read(ByteBuffer)
  1139.    */
  1140.   private static int channelRead(ReadableByteChannel channel, 
  1141.                                  ByteBuffer buffer) throws IOException {
  1142.     
  1143.     return (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
  1144.            channel.read(buffer) : channelIO(channel, null, buffer);
  1145.   }
  1146.   
  1147.   /**
  1148.    * Helper for {@link #channelRead(ReadableByteChannel, ByteBuffer)}
  1149.    * and {@link #channelWrite(WritableByteChannel, ByteBuffer)}. Only
  1150.    * one of readCh or writeCh should be non-null.
  1151.    * 
  1152.    * @see #channelRead(ReadableByteChannel, ByteBuffer)
  1153.    * @see #channelWrite(WritableByteChannel, ByteBuffer)
  1154.    */
  1155.   private static int channelIO(ReadableByteChannel readCh, 
  1156.                                WritableByteChannel writeCh,
  1157.                                ByteBuffer buf) throws IOException {
  1158.     
  1159.     int originalLimit = buf.limit();
  1160.     int initialRemaining = buf.remaining();
  1161.     int ret = 0;
  1162.     
  1163.     while (buf.remaining() > 0) {
  1164.       try {
  1165.         int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
  1166.         buf.limit(buf.position() + ioSize);
  1167.         
  1168.         ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf); 
  1169.         
  1170.         if (ret < ioSize) {
  1171.           break;
  1172.         }
  1173.       } finally {
  1174.         buf.limit(originalLimit);        
  1175.       }
  1176.     }
  1177.     int nBytes = initialRemaining - buf.remaining(); 
  1178.     return (nBytes > 0) ? nBytes : ret;
  1179.   }      
  1180. }