- #!/usr/bin/python
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- #     http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- # Schedule FailMon execution for nodes of file hosts.list, according to
- # the properties file conf/global.config.
- import time
- import ConfigParser
- import subprocess
- import threading
- import random
# Module-level scheduler settings. The values below are only defaults:
# getJobs() replaces username, connections, failmonDir and maxFiles with
# the values found in the configuration file and fills the jobs list.

# Job objects that are scheduled by main()
jobs = []
# account name used for the ssh login on the monitored hosts
username = "user"
# number of sshThread workers created per scheduling round
connections = 10
# directory the remote shell changes into before running a command
failmonDir = ""
# value of the "hdfs.files.max" option (not read anywhere else in this file)
maxFiles = 100
# This class represents a thread that connects to a set of cluster
# nodes to locally execute monitoring jobs. These jobs are specified
# as a shell command in the constructor.
class sshThread(threading.Thread):

    # threadname -- identifier of the thread, stored as the thread's name
    #               and shown in the progress messages
    # username   -- account used for the ssh login on every host
    # command    -- shell command executed on each host
    # failmonDir -- directory the remote shell changes into before the
    #               command is run
    def __init__(self, threadname, username, command, failmonDir):
        threading.Thread.__init__(self)
        self.name = threadname
        self.username = username
        self.command = command
        self.failmonDir = failmonDir
        # hosts this thread visits, in the order they were added
        self.hosts = []

    # Queue one more host for this thread; call before start().
    def addHost(self, host):
        self.hosts.append(host)

    # Run the command over ssh on every queued host, one after the other.
    def run(self):
        for host in self.hosts:
            toRun = ["ssh", self.username + "@" + host,
                     "cd " + self.failmonDir + " ; " + self.command]
            # Every message is a full line naming thread and host: several
            # sshThreads report at the same time, so a bare "Done!" on a
            # continued line could not be attributed to anyone.
            print("Thread %s invoking command on %s :t %s ..."
                  % (self.name, host, toRun))
            try:
                subprocess.check_call(toRun)
            except (subprocess.CalledProcessError, OSError) as err:
                # A host that is down (non-zero exit) or a missing ssh
                # binary must not stop this thread from visiting the
                # remaining hosts it was given.
                print("Thread %s on %s: FAILED (%s)" % (self.name, host, err))
                continue
            print("Thread %s on %s: Done!" % (self.name, host))
# A monitoring job. 'param' is the string that can be passed in the
# '--only' list of jobs given to the Java class
# org.apache.hadoop.contrib.failmon.RunOnce for execution on a node;
# 'interval' is the period of the job and 'counter' the time left until
# its next run.
class Job:

    # param    -- job name as understood by the '--only' option
    # interval -- period of the job
    def __init__(self, param, interval):
        self.param = param
        # a new job starts with a full interval left to wait
        self.counter = self.interval = interval

    # Start a new waiting period: the countdown goes back to the interval.
    def reset(self):
        self.counter = self.interval
# This function reads the configuration file to get the values of the
# configuration parameters.
#
# file -- path of the configuration file; every option is looked up in
#         its "Default" section
#
# Side effects: sets the module-level username, connections, failmonDir
# and maxFiles, and appends one Job to the module-level jobs list for
# each monitoring job whose configured interval is not 0.
#
# Raises IOError when the file cannot be read, the ConfigParser lookup
# errors when the section or an option is missing, and ValueError when
# a numeric option does not hold an integer.
def getJobs(file):
    global username
    global connections
    global jobs
    global failmonDir
    global maxFiles

    conf = ConfigParser.SafeConfigParser()
    # read() skips files it cannot open without complaining and returns
    # the list of files it did parse, so an empty result means that
    # nothing was loaded.
    if not conf.read(file):
        raise IOError("cannot read configuration file: " + file)

    username = conf.get("Default", "ssh.username")
    connections = int(conf.get("Default", "max.connections"))
    failmonDir = conf.get("Default", "failmon.dir")
    # NOTE(review): stored as the raw string although the module default
    # is an integer; nothing in this file reads it -- confirm before
    # converting.
    maxFiles = conf.get("Default", "hdfs.files.max")

    # (job name, option that holds the interval of that job)
    jobOptions = [
        ("hadoopLog", "log.hadoop.interval"),
        ("systemLog", "log.system.interval"),
        ("nics", "nics.interval"),
        ("cpu", "cpu.interval"),
        ("disks", "disks.interval"),
        ("sensors", "sensors.interval"),
        ("upload", "upload.interval"),
    ]
    for param, option in jobOptions:
        interval = int(conf.get("Default", option))
        # an interval of 0 disables the job
        if interval != 0:
            jobs.append(Job(param, interval))
