TestGetBlocks.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import java.net.InetSocketAddress;
  21. import java.util.*;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.hdfs.protocol.Block;
  24. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  25. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  26. import org.apache.hadoop.hdfs.server.common.GenerationStamp;
  27. import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
  28. import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
  29. import org.apache.hadoop.fs.FSDataOutputStream;
  30. import org.apache.hadoop.fs.FileSystem;
  31. import org.apache.hadoop.fs.Path;
  32. import org.apache.hadoop.ipc.RPC;
  33. import org.apache.hadoop.ipc.RemoteException;
  34. import org.apache.hadoop.net.NetUtils;
  35. import org.apache.hadoop.security.UnixUserGroupInformation;
  36. import junit.framework.TestCase;
  37. /**
  38.  * This class tests if block replacement request to data nodes work correctly.
  39.  */
  40. public class TestGetBlocks extends TestCase {
  41.   /** test getBlocks */
  42.   public void testGetBlocks() throws Exception {
  43.     final Configuration CONF = new Configuration();
  44.     final short REPLICATION_FACTOR = (short)2;
  45.     final int DEFAULT_BLOCK_SIZE = 1024;
  46.     final Random r = new Random();
  47.     
  48.     CONF.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
  49.     MiniDFSCluster cluster = new MiniDFSCluster(
  50.           CONF, REPLICATION_FACTOR, true, null );
  51.     try {
  52.       cluster.waitActive();
  53.       
  54.       // create a file with two blocks
  55.       FileSystem fs = cluster.getFileSystem();
  56.       FSDataOutputStream out = fs.create(new Path("/tmp.txt"),
  57.           REPLICATION_FACTOR);
  58.       byte [] data = new byte[1024];
  59.       long fileLen = 2*DEFAULT_BLOCK_SIZE;
  60.       long bytesToWrite = fileLen;
  61.       while( bytesToWrite > 0 ) {
  62.         r.nextBytes(data);
  63.         int bytesToWriteNext = (1024<bytesToWrite)?1024:(int)bytesToWrite;
  64.         out.write(data, 0, bytesToWriteNext);
  65.         bytesToWrite -= bytesToWriteNext;
  66.       }
  67.       out.close();
  68.       // get blocks & data nodes
  69.       List<LocatedBlock> locatedBlocks;
  70.       DatanodeInfo[] dataNodes=null;
  71.       boolean notWritten;
  72.       do {
  73.         DFSClient dfsclient = new DFSClient(CONF);
  74.         locatedBlocks = dfsclient.namenode.
  75.           getBlockLocations("/tmp.txt", 0, fileLen).getLocatedBlocks();
  76.         assertEquals(2, locatedBlocks.size());
  77.         notWritten = false;
  78.         for(int i=0; i<2; i++) {
  79.           dataNodes = locatedBlocks.get(i).getLocations();
  80.           if(dataNodes.length != REPLICATION_FACTOR) {
  81.             notWritten = true;
  82.             try {
  83.               Thread.sleep(10);
  84.             } catch(InterruptedException e) {
  85.             }
  86.             break;
  87.           }
  88.         }
  89.       } while(notWritten);
  90.       
  91.       // get RPC client to namenode
  92.       InetSocketAddress addr = new InetSocketAddress("localhost",
  93.           cluster.getNameNodePort());
  94.       NamenodeProtocol namenode = (NamenodeProtocol) RPC.getProxy(
  95.           NamenodeProtocol.class, NamenodeProtocol.versionID, addr,
  96.           UnixUserGroupInformation.login(CONF), CONF,
  97.           NetUtils.getDefaultSocketFactory(CONF));
  98.       // get blocks of size fileLen from dataNodes[0]
  99.       BlockWithLocations[] locs;
  100.       locs = namenode.getBlocks(dataNodes[0], fileLen).getBlocks();
  101.       assertEquals(locs.length, 2);
  102.       assertEquals(locs[0].getDatanodes().length, 2);
  103.       assertEquals(locs[1].getDatanodes().length, 2);
  104.       // get blocks of size BlockSize from dataNodes[0]
  105.       locs = namenode.getBlocks(dataNodes[0], DEFAULT_BLOCK_SIZE).getBlocks();
  106.       assertEquals(locs.length, 1);
  107.       assertEquals(locs[0].getDatanodes().length, 2);
  108.       // get blocks of size 1 from dataNodes[0]
  109.       locs = namenode.getBlocks(dataNodes[0], 1).getBlocks();
  110.       assertEquals(locs.length, 1);
  111.       assertEquals(locs[0].getDatanodes().length, 2);
  112.       // get blocks of size 0 from dataNodes[0]
  113.       getBlocksWithException(namenode, dataNodes[0], 0);     
  114.       // get blocks of size -1 from dataNodes[0]
  115.       getBlocksWithException(namenode, dataNodes[0], -1);
  116.       // get blocks of size BlockSize from a non-existent datanode
  117.       getBlocksWithException(namenode, new DatanodeInfo(), 2);
  118.     } finally {
  119.       cluster.shutdown();
  120.     }
  121.   }
  122.   private void getBlocksWithException(NamenodeProtocol namenode,
  123.                                       DatanodeInfo datanode,
  124.                                       long size) throws IOException {
  125.     boolean getException = false;
  126.     try {
  127.         namenode.getBlocks(new DatanodeInfo(), 2);
  128.     } catch(RemoteException e) {
  129.       getException = true;
  130.       assertTrue(e.getMessage().contains("IllegalArgumentException"));
  131.     }
  132.     assertTrue(getException);
  133.   }
  134.  
  135.   public void testGenerationStampWildCard() {
  136.     Map<Block, Long> map = new HashMap<Block, Long>();
  137.     final Random RAN = new Random();
  138.     final long seed = RAN.nextLong();
  139.     System.out.println("seed=" +  seed);
  140.     RAN.setSeed(seed);
  141.     long[] blkids = new long[10]; 
  142.     for(int i = 0; i < blkids.length; i++) {
  143.       blkids[i] = 1000L + RAN.nextInt(100000);
  144.       map.put(new Block(blkids[i], 0, blkids[i]), blkids[i]);
  145.     }
  146.     System.out.println("map=" + map.toString().replace(",", "n  "));
  147.     
  148.     for(int i = 0; i < blkids.length; i++) {
  149.       Block b = new Block(blkids[i], 0, GenerationStamp.WILDCARD_STAMP);
  150.       Long v = map.get(b);
  151.       System.out.println(b + " => " + v);
  152.       assertEquals(blkids[i], v.longValue());
  153.     }
  154.   }
  155.   /**
  156.    * @param args
  157.    */
  158.   public static void main(String[] args) throws Exception {
  159.     (new TestGetBlocks()).testGetBlocks();
  160.   }
  161. }