scheduler.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. #!/usr/bin/python
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements.  See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership.  The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License.  You may obtain a copy of the License at
  9. #
  10. #     http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. # Schedule FailMon execution for nodes of file hosts.list, according to
  18. # the properties file conf/global.config.
  19. import time
  20. import ConfigParser
  21. import subprocess
  22. import threading
  23. import random
  24. jobs = []
  25. username = "user"
  26. connections = 10
  27. failmonDir = ""
  28. maxFiles = 100
  29. # This class represents a thread that connects to a set of cluster
  30. # nodes to locally execute monitoring jobs. These jobs are specified
  31. # as a shell command in the constructor.
  32. class sshThread (threading.Thread):
  33.     def __init__(self, threadname, username, command, failmonDir):
  34.         threading.Thread.__init__(self)
  35.         self.name = threadname
  36.         self.username = username
  37.         self.command = command
  38.         self.failmonDir = failmonDir
  39.         self.hosts = []
  40.     def addHost(self, host):
  41.         self.hosts.append(host)
  42.         
  43.     def run (self):
  44.         for host in self.hosts:
  45.             toRun = ["ssh", self.username + "@" + host, "cd " + self.failmonDir + " ; " + self.command]
  46.             print "Thread", self.name, "invoking command on", host, ":t", toRun, "...",
  47.             subprocess.check_call(toRun)
  48.             print "Done!"
  49. # This class represents a monitoring job. The param member is a string
  50. # that can be passed in the '--only' list of jobs given to the Java
  51. # class org.apache.hadoop.contrib.failmon.RunOnce for execution on a
  52. # node.
  53. class Job:
  54.     def __init__(self, param, interval):
  55.         self.param = param
  56.         self.interval = interval
  57.         self.counter = interval
  58.         return
  59.     def reset(self):
  60.         self.counter = self.interval
  61. # This function reads the configuration file to get the values of the
  62. # configuration parameters.
  63. def getJobs(file):
  64.     global username
  65.     global connections
  66.     global jobs
  67.     global failmonDir
  68.     global maxFiles
  69.     
  70.     conf = ConfigParser.SafeConfigParser()
  71.     conf.read(file)
  72.     username = conf.get("Default", "ssh.username")
  73.     connections = int(conf.get("Default", "max.connections"))
  74.     failmonDir = conf.get("Default", "failmon.dir")
  75.     maxFiles = conf.get("Default", "hdfs.files.max")
  76.     
  77.     # Hadoop Log
  78.     interval = int(conf.get("Default", "log.hadoop.interval"))
  79.     if interval != 0:
  80.         jobs.append(Job("hadoopLog", interval))
  81.     # System Log
  82.     interval = int(conf.get("Default", "log.system.interval"))
  83.     if interval != 0:
  84.         jobs.append(Job("systemLog", interval))
  85.     # NICs
  86.     interval = int(conf.get("Default", "nics.interval"))
  87.     if interval != 0:
  88.         jobs.append(Job("nics", interval))
  89.     # CPU
  90.     interval = int(conf.get("Default", "cpu.interval"))
  91.     if interval != 0:
  92.         jobs.append(Job("cpu", interval))
  93.     # CPU
  94.     interval = int(conf.get("Default", "disks.interval"))
  95.     if interval != 0:
  96.         jobs.append(Job("disks", interval))
  97.     # sensors
  98.     interval = int(conf.get("Default", "sensors.interval"))
  99.     if interval != 0:
  100.         jobs.append(Job("sensors", interval))
  101.     # upload
  102.     interval = int(conf.get("Default", "upload.interval"))
  103.     if interval != 0:
  104.         jobs.append(Job("upload", interval))
  105.     return
  106. # Compute the gcd (Greatest Common Divisor) of two integerss
  107. def GCD(a, b):
  108.     assert isinstance(a, int)
  109.     assert isinstance(b, int)
  110.     while a:
  111.         a, b = b%a, a
  112.     return b
  113. # Compute the gcd (Greatest Common Divisor) of a list of integers
  114. def listGCD(joblist):
  115.     assert isinstance(joblist, list)
  116.     if (len(joblist) == 1):
  117.         return joblist[0].interval
  118.     g = GCD(joblist[0].interval, joblist[1].interval)
  119.     for i in range (2, len(joblist)):
  120.         g = GCD(g, joblist[i].interval)
  121.         
  122.     return g
  123. # Merge all failmon files created on the HDFS into a single file
  124. def mergeFiles():
  125.     global username
  126.     global failmonDir
  127.     hostList = []
  128.     hosts = open('./conf/hosts.list', 'r')
  129.     for host in hosts:
  130.         hostList.append(host.strip().rstrip())
  131.     randomHost = random.sample(hostList, 1)
  132.     mergeCommand = "bin/failmon.sh --mergeFiles"
  133.     toRun = ["ssh", username + "@" + randomHost[0], "cd " + failmonDir + " ; " + mergeCommand]
  134.     print "Invoking command on", randomHost, ":t", mergeCommand, "...",
  135.     subprocess.check_call(toRun)
  136.     print "Done!"
  137.     return
  138. # The actual scheduling is done here
  139. def main():
  140.     getJobs("./conf/global.config")
  141.     for job in jobs:
  142.         print "Configuration: ", job.param, "every", job.interval, "seconds"
  143.         
  144.     globalInterval = listGCD(jobs)
  145.         
  146.     while True :
  147.         time.sleep(globalInterval)
  148.         params = []
  149.         
  150.         for job in jobs:
  151.             job.counter -= globalInterval
  152.             
  153.             if (job.counter <= 0):
  154.                 params.append(job.param)
  155.                 job.reset()
  156.                 
  157.         if (len(params) == 0):
  158.             continue;
  159.                     
  160.         onlyStr = "--only " + params[0]
  161.         for i in range(1, len(params)):
  162.             onlyStr += ',' + params[i] 
  163.                 
  164.         command = "bin/failmon.sh " + onlyStr
  165.         # execute on all nodes
  166.         hosts = open('./conf/hosts.list', 'r')
  167.         threadList = []
  168.         # create a thread for every connection
  169.         for i in range(0, connections):
  170.             threadList.append(sshThread(i, username, command, failmonDir))
  171.         # assign some hosts/connections hosts to every thread
  172.         cur = 0;
  173.         for host in hosts:
  174.             threadList[cur].addHost(host.strip().rstrip())
  175.             cur += 1
  176.             if (cur == len(threadList)):
  177.                 cur = 0    
  178.         for ready in threadList:
  179.             ready.start()
  180.         for ssht in threading.enumerate():
  181.             if ssht != threading.currentThread():
  182.                 ssht.join()
  183.         # if an upload has been done, then maybe we need to merge the
  184.         # HDFS files
  185.         if "upload" in params:
  186.             mergeFiles()
  187.     return
  188. if __name__ == '__main__':
  189.     main()