HttpServer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:17k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.http;
  19. import java.io.IOException;
  20. import java.io.PrintWriter;
  21. import java.net.BindException;
  22. import java.net.InetSocketAddress;
  23. import java.net.URL;
  24. import java.util.ArrayList;
  25. import java.util.HashMap;
  26. import java.util.List;
  27. import java.util.Map;
  28. import java.nio.channels.ServerSocketChannel;
  29. import javax.servlet.ServletException;
  30. import javax.servlet.http.HttpServlet;
  31. import javax.servlet.http.HttpServletRequest;
  32. import javax.servlet.http.HttpServletResponse;
  33. import org.apache.commons.logging.Log;
  34. import org.apache.commons.logging.LogFactory;
  35. import org.apache.hadoop.conf.Configuration;
  36. import org.apache.hadoop.log.LogLevel;
  37. import org.apache.hadoop.util.ReflectionUtils;
  38. import org.mortbay.jetty.Connector;
  39. import org.mortbay.jetty.Handler;
  40. import org.mortbay.jetty.Server;
  41. import org.mortbay.jetty.handler.ContextHandlerCollection;
  42. import org.mortbay.jetty.nio.SelectChannelConnector;
  43. import org.mortbay.jetty.security.SslSocketConnector;
  44. import org.mortbay.jetty.servlet.Context;
  45. import org.mortbay.jetty.servlet.DefaultServlet;
  46. import org.mortbay.jetty.servlet.FilterHolder;
  47. import org.mortbay.jetty.servlet.FilterMapping;
  48. import org.mortbay.jetty.servlet.ServletHandler;
  49. import org.mortbay.jetty.servlet.ServletHolder;
  50. import org.mortbay.jetty.webapp.WebAppContext;
  51. import org.mortbay.thread.QueuedThreadPool;
  52. import org.mortbay.util.MultiException;
  53. /**
  54.  * Create a Jetty embedded server to answer http requests. The primary goal
  55.  * is to serve up status information for the server.
  56.  * There are three contexts:
  57.  *   "/logs/" -> points to the log directory
  58.  *   "/static/" -> points to common static files (src/webapps/static)
  59.  *   "/" -> the jsp server code from (src/webapps/<name>)
  60.  */
  61. public class HttpServer implements FilterContainer {
  62.   public static final Log LOG = LogFactory.getLog(HttpServer.class);
  63.   static final String FILTER_INITIALIZER_PROPERTY
  64.       = "hadoop.http.filter.initializers";
  65.   protected final Server webServer;
  66.   protected final Connector listener;
  67.   protected final WebAppContext webAppContext;
  68.   protected final boolean findPort;
  69.   protected final Map<Context, Boolean> defaultContexts =
  70.       new HashMap<Context, Boolean>();
  71.   protected final List<String> filterNames = new ArrayList<String>();
  72.   private static final int MAX_RETRIES = 10;
  73.   /** Same as this(name, bindAddress, port, findPort, null); */
  74.   public HttpServer(String name, String bindAddress, int port, boolean findPort
  75.       ) throws IOException {
  76.     this(name, bindAddress, port, findPort, new Configuration());
  77.   }
  78.   /**
  79.    * Create a status server on the given port.
  80.    * The jsp scripts are taken from src/webapps/<name>.
  81.    * @param name The name of the server
  82.    * @param port The port to use on the server
  83.    * @param findPort whether the server should start at the given port and 
  84.    *        increment by 1 until it finds a free port.
  85.    * @param conf Configuration 
  86.    */
  87.   public HttpServer(String name, String bindAddress, int port,
  88.       boolean findPort, Configuration conf) throws IOException {
  89.     webServer = new Server();
  90.     this.findPort = findPort;
  91.     listener = createBaseListener(conf);
  92.     listener.setHost(bindAddress);
  93.     listener.setPort(port);
  94.     webServer.addConnector(listener);
  95.     webServer.setThreadPool(new QueuedThreadPool());
  96.     final String appDir = getWebAppsPath();
  97.     ContextHandlerCollection contexts = new ContextHandlerCollection();
  98.     webServer.setHandler(contexts);
  99.     webAppContext = new WebAppContext();
  100.     webAppContext.setContextPath("/");
  101.     webAppContext.setWar(appDir + "/" + name);
  102.     webServer.addHandler(webAppContext);
  103.     addDefaultApps(contexts, appDir);
  104.     final FilterInitializer[] initializers = getFilterInitializers(conf); 
  105.     if (initializers != null) {
  106.       for(FilterInitializer c : initializers) {
  107.         c.initFilter(this);
  108.       }
  109.     }
  110.     addDefaultServlets();
  111.   }
  112.   /**
  113.    * Create a required listener for the Jetty instance listening on the port
  114.    * provided. This wrapper and all subclasses must create at least one
  115.    * listener.
  116.    */
  117.   protected Connector createBaseListener(Configuration conf)
  118.       throws IOException {
  119.     SelectChannelConnector ret = new SelectChannelConnector();
  120.     ret.setLowResourceMaxIdleTime(10000);
  121.     ret.setAcceptQueueSize(128);
  122.     ret.setResolveNames(false);
  123.     ret.setUseDirectBuffers(false);
  124.     return ret;
  125.   }
  126.   /** Get an array of FilterConfiguration specified in the conf */
  127.   private static FilterInitializer[] getFilterInitializers(Configuration conf) {
  128.     if (conf == null) {
  129.       return null;
  130.     }
  131.     Class<?>[] classes = conf.getClasses(FILTER_INITIALIZER_PROPERTY);
  132.     if (classes == null) {
  133.       return null;
  134.     }
  135.     FilterInitializer[] initializers = new FilterInitializer[classes.length];
  136.     for(int i = 0; i < classes.length; i++) {
  137.       initializers[i] = (FilterInitializer)ReflectionUtils.newInstance(
  138.           classes[i], conf);
  139.     }
  140.     return initializers;
  141.   }
  142.   /**
  143.    * Add default apps.
  144.    * @param appDir The application directory
  145.    * @throws IOException
  146.    */
  147.   protected void addDefaultApps(ContextHandlerCollection parent,
  148.       final String appDir) throws IOException {
  149.     // set up the context for "/logs/" if "hadoop.log.dir" property is defined. 
  150.     String logDir = System.getProperty("hadoop.log.dir");
  151.     if (logDir != null) {
  152.       Context logContext = new Context(parent, "/logs");
  153.       logContext.setResourceBase(logDir);
  154.       logContext.addServlet(DefaultServlet.class, "/");
  155.       defaultContexts.put(logContext, true);
  156.     }
  157.     // set up the context for "/static/*"
  158.     Context staticContext = new Context(parent, "/static");
  159.     staticContext.setResourceBase(appDir + "/static");
  160.     staticContext.addServlet(DefaultServlet.class, "/*");
  161.     defaultContexts.put(staticContext, true);
  162.   }
  163.   
  164.   /**
  165.    * Add default servlets.
  166.    */
  167.   protected void addDefaultServlets() {
  168.     // set up default servlets
  169.     addServlet("stacks", "/stacks", StackServlet.class);
  170.     addServlet("logLevel", "/logLevel", LogLevel.Servlet.class);
  171.   }
  172.   public void addContext(Context ctxt, boolean isFiltered)
  173.       throws IOException {
  174.     webServer.addHandler(ctxt);
  175.     defaultContexts.put(ctxt, isFiltered);
  176.   }
  177.   /**
  178.    * Add a context 
  179.    * @param pathSpec The path spec for the context
  180.    * @param dir The directory containing the context
  181.    * @param isFiltered if true, the servlet is added to the filter path mapping 
  182.    * @throws IOException
  183.    */
  184.   protected void addContext(String pathSpec, String dir, boolean isFiltered) throws IOException {
  185.     if (0 == webServer.getHandlers().length) {
  186.       throw new RuntimeException("Couldn't find handler");
  187.     }
  188.     WebAppContext webAppCtx = new WebAppContext();
  189.     webAppCtx.setContextPath(pathSpec);
  190.     webAppCtx.setWar(dir);
  191.     addContext(webAppCtx, true);
  192.   }
  193.   /**
  194.    * Set a value in the webapp context. These values are available to the jsp
  195.    * pages as "application.getAttribute(name)".
  196.    * @param name The name of the attribute
  197.    * @param value The value of the attribute
  198.    */
  199.   public void setAttribute(String name, Object value) {
  200.     webAppContext.setAttribute(name, value);
  201.   }
  202.   /**
  203.    * Add a servlet in the server.
  204.    * @param name The name of the servlet (can be passed as null)
  205.    * @param pathSpec The path spec for the servlet
  206.    * @param clazz The servlet class
  207.    */
  208.   public void addServlet(String name, String pathSpec,
  209.       Class<? extends HttpServlet> clazz) {
  210.     addInternalServlet(name, pathSpec, clazz);
  211.     addFilterPathMapping(pathSpec, webAppContext);
  212.   }
  213.   /**
  214.    * Add an internal servlet in the server.
  215.    * @param name The name of the servlet (can be passed as null)
  216.    * @param pathSpec The path spec for the servlet
  217.    * @param clazz The servlet class
  218.    * @deprecated this is a temporary method
  219.    */
  220.   @Deprecated
  221.   public void addInternalServlet(String name, String pathSpec,
  222.       Class<? extends HttpServlet> clazz) {
  223.     ServletHolder holder = new ServletHolder(clazz);
  224.     if (name != null) {
  225.       holder.setName(name);
  226.     }
  227.     webAppContext.addServlet(holder, pathSpec);
  228.   }
  229.   /** {@inheritDoc} */
  230.   public void addFilter(String name, String classname,
  231.       Map<String, String> parameters) {
  232.     final String[] USER_FACING_URLS = { "*.html", "*.jsp" };
  233.     defineFilter(webAppContext, name, classname, parameters, USER_FACING_URLS);
  234.     final String[] ALL_URLS = { "/*" };
  235.     for (Map.Entry<Context, Boolean> e : defaultContexts.entrySet()) {
  236.       if (e.getValue()) {
  237.         Context ctx = e.getKey();
  238.         defineFilter(ctx, name, classname, parameters, ALL_URLS);
  239.         LOG.info("Added filter " + name + " (class=" + classname
  240.             + ") to context " + ctx.getDisplayName());
  241.       }
  242.     }
  243.     filterNames.add(name);
  244.   }
  245.   /** {@inheritDoc} */
  246.   public void addGlobalFilter(String name, String classname,
  247.       Map<String, String> parameters) {
  248.     final String[] ALL_URLS = { "/*" };
  249.     defineFilter(webAppContext, name, classname, parameters, ALL_URLS);
  250.     for (Context ctx : defaultContexts.keySet()) {
  251.       defineFilter(ctx, name, classname, parameters, ALL_URLS);
  252.     }
  253.     LOG.info("Added global filter" + name + " (class=" + classname + ")");
  254.   }
  255.   /**
  256.    * Define a filter for a context and set up default url mappings.
  257.    */
  258.   protected void defineFilter(Context ctx, String name,
  259.       String classname, Map<String,String> parameters, String[] urls) {
  260.     FilterHolder holder = new FilterHolder();
  261.     holder.setName(name);
  262.     holder.setClassName(classname);
  263.     holder.setInitParameters(parameters);
  264.     FilterMapping fmap = new FilterMapping();
  265.     fmap.setPathSpecs(urls);
  266.     fmap.setDispatches(Handler.ALL);
  267.     fmap.setFilterName(name);
  268.     ServletHandler handler = ctx.getServletHandler();
  269.     handler.addFilter(holder, fmap);
  270.   }
  271.   /**
  272.    * Add the path spec to the filter path mapping.
  273.    * @param pathSpec The path spec
  274.    * @param webAppCtx The WebApplicationContext to add to
  275.    */
  276.   protected void addFilterPathMapping(String pathSpec,
  277.       Context webAppCtx) {
  278.     ServletHandler handler = webAppCtx.getServletHandler();
  279.     for(String name : filterNames) {
  280.       FilterMapping fmap = new FilterMapping();
  281.       fmap.setPathSpec(pathSpec);
  282.       fmap.setFilterName(name);
  283.       fmap.setDispatches(Handler.ALL);
  284.       handler.addFilterMapping(fmap);
  285.     }
  286.   }
  287.   
  288.   /**
  289.    * Get the value in the webapp context.
  290.    * @param name The name of the attribute
  291.    * @return The value of the attribute
  292.    */
  293.   public Object getAttribute(String name) {
  294.     return webAppContext.getAttribute(name);
  295.   }
  296.   /**
  297.    * Get the pathname to the webapps files.
  298.    * @return the pathname as a URL
  299.    * @throws IOException if 'webapps' directory cannot be found on CLASSPATH.
  300.    */
  301.   protected String getWebAppsPath() throws IOException {
  302.     URL url = getClass().getClassLoader().getResource("webapps");
  303.     if (url == null) 
  304.       throw new IOException("webapps not found in CLASSPATH"); 
  305.     return url.toString();
  306.   }
  307.   /**
  308.    * Get the port that the server is on
  309.    * @return the port
  310.    */
  311.   public int getPort() {
  312.     return webServer.getConnectors()[0].getLocalPort();
  313.   }
  314.   /**
  315.    * Set the min, max number of worker threads (simultaneous connections).
  316.    */
  317.   public void setThreads(int min, int max) {
  318.     QueuedThreadPool pool = (QueuedThreadPool) webServer.getThreadPool() ;
  319.     pool.setMinThreads(min);
  320.     pool.setMaxThreads(max);
  321.   }
  322.   /**
  323.    * Configure an ssl listener on the server.
  324.    * @param addr address to listen on
  325.    * @param keystore location of the keystore
  326.    * @param storPass password for the keystore
  327.    * @param keyPass password for the key
  328.    * @deprecated Use {@link #addSslListener(InetSocketAddress, Configuration, boolean)}
  329.    */
  330.   @Deprecated
  331.   public void addSslListener(InetSocketAddress addr, String keystore,
  332.       String storPass, String keyPass) throws IOException {
  333.     if (webServer.isStarted()) {
  334.       throw new IOException("Failed to add ssl listener");
  335.     }
  336.     SslSocketConnector sslListener = new SslSocketConnector();
  337.     sslListener.setHost(addr.getHostName());
  338.     sslListener.setPort(addr.getPort());
  339.     sslListener.setKeystore(keystore);
  340.     sslListener.setPassword(storPass);
  341.     sslListener.setKeyPassword(keyPass);
  342.     webServer.addConnector(sslListener);
  343.   }
  344.   /**
  345.    * Configure an ssl listener on the server.
  346.    * @param addr address to listen on
  347.    * @param sslConf conf to retrieve ssl options
  348.    * @param needClientAuth whether client authentication is required
  349.    */
  350.   public void addSslListener(InetSocketAddress addr, Configuration sslConf,
  351.       boolean needClientAuth) throws IOException {
  352.     if (webServer.isStarted()) {
  353.       throw new IOException("Failed to add ssl listener");
  354.     }
  355.     if (needClientAuth) {
  356.       // setting up SSL truststore for authenticating clients
  357.       System.setProperty("javax.net.ssl.trustStore", sslConf.get(
  358.           "ssl.server.truststore.location", ""));
  359.       System.setProperty("javax.net.ssl.trustStorePassword", sslConf.get(
  360.           "ssl.server.truststore.password", ""));
  361.       System.setProperty("javax.net.ssl.trustStoreType", sslConf.get(
  362.           "ssl.server.truststore.type", "jks"));
  363.     }
  364.     SslSocketConnector sslListener = new SslSocketConnector();
  365.     sslListener.setHost(addr.getHostName());
  366.     sslListener.setPort(addr.getPort());
  367.     sslListener.setKeystore(sslConf.get("ssl.server.keystore.location"));
  368.     sslListener.setPassword(sslConf.get("ssl.server.keystore.password", ""));
  369.     sslListener.setKeyPassword(sslConf.get("ssl.server.keystore.keypassword", ""));
  370.     sslListener.setKeystoreType(sslConf.get("ssl.server.keystore.type", "jks"));
  371.     sslListener.setNeedClientAuth(needClientAuth);
  372.     webServer.addConnector(sslListener);
  373.   }
  374.   /**
  375.    * Start the server. Does not wait for the server to start.
  376.    */
  377.   public void start() throws IOException {
  378.     try {
  379.       int port = 0;
  380.       int oriPort = listener.getPort(); // The original requested port
  381.       while (true) {
  382.         try {
  383.           listener.open();
  384.           port = listener.getLocalPort();
  385.           //Workaround to handle the problem reported in HADOOP-4744
  386.           if (port < 0) {
  387.             Thread.sleep(100);
  388.             int numRetries = 1;
  389.             while (port < 0) {
  390.               LOG.warn("listener.getLocalPort returned " + port);
  391.               if (numRetries++ > MAX_RETRIES) {
  392.                 throw new Exception(" listener.getLocalPort is returning " +
  393.                  "less than 0 even after " +numRetries+" resets");
  394.               }
  395.               for (int i = 0; i < 2; i++) {
  396.                 LOG.info("Retrying listener.getLocalPort()");
  397.                 port = listener.getLocalPort();
  398.                 if (port > 0) {
  399.                   break;
  400.                 }
  401.                 Thread.sleep(200);
  402.               }
  403.               if (port > 0) {
  404.                 break;
  405.               }
  406.               LOG.info("Bouncing the listener");
  407.               listener.close();
  408.               Thread.sleep(1000);
  409.               listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
  410.               listener.open();
  411.               Thread.sleep(100);
  412.               port = listener.getLocalPort();
  413.             }
  414.           } //Workaround end
  415.           LOG.info("Jetty bound to port " + port);
  416.           webServer.start();
  417.           break;
  418.         } catch (IOException ex) {
  419.           // if this is a bind exception,
  420.           // then try the next port number.
  421.           if (ex instanceof BindException) {
  422.             if (!findPort) {
  423.               throw (BindException) ex;
  424.             }
  425.           } else {
  426.             LOG.info("HttpServer.start() threw a non Bind IOException"); 
  427.             throw ex;
  428.           }
  429.         } catch (MultiException ex) {
  430.           LOG.info("HttpServer.start() threw a MultiException"); 
  431.           throw ex;
  432.         }
  433.         listener.setPort((oriPort += 1));
  434.       }
  435.     } catch (IOException e) {
  436.       throw e;
  437.     } catch (Exception e) {
  438.       throw new IOException("Problem starting http server", e);
  439.     }
  440.   }
  441.   /**
  442.    * stop the server
  443.    */
  444.   public void stop() throws Exception {
  445.     listener.close();
  446.     webServer.stop();
  447.   }
  448.   public void join() throws InterruptedException {
  449.     webServer.join();
  450.   }
  451.   /**
  452.    * A very simple servlet to serve up a text representation of the current
  453.    * stack traces. It both returns the stacks to the caller and logs them.
  454.    * Currently the stack traces are done sequentially rather than exactly the
  455.    * same data.
  456.    */
  457.   public static class StackServlet extends HttpServlet {
  458.     private static final long serialVersionUID = -6284183679759467039L;
  459.     @Override
  460.     public void doGet(HttpServletRequest request, HttpServletResponse response)
  461.       throws ServletException, IOException {
  462.       
  463.       PrintWriter out = new PrintWriter(response.getOutputStream());
  464.       ReflectionUtils.printThreadInfo(out, "");
  465.       out.close();
  466.       ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);      
  467.     }
  468.   }
  469. }