DataJoinReducerBase.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.utils.join;
  19. import java.io.IOException;
  20. import java.util.Iterator;
  21. import java.util.SortedMap;
  22. import java.util.TreeMap;
  23. import org.apache.hadoop.io.Text;
  24. import org.apache.hadoop.io.WritableUtils;
  25. import org.apache.hadoop.mapred.JobConf;
  26. import org.apache.hadoop.mapred.OutputCollector;
  27. import org.apache.hadoop.mapred.Reporter;
  28. /**
  29.  * This abstract class serves as the base class for the reducer class of a data
  30.  * join job. The reduce function will first group the values according to their
  31.  * input tags, and then compute the cross product of over the groups. For each
  32.  * tuple in the cross product, it calls the following method, which is expected
  33.  * to be implemented in a subclass.
  34.  * 
  35.  * protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
  36.  * 
  37.  * The above method is expected to produce one output value from an array of
  38.  * records of different sources. The user code can also perform filtering here.
  39.  * It can return null if it decides to the records do not meet certain
  40.  * conditions.
  41.  * 
  42.  */
  43. public abstract class DataJoinReducerBase extends JobBase {
  44.   protected Reporter reporter = null;
  45.   private long maxNumOfValuesPerGroup = 100;
  46.   protected long largestNumOfValues = 0;
  47.   protected long numOfValues = 0;
  48.   protected long collected = 0;
  49.   protected JobConf job;
  50.   public void close() throws IOException {
  51.     if (this.reporter != null) {
  52.       this.reporter.setStatus(super.getReport());
  53.     }
  54.   }
  55.   public void configure(JobConf job) {
  56.     super.configure(job);
  57.     this.job = job;
  58.     this.maxNumOfValuesPerGroup = job.getLong("datajoin.maxNumOfValuesPerGroup", 100);
  59.   }
  60.   /**
  61.    * The subclass can provide a different implementation on ResetableIterator.
  62.    * This is necessary if the number of values in a reduce call is very high.
  63.    * 
  64.    * The default provided here uses ArrayListBackedIterator
  65.    * 
  66.    * @return an Object of ResetableIterator.
  67.    */
  68.   protected ResetableIterator createResetableIterator() {
  69.     return new ArrayListBackedIterator();
  70.   }
  71.   /**
  72.    * This is the function that re-groups values for a key into sub-groups based
  73.    * on a secondary key (input tag).
  74.    * 
  75.    * @param arg1
  76.    * @return
  77.    */
  78.   private SortedMap<Object, ResetableIterator> regroup(Object key,
  79.                                                        Iterator arg1, Reporter reporter) throws IOException {
  80.     this.numOfValues = 0;
  81.     SortedMap<Object, ResetableIterator> retv = new TreeMap<Object, ResetableIterator>();
  82.     TaggedMapOutput aRecord = null;
  83.     while (arg1.hasNext()) {
  84.       this.numOfValues += 1;
  85.       if (this.numOfValues % 100 == 0) {
  86.         reporter.setStatus("key: " + key.toString() + " numOfValues: "
  87.                            + this.numOfValues);
  88.       }
  89.       if (this.numOfValues > this.maxNumOfValuesPerGroup) {
  90.         continue;
  91.       }
  92.       aRecord = ((TaggedMapOutput) arg1.next()).clone(job);
  93.       Text tag = aRecord.getTag();
  94.       ResetableIterator data = retv.get(tag);
  95.       if (data == null) {
  96.         data = createResetableIterator();
  97.         retv.put(tag, data);
  98.       }
  99.       data.add(aRecord);
  100.     }
  101.     if (this.numOfValues > this.largestNumOfValues) {
  102.       this.largestNumOfValues = numOfValues;
  103.       LOG.info("key: " + key.toString() + " this.largestNumOfValues: "
  104.                + this.largestNumOfValues);
  105.     }
  106.     return retv;
  107.   }
  108.   public void reduce(Object key, Iterator values,
  109.                      OutputCollector output, Reporter reporter) throws IOException {
  110.     if (this.reporter == null) {
  111.       this.reporter = reporter;
  112.     }
  113.     SortedMap<Object, ResetableIterator> groups = regroup(key, values, reporter);
  114.     Object[] tags = groups.keySet().toArray();
  115.     ResetableIterator[] groupValues = new ResetableIterator[tags.length];
  116.     for (int i = 0; i < tags.length; i++) {
  117.       groupValues[i] = groups.get(tags[i]);
  118.     }
  119.     joinAndCollect(tags, groupValues, key, output, reporter);
  120.     addLongValue("groupCount", 1);
  121.     for (int i = 0; i < tags.length; i++) {
  122.       groupValues[i].close();
  123.     }
  124.   }
  125.   /**
  126.    * The subclass can overwrite this method to perform additional filtering
  127.    * and/or other processing logic before a value is collected.
  128.    * 
  129.    * @param key
  130.    * @param aRecord
  131.    * @param output
  132.    * @param reporter
  133.    * @throws IOException
  134.    */
  135.   protected void collect(Object key, TaggedMapOutput aRecord,
  136.                          OutputCollector output, Reporter reporter) throws IOException {
  137.     this.collected += 1;
  138.     addLongValue("collectedCount", 1);
  139.     if (aRecord != null) {
  140.       output.collect(key, aRecord.getData());
  141.       reporter.setStatus("key: " + key.toString() + " collected: " + collected);
  142.       addLongValue("actuallyCollectedCount", 1);
  143.     }
  144.   }
  145.   /**
  146.    * join the list of the value lists, and collect the results.
  147.    * 
  148.    * @param tags
  149.    *          a list of input tags
  150.    * @param values
  151.    *          a list of value lists, each corresponding to one input source
  152.    * @param key
  153.    * @param output
  154.    * @throws IOException
  155.    */
  156.   private void joinAndCollect(Object[] tags, ResetableIterator[] values,
  157.                               Object key, OutputCollector output, Reporter reporter)
  158.     throws IOException {
  159.     if (values.length < 1) {
  160.       return;
  161.     }
  162.     Object[] partialList = new Object[values.length];
  163.     joinAndCollect(tags, values, 0, partialList, key, output, reporter);
  164.   }
  165.   /**
  166.    * Perform the actual join recursively.
  167.    * 
  168.    * @param tags
  169.    *          a list of input tags
  170.    * @param values
  171.    *          a list of value lists, each corresponding to one input source
  172.    * @param pos
  173.    *          indicating the next value list to be joined
  174.    * @param partialList
  175.    *          a list of values, each from one value list considered so far.
  176.    * @param key
  177.    * @param output
  178.    * @throws IOException
  179.    */
  180.   private void joinAndCollect(Object[] tags, ResetableIterator[] values,
  181.                               int pos, Object[] partialList, Object key,
  182.                               OutputCollector output, Reporter reporter) throws IOException {
  183.     if (values.length == pos) {
  184.       // get a value from each source. Combine them
  185.       TaggedMapOutput combined = combine(tags, partialList);
  186.       collect(key, combined, output, reporter);
  187.       return;
  188.     }
  189.     ResetableIterator nextValues = values[pos];
  190.     nextValues.reset();
  191.     while (nextValues.hasNext()) {
  192.       Object v = nextValues.next();
  193.       partialList[pos] = v;
  194.       joinAndCollect(tags, values, pos + 1, partialList, key, output, reporter);
  195.     }
  196.   }
  197.   public static Text SOURCE_TAGS_FIELD = new Text("SOURCE_TAGS");
  198.   public static Text NUM_OF_VALUES_FIELD = new Text("NUM_OF_VALUES");
  199.   /**
  200.    * 
  201.    * @param tags
  202.    *          a list of source tags
  203.    * @param values
  204.    *          a value per source
  205.    * @return combined value derived from values of the sources
  206.    */
  207.   protected abstract TaggedMapOutput combine(Object[] tags, Object[] values);
  208.   public void map(Object arg0, Object arg1, OutputCollector arg2,
  209.                   Reporter arg3) throws IOException {
  210.     // TODO Auto-generated method stub
  211.   }
  212. }