torque.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. """Maui/Torque implementation of NodePool"""
  15. # -*- python -*-
  16. import os, sys, csv, socket, time, re, pprint
  17. from hodlib.Hod.nodePool import *
  18. from hodlib.Schedulers.torque import torqueInterface
  19. from hodlib.Common.threads import simpleCommand
  20. from hodlib.Common.util import get_exception_string, args_to_string, local_fqdn, 
  21.                         TORQUE_USER_LIMITS_COMMENT_FIELD
  22. class TorqueNodeSet(NodeSet):
  23.   def __init__(self, id, numNodes, preferredList, isPreemptee):
  24.     NodeSet.__init__(self, id, numNodes, preferredList, isPreemptee)
  25.     self.qsubId = None
  26.     self.addrList = []
  27.   def _setQsubId(self, qsubId):
  28.     self.qsubId = qsubId
  29.   def _setAddrList(self, addrList):
  30.     self.addrList = addrList
  31.   def getAddrList(self):
  32.     return self.addrList
  33. class TorquePool(NodePool):
  34.   def __init__(self, nodePoolDesc, cfg, log):
  35.     NodePool.__init__(self, nodePoolDesc, cfg, log)
  36.     environ = os.environ.copy()
  37.     
  38.     if self._cfg['resource_manager'].has_key('pbs-server'):
  39.       environ['PBS_DEFAULT'] = self._cfg['resource_manager']['pbs-server']
  40.     self.__torque = torqueInterface(
  41.       self._cfg['resource_manager']['batch-home'], environ, self._log)
  42.   def getAccountString(self):
  43.     account = ''
  44.     if self._cfg['resource_manager'].has_key('pbs-account'):
  45.       account = self._cfg['resource_manager']['pbs-account']
  46.     return account
  47.   def __gen_submit_params(self, nodeSet, walltime = None, qosLevel = None, 
  48.                           account = None):
  49.     argList = []
  50.     stdinList = []
  51.     
  52.     npd = self.nodePoolDesc
  53.     
  54.     def gen_stdin_list():
  55.       # Here we are basically generating the standard input for qsub.
  56.       #  Specifically a script to exec ringmaster.
  57.       stdinList.append('#!/bin/sh')
  58.       
  59.       ringBin = os.path.join(self._cfg['hod']['base-dir'], 'bin', 
  60.                              'ringmaster')
  61.       ringArgs = [ringBin,]
  62.       ringArgs.extend(self._cfg.get_args(exclude=('hod')))
  63.       
  64.       ringMasterCommand = args_to_string(ringArgs)
  65.       
  66.       self._log.debug("ringmaster cmd: %s" % ringMasterCommand)
  67.       
  68.       stdinList.append(ringMasterCommand)
  69.       
  70.     def gen_arg_list():      
  71.       def process_qsub_attributes():
  72.         rawAttributes = self.nodePoolDesc.getAttrs()
  73.     
  74.         # 'W:x' is used to specify torque management extentensions ie -W x= ...
  75.         resourceManagementExtensions = ''
  76.         if 'W:x' in rawAttributes:
  77.           resourceManagementExtensions = rawAttributes['W:x']
  78.     
  79.         if qosLevel:
  80.           if len(resourceManagementExtensions) > 0:
  81.             resourceManagementExtensions += ';'
  82.           resourceManagementExtensions += 'QOS:%s' % (qosLevel)
  83.     
  84.         rawAttributes['W:x'] = resourceManagementExtensions
  85.         
  86.         hostname = local_fqdn()
  87.    
  88.         # key values are expected to have string values. 
  89.         rawAttributes['l:nodes'] = "%s" % nodeSet._getNumNodes()
  90.         
  91.         if walltime:
  92.           rawAttributes['l:walltime'] = "%s" % walltime
  93.         
  94.         #create a dict of dictionaries for 
  95.         # various arguments of torque
  96.         cmds = {}
  97.         for key in rawAttributes:
  98.           value = rawAttributes[key]
  99.     
  100.           if key.find(':') == -1:
  101.             raise ValueError, 'Syntax error: missing colon after %s in %s=%s' % (
  102.               key, key, value)
  103.     
  104.           [option, subOption] = key.split(':', 1)
  105.           if not option in cmds:
  106.             cmds[option] = {}
  107.           cmds[option][subOption] = value
  108.         
  109.         opts = []
  110.         #create a string from this
  111.         #dictionary of dictionaries createde above
  112.         for k in cmds:
  113.           csv = []
  114.           nv = cmds[k]
  115.           for n in nv:
  116.             v = nv[n]
  117.             if len(n) == 0:
  118.               csv.append(v)
  119.             else:
  120.               csv.append('%s=%s' % (n, v))
  121.           opts.append('-%s' % (k))
  122.           opts.append(','.join(csv))
  123.     
  124.         for option in cmds:
  125.           commandList = []
  126.           for subOption in cmds[option]:
  127.             value = cmds[option][subOption]
  128.             if len(subOption) == 0:
  129.                 commandList.append(value)
  130.             else:
  131.                 commandList.append("%s=%s" % (subOption, value))
  132.           opts.append('-%s' % option)
  133.           opts.append(','.join(commandList))
  134.           
  135.         return opts
  136.       
  137.       pkgdir = npd.getPkgDir()
  138.   
  139.       qsub = os.path.join(pkgdir, 'bin', 'qsub')
  140.       sdd = self._cfg['servicedesc']
  141.       
  142.       gsvc = None
  143.       for key in sdd:
  144.         gsvc = sdd[key]
  145.         break
  146.       
  147.       argList.extend(process_qsub_attributes())
  148.       argList.extend(('-N', '"' + self._cfg['hod']['title'] + '"'))
  149.       argList.extend(('-r','n'))
  150.       if 'pbs-user' in self._cfg['resource_manager']:
  151.         argList.extend(('-u', self._cfg['resource_manager']['pbs-user']))
  152.   
  153.       argList.extend(('-d','/tmp/'))
  154.       if 'queue' in self._cfg['resource_manager']:
  155.         queue = self._cfg['resource_manager']['queue']
  156.         argList.extend(('-q',queue))
  157.   
  158.       # In HOD 0.4, we pass in an account string only if it is mentioned.
  159.       # Also, we don't append userid to the account string, as HOD jobs run as the 
  160.       # user running them, not as 'HOD' user.
  161.       if self._cfg['resource_manager'].has_key('pbs-account'):
  162.         argList.extend(('-A', (self._cfg['resource_manager']['pbs-account'])))
  163.     
  164.       if 'env-vars' in self._cfg['resource_manager']:
  165.         qsub_envs = self._cfg['resource_manager']['env-vars']
  166.         argList.extend(('-v', self.__keyValToString(qsub_envs)))
  167.     gen_arg_list()
  168.     gen_stdin_list()
  169.     
  170.     return argList, stdinList
  171.     
  172.   def __keyValToString(self, keyValList):
  173.     ret = ""
  174.     for key in keyValList:
  175.       ret = "%s%s=%s," % (ret, key, keyValList[key])
  176.     return ret[:-1]
  177.   
  178.   def newNodeSet(self, numNodes, preferred=[], isPreemptee=True, id=None):
  179.     if not id:
  180.       id = self.getNextNodeSetId()
  181.     
  182.     nodeSet = TorqueNodeSet(id, numNodes, preferred, isPreemptee)
  183.     self.nodeSetDict[nodeSet.getId()] = nodeSet
  184.     
  185.     return nodeSet
  186.       
  187.   def submitNodeSet(self, nodeSet, walltime = None, qosLevel = None, 
  188.                     account = None):
  189.     argList, stdinList = self.__gen_submit_params(nodeSet, walltime, qosLevel, 
  190.                                                   account)
  191.     
  192.     jobId, exitCode = self.__torque.qsub(argList, stdinList)
  193.     
  194.     ## UNUSED CODE: LINE ##
  195.     nodeSet.qsubId = jobId
  196.     return jobId, exitCode
  197.   def freeNodeSet(self, nodeSet):
  198.     
  199.     exitCode = self.deleteJob(nodeSet.getId())
  200.     
  201.     del self.nodeSetDict[nodeSet.getId()]
  202.   
  203.     return exitCode
  204.   
  205.   def finalize(self):
  206.     status = 0
  207.     exitCode = 0
  208.     for nodeSet in self.nodeSetDict.values():
  209.       exitCode = self.freeNodeSet(nodeSet)
  210.       
  211.     if exitCode > 0 and exitCode != 153:
  212.       status = 4
  213.       
  214.     return status
  215.     
  216.   ## UNUSED METHOD ?? ##
  217.   def getWorkers(self):
  218.     hosts = []
  219.     
  220.     qstatInfo = self.__torque(self.getServiceId())
  221.     if qstatInfo:
  222.       hosts = qstatInfop['exec_host']
  223.     
  224.     return hosts
  225.  
  226.   ## UNUSED METHOD ?? ##
  227.   def pollNodeSet(self, nodeSet):
  228.     status = NodeSet.COMPLETE  
  229.     nodeSet = self.nodeSetDict[0] 
  230.     qstatInfo = self.__torque(self.getServiceId())
  231.     if qstatMap:    
  232.       jobstate = qstatMap['job_state']
  233.       exechost = qstatMap['exec_host']
  234.     if jobstate == 'Q':
  235.       status = NodeSet.PENDING
  236.     elif exechost == None:
  237.       status = NodeSet.COMMITTED
  238.     else:
  239.       nodeSet._setAddrList(exec_host)
  240.     return status
  241.         
  242.   def getServiceId(self):
  243.     id = None
  244.     
  245.     nodeSets = self.nodeSetDict.values()
  246.     if len(nodeSets):
  247.       id = nodeSets[0].qsubId
  248.       
  249.     if id == None:
  250.       id = os.getenv('PBS_JOBID')
  251.       
  252.     return id
  253.   def getJobInfo(self, jobId=None):
  254.     jobNonExistentErrorCode = 153
  255.     self.__jobInfo = { 'job_state' : False }
  256.     
  257.     if jobId == None:
  258.       jobId = self.getServiceId()
  259.     qstatInfo, exitCode = self.__torque.qstat(jobId)
  260.     if exitCode == 0:
  261.       self.__jobInfo = qstatInfo
  262.     elif exitCode == jobNonExistentErrorCode:
  263.       # This really means that the job completed
  264.       # However, setting only job_state for now, not 
  265.       # any other attributes, as none seem required.
  266.       self.__jobInfo = { 'job_state' : 'C' }
  267.     return self.__jobInfo
  268.   def deleteJob(self, jobId):
  269.     exitCode = self.__torque.qdel(jobId)
  270.     return exitCode
  271.   def isJobFeasible(self):
  272.     comment = None
  273.     msg = None
  274.     if self.__jobInfo.has_key('comment'):
  275.       comment = self.__jobInfo['comment']
  276.     try:
  277.       if comment:
  278.         commentField = re.compile(self._cfg['hod']['job-feasibility-attr'])
  279.         match = commentField.search(comment)
  280.         if match:
  281.           reqUsage = int(match.group(1))
  282.           currentUsage = int(match.group(2))
  283.           maxUsage = int(match.group(3))
  284.           msg = "Current Usage:%s, Requested:%s, Maximum Limit:%s " % 
  285.                                   (currentUsage, reqUsage, maxUsage)
  286.           if reqUsage > maxUsage:
  287.             return "Never", msg
  288.           if reqUsage + currentUsage > maxUsage:
  289.             return False, msg
  290.     except Exception, e:
  291.       self._log.error("Error in isJobFeasible : %s" %e)
  292.       raise Exception(e)
  293.     return True, msg
  294.     
  295.   def runWorkers(self, args):
  296.     return self.__torque.pbsdsh(args)
  297.   def updateWorkerInfo(self, workerInfoMap, jobId):
  298.     workerInfoStr = ''
  299.     for key in workerInfoMap.keys():
  300.       workerInfoStr = '%s,%s:%s' % (workerInfoStr, key, workerInfoMap[key])
  301.     exitCode = self.__torque.qalter("notes", workerInfoStr[1:], jobId)
  302.     return exitCode