TestJobQueueTaskScheduler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.HashMap;
  23. import java.util.List;
  24. import java.util.Map;
  25. import junit.framework.TestCase;
  26. import org.apache.hadoop.io.BytesWritable;
  27. public class TestJobQueueTaskScheduler extends TestCase {
  28.   
  29.   private static int jobCounter;
  30.   private static int taskCounter;
  31.   
  32.   static void resetCounters() {
  33.     jobCounter = 0;
  34.     taskCounter = 0;
  35.   }
  36.   
  37.   static class FakeJobInProgress extends JobInProgress {
  38.     
  39.     private FakeTaskTrackerManager taskTrackerManager;
  40.     
  41.     public FakeJobInProgress(JobConf jobConf,
  42.         FakeTaskTrackerManager taskTrackerManager) throws IOException {
  43.       super(new JobID("test", ++jobCounter), jobConf);
  44.       this.taskTrackerManager = taskTrackerManager;
  45.       this.startTime = System.currentTimeMillis();
  46.       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
  47.       this.status.setJobPriority(JobPriority.NORMAL);
  48.       this.status.setStartTime(startTime);
  49.     }
  50.     @Override
  51.     public synchronized void initTasks() throws IOException {
  52.       // do nothing
  53.     }
  54.     @Override
  55.     public Task obtainNewLocalMapTask(TaskTrackerStatus tts, int clusterSize, 
  56.                                       int ignored) 
  57.     throws IOException {
  58.       return obtainNewMapTask(tts, clusterSize, ignored);
  59.     }
  60.     
  61.     @Override
  62.     public Task obtainNewNonLocalMapTask(TaskTrackerStatus tts, int clusterSize, 
  63.                                          int ignored) 
  64.     throws IOException {
  65.       return obtainNewMapTask(tts, clusterSize, ignored);
  66.     }
  67.     
  68.     @Override
  69.     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
  70.         int ignored) throws IOException {
  71.       TaskAttemptID attemptId = getTaskAttemptID(true);
  72.       Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
  73.         @Override
  74.         public String toString() {
  75.           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
  76.         }
  77.       };
  78.       taskTrackerManager.update(tts.getTrackerName(), task);
  79.       runningMapTasks++;
  80.       return task;
  81.     }
  82.     
  83.     @Override
  84.     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
  85.         int clusterSize, int ignored) throws IOException {
  86.       TaskAttemptID attemptId = getTaskAttemptID(false);
  87.       Task task = new ReduceTask("", attemptId, 0, 10) {
  88.         @Override
  89.         public String toString() {
  90.           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
  91.         }
  92.       };
  93.       taskTrackerManager.update(tts.getTrackerName(), task);
  94.       runningReduceTasks++;
  95.       return task;
  96.     }
  97.     
  98.     private TaskAttemptID getTaskAttemptID(boolean isMap) {
  99.       JobID jobId = getJobID();
  100.       return new TaskAttemptID(jobId.getJtIdentifier(),
  101.           jobId.getId(), isMap, ++taskCounter, 0);
  102.     }
  103.   }
  104.   
  105.   static class FakeTaskTrackerManager implements TaskTrackerManager {
  106.     
  107.     int maps = 0;
  108.     int reduces = 0;
  109.     int maxMapTasksPerTracker = 2;
  110.     int maxReduceTasksPerTracker = 2;
  111.     List<JobInProgressListener> listeners =
  112.       new ArrayList<JobInProgressListener>();
  113.     QueueManager queueManager;
  114.     
  115.     private Map<String, TaskTrackerStatus> trackers =
  116.       new HashMap<String, TaskTrackerStatus>();
  117.     public FakeTaskTrackerManager() {
  118.       JobConf conf = new JobConf();
  119.       queueManager = new QueueManager(conf);
  120.       trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
  121.                    new ArrayList<TaskStatus>(), 0,
  122.                    maxMapTasksPerTracker, maxReduceTasksPerTracker));
  123.       trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
  124.                    new ArrayList<TaskStatus>(), 0,
  125.                    maxMapTasksPerTracker, maxReduceTasksPerTracker));
  126.     }
  127.     
  128.     @Override
  129.     public ClusterStatus getClusterStatus() {
  130.       int numTrackers = trackers.size();
  131.       return new ClusterStatus(numTrackers, 0, 
  132.                                JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
  133.                                maps, reduces,
  134.                                numTrackers * maxMapTasksPerTracker,
  135.                                numTrackers * maxReduceTasksPerTracker,
  136.                                JobTracker.State.RUNNING);
  137.     }
  138.     @Override
  139.     public int getNumberOfUniqueHosts() {
  140.       return 0;
  141.     }
  142.     @Override
  143.     public Collection<TaskTrackerStatus> taskTrackers() {
  144.       return trackers.values();
  145.     }
  146.     @Override
  147.     public void addJobInProgressListener(JobInProgressListener listener) {
  148.       listeners.add(listener);
  149.     }
  150.     @Override
  151.     public void removeJobInProgressListener(JobInProgressListener listener) {
  152.       listeners.remove(listener);
  153.     }
  154.     
  155.     @Override
  156.     public QueueManager getQueueManager() {
  157.       return queueManager;
  158.     }
  159.     
  160.     @Override
  161.     public int getNextHeartbeatInterval() {
  162.       return MRConstants.HEARTBEAT_INTERVAL_MIN;
  163.     }
  164.     @Override
  165.     public void killJob(JobID jobid) {
  166.       return;
  167.     }
  168.     @Override
  169.     public JobInProgress getJob(JobID jobid) {
  170.       return null;
  171.     }
  172.     // Test methods
  173.     
  174.     public void submitJob(JobInProgress job) throws IOException {
  175.       for (JobInProgressListener listener : listeners) {
  176.         listener.jobAdded(job);
  177.       }
  178.     }
  179.     
  180.     public TaskTrackerStatus getTaskTracker(String trackerID) {
  181.       return trackers.get(trackerID);
  182.     }
  183.     
  184.     public void update(String taskTrackerName, final Task t) {
  185.       if (t.isMapTask()) {
  186.         maps++;
  187.       } else {
  188.         reduces++;
  189.       }
  190.       TaskStatus status = new TaskStatus() {
  191.         @Override
  192.         public boolean getIsMap() {
  193.           return t.isMapTask();
  194.         }
  195.       };
  196.       status.setRunState(TaskStatus.State.RUNNING);
  197.       trackers.get(taskTrackerName).getTaskReports().add(status);
  198.     }
  199.     
  200.   }
  201.   
  202.   protected JobConf jobConf;
  203.   protected TaskScheduler scheduler;
  204.   private FakeTaskTrackerManager taskTrackerManager;
  205.   @Override
  206.   protected void setUp() throws Exception {
  207.     resetCounters();
  208.     jobConf = new JobConf();
  209.     jobConf.setNumMapTasks(10);
  210.     jobConf.setNumReduceTasks(10);
  211.     taskTrackerManager = new FakeTaskTrackerManager();
  212.     scheduler = createTaskScheduler();
  213.     scheduler.setConf(jobConf);
  214.     scheduler.setTaskTrackerManager(taskTrackerManager);
  215.     scheduler.start();
  216.   }
  217.   
  218.   @Override
  219.   protected void tearDown() throws Exception {
  220.     if (scheduler != null) {
  221.       scheduler.terminate();
  222.     }
  223.   }
  224.   
  225.   protected TaskScheduler createTaskScheduler() {
  226.     return new JobQueueTaskScheduler();
  227.   }
  228.   
  229.   static void submitJobs(FakeTaskTrackerManager taskTrackerManager, JobConf jobConf, 
  230.                          int numJobs, int state)
  231.     throws IOException {
  232.     for (int i = 0; i < numJobs; i++) {
  233.       JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
  234.       job.getStatus().setRunState(state);
  235.       taskTrackerManager.submitJob(job);
  236.     }
  237.   }
  238.   public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
  239.     assertEquals(0, scheduler.assignTasks(tracker(taskTrackerManager, "tt1")).size());
  240.   }
  241.   public void testNonRunningJobsAreIgnored() throws IOException {
  242.     submitJobs(taskTrackerManager, jobConf, 1, JobStatus.PREP);
  243.     submitJobs(taskTrackerManager, jobConf, 1, JobStatus.SUCCEEDED);
  244.     submitJobs(taskTrackerManager, jobConf, 1, JobStatus.FAILED);
  245.     submitJobs(taskTrackerManager, jobConf, 1, JobStatus.KILLED);
  246.     assertEquals(0, scheduler.assignTasks(tracker(taskTrackerManager, "tt1")).size());
  247.   }
  248.   
  249.   public void testDefaultTaskAssignment() throws IOException {
  250.     submitJobs(taskTrackerManager, jobConf, 2, JobStatus.RUNNING);
  251.     // All slots are filled with job 1
  252.     checkAssignment(scheduler, tracker(taskTrackerManager, "tt1"), 
  253.                     new String[] {"attempt_test_0001_m_000001_0 on tt1", 
  254.                                   "attempt_test_0001_m_000002_0 on tt1", 
  255.                                   "attempt_test_0001_r_000003_0 on tt1"});
  256.     checkAssignment(scheduler, tracker(taskTrackerManager, "tt1"), 
  257.                     new String[] {"attempt_test_0001_r_000004_0 on tt1"});
  258.     checkAssignment(scheduler, tracker(taskTrackerManager, "tt1"), new String[] {});
  259.     checkAssignment(scheduler, tracker(taskTrackerManager, "tt2"), 
  260.                     new String[] {"attempt_test_0001_m_000005_0 on tt2", 
  261.                                          "attempt_test_0001_m_000006_0 on tt2", 
  262.                                          "attempt_test_0001_r_000007_0 on tt2"});
  263.     checkAssignment(scheduler, tracker(taskTrackerManager, "tt2"), 
  264.                     new String[] {"attempt_test_0001_r_000008_0 on tt2"});
  265.     checkAssignment(scheduler, tracker(taskTrackerManager, "tt2"), new String[] {});
  266.     checkAssignment(scheduler, tracker(taskTrackerManager, "tt1"), new String[] {});
  267.     checkAssignment(scheduler, tracker(taskTrackerManager, "tt2"), new String[] {});
  268.   }
  269.   static TaskTrackerStatus tracker(FakeTaskTrackerManager taskTrackerManager,
  270.                                       String taskTrackerName) {
  271.     return taskTrackerManager.getTaskTracker(taskTrackerName);
  272.   }
  273.   
  274.   static void checkAssignment(TaskScheduler scheduler, TaskTrackerStatus tts,
  275.       String[] expectedTaskStrings) throws IOException {
  276.     List<Task> tasks = scheduler.assignTasks(tts);
  277.     assertNotNull(tasks);
  278.     assertEquals(expectedTaskStrings.length, tasks.size());
  279.     for (int i=0; i < expectedTaskStrings.length; ++i) {
  280.       assertEquals(expectedTaskStrings[i], tasks.get(i).toString());
  281.     }
  282.   }
  283.   
  284. }