TestDFSUpgradeFromImage.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import junit.framework.TestCase;
  20. import java.io.*;
  21. import java.net.InetSocketAddress;
  22. import java.util.Iterator;
  23. import java.util.LinkedList;
  24. import java.util.TreeMap;
  25. import java.util.zip.CRC32;
  26. import org.apache.hadoop.conf.Configuration;
  27. import org.apache.hadoop.fs.FSInputStream;
  28. import org.apache.hadoop.fs.FileStatus;
  29. import org.apache.hadoop.fs.FileUtil;
  30. import org.apache.hadoop.hdfs.protocol.FSConstants;
  31. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  32. import org.apache.commons.logging.Log;
  33. import org.apache.commons.logging.LogFactory;
  34. /**
  35.  * This tests data transfer protocol handling in the Datanode. It sends
  36.  * various forms of wrong data and verifies that Datanode handles it well.
  37.  * 
  38.  * This test uses the following two file from src/test/.../dfs directory :
  39.  *   1) hadoop-version-dfs-dir.tgz : contains DFS directories.
  40.  *   2) hadoop-dfs-dir.txt : checksums that are compared in this test.
  41.  * Please read hadoop-dfs-dir.txt for more information.  
  42.  */
  43. public class TestDFSUpgradeFromImage extends TestCase {
  44.   
  45.   private static final Log LOG = LogFactory.getLog(
  46.                     "org.apache.hadoop.hdfs.TestDFSUpgradeFromImage");
  47.   
  48.   public int numDataNodes = 4;
  49.   
  50.   private static class ReferenceFileInfo {
  51.     String path;
  52.     long checksum;
  53.   }
  54.   
  55.   LinkedList<ReferenceFileInfo> refList = new LinkedList<ReferenceFileInfo>();
  56.   Iterator<ReferenceFileInfo> refIter;
  57.   
  58.   boolean printChecksum = false;
  59.   
  60.   protected void setUp() throws IOException {
  61.     unpackStorage();
  62.   }
  63.   public void unpackStorage() throws IOException {
  64.     String tarFile = System.getProperty("test.cache.data", "build/test/cache") +
  65.                      "/hadoop-14-dfs-dir.tgz";
  66.     String dataDir = System.getProperty("test.build.data", "build/test/data");
  67.     File dfsDir = new File(dataDir, "dfs");
  68.     if ( dfsDir.exists() && !FileUtil.fullyDelete(dfsDir) ) {
  69.       throw new IOException("Could not delete dfs directory '" + dfsDir + "'");
  70.     }
  71.     FileUtil.unTar(new File(tarFile), new File(dataDir));
  72.     //Now read the reference info
  73.     
  74.     BufferedReader reader = new BufferedReader( 
  75.                         new FileReader(System.getProperty("test.cache.data", "build/test/cache") +
  76.                                        "/hadoop-dfs-dir.txt"));
  77.     String line;
  78.     while ( (line = reader.readLine()) != null ) {
  79.       
  80.       line = line.trim();
  81.       if (line.length() <= 0 || line.startsWith("#")) {
  82.         continue;
  83.       }
  84.       String[] arr = line.split("\s+t\s+");
  85.       if (arr.length < 1) {
  86.         continue;
  87.       }
  88.       if (arr[0].equals("printChecksums")) {
  89.         printChecksum = true;
  90.         break;
  91.       }
  92.       if (arr.length < 2) {
  93.         continue;
  94.       }
  95.       ReferenceFileInfo info = new ReferenceFileInfo();
  96.       info.path = arr[0];
  97.       info.checksum = Long.parseLong(arr[1]);
  98.       refList.add(info);
  99.     }
  100.     reader.close();
  101.   }
  102.   private void verifyChecksum(String path, long checksum) throws IOException {
  103.     if ( refIter == null ) {
  104.       refIter = refList.iterator();
  105.     }
  106.     
  107.     if ( printChecksum ) {
  108.       LOG.info("CRC info for reference file : " + path + " t " + checksum);
  109.     } else {
  110.       if ( !refIter.hasNext() ) {
  111.         throw new IOException("Checking checksum for " + path +
  112.                               "Not enough elements in the refList");
  113.       }
  114.       ReferenceFileInfo info = refIter.next();
  115.       // The paths are expected to be listed in the same order 
  116.       // as they are traversed here.
  117.       assertEquals(info.path, path);
  118.       assertEquals("Checking checksum for " + path, info.checksum, checksum);
  119.     }
  120.   }
  121.   
  122.   CRC32 overallChecksum = new CRC32();
  123.   
  124.   private void verifyDir(DFSClient client, String dir) 
  125.                                            throws IOException {
  126.     
  127.     FileStatus[] fileArr = client.listPaths(dir);
  128.     TreeMap<String, Boolean> fileMap = new TreeMap<String, Boolean>();
  129.     
  130.     for(FileStatus file : fileArr) {
  131.       String path = file.getPath().toString();
  132.       fileMap.put(path, Boolean.valueOf(file.isDir()));
  133.     }
  134.     
  135.     for(Iterator<String> it = fileMap.keySet().iterator(); it.hasNext();) {
  136.       String path = it.next();
  137.       boolean isDir = fileMap.get(path);
  138.       
  139.       overallChecksum.update(path.getBytes());
  140.       
  141.       if ( isDir ) {
  142.         verifyDir(client, path);
  143.       } else {
  144.         // this is not a directory. Checksum the file data.
  145.         CRC32 fileCRC = new CRC32();
  146.         FSInputStream in = client.open(path);
  147.         byte[] buf = new byte[4096];
  148.         int nRead = 0;
  149.         while ( (nRead = in.read(buf, 0, buf.length)) > 0 ) {
  150.           fileCRC.update(buf, 0, nRead);
  151.         }
  152.         
  153.         verifyChecksum(path, fileCRC.getValue());
  154.       }
  155.     }
  156.   }
  157.   
  158.   private void verifyFileSystem(DFSClient client) throws IOException {
  159.   
  160.     verifyDir(client, "/");
  161.     
  162.     verifyChecksum("overallCRC", overallChecksum.getValue());
  163.     
  164.     if ( printChecksum ) {
  165.       throw new IOException("Checksums are written to log as requested. " +
  166.                             "Throwing this exception to force an error " +
  167.                             "for this test.");
  168.     }
  169.   }
  170.   
  171.   public void testUpgradeFromImage() throws IOException {
  172.     MiniDFSCluster cluster = null;
  173.     try {
  174.       Configuration conf = new Configuration();
  175.       if (System.getProperty("test.build.data") == null) { // to allow test to be run outside of Ant
  176.         System.setProperty("test.build.data", "build/test/data");
  177.       }
  178.       conf.setInt("dfs.datanode.scan.period.hours", -1); // block scanning off
  179.       cluster = new MiniDFSCluster(0, conf, numDataNodes, false, true,
  180.                                    StartupOption.UPGRADE, null);
  181.       cluster.waitActive();
  182.       DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
  183.                                            cluster.getNameNodePort()), conf);
  184.       //Safemode will be off only after upgrade is complete. Wait for it.
  185.       while ( dfsClient.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET) ) {
  186.         LOG.info("Waiting for SafeMode to be OFF.");
  187.         try {
  188.           Thread.sleep(1000);
  189.         } catch (InterruptedException ignored) {}
  190.       }
  191.       verifyFileSystem(dfsClient);
  192.     } finally {
  193.       if (cluster != null) { cluster.shutdown(); }
  194.     }
  195.   }
  196. }