# Compute the gcd (Greatest Common Divisor) of two integers with the
# Euclidean algorithm.
#
# a, b -- the two integers (checked with assert)
#
# Returns b when a is 0 (and a when b is 0).
def GCD(a, b):
    assert isinstance(a, int)
    assert isinstance(b, int)
    divisor, remainder = b, a
    # replace the pair by (remainder, divisor mod remainder) until
    # nothing is left over
    while remainder != 0:
        divisor, remainder = remainder, divisor % remainder
    return divisor
# Compute the gcd (Greatest Common Divisor) of the intervals of a list
# of jobs.
#
# joblist -- non-empty list of objects that have an 'interval' attribute
#
# Returns the interval itself for a one-element list, otherwise the GCD
# of all the intervals.
# Raises ValueError when joblist is empty: there is no interval to
# return in that case.
def listGCD(joblist):
    assert isinstance(joblist, list)
    if not joblist:
        raise ValueError("cannot compute the GCD of an empty job list")
    g = joblist[0].interval
    # fold the remaining intervals into the running GCD
    for job in joblist[1:]:
        g = GCD(g, job.interval)
    return g
# Merge all failmon files created on the HDFS into a single file.
#
# One host is picked at random from ./conf/hosts.list and the merge
# command is run there over ssh, from the failmonDir directory, as the
# configured user.
#
# Raises ValueError when the host list holds no host name, and
# subprocess.CalledProcessError when the remote command exits with a
# non-zero status.
def mergeFiles():
    # Blank lines are dropped so that an empty string can never be picked
    # as the host; the file is closed as soon as it has been read.
    with open('./conf/hosts.list', 'r') as hosts:
        hostList = [line.strip() for line in hosts if line.strip()]
    if not hostList:
        raise ValueError("no hosts found in ./conf/hosts.list")

    randomHost = random.choice(hostList)
    # NOTE(review): the executable name after "bin/" looks truncated --
    # confirm the launcher script this command is meant to call.
    mergeCommand = "bin/ --mergeFiles"
    toRun = ["ssh", username + "@" + randomHost,
             "cd " + failmonDir + " ; " + mergeCommand]
    print("Invoking command on %s :t %s ..." % (randomHost, mergeCommand))
    subprocess.check_call(toRun)
    print("Done!")
# The actual scheduling is done here.
#
# Loads the jobs from ./conf/global.config and then loops forever:
# sleeps for the GCD of all job intervals, collects the jobs whose
# countdown has expired and runs them, as one command, on every host of
# ./conf/hosts.list. The hosts are dealt out round-robin to at most
# 'connections' sshThread workers. After a round that included the
# "upload" job, mergeFiles() is called.
#
# Returns only when no job is enabled; raises ValueError when the
# configured number of connections is smaller than 1.
def main():
    getJobs("./conf/global.config")

    for job in jobs:
        print("Configuration:  %s every %s seconds" % (job.param, job.interval))

    if not jobs:
        # every interval was 0: there is nothing to wait for
        print("No monitoring job is enabled; nothing to schedule")
        return
    if connections < 1:
        # without a worker thread no host could ever be served
        raise ValueError("max.connections must be at least 1")

    # Waking up every GCD-of-intervals seconds hits the due time of every
    # job exactly.
    globalInterval = listGCD(jobs)

    while True:
        time.sleep(globalInterval)

        # names of the jobs that are due in this round
        params = []
        for job in jobs:
            job.counter -= globalInterval
            if job.counter <= 0:
                params.append(job.param)
                job.reset()

        if not params:
            continue

        # NOTE(review): the executable name after "bin/" looks truncated --
        # confirm the launcher script this command is meant to call.
        command = "bin/ " + "--only " + ",".join(params)

        # execute on all nodes; the list is re-read every round and blank
        # lines are ignored
        with open('./conf/hosts.list', 'r') as hosts:
            hostList = [line.strip() for line in hosts if line.strip()]

        # create a thread for every connection, but never more threads
        # than there are hosts to visit
        threadList = [sshThread(i, username, command, failmonDir)
                      for i in range(min(connections, len(hostList)))]

        # assign the hosts to the threads round-robin
        for index, host in enumerate(hostList):
            threadList[index % len(threadList)].addHost(host)

        for ready in threadList:
            ready.start()
        # wait for the threads started above, and only for those
        for ready in threadList:
            ready.join()

        # if an upload has been done, then maybe we need to merge the
        # HDFS files
        if "upload" in params:
            mergeFiles()
# Start the scheduler only when this file is executed as a script.
if __name__ == "__main__":
    main()