TestFileAppend2.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Arrays;
  22. import junit.framework.TestCase;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.fs.FSDataInputStream;
  25. import org.apache.hadoop.fs.FSDataOutputStream;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.fs.permission.FsPermission;
  29. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  30. import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
  31. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
  32. import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
  33. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  34. import org.apache.hadoop.io.IOUtils;
  35. import org.apache.hadoop.security.AccessControlException;
  36. import org.apache.hadoop.security.UnixUserGroupInformation;
  37. import org.apache.hadoop.security.UserGroupInformation;
  38. import org.apache.commons.logging.impl.Log4JLogger;
  39. import org.apache.log4j.Level;
  40. /**
  41.  * This class tests the building blocks that are needed to
  42.  * support HDFS appends.
  43.  */
  44. public class TestFileAppend2 extends TestCase {
  45.   {
  46.     ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
  47.     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  48.     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  49.     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
  50.     ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  51.   }
  52.   static final int blockSize = 1024;
  53.   static final int numBlocks = 5;
  54.   static final int fileSize = numBlocks * blockSize + 1;
  55.   boolean simulatedStorage = false;
  56.   private byte[] fileContents = null;
  57.   int numDatanodes = 5;
  58.   int numberOfFiles = 50;
  59.   int numThreads = 10;
  60.   int numAppendsPerThread = 20;
  61. /***
  62.   int numberOfFiles = 1;
  63.   int numThreads = 1;
  64.   int numAppendsPerThread = 2000;
  65. ****/
  66.   Workload[] workload = null;
  67.   ArrayList<Path> testFiles = new ArrayList<Path>();
  68.   volatile static boolean globalStatus = true;
  69.   //
  70.   // create a buffer that contains the entire test file data.
  71.   //
  72.   private void initBuffer(int size) {
  73.     long seed = AppendTestUtil.nextLong();
  74.     fileContents = AppendTestUtil.randomBytes(seed, size);
  75.   }
  76.   /*
  77.    * creates a file but does not close it
  78.    */ 
  79.   private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
  80.     throws IOException {
  81.     FSDataOutputStream stm = fileSys.create(name, true,
  82.                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
  83.                                             (short)repl, (long)blockSize);
  84.     return stm;
  85.   }
  86.   private void checkFile(FileSystem fs, Path name, int len) throws IOException {
  87.     FSDataInputStream stm = fs.open(name);
  88.     byte[] actual = new byte[len];
  89.     stm.readFully(0, actual);
  90.     checkData(actual, 0, fileContents, "Read 2");
  91.     stm.close();
  92.   }
  93.   private void checkFullFile(FileSystem fs, Path name) throws IOException {
  94.     checkFile(fs, name, fileSize);
  95.   }
  96.   private void checkData(byte[] actual, int from, byte[] expected, String message) {
  97.     for (int idx = 0; idx < actual.length; idx++) {
  98.       assertEquals(message+" byte "+(from+idx)+" differs. expected "+
  99.                    expected[from+idx]+" actual "+actual[idx],
  100.                    expected[from+idx], actual[idx]);
  101.       actual[idx] = 0;
  102.     }
  103.   }
  104.   /**
  105.    * Creates one file, writes a few bytes to it and then closed it.
  106.    * Reopens the same file for appending, write all blocks and then close.
  107.    * Verify that all data exists in file.
  108.    */ 
  109.   public void testSimpleAppend() throws IOException {
  110.     Configuration conf = new Configuration();
  111.     if (simulatedStorage) {
  112.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  113.     }
  114.     conf.setInt("dfs.datanode.handler.count", 50);
  115.     conf.setBoolean("dfs.support.append", true);
  116.     initBuffer(fileSize);
  117.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  118.     FileSystem fs = cluster.getFileSystem();
  119.     try {
  120.       { // test appending to a file.
  121.         // create a new file.
  122.         Path file1 = new Path("/simpleAppend.dat");
  123.         FSDataOutputStream stm = createFile(fs, file1, 1);
  124.         System.out.println("Created file simpleAppend.dat");
  125.   
  126.         // write to file
  127.         int mid = 186;   // io.bytes.per.checksum bytes
  128.         System.out.println("Writing " + mid + " bytes to file " + file1);
  129.         stm.write(fileContents, 0, mid);
  130.         stm.close();
  131.         System.out.println("Wrote and Closed first part of file.");
  132.   
  133.         // write to file
  134.         int mid2 = 607;   // io.bytes.per.checksum bytes
  135.         System.out.println("Writing " + mid + " bytes to file " + file1);
  136.         stm = fs.append(file1);
  137.         stm.write(fileContents, mid, mid2-mid);
  138.         stm.close();
  139.         System.out.println("Wrote and Closed second part of file.");
  140.   
  141.         // write the remainder of the file
  142.         stm = fs.append(file1);
  143.         // ensure getPos is set to reflect existing size of the file
  144.         assertTrue(stm.getPos() > 0);
  145.         System.out.println("Writing " + (fileSize - mid2) + " bytes to file " + file1);
  146.         stm.write(fileContents, mid2, fileSize - mid2);
  147.         System.out.println("Written second part of file");
  148.         stm.close();
  149.         System.out.println("Wrote and Closed second part of file.");
  150.   
  151.         // verify that entire file is good
  152.         checkFullFile(fs, file1);
  153.       }
  154.       { // test appending to an non-existing file.
  155.         FSDataOutputStream out = null;
  156.         try {
  157.           out = fs.append(new Path("/non-existing.dat"));
  158.           fail("Expected to have FileNotFoundException");
  159.         }
  160.         catch(java.io.FileNotFoundException fnfe) {
  161.           System.out.println("Good: got " + fnfe);
  162.           fnfe.printStackTrace(System.out);
  163.         }
  164.         finally {
  165.           IOUtils.closeStream(out);
  166.         }
  167.       }
  168.       { // test append permission.
  169.         //set root to all writable 
  170.         Path root = new Path("/");
  171.         fs.setPermission(root, new FsPermission((short)0777));
  172.         fs.close();
  173.         // login as a different user
  174.         final UserGroupInformation superuser = UserGroupInformation.getCurrentUGI();
  175.         String username = "testappenduser";
  176.         String group = "testappendgroup";
  177.         assertFalse(superuser.getUserName().equals(username));
  178.         assertFalse(Arrays.asList(superuser.getGroupNames()).contains(group));
  179.         UnixUserGroupInformation appenduser = UnixUserGroupInformation.createImmutable(
  180.             new String[]{username, group});
  181.         UnixUserGroupInformation.saveToConf(conf,
  182.             UnixUserGroupInformation.UGI_PROPERTY_NAME, appenduser);
  183.         fs = FileSystem.get(conf);
  184.         // create a file
  185.         Path dir = new Path(root, getClass().getSimpleName());
  186.         Path foo = new Path(dir, "foo.dat");
  187.         FSDataOutputStream out = null;
  188.         int offset = 0;
  189.         try {
  190.           out = fs.create(foo);
  191.           int len = 10 + AppendTestUtil.nextInt(100);
  192.           out.write(fileContents, offset, len);
  193.           offset += len;
  194.         }
  195.         finally {
  196.           IOUtils.closeStream(out);
  197.         }
  198.         // change dir and foo to minimal permissions.
  199.         fs.setPermission(dir, new FsPermission((short)0100));
  200.         fs.setPermission(foo, new FsPermission((short)0200));
  201.         // try append, should success
  202.         out = null;
  203.         try {
  204.           out = fs.append(foo);
  205.           int len = 10 + AppendTestUtil.nextInt(100);
  206.           out.write(fileContents, offset, len);
  207.           offset += len;
  208.         }
  209.         finally {
  210.           IOUtils.closeStream(out);
  211.         }
  212.         // change dir and foo to all but no write on foo.
  213.         fs.setPermission(foo, new FsPermission((short)0577));
  214.         fs.setPermission(dir, new FsPermission((short)0777));
  215.         // try append, should fail
  216.         out = null;
  217.         try {
  218.           out = fs.append(foo);
  219.           fail("Expected to have AccessControlException");
  220.         }
  221.         catch(AccessControlException ace) {
  222.           System.out.println("Good: got " + ace);
  223.           ace.printStackTrace(System.out);
  224.         }
  225.         finally {
  226.           IOUtils.closeStream(out);
  227.         }
  228.       }
  229.     } catch (IOException e) {
  230.       System.out.println("Exception :" + e);
  231.       throw e; 
  232.     } catch (Throwable e) {
  233.       System.out.println("Throwable :" + e);
  234.       e.printStackTrace();
  235.       throw new IOException("Throwable : " + e);
  236.     } finally {
  237.       fs.close();
  238.       cluster.shutdown();
  239.     }
  240.   }
  241.   //
  242.   // an object that does a bunch of appends to files
  243.   //
  244.   class Workload extends Thread {
  245.     private int id;
  246.     private MiniDFSCluster cluster;
  247.     Workload(MiniDFSCluster cluster, int threadIndex) {
  248.       id = threadIndex;
  249.       this.cluster = cluster;
  250.     }
  251.     // create a bunch of files. Write to them and then verify.
  252.     public void run() {
  253.       System.out.println("Workload " + id + " starting... ");
  254.       for (int i = 0; i < numAppendsPerThread; i++) {
  255.    
  256.         // pick a file at random and remove it from pool
  257.         Path testfile = null;
  258.         synchronized (testFiles) {
  259.           if (testFiles.size() == 0) {
  260.             System.out.println("Completed write to almost all files.");
  261.             return;  
  262.           }
  263.           int index = AppendTestUtil.nextInt(testFiles.size());
  264.           testfile = testFiles.remove(index);
  265.         }
  266.         long len = 0;
  267.         int sizeToAppend = 0;
  268.         try {
  269.           FileSystem fs = cluster.getFileSystem();
  270.           // add a random number of bytes to file
  271.           len = fs.getFileStatus(testfile).getLen();
  272.           // if file is already full, then pick another file
  273.           if (len >= fileSize) {
  274.             System.out.println("File " + testfile + " is full.");
  275.             continue;
  276.           }
  277.   
  278.           // do small size appends so that we can trigger multiple
  279.           // appends to the same file.
  280.           //
  281.           int left = (int)(fileSize - len)/3;
  282.           if (left <= 0) {
  283.             left = 1;
  284.           }
  285.           sizeToAppend = AppendTestUtil.nextInt(left);
  286.           System.out.println("Workload thread " + id +
  287.                              " appending " + sizeToAppend + " bytes " +
  288.                              " to file " + testfile +
  289.                              " of size " + len);
  290.           FSDataOutputStream stm = fs.append(testfile);
  291.           stm.write(fileContents, (int)len, sizeToAppend);
  292.           stm.close();
  293.           // wait for the file size to be reflected in the namenode metadata
  294.           while (fs.getFileStatus(testfile).getLen() != (len + sizeToAppend)) {
  295.             try {
  296.               System.out.println("Workload thread " + id +
  297.                                  " file " + testfile  +
  298.                                  " size " + fs.getFileStatus(testfile).getLen() +
  299.                                  " expected size " + (len + sizeToAppend) +
  300.                                  " waiting for namenode metadata update.");
  301.               Thread.sleep(5000);
  302.             } catch (InterruptedException e) { 
  303.             }
  304.           }
  305.           assertTrue("File " + testfile + " size is " + 
  306.                      fs.getFileStatus(testfile).getLen() +
  307.                      " but expected " + (len + sizeToAppend),
  308.                     fs.getFileStatus(testfile).getLen() == (len + sizeToAppend));
  309.           checkFile(fs, testfile, (int)(len + sizeToAppend));
  310.         } catch (Throwable e) {
  311.           globalStatus = false;
  312.           if (e != null && e.toString() != null) {
  313.             System.out.println("Workload exception " + id + 
  314.                                " testfile " + testfile +
  315.                                " " + e);
  316.             e.printStackTrace();
  317.           }
  318.           assertTrue("Workload exception " + id + " testfile " + testfile +
  319.                      " expected size " + (len + sizeToAppend),
  320.                      false);
  321.         }
  322.         // Add testfile back to the pool of files.
  323.         synchronized (testFiles) {
  324.           testFiles.add(testfile);
  325.         }
  326.       }
  327.     }
  328.   }
  329.   /**
  330.    * Test that appends to files at random offsets.
  331.    */
  332.   public void testComplexAppend() throws IOException {
  333.     initBuffer(fileSize);
  334.     Configuration conf = new Configuration();
  335.     conf.setInt("heartbeat.recheck.interval", 2000);
  336.     conf.setInt("dfs.heartbeat.interval", 2);
  337.     conf.setInt("dfs.replication.pending.timeout.sec", 2);
  338.     conf.setInt("dfs.socket.timeout", 30000);
  339.     conf.setInt("dfs.datanode.socket.write.timeout", 30000);
  340.     conf.setInt("dfs.datanode.handler.count", 50);
  341.     conf.setBoolean("dfs.support.append", true);
  342.     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, 
  343.                                                 true, null);
  344.     cluster.waitActive();
  345.     FileSystem fs = cluster.getFileSystem();
  346.     try {
  347.       // create a bunch of test files with random replication factors.
  348.       // Insert them into a linked list.
  349.       //
  350.       for (int i = 0; i < numberOfFiles; i++) {
  351.         short replication = (short)(AppendTestUtil.nextInt(numDatanodes) + 1);
  352.         Path testFile = new Path("/" + i + ".dat");
  353.         FSDataOutputStream stm = createFile(fs, testFile, replication);
  354.         stm.close();
  355.         testFiles.add(testFile);
  356.       }
  357.       // Create threads and make them run workload concurrently.
  358.       workload = new Workload[numThreads];
  359.       for (int i = 0; i < numThreads; i++) {
  360.         workload[i] = new Workload(cluster, i);
  361.         workload[i].start();
  362.       }
  363.       // wait for all transactions to get over
  364.       for (int i = 0; i < numThreads; i++) {
  365.         try {
  366.           System.out.println("Waiting for thread " + i + " to complete...");
  367.           workload[i].join();
  368.           System.out.println("Waiting for thread " + i + " complete.");
  369.         } catch (InterruptedException e) {
  370.           i--;      // retry
  371.         }
  372.       }
  373.     } finally {
  374.       fs.close();
  375.       cluster.shutdown();
  376.     }
  377.     // If any of the worker thread failed in their job, indicate that
  378.     // this test failed.
  379.     //
  380.     assertTrue("testComplexAppend Worker encountered exceptions.", globalStatus);
  381.   }
  382. }