Chain.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:21k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import org.apache.hadoop.io.DataOutputBuffer;
  20. import org.apache.hadoop.io.Stringifier;
  21. import org.apache.hadoop.io.DefaultStringifier;
  22. import org.apache.hadoop.io.serializer.Deserializer;
  23. import org.apache.hadoop.io.serializer.Serialization;
  24. import org.apache.hadoop.io.serializer.SerializationFactory;
  25. import org.apache.hadoop.io.serializer.Serializer;
  26. import org.apache.hadoop.mapred.*;
  27. import org.apache.hadoop.util.ReflectionUtils;
  28. import org.apache.hadoop.util.GenericsUtil;
  29. import java.io.ByteArrayInputStream;
  30. import java.io.IOException;
  31. import java.util.ArrayList;
  32. import java.util.List;
  33. import java.util.Map;
  34. /**
  35.  * The Chain class provides all the common functionality for the
  36.  * {@link ChainMapper} and the {@link ChainReducer} classes.
  37.  */
  38. class Chain {
  39.   private static final String CHAIN_MAPPER = "chain.mapper";
  40.   private static final String CHAIN_REDUCER = "chain.reducer";
  41.   private static final String CHAIN_MAPPER_SIZE = ".size";
  42.   private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
  43.   private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
  44.   private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
  45.   private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
  46.   private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
  47.   private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
  48.   private static final String MAPPER_INPUT_KEY_CLASS =
  49.     "chain.mapper.input.key.class";
  50.   private static final String MAPPER_INPUT_VALUE_CLASS =
  51.     "chain.mapper.input.value.class";
  52.   private static final String MAPPER_OUTPUT_KEY_CLASS =
  53.     "chain.mapper.output.key.class";
  54.   private static final String MAPPER_OUTPUT_VALUE_CLASS =
  55.     "chain.mapper.output.value.class";
  56.   private static final String REDUCER_INPUT_KEY_CLASS =
  57.     "chain.reducer.input.key.class";
  58.   private static final String REDUCER_INPUT_VALUE_CLASS =
  59.     "chain.reducer.input.value.class";
  60.   private static final String REDUCER_OUTPUT_KEY_CLASS =
  61.     "chain.reducer.output.key.class";
  62.   private static final String REDUCER_OUTPUT_VALUE_CLASS =
  63.     "chain.reducer.output.value.class";
  64.   private boolean isMap;
  65.   private JobConf chainJobConf;
  66.   private List<Mapper> mappers = new ArrayList<Mapper>();
  67.   private Reducer reducer;
  68.   // to cache the key/value output class serializations for each chain element
  69.   // to avoid everytime lookup.
  70.   private List<Serialization> mappersKeySerialization =
  71.     new ArrayList<Serialization>();
  72.   private List<Serialization> mappersValueSerialization =
  73.     new ArrayList<Serialization>();
  74.   private Serialization reducerKeySerialization;
  75.   private Serialization reducerValueSerialization;
  76.   /**
  77.    * Creates a Chain instance configured for a Mapper or a Reducer.
  78.    *
  79.    * @param isMap TRUE indicates the chain is for a Mapper, FALSE that is for a
  80.    *              Reducer.
  81.    */
  82.   Chain(boolean isMap) {
  83.     this.isMap = isMap;
  84.   }
  85.   /**
  86.    * Returns the prefix to use for the configuration of the chain depending
  87.    * if it is for a Mapper or a Reducer.
  88.    *
  89.    * @param isMap TRUE for Mapper, FALSE for Reducer.
  90.    * @return the prefix to use.
  91.    */
  92.   private static String getPrefix(boolean isMap) {
  93.     return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
  94.   }
  95.   /**
  96.    * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
  97.    * <p/>
  98.    * It creates a new JobConf using the chain job's JobConf as base and adds to
  99.    * it the configuration properties for the chain element. The keys of the
  100.    * chain element jobConf have precedence over the given JobConf.
  101.    *
  102.    * @param jobConf the chain job's JobConf.
  103.    * @param confKey the key for chain element configuration serialized in the
  104.    *                chain job's JobConf.
  105.    * @return a new JobConf aggregating the chain job's JobConf with the chain
  106.    *         element configuration properties.
  107.    */
  108.   private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
  109.     JobConf conf;
  110.     try {
  111.       Stringifier<JobConf> stringifier =
  112.         new DefaultStringifier<JobConf>(jobConf, JobConf.class);
  113.       conf = stringifier.fromString(jobConf.get(confKey, null));
  114.     } catch (IOException ioex) {
  115.       throw new RuntimeException(ioex);
  116.     }
  117.     // we have to do this because the Writable desearialization clears all
  118.     // values set in the conf making not possible do do a new JobConf(jobConf)
  119.     // in the creation of the conf above
  120.     jobConf = new JobConf(jobConf);
  121.     for(Map.Entry<String, String> entry : conf) {
  122.       jobConf.set(entry.getKey(), entry.getValue());
  123.     }
  124.     return jobConf;
  125.   }
  126.   /**
  127.    * Adds a Mapper class to the chain job's JobConf.
  128.    * <p/>
  129.    * The configuration properties of the chain job have precedence over the
  130.    * configuration properties of the Mapper.
  131.    *
  132.    * @param isMap            indicates if the Chain is for a Mapper or for a
  133.    * Reducer.
  134.    * @param jobConf              chain job's JobConf to add the Mapper class.
  135.    * @param klass            the Mapper class to add.
  136.    * @param inputKeyClass    mapper input key class.
  137.    * @param inputValueClass  mapper input value class.
  138.    * @param outputKeyClass   mapper output key class.
  139.    * @param outputValueClass mapper output value class.
  140.    * @param byValue          indicates if key/values should be passed by value
  141.    * to the next Mapper in the chain, if any.
  142.    * @param mapperConf       a JobConf with the configuration for the Mapper
  143.    * class. It is recommended to use a JobConf without default values using the
  144.    * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
  145.    */
  146.   public static <K1, V1, K2, V2> void addMapper(boolean isMap, JobConf jobConf,
  147.                            Class<? extends Mapper<K1, V1, K2, V2>> klass,
  148.                            Class<? extends K1> inputKeyClass,
  149.                            Class<? extends V1> inputValueClass,
  150.                            Class<? extends K2> outputKeyClass,
  151.                            Class<? extends V2> outputValueClass,
  152.                            boolean byValue, JobConf mapperConf) {
  153.     String prefix = getPrefix(isMap);
  154.     // if a reducer chain check the Reducer has been already set
  155.     if (!isMap) {
  156.       if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
  157.                            Reducer.class) == null) {
  158.         throw new IllegalStateException(
  159.           "A Mapper can be added to the chain only after the Reducer has " +
  160.           "been set");
  161.       }
  162.     }
  163.     int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
  164.     jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
  165.     // if it is a reducer chain and the first Mapper is being added check the
  166.     // key and value input classes of the mapper match those of the reducer
  167.     // output.
  168.     if (!isMap && index == 0) {
  169.       JobConf reducerConf =
  170.         getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
  171.       if (! inputKeyClass.isAssignableFrom(
  172.         reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
  173.         throw new IllegalArgumentException("The Reducer output key class does" +
  174.           " not match the Mapper input key class");
  175.       }
  176.       if (! inputValueClass.isAssignableFrom(
  177.         reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
  178.         throw new IllegalArgumentException("The Reducer output value class" +
  179.           " does not match the Mapper input value class");
  180.       }
  181.     } else if (index > 0) {
  182.       // check the that the new Mapper in the chain key and value input classes
  183.       // match those of the previous Mapper output.
  184.       JobConf previousMapperConf =
  185.         getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
  186.           (index - 1));
  187.       if (! inputKeyClass.isAssignableFrom(
  188.         previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
  189.         throw new IllegalArgumentException("The Mapper output key class does" +
  190.           " not match the previous Mapper input key class");
  191.       }
  192.       if (! inputValueClass.isAssignableFrom(
  193.         previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
  194.         throw new IllegalArgumentException("The Mapper output value class" +
  195.           " does not match the previous Mapper input value class");
  196.       }
  197.     }
  198.     // if the Mapper does not have a private JobConf create an empty one
  199.     if (mapperConf == null) {
  200.       // using a JobConf without defaults to make it lightweight.
  201.       // still the chain JobConf may have all defaults and this conf is
  202.       // overlapped to the chain JobConf one.
  203.       mapperConf = new JobConf(true);
  204.     }
  205.     // store in the private mapper conf the input/output classes of the mapper
  206.     // and if it works by value or by reference
  207.     mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
  208.     mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  209.     mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
  210.                         Object.class);
  211.     mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
  212.     mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
  213.                         Object.class);
  214.     // serialize the private mapper jobconf in the chain jobconf.
  215.     Stringifier<JobConf> stringifier =
  216.       new DefaultStringifier<JobConf>(jobConf, JobConf.class);
  217.     try {
  218.       jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
  219.                   stringifier.toString(new JobConf(mapperConf)));
  220.     }
  221.     catch (IOException ioEx) {
  222.       throw new RuntimeException(ioEx);
  223.     }
  224.     // increment the chain counter
  225.     jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
  226.   }
  227.   /**
  228.    * Sets the Reducer class to the chain job's JobConf.
  229.    * <p/>
  230.    * The configuration properties of the chain job have precedence over the
  231.    * configuration properties of the Reducer.
  232.    *
  233.    * @param jobConf              chain job's JobConf to add the Reducer class.
  234.    * @param klass            the Reducer class to add.
  235.    * @param inputKeyClass    reducer input key class.
  236.    * @param inputValueClass  reducer input value class.
  237.    * @param outputKeyClass   reducer output key class.
  238.    * @param outputValueClass reducer output value class.
  239.    * @param byValue          indicates if key/values should be passed by value
  240.    * to the next Mapper in the chain, if any.
  241.    * @param reducerConf      a JobConf with the configuration for the Reducer
  242.    * class. It is recommended to use a JobConf without default values using the
  243.    * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
  244.    */
  245.   public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
  246.                           Class<? extends Reducer<K1, V1, K2, V2>> klass,
  247.                           Class<? extends K1> inputKeyClass,
  248.                           Class<? extends V1> inputValueClass,
  249.                           Class<? extends K2> outputKeyClass,
  250.                           Class<? extends V2> outputValueClass,
  251.                           boolean byValue, JobConf reducerConf) {
  252.     String prefix = getPrefix(false);
  253.     if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
  254.       throw new IllegalStateException("Reducer has been already set");
  255.     }
  256.     jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
  257.     // if the Reducer does not have a private JobConf create an empty one
  258.     if (reducerConf == null) {
  259.       // using a JobConf without defaults to make it lightweight.
  260.       // still the chain JobConf may have all defaults and this conf is
  261.       // overlapped to the chain JobConf one.
  262.       reducerConf = new JobConf(false);
  263.     }
  264.     // store in the private reducer conf the input/output classes of the reducer
  265.     // and if it works by value or by reference
  266.     reducerConf.setBoolean(MAPPER_BY_VALUE, byValue);
  267.     reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
  268.     reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
  269.                          Object.class);
  270.     reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
  271.                          Object.class);
  272.     reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
  273.                          Object.class);
  274.     // serialize the private mapper jobconf in the chain jobconf.
  275.     Stringifier<JobConf> stringifier =
  276.       new DefaultStringifier<JobConf>(jobConf, JobConf.class);
  277.     try {
  278.       jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
  279.                   stringifier.toString(new JobConf(reducerConf)));
  280.     }
  281.     catch (IOException ioEx) {
  282.       throw new RuntimeException(ioEx);
  283.     }
  284.   }
  285.   /**
  286.    * Configures all the chain elements for the task.
  287.    *
  288.    * @param jobConf chain job's JobConf.
  289.    */
  290.   public void configure(JobConf jobConf) {
  291.     String prefix = getPrefix(isMap);
  292.     chainJobConf = jobConf;
  293.     SerializationFactory serializationFactory =
  294.       new SerializationFactory(chainJobConf);
  295.     int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
  296.     for (int i = 0; i < index; i++) {
  297.       Class<? extends Mapper> klass =
  298.         jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
  299.       JobConf mConf =
  300.         getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
  301.       Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
  302.       mappers.add(mapper);
  303.       if (mConf.getBoolean(MAPPER_BY_VALUE, true)) {
  304.         mappersKeySerialization.add(serializationFactory.getSerialization(
  305.           mConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null)));
  306.         mappersValueSerialization.add(serializationFactory.getSerialization(
  307.           mConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null)));
  308.       } else {
  309.         mappersKeySerialization.add(null);
  310.         mappersValueSerialization.add(null);
  311.       }
  312.     }
  313.     Class<? extends Reducer> klass =
  314.       jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
  315.     if (klass != null) {
  316.       JobConf rConf =
  317.         getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
  318.       reducer = ReflectionUtils.newInstance(klass, rConf);
  319.       if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
  320.         reducerKeySerialization = serializationFactory
  321.           .getSerialization(rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null));
  322.         reducerValueSerialization = serializationFactory
  323.           .getSerialization(rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null));
  324.       } else {
  325.         reducerKeySerialization = null;
  326.         reducerValueSerialization = null;
  327.       }
  328.     }
  329.   }
  330.   /**
  331.    * Returns the chain job conf.
  332.    *
  333.    * @return the chain job conf.
  334.    */
  335.   protected JobConf getChainJobConf() {
  336.     return chainJobConf;
  337.   }
  338.   /**
  339.    * Returns the first Mapper instance in the chain.
  340.    *
  341.    * @return the first Mapper instance in the chain or NULL if none.
  342.    */
  343.   public Mapper getFirstMap() {
  344.     return (mappers.size() > 0) ? mappers.get(0) : null;
  345.   }
  346.   /**
  347.    * Returns the Reducer instance in the chain.
  348.    *
  349.    * @return the Reducer instance in the chain or NULL if none.
  350.    */
  351.   public Reducer getReducer() {
  352.     return reducer;
  353.   }
  354.   /**
  355.    * Returns the OutputCollector to be used by a Mapper instance in the chain.
  356.    *
  357.    * @param mapperIndex index of the Mapper instance to get the OutputCollector.
  358.    * @param output      the original OutputCollector of the task.
  359.    * @param reporter    the reporter of the task.
  360.    * @return the OutputCollector to be used in the chain.
  361.    */
  362.   @SuppressWarnings({"unchecked"})
  363.   public OutputCollector getMapperCollector(int mapperIndex,
  364.                                             OutputCollector output,
  365.                                             Reporter reporter) {
  366.     Serialization keySerialization = mappersKeySerialization.get(mapperIndex);
  367.     Serialization valueSerialization =
  368.       mappersValueSerialization.get(mapperIndex);
  369.     return new ChainOutputCollector(mapperIndex, keySerialization,
  370.                                     valueSerialization, output, reporter);
  371.   }
  372.   /**
  373.    * Returns the OutputCollector to be used by a Mapper instance in the chain.
  374.    *
  375.    * @param output   the original OutputCollector of the task.
  376.    * @param reporter the reporter of the task.
  377.    * @return the OutputCollector to be used in the chain.
  378.    */
  379.   @SuppressWarnings({"unchecked"})
  380.   public OutputCollector getReducerCollector(OutputCollector output,
  381.                                              Reporter reporter) {
  382.     return new ChainOutputCollector(reducerKeySerialization,
  383.                                     reducerValueSerialization, output,
  384.                                     reporter);
  385.   }
  386.   /**
  387.    * Closes all the chain elements.
  388.    *
  389.    * @throws IOException thrown if any of the chain elements threw an
  390.    *                     IOException exception.
  391.    */
  392.   public void close() throws IOException {
  393.     for (Mapper map : mappers) {
  394.       map.close();
  395.     }
  396.     if (reducer != null) {
  397.       reducer.close();
  398.     }
  399.   }
  400.   // using a ThreadLocal to reuse the ByteArrayOutputStream used for ser/deser
  401.   // it has to be a thread local because if not it would break if used from a
  402.   // MultiThreadedMapRunner.
  403.   private ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer =
  404.     new ThreadLocal<DataOutputBuffer>() {
  405.       protected DataOutputBuffer initialValue() {
  406.         return new DataOutputBuffer(1024);
  407.       }
  408.     };
  409.   /**
  410.    * OutputCollector implementation used by the chain tasks.
  411.    * <p/>
  412.    * If it is not the end of the chain, a {@link #collect} invocation invokes
  413.    * the next Mapper in the chain. If it is the end of the chain the task
  414.    * OutputCollector is called.
  415.    */
  416.   private class ChainOutputCollector<K, V> implements OutputCollector<K, V> {
  417.     private int nextMapperIndex;
  418.     private Serialization<K> keySerialization;
  419.     private Serialization<V> valueSerialization;
  420.     private OutputCollector output;
  421.     private Reporter reporter;
  422.     /*
  423.      * Constructor for Mappers
  424.      */
  425.     public ChainOutputCollector(int index, Serialization<K> keySerialization,
  426.                                 Serialization<V> valueSerialization,
  427.                                 OutputCollector output, Reporter reporter) {
  428.       this.nextMapperIndex = index + 1;
  429.       this.keySerialization = keySerialization;
  430.       this.valueSerialization = valueSerialization;
  431.       this.output = output;
  432.       this.reporter = reporter;
  433.     }
  434.     /*
  435.      * Constructor for Reducer
  436.      */
  437.     public ChainOutputCollector(Serialization<K> keySerialization,
  438.                                 Serialization<V> valueSerialization,
  439.                                 OutputCollector output, Reporter reporter) {
  440.       this.nextMapperIndex = 0;
  441.       this.keySerialization = keySerialization;
  442.       this.valueSerialization = valueSerialization;
  443.       this.output = output;
  444.       this.reporter = reporter;
  445.     }
  446.     @SuppressWarnings({"unchecked"})
  447.     public void collect(K key, V value) throws IOException {
  448.       if (nextMapperIndex < mappers.size()) {
  449.         // there is a next mapper in chain
  450.         // only need to ser/deser if there is next mapper in the chain
  451.         if (keySerialization != null) {
  452.           key = makeCopyForPassByValue(keySerialization, key);
  453.           value = makeCopyForPassByValue(valueSerialization, value);
  454.         }
  455.         // gets ser/deser and mapper of next in chain
  456.         Serialization nextKeySerialization =
  457.           mappersKeySerialization.get(nextMapperIndex);
  458.         Serialization nextValueSerialization =
  459.           mappersValueSerialization.get(nextMapperIndex);
  460.         Mapper nextMapper = mappers.get(nextMapperIndex);
  461.         // invokes next mapper in chain
  462.         nextMapper.map(key, value,
  463.                        new ChainOutputCollector(nextMapperIndex,
  464.                                                 nextKeySerialization,
  465.                                                 nextValueSerialization,
  466.                                                 output, reporter),
  467.                        reporter);
  468.       } else {
  469.         // end of chain, user real output collector
  470.         output.collect(key, value);
  471.       }
  472.     }
  473.     private <E> E makeCopyForPassByValue(Serialization<E> serialization,
  474.                                           E obj) throws IOException {
  475.       Serializer<E> ser =
  476.         serialization.getSerializer(GenericsUtil.getClass(obj));
  477.       Deserializer<E> deser =
  478.         serialization.getDeserializer(GenericsUtil.getClass(obj));
  479.       DataOutputBuffer dof = threadLocalDataOutputBuffer.get();
  480.       dof.reset();
  481.       ser.open(dof);
  482.       ser.serialize(obj);
  483.       ser.close();
  484.       obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj),
  485.                                         getChainJobConf());
  486.       ByteArrayInputStream bais =
  487.         new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
  488.       deser.open(bais);
  489.       deser.deserialize(obj);
  490.       deser.close();
  491.       return obj;
  492.     }
  493.   }
  494. }