hod
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:23k
- #!/bin/sh
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """:"
# Shell bootstrap: /bin/sh runs these lines, picks a Python interpreter and
# re-executes this same file with it. Python sees the whole section as a
# string literal and skips it.
work_dir=$(dirname "$0")
base_name=$(basename "$0")
# Remember where the user started; it is handed to HOD via --hod.original-dir
# because we change into the script directory before starting Python.
original_dir=$PWD
cd "$work_dir" || exit 1

# ${1+"$@"} forwards the caller's arguments unchanged (and expands to nothing
# when there are none, which some old shells get wrong with a plain "$@").
if [ -n "$HOD_PYTHON_HOME" ]; then
    # HOD_PYTHON_HOME is deliberately left unquoted, exactly as before.
    exec $HOD_PYTHON_HOME -u -OO "$base_name" ${1+"$@"} --hod.original-dir "$original_dir"
elif [ -e /usr/bin/python ]; then
    exec /usr/bin/python -u -OO "$base_name" ${1+"$@"} --hod.original-dir "$original_dir"
elif [ -e /usr/local/bin/python ]; then
    exec /usr/local/bin/python -u -OO "$base_name" ${1+"$@"} --hod.original-dir "$original_dir"
else
    # Fall back to whatever python is on PATH. This branch used to pass
    # $work_dir, unlike every other branch; it now passes the original
    # directory as well.
    exec python -u -OO "$base_name" ${1+"$@"} --hod.original-dir "$original_dir"
fi
- ":"""
- """The executable to be used by the user"""
import sys
import os
import re
import pwd
import threading

# Name of this script as invoked (directory part removed).
myName = os.path.basename(sys.argv[0])
myName = re.sub(".*/", "", myName)

# Resolve the real location of the script and cut everything from "/bin/"
# onwards to obtain the installation root.
binDirectory = os.path.realpath(sys.argv[0])
rootDirectory = re.sub("/bin/.*", "", binDirectory)

libDirectory = rootDirectory

# The installation root must be on sys.path before the hodlib imports below.
sys.path.append(libDirectory)

from hodlib.Hod.hod import hodRunner
from hodlib.Common.setup import *
from hodlib.Common.descGenerator import *
from hodlib.Common.util import (local_fqdn, need_to_allocate, filter_warnings,
                                get_exception_error_string, hodInterrupt,
                                HOD_INTERRUPTED_MESG, HOD_INTERRUPTED_CODE,
                                TORQUE_USER_LIMITS_COMMENT_FIELD)
from hodlib.Common.tcp import tcpError, tcpSocket
from hodlib.Hod.hod import hodHelp

filter_warnings()

# Matches a "<digits>_<digits>" version fragment anywhere in a string.
reVersion = re.compile(r".*(\d+_\d+).*")

# VERSION holds the first line of ./VERSION (relative to the current
# directory) when that file exists, otherwise None.
VERSION = None
if os.path.exists("./VERSION"):
    vFile = open("./VERSION", 'r')
    try:
        VERSION = vFile.readline()
    finally:
        vFile.close()

# Always look for hodrc file here unless otherwise specified with -c:
DEFAULT_LOC = os.path.join(rootDirectory, 'conf')

# Per-user HOD directory, created on first use.
DEFAULT_HOD_DIR = os.path.join(os.environ['HOME'], ".hod")
if not os.path.isdir(DEFAULT_HOD_DIR):
    # int("777", 8) is octal 0777, spelled so it parses on any Python version.
    os.mkdir(DEFAULT_HOD_DIR, int("777", 8))

# Prefer ~/.hod/hodrc; if it does not exist and HOD_CONF_DIR is set, use the
# hodrc inside that directory instead.
DEFAULT_CONFIG = os.path.join(DEFAULT_HOD_DIR, 'hodrc')
if not os.path.exists(DEFAULT_CONFIG):
    if 'HOD_CONF_DIR' in os.environ:
        DEFAULT_CONFIG = os.path.join(os.environ['HOD_CONF_DIR'], 'hodrc')
