TestFuseDFS.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:17k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. import org.apache.hadoop.hdfs.*;
  19. import junit.framework.TestCase;
  20. import java.io.*;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.*;
  23. import org.apache.hadoop.fs.permission.*;
  24. import java.net.*;
  25. /**
  26.  * This class tests that the Fuse module for DFS can mount properly
  27.  * and does a few simple commands:
  28.  * mkdir
  29.  * rmdir
  30.  * ls
  31.  * cat
  32.  *
  33.  * cp and touch are purposely not tested because they won't work with the current module
  34.  *
  35.  */
  36. public class TestFuseDFS extends TestCase {
  37.   /**
  38.    * mount the fuse file system using assumed fuse module library installed in /usr/local/lib or somewhere else on your
  39.    * pre-existing LD_LIBRARY_PATH
  40.    *
  41.    */
  42.   static Process fuse_process;
  43.   static String fuse_cmd;
  44.   static private void mount(String mountpoint, URI dfs) throws IOException, InterruptedException  {
  45.     String cp = System.getProperty("java.class.path");
  46.     Runtime r = Runtime.getRuntime();
  47.     fuse_cmd = System.getProperty("build.test") + "/../fuse_dfs";
  48.     String libhdfs = System.getProperty("build.test") + "/../../../libhdfs/";
  49.     String arch = System.getProperty("os.arch");
  50.     String jvm = System.getProperty("java.home") + "/lib/" + arch + "/server";
  51.     String lp = System.getProperty("LD_LIBRARY_PATH") + ":" + "/usr/local/lib:" + libhdfs + ":" + jvm;
  52.     System.err.println("LD_LIBRARY_PATH=" + lp);
  53.     String cmd[] =  {  fuse_cmd, "dfs://" + dfs.getHost() + ":" + String.valueOf(dfs.getPort()), 
  54.                        mountpoint, "-obig_writes", "-odebug", "-oentry_timeout=0.1",  "-oattribute_timeout=0.1", "-ousetrash", "rw", "-oinitchecks",
  55.                        "-ordbuffer=32768"};
  56.     final String [] envp = {
  57.       "CLASSPATH="+  cp,
  58.       "LD_LIBRARY_PATH=" + lp,
  59.       "PATH=" + "/usr/bin:/bin"
  60.     };
  61.     // ensure the mount point is not currently mounted
  62.     Process p = r.exec("fusermount -u " + mountpoint);
  63.     p.waitFor();
  64.     // clean up the mount point
  65.     p = r.exec("rm -rf " + mountpoint);
  66.     assertTrue(p.waitFor() == 0);
  67.     // make the mount point if needed
  68.     p = r.exec("mkdir -p " + mountpoint);
  69.     assertTrue(p.waitFor() == 0);
  70.     // mount fuse to the mount point
  71.     fuse_process = r.exec(cmd, envp);
  72.     // give DFS a chance to come up
  73.     try { Thread.sleep(3000); } catch(Exception e) { }
  74.   }
  75.   /**
  76.    * unmounts fuse for before shutting down.
  77.    */
  78.   static private void umount(String mpoint) throws IOException, InterruptedException {
  79.     Runtime r= Runtime.getRuntime();
  80.     Process p = r.exec("fusermount -u " + mpoint);
  81.     p.waitFor();
  82.   }
  83.   /**
  84.    * Set things up - create mini dfs cluster and mount the fuse filesystem.
  85.    */
  86.   public TestFuseDFS() throws IOException,InterruptedException  {
  87.   }
  88.   static private MiniDFSCluster cluster;
  89.   static private DistributedFileSystem fileSys;
  90.   final static private String mpoint;
  91.   static {
  92.     mpoint = System.getProperty("build.test") + "/mnt";
  93.     System.runFinalizersOnExit(true);
  94.     startStuff();
  95.   }
  96.   static public void startStuff() {
  97.     try {
  98.       Configuration conf = new Configuration();
  99.       conf.setBoolean("dfs.permissions",false);
  100.       cluster = new MiniDFSCluster(conf, 1, true, null);
  101.       fileSys = (DistributedFileSystem)cluster.getFileSystem();
  102.       assertTrue(fileSys.getFileStatus(new Path("/")).isDir());
  103.       mount(mpoint, fileSys.getUri());
  104.     } catch(Exception e) {
  105.       e.printStackTrace();
  106.     }
  107.   }
  108.   public void setUp() {
  109.   }
  110.   /**
  111.    * use shell to create a dir and then use filesys to see it exists.
  112.    */
  113.   public void testMkdir() throws IOException,InterruptedException, Exception  {
  114.     try {
  115.       // First create a new directory with mkdirs
  116.       Path path = new Path("/foo");
  117.       Runtime r = Runtime.getRuntime();
  118.       String cmd = "mkdir -p " + mpoint + path.toString();
  119.       Process p = r.exec(cmd);
  120.       assertTrue(p.waitFor() == 0);
  121.       // check it is there
  122.       assertTrue(fileSys.getFileStatus(path).isDir());
  123.       // check again through the shell
  124.       String lsCmd = "ls " + mpoint + path.toString();
  125.       p = r.exec(lsCmd);
  126.       assertTrue(p.waitFor() == 0);
  127.     } catch(Exception e) {
  128.       e.printStackTrace();
  129.       throw e;
  130.     }
  131.   }
  132.   /**
  133.    * use shell to create a dir and then use filesys to see it exists.
  134.    */
  135.   public void testWrites() throws IOException,InterruptedException  {
  136.     try {
  137.       // write a hello file
  138.       File file = new File(mpoint, "hello.txt");
  139.       FileOutputStream f = new FileOutputStream(file);
  140.       String s = "hello ";
  141.       f.write(s.getBytes());
  142.       s = "world";
  143.       f.write(s.getBytes());
  144.       f.flush();
  145.       f.close();
  146.       try {
  147.         Thread.sleep(1000);
  148.       } catch(Exception e) {
  149.       }
  150.       // check the file exists.
  151.       Path myPath = new Path("/hello.txt");
  152.       assertTrue(fileSys.exists(myPath));
  153.       // check the data is ok
  154.       FileInputStream fi = new FileInputStream(new File(mpoint, "hello.txt"));
  155.       byte b[] = new byte[12];
  156.       int length = fi.read(b,0,12);
  157.       assertTrue(length > 0);
  158.       String s2 = new String( b, 0, length);
  159.       assertEquals("hello world", s2);
  160.     } catch(Exception e) {
  161.       e.printStackTrace();
  162.     } finally {
  163.     }
  164.   }
  165.   /**
  166.    * Test ls for dir already created in testMkdDir also tests bad ls
  167.    */
  168.   public void testLs() throws IOException,InterruptedException  {
  169.     try {
  170.       // First create a new directory with mkdirs
  171.       Runtime r = Runtime.getRuntime();
  172.       // mkdir
  173.       Process p = r.exec("mkdir -p " + mpoint + "/test/mkdirs");
  174.       assertTrue(p.waitFor() == 0);
  175.       // ls
  176.       p = r.exec("ls " + mpoint + "/test/mkdirs");
  177.       assertTrue(p.waitFor() == 0);
  178.       // ls non-existant directory
  179.       p = r.exec("ls " + mpoint + "/test/mkdirsNotThere");
  180.       int res = p.waitFor();
  181.       assertFalse(res == 0);
  182.     } catch(Exception e) {
  183.       e.printStackTrace();
  184.     }
  185.   }
  186.   /**
  187.    * Remove a dir using the shell and use filesys to see it no longer exists.
  188.    */
  189.   public void testRmdir() throws IOException,InterruptedException  {
  190.     try {
  191.       // First create a new directory with mkdirs
  192.       Runtime r = Runtime.getRuntime();
  193.       Process p = r.exec("mkdir -p " + mpoint + "/test/rmdir");
  194.       assertTrue(p.waitFor() == 0);
  195.       Path myPath = new Path("/test/rmdir");
  196.       assertTrue(fileSys.exists(myPath));
  197.       // remove it
  198.       p = r.exec("rmdir " + mpoint + "/test/rmdir");
  199.       assertTrue(p.waitFor() == 0);
  200.       // check it is not there
  201.       assertFalse(fileSys.exists(myPath));
  202.       Path trashPath = new Path("/user/root/.Trash/Current/test/rmdir");
  203.       assertTrue(fileSys.exists(trashPath));
  204.       // make it again to test trashing same thing twice
  205.       p = r.exec("mkdir -p " + mpoint + "/test/rmdir");
  206.       assertTrue(p.waitFor() == 0);
  207.       assertTrue(fileSys.exists(myPath));
  208.       // remove it
  209.       p = r.exec("rmdir " + mpoint + "/test/rmdir");
  210.       assertTrue(p.waitFor() == 0);
  211.       // check it is not there
  212.       assertFalse(fileSys.exists(myPath));
  213.       trashPath = new Path("/user/root/.Trash/Current/test/rmdir.1");
  214.       assertTrue(fileSys.exists(trashPath));
  215.     } catch(Exception e) {
  216.       e.printStackTrace();
  217.     }
  218.   }
  219.   /**
  220.    * use shell to create a dir and then use filesys to see it exists.
  221.    */
  222.   public void testDF() throws IOException,InterruptedException, Exception  {
  223.     try {
  224.       // First create a new directory with mkdirs
  225.       Path path = new Path("/foo");
  226.       Runtime r = Runtime.getRuntime();
  227.       String cmd = "mkdir -p " + mpoint + path.toString();
  228.       Process p = r.exec(cmd);
  229.       assertTrue(p.waitFor() == 0);
  230.       File f = new File(mpoint + "/foo");
  231.       DistributedFileSystem.DiskStatus d = fileSys.getDiskStatus();
  232.       long fileUsedBlocks =  (f.getTotalSpace() - f.getFreeSpace())/(64 * 1024 * 1024);
  233.       long dfsUsedBlocks = (long)Math.ceil((double)d.getDfsUsed()/(64 * 1024 * 1024));
  234.       assertTrue(fileUsedBlocks == dfsUsedBlocks);
  235.       assertTrue(d.getCapacity() == f.getTotalSpace());
  236.     } catch(Exception e) {
  237.       e.printStackTrace();
  238.       throw e;
  239.     }
  240.   }
  241.   /**
  242.    * use shell to create a dir and then use filesys to see it exists.
  243.    */
  244.   public void testChown() throws IOException,InterruptedException, Exception  {
  245.     try {
  246.       // First create a new directory with mkdirs
  247.       Path path = new Path("/foo");
  248.       Runtime r = Runtime.getRuntime();
  249.       String cmd = "mkdir -p " + mpoint + path.toString();
  250.       Process p = r.exec(cmd);
  251.       assertTrue(p.waitFor() == 0);
  252.       // check it is there
  253.       assertTrue(fileSys.getFileStatus(path).isDir());
  254.       FileStatus foo = fileSys.getFileStatus(path);
  255.       System.err.println("DEBUG:owner=" + foo.getOwner());
  256.       cmd = "chown nobody " + mpoint + path.toString();
  257.       p = r.exec(cmd);
  258.       assertTrue(p.waitFor() == 0);
  259.       //      cmd = "chgrp nobody " + mpoint + path.toString();
  260.       //      p = r.exec(cmd);
  261.       //      assertTrue(p.waitFor() == 0);
  262.       foo = fileSys.getFileStatus(path);
  263.       System.err.println("DEBUG:owner=" + foo.getOwner());
  264.       assertTrue(foo.getOwner().equals("nobody"));
  265.       assertTrue(foo.getGroup().equals("nobody"));
  266.     } catch(Exception e) {
  267.       e.printStackTrace();
  268.       throw e;
  269.     }
  270.   }
  271.   /**
  272.    * use shell to create a dir and then use filesys to see it exists.
  273.    */
  274.   public void testChmod() throws IOException,InterruptedException, Exception  {
  275.     try {
  276.       // First create a new directory with mkdirs
  277.       Path path = new Path("/foo");
  278.       Runtime r = Runtime.getRuntime();
  279.       String cmd = "mkdir -p " + mpoint + path.toString();
  280.       Process p = r.exec(cmd);
  281.       assertTrue(p.waitFor() == 0);
  282.       // check it is there
  283.       assertTrue(fileSys.getFileStatus(path).isDir());
  284.       cmd = "chmod 777 " + mpoint + path.toString();
  285.       p = r.exec(cmd);
  286.       assertTrue(p.waitFor() == 0);
  287.       FileStatus foo = fileSys.getFileStatus(path);
  288.       FsPermission perm = foo.getPermission();
  289.       assertTrue(perm.toShort() == 0777);
  290.     } catch(Exception e) {
  291.       e.printStackTrace();
  292.       throw e;
  293.     }
  294.   }
  295.   /**
  296.    * use shell to create a dir and then use filesys to see it exists.
  297.    */
  298.   public void testUtimes() throws IOException,InterruptedException, Exception  {
  299.     try {
  300.       // First create a new directory with mkdirs
  301.       Path path = new Path("/utimetest");
  302.       Runtime r = Runtime.getRuntime();
  303.       String cmd = "touch " + mpoint + path.toString();
  304.       Process p = r.exec(cmd);
  305.       assertTrue(p.waitFor() == 0);
  306.       // check it is there
  307.       assertTrue(fileSys.exists(path));
  308.       FileStatus foo = fileSys.getFileStatus(path);
  309.       long oldTime = foo.getModificationTime();
  310.       try { Thread.sleep(1000); } catch(Exception e) {}
  311.       cmd = "touch " + mpoint + path.toString();
  312.       p = r.exec(cmd);
  313.       assertTrue(p.waitFor() == 0);
  314.       try { Thread.sleep(1000); } catch(Exception e) {}
  315.       foo = fileSys.getFileStatus(path);
  316.       long newTime = foo.getModificationTime();
  317.       assertTrue(newTime > oldTime);
  318.     } catch(Exception e) {
  319.       e.printStackTrace();
  320.       throw e;
  321.     } finally {
  322.     }
  323.   }
  324.   /**
  325.    *
  326.    * Test dfs_read on a file size that will trigger multiple internal reads. 
  327.    * First, just check raw size reading is ok and then check with smaller reads
  328.    * including checking the validity of the data read.
  329.    *
  330.    */
  331.   public void testReads() throws IOException,InterruptedException  {
  332.     try {
  333.       // First create a new directory with mkdirs
  334.       Runtime r = Runtime.getRuntime();
  335.       Process p;
  336.       // create the file
  337.       Path myPath = new Path("/test/hello.reads");
  338.       FSDataOutputStream s = fileSys.create(myPath);
  339.       String hello = "hello world!";
  340.       int written = 0;
  341.       int mycount = 0;
  342.       while(written < 1024 * 9) {
  343.         s.writeUTF(hello);
  344.         s.writeInt(mycount++);
  345.         written += hello.length() + 4;
  346.       }
  347.       s.close();
  348.       // check it exists
  349.       assertTrue(fileSys.exists(myPath));
  350.       FileStatus foo = fileSys.getFileStatus(myPath);
  351.       assertTrue(foo.getLen() >= 9 * 1024);
  352.       {
  353.         // cat the file
  354.         DataInputStream is = new DataInputStream(new FileInputStream(mpoint + "/test/hello.reads"));
  355.         byte buf [] = new byte[4096];
  356.         // test reading 0 length
  357.         assertTrue(is.read(buf, 0, 0) == 0);
  358.         // test real reads
  359.         assertTrue(is.read(buf, 0, 1024) == 1024);
  360.         assertTrue(is.read(buf, 0, 4096) == 4096);
  361.         assertTrue(is.read(buf, 0, 4096) == 4096);
  362.         is.close();
  363.       }
  364.       {
  365.         DataInputStream is = new DataInputStream(new FileInputStream(mpoint + "/test/hello.reads"));
  366.         int read = 0;
  367.         int counter = 0;
  368.         try {
  369.           while(true) {
  370.             String s2 = DataInputStream.readUTF(is);
  371.             int s3 = is.readInt();
  372.             assertTrue(s2.equals(hello));
  373.             assertTrue(s3 == counter++);
  374.             read += hello.length() + 4;
  375.           }
  376.         } catch(EOFException e) {
  377.           assertTrue(read >= 9 * 1024);
  378.         }
  379.       }
  380.       // check reading an empty file for EOF
  381.       {
  382.         // create the file
  383.         myPath = new Path("/test/hello.reads2");
  384.         s = fileSys.create(myPath);
  385.         s.close();
  386.         FSDataInputStream fs = fileSys.open(myPath);
  387.         assertEquals(-1,  fs.read());
  388.         FileInputStream f = new FileInputStream(mpoint + "/test/hello.reads2");
  389.         assertEquals(-1, f.read());
  390.       }
  391.     } catch(Exception e) {
  392.       e.printStackTrace();
  393.     } finally {
  394.     }
  395.   }
  396.   /**
  397.    * Use filesys to create the hello world! file and then cat it and see its contents are correct.
  398.    */
  399.   public void testCat() throws IOException,InterruptedException  {
  400.     try {
  401.       // First create a new directory with mkdirs
  402.       Runtime r = Runtime.getRuntime();
  403.       Process p = r.exec("rm -rf " + mpoint + "/test/hello");
  404.       assertTrue(p.waitFor() == 0);
  405.       // create the file
  406.       Path myPath = new Path("/test/hello");
  407.       FSDataOutputStream s = fileSys.create(myPath);
  408.       String hello = "hello world!";
  409.       s.writeUTF(hello);
  410.       s.writeInt(1033);
  411.       s.close();
  412.       // check it exists
  413.       assertTrue(fileSys.exists(myPath));
  414.       // cat the file
  415.       DataInputStream is = new DataInputStream(new FileInputStream(mpoint + "/test/hello"));
  416.       String s2 = DataInputStream.readUTF(is);
  417.       int s3 = is.readInt();
  418.       assertTrue(s2.equals(hello));
  419.       assertTrue(s3 == 1033);
  420.     } catch(Exception e) {
  421.       e.printStackTrace();
  422.     } finally {
  423.     }
  424.   }
  425.   /**
  426.    * Use filesys to create the hello world! file and then cat it and see its contents are correct.
  427.    */
  428.   public void testAppends() throws IOException,InterruptedException  {
  429.     try {
  430.       // First create a new directory with mkdirs
  431.       Runtime r = Runtime.getRuntime();
  432.       {
  433.         FileOutputStream os = new FileOutputStream(mpoint + "/appends");
  434.         String hello = "hello";
  435.         os.write(hello.getBytes());
  436.         os.flush();
  437.         os.close();
  438.       }
  439.       // check it exists
  440.       Path myPath = new Path("/appends");
  441.       assertTrue(fileSys.exists(myPath));
  442.       try {
  443.         Thread.sleep(1000);
  444.       } catch(Exception e) {
  445.       }
  446.       FileStatus foo = fileSys.getFileStatus(myPath);
  447.       File f = new File(mpoint + "/appends");
  448.       assertTrue(f.length() > 0);
  449.       {
  450.         FileOutputStream os = new FileOutputStream(mpoint + "/appends", true);
  451.         String hello = " world!";
  452.         os.write(hello.getBytes());
  453.         os.flush();
  454.         os.close();
  455.       }
  456.       // cat the file
  457.       FileInputStream is = new FileInputStream(mpoint + "/appends");
  458.       byte b[] = new byte[1024];
  459.       int len = is.read(b);
  460.       assertTrue(len > 0);
  461.       String s2 = new String(b,0,len);
  462.       assertTrue(s2.equals("hello world!"));
  463.     } catch(Exception e) {
  464.       e.printStackTrace();
  465.     } finally {
  466.     }
  467.   }
  468.   public void testDone() throws IOException {
  469.       close();
  470.   }
  471.   /**
  472.    * Unmount and close
  473.    */
  474.   protected void tearDown() throws Exception {
  475.   }
  476.   /**
  477.    * Unmount and close
  478.    */
  479.   protected void finalize() throws Throwable {
  480.     close();
  481.   }
  482.   public void close() {
  483.     try {
  484.       int length;
  485.       // print out the fuse debug output
  486.       {
  487.       do {
  488.       InputStream i = fuse_process.getInputStream();
  489.       byte b[] = new byte[i.available()];
  490.       length = i.read(b);
  491.       System.err.println("read x bytes: " + length);
  492.       System.err.write(b,0,b.length);
  493.       } while(length > 0) ;
  494.       }
  495.       do {
  496.       InputStream i = fuse_process.getErrorStream();
  497.       byte b[] = new byte[i.available()];
  498.       length = i.read(b);
  499.       System.err.println("read x bytes: " + length);
  500.       System.err.write(b,0,b.length);
  501.       } while(length > 0) ;
  502.       umount(mpoint);
  503.       fuse_process.destroy();
  504.       fuse_process = null;
  505.         if(fileSys != null) {
  506.         fileSys.close();
  507.         fileSys = null;
  508.       }
  509.       if(cluster != null) {
  510.         cluster.shutdown();
  511.         cluster = null;
  512.       }
  513.     } catch(Exception e) { }
  514.   }
  515. };