TestLimitTasksPerJobTaskScheduler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import junit.framework.TestCase;
  21. import org.apache.hadoop.mapred.TestJobQueueTaskScheduler.FakeTaskTrackerManager;
  22. public class TestLimitTasksPerJobTaskScheduler extends TestCase {
  23.   protected JobConf jobConf;
  24.   protected TaskScheduler scheduler;
  25.   private FakeTaskTrackerManager taskTrackerManager;
  26.   @Override
  27.   protected void setUp() throws Exception {
  28.     TestJobQueueTaskScheduler.resetCounters();
  29.     jobConf = new JobConf();
  30.     jobConf.setNumMapTasks(10);
  31.     jobConf.setNumReduceTasks(10);
  32.     taskTrackerManager = new FakeTaskTrackerManager();
  33.     scheduler = createTaskScheduler();
  34.     scheduler.setConf(jobConf);
  35.     scheduler.setTaskTrackerManager(taskTrackerManager);
  36.     scheduler.start();
  37.   }
  38.   
  39.   @Override
  40.   protected void tearDown() throws Exception {
  41.     if (scheduler != null) {
  42.       scheduler.terminate();
  43.     }
  44.   }
  45.   protected TaskScheduler createTaskScheduler() {
  46.     return new LimitTasksPerJobTaskScheduler();
  47.   }
  48.   public void testMaxRunningTasksPerJob() throws IOException {
  49.     jobConf.setLong(LimitTasksPerJobTaskScheduler.MAX_TASKS_PER_JOB_PROPERTY,
  50.         4L);
  51.     scheduler.setConf(jobConf);
  52.     TestJobQueueTaskScheduler.submitJobs(taskTrackerManager, jobConf, 
  53.                                          2, JobStatus.RUNNING);
  54.     
  55.     // First 4 slots are filled with job 1, second 4 with job 2
  56.     TestJobQueueTaskScheduler.checkAssignment(
  57.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt1"), 
  58.         new String[] {"attempt_test_0001_m_000001_0 on tt1"});
  59.     TestJobQueueTaskScheduler.checkAssignment(
  60.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt1"), 
  61.         new String[] {"attempt_test_0001_m_000002_0 on tt1"});
  62.     TestJobQueueTaskScheduler.checkAssignment(
  63.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt1"), 
  64.         new String[] {"attempt_test_0001_r_000003_0 on tt1"});
  65.     TestJobQueueTaskScheduler.checkAssignment(
  66.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt1"), 
  67.         new String[] {"attempt_test_0001_r_000004_0 on tt1"});
  68.     TestJobQueueTaskScheduler.checkAssignment(
  69.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt2"), 
  70.         new String[] {"attempt_test_0002_m_000005_0 on tt2"});
  71.     TestJobQueueTaskScheduler.checkAssignment(
  72.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt2"), 
  73.         new String[] {"attempt_test_0002_m_000006_0 on tt2"});
  74.     TestJobQueueTaskScheduler.checkAssignment(
  75.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt2"), 
  76.         new String[] {"attempt_test_0002_r_000007_0 on tt2"});
  77.     TestJobQueueTaskScheduler.checkAssignment(
  78.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt2"), 
  79.         new String[] {"attempt_test_0002_r_000008_0 on tt2"});
  80.   }
  81.   
  82.   public void testMaxRunningTasksPerJobWithInterleavedTrackers()
  83.       throws IOException {
  84.     jobConf.setLong(LimitTasksPerJobTaskScheduler.MAX_TASKS_PER_JOB_PROPERTY,
  85.         4L);
  86.     scheduler.setConf(jobConf);
  87.     TestJobQueueTaskScheduler.submitJobs(taskTrackerManager, jobConf, 2, JobStatus.RUNNING);
  88.     
  89.     // First 4 slots are filled with job 1, second 4 with job 2
  90.     // even when tracker requests are interleaved
  91.     TestJobQueueTaskScheduler.checkAssignment(
  92.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt1"), 
  93.         new String[] {"attempt_test_0001_m_000001_0 on tt1"});
  94.     TestJobQueueTaskScheduler.checkAssignment(
  95.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt1"), 
  96.         new String[] {"attempt_test_0001_m_000002_0 on tt1"});
  97.     TestJobQueueTaskScheduler.checkAssignment(
  98.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt2"), 
  99.         new String[] {"attempt_test_0001_m_000003_0 on tt2"});
  100.     TestJobQueueTaskScheduler.checkAssignment(
  101.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt1"), 
  102.         new String[] {"attempt_test_0001_r_000004_0 on tt1"});
  103.     TestJobQueueTaskScheduler.checkAssignment(
  104.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt2"), 
  105.         new String[] {"attempt_test_0002_m_000005_0 on tt2"});
  106.     TestJobQueueTaskScheduler.checkAssignment(
  107.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt1"), 
  108.         new String[] {"attempt_test_0002_r_000006_0 on tt1"});
  109.     TestJobQueueTaskScheduler.checkAssignment(
  110.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt2"), 
  111.         new String[] {"attempt_test_0002_r_000007_0 on tt2"});
  112.     TestJobQueueTaskScheduler.checkAssignment(
  113.         scheduler, TestJobQueueTaskScheduler.tracker(taskTrackerManager, "tt2"), 
  114.         new String[] {"attempt_test_0002_r_000008_0 on tt2"});
  115.   }
  116.   
  117. }