testHod.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. import unittest, getpass, os, sys, re, threading, time
  15. myDirectory = os.path.realpath(sys.argv[0])
  16. rootDirectory   = re.sub("/testing/.*", "", myDirectory)
  17. sys.path.append(rootDirectory)
  18. import tempfile
  19. from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster
  20. from hodlib.Hod.hod import hodRunner, hodState
  21. from hodlib.Common.desc import NodePoolDesc
  22. excludes = []
  23. # Information about all clusters is written to a file called clusters.state.
  24. from hodlib.Hod.hod import CLUSTER_DATA_FILE as TEST_CLUSTER_DATA_FILE, 
  25.                            INVALID_STATE_FILE_MSGS
  26. # Temp directory prefix
  27. TMP_DIR_PREFIX=os.path.join('/tmp', 'hod-%s' % (getpass.getuser()))
  28. # build a config object with all required keys for initializing hod.
  29. def setupConf():
  30.   cfg = {
  31.           'hod' : {
  32.                     'original-dir' : os.getcwd(),
  33.                     'stream' : True,
  34.                     # store all the info about clusters in this directory
  35.                     'user_state' : '/tmp/hodtest',
  36.                     'debug' : 3,
  37.                     'java-home' : os.getenv('JAVA_HOME'),
  38.                     'cluster' : 'dummy',
  39.                     'cluster-factor' : 1.8,
  40.                     'xrs-port-range' : (32768,65536),
  41.                     'allocate-wait-time' : 3600,
  42.                     'temp-dir' : '/tmp/hod'
  43.                   },
  44.           # just set everything to dummy. Need something to initialize the
  45.           # node pool description object.
  46.           'resource_manager' : {
  47.                                  'id' : 'dummy',
  48.                                  'batch-home' : 'dummy',
  49.                                  'queue' : 'dummy',
  50.                                }
  51.         }
  52.   cfg['nodepooldesc'] = NodePoolDesc(cfg['resource_manager'])
  53.   return cfg
  54. # Test class that defines methods to test invalid arguments to hod operations.
  55. class test_InvalidArgsOperations(unittest.TestCase):
  56.   def setUp(self):
  57.     self.cfg = setupConf()
  58.     # initialize the mock objects
  59.     self.log = MockLogger()
  60.     self.cluster = MockHadoopCluster()
  61.     # Use the test logger. This will be used for test verification.
  62.     self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
  63.     # Create the hodState object to set the test state you want.
  64.     self.state = hodState(self.cfg['hod']['user_state'])
  65.     if not os.path.exists(self.cfg['hod']['user_state']):
  66.       os.path.mkdir(self.cfg['hod']['user_state'])
  67.     p = os.path.join(self.cfg['hod']['user_state'], '%s.state' % TEST_CLUSTER_DATA_FILE)
  68.     # ensure cluster data file exists, so write works in the tests.
  69.     f = open(p, 'w')
  70.     f.close()
  71.   
  72.   def tearDown(self):
  73.     # clean up cluster data file and directory
  74.     p = os.path.join(self.cfg['hod']['user_state'], '%s.state' % TEST_CLUSTER_DATA_FILE)
  75.     os.remove(p)
  76.     os.rmdir(self.cfg['hod']['user_state'])
  77.   # Test that list works with deleted cluster directories - more than one entries which are invalid.
  78.   def testListInvalidDirectory(self):
  79.     userState = { os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory1') : '123.dummy.id1', 
  80.                   os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory2') : '123.dummy.id2' }
  81.     self.__setupClusterState(userState)
  82.     self.client._op_list(['list'])
  83.     # assert that required errors are logged.
  84.     for clusterDir in userState.keys():
  85.       self.assertTrue(self.log.hasMessage('cluster state unknownt%st%s' 
  86.                             % (userState[clusterDir], clusterDir), 'info'))
  87.     # simulate a test where a directory is deleted, and created again, without deallocation
  88.     clusterDir = os.path.join(TMP_DIR_PREFIX, 'testListEmptyDirectory')
  89.     os.makedirs(clusterDir)
  90.     self.assertTrue(os.path.isdir(clusterDir))
  91.     userState = { clusterDir : '123.dummy.id3' }
  92.     self.__setupClusterState(userState, False)
  93.     self.client._op_list(['list'])
  94.     self.assertTrue(self.log.hasMessage('cluster state unknownt%st%s' 
  95.                           % (userState[clusterDir], clusterDir), 'info'))
  96.     os.rmdir(clusterDir)
  97.     
  98.   # Test that info works with a deleted cluster directory
  99.   def testInfoInvalidDirectory(self):
  100.     clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoInvalidDirectory')
  101.     userState = { clusterDir : '456.dummy.id' }
  102.     self.__setupClusterState(userState)
  103.     self.client._op_info(['info', clusterDir])
  104.     self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
  105.     # simulate a test where a directory is deleted, and created again, without deallocation
  106.     clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoEmptyDirectory')
  107.     os.makedirs(clusterDir)
  108.     self.assertTrue(os.path.isdir(clusterDir))
  109.     userState = { clusterDir : '456.dummy.id1' }
  110.     self.__setupClusterState(userState, False)
  111.     self.client._op_info(['info', clusterDir])
  112.     self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
  113.     os.rmdir(clusterDir)
  114.   # Test info works with an invalid cluster directory
  115.   def testInfoNonExistentDirectory(self):
  116.     clusterDir = '/tmp/hod/testInfoNonExistentDirectory'
  117.     self.client._op_info(['info', clusterDir])
  118.     self.assertTrue(self.log.hasMessage("Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir), 'critical'))
  119.   # Test that deallocation works on a deleted cluster directory
  120.   # by clearing the job, and removing the state
  121.   def testDeallocateInvalidDirectory(self):
  122.     clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateInvalidDirectory')
  123.     jobid = '789.dummy.id'
  124.     userState = { clusterDir : jobid }
  125.     self.__setupClusterState(userState)
  126.     self.client._op_deallocate(['deallocate', clusterDir])
  127.     # verify job was deleted
  128.     self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid))
  129.     # verify appropriate message was logged.
  130.     self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
  131.     self.assertTrue(self.log.hasMessage("Freeing resources allocated to the cluster.", 'critical'))
  132.     # verify that the state information was cleared.
  133.     userState = self.state.read(TEST_CLUSTER_DATA_FILE)
  134.     self.assertFalse(clusterDir in userState.keys())
  135.  
  136.     # simulate a test where a directory is deleted, and created again, without deallocation
  137.     clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateEmptyDirectory')
  138.     os.makedirs(clusterDir)
  139.     self.assertTrue(os.path.isdir(clusterDir))
  140.     jobid = '789.dummy.id1'
  141.     userState = { clusterDir : jobid }
  142.     self.__setupClusterState(userState, False)
  143.     self.client._op_deallocate(['deallocate', clusterDir])
  144.     # verify job was deleted
  145.     self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid))
  146.     # verify appropriate message was logged.
  147.     self.assertTrue(self.log.hasMessage("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (userState[clusterDir], clusterDir), 'critical'))
  148.     self.assertTrue(self.log.hasMessage("Freeing resources allocated to the cluster.", 'critical'))
  149.     # verify that the state information was cleared.
  150.     userState = self.state.read(TEST_CLUSTER_DATA_FILE)
  151.     self.assertFalse(clusterDir in userState.keys())
  152.     os.rmdir(clusterDir)
  153.      
  154.   # Test that deallocation works on a nonexistent directory.
  155.   def testDeallocateNonExistentDirectory(self):
  156.     clusterDir = os.path.join(TMP_DIR_PREFIX,'testDeallocateNonExistentDirectory')
  157.     self.client._op_deallocate(['deallocate', clusterDir])
  158.     # there should be no call..
  159.     self.assertFalse(self.cluster.wasOperationPerformed('delete_job', None))
  160.     self.assertTrue(self.log.hasMessage("Invalid hod.clusterdir(--hod.clusterdir or -d). %s : No such directory" % (clusterDir), 'critical'))
  161.   # Test that allocation on an previously deleted directory fails.    
  162.   def testAllocateOnDeletedDirectory(self):
  163.     clusterDir = os.path.join(TMP_DIR_PREFIX, 'testAllocateOnDeletedDirectory')
  164.     os.makedirs(clusterDir)
  165.     self.assertTrue(os.path.isdir(clusterDir))
  166.     jobid = '1234.abc.com'
  167.     userState = { clusterDir : jobid }
  168.     self.__setupClusterState(userState, False)
  169.     self.client._op_allocate(['allocate', clusterDir, '3'])
  170.     self.assertTrue(self.log.hasMessage("Found a previously allocated cluster at "
  171.                       "cluster directory '%s'. HOD cannot determine if this cluster "
  172.                       "can be automatically deallocated. Deallocate the cluster if it "
  173.                       "is unused." % (clusterDir), 'critical'))
  174.     os.rmdir(clusterDir)
  175.   def __setupClusterState(self, clusterStateMap, verifyDirIsAbsent=True):
  176.     for clusterDir in clusterStateMap.keys():
  177.       # ensure directory doesn't exist, just in case.
  178.       if verifyDirIsAbsent:
  179.         self.assertFalse(os.path.exists(clusterDir))
  180.     # set up required state.
  181.     self.state.write(TEST_CLUSTER_DATA_FILE, clusterStateMap)
  182.     # verify everything is stored correctly.
  183.     state = self.state.read(TEST_CLUSTER_DATA_FILE)
  184.     for clusterDir in clusterStateMap.keys():
  185.       self.assertTrue(clusterDir in state.keys())
  186.       self.assertEquals(clusterStateMap[clusterDir], state[clusterDir])
  187. class test_InvalidHodStateFiles(unittest.TestCase):
  188.   def setUp(self):
  189.     self.rootDir = '/tmp/hod-%s' % getpass.getuser()
  190.     self.cfg = setupConf() # creat a conf
  191.     # Modify hod.user_state
  192.     self.cfg['hod']['user_state'] = tempfile.mkdtemp(dir=self.rootDir,
  193.                               prefix='HodTestSuite.test_InvalidHodStateFiles_')
  194.     self.log = MockLogger() # mock logger
  195.     self.cluster = MockHadoopCluster() # mock hadoop cluster
  196.     self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
  197.     self.state = hodState(self.cfg['hod']['user_state'])
  198.     self.statePath = os.path.join(self.cfg['hod']['user_state'], '%s.state' % 
  199.                                   TEST_CLUSTER_DATA_FILE)
  200.     self.clusterDir = tempfile.mkdtemp(dir=self.rootDir,
  201.                               prefix='HodTestSuite.test_InvalidHodStateFiles_')
  202.   
  203.   def testOperationWithInvalidStateFile(self):
  204.     jobid = '1234.hadoop.apache.org'
  205.     # create user state file with invalid permissions
  206.     stateFile = open(self.statePath, "w")
  207.     os.chmod(self.statePath, 000) # has no read/write permissions
  208.     self.client._hodRunner__cfg['hod']['operation'] = 
  209.                                              "info %s" % self.clusterDir
  210.     ret = self.client.operation()
  211.     os.chmod(self.statePath, 700) # restore permissions
  212.     stateFile.close()
  213.     os.remove(self.statePath)
  214.     # print self.log._MockLogger__logLines
  215.     self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % 
  216.                           os.path.realpath(self.statePath), 'critical'))
  217.     self.assertEquals(ret, 1)
  218.     
  219.   def testAllocateWithInvalidStateFile(self):
  220.     jobid = '1234.hadoop.apache.org'
  221.     # create user state file with invalid permissions
  222.     stateFile = open(self.statePath, "w")
  223.     os.chmod(self.statePath, 0400) # has no write permissions
  224.     self.client._hodRunner__cfg['hod']['operation'] = 
  225.                                         "allocate %s %s" % (self.clusterDir, '3')
  226.     ret = self.client.operation()
  227.     os.chmod(self.statePath, 700) # restore permissions
  228.     stateFile.close()
  229.     os.remove(self.statePath)
  230.     # print self.log._MockLogger__logLines
  231.     self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[2] % 
  232.                         os.path.realpath(self.statePath), 'critical'))
  233.     self.assertEquals(ret, 1)
  234.   
  235.   def testAllocateWithInvalidStateStore(self):
  236.     jobid = '1234.hadoop.apache.org'
  237.     self.client._hodRunner__cfg['hod']['operation'] = 
  238.                                       "allocate %s %s" % (self.clusterDir, 3)
  239.     ###### check with no executable permissions ######
  240.     stateFile = open(self.statePath, "w") # create user state file
  241.     os.chmod(self.cfg['hod']['user_state'], 0600) 
  242.     ret = self.client.operation()
  243.     os.chmod(self.cfg['hod']['user_state'], 0700) # restore permissions
  244.     stateFile.close()
  245.     os.remove(self.statePath)
  246.     # print self.log._MockLogger__logLines
  247.     self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % 
  248.                           os.path.realpath(self.statePath), 'critical'))
  249.     self.assertEquals(ret, 1)
  250.     
  251.     ###### check with no write permissions ######
  252.     stateFile = open(self.statePath, "w") # create user state file
  253.     os.chmod(self.cfg['hod']['user_state'], 0500) 
  254.     ret = self.client.operation()
  255.     os.chmod(self.cfg['hod']['user_state'], 0700) # restore permissions
  256.     stateFile.close()
  257.     os.remove(self.statePath)
  258.     # print self.log._MockLogger__logLines
  259.     self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] % 
  260.                           os.path.realpath(self.statePath), 'critical'))
  261.     self.assertEquals(ret, 1)
  262.   def tearDown(self):
  263.     if os.path.exists(self.clusterDir): os.rmdir(self.clusterDir)
  264.     if os.path.exists(self.cfg['hod']['user_state']):
  265.       os.rmdir(self.cfg['hod']['user_state'])
  266. class HodTestSuite(BaseTestSuite):
  267.   def __init__(self):
  268.     # suite setup
  269.     BaseTestSuite.__init__(self, __name__, excludes)
  270.     pass
  271.   
  272.   def cleanUp(self):
  273.     # suite tearDown
  274.     pass
  275. def RunHodTests():
  276.   # modulename_suite
  277.   suite = HodTestSuite()
  278.   testResult = suite.runTests()
  279.   suite.cleanUp()
  280.   return testResult
  281. if __name__ == "__main__":
  282.   RunHodTests()