# Option definitions, grouped by configuration section.
#
# Definition tuple is of the form:
#  (name, type, description, help?, default value, required?, validate?,
#   short option)
#
# The trailing short option is optional; several entries below omit it.
defList = {
    'hod': (
        ('original-dir', 'directory', 'hod original start directory',
         False, None, True, True, 'r'),

        ('clusterdir', 'directory',
         'Directory where cluster state information and hadoop-site.xml' +
         ' will be stored.',
         True, None, False, False, 'd'),

        ('syslog-address', 'address', 'Syslog address.',
         False, None, False, True, 'y'),

        ('java-home', 'directory', 'Java home directory.',
         True, None, True, True, 'j'),

        ('debug', 'pos_int', 'Debugging level, 0-4.',
         True, 3, True, True, 'b'),

        ('stream', 'bool', 'Output to stderr.',
         False, True, False, True),

        ('nodecount', 'pos_int',
         'Number of nodes to allocate at startup. ',
         True, None, False, True, 'n'),

        ('script', 'file', 'Hadoop script to execute.',
         True, None, False, False, 's'),

        ('userid', 'user_account',
         'User ID the hod shell is running under.',
         False, pwd.getpwuid(os.getuid())[0], False, True, 'u'),

        ('allocate-wait-time', 'pos_int',
         'Time to wait for cluster allocation.',
         False, 300, True, True, 'e'),

        ('operation', 'string',
         'Initiate a hod operation. (help, allocate, deallocate ...)',
         False, None, False, True, 'o'),

        ('cluster-factor', 'pos_float',
         'The number of grid slots per machines', False, 1.9, False, True,
         'x'),

        ('cluster', 'string', 'Name of cluster being used.',
         False, None, True, True, 'w'),

        ('proxy-xrs-address', 'address',
         'Address to Allocation Manager XML RPC proxy.',
         False, None, False, True, 'p'),

        ('xrs-port-range', 'range', 'XML-RPC port range n-m.',
         False, None, True, True),

        ('client-params', 'keyval', 'Hadoop client xml key/value list',
         True, None, False, True, 'C'),

        ('hadoop-ui-log-dir', 'directory', 'Directory to store Web UI Logs of Hadoop',
         True, None, False, True),

        ('temp-dir', 'directory', 'HOD temporary directories.',
         False, None, True, False),

        ('update-worker-info', 'bool', 'Specifies whether to update Worker Info after allocation',
         False, False, False, True),

        ('job-feasibility-attr', 'string', 'Specifies whether to check job feasibility - resource manager and/or scheduler limits, also gives the attribute value',
         False, None, False, True),

        ('title', 'string', 'Title for the current HOD allocation.',
         True, "HOD", False, True, 'N'),

        ('walltime', 'pos_int', 'Walltime in seconds for the current HOD allocation',
         True, None, False, True, 'l'),

        ('script-wait-time', 'pos_int', 'Specifies the time to wait before running the script. Used with the hod.script option.',
         True, 10, False, True, 'W'),

        ('log-rollover-count', 'pos_int', 'Specifies the number of rolled-over log files of HOD client. A zero value disables rollover.',
         True, 5, False, True, 'L'),

        ('job-status-query-interval', 'pos_int', 'Specifies the time between checking for job status',
         False, 30, False, True),

        ('job-command-failure-interval', 'pos_int', 'Specifies the time between checking for failed job status or submission commands',
         False, 10, False, True),

        ('job-status-query-failure-retries', 'pos_int', 'Specifies the number of times job status failure queries are retried',
         False, 3, False, True),

        ('job-submission-failure-retries', 'pos_int', 'Specifies the number of times job submission failure queries are retried',
         False, 3, False, True),
    ),

    'resource_manager': (
        ('id', 'string', 'Batch scheduler ID: torque|condor.',
         False, None, True, True),

        ('pbs-user', 'user_account', 'User ID jobs are submitted under.',
         False, None, False, True),

        ('pbs-account', 'string', 'User Account jobs are submitted under.',
         True, None, False, False, 'A'),

        ('queue', 'string', 'Queue of the batch scheduler to query.',
         True, 'batch', False, True, 'Q'),

        ('batch-home', 'directory', 'Scheduler installation directory.',
         False, None, True, True),

        ('options', 'keyval', 'Options to pass to the scheduler.',
         False, None, False, True),

        ('env-vars', 'keyval', 'Environment variables to pass to the submitted jobs.',
         False, None, False, True),
    ),

    'ringmaster': (
        ('work-dirs', 'list', 'hod work directories',
         False, None, True, False),

        ('temp-dir', 'directory', 'Ringmaster temporary directory.',
         False, None, True, False),

        ('log-dir', 'directory', 'hod logging directory.',
         False, os.path.join(rootDirectory, 'logs'), False, False),

        ('syslog-address', 'address', 'Syslog address.',
         False, None, False, True),

        ('xrs-port-range', 'range', 'XML-RPC port range n-m.',
         False, None, True, True),

        ('http-port-range', 'range', 'HTTP port range n-m.',
         False, None, True, True),

        ('debug', 'pos_int', 'Debugging level, 0-4.',
         False, 4, True, True),

        ('register', 'bool', 'Register with service registry?',
         False, True, True, True),

        ('stream', 'bool', 'Output to stderr.',
         False, False, False, True),

        ('userid', 'user_account',
         'User ID the hod shell is running under.',
         False, pwd.getpwuid(os.getuid())[0], False, True),

        ('svcrgy-addr', 'address', 'Download HTTP address.',
         False, None, False, False),

        ('hadoop-tar-ball', 'uri', 'hadoop program tar ball.',
         True, None, False, False, 't'),

        ('max-connect', 'pos_int', 'max connections allowed for a single tarball server',
         False, 30, False, True),

        ('jt-poll-interval', 'pos_int', 'How often to poll the Job tracker for idleness',
         False, 120, False, True),

        ('idleness-limit', 'pos_int', 'Limit after which to deallocate the cluster',
         False, 3600, False, True),

        ('max-master-failures', 'pos_int',
         'Defines how many times a master can fail before'
         ' failing cluster allocation', False, 5, True, True),

        ('workers_per_ring', 'pos_int', 'Defines number of workers per service per hodring',
         False, 1, False, True),
    ),

    'gridservice-mapred': (
        ('external', 'bool', "Connect to an already running MapRed?",
         False, False, True, True),

        ('host', 'hostname', 'Mapred hostname.',
         False, 'localhost', False, False),

        ('info_port', 'pos_int', 'Mapred info port.',
         False, None, False, False),

        ('tracker_port', 'pos_int', 'Mapred job tracker port.',
         False, None, False, False),

        ('cmdline-params', 'keyval', 'Hadoop cmdline key/value list.',
         False, None, False, False),

        ('server-params', 'keyval', 'Hadoop xml key/value list',
         True, None, False, True, 'M'),

        ('envs', 'keyval', 'environment to run this package in',
         False, None, False, True),

        ('final-server-params', 'keyval', 'Hadoop final xml key/val list',
         False, None, False, True, 'F'),

        ('pkgs', 'directory', "directory where the package is installed",
         False, None, False, False),
    ),

    'gridservice-hdfs': (
        ('external', 'bool', "Connect to an already running HDFS?",
         False, False, True, True),

        ('host', 'hostname', 'HDFS hostname.',
         False, 'localhost', False, False),

        ('fs_port', 'pos_int', 'HDFS port.',
         False, None, False, False),

        ('info_port', 'pos_int', 'HDFS info port.',
         False, None, False, False),

        ('cmdline-params', 'keyval', 'Hadoop cmdline key/value list.',
         False, None, False, False),

        ('server-params', 'keyval', 'Hadoop xml key/value list',
         False, None, False, True, 'H'),

        ('final-server-params', 'keyval', 'Hadoop final xml key/value list',
         False, None, False, True, 'S'),

        ('envs', 'keyval', 'Environment in which to run this package.',
         False, None, False, True),

        ('pkgs', 'directory', "directory where the package is installed",
         False, None, False, False),
    ),

    'hodring': (
        ('temp-dir', 'list', 'hodring temporary directory.',
         False, None, True, False),

        ('log-dir', 'directory', 'hod logging directory.',
         False, os.path.join(rootDirectory, 'logs'), False, False),

        # The start-up checks in this script accept only file:// and hdfs://
        # here, so the help text names those two schemes.
        ('log-destination-uri', 'string',
         'URI to store logs to, file://some_path or '
         + 'hdfs://host:port/some_path',
         False, None, False, True),

        ('pkgs', 'directory', 'Path to Hadoop to use in case of uploading to HDFS',
         False, None, False, False),

        ('syslog-address', 'address', 'Syslog address.',
         False, None, False, True),

        ('java-home', 'directory', 'Java home directory.',
         False, None, True, False),

        ('debug', 'pos_int', 'Debugging level, 0-4.',
         False, 3, True, True),

        ('register', 'bool', 'Register with service registry?',
         False, True, True, True),

        ('stream', 'bool', 'Output to stderr.',
         False, False, False, True),

        ('userid', 'user_account',
         'User ID the hod shell is running under.',
         False, pwd.getpwuid(os.getuid())[0], False, True),

        ('command', 'string', 'Command for hodring to run.',
         False, None, False, True),

        ('xrs-port-range', 'range', 'XML-RPC port range n-m.',
         False, None, True, True),

        ('http-port-range', 'range', 'HTTP port range n-m.',
         False, None, True, True),

        ('service-id', 'string', 'Service ID.',
         False, None, False, True),

        ('download-addr', 'string', 'Download HTTP address.',
         False, None, False, True),

        ('svcrgy-addr', 'address', 'Download HTTP address.',
         False, None, False, True),

        ('ringmaster-xrs-addr', 'address', 'Ringmaster XML-RPC address.',
         False, None, False, True),

        ('tarball-retry-initial-time', 'pos_float', 'Initial Retry time for tarball download',
         False, 1, False, True),

        ('tarball-retry-interval', 'pos_float', 'interval to spread retries for tarball download',
         False, 3, False, True),

        ('cmd-retry-initial-time', 'pos_float', 'Initial retry time for getting commands',
         False, 2, False, True),

        ('cmd-retry-interval', 'pos_float', 'interval to spread retries for getting commands',
         False, 2, False, True),

        ('mapred-system-dir-root', 'string', 'Root under which mapreduce system directory names are generated by HOD.',
         False, '/mapredsystem', False, False),
    ),
}

