TestMRServerPorts.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import junit.framework.TestCase;
  21. import org.apache.hadoop.hdfs.TestHDFSServerPorts;
  22. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  23. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  24. import org.apache.hadoop.fs.FileSystem;
  25. /**
  26.  * This test checks correctness of port usage by mapreduce components:
  27.  * JobTracker, and TaskTracker.
  28.  * 
  29.  * The correct behavior is:<br> 
  30.  * - when a specific port is provided the server must either start on that port 
  31.  * or fail by throwing {@link java.net.BindException}.<br>
  32.  * - if the port = 0 (ephemeral) then the server should choose 
  33.  * a free port and start on it.
  34.  */
  35. public class TestMRServerPorts extends TestCase {
  36.   TestHDFSServerPorts hdfs = new TestHDFSServerPorts();
  37.   // Runs the JT in a separate thread
  38.   private static class JTRunner extends Thread {
  39.     JobTracker jt;
  40.     void setJobTracker(JobTracker jt) {
  41.       this.jt = jt;
  42.     }
  43.     public void run() {
  44.       if (jt != null) {
  45.         try {
  46.           jt.offerService();
  47.         } catch (Exception ioe) {}
  48.       }
  49.     }
  50.   }
  51.   /**
  52.    * Check whether the JobTracker can be started.
  53.    */
  54.   private JobTracker startJobTracker(JobConf conf, JTRunner runner) 
  55.   throws IOException {
  56.     conf.set("mapred.job.tracker", "localhost:0");
  57.     conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
  58.     JobTracker jt = null;
  59.     try {
  60.       jt = JobTracker.startTracker(conf);
  61.       runner.setJobTracker(jt);
  62.       runner.start();
  63.       conf.set("mapred.job.tracker", "localhost:" + jt.getTrackerPort());
  64.       conf.set("mapred.job.tracker.http.address", 
  65.                             "0.0.0.0:" + jt.getInfoPort());
  66.     } catch(InterruptedException e) {
  67.       throw new IOException(e.getLocalizedMessage());
  68.     }
  69.     return jt;
  70.   }
  71.   /**
  72.    * Check whether the JobTracker can be started.
  73.    */
  74.   private boolean canStartJobTracker(JobConf conf) 
  75.   throws IOException, InterruptedException {
  76.     JobTracker jt = null;
  77.     try {
  78.       jt = JobTracker.startTracker(conf);
  79.     } catch(IOException e) {
  80.       if (e instanceof java.net.BindException)
  81.         return false;
  82.       throw e;
  83.     }
  84.     jt.fs.close();
  85.     jt.stopTracker();
  86.     return true;
  87.   }
  88.   /**
  89.    * Check whether the TaskTracker can be started.
  90.    */
  91.   private boolean canStartTaskTracker(JobConf conf) 
  92.   throws IOException, InterruptedException {
  93.     TaskTracker tt = null;
  94.     try {
  95.       tt = new TaskTracker(conf);
  96.     } catch(IOException e) {
  97.       if (e instanceof java.net.BindException)
  98.         return false;
  99.       throw e;
  100.     }
  101.     tt.shutdown();
  102.     return true;
  103.   }
  104.   /**
  105.    * Verify JobTracker port usage.
  106.    */
  107.   public void testJobTrackerPorts() throws Exception {
  108.     NameNode nn = null;
  109.     DataNode dn = null;
  110.     try {
  111.       nn = hdfs.startNameNode();
  112.       dn = hdfs.startDataNode(1, hdfs.getConfig());
  113.       // start job tracker on the same port as name-node
  114.       JobConf conf2 = new JobConf(hdfs.getConfig());
  115.       conf2.set("mapred.job.tracker",
  116.                 FileSystem.getDefaultUri(hdfs.getConfig()).toString());
  117.       conf2.set("mapred.job.tracker.http.address",
  118.         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
  119.       boolean started = canStartJobTracker(conf2);
  120.       assertFalse(started); // should fail
  121.       // bind http server to the same port as name-node
  122.       conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
  123.       conf2.set("mapred.job.tracker.http.address",
  124.         hdfs.getConfig().get("dfs.http.address"));
  125.       started = canStartJobTracker(conf2);
  126.       assertFalse(started); // should fail again
  127.       // both ports are different from the name-node ones
  128.       conf2.set("mapred.job.tracker", TestHDFSServerPorts.NAME_NODE_HOST + 0);
  129.       conf2.set("mapred.job.tracker.http.address",
  130.         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
  131.       started = canStartJobTracker(conf2);
  132.       assertTrue(started); // should start now
  133.     } finally {
  134.       hdfs.stopDataNode(dn);
  135.       hdfs.stopNameNode(nn);
  136.     }
  137.   }
  138.   /**
  139.    * Verify JobTracker port usage.
  140.    */
  141.   public void testTaskTrackerPorts() throws Exception {
  142.     NameNode nn = null;
  143.     DataNode dn = null;
  144.     JobTracker jt = null;
  145.     JTRunner runner = null;
  146.     try {
  147.       nn = hdfs.startNameNode();
  148.       dn = hdfs.startDataNode(2, hdfs.getConfig());
  149.       JobConf conf2 = new JobConf(hdfs.getConfig());
  150.       runner = new JTRunner();
  151.       jt = startJobTracker(conf2, runner);
  152.       // start job tracker on the same port as name-node
  153.       conf2.set("mapred.task.tracker.report.address",
  154.                 FileSystem.getDefaultUri(hdfs.getConfig()).toString());
  155.       conf2.set("mapred.task.tracker.http.address",
  156.         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
  157.       boolean started = canStartTaskTracker(conf2);
  158.       assertFalse(started); // should fail
  159.       // bind http server to the same port as name-node
  160.       conf2.set("mapred.task.tracker.report.address",
  161.         TestHDFSServerPorts.NAME_NODE_HOST + 0);
  162.       conf2.set("mapred.task.tracker.http.address",
  163.         hdfs.getConfig().get("dfs.http.address"));
  164.       started = canStartTaskTracker(conf2);
  165.       assertFalse(started); // should fail again
  166.       // both ports are different from the name-node ones
  167.       conf2.set("mapred.task.tracker.report.address",
  168.         TestHDFSServerPorts.NAME_NODE_HOST + 0);
  169.       conf2.set("mapred.task.tracker.http.address",
  170.         TestHDFSServerPorts.NAME_NODE_HTTP_HOST + 0);
  171.       started = canStartTaskTracker(conf2);
  172.       assertTrue(started); // should start now
  173.     } finally {
  174.       if (jt != null) {
  175.         jt.fs.close();
  176.         jt.stopTracker();
  177.         runner.interrupt();
  178.         runner.join();
  179.       }
  180.       hdfs.stopDataNode(dn);
  181.       hdfs.stopNameNode(nn);
  182.     }
  183.   }
  184. }