hodcleanup
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
- #!/bin/sh
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
""":"
# Shell half of the sh/Python polyglot: /bin/sh executes these lines, while
# Python reads everything between the triple quotes as one string literal.
#
# Change into the directory holding this script so it can be re-executed by
# its bare file name; give up if that directory cannot be entered.
work_dir=$(dirname "$0")
base_name=$(basename "$0")
cd "$work_dir" || exit 1

# Interpreter choice: $HOD_PYTHON_HOME when set, then two fixed locations,
# and finally whichever python is found on PATH. Expansions are quoted so
# paths containing spaces survive word splitting.
if [ -n "$HOD_PYTHON_HOME" ]; then
    exec "$HOD_PYTHON_HOME" -u -OO "$base_name" ${1+"$@"}
elif [ -e /usr/bin/python ]; then
    exec /usr/bin/python -u -OO "$base_name" ${1+"$@"}
elif [ -e /usr/local/bin/python ]; then
    exec /usr/local/bin/python -u -OO "$base_name" ${1+"$@"}
else
    exec python -u -OO "$base_name" ${1+"$@"}
fi
":"""
"""The executable to be used by the user"""
import sys
import os
import re
import pwd
import threading
import random
import time
import pprint
import shutil
from pprint import pformat
from optparse import OptionParser

# Name this script was invoked under, with any directory part removed.
myName = os.path.basename(sys.argv[0])
myName = re.sub(".*/", "", myName)

# Resolve the real path of the script and cut it at the first "/bin/"
# component; the remaining prefix is appended to sys.path so that the
# hodlib imports below can be resolved.
binDirectory = os.path.realpath(sys.argv[0])
rootDirectory = re.sub("/bin/.*", "", binDirectory)
libDirectory = rootDirectory
sys.path.append(libDirectory)

from hodlib.Common.threads import simpleCommand
from hodlib.Common.util import local_fqdn, tar, filter_warnings, \
    get_exception_string, get_exception_error_string
from hodlib.Common.logger import hodLog
from hodlib.Common.logger import getLogger
from hodlib.HodRing.hodRing import createMRSystemDirectoryManager

filter_warnings()
# Captures a "<digits>_<digits>" component found anywhere in a string.
reVersion = re.compile(r".*(\d+_\d+).*")
# Splits "hdfs://<host>:<port><rest>" into the filesystem prefix (group 1)
# and whatever follows the port number (group 2).
reHdfsURI = re.compile(r"(hdfs://.*?:\d+)(.*)")

# First line of ./VERSION (trailing newline included) when that file exists
# in the current directory, otherwise None.
VERSION = None
if os.path.exists("./VERSION"):
    vFile = open("./VERSION", 'r')
    # try/finally rather than `with` keeps this valid on old Python 2
    # interpreters while still closing the handle if the read fails.
    try:
        VERSION = vFile.readline()
    finally:
        vFile.close()