# Section order handed to add_defs() together with defList.
defOrder = ['hod', 'ringmaster', 'hodring', 'resource_manager',
            'gridservice-mapred', 'gridservice-hdfs']
def printErrors(msgs):
    """Print every message in msgs on its own line on standard output.

    msgs is any iterable of messages; an empty iterable prints nothing.
    """
    for msg in msgs:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(msg)
def op_requires_pkgs(config):
    """Tell whether the requested HOD action needs Hadoop packages.

    config is the dict-like configuration; only config['hod'] is read.

    Returns True when an 'operation' is configured and it starts with
    'allocate'. When no 'operation' is configured, returns True exactly when
    a 'script' is configured. Returns False otherwise.
    """
    hodSection = config['hod']
    # An explicit operation takes precedence over a script entry.
    if 'operation' in hodSection:
        return hodSection['operation'].startswith('allocate')
    return 'script' in hodSection
if __name__ == '__main__':
    # Phase 1: parse options, load and check the configuration, build the
    # runner and install signal handling. A Ctrl-C anywhere in this phase
    # ends the program with HOD_INTERRUPTED_CODE.
    try:
        confDef = definition()
        confDef.add_defs(defList, defOrder)
        hodhelp = hodHelp()
        usage = hodhelp.help()

        hodOptions = options(confDef, usage,
                             VERSION, withConfig=True,
                             defaultConfig=DEFAULT_CONFIG,
                             name=myName)

        # hodConfig is a dict like object, hodConfig[section][name]
        try:
            hodConfig = config(hodOptions['config'], configDef=confDef,
                               originalDir=hodOptions['hod']['original-dir'],
                               options=hodOptions)
        except IOError:
            sys.stderr.write("error: %s not found. Specify the path to the HOD configuration file, or define the environment variable %s under which a file named hodrc can be found.\n" % (hodOptions['config'], 'HOD_CONF_DIR'))
            sys.exit(1)

        # Conditional validation: host/port settings are validated only for
        # services declared external; otherwise the ports get placeholders.
        statusMsgs = []

        if hodConfig.normalizeValue('gridservice-hdfs', 'external'):
            # For external HDFS
            statusMsgs.extend(hodConfig.validateValue('gridservice-hdfs',
                                                      'fs_port'))
            statusMsgs.extend(hodConfig.validateValue('gridservice-hdfs',
                                                      'info_port'))
            statusMsgs.extend(hodConfig.validateValue('gridservice-hdfs',
                                                      'host'))
        else:
            hodConfig['gridservice-hdfs']['fs_port'] = 0  # Dummy
            hodConfig['gridservice-hdfs']['info_port'] = 0  # Not used at all

        if hodConfig.normalizeValue('gridservice-mapred', 'external'):
            statusMsgs.extend(hodConfig.validateValue('gridservice-mapred',
                                                      'tracker_port'))
            statusMsgs.extend(hodConfig.validateValue('gridservice-mapred',
                                                      'info_port'))
            statusMsgs.extend(hodConfig.validateValue('gridservice-mapred',
                                                      'host'))
        else:
            hodConfig['gridservice-mapred']['tracker_port'] = 0  # Dummy
            hodConfig['gridservice-mapred']['info_port'] = 0  # Not used at all

        if statusMsgs:
            for msg in statusMsgs:
                sys.stderr.write("%s\n" % (msg,))
            sys.exit(1)
        # End of conditional validation

        (status, statusMsgs) = hodConfig.verify()
        if not status:
            sys.stderr.write("error: bin/hod failed to start.\n")
            for msg in statusMsgs:
                sys.stderr.write("%s\n" % (msg,))
            sys.exit(1)

        ## TODO : should move the dependency verification to hodConfig.verify
        if ('operation' in hodConfig['hod'] and
                'script' in hodConfig['hod']):
            print("Script operation is mutually exclusive with other HOD operations")
            hodOptions.print_help(sys.stderr)
            sys.exit(1)

        if ('operation' not in hodConfig['hod'] and
                'script' not in hodConfig['hod']):
            print("HOD requires at least a script or operation be specified.")
            hodOptions.print_help(sys.stderr)
            sys.exit(1)

        # An external HDFS must accept a TCP connection before we go on.
        if hodConfig['gridservice-hdfs']['external']:
            hdfsAddress = "%s:%s" % (hodConfig['gridservice-hdfs']['host'],
                                     hodConfig['gridservice-hdfs']['fs_port'])

            hdfsSocket = tcpSocket(hdfsAddress)

            try:
                hdfsSocket.open()
                hdfsSocket.close()
            except tcpError:
                printErrors(hodConfig.var_error('hod', 'gridservice-hdfs',
                    "Failed to open a connection to external hdfs address: %s." %
                    hdfsAddress))
                sys.exit(1)
        else:
            hodConfig['gridservice-hdfs']['host'] = 'localhost'

        # Same connectivity check for an external MapReduce job tracker.
        if hodConfig['gridservice-mapred']['external']:
            mapredAddress = "%s:%s" % (hodConfig['gridservice-mapred']['host'],
                                       hodConfig['gridservice-mapred']['tracker_port'])

            mapredSocket = tcpSocket(mapredAddress)

            try:
                mapredSocket.open()
                mapredSocket.close()
            except tcpError:
                printErrors(hodConfig.var_error('hod', 'gridservice-mapred',
                    "Failed to open a connection to external mapred address: %s." %
                    mapredAddress))
                sys.exit(1)
        else:
            hodConfig['gridservice-mapred']['host'] = 'localhost'

        # Without a tarball, each grid service needs its own pkgs directory
        # whenever the requested action needs Hadoop packages.
        if ('hadoop-tar-ball' not in hodConfig['ringmaster'] and
                'pkgs' not in hodConfig['gridservice-hdfs'] and
                op_requires_pkgs(hodConfig)):
            printErrors(hodConfig.var_error('gridservice-hdfs', 'pkgs',
                "gridservice-hdfs.pkgs must be defined if ringmaster.hadoop-tar-ball "
                + "is not defined."))
            sys.exit(1)

        if ('hadoop-tar-ball' not in hodConfig['ringmaster'] and
                'pkgs' not in hodConfig['gridservice-mapred'] and
                op_requires_pkgs(hodConfig)):
            printErrors(hodConfig.var_error('gridservice-mapred', 'pkgs',
                "gridservice-mapred.pkgs must be defined if ringmaster.hadoop-tar-ball "
                + "is not defined."))
            sys.exit(1)

        if 'log-destination-uri' in hodConfig['hodring']:
            logUri = hodConfig['hodring']['log-destination-uri']
            if logUri.startswith('file://'):
                pass
            elif logUri.startswith('hdfs://'):
                # Text between "hdfs://" and the next "/" is the host:port.
                hostPort = logUri[7:].split("/")[0]
                logSocket = tcpSocket(hostPort)
                try:
                    logSocket.open()
                    logSocket.close()
                except Exception:
                    # Any connection failure is reported the same way.
                    # KeyboardInterrupt is not caught here, so Ctrl-C still
                    # reaches the interrupt handler at the end of this phase.
                    printErrors(hodConfig.var_error('hodring', 'log-destination-uri',
                        "Unable to contact host/port specified in log destination uri: %s" %
                        logUri))
                    sys.exit(1)
            else:
                # The message names the schemes the code above really accepts.
                printErrors(hodConfig.var_error('hodring', 'log-destination-uri',
                    "The log destination uri must be of type file:// or hdfs://."))
                sys.exit(1)

        if hodConfig['ringmaster']['workers_per_ring'] < 1:
            printErrors(hodConfig.var_error('ringmaster', 'workers_per_ring',
                "ringmaster.workers_per_ring must be a positive integer " +
                "greater than or equal to 1"))
            sys.exit(1)

        ## TODO : end of should move the dependency verification to hodConfig.verif

        hodConfig['hod']['base-dir'] = rootDirectory
        hodConfig['hod']['user_state'] = DEFAULT_HOD_DIR

        dGen = DescGenerator(hodConfig)
        hodConfig = dGen.initializeDesc()

        os.environ['JAVA_HOME'] = hodConfig['hod']['java-home']

        if hodConfig['hod']['debug'] == 4:
            print("")
            print("Using Python: %s" % sys.version)
            print("")

        hod = hodRunner(hodConfig)

        # Initiate signal handling
        hodInterrupt.set_log(hod.get_logger())
        hodInterrupt.init_signals()
        # Interrupts set up. Now on we handle signals only when we wish to.
    except KeyboardInterrupt:
        print(HOD_INTERRUPTED_MESG)
        sys.exit(HOD_INTERRUPTED_CODE)

    # Phase 2: run the requested script or operation and exit with its code.
    opCode = 0
    try:
        if 'script' in hodConfig['hod']:
            opCode = hod.script()
        else:
            opCode = hod.operation()
    except Exception:
        # sys.exc_info() is used instead of "except ... as e" so the handler
        # parses on every Python version this script may run under.
        print("Uncaught Exception : %s" % (sys.exc_info()[1],))
        # opCode is still 0 here; do not report success after a failure.
        opCode = 1
    finally:
        sys.exit(opCode)