TestSequenceFileInputFilter.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.*;
  20. import java.util.*;
  21. import junit.framework.TestCase;
  22. import org.apache.commons.logging.*;
  23. import org.apache.hadoop.fs.*;
  24. import org.apache.hadoop.io.*;
  25. import org.apache.hadoop.conf.*;
  26. public class TestSequenceFileInputFilter extends TestCase {
  27.   private static final Log LOG = FileInputFormat.LOG;
  28.   private static final int MAX_LENGTH = 15000;
  29.   private static final Configuration conf = new Configuration();
  30.   private static final JobConf job = new JobConf(conf);
  31.   private static final FileSystem fs;
  32.   private static final Path inDir = new Path(System.getProperty("test.build.data",".") + "/mapred");
  33.   private static final Path inFile = new Path(inDir, "test.seq");
  34.   private static final Random random = new Random(1);
  35.   private static final Reporter reporter = Reporter.NULL;
  36.   
  37.   static {
  38.     FileInputFormat.setInputPaths(job, inDir);
  39.     try {
  40.       fs = FileSystem.getLocal(conf);
  41.     } catch (IOException e) {
  42.       e.printStackTrace();
  43.       throw new RuntimeException(e);
  44.     }
  45.   }
  46.   private static void createSequenceFile(int numRecords) throws Exception {
  47.     // create a file with length entries
  48.     SequenceFile.Writer writer =
  49.       SequenceFile.createWriter(fs, conf, inFile,
  50.                                 Text.class, BytesWritable.class);
  51.     try {
  52.       for (int i = 1; i <= numRecords; i++) {
  53.         Text key = new Text(Integer.toString(i));
  54.         byte[] data = new byte[random.nextInt(10)];
  55.         random.nextBytes(data);
  56.         BytesWritable value = new BytesWritable(data);
  57.         writer.append(key, value);
  58.       }
  59.     } finally {
  60.       writer.close();
  61.     }
  62.   }
  63.   private int countRecords(int numSplits) throws IOException {
  64.     InputFormat<Text, BytesWritable> format =
  65.       new SequenceFileInputFilter<Text, BytesWritable>();
  66.     Text key = new Text();
  67.     BytesWritable value = new BytesWritable();
  68.     if (numSplits==0) {
  69.       numSplits =
  70.         random.nextInt(MAX_LENGTH/(SequenceFile.SYNC_INTERVAL/20))+1;
  71.     }
  72.     InputSplit[] splits = format.getSplits(job, numSplits);
  73.       
  74.     // check each split
  75.     int count = 0;
  76.     LOG.info("Generated " + splits.length + " splits.");
  77.     for (int j = 0; j < splits.length; j++) {
  78.       RecordReader<Text, BytesWritable> reader =
  79.         format.getRecordReader(splits[j], job, reporter);
  80.       try {
  81.         while (reader.next(key, value)) {
  82.           LOG.info("Accept record "+key.toString());
  83.           count++;
  84.         }
  85.       } finally {
  86.         reader.close();
  87.       }
  88.     }
  89.     return count;
  90.   }
  91.   
  92.   public void testRegexFilter() throws Exception {
  93.     // set the filter class
  94.     LOG.info("Testing Regex Filter with patter: \A10*");
  95.     SequenceFileInputFilter.setFilterClass(job, 
  96.                                            SequenceFileInputFilter.RegexFilter.class);
  97.     SequenceFileInputFilter.RegexFilter.setPattern(job, "\A10*");
  98.     
  99.     // clean input dir
  100.     fs.delete(inDir, true);
  101.   
  102.     // for a variety of lengths
  103.     for (int length = 1; length < MAX_LENGTH;
  104.          length+= random.nextInt(MAX_LENGTH/10)+1) {
  105.       LOG.info("******Number of records: "+length);
  106.       createSequenceFile(length);
  107.       int count = countRecords(0);
  108.       assertEquals(count, length==0?0:(int)Math.log10(length)+1);
  109.     }
  110.     
  111.     // clean up
  112.     fs.delete(inDir, true);
  113.   }
  114.   public void testPercentFilter() throws Exception {
  115.     LOG.info("Testing Percent Filter with frequency: 1000");
  116.     // set the filter class
  117.     SequenceFileInputFilter.setFilterClass(job, 
  118.                                            SequenceFileInputFilter.PercentFilter.class);
  119.     SequenceFileInputFilter.PercentFilter.setFrequency(job, 1000);
  120.       
  121.     // clean input dir
  122.     fs.delete(inDir, true);
  123.     
  124.     // for a variety of lengths
  125.     for (int length = 0; length < MAX_LENGTH;
  126.          length+= random.nextInt(MAX_LENGTH/10)+1) {
  127.       LOG.info("******Number of records: "+length);
  128.       createSequenceFile(length);
  129.       int count = countRecords(1);
  130.       LOG.info("Accepted "+count+" records");
  131.       int expectedCount = length/1000;
  132.       if (expectedCount*1000!=length)
  133.         expectedCount++;
  134.       assertEquals(count, expectedCount);
  135.     }
  136.       
  137.     // clean up
  138.     fs.delete(inDir, true);
  139.   }
  140.   
  141.   public void testMD5Filter() throws Exception {
  142.     // set the filter class
  143.     LOG.info("Testing MD5 Filter with frequency: 1000");
  144.     SequenceFileInputFilter.setFilterClass(job, 
  145.                                            SequenceFileInputFilter.MD5Filter.class);
  146.     SequenceFileInputFilter.MD5Filter.setFrequency(job, 1000);
  147.       
  148.     // clean input dir
  149.     fs.delete(inDir, true);
  150.     
  151.     // for a variety of lengths
  152.     for (int length = 0; length < MAX_LENGTH;
  153.          length+= random.nextInt(MAX_LENGTH/10)+1) {
  154.       LOG.info("******Number of records: "+length);
  155.       createSequenceFile(length);
  156.       LOG.info("Accepted "+countRecords(0)+" records");
  157.     }
  158.     // clean up
  159.     fs.delete(inDir, true);
  160.   }
  161.   public static void main(String[] args) throws Exception {
  162.     TestSequenceFileInputFilter filter = new TestSequenceFileInputFilter();
  163.     filter.testRegexFilter();
  164.   }
  165. }