Merger.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:19k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collections;
  22. import java.util.Comparator;
  23. import java.util.List;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.fs.FSDataInputStream;
  28. import org.apache.hadoop.fs.ChecksumFileSystem;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.LocalDirAllocator;
  31. import org.apache.hadoop.fs.Path;
  32. import org.apache.hadoop.io.DataInputBuffer;
  33. import org.apache.hadoop.io.RawComparator;
  34. import org.apache.hadoop.io.compress.CompressionCodec;
  35. import org.apache.hadoop.mapred.IFile.Reader;
  36. import org.apache.hadoop.mapred.IFile.Writer;
  37. import org.apache.hadoop.util.PriorityQueue;
  38. import org.apache.hadoop.util.Progress;
  39. import org.apache.hadoop.util.Progressable;
  40. class Merger {  
  41.   private static final Log LOG = LogFactory.getLog(Merger.class);
  42.   // Local directories
  43.   private static LocalDirAllocator lDirAlloc = 
  44.     new LocalDirAllocator("mapred.local.dir");
  45.   public static <K extends Object, V extends Object>
  46.   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
  47.                             Class<K> keyClass, Class<V> valueClass, 
  48.                             CompressionCodec codec,
  49.                             Path[] inputs, boolean deleteInputs, 
  50.                             int mergeFactor, Path tmpDir,
  51.                             RawComparator<K> comparator, Progressable reporter,
  52.                             Counters.Counter readsCounter,
  53.                             Counters.Counter writesCounter)
  54.   throws IOException {
  55.     return 
  56.       new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator, 
  57.                            reporter).merge(keyClass, valueClass,
  58.                                            mergeFactor, tmpDir,
  59.                                            readsCounter, writesCounter);
  60.   }
  61.   
  62.   public static <K extends Object, V extends Object>
  63.   RawKeyValueIterator merge(Configuration conf, FileSystem fs, 
  64.                             Class<K> keyClass, Class<V> valueClass, 
  65.                             List<Segment<K, V>> segments, 
  66.                             int mergeFactor, Path tmpDir,
  67.                             RawComparator<K> comparator, Progressable reporter,
  68.                             Counters.Counter readsCounter,
  69.                             Counters.Counter writesCounter)
  70.       throws IOException {
  71.     return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
  72.                  comparator, reporter, false, readsCounter, writesCounter);
  73.   }
  74.   public static <K extends Object, V extends Object>
  75.   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
  76.                             Class<K> keyClass, Class<V> valueClass,
  77.                             List<Segment<K, V>> segments,
  78.                             int mergeFactor, Path tmpDir,
  79.                             RawComparator<K> comparator, Progressable reporter,
  80.                             boolean sortSegments,
  81.                             Counters.Counter readsCounter,
  82.                             Counters.Counter writesCounter)
  83.       throws IOException {
  84.     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
  85.                            sortSegments).merge(keyClass, valueClass,
  86.                                                mergeFactor, tmpDir,
  87.                                                readsCounter, writesCounter);
  88.   }
  89.   static <K extends Object, V extends Object>
  90.     RawKeyValueIterator merge(Configuration conf, FileSystem fs,
  91.                             Class<K> keyClass, Class<V> valueClass,
  92.                             List<Segment<K, V>> segments,
  93.                             int mergeFactor, int inMemSegments, Path tmpDir,
  94.                             RawComparator<K> comparator, Progressable reporter,
  95.                             boolean sortSegments,
  96.                             Counters.Counter readsCounter,
  97.                             Counters.Counter writesCounter)
  98.       throws IOException {
  99.     return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
  100.                            sortSegments).merge(keyClass, valueClass,
  101.                                                mergeFactor, inMemSegments,
  102.                                                tmpDir,
  103.                                                readsCounter, writesCounter);
  104.   }
  105.   public static <K extends Object, V extends Object>
  106.   void writeFile(RawKeyValueIterator records, Writer<K, V> writer, 
  107.                  Progressable progressable, Configuration conf) 
  108.   throws IOException {
  109.     long progressBar = conf.getLong("mapred.merge.recordsBeforeProgress",
  110.         10000);
  111.     long recordCtr = 0;
  112.     while(records.next()) {
  113.       writer.append(records.getKey(), records.getValue());
  114.       
  115.       if (((recordCtr++) % progressBar) == 0) {
  116.         progressable.progress();
  117.       }
  118.     }
  119. }
  120.   public static class Segment<K extends Object, V extends Object> {
  121.     Reader<K, V> reader = null;
  122.     DataInputBuffer key = new DataInputBuffer();
  123.     DataInputBuffer value = new DataInputBuffer();
  124.     
  125.     Configuration conf = null;
  126.     FileSystem fs = null;
  127.     Path file = null;
  128.     boolean preserve = false;
  129.     CompressionCodec codec = null;
  130.     long segmentOffset = 0;
  131.     long segmentLength = -1;
  132.     
  133.     public Segment(Configuration conf, FileSystem fs, Path file,
  134.                    CompressionCodec codec, boolean preserve) throws IOException {
  135.       this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve);
  136.     }
  137.     public Segment(Configuration conf, FileSystem fs, Path file,
  138.         long segmentOffset, long segmentLength, CompressionCodec codec,
  139.         boolean preserve) throws IOException {
  140.       this.conf = conf;
  141.       this.fs = fs;
  142.       this.file = file;
  143.       this.codec = codec;
  144.       this.preserve = preserve;
  145.       this.segmentOffset = segmentOffset;
  146.       this.segmentLength = segmentLength;
  147.     }
  148.     
  149.     public Segment(Reader<K, V> reader, boolean preserve) {
  150.       this.reader = reader;
  151.       this.preserve = preserve;
  152.       
  153.       this.segmentLength = reader.getLength();
  154.     }
  155.     private void init(Counters.Counter readsCounter) throws IOException {
  156.       if (reader == null) {
  157.         FSDataInputStream in = fs.open(file);
  158.         in.seek(segmentOffset);
  159.         reader = new Reader<K, V>(conf, in, segmentLength, codec, readsCounter);
  160.       }
  161.     }
  162.     
  163.     DataInputBuffer getKey() { return key; }
  164.     DataInputBuffer getValue() { return value; }
  165.     long getLength() { 
  166.       return (reader == null) ?
  167.         segmentLength : reader.getLength();
  168.     }
  169.     
  170.     boolean next() throws IOException {
  171.       return reader.next(key, value);
  172.     }
  173.     
  174.     void close() throws IOException {
  175.       reader.close();
  176.       
  177.       if (!preserve && fs != null) {
  178.         fs.delete(file, false);
  179.       }
  180.     }
  181.     public long getPosition() throws IOException {
  182.       return reader.getPosition();
  183.     }
  184.   }
  185.   
  186.   private static class MergeQueue<K extends Object, V extends Object> 
  187.   extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {
  188.     Configuration conf;
  189.     FileSystem fs;
  190.     CompressionCodec codec;
  191.     
  192.     List<Segment<K, V>> segments = new ArrayList<Segment<K,V>>();
  193.     
  194.     RawComparator<K> comparator;
  195.     
  196.     private long totalBytesProcessed;
  197.     private float progPerByte;
  198.     private Progress mergeProgress = new Progress();
  199.     
  200.     Progressable reporter;
  201.     
  202.     DataInputBuffer key;
  203.     DataInputBuffer value;
  204.     
  205.     Segment<K, V> minSegment;
  206.     Comparator<Segment<K, V>> segmentComparator =   
  207.       new Comparator<Segment<K, V>>() {
  208.       public int compare(Segment<K, V> o1, Segment<K, V> o2) {
  209.         if (o1.getLength() == o2.getLength()) {
  210.           return 0;
  211.         }
  212.         return o1.getLength() < o2.getLength() ? -1 : 1;
  213.       }
  214.     };
  215.     
  216.     public MergeQueue(Configuration conf, FileSystem fs, 
  217.                       Path[] inputs, boolean deleteInputs, 
  218.                       CompressionCodec codec, RawComparator<K> comparator,
  219.                       Progressable reporter) 
  220.     throws IOException {
  221.       this.conf = conf;
  222.       this.fs = fs;
  223.       this.codec = codec;
  224.       this.comparator = comparator;
  225.       this.reporter = reporter;
  226.       
  227.       for (Path file : inputs) {
  228.         segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs));
  229.       }
  230.       
  231.       // Sort segments on file-lengths
  232.       Collections.sort(segments, segmentComparator); 
  233.     }
  234.     
  235.     public MergeQueue(Configuration conf, FileSystem fs,
  236.         List<Segment<K, V>> segments, RawComparator<K> comparator,
  237.         Progressable reporter) {
  238.       this(conf, fs, segments, comparator, reporter, false);
  239.     }
  240.     public MergeQueue(Configuration conf, FileSystem fs, 
  241.         List<Segment<K, V>> segments, RawComparator<K> comparator,
  242.         Progressable reporter, boolean sortSegments) {
  243.       this.conf = conf;
  244.       this.fs = fs;
  245.       this.comparator = comparator;
  246.       this.segments = segments;
  247.       this.reporter = reporter;
  248.       if (sortSegments) {
  249.         Collections.sort(segments, segmentComparator);
  250.       }
  251.     }
  252.     public void close() throws IOException {
  253.       Segment<K, V> segment;
  254.       while((segment = pop()) != null) {
  255.         segment.close();
  256.       }
  257.     }
  258.     public DataInputBuffer getKey() throws IOException {
  259.       return key;
  260.     }
  261.     public DataInputBuffer getValue() throws IOException {
  262.       return value;
  263.     }
  264.     private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
  265.       long startPos = reader.getPosition();
  266.       boolean hasNext = reader.next();
  267.       long endPos = reader.getPosition();
  268.       totalBytesProcessed += endPos - startPos;
  269.       mergeProgress.set(totalBytesProcessed * progPerByte);
  270.       if (hasNext) {
  271.         adjustTop();
  272.       } else {
  273.         pop();
  274.         reader.close();
  275.       }
  276.     }
  277.     public boolean next() throws IOException {
  278.       if (size() == 0)
  279.         return false;
  280.       if (minSegment != null) {
  281.         //minSegment is non-null for all invocations of next except the first
  282.         //one. For the first invocation, the priority queue is ready for use
  283.         //but for the subsequent invocations, first adjust the queue 
  284.         adjustPriorityQueue(minSegment);
  285.         if (size() == 0) {
  286.           minSegment = null;
  287.           return false;
  288.         }
  289.       }
  290.       minSegment = top();
  291.       
  292.       key = minSegment.getKey();
  293.       value = minSegment.getValue();
  294.       return true;
  295.     }
  296.     @SuppressWarnings("unchecked")
  297.     protected boolean lessThan(Object a, Object b) {
  298.       DataInputBuffer key1 = ((Segment<K, V>)a).getKey();
  299.       DataInputBuffer key2 = ((Segment<K, V>)b).getKey();
  300.       int s1 = key1.getPosition();
  301.       int l1 = key1.getLength() - s1;
  302.       int s2 = key2.getPosition();
  303.       int l2 = key2.getLength() - s2;
  304.       return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
  305.     }
  306.     
  307.     public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
  308.                                      int factor, Path tmpDir,
  309.                                      Counters.Counter readsCounter,
  310.                                      Counters.Counter writesCounter)
  311.         throws IOException {
  312.       return merge(keyClass, valueClass, factor, 0, tmpDir,
  313.                    readsCounter, writesCounter);
  314.     }
  315.     RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
  316.                                      int factor, int inMem, Path tmpDir,
  317.                                      Counters.Counter readsCounter,
  318.                                      Counters.Counter writesCounter)
  319.         throws IOException {
  320.       LOG.info("Merging " + segments.size() + " sorted segments");
  321.       
  322.       //create the MergeStreams from the sorted map created in the constructor
  323.       //and dump the final output to a file
  324.       int numSegments = segments.size();
  325.       int origFactor = factor;
  326.       int passNo = 1;
  327.       do {
  328.         //get the factor for this pass of merge. We assume in-memory segments
  329.         //are the first entries in the segment list and that the pass factor
  330.         //doesn't apply to them
  331.         factor = getPassFactor(factor, passNo, numSegments - inMem);
  332.         if (1 == passNo) {
  333.           factor += inMem;
  334.         }
  335.         List<Segment<K, V>> segmentsToMerge =
  336.           new ArrayList<Segment<K, V>>();
  337.         int segmentsConsidered = 0;
  338.         int numSegmentsToConsider = factor;
  339.         while (true) {
  340.           //extract the smallest 'factor' number of segments  
  341.           //Call cleanup on the empty segments (no key/value data)
  342.           List<Segment<K, V>> mStream = 
  343.             getSegmentDescriptors(numSegmentsToConsider);
  344.           for (Segment<K, V> segment : mStream) {
  345.             // Initialize the segment at the last possible moment;
  346.             // this helps in ensuring we don't use buffers until we need them
  347.             segment.init(readsCounter);
  348.             long startPos = segment.getPosition();
  349.             boolean hasNext = segment.next();
  350.             long endPos = segment.getPosition();
  351.             totalBytesProcessed += endPos - startPos;
  352.             mergeProgress.set(totalBytesProcessed * progPerByte);
  353.             if (hasNext) {
  354.               segmentsToMerge.add(segment);
  355.               segmentsConsidered++;
  356.             }
  357.             else {
  358.               segment.close();
  359.               numSegments--; //we ignore this segment for the merge
  360.             }
  361.           }
  362.           //if we have the desired number of segments
  363.           //or looked at all available segments, we break
  364.           if (segmentsConsidered == factor || 
  365.               segments.size() == 0) {
  366.             break;
  367.           }
  368.             
  369.           numSegmentsToConsider = factor - segmentsConsidered;
  370.         }
  371.         
  372.         //feed the streams to the priority queue
  373.         initialize(segmentsToMerge.size());
  374.         clear();
  375.         for (Segment<K, V> segment : segmentsToMerge) {
  376.           put(segment);
  377.         }
  378.         
  379.         //if we have lesser number of segments remaining, then just return the
  380.         //iterator, else do another single level merge
  381.         if (numSegments <= factor) {
  382.           //calculate the length of the remaining segments. Required for 
  383.           //calculating the merge progress
  384.           long totalBytes = 0;
  385.           for (int i = 0; i < segmentsToMerge.size(); i++) {
  386.             totalBytes += segmentsToMerge.get(i).getLength();
  387.           }
  388.           if (totalBytes != 0) //being paranoid
  389.             progPerByte = 1.0f / (float)totalBytes;
  390.           
  391.           if (totalBytes != 0)
  392.             mergeProgress.set(totalBytesProcessed * progPerByte);
  393.           else
  394.             mergeProgress.set(1.0f); // Last pass and no segments left - we're done
  395.           
  396.           LOG.info("Down to the last merge-pass, with " + numSegments + 
  397.                    " segments left of total size: " + totalBytes + " bytes");
  398.           return this;
  399.         } else {
  400.           LOG.info("Merging " + segmentsToMerge.size() + 
  401.                    " intermediate segments out of a total of " + 
  402.                    (segments.size()+segmentsToMerge.size()));
  403.           
  404.           //we want to spread the creation of temp files on multiple disks if 
  405.           //available under the space constraints
  406.           long approxOutputSize = 0; 
  407.           for (Segment<K, V> s : segmentsToMerge) {
  408.             approxOutputSize += s.getLength() + 
  409.                                 ChecksumFileSystem.getApproxChkSumLength(
  410.                                 s.getLength());
  411.           }
  412.           Path tmpFilename = 
  413.             new Path(tmpDir, "intermediate").suffix("." + passNo);
  414.           Path outputFile =  lDirAlloc.getLocalPathForWrite(
  415.                                               tmpFilename.toString(),
  416.                                               approxOutputSize, conf);
  417.           Writer<K, V> writer = 
  418.             new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
  419.                              writesCounter);
  420.           writeFile(this, writer, reporter, conf);
  421.           writer.close();
  422.           
  423.           //we finished one single level merge; now clean up the priority 
  424.           //queue
  425.           this.close();
  426.           // Add the newly create segment to the list of segments to be merged
  427.           Segment<K, V> tempSegment = 
  428.             new Segment<K, V>(conf, fs, outputFile, codec, false);
  429.           segments.add(tempSegment);
  430.           numSegments = segments.size();
  431.           Collections.sort(segments, segmentComparator);
  432.           
  433.           passNo++;
  434.         }
  435.         //we are worried about only the first pass merge factor. So reset the 
  436.         //factor to what it originally was
  437.         factor = origFactor;
  438.       } while(true);
  439.     }
  440.     
  441.     /**
  442.      * Determine the number of segments to merge in a given pass. Assuming more
  443.      * than factor segments, the first pass should attempt to bring the total
  444.      * number of segments - 1 to be divisible by the factor - 1 (each pass
  445.      * takes X segments and produces 1) to minimize the number of merges.
  446.      */
  447.     private int getPassFactor(int factor, int passNo, int numSegments) {
  448.       if (passNo > 1 || numSegments <= factor || factor == 1) 
  449.         return factor;
  450.       int mod = (numSegments - 1) % (factor - 1);
  451.       if (mod == 0)
  452.         return factor;
  453.       return mod + 1;
  454.     }
  455.     
  456.     /** Return (& remove) the requested number of segment descriptors from the
  457.      * sorted map.
  458.      */
  459.     private List<Segment<K, V>> getSegmentDescriptors(int numDescriptors) {
  460.       if (numDescriptors > segments.size()) {
  461.         List<Segment<K, V>> subList = new ArrayList<Segment<K,V>>(segments);
  462.         segments.clear();
  463.         return subList;
  464.       }
  465.       
  466.       List<Segment<K, V>> subList = 
  467.         new ArrayList<Segment<K,V>>(segments.subList(0, numDescriptors));
  468.       for (int i=0; i < numDescriptors; ++i) {
  469.         segments.remove(0);
  470.       }
  471.       return subList;
  472.     }
  473.     public Progress getProgress() {
  474.       return mergeProgress;
  475.     }
  476.   }
  477. }