JobBase.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.utils.join;
  19. import java.util.SortedMap;
  20. import java.util.TreeMap;
  21. import java.util.Map.Entry;
  22. import java.util.Iterator;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.mapred.JobConf;
  26. import org.apache.hadoop.mapred.Mapper;
  27. import org.apache.hadoop.mapred.Reducer;
  28. /**
  29.  * A common base implementing some statics collecting mechanisms that are
  30.  * commonly used in a typical map/reduce job.
  31.  * 
  32.  */
  33. public abstract class JobBase implements Mapper, Reducer {
  34.   public static final Log LOG = LogFactory.getLog("datajoin.job");
  35.   private SortedMap<Object, Long> longCounters = null;
  36.   private SortedMap<Object, Double> doubleCounters = null;
  37.   /**
  38.    * Set the given counter to the given value
  39.    * 
  40.    * @param name
  41.    *          the counter name
  42.    * @param value
  43.    *          the value for the counter
  44.    */
  45.   protected void setLongValue(Object name, long value) {
  46.     this.longCounters.put(name, new Long(value));
  47.   }
  48.   /**
  49.    * Set the given counter to the given value
  50.    * 
  51.    * @param name
  52.    *          the counter name
  53.    * @param value
  54.    *          the value for the counter
  55.    */
  56.   protected void setDoubleValue(Object name, double value) {
  57.     this.doubleCounters.put(name, new Double(value));
  58.   }
  59.   /**
  60.    * 
  61.    * @param name
  62.    *          the counter name
  63.    * @return return the value of the given counter.
  64.    */
  65.   protected Long getLongValue(Object name) {
  66.     return this.longCounters.get(name);
  67.   }
  68.   /**
  69.    * 
  70.    * @param name
  71.    *          the counter name
  72.    * @return return the value of the given counter.
  73.    */
  74.   protected Double getDoubleValue(Object name) {
  75.     return this.doubleCounters.get(name);
  76.   }
  77.   /**
  78.    * Increment the given counter by the given incremental value If the counter
  79.    * does not exist, one is created with value 0.
  80.    * 
  81.    * @param name
  82.    *          the counter name
  83.    * @param inc
  84.    *          the incremental value
  85.    * @return the updated value.
  86.    */
  87.   protected Long addLongValue(Object name, long inc) {
  88.     Long val = this.longCounters.get(name);
  89.     Long retv = null;
  90.     if (val == null) {
  91.       retv = new Long(inc);
  92.     } else {
  93.       retv = new Long(val.longValue() + inc);
  94.     }
  95.     this.longCounters.put(name, retv);
  96.     return retv;
  97.   }
  98.   /**
  99.    * Increment the given counter by the given incremental value If the counter
  100.    * does not exist, one is created with value 0.
  101.    * 
  102.    * @param name
  103.    *          the counter name
  104.    * @param inc
  105.    *          the incremental value
  106.    * @return the updated value.
  107.    */
  108.   protected Double addDoubleValue(Object name, double inc) {
  109.     Double val = this.doubleCounters.get(name);
  110.     Double retv = null;
  111.     if (val == null) {
  112.       retv = new Double(inc);
  113.     } else {
  114.       retv = new Double(val.doubleValue() + inc);
  115.     }
  116.     this.doubleCounters.put(name, retv);
  117.     return retv;
  118.   }
  119.   /**
  120.    * log the counters
  121.    * 
  122.    */
  123.   protected void report() {
  124.     LOG.info(getReport());
  125.   }
  126.   /**
  127.    * log the counters
  128.    * 
  129.    */
  130.   protected String getReport() {
  131.     StringBuffer sb = new StringBuffer();
  132.     Iterator iter = this.longCounters.entrySet().iterator();
  133.     while (iter.hasNext()) {
  134.       Entry e = (Entry) iter.next();
  135.       sb.append(e.getKey().toString()).append("t").append(e.getValue())
  136.         .append("n");
  137.     }
  138.     iter = this.doubleCounters.entrySet().iterator();
  139.     while (iter.hasNext()) {
  140.       Entry e = (Entry) iter.next();
  141.       sb.append(e.getKey().toString()).append("t").append(e.getValue())
  142.         .append("n");
  143.     }
  144.     return sb.toString();
  145.   }
  146.   /**
  147.    * Initializes a new instance from a {@link JobConf}.
  148.    * 
  149.    * @param job
  150.    *          the configuration
  151.    */
  152.   public void configure(JobConf job) {
  153.     this.longCounters = new TreeMap<Object, Long>();
  154.     this.doubleCounters = new TreeMap<Object, Double>();
  155.   }
  156. }