testHod.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
- #Licensed to the Apache Software Foundation (ASF) under one
- #or more contributor license agreements. See the NOTICE file
- #distributed with this work for additional information
- #regarding copyright ownership. The ASF licenses this file
- #to you under the Apache License, Version 2.0 (the
- #"License"); you may not use this file except in compliance
- #with the License. You may obtain a copy of the License at
- # http://www.apache.org/licenses/LICENSE-2.0
- #Unless required by applicable law or agreed to in writing, software
- #distributed under the License is distributed on an "AS IS" BASIS,
- #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- #See the License for the specific language governing permissions and
- #limitations under the License.
import unittest
import getpass
import os
import sys
import re
import threading
import time

# The project root is the part of the running script's real path
# (sys.argv[0]) that precedes the "/testing/" component.  It is appended to
# sys.path so that the `testing` and `hodlib` packages imported below can be
# found; this therefore has to run before those imports.
myDirectory = os.path.realpath(sys.argv[0])
rootDirectory = re.sub("/testing/.*", "", myDirectory)
sys.path.append(rootDirectory)

import tempfile
from testing.lib import BaseTestSuite, MockLogger, MockHadoopCluster
from hodlib.Hod.hod import hodRunner, hodState
from hodlib.Common.desc import NodePoolDesc

# Handed to BaseTestSuite by HodTestSuite; presumably names of tests to skip.
excludes = []

# Information about all clusters is written to a file called clusters.state.
# The parentheses keep this two-line import valid; a trailing comma with no
# line continuation is a SyntaxError.
from hodlib.Hod.hod import (CLUSTER_DATA_FILE as TEST_CLUSTER_DATA_FILE,
                            INVALID_STATE_FILE_MSGS)

# Temp directory prefix: a per-user scratch directory under /tmp.
TMP_DIR_PREFIX = os.path.join('/tmp', 'hod-%s' % (getpass.getuser()))
# build a config object with all required keys for initializing hod.
def setupConf():
    """Return a minimal configuration dictionary for constructing hodRunner.

    The result has three entries:
      'hod'              -- settings read by hod itself (cluster state is
                            kept under 'user_state').
      'resource_manager' -- placeholder values only; they exist so that a
                            node pool description object can be built.
      'nodepooldesc'     -- NodePoolDesc built from the 'resource_manager'
                            section (the very same dict object).
    """
    hodSection = {}
    hodSection['original-dir'] = os.getcwd()
    hodSection['stream'] = True
    # store all the info about clusters in this directory
    hodSection['user_state'] = '/tmp/hodtest'
    hodSection['debug'] = 3
    hodSection['java-home'] = os.getenv('JAVA_HOME')
    hodSection['cluster'] = 'dummy'
    hodSection['cluster-factor'] = 1.8
    hodSection['xrs-port-range'] = (32768, 65536)
    hodSection['allocate-wait-time'] = 3600
    hodSection['temp-dir'] = '/tmp/hod'

    # just set everything to dummy. Need something to initialize the
    # node pool description object.
    resourceManagerSection = {
        'id': 'dummy',
        'batch-home': 'dummy',
        'queue': 'dummy',
    }

    return {
        'hod': hodSection,
        'resource_manager': resourceManagerSection,
        'nodepooldesc': NodePoolDesc(resourceManagerSection),
    }
