- #Licensed to the Apache Software Foundation (ASF) under one
- #or more contributor license agreements. See the NOTICE file
- #distributed with this work for additional information
- #regarding copyright ownership. The ASF licenses this file
- #to you under the Apache License, Version 2.0 (the
- #"License"); you may not use this file except in compliance
- #with the License. You may obtain a copy of the License at
- # http://www.apache.org/licenses/LICENSE-2.0
- #Unless required by applicable law or agreed to in writing, software
- #distributed under the License is distributed on an "AS IS" BASIS,
- #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- #See the License for the specific language governing permissions and
- #limitations under the License.
- #!/usr/bin/env python
- """manages services and nodepool"""
- # -*- python -*-
- import os, sys, random, time, sets, shutil, threading
- import urllib, urlparse, re, getpass, pprint, signal, shutil
- from pprint import pformat
- from HTMLParser import HTMLParser
# Make the HOD library importable: take the first sys.path entry
# (normally the directory of the invoked script) and put its parent
# directory on the import path so the hodlib imports below resolve.
binfile = sys.path[0]
libdir = os.path.split(binfile)[0]
sys.path.append(libdir)
- import hodlib.Common.logger
- from hodlib.RingMaster.idleJobTracker import JobTrackerMonitor, HadoopJobStatus
- from hodlib.Common.threads import func
- from hodlib.Hod.nodePool import *
- from hodlib.Common.util import *
- from hodlib.Common.nodepoolutil import NodePoolUtil
- from hodlib.Common.socketServers import hodXMLRPCServer
- from hodlib.Common.socketServers import threadedHTTPServer
- from hodlib.NodePools import *
- from hodlib.NodePools.torque import *
- from hodlib.GridServices import *
- from hodlib.Common.descGenerator import *
- from hodlib.Common.xmlrpc import hodXRClient
- from hodlib.Common.miniHTMLParser import miniHTMLParser
- from hodlib.Common.threads import simpleCommand
class ringMasterServer:
    """The RPC server that exposes all the master config
    changes. Also, one of these RPC servers runs as a proxy
    and all the hodring instances register with this proxy.

    The class is used as a process-wide singleton: the running server and
    the wrapper object are kept in class attributes and reached through the
    static helpers below.
    """

    # The ringMasterServer object created by startService(), or None.
    instance = None
    # The underlying XML-RPC server object, or None before startService().
    xmlrpc = None

    def __init__(self, cfg, log, logMasterSources, retry=5):
        """Create the XML-RPC server and block until it reports alive.

        cfg              -- configuration; cfg['ringmaster']['xrs-port-range']
                            is handed to the server constructor.
        log              -- logger used for info/debug output.
        logMasterSources -- object whose methods are exposed over RPC.
        retry            -- accepted for interface compatibility; not used.
        """
        portRange = cfg['ringmaster']['xrs-port-range']
        try:
            # Prefer the Twisted based server when it can be imported.
            from hodlib.Common.socketServers import twistedXMLRPCServer
            ringMasterServer.xmlrpc = twistedXMLRPCServer("", portRange)
        except ImportError:
            log.info("Twisted interface not found. Using hodXMLRPCServer.")
            ringMasterServer.xmlrpc = hodXMLRPCServer("", portRange)

        ringMasterServer.xmlrpc.register_instance(logMasterSources)
        self.logMasterSources = logMasterSources
        # NOTE(review): serve_forever() is presumably non-blocking here (the
        # loop below polls is_alive()) -- confirm against socketServers.
        ringMasterServer.xmlrpc.serve_forever()

        while not ringMasterServer.xmlrpc.is_alive():
            time.sleep(.5)

        log.debug('Ringmaster RPC Server at %d' %
                  ringMasterServer.xmlrpc.server_address[1])

    @staticmethod
    def startService(ss, cfg, np, log, rm):
        """Build the RPC method object and start the singleton server."""
        logMasterSources = _LogMasterSources(ss, cfg, np, log, rm)
        ringMasterServer.instance = ringMasterServer(cfg, log,
                                                     logMasterSources)

    @staticmethod
    def stopService():
        """Stop the XML-RPC server started by startService()."""
        ringMasterServer.xmlrpc.stop()

    @staticmethod
    def getPort():
        """Return the port the XML-RPC server is bound to.

        The previous implementation returned ``instance.port``, an attribute
        that is never assigned, so every call raised AttributeError. The port
        is read from the server address instead, as getAddress() does.
        """
        return ringMasterServer.xmlrpc.server_address[1]

    @staticmethod
    def getAddress():
        """Return the server URL as 'http://<hostname>:<port>/'."""
        return 'http://%s:%d/' % (socket.gethostname(),
                                  ringMasterServer.xmlrpc.server_address[1])
- class _LogMasterSources:
- """All the methods that are run by the RPC server are
- added into this class """
def __init__(self, serviceDict, cfg, np, log, rm):
    """Initialise the state shared by the RPC methods.

    serviceDict -- mapping of service name to service object.
    cfg         -- configuration; cfg['ringmaster']['max-connect'] is read.
    np          -- node pool object (stored, not used here).
    log         -- logger.
    rm          -- the owning ringmaster object (stored, not used here).
    """
    # Collaborators handed in by the caller.
    self.cfg, self.log = cfg, log
    self.np, self.rm = np, rm
    self.serviceDict = serviceDict

    # Tarball download bookkeeping: url -> url and url -> connection count.
    self.dict, self.count = {}, {}
    self.tarSource = []
    self.tarSourceLock = threading.Lock()

    self.logsourceList = []
    self.logsourceListLock = threading.Lock()

    self.masterParam = []
    self.masterParamLock = threading.Lock()

    self.cmdLock = threading.Lock()
    self.verify = 'none'

    self.hdfsHost, self.mapredHost = None, None

    self.maxconnect = self.cfg['ringmaster']['max-connect']
    self.log.debug("Using max-connect value %s" % self.maxconnect)
