HdfsProxy.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfsproxy;
  19. import java.io.IOException;
  20. import java.net.InetSocketAddress;
  21. import java.net.URI;
  22. import java.net.URISyntaxException;
  23. import java.net.URL;
  24. import java.util.Set;
  25. import javax.net.ssl.HttpsURLConnection;
  26. import javax.net.ssl.HostnameVerifier;
  27. import javax.net.ssl.SSLSession;
  28. import javax.servlet.http.HttpServletResponse;
  29. import org.apache.commons.logging.Log;
  30. import org.apache.commons.logging.LogFactory;
  31. import org.apache.hadoop.conf.Configuration;
  32. import org.apache.hadoop.net.NetUtils;
  33. import org.apache.hadoop.util.HostsFileReader;
  34. import org.apache.hadoop.util.StringUtils;
  35. /**
  36.  * A HTTPS/SSL proxy to HDFS, implementing certificate based access control.
  37.  */
  38. public class HdfsProxy {
  39.   public static final Log LOG = LogFactory.getLog(HdfsProxy.class);
  40.   private ProxyHttpServer server;
  41.   private InetSocketAddress sslAddr;
  42.   
  43.   /** Construct a proxy from the given configuration */
  44.   public HdfsProxy(Configuration conf) throws IOException {
  45.     try {
  46.       initialize(conf);
  47.     } catch (IOException e) {
  48.       this.stop();
  49.       throw e;
  50.     }
  51.   }
  52.   
  53.   private void initialize(Configuration conf) throws IOException {
  54.     sslAddr = getSslAddr(conf);
  55.     String nn = conf.get("hdfsproxy.dfs.namenode.address");
  56.     if (nn == null)
  57.       throw new IOException("HDFS NameNode address is not specified");
  58.     InetSocketAddress nnAddr = NetUtils.createSocketAddr(nn);
  59.     LOG.info("HDFS NameNode is at: " + nnAddr.getHostName() + ":" + nnAddr.getPort());
  60.     Configuration sslConf = new Configuration(false);
  61.     sslConf.addResource(conf.get("hdfsproxy.https.server.keystore.resource",
  62.         "ssl-server.xml"));
  63.     // unit testing
  64.     sslConf.set("proxy.http.test.listener.addr",
  65.                 conf.get("proxy.http.test.listener.addr"));
  66.     this.server = new ProxyHttpServer(sslAddr, sslConf);
  67.     this.server.setAttribute("proxy.https.port", server.getPort());
  68.     this.server.setAttribute("name.node.address", nnAddr);
  69.     this.server.setAttribute("name.conf", new Configuration());
  70.     this.server.addGlobalFilter("ProxyFilter", ProxyFilter.class.getName(), null);
  71.     this.server.addServlet("listPaths", "/listPaths/*", ProxyListPathsServlet.class);
  72.     this.server.addServlet("data", "/data/*", ProxyFileDataServlet.class);
  73.     this.server.addServlet("streamFile", "/streamFile/*", ProxyStreamFile.class);
  74.   }
  75.   /** return the http port if any, only for testing purposes */
  76.   int getPort() throws IOException {
  77.     return server.getPort();
  78.   }
  79.   
  80.   /**
  81.    * Start the server.
  82.    */
  83.   public void start() throws IOException {
  84.     this.server.start();
  85.     LOG.info("HdfsProxy server up at: " + sslAddr.getHostName() + ":"
  86.         + sslAddr.getPort());
  87.   }
  88.   
  89.   /**
  90.    * Stop all server threads and wait for all to finish.
  91.    */
  92.   public void stop() {
  93.     try {
  94.       if (server != null) {
  95.         server.stop();
  96.         server.join();
  97.       }
  98.     } catch (Exception e) {
  99.       LOG.warn("Got exception shutting down proxy", e);
  100.     }
  101.   }
  102.   
  103.   /**
  104.    * Wait for service to finish.
  105.    * (Normally, it runs forever.)
  106.    */
  107.   public void join() {
  108.     try {
  109.       this.server.join();
  110.     } catch (InterruptedException ie) {
  111.     }
  112.   }
  113.   
  114.   private static enum StartupOption {
  115.     RELOAD("-reloadPermFiles"), CLEAR("-clearUgiCache"), REGULAR("-regular");
  116.     private String name = null;
  117.     private StartupOption(String arg) {
  118.       this.name = arg;
  119.     }
  120.     public String getName() {
  121.       return name;
  122.     }
  123.   }
  124.   private static void printUsage() {
  125.     System.err.println("Usage: hdfsproxy ["
  126.         + StartupOption.RELOAD.getName() + "] | ["
  127.         + StartupOption.CLEAR.getName() + "]");
  128.   }
  129.   private static StartupOption parseArguments(String args[]) {
  130.     int argsLen = (args == null) ? 0 : args.length;
  131.     StartupOption startOpt = StartupOption.REGULAR;
  132.     for (int i = 0; i < argsLen; i++) {
  133.       String cmd = args[i];
  134.       if (StartupOption.RELOAD.getName().equalsIgnoreCase(cmd)) {
  135.         startOpt = StartupOption.RELOAD;
  136.       } else if (StartupOption.CLEAR.getName().equalsIgnoreCase(cmd)) {
  137.         startOpt = StartupOption.CLEAR;
  138.       } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
  139.         startOpt = StartupOption.REGULAR;
  140.       } else
  141.         return null;
  142.     }
  143.     return startOpt;
  144.   }
  145.   /**
  146.    * Dummy hostname verifier that is used to bypass hostname checking
  147.    */
  148.   private static class DummyHostnameVerifier implements HostnameVerifier {
  149.     public boolean verify(String hostname, SSLSession session) {
  150.       return true;
  151.     }
  152.   }
  153.   private static HttpsURLConnection openConnection(String hostname, int port,
  154.       String path) throws IOException {
  155.     try {
  156.       final URL url = new URI("https", null, hostname, port, path, null, null)
  157.           .toURL();
  158.       HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
  159.       // bypass hostname verification
  160.       conn.setHostnameVerifier(new DummyHostnameVerifier());
  161.       conn.setRequestMethod("GET");
  162.       return conn;
  163.     } catch (URISyntaxException e) {
  164.       throw (IOException) new IOException().initCause(e);
  165.     }
  166.   }
  167.   private static void setupSslProps(Configuration conf) {
  168.     Configuration sslConf = new Configuration(false);
  169.     sslConf.addResource(conf.get("hdfsproxy.https.server.keystore.resource",
  170.         "ssl-server.xml"));
  171.     System.setProperty("javax.net.ssl.trustStore", sslConf
  172.         .get("ssl.server.truststore.location"));
  173.     System.setProperty("javax.net.ssl.trustStorePassword", sslConf.get(
  174.         "ssl.server.truststore.password", ""));
  175.     System.setProperty("javax.net.ssl.trustStoreType", sslConf.get(
  176.         "ssl.server.truststore.type", "jks"));
  177.     System.setProperty("javax.net.ssl.keyStore", sslConf
  178.         .get("ssl.server.keystore.location"));
  179.     System.setProperty("javax.net.ssl.keyStorePassword", sslConf.get(
  180.         "ssl.server.keystore.password", ""));
  181.     System.setProperty("javax.net.ssl.keyPassword", sslConf.get(
  182.         "ssl.server.keystore.keypassword", ""));
  183.     System.setProperty("javax.net.ssl.keyStoreType", sslConf.get(
  184.         "ssl.server.keystore.type", "jks"));
  185.   }
  186.   static InetSocketAddress getSslAddr(Configuration conf) throws IOException {
  187.     String addr = conf.get("hdfsproxy.https.address");
  188.     if (addr == null)
  189.       throw new IOException("HdfsProxy address is not specified");
  190.     return NetUtils.createSocketAddr(addr);
  191.   }
  192.   private static boolean sendCommand(Configuration conf, String path)
  193.       throws IOException {
  194.     setupSslProps(conf);
  195.     int sslPort = getSslAddr(conf).getPort();
  196.     int err = 0;
  197.     StringBuilder b = new StringBuilder();
  198.     HostsFileReader hostsReader = new HostsFileReader(conf.get("hdfsproxy.hosts",
  199.         "hdfsproxy-hosts"), "");
  200.     Set<String> hostsList = hostsReader.getHosts();
  201.     for (String hostname : hostsList) {
  202.       HttpsURLConnection connection = null;
  203.       try {
  204.         connection = openConnection(hostname, sslPort, path);
  205.         connection.connect();
  206.         if (connection.getResponseCode() != HttpServletResponse.SC_OK) {
  207.           b.append("nt" + hostname + ": " + connection.getResponseCode()
  208.               + " " + connection.getResponseMessage());
  209.           err++;
  210.         }
  211.       } catch (IOException e) {
  212.         b.append("nt" + hostname + ": " + e.getLocalizedMessage());
  213.         err++;
  214.       } finally {
  215.         if (connection != null)
  216.           connection.disconnect();
  217.       }
  218.     }
  219.     if (err > 0) {
  220.       System.err.print("Command failed on the following "
  221.           + err + " host" + (err==1?":":"s:") + b.toString() + "n");
  222.       return true;
  223.     }
  224.     return false;
  225.   }
  226.   public static HdfsProxy createHdfsProxy(String argv[], Configuration conf)
  227.       throws IOException {
  228.     if (conf == null) {
  229.       conf = new Configuration(false);
  230.       conf.addResource("hdfsproxy-default.xml");
  231.     }
  232.     StartupOption startOpt = parseArguments(argv);
  233.     if (startOpt == null) {
  234.       printUsage();
  235.       return null;
  236.     }
  237.     switch (startOpt) {
  238.     case RELOAD:
  239.       boolean error = sendCommand(conf, "/reloadPermFiles");
  240.       System.exit(error ? 1 : 0);
  241.     case CLEAR:
  242.       error = sendCommand(conf, "/clearUgiCache");
  243.       System.exit(error ? 1 : 0);
  244.     default:
  245.     }
  246.     StringUtils.startupShutdownMessage(HdfsProxy.class, argv, LOG);
  247.     HdfsProxy proxy = new HdfsProxy(conf);
  248.     //proxy.addSslListener(conf);
  249.     proxy.start();
  250.     return proxy;
  251.   }
  252.   public static void main(String[] argv) throws Exception {
  253.     try {
  254.       HdfsProxy proxy = createHdfsProxy(argv, null);
  255.       if (proxy != null)
  256.         proxy.join();
  257.     } catch (Throwable e) {
  258.       LOG.error(StringUtils.stringifyException(e));
  259.       System.exit(-1);
  260.     }
  261.   }
  262. }