idleJobTracker.py
Uploaded by: quxuerui
Upload date: 2018-01-08
Resource size: 41811k
File size: 9k
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
- import os, re, time
- from hodlib.Common.threads import loop, func
- from hodlib.Common.threads import simpleCommand
- from hodlib.Common.util import get_exception_string, hadoopVersion
class HadoopJobStatus:
  """Immutable holder for the status of a single Hadoop job."""

  def __init__(self, jobId, status):
    # The (id, status) pair never changes after construction; keep it as
    # one private tuple and expose read-only accessors.
    self.__info = (jobId, status)

  def getJobId(self):
    """Return the Hadoop job identifier string."""
    return self.__info[0]

  def getStatus(self):
    """Return the job's numeric run state."""
    return self.__info[1]
class HadoopClientException(Exception):
  """Raised when we fail in running the hadoop job client.

  Attributes:
    errorCode: exit status returned by the failed client invocation.
  """

  def __init__(self, errorCode):
    # Deliberately no explicit Exception.__init__ call, matching the
    # original class, so the exception's args/str behavior is unchanged.
    self.errorCode = errorCode
-
class JobTrackerMonitor:
  """Monitors the JobTracker of an allocated cluster periodically to detect
  whether it is idle.  If it is found to be idle for more than a configured
  limit, registered handlers are called back so they can act upon the idle
  cluster.

  Constructor arguments:
    log:              logger used for all diagnostics.
    idleJTHandler:    object providing handleIdleJobTracker(), invoked when
                      idleness crosses the limit; may be None.
    interval:         seconds between successive JobTracker polls.
    limit:            seconds of continuous idleness before the callback fires.
    hadoopDir:        hadoop install dir; <hadoopDir>/bin/hadoop must exist.
    javaHome:         JAVA_HOME to export when running the hadoop client.
    servInfoProvider: object whose getServiceAddr('mapred') yields the JT URL.

  Raises:
    Exception if the hadoop binary is missing or the hadoop version is older
    than 0.15 (the oldest version this monitor supports).
  """
  def __init__(self, log, idleJTHandler, interval, limit,
               hadoopDir, javaHome, servInfoProvider):
    self.__log = log
    self.__idlenessLimit = limit
    self.__idleJobTrackerHandler = idleJTHandler
    self.__hadoopDir = hadoopDir
    hadoopPath = os.path.join(self.__hadoopDir, "bin", "hadoop")
    # hadoop directory can be from pkgs or a temp location like tarball.
    # Verify once, at construction time.
    if not os.path.exists(hadoopPath):
      raise Exception('Invalid Hadoop path specified: %s' % hadoopPath)
    self.__javaHome = javaHome
    # Note that when this object is created, we don't yet know the JT URL.
    # The service info provider will be polled until we get the URL.
    self.__serviceInfoProvider = servInfoProvider
    # Matches the summary line of 'hadoop job -list',
    # e.g. "2 jobs currently running".
    self.__jobCountRegExp = re.compile(r"([0-9]+) jobs currently running.*")
    # Matches one job line of 'hadoop job -list all': job id, single-digit
    # state, start time, user.  (The scraped source had lost the regex
    # backslashes; restored here.)
    self.__jobStatusRegExp = re.compile(r"(\S+)\s+(\d)\s+\d+\s+\S+$")
    self.__firstIdleTime = 0
    self.__hadoop15Version = { 'major' : '0', 'minor' : '15' }
    # Assumption: we are not going to support versions older than 0.15 for
    # the idle JobTracker check.
    if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):
      raise Exception('Incompatible Hadoop Version: Cannot check status')
    self.__stopFlag = False
    self.__jtURLFinderThread = func(name='JTURLFinderThread',
                                    functionRef=self.getJobTrackerURL)
    self.__jtMonitorThread = loop(name='JTMonitorThread',
                                  functionRef=self.monitorJobTracker,
                                  sleep=interval)
    self.__jobTrackerURL = None

  def start(self):
    """Start the thread that will determine the JobTracker URL."""
    self.__jtURLFinderThread.start()

  def stop(self):
    """Signal the monitor to stop and join the monitoring thread."""
    self.__log.debug('Joining the monitoring thread.')
    self.__stopFlag = True
    if self.__jtMonitorThread.isAlive():
      self.__jtMonitorThread.join()
    self.__log.debug('Joined the monitoring thread.')

  def getJobTrackerURL(self):
    """Periodically check the service info provider for the JT URL; once a
    valid URL is known, start the monitoring loop.  Runs on the
    JTURLFinderThread."""
    self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
    while not self.__stopFlag and not self.__isValidJobTrackerURL():
      time.sleep(10)
      if not self.__stopFlag:
        self.__jobTrackerURL = \
            self.__serviceInfoProvider.getServiceAddr('mapred')
      else:
        break
    if self.__isValidJobTrackerURL():
      self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL)
      self.__jtMonitorThread.start()

  def monitorJobTracker(self):
    """Periodically called to monitor the JobTracker of the cluster."""
    try:
      if self.__isIdle():
        if self.__idleJobTrackerHandler:
          self.__log.info('Detected cluster as idle. Calling registered callback handler.')
          self.__idleJobTrackerHandler.handleIdleJobTracker()
    except Exception:
      # Best-effort monitor: log and keep the loop alive.  Narrowed from a
      # bare 'except:' so system-exiting exceptions still propagate.
      self.__log.debug('Exception while monitoring job tracker. %s'
                       % get_exception_string())

  def getJobsStatus(self):
    """Return a list of HadoopJobStatus objects for jobs run on the HOD
    allocated hadoop cluster.  Returns an empty list on any failure."""
    jobStatusList = []
    try:
      hadoop16Version = { 'major' : '0', 'minor' : '16' }
      # 'job -list all' is only usable from hadoop 0.16 onwards.
      if self.__isCompatibleHadoopVersion(hadoop16Version):
        jtStatusCommand = self.__initStatusCommand(option='-list all')
        jtStatusCommand.start()
        jtStatusCommand.wait()
        jtStatusCommand.join()
        if jtStatusCommand.exit_code() == 0:
          for line in jtStatusCommand.output():
            jobStatus = self.__extractJobStatus(line)
            if jobStatus is not None:
              jobStatusList.append(jobStatus)
    except Exception:
      self.__log.debug('Exception while getting job statuses. %s'
                       % get_exception_string())
    return jobStatusList

  def __isValidJobTrackerURL(self):
    """Return True unless the URL is unset or one of the special strings
    returned by the getServiceAddr API ('not found' / 'Error...')."""
    return ((self.__jobTrackerURL is not None)
            and (self.__jobTrackerURL != 'not found')
            and (not self.__jobTrackerURL.startswith('Error')))

  def __extractJobStatus(self, line):
    """Parse one output line of the job status command; return a
    HadoopJobStatus on a match, else None."""
    jobStatus = None
    line = line.strip()
    jsMatch = self.__jobStatusRegExp.match(line)
    if jsMatch:
      jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))
    return jobStatus

  def __isIdle(self):
    """Check if the JobTracker has been idle beyond the configured limit.
    A failed job-count fetch counts as idle for this poll."""
    jobCount = 0
    err = False
    try:
      jobCount = self.__getJobCount()
    except HadoopClientException as hce:
      # Rejoined the string literal that was broken across two lines in the
      # scraped source; also modernized 'except X, e' to 'except X as e'.
      self.__log.debug('HadoopClientException handled in getting job count. '
                       'Error code: %s' % hce.errorCode)
      err = True

    if (jobCount == 0) or err:
      if self.__firstIdleTime == 0:
        # detecting idleness for the first time
        self.__firstIdleTime = time.time()
      else:
        if (time.time() - self.__firstIdleTime) >= self.__idlenessLimit:
          self.__log.info('Idleness limit crossed for cluster')
          return True
    else:
      # some job is running: reset idleness time
      self.__firstIdleTime = 0

    return False

  def __getJobCount(self):
    """Run 'hadoop job -list' and parse the number of running jobs.

    Returns -1 if the count could not be parsed.  Raises
    HadoopClientException when the client exits with code 1."""
    # We assume here that the poll interval is small enough to detect running
    # jobs.  If jobs start and stop within the poll interval, the cluster
    # would be incorrectly treated as idle.  Hadoop 2266 will provide a
    # better mechanism than this.
    jobs = -1
    jtStatusCommand = self.__initStatusCommand()
    jtStatusCommand.start()
    jtStatusCommand.wait()
    jtStatusCommand.join()
    if jtStatusCommand.exit_code() == 0:
      for line in jtStatusCommand.output():
        match = self.__jobCountRegExp.match(line)
        if match:
          jobs = int(match.group(1))
    elif jtStatusCommand.exit_code() == 1:
      # for now, exit code 1 comes for any exception raised by JobClient.
      # If hadoop gets to differentiate and give more granular exit codes,
      # we can check for those errors corresponding to network errors etc.
      raise HadoopClientException(jtStatusCommand.exit_code())
    return jobs

  def __isCompatibleHadoopVersion(self, expectedVersion):
    """Return True if the hadoop version in use is at least expectedVersion.
    Can be used for checking if a particular feature is available.

    expectedVersion: dict with string 'major' and 'minor' entries."""
    ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)
    if (ver['major'] is None) or (ver['minor'] is None):
      return False
    # Compare (major, minor) as a tuple so that e.g. 1.0 is correctly treated
    # as newer than 0.16.  The original compared major and minor
    # independently, which wrongly rejected a larger major with a smaller
    # minor.
    return ((int(ver['major']), int(ver['minor']))
            >= (int(expectedVersion['major']), int(expectedVersion['minor'])))

  def __initStatusCommand(self, option="-list"):
    """Build (but do not start) the command that runs
    'hadoop job -jt <JT URL> <option>' with JAVA_HOME set."""
    hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
    cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)
    cmdStr = "%s %s" % (cmdStr, option)
    self.__log.debug('cmd str %s' % cmdStr)
    # Copy the environment so that setting JAVA_HOME for the child does not
    # clobber os.environ for the whole process (the original mutated the
    # global mapping).  Assumes simpleCommand accepts a plain dict env —
    # TODO confirm against hodlib.Common.threads.
    env = os.environ.copy()
    env['JAVA_HOME'] = self.__javaHome
    return simpleCommand('HadoopStatus', cmdStr, env)
-