TestPipes.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred.pipes;
- import java.io.DataOutputStream;
- import java.io.IOException;
- import java.util.List;
- import java.util.ArrayList;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hdfs.MiniDFSCluster;
- import org.apache.hadoop.fs.FileUtil;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.mapred.Counters;
- import org.apache.hadoop.mapred.FileInputFormat;
- import org.apache.hadoop.mapred.FileOutputFormat;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.MiniMRCluster;
- import org.apache.hadoop.mapred.OutputLogFilter;
- import org.apache.hadoop.mapred.RunningJob;
- import org.apache.hadoop.mapred.TestMiniMRWithDFS;
- import org.apache.hadoop.mapred.Counters.Counter;
- import org.apache.hadoop.util.StringUtils;
- import junit.framework.TestCase;
- public class TestPipes extends TestCase {
- private static final Log LOG =
- LogFactory.getLog(TestPipes.class.getName());
- static void cleanup(FileSystem fs, Path p) throws IOException {
- fs.delete(p, true);
- assertFalse("output not cleaned up", fs.exists(p));
- }
- public void testPipes() throws IOException {
- if (System.getProperty("compile.c++") == null) {
- LOG.info("compile.c++ is not defined, so skipping TestPipes");
- return;
- }
- MiniDFSCluster dfs = null;
- MiniMRCluster mr = null;
- Path cppExamples = new Path(System.getProperty("install.c++.examples"));
- Path inputPath = new Path("/testing/in");
- Path outputPath = new Path("/testing/out");
- try {
- final int numSlaves = 2;
- Configuration conf = new Configuration();
- dfs = new MiniDFSCluster(conf, numSlaves, true, null);
- mr = new MiniMRCluster(numSlaves, dfs.getFileSystem().getName(), 1);
- writeInputFile(dfs.getFileSystem(), inputPath);
- runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"),
- inputPath, outputPath, 3, 2, twoSplitOutput);
- cleanup(dfs.getFileSystem(), outputPath);
- runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-simple"),
- inputPath, outputPath, 3, 0, noSortOutput);
- cleanup(dfs.getFileSystem(), outputPath);
- runProgram(mr, dfs, new Path(cppExamples, "bin/wordcount-part"),
- inputPath, outputPath, 3, 2, fixedPartitionOutput);
- runNonPipedProgram(mr, dfs, new Path(cppExamples,"bin/wordcount-nopipe"));
- mr.waitUntilIdle();
- } finally {
- mr.shutdown();
- dfs.shutdown();
- }
- }
- final static String[] twoSplitOutput = new String[] {
- "`andt1nat1nandt1nbeginningt1nbookt1nbutt1nbyt1n" +
- "conversation?'t1ndo:t1nhadt2nhavingt1nhert2nint1nitt1n"+
- "it,t1nnot1nnothingt1noft3nont1noncet1nort3npeepedt1n"+
- "picturest2nthet3nthoughtt1ntot2nuset1nwast2n",
- "Alicet2n`withoutt1nbank,t1nbook,'t1nconversationst1ngett1n" +
- "intot1nist1nreading,t1nshet1nsistert2nsittingt1ntiredt1n" +
- "twicet1nveryt1nwhatt1n"
- };
- final static String[] noSortOutput = new String[] {
- "it,t1n`andt1nwhatt1nist1nthet1nuset1noft1nat1n" +
- "book,'t1nthoughtt1nAlicet1n`withoutt1npicturest1nort1n"+
- "conversation?'t1n",
- "Alicet1nwast1nbeginningt1ntot1ngett1nveryt1ntiredt1n"+
- "oft1nsittingt1nbyt1nhert1nsistert1nont1nthet1nbank,t1n"+
- "andt1noft1nhavingt1nnothingt1ntot1ndo:t1noncet1n",
- "ort1ntwicet1nshet1nhadt1npeepedt1nintot1nthet1nbookt1n"+
- "hert1nsistert1nwast1nreading,t1nbutt1nitt1nhadt1nnot1n"+
- "picturest1nort1nconversationst1nint1n"
- };
-
- final static String[] fixedPartitionOutput = new String[] {
- "Alicet2n`andt1n`withoutt1nat1nandt1nbank,t1nbeginningt1n" +
- "bookt1nbook,'t1nbutt1nbyt1nconversation?'t1nconversationst1n"+
- "do:t1ngett1nhadt2nhavingt1nhert2nint1nintot1nist1n" +
- "itt1nit,t1nnot1nnothingt1noft3nont1noncet1nort3n" +
- "peepedt1npicturest2nreading,t1nshet1nsistert2nsittingt1n" +
- "thet3nthoughtt1ntiredt1ntot2ntwicet1nuset1n" +
- "veryt1nwast2nwhatt1n",
-
- ""
- };
-
- private void writeInputFile(FileSystem fs, Path dir) throws IOException {
- DataOutputStream out = fs.create(new Path(dir, "part0"));
- out.writeBytes("Alice was beginning to get very tired of sitting by hern");
- out.writeBytes("sister on the bank, and of having nothing to do: oncen");
- out.writeBytes("or twice she had peeped into the book her sister wasn");
- out.writeBytes("reading, but it had no pictures or conversations inn");
- out.writeBytes("it, `and what is the use of a book,' thought Alicen");
- out.writeBytes("`without pictures or conversation?'n");
- out.close();
- }
- private void runProgram(MiniMRCluster mr, MiniDFSCluster dfs,
- Path program, Path inputPath, Path outputPath,
- int numMaps, int numReduces, String[] expectedResults
- ) throws IOException {
- Path wordExec = new Path("/testing/bin/application");
- JobConf job = mr.createJobConf();
- job.setNumMapTasks(numMaps);
- job.setNumReduceTasks(numReduces);
- {
- FileSystem fs = dfs.getFileSystem();
- fs.delete(wordExec.getParent(), true);
- fs.copyFromLocalFile(program, wordExec);
- Submitter.setExecutable(job, fs.makeQualified(wordExec).toString());
- Submitter.setIsJavaRecordReader(job, true);
- Submitter.setIsJavaRecordWriter(job, true);
- FileInputFormat.setInputPaths(job, inputPath);
- FileOutputFormat.setOutputPath(job, outputPath);
- RunningJob rJob = null;
- if (numReduces == 0) {
- rJob = Submitter.jobSubmit(job);
-
- while (!rJob.isComplete()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- throw new RuntimeException(ie);
- }
- }
- } else {
- rJob = Submitter.runJob(job);
- }
- assertTrue("pipes job failed", rJob.isSuccessful());
-
- Counters counters = rJob.getCounters();
- Counters.Group wordCountCounters = counters.getGroup("WORDCOUNT");
- int numCounters = 0;
- for (Counter c : wordCountCounters) {
- System.out.println(c);
- ++numCounters;
- }
- assertTrue("No counters found!", (numCounters > 0));
- }
- List<String> results = new ArrayList<String>();
- for (Path p:FileUtil.stat2Paths(dfs.getFileSystem().listStatus(outputPath,
- new OutputLogFilter()))) {
- results.add(TestMiniMRWithDFS.readOutput(p, job));
- }
- assertEquals("number of reduces is wrong",
- expectedResults.length, results.size());
- for(int i=0; i < results.size(); i++) {
- assertEquals("pipes program " + program + " output " + i + " wrong",
- expectedResults[i], results.get(i));
- }
- }
-
- /**
- * Run a map/reduce word count that does all of the map input and reduce
- * output directly rather than sending it back up to Java.
- * @param mr The mini mr cluster
- * @param dfs the dfs cluster
- * @param program the program to run
- * @throws IOException
- */
- private void runNonPipedProgram(MiniMRCluster mr, MiniDFSCluster dfs,
- Path program) throws IOException {
- JobConf job = mr.createJobConf();
- job.setInputFormat(WordCountInputFormat.class);
- FileSystem local = FileSystem.getLocal(job);
- Path testDir = new Path("file:" + System.getProperty("test.build.data"),
- "pipes");
- Path inDir = new Path(testDir, "input");
- Path outDir = new Path(testDir, "output");
- Path wordExec = new Path("/testing/bin/application");
- Path jobXml = new Path(testDir, "job.xml");
- {
- FileSystem fs = dfs.getFileSystem();
- fs.delete(wordExec.getParent(), true);
- fs.copyFromLocalFile(program, wordExec);
- }
- DataOutputStream out = local.create(new Path(inDir, "part0"));
- out.writeBytes("i am a silly testn");
- out.writeBytes("you are sillyn");
- out.writeBytes("i am a cat testn");
- out.writeBytes("you is sillyn");
- out.writeBytes("i am a billy testn");
- out.writeBytes("hello are sillyn");
- out.close();
- out = local.create(new Path(inDir, "part1"));
- out.writeBytes("mall world things drink javan");
- out.writeBytes("hall silly cats drink javan");
- out.writeBytes("all dogs bow wown");
- out.writeBytes("hello drink javan");
- out.close();
- local.delete(outDir, true);
- local.mkdirs(outDir);
- out = local.create(jobXml);
- job.writeXml(out);
- out.close();
- System.err.println("About to run: Submitter -conf " + jobXml +
- " -input " + inDir + " -output " + outDir +
- " -program " +
- dfs.getFileSystem().makeQualified(wordExec));
- try {
- Submitter.main(new String[]{"-conf", jobXml.toString(),
- "-input", inDir.toString(),
- "-output", outDir.toString(),
- "-program",
- dfs.getFileSystem().makeQualified(wordExec).toString(),
- "-reduces", "2"});
- } catch (Exception e) {
- assertTrue("got exception: " + StringUtils.stringifyException(e), false);
- }
- }
- }