def registerTarSource(self, hostname, url, addr=None):
    """Register url as a source the tarball can be downloaded from.

    hostname -- accepted for the RPC signature; not used by this method.
    url      -- address of the new source; its connection count starts at 0.
    addr     -- source the caller has just finished downloading from; its
                connection count is decremented. None (or empty) when the
                ringmaster itself registers.

    Returns url. Raises KeyError when addr was never registered; the lock
    is released in that case too (it used to stay held forever, blocking
    every later tar-source call).
    """
    self.log.debug("registering: " + url)
    lock = self.tarSourceLock
    lock.acquire()
    try:
        self.dict[url] = url
        self.count[url] = 0
        # addr is None when the ringmaster itself invokes this method
        if addr:
            self.count[addr] = self.count[addr] - 1
    finally:
        lock.release()
    if addr:
        self.log.debug("%s is done" % (addr))
    return url
def getTarList(self, hodring):
    """Pick the least-loaded tarball source for a hodring.

    hodring -- string identifying the requester (only used for logging).

    Returns the URL of the registered source with the smallest connection
    count that is still below self.maxconnect, and increments that count.
    Returns the string 'none' when no source qualifies. On ties the entry
    seen last during iteration wins.
    """
    lock = self.tarSourceLock
    lock.acquire()
    try:
        leastkey = None
        # None marks "nothing seen yet". The old -1 sentinel collided with a
        # real count of -1 and made the search restart from a busier source.
        leastval = None
        for k, v in self.count.items():
            if leastval is None:
                leastval = v
            if v <= leastval and v < self.maxconnect:
                leastkey = k
                leastval = v

        if leastkey is None:
            url = 'none'
        else:
            url = self.dict[leastkey]
            self.count[leastkey] = leastval + 1
            self.log.debug("%s %d" % (leastkey, self.count[leastkey]))
    finally:
        # Release even if the bookkeeping above raises.
        lock.release()
    self.log.debug('sending url ' + url + " to " + hodring)
    return url
def tarDone(self, uri):
    """Record that one download from uri has finished.

    Decrements the connection count kept for uri and returns uri.
    Raises KeyError when uri was never registered; the lock is released in
    that case as well (previously it stayed held).
    """
    self.log.debug("%s is done" % (uri))
    lock = self.tarSourceLock
    lock.acquire()
    try:
        self.count[uri] = self.count[uri] - 1
    finally:
        lock.release()
    return uri
def status(self):
    """Liveness probe: unconditionally answers True."""
    alive = True
    return alive
