TestMapCollection.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import junit.framework.TestCase;
  20. import java.io.IOException;
  21. import java.io.DataInput;
  22. import java.io.DataOutput;
  23. import java.util.Arrays;
  24. import java.util.Iterator;
  25. import junit.framework.TestCase;
  26. import org.apache.commons.logging.Log;
  27. import org.apache.commons.logging.LogFactory;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.io.*;
  30. import org.apache.hadoop.mapred.lib.NullOutputFormat;
  31. public class TestMapCollection extends TestCase {
  32.   private static final Log LOG = LogFactory.getLog(
  33.       TestMapCollection.class.getName());
  34.   public static class KeyWritable
  35.       implements WritableComparable<KeyWritable>, JobConfigurable {
  36.     private final byte c = (byte)('K' & 0xFF);
  37.     static private boolean pedantic = false;
  38.     protected int expectedlen;
  39.     public void configure(JobConf conf) {
  40.       expectedlen = conf.getInt("test.keywritable.length", 1);
  41.       pedantic = conf.getBoolean("test.pedantic.verification", false);
  42.     }
  43.     public KeyWritable() { }
  44.     public KeyWritable(int len) {
  45.       this();
  46.       expectedlen = len;
  47.     }
  48.     public int getLength() {
  49.       return expectedlen;
  50.     }
  51.     public int compareTo(KeyWritable o) {
  52.       if (o == this) return 0;
  53.       return expectedlen - o.getLength();
  54.     }
  55.     public boolean equals(Object o) {
  56.       if (o == this) return true;
  57.       if (!(o instanceof KeyWritable)) return false;
  58.       return 0 == compareTo((KeyWritable)o);
  59.     }
  60.     public int hashCode() {
  61.       return 37 * expectedlen;
  62.     }
  63.     public void readFields(DataInput in) throws IOException {
  64.       if (expectedlen != 0) {
  65.         int bytesread;
  66.         if (pedantic) {
  67.           for (int i = 0; i < expectedlen; ++i)
  68.             assertEquals("Invalid byte at " + i, c, in.readByte());
  69.           bytesread = expectedlen;
  70.         } else {
  71.           bytesread = in.skipBytes(expectedlen);
  72.         }
  73.         assertEquals("Too few bytes in record", expectedlen, bytesread);
  74.       }
  75.       // cannot verify that the stream has been exhausted
  76.     }
  77.     public void write(DataOutput out) throws IOException {
  78.       if (expectedlen != 0) {
  79.         if (expectedlen > 1024) {
  80.           byte[] b = new byte[expectedlen];
  81.           Arrays.fill(b, c);
  82.           out.write(b);
  83.         } else {
  84.           for (int i = 0; i < expectedlen; ++i) {
  85.             out.write(c);
  86.           }
  87.         }
  88.       }
  89.     }
  90.     public static class Comparator extends WritableComparator {
  91.       public Comparator() {
  92.         super(KeyWritable.class);
  93.       }
  94.       public int compare(byte[] b1, int s1, int l1,
  95.                          byte[] b2, int s2, int l2) {
  96.         if (pedantic) {
  97.           for (int i = s1; i < l1; ++i) {
  98.             assertEquals("Invalid key at " + s1, b1[i], (byte)('K' & 0xFF));
  99.           }
  100.           for (int i = s2; i < l2; ++i) {
  101.             assertEquals("Invalid key at " + s2, b2[i], (byte)('K' & 0xFF));
  102.           }
  103.         }
  104.         return l1 - l2;
  105.       }
  106.     }
  107.     static {
  108.       WritableComparator.define(KeyWritable.class, new Comparator());
  109.     }
  110.   }
  111.   public static class ValWritable extends KeyWritable {
  112.     private final byte c = (byte)('V' & 0xFF);
  113.     public ValWritable() { }
  114.     public ValWritable(int len) {
  115.       this();
  116.       expectedlen = len;
  117.     }
  118.     public void configure(JobConf conf) {
  119.       expectedlen = conf.getInt("test.valwritable.length", 1);
  120.     }
  121.   }
  122.   public static class SpillMapper
  123.       implements Mapper<NullWritable,NullWritable,KeyWritable,ValWritable> {
  124.     private int keylen = 1;
  125.     private int vallen = 1;
  126.     private int numrecs = 100;
  127.     public void configure(JobConf job) {
  128.       keylen = job.getInt("test.keywritable.length", 1);
  129.       vallen = job.getInt("test.valwritable.length", 1);
  130.       numrecs = job.getInt("test.spillmap.records", 100);
  131.     }
  132.     public void map(NullWritable key, NullWritable value,
  133.         OutputCollector<KeyWritable,ValWritable> out, Reporter reporter)
  134.         throws IOException {
  135.       KeyWritable k = new KeyWritable(keylen);
  136.       ValWritable v = new ValWritable(vallen);
  137.       for (int i = 0; i < numrecs; ++i) {
  138.         if ((i % 1000) == 0) {
  139.           reporter.progress();
  140.         }
  141.         out.collect(k, v);
  142.       }
  143.     }
  144.     public void close() { }
  145.   }
  146.   public static class SpillReducer
  147.       implements Reducer<KeyWritable,ValWritable,NullWritable,NullWritable> {
  148.     private int numrecs = 100;
  149.     public void configure(JobConf job) {
  150.       numrecs = job.getInt("test.spillmap.records", 100);
  151.     }
  152.     public void reduce(KeyWritable k, Iterator<ValWritable> values,
  153.         OutputCollector<NullWritable,NullWritable> out, Reporter reporter) {
  154.       int i = 0;
  155.       while (values.hasNext()) {
  156.         values.next();
  157.         ++i;
  158.       }
  159.       assertEquals("Unexpected record count (" + i + "/" +
  160.                    numrecs + ")", numrecs, i);
  161.     }
  162.     public void close() { }
  163.   }
  164.   public static class FakeSplit implements InputSplit {
  165.     public void write(DataOutput out) throws IOException { }
  166.     public void readFields(DataInput in) throws IOException { }
  167.     public long getLength() { return 0L; }
  168.     public String[] getLocations() { return new String[0]; }
  169.   }
  170.   public static class FakeIF
  171.       implements InputFormat<NullWritable,NullWritable> {
  172.     public FakeIF() { }
  173.     public InputSplit[] getSplits(JobConf conf, int numSplits) {
  174.       InputSplit[] splits = new InputSplit[numSplits];
  175.       for (int i = 0; i < splits.length; ++i) {
  176.         splits[i] = new FakeSplit();
  177.       }
  178.       return splits;
  179.     }
  180.     public RecordReader<NullWritable,NullWritable> getRecordReader(
  181.         InputSplit ignored, JobConf conf, Reporter reporter) {
  182.       return new RecordReader<NullWritable,NullWritable>() {
  183.         private boolean done = false;
  184.         public boolean next(NullWritable key, NullWritable value)
  185.             throws IOException {
  186.           if (done)
  187.             return false;
  188.           done = true;
  189.           return true;
  190.         }
  191.         public NullWritable createKey() { return NullWritable.get(); }
  192.         public NullWritable createValue() { return NullWritable.get(); }
  193.         public long getPos() throws IOException { return 0L; }
  194.         public void close() throws IOException { }
  195.         public float getProgress() throws IOException { return 0.0f; }
  196.       };
  197.     }
  198.   }
  199.   private static void runTest(String name, int keylen, int vallen,
  200.       int records, int ioSortMB, float recPer, float spillPer,
  201.       boolean pedantic) throws Exception {
  202.     JobConf conf = new JobConf(new Configuration(), SpillMapper.class);
  203.     conf.setInt("io.sort.mb", ioSortMB);
  204.     conf.set("io.sort.record.percent", Float.toString(recPer));
  205.     conf.set("io.sort.spill.percent", Float.toString(spillPer));
  206.     conf.setInt("test.keywritable.length", keylen);
  207.     conf.setInt("test.valwritable.length", vallen);
  208.     conf.setInt("test.spillmap.records", records);
  209.     conf.setBoolean("test.pedantic.verification", pedantic);
  210.     conf.setNumMapTasks(1);
  211.     conf.setNumReduceTasks(1);
  212.     conf.setInputFormat(FakeIF.class);
  213.     conf.setOutputFormat(NullOutputFormat.class);
  214.     conf.setMapperClass(SpillMapper.class);
  215.     conf.setReducerClass(SpillReducer.class);
  216.     conf.setMapOutputKeyClass(KeyWritable.class);
  217.     conf.setMapOutputValueClass(ValWritable.class);
  218.     LOG.info("Running " + name);
  219.     JobClient.runJob(conf);
  220.   }
  221.   private static void runTest(String name, int keylen, int vallen, int records,
  222.       boolean pedantic) throws Exception {
  223.     runTest(name, keylen, vallen, records, 1, 0.05f, .8f, pedantic);
  224.   }
  225.   public void testLastFill() throws Exception {
  226.     // last byte of record/key is the last/first byte in the spill buffer
  227.     runTest("vallastbyte", 128, 896, 1344, 1, 0.125f, 0.5f, true);
  228.     runTest("keylastbyte", 512, 1024, 896, 1, 0.125f, 0.5f, true);
  229.   }
  230.   public void testLargeRecords() throws Exception {
  231.     // maps emitting records larger than io.sort.mb
  232.     runTest("largerec", 100, 1024*1024, 5, false);
  233.     runTest("largekeyzeroval", 1024*1024, 0, 5, false);
  234.   }
  235.   public void testSpillPer() throws Exception {
  236.     // set non-default, 100% speculative spill boundary
  237.     runTest("fullspill2B", 1, 1, 10000, 1, 0.05f, 1.0f, true);
  238.     runTest("fullspill200B", 100, 100, 10000, 1, 0.05f, 1.0f, true);
  239.     runTest("fullspillbuf", 10 * 1024, 20 * 1024, 256, 1, 0.3f, 1.0f, true);
  240.     runTest("lt50perspill", 100, 100, 10000, 1, 0.05f, 0.3f, true);
  241.   }
  242.   public void testZeroLength() throws Exception {
  243.     // test key/value at zero-length
  244.     runTest("zeroval", 1, 0, 10000, true);
  245.     runTest("zerokey", 0, 1, 10000, true);
  246.     runTest("zerokeyval", 0, 0, 10000, false);
  247.     runTest("zerokeyvalfull", 0, 0, 10000, 1, 0.05f, 1.0f, false);
  248.   }
  249. }