TestMiniMRWithDFS.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.BufferedReader;
  20. import java.io.DataOutputStream;
  21. import java.io.File;
  22. import java.io.IOException;
  23. import java.io.InputStreamReader;
  24. import java.util.ArrayList;
  25. import java.util.Arrays;
  26. import java.util.List;
  27. import junit.framework.TestCase;
  28. import org.apache.commons.logging.Log;
  29. import org.apache.commons.logging.LogFactory;
  30. import org.apache.hadoop.conf.Configuration;
  31. import org.apache.hadoop.hdfs.MiniDFSCluster;
  32. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  33. import org.apache.hadoop.fs.FileSystem;
  34. import org.apache.hadoop.fs.FileUtil;
  35. import org.apache.hadoop.fs.Path;
  36. import org.apache.hadoop.io.IntWritable;
  37. import org.apache.hadoop.io.Text;
  38. /**
  39.  * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS.
  40.  */
  41. public class TestMiniMRWithDFS extends TestCase {
  42.   private static final Log LOG =
  43.     LogFactory.getLog(TestMiniMRWithDFS.class.getName());
  44.   
  45.   static final int NUM_MAPS = 10;
  46.   static final int NUM_SAMPLES = 100000;
  47.   
  48.   public static class TestResult {
  49.     public String output;
  50.     public RunningJob job;
  51.     TestResult(RunningJob job, String output) {
  52.       this.job = job;
  53.       this.output = output;
  54.     }
  55.   }
  56.   public static TestResult launchWordCount(JobConf conf,
  57.                                            Path inDir,
  58.                                            Path outDir,
  59.                                            String input,
  60.                                            int numMaps,
  61.                                            int numReduces) throws IOException {
  62.     FileSystem inFs = inDir.getFileSystem(conf);
  63.     FileSystem outFs = outDir.getFileSystem(conf);
  64.     outFs.delete(outDir, true);
  65.     if (!inFs.mkdirs(inDir)) {
  66.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  67.     }
  68.     {
  69.       DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
  70.       file.writeBytes(input);
  71.       file.close();
  72.     }
  73.     conf.setJobName("wordcount");
  74.     conf.setInputFormat(TextInputFormat.class);
  75.     
  76.     // the keys are words (strings)
  77.     conf.setOutputKeyClass(Text.class);
  78.     // the values are counts (ints)
  79.     conf.setOutputValueClass(IntWritable.class);
  80.     
  81.     conf.setMapperClass(WordCount.MapClass.class);        
  82.     conf.setCombinerClass(WordCount.Reduce.class);
  83.     conf.setReducerClass(WordCount.Reduce.class);
  84.     FileInputFormat.setInputPaths(conf, inDir);
  85.     FileOutputFormat.setOutputPath(conf, outDir);
  86.     conf.setNumMapTasks(numMaps);
  87.     conf.setNumReduceTasks(numReduces);
  88.     RunningJob job = JobClient.runJob(conf);
  89.     return new TestResult(job, readOutput(outDir, conf));
  90.   }
  91.   public static String readOutput(Path outDir, 
  92.                                   JobConf conf) throws IOException {
  93.     FileSystem fs = outDir.getFileSystem(conf);
  94.     StringBuffer result = new StringBuffer();
  95.     {
  96.       
  97.       Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
  98.                                    new OutputLogFilter()));
  99.       for(int i=0; i < fileList.length; ++i) {
  100.         LOG.info("File list[" + i + "]" + ": "+ fileList[i]);
  101.         BufferedReader file = 
  102.           new BufferedReader(new InputStreamReader(fs.open(fileList[i])));
  103.         String line = file.readLine();
  104.         while (line != null) {
  105.           result.append(line);
  106.           result.append("n");
  107.           line = file.readLine();
  108.         }
  109.         file.close();
  110.       }
  111.     }
  112.     return result.toString();
  113.   }
  114.   
  115.   /**
  116.    * Make sure that there are exactly the directories that we expect to find.
  117.    * @param mr the map-reduce cluster
  118.    * @param taskDirs the task ids that should be present
  119.    */
  120.   static void checkTaskDirectories(MiniMRCluster mr,
  121.                                            String[] jobIds,
  122.                                            String[] taskDirs) {
  123.     mr.waitUntilIdle();
  124.     int trackers = mr.getNumTaskTrackers();
  125.     List<String> neededDirs = new ArrayList<String>(Arrays.asList(taskDirs));
  126.     boolean[] found = new boolean[taskDirs.length];
  127.     for(int i=0; i < trackers; ++i) {
  128.       int numNotDel = 0;
  129.       File localDir = new File(mr.getTaskTrackerLocalDir(i));
  130.       LOG.debug("Tracker directory: " + localDir);
  131.       File trackerDir = new File(localDir, "taskTracker");
  132.       assertTrue("local dir " + localDir + " does not exist.", 
  133.                  localDir.isDirectory());
  134.       assertTrue("task tracker dir " + trackerDir + " does not exist.", 
  135.                  trackerDir.isDirectory());
  136.       String contents[] = localDir.list();
  137.       String trackerContents[] = trackerDir.list();
  138.       for(int j=0; j < contents.length; ++j) {
  139.         System.out.println("Local " + localDir + ": " + contents[j]);
  140.       }
  141.       for(int j=0; j < trackerContents.length; ++j) {
  142.         System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
  143.       }
  144.       for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
  145.         String name = contents[fileIdx];
  146.         if (!("taskTracker".equals(contents[fileIdx]))) {
  147.           LOG.debug("Looking at " + name);
  148.           assertTrue("Spurious directory " + name + " found in " +
  149.                      localDir, false);
  150.         }
  151.       }
  152.       for (int idx = 0; idx < neededDirs.size(); ++idx) {
  153.         String name = neededDirs.get(idx);
  154.         if (new File(new File(new File(trackerDir, "jobcache"),
  155.                               jobIds[idx]), name).isDirectory()) {
  156.           found[idx] = true;
  157.           numNotDel++;
  158.         }  
  159.       }
  160.     }
  161.     for(int i=0; i< found.length; i++) {
  162.       assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
  163.     }
  164.   }
  165.   public static void runPI(MiniMRCluster mr, JobConf jobconf) throws IOException {
  166.     LOG.info("runPI");
  167.     double estimate = org.apache.hadoop.examples.PiEstimator.estimate(
  168.         NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
  169.     double error = Math.abs(Math.PI - estimate);
  170.     assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
  171.     checkTaskDirectories(mr, new String[]{}, new String[]{});
  172.   }
  173.   public static void runWordCount(MiniMRCluster mr, JobConf jobConf) 
  174.   throws IOException {
  175.     LOG.info("runWordCount");
  176.     // Run a word count example
  177.     // Keeping tasks that match this pattern
  178.     String pattern = 
  179.       TaskAttemptID.getTaskAttemptIDsPattern(null, null, true, 1, null);
  180.     jobConf.setKeepTaskFilesPattern(pattern);
  181.     TestResult result;
  182.     final Path inDir = new Path("./wc/input");
  183.     final Path outDir = new Path("./wc/output");
  184.     String input = "The quick brown foxnhas many sillynred fox soxn";
  185.     result = launchWordCount(jobConf, inDir, outDir, input, 3, 1);
  186.     assertEquals("Thet1nbrownt1nfoxt2nhast1nmanyt1n" +
  187.                  "quickt1nredt1nsillyt1nsoxt1n", result.output);
  188.     JobID jobid = result.job.getID();
  189.     TaskAttemptID taskid = new TaskAttemptID(new TaskID(jobid, true, 1),0);
  190.     checkTaskDirectories(mr, new String[]{jobid.toString()}, 
  191.                          new String[]{taskid.toString()});
  192.     // test with maps=0
  193.     jobConf = mr.createJobConf();
  194.     input = "owen is oom";
  195.     result = launchWordCount(jobConf, inDir, outDir, input, 0, 1);
  196.     assertEquals("ist1noomt1nowent1n", result.output);
  197.     Counters counters = result.job.getCounters();
  198.     long hdfsRead = 
  199.       counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
  200.           Task.getFileSystemCounterNames("hdfs")[0]).getCounter();
  201.     long hdfsWrite = 
  202.       counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
  203.           Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
  204.     assertEquals(result.output.length(), hdfsWrite);
  205.     assertEquals(input.length(), hdfsRead);
  206.     // Run a job with input and output going to localfs even though the 
  207.     // default fs is hdfs.
  208.     {
  209.       FileSystem localfs = FileSystem.getLocal(jobConf);
  210.       String TEST_ROOT_DIR =
  211.         new File(System.getProperty("test.build.data","/tmp"))
  212.         .toString().replace(' ', '+');
  213.       Path localIn = localfs.makeQualified
  214.                         (new Path(TEST_ROOT_DIR + "/local/in"));
  215.       Path localOut = localfs.makeQualified
  216.                         (new Path(TEST_ROOT_DIR + "/local/out"));
  217.       result = launchWordCount(jobConf, localIn, localOut,
  218.                                "all your base belong to us", 1, 1);
  219.       assertEquals("allt1nbaset1nbelongt1ntot1nust1nyourt1n", 
  220.                    result.output);
  221.       assertTrue("outputs on localfs", localfs.exists(localOut));
  222.     }
  223.   }
  224.   public void testWithDFS() throws IOException {
  225.     MiniDFSCluster dfs = null;
  226.     MiniMRCluster mr = null;
  227.     FileSystem fileSys = null;
  228.     try {
  229.       final int taskTrackers = 4;
  230.       Configuration conf = new Configuration();
  231.       dfs = new MiniDFSCluster(conf, 4, true, null);
  232.       fileSys = dfs.getFileSystem();
  233.       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
  234.       runPI(mr, mr.createJobConf());
  235.       runWordCount(mr, mr.createJobConf());
  236.     } finally {
  237.       if (dfs != null) { dfs.shutdown(); }
  238.       if (mr != null) { mr.shutdown();
  239.       }
  240.     }
  241.   }
  242.   
  243.   public void testWithDFSWithDefaultPort() throws IOException {
  244.     MiniDFSCluster dfs = null;
  245.     MiniMRCluster mr = null;
  246.     FileSystem fileSys = null;
  247.     try {
  248.       final int taskTrackers = 4;
  249.       Configuration conf = new Configuration();
  250.       // start a dfs with the default port number
  251.       dfs = new MiniDFSCluster(
  252.           NameNode.DEFAULT_PORT, conf, 4, true, true, null, null);
  253.       fileSys = dfs.getFileSystem();
  254.       mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
  255.       JobConf jobConf = mr.createJobConf();
  256.       TestResult result;
  257.       final Path inDir = new Path("./wc/input");
  258.       final Path outDir = new Path("hdfs://" +
  259.           dfs.getNameNode().getNameNodeAddress().getHostName() +
  260.           ":" + NameNode.DEFAULT_PORT +"/./wc/output");
  261.       String input = "The quick brown foxnhas many sillynred fox soxn";
  262.       result = launchWordCount(jobConf, inDir, outDir, input, 3, 1);
  263.       assertEquals("Thet1nbrownt1nfoxt2nhast1nmanyt1n" +
  264.                    "quickt1nredt1nsillyt1nsoxt1n", result.output);
  265.       final Path outDir2 = new Path("hdfs:/test/wc/output2");
  266.       jobConf.set("fs.default.name", "hdfs://localhost:" + NameNode.DEFAULT_PORT);
  267.       result = launchWordCount(jobConf, inDir, outDir2, input, 3, 1);
  268.       assertEquals("Thet1nbrownt1nfoxt2nhast1nmanyt1n" +
  269.                    "quickt1nredt1nsillyt1nsoxt1n", result.output);
  270.     } catch (java.net.BindException be) {
  271.       LOG.info("Skip the test this time because can not start namenode on port "
  272.           + NameNode.DEFAULT_PORT, be);
  273.     } finally {
  274.       if (dfs != null) { dfs.shutdown(); }
  275.       if (mr != null) { mr.shutdown();
  276.       }
  277.     }
  278.   }
  279. }