BasicTypeSorterBase.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataOutputStream;
  20. import java.io.IOException;
  21. import org.apache.hadoop.io.DataOutputBuffer;
  22. import org.apache.hadoop.io.OutputBuffer;
  23. import org.apache.hadoop.io.RawComparator;
  24. import org.apache.hadoop.io.SequenceFile.ValueBytes;
  25. import org.apache.hadoop.util.Progress;
  26. import org.apache.hadoop.mapred.JobConf;
  27. import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
  28. import org.apache.hadoop.util.Progressable;
  29. /** This class implements the sort interface using primitive int arrays as 
  30.  * the data structures (that is why this class is called 'BasicType'SorterBase)
  31.  */
  32. abstract class BasicTypeSorterBase implements BufferSorter {
  33.   
  34.   protected OutputBuffer keyValBuffer; //the buffer used for storing
  35.                                            //key/values
  36.   protected int[] startOffsets; //the array used to store the start offsets of
  37.                                 //keys in keyValBuffer
  38.   protected int[] keyLengths; //the array used to store the lengths of
  39.                               //keys
  40.   protected int[] valueLengths; //the array used to store the value lengths 
  41.   protected int[] pointers; //the array of startOffsets's indices. This will
  42.                             //be sorted at the end to contain a sorted array of
  43.                             //indices to offsets
  44.   protected RawComparator comparator; //the comparator for the map output
  45.   protected int count; //the number of key/values
  46.   //the overhead of the arrays in memory 
  47.   //12 => 4 for keyoffsets, 4 for keylengths, 4 for valueLengths, and
  48.   //4 for indices into startOffsets array in the
  49.   //pointers array (ignored the partpointers list itself)
  50.   static private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
  51.   static private final int INITIAL_ARRAY_SIZE = 5;
  52.   //we maintain the max lengths of the key/val that we encounter.  During 
  53.   //iteration of the sorted results, we will create a DataOutputBuffer to
  54.   //return the keys. The max size of the DataOutputBuffer will be the max
  55.   //keylength that we encounter. Expose this value to model memory more
  56.   //accurately.
  57.   private int maxKeyLength = 0;
  58.   private int maxValLength = 0;
  59.   //Reference to the Progressable object for sending KeepAlive
  60.   protected Progressable reporter;
  61.   //Implementation of methods of the SorterBase interface
  62.   //
  63.   public void configure(JobConf conf) {
  64.     comparator = conf.getOutputKeyComparator();
  65.   }
  66.   
  67.   public void setProgressable(Progressable reporter) {
  68.     this.reporter = reporter;  
  69.   }
  70.   public void addKeyValue(int recordOffset, int keyLength, int valLength) {
  71.     //Add the start offset of the key in the startOffsets array and the
  72.     //length in the keyLengths array.
  73.     if (startOffsets == null || count == startOffsets.length)
  74.       grow();
  75.     startOffsets[count] = recordOffset;
  76.     keyLengths[count] = keyLength;
  77.     if (keyLength > maxKeyLength) {
  78.       maxKeyLength = keyLength;
  79.     }
  80.     if (valLength > maxValLength) {
  81.       maxValLength = valLength;
  82.     }
  83.     valueLengths[count] = valLength;
  84.     pointers[count] = count;
  85.     count++;
  86.   }
  87.   public void setInputBuffer(OutputBuffer buffer) {
  88.     //store a reference to the keyValBuffer that we need to read during sort
  89.     this.keyValBuffer = buffer;
  90.   }
  91.   public long getMemoryUtilized() {
  92.     //the total length of the arrays + the max{Key,Val}Length (this will be the 
  93.     //max size of the DataOutputBuffers during the iteration of the sorted
  94.     //keys).
  95.     if (startOffsets != null) {
  96.       return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD + 
  97.               maxKeyLength + maxValLength;
  98.     }
  99.     else { //nothing from this yet
  100.       return 0;
  101.     }
  102.   }
  103.   public abstract RawKeyValueIterator sort();
  104.   
  105.   public void close() {
  106.     //set count to 0; also, we don't reuse the arrays since we want to maintain
  107.     //consistency in the memory model
  108.     count = 0;
  109.     startOffsets = null;
  110.     keyLengths = null;
  111.     valueLengths = null;
  112.     pointers = null;
  113.     maxKeyLength = 0;
  114.     maxValLength = 0;
  115.     
  116.     //release the large key-value buffer so that the GC, if necessary,
  117.     //can collect it away
  118.     keyValBuffer = null;
  119.   }
  120.   
  121.   private void grow() {
  122.     int currLength = 0;
  123.     if (startOffsets != null) {
  124.       currLength = startOffsets.length;
  125.     }
  126.     int newLength = (int)(currLength * 1.1) + 1;
  127.     startOffsets = grow(startOffsets, newLength);
  128.     keyLengths = grow(keyLengths, newLength);
  129.     valueLengths = grow(valueLengths, newLength);
  130.     pointers = grow(pointers, newLength);
  131.   }
  132.   
  133.   private int[] grow(int[] old, int newLength) {
  134.     int[] result = new int[newLength];
  135.     if(old != null) { 
  136.       System.arraycopy(old, 0, result, 0, old.length);
  137.     }
  138.     return result;
  139.   }
  140. } //BasicTypeSorterBase
  141. //Implementation of methods of the RawKeyValueIterator interface. These
  142. //methods must be invoked to iterate over key/vals after sort is done.
  143. //
  144. class MRSortResultIterator implements RawKeyValueIterator {
  145.   
  146.   private int count;
  147.   private int[] pointers;
  148.   private int[] startOffsets;
  149.   private int[] keyLengths;
  150.   private int[] valLengths;
  151.   private int currStartOffsetIndex;
  152.   private int currIndexInPointers;
  153.   private OutputBuffer keyValBuffer;
  154.   private DataOutputBuffer key = new DataOutputBuffer();
  155.   private InMemUncompressedBytes value = new InMemUncompressedBytes();
  156.   
  157.   public MRSortResultIterator(OutputBuffer keyValBuffer, 
  158.                               int []pointers, int []startOffsets,
  159.                               int []keyLengths, int []valLengths) {
  160.     this.count = pointers.length;
  161.     this.pointers = pointers;
  162.     this.startOffsets = startOffsets;
  163.     this.keyLengths = keyLengths;
  164.     this.valLengths = valLengths;
  165.     this.keyValBuffer = keyValBuffer;
  166.   }
  167.   
  168.   public Progress getProgress() {
  169.     return null;
  170.   }
  171.   
  172.   public DataOutputBuffer getKey() throws IOException {
  173.     int currKeyOffset = startOffsets[currStartOffsetIndex];
  174.     int currKeyLength = keyLengths[currStartOffsetIndex];
  175.     //reuse the same key
  176.     key.reset();
  177.     key.write(keyValBuffer.getData(), currKeyOffset, currKeyLength);
  178.     return key;
  179.   }
  180.   public ValueBytes getValue() throws IOException {
  181.     //value[i] is stored in the following byte range:
  182.     //startOffsets[i] + keyLengths[i] through valLengths[i]
  183.     value.reset(keyValBuffer,
  184.                 startOffsets[currStartOffsetIndex] + keyLengths[currStartOffsetIndex],
  185.                 valLengths[currStartOffsetIndex]);
  186.     return value;
  187.   }
  188.   public boolean next() throws IOException {
  189.     if (count == currIndexInPointers)
  190.       return false;
  191.     currStartOffsetIndex = pointers[currIndexInPointers];
  192.     currIndexInPointers++;
  193.     return true;
  194.   }
  195.   
  196.   public void close() {
  197.     return;
  198.   }
  199.   
  200.   //An implementation of the ValueBytes interface for the in-memory value
  201.   //buffers. 
  202.   private static class InMemUncompressedBytes implements ValueBytes {
  203.     private byte[] data;
  204.     int start;
  205.     int dataSize;
  206.     private void reset(OutputBuffer d, int start, int length) 
  207.       throws IOException {
  208.       data = d.getData();
  209.       this.start = start;
  210.       dataSize = length;
  211.     }
  212.             
  213.     public int getSize() {
  214.       return dataSize;
  215.     }
  216.             
  217.     public void writeUncompressedBytes(DataOutputStream outStream)
  218.       throws IOException {
  219.       outStream.write(data, start, dataSize);
  220.     }
  221.     public void writeCompressedBytes(DataOutputStream outStream) 
  222.       throws IllegalArgumentException, IOException {
  223.       throw
  224.         new IllegalArgumentException("UncompressedBytes cannot be compressed!");
  225.     }
  226.   
  227.   } // InMemUncompressedBytes
  228. } //MRSortResultIterator