util.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
- #Licensed to the Apache Software Foundation (ASF) under one
- #or more contributor license agreements. See the NOTICE file
- #distributed with this work for additional information
- #regarding copyright ownership. The ASF licenses this file
- #to you under the Apache License, Version 2.0 (the
- #"License"); you may not use this file except in compliance
- #with the License. You may obtain a copy of the License at
- # http://www.apache.org/licenses/LICENSE-2.0
- #Unless required by applicable law or agreed to in writing, software
- #distributed under the License is distributed on an "AS IS" BASIS,
- #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- #See the License for the specific language governing permissions and
- #limitations under the License.
- import errno, sys, os, traceback, stat, socket, re, warnings, signal
- from hodlib.Common.tcp import tcpSocket, tcpError
- from hodlib.Common.threads import simpleCommand
# Bit weights (as powers of two) for the stat module's setuid / setgid /
# sticky flags.
setUGV = { 'S_ISUID' : 2, 'S_ISGID' : 1, 'S_ISVTX' : 0 }

# A backslash followed by an optional character; group 1 captures the
# escaped character so a substitution with r"\1" unescapes it.
# (The pattern must contain a literal backslash, hence "\\" in the raw string.)
reEscapeSeq = r"\\(.)?"
reEscapeSeq = re.compile(reEscapeSeq)

HOD_INTERRUPTED_CODE = 127
HOD_INTERRUPTED_MESG = "Hod interrupted. Cleaning up and exiting"
# Torque comment text for exceeded user limits. The three ([0-9]*) groups
# appear to capture the requested, used and maximum counts (presumably this
# is used as a regex elsewhere; verify against callers).
TORQUE_USER_LIMITS_COMMENT_FIELD = ("User-limits exceeded. "
        "Requested:([0-9]*) Used:([0-9]*) MaxLimit:([0-9]*)")
TORQUE_USER_LIMITS_EXCEEDED_MSG = ("Requested number of nodes exceeded "
                                   "maximum user limits. ")
class AlarmException(Exception):
    """Exception that keeps its text in ``message``; repr() is that text."""

    def __init__(self, msg=''):
        # Store the message on the instance first, then let Exception record args.
        self.message = msg
        super(AlarmException, self).__init__(msg)

    def __repr__(self):
        return self.message
def isProcessRunning(pid):
    """Return True if a process with this pid appears to exist.

    Probes the pid with signal 0, which performs error checking only.
    Any OSError other than EPERM is treated as "not running". EPERM means
    the process exists but we may not signal it, so it counts as running.
    (Technique described on the python mailing list, e.g.
    http://mail.python.org/pipermail/python-list/2002-May/144522.html)
    """
    try:
        os.kill(pid, 0)
    except OSError as probeError:
        return probeError.errno == errno.EPERM
    return True
def untar(file, targetDir):
    """Extract the gzipped tarball ``file`` into ``targetDir``.

    Runs ``tar -C <targetDir> -zxf <file>`` through simpleCommand and
    returns True if it exited with code 0, False otherwise.
    NOTE(review): paths are interpolated unquoted into the command string.
    """
    extractCmd = simpleCommand('untar', 'tar -C %s -zxf %s' % (targetDir, file))
    extractCmd.start()
    extractCmd.wait()
    extractCmd.join()
    return extractCmd.exit_code() == 0
def tar(tarFile, tarDirectory, tarList):
    """Create the gzipped tarball ``tarFile`` from the entries in ``tarList``.

    The entries are resolved relative to ``tarDirectory``. The working
    directory is switched there while the command runs. It is always
    restored afterwards, even if running the command raises.

    Returns True when tar exits with code 0. Otherwise it returns
    ``commandObj.exit_status_string()`` rather than False. That value is
    presumably a status description (verify), so callers should compare
    with True rather than rely on truthiness.
    NOTE(review): file names are interpolated unquoted into the command.
    """
    # Same command text as before: "tar -czf <tarFile> <e1> <e2> ... ".
    command = 'tar -czf %s ' % (tarFile,) + \
        ''.join(['%s ' % (entry,) for entry in tarList])

    currentDir = os.getcwd()
    os.chdir(tarDirectory)
    try:
        commandObj = simpleCommand('tar', command)
        commandObj.start()
        commandObj.wait()
        commandObj.join()
        if commandObj.exit_code() == 0:
            return True
        return commandObj.exit_status_string()
    finally:
        # Restore the caller's working directory on every exit path.
        os.chdir(currentDir)
-
def to_http_url(list):
    """Build ``http://<host>:<port>`` from a ``[hostname, port]`` sequence.

    Only the first two items are used.
    """
    host, port = list[0], list[1]
    return "http://%s:%s" % (host, port)
def get_exception_string():
    """Return the full formatted traceback of the exception being handled.

    Meant to be called inside an ``except`` block. The result is the
    concatenation of ``traceback.format_exception`` applied to
    ``sys.exc_info()``.
    """
    excType, excValue, excTb = sys.exc_info()
    # join() instead of repeated string concatenation (quadratic).
    return ''.join(traceback.format_exception(excType, excValue, excTb))
