TotalOrderPartitioner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.IOException;
  20. import java.lang.reflect.Array;
  21. import java.util.ArrayList;
  22. import java.util.Arrays;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.io.BinaryComparable;
  26. import org.apache.hadoop.io.NullWritable;
  27. import org.apache.hadoop.io.SequenceFile;
  28. import org.apache.hadoop.io.RawComparator;
  29. import org.apache.hadoop.io.WritableComparable;
  30. import org.apache.hadoop.mapred.JobConf;
  31. import org.apache.hadoop.mapred.Partitioner;
  32. import org.apache.hadoop.util.ReflectionUtils;
  33. /**
  34.  * Partitioner effecting a total order by reading split points from
  35.  * an externally generated source.
  36.  */
  37. public class TotalOrderPartitioner<K extends WritableComparable,V>
  38.     implements Partitioner<K,V> {
  39.   private Node partitions;
  40.   public static final String DEFAULT_PATH = "_partition.lst";
  41.   public TotalOrderPartitioner() { }
  42.   /**
  43.    * Read in the partition file and build indexing data structures.
  44.    * If the keytype is {@link org.apache.hadoop.io.BinaryComparable} and
  45.    * <tt>total.order.partitioner.natural.order</tt> is not false, a trie
  46.    * of the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
  47.    * will be built. Otherwise, keys will be located using a binary search of
  48.    * the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
  49.    * defined for this job. The input file must be sorted with the same
  50.    * comparator and contain {@link
  51.      org.apache.hadoop.mapred.JobConf#getNumReduceTasks} - 1 keys.
  52.    */
  53.   @SuppressWarnings("unchecked") // keytype from conf not static
  54.   public void configure(JobConf job) {
  55.     try {
  56.       String parts = getPartitionFile(job);
  57.       final Path partFile = new Path(parts);
  58.       final FileSystem fs = (DEFAULT_PATH.equals(parts))
  59.         ? FileSystem.getLocal(job)     // assume in DistributedCache
  60.         : partFile.getFileSystem(job);
  61.       Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
  62.       K[] splitPoints = readPartitions(fs, partFile, keyClass, job);
  63.       if (splitPoints.length != job.getNumReduceTasks() - 1) {
  64.         throw new IOException("Wrong number of partitions in keyset");
  65.       }
  66.       RawComparator<K> comparator =
  67.         (RawComparator<K>) job.getOutputKeyComparator();
  68.       for (int i = 0; i < splitPoints.length - 1; ++i) {
  69.         if (comparator.compare(splitPoints[i], splitPoints[i+1]) >= 0) {
  70.           throw new IOException("Split points are out of order");
  71.         }
  72.       }
  73.       boolean natOrder =
  74.         job.getBoolean("total.order.partitioner.natural.order", true);
  75.       if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
  76.         partitions = buildTrie((BinaryComparable[])splitPoints, 0,
  77.             splitPoints.length, new byte[0],
  78.             job.getInt("total.order.partitioner.max.trie.depth", 2));
  79.       } else {
  80.         partitions = new BinarySearchNode(splitPoints, comparator);
  81.       }
  82.     } catch (IOException e) {
  83.       throw new IllegalArgumentException("Can't read partitions file", e);
  84.     }
  85.   }
  86.                                  // by construction, we know if our keytype
  87.   @SuppressWarnings("unchecked") // is memcmp-able and uses the trie
  88.   public int getPartition(K key, V value, int numPartitions) {
  89.     return partitions.findPartition(key);
  90.   }
  91.   /**
  92.    * Set the path to the SequenceFile storing the sorted partition keyset.
  93.    * It must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt>
  94.    * keys in the SequenceFile.
  95.    */
  96.   public static void setPartitionFile(JobConf job, Path p) {
  97.     job.set("total.order.partitioner.path", p.toString());
  98.   }
  99.   /**
  100.    * Get the path to the SequenceFile storing the sorted partition keyset.
  101.    * @see #setPartitionFile(JobConf,Path)
  102.    */
  103.   public static String getPartitionFile(JobConf job) {
  104.     return job.get("total.order.partitioner.path", DEFAULT_PATH);
  105.   }
  106.   /**
  107.    * Interface to the partitioner to locate a key in the partition keyset.
  108.    */
  109.   interface Node<T> {
  110.     /**
  111.      * Locate partition in keyset K, st [Ki..Ki+1) defines a partition,
  112.      * with implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
  113.      */
  114.     int findPartition(T key);
  115.   }
  116.   /**
  117.    * Base class for trie nodes. If the keytype is memcomp-able, this builds
  118.    * tries of the first <tt>total.order.partitioner.max.trie.depth</tt>
  119.    * bytes.
  120.    */
  121.   static abstract class TrieNode implements Node<BinaryComparable> {
  122.     private final int level;
  123.     TrieNode(int level) {
  124.       this.level = level;
  125.     }
  126.     int getLevel() {
  127.       return level;
  128.     }
  129.   }
  130.   /**
  131.    * For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
  132.    * where disabled by <tt>total.order.partitioner.natural.order</tt>,
  133.    * search the partition keyset with a binary search.
  134.    */
  135.   class BinarySearchNode implements Node<K> {
  136.     private final K[] splitPoints;
  137.     private final RawComparator<K> comparator;
  138.     BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
  139.       this.splitPoints = splitPoints;
  140.       this.comparator = comparator;
  141.     }
  142.     public int findPartition(K key) {
  143.       final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
  144.       return (pos < 0) ? -pos : pos;
  145.     }
  146.   }
  147.   /**
  148.    * An inner trie node that contains 256 children based on the next
  149.    * character.
  150.    */
  151.   class InnerTrieNode extends TrieNode {
  152.     private TrieNode[] child = new TrieNode[256];
  153.     InnerTrieNode(int level) {
  154.       super(level);
  155.     }
  156.     public int findPartition(BinaryComparable key) {
  157.       int level = getLevel();
  158.       if (key.getLength() <= level) {
  159.         return child[0].findPartition(key);
  160.       }
  161.       return child[0xFF & key.getBytes()[level]].findPartition(key);
  162.     }
  163.   }
  164.   /**
  165.    * A leaf trie node that scans for the key between lower..upper.
  166.    */
  167.   class LeafTrieNode extends TrieNode {
  168.     final int lower;
  169.     final int upper;
  170.     final BinaryComparable[] splitPoints;
  171.     LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
  172.       super(level);
  173.       this.lower = lower;
  174.       this.upper = upper;
  175.       this.splitPoints = splitPoints;
  176.     }
  177.     public int findPartition(BinaryComparable key) {
  178.       final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
  179.       return (pos < 0) ? -pos : pos;
  180.     }
  181.   }
  182.   /**
  183.    * Read the cut points from the given IFile.
  184.    * @param fs The file system
  185.    * @param p The path to read
  186.    * @param keyClass The map output key class
  187.    * @param job The job config
  188.    * @throws IOException
  189.    */
  190.                                  // matching key types enforced by passing in
  191.   @SuppressWarnings("unchecked") // map output key class
  192.   private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass,
  193.       JobConf job) throws IOException {
  194.     SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
  195.     ArrayList<K> parts = new ArrayList<K>();
  196.     K key = (K) ReflectionUtils.newInstance(keyClass, job);
  197.     NullWritable value = NullWritable.get();
  198.     while (reader.next(key, value)) {
  199.       parts.add(key);
  200.       key = (K) ReflectionUtils.newInstance(keyClass, job);
  201.     }
  202.     reader.close();
  203.     return parts.toArray((K[])Array.newInstance(keyClass, parts.size()));
  204.   }
  205.   /**
  206.    * Given a sorted set of cut points, build a trie that will find the correct
  207.    * partition quickly.
  208.    * @param splits the list of cut points
  209.    * @param lower the lower bound of partitions 0..numPartitions-1
  210.    * @param upper the upper bound of partitions 0..numPartitions-1
  211.    * @param prefix the prefix that we have already checked against
  212.    * @param maxDepth the maximum depth we will build a trie for
  213.    * @return the trie node that will divide the splits correctly
  214.    */
  215.   private TrieNode buildTrie(BinaryComparable[] splits, int lower,
  216.       int upper, byte[] prefix, int maxDepth) {
  217.     final int depth = prefix.length;
  218.     if (depth >= maxDepth || lower == upper) {
  219.       return new LeafTrieNode(depth, splits, lower, upper);
  220.     }
  221.     InnerTrieNode result = new InnerTrieNode(depth);
  222.     byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
  223.     // append an extra byte on to the prefix
  224.     int currentBound = lower;
  225.     for(int ch = 0; ch < 255; ++ch) {
  226.       trial[depth] = (byte) (ch + 1);
  227.       lower = currentBound;
  228.       while (currentBound < upper) {
  229.         if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
  230.           break;
  231.         }
  232.         currentBound += 1;
  233.       }
  234.       trial[depth] = (byte) ch;
  235.       result.child[0xFF & ch] = buildTrie(splits, lower, currentBound, trial,
  236.                                    maxDepth);
  237.     }
  238.     // pick up the rest
  239.     trial[depth] = 127;
  240.     result.child[255] = buildTrie(splits, currentBound, upper, trial,
  241.                                   maxDepth);
  242.     return result;
  243.   }
  244. }