TestReduceTask.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import junit.framework.TestCase;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.LocalFileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.io.Text;
  26. import org.apache.hadoop.io.WritableComparator;
  27. import org.apache.hadoop.io.compress.CompressionCodec;
  28. import org.apache.hadoop.io.compress.DefaultCodec;
  29. import org.apache.hadoop.util.Progressable;
  30. /**
  31.  * This test exercises the ValueIterator.
  32.  */
  33. public class TestReduceTask extends TestCase {
  34.   static class NullProgress implements Progressable {
  35.     public void progress() { }
  36.   }
  37.   private static class Pair {
  38.     String key;
  39.     String value;
  40.     Pair(String k, String v) {
  41.       key = k;
  42.       value = v;
  43.     }
  44.   }
  45.   private static Pair[][] testCases =
  46.     new Pair[][]{
  47.       new Pair[]{
  48.                  new Pair("k1", "v1"),
  49.                  new Pair("k2", "v2"),
  50.                  new Pair("k3", "v3"),
  51.                  new Pair("k3", "v4"),
  52.                  new Pair("k4", "v5"),
  53.                  new Pair("k5", "v6"),
  54.       },
  55.       new Pair[]{
  56.                  new Pair("", "v1"),
  57.                  new Pair("k1", "v2"),
  58.                  new Pair("k2", "v3"),
  59.                  new Pair("k2", "v4"),
  60.       },
  61.       new Pair[] {},
  62.       new Pair[]{
  63.                  new Pair("k1", "v1"),
  64.                  new Pair("k1", "v2"),
  65.                  new Pair("k1", "v3"),
  66.                  new Pair("k1", "v4"),
  67.       }
  68.     };
  69.   
  70.   public void runValueIterator(Path tmpDir, Pair[] vals, 
  71.                                Configuration conf, 
  72.                                CompressionCodec codec) throws IOException {
  73.     FileSystem localFs = FileSystem.getLocal(conf);
  74.     FileSystem rfs = ((LocalFileSystem)localFs).getRaw();
  75.     Path path = new Path(tmpDir, "data.in");
  76.     IFile.Writer<Text, Text> writer = 
  77.       new IFile.Writer<Text, Text>(conf, rfs, path, Text.class, Text.class,
  78.                                    codec, null);
  79.     for(Pair p: vals) {
  80.       writer.append(new Text(p.key), new Text(p.value));
  81.     }
  82.     writer.close();
  83.     
  84.     @SuppressWarnings("unchecked")
  85.     RawKeyValueIterator rawItr = 
  86.       Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path}, 
  87.                    false, conf.getInt("io.sort.factor", 100), tmpDir, 
  88.                    new Text.Comparator(), new NullProgress(),null,null);
  89.     @SuppressWarnings("unchecked") // WritableComparators are not generic
  90.     ReduceTask.ValuesIterator valItr = 
  91.       new ReduceTask.ValuesIterator<Text,Text>(rawItr,
  92.           WritableComparator.get(Text.class), Text.class, Text.class,
  93.           conf, new NullProgress());
  94.     int i = 0;
  95.     while (valItr.more()) {
  96.       Object key = valItr.getKey();
  97.       String keyString = key.toString();
  98.       // make sure it matches!
  99.       assertEquals(vals[i].key, keyString);
  100.       // must have at least 1 value!
  101.       assertTrue(valItr.hasNext());
  102.       while (valItr.hasNext()) {
  103.         String valueString = valItr.next().toString();
  104.         // make sure the values match
  105.         assertEquals(vals[i].value, valueString);
  106.         // make sure the keys match
  107.         assertEquals(vals[i].key, valItr.getKey().toString());
  108.         i += 1;
  109.       }
  110.       // make sure the key hasn't changed under the hood
  111.       assertEquals(keyString, valItr.getKey().toString());
  112.       valItr.nextKey();
  113.     }
  114.     assertEquals(vals.length, i);
  115.     // make sure we have progress equal to 1.0
  116.     assertEquals(1.0f, rawItr.getProgress().get());
  117.   }
  118.   public void testValueIterator() throws Exception {
  119.     Path tmpDir = new Path("build/test/test.reduce.task");
  120.     Configuration conf = new Configuration();
  121.     for (Pair[] testCase: testCases) {
  122.       runValueIterator(tmpDir, testCase, conf, null);
  123.     }
  124.   }
  125.   
  126.   public void testValueIteratorWithCompression() throws Exception {
  127.     Path tmpDir = new Path("build/test/test.reduce.task.compression");
  128.     Configuration conf = new Configuration();
  129.     DefaultCodec codec = new DefaultCodec();
  130.     codec.setConf(conf);
  131.     for (Pair[] testCase: testCases) {
  132.       runValueIterator(tmpDir, testCase, conf, codec);
  133.     }
  134.   }
  135. }