TestFairScheduler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:50k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.FileWriter;
  21. import java.io.IOException;
  22. import java.io.PrintWriter;
  23. import java.util.ArrayList;
  24. import java.util.Collection;
  25. import java.util.HashMap;
  26. import java.util.List;
  27. import java.util.Map;
  28. import junit.framework.TestCase;
  29. import org.apache.hadoop.io.BytesWritable;
  30. import org.apache.hadoop.mapred.JobStatus;
  31. import org.apache.hadoop.mapred.FairScheduler.JobInfo;
  32. public class TestFairScheduler extends TestCase {
  33.   final static String TEST_DIR = new File(System.getProperty("test.build.data",
  34.       "build/contrib/streaming/test/data")).getAbsolutePath();
  35.   final static String ALLOC_FILE = new File(TEST_DIR, 
  36.       "test-pools").getAbsolutePath();
  37.   
  38.   private static final String POOL_PROPERTY = "pool";
  39.   
  40.   private static int jobCounter;
  41.   private static int taskCounter;
  42.   
  43.   static class FakeJobInProgress extends JobInProgress {
  44.     
  45.     private FakeTaskTrackerManager taskTrackerManager;
  46.     
  47.     public FakeJobInProgress(JobConf jobConf,
  48.         FakeTaskTrackerManager taskTrackerManager) throws IOException {
  49.       super(new JobID("test", ++jobCounter), jobConf);
  50.       this.taskTrackerManager = taskTrackerManager;
  51.       this.startTime = System.currentTimeMillis();
  52.       this.status = new JobStatus();
  53.       this.status.setRunState(JobStatus.PREP);
  54.     }
  55.     
  56.     @Override
  57.     public synchronized void initTasks() throws IOException {
  58.       // do nothing
  59.     }
  60.     @Override
  61.     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
  62.         int ignored) throws IOException {
  63.       TaskAttemptID attemptId = getTaskAttemptID(true);
  64.       Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
  65.         @Override
  66.         public String toString() {
  67.           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
  68.         }
  69.       };
  70.       taskTrackerManager.startTask(tts.getTrackerName(), task);
  71.       runningMapTasks++;
  72.       return task;
  73.     }
  74.     
  75.     @Override
  76.     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
  77.         int clusterSize, int ignored) throws IOException {
  78.       TaskAttemptID attemptId = getTaskAttemptID(false);
  79.       Task task = new ReduceTask("", attemptId, 0, 10) {
  80.         @Override
  81.         public String toString() {
  82.           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
  83.         }
  84.       };
  85.       taskTrackerManager.startTask(tts.getTrackerName(), task);
  86.       runningReduceTasks++;
  87.       return task;
  88.     }
  89.     
  90.     private TaskAttemptID getTaskAttemptID(boolean isMap) {
  91.       JobID jobId = getJobID();
  92.       return new TaskAttemptID(jobId.getJtIdentifier(),
  93.           jobId.getId(), isMap, ++taskCounter, 0);
  94.     }
  95.   }
  96.   
  97.   static class FakeTaskTrackerManager implements TaskTrackerManager {
  98.     int maps = 0;
  99.     int reduces = 0;
  100.     int maxMapTasksPerTracker = 2;
  101.     int maxReduceTasksPerTracker = 2;
  102.     List<JobInProgressListener> listeners =
  103.       new ArrayList<JobInProgressListener>();
  104.     
  105.     private Map<String, TaskTrackerStatus> trackers =
  106.       new HashMap<String, TaskTrackerStatus>();
  107.     private Map<String, TaskStatus> taskStatuses = 
  108.       new HashMap<String, TaskStatus>();
  109.     public FakeTaskTrackerManager() {
  110.       trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
  111.           new ArrayList<TaskStatus>(), 0,
  112.           maxMapTasksPerTracker, maxReduceTasksPerTracker));
  113.       trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
  114.           new ArrayList<TaskStatus>(), 0,
  115.           maxMapTasksPerTracker, maxReduceTasksPerTracker));
  116.     }
  117.     
  118.     @Override
  119.     public ClusterStatus getClusterStatus() {
  120.       int numTrackers = trackers.size();
  121.       return new ClusterStatus(numTrackers, maps, reduces,
  122.           numTrackers * maxMapTasksPerTracker,
  123.           numTrackers * maxReduceTasksPerTracker,
  124.           JobTracker.State.RUNNING);
  125.     }
  126.     @Override
  127.     public QueueManager getQueueManager() {
  128.       return null;
  129.     }
  130.     
  131.     @Override
  132.     public int getNumberOfUniqueHosts() {
  133.       return 0;
  134.     }
  135.     @Override
  136.     public Collection<TaskTrackerStatus> taskTrackers() {
  137.       return trackers.values();
  138.     }
  139.     @Override
  140.     public void addJobInProgressListener(JobInProgressListener listener) {
  141.       listeners.add(listener);
  142.     }
  143.     @Override
  144.     public void removeJobInProgressListener(JobInProgressListener listener) {
  145.       listeners.remove(listener);
  146.     }
  147.     
  148.     @Override
  149.     public int getNextHeartbeatInterval() {
  150.       return MRConstants.HEARTBEAT_INTERVAL_MIN;
  151.     }
  152.     @Override
  153.     public void killJob(JobID jobid) {
  154.       return;
  155.     }
  156.     @Override
  157.     public JobInProgress getJob(JobID jobid) {
  158.       return null;
  159.     }
  160.     // Test methods
  161.     
  162.     public void submitJob(JobInProgress job) throws IOException {
  163.       for (JobInProgressListener listener : listeners) {
  164.         listener.jobAdded(job);
  165.       }
  166.     }
  167.     
  168.     public TaskTrackerStatus getTaskTracker(String trackerID) {
  169.       return trackers.get(trackerID);
  170.     }
  171.     
  172.     public void startTask(String taskTrackerName, final Task t) {
  173.       if (t.isMapTask()) {
  174.         maps++;
  175.       } else {
  176.         reduces++;
  177.       }
  178.       TaskStatus status = new TaskStatus() {
  179.         @Override
  180.         public boolean getIsMap() {
  181.           return t.isMapTask();
  182.         }
  183.       };
  184.       taskStatuses.put(t.getTaskID().toString(), status);
  185.       status.setRunState(TaskStatus.State.RUNNING);
  186.       trackers.get(taskTrackerName).getTaskReports().add(status);
  187.     }
  188.     
  189.     public void finishTask(String taskTrackerName, String tipId) {
  190.       TaskStatus status = taskStatuses.get(tipId);
  191.       if (status.getIsMap()) {
  192.         maps--;
  193.       } else {
  194.         reduces--;
  195.       }
  196.       status.setRunState(TaskStatus.State.SUCCEEDED);
  197.     }
  198.   }
  199.   
  200.   protected class FakeClock extends FairScheduler.Clock {
  201.     private long time = 0;
  202.     
  203.     public void advance(long millis) {
  204.       time += millis;
  205.     }
  206.     @Override
  207.     long getTime() {
  208.       return time;
  209.     }
  210.   }
  211.   
  212.   protected JobConf conf;
  213.   protected FairScheduler scheduler;
  214.   private FakeTaskTrackerManager taskTrackerManager;
  215.   private FakeClock clock;
  216.   @Override
  217.   protected void setUp() throws Exception {
  218.     jobCounter = 0;
  219.     taskCounter = 0;
  220.     new File(TEST_DIR).mkdirs(); // Make sure data directory exists
  221.     // Create an empty pools file (so we can add/remove pools later)
  222.     FileWriter fileWriter = new FileWriter(ALLOC_FILE);
  223.     fileWriter.write("<?xml version="1.0"?>n");
  224.     fileWriter.write("<allocations />n");
  225.     fileWriter.close();
  226.     conf = new JobConf();
  227.     conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
  228.     conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
  229.     taskTrackerManager = new FakeTaskTrackerManager();
  230.     clock = new FakeClock();
  231.     scheduler = new FairScheduler(clock, false);
  232.     scheduler.waitForMapsBeforeLaunchingReduces = false;
  233.     scheduler.setConf(conf);
  234.     scheduler.setTaskTrackerManager(taskTrackerManager);
  235.     scheduler.start();
  236.   }
  237.   
  238.   @Override
  239.   protected void tearDown() throws Exception {
  240.     if (scheduler != null) {
  241.       scheduler.terminate();
  242.     }
  243.   }
  244.   
  245.   private JobInProgress submitJob(int state, int maps, int reduces)
  246.       throws IOException {
  247.     return submitJob(state, maps, reduces, null);
  248.   }
  249.   
  250.   private JobInProgress submitJob(int state, int maps, int reduces, String pool)
  251.       throws IOException {
  252.     JobConf jobConf = new JobConf(conf);
  253.     jobConf.setNumMapTasks(maps);
  254.     jobConf.setNumReduceTasks(reduces);
  255.     if (pool != null)
  256.       jobConf.set(POOL_PROPERTY, pool);
  257.     JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
  258.     job.getStatus().setRunState(state);
  259.     taskTrackerManager.submitJob(job);
  260.     job.startTime = clock.time;
  261.     return job;
  262.   }
  263.   
  264.   protected void submitJobs(int number, int state, int maps, int reduces)
  265.     throws IOException {
  266.     for (int i = 0; i < number; i++) {
  267.       submitJob(state, maps, reduces);
  268.     }
  269.   }
  270.   public void testAllocationFileParsing() throws Exception {
  271.     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  272.     out.println("<?xml version="1.0"?>");
  273.     out.println("<allocations>"); 
  274.     // Give pool A a minimum of 1 map, 2 reduces
  275.     out.println("<pool name="poolA">");
  276.     out.println("<minMaps>1</minMaps>");
  277.     out.println("<minReduces>2</minReduces>");
  278.     out.println("</pool>");
  279.     // Give pool B a minimum of 2 maps, 1 reduce
  280.     out.println("<pool name="poolB">");
  281.     out.println("<minMaps>2</minMaps>");
  282.     out.println("<minReduces>1</minReduces>");
  283.     out.println("</pool>");
  284.     // Give pool C min maps but no min reduces
  285.     out.println("<pool name="poolC">");
  286.     out.println("<minMaps>2</minMaps>");
  287.     out.println("</pool>");
  288.     // Give pool D a limit of 3 running jobs
  289.     out.println("<pool name="poolD">");
  290.     out.println("<maxRunningJobs>3</maxRunningJobs>");
  291.     out.println("</pool>");
  292.     // Set default limit of jobs per user to 5
  293.     out.println("<userMaxJobsDefault>5</userMaxJobsDefault>");
  294.     // Give user1 a limit of 10 jobs
  295.     out.println("<user name="user1">");
  296.     out.println("<maxRunningJobs>10</maxRunningJobs>");
  297.     out.println("</user>");
  298.     out.println("</allocations>"); 
  299.     out.close();
  300.     
  301.     PoolManager poolManager = scheduler.getPoolManager();
  302.     poolManager.reloadAllocs();
  303.     
  304.     assertEquals(5, poolManager.getPools().size()); // 4 in file + default pool
  305.     assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
  306.         TaskType.MAP));
  307.     assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
  308.         TaskType.REDUCE));
  309.     assertEquals(1, poolManager.getAllocation("poolA", TaskType.MAP));
  310.     assertEquals(2, poolManager.getAllocation("poolA", TaskType.REDUCE));
  311.     assertEquals(2, poolManager.getAllocation("poolB", TaskType.MAP));
  312.     assertEquals(1, poolManager.getAllocation("poolB", TaskType.REDUCE));
  313.     assertEquals(2, poolManager.getAllocation("poolC", TaskType.MAP));
  314.     assertEquals(0, poolManager.getAllocation("poolC", TaskType.REDUCE));
  315.     assertEquals(0, poolManager.getAllocation("poolD", TaskType.MAP));
  316.     assertEquals(0, poolManager.getAllocation("poolD", TaskType.REDUCE));
  317.     assertEquals(Integer.MAX_VALUE, poolManager.getPoolMaxJobs("poolA"));
  318.     assertEquals(3, poolManager.getPoolMaxJobs("poolD"));
  319.     assertEquals(10, poolManager.getUserMaxJobs("user1"));
  320.     assertEquals(5, poolManager.getUserMaxJobs("user2"));
  321.   }
  322.   
  323.   public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
  324.     assertNull(scheduler.assignTasks(tracker("tt1")));
  325.   }
  326.   public void testNonRunningJobsAreIgnored() throws IOException {
  327.     submitJobs(1, JobStatus.PREP, 10, 10);
  328.     submitJobs(1, JobStatus.SUCCEEDED, 10, 10);
  329.     submitJobs(1, JobStatus.FAILED, 10, 10);
  330.     submitJobs(1, JobStatus.KILLED, 10, 10);
  331.     assertNull(scheduler.assignTasks(tracker("tt1")));
  332.     advanceTime(100); // Check that we still don't assign jobs after an update
  333.     assertNull(scheduler.assignTasks(tracker("tt1")));
  334.   }
  335.   /**
  336.    * This test contains two jobs with fewer required tasks than there are slots.
  337.    * We check that all tasks are assigned, but job 1 gets them first because it
  338.    * was submitted earlier.
  339.    */
  340.   public void testSmallJobs() throws IOException {
  341.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
  342.     JobInfo info1 = scheduler.infos.get(job1);
  343.     
  344.     // Check scheduler variables
  345.     assertEquals(0,    info1.runningMaps);
  346.     assertEquals(0,    info1.runningReduces);
  347.     assertEquals(2,    info1.neededMaps);
  348.     assertEquals(1,    info1.neededReduces);
  349.     assertEquals(0,    info1.mapDeficit);
  350.     assertEquals(0,    info1.reduceDeficit);
  351.     assertEquals(4.0,  info1.mapFairShare);
  352.     assertEquals(4.0,  info1.reduceFairShare);
  353.     
  354.     // Advance time before submitting another job j2, to make j1 run before j2
  355.     // deterministically.
  356.     advanceTime(100);
  357.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
  358.     JobInfo info2 = scheduler.infos.get(job2);
  359.     
  360.     // Check scheduler variables; the fair shares should now have been allocated
  361.     // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
  362.     assertEquals(0,    info1.runningMaps);
  363.     assertEquals(0,    info1.runningReduces);
  364.     assertEquals(2,    info1.neededMaps);
  365.     assertEquals(1,    info1.neededReduces);
  366.     assertEquals(400,  info1.mapDeficit);
  367.     assertEquals(400,  info1.reduceDeficit);
  368.     assertEquals(2.0,  info1.mapFairShare);
  369.     assertEquals(2.0,  info1.reduceFairShare);
  370.     assertEquals(0,    info2.runningMaps);
  371.     assertEquals(0,    info2.runningReduces);
  372.     assertEquals(1,    info2.neededMaps);
  373.     assertEquals(2,    info2.neededReduces);
  374.     assertEquals(0,    info2.mapDeficit);
  375.     assertEquals(0,    info2.reduceDeficit);
  376.     assertEquals(2.0,  info2.mapFairShare);
  377.     assertEquals(2.0,  info2.reduceFairShare);
  378.     
  379.     // Assign tasks and check that all slots are filled with j1, then j2
  380.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  381.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  382.     checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
  383.     checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
  384.     checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
  385.     checkAssignment("tt2", "attempt_test_0002_r_000006_0 on tt2");
  386.     assertNull(scheduler.assignTasks(tracker("tt2")));
  387.     
  388.     // Check that the scheduler has started counting the tasks as running
  389.     // as soon as it launched them.
  390.     assertEquals(2,  info1.runningMaps);
  391.     assertEquals(1,  info1.runningReduces);
  392.     assertEquals(0,  info1.neededMaps);
  393.     assertEquals(0,  info1.neededReduces);
  394.     assertEquals(1,  info2.runningMaps);
  395.     assertEquals(2,  info2.runningReduces);
  396.     assertEquals(0, info2.neededMaps);
  397.     assertEquals(0, info2.neededReduces);
  398.   }
  399.   
  400.   /**
  401.    * This test begins by submitting two jobs with 10 maps and reduces each.
  402.    * The first job is submitted 100ms after the second, during which time no
  403.    * tasks run. After this, we assign tasks to all slots, which should all be
  404.    * from job 1. These run for 200ms, at which point job 2 now has a deficit
  405.    * of 400 while job 1 is down to a deficit of 0. We then finish all tasks and
  406.    * assign new ones, which should all be from job 2. These run for 50 ms,
  407.    * which is not enough time for job 2 to make up its deficit (it only makes up
  408.    * 100 ms of deficit). Finally we assign a new round of tasks, which should
  409.    * all be from job 2 again.
  410.    */
  411.   public void testLargeJobs() throws IOException {
  412.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  413.     JobInfo info1 = scheduler.infos.get(job1);
  414.     
  415.     // Check scheduler variables
  416.     assertEquals(0,    info1.runningMaps);
  417.     assertEquals(0,    info1.runningReduces);
  418.     assertEquals(10,   info1.neededMaps);
  419.     assertEquals(10,   info1.neededReduces);
  420.     assertEquals(0,    info1.mapDeficit);
  421.     assertEquals(0,    info1.reduceDeficit);
  422.     assertEquals(4.0,  info1.mapFairShare);
  423.     assertEquals(4.0,  info1.reduceFairShare);
  424.     
  425.     // Advance time before submitting another job j2, to make j1 run before j2
  426.     // deterministically.
  427.     advanceTime(100);
  428.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  429.     JobInfo info2 = scheduler.infos.get(job2);
  430.     
  431.     // Check scheduler variables; the fair shares should now have been allocated
  432.     // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
  433.     assertEquals(0,    info1.runningMaps);
  434.     assertEquals(0,    info1.runningReduces);
  435.     assertEquals(10,   info1.neededMaps);
  436.     assertEquals(10,   info1.neededReduces);
  437.     assertEquals(400,  info1.mapDeficit);
  438.     assertEquals(400,  info1.reduceDeficit);
  439.     assertEquals(2.0,  info1.mapFairShare);
  440.     assertEquals(2.0,  info1.reduceFairShare);
  441.     assertEquals(0,    info2.runningMaps);
  442.     assertEquals(0,    info2.runningReduces);
  443.     assertEquals(10,   info2.neededMaps);
  444.     assertEquals(10,   info2.neededReduces);
  445.     assertEquals(0,    info2.mapDeficit);
  446.     assertEquals(0,    info2.reduceDeficit);
  447.     assertEquals(2.0,  info2.mapFairShare);
  448.     assertEquals(2.0,  info2.reduceFairShare);
  449.     
  450.     // Assign tasks and check that all slots are initially filled with job 1
  451.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  452.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  453.     checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
  454.     checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
  455.     checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
  456.     checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
  457.     checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
  458.     checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
  459.     
  460.     // Check that the scheduler has started counting the tasks as running
  461.     // as soon as it launched them.
  462.     assertEquals(4,  info1.runningMaps);
  463.     assertEquals(4,  info1.runningReduces);
  464.     assertEquals(6,  info1.neededMaps);
  465.     assertEquals(6,  info1.neededReduces);
  466.     assertEquals(0,  info2.runningMaps);
  467.     assertEquals(0,  info2.runningReduces);
  468.     assertEquals(10, info2.neededMaps);
  469.     assertEquals(10, info2.neededReduces);
  470.     
  471.     // Finish up the tasks and advance time again. Note that we must finish
  472.     // the task since FakeJobInProgress does not properly maintain running
  473.     // tasks, so the scheduler will always get an empty task list from
  474.     // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
  475.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0");
  476.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0");
  477.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000003_0");
  478.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000004_0");
  479.     taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000005_0");
  480.     taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000006_0");
  481.     taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000007_0");
  482.     taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000008_0");
  483.     advanceTime(200);
  484.     assertEquals(0,   info1.runningMaps);
  485.     assertEquals(0,   info1.runningReduces);
  486.     assertEquals(0,   info1.mapDeficit);
  487.     assertEquals(0,   info1.reduceDeficit);
  488.     assertEquals(0,   info2.runningMaps);
  489.     assertEquals(0,   info2.runningReduces);
  490.     assertEquals(400, info2.mapDeficit);
  491.     assertEquals(400, info2.reduceDeficit);
  492.     // Assign tasks and check that all slots are now filled with job 2
  493.     checkAssignment("tt1", "attempt_test_0002_m_000009_0 on tt1");
  494.     checkAssignment("tt1", "attempt_test_0002_m_000010_0 on tt1");
  495.     checkAssignment("tt1", "attempt_test_0002_r_000011_0 on tt1");
  496.     checkAssignment("tt1", "attempt_test_0002_r_000012_0 on tt1");
  497.     checkAssignment("tt2", "attempt_test_0002_m_000013_0 on tt2");
  498.     checkAssignment("tt2", "attempt_test_0002_m_000014_0 on tt2");
  499.     checkAssignment("tt2", "attempt_test_0002_r_000015_0 on tt2");
  500.     checkAssignment("tt2", "attempt_test_0002_r_000016_0 on tt2");
  501.     // Finish up the tasks and advance time again, but give job 2 only 50ms.
  502.     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000009_0");
  503.     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000010_0");
  504.     taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000011_0");
  505.     taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000012_0");
  506.     taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000013_0");
  507.     taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000014_0");
  508.     taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000015_0");
  509.     taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000016_0");
  510.     advanceTime(50);
  511.     assertEquals(0,   info1.runningMaps);
  512.     assertEquals(0,   info1.runningReduces);
  513.     assertEquals(100, info1.mapDeficit);
  514.     assertEquals(100, info1.reduceDeficit);
  515.     assertEquals(0,   info2.runningMaps);
  516.     assertEquals(0,   info2.runningReduces);
  517.     assertEquals(300, info2.mapDeficit);
  518.     assertEquals(300, info2.reduceDeficit);
  519.     // Assign tasks and check that all slots are now still with job 2
  520.     checkAssignment("tt1", "attempt_test_0002_m_000017_0 on tt1");
  521.     checkAssignment("tt1", "attempt_test_0002_m_000018_0 on tt1");
  522.     checkAssignment("tt1", "attempt_test_0002_r_000019_0 on tt1");
  523.     checkAssignment("tt1", "attempt_test_0002_r_000020_0 on tt1");
  524.     checkAssignment("tt2", "attempt_test_0002_m_000021_0 on tt2");
  525.     checkAssignment("tt2", "attempt_test_0002_m_000022_0 on tt2");
  526.     checkAssignment("tt2", "attempt_test_0002_r_000023_0 on tt2");
  527.     checkAssignment("tt2", "attempt_test_0002_r_000024_0 on tt2");
  528.   }
  529.   
  530.   /**
  531.    * We submit two jobs such that one has 2x the priority of the other, wait
  532.    * for 100 ms, and check that the weights/deficits are okay and that the
  533.    * tasks all go to the high-priority job.
  534.    */
  535.   public void testJobsWithPriorities() throws IOException {
  536.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  537.     JobInfo info1 = scheduler.infos.get(job1);
  538.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  539.     JobInfo info2 = scheduler.infos.get(job2);
  540.     job2.setPriority(JobPriority.HIGH);
  541.     scheduler.update();
  542.     
  543.     // Check scheduler variables
  544.     assertEquals(0,    info1.runningMaps);
  545.     assertEquals(0,    info1.runningReduces);
  546.     assertEquals(10,   info1.neededMaps);
  547.     assertEquals(10,   info1.neededReduces);
  548.     assertEquals(0,    info1.mapDeficit);
  549.     assertEquals(0,    info1.reduceDeficit);
  550.     assertEquals(1.33, info1.mapFairShare, 0.1);
  551.     assertEquals(1.33, info1.reduceFairShare, 0.1);
  552.     assertEquals(0,    info2.runningMaps);
  553.     assertEquals(0,    info2.runningReduces);
  554.     assertEquals(10,   info2.neededMaps);
  555.     assertEquals(10,   info2.neededReduces);
  556.     assertEquals(0,    info2.mapDeficit);
  557.     assertEquals(0,    info2.reduceDeficit);
  558.     assertEquals(2.66, info2.mapFairShare, 0.1);
  559.     assertEquals(2.66, info2.reduceFairShare, 0.1);
  560.     
  561.     // Advance time and check deficits
  562.     advanceTime(100);
  563.     assertEquals(133,  info1.mapDeficit, 1.0);
  564.     assertEquals(133,  info1.reduceDeficit, 1.0);
  565.     assertEquals(266,  info2.mapDeficit, 1.0);
  566.     assertEquals(266,  info2.reduceDeficit, 1.0);
  567.     
  568.     // Assign tasks and check that all slots are filled with j1, then j2
  569.     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  570.     checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
  571.     checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
  572.     checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
  573.     checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
  574.     checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
  575.     checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
  576.     checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
  577.   }
  578.   
  579.   /**
  580.    * This test starts by submitting three large jobs:
  581.    * - job1 in the default pool, at time 0
  582.    * - job2 in poolA, with an allocation of 1 map / 2 reduces, at time 200
  583.    * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 200
  584.    * 
  585.    * After this, we sleep 100ms, until time 300. At this point, job1 has the
  586.    * highest map deficit, job3 the second, and job2 the third. This is because
  587.    * job3 has more maps in its min share than job2, but job1 has been around
  588.    * a long time at the beginning. The reduce deficits are similar, except job2
  589.    * comes before job3 because it had a higher reduce minimum share.
  590.    * 
  591.    * Finally, assign tasks to all slots. The maps should be assigned in the
  592.    * order job3, job2, job1 because 3 and 2 both have guaranteed slots and 3
  593.    * has a higher deficit. The reduces should be assigned as job2, job3, job1.
  594.    */
  595.   public void testLargeJobsWithPools() throws Exception {
  596.     // Set up pools file
  597.     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  598.     out.println("<?xml version="1.0"?>");
  599.     out.println("<allocations>");
  600.     // Give pool A a minimum of 1 map, 2 reduces
  601.     out.println("<pool name="poolA">");
  602.     out.println("<minMaps>1</minMaps>");
  603.     out.println("<minReduces>2</minReduces>");
  604.     out.println("</pool>");
  605.     // Give pool B a minimum of 2 maps, 1 reduce
  606.     out.println("<pool name="poolB">");
  607.     out.println("<minMaps>2</minMaps>");
  608.     out.println("<minReduces>1</minReduces>");
  609.     out.println("</pool>");
  610.     out.println("</allocations>");
  611.     out.close();
  612.     scheduler.getPoolManager().reloadAllocs();
  613.     
  614.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  615.     JobInfo info1 = scheduler.infos.get(job1);
  616.     
  617.     // Check scheduler variables
  618.     assertEquals(0,    info1.runningMaps);
  619.     assertEquals(0,    info1.runningReduces);
  620.     assertEquals(10,   info1.neededMaps);
  621.     assertEquals(10,   info1.neededReduces);
  622.     assertEquals(0,    info1.mapDeficit);
  623.     assertEquals(0,    info1.reduceDeficit);
  624.     assertEquals(4.0,  info1.mapFairShare);
  625.     assertEquals(4.0,  info1.reduceFairShare);
  626.     
  627.     // Advance time 200ms and submit jobs 2 and 3
  628.     advanceTime(200);
  629.     assertEquals(800,  info1.mapDeficit);
  630.     assertEquals(800,  info1.reduceDeficit);
  631.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  632.     JobInfo info2 = scheduler.infos.get(job2);
  633.     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  634.     JobInfo info3 = scheduler.infos.get(job3);
  635.     
  636.     // Check that minimum and fair shares have been allocated
  637.     assertEquals(0,    info1.minMaps);
  638.     assertEquals(0,    info1.minReduces);
  639.     assertEquals(1.0,  info1.mapFairShare);
  640.     assertEquals(1.0,  info1.reduceFairShare);
  641.     assertEquals(1,    info2.minMaps);
  642.     assertEquals(2,    info2.minReduces);
  643.     assertEquals(1.0,  info2.mapFairShare);
  644.     assertEquals(2.0,  info2.reduceFairShare);
  645.     assertEquals(2,    info3.minMaps);
  646.     assertEquals(1,    info3.minReduces);
  647.     assertEquals(2.0,  info3.mapFairShare);
  648.     assertEquals(1.0,  info3.reduceFairShare);
  649.     
  650.     // Advance time 100ms and check deficits
  651.     advanceTime(100);
  652.     assertEquals(900,  info1.mapDeficit);
  653.     assertEquals(900,  info1.reduceDeficit);
  654.     assertEquals(100,  info2.mapDeficit);
  655.     assertEquals(200,  info2.reduceDeficit);
  656.     assertEquals(200,  info3.mapDeficit);
  657.     assertEquals(100,  info3.reduceDeficit);
  658.     
  659.     // Assign tasks and check that slots are first given to needy jobs
  660.     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
  661.     checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
  662.     checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
  663.     checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
  664.     checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
  665.     checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
  666.     checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
  667.     checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
  668.   }
  669.   /**
  670.    * This test starts by submitting three large jobs:
  671.    * - job1 in the default pool, at time 0
  672.    * - job2 in poolA, with an allocation of 2 maps / 2 reduces, at time 200
  673.    * - job3 in poolA, with an allocation of 2 maps / 2 reduces, at time 300
  674.    * 
  675.    * After this, we sleep 100ms, until time 400. At this point, job1 has the
  676.    * highest deficit, job2 the second, and job3 the third. The first two tasks
  677.    * should be assigned to job2 and job3 since they are in a pool with an
  678.    * allocation guarantee, but the next two slots should be assigned to job 3
  679.    * because the pool will no longer be needy.
  680.    */
  681.   public void testLargeJobsWithExcessCapacity() throws Exception {
  682.     // Set up pools file
  683.     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  684.     out.println("<?xml version="1.0"?>");
  685.     out.println("<allocations>");
  686.     // Give pool A a minimum of 2 maps, 2 reduces
  687.     out.println("<pool name="poolA">");
  688.     out.println("<minMaps>2</minMaps>");
  689.     out.println("<minReduces>2</minReduces>");
  690.     out.println("</pool>");
  691.     out.println("</allocations>");
  692.     out.close();
  693.     scheduler.getPoolManager().reloadAllocs();
  694.     
  695.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  696.     JobInfo info1 = scheduler.infos.get(job1);
  697.     
  698.     // Check scheduler variables
  699.     assertEquals(0,    info1.runningMaps);
  700.     assertEquals(0,    info1.runningReduces);
  701.     assertEquals(10,   info1.neededMaps);
  702.     assertEquals(10,   info1.neededReduces);
  703.     assertEquals(0,    info1.mapDeficit);
  704.     assertEquals(0,    info1.reduceDeficit);
  705.     assertEquals(4.0,  info1.mapFairShare);
  706.     assertEquals(4.0,  info1.reduceFairShare);
  707.     
  708.     // Advance time 200ms and submit job 2
  709.     advanceTime(200);
  710.     assertEquals(800,  info1.mapDeficit);
  711.     assertEquals(800,  info1.reduceDeficit);
  712.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  713.     JobInfo info2 = scheduler.infos.get(job2);
  714.     
  715.     // Check that minimum and fair shares have been allocated
  716.     assertEquals(0,    info1.minMaps);
  717.     assertEquals(0,    info1.minReduces);
  718.     assertEquals(2.0,  info1.mapFairShare);
  719.     assertEquals(2.0,  info1.reduceFairShare);
  720.     assertEquals(2,    info2.minMaps);
  721.     assertEquals(2,    info2.minReduces);
  722.     assertEquals(2.0,  info2.mapFairShare);
  723.     assertEquals(2.0,  info2.reduceFairShare);
  724.     
  725.     // Advance time 100ms and submit job 3
  726.     advanceTime(100);
  727.     assertEquals(1000, info1.mapDeficit);
  728.     assertEquals(1000, info1.reduceDeficit);
  729.     assertEquals(200,  info2.mapDeficit);
  730.     assertEquals(200,  info2.reduceDeficit);
  731.     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  732.     JobInfo info3 = scheduler.infos.get(job3);
  733.     
  734.     // Check that minimum and fair shares have been allocated
  735.     assertEquals(0,    info1.minMaps);
  736.     assertEquals(0,    info1.minReduces);
  737.     assertEquals(2,    info1.mapFairShare, 0.1);
  738.     assertEquals(2,    info1.reduceFairShare, 0.1);
  739.     assertEquals(1,    info2.minMaps);
  740.     assertEquals(1,    info2.minReduces);
  741.     assertEquals(1,    info2.mapFairShare, 0.1);
  742.     assertEquals(1,    info2.reduceFairShare, 0.1);
  743.     assertEquals(1,    info3.minMaps);
  744.     assertEquals(1,    info3.minReduces);
  745.     assertEquals(1,    info3.mapFairShare, 0.1);
  746.     assertEquals(1,    info3.reduceFairShare, 0.1);
  747.     
  748.     // Advance time 100ms and check deficits
  749.     advanceTime(100);
  750.     assertEquals(1200, info1.mapDeficit, 1.0);
  751.     assertEquals(1200, info1.reduceDeficit, 1.0);
  752.     assertEquals(300,  info2.mapDeficit, 1.0);
  753.     assertEquals(300,  info2.reduceDeficit, 1.0);
  754.     assertEquals(100,  info3.mapDeficit, 1.0);
  755.     assertEquals(100,  info3.reduceDeficit, 1.0);
  756.     
  757.     // Assign tasks and check that slots are first given to needy jobs, but
  758.     // that job 1 gets two tasks after due to having a larger deficit.
  759.     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  760.     checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
  761.     checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
  762.     checkAssignment("tt1", "attempt_test_0003_r_000004_0 on tt1");
  763.     checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
  764.     checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
  765.     checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
  766.     checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
  767.   }
  768.   
  769.   /**
  770.    * This test starts by submitting two jobs at time 0:
  771.    * - job1 in the default pool
  772.    * - job2, with 1 map and 1 reduce, in poolA, which has an alloc of 4
  773.    *   maps and 4 reduces
  774.    * 
  775.    * When we assign the slots, job2 should only get 1 of each type of task.
  776.    * 
  777.    * The fair share for job 2 should be 2.0 however, because even though it is
  778.    * running only one task, it accumulates deficit in case it will have failures
  779.    * or need speculative tasks later. (TODO: This may not be a good policy.)
  780.    */
  781.   public void testSmallJobInLargePool() throws Exception {
  782.     // Set up pools file
  783.     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  784.     out.println("<?xml version="1.0"?>");
  785.     out.println("<allocations>");
  786.     // Give pool A a minimum of 4 maps, 4 reduces
  787.     out.println("<pool name="poolA">");
  788.     out.println("<minMaps>4</minMaps>");
  789.     out.println("<minReduces>4</minReduces>");
  790.     out.println("</pool>");
  791.     out.println("</allocations>");
  792.     out.close();
  793.     scheduler.getPoolManager().reloadAllocs();
  794.     
  795.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  796.     JobInfo info1 = scheduler.infos.get(job1);
  797.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 1, "poolA");
  798.     JobInfo info2 = scheduler.infos.get(job2);
  799.     
  800.     // Check scheduler variables
  801.     assertEquals(0,    info1.runningMaps);
  802.     assertEquals(0,    info1.runningReduces);
  803.     assertEquals(10,   info1.neededMaps);
  804.     assertEquals(10,   info1.neededReduces);
  805.     assertEquals(0,    info1.mapDeficit);
  806.     assertEquals(0,    info1.reduceDeficit);
  807.     assertEquals(2.0,  info1.mapFairShare);
  808.     assertEquals(2.0,  info1.reduceFairShare);
  809.     assertEquals(0,    info2.runningMaps);
  810.     assertEquals(0,    info2.runningReduces);
  811.     assertEquals(1,    info2.neededMaps);
  812.     assertEquals(1,    info2.neededReduces);
  813.     assertEquals(0,    info2.mapDeficit);
  814.     assertEquals(0,    info2.reduceDeficit);
  815.     assertEquals(2.0,  info2.mapFairShare);
  816.     assertEquals(2.0,  info2.reduceFairShare);
  817.     
  818.     // Assign tasks and check that slots are first given to needy jobs
  819.     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  820.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  821.     checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
  822.     checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
  823.     checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
  824.     checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
  825.     checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
  826.     checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
  827.   }
  828.   
  829.   /**
  830.    * This test starts by submitting four jobs in the default pool. However, the
  831.    * maxRunningJobs limit for this pool has been set to two. We should see only
  832.    * the first two jobs get scheduled, each with half the total slots.
  833.    */
  834.   public void testPoolMaxJobs() throws Exception {
  835.     // Set up pools file
  836.     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  837.     out.println("<?xml version="1.0"?>");
  838.     out.println("<allocations>");
  839.     out.println("<pool name="default">");
  840.     out.println("<maxRunningJobs>2</maxRunningJobs>");
  841.     out.println("</pool>");
  842.     out.println("</allocations>");
  843.     out.close();
  844.     scheduler.getPoolManager().reloadAllocs();
  845.     
  846.     // Submit jobs, advancing time in-between to make sure that they are
  847.     // all submitted at distinct times.
  848.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  849.     JobInfo info1 = scheduler.infos.get(job1);
  850.     advanceTime(10);
  851.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  852.     JobInfo info2 = scheduler.infos.get(job2);
  853.     advanceTime(10);
  854.     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
  855.     JobInfo info3 = scheduler.infos.get(job3);
  856.     advanceTime(10);
  857.     JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
  858.     JobInfo info4 = scheduler.infos.get(job4);
  859.     
  860.     // Check scheduler variables
  861.     assertEquals(2.0,  info1.mapFairShare);
  862.     assertEquals(2.0,  info1.reduceFairShare);
  863.     assertEquals(2.0,  info2.mapFairShare);
  864.     assertEquals(2.0,  info2.reduceFairShare);
  865.     assertEquals(0.0,  info3.mapFairShare);
  866.     assertEquals(0.0,  info3.reduceFairShare);
  867.     assertEquals(0.0,  info4.mapFairShare);
  868.     assertEquals(0.0,  info4.reduceFairShare);
  869.     
  870.     // Assign tasks and check that slots are first to jobs 1 and 2
  871.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  872.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  873.     checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
  874.     checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
  875.     advanceTime(100);
  876.     checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
  877.     checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
  878.     checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
  879.     checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
  880.   }
  881.   /**
  882.    * This test starts by submitting two jobs by user "user1" to the default
  883.    * pool, and two jobs by "user2". We set user1's job limit to 1. We should
  884.    * see one job from user1 and two from user2. 
  885.    */
  886.   public void testUserMaxJobs() throws Exception {
  887.     // Set up pools file
  888.     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  889.     out.println("<?xml version="1.0"?>");
  890.     out.println("<allocations>");
  891.     out.println("<user name="user1">");
  892.     out.println("<maxRunningJobs>1</maxRunningJobs>");
  893.     out.println("</user>");
  894.     out.println("</allocations>");
  895.     out.close();
  896.     scheduler.getPoolManager().reloadAllocs();
  897.     
  898.     // Submit jobs, advancing time in-between to make sure that they are
  899.     // all submitted at distinct times.
  900.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  901.     job1.getJobConf().set("user.name", "user1");
  902.     JobInfo info1 = scheduler.infos.get(job1);
  903.     advanceTime(10);
  904.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  905.     job2.getJobConf().set("user.name", "user1");
  906.     JobInfo info2 = scheduler.infos.get(job2);
  907.     advanceTime(10);
  908.     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
  909.     job3.getJobConf().set("user.name", "user2");
  910.     JobInfo info3 = scheduler.infos.get(job3);
  911.     advanceTime(10);
  912.     JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
  913.     job4.getJobConf().set("user.name", "user2");
  914.     JobInfo info4 = scheduler.infos.get(job4);
  915.     
  916.     // Check scheduler variables
  917.     assertEquals(1.33,  info1.mapFairShare, 0.1);
  918.     assertEquals(1.33,  info1.reduceFairShare, 0.1);
  919.     assertEquals(0.0,   info2.mapFairShare);
  920.     assertEquals(0.0,   info2.reduceFairShare);
  921.     assertEquals(1.33,  info3.mapFairShare, 0.1);
  922.     assertEquals(1.33,  info3.reduceFairShare, 0.1);
  923.     assertEquals(1.33,  info4.mapFairShare, 0.1);
  924.     assertEquals(1.33,  info4.reduceFairShare, 0.1);
  925.     
  926.     // Assign tasks and check that slots are first to jobs 1 and 3
  927.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  928.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  929.     checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
  930.     checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
  931.     advanceTime(100);
  932.     checkAssignment("tt2", "attempt_test_0003_m_000005_0 on tt2");
  933.     checkAssignment("tt2", "attempt_test_0003_m_000006_0 on tt2");
  934.     checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
  935.     checkAssignment("tt2", "attempt_test_0003_r_000008_0 on tt2");
  936.   }
  937.   
  938.   /**
  939.    * Test a combination of pool job limits and user job limits, the latter
  940.    * specified through both the userMaxJobsDefaults (for some users) and
  941.    * user-specific &lt;user&gt; elements in the allocations file. 
  942.    */
  943.   public void testComplexJobLimits() throws Exception {
  944.     // Set up pools file
  945.     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  946.     out.println("<?xml version="1.0"?>");
  947.     out.println("<allocations>");
  948.     out.println("<pool name="poolA">");
  949.     out.println("<maxRunningJobs>1</maxRunningJobs>");
  950.     out.println("</pool>");
  951.     out.println("<user name="user1">");
  952.     out.println("<maxRunningJobs>1</maxRunningJobs>");
  953.     out.println("</user>");
  954.     out.println("<user name="user2">");
  955.     out.println("<maxRunningJobs>10</maxRunningJobs>");
  956.     out.println("</user>");
  957.     out.println("<userMaxJobsDefault>2</userMaxJobsDefault>");
  958.     out.println("</allocations>");
  959.     out.close();
  960.     scheduler.getPoolManager().reloadAllocs();
  961.     
  962.     // Submit jobs, advancing time in-between to make sure that they are
  963.     // all submitted at distinct times.
  964.     
  965.     // Two jobs for user1; only one should get to run
  966.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  967.     job1.getJobConf().set("user.name", "user1");
  968.     JobInfo info1 = scheduler.infos.get(job1);
  969.     advanceTime(10);
  970.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
  971.     job2.getJobConf().set("user.name", "user1");
  972.     JobInfo info2 = scheduler.infos.get(job2);
  973.     advanceTime(10);
  974.     
  975.     // Three jobs for user2; all should get to run
  976.     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
  977.     job3.getJobConf().set("user.name", "user2");
  978.     JobInfo info3 = scheduler.infos.get(job3);
  979.     advanceTime(10);
  980.     JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
  981.     job4.getJobConf().set("user.name", "user2");
  982.     JobInfo info4 = scheduler.infos.get(job4);
  983.     advanceTime(10);
  984.     JobInProgress job5 = submitJob(JobStatus.RUNNING, 10, 10);
  985.     job5.getJobConf().set("user.name", "user2");
  986.     JobInfo info5 = scheduler.infos.get(job5);
  987.     advanceTime(10);
  988.     
  989.     // Three jobs for user3; only two should get to run
  990.     JobInProgress job6 = submitJob(JobStatus.RUNNING, 10, 10);
  991.     job6.getJobConf().set("user.name", "user3");
  992.     JobInfo info6 = scheduler.infos.get(job6);
  993.     advanceTime(10);
  994.     JobInProgress job7 = submitJob(JobStatus.RUNNING, 10, 10);
  995.     job7.getJobConf().set("user.name", "user3");
  996.     JobInfo info7 = scheduler.infos.get(job7);
  997.     advanceTime(10);
  998.     JobInProgress job8 = submitJob(JobStatus.RUNNING, 10, 10);
  999.     job8.getJobConf().set("user.name", "user3");
  1000.     JobInfo info8 = scheduler.infos.get(job8);
  1001.     advanceTime(10);
  1002.     
  1003.     // Two jobs for user4, in poolA; only one should get to run
  1004.     JobInProgress job9 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  1005.     job9.getJobConf().set("user.name", "user4");
  1006.     JobInfo info9 = scheduler.infos.get(job9);
  1007.     advanceTime(10);
  1008.     JobInProgress job10 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  1009.     job10.getJobConf().set("user.name", "user4");
  1010.     JobInfo info10 = scheduler.infos.get(job10);
  1011.     advanceTime(10);
  1012.     
  1013.     // Check scheduler variables. The jobs in poolA should get half
  1014.     // the total share, while those in the default pool should get
  1015.     // the other half. This works out to 2 slots each for the jobs
  1016.     // in poolA and 1/3 each for the jobs in the default pool because
  1017.     // there are 2 runnable jobs in poolA and 6 jobs in the default pool.
  1018.     assertEquals(0.33,   info1.mapFairShare, 0.1);
  1019.     assertEquals(0.33,   info1.reduceFairShare, 0.1);
  1020.     assertEquals(0.0,    info2.mapFairShare);
  1021.     assertEquals(0.0,    info2.reduceFairShare);
  1022.     assertEquals(0.33,   info3.mapFairShare, 0.1);
  1023.     assertEquals(0.33,   info3.reduceFairShare, 0.1);
  1024.     assertEquals(0.33,   info4.mapFairShare, 0.1);
  1025.     assertEquals(0.33,   info4.reduceFairShare, 0.1);
  1026.     assertEquals(0.33,   info5.mapFairShare, 0.1);
  1027.     assertEquals(0.33,   info5.reduceFairShare, 0.1);
  1028.     assertEquals(0.33,   info6.mapFairShare, 0.1);
  1029.     assertEquals(0.33,   info6.reduceFairShare, 0.1);
  1030.     assertEquals(0.33,   info7.mapFairShare, 0.1);
  1031.     assertEquals(0.33,   info7.reduceFairShare, 0.1);
  1032.     assertEquals(0.0,    info8.mapFairShare);
  1033.     assertEquals(0.0,    info8.reduceFairShare);
  1034.     assertEquals(2.0,    info9.mapFairShare, 0.1);
  1035.     assertEquals(2.0,    info9.reduceFairShare, 0.1);
  1036.     assertEquals(0.0,    info10.mapFairShare);
  1037.     assertEquals(0.0,    info10.reduceFairShare);
  1038.   }
  1039.   
  1040.   public void testSizeBasedWeight() throws Exception {
  1041.     scheduler.sizeBasedWeight = true;
  1042.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 10);
  1043.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 20, 1);
  1044.     assertTrue(scheduler.infos.get(job2).mapFairShare >
  1045.                scheduler.infos.get(job1).mapFairShare);
  1046.     assertTrue(scheduler.infos.get(job1).reduceFairShare >
  1047.                scheduler.infos.get(job2).reduceFairShare);
  1048.   }
  1049.   
  1050.   public void testWaitForMapsBeforeLaunchingReduces() {
  1051.     // We have set waitForMapsBeforeLaunchingReduces to false by default in
  1052.     // this class, so this should return true
  1053.     assertTrue(scheduler.enoughMapsFinishedToRunReduces(0, 100));
  1054.     
  1055.     // However, if we set waitForMapsBeforeLaunchingReduces to true, we should
  1056.     // now no longer be able to assign reduces until 5 have finished
  1057.     scheduler.waitForMapsBeforeLaunchingReduces = true;
  1058.     assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 100));
  1059.     assertFalse(scheduler.enoughMapsFinishedToRunReduces(1, 100));
  1060.     assertFalse(scheduler.enoughMapsFinishedToRunReduces(2, 100));
  1061.     assertFalse(scheduler.enoughMapsFinishedToRunReduces(3, 100));
  1062.     assertFalse(scheduler.enoughMapsFinishedToRunReduces(4, 100));
  1063.     assertTrue(scheduler.enoughMapsFinishedToRunReduces(5, 100));
  1064.     assertTrue(scheduler.enoughMapsFinishedToRunReduces(6, 100));
  1065.     
  1066.     // Also test some jobs that have very few maps, in which case we will
  1067.     // wait for at least 1 map to finish
  1068.     assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 5));
  1069.     assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 5));
  1070.     assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 1));
  1071.     assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 1));
  1072.   }
  1073.   
  1074.   /**
  1075.    * This test submits jobs in three pools: poolA, which has a weight
  1076.    * of 2.0; poolB, which has a weight of 0.5; and the default pool, which
  1077.    * should have a weight of 1.0. It then checks that the map and reduce
  1078.    * fair shares are given out accordingly. We then submit a second job to
  1079.    * pool B and check that each gets half of the pool (weight of 0.25).
  1080.    */
  1081.   public void testPoolWeights() throws Exception {
  1082.     // Set up pools file
  1083.     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  1084.     out.println("<?xml version="1.0"?>");
  1085.     out.println("<allocations>");
  1086.     out.println("<pool name="poolA">");
  1087.     out.println("<weight>2.0</weight>");
  1088.     out.println("</pool>");
  1089.     out.println("<pool name="poolB">");
  1090.     out.println("<weight>0.5</weight>");
  1091.     out.println("</pool>");
  1092.     out.println("</allocations>");
  1093.     out.close();
  1094.     scheduler.getPoolManager().reloadAllocs();
  1095.     
  1096.     // Submit jobs, advancing time in-between to make sure that they are
  1097.     // all submitted at distinct times.
  1098.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
  1099.     JobInfo info1 = scheduler.infos.get(job1);
  1100.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
  1101.     JobInfo info2 = scheduler.infos.get(job2);
  1102.     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  1103.     JobInfo info3 = scheduler.infos.get(job3);
  1104.     advanceTime(10);
  1105.     
  1106.     assertEquals(1.14,  info1.mapFairShare, 0.01);
  1107.     assertEquals(1.14,  info1.reduceFairShare, 0.01);
  1108.     assertEquals(2.28,  info2.mapFairShare, 0.01);
  1109.     assertEquals(2.28,  info2.reduceFairShare, 0.01);
  1110.     assertEquals(0.57,  info3.mapFairShare, 0.01);
  1111.     assertEquals(0.57,  info3.reduceFairShare, 0.01);
  1112.     
  1113.     JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  1114.     JobInfo info4 = scheduler.infos.get(job4);
  1115.     advanceTime(10);
  1116.     
  1117.     assertEquals(1.14,  info1.mapFairShare, 0.01);
  1118.     assertEquals(1.14,  info1.reduceFairShare, 0.01);
  1119.     assertEquals(2.28,  info2.mapFairShare, 0.01);
  1120.     assertEquals(2.28,  info2.reduceFairShare, 0.01);
  1121.     assertEquals(0.28,  info3.mapFairShare, 0.01);
  1122.     assertEquals(0.28,  info3.reduceFairShare, 0.01);
  1123.     assertEquals(0.28,  info4.mapFairShare, 0.01);
  1124.     assertEquals(0.28,  info4.reduceFairShare, 0.01);
  1125.   }
  1126.   /**
  1127.    * This test submits jobs in two pools, poolA and poolB. None of the
  1128.    * jobs in poolA have maps, but this should not affect their reduce
  1129.    * share.
  1130.    */
  1131.   public void testPoolWeightsWhenNoMaps() throws Exception {
  1132.     // Set up pools file
  1133.     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  1134.     out.println("<?xml version="1.0"?>");
  1135.     out.println("<allocations>");
  1136.     out.println("<pool name="poolA">");
  1137.     out.println("<weight>2.0</weight>");
  1138.     out.println("</pool>");
  1139.     out.println("<pool name="poolB">");
  1140.     out.println("<weight>1.0</weight>");
  1141.     out.println("</pool>");
  1142.     out.println("</allocations>");
  1143.     out.close();
  1144.     scheduler.getPoolManager().reloadAllocs();
  1145.     
  1146.     // Submit jobs, advancing time in-between to make sure that they are
  1147.     // all submitted at distinct times.
  1148.     JobInProgress job1 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  1149.     JobInfo info1 = scheduler.infos.get(job1);
  1150.     JobInProgress job2 = submitJob(JobStatus.RUNNING, 0, 10, "poolA");
  1151.     JobInfo info2 = scheduler.infos.get(job2);
  1152.     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
  1153.     JobInfo info3 = scheduler.infos.get(job3);
  1154.     advanceTime(10);
  1155.     
  1156.     assertEquals(0,     info1.mapWeight, 0.01);
  1157.     assertEquals(1.0,   info1.reduceWeight, 0.01);
  1158.     assertEquals(0,     info2.mapWeight, 0.01);
  1159.     assertEquals(1.0,   info2.reduceWeight, 0.01);
  1160.     assertEquals(1.0,   info3.mapWeight, 0.01);
  1161.     assertEquals(1.0,   info3.reduceWeight, 0.01);
  1162.     
  1163.     assertEquals(0,     info1.mapFairShare, 0.01);
  1164.     assertEquals(1.33,  info1.reduceFairShare, 0.01);
  1165.     assertEquals(0,     info2.mapFairShare, 0.01);
  1166.     assertEquals(1.33,  info2.reduceFairShare, 0.01);
  1167.     assertEquals(4,     info3.mapFairShare, 0.01);
  1168.     assertEquals(1.33,  info3.reduceFairShare, 0.01);
  1169.   }
  1170.   /**
  1171.    * Tests that max-running-tasks per node are set by assigning load
  1172.    * equally accross the cluster in CapBasedLoadManager.
  1173.    */
  1174.   public void testCapBasedLoadManager() {
  1175.     CapBasedLoadManager loadMgr = new CapBasedLoadManager();
  1176.     // Arguments to getCap: totalRunnableTasks, nodeCap, totalSlots
  1177.     // Desired behavior: return ceil(nodeCap * min(1, runnableTasks/totalSlots))
  1178.     assertEquals(1, loadMgr.getCap(1, 1, 100));
  1179.     assertEquals(1, loadMgr.getCap(1, 2, 100));
  1180.     assertEquals(1, loadMgr.getCap(1, 10, 100));
  1181.     assertEquals(1, loadMgr.getCap(200, 1, 100));
  1182.     assertEquals(1, loadMgr.getCap(1, 5, 100));
  1183.     assertEquals(3, loadMgr.getCap(50, 5, 100));
  1184.     assertEquals(5, loadMgr.getCap(100, 5, 100));
  1185.     assertEquals(5, loadMgr.getCap(200, 5, 100));
  1186.   }
  1187.   
  1188.   private void advanceTime(long time) {
  1189.     clock.advance(time);
  1190.     scheduler.update();
  1191.   }
  1192.   protected TaskTrackerStatus tracker(String taskTrackerName) {
  1193.     return taskTrackerManager.getTaskTracker(taskTrackerName);
  1194.   }
  1195.   
  1196.   protected void checkAssignment(String taskTrackerName,
  1197.       String expectedTaskString) throws IOException {
  1198.     List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
  1199.     assertNotNull(expectedTaskString, tasks);
  1200.     assertEquals(expectedTaskString, 1, tasks.size());
  1201.     assertEquals(expectedTaskString, tasks.get(0).toString());
  1202.   }
  1203.   
  1204. }