hadoop.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:27k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. """define WorkLoad as abstract interface for user job"""
  15. # -*- python -*-
  16. import os, time, sys, shutil, exceptions, re, threading, signal, urllib, pprint, math
  17. from HTMLParser import HTMLParser
  18. import xml.dom.minidom
  19. import xml.dom.pulldom
  20. from xml.dom import getDOMImplementation
  21. from hodlib.Common.util import *
  22. from hodlib.Common.xmlrpc import hodXRClient
  23. from hodlib.Common.miniHTMLParser import miniHTMLParser
  24. from hodlib.Common.nodepoolutil import NodePoolUtil
  25. from hodlib.Common.tcp import tcpError, tcpSocket
  26. reCommandDelimeterString = r"(?<!\);"
  27. reCommandDelimeter = re.compile(reCommandDelimeterString)
  28. class hadoopConfig:
  29.   def __create_xml_element(self, doc, name, value, description, final = False):
  30.     prop = doc.createElement("property")
  31.     nameP = doc.createElement("name")
  32.     string = doc.createTextNode(name)
  33.     nameP.appendChild(string)
  34.     valueP = doc.createElement("value")
  35.     string = doc.createTextNode(value)
  36.     valueP.appendChild(string)
  37.     if final:
  38.       finalP = doc.createElement("final")
  39.       string = doc.createTextNode("true")
  40.       finalP.appendChild(string)
  41.     desc = doc.createElement("description")
  42.     string = doc.createTextNode(description)
  43.     desc.appendChild(string)
  44.     prop.appendChild(nameP)
  45.     prop.appendChild(valueP)
  46.     if final:
  47.       prop.appendChild(finalP)
  48.     prop.appendChild(desc)
  49.     
  50.     return prop
  51.   def gen_site_conf(self, confDir, tempDir, numNodes, hdfsAddr, mrSysDir,
  52.              mapredAddr=None, clientParams=None, serverParams=None,
  53.              finalServerParams=None, clusterFactor=None):
  54.     if not mapredAddr:
  55.       mapredAddr = "dummy:8181"
  56.     
  57.     implementation = getDOMImplementation()
  58.     doc = implementation.createDocument('', 'configuration', None)
  59.     comment = doc.createComment(
  60.       "This is an auto generated hadoop-site.xml, do not modify")
  61.     topElement = doc.documentElement
  62.     topElement.appendChild(comment)
  63.     description = {}
  64.     paramsDict = {  'mapred.job.tracker'    : mapredAddr , 
  65.                     'fs.default.name'       : "hdfs://" + hdfsAddr, 
  66.                     'hadoop.tmp.dir'        : tempDir, 
  67.                  }
  68.     paramsDict['mapred.system.dir'] = mrSysDir
  69.     
  70.     # mapred-default.xml is no longer used now.
  71.     numred = int(math.floor(clusterFactor * (int(numNodes) - 1)))
  72.     paramsDict['mapred.reduce.tasks'] = str(numred)
  73.     # end
  74.     # for all the above vars generated, set the description
  75.     for k, v in paramsDict.iteritems():
  76.       description[k] = 'Hod generated parameter'
  77.     # finalservelParams
  78.     if finalServerParams:
  79.       for k, v in finalServerParams.iteritems():
  80.         if not description.has_key(k):
  81.           description[k] = "final server parameter"
  82.           paramsDict[k] = v
  83.     # servelParams
  84.     if serverParams:
  85.       for k, v in serverParams.iteritems():
  86.         if not description.has_key(k):
  87.           # if no final value for same param is mentioned
  88.           description[k] = "server parameter"
  89.           paramsDict[k] = v
  90.     # clientParams
  91.     if clientParams:
  92.       for k, v in clientParams.iteritems():
  93.         if not description.has_key(k) or description[k] == "server parameter":
  94.           # Just add, if no final value for same param is mentioned.
  95.           # Replace even if server param is mentioned for same config variable
  96.           description[k] = "client-side parameter"
  97.           paramsDict[k] = v
  98.     
  99.     # generate the xml elements
  100.     for k,v in paramsDict.iteritems():
  101.       if ( description[k] == "final server parameter" or 
  102.                              description[k] == "Hod generated parameter" ): 
  103.          final = True
  104.       else: final = False
  105.       prop = self.__create_xml_element(doc, k, v, description[k], final)
  106.       topElement.appendChild(prop)
  107.     siteName = os.path.join(confDir, "hadoop-site.xml")
  108.     sitefile = file(siteName, 'w')
  109.     print >> sitefile, topElement.toxml()
  110.     sitefile.close()
  111. class hadoopCluster:
  112.   def __init__(self, cfg, log):
  113.     self.__cfg = cfg
  114.     self.__log = log
  115.     self.__changedClusterParams = []
  116.     
  117.     self.__hostname = local_fqdn()    
  118.     self.__svcrgyClient = None
  119.     self.__nodePool = NodePoolUtil.getNodePool(self.__cfg['nodepooldesc'], 
  120.                                                self.__cfg, self.__log)        
  121.     self.__hadoopCfg = hadoopConfig()
  122.     self.jobId = None
  123.     self.mapredInfo = None
  124.     self.hdfsInfo = None
  125.     self.ringmasterXRS = None
  126.   def __get_svcrgy_client(self):
  127.     svcrgyUrl = to_http_url(self.__cfg['hod']['xrs-address'])
  128.     return hodXRClient(svcrgyUrl)
  129.   def __get_service_status(self):
  130.     serviceData = self.__get_service_data()
  131.     
  132.     status = True
  133.     hdfs = False
  134.     mapred = False
  135.     
  136.     for host in serviceData.keys():
  137.       for item in serviceData[host]:
  138.         service = item.keys()
  139.         if service[0] == 'hdfs.grid' and 
  140.           self.__cfg['gridservice-hdfs']['external'] == False:
  141.           hdfs = True
  142.         elif service[0] == 'mapred.grid':
  143.           mapred = True
  144.     
  145.     if not mapred:
  146.       status = "mapred"
  147.     
  148.     if not hdfs and self.__cfg['gridservice-hdfs']['external'] == False:
  149.       if status != True:
  150.         status = "mapred and hdfs"
  151.       else:
  152.         status = "hdfs"
  153.       
  154.     return status
  155.   
  156.   def __get_service_data(self):
  157.     registry = to_http_url(self.__cfg['hod']['xrs-address'])
  158.     serviceData = self.__svcrgyClient.getServiceInfo(
  159.       self.__cfg['hod']['userid'], self.__setup.np.getNodePoolId())
  160.     
  161.     return serviceData
  162.   
  163.   def __check_job_status(self):
  164.     failureCount = 0
  165.     status = False
  166.     state = 'Q'
  167.     userLimitsFirstFlag = True
  168.     while (state=='Q') or (state==False):
  169.       if hodInterrupt.isSet():
  170.         raise HodInterruptException()
  171.       jobInfo = self.__nodePool.getJobInfo()
  172.       state = jobInfo['job_state']
  173.       self.__log.debug('job state %s' % state)
  174.       if state == False:
  175.         failureCount += 1
  176.         if (failureCount >= self.__cfg['hod']['job-status-query-failure-retries']):
  177.           self.__log.debug('Number of retries reached max limit while querying job status')
  178.           break
  179.         time.sleep(self.__cfg['hod']['job-command-failure-interval'])
  180.       elif state!='Q':
  181.         break
  182.       else:
  183.         self.__log.debug('querying for job status after job-status-query-interval')
  184.         time.sleep(self.__cfg['hod']['job-status-query-interval'])
  185.       if self.__cfg['hod'].has_key('job-feasibility-attr') and 
  186.                       self.__cfg['hod']['job-feasibility-attr']:
  187.         (status, msg) = self.__isJobFeasible()
  188.         if status == "Never":
  189.           self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + 
  190.                 "This cluster cannot be allocated now.")
  191.           return -1
  192.         elif status == False:
  193.           if userLimitsFirstFlag:
  194.             self.__log.critical(TORQUE_USER_LIMITS_EXCEEDED_MSG + msg + 
  195.                 "This cluster allocation will succeed only after other " + 
  196.                 "clusters are deallocated.")
  197.             userLimitsFirstFlag = False
  198.    
  199.     if state and state != 'C':
  200.       status = True
  201.     
  202.     return status
  203.   def __isJobFeasible(self):
  204.     return self.__nodePool.isJobFeasible()
  205.   
  206.   def __get_ringmaster_client(self):
  207.     ringmasterXRS = None
  208.    
  209.     ringList = self.__svcrgyClient.getServiceInfo(
  210.       self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(), 
  211.       'ringmaster', 'hod')
  212.     if ringList and len(ringList):
  213.       if isinstance(ringList, list):
  214.         ringmasterXRS = ringList[0]['xrs']
  215.     else:    
  216.       count = 0
  217.       waitTime = self.__cfg['hod']['allocate-wait-time']
  218.   
  219.       while count < waitTime:
  220.         if hodInterrupt.isSet():
  221.           raise HodInterruptException()
  222.         ringList = self.__svcrgyClient.getServiceInfo(
  223.           self.__cfg['ringmaster']['userid'], self.__nodePool.getServiceId(), 
  224.           'ringmaster', 
  225.           'hod')
  226.         
  227.         if ringList and len(ringList):
  228.           if isinstance(ringList, list):        
  229.             ringmasterXRS = ringList[0]['xrs']
  230.         
  231.         if ringmasterXRS is not None:
  232.           break
  233.         else:
  234.           time.sleep(1)
  235.           count = count + 1
  236.           # check to see if the job exited by any chance in that time:
  237.           if (count % self.__cfg['hod']['job-status-query-interval'] == 0):
  238.             if not self.__check_job_status():
  239.               break
  240.     return ringmasterXRS
  241.  
  242.   def __init_hadoop_service(self, serviceName, xmlrpcClient):
  243.     status = True
  244.     serviceAddress = None
  245.     serviceInfo = None
  246.  
  247.     for i in range(0, 250): 
  248.       try:
  249.         if hodInterrupt.isSet():
  250.             raise HodInterruptException()
  251.         serviceAddress = xmlrpcClient.getServiceAddr(serviceName)
  252.         if serviceAddress:
  253.           if serviceAddress == 'not found':
  254.             time.sleep(1)
  255.           # check to see if the job exited by any chance in that time:
  256.             if ((i+1) % self.__cfg['hod']['job-status-query-interval'] == 0):
  257.               if not self.__check_job_status():
  258.                 break
  259.           else:
  260.             serviceInfo = xmlrpcClient.getURLs(serviceName)           
  261.             break 
  262.       except HodInterruptException,h :
  263.         raise h
  264.       except:
  265.         self.__log.critical("'%s': ringmaster xmlrpc error." % serviceName)
  266.         self.__log.debug(get_exception_string())
  267.         status = False
  268.         break
  269.     
  270.     if serviceAddress == 'not found' or not serviceAddress:
  271.       self.__log.critical("Failed to retrieve '%s' service address." % 
  272.                           serviceName)
  273.       status = False
  274.     elif serviceAddress.startswith("Error: "):
  275.       errs = serviceAddress[len("Error: "):]
  276.       self.__log.critical("Cluster could not be allocated because of the following errors.n%s" % 
  277.                              errs)
  278.       status = False
  279.     else:
  280.       try:
  281.         self.__svcrgyClient.registerService(self.__cfg['hodring']['userid'], 
  282.                                             self.jobId, self.__hostname, 
  283.                                             serviceName, 'grid', serviceInfo)
  284.         
  285.       except HodInterruptException, h:
  286.         raise h
  287.       except:
  288.         self.__log.critical("'%s': registry xmlrpc error." % serviceName)    
  289.         self.__log.debug(get_exception_string())
  290.         status = False
  291.         
  292.     return status, serviceAddress, serviceInfo
  293.   def __collect_jobtracker_ui(self, dir):
  294.      link = self.mapredInfo + "/jobtracker.jsp"
  295.      parser = miniHTMLParser()
  296.      parser.setBaseUrl(self.mapredInfo)
  297.      node_cache = {}
  298.      self.__log.debug("collect_jobtracker_ui seeded with " + link)
  299.      def alarm_handler(number, stack):
  300.          raise AlarmException("timeout")
  301.        
  302.      signal.signal(signal.SIGALRM, alarm_handler)
  303.      input = None
  304.      while link:
  305.        self.__log.debug("link: %s" % link)
  306.        # taskstats.jsp,taskdetails.jsp not included since too many to collect
  307.        if re.search(
  308.          "jobfailures.jsp|jobtracker.jsp|jobdetails.jsp|jobtasks.jsp", 
  309.          link):
  310.          for i in range(1,5):
  311.            if hodInterrupt.isSet():
  312.              raise HodInterruptException()
  313.            try:
  314.              input = urllib.urlopen(link)
  315.              break
  316.            except:
  317.              self.__log.debug(get_exception_string())
  318.              time.sleep(1)
  319.   
  320.          if input:
  321.            out = None
  322.     
  323.            self.__log.debug("collecting " + link + "...")
  324.            filename = re.sub(self.mapredInfo, "", link)
  325.            filename = dir + "/"  + filename
  326.            filename = re.sub("http://","", filename)
  327.            filename = re.sub("[?&=:]","_",filename)
  328.            filename = filename + ".html"
  329.     
  330.            try:
  331.              tempdir, tail = os.path.split(filename)
  332.              if not os.path.exists(tempdir):
  333.                os.makedirs(tempdir)
  334.            except:
  335.              self.__log.debug(get_exception_string())
  336.     
  337.            out = open(filename, 'w')
  338.            
  339.            bufSz = 8192
  340.            
  341.            signal.alarm(10)
  342.            
  343.            try:
  344.              self.__log.debug("Starting to grab: %s" % link)
  345.              buf = input.read(bufSz)
  346.       
  347.              while len(buf) > 0:
  348.                # Feed the file into the HTML parser
  349.                parser.feed(buf)
  350.         
  351.          # Re-write the hrefs in the file
  352.                p = re.compile("?(.+?)=(.+?)")
  353.                buf = p.sub(r"_1_2",buf)
  354.                p= re.compile("&(.+?)=(.+?)")
  355.                buf = p.sub(r"_1_2",buf)
  356.                p = re.compile("http://(.+?):(d+)?")
  357.                buf = p.sub(r"1_2/",buf)
  358.                buf = re.sub("href="/","href="",buf)
  359.                p = re.compile("href="(.+?)"")
  360.                buf = p.sub(r"href=1.html",buf)
  361.  
  362.                out.write(buf)
  363.                buf = input.read(bufSz)
  364.       
  365.              signal.alarm(0)
  366.              input.close()
  367.              if out:
  368.                out.close()
  369.                
  370.              self.__log.debug("Finished grabbing: %s" % link)
  371.            except AlarmException:
  372.              if hodInterrupt.isSet():
  373.                raise HodInterruptException()
  374.              if out: out.close()
  375.              if input: input.close()
  376.              
  377.              self.__log.debug("Failed to retrieve: %s" % link)
  378.          else:
  379.            self.__log.debug("Failed to retrieve: %s" % link)
  380.          
  381.        # Get the next link in level traversal order
  382.        link = parser.getNextLink()
  383.      parser.close()
  384.      
  385.   def check_cluster(self, clusterInfo):
  386.     status = 0
  387.     if 'mapred' in clusterInfo:
  388.       mapredAddress = clusterInfo['mapred'][7:]
  389.       hdfsAddress = clusterInfo['hdfs'][7:]
  390.       status = get_cluster_status(hdfsAddress, mapredAddress)
  391.       if status == 0:
  392.         status = 12
  393.     else:
  394.       status = 15
  395.     return status
  396.   def is_cluster_deallocated(self, jobId):
  397.     """Returns True if the JobId that represents this cluster
  398.        is in the Completed or exiting state."""
  399.     jobInfo = self.__nodePool.getJobInfo(jobId)
  400.     state = None
  401.     if jobInfo is not None and jobInfo.has_key('job_state'):
  402.       state = jobInfo['job_state']
  403.     return ((state == 'C') or (state == 'E'))
  404.   def cleanup(self):
  405.     if self.__nodePool: self.__nodePool.finalize()     
  406.   def get_job_id(self):
  407.     return self.jobId
  408.   def delete_job(self, jobId):
  409.     '''Delete a job given it's ID'''
  410.     ret = 0
  411.     if self.__nodePool: 
  412.       ret = self.__nodePool.deleteJob(jobId)
  413.     else:
  414.       raise Exception("Invalid state: Node pool is not initialized to delete the given job.")
  415.     return ret
  416.          
  417.   def is_valid_account(self):
  418.     """Verify if the account being used to submit the job is a valid account.
  419.        This code looks for a file <install-dir>/bin/verify-account. 
  420.        If the file is present, it executes the file, passing as argument 
  421.        the account name. It returns the exit code and output from the 
  422.        script on non-zero exit code."""
  423.     accountValidationScript = os.path.abspath('./verify-account')
  424.     if not os.path.exists(accountValidationScript):
  425.       return (0, None)
  426.     account = self.__nodePool.getAccountString()
  427.     exitCode = 0
  428.     errMsg = None
  429.     try:
  430.       accountValidationCmd = simpleCommand('Account Validation Command',
  431.                                              '%s %s' % (accountValidationScript,
  432.                                                         account))
  433.       accountValidationCmd.start()
  434.       accountValidationCmd.wait()
  435.       accountValidationCmd.join()
  436.       exitCode = accountValidationCmd.exit_code()
  437.       self.__log.debug('account validation script is run %d' 
  438.                           % exitCode)
  439.       errMsg = None
  440.       if exitCode is not 0:
  441.         errMsg = accountValidationCmd.output()
  442.     except Exception, e:
  443.       exitCode = 0
  444.       self.__log.warn('Error executing account script: %s ' 
  445.                          'Accounting is disabled.' 
  446.                           % get_exception_error_string())
  447.       self.__log.debug(get_exception_string())
  448.     return (exitCode, errMsg)
  449.     
  450.   def allocate(self, clusterDir, min, max=None):
  451.     status = 0
  452.     failureCount = 0
  453.     self.__svcrgyClient = self.__get_svcrgy_client()
  454.         
  455.     self.__log.debug("allocate %s %s %s" % (clusterDir, min, max))
  456.     
  457.     if min < 3:
  458.       self.__log.critical("Minimum nodes must be greater than 2.")
  459.       status = 2
  460.     else:
  461.       nodeSet = self.__nodePool.newNodeSet(min)
  462.       walltime = None
  463.       if self.__cfg['hod'].has_key('walltime'):
  464.         walltime = self.__cfg['hod']['walltime']
  465.       self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
  466.       # if the job submission returned an error other than no resources
  467.       # retry a couple of times
  468.       while (self.jobId is False) and (exitCode != 188):
  469.         if hodInterrupt.isSet():
  470.           raise HodInterruptException()
  471.         failureCount += 1
  472.         if (failureCount >= self.__cfg['hod']['job-status-query-failure-retries']):
  473.           self.__log.debug("failed submitting job more than the retries. exiting")
  474.           break
  475.         else:
  476.           # wait a bit before retrying
  477.           time.sleep(self.__cfg['hod']['job-command-failure-interval'])
  478.           if hodInterrupt.isSet():
  479.             raise HodInterruptException()
  480.           self.jobId, exitCode = self.__nodePool.submitNodeSet(nodeSet, walltime)
  481.       if self.jobId:
  482.         jobStatus = None
  483.         try:
  484.           jobStatus = self.__check_job_status()
  485.         except HodInterruptException, h:
  486.           self.__log.info(HOD_INTERRUPTED_MESG)
  487.           self.delete_job(self.jobId)
  488.           self.__log.info("Cluster %s removed from queue." % self.jobId)
  489.           raise h
  490.         else:
  491.           if jobStatus == -1:
  492.             self.delete_job(self.jobId);
  493.             status = 4
  494.             return status
  495.         if jobStatus:
  496.           self.__log.info("Cluster Id %s" 
  497.                                                               % self.jobId)
  498.           try:
  499.             self.ringmasterXRS = self.__get_ringmaster_client()
  500.             
  501.             self.__log.debug("Ringmaster at : %s" % self.ringmasterXRS )
  502.             ringClient = None
  503.             if self.ringmasterXRS:
  504.               ringClient =  hodXRClient(self.ringmasterXRS)
  505.                 
  506.               hdfsStatus, hdfsAddr, self.hdfsInfo = 
  507.                 self.__init_hadoop_service('hdfs', ringClient)
  508.                 
  509.               if hdfsStatus:
  510.                 self.__log.info("HDFS UI at http://%s" % self.hdfsInfo)
  511.   
  512.                 mapredStatus, mapredAddr, self.mapredInfo = 
  513.                   self.__init_hadoop_service('mapred', ringClient)
  514.   
  515.                 if mapredStatus:
  516.                   self.__log.info("Mapred UI at http://%s" % self.mapredInfo)
  517.   
  518.                   if self.__cfg['hod'].has_key('update-worker-info') 
  519.                     and self.__cfg['hod']['update-worker-info']:
  520.                     workerInfoMap = {}
  521.                     workerInfoMap['HDFS UI'] = 'http://%s' % self.hdfsInfo
  522.                     workerInfoMap['Mapred UI'] = 'http://%s' % self.mapredInfo
  523.                     # Ringmaster URL sample format : http://hostname:port/
  524.                     workerInfoMap['RM RPC Port'] = '%s' % self.ringmasterXRS.split(":")[2].strip("/")
  525.                     if mapredAddr.find(':') != -1:
  526.                       workerInfoMap['Mapred RPC Port'] = mapredAddr.split(':')[1]
  527.                     ret = self.__nodePool.updateWorkerInfo(workerInfoMap, self.jobId)
  528.                     if ret != 0:
  529.                       self.__log.warn('Could not update HDFS and Mapred information.' 
  530.                                       'User Portal may not show relevant information.' 
  531.                                       'Error code=%s' % ret)
  532.   
  533.                   self.__cfg.replace_escape_seqs()
  534.                     
  535.                   # Go generate the client side hadoop-site.xml now
  536.                   # adding final-params as well, just so that conf on 
  537.                   # client-side and server-side are (almost) the same
  538.                   clientParams = None
  539.                   serverParams = {}
  540.                   finalServerParams = {}
  541.   
  542.                   # client-params
  543.                   if self.__cfg['hod'].has_key('client-params'):
  544.                     clientParams = self.__cfg['hod']['client-params']
  545.   
  546.                   # server-params
  547.                   if self.__cfg['gridservice-mapred'].has_key('server-params'):
  548.                     serverParams.update(
  549.                       self.__cfg['gridservice-mapred']['server-params'])
  550.                   if self.__cfg['gridservice-hdfs'].has_key('server-params'):
  551.                     # note that if there are params in both mapred and hdfs
  552.                     # sections, the ones in hdfs overwirte the ones in mapred
  553.                     serverParams.update(
  554.                         self.__cfg['gridservice-hdfs']['server-params'])
  555.                     
  556.                   # final-server-params
  557.                   if self.__cfg['gridservice-mapred'].has_key(
  558.                                                     'final-server-params'):
  559.                     finalServerParams.update(
  560.                       self.__cfg['gridservice-mapred']['final-server-params'])
  561.                   if self.__cfg['gridservice-hdfs'].has_key(
  562.                                                     'final-server-params'):
  563.                     finalServerParams.update(
  564.                         self.__cfg['gridservice-hdfs']['final-server-params'])
  565.   
  566.                   clusterFactor = self.__cfg['hod']['cluster-factor']
  567.                   tempDir = self.__cfg['hod']['temp-dir']
  568.                   if not os.path.exists(tempDir):
  569.                     os.makedirs(tempDir)
  570.                   tempDir = os.path.join( tempDir, self.__cfg['hod']['userid']
  571.                                   + "." + self.jobId )
  572.                   mrSysDir = getMapredSystemDirectory(self.__cfg['hodring']['mapred-system-dir-root'],
  573.                                       self.__cfg['hod']['userid'], self.jobId)
  574.                   self.__hadoopCfg.gen_site_conf(clusterDir, tempDir, min,
  575.                             hdfsAddr, mrSysDir, mapredAddr, clientParams,
  576.                             serverParams, finalServerParams,
  577.                             clusterFactor)
  578.                   self.__log.info("hadoop-site.xml at %s" % clusterDir)
  579.                   # end of hadoop-site.xml generation
  580.                 else:
  581.                   status = 8
  582.               else:
  583.                 status = 7  
  584.             else:
  585.               status = 6
  586.             if status != 0:
  587.               self.__log.debug("Cleaning up cluster id %s, as cluster could not be allocated." % self.jobId)
  588.               if ringClient is None:
  589.                 self.delete_job(self.jobId)
  590.               else:
  591.                 self.__log.debug("Calling rm.stop()")
  592.                 ringClient.stopRM()
  593.                 self.__log.debug("Returning from rm.stop()")
  594.           except HodInterruptException, h:
  595.             self.__log.info(HOD_INTERRUPTED_MESG)
  596.             if self.ringmasterXRS:
  597.               if ringClient is None:
  598.                 ringClient =  hodXRClient(self.ringmasterXRS)
  599.               self.__log.debug("Calling rm.stop()")
  600.               ringClient.stopRM()
  601.               self.__log.debug("Returning from rm.stop()")
  602.               self.__log.info("Cluster Shutdown by informing ringmaster.")
  603.             else:
  604.               self.delete_job(self.jobId)
  605.               self.__log.info("Cluster %s removed from queue directly." % self.jobId)
  606.             raise h
  607.         else:
  608.           self.__log.critical("No cluster found, ringmaster failed to run.")
  609.           status = 5 
  610.       elif self.jobId == False:
  611.         if exitCode == 188:
  612.           self.__log.critical("Request execeeded maximum resource allocation.")
  613.         else:
  614.           self.__log.critical("Job submission failed with exit code %s" % exitCode)
  615.         status = 4
  616.       else:    
  617.         self.__log.critical("Scheduler failure, allocation failed.nn")        
  618.         status = 4
  619.     
  620.     if status == 5 or status == 6:
  621.       ringMasterErrors = self.__svcrgyClient.getRMError()
  622.       if ringMasterErrors:
  623.         self.__log.critical("Cluster could not be allocated because" 
  624.                             " of the following errors on the "
  625.                             "ringmaster host %s.n%s" % 
  626.                                (ringMasterErrors[0], ringMasterErrors[1]))
  627.         self.__log.debug("Stack trace on ringmaster: %s" % ringMasterErrors[2])
  628.     return status
  629.   def __isRingMasterAlive(self, rmAddr):
  630.     ret = True
  631.     rmSocket = tcpSocket(rmAddr)
  632.     try:
  633.       rmSocket.open()
  634.       rmSocket.close()
  635.     except tcpError:
  636.       ret = False
  637.     return ret
  638.   def deallocate(self, clusterDir, clusterInfo):
  639.     status = 0 
  640.     
  641.     nodeSet = self.__nodePool.newNodeSet(clusterInfo['min'], 
  642.                                          id=clusterInfo['jobid'])
  643.     self.mapredInfo = clusterInfo['mapred']
  644.     self.hdfsInfo = clusterInfo['hdfs']
  645.     try:
  646.       if self.__cfg['hod'].has_key('hadoop-ui-log-dir'):
  647.         clusterStatus = self.check_cluster(clusterInfo)
  648.         if clusterStatus != 14 and clusterStatus != 10:   
  649.           # If JT is still alive
  650.           self.__collect_jobtracker_ui(self.__cfg['hod']['hadoop-ui-log-dir'])
  651.       else:
  652.         self.__log.debug('hadoop-ui-log-dir not specified. Skipping Hadoop UI log collection.')
  653.     except HodInterruptException, h:
  654.       # got an interrupt. just pass and proceed to qdel
  655.       pass 
  656.     except:
  657.       self.__log.info("Exception in collecting Job tracker logs. Ignoring.")
  658.     
  659.     rmAddr = None
  660.     if clusterInfo.has_key('ring'):
  661.       # format is http://host:port/ We need host:port
  662.       rmAddr = clusterInfo['ring'][7:]
  663.       if rmAddr.endswith('/'):
  664.         rmAddr = rmAddr[:-1]
  665.     if (rmAddr is None) or (not self.__isRingMasterAlive(rmAddr)):
  666.       # Cluster is already dead, don't try to contact ringmaster.
  667.       self.__nodePool.finalize()
  668.       status = 10 # As cluster is dead, we just set the status to 'cluster dead'.
  669.     else:
  670.       xrsAddr = clusterInfo['ring']
  671.       rmClient = hodXRClient(xrsAddr)
  672.       self.__log.debug('calling rm.stop')
  673.       rmClient.stopRM()
  674.       self.__log.debug('completed rm.stop')
  675.     # cleanup hod temp dirs
  676.     tempDir = os.path.join( self.__cfg['hod']['temp-dir'], 
  677.                     self.__cfg['hod']['userid'] + "." + clusterInfo['jobid'] )
  678.     if os.path.exists(tempDir):
  679.       shutil.rmtree(tempDir)
  680.    
  681.     return status
  682.   
  683. class hadoopScript:
  684.   def __init__(self, conf, execDir):
  685.     self.__environ = os.environ.copy()
  686.     self.__environ['HADOOP_CONF_DIR'] = conf
  687.     self.__execDir = execDir
  688.     
  689.   def run(self, script):
  690.     scriptThread = simpleCommand(script, script, self.__environ, 4, False, 
  691.                                  False, self.__execDir)
  692.     scriptThread.start()
  693.     scriptThread.wait()
  694.     scriptThread.join()
  695.     
  696.     return scriptThread.exit_code()