hodcleanup
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. #!/bin/sh
  2. # Licensed to the Apache Software Foundation (ASF) under one or more
  3. # contributor license agreements.  See the NOTICE file distributed with
  4. # this work for additional information regarding copyright ownership.
  5. # The ASF licenses this file to You under the Apache License, Version 2.0
  6. # (the "License"); you may not use this file except in compliance with
  7. # the License.  You may obtain a copy of the License at
  8. #
  9. #     http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. """:"
  17. work_dir=$(dirname $0)
  18. base_name=$(basename $0)
  19. original_dir=$PWD
  20. cd $work_dir
  21. if [ $HOD_PYTHON_HOME ]; then
  22.     exec $HOD_PYTHON_HOME -u -OO $base_name ${1+"$@"}
  23. elif [ -e /usr/bin/python ]; then
  24.     exec /usr/bin/python -u -OO $base_name ${1+"$@"}
  25. elif [ -e /usr/local/bin/python ]; then
  26.     exec /usr/local/bin/python -u -OO $base_name ${1+"$@"}
  27. else
  28.     exec python -u -OO $base_name ${1+"$@"}
  29. fi
  30. ":"""
  31. """The executable to be used by the user"""
  32. import sys, os, re, pwd, threading, sys, random, time, pprint, shutil, time, re
  33. from pprint import pformat
  34. from optparse import OptionParser
  35. myName          = os.path.basename(sys.argv[0])
  36. myName          = re.sub(".*/", "", myName)
  37. binDirectory    = os.path.realpath(sys.argv[0])
  38. rootDirectory   = re.sub("/bin/.*", "", binDirectory)
  39. libDirectory    = rootDirectory
  40. sys.path.append(libDirectory)
  41. from hodlib.Common.threads import simpleCommand
  42. from hodlib.Common.util import local_fqdn, tar, filter_warnings,
  43.                             get_exception_string, get_exception_error_string
  44. from hodlib.Common.logger import hodLog
  45. from hodlib.Common.logger import getLogger
  46. from hodlib.HodRing.hodRing import createMRSystemDirectoryManager
  47. filter_warnings()
  48. reVersion = re.compile(".*(d+_d+).*")
  49. reHdfsURI = re.compile("(hdfs://.*?:d+)(.*)")
  50. VERSION = None
  51. if os.path.exists("./VERSION"):
  52.   vFile = open("./VERSION", 'r')
  53.   VERSION = vFile.readline()
  54.   vFile.close()
  55. def __archive_logs(conf, log):
  56.   # need log-destination-uri, __hadoopLogDirs, temp-dir
  57.   status = True
  58.   logUri = conf['log-destination-uri']
  59.   hadoopLogDirs = conf['hadoop-log-dirs']
  60.   if logUri:
  61.     try:
  62.       if hadoopLogDirs:
  63.         date = time.localtime()
  64.         for logDir in hadoopLogDirs:
  65.           (head, tail) = os.path.split(logDir)
  66.           (head, logType) = os.path.split(head)
  67.           tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % (
  68.             logType, local_fqdn(), date[0], date[1], date[2], date[3], 
  69.             date[4], date[5], random.randint(0,1000))
  70.           
  71.           if logUri.startswith('file://'):
  72.             tarBallFile = os.path.join(logUri[7:], 
  73.                                        tarBallFile)
  74.           else:
  75.             tarBallFile = os.path.join(conf['temp-dir'], tarBallFile)
  76.           
  77.           log.debug('archiving log files to: %s' % tarBallFile)
  78.           status = tar(tarBallFile, logDir, ['*',])
  79.           log.info('archive %s status: %s' % (tarBallFile, status))
  80.           if status and 
  81.             logUri.startswith('hdfs://'):
  82.             __copy_archive_to_dfs(conf, tarBallFile)
  83.             log.info("copying archive to dfs finished")
  84.         dict = {} 
  85.     except:
  86.       log.error(get_exception_string())
  87.       status = False
  88.   return status
  89. def __copy_archive_to_dfs(conf, archiveFile):
  90.   # need log-destination-uri, hadoopCommandstring and/or pkgs
  91.   hdfsURIMatch = reHdfsURI.match(conf['log-destination-uri'])
  92.   
  93.   (head, tail) = os.path.split(archiveFile)
  94.   destFile = os.path.join(hdfsURIMatch.group(2), conf['user-id'], 'hod-logs', conf['service-id'], tail)
  95.   
  96.   log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile))
  97.   
  98.   hadoopCmd = conf['hadoop-command-string']
  99.   if conf['pkgs']:
  100.     hadoopCmd = os.path.join(conf['pkgs'], 'bin', 'hadoop')
  101.   copyCommand = "%s dfs -fs %s -copyFromLocal %s %s" % (hadoopCmd, 
  102.     hdfsURIMatch.group(1), archiveFile, destFile)
  103.   
  104.   log.debug(copyCommand)
  105.   
  106.   copyThread = simpleCommand('hadoop', copyCommand)
  107.   copyThread.start()
  108.   copyThread.wait()
  109.   copyThread.join()
  110.   log.debug(pprint.pformat(copyThread.output()))
  111.   
  112.   os.unlink(archiveFile)
  113. def unpack():
  114.   parser = OptionParser()
  115.   option_list=["--log-destination-uri", "--hadoop-log-dirs", 
  116.           "--temp-dir", "--hadoop-command-string", "--pkgs", "--user-id", 
  117.           "--service-id", "--hodring-debug", "--hodring-log-dir", 
  118.           "--hodring-syslog-address", "--hodring-cleanup-list", 
  119.           "--jt-pid", "--mr-sys-dir", "--fs-name", "--hadoop-path"]
  120.   regexp = re.compile("^--")
  121.   for opt in option_list:
  122.     parser.add_option(opt,dest=regexp.sub("",opt),action="store")
  123.   option_list.append("--hodring-stream")
  124.   parser.add_option("--hodring-stream",dest="hodring-stream",metavar="bool",
  125.                                                         action="store_true")
  126.   (options, args) = parser.parse_args()
  127.   _options= {}
  128.   _options['hodring'] = {}
  129.   for opt in dir(options):
  130.     if "--"+opt in option_list:
  131.       _options[opt] = getattr(options,opt)
  132.   if _options.has_key('hadoop-log-dirs') and _options['hadoop-log-dirs']:
  133.     _options['hadoop-log-dirs'] = _options['hadoop-log-dirs'].split(",")
  134.   if _options.has_key('hodring-syslog-address') and _options['hodring-syslog-address']:
  135.     _options['hodring']['syslog-address'] = 
  136.         _options['hodring-syslog-address'].split(':')
  137.   _options['hodring']['debug']        = int(_options['hodring-debug'])
  138.   _options['hodring']['log-dir']      = _options['hodring-log-dir']
  139.   _options['hodring']['stream']      = _options['hodring-stream']
  140.   _options['hodring']['userid']      = _options['user-id']
  141.   os.putenv('PBS_JOBID', _options['service-id'] )
  142.   return _options
  143.  
  144. if __name__ == '__main__':  
  145.   log = None
  146.   try:
  147.     conf = unpack()
  148.     # Use the same log as hodring
  149.     log = getLogger(conf['hodring'],'hodring')
  150.     log.debug("Logger initialised successfully")
  151.     mrSysDirManager = createMRSystemDirectoryManager(conf, log)
  152.     if mrSysDirManager is not None:
  153.       mrSysDirManager.removeMRSystemDirectory()
  154.     status =  __archive_logs(conf,log)
  155.     log.info("Archive status : %s" % status)
  156.     list = conf['hodring-cleanup-list'].split(',')
  157.     log.info("now removing %s" % list)
  158.     for dir in list:
  159.      if os.path.exists(dir):
  160.        log.debug('removing %s' % (dir))
  161.        shutil.rmtree(dir, True)
  162.        log.debug("done")
  163.     log.info("Cleanup successfully completed")
  164.   except Exception, e:
  165.     if log:
  166.       log.info("Stack trace:n%sn%s" %(get_exception_error_string(),get_exception_string()))