NotificationTestCase.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.mortbay.jetty.Server;
  20. import org.mortbay.jetty.servlet.Context;
  21. import org.mortbay.jetty.servlet.ServletHolder;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.io.Text;
  25. import org.apache.hadoop.io.IntWritable;
  26. import javax.servlet.http.HttpServletRequest;
  27. import javax.servlet.http.HttpServletResponse;
  28. import javax.servlet.http.HttpServlet;
  29. import javax.servlet.ServletException;
  30. import java.io.IOException;
  31. import java.io.DataOutputStream;
  32. import java.util.Date;
  33. /**
  34.  * Base class to test Job end notification in local and cluster mode.
  35.  *
  36.  * Starts up hadoop on Local or Cluster mode (by extending of the
  37.  * HadoopTestCase class) and it starts a servlet engine that hosts
  38.  * a servlet that will receive the notification of job finalization.
  39.  *
  40.  * The notification servlet returns a HTTP 400 the first time is called
  41.  * and a HTTP 200 the second time, thus testing retry.
  42.  *
  43.  * In both cases local file system is used (this is irrelevant for
  44.  * the tested functionality)
  45.  *
  46.  * 
  47.  */
  48. public abstract class NotificationTestCase extends HadoopTestCase {
  49.   private static void stdPrintln(String s) {
  50.     //System.out.println(s);
  51.   }
  52.   protected NotificationTestCase(int mode) throws IOException {
  53.     super(mode, HadoopTestCase.LOCAL_FS, 1, 1);
  54.   }
  55.   private int port;
  56.   private String contextPath = "/notification";
  57.   private Class servletClass = NotificationServlet.class;
  58.   private String servletPath = "/mapred";
  59.   private Server webServer;
  60.   private void startHttpServer() throws Exception {
  61.     // Create the webServer
  62.     if (webServer != null) {
  63.       webServer.stop();
  64.       webServer = null;
  65.     }
  66.     webServer = new Server(0);
  67.     Context context = new Context(webServer, contextPath);
  68.     // create servlet handler
  69.     context.addServlet(new ServletHolder(new NotificationServlet()),
  70.                        servletPath);
  71.     // Start webServer
  72.     webServer.start();
  73.     port = webServer.getConnectors()[0].getLocalPort();
  74.   }
  75.   private void stopHttpServer() throws Exception {
  76.     if (webServer != null) {
  77.       webServer.stop();
  78.       webServer.destroy();
  79.       webServer = null;
  80.     }
  81.   }
  82.   public static class NotificationServlet extends HttpServlet {
  83.     public static int counter = 0;
  84.     protected void doGet(HttpServletRequest req, HttpServletResponse res)
  85.       throws ServletException, IOException {
  86.       switch (counter) {
  87.         case 0:
  88.         {
  89.           assertTrue(req.getQueryString().contains("SUCCEEDED"));
  90.         }
  91.         break;
  92.         case 2:
  93.         {
  94.           assertTrue(req.getQueryString().contains("KILLED"));
  95.         }
  96.         break;
  97.         case 4:
  98.         {
  99.           assertTrue(req.getQueryString().contains("FAILED"));
  100.         }
  101.         break;
  102.       }
  103.       if (counter % 2 == 0) {
  104.         stdPrintln((new Date()).toString() +
  105.                    "Receiving First notification for [" + req.getQueryString() +
  106.                    "], returning error");
  107.         res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing error");
  108.       }
  109.       else {
  110.         stdPrintln((new Date()).toString() +
  111.                    "Receiving Second notification for [" + req.getQueryString() +
  112.                    "], returning OK");
  113.         res.setStatus(HttpServletResponse.SC_OK);
  114.       }
  115.       counter++;
  116.     }
  117.   }
  118.   private String getNotificationUrlTemplate() {
  119.     return "http://localhost:" + port + contextPath + servletPath +
  120.       "?jobId=$jobId&jobStatus=$jobStatus";
  121.   }
  122.   protected JobConf createJobConf() {
  123.     JobConf conf = super.createJobConf();
  124.     conf.setJobEndNotificationURI(getNotificationUrlTemplate());
  125.     conf.setInt("job.end.retry.attempts", 3);
  126.     conf.setInt("job.end.retry.interval", 200);
  127.     return conf;
  128.   }
  129.   protected void setUp() throws Exception {
  130.     super.setUp();
  131.     startHttpServer();
  132.   }
  133.   protected void tearDown() throws Exception {
  134.     stopHttpServer();
  135.     super.tearDown();
  136.   }
  137.   public void testMR() throws Exception {
  138.     System.out.println(launchWordCount(this.createJobConf(),
  139.                                        "a b c d e f g h", 1, 1));
  140.     synchronized(Thread.currentThread()) {
  141.       stdPrintln("Sleeping for 2 seconds to give time for retry");
  142.       Thread.currentThread().sleep(2000);
  143.     }
  144.     assertEquals(2, NotificationServlet.counter);
  145.     
  146.     Path inDir = new Path("notificationjob/input");
  147.     Path outDir = new Path("notificationjob/output");
  148.     // Hack for local FS that does not have the concept of a 'mounting point'
  149.     if (isLocalFS()) {
  150.       String localPathRoot = System.getProperty("test.build.data","/tmp")
  151.         .toString().replace(' ', '+');;
  152.       inDir = new Path(localPathRoot, inDir);
  153.       outDir = new Path(localPathRoot, outDir);
  154.     }
  155.     // run a job with KILLED status
  156.     System.out.println(UtilsForTests.runJobKill(this.createJobConf(), inDir,
  157.                                                 outDir).getID());
  158.     synchronized(Thread.currentThread()) {
  159.       stdPrintln("Sleeping for 2 seconds to give time for retry");
  160.       Thread.currentThread().sleep(2000);
  161.     }
  162.     assertEquals(4, NotificationServlet.counter);
  163.     
  164.     // run a job with FAILED status
  165.     System.out.println(UtilsForTests.runJobFail(this.createJobConf(), inDir,
  166.                                                 outDir).getID());
  167.     synchronized(Thread.currentThread()) {
  168.       stdPrintln("Sleeping for 2 seconds to give time for retry");
  169.       Thread.currentThread().sleep(2000);
  170.     }
  171.     assertEquals(6, NotificationServlet.counter);
  172.   }
  173.   private String launchWordCount(JobConf conf,
  174.                                  String input,
  175.                                  int numMaps,
  176.                                  int numReduces) throws IOException {
  177.     Path inDir = new Path("testing/wc/input");
  178.     Path outDir = new Path("testing/wc/output");
  179.     // Hack for local FS that does not have the concept of a 'mounting point'
  180.     if (isLocalFS()) {
  181.       String localPathRoot = System.getProperty("test.build.data","/tmp")
  182.         .toString().replace(' ', '+');;
  183.       inDir = new Path(localPathRoot, inDir);
  184.       outDir = new Path(localPathRoot, outDir);
  185.     }
  186.     FileSystem fs = FileSystem.get(conf);
  187.     fs.delete(outDir, true);
  188.     if (!fs.mkdirs(inDir)) {
  189.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  190.     }
  191.     {
  192.       DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  193.       file.writeBytes(input);
  194.       file.close();
  195.     }
  196.     conf.setJobName("wordcount");
  197.     conf.setInputFormat(TextInputFormat.class);
  198.     // the keys are words (strings)
  199.     conf.setOutputKeyClass(Text.class);
  200.     // the values are counts (ints)
  201.     conf.setOutputValueClass(IntWritable.class);
  202.     conf.setMapperClass(WordCount.MapClass.class);
  203.     conf.setCombinerClass(WordCount.Reduce.class);
  204.     conf.setReducerClass(WordCount.Reduce.class);
  205.     FileInputFormat.setInputPaths(conf, inDir);
  206.     FileOutputFormat.setOutputPath(conf, outDir);
  207.     conf.setNumMapTasks(numMaps);
  208.     conf.setNumReduceTasks(numReduces);
  209.     JobClient.runJob(conf);
  210.     return TestMiniMRWithDFS.readOutput(outDir, conf);
  211.   }
  212. }