TestMiniMRLocalFS.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.DataOutputStream;
  22. import java.io.File;
  23. import java.io.IOException;
  24. import java.util.Iterator;
  25. import junit.framework.TestCase;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.io.IntWritable;
  29. import org.apache.hadoop.io.Text;
  30. import org.apache.hadoop.io.Writable;
  31. import org.apache.hadoop.io.WritableComparable;
  32. import org.apache.hadoop.io.WritableUtils;
  33. import org.apache.hadoop.mapred.MRCaching.TestResult;
  34. import org.apache.hadoop.util.Progressable;
  35. /**
  36.  * A JUnit test to test min map-reduce cluster with local file system.
  37.  */
  38. public class TestMiniMRLocalFS extends TestCase {
  39.   private static String TEST_ROOT_DIR =
  40.     new File(System.getProperty("test.build.data","/tmp"))
  41.     .toURI().toString().replace(' ', '+');
  42.     
  43.   public void testWithLocal() throws IOException {
  44.     MiniMRCluster mr = null;
  45.     try {
  46.       mr = new MiniMRCluster(2, "file:///", 3);
  47.       TestMiniMRWithDFS.runPI(mr, mr.createJobConf());
  48.       // run the wordcount example with caching
  49.       JobConf job = mr.createJobConf();
  50.       TestResult ret = MRCaching.launchMRCache(TEST_ROOT_DIR + "/wc/input",
  51.                                             TEST_ROOT_DIR + "/wc/output", 
  52.                                             TEST_ROOT_DIR + "/cachedir",
  53.                                             job,
  54.                                             "The quick brown foxn" 
  55.                                             + "has many sillyn"
  56.                                             + "red fox soxn");
  57.       // assert the number of lines read during caching
  58.       assertTrue("Failed test archives not matching", ret.isOutputOk);
  59.       // test the task report fetchers
  60.       JobClient client = new JobClient(job);
  61.       JobID jobid = ret.job.getID();
  62.       TaskReport[] reports;
  63.       reports = client.getSetupTaskReports(jobid);
  64.       assertEquals("number of setups", 2, reports.length);
  65.       reports = client.getMapTaskReports(jobid);
  66.       assertEquals("number of maps", 1, reports.length);
  67.       reports = client.getReduceTaskReports(jobid);
  68.       assertEquals("number of reduces", 1, reports.length);
  69.       reports = client.getCleanupTaskReports(jobid);
  70.       assertEquals("number of cleanups", 2, reports.length);
  71.       Counters counters = ret.job.getCounters();
  72.       assertEquals("number of map inputs", 3, 
  73.                    counters.getCounter(Task.Counter.MAP_INPUT_RECORDS));
  74.       assertEquals("number of reduce outputs", 9, 
  75.                    counters.getCounter(Task.Counter.REDUCE_OUTPUT_RECORDS));
  76.       runCustomFormats(mr);
  77.     } finally {
  78.       if (mr != null) { mr.shutdown(); }
  79.     }
  80.   }
  81.   
  82.   private void runCustomFormats(MiniMRCluster mr) throws IOException {
  83.     JobConf job = mr.createJobConf();
  84.     FileSystem fileSys = FileSystem.get(job);
  85.     Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
  86.     Path outDir = new Path(testDir, "out");
  87.     System.out.println("testDir= " + testDir);
  88.     fileSys.delete(testDir, true);
  89.     
  90.     job.setInputFormat(MyInputFormat.class);
  91.     job.setOutputFormat(MyOutputFormat.class);
  92.     job.setOutputKeyClass(Text.class);
  93.     job.setOutputValueClass(IntWritable.class);
  94.     
  95.     job.setMapperClass(MyMapper.class);        
  96.     job.setReducerClass(MyReducer.class);
  97.     job.setNumMapTasks(100);
  98.     job.setNumReduceTasks(1);
  99.     // explicitly do not use "normal" job.setOutputPath to make sure
  100.     // that it is not hardcoded anywhere in the framework.
  101.     job.set("non.std.out", outDir.toString());
  102.     try {
  103.       JobClient.runJob(job);
  104.       String result = 
  105.         TestMiniMRWithDFS.readOutput(outDir, job);
  106.       assertEquals("output", ("aunt anniet1n" +
  107.                               "bumble boatt4n" +
  108.                               "crocodile pantst0n" +
  109.                               "duck-dogt5n"+
  110.                               "eggst2n" + 
  111.                               "finagle the agentt3n"), result);
  112.     } finally {
  113.       fileSys.delete(testDir, true);
  114.     }
  115.     
  116.   }
  117.   
  118.   private static class MyInputFormat
  119.     implements InputFormat<IntWritable, Text> {
  120.     
  121.     static final String[] data = new String[]{
  122.       "crocodile pants", 
  123.       "aunt annie", 
  124.       "eggs",
  125.       "finagle the agent",
  126.       "bumble boat", 
  127.       "duck-dog",
  128.     };
  129.     private static class MySplit implements InputSplit {
  130.       int first;
  131.       int length;
  132.       public MySplit() { }
  133.       public MySplit(int first, int length) {
  134.         this.first = first;
  135.         this.length = length;
  136.       }
  137.       public String[] getLocations() {
  138.         return new String[0];
  139.       }
  140.       public long getLength() {
  141.         return length;
  142.       }
  143.       public void write(DataOutput out) throws IOException {
  144.         WritableUtils.writeVInt(out, first);
  145.         WritableUtils.writeVInt(out, length);
  146.       }
  147.       public void readFields(DataInput in) throws IOException {
  148.         first = WritableUtils.readVInt(in);
  149.         length = WritableUtils.readVInt(in);
  150.       }
  151.     }
  152.     static class MyRecordReader implements RecordReader<IntWritable, Text> {
  153.       int index;
  154.       int past;
  155.       int length;
  156.       
  157.       MyRecordReader(int index, int length) {
  158.         this.index = index;
  159.         this.past = index + length;
  160.         this.length = length;
  161.       }
  162.       public boolean next(IntWritable key, Text value) throws IOException {
  163.         if (index < past) {
  164.           key.set(index);
  165.           value.set(data[index]);
  166.           index += 1;
  167.           return true;
  168.         }
  169.         return false;
  170.       }
  171.       
  172.       public IntWritable createKey() {
  173.         return new IntWritable();
  174.       }
  175.       
  176.       public Text createValue() {
  177.         return new Text();
  178.       }
  179.       public long getPos() throws IOException {
  180.         return index;
  181.       }
  182.       public void close() throws IOException {}
  183.       public float getProgress() throws IOException {
  184.         return 1.0f - (past-index)/length;
  185.       }
  186.     }
  187.     
  188.     public InputSplit[] getSplits(JobConf job, 
  189.                                   int numSplits) throws IOException {
  190.       return new MySplit[]{new MySplit(0, 1), new MySplit(1, 3),
  191.                            new MySplit(4, 2)};
  192.     }
  193.     public RecordReader<IntWritable, Text> getRecordReader(InputSplit split,
  194.                                                            JobConf job, 
  195.                                                            Reporter reporter)
  196.                                                            throws IOException {
  197.       MySplit sp = (MySplit) split;
  198.       return new MyRecordReader(sp.first, sp.length);
  199.     }
  200.     
  201.   }
  202.   
  203.   static class MyMapper extends MapReduceBase
  204.     implements Mapper<WritableComparable, Writable,
  205.                       WritableComparable, Writable> {
  206.     
  207.     public void map(WritableComparable key, Writable value, 
  208.                     OutputCollector<WritableComparable, Writable> out,
  209.                     Reporter reporter) throws IOException {
  210.       System.out.println("map: " + key + ", " + value);
  211.       out.collect((WritableComparable) value, key);
  212.       InputSplit split = reporter.getInputSplit();
  213.       if (split.getClass() != MyInputFormat.MySplit.class) {
  214.         throw new IOException("Got wrong split in MyMapper! " + 
  215.                               split.getClass().getName());
  216.       }
  217.     }
  218.   }
  219.   static class MyReducer extends MapReduceBase
  220.     implements Reducer<WritableComparable, Writable,
  221.                       WritableComparable, Writable> {
  222.     public void reduce(WritableComparable key, Iterator<Writable> values, 
  223.                        OutputCollector<WritableComparable, Writable> output,
  224.                        Reporter reporter) throws IOException {
  225.       try {
  226.         InputSplit split = reporter.getInputSplit();
  227.         throw new IOException("Got an input split of " + split);
  228.       } catch (UnsupportedOperationException e) {
  229.         // expected result
  230.       }
  231.       while (values.hasNext()) {
  232.         Writable value = values.next();
  233.         System.out.println("reduce: " + key + ", " + value);
  234.         output.collect(key, value);
  235.       }
  236.     }
  237.   }
  238.   static class MyOutputFormat implements OutputFormat {
  239.     static class MyRecordWriter implements RecordWriter<Object, Object> {
  240.       private DataOutputStream out;
  241.       
  242.       public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
  243.         out = outputFile.getFileSystem(job).create(outputFile);
  244.       }
  245.       
  246.       public void write(Object key, Object value) throws IOException {
  247.         out.writeBytes(key.toString() + "t" + value.toString() + "n");
  248.       }
  249.       public void close(Reporter reporter) throws IOException { 
  250.         out.close();
  251.       }
  252.     }
  253.     
  254.     public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, 
  255.                                         String name,
  256.                                         Progressable progress
  257.                                         ) throws IOException {
  258.       return new MyRecordWriter(new Path(job.get("non.std.out")), job);
  259.     }
  260.     public void checkOutputSpecs(FileSystem ignored, 
  261.                                  JobConf job) throws IOException {
  262.     }
  263.   }
  264. }