TestTotalOrderPartitioner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Arrays;
  22. import junit.framework.Test;
  23. import junit.framework.TestCase;
  24. import junit.framework.TestSuite;
  25. import junit.extensions.TestSetup;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.io.NullWritable;
  29. import org.apache.hadoop.io.RawComparator;
  30. import org.apache.hadoop.io.SequenceFile;
  31. import org.apache.hadoop.io.Text;
  32. import org.apache.hadoop.io.WritableComparable;
  33. import org.apache.hadoop.io.WritableComparator;
  34. import org.apache.hadoop.io.WritableUtils;
  35. import org.apache.hadoop.mapred.JobConf;
  36. public class TestTotalOrderPartitioner extends TestCase {
  37.   private static final Text[] splitStrings = new Text[] {
  38.     // -inf            // 0
  39.     new Text("aabbb"), // 1
  40.     new Text("babbb"), // 2
  41.     new Text("daddd"), // 3
  42.     new Text("dddee"), // 4
  43.     new Text("ddhee"), // 5
  44.     new Text("dingo"), // 6
  45.     new Text("hijjj"), // 7
  46.     new Text("n"),     // 8
  47.     new Text("yak"),   // 9
  48.   };
  49.   static class Check<T> {
  50.     T data;
  51.     int part;
  52.     Check(T data, int part) {
  53.       this.data = data;
  54.       this.part = part;
  55.     }
  56.   }
  57.   private static final ArrayList<Check<Text>> testStrings =
  58.     new ArrayList<Check<Text>>();
  59.   static {
  60.     testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
  61.     testStrings.add(new Check<Text>(new Text("aaabb"), 0));
  62.     testStrings.add(new Check<Text>(new Text("aabbb"), 1));
  63.     testStrings.add(new Check<Text>(new Text("aaaaa"), 0));
  64.     testStrings.add(new Check<Text>(new Text("babbb"), 2));
  65.     testStrings.add(new Check<Text>(new Text("baabb"), 1));
  66.     testStrings.add(new Check<Text>(new Text("yai"), 8));
  67.     testStrings.add(new Check<Text>(new Text("yak"), 9));
  68.     testStrings.add(new Check<Text>(new Text("z"), 9));
  69.     testStrings.add(new Check<Text>(new Text("ddngo"), 5));
  70.     testStrings.add(new Check<Text>(new Text("hi"), 6));
  71.   };
  72.   private static <T extends WritableComparable> Path writePartitionFile(
  73.       String testname, JobConf conf, T[] splits) throws IOException {
  74.     final FileSystem fs = FileSystem.getLocal(conf);
  75.     final Path testdir = new Path(System.getProperty("test.build.data", "/tmp")
  76.                                  ).makeQualified(fs);
  77.     Path p = new Path(testdir, testname + "/_partition.lst");
  78.     TotalOrderPartitioner.setPartitionFile(conf, p);
  79.     conf.setNumReduceTasks(splits.length + 1);
  80.     SequenceFile.Writer w = null;
  81.     try {
  82.       NullWritable nw = NullWritable.get();
  83.       w = SequenceFile.createWriter(fs, conf, p,
  84.           splits[0].getClass(), NullWritable.class,
  85.           SequenceFile.CompressionType.NONE);
  86.       for (int i = 0; i < splits.length; ++i) {
  87.         w.append(splits[i], NullWritable.get());
  88.       }
  89.     } finally {
  90.       if (null != w)
  91.         w.close();
  92.     }
  93.     return p;
  94.   }
  95.   public void testTotalOrderMemCmp() throws Exception {
  96.     TotalOrderPartitioner<Text,NullWritable> partitioner =
  97.       new TotalOrderPartitioner<Text,NullWritable>();
  98.     JobConf job = new JobConf();
  99.     Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
  100.         "totalordermemcmp", job, splitStrings);
  101.     job.setMapOutputKeyClass(Text.class);
  102.     try {
  103.       partitioner.configure(job);
  104.       NullWritable nw = NullWritable.get();
  105.       for (Check<Text> chk : testStrings) {
  106.         assertEquals(chk.data.toString(), chk.part,
  107.             partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
  108.       }
  109.     } finally {
  110.       p.getFileSystem(job).delete(p);
  111.     }
  112.   }
  113.   public void testTotalOrderBinarySearch() throws Exception {
  114.     TotalOrderPartitioner<Text,NullWritable> partitioner =
  115.       new TotalOrderPartitioner<Text,NullWritable>();
  116.     JobConf job = new JobConf();
  117.     Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
  118.         "totalorderbinarysearch", job, splitStrings);
  119.     job.setBoolean("total.order.partitioner.natural.order", false);
  120.     job.setMapOutputKeyClass(Text.class);
  121.     try {
  122.       partitioner.configure(job);
  123.       NullWritable nw = NullWritable.get();
  124.       for (Check<Text> chk : testStrings) {
  125.         assertEquals(chk.data.toString(), chk.part,
  126.             partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
  127.       }
  128.     } finally {
  129.       p.getFileSystem(job).delete(p);
  130.     }
  131.   }
  132.   public static class ReverseStringComparator implements RawComparator<Text> {
  133.     public int compare(Text a, Text b) {
  134.       return -a.compareTo(b);
  135.     }
  136.     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  137.       int n1 = WritableUtils.decodeVIntSize(b1[s1]);
  138.       int n2 = WritableUtils.decodeVIntSize(b2[s2]);
  139.       return -1 * WritableComparator.compareBytes(b1, s1+n1, l1-n1,
  140.                                                   b2, s2+n2, l2-n2);
  141.     }
  142.   }
  143.   public void testTotalOrderCustomComparator() throws Exception {
  144.     TotalOrderPartitioner<Text,NullWritable> partitioner =
  145.       new TotalOrderPartitioner<Text,NullWritable>();
  146.     JobConf job = new JobConf();
  147.     Text[] revSplitStrings = Arrays.copyOf(splitStrings, splitStrings.length);
  148.     Arrays.sort(revSplitStrings, new ReverseStringComparator());
  149.     Path p = TestTotalOrderPartitioner.<Text>writePartitionFile(
  150.         "totalordercustomcomparator", job, revSplitStrings);
  151.     job.setBoolean("total.order.partitioner.natural.order", false);
  152.     job.setMapOutputKeyClass(Text.class);
  153.     job.setOutputKeyComparatorClass(ReverseStringComparator.class);
  154.     ArrayList<Check<Text>> revCheck = new ArrayList<Check<Text>>();
  155.     revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
  156.     revCheck.add(new Check<Text>(new Text("aaabb"), 9));
  157.     revCheck.add(new Check<Text>(new Text("aabbb"), 9));
  158.     revCheck.add(new Check<Text>(new Text("aaaaa"), 9));
  159.     revCheck.add(new Check<Text>(new Text("babbb"), 8));
  160.     revCheck.add(new Check<Text>(new Text("baabb"), 8));
  161.     revCheck.add(new Check<Text>(new Text("yai"), 1));
  162.     revCheck.add(new Check<Text>(new Text("yak"), 1));
  163.     revCheck.add(new Check<Text>(new Text("z"), 0));
  164.     revCheck.add(new Check<Text>(new Text("ddngo"), 4));
  165.     revCheck.add(new Check<Text>(new Text("hi"), 3));
  166.     try {
  167.       partitioner.configure(job);
  168.       NullWritable nw = NullWritable.get();
  169.       for (Check<Text> chk : revCheck) {
  170.         assertEquals(chk.data.toString(), chk.part,
  171.             partitioner.getPartition(chk.data, nw, splitStrings.length + 1));
  172.       }
  173.     } finally {
  174.       p.getFileSystem(job).delete(p);
  175.     }
  176.   }
  177. }