BasicTypeSorterBase.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
- /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred;
- import java.io.DataOutputStream;
- import java.io.IOException;
- import org.apache.hadoop.io.DataOutputBuffer;
- import org.apache.hadoop.io.OutputBuffer;
- import org.apache.hadoop.io.RawComparator;
- import org.apache.hadoop.io.SequenceFile.ValueBytes;
- import org.apache.hadoop.util.Progress;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
- import org.apache.hadoop.util.Progressable;
- /** This class implements the sort interface using primitive int arrays as
- * the data structures (that is why this class is called 'BasicType'SorterBase)
- */
- abstract class BasicTypeSorterBase implements BufferSorter {
-
- protected OutputBuffer keyValBuffer; //the buffer used for storing
- //key/values
- protected int[] startOffsets; //the array used to store the start offsets of
- //keys in keyValBuffer
- protected int[] keyLengths; //the array used to store the lengths of
- //keys
- protected int[] valueLengths; //the array used to store the value lengths
- protected int[] pointers; //the array of startOffsets's indices. This will
- //be sorted at the end to contain a sorted array of
- //indices to offsets
- protected RawComparator comparator; //the comparator for the map output
- protected int count; //the number of key/values
- //the overhead of the arrays in memory
- //12 => 4 for keyoffsets, 4 for keylengths, 4 for valueLengths, and
- //4 for indices into startOffsets array in the
- //pointers array (ignored the partpointers list itself)
- static private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
- static private final int INITIAL_ARRAY_SIZE = 5;
- //we maintain the max lengths of the key/val that we encounter. During
- //iteration of the sorted results, we will create a DataOutputBuffer to
- //return the keys. The max size of the DataOutputBuffer will be the max
- //keylength that we encounter. Expose this value to model memory more
- //accurately.
- private int maxKeyLength = 0;
- private int maxValLength = 0;
- //Reference to the Progressable object for sending KeepAlive
- protected Progressable reporter;
- //Implementation of methods of the SorterBase interface
- //
- public void configure(JobConf conf) {
- comparator = conf.getOutputKeyComparator();
- }
-
- public void setProgressable(Progressable reporter) {
- this.reporter = reporter;
- }
- public void addKeyValue(int recordOffset, int keyLength, int valLength) {
- //Add the start offset of the key in the startOffsets array and the
- //length in the keyLengths array.
- if (startOffsets == null || count == startOffsets.length)
- grow();
- startOffsets[count] = recordOffset;
- keyLengths[count] = keyLength;
- if (keyLength > maxKeyLength) {
- maxKeyLength = keyLength;
- }
- if (valLength > maxValLength) {
- maxValLength = valLength;
- }
- valueLengths[count] = valLength;
- pointers[count] = count;
- count++;
- }
- public void setInputBuffer(OutputBuffer buffer) {
- //store a reference to the keyValBuffer that we need to read during sort
- this.keyValBuffer = buffer;
- }
- public long getMemoryUtilized() {
- //the total length of the arrays + the max{Key,Val}Length (this will be the
- //max size of the DataOutputBuffers during the iteration of the sorted
- //keys).
- if (startOffsets != null) {
- return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD +
- maxKeyLength + maxValLength;
- }
- else { //nothing from this yet
- return 0;
- }
- }
- public abstract RawKeyValueIterator sort();
-
- public void close() {
- //set count to 0; also, we don't reuse the arrays since we want to maintain
- //consistency in the memory model
- count = 0;
- startOffsets = null;
- keyLengths = null;
- valueLengths = null;
- pointers = null;
- maxKeyLength = 0;
- maxValLength = 0;
-
- //release the large key-value buffer so that the GC, if necessary,
- //can collect it away
- keyValBuffer = null;
- }
-
- private void grow() {
- int currLength = 0;
- if (startOffsets != null) {
- currLength = startOffsets.length;
- }
- int newLength = (int)(currLength * 1.1) + 1;
- startOffsets = grow(startOffsets, newLength);
- keyLengths = grow(keyLengths, newLength);
- valueLengths = grow(valueLengths, newLength);
- pointers = grow(pointers, newLength);
- }
-
- private int[] grow(int[] old, int newLength) {
- int[] result = new int[newLength];
- if(old != null) {
- System.arraycopy(old, 0, result, 0, old.length);
- }
- return result;
- }
- } //BasicTypeSorterBase
- //Implementation of methods of the RawKeyValueIterator interface. These
- //methods must be invoked to iterate over key/vals after sort is done.
- //
- class MRSortResultIterator implements RawKeyValueIterator {
-
- private int count;
- private int[] pointers;
- private int[] startOffsets;
- private int[] keyLengths;
- private int[] valLengths;
- private int currStartOffsetIndex;
- private int currIndexInPointers;
- private OutputBuffer keyValBuffer;
- private DataOutputBuffer key = new DataOutputBuffer();
- private InMemUncompressedBytes value = new InMemUncompressedBytes();
-
- public MRSortResultIterator(OutputBuffer keyValBuffer,
- int []pointers, int []startOffsets,
- int []keyLengths, int []valLengths) {
- this.count = pointers.length;
- this.pointers = pointers;
- this.startOffsets = startOffsets;
- this.keyLengths = keyLengths;
- this.valLengths = valLengths;
- this.keyValBuffer = keyValBuffer;
- }
-
- public Progress getProgress() {
- return null;
- }
-
- public DataOutputBuffer getKey() throws IOException {
- int currKeyOffset = startOffsets[currStartOffsetIndex];
- int currKeyLength = keyLengths[currStartOffsetIndex];
- //reuse the same key
- key.reset();
- key.write(keyValBuffer.getData(), currKeyOffset, currKeyLength);
- return key;
- }
- public ValueBytes getValue() throws IOException {
- //value[i] is stored in the following byte range:
- //startOffsets[i] + keyLengths[i] through valLengths[i]
- value.reset(keyValBuffer,
- startOffsets[currStartOffsetIndex] + keyLengths[currStartOffsetIndex],
- valLengths[currStartOffsetIndex]);
- return value;
- }
- public boolean next() throws IOException {
- if (count == currIndexInPointers)
- return false;
- currStartOffsetIndex = pointers[currIndexInPointers];
- currIndexInPointers++;
- return true;
- }
-
- public void close() {
- return;
- }
-
- //An implementation of the ValueBytes interface for the in-memory value
- //buffers.
- private static class InMemUncompressedBytes implements ValueBytes {
- private byte[] data;
- int start;
- int dataSize;
- private void reset(OutputBuffer d, int start, int length)
- throws IOException {
- data = d.getData();
- this.start = start;
- dataSize = length;
- }
-
- public int getSize() {
- return dataSize;
- }
-
- public void writeUncompressedBytes(DataOutputStream outStream)
- throws IOException {
- outStream.write(data, start, dataSize);
- }
- public void writeCompressedBytes(DataOutputStream outStream)
- throws IllegalArgumentException, IOException {
- throw
- new IllegalArgumentException("UncompressedBytes cannot be compressed!");
- }
-
- } // InMemUncompressedBytes
- } //MRSortResultIterator