Chain.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:21k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred.lib;
- import org.apache.hadoop.io.DataOutputBuffer;
- import org.apache.hadoop.io.Stringifier;
- import org.apache.hadoop.io.DefaultStringifier;
- import org.apache.hadoop.io.serializer.Deserializer;
- import org.apache.hadoop.io.serializer.Serialization;
- import org.apache.hadoop.io.serializer.SerializationFactory;
- import org.apache.hadoop.io.serializer.Serializer;
- import org.apache.hadoop.mapred.*;
- import org.apache.hadoop.util.ReflectionUtils;
- import org.apache.hadoop.util.GenericsUtil;
- import java.io.ByteArrayInputStream;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- /**
- * The Chain class provides all the common functionality for the
- * {@link ChainMapper} and the {@link ChainReducer} classes.
- */
- class Chain {
- private static final String CHAIN_MAPPER = "chain.mapper"; // key prefix returned by getPrefix() for a Mapper chain
- private static final String CHAIN_REDUCER = "chain.reducer"; // key prefix returned by getPrefix() for a Reducer chain
- private static final String CHAIN_MAPPER_SIZE = ".size"; // suffix of the key holding the number of Mappers in the chain
- private static final String CHAIN_MAPPER_CLASS = ".mapper.class."; // suffix of the key holding a Mapper class; the Mapper index is appended
- private static final String CHAIN_MAPPER_CONFIG = ".mapper.config."; // suffix of the key holding a Mapper's serialized JobConf; the Mapper index is appended
- private static final String CHAIN_REDUCER_CLASS = ".reducer.class"; // suffix of the key holding the Reducer class
- private static final String CHAIN_REDUCER_CONFIG = ".reducer.config"; // suffix of the key holding the Reducer's serialized JobConf
- private static final String MAPPER_BY_VALUE = "chain.mapper.byValue"; // by-value flag read from a Mapper's element conf in configure()
- private static final String REDUCER_BY_VALUE = "chain.reducer.byValue"; // by-value flag read from the Reducer's element conf in configure()
- private static final String MAPPER_INPUT_KEY_CLASS = // the *_CLASS keys below live in an element's private JobConf
- "chain.mapper.input.key.class";
- private static final String MAPPER_INPUT_VALUE_CLASS =
- "chain.mapper.input.value.class";
- private static final String MAPPER_OUTPUT_KEY_CLASS =
- "chain.mapper.output.key.class";
- private static final String MAPPER_OUTPUT_VALUE_CLASS =
- "chain.mapper.output.value.class";
- private static final String REDUCER_INPUT_KEY_CLASS =
- "chain.reducer.input.key.class";
- private static final String REDUCER_INPUT_VALUE_CLASS =
- "chain.reducer.input.value.class";
- private static final String REDUCER_OUTPUT_KEY_CLASS =
- "chain.reducer.output.key.class";
- private static final String REDUCER_OUTPUT_VALUE_CLASS =
- "chain.reducer.output.value.class";
- private boolean isMap; // true when the chain is for a Mapper, false when it is for a Reducer
- private JobConf chainJobConf; // the chain job's JobConf, assigned in configure()
- private List<Mapper> mappers = new ArrayList<Mapper>(); // Mapper instances in chain order, filled in configure()
- private Reducer reducer; // Reducer instance created in configure(), null if none is registered
- // Caches the key/value output serialization of each chain element so it is
- // not looked up on every collect; an entry is null when by-value is off.
- private List<Serialization> mappersKeySerialization =
- new ArrayList<Serialization>();
- private List<Serialization> mappersValueSerialization =
- new ArrayList<Serialization>();
- private Serialization reducerKeySerialization;
- private Serialization reducerValueSerialization;
- /**
-  * Builds a Chain that will drive either the map side or the reduce side of
-  * a chained job.
-  *
-  * @param isMap <code>true</code> when the chain is for a Mapper,
-  *              <code>false</code> when it is for a Reducer.
-  */
- Chain(boolean isMap) {
-   this.isMap = isMap;
- }
- /**
-  * Selects the configuration key prefix of the chain.
-  *
-  * @param isMap <code>true</code> for a Mapper chain, <code>false</code> for a
-  *              Reducer chain.
-  * @return {@link #CHAIN_MAPPER} for a Mapper chain, {@link #CHAIN_REDUCER}
-  *         otherwise.
-  */
- private static String getPrefix(boolean isMap) {
-   if (isMap) {
-     return CHAIN_MAPPER;
-   }
-   return CHAIN_REDUCER;
- }
- /**
- * Creates a {@link JobConf} for one of the Maps or Reduce in the chain.
- * <p/>
- * It creates a new JobConf using the chain job's JobConf as base and adds to
- * it the configuration properties for the chain element. The keys of the
- * chain element jobConf have precedence over the given JobConf.
- *
- * @param jobConf the chain job's JobConf.
- * @param confKey the key for chain element configuration serialized in the
- * chain job's JobConf.
- * @return a new JobConf aggregating the chain job's JobConf with the chain
- * element configuration properties.
- */
- private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
- JobConf conf;
- try {
- Stringifier<JobConf> stringifier =
- new DefaultStringifier<JobConf>(jobConf, JobConf.class);
- conf = stringifier.fromString(jobConf.get(confKey, null));
- } catch (IOException ioex) {
- throw new RuntimeException(ioex);
- }
- // we have to do this because the Writable desearialization clears all
- // values set in the conf making not possible do do a new JobConf(jobConf)
- // in the creation of the conf above
- jobConf = new JobConf(jobConf);
- for(Map.Entry<String, String> entry : conf) {
- jobConf.set(entry.getKey(), entry.getValue());
- }
- return jobConf;
- }
- /**
-  * Adds a Mapper class to the chain job's JobConf.
-  * <p/>
-  * The input classes of the new Mapper are checked against the output classes
-  * of the element that precedes it (the Reducer for the first Mapper of a
-  * reducer chain, the previous Mapper otherwise). Only after validation are
-  * the Mapper class and its serialized private configuration written to the
-  * chain job's JobConf, so a rejected Mapper leaves no trace in it.
-  * <p/>
-  * When the chain element is configured, the properties of the Mapper's
-  * private JobConf are laid over a copy of the chain job's JobConf.
-  *
-  * @param isMap            indicates if the Chain is for a Mapper or for a
-  *                         Reducer.
-  * @param jobConf          chain job's JobConf to add the Mapper class.
-  * @param klass            the Mapper class to add.
-  * @param inputKeyClass    mapper input key class.
-  * @param inputValueClass  mapper input value class.
-  * @param outputKeyClass   mapper output key class.
-  * @param outputValueClass mapper output value class.
-  * @param byValue          indicates if key/values should be passed by value
-  *                         to the next Mapper in the chain, if any.
-  * @param mapperConf       a JobConf with the configuration for the Mapper
-  *                         class, or <code>null</code> for none. The chain
-  *                         bookkeeping properties are set on this instance.
-  *                         It is recommended to use a JobConf without default
-  *                         values using the
-  *                         <code>JobConf(boolean loadDefaults)</code>
-  *                         constructor with FALSE.
-  * @throws IllegalStateException    if the chain is a reducer chain and the
-  *                                  Reducer has not been set yet.
-  * @throws IllegalArgumentException if the Mapper input classes are not
-  *                                  assignable from the output classes of the
-  *                                  preceding chain element.
-  * @throws RuntimeException         wrapping the IOException raised if the
-  *                                  Mapper JobConf cannot be serialized.
-  */
- public static <K1, V1, K2, V2> void addMapper(boolean isMap, JobConf jobConf,
-     Class<? extends Mapper<K1, V1, K2, V2>> klass,
-     Class<? extends K1> inputKeyClass,
-     Class<? extends V1> inputValueClass,
-     Class<? extends K2> outputKeyClass,
-     Class<? extends V2> outputValueClass,
-     boolean byValue, JobConf mapperConf) {
-   String prefix = getPrefix(isMap);
-   // if a reducer chain check the Reducer has been already set. The lookup
-   // must use a null default: with a non-null default the result is never
-   // null and the check could never fail.
-   if (!isMap
-       && jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) == null) {
-     throw new IllegalStateException(
-       "A Mapper can be added to the chain only after the Reducer has " +
-       "been set");
-   }
-   int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
-   if (!isMap && index == 0) {
-     // first Mapper of a reducer chain: its input classes must accept the
-     // output classes of the Reducer.
-     JobConf reducerConf =
-       getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
-     if (!inputKeyClass.isAssignableFrom(
-       reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
-       throw new IllegalArgumentException("The Reducer output key class does" +
-         " not match the Mapper input key class");
-     }
-     if (!inputValueClass.isAssignableFrom(
-       reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
-       throw new IllegalArgumentException("The Reducer output value class" +
-         " does not match the Mapper input value class");
-     }
-   } else if (index > 0) {
-     // any later Mapper: its input classes must accept the output classes of
-     // the previous Mapper in the chain.
-     JobConf previousMapperConf =
-       getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
-         (index - 1));
-     if (!inputKeyClass.isAssignableFrom(
-       previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
-       throw new IllegalArgumentException("The Mapper input key class does" +
-         " not match the previous Mapper output key class");
-     }
-     if (!inputValueClass.isAssignableFrom(
-       previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
-       throw new IllegalArgumentException("The Mapper input value class" +
-         " does not match the previous Mapper output value class");
-     }
-   }
-   // if the Mapper does not have a private JobConf create an empty one
-   if (mapperConf == null) {
-     // using a JobConf without defaults to make it lightweight: loaded
-     // defaults would be serialized with the element and later laid over the
-     // chain JobConf, which already carries whatever defaults are needed.
-     mapperConf = new JobConf(false);
-   }
-   // store in the private mapper conf the input/output classes of the mapper
-   // and if it works by value or by reference
-   mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
-   mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
-   mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
-     Object.class);
-   mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
-   mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
-     Object.class);
-   // register the Mapper class only once the element has been validated
-   jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
-   // serialize the private mapper jobconf in the chain jobconf.
-   Stringifier<JobConf> stringifier =
-     new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-   try {
-     jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
-       stringifier.toString(new JobConf(mapperConf)));
-   } catch (IOException ioEx) {
-     throw new RuntimeException(ioEx);
-   }
-   // increment the chain counter
-   jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
- }
- /**
-  * Sets the Reducer class to the chain job's JobConf.
-  * <p/>
-  * The Reducer can be set only once. Its input/output classes and its
-  * by-value flag are recorded in the Reducer's private JobConf, which is then
-  * serialized into the chain job's JobConf. When the chain element is
-  * configured, the properties of that private JobConf are laid over a copy of
-  * the chain job's JobConf.
-  *
-  * @param jobConf          chain job's JobConf to add the Reducer class.
-  * @param klass            the Reducer class to add.
-  * @param inputKeyClass    reducer input key class.
-  * @param inputValueClass  reducer input value class.
-  * @param outputKeyClass   reducer output key class.
-  * @param outputValueClass reducer output value class.
-  * @param byValue          indicates if key/values should be passed by value
-  *                         to the next Mapper in the chain, if any.
-  * @param reducerConf      a JobConf with the configuration for the Reducer
-  *                         class, or <code>null</code> for none. The chain
-  *                         bookkeeping properties are set on this instance.
-  *                         It is recommended to use a JobConf without default
-  *                         values using the
-  *                         <code>JobConf(boolean loadDefaults)</code>
-  *                         constructor with FALSE.
-  * @throws IllegalStateException if a Reducer has already been set.
-  * @throws RuntimeException      wrapping the IOException raised if the
-  *                               Reducer JobConf cannot be serialized.
-  */
- public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
-     Class<? extends Reducer<K1, V1, K2, V2>> klass,
-     Class<? extends K1> inputKeyClass,
-     Class<? extends V1> inputValueClass,
-     Class<? extends K2> outputKeyClass,
-     Class<? extends V2> outputValueClass,
-     boolean byValue, JobConf reducerConf) {
-   String prefix = getPrefix(false);
-   if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
-     throw new IllegalStateException("Reducer has been already set");
-   }
-   jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
-   // if the Reducer does not have a private JobConf create an empty one
-   if (reducerConf == null) {
-     // using a JobConf without defaults to make it lightweight.
-     // still the chain JobConf may have all defaults and this conf is
-     // overlapped to the chain JobConf one.
-     reducerConf = new JobConf(false);
-   }
-   // store in the private reducer conf the input/output classes of the reducer
-   // and if it works by value or by reference. The flag must go under
-   // REDUCER_BY_VALUE because that is the key configure() reads for the
-   // Reducer; written under the mapper key it would never be seen.
-   reducerConf.setBoolean(REDUCER_BY_VALUE, byValue);
-   reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
-   reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
-     Object.class);
-   reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
-     Object.class);
-   reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
-     Object.class);
-   // serialize the private reducer jobconf in the chain jobconf.
-   Stringifier<JobConf> stringifier =
-     new DefaultStringifier<JobConf>(jobConf, JobConf.class);
-   try {
-     jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
-       stringifier.toString(new JobConf(reducerConf)));
-   } catch (IOException ioEx) {
-     throw new RuntimeException(ioEx);
-   }
- }
- /**
-  * Instantiates and configures every element of the chain for the task.
-  * <p/>
-  * Each Mapper registered under the chain prefix is created with its own
-  * element JobConf, and the serializations of its output key/value classes
-  * are cached (<code>null</code> is cached when the element does not pass by
-  * value). The Reducer, if one is registered, is handled the same way.
-  *
-  * @param jobConf chain job's JobConf.
-  */
- public void configure(JobConf jobConf) {
-   chainJobConf = jobConf;
-   final String prefix = getPrefix(isMap);
-   final SerializationFactory factory =
-     new SerializationFactory(chainJobConf);
-   final int mapperCount = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
-   for (int pos = 0; pos < mapperCount; pos++) {
-     Class<? extends Mapper> mapperClass =
-       jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + pos, null, Mapper.class);
-     JobConf mapperConf =
-       getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + pos);
-     mappers.add(ReflectionUtils.newInstance(mapperClass, mapperConf));
-     // by value is the default; a null entry marks pass-by-reference
-     final boolean byValue = mapperConf.getBoolean(MAPPER_BY_VALUE, true);
-     mappersKeySerialization.add(byValue
-       ? factory.getSerialization(
-           mapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))
-       : null);
-     mappersValueSerialization.add(byValue
-       ? factory.getSerialization(
-           mapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))
-       : null);
-   }
-   Class<? extends Reducer> reducerClass =
-     jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
-   if (reducerClass == null) {
-     // no Reducer registered for this chain
-     return;
-   }
-   JobConf reducerConf =
-     getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
-   reducer = ReflectionUtils.newInstance(reducerClass, reducerConf);
-   if (!reducerConf.getBoolean(REDUCER_BY_VALUE, true)) {
-     reducerKeySerialization = null;
-     reducerValueSerialization = null;
-     return;
-   }
-   reducerKeySerialization = factory.getSerialization(
-     reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null));
-   reducerValueSerialization = factory.getSerialization(
-     reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null));
- }
- /**
-  * Gives access to the JobConf of the chain job.
-  *
-  * @return the JobConf passed to {@link #configure}, or <code>null</code> if
-  *         the chain has not been configured yet.
-  */
- protected JobConf getChainJobConf() {
-   return this.chainJobConf;
- }
- /**
-  * Gives access to the Mapper that opens the chain.
-  *
-  * @return the Mapper at position 0 of the chain, or NULL when the chain
-  *         holds no Mapper.
-  */
- public Mapper getFirstMap() {
-   if (mappers.isEmpty()) {
-     return null;
-   }
-   return mappers.get(0);
- }
- /**
-  * Gives access to the Reducer of the chain.
-  *
-  * @return the Reducer instance created by {@link #configure}, or NULL if
-  *         there is none.
-  */
- public Reducer getReducer() {
-   return this.reducer;
- }
- /**
-  * Creates the OutputCollector a Mapper of the chain must write to.
-  * <p/>
-  * The collector is bound to the cached output key/value serializations of
-  * the Mapper at the given index.
-  *
-  * @param mapperIndex index of the Mapper instance to get the OutputCollector.
-  * @param output      the original OutputCollector of the task.
-  * @param reporter    the reporter of the task.
-  * @return the OutputCollector to be used in the chain.
-  */
- @SuppressWarnings({"unchecked"})
- public OutputCollector getMapperCollector(int mapperIndex,
-                                           OutputCollector output,
-                                           Reporter reporter) {
-   return new ChainOutputCollector(mapperIndex,
-     mappersKeySerialization.get(mapperIndex),
-     mappersValueSerialization.get(mapperIndex),
-     output, reporter);
- }
- /**
- * Returns the OutputCollector to be used by a Mapper instance in the chain.
- *
- * @param output the original OutputCollector of the task.
- * @param reporter the reporter of the task.
- * @return the OutputCollector to be used in the chain.
- */
- @SuppressWarnings({"unchecked"})
- public OutputCollector getReducerCollector(OutputCollector output,
- Reporter reporter) {
- return new ChainOutputCollector(reducerKeySerialization,
- reducerValueSerialization, output,
- reporter);
- }
- /**
- * Closes all the chain elements.
- *
- * @throws IOException thrown if any of the chain elements threw an
- * IOException exception.
- */
- public void close() throws IOException {
- for (Mapper map : mappers) {
- map.close();
- }
- if (reducer != null) {
- reducer.close();
- }
- }
- // A ThreadLocal is used to reuse the DataOutputBuffer needed for ser/deser
- // when copying key/values by value. It has to be thread local because a
- // shared buffer would break if the chain is used from a MultiThreadedMapRunner.
- private ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer =
- new ThreadLocal<DataOutputBuffer>() {
- protected DataOutputBuffer initialValue() {
- return new DataOutputBuffer(1024);
- }
- };
- /**
-  * OutputCollector implementation used by the chain tasks.
-  * <p/>
-  * If it is not the end of the chain, a {@link #collect} invocation invokes
-  * the next Mapper in the chain. If it is the end of the chain the task
-  * OutputCollector is called.
-  * <p/>
-  * When the element owning this collector passes by value, the key and the
-  * value are copied through a serialization round trip before being handed
-  * to the next Mapper.
-  */
- private class ChainOutputCollector<K, V> implements OutputCollector<K, V> {
-   // index in the chain of the Mapper that receives what is collected here
-   private int nextMapperIndex;
-   // null serializations mean the key/value are passed by reference
-   private Serialization<K> keySerialization;
-   private Serialization<V> valueSerialization;
-   private OutputCollector output;
-   private Reporter reporter;
-   /*
-    * Constructor for Mappers: the output of the Mapper at the given index is
-    * fed to the Mapper that follows it.
-    */
-   public ChainOutputCollector(int index, Serialization<K> keySerialization,
-                               Serialization<V> valueSerialization,
-                               OutputCollector output, Reporter reporter) {
-     this.nextMapperIndex = index + 1;
-     this.keySerialization = keySerialization;
-     this.valueSerialization = valueSerialization;
-     this.output = output;
-     this.reporter = reporter;
-   }
-   /*
-    * Constructor for Reducer: the output of the Reducer is fed to the first
-    * Mapper of the chain.
-    */
-   public ChainOutputCollector(Serialization<K> keySerialization,
-                               Serialization<V> valueSerialization,
-                               OutputCollector output, Reporter reporter) {
-     this.nextMapperIndex = 0;
-     this.keySerialization = keySerialization;
-     this.valueSerialization = valueSerialization;
-     this.output = output;
-     this.reporter = reporter;
-   }
-   /**
-    * Passes the key/value pair to the next Mapper in the chain or, at the end
-    * of the chain, to the task OutputCollector.
-    *
-    * @param key   the key to collect.
-    * @param value the value to collect.
-    * @throws IOException thrown by the copy, by the next Mapper or by the
-    *                     task OutputCollector.
-    */
-   @SuppressWarnings({"unchecked"})
-   public void collect(K key, V value) throws IOException {
-     if (nextMapperIndex < mappers.size()) {
-       // there is a next mapper in chain
-       // only need to ser/deser if there is next mapper in the chain
-       if (keySerialization != null) {
-         key = makeCopyForPassByValue(keySerialization, key);
-         value = makeCopyForPassByValue(valueSerialization, value);
-       }
-       // gets ser/deser and mapper of next in chain
-       Serialization nextKeySerialization =
-         mappersKeySerialization.get(nextMapperIndex);
-       Serialization nextValueSerialization =
-         mappersValueSerialization.get(nextMapperIndex);
-       Mapper nextMapper = mappers.get(nextMapperIndex);
-       // invokes next mapper in chain
-       nextMapper.map(key, value,
-                      new ChainOutputCollector(nextMapperIndex,
-                                               nextKeySerialization,
-                                               nextValueSerialization,
-                                               output, reporter),
-                      reporter);
-     } else {
-       // end of chain, use the real output collector
-       output.collect(key, value);
-     }
-   }
-   /**
-    * Copies an object by serializing it into the thread local buffer and
-    * deserializing it back.
-    *
-    * @param serialization the serialization to use for the round trip.
-    * @param obj           the object to copy.
-    * @return the object produced by the deserializer.
-    * @throws IOException thrown if the object cannot be serialized or
-    *                     deserialized.
-    */
-   private <E> E makeCopyForPassByValue(Serialization<E> serialization,
-                                        E obj) throws IOException {
-     Class<E> objClass = GenericsUtil.getClass(obj);
-     Serializer<E> ser = serialization.getSerializer(objClass);
-     Deserializer<E> deser = serialization.getDeserializer(objClass);
-     DataOutputBuffer dof = threadLocalDataOutputBuffer.get();
-     dof.reset();
-     ser.open(dof);
-     ser.serialize(obj);
-     ser.close();
-     E copy = ReflectionUtils.newInstance(objClass, getChainJobConf());
-     ByteArrayInputStream bais =
-       new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
-     deser.open(bais);
-     // the deserializer is free to return another instance than the one it
-     // is given, so its return value is the copy, not the argument.
-     copy = deser.deserialize(copy);
-     deser.close();
-     return copy;
-   }
- }
- }