def __archive_logs(conf, log):
    """Tar each Hadoop log directory and place it at the log destination.

    Nothing is done unless both conf['log-destination-uri'] and
    conf['hadoop-log-dirs'] are set. For every log directory a gzipped
    tarball is created whose name is built from the parent directory's name,
    the local FQDN, the current local time and a random number:

    * a 'file://' destination: the tarball is written below that path;
    * any other destination: the tarball is written to conf['temp-dir'];
    * an 'hdfs://' destination: the tarball is additionally handed to
      __copy_archive_to_dfs once it has been created successfully.

    Args:
        conf: configuration mapping; reads 'log-destination-uri',
            'hadoop-log-dirs' and (for non-file destinations) 'temp-dir'.
        log: logger used for progress and error messages.

    Returns:
        True when there was nothing to archive or every archive was created;
        False when any tar() call reported failure or an exception was
        raised (the exception is logged, not propagated).
    """
    # need log-destination-uri, hadoop-log-dirs, temp-dir
    logUri = conf['log-destination-uri']
    hadoopLogDirs = conf['hadoop-log-dirs']
    if not logUri or not hadoopLogDirs:
        return True

    status = True
    try:
        # One timestamp is shared by all archives produced in this run.
        date = time.localtime()
        for logDir in hadoopLogDirs:
            # The archive is named after the parent of the log directory.
            (head, tail) = os.path.split(logDir)
            (head, logType) = os.path.split(head)
            tarBallFile = "%s-%s-%04d%02d%02d%02d%02d%02d-%s.tar.gz" % (
                logType, local_fqdn(), date[0], date[1], date[2], date[3],
                date[4], date[5], random.randint(0, 1000))

            if logUri.startswith('file://'):
                # Strip the 7-character 'file://' scheme to get the path.
                tarBallFile = os.path.join(logUri[7:], tarBallFile)
            else:
                tarBallFile = os.path.join(conf['temp-dir'], tarBallFile)

            log.debug('archiving log files to: %s' % tarBallFile)
            archived = tar(tarBallFile, logDir, ['*',])
            log.info('archive %s status: %s' % (tarBallFile, archived))
            if archived and logUri.startswith('hdfs://'):
                __copy_archive_to_dfs(conf, tarBallFile)
                log.info("copying archive to dfs finished")
            if not archived:
                # Remember the failure; a later successful archive must not
                # turn the overall result back into success.
                status = False
    except Exception:
        # Best effort: archiving problems are logged and reported through
        # the return value instead of aborting the rest of the cleanup.
        log.error(get_exception_string())
        status = False
    return status
def __copy_archive_to_dfs(conf, archiveFile):
    """Copy a local archive into DFS with `hadoop dfs -copyFromLocal`.

    The destination is
    <path part of log-destination-uri>/<user-id>/hod-logs/<service-id>/<name>
    on the filesystem named by the hdfs://host:port prefix of
    conf['log-destination-uri']. The hadoop executable is
    <conf['pkgs']>/bin/hadoop when conf['pkgs'] is set, otherwise
    conf['hadoop-command-string'].

    NOTE: this function logs through the module-level `log`, which is bound
    by the __main__ block; it is not passed in as an argument.

    Args:
        conf: configuration mapping; reads 'log-destination-uri', 'user-id',
            'service-id', 'hadoop-command-string' and 'pkgs'.
        archiveFile: path of the local archive to upload.

    Raises:
        ValueError: conf['log-destination-uri'] does not match reHdfsURI.
    """
    # need log-destination-uri, hadoopCommandstring and/or pkgs
    logUri = conf['log-destination-uri']
    hdfsURIMatch = reHdfsURI.match(logUri)
    if hdfsURIMatch is None:
        # Fail with a readable message instead of an AttributeError on None.
        raise ValueError(
            "log-destination-uri %r does not match the expected HDFS URI "
            "pattern" % (logUri,))

    (head, tail) = os.path.split(archiveFile)
    destFile = os.path.join(hdfsURIMatch.group(2), conf['user-id'],
                            'hod-logs', conf['service-id'], tail)

    log.info("copying archive %s to DFS %s ..." % (archiveFile, destFile))

    hadoopCmd = conf['hadoop-command-string']
    if conf['pkgs']:
        hadoopCmd = os.path.join(conf['pkgs'], 'bin', 'hadoop')

    copyCommand = "%s dfs -fs %s -copyFromLocal %s %s" % (
        hadoopCmd, hdfsURIMatch.group(1), archiveFile, destFile)

    log.debug(copyCommand)

    copyThread = simpleCommand('hadoop', copyCommand)
    copyThread.start()
    copyThread.wait()
    copyThread.join()
    log.debug(pprint.pformat(copyThread.output()))

    # NOTE(review): the local archive is removed without looking at the
    # outcome of the copy command - confirm that losing the archive after a
    # failed upload is acceptable.
    os.unlink(archiveFile)
