TestCapacityScheduler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:95k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.HashMap;
  23. import java.util.HashSet;
  24. import java.util.LinkedHashMap;
  25. import java.util.List;
  26. import java.util.Map;
  27. import java.util.Set;
  28. import java.util.TreeMap;
  29. import junit.framework.TestCase;
  30. import org.apache.commons.logging.Log;
  31. import org.apache.commons.logging.LogFactory;
  32. import org.apache.hadoop.io.BytesWritable;
  33. import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
  34. import org.apache.hadoop.util.StringUtils;
  35. import org.apache.hadoop.conf.Configuration;
  36. public class TestCapacityScheduler extends TestCase {
  37.   static final Log LOG =
  38.       LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
  39.   private static int jobCounter;
  40.   
  41.   /**
  42.    * Test class that removes the asynchronous nature of job initialization.
  43.    * 
  44.    * The run method is a dummy which just waits for completion. It is
  45.    * expected that test code calls the main method, initializeJobs, directly
  46.    * to trigger initialization.
  47.    */
  48.   class ControlledJobInitializer extends 
  49.                               JobInitializationPoller.JobInitializationThread {
  50.     
  51.     boolean stopRunning;
  52.     
  53.     public ControlledJobInitializer(JobInitializationPoller p) {
  54.       p.super();
  55.     }
  56.     
  57.     @Override
  58.     public void run() {
  59.       while (!stopRunning) {
  60.         try {
  61.           synchronized(this) {
  62.             this.wait();  
  63.           }
  64.         } catch (InterruptedException ie) {
  65.           break;
  66.         }
  67.       }
  68.     }
  69.     
  70.     void stopRunning() {
  71.       stopRunning = true;
  72.     }
  73.   }
  74.   
  75.   /**
  76.    * Test class that removes the asynchronous nature of job initialization.
  77.    * 
  78.    * The run method is a dummy which just waits for completion. It is
  79.    * expected that test code calls the main method, selectJobsToInitialize,
  80.    * directly to trigger initialization.
  81.    * 
  82.    * The class also creates the test worker thread objects of type 
  83.    * ControlledJobInitializer instead of the objects of the actual class
  84.    */
  85.   class ControlledInitializationPoller extends JobInitializationPoller {
  86.     
  87.     private boolean stopRunning;
  88.     private ArrayList<ControlledJobInitializer> workers;
  89.     
  90.     public ControlledInitializationPoller(JobQueuesManager mgr,
  91.                                           CapacitySchedulerConf rmConf,
  92.                                           Set<String> queues) {
  93.       super(mgr, rmConf, queues);
  94.     }
  95.     
  96.     @Override
  97.     public void run() {
  98.       // don't do anything here.
  99.       while (!stopRunning) {
  100.         try {
  101.           synchronized (this) {
  102.             this.wait();
  103.           }
  104.         } catch (InterruptedException ie) {
  105.           break;
  106.         }
  107.       }
  108.     }
  109.     
  110.     @Override
  111.     JobInitializationThread createJobInitializationThread() {
  112.       ControlledJobInitializer t = new ControlledJobInitializer(this);
  113.       if (workers == null) {
  114.         workers = new ArrayList<ControlledJobInitializer>();
  115.       }
  116.       workers.add(t);
  117.       return t;
  118.     }
  119.     @Override
  120.     void selectJobsToInitialize() {
  121.       super.cleanUpInitializedJobsList();
  122.       super.selectJobsToInitialize();
  123.       for (ControlledJobInitializer t : workers) {
  124.         t.initializeJobs();
  125.       }
  126.     }
  127.     
  128.     void stopRunning() {
  129.       stopRunning = true;
  130.       for (ControlledJobInitializer t : workers) {
  131.         t.stopRunning();
  132.         t.interrupt();
  133.       }
  134.     }
  135.   }
  136.   private ControlledInitializationPoller controlledInitializationPoller;
  137.   static class FakeJobInProgress extends JobInProgress {
  138.     
  139.     private FakeTaskTrackerManager taskTrackerManager;
  140.     private int mapTaskCtr;
  141.     private int redTaskCtr;
  142.     private Set<TaskInProgress> mapTips = 
  143.       new HashSet<TaskInProgress>();
  144.     private Set<TaskInProgress> reduceTips = 
  145.       new HashSet<TaskInProgress>();
  146.     
  147.     public FakeJobInProgress(JobID jId, JobConf jobConf,
  148.         FakeTaskTrackerManager taskTrackerManager, String user) {
  149.       super(jId, jobConf);
  150.       this.taskTrackerManager = taskTrackerManager;
  151.       this.startTime = System.currentTimeMillis();
  152.       this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP);
  153.       this.status.setJobPriority(JobPriority.NORMAL);
  154.       this.status.setStartTime(startTime);
  155.       if (null == jobConf.getQueueName()) {
  156.         this.profile = new JobProfile(user, jId, 
  157.             null, null, null);
  158.       }
  159.       else {
  160.         this.profile = new JobProfile(user, jId, 
  161.             null, null, null, jobConf.getQueueName());
  162.       }
  163.       mapTaskCtr = 0;
  164.       redTaskCtr = 0;
  165.       super.setMaxVirtualMemoryForTask(jobConf.getMaxVirtualMemoryForTask());
  166.       super.setMaxPhysicalMemoryForTask(jobConf.getMaxPhysicalMemoryForTask());
  167.     }
  168.     
  169.     @Override
  170.     public synchronized void initTasks() throws IOException {
  171.       getStatus().setRunState(JobStatus.RUNNING);
  172.     }
  173.     @Override
  174.     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
  175.         int ignored) throws IOException {
  176.       if (mapTaskCtr == numMapTasks) return null;
  177.       TaskAttemptID attemptId = getTaskAttemptID(true);
  178.       Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
  179.         @Override
  180.         public String toString() {
  181.           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
  182.         }
  183.       };
  184.       taskTrackerManager.startTask(tts.getTrackerName(), task);
  185.       runningMapTasks++;
  186.       // create a fake TIP and keep track of it
  187.       mapTips.add(new FakeTaskInProgress(getJobID(), 
  188.           getJobConf(), task, true, this));
  189.       return task;
  190.     }
  191.     
  192.     @Override
  193.     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
  194.         int clusterSize, int ignored) throws IOException {
  195.       if (redTaskCtr == numReduceTasks) return null;
  196.       TaskAttemptID attemptId = getTaskAttemptID(false);
  197.       Task task = new ReduceTask("", attemptId, 0, 10) {
  198.         @Override
  199.         public String toString() {
  200.           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
  201.         }
  202.       };
  203.       taskTrackerManager.startTask(tts.getTrackerName(), task);
  204.       runningReduceTasks++;
  205.       // create a fake TIP and keep track of it
  206.       reduceTips.add(new FakeTaskInProgress(getJobID(), 
  207.           getJobConf(), task, false, this));
  208.       return task;
  209.     }
  210.     
  211.     public void mapTaskFinished() {
  212.       runningMapTasks--;
  213.       finishedMapTasks++;
  214.     }
  215.     
  216.     public void reduceTaskFinished() {
  217.       runningReduceTasks--;
  218.       finishedReduceTasks++;
  219.     }
  220.     
  221.     private TaskAttemptID getTaskAttemptID(boolean isMap) {
  222.       JobID jobId = getJobID();
  223.       return new TaskAttemptID(jobId.getJtIdentifier(),
  224.           jobId.getId(), isMap, (isMap)?++mapTaskCtr: ++redTaskCtr, 0);
  225.     }
  226.     
  227.     @Override
  228.     Set<TaskInProgress> getNonLocalRunningMaps() {
  229.       return (Set<TaskInProgress>)mapTips;
  230.     }
  231.     @Override
  232.     Set<TaskInProgress> getRunningReduces() {
  233.       return (Set<TaskInProgress>)reduceTips;
  234.     }
  235.     
  236.   }
  237.   
  238.   static class FakeTaskInProgress extends TaskInProgress {
  239.     private boolean isMap;
  240.     private FakeJobInProgress fakeJob;
  241.     private TreeMap<TaskAttemptID, String> activeTasks;
  242.     private TaskStatus taskStatus;
  243.     FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
  244.         boolean isMap, FakeJobInProgress job) {
  245.       super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
  246.       this.isMap = isMap;
  247.       this.fakeJob = job;
  248.       activeTasks = new TreeMap<TaskAttemptID, String>();
  249.       activeTasks.put(t.getTaskID(), "tt");
  250.       // create a fake status for a task that is running for a bit
  251.       this.taskStatus = TaskStatus.createTaskStatus(isMap);
  252.       taskStatus.setProgress(0.5f);
  253.       taskStatus.setRunState(TaskStatus.State.RUNNING);
  254.     }
  255.     
  256.     @Override
  257.     TreeMap<TaskAttemptID, String> getActiveTasks() {
  258.       return activeTasks;
  259.     }
  260.     @Override
  261.     public TaskStatus getTaskStatus(TaskAttemptID taskid) {
  262.       // return a status for a task that has run a bit
  263.       return taskStatus;
  264.     }
  265.     @Override
  266.     boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
  267.       if (isMap) {
  268.         fakeJob.mapTaskFinished();
  269.       }
  270.       else {
  271.         fakeJob.reduceTaskFinished();
  272.       }
  273.       return true;
  274.     }
  275.   }
  276.   
  277.   static class FakeQueueManager extends QueueManager {
  278.     private Set<String> queues = null;
  279.     FakeQueueManager() {
  280.       super(new Configuration());
  281.     }
  282.     void setQueues(Set<String> queues) {
  283.       this.queues = queues;
  284.     }
  285.     public synchronized Set<String> getQueues() {
  286.       return queues;
  287.     }
  288.   }
  289.   
  290.   static class FakeTaskTrackerManager implements TaskTrackerManager {
  291.     int maps = 0;
  292.     int reduces = 0;
  293.     int maxMapTasksPerTracker = 2;
  294.     int maxReduceTasksPerTracker = 1;
  295.     List<JobInProgressListener> listeners =
  296.       new ArrayList<JobInProgressListener>();
  297.     FakeQueueManager qm = new FakeQueueManager();
  298.     
  299.     private Map<String, TaskTrackerStatus> trackers =
  300.       new HashMap<String, TaskTrackerStatus>();
  301.     private Map<String, TaskStatus> taskStatuses = 
  302.       new HashMap<String, TaskStatus>();
  303.     private Map<JobID, JobInProgress> jobs =
  304.         new HashMap<JobID, JobInProgress>();
  305.     public FakeTaskTrackerManager() {
  306.       this(2, 2, 1);
  307.     }
  308.     public FakeTaskTrackerManager(int numTaskTrackers,
  309.         int maxMapTasksPerTracker, int maxReduceTasksPerTracker) {
  310.       this.maxMapTasksPerTracker = maxMapTasksPerTracker;
  311.       this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
  312.       for (int i = 1; i < numTaskTrackers + 1; i++) {
  313.         String ttName = "tt" + i;
  314.         trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", i,
  315.             new ArrayList<TaskStatus>(), 0, maxMapTasksPerTracker,
  316.             maxReduceTasksPerTracker));
  317.       }
  318.     }
  319.     
  320.     public void addTaskTracker(String ttName) {
  321.       trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", 1,
  322.           new ArrayList<TaskStatus>(), 0,
  323.           maxMapTasksPerTracker, maxReduceTasksPerTracker));
  324.     }
  325.     
  326.     public ClusterStatus getClusterStatus() {
  327.       int numTrackers = trackers.size();
  328.       return new ClusterStatus(numTrackers, maps, reduces,
  329.           numTrackers * maxMapTasksPerTracker,
  330.           numTrackers * maxReduceTasksPerTracker,
  331.           JobTracker.State.RUNNING);
  332.     }
  333.     public int getNumberOfUniqueHosts() {
  334.       return 0;
  335.     }
  336.     public int getNextHeartbeatInterval() {
  337.       return MRConstants.HEARTBEAT_INTERVAL_MIN;
  338.     }
  339.     @Override
  340.     public void killJob(JobID jobid) throws IOException {
  341.       JobInProgress job = jobs.get(jobid);
  342.       finalizeJob(job, JobStatus.KILLED);
  343.       job.kill();
  344.     }
  345.     @Override
  346.     public JobInProgress getJob(JobID jobid) {
  347.       return jobs.get(jobid);
  348.     }
  349.     Collection<JobInProgress> getJobs() {
  350.       return jobs.values();
  351.     }
  352.     public Collection<TaskTrackerStatus> taskTrackers() {
  353.       return trackers.values();
  354.     }
  355.     public void addJobInProgressListener(JobInProgressListener listener) {
  356.       listeners.add(listener);
  357.     }
  358.     public void removeJobInProgressListener(JobInProgressListener listener) {
  359.       listeners.remove(listener);
  360.     }
  361.     
  362.     public void submitJob(JobInProgress job) throws IOException {
  363.       jobs.put(job.getJobID(), job);
  364.       for (JobInProgressListener listener : listeners) {
  365.         listener.jobAdded(job);
  366.       }
  367.     }
  368.     
  369.     public TaskTrackerStatus getTaskTracker(String trackerID) {
  370.       return trackers.get(trackerID);
  371.     }
  372.     
  373.     public void startTask(String taskTrackerName, final Task t) {
  374.       if (t.isMapTask()) {
  375.         maps++;
  376.       } else {
  377.         reduces++;
  378.       }
  379.       TaskStatus status = new TaskStatus() {
  380.         @Override
  381.         public TaskAttemptID getTaskID() {
  382.           return t.getTaskID();
  383.         }
  384.         @Override
  385.         public boolean getIsMap() {
  386.           return t.isMapTask();
  387.         }
  388.       };
  389.       taskStatuses.put(t.getTaskID().toString(), status);
  390.       status.setRunState(TaskStatus.State.RUNNING);
  391.       trackers.get(taskTrackerName).getTaskReports().add(status);
  392.     }
  393.     
  394.     public void finishTask(String taskTrackerName, String tipId, 
  395.         FakeJobInProgress j) {
  396.       TaskStatus status = taskStatuses.get(tipId);
  397.       if (status.getIsMap()) {
  398.         maps--;
  399.         j.mapTaskFinished();
  400.       } else {
  401.         reduces--;
  402.         j.reduceTaskFinished();
  403.       }
  404.       status.setRunState(TaskStatus.State.SUCCEEDED);
  405.     }
  406.     
  407.     void finalizeJob(FakeJobInProgress fjob) {
  408.       finalizeJob(fjob, JobStatus.SUCCEEDED);
  409.     }
  410.     void finalizeJob(JobInProgress fjob, int state) {
  411.       // take a snapshot of the status before changing it
  412.       JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
  413.       fjob.getStatus().setRunState(state);
  414.       JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
  415.       JobStatusChangeEvent event = 
  416.         new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus, 
  417.                                   newStatus);
  418.       for (JobInProgressListener listener : listeners) {
  419.         listener.jobUpdated(event);
  420.       }
  421.     }
  422.     
  423.     public void setPriority(FakeJobInProgress fjob, JobPriority priority) {
  424.       // take a snapshot of the status before changing it
  425.       JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
  426.       fjob.setPriority(priority);
  427.       JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
  428.       JobStatusChangeEvent event = 
  429.         new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus, 
  430.                                   newStatus);
  431.       for (JobInProgressListener listener : listeners) {
  432.         listener.jobUpdated(event);
  433.       }
  434.     }
  435.     
  436.     public void setStartTime(FakeJobInProgress fjob, long start) {
  437.       // take a snapshot of the status before changing it
  438.       JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
  439.       
  440.       fjob.startTime = start; // change the start time of the job
  441.       fjob.status.setStartTime(start); // change the start time of the jobstatus
  442.       
  443.       JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
  444.       
  445.       JobStatusChangeEvent event = 
  446.         new JobStatusChangeEvent (fjob, EventType.START_TIME_CHANGED, oldStatus,
  447.                                   newStatus);
  448.       for (JobInProgressListener listener : listeners) {
  449.         listener.jobUpdated(event);
  450.       }
  451.     }
  452.     
  453.     void addQueues(String[] arr) {
  454.       Set<String> queues = new HashSet<String>();
  455.       for (String s: arr) {
  456.         queues.add(s);
  457.       }
  458.       qm.setQueues(queues);
  459.     }
  460.     
  461.     public QueueManager getQueueManager() {
  462.       return qm;
  463.     }
  464.   }
  465.   
  466.   // represents a fake queue configuration info
  467.   static class FakeQueueInfo {
  468.     String queueName;
  469.     float gc;
  470.     int reclaimTimeLimit;
  471.     boolean supportsPrio;
  472.     int ulMin;
  473.     public FakeQueueInfo(String queueName, float gc,
  474.         int reclaimTimeLimit, boolean supportsPrio, int ulMin) {
  475.       this.queueName = queueName;
  476.       this.gc = gc;
  477.       this.reclaimTimeLimit = reclaimTimeLimit;
  478.       this.supportsPrio = supportsPrio;
  479.       this.ulMin = ulMin;
  480.     }
  481.   }
  482.   
  483.   static class FakeResourceManagerConf extends CapacitySchedulerConf {
  484.   
  485.     // map of queue names to queue info
  486.     private Map<String, FakeQueueInfo> queueMap = 
  487.       new LinkedHashMap<String, FakeQueueInfo>();
  488.     String firstQueue;
  489.     
  490.     private long reclaimCapacityInterval = 1000;
  491.     
  492.     void setFakeQueues(List<FakeQueueInfo> queues) {
  493.       for (FakeQueueInfo q: queues) {
  494.         queueMap.put(q.queueName, q);
  495.       }
  496.       firstQueue = new String(queues.get(0).queueName);
  497.     }
  498.     
  499.     public synchronized Set<String> getQueues() {
  500.       return queueMap.keySet();
  501.     }
  502.     
  503.     /*public synchronized String getFirstQueue() {
  504.       return firstQueue;
  505.     }*/
  506.     
  507.     public float getGuaranteedCapacity(String queue) {
  508.       if(queueMap.get(queue).gc == -1) {
  509.         return super.getGuaranteedCapacity(queue);
  510.       }
  511.       return queueMap.get(queue).gc;
  512.     }
  513.     
  514.     public int getReclaimTimeLimit(String queue) {
  515.       return queueMap.get(queue).reclaimTimeLimit;
  516.     }
  517.     
  518.     public int getMinimumUserLimitPercent(String queue) {
  519.       return queueMap.get(queue).ulMin;
  520.     }
  521.     
  522.     public boolean isPrioritySupported(String queue) {
  523.       return queueMap.get(queue).supportsPrio;
  524.     }
  525.     
  526.     @Override
  527.     public long getSleepInterval() {
  528.       return 1;
  529.     }
  530.     
  531.     @Override
  532.     public int getMaxWorkerThreads() {
  533.       return 1;
  534.     }
  535.     
  536.     @Override
  537.     public long getReclaimCapacityInterval() {
  538.       return reclaimCapacityInterval ;
  539.     }
  540.     
  541.     @Override
  542.     public void setReclaimCapacityInterval(long value) {
  543.       this.reclaimCapacityInterval = value;
  544.     }
  545.   }
  546.   protected class FakeClock extends CapacityTaskScheduler.Clock {
  547.     private long time = 0;
  548.     
  549.     public void advance(long millis) {
  550.       time += millis;
  551.     }
  552.     @Override
  553.     long getTime() {
  554.       return time;
  555.     }
  556.   }
  557.   
  558.   protected JobConf conf;
  559.   protected CapacityTaskScheduler scheduler;
  560.   private FakeTaskTrackerManager taskTrackerManager;
  561.   private FakeResourceManagerConf resConf;
  562.   private FakeClock clock;
  563.   @Override
  564.   protected void setUp() {
  565.     setUp(2, 2, 1);
  566.   }
  567.   private void setUp(int numTaskTrackers, int numMapTasksPerTracker,
  568.       int numReduceTasksPerTracker) {
  569.     jobCounter = 0;
  570.     taskTrackerManager =
  571.         new FakeTaskTrackerManager(numTaskTrackers, numMapTasksPerTracker,
  572.             numReduceTasksPerTracker);
  573.     clock = new FakeClock();
  574.     scheduler = new CapacityTaskScheduler(clock);
  575.     scheduler.setTaskTrackerManager(taskTrackerManager);
  576.     conf = new JobConf();
  577.     // Don't let the JobInitializationPoller come in our way.
  578.     resConf = new FakeResourceManagerConf();
  579.     controlledInitializationPoller = new ControlledInitializationPoller(
  580.         scheduler.jobQueuesManager,
  581.         resConf,
  582.         resConf.getQueues());
  583.     scheduler.setInitializationPoller(controlledInitializationPoller);
  584.     scheduler.setConf(conf);
  585.   }
  586.   
  587.   @Override
  588.   protected void tearDown() throws Exception {
  589.     if (scheduler != null) {
  590.       scheduler.terminate();
  591.     }
  592.   }
  593.   private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
  594.     FakeJobInProgress job =
  595.         new FakeJobInProgress(new JobID("test", ++jobCounter),
  596.             (jobConf == null ? new JobConf() : jobConf), taskTrackerManager,
  597.             jobConf.getUser());
  598.     job.getStatus().setRunState(state);
  599.     taskTrackerManager.submitJob(job);
  600.     return job;
  601.   }
  602.   private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
  603.       throws IOException {
  604.     FakeJobInProgress j = submitJob(state, jobConf);
  605.     scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
  606.     return j;
  607.   }
  608.   private FakeJobInProgress submitJob(int state, int maps, int reduces, 
  609.       String queue, String user) throws IOException {
  610.     JobConf jobConf = new JobConf(conf);
  611.     jobConf.setNumMapTasks(maps);
  612.     jobConf.setNumReduceTasks(reduces);
  613.     if (queue != null)
  614.       jobConf.setQueueName(queue);
  615.     jobConf.setUser(user);
  616.     return submitJob(state, jobConf);
  617.   }
  618.   
  619.   // Submit a job and update the listeners
  620.   private FakeJobInProgress submitJobAndInit(int state, int maps, int reduces,
  621.                                              String queue, String user) 
  622.   throws IOException {
  623.     FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
  624.     scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
  625.     return j;
  626.   }
  627.   
  628.   // Note that there is no concept of setup tasks here. So init itself should 
  629.   // report the job-status change
  630.   private JobStatusChangeEvent initTasksAndReportEvent(FakeJobInProgress jip) 
  631.   throws IOException {
  632.     JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
  633.     jip.initTasks();
  634.     JobStatus newStatus = (JobStatus)jip.getStatus().clone();
  635.     return new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED, 
  636.                                     oldStatus, newStatus);
  637.   }
  638.   
  639.   // test job run-state change
  640.   public void testJobRunStateChange() throws IOException {
  641.     // start the scheduler
  642.     taskTrackerManager.addQueues(new String[] {"default"});
  643.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  644.     queues.add(new FakeQueueInfo("default", 100.0f, 1, true, 1));
  645.     resConf.setFakeQueues(queues);
  646.     scheduler.setResourceManagerConf(resConf);
  647.     scheduler.start();
  648.     
  649.     // submit the job
  650.     FakeJobInProgress fjob1 = 
  651.       submitJob(JobStatus.PREP, 1, 0, "default", "user");
  652.     
  653.     FakeJobInProgress fjob2 = 
  654.       submitJob(JobStatus.PREP, 1, 0, "default", "user");
  655.     
  656.     // test if changing the job priority/start-time works as expected in the 
  657.     // waiting queue
  658.     testJobOrderChange(fjob1, fjob2, true);
  659.     
  660.     // Init the jobs
  661.     // simulate the case where the job with a lower priority becomes running 
  662.     // first (may be because of the setup tasks).
  663.     
  664.     // init the lower ranked job first
  665.     JobChangeEvent event = initTasksAndReportEvent(fjob2);
  666.     
  667.     // inform the scheduler
  668.     scheduler.jobQueuesManager.jobUpdated(event);
  669.     
  670.     // init the higher ordered job later
  671.     event = initTasksAndReportEvent(fjob1);
  672.     
  673.     // inform the scheduler
  674.     scheduler.jobQueuesManager.jobUpdated(event);
  675.     
  676.     // check if the jobs are missing from the waiting queue
  677.     // The jobs are not removed from waiting queue until they are scheduled 
  678.     assertEquals("Waiting queue is garbled on job init", 2, 
  679.                  scheduler.jobQueuesManager.getWaitingJobs("default")
  680.                           .size());
  681.     
  682.     // test if changing the job priority/start-time works as expected in the 
  683.     // running queue
  684.     testJobOrderChange(fjob1, fjob2, false);
  685.     
  686.     // schedule a task
  687.     List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
  688.     
  689.     // complete the job
  690.     taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(), 
  691.                                   fjob1);
  692.     
  693.     // mark the job as complete
  694.     taskTrackerManager.finalizeJob(fjob1);
  695.     
  696.     Collection<JobInProgress> rqueue = 
  697.       scheduler.jobQueuesManager.getRunningJobQueue("default");
  698.     
  699.     // check if the job is removed from the scheduler
  700.     assertFalse("Scheduler contains completed job", 
  701.                 rqueue.contains(fjob1));
  702.     
  703.     // check if the running queue size is correct
  704.     assertEquals("Job finish garbles the queue", 
  705.                  1, rqueue.size());
  706.   }
  707.   
  708.   // test if the queue reflects the changes
  709.   private void testJobOrderChange(FakeJobInProgress fjob1, 
  710.                                   FakeJobInProgress fjob2, 
  711.                                   boolean waiting) {
  712.     String queueName = waiting ? "waiting" : "running";
  713.     
  714.     // check if the jobs in the queue are the right order
  715.     JobInProgress[] jobs = getJobsInQueue(waiting);
  716.     assertTrue(queueName + " queue doesnt contain job #1 in right order", 
  717.                 jobs[0].getJobID().equals(fjob1.getJobID()));
  718.     assertTrue(queueName + " queue doesnt contain job #2 in right order", 
  719.                 jobs[1].getJobID().equals(fjob2.getJobID()));
  720.     
  721.     // I. Check the start-time change
  722.     // Change job2 start-time and check if job2 bumps up in the queue 
  723.     taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
  724.     
  725.     jobs = getJobsInQueue(waiting);
  726.     assertTrue("Start time change didnt not work as expected for job #2 in "
  727.                + queueName + " queue", 
  728.                 jobs[0].getJobID().equals(fjob2.getJobID()));
  729.     assertTrue("Start time change didnt not work as expected for job #1 in"
  730.                + queueName + " queue", 
  731.                 jobs[1].getJobID().equals(fjob1.getJobID()));
  732.     
  733.     // check if the queue is fine
  734.     assertEquals("Start-time change garbled the " + queueName + " queue", 
  735.                  2, jobs.length);
  736.     
  737.     // II. Change job priority change
  738.     // Bump up job1's priority and make sure job1 bumps up in the queue
  739.     taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
  740.     
  741.     // Check if the priority changes are reflected
  742.     jobs = getJobsInQueue(waiting);
  743.     assertTrue("Priority change didnt not work as expected for job #1 in "
  744.                + queueName + " queue",  
  745.                 jobs[0].getJobID().equals(fjob1.getJobID()));
  746.     assertTrue("Priority change didnt not work as expected for job #2 in "
  747.                + queueName + " queue",  
  748.                 jobs[1].getJobID().equals(fjob2.getJobID()));
  749.     
  750.     // check if the queue is fine
  751.     assertEquals("Priority change has garbled the " + queueName + " queue", 
  752.                  2, jobs.length);
  753.     
  754.     // reset the queue state back to normal
  755.     taskTrackerManager.setStartTime(fjob1, fjob2.startTime - 1);
  756.     taskTrackerManager.setPriority(fjob1, JobPriority.NORMAL);
  757.   }
  758.   
  759.   private JobInProgress[] getJobsInQueue(boolean waiting) {
  760.     Collection<JobInProgress> queue = 
  761.       waiting 
  762.       ? scheduler.jobQueuesManager.getWaitingJobs("default")
  763.       : scheduler.jobQueuesManager.getRunningJobQueue("default");
  764.     return queue.toArray(new JobInProgress[0]);
  765.   }
  766.   
  767.   /*protected void submitJobs(int number, int state, int maps, int reduces)
  768.     throws IOException {
  769.     for (int i = 0; i < number; i++) {
  770.       submitJob(state, maps, reduces);
  771.     }
  772.   }*/
  773.   
  774.   // tests if tasks can be assinged when there are multiple jobs from a same
  775.   // user
  776.   public void testJobFinished() throws Exception {
  777.     taskTrackerManager.addQueues(new String[] {"default"});
  778.     
  779.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  780.     queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25));
  781.     resConf.setFakeQueues(queues);
  782.     scheduler.setResourceManagerConf(resConf);
  783.     scheduler.start();
  784.     // submit 2 jobs
  785.     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
  786.     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
  787.     
  788.     // I. Check multiple assignments with running tasks within job
  789.     // ask for a task from first job
  790.     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  791.     //  ask for another task from the first job
  792.     t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  793.     
  794.     // complete tasks
  795.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
  796.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
  797.     
  798.     // II. Check multiple assignments with running tasks across jobs
  799.     // ask for a task from first job
  800.     t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
  801.     
  802.     //  ask for a task from the second job
  803.     t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  804.     
  805.     // complete tasks
  806.     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
  807.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
  808.     
  809.     // III. Check multiple assignments with completed tasks across jobs
  810.     // ask for a task from the second job
  811.     t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
  812.     
  813.     // complete task
  814.     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
  815.     
  816.     // IV. Check assignment with completed job
  817.     // finish first job
  818.     scheduler.jobCompleted(j1);
  819.     
  820.     // ask for another task from the second job
  821.     // if tasks can be assigned then the structures are properly updated 
  822.     t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
  823.     
  824.     // complete task
  825.     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
  826.   }
  827.   
  828.   // basic tests, should be able to submit to queues
  829.   public void testSubmitToQueues() throws Exception {
  830.     // set up some queues
  831.     String[] qs = {"default", "q2"};
  832.     taskTrackerManager.addQueues(qs);
  833.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  834.     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
  835.     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
  836.     resConf.setFakeQueues(queues);
  837.     scheduler.setResourceManagerConf(resConf);
  838.     scheduler.start();
  839.     // submit a job with no queue specified. It should be accepted
  840.     // and given to the default queue. 
  841.     JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  842.     
  843.     // when we ask for a task, we should get one, from the job submitted
  844.     Task t;
  845.     t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  846.     // submit another job, to a different queue
  847.     j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
  848.     // now when we get a task, it should be from the second job
  849.     t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  850.   }
  851.   
  852.   public void testGetJobs() throws Exception {
  853.     // need only one queue
  854.     String[] qs = { "default" };
  855.     taskTrackerManager.addQueues(qs);
  856.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  857.     queues.add(new FakeQueueInfo("default", 100.0f, 300, true, 100));
  858.     resConf.setFakeQueues(queues);
  859.     scheduler.setResourceManagerConf(resConf);
  860.     scheduler.start();
  861.     HashMap<String, ArrayList<FakeJobInProgress>> subJobsList = 
  862.       submitJobs(1, 4, "default");
  863.    
  864.     JobQueuesManager mgr = scheduler.jobQueuesManager;
  865.     
  866.     while(mgr.getWaitingJobs("default").size() < 4){
  867.       Thread.sleep(1);
  868.     }
  869.     //Raise status change events for jobs submitted.
  870.     raiseStatusChangeEvents(mgr);
  871.     Collection<JobInProgress> jobs = scheduler.getJobs("default");
  872.     
  873.     assertTrue("Number of jobs returned by scheduler is wrong" 
  874.         ,jobs.size() == 4);
  875.     
  876.     assertTrue("Submitted jobs and Returned jobs are not same",
  877.         subJobsList.get("u1").containsAll(jobs));
  878.   }
  879.   
  880.   //Basic test to test GC allocation across the queues which have no
  881.   //GC configured.
  882.   
  883.   public void testGCAllocationToQueues() throws Exception {
  884.     String[] qs = {"default","q1","q2","q3","q4"};
  885.     taskTrackerManager.addQueues(qs);
  886.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  887.     queues.add(new FakeQueueInfo("default",25.0f,5000,true,25));
  888.     queues.add(new FakeQueueInfo("q1",-1.0f,5000,true,25));
  889.     queues.add(new FakeQueueInfo("q2",-1.0f,5000,true,25));
  890.     queues.add(new FakeQueueInfo("q3",-1.0f,5000,true,25));
  891.     queues.add(new FakeQueueInfo("q4",-1.0f,5000,true,25));
  892.     resConf.setFakeQueues(queues);
  893.     scheduler.setResourceManagerConf(resConf);
  894.     scheduler.start(); 
  895.     assertEquals(18.75f, resConf.getGuaranteedCapacity("q1"));
  896.     assertEquals(18.75f, resConf.getGuaranteedCapacity("q2"));
  897.     assertEquals(18.75f, resConf.getGuaranteedCapacity("q3"));
  898.     assertEquals(18.75f, resConf.getGuaranteedCapacity("q4"));
  899.   }
  900.   // Tests how GC is computed and assignment of tasks done
  901.   // on the basis of the GC.
  902.   public void testCapacityBasedAllocation() throws Exception {
  903.     // set up some queues
  904.     String[] qs = {"default", "q2"};
  905.     taskTrackerManager.addQueues(qs);
  906.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  907.     // set the gc % as 10%, so that gc will be zero initially as 
  908.     // the cluster capacity increase slowly.
  909.     queues.add(new FakeQueueInfo("default", 10.0f, 5000, true, 25));
  910.     queues.add(new FakeQueueInfo("q2", 90.0f, 5000, true, 25));
  911.     resConf.setFakeQueues(queues);
  912.     scheduler.setResourceManagerConf(resConf);
  913.     scheduler.start();
  914.    
  915.     // submit a job to the default queue
  916.     submitJobAndInit(JobStatus.PREP, 10, 0, "default", "u1");
  917.     
  918.     // submit a job to the second queue
  919.     submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
  920.     
  921.     // job from q2 runs first because it has some non-zero capacity.
  922.     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  923.     verifyGuaranteedCapacity("0", "default");
  924.     verifyGuaranteedCapacity("3", "q2");
  925.     
  926.     // add another tt to increase tt slots
  927.     taskTrackerManager.addTaskTracker("tt3");
  928.     checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
  929.     verifyGuaranteedCapacity("0", "default");
  930.     verifyGuaranteedCapacity("5", "q2");
  931.     
  932.     // add another tt to increase tt slots
  933.     taskTrackerManager.addTaskTracker("tt4");
  934.     checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
  935.     verifyGuaranteedCapacity("0", "default");
  936.     verifyGuaranteedCapacity("7", "q2");
  937.     
  938.     // add another tt to increase tt slots
  939.     taskTrackerManager.addTaskTracker("tt5");
  940.     // now job from default should run, as it is furthest away
  941.     // in terms of runningMaps / gc.
  942.     checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
  943.     verifyGuaranteedCapacity("1", "default");
  944.     verifyGuaranteedCapacity("9", "q2");
  945.   }
  946.   
  947.   private void verifyGuaranteedCapacity(String expectedCapacity, 
  948.                                           String queue) throws IOException {
  949.     String schedInfo = taskTrackerManager.getQueueManager().
  950.                           getSchedulerInfo(queue).toString();
  951.     assertTrue(schedInfo.contains("Map tasksnGuaranteed Capacity: " 
  952.         + expectedCapacity));
  953.   }
  954.   
  955.   // test capacity transfer
  956.   public void testCapacityTransfer() throws Exception {
  957.     // set up some queues
  958.     String[] qs = {"default", "q2"};
  959.     taskTrackerManager.addQueues(qs);
  960.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  961.     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
  962.     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
  963.     resConf.setFakeQueues(queues);
  964.     scheduler.setResourceManagerConf(resConf);
  965.     scheduler.start();
  966.     // submit a job  
  967.     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
  968.     // for queue 'q2', the GC for maps is 2. Since we're the only user, 
  969.     // we should get a task 
  970.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  971.     // I should get another map task. 
  972.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  973.     // Now we're at full capacity for maps. If I ask for another map task,
  974.     // I should get a map task from the default queue's capacity. 
  975.     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  976.     // and another
  977.     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
  978.   }
  979.   // test user limits
  980.   public void testUserLimits() throws Exception {
  981.     // set up some queues
  982.     String[] qs = {"default", "q2"};
  983.     taskTrackerManager.addQueues(qs);
  984.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  985.     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
  986.     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
  987.     resConf.setFakeQueues(queues);
  988.     scheduler.setResourceManagerConf(resConf);
  989.     scheduler.start();
  990.     // submit a job  
  991.     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
  992.     // for queue 'q2', the GC for maps is 2. Since we're the only user, 
  993.     // we should get a task 
  994.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  995.     // Submit another job, from a different user
  996.     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
  997.     // Now if I ask for a map task, it should come from the second job 
  998.     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  999.     // Now we're at full capacity for maps. If I ask for another map task,
  1000.     // I should get a map task from the default queue's capacity. 
  1001.     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  1002.     // and another
  1003.     checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
  1004.   }
  1005.   // test user limits when a 2nd job is submitted much after first job 
  1006.   public void testUserLimits2() throws Exception {
  1007.     // set up some queues
  1008.     String[] qs = {"default", "q2"};
  1009.     taskTrackerManager.addQueues(qs);
  1010.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1011.     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
  1012.     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
  1013.     resConf.setFakeQueues(queues);
  1014.     scheduler.setResourceManagerConf(resConf);
  1015.     scheduler.start();
  1016.     // submit a job  
  1017.     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
  1018.     // for queue 'q2', the GC for maps is 2. Since we're the only user, 
  1019.     // we should get a task 
  1020.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1021.     // since we're the only job, we get another map
  1022.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  1023.     // Submit another job, from a different user
  1024.     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
  1025.     // Now if I ask for a map task, it should come from the second job 
  1026.     checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  1027.     // and another
  1028.     checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
  1029.   }
  1030.   // test user limits when a 2nd job is submitted much after first job 
  1031.   // and we need to wait for first job's task to complete
  1032.   public void testUserLimits3() throws Exception {
  1033.     // set up some queues
  1034.     String[] qs = {"default", "q2"};
  1035.     taskTrackerManager.addQueues(qs);
  1036.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1037.     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
  1038.     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
  1039.     resConf.setFakeQueues(queues);
  1040.     scheduler.setResourceManagerConf(resConf);
  1041.     scheduler.start();
  1042.     // submit a job  
  1043.     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
  1044.     // for queue 'q2', the GC for maps is 2. Since we're the only user, 
  1045.     // we should get a task 
  1046.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1047.     // since we're the only job, we get another map
  1048.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  1049.     // we get two more maps from 'default queue'
  1050.     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  1051.     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
  1052.     // Submit another job, from a different user
  1053.     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
  1054.     // one of the task finishes
  1055.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
  1056.     // Now if I ask for a map task, it should come from the second job 
  1057.     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  1058.     // another task from job1 finishes, another new task to job2
  1059.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
  1060.     checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
  1061.     // now we have equal number of tasks from each job. Whichever job's
  1062.     // task finishes, that job gets a new task
  1063.     taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
  1064.     checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
  1065.     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
  1066.     checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
  1067.   }
  1068.   // test user limits with many users, more slots
  1069.   public void testUserLimits4() throws Exception {
  1070.     // set up one queue, with 10 slots
  1071.     String[] qs = {"default"};
  1072.     taskTrackerManager.addQueues(qs);
  1073.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1074.     queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25));
  1075.     resConf.setFakeQueues(queues);
  1076.     scheduler.setResourceManagerConf(resConf);
  1077.     scheduler.start();
  1078.     // add some more TTs 
  1079.     taskTrackerManager.addTaskTracker("tt3");
  1080.     taskTrackerManager.addTaskTracker("tt4");
  1081.     taskTrackerManager.addTaskTracker("tt5");
  1082.     // u1 submits job
  1083.     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  1084.     // it gets the first 5 slots
  1085.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1086.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  1087.     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  1088.     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
  1089.     checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
  1090.     // u2 submits job with 4 slots
  1091.     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
  1092.     // u2 should get next 4 slots
  1093.     checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
  1094.     checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
  1095.     checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
  1096.     checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
  1097.     // last slot should go to u1, since u2 has no more tasks
  1098.     checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
  1099.     // u1 finishes a task
  1100.     taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
  1101.     // u1 submits a few more jobs 
  1102.     // All the jobs are inited when submitted
  1103.     // because of addition of Eager Job Initializer all jobs in this
  1104.     //case would e initialised.
  1105.     submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  1106.     submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  1107.     submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  1108.     // u2 also submits a job
  1109.     submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
  1110.     // now u3 submits a job
  1111.     submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
  1112.     // next slot should go to u3, even though u2 has an earlier job, since
  1113.     // user limits have changed and u1/u2 are over limits
  1114.     checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
  1115.     // some other task finishes and u3 gets it
  1116.     taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
  1117.     checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
  1118.     // now, u2 finishes a task
  1119.     taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
  1120.     // next slot will go to u1, since u3 has nothing to run and u1's job is 
  1121.     // first in the queue
  1122.     checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
  1123.   }
  1124.   // test code to reclaim capacity
  1125.   public void testReclaimCapacity() throws Exception {
  1126.     // set up some queues
  1127.     String[] qs = {"default", "q2", "q3"};
  1128.     taskTrackerManager.addQueues(qs);
  1129.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1130.     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
  1131.     queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
  1132.     queues.add(new FakeQueueInfo("q3", 25.0f, 1000, true, 25));
  1133.     resConf.setFakeQueues(queues);
  1134.     resConf.setReclaimCapacityInterval(500);
  1135.     scheduler.setResourceManagerConf(resConf);
  1136.     scheduler.start();
  1137.     // set up a situation where q2 is under capacity, and default & q3
  1138.     // are at/over capacity
  1139.     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  1140.     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
  1141.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1142.     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  1143.     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  1144.     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  1145.     // now submit a job to q2
  1146.     FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
  1147.     // get scheduler to notice that q2 needs to reclaim
  1148.     scheduler.reclaimCapacity();
  1149.     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
  1150.     // we start reclaiming when 15 secs are left. 
  1151.     clock.advance(400000);
  1152.     scheduler.reclaimCapacity();
  1153.     // no tasks should have been killed yet
  1154.     assertEquals(j1.runningMapTasks, 3);
  1155.     assertEquals(j2.runningMapTasks, 1);
  1156.     clock.advance(200000);
  1157.     scheduler.reclaimCapacity();
  1158.     // task from j1 will be killed
  1159.     assertEquals(j1.runningMapTasks, 2);
  1160.     assertEquals(j2.runningMapTasks, 1);
  1161.     
  1162.   }
  1163.   // test code to reclaim multiple capacity 
  1164.   public void testReclaimCapacity2() throws Exception {
  1165.     // set up some queues
  1166.     String[] qs = {"default", "q2", "q3", "q4"};
  1167.     taskTrackerManager.addQueues(qs);
  1168.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1169.     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
  1170.     queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25));
  1171.     queues.add(new FakeQueueInfo("q3", 20.0f, 1000, true, 25));
  1172.     queues.add(new FakeQueueInfo("q4", 10.0f, 1000, true, 25));
  1173.     resConf.setFakeQueues(queues);
  1174.     resConf.setReclaimCapacityInterval(500);
  1175.     scheduler.setResourceManagerConf(resConf);
  1176.     scheduler.start();
  1177.     
  1178.     // add some more TTs so our total map capacity is 10
  1179.     taskTrackerManager.addTaskTracker("tt3");
  1180.     taskTrackerManager.addTaskTracker("tt4");
  1181.     taskTrackerManager.addTaskTracker("tt5");
  1182.     // q2 has nothing running, default is under cap, q3 and q4 are over cap
  1183.     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 2, 2, null, "u1");
  1184.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1185.     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
  1186.     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  1187.     FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q4", "u1");
  1188.     checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
  1189.     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  1190.     checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
  1191.     checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
  1192.     checkAssignment("tt4", "attempt_test_0003_m_000002_0 on tt4");
  1193.     checkAssignment("tt4", "attempt_test_0002_m_000004_0 on tt4");
  1194.     checkAssignment("tt5", "attempt_test_0002_m_000005_0 on tt5");
  1195.     checkAssignment("tt5", "attempt_test_0003_m_000003_0 on tt5");
  1196.     // at this point, q3 is running 5 tasks (with a cap of 2), q4 is
  1197.     // running 3 tasks (with a cap of 1). 
  1198.     // If we submit a job to 'default', we need to get 3 slots back. 
  1199.     FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  1200.     // get scheduler to notice that q2 needs to reclaim
  1201.     scheduler.reclaimCapacity();
  1202.     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
  1203.     // we start reclaiming when 15 secs are left. 
  1204.     clock.advance(400000);
  1205.     scheduler.reclaimCapacity();
  1206.     // nothing should have happened
  1207.     assertEquals(j2.runningMapTasks, 5);
  1208.     assertEquals(j3.runningMapTasks, 3);
  1209.     // 3 tasks to kill, 5 running over cap. q3 should give up 3*3/5 = 2 slots.
  1210.     // q4 should give up 2*3/5 = 1 slot. 
  1211.     clock.advance(200000);
  1212.     scheduler.reclaimCapacity();
  1213.     assertEquals(j2.runningMapTasks, 3);
  1214.     assertEquals(j3.runningMapTasks, 2);
  1215.     
  1216.   }
  1217.   // test code to reclaim capacity when the cluster is completely occupied
  1218.   public void testReclaimCapacityWithFullCluster() throws Exception {
  1219.     // set up some queues
  1220.     String[] qs = {"default", "queue"};
  1221.     taskTrackerManager.addQueues(qs);
  1222.     int maxSlots = taskTrackerManager.maxMapTasksPerTracker 
  1223.                    * taskTrackerManager.taskTrackers().size();
  1224.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1225.     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
  1226.     queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
  1227.     resConf.setFakeQueues(queues);
  1228.     resConf.setReclaimCapacityInterval(500);
  1229.     scheduler.setResourceManagerConf(resConf);
  1230.     scheduler.start();
  1231.     
  1232.     // now submit 1 job to queue "default" which should take up the cluster
  1233.     FakeJobInProgress j1 = 
  1234.       submitJobAndInit(JobStatus.PREP, maxSlots, 0, "default", "u1");
  1235.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1236.     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
  1237.     checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
  1238.     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
  1239.     
  1240.     // now submit a job to queue "queue"
  1241.     submitJobAndInit(JobStatus.PREP, maxSlots, 0, "queue", "u2");
  1242.     
  1243.     scheduler.reclaimCapacity();
  1244.     
  1245.     clock.advance(scheduler.schedConf.getReclaimTimeLimit("default") * 1000);
  1246.     
  1247.     scheduler.reclaimCapacity();
  1248.     
  1249.     // check if the tasks are killed 
  1250.     assertEquals("Failed to reclaim tasks", j1.runningMapTasks, 2);
  1251.   }
  1252.   
  1253.   // test code to reclaim capacity in steps
  1254.   public void testReclaimCapacityInSteps() throws Exception {
  1255.     // set up some queues
  1256.     String[] qs = {"default", "q2"};
  1257.     taskTrackerManager.addQueues(qs);
  1258.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1259.     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
  1260.     queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
  1261.     resConf.setFakeQueues(queues);
  1262.     resConf.setReclaimCapacityInterval(500);
  1263.     scheduler.setResourceManagerConf(resConf);
  1264.     scheduler.start();
  1265.     // set up a situation where q2 is under capacity, and default is
  1266.     // at/over capacity
  1267.     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  1268.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1269.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  1270.     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  1271.     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
  1272.     // now submit a job to q2
  1273.     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
  1274.     // get scheduler to notice that q2 needs to reclaim
  1275.     scheduler.reclaimCapacity();
  1276.     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
  1277.     // we start reclaiming when 15 secs are left. 
  1278.     clock.advance(400000);
  1279.     // submit another job to q2 which causes more capacity to be reclaimed
  1280.     j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
  1281.     clock.advance(200000);
  1282.     scheduler.reclaimCapacity();
  1283.     // one task from j1 will be killed
  1284.     assertEquals(j1.runningMapTasks, 3);
  1285.     clock.advance(300000);
  1286.     scheduler.reclaimCapacity();
  1287.     // timer for 2nd job hasn't fired, so nothing killed
  1288.     assertEquals(j1.runningMapTasks, 3);
  1289.     clock.advance(400000);
  1290.     scheduler.reclaimCapacity();
  1291.     // one task from j1 will be killed
  1292.     assertEquals(j1.runningMapTasks, 2);
  1293.     
  1294.   }
  1295.   
  1296.   /*
  1297.    * Test case for checking the reclaim capacity with uninitalized jobs.
  1298.    * 
  1299.    * Configure 2 queue with capacity scheduler.
  1300.    * 
  1301.    * Submit a single job to the default queue and make it go above the gc
  1302.    * of the queue.
  1303.    * 
  1304.    * Then submit another job to the second queue but don't initialize it.
  1305.    * 
  1306.    * Run reclaim capacity thread for the scheduler, in order to let scheduler
  1307.    * know that it has to reclaim capacity.
  1308.    * 
  1309.    * Advance the scheduler clock by appropriate milliseconds.
  1310.    * 
  1311.    * Run scheduler.reclaimCapacity() to kill the appropriate tasks.
  1312.    * 
  1313.    * Check running task count of the running job.
  1314.    * 
  1315.    */
  1316.   public void testReclaimCapacityWithUninitializedJobs() throws IOException {
  1317.     String[] qs = {"default", "q2"};
  1318.     taskTrackerManager.addQueues(qs);
  1319.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1320.     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
  1321.     queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
  1322.     resConf.setFakeQueues(queues);
  1323.     scheduler.setResourceManagerConf(resConf);
  1324.     scheduler.start();
  1325.     //Submit one job to the default queue and get the capacity over the 
  1326.     //gc of the particular queue.
  1327.     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  1328.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1329.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  1330.     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  1331.     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
  1332.     
  1333.     //Submit another job to the second queue but not initialize it.
  1334.     submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
  1335.     
  1336.     //call scheduler's reclaim capacity in order to start reclaim capacity
  1337.     //process.
  1338.     scheduler.reclaimCapacity();
  1339.     //advance the clock to the position when the two task of the job would
  1340.     //be killed.
  1341.     clock.advance(600000);
  1342.     //run reclaim capacity
  1343.     scheduler.reclaimCapacity();
  1344.     //check the count of the running tasks.
  1345.     assertEquals(j1.runningMapTasks, 2);
  1346.     
  1347.   }
  1348.   
  1349.   // test code to reclaim capacity with one queue haveing zero GC 
  1350.   // (HADOOP-4988). 
  1351.   // Simple test: reclaim capacity should work even if one of the 
  1352.   // queues has a gc of 0. 
  1353.   public void testReclaimCapacityWithZeroGC() throws Exception {
  1354.     // set up some queues
  1355.     String[] qs = {"default", "q2", "q3"};
  1356.     taskTrackerManager.addQueues(qs);
  1357.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1358.     // we want q3 to have 0 GC. Map slots = 4. 
  1359.     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
  1360.     queues.add(new FakeQueueInfo("q2", 40.0f, 1000, true, 25));
  1361.     queues.add(new FakeQueueInfo("q3", 10.0f, 1000, true, 25));
  1362.     // note: because of the way we convert gc% into actual gc, q2's gc
  1363.     // will be 1, not 2. 
  1364.     resConf.setFakeQueues(queues);
  1365.     resConf.setReclaimCapacityInterval(500);
  1366.     scheduler.setResourceManagerConf(resConf);
  1367.     scheduler.start();
  1368.     // set up a situation where q2 is under capacity, and default 
  1369.     // is over capacity
  1370.     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
  1371.     //FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
  1372.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1373.     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
  1374.     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
  1375.     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
  1376.     // now submit a job to q2
  1377.     FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
  1378.     // get scheduler to notice that q2 needs to reclaim
  1379.     scheduler.reclaimCapacity();
  1380.     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
  1381.     // we start reclaiming when 15 secs are left. 
  1382.     clock.advance(400000);
  1383.     scheduler.reclaimCapacity();
  1384.     // no tasks should have been killed yet
  1385.     assertEquals(j1.runningMapTasks, 4);
  1386.     clock.advance(200000);
  1387.     scheduler.reclaimCapacity();
  1388.     // task from j1 will be killed
  1389.     assertEquals(j1.runningMapTasks, 3);
  1390.     
  1391.   }
  1392.   /*
  1393.    * Following is the testing strategy for testing scheduling information.
  1394.    * - start capacity scheduler with two queues.
  1395.    * - check the scheduling information with respect to the configuration
  1396.    * which was used to configure the queues.
  1397.    * - Submit 5 jobs to a queue.
  1398.    * - Check the waiting jobs count, it should be 5.
  1399.    * - Then run initializationPoller()
  1400.    * - Check once again the waiting queue, it should be 5 jobs again.
  1401.    * - Then raise status change events.
  1402.    * - Assign one task to a task tracker. (Map)
  1403.    * - Check waiting job count, it should be 4 now and used map (%) = 100
  1404.    * - Assign another one task (Reduce)
  1405.    * - Check waiting job count, it should be 4 now and used map (%) = 100
  1406.    * and used reduce (%) = 100
  1407.    * - finish the job and then check the used percentage it should go
  1408.    * back to zero
  1409.    * - Then pick an initialized job but not scheduled job and fail it.
  1410.    * - Run the poller
  1411.    * - Check the waiting job count should now be 3.
  1412.    * - Now fail a job which has not been initialized at all.
  1413.    * - Run the poller, so that it can clean up the job queue.
  1414.    * - Check the count, the waiting job count should be 2.
  1415.    * - Now raise status change events to move the initialized jobs which 
  1416.    * should be two in count to running queue.
  1417.    * - Then schedule a map of the job in running queue. 
  1418.    * - Run the poller because the poller is responsible for waiting
  1419.    * jobs count. Check the count, it should be using 100% map and one
  1420.    * waiting job
  1421.    * - fail the running job.
  1422.    * - Check the count, it should be now one waiting job and zero running
  1423.    * tasks
  1424.    */
  1425.   
  1426.   public void testSchedulingInformation() throws Exception {
  1427.     String[] qs = {"default", "q2"};
  1428.     taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
  1429.     scheduler.setTaskTrackerManager(taskTrackerManager);
  1430.     taskTrackerManager.addQueues(qs);
  1431.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1432.     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
  1433.     queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
  1434.     resConf.setFakeQueues(queues);
  1435.     scheduler.setResourceManagerConf(resConf);
  1436.     scheduler.start();
  1437.     scheduler.assignTasks(tracker("tt1")); // heartbeat
  1438.     scheduler.assignTasks(tracker("tt2")); // heartbeat
  1439.     int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
  1440.     int totalReduces = taskTrackerManager.getClusterStatus().getMaxReduceTasks();
  1441.     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
  1442.     String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1443.     String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
  1444.     String[] infoStrings = schedulingInfo.split("n");
  1445.     
  1446.     assertEquals(infoStrings.length, 17);
  1447.     assertEquals(infoStrings[1] , "Guaranteed Capacity Percentage: 50.0%");
  1448.     assertEquals(infoStrings[7] , "Guaranteed Capacity: " + totalMaps * 50/100);
  1449.     assertEquals(infoStrings[11] , "Guaranteed Capacity: " + totalReduces * 50/100);
  1450.     assertEquals(infoStrings[2] , "User Limit: 25%");
  1451.     assertEquals(infoStrings[3] , "Reclaim Time limit: " + 
  1452.         StringUtils.formatTime(1000000));
  1453.     assertEquals(infoStrings[4] , "Priority Supported: YES");
  1454.     assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
  1455.     assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
  1456.     assertEquals(infoStrings[15] , "Number of Waiting Jobs: 0");
  1457.     assertEquals(infoStrings[16] , "Number of users who have submitted jobs: 0");
  1458.     assertEquals(schedulingInfo, schedulingInfo2);
  1459.     
  1460.     //Testing with actual job submission.
  1461.     ArrayList<FakeJobInProgress> userJobs = 
  1462.       submitJobs(1, 5, "default").get("u1");
  1463.     schedulingInfo = 
  1464.       queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1465.     infoStrings = schedulingInfo.split("n");
  1466.     
  1467.     //waiting job should be equal to number of jobs submitted.
  1468.     assertEquals(infoStrings.length, 17);
  1469.     assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
  1470.     assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
  1471.     assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
  1472.     
  1473.     //Initalize the jobs but don't raise events
  1474.     controlledInitializationPoller.selectJobsToInitialize();
  1475.     
  1476.     schedulingInfo = 
  1477.       queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1478.     infoStrings = schedulingInfo.split("n");
  1479.     assertEquals(infoStrings.length, 17);
  1480.     //should be previous value as nothing is scheduled because no events
  1481.     //has been raised after initialization.
  1482.     assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
  1483.     assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
  1484.     assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
  1485.     
  1486.     //Raise status change event so that jobs can move to running queue.
  1487.     raiseStatusChangeEvents(scheduler.jobQueuesManager);
  1488.     //assign one job
  1489.     Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1490.     //Initalize extra job.
  1491.     controlledInitializationPoller.selectJobsToInitialize();
  1492.     
  1493.     //Get scheduling information, now the number of waiting job should have
  1494.     //changed to 4 as one is scheduled and has become running.
  1495.     // make sure we update our stats
  1496.     scheduler.updateQSIInfoForTests();
  1497.     schedulingInfo = 
  1498.       queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1499.     infoStrings = schedulingInfo.split("n");
  1500.     assertEquals(infoStrings.length, 19);
  1501.     assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
  1502.     assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
  1503.     assertEquals(infoStrings[17] , "Number of Waiting Jobs: 4");
  1504.     
  1505.     //assign a reduce task
  1506.     Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  1507.     // make sure we update our stats
  1508.     scheduler.updateQSIInfoForTests();
  1509.     schedulingInfo = 
  1510.       queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1511.     infoStrings = schedulingInfo.split("n");
  1512.     assertEquals(infoStrings.length, 21);
  1513.     assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
  1514.     assertEquals(infoStrings[14],"Running tasks: 100.0% of Guaranteed Capacity");
  1515.     assertEquals(infoStrings[19] , "Number of Waiting Jobs: 4");
  1516.     
  1517.     //Complete the job and check the running tasks count
  1518.     FakeJobInProgress u1j1 = userJobs.get(0);
  1519.     taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1);
  1520.     taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
  1521.     taskTrackerManager.finalizeJob(u1j1);
  1522.     
  1523.     // make sure we update our stats
  1524.     scheduler.updateQSIInfoForTests();
  1525.     schedulingInfo = 
  1526.       queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1527.     infoStrings = schedulingInfo.split("n");
  1528.     assertEquals(infoStrings.length, 17);
  1529.     assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
  1530.     assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
  1531.     assertEquals(infoStrings[15] , "Number of Waiting Jobs: 4");
  1532.     
  1533.     //Fail a job which is initialized but not scheduled and check the count.
  1534.     FakeJobInProgress u1j2 = userJobs.get(1);
  1535.     assertTrue("User1 job 2 not initalized ", 
  1536.         u1j2.getStatus().getRunState() == JobStatus.RUNNING);
  1537.     taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
  1538.     //Run initializer to clean up failed jobs
  1539.     controlledInitializationPoller.selectJobsToInitialize();
  1540.     // make sure we update our stats
  1541.     scheduler.updateQSIInfoForTests();
  1542.     schedulingInfo = 
  1543.       queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1544.     infoStrings = schedulingInfo.split("n");
  1545.     assertEquals(infoStrings.length, 17);
  1546.     assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
  1547.     assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
  1548.     assertEquals(infoStrings[15] , "Number of Waiting Jobs: 3");
  1549.     
  1550.     //Fail a job which is not initialized but is in the waiting queue.
  1551.     FakeJobInProgress u1j5 = userJobs.get(4);
  1552.     assertFalse("User1 job 5 initalized ", 
  1553.         u1j5.getStatus().getRunState() == JobStatus.RUNNING);
  1554.     
  1555.     taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
  1556.     //run initializer to clean up failed job
  1557.     controlledInitializationPoller.selectJobsToInitialize();
  1558.     // make sure we update our stats
  1559.     scheduler.updateQSIInfoForTests();
  1560.     schedulingInfo = 
  1561.       queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1562.     infoStrings = schedulingInfo.split("n");
  1563.     assertEquals(infoStrings.length, 17);
  1564.     assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
  1565.     assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
  1566.     assertEquals(infoStrings[15] , "Number of Waiting Jobs: 2");
  1567.     
  1568.     //Raise status change events as none of the intialized jobs would be
  1569.     //in running queue as we just failed the second job which was initialized
  1570.     //and completed the first one.
  1571.     raiseStatusChangeEvents(scheduler.jobQueuesManager);
  1572.     
  1573.     //Now schedule a map should be job3 of the user as job1 succeeded job2
  1574.     //failed and now job3 is running
  1575.     t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
  1576.     FakeJobInProgress u1j3 = userJobs.get(2);
  1577.     assertTrue("User Job 3 not running ", 
  1578.         u1j3.getStatus().getRunState() == JobStatus.RUNNING);
  1579.     
  1580.     //now the running count of map should be one and waiting jobs should be
  1581.     //one. run the poller as it is responsible for waiting count
  1582.     controlledInitializationPoller.selectJobsToInitialize();
  1583.     // make sure we update our stats
  1584.     scheduler.updateQSIInfoForTests();
  1585.     schedulingInfo = 
  1586.       queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1587.     infoStrings = schedulingInfo.split("n");
  1588.     assertEquals(infoStrings.length, 19);
  1589.     assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
  1590.     assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
  1591.     assertEquals(infoStrings[17] , "Number of Waiting Jobs: 1");
  1592.     
  1593.     //Fail the executing job
  1594.     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
  1595.     // make sure we update our stats
  1596.     scheduler.updateQSIInfoForTests();
  1597.     //Now running counts should become zero
  1598.     schedulingInfo = 
  1599.       queueManager.getJobQueueInfo("default").getSchedulingInfo();
  1600.     infoStrings = schedulingInfo.split("n");
  1601.     assertEquals(infoStrings.length, 17);
  1602.     assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
  1603.     assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
  1604.     assertEquals(infoStrings[15] , "Number of Waiting Jobs: 1");
  1605.     
  1606.   }
  1607.   /**
  1608.    * Test to verify that highMemoryJobs are scheduled like all other jobs when
  1609.    * memory-based scheduling is not enabled.
  1610.    * @throws IOException
  1611.    */
  1612.   public void testDisabledMemoryBasedScheduling()
  1613.       throws IOException {
  1614.     LOG.debug("Starting the scheduler.");
  1615.     taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
  1616.     // Limited TT - 1GB vmem and 512MB pmem
  1617.     taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
  1618.         .setTotalVirtualMemory(1 * 1024 * 1024 * 1024L);
  1619.     taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
  1620.         .setTotalPhysicalMemory(512 * 1024 * 1024L);
  1621.     taskTrackerManager.addQueues(new String[] { "default" });
  1622.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1623.     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
  1624.     resConf.setFakeQueues(queues);
  1625.     scheduler.setResourceManagerConf(resConf);
  1626.     scheduler.setTaskTrackerManager(taskTrackerManager);
  1627.     // memory-based scheduling disabled by default.
  1628.     scheduler.start();
  1629.     LOG.debug("Submit one high memory(3GB vmem, 1GBpmem) job of 1 map task "
  1630.         + "and 1 reduce task.");
  1631.     JobConf jConf = new JobConf();
  1632.     jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
  1633.     jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
  1634.     jConf.setNumMapTasks(1);
  1635.     jConf.setNumReduceTasks(1);
  1636.     jConf.setQueueName("default");
  1637.     jConf.setUser("u1");
  1638.     submitJobAndInit(JobStatus.RUNNING, jConf);
  1639.     // assert that all tasks are launched even though they transgress the
  1640.     // scheduling limits.
  1641.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1642.     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  1643.   }
  1644.   /**
  1645.    * Test to verify that highPmemJobs are scheduled like all other jobs when
  1646.    * physical-memory based scheduling is not enabled.
  1647.    * @throws IOException
  1648.    */
  1649.   public void testDisabledPmemBasedScheduling()
  1650.       throws IOException {
  1651.     LOG.debug("Starting the scheduler.");
  1652.     taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
  1653.     // Limited TT - 100GB vmem and 500MB pmem
  1654.     TaskTrackerStatus.ResourceStatus ttStatus =
  1655.         taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
  1656.     ttStatus.setTotalVirtualMemory(100 * 1024 * 1024 * 1024L);
  1657.     ttStatus.setReservedVirtualMemory(0);
  1658.     ttStatus.setTotalPhysicalMemory(500 * 1024 * 1024L);
  1659.     ttStatus.setReservedPhysicalMemory(0);
  1660.     taskTrackerManager.addQueues(new String[] { "default" });
  1661.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1662.     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
  1663.     resConf.setFakeQueues(queues);
  1664.     scheduler.setResourceManagerConf(resConf);
  1665.     scheduler.setTaskTrackerManager(taskTrackerManager);
  1666.     // enable vmem-based scheduling. pmem based scheduling disabled by default.
  1667.     scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  1668.         1536 * 1024 * 1024L);
  1669.     scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  1670.         3 * 1024 * 1024 * 1024L);
  1671.     scheduler.start();
  1672.     LOG.debug("Submit one high pmem(3GB vmem, 1GBpmem) job of 1 map task "
  1673.         + "and 1 reduce task.");
  1674.     JobConf jConf = new JobConf();
  1675.     jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
  1676.     jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
  1677.     jConf.setNumMapTasks(1);
  1678.     jConf.setNumReduceTasks(1);
  1679.     jConf.setQueueName("default");
  1680.     jConf.setUser("u1");
  1681.     submitJobAndInit(JobStatus.RUNNING, jConf);
  1682.     // assert that all tasks are launched even though they transgress the
  1683.     // scheduling limits.
  1684.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1685.     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  1686.   }
  1687.   /**
  1688.    * Test HighMemoryJobs.
  1689.    * @throws IOException
  1690.    */
  1691.   public void testHighMemoryJobs()
  1692.       throws IOException {
  1693.     LOG.debug("Starting the scheduler.");
  1694.     taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
  1695.     TaskTrackerStatus.ResourceStatus ttStatus =
  1696.         taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
  1697.     ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
  1698.     ttStatus.setReservedVirtualMemory(0);
  1699.     ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
  1700.     ttStatus.setReservedPhysicalMemory(0);
  1701.     // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
  1702.     taskTrackerManager.addQueues(new String[] { "default" });
  1703.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1704.     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
  1705.     resConf.setFakeQueues(queues);
  1706.     scheduler.setTaskTrackerManager(taskTrackerManager);
  1707.     // enabled memory-based scheduling
  1708.     scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  1709.         1536 * 1024 * 1024L);
  1710.     scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  1711.         3 * 1024 * 1024 * 1024L);
  1712.     resConf.setDefaultPercentOfPmemInVmem(33.3f);
  1713.     resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
  1714.     scheduler.setResourceManagerConf(resConf);
  1715.     scheduler.start();
  1716.     LOG.debug("Submit one high memory(1600MB vmem, 400MB pmem) job of "
  1717.         + "1 map task and 1 reduce task.");
  1718.     JobConf jConf = new JobConf();
  1719.     jConf.setMaxVirtualMemoryForTask(1600 * 1024 * 1024L); // 1.6GB vmem
  1720.     jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
  1721.     jConf.setNumMapTasks(1);
  1722.     jConf.setNumReduceTasks(1);
  1723.     jConf.setQueueName("default");
  1724.     jConf.setUser("u1");
  1725.     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
  1726.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1727.     // No more tasks of this job can run on the TT because of lack of vmem
  1728.     assertNull(scheduler.assignTasks(tracker("tt1")));
  1729.     // Let attempt_test_0001_m_000001_0 finish, task assignment should succeed.
  1730.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
  1731.     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  1732.     LOG.debug("Submit another high memory(1200MB vmem, 800MB pmem) job of "
  1733.         + "1 map task and 0 reduces.");
  1734.     jConf.setMaxVirtualMemoryForTask(1200 * 1024 * 1024L);
  1735.     jConf.setMaxPhysicalMemoryForTask(800 * 1024 * 1024L);
  1736.     jConf.setNumMapTasks(1);
  1737.     jConf.setNumReduceTasks(0);
  1738.     jConf.setQueueName("default");
  1739.     jConf.setUser("u1");
  1740.     submitJobAndInit(JobStatus.PREP, jConf); // job2
  1741.     // This job shouldn't run the TT now because of lack of pmem
  1742.     assertNull(scheduler.assignTasks(tracker("tt1")));
  1743.     // Let attempt_test_0001_m_000002_0 finish, task assignment should succeed.
  1744.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
  1745.     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
  1746.     LOG.debug("Submit a normal memory(200MB vmem, 100MB pmem) job of "
  1747.         + "0 maps and 1 reduce task.");
  1748.     jConf.setMaxVirtualMemoryForTask(200 * 1024 * 1024L);
  1749.     jConf.setMaxPhysicalMemoryForTask(100 * 1024 * 1024L);
  1750.     jConf.setNumMapTasks(0);
  1751.     jConf.setNumReduceTasks(1);
  1752.     jConf.setQueueName("default");
  1753.     jConf.setUser("u1");
  1754.     submitJobAndInit(JobStatus.PREP, jConf); // job3
  1755.     checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
  1756.   }
  1757.   /**
  1758.    * Test HADOOP-4979. 
  1759.    * Bug fix for making sure we always return null to TT if there is a 
  1760.    * high-mem job, and not look at reduce jobs (if map tasks are high-mem)
  1761.    * or vice-versa.
  1762.    * @throws IOException
  1763.    */
  1764.   public void testHighMemoryBlocking()
  1765.       throws IOException {
  1766.     // 2 map and 1 reduce slots
  1767.     taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
  1768.     TaskTrackerStatus.ResourceStatus ttStatus =
  1769.         taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
  1770.     ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
  1771.     ttStatus.setReservedVirtualMemory(0);
  1772.     ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
  1773.     ttStatus.setReservedPhysicalMemory(0);
  1774.     // Normal job on this TT would be 1GB vmem, 0.5GB pmem
  1775.     taskTrackerManager.addQueues(new String[] { "default" });
  1776.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1777.     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
  1778.     resConf.setFakeQueues(queues);
  1779.     scheduler.setTaskTrackerManager(taskTrackerManager);
  1780.     // enabled memory-based scheduling
  1781.     scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  1782.         1 * 1024 * 1024 * 1024L);
  1783.     scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  1784.         3 * 1024 * 1024 * 1024L);
  1785.     resConf.setDefaultPercentOfPmemInVmem(33.3f);
  1786.     resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
  1787.     scheduler.setResourceManagerConf(resConf);
  1788.     scheduler.start();
  1789.     // We need a situation where the scheduler needs to run a map task, 
  1790.     // but the available one has a high-mem requirement. There should
  1791.     // be another job whose maps or reduces can run, but they shouldn't 
  1792.     // be scheduled.
  1793.     
  1794.     LOG.debug("Submit one high memory(2GB vmem, 400MB pmem) job of "
  1795.         + "2 map tasks");
  1796.     JobConf jConf = new JobConf();
  1797.     jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
  1798.     jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
  1799.     jConf.setNumMapTasks(2);
  1800.     jConf.setNumReduceTasks(0);
  1801.     jConf.setQueueName("default");
  1802.     jConf.setUser("u1");
  1803.     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
  1804.     LOG.debug("Submit another regular memory(900MB vmem, 200MB pmem) job of "
  1805.         + "2 map/red tasks");
  1806.     jConf = new JobConf();
  1807.     jConf.setMaxVirtualMemoryForTask(900 * 1024 * 1024L); // 900MB vmem
  1808.     jConf.setMaxPhysicalMemoryForTask(200 * 1024 * 1024L); // 200MB pmem
  1809.     jConf.setNumMapTasks(2);
  1810.     jConf.setNumReduceTasks(2);
  1811.     jConf.setQueueName("default");
  1812.     jConf.setUser("u1");
  1813.     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
  1814.     
  1815.     // first, a map from j1 will run
  1816.     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  1817.     // at this point, the scheduler tries to schedule another map from j1. 
  1818.     // there isn't enough space. There is space to run the second job's
  1819.     // map or reduce task, but they shouldn't be scheduled
  1820.     assertNull(scheduler.assignTasks(tracker("tt1")));
  1821.   }
  1822.   
  1823.   /**
  1824.    * test invalid highMemoryJobs
  1825.    * @throws IOException
  1826.    */
  1827.   public void testHighMemoryJobWithInvalidRequirements()
  1828.       throws IOException {
  1829.     LOG.debug("Starting the scheduler.");
  1830.     taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
  1831.     TaskTrackerStatus.ResourceStatus ttStatus =
  1832.         taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
  1833.     ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024);
  1834.     ttStatus.setReservedVirtualMemory(0);
  1835.     ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
  1836.     ttStatus.setReservedPhysicalMemory(0);
  1837.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1838.     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
  1839.     taskTrackerManager.addQueues(new String[] { "default" });
  1840.     resConf.setFakeQueues(queues);
  1841.     scheduler.setTaskTrackerManager(taskTrackerManager);
  1842.     // enabled memory-based scheduling
  1843.     long vmemUpperLimit = 1 * 1024 * 1024 * 1024L;
  1844.     long vmemDefault = 1536 * 1024 * 1024L;
  1845.     long pmemUpperLimit = vmemUpperLimit;
  1846.     scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  1847.         vmemDefault);
  1848.     scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  1849.         vmemUpperLimit);
  1850.     resConf.setDefaultPercentOfPmemInVmem(33.3f);
  1851.     resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
  1852.     scheduler.setResourceManagerConf(resConf);
  1853.     scheduler.start();
  1854.     LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
  1855.         + "1 map, 0 reduce tasks.");
  1856.     long jobMaxVmem = 5 * 1024 * 1024 * 1024L;
  1857.     long jobMaxPmem = 3 * 1024 * 1024 * 1024L;
  1858.     JobConf jConf = new JobConf();
  1859.     jConf.setMaxVirtualMemoryForTask(jobMaxVmem);
  1860.     jConf.setMaxPhysicalMemoryForTask(jobMaxPmem);
  1861.     jConf.setNumMapTasks(1);
  1862.     jConf.setNumReduceTasks(0);
  1863.     jConf.setQueueName("default");
  1864.     jConf.setUser("u1");
  1865.     boolean throwsException = false;
  1866.     String msg = null;
  1867.     FakeJobInProgress job;
  1868.     try {
  1869.       job = submitJob(JobStatus.PREP, jConf);
  1870.     } catch (IOException ioe) {
  1871.       // job has to fail
  1872.       throwsException = true;
  1873.       msg = ioe.getMessage();
  1874.     }
  1875.     assertTrue(throwsException);
  1876.     job = (FakeJobInProgress) taskTrackerManager.getJobs().toArray()[0];
  1877.     assertTrue(msg.matches(job.getJobID() + " \(" + jobMaxVmem + "vmem, "
  1878.         + jobMaxPmem + "pmem\) exceeds the cluster's max-memory-limits \("
  1879.         + vmemUpperLimit + "vmem, " + pmemUpperLimit
  1880.         + "pmem\). Cannot run in this cluster, so killing it."));
  1881.     // For job, no cleanup task needed so gets killed immediately.
  1882.     assertTrue(job.getStatus().getRunState() == JobStatus.KILLED);
  1883.   }
  1884.   /**
  1885.    * Test blocking of cluster for lack of memory.
  1886.    * @throws IOException
  1887.    */
  1888.   public void testClusterBlockingForLackOfMemory()
  1889.       throws IOException {
  1890.     LOG.debug("Starting the scheduler.");
  1891.     taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
  1892.     TaskTrackerStatus.ResourceStatus ttStatus =
  1893.         taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
  1894.     ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
  1895.     ttStatus.setReservedVirtualMemory(0);
  1896.     ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
  1897.     ttStatus.setReservedPhysicalMemory(0);
  1898.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1899.     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
  1900.     taskTrackerManager.addQueues(new String[] { "default" });
  1901.     resConf.setFakeQueues(queues);
  1902.     scheduler.setTaskTrackerManager(taskTrackerManager);
  1903.     // enabled memory-based scheduling
  1904.     scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  1905.         1536 * 1024 * 1024L);
  1906.     scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
  1907.         4 * 1024 * 1024 * 1024L);
  1908.     resConf.setDefaultPercentOfPmemInVmem(33.3f);
  1909.     resConf.setLimitMaxPmemForTasks(2 * 1024 * 1024 * 1024L);
  1910.     scheduler.setResourceManagerConf(resConf);
  1911.     scheduler.start();
  1912.     LOG.debug("Submit one high memory(4GB vmem, 512MB pmem) job of "
  1913.         + "1 map, 0 reduce tasks.");
  1914.     JobConf jConf = new JobConf();
  1915.     jConf.setMaxVirtualMemoryForTask(4 * 1024 * 1024 * 1024L);
  1916.     jConf.setMaxPhysicalMemoryForTask(512 * 1024 * 1024L);
  1917.     jConf.setNumMapTasks(1);
  1918.     jConf.setNumReduceTasks(0);
  1919.     jConf.setQueueName("default");
  1920.     jConf.setUser("u1");
  1921.     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
  1922.     // TTs should not run these jobs i.e. cluster blocked because of lack of
  1923.     // vmem
  1924.     assertNull(scheduler.assignTasks(tracker("tt1")));
  1925.     assertNull(scheduler.assignTasks(tracker("tt1")));
  1926.     // Job should still be alive
  1927.     assertTrue(job1.getStatus().getRunState() == JobStatus.RUNNING);
  1928.     LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
  1929.     // Use cluster-wide defaults
  1930.     jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
  1931.     jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
  1932.     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
  1933.     // cluster should still be blocked for job1 and so even job2 should not run
  1934.     // even though it is a normal job
  1935.     assertNull(scheduler.assignTasks(tracker("tt1")));
  1936.     scheduler.taskTrackerManager.killJob(job2.getJobID());
  1937.     scheduler.taskTrackerManager.killJob(job1.getJobID());
  1938.     LOG.debug("Submit one high memory(2GB vmem, 2GB pmem) job of "
  1939.         + "1 map, 0 reduce tasks.");
  1940.     jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L);
  1941.     jConf.setMaxPhysicalMemoryForTask(2 * 1024 * 1024 * 1024L);
  1942.     FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
  1943.     // TTs should not run these jobs i.e. cluster blocked because of lack of
  1944.     // pmem now.
  1945.     assertNull(scheduler.assignTasks(tracker("tt1")));
  1946.     assertNull(scheduler.assignTasks(tracker("tt1")));
  1947.     
  1948.     // Job should still be alive
  1949.     assertTrue(job3.getStatus().getRunState() == JobStatus.RUNNING);
  1950.     LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
  1951.     // Use cluster-wide defaults
  1952.     jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
  1953.     jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
  1954.     submitJobAndInit(JobStatus.PREP, jConf); // job4
  1955.     // cluster should still be blocked for job3 and so even job4 should not run
  1956.     // even though it is a normal job
  1957.     assertNull(scheduler.assignTasks(tracker("tt1")));
  1958.   }
  1959.   protected TaskTrackerStatus tracker(String taskTrackerName) {
  1960.     return taskTrackerManager.getTaskTracker(taskTrackerName);
  1961.   }
  1962.   
  1963.   protected Task checkAssignment(String taskTrackerName,
  1964.       String expectedTaskString) throws IOException {
  1965.     List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
  1966.     assertNotNull(expectedTaskString, tasks);
  1967.     assertEquals(expectedTaskString, 1, tasks.size());
  1968.     assertEquals(expectedTaskString, tasks.get(0).toString());
  1969.     return tasks.get(0);
  1970.   }
  1971.   
  1972.   /*
  1973.    * Test cases for Job Initialization poller.
  1974.    */
  1975.   
  1976.   /*
  1977.    * This test verifies that the correct number of jobs for
  1978.    * correct number of users is initialized.
  1979.    * It also verifies that as jobs of users complete, new jobs
  1980.    * from the correct users are initialized.
  1981.    */
  1982.   public void testJobInitialization() throws Exception {
  1983.     // set up the scheduler
  1984.     String[] qs = { "default" };
  1985.     taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
  1986.     scheduler.setTaskTrackerManager(taskTrackerManager);
  1987.     taskTrackerManager.addQueues(qs);
  1988.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  1989.     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
  1990.     resConf.setFakeQueues(queues);
  1991.     scheduler.setResourceManagerConf(resConf);
  1992.     scheduler.start();
  1993.   
  1994.     JobQueuesManager mgr = scheduler.jobQueuesManager;
  1995.     JobInitializationPoller initPoller = scheduler.getInitializationPoller();
  1996.     // submit 4 jobs each for 3 users.
  1997.     HashMap<String, ArrayList<FakeJobInProgress>> userJobs = submitJobs(3,
  1998.         4, "default");
  1999.     // get the jobs submitted.
  2000.     ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
  2001.     ArrayList<FakeJobInProgress> u2Jobs = userJobs.get("u2");
  2002.     ArrayList<FakeJobInProgress> u3Jobs = userJobs.get("u3");
  2003.     
  2004.     // reference to the initializedJobs data structure
  2005.     // changes are reflected in the set as they are made by the poller
  2006.     Set<JobID> initializedJobs = initPoller.getInitializedJobList();
  2007.     
  2008.     // we should have 12 (3 x 4) jobs in the job queue
  2009.     assertEquals(mgr.getWaitingJobs("default").size(), 12);
  2010.     // run one poller iteration.
  2011.     controlledInitializationPoller.selectJobsToInitialize();
  2012.     
  2013.     // the poller should initialize 6 jobs
  2014.     // 3 users and 2 jobs from each
  2015.     assertEquals(initializedJobs.size(), 6);
  2016.     assertTrue("Initialized jobs didnt contain the user1 job 1",
  2017.         initializedJobs.contains(u1Jobs.get(0).getJobID()));
  2018.     assertTrue("Initialized jobs didnt contain the user1 job 2",
  2019.         initializedJobs.contains(u1Jobs.get(1).getJobID()));
  2020.     assertTrue("Initialized jobs didnt contain the user2 job 1",
  2021.         initializedJobs.contains(u2Jobs.get(0).getJobID()));
  2022.     assertTrue("Initialized jobs didnt contain the user2 job 2",
  2023.         initializedJobs.contains(u2Jobs.get(1).getJobID()));
  2024.     assertTrue("Initialized jobs didnt contain the user3 job 1",
  2025.         initializedJobs.contains(u3Jobs.get(0).getJobID()));
  2026.     assertTrue("Initialized jobs didnt contain the user3 job 2",
  2027.         initializedJobs.contains(u3Jobs.get(1).getJobID()));
  2028.     
  2029.     // now submit one more job from another user.
  2030.     FakeJobInProgress u4j1 = 
  2031.       submitJob(JobStatus.PREP, 1, 1, "default", "u4");
  2032.     // run the poller again.
  2033.     controlledInitializationPoller.selectJobsToInitialize();
  2034.     
  2035.     // since no jobs have started running, there should be no
  2036.     // change to the initialized jobs.
  2037.     assertEquals(initializedJobs.size(), 6);
  2038.     assertFalse("Initialized jobs contains user 4 jobs",
  2039.         initializedJobs.contains(u4j1.getJobID()));
  2040.     
  2041.     // This event simulates raising the event on completion of setup task
  2042.     // and moves the job to the running list for the scheduler to pick up.
  2043.     raiseStatusChangeEvents(mgr);
  2044.     
  2045.     // get some tasks assigned.
  2046.     Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  2047.     Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  2048.     Task t3 = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
  2049.     Task t4 = checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
  2050.     taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(0));
  2051.     taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(0));
  2052.     taskTrackerManager.finishTask("tt2", t3.getTaskID().toString(), u1Jobs.get(1));
  2053.     taskTrackerManager.finishTask("tt2", t4.getTaskID().toString(), u1Jobs.get(1));
  2054.     // as some jobs have running tasks, the poller will now
  2055.     // pick up new jobs to initialize.
  2056.     controlledInitializationPoller.selectJobsToInitialize();
  2057.     // count should still be the same
  2058.     assertEquals(initializedJobs.size(), 6);
  2059.     
  2060.     // new jobs that have got into the list
  2061.     assertTrue(initializedJobs.contains(u1Jobs.get(2).getJobID()));
  2062.     assertTrue(initializedJobs.contains(u1Jobs.get(3).getJobID()));
  2063.     raiseStatusChangeEvents(mgr);
  2064.     
  2065.     // the first two jobs are done, no longer in the initialized list.
  2066.     assertFalse("Initialized jobs contains the user1 job 1",
  2067.         initializedJobs.contains(u1Jobs.get(0).getJobID()));
  2068.     assertFalse("Initialized jobs contains the user1 job 2",
  2069.         initializedJobs.contains(u1Jobs.get(1).getJobID()));
  2070.     
  2071.     // finish one more job
  2072.     t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
  2073.     t2 = checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
  2074.     taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(2));
  2075.     taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(2));
  2076.     // no new jobs should be picked up, because max user limit
  2077.     // is still 3.
  2078.     controlledInitializationPoller.selectJobsToInitialize();
  2079.     
  2080.     assertEquals(initializedJobs.size(), 5);
  2081.     
  2082.     // run 1 more jobs.. 
  2083.     t1 = checkAssignment("tt1", "attempt_test_0004_m_000001_0 on tt1");
  2084.     t1 = checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
  2085.     taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(3));
  2086.     taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(3));
  2087.     
  2088.     // Now initialised jobs should contain user 4's job, as
  2089.     // user 1's jobs are all done and the number of users is
  2090.     // below the limit
  2091.     controlledInitializationPoller.selectJobsToInitialize();
  2092.     assertEquals(initializedJobs.size(), 5);
  2093.     assertTrue(initializedJobs.contains(u4j1.getJobID()));
  2094.     
  2095.     controlledInitializationPoller.stopRunning();
  2096.   }
  2097.   /*
  2098.    * testHighPriorityJobInitialization() shows behaviour when high priority job
  2099.    * is submitted into a queue and how initialisation happens for the same.
  2100.    */
  2101.   public void testHighPriorityJobInitialization() throws Exception {
  2102.     String[] qs = { "default"};
  2103.     taskTrackerManager.addQueues(qs);
  2104.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  2105.     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
  2106.     resConf.setFakeQueues(queues);
  2107.     scheduler.setResourceManagerConf(resConf);
  2108.     scheduler.start();
  2109.     JobInitializationPoller initPoller = scheduler.getInitializationPoller();
  2110.     Set<JobID> initializedJobsList = initPoller.getInitializedJobList();
  2111.     // submit 3 jobs for 3 users
  2112.     submitJobs(3,3,"default");
  2113.     controlledInitializationPoller.selectJobsToInitialize();
  2114.     assertEquals(initializedJobsList.size(), 6);
  2115.     
  2116.     // submit 2 job for a different user. one of them will be made high priority
  2117.     FakeJobInProgress u4j1 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
  2118.     FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
  2119.     
  2120.     controlledInitializationPoller.selectJobsToInitialize();
  2121.     
  2122.     // shouldn't change
  2123.     assertEquals(initializedJobsList.size(), 6);
  2124.     
  2125.     assertFalse("Contains U4J1 high priority job " , 
  2126.         initializedJobsList.contains(u4j1.getJobID()));
  2127.     assertFalse("Contains U4J2 Normal priority job " , 
  2128.         initializedJobsList.contains(u4j2.getJobID()));
  2129.     // change priority of one job
  2130.     taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
  2131.     
  2132.     controlledInitializationPoller.selectJobsToInitialize();
  2133.     
  2134.     // the high priority job should get initialized, but not the
  2135.     // low priority job from u4, as we have already exceeded the
  2136.     // limit.
  2137.     assertEquals(initializedJobsList.size(), 7);
  2138.     assertTrue("Does not contain U4J1 high priority job " , 
  2139.         initializedJobsList.contains(u4j1.getJobID()));
  2140.     assertFalse("Contains U4J2 Normal priority job " , 
  2141.         initializedJobsList.contains(u4j2.getJobID()));
  2142.     controlledInitializationPoller.stopRunning();
  2143.   }
  2144.   
  2145.   public void testJobMovement() throws Exception {
  2146.     String[] qs = { "default"};
  2147.     taskTrackerManager.addQueues(qs);
  2148.     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
  2149.     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
  2150.     resConf.setFakeQueues(queues);
  2151.     scheduler.setResourceManagerConf(resConf);
  2152.     scheduler.start();
  2153.     
  2154.     JobQueuesManager mgr = scheduler.jobQueuesManager;
  2155.     
  2156.     // check proper running job movement and completion
  2157.     checkRunningJobMovementAndCompletion();
  2158.     // check failed running job movement
  2159.     checkFailedRunningJobMovement();
  2160.     // Check job movement of failed initalized job
  2161.     checkFailedInitializedJobMovement();
  2162.     // Check failed waiting job movement
  2163.     checkFailedWaitingJobMovement();
  2164.     
  2165.   }
  2166.   
  2167.   private void checkRunningJobMovementAndCompletion() throws IOException {
  2168.     
  2169.     JobQueuesManager mgr = scheduler.jobQueuesManager;
  2170.     JobInitializationPoller p = scheduler.getInitializationPoller();
  2171.     // submit a job
  2172.     FakeJobInProgress job = 
  2173.       submitJob(JobStatus.PREP, 1, 1, "default", "u1");
  2174.     controlledInitializationPoller.selectJobsToInitialize();
  2175.     
  2176.     assertEquals(p.getInitializedJobList().size(), 1);
  2177.     // make it running.
  2178.     raiseStatusChangeEvents(mgr);
  2179.     
  2180.     // it should be there in both the queues.
  2181.     assertTrue("Job not present in Job Queue",
  2182.         mgr.getWaitingJobs("default").contains(job));
  2183.     assertTrue("Job not present in Running Queue",
  2184.         mgr.getRunningJobQueue("default").contains(job));
  2185.     
  2186.     // assign a task
  2187.     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
  2188.     t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
  2189.     
  2190.     controlledInitializationPoller.selectJobsToInitialize();
  2191.     
  2192.     // now this task should be removed from the initialized list.
  2193.     assertTrue(p.getInitializedJobList().isEmpty());
  2194.     // the job should also be removed from the job queue as tasks
  2195.     // are scheduled
  2196.     assertFalse("Job present in Job Queue",
  2197.         mgr.getWaitingJobs("default").contains(job));
  2198.     
  2199.     // complete tasks and job
  2200.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job);
  2201.     taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job);
  2202.     taskTrackerManager.finalizeJob(job);
  2203.     
  2204.     // make sure it is removed from the run queue
  2205.     assertFalse("Job present in running queue",
  2206.         mgr.getRunningJobQueue("default").contains(job));
  2207.   }
  2208.   
  2209.   private void checkFailedRunningJobMovement() throws IOException {
  2210.     
  2211.     JobQueuesManager mgr = scheduler.jobQueuesManager;
  2212.     
  2213.     //submit a job and initalized the same
  2214.     FakeJobInProgress job = 
  2215.       submitJobAndInit(JobStatus.RUNNING, 1, 1, "default", "u1");
  2216.     
  2217.     //check if the job is present in running queue.
  2218.     assertTrue("Running jobs list does not contain submitted job",
  2219.         mgr.getRunningJobQueue("default").contains(job));
  2220.     
  2221.     taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
  2222.     
  2223.     //check if the job is properly removed from running queue.
  2224.     assertFalse("Running jobs list does not contain submitted job",
  2225.         mgr.getRunningJobQueue("default").contains(job));
  2226.     
  2227.   }
  2228.   private void checkFailedInitializedJobMovement() throws IOException {
  2229.     
  2230.     JobQueuesManager mgr = scheduler.jobQueuesManager;
  2231.     JobInitializationPoller p = scheduler.getInitializationPoller();
  2232.     
  2233.     //submit a job
  2234.     FakeJobInProgress job = submitJob(JobStatus.PREP, 1, 1, "default", "u1");
  2235.     //Initialize the job
  2236.     p.selectJobsToInitialize();
  2237.     //Don't raise the status change event.
  2238.     
  2239.     //check in waiting and initialized jobs list.
  2240.     assertTrue("Waiting jobs list does not contain the job",
  2241.         mgr.getWaitingJobs("default").contains(job));
  2242.     
  2243.     assertTrue("Initialized job does not contain the job",
  2244.         p.getInitializedJobList().contains(job.getJobID()));
  2245.     
  2246.     //fail the initalized job
  2247.     taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
  2248.     
  2249.     //Check if the job is present in waiting queue
  2250.     assertFalse("Waiting jobs list contains failed job",
  2251.         mgr.getWaitingJobs("default").contains(job));
  2252.     
  2253.     //run the poller to do the cleanup
  2254.     p.selectJobsToInitialize();
  2255.     
  2256.     //check for failed job in the initialized job list
  2257.     assertFalse("Initialized jobs  contains failed job",
  2258.         p.getInitializedJobList().contains(job.getJobID()));
  2259.   }
  2260.   
  2261.   private void checkFailedWaitingJobMovement() throws IOException {
  2262.     JobQueuesManager mgr = scheduler.jobQueuesManager;
  2263.     // submit a job
  2264.     FakeJobInProgress job = submitJob(JobStatus.PREP, 1, 1, "default",
  2265.         "u1");
  2266.     
  2267.     // check in waiting and initialized jobs list.
  2268.     assertTrue("Waiting jobs list does not contain the job", mgr
  2269.         .getWaitingJobs("default").contains(job));
  2270.     // fail the waiting job
  2271.     taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
  2272.     // Check if the job is present in waiting queue
  2273.     assertFalse("Waiting jobs list contains failed job", mgr
  2274.         .getWaitingJobs("default").contains(job));
  2275.   }
  2276.   
  2277.   private void raiseStatusChangeEvents(JobQueuesManager mgr) {
  2278.     Collection<JobInProgress> jips = mgr.getWaitingJobs("default");
  2279.     for(JobInProgress jip : jips) {
  2280.       if(jip.getStatus().getRunState() == JobStatus.RUNNING) {
  2281.         JobStatusChangeEvent evt = new JobStatusChangeEvent(jip,
  2282.             EventType.RUN_STATE_CHANGED,jip.getStatus());
  2283.         mgr.jobUpdated(evt);
  2284.       }
  2285.     }
  2286.   }
  2287.   private HashMap<String, ArrayList<FakeJobInProgress>> submitJobs(
  2288.       int numberOfUsers, int numberOfJobsPerUser, String queue)
  2289.       throws Exception{
  2290.     HashMap<String, ArrayList<FakeJobInProgress>> userJobs = 
  2291.       new HashMap<String, ArrayList<FakeJobInProgress>>();
  2292.     for (int i = 1; i <= numberOfUsers; i++) {
  2293.       String user = String.valueOf("u" + i);
  2294.       ArrayList<FakeJobInProgress> jips = new ArrayList<FakeJobInProgress>();
  2295.       for (int j = 1; j <= numberOfJobsPerUser; j++) {
  2296.         jips.add(submitJob(JobStatus.PREP, 1, 1, queue, user));
  2297.       }
  2298.       userJobs.put(user, jips);
  2299.     }
  2300.     return userJobs;
  2301.   }
  2302. }