TestParallelInitialization.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.HashMap;
  23. import java.util.List;
  24. import java.util.Map;
  25. import junit.framework.TestCase;
  26. import org.apache.hadoop.io.IntWritable;
  27. public class TestParallelInitialization extends TestCase {
  28.   
  29.   private static int jobCounter;
  30.   private static final int NUM_JOBS = 3;
  31.   IntWritable numJobsCompleted = new IntWritable();
  32.   
  33.   static void resetCounters() {
  34.     jobCounter = 0;
  35.   }
  36.   
  37.   class FakeJobInProgress extends JobInProgress {
  38.    
  39.     public FakeJobInProgress(JobConf jobConf,
  40.         FakeTaskTrackerManager taskTrackerManager) throws IOException {
  41.       super(new JobID("test", ++jobCounter), jobConf);
  42.       this.startTime = System.currentTimeMillis();
  43.       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
  44.       this.status.setJobPriority(JobPriority.NORMAL);
  45.       this.status.setStartTime(startTime);
  46.     }
  47.     @Override
  48.     public synchronized void initTasks() throws IOException {
  49.       try {
  50.         int jobNumber = this.getJobID().getId();
  51.         synchronized (numJobsCompleted) {
  52.           while (numJobsCompleted.get() != (NUM_JOBS - jobNumber)) {
  53.             numJobsCompleted.wait();
  54.           }
  55.           numJobsCompleted.set(numJobsCompleted.get() + 1);
  56.           numJobsCompleted.notifyAll();
  57.           LOG.info("JobNumber " + jobNumber + " succeeded");
  58.         }
  59.       } catch (InterruptedException ie) {};
  60.       this.status.setRunState(JobStatus.SUCCEEDED);
  61.     }
  62.     @Override
  63.     synchronized void fail() {
  64.       this.status.setRunState(JobStatus.FAILED);
  65.     }
  66.   }
  67.   
  68.   static class FakeTaskTrackerManager implements TaskTrackerManager {
  69.     
  70.     int maps = 0;
  71.     int reduces = 0;
  72.     int maxMapTasksPerTracker = 2;
  73.     int maxReduceTasksPerTracker = 2;
  74.     List<JobInProgressListener> listeners =
  75.       new ArrayList<JobInProgressListener>();
  76.     QueueManager queueManager;
  77.     
  78.     private Map<String, TaskTrackerStatus> trackers =
  79.       new HashMap<String, TaskTrackerStatus>();
  80.     public FakeTaskTrackerManager() {
  81.       JobConf conf = new JobConf();
  82.       queueManager = new QueueManager(conf);
  83.       trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
  84.                    new ArrayList<TaskStatus>(), 0,
  85.                    maxMapTasksPerTracker, maxReduceTasksPerTracker));
  86.     }
  87.     
  88.     public ClusterStatus getClusterStatus() {
  89.       int numTrackers = trackers.size();
  90.       return new ClusterStatus(numTrackers, 0, 
  91.                                JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
  92.                                maps, reduces,
  93.                                numTrackers * maxMapTasksPerTracker,
  94.                                numTrackers * maxReduceTasksPerTracker,
  95.                                JobTracker.State.RUNNING);
  96.     }
  97.     
  98.     public int getNumberOfUniqueHosts() {
  99.       return 0;
  100.     }
  101.     public Collection<TaskTrackerStatus> taskTrackers() {
  102.       return trackers.values();
  103.     }
  104.     public void addJobInProgressListener(JobInProgressListener listener) {
  105.       listeners.add(listener);
  106.     }
  107.     public void removeJobInProgressListener(JobInProgressListener listener) {
  108.       listeners.remove(listener);
  109.     }
  110.     
  111.     
  112.     public QueueManager getQueueManager() {
  113.       return queueManager;
  114.     }
  115.     
  116.     public int getNextHeartbeatInterval() {
  117.       return MRConstants.HEARTBEAT_INTERVAL_MIN;
  118.     }
  119.     public void killJob(JobID jobid) {
  120.       return;
  121.     }
  122.     public JobInProgress getJob(JobID jobid) {
  123.       return null;
  124.     }
  125.     // Test methods
  126.     
  127.     public void submitJob(JobInProgress job) throws IOException {
  128.       for (JobInProgressListener listener : listeners) {
  129.         listener.jobAdded(job);
  130.       }
  131.     }
  132.   }
  133.   
  134.   protected JobConf jobConf;
  135.   protected TaskScheduler scheduler;
  136.   private FakeTaskTrackerManager taskTrackerManager;
  137.   @Override
  138.   protected void setUp() throws Exception {
  139.     resetCounters();
  140.     jobConf = new JobConf();
  141.     taskTrackerManager = new FakeTaskTrackerManager();
  142.     scheduler = createTaskScheduler();
  143.     scheduler.setConf(jobConf);
  144.     scheduler.setTaskTrackerManager(taskTrackerManager);
  145.     scheduler.start();
  146.   }
  147.   
  148.   @Override
  149.   protected void tearDown() throws Exception {
  150.     if (scheduler != null) {
  151.       scheduler.terminate();
  152.     }
  153.   }
  154.   
  155.   protected TaskScheduler createTaskScheduler() {
  156.     return new JobQueueTaskScheduler();
  157.   }
  158.   
  159.   public void testParallelInitJobs() throws IOException {
  160.     FakeJobInProgress[] jobs = new FakeJobInProgress[NUM_JOBS];
  161.     
  162.     // Submit NUM_JOBS jobs in order. The init code will ensure
  163.     // that the jobs get inited in descending order of Job ids
  164.     // i.e. highest job id first and the smallest last.
  165.     // If we were not doing parallel init, the first submitted job
  166.     // will be inited first and that will hang
  167.     
  168.     for (int i = 0; i < NUM_JOBS; i++) {
  169.       jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
  170.       jobs[i].getStatus().setRunState(JobStatus.PREP);
  171.       taskTrackerManager.submitJob(jobs[i]);
  172.     }
  173.     
  174.     try {
  175.       Thread.sleep(1000);
  176.     } catch (InterruptedException ie) {}
  177.     
  178.     for (int i = 0; i < NUM_JOBS; i++) {
  179.       assertTrue(jobs[i].getStatus().getRunState() == JobStatus.SUCCEEDED);
  180.     }
  181.   }  
  182. }