TestMiniMRClasspath.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.*;
  20. import junit.framework.TestCase;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.hdfs.MiniDFSCluster;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.FileUtil;
  25. import org.apache.hadoop.fs.Path;
  26. import org.apache.hadoop.io.IntWritable;
  27. import org.apache.hadoop.io.Text;
  28. /**
  29.  * A JUnit test to test Mini Map-Reduce Cluster with multiple directories
  30.  * and check for correct classpath
  31.  */
  32. public class TestMiniMRClasspath extends TestCase {
  33.   
  34.   
  35.   static String launchWordCount(String fileSys,
  36.                                 String jobTracker,
  37.                                 JobConf conf,
  38.                                 String input,
  39.                                 int numMaps,
  40.                                 int numReduces) throws IOException {
  41.     final Path inDir = new Path("/testing/wc/input");
  42.     final Path outDir = new Path("/testing/wc/output");
  43.     FileSystem fs = FileSystem.getNamed(fileSys, conf);
  44.     fs.delete(outDir, true);
  45.     if (!fs.mkdirs(inDir)) {
  46.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  47.     }
  48.     {
  49.       DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  50.       file.writeBytes(input);
  51.       file.close();
  52.     }
  53.     FileSystem.setDefaultUri(conf, fileSys);
  54.     conf.set("mapred.job.tracker", jobTracker);
  55.     conf.setJobName("wordcount");
  56.     conf.setInputFormat(TextInputFormat.class);
  57.     
  58.     // the keys are words (strings)
  59.     conf.setOutputKeyClass(Text.class);
  60.     // the values are counts (ints)
  61.     conf.setOutputValueClass(IntWritable.class);
  62.     
  63.     conf.set("mapred.mapper.class", "testjar.ClassWordCount$MapClass");        
  64.     conf.set("mapred.combine.class", "testjar.ClassWordCount$Reduce");
  65.     conf.set("mapred.reducer.class", "testjar.ClassWordCount$Reduce");
  66.     FileInputFormat.setInputPaths(conf, inDir);
  67.     FileOutputFormat.setOutputPath(conf, outDir);
  68.     conf.setNumMapTasks(numMaps);
  69.     conf.setNumReduceTasks(numReduces);
  70.     //pass a job.jar already included in the hadoop build
  71.     conf.setJar("build/test/testjar/testjob.jar");
  72.     JobClient.runJob(conf);
  73.     StringBuffer result = new StringBuffer();
  74.     {
  75.       Path[] parents = FileUtil.stat2Paths(fs.listStatus(outDir.getParent()));
  76.       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
  77.               new OutputLogFilter()));
  78.       for(int i=0; i < fileList.length; ++i) {
  79.         BufferedReader file = 
  80.           new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
  81.         String line = file.readLine();
  82.         while (line != null) {
  83.           result.append(line);
  84.           result.append("n");
  85.           line = file.readLine();
  86.         }
  87.         file.close();
  88.       }
  89.     }
  90.     return result.toString();
  91.   }
  92.   static String launchExternal(String fileSys, String jobTracker, JobConf conf,
  93.                                String input, int numMaps, int numReduces)
  94.     throws IOException {
  95.     final Path inDir = new Path("/testing/ext/input");
  96.     final Path outDir = new Path("/testing/ext/output");
  97.     FileSystem fs = FileSystem.getNamed(fileSys, conf);
  98.     fs.delete(outDir, true);
  99.     if (!fs.mkdirs(inDir)) {
  100.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  101.     }
  102.     {
  103.       DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  104.       file.writeBytes(input);
  105.       file.close();
  106.     }
  107.     FileSystem.setDefaultUri(conf, fileSys);
  108.     conf.set("mapred.job.tracker", jobTracker);
  109.     conf.setJobName("wordcount");
  110.     conf.setInputFormat(TextInputFormat.class);
  111.     // the keys are counts
  112.     conf.setOutputValueClass(IntWritable.class);
  113.     // the values are the messages
  114.     conf.set("mapred.output.key.class", "testjar.ExternalWritable");
  115.     FileInputFormat.setInputPaths(conf, inDir);
  116.     FileOutputFormat.setOutputPath(conf, outDir);
  117.     conf.setNumMapTasks(numMaps);
  118.     conf.setNumReduceTasks(numReduces);
  119.     
  120.     conf.set("mapred.mapper.class", "testjar.ExternalMapperReducer"); 
  121.     conf.set("mapred.reducer.class", "testjar.ExternalMapperReducer");
  122.     //pass a job.jar already included in the hadoop build
  123.     conf.setJar("build/test/testjar/testjob.jar");
  124.     JobClient.runJob(conf);
  125.     StringBuffer result = new StringBuffer();
  126.     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
  127.                                  new OutputLogFilter()));
  128.     for (int i = 0; i < fileList.length; ++i) {
  129.       BufferedReader file = new BufferedReader(new InputStreamReader(
  130.                                                                      fs.open(fileList[i])));
  131.       String line = file.readLine();
  132.       while (line != null) {
  133.         result.append(line);
  134.         line = file.readLine();
  135.         result.append("n");
  136.       }
  137.       file.close();
  138.     }
  139.     return result.toString();
  140.   }
  141.    
  142.   public void testClassPath() throws IOException {
  143.     String namenode = null;
  144.     MiniDFSCluster dfs = null;
  145.     MiniMRCluster mr = null;
  146.     FileSystem fileSys = null;
  147.     try {
  148.       final int taskTrackers = 4;
  149.       final int jobTrackerPort = 60050;
  150.       Configuration conf = new Configuration();
  151.       dfs = new MiniDFSCluster(conf, 1, true, null);
  152.       fileSys = dfs.getFileSystem();
  153.       namenode = fileSys.getName();
  154.       mr = new MiniMRCluster(taskTrackers, namenode, 3);
  155.       JobConf jobConf = new JobConf();
  156.       String result;
  157.       final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
  158.       result = launchWordCount(namenode, jobTrackerName, jobConf, 
  159.                                "The quick brown foxnhas many sillyn" + 
  160.                                "red fox soxn",
  161.                                3, 1);
  162.       assertEquals("Thet1nbrownt1nfoxt2nhast1nmanyt1n" +
  163.                    "quickt1nredt1nsillyt1nsoxt1n", result);
  164.           
  165.     } finally {
  166.       if (dfs != null) { dfs.shutdown(); }
  167.       if (mr != null) { mr.shutdown();
  168.       }
  169.     }
  170.   }
  171.   
  172.   public void testExternalWritable()
  173.     throws IOException {
  174.  
  175.     String namenode = null;
  176.     MiniDFSCluster dfs = null;
  177.     MiniMRCluster mr = null;
  178.     FileSystem fileSys = null;
  179.     try {
  180.       
  181.       final int taskTrackers = 4;
  182.       Configuration conf = new Configuration();
  183.       dfs = new MiniDFSCluster(conf, 1, true, null);
  184.       fileSys = dfs.getFileSystem();
  185.       namenode = fileSys.getName();
  186.       mr = new MiniMRCluster(taskTrackers, namenode, 3);      
  187.       JobConf jobConf = new JobConf();
  188.       String result;
  189.       final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
  190.       
  191.       result = launchExternal(namenode, jobTrackerName, jobConf, 
  192.                               "Dennis was here!nDennis again!",
  193.                               3, 1);
  194.       assertEquals("Dennis again!t1nDennis was here!t1n", result);
  195.       
  196.     } 
  197.     finally {
  198.       if (dfs != null) { dfs.shutdown(); }
  199.       if (mr != null) { mr.shutdown();
  200.       }
  201.     }
  202.   }
  203.   
  204. }