TestSequenceFileMergeProgress.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.List;
  22. import java.util.Random;
  23. import org.apache.hadoop.fs.*;
  24. import org.apache.hadoop.io.*;
  25. import org.apache.hadoop.io.SequenceFile.CompressionType;
  26. import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
  27. import org.apache.hadoop.io.SequenceFile.Sorter.SegmentDescriptor;
  28. import org.apache.hadoop.io.compress.CompressionCodec;
  29. import org.apache.hadoop.io.compress.DefaultCodec;
  30. import org.apache.hadoop.mapred.*;
  31. import junit.framework.TestCase;
  32. import org.apache.commons.logging.*;
  33. public class TestSequenceFileMergeProgress extends TestCase {
  34.   private static final Log LOG = FileInputFormat.LOG;
  35.   private static final int RECORDS = 10000;
  36.   
  37.   public void testMergeProgressWithNoCompression() throws IOException {
  38.     runTest(SequenceFile.CompressionType.NONE);
  39.   }
  40.   public void testMergeProgressWithRecordCompression() throws IOException {
  41.     runTest(SequenceFile.CompressionType.RECORD);
  42.   }
  43.   public void testMergeProgressWithBlockCompression() throws IOException {
  44.     runTest(SequenceFile.CompressionType.BLOCK);
  45.   }
  46.   public void runTest(CompressionType compressionType) throws IOException {
  47.     JobConf job = new JobConf();
  48.     FileSystem fs = FileSystem.getLocal(job);
  49.     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
  50.     Path file = new Path(dir, "test.seq");
  51.     Path tempDir = new Path(dir, "tmp");
  52.     fs.delete(dir, true);
  53.     FileInputFormat.setInputPaths(job, dir);
  54.     fs.mkdirs(tempDir);
  55.     LongWritable tkey = new LongWritable();
  56.     Text tval = new Text();
  57.     SequenceFile.Writer writer =
  58.       SequenceFile.createWriter(fs, job, file, LongWritable.class, Text.class,
  59.         compressionType, new DefaultCodec());
  60.     try {
  61.       for (int i = 0; i < RECORDS; ++i) {
  62.         tkey.set(1234);
  63.         tval.set("valuevaluevaluevaluevaluevaluevaluevaluevaluevaluevalue");
  64.         writer.append(tkey, tval);
  65.       }
  66.     } finally {
  67.       writer.close();
  68.     }
  69.     
  70.     long fileLength = fs.getFileStatus(file).getLen();
  71.     LOG.info("With compression = " + compressionType + ": "
  72.         + "compressed length = " + fileLength);
  73.     
  74.     SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, 
  75.         job.getOutputKeyComparator(), job.getMapOutputKeyClass(),
  76.         job.getMapOutputValueClass(), job);
  77.     Path[] paths = new Path[] {file};
  78.     RawKeyValueIterator rIter = sorter.merge(paths, tempDir, false);
  79.     int count = 0;
  80.     while (rIter.next()) {
  81.       count++;
  82.     }
  83.     assertEquals(RECORDS, count);
  84.     assertEquals(1.0f, rIter.getProgress().get());
  85.   }
  86. }