GangliaContext.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /*
  2.  * GangliaContext.java
  3.  *
  4.  * Licensed to the Apache Software Foundation (ASF) under one
  5.  * or more contributor license agreements.  See the NOTICE file
  6.  * distributed with this work for additional information
  7.  * regarding copyright ownership.  The ASF licenses this file
  8.  * to you under the Apache License, Version 2.0 (the
  9.  * "License"); you may not use this file except in compliance
  10.  * with the License.  You may obtain a copy of the License at
  11.  *
  12.  *     http://www.apache.org/licenses/LICENSE-2.0
  13.  *
  14.  * Unless required by applicable law or agreed to in writing, software
  15.  * distributed under the License is distributed on an "AS IS" BASIS,
  16.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  17.  * See the License for the specific language governing permissions and
  18.  * limitations under the License.
  19.  */
  20. package org.apache.hadoop.metrics.ganglia;
  21. import java.io.IOException;
  22. import java.net.DatagramPacket;
  23. import java.net.DatagramSocket;
  24. import java.net.SocketAddress;
  25. import java.net.SocketException;
  26. import java.util.HashMap;
  27. import java.util.List;
  28. import java.util.Map;
  29. import org.apache.commons.logging.Log;
  30. import org.apache.commons.logging.LogFactory;
  31. import org.apache.hadoop.metrics.ContextFactory;
  32. import org.apache.hadoop.metrics.MetricsException;
  33. import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
  34. import org.apache.hadoop.metrics.spi.OutputRecord;
  35. import org.apache.hadoop.metrics.spi.Util;
  36. /**
  37.  * Context for sending metrics to Ganglia.
  38.  * 
  39.  */
  40. public class GangliaContext extends AbstractMetricsContext {
  41.     
  42.   private static final String PERIOD_PROPERTY = "period";
  43.   private static final String SERVERS_PROPERTY = "servers";
  44.   private static final String UNITS_PROPERTY = "units";
  45.   private static final String SLOPE_PROPERTY = "slope";
  46.   private static final String TMAX_PROPERTY = "tmax";
  47.   private static final String DMAX_PROPERTY = "dmax";
  48.     
  49.   private static final String DEFAULT_UNITS = "";
  50.   private static final String DEFAULT_SLOPE = "both";
  51.   private static final int DEFAULT_TMAX = 60;
  52.   private static final int DEFAULT_DMAX = 0;
  53.   private static final int DEFAULT_PORT = 8649;
  54.   private static final int BUFFER_SIZE = 1500;       // as per libgmond.c
  55.   private final Log LOG = LogFactory.getLog(this.getClass());    
  56.   private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);
  57.     
  58.   static {
  59.     typeTable.put(String.class, "string");
  60.     typeTable.put(Byte.class, "int8");
  61.     typeTable.put(Short.class, "int16");
  62.     typeTable.put(Integer.class, "int32");
  63.     typeTable.put(Long.class, "float");
  64.     typeTable.put(Float.class, "float");
  65.   }
  66.     
  67.   private byte[] buffer = new byte[BUFFER_SIZE];
  68.   private int offset;
  69.     
  70.   private List<? extends SocketAddress> metricsServers;
  71.   private Map<String,String> unitsTable;
  72.   private Map<String,String> slopeTable;
  73.   private Map<String,String> tmaxTable;
  74.   private Map<String,String> dmaxTable;
  75.     
  76.   private DatagramSocket datagramSocket;
  77.     
  78.   /** Creates a new instance of GangliaContext */
  79.   public GangliaContext() {
  80.   }
  81.     
  82.   public void init(String contextName, ContextFactory factory) {
  83.     super.init(contextName, factory);
  84.         
  85.     String periodStr = getAttribute(PERIOD_PROPERTY);
  86.     if (periodStr != null) {
  87.       int period = 0;
  88.       try {
  89.         period = Integer.parseInt(periodStr);
  90.       } catch (NumberFormatException nfe) {
  91.       }
  92.       if (period <= 0) {
  93.         throw new MetricsException("Invalid period: " + periodStr);
  94.       }
  95.       setPeriod(period);
  96.     }
  97.         
  98.     metricsServers = 
  99.       Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT); 
  100.         
  101.     unitsTable = getAttributeTable(UNITS_PROPERTY);
  102.     slopeTable = getAttributeTable(SLOPE_PROPERTY);
  103.     tmaxTable  = getAttributeTable(TMAX_PROPERTY);
  104.     dmaxTable  = getAttributeTable(DMAX_PROPERTY);
  105.         
  106.     try {
  107.       datagramSocket = new DatagramSocket();
  108.     }
  109.     catch (SocketException se) {
  110.       se.printStackTrace();
  111.     }
  112.   }
  113.   public void emitRecord(String contextName, String recordName,
  114.     OutputRecord outRec) 
  115.   throws IOException {
  116.     // Setup so that the records have the proper leader names so they are
  117.     // unambiguous at the ganglia level, and this prevents a lot of rework
  118.     StringBuilder sb = new StringBuilder();
  119.     sb.append(contextName);
  120.     sb.append('.');
  121.     sb.append(recordName);
  122.     sb.append('.');
  123.     int sbBaseLen = sb.length();
  124.     // emit each metric in turn
  125.     for (String metricName : outRec.getMetricNames()) {
  126.       Object metric = outRec.getMetric(metricName);
  127.       String type = typeTable.get(metric.getClass());
  128.       if (type != null) {
  129.         sb.append(metricName);
  130.         emitMetric(sb.toString(), type, metric.toString());
  131.         sb.setLength(sbBaseLen);
  132.       } else {
  133.         LOG.warn("Unknown metrics type: " + metric.getClass());
  134.       }
  135.     }
  136.   }
  137.     
  138.   private void emitMetric(String name, String type,  String value) 
  139.   throws IOException {
  140.     String units = getUnits(name);
  141.     int slope = getSlope(name);
  142.     int tmax = getTmax(name);
  143.     int dmax = getDmax(name);
  144.         
  145.     offset = 0;
  146.     xdr_int(0);             // metric_user_defined
  147.     xdr_string(type);
  148.     xdr_string(name);
  149.     xdr_string(value);
  150.     xdr_string(units);
  151.     xdr_int(slope);
  152.     xdr_int(tmax);
  153.     xdr_int(dmax);
  154.         
  155.     for (SocketAddress socketAddress : metricsServers) {
  156.       DatagramPacket packet = 
  157.         new DatagramPacket(buffer, offset, socketAddress);
  158.       datagramSocket.send(packet);
  159.     }
  160.   }
  161.     
  162.   private String getUnits(String metricName) {
  163.     String result = unitsTable.get(metricName);
  164.     if (result == null) {
  165.       result = DEFAULT_UNITS;
  166.     }
  167.     return result;
  168.   }
  169.     
  170.   private int getSlope(String metricName) {
  171.     String slopeString = slopeTable.get(metricName);
  172.     if (slopeString == null) {
  173.       slopeString = DEFAULT_SLOPE; 
  174.     }
  175.     return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
  176.   }
  177.     
  178.   private int getTmax(String metricName) {
  179.     if (tmaxTable == null) {
  180.       return DEFAULT_TMAX;
  181.     }
  182.     String tmaxString = tmaxTable.get(metricName);
  183.     if (tmaxString == null) {
  184.       return DEFAULT_TMAX;
  185.     }
  186.     else {
  187.       return Integer.parseInt(tmaxString);
  188.     }
  189.   }
  190.     
  191.   private int getDmax(String metricName) {
  192.     String dmaxString = dmaxTable.get(metricName);
  193.     if (dmaxString == null) {
  194.       return DEFAULT_DMAX;
  195.     }
  196.     else {
  197.       return Integer.parseInt(dmaxString);
  198.     }
  199.   }
  200.     
  201.   /**
  202.    * Puts a string into the buffer by first writing the size of the string
  203.    * as an int, followed by the bytes of the string, padded if necessary to
  204.    * a multiple of 4.
  205.    */
  206.   private void xdr_string(String s) {
  207.     byte[] bytes = s.getBytes();
  208.     int len = bytes.length;
  209.     xdr_int(len);
  210.     System.arraycopy(bytes, 0, buffer, offset, len);
  211.     offset += len;
  212.     pad();
  213.   }
  214.   /**
  215.    * Pads the buffer with zero bytes up to the nearest multiple of 4.
  216.    */
  217.   private void pad() {
  218.     int newOffset = ((offset + 3) / 4) * 4;
  219.     while (offset < newOffset) {
  220.       buffer[offset++] = 0;
  221.     }
  222.   }
  223.         
  224.   /**
  225.    * Puts an integer into the buffer as 4 bytes, big-endian.
  226.    */
  227.   private void xdr_int(int i) {
  228.     buffer[offset++] = (byte)((i >> 24) & 0xff);
  229.     buffer[offset++] = (byte)((i >> 16) & 0xff);
  230.     buffer[offset++] = (byte)((i >> 8) & 0xff);
  231.     buffer[offset++] = (byte)(i & 0xff);
  232.   }
  233. }