TestDatanodeDeath.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import junit.framework.TestCase;
  21. import org.apache.commons.logging.impl.Log4JLogger;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.fs.BlockLocation;
  24. import org.apache.hadoop.fs.FSDataInputStream;
  25. import org.apache.hadoop.fs.FSDataOutputStream;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  29. import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
  30. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  31. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
  32. import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
  33. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  34. import org.apache.log4j.Level;
  35. /**
  36.  * This class tests that a file need not be closed before its
  37.  * data can be read by another client.
  38.  */
  39. public class TestDatanodeDeath extends TestCase {
  40.   {
  41.     ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
  42.     ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
  43.     ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
  44.     ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
  45.     ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
  46.     ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
  47.   }
  48.   static final int blockSize = 8192;
  49.   static final int numBlocks = 2;
  50.   static final int fileSize = numBlocks * blockSize + 1;
  51.   static final int numDatanodes = 15;
  52.   static final short replication = 3;
  53.   int numberOfFiles = 3;
  54.   int numThreads = 5;
  55.   Workload[] workload = null;
  56.   //
  57.   // an object that does a bunch of transactions
  58.   //
  59.   static class Workload extends Thread {
  60.     private short replication;
  61.     private int numberOfFiles;
  62.     private int id;
  63.     private FileSystem fs;
  64.     private long stamp;
  65.     private final long myseed;
  66.     Workload(long myseed, FileSystem fs, int threadIndex, int numberOfFiles, 
  67.              short replication, long stamp) {
  68.       this.myseed = myseed;
  69.       id = threadIndex;
  70.       this.fs = fs;
  71.       this.numberOfFiles = numberOfFiles;
  72.       this.replication = replication;
  73.       this.stamp = stamp;
  74.     }
  75.     // create a bunch of files. Write to them and then verify.
  76.     public void run() {
  77.       System.out.println("Workload starting ");
  78.       for (int i = 0; i < numberOfFiles; i++) {
  79.         Path filename = new Path(id + "." + i);
  80.         try {
  81.           System.out.println("Workload processing file " + filename);
  82.           FSDataOutputStream stm = createFile(fs, filename, replication);
  83.           DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
  84.                                                  (stm.getWrappedStream());
  85.           dfstream.setArtificialSlowdown(1000);
  86.           writeFile(stm, myseed);
  87.           stm.close();
  88.           checkFile(fs, filename, replication, numBlocks, fileSize, myseed);
  89.         } catch (Throwable e) {
  90.           System.out.println("Workload exception " + e);
  91.           assertTrue(e.toString(), false);
  92.         }
  93.         // increment the stamp to indicate that another file is done.
  94.         synchronized (this) {
  95.           stamp++;
  96.         }
  97.       }
  98.     }
  99.     public synchronized void resetStamp() {
  100.       this.stamp = 0;
  101.     }
  102.     public synchronized long getStamp() {
  103.       return stamp;
  104.     }
  105.   }
  106.   //
  107.   // creates a file and returns a descriptor for writing to it.
  108.   //
  109.   static private FSDataOutputStream createFile(FileSystem fileSys, Path name, short repl)
  110.     throws IOException {
  111.     // create and write a file that contains three blocks of data
  112.     FSDataOutputStream stm = fileSys.create(name, true,
  113.                                             fileSys.getConf().getInt("io.file.buffer.size", 4096),
  114.                                             repl, (long)blockSize);
  115.     return stm;
  116.   }
  117.   //
  118.   // writes to file
  119.   //
  120.   static private void writeFile(FSDataOutputStream stm, long seed) throws IOException {
  121.     byte[] buffer = AppendTestUtil.randomBytes(seed, fileSize);
  122.     int mid = fileSize/2;
  123.     stm.write(buffer, 0, mid);
  124.     stm.write(buffer, mid, fileSize - mid);
  125.   }
  126.   //
  127.   // verify that the data written are sane
  128.   // 
  129.   static private void checkFile(FileSystem fileSys, Path name, int repl,
  130.                          int numblocks, int filesize, long seed)
  131.     throws IOException {
  132.     boolean done = false;
  133.     int attempt = 0;
  134.     long len = fileSys.getFileStatus(name).getLen();
  135.     assertTrue(name + " should be of size " + filesize +
  136.                " but found to be of size " + len, 
  137.                len == filesize);
  138.     // wait till all full blocks are confirmed by the datanodes.
  139.     while (!done) {
  140.       attempt++;
  141.       try {
  142.         Thread.sleep(1000);
  143.       } catch (InterruptedException e) {}
  144.       done = true;
  145.       BlockLocation[] locations = fileSys.getFileBlockLocations(
  146.           fileSys.getFileStatus(name), 0, filesize);
  147.       if (locations.length < numblocks) {
  148.         if (attempt > 100) {
  149.           System.out.println("File " + name + " has only " +
  150.                              locations.length + " blocks, " +
  151.                              " but is expected to have " + numblocks +
  152.                              " blocks.");
  153.         }
  154.         done = false;
  155.         continue;
  156.       }
  157.       for (int idx = 0; idx < locations.length; idx++) {
  158.         if (locations[idx].getHosts().length < repl) {
  159.           if (attempt > 100) {
  160.             System.out.println("File " + name + " has " +
  161.                                locations.length + " blocks: " +
  162.                                " The " + idx + " block has only " +
  163.                                locations[idx].getHosts().length + 
  164.                                " replicas but is expected to have " 
  165.                                + repl + " replicas.");
  166.           }
  167.           done = false;
  168.           break;
  169.         }
  170.       }
  171.     }
  172.     FSDataInputStream stm = fileSys.open(name);
  173.     final byte[] expected = AppendTestUtil.randomBytes(seed, fileSize);
  174.     // do a sanity check. Read the file
  175.     byte[] actual = new byte[filesize];
  176.     stm.readFully(0, actual);
  177.     checkData(actual, 0, expected, "Read 1");
  178.   }
  179.   private static void checkData(byte[] actual, int from, byte[] expected, String message) {
  180.     for (int idx = 0; idx < actual.length; idx++) {
  181.       assertEquals(message+" byte "+(from+idx)+" differs. expected "+
  182.                         expected[from+idx]+" actual "+actual[idx],
  183.                         actual[idx], expected[from+idx]);
  184.       actual[idx] = 0;
  185.     }
  186.   }
  187.   /**
  188.    * A class that kills one datanode and recreates a new one. It waits to
  189.    * ensure that that all workers have finished at least one file since the 
  190.    * last kill of a datanode. This guarantees that all three replicas of
  191.    * a block do not get killed (otherwise the file will be corrupt and the
  192.    * test will fail).
  193.    */
  194.   class Modify extends Thread {
  195.     volatile boolean running;
  196.     MiniDFSCluster cluster;
  197.     Configuration conf;
  198.     Modify(Configuration conf, MiniDFSCluster cluster) {
  199.       running = true;
  200.       this.cluster = cluster;
  201.       this.conf = conf;
  202.     }
  203.     public void run() {
  204.       while (running) {
  205.         try {
  206.           Thread.sleep(1000);
  207.         } catch (InterruptedException e) {
  208.           continue;
  209.         }
  210.         // check if all threads have a new stamp. 
  211.         // If so, then all workers have finished at least one file
  212.         // since the last stamp.
  213.         boolean loop = false;
  214.         for (int i = 0; i < numThreads; i++) {
  215.           if (workload[i].getStamp() == 0) {
  216.             loop = true;
  217.             break;
  218.           }
  219.         }
  220.         if (loop) {
  221.           continue;
  222.         }
  223.         // Now it is guaranteed that there will be at least one valid
  224.         // replica of a file.
  225.         for (int i = 0; i < replication - 1; i++) {
  226.           // pick a random datanode to shutdown
  227.           int victim = AppendTestUtil.nextInt(numDatanodes);
  228.           try {
  229.             System.out.println("Stopping datanode " + victim);
  230.             cluster.restartDataNode(victim);
  231.             // cluster.startDataNodes(conf, 1, true, null, null);
  232.           } catch (IOException e) {
  233.             System.out.println("TestDatanodeDeath Modify exception " + e);
  234.             assertTrue("TestDatanodeDeath Modify exception " + e, false);
  235.             running = false;
  236.           }
  237.         }
  238.         // set a new stamp for all workers
  239.         for (int i = 0; i < numThreads; i++) {
  240.           workload[i].resetStamp();
  241.         }
  242.       }
  243.     }
  244.     // Make the thread exit.
  245.     void close() {
  246.       running = false;
  247.       this.interrupt();
  248.     }
  249.   }
  250.   /**
  251.    * Test that writing to files is good even when datanodes in the pipeline
  252.    * dies.
  253.    */
  254.   private void complexTest() throws IOException {
  255.     Configuration conf = new Configuration();
  256.     conf.setInt("heartbeat.recheck.interval", 2000);
  257.     conf.setInt("dfs.heartbeat.interval", 2);
  258.     conf.setInt("dfs.replication.pending.timeout.sec", 2);
  259.     conf.setInt("dfs.socket.timeout", 5000);
  260.     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDatanodes, true, null);
  261.     cluster.waitActive();
  262.     FileSystem fs = cluster.getFileSystem();
  263.     Modify modThread = null;
  264.     try {
  265.       
  266.       // Create threads and make them run workload concurrently.
  267.       workload = new Workload[numThreads];
  268.       for (int i = 0; i < numThreads; i++) {
  269.         workload[i] = new Workload(AppendTestUtil.nextLong(), fs, i, numberOfFiles, replication, 0);
  270.         workload[i].start();
  271.       }
  272.       // Create a thread that kills existing datanodes and creates new ones.
  273.       modThread = new Modify(conf, cluster);
  274.       modThread.start();
  275.       // wait for all transactions to get over
  276.       for (int i = 0; i < numThreads; i++) {
  277.         try {
  278.           System.out.println("Waiting for thread " + i + " to complete...");
  279.           workload[i].join();
  280.           // if most of the threads are done, then stop restarting datanodes.
  281.           if (i >= numThreads/2) {
  282.             modThread.close();
  283.           }
  284.          
  285.         } catch (InterruptedException e) {
  286.           i--;      // retry
  287.         }
  288.       }
  289.     } finally {
  290.       if (modThread != null) {
  291.         modThread.close();
  292.         try {
  293.           modThread.join();
  294.         } catch (InterruptedException e) {}
  295.       }
  296.       fs.close();
  297.       cluster.shutdown();
  298.     }
  299.   }
  300.   /**
  301.    * Write to one file, then kill one datanode in the pipeline and then
  302.    * close the file.
  303.    */
  304.   private void simpleTest(int datanodeToKill) throws IOException {
  305.     Configuration conf = new Configuration();
  306.     conf.setInt("heartbeat.recheck.interval", 2000);
  307.     conf.setInt("dfs.heartbeat.interval", 1);
  308.     conf.setInt("dfs.replication.pending.timeout.sec", 2);
  309.     conf.setInt("dfs.socket.timeout", 5000);
  310.     int myMaxNodes = 5;
  311.     System.out.println("SimpleTest starting with DataNode to Kill " + 
  312.                        datanodeToKill);
  313.     MiniDFSCluster cluster = new MiniDFSCluster(conf, myMaxNodes, true, null);
  314.     cluster.waitActive();
  315.     FileSystem fs = cluster.getFileSystem();
  316.     short repl = 3;
  317.     Path filename = new Path("simpletest.dat");
  318.     try {
  319.       // create a file and write one block of data
  320.       System.out.println("SimpleTest creating file " + filename);
  321.       FSDataOutputStream stm = createFile(fs, filename, repl);
  322.       DFSClient.DFSOutputStream dfstream = (DFSClient.DFSOutputStream)
  323.                                              (stm.getWrappedStream());
  324.       // these are test settings
  325.       dfstream.setChunksPerPacket(5);
  326.       dfstream.setArtificialSlowdown(3000);
  327.       final long myseed = AppendTestUtil.nextLong();
  328.       byte[] buffer = AppendTestUtil.randomBytes(myseed, fileSize);
  329.       int mid = fileSize/4;
  330.       stm.write(buffer, 0, mid);
  331.       DatanodeInfo[] targets = dfstream.getPipeline();
  332.       int count = 5;
  333.       while (count-- > 0 && targets == null) {
  334.         try {
  335.           System.out.println("SimpleTest: Waiting for pipeline to be created.");
  336.           Thread.sleep(1000);
  337.         } catch (InterruptedException e) {
  338.         }
  339.         targets = dfstream.getPipeline();
  340.       }
  341.       if (targets == null) {
  342.         int victim = AppendTestUtil.nextInt(myMaxNodes);
  343.         System.out.println("SimpleTest stopping datanode random " + victim);
  344.         cluster.stopDataNode(victim);
  345.       } else {
  346.         int victim = datanodeToKill;
  347.         System.out.println("SimpleTest stopping datanode " +
  348.                             targets[victim].getName());
  349.         cluster.stopDataNode(targets[victim].getName());
  350.       }
  351.       System.out.println("SimpleTest stopping datanode complete");
  352.       // write some more data to file, close and verify
  353.       stm.write(buffer, mid, fileSize - mid);
  354.       stm.close();
  355.       checkFile(fs, filename, repl, numBlocks, fileSize, myseed);
  356.     } catch (Throwable e) {
  357.       System.out.println("Simple Workload exception " + e);
  358.       e.printStackTrace();
  359.       assertTrue(e.toString(), false);
  360.     } finally {
  361.       fs.close();
  362.       cluster.shutdown();
  363.     }
  364.   }
  365.   public void testSimple0() throws IOException {simpleTest(0);}
  366.   public void testSimple1() throws IOException {simpleTest(1);}
  367.   public void testSimple2() throws IOException {simpleTest(2);}
  368.   public void testComplex() throws IOException {complexTest();}
  369. }