ringMaster.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:35k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. #!/usr/bin/env python
  15. """manages services and nodepool"""
  16. # -*- python -*-
  17. import os, sys, random, time, sets, shutil, threading
  18. import urllib, urlparse, re, getpass, pprint, signal, shutil
  19. from pprint import pformat
  20. from HTMLParser import HTMLParser
  21. binfile = sys.path[0]
  22. libdir = os.path.dirname(binfile)
  23. sys.path.append(libdir)
  24. import hodlib.Common.logger
  25. from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus
  26. from hodlib.Common.threads import func 
  27. from hodlib.Hod.nodePool import *
  28. from hodlib.Common.util import *
  29. from hodlib.Common.nodepoolutil import NodePoolUtil
  30. from hodlib.Common.socketServers import hodXMLRPCServer
  31. from hodlib.Common.socketServers import threadedHTTPServer
  32. from hodlib.NodePools import *
  33. from hodlib.NodePools.torque import *
  34. from hodlib.GridServices import *
  35. from hodlib.Common.descGenerator import *
  36. from hodlib.Common.xmlrpc import hodXRClient
  37. from hodlib.Common.miniHTMLParser import miniHTMLParser
  38. from hodlib.Common.threads import simpleCommand
  39. class ringMasterServer:
  40.   """The RPC server that exposes all the master config
  41.   changes. Also, one of these RPC servers runs as a proxy
  42.   and all the hodring instances register with this proxy"""
  43.   instance = None
  44.   xmlrpc = None
  45.   
  46.   def __init__(self, cfg, log, logMasterSources, retry=5):
  47.     try:
  48.       from hodlib.Common.socketServers import twistedXMLRPCServer
  49.       ringMasterServer.xmlrpc = twistedXMLRPCServer("", 
  50.         cfg['ringmaster']['xrs-port-range'])
  51.     except ImportError:
  52.       log.info("Twisted interface not found. Using hodXMLRPCServer.")
  53.       ringMasterServer.xmlrpc = hodXMLRPCServer("", 
  54.         cfg['ringmaster']['xrs-port-range'])
  55.     ringMasterServer.xmlrpc.register_instance(logMasterSources)
  56.     self.logMasterSources = logMasterSources
  57.     ringMasterServer.xmlrpc.serve_forever()
  58.         
  59.     while not ringMasterServer.xmlrpc.is_alive():
  60.       time.sleep(.5)
  61.           
  62.     log.debug('Ringmaster RPC Server at %d' % 
  63.                  ringMasterServer.xmlrpc.server_address[1])
  64.     
  65.   def startService(ss, cfg, np, log, rm):
  66.     logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)
  67.     ringMasterServer.instance = ringMasterServer(cfg, log, logMasterSources)
  68.   def stopService():
  69.     ringMasterServer.xmlrpc.stop()
  70.   
  71.   def getPort():
  72.     return ringMasterServer.instance.port
  73.   def getAddress():
  74.     return 'http://%s:%d/' % (socket.gethostname(), 
  75.                               ringMasterServer.xmlrpc.server_address[1])
  76.   
  77.   startService = staticmethod(startService)
  78.   stopService = staticmethod(stopService)
  79.   getPort = staticmethod(getPort)
  80.   getAddress = staticmethod(getAddress)
  81.   
  82. class _LogMasterSources:
  83.   """All the methods that are run by the RPC server are
  84.   added into this class """
  85.   
  86.   def __init__(self, serviceDict, cfg, np, log, rm):
  87.     self.serviceDict = serviceDict
  88.     self.tarSource = []
  89.     self.tarSourceLock = threading.Lock()
  90.     self.dict = {}
  91.     self.count = {}
  92.     self.logsourceList = []
  93.     self.logsourceListLock = threading.Lock()
  94.     self.masterParam = []
  95.     self.masterParamLock = threading.Lock()
  96.     self.verify = 'none'
  97.     self.cmdLock = threading.Lock()
  98.     self.cfg = cfg
  99.     self.log = log
  100.     self.np = np
  101.     self.rm = rm 
  102.     self.hdfsHost = None
  103.     self.mapredHost = None
  104.     self.maxconnect = self.cfg['ringmaster']['max-connect']
  105.     self.log.debug("Using max-connect value %s"%self.maxconnect)
  106.    
  107.   def registerTarSource(self, hostname, url, addr=None):
  108.     self.log.debug("registering: " + url)
  109.     lock = self.tarSourceLock
  110.     lock.acquire()
  111.     self.dict[url] = url
  112.     self.count[url] = 0
  113.     # addr is None when ringMaster himself invokes this method
  114.     if addr:
  115.       c = self.count[addr]
  116.       self.count[addr] = c - 1
  117.     lock.release()
  118.     if addr:
  119.       str = "%s is done" % (addr)
  120.       self.log.debug(str)
  121.     return url
  122.   def getTarList(self,hodring):   # this looks useful
  123.     lock = self.tarSourceLock
  124.     lock.acquire()
  125.     leastkey = None
  126.     leastval = -1
  127.     for k, v in self.count.iteritems():
  128.       if (leastval  == -1):
  129.         leastval = v
  130.         pass
  131.       if (v <= leastval and v < self.maxconnect):
  132.         leastkey = k
  133.         leastval = v
  134.     if (leastkey == None):
  135.       url  = 'none'
  136.     else:
  137.       url = self.dict[leastkey]
  138.       self.count[leastkey] = leastval + 1
  139.       self.log.debug("%s %d" % (leastkey, self.count[leastkey]))
  140.     lock.release()
  141.     self.log.debug('sending url ' + url+" to "+hodring)  # this looks useful
  142.     return url
  143.   def tarDone(self, uri):
  144.     str = "%s is done" % (uri)
  145.     self.log.debug(str)
  146.     lock = self.tarSourceLock
  147.     lock.acquire()
  148.     c = self.count[uri]
  149.     self.count[uri] = c - 1
  150.     lock.release()
  151.     return uri
  152.   def status(self):
  153.     return True
  154. # FIXME: this code is broken, it relies on a central service registry
  155. #
  156. #  def clusterStart(self, changedClusterParams=[]):
  157. #    self.log.debug("clusterStart method invoked.")
  158. #    self.dict = {}
  159. #    self.count = {}
  160. #    try:
  161. #      if (len(changedClusterParams) > 0):
  162. #        self.log.debug("Updating config.")
  163. #        for param in changedClusterParams:
  164. #          (key, sep1, val) = param.partition('=')
  165. #          (i1, sep2, i2) = key.partition('.')
  166. #          try:
  167. #            prev = self.cfg[i1][i2]
  168. #            self.rm.cfg[i1][i2] = val
  169. #            self.cfg[i1][i2] = val
  170. #            self.log.debug("nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val))
  171. #          except KeyError, e:
  172. #            self.log.info("Skipping %s as no such config parameter found in ringmaster" % param)
  173. #        self.log.debug("Regenerating Service Description.")
  174. #        dGen = DescGenerator(self.rm.cfg)
  175. #        self.rm.cfg['servicedesc'] = dGen.createServiceDescDict()
  176. #        self.cfg['servicedesc'] = self.rm.cfg['servicedesc']
  177. #  
  178. #      self.rm.tar = None
  179. #      if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'):
  180. #        self.rm.download = True
  181. #        self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball']
  182. #        self.log.debug("self.rm.tar=%s" % self.rm.tar)
  183. #      self.rm.cd_to_tempdir()
  184. #
  185. #      self.rm.tarAddress = None 
  186. #      hostname = socket.gethostname()
  187. #      if (self.rm.download):
  188. #        self.rm.basename = os.path.basename(self.rm.tar)
  189. #        dest = os.path.join(os.getcwd(), self.rm.basename)
  190. #        src =  self.rm.tar  
  191. #        self.log.debug("cp %s -> %s" % (src, dest))
  192. #        shutil.copy(src, dest) 
  193. #        self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename)
  194. #        self.registerTarSource(hostname, self.rm.tarAddress)
  195. #        self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress)
  196. #      else:
  197. #        self.log.debug("Download not set.")
  198. #      
  199. #      if (self.rm.tar != None):
  200. #        self.cfg['hodring']['download-addr'] = self.rm.tarAddress
  201. #        self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress
  202. #
  203. #      sdl = self.rm.cfg['servicedesc']
  204. #      workDirs = self.rm.getWorkDirs(self.rm.cfg, True)
  205. #      hdfsDesc = sdl['hdfs']
  206. #      hdfs = None
  207. #      if hdfsDesc.isExternal():
  208. #        hdfs = HdfsExternal(hdfsDesc, workDirs)
  209. #      else:
  210. #        hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True)
  211. #    
  212. #      self.rm.serviceDict[hdfs.getName()] = hdfs
  213. #      mrDesc = sdl['mapred']
  214. #      mr = None
  215. #      if mrDesc.isExternal():
  216. #        mr = MapReduceExternal(mrDesc, workDirs)
  217. #      else:
  218. #        mr = MapReduce(mrDesc, workDirs, 1)
  219. #      self.rm.serviceDict[mr.getName()] = mr
  220. #
  221. #      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
  222. #        self.np.getServiceId(), 'hodring', 'hod') 
  223. #    
  224. #      slaveList = ringList
  225. #      hdfsringXRAddress = None
  226. #      # Start HDFS Master - Step 1
  227. #      if not hdfsDesc.isExternal():
  228. #        masterFound = False
  229. #        for ring in ringList:
  230. #          ringXRAddress = ring['xrs']
  231. #          if ringXRAddress == None:
  232. #            raise Exception("Could not get hodring XML-RPC server address.")
  233. #          if  (ringXRAddress.find(self.hdfsHost) != -1):
  234. #            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
  235. #            hdfsringXRAddress = ringXRAddress
  236. #            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)")
  237. #            ringClient.clusterStart()
  238. #            masterFound = True 
  239. #            slaveList.remove(ring)
  240. #            break
  241. #        if not masterFound:
  242. #          raise Exception("HDFS Master host not found")
  243. #        while hdfs.getInfoAddrs() == None:
  244. #          self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port")
  245. #          time.sleep(1)
  246. #
  247. #      # Start MAPRED Master - Step 2
  248. #      if not mrDesc.isExternal():
  249. #        masterFound = False
  250. #        for ring in ringList:
  251. #          ringXRAddress = ring['xrs']
  252. #          if ringXRAddress == None:
  253. #            raise Exception("Could not get hodring XML-RPC server address.")
  254. #          if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1):
  255. #            ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
  256. #            self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)")
  257. #            ringClient.clusterStart()
  258. #            masterFound = True 
  259. #            slaveList.remove(ring)
  260. #            break
  261. #        if not masterFound:
  262. #          raise Excpetion("MAPRED Master host not found")
  263. #        while mr.getInfoAddrs() == None:
  264. #          self.log.debug("Waiting for MAPRED Master (Job Tracker) to register 
  265. # mapred.job.tracker.info.port")
  266. #          time.sleep(1)
  267. #
  268. #      # Start Slaves - Step 3 
  269. #      for ring in slaveList:
  270. #          ringXRAddress = ring['xrs']
  271. #          if ringXRAddress == None:
  272. #            raise Exception("Could not get hodring XML-RPC server address.")
  273. #          ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
  274. #          self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)")
  275. #          ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart())
  276. #          ring['thread'] = ringThread
  277. #          ringThread.start()
  278. #
  279. #      for ring in slaveList:
  280. #        ringThread = ring['thread']
  281. #        if ringThread == None:
  282. #          raise Exception("Could not get hodring thread (Slave).")
  283. #        ringThread.join()
  284. #        self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)")
  285. #
  286. #      # Run Admin Commands on HDFS Master - Step 4
  287. #      if not hdfsDesc.isExternal():
  288. #        if hdfsringXRAddress == None:
  289. #          raise Exception("HDFS Master host not found (to Run Admin Commands)")
  290. #        ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0)
  291. #        self.log.debug("Invoking clusterStart(False) - Admin on "
  292. #                       + hdfsringXRAddress + " (HDFS Master)")
  293. #        ringClient.clusterStart(False)
  294. #
  295. #    except:
  296. #      self.log.debug(get_exception_string())
  297. #      return False
  298. #
  299. #    self.log.debug("Successfully started cluster.")
  300. #    return True
  301. #
  302. #  def clusterStop(self):
  303. #    self.log.debug("clusterStop method invoked.")
  304. #    try:
  305. #      hdfsAddr = self.getServiceAddr('hdfs')
  306. #      if hdfsAddr.find(':') != -1:
  307. #        h, p = hdfsAddr.split(':', 1)
  308. #        self.hdfsHost = h
  309. #        self.log.debug("hdfsHost: " + self.hdfsHost)
  310. #      mapredAddr = self.getServiceAddr('mapred')
  311. #      if mapredAddr.find(':') != -1:
  312. #        h, p = mapredAddr.split(':', 1)
  313. #        self.mapredHost = h
  314. #        self.log.debug("mapredHost: " + self.mapredHost)
  315. #      ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
  316. #                                                      self.np.getServiceId(),
  317. #                                                      'hodring', 'hod')
  318. #      for ring in ringList:
  319. #        ringXRAddress = ring['xrs']
  320. #        if ringXRAddress == None:
  321. #          raise Exception("Could not get hodring XML-RPC server address.")
  322. #        ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False)
  323. #        self.log.debug("Invoking clusterStop on " + ringXRAddress)
  324. #        ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop())
  325. #        ring['thread'] = ringThread
  326. #        ringThread.start()
  327. #
  328. #      for ring in ringList:
  329. #        ringThread = ring['thread']
  330. #        if ringThread == None:
  331. #          raise Exception("Could not get hodring thread.")
  332. #        ringThread.join()
  333. #        self.log.debug("Completed clusterStop on " + ring['xrs'])
  334. #
  335. #    except:
  336. #      self.log.debug(get_exception_string())
  337. #      return False
  338. #
  339. #    self.log.debug("Successfully stopped cluster.")
  340. #    
  341. #    return True
  342.   def getCommand(self, addr):
  343.     """This method is called by the
  344.     hodrings to get commands from
  345.     the ringmaster"""
  346.     lock = self.cmdLock
  347.     cmdList = []
  348.     lock.acquire()
  349.     try:
  350.       try:
  351.         for v in self.serviceDict.itervalues():
  352.           if (not v.isExternal()):
  353.             if v.isLaunchable(self.serviceDict):
  354.               # If a master is still not launched, or the number of 
  355.               # retries for launching master is not reached, 
  356.               # launch master
  357.               if not v.isMasterLaunched() and 
  358.                   (v.getMasterFailureCount() <= 
  359.                       self.cfg['ringmaster']['max-master-failures']):
  360.                 cmdList = v.getMasterCommands(self.serviceDict)
  361.                 v.setlaunchedMaster()
  362.                 v.setMasterAddress(addr)
  363.                 break
  364.         if cmdList == []:
  365.           for s in self.serviceDict.itervalues():
  366.             if (not v.isExternal()):
  367.               if s.isMasterInitialized():
  368.                 cl = s.getWorkerCommands(self.serviceDict)
  369.                 cmdList.extend(cl)
  370.               else:
  371.                 cmdList = []
  372.                 break
  373.       except:
  374.         self.log.debug(get_exception_string())
  375.     finally:
  376.       lock.release()
  377.       pass
  378.     
  379.     cmd = addr + pformat(cmdList)
  380.     self.log.debug("getCommand returning " + cmd)
  381.     return cmdList
  382.   
  383.   def getAdminCommand(self, addr):
  384.     """This method is called by the
  385.     hodrings to get admin commands from
  386.     the ringmaster"""
  387.     lock = self.cmdLock
  388.     cmdList = []
  389.     lock.acquire()
  390.     try:
  391.       try:
  392.         for v in self.serviceDict.itervalues():
  393.           cmdList = v.getAdminCommands(self.serviceDict)
  394.           if cmdList != []:
  395.             break
  396.       except Exception, e:
  397.         self.log.debug(get_exception_string())
  398.     finally:
  399.       lock.release()
  400.       pass
  401.     cmd = addr + pformat(cmdList)
  402.     self.log.debug("getAdminCommand returning " + cmd)
  403.     return cmdList
  404.   def addMasterParams(self, addr, vals):
  405.     """This method is called by
  406.     hodring to update any parameters
  407.     its changed for the commands it was
  408.     running"""
  409.     self.log.debug('Comment: adding master params from %s' % addr)
  410.     self.log.debug(pformat(vals))
  411.     lock = self.masterParamLock
  412.     lock.acquire()
  413.     try:
  414.       for v in self.serviceDict.itervalues():
  415.         if v.isMasterLaunched():
  416.           if (v.getMasterAddress() == addr):
  417.             v.setMasterParams(vals)
  418.             v.setMasterInitialized()
  419.     except:
  420.       self.log.debug(get_exception_string())
  421.       pass
  422.     lock.release()
  423.             
  424.     return addr
  425.   def setHodRingErrors(self, addr, errors):
  426.     """This method is called by the hodrings to update errors 
  427.       it encountered while starting up"""
  428.     self.log.critical("Hodring at %s failed with following errors:n%s" 
  429.                         % (addr, errors))
  430.     lock = self.masterParamLock
  431.     lock.acquire()
  432.     try:
  433.       for v in self.serviceDict.itervalues():
  434.         if v.isMasterLaunched():
  435.           if (v.getMasterAddress() == addr):
  436.             # strip the PID part.
  437.             idx = addr.rfind('_')
  438.             if idx is not -1:
  439.               addr = addr[:idx]
  440.             v.setMasterFailed("Hodring at %s failed with following" 
  441.                                 " errors:n%s" % (addr, errors))
  442.     except:
  443.       self.log.debug(get_exception_string())
  444.       pass
  445.     lock.release()
  446.     return True
  447.   def getKeys(self):
  448.     lock= self.masterParamLock
  449.     lock.acquire()
  450.     keys = self.serviceDict.keys()
  451.     lock.release()    
  452.   
  453.     return keys
  454.   
  455.   def getServiceAddr(self, name):
  456.     addr = 'not found'
  457.     self.log.debug("getServiceAddr name: %s" % name)
  458.     lock= self.masterParamLock
  459.     lock.acquire()
  460.     try:
  461.       service = self.serviceDict[name]
  462.     except KeyError:
  463.       pass
  464.     else:
  465.       self.log.debug("getServiceAddr service: %s" % service)
  466.       # Check if we should give up ! If the limit on max failures is hit, 
  467.       # give up.
  468.       err = service.getMasterFailed()
  469.       if (err is not None) and 
  470.             (service.getMasterFailureCount() > 
  471.                       self.cfg['ringmaster']['max-master-failures']):
  472.         self.log.critical("Detected errors (%s) beyond allowed number"
  473.                             " of failures (%s). Flagging error to client" 
  474.                             % (service.getMasterFailureCount(), 
  475.                               self.cfg['ringmaster']['max-master-failures']))
  476.         addr = "Error: " + err
  477.       elif (service.isMasterInitialized()):
  478.         addr = service.getMasterAddrs()[0]
  479.       else:
  480.         addr = 'not found'
  481.     lock.release()
  482.     self.log.debug("getServiceAddr addr %s: %s" % (name, addr))
  483.     
  484.     return addr
  485.   def getURLs(self, name):
  486.     addr = 'none'
  487.     lock = self.masterParamLock
  488.     lock.acquire()
  489.     
  490.     try:
  491.       service = self.serviceDict[name]
  492.     except KeyError:
  493.       pass
  494.     else:
  495.       if (service.isMasterInitialized()):
  496.         addr = service.getInfoAddrs()[0]
  497.       
  498.     lock.release()
  499.     
  500.     return addr
  501.   def stopRM(self):
  502.     """An XMLRPC call which will spawn a thread to stop the Ringmaster program."""
  503.     # We spawn a thread here because we want the XMLRPC call to return. Calling
  504.     # stop directly from here will also stop the XMLRPC server.
  505.     try:
  506.       self.log.debug("inside xml-rpc call to stop ringmaster")
  507.       rmStopperThread = func('RMStopper', self.rm.stop)
  508.       rmStopperThread.start()
  509.       self.log.debug("returning from xml-rpc call to stop ringmaster")
  510.       return True
  511.     except:
  512.       self.log.debug("Exception in stop: %s" % get_exception_string())
  513.       return False
  514. class RingMaster:
  515.   def __init__(self, cfg, log, **kwds):
  516.     """starts nodepool and services"""
  517.     self.download = False
  518.     self.httpServer = None
  519.     self.cfg = cfg
  520.     self.log = log
  521.     self.__hostname = local_fqdn()
  522.     self.workDirs = None 
  523.     # ref to the idle job tracker object.
  524.     self.__jtMonitor = None
  525.     self.__idlenessDetected = False
  526.     self.__stopInProgress = False
  527.     self.__isStopped = False # to let main exit
  528.     self.__exitCode = 0 # exit code with which the ringmaster main method should return
  529.     self.workers_per_ring = self.cfg['ringmaster']['workers_per_ring']
  530.     self.__initialize_signal_handlers()
  531.     
  532.     sdd = self.cfg['servicedesc']
  533.     gsvc = None
  534.     for key in sdd:
  535.       gsvc = sdd[key]
  536.       break
  537.     
  538.     npd = self.cfg['nodepooldesc']
  539.     self.np = NodePoolUtil.getNodePool(npd, cfg, log)
  540.     self.log.debug("Getting service ID.")
  541.     
  542.     self.serviceId = self.np.getServiceId()
  543.     
  544.     self.log.debug("Got service ID: %s" % self.serviceId)
  545.     self.tarSrcLoc = None
  546.     if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
  547.       self.download = True
  548.       self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball']
  549.  
  550.     self.cd_to_tempdir()
  551.     if (self.download):
  552.       self.__copy_tarball(os.getcwd())
  553.       self.basename = self.__find_tarball_in_dir(os.getcwd())
  554.       if self.basename is None:
  555.         raise Exception('Did not find tarball copied from %s in %s.'
  556.                           % (self.tarSrcLoc, os.getcwd()))
  557.       
  558.     self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr'])
  559.     
  560.     self.log.debug("Service registry @ %s" % self.serviceAddr)
  561.     
  562.     self.serviceClient = hodXRClient(self.serviceAddr)
  563.     self.serviceDict  = {}
  564.     try:
  565.       sdl = self.cfg['servicedesc']
  566.       workDirs = self.getWorkDirs(cfg)
  567.       hdfsDesc = sdl['hdfs']
  568.       hdfs = None
  569.  
  570.       # Determine hadoop Version
  571.       hadoopVers = hadoopVersion(self.__getHadoopDir(), 
  572.                                 self.cfg['hodring']['java-home'], self.log)
  573.      
  574.       if (hadoopVers['major']==None) or (hadoopVers['minor']==None):
  575.         raise Exception('Could not retrive the version of Hadoop.'
  576.                         + ' Check the Hadoop installation or the value of the hodring.java-home variable.')
  577.       if hdfsDesc.isExternal():
  578.         hdfs = HdfsExternal(hdfsDesc, workDirs, version=int(hadoopVers['minor']))
  579.         hdfs.setMasterParams( self.cfg['gridservice-hdfs'] )
  580.       else:
  581.         hdfs = Hdfs(hdfsDesc, workDirs, 0, version=int(hadoopVers['minor']),
  582.                     workers_per_ring = self.workers_per_ring)
  583.       self.serviceDict[hdfs.getName()] = hdfs
  584.       
  585.       mrDesc = sdl['mapred']
  586.       mr = None
  587.       if mrDesc.isExternal():
  588.         mr = MapReduceExternal(mrDesc, workDirs, version=int(hadoopVers['minor']))
  589.         mr.setMasterParams( self.cfg['gridservice-mapred'] )
  590.       else:
  591.         mr = MapReduce(mrDesc, workDirs,1, version=int(hadoopVers['minor']),
  592.                        workers_per_ring = self.workers_per_ring)
  593.       self.serviceDict[mr.getName()] = mr
  594.     except:
  595.       self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: 
  596.                             %s." % get_exception_error_string())
  597.       self.log.debug(get_exception_string())
  598.       raise
  599.     # should not be starting these in a constructor
  600.     ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)
  601.     
  602.     self.rpcserver = ringMasterServer.getAddress()
  603.     
  604.     self.httpAddress = None   
  605.     self.tarAddress = None 
  606.     hostname = socket.gethostname()
  607.     if (self.download):
  608.       self.httpServer = threadedHTTPServer(hostname, 
  609.         self.cfg['ringmaster']['http-port-range'])
  610.       
  611.       self.httpServer.serve_forever()
  612.       self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0], 
  613.                                  self.httpServer.server_address[1])
  614.       self.tarAddress = "%s%s" % (self.httpAddress, self.basename)
  615.       
  616.       ringMasterServer.instance.logMasterSources.registerTarSource(hostname, 
  617.                                                                    self.tarAddress)
  618.     else:
  619.       self.log.debug("Download not set.")
  620.     
  621.     self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'], 
  622.       self.serviceId, self.__hostname, 'ringmaster', 'hod'))
  623.     
  624.     if self.cfg['ringmaster']['register']:      
  625.       if self.httpAddress:
  626.         self.serviceClient.registerService(self.cfg['ringmaster']['userid'], 
  627.           self.serviceId, self.__hostname, 'ringmaster', 'hod', {
  628.           'xrs' : self.rpcserver, 'http' : self.httpAddress })
  629.       else:
  630.         self.serviceClient.registerService(self.cfg['ringmaster']['userid'], 
  631.           self.serviceId, self.__hostname, 'ringmaster', 'hod', {
  632.           'xrs' : self.rpcserver, })
  633.     
  634.     self.log.debug("Registered with serivce registry: %s." % self.serviceAddr)
  635.     
  636.     hodRingPath = os.path.join(cfg['ringmaster']['base-dir'], 'bin', 'hodring')
  637.     hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'], 'hodring' + '_' 
  638.                                   + getpass.getuser())
  639.     
  640.     self.cfg['hodring']['hodring'] = [hodRingWorkDir,]
  641.     self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr']
  642.     self.cfg['hodring']['service-id'] = self.np.getServiceId()
  643.     self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver)
  644.     
  645.     if (self.tarSrcLoc != None):
  646.       cfg['hodring']['download-addr'] = self.tarAddress
  647.  
  648.     self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources)
  649.   def __init_job_tracker_monitor(self, logMasterSources):
  650.     hadoopDir = self.__getHadoopDir()
  651.     self.log.debug('hadoopdir=%s, java-home=%s' % 
  652.                 (hadoopDir, self.cfg['hodring']['java-home']))
  653.     try:
  654.       self.__jtMonitor = JobTrackerMonitor(self.log, self, 
  655.                             self.cfg['ringmaster']['jt-poll-interval'], 
  656.                             self.cfg['ringmaster']['idleness-limit'],
  657.                             hadoopDir, self.cfg['hodring']['java-home'],
  658.                             logMasterSources)
  659.       self.log.debug('starting jt monitor')
  660.       self.__jtMonitor.start()
  661.     except:
  662.       self.log.critical('Exception in running idle job tracker. This cluster cannot be deallocated if idle.
  663.                           Exception message: %s' % get_exception_error_string())
  664.       self.log.debug('Exception details: %s' % get_exception_string())
  665.   def __getHadoopDir(self):
  666.     hadoopDir = None
  667.     if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
  668.       tarFile = os.path.join(os.getcwd(), self.basename)
  669.       ret = untar(tarFile, os.getcwd())
  670.       if not ret:
  671.         raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.' 
  672.                             % (tarFile, os.getcwd()))
  673.       hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile))
  674.     else:
  675.       hadoopDir = self.cfg['gridservice-mapred']['pkgs']
  676.     self.log.debug('Returning Hadoop directory as: %s' % hadoopDir)
  677.     return hadoopDir
  678.   def __get_dir(self, name):
  679.     """Return the root directory inside the tarball
  680.     specified by name. Assumes that the tarball begins
  681.     with a root directory."""
  682.     import tarfile
  683.     myTarFile = tarfile.open(name)
  684.     hadoopPackage = myTarFile.getnames()[0]
  685.     self.log.debug("tarball name : %s hadoop package name : %s" %(name,hadoopPackage))
  686.     return hadoopPackage
  687.   def __find_tarball_in_dir(self, dir):
  688.     """Find the tarball among files specified in the given 
  689.     directory. We need this method because how the tarball
  690.     source URI is given depends on the method of copy and
  691.     we can't get the tarball name from that.
  692.     This method will fail if there are multiple tarballs
  693.     in the directory with the same suffix."""
  694.     files = os.listdir(dir)
  695.     for file in files:
  696.       if self.tarSrcLoc.endswith(file):
  697.         return file
  698.     return None
  699.   def __copy_tarball(self, destDir):
  700.     """Copy the hadoop tar ball from a remote location to the
  701.     specified destination directory. Based on the URL it executes
  702.     an appropriate copy command. Throws an exception if the command
  703.     returns a non-zero exit code."""
  704.     # for backwards compatibility, treat the default case as file://
  705.     url = ''
  706.     if self.tarSrcLoc.startswith('/'):
  707.       url = 'file:/'
  708.     src = '%s%s' % (url, self.tarSrcLoc)
  709.     if src.startswith('file://'):
  710.       src = src[len('file://')-1:]
  711.       cpCmd = '/bin/cp'
  712.       cmd = '%s %s %s' % (cpCmd, src, destDir)
  713.       self.log.debug('Command to execute: %s' % cmd)
  714.       copyProc = simpleCommand('remote copy', cmd)
  715.       copyProc.start()
  716.       copyProc.wait()
  717.       copyProc.join()
  718.       ret = copyProc.exit_code()
  719.       self.log.debug('Completed command execution. Exit Code: %s.' % ret)
  720.       if ret != 0:
  721.         output = copyProc.output()
  722.         raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s' 
  723.                         % (cmd, ret, output))
  724.     else:
  725.       raise Exception('Unsupported URL for file: %s' % src)
  726. # input: http://hostname:port/. output: [hostname,port]
  727.   def __url_to_addr(self, url):
  728.     addr = url.rstrip('/')
  729.     if addr.startswith('http://'):
  730.       addr = addr.replace('http://', '', 1)
  731.     addr_parts = addr.split(':')
  732.     return [addr_parts[0], int(addr_parts[1])]
  733.   def __initialize_signal_handlers(self): 
  734.     def sigStop(sigNum, handler):
  735.       sig_wrapper(sigNum, self.stop)
  736.   
  737.     signal.signal(signal.SIGTERM, sigStop)
  738.     signal.signal(signal.SIGINT, sigStop)
  739.     signal.signal(signal.SIGQUIT, sigStop)
  740.   def __clean_up(self):
  741.     tempDir = self.__get_tempdir()
  742.     os.chdir(os.path.split(tempDir)[0])
  743.     if os.path.exists(tempDir):
  744.       shutil.rmtree(tempDir, True)
  745.       
  746.     self.log.debug("Cleaned up temporary dir: %s" % tempDir)
  747.   def __get_tempdir(self):
  748.     dir = os.path.join(self.cfg['ringmaster']['temp-dir'], 
  749.                           "%s.%s.ringmaster" % (self.cfg['ringmaster']['userid'], 
  750.                                                 self.np.getServiceId()))
  751.     return dir
  752.   def getWorkDirs(self, cfg, reUse=False):
  753.     if (not reUse) or (self.workDirs == None):
  754.       import math
  755.       frand = random.random()
  756.       while math.ceil(frand) != math.floor(frand):
  757.         frand = frand * 100
  758.       irand = int(frand)
  759.       uniq = '%s-%d-%s' % (socket.gethostname(), os.getpid(), irand)
  760.       dirs = []
  761.       parentDirs = cfg['ringmaster']['work-dirs']
  762.       for p in parentDirs:
  763.         dir = os.path.join(p, uniq)
  764.         dirs.append(dir)
  765.       self.workDirs = dirs
  766.     return self.workDirs
  767.   def _fetchLink(self, link, parentDir):
  768.     parser = miniHTMLParser()
  769.     self.log.debug("Checking link %s" %link)
  770.     while link:
  771.       # Get the file from the site and link
  772.       input = urllib.urlopen(link)
  773.       out = None
  774.       contentType = input.info().gettype()
  775.       isHtml = contentType == 'text/html'
  776.       #print contentType
  777.       if isHtml:
  778.         parser.setBaseUrl(input.geturl())
  779.       else:
  780.         parsed = urlparse.urlparse(link)
  781.         hp = parsed[1]
  782.         h = hp
  783.         p = None
  784.         if hp.find(':') != -1:
  785.           h, p = hp.split(':', 1)
  786.         path = parsed[2]
  787.         path = path.split('/')
  788.         file = os.path.join(parentDir, h, p)
  789.         for c in path:
  790.           if c == '':
  791.             continue
  792.           file = os.path.join(file, c)
  793.         try:
  794.           self.log.debug('Creating %s' % file)
  795.           dir, tail = os.path.split(file)
  796.           if not os.path.exists(dir):
  797.             os.makedirs(dir)
  798.         except:
  799.           self.log.debug(get_exception_string())
  800.         out = open(file, 'w')
  801.       bufSz = 8192
  802.       buf = input.read(bufSz)
  803.       while len(buf) > 0:
  804.         if isHtml:
  805.           # Feed the file into the HTML parser
  806.           parser.feed(buf)
  807.         if out:
  808.           out.write(buf)
  809.         buf = input.read(bufSz)
  810.       input.close()
  811.       if out:
  812.         out.close()
  813.       # Search the retfile here
  814.       # Get the next link in level traversal order
  815.       link = parser.getNextLink()
  816.       
  817.     parser.close()
  818.     
  819.   def _finalize(self):
  820.     try:
  821.       # FIXME: get dir from config
  822.       dir = 'HOD-log-P%d' % (os.getpid())
  823.       dir = os.path.join('.', dir)
  824.     except:
  825.       self.log.debug(get_exception_string())
  826.     self.np.finalize()
  827.   def handleIdleJobTracker(self):
  828.     self.log.critical("Detected idle job tracker for %s seconds. The allocation will be cleaned up." 
  829.                           % self.cfg['ringmaster']['idleness-limit'])
  830.     self.__idlenessDetected = True
  831.   def cd_to_tempdir(self):
  832.     dir = self.__get_tempdir()
  833.     
  834.     if not os.path.exists(dir):
  835.       os.makedirs(dir)
  836.     os.chdir(dir)
  837.     
  838.     return dir
  839.   
  840.   def getWorkload(self):
  841.     return self.workload
  842.   def getHostName(self):
  843.     return self.__hostname
  844.   def start(self):
  845.     """run the thread main loop"""
  846.     
  847.     self.log.debug("Entered start method.")
  848.     hodring = os.path.join(self.cfg['ringmaster']['base-dir'], 
  849.                            'bin', 'hodring')
  850.     largs = [hodring]
  851.     targs = self.cfg.get_args(section='hodring')
  852.     largs.extend(targs) 
  853.     
  854.     hodringCmd = ""
  855.     for item in largs:
  856.       hodringCmd = "%s%s " % (hodringCmd, item)
  857.       
  858.     self.log.debug(hodringCmd)
  859.     
  860.     if self.np.runWorkers(largs) > 0:
  861.       self.log.critical("Failed to start worker.")
  862.     
  863.     self.log.debug("Returned from runWorkers.")
  864.     
  865.     self._finalize()
  866.   def __findExitCode(self):
  867.     """Determine the exit code based on the status of the cluster or jobs run on them"""
  868.     xmlrpcServer = ringMasterServer.instance.logMasterSources
  869.     if xmlrpcServer.getServiceAddr('hdfs') == 'not found' or 
  870.         xmlrpcServer.getServiceAddr('hdfs').startswith("Error: "):
  871.       self.__exitCode = 7
  872.     elif xmlrpcServer.getServiceAddr('mapred') == 'not found' or 
  873.         xmlrpcServer.getServiceAddr('mapred').startswith("Error: "):
  874.       self.__exitCode = 8
  875.     else:
  876.       clusterStatus = get_cluster_status(xmlrpcServer.getServiceAddr('hdfs'),
  877.                                           xmlrpcServer.getServiceAddr('mapred'))
  878.       if clusterStatus != 0:
  879.         self.__exitCode = clusterStatus
  880.       else:
  881.         self.__exitCode = self.__findHadoopJobsExitCode()
  882.     self.log.debug('exit code %s' % self.__exitCode)
  883.   def __findHadoopJobsExitCode(self):
  884.     """Determine the consolidate exit code of hadoop jobs run on this cluster, provided
  885.        this information is available. Return 0 otherwise"""
  886.     ret = 0
  887.     failureStatus = 3
  888.     failureCount = 0
  889.     if self.__jtMonitor:
  890.       jobStatusList = self.__jtMonitor.getJobsStatus()
  891.       try:
  892.         if len(jobStatusList) > 0:
  893.           for jobStatus in jobStatusList:
  894.             self.log.debug('job status for %s: %s' % (jobStatus.getJobId(), 
  895.                                                       jobStatus.getStatus()))
  896.             if jobStatus.getStatus() == failureStatus:
  897.               failureCount = failureCount+1
  898.         if failureCount > 0:
  899.           if failureCount == len(jobStatusList): # all jobs failed
  900.             ret = 16
  901.           else:
  902.             ret = 17
  903.       except:
  904.         self.log.debug('exception in finding hadoop jobs exit code' % get_exception_string())
  905.     return ret
  906.   def stop(self):
  907.     self.log.debug("RingMaster stop method invoked.")
  908.     if self.__stopInProgress or self.__isStopped:
  909.       return
  910.     self.__stopInProgress = True
  911.     if ringMasterServer.instance is not None:
  912.       self.log.debug('finding exit code')
  913.       self.__findExitCode()
  914.       self.log.debug('stopping ringmaster instance')
  915.       ringMasterServer.stopService()
  916.     else:
  917.       self.__exitCode = 6
  918.     if self.__jtMonitor is not None:
  919.       self.__jtMonitor.stop()
  920.     if self.httpServer:
  921.       self.httpServer.stop()
  922.       
  923.     self.__clean_up()
  924.     self.__isStopped = True
  925.   def shouldStop(self):
  926.     """Indicates whether the main loop should exit, either due to idleness condition, 
  927.     or a stop signal was received"""
  928.     return self.__idlenessDetected or self.__isStopped
  929.   def getExitCode(self):
  930.     """return the exit code of the program"""
  931.     return self.__exitCode
  932. def main(cfg,log):
  933.   try:
  934.     rm = None
  935.     dGen = DescGenerator(cfg)
  936.     cfg = dGen.initializeDesc()
  937.     rm = RingMaster(cfg, log)
  938.     rm.start()
  939.     while not rm.shouldStop():
  940.       time.sleep(1)
  941.     rm.stop()
  942.     log.debug('returning from main')
  943.     return rm.getExitCode()
  944.   except Exception, e:
  945.     if log:
  946.       log.critical(get_exception_string())
  947.     raise Exception(e)