TestUserDefinedCounters.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.BufferedReader;
  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.io.InputStreamReader;
  23. import java.io.OutputStream;
  24. import java.io.OutputStreamWriter;
  25. import java.io.Writer;
  26. import org.apache.hadoop.fs.FileUtil;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.io.LongWritable;
  29. import org.apache.hadoop.io.Text;
  30. import org.apache.hadoop.mapred.lib.IdentityMapper;
  31. import org.apache.hadoop.mapred.lib.IdentityReducer;
  32. public class TestUserDefinedCounters extends ClusterMapReduceTestCase {
  33.   
  34.   enum EnumCounter { MAP_RECORDS }
  35.   
  36.   static class CountingMapper<K, V> extends IdentityMapper<K, V> {
  37.     public void map(K key, V value,
  38.         OutputCollector<K, V> output, Reporter reporter)
  39.         throws IOException {
  40.       output.collect(key, value);
  41.       reporter.incrCounter(EnumCounter.MAP_RECORDS, 1);
  42.       reporter.incrCounter("StringCounter", "MapRecords", 1);
  43.     }
  44.   }
  45.   
  46.   public void testMapReduceJob() throws Exception {
  47.     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
  48.     Writer wr = new OutputStreamWriter(os);
  49.     wr.write("hello1n");
  50.     wr.write("hello2n");
  51.     wr.write("hello3n");
  52.     wr.write("hello4n");
  53.     wr.close();
  54.     JobConf conf = createJobConf();
  55.     conf.setJobName("counters");
  56.     
  57.     conf.setInputFormat(TextInputFormat.class);
  58.     conf.setMapOutputKeyClass(LongWritable.class);
  59.     conf.setMapOutputValueClass(Text.class);
  60.     conf.setOutputFormat(TextOutputFormat.class);
  61.     conf.setOutputKeyClass(LongWritable.class);
  62.     conf.setOutputValueClass(Text.class);
  63.     conf.setMapperClass(CountingMapper.class);
  64.     conf.setReducerClass(IdentityReducer.class);
  65.     FileInputFormat.setInputPaths(conf, getInputDir());
  66.     FileOutputFormat.setOutputPath(conf, getOutputDir());
  67.     RunningJob runningJob = JobClient.runJob(conf);
  68.     Path[] outputFiles = FileUtil.stat2Paths(
  69.                            getFileSystem().listStatus(getOutputDir(),
  70.                            new OutputLogFilter()));
  71.     if (outputFiles.length > 0) {
  72.       InputStream is = getFileSystem().open(outputFiles[0]);
  73.       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  74.       String line = reader.readLine();
  75.       int counter = 0;
  76.       while (line != null) {
  77.         counter++;
  78.         assertTrue(line.contains("hello"));
  79.         line = reader.readLine();
  80.       }
  81.       reader.close();
  82.       assertEquals(4, counter);
  83.     }
  84.     
  85.     assertEquals(4,
  86.         runningJob.getCounters().getCounter(EnumCounter.MAP_RECORDS));
  87.     assertEquals(4,
  88.         runningJob.getCounters().getGroup("StringCounter")
  89.         .getCounter("MapRecords"));
  90.   }
  91. }