def unpack():
    """Parse the command line (sys.argv) into the cleanup configuration.

    Every supported option becomes a key in the returned dictionary, named
    like the option without its leading '--' (for example 'temp-dir'); the
    value is None when the option was not given. In addition:

    * 'hadoop-log-dirs' is split on ',' into a list when given;
    * a nested 'hodring' dictionary is filled with 'debug' (int),
      'log-dir', 'stream', 'userid' and, when --hodring-syslog-address was
      given, 'syslog-address' (the address split on ':');
    * the PBS_JOBID environment variable is set to the service id.

    Returns:
        dict: the configuration described above.

    Raises:
        TypeError: --hodring-debug or --service-id was not supplied (int()
            and the environment assignment both reject None).
        ValueError: --hodring-debug is not an integer.
    """
    parser = OptionParser()
    option_list = ["--log-destination-uri", "--hadoop-log-dirs",
                   "--temp-dir", "--hadoop-command-string", "--pkgs",
                   "--user-id", "--service-id", "--hodring-debug",
                   "--hodring-log-dir", "--hodring-syslog-address",
                   "--hodring-cleanup-list", "--jt-pid", "--mr-sys-dir",
                   "--fs-name", "--hadoop-path"]
    for opt in option_list:
        # dest keeps the hyphens: "--temp-dir" is stored as "temp-dir".
        parser.add_option(opt, dest=opt[2:], action="store")
    option_list.append("--hodring-stream")
    parser.add_option("--hodring-stream", dest="hodring-stream",
                      metavar="bool", action="store_true")
    (options, args) = parser.parse_args()

    _options = {}
    _options['hodring'] = {}
    # The dests contain '-', so they are only reachable through getattr().
    for opt in option_list:
        dest = opt[2:]
        _options[dest] = getattr(options, dest)

    if _options['hadoop-log-dirs']:
        _options['hadoop-log-dirs'] = _options['hadoop-log-dirs'].split(",")
    if _options['hodring-syslog-address']:
        _options['hodring']['syslog-address'] = \
            _options['hodring-syslog-address'].split(':')
    _options['hodring']['debug'] = int(_options['hodring-debug'])
    _options['hodring']['log-dir'] = _options['hodring-log-dir']
    _options['hodring']['stream'] = _options['hodring-stream']
    _options['hodring']['userid'] = _options['user-id']

    # Assign through os.environ instead of calling os.putenv(): the
    # assignment still performs the putenv, and also keeps os.environ (and
    # anything that reads or forwards it) in sync.
    os.environ['PBS_JOBID'] = _options['service-id']
    return _options
-
if __name__ == '__main__':
    # Cleanup sequence: parse options, remove the MapReduce system directory
    # (when a manager is returned for it), archive the Hadoop logs, then
    # delete every directory named in --hodring-cleanup-list.
    #
    # `log` has to remain a module-level name: __copy_archive_to_dfs logs
    # through it without receiving it as an argument.
    log = None
    try:
        conf = unpack()
        # Use the same log as hodring
        log = getLogger(conf['hodring'], 'hodring')
        log.debug("Logger initialised successfully")
        mrSysDirManager = createMRSystemDirectoryManager(conf, log)
        if mrSysDirManager is not None:
            mrSysDirManager.removeMRSystemDirectory()

        status = __archive_logs(conf, log)
        log.info("Archive status : %s" % status)

        # The option is None when it was not passed; treat that as an empty
        # list instead of failing on None.split().
        cleanupList = conf['hodring-cleanup-list']
        if cleanupList:
            cleanupDirs = cleanupList.split(',')
        else:
            cleanupDirs = []
        log.info("now removing %s" % cleanupDirs)
        for cleanupDir in cleanupDirs:
            if os.path.exists(cleanupDir):
                log.debug('removing %s' % (cleanupDir))
                # Second argument is ignore_errors: removal is best effort.
                shutil.rmtree(cleanupDir, True)
                log.debug("done")
        log.info("Cleanup successfully completed")
    except Exception:
        message = "Stack trace:\n%s\n%s" % (get_exception_error_string(),
                                            get_exception_string())
        if log:
            log.info(message)
        else:
            # The failure happened before a logger existed (for example
            # while parsing options); report it instead of dropping it.
            sys.stderr.write(message + "\n")