BloomMapFile.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io;
  19. import java.io.DataInputStream;
  20. import java.io.DataOutputStream;
  21. import java.io.IOException;
  22. import org.apache.commons.logging.Log;
  23. import org.apache.commons.logging.LogFactory;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.io.SequenceFile.CompressionType;
  28. import org.apache.hadoop.io.compress.CompressionCodec;
  29. import org.apache.hadoop.util.Progressable;
  30. import org.apache.hadoop.util.bloom.DynamicBloomFilter;
  31. import org.apache.hadoop.util.bloom.Filter;
  32. import org.apache.hadoop.util.bloom.Key;
  33. import org.apache.hadoop.util.hash.Hash;
  34. /**
  35.  * This class extends {@link MapFile} and provides very much the same
  36.  * functionality. However, it uses dynamic Bloom filters to provide
  37.  * quick membership test for keys, and it offers a fast version of 
  38.  * {@link Reader#get(WritableComparable, Writable)} operation, especially in
  39.  * case of sparsely populated MapFile-s.
  40.  */
  41. public class BloomMapFile {
  42.   private static final Log LOG = LogFactory.getLog(BloomMapFile.class);
  43.   public static final String BLOOM_FILE_NAME = "bloom";
  44.   public static final int HASH_COUNT = 5;
  45.   
  46.   public static void delete(FileSystem fs, String name) throws IOException {
  47.     Path dir = new Path(name);
  48.     Path data = new Path(dir, MapFile.DATA_FILE_NAME);
  49.     Path index = new Path(dir, MapFile.INDEX_FILE_NAME);
  50.     Path bloom = new Path(dir, BLOOM_FILE_NAME);
  51.     fs.delete(data, true);
  52.     fs.delete(index, true);
  53.     fs.delete(bloom, true);
  54.     fs.delete(dir, true);
  55.   }
  56.   
  57.   public static class Writer extends MapFile.Writer {
  58.     private DynamicBloomFilter bloomFilter;
  59.     private int numKeys;
  60.     private int vectorSize;
  61.     private Key bloomKey = new Key();
  62.     private DataOutputBuffer buf = new DataOutputBuffer();
  63.     private FileSystem fs;
  64.     private Path dir;
  65.     
  66.     public Writer(Configuration conf, FileSystem fs, String dirName,
  67.         Class<? extends WritableComparable> keyClass,
  68.         Class<? extends Writable> valClass, CompressionType compress,
  69.         CompressionCodec codec, Progressable progress) throws IOException {
  70.       super(conf, fs, dirName, keyClass, valClass, compress, codec, progress);
  71.       this.fs = fs;
  72.       this.dir = new Path(dirName);
  73.       initBloomFilter(conf);
  74.     }
  75.     public Writer(Configuration conf, FileSystem fs, String dirName,
  76.         Class<? extends WritableComparable> keyClass,
  77.         Class valClass, CompressionType compress,
  78.         Progressable progress) throws IOException {
  79.       super(conf, fs, dirName, keyClass, valClass, compress, progress);
  80.       this.fs = fs;
  81.       this.dir = new Path(dirName);
  82.       initBloomFilter(conf);
  83.     }
  84.     public Writer(Configuration conf, FileSystem fs, String dirName,
  85.         Class<? extends WritableComparable> keyClass,
  86.         Class valClass, CompressionType compress)
  87.         throws IOException {
  88.       super(conf, fs, dirName, keyClass, valClass, compress);
  89.       this.fs = fs;
  90.       this.dir = new Path(dirName);
  91.       initBloomFilter(conf);
  92.     }
  93.     public Writer(Configuration conf, FileSystem fs, String dirName,
  94.         WritableComparator comparator, Class valClass,
  95.         CompressionType compress, CompressionCodec codec, Progressable progress)
  96.         throws IOException {
  97.       super(conf, fs, dirName, comparator, valClass, compress, codec, progress);
  98.       this.fs = fs;
  99.       this.dir = new Path(dirName);
  100.       initBloomFilter(conf);
  101.     }
  102.     public Writer(Configuration conf, FileSystem fs, String dirName,
  103.         WritableComparator comparator, Class valClass,
  104.         CompressionType compress, Progressable progress) throws IOException {
  105.       super(conf, fs, dirName, comparator, valClass, compress, progress);
  106.       this.fs = fs;
  107.       this.dir = new Path(dirName);
  108.       initBloomFilter(conf);
  109.     }
  110.     public Writer(Configuration conf, FileSystem fs, String dirName,
  111.         WritableComparator comparator, Class valClass, CompressionType compress)
  112.         throws IOException {
  113.       super(conf, fs, dirName, comparator, valClass, compress);
  114.       this.fs = fs;
  115.       this.dir = new Path(dirName);
  116.       initBloomFilter(conf);
  117.     }
  118.     public Writer(Configuration conf, FileSystem fs, String dirName,
  119.         WritableComparator comparator, Class valClass) throws IOException {
  120.       super(conf, fs, dirName, comparator, valClass);
  121.       this.fs = fs;
  122.       this.dir = new Path(dirName);
  123.       initBloomFilter(conf);
  124.     }
  125.     public Writer(Configuration conf, FileSystem fs, String dirName,
  126.         Class<? extends WritableComparable> keyClass,
  127.         Class valClass) throws IOException {
  128.       super(conf, fs, dirName, keyClass, valClass);
  129.       this.fs = fs;
  130.       this.dir = new Path(dirName);
  131.       initBloomFilter(conf);
  132.     }
  133.     private synchronized void initBloomFilter(Configuration conf) {
  134.       numKeys = conf.getInt("io.mapfile.bloom.size", 1024 * 1024);
  135.       // vector size should be <code>-kn / (ln(1 - c^(1/k)))</code> bits for
  136.       // single key, where <code> is the number of hash functions,
  137.       // <code>n</code> is the number of keys and <code>c</code> is the desired
  138.       // max. error rate.
  139.       // Our desired error rate is by default 0.005, i.e. 0.5%
  140.       float errorRate = conf.getFloat("io.mapfile.bloom.error.rate", 0.005f);
  141.       vectorSize = (int)Math.ceil((double)(-HASH_COUNT * numKeys) /
  142.           Math.log(1.0 - Math.pow(errorRate, 1.0/HASH_COUNT)));
  143.       bloomFilter = new DynamicBloomFilter(vectorSize, HASH_COUNT,
  144.           Hash.getHashType(conf), numKeys);
  145.     }
  146.     @Override
  147.     public synchronized void append(WritableComparable key, Writable val)
  148.         throws IOException {
  149.       super.append(key, val);
  150.       buf.reset();
  151.       key.write(buf);
  152.       bloomKey.set(buf.getData(), 1.0);
  153.       bloomFilter.add(bloomKey);
  154.     }
  155.     @Override
  156.     public synchronized void close() throws IOException {
  157.       super.close();
  158.       DataOutputStream out = fs.create(new Path(dir, BLOOM_FILE_NAME), true);
  159.       bloomFilter.write(out);
  160.       out.flush();
  161.       out.close();
  162.     }
  163.   }
  164.   
  165.   public static class Reader extends MapFile.Reader {
  166.     private DynamicBloomFilter bloomFilter;
  167.     private DataOutputBuffer buf = new DataOutputBuffer();
  168.     private Key bloomKey = new Key();
  169.     public Reader(FileSystem fs, String dirName, Configuration conf)
  170.         throws IOException {
  171.       super(fs, dirName, conf);
  172.       initBloomFilter(fs, dirName, conf);
  173.     }
  174.     public Reader(FileSystem fs, String dirName, WritableComparator comparator,
  175.         Configuration conf, boolean open) throws IOException {
  176.       super(fs, dirName, comparator, conf, open);
  177.       initBloomFilter(fs, dirName, conf);
  178.     }
  179.     public Reader(FileSystem fs, String dirName, WritableComparator comparator,
  180.         Configuration conf) throws IOException {
  181.       super(fs, dirName, comparator, conf);
  182.       initBloomFilter(fs, dirName, conf);
  183.     }
  184.     
  185.     private void initBloomFilter(FileSystem fs, String dirName,
  186.         Configuration conf) {
  187.       try {
  188.         DataInputStream in = fs.open(new Path(dirName, BLOOM_FILE_NAME));
  189.         bloomFilter = new DynamicBloomFilter();
  190.         bloomFilter.readFields(in);
  191.         in.close();
  192.       } catch (IOException ioe) {
  193.         LOG.warn("Can't open BloomFilter: " + ioe + " - fallback to MapFile.");
  194.         bloomFilter = null;
  195.       }
  196.     }
  197.     
  198.     /**
  199.      * Checks if this MapFile has the indicated key. The membership test is
  200.      * performed using a Bloom filter, so the result has always non-zero
  201.      * probability of false positives.
  202.      * @param key key to check
  203.      * @return  false iff key doesn't exist, true if key probably exists.
  204.      * @throws IOException
  205.      */
  206.     public boolean probablyHasKey(WritableComparable key) throws IOException {
  207.       if (bloomFilter == null) {
  208.         return true;
  209.       }
  210.       buf.reset();
  211.       key.write(buf);
  212.       bloomKey.set(buf.getData(), 1.0);
  213.       return bloomFilter.membershipTest(bloomKey);
  214.     }
  215.     
  216.     /**
  217.      * Fast version of the
  218.      * {@link MapFile.Reader#get(WritableComparable, Writable)} method. First
  219.      * it checks the Bloom filter for the existence of the key, and only if
  220.      * present it performs the real get operation. This yields significant
  221.      * performance improvements for get operations on sparsely populated files.
  222.      */
  223.     @Override
  224.     public synchronized Writable get(WritableComparable key, Writable val)
  225.         throws IOException {
  226.       if (!probablyHasKey(key)) {
  227.         return null;
  228.       }
  229.       return super.get(key, val);
  230.     }
  231.     
  232.     /**
  233.      * Retrieve the Bloom filter used by this instance of the Reader.
  234.      * @return a Bloom filter (see {@link Filter})
  235.      */
  236.     public Filter getBloomFilter() {
  237.       return bloomFilter;
  238.     }
  239.   }
  240. }