TestQueueCapacities.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:17k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.util.Properties;
  20. import org.apache.hadoop.mapred.ControlledMapReduceJob.ControlledMapReduceJobRunner;
  21. /**
  22.  * End to end tests based on MiniMRCluster to verify that queue capacities are
  23.  * honored. Automates the tests related to queue capacities: submits jobs to
  24.  * different queues simultaneously and ensures that capacities are honored
  25.  */
  26. public class TestQueueCapacities extends ClusterWithCapacityScheduler {
  27.   /**
  28.    * Test single queue.
  29.    * 
  30.    * <p>
  31.    * 
  32.    * Submit a job with more M/R tasks than total capacity. Full queue capacity
  33.    * should be utilized and remaining M/R tasks should wait for slots to be
  34.    * available.
  35.    * 
  36.    * @throws Exception
  37.    */
  38.   public void testSingleQueue()
  39.       throws Exception {
  40.     Properties schedulerProps = new Properties();
  41.     schedulerProps.put(
  42.         "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100");
  43.     Properties clusterProps = new Properties();
  44.     clusterProps
  45.         .put("mapred.tasktracker.map.tasks.maximum", String.valueOf(3));
  46.     clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
  47.         .valueOf(3));
  48.     // cluster capacity 12 maps, 12 reduces
  49.     startCluster(4, 2, clusterProps, schedulerProps);
  50.     ControlledMapReduceJobRunner jobRunner =
  51.         ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
  52.             getJobConf(), 16, 16);
  53.     jobRunner.start();
  54.     ControlledMapReduceJob controlledJob = jobRunner.getJob();
  55.     JobID myJobID = jobRunner.getJobID();
  56.     JobInProgress myJob = getJobTracker().getJob(myJobID);
  57.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 12);
  58.     // Wait till the cluster reaches steady state. This confirms that the rest
  59.     // of the tasks are not running and waiting for slots
  60.     // to be freed.
  61.     waitTillAllSlotsAreOccupied(true);
  62.     LOG.info("Trying to finish 2 maps");
  63.     controlledJob.finishNTasks(true, 2);
  64.     ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2);
  65.     assertTrue("Number of maps finished", myJob.finishedMaps() == 2);
  66.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 12);
  67.     waitTillAllSlotsAreOccupied(true);
  68.     LOG.info("Trying to finish 2 more maps");
  69.     controlledJob.finishNTasks(true, 2);
  70.     ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 4);
  71.     assertTrue("Number of maps finished", myJob.finishedMaps() == 4);
  72.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 12);
  73.     waitTillAllSlotsAreOccupied(true);
  74.     LOG.info("Trying to finish the last 12 maps");
  75.     controlledJob.finishNTasks(true, 12);
  76.     ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 16);
  77.     assertTrue("Number of maps finished", myJob.finishedMaps() == 16);
  78.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 0);
  79.     ControlledMapReduceJob.haveAllTasksFinished(myJob, true);
  80.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, false, 12);
  81.     waitTillAllSlotsAreOccupied(false);
  82.     LOG.info("Trying to finish 4 reduces");
  83.     controlledJob.finishNTasks(false, 4);
  84.     ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, false, 4);
  85.     assertTrue("Number of reduces finished", myJob.finishedReduces() == 4);
  86.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, false, 12);
  87.     waitTillAllSlotsAreOccupied(false);
  88.     LOG.info("Trying to finish the last 12 reduces");
  89.     controlledJob.finishNTasks(false, 12);
  90.     ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, false, 16);
  91.     assertTrue("Number of reduces finished", myJob.finishedReduces() == 16);
  92.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, false, 0);
  93.     ControlledMapReduceJob.haveAllTasksFinished(myJob, false);
  94.     jobRunner.join();
  95.   }
  96.   /**
  97.    * Test single queue with multiple jobs.
  98.    * 
  99.    * @throws Exception
  100.    */
  101.   public void testSingleQueueMultipleJobs()
  102.       throws Exception {
  103.     Properties schedulerProps = new Properties();
  104.     schedulerProps.put(
  105.         "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100");
  106.     Properties clusterProps = new Properties();
  107.     clusterProps
  108.         .put("mapred.tasktracker.map.tasks.maximum", String.valueOf(3));
  109.     clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
  110.         .valueOf(0));
  111.     // cluster capacity 12 maps, 0 reduces
  112.     startCluster(4, 2, clusterProps, schedulerProps);
  113.     singleQMultipleJobs1();
  114.     singleQMultipleJobs2();
  115.   }
  116.   /**
  117.    * Test multiple queues.
  118.    * 
  119.    * These tests use 4 queues default, Q2, Q3 and Q4 with guaranteed capacities
  120.    * 10, 20, 30, 40 respectively), user limit 100%, priority not respected, one
  121.    * user per queue. Reclaim time 5 minutes.
  122.    * 
  123.    * @throws Exception
  124.    */
  125.   public void testMultipleQueues()
  126.       throws Exception {
  127.     Properties schedulerProps = new Properties();
  128.     String[] queues = new String[] { "default", "Q2", "Q3", "Q4" };
  129.     int GC = 0;
  130.     for (String q : queues) {
  131.       GC += 10;
  132.       schedulerProps.put(CapacitySchedulerConf.toFullPropertyName(q,
  133.           "guaranteed-capacity"), String.valueOf(GC)); // TODO: use strings
  134.       schedulerProps.put(CapacitySchedulerConf.toFullPropertyName(q,
  135.           "minimum-user-limit-percent"), String.valueOf(100));
  136.       schedulerProps.put(CapacitySchedulerConf.toFullPropertyName(q,
  137.           "reclaim-time-limit"), String.valueOf(300));
  138.     }
  139.     Properties clusterProps = new Properties();
  140.     clusterProps
  141.         .put("mapred.tasktracker.map.tasks.maximum", String.valueOf(2));
  142.     clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
  143.         .valueOf(2));
  144.     clusterProps.put("mapred.queue.names", queues[0] + "," + queues[1] + ","
  145.         + queues[2] + "," + queues[3]);
  146.     // cluster capacity 10 maps, 10 reduces and 4 queues with capacities 1, 2,
  147.     // 3, 4 respectively.
  148.     startCluster(5, 2, clusterProps, schedulerProps);
  149.     multipleQsWithOneQBeyondCapacity(queues);
  150.     multipleQueuesWithinCapacities(queues);
  151.   }
  152.   /**
  153.    * Submit a job with more M/R tasks than total queue capacity and then submit
  154.    * another job. First job utilizes all the slots. When the second job is
  155.    * submitted, the tasks of the second job wait for slots to be available. As
  156.    * the tasks of the first jobs finish and there are no more tasks pending, the
  157.    * tasks of the second job start running on the freed up slots.
  158.    * 
  159.    * @throws Exception
  160.    */
  161.   private void singleQMultipleJobs1()
  162.       throws Exception {
  163.     ControlledMapReduceJobRunner jobRunner1 =
  164.         ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
  165.             getJobConf(), 16, 0);
  166.     ControlledMapReduceJobRunner jobRunner2 =
  167.         ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
  168.             getJobConf(), 12, 0);
  169.     jobRunner1.start();
  170.     ControlledMapReduceJob controlledJob1 = jobRunner1.getJob();
  171.     JobID jobID1 = jobRunner1.getJobID();
  172.     JobInProgress jip1 = getJobTracker().getJob(jobID1);
  173.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 12);
  174.     // Confirm that the rest of the tasks are not running and waiting for slots
  175.     // to be freed.
  176.     waitTillAllSlotsAreOccupied(true);
  177.     // Now start the second job.
  178.     jobRunner2.start();
  179.     JobID jobID2 = jobRunner2.getJobID();
  180.     ControlledMapReduceJob controlledJob2 = jobRunner2.getJob();
  181.     JobInProgress jip2 = getJobTracker().getJob(jobID2);
  182.     LOG.info("Trying to finish 2 map");
  183.     controlledJob1.finishNTasks(true, 2);
  184.     ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 2);
  185.     assertTrue("Number of maps finished", jip1.finishedMaps() == 2);
  186.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 12);
  187.     waitTillAllSlotsAreOccupied(true);
  188.     LOG.info("Trying to finish 2 more maps");
  189.     controlledJob1.finishNTasks(true, 2);
  190.     ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 4);
  191.     assertTrue("Number of maps finished", jip1.finishedMaps() == 4);
  192.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 12);
  193.     waitTillAllSlotsAreOccupied(true);
  194.     // All tasks of Job1 started running/finished. Now job2 should start
  195.     LOG.info("Trying to finish 2 more maps");
  196.     controlledJob1.finishNTasks(true, 2);
  197.     ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 6);
  198.     assertTrue("Number of maps finished", jip1.finishedMaps() == 6);
  199.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 10);
  200.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 2);
  201.     waitTillAllSlotsAreOccupied(true);
  202.     ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 10);
  203.     ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 2);
  204.     LOG.info("Trying to finish 10 more maps and hence job1");
  205.     controlledJob1.finishNTasks(true, 10);
  206.     ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 16);
  207.     assertTrue("Number of maps finished", jip1.finishedMaps() == 16);
  208.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 12);
  209.     controlledJob1.finishJob();
  210.     waitTillAllSlotsAreOccupied(true);
  211.     ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 0);
  212.     ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 12);
  213.     // Finish job2 also
  214.     controlledJob2.finishJob();
  215.     ControlledMapReduceJob.waitTillNTotalTasksFinish(jip2, true, 12);
  216.     ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 0);
  217.     jobRunner1.join();
  218.     jobRunner2.join();
  219.   }
  220.   /**
  221.    * Submit a job with less M/R tasks than total capacity and another job with
  222.    * more M/R tasks than the remaining capacity. First job should utilize the
  223.    * required slots and other job should utilize the available slots and its
  224.    * remaining tasks wait for slots to become free.
  225.    * 
  226.    * @throws Exception
  227.    */
  228.   private void singleQMultipleJobs2()
  229.       throws Exception {
  230.     ControlledMapReduceJobRunner jobRunner1 =
  231.         ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
  232.             getJobConf(), 8, 0);
  233.     ControlledMapReduceJobRunner jobRunner2 =
  234.         ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
  235.             getJobConf(), 12, 0);
  236.     jobRunner1.start();
  237.     ControlledMapReduceJob controlledJob1 = jobRunner1.getJob();
  238.     JobID jobID1 = jobRunner1.getJobID();
  239.     JobInProgress jip1 = getJobTracker().getJob(jobID1);
  240.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 8);
  241.     ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 8);
  242.     // Now start the second job.
  243.     jobRunner2.start();
  244.     JobID jobID2 = jobRunner2.getJobID();
  245.     ControlledMapReduceJob controlledJob2 = jobRunner2.getJob();
  246.     JobInProgress jip2 = getJobTracker().getJob(jobID2);
  247.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 4);
  248.     waitTillAllSlotsAreOccupied(true);
  249.     ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 8);
  250.     // The rest of the tasks of job2 should wait.
  251.     ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 4);
  252.     LOG.info("Trying to finish 2 maps of job1");
  253.     controlledJob1.finishNTasks(true, 2);
  254.     ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 2);
  255.     assertTrue("Number of maps finished", jip1.finishedMaps() == 2);
  256.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip1, true, 6);
  257.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 6);
  258.     waitTillAllSlotsAreOccupied(true);
  259.     ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 6);
  260.     ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 6);
  261.     LOG.info("Trying to finish 6 more maps of job1");
  262.     controlledJob1.finishNTasks(true, 6);
  263.     ControlledMapReduceJob.waitTillNTotalTasksFinish(jip1, true, 8);
  264.     assertTrue("Number of maps finished", jip1.finishedMaps() == 8);
  265.     ControlledMapReduceJob.waitTillNTasksStartRunning(jip2, true, 12);
  266.     waitTillAllSlotsAreOccupied(true);
  267.     ControlledMapReduceJob.assertNumTasksRunning(jip1, true, 0);
  268.     ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 12);
  269.     // Finish job2 also
  270.     controlledJob2.finishJob();
  271.     ControlledMapReduceJob.waitTillNTotalTasksFinish(jip2, true, 12);
  272.     ControlledMapReduceJob.assertNumTasksRunning(jip2, true, 0);
  273.     jobRunner1.join();
  274.     jobRunner2.join();
  275.   }
  276.   /**
  277.    * Test to verify running of tasks in a queue going over its capacity. In
  278.    * queue default, user U1 starts a job J1, having more M/R tasks than the
  279.    * total slots. M/R tasks of job J1 should start running on all the nodes (100
  280.    * % utilization).
  281.    * 
  282.    * @throws Exception
  283.    */
  284.   private void multipleQsWithOneQBeyondCapacity(String[] queues)
  285.       throws Exception {
  286.     JobConf conf = getJobConf();
  287.     conf.setQueueName(queues[0]);
  288.     conf.setUser("U1");
  289.     ControlledMapReduceJobRunner jobRunner =
  290.         ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(conf, 15,
  291.             0);
  292.     jobRunner.start();
  293.     ControlledMapReduceJob controlledJob = jobRunner.getJob();
  294.     JobID myJobID = jobRunner.getJobID();
  295.     JobInProgress myJob = getJobTracker().getJob(myJobID);
  296.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 10);
  297.     // Confirm that the rest of the tasks are not running and waiting for slots
  298.     // to be freed.
  299.     waitTillAllSlotsAreOccupied(true);
  300.     ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 10);
  301.     LOG.info("Trying to finish 3 maps");
  302.     controlledJob.finishNTasks(true, 3);
  303.     ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 3);
  304.     assertTrue("Number of maps finished", myJob.finishedMaps() == 3);
  305.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 10);
  306.     waitTillAllSlotsAreOccupied(true);
  307.     ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 10);
  308.     LOG.info("Trying to finish 2 more maps");
  309.     controlledJob.finishNTasks(true, 2);
  310.     ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 5);
  311.     assertTrue("Number of maps finished", myJob.finishedMaps() == 5);
  312.     ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 10);
  313.     waitTillAllSlotsAreOccupied(true);
  314.     ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 10);
  315.     // Finish job
  316.     controlledJob.finishJob();
  317.     ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 15);
  318.     ControlledMapReduceJob.assertNumTasksRunning(myJob, true, 0);
  319.     jobRunner.join();
  320.   }
  321.   /**
  322.    * Test to verify queue capacities across multiple queues. In this test, jobs
  323.    * are submitted to different queues - all below the queue's capacity and
  324.    * verifies that all the jobs are running. This will test code paths related
  325.    * to job initialization, considering multiple queues for scheduling jobs etc.
  326.    * 
  327.    * <p>
  328.    * 
  329.    * One user per queue. Four jobs are submitted to the four queues such that
  330.    * they exactly fill up the queues. No queue should be beyond capacity. All
  331.    * jobs should be running.
  332.    * 
  333.    * @throws Exception
  334.    */
  335.   private void multipleQueuesWithinCapacities(String[] queues)
  336.       throws Exception {
  337.     String[] users = new String[] { "U1", "U2", "U3", "U4" };
  338.     ControlledMapReduceJobRunner[] jobRunners =
  339.         new ControlledMapReduceJobRunner[4];
  340.     ControlledMapReduceJob[] controlledJobs = new ControlledMapReduceJob[4];
  341.     JobInProgress[] jips = new JobInProgress[4];
  342.     // Initialize all the jobs
  343.     // Start all the jobs in parallel
  344.     JobConf conf = getJobConf();
  345.     int numTasks = 1;
  346.     for (int i = 0; i < 4; i++) {
  347.       conf.setQueueName(queues[i]);
  348.       conf.setUser(users[i]);
  349.       jobRunners[i] =
  350.           ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
  351.               getJobConf(), numTasks, numTasks);
  352.       jobRunners[i].start();
  353.       controlledJobs[i] = jobRunners[i].getJob();
  354.       JobID jobID = jobRunners[i].getJobID();
  355.       jips[i] = getJobTracker().getJob(jobID);
  356.       // Wait till all the jobs start running all of their tasks
  357.       ControlledMapReduceJob.waitTillNTasksStartRunning(jips[i], true,
  358.           numTasks);
  359.       ControlledMapReduceJob.waitTillNTasksStartRunning(jips[i], false,
  360.           numTasks);
  361.       numTasks += 1;
  362.     }
  363.     // Ensure steady state behavior
  364.     waitTillAllSlotsAreOccupied(true);
  365.     waitTillAllSlotsAreOccupied(false);
  366.     numTasks = 1;
  367.     for (int i = 0; i < 4; i++) {
  368.       ControlledMapReduceJob.assertNumTasksRunning(jips[i], true, numTasks);
  369.       ControlledMapReduceJob.assertNumTasksRunning(jips[i], false, numTasks);
  370.       numTasks += 1;
  371.     }
  372.     // Finish the jobs and join them
  373.     numTasks = 1;
  374.     for (int i = 0; i < 4; i++) {
  375.       controlledJobs[i].finishJob();
  376.       ControlledMapReduceJob
  377.           .waitTillNTotalTasksFinish(jips[i], true, numTasks);
  378.       ControlledMapReduceJob.assertNumTasksRunning(jips[i], true, 0);
  379.       ControlledMapReduceJob.waitTillNTotalTasksFinish(jips[i], false,
  380.           numTasks);
  381.       ControlledMapReduceJob.assertNumTasksRunning(jips[i], false, 0);
  382.       jobRunners[i].join();
  383.       numTasks += 1;
  384.     }
  385.   }
  386. }