hodsvc.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
- #Licensed to the Apache Software Foundation (ASF) under one
- #or more contributor license agreements. See the NOTICE file
- #distributed with this work for additional information
- #regarding copyright ownership. The ASF licenses this file
- #to you under the Apache License, Version 2.0 (the
- #"License"); you may not use this file except in compliance
- #with the License. You may obtain a copy of the License at
- # http://www.apache.org/licenses/LICENSE-2.0
- #Unless required by applicable law or agreed to in writing, software
- #distributed under the License is distributed on an "AS IS" BASIS,
- #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- #See the License for the specific language governing permissions and
- #limitations under the License.
- # $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
- #
- #------------------------------------------------------------------------------
- import os, time, shutil, xmlrpclib, socket, pprint
- from signal import *
- from hodlib.Common.logger import hodLog, hodDummyLogger
- from hodlib.Common.socketServers import hodXMLRPCServer
- from hodlib.Common.util import local_fqdn
- from hodlib.Common.xmlrpc import hodXRClient
class hodBaseService:
    """Base class for HOD services.

    Provides logging setup, signal handling, configuration access and an
    optional XML-RPC server.  Extend this class to create a HOD service.
    Every attribute whose name starts with ``_xr_method_`` is registered
    with the XML-RPC server under the name that follows that prefix
    (e.g. ``_xr_method_stop`` is exposed as ``stop``).
    """

    def __init__(self, name, config, xrtype='threaded'):
        """Initialise logging, signal handlers and the XML-RPC server.

        name   -- service name string; signal handlers are installed for
                  every name except 'serviceRegistry'.
        config -- config object of type hodlib.Common.setup.options or
                  hodlib.Common.setup.config (accessed through has_key()
                  and item lookup only).
        xrtype -- XML-RPC server flavour: 'threaded' or 'twisted'.
        """
        self.name = name
        self.hostname = local_fqdn()
        self._cfg = config
        self._xrc = None
        self.logs = {}
        self._baseLogger = None
        self._serviceID = os.getenv('PBS_JOBID')

        self.__logDir = None
        # NOTE(review): nothing in this class ever assigns a registry
        # address, so _register_service() is a no-op unless a subclass
        # sets the (name-mangled) attribute -- confirm intended wiring.
        self.__svcrgy = None
        self.__stop = False
        self.__xrtype = xrtype

        self._init_logging()

        if name != 'serviceRegistry':
            self._init_signals()
        self._init_xrc_server()

    def __set_logging_level(self, level):
        """Apply `level` to every logger registered in self.logs."""
        self.logs['main'].info("Setting log level to %s." % level)
        # self.logs is the logger dict this class maintains; the attribute
        # previously referenced here was never defined.
        # NOTE(review): assumes the 'main' logger object implements
        # set_logger_level(name, level) -- confirm against hodlib logger.
        for loggerName in self.logs:
            self.logs['main'].set_logger_level(loggerName, level)

    def __get_logging_level(self):
        """Return the current level of the 'main' logger.

        The stream handler is consulted when the config has a 'stream'
        key, otherwise the file handler when it has 'log-dir'; 0 is
        returned when neither key is configured.
        """
        # NOTE(review): assumes the 'main' logger object implements
        # get_level(handlerType, name) -- confirm against hodlib logger.
        if self._cfg.has_key('stream'):
            return self.logs['main'].get_level('stream', 'main')
        elif self._cfg.has_key('log-dir'):
            return self.logs['main'].get_level('file', 'main')
        else:
            return 0

    def _xr_method_stop(self, *args):
        """XML-RPC method, calls stop() on ourselves."""
        return self.stop()

    def _xr_method_status(self, *args):
        """XML-RPC method, calls status() on ourselves."""
        return self.status()

    def _init_logging(self):
        """Create the 'main' logger according to the configuration.

        A real logger is built only when the config has a 'debug' entry
        greater than zero; otherwise a hodDummyLogger is installed.
        Stream, file and syslog handlers are attached when the matching
        'stream', 'log-dir' and 'syslog-address' keys are present.
        """
        if not (self._cfg.has_key('debug') and self._cfg['debug'] > 0):
            self.logs['main'] = hodDummyLogger()
            return

        debugLevel = self._cfg['debug']
        self._baseLogger = hodLog(self.name)
        self.logs['main'] = self._baseLogger.add_logger('main')

        if self._cfg.has_key('stream') and self._cfg['stream']:
            self._baseLogger.add_stream(level=debugLevel,
                                        addToLoggerNames=('main',))

        if self._cfg.has_key('log-dir'):
            # Log directory is <log-dir>/<userid>[.<serviceID>].
            if self._serviceID:
                dirName = "%s.%s" % (self._cfg['userid'], self._serviceID)
            else:
                dirName = self._cfg['userid']
            self.__logDir = os.path.join(self._cfg['log-dir'], dirName)
            if not os.path.exists(self.__logDir):
                os.mkdir(self.__logDir)

            self._baseLogger.add_file(logDirectory=self.__logDir,
                                      level=debugLevel,
                                      addToLoggerNames=('main',))

        if self._cfg.has_key('syslog-address'):
            self._baseLogger.add_syslog(self._cfg['syslog-address'],
                                        level=debugLevel,
                                        addToLoggerNames=('main',))

    def _init_signals(self):
        """Install handlers: SIGTERM/SIGQUIT/SIGINT stop the service and
        SIGUSR2 cycles the logging level."""

        def toggleLevel():
            # Step the level up by one, wrapping from 4 back to 1.
            currentLevel = self.__get_logging_level()
            if currentLevel == 4:
                self.__set_logging_level(1)
            else:
                self.__set_logging_level(currentLevel + 1)

        def sigStop(sigNum, handler):
            self._sig_wrapper(sigNum, self.stop)

        def sigDebug(sigNum, handler):
            self._sig_wrapper(sigNum, toggleLevel)

        signal(SIGTERM, sigStop)
        signal(SIGQUIT, sigStop)
        signal(SIGINT, sigStop)
        signal(SIGUSR2, sigDebug)

    def _sig_wrapper(self, sigNum, handler, *args):
        """Log the received signal, then invoke `handler`.

        Any extra positional arguments are forwarded to `handler` as
        individual arguments.
        """
        self.logs['main'].info("Caught signal %s." % sigNum)
        if args:
            handler(*args)
        else:
            handler()

    def _init_xrc_server(self):
        """Create the XML-RPC server when an address is configured.

        'xrs-address' (host, port) takes precedence over 'xrs-port-range'
        (bound on all interfaces).  When neither key is present no server
        is created and self._xrc stays None.

        Raises ValueError when a server is configured but the requested
        server type is neither 'threaded' nor 'twisted'.
        """
        host = None
        ports = None
        if self._cfg.has_key('xrs-address'):
            (host, port) = (self._cfg['xrs-address'][0],
                            self._cfg['xrs-address'][1])
            ports = (port,)
        elif self._cfg.has_key('xrs-port-range'):
            host = ''
            ports = self._cfg['xrs-port-range']

        if host is None:
            return

        if self.__xrtype == 'threaded':
            self._xrc = hodXMLRPCServer(host, ports)
        elif self.__xrtype == 'twisted':
            try:
                from socketServers import twistedXMLRPCServer
                self._xrc = twistedXMLRPCServer(host, ports,
                                                self.logs['main'])
            except ImportError:
                self.logs['main'].error("Twisted XML-RPC server not available, "
                                        + "falling back on threaded server.")
                self._xrc = hodXMLRPCServer(host, ports)
        else:
            # Fail with a clear message instead of an AttributeError on
            # None further down.
            raise ValueError(
                "Unsupported XML-RPC server type: %s" % self.__xrtype)

        # Expose every _xr_method_<name> attribute as XML-RPC method <name>.
        prefix = '_xr_method_'
        for attr in dir(self):
            if attr.startswith(prefix):
                self._xrc.register_function(getattr(self, attr),
                                            attr[len(prefix):])

        self._xrc.register_introspection_functions()

    def _register_service(self, port=None, installSignalHandlers=1):
        """Register this service with the service registry, if one is set.

        The registration carries an 'xrs' URL when an XML-RPC server
        exists and an 'http' URL when the instance has a truthy `_http`
        attribute.  `port` is accepted for interface compatibility and is
        not used.
        """
        if not self.__svcrgy:
            return

        self.logs['main'].info(
            "Registering service with service registery %s... " % self.__svcrgy)
        svcrgy = hodXRClient(self.__svcrgy, None, None, 0, 0,
                             installSignalHandlers)

        # _http is not initialised by this class; treat it as optional so
        # services without an HTTP server can still register.
        http = getattr(self, '_http', None)

        urls = {}
        if self._xrc:
            urls['xrs'] = "http://%s:%s" % (self._xrc.server_address[0],
                                            self._xrc.server_address[1])
        if http:
            urls['http'] = "http://%s:%s" % (http.server_address[0],
                                             http.server_address[1])

        svcrgy.registerService(self._cfg['userid'], self._serviceID,
                               self.hostname, self.name, 'hod', urls)

    def start(self):
        """Start the XML-RPC server and register the service.

        Registration is attempted only when the config has a truthy
        'register' entry.
        """
        self.logs['main'].info("Starting HOD service: %s ..." % self.name)
        # NOTE(review): presumably serve_forever() returns immediately
        # (server runs in the background); otherwise registration below
        # would not happen until the server exits -- confirm.
        if self._xrc:
            self._xrc.serve_forever()
        if self._cfg.has_key('register') and self._cfg['register']:
            self._register_service()

    def stop(self):
        """Stop the XML-RPC server (if any) and set the stop flag.

        Always returns True.
        """
        self.logs['main'].info("Stopping service...")
        if self._xrc:
            self._xrc.stop()
        self.__stop = True

        return True

    def status(self):
        """Return True; should be overridden by subclasses."""
        return True

    def wait(self):
        """Block, polling every 0.1 s, until stop() has been called."""
        while not self.__stop:
            time.sleep(.1)