TestFileCreation.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:27k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.BufferedReader;
  20. import java.io.File;
  21. import java.io.FileReader;
  22. import java.io.IOException;
  23. import java.net.InetSocketAddress;
  24. import org.apache.commons.logging.impl.Log4JLogger;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.fs.BlockLocation;
  27. import org.apache.hadoop.fs.FSDataInputStream;
  28. import org.apache.hadoop.fs.FSDataOutputStream;
  29. import org.apache.hadoop.fs.FileStatus;
  30. import org.apache.hadoop.fs.FileSystem;
  31. import org.apache.hadoop.fs.Path;
  32. import org.apache.hadoop.hdfs.protocol.Block;
  33. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  34. import org.apache.hadoop.hdfs.protocol.FSConstants;
  35. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  36. import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
  37. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  38. import org.apache.hadoop.hdfs.server.datanode.FSDataset;
  39. import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
  40. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
  41. import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
  42. import org.apache.hadoop.io.IOUtils;
  43. import org.apache.log4j.Level;
  44. /**
  45.  * This class tests that a file need not be closed before its
  46.  * data can be read by another client.
  47.  */
  48. public class TestFileCreation extends junit.framework.TestCase {
  49.   static final String DIR = "/" + TestFileCreation.class.getSimpleName() + "/";
  50.   {
  51.     //((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
  52.     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  53.     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  54.     ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  55.   }
  56.   static final long seed = 0xDEADBEEFL;
  57.   static final int blockSize = 8192;
  58.   static final int numBlocks = 2;
  59.   static final int fileSize = numBlocks * blockSize + 1;
  60.   boolean simulatedStorage = false;
  61.   // The test file is 2 times the blocksize plus one. This means that when the
  62.   // entire file is written, the first two blocks definitely get flushed to
  63.   // the datanodes.
  64.   // creates a file but does not close it
  65.   static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
  66.     throws IOException {
  67.     System.out.println("createFile: Created " + name + " with " + repl + " replica.");
  68.     FSDataOutputStream stm = fileSys.create(name, true,
  69.                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
  70.                                             (short)repl, (long)blockSize);
  71.     return stm;
  72.   }
  73.   //
  74.   // writes to file but does not close it
  75.   //
  76.   static void writeFile(FSDataOutputStream stm) throws IOException {
  77.     writeFile(stm, fileSize);
  78.   }
  79.   //
  80.   // writes specified bytes to file.
  81.   //
  82.   static void writeFile(FSDataOutputStream stm, int size) throws IOException {
  83.     byte[] buffer = AppendTestUtil.randomBytes(seed, size);
  84.     stm.write(buffer, 0, size);
  85.   }
  86.   //
  87.   // verify that the data written to the full blocks are sane
  88.   // 
  89.   private void checkFile(FileSystem fileSys, Path name, int repl)
  90.     throws IOException {
  91.     boolean done = false;
  92.     // wait till all full blocks are confirmed by the datanodes.
  93.     while (!done) {
  94.       try {
  95.         Thread.sleep(1000);
  96.       } catch (InterruptedException e) {}
  97.       done = true;
  98.       BlockLocation[] locations = fileSys.getFileBlockLocations(
  99.           fileSys.getFileStatus(name), 0, fileSize);
  100.       if (locations.length < numBlocks) {
  101.         done = false;
  102.         continue;
  103.       }
  104.       for (int idx = 0; idx < locations.length; idx++) {
  105.         if (locations[idx].getHosts().length < repl) {
  106.           done = false;
  107.           break;
  108.         }
  109.       }
  110.     }
  111.     FSDataInputStream stm = fileSys.open(name);
  112.     final byte[] expected;
  113.     if (simulatedStorage) {
  114.       expected = new byte[numBlocks * blockSize];
  115.       for (int i= 0; i < expected.length; i++) {  
  116.         expected[i] = SimulatedFSDataset.DEFAULT_DATABYTE;
  117.       }
  118.     } else {
  119.       expected = AppendTestUtil.randomBytes(seed, numBlocks*blockSize);
  120.     }
  121.     // do a sanity check. Read the file
  122.     byte[] actual = new byte[numBlocks * blockSize];
  123.     stm.readFully(0, actual);
  124.     stm.close();
  125.     checkData(actual, 0, expected, "Read 1");
  126.   }
  127.   static private void checkData(byte[] actual, int from, byte[] expected, String message) {
  128.     for (int idx = 0; idx < actual.length; idx++) {
  129.       assertEquals(message+" byte "+(from+idx)+" differs. expected "+
  130.                    expected[from+idx]+" actual "+actual[idx],
  131.                    expected[from+idx], actual[idx]);
  132.       actual[idx] = 0;
  133.     }
  134.   }
  135.   static void checkFullFile(FileSystem fs, Path name) throws IOException {
  136.     FileStatus stat = fs.getFileStatus(name);
  137.     BlockLocation[] locations = fs.getFileBlockLocations(stat, 0, 
  138.                                                          fileSize);
  139.     for (int idx = 0; idx < locations.length; idx++) {
  140.       String[] hosts = locations[idx].getNames();
  141.       for (int i = 0; i < hosts.length; i++) {
  142.         System.out.print( hosts[i] + " ");
  143.       }
  144.       System.out.println(" off " + locations[idx].getOffset() +
  145.                          " len " + locations[idx].getLength());
  146.     }
  147.     byte[] expected = AppendTestUtil.randomBytes(seed, fileSize);
  148.     FSDataInputStream stm = fs.open(name);
  149.     byte[] actual = new byte[fileSize];
  150.     stm.readFully(0, actual);
  151.     checkData(actual, 0, expected, "Read 2");
  152.     stm.close();
  153.   }
  154.   /**
  155.    * Test that file data becomes available before file is closed.
  156.    */
  157.   public void testFileCreation() throws IOException {
  158.     Configuration conf = new Configuration();
  159.     if (simulatedStorage) {
  160.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  161.     }
  162.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  163.     FileSystem fs = cluster.getFileSystem();
  164.     try {
  165.       //
  166.       // check that / exists
  167.       //
  168.       Path path = new Path("/");
  169.       System.out.println("Path : "" + path.toString() + """);
  170.       System.out.println(fs.getFileStatus(path).isDir()); 
  171.       assertTrue("/ should be a directory", 
  172.                  fs.getFileStatus(path).isDir() == true);
  173.       //
  174.       // Create a directory inside /, then try to overwrite it
  175.       //
  176.       Path dir1 = new Path("/test_dir");
  177.       fs.mkdirs(dir1);
  178.       System.out.println("createFile: Creating " + dir1.getName() + 
  179.         " for overwrite of existing directory.");
  180.       try {
  181.         fs.create(dir1, true); // Create path, overwrite=true
  182.         fs.close();
  183.         assertTrue("Did not prevent directory from being overwritten.", false);
  184.       } catch (IOException ie) {
  185.         if (!ie.getMessage().contains("already exists as a directory."))
  186.           throw ie;
  187.       }
  188.       
  189.       // create a new file in home directory. Do not close it.
  190.       //
  191.       Path file1 = new Path("filestatus.dat");
  192.       FSDataOutputStream stm = createFile(fs, file1, 1);
  193.       // verify that file exists in FS namespace
  194.       assertTrue(file1 + " should be a file", 
  195.                   fs.getFileStatus(file1).isDir() == false);
  196.       System.out.println("Path : "" + file1 + """);
  197.       // write to file
  198.       writeFile(stm);
  199.       // Make sure a client can read it before it is closed.
  200.       checkFile(fs, file1, 1);
  201.       // verify that file size has changed
  202.       long len = fs.getFileStatus(file1).getLen();
  203.       assertTrue(file1 + " should be of size " + (numBlocks * blockSize) +
  204.                  " but found to be of size " + len, 
  205.                   len == numBlocks * blockSize);
  206.       stm.close();
  207.       // verify that file size has changed to the full size
  208.       len = fs.getFileStatus(file1).getLen();
  209.       assertTrue(file1 + " should be of size " + fileSize +
  210.                  " but found to be of size " + len, 
  211.                   len == fileSize);
  212.       
  213.       
  214.       // Check storage usage 
  215.       // can't check capacities for real storage since the OS file system may be changing under us.
  216.       if (simulatedStorage) {
  217.         DataNode dn = cluster.getDataNodes().get(0);
  218.         assertEquals(fileSize, dn.getFSDataset().getDfsUsed());
  219.         assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dn.getFSDataset().getRemaining());
  220.       }
  221.     } finally {
  222.       cluster.shutdown();
  223.     }
  224.   }
  225.   /**
  226.    * Test deleteOnExit
  227.    */
  228.   public void testDeleteOnExit() throws IOException {
  229.     Configuration conf = new Configuration();
  230.     if (simulatedStorage) {
  231.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  232.     }
  233.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  234.     FileSystem fs = cluster.getFileSystem();
  235.     FileSystem localfs = FileSystem.getLocal(conf);
  236.     try {
  237.       // Creates files in HDFS and local file system.
  238.       //
  239.       Path file1 = new Path("filestatus.dat");
  240.       Path file2 = new Path("filestatus2.dat");
  241.       Path file3 = new Path("filestatus3.dat");
  242.       FSDataOutputStream stm1 = createFile(fs, file1, 1);
  243.       FSDataOutputStream stm2 = createFile(fs, file2, 1);
  244.       FSDataOutputStream stm3 = createFile(localfs, file3, 1);
  245.       System.out.println("DeleteOnExit: Created files.");
  246.       // write to files and close. Purposely, do not close file2.
  247.       writeFile(stm1);
  248.       writeFile(stm3);
  249.       stm1.close();
  250.       stm2.close();
  251.       stm3.close();
  252.       // set delete on exit flag on files.
  253.       fs.deleteOnExit(file1);
  254.       fs.deleteOnExit(file2);
  255.       localfs.deleteOnExit(file3);
  256.       // close the file system. This should make the above files
  257.       // disappear.
  258.       fs.close();
  259.       localfs.close();
  260.       fs = null;
  261.       localfs = null;
  262.       // reopen file system and verify that file does not exist.
  263.       fs = cluster.getFileSystem();
  264.       localfs = FileSystem.getLocal(conf);
  265.       assertTrue(file1 + " still exists inspite of deletOnExit set.",
  266.                  !fs.exists(file1));
  267.       assertTrue(file2 + " still exists inspite of deletOnExit set.",
  268.                  !fs.exists(file2));
  269.       assertTrue(file3 + " still exists inspite of deletOnExit set.",
  270.                  !localfs.exists(file3));
  271.       System.out.println("DeleteOnExit successful.");
  272.     } finally {
  273.       IOUtils.closeStream(fs);
  274.       IOUtils.closeStream(localfs);
  275.       cluster.shutdown();
  276.     }
  277.   }
  278.   /**
  279.    * Test that file data does not become corrupted even in the face of errors.
  280.    */
  281.   public void testFileCreationError1() throws IOException {
  282.     Configuration conf = new Configuration();
  283.     conf.setInt("heartbeat.recheck.interval", 1000);
  284.     conf.setInt("dfs.heartbeat.interval", 1);
  285.     if (simulatedStorage) {
  286.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  287.     }
  288.     // create cluster
  289.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  290.     FileSystem fs = cluster.getFileSystem();
  291.     cluster.waitActive();
  292.     InetSocketAddress addr = new InetSocketAddress("localhost",
  293.                                                    cluster.getNameNodePort());
  294.     DFSClient client = new DFSClient(addr, conf);
  295.     try {
  296.       // create a new file.
  297.       //
  298.       Path file1 = new Path("/filestatus.dat");
  299.       FSDataOutputStream stm = createFile(fs, file1, 1);
  300.       // verify that file exists in FS namespace
  301.       assertTrue(file1 + " should be a file", 
  302.                   fs.getFileStatus(file1).isDir() == false);
  303.       System.out.println("Path : "" + file1 + """);
  304.       // kill the datanode
  305.       cluster.shutdownDataNodes();
  306.       // wait for the datanode to be declared dead
  307.       while (true) {
  308.         DatanodeInfo[] info = client.datanodeReport(
  309.             FSConstants.DatanodeReportType.LIVE);
  310.         if (info.length == 0) {
  311.           break;
  312.         }
  313.         System.out.println("testFileCreationError1: waiting for datanode " +
  314.                            " to die.");
  315.         try {
  316.           Thread.sleep(1000);
  317.         } catch (InterruptedException e) {
  318.         }
  319.       }
  320.       // write 1 byte to file. 
  321.       // This should fail because all datanodes are dead.
  322.       byte[] buffer = AppendTestUtil.randomBytes(seed, 1);
  323.       try {
  324.         stm.write(buffer);
  325.         stm.close();
  326.       } catch (Exception e) {
  327.         System.out.println("Encountered expected exception");
  328.       }
  329.       // verify that no blocks are associated with this file
  330.       // bad block allocations were cleaned up earlier.
  331.       LocatedBlocks locations = client.namenode.getBlockLocations(
  332.                                   file1.toString(), 0, Long.MAX_VALUE);
  333.       System.out.println("locations = " + locations.locatedBlockCount());
  334.       assertTrue("Error blocks were not cleaned up",
  335.                  locations.locatedBlockCount() == 0);
  336.     } finally {
  337.       cluster.shutdown();
  338.       client.close();
  339.     }
  340.   }
  341.   /**
  342.    * Test that the filesystem removes the last block from a file if its
  343.    * lease expires.
  344.    */
  345.   public void testFileCreationError2() throws IOException {
  346.     long leasePeriod = 1000;
  347.     System.out.println("testFileCreationError2 start");
  348.     Configuration conf = new Configuration();
  349.     conf.setInt("heartbeat.recheck.interval", 1000);
  350.     conf.setInt("dfs.heartbeat.interval", 1);
  351.     if (simulatedStorage) {
  352.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  353.     }
  354.     // create cluster
  355.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  356.     DistributedFileSystem dfs = null;
  357.     try {
  358.       cluster.waitActive();
  359.       dfs = (DistributedFileSystem)cluster.getFileSystem();
  360.       DFSClient client = dfs.dfs;
  361.       // create a new file.
  362.       //
  363.       Path file1 = new Path("/filestatus.dat");
  364.       createFile(dfs, file1, 1);
  365.       System.out.println("testFileCreationError2: "
  366.                          + "Created file filestatus.dat with one replicas.");
  367.       LocatedBlocks locations = client.namenode.getBlockLocations(
  368.                                   file1.toString(), 0, Long.MAX_VALUE);
  369.       System.out.println("testFileCreationError2: "
  370.           + "The file has " + locations.locatedBlockCount() + " blocks.");
  371.       // add another block to the file
  372.       LocatedBlock location = client.namenode.addBlock(file1.toString(), 
  373.           client.clientName);
  374.       System.out.println("testFileCreationError2: "
  375.           + "Added block " + location.getBlock());
  376.       locations = client.namenode.getBlockLocations(file1.toString(), 
  377.                                                     0, Long.MAX_VALUE);
  378.       int count = locations.locatedBlockCount();
  379.       System.out.println("testFileCreationError2: "
  380.           + "The file now has " + count + " blocks.");
  381.       
  382.       // set the soft and hard limit to be 1 second so that the
  383.       // namenode triggers lease recovery
  384.       cluster.setLeasePeriod(leasePeriod, leasePeriod);
  385.       // wait for the lease to expire
  386.       try {
  387.         Thread.sleep(5 * leasePeriod);
  388.       } catch (InterruptedException e) {
  389.       }
  390.       // verify that the last block was synchronized.
  391.       locations = client.namenode.getBlockLocations(file1.toString(), 
  392.                                                     0, Long.MAX_VALUE);
  393.       System.out.println("testFileCreationError2: "
  394.           + "locations = " + locations.locatedBlockCount());
  395.       assertEquals(0, locations.locatedBlockCount());
  396.       System.out.println("testFileCreationError2 successful");
  397.     } finally {
  398.       IOUtils.closeStream(dfs);
  399.       cluster.shutdown();
  400.     }
  401.   }
  402.   /**
  403.    * Test that file leases are persisted across namenode restarts.
  404.    * This test is currently not triggered because more HDFS work is 
  405.    * is needed to handle persistent leases.
  406.    */
  407.   public void xxxtestFileCreationNamenodeRestart() throws IOException {
  408.     Configuration conf = new Configuration();
  409.     final int MAX_IDLE_TIME = 2000; // 2s
  410.     conf.setInt("ipc.client.connection.maxidletime", MAX_IDLE_TIME);
  411.     conf.setInt("heartbeat.recheck.interval", 1000);
  412.     conf.setInt("dfs.heartbeat.interval", 1);
  413.     if (simulatedStorage) {
  414.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  415.     }
  416.     // create cluster
  417.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  418.     FileSystem fs = null;
  419.     try {
  420.       cluster.waitActive();
  421.       fs = cluster.getFileSystem();
  422.       final int nnport = cluster.getNameNodePort();
  423.       // create a new file.
  424.       Path file1 = new Path("/filestatus.dat");
  425.       FSDataOutputStream stm = createFile(fs, file1, 1);
  426.       System.out.println("testFileCreationNamenodeRestart: "
  427.                          + "Created file " + file1);
  428.       // write two full blocks.
  429.       writeFile(stm, numBlocks * blockSize);
  430.       stm.sync();
  431.       // rename file wile keeping it open.
  432.       Path fileRenamed = new Path("/filestatusRenamed.dat");
  433.       fs.rename(file1, fileRenamed);
  434.       System.out.println("testFileCreationNamenodeRestart: "
  435.                          + "Renamed file " + file1 + " to " +
  436.                          fileRenamed);
  437.       file1 = fileRenamed;
  438.       // create another new file.
  439.       //
  440.       Path file2 = new Path("/filestatus2.dat");
  441.       FSDataOutputStream stm2 = createFile(fs, file2, 1);
  442.       System.out.println("testFileCreationNamenodeRestart: "
  443.                          + "Created file " + file2);
  444.       // create yet another new file with full path name. 
  445.       // rename it while open
  446.       //
  447.       Path file3 = new Path("/user/home/fullpath.dat");
  448.       FSDataOutputStream stm3 = createFile(fs, file3, 1);
  449.       System.out.println("testFileCreationNamenodeRestart: "
  450.                          + "Created file " + file3);
  451.       Path file4 = new Path("/user/home/fullpath4.dat");
  452.       FSDataOutputStream stm4 = createFile(fs, file4, 1);
  453.       System.out.println("testFileCreationNamenodeRestart: "
  454.                          + "Created file " + file4);
  455.       fs.mkdirs(new Path("/bin"));
  456.       fs.rename(new Path("/user/home"), new Path("/bin"));
  457.       Path file3new = new Path("/bin/home/fullpath.dat");
  458.       System.out.println("testFileCreationNamenodeRestart: "
  459.                          + "Renamed file " + file3 + " to " +
  460.                          file3new);
  461.       Path file4new = new Path("/bin/home/fullpath4.dat");
  462.       System.out.println("testFileCreationNamenodeRestart: "
  463.                          + "Renamed file " + file4 + " to " +
  464.                          file4new);
  465.       // restart cluster with the same namenode port as before.
  466.       // This ensures that leases are persisted in fsimage.
  467.       cluster.shutdown();
  468.       try {
  469.         Thread.sleep(2*MAX_IDLE_TIME);
  470.       } catch (InterruptedException e) {
  471.       }
  472.       cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
  473.                                    null, null, null);
  474.       cluster.waitActive();
  475.       // restart cluster yet again. This triggers the code to read in
  476.       // persistent leases from fsimage.
  477.       cluster.shutdown();
  478.       try {
  479.         Thread.sleep(5000);
  480.       } catch (InterruptedException e) {
  481.       }
  482.       cluster = new MiniDFSCluster(nnport, conf, 1, false, true, 
  483.                                    null, null, null);
  484.       cluster.waitActive();
  485.       fs = cluster.getFileSystem();
  486.       // instruct the dfsclient to use a new filename when it requests
  487.       // new blocks for files that were renamed.
  488.       DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
  489.                                                  (stm.getWrappedStream());
  490.       dfstream.setTestFilename(file1.toString());
  491.       dfstream = (DFSClient.DFSOutputStream) (stm3.getWrappedStream());
  492.       dfstream.setTestFilename(file3new.toString());
  493.       dfstream = (DFSClient.DFSOutputStream) (stm4.getWrappedStream());
  494.       dfstream.setTestFilename(file4new.toString());
  495.       // write 1 byte to file.  This should succeed because the 
  496.       // namenode should have persisted leases.
  497.       byte[] buffer = AppendTestUtil.randomBytes(seed, 1);
  498.       stm.write(buffer);
  499.       stm.close();
  500.       stm2.write(buffer);
  501.       stm2.close();
  502.       stm3.close();
  503.       stm4.close();
  504.       // verify that new block is associated with this file
  505.       DFSClient client = ((DistributedFileSystem)fs).dfs;
  506.       LocatedBlocks locations = client.namenode.getBlockLocations(
  507.                                   file1.toString(), 0, Long.MAX_VALUE);
  508.       System.out.println("locations = " + locations.locatedBlockCount());
  509.       assertTrue("Error blocks were not cleaned up for file " + file1,
  510.                  locations.locatedBlockCount() == 3);
  511.       // verify filestatus2.dat
  512.       locations = client.namenode.getBlockLocations(
  513.                                   file2.toString(), 0, Long.MAX_VALUE);
  514.       System.out.println("locations = " + locations.locatedBlockCount());
  515.       assertTrue("Error blocks were not cleaned up for file " + file2,
  516.                  locations.locatedBlockCount() == 1);
  517.     } finally {
  518.       IOUtils.closeStream(fs);
  519.       cluster.shutdown();
  520.     }
  521.   }
  522.   /**
  523.    * Test that all open files are closed when client dies abnormally.
  524.    */
  525.   public void testDFSClientDeath() throws IOException {
  526.     Configuration conf = new Configuration();
  527.     System.out.println("Testing adbornal client death.");
  528.     if (simulatedStorage) {
  529.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  530.     }
  531.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  532.     FileSystem fs = cluster.getFileSystem();
  533.     DistributedFileSystem dfs = (DistributedFileSystem) fs;
  534.     DFSClient dfsclient = dfs.dfs;
  535.     try {
  536.       // create a new file in home directory. Do not close it.
  537.       //
  538.       Path file1 = new Path("/clienttest.dat");
  539.       FSDataOutputStream stm = createFile(fs, file1, 1);
  540.       System.out.println("Created file clienttest.dat");
  541.       // write to file
  542.       writeFile(stm);
  543.       // close the dfsclient before closing the output stream.
  544.       // This should close all existing file.
  545.       dfsclient.close();
  546.       // reopen file system and verify that file exists.
  547.       assertTrue(file1 + " does not exist.", 
  548.           AppendTestUtil.createHdfsWithDifferentUsername(conf).exists(file1));
  549.     } finally {
  550.       cluster.shutdown();
  551.     }
  552.   }
  553. /**
  554.  * Test that file data becomes available before file is closed.
  555.  */
  556.   public void testFileCreationSimulated() throws IOException {
  557.     simulatedStorage = true;
  558.     testFileCreation();
  559.     simulatedStorage = false;
  560.   }
  561.   /**
  562.    * Test creating two files at the same time. 
  563.    */
  564.   public void testConcurrentFileCreation() throws IOException {
  565.     Configuration conf = new Configuration();
  566.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  567.     try {
  568.       FileSystem fs = cluster.getFileSystem();
  569.       
  570.       Path[] p = {new Path("/foo"), new Path("/bar")};
  571.       
  572.       //write 2 files at the same time
  573.       FSDataOutputStream[] out = {fs.create(p[0]), fs.create(p[1])};
  574.       int i = 0;
  575.       for(; i < 100; i++) {
  576.         out[0].write(i);
  577.         out[1].write(i);
  578.       }
  579.       out[0].close();
  580.       for(; i < 200; i++) {out[1].write(i);}
  581.       out[1].close();
  582.       //verify
  583.       FSDataInputStream[] in = {fs.open(p[0]), fs.open(p[1])};  
  584.       for(i = 0; i < 100; i++) {assertEquals(i, in[0].read());}
  585.       for(i = 0; i < 200; i++) {assertEquals(i, in[1].read());}
  586.     } finally {
  587.       if (cluster != null) {cluster.shutdown();}
  588.     }
  589.   }
  590.   /**
  591.    * Create a file, write something, fsync but not close.
  592.    * Then change lease period and wait for lease recovery.
  593.    * Finally, read the block directly from each Datanode and verify the content.
  594.    */
  595.   public void testLeaseExpireHardLimit() throws Exception {
  596.     System.out.println("testLeaseExpireHardLimit start");
  597.     final long leasePeriod = 1000;
  598.     final int DATANODE_NUM = 3;
  599.     Configuration conf = new Configuration();
  600.     conf.setInt("heartbeat.recheck.interval", 1000);
  601.     conf.setInt("dfs.heartbeat.interval", 1);
  602.     // create cluster
  603.     MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
  604.     DistributedFileSystem dfs = null;
  605.     try {
  606.       cluster.waitActive();
  607.       dfs = (DistributedFileSystem)cluster.getFileSystem();
  608.       // create a new file.
  609.       final String f = DIR + "foo";
  610.       final Path fpath = new Path(f);
  611.       FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
  612.       out.write("something".getBytes());
  613.       out.sync();
  614.       // set the soft and hard limit to be 1 second so that the
  615.       // namenode triggers lease recovery
  616.       cluster.setLeasePeriod(leasePeriod, leasePeriod);
  617.       // wait for the lease to expire
  618.       try {Thread.sleep(5 * leasePeriod);} catch (InterruptedException e) {}
  619.       LocatedBlocks locations = dfs.dfs.namenode.getBlockLocations(
  620.           f, 0, Long.MAX_VALUE);
  621.       assertEquals(1, locations.locatedBlockCount());
  622.       LocatedBlock locatedblock = locations.getLocatedBlocks().get(0);
  623.       int successcount = 0;
  624.       for(DatanodeInfo datanodeinfo: locatedblock.getLocations()) {
  625.         DataNode datanode = cluster.getDataNode(datanodeinfo.ipcPort);
  626.         FSDataset dataset = (FSDataset)datanode.data;
  627.         Block b = dataset.getStoredBlock(locatedblock.getBlock().getBlockId());
  628.         File blockfile = dataset.findBlockFile(b.getBlockId());
  629.         System.out.println("blockfile=" + blockfile);
  630.         if (blockfile != null) {
  631.           BufferedReader in = new BufferedReader(new FileReader(blockfile));
  632.           assertEquals("something", in.readLine());
  633.           in.close();
  634.           successcount++;
  635.         }
  636.       }
  637.       System.out.println("successcount=" + successcount);
  638.       assertTrue(successcount > 0); 
  639.     } finally {
  640.       IOUtils.closeStream(dfs);
  641.       cluster.shutdown();
  642.     }
  643.     System.out.println("testLeaseExpireHardLimit successful");
  644.   }
  645.   // test closing file system before all file handles are closed.
  646.   public void testFsClose() throws Exception {
  647.     System.out.println("test file system close start");
  648.     final int DATANODE_NUM = 3;
  649.     Configuration conf = new Configuration();
  650.     // create cluster
  651.     MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
  652.     DistributedFileSystem dfs = null;
  653.     try {
  654.       cluster.waitActive();
  655.       dfs = (DistributedFileSystem)cluster.getFileSystem();
  656.       // create a new file.
  657.       final String f = DIR + "foofs";
  658.       final Path fpath = new Path(f);
  659.       FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM);
  660.       out.write("something".getBytes());
  661.       // close file system without closing file
  662.       dfs.close();
  663.     } finally {
  664.       System.out.println("testFsClose successful");
  665.     }
  666.   }
  667. }