# Test class that defines methods to test invalid arguments to hod operations.
class test_InvalidArgsOperations(unittest.TestCase):
    """Run hod list/info/deallocate/allocate against stale cluster state.

    Each test records cluster directories in the hod user state that either
    do not exist or were re-created empty. It then invokes a hod operation
    through hodRunner, which is backed by MockLogger and MockHadoopCluster.
    Finally it checks the logged messages and the operations recorded by
    the mock cluster.
    """

    # Expected log messages.  The list message separates its fields by tabs.
    _UNKNOWN_STATE_MSG = 'cluster state unknown\t%s\t%s'
    _NO_INFO_MSG = ("Cannot find information for cluster with id '%s' in "
                    "previously allocated cluster directory '%s'.")
    _NO_SUCH_DIR_MSG = ("Invalid hod.clusterdir(--hod.clusterdir or -d). "
                        "%s : No such directory")
    _FREEING_MSG = "Freeing resources allocated to the cluster."
    _PREVIOUSLY_ALLOCATED_MSG = (
        "Found a previously allocated cluster at "
        "cluster directory '%s'. HOD cannot determine if this cluster "
        "can be automatically deallocated. Deallocate the cluster if it "
        "is unused.")

    def setUp(self):
        self.cfg = setupConf()
        # initialize the mock objects
        self.log = MockLogger()
        self.cluster = MockHadoopCluster()
        # Use the test logger. This will be used for test verification.
        self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
        # Create the hodState object to set the test state you want.
        userStateDir = self.cfg['hod']['user_state']
        self.state = hodState(userStateDir)
        if not os.path.exists(userStateDir):
            # os.path has no mkdir(); create the directory with os.makedirs.
            os.makedirs(userStateDir)
        # ensure cluster data file exists, so write works in the tests.
        f = open(self._clusterDataPath(), 'w')
        f.close()

    def tearDown(self):
        # clean up cluster data file and directory
        os.remove(self._clusterDataPath())
        os.rmdir(self.cfg['hod']['user_state'])

    def _clusterDataPath(self):
        """Return the path of the cluster data state file in user_state."""
        return os.path.join(self.cfg['hod']['user_state'],
                            '%s.state' % TEST_CLUSTER_DATA_FILE)

    def _makeEmptyClusterDir(self, name):
        """Create TMP_DIR_PREFIX/<name> and return its path.

        This simulates a cluster directory that was deleted and created
        again without deallocation.  The caller must remove the directory,
        in a finally clause, so that a failing assertion does not leave it
        behind; os.makedirs would otherwise fail on the next run.
        """
        clusterDir = os.path.join(TMP_DIR_PREFIX, name)
        os.makedirs(clusterDir)
        self.assertTrue(os.path.isdir(clusterDir))
        return clusterDir

    def _assertDeallocateClearsState(self, clusterDir, jobid):
        """Deallocate clusterDir, then verify the outcome.

        Checks that jobid was deleted on the mock cluster, that the
        expected critical messages were logged, and that clusterDir is no
        longer recorded in the cluster state.
        """
        self.client._op_deallocate(['deallocate', clusterDir])
        # verify job was deleted
        self.assertTrue(self.cluster.wasOperationPerformed('delete_job', jobid))
        # verify appropriate message was logged.
        self.assertTrue(self.log.hasMessage(
            self._NO_INFO_MSG % (jobid, clusterDir), 'critical'))
        self.assertTrue(self.log.hasMessage(self._FREEING_MSG, 'critical'))
        # verify that the state information was cleared.
        userState = self.state.read(TEST_CLUSTER_DATA_FILE)
        self.assertFalse(clusterDir in userState.keys())

    # Test that list works with deleted cluster directories - more than one
    # entries which are invalid.
    def testListInvalidDirectory(self):
        userState = {
            os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory1'): '123.dummy.id1',
            os.path.join(TMP_DIR_PREFIX, 'testListInvalidDirectory2'): '123.dummy.id2',
        }
        self.__setupClusterState(userState)
        self.client._op_list(['list'])
        # assert that required errors are logged.
        for clusterDir, jobid in userState.items():
            self.assertTrue(self.log.hasMessage(
                self._UNKNOWN_STATE_MSG % (jobid, clusterDir), 'info'))

        # simulate a test where a directory is deleted, and created again,
        # without deallocation
        clusterDir = self._makeEmptyClusterDir('testListEmptyDirectory')
        try:
            jobid = '123.dummy.id3'
            self.__setupClusterState({clusterDir: jobid}, False)
            self.client._op_list(['list'])
            self.assertTrue(self.log.hasMessage(
                self._UNKNOWN_STATE_MSG % (jobid, clusterDir), 'info'))
        finally:
            os.rmdir(clusterDir)

    # Test that info works with a deleted cluster directory
    def testInfoInvalidDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testInfoInvalidDirectory')
        jobid = '456.dummy.id'
        self.__setupClusterState({clusterDir: jobid})
        self.client._op_info(['info', clusterDir])
        self.assertTrue(self.log.hasMessage(
            self._NO_INFO_MSG % (jobid, clusterDir), 'critical'))

        # simulate a test where a directory is deleted, and created again,
        # without deallocation
        clusterDir = self._makeEmptyClusterDir('testInfoEmptyDirectory')
        try:
            jobid = '456.dummy.id1'
            self.__setupClusterState({clusterDir: jobid}, False)
            self.client._op_info(['info', clusterDir])
            self.assertTrue(self.log.hasMessage(
                self._NO_INFO_MSG % (jobid, clusterDir), 'critical'))
        finally:
            os.rmdir(clusterDir)

    # Test info works with an invalid cluster directory
    def testInfoNonExistentDirectory(self):
        clusterDir = '/tmp/hod/testInfoNonExistentDirectory'
        # the test is only meaningful if the directory really is absent
        self.assertFalse(os.path.exists(clusterDir))
        self.client._op_info(['info', clusterDir])
        self.assertTrue(self.log.hasMessage(
            self._NO_SUCH_DIR_MSG % clusterDir, 'critical'))

    # Test that deallocation works on a deleted cluster directory
    # by clearing the job, and removing the state
    def testDeallocateInvalidDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testDeallocateInvalidDirectory')
        jobid = '789.dummy.id'
        self.__setupClusterState({clusterDir: jobid})
        self._assertDeallocateClearsState(clusterDir, jobid)

        # simulate a test where a directory is deleted, and created again,
        # without deallocation
        clusterDir = self._makeEmptyClusterDir('testDeallocateEmptyDirectory')
        try:
            jobid = '789.dummy.id1'
            self.__setupClusterState({clusterDir: jobid}, False)
            self._assertDeallocateClearsState(clusterDir, jobid)
        finally:
            os.rmdir(clusterDir)

    # Test that deallocation works on a nonexistent directory.
    def testDeallocateNonExistentDirectory(self):
        clusterDir = os.path.join(TMP_DIR_PREFIX, 'testDeallocateNonExistentDirectory')
        # the test is only meaningful if the directory really is absent
        self.assertFalse(os.path.exists(clusterDir))
        self.client._op_deallocate(['deallocate', clusterDir])
        # there should be no call..
        self.assertFalse(self.cluster.wasOperationPerformed('delete_job', None))
        self.assertTrue(self.log.hasMessage(
            self._NO_SUCH_DIR_MSG % clusterDir, 'critical'))

    # Test that allocation on an previously deleted directory fails.
    def testAllocateOnDeletedDirectory(self):
        clusterDir = self._makeEmptyClusterDir('testAllocateOnDeletedDirectory')
        try:
            self.__setupClusterState({clusterDir: '1234.abc.com'}, False)
            self.client._op_allocate(['allocate', clusterDir, '3'])
            self.assertTrue(self.log.hasMessage(
                self._PREVIOUSLY_ALLOCATED_MSG % clusterDir, 'critical'))
        finally:
            os.rmdir(clusterDir)

    def __setupClusterState(self, clusterStateMap, verifyDirIsAbsent=True):
        """Store clusterStateMap in the cluster state and verify it.

        clusterStateMap maps cluster directory to job id.  After writing
        it under TEST_CLUSTER_DATA_FILE, the state is read back to check
        every entry.  When verifyDirIsAbsent is true, first assert that
        none of the cluster directories exist on disk.
        """
        if verifyDirIsAbsent:
            # ensure directory doesn't exist, just in case.
            for clusterDir in clusterStateMap.keys():
                self.assertFalse(os.path.exists(clusterDir))
        # set up required state.
        self.state.write(TEST_CLUSTER_DATA_FILE, clusterStateMap)
        # verify everything is stored correctly.
        state = self.state.read(TEST_CLUSTER_DATA_FILE)
        for clusterDir in clusterStateMap.keys():
            self.assertTrue(clusterDir in state.keys())
            self.assertEqual(clusterStateMap[clusterDir], state[clusterDir])
