hodring
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
- #!/bin/sh
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
""":"
# sh/Python polyglot prologue.  To /bin/sh the line above is the no-op
# command ':'; to Python everything up to the closing triple quote is a
# throw-away string literal.  The shell part re-executes this same file
# under a Python interpreter and never returns.

# Quote every expansion so an install path containing whitespace works.
work_dir=$(dirname "$0")
base_name=$(basename "$0")

# Stop here if the script directory is unreachable instead of exec'ing
# $base_name relative to whatever directory we happen to be in.
cd "$work_dir" || exit 1

# Interpreter preference: $HOD_PYTHON_HOME, then the two well-known
# absolute locations, then whatever 'python' is on PATH.
if [ -n "$HOD_PYTHON_HOME" ]; then
    exec "$HOD_PYTHON_HOME" -OO "$base_name" ${1+"$@"}
elif [ -e /usr/bin/python ]; then
    exec /usr/bin/python -OO "$base_name" ${1+"$@"}
elif [ -e /usr/local/bin/python ]; then
    exec /usr/local/bin/python -OO "$base_name" ${1+"$@"}
else
    exec python -OO "$base_name" ${1+"$@"}
fi
":"""
- """The executable to be used by the user"""
import os
import re
import sys

# Name this script was invoked as; it is interpolated into the usage string
# handed to options() further down.
myName = os.path.basename(sys.argv[0])
myName = re.sub(".*/", "", myName)

# Resolve the real location of this script and cut everything from "/bin/"
# onwards to obtain the installation root.  The root is appended to sys.path
# so the hodlib imports below can be resolved; it must therefore happen
# before those imports.
binDirectory = os.path.realpath(sys.argv[0])
rootDirectory = re.sub("/bin/.*", "", binDirectory)
libDirectory = rootDirectory
sys.path.append(libDirectory)

from hodlib.HodRing.hodRing import HodRing
from hodlib.Common.setup import *
# Parenthesised so the import list can span several lines without relying on
# backslash continuations.
from hodlib.Common.util import (filter_warnings, get_exception_string,
                                get_exception_error_string,
                                getMapredSystemDirectory,
                                to_http_url, local_fqdn)
from hodlib.Common.logger import getLogger, ensureLogDir
from hodlib.Common.xmlrpc import hodXRClient

filter_warnings()
# Derive a dotted version number from the repository URL placeholder.
# VERSION starts as the literal '$HeadURL$' (presumably a version-control
# keyword that is expanded on checkout -- confirm).  When the text contains
# "<digits>_<digits>" that part becomes "<digits>.<digits>"; otherwise the
# build is labelled 'DEV'.
#
# The pattern is a raw string so that \d reaches the regex engine as the
# digit class rather than a literal letter "d".
reVersion = re.compile(r".*(\d+_\d+).*")

VERSION = '$HeadURL$'

reMatch = reVersion.match(VERSION)
if reMatch:
    VERSION = reMatch.group(1)
    VERSION = re.sub("_", ".", VERSION)
else:
    VERSION = 'DEV'
# Option definitions for the 'hodring' configuration section.
#
# Every entry below is a 7-tuple.  The historical description of the layout
# was
#   (name, type, description, default value, required?, validate?)
# which names only six fields.  The extra element sits in fourth position and
# is False for every option here -- presumably a "show in help" style flag;
# confirm against hodlib.Common.setup before relying on it.  Read each row as
#   (name, type, description, <4th flag>, default value, required?, validate?)
_hodringDefs = (
    ('temp-dir', 'directory',
     'hod work directories',
     False, None, True, False),

    ('log-dir', 'directory',
     'hod logging directory.',
     False, os.path.join(rootDirectory, 'logs'), False, True),

    ('log-destination-uri', 'string',
     'URI to store logs to, local://some_path or '
     'hdfs://host:port/some_path',
     False, None, False, True),

    ('pkgs', 'directory',
     'Path to Hadoop to use in case of uploading to HDFS',
     False, None, False, True),

    ('syslog-address', 'address',
     'Syslog address.',
     False, None, False, True),

    ('java-home', 'directory',
     'Java home directory.',
     False, None, True, True),

    ('debug', 'pos_int',
     'Debugging level, 0-4.',
     False, 3, True, True),

    ('register', 'bool',
     'Register with service registry?',
     False, True, True, True),

    ('stream', 'bool',
     'Output to stderr.',
     False, False, False, True),

    ('userid', 'user_account',
     'User ID the hod shell is running under.',
     False, None, True, False),

    ('xrs-port-range', 'range',
     'XML-RPC port range n-m.',
     False, None, True, True),

    ('http-port-range', 'range',
     'HTTP port range n-m.',
     False, None, True, True),

    ('command', 'string',
     'Command for hodring to run.',
     False, None, False, True),

    ('service-id', 'string',
     'Service ID.',
     False, None, False, True),

    ('download-addr', 'string',
     'Download HTTP address.',
     False, None, False, True),

    ('svcrgy-addr', 'address',
     'Service registry XMLRPC address.',
     False, None, True, True),

    ('ringmaster-xrs-addr', 'address',
     'Ringmaster XML-RPC address.',
     False, None, False, True),

    ('tarball-retry-initial-time', 'pos_float',
     'initial retry time for tarball download',
     False, 1, False, True),

    ('tarball-retry-interval', 'pos_float',
     'interval to spread retries for tarball download',
     False, 3, False, True),

    ('cmd-retry-initial-time', 'pos_float',
     'initial retry time for getting commands',
     False, 2, False, True),

    ('cmd-retry-interval', 'pos_float',
     'interval to spread retries for getting commands',
     False, 2, False, True),

    ('mapred-system-dir-root', 'string',
     'Root under which mapreduce system directory names are generated by HOD.',
     False, '/mapredsystem', False, False),
)

