TestBadRecords.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.mapred;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;
  39. public class TestBadRecords extends ClusterMapReduceTestCase {
  40.   
  41.   private static final Log LOG = 
  42.     LogFactory.getLog(TestBadRecords.class);
  43.   
  44.   private static final List<String> MAPPER_BAD_RECORDS = 
  45.     Arrays.asList("hello01","hello04","hello05");
  46.   
  47.   private static final List<String> REDUCER_BAD_RECORDS = 
  48.     Arrays.asList("hello08","hello10");
  49.   
  50.   private List<String> input;
  51.   
  52.   public TestBadRecords() {
  53.     input = new ArrayList<String>();
  54.     for(int i=1;i<=10;i++) {
  55.       String str = ""+i;
  56.       int zerosToPrepend = 2 - str.length();
  57.       for(int j=0;j<zerosToPrepend;j++){
  58.         str = "0"+str;
  59.       }
  60.       input.add("hello"+str);
  61.     }
  62.   }
  63.   
  64.   private void runMapReduce(JobConf conf, 
  65.       List<String> mapperBadRecords, List<String> redBadRecords) 
  66.         throws Exception {
  67.     createInput();
  68.     conf.setJobName("mr");
  69.     conf.setNumMapTasks(1);
  70.     conf.setNumReduceTasks(1);
  71.     conf.setInt("mapred.task.timeout", 30*1000);
  72.     SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);
  73.     SkipBadRecords.setReducerMaxSkipGroups(conf, Long.MAX_VALUE);
  74.     
  75.     SkipBadRecords.setAttemptsToStartSkipping(conf,0);
  76.     //the no of attempts to successfully complete the task depends 
  77.     //on the no of bad records.
  78.     conf.setMaxMapAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+1+
  79.         mapperBadRecords.size());
  80.     conf.setMaxReduceAttempts(SkipBadRecords.getAttemptsToStartSkipping(conf)+
  81.         1+redBadRecords.size());
  82.     
  83.     FileInputFormat.setInputPaths(conf, getInputDir());
  84.     FileOutputFormat.setOutputPath(conf, getOutputDir());
  85.     conf.setInputFormat(TextInputFormat.class);
  86.     conf.setMapOutputKeyClass(LongWritable.class);
  87.     conf.setMapOutputValueClass(Text.class);
  88.     conf.setOutputFormat(TextOutputFormat.class);
  89.     conf.setOutputKeyClass(LongWritable.class);
  90.     conf.setOutputValueClass(Text.class);
  91.     RunningJob runningJob = JobClient.runJob(conf);
  92.     validateOutput(conf, runningJob, mapperBadRecords, redBadRecords);
  93.   }
  94.   
  95.   
  96.   private void createInput() throws Exception {
  97.     OutputStream os = getFileSystem().create(new Path(getInputDir(), 
  98.         "text.txt"));
  99.     Writer wr = new OutputStreamWriter(os);
  100.     for(String inp : input) {
  101.       wr.write(inp+"n");
  102.     }wr.close();
  103.   }
  104.   
  105.   private void validateOutput(JobConf conf, RunningJob runningJob, 
  106.       List<String> mapperBadRecords, List<String> redBadRecords) 
  107.     throws Exception{
  108.     LOG.info(runningJob.getCounters().toString());
  109.     assertTrue(runningJob.isSuccessful());
  110.     
  111.     //validate counters
  112.     Counters counters = runningJob.getCounters();
  113.     assertEquals(counters.findCounter(Task.Counter.MAP_SKIPPED_RECORDS).
  114.         getCounter(),mapperBadRecords.size());
  115.     
  116.     int mapRecs = input.size() - mapperBadRecords.size();
  117.     assertEquals(counters.findCounter(Task.Counter.MAP_INPUT_RECORDS).
  118.         getCounter(),mapRecs);
  119.     assertEquals(counters.findCounter(Task.Counter.MAP_OUTPUT_RECORDS).
  120.         getCounter(),mapRecs);
  121.     
  122.     int redRecs = mapRecs - redBadRecords.size();
  123.     assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_RECORDS).
  124.         getCounter(),redBadRecords.size());
  125.     assertEquals(counters.findCounter(Task.Counter.REDUCE_SKIPPED_GROUPS).
  126.         getCounter(),redBadRecords.size());
  127.     assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_GROUPS).
  128.         getCounter(),redRecs);
  129.     assertEquals(counters.findCounter(Task.Counter.REDUCE_INPUT_RECORDS).
  130.         getCounter(),redRecs);
  131.     assertEquals(counters.findCounter(Task.Counter.REDUCE_OUTPUT_RECORDS).
  132.         getCounter(),redRecs);
  133.     
  134.     //validate skipped records
  135.     Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
  136.     Path[] skips = FileUtil.stat2Paths(getFileSystem().listStatus(skipDir));
  137.     List<String> mapSkipped = new ArrayList<String>();
  138.     List<String> redSkipped = new ArrayList<String>();
  139.     for(Path skipPath : skips) {
  140.       LOG.info("skipPath: " + skipPath);
  141.       
  142.       SequenceFile.Reader reader = new SequenceFile.Reader(
  143.           getFileSystem(), skipPath, conf);
  144.       Object key = ReflectionUtils.newInstance(reader.getKeyClass(), conf);
  145.       Object value = ReflectionUtils.newInstance(reader.getValueClass(), 
  146.           conf);
  147.       key = reader.next(key);
  148.       while(key!=null) {
  149.         value = reader.getCurrentValue(value);
  150.         LOG.debug("key:"+key+" value:"+value.toString());
  151.         if(skipPath.getName().contains("_r_")) {
  152.           redSkipped.add(value.toString());
  153.         } else {
  154.           mapSkipped.add(value.toString());
  155.         }
  156.         key = reader.next(key);
  157.       }
  158.       reader.close();
  159.     }
  160.     assertTrue(mapSkipped.containsAll(mapperBadRecords));
  161.     assertTrue(redSkipped.containsAll(redBadRecords));
  162.     
  163.     Path[] outputFiles = FileUtil.stat2Paths(
  164.         getFileSystem().listStatus(getOutputDir(),
  165.         new OutputLogFilter()));
  166.     
  167.     List<String> mapperOutput=getProcessed(input, mapperBadRecords);
  168.     LOG.debug("mapperOutput " + mapperOutput.size());
  169.     List<String> reducerOutput=getProcessed(mapperOutput, redBadRecords);
  170.     LOG.debug("reducerOutput " + reducerOutput.size());
  171.     
  172.    if (outputFiles.length > 0) {
  173.       InputStream is = getFileSystem().open(outputFiles[0]);
  174.       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  175.       String line = reader.readLine();
  176.       int counter = 0;
  177.       while (line != null) {
  178.         counter++;
  179.         StringTokenizer tokeniz = new StringTokenizer(line, "t");
  180.         String key = tokeniz.nextToken();
  181.         String value = tokeniz.nextToken();
  182.         LOG.debug("Output: key:"+key + "  value:"+value);
  183.         assertTrue(value.contains("hello"));
  184.         
  185.         
  186.         assertTrue(reducerOutput.contains(value));
  187.         line = reader.readLine();
  188.       }
  189.       reader.close();
  190.       assertEquals(reducerOutput.size(), counter);
  191.     }
  192.   }
  193.   
  194.   private List<String> getProcessed(List<String> inputs, List<String> badRecs) {
  195.     List<String> processed = new ArrayList<String>();
  196.     for(String input : inputs) {
  197.       if(!badRecs.contains(input)) {
  198.         processed.add(input);
  199.       }
  200.     }
  201.     return processed;
  202.   }
  203.   
  204.   public void testBadMapRed() throws Exception {
  205.     JobConf conf = createJobConf();
  206.     conf.setMapperClass(BadMapper.class);
  207.     conf.setReducerClass(BadReducer.class);
  208.     runMapReduce(conf, MAPPER_BAD_RECORDS, REDUCER_BAD_RECORDS);
  209.   }
  210.   
  211.     
  212.   static class BadMapper extends MapReduceBase implements 
  213.     Mapper<LongWritable, Text, LongWritable, Text> {
  214.     
  215.     public void map(LongWritable key, Text val,
  216.         OutputCollector<LongWritable, Text> output, Reporter reporter)
  217.         throws IOException {
  218.       String str = val.toString();
  219.       LOG.debug("MAP key:" +key +"  value:" + str);
  220.       if(MAPPER_BAD_RECORDS.get(0).equals(str)) {
  221.         LOG.warn("MAP Encountered BAD record");
  222.         System.exit(-1);
  223.       }
  224.       else if(MAPPER_BAD_RECORDS.get(1).equals(str)) {
  225.         LOG.warn("MAP Encountered BAD record");
  226.         throw new RuntimeException("Bad record "+str);
  227.       }
  228.       else if(MAPPER_BAD_RECORDS.get(2).equals(str)) {
  229.         try {
  230.           LOG.warn("MAP Encountered BAD record");
  231.           Thread.sleep(15*60*1000);
  232.         } catch (InterruptedException e) {
  233.           e.printStackTrace();
  234.         }
  235.       }
  236.       output.collect(key, val);
  237.     }
  238.   }
  239.   
  240.   static class BadReducer extends MapReduceBase implements 
  241.     Reducer<LongWritable, Text, LongWritable, Text> {
  242.     
  243.     public void reduce(LongWritable key, Iterator<Text> values,
  244.         OutputCollector<LongWritable, Text> output, Reporter reporter)
  245.         throws IOException {
  246.       while(values.hasNext()) {
  247.         Text value = values.next();
  248.         LOG.debug("REDUCE key:" +key +"  value:" + value);
  249.         if(REDUCER_BAD_RECORDS.get(0).equals(value.toString())) {
  250.           LOG.warn("REDUCE Encountered BAD record");
  251.           System.exit(-1);
  252.         }
  253.         else if(REDUCER_BAD_RECORDS.get(1).equals(value.toString())) {
  254.           try {
  255.             LOG.warn("REDUCE Encountered BAD record");
  256.             Thread.sleep(15*60*1000);
  257.           } catch (InterruptedException e) {
  258.             e.printStackTrace();
  259.           }
  260.         }
  261.         output.collect(key, value);
  262.       }
  263.       
  264.     }
  265.   }
  266.   
  267. }