TestFSInputChecker.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import junit.framework.TestCase;
  20. import java.io.*;
  21. import java.util.Random;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.fs.ChecksumException;
  24. import org.apache.hadoop.fs.ChecksumFileSystem;
  25. import org.apache.hadoop.fs.FSDataInputStream;
  26. import org.apache.hadoop.fs.FSDataOutputStream;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.LocalFileSystem;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.fs.permission.FsPermission;
  31. import org.apache.hadoop.io.IOUtils;
  32. /**
  33.  * This class tests if FSInputChecker works correctly.
  34.  */
  35. public class TestFSInputChecker extends TestCase {
  36.   static final long seed = 0xDEADBEEFL;
  37.   static final int BYTES_PER_SUM = 10;
  38.   static final int BLOCK_SIZE = 2*BYTES_PER_SUM;
  39.   static final int HALF_CHUNK_SIZE = BYTES_PER_SUM/2;
  40.   static final int FILE_SIZE = 2*BLOCK_SIZE-1;
  41.   static final short NUM_OF_DATANODES = 2;
  42.   byte[] expected = new byte[FILE_SIZE];
  43.   byte[] actual;
  44.   FSDataInputStream stm;
  45.   Random rand = new Random(seed);
  46.   /* create a file */
  47.   private void writeFile(FileSystem fileSys, Path name) throws IOException {
  48.     // create and write a file that contains three blocks of data
  49.     FSDataOutputStream stm = fileSys.create(name, new FsPermission((short)0777),
  50.         true, fileSys.getConf().getInt("io.file.buffer.size", 4096),
  51.         (short)NUM_OF_DATANODES, BLOCK_SIZE, null);
  52.     stm.write(expected);
  53.     stm.close();
  54.   }
  55.   
  56.   /*validate data*/
  57.   private void checkAndEraseData(byte[] actual, int from, byte[] expected, 
  58.       String message) throws Exception {
  59.     for (int idx = 0; idx < actual.length; idx++) {
  60.       assertEquals(message+" byte "+(from+idx)+" differs. expected "+
  61.                         expected[from+idx]+" actual "+actual[idx],
  62.                         actual[idx], expected[from+idx]);
  63.       actual[idx] = 0;
  64.     }
  65.   }
  66.   
  67.   /* test read and getPos */
  68.   private void checkReadAndGetPos() throws Exception {
  69.     actual = new byte[FILE_SIZE];
  70.     // test reads that do not cross checksum boundary
  71.     stm.seek(0);
  72.     int offset;
  73.     for(offset=0; offset<BLOCK_SIZE+BYTES_PER_SUM;
  74.                   offset += BYTES_PER_SUM ) {
  75.       assertEquals(stm.getPos(), offset);
  76.       stm.readFully(actual, offset, BYTES_PER_SUM);
  77.     }
  78.     stm.readFully(actual, offset, FILE_SIZE-BLOCK_SIZE-BYTES_PER_SUM);
  79.     assertEquals(stm.getPos(), FILE_SIZE);
  80.     checkAndEraseData(actual, 0, expected, "Read Sanity Test");
  81.     
  82.     // test reads that cross checksum boundary
  83.     stm.seek(0L);
  84.     assertEquals(stm.getPos(), 0L);
  85.     stm.readFully(actual, 0, HALF_CHUNK_SIZE);
  86.     assertEquals(stm.getPos(), HALF_CHUNK_SIZE);
  87.     stm.readFully(actual, HALF_CHUNK_SIZE, BLOCK_SIZE-HALF_CHUNK_SIZE);
  88.     assertEquals(stm.getPos(), BLOCK_SIZE);
  89.     stm.readFully(actual, BLOCK_SIZE, BYTES_PER_SUM+HALF_CHUNK_SIZE);
  90.     assertEquals(stm.getPos(), BLOCK_SIZE+BYTES_PER_SUM+HALF_CHUNK_SIZE);
  91.     stm.readFully(actual, 2*BLOCK_SIZE-HALF_CHUNK_SIZE, 
  92.         FILE_SIZE-(2*BLOCK_SIZE-HALF_CHUNK_SIZE));
  93.     assertEquals(stm.getPos(), FILE_SIZE);
  94.     checkAndEraseData(actual, 0, expected, "Read Sanity Test");
  95.     
  96.     // test read that cross block boundary
  97.     stm.seek(0L);
  98.     stm.readFully(actual, 0, BYTES_PER_SUM+HALF_CHUNK_SIZE);
  99.     assertEquals(stm.getPos(), BYTES_PER_SUM+HALF_CHUNK_SIZE);
  100.     stm.readFully(actual, BYTES_PER_SUM+HALF_CHUNK_SIZE, BYTES_PER_SUM);
  101.     assertEquals(stm.getPos(), BLOCK_SIZE+HALF_CHUNK_SIZE);
  102.     stm.readFully(actual, BLOCK_SIZE+HALF_CHUNK_SIZE,
  103.         FILE_SIZE-BLOCK_SIZE-HALF_CHUNK_SIZE);
  104.     assertEquals(stm.getPos(), FILE_SIZE);
  105.     checkAndEraseData(actual, 0, expected, "Read Sanity Test");
  106.   }
  107.   
  108.   /* test if one seek is correct */
  109.   private void testSeek1(int offset) 
  110.   throws Exception {
  111.     stm.seek(offset);
  112.     assertEquals(offset, stm.getPos());
  113.     stm.readFully(actual);
  114.     checkAndEraseData(actual, offset, expected, "Read Sanity Test");
  115.   }
  116.   /* test seek() */
  117.   private void checkSeek( ) throws Exception {
  118.     actual = new byte[HALF_CHUNK_SIZE];
  119.     
  120.     // test seeks to checksum boundary
  121.     testSeek1(0);
  122.     testSeek1(BYTES_PER_SUM);
  123.     testSeek1(BLOCK_SIZE);
  124.     
  125.     // test seek to non-checksum-boundary pos
  126.     testSeek1(BLOCK_SIZE+HALF_CHUNK_SIZE);
  127.     testSeek1(HALF_CHUNK_SIZE);
  128.     
  129.     // test seek to a position at the same checksum chunk
  130.     testSeek1(HALF_CHUNK_SIZE/2);
  131.     testSeek1(HALF_CHUNK_SIZE*3/2);
  132.     
  133.     // test end of file
  134.     actual = new byte[1];
  135.     testSeek1(FILE_SIZE-1);
  136.     
  137.     String errMsg = null;
  138.     try {
  139.       stm.seek(FILE_SIZE);
  140.     } catch (IOException e) {
  141.       errMsg = e.getMessage();
  142.     }
  143.     assertTrue(errMsg==null);
  144.   }
  145.   /* test if one skip is correct */
  146.   private void testSkip1(int skippedBytes) 
  147.   throws Exception {
  148.     long oldPos = stm.getPos();
  149.     long nSkipped = stm.skip(skippedBytes);
  150.     long newPos = oldPos+nSkipped;
  151.     assertEquals(stm.getPos(), newPos);
  152.     stm.readFully(actual);
  153.     checkAndEraseData(actual, (int)newPos, expected, "Read Sanity Test");
  154.   }
  155.   /* test skip() */
  156.   private void checkSkip( ) throws Exception {
  157.     actual = new byte[HALF_CHUNK_SIZE];
  158.     
  159.     // test skip to a checksum boundary
  160.     stm.seek(0);
  161.     testSkip1(BYTES_PER_SUM);
  162.     testSkip1(HALF_CHUNK_SIZE);
  163.     testSkip1(HALF_CHUNK_SIZE);
  164.     
  165.     // test skip to non-checksum-boundary pos
  166.     stm.seek(0);
  167.     testSkip1(HALF_CHUNK_SIZE+1);
  168.     testSkip1(BYTES_PER_SUM);
  169.     testSkip1(HALF_CHUNK_SIZE);
  170.     
  171.     // test skip to a position at the same checksum chunk
  172.     stm.seek(0);
  173.     testSkip1(1);
  174.     testSkip1(1);
  175.     
  176.     // test skip to end of file
  177.     stm.seek(0);
  178.     actual = new byte[1];
  179.     testSkip1(FILE_SIZE-1);
  180.     
  181.     stm.seek(0);
  182.     assertEquals(stm.skip(FILE_SIZE), FILE_SIZE);
  183.     assertEquals(stm.skip(10), 0);
  184.     
  185.     stm.seek(0);
  186.     assertEquals(stm.skip(FILE_SIZE+10), FILE_SIZE);
  187.     stm.seek(10);
  188.     assertEquals(stm.skip(FILE_SIZE), FILE_SIZE-10);
  189.   }
  190.   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
  191.     assertTrue(fileSys.exists(name));
  192.     fileSys.delete(name, true);
  193.     assertTrue(!fileSys.exists(name));
  194.   }
  195.   
  196.   /**
  197.    * Tests read/seek/getPos/skipped opeation for input stream.
  198.    */
  199.   private void testChecker(ChecksumFileSystem fileSys, boolean readCS)
  200.   throws Exception {
  201.     Path file = new Path("try.dat");
  202.     if( readCS ) {
  203.       writeFile(fileSys, file);
  204.     } else {
  205.       writeFile(fileSys.getRawFileSystem(), file);
  206.     }
  207.     stm = fileSys.open(file);
  208.     checkReadAndGetPos();
  209.     checkSeek();
  210.     checkSkip();
  211.     //checkMark
  212.     assertFalse(stm.markSupported());
  213.     stm.close();
  214.     cleanupFile(fileSys, file);
  215.   }
  216.   
  217.   private void testFileCorruption(LocalFileSystem fileSys) throws IOException {
  218.     // create a file and verify that checksum corruption results in 
  219.     // a checksum exception on LocalFS
  220.     
  221.     String dir = System.getProperty("test.build.data", ".");
  222.     Path file = new Path(dir + "/corruption-test.dat");
  223.     Path crcFile = new Path(dir + "/.corruption-test.dat.crc");
  224.     
  225.     writeFile(fileSys, file);
  226.     
  227.     int fileLen = (int)fileSys.getFileStatus(file).getLen();
  228.     
  229.     byte [] buf = new byte[fileLen];
  230.     InputStream in = fileSys.open(file);
  231.     IOUtils.readFully(in, buf, 0, buf.length);
  232.     in.close();
  233.     
  234.     // check .crc corruption
  235.     checkFileCorruption(fileSys, file, crcFile);
  236.     fileSys.delete(file, true);
  237.     
  238.     writeFile(fileSys, file);
  239.     
  240.     // check data corrutpion
  241.     checkFileCorruption(fileSys, file, file);
  242.     
  243.     fileSys.delete(file, true);
  244.   }
  245.   
  246.   private void checkFileCorruption(LocalFileSystem fileSys, Path file, 
  247.                                    Path fileToCorrupt) throws IOException {
  248.     
  249.     // corrupt the file 
  250.     RandomAccessFile out = 
  251.       new RandomAccessFile(new File(fileToCorrupt.toString()), "rw");
  252.     
  253.     byte[] buf = new byte[(int)fileSys.getFileStatus(file).getLen()];    
  254.     int corruptFileLen = (int)fileSys.getFileStatus(fileToCorrupt).getLen();
  255.     assertTrue(buf.length >= corruptFileLen);
  256.     
  257.     rand.nextBytes(buf);
  258.     out.seek(corruptFileLen/2);
  259.     out.write(buf, 0, corruptFileLen/4);
  260.     out.close();
  261.     boolean gotException = false;
  262.     
  263.     InputStream in = fileSys.open(file);
  264.     try {
  265.       IOUtils.readFully(in, buf, 0, buf.length);
  266.     } catch (ChecksumException e) {
  267.       gotException = true;
  268.     }
  269.     assertTrue(gotException);
  270.     in.close();    
  271.   }
  272.   
  273.   public void testFSInputChecker() throws Exception {
  274.     Configuration conf = new Configuration();
  275.     conf.setLong("dfs.block.size", BLOCK_SIZE);
  276.     conf.setInt("io.bytes.per.checksum", BYTES_PER_SUM);
  277.     conf.set("fs.hdfs.impl",
  278.              "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
  279.     rand.nextBytes(expected);
  280.     // test DFS
  281.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  282.     ChecksumFileSystem fileSys = (ChecksumFileSystem)cluster.getFileSystem();
  283.     try {
  284.       testChecker(fileSys, true);
  285.       testChecker(fileSys, false);
  286.       testSeekAndRead(fileSys);
  287.     } finally {
  288.       fileSys.close();
  289.       cluster.shutdown();
  290.     }
  291.     
  292.     
  293.     // test Local FS
  294.     fileSys = FileSystem.getLocal(conf);
  295.     try {
  296.       testChecker(fileSys, true);
  297.       testChecker(fileSys, false);
  298.       testFileCorruption((LocalFileSystem)fileSys);
  299.       testSeekAndRead(fileSys);
  300.     }finally {
  301.       fileSys.close();
  302.     }
  303.   }
  304.   private void testSeekAndRead(ChecksumFileSystem fileSys)
  305.   throws IOException {
  306.     Path file = new Path("try.dat");
  307.     writeFile(fileSys, file);
  308.     stm = fileSys.open(file,
  309.         fileSys.getConf().getInt("io.file.buffer.size", 4096));
  310.     checkSeekAndRead();
  311.     stm.close();
  312.     cleanupFile(fileSys, file);
  313.   }
  314.   private void checkSeekAndRead() throws IOException {
  315.     int position = 1;
  316.     int len = 2 * BYTES_PER_SUM - (int) position;
  317.     readAndCompare(stm, position, len);
  318.     position = BYTES_PER_SUM;
  319.     len = BYTES_PER_SUM;
  320.     readAndCompare(stm, position, len);
  321.   }
  322.   private void readAndCompare(FSDataInputStream in, int position, int len)
  323.       throws IOException {
  324.     byte[] b = new byte[len];
  325.     in.seek(position);
  326.     IOUtils.readFully(in, b, 0, b.length);
  327.     for (int i = 0; i < b.length; i++) {
  328.       assertEquals(expected[position + i], b[i]);
  329.     }
  330.   }
  331. }