TestJobInProgressListener.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.util.ArrayList;
  20. import java.io.File;
  21. import java.io.IOException;
  22. import java.util.List;
  23. import org.apache.hadoop.hdfs.MiniDFSCluster;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
  27. import org.apache.commons.logging.Log;
  28. import org.apache.commons.logging.LogFactory;
  29. import junit.framework.TestCase;
  30. /**
  31.  * Test whether the JobInProgressListeners are informed as expected.
  32.  */
  33. public class TestJobInProgressListener extends TestCase {
  34.   private static final Log LOG = 
  35.     LogFactory.getLog(TestJobInProgressListener.class);
  36.   private final Path testDir = new Path("test-jip-listener-update");
  37.   
  38.   private static String TEST_ROOT_DIR = new File(System.getProperty(
  39.           "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
  40.   private JobConf configureJob(JobConf conf, int m, int r, 
  41.                                Path inDir, Path outputDir,
  42.                                String mapSignalFile, String redSignalFile) 
  43.   throws IOException {
  44.     UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir,  m, r, 
  45.         "job-listener-test", mapSignalFile, redSignalFile);
  46.     return conf; 
  47.   }
  48.   
  49.   /**
  50.    * This test case tests if external updates to JIP do not result into 
  51.    * undesirable effects
  52.    * Test is as follows
  53.    *   - submit 2 jobs of normal priority. job1 is a waiting job which waits and
  54.    *     blocks the cluster
  55.    *   - change one parameter of job2 such that the job bumps up in the queue
  56.    *   - check if the queue looks ok
  57.    *   
  58.    */
  59.   public void testJobQueueChanges() throws IOException {
  60.     LOG.info("Testing job queue changes");
  61.     JobConf conf = new JobConf();
  62.     MiniDFSCluster dfs = new MiniDFSCluster(conf, 1, true, null, null);
  63.     dfs.waitActive();
  64.     FileSystem fileSys = dfs.getFileSystem();
  65.     
  66.     dfs.startDataNodes(conf, 1, true, null, null, null, null);
  67.     dfs.waitActive();
  68.     
  69.     String namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
  70.                       + (dfs.getFileSystem()).getUri().getPort();
  71.     MiniMRCluster mr = new MiniMRCluster(1, namenode, 1);
  72.     JobClient jobClient = new JobClient(mr.createJobConf());
  73.     
  74.     // clean up
  75.     fileSys.delete(testDir, true);
  76.     
  77.     if (!fileSys.mkdirs(testDir)) {
  78.       throw new IOException("Mkdirs failed to create " + testDir.toString());
  79.     }
  80.     // Write the input file
  81.     Path inDir = new Path(testDir, "input");
  82.     Path shareDir = new Path(testDir, "share");
  83.     String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
  84.     String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
  85.     UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(inDir + "/file"), 
  86.                             (short)1);
  87.     
  88.     JobQueueJobInProgressListener myListener = 
  89.       new JobQueueJobInProgressListener();
  90.     
  91.     // add the listener
  92.     mr.getJobTrackerRunner().getJobTracker()
  93.       .addJobInProgressListener(myListener);
  94.     
  95.     // big blocking job
  96.     Path outputDir = new Path(testDir, "output");
  97.     Path newOutputDir = outputDir.suffix("0");
  98.     JobConf job1 = configureJob(mr.createJobConf(), 10, 0, inDir, newOutputDir,
  99.                                 mapSignalFile, redSignalFile);
  100.     
  101.     // short blocked job
  102.     newOutputDir = outputDir.suffix("1");
  103.     JobConf job2 = configureJob(mr.createJobConf(), 1, 0, inDir, newOutputDir,
  104.                                 mapSignalFile, redSignalFile);
  105.     
  106.     RunningJob rJob1 = jobClient.submitJob(job1);
  107.     LOG.info("Running job " + rJob1.getID().toString());
  108.     
  109.     RunningJob rJob2 = jobClient.submitJob(job2);
  110.     LOG.info("Running job " + rJob2.getID().toString());
  111.     
  112.     // I. Check job-priority change
  113.     LOG.info("Testing job priority changes");
  114.     
  115.     // bump up job2's priority
  116.     LOG.info("Increasing job2's priority to HIGH");
  117.     rJob2.setJobPriority("HIGH");
  118.     
  119.     // check if the queue is sane
  120.     assertTrue("Priority change garbles the queue", 
  121.                myListener.getJobQueue().size() == 2);
  122.     
  123.     JobInProgress[] queue = 
  124.       myListener.getJobQueue().toArray(new JobInProgress[0]);
  125.     
  126.     // check if the bump has happened
  127.     assertTrue("Priority change failed to bump up job2 in the queue", 
  128.                queue[0].getJobID().equals(rJob2.getID()));
  129.     
  130.     assertTrue("Priority change failed to bump down job1 in the queue", 
  131.                queue[1].getJobID().equals(rJob1.getID()));
  132.     
  133.     assertEquals("Priority change has garbled the queue", 
  134.                  2, queue.length);
  135.     
  136.     // II. Check start-time change
  137.     LOG.info("Testing job start-time changes");
  138.     
  139.     // reset the priority which will make the order as
  140.     //  - job1
  141.     //  - job2
  142.     // this will help in bumping job2 on start-time change
  143.     LOG.info("Increasing job2's priority to NORMAL"); 
  144.     rJob2.setJobPriority("NORMAL");
  145.     
  146.     // create the change event
  147.     JobInProgress jip2 = mr.getJobTrackerRunner().getJobTracker()
  148.                           .getJob(rJob2.getID());
  149.     JobInProgress jip1 = mr.getJobTrackerRunner().getJobTracker()
  150.                            .getJob(rJob1.getID());
  151.     
  152.     JobStatus prevStatus = (JobStatus)jip2.getStatus().clone();
  153.     
  154.     // change job2's start-time and the status
  155.     jip2.startTime =  jip1.startTime - 1;
  156.     jip2.status.setStartTime(jip2.startTime);
  157.     
  158.     
  159.     JobStatus newStatus = (JobStatus)jip2.getStatus().clone();
  160.     
  161.     // inform the listener
  162.     LOG.info("Updating the listener about job2's start-time change");
  163.     JobStatusChangeEvent event = 
  164.       new JobStatusChangeEvent(jip2, EventType.START_TIME_CHANGED, 
  165.                               prevStatus, newStatus);
  166.     myListener.jobUpdated(event);
  167.     
  168.     // check if the queue is sane
  169.     assertTrue("Start time change garbles the queue", 
  170.                myListener.getJobQueue().size() == 2);
  171.     
  172.     queue = myListener.getJobQueue().toArray(new JobInProgress[0]);
  173.     
  174.     // check if the bump has happened
  175.     assertTrue("Start time change failed to bump up job2 in the queue", 
  176.                queue[0].getJobID().equals(rJob2.getID()));
  177.     
  178.     assertTrue("Start time change failed to bump down job1 in the queue", 
  179.                queue[1].getJobID().equals(rJob1.getID()));
  180.     
  181.     assertEquals("Start time change has garbled the queue", 
  182.                  2, queue.length);
  183.     
  184.     // signal the maps to complete
  185.     UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
  186.     
  187.     // check if job completion leaves the queue sane
  188.     while (rJob2.getJobState() != JobStatus.SUCCEEDED) {
  189.       UtilsForTests.waitFor(10);
  190.     }
  191.     
  192.     while (rJob1.getJobState() != JobStatus.SUCCEEDED) {
  193.       UtilsForTests.waitFor(10);
  194.     }
  195.     
  196.     assertTrue("Job completion garbles the queue", 
  197.                myListener.getJobQueue().size() == 0);
  198.   }
  199.   
  200.   // A listener that inits the tasks one at a time and also listens to the 
  201.   // events
  202.   public static class MyListener extends JobInProgressListener {
  203.     private List<JobInProgress> wjobs = new ArrayList<JobInProgress>();
  204.     private List<JobInProgress> jobs = new ArrayList<JobInProgress>(); 
  205.     
  206.     public boolean contains (JobID id) {
  207.       return contains(id, true) || contains(id, false);
  208.     }
  209.     
  210.     public boolean contains (JobID id, boolean waiting) {
  211.       List<JobInProgress> queue = waiting ? wjobs : jobs;
  212.       for (JobInProgress job : queue) {
  213.         if (job.getJobID().equals(id)) {
  214.           return true;
  215.         }
  216.       }
  217.       return false;
  218.     }
  219.     
  220.     public void jobAdded(JobInProgress job) {
  221.       LOG.info("Job " + job.getJobID().toString() + " added");
  222.       wjobs.add(job);
  223.     }
  224.     
  225.     public void jobRemoved(JobInProgress job) {
  226.       LOG.info("Job " + job.getJobID().toString() + " removed");
  227.     }
  228.     
  229.     public void jobUpdated(JobChangeEvent event) {
  230.       LOG.info("Job " + event.getJobInProgress().getJobID().toString() + " updated");
  231.       // remove the job is the event is for a completed job
  232.       if (event instanceof JobStatusChangeEvent) {
  233.         JobStatusChangeEvent statusEvent = (JobStatusChangeEvent)event;
  234.         if (statusEvent.getEventType() == EventType.RUN_STATE_CHANGED) {
  235.           // check if the state changes from 
  236.           // RUNNING->COMPLETE(SUCCESS/KILLED/FAILED)
  237.           JobInProgress jip = event.getJobInProgress();
  238.           String jobId = jip.getJobID().toString();
  239.           if (jip.isComplete()) {
  240.             LOG.info("Job " +  jobId + " deleted from the running queue");
  241.             if (statusEvent.getOldStatus().getRunState() == JobStatus.PREP) {
  242.               wjobs.remove(jip);
  243.             } else {
  244.               jobs.remove(jip);
  245.             }
  246.           } else {
  247.             // PREP->RUNNING
  248.             LOG.info("Job " +  jobId + " deleted from the waiting queue");
  249.             wjobs.remove(jip);
  250.             jobs.add(jip);
  251.           }
  252.         }
  253.       }
  254.     }
  255.   }
  256.   
  257.   public void testJobFailure() throws Exception {
  258.     LOG.info("Testing job-success");
  259.     
  260.     MyListener myListener = new MyListener();
  261.     MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
  262.     
  263.     JobConf job = mr.createJobConf();
  264.     
  265.     mr.getJobTrackerRunner().getJobTracker()
  266.       .addJobInProgressListener(myListener);
  267.     Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input");
  268.     Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output");
  269.     // submit a job that fails 
  270.     RunningJob rJob = UtilsForTests.runJobFail(job, inDir, outDir);
  271.     JobID id = rJob.getID();
  272.     // check if the job failure was notified
  273.     assertFalse("Missing event notification on failing a running job", 
  274.                 myListener.contains(id));
  275.     
  276.   }
  277.   
  278.   public void testJobKill() throws Exception {
  279.     LOG.info("Testing job-kill");
  280.     
  281.     MyListener myListener = new MyListener();
  282.     MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
  283.     
  284.     JobConf job = mr.createJobConf();
  285.     
  286.     mr.getJobTrackerRunner().getJobTracker()
  287.       .addJobInProgressListener(myListener);
  288.     
  289.     Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input");
  290.     Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output");
  291.     // submit and kill the job   
  292.     RunningJob rJob = UtilsForTests.runJobKill(job, inDir, outDir);
  293.     JobID id = rJob.getID();
  294.     // check if the job failure was notified
  295.     assertFalse("Missing event notification on killing a running job", 
  296.                 myListener.contains(id));
  297.     
  298.   }
  299.   
  300.   public void testJobSuccess() throws Exception {
  301.     LOG.info("Testing job-success");
  302.     MyListener myListener = new MyListener();
  303.     
  304.     MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
  305.     
  306.     JobConf job = mr.createJobConf();
  307.     
  308.     mr.getJobTrackerRunner().getJobTracker()
  309.       .addJobInProgressListener(myListener);
  310.     
  311.     Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
  312.     Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
  313.     // submit the job   
  314.     RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
  315.     
  316.     // wait for the job to be running
  317.     while (rJob.getJobState() != JobStatus.RUNNING) {
  318.       UtilsForTests.waitFor(10);
  319.     }
  320.     
  321.     LOG.info("Job " +  rJob.getID().toString() + " started running");
  322.     
  323.     // check if the listener was updated about this change
  324.     assertFalse("Missing event notification for a running job", 
  325.                 myListener.contains(rJob.getID(), true));
  326.     
  327.     while (rJob.getJobState() != JobStatus.SUCCEEDED) {
  328.       UtilsForTests.waitFor(10);
  329.     }
  330.     
  331.     // check if the job success was notified
  332.     assertFalse("Missing event notification for a successful job", 
  333.                 myListener.contains(rJob.getID(), false));
  334.   }
  335.   
  336.   /**
  337.    * This scheduler never schedules any task as it doesnt init any task. So all
  338.    * the jobs are queued forever.
  339.    */
  340.   public static class MyScheduler extends JobQueueTaskScheduler {
  341.     @Override
  342.     public synchronized void start() throws IOException {
  343.       super.start();
  344.       // Remove the eager task initializer
  345.       taskTrackerManager.removeJobInProgressListener(
  346.           eagerTaskInitializationListener);
  347.       // terminate it
  348.       eagerTaskInitializationListener.terminate();
  349.     }
  350.   }
  351.   
  352.   public void testQueuedJobKill() throws Exception {
  353.     LOG.info("Testing queued-job-kill");
  354.     
  355.     MyListener myListener = new MyListener();
  356.     
  357.     JobConf job = new JobConf();
  358.     job.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
  359.                  TaskScheduler.class);
  360.     MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, job);
  361.     
  362.     job = mr.createJobConf();
  363.     
  364.     mr.getJobTrackerRunner().getJobTracker()
  365.       .addJobInProgressListener(myListener);
  366.     
  367.     Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
  368.     Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
  369.     RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
  370.     JobID id = rJob.getID();
  371.     LOG.info("Job : " + id.toString() + " submitted");
  372.     
  373.     // check if the job is in the waiting queue
  374.     assertTrue("Missing event notification on submiting a job", 
  375.                 myListener.contains(id, true));
  376.     
  377.     // kill the job
  378.     LOG.info("Killing job : " + id.toString());
  379.     rJob.killJob();
  380.     
  381.     // check if the job is killed
  382.     assertEquals("Job status doesnt reflect the kill-job action", 
  383.                  JobStatus.KILLED, rJob.getJobState());
  384.     // check if the job is correctly moved
  385.     // from the waiting list
  386.     assertFalse("Missing event notification on killing a waiting job", 
  387.                 myListener.contains(id, true));
  388.   }
  389. }