idleJobTracker.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. import os, re, time
  15. from hodlib.Common.threads import loop, func
  16. from hodlib.Common.threads import simpleCommand
  17. from hodlib.Common.util import get_exception_string, hadoopVersion
  18. class HadoopJobStatus:
  19.   """This class represents the status of a single Hadoop job"""
  20.   
  21.   def __init__(self, jobId, status):
  22.     self.__jobId = jobId
  23.     self.__status = status
  24.   def getJobId(self):
  25.     return self.__jobId
  26.   def getStatus(self):
  27.     return self.__status
  28. class HadoopClientException(Exception):
  29.   """This class represents an exception that is raised when we fail in
  30.      running the job client."""
  31.   
  32.   def __init__(self, errorCode):
  33.     self.errorCode = errorCode
  34.   
  35. class JobTrackerMonitor:
  36.   """This class monitors the JobTracker of an allocated cluster
  37.      periodically to detect whether it is idle. If it is found
  38.      to be idle for more than a configured limit, it calls back
  39.      registered handlers who can act upon the idle cluster."""
  40.   def __init__(self, log, idleJTHandler, interval, limit,
  41.                       hadoopDir, javaHome, servInfoProvider):
  42.     self.__log = log
  43.     self.__idlenessLimit = limit
  44.     self.__idleJobTrackerHandler = idleJTHandler
  45.     self.__hadoopDir = hadoopDir
  46.     hadoopPath = os.path.join(self.__hadoopDir, "bin", "hadoop")
  47.     #hadoop directory can be from pkgs or a temp location like tarball. Verify once.
  48.     if not os.path.exists(hadoopPath):
  49.       raise Exception('Invalid Hadoop path specified: %s' % hadoopPath)
  50.     self.__javaHome = javaHome
  51.     # Note that when this object is created, we don't yet know the JT URL.
  52.     # The service info provider will be polled until we get the URL.
  53.     self.__serviceInfoProvider = servInfoProvider
  54.     self.__jobCountRegExp = re.compile("([0-9]+) jobs currently running.*")
  55.     self.__jobStatusRegExp = re.compile("(S+)s+(d)s+d+s+S+$")
  56.     self.__firstIdleTime = 0
  57.     self.__hadoop15Version = { 'major' : '0', 'minor' : '15' }
  58.     #Assumption: we are not going to support versions older than 0.15 for Idle Job tracker.
  59.     if not self.__isCompatibleHadoopVersion(self.__hadoop15Version):
  60.       raise Exception('Incompatible Hadoop Version: Cannot check status')
  61.     self.__stopFlag = False
  62.     self.__jtURLFinderThread = func(name='JTURLFinderThread', functionRef=self.getJobTrackerURL)
  63.     self.__jtMonitorThread = loop(name='JTMonitorThread', functionRef=self.monitorJobTracker,
  64.                                   sleep=interval)
  65.     self.__jobTrackerURL = None
  66.   def start(self):
  67.     """This method starts a thread that will determine the JobTracker URL"""
  68.     self.__jtURLFinderThread.start()
  69.   def stop(self):
  70.     self.__log.debug('Joining the monitoring thread.')
  71.     self.__stopFlag = True
  72.     if self.__jtMonitorThread.isAlive():
  73.       self.__jtMonitorThread.join()
  74.     self.__log.debug('Joined the monitoring thread.')
  75.   def getJobTrackerURL(self):
  76.     """This method periodically checks the service info provider for the JT URL"""
  77.     self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
  78.     while not self.__stopFlag and not self.__isValidJobTrackerURL():
  79.       time.sleep(10)
  80.       if not self.__stopFlag:
  81.         self.__jobTrackerURL = self.__serviceInfoProvider.getServiceAddr('mapred')
  82.       else:
  83.         break
  84.     if self.__isValidJobTrackerURL():
  85.       self.__log.debug('Got URL %s. Starting monitoring' % self.__jobTrackerURL)
  86.       self.__jtMonitorThread.start()
  87.   def monitorJobTracker(self):
  88.     """This method is periodically called to monitor the JobTracker of the cluster."""
  89.     try:
  90.       if self.__isIdle():
  91.         if self.__idleJobTrackerHandler:
  92.           self.__log.info('Detected cluster as idle. Calling registered callback handler.')
  93.           self.__idleJobTrackerHandler.handleIdleJobTracker()
  94.     except:
  95.       self.__log.debug('Exception while monitoring job tracker. %s' % get_exception_string())
  96.   def getJobsStatus(self):
  97.     """This method should return the status of all jobs that are run on the HOD allocated
  98.        hadoop cluster"""
  99.     jobStatusList = []
  100.     try:
  101.       hadoop16Version = { 'major' : '0', 'minor' : '16' }
  102.       if self.__isCompatibleHadoopVersion(hadoop16Version):
  103.         jtStatusCommand = self.__initStatusCommand(option='-list all')
  104.         jtStatusCommand.start()
  105.         jtStatusCommand.wait()
  106.         jtStatusCommand.join()
  107.         if jtStatusCommand.exit_code() == 0:
  108.           for line in jtStatusCommand.output():
  109.             jobStatus = self.__extractJobStatus(line)
  110.             if jobStatus is not None:
  111.               jobStatusList.append(jobStatus)
  112.     except:
  113.       self.__log.debug('Exception while getting job statuses. %s' % get_exception_string())
  114.     return jobStatusList
  115.   def __isValidJobTrackerURL(self):
  116.     """This method checks that the passed in URL is not one of the special case strings
  117.        returned by the getServiceAddr API"""
  118.     return ((self.__jobTrackerURL != None) and (self.__jobTrackerURL != 'not found') 
  119.               and (not self.__jobTrackerURL.startswith('Error')))
  120.   def __extractJobStatus(self, line):
  121.     """This method parses an output line from the job status command and creates
  122.        the JobStatus object if there is a match"""
  123.     jobStatus = None
  124.     line = line.strip()
  125.     jsMatch = self.__jobStatusRegExp.match(line)
  126.     if jsMatch:
  127.       jobStatus = HadoopJobStatus(jsMatch.group(1), int(jsMatch.group(2)))
  128.     return jobStatus
  129.   def __isIdle(self):
  130.     """This method checks if the JobTracker is idle beyond a certain limit."""
  131.     jobCount = 0
  132.     err = False
  133.     try:
  134.       jobCount = self.__getJobCount()
  135.     except HadoopClientException, hce:
  136.       self.__log.debug('HadoopClientException handled in getting job count. 
  137.                                       Error code: %s' % hce.errorCode)
  138.       err = True
  139.     if (jobCount==0) or err:
  140.       if self.__firstIdleTime == 0:
  141.         #detecting idleness for the first time
  142.         self.__firstIdleTime = time.time()
  143.       else:
  144.         if ((time.time()-self.__firstIdleTime) >= self.__idlenessLimit):
  145.           self.__log.info('Idleness limit crossed for cluster')
  146.           return True
  147.     else:
  148.       # reset idleness time
  149.       self.__firstIdleTime = 0
  150.       
  151.     return False
  152.   def __getJobCount(self):
  153.     """This method executes the hadoop job -list command and parses the output to detect
  154.        the number of running jobs."""
  155.     # We assume here that the poll interval is small enough to detect running jobs. 
  156.     # If jobs start and stop within the poll interval, the cluster would be incorrectly 
  157.     # treated as idle. Hadoop 2266 will provide a better mechanism than this.
  158.     jobs = -1
  159.     jtStatusCommand = self.__initStatusCommand()
  160.     jtStatusCommand.start()
  161.     jtStatusCommand.wait()
  162.     jtStatusCommand.join()
  163.     if jtStatusCommand.exit_code() == 0:
  164.       for line in jtStatusCommand.output():
  165.         match = self.__jobCountRegExp.match(line)
  166.         if match:
  167.           jobs = int(match.group(1))
  168.     elif jtStatusCommand.exit_code() == 1:
  169.       # for now, exit code 1 comes for any exception raised by JobClient. If hadoop gets
  170.       # to differentiate and give more granular exit codes, we can check for those errors
  171.       # corresponding to network errors etc.
  172.       raise HadoopClientException(jtStatusCommand.exit_code())
  173.     return jobs
  174.   def __isCompatibleHadoopVersion(self, expectedVersion):
  175.     """This method determines whether the version of hadoop being used is one that 
  176.        is higher than the expectedVersion.
  177.        This can be used for checking if a particular feature is available or not"""
  178.     ver = hadoopVersion(self.__hadoopDir, self.__javaHome, self.__log)
  179.     ret = False
  180.   
  181.     if (ver['major']!=None) and (int(ver['major']) >= int(expectedVersion['major'])) 
  182.       and (ver['minor']!=None) and (int(ver['minor']) >= int(expectedVersion['minor'])):
  183.       ret = True
  184.     return ret
  185.   def __initStatusCommand(self, option="-list"):
  186.     """This method initializes the command to run to check the JT status"""
  187.     cmd = None
  188.     hadoopPath = os.path.join(self.__hadoopDir, 'bin', 'hadoop')
  189.     cmdStr = "%s job -jt %s" % (hadoopPath, self.__jobTrackerURL)
  190.     cmdStr = "%s %s" % (cmdStr, option)
  191.     self.__log.debug('cmd str %s' % cmdStr)
  192.     env = os.environ
  193.     env['JAVA_HOME'] = self.__javaHome
  194.     cmd = simpleCommand('HadoopStatus', cmdStr, env)
  195.     return cmd
  196.