util.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. import errno, sys, os, traceback, stat, socket, re, warnings, signal
  15. from hodlib.Common.tcp import tcpSocket, tcpError 
  16. from hodlib.Common.threads import simpleCommand
  17. setUGV   = { 'S_ISUID' : 2, 'S_ISGID' : 1, 'S_ISVTX' : 0 }
  18. reEscapeSeq = r"\(.)?"
  19. reEscapeSeq = re.compile(reEscapeSeq)
  20. HOD_INTERRUPTED_CODE = 127
  21. HOD_INTERRUPTED_MESG = "Hod interrupted. Cleaning up and exiting"
  22. TORQUE_USER_LIMITS_COMMENT_FIELD = "User-limits exceeded. " + 
  23.         "Requested:([0-9]*) Used:([0-9]*) MaxLimit:([0-9]*)"
  24. TORQUE_USER_LIMITS_EXCEEDED_MSG = "Requested number of nodes exceeded " + 
  25.                                   "maximum user limits. "
  26. class AlarmException(Exception):
  27.     def __init__(self, msg=''):
  28.         self.message = msg
  29.         Exception.__init__(self, msg)
  30.     def __repr__(self):
  31.         return self.message
  32. def isProcessRunning(pid):
  33.     '''Check if a process is running, by sending it a 0 signal, and checking for errors'''
  34.     # This method is documented in some email threads on the python mailing list.
  35.     # For e.g.: http://mail.python.org/pipermail/python-list/2002-May/144522.html
  36.     try:
  37.       os.kill(pid, 0)
  38.       return True
  39.     except OSError, err:
  40.       return err.errno == errno.EPERM
  41. def untar(file, targetDir):
  42.     status = False
  43.     command = 'tar -C %s -zxf %s' % (targetDir, file)
  44.     commandObj = simpleCommand('untar', command)
  45.     commandObj.start()
  46.     commandObj.wait()
  47.     commandObj.join()
  48.     if commandObj.exit_code() == 0:
  49.         status = True
  50.         
  51.     return status
  52. def tar(tarFile, tarDirectory, tarList):
  53.     currentDir = os.getcwd()
  54.     os.chdir(tarDirectory)
  55.     status = False
  56.     command = 'tar -czf %s ' % (tarFile)
  57.     for file in tarList:
  58.         command = "%s%s " % (command, file)
  59.     
  60.     commandObj = simpleCommand('tar', command)
  61.     commandObj.start()
  62.     commandObj.wait()
  63.     commandObj.join()
  64.     if commandObj.exit_code() == 0:
  65.         status = True
  66.     else:
  67.         status = commandObj.exit_status_string()
  68.     
  69.     os.chdir(currentDir)
  70.         
  71.     return status
  72.   
  73. def to_http_url(list):
  74.     """convert [hostname, port]  to a http url""" 
  75.     str = ''
  76.     str = "http://%s:%s" % (list[0], list[1])
  77.     
  78.     return str
  79. def get_exception_string():
  80.     (type, value, tb) = sys.exc_info()
  81.     exceptList = traceback.format_exception(type, value, tb)
  82.     exceptString = ''
  83.     for line in exceptList:
  84.         exceptString = "%s%s" % (exceptString, line)
  85.     
  86.     return exceptString
  87.   
  88. def get_exception_error_string():
  89.   (type, value, tb) = sys.exc_info()
  90.   if value:
  91.     exceptString = "%s %s" % (type, value)
  92.   else:
  93.     exceptString = type
  94.     
  95.   return exceptString
  96. def check_timestamp(timeStamp):
  97.     """ Checks the validity of a timeStamp.
  98.         timeStamp - (YYYY-MM-DD HH:MM:SS in UTC)
  99.         returns True or False
  100.     """
  101.     isValid = True
  102.     try:
  103.         timeStruct = time.strptime(timeStamp, "%Y-%m-%d %H:%M:%S")
  104.     except:
  105.         isValid = False
  106.     return isValid
  107. def sig_wrapper(sigNum, handler, *args):
  108.   if args:
  109.       handler(args)
  110.   else:
  111.       handler()
  112.       
  113. def get_perms(filename):
  114.     mode = stat.S_IMODE(os.stat(filename)[stat.ST_MODE])
  115.     permsString = ''
  116.     permSet = 0
  117.     place = 2
  118.     for who in "USR", "GRP", "OTH":
  119.         for what in "R", "W", "X":
  120.             if mode & getattr(stat,"S_I"+what+who):
  121.                 permSet = permSet + 2**place
  122.             place = place - 1
  123.         permsString = "%s%s" % (permsString, permSet)
  124.         permSet = 0
  125.         place = 2
  126.     permSet = 0
  127.     for permFlag in setUGV.keys():
  128.         if mode & getattr(stat, permFlag):
  129.             permSet = permSet + 2**setUGV[permFlag]
  130.     permsString = "%s%s" % (permSet, permsString)
  131.     return permsString
  132. def local_fqdn():
  133.     """Return a system's true FQDN rather than any aliases, which are
  134.        occasionally returned by socket.gethostname."""
  135.     fqdn = None
  136.     me = os.uname()[1]
  137.     nameInfo=socket.gethostbyname_ex(me)
  138.     nameInfo[1].append(nameInfo[0])
  139.     for name in nameInfo[1]:
  140.         if name.count(".") and name.startswith(me):
  141.             fqdn = name
  142.     if fqdn == None:
  143.         fqdn = me
  144.     return(fqdn)
  145.   
  146. def need_to_allocate(allocated, config, command):
  147.     status = True
  148.     
  149.     if allocated.isSet():
  150.         status = False
  151.     elif re.search("s*dfs.*$", command) and 
  152.         config['gridservice-hdfs']['external']:    
  153.         status = False
  154.     elif config['gridservice-mapred']['external']:    
  155.         status = False
  156.         
  157.     return status
  158.   
  159. def filter_warnings():
  160.     warnings.filterwarnings('ignore',
  161.         message=".*?'with' will become a reserved keyword.*")
  162.     
  163. def args_to_string(list):
  164.   """return a string argument space seperated"""
  165.   arg = ''
  166.   for item in list:
  167.     arg = "%s%s " % (arg, item)
  168.   return arg[:-1]
  169. def replace_escapes(object):
  170.   """ replace any escaped character. e.g , with , = with = and so on """
  171.   # here object is either a config object or a options object
  172.   for section in object._mySections:
  173.     for option in object._configDef[section].keys():
  174.       if object[section].has_key(option):
  175.         if object._configDef[section][option]['type'] == 'keyval':
  176.           keyValDict = object[section][option]
  177.           object[section][option] = {}
  178.           for (key,value) in keyValDict.iteritems():
  179.             match = reEscapeSeq.search(value)
  180.             if match:
  181.               value = reEscapeSeq.sub(r"1", value)
  182.             object[section][option][key] = value
  183. def hadoopVersion(hadoopDir, java_home, log):
  184.   # Determine the version of hadoop being used by executing the 
  185.   # hadoop version command. Code earlier in idleTracker.py
  186.   hadoopVersion = { 'major' : None, 'minor' : None }
  187.   hadoopPath = os.path.join(hadoopDir, 'bin', 'hadoop')
  188.   cmd = "%s version" % hadoopPath
  189.   log.debug('Executing command %s to find hadoop version' % cmd)
  190.   env = os.environ
  191.   env['JAVA_HOME'] = java_home
  192.   hadoopVerCmd = simpleCommand('HadoopVersion', cmd, env)
  193.   hadoopVerCmd.start()
  194.   hadoopVerCmd.wait()
  195.   hadoopVerCmd.join()
  196.   if hadoopVerCmd.exit_code() == 0:
  197.     verLine = hadoopVerCmd.output()[0]
  198.     log.debug('Version from hadoop command: %s' % verLine)
  199.     hadoopVerRegExp = re.compile("Hadoop ([0-9]+).([0-9]+).*")
  200.     verMatch = hadoopVerRegExp.match(verLine)
  201.     if verMatch != None:
  202.       hadoopVersion['major'] = verMatch.group(1)
  203.       hadoopVersion['minor'] = verMatch.group(2)
  204.   return hadoopVersion
  205. def get_cluster_status(hdfsAddress, mapredAddress):
  206.   """Determine the status of the cluster based on socket availability
  207.      of HDFS and Map/Reduce."""
  208.   status = 0
  209.   mapredSocket = tcpSocket(mapredAddress)
  210.   try:
  211.     mapredSocket.open()
  212.     mapredSocket.close()
  213.   except tcpError:
  214.     status = 14
  215.   hdfsSocket = tcpSocket(hdfsAddress)
  216.   try:
  217.     hdfsSocket.open()
  218.     hdfsSocket.close()
  219.   except tcpError:
  220.     if status > 0:
  221.       status = 10
  222.     else:
  223.       status = 13
  224.   return status
  225. def parseEquals(list):
  226.   # takes in a list of keyval pairs e.g ['a=b','c=d'] and returns a
  227.   # dict e.g {'a'='b','c'='d'}. Used in GridService/{mapred.py/hdfs.py} and 
  228.   # HodRing/hodring.py. No need for specially treating escaped =. as in =,
  229.   # since all keys are generated by hod and don't contain such anomalies
  230.   dict = {}
  231.   for elems in list:
  232.     splits = elems.split('=')
  233.     dict[splits[0]] = splits[1]
  234.   return dict
  235. def getMapredSystemDirectory(mrSysDirRoot, userid, jobid):
  236.   return os.path.join(mrSysDirRoot, userid, 'mapredsystem', jobid)
  237. class HodInterrupt:
  238.   def __init__(self):
  239.     self.HodInterruptFlag = False
  240.     self.log = None
  241.   def set_log(self, log):
  242.     self.log = log
  243.   def init_signals(self):
  244.     def sigStop(sigNum, handler):
  245.       sig_wrapper(sigNum, self.setFlag)
  246.     signal.signal(signal.SIGTERM, sigStop) # 15 : software termination signal
  247.     signal.signal(signal.SIGQUIT, sigStop) # 3  : Quit program
  248.     signal.signal(signal.SIGINT, sigStop)  # 2 ^C : Interrupt program
  249.     def sig_wrapper(sigNum, handler, *args):
  250.       self.log.critical("Caught signal %s." % sigNum )
  251.       if args:
  252.           handler(args)
  253.       else:
  254.           handler()
  255.   def setFlag(self, val = True):
  256.     self.HodInterruptFlag = val
  257.   def isSet(self):
  258.     return self.HodInterruptFlag
  259. class HodInterruptException(Exception):
  260.   def __init__(self, value = ""):
  261.     self.value = value
  262.     
  263.   def __str__(self):
  264.     return repr(self.value)
  265. hodInterrupt = HodInterrupt()