TestSequenceFileAsBinaryOutputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.Random;
  21. import org.apache.hadoop.fs.*;
  22. import org.apache.hadoop.io.*;
  23. import org.apache.hadoop.io.SequenceFile.CompressionType;
  24. import junit.framework.TestCase;
  25. import org.apache.commons.logging.*;
  26. public class TestSequenceFileAsBinaryOutputFormat extends TestCase {
  27.   private static final Log LOG =
  28.       LogFactory.getLog(TestSequenceFileAsBinaryOutputFormat.class.getName());
  29.   private static final int RECORDS = 10000;
  30.   // A random task attempt id for testing.
  31.   private static final String attempt = "attempt_200707121733_0001_m_000000_0";
  32.   public void testBinary() throws IOException {
  33.     JobConf job = new JobConf();
  34.     FileSystem fs = FileSystem.getLocal(job);
  35.     
  36.     Path dir = 
  37.       new Path(new Path(new Path(System.getProperty("test.build.data",".")), 
  38.                         FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt);
  39.     Path file = new Path(dir, "testbinary.seq");
  40.     Random r = new Random();
  41.     long seed = r.nextLong();
  42.     r.setSeed(seed);
  43.     fs.delete(dir, true);
  44.     if (!fs.mkdirs(dir)) { 
  45.       fail("Failed to create output directory");
  46.     }
  47.     job.set("mapred.task.id", attempt);
  48.     FileOutputFormat.setOutputPath(job, dir.getParent().getParent());
  49.     FileOutputFormat.setWorkOutputPath(job, dir);
  50.     SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, 
  51.                                           IntWritable.class );
  52.     SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, 
  53.                                           DoubleWritable.class ); 
  54.     SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
  55.     SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
  56.                                                        CompressionType.BLOCK);
  57.     BytesWritable bkey = new BytesWritable();
  58.     BytesWritable bval = new BytesWritable();
  59.     RecordWriter <BytesWritable, BytesWritable> writer = 
  60.       new SequenceFileAsBinaryOutputFormat().getRecordWriter(fs, 
  61.                                                        job, file.toString(),
  62.                                                        Reporter.NULL);
  63.     IntWritable iwritable = new IntWritable();
  64.     DoubleWritable dwritable = new DoubleWritable();
  65.     DataOutputBuffer outbuf = new DataOutputBuffer();
  66.     LOG.info("Creating data by SequenceFileAsBinaryOutputFormat");
  67.     try {
  68.       for (int i = 0; i < RECORDS; ++i) {
  69.         iwritable = new IntWritable(r.nextInt());
  70.         iwritable.write(outbuf);
  71.         bkey.set(outbuf.getData(), 0, outbuf.getLength());
  72.         outbuf.reset();
  73.         dwritable = new DoubleWritable(r.nextDouble());
  74.         dwritable.write(outbuf);
  75.         bval.set(outbuf.getData(), 0, outbuf.getLength());
  76.         outbuf.reset();
  77.         writer.write(bkey, bval);
  78.       }
  79.     } finally {
  80.       writer.close(Reporter.NULL);
  81.     }
  82.     InputFormat<IntWritable,DoubleWritable> iformat =
  83.                     new SequenceFileInputFormat<IntWritable,DoubleWritable>();
  84.     int count = 0;
  85.     r.setSeed(seed);
  86.     DataInputBuffer buf = new DataInputBuffer();
  87.     final int NUM_SPLITS = 3;
  88.     SequenceFileInputFormat.addInputPath(job, file);
  89.     LOG.info("Reading data by SequenceFileInputFormat");
  90.     for (InputSplit split : iformat.getSplits(job, NUM_SPLITS)) {
  91.       RecordReader<IntWritable,DoubleWritable> reader =
  92.         iformat.getRecordReader(split, job, Reporter.NULL);
  93.       try {
  94.         int sourceInt;
  95.         double sourceDouble;
  96.         while (reader.next(iwritable, dwritable)) {
  97.           sourceInt = r.nextInt();
  98.           sourceDouble = r.nextDouble();
  99.           assertEquals(
  100.               "Keys don't match: " + "*" + iwritable.get() + ":" + 
  101.                                            sourceInt + "*",
  102.               sourceInt, iwritable.get());
  103.           assertTrue(
  104.               "Vals don't match: " + "*" + dwritable.get() + ":" +
  105.                                            sourceDouble + "*",
  106.               Double.compare(dwritable.get(), sourceDouble) == 0 );
  107.           ++count;
  108.         }
  109.       } finally {
  110.         reader.close();
  111.       }
  112.     }
  113.     assertEquals("Some records not found", RECORDS, count);
  114.   }
  115.   public void testSequenceOutputClassDefaultsToMapRedOutputClass() 
  116.          throws IOException {
  117.     JobConf job = new JobConf();
  118.     FileSystem fs = FileSystem.getLocal(job);
  119.     // Setting Random class to test getSequenceFileOutput{Key,Value}Class
  120.     job.setOutputKeyClass(FloatWritable.class);
  121.     job.setOutputValueClass(BooleanWritable.class);
  122.     assertEquals("SequenceFileOutputKeyClass should default to ouputKeyClass", 
  123.              FloatWritable.class,
  124.              SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
  125.                                                                          job));
  126.     assertEquals("SequenceFileOutputValueClass should default to " 
  127.              + "ouputValueClass", 
  128.              BooleanWritable.class,
  129.              SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
  130.                                                                          job));
  131.     SequenceFileAsBinaryOutputFormat.setSequenceFileOutputKeyClass(job, 
  132.                                           IntWritable.class );
  133.     SequenceFileAsBinaryOutputFormat.setSequenceFileOutputValueClass(job, 
  134.                                           DoubleWritable.class ); 
  135.     assertEquals("SequenceFileOutputKeyClass not updated", 
  136.              IntWritable.class,
  137.              SequenceFileAsBinaryOutputFormat.getSequenceFileOutputKeyClass(
  138.                                                                          job));
  139.     assertEquals("SequenceFileOutputValueClass not updated", 
  140.              DoubleWritable.class,
  141.              SequenceFileAsBinaryOutputFormat.getSequenceFileOutputValueClass(
  142.                                                                          job));
  143.   }
  144.   public void testcheckOutputSpecsForbidRecordCompression() throws IOException {
  145.     JobConf job = new JobConf();
  146.     FileSystem fs = FileSystem.getLocal(job);
  147.     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
  148.     Path outputdir = new Path(System.getProperty("test.build.data",".") 
  149.                               + "/output");
  150.     fs.delete(dir, true);
  151.     fs.delete(outputdir, true);
  152.     if (!fs.mkdirs(dir)) { 
  153.       fail("Failed to create output directory");
  154.     }
  155.     FileOutputFormat.setWorkOutputPath(job, dir);
  156.     // Without outputpath, FileOutputFormat.checkoutputspecs will throw 
  157.     // InvalidJobConfException
  158.     FileOutputFormat.setOutputPath(job, outputdir);
  159.     // SequenceFileAsBinaryOutputFormat doesn't support record compression
  160.     // It should throw an exception when checked by checkOutputSpecs
  161.     SequenceFileAsBinaryOutputFormat.setCompressOutput(job, true);
  162.     SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
  163.                                                        CompressionType.BLOCK);
  164.     try {
  165.       new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
  166.     } catch (Exception e) {
  167.       fail("Block compression should be allowed for " 
  168.                        + "SequenceFileAsBinaryOutputFormat:" 
  169.                        + "Caught " + e.getClass().getName());
  170.     }
  171.     SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, 
  172.                                                        CompressionType.RECORD);
  173.     try {
  174.       new SequenceFileAsBinaryOutputFormat().checkOutputSpecs(fs, job);
  175.       fail("Record compression should not be allowed for " 
  176.                            +"SequenceFileAsBinaryOutputFormat");
  177.     } catch (InvalidJobConfException ie) {
  178.       // expected
  179.     } catch (Exception e) {
  180.       fail("Expected " + InvalidJobConfException.class.getName() 
  181.                        + "but caught " + e.getClass().getName() );
  182.     }
  183.   }
  184. }