TestDiskError.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.DataOutputStream;
  20. import java.io.File;
  21. import java.net.InetSocketAddress;
  22. import java.net.Socket;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.fs.FileSystem;
  25. import org.apache.hadoop.fs.Path;
  26. import org.apache.hadoop.hdfs.DFSTestUtil;
  27. import org.apache.hadoop.hdfs.MiniDFSCluster;
  28. import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
  29. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  30. import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
  31. import org.apache.hadoop.io.Text;
  32. import junit.framework.TestCase;
  33. /** Test if a datanode can correctly handle errors during block read/write*/
  34. public class TestDiskError extends TestCase {
  35.   public void testShutdown() throws Exception {
  36.     if (System.getProperty("os.name").startsWith("Windows")) {
  37.       /**
  38.        * This test depends on OS not allowing file creations on a directory
  39.        * that does not have write permissions for the user. Apparently it is 
  40.        * not the case on Windows (at least under Cygwin), and possibly AIX.
  41.        * This is disabled on Windows.
  42.        */
  43.       return;
  44.     }
  45.     // bring up a cluster of 3
  46.     Configuration conf = new Configuration();
  47.     conf.setLong("dfs.block.size", 512L);
  48.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
  49.     cluster.waitActive();
  50.     FileSystem fs = cluster.getFileSystem();
  51.     final int dnIndex = 0;
  52.     String dataDir = cluster.getDataDirectory();
  53.     File dir1 = new File(new File(dataDir, "data"+(2*dnIndex+1)), "tmp");
  54.     File dir2 = new File(new File(dataDir, "data"+(2*dnIndex+2)), "tmp");
  55.     try {
  56.       // make the data directory of the first datanode to be readonly
  57.       assertTrue(dir1.setReadOnly());
  58.       assertTrue(dir2.setReadOnly());
  59.       // create files and make sure that first datanode will be down
  60.       DataNode dn = cluster.getDataNodes().get(dnIndex);
  61.       for (int i=0; DataNode.isDatanodeUp(dn); i++) {
  62.         Path fileName = new Path("/test.txt"+i);
  63.         DFSTestUtil.createFile(fs, fileName, 1024, (short)2, 1L);
  64.         DFSTestUtil.waitReplication(fs, fileName, (short)2);
  65.         fs.delete(fileName, true);
  66.       }
  67.     } finally {
  68.       // restore its old permission
  69.       dir1.setWritable(true);
  70.       dir2.setWritable(true);
  71.       cluster.shutdown();
  72.     }
  73.   }
  74.   
  75.   public void testReplicationError() throws Exception {
  76.     // bring up a cluster of 1
  77.     Configuration conf = new Configuration();
  78.     MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
  79.     cluster.waitActive();
  80.     FileSystem fs = cluster.getFileSystem();
  81.     
  82.     try {
  83.       // create a file of replication factor of 1
  84.       final Path fileName = new Path("/test.txt");
  85.       final int fileLen = 1;
  86.       DFSTestUtil.createFile(fs, fileName, 1, (short)1, 1L);
  87.       DFSTestUtil.waitReplication(fs, fileName, (short)1);
  88.       // get the block belonged to the created file
  89.       LocatedBlocks blocks = cluster.getNameNode().namesystem.getBlockLocations(
  90.           fileName.toString(), 0, (long)fileLen);
  91.       assertEquals(blocks.locatedBlockCount(), 1);
  92.       LocatedBlock block = blocks.get(0);
  93.       
  94.       // bring up a second datanode
  95.       cluster.startDataNodes(conf, 1, true, null, null);
  96.       cluster.waitActive();
  97.       final int sndNode = 1;
  98.       DataNode datanode = cluster.getDataNodes().get(sndNode);
  99.       
  100.       // replicate the block to the second datanode
  101.       InetSocketAddress target = datanode.getSelfAddr();
  102.       Socket s = new Socket(target.getAddress(), target.getPort());
  103.         //write the header.
  104.       DataOutputStream out = new DataOutputStream(
  105.           s.getOutputStream());
  106.       out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
  107.       out.write( DataTransferProtocol.OP_WRITE_BLOCK );
  108.       out.writeLong( block.getBlock().getBlockId());
  109.       out.writeLong( block.getBlock().getGenerationStamp() );
  110.       out.writeInt(1);
  111.       out.writeBoolean( false );       // recovery flag
  112.       Text.writeString( out, "" );
  113.       out.writeBoolean(false); // Not sending src node information
  114.       out.writeInt(0);
  115.       
  116.       // write check header
  117.       out.writeByte( 1 );
  118.       out.writeInt( 512 );
  119.       out.flush();
  120.       // close the connection before sending the content of the block
  121.       out.close();
  122.       
  123.       // the temporary block & meta files should be deleted
  124.       String dataDir = cluster.getDataDirectory();
  125.       File dir1 = new File(new File(dataDir, "data"+(2*sndNode+1)), "tmp");
  126.       File dir2 = new File(new File(dataDir, "data"+(2*sndNode+2)), "tmp");
  127.       while (dir1.listFiles().length != 0 || dir2.listFiles().length != 0) {
  128.         Thread.sleep(100);
  129.       }
  130.       
  131.       // then increase the file's replication factor
  132.       fs.setReplication(fileName, (short)2);
  133.       // replication should succeed
  134.       DFSTestUtil.waitReplication(fs, fileName, (short)1);
  135.       
  136.       // clean up the file
  137.       fs.delete(fileName, false);
  138.     } finally {
  139.       cluster.shutdown();
  140.     }
  141.   }
  142. }