threads.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. import threading, time, os, sys, pprint
  15. from popen2 import Popen4, Popen3, MAXFD
  16. from signal import SIGTERM, SIGKILL
  17. class baseThread(threading.Thread):
  18.     """Base CAM threading class.  The run method should be overridden."""
  19.     def __init__(self, name):
  20.         threading.Thread.__init__(self, name=name)
  21.         self.stopFlag = threading.Event()
  22.         self.stopFlag.clear()
  23.         self.running = threading.Event()
  24.         self.running.set()
  25.         self.isFinished = threading.Event()
  26.         self.isFinished.clear()
  27.     def join(self, timeout=None):
  28.         self.stopFlag.set()
  29.         threading.Thread.join(self, timeout)
  30.     def pause(self):
  31.         """Pause thread."""
  32.         self.running.clear()
  33.     def cont(self):
  34.         """Resume thread operation."""
  35.         self.running.set()
  36. class simpleCommand(baseThread):
  37.     """Command execution object.  Command output and exit status are captured.
  38.        Public class attributes:
  39.        cmdString    - command to be executed
  40.        outputBuffer - command output, stdout + stderr
  41.        status       - exit status, as returned by wait
  42.        
  43.        stdin        - standard input for command
  44.        stdout       - standard output of command when buffer == False
  45.        stderr       - standard error of command when mode == 3 and buffer == False
  46.        
  47.        """
  48.     def __init__(self, name, cmdString, env=os.environ, mode=4, buffer=True, 
  49.                  wait=True, chdir=None):
  50.         """Class initialization.
  51.            name        - thread name to use when running the command
  52.            cmdString   - command string to execute
  53.            inputString - string to print to command's stdin
  54.            env         - shell environment dictionary
  55.            mode        - 3 for popen3 and 4 for popen4
  56.            buffer      - out put to be retrieved with output() method
  57.            wait        - return immediately after start() is called and output 
  58.                          command results as they come to stdout"""
  59.         baseThread.__init__(self, name=name)
  60.         self.cmdString = cmdString
  61.         self.__mode = mode
  62.         self.__buffer = buffer
  63.         self.__wait = wait
  64.         self.__chdir = chdir
  65.         self.__outputBuffer = []
  66.         self.__status = None
  67.         self.__pid = None
  68.         self.__isFinished = threading.Event()
  69.         self.__isFinished.clear()
  70.         
  71.         self.stdin = None
  72.         self.stdout = None
  73.         self.stderr = None
  74.         self.__env = env
  75.     
  76.     def run(self):
  77.         """ Overridden run method.  Most of the work happens here.  start()
  78.             should be called in place of this method."""
  79.             
  80.         oldDir = None
  81.         if self.__chdir:
  82.             if os.path.exists(self.__chdir):
  83.                 oldDir = os.getcwd()  
  84.                 os.chdir(self.__chdir)
  85.             else:
  86.                 raise Exception(
  87.                     "simpleCommand: invalid chdir specified: %s" % 
  88.                     self.__chdir)
  89.             
  90.         cmd = None
  91.         if self.__mode == 3:
  92.             cmd = _Popen3Env(self.cmdString, env=self.__env)
  93.         else:
  94.             cmd = _Popen4Env(self.cmdString, env=self.__env)
  95.         self.__pid = cmd.pid
  96.         self.stdin = cmd.tochild
  97.         
  98.         if self.__mode == 3:
  99.             self.stderr = cmd.childerr
  100.         while cmd.fromchild == None:
  101.             time.sleep(1)
  102.         
  103.         if self.__buffer == True:
  104.             output = cmd.fromchild.readline()
  105.             while output != '':
  106.                 while not self.running.isSet():
  107.                     if self.stopFlag.isSet():
  108.                         break
  109.                     time.sleep(1)
  110.                 self.__outputBuffer.append(output)
  111.                 output = cmd.fromchild.readline()
  112.         elif self.__wait == False:
  113.             output = cmd.fromchild.readline()
  114.             while output != '':
  115.                 while not self.running.isSet():
  116.                     if self.stopFlag.isSet():
  117.                         break
  118.                     time.sleep(1)
  119.                 print output,
  120.                 if self.stopFlag.isSet():
  121.                     break
  122.                 output = cmd.fromchild.readline()
  123.         else:
  124.             self.stdout = cmd.fromchild
  125.         self.__status = cmd.poll()
  126.         while self.__status == -1:
  127.             while not self.running.isSet():
  128.                 if self.stopFlag.isSet():
  129.                     break
  130.                 time.sleep(1)
  131.             self.__status = cmd.poll()
  132.             time.sleep(1)
  133.         if oldDir:
  134.             os.chdir(oldDir)
  135.         self.__isFinished.set()
  136.         
  137.         sys.exit(0)
  138.     def getPid(self):
  139.         """return pid of the launches process"""
  140.         return self.__pid
  141.     def output(self):
  142.         return self.__outputBuffer[:]
  143.     def wait(self):
  144.         """Wait blocking until command execution completes."""
  145.         self.__isFinished.wait()
  146.         return os.WEXITSTATUS(self.__status)
  147.     def is_running(self):
  148.         """Returns boolean, are we running?"""
  149.         
  150.         status = True
  151.         if self.__isFinished.isSet():
  152.             status = False
  153.             
  154.         return status 
  155.     def exit_code(self):
  156.         """ Returns process exit code."""
  157.         
  158.         if self.__status != None:
  159.             return os.WEXITSTATUS(self.__status)
  160.         else:
  161.             return None
  162.         
  163.     def exit_status_string(self):
  164.         """Return a string representation of the command's exit status."""
  165.         statusString = None
  166.         if self.__status:
  167.             exitStatus = os.WEXITSTATUS(self.__status)
  168.             exitSignal = os.WIFSIGNALED(self.__status)
  169.             coreDump   = os.WCOREDUMP(self.__status)
  170.             statusString = "exit code: %s | signal: %s | core %s" % 
  171.                 (exitStatus, exitSignal, coreDump)
  172.         return(statusString)
  173.     def stop(self):
  174.         """Stop the running command and join it's execution thread."""
  175.         self.join()
  176.     def kill(self):
  177.         count = 0
  178.         while self.is_running():
  179.           try:
  180.             if count > 20:
  181.               os.kill(self.__pid, SIGKILL)
  182.               break
  183.             else:  
  184.               os.kill(self.__pid, SIGTERM)
  185.           except:
  186.             break
  187.           
  188.           time.sleep(.1)
  189.           count = count + 1
  190.         
  191.         self.stop()
  192.         
  193. class _Popen3Env(Popen3):
  194.     def __init__(self, cmd, capturestderr=False, bufsize=-1, env=os.environ):
  195.         self._env = env
  196.         Popen3.__init__(self, cmd, capturestderr, bufsize)
  197.     
  198.     def _run_child(self, cmd):
  199.         if isinstance(cmd, basestring):
  200.             cmd = ['/bin/sh', '-c', cmd]
  201.         for i in xrange(3, MAXFD):
  202.             try:
  203.                 os.close(i)
  204.             except OSError:
  205.                 pass
  206.         try:
  207.             os.execvpe(cmd[0], cmd, self._env)
  208.         finally:
  209.             os._exit(1)
  210.             
  211. class _Popen4Env(_Popen3Env, Popen4):
  212.     childerr = None
  213.     def __init__(self, cmd, bufsize=-1, env=os.environ):
  214.         self._env = env
  215.         Popen4.__init__(self, cmd, bufsize)
  216.         
  217. class loop(baseThread):
  218.     """ A simple extension of the threading.Thread class which continuously
  219.         executes a block of code until join().
  220.     """
  221.     def __init__(self, name, functionRef, functionArgs=None, sleep=1, wait=0,
  222.         offset=False):
  223.         """Initialize a loop object.
  224.            name         - thread name
  225.            functionRef  - a function reference
  226.            functionArgs - function arguments in the form of a tuple,
  227.            sleep        - time to wait between function execs
  228.            wait         - time to wait before executing the first time
  229.            offset       - set true to sleep as an offset of the start of the
  230.                           last func exec instead of the end of the last func
  231.                           exec
  232.         """
  233.         self.__functionRef  = functionRef
  234.         self.__functionArgs = functionArgs
  235.         self.__sleep        = sleep
  236.         self.__wait         = wait
  237.         self.__offset       = offset
  238.         baseThread.__init__(self, name=name)
  239.     def run(self):
  240.         """Do not call this directly.  Call self.start()."""
  241.         startTime = None
  242.         while not self.stopFlag.isSet():
  243.             sleep = self.__sleep
  244.             if self.__wait > 0:
  245.                 startWaitCount = 0
  246.                 while not self.stopFlag.isSet():
  247.                     while not self.running.isSet():
  248.                         if self.stopFlag.isSet():
  249.                             break
  250.                         time.sleep(1)
  251.                     time.sleep(0.5)
  252.                     startWaitCount = startWaitCount + .5
  253.                     if startWaitCount >= self.__wait:
  254.                         self.__wait = 0
  255.                         break
  256.             startTime = time.time()
  257.             if not self.stopFlag.isSet():
  258.                 if self.running.isSet():
  259.                     if self.__functionArgs:
  260.                         self.__functionRef(self.__functionArgs)
  261.                     else:
  262.                         self.__functionRef()
  263.             endTime = time.time()
  264.             while not self.running.isSet():
  265.                 time.sleep(1)
  266.             while not self.stopFlag.isSet():
  267.                 while not self.running.isSet():
  268.                     if self.stopFlag.isSet():
  269.                         break
  270.                     time.sleep(1)
  271.                 currentTime = time.time()
  272.                 if self.__offset:
  273.                     elapsed = time.time() - startTime
  274.                 else:
  275.                     elapsed = time.time() - endTime
  276.                 if elapsed >= self.__sleep:
  277.                     break
  278.                 time.sleep(0.5)
  279.         
  280.         self.isFinished.set()
  281.     def set_sleep(self, sleep, wait=None, offset=None):
  282.         """Modify loop frequency paramaters.
  283.            sleep        - time to wait between function execs
  284.            wait         - time to wait before executing the first time
  285.            offset       - set true to sleep as an offset of the start of the
  286.                           last func exec instead of the end of the last func
  287.                           exec
  288.         """
  289.         self.__sleep = sleep
  290.         if wait != None:
  291.             self.__wait = wait
  292.         if offset != None:
  293.             self.__offset = offset
  294.     def get_sleep(self):
  295.         """Get loop frequency paramaters.
  296.         Returns a dictionary with sleep, wait, offset.
  297.         """
  298.         return {
  299.             'sleep'  : self.__sleep,
  300.             'wait'   : self.__wait,
  301.             'offset' : self.__offset,
  302.             }
  303.         
  304. class func(baseThread):
  305.     """ A simple extension of the threading.Thread class which executes 
  306.         a function in a separate thread.
  307.     """
  308.     def __init__(self, name, functionRef, functionArgs=None):
  309.         """Initialize a func object.
  310.            name         - thread name
  311.            functionRef  - a function reference
  312.            functionArgs - function arguments in the form of a tuple,
  313.         """
  314.         self.__functionRef  = functionRef
  315.         self.__functionArgs = functionArgs
  316.         baseThread.__init__(self, name=name)
  317.     def run(self):
  318.         """Do not call this directly.  Call self.start()."""
  319.         if not self.stopFlag.isSet():
  320.             if self.running.isSet():
  321.                 if self.__functionArgs:
  322.                     self.__functionRef(self.__functionArgs)
  323.                 else:
  324.                     self.__functionRef()
  325.         sys.exit(0)