job_history_summary.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. #!/usr/bin/env python
  2. #
  3. # Licensed to the Apache Software Foundation (ASF) under one
  4. # or more contributor license agreements.  See the NOTICE file
  5. # distributed with this work for additional information
  6. # regarding copyright ownership.  The ASF licenses this file
  7. # to you under the Apache License, Version 2.0 (the
  8. # "License"); you may not use this file except in compliance
  9. # with the License.  You may obtain a copy of the License at
  10. #
  11. #     http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing, software
  14. # distributed under the License is distributed on an "AS IS" BASIS,
  15. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  16. # See the License for the specific language governing permissions and
  17. # limitations under the License.
  18. import re
  19. import sys
  20. pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')
  21. counterPat = re.compile('(?P<name>[^:]+):(?P<value>[^,]*),?')
  22. def parse(tail):
  23.   result = {}
  24.   for n,v in re.findall(pat, tail):
  25.     result[n] = v
  26.   return result
  27. mapStartTime = {}
  28. mapEndTime = {}
  29. reduceStartTime = {}
  30. reduceShuffleTime = {}
  31. reduceSortTime = {}
  32. reduceEndTime = {}
  33. reduceBytes = {}
  34. for line in sys.stdin:
  35.   words = line.split(" ",1)
  36.   event = words[0]
  37.   attrs = parse(words[1])
  38.   if event == 'MapAttempt':
  39.     if attrs.has_key("START_TIME"):
  40.       mapStartTime[attrs["TASKID"]] = int(attrs["START_TIME"])/1000
  41.     elif attrs.has_key("FINISH_TIME"):
  42.       mapEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
  43.   elif event == 'ReduceAttempt':
  44.     if attrs.has_key("START_TIME"):
  45.       reduceStartTime[attrs["TASKID"]] = int(attrs["START_TIME"]) / 1000
  46.     elif attrs.has_key("FINISH_TIME"):
  47.       reduceShuffleTime[attrs["TASKID"]] = int(attrs["SHUFFLE_FINISHED"])/1000
  48.       reduceSortTime[attrs["TASKID"]] = int(attrs["SORT_FINISHED"])/1000
  49.       reduceEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
  50.   elif event == 'Task':
  51.     if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"):
  52.       for n,v in re.findall(counterPat, attrs["COUNTERS"]):
  53.         if n == "File Systems.HDFS bytes written":
  54.           reduceBytes[attrs["TASKID"]] = int(v)
  55. runningMaps = {}
  56. shufflingReduces = {}
  57. sortingReduces = {}
  58. runningReduces = {}
  59. startTime = min(reduce(min, mapStartTime.values()),
  60.                 reduce(min, reduceStartTime.values()))
  61. endTime = max(reduce(max, mapEndTime.values()),
  62.               reduce(max, reduceEndTime.values()))
  63. reduces = reduceBytes.keys()
  64. reduces.sort()
  65. print "Name reduce-output-bytes shuffle-finish reduce-finish"
  66. for r in reduces:
  67.   print r, reduceBytes[r], reduceShuffleTime[r] - startTime,
  68.   print reduceEndTime[r] - startTime
  69. print
  70. for t in range(startTime, endTime):
  71.   runningMaps[t] = 0
  72.   shufflingReduces[t] = 0
  73.   sortingReduces[t] = 0
  74.   runningReduces[t] = 0
  75. for map in mapStartTime.keys():
  76.   for t in range(mapStartTime[map], mapEndTime[map]):
  77.     runningMaps[t] += 1
  78. for reduce in reduceStartTime.keys():
  79.   for t in range(reduceStartTime[reduce], reduceShuffleTime[reduce]):
  80.     shufflingReduces[t] += 1
  81.   for t in range(reduceShuffleTime[reduce], reduceSortTime[reduce]):
  82.     sortingReduces[t] += 1
  83.   for t in range(reduceSortTime[reduce], reduceEndTime[reduce]):
  84.     runningReduces[t] += 1
  85. print "time maps shuffle merge reduce"
  86. for t in range(startTime, endTime):
  87.   print t - startTime, runningMaps[t], shufflingReduces[t], sortingReduces[t], 
  88.   print runningReduces[t]