# Section name -> tuple of option definitions, the shape that is passed to
# confDef.add_defs() by the main program.
defList = {'hodring': _hodringDefs}
# Upper bound (exclusive) of the file descriptors closed before exec'ing the
# cleanup command.  More than enough file descriptors to close. Just in case.
MAXFD = 128


def hasOption(cfg, key):
    """Return True when *key* is present in the option mapping *cfg*.

    The mapping's own has_key() is used when it exists; mappings without
    has_key() are queried with the ``in`` operator instead.
    """
    if hasattr(cfg, 'has_key'):
        return cfg.has_key(key)
    return key in cfg


def selectLogger(service, hodRingOptions):
    """Return the service's logger, or a fresh 'hodring' logger.

    service may be None (HodRing was never constructed) or may carry a falsy
    ``log`` attribute; both cases fall back to getLogger() so the caller
    always receives a usable logger.
    """
    if service and service.log:
        return service.log
    return getLogger(hodRingOptions['hodring'], 'hodring')


def buildCleanupCommand(rootDir, hodringCfg, serviceCfg, hadoopString,
                        hadoopLogDirs, cleanupList, mrSysDirManager=None):
    """Assemble the shell command line that runs bin/hodcleanup.

    rootDir         -- installation root; the command is <rootDir>/bin/hodcleanup
    hodringCfg      -- the 'hodring' option section
    serviceCfg      -- the running service's configuration mapping
    hadoopString    -- value for --hadoop-command-string (may be empty)
    hadoopLogDirs   -- sequence of log directories, or a falsy value
    cleanupList     -- sequence of paths for --hodring-cleanup-list
    mrSysDirManager -- optional object whose toCleanupArgs() result is
                       appended verbatim after a single space

    Returns the command as one string.

    NOTE(review): values are concatenated without shell quoting and the
    result is later run through '/bin/sh -c', so whitespace or shell
    metacharacters in any value are interpreted by the shell.  Left as is
    because the expected format of toCleanupArgs() is not visible here.
    """
    parts = [os.path.join(rootDir, "bin", "hodcleanup")]  # same python

    if hasOption(hodringCfg, 'log-destination-uri'):
        parts.append(" --log-destination-uri "
                     + hodringCfg['log-destination-uri'])

    if hadoopLogDirs:
        parts.append(" --hadoop-log-dirs " + ",".join(hadoopLogDirs))

    parts.append(" --temp-dir " + serviceCfg['temp-dir'])
    parts.append(" --hadoop-command-string " + hadoopString)
    parts.append(" --user-id " + serviceCfg['userid'])
    parts.append(" --service-id " + serviceCfg['service-id'])
    parts.append(" --hodring-debug " + str(hodringCfg['debug']))
    parts.append(" --hodring-log-dir " + hodringCfg['log-dir'])
    parts.append(" --hodring-cleanup-list " + ",".join(cleanupList))

    if hasOption(hodringCfg, 'syslog-address'):
        # The address is indexed as a (host, port) pair.
        syslogAddr = hodringCfg['syslog-address'][0] + ':' \
            + str(hodringCfg['syslog-address'][1])
        parts.append(" --hodring-syslog-address " + syslogAddr)

    if hasOption(serviceCfg, 'pkgs'):
        parts.append(" --pkgs " + serviceCfg['pkgs'])

    if mrSysDirManager is not None:
        parts.append(" %s" % mrSysDirManager.toCleanupArgs())

    return "".join(parts)