- # FIXME: this code is broken, it relies on a central service registry
- #
- # def clusterStart(self, changedClusterParams=[]):
- # self.log.debug("clusterStart method invoked.")
- # self.dict = {}
- # self.count = {}
- # try:
- # if (len(changedClusterParams) > 0):
- # self.log.debug("Updating config.")
- # for param in changedClusterParams:
- # (key, sep1, val) = param.partition('=')
- # (i1, sep2, i2) = key.partition('.')
- # try:
- # prev = self.cfg[i1][i2]
- # self.rm.cfg[i1][i2] = val
- # self.cfg[i1][i2] = val
- # self.log.debug("nModified [%s][%s]=%s to [%s][%s]=%s" % (i1, i2, prev, i1, i2, val))
- # except KeyError, e:
- # self.log.info("Skipping %s as no such config parameter found in ringmaster" % param)
- # self.log.debug("Regenerating Service Description.")
- # dGen = DescGenerator(self.rm.cfg)
- # self.rm.cfg['servicedesc'] = dGen.createServiceDescDict()
- # self.cfg['servicedesc'] = self.rm.cfg['servicedesc']
- #
- # self.rm.tar = None
- # if self.rm.cfg['ringmaster'].has_key('hadoop-tar-ball'):
- # self.rm.download = True
- # self.rm.tar = self.rm.cfg['ringmaster']['hadoop-tar-ball']
- # self.log.debug("self.rm.tar=%s" % self.rm.tar)
- #
- # self.rm.cd_to_tempdir()
- #
- # self.rm.tarAddress = None
- # hostname = socket.gethostname()
- # if (self.rm.download):
- # self.rm.basename = os.path.basename(self.rm.tar)
- # dest = os.path.join(os.getcwd(), self.rm.basename)
- # src = self.rm.tar
- # self.log.debug("cp %s -> %s" % (src, dest))
- # shutil.copy(src, dest)
- # self.rm.tarAddress = "%s%s" % (self.rm.httpAddress, self.rm.basename)
- # self.registerTarSource(hostname, self.rm.tarAddress)
- # self.log.debug("Registered new tarAddress %s" % self.rm.tarAddress)
- # else:
- # self.log.debug("Download not set.")
- #
- # if (self.rm.tar != None):
- # self.cfg['hodring']['download-addr'] = self.rm.tarAddress
- # self.rm.cfg['hodring']['download-addr'] = self.rm.tarAddress
- #
- # sdl = self.rm.cfg['servicedesc']
- # workDirs = self.rm.getWorkDirs(self.rm.cfg, True)
- # hdfsDesc = sdl['hdfs']
- # hdfs = None
- # if hdfsDesc.isExternal():
- # hdfs = HdfsExternal(hdfsDesc, workDirs)
- # else:
- # hdfs = Hdfs(hdfsDesc, workDirs, 0, False, True)
- #
- # self.rm.serviceDict[hdfs.getName()] = hdfs
- # mrDesc = sdl['mapred']
- # mr = None
- # if mrDesc.isExternal():
- # mr = MapReduceExternal(mrDesc, workDirs)
- # else:
- # mr = MapReduce(mrDesc, workDirs, 1)
- # self.rm.serviceDict[mr.getName()] = mr
- #
- # ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
- # self.np.getServiceId(), 'hodring', 'hod')
- #
- # slaveList = ringList
- # hdfsringXRAddress = None
- # # Start HDFS Master - Step 1
- # if not hdfsDesc.isExternal():
- # masterFound = False
- # for ring in ringList:
- # ringXRAddress = ring['xrs']
- # if ringXRAddress == None:
- # raise Exception("Could not get hodring XML-RPC server address.")
- # if (ringXRAddress.find(self.hdfsHost) != -1):
- # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
- # hdfsringXRAddress = ringXRAddress
- # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (HDFS Master)")
- # ringClient.clusterStart()
- # masterFound = True
- # slaveList.remove(ring)
- # break
- # if not masterFound:
- # raise Exception("HDFS Master host not found")
- # while hdfs.getInfoAddrs() == None:
- # self.log.debug("Waiting for HDFS Master (Name Node) to register dfs.info.port")
- # time.sleep(1)
- #
- # # Start MAPRED Master - Step 2
- # if not mrDesc.isExternal():
- # masterFound = False
- # for ring in ringList:
- # ringXRAddress = ring['xrs']
- # if ringXRAddress == None:
- # raise Exception("Could not get hodring XML-RPC server address.")
- # if (not mrDesc.isExternal() and ringXRAddress.find(self.mapredHost) != -1):
- # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
- # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (MAPRED Master)")
- # ringClient.clusterStart()
- # masterFound = True
- # slaveList.remove(ring)
- # break
- # if not masterFound:
- # raise Exception("MAPRED Master host not found")
- # while mr.getInfoAddrs() == None:
- # self.log.debug("Waiting for MAPRED Master (Job Tracker) to register
- # mapred.job.tracker.info.port")
- # time.sleep(1)
- #
- # # Start Slaves - Step 3
- # for ring in slaveList:
- # ringXRAddress = ring['xrs']
- # if ringXRAddress == None:
- # raise Exception("Could not get hodring XML-RPC server address.")
- # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False, 0)
- # self.log.debug("Invoking clusterStart on " + ringXRAddress + " (Slaves)")
- # ringThread = func(name='hodring_slaves_start', functionRef=ringClient.clusterStart())
- # ring['thread'] = ringThread
- # ringThread.start()
- #
- # for ring in slaveList:
- # ringThread = ring['thread']
- # if ringThread == None:
- # raise Exception("Could not get hodring thread (Slave).")
- # ringThread.join()
- # self.log.debug("Completed clusterStart on " + ring['xrs'] + " (Slave)")
- #
- # # Run Admin Commands on HDFS Master - Step 4
- # if not hdfsDesc.isExternal():
- # if hdfsringXRAddress == None:
- # raise Exception("HDFS Master host not found (to Run Admin Commands)")
- # ringClient = hodXRClient(hdfsringXRAddress, None, None, 0, 0, 0, False, 0)
- # self.log.debug("Invoking clusterStart(False) - Admin on "
- # + hdfsringXRAddress + " (HDFS Master)")
- # ringClient.clusterStart(False)
- #
- # except:
- # self.log.debug(get_exception_string())
- # return False
- #
- # self.log.debug("Successfully started cluster.")
- # return True
- #
- # def clusterStop(self):
- # self.log.debug("clusterStop method invoked.")
- # try:
- # hdfsAddr = self.getServiceAddr('hdfs')
- # if hdfsAddr.find(':') != -1:
- # h, p = hdfsAddr.split(':', 1)
- # self.hdfsHost = h
- # self.log.debug("hdfsHost: " + self.hdfsHost)
- # mapredAddr = self.getServiceAddr('mapred')
- # if mapredAddr.find(':') != -1:
- # h, p = mapredAddr.split(':', 1)
- # self.mapredHost = h
- # self.log.debug("mapredHost: " + self.mapredHost)
- # ringList = self.rm.serviceClient.getServiceInfo(self.cfg['hodring']['userid'],
- # self.np.getServiceId(),
- # 'hodring', 'hod')
- # for ring in ringList:
- # ringXRAddress = ring['xrs']
- # if ringXRAddress == None:
- # raise Exception("Could not get hodring XML-RPC server address.")
- # ringClient = hodXRClient(ringXRAddress, None, None, 0, 0, 0, False)
- # self.log.debug("Invoking clusterStop on " + ringXRAddress)
- # ringThread = func(name='hodring_stop', functionRef=ringClient.clusterStop())
- # ring['thread'] = ringThread
- # ringThread.start()
- #
- # for ring in ringList:
- # ringThread = ring['thread']
- # if ringThread == None:
- # raise Exception("Could not get hodring thread.")
- # ringThread.join()
- # self.log.debug("Completed clusterStop on " + ring['xrs'])
- #
- # except:
- # self.log.debug(get_exception_string())
- # return False
- #
- # self.log.debug("Successfully stopped cluster.")
- #
- # return True
def getCommand(self, addr):
    """This method is called by the
    hodrings to get commands from
    the ringmaster.

    addr -- string identifying the calling hodring.

    If some non-external, launchable service has no master launched yet
    (and has not exceeded cfg['ringmaster']['max-master-failures']), that
    service's master commands are returned and addr is recorded as its
    master address. Otherwise the worker commands of all non-external
    services are returned, or [] if any of them has a master that is not
    initialised yet. Errors are logged and yield whatever was collected.
    """
    lock = self.cmdLock
    cmdList = []
    lock.acquire()
    try:
        services = self.serviceDict.values()
        for v in services:
            if v.isExternal() or not v.isLaunchable(self.serviceDict):
                continue
            # If a master is still not launched, or the number of
            # retries for launching master is not reached,
            # launch master
            if (not v.isMasterLaunched() and
                    (v.getMasterFailureCount() <=
                     self.cfg['ringmaster']['max-master-failures'])):
                cmdList = v.getMasterCommands(self.serviceDict)
                v.setlaunchedMaster()
                v.setMasterAddress(addr)
                break
        if cmdList == []:
            for s in services:
                # The external check used to test the stale variable left
                # over from the loop above instead of the service being
                # examined here.
                if s.isExternal():
                    continue
                if s.isMasterInitialized():
                    cmdList.extend(s.getWorkerCommands(self.serviceDict))
                else:
                    # A master is still coming up: hand out nothing yet.
                    cmdList = []
                    break
    except Exception:
        self.log.debug(get_exception_string())
    finally:
        lock.release()

    cmd = addr + pformat(cmdList)
    self.log.debug("getCommand returning " + cmd)
    return cmdList
