TeraSort.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples.terasort;
  19. import java.io.IOException;
  20. import java.io.PrintStream;
  21. import java.net.URI;
  22. import java.util.ArrayList;
  23. import java.util.List;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.conf.Configured;
  27. import org.apache.hadoop.filecache.DistributedCache;
  28. import org.apache.hadoop.fs.FileSystem;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.io.NullWritable;
  31. import org.apache.hadoop.io.SequenceFile;
  32. import org.apache.hadoop.io.Text;
  33. import org.apache.hadoop.mapred.FileOutputFormat;
  34. import org.apache.hadoop.mapred.JobClient;
  35. import org.apache.hadoop.mapred.JobConf;
  36. import org.apache.hadoop.mapred.Partitioner;
  37. import org.apache.hadoop.util.Tool;
  38. import org.apache.hadoop.util.ToolRunner;
  39. /**
  40.  * Generates the sampled split points, launches the job, and waits for it to
  41.  * finish. 
  42.  * <p>
  43.  * To run the program: 
  44.  * <b>bin/hadoop jar hadoop-*-examples.jar terasort in-dir out-dir</b>
  45.  */
  46. public class TeraSort extends Configured implements Tool {
  47.   private static final Log LOG = LogFactory.getLog(TeraSort.class);
  48.   /**
  49.    * A partitioner that splits text keys into roughly equal partitions
  50.    * in a global sorted order.
  51.    */
  52.   static class TotalOrderPartitioner implements Partitioner<Text,Text>{
  53.     private TrieNode trie;
  54.     private Text[] splitPoints;
  55.     /**
  56.      * A generic trie node
  57.      */
  58.     static abstract class TrieNode {
  59.       private int level;
  60.       TrieNode(int level) {
  61.         this.level = level;
  62.       }
  63.       abstract int findPartition(Text key);
  64.       abstract void print(PrintStream strm) throws IOException;
  65.       int getLevel() {
  66.         return level;
  67.       }
  68.     }
  69.     /**
  70.      * An inner trie node that contains 256 children based on the next
  71.      * character.
  72.      */
  73.     static class InnerTrieNode extends TrieNode {
  74.       private TrieNode[] child = new TrieNode[256];
  75.       
  76.       InnerTrieNode(int level) {
  77.         super(level);
  78.       }
  79.       int findPartition(Text key) {
  80.         int level = getLevel();
  81.         if (key.getLength() <= level) {
  82.           return child[0].findPartition(key);
  83.         }
  84.         return child[key.getBytes()[level]].findPartition(key);
  85.       }
  86.       void setChild(int idx, TrieNode child) {
  87.         this.child[idx] = child;
  88.       }
  89.       void print(PrintStream strm) throws IOException {
  90.         for(int ch=0; ch < 255; ++ch) {
  91.           for(int i = 0; i < 2*getLevel(); ++i) {
  92.             strm.print(' ');
  93.           }
  94.           strm.print(ch);
  95.           strm.println(" ->");
  96.           if (child[ch] != null) {
  97.             child[ch].print(strm);
  98.           }
  99.         }
  100.       }
  101.     }
  102.     /**
  103.      * A leaf trie node that does string compares to figure out where the given
  104.      * key belongs between lower..upper.
  105.      */
  106.     static class LeafTrieNode extends TrieNode {
  107.       int lower;
  108.       int upper;
  109.       Text[] splitPoints;
  110.       LeafTrieNode(int level, Text[] splitPoints, int lower, int upper) {
  111.         super(level);
  112.         this.splitPoints = splitPoints;
  113.         this.lower = lower;
  114.         this.upper = upper;
  115.       }
  116.       int findPartition(Text key) {
  117.         for(int i=lower; i<upper; ++i) {
  118.           if (splitPoints[i].compareTo(key) >= 0) {
  119.             return i;
  120.           }
  121.         }
  122.         return upper;
  123.       }
  124.       void print(PrintStream strm) throws IOException {
  125.         for(int i = 0; i < 2*getLevel(); ++i) {
  126.           strm.print(' ');
  127.         }
  128.         strm.print(lower);
  129.         strm.print(", ");
  130.         strm.println(upper);
  131.       }
  132.     }
  133.     /**
  134.      * Read the cut points from the given sequence file.
  135.      * @param fs the file system
  136.      * @param p the path to read
  137.      * @param job the job config
  138.      * @return the strings to split the partitions on
  139.      * @throws IOException
  140.      */
  141.     private static Text[] readPartitions(FileSystem fs, Path p, 
  142.                                          JobConf job) throws IOException {
  143.       SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, job);
  144.       List<Text> parts = new ArrayList<Text>();
  145.       Text key = new Text();
  146.       NullWritable value = NullWritable.get();
  147.       while (reader.next(key, value)) {
  148.         parts.add(key);
  149.         key = new Text();
  150.       }
  151.       reader.close();
  152.       return parts.toArray(new Text[parts.size()]);  
  153.     }
  154.     /**
  155.      * Given a sorted set of cut points, build a trie that will find the correct
  156.      * partition quickly.
  157.      * @param splits the list of cut points
  158.      * @param lower the lower bound of partitions 0..numPartitions-1
  159.      * @param upper the upper bound of partitions 0..numPartitions-1
  160.      * @param prefix the prefix that we have already checked against
  161.      * @param maxDepth the maximum depth we will build a trie for
  162.      * @return the trie node that will divide the splits correctly
  163.      */
  164.     private static TrieNode buildTrie(Text[] splits, int lower, int upper, 
  165.                                       Text prefix, int maxDepth) {
  166.       int depth = prefix.getLength();
  167.       if (depth >= maxDepth || lower == upper) {
  168.         return new LeafTrieNode(depth, splits, lower, upper);
  169.       }
  170.       InnerTrieNode result = new InnerTrieNode(depth);
  171.       Text trial = new Text(prefix);
  172.       // append an extra byte on to the prefix
  173.       trial.append(new byte[1], 0, 1);
  174.       int currentBound = lower;
  175.       for(int ch = 0; ch < 255; ++ch) {
  176.         trial.getBytes()[depth] = (byte) (ch + 1);
  177.         lower = currentBound;
  178.         while (currentBound < upper) {
  179.           if (splits[currentBound].compareTo(trial) >= 0) {
  180.             break;
  181.           }
  182.           currentBound += 1;
  183.         }
  184.         trial.getBytes()[depth] = (byte) ch;
  185.         result.child[ch] = buildTrie(splits, lower, currentBound, trial, 
  186.                                      maxDepth);
  187.       }
  188.       // pick up the rest
  189.       trial.getBytes()[depth] = 127;
  190.       result.child[255] = buildTrie(splits, currentBound, upper, trial,
  191.                                     maxDepth);
  192.       return result;
  193.     }
  194.     public void configure(JobConf job) {
  195.       try {
  196.         FileSystem fs = FileSystem.getLocal(job);
  197.         Path partFile = new Path(TeraInputFormat.PARTITION_FILENAME);
  198.         splitPoints = readPartitions(fs, partFile, job);
  199.         trie = buildTrie(splitPoints, 0, splitPoints.length, new Text(), 2);
  200.       } catch (IOException ie) {
  201.         throw new IllegalArgumentException("can't read paritions file", ie);
  202.       }
  203.     }
  204.     public TotalOrderPartitioner() {
  205.     }
  206.     public int getPartition(Text key, Text value, int numPartitions) {
  207.       return trie.findPartition(key);
  208.     }
  209.     
  210.   }
  211.   
  212.   public int run(String[] args) throws Exception {
  213.     LOG.info("starting");
  214.     JobConf job = (JobConf) getConf();
  215.     Path inputDir = new Path(args[0]);
  216.     inputDir = inputDir.makeQualified(inputDir.getFileSystem(job));
  217.     Path partitionFile = new Path(inputDir, TeraInputFormat.PARTITION_FILENAME);
  218.     URI partitionUri = new URI(partitionFile.toString() +
  219.                                "#" + TeraInputFormat.PARTITION_FILENAME);
  220.     TeraInputFormat.setInputPaths(job, new Path(args[0]));
  221.     FileOutputFormat.setOutputPath(job, new Path(args[1]));
  222.     job.setJobName("TeraSort");
  223.     job.setJarByClass(TeraSort.class);
  224.     job.setOutputKeyClass(Text.class);
  225.     job.setOutputValueClass(Text.class);
  226.     job.setInputFormat(TeraInputFormat.class);
  227.     job.setOutputFormat(TeraOutputFormat.class);
  228.     job.setPartitionerClass(TotalOrderPartitioner.class);
  229.     TeraInputFormat.writePartitionFile(job, partitionFile);
  230.     DistributedCache.addCacheFile(partitionUri, job);
  231.     DistributedCache.createSymlink(job);
  232.     job.setInt("dfs.replication", 1);
  233.     TeraOutputFormat.setFinalSync(job, true);
  234.     JobClient.runJob(job);
  235.     LOG.info("done");
  236.     return 0;
  237.   }
  238.   /**
  239.    * @param args
  240.    */
  241.   public static void main(String[] args) throws Exception {
  242.     int res = ToolRunner.run(new JobConf(), new TeraSort(), args);
  243.     System.exit(res);
  244.   }
  245. }