class test_InvalidHodStateFiles(unittest.TestCase):
    """Run hod operations with unusable permissions on the state storage.

    The permissions are broken either on the user state file or on the
    directory that holds it.  Each test checks that the operation returns
    1 and logs the expected INVALID_STATE_FILE_MSGS entry at 'critical'.
    """

    def setUp(self):
        self.rootDir = TMP_DIR_PREFIX
        # tempfile.mkdtemp() raises if its parent directory does not exist.
        if not os.path.isdir(self.rootDir):
            os.makedirs(self.rootDir)
        self.cfg = setupConf()  # create a conf
        # Point hod.user_state at a fresh private directory for this test.
        self.cfg['hod']['user_state'] = tempfile.mkdtemp(
            dir=self.rootDir, prefix='HodTestSuite.test_InvalidHodStateFiles_')
        self.log = MockLogger()  # mock logger
        self.cluster = MockHadoopCluster()  # mock hadoop cluster
        self.client = hodRunner(self.cfg, log=self.log, cluster=self.cluster)
        self.state = hodState(self.cfg['hod']['user_state'])
        self.statePath = os.path.join(self.cfg['hod']['user_state'],
                                      '%s.state' % TEST_CLUSTER_DATA_FILE)
        self.clusterDir = tempfile.mkdtemp(
            dir=self.rootDir, prefix='HodTestSuite.test_InvalidHodStateFiles_')

    def _setOperation(self, operation):
        """Set hod.operation in the runner's config.

        Presumably this is what the next client.operation() call executes.
        """
        # hodRunner keeps its config in a private attribute, reached here
        # through its name-mangled form.
        self.client._hodRunner__cfg['hod']['operation'] = operation

    def _runWithMode(self, path, mode):
        """Run client.operation() while `path` has permission bits `mode`.

        An empty user state file is created first.  Afterwards `path` is
        reset to owner rwx, and the state file is closed and removed.  This
        cleanup runs even if the operation raises, so tearDown can still
        remove the directories.  Returns the operation's return value.
        """
        import stat
        stateFile = open(self.statePath, "w")  # create user state file
        try:
            os.chmod(path, mode)
            try:
                return self.client.operation()
            finally:
                # restore permissions (octal 0700; decimal 700 is wrong bits)
                os.chmod(path, stat.S_IRWXU)
        finally:
            stateFile.close()
            os.remove(self.statePath)

    def testOperationWithInvalidStateFile(self):
        self._setOperation("info %s" % self.clusterDir)
        # user state file has no read/write permissions
        ret = self._runWithMode(self.statePath, 0)
        self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] %
                        os.path.realpath(self.statePath), 'critical'))
        self.assertEqual(ret, 1)

    def testAllocateWithInvalidStateFile(self):
        import stat
        self._setOperation("allocate %s %s" % (self.clusterDir, '3'))
        # user state file has no write permissions (read-only, 0400)
        ret = self._runWithMode(self.statePath, stat.S_IRUSR)
        self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[2] %
                        os.path.realpath(self.statePath), 'critical'))
        self.assertEqual(ret, 1)

    def testAllocateWithInvalidStateStore(self):
        import stat
        userStateDir = self.cfg['hod']['user_state']
        self._setOperation("allocate %s %s" % (self.clusterDir, 3))

        ###### check with no executable permissions (0600) ######
        ret = self._runWithMode(userStateDir, stat.S_IRUSR | stat.S_IWUSR)
        self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] %
                        os.path.realpath(self.statePath), 'critical'))
        self.assertEqual(ret, 1)

        ###### check with no write permissions (0500) ######
        ret = self._runWithMode(userStateDir, stat.S_IRUSR | stat.S_IXUSR)
        self.assertTrue(self.log.hasMessage(INVALID_STATE_FILE_MSGS[0] %
                        os.path.realpath(self.statePath), 'critical'))
        self.assertEqual(ret, 1)

    def tearDown(self):
        if os.path.exists(self.clusterDir):
            os.rmdir(self.clusterDir)
        if os.path.exists(self.cfg['hod']['user_state']):
            os.rmdir(self.cfg['hod']['user_state'])
class HodTestSuite(BaseTestSuite):
    """Test suite for this module.

    It passes this module's name and the module-level `excludes` list to
    BaseTestSuite.
    """

    def __init__(self):
        # suite setup
        BaseTestSuite.__init__(self, __name__, excludes)

    def cleanUp(self):
        """Suite tearDown; there is nothing to release."""
        pass
def RunHodTests():
    """Build and run the hod suite, clean it up, and return the result.

    The returned value is whatever runTests() produced.
    """
    hodSuite = HodTestSuite()  # modulename_suite
    result = hodSuite.runTests()
    hodSuite.cleanUp()
    return result
# Allow running this module directly as a script.
if __name__ == "__main__":
    RunHodTests()