TestIPCServerResponder.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.ipc;
  19. import java.io.IOException;
  20. import java.net.InetSocketAddress;
  21. import java.util.Random;
  22. import junit.framework.TestCase;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.io.BytesWritable;
  27. import org.apache.hadoop.io.Writable;
  28. import org.apache.hadoop.net.NetUtils;
  29. /**
  30.  * This test provokes partial writes in the server, which is 
  31.  * serving multiple clients.
  32.  */
  33. public class TestIPCServerResponder extends TestCase {
  34.   public static final Log LOG = 
  35.             LogFactory.getLog(TestIPCServerResponder.class);
  36.   private static Configuration conf = new Configuration();
  37.   public TestIPCServerResponder(final String name) {
  38.     super(name);
  39.   }
  40.   private static final Random RANDOM = new Random();
  41.   private static final String ADDRESS = "0.0.0.0";
  42.   private static final int BYTE_COUNT = 1024;
  43.   private static final byte[] BYTES = new byte[BYTE_COUNT];
  44.   static {
  45.     for (int i = 0; i < BYTE_COUNT; i++)
  46.       BYTES[i] = (byte) ('a' + (i % 26));
  47.   }
  48.   private static class TestServer extends Server {
  49.     private boolean sleep;
  50.     public TestServer(final int handlerCount, final boolean sleep) 
  51.                                               throws IOException {
  52.       super(ADDRESS, 0, BytesWritable.class, handlerCount, conf);
  53.       // Set the buffer size to half of the maximum parameter/result size 
  54.       // to force the socket to block
  55.       this.setSocketSendBufSize(BYTE_COUNT / 2);
  56.       this.sleep = sleep;
  57.     }
  58.     @Override
  59.     public Writable call(Class<?> protocol, Writable param, long receiveTime)
  60.         throws IOException {
  61.       if (sleep) {
  62.         try {
  63.           Thread.sleep(RANDOM.nextInt(20)); // sleep a bit
  64.         } catch (InterruptedException e) {}
  65.       }
  66.       return param;
  67.     }
  68.   }
  69.   private static class Caller extends Thread {
  70.     private Client client;
  71.     private int count;
  72.     private InetSocketAddress address;
  73.     private boolean failed;
  74.     public Caller(final Client client, final InetSocketAddress address, 
  75.                                        final int count) {
  76.       this.client = client;
  77.       this.address = address;
  78.       this.count = count;
  79.     }
  80.     @Override
  81.     public void run() {
  82.       for (int i = 0; i < count; i++) {
  83.         try {
  84.           int byteSize = RANDOM.nextInt(BYTE_COUNT);
  85.           byte[] bytes = new byte[byteSize];
  86.           System.arraycopy(BYTES, 0, bytes, 0, byteSize);
  87.           Writable param = new BytesWritable(bytes);
  88.           Writable value = client.call(param, address);
  89.           Thread.sleep(RANDOM.nextInt(20));
  90.         } catch (Exception e) {
  91.           LOG.fatal("Caught: " + e);
  92.           failed = true;
  93.         }
  94.       }
  95.     }
  96.   }
  97.   public void testServerResponder() throws Exception {
  98.     testServerResponder(10, true, 1, 10, 200);
  99.   }
  100.   public void testServerResponder(final int handlerCount, 
  101.                                   final boolean handlerSleep, 
  102.                                   final int clientCount,
  103.                                   final int callerCount,
  104.                                   final int callCount) throws Exception {
  105.     Server server = new TestServer(handlerCount, handlerSleep);
  106.     server.start();
  107.     InetSocketAddress address = NetUtils.getConnectAddress(server);
  108.     Client[] clients = new Client[clientCount];
  109.     for (int i = 0; i < clientCount; i++) {
  110.       clients[i] = new Client(BytesWritable.class, conf);
  111.     }
  112.     Caller[] callers = new Caller[callerCount];
  113.     for (int i = 0; i < callerCount; i++) {
  114.       callers[i] = new Caller(clients[i % clientCount], address, callCount);
  115.       callers[i].start();
  116.     }
  117.     for (int i = 0; i < callerCount; i++) {
  118.       callers[i].join();
  119.       assertFalse(callers[i].failed);
  120.     }
  121.     for (int i = 0; i < clientCount; i++) {
  122.       clients[i].stop();
  123.     }
  124.     server.stop();
  125.   }
  126. }