TestCopyFiles.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:33k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.ByteArrayOutputStream;
  20. import java.io.FileNotFoundException;
  21. import java.io.IOException;
  22. import java.io.PrintStream;
  23. import java.net.URI;
  24. import java.util.ArrayList;
  25. import java.util.List;
  26. import java.util.Random;
  27. import java.util.StringTokenizer;
  28. import junit.framework.TestCase;
  29. import org.apache.commons.logging.LogFactory;
  30. import org.apache.commons.logging.impl.Log4JLogger;
  31. import org.apache.hadoop.conf.Configuration;
  32. import org.apache.hadoop.fs.permission.FsPermission;
  33. import org.apache.hadoop.hdfs.MiniDFSCluster;
  34. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  35. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
  36. import org.apache.hadoop.mapred.MiniMRCluster;
  37. import org.apache.hadoop.security.UnixUserGroupInformation;
  38. import org.apache.hadoop.security.UserGroupInformation;
  39. import org.apache.hadoop.tools.DistCp;
  40. import org.apache.hadoop.util.ToolRunner;
  41. import org.apache.log4j.Level;
  42. /**
  43.  * A JUnit test for copying files recursively.
  44.  */
  45. public class TestCopyFiles extends TestCase {
  46.   {
  47.     ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hdfs.StateChange")
  48.         ).getLogger().setLevel(Level.OFF);
  49.     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
  50.     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
  51.     ((Log4JLogger)DistCp.LOG).getLogger().setLevel(Level.ALL);
  52.   }
  53.   
  54.   static final URI LOCAL_FS = URI.create("file:///");
  55.   
  56.   private static final Random RAN = new Random();
  57.   private static final int NFILES = 20;
  58.   private static String TEST_ROOT_DIR =
  59.     new Path(System.getProperty("test.build.data","/tmp"))
  60.     .toString().replace(' ', '+');
  61.   /** class MyFile contains enough information to recreate the contents of
  62.    * a single file.
  63.    */
  64.   private static class MyFile {
  65.     private static Random gen = new Random();
  66.     private static final int MAX_LEVELS = 3;
  67.     private static final int MAX_SIZE = 8*1024;
  68.     private static String[] dirNames = {
  69.       "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"
  70.     };
  71.     private final String name;
  72.     private int size = 0;
  73.     private long seed = 0L;
  74.     MyFile() {
  75.       this(gen.nextInt(MAX_LEVELS));
  76.     }
  77.     MyFile(int nLevels) {
  78.       String xname = "";
  79.       if (nLevels != 0) {
  80.         int[] levels = new int[nLevels];
  81.         for (int idx = 0; idx < nLevels; idx++) {
  82.           levels[idx] = gen.nextInt(10);
  83.         }
  84.         StringBuffer sb = new StringBuffer();
  85.         for (int idx = 0; idx < nLevels; idx++) {
  86.           sb.append(dirNames[levels[idx]]);
  87.           sb.append("/");
  88.         }
  89.         xname = sb.toString();
  90.       }
  91.       long fidx = gen.nextLong() & Long.MAX_VALUE;
  92.       name = xname + Long.toString(fidx);
  93.       reset();
  94.     }
  95.     void reset() {
  96.       final int oldsize = size;
  97.       do { size = gen.nextInt(MAX_SIZE); } while (oldsize == size);
  98.       final long oldseed = seed;
  99.       do { seed = gen.nextLong() & Long.MAX_VALUE; } while (oldseed == seed);
  100.     }
  101.     String getName() { return name; }
  102.     int getSize() { return size; }
  103.     long getSeed() { return seed; }
  104.   }
  105.   private static MyFile[] createFiles(URI fsname, String topdir)
  106.     throws IOException {
  107.     return createFiles(FileSystem.get(fsname, new Configuration()), topdir);
  108.   }
  109.   /** create NFILES with random names and directory hierarchies
  110.    * with random (but reproducible) data in them.
  111.    */
  112.   private static MyFile[] createFiles(FileSystem fs, String topdir)
  113.     throws IOException {
  114.     Path root = new Path(topdir);
  115.     MyFile[] files = new MyFile[NFILES];
  116.     for (int i = 0; i < NFILES; i++) {
  117.       files[i] = createFile(root, fs);
  118.     }
  119.     return files;
  120.   }
  121.   static MyFile createFile(Path root, FileSystem fs, int levels)
  122.       throws IOException {
  123.     MyFile f = levels < 0 ? new MyFile() : new MyFile(levels);
  124.     Path p = new Path(root, f.getName());
  125.     FSDataOutputStream out = fs.create(p);
  126.     byte[] toWrite = new byte[f.getSize()];
  127.     new Random(f.getSeed()).nextBytes(toWrite);
  128.     out.write(toWrite);
  129.     out.close();
  130.     FileSystem.LOG.info("created: " + p + ", size=" + f.getSize());
  131.     return f;
  132.   }
  133.   static MyFile createFile(Path root, FileSystem fs) throws IOException {
  134.     return createFile(root, fs, -1);
  135.   }
  136.   private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files
  137.       ) throws IOException {
  138.     return checkFiles(fs, topdir, files, false);    
  139.   }
  140.   private static boolean checkFiles(FileSystem fs, String topdir, MyFile[] files,
  141.       boolean existingOnly) throws IOException {
  142.     Path root = new Path(topdir);
  143.     
  144.     for (int idx = 0; idx < files.length; idx++) {
  145.       Path fPath = new Path(root, files[idx].getName());
  146.       try {
  147.         fs.getFileStatus(fPath);
  148.         FSDataInputStream in = fs.open(fPath);
  149.         byte[] toRead = new byte[files[idx].getSize()];
  150.         byte[] toCompare = new byte[files[idx].getSize()];
  151.         Random rb = new Random(files[idx].getSeed());
  152.         rb.nextBytes(toCompare);
  153.         assertEquals("Cannnot read file.", toRead.length, in.read(toRead));
  154.         in.close();
  155.         for (int i = 0; i < toRead.length; i++) {
  156.           if (toRead[i] != toCompare[i]) {
  157.             return false;
  158.           }
  159.         }
  160.         toRead = null;
  161.         toCompare = null;
  162.       }
  163.       catch(FileNotFoundException fnfe) {
  164.         if (!existingOnly) {
  165.           throw fnfe;
  166.         }
  167.       }
  168.     }
  169.     
  170.     return true;
  171.   }
  172.   private static void updateFiles(FileSystem fs, String topdir, MyFile[] files,
  173.         int nupdate) throws IOException {
  174.     assert nupdate <= NFILES;
  175.     Path root = new Path(topdir);
  176.     for (int idx = 0; idx < nupdate; ++idx) {
  177.       Path fPath = new Path(root, files[idx].getName());
  178.       // overwrite file
  179.       assertTrue(fPath.toString() + " does not exist", fs.exists(fPath));
  180.       FSDataOutputStream out = fs.create(fPath);
  181.       files[idx].reset();
  182.       byte[] toWrite = new byte[files[idx].getSize()];
  183.       Random rb = new Random(files[idx].getSeed());
  184.       rb.nextBytes(toWrite);
  185.       out.write(toWrite);
  186.       out.close();
  187.     }
  188.   }
  189.   private static FileStatus[] getFileStatus(FileSystem fs,
  190.       String topdir, MyFile[] files) throws IOException {
  191.     return getFileStatus(fs, topdir, files, false);
  192.   }
  193.   private static FileStatus[] getFileStatus(FileSystem fs,
  194.       String topdir, MyFile[] files, boolean existingOnly) throws IOException {
  195.     Path root = new Path(topdir);
  196.     List<FileStatus> statuses = new ArrayList<FileStatus>();
  197.     for (int idx = 0; idx < NFILES; ++idx) {
  198.       try {
  199.         statuses.add(fs.getFileStatus(new Path(root, files[idx].getName())));
  200.       } catch(FileNotFoundException fnfe) {
  201.         if (!existingOnly) {
  202.           throw fnfe;
  203.         }
  204.       }
  205.     }
  206.     return statuses.toArray(new FileStatus[statuses.size()]);
  207.   }
  208.   private static boolean checkUpdate(FileSystem fs, FileStatus[] old,
  209.       String topdir, MyFile[] upd, final int nupdate) throws IOException {
  210.     Path root = new Path(topdir);
  211.     // overwrote updated files
  212.     for (int idx = 0; idx < nupdate; ++idx) {
  213.       final FileStatus stat =
  214.         fs.getFileStatus(new Path(root, upd[idx].getName()));
  215.       if (stat.getModificationTime() <= old[idx].getModificationTime()) {
  216.         return false;
  217.       }
  218.     }
  219.     // did not overwrite files not updated
  220.     for (int idx = nupdate; idx < NFILES; ++idx) {
  221.       final FileStatus stat =
  222.         fs.getFileStatus(new Path(root, upd[idx].getName()));
  223.       if (stat.getModificationTime() != old[idx].getModificationTime()) {
  224.         return false;
  225.       }
  226.     }
  227.     return true;
  228.   }
  229.   /** delete directory and everything underneath it.*/
  230.   private static void deldir(FileSystem fs, String topdir) throws IOException {
  231.     fs.delete(new Path(topdir), true);
  232.   }
  233.   
  234.   /** copy files from local file system to local file system */
  235.   public void testCopyFromLocalToLocal() throws Exception {
  236.     Configuration conf = new Configuration();
  237.     FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
  238.     MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
  239.     ToolRunner.run(new DistCp(new Configuration()),
  240.                            new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
  241.                                          "file:///"+TEST_ROOT_DIR+"/destdat"});
  242.     assertTrue("Source and destination directories do not match.",
  243.                checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files));
  244.     deldir(localfs, TEST_ROOT_DIR+"/destdat");
  245.     deldir(localfs, TEST_ROOT_DIR+"/srcdat");
  246.   }
  247.   
  248.   /** copy files from dfs file system to dfs file system */
  249.   public void testCopyFromDfsToDfs() throws Exception {
  250.     String namenode = null;
  251.     MiniDFSCluster cluster = null;
  252.     try {
  253.       Configuration conf = new Configuration();
  254.       cluster = new MiniDFSCluster(conf, 2, true, null);
  255.       final FileSystem hdfs = cluster.getFileSystem();
  256.       namenode = FileSystem.getDefaultUri(conf).toString();
  257.       if (namenode.startsWith("hdfs://")) {
  258.         MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
  259.         ToolRunner.run(new DistCp(conf), new String[] {
  260.                                          "-log",
  261.                                          namenode+"/logs",
  262.                                          namenode+"/srcdat",
  263.                                          namenode+"/destdat"});
  264.         assertTrue("Source and destination directories do not match.",
  265.                    checkFiles(hdfs, "/destdat", files));
  266.         FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf);
  267.         assertTrue("Log directory does not exist.",
  268.                    fs.exists(new Path(namenode+"/logs")));
  269.         deldir(hdfs, "/destdat");
  270.         deldir(hdfs, "/srcdat");
  271.         deldir(hdfs, "/logs");
  272.       }
  273.     } finally {
  274.       if (cluster != null) { cluster.shutdown(); }
  275.     }
  276.   }
  277.   
  278.   /** copy files from local file system to dfs file system */
  279.   public void testCopyFromLocalToDfs() throws Exception {
  280.     MiniDFSCluster cluster = null;
  281.     try {
  282.       Configuration conf = new Configuration();
  283.       cluster = new MiniDFSCluster(conf, 1, true, null);
  284.       final FileSystem hdfs = cluster.getFileSystem();
  285.       final String namenode = hdfs.getUri().toString();
  286.       if (namenode.startsWith("hdfs://")) {
  287.         MyFile[] files = createFiles(LOCAL_FS, TEST_ROOT_DIR+"/srcdat");
  288.         ToolRunner.run(new DistCp(conf), new String[] {
  289.                                          "-log",
  290.                                          namenode+"/logs",
  291.                                          "file:///"+TEST_ROOT_DIR+"/srcdat",
  292.                                          namenode+"/destdat"});
  293.         assertTrue("Source and destination directories do not match.",
  294.                    checkFiles(cluster.getFileSystem(), "/destdat", files));
  295.         assertTrue("Log directory does not exist.",
  296.                     hdfs.exists(new Path(namenode+"/logs")));
  297.         deldir(hdfs, "/destdat");
  298.         deldir(hdfs, "/logs");
  299.         deldir(FileSystem.get(LOCAL_FS, conf), TEST_ROOT_DIR+"/srcdat");
  300.       }
  301.     } finally {
  302.       if (cluster != null) { cluster.shutdown(); }
  303.     }
  304.   }
  305.   /** copy files from dfs file system to local file system */
  306.   public void testCopyFromDfsToLocal() throws Exception {
  307.     MiniDFSCluster cluster = null;
  308.     try {
  309.       Configuration conf = new Configuration();
  310.       final FileSystem localfs = FileSystem.get(LOCAL_FS, conf);
  311.       cluster = new MiniDFSCluster(conf, 1, true, null);
  312.       final FileSystem hdfs = cluster.getFileSystem();
  313.       final String namenode = FileSystem.getDefaultUri(conf).toString();
  314.       if (namenode.startsWith("hdfs://")) {
  315.         MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
  316.         ToolRunner.run(new DistCp(conf), new String[] {
  317.                                          "-log",
  318.                                          "/logs",
  319.                                          namenode+"/srcdat",
  320.                                          "file:///"+TEST_ROOT_DIR+"/destdat"});
  321.         assertTrue("Source and destination directories do not match.",
  322.                    checkFiles(localfs, TEST_ROOT_DIR+"/destdat", files));
  323.         assertTrue("Log directory does not exist.",
  324.                     hdfs.exists(new Path("/logs")));
  325.         deldir(localfs, TEST_ROOT_DIR+"/destdat");
  326.         deldir(hdfs, "/logs");
  327.         deldir(hdfs, "/srcdat");
  328.       }
  329.     } finally {
  330.       if (cluster != null) { cluster.shutdown(); }
  331.     }
  332.   }
  333.   public void testCopyDfsToDfsUpdateOverwrite() throws Exception {
  334.     MiniDFSCluster cluster = null;
  335.     try {
  336.       Configuration conf = new Configuration();
  337.       cluster = new MiniDFSCluster(conf, 2, true, null);
  338.       final FileSystem hdfs = cluster.getFileSystem();
  339.       final String namenode = hdfs.getUri().toString();
  340.       if (namenode.startsWith("hdfs://")) {
  341.         MyFile[] files = createFiles(URI.create(namenode), "/srcdat");
  342.         ToolRunner.run(new DistCp(conf), new String[] {
  343.                                          "-p",
  344.                                          "-log",
  345.                                          namenode+"/logs",
  346.                                          namenode+"/srcdat",
  347.                                          namenode+"/destdat"});
  348.         assertTrue("Source and destination directories do not match.",
  349.                    checkFiles(hdfs, "/destdat", files));
  350.         FileSystem fs = FileSystem.get(URI.create(namenode+"/logs"), conf);
  351.         assertTrue("Log directory does not exist.",
  352.                     fs.exists(new Path(namenode+"/logs")));
  353.         FileStatus[] dchkpoint = getFileStatus(hdfs, "/destdat", files);
  354.         final int nupdate = NFILES>>2;
  355.         updateFiles(cluster.getFileSystem(), "/srcdat", files, nupdate);
  356.         deldir(hdfs, "/logs");
  357.         ToolRunner.run(new DistCp(conf), new String[] {
  358.                                          "-p",
  359.                                          "-update",
  360.                                          "-log",
  361.                                          namenode+"/logs",
  362.                                          namenode+"/srcdat",
  363.                                          namenode+"/destdat"});
  364.         assertTrue("Source and destination directories do not match.",
  365.                    checkFiles(hdfs, "/destdat", files));
  366.         assertTrue("Update failed to replicate all changes in src",
  367.                  checkUpdate(hdfs, dchkpoint, "/destdat", files, nupdate));
  368.         deldir(hdfs, "/logs");
  369.         ToolRunner.run(new DistCp(conf), new String[] {
  370.                                          "-p",
  371.                                          "-overwrite",
  372.                                          "-log",
  373.                                          namenode+"/logs",
  374.                                          namenode+"/srcdat",
  375.                                          namenode+"/destdat"});
  376.         assertTrue("Source and destination directories do not match.",
  377.                    checkFiles(hdfs, "/destdat", files));
  378.         assertTrue("-overwrite didn't.",
  379.                  checkUpdate(hdfs, dchkpoint, "/destdat", files, NFILES));
  380.         deldir(hdfs, "/destdat");
  381.         deldir(hdfs, "/srcdat");
  382.         deldir(hdfs, "/logs");
  383.       }
  384.     } finally {
  385.       if (cluster != null) { cluster.shutdown(); }
  386.     }
  387.   }
  388.   public void testCopyDuplication() throws Exception {
  389.     final FileSystem localfs = FileSystem.get(LOCAL_FS, new Configuration());
  390.     try {    
  391.       MyFile[] files = createFiles(localfs, TEST_ROOT_DIR+"/srcdat");
  392.       ToolRunner.run(new DistCp(new Configuration()),
  393.           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
  394.                         "file:///"+TEST_ROOT_DIR+"/src2/srcdat"});
  395.       assertTrue("Source and destination directories do not match.",
  396.                  checkFiles(localfs, TEST_ROOT_DIR+"/src2/srcdat", files));
  397.   
  398.       assertEquals(DistCp.DuplicationException.ERROR_CODE,
  399.           ToolRunner.run(new DistCp(new Configuration()),
  400.           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
  401.                         "file:///"+TEST_ROOT_DIR+"/src2/srcdat",
  402.                         "file:///"+TEST_ROOT_DIR+"/destdat",}));
  403.     }
  404.     finally {
  405.       deldir(localfs, TEST_ROOT_DIR+"/destdat");
  406.       deldir(localfs, TEST_ROOT_DIR+"/srcdat");
  407.       deldir(localfs, TEST_ROOT_DIR+"/src2");
  408.     }
  409.   }
  410.   public void testCopySingleFile() throws Exception {
  411.     FileSystem fs = FileSystem.get(LOCAL_FS, new Configuration());
  412.     Path root = new Path(TEST_ROOT_DIR+"/srcdat");
  413.     try {    
  414.       MyFile[] files = {createFile(root, fs)};
  415.       //copy a dir with a single file
  416.       ToolRunner.run(new DistCp(new Configuration()),
  417.           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat",
  418.                         "file:///"+TEST_ROOT_DIR+"/destdat"});
  419.       assertTrue("Source and destination directories do not match.",
  420.                  checkFiles(fs, TEST_ROOT_DIR+"/destdat", files));
  421.       
  422.       //copy a single file
  423.       String fname = files[0].getName();
  424.       Path p = new Path(root, fname);
  425.       FileSystem.LOG.info("fname=" + fname + ", exists? " + fs.exists(p));
  426.       ToolRunner.run(new DistCp(new Configuration()),
  427.           new String[] {"file:///"+TEST_ROOT_DIR+"/srcdat/"+fname,
  428.                         "file:///"+TEST_ROOT_DIR+"/dest2/"+fname});
  429.       assertTrue("Source and destination directories do not match.",
  430.           checkFiles(fs, TEST_ROOT_DIR+"/dest2", files));     
  431.       //copy single file to existing dir
  432.       deldir(fs, TEST_ROOT_DIR+"/dest2");
  433.       fs.mkdirs(new Path(TEST_ROOT_DIR+"/dest2"));
  434.       MyFile[] files2 = {createFile(root, fs, 0)};
  435.       String sname = files2[0].getName();
  436.       ToolRunner.run(new DistCp(new Configuration()),
  437.           new String[] {"-update",
  438.                         "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
  439.                         "file:///"+TEST_ROOT_DIR+"/dest2/"});
  440.       assertTrue("Source and destination directories do not match.",
  441.           checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));     
  442.       updateFiles(fs, TEST_ROOT_DIR+"/srcdat", files2, 1);
  443.       //copy single file to existing dir w/ dst name conflict
  444.       ToolRunner.run(new DistCp(new Configuration()),
  445.           new String[] {"-update",
  446.                         "file:///"+TEST_ROOT_DIR+"/srcdat/"+sname,
  447.                         "file:///"+TEST_ROOT_DIR+"/dest2/"});
  448.       assertTrue("Source and destination directories do not match.",
  449.           checkFiles(fs, TEST_ROOT_DIR+"/dest2", files2));     
  450.     }
  451.     finally {
  452.       deldir(fs, TEST_ROOT_DIR+"/destdat");
  453.       deldir(fs, TEST_ROOT_DIR+"/dest2");
  454.       deldir(fs, TEST_ROOT_DIR+"/srcdat");
  455.     }
  456.   }
  457.   public void testPreserveOption() throws Exception {
  458.     Configuration conf = new Configuration();
  459.     MiniDFSCluster cluster = null;
  460.     try {
  461.       cluster = new MiniDFSCluster(conf, 2, true, null);
  462.       String nnUri = FileSystem.getDefaultUri(conf).toString();
  463.       FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
  464.       {//test preserving user
  465.         MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
  466.         FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files);
  467.         for(int i = 0; i < srcstat.length; i++) {
  468.           fs.setOwner(srcstat[i].getPath(), "u" + i, null);
  469.         }
  470.         ToolRunner.run(new DistCp(conf),
  471.             new String[]{"-pu", nnUri+"/srcdat", nnUri+"/destdat"});
  472.         assertTrue("Source and destination directories do not match.",
  473.                    checkFiles(fs, "/destdat", files));
  474.         
  475.         FileStatus[] dststat = getFileStatus(fs, "/destdat", files);
  476.         for(int i = 0; i < dststat.length; i++) {
  477.           assertEquals("i=" + i, "u" + i, dststat[i].getOwner());
  478.         }
  479.         deldir(fs, "/destdat");
  480.         deldir(fs, "/srcdat");
  481.       }
  482.       {//test preserving group
  483.         MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
  484.         FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files);
  485.         for(int i = 0; i < srcstat.length; i++) {
  486.           fs.setOwner(srcstat[i].getPath(), null, "g" + i);
  487.         }
  488.         ToolRunner.run(new DistCp(conf),
  489.             new String[]{"-pg", nnUri+"/srcdat", nnUri+"/destdat"});
  490.         assertTrue("Source and destination directories do not match.",
  491.                    checkFiles(fs, "/destdat", files));
  492.         
  493.         FileStatus[] dststat = getFileStatus(fs, "/destdat", files);
  494.         for(int i = 0; i < dststat.length; i++) {
  495.           assertEquals("i=" + i, "g" + i, dststat[i].getGroup());
  496.         }
  497.         deldir(fs, "/destdat");
  498.         deldir(fs, "/srcdat");
  499.       }
  500.       {//test preserving mode
  501.         MyFile[] files = createFiles(URI.create(nnUri), "/srcdat");
  502.         FileStatus[] srcstat = getFileStatus(fs, "/srcdat", files);
  503.         FsPermission[] permissions = new FsPermission[srcstat.length];
  504.         for(int i = 0; i < srcstat.length; i++) {
  505.           permissions[i] = new FsPermission((short)(i & 0666));
  506.           fs.setPermission(srcstat[i].getPath(), permissions[i]);
  507.         }
  508.         ToolRunner.run(new DistCp(conf),
  509.             new String[]{"-pp", nnUri+"/srcdat", nnUri+"/destdat"});
  510.         assertTrue("Source and destination directories do not match.",
  511.                    checkFiles(fs, "/destdat", files));
  512.   
  513.         FileStatus[] dststat = getFileStatus(fs, "/destdat", files);
  514.         for(int i = 0; i < dststat.length; i++) {
  515.           assertEquals("i=" + i, permissions[i], dststat[i].getPermission());
  516.         }
  517.         deldir(fs, "/destdat");
  518.         deldir(fs, "/srcdat");
  519.       }
  520.     } finally {
  521.       if (cluster != null) { cluster.shutdown(); }
  522.     }
  523.   }
  524.   public void testMapCount() throws Exception {
  525.     String namenode = null;
  526.     MiniDFSCluster dfs = null;
  527.     MiniMRCluster mr = null;
  528.     try {
  529.       Configuration conf = new Configuration();
  530.       dfs = new MiniDFSCluster(conf, 3, true, null);
  531.       FileSystem fs = dfs.getFileSystem();
  532.       final FsShell shell = new FsShell(conf);
  533.       namenode = fs.getUri().toString();
  534.       mr = new MiniMRCluster(3, namenode, 1);
  535.       MyFile[] files = createFiles(fs.getUri(), "/srcdat");
  536.       long totsize = 0;
  537.       for (MyFile f : files) {
  538.         totsize += f.getSize();
  539.       }
  540.       Configuration job = mr.createJobConf();
  541.       job.setLong("distcp.bytes.per.map", totsize / 3);
  542.       ToolRunner.run(new DistCp(job),
  543.           new String[] {"-m", "100",
  544.                         "-log",
  545.                         namenode+"/logs",
  546.                         namenode+"/srcdat",
  547.                         namenode+"/destdat"});
  548.       assertTrue("Source and destination directories do not match.",
  549.                  checkFiles(fs, "/destdat", files));
  550.       String logdir = namenode + "/logs";
  551.       System.out.println(execCmd(shell, "-lsr", logdir));
  552.       FileStatus[] logs = fs.listStatus(new Path(logdir));
  553.       // rare case where splits are exact, logs.length can be 4
  554.       assertTrue("Unexpected map count, logs.length=" + logs.length,
  555.           logs.length == 5 || logs.length == 4);
  556.       deldir(fs, "/destdat");
  557.       deldir(fs, "/logs");
  558.       ToolRunner.run(new DistCp(job),
  559.           new String[] {"-m", "1",
  560.                         "-log",
  561.                         namenode+"/logs",
  562.                         namenode+"/srcdat",
  563.                         namenode+"/destdat"});
  564.       System.out.println(execCmd(shell, "-lsr", logdir));
  565.       logs = fs.listStatus(new Path(namenode+"/logs"));
  566.       assertTrue("Unexpected map count, logs.length=" + logs.length,
  567.           logs.length == 2);
  568.     } finally {
  569.       if (dfs != null) { dfs.shutdown(); }
  570.       if (mr != null) { mr.shutdown(); }
  571.     }
  572.   }
  573.   public void testLimits() throws Exception {
  574.     Configuration conf = new Configuration();
  575.     MiniDFSCluster cluster = null;
  576.     try {
  577.       cluster = new MiniDFSCluster(conf, 2, true, null);
  578.       final String nnUri = FileSystem.getDefaultUri(conf).toString();
  579.       final FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
  580.       final DistCp distcp = new DistCp(conf);
  581.       final FsShell shell = new FsShell(conf);  
  582.       final String srcrootdir =  "/src_root";
  583.       final Path srcrootpath = new Path(srcrootdir); 
  584.       final String dstrootdir =  "/dst_root";
  585.       final Path dstrootpath = new Path(dstrootdir); 
  586.       {//test -filelimit
  587.         MyFile[] files = createFiles(URI.create(nnUri), srcrootdir);
  588.         int filelimit = files.length / 2;
  589.         System.out.println("filelimit=" + filelimit);
  590.         ToolRunner.run(distcp,
  591.             new String[]{"-filelimit", ""+filelimit, nnUri+srcrootdir, nnUri+dstrootdir});
  592.         String results = execCmd(shell, "-lsr", dstrootdir);
  593.         results = removePrefix(results, dstrootdir);
  594.         System.out.println("results=" +  results);
  595.         FileStatus[] dststat = getFileStatus(fs, dstrootdir, files, true);
  596.         assertEquals(filelimit, dststat.length);
  597.         deldir(fs, dstrootdir);
  598.         deldir(fs, srcrootdir);
  599.       }
  600.       {//test -sizelimit
  601.         createFiles(URI.create(nnUri), srcrootdir);
  602.         long sizelimit = fs.getContentSummary(srcrootpath).getLength()/2;
  603.         System.out.println("sizelimit=" + sizelimit);
  604.         ToolRunner.run(distcp,
  605.             new String[]{"-sizelimit", ""+sizelimit, nnUri+srcrootdir, nnUri+dstrootdir});
  606.         
  607.         ContentSummary summary = fs.getContentSummary(dstrootpath);
  608.         System.out.println("summary=" + summary);
  609.         assertTrue(summary.getLength() <= sizelimit);
  610.         deldir(fs, dstrootdir);
  611.         deldir(fs, srcrootdir);
  612.       }
  613.       {//test update
  614.         final MyFile[] srcs = createFiles(URI.create(nnUri), srcrootdir);
  615.         final long totalsize = fs.getContentSummary(srcrootpath).getLength();
  616.         System.out.println("src.length=" + srcs.length);
  617.         System.out.println("totalsize =" + totalsize);
  618.         fs.mkdirs(dstrootpath);
  619.         final int parts = RAN.nextInt(NFILES/3 - 1) + 2;
  620.         final int filelimit = srcs.length/parts;
  621.         final long sizelimit = totalsize/parts;
  622.         System.out.println("filelimit=" + filelimit);
  623.         System.out.println("sizelimit=" + sizelimit);
  624.         System.out.println("parts    =" + parts);
  625.         final String[] args = {"-filelimit", ""+filelimit, "-sizelimit", ""+sizelimit,
  626.             "-update", nnUri+srcrootdir, nnUri+dstrootdir};
  627.         int dstfilecount = 0;
  628.         long dstsize = 0;
  629.         for(int i = 0; i <= parts; i++) {
  630.           ToolRunner.run(distcp, args);
  631.         
  632.           FileStatus[] dststat = getFileStatus(fs, dstrootdir, srcs, true);
  633.           System.out.println(i + ") dststat.length=" + dststat.length);
  634.           assertTrue(dststat.length - dstfilecount <= filelimit);
  635.           ContentSummary summary = fs.getContentSummary(dstrootpath);
  636.           System.out.println(i + ") summary.getLength()=" + summary.getLength());
  637.           assertTrue(summary.getLength() - dstsize <= sizelimit);
  638.           assertTrue(checkFiles(fs, dstrootdir, srcs, true));
  639.           dstfilecount = dststat.length;
  640.           dstsize = summary.getLength();
  641.         }
  642.         deldir(fs, dstrootdir);
  643.         deldir(fs, srcrootdir);
  644.       }
  645.     } finally {
  646.       if (cluster != null) { cluster.shutdown(); }
  647.     }
  648.   }
  649.   static final long now = System.currentTimeMillis();
  650.   static UnixUserGroupInformation createUGI(String name, boolean issuper) {
  651.     String username = name + now;
  652.     String group = issuper? "supergroup": username;
  653.     return UnixUserGroupInformation.createImmutable(
  654.         new String[]{username, group});
  655.   }
  656.   static Path createHomeDirectory(FileSystem fs, UserGroupInformation ugi
  657.       ) throws IOException {
  658.     final Path home = new Path("/user/" + ugi.getUserName());
  659.     fs.mkdirs(home);
  660.     fs.setOwner(home, ugi.getUserName(), ugi.getGroupNames()[0]);
  661.     fs.setPermission(home, new FsPermission((short)0700));
  662.     return home;
  663.   }
  664.   public void testHftpAccessControl() throws Exception {
  665.     MiniDFSCluster cluster = null;
  666.     try {
  667.       final UnixUserGroupInformation DFS_UGI = createUGI("dfs", true); 
  668.       final UnixUserGroupInformation USER_UGI = createUGI("user", false); 
  669.       //start cluster by DFS_UGI
  670.       final Configuration dfsConf = new Configuration();
  671.       UnixUserGroupInformation.saveToConf(dfsConf,
  672.           UnixUserGroupInformation.UGI_PROPERTY_NAME, DFS_UGI);
  673.       cluster = new MiniDFSCluster(dfsConf, 2, true, null);
  674.       cluster.waitActive();
  675.       final String httpAdd = dfsConf.get("dfs.http.address");
  676.       final URI nnURI = FileSystem.getDefaultUri(dfsConf);
  677.       final String nnUri = nnURI.toString();
  678.       final Path home = createHomeDirectory(FileSystem.get(nnURI, dfsConf), USER_UGI);
  679.       
  680.       //now, login as USER_UGI
  681.       final Configuration userConf = new Configuration();
  682.       UnixUserGroupInformation.saveToConf(userConf,
  683.           UnixUserGroupInformation.UGI_PROPERTY_NAME, USER_UGI);
  684.       final FileSystem fs = FileSystem.get(nnURI, userConf);
  685.       final Path srcrootpath = new Path(home, "src_root"); 
  686.       final String srcrootdir =  srcrootpath.toString();
  687.       final Path dstrootpath = new Path(home, "dst_root"); 
  688.       final String dstrootdir =  dstrootpath.toString();
  689.       final DistCp distcp = new DistCp(userConf);
  690.       FileSystem.mkdirs(fs, srcrootpath, new FsPermission((short)0700));
  691.       final String[] args = {"hftp://"+httpAdd+srcrootdir, nnUri+dstrootdir};
  692.       { //copy with permission 000, should fail
  693.         fs.setPermission(srcrootpath, new FsPermission((short)0));
  694.         assertEquals(-3, ToolRunner.run(distcp, args));
  695.       }
  696.     } finally {
  697.       if (cluster != null) { cluster.shutdown(); }
  698.     }
  699.   }
  700.   /** test -delete */
  701.   public void testDelete() throws Exception {
  702.     final Configuration conf = new Configuration();
  703.     MiniDFSCluster cluster = null;
  704.     try {
  705.       cluster = new MiniDFSCluster(conf, 2, true, null);
  706.       final URI nnURI = FileSystem.getDefaultUri(conf);
  707.       final String nnUri = nnURI.toString();
  708.       final FileSystem fs = FileSystem.get(URI.create(nnUri), conf);
  709.       final DistCp distcp = new DistCp(conf);
  710.       final FsShell shell = new FsShell(conf);  
  711.       final String srcrootdir = "/src_root";
  712.       final String dstrootdir = "/dst_root";
  713.       {
  714.         //create source files
  715.         createFiles(nnURI, srcrootdir);
  716.         String srcresults = execCmd(shell, "-lsr", srcrootdir);
  717.         srcresults = removePrefix(srcresults, srcrootdir);
  718.         System.out.println("srcresults=" +  srcresults);
  719.         //create some files in dst
  720.         createFiles(nnURI, dstrootdir);
  721.         System.out.println("dstrootdir=" +  dstrootdir);
  722.         shell.run(new String[]{"-lsr", dstrootdir});
  723.         //run distcp
  724.         ToolRunner.run(distcp,
  725.             new String[]{"-delete", "-update", "-log", "/log",
  726.                          nnUri+srcrootdir, nnUri+dstrootdir});
  727.         //make sure src and dst contains the same files
  728.         String dstresults = execCmd(shell, "-lsr", dstrootdir);
  729.         dstresults = removePrefix(dstresults, dstrootdir);
  730.         System.out.println("first dstresults=" +  dstresults);
  731.         assertEquals(srcresults, dstresults);
  732.         //create additional file in dst
  733.         create(fs, new Path(dstrootdir, "foo"));
  734.         create(fs, new Path(dstrootdir, "foobar"));
  735.         //run distcp again
  736.         ToolRunner.run(distcp,
  737.             new String[]{"-delete", "-update", "-log", "/log2",
  738.                          nnUri+srcrootdir, nnUri+dstrootdir});
  739.         
  740.         //make sure src and dst contains the same files
  741.         dstresults = execCmd(shell, "-lsr", dstrootdir);
  742.         dstresults = removePrefix(dstresults, dstrootdir);
  743.         System.out.println("second dstresults=" +  dstresults);
  744.         assertEquals(srcresults, dstresults);
  745.         //cleanup
  746.         deldir(fs, dstrootdir);
  747.         deldir(fs, srcrootdir);
  748.       }
  749.     } finally {
  750.       if (cluster != null) { cluster.shutdown(); }
  751.     }
  752.   }
  753.   static void create(FileSystem fs, Path f) throws IOException {
  754.     FSDataOutputStream out = fs.create(f);
  755.     try {
  756.       byte[] b = new byte[1024 + RAN.nextInt(1024)];
  757.       RAN.nextBytes(b);
  758.       out.write(b);
  759.     } finally {
  760.       if (out != null) out.close();
  761.     }
  762.   }
  763.   
  764.   static String execCmd(FsShell shell, String... args) throws Exception {
  765.     ByteArrayOutputStream baout = new ByteArrayOutputStream();
  766.     PrintStream out = new PrintStream(baout, true);
  767.     PrintStream old = System.out;
  768.     System.setOut(out);
  769.     shell.run(args);
  770.     out.close();
  771.     System.setOut(old);
  772.     return baout.toString();
  773.   }
  774.   
  775.   private static String removePrefix(String lines, String prefix) {
  776.     final int prefixlen = prefix.length();
  777.     final StringTokenizer t = new StringTokenizer(lines, "n");
  778.     final StringBuffer results = new StringBuffer(); 
  779.     for(; t.hasMoreTokens(); ) {
  780.       String s = t.nextToken();
  781.       results.append(s.substring(s.indexOf(prefix) + prefixlen) + "n");
  782.     }
  783.     return results.toString();
  784.   }
  785. }