hodsvc.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. # $Id:setup.py 5158 2007-04-09 00:14:35Z zim $
  15. #
  16. #------------------------------------------------------------------------------
  17. import os, time, shutil, xmlrpclib, socket, pprint
  18. from signal import *
  19. from hodlib.Common.logger import hodLog, hodDummyLogger
  20. from hodlib.Common.socketServers import hodXMLRPCServer
  21. from hodlib.Common.util import local_fqdn
  22. from hodlib.Common.xmlrpc import hodXRClient
  23. class hodBaseService:
  24.   """hodBaseService class - This class provides service registration, logging,
  25.      and configuration access methods.  It also provides an XML-RPC server.
  26.      This class should be extended to create hod services.  Methods beginning
  27.      with _xr_method will automatically be added to instances of this class.
  28.      """
  29.   def __init__(self, name, config, xrtype='threaded'):
  30.     """ Initialization requires a name string and a config object of type
  31.         hodlib.Common.setup.options or hodlib.Common.setup.config."""
  32.         
  33.     self.name = name
  34.     self.hostname = local_fqdn()
  35.     self._cfg = config
  36.     self._xrc = None
  37.     self.logs = {}
  38.     self._baseLogger = None
  39.     self._serviceID = os.getenv('PBS_JOBID')
  40.         
  41.     self.__logDir = None
  42.     self.__svcrgy = None
  43.     self.__stop = False
  44.     self.__xrtype = xrtype
  45.     
  46.     self._init_logging()
  47.         
  48.     if name != 'serviceRegistry': self._init_signals()
  49.     self._init_xrc_server()
  50.     
  51.   def __set_logging_level(self, level):
  52.     self.logs['main'].info("Setting log level to %s." % level)
  53.     for loggerName in self.loggers.keys():
  54.       self.logs['main'].set_logger_level(loggerName, level)
  55.   def __get_logging_level(self):
  56.     if self._cfg.has_key('stream'):
  57.       return self.loggers['main'].get_level('stream', 'main')
  58.     elif self._cfg.has_key('log-dir'):
  59.       return self.loggers['main'].get_level('file', 'main')
  60.     else:
  61.       return 0
  62.   
  63.   def _xr_method_stop(self, *args):
  64.     """XML-RPC method, calls stop() on ourselves."""
  65.     
  66.     return self.stop()
  67.   
  68.   def _xr_method_status(self, *args):
  69.     """XML-RPC method, calls status() on ourselves."""
  70.     
  71.     return self.status()
  72.   
  73.   def _init_logging(self):
  74.     if self._cfg.has_key('debug'):
  75.       if self._cfg['debug'] > 0:
  76.         self._baseLogger = hodLog(self.name)
  77.         self.logs['main'] = self._baseLogger.add_logger('main')
  78.         
  79.         if self._cfg.has_key('stream'):
  80.           if self._cfg['stream']:
  81.             self._baseLogger.add_stream(level=self._cfg['debug'], 
  82.                                  addToLoggerNames=('main',))
  83.             
  84.         if self._cfg.has_key('log-dir'):
  85.           if self._serviceID:
  86.               self.__logDir = os.path.join(self._cfg['log-dir'], "%s.%s" % (
  87.                                        self._cfg['userid'], self._serviceID))
  88.           else:
  89.               self.__logDir = os.path.join(self._cfg['log-dir'], 
  90.                                            self._cfg['userid'])
  91.           if not os.path.exists(self.__logDir):
  92.             os.mkdir(self.__logDir)
  93.             
  94.           self._baseLogger.add_file(logDirectory=self.__logDir, 
  95.             level=self._cfg['debug'], addToLoggerNames=('main',))
  96.           
  97.         if self._cfg.has_key('syslog-address'):
  98.           self._baseLogger.add_syslog(self._cfg['syslog-address'], 
  99.             level=self._cfg['debug'], addToLoggerNames=('main',))
  100.         
  101.         if not self.logs.has_key('main'):
  102.           self.logs['main'] = hodDummyLogger()
  103.       else:
  104.         self.logs['main'] = hodDummyLogger()
  105.     else:
  106.       self.logs['main'] = hodDummyLogger()
  107.   
  108.   def _init_signals(self):
  109.     def sigStop(sigNum, handler):
  110.       self.sig_wrapper(sigNum, self.stop)
  111.     def toggleLevel():
  112.       currentLevel = self.__get_logging_level()
  113.       if currentLevel == 4:
  114.         self.__set_logging_level(1)
  115.       else:
  116.         self.__set_logging_level(currentLevel + 1)
  117.     def sigStop(sigNum, handler):
  118.       self._sig_wrapper(sigNum, self.stop)
  119.     def sigDebug(sigNum, handler):
  120.       self.sig_wrapper(sigNum, toggleLevel)
  121.     signal(SIGTERM, sigStop)
  122.     signal(SIGQUIT, sigStop)
  123.     signal(SIGINT, sigStop)
  124.     signal(SIGUSR2, sigDebug)
  125.   def _sig_wrapper(self, sigNum, handler, *args):
  126.     self.logs['main'].info("Caught signal %s." % sigNum)
  127.     if args:
  128.         handler(args)
  129.     else:
  130.         handler()
  131.   
  132.   def _init_xrc_server(self):
  133.     host = None
  134.     ports = None
  135.     if self._cfg.has_key('xrs-address'):
  136.       (host, port) = (self._cfg['xrs-address'][0], self._cfg['xrs-address'][1])
  137.       ports = (port,)
  138.     elif self._cfg.has_key('xrs-port-range'):
  139.       host = ''
  140.       ports = self._cfg['xrs-port-range']
  141.     
  142.     if host != None:  
  143.       if self.__xrtype == 'threaded':
  144.         self._xrc = hodXMLRPCServer(host, ports)
  145.       elif self.__xrtype == 'twisted':
  146.         try:
  147.           from socketServers import twistedXMLRPCServer
  148.           self._xrc = twistedXMLRPCServer(host, ports, self.logs['main'])
  149.         except ImportError:
  150.           self.logs['main'].error("Twisted XML-RPC server not available, "
  151.                                   + "falling back on threaded server.")
  152.           self._xrc = hodXMLRPCServer(host, ports)
  153.       for attr in dir(self):
  154.         if attr.startswith('_xr_method_'):
  155.           self._xrc.register_function(getattr(self, attr),
  156.                                       attr[11:])
  157.     
  158.       self._xrc.register_introspection_functions()
  159.   
  160.   def _register_service(self, port=None, installSignalHandlers=1):
  161.     if self.__svcrgy:
  162.       self.logs['main'].info(
  163.           "Registering service with service registery %s... " % self.__svcrgy)
  164.       svcrgy = hodXRClient(self.__svcrgy, None, None, 0, 0, installSignalHandlers)
  165.       
  166.       if self._xrc and self._http:
  167.         svcrgy.registerService(self._cfg['userid'], self._serviceID, 
  168.                                self.hostname, self.name, 'hod', {
  169.                                'xrs' : "http://%s:%s" % (
  170.                                self._xrc.server_address[0], 
  171.                                self._xrc.server_address[1]),'http' : 
  172.                                "http://%s:%s" % (self._http.server_address[0], 
  173.                                self._http.server_address[1])})
  174.       elif self._xrc:
  175.         svcrgy.registerService(self._cfg['userid'], self._serviceID, 
  176.                                self.hostname, self.name, 'hod', {
  177.                                'xrs' : "http://%s:%s" % (
  178.                                self._xrc.server_address[0], 
  179.                                self._xrc.server_address[1]),})
  180.       elif self._http:
  181.         svcrgy.registerService(self._cfg['userid'], self._serviceID, 
  182.                                self.hostname, self.name, 'hod', {'http' : 
  183.                                "http://%s:%s" % (self._http.server_address[0], 
  184.                                self._http.server_address[1]),})        
  185.       else:
  186.         svcrgy.registerService(self._cfg['userid'], self._serviceID, 
  187.                                self.hostname, name, 'hod', {} )
  188.   
  189.   def start(self):
  190.     """ Start XML-RPC server and register service."""
  191.     
  192.     self.logs['main'].info("Starting HOD service: %s ..." % self.name)
  193.     if self._xrc: self._xrc.serve_forever()
  194.     if self._cfg.has_key('register') and self._cfg['register']:
  195.         self._register_service()
  196.   
  197.   def stop(self):
  198.     """ Stop XML-RPC server, unregister service and set stop flag. """
  199.     
  200.     self.logs['main'].info("Stopping service...")
  201.     if self._xrc: self._xrc.stop()
  202.     self.__stop = True
  203.   
  204.     return True
  205.   
  206.   def status(self):
  207.     """Returns true, should be overriden."""
  208.     
  209.     return True
  210.   
  211.   def wait(self):
  212.     """Wait until stop method is called."""
  213.     
  214.     while not self.__stop:
  215.       time.sleep(.1)