logcondense.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. #!/bin/sh
  2. #Licensed to the Apache Software Foundation (ASF) under one
  3. #or more contributor license agreements.  See the NOTICE file
  4. #distributed with this work for additional information
  5. #regarding copyright ownership.  The ASF licenses this file
  6. #to you under the Apache License, Version 2.0 (the
  7. #"License"); you may not use this file except in compliance
  8. #with the License.  You may obtain a copy of the License at
  9. #     http://www.apache.org/licenses/LICENSE-2.0
  10. #Unless required by applicable law or agreed to in writing, software
  11. #distributed under the License is distributed on an "AS IS" BASIS,
  12. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. #See the License for the specific language governing permissions and
  14. #limitations under the License.
  15. """:"
  16. work_dir=$(dirname $0)
  17. base_name=$(basename $0)
  18. cd $work_dir
  19. if [ $HOD_PYTHON_HOME ]; then
  20. exec $HOD_PYTHON_HOME -OO -u $base_name ${1+"$@"}
  21. elif [ -e /usr/bin/python ]; then
  22. exec /usr/bin/python -OO -u $base_name ${1+"$@"}
  23. elif [ -e /usr/local/bin/python ]; then
  24. exec /usr/local/bin/python -OO -u $base_name ${1+"$@"}
  25. else
  26. exec python -OO -u $base_name ${1+"$@"}
  27. fi
  28. ":"""
  29.       
  30. from os import popen3
  31. import os, sys
  32. import re
  33. import time
  34. from datetime import datetime
  35. from optparse import OptionParser
  36. myName          = os.path.basename(sys.argv[0])
  37. myName          = re.sub(".*/", "", myName)
  38. reVersion = re.compile(".*(d+_d+).*")
  39. VERSION = '$HeadURL: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20/src/contrib/hod/support/logcondense.py $'
  40. reMatch = reVersion.match(VERSION)
  41. if reMatch:
  42.     VERSION = reMatch.group(1)
  43.     VERSION = re.sub("_", ".", VERSION)
  44. else:
  45.     VERSION = 'DEV'
  46. options = ( {'short'   : "-p",
  47.              'long'    : "--package",
  48.              'type'    : "string",
  49.              'action'  : "store",
  50.              'dest'    : "package",
  51.              'metavar' : " ",
  52.              'default' : 'hadoop',
  53.              'help'    : "Bin file for hadoop"},
  54.     {'short'   : "-d",
  55.      'long'    : "--days",
  56.      'type'    : "int",
  57.      'action'  : "store",
  58.      'dest'    : "days",
  59.      'metavar' : " ",
  60.      'default' : 7,
  61.      'help'    : "Number of days before logs are deleted"},
  62.     
  63.     {'short'   : "-c",
  64.      'long'    : "--config",
  65.      'type'    : "string",
  66.      'action'  : "store",
  67.      'dest'    : "config",
  68.      'metavar' : " ",
  69.      'default' : None,
  70.      'help'    : "config directory for hadoop"},
  71.     
  72.     {'short'   : "-l",
  73.      'long'    : "--logs",
  74.      'type'    : "string",
  75.      'action'  : "store",
  76.      'dest'    : "log",
  77.      'metavar' : " ",
  78.      'default' : "/user",
  79.      'help'    : "directory prefix under which logs are stored per user"},
  80.     {'short'   : "-n",
  81.      'long'    : "--dynamicdfs",
  82.      'type'    : "string",
  83.      'action'  : "store",
  84.      'dest'    : "dynamicdfs",
  85.      'metavar' : " ",
  86.      'default' : "false",
  87.      'help'    : "'true', if the cluster is used to bring up dynamic dfs clusters, 'false' otherwise"}
  88.     )
  89. def getDfsCommand(options, args):
  90.   if (options.config == None): 
  91.     cmd = options.package + " " + "dfs " + args
  92.   else:
  93.     cmd = options.package + " " + "--config " + options.config + " dfs " + args
  94.   return cmd
  95. def runcondense():
  96.   import shutil
  97.   
  98.   options = process_args()
  99.   # if the cluster is used to bring up dynamic dfs, we must leave NameNode and JobTracker logs, 
  100.   # otherwise only JobTracker logs. Likewise, in case of dynamic dfs, we must also look for
  101.   # deleting datanode logs
  102.   filteredNames = ['jobtracker']
  103.   deletedNamePrefixes = ['*-tasktracker-*']
  104.   if options.dynamicdfs == 'true':
  105.     filteredNames.append('namenode')
  106.     deletedNamePrefixes.append('*-datanode-*')
  107.   filepath = '%s/*/hod-logs/' % (options.log)
  108.   cmd = getDfsCommand(options, "-lsr " + filepath)
  109.   (stdin, stdout, stderr) = popen3(cmd)
  110.   lastjobid = 'none'
  111.   toPurge = { }
  112.   for line in stdout:
  113.     try:
  114.       m = re.match("^.*s(.*)n$", line)
  115.       filename = m.group(1)
  116.       # file name format: <prefix>/<user>/hod-logs/<jobid>/[0-9]*-[jobtracker|tasktracker|datanode|namenode|]-hostname-YYYYMMDDtime-random.tar.gz
  117.       # first strip prefix:
  118.       if filename.startswith(options.log):
  119.         filename = filename.lstrip(options.log)
  120.         if not filename.startswith('/'):
  121.           filename = '/' + filename
  122.       else:
  123.         continue
  124.     
  125.       # Now get other details from filename.
  126.       k = re.match("/(.*)/hod-logs/(.*)/.*-.*-([0-9][0-9][0-9][0-9])([0-9][0-9])([0-9][0-9]).*$", filename)
  127.       if k:
  128.         username = k.group(1)
  129.         jobid =  k.group(2)
  130.         datetimefile = datetime(int(k.group(3)), int(k.group(4)), int(k.group(5)))
  131.         datetimenow = datetime.utcnow()
  132.         diff = datetimenow - datetimefile
  133.         filedate = k.group(3) + k.group(4) + k.group(5)
  134.         newdate = datetimenow.strftime("%Y%m%d")
  135.         print "%s %s %s %d" % (filename,  filedate, newdate, diff.days)
  136.         # if the cluster is used to bring up dynamic dfs, we must also leave NameNode logs.
  137.         foundFilteredName = False
  138.         for name in filteredNames:
  139.           if filename.find(name) >= 0:
  140.             foundFilteredName = True
  141.             break
  142.         if foundFilteredName:
  143.           continue
  144.         if (diff.days > options.days):
  145.           desttodel = filename
  146.           if not toPurge.has_key(jobid):
  147.             toPurge[jobid] = options.log.rstrip("/") + "/" + username + "/hod-logs/" + jobid
  148.     except Exception, e:
  149.       print >> sys.stderr, e
  150.   for job in toPurge.keys():
  151.     try:
  152.       for prefix in deletedNamePrefixes:
  153.         cmd = getDfsCommand(options, "-rm " + toPurge[job] + '/' + prefix)
  154.         print cmd
  155.         ret = 0
  156.         ret = os.system(cmd)
  157.         if (ret != 0):
  158.           print >> sys.stderr, "Command failed to delete file " + cmd 
  159.     except Exception, e:
  160.       print >> sys.stderr, e
  161.   
  162. def process_args():
  163.   global options, myName, VERSION
  164.   
  165.   usage = "usage: %s <ARGS>" % (myName)
  166.   
  167.   version = "%s %s" % (myName, VERSION)
  168.   
  169.   argParser = OptionParser(usage=usage, version=VERSION)
  170.   
  171.   for option_element in options:
  172.     argParser.add_option(option_element['short'], option_element['long'],
  173.  type=option_element['type'], action=option_element['action'],
  174.  dest=option_element['dest'], default=option_element['default'],
  175.  metavar=option_element['metavar'], help=option_element['help'])
  176.   (parsedOptions, args) = argParser.parse_args()
  177.   
  178.   if not os.path.exists(parsedOptions.package):
  179.     argParser.error("Could not find path to hadoop binary: %s" % parsedOptions.package)
  180.   if not os.path.exists(parsedOptions.config):
  181.     argParser.error("Could not find config: %s" % parsedOptions.config)
  182.   if parsedOptions.days <= 0:
  183.     argParser.error("Invalid number of days specified, must be > 0: %s" % parsedOptions.config)
  184.   if parsedOptions.dynamicdfs!='true' and parsedOptions.dynamicdfs!='false':
  185.     argParser.error("Invalid option for dynamicdfs, must be true or false: %s" % parsedOptions.dynamicdfs)
  186.   return parsedOptions
  187.   
  188.   
  189. if __name__ == '__main__':
  190.   runcondense()
  191.