TestInjectionForSimulatedStorage.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import junit.framework.TestCase;
  20. import java.io.*;
  21. import java.util.HashSet;
  22. import java.util.Set;
  23. import java.net.*;
  24. import org.apache.commons.logging.Log;
  25. import org.apache.commons.logging.LogFactory;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.fs.FSDataOutputStream;
  28. import org.apache.hadoop.fs.FileSystem;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.hdfs.protocol.Block;
  31. import org.apache.hadoop.hdfs.protocol.ClientProtocol;
  32. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  33. import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
  34. import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
  35. /**
  36.  * This class tests the replication and injection of blocks of a DFS file for simulated storage.
  37.  */
  38. public class TestInjectionForSimulatedStorage extends TestCase {
  39.   private int checksumSize = 16;
  40.   private int blockSize = checksumSize*2;
  41.   private int numBlocks = 4;
  42.   private int filesize = blockSize*numBlocks;
  43.   private int numDataNodes = 4;
  44.   private static final Log LOG = LogFactory.getLog(
  45.       "org.apache.hadoop.hdfs.TestInjectionForSimulatedStorage");
  46.   
  47.   private void writeFile(FileSystem fileSys, Path name, int repl)
  48.                                                 throws IOException {
  49.     // create and write a file that contains three blocks of data
  50.     FSDataOutputStream stm = fileSys.create(name, true,
  51.           fileSys.getConf().getInt("io.file.buffer.size", 4096),
  52.                                       (short)repl, (long)blockSize);
  53.     byte[] buffer = new byte[filesize];
  54.     for (int i=0; i<buffer.length; i++) {
  55.       buffer[i] = '1';
  56.     }
  57.     stm.write(buffer);
  58.     stm.close();
  59.   }
  60.   
  61.   // Waits for all of the blocks to have expected replication
  62.   // Waits for all of the blocks to have expected replication
  63.   private void waitForBlockReplication(String filename, 
  64.                                        ClientProtocol namenode,
  65.                                        int expected, long maxWaitSec) 
  66.                                        throws IOException {
  67.     long start = System.currentTimeMillis();
  68.     
  69.     //wait for all the blocks to be replicated;
  70.     LOG.info("Checking for block replication for " + filename);
  71.     
  72.     LocatedBlocks blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
  73.     assertEquals(numBlocks, blocks.locatedBlockCount());
  74.     
  75.     for (int i = 0; i < numBlocks; ++i) {
  76.       LOG.info("Checking for block:" + (i+1));
  77.       while (true) { // Loop to check for block i (usually when 0 is done all will be done
  78.         blocks = namenode.getBlockLocations(filename, 0, Long.MAX_VALUE);
  79.         assertEquals(numBlocks, blocks.locatedBlockCount());
  80.         LocatedBlock block = blocks.get(i);
  81.         int actual = block.getLocations().length;
  82.         if ( actual == expected ) {
  83.           LOG.info("Got enough replicas for " + (i+1) + "th block " + block.getBlock() +
  84.               ", got " + actual + ".");
  85.           break;
  86.         }
  87.         LOG.info("Not enough replicas for " + (i+1) + "th block " + block.getBlock() +
  88.                                " yet. Expecting " + expected + ", got " + 
  89.                                actual + ".");
  90.       
  91.         if (maxWaitSec > 0 && 
  92.             (System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
  93.           throw new IOException("Timedout while waiting for all blocks to " +
  94.                                 " be replicated for " + filename);
  95.         }
  96.       
  97.         try {
  98.           Thread.sleep(500);
  99.         } catch (InterruptedException ignored) {}
  100.       }
  101.     }
  102.   }
  103.  
  104.   
  105.   
  106.   /* This test makes sure that NameNode retries all the available blocks 
  107.    * for under replicated blocks. This test uses simulated storage and one
  108.    * of its features to inject blocks,
  109.    * 
  110.    * It creates a file with several blocks and replication of 4. 
  111.    * The cluster is then shut down - NN retains its state but the DNs are 
  112.    * all simulated and hence loose their blocks. 
  113.    * The blocks are then injected in one of the DNs. The  expected behaviour is
  114.    * that the NN will arrange for themissing replica will be copied from a valid source.
  115.    */
  116.   public void testInjection() throws IOException {
  117.     
  118.     MiniDFSCluster cluster = null;
  119.     String testFile = "/replication-test-file";
  120.     Path testPath = new Path(testFile);
  121.     
  122.     byte buffer[] = new byte[1024];
  123.     for (int i=0; i<buffer.length; i++) {
  124.       buffer[i] = '1';
  125.     }
  126.     
  127.     try {
  128.       Configuration conf = new Configuration();
  129.       conf.set("dfs.replication", Integer.toString(numDataNodes));
  130.       conf.setInt("io.bytes.per.checksum", checksumSize);
  131.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  132.       //first time format
  133.       cluster = new MiniDFSCluster(0, conf, numDataNodes, true,
  134.                                    true, null, null);
  135.       cluster.waitActive();
  136.       DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
  137.                                             cluster.getNameNodePort()),
  138.                                             conf);
  139.       
  140.       writeFile(cluster.getFileSystem(), testPath, numDataNodes);
  141.       
  142.       waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, 20);
  143.       
  144.       Block[][] blocksList = cluster.getAllBlockReports();
  145.                     
  146.       
  147.       cluster.shutdown();
  148.       cluster = null;
  149.       
  150.       
  151.       /* Start the MiniDFSCluster with more datanodes since once a writeBlock
  152.        * to a datanode node fails, same block can not be written to it
  153.        * immediately. In our case some replication attempts will fail.
  154.        */
  155.       
  156.       LOG.info("Restarting minicluster");
  157.       conf = new Configuration();
  158.       conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  159.       conf.set("dfs.safemode.threshold.pct", "0.0f"); 
  160.       
  161.       cluster = new MiniDFSCluster(0, conf, numDataNodes*2, false,
  162.                                    true, null, null);
  163.       cluster.waitActive();
  164.       Set<Block> uniqueBlocks = new HashSet<Block>();
  165.       for (int i=0; i<blocksList.length; ++i) {
  166.         for (int j=0; j < blocksList[i].length; ++j) {
  167.           uniqueBlocks.add(blocksList[i][j]);
  168.         }
  169.       }
  170.       // Insert all the blocks in the first data node
  171.       
  172.       LOG.info("Inserting " + uniqueBlocks.size() + " blocks");
  173.       Block[] blocks = uniqueBlocks.toArray(new Block[uniqueBlocks.size()]);
  174.       cluster.injectBlocks(0, blocks);
  175.       
  176.       dfsClient = new DFSClient(new InetSocketAddress("localhost",
  177.                                   cluster.getNameNodePort()),
  178.                                   conf);
  179.       
  180.       waitForBlockReplication(testFile, dfsClient.namenode, numDataNodes, -1);
  181.       
  182.     } finally {
  183.       if (cluster != null) {
  184.         cluster.shutdown();
  185.       }
  186.     }
  187.   }  
  188. }