ReduceContext.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapreduce;
- import java.io.IOException;
- import java.util.Iterator;
- import java.util.NoSuchElementException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.DataInputBuffer;
- import org.apache.hadoop.io.RawComparator;
- import org.apache.hadoop.io.serializer.Deserializer;
- import org.apache.hadoop.io.serializer.SerializationFactory;
- import org.apache.hadoop.mapred.RawKeyValueIterator;
- import org.apache.hadoop.util.Progressable;
- /**
- * The context passed to the {@link Reducer}.
- * @param <KEYIN> the class of the input keys
- * @param <VALUEIN> the class of the input values
- * @param <KEYOUT> the class of the output keys
- * @param <VALUEOUT> the class of the output values
- */
- public class ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
- extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
- private RawKeyValueIterator input;
- private Counter inputCounter;
- private RawComparator<KEYIN> comparator;
- private KEYIN key; // current key
- private VALUEIN value; // current value
- private boolean firstValue = false; // first value in key
- private boolean nextKeyIsSame = false; // more w/ this key
- private boolean hasMore; // more in file
- protected Progressable reporter;
- private Deserializer<KEYIN> keyDeserializer;
- private Deserializer<VALUEIN> valueDeserializer;
- private DataInputBuffer buffer = new DataInputBuffer();
- private BytesWritable currentRawKey = new BytesWritable();
- private ValueIterable iterable = new ValueIterable();
- public ReduceContext(Configuration conf, TaskAttemptID taskid,
- RawKeyValueIterator input,
- Counter inputCounter,
- RecordWriter<KEYOUT,VALUEOUT> output,
- OutputCommitter committer,
- StatusReporter reporter,
- RawComparator<KEYIN> comparator,
- Class<KEYIN> keyClass,
- Class<VALUEIN> valueClass
- ) throws InterruptedException, IOException{
- super(conf, taskid, output, committer, reporter);
- this.input = input;
- this.inputCounter = inputCounter;
- this.comparator = comparator;
- SerializationFactory serializationFactory = new SerializationFactory(conf);
- this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
- this.keyDeserializer.open(buffer);
- this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
- this.valueDeserializer.open(buffer);
- hasMore = input.next();
- }
- /** Start processing next unique key. */
- public boolean nextKey() throws IOException,InterruptedException {
- while (hasMore && nextKeyIsSame) {
- nextKeyValue();
- }
- if (hasMore) {
- return nextKeyValue();
- } else {
- return false;
- }
- }
- /**
- * Advance to the next key/value pair.
- */
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- if (!hasMore) {
- key = null;
- value = null;
- return false;
- }
- firstValue = !nextKeyIsSame;
- DataInputBuffer next = input.getKey();
- currentRawKey.set(next.getData(), next.getPosition(),
- next.getLength() - next.getPosition());
- buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
- key = keyDeserializer.deserialize(key);
- next = input.getValue();
- buffer.reset(next.getData(), next.getPosition(), next.getLength());
- value = valueDeserializer.deserialize(value);
- hasMore = input.next();
- inputCounter.increment(1);
- if (hasMore) {
- next = input.getKey();
- nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
- currentRawKey.getLength(),
- next.getData(),
- next.getPosition(),
- next.getLength() - next.getPosition()
- ) == 0;
- } else {
- nextKeyIsSame = false;
- }
- return true;
- }
- public KEYIN getCurrentKey() {
- return key;
- }
- @Override
- public VALUEIN getCurrentValue() {
- return value;
- }
- protected class ValueIterator implements Iterator<VALUEIN> {
- @Override
- public boolean hasNext() {
- return firstValue || nextKeyIsSame;
- }
- @Override
- public VALUEIN next() {
- // if this is the first record, we don't need to advance
- if (firstValue) {
- firstValue = false;
- return value;
- }
- // if this isn't the first record and the next key is different, they
- // can't advance it here.
- if (!nextKeyIsSame) {
- throw new NoSuchElementException("iterate past last value");
- }
- // otherwise, go to the next key/value pair
- try {
- nextKeyValue();
- return value;
- } catch (IOException ie) {
- throw new RuntimeException("next value iterator failed", ie);
- } catch (InterruptedException ie) {
- // this is bad, but we can't modify the exception list of java.util
- throw new RuntimeException("next value iterator interrupted", ie);
- }
- }
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove not implemented");
- }
-
- }
- protected class ValueIterable implements Iterable<VALUEIN> {
- private ValueIterator iterator = new ValueIterator();
- @Override
- public Iterator<VALUEIN> iterator() {
- return iterator;
- }
- }
-
- /**
- * Iterate through the values for the current key, reusing the same value
- * object, which is stored in the context.
- * @return the series of values associated with the current key. All of the
- * objects returned directly and indirectly from this method are reused.
- */
- public
- Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
- return iterable;
- }
- }