def launchCleanup(cmdString, log):
    """Replace this process (or a forked child of it) with the cleanup command.

    On every platform except win32 the process forks first: the parent logs
    the child's pid and exits with status 0, while the child changes to "/",
    starts a new session and clears its umask.  Descriptors 0..MAXFD-1 are
    then closed and '/bin/sh -c cmdString' is exec'ed.  This function does
    not return: if the exec raises, the failure is logged and the process
    exits with status 1.
    """
    cmd = ['/bin/sh', '-c', cmdString]

    if sys.platform != "win32":
        try:
            pid = os.fork()
            if pid > 0:
                # exit first parent
                log.info("child(pid: %s) is now doing cleanup" % pid)
                sys.exit(0)
        except OSError as e:
            log.error("fork failed: %d (%s)" % (e.errno, e.strerror))
            sys.exit(1)

        # decouple from parent environment
        os.chdir("/")
        os.setsid()
        os.umask(0)

    for fd in range(0, MAXFD):
        try:
            os.close(fd)
        except OSError:
            # Descriptor was not open; nothing to close.
            pass

    try:
        os.execvp(cmd[0], cmd)
    finally:
        # Only reached when execvp raised.  NOTE(review): the logger's own
        # descriptors were probably closed by the loop above, so this
        # message may not be written anywhere.
        log.critical("exec failed")
        os._exit(1)


def reportErrorToRingmaster(hodRingOptions, log, error):
    """Best-effort report of *error* to the ringmaster over XML-RPC.

    The address comes from the 'ringmaster-xrs-addr' option.  Any failure
    while resolving the address or talking to the ringmaster is logged and
    swallowed, so this never masks the original error.
    """
    log.info("now trying informing to ringmaster")

    # Pre-bound so the failure message below can always be formatted, even
    # when resolving the address itself is what failed.
    ringXRAddress = None
    try:
        log.info(hodRingOptions['hodring']['ringmaster-xrs-addr'])
        normalizedAddr = hodRingOptions.normalizeValue(
            'hodring', 'ringmaster-xrs-addr')
        log.info(normalizedAddr)
        ringXRAddress = to_http_url(normalizedAddr)
        log.info(ringXRAddress)

        log.debug("Creating ringmaster XML-RPC client.")
        ringClient = hodXRClient(ringXRAddress)
        if ringClient is not None:
            addr = local_fqdn() + "_" + str(os.getpid())
            ringClient.setHodRingErrors(addr, str(error))
            log.info("Reported errors to ringmaster at %s" % ringXRAddress)
    except Exception:
        log.error("Failed to report errors to ringmaster at %s"
                  % ringXRAddress)
        log.error("Reason : %s" % get_exception_string())


if __name__ == '__main__':
    confDef = definition()
    confDef.add_defs(defList)
    hodRingOptions = options(confDef, "./%s [OPTIONS]" % myName, VERSION)
    ensureLogDir(hodRingOptions['hodring']['log-dir'])

    service = None
    try:
        (status, statusMsgs) = hodRingOptions.verify()
        if not status:
            raise Exception("%s" % statusMsgs)

        hodRingOptions['hodring']['base-dir'] = rootDirectory
        service = HodRing(hodRingOptions)
        service.start()
        service.wait()

        log = selectLogger(service, hodRingOptions)

        # Collect everything the cleanup command has to remove: whatever
        # each running command registers, plus the service temp directory.
        cleanupList = []
        runningHadoops = service.getRunningValues()
        mrSysDirManager = None
        for hadoopCmd in runningHadoops:
            if hadoopCmd.name == 'jobtracker':
                mrSysDirManager = hadoopCmd.getMRSystemDirectoryManager()
            log.debug("addding %s to cleanup list..." % hadoopCmd)
            hadoopCmd.addCleanup(cleanupList)

        cleanupList.append(service.getTempDir())
        log.debug(cleanupList)

        # archive_logs now
        if len(runningHadoops) == 0:
            log.info("len(runningHadoops) == 0, No running cluster?")
            log.info("Skipping __copy_archive_to_dfs")
            hadoopString = ""
        else:
            hadoopString = runningHadoops[0].path

        # construct the arguments
        cmdString = buildCleanupCommand(rootDirectory,
                                        hodRingOptions['hodring'],
                                        service._cfg,
                                        hadoopString,
                                        service.getHadoopLogDirs(),
                                        cleanupList,
                                        mrSysDirManager)

        log.info("cleanup commandstring : ")
        log.info(cmdString)

        # clean up; does not return
        launchCleanup(cmdString, log)

    except Exception as e:
        log = selectLogger(service, hodRingOptions)
        log.error("Error in bin/hodring %s. \nStack trace:\n%s"
                  % (get_exception_error_string(), get_exception_string()))

        # Report errors to the Ringmaster if possible
        reportErrorToRingmaster(hodRingOptions, log, e)
        # End of reporting errors to the client