TestRPC.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.ipc;
  19. import java.io.IOException;
  20. import java.net.ConnectException;
  21. import java.net.InetSocketAddress;
  22. import java.lang.reflect.Method;
  23. import junit.framework.TestCase;
  24. import java.util.Arrays;
  25. import org.apache.commons.logging.*;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.io.UTF8;
  28. import org.apache.hadoop.io.Writable;
  29. import org.apache.hadoop.net.NetUtils;
  30. import org.apache.hadoop.security.SecurityUtil;
  31. import org.apache.hadoop.security.authorize.AuthorizationException;
  32. import org.apache.hadoop.security.authorize.ConfiguredPolicy;
  33. import org.apache.hadoop.security.authorize.PolicyProvider;
  34. import org.apache.hadoop.security.authorize.Service;
  35. import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
  36. /** Unit tests for RPC. */
  37. public class TestRPC extends TestCase {
  38.   private static final String ADDRESS = "0.0.0.0";
  39.   public static final Log LOG =
  40.     LogFactory.getLog(TestRPC.class);
  41.   
  42.   private static Configuration conf = new Configuration();
  43.   int datasize = 1024*100;
  44.   int numThreads = 50;
  45.   public TestRPC(String name) { super(name); }
  46.   public interface TestProtocol extends VersionedProtocol {
  47.     public static final long versionID = 1L;
  48.     
  49.     void ping() throws IOException;
  50.     void slowPing(boolean shouldSlow) throws IOException;
  51.     String echo(String value) throws IOException;
  52.     String[] echo(String[] value) throws IOException;
  53.     Writable echo(Writable value) throws IOException;
  54.     int add(int v1, int v2) throws IOException;
  55.     int add(int[] values) throws IOException;
  56.     int error() throws IOException;
  57.     void testServerGet() throws IOException;
  58.     int[] exchange(int[] values) throws IOException;
  59.   }
  60.   public class TestImpl implements TestProtocol {
  61.     int fastPingCounter = 0;
  62.     
  63.     public long getProtocolVersion(String protocol, long clientVersion) {
  64.       return TestProtocol.versionID;
  65.     }
  66.     
  67.     public void ping() {}
  68.     public synchronized void slowPing(boolean shouldSlow) {
  69.       if (shouldSlow) {
  70.         while (fastPingCounter < 2) {
  71.           try {
  72.           wait();  // slow response until two fast pings happened
  73.           } catch (InterruptedException ignored) {}
  74.         }
  75.         fastPingCounter -= 2;
  76.       } else {
  77.         fastPingCounter++;
  78.         notify();
  79.       }
  80.     }
  81.     
  82.     public String echo(String value) throws IOException { return value; }
  83.     public String[] echo(String[] values) throws IOException { return values; }
  84.     public Writable echo(Writable writable) {
  85.       return writable;
  86.     }
  87.     public int add(int v1, int v2) {
  88.       return v1 + v2;
  89.     }
  90.     public int add(int[] values) {
  91.       int sum = 0;
  92.       for (int i = 0; i < values.length; i++) {
  93.         sum += values[i];
  94.       }
  95.       return sum;
  96.     }
  97.     public int error() throws IOException {
  98.       throw new IOException("bobo");
  99.     }
  100.     public void testServerGet() throws IOException {
  101.       if (!(Server.get() instanceof RPC.Server)) {
  102.         throw new IOException("Server.get() failed");
  103.       }
  104.     }
  105.     public int[] exchange(int[] values) {
  106.       for (int i = 0; i < values.length; i++) {
  107.         values[i] = i;
  108.       }
  109.       return values;
  110.     }
  111.   }
  112.   //
  113.   // an object that does a bunch of transactions
  114.   //
  115.   static class Transactions implements Runnable {
  116.     int datasize;
  117.     TestProtocol proxy;
  118.     Transactions(TestProtocol proxy, int datasize) {
  119.       this.proxy = proxy;
  120.       this.datasize = datasize;
  121.     }
  122.     // do two RPC that transfers data.
  123.     public void run() {
  124.       int[] indata = new int[datasize];
  125.       int[] outdata = null;
  126.       int val = 0;
  127.       try {
  128.         outdata = proxy.exchange(indata);
  129.         val = proxy.add(1,2);
  130.       } catch (IOException e) {
  131.         assertTrue("Exception from RPC exchange() "  + e, false);
  132.       }
  133.       assertEquals(indata.length, outdata.length);
  134.       assertEquals(val, 3);
  135.       for (int i = 0; i < outdata.length; i++) {
  136.         assertEquals(outdata[i], i);
  137.       }
  138.     }
  139.   }
  140.   //
  141.   // A class that does an RPC but does not read its response.
  142.   //
  143.   static class SlowRPC implements Runnable {
  144.     private TestProtocol proxy;
  145.     private volatile boolean done;
  146.    
  147.     SlowRPC(TestProtocol proxy) {
  148.       this.proxy = proxy;
  149.       done = false;
  150.     }
  151.     boolean isDone() {
  152.       return done;
  153.     }
  154.     public void run() {
  155.       try {
  156.         proxy.slowPing(true);   // this would hang until two fast pings happened
  157.         done = true;
  158.       } catch (IOException e) {
  159.         assertTrue("SlowRPC ping exception " + e, false);
  160.       }
  161.     }
  162.   }
  163.   public void testSlowRpc() throws Exception {
  164.     System.out.println("Testing Slow RPC");
  165.     // create a server with two handlers
  166.     Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 2, false, conf);
  167.     TestProtocol proxy = null;
  168.     
  169.     try {
  170.     server.start();
  171.     InetSocketAddress addr = NetUtils.getConnectAddress(server);
  172.     // create a client
  173.     proxy = (TestProtocol)RPC.getProxy(
  174.         TestProtocol.class, TestProtocol.versionID, addr, conf);
  175.     SlowRPC slowrpc = new SlowRPC(proxy);
  176.     Thread thread = new Thread(slowrpc, "SlowRPC");
  177.     thread.start(); // send a slow RPC, which won't return until two fast pings
  178.     assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone());
  179.     proxy.slowPing(false); // first fast ping
  180.     
  181.     // verify that the first RPC is still stuck
  182.     assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone());
  183.     proxy.slowPing(false); // second fast ping
  184.     
  185.     // Now the slow ping should be able to be executed
  186.     while (!slowrpc.isDone()) {
  187.       System.out.println("Waiting for slow RPC to get done.");
  188.       try {
  189.         Thread.sleep(1000);
  190.       } catch (InterruptedException e) {}
  191.     }
  192.     } finally {
  193.       server.stop();
  194.       if (proxy != null) {
  195.         RPC.stopProxy(proxy);
  196.       }
  197.       System.out.println("Down slow rpc testing");
  198.     }
  199.   }
  200.   public void testCalls() throws Exception {
  201.     Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
  202.     TestProtocol proxy = null;
  203.     try {
  204.     server.start();
  205.     InetSocketAddress addr = NetUtils.getConnectAddress(server);
  206.     proxy = (TestProtocol)RPC.getProxy(
  207.         TestProtocol.class, TestProtocol.versionID, addr, conf);
  208.       
  209.     proxy.ping();
  210.     String stringResult = proxy.echo("foo");
  211.     assertEquals(stringResult, "foo");
  212.     stringResult = proxy.echo((String)null);
  213.     assertEquals(stringResult, null);
  214.     String[] stringResults = proxy.echo(new String[]{"foo","bar"});
  215.     assertTrue(Arrays.equals(stringResults, new String[]{"foo","bar"}));
  216.     stringResults = proxy.echo((String[])null);
  217.     assertTrue(Arrays.equals(stringResults, null));
  218.     UTF8 utf8Result = (UTF8)proxy.echo(new UTF8("hello world"));
  219.     assertEquals(utf8Result, new UTF8("hello world"));
  220.     utf8Result = (UTF8)proxy.echo((UTF8)null);
  221.     assertEquals(utf8Result, null);
  222.     int intResult = proxy.add(1, 2);
  223.     assertEquals(intResult, 3);
  224.     intResult = proxy.add(new int[] {1, 2});
  225.     assertEquals(intResult, 3);
  226.     boolean caught = false;
  227.     try {
  228.       proxy.error();
  229.     } catch (IOException e) {
  230.       LOG.debug("Caught " + e);
  231.       caught = true;
  232.     }
  233.     assertTrue(caught);
  234.     proxy.testServerGet();
  235.     // create multiple threads and make them do large data transfers
  236.     System.out.println("Starting multi-threaded RPC test...");
  237.     server.setSocketSendBufSize(1024);
  238.     Thread threadId[] = new Thread[numThreads];
  239.     for (int i = 0; i < numThreads; i++) {
  240.       Transactions trans = new Transactions(proxy, datasize);
  241.       threadId[i] = new Thread(trans, "TransactionThread-" + i);
  242.       threadId[i].start();
  243.     }
  244.     // wait for all transactions to get over
  245.     System.out.println("Waiting for all threads to finish RPCs...");
  246.     for (int i = 0; i < numThreads; i++) {
  247.       try {
  248.         threadId[i].join();
  249.       } catch (InterruptedException e) {
  250.         i--;      // retry
  251.       }
  252.     }
  253.     // try some multi-calls
  254.     Method echo =
  255.       TestProtocol.class.getMethod("echo", new Class[] { String.class });
  256.     String[] strings = (String[])RPC.call(echo, new String[][]{{"a"},{"b"}},
  257.                                           new InetSocketAddress[] {addr, addr}, conf);
  258.     assertTrue(Arrays.equals(strings, new String[]{"a","b"}));
  259.     Method ping = TestProtocol.class.getMethod("ping", new Class[] {});
  260.     Object[] voids = (Object[])RPC.call(ping, new Object[][]{{},{}},
  261.                                         new InetSocketAddress[] {addr, addr}, conf);
  262.     assertEquals(voids, null);
  263.     } finally {
  264.       server.stop();
  265.       if(proxy!=null) RPC.stopProxy(proxy);
  266.     }
  267.   }
  268.   
  269.   public void testStandaloneClient() throws IOException {
  270.     try {
  271.       RPC.waitForProxy(TestProtocol.class,
  272.         TestProtocol.versionID, new InetSocketAddress(ADDRESS, 20), conf, 15000L);
  273.       fail("We should not have reached here");
  274.     } catch (ConnectException ioe) {
  275.       //this is what we expected
  276.     }
  277.   }
  278.   
  279.   private static final String ACL_CONFIG = "test.protocol.acl";
  280.   
  281.   private static class TestPolicyProvider extends PolicyProvider {
  282.     @Override
  283.     public Service[] getServices() {
  284.       return new Service[] { new Service(ACL_CONFIG, TestProtocol.class) };
  285.     }
  286.     
  287.   }
  288.   
  289.   private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
  290.     SecurityUtil.setPolicy(new ConfiguredPolicy(conf, new TestPolicyProvider()));
  291.     
  292.     Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 5, true, conf);
  293.     TestProtocol proxy = null;
  294.     server.start();
  295.     InetSocketAddress addr = NetUtils.getConnectAddress(server);
  296.     
  297.     try {
  298.       proxy = (TestProtocol)RPC.getProxy(
  299.           TestProtocol.class, TestProtocol.versionID, addr, conf);
  300.       proxy.ping();
  301.       if (expectFailure) {
  302.         fail("Expect RPC.getProxy to fail with AuthorizationException!");
  303.       }
  304.     } catch (RemoteException e) {
  305.       if (expectFailure) {
  306.         assertTrue(e.unwrapRemoteException() instanceof AuthorizationException);
  307.       } else {
  308.         throw e;
  309.       }
  310.     } finally {
  311.       server.stop();
  312.       if (proxy != null) {
  313.         RPC.stopProxy(proxy);
  314.       }
  315.     }
  316.   }
  317.   
  318.   public void testAuthorization() throws Exception {
  319.     Configuration conf = new Configuration();
  320.     conf.setBoolean(
  321.         ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, true);
  322.     
  323.     // Expect to succeed
  324.     conf.set(ACL_CONFIG, "*");
  325.     doRPCs(conf, false);
  326.     
  327.     // Reset authorization to expect failure
  328.     conf.set(ACL_CONFIG, "invalid invalid");
  329.     doRPCs(conf, true);
  330.   }
  331.   
  332.   public static void main(String[] args) throws Exception {
  333.     new TestRPC("test").testCalls();
  334.   }
  335. }