SecondarySort.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.util.StringTokenizer;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.io.IntWritable;
  26. import org.apache.hadoop.io.LongWritable;
  27. import org.apache.hadoop.io.RawComparator;
  28. import org.apache.hadoop.io.Text;
  29. import org.apache.hadoop.io.WritableComparable;
  30. import org.apache.hadoop.io.WritableComparator;
  31. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  32. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  33. import org.apache.hadoop.mapreduce.Job;
  34. import org.apache.hadoop.mapreduce.Mapper;
  35. import org.apache.hadoop.mapreduce.Partitioner;
  36. import org.apache.hadoop.mapreduce.Reducer;
  37. import org.apache.hadoop.util.GenericOptionsParser;
  38. /**
  39.  * This is an example Hadoop Map/Reduce application.
  40.  * It reads the text input files that must contain two integers per a line.
  41.  * The output is sorted by the first and second number and grouped on the 
  42.  * first number.
  43.  *
  44.  * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort
  45.  *            <i>in-dir</i> <i>out-dir</i> 
  46.  */
  47. public class SecondarySort {
  48.  
  49.   /**
  50.    * Define a pair of integers that are writable.
  51.    * They are serialized in a byte comparable format.
  52.    */
  53.   public static class IntPair 
  54.                       implements WritableComparable<IntPair> {
  55.     private int first = 0;
  56.     private int second = 0;
  57.     
  58.     /**
  59.      * Set the left and right values.
  60.      */
  61.     public void set(int left, int right) {
  62.       first = left;
  63.       second = right;
  64.     }
  65.     public int getFirst() {
  66.       return first;
  67.     }
  68.     public int getSecond() {
  69.       return second;
  70.     }
  71.     /**
  72.      * Read the two integers. 
  73.      * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE-> -1
  74.      */
  75.     @Override
  76.     public void readFields(DataInput in) throws IOException {
  77.       first = in.readInt() + Integer.MIN_VALUE;
  78.       second = in.readInt() + Integer.MIN_VALUE;
  79.     }
  80.     @Override
  81.     public void write(DataOutput out) throws IOException {
  82.       out.writeInt(first - Integer.MIN_VALUE);
  83.       out.writeInt(second - Integer.MIN_VALUE);
  84.     }
  85.     @Override
  86.     public int hashCode() {
  87.       return first * 157 + second;
  88.     }
  89.     @Override
  90.     public boolean equals(Object right) {
  91.       if (right instanceof IntPair) {
  92.         IntPair r = (IntPair) right;
  93.         return r.first == first && r.second == second;
  94.       } else {
  95.         return false;
  96.       }
  97.     }
  98.     /** A Comparator that compares serialized IntPair. */ 
  99.     public static class Comparator extends WritableComparator {
  100.       public Comparator() {
  101.         super(IntPair.class);
  102.       }
  103.       public int compare(byte[] b1, int s1, int l1,
  104.                          byte[] b2, int s2, int l2) {
  105.         return compareBytes(b1, s1, l1, b2, s2, l2);
  106.       }
  107.     }
  108.     static {                                        // register this comparator
  109.       WritableComparator.define(IntPair.class, new Comparator());
  110.     }
  111.     @Override
  112.     public int compareTo(IntPair o) {
  113.       if (first != o.first) {
  114.         return first < o.first ? -1 : 1;
  115.       } else if (second != o.second) {
  116.         return second < o.second ? -1 : 1;
  117.       } else {
  118.         return 0;
  119.       }
  120.     }
  121.   }
  122.   
  123.   /**
  124.    * Partition based on the first part of the pair.
  125.    */
  126.   public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
  127.     @Override
  128.     public int getPartition(IntPair key, IntWritable value, 
  129.                             int numPartitions) {
  130.       return Math.abs(key.getFirst() * 127) % numPartitions;
  131.     }
  132.   }
  133.   /**
  134.    * Compare only the first part of the pair, so that reduce is called once
  135.    * for each value of the first part.
  136.    */
  137.   public static class FirstGroupingComparator 
  138.                 implements RawComparator<IntPair> {
  139.     @Override
  140.     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  141.       return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, 
  142.                                              b2, s2, Integer.SIZE/8);
  143.     }
  144.     @Override
  145.     public int compare(IntPair o1, IntPair o2) {
  146.       int l = o1.getFirst();
  147.       int r = o2.getFirst();
  148.       return l == r ? 0 : (l < r ? -1 : 1);
  149.     }
  150.   }
  151.   /**
  152.    * Read two integers from each line and generate a key, value pair
  153.    * as ((left, right), right).
  154.    */
  155.   public static class MapClass 
  156.          extends Mapper<LongWritable, Text, IntPair, IntWritable> {
  157.     
  158.     private final IntPair key = new IntPair();
  159.     private final IntWritable value = new IntWritable();
  160.     
  161.     @Override
  162.     public void map(LongWritable inKey, Text inValue, 
  163.                     Context context) throws IOException, InterruptedException {
  164.       StringTokenizer itr = new StringTokenizer(inValue.toString());
  165.       int left = 0;
  166.       int right = 0;
  167.       if (itr.hasMoreTokens()) {
  168.         left = Integer.parseInt(itr.nextToken());
  169.         if (itr.hasMoreTokens()) {
  170.           right = Integer.parseInt(itr.nextToken());
  171.         }
  172.         key.set(left, right);
  173.         value.set(right);
  174.         context.write(key, value);
  175.       }
  176.     }
  177.   }
  178.   
  179.   /**
  180.    * A reducer class that just emits the sum of the input values.
  181.    */
  182.   public static class Reduce 
  183.          extends Reducer<IntPair, IntWritable, Text, IntWritable> {
  184.     private static final Text SEPARATOR = 
  185.       new Text("------------------------------------------------");
  186.     private final Text first = new Text();
  187.     
  188.     @Override
  189.     public void reduce(IntPair key, Iterable<IntWritable> values,
  190.                        Context context
  191.                        ) throws IOException, InterruptedException {
  192.       context.write(SEPARATOR, null);
  193.       first.set(Integer.toString(key.getFirst()));
  194.       for(IntWritable value: values) {
  195.         context.write(first, value);
  196.       }
  197.     }
  198.   }
  199.   
  200.   public static void main(String[] args) throws Exception {
  201.     Configuration conf = new Configuration();
  202.     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  203.     if (otherArgs.length != 2) {
  204.       System.err.println("Usage: secondarysrot <in> <out>");
  205.       System.exit(2);
  206.     }
  207.     Job job = new Job(conf, "secondary sort");
  208.     job.setJarByClass(SecondarySort.class);
  209.     job.setMapperClass(MapClass.class);
  210.     job.setReducerClass(Reduce.class);
  211.     // group and partition by the first int in the pair
  212.     job.setPartitionerClass(FirstPartitioner.class);
  213.     job.setGroupingComparatorClass(FirstGroupingComparator.class);
  214.     // the map output is IntPair, IntWritable
  215.     job.setMapOutputKeyClass(IntPair.class);
  216.     job.setMapOutputValueClass(IntWritable.class);
  217.     // the reduce output is Text, IntWritable
  218.     job.setOutputKeyClass(Text.class);
  219.     job.setOutputValueClass(IntWritable.class);
  220.     
  221.     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  222.     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  223.     System.exit(job.waitForCompletion(true) ? 0 : 1);
  224.   }
  225. }