TestAggregates.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib.aggregate;
  19. import org.apache.hadoop.fs.*;
  20. import org.apache.hadoop.io.*;
  21. import org.apache.hadoop.mapred.*;
  22. import org.apache.hadoop.mapred.lib.*;
  23. import junit.framework.TestCase;
  24. import java.io.*;
  25. import java.util.*;
  26. import java.text.NumberFormat;
  27. public class TestAggregates extends TestCase {
  28.   private static NumberFormat idFormat = NumberFormat.getInstance();
  29.     static {
  30.       idFormat.setMinimumIntegerDigits(4);
  31.       idFormat.setGroupingUsed(false);
  32.   }
  33.   public void testAggregates() throws Exception {
  34.     launch();
  35.   }
  36.   public static void launch() throws Exception {
  37.     JobConf conf = new JobConf(TestAggregates.class);
  38.     FileSystem fs = FileSystem.get(conf);
  39.     int numOfInputLines = 20;
  40.     Path OUTPUT_DIR = new Path("build/test/output_for_aggregates_test");
  41.     Path INPUT_DIR = new Path("build/test/input_for_aggregates_test");
  42.     String inputFile = "input.txt";
  43.     fs.delete(INPUT_DIR, true);
  44.     fs.mkdirs(INPUT_DIR);
  45.     fs.delete(OUTPUT_DIR, true);
  46.     StringBuffer inputData = new StringBuffer();
  47.     StringBuffer expectedOutput = new StringBuffer();
  48.     expectedOutput.append("maxt19n");
  49.     expectedOutput.append("mint1n"); 
  50.     FSDataOutputStream fileOut = fs.create(new Path(INPUT_DIR, inputFile));
  51.     for (int i = 1; i < numOfInputLines; i++) {
  52.       expectedOutput.append("count_").append(idFormat.format(i));
  53.       expectedOutput.append("t").append(i).append("n");
  54.       inputData.append(idFormat.format(i));
  55.       for (int j = 1; j < i; j++) {
  56.         inputData.append(" ").append(idFormat.format(i));
  57.       }
  58.       inputData.append("n");
  59.     }
  60.     expectedOutput.append("value_as_string_maxt9n");
  61.     expectedOutput.append("value_as_string_mint1n");
  62.     expectedOutput.append("uniq_countt15n");
  63.     fileOut.write(inputData.toString().getBytes("utf-8"));
  64.     fileOut.close();
  65.     System.out.println("inputData:");
  66.     System.out.println(inputData.toString());
  67.     JobConf job = new JobConf(conf, TestAggregates.class);
  68.     FileInputFormat.setInputPaths(job, INPUT_DIR);
  69.     job.setInputFormat(TextInputFormat.class);
  70.     FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
  71.     job.setOutputFormat(TextOutputFormat.class);
  72.     job.setMapOutputKeyClass(Text.class);
  73.     job.setMapOutputValueClass(Text.class);
  74.     job.setOutputKeyClass(Text.class);
  75.     job.setOutputValueClass(Text.class);
  76.     job.setNumReduceTasks(1);
  77.     job.setMapperClass(ValueAggregatorMapper.class);
  78.     job.setReducerClass(ValueAggregatorReducer.class);
  79.     job.setCombinerClass(ValueAggregatorCombiner.class);
  80.     job.setInt("aggregator.descriptor.num", 1);
  81.     job.set("aggregator.descriptor.0", 
  82.           "UserDefined,org.apache.hadoop.mapred.lib.aggregate.AggregatorTests");
  83.     job.setLong("aggregate.max.num.unique.values", 14);
  84.     JobClient.runJob(job);
  85.     //
  86.     // Finally, we compare the reconstructed answer key with the
  87.     // original one.  Remember, we need to ignore zero-count items
  88.     // in the original key.
  89.     //
  90.     boolean success = true;
  91.     Path outPath = new Path(OUTPUT_DIR, "part-00000");
  92.     String outdata = TestMiniMRWithDFS.readOutput(outPath,job);
  93.     System.out.println("full out data:");
  94.     System.out.println(outdata.toString());
  95.     outdata = outdata.substring(0, expectedOutput.toString().length());
  96.     assertEquals(expectedOutput.toString(),outdata);
  97.     //fs.delete(OUTPUT_DIR);
  98.     fs.delete(INPUT_DIR, true);
  99.   }
  100.   /**
  101.    * Launches all the tasks in order.
  102.    */
  103.   public static void main(String[] argv) throws Exception {
  104.     launch();
  105.   }
  106. }