SequenceFileInputFilter.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18.  
  19. package org.apache.hadoop.mapred;
  20. import java.io.IOException;
  21. import java.nio.ByteBuffer;
  22. import java.security.DigestException;
  23. import java.security.MessageDigest;
  24. import java.security.NoSuchAlgorithmException;
  25. import java.util.regex.Pattern;
  26. import java.util.regex.PatternSyntaxException;
  27. import org.apache.hadoop.conf.Configurable;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.io.BytesWritable;
  30. import org.apache.hadoop.io.Text;
  31. import org.apache.hadoop.util.ReflectionUtils;
  32. /**
  33.  * A class that allows a map/red job to work on a sample of sequence files.
  34.  * The sample is decided by the filter class set by the job.
  35.  * 
  36.  */
  37. public class SequenceFileInputFilter<K, V>
  38.   extends SequenceFileInputFormat<K, V> {
  39.   
  40.   final private static String FILTER_CLASS = "sequencefile.filter.class";
  41.   final private static String FILTER_FREQUENCY
  42.     = "sequencefile.filter.frequency";
  43.   final private static String FILTER_REGEX = "sequencefile.filter.regex";
  44.     
  45.   public SequenceFileInputFilter() {
  46.   }
  47.     
  48.   /** Create a record reader for the given split
  49.    * @param split file split
  50.    * @param job job configuration
  51.    * @param reporter reporter who sends report to task tracker
  52.    * @return RecordReader
  53.    */
  54.   public RecordReader<K, V> getRecordReader(InputSplit split,
  55.                                       JobConf job, Reporter reporter)
  56.     throws IOException {
  57.         
  58.     reporter.setStatus(split.toString());
  59.         
  60.     return new FilterRecordReader<K, V>(job, (FileSplit) split);
  61.   }
  62.   /** set the filter class
  63.    * 
  64.    * @param conf application configuration
  65.    * @param filterClass filter class
  66.    */
  67.   public static void setFilterClass(Configuration conf, Class filterClass) {
  68.     conf.set(FILTER_CLASS, filterClass.getName());
  69.   }
  70.          
  71.   /**
  72.    * filter interface
  73.    */
  74.   public interface Filter extends Configurable {
  75.     /** filter function
  76.      * Decide if a record should be filtered or not
  77.      * @param key record key
  78.      * @return true if a record is accepted; return false otherwise
  79.      */
  80.     public abstract boolean accept(Object key);
  81.   }
  82.     
  83.   /**
  84.    * base class for Filters
  85.    */
  86.   public static abstract class FilterBase implements Filter {
  87.     Configuration conf;
  88.         
  89.     public Configuration getConf() {
  90.       return conf;
  91.     }
  92.   }
  93.     
  94.   /** Records filter by matching key to regex
  95.    */
  96.   public static class RegexFilter extends FilterBase {
  97.     private Pattern p;
  98.     /** Define the filtering regex and stores it in conf
  99.      * @param conf where the regex is set
  100.      * @param regex regex used as a filter
  101.      */
  102.     public static void setPattern(Configuration conf, String regex)
  103.       throws PatternSyntaxException {
  104.       try {
  105.         Pattern.compile(regex);
  106.       } catch (PatternSyntaxException e) {
  107.         throw new IllegalArgumentException("Invalid pattern: "+regex);
  108.       }
  109.       conf.set(FILTER_REGEX, regex);
  110.     }
  111.         
  112.     public RegexFilter() { }
  113.         
  114.     /** configure the Filter by checking the configuration
  115.      */
  116.     public void setConf(Configuration conf) {
  117.       String regex = conf.get(FILTER_REGEX);
  118.       if (regex==null)
  119.         throw new RuntimeException(FILTER_REGEX + "not set");
  120.       this.p = Pattern.compile(regex);
  121.       this.conf = conf;
  122.     }
  123.     /** Filtering method
  124.      * If key matches the regex, return true; otherwise return false
  125.      * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
  126.      */
  127.     public boolean accept(Object key) {
  128.       return p.matcher(key.toString()).matches();
  129.     }
  130.   }
  131.   /** This class returns a percentage of records
  132.    * The percentage is determined by a filtering frequency <i>f</i> using
  133.    * the criteria record# % f == 0.
  134.    * For example, if the frequency is 10, one out of 10 records is returned.
  135.    */
  136.   public static class PercentFilter extends FilterBase {
  137.     private int frequency;
  138.     private int count;
  139.     /** set the frequency and stores it in conf
  140.      * @param conf configuration
  141.      * @param frequency filtering frequencey
  142.      */
  143.     public static void setFrequency(Configuration conf, int frequency){
  144.       if (frequency<=0)
  145.         throw new IllegalArgumentException(
  146.                                            "Negative " + FILTER_FREQUENCY + ": "+frequency);
  147.       conf.setInt(FILTER_FREQUENCY, frequency);
  148.     }
  149.         
  150.     public PercentFilter() { }
  151.         
  152.     /** configure the filter by checking the configuration
  153.      * 
  154.      * @param conf configuration
  155.      */
  156.     public void setConf(Configuration conf) {
  157.       this.frequency = conf.getInt("sequencefile.filter.frequency", 10);
  158.       if (this.frequency <=0) {
  159.         throw new RuntimeException(
  160.                                    "Negative "+FILTER_FREQUENCY+": "+this.frequency);
  161.       }
  162.       this.conf = conf;
  163.     }
  164.     /** Filtering method
  165.      * If record# % frequency==0, return true; otherwise return false
  166.      * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
  167.      */
  168.     public boolean accept(Object key) {
  169.       boolean accepted = false;
  170.       if (count == 0)
  171.         accepted = true;
  172.       if (++count == frequency) {
  173.         count = 0;
  174.       }
  175.       return accepted;
  176.     }
  177.   }
  178.   /** This class returns a set of records by examing the MD5 digest of its
  179.    * key against a filtering frequency <i>f</i>. The filtering criteria is
  180.    * MD5(key) % f == 0.
  181.    */
  182.   public static class MD5Filter extends FilterBase {
  183.     private int frequency;
  184.     private static final MessageDigest DIGESTER;
  185.     public static final int MD5_LEN = 16;
  186.     private byte [] digest = new byte[MD5_LEN];
  187.         
  188.     static {
  189.       try {
  190.         DIGESTER = MessageDigest.getInstance("MD5");
  191.       } catch (NoSuchAlgorithmException e) {
  192.         throw new RuntimeException(e);
  193.       }
  194.     }
  195.     /** set the filtering frequency in configuration
  196.      * 
  197.      * @param conf configuration
  198.      * @param frequency filtering frequency
  199.      */
  200.     public static void setFrequency(Configuration conf, int frequency){
  201.       if (frequency<=0)
  202.         throw new IllegalArgumentException(
  203.                                            "Negative " + FILTER_FREQUENCY + ": "+frequency);
  204.       conf.setInt(FILTER_FREQUENCY, frequency);
  205.     }
  206.         
  207.     public MD5Filter() { }
  208.         
  209.     /** configure the filter according to configuration
  210.      * 
  211.      * @param conf configuration
  212.      */
  213.     public void setConf(Configuration conf) {
  214.       this.frequency = conf.getInt(FILTER_FREQUENCY, 10);
  215.       if (this.frequency <=0) {
  216.         throw new RuntimeException(
  217.                                    "Negative "+FILTER_FREQUENCY+": "+this.frequency);
  218.       }
  219.       this.conf = conf;
  220.     }
  221.     /** Filtering method
  222.      * If MD5(key) % frequency==0, return true; otherwise return false
  223.      * @see org.apache.hadoop.mapred.SequenceFileInputFilter.Filter#accept(Object)
  224.      */
  225.     public boolean accept(Object key) {
  226.       try {
  227.         long hashcode;
  228.         if (key instanceof Text) {
  229.           hashcode = MD5Hashcode((Text)key);
  230.         } else if (key instanceof BytesWritable) {
  231.           hashcode = MD5Hashcode((BytesWritable)key);
  232.         } else {
  233.           ByteBuffer bb;
  234.           bb = Text.encode(key.toString());
  235.           hashcode = MD5Hashcode(bb.array(), 0, bb.limit());
  236.         }
  237.         if (hashcode/frequency*frequency==hashcode)
  238.           return true;
  239.       } catch(Exception e) {
  240.         LOG.warn(e);
  241.         throw new RuntimeException(e);
  242.       }
  243.       return false;
  244.     }
  245.         
  246.     private long MD5Hashcode(Text key) throws DigestException {
  247.       return MD5Hashcode(key.getBytes(), 0, key.getLength());
  248.     }
  249.         
  250.     private long MD5Hashcode(BytesWritable key) throws DigestException {
  251.       return MD5Hashcode(key.getBytes(), 0, key.getLength());
  252.     }
  253.     synchronized private long MD5Hashcode(byte[] bytes, 
  254.                                           int start, int length) throws DigestException {
  255.       DIGESTER.update(bytes, 0, length);
  256.       DIGESTER.digest(digest, 0, MD5_LEN);
  257.       long hashcode=0;
  258.       for (int i = 0; i < 8; i++)
  259.         hashcode |= ((digest[i] & 0xffL) << (8*(7-i)));
  260.       return hashcode;
  261.     }
  262.   }
  263.     
  264.   private static class FilterRecordReader<K, V>
  265.     extends SequenceFileRecordReader<K, V> {
  266.     
  267.     private Filter filter;
  268.         
  269.     public FilterRecordReader(Configuration conf, FileSplit split)
  270.       throws IOException {
  271.       super(conf, split);
  272.       // instantiate filter
  273.       filter = (Filter)ReflectionUtils.newInstance(
  274.                                                    conf.getClass(FILTER_CLASS, PercentFilter.class), 
  275.                                                    conf);
  276.     }
  277.         
  278.     public synchronized boolean next(K key, V value) throws IOException {
  279.       while (next(key)) {
  280.         if (filter.accept(key)) {
  281.           getCurrentValue(value);
  282.           return true;
  283.         }
  284.       }
  285.             
  286.       return false;
  287.     }
  288.   }
  289. }