def getAdminCommand(self, addr):
    """This method is called by the
    hodrings to get admin commands from
    the ringmaster.

    addr -- string identifying the calling hodring (used for logging).

    Returns the first non-empty admin command list offered by a service in
    self.serviceDict, or the last (empty) answer when none has any.
    Errors are logged and the value collected so far is returned.
    """
    lock = self.cmdLock
    cmdList = []
    lock.acquire()
    try:
        for v in self.serviceDict.values():
            cmdList = v.getAdminCommands(self.serviceDict)
            if cmdList != []:
                break
    except Exception:
        self.log.debug(get_exception_string())
    finally:
        lock.release()

    cmd = addr + pformat(cmdList)
    self.log.debug("getAdminCommand returning " + cmd)
    return cmdList
def addMasterParams(self, addr, vals):
    """This method is called by
    hodring to update any parameters
    its changed for the commands it was
    running.

    addr -- master address of the reporting hodring.
    vals -- parameters to hand to the matching service(s).

    Every service whose master is launched at addr receives vals and is
    marked initialised. Errors are logged, not raised. Returns addr.
    """
    self.log.debug('Comment: adding master params from %s' % addr)
    self.log.debug(pformat(vals))
    lock = self.masterParamLock
    lock.acquire()
    try:
        for v in self.serviceDict.values():
            if v.isMasterLaunched() and v.getMasterAddress() == addr:
                v.setMasterParams(vals)
                v.setMasterInitialized()
    except Exception:
        self.log.debug(get_exception_string())
    finally:
        # Always give the lock back, even if the logging above fails.
        lock.release()
    return addr
def setHodRingErrors(self, addr, errors):
    """This method is called by the hodrings to update errors
    it encountered while starting up.

    addr   -- master address of the failing hodring; anything after the
              last '_' is treated as the PID part and dropped from the
              message stored on the service.
    errors -- error text reported by the hodring.

    Every service whose master is launched at addr is marked failed.
    Errors are logged, not raised. Always returns True.
    """
    self.log.critical("Hodring at %s failed with following errors:\n%s"
                      % (addr, errors))
    lock = self.masterParamLock
    lock.acquire()
    try:
        for v in self.serviceDict.values():
            if v.isMasterLaunched() and v.getMasterAddress() == addr:
                # strip the PID part. A separate name is used so that addr
                # keeps matching the remaining services in this loop.
                shortAddr = addr
                idx = addr.rfind('_')
                if idx != -1:
                    shortAddr = addr[:idx]
                v.setMasterFailed("Hodring at %s failed with following"
                                  " errors:\n%s" % (shortAddr, errors))
    except Exception:
        self.log.debug(get_exception_string())
    finally:
        lock.release()
    return True
def getKeys(self):
    """Return the keys of self.serviceDict, read under masterParamLock."""
    guard = self.masterParamLock
    guard.acquire()
    names = self.serviceDict.keys()
    guard.release()
    return names
def getServiceAddr(self, name):
    """Return the master address of the service called name.

    Returns
      * 'Error: <message>' when the service reports a master failure and
        its failure count exceeds cfg['ringmaster']['max-master-failures'];
      * the first entry of service.getMasterAddrs() when the master is
        initialised;
      * the string 'not found' otherwise, including for unknown names.

    Exceptions raised by the service object propagate; the lock is released
    first (it used to stay held, blocking every later caller).
    """
    addr = 'not found'
    self.log.debug("getServiceAddr name: %s" % name)
    lock = self.masterParamLock
    lock.acquire()
    try:
        try:
            service = self.serviceDict[name]
        except KeyError:
            pass
        else:
            self.log.debug("getServiceAddr service: %s" % service)
            # Check if we should give up ! If the limit on max failures is
            # hit, give up.
            err = service.getMasterFailed()
            if ((err is not None) and
                    (service.getMasterFailureCount() >
                     self.cfg['ringmaster']['max-master-failures'])):
                self.log.critical("Detected errors (%s) beyond allowed number"
                                  " of failures (%s). Flagging error to client"
                                  % (service.getMasterFailureCount(),
                                     self.cfg['ringmaster']['max-master-failures']))
                addr = "Error: " + err
            elif service.isMasterInitialized():
                addr = service.getMasterAddrs()[0]
            else:
                addr = 'not found'
    finally:
        lock.release()
    self.log.debug("getServiceAddr addr %s: %s" % (name, addr))
    return addr
def getURLs(self, name):
    """Return the first info address of the service called name.

    Returns the string 'none' when the service is unknown or its master is
    not initialised yet. Exceptions raised by the service object propagate;
    the lock is released first (previously it stayed held).
    """
    addr = 'none'
    lock = self.masterParamLock
    lock.acquire()
    try:
        try:
            service = self.serviceDict[name]
        except KeyError:
            pass
        else:
            if service.isMasterInitialized():
                addr = service.getInfoAddrs()[0]
    finally:
        lock.release()
    return addr
