TestFileAppend3.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import java.io.RandomAccessFile;
  21. import junit.extensions.TestSetup;
  22. import junit.framework.Test;
  23. import junit.framework.TestSuite;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.FSDataOutputStream;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.hdfs.protocol.Block;
  28. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  29. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  30. import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
  31. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  32. import org.apache.hadoop.hdfs.server.datanode.FSDataset;
  33. import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
  34. /** This class implements some of tests posted in HADOOP-2658. */
  35. public class TestFileAppend3 extends junit.framework.TestCase {
  36.   static final long BLOCK_SIZE = 64 * 1024;
  37.   static final short REPLICATION = 3;
  38.   static final int DATANODE_NUM = 5;
  39.   private static Configuration conf;
  40.   private static int buffersize;
  41.   private static MiniDFSCluster cluster;
  42.   private static DistributedFileSystem fs;
  43.   public static Test suite() {
  44.     return new TestSetup(new TestSuite(TestFileAppend3.class)) {
  45.       protected void setUp() throws java.lang.Exception {
  46.         AppendTestUtil.LOG.info("setUp()");
  47.         conf = new Configuration();
  48.         conf.setInt("io.bytes.per.checksum", 512);
  49.         conf.setBoolean("dfs.support.append", true);
  50.         buffersize = conf.getInt("io.file.buffer.size", 4096);
  51.         cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null);
  52.         fs = (DistributedFileSystem)cluster.getFileSystem();
  53.       }
  54.     
  55.       protected void tearDown() throws Exception {
  56.         AppendTestUtil.LOG.info("tearDown()");
  57.         if(fs != null) fs.close();
  58.         if(cluster != null) cluster.shutdown();
  59.       }
  60.     };  
  61.   }
  62.   /** TC1: Append on block boundary. */
  63.   public void testTC1() throws Exception {
  64.     final Path p = new Path("/TC1/foo");
  65.     System.out.println("p=" + p);
  66.     //a. Create file and write one block of data. Close file.
  67.     final int len1 = (int)BLOCK_SIZE; 
  68.     {
  69.       FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
  70.       AppendTestUtil.write(out, 0, len1);
  71.       out.close();
  72.     }
  73.     //   Reopen file to append. Append half block of data. Close file.
  74.     final int len2 = (int)BLOCK_SIZE/2; 
  75.     {
  76.       FSDataOutputStream out = fs.append(p);
  77.       AppendTestUtil.write(out, len1, len2);
  78.       out.close();
  79.     }
  80.     
  81.     //b. Reopen file and read 1.5 blocks worth of data. Close file.
  82.     AppendTestUtil.check(fs, p, len1 + len2);
  83.   }
  84.   /** TC2: Append on non-block boundary. */
  85.   public void testTC2() throws Exception {
  86.     final Path p = new Path("/TC2/foo");
  87.     System.out.println("p=" + p);
  88.     //a. Create file with one and a half block of data. Close file.
  89.     final int len1 = (int)(BLOCK_SIZE + BLOCK_SIZE/2); 
  90.     {
  91.       FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
  92.       AppendTestUtil.write(out, 0, len1);
  93.       out.close();
  94.     }
  95.     //   Reopen file to append quarter block of data. Close file.
  96.     final int len2 = (int)BLOCK_SIZE/4; 
  97.     {
  98.       FSDataOutputStream out = fs.append(p);
  99.       AppendTestUtil.write(out, len1, len2);
  100.       out.close();
  101.     }
  102.     //b. Reopen file and read 1.75 blocks of data. Close file.
  103.     AppendTestUtil.check(fs, p, len1 + len2);
  104.   }
  105.   /** TC5: Only one simultaneous append. */
  106.   public void testTC5() throws Exception {
  107.     final Path p = new Path("/TC5/foo");
  108.     System.out.println("p=" + p);
  109.     //a. Create file on Machine M1. Write half block to it. Close file.
  110.     {
  111.       FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
  112.       AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2));
  113.       out.close();
  114.     }
  115.     //b. Reopen file in "append" mode on Machine M1.
  116.     FSDataOutputStream out = fs.append(p);
  117.     //c. On Machine M2, reopen file in "append" mode. This should fail.
  118.     try {
  119.       AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p);
  120.       fail("This should fail.");
  121.     } catch(IOException ioe) {
  122.       AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
  123.     }
  124.     //d. On Machine M1, close file.
  125.     out.close();        
  126.   }
  127.   /** TC7: Corrupted replicas are present. */
  128.   public void testTC7() throws Exception {
  129.     final short repl = 2;
  130.     final Path p = new Path("/TC7/foo");
  131.     System.out.println("p=" + p);
  132.     
  133.     //a. Create file with replication factor of 2. Write half block of data. Close file.
  134.     final int len1 = (int)(BLOCK_SIZE/2); 
  135.     {
  136.       FSDataOutputStream out = fs.create(p, false, buffersize, repl, BLOCK_SIZE);
  137.       AppendTestUtil.write(out, 0, len1);
  138.       out.close();
  139.     }
  140.     DFSTestUtil.waitReplication(fs, p, repl);
  141.     //b. Log into one datanode that has one replica of this block.
  142.     //   Find the block file on this datanode and truncate it to zero size.
  143.     final LocatedBlocks locatedblocks = fs.dfs.namenode.getBlockLocations(p.toString(), 0L, len1);
  144.     assertEquals(1, locatedblocks.locatedBlockCount());
  145.     final LocatedBlock lb = locatedblocks.get(0);
  146.     final Block blk = lb.getBlock();
  147.     assertEquals(len1, lb.getBlockSize());
  148.     DatanodeInfo[] datanodeinfos = lb.getLocations();
  149.     assertEquals(repl, datanodeinfos.length);
  150.     final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
  151.     final FSDataset data = (FSDataset)dn.getFSDataset();
  152.     final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk), "rw");
  153.     AppendTestUtil.LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
  154.     assertEquals(len1, raf.length());
  155.     raf.setLength(0);
  156.     raf.close();
  157.     //c. Open file in "append mode".  Append a new block worth of data. Close file.
  158.     final int len2 = (int)BLOCK_SIZE; 
  159.     {
  160.       FSDataOutputStream out = fs.append(p);
  161.       AppendTestUtil.write(out, len1, len2);
  162.       out.close();
  163.     }
  164.     //d. Reopen file and read two blocks worth of data.
  165.     AppendTestUtil.check(fs, p, len1 + len2);
  166.   }
  167.   /** TC11: Racing rename */
  168.   public void testTC11() throws Exception {
  169.     final Path p = new Path("/TC11/foo");
  170.     System.out.println("p=" + p);
  171.     //a. Create file and write one block of data. Close file.
  172.     final int len1 = (int)BLOCK_SIZE; 
  173.     {
  174.       FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
  175.       AppendTestUtil.write(out, 0, len1);
  176.       out.close();
  177.     }
  178.     //b. Reopen file in "append" mode. Append half block of data.
  179.     FSDataOutputStream out = fs.append(p);
  180.     final int len2 = (int)BLOCK_SIZE/2; 
  181.     AppendTestUtil.write(out, len1, len2);
  182.     
  183.     //c. Rename file to file.new.
  184.     final Path pnew = new Path(p + ".new");
  185.     assertTrue(fs.rename(p, pnew));
  186.     //d. Close file handle that was opened in (b). 
  187.     try {
  188.       out.close();
  189.       fail("close() should throw an exception");
  190.     } catch(Exception e) {
  191.       AppendTestUtil.LOG.info("GOOD!", e);
  192.     }
  193.     //wait for the lease recovery 
  194.     cluster.setLeasePeriod(1000, 1000);
  195.     AppendTestUtil.sleep(5000);
  196.     //check block sizes 
  197.     final long len = fs.getFileStatus(pnew).getLen();
  198.     final LocatedBlocks locatedblocks = fs.dfs.namenode.getBlockLocations(pnew.toString(), 0L, len);
  199.     final int numblock = locatedblocks.locatedBlockCount();
  200.     for(int i = 0; i < numblock; i++) {
  201.       final LocatedBlock lb = locatedblocks.get(i);
  202.       final Block blk = lb.getBlock();
  203.       final long size = lb.getBlockSize();
  204.       if (i < numblock - 1) {
  205.         assertEquals(BLOCK_SIZE, size);
  206.       }
  207.       for(DatanodeInfo datanodeinfo : lb.getLocations()) {
  208.         final DataNode dn = cluster.getDataNode(datanodeinfo.getIpcPort());
  209.         final BlockMetaDataInfo metainfo = dn.getBlockMetaDataInfo(blk);
  210.         assertEquals(size, metainfo.getNumBytes());
  211.       }
  212.     }
  213.   }
  214.   /** TC12: Append to partial CRC chunk */
  215.   public void testTC12() throws Exception {
  216.     final Path p = new Path("/TC12/foo");
  217.     System.out.println("p=" + p);
  218.     
  219.     //a. Create file with a block size of 64KB
  220.     //   and a default io.bytes.per.checksum of 512 bytes.
  221.     //   Write 25687 bytes of data. Close file.
  222.     final int len1 = 25687; 
  223.     {
  224.       FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION, BLOCK_SIZE);
  225.       AppendTestUtil.write(out, 0, len1);
  226.       out.close();
  227.     }
  228.     //b. Reopen file in "append" mode. Append another 5877 bytes of data. Close file.
  229.     final int len2 = 5877; 
  230.     {
  231.       FSDataOutputStream out = fs.append(p);
  232.       AppendTestUtil.write(out, len1, len2);
  233.       out.close();
  234.     }
  235.     //c. Reopen file and read 25687+5877 bytes of data from file. Close file.
  236.     AppendTestUtil.check(fs, p, len1 + len2);
  237.   }
  238. }