UpgradeUtilities.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. /*
  2.  * UpgradeUtilities.java
  3.  *
  4.  * Licensed to the Apache Software Foundation (ASF) under one
  5.  * or more contributor license agreements.  See the NOTICE file
  6.  * distributed with this work for additional information
  7.  * regarding copyright ownership.  The ASF licenses this file
  8.  * to you under the Apache License, Version 2.0 (the
  9.  * "License"); you may not use this file except in compliance
  10.  * with the License.  You may obtain a copy of the License at
  11.  *
  12.  *     http://www.apache.org/licenses/LICENSE-2.0
  13.  *
  14.  * Unless required by applicable law or agreed to in writing, software
  15.  * distributed under the License is distributed on an "AS IS" BASIS,
  16.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  17.  * See the License for the specific language governing permissions and
  18.  * limitations under the License.
  19.  */
  20. package org.apache.hadoop.hdfs;
  21. import java.io.File;
  22. import java.io.FileInputStream;
  23. import java.io.IOException;
  24. import java.io.OutputStream;
  25. import java.io.RandomAccessFile;
  26. import java.util.Arrays;
  27. import java.util.Random;
  28. import java.util.zip.CRC32;
  29. import org.apache.hadoop.conf.Configuration;
  30. import org.apache.hadoop.fs.FileSystem;
  31. import org.apache.hadoop.fs.FileUtil;
  32. import org.apache.hadoop.fs.LocalFileSystem;
  33. import org.apache.hadoop.fs.Path;
  34. import org.apache.hadoop.hdfs.protocol.FSConstants;
  35. import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
  36. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  37. import static org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType.NAME_NODE;
  38. import static org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType.DATA_NODE;
  39. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  40. import org.apache.hadoop.hdfs.server.common.Storage;
  41. import org.apache.hadoop.hdfs.server.common.StorageInfo;
  42. import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
  43. import org.apache.hadoop.hdfs.server.datanode.DataStorage;
  44. import org.apache.hadoop.hdfs.server.namenode.FSImage;
  45. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  46. /**
  47.  * This class defines a number of static helper methods used by the
  48.  * DFS Upgrade unit tests.  By default, a singleton master populated storage
  49.  * directory is created for a Namenode (contains edits, fsimage,
  50.  * version, and time files) and a Datanode (contains version and
  51.  * block files).  The master directories are lazily created.  They are then
  52.  * copied by the createStorageDirs() method to create new storage
  53.  * directories of the appropriate type (Namenode or Datanode).
  54.  */
  55. public class UpgradeUtilities {
  56.   // Root scratch directory on local filesystem 
  57.   private static File TEST_ROOT_DIR = new File(
  58.       System.getProperty("test.build.data","/tmp").replace(' ', '+'));
  59.   // The singleton master storage directory for Namenode
  60.   private static File namenodeStorage = new File(TEST_ROOT_DIR, "namenodeMaster");
  61.   // A checksum of the contents in namenodeStorage directory
  62.   private static long namenodeStorageChecksum;
  63.   // The namespaceId of the namenodeStorage directory
  64.   private static int namenodeStorageNamespaceID;
  65.   // The fsscTime of the namenodeStorage directory
  66.   private static long namenodeStorageFsscTime;
  67.   // The singleton master storage directory for Datanode
  68.   private static File datanodeStorage = new File(TEST_ROOT_DIR, "datanodeMaster");
  69.   // A checksum of the contents in datanodeStorage directory
  70.   private static long datanodeStorageChecksum;
  71.   
  72.   /**
  73.    * Initialize the data structures used by this class.  
  74.    * IMPORTANT NOTE: This method must be called once before calling 
  75.    *                 any other public method on this class.  
  76.    * <p>
  77.    * Creates a singleton master populated storage
  78.    * directory for a Namenode (contains edits, fsimage,
  79.    * version, and time files) and a Datanode (contains version and
  80.    * block files).  This can be a lengthy operation.
  81.    */
  82.   public static void initialize() throws Exception {
  83.     createEmptyDirs(new String[] {TEST_ROOT_DIR.toString()});
  84.     Configuration config = new Configuration();
  85.     config.set("dfs.name.dir", namenodeStorage.toString());
  86.     config.set("dfs.data.dir", datanodeStorage.toString());
  87.     MiniDFSCluster cluster = null;
  88.     try {
  89.       // format data-node
  90.       createEmptyDirs(new String[] {datanodeStorage.toString()});
  91.       
  92.       // format and start NameNode and start DataNode
  93.       NameNode.format(config); 
  94.       cluster = new MiniDFSCluster(config, 1, StartupOption.REGULAR);
  95.         
  96.       NameNode namenode = cluster.getNameNode();
  97.       namenodeStorageNamespaceID = namenode.versionRequest().getNamespaceID();
  98.       namenodeStorageFsscTime = namenode.versionRequest().getCTime();
  99.       
  100.       FileSystem fs = FileSystem.get(config);
  101.       Path baseDir = new Path("/TestUpgrade");
  102.       fs.mkdirs(baseDir);
  103.       
  104.       // write some files
  105.       int bufferSize = 4096;
  106.       byte[] buffer = new byte[bufferSize];
  107.       for(int i=0; i < bufferSize; i++)
  108.         buffer[i] = (byte)('0' + i % 50);
  109.       writeFile(fs, new Path(baseDir, "file1"), buffer, bufferSize);
  110.       writeFile(fs, new Path(baseDir, "file2"), buffer, bufferSize);
  111.       
  112.       // save image
  113.       namenode.getFSImage().saveFSImage();
  114.       namenode.getFSImage().getEditLog().open();
  115.       
  116.       // write more files
  117.       writeFile(fs, new Path(baseDir, "file3"), buffer, bufferSize);
  118.       writeFile(fs, new Path(baseDir, "file4"), buffer, bufferSize);
  119.     } finally {
  120.       // shutdown
  121.       if (cluster != null) cluster.shutdown();
  122.       FileUtil.fullyDelete(new File(namenodeStorage,"in_use.lock"));
  123.       FileUtil.fullyDelete(new File(datanodeStorage,"in_use.lock"));
  124.     }
  125.     namenodeStorageChecksum = checksumContents(
  126.                                                NAME_NODE, new File(namenodeStorage,"current"));
  127.     datanodeStorageChecksum = checksumContents(
  128.                                                DATA_NODE, new File(datanodeStorage,"current"));
  129.   }
  130.   
  131.   // Private helper method that writes a file to the given file system.
  132.   private static void writeFile(FileSystem fs, Path path, byte[] buffer,
  133.                                 int bufferSize) throws IOException 
  134.   {
  135.     OutputStream out;
  136.     out = fs.create(path, true, bufferSize, (short) 1, 1024);
  137.     out.write(buffer, 0, bufferSize);
  138.     out.close();
  139.   }
  140.   
  141.   /**
  142.    * Initialize dfs.name.dir and dfs.data.dir with the specified number of
  143.    * directory entries. Also initialize dfs.blockreport.intervalMsec.
  144.    */
  145.   public static Configuration initializeStorageStateConf(int numDirs,
  146.                                                          Configuration conf) {
  147.     StringBuffer nameNodeDirs =
  148.       new StringBuffer(new File(TEST_ROOT_DIR, "name1").toString());
  149.     StringBuffer dataNodeDirs =
  150.       new StringBuffer(new File(TEST_ROOT_DIR, "data1").toString());
  151.     for (int i = 2; i <= numDirs; i++) {
  152.       nameNodeDirs.append("," + new File(TEST_ROOT_DIR, "name"+i));
  153.       dataNodeDirs.append("," + new File(TEST_ROOT_DIR, "data"+i));
  154.     }
  155.     if (conf == null) {
  156.       conf = new Configuration();
  157.     }
  158.     conf.set("dfs.name.dir", nameNodeDirs.toString());
  159.     conf.set("dfs.data.dir", dataNodeDirs.toString());
  160.     conf.setInt("dfs.blockreport.intervalMsec", 10000);
  161.     return conf;
  162.   }
  163.   
  164.   /**
  165.    * Create empty directories.  If a specified directory already exists
  166.    * then it is first removed.
  167.    */
  168.   public static void createEmptyDirs(String[] dirs) throws IOException {
  169.     for (String d : dirs) {
  170.       File dir = new File(d);
  171.       if (dir.exists()) {
  172.         FileUtil.fullyDelete(dir);
  173.       }
  174.       dir.mkdirs();
  175.     }
  176.   }
  177.   
  178.   /**
  179.    * Return the checksum for the singleton master storage directory
  180.    * of the given node type.
  181.    */
  182.   public static long checksumMasterContents(NodeType nodeType) throws IOException {
  183.     if (nodeType == NAME_NODE) {
  184.       return namenodeStorageChecksum;
  185.     } else {
  186.       return datanodeStorageChecksum;
  187.     }
  188.   }
  189.   
  190.   /**
  191.    * Compute the checksum of all the files in the specified directory.
  192.    * The contents of subdirectories are not included. This method provides
  193.    * an easy way to ensure equality between the contents of two directories.
  194.    *
  195.    * @param nodeType if DATA_NODE then any file named "VERSION" is ignored.
  196.    *    This is because this file file is changed every time
  197.    *    the Datanode is started.
  198.    * @param dir must be a directory. Subdirectories are ignored.
  199.    *
  200.    * @throws IllegalArgumentException if specified directory is not a directory
  201.    * @throws IOException if an IOException occurs while reading the files
  202.    * @return the computed checksum value
  203.    */
  204.   public static long checksumContents(NodeType nodeType, File dir) throws IOException {
  205.     if (!dir.isDirectory()) {
  206.       throw new IllegalArgumentException(
  207.                                          "Given argument is not a directory:" + dir);
  208.     }
  209.     File[] list = dir.listFiles();
  210.     Arrays.sort(list);
  211.     CRC32 checksum = new CRC32();
  212.     for (int i = 0; i < list.length; i++) {
  213.       if (!list[i].isFile()) {
  214.         continue;
  215.       }
  216.       // skip VERSION file for DataNodes
  217.       if (nodeType == DATA_NODE && list[i].getName().equals("VERSION")) {
  218.         continue; 
  219.       }
  220.       FileInputStream fis = null;
  221.       try {
  222.         fis = new FileInputStream(list[i]);
  223.         byte[] buffer = new byte[1024];
  224.         int bytesRead;
  225.         while ((bytesRead = fis.read(buffer)) != -1) {
  226.           checksum.update(buffer, 0, bytesRead);
  227.         }
  228.       } finally {
  229.         if(fis != null) {
  230.           fis.close();
  231.         }
  232.       }
  233.     }
  234.     return checksum.getValue();
  235.   }
  236.   
  237.   /**
  238.    * Simulate the <code>dfs.name.dir</code> or <code>dfs.data.dir</code>
  239.    * of a populated DFS filesystem.
  240.    *
  241.    * This method creates and populates the directory specified by
  242.    *  <code>parent/dirName</code>, for each parent directory.
  243.    * The contents of the new directories will be
  244.    * appropriate for the given node type.  If the directory does not
  245.    * exist, it will be created.  If the directory already exists, it
  246.    * will first be deleted.
  247.    *
  248.    * By default, a singleton master populated storage
  249.    * directory is created for a Namenode (contains edits, fsimage,
  250.    * version, and time files) and a Datanode (contains version and
  251.    * block files).  These directories are then
  252.    * copied by this method to create new storage
  253.    * directories of the appropriate type (Namenode or Datanode).
  254.    *
  255.    * @return the array of created directories
  256.    */
  257.   public static File[] createStorageDirs(NodeType nodeType, String[] parents, String dirName) throws Exception {
  258.     File[] retVal = new File[parents.length];
  259.     for (int i = 0; i < parents.length; i++) {
  260.       File newDir = new File(parents[i], dirName);
  261.       createEmptyDirs(new String[] {newDir.toString()});
  262.       LocalFileSystem localFS = FileSystem.getLocal(new Configuration());
  263.       switch (nodeType) {
  264.       case NAME_NODE:
  265.         localFS.copyToLocalFile(new Path(namenodeStorage.toString(), "current"),
  266.                                 new Path(newDir.toString()),
  267.                                 false);
  268.         Path newImgDir = new Path(newDir.getParent(), "image");
  269.         if (!localFS.exists(newImgDir))
  270.           localFS.copyToLocalFile(
  271.               new Path(namenodeStorage.toString(), "image"),
  272.               newImgDir,
  273.               false);
  274.         break;
  275.       case DATA_NODE:
  276.         localFS.copyToLocalFile(new Path(datanodeStorage.toString(), "current"),
  277.                                 new Path(newDir.toString()),
  278.                                 false);
  279.         Path newStorageFile = new Path(newDir.getParent(), "storage");
  280.         if (!localFS.exists(newStorageFile))
  281.           localFS.copyToLocalFile(
  282.               new Path(datanodeStorage.toString(), "storage"),
  283.               newStorageFile,
  284.               false);
  285.         break;
  286.       }
  287.       retVal[i] = newDir;
  288.     }
  289.     return retVal;
  290.   }
  291.   
  292.   /**
  293.    * Create a <code>version</code> file inside the specified parent
  294.    * directory.  If such a file already exists, it will be overwritten.
  295.    * The given version string will be written to the file as the layout
  296.    * version. None of the parameters may be null.
  297.    *
  298.    * @param version
  299.    *
  300.    * @return the created version file
  301.    */
  302.   public static File[] createVersionFile(NodeType nodeType, File[] parent,
  303.                                          StorageInfo version) throws IOException 
  304.   {
  305.     Storage storage = null;
  306.     File[] versionFiles = new File[parent.length];
  307.     for (int i = 0; i < parent.length; i++) {
  308.       File versionFile = new File(parent[i], "VERSION");
  309.       FileUtil.fullyDelete(versionFile);
  310.       switch (nodeType) {
  311.       case NAME_NODE:
  312.         storage = new FSImage(version);
  313.         break;
  314.       case DATA_NODE:
  315.         storage = new DataStorage(version, "doNotCare");
  316.         break;
  317.       }
  318.       StorageDirectory sd = storage.new StorageDirectory(parent[i].getParentFile());
  319.       sd.write(versionFile);
  320.       versionFiles[i] = versionFile;
  321.     }
  322.     return versionFiles;
  323.   }
  324.   
  325.   /**
  326.    * Corrupt the specified file.  Some random bytes within the file
  327.    * will be changed to some random values.
  328.    *
  329.    * @throws IllegalArgumentException if the given file is not a file
  330.    * @throws IOException if an IOException occurs while reading or writing the file
  331.    */
  332.   public static void corruptFile(File file) throws IOException {
  333.     if (!file.isFile()) {
  334.       throw new IllegalArgumentException(
  335.                                          "Given argument is not a file:" + file);
  336.     }
  337.     RandomAccessFile raf = new RandomAccessFile(file,"rws");
  338.     Random random = new Random();
  339.     for (long i = 0; i < raf.length(); i++) {
  340.       raf.seek(i);
  341.       if (random.nextBoolean()) {
  342.         raf.writeByte(random.nextInt());
  343.       }
  344.     }
  345.     raf.close();
  346.   }
  347.   
  348.   /**
  349.    * Return the layout version inherent in the current version
  350.    * of the Namenode, whether it is running or not.
  351.    */
  352.   public static int getCurrentLayoutVersion() {
  353.     return FSConstants.LAYOUT_VERSION;
  354.   }
  355.   
  356.   /**
  357.    * Return the namespace ID inherent in the currently running
  358.    * Namenode.  If no Namenode is running, return the namespace ID of
  359.    * the master Namenode storage directory.
  360.    *
  361.    * The UpgradeUtilities.initialize() method must be called once before
  362.    * calling this method.
  363.    */
  364.   public static int getCurrentNamespaceID(MiniDFSCluster cluster) throws IOException {
  365.     if (cluster != null) {
  366.       return cluster.getNameNode().versionRequest().getNamespaceID();
  367.     }
  368.     return namenodeStorageNamespaceID;
  369.   }
  370.   
  371.   /**
  372.    * Return the File System State Creation Timestamp (FSSCTime) inherent
  373.    * in the currently running Namenode.  If no Namenode is running,
  374.    * return the FSSCTime of the master Namenode storage directory.
  375.    *
  376.    * The UpgradeUtilities.initialize() method must be called once before
  377.    * calling this method.
  378.    */
  379.   public static long getCurrentFsscTime(MiniDFSCluster cluster) throws IOException {
  380.     if (cluster != null) {
  381.       return cluster.getNameNode().versionRequest().getCTime();
  382.     }
  383.     return namenodeStorageFsscTime;
  384.   }
  385. }