def stopRM(self):
    """An XMLRPC call which will spawn a thread to stop the Ringmaster program.

    Returns True once the stopper thread has been started, False if starting
    it raised (the error is logged).
    """
    # The shutdown runs on its own thread so that this XML-RPC call can
    # return; calling stop inline would also stop the XML-RPC server that
    # is serving this very request.
    try:
        self.log.debug("inside xml-rpc call to stop ringmaster")
        func('RMStopper', self.rm.stop).start()
        self.log.debug("returning from xml-rpc call to stop ringmaster")
    except:
        self.log.debug("Exception in stop: %s" % get_exception_string())
        return False
    return True
- class RingMaster:
def __init__(self, cfg, log, **kwds):
    """starts nodepool and services

    cfg  -- configuration object; the 'ringmaster', 'hodring',
            'servicedesc', 'nodepooldesc' and (for external services)
            'gridservice-*' sections are read, and the 'hodring' section is
            updated with the values the hodrings need.
    log  -- logger.
    kwds -- accepted and ignored.

    In order, the constructor: installs signal handlers, obtains the node
    pool and service id, changes into the temp dir and copies the hadoop
    tarball there when one is configured, builds the HDFS and Map/Reduce
    service objects, starts the XML-RPC server (and an HTTP server when a
    tarball is served), optionally registers with the service registry and
    finally starts the idle job tracker monitor.

    Raises Exception when the tarball cannot be found after copying or the
    Hadoop version cannot be determined; errors from the collaborators
    propagate.
    """
    self.download = False
    self.httpServer = None
    self.cfg = cfg
    self.log = log
    self.__hostname = local_fqdn()
    self.workDirs = None

    # ref to the idle job tracker object.
    self.__jtMonitor = None
    self.__idlenessDetected = False
    self.__stopInProgress = False
    self.__isStopped = False  # to let main exit
    # exit code with which the ringmaster main method should return
    self.__exitCode = 0

    self.workers_per_ring = self.cfg['ringmaster']['workers_per_ring']

    self.__initialize_signal_handlers()

    npd = self.cfg['nodepooldesc']
    self.np = NodePoolUtil.getNodePool(npd, cfg, log)

    self.log.debug("Getting service ID.")
    self.serviceId = self.np.getServiceId()
    self.log.debug("Got service ID: %s" % self.serviceId)

    self.tarSrcLoc = None
    if self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
        self.download = True
        self.tarSrcLoc = self.cfg['ringmaster']['hadoop-tar-ball']

    self.cd_to_tempdir()

    if self.download:
        self.__copy_tarball(os.getcwd())
        self.basename = self.__find_tarball_in_dir(os.getcwd())
        if self.basename is None:
            raise Exception('Did not find tarball copied from %s in %s.'
                            % (self.tarSrcLoc, os.getcwd()))

    self.serviceAddr = to_http_url(self.cfg['ringmaster']['svcrgy-addr'])
    self.log.debug("Service registry @ %s" % self.serviceAddr)

    self.serviceClient = hodXRClient(self.serviceAddr)
    self.serviceDict = {}
    try:
        sdl = self.cfg['servicedesc']

        workDirs = self.getWorkDirs(cfg)

        hdfsDesc = sdl['hdfs']
        hdfs = None

        # Determine hadoop Version
        hadoopVers = hadoopVersion(self.__getHadoopDir(),
                                   self.cfg['hodring']['java-home'], self.log)

        if (hadoopVers['major'] is None) or (hadoopVers['minor'] is None):
            raise Exception('Could not retrive the version of Hadoop.'
                            + ' Check the Hadoop installation or the value of the hodring.java-home variable.')
        minorVersion = int(hadoopVers['minor'])

        if hdfsDesc.isExternal():
            hdfs = HdfsExternal(hdfsDesc, workDirs, version=minorVersion)
            hdfs.setMasterParams(self.cfg['gridservice-hdfs'])
        else:
            hdfs = Hdfs(hdfsDesc, workDirs, 0, version=minorVersion,
                        workers_per_ring=self.workers_per_ring)

        self.serviceDict[hdfs.getName()] = hdfs

        mrDesc = sdl['mapred']
        mr = None
        if mrDesc.isExternal():
            mr = MapReduceExternal(mrDesc, workDirs, version=minorVersion)
            mr.setMasterParams(self.cfg['gridservice-mapred'])
        else:
            mr = MapReduce(mrDesc, workDirs, 1, version=minorVersion,
                           workers_per_ring=self.workers_per_ring)

        self.serviceDict[mr.getName()] = mr
    except:
        self.log.critical("Exception in creating Hdfs and Map/Reduce descriptor objects: %s."
                          % get_exception_error_string())
        self.log.debug(get_exception_string())
        raise

    # should not be starting these in a constructor
    ringMasterServer.startService(self.serviceDict, cfg, self.np, log, self)

    self.rpcserver = ringMasterServer.getAddress()

    self.httpAddress = None
    self.tarAddress = None
    hostname = socket.gethostname()
    if self.download:
        # Serve the copied tarball over HTTP so hodrings can download it.
        self.httpServer = threadedHTTPServer(hostname,
                                             self.cfg['ringmaster']['http-port-range'])

        self.httpServer.serve_forever()
        self.httpAddress = "http://%s:%d/" % (self.httpServer.server_address[0],
                                              self.httpServer.server_address[1])
        self.tarAddress = "%s%s" % (self.httpAddress, self.basename)

        ringMasterServer.instance.logMasterSources.registerTarSource(hostname,
                                                                     self.tarAddress)
    else:
        self.log.debug("Download not set.")

    self.log.debug("%s %s %s %s %s" % (self.cfg['ringmaster']['userid'],
                                       self.serviceId, self.__hostname,
                                       'ringmaster', 'hod'))

    if self.cfg['ringmaster']['register']:
        # The 'http' entry is only advertised when the tarball is served.
        serviceInfo = {'xrs': self.rpcserver}
        if self.httpAddress:
            serviceInfo['http'] = self.httpAddress
        self.serviceClient.registerService(self.cfg['ringmaster']['userid'],
                                           self.serviceId, self.__hostname,
                                           'ringmaster', 'hod', serviceInfo)

        self.log.debug("Registered with serivce registry: %s." % self.serviceAddr)

    # Values handed to the hodrings through their config section.
    hodRingWorkDir = os.path.join(cfg['hodring']['temp-dir'], 'hodring' + '_'
                                  + getpass.getuser())

    self.cfg['hodring']['hodring'] = [hodRingWorkDir, ]
    self.cfg['hodring']['svcrgy-addr'] = self.cfg['ringmaster']['svcrgy-addr']
    self.cfg['hodring']['service-id'] = self.np.getServiceId()

    self.cfg['hodring']['ringmaster-xrs-addr'] = self.__url_to_addr(self.rpcserver)

    if self.tarSrcLoc is not None:
        cfg['hodring']['download-addr'] = self.tarAddress

    self.__init_job_tracker_monitor(ringMasterServer.instance.logMasterSources)
