TestTextInputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.*;
  20. import java.util.*;
  21. import junit.framework.TestCase;
  22. import org.apache.commons.logging.*;
  23. import org.apache.hadoop.fs.*;
  24. import org.apache.hadoop.io.*;
  25. import org.apache.hadoop.io.compress.*;
  26. import org.apache.hadoop.util.LineReader;
  27. import org.apache.hadoop.util.ReflectionUtils;
  28. public class TestTextInputFormat extends TestCase {
  29.   private static final Log LOG =
  30.     LogFactory.getLog(TestTextInputFormat.class.getName());
  31.   private static int MAX_LENGTH = 10000;
  32.   
  33.   private static JobConf defaultConf = new JobConf();
  34.   private static FileSystem localFs = null; 
  35.   static {
  36.     try {
  37.       localFs = FileSystem.getLocal(defaultConf);
  38.     } catch (IOException e) {
  39.       throw new RuntimeException("init failure", e);
  40.     }
  41.   }
  42.   private static Path workDir = 
  43.     new Path(new Path(System.getProperty("test.build.data", "."), "data"),
  44.              "TestTextInputFormat");
  45.   
  46.   public void testFormat() throws Exception {
  47.     JobConf job = new JobConf();
  48.     Path file = new Path(workDir, "test.txt");
  49.     // A reporter that does nothing
  50.     Reporter reporter = Reporter.NULL;
  51.     
  52.     int seed = new Random().nextInt();
  53.     LOG.info("seed = "+seed);
  54.     Random random = new Random(seed);
  55.     localFs.delete(workDir, true);
  56.     FileInputFormat.setInputPaths(job, workDir);
  57.     // for a variety of lengths
  58.     for (int length = 0; length < MAX_LENGTH;
  59.          length+= random.nextInt(MAX_LENGTH/10)+1) {
  60.       LOG.debug("creating; entries = " + length);
  61.       // create a file with length entries
  62.       Writer writer = new OutputStreamWriter(localFs.create(file));
  63.       try {
  64.         for (int i = 0; i < length; i++) {
  65.           writer.write(Integer.toString(i));
  66.           writer.write("n");
  67.         }
  68.       } finally {
  69.         writer.close();
  70.       }
  71.       // try splitting the file in a variety of sizes
  72.       TextInputFormat format = new TextInputFormat();
  73.       format.configure(job);
  74.       LongWritable key = new LongWritable();
  75.       Text value = new Text();
  76.       for (int i = 0; i < 3; i++) {
  77.         int numSplits = random.nextInt(MAX_LENGTH/20)+1;
  78.         LOG.debug("splitting: requesting = " + numSplits);
  79.         InputSplit[] splits = format.getSplits(job, numSplits);
  80.         LOG.debug("splitting: got =        " + splits.length);
  81.         if (length == 0) {
  82.            assertEquals("Files of length 0 are not returned from FileInputFormat.getSplits().", 
  83.                         1, splits.length);
  84.            assertEquals("Empty file length == 0", 0, splits[0].getLength());
  85.         }
  86.         // check each split
  87.         BitSet bits = new BitSet(length);
  88.         for (int j = 0; j < splits.length; j++) {
  89.           LOG.debug("split["+j+"]= " + splits[j]);
  90.           RecordReader<LongWritable, Text> reader =
  91.             format.getRecordReader(splits[j], job, reporter);
  92.           try {
  93.             int count = 0;
  94.             while (reader.next(key, value)) {
  95.               int v = Integer.parseInt(value.toString());
  96.               LOG.debug("read " + v);
  97.               if (bits.get(v)) {
  98.                 LOG.warn("conflict with " + v + 
  99.                          " in split " + j +
  100.                          " at position "+reader.getPos());
  101.               }
  102.               assertFalse("Key in multiple partitions.", bits.get(v));
  103.               bits.set(v);
  104.               count++;
  105.             }
  106.             LOG.debug("splits["+j+"]="+splits[j]+" count=" + count);
  107.           } finally {
  108.             reader.close();
  109.           }
  110.         }
  111.         assertEquals("Some keys in no partition.", length, bits.cardinality());
  112.       }
  113.     }
  114.   }
  115.   private static LineReader makeStream(String str) throws IOException {
  116.     return new LineReader(new ByteArrayInputStream
  117.                                              (str.getBytes("UTF-8")), 
  118.                                            defaultConf);
  119.   }
  120.   private static LineReader makeStream(String str, int bufsz) throws IOException {
  121.     return new LineReader(new ByteArrayInputStream
  122.                                              (str.getBytes("UTF-8")), 
  123.                                            bufsz);
  124.   }
  125.   
  126.   public void testUTF8() throws Exception {
  127.     LineReader in = makeStream("abcdu20acbdcdu20ac");
  128.     Text line = new Text();
  129.     in.readLine(line);
  130.     assertEquals("readLine changed utf8 characters", 
  131.                  "abcdu20acbdcdu20ac", line.toString());
  132.     in = makeStream("abcu200axyz");
  133.     in.readLine(line);
  134.     assertEquals("split on fake newline", "abcu200axyz", line.toString());
  135.   }
  136.   /**
  137.    * Test readLine for various kinds of line termination sequneces.
  138.    * Varies buffer size to stress test.  Also check that returned
  139.    * value matches the string length.
  140.    *
  141.    * @throws Exception
  142.    */
  143.   public void testNewLines() throws Exception {
  144.     final String STR = "anbbnncccrddddrrrnrneeeee";
  145.     final int STRLENBYTES = STR.getBytes().length;
  146.     Text out = new Text();
  147.     for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
  148.       LineReader in = makeStream(STR, bufsz);
  149.       int c = 0;
  150.       c += in.readLine(out); //"a"n
  151.       assertEquals("line1 length, bufsz:"+bufsz, 1, out.getLength());
  152.       c += in.readLine(out); //"bb"n
  153.       assertEquals("line2 length, bufsz:"+bufsz, 2, out.getLength());
  154.       c += in.readLine(out); //""n
  155.       assertEquals("line3 length, bufsz:"+bufsz, 0, out.getLength());
  156.       c += in.readLine(out); //"ccc"r
  157.       assertEquals("line4 length, bufsz:"+bufsz, 3, out.getLength());
  158.       c += in.readLine(out); //ddddr
  159.       assertEquals("line5 length, bufsz:"+bufsz, 4, out.getLength());
  160.       c += in.readLine(out); //""r
  161.       assertEquals("line6 length, bufsz:"+bufsz, 0, out.getLength());
  162.       c += in.readLine(out); //""rn
  163.       assertEquals("line7 length, bufsz:"+bufsz, 0, out.getLength());
  164.       c += in.readLine(out); //""rn
  165.       assertEquals("line8 length, bufsz:"+bufsz, 0, out.getLength());
  166.       c += in.readLine(out); //"eeeee"EOF
  167.       assertEquals("line9 length, bufsz:"+bufsz, 5, out.getLength());
  168.       assertEquals("end of file, bufsz: "+bufsz, 0, in.readLine(out));
  169.       assertEquals("total bytes, bufsz: "+bufsz, c, STRLENBYTES);
  170.     }
  171.   }
  172.   /**
  173.    * Test readLine for correct interpretation of maxLineLength
  174.    * (returned string should be clipped at maxLineLength, and the
  175.    * remaining bytes on the same line should be thrown out).
  176.    * Also check that returned value matches the string length.
  177.    * Varies buffer size to stress test.
  178.    *
  179.    * @throws Exception
  180.    */
  181.   public void testMaxLineLength() throws Exception {
  182.     final String STR = "anbbnncccrddddrneeeee";
  183.     final int STRLENBYTES = STR.getBytes().length;
  184.     Text out = new Text();
  185.     for (int bufsz = 1; bufsz < STRLENBYTES+1; ++bufsz) {
  186.       LineReader in = makeStream(STR, bufsz);
  187.       int c = 0;
  188.       c += in.readLine(out, 1);
  189.       assertEquals("line1 length, bufsz: "+bufsz, 1, out.getLength());
  190.       c += in.readLine(out, 1);
  191.       assertEquals("line2 length, bufsz: "+bufsz, 1, out.getLength());
  192.       c += in.readLine(out, 1);
  193.       assertEquals("line3 length, bufsz: "+bufsz, 0, out.getLength());
  194.       c += in.readLine(out, 3);
  195.       assertEquals("line4 length, bufsz: "+bufsz, 3, out.getLength());
  196.       c += in.readLine(out, 10);
  197.       assertEquals("line5 length, bufsz: "+bufsz, 4, out.getLength());
  198.       c += in.readLine(out, 8);
  199.       assertEquals("line5 length, bufsz: "+bufsz, 5, out.getLength());
  200.       assertEquals("end of file, bufsz: " +bufsz, 0, in.readLine(out));
  201.       assertEquals("total bytes, bufsz: "+bufsz, c, STRLENBYTES);
  202.     }
  203.   }
  204.   private static void writeFile(FileSystem fs, Path name, 
  205.                                 CompressionCodec codec,
  206.                                 String contents) throws IOException {
  207.     OutputStream stm;
  208.     if (codec == null) {
  209.       stm = fs.create(name);
  210.     } else {
  211.       stm = codec.createOutputStream(fs.create(name));
  212.     }
  213.     stm.write(contents.getBytes());
  214.     stm.close();
  215.   }
  216.   
  217.   private static final Reporter voidReporter = Reporter.NULL;
  218.   
  219.   private static List<Text> readSplit(TextInputFormat format, 
  220.                                       InputSplit split, 
  221.                                       JobConf job) throws IOException {
  222.     List<Text> result = new ArrayList<Text>();
  223.     RecordReader<LongWritable, Text> reader =
  224.       format.getRecordReader(split, job, voidReporter);
  225.     LongWritable key = reader.createKey();
  226.     Text value = reader.createValue();
  227.     while (reader.next(key, value)) {
  228.       result.add(value);
  229.       value = (Text) reader.createValue();
  230.     }
  231.     reader.close();
  232.     return result;
  233.   }
  234.   
  235.   /**
  236.    * Test using the gzip codec for reading
  237.    */
  238.   public static void testGzip() throws IOException {
  239.     JobConf job = new JobConf();
  240.     CompressionCodec gzip = new GzipCodec();
  241.     ReflectionUtils.setConf(gzip, job);
  242.     localFs.delete(workDir, true);
  243.     writeFile(localFs, new Path(workDir, "part1.txt.gz"), gzip, 
  244.               "the quicknbrownnfox jumpednovern the lazyn dogn");
  245.     writeFile(localFs, new Path(workDir, "part2.txt.gz"), gzip,
  246.               "this is a testnof gzipn");
  247.     FileInputFormat.setInputPaths(job, workDir);
  248.     TextInputFormat format = new TextInputFormat();
  249.     format.configure(job);
  250.     InputSplit[] splits = format.getSplits(job, 100);
  251.     assertEquals("compressed splits == 2", 2, splits.length);
  252.     FileSplit tmp = (FileSplit) splits[0];
  253.     if (tmp.getPath().getName().equals("part2.txt.gz")) {
  254.       splits[0] = splits[1];
  255.       splits[1] = tmp;
  256.     }
  257.     List<Text> results = readSplit(format, splits[0], job);
  258.     assertEquals("splits[0] length", 6, results.size());
  259.     assertEquals("splits[0][5]", " dog", results.get(5).toString());
  260.     results = readSplit(format, splits[1], job);
  261.     assertEquals("splits[1] length", 2, results.size());
  262.     assertEquals("splits[1][0]", "this is a test", 
  263.                  results.get(0).toString());    
  264.     assertEquals("splits[1][1]", "of gzip", 
  265.                  results.get(1).toString());    
  266.   }
  267.   /**
  268.    * Test using the gzip codec and an empty input file
  269.    */
  270.   public static void testGzipEmpty() throws IOException {
  271.     JobConf job = new JobConf();
  272.     CompressionCodec gzip = new GzipCodec();
  273.     ReflectionUtils.setConf(gzip, job);
  274.     localFs.delete(workDir, true);
  275.     writeFile(localFs, new Path(workDir, "empty.gz"), gzip, "");
  276.     FileInputFormat.setInputPaths(job, workDir);
  277.     TextInputFormat format = new TextInputFormat();
  278.     format.configure(job);
  279.     InputSplit[] splits = format.getSplits(job, 100);
  280.     assertEquals("Compressed files of length 0 are not returned from FileInputFormat.getSplits().",
  281.                  1, splits.length);
  282.     List<Text> results = readSplit(format, splits[0], job);
  283.     assertEquals("Compressed empty file length == 0", 0, results.size());
  284.   }
  285.   
  286.   private static String unquote(String in) {
  287.     StringBuffer result = new StringBuffer();
  288.     for(int i=0; i < in.length(); ++i) {
  289.       char ch = in.charAt(i);
  290.       if (ch == '\') {
  291.         ch = in.charAt(++i);
  292.         switch (ch) {
  293.         case 'n':
  294.           result.append('n');
  295.           break;
  296.         case 'r':
  297.           result.append('r');
  298.           break;
  299.         default:
  300.           result.append(ch);
  301.           break;
  302.         }
  303.       } else {
  304.         result.append(ch);
  305.       }
  306.     }
  307.     return result.toString();
  308.   }
  309.   /**
  310.    * Parse the command line arguments into lines and display the result.
  311.    * @param args
  312.    * @throws Exception
  313.    */
  314.   public static void main(String[] args) throws Exception {
  315.     for(String arg: args) {
  316.       System.out.println("Working on " + arg);
  317.       LineReader reader = makeStream(unquote(arg));
  318.       Text line = new Text();
  319.       int size = reader.readLine(line);
  320.       while (size > 0) {
  321.         System.out.println("Got: " + line.toString());
  322.         size = reader.readLine(line);
  323.       }
  324.       reader.close();
  325.     }
  326.   }
  327. }