TestPipes.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.pipes;
  19. import java.io.DataOutputStream;
  20. import java.io.IOException;
  21. import java.util.List;
  22. import java.util.ArrayList;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.hdfs.MiniDFSCluster;
  27. import org.apache.hadoop.fs.FileUtil;
  28. import org.apache.hadoop.fs.FileSystem;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.mapred.Counters;
  31. import org.apache.hadoop.mapred.FileInputFormat;
  32. import org.apache.hadoop.mapred.FileOutputFormat;
  33. import org.apache.hadoop.mapred.JobConf;
  34. import org.apache.hadoop.mapred.MiniMRCluster;
  35. import org.apache.hadoop.mapred.OutputLogFilter;
  36. import org.apache.hadoop.mapred.RunningJob;
  37. import org.apache.hadoop.mapred.TestMiniMRWithDFS;
  38. import org.apache.hadoop.mapred.Counters.Counter;
  39. import org.apache.hadoop.util.StringUtils;
  40. import junit.framework.TestCase;
  41. public class TestPipes extends TestCase {
  42.   private static final Log LOG =
  43.     LogFactory.getLog(TestPipes.class.getName());
  44.   static void cleanup(FileSystem fs, Path p) throws IOException {
  45.     fs.delete(p, true);
  46.     assertFalse("output not cleaned up", fs.exists(p));
  47.   }
  48.   public void testPipes() throws IOException {
  49.     if (System.getProperty("compile.c++") == null) {
  50.       LOG.info("compile.c++ is not defined, so skipping TestPipes");
  51.       return;
  52.     }
  53.     MiniDFSCluster dfs = null;
  54.     MiniMRCluster mr = null;
  55.     Path cppExamples = new Path(System.getProperty("install.c++.examples"));
  56.     Path inputPath = new Path("/testing/in");
  57.     Path outputPath = new Path("/testing/out");
  58.     try {
  59.       final int numSlaves = 2;
  60.       Configuration conf = new Configuration();
  61.       dfs = new MiniDFSCluster(conf, numSlaves, true, null);
  62.       mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getName(), 1);
  63.       writeInputFile(dfs.getFileSystem(), inputPath);
  64.       runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
  65.                  inputPath, outputPath, 3, 2, twoSplitOutput);
  66.       cleanup(dfs.getFileSystem(), outputPath);
  67.       runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"), 
  68.                  inputPath, outputPath, 3, 0, noSortOutput);
  69.       cleanup(dfs.getFileSystem(), outputPath);
  70.       runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-part"),
  71.                  inputPath, outputPath, 3, 2, fixedPartitionOutput);
  72.       runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
  73.       mr.waitUntilIdle();
  74.     } finally {
  75.       mr.shutdown();
  76.       dfs.shutdown();
  77.     }
  78.   }
  79.   final static String[] twoSplitOutput = new String[] {
  80.     "`andt1nat1nandt1nbeginningt1nbookt1nbutt1nbyt1n" +
  81.     "conversation?'t1ndo:t1nhadt2nhavingt1nhert2nint1nitt1n"+
  82.     "it,t1nnot1nnothingt1noft3nont1noncet1nort3npeepedt1n"+
  83.     "picturest2nthet3nthoughtt1ntot2nuset1nwast2n",
  84.     "Alicet2n`withoutt1nbank,t1nbook,'t1nconversationst1ngett1n" +
  85.     "intot1nist1nreading,t1nshet1nsistert2nsittingt1ntiredt1n" +
  86.     "twicet1nveryt1nwhatt1n"
  87.   };
  88.   final static String[] noSortOutput = new String[] {
  89.     "it,t1n`andt1nwhatt1nist1nthet1nuset1noft1nat1n" +
  90.     "book,'t1nthoughtt1nAlicet1n`withoutt1npicturest1nort1n"+
  91.     "conversation?'t1n",
  92.     "Alicet1nwast1nbeginningt1ntot1ngett1nveryt1ntiredt1n"+
  93.     "oft1nsittingt1nbyt1nhert1nsistert1nont1nthet1nbank,t1n"+
  94.     "andt1noft1nhavingt1nnothingt1ntot1ndo:t1noncet1n", 
  95.     "ort1ntwicet1nshet1nhadt1npeepedt1nintot1nthet1nbookt1n"+
  96.     "hert1nsistert1nwast1nreading,t1nbutt1nitt1nhadt1nnot1n"+
  97.     "picturest1nort1nconversationst1nint1n"
  98.   };
  99.   
  100.   final static String[] fixedPartitionOutput = new String[] {
  101.     "Alicet2n`andt1n`withoutt1nat1nandt1nbank,t1nbeginningt1n" +
  102.     "bookt1nbook,'t1nbutt1nbyt1nconversation?'t1nconversationst1n"+
  103.     "do:t1ngett1nhadt2nhavingt1nhert2nint1nintot1nist1n" +
  104.     "itt1nit,t1nnot1nnothingt1noft3nont1noncet1nort3n" +
  105.     "peepedt1npicturest2nreading,t1nshet1nsistert2nsittingt1n" +
  106.     "thet3nthoughtt1ntiredt1ntot2ntwicet1nuset1n" +
  107.     "veryt1nwast2nwhatt1n",
  108.     
  109.     ""                                                   
  110.   };
  111.   
  112.   private void writeInputFile(FileSystem fs, Path dir) throws IOException {
  113.     DataOutputStream out = fs.create(new Path(dir, "part0"));
  114.     out.writeBytes("Alice was beginning to get very tired of sitting by hern");
  115.     out.writeBytes("sister on the bank, and of having nothing to do: oncen");
  116.     out.writeBytes("or twice she had peeped into the book her sister wasn");
  117.     out.writeBytes("reading, but it had no pictures or conversations inn");
  118.     out.writeBytes("it, `and what is the use of a book,' thought Alicen");
  119.     out.writeBytes("`without pictures or conversation?'n");
  120.     out.close();
  121.   }
  122.   private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs, 
  123.                           Path program, Path inputPath, Path outputPath,
  124.                           int numMaps, int numReduces, String[] expectedResults
  125.                          ) throws IOException {
  126.     Path wordExec = new Path("/testing/bin/application");
  127.     JobConf job = mr.createJobConf();
  128.     job.setNumMapTasks(numMaps);
  129.     job.setNumReduceTasks(numReduces);
  130.     {
  131.       FileSystem fs = dfs.getFileSystem();
  132.       fs.delete(wordExec.getParent(), true);
  133.       fs.copyFromLocalFile(program, wordExec);                                         
  134.       Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
  135.       Submitter.setIsJavaRecordReader(job, true);
  136.       Submitter.setIsJavaRecordWriter(job, true);
  137.       FileInputFormat.setInputPaths(job, inputPath);
  138.       FileOutputFormat.setOutputPath(job, outputPath);
  139.       RunningJob rJob = null;
  140.       if (numReduces == 0) {
  141.         rJob = Submitter.jobSubmit(job);
  142.         
  143.         while (!rJob.isComplete()) {
  144.           try {
  145.             Thread.sleep(1000);
  146.           } catch (InterruptedException ie) {
  147.             throw new RuntimeException(ie);
  148.           }
  149.         }
  150.       } else {
  151.         rJob = Submitter.runJob(job);
  152.       }
  153.       assertTrue("pipes job failed", rJob.isSuccessful());
  154.       
  155.       Counters counters = rJob.getCounters();
  156.       Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
  157.       int numCounters = 0;
  158.       for (Counter c : wordCountCounters) {
  159.         System.out.println(c);
  160.         ++numCounters;
  161.       }
  162.       assertTrue("No counters found!", (numCounters > 0));
  163.     }
  164.     List<String> results = new ArrayList<String>();
  165.     for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
  166.                              new OutputLogFilter()))) {
  167.       results.add(TestMiniMRWithDFS.readOutput(p, job));
  168.     }
  169.     assertEquals("number of reduces is wrong", 
  170.                  expectedResults.length, results.size());
  171.     for(int i=0; i < results.size(); i++) {
  172.       assertEquals("pipes program " + program + " output " + i + " wrong",
  173.                    expectedResults[i], results.get(i));
  174.     }
  175.   }
  176.   
  177.   /**
  178.    * Run a map/reduce word count that does all of the map input and reduce
  179.    * output directly rather than sending it back up to Java.
  180.    * @param mr The mini mr cluster
  181.    * @param dfs the dfs cluster
  182.    * @param program the program to run
  183.    * @throws IOException
  184.    */
  185.   private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
  186.                                   Path program) throws IOException {
  187.     JobConf job = mr.createJobConf();
  188.     job.setInputFormat(WordCountInputFormat.class);
  189.     FileSystem local = FileSystem.getLocal(job);
  190.     Path testDir = new Path("file:" + System.getProperty("test.build.data"), 
  191.                             "pipes");
  192.     Path inDir = new Path(testDir, "input");
  193.     Path outDir = new Path(testDir, "output");
  194.     Path wordExec = new Path("/testing/bin/application");
  195.     Path jobXml = new Path(testDir, "job.xml");
  196.     {
  197.       FileSystem fs = dfs.getFileSystem();
  198.       fs.delete(wordExec.getParent(), true);
  199.       fs.copyFromLocalFile(program, wordExec);
  200.     }
  201.     DataOutputStream out = local.create(new Path(inDir, "part0"));
  202.     out.writeBytes("i am a silly testn");
  203.     out.writeBytes("you are sillyn");
  204.     out.writeBytes("i am a cat testn");
  205.     out.writeBytes("you is sillyn");
  206.     out.writeBytes("i am a billy testn");
  207.     out.writeBytes("hello are sillyn");
  208.     out.close();
  209.     out = local.create(new Path(inDir, "part1"));
  210.     out.writeBytes("mall world things drink javan");
  211.     out.writeBytes("hall silly cats drink javan");
  212.     out.writeBytes("all dogs bow wown");
  213.     out.writeBytes("hello drink javan");
  214.     out.close();
  215.     local.delete(outDir, true);
  216.     local.mkdirs(outDir);
  217.     out = local.create(jobXml);
  218.     job.writeXml(out);
  219.     out.close();
  220.     System.err.println("About to run: Submitter -conf " + jobXml + 
  221.                        " -input " + inDir + " -output " + outDir + 
  222.                        " -program " + 
  223.                        dfs.getFileSystem().makeQualified(wordExec));
  224.     try {
  225.       Submitter.main(new String[]{"-conf", jobXml.toString(),
  226.                                   "-input", inDir.toString(),
  227.                                   "-output", outDir.toString(),
  228.                                   "-program", 
  229.                         dfs.getFileSystem().makeQualified(wordExec).toString(),
  230.                                   "-reduces", "2"});
  231.     } catch (Exception e) {
  232.       assertTrue("got exception: " + StringUtils.stringifyException(e), false);
  233.     }
  234.   }
  235. }