-
def get_exception_error_string():
    """Return "<exception type> <exception value>" for the exception being handled.

    When ``sys.exc_info()`` reports no value (no active exception), the
    bare type entry is returned instead, which is then ``None``.
    """
    excType, excValue = sys.exc_info()[:2]
    if not excValue:
        return excType
    return "%s %s" % (excType, excValue)
def check_timestamp(timeStamp):
    """Check that timeStamp has the form ``YYYY-MM-DD HH:MM:SS``.

    The value is meant to be in UTC, but only the format and field ranges
    are validated here.
    Returns True if ``time.strptime`` accepts it, False otherwise (including
    for non-string input).
    """
    # The module never imported ``time``. The old bare ``except`` swallowed
    # the resulting NameError, so every timestamp was reported invalid.
    import time
    try:
        time.strptime(timeStamp, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        return False
    return True
def sig_wrapper(sigNum, handler, *args):
    """Call ``handler`` on behalf of a signal; ``sigNum`` is not used.

    With no extra args, handler() is called with nothing. Otherwise the
    extra args are passed as ONE tuple argument: handler(args).
    """
    if not args:
        handler()
        return
    handler(args)
-
def get_perms(filename):
    """Return the permission bits of ``filename`` as a 4-digit octal string.

    The first digit holds setuid (4), setgid (2) and sticky (1). The other
    three are the user/group/other rwx digits, e.g. "0755" or "4750".
    ``stat.S_IMODE`` keeps exactly these 12 bits, so "%04o" always yields
    four digits. This is the same string the previous digit-by-digit
    construction produced.
    """
    mode = stat.S_IMODE(os.stat(filename)[stat.ST_MODE])
    return "%04o" % mode
def local_fqdn():
    """Return this host's fully-qualified name rather than an alias.

    Resolves the node name from ``os.uname()`` with gethostbyname_ex. It
    then picks the last of (aliases..., canonical name) that contains a dot
    and starts with the node name. If none qualifies, the node name itself
    is returned.
    """
    hostname = os.uname()[1]
    canonical, aliases, _addresses = socket.gethostbyname_ex(hostname)
    candidates = list(aliases) + [canonical]
    qualified = [name for name in candidates
                 if '.' in name and name.startswith(hostname)]
    if qualified:
        return qualified[-1]
    return hostname
-
def need_to_allocate(allocated, config, command):
    r"""Decide whether a cluster still has to be allocated for ``command``.

    allocated -- any object with an isSet() method; truthy means a cluster
                 is already allocated.
    config    -- mapping with 'gridservice-hdfs' and 'gridservice-mapred'
                 sections, each read for an 'external' flag.
    command   -- the command string to be run.

    Returns False when a cluster is already allocated, or when the command
    matches the dfs pattern and HDFS is external, or when Map/Reduce is
    external. Returns True otherwise.

    NOTE(review): re.search is unanchored and \s* may match nothing, so the
    dfs test is true whenever "dfs" appears anywhere in command (e.g. "hdfs").
    """
    if allocated.isSet():
        return False
    # The HDFS flag is only consulted for dfs commands (short-circuit kept).
    if (re.search(r"\s*dfs.*$", command) and
            config['gridservice-hdfs']['external']):
        return False
    if config['gridservice-mapred']['external']:
        return False
    return True
-
def filter_warnings():
    """Install an 'ignore' warnings filter for the 'with' reserved-keyword warning."""
    withKeywordPattern = ".*?'with' will become a reserved keyword.*"
    warnings.filterwarnings(action='ignore', message=withKeywordPattern)
-
def args_to_string(list):
    """Return the items of ``list`` joined by single spaces ("" for an empty list).

    Each item is formatted with "%s". join() replaces the previous
    repeated concatenation, which was quadratic in the number of items.
    """
    return " ".join(["%s" % (item,) for item in list])
def replace_escapes(object):
    r"""Unescape backslash escapes in every 'keyval' option of ``object``, in place.

    e.g. ``\,`` becomes ``,`` and ``\=`` becomes ``=``.

    ``object`` is either a config object or an options object. It must
    provide ``_mySections``, ``_configDef`` and ``object[section]`` item
    access.

    For every option that is declared with type 'keyval' in ``_configDef``
    and is present in ``object[section]``, the option's dict is replaced by
    a new dict. In the new dict each value has every ``reEscapeSeq`` match
    substituted with group 1. This assumes the module-level ``reEscapeSeq``
    captures the escaped character in group 1.
    """
    for section in object._mySections:
        for option in object._configDef[section].keys():
            # NOTE: has_key/iteritems are Python 2 APIs, like the rest of this module.
            if not object[section].has_key(option):
                continue
            if object._configDef[section][option]['type'] != 'keyval':
                continue
            keyValDict = object[section][option]
            object[section][option] = {}
            for (key, value) in keyValDict.iteritems():
                # r"\1" keeps the escaped character. The former r"1"
                # replaced every escape sequence with a literal "1".
                object[section][option][key] = reEscapeSeq.sub(r"\1", value)
def hadoopVersion(hadoopDir, java_home, log):
    """Determine the Hadoop version by running ``<hadoopDir>/bin/hadoop version``.

    hadoopDir -- directory containing bin/hadoop
    java_home -- value set as JAVA_HOME in the child command's environment
    log       -- logger used for debug messages

    Returns {'major': ..., 'minor': ...}. The values are the digit strings
    parsed from a first output line of the form "Hadoop X.Y...". Both are
    None if the command fails, prints nothing, or prints something else.
    (Code earlier in idleTracker.py)
    """
    version = { 'major' : None, 'minor' : None }
    hadoopPath = os.path.join(hadoopDir, 'bin', 'hadoop')
    cmd = "%s version" % hadoopPath
    log.debug('Executing command %s to find hadoop version' % cmd)
    # Work on a copy: assigning into os.environ itself would leak JAVA_HOME
    # into the environment of this whole process.
    env = os.environ.copy()
    env['JAVA_HOME'] = java_home
    hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
    hadoopVerCmd.start()
    hadoopVerCmd.wait()
    hadoopVerCmd.join()
    if hadoopVerCmd.exit_code() == 0:
        output = hadoopVerCmd.output()
        if output:
            verLine = output[0]
            log.debug('Version from hadoop command: %s' % verLine)
            # Escaped "." so e.g. "Hadoop 123" is not misread as major 1, minor 3.
            verMatch = re.match(r"Hadoop ([0-9]+)\.([0-9]+).*", verLine)
            if verMatch is not None:
                version['major'] = verMatch.group(1)
                version['minor'] = verMatch.group(2)
    return version
def get_cluster_status(hdfsAddress, mapredAddress):
    """Encode Map/Reduce and HDFS reachability as a status code.

    Each address is probed by opening and closing a tcpSocket. A tcpError
    marks it unreachable. Map/Reduce is probed before HDFS.

    Returns 0 if both are reachable, 14 if only Map/Reduce is unreachable,
    13 if only HDFS is unreachable, and 10 if neither is reachable.
    """
    def reachable(address):
        probe = tcpSocket(address)
        try:
            probe.open()
            probe.close()
        except tcpError:
            return False
        return True

    mapredUp = reachable(mapredAddress)
    hdfsUp = reachable(hdfsAddress)
    if mapredUp and hdfsUp:
        return 0
    if hdfsUp:
        return 14
    if mapredUp:
        return 13
    return 10
def parseEquals(list):
    r"""Turn key=value strings into a dict, e.g. ['a=b','c=d'] -> {'a':'b','c':'d'}.

    Used in GridService/{mapred.py,hdfs.py} and HodRing/hodring.py. Escaped
    '=' (as in ``\=``) needs no special treatment because keys are generated
    by hod and contain no '='. Only the first '=' separates key from value,
    so values keep any further '=' characters ('opts=-Dx=y' -> {'opts':
    '-Dx=y'}). An element with no '=' raises IndexError, as before.
    """
    result = {}
    for elem in list:
        pieces = elem.split('=', 1)
        result[pieces[0]] = pieces[1]
    return result
def getMapredSystemDirectory(mrSysDirRoot, userid, jobid):
    """Return the path <mrSysDirRoot>/<userid>/mapredsystem/<jobid>."""
    userRoot = os.path.join(mrSysDirRoot, userid)
    return os.path.join(userRoot, 'mapredsystem', jobid)
class HodInterrupt:
    """Tracks whether HOD has received SIGTERM, SIGQUIT or SIGINT.

    init_signals() installs handlers for those signals. When one fires, the
    signal is logged (if a log was given via set_log) and the flag is set.
    isSet() reports the flag.
    """
    def __init__(self):
        self.HodInterruptFlag = False
        self.log = None

    def set_log(self, log):
        """Attach the logger used to report caught signals."""
        self.log = log

    def init_signals(self):
        """Install the same stop handler for SIGTERM, SIGQUIT and SIGINT."""
        def sig_wrapper(sigNum, handler, *args):
            # Guarded so that a signal arriving before set_log() still sets
            # the flag instead of raising AttributeError inside the handler.
            if self.log is not None:
                self.log.critical("Caught signal %s." % sigNum )
            if args:
                handler(args)
            else:
                handler()

        def sigStop(sigNum, frame):
            # Python passes (signum, frame) to handlers; frame is unused.
            sig_wrapper(sigNum, self.setFlag)

        signal.signal(signal.SIGTERM, sigStop) # 15 : software termination signal
        signal.signal(signal.SIGQUIT, sigStop) # 3 : Quit program
        signal.signal(signal.SIGINT, sigStop)  # 2 ^C : Interrupt program

    def setFlag(self, val = True):
        """Set the interrupt flag to val (True by default)."""
        self.HodInterruptFlag = val

    def isSet(self):
        """Return the current interrupt flag."""
        return self.HodInterruptFlag


class HodInterruptException(Exception):
    """Exception carrying an arbitrary ``value``; str() is repr(value)."""
    def __init__(self, value = ""):
        self.value = value

    def __str__(self):
        return repr(self.value)


# Module-level HodInterrupt instance (presumably shared by the HOD
# components that import this module; verify against callers).
hodInterrupt = HodInterrupt()