TestJobQueueInformation.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.net.InetAddress;
  21. import java.net.InetSocketAddress;
  22. import java.util.Collection;
  23. import java.util.List;
  24. import javax.security.auth.login.LoginException;
  25. import org.apache.commons.logging.Log;
  26. import org.apache.commons.logging.LogFactory;
  27. import org.apache.hadoop.conf.Configuration;
  28. import org.apache.hadoop.examples.SleepJob;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.hdfs.MiniDFSCluster;
  32. import org.apache.hadoop.ipc.RPC;
  33. import org.apache.hadoop.net.NetUtils;
  34. import org.apache.hadoop.security.UnixUserGroupInformation;
  35. import junit.framework.TestCase;
  36. public class TestJobQueueInformation extends TestCase {
  37.   private MiniMRCluster mrCluster;
  38.   private MiniDFSCluster dfsCluster;
  39.   private JobConf jc;
  40.   private static final String JOB_SCHEDULING_INFO = "TESTSCHEDULINGINFO";
  41.   private static final Path TEST_DIR = 
  42.     new Path(System.getProperty("test.build.data","/tmp"), 
  43.              "job-queue-info-testing");
  44.   private static final Path IN_DIR = new Path(TEST_DIR, "input");
  45.   private static final Path SHARE_DIR = new Path(TEST_DIR, "share");
  46.   private static final Path OUTPUT_DIR = new Path(TEST_DIR, "output");
  47.   
  48.   static String getSignalFile() {
  49.     return (new Path(SHARE_DIR, "signal")).toString();
  50.   }
  51.   // configure a waiting job with 2 maps
  52.   private JobConf configureWaitingJob(JobConf conf) throws IOException {
  53.     
  54.     UtilsForTests.configureWaitingJobConf(conf, IN_DIR, OUTPUT_DIR, 2, 0, 
  55.         "test-job-queue-info", getSignalFile(), getSignalFile());
  56.     return conf;
  57.   }
  58.   public static class TestTaskScheduler extends LimitTasksPerJobTaskScheduler {
  59.     @Override
  60.     public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
  61.         throws IOException {
  62.       Collection<JobInProgress> jips = jobQueueJobInProgressListener
  63.           .getJobQueue();
  64.       if (jips != null && !jips.isEmpty()) {
  65.         for (JobInProgress jip : jips) {
  66.           jip.setSchedulingInfo(JOB_SCHEDULING_INFO);
  67.         }
  68.       }
  69.       return super.assignTasks(taskTracker);
  70.     }
  71.   }
  72.   @Override
  73.   protected void setUp() throws Exception {
  74.     super.setUp();
  75.     final int taskTrackers = 4;
  76.     Configuration conf = new Configuration();
  77.     dfsCluster = new MiniDFSCluster(conf, 4, true, null);
  78.     jc = new JobConf();
  79.     jc.setClass("mapred.jobtracker.taskScheduler", TestTaskScheduler.class,
  80.         TaskScheduler.class);
  81.     jc.setLong("mapred.jobtracker.taskScheduler.maxRunningTasksPerJob", 10L);
  82.     mrCluster = new MiniMRCluster(0, 0, taskTrackers, dfsCluster
  83.         .getFileSystem().getUri().toString(), 1, null, null, null, jc);
  84.   }
  85.   @Override
  86.   protected void tearDown() throws Exception {
  87.     super.tearDown();
  88.     mrCluster.shutdown();
  89.     dfsCluster.shutdown();
  90.   }
  91.   public void testJobQueues() throws IOException {
  92.     JobClient jc = new JobClient(mrCluster.createJobConf());
  93.     String expectedQueueInfo = "Maximum Tasks Per Job :: 10";
  94.     JobQueueInfo[] queueInfos = jc.getQueues();
  95.     assertNotNull(queueInfos);
  96.     assertEquals(1, queueInfos.length);
  97.     assertEquals("default", queueInfos[0].getQueueName());
  98.     JobConf conf = mrCluster.createJobConf();
  99.     FileSystem fileSys = dfsCluster.getFileSystem();
  100.     
  101.     // configure a waiting job
  102.     conf = configureWaitingJob(conf);
  103.     conf.setJobName("test-job-queue-info-test");
  104.     
  105.     // clear the signal file if any
  106.     fileSys.delete(SHARE_DIR, true);
  107.     
  108.     RunningJob rJob = jc.submitJob(conf);
  109.     
  110.     while (rJob.getJobState() != JobStatus.RUNNING) {
  111.       UtilsForTests.waitFor(10);
  112.     }
  113.     
  114.     int numberOfJobs = 0;
  115.     for (JobQueueInfo queueInfo : queueInfos) {
  116.       JobStatus[] jobStatusList = jc.getJobsFromQueue(queueInfo
  117.           .getQueueName());
  118.       assertNotNull(queueInfo.getQueueName());
  119.       assertNotNull(queueInfo.getSchedulingInfo());
  120.       assertEquals(expectedQueueInfo, queueInfo.getSchedulingInfo());
  121.       numberOfJobs += jobStatusList.length;
  122.       for (JobStatus status : jobStatusList) {
  123.         assertEquals(JOB_SCHEDULING_INFO, status.getSchedulingInfo());
  124.       }
  125.     }
  126.     assertEquals(1, numberOfJobs);
  127.     
  128.     UtilsForTests.signalTasks(dfsCluster, fileSys, getSignalFile(), 
  129.                               getSignalFile(), 4);
  130.   }
  131. }