TestQueueManager.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:16k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.Set;
  21. import java.util.TreeSet;
  22. import javax.security.auth.login.LoginException;
  23. import junit.framework.TestCase;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.examples.SleepJob;
  27. import org.apache.hadoop.mapred.JobConf;
  28. import org.apache.hadoop.fs.FileSystem;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.hdfs.MiniDFSCluster;
  31. import org.apache.hadoop.security.UserGroupInformation;
  32. import org.apache.hadoop.security.UnixUserGroupInformation;
  33. public class TestQueueManager extends TestCase {
  34.   private static final Log LOG = LogFactory.getLog(TestQueueManager.class);
  35.   
  36.   private MiniDFSCluster miniDFSCluster;
  37.   private MiniMRCluster miniMRCluster;
  38.   public void testDefaultQueueConfiguration() {
  39.     JobConf conf = new JobConf();
  40.     QueueManager qMgr = new QueueManager(conf);
  41.     Set<String> expQueues = new TreeSet<String>();
  42.     expQueues.add("default");
  43.     verifyQueues(expQueues, qMgr.getQueues());
  44.     // pass true so it will fail if the key is not found.
  45.     assertFalse(conf.getBoolean("mapred.acls.enabled", true));
  46.   }
  47.   
  48.   public void testMultipleQueues() {
  49.     JobConf conf = new JobConf();
  50.     conf.set("mapred.queue.names", "q1,q2,Q3");
  51.     QueueManager qMgr = new QueueManager(conf);
  52.     Set<String> expQueues = new TreeSet<String>();
  53.     expQueues.add("q1");
  54.     expQueues.add("q2");
  55.     expQueues.add("Q3");
  56.     verifyQueues(expQueues, qMgr.getQueues());
  57.   }
  58.   
  59.   public void testSchedulerInfo() {
  60.     JobConf conf = new JobConf();
  61.     conf.set("mapred.queue.names", "qq1,qq2");
  62.     QueueManager qMgr = new QueueManager(conf);
  63.     qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
  64.     qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
  65.     assertEquals(qMgr.getSchedulerInfo("qq2"), "queueInfoForqq2");
  66.     assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
  67.   }
  68.   
  69.   public void testAllEnabledACLForJobSubmission() throws IOException {
  70.     JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
  71.     verifyJobSubmission(conf, true);
  72.   }
  73.   
  74.   public void testAllDisabledACLForJobSubmission() throws IOException {
  75.     JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
  76.     verifyJobSubmission(conf, false);
  77.   }
  78.   
  79.   public void testUserDisabledACLForJobSubmission() throws IOException {
  80.     JobConf conf = setupConf("mapred.queue.default.acl-submit-job", 
  81.                                 "3698-non-existent-user");
  82.     verifyJobSubmission(conf, false);
  83.   }
  84.   
  85.   public void testDisabledACLForNonDefaultQueue() throws IOException {
  86.     // allow everyone in default queue
  87.     JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
  88.     // setup a different queue
  89.     conf.set("mapred.queue.names", "default,q1");
  90.     // setup a different acl for this queue.
  91.     conf.set("mapred.queue.q1.acl-submit-job", "dummy-user");
  92.     // verify job submission to other queue fails.
  93.     verifyJobSubmission(conf, false, "q1");
  94.   }
  95.   
  96.   public void testSubmissionToInvalidQueue() throws IOException{
  97.     JobConf conf = new JobConf();
  98.     conf.set("mapred.queue.names","default");
  99.     setUpCluster(conf);
  100.     String queueName = "q1";
  101.     try {
  102.       RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
  103.     } catch (IOException ioe) {      
  104.        assertTrue(ioe.getMessage().contains("Queue "" + queueName + "" does not exist"));
  105.        return;
  106.     } finally {
  107.       tearDownCluster();
  108.     }
  109.     fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");   
  110.   }
  111.   
  112.   public void testEnabledACLForNonDefaultQueue() throws IOException,
  113.                                                           LoginException {
  114.     // login as self...
  115.     UserGroupInformation ugi = UnixUserGroupInformation.login();
  116.     String userName = ugi.getUserName();
  117.     // allow everyone in default queue
  118.     JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
  119.     // setup a different queue
  120.     conf.set("mapred.queue.names", "default,q2");
  121.     // setup a different acl for this queue.
  122.     conf.set("mapred.queue.q2.acl-submit-job", userName);
  123.     // verify job submission to other queue fails.
  124.     verifyJobSubmission(conf, true, "q2");
  125.   }
  126.   
  127.   public void testUserEnabledACLForJobSubmission() 
  128.                                     throws IOException, LoginException {
  129.     // login as self...
  130.     UserGroupInformation ugi = UnixUserGroupInformation.login();
  131.     String userName = ugi.getUserName();
  132.     JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
  133.                                   "3698-junk-user," + userName 
  134.                                     + " 3698-junk-group1,3698-junk-group2");
  135.     verifyJobSubmission(conf, true);
  136.   }
  137.   
  138.   public void testGroupsEnabledACLForJobSubmission() 
  139.                                     throws IOException, LoginException {
  140.     // login as self, get one group, and add in allowed list.
  141.     UserGroupInformation ugi = UnixUserGroupInformation.login();
  142.     String[] groups = ugi.getGroupNames();
  143.     assertTrue(groups.length > 0);
  144.     JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
  145.                                 "3698-junk-user1,3698-junk-user2 " 
  146.                                   + groups[groups.length-1] 
  147.                                            + ",3698-junk-group");
  148.     verifyJobSubmission(conf, true);
  149.   }
  150.   
  151.   public void testAllEnabledACLForJobKill() throws IOException {
  152.     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
  153.     verifyJobKill(conf, true);
  154.   }
  155.   public void testAllDisabledACLForJobKill() throws IOException {
  156.     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
  157.     verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
  158.   }
  159.   
  160.   public void testOwnerAllowedForJobKill() throws IOException {
  161.     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
  162.                                               "junk-user");
  163.     verifyJobKill(conf, true);
  164.   }
  165.   
  166.   public void testUserDisabledACLForJobKill() throws IOException {
  167.     //setup a cluster allowing a user to submit
  168.     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", 
  169.                                               "dummy-user");
  170.     verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
  171.   }
  172.   
  173.   public void testUserEnabledACLForJobKill() throws IOException, 
  174.                                                     LoginException {
  175.     // login as self...
  176.     UserGroupInformation ugi = UnixUserGroupInformation.login();
  177.     String userName = ugi.getUserName();
  178.     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
  179.                                               "dummy-user,"+userName);
  180.     verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
  181.   }
  182.   
  183.   public void testUserDisabledForJobPriorityChange() throws IOException {
  184.     JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
  185.                               "junk-user");
  186.     verifyJobPriorityChangeAsOtherUser(conf, false, 
  187.                               "junk-user,junk-user-group");
  188.   }
  189.   
  190.   private JobConf setupConf(String aclName, String aclValue) {
  191.     JobConf conf = new JobConf();
  192.     conf.setBoolean("mapred.acls.enabled", true);
  193.     conf.set(aclName, aclValue);
  194.     return conf;
  195.   }
  196.   
  197.   private void verifyQueues(Set<String> expectedQueues, 
  198.                                           Set<String> actualQueues) {
  199.     assertEquals(expectedQueues.size(), actualQueues.size());
  200.     for (String queue : expectedQueues) {
  201.       assertTrue(actualQueues.contains(queue));
  202.     }
  203.   }
  204.   
  205.   private void verifyJobSubmission(JobConf conf, boolean shouldSucceed) 
  206.                                               throws IOException {
  207.     verifyJobSubmission(conf, shouldSucceed, "default");
  208.   }
  209.   private void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
  210.                                     String queue) throws IOException {
  211.     setUpCluster(conf);
  212.     try {
  213.       RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
  214.       if (shouldSucceed) {
  215.         assertTrue(rjob.isSuccessful());
  216.       } else {
  217.         fail("Job submission should have failed.");
  218.       }
  219.     } catch (IOException ioe) {
  220.       if (shouldSucceed) {
  221.         throw ioe;
  222.       } else {
  223.         LOG.info("exception while submitting job: " + ioe.getMessage());
  224.         assertTrue(ioe.getMessage().
  225.             contains("cannot perform operation " +
  226.             "SUBMIT_JOB on queue " + queue));
  227.         // check if the system directory gets cleaned up or not
  228.         JobTracker jobtracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
  229.         Path sysDir = new Path(jobtracker.getSystemDir());
  230.         FileSystem fs = sysDir.getFileSystem(conf);
  231.         int size = fs.listStatus(sysDir).length;
  232.         while (size > 1) { // ignore the jobtracker.info file
  233.           System.out.println("Waiting for the job files in sys directory to be cleaned up");
  234.           UtilsForTests.waitFor(100);
  235.           size = fs.listStatus(sysDir).length;
  236.         }
  237.       }
  238.     } finally {
  239.       tearDownCluster();
  240.     }
  241. }
  242.   private void verifyJobKill(JobConf conf, boolean shouldSucceed) 
  243.                                       throws IOException {
  244.     setUpCluster(conf);
  245.     try {
  246.       RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false);
  247.       assertFalse(rjob.isComplete());
  248.       while(rjob.mapProgress() == 0.0f) {
  249.         try {
  250.           Thread.sleep(10);  
  251.         } catch (InterruptedException ie) {
  252.           break;
  253.         }
  254.       }
  255.       rjob.killJob();
  256.       while(rjob.cleanupProgress() == 0.0f) {
  257.         try {
  258.           Thread.sleep(10);  
  259.         } catch (InterruptedException ie) {
  260.           break;
  261.         }
  262.       }
  263.       if (shouldSucceed) {
  264.         assertTrue(rjob.isComplete());
  265.       } else {
  266.         fail("Job kill should have failed.");
  267.       }
  268.     } catch (IOException ioe) {
  269.       if (shouldSucceed) {
  270.         throw ioe;
  271.       } else {
  272.         LOG.info("exception while submitting job: " + ioe.getMessage());
  273.         assertTrue(ioe.getMessage().
  274.                         contains("cannot perform operation " +
  275.                                     "ADMINISTER_JOBS on queue default"));
  276.       }
  277.     } finally {
  278.       tearDownCluster();
  279.     }
  280.   }
  281.   
  282.   private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
  283.                                         String otherUserInfo) 
  284.                         throws IOException {
  285.     setUpCluster(conf);
  286.     try {
  287.       // submit a job as another user.
  288.       String userInfo = otherUserInfo;
  289.       RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
  290.       assertFalse(rjob.isComplete());
  291.       //try to kill as self
  292.       try {
  293.         rjob.killJob();
  294.         if (!shouldSucceed) {
  295.           fail("should fail kill operation");  
  296.         }
  297.       } catch (IOException ioe) {
  298.         if (shouldSucceed) {
  299.           throw ioe;
  300.         }
  301.         //verify it fails
  302.         LOG.info("exception while submitting job: " + ioe.getMessage());
  303.         assertTrue(ioe.getMessage().
  304.                         contains("cannot perform operation " +
  305.                                     "ADMINISTER_JOBS on queue default"));
  306.       }
  307.       //wait for job to complete on its own
  308.       while (!rjob.isComplete()) {
  309.         try {
  310.           Thread.sleep(1000);
  311.         } catch (InterruptedException ie) {
  312.           break;
  313.         }
  314.       }
  315.     } finally {
  316.       tearDownCluster();
  317.     }
  318.   }
  319.   
  320.   private void verifyJobPriorityChangeAsOtherUser(JobConf conf, 
  321.                           boolean shouldSucceed, String otherUserInfo)
  322.                             throws IOException {
  323.     setUpCluster(conf);
  324.     try {
  325.       // submit job as another user.
  326.       String userInfo = otherUserInfo;
  327.       RunningJob rjob = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
  328.       assertFalse(rjob.isComplete());
  329.       
  330.       // try to change priority as self
  331.       try {
  332.         rjob.setJobPriority("VERY_LOW");
  333.         if (!shouldSucceed) {
  334.           fail("changing priority should fail.");
  335.         }
  336.       } catch (IOException ioe) {
  337.         //verify it fails
  338.         LOG.info("exception while submitting job: " + ioe.getMessage());
  339.         assertTrue(ioe.getMessage().
  340.                         contains("cannot perform operation " +
  341.                                     "ADMINISTER_JOBS on queue default"));
  342.       }
  343.       //wait for job to complete on its own
  344.       while (!rjob.isComplete()) {
  345.         try {
  346.           Thread.sleep(1000);
  347.         } catch (InterruptedException ie) {
  348.           break;
  349.         }
  350.       }
  351.     } finally {
  352.       tearDownCluster();
  353.     }
  354.   }
  355.   
  356.   private void setUpCluster(JobConf conf) throws IOException {
  357.     miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
  358.     FileSystem fileSys = miniDFSCluster.getFileSystem();
  359.     String namenode = fileSys.getUri().toString();
  360.     miniMRCluster = new MiniMRCluster(1, namenode, 3, 
  361.                       null, null, conf);
  362.   }
  363.   
  364.   private void tearDownCluster() throws IOException {
  365.     if (miniMRCluster != null) { miniMRCluster.shutdown(); }
  366.     if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
  367.   }
  368.   
  369.   private RunningJob submitSleepJob(int numMappers, int numReducers, 
  370.                             long mapSleepTime, long reduceSleepTime,
  371.                             boolean shouldComplete) 
  372.                               throws IOException {
  373.     return submitSleepJob(numMappers, numReducers, mapSleepTime,
  374.                           reduceSleepTime, shouldComplete, null);
  375.   }
  376.   
  377.   private RunningJob submitSleepJob(int numMappers, int numReducers, 
  378.                                       long mapSleepTime, long reduceSleepTime,
  379.                                       boolean shouldComplete, String userInfo) 
  380.                                             throws IOException {
  381.     return submitSleepJob(numMappers, numReducers, mapSleepTime, 
  382.                           reduceSleepTime, shouldComplete, userInfo, null);
  383.   }
  384.   private RunningJob submitSleepJob(int numMappers, int numReducers, 
  385.                                     long mapSleepTime, long reduceSleepTime,
  386.                                     boolean shouldComplete, String userInfo,
  387.                                     String queueName) 
  388.                                       throws IOException {
  389.     JobConf clientConf = new JobConf();
  390.     clientConf.set("mapred.job.tracker", "localhost:"
  391.         + miniMRCluster.getJobTrackerPort());
  392.     SleepJob job = new SleepJob();
  393.     job.setConf(clientConf);
  394.     clientConf = job.setupJobConf(numMappers, numReducers, 
  395.         mapSleepTime, (int)mapSleepTime/100,
  396.         reduceSleepTime, (int)reduceSleepTime/100);
  397.     if (queueName != null) {
  398.       clientConf.setQueueName(queueName);
  399.     }
  400.     RunningJob rJob = null;
  401.     if (shouldComplete) {
  402.       rJob = JobClient.runJob(clientConf);  
  403.     } else {
  404.       JobConf jc = new JobConf(clientConf);
  405.       if (userInfo != null) {
  406.         jc.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
  407.       }
  408.       rJob = new JobClient(clientConf).submitJob(jc);
  409.     }
  410.     return rJob;
  411.   }
  412. }