torque.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. import os, pprint, re, time
  15. from hodlib.Common.threads import simpleCommand
  16. from hodlib.Common.util import args_to_string
  17. from hodlib.Common.logger import hodDummyLogger
  18. reQstatLine = re.compile("^s*(w+)s*=s*(.*)s*$")
  19. class torqueInterface:
  20.   def __init__(self, torqueDir, environment, log=None):
  21.     self.__qsub = os.path.join(torqueDir, 'bin', 'qsub')
  22.     self.__qdel = os.path.join(torqueDir, 'bin', 'qdel')
  23.     self.__qstat = os.path.join(torqueDir, 'bin', 'qstat')
  24.     self.__pbsNodes = os.path.join(torqueDir, 'bin', 'pbsnodes')
  25.     self.__pbsdsh = os.path.join(torqueDir, 'bin', 'pbsdsh')
  26.     self.__qalter = os.path.join(torqueDir, 'bin', 'qalter')
  27.     self.__env = environment
  28.     
  29.     self.__log = log
  30.     if not self.__log:
  31.       self.__log = hodDummyLogger()
  32.         
  33.   def qsub(self, argList, stdinList):
  34.     jobID = False
  35.     exitCode = 0
  36.     qsubCommand = "%s %s" % (self.__qsub, args_to_string(argList))
  37.     
  38.     self.__log.debug("qsub -> %s" % qsubCommand)
  39.     
  40.     qsubProcess = simpleCommand('qsub', qsubCommand, env=self.__env)
  41.     qsubProcess.start()
  42.     
  43.     while qsubProcess.stdin == None:
  44.       time.sleep(.2)
  45.     try:
  46.       for line in stdinList:
  47.         self.__log.debug("qsub stdin: %s" % line)
  48.         print >>qsubProcess.stdin, line
  49.       qsubProcess.stdin.close()
  50.     except IOError, i:
  51.       # If torque's qsub is given invalid params, it fails & returns immediately
  52.       # Check for such errors here
  53.       # Wait for command execution to finish
  54.       qsubProcess.wait()
  55.       qsubProcess.join()
  56.       output = qsubProcess.output()
  57.       if output!=[]:
  58.         self.__log.critical("qsub Failure : %s " % output[0].strip())
  59.         self.__log.critical("qsub Command : %s" % qsubCommand)
  60.       return None, qsubProcess.exit_code()
  61.     qsubProcess.wait()
  62.     qsubProcess.join()
  63.     
  64.     exitCode = qsubProcess.exit_code()
  65.     if exitCode == 0:
  66.       buffer = qsubProcess.output()
  67.       jobID = buffer[0].rstrip('n')
  68.       self.__log.debug("qsub jobid: %s" % jobID)
  69.     else:
  70.       self.__log.critical("qsub error: %s" % qsubProcess.exit_status_string())    
  71.     
  72.     return jobID, exitCode
  73.   
  74.   def qstat(self, jobID):
  75.     qstatInfo = None  
  76.     
  77.     qstatCommand = "%s -f -1 %s" % (self.__qstat, jobID)
  78.     
  79.     self.__log.debug(qstatCommand)
  80.     qstatProcess = simpleCommand('qstat', qstatCommand, env=self.__env)
  81.     qstatProcess.start()
  82.     qstatProcess.wait()
  83.     qstatProcess.join()
  84.     
  85.     exitCode = qstatProcess.exit_code()
  86.     if exitCode > 0:
  87.       self.__log.warn('qstat error: %s' % qstatProcess.exit_status_string())
  88.     else:
  89.       qstatInfo = {}
  90.       for line in qstatProcess.output():
  91.         line = line.rstrip()
  92.         if line.find('=') != -1:
  93.           qstatMatch = reQstatLine.match(line)
  94.           if qstatMatch:
  95.             key = qstatMatch.group(1)
  96.             value = qstatMatch.group(2)
  97.             qstatInfo[key] = value
  98.           
  99.       if 'exec_host' in qstatInfo:
  100.         list = qstatInfo['exec_host'].split('+')
  101.         addrList = []
  102.         
  103.         for item in list:
  104.           [head, end] = item.split('/', 1)
  105.           addrList.append(head)
  106.         
  107.         qstatInfo['exec_host'] = addrList
  108.         
  109.     return qstatInfo, exitCode
  110.   
  111.   def pbs_nodes(self, argString):
  112.     pass
  113.   
  114.   def qdel(self, jobId, force=False):
  115.     exitCode = 0
  116.     qdel = self.__qdel
  117.     if force:
  118.       qdel = "%s -p %s" % (qdel, jobId)
  119.     else:
  120.       qdel = "%s %s" % (qdel, jobId) 
  121.     self.__log.debug(qdel)
  122.     qdelProcess = simpleCommand('qdel', qdel, env=self.__env)
  123.     qdelProcess.start()
  124.     qdelProcess.wait()
  125.     qdelProcess.join()      
  126.       
  127.     exitCode = qdelProcess.exit_code()
  128.     
  129.     return exitCode
  130.   
  131.   def pbsdsh(self, arguments):
  132.     status = None
  133.     
  134.     pbsdshCommand = "%s %s" % (self.__pbsdsh, args_to_string(arguments))
  135.     
  136.     self.__log.debug("pbsdsh command: %s" % pbsdshCommand)
  137.     
  138.     pbsdsh = simpleCommand('pbsdsh', pbsdshCommand, env=self.__env)
  139.     pbsdsh.start()   
  140.     for i in range(0, 30):
  141.       status = pbsdsh.exit_code()
  142.       if status:
  143.         self.__log.error("pbsdsh failed: %s" % pbsdsh.exit_status_string())
  144.         break  
  145.     
  146.     if not status: status = 0
  147.       
  148.     return status  
  149.   def qalter(self, fieldName, fieldValue, jobId):
  150.     """Update the job field with fieldName with the fieldValue.
  151.        The fieldValue must be modifiable after the job is submitted."""
  152.     # E.g. to alter comment: qalter -W notes='value` jobId
  153.     qalterCmd = '%s -W %s="%s" %s' % (self.__qalter, fieldName, fieldValue, jobId) 
  154.     self.__log.debug("qalter command: %s" % qalterCmd)
  155.     qalterProcess = simpleCommand('qalter', qalterCmd, env=self.__env)
  156.     qalterProcess.start()
  157.     qalterProcess.wait()
  158.     qalterProcess.join()
  159.     exitCode = qalterProcess.exit_code()
  160.     return exitCode