DataStorage.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:16k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.File;
  20. import java.io.FileInputStream;
  21. import java.io.FileOutputStream;
  22. import java.io.IOException;
  23. import java.io.RandomAccessFile;
  24. import java.nio.channels.FileLock;
  25. import java.util.Collection;
  26. import java.util.ArrayList;
  27. import java.util.Iterator;
  28. import java.util.Properties;
  29. import java.util.regex.Matcher;
  30. import java.util.regex.Pattern;
  31. import org.apache.hadoop.hdfs.protocol.Block;
  32. import org.apache.hadoop.hdfs.protocol.FSConstants;
  33. import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
  34. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  35. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  36. import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
  37. import org.apache.hadoop.hdfs.server.common.Storage;
  38. import org.apache.hadoop.hdfs.server.common.StorageInfo;
  39. import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
  40. import org.apache.hadoop.util.Daemon;
  41. import org.apache.hadoop.fs.FileUtil.HardLink;
  42. import org.apache.hadoop.io.IOUtils;
  43. /** 
  44.  * Data storage information file.
  45.  * <p>
  46.  * @see Storage
  47.  */
  48. public class DataStorage extends Storage {
  49.   // Constants
  50.   final static String BLOCK_SUBDIR_PREFIX = "subdir";
  51.   final static String BLOCK_FILE_PREFIX = "blk_";
  52.   final static String COPY_FILE_PREFIX = "dncp_";
  53.   
  54.   private String storageID;
  55.   DataStorage() {
  56.     super(NodeType.DATA_NODE);
  57.     storageID = "";
  58.   }
  59.   
  60.   DataStorage(int nsID, long cT, String strgID) {
  61.     super(NodeType.DATA_NODE, nsID, cT);
  62.     this.storageID = strgID;
  63.   }
  64.   
  65.   public DataStorage(StorageInfo storageInfo, String strgID) {
  66.     super(NodeType.DATA_NODE, storageInfo);
  67.     this.storageID = strgID;
  68.   }
  69.   public String getStorageID() {
  70.     return storageID;
  71.   }
  72.   
  73.   void setStorageID(String newStorageID) {
  74.     this.storageID = newStorageID;
  75.   }
  76.   
  77.   /**
  78.    * Analyze storage directories.
  79.    * Recover from previous transitions if required. 
  80.    * Perform fs state transition if necessary depending on the namespace info.
  81.    * Read storage info. 
  82.    * 
  83.    * @param nsInfo namespace information
  84.    * @param dataDirs array of data storage directories
  85.    * @param startOpt startup option
  86.    * @throws IOException
  87.    */
  88.   void recoverTransitionRead(NamespaceInfo nsInfo,
  89.                              Collection<File> dataDirs,
  90.                              StartupOption startOpt
  91.                              ) throws IOException {
  92.     assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
  93.       "Data-node and name-node layout versions must be the same.";
  94.     
  95.     // 1. For each data directory calculate its state and 
  96.     // check whether all is consistent before transitioning.
  97.     // Format and recover.
  98.     this.storageID = "";
  99.     this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
  100.     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
  101.     for(Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
  102.       File dataDir = it.next();
  103.       StorageDirectory sd = new StorageDirectory(dataDir);
  104.       StorageState curState;
  105.       try {
  106.         curState = sd.analyzeStorage(startOpt);
  107.         // sd is locked but not opened
  108.         switch(curState) {
  109.         case NORMAL:
  110.           break;
  111.         case NON_EXISTENT:
  112.           // ignore this storage
  113.           LOG.info("Storage directory " + dataDir + " does not exist.");
  114.           it.remove();
  115.           continue;
  116.         case NOT_FORMATTED: // format
  117.           LOG.info("Storage directory " + dataDir + " is not formatted.");
  118.           LOG.info("Formatting ...");
  119.           format(sd, nsInfo);
  120.           break;
  121.         default:  // recovery part is common
  122.           sd.doRecover(curState);
  123.         }
  124.       } catch (IOException ioe) {
  125.         sd.unlock();
  126.         throw ioe;
  127.       }
  128.       // add to the storage list
  129.       addStorageDir(sd);
  130.       dataDirStates.add(curState);
  131.     }
  132.     if (dataDirs.size() == 0)  // none of the data dirs exist
  133.       throw new IOException(
  134.                             "All specified directories are not accessible or do not exist.");
  135.     // 2. Do transitions
  136.     // Each storage directory is treated individually.
  137.     // During sturtup some of them can upgrade or rollback 
  138.     // while others could be uptodate for the regular startup.
  139.     for(int idx = 0; idx < getNumStorageDirs(); idx++) {
  140.       doTransition(getStorageDir(idx), nsInfo, startOpt);
  141.       assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
  142.         "Data-node and name-node layout versions must be the same.";
  143.       assert this.getCTime() == nsInfo.getCTime() :
  144.         "Data-node and name-node CTimes must be the same.";
  145.     }
  146.     
  147.     // 3. Update all storages. Some of them might have just been formatted.
  148.     this.writeAll();
  149.   }
  150.   void format(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
  151.     sd.clearDirectory(); // create directory
  152.     this.layoutVersion = FSConstants.LAYOUT_VERSION;
  153.     this.namespaceID = nsInfo.getNamespaceID();
  154.     this.cTime = 0;
  155.     // store storageID as it currently is
  156.     sd.write();
  157.   }
  158.   protected void setFields(Properties props, 
  159.                            StorageDirectory sd 
  160.                            ) throws IOException {
  161.     super.setFields(props, sd);
  162.     props.setProperty("storageID", storageID);
  163.   }
  164.   protected void getFields(Properties props, 
  165.                            StorageDirectory sd 
  166.                            ) throws IOException {
  167.     super.getFields(props, sd);
  168.     String ssid = props.getProperty("storageID");
  169.     if (ssid == null ||
  170.         !("".equals(storageID) || "".equals(ssid) ||
  171.           storageID.equals(ssid)))
  172.       throw new InconsistentFSStateException(sd.getRoot(),
  173.                                              "has incompatible storage Id.");
  174.     if ("".equals(storageID)) // update id only if it was empty
  175.       storageID = ssid;
  176.   }
  177.   public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
  178.     File oldF = new File(sd.getRoot(), "storage");
  179.     if (!oldF.exists())
  180.       return false;
  181.     // check the layout version inside the storage file
  182.     // Lock and Read old storage file
  183.     RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
  184.     FileLock oldLock = oldFile.getChannel().tryLock();
  185.     try {
  186.       oldFile.seek(0);
  187.       int oldVersion = oldFile.readInt();
  188.       if (oldVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
  189.         return false;
  190.     } finally {
  191.       oldLock.release();
  192.       oldFile.close();
  193.     }
  194.     return true;
  195.   }
  196.   
  197.   /**
  198.    * Analize which and whether a transition of the fs state is required
  199.    * and perform it if necessary.
  200.    * 
  201.    * Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime
  202.    * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime
  203.    * Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
  204.    * 
  205.    * @param sd  storage directory
  206.    * @param nsInfo  namespace info
  207.    * @param startOpt  startup option
  208.    * @throws IOException
  209.    */
  210.   private void doTransition( StorageDirectory sd, 
  211.                              NamespaceInfo nsInfo, 
  212.                              StartupOption startOpt
  213.                              ) throws IOException {
  214.     if (startOpt == StartupOption.ROLLBACK)
  215.       doRollback(sd, nsInfo); // rollback if applicable
  216.     sd.read();
  217.     checkVersionUpgradable(this.layoutVersion);
  218.     assert this.layoutVersion >= FSConstants.LAYOUT_VERSION :
  219.       "Future version is not allowed";
  220.     if (getNamespaceID() != nsInfo.getNamespaceID())
  221.       throw new IOException(
  222.                             "Incompatible namespaceIDs in " + sd.getRoot().getCanonicalPath()
  223.                             + ": namenode namespaceID = " + nsInfo.getNamespaceID() 
  224.                             + "; datanode namespaceID = " + getNamespaceID());
  225.     if (this.layoutVersion == FSConstants.LAYOUT_VERSION 
  226.         && this.cTime == nsInfo.getCTime())
  227.       return; // regular startup
  228.     // verify necessity of a distributed upgrade
  229.     verifyDistributedUpgradeProgress(nsInfo);
  230.     if (this.layoutVersion > FSConstants.LAYOUT_VERSION
  231.         || this.cTime < nsInfo.getCTime()) {
  232.       doUpgrade(sd, nsInfo);  // upgrade
  233.       return;
  234.     }
  235.     // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
  236.     // must shutdown
  237.     throw new IOException("Datanode state: LV = " + this.getLayoutVersion() 
  238.                           + " CTime = " + this.getCTime() 
  239.                           + " is newer than the namespace state: LV = "
  240.                           + nsInfo.getLayoutVersion() 
  241.                           + " CTime = " + nsInfo.getCTime());
  242.   }
  243.   /**
  244.    * Move current storage into a backup directory,
  245.    * and hardlink all its blocks into the new current directory.
  246.    * 
  247.    * @param sd  storage directory
  248.    * @throws IOException
  249.    */
  250.   void doUpgrade(StorageDirectory sd,
  251.                  NamespaceInfo nsInfo
  252.                  ) throws IOException {
  253.     LOG.info("Upgrading storage directory " + sd.getRoot()
  254.              + ".n   old LV = " + this.getLayoutVersion()
  255.              + "; old CTime = " + this.getCTime()
  256.              + ".n   new LV = " + nsInfo.getLayoutVersion()
  257.              + "; new CTime = " + nsInfo.getCTime());
  258.     File curDir = sd.getCurrentDir();
  259.     File prevDir = sd.getPreviousDir();
  260.     assert curDir.exists() : "Current directory must exist.";
  261.     // delete previous dir before upgrading
  262.     if (prevDir.exists())
  263.       deleteDir(prevDir);
  264.     File tmpDir = sd.getPreviousTmp();
  265.     assert !tmpDir.exists() : "previous.tmp directory must not exist.";
  266.     // rename current to tmp
  267.     rename(curDir, tmpDir);
  268.     // hardlink blocks
  269.     linkBlocks(tmpDir, curDir, this.getLayoutVersion());
  270.     // write version file
  271.     this.layoutVersion = FSConstants.LAYOUT_VERSION;
  272.     assert this.namespaceID == nsInfo.getNamespaceID() :
  273.       "Data-node and name-node layout versions must be the same.";
  274.     this.cTime = nsInfo.getCTime();
  275.     sd.write();
  276.     // rename tmp to previous
  277.     rename(tmpDir, prevDir);
  278.     LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
  279.   }
  280.   void doRollback( StorageDirectory sd,
  281.                    NamespaceInfo nsInfo
  282.                    ) throws IOException {
  283.     File prevDir = sd.getPreviousDir();
  284.     // regular startup if previous dir does not exist
  285.     if (!prevDir.exists())
  286.       return;
  287.     DataStorage prevInfo = new DataStorage();
  288.     StorageDirectory prevSD = prevInfo.new StorageDirectory(sd.getRoot());
  289.     prevSD.read(prevSD.getPreviousVersionFile());
  290.     // We allow rollback to a state, which is either consistent with
  291.     // the namespace state or can be further upgraded to it.
  292.     if (!(prevInfo.getLayoutVersion() >= FSConstants.LAYOUT_VERSION
  293.           && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback
  294.       throw new InconsistentFSStateException(prevSD.getRoot(),
  295.                                              "Cannot rollback to a newer state.nDatanode previous state: LV = " 
  296.                                              + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime() 
  297.                                              + " is newer than the namespace state: LV = "
  298.                                              + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
  299.     LOG.info("Rolling back storage directory " + sd.getRoot()
  300.              + ".n   target LV = " + nsInfo.getLayoutVersion()
  301.              + "; target CTime = " + nsInfo.getCTime());
  302.     File tmpDir = sd.getRemovedTmp();
  303.     assert !tmpDir.exists() : "removed.tmp directory must not exist.";
  304.     // rename current to tmp
  305.     File curDir = sd.getCurrentDir();
  306.     assert curDir.exists() : "Current directory must exist.";
  307.     rename(curDir, tmpDir);
  308.     // rename previous to current
  309.     rename(prevDir, curDir);
  310.     // delete tmp dir
  311.     deleteDir(tmpDir);
  312.     LOG.info("Rollback of " + sd.getRoot() + " is complete.");
  313.   }
  314.   void doFinalize(StorageDirectory sd) throws IOException {
  315.     File prevDir = sd.getPreviousDir();
  316.     if (!prevDir.exists())
  317.       return; // already discarded
  318.     final String dataDirPath = sd.getRoot().getCanonicalPath();
  319.     LOG.info("Finalizing upgrade for storage directory " 
  320.              + dataDirPath 
  321.              + ".n   cur LV = " + this.getLayoutVersion()
  322.              + "; cur CTime = " + this.getCTime());
  323.     assert sd.getCurrentDir().exists() : "Current directory must exist.";
  324.     final File tmpDir = sd.getFinalizedTmp();
  325.     // rename previous to tmp
  326.     rename(prevDir, tmpDir);
  327.     // delete tmp dir in a separate thread
  328.     new Daemon(new Runnable() {
  329.         public void run() {
  330.           try {
  331.             deleteDir(tmpDir);
  332.           } catch(IOException ex) {
  333.             LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
  334.           }
  335.           LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
  336.         }
  337.         public String toString() { return "Finalize " + dataDirPath; }
  338.       }).start();
  339.   }
  340.   
  341.   void finalizeUpgrade() throws IOException {
  342.     for (Iterator<StorageDirectory> it = storageDirs.iterator(); it.hasNext();) {
  343.       doFinalize(it.next());
  344.     }
  345.   }
  346.   
  347.   static void linkBlocks(File from, File to, int oldLV) throws IOException {
  348.     if (!from.isDirectory()) {
  349.       if (from.getName().startsWith(COPY_FILE_PREFIX)) {
  350.         IOUtils.copyBytes(new FileInputStream(from), 
  351.                           new FileOutputStream(to), 16*1024, true);
  352.       } else {
  353.         
  354.         //check if we are upgrading from pre-generation stamp version.
  355.         if (oldLV >= PRE_GENERATIONSTAMP_LAYOUT_VERSION) {
  356.           // Link to the new file name.
  357.           to = new File(convertMetatadataFileName(to.getAbsolutePath()));
  358.         }
  359.         
  360.         HardLink.createHardLink(from, to);
  361.       }
  362.       return;
  363.     }
  364.     // from is a directory
  365.     if (!to.mkdir())
  366.       throw new IOException("Cannot create directory " + to);
  367.     String[] blockNames = from.list(new java.io.FilenameFilter() {
  368.         public boolean accept(File dir, String name) {
  369.           return name.startsWith(BLOCK_SUBDIR_PREFIX) 
  370.             || name.startsWith(BLOCK_FILE_PREFIX)
  371.             || name.startsWith(COPY_FILE_PREFIX);
  372.         }
  373.       });
  374.     
  375.     for(int i = 0; i < blockNames.length; i++)
  376.       linkBlocks(new File(from, blockNames[i]), 
  377.                  new File(to, blockNames[i]), oldLV);
  378.   }
  379.   protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
  380.     File oldF = new File(rootDir, "storage");
  381.     if (oldF.exists())
  382.       return;
  383.     // recreate old storage file to let pre-upgrade versions fail
  384.     if (!oldF.createNewFile())
  385.       throw new IOException("Cannot create file " + oldF);
  386.     RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
  387.     // write new version into old storage file
  388.     try {
  389.       writeCorruptedData(oldFile);
  390.     } finally {
  391.       oldFile.close();
  392.     }
  393.   }
  394.   private void verifyDistributedUpgradeProgress(
  395.                   NamespaceInfo nsInfo
  396.                 ) throws IOException {
  397.     UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
  398.     assert um != null : "DataNode.upgradeManager is null.";
  399.     um.setUpgradeState(false, getLayoutVersion());
  400.     um.initializeUpgrade(nsInfo);
  401.   }
  402.   
  403.   private static final Pattern PRE_GENSTAMP_META_FILE_PATTERN = 
  404.     Pattern.compile("(.*blk_[-]*\d+)\.meta$");
  405.   /**
  406.    * This is invoked on target file names when upgrading from pre generation 
  407.    * stamp version (version -13) to correct the metatadata file name.
  408.    * @param oldFileName
  409.    * @return the new metadata file name with the default generation stamp.
  410.    */
  411.   private static String convertMetatadataFileName(String oldFileName) {
  412.     Matcher matcher = PRE_GENSTAMP_META_FILE_PATTERN.matcher(oldFileName); 
  413.     if (matcher.matches()) {
  414.       //return the current metadata file name
  415.       return FSDataset.getMetaFileName(matcher.group(1),
  416.                                        Block.GRANDFATHER_GENERATION_STAMP); 
  417.     }
  418.     return oldFileName;
  419.   }
  420. }