TestHdfsProxy.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfsproxy;
  19. import java.io.FileNotFoundException;
  20. import java.io.IOException;
  21. import java.net.InetSocketAddress;
  22. import java.net.URI;
  23. import java.util.Random;
  24. import junit.framework.TestCase;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.commons.logging.impl.Log4JLogger;
  27. import org.apache.log4j.Level;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.FSDataInputStream;
  31. import org.apache.hadoop.fs.FSDataOutputStream;
  32. import org.apache.hadoop.fs.Path;
  33. import org.apache.hadoop.hdfs.MiniDFSCluster;
  34. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  35. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
  36. import org.apache.hadoop.net.NetUtils;
  37. import org.apache.hadoop.tools.DistCp;
  38. import org.apache.hadoop.util.ToolRunner;
  39. /**
  40.  * A JUnit test for HdfsProxy
  41.  */
  42. public class TestHdfsProxy extends TestCase {
  43.   {
  44.     ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.hdfs.StateChange"))
  45.         .getLogger().setLevel(Level.OFF);
  46.     ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.OFF);
  47.     ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
  48.     ((Log4JLogger) DistCp.LOG).getLogger().setLevel(Level.ALL);
  49.   }
  50.   static final URI LOCAL_FS = URI.create("file:///");
  51.   private static final int NFILES = 10;
  52.   private static String TEST_ROOT_DIR = new Path(System.getProperty(
  53.       "test.build.data", "/tmp")).toString().replace(' ', '+');
  54.   /**
  55.    * class MyFile contains enough information to recreate the contents of a
  56.    * single file.
  57.    */
  58.   private static class MyFile {
  59.     private static Random gen = new Random();
  60.     private static final int MAX_LEVELS = 3;
  61.     private static final int MAX_SIZE = 8 * 1024;
  62.     private static String[] dirNames = { "zero", "one", "two", "three", "four",
  63.         "five", "six", "seven", "eight", "nine" };
  64.     private final String name;
  65.     private int size = 0;
  66.     private long seed = 0L;
  67.     MyFile() {
  68.       this(gen.nextInt(MAX_LEVELS));
  69.     }
  70.     MyFile(int nLevels) {
  71.       String xname = "";
  72.       if (nLevels != 0) {
  73.         int[] levels = new int[nLevels];
  74.         for (int idx = 0; idx < nLevels; idx++) {
  75.           levels[idx] = gen.nextInt(10);
  76.         }
  77.         StringBuffer sb = new StringBuffer();
  78.         for (int idx = 0; idx < nLevels; idx++) {
  79.           sb.append(dirNames[levels[idx]]);
  80.           sb.append("/");
  81.         }
  82.         xname = sb.toString();
  83.       }
  84.       long fidx = gen.nextLong() & Long.MAX_VALUE;
  85.       name = xname + Long.toString(fidx);
  86.       reset();
  87.     }
  88.     void reset() {
  89.       final int oldsize = size;
  90.       do {
  91.         size = gen.nextInt(MAX_SIZE);
  92.       } while (oldsize == size);
  93.       final long oldseed = seed;
  94.       do {
  95.         seed = gen.nextLong() & Long.MAX_VALUE;
  96.       } while (oldseed == seed);
  97.     }
  98.     String getName() {
  99.       return name;
  100.     }
  101.     int getSize() {
  102.       return size;
  103.     }
  104.     long getSeed() {
  105.       return seed;
  106.     }
  107.   }
  108.   private static MyFile[] createFiles(URI fsname, String topdir)
  109.       throws IOException {
  110.     return createFiles(FileSystem.get(fsname, new Configuration()), topdir);
  111.   }
  112.   /**
  113.    * create NFILES with random names and directory hierarchies with random (but
  114.    * reproducible) data in them.
  115.    */
  116.   private static MyFile[] createFiles(FileSystem fs, String topdir)
  117.       throws IOException {
  118.     Path root = new Path(topdir);
  119.     MyFile[] files = new MyFile[NFILES];
  120.     for (int i = 0; i < NFILES; i++) {
  121.       files[i] = createFile(root, fs);
  122.     }
  123.     return files;
  124.   }
  125.   private static MyFile createFile(Path root, FileSystem fs, int levels)
  126.       throws IOException {
  127.     MyFile f = levels < 0 ? new MyFile() : new MyFile(levels);
  128.     Path p = new Path(root, f.getName());
  129.     FSDataOutputStream out = fs.create(p);
  130.     byte[] toWrite = new byte[f.getSize()];
  131.     new Random(f.getSeed()).nextBytes(toWrite);
  132.     out.write(toWrite);
  133.     out.close();
  134.     FileSystem.LOG.info("created: " + p + ", size=" + f.getSize());
  135.     return f;
  136.   }
  137.   private static MyFile createFile(Path root, FileSystem fs) throws IOException {
  138.     return createFile(root, fs, -1);
  139.   }
  140.   private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files)
  141.       throws IOException {
  142.     return checkFiles(fs, topdir, files, false);
  143.   }
  144.   private static boolean checkFiles(FileSystem fs, String topdir,
  145.       MyFile[] files, boolean existingOnly) throws IOException {
  146.     Path root = new Path(topdir);
  147.     for (int idx = 0; idx < files.length; idx++) {
  148.       Path fPath = new Path(root, files[idx].getName());
  149.       try {
  150.         fs.getFileStatus(fPath);
  151.         FSDataInputStream in = fs.open(fPath);
  152.         byte[] toRead = new byte[files[idx].getSize()];
  153.         byte[] toCompare = new byte[files[idx].getSize()];
  154.         Random rb = new Random(files[idx].getSeed());
  155.         rb.nextBytes(toCompare);
  156.         assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
  157.         in.close();
  158.         for (int i = 0; i < toRead.length; i++) {
  159.           if (toRead[i] != toCompare[i]) {
  160.             return false;
  161.           }
  162.         }
  163.         toRead = null;
  164.         toCompare = null;
  165.       } catch (FileNotFoundException fnfe) {
  166.         if (!existingOnly) {
  167.           throw fnfe;
  168.         }
  169.       }
  170.     }
  171.     return true;
  172.   }
  173.   /** delete directory and everything underneath it. */
  174.   private static void deldir(FileSystem fs, String topdir) throws IOException {
  175.     fs.delete(new Path(topdir), true);
  176.   }
  177.   /** verify hdfsproxy implements the hftp interface */
  178.   public void testHdfsProxyInterface() throws Exception {
  179.     MiniDFSCluster cluster = null;
  180.     HdfsProxy proxy = null;
  181.     try {
  182.       final Configuration dfsConf = new Configuration();
  183.       cluster = new MiniDFSCluster(dfsConf, 2, true, null);
  184.       cluster.waitActive();
  185.       final DistCp distcp = new DistCp(dfsConf);
  186.       final FileSystem localfs = FileSystem.get(LOCAL_FS, dfsConf);
  187.       final FileSystem hdfs = cluster.getFileSystem();
  188.       final Configuration proxyConf = new Configuration(false);
  189.       proxyConf.set("hdfsproxy.dfs.namenode.address", hdfs.getUri().getHost() + ":"
  190.           + hdfs.getUri().getPort());
  191.       proxyConf.set("hdfsproxy.https.address", "localhost:0");
  192.       final String namenode = hdfs.getUri().toString();
  193.       if (namenode.startsWith("hdfs://")) {
  194.         MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR + "/srcdat");
  195.         ToolRunner.run(distcp, new String[] { "-log", namenode + "/logs",
  196.             "file:///" + TEST_ROOT_DIR + "/srcdat", namenode + "/destdat" });
  197.         assertTrue("Source and destination directories do not match.",
  198.             checkFiles(hdfs, "/destdat", files));
  199.         assertTrue("Log directory does not exist.", hdfs.exists(new Path(
  200.             namenode + "/logs")));
  201.         proxyConf.set("proxy.http.test.listener.addr", "localhost:0");
  202.         proxy = new HdfsProxy(proxyConf);
  203.         proxy.start();
  204.         InetSocketAddress proxyAddr = NetUtils.createSocketAddr("localhost:0");
  205.         final String realProxyAddr = proxyAddr.getHostName() + ":"
  206.             + proxy.getPort();
  207.         ToolRunner.run(distcp, new String[] {
  208.             "hftp://" + realProxyAddr + "/destdat", namenode + "/copied1" });
  209.         assertTrue("Source and copied directories do not match.", checkFiles(
  210.             hdfs, "/copied1", files));
  211.         ToolRunner.run(distcp, new String[] {
  212.             "hftp://" + realProxyAddr + "/destdat",
  213.             "file:///" + TEST_ROOT_DIR + "/copied2" });
  214.         assertTrue("Source and copied directories do not match.", checkFiles(
  215.             localfs, TEST_ROOT_DIR + "/copied2", files));
  216.         deldir(hdfs, "/destdat");
  217.         deldir(hdfs, "/logs");
  218.         deldir(hdfs, "/copied1");
  219.         deldir(localfs, TEST_ROOT_DIR + "/srcdat");
  220.         deldir(localfs, TEST_ROOT_DIR + "/copied2");
  221.       }
  222.     } finally {
  223.       if (cluster != null) {
  224.         cluster.shutdown();
  225.       }
  226.       if (proxy != null) {
  227.         proxy.stop();
  228.       }
  229.     }
  230.   }
  231. }