JobTrackerMetricsInst.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.metrics.MetricsContext;
  20. import org.apache.hadoop.metrics.MetricsRecord;
  21. import org.apache.hadoop.metrics.MetricsUtil;
  22. import org.apache.hadoop.metrics.Updater;
  23. import org.apache.hadoop.metrics.jvm.JvmMetrics;
  24. import org.apache.hadoop.metrics.util.MetricsRegistry;
  25. import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
  26. class JobTrackerMetricsInst extends JobTrackerInstrumentation implements Updater {
  27.   private final MetricsRecord metricsRecord;
  28.   private int numMapTasksLaunched = 0;
  29.   private int numMapTasksCompleted = 0;
  30.   private int numMapTasksFailed = 0;
  31.   private int numReduceTasksLaunched = 0;
  32.   private int numReduceTasksCompleted = 0;
  33.   private int numReduceTasksFailed = 0;
  34.   private int numJobsSubmitted = 0;
  35.   private int numJobsCompleted = 0;
  36.   private int numWaitingTasks = 0;
  37.     
  38.   public JobTrackerMetricsInst(JobTracker tracker, JobConf conf) {
  39.     super(tracker, conf);
  40.     String sessionId = conf.getSessionId();
  41.     // Initiate JVM Metrics
  42.     JvmMetrics.init("JobTracker", sessionId);
  43.     // Create a record for map-reduce metrics
  44.     MetricsContext context = MetricsUtil.getContext("mapred");
  45.     metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
  46.     metricsRecord.setTag("sessionId", sessionId);
  47.     context.registerUpdater(this);
  48.   }
  49.     
  50.   /**
  51.    * Since this object is a registered updater, this method will be called
  52.    * periodically, e.g. every 5 seconds.
  53.    */
  54.   public void doUpdates(MetricsContext unused) {
  55.     synchronized (this) {
  56.       metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
  57.       metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
  58.       metricsRecord.incrMetric("maps_failed", numMapTasksFailed);
  59.       metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
  60.       metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
  61.       metricsRecord.incrMetric("reduces_failed", numReduceTasksFailed);
  62.       metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
  63.       metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
  64.       metricsRecord.incrMetric("waiting_tasks", numWaitingTasks);
  65.       numMapTasksLaunched = 0;
  66.       numMapTasksCompleted = 0;
  67.       numMapTasksFailed = 0;
  68.       numReduceTasksLaunched = 0;
  69.       numReduceTasksCompleted = 0;
  70.       numReduceTasksFailed = 0;
  71.       numWaitingTasks = 0;
  72.       numJobsSubmitted = 0;
  73.       numJobsCompleted = 0;
  74.     }
  75.     metricsRecord.update();
  76.     if (tracker != null) {
  77.       for (JobInProgress jip : tracker.getRunningJobs()) {
  78.         jip.updateMetrics();
  79.       }
  80.     }
  81.   }
  82.   @Override
  83.   public synchronized void launchMap(TaskAttemptID taskAttemptID) {
  84.     ++numMapTasksLaunched;
  85.     decWaiting(taskAttemptID.getJobID(), 1);
  86.   }
  87.   @Override
  88.   public synchronized void completeMap(TaskAttemptID taskAttemptID) {
  89.     ++numMapTasksCompleted;
  90.   }
  91.   @Override
  92.   public synchronized void failedMap(TaskAttemptID taskAttemptID) {
  93.     ++numMapTasksFailed;
  94.     addWaiting(taskAttemptID.getJobID(), 1);
  95.   }
  96.   @Override
  97.   public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
  98.     ++numReduceTasksLaunched;
  99.     decWaiting(taskAttemptID.getJobID(), 1);
  100.   }
  101.   @Override
  102.   public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
  103.     ++numReduceTasksCompleted;
  104.   }
  105.   @Override
  106.   public synchronized void failedReduce(TaskAttemptID taskAttemptID) {
  107.     ++numReduceTasksFailed;
  108.     addWaiting(taskAttemptID.getJobID(), 1);
  109.   }
  110.   @Override
  111.   public synchronized void submitJob(JobConf conf, JobID id) {
  112.     ++numJobsSubmitted;
  113.   }
  114.   @Override
  115.   public synchronized void completeJob(JobConf conf, JobID id) {
  116.     ++numJobsCompleted;
  117.   }
  118.   @Override
  119.   public synchronized void addWaiting(JobID id, int tasks) {
  120.     numWaitingTasks += tasks;
  121.   }
  122.   @Override
  123.   public synchronized void decWaiting(JobID id, int tasks) {
  124.     numWaitingTasks -= tasks;
  125.   }
  126. }