def __init_job_tracker_monitor(self, logMasterSources):
    """Create and start the idle job tracker monitor.

    logMasterSources -- RPC method object, passed through to the monitor.

    Failures are logged and swallowed: the ringmaster keeps running, but
    (as the log message says) the cluster cannot then be deallocated when
    idle.
    """
    hadoopDir = self.__getHadoopDir()
    self.log.debug('hadoopdir=%s, java-home=%s' %
                   (hadoopDir, self.cfg['hodring']['java-home']))
    try:
        self.__jtMonitor = JobTrackerMonitor(self.log, self,
                                             self.cfg['ringmaster']['jt-poll-interval'],
                                             self.cfg['ringmaster']['idleness-limit'],
                                             hadoopDir, self.cfg['hodring']['java-home'],
                                             logMasterSources)
        self.log.debug('starting jt monitor')
        self.__jtMonitor.start()
    except Exception:
        self.log.critical('Exception in running idle job tracker. This cluster cannot be deallocated if idle. '
                          'Exception message: %s' % get_exception_error_string())
        self.log.debug('Exception details: %s' % get_exception_string())
def __getHadoopDir(self):
    """Return the directory of the Hadoop installation to use.

    With a configured 'hadoop-tar-ball' the copied tarball is untarred into
    the current directory and the tarball's root directory is returned;
    otherwise cfg['gridservice-mapred']['pkgs'] is returned.

    Raises Exception when untarring fails.
    """
    if not self.cfg['ringmaster'].has_key('hadoop-tar-ball'):
        hadoopDir = self.cfg['gridservice-mapred']['pkgs']
    else:
        tarFile = os.path.join(os.getcwd(), self.basename)
        untarred = untar(tarFile, os.getcwd())
        if not untarred:
            raise Exception('Untarring tarfile %s to directory %s failed. Cannot find hadoop directory.'
                            % (tarFile, os.getcwd()))
        hadoopDir = os.path.join(os.getcwd(), self.__get_dir(tarFile))
    self.log.debug('Returning Hadoop directory as: %s' % hadoopDir)
    return hadoopDir
def __get_dir(self, name):
    """Return the root directory inside the tarball
    specified by name. Assumes that the tarball begins
    with a root directory.

    Only the first archive member is read, and the archive is always
    closed again. Raises IndexError when the archive has no members.
    """
    import tarfile
    myTarFile = tarfile.open(name)
    try:
        # next() on a freshly opened archive yields its first member, so
        # the rest of a large tarball does not have to be scanned.
        firstMember = myTarFile.next()
    finally:
        myTarFile.close()
    if firstMember is None:
        raise IndexError('tarball %s contains no entries' % name)
    hadoopPackage = firstMember.name
    self.log.debug("tarball name : %s hadoop package name : %s" % (name, hadoopPackage))
    return hadoopPackage
def __find_tarball_in_dir(self, dir):
    """Find the tarball among files specified in the given
    directory. We need this method because how the tarball
    source URI is given depends on the method of copy and
    we can't get the tarball name from that.
    This method will fail if there are multiple tarballs
    in the directory with the same suffix.

    Returns the first directory entry whose name is a suffix of
    self.tarSrcLoc, or None when there is no such entry.
    """
    candidates = [entry for entry in os.listdir(dir)
                  if self.tarSrcLoc.endswith(entry)]
    if candidates:
        return candidates[0]
    return None
def __copy_tarball(self, destDir):
    """Copy the hadoop tar ball from a remote location to the
    specified destination directory. Based on the URL it executes
    an appropriate copy command. Throws an exception if the command
    returns a non-zero exit code.

    Only sources that resolve to a file:// URL are handled; anything else
    raises Exception('Unsupported URL for file: ...').
    """
    # for backwards compatibility, treat the default case as file://
    if self.tarSrcLoc.startswith('/'):
        scheme = 'file:/'
    else:
        scheme = ''
    src = '%s%s' % (scheme, self.tarSrcLoc)

    if not src.startswith('file://'):
        raise Exception('Unsupported URL for file: %s' % src)

    # Drop the scheme but keep one leading '/' of the path.
    src = src[len('file://') - 1:]
    cpCmd = '/bin/cp'
    cmd = '%s %s %s' % (cpCmd, src, destDir)
    self.log.debug('Command to execute: %s' % cmd)

    copyProc = simpleCommand('remote copy', cmd)
    copyProc.start()
    copyProc.wait()
    copyProc.join()
    ret = copyProc.exit_code()
    self.log.debug('Completed command execution. Exit Code: %s.' % ret)

    if ret != 0:
        output = copyProc.output()
        raise Exception('Could not copy tarball using command %s. Exit code: %s. Output: %s'
                        % (cmd, ret, output))
