TestParallelInitialization.java
Uploaded by: quxuerui
Upload date: 2018-01-08
Package size: 41811k
File size: 6k
Category:
Grid computing
Platform:
Java
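
A JUnit test from the Hadoop MapReduce sources (package org.apache.hadoop.mapred) that verifies the task scheduler initializes submitted jobs in parallel rather than one at a time.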
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.hadoop.io.IntWritable;
public class TestParallelInitialization extends TestCase {

  private static int jobCounter;
  private static final int NUM_JOBS = 3;
  IntWritable numJobsCompleted = new IntWritable();

  static void resetCounters() {
    jobCounter = 0;
  }
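
  // A stub job whose initTasks() blocks until every job with a higher
  // id has finished initializing; only a scheduler that initializes
  // jobs on separate threads can ever satisfy that ordering.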
  class FakeJobInProgress extends JobInProgress {

    public FakeJobInProgress(JobConf jobConf,
        FakeTaskTrackerManager taskTrackerManager) throws IOException {
      super(new JobID("test", ++jobCounter), jobConf);
      this.startTime = System.currentTimeMillis();
      this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
      this.status.setJobPriority(JobPriority.NORMAL);
      this.status.setStartTime(startTime);
    }
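
    // Barrier on numJobsCompleted: job N proceeds only once the
    // counter reaches NUM_JOBS - N. Ids start at 1, so job NUM_JOBS
    // passes immediately, then job NUM_JOBS - 1, and so on down to
    // job 1; strictly serial initialization would hang forever on
    // the first submitted job.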
    @Override
    public synchronized void initTasks() throws IOException {
      try {
        int jobNumber = this.getJobID().getId();
        synchronized (numJobsCompleted) {
          while (numJobsCompleted.get() != (NUM_JOBS - jobNumber)) {
            numJobsCompleted.wait();
          }
          numJobsCompleted.set(numJobsCompleted.get() + 1);
          numJobsCompleted.notifyAll();
          LOG.info("JobNumber " + jobNumber + " succeeded");
        }
      } catch (InterruptedException ie) {}
      this.status.setRunState(JobStatus.SUCCEEDED);
    }
    @Override
    synchronized void fail() {
      this.status.setRunState(JobStatus.FAILED);
    }
  }
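
  // A minimal TaskTrackerManager stub: one fake tracker, canned
  // cluster statistics, and a submitJob() hook that feeds jobs
  // straight to the registered JobInProgressListeners.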
  static class FakeTaskTrackerManager implements TaskTrackerManager {

    int maps = 0;
    int reduces = 0;
    int maxMapTasksPerTracker = 2;
    int maxReduceTasksPerTracker = 2;
    List<JobInProgressListener> listeners =
        new ArrayList<JobInProgressListener>();
    QueueManager queueManager;

    private Map<String, TaskTrackerStatus> trackers =
        new HashMap<String, TaskTrackerStatus>();

    public FakeTaskTrackerManager() {
      JobConf conf = new JobConf();
      queueManager = new QueueManager(conf);
      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
          new ArrayList<TaskStatus>(), 0,
          maxMapTasksPerTracker, maxReduceTasksPerTracker));
    }
    public ClusterStatus getClusterStatus() {
      int numTrackers = trackers.size();
      return new ClusterStatus(numTrackers, 0,
          JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
          maps, reduces,
          numTrackers * maxMapTasksPerTracker,
          numTrackers * maxReduceTasksPerTracker,
          JobTracker.State.RUNNING);
    }

    public int getNumberOfUniqueHosts() {
      return 0;
    }

    public Collection<TaskTrackerStatus> taskTrackers() {
      return trackers.values();
    }

    public void addJobInProgressListener(JobInProgressListener listener) {
      listeners.add(listener);
    }

    public void removeJobInProgressListener(JobInProgressListener listener) {
      listeners.remove(listener);
    }

    public QueueManager getQueueManager() {
      return queueManager;
    }

    public int getNextHeartbeatInterval() {
      return MRConstants.HEARTBEAT_INTERVAL_MIN;
    }

    public void killJob(JobID jobid) {
      return;
    }

    public JobInProgress getJob(JobID jobid) {
      return null;
    }

    // Test methods

    public void submitJob(JobInProgress job) throws IOException {
      for (JobInProgressListener listener : listeners) {
        listener.jobAdded(job);
      }
    }
  }
  protected JobConf jobConf;
  protected TaskScheduler scheduler;
  private FakeTaskTrackerManager taskTrackerManager;

  @Override
  protected void setUp() throws Exception {
    resetCounters();
    jobConf = new JobConf();
    taskTrackerManager = new FakeTaskTrackerManager();
    scheduler = createTaskScheduler();
    scheduler.setConf(jobConf);
    scheduler.setTaskTrackerManager(taskTrackerManager);
    scheduler.start();
  }

  @Override
  protected void tearDown() throws Exception {
    if (scheduler != null) {
      scheduler.terminate();
    }
  }

  protected TaskScheduler createTaskScheduler() {
    return new JobQueueTaskScheduler();
  }
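
  // JobQueueTaskScheduler registers an EagerTaskInitializationListener
  // on start(), which hands each newly added job off for initialization
  // on background job-init threads; that asynchronous initialization is
  // what the barrier in initTasks() exercises.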
  public void testParallelInitJobs() throws IOException {
    FakeJobInProgress[] jobs = new FakeJobInProgress[NUM_JOBS];

    // Submit NUM_JOBS jobs in order. The init code ensures that the
    // jobs are initialized in descending order of job id, i.e. the
    // highest id first and the smallest last. Without parallel init,
    // the first submitted job would be initialized first and would
    // hang forever in initTasks().
    for (int i = 0; i < NUM_JOBS; i++) {
      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
      jobs[i].getStatus().setRunState(JobStatus.PREP);
      taskTrackerManager.submitJob(jobs[i]);
    }

    try {
      Thread.sleep(1000);
    } catch (InterruptedException ie) {}

    for (int i = 0; i < NUM_JOBS; i++) {
      assertEquals(JobStatus.SUCCEEDED, jobs[i].getStatus().getRunState());
    }
  }
}
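
Note: this is a JUnit 3 test (it extends junit.framework.TestCase), so any JUnit runner with the Hadoop test classpath can execute it. With the classic Ant build of this era the invocation would typically be something like "ant test -Dtestcase=TestParallelInitialization"; treat the exact target and property names as an assumption, since they vary across Hadoop source trees.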