FSImage.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:56k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import java.io.BufferedInputStream;
  20. import java.io.BufferedOutputStream;
  21. import java.io.DataInput;
  22. import java.io.DataInputStream;
  23. import java.io.DataOutput;
  24. import java.io.DataOutputStream;
  25. import java.io.File;
  26. import java.io.FileInputStream;
  27. import java.io.FileOutputStream;
  28. import java.io.IOException;
  29. import java.io.RandomAccessFile;
  30. import java.text.SimpleDateFormat;
  31. import java.util.ArrayList;
  32. import java.util.Collection;
  33. import java.util.Date;
  34. import java.util.Iterator;
  35. import java.util.List;
  36. import java.util.Properties;
  37. import java.util.Random;
  38. import java.util.Map;
  39. import java.util.HashMap;
  40. import java.lang.Math;
  41. import java.nio.ByteBuffer;
  42. import org.apache.hadoop.fs.Path;
  43. import org.apache.hadoop.fs.permission.PermissionStatus;
  44. import org.apache.hadoop.fs.permission.FsPermission;
  45. import org.apache.hadoop.conf.Configuration;
  46. import org.apache.hadoop.hdfs.protocol.Block;
  47. import org.apache.hadoop.hdfs.protocol.DatanodeID;
  48. import org.apache.hadoop.hdfs.protocol.FSConstants;
  49. import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
  50. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  51. import org.apache.hadoop.io.UTF8;
  52. import org.apache.hadoop.io.Writable;
  53. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  54. import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
  55. import org.apache.hadoop.hdfs.server.namenode.FSEditLog.EditLogFileInputStream;
  56. import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
  57. import org.apache.hadoop.hdfs.server.common.Storage;
  58. import org.apache.hadoop.hdfs.server.common.StorageInfo;
  59. import org.apache.hadoop.hdfs.server.common.UpgradeManager;
  60. /**
  61.  * FSImage handles checkpointing and logging of the namespace edits.
  62.  * 
  63.  */
  64. public class FSImage extends Storage {
  65.   private static final SimpleDateFormat DATE_FORM =
  66.     new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  67.   //
  68.   // The filenames used for storing the images
  69.   //
  70.   enum NameNodeFile {
  71.     IMAGE     ("fsimage"),
  72.     TIME      ("fstime"),
  73.     EDITS     ("edits"),
  74.     IMAGE_NEW ("fsimage.ckpt"),
  75.     EDITS_NEW ("edits.new");
  76.     
  77.     private String fileName = null;
  78.     private NameNodeFile(String name) {this.fileName = name;}
  79.     String getName() {return fileName;}
  80.   }
  81.   // checkpoint states
  82.   enum CheckpointStates{START, ROLLED_EDITS, UPLOAD_START, UPLOAD_DONE; }
  83.   /**
  84.    * Implementation of StorageDirType specific to namenode storage
  85.    * A Storage directory could be of type IMAGE which stores only fsimage,
  86.    * or of type EDITS which stores edits or of type IMAGE_AND_EDITS which 
  87.    * stores both fsimage and edits.
  88.    */
  89.   static enum NameNodeDirType implements StorageDirType {
  90.     UNDEFINED,
  91.     IMAGE,
  92.     EDITS,
  93.     IMAGE_AND_EDITS;
  94.     
  95.     public StorageDirType getStorageDirType() {
  96.       return this;
  97.     }
  98.     
  99.     public boolean isOfType(StorageDirType type) {
  100.       if ((this == IMAGE_AND_EDITS) && (type == IMAGE || type == EDITS))
  101.         return true;
  102.       return this == type;
  103.     }
  104.   }
  105.   
  106.   protected long checkpointTime = -1L;
  107.   private FSEditLog editLog = null;
  108.   private boolean isUpgradeFinalized = false;
  109.   
  110.   /**
  111.    * list of failed (and thus removed) storages
  112.    */
  113.   protected List<StorageDirectory> removedStorageDirs = new ArrayList<StorageDirectory>();
  114.   
  115.   /**
  116.    * Directories for importing an image from a checkpoint.
  117.    */
  118.   private Collection<File> checkpointDirs;
  119.   private Collection<File> checkpointEditsDirs;
  120.   /**
  121.    * Can fs-image be rolled?
  122.    */
  123.   volatile private CheckpointStates ckptState = FSImage.CheckpointStates.START; 
  124.   /**
  125.    * Used for saving the image to disk
  126.    */
  127.   static private final FsPermission FILE_PERM = new FsPermission((short)0);
  128.   static private final byte[] PATH_SEPARATOR = INode.string2Bytes(Path.SEPARATOR);
  129.   /**
  130.    */
  131.   FSImage() {
  132.     super(NodeType.NAME_NODE);
  133.     this.editLog = new FSEditLog(this);
  134.   }
  135.   /**
  136.    */
  137.   FSImage(Collection<File> fsDirs, Collection<File> fsEditsDirs) 
  138.     throws IOException {
  139.     this();
  140.     setStorageDirectories(fsDirs, fsEditsDirs);
  141.   }
  142.   public FSImage(StorageInfo storageInfo) {
  143.     super(NodeType.NAME_NODE, storageInfo);
  144.   }
  145.   /**
  146.    * Represents an Image (image and edit file).
  147.    */
  148.   public FSImage(File imageDir) throws IOException {
  149.     this();
  150.     ArrayList<File> dirs = new ArrayList<File>(1);
  151.     ArrayList<File> editsDirs = new ArrayList<File>(1);
  152.     dirs.add(imageDir);
  153.     editsDirs.add(imageDir);
  154.     setStorageDirectories(dirs, editsDirs);
  155.   }
  156.   
  157.   void setStorageDirectories(Collection<File> fsNameDirs,
  158.                         Collection<File> fsEditsDirs
  159.                              ) throws IOException {
  160.     this.storageDirs = new ArrayList<StorageDirectory>();
  161.     this.removedStorageDirs = new ArrayList<StorageDirectory>();
  162.    // Add all name dirs with appropriate NameNodeDirType 
  163.     for (File dirName : fsNameDirs) {
  164.       boolean isAlsoEdits = false;
  165.       for (File editsDirName : fsEditsDirs) {
  166.         if (editsDirName.compareTo(dirName) == 0) {
  167.           isAlsoEdits = true;
  168.           fsEditsDirs.remove(editsDirName);
  169.           break;
  170.         }
  171.       }
  172.       NameNodeDirType dirType = (isAlsoEdits) ?
  173.                           NameNodeDirType.IMAGE_AND_EDITS :
  174.                           NameNodeDirType.IMAGE;
  175.       this.addStorageDir(new StorageDirectory(dirName, dirType));
  176.     }
  177.     
  178.     // Add edits dirs if they are different from name dirs
  179.     for (File dirName : fsEditsDirs) {
  180.       this.addStorageDir(new StorageDirectory(dirName, 
  181.                     NameNodeDirType.EDITS));
  182.     }
  183.   }
  184.   void setCheckpointDirectories(Collection<File> dirs,
  185.                                 Collection<File> editsDirs) {
  186.     checkpointDirs = dirs;
  187.     checkpointEditsDirs = editsDirs;
  188.   }
  189.   
  190.   static File getImageFile(StorageDirectory sd, NameNodeFile type) {
  191.     return new File(sd.getCurrentDir(), type.getName());
  192.   }
  193.   
  194.   List<StorageDirectory> getRemovedStorageDirs() {
  195.   return this.removedStorageDirs;
  196.   }
  197.   
  198.   File getEditFile(StorageDirectory sd) {
  199.     return getImageFile(sd, NameNodeFile.EDITS);
  200.   }
  201.   
  202.   File getEditNewFile(StorageDirectory sd) {
  203.     return getImageFile(sd, NameNodeFile.EDITS_NEW);
  204.   }
  205.   File[] getFileNames(NameNodeFile type, NameNodeDirType dirType) {
  206.     ArrayList<File> list = new ArrayList<File>();
  207.     Iterator<StorageDirectory> it = (dirType == null) ? dirIterator() :
  208.                                     dirIterator(dirType);
  209.     for ( ;it.hasNext(); ) {
  210.       list.add(getImageFile(it.next(), type));
  211.     }
  212.     return list.toArray(new File[list.size()]);
  213.   }
  214.   File[] getImageFiles() {
  215.     return getFileNames(NameNodeFile.IMAGE, NameNodeDirType.IMAGE);
  216.   }
  217.   File[] getEditsFiles() {
  218.     return getFileNames(NameNodeFile.EDITS, NameNodeDirType.EDITS);
  219.   }
  220.   File[] getTimeFiles() {
  221.     return getFileNames(NameNodeFile.TIME, null);
  222.   }
  223.   /**
  224.    * Analyze storage directories.
  225.    * Recover from previous transitions if required. 
  226.    * Perform fs state transition if necessary depending on the namespace info.
  227.    * Read storage info. 
  228.    * 
  229.    * @param dataDirs
  230.    * @param startOpt startup option
  231.    * @throws IOException
  232.    * @return true if the image needs to be saved or false otherwise
  233.    */
  234.   boolean recoverTransitionRead(Collection<File> dataDirs,
  235.                              Collection<File> editsDirs,
  236.                                 StartupOption startOpt
  237.                                 ) throws IOException {
  238.     assert startOpt != StartupOption.FORMAT : 
  239.       "NameNode formatting should be performed before reading the image";
  240.     
  241.     // none of the data dirs exist
  242.     if (dataDirs.size() == 0 || editsDirs.size() == 0)  
  243.       throw new IOException(
  244.         "All specified directories are not accessible or do not exist.");
  245.     
  246.     if(startOpt == StartupOption.IMPORT 
  247.         && (checkpointDirs == null || checkpointDirs.isEmpty()))
  248.       throw new IOException("Cannot import image from a checkpoint. "
  249.                           + ""fs.checkpoint.dir" is not set." );
  250.     if(startOpt == StartupOption.IMPORT 
  251.         && (checkpointEditsDirs == null || checkpointEditsDirs.isEmpty()))
  252.       throw new IOException("Cannot import image from a checkpoint. "
  253.                           + ""fs.checkpoint.edits.dir" is not set." );
  254.     
  255.     setStorageDirectories(dataDirs, editsDirs);
  256.     // 1. For each data directory calculate its state and 
  257.     // check whether all is consistent before transitioning.
  258.     Map<StorageDirectory, StorageState> dataDirStates = 
  259.              new HashMap<StorageDirectory, StorageState>();
  260.     boolean isFormatted = false;
  261.     for (Iterator<StorageDirectory> it = 
  262.                       dirIterator(); it.hasNext();) {
  263.       StorageDirectory sd = it.next();
  264.       StorageState curState;
  265.       try {
  266.         curState = sd.analyzeStorage(startOpt);
  267.         // sd is locked but not opened
  268.         switch(curState) {
  269.         case NON_EXISTENT:
  270.           // name-node fails if any of the configured storage dirs are missing
  271.           throw new InconsistentFSStateException(sd.getRoot(),
  272.                                                  "storage directory does not exist or is not accessible.");
  273.         case NOT_FORMATTED:
  274.           break;
  275.         case NORMAL:
  276.           break;
  277.         default:  // recovery is possible
  278.           sd.doRecover(curState);      
  279.         }
  280.         if (curState != StorageState.NOT_FORMATTED 
  281.             && startOpt != StartupOption.ROLLBACK) {
  282.           sd.read(); // read and verify consistency with other directories
  283.           isFormatted = true;
  284.         }
  285.         if (startOpt == StartupOption.IMPORT && isFormatted)
  286.           // import of a checkpoint is allowed only into empty image directories
  287.           throw new IOException("Cannot import image from a checkpoint. " 
  288.               + " NameNode already contains an image in " + sd.getRoot());
  289.       } catch (IOException ioe) {
  290.         sd.unlock();
  291.         throw ioe;
  292.       }
  293.       dataDirStates.put(sd,curState);
  294.     }
  295.     
  296.     if (!isFormatted && startOpt != StartupOption.ROLLBACK 
  297.                      && startOpt != StartupOption.IMPORT)
  298.       throw new IOException("NameNode is not formatted.");
  299.     if (layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION) {
  300.       checkVersionUpgradable(layoutVersion);
  301.     }
  302.     if (startOpt != StartupOption.UPGRADE
  303.           && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION
  304.           && layoutVersion != FSConstants.LAYOUT_VERSION)
  305.         throw new IOException(
  306.                           "nFile system image contains an old layout version " + layoutVersion
  307.                           + ".nAn upgrade to version " + FSConstants.LAYOUT_VERSION
  308.                           + " is required.nPlease restart NameNode with -upgrade option.");
  309.     // check whether distributed upgrade is reguired and/or should be continued
  310.     verifyDistributedUpgradeProgress(startOpt);
  311.     // 2. Format unformatted dirs.
  312.     this.checkpointTime = 0L;
  313.     for (Iterator<StorageDirectory> it = 
  314.                      dirIterator(); it.hasNext();) {
  315.       StorageDirectory sd = it.next();
  316.       StorageState curState = dataDirStates.get(sd);
  317.       switch(curState) {
  318.       case NON_EXISTENT:
  319.         assert false : StorageState.NON_EXISTENT + " state cannot be here";
  320.       case NOT_FORMATTED:
  321.         LOG.info("Storage directory " + sd.getRoot() + " is not formatted.");
  322.         LOG.info("Formatting ...");
  323.         sd.clearDirectory(); // create empty currrent dir
  324.         break;
  325.       default:
  326.         break;
  327.       }
  328.     }
  329.     // 3. Do transitions
  330.     switch(startOpt) {
  331.     case UPGRADE:
  332.       doUpgrade();
  333.       return false; // upgrade saved image already
  334.     case IMPORT:
  335.       doImportCheckpoint();
  336.       return true;
  337.     case ROLLBACK:
  338.       doRollback();
  339.       break;
  340.     case REGULAR:
  341.       // just load the image
  342.     }
  343.     return loadFSImage();
  344.   }
  345.   private void doUpgrade() throws IOException {
  346.     if(getDistributedUpgradeState()) {
  347.       // only distributed upgrade need to continue
  348.       // don't do version upgrade
  349.       this.loadFSImage();
  350.       initializeDistributedUpgrade();
  351.       return;
  352.     }
  353.     // Upgrade is allowed only if there are 
  354.     // no previous fs states in any of the directories
  355.     for (Iterator<StorageDirectory> it = 
  356.                            dirIterator(); it.hasNext();) {
  357.       StorageDirectory sd = it.next();
  358.       if (sd.getPreviousDir().exists())
  359.         throw new InconsistentFSStateException(sd.getRoot(),
  360.                                                "previous fs state should not exist during upgrade. "
  361.                                                + "Finalize or rollback first.");
  362.     }
  363.     // load the latest image
  364.     this.loadFSImage();
  365.     // Do upgrade for each directory
  366.     long oldCTime = this.getCTime();
  367.     this.cTime = FSNamesystem.now();  // generate new cTime for the state
  368.     int oldLV = this.getLayoutVersion();
  369.     this.layoutVersion = FSConstants.LAYOUT_VERSION;
  370.     this.checkpointTime = FSNamesystem.now();
  371.     for (Iterator<StorageDirectory> it = 
  372.                            dirIterator(); it.hasNext();) {
  373.       StorageDirectory sd = it.next();
  374.       LOG.info("Upgrading image directory " + sd.getRoot()
  375.                + ".n   old LV = " + oldLV
  376.                + "; old CTime = " + oldCTime
  377.                + ".n   new LV = " + this.getLayoutVersion()
  378.                + "; new CTime = " + this.getCTime());
  379.       File curDir = sd.getCurrentDir();
  380.       File prevDir = sd.getPreviousDir();
  381.       File tmpDir = sd.getPreviousTmp();
  382.       assert curDir.exists() : "Current directory must exist.";
  383.       assert !prevDir.exists() : "prvious directory must not exist.";
  384.       assert !tmpDir.exists() : "prvious.tmp directory must not exist.";
  385.       // rename current to tmp
  386.       rename(curDir, tmpDir);
  387.       // save new image
  388.       if (!curDir.mkdir())
  389.         throw new IOException("Cannot create directory " + curDir);
  390.       saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
  391.       editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
  392.       // write version and time files
  393.       sd.write();
  394.       // rename tmp to previous
  395.       rename(tmpDir, prevDir);
  396.       isUpgradeFinalized = false;
  397.       LOG.info("Upgrade of " + sd.getRoot() + " is complete.");
  398.     }
  399.     initializeDistributedUpgrade();
  400.     editLog.open();
  401.   }
  402.   private void doRollback() throws IOException {
  403.     // Rollback is allowed only if there is 
  404.     // a previous fs states in at least one of the storage directories.
  405.     // Directories that don't have previous state do not rollback
  406.     boolean canRollback = false;
  407.     FSImage prevState = new FSImage();
  408.     prevState.layoutVersion = FSConstants.LAYOUT_VERSION;
  409.     for (Iterator<StorageDirectory> it = 
  410.                        dirIterator(); it.hasNext();) {
  411.       StorageDirectory sd = it.next();
  412.       File prevDir = sd.getPreviousDir();
  413.       if (!prevDir.exists()) {  // use current directory then
  414.         LOG.info("Storage directory " + sd.getRoot()
  415.                  + " does not contain previous fs state.");
  416.         sd.read(); // read and verify consistency with other directories
  417.         continue;
  418.       }
  419.       StorageDirectory sdPrev = prevState.new StorageDirectory(sd.getRoot());
  420.       sdPrev.read(sdPrev.getPreviousVersionFile());  // read and verify consistency of the prev dir
  421.       canRollback = true;
  422.     }
  423.     if (!canRollback)
  424.       throw new IOException("Cannot rollback. " 
  425.                             + "None of the storage directories contain previous fs state.");
  426.     // Now that we know all directories are going to be consistent
  427.     // Do rollback for each directory containing previous state
  428.     for (Iterator<StorageDirectory> it = 
  429.                           dirIterator(); it.hasNext();) {
  430.       StorageDirectory sd = it.next();
  431.       File prevDir = sd.getPreviousDir();
  432.       if (!prevDir.exists())
  433.         continue;
  434.       LOG.info("Rolling back storage directory " + sd.getRoot()
  435.                + ".n   new LV = " + prevState.getLayoutVersion()
  436.                + "; new CTime = " + prevState.getCTime());
  437.       File tmpDir = sd.getRemovedTmp();
  438.       assert !tmpDir.exists() : "removed.tmp directory must not exist.";
  439.       // rename current to tmp
  440.       File curDir = sd.getCurrentDir();
  441.       assert curDir.exists() : "Current directory must exist.";
  442.       rename(curDir, tmpDir);
  443.       // rename previous to current
  444.       rename(prevDir, curDir);
  445.       // delete tmp dir
  446.       deleteDir(tmpDir);
  447.       LOG.info("Rollback of " + sd.getRoot()+ " is complete.");
  448.     }
  449.     isUpgradeFinalized = true;
  450.     // check whether name-node can start in regular mode
  451.     verifyDistributedUpgradeProgress(StartupOption.REGULAR);
  452.   }
  453.   private void doFinalize(StorageDirectory sd) throws IOException {
  454.     File prevDir = sd.getPreviousDir();
  455.     if (!prevDir.exists()) { // already discarded
  456.       LOG.info("Directory " + prevDir + " does not exist.");
  457.       LOG.info("Finalize upgrade for " + sd.getRoot()+ " is not required.");
  458.       return;
  459.     }
  460.     LOG.info("Finalizing upgrade for storage directory " 
  461.              + sd.getRoot() + "."
  462.              + (getLayoutVersion()==0 ? "" :
  463.                    "n   cur LV = " + this.getLayoutVersion()
  464.                    + "; cur CTime = " + this.getCTime()));
  465.     assert sd.getCurrentDir().exists() : "Current directory must exist.";
  466.     final File tmpDir = sd.getFinalizedTmp();
  467.     // rename previous to tmp and remove
  468.     rename(prevDir, tmpDir);
  469.     deleteDir(tmpDir);
  470.     isUpgradeFinalized = true;
  471.     LOG.info("Finalize upgrade for " + sd.getRoot()+ " is complete.");
  472.   }
  473.   /**
  474.    * Load image from a checkpoint directory and save it into the current one.
  475.    * @throws IOException
  476.    */
  477.   void doImportCheckpoint() throws IOException {
  478.     FSImage ckptImage = new FSImage();
  479.     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
  480.     // replace real image with the checkpoint image
  481.     FSImage realImage = fsNamesys.getFSImage();
  482.     assert realImage == this;
  483.     fsNamesys.dir.fsImage = ckptImage;
  484.     // load from the checkpoint dirs
  485.     try {
  486.       ckptImage.recoverTransitionRead(checkpointDirs, checkpointEditsDirs,
  487.                                               StartupOption.REGULAR);
  488.     } finally {
  489.       ckptImage.close();
  490.     }
  491.     // return back the real image
  492.     realImage.setStorageInfo(ckptImage);
  493.     fsNamesys.dir.fsImage = realImage;
  494.     // and save it
  495.     saveFSImage();
  496.   }
  497.   void finalizeUpgrade() throws IOException {
  498.     for (Iterator<StorageDirectory> it = 
  499.                           dirIterator(); it.hasNext();) {
  500.       doFinalize(it.next());
  501.     }
  502.   }
  503.   boolean isUpgradeFinalized() {
  504.     return isUpgradeFinalized;
  505.   }
  506.   protected void getFields(Properties props, 
  507.                            StorageDirectory sd 
  508.                            ) throws IOException {
  509.     super.getFields(props, sd);
  510.     if (layoutVersion == 0)
  511.       throw new IOException("NameNode directory " 
  512.                             + sd.getRoot() + " is not formatted.");
  513.     String sDUS, sDUV;
  514.     sDUS = props.getProperty("distributedUpgradeState"); 
  515.     sDUV = props.getProperty("distributedUpgradeVersion");
  516.     setDistributedUpgradeState(
  517.         sDUS == null? false : Boolean.parseBoolean(sDUS),
  518.         sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
  519.     this.checkpointTime = readCheckpointTime(sd);
  520.   }
  521.   long readCheckpointTime(StorageDirectory sd) throws IOException {
  522.     File timeFile = getImageFile(sd, NameNodeFile.TIME);
  523.     long timeStamp = 0L;
  524.     if (timeFile.exists() && timeFile.canRead()) {
  525.       DataInputStream in = new DataInputStream(new FileInputStream(timeFile));
  526.       try {
  527.         timeStamp = in.readLong();
  528.       } finally {
  529.         in.close();
  530.       }
  531.     }
  532.     return timeStamp;
  533.   }
  534.   /**
  535.    * Write last checkpoint time and version file into the storage directory.
  536.    * 
  537.    * The version file should always be written last.
  538.    * Missing or corrupted version file indicates that 
  539.    * the checkpoint is not valid.
  540.    * 
  541.    * @param sd storage directory
  542.    * @throws IOException
  543.    */
  544.   protected void setFields(Properties props, 
  545.                            StorageDirectory sd 
  546.                            ) throws IOException {
  547.     super.setFields(props, sd);
  548.     boolean uState = getDistributedUpgradeState();
  549.     int uVersion = getDistributedUpgradeVersion();
  550.     if(uState && uVersion != getLayoutVersion()) {
  551.       props.setProperty("distributedUpgradeState", Boolean.toString(uState));
  552.       props.setProperty("distributedUpgradeVersion", Integer.toString(uVersion)); 
  553.     }
  554.     writeCheckpointTime(sd);
  555.   }
  556.   /**
  557.    * Write last checkpoint time into a separate file.
  558.    * 
  559.    * @param sd
  560.    * @throws IOException
  561.    */
  562.   void writeCheckpointTime(StorageDirectory sd) throws IOException {
  563.     if (checkpointTime < 0L)
  564.       return; // do not write negative time
  565.     File timeFile = getImageFile(sd, NameNodeFile.TIME);
  566.     if (timeFile.exists()) { timeFile.delete(); }
  567.     DataOutputStream out = new DataOutputStream(
  568.                                                 new FileOutputStream(timeFile));
  569.     try {
  570.       out.writeLong(checkpointTime);
  571.     } finally {
  572.       out.close();
  573.     }
  574.   }
  575.   /**
  576.    * Record new checkpoint time in order to
  577.    * distinguish healthy directories from the removed ones.
  578.    * If there is an error writing new checkpoint time, the corresponding
  579.    * storage directory is removed from the list.
  580.    */
  581.   void incrementCheckpointTime() {
  582.     this.checkpointTime++;
  583.     
  584.     // Write new checkpoint time in all storage directories
  585.     for(Iterator<StorageDirectory> it =
  586.                           dirIterator(); it.hasNext();) {
  587.       StorageDirectory sd = it.next();
  588.       try {
  589.         writeCheckpointTime(sd);
  590.       } catch(IOException e) {
  591.         // Close any edits stream associated with this dir and remove directory
  592.      if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
  593.        editLog.processIOError(sd);
  594.      
  595.    //add storage to the removed list
  596.      removedStorageDirs.add(sd);
  597.      it.remove();
  598.       }
  599.     }
  600.   }
  601.   
  602.   /**
  603.    * Remove storage directory given directory
  604.    */
  605.   
  606.   void processIOError(File dirName) {
  607.     for (Iterator<StorageDirectory> it = 
  608.       dirIterator(); it.hasNext();) {
  609.       StorageDirectory sd = it.next();
  610.       if (sd.getRoot().getPath().equals(dirName.getPath())) {
  611.         //add storage to the removed list
  612.         LOG.info(" removing " + dirName.getPath());
  613.         removedStorageDirs.add(sd);
  614.         it.remove();
  615.       }
  616.     }
  617.   }
  618.   public FSEditLog getEditLog() {
  619.     return editLog;
  620.   }
  621.   public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
  622.     File oldImageDir = new File(sd.getRoot(), "image");
  623.     if (!oldImageDir.exists()) {
  624.       if(sd.getVersionFile().exists())
  625.         throw new InconsistentFSStateException(sd.getRoot(),
  626.             oldImageDir + " does not exist.");
  627.       return false;
  628.     }
  629.     // check the layout version inside the image file
  630.     File oldF = new File(oldImageDir, "fsimage");
  631.     RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
  632.     try {
  633.       oldFile.seek(0);
  634.       int odlVersion = oldFile.readInt();
  635.       if (odlVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION)
  636.         return false;
  637.     } finally {
  638.       oldFile.close();
  639.     }
  640.     return true;
  641.   }
  642.   
  643.   //
  644.   // Atomic move sequence, to recover from interrupted checkpoint
  645.   //
  646.   boolean recoverInterruptedCheckpoint(StorageDirectory nameSD,
  647.                                        StorageDirectory editsSD) 
  648.                                        throws IOException {
  649.     boolean needToSave = false;
  650.     File curFile = getImageFile(nameSD, NameNodeFile.IMAGE);
  651.     File ckptFile = getImageFile(nameSD, NameNodeFile.IMAGE_NEW);
  652.     //
  653.     // If we were in the midst of a checkpoint
  654.     //
  655.     if (ckptFile.exists()) {
  656.       needToSave = true;
  657.       if (getImageFile(editsSD, NameNodeFile.EDITS_NEW).exists()) {
  658.         //
  659.         // checkpointing migth have uploaded a new
  660.         // merged image, but we discard it here because we are
  661.         // not sure whether the entire merged image was uploaded
  662.         // before the namenode crashed.
  663.         //
  664.         if (!ckptFile.delete()) {
  665.           throw new IOException("Unable to delete " + ckptFile);
  666.         }
  667.       } else {
  668.         //
  669.         // checkpointing was in progress when the namenode
  670.         // shutdown. The fsimage.ckpt was created and the edits.new
  671.         // file was moved to edits. We complete that checkpoint by
  672.         // moving fsimage.new to fsimage. There is no need to 
  673.         // update the fstime file here. renameTo fails on Windows
  674.         // if the destination file already exists.
  675.         //
  676.         if (!ckptFile.renameTo(curFile)) {
  677.           if (!curFile.delete())
  678.             LOG.warn("Unable to delete dir " + curFile + " before rename");
  679.           if (!ckptFile.renameTo(curFile)) {
  680.             throw new IOException("Unable to rename " + ckptFile +
  681.                                   " to " + curFile);
  682.           }
  683.         }
  684.       }
  685.     }
  686.     return needToSave;
  687.   }
  688.   /**
  689.    * Choose latest image from one of the directories,
  690.    * load it and merge with the edits from that directory.
  691.    * 
  692.    * @return whether the image should be saved
  693.    * @throws IOException
  694.    */
  695.   boolean loadFSImage() throws IOException {
  696.     // Now check all curFiles and see which is the newest
  697.     long latestNameCheckpointTime = Long.MIN_VALUE;
  698.     long latestEditsCheckpointTime = Long.MIN_VALUE;
  699.     StorageDirectory latestNameSD = null;
  700.     StorageDirectory latestEditsSD = null;
  701.     boolean needToSave = false;
  702.     isUpgradeFinalized = true;
  703.     Collection<String> imageDirs = new ArrayList<String>();
  704.     Collection<String> editsDirs = new ArrayList<String>();
  705.     for (Iterator<StorageDirectory> it = dirIterator(); it.hasNext();) {
  706.       StorageDirectory sd = it.next();
  707.       if (!sd.getVersionFile().exists()) {
  708.         needToSave |= true;
  709.         continue; // some of them might have just been formatted
  710.       }
  711.       boolean imageExists = false, editsExists = false;
  712.       if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
  713.         imageExists = getImageFile(sd, NameNodeFile.IMAGE).exists();
  714.         imageDirs.add(sd.getRoot().getCanonicalPath());
  715.       }
  716.       if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
  717.         editsExists = getImageFile(sd, NameNodeFile.EDITS).exists();
  718.         editsDirs.add(sd.getRoot().getCanonicalPath());
  719.       }
  720.       
  721.       checkpointTime = readCheckpointTime(sd);
  722.       if ((checkpointTime != Long.MIN_VALUE) && 
  723.           ((checkpointTime != latestNameCheckpointTime) || 
  724.            (checkpointTime != latestEditsCheckpointTime))) {
  725.         // Force saving of new image if checkpoint time
  726.         // is not same in all of the storage directories.
  727.         needToSave |= true;
  728.       }
  729.       if (sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE) && 
  730.          (latestNameCheckpointTime < checkpointTime) && imageExists) {
  731.         latestNameCheckpointTime = checkpointTime;
  732.         latestNameSD = sd;
  733.       }
  734.       if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS) && 
  735.            (latestEditsCheckpointTime < checkpointTime) && editsExists) {
  736.         latestEditsCheckpointTime = checkpointTime;
  737.         latestEditsSD = sd;
  738.       }
  739.       if (checkpointTime <= 0L)
  740.         needToSave |= true;
  741.       // set finalized flag
  742.       isUpgradeFinalized = isUpgradeFinalized && !sd.getPreviousDir().exists();
  743.     }
  744.     // We should have at least one image and one edits dirs
  745.     if (latestNameSD == null)
  746.       throw new IOException("Image file is not found in " + imageDirs);
  747.     if (latestEditsSD == null)
  748.       throw new IOException("Edits file is not found in " + editsDirs);
  749.     // Make sure we are loading image and edits from same checkpoint
  750.     if (latestNameCheckpointTime != latestEditsCheckpointTime)
  751.       throw new IOException("Inconsitent storage detected, " +
  752.                             "name and edits storage do not match");
  753.     
  754.     // Recover from previous interrrupted checkpoint if any
  755.     needToSave |= recoverInterruptedCheckpoint(latestNameSD, latestEditsSD);
  756.     long startTime = FSNamesystem.now();
  757.     long imageSize = getImageFile(latestNameSD, NameNodeFile.IMAGE).length();
  758.     //
  759.     // Load in bits
  760.     //
  761.     latestNameSD.read();
  762.     needToSave |= loadFSImage(getImageFile(latestNameSD, NameNodeFile.IMAGE));
  763.     LOG.info("Image file of size " + imageSize + " loaded in " 
  764.         + (FSNamesystem.now() - startTime)/1000 + " seconds.");
  765.     
  766.     // Load latest edits
  767.     needToSave |= (loadFSEdits(latestEditsSD) > 0);
  768.     
  769.     return needToSave;
  770.   }
  771.   /**
  772.    * Load in the filesystem imagefrom file. It's a big list of
  773.    * filenames and blocks.  Return whether we should
  774.    * "re-save" and consolidate the edit-logs
  775.    */
  776.   boolean loadFSImage(File curFile) throws IOException {
  777.     assert this.getLayoutVersion() < 0 : "Negative layout version is expected.";
  778.     assert curFile != null : "curFile is null";
  779.     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
  780.     FSDirectory fsDir = fsNamesys.dir;
  781.     //
  782.     // Load in bits
  783.     //
  784.     boolean needToSave = true;
  785.     DataInputStream in = new DataInputStream(new BufferedInputStream(
  786.                               new FileInputStream(curFile)));
  787.     try {
  788.       /*
  789.        * Note: Remove any checks for version earlier than 
  790.        * Storage.LAST_UPGRADABLE_LAYOUT_VERSION since we should never get 
  791.        * to here with older images.
  792.        */
  793.       
  794.       /*
  795.        * TODO we need to change format of the image file
  796.        * it should not contain version and namespace fields
  797.        */
  798.       // read image version: first appeared in version -1
  799.       int imgVersion = in.readInt();
  800.       // read namespaceID: first appeared in version -2
  801.       this.namespaceID = in.readInt();
  802.       // read number of files
  803.       long numFiles;
  804.       if (imgVersion <= -16) {
  805.         numFiles = in.readLong();
  806.       } else {
  807.         numFiles = in.readInt();
  808.       }
  809.       this.layoutVersion = imgVersion;
  810.       // read in the last generation stamp.
  811.       if (imgVersion <= -12) {
  812.         long genstamp = in.readLong();
  813.         fsNamesys.setGenerationStamp(genstamp); 
  814.       }
  815.       needToSave = (imgVersion != FSConstants.LAYOUT_VERSION);
  816.       // read file info
  817.       short replication = FSNamesystem.getFSNamesystem().getDefaultReplication();
  818.       LOG.info("Number of files = " + numFiles);
  819.       String path;
  820.       String parentPath = "";
  821.       INodeDirectory parentINode = fsDir.rootDir;
  822.       for (long i = 0; i < numFiles; i++) {
  823.         long modificationTime = 0;
  824.         long atime = 0;
  825.         long blockSize = 0;
  826.         path = readString(in);
  827.         replication = in.readShort();
  828.         replication = FSEditLog.adjustReplication(replication);
  829.         modificationTime = in.readLong();
  830.         if (imgVersion <= -17) {
  831.           atime = in.readLong();
  832.         }
  833.         if (imgVersion <= -8) {
  834.           blockSize = in.readLong();
  835.         }
  836.         int numBlocks = in.readInt();
  837.         Block blocks[] = null;
  838.         // for older versions, a blocklist of size 0
  839.         // indicates a directory.
  840.         if ((-9 <= imgVersion && numBlocks > 0) ||
  841.             (imgVersion < -9 && numBlocks >= 0)) {
  842.           blocks = new Block[numBlocks];
  843.           for (int j = 0; j < numBlocks; j++) {
  844.             blocks[j] = new Block();
  845.             if (-14 < imgVersion) {
  846.               blocks[j].set(in.readLong(), in.readLong(), 
  847.                             Block.GRANDFATHER_GENERATION_STAMP);
  848.             } else {
  849.               blocks[j].readFields(in);
  850.             }
  851.           }
  852.         }
  853.         // Older versions of HDFS does not store the block size in inode.
  854.         // If the file has more than one block, use the size of the 
  855.         // first block as the blocksize. Otherwise use the default block size.
  856.         //
  857.         if (-8 <= imgVersion && blockSize == 0) {
  858.           if (numBlocks > 1) {
  859.             blockSize = blocks[0].getNumBytes();
  860.           } else {
  861.             long first = ((numBlocks == 1) ? blocks[0].getNumBytes(): 0);
  862.             blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
  863.           }
  864.         }
  865.         
  866.         // get quota only when the node is a directory
  867.         long nsQuota = -1L;
  868.         if (imgVersion <= -16 && blocks == null) {
  869.           nsQuota = in.readLong();
  870.         }
  871.         long dsQuota = -1L;
  872.         if (imgVersion <= -18 && blocks == null) {
  873.           dsQuota = in.readLong();
  874.         }
  875.         
  876.         PermissionStatus permissions = fsNamesys.getUpgradePermission();
  877.         if (imgVersion <= -11) {
  878.           permissions = PermissionStatus.read(in);
  879.         }
  880.         if (path.length() == 0) { // it is the root
  881.           // update the root's attributes
  882.           if (nsQuota != -1 || dsQuota != -1) {
  883.             fsDir.rootDir.setQuota(nsQuota, dsQuota);
  884.           }
  885.           fsDir.rootDir.setModificationTime(modificationTime);
  886.           fsDir.rootDir.setPermissionStatus(permissions);
  887.           continue;
  888.         }
  889.         // check if the new inode belongs to the same parent
  890.         if(!isParent(path, parentPath)) {
  891.           parentINode = null;
  892.           parentPath = getParent(path);
  893.         }
  894.         // add new inode
  895.         parentINode = fsDir.addToParent(path, parentINode, permissions,
  896.                                         blocks, replication, modificationTime, 
  897.                                         atime, nsQuota, dsQuota, blockSize);
  898.       }
  899.       
  900.       // load datanode info
  901.       this.loadDatanodes(imgVersion, in);
  902.       // load Files Under Construction
  903.       this.loadFilesUnderConstruction(imgVersion, in, fsNamesys);
  904.       
  905.     } finally {
  906.       in.close();
  907.     }
  908.     
  909.     return needToSave;
  910.   }
  911.   /**
  912.    * Return string representing the parent of the given path.
  913.    */
  914.   String getParent(String path) {
  915.     return path.substring(0, path.lastIndexOf(Path.SEPARATOR));
  916.   }
  917.   private boolean isParent(String path, String parent) {
  918.     return parent != null && path != null
  919.           && path.indexOf(parent) == 0
  920.           && path.lastIndexOf(Path.SEPARATOR) == parent.length();
  921.   }
  922.   /**
  923.    * Load and merge edits from two edits files
  924.    * 
  925.    * @param sd storage directory
  926.    * @return number of edits loaded
  927.    * @throws IOException
  928.    */
  929.   int loadFSEdits(StorageDirectory sd) throws IOException {
  930.     int numEdits = 0;
  931.     EditLogFileInputStream edits = 
  932.       new EditLogFileInputStream(getImageFile(sd, NameNodeFile.EDITS));
  933.     numEdits = FSEditLog.loadFSEdits(edits);
  934.     edits.close();
  935.     File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
  936.     if (editsNew.exists() && editsNew.length() > 0) {
  937.       edits = new EditLogFileInputStream(editsNew);
  938.       numEdits += FSEditLog.loadFSEdits(edits);
  939.       edits.close();
  940.     }
  941.     // update the counts.
  942.     FSNamesystem.getFSNamesystem().dir.updateCountForINodeWithQuota();    
  943.     return numEdits;
  944.   }
  945.   /**
  946.    * Save the contents of the FS image to the file.
  947.    */
  948.   void saveFSImage(File newFile) throws IOException {
  949.     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
  950.     FSDirectory fsDir = fsNamesys.dir;
  951.     long startTime = FSNamesystem.now();
  952.     //
  953.     // Write out data
  954.     //
  955.     DataOutputStream out = new DataOutputStream(
  956.                                                 new BufferedOutputStream(
  957.                                                                          new FileOutputStream(newFile)));
  958.     try {
  959.       out.writeInt(FSConstants.LAYOUT_VERSION);
  960.       out.writeInt(namespaceID);
  961.       out.writeLong(fsDir.rootDir.numItemsInTree());
  962.       out.writeLong(fsNamesys.getGenerationStamp());
  963.       byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
  964.       ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
  965.       // save the root
  966.       saveINode2Image(strbuf, fsDir.rootDir, out);
  967.       // save the rest of the nodes
  968.       saveImage(strbuf, 0, fsDir.rootDir, out);
  969.       fsNamesys.saveFilesUnderConstruction(out);
  970.       strbuf = null;
  971.     } finally {
  972.       out.close();
  973.     }
  974.     LOG.info("Image file of size " + newFile.length() + " saved in " 
  975.         + (FSNamesystem.now() - startTime)/1000 + " seconds.");
  976.   }
  977.   /**
  978.    * Save the contents of the FS image
  979.    * and create empty edits.
  980.    */
  981.   public void saveFSImage() throws IOException {
  982.     editLog.createNewIfMissing();
  983.     for (Iterator<StorageDirectory> it = 
  984.                            dirIterator(); it.hasNext();) {
  985.       StorageDirectory sd = it.next();
  986.       NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
  987.       if (dirType.isOfType(NameNodeDirType.IMAGE))
  988.         saveFSImage(getImageFile(sd, NameNodeFile.IMAGE_NEW));
  989.       if (dirType.isOfType(NameNodeDirType.EDITS)) {    
  990.         editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
  991.         File editsNew = getImageFile(sd, NameNodeFile.EDITS_NEW);
  992.         if (editsNew.exists()) 
  993.           editLog.createEditLogFile(editsNew);
  994.       }
  995.     }
  996.     ckptState = CheckpointStates.UPLOAD_DONE;
  997.     rollFSImage();
  998.   }
  999.   /**
  1000.    * Generate new namespaceID.
  1001.    * 
  1002.    * namespaceID is a persistent attribute of the namespace.
  1003.    * It is generated when the namenode is formatted and remains the same
  1004.    * during the life cycle of the namenode.
  1005.    * When a datanodes register they receive it as the registrationID,
  1006.    * which is checked every time the datanode is communicating with the 
  1007.    * namenode. Datanodes that do not 'know' the namespaceID are rejected.
  1008.    * 
  1009.    * @return new namespaceID
  1010.    */
  1011.   private int newNamespaceID() {
  1012.     Random r = new Random();
  1013.     r.setSeed(FSNamesystem.now());
  1014.     int newID = 0;
  1015.     while(newID == 0)
  1016.       newID = r.nextInt(0x7FFFFFFF);  // use 31 bits only
  1017.     return newID;
  1018.   }
  1019.   /** Create new dfs name directory.  Caution: this destroys all files
  1020.    * in this filesystem. */
  1021.   void format(StorageDirectory sd) throws IOException {
  1022.     sd.clearDirectory(); // create currrent dir
  1023.     sd.lock();
  1024.     try {
  1025.       NameNodeDirType dirType = (NameNodeDirType)sd.getStorageDirType();
  1026.       if (dirType.isOfType(NameNodeDirType.IMAGE))
  1027.         saveFSImage(getImageFile(sd, NameNodeFile.IMAGE));
  1028.       if (dirType.isOfType(NameNodeDirType.EDITS))
  1029.         editLog.createEditLogFile(getImageFile(sd, NameNodeFile.EDITS));
  1030.       sd.write();
  1031.     } finally {
  1032.       sd.unlock();
  1033.     }
  1034.     LOG.info("Storage directory " + sd.getRoot()
  1035.              + " has been successfully formatted.");
  1036.   }
  1037.   public void format() throws IOException {
  1038.     this.layoutVersion = FSConstants.LAYOUT_VERSION;
  1039.     this.namespaceID = newNamespaceID();
  1040.     this.cTime = 0L;
  1041.     this.checkpointTime = FSNamesystem.now();
  1042.     for (Iterator<StorageDirectory> it = 
  1043.                            dirIterator(); it.hasNext();) {
  1044.       StorageDirectory sd = it.next();
  1045.       format(sd);
  1046.     }
  1047.   }
  1048.   /*
  1049.    * Save one inode's attributes to the image.
  1050.    */
  1051.   private static void saveINode2Image(ByteBuffer name,
  1052.                                       INode node,
  1053.                                       DataOutputStream out) throws IOException {
  1054.     int nameLen = name.position();
  1055.     out.writeShort(nameLen);
  1056.     out.write(name.array(), name.arrayOffset(), nameLen);
  1057.     if (!node.isDirectory()) {  // write file inode
  1058.       INodeFile fileINode = (INodeFile)node;
  1059.       out.writeShort(fileINode.getReplication());
  1060.       out.writeLong(fileINode.getModificationTime());
  1061.       out.writeLong(fileINode.getAccessTime());
  1062.       out.writeLong(fileINode.getPreferredBlockSize());
  1063.       Block[] blocks = fileINode.getBlocks();
  1064.       out.writeInt(blocks.length);
  1065.       for (Block blk : blocks)
  1066.         blk.write(out);
  1067.       FILE_PERM.fromShort(fileINode.getFsPermissionShort());
  1068.       PermissionStatus.write(out, fileINode.getUserName(),
  1069.                              fileINode.getGroupName(),
  1070.                              FILE_PERM);
  1071.     } else {   // write directory inode
  1072.       out.writeShort(0);  // replication
  1073.       out.writeLong(node.getModificationTime());
  1074.       out.writeLong(0);   // access time
  1075.       out.writeLong(0);   // preferred block size
  1076.       out.writeInt(-1);    // # of blocks
  1077.       out.writeLong(node.getNsQuota());
  1078.       out.writeLong(node.getDsQuota());
  1079.       FILE_PERM.fromShort(node.getFsPermissionShort());
  1080.       PermissionStatus.write(out, node.getUserName(),
  1081.                              node.getGroupName(),
  1082.                              FILE_PERM);
  1083.     }
  1084.   }
  1085.   /**
  1086.    * Save file tree image starting from the given root.
  1087.    * This is a recursive procedure, which first saves all children of
  1088.    * a current directory and then moves inside the sub-directories.
  1089.    */
  1090.   private static void saveImage(ByteBuffer parentPrefix,
  1091.                                 int prefixLength,
  1092.                                 INodeDirectory current,
  1093.                                 DataOutputStream out) throws IOException {
  1094.     int newPrefixLength = prefixLength;
  1095.     if (current.getChildrenRaw() == null)
  1096.       return;
  1097.     for(INode child : current.getChildren()) {
  1098.       // print all children first
  1099.       parentPrefix.position(prefixLength);
  1100.       parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
  1101.       saveINode2Image(parentPrefix, child, out);
  1102.     }
  1103.     for(INode child : current.getChildren()) {
  1104.       if(!child.isDirectory())
  1105.         continue;
  1106.       parentPrefix.position(prefixLength);
  1107.       parentPrefix.put(PATH_SEPARATOR).put(child.getLocalNameBytes());
  1108.       newPrefixLength = parentPrefix.position();
  1109.       saveImage(parentPrefix, newPrefixLength, (INodeDirectory)child, out);
  1110.     }
  1111.     parentPrefix.position(prefixLength);
  1112.   }
  1113.   void loadDatanodes(int version, DataInputStream in) throws IOException {
  1114.     if (version > -3) // pre datanode image version
  1115.       return;
  1116.     if (version <= -12) {
  1117.       return; // new versions do not store the datanodes any more.
  1118.     }
  1119.     int size = in.readInt();
  1120.     for(int i = 0; i < size; i++) {
  1121.       DatanodeImage nodeImage = new DatanodeImage();
  1122.       nodeImage.readFields(in);
  1123.       // We don't need to add these descriptors any more.
  1124.     }
  1125.   }
  1126.   private void loadFilesUnderConstruction(int version, DataInputStream in, 
  1127.                                   FSNamesystem fs) throws IOException {
  1128.     FSDirectory fsDir = fs.dir;
  1129.     if (version > -13) // pre lease image version
  1130.       return;
  1131.     int size = in.readInt();
  1132.     LOG.info("Number of files under construction = " + size);
  1133.     for (int i = 0; i < size; i++) {
  1134.       INodeFileUnderConstruction cons = readINodeUnderConstruction(in);
  1135.       // verify that file exists in namespace
  1136.       String path = cons.getLocalName();
  1137.       INode old = fsDir.getFileINode(path);
  1138.       if (old == null) {
  1139.         throw new IOException("Found lease for non-existent file " + path);
  1140.       }
  1141.       if (old.isDirectory()) {
  1142.         throw new IOException("Found lease for directory " + path);
  1143.       }
  1144.       INodeFile oldnode = (INodeFile) old;
  1145.       fsDir.replaceNode(path, oldnode, cons);
  1146.       fs.leaseManager.addLease(cons.clientName, path); 
  1147.     }
  1148.   }
  1149.   // Helper function that reads in an INodeUnderConstruction
  1150.   // from the input stream
  1151.   //
  1152.   static INodeFileUnderConstruction readINodeUnderConstruction(
  1153.                             DataInputStream in) throws IOException {
  1154.     byte[] name = readBytes(in);
  1155.     short blockReplication = in.readShort();
  1156.     long modificationTime = in.readLong();
  1157.     long preferredBlockSize = in.readLong();
  1158.     int numBlocks = in.readInt();
  1159.     BlockInfo[] blocks = new BlockInfo[numBlocks];
  1160.     Block blk = new Block();
  1161.     for (int i = 0; i < numBlocks; i++) {
  1162.       blk.readFields(in);
  1163.       blocks[i] = new BlockInfo(blk, blockReplication);
  1164.     }
  1165.     PermissionStatus perm = PermissionStatus.read(in);
  1166.     String clientName = readString(in);
  1167.     String clientMachine = readString(in);
  1168.     // These locations are not used at all
  1169.     int numLocs = in.readInt();
  1170.     DatanodeDescriptor[] locations = new DatanodeDescriptor[numLocs];
  1171.     for (int i = 0; i < numLocs; i++) {
  1172.       locations[i] = new DatanodeDescriptor();
  1173.       locations[i].readFields(in);
  1174.     }
  1175.     return new INodeFileUnderConstruction(name, 
  1176.                                           blockReplication, 
  1177.                                           modificationTime,
  1178.                                           preferredBlockSize,
  1179.                                           blocks,
  1180.                                           perm,
  1181.                                           clientName,
  1182.                                           clientMachine,
  1183.                                           null);
  1184.   }
  1185.   // Helper function that writes an INodeUnderConstruction
  1186.   // into the input stream
  1187.   //
  1188.   static void writeINodeUnderConstruction(DataOutputStream out,
  1189.                                            INodeFileUnderConstruction cons,
  1190.                                            String path) 
  1191.                                            throws IOException {
  1192.     writeString(path, out);
  1193.     out.writeShort(cons.getReplication());
  1194.     out.writeLong(cons.getModificationTime());
  1195.     out.writeLong(cons.getPreferredBlockSize());
  1196.     int nrBlocks = cons.getBlocks().length;
  1197.     out.writeInt(nrBlocks);
  1198.     for (int i = 0; i < nrBlocks; i++) {
  1199.       cons.getBlocks()[i].write(out);
  1200.     }
  1201.     cons.getPermissionStatus().write(out);
  1202.     writeString(cons.getClientName(), out);
  1203.     writeString(cons.getClientMachine(), out);
  1204.     out.writeInt(0); //  do not store locations of last block
  1205.   }
  1206.   /**
  1207.    * Moves fsimage.ckpt to fsImage and edits.new to edits
  1208.    * Reopens the new edits file.
  1209.    */
  1210.   void rollFSImage() throws IOException {
  1211.     if (ckptState != CheckpointStates.UPLOAD_DONE) {
  1212.       throw new IOException("Cannot roll fsImage before rolling edits log.");
  1213.     }
  1214.     //
  1215.     // First, verify that edits.new and fsimage.ckpt exists in all
  1216.     // checkpoint directories.
  1217.     //
  1218.     if (!editLog.existsNew()) {
  1219.       throw new IOException("New Edits file does not exist");
  1220.     }
  1221.     for (Iterator<StorageDirectory> it = 
  1222.                        dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
  1223.       StorageDirectory sd = it.next();
  1224.       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
  1225.       if (!ckpt.exists()) {
  1226.         throw new IOException("Checkpoint file " + ckpt +
  1227.                               " does not exist");
  1228.       }
  1229.     }
  1230.     editLog.purgeEditLog(); // renamed edits.new to edits
  1231.     //
  1232.     // Renames new image
  1233.     //
  1234.     for (Iterator<StorageDirectory> it = 
  1235.                        dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
  1236.       StorageDirectory sd = it.next();
  1237.       File ckpt = getImageFile(sd, NameNodeFile.IMAGE_NEW);
  1238.       File curFile = getImageFile(sd, NameNodeFile.IMAGE);
  1239.       // renameTo fails on Windows if the destination file 
  1240.       // already exists.
  1241.       if (!ckpt.renameTo(curFile)) {
  1242.         curFile.delete();
  1243.         if (!ckpt.renameTo(curFile)) {
  1244.           // Close edit stream, if this directory is also used for edits
  1245.           if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
  1246.             editLog.processIOError(sd);
  1247.         // add storage to the removed list
  1248.           removedStorageDirs.add(sd);
  1249.           it.remove();
  1250.         }
  1251.       }
  1252.     }
  1253.     //
  1254.     // Updates the fstime file on all directories (fsimage and edits)
  1255.     // and write version file
  1256.     //
  1257.     this.layoutVersion = FSConstants.LAYOUT_VERSION;
  1258.     this.checkpointTime = FSNamesystem.now();
  1259.     for (Iterator<StorageDirectory> it = 
  1260.                            dirIterator(); it.hasNext();) {
  1261.       StorageDirectory sd = it.next();
  1262.       // delete old edits if sd is the image only the directory
  1263.       if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS)) {
  1264.         File editsFile = getImageFile(sd, NameNodeFile.EDITS);
  1265.         editsFile.delete();
  1266.       }
  1267.       // delete old fsimage if sd is the edits only the directory
  1268.       if (!sd.getStorageDirType().isOfType(NameNodeDirType.IMAGE)) {
  1269.         File imageFile = getImageFile(sd, NameNodeFile.IMAGE);
  1270.         imageFile.delete();
  1271.       }
  1272.       try {
  1273.         sd.write();
  1274.       } catch (IOException e) {
  1275.         LOG.error("Cannot write file " + sd.getRoot(), e);
  1276.         // Close edit stream, if this directory is also used for edits
  1277.         if (sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
  1278.           editLog.processIOError(sd);
  1279.       //add storage to the removed list
  1280.         removedStorageDirs.add(sd);
  1281.         it.remove();
  1282.       }
  1283.     }
  1284.     ckptState = FSImage.CheckpointStates.START;
  1285.   }
  1286.   CheckpointSignature rollEditLog() throws IOException {
  1287.     getEditLog().rollEditLog();
  1288.     ckptState = CheckpointStates.ROLLED_EDITS;
  1289.     return new CheckpointSignature(this);
  1290.   }
  1291.   /**
  1292.    * This is called just before a new checkpoint is uploaded to the
  1293.    * namenode.
  1294.    */
  1295.   void validateCheckpointUpload(CheckpointSignature sig) throws IOException {
  1296.     if (ckptState != CheckpointStates.ROLLED_EDITS) {
  1297.       throw new IOException("Namenode is not expecting an new image " +
  1298.                              ckptState);
  1299.     } 
  1300.     // verify token
  1301.     long modtime = getEditLog().getFsEditTime();
  1302.     if (sig.editsTime != modtime) {
  1303.       throw new IOException("Namenode has an edit log with timestamp of " +
  1304.                             DATE_FORM.format(new Date(modtime)) +
  1305.                             " but new checkpoint was created using editlog " +
  1306.                             " with timestamp " + 
  1307.                             DATE_FORM.format(new Date(sig.editsTime)) + 
  1308.                             ". Checkpoint Aborted.");
  1309.     }
  1310.     sig.validateStorageInfo(this);
  1311.     ckptState = FSImage.CheckpointStates.UPLOAD_START;
  1312.   }
  1313.   /**
  1314.    * This is called when a checkpoint upload finishes successfully.
  1315.    */
  1316.   synchronized void checkpointUploadDone() {
  1317.     ckptState = CheckpointStates.UPLOAD_DONE;
  1318.   }
  1319.   void close() throws IOException {
  1320.     getEditLog().close();
  1321.     unlockAll();
  1322.   }
  1323.   /**
  1324.    * Return the name of the image file.
  1325.    */
  1326.   File getFsImageName() {
  1327.   StorageDirectory sd = null;
  1328.   for (Iterator<StorageDirectory> it = 
  1329.               dirIterator(NameNodeDirType.IMAGE); it.hasNext();)
  1330.     sd = it.next();
  1331.   return getImageFile(sd, NameNodeFile.IMAGE); 
  1332.   }
  1333.   public File getFsEditName() throws IOException {
  1334.     return getEditLog().getFsEditName();
  1335.   }
  1336.   File getFsTimeName() {
  1337.     StorageDirectory sd = null;
  1338.     // NameNodeFile.TIME shoul be same on all directories
  1339.     for (Iterator<StorageDirectory> it = 
  1340.              dirIterator(); it.hasNext();)
  1341.       sd = it.next();
  1342.     return getImageFile(sd, NameNodeFile.TIME);
  1343.   }
  1344.   /**
  1345.    * Return the name of the image file that is uploaded by periodic
  1346.    * checkpointing.
  1347.    */
  1348.   File[] getFsImageNameCheckpoint() {
  1349.     ArrayList<File> list = new ArrayList<File>();
  1350.     for (Iterator<StorageDirectory> it = 
  1351.                  dirIterator(NameNodeDirType.IMAGE); it.hasNext();) {
  1352.       list.add(getImageFile(it.next(), NameNodeFile.IMAGE_NEW));
  1353.     }
  1354.     return list.toArray(new File[list.size()]);
  1355.   }
  1356.   /**
  1357.    * DatanodeImage is used to store persistent information
  1358.    * about datanodes into the fsImage.
  1359.    */
  1360.   static class DatanodeImage implements Writable {
  1361.     DatanodeDescriptor node = new DatanodeDescriptor();
  1362.     /////////////////////////////////////////////////
  1363.     // Writable
  1364.     /////////////////////////////////////////////////
  1365.     /**
  1366.      * Public method that serializes the information about a
  1367.      * Datanode to be stored in the fsImage.
  1368.      */
  1369.     public void write(DataOutput out) throws IOException {
  1370.       new DatanodeID(node).write(out);
  1371.       out.writeLong(node.getCapacity());
  1372.       out.writeLong(node.getRemaining());
  1373.       out.writeLong(node.getLastUpdate());
  1374.       out.writeInt(node.getXceiverCount());
  1375.     }
  1376.     /**
  1377.      * Public method that reads a serialized Datanode
  1378.      * from the fsImage.
  1379.      */
  1380.     public void readFields(DataInput in) throws IOException {
  1381.       DatanodeID id = new DatanodeID();
  1382.       id.readFields(in);
  1383.       long capacity = in.readLong();
  1384.       long remaining = in.readLong();
  1385.       long lastUpdate = in.readLong();
  1386.       int xceiverCount = in.readInt();
  1387.       // update the DatanodeDescriptor with the data we read in
  1388.       node.updateRegInfo(id);
  1389.       node.setStorageID(id.getStorageID());
  1390.       node.setCapacity(capacity);
  1391.       node.setRemaining(remaining);
  1392.       node.setLastUpdate(lastUpdate);
  1393.       node.setXceiverCount(xceiverCount);
  1394.     }
  1395.   }
  1396.   protected void corruptPreUpgradeStorage(File rootDir) throws IOException {
  1397.     File oldImageDir = new File(rootDir, "image");
  1398.     if (!oldImageDir.exists())
  1399.       if (!oldImageDir.mkdir())
  1400.         throw new IOException("Cannot create directory " + oldImageDir);
  1401.     File oldImage = new File(oldImageDir, "fsimage");
  1402.     if (!oldImage.exists())
  1403.       // recreate old image file to let pre-upgrade versions fail
  1404.       if (!oldImage.createNewFile())
  1405.         throw new IOException("Cannot create file " + oldImage);
  1406.     RandomAccessFile oldFile = new RandomAccessFile(oldImage, "rws");
  1407.     // write new version into old image file
  1408.     try {
  1409.       writeCorruptedData(oldFile);
  1410.     } finally {
  1411.       oldFile.close();
  1412.     }
  1413.   }
  1414.   private boolean getDistributedUpgradeState() {
  1415.     return FSNamesystem.getFSNamesystem().getDistributedUpgradeState();
  1416.   }
  1417.   private int getDistributedUpgradeVersion() {
  1418.     return FSNamesystem.getFSNamesystem().getDistributedUpgradeVersion();
  1419.   }
  1420.   private void setDistributedUpgradeState(boolean uState, int uVersion) {
  1421.     FSNamesystem.getFSNamesystem().upgradeManager.setUpgradeState(uState, uVersion);
  1422.   }
  1423.   private void verifyDistributedUpgradeProgress(StartupOption startOpt
  1424.                                                 ) throws IOException {
  1425.     if(startOpt == StartupOption.ROLLBACK || startOpt == StartupOption.IMPORT)
  1426.       return;
  1427.     UpgradeManager um = FSNamesystem.getFSNamesystem().upgradeManager;
  1428.     assert um != null : "FSNameSystem.upgradeManager is null.";
  1429.     if(startOpt != StartupOption.UPGRADE) {
  1430.       if(um.getUpgradeState())
  1431.         throw new IOException(
  1432.                     "n   Previous distributed upgrade was not completed. "
  1433.                   + "n   Please restart NameNode with -upgrade option.");
  1434.       if(um.getDistributedUpgrades() != null)
  1435.         throw new IOException("n   Distributed upgrade for NameNode version " 
  1436.           + um.getUpgradeVersion() + " to current LV " + FSConstants.LAYOUT_VERSION
  1437.           + " is required.n   Please restart NameNode with -upgrade option.");
  1438.     }
  1439.   }
  1440.   private void initializeDistributedUpgrade() throws IOException {
  1441.     UpgradeManagerNamenode um = FSNamesystem.getFSNamesystem().upgradeManager;
  1442.     if(! um.initializeUpgrade())
  1443.       return;
  1444.     // write new upgrade state into disk
  1445.     FSNamesystem.getFSNamesystem().getFSImage().writeAll();
  1446.     NameNode.LOG.info("n   Distributed upgrade for NameNode version " 
  1447.         + um.getUpgradeVersion() + " to current LV " 
  1448.         + FSConstants.LAYOUT_VERSION + " is initialized.");
  1449.   }
  1450.   static Collection<File> getCheckpointDirs(Configuration conf,
  1451.                                             String defaultName) {
  1452.     Collection<String> dirNames = conf.getStringCollection("fs.checkpoint.dir");
  1453.     if (dirNames.size() == 0 && defaultName != null) {
  1454.       dirNames.add(defaultName);
  1455.     }
  1456.     Collection<File> dirs = new ArrayList<File>(dirNames.size());
  1457.     for(String name : dirNames) {
  1458.       dirs.add(new File(name));
  1459.     }
  1460.     return dirs;
  1461.   }
  1462.   
  1463.   static Collection<File> getCheckpointEditsDirs(Configuration conf,
  1464.                                                  String defaultName) {
  1465.     Collection<String> dirNames = 
  1466.                 conf.getStringCollection("fs.checkpoint.edits.dir");
  1467.  if (dirNames.size() == 0 && defaultName != null) {
  1468.    dirNames.add(defaultName);
  1469.  }
  1470.  Collection<File> dirs = new ArrayList<File>(dirNames.size());
  1471.  for(String name : dirNames) {
  1472.    dirs.add(new File(name));
  1473.  }
  1474.  return dirs;    
  1475.   }
  1476.   static private final UTF8 U_STR = new UTF8();
  1477.   static String readString(DataInputStream in) throws IOException {
  1478.     U_STR.readFields(in);
  1479.     return U_STR.toString();
  1480.   }
  1481.   static String readString_EmptyAsNull(DataInputStream in) throws IOException {
  1482.     final String s = readString(in);
  1483.     return s.isEmpty()? null: s;
  1484.   }
  1485.   static byte[] readBytes(DataInputStream in) throws IOException {
  1486.     U_STR.readFields(in);
  1487.     int len = U_STR.getLength();
  1488.     byte[] bytes = new byte[len];
  1489.     System.arraycopy(U_STR.getBytes(), 0, bytes, 0, len);
  1490.     return bytes;
  1491.   }
  1492.   static void writeString(String str, DataOutputStream out) throws IOException {
  1493.     U_STR.set(str);
  1494.     U_STR.write(out);
  1495.   }
  1496. }