# input: http://hostname:port/. output: [hostname,port]
def __url_to_addr(self, url):
    """Split an 'http://host:port/' URL into [host, int(port)].

    Trailing slashes and a leading 'http://' are removed before splitting
    on ':'. A URL without a port raises IndexError.
    """
    stripped = url.rstrip('/')
    prefix = 'http://'
    if stripped.startswith(prefix):
        stripped = stripped[len(prefix):]
    pieces = stripped.split(':')
    host = pieces[0]
    port = int(pieces[1])
    return [host, port]
def __initialize_signal_handlers(self):
    """Install a handler for SIGTERM, SIGINT and SIGQUIT.

    The handler passes the signal number and self.stop to sig_wrapper.
    """
    def sigStop(sigNum, handler):
        sig_wrapper(sigNum, self.stop)

    for stopSignal in (signal.SIGTERM, signal.SIGINT, signal.SIGQUIT):
        signal.signal(stopSignal, sigStop)
def __clean_up(self):
    """Remove the ringmaster's temporary directory.

    The process first changes into the parent directory, then deletes the
    temp dir if it exists; errors during deletion are ignored by rmtree.
    """
    tempDir = self.__get_tempdir()
    parentDir = os.path.split(tempDir)[0]
    os.chdir(parentDir)
    if os.path.exists(tempDir):
        shutil.rmtree(tempDir, True)
    self.log.debug("Cleaned up temporary dir: %s" % tempDir)
def __get_tempdir(self):
    """Return '<temp-dir>/<userid>.<service id>.ringmaster'.

    temp-dir and userid come from the 'ringmaster' config section, the
    service id from the node pool.
    """
    rmCfg = self.cfg['ringmaster']
    leaf = "%s.%s.ringmaster" % (rmCfg['userid'], self.np.getServiceId())
    return os.path.join(self.cfg['ringmaster']['temp-dir'], leaf)
def getWorkDirs(self, cfg, reUse=False):
    """Return the list of per-run work directories.

    cfg   -- configuration; cfg['ringmaster']['work-dirs'] lists the parent
             directories.
    reUse -- when true and a list was computed before, that list is
             returned unchanged; otherwise a new one is generated and
             remembered in self.workDirs.

    Each entry is '<parent>/<hostname>-<pid>-<random integer>'; the same
    suffix is used for every parent. The directories are not created here.
    """
    if (not reUse) or (self.workDirs is None):
        # A random integer keeps repeated runs in one process apart
        # (replaces a loop that scaled a random float until it was whole).
        irand = random.randint(0, 10 ** 16)
        uniq = '%s-%d-%d' % (socket.gethostname(), os.getpid(), irand)
        parentDirs = cfg['ringmaster']['work-dirs']
        self.workDirs = [os.path.join(p, uniq) for p in parentDirs]
    return self.workDirs
def _fetchLink(self, link, parentDir):
    """Download link and everything the HTML parser finds from it.

    link      -- URL to start from.
    parentDir -- directory under which non-HTML content is stored as
                 <parentDir>/<host>[/<port>]/<url path>.

    HTML responses are fed to a miniHTMLParser, which supplies the next
    link to visit; any other content is written to disk. Errors while
    opening or writing propagate after the open handles are closed.
    """
    parser = miniHTMLParser()
    self.log.debug("Checking link %s" % link)
    while link:
        # Get the file from the site and link
        source = urllib.urlopen(link)
        out = None
        try:
            contentType = source.info().gettype()
            isHtml = contentType == 'text/html'

            if isHtml:
                parser.setBaseUrl(source.geturl())
            else:
                parsed = urlparse.urlparse(link)
                hostPort = parsed[1]
                host = hostPort
                port = None
                if hostPort.find(':') != -1:
                    host, port = hostPort.split(':', 1)
                target = os.path.join(parentDir, host)
                # URLs without an explicit port used to crash here, because
                # None was passed to os.path.join.
                if port:
                    target = os.path.join(target, port)
                for component in parsed[2].split('/'):
                    if component == '':
                        continue
                    target = os.path.join(target, component)

                try:
                    self.log.debug('Creating %s' % target)
                    targetDir = os.path.split(target)[0]
                    if not os.path.exists(targetDir):
                        os.makedirs(targetDir)
                except Exception:
                    self.log.debug(get_exception_string())

                # Binary mode: the payload is written exactly as received.
                out = open(target, 'wb')

            bufSz = 8192
            buf = source.read(bufSz)
            while len(buf) > 0:
                if isHtml:
                    # Feed the file into the HTML parser
                    parser.feed(buf)
                if out:
                    out.write(buf)
                buf = source.read(bufSz)
        finally:
            source.close()
            if out:
                out.close()

        # Get the next link in level traversal order
        link = parser.getNextLink()

    parser.close()
def _finalize(self):
    """Finalize the node pool.

    NOTE: an earlier version also built a 'HOD-log-P<pid>' directory name
    here (with a FIXME to take it from the config) but never used it; that
    dead computation has been removed.
    """
    self.np.finalize()
def handleIdleJobTracker(self):
    """Log that the job tracker has been idle and flag the idleness.

    The flag set here is what shouldStop() reports.
    """
    idleLimit = self.cfg['ringmaster']['idleness-limit']
    self.log.critical("Detected idle job tracker for %s seconds. The allocation will be cleaned up."
                      % idleLimit)
    self.__idlenessDetected = True
