TestStreamingBadRecords.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.streaming;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.ClusterMapReduceTestCase;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputLogFilter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SkipBadRecords;
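/**
 * Tests the bad-record skipping feature of Hadoop Streaming: jobs whose
 * mapper and reducer ({@link BadApp}) deliberately fail on known records
 * are expected to succeed anyway, with the offending records skipped.
 */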
public class TestStreamingBadRecords extends ClusterMapReduceTestCase
{
  private static final Log LOG = 
    LogFactory.getLog(TestStreamingBadRecords.class);
  
  private static final List<String> MAPPER_BAD_RECORDS = 
    Arrays.asList("hey022","hey023","hey099");
  
  private static final List<String> REDUCER_BAD_RECORDS = 
    Arrays.asList("hey001","hey018");
  
  private static final String badMapper = 
    StreamUtil.makeJavaCommand(BadApp.class, new String[]{});
  private static final String badReducer = 
    StreamUtil.makeJavaCommand(BadApp.class, new String[]{"true"});
  private static final int INPUTSIZE=100;
  
  public TestStreamingBadRecords() throws IOException
  {
    UtilTest utilTest = new UtilTest(getClass().getName());
    utilTest.checkUserDir();
    utilTest.redirectIfAntJunit();
  }
  
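  /**
   * Writes INPUTSIZE records of the form "hey001".."hey100" to the input
   * directory, each padded with a 20 KB prefix.
   */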
  private void createInput() throws Exception {
    OutputStream os = getFileSystem().create(new Path(getInputDir(), 
        "text.txt"));
    Writer wr = new OutputStreamWriter(os);
    //increase the record size so that the stream gets flushed
    String prefix = new String(new byte[20*1024]);
    for(int i=1;i<=INPUTSIZE;i++) {
      //zero-pad the sequence number to three digits, e.g. "hey007"
      String str = ""+i;
      int zerosToPrepend = 3 - str.length();
      for(int j=0;j<zerosToPrepend;j++){
        str = "0"+str;
      }
      wr.write(prefix + "hey"+str+"\n");
    }
    wr.close();
  }
  
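  /**
   * Asserts that the job succeeded, optionally checks the skip-related
   * counters, and verifies that none of the known bad records made it to
   * the output.
   */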
  private void validateOutput(RunningJob runningJob, boolean validateCount) 
    throws Exception {
    LOG.info(runningJob.getCounters().toString());
    assertTrue(runningJob.isSuccessful());
    
    if(validateCount) {
      //validate counters
      String counterGrp = "org.apache.hadoop.mapred.Task$Counter";
      Counters counters = runningJob.getCounters();
      assertEquals(counters.findCounter(counterGrp, "MAP_SKIPPED_RECORDS").
          getCounter(),MAPPER_BAD_RECORDS.size());
      
      int mapRecs = INPUTSIZE - MAPPER_BAD_RECORDS.size();
      assertEquals(counters.findCounter(counterGrp, "MAP_INPUT_RECORDS").
          getCounter(),mapRecs);
      assertEquals(counters.findCounter(counterGrp, "MAP_OUTPUT_RECORDS").
          getCounter(),mapRecs);
      
      int redRecs = mapRecs - REDUCER_BAD_RECORDS.size();
      assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_RECORDS").
          getCounter(),REDUCER_BAD_RECORDS.size());
      assertEquals(counters.findCounter(counterGrp, "REDUCE_SKIPPED_GROUPS").
          getCounter(),REDUCER_BAD_RECORDS.size());
      assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_GROUPS").
          getCounter(),redRecs);
      assertEquals(counters.findCounter(counterGrp, "REDUCE_INPUT_RECORDS").
          getCounter(),redRecs);
      assertEquals(counters.findCounter(counterGrp, "REDUCE_OUTPUT_RECORDS").
          getCounter(),redRecs);
    }
    
    List<String> badRecs = new ArrayList<String>();
    badRecs.addAll(MAPPER_BAD_RECORDS);
    badRecs.addAll(REDUCER_BAD_RECORDS);
    Path[] outputFiles = FileUtil.stat2Paths(
        getFileSystem().listStatus(getOutputDir(),
        new OutputLogFilter()));
    
    if (outputFiles.length > 0) {
      InputStream is = getFileSystem().open(outputFiles[0]);
      BufferedReader reader = new BufferedReader(new InputStreamReader(is));
      String line = reader.readLine();
      int counter = 0;
      while (line != null) {
        counter++;
        StringTokenizer tokeniz = new StringTokenizer(line, "\t");
        String value = tokeniz.nextToken();
        int index = value.indexOf("hey");
        assertTrue(index>-1);
        if(index>-1) {
          String heyStr = value.substring(index);
          assertTrue(!badRecs.contains(heyStr));
        }
        
        line = reader.readLine();
      }
      reader.close();
      if(validateCount) {
        assertEquals(INPUTSIZE-badRecs.size(), counter);
      }
    }
  }
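
  /**
   * Runs the job with skipping enabled from the first attempt and an
   * unbounded skip range, then checks that no skip output directory was
   * created (it is explicitly set to "none").
   */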
  public void testSkip() throws Exception {
    JobConf clusterConf = createJobConf();
    createInput();
    int attSkip =0;
    SkipBadRecords.setAttemptsToStartSkipping(clusterConf,attSkip);
    //the number of attempts needed to complete the task successfully
    //depends on the number of bad records
    int mapperAttempts = attSkip+1+MAPPER_BAD_RECORDS.size();
    int reducerAttempts = attSkip+1+REDUCER_BAD_RECORDS.size();
    
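    //with the max skip counts set to Long.MAX_VALUE, the framework accepts
    //any skip range without narrowing it down, so each bad record costs at
    //most one extra task attempt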
    String[] args =  new String[] {
      "-input", (new Path(getInputDir(), "text.txt")).toString(),
      "-output", getOutputDir().toString(),
      "-mapper", badMapper,
      "-reducer", badReducer,
      "-verbose",
      "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
      "-jobconf", "mapred.skip.attempts.to.start.skipping="+attSkip,
      "-jobconf", "mapred.skip.out.dir=none",
      "-jobconf", "mapred.map.max.attempts="+mapperAttempts,
      "-jobconf", "mapred.reduce.max.attempts="+reducerAttempts,
      "-jobconf", "mapred.skip.map.max.skip.records="+Long.MAX_VALUE,
      "-jobconf", "mapred.skip.reduce.max.skip.groups="+Long.MAX_VALUE,
      "-jobconf", "mapred.map.tasks=1",
      "-jobconf", "mapred.reduce.tasks=1",
      "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
      "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
      "-jobconf", "mapred.job.tracker.http.address="
                    +clusterConf.get("mapred.job.tracker.http.address"),
      "-jobconf", "stream.debug=set",
      "-jobconf", "keep.failed.task.files=true",
      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
    };
    StreamJob job = new StreamJob(args, false);
    job.go();
    validateOutput(job.running_, false);
    //validate that there is no skip directory as it has been set to "none"
    assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)==null);
  }
  
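  /**
   * Runs the job with a skip range of one record/group, forcing the
   * framework to narrow failures down to the individual bad records, then
   * validates the counters and the skip output path.
   */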
  public void testNarrowDown() throws Exception {
    createInput();
    JobConf clusterConf = createJobConf();
    String[] args =  new String[] {
      "-input", (new Path(getInputDir(), "text.txt")).toString(),
      "-output", getOutputDir().toString(),
      "-mapper", badMapper,
      "-reducer", badReducer,
      "-verbose",
      "-inputformat", "org.apache.hadoop.mapred.KeyValueTextInputFormat",
      "-jobconf", "mapred.skip.attempts.to.start.skipping=1",
      //fewer attempts than specified are actually required, but extra
      //attempts are allowed to cover the case of slow processed-counter
      //updates
      "-jobconf", "mapred.map.max.attempts=20",
      "-jobconf", "mapred.reduce.max.attempts=15",
      "-jobconf", "mapred.skip.map.max.skip.records=1",
      "-jobconf", "mapred.skip.reduce.max.skip.groups=1",
      "-jobconf", "mapred.map.tasks=1",
      "-jobconf", "mapred.reduce.tasks=1",
      "-jobconf", "fs.default.name="+clusterConf.get("fs.default.name"),
      "-jobconf", "mapred.job.tracker="+clusterConf.get("mapred.job.tracker"),
      "-jobconf", "mapred.job.tracker.http.address="
                    +clusterConf.get("mapred.job.tracker.http.address"),
      "-jobconf", "stream.debug=set",
      "-jobconf", "keep.failed.task.files=true",
      "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
    };
    StreamJob job = new StreamJob(args, false);
    job.go();
    
    validateOutput(job.running_, true);
    assertTrue(SkipBadRecords.getSkipOutputPath(job.jobConf_)!=null);
  }
  
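  /**
   * A minimal streaming application: echoes stdin to stdout and reports the
   * processed-record counter that the skip feature reads. Runs as a reducer
   * when the first argument is "true".
   */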
  static class App{
    boolean isReducer;
    
    public App(String[] args) throws Exception{
      if(args.length>0) {
        isReducer = Boolean.parseBoolean(args[0]);
      }
      String counter = SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS;
      if(isReducer) {
        counter = SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS;
      }
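      //Streaming interprets stderr lines of the form
      //"reporter:counter:<group>,<counter>,<amount>" as counter updates;
      //one is emitted every 10 records below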
      BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
      String line;
      int count = 0;
      while ((line = in.readLine()) != null) {
        processLine(line);
        count++;
        if(count>=10) {
          System.err.println("reporter:counter:"+SkipBadRecords.COUNTER_GROUP+
              ","+counter+","+count);
          count = 0;
        }
      }
    }
    
    protected void processLine(String line) throws Exception{
      System.out.println(line);
    }
    
    public static void main(String[] args) throws Exception{
      new App(args);
    }
  }
  
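  /**
   * An App that fails deterministically on the configured bad records: it
   * exits with a non-zero status on the first and third and throws an
   * exception on the second, exercising both failure modes.
   */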
  static class BadApp extends App{
    
    public BadApp(String[] args) throws Exception {
      super(args);
    }
    
    protected void processLine(String line) throws Exception {
      List<String> badRecords = MAPPER_BAD_RECORDS;
      if(isReducer) {
        badRecords = REDUCER_BAD_RECORDS;
      }
      if(badRecords.size()>0 && line.contains(badRecords.get(0))) {
        LOG.warn("Encountered BAD record");
        System.exit(-1);
      }
      else if(badRecords.size()>1 && line.contains(badRecords.get(1))) {
        LOG.warn("Encountered BAD record");
        throw new Exception("Got bad record..crashing");
      }
      else if(badRecords.size()>2 && line.contains(badRecords.get(2))) {
        LOG.warn("Encountered BAD record");
        System.exit(-1);
      }
      super.processLine(line);
    }
    
    public static void main(String[] args) throws Exception{
      new BadApp(args);
    }
  }
}