AbstractMetricsContext.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:13k
源码类别:

网格计算

开发平台:

Java

  1. /*
  2.  * AbstractMetricsContext.java
  3.  *
  4.  * Licensed to the Apache Software Foundation (ASF) under one
  5.  * or more contributor license agreements.  See the NOTICE file
  6.  * distributed with this work for additional information
  7.  * regarding copyright ownership.  The ASF licenses this file
  8.  * to you under the Apache License, Version 2.0 (the
  9.  * "License"); you may not use this file except in compliance
  10.  * with the License.  You may obtain a copy of the License at
  11.  *
  12.  *     http://www.apache.org/licenses/LICENSE-2.0
  13.  *
  14.  * Unless required by applicable law or agreed to in writing, software
  15.  * distributed under the License is distributed on an "AS IS" BASIS,
  16.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  17.  * See the License for the specific language governing permissions and
  18.  * limitations under the License.
  19.  */
  20. package org.apache.hadoop.metrics.spi;
  21. import java.io.IOException;
  22. import java.util.ArrayList;
  23. import java.util.Collection;
  24. import java.util.HashMap;
  25. import java.util.HashSet;
  26. import java.util.Iterator;
  27. import java.util.Map;
  28. import java.util.Set;
  29. import java.util.Timer;
  30. import java.util.TimerTask;
  31. import java.util.TreeMap;
  32. import java.util.Map.Entry;
  33. import org.apache.hadoop.metrics.ContextFactory;
  34. import org.apache.hadoop.metrics.MetricsContext;
  35. import org.apache.hadoop.metrics.MetricsException;
  36. import org.apache.hadoop.metrics.MetricsRecord;
  37. import org.apache.hadoop.metrics.Updater;
  38. /**
  39.  * The main class of the Service Provider Interface.  This class should be
  40.  * extended in order to integrate the Metrics API with a specific metrics
  41.  * client library. <p/>
  42.  *
  43.  * This class implements the internal table of metric data, and the timer
  44.  * on which data is to be sent to the metrics system.  Subclasses must
  45.  * override the abstract <code>emitRecord</code> method in order to transmit
  46.  * the data. <p/>
  47.  */
  48. public abstract class AbstractMetricsContext implements MetricsContext {
  49.     
  50.   private int period = MetricsContext.DEFAULT_PERIOD;
  51.   private Timer timer = null;
  52.     
  53.   private Set<Updater> updaters = new HashSet<Updater>(1);
  54.   private volatile boolean isMonitoring = false;
  55.     
  56.   private ContextFactory factory = null;
  57.   private String contextName = null;
  58.     
  59.   static class TagMap extends TreeMap<String,Object> {
  60.     private static final long serialVersionUID = 3546309335061952993L;
  61.     TagMap() {
  62.       super();
  63.     }
  64.     TagMap(TagMap orig) {
  65.       super(orig);
  66.     }
  67.     /**
  68.      * Returns true if this tagmap contains every tag in other.
  69.      */
  70.     public boolean containsAll(TagMap other) {
  71.       for (Map.Entry<String,Object> entry : other.entrySet()) {
  72.         Object value = get(entry.getKey());
  73.         if (value == null || !value.equals(entry.getValue())) {
  74.           // either key does not exist here, or the value is different
  75.           return false;
  76.         }
  77.       }
  78.       return true;
  79.     }
  80.   }
  81.   
  82.   static class MetricMap extends TreeMap<String,Number> {
  83.     private static final long serialVersionUID = -7495051861141631609L;
  84.   }
  85.             
  86.   static class RecordMap extends HashMap<TagMap,MetricMap> {
  87.     private static final long serialVersionUID = 259835619700264611L;
  88.   }
  89.     
  90.   private Map<String,RecordMap> bufferedData = new HashMap<String,RecordMap>();
  91.     
  92.   /**
  93.    * Creates a new instance of AbstractMetricsContext
  94.    */
  95.   protected AbstractMetricsContext() {
  96.   }
  97.     
  98.   /**
  99.    * Initializes the context.
  100.    */
  101.   public void init(String contextName, ContextFactory factory) 
  102.   {
  103.     this.contextName = contextName;
  104.     this.factory = factory;
  105.   }
  106.     
  107.   /**
  108.    * Convenience method for subclasses to access factory attributes.
  109.    */
  110.   protected String getAttribute(String attributeName) {
  111.     String factoryAttribute = contextName + "." + attributeName;
  112.     return (String) factory.getAttribute(factoryAttribute);  
  113.   }
  114.     
  115.   /**
  116.    * Returns an attribute-value map derived from the factory attributes
  117.    * by finding all factory attributes that begin with 
  118.    * <i>contextName</i>.<i>tableName</i>.  The returned map consists of
  119.    * those attributes with the contextName and tableName stripped off.
  120.    */
  121.   protected Map<String,String> getAttributeTable(String tableName) {
  122.     String prefix = contextName + "." + tableName + ".";
  123.     Map<String,String> result = new HashMap<String,String>();
  124.     for (String attributeName : factory.getAttributeNames()) {
  125.       if (attributeName.startsWith(prefix)) {
  126.         String name = attributeName.substring(prefix.length());
  127.         String value = (String) factory.getAttribute(attributeName);
  128.         result.put(name, value);
  129.       }
  130.     }
  131.     return result;
  132.   }
  133.     
  134.   /**
  135.    * Returns the context name.
  136.    */
  137.   public String getContextName() {
  138.     return contextName;
  139.   }
  140.     
  141.   /**
  142.    * Returns the factory by which this context was created.
  143.    */
  144.   public ContextFactory getContextFactory() {
  145.     return factory;
  146.   }
  147.     
  148.   /**
  149.    * Starts or restarts monitoring, the emitting of metrics records.
  150.    */
  151.   public synchronized void startMonitoring()
  152.     throws IOException {
  153.     if (!isMonitoring) {
  154.       startTimer();
  155.       isMonitoring = true;
  156.     }
  157.   }
  158.     
  159.   /**
  160.    * Stops monitoring.  This does not free buffered data. 
  161.    * @see #close()
  162.    */
  163.   public synchronized void stopMonitoring() {
  164.     if (isMonitoring) {
  165.       stopTimer();
  166.       isMonitoring = false;
  167.     }
  168.   }
  169.     
  170.   /**
  171.    * Returns true if monitoring is currently in progress.
  172.    */
  173.   public boolean isMonitoring() {
  174.     return isMonitoring;
  175.   }
  176.     
  177.   /**
  178.    * Stops monitoring and frees buffered data, returning this
  179.    * object to its initial state.  
  180.    */
  181.   public synchronized void close() {
  182.     stopMonitoring();
  183.     clearUpdaters();
  184.   } 
  185.     
  186.   /**
  187.    * Creates a new AbstractMetricsRecord instance with the given <code>recordName</code>.
  188.    * Throws an exception if the metrics implementation is configured with a fixed
  189.    * set of record names and <code>recordName</code> is not in that set.
  190.    * 
  191.    * @param recordName the name of the record
  192.    * @throws MetricsException if recordName conflicts with configuration data
  193.    */
  194.   public final synchronized MetricsRecord createRecord(String recordName) {
  195.     if (bufferedData.get(recordName) == null) {
  196.       bufferedData.put(recordName, new RecordMap());
  197.     }
  198.     return newRecord(recordName);
  199.   }
  200.     
  201.   /**
  202.    * Subclasses should override this if they subclass MetricsRecordImpl.
  203.    * @param recordName the name of the record
  204.    * @return newly created instance of MetricsRecordImpl or subclass
  205.    */
  206.   protected MetricsRecord newRecord(String recordName) {
  207.     return new MetricsRecordImpl(recordName, this);
  208.   }
  209.     
  210.   /**
  211.    * Registers a callback to be called at time intervals determined by
  212.    * the configuration.
  213.    *
  214.    * @param updater object to be run periodically; it should update
  215.    * some metrics records 
  216.    */
  217.   public synchronized void registerUpdater(final Updater updater) {
  218.     if (!updaters.contains(updater)) {
  219.       updaters.add(updater);
  220.     }
  221.   }
  222.     
  223.   /**
  224.    * Removes a callback, if it exists.
  225.    *
  226.    * @param updater object to be removed from the callback list
  227.    */
  228.   public synchronized void unregisterUpdater(Updater updater) {
  229.     updaters.remove(updater);
  230.   }
  231.     
  232.   private synchronized void clearUpdaters() {
  233.     updaters.clear();
  234.   }
  235.     
  236.   /**
  237.    * Starts timer if it is not already started
  238.    */
  239.   private synchronized void startTimer() {
  240.     if (timer == null) {
  241.       timer = new Timer("Timer thread for monitoring " + getContextName(), 
  242.                         true);
  243.       TimerTask task = new TimerTask() {
  244.           public void run() {
  245.             try {
  246.               timerEvent();
  247.             }
  248.             catch (IOException ioe) {
  249.               ioe.printStackTrace();
  250.             }
  251.           }
  252.         };
  253.       long millis = period * 1000;
  254.       timer.scheduleAtFixedRate(task, millis, millis);
  255.     }
  256.   }
  257.     
  258.   /**
  259.    * Stops timer if it is running
  260.    */
  261.   private synchronized void stopTimer() {
  262.     if (timer != null) {
  263.       timer.cancel();
  264.       timer = null;
  265.     }
  266.   }
  267.     
  268.   /**
  269.    * Timer callback.
  270.    */
  271.   private void timerEvent() throws IOException {
  272.     if (isMonitoring) {
  273.       Collection<Updater> myUpdaters;
  274.       synchronized (this) {
  275.         myUpdaters = new ArrayList<Updater>(updaters);
  276.       }     
  277.       // Run all the registered updates without holding a lock
  278.       // on this context
  279.       for (Updater updater : myUpdaters) {
  280.         try {
  281.           updater.doUpdates(this);
  282.         }
  283.         catch (Throwable throwable) {
  284.           throwable.printStackTrace();
  285.         }
  286.       }
  287.       emitRecords();
  288.     }
  289.   }
  290.     
  291.   /**
  292.    *  Emits the records.
  293.    */
  294.   private synchronized void emitRecords() throws IOException {
  295.     for (String recordName : bufferedData.keySet()) {
  296.       RecordMap recordMap = bufferedData.get(recordName);
  297.       synchronized (recordMap) {
  298.         Set<Entry<TagMap, MetricMap>> entrySet = recordMap.entrySet ();
  299.         for (Entry<TagMap, MetricMap> entry : entrySet) {
  300.           OutputRecord outRec = new OutputRecord(entry.getKey(), entry.getValue());
  301.           emitRecord(contextName, recordName, outRec);
  302.         }
  303.       }
  304.     }
  305.     flush();
  306.   }
  307.   /**
  308.    * Sends a record to the metrics system.
  309.    */
  310.   protected abstract void emitRecord(String contextName, String recordName, 
  311.                                      OutputRecord outRec) throws IOException;
  312.     
  313.   /**
  314.    * Called each period after all records have been emitted, this method does nothing.
  315.    * Subclasses may override it in order to perform some kind of flush.
  316.    */
  317.   protected void flush() throws IOException {
  318.   }
  319.     
  320.   /**
  321.    * Called by MetricsRecordImpl.update().  Creates or updates a row in
  322.    * the internal table of metric data.
  323.    */
  324.   protected void update(MetricsRecordImpl record) {
  325.     String recordName = record.getRecordName();
  326.     TagMap tagTable = record.getTagTable();
  327.     Map<String,MetricValue> metricUpdates = record.getMetricTable();
  328.         
  329.     RecordMap recordMap = getRecordMap(recordName);
  330.     synchronized (recordMap) {
  331.       MetricMap metricMap = recordMap.get(tagTable);
  332.       if (metricMap == null) {
  333.         metricMap = new MetricMap();
  334.         TagMap tagMap = new TagMap(tagTable); // clone tags
  335.         recordMap.put(tagMap, metricMap);
  336.       }
  337.       Set<Entry<String, MetricValue>> entrySet = metricUpdates.entrySet();
  338.       for (Entry<String, MetricValue> entry : entrySet) {
  339.         String metricName = entry.getKey ();
  340.         MetricValue updateValue = entry.getValue ();
  341.         Number updateNumber = updateValue.getNumber();
  342.         Number currentNumber = metricMap.get(metricName);
  343.         if (currentNumber == null || updateValue.isAbsolute()) {
  344.           metricMap.put(metricName, updateNumber);
  345.         }
  346.         else {
  347.           Number newNumber = sum(updateNumber, currentNumber);
  348.           metricMap.put(metricName, newNumber);
  349.         }
  350.       }
  351.     }
  352.   }
  353.     
  354.   private synchronized RecordMap getRecordMap(String recordName) {
  355.     return bufferedData.get(recordName);
  356.   }
  357.     
  358.   /**
  359.    * Adds two numbers, coercing the second to the type of the first.
  360.    *
  361.    */
  362.   private Number sum(Number a, Number b) {
  363.     if (a instanceof Integer) {
  364.       return Integer.valueOf(a.intValue() + b.intValue());
  365.     }
  366.     else if (a instanceof Float) {
  367.       return new Float(a.floatValue() + b.floatValue());
  368.     }
  369.     else if (a instanceof Short) {
  370.       return Short.valueOf((short)(a.shortValue() + b.shortValue()));
  371.     }
  372.     else if (a instanceof Byte) {
  373.       return Byte.valueOf((byte)(a.byteValue() + b.byteValue()));
  374.     }
  375.     else if (a instanceof Long) {
  376.       return Long.valueOf((a.longValue() + b.longValue()));
  377.     }
  378.     else {
  379.       // should never happen
  380.       throw new MetricsException("Invalid number type");
  381.     }
  382.             
  383.   }
  384.     
  385.   /**
  386.    * Called by MetricsRecordImpl.remove().  Removes all matching rows in
  387.    * the internal table of metric data.  A row matches if it has the same
  388.    * tag names and values as record, but it may also have additional
  389.    * tags.
  390.    */    
  391.   protected void remove(MetricsRecordImpl record) {
  392.     String recordName = record.getRecordName();
  393.     TagMap tagTable = record.getTagTable();
  394.         
  395.     RecordMap recordMap = getRecordMap(recordName);
  396.     synchronized (recordMap) {
  397.       Iterator<TagMap> it = recordMap.keySet().iterator();
  398.       while (it.hasNext()) {
  399.         TagMap rowTags = it.next();
  400.         if (rowTags.containsAll(tagTable)) {
  401.           it.remove();
  402.         }
  403.       }
  404.     }
  405.   }
  406.     
  407.   /**
  408.    * Returns the timer period.
  409.    */
  410.   public int getPeriod() {
  411.     return period;
  412.   }
  413.     
  414.   /**
  415.    * Sets the timer period
  416.    */
  417.   protected void setPeriod(int period) {
  418.     this.period = period;
  419.   }
  420. }