TestLeaseRecovery.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import java.util.Arrays;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.hdfs.protocol.Block;
  24. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  25. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  26. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  27. import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
  28. import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
  29. import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
  30. public class TestLeaseRecovery extends junit.framework.TestCase {
  31.   static final int BLOCK_SIZE = 1024;
  32.   static final short REPLICATION_NUM = (short)3;
  33.   static void checkMetaInfo(Block b, InterDatanodeProtocol idp
  34.       ) throws IOException {
  35.     TestInterDatanodeProtocol.checkMetaInfo(b, idp, null);
  36.   }
  37.   
  38.   static int min(Integer... x) {
  39.     int m = x[0];
  40.     for(int i = 1; i < x.length; i++) {
  41.       if (x[i] < m) {
  42.         m = x[i];
  43.       }
  44.     }
  45.     return m;
  46.   }
  47.   /**
  48.    * The following test first creates a file with a few blocks.
  49.    * It randomly truncates the replica of the last block stored in each datanode.
  50.    * Finally, it triggers block synchronization to synchronize all stored block.
  51.    */
  52.   public void testBlockSynchronization() throws Exception {
  53.     final int ORG_FILE_SIZE = 3000; 
  54.     Configuration conf = new Configuration();
  55.     conf.setLong("dfs.block.size", BLOCK_SIZE);
  56.     conf.setBoolean("dfs.support.append", true);
  57.     MiniDFSCluster cluster = null;
  58.     try {
  59.       cluster = new MiniDFSCluster(conf, 5, true, null);
  60.       cluster.waitActive();
  61.       //create a file
  62.       DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
  63.       String filestr = "/foo";
  64.       Path filepath = new Path(filestr);
  65.       DFSTestUtil.createFile(dfs, filepath, ORG_FILE_SIZE, REPLICATION_NUM, 0L);
  66.       assertTrue(dfs.dfs.exists(filestr));
  67.       DFSTestUtil.waitReplication(dfs, filepath, REPLICATION_NUM);
  68.       //get block info for the last block
  69.       LocatedBlock locatedblock = TestInterDatanodeProtocol.getLastLocatedBlock(
  70.           dfs.dfs.namenode, filestr);
  71.       DatanodeInfo[] datanodeinfos = locatedblock.getLocations();
  72.       assertEquals(REPLICATION_NUM, datanodeinfos.length);
  73.       //connect to data nodes
  74.       InterDatanodeProtocol[] idps = new InterDatanodeProtocol[REPLICATION_NUM];
  75.       DataNode[] datanodes = new DataNode[REPLICATION_NUM];
  76.       for(int i = 0; i < REPLICATION_NUM; i++) {
  77.         idps[i] = DataNode.createInterDataNodeProtocolProxy(datanodeinfos[i], conf);
  78.         datanodes[i] = cluster.getDataNode(datanodeinfos[i].getIpcPort());
  79.         assertTrue(datanodes[i] != null);
  80.       }
  81.       
  82.       //verify BlockMetaDataInfo
  83.       Block lastblock = locatedblock.getBlock();
  84.       DataNode.LOG.info("newblocks=" + lastblock);
  85.       for(int i = 0; i < REPLICATION_NUM; i++) {
  86.         checkMetaInfo(lastblock, idps[i]);
  87.       }
  88.       //setup random block sizes 
  89.       int lastblocksize = ORG_FILE_SIZE % BLOCK_SIZE;
  90.       Integer[] newblocksizes = new Integer[REPLICATION_NUM];
  91.       for(int i = 0; i < REPLICATION_NUM; i++) {
  92.         newblocksizes[i] = AppendTestUtil.nextInt(lastblocksize);
  93.       }
  94.       DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes)); 
  95.       //update blocks with random block sizes
  96.       Block[] newblocks = new Block[REPLICATION_NUM];
  97.       for(int i = 0; i < REPLICATION_NUM; i++) {
  98.         newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
  99.             lastblock.getGenerationStamp());
  100.         idps[i].updateBlock(lastblock, newblocks[i], false);
  101.         checkMetaInfo(newblocks[i], idps[i]);
  102.       }
  103.       DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
  104.       cluster.getNameNode().append(filestr, dfs.dfs.clientName);
  105.       //block synchronization
  106.       final int primarydatanodeindex = AppendTestUtil.nextInt(datanodes.length);
  107.       DataNode.LOG.info("primarydatanodeindex  =" + primarydatanodeindex);
  108.       DataNode primary = datanodes[primarydatanodeindex];
  109.       DataNode.LOG.info("primary.dnRegistration=" + primary.dnRegistration);
  110.       primary.recoverBlocks(new Block[]{lastblock}, new DatanodeInfo[][]{datanodeinfos}).join();
  111.       BlockMetaDataInfo[] updatedmetainfo = new BlockMetaDataInfo[REPLICATION_NUM];
  112.       int minsize = min(newblocksizes);
  113.       long currentGS = cluster.getNameNode().namesystem.getGenerationStamp();
  114.       lastblock.setGenerationStamp(currentGS);
  115.       for(int i = 0; i < REPLICATION_NUM; i++) {
  116.         updatedmetainfo[i] = idps[i].getBlockMetaDataInfo(lastblock);
  117.         assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
  118.         assertEquals(minsize, updatedmetainfo[i].getNumBytes());
  119.         assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());
  120.       }
  121.     }
  122.     finally {
  123.       if (cluster != null) {cluster.shutdown();}
  124.     }
  125.   }
  126. }