service.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. """defines Service as abstract interface"""
  15. # -*- python -*-
  16. import random, socket
  17. class Service:
  18.   """ the service base class that all the 
  19.   other services inherit from. """
  20.   def __init__(self, serviceDesc, workDirs):
  21.     self.serviceDesc = serviceDesc
  22.     self.workDirs = workDirs
  23.   def getName(self):
  24.     return self.serviceDesc.getName()
  25.   def getInfoAddrs(self):
  26.     """Return a list of addresses that provide 
  27.     information about the servie"""
  28.     return []
  29.   def isLost(self):
  30.     """True if the service is down"""
  31.     raise NotImplementedError
  32.   def addNodes(self, nodeList):
  33.     """add nodeSet"""
  34.     raise NotImplementedError
  35.   def removeNodes(self, nodeList):
  36.     """remove a nodeset"""
  37.     raise NotImplementedError
  38.   def getWorkers(self):
  39.      raise NotImplementedError
  40.   def needsMore(self):
  41.     """return number of nodes the service wants to add"""
  42.     raise NotImplementedError
  43.   def needsLess(self):
  44.     """return number of nodes the service wants to remove"""
  45.     raise NotImplementedError
  46. class MasterSlave(Service):
  47.   """ the base class for a master slave 
  48.   service architecture. """
  49.   def __init__(self, serviceDesc, workDirs,requiredNode):
  50.     Service.__init__(self, serviceDesc, workDirs)
  51.     self.launchedMaster = False
  52.     self.masterInitialized = False
  53.     self.masterAddress = 'none'
  54.     self.requiredNode = requiredNode
  55.     self.failedMsg = None
  56.     self.masterFailureCount = 0
  57.   def getRequiredNode(self):
  58.     return self.requiredNode
  59.  
  60.   def getMasterRequest(self):
  61.     """ the number of master you need
  62.     to run for this service. """
  63.     raise NotImplementedError
  64.   
  65.   def isLaunchable(self, serviceDict):
  66.     """ if your service does not depend on
  67.     other services. is set to true by default. """
  68.     return True
  69.   
  70.   def getMasterCommands(self, serviceDict):
  71.     """ a list of master commands you 
  72.     want to run for this service. """
  73.     raise NotImplementedError
  74.   def getAdminCommands(self, serviceDict):
  75.     """ a list of admin commands you 
  76.     want to run for this service. """
  77.     raise NotImplementedError
  78.   def getWorkerCommands(self, serviceDict):
  79.     """ a list of worker commands you want to 
  80.     run for this service. """
  81.     raise NotImplementedError
  82.   def setMasterNodes(self, list):
  83.     """ set the status of master nodes 
  84.     after they start running on a node cluster. """
  85.     raise NotImplementedError
  86.   def addNodes(self, list):
  87.     """ add nodes to a service. Not implemented
  88.     currently. """
  89.     raise NotImplementedError
  90.   def getMasterAddrs(self):
  91.     """ return the addresses of master. the 
  92.     hostname:port to which worker nodes should
  93.     connect. """
  94.     raise NotImplementedError
  95.   
  96.   def setMasterParams(self, list):
  97.     """ set the various master params 
  98.     depending on what each hodring set 
  99.     the master params to. """
  100.     raise NotImplementedError
  101.   def setlaunchedMaster(self):
  102.     """ set the status of master launched
  103.     to true. """
  104.     self.launchedMaster = True
  105.   def isMasterLaunched(self):
  106.     """ return if a master has been launched
  107.     for the service or not. """
  108.     return self.launchedMaster
  109.   def isMasterInitialized(self):
  110.     """ return if a master if launched 
  111.     has been initialized or not. """
  112.     return self.masterInitialized
  113.   def setMasterInitialized(self):
  114.     """ set the master initialized to
  115.     true. """
  116.     self.masterInitialized = True
  117.     # Reset failure related variables, as master is initialized successfully.
  118.     self.masterFailureCount = 0
  119.     self.failedMsg = None
  120.   def getMasterAddress(self):
  121.     """ it needs to change to reflect 
  122.     more that one masters. Currently it 
  123.     keeps a knowledge of where the master 
  124.     was launched and to keep track if it was actually
  125.     up or not. """
  126.     return self.masterAddress
  127.   def setMasterAddress(self, addr):
  128.     self.masterAddress = addr
  129.   def isExternal(self):
  130.     return self.serviceDesc.isExternal()
  131.   def setMasterFailed(self, err):
  132.     """Sets variables related to Master failure"""
  133.     self.masterFailureCount += 1
  134.     self.failedMsg = err
  135.     # When command is sent to HodRings, this would have been set to True.
  136.     # Reset it to reflect the correct status.
  137.     self.launchedMaster = False
  138.   def getMasterFailed(self):
  139.     return self.failedMsg
  140.  
  141.   def getMasterFailureCount(self):
  142.     return self.masterFailureCount
  143.  
  144. class NodeRequest:
  145.   """ A class to define 
  146.   a node request. """
  147.   def __init__(self, n, required = [], preferred = [], isPreemptee = True):
  148.     self.numNodes = n
  149.     self.preferred = preferred
  150.     self.isPreemptee = isPreemptee
  151.     self.required = required
  152.   def setNumNodes(self, n):
  153.     self.numNodes = n
  154.   def setPreferredList(self, list):
  155.     self.preferred = list
  156.   def setIsPreemptee(self, flag):
  157.     self.isPreemptee = flag
  158. class ServiceUtil:
  159.   """ this class should be moved out of 
  160.   service.py to a util file"""
  161.   localPortUsed = {}
  162.     
  163.   def getUniqRandomPort(h=None, low=50000, high=60000, retry=900, log=None):
  164.     """This allocates a randome free port between low and high"""
  165.     # We use a default value of 900 retries, which takes an agreeable
  166.     # time limit of ~ 6.2 seconds to check 900 ports, in the worse case
  167.     # of no available port in those 900.
  168.     while retry > 0:
  169.       n = random.randint(low, high)
  170.       if n in ServiceUtil.localPortUsed:
  171.         continue
  172.       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  173.       if not h:
  174.         h = socket.gethostname()
  175.       avail = False
  176.       if log: log.debug("Trying to see if port %s is available"% n)
  177.       try:
  178.         s.bind((h, n))
  179.         if log: log.debug("Yes, port %s is available" % n)
  180.         avail = True
  181.       except socket.error,e:
  182.         if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e))
  183.         retry -= 1
  184.         pass
  185.       # The earlier code that used to be here had syntax errors. The code path
  186.       # couldn't be followd anytime, so the error remained uncaught.
  187.       # This time I stumbled upon the error
  188.       s.close()
  189.       if avail:
  190.         ServiceUtil.localPortUsed[n] = True
  191.         return n
  192.     raise ValueError, "Can't find unique local port between %d and %d" % (low, high)
  193.   
  194.   getUniqRandomPort = staticmethod(getUniqRandomPort)
  195.   
  196.   def getUniqPort(h=None, low=40000, high=60000, retry=900, log=None):
  197.     """get unique port on a host that can be used by service
  198.     This and its consumer code should disappear when master
  199.     nodes get allocatet by nodepool"""
  200.     # We use a default value of 900 retries, which takes an agreeable
  201.     # time limit of ~ 6.2 seconds to check 900 ports, in the worse case
  202.     # of no available port in those 900.
  203.     n  = low
  204.     while retry > 0:
  205.       n = n + 1
  206.       if n in ServiceUtil.localPortUsed:
  207.         continue
  208.       s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  209.       if not h:
  210.         h = socket.gethostname()
  211.       avail = False
  212.       if log: log.debug("Trying to see if port %s is available"% n)
  213.       try:
  214.         s.bind((h, n))
  215.         if log: log.debug("Yes, port %s is available" % n)
  216.         avail = True
  217.       except socket.error,e:
  218.         if log: log.debug("Could not bind to the port %s. Reason %s" % (n,e))
  219.         retry -= 1
  220.         pass
  221.       s.close()
  222.       if avail:
  223.         ServiceUtil.localPortUsed[n] = True
  224.         return n
  225.     raise ValueError, "Can't find unique local port between %d and %d" % (low, high)
  226.   getUniqPort = staticmethod(getUniqPort)