SkipBadRecords.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.conf.Configuration;
  20. import org.apache.hadoop.fs.Path;
  21. /**
  22.  * Utility class for skip bad records functionality. It contains various 
  23.  * settings related to skipping of bad records.
  24.  * 
  25.  * <p>Hadoop provides an optional mode of execution in which the bad records
  26.  * are detected and skipped in further attempts.
  27.  * 
  28.  * <p>This feature can be used when map/reduce tasks crashes deterministically on 
  29.  * certain input. This happens due to bugs in the map/reduce function. The usual
  30.  * course would be to fix these bugs. But sometimes this is not possible; 
  31.  * perhaps the bug is in third party libraries for which the source code is 
  32.  * not available. Due to this, the task never reaches to completion even with 
  33.  * multiple attempts and complete data for that task is lost.</p>
  34.  *  
  35.  * <p>With this feature, only a small portion of data is lost surrounding 
  36.  * the bad record, which may be acceptable for some user applications.
  37.  * see {@link SkipBadRecords#setMapperMaxSkipRecords(Configuration, long)}</p>
  38.  * 
  39.  * <p>The skipping mode gets kicked off after certain no of failures 
  40.  * see {@link SkipBadRecords#setAttemptsToStartSkipping(Configuration, int)}</p>
  41.  *  
  42.  * <p>In the skipping mode, the map/reduce task maintains the record range which 
  43.  * is getting processed at all times. Before giving the input to the
  44.  * map/reduce function, it sends this record range to the Task tracker.
  45.  * If task crashes, the Task tracker knows which one was the last reported
  46.  * range. On further attempts that range get skipped.</p>
  47.  */
  48. public class SkipBadRecords {
  49.   
  50.   /**
  51.    * Special counters which are written by the application and are 
  52.    * used by the framework for detecting bad records. For detecting bad records 
  53.    * these counters must be incremented by the application.
  54.    */
  55.   public static final String COUNTER_GROUP = "SkippingTaskCounters";
  56.   
  57.   /**
  58.    * Number of processed map records.
  59.    * @see SkipBadRecords#getAutoIncrMapperProcCount(Configuration)
  60.    */
  61.   public static final String COUNTER_MAP_PROCESSED_RECORDS = 
  62.     "MapProcessedRecords";
  63.   
  64.   /**
  65.    * Number of processed reduce groups.
  66.    * @see SkipBadRecords#getAutoIncrReducerProcCount(Configuration)
  67.    */
  68.   public static final String COUNTER_REDUCE_PROCESSED_GROUPS = 
  69.     "ReduceProcessedGroups";
  70.   
  71.   private static final String ATTEMPTS_TO_START_SKIPPING = 
  72.     "mapred.skip.attempts.to.start.skipping";
  73.   private static final String AUTO_INCR_MAP_PROC_COUNT = 
  74.     "mapred.skip.map.auto.incr.proc.count";
  75.   private static final String AUTO_INCR_REDUCE_PROC_COUNT = 
  76.     "mapred.skip.reduce.auto.incr.proc.count";
  77.   private static final String OUT_PATH = "mapred.skip.out.dir";
  78.   private static final String MAPPER_MAX_SKIP_RECORDS = 
  79.     "mapred.skip.map.max.skip.records";
  80.   private static final String REDUCER_MAX_SKIP_GROUPS = 
  81.     "mapred.skip.reduce.max.skip.groups";
  82.   
  83.   /**
  84.    * Get the number of Task attempts AFTER which skip mode 
  85.    * will be kicked off. When skip mode is kicked off, the 
  86.    * tasks reports the range of records which it will process 
  87.    * next to the TaskTracker. So that on failures, TT knows which 
  88.    * ones are possibly the bad records. On further executions, 
  89.    * those are skipped.
  90.    * Default value is 2.
  91.    * 
  92.    * @param conf the configuration
  93.    * @return attemptsToStartSkipping no of task attempts
  94.    */
  95.   public static int getAttemptsToStartSkipping(Configuration conf) {
  96.     return conf.getInt(ATTEMPTS_TO_START_SKIPPING, 2);
  97.   }
  98.   /**
  99.    * Set the number of Task attempts AFTER which skip mode 
  100.    * will be kicked off. When skip mode is kicked off, the 
  101.    * tasks reports the range of records which it will process 
  102.    * next to the TaskTracker. So that on failures, TT knows which 
  103.    * ones are possibly the bad records. On further executions, 
  104.    * those are skipped.
  105.    * Default value is 2.
  106.    * 
  107.    * @param conf the configuration
  108.    * @param attemptsToStartSkipping no of task attempts
  109.    */
  110.   public static void setAttemptsToStartSkipping(Configuration conf, 
  111.       int attemptsToStartSkipping) {
  112.     conf.setInt(ATTEMPTS_TO_START_SKIPPING, attemptsToStartSkipping);
  113.   }
  114.   /**
  115.    * Get the flag which if set to true, 
  116.    * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented 
  117.    * by MapRunner after invoking the map function. This value must be set to 
  118.    * false for applications which process the records asynchronously 
  119.    * or buffer the input records. For example streaming. 
  120.    * In such cases applications should increment this counter on their own.
  121.    * Default value is true.
  122.    * 
  123.    * @param conf the configuration
  124.    * @return <code>true</code> if auto increment 
  125.    *                       {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
  126.    *         <code>false</code> otherwise.
  127.    */
  128.   public static boolean getAutoIncrMapperProcCount(Configuration conf) {
  129.     return conf.getBoolean(AUTO_INCR_MAP_PROC_COUNT, true);
  130.   }
  131.   
  132.   /**
  133.    * Set the flag which if set to true, 
  134.    * {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS} is incremented 
  135.    * by MapRunner after invoking the map function. This value must be set to 
  136.    * false for applications which process the records asynchronously 
  137.    * or buffer the input records. For example streaming. 
  138.    * In such cases applications should increment this counter on their own.
  139.    * Default value is true.
  140.    * 
  141.    * @param conf the configuration
  142.    * @param autoIncr whether to auto increment 
  143.    *        {@link SkipBadRecords#COUNTER_MAP_PROCESSED_RECORDS}.
  144.    */
  145.   public static void setAutoIncrMapperProcCount(Configuration conf, 
  146.       boolean autoIncr) {
  147.     conf.setBoolean(AUTO_INCR_MAP_PROC_COUNT, autoIncr);
  148.   }
  149.   
  150.   /**
  151.    * Get the flag which if set to true, 
  152.    * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented 
  153.    * by framework after invoking the reduce function. This value must be set to 
  154.    * false for applications which process the records asynchronously 
  155.    * or buffer the input records. For example streaming. 
  156.    * In such cases applications should increment this counter on their own.
  157.    * Default value is true.
  158.    * 
  159.    * @param conf the configuration
  160.    * @return <code>true</code> if auto increment 
  161.    *                    {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
  162.    *         <code>false</code> otherwise.
  163.    */
  164.   public static boolean getAutoIncrReducerProcCount(Configuration conf) {
  165.     return conf.getBoolean(AUTO_INCR_REDUCE_PROC_COUNT, true);
  166.   }
  167.   
  168.   /**
  169.    * Set the flag which if set to true, 
  170.    * {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS} is incremented 
  171.    * by framework after invoking the reduce function. This value must be set to 
  172.    * false for applications which process the records asynchronously 
  173.    * or buffer the input records. For example streaming. 
  174.    * In such cases applications should increment this counter on their own.
  175.    * Default value is true.
  176.    * 
  177.    * @param conf the configuration
  178.    * @param autoIncr whether to auto increment 
  179.    *        {@link SkipBadRecords#COUNTER_REDUCE_PROCESSED_GROUPS}.
  180.    */
  181.   public static void setAutoIncrReducerProcCount(Configuration conf, 
  182.       boolean autoIncr) {
  183.     conf.setBoolean(AUTO_INCR_REDUCE_PROC_COUNT, autoIncr);
  184.   }
  185.   
  186.   /**
  187.    * Get the directory to which skipped records are written. By default it is 
  188.    * the sub directory of the output _logs directory.
  189.    * User can stop writing skipped records by setting the value null.
  190.    * 
  191.    * @param conf the configuration.
  192.    * @return path skip output directory. Null is returned if this is not set 
  193.    * and output directory is also not set.
  194.    */
  195.   public static Path getSkipOutputPath(Configuration conf) {
  196.     String name =  conf.get(OUT_PATH);
  197.     if(name!=null) {
  198.       if("none".equals(name)) {
  199.         return null;
  200.       }
  201.       return new Path(name);
  202.     }
  203.     Path outPath = FileOutputFormat.getOutputPath(new JobConf(conf));
  204.     return outPath==null ? null : new Path(outPath, 
  205.         "_logs"+Path.SEPARATOR+"skip");
  206.   }
  207.   
  208.   /**
  209.    * Set the directory to which skipped records are written. By default it is 
  210.    * the sub directory of the output _logs directory.
  211.    * User can stop writing skipped records by setting the value null.
  212.    * 
  213.    * @param conf the configuration.
  214.    * @param path skip output directory path
  215.    */
  216.   public static void setSkipOutputPath(JobConf conf, Path path) {
  217.     String pathStr = null;
  218.     if(path==null) {
  219.       pathStr = "none";
  220.     } else {
  221.       pathStr = path.toString();
  222.     }
  223.     conf.set(OUT_PATH, pathStr);
  224.   }
  225.   
  226.   /**
  227.    * Get the number of acceptable skip records surrounding the bad record PER 
  228.    * bad record in mapper. The number includes the bad record as well.
  229.    * To turn the feature of detection/skipping of bad records off, set the 
  230.    * value to 0.
  231.    * The framework tries to narrow down the skipped range by retrying  
  232.    * until this threshold is met OR all attempts get exhausted for this task. 
  233.    * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
  234.    * narrow down. Whatever records(depends on application) get skipped are 
  235.    * acceptable.
  236.    * Default value is 0.
  237.    * 
  238.    * @param conf the configuration
  239.    * @return maxSkipRecs acceptable skip records.
  240.    */
  241.   public static long getMapperMaxSkipRecords(Configuration conf) {
  242.     return conf.getLong(MAPPER_MAX_SKIP_RECORDS, 0);
  243.   }
  244.   
  245.   /**
  246.    * Set the number of acceptable skip records surrounding the bad record PER 
  247.    * bad record in mapper. The number includes the bad record as well.
  248.    * To turn the feature of detection/skipping of bad records off, set the 
  249.    * value to 0.
  250.    * The framework tries to narrow down the skipped range by retrying  
  251.    * until this threshold is met OR all attempts get exhausted for this task. 
  252.    * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
  253.    * narrow down. Whatever records(depends on application) get skipped are 
  254.    * acceptable.
  255.    * Default value is 0.
  256.    * 
  257.    * @param conf the configuration
  258.    * @param maxSkipRecs acceptable skip records.
  259.    */
  260.   public static void setMapperMaxSkipRecords(Configuration conf, 
  261.       long maxSkipRecs) {
  262.     conf.setLong(MAPPER_MAX_SKIP_RECORDS, maxSkipRecs);
  263.   }
  264.   
  265.   /**
  266.    * Get the number of acceptable skip groups surrounding the bad group PER 
  267.    * bad group in reducer. The number includes the bad group as well.
  268.    * To turn the feature of detection/skipping of bad groups off, set the 
  269.    * value to 0.
  270.    * The framework tries to narrow down the skipped range by retrying  
  271.    * until this threshold is met OR all attempts get exhausted for this task. 
  272.    * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
  273.    * narrow down. Whatever groups(depends on application) get skipped are 
  274.    * acceptable.
  275.    * Default value is 0.
  276.    * 
  277.    * @param conf the configuration
  278.    * @return maxSkipGrps acceptable skip groups.
  279.    */
  280.   public static long getReducerMaxSkipGroups(Configuration conf) {
  281.     return conf.getLong(REDUCER_MAX_SKIP_GROUPS, 0);
  282.   }
  283.   
  284.   /**
  285.    * Set the number of acceptable skip groups surrounding the bad group PER 
  286.    * bad group in reducer. The number includes the bad group as well.
  287.    * To turn the feature of detection/skipping of bad groups off, set the 
  288.    * value to 0.
  289.    * The framework tries to narrow down the skipped range by retrying  
  290.    * until this threshold is met OR all attempts get exhausted for this task. 
  291.    * Set the value to Long.MAX_VALUE to indicate that framework need not try to 
  292.    * narrow down. Whatever groups(depends on application) get skipped are 
  293.    * acceptable.
  294.    * Default value is 0.
  295.    * 
  296.    * @param conf the configuration
  297.    * @param maxSkipGrps acceptable skip groups.
  298.    */
  299.   public static void setReducerMaxSkipGroups(Configuration conf, 
  300.       long maxSkipGrps) {
  301.     conf.setLong(REDUCER_MAX_SKIP_GROUPS, maxSkipGrps);
  302.   }
  303. }