TestSpilledRecordsCounter.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.FileWriter;
  21. import java.io.Writer;
  22. import java.io.BufferedWriter;
  23. import java.io.IOException;
  24. import junit.framework.TestCase;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.io.IntWritable;
  29. import org.apache.hadoop.io.Text;
  30. /**
  31.  * This is an wordcount application that tests the count of records
  32.  * got spilled to disk. It generates simple text input files. Then
  33.  * runs the wordcount map/reduce application on (1) 3 i/p files(with 3 maps
  34.  * and 1 reduce) and verifies the counters and (2) 4 i/p files(with 4 maps
  35.  * and 1 reduce) and verifies counters. Wordcount application reads the
  36.  * text input files, breaks each line into words and counts them. The output
  37.  * is a locally sorted list of words and the count of how often they occurred.
  38.  *
  39.  */
  40. public class TestSpilledRecordsCounter extends TestCase {
  41.   private void validateCounters(Counters counter, long spillRecCnt) {
  42.       // Check if the numer of Spilled Records is same as expected
  43.       assertEquals(counter.findCounter(Task.Counter.SPILLED_RECORDS).
  44.                      getCounter(), spillRecCnt);
  45.   }
  46.   private void createWordsFile(File inpFile) throws Exception {
  47.     Writer out = new BufferedWriter(new FileWriter(inpFile));
  48.     try {
  49.       // 500*4 unique words --- repeated 5 times => 5*2K words
  50.       int REPLICAS=5, NUMLINES=500, NUMWORDSPERLINE=4;
  51.       for (int i = 0; i < REPLICAS; i++) {
  52.         for (int j = 1; j <= NUMLINES*NUMWORDSPERLINE; j+=NUMWORDSPERLINE) {
  53.           out.write("word" + j + " word" + (j+1) + " word" + (j+2) + " word" + (j+3) + 'n');
  54.         }
  55.       }
  56.     } finally {
  57.       out.close();
  58.     }
  59.   }
  60.   /**
  61.    * The main driver for word count map/reduce program.
  62.    * Invoke this method to submit the map/reduce job.
  63.    * @throws IOException When there is communication problems with the
  64.    *                     job tracker.
  65.    */
  66.   public void testSpillCounter() throws Exception {
  67.     JobConf conf = new JobConf(TestSpilledRecordsCounter.class);
  68.     conf.setJobName("wordcountSpilledRecordsCounter");
  69.     // the keys are words (strings)
  70.     conf.setOutputKeyClass(Text.class);
  71.     // the values are counts (ints)
  72.     conf.setOutputValueClass(IntWritable.class);
  73.     conf.setMapperClass(WordCount.MapClass.class);
  74.     conf.setCombinerClass(WordCount.Reduce.class);
  75.     conf.setReducerClass(WordCount.Reduce.class);
  76.     conf.setNumMapTasks(3);
  77.     conf.setNumReduceTasks(1);
  78.     conf.setInt("io.sort.mb", 1);
  79.     conf.setInt("io.sort.factor", 2);
  80.     conf.set("io.sort.record.percent", "0.05");
  81.     conf.set("io.sort.spill.percent", "0.80");
  82.     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
  83.                                       File.separator + "tmp"))
  84.                                .toString().replace(' ', '+');
  85.     conf.set("test.build.data", TEST_ROOT_DIR);
  86.     String IN_DIR = TEST_ROOT_DIR + File.separator +
  87.                       "spilledRecords.countertest" +  File.separator +
  88.                       "genins" + File.separator;
  89.     String OUT_DIR = TEST_ROOT_DIR + File.separator +
  90.                       "spilledRecords.countertest" + File.separator;
  91.     FileSystem fs = FileSystem.get(conf);
  92.     Path testdir = new Path(TEST_ROOT_DIR, "spilledRecords.countertest");
  93.     try {
  94.       if (fs.exists(testdir)) {
  95.         fs.delete(testdir, true);
  96.       }
  97.       if (!fs.mkdirs(testdir)) {
  98.         throw new IOException("Mkdirs failed to create " + testdir.toString());
  99.       }
  100.       Path wordsIns = new Path(testdir, "genins");
  101.       if (!fs.mkdirs(wordsIns)) {
  102.         throw new IOException("Mkdirs failed to create " + wordsIns.toString());
  103.       }
  104.       //create 3 input files each with 5*2k words
  105.       File inpFile = new File(IN_DIR + "input5_2k_1");
  106.       createWordsFile(inpFile);
  107.       inpFile = new File(IN_DIR + "input5_2k_2");
  108.       createWordsFile(inpFile);
  109.       inpFile = new File(IN_DIR + "input5_2k_3");
  110.       createWordsFile(inpFile);
  111.       FileInputFormat.setInputPaths(conf, IN_DIR);
  112.       Path outputPath1=new Path(OUT_DIR, "output5_2k_3");
  113.       FileOutputFormat.setOutputPath(conf, outputPath1);
  114.       RunningJob myJob = JobClient.runJob(conf);
  115.       Counters c1 = myJob.getCounters();
  116.       // 3maps & in each map, 4 first level spills --- So total 12.
  117.       // spilled records count:
  118.       // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
  119.       //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
  120.       //           So total 8k+8k+2k=18k
  121.       // For 3 Maps, total = 3*18=54k
  122.       // Reduce: each of the 3 map o/p's(2k each) will be spilled in shuffleToDisk()
  123.       //         So 3*2k=6k in 1st level; 2nd level:4k(2k+2k);
  124.       //         3rd level directly given to reduce(4k+2k --- combineAndSpill => 2k.
  125.       //         So 0 records spilled to disk in 3rd level)
  126.       //         So total of 6k+4k=10k
  127.       // Total job counter will be 54k+10k = 64k
  128.       validateCounters(c1, 64000);
  129.       //create 4th input file each with 5*2k words and test with 4 maps
  130.       inpFile = new File(IN_DIR + "input5_2k_4");
  131.       createWordsFile(inpFile);
  132.       conf.setNumMapTasks(4);
  133.       Path outputPath2=new Path(OUT_DIR, "output5_2k_4");
  134.       FileOutputFormat.setOutputPath(conf, outputPath2);
  135.       myJob = JobClient.runJob(conf);
  136.       c1 = myJob.getCounters();
  137.       // 4maps & in each map 4 first level spills --- So total 16.
  138.       // spilled records count:
  139.       // Each Map: 1st level:2k+2k+2k+2k=8k;2ndlevel=4k+4k=8k;
  140.       //           3rd level=2k(4k from 1st level & 4k from 2nd level & combineAndSpill)
  141.       //           So total 8k+8k+2k=18k
  142.       // For 3 Maps, total = 4*18=72k
  143.       // Reduce: each of the 4 map o/p's(2k each) will be spilled in shuffleToDisk()
  144.       //         So 4*2k=8k in 1st level; 2nd level:4k+4k=8k;
  145.       //         3rd level directly given to reduce(4k+4k --- combineAndSpill => 2k.
  146.       //         So 0 records spilled to disk in 3rd level)
  147.       //         So total of 8k+8k=16k
  148.       // Total job counter will be 72k+16k = 88k
  149.       validateCounters(c1, 88000);
  150.     } finally {
  151.       //clean up the input and output files
  152.       if (fs.exists(testdir)) {
  153.         fs.delete(testdir, true);
  154.       }
  155.     }
  156.   }
  157. }