TestKeyValueTextInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.*;
  20. import java.util.*;
  21. import junit.framework.TestCase;
  22. import org.apache.commons.logging.*;
  23. import org.apache.hadoop.fs.*;
  24. import org.apache.hadoop.io.*;
  25. import org.apache.hadoop.io.compress.*;
  26. import org.apache.hadoop.util.LineReader;
  27. import org.apache.hadoop.util.ReflectionUtils;
  28. public class TestKeyValueTextInputFormat extends TestCase {
  29.   private static final Log LOG =
  30.     LogFactory.getLog(TestKeyValueTextInputFormat.class.getName());
  31.   private static int MAX_LENGTH = 10000;
  32.   
  33.   private static JobConf defaultConf = new JobConf();
  34.   private static FileSystem localFs = null; 
  35.   static {
  36.     try {
  37.       localFs = FileSystem.getLocal(defaultConf);
  38.     } catch (IOException e) {
  39.       throw new RuntimeException("init failure", e);
  40.     }
  41.   }
  42.   private static Path workDir = 
  43.     new Path(new Path(System.getProperty("test.build.data", "."), "data"),
  44.              "TestKeyValueTextInputFormat");
  45.   
  46.   public void testFormat() throws Exception {
  47.     JobConf job = new JobConf();
  48.     Path file = new Path(workDir, "test.txt");
  49.     // A reporter that does nothing
  50.     Reporter reporter = Reporter.NULL;
  51.     
  52.     int seed = new Random().nextInt();
  53.     LOG.info("seed = "+seed);
  54.     Random random = new Random(seed);
  55.     localFs.delete(workDir, true);
  56.     FileInputFormat.setInputPaths(job, workDir);
  57.     // for a variety of lengths
  58.     for (int length = 0; length < MAX_LENGTH;
  59.          length+= random.nextInt(MAX_LENGTH/10)+1) {
  60.       LOG.debug("creating; entries = " + length);
  61.       // create a file with length entries
  62.       Writer writer = new OutputStreamWriter(localFs.create(file));
  63.       try {
  64.         for (int i = 0; i < length; i++) {
  65.           writer.write(Integer.toString(i*2));
  66.           writer.write("t");
  67.           writer.write(Integer.toString(i));
  68.           writer.write("n");
  69.         }
  70.       } finally {
  71.         writer.close();
  72.       }
  73.       // try splitting the file in a variety of sizes
  74.       KeyValueTextInputFormat format = new KeyValueTextInputFormat();
  75.       format.configure(job);
  76.       for (int i = 0; i < 3; i++) {
  77.         int numSplits = random.nextInt(MAX_LENGTH/20)+1;
  78.         LOG.debug("splitting: requesting = " + numSplits);
  79.         InputSplit[] splits = format.getSplits(job, numSplits);
  80.         LOG.debug("splitting: got =        " + splits.length);
  81.         // check each split
  82.         BitSet bits = new BitSet(length);
  83.         for (int j = 0; j < splits.length; j++) {
  84.           LOG.debug("split["+j+"]= " + splits[j]);
  85.           RecordReader<Text, Text> reader =
  86.             format.getRecordReader(splits[j], job, reporter);
  87.           Class readerClass = reader.getClass();
  88.           assertEquals("reader class is KeyValueLineRecordReader.", KeyValueLineRecordReader.class, readerClass);        
  89.           Text key = reader.createKey();
  90.           Class keyClass = key.getClass();
  91.           Text value = reader.createValue();
  92.           Class valueClass = value.getClass();
  93.           assertEquals("Key class is Text.", Text.class, keyClass);
  94.           assertEquals("Value class is Text.", Text.class, valueClass);
  95.           try {
  96.             int count = 0;
  97.             while (reader.next(key, value)) {
  98.               int v = Integer.parseInt(value.toString());
  99.               LOG.debug("read " + v);
  100.               if (bits.get(v)) {
  101.                 LOG.warn("conflict with " + v + 
  102.                          " in split " + j +
  103.                          " at position "+reader.getPos());
  104.               }
  105.               assertFalse("Key in multiple partitions.", bits.get(v));
  106.               bits.set(v);
  107.               count++;
  108.             }
  109.             LOG.debug("splits["+j+"]="+splits[j]+" count=" + count);
  110.           } finally {
  111.             reader.close();
  112.           }
  113.         }
  114.         assertEquals("Some keys in no partition.", length, bits.cardinality());
  115.       }
  116.     }
  117.   }
  118.   private LineReader makeStream(String str) throws IOException {
  119.     return new LineReader(new ByteArrayInputStream
  120.                                            (str.getBytes("UTF-8")), 
  121.                                            defaultConf);
  122.   }
  123.   
  124.   public void testUTF8() throws Exception {
  125.     LineReader in = makeStream("abcdu20acbdcdu20ac");
  126.     Text line = new Text();
  127.     in.readLine(line);
  128.     assertEquals("readLine changed utf8 characters", 
  129.                  "abcdu20acbdcdu20ac", line.toString());
  130.     in = makeStream("abcu200axyz");
  131.     in.readLine(line);
  132.     assertEquals("split on fake newline", "abcu200axyz", line.toString());
  133.   }
  134.   public void testNewLines() throws Exception {
  135.     LineReader in = makeStream("anbbnncccrddddrneeeee");
  136.     Text out = new Text();
  137.     in.readLine(out);
  138.     assertEquals("line1 length", 1, out.getLength());
  139.     in.readLine(out);
  140.     assertEquals("line2 length", 2, out.getLength());
  141.     in.readLine(out);
  142.     assertEquals("line3 length", 0, out.getLength());
  143.     in.readLine(out);
  144.     assertEquals("line4 length", 3, out.getLength());
  145.     in.readLine(out);
  146.     assertEquals("line5 length", 4, out.getLength());
  147.     in.readLine(out);
  148.     assertEquals("line5 length", 5, out.getLength());
  149.     assertEquals("end of file", 0, in.readLine(out));
  150.   }
  151.   
  152.   private static void writeFile(FileSystem fs, Path name, 
  153.                                 CompressionCodec codec,
  154.                                 String contents) throws IOException {
  155.     OutputStream stm;
  156.     if (codec == null) {
  157.       stm = fs.create(name);
  158.     } else {
  159.       stm = codec.createOutputStream(fs.create(name));
  160.     }
  161.     stm.write(contents.getBytes());
  162.     stm.close();
  163.   }
  164.   
  165.   private static final Reporter voidReporter = Reporter.NULL;
  166.   
  167.   private static List<Text> readSplit(KeyValueTextInputFormat format, 
  168.                                       InputSplit split, 
  169.                                       JobConf job) throws IOException {
  170.     List<Text> result = new ArrayList<Text>();
  171.     RecordReader<Text, Text> reader = format.getRecordReader(split, job,
  172.                                                  voidReporter);
  173.     Text key = reader.createKey();
  174.     Text value = reader.createValue();
  175.     while (reader.next(key, value)) {
  176.       result.add(value);
  177.       value = (Text) reader.createValue();
  178.     }
  179.     return result;
  180.   }
  181.   
  182.   /**
  183.    * Test using the gzip codec for reading
  184.    */
  185.   public static void testGzip() throws IOException {
  186.     JobConf job = new JobConf();
  187.     CompressionCodec gzip = new GzipCodec();
  188.     ReflectionUtils.setConf(gzip, job);
  189.     localFs.delete(workDir, true);
  190.     writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
  191.               "line-1tthe quicknline-2tbrownnline-3tfox jumpednline-4tovernline-5t the lazynline-6t dogn");
  192.     writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
  193.               "line-1tthis is a testnline-1tof gzipn");
  194.     FileInputFormat.setInputPaths(job, workDir);
  195.     KeyValueTextInputFormat format = new KeyValueTextInputFormat();
  196.     format.configure(job);
  197.     InputSplit[] splits = format.getSplits(job, 100);
  198.     assertEquals("compressed splits == 2", 2, splits.length);
  199.     FileSplit tmp = (FileSplit) splits[0];
  200.     if (tmp.getPath().getName().equals("part2.txt.gz")) {
  201.       splits[0] = splits[1];
  202.       splits[1] = tmp;
  203.     }
  204.     List<Text> results = readSplit(format, splits[0], job);
  205.     assertEquals("splits[0] length", 6, results.size());
  206.     assertEquals("splits[0][5]", " dog", results.get(5).toString());
  207.     results = readSplit(format, splits[1], job);
  208.     assertEquals("splits[1] length", 2, results.size());
  209.     assertEquals("splits[1][0]", "this is a test", 
  210.                  results.get(0).toString());    
  211.     assertEquals("splits[1][1]", "of gzip", 
  212.                  results.get(1).toString());    
  213.   }
  214.   
  215.   public static void main(String[] args) throws Exception {
  216.     new TestKeyValueTextInputFormat().testFormat();
  217.   }
  218. }