TestHdfsProxy.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.hdfsproxy;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.net.URI;
- import java.util.Random;
- import junit.framework.TestCase;
- import org.apache.commons.logging.LogFactory;
- import org.apache.commons.logging.impl.Log4JLogger;
- import org.apache.log4j.Level;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hdfs.MiniDFSCluster;
- import org.apache.hadoop.hdfs.server.datanode.DataNode;
- import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
- import org.apache.hadoop.net.NetUtils;
- import org.apache.hadoop.tools.DistCp;
- import org.apache.hadoop.util.ToolRunner;
- /**
- * A JUnit test for HdfsProxy
- */
- public class TestHdfsProxy extends TestCase {
- {
- ((Log4JLogger) LogFactory.getLog("org.apache.hadoop.hdfs.StateChange"))
- .getLogger().setLevel(Level.OFF);
- ((Log4JLogger) DataNode.LOG).getLogger().setLevel(Level.OFF);
- ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
- ((Log4JLogger) DistCp.LOG).getLogger().setLevel(Level.ALL);
- }
- static final URI LOCAL_FS = URI.create("file:///");
- private static final int NFILES = 10;
- private static String TEST_ROOT_DIR = new Path(System.getProperty(
- "test.build.data", "/tmp")).toString().replace(' ', '+');
- /**
- * class MyFile contains enough information to recreate the contents of a
- * single file.
- */
- private static class MyFile {
- private static Random gen = new Random();
- private static final int MAX_LEVELS = 3;
- private static final int MAX_SIZE = 8 * 1024;
- private static String[] dirNames = { "zero", "one", "two", "three", "four",
- "five", "six", "seven", "eight", "nine" };
- private final String name;
- private int size = 0;
- private long seed = 0L;
- MyFile() {
- this(gen.nextInt(MAX_LEVELS));
- }
- MyFile(int nLevels) {
- String xname = "";
- if (nLevels != 0) {
- int[] levels = new int[nLevels];
- for (int idx = 0; idx < nLevels; idx++) {
- levels[idx] = gen.nextInt(10);
- }
- StringBuffer sb = new StringBuffer();
- for (int idx = 0; idx < nLevels; idx++) {
- sb.append(dirNames[levels[idx]]);
- sb.append("/");
- }
- xname = sb.toString();
- }
- long fidx = gen.nextLong() & Long.MAX_VALUE;
- name = xname + Long.toString(fidx);
- reset();
- }
- void reset() {
- final int oldsize = size;
- do {
- size = gen.nextInt(MAX_SIZE);
- } while (oldsize == size);
- final long oldseed = seed;
- do {
- seed = gen.nextLong() & Long.MAX_VALUE;
- } while (oldseed == seed);
- }
- String getName() {
- return name;
- }
- int getSize() {
- return size;
- }
- long getSeed() {
- return seed;
- }
- }
- private static MyFile[] createFiles(URI fsname, String topdir)
- throws IOException {
- return createFiles(FileSystem.get(fsname, new Configuration()), topdir);
- }
- /**
- * create NFILES with random names and directory hierarchies with random (but
- * reproducible) data in them.
- */
- private static MyFile[] createFiles(FileSystem fs, String topdir)
- throws IOException {
- Path root = new Path(topdir);
- MyFile[] files = new MyFile[NFILES];
- for (int i = 0; i < NFILES; i++) {
- files[i] = createFile(root, fs);
- }
- return files;
- }
- private static MyFile createFile(Path root, FileSystem fs, int levels)
- throws IOException {
- MyFile f = levels < 0 ? new MyFile() : new MyFile(levels);
- Path p = new Path(root, f.getName());
- FSDataOutputStream out = fs.create(p);
- byte[] toWrite = new byte[f.getSize()];
- new Random(f.getSeed()).nextBytes(toWrite);
- out.write(toWrite);
- out.close();
- FileSystem.LOG.info("created: " + p + ", size=" + f.getSize());
- return f;
- }
- private static MyFile createFile(Path root, FileSystem fs) throws IOException {
- return createFile(root, fs, -1);
- }
- private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files)
- throws IOException {
- return checkFiles(fs, topdir, files, false);
- }
- private static boolean checkFiles(FileSystem fs, String topdir,
- MyFile[] files, boolean existingOnly) throws IOException {
- Path root = new Path(topdir);
- for (int idx = 0; idx < files.length; idx++) {
- Path fPath = new Path(root, files[idx].getName());
- try {
- fs.getFileStatus(fPath);
- FSDataInputStream in = fs.open(fPath);
- byte[] toRead = new byte[files[idx].getSize()];
- byte[] toCompare = new byte[files[idx].getSize()];
- Random rb = new Random(files[idx].getSeed());
- rb.nextBytes(toCompare);
- assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
- in.close();
- for (int i = 0; i < toRead.length; i++) {
- if (toRead[i] != toCompare[i]) {
- return false;
- }
- }
- toRead = null;
- toCompare = null;
- } catch (FileNotFoundException fnfe) {
- if (!existingOnly) {
- throw fnfe;
- }
- }
- }
- return true;
- }
- /** delete directory and everything underneath it. */
- private static void deldir(FileSystem fs, String topdir) throws IOException {
- fs.delete(new Path(topdir), true);
- }
- /** verify hdfsproxy implements the hftp interface */
- public void testHdfsProxyInterface() throws Exception {
- MiniDFSCluster cluster = null;
- HdfsProxy proxy = null;
- try {
- final Configuration dfsConf = new Configuration();
- cluster = new MiniDFSCluster(dfsConf, 2, true, null);
- cluster.waitActive();
- final DistCp distcp = new DistCp(dfsConf);
- final FileSystem localfs = FileSystem.get(LOCAL_FS, dfsConf);
- final FileSystem hdfs = cluster.getFileSystem();
- final Configuration proxyConf = new Configuration(false);
- proxyConf.set("hdfsproxy.dfs.namenode.address", hdfs.getUri().getHost() + ":"
- + hdfs.getUri().getPort());
- proxyConf.set("hdfsproxy.https.address", "localhost:0");
- final String namenode = hdfs.getUri().toString();
- if (namenode.startsWith("hdfs://")) {
- MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR + "/srcdat");
- ToolRunner.run(distcp, new String[] { "-log", namenode + "/logs",
- "file:///" + TEST_ROOT_DIR + "/srcdat", namenode + "/destdat" });
- assertTrue("Source and destination directories do not match.",
- checkFiles(hdfs, "/destdat", files));
- assertTrue("Log directory does not exist.", hdfs.exists(new Path(
- namenode + "/logs")));
- proxyConf.set("proxy.http.test.listener.addr", "localhost:0");
- proxy = new HdfsProxy(proxyConf);
- proxy.start();
- InetSocketAddress proxyAddr = NetUtils.createSocketAddr("localhost:0");
- final String realProxyAddr = proxyAddr.getHostName() + ":"
- + proxy.getPort();
- ToolRunner.run(distcp, new String[] {
- "hftp://" + realProxyAddr + "/destdat", namenode + "/copied1" });
- assertTrue("Source and copied directories do not match.", checkFiles(
- hdfs, "/copied1", files));
- ToolRunner.run(distcp, new String[] {
- "hftp://" + realProxyAddr + "/destdat",
- "file:///" + TEST_ROOT_DIR + "/copied2" });
- assertTrue("Source and copied directories do not match.", checkFiles(
- localfs, TEST_ROOT_DIR + "/copied2", files));
- deldir(hdfs, "/destdat");
- deldir(hdfs, "/logs");
- deldir(hdfs, "/copied1");
- deldir(localfs, TEST_ROOT_DIR + "/srcdat");
- deldir(localfs, TEST_ROOT_DIR + "/copied2");
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- if (proxy != null) {
- proxy.stop();
- }
- }
- }
- }