def cd_to_tempdir(self):
    """Create the ringmaster temp dir if needed, change into it and return it."""
    tempDir = self.__get_tempdir()
    if not os.path.exists(tempDir):
        os.makedirs(tempDir)
    os.chdir(tempDir)
    return tempDir
def getWorkload(self):
    """Return the workload attribute, or None when none has been set.

    Nothing in this class assigns self.workload, so the plain attribute
    read used to raise AttributeError unless a caller had set it.
    """
    return getattr(self, 'workload', None)
def getHostName(self):
    """Return the host name recorded when the ringmaster was created."""
    hostName = self.__hostname
    return hostName
def start(self):
    """run the thread main loop

    Builds the hodring command line from the ringmaster base dir and the
    'hodring' config section, asks the node pool to run the workers with
    it, logs a critical message when runWorkers returns a positive value,
    and finally calls _finalize().
    """
    self.log.debug("Entered start method.")
    hodring = os.path.join(self.cfg['ringmaster']['base-dir'],
                           'bin', 'hodring')
    largs = [hodring] + list(self.cfg.get_args(section='hodring'))

    # Logged form of the command: every argument followed by one space.
    hodringCmd = "".join(["%s " % item for item in largs])
    self.log.debug(hodringCmd)

    if self.np.runWorkers(largs) > 0:
        self.log.critical("Failed to start worker.")

    self.log.debug("Returned from runWorkers.")

    self._finalize()
def __findExitCode(self):
    """Determine the exit code based on the status of the cluster or jobs run on them

    The result is stored in the private exit code attribute:
      7 -- the HDFS address is 'not found' or an 'Error: ' string;
      8 -- the same for the Map/Reduce address;
      otherwise the non-zero value of get_cluster_status(), or, when that
      is 0, the consolidated exit code of the hadoop jobs.
    """
    xmlrpcServer = ringMasterServer.instance.logMasterSources
    # Each lookup takes a lock and logs, so every address is fetched once.
    hdfsAddr = xmlrpcServer.getServiceAddr('hdfs')
    if hdfsAddr == 'not found' or hdfsAddr.startswith("Error: "):
        self.__exitCode = 7
    else:
        mapredAddr = xmlrpcServer.getServiceAddr('mapred')
        if mapredAddr == 'not found' or mapredAddr.startswith("Error: "):
            self.__exitCode = 8
        else:
            clusterStatus = get_cluster_status(hdfsAddr, mapredAddr)
            if clusterStatus != 0:
                self.__exitCode = clusterStatus
            else:
                self.__exitCode = self.__findHadoopJobsExitCode()
    self.log.debug('exit code %s' % self.__exitCode)
def __findHadoopJobsExitCode(self):
    """Determine the consolidate exit code of hadoop jobs run on this cluster, provided
    this information is available. Return 0 otherwise

    Returns 16 when every job reported by the job tracker monitor has the
    failure status (3), 17 when only some have, and 0 when none have, when
    there is no monitor, or when reading the statuses fails (logged).
    """
    ret = 0
    failureStatus = 3
    failureCount = 0
    if self.__jtMonitor:
        jobStatusList = self.__jtMonitor.getJobsStatus()
        try:
            for jobStatus in jobStatusList:
                self.log.debug('job status for %s: %s' % (jobStatus.getJobId(),
                                                         jobStatus.getStatus()))
                if jobStatus.getStatus() == failureStatus:
                    failureCount = failureCount + 1
            if failureCount > 0:
                if failureCount == len(jobStatusList):  # all jobs failed
                    ret = 16
                else:
                    ret = 17
        except Exception:
            # The message used to lack a %s placeholder, so formatting it
            # raised TypeError from inside this handler.
            self.log.debug('exception in finding hadoop jobs exit code: %s'
                           % get_exception_string())
    return ret
def stop(self):
    """Shut the ringmaster down once.

    Does nothing when a stop is already in progress or finished. Otherwise
    it determines the exit code and stops the RPC service (exit code 6
    when no RPC server instance exists), stops the job tracker monitor and
    the HTTP server when present, removes the temp dir and marks the
    ringmaster as stopped.
    """
    self.log.debug("RingMaster stop method invoked.")
    if self.__stopInProgress or self.__isStopped:
        return
    self.__stopInProgress = True

    if ringMasterServer.instance is None:
        self.__exitCode = 6
    else:
        self.log.debug('finding exit code')
        self.__findExitCode()
        self.log.debug('stopping ringmaster instance')
        ringMasterServer.stopService()

    if self.__jtMonitor is not None:
        self.__jtMonitor.stop()
    if self.httpServer:
        self.httpServer.stop()

    self.__clean_up()
    self.__isStopped = True
def shouldStop(self):
    """Indicates whether the main loop should exit, either due to idleness condition,
    or a stop signal was received"""
    if self.__idlenessDetected:
        return self.__idlenessDetected
    return self.__isStopped
def getExitCode(self):
    """return the exit code of the program"""
    code = self.__exitCode
    return code
def main(cfg, log):
    """Run a ringmaster until it should stop and return its exit code.

    cfg -- configuration, passed through DescGenerator before use.
    log -- logger; may be a false value, in which case failures are not
           logged here.

    Any exception is logged at critical level (when a logger is given) and
    re-raised unchanged, so its type and traceback reach the caller.
    """
    try:
        dGen = DescGenerator(cfg)
        cfg = dGen.initializeDesc()
        rm = RingMaster(cfg, log)
        rm.start()
        # Poll until idleness is detected or a stop was requested.
        while not rm.shouldStop():
            time.sleep(1)
        rm.stop()
        log.debug('returning from main')
        return rm.getExitCode()
    except Exception:
        if log:
            log.critical(get_exception_string())
        raise