TestMapReduceLocal.java
Uploaded by: quxuerui
Upload date: 2018-01-08
Package size: 41811k
File size: 7k
Category: Grid Computing
Development platform: Java
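
The full source follows: a JUnit test from Hadoop's MapReduce test suite that brings up a two-tracker MiniMRCluster over the local file system (file:///) and runs two of the bundled examples, WordCount and SecondarySort, asserting on both the job output and the task counters.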

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapreduce;

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SecondarySort;
import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.examples.SecondarySort.FirstGroupingComparator;
import org.apache.hadoop.examples.SecondarySort.FirstPartitioner;
import org.apache.hadoop.examples.SecondarySort.IntPair;
import org.apache.hadoop.examples.WordCount.IntSumReducer;
import org.apache.hadoop.examples.WordCount.TokenizerMapper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * A JUnit test for a mini map-reduce cluster backed by the local file system.
 */
public class TestMapReduceLocal extends TestCase {
  private static Path TEST_ROOT_DIR =
    new Path(System.getProperty("test.build.data", "/tmp"));
  private static Configuration conf = new Configuration();
  private static FileSystem localFs;

  static {
    try {
      localFs = FileSystem.getLocal(conf);
    } catch (IOException io) {
      throw new RuntimeException("problem getting local fs", io);
    }
  }
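
  /** Create (or overwrite) the file {@code name} under TEST_ROOT_DIR on the local FS. */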
  public Path writeFile(String name, String data) throws IOException {
    Path file = new Path(TEST_ROOT_DIR + "/" + name);
    localFs.delete(file, false);
    DataOutputStream f = localFs.create(file);
    f.write(data.getBytes());
    f.close();
    return file;
  }
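
  /** Read the file {@code name} under TEST_ROOT_DIR back as one newline-joined string. */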
  public String readFile(String name) throws IOException {
    DataInputStream f = localFs.open(new Path(TEST_ROOT_DIR + "/" + name));
    BufferedReader b = new BufferedReader(new InputStreamReader(f));
    StringBuilder result = new StringBuilder();
    String line = b.readLine();
    while (line != null) {
      result.append(line);
      result.append('\n');
      line = b.readLine();
    }
    b.close();
    return result.toString();
  }
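
  /** Bring up a two-tracker MiniMRCluster over file:/// and run both example jobs. */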
  public void testWithLocal() throws Exception {
    MiniMRCluster mr = null;
    try {
      mr = new MiniMRCluster(2, "file:///", 3);
      Configuration conf = mr.createJobConf();
      runWordCount(conf);
      runSecondarySort(conf);
    } finally {
      if (mr != null) { mr.shutdown(); }
    }
  }
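
  /**
   * Run WordCount with IntSumReducer doubling as the combiner, then check the
   * output file and the combine/reduce record counters.
   */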
  private void runWordCount(Configuration conf
                            ) throws IOException,
                                     InterruptedException,
                                     ClassNotFoundException {
    final String COUNTER_GROUP = "org.apache.hadoop.mapred.Task$Counter";
    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
    writeFile("in/part1", "this is a test\nof word count test\ntest\n");
    writeFile("in/part2", "more test");
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
    FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
    assertTrue(job.waitForCompletion(false));
    String out = readFile("out/part-r-00000");
    System.out.println(out);
    assertEquals("a\t1\ncount\t1\nis\t1\nmore\t1\nof\t1\ntest\t4\nthis\t1\nword\t1\n",
                 out);
    Counters ctrs = job.getCounters();
    System.out.println("Counters: " + ctrs);
    long combineIn = ctrs.findCounter(COUNTER_GROUP,
                                      "COMBINE_INPUT_RECORDS").getValue();
    long combineOut = ctrs.findCounter(COUNTER_GROUP,
                                       "COMBINE_OUTPUT_RECORDS").getValue();
    long reduceIn = ctrs.findCounter(COUNTER_GROUP,
                                     "REDUCE_INPUT_RECORDS").getValue();
    long mapOut = ctrs.findCounter(COUNTER_GROUP,
                                   "MAP_OUTPUT_RECORDS").getValue();
    assertEquals("map out = combine in", mapOut, combineIn);
    assertEquals("combine out = reduce in", combineOut, reduceIn);
    assertTrue("combine in > combine out", combineIn > combineOut);
  }
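
  /**
   * Run the SecondarySort example: partition and group by the first int of
   * each pair, and verify the output is sorted within each group.
   */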
  private void runSecondarySort(Configuration conf) throws IOException,
                                                        InterruptedException,
                                                        ClassNotFoundException {
    localFs.delete(new Path(TEST_ROOT_DIR + "/in"), true);
    localFs.delete(new Path(TEST_ROOT_DIR + "/out"), true);
    writeFile("in/part1", "-1 -4\n-3 23\n5 10\n-1 -2\n-1 300\n-1 10\n4 1\n" +
              "4 2\n4 10\n4 -1\n4 -10\n10 20\n10 30\n10 25\n");
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(SecondarySort.MapClass.class);
    job.setReducerClass(SecondarySort.Reduce.class);
    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);
    // the map output is IntPair, IntWritable
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);
    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(TEST_ROOT_DIR + "/in"));
    FileOutputFormat.setOutputPath(job, new Path(TEST_ROOT_DIR + "/out"));
    assertTrue(job.waitForCompletion(true));
    String out = readFile("out/part-r-00000");
    assertEquals("------------------------------------------------\n" +
                 "-3\t23\n" +
                 "------------------------------------------------\n" +
                 "-1\t-4\n-1\t-2\n-1\t10\n-1\t300\n" +
                 "------------------------------------------------\n" +
                 "4\t-10\n4\t-1\n4\t1\n4\t2\n4\t10\n" +
                 "------------------------------------------------\n" +
                 "5\t10\n" +
                 "------------------------------------------------\n" +
                 "10\t20\n10\t25\n10\t30\n", out);
  }

}
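
Since the class extends JUnit 3's TestCase, it can also be driven outside the Hadoop build. A minimal standalone runner sketch, assuming junit and the Hadoop core, examples, and test jars are on the classpath (the class name RunTestMapReduceLocal is made up for illustration):

import junit.textui.TestRunner;

public class RunTestMapReduceLocal {
  public static void main(String[] args) {
    // TestRunner discovers every public void test* method on the TestCase
    // subclass -- here just testWithLocal() -- runs them, and prints a
    // pass/fail summary to stdout.
    TestRunner.run(org.apache.hadoop.mapreduce.TestMapReduceLocal.class);
  }
}

Because TEST_ROOT_DIR comes from System.getProperty("test.build.data", "/tmp"), passing -Dtest.build.data=/some/scratch/dir redirects the test's input and output files away from /tmp.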