TestStreamedMerge.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.io.DataOutputStream;
  20. import java.io.File;
  21. import java.io.IOException;
  22. import java.io.InputStream;
  23. import java.io.OutputStream;
  24. import java.net.ServerSocket;
  25. import java.net.Socket;
  26. import java.util.ArrayList;
  27. import java.util.Arrays;
  28. import junit.framework.TestCase;
  29. import org.apache.hadoop.conf.Configuration;
  30. import org.apache.hadoop.hdfs.MiniDFSCluster;
  31. import org.apache.hadoop.fs.FileSystem;
  32. import org.apache.hadoop.fs.FsShell;
  33. import org.apache.hadoop.fs.Path;
  34. import org.apache.hadoop.io.Text;
  35. import org.apache.hadoop.util.LineReader;
  36. import org.apache.hadoop.util.ToolRunner;
  37. /**
  38.  * This JUnit test is not pure-Java and is not run as 
  39.  * part of the standard ant test* targets.   
  40.  * Two ways to run this:<pre>
  41.  * 1. main(), a Java application.
  42.  * 2. cd src/contrib/streaming/ 
  43.  *    ant 
  44.  *     [-Dfs.default.name=h:p]  
  45.  *     [-Dhadoop.test.localoutputfile=/tmp/fifo]  
  46.  *     test-unix 
  47.  * </pre>
  48.  */
  49. public class TestStreamedMerge extends TestCase {
  50.   public TestStreamedMerge() throws IOException {
  51.     UtilTest utilTest = new UtilTest(getClass().getName());
  52.     utilTest.checkUserDir();
  53.     //  utilTest.redirectIfAntJunit();
  54.   }
  55.   final static int NAME_PORT = 8200;
  56.   final static int SOC_PORT = 1888;
  57.   void addInput(String path, String contents) throws IOException {
  58.     OutputStream out = fs_.create(new Path(path));
  59.     DataOutputStream dout = new DataOutputStream(out);
  60.     dout.write(contents.getBytes("UTF-8"));
  61.     dout.close();
  62.     System.err.println("addInput done: " + path);
  63.   }
  64.   String createInputs(boolean tag) throws IOException {
  65.     fs_.delete(new Path("/input/"));
  66.     // i18n() replaces some ASCII with multibyte UTF-8 chars
  67.     addInput("/input/part-00", i18n("k1tv1n" + "k3tv5n"));
  68.     addInput("/input/part-01", i18n("k1tv2n" + "k2tv4n"));
  69.     addInput("/input/part-02", i18n("k1tv3n"));
  70.     addInput("/input/part-03", "");
  71.     
  72.     // tags are one-based: ">1" corresponds to part-00, etc.
  73.     // Expected result it the merge-sort order of the records.
  74.     // keys are compared as Strings and ties are broken by stream index
  75.     // For example (k1; stream 2) < (k1; stream 3)
  76.     String expect = i18n(
  77.                          unt(">1tk1tv1n", tag) + 
  78.                          unt(">2tk1tv2n", tag) + 
  79.                          unt(">3tk1tv3n", tag) + 
  80.                          unt(">2tk2tv4n", tag) +
  81.                          unt(">1tk3tv5n", tag)
  82.                          );
  83.     return expect;
  84.   }
  85.   
  86.   String unt(String line, boolean keepTag)
  87.   {
  88.     return keepTag ? line : line.substring(line.indexOf('t')+1);
  89.   }
  90.   String i18n(String c) {
  91.     c = c.replace('k', 'u20ac'); // Euro sign, in UTF-8: E282AC
  92.     c = c.replace('v', 'u00a2'); // Cent sign, in UTF-8: C2A2 ; UTF-16 contains null
  93.     // "ud800udc00" // A surrogate pair, U+10000. OK also works
  94.     return c;
  95.   }
  96.   void lsr() {
  97.     try {
  98.       System.out.println("lsr /");
  99.       ToolRunner.run(conf_, new FsShell(), new String[]{ "-lsr", "/" });
  100.     } catch (Exception e) {
  101.       e.printStackTrace();
  102.     }
  103.   }
  104.   void printSampleInput() {
  105.     try {
  106.       System.out.println("cat /input/part-00");
  107.       String content = StreamUtil.slurpHadoop(new Path("/input/part-00"), fs_);
  108.       System.out.println(content);
  109.       System.out.println("cat done.");
  110.     } catch (Exception e) {
  111.       e.printStackTrace();
  112.     }
  113.   }
  114.   void callStreaming(String argSideOutput, boolean inputTagged) throws IOException {
  115.     String[] testargs = new String[] {
  116.       //"-jobconf", "stream.debug=1",
  117.       "-verbose", 
  118.       "-jobconf", "stream.testmerge=1", 
  119.       "-input", "+/input/part-00 | /input/part-01 | /input/part-02", 
  120.       "-mapper", StreamUtil.localizeBin("/bin/cat"), 
  121.       "-reducer", "NONE", 
  122.       "-output", "/my.output",
  123.       "-mapsideoutput", argSideOutput, 
  124.       "-dfs", conf_.get("fs.default.name"), 
  125.       "-jt", "local",
  126.       "-jobconf", "stream.sideoutput.localfs=true", 
  127.       "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
  128.     };
  129.     ArrayList argList = new ArrayList();
  130.     argList.addAll(Arrays.asList(testargs));
  131.     if (inputTagged) {
  132.       argList.add("-inputtagged");
  133.     }
  134.     testargs = (String[])argList.toArray(new String[0]);
  135.     String sss = StreamUtil.collate(argList, " ");
  136.     System.out.println("bin/hadoop jar build/hadoop-streaming.jar " + sss);
  137.     //HadoopStreaming.main(testargs);
  138.     StreamJob job = new StreamJob(testargs, false);
  139.     job.go();
  140.   }
  141.   SideEffectConsumer startSideEffectConsumer(StringBuffer outBuf) {
  142.     SideEffectConsumer t = new SideEffectConsumer(outBuf) {
  143.         ServerSocket listen;
  144.         Socket client;
  145.         InputStream in;
  146.       
  147.         @Override
  148.         InputStream connectInputStream() throws IOException {
  149.           listen = new ServerSocket(SOC_PORT);
  150.           client = listen.accept();
  151.           in = client.getInputStream();
  152.           return in;
  153.         }
  154.       
  155.         @Override
  156.         void close() throws IOException
  157.         {
  158.           listen.close();
  159.           System.out.println("@@@listen closed");
  160.         }
  161.       };
  162.     t.start();
  163.     return t;
  164.   }
  165.   abstract class SideEffectConsumer extends Thread {
  166.     SideEffectConsumer(StringBuffer buf) {
  167.       buf_ = buf;
  168.       setDaemon(true);
  169.     }
  170.     abstract InputStream connectInputStream() throws IOException;
  171.     
  172.     abstract void close() throws IOException;
  173.     
  174.     @Override
  175.     public void run() {
  176.       try {
  177.         in_ = connectInputStream();
  178.         LineReader lineReader = new LineReader((InputStream)in_, conf_);
  179.         Text line = new Text();
  180.         while (lineReader.readLine(line) > 0) {
  181.           buf_.append(line.toString());
  182.           buf_.append('n');
  183.           line.clear();
  184.         }
  185.         lineReader.close();
  186.         in_.close();
  187.       } catch (IOException io) {
  188.         throw new RuntimeException(io);
  189.       }
  190.     }
  191.     
  192.     InputStream in_;
  193.     StringBuffer buf_;
  194.   }
  195.   public void testMain() throws IOException {
  196.     boolean success = false;
  197.     String base = new File(".").getAbsolutePath();
  198.     System.setProperty("hadoop.log.dir", base + "/logs");
  199.     conf_ = new Configuration();
  200.     String overrideFS = StreamUtil.getBoundAntProperty("fs.default.name", null);
  201.     MiniDFSCluster cluster = null;
  202.     try {
  203.       if (overrideFS == null) {
  204.         cluster = new MiniDFSCluster(conf_, 1, true, null);
  205.         fs_ = cluster.getFileSystem();
  206.       } else {
  207.         System.out.println("overrideFS: " + overrideFS);
  208.         FileSystem.setDefaultUri(conf_, overrideFS);
  209.         fs_ = FileSystem.get(conf_);
  210.       }
  211.       doAllTestJobs();
  212.       success = true;
  213.     } catch (IOException io) {
  214.       io.printStackTrace();
  215.     } finally {
  216.       try {
  217.         fs_.close();
  218.       } catch (IOException io) {
  219.       }
  220.       if (cluster != null) {
  221.         cluster.shutdown();
  222.         System.out.println("cluster.shutdown(); DONE");
  223.       }
  224.       System.out.println(getClass().getName() + ": success=" + success);
  225.     }
  226.   }
  227.   void doAllTestJobs() throws IOException
  228.   {
  229.     goSocketTagged(true, false);
  230.     goSocketTagged(false, false);
  231.     goSocketTagged(true, true);
  232.   }
  233.   
  234.   void goSocketTagged(boolean socket, boolean inputTagged) throws IOException {
  235.     System.out.println("***** goSocketTagged: " + socket + ", " + inputTagged);
  236.     String expect = createInputs(inputTagged);
  237.     lsr();
  238.     printSampleInput();
  239.     StringBuffer outputBuf = new StringBuffer();
  240.     String sideOutput = null;
  241.     File f = null;
  242.     SideEffectConsumer consumer = null;
  243.     if (socket) {
  244.       consumer = startSideEffectConsumer(outputBuf);
  245.       sideOutput = "socket://localhost:" + SOC_PORT + "/";
  246.     } else {
  247.       String userOut = StreamUtil.getBoundAntProperty(
  248.                                                       "hadoop.test.localoutputfile", null);
  249.       if (userOut != null) {
  250.         f = new File(userOut);
  251.         // don't delete so they can mkfifo
  252.         maybeFifoOutput_ = true;
  253.       } else {
  254.         f = new File("localoutputfile");
  255.         f.delete();
  256.         maybeFifoOutput_ = false;
  257.       }
  258.       String s = new Path(f.getAbsolutePath()).toString();
  259.       if (!s.startsWith("/")) {
  260.         s = "/" + s; // Windows "file:/C:/"
  261.       }
  262.       sideOutput = "file:" + s;
  263.     }
  264.     System.out.println("sideOutput=" + sideOutput);
  265.     callStreaming(sideOutput, inputTagged);
  266.     String output;
  267.     if (socket) {
  268.       try {
  269.         consumer.join();
  270.         consumer.close();
  271.       } catch (InterruptedException e) {
  272.         throw (IOException) new IOException().initCause(e);
  273.       }
  274.       output = outputBuf.toString();
  275.     } else {
  276.       if (maybeFifoOutput_) {
  277.         System.out.println("assertEquals will fail.");
  278.         output = "potential FIFO: not retrieving to avoid blocking on open() "
  279.           + f.getAbsoluteFile();
  280.       } else {
  281.         output = StreamUtil.slurp(f.getAbsoluteFile());
  282.       }
  283.     }
  284.     lsr();
  285.     
  286.     System.out.println("output=|" + output + "|");
  287.     System.out.println("expect=|" + expect + "|");
  288.     assertEquals(expect, output);
  289.   }
  290.   Configuration conf_;
  291.   FileSystem fs_;
  292.   boolean maybeFifoOutput_;
  293.   public static void main(String[] args) throws Throwable {
  294.     TestStreamedMerge test = new TestStreamedMerge();
  295.     test.testMain();
  296.   }
  297.   
  298. }