serviceRegistry.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. import sys, time, socket, threading, copy, pprint
  15. from hodlib.Common.hodsvc import hodBaseService
  16. from hodlib.Common.threads import loop
  17. from hodlib.Common.tcp import tcpSocket
  18. from hodlib.Common.util import get_exception_string
  19. import logging
  20. class svcrgy(hodBaseService):
  21.     def __init__(self, config, log=None):
  22.         hodBaseService.__init__(self, 'serviceRegistry', config)
  23.         
  24.         self.__serviceDict = {}
  25.         self.__failCount = {}
  26.         self.__released = {}
  27.         self.__locked = {}
  28.         
  29.         self.__serviceDictLock = threading.Lock()
  30.         self.RMErrorMsgs = None # Ringmaster error messages
  31.         self.log = log
  32.         if self.log is None:
  33.           self.log = logging.getLogger()
  34.     
  35.     def __get_job_key(self, userid, job):
  36.         return "%s-%s" % (userid, job)
  37.     
  38.     def _xr_method_registerService(self, userid, job, host, name, type, dict):
  39.        return self.registerService(userid, job, host, name, type, dict)
  40.     
  41.     def _xr_method_getServiceInfo(self, userid=None, job=None, name=None, 
  42.                                   type=None):
  43.         return self.getServiceInfo(userid, job, name, type)
  44.     def _xr_method_setRMError(self, args):
  45.         self.log.debug("setRMError called with %s" % args)
  46.         self.RMErrorMsgs = args
  47.         return True
  48.     def _xr_method_getRMError(self):
  49.         self.log.debug("getRMError called")
  50.         if self.RMErrorMsgs is not None:
  51.           return self.RMErrorMsgs
  52.         else:
  53.           self.log.debug("no Ringmaster error messages")
  54.           return False
  55.     def registerService(self, userid, job, host, name, type, dict):
  56.         """Method thats called upon by
  57.         the ringmaster to register to the
  58.         the service registry"""
  59.         lock = self.__serviceDictLock
  60.         lock.acquire()
  61.         try:
  62.             self.logs['main'].debug("Registering %s.%s.%s.%s.%s..." % (
  63.                                     userid, job, host, name, type))    
  64.             id = "%s.%s" % (name, type) 
  65.    
  66.             if userid in self.__serviceDict:
  67.                 if job in self.__serviceDict[userid]:
  68.                      if host in self.__serviceDict[userid][job]:
  69.                           self.__serviceDict[userid][job][host].append(
  70.                               {id : dict,})
  71.                      else:
  72.                         self.__serviceDict[userid][job][host] = [
  73.                             {id : dict,},] 
  74.                 else:
  75.                     self.__serviceDict[userid][job] = {host : [
  76.                                                        { id : dict,},]}
  77.             else:    
  78.                 self.__serviceDict[userid] = {job : {host : [
  79.                                                      { id : dict,},]}}
  80.         finally:
  81.             lock.release()
  82.             
  83.         return True
  84.     
  85.     def getXMLRPCAddr(self):
  86.       """return the xml rpc server address"""
  87.       return self._xrc.server_address
  88.     
  89.     def getServiceInfo(self, userid=None, job=None, name=None, type=None):
  90.         """This method is called upon by others
  91.         to query for a particular service returns
  92.         a dictionary of elements"""
  93.         
  94.         self.logs['main'].debug("inside getServiceInfo: %s.%s.%s" % (userid, job, name))
  95.         retdict = {}
  96.         lock = self.__serviceDictLock
  97.         lock.acquire()
  98.         try:
  99.             if userid in self.__serviceDict:
  100.                 if job in self.__serviceDict[userid]:
  101.                     if name and type:
  102.                         retdict = []
  103.                         id = "%s.%s" % (name, type)
  104.                         for host in self.__serviceDict[userid][job]:
  105.                             for dict in self.__serviceDict[userid][job][host]:
  106.                                 [loopID, ] = dict.keys()
  107.                                 if loopID == id:
  108.                                     retdict.append(dict[id])
  109.                     else:
  110.                         retdict = copy.deepcopy(
  111.                             self.__serviceDict[userid][job])
  112.                 elif not job:
  113.                     retdict = copy.deepcopy(self.__serviceDict[userid])
  114.             elif not userid:
  115.                 retdict = copy.deepcopy(self.__serviceDict)
  116.         finally:
  117.             lock.release()
  118.         
  119.         return retdict