TestReplication.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:17k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import junit.framework.TestCase;
  20. import java.io.*;
  21. import java.util.Iterator;
  22. import java.util.Random;
  23. import java.net.*;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.hdfs.protocol.ClientProtocol;
  28. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  29. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  30. import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
  31. import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
  32. import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
  33. import org.apache.hadoop.fs.FSDataOutputStream;
  34. import org.apache.hadoop.fs.FileSystem;
  35. import org.apache.hadoop.fs.Path;
  36. import org.apache.hadoop.fs.FileStatus;
  37. import org.apache.hadoop.fs.BlockLocation;
  38. /**
  39.  * This class tests the replication of a DFS file.
  40.  */
  41. public class TestReplication extends TestCase {
  42.   private static final long seed = 0xDEADBEEFL;
  43.   private static final int blockSize = 8192;
  44.   private static final int fileSize = 16384;
  45.   private static final String racks[] = new String[] {
  46.     "/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3"
  47.   };
  48.   private static final int numDatanodes = racks.length;
  49.   private static final Log LOG = LogFactory.getLog(
  50.                                        "org.apache.hadoop.hdfs.TestReplication");
  51.   private void writeFile(FileSystem fileSys, Path name, int repl)
  52.     throws IOException {
  53.     // create and write a file that contains three blocks of data
  54.     FSDataOutputStream stm = fileSys.create(name, true,
  55.                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
  56.                                             (short)repl, (long)blockSize);
  57.     byte[] buffer = new byte[fileSize];
  58.     Random rand = new Random(seed);
  59.     rand.nextBytes(buffer);
  60.     stm.write(buffer);
  61.     stm.close();
  62.   }
  63.   
  64.   /* check if there are at least two nodes are on the same rack */
  65.   private void checkFile(FileSystem fileSys, Path name, int repl)
  66.     throws IOException {
  67.     Configuration conf = fileSys.getConf();
  68.     ClientProtocol namenode = DFSClient.createNamenode(conf);
  69.       
  70.     waitForBlockReplication(name.toString(), namenode, 
  71.                             Math.min(numDatanodes, repl), -1);
  72.     
  73.     LocatedBlocks locations = namenode.getBlockLocations(name.toString(),0,
  74.                                                          Long.MAX_VALUE);
  75.     FileStatus stat = fileSys.getFileStatus(name);
  76.     BlockLocation[] blockLocations = fileSys.getFileBlockLocations(stat,0L,
  77.                                                          Long.MAX_VALUE);
  78.     // verify that rack locations match
  79.     assertTrue(blockLocations.length == locations.locatedBlockCount());
  80.     for (int i = 0; i < blockLocations.length; i++) {
  81.       LocatedBlock blk = locations.get(i);
  82.       DatanodeInfo[] datanodes = blk.getLocations();
  83.       String[] topologyPaths = blockLocations[i].getTopologyPaths();
  84.       assertTrue(topologyPaths.length == datanodes.length);
  85.       for (int j = 0; j < topologyPaths.length; j++) {
  86.         boolean found = false;
  87.         for (int k = 0; k < racks.length; k++) {
  88.           if (topologyPaths[j].startsWith(racks[k])) {
  89.             found = true;
  90.             break;
  91.           }
  92.         }
  93.         assertTrue(found);
  94.       }
  95.     }
  96.     boolean isOnSameRack = true, isNotOnSameRack = true;
  97.     for (LocatedBlock blk : locations.getLocatedBlocks()) {
  98.       DatanodeInfo[] datanodes = blk.getLocations();
  99.       if (datanodes.length <= 1) break;
  100.       if (datanodes.length == 2) {
  101.         isNotOnSameRack = !(datanodes[0].getNetworkLocation().equals(
  102.                                                                      datanodes[1].getNetworkLocation()));
  103.         break;
  104.       }
  105.       isOnSameRack = false;
  106.       isNotOnSameRack = false;
  107.       for (int i = 0; i < datanodes.length-1; i++) {
  108.         LOG.info("datanode "+ i + ": "+ datanodes[i].getName());
  109.         boolean onRack = false;
  110.         for( int j=i+1; j<datanodes.length; j++) {
  111.            if( datanodes[i].getNetworkLocation().equals(
  112.             datanodes[j].getNetworkLocation()) ) {
  113.              onRack = true;
  114.            }
  115.         }
  116.         if (onRack) {
  117.           isOnSameRack = true;
  118.         }
  119.         if (!onRack) {
  120.           isNotOnSameRack = true;                      
  121.         }
  122.         if (isOnSameRack && isNotOnSameRack) break;
  123.       }
  124.       if (!isOnSameRack || !isNotOnSameRack) break;
  125.     }
  126.     assertTrue(isOnSameRack);
  127.     assertTrue(isNotOnSameRack);
  128.   }
  129.   
  130.   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
  131.     assertTrue(fileSys.exists(name));
  132.     fileSys.delete(name, true);
  133.     assertTrue(!fileSys.exists(name));
  134.   }
  135.   /* 
  136.    * Test if Datanode reports bad blocks during replication request
  137.    */
  138.   public void testBadBlockReportOnTransfer() throws Exception {
  139.     Configuration conf = new Configuration();
  140.     FileSystem fs = null;
  141.     DFSClient dfsClient = null;
  142.     LocatedBlocks blocks = null;
  143.     int replicaCount = 0;
  144.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
  145.     cluster.waitActive();
  146.     fs = cluster.getFileSystem();
  147.     dfsClient = new DFSClient(new InetSocketAddress("localhost",
  148.                               cluster.getNameNodePort()), conf);
  149.   
  150.     // Create file with replication factor of 1
  151.     Path file1 = new Path("/tmp/testBadBlockReportOnTransfer/file1");
  152.     DFSTestUtil.createFile(fs, file1, 1024, (short)1, 0);
  153.     DFSTestUtil.waitReplication(fs, file1, (short)1);
  154.   
  155.     // Corrupt the block belonging to the created file
  156.     String block = DFSTestUtil.getFirstBlock(fs, file1).getBlockName();
  157.     cluster.corruptBlockOnDataNodes(block);
  158.   
  159.     // Increase replication factor, this should invoke transfer request
  160.     // Receiving datanode fails on checksum and reports it to namenode
  161.     fs.setReplication(file1, (short)2);
  162.   
  163.     // Now get block details and check if the block is corrupt
  164.     blocks = dfsClient.namenode.
  165.               getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  166.     while (blocks.get(0).isCorrupt() != true) {
  167.       try {
  168.         LOG.info("Waiting until block is marked as corrupt...");
  169.         Thread.sleep(1000);
  170.       } catch (InterruptedException ie) {
  171.       }
  172.       blocks = dfsClient.namenode.
  173.                 getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
  174.     }
  175.     replicaCount = blocks.get(0).getLocations().length;
  176.     assertTrue(replicaCount == 1);
  177.     cluster.shutdown();
  178.   }
  179.   
  180.   /**
  181.    * Tests replication in DFS.
  182.    */
  183.   public void runReplication(boolean simulated) throws IOException {
  184.     Configuration conf = new Configuration();
  185.     conf.setBoolean("dfs.replication.considerLoad", false);
  186.     if (simulated) {
  187.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  188.     }
  189.     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, racks);
  190.     cluster.waitActive();
  191.     
  192.     InetSocketAddress addr = new InetSocketAddress("localhost",
  193.                                                    cluster.getNameNodePort());
  194.     DFSClient client = new DFSClient(addr, conf);
  195.     
  196.     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
  197.     assertEquals("Number of Datanodes ", numDatanodes, info.length);
  198.     FileSystem fileSys = cluster.getFileSystem();
  199.     try {
  200.       Path file1 = new Path("/smallblocktest.dat");
  201.       writeFile(fileSys, file1, 3);
  202.       checkFile(fileSys, file1, 3);
  203.       cleanupFile(fileSys, file1);
  204.       writeFile(fileSys, file1, 10);
  205.       checkFile(fileSys, file1, 10);
  206.       cleanupFile(fileSys, file1);
  207.       writeFile(fileSys, file1, 4);
  208.       checkFile(fileSys, file1, 4);
  209.       cleanupFile(fileSys, file1);
  210.       writeFile(fileSys, file1, 1);
  211.       checkFile(fileSys, file1, 1);
  212.       cleanupFile(fileSys, file1);
  213.       writeFile(fileSys, file1, 2);
  214.       checkFile(fileSys, file1, 2);
  215.       cleanupFile(fileSys, file1);
  216.     } finally {
  217.       fileSys.close();
  218.       cluster.shutdown();
  219.     }
  220.   }
  221.   public void testReplicationSimulatedStorag() throws IOException {
  222.     runReplication(true);
  223.   }
  224.   
  225.   
  226.   public void testReplication() throws IOException {
  227.     runReplication(false);
  228.   }
  229.   
  230.   // Waits for all of the blocks to have expected replication
  231.   private void waitForBlockReplication(String filename, 
  232.                                        ClientProtocol namenode,
  233.                                        int expected, long maxWaitSec) 
  234.                                        throws IOException {
  235.     long start = System.currentTimeMillis();
  236.     
  237.     //wait for all the blocks to be replicated;
  238.     LOG.info("Checking for block replication for " + filename);
  239.     int iters = 0;
  240.     while (true) {
  241.       boolean replOk = true;
  242.       LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, 
  243.                                                         Long.MAX_VALUE);
  244.       
  245.       for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
  246.            iter.hasNext();) {
  247.         LocatedBlock block = iter.next();
  248.         int actual = block.getLocations().length;
  249.         if ( actual < expected ) {
  250.           if (true || iters > 0) {
  251.             LOG.info("Not enough replicas for " + block.getBlock() +
  252.                                " yet. Expecting " + expected + ", got " + 
  253.                                actual + ".");
  254.           }
  255.           replOk = false;
  256.           break;
  257.         }
  258.       }
  259.       
  260.       if (replOk) {
  261.         return;
  262.       }
  263.       
  264.       iters++;
  265.       
  266.       if (maxWaitSec > 0 && 
  267.           (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
  268.         throw new IOException("Timedout while waiting for all blocks to " +
  269.                               " be replicated for " + filename);
  270.       }
  271.       
  272.       try {
  273.         Thread.sleep(500);
  274.       } catch (InterruptedException ignored) {}
  275.     }
  276.   }
  277.   
  278.   /* This test makes sure that NameNode retries all the available blocks 
  279.    * for under replicated blocks. 
  280.    * 
  281.    * It creates a file with one block and replication of 4. It corrupts 
  282.    * two of the blocks and removes one of the replicas. Expected behaviour is
  283.    * that missing replica will be copied from one valid source.
  284.    */
  285.   public void testPendingReplicationRetry() throws IOException {
  286.     
  287.     MiniDFSCluster cluster = null;
  288.     int numDataNodes = 4;
  289.     String testFile = "/replication-test-file";
  290.     Path testPath = new Path(testFile);
  291.     
  292.     byte buffer[] = new byte[1024];
  293.     for (int i=0; i<buffer.length; i++) {
  294.       buffer[i] = '1';
  295.     }
  296.     
  297.     try {
  298.       Configuration conf = new Configuration();
  299.       conf.set("dfs.replication", Integer.toString(numDataNodes));
  300.       //first time format
  301.       cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
  302.                                    true, null, null);
  303.       cluster.waitActive();
  304.       DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
  305.                                             cluster.getNameNodePort()),
  306.                                             conf);
  307.       
  308.       OutputStream out = cluster.getFileSystem().create(testPath);
  309.       out.write(buffer);
  310.       out.close();
  311.       
  312.       waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
  313.       // get first block of the file.
  314.       String block = dfsClient.namenode.
  315.                        getBlockLocations(testFile, 0, Long.MAX_VALUE).
  316.                        get(0).getBlock().getBlockName();
  317.       
  318.       cluster.shutdown();
  319.       cluster = null;
  320.       
  321.       //Now mess up some of the replicas.
  322.       //Delete the first and corrupt the next two.
  323.       File baseDir = new File(System.getProperty("test.build.data"), 
  324.                                                  "dfs/data");
  325.       for (int i=0; i<25; i++) {
  326.         buffer[i] = '0';
  327.       }
  328.       
  329.       int fileCount = 0;
  330.       for (int i=0; i<6; i++) {
  331.         File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
  332.         LOG.info("Checking for file " + blockFile);
  333.         
  334.         if (blockFile.exists()) {
  335.           if (fileCount == 0) {
  336.             LOG.info("Deleting file " + blockFile);
  337.             assertTrue(blockFile.delete());
  338.           } else {
  339.             // corrupt it.
  340.             LOG.info("Corrupting file " + blockFile);
  341.             long len = blockFile.length();
  342.             assertTrue(len > 50);
  343.             RandomAccessFile blockOut = new RandomAccessFile(blockFile, "rw");
  344.             try {
  345.               blockOut.seek(len/3);
  346.               blockOut.write(buffer, 0, 25);
  347.             } finally {
  348.               blockOut.close();
  349.             }
  350.           }
  351.           fileCount++;
  352.         }
  353.       }
  354.       assertEquals(3, fileCount);
  355.       
  356.       /* Start the MiniDFSCluster with more datanodes since once a writeBlock
  357.        * to a datanode node fails, same block can not be written to it
  358.        * immediately. In our case some replication attempts will fail.
  359.        */
  360.       
  361.       LOG.info("Restarting minicluster after deleting a replica and corrupting 2 crcs");
  362.       conf = new Configuration();
  363.       conf.set("dfs.replication", Integer.toString(numDataNodes));
  364.       conf.set("dfs.replication.pending.timeout.sec", Integer.toString(2));
  365.       conf.set("dfs.datanode.block.write.timeout.sec", Integer.toString(5));
  366.       conf.set("dfs.safemode.threshold.pct", "0.75f"); // only 3 copies exist
  367.       
  368.       cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
  369.                                    true, null, null);
  370.       cluster.waitActive();
  371.       
  372.       dfsClient = new DFSClient(new InetSocketAddress("localhost",
  373.                                   cluster.getNameNodePort()),
  374.                                   conf);
  375.       
  376.       waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
  377.       
  378.     } finally {
  379.       if (cluster != null) {
  380.         cluster.shutdown();
  381.       }
  382.     }  
  383.   }
  384.   
  385.   /**
  386.    * Test if replication can detect mismatched length on-disk blocks
  387.    * @throws Exception
  388.    */
  389.   public void testReplicateLenMismatchedBlock() throws Exception {
  390.     MiniDFSCluster cluster = new MiniDFSCluster(new Configuration(), 2, true, null);
  391.     try {
  392.       cluster.waitActive();
  393.       // test truncated block
  394.       changeBlockLen(cluster, -1);
  395.       // test extended block
  396.       changeBlockLen(cluster, 1);
  397.     } finally {
  398.       cluster.shutdown();
  399.     }
  400.   }
  401.   
  402.   private void changeBlockLen(MiniDFSCluster cluster, 
  403.       int lenDelta) throws IOException, InterruptedException {
  404.     final Path fileName = new Path("/file1");
  405.     final short REPLICATION_FACTOR = (short)1;
  406.     final FileSystem fs = cluster.getFileSystem();
  407.     final int fileLen = fs.getConf().getInt("io.bytes.per.checksum", 512);
  408.     DFSTestUtil.createFile(fs, fileName, fileLen, REPLICATION_FACTOR, 0);
  409.     DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
  410.     String block = DFSTestUtil.getFirstBlock(fs, fileName).getBlockName();
  411.     // Change the length of a replica
  412.     for (int i=0; i<cluster.getDataNodes().size(); i++) {
  413.       if (TestDatanodeBlockScanner.changeReplicaLength(block, i, lenDelta)) {
  414.         break;
  415.       }
  416.     }
  417.     // increase the file's replication factor
  418.     fs.setReplication(fileName, (short)(REPLICATION_FACTOR+1));
  419.     // block replication triggers corrupt block detection
  420.     DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost", 
  421.         cluster.getNameNodePort()), fs.getConf());
  422.     LocatedBlocks blocks = dfsClient.namenode.getBlockLocations(
  423.         fileName.toString(), 0, fileLen);
  424.     if (lenDelta < 0) { // replica truncated
  425.      while (!blocks.get(0).isCorrupt() || 
  426.      REPLICATION_FACTOR != blocks.get(0).getLocations().length) {
  427.      Thread.sleep(100);
  428.      blocks = dfsClient.namenode.getBlockLocations(
  429.      fileName.toString(), 0, fileLen);
  430.      }
  431.     } else { // no corruption detected; block replicated
  432.      while (REPLICATION_FACTOR+1 != blocks.get(0).getLocations().length) {
  433.      Thread.sleep(100);
  434.      blocks = dfsClient.namenode.getBlockLocations(
  435.      fileName.toString(), 0, fileLen);
  436.      }
  437.     }
  438.     fs.delete(fileName, true);
  439.   }
  440. }