CompositeContext.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.metrics.spi;
  19. import java.io.IOException;
  20. import java.lang.reflect.InvocationHandler;
  21. import java.lang.reflect.Method;
  22. import java.lang.reflect.Proxy;
  23. import java.util.ArrayList;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.metrics.ContextFactory;
  27. import org.apache.hadoop.metrics.MetricsContext;
  28. import org.apache.hadoop.metrics.MetricsException;
  29. import org.apache.hadoop.metrics.MetricsRecord;
  30. import org.apache.hadoop.metrics.MetricsUtil;
  31. import org.apache.hadoop.metrics.Updater;
  32. public class CompositeContext extends AbstractMetricsContext {
  33.   private static final Log LOG = LogFactory.getLog(CompositeContext.class);
  34.   private static final String ARITY_LABEL = "arity";
  35.   private static final String SUB_FMT = "%s.sub%d";
  36.   private final ArrayList<MetricsContext> subctxt =
  37.     new ArrayList<MetricsContext>();
  38.   public CompositeContext() {
  39.   }
  40.   public void init(String contextName, ContextFactory factory) {
  41.     super.init(contextName, factory);
  42.     int nKids;
  43.     try {
  44.       String sKids = getAttribute(ARITY_LABEL);
  45.       nKids = Integer.valueOf(sKids);
  46.     } catch (Exception e) {
  47.       LOG.error("Unable to initialize composite metric " + contextName +
  48.                 ": could not init arity", e);
  49.       return;
  50.     }
  51.     for (int i = 0; i < nKids; ++i) {
  52.       MetricsContext ctxt = MetricsUtil.getContext(
  53.           String.format(SUB_FMT, contextName, i), contextName);
  54.       if (null != ctxt) {
  55.         subctxt.add(ctxt);
  56.       }
  57.     }
  58.   }
  59.   @Override
  60.   public MetricsRecord newRecord(String recordName) {
  61.     return (MetricsRecord) Proxy.newProxyInstance(
  62.         MetricsRecord.class.getClassLoader(),
  63.         new Class[] { MetricsRecord.class },
  64.         new MetricsRecordDelegator(recordName, subctxt));
  65.   }
  66.   @Override
  67.   protected void emitRecord(String contextName, String recordName,
  68.       OutputRecord outRec) throws IOException {
  69.     for (MetricsContext ctxt : subctxt) {
  70.       try {
  71.         ((AbstractMetricsContext)ctxt).emitRecord(
  72.           contextName, recordName, outRec);
  73.         if (contextName == null || recordName == null || outRec == null) {
  74.           throw new IOException(contextName + ":" + recordName + ":" + outRec);
  75.         }
  76.       } catch (IOException e) {
  77.         LOG.warn("emitRecord failed: " + ctxt.getContextName(), e);
  78.       }
  79.     }
  80.   }
  81.   @Override
  82.   protected void flush() throws IOException {
  83.     for (MetricsContext ctxt : subctxt) {
  84.       try {
  85.         ((AbstractMetricsContext)ctxt).flush();
  86.       } catch (IOException e) {
  87.         LOG.warn("flush failed: " + ctxt.getContextName(), e);
  88.       }
  89.     }
  90.   }
  91.   @Override
  92.   public void startMonitoring() throws IOException {
  93.     for (MetricsContext ctxt : subctxt) {
  94.       try {
  95.         ctxt.startMonitoring();
  96.       } catch (IOException e) {
  97.         LOG.warn("startMonitoring failed: " + ctxt.getContextName(), e);
  98.       }
  99.     }
  100.   }
  101.   @Override
  102.   public void stopMonitoring() {
  103.     for (MetricsContext ctxt : subctxt) {
  104.       ctxt.stopMonitoring();
  105.     }
  106.   }
  107.   /**
  108.    * Return true if all subcontexts are monitoring.
  109.    */
  110.   @Override
  111.   public boolean isMonitoring() {
  112.     boolean ret = true;
  113.     for (MetricsContext ctxt : subctxt) {
  114.       ret &= ctxt.isMonitoring();
  115.     }
  116.     return ret;
  117.   }
  118.   @Override
  119.   public void close() {
  120.     for (MetricsContext ctxt : subctxt) {
  121.       ctxt.close();
  122.     }
  123.   }
  124.   @Override
  125.   public void registerUpdater(Updater updater) {
  126.     for (MetricsContext ctxt : subctxt) {
  127.       ctxt.registerUpdater(updater);
  128.     }
  129.   }
  130.   @Override
  131.   public void unregisterUpdater(Updater updater) {
  132.     for (MetricsContext ctxt : subctxt) {
  133.       ctxt.unregisterUpdater(updater);
  134.     }
  135.   }
  136.   private static class MetricsRecordDelegator implements InvocationHandler {
  137.     private static final Method m_getRecordName = initMethod();
  138.     private static Method initMethod() {
  139.       try {
  140.         return MetricsRecord.class.getMethod("getRecordName", new Class[0]);
  141.       } catch (Exception e) {
  142.         throw new RuntimeException("Internal error", e);
  143.       }
  144.     }
  145.     private final String recordName;
  146.     private final ArrayList<MetricsRecord> subrecs;
  147.     MetricsRecordDelegator(String recordName, ArrayList<MetricsContext> ctxts) {
  148.       this.recordName = recordName;
  149.       this.subrecs = new ArrayList<MetricsRecord>(ctxts.size());
  150.       for (MetricsContext ctxt : ctxts) {
  151.         subrecs.add(ctxt.createRecord(recordName));
  152.       }
  153.     }
  154.     public Object invoke(Object p, Method m, Object[] args) throws Throwable {
  155.       if (m_getRecordName.equals(m)) {
  156.         return recordName;
  157.       }
  158.       assert Void.TYPE.equals(m.getReturnType());
  159.       for (MetricsRecord rec : subrecs) {
  160.         m.invoke(rec, args);
  161.       }
  162.       return null;
  163.     }
  164.   }
  165. }