FSDirectory.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:44k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import java.io.*;
  20. import java.util.*;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.FileStatus;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.fs.ContentSummary;
  25. import org.apache.hadoop.fs.permission.*;
  26. import org.apache.hadoop.metrics.MetricsRecord;
  27. import org.apache.hadoop.metrics.MetricsUtil;
  28. import org.apache.hadoop.metrics.MetricsContext;
  29. import org.apache.hadoop.hdfs.protocol.FSConstants;
  30. import org.apache.hadoop.hdfs.protocol.Block;
  31. import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
  32. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  33. import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
  34. /*************************************************
  35.  * FSDirectory stores the filesystem directory state.
  36.  * It handles writing/loading values to disk, and logging
  37.  * changes as we go.
  38.  *
  39.  * It keeps the filename->blockset mapping always-current
  40.  * and logged to disk.
  41.  * 
  42.  *************************************************/
  43. class FSDirectory implements FSConstants, Closeable {
  44.   final FSNamesystem namesystem;
  45.   final INodeDirectoryWithQuota rootDir;
  46.   FSImage fsImage;  
  47.   private boolean ready = false;
  48.   // Metrics record
  49.   private MetricsRecord directoryMetrics = null;
  50.   /** Access an existing dfs name directory. */
  51.   FSDirectory(FSNamesystem ns, Configuration conf) {
  52.     this(new FSImage(), ns, conf);
  53.     fsImage.setCheckpointDirectories(FSImage.getCheckpointDirs(conf, null),
  54.                                 FSImage.getCheckpointEditsDirs(conf, null));
  55.   }
  56.   FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
  57.     rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
  58.         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
  59.         Integer.MAX_VALUE, -1);
  60.     this.fsImage = fsImage;
  61.     namesystem = ns;
  62.     initialize(conf);
  63.   }
  64.     
  65.   private void initialize(Configuration conf) {
  66.     MetricsContext metricsContext = MetricsUtil.getContext("dfs");
  67.     directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
  68.     directoryMetrics.setTag("sessionId", conf.get("session.id"));
  69.   }
  70.   void loadFSImage(Collection<File> dataDirs,
  71.                    Collection<File> editsDirs,
  72.                    StartupOption startOpt) throws IOException {
  73.     // format before starting up if requested
  74.     if (startOpt == StartupOption.FORMAT) {
  75.       fsImage.setStorageDirectories(dataDirs, editsDirs);
  76.       fsImage.format();
  77.       startOpt = StartupOption.REGULAR;
  78.     }
  79.     try {
  80.       if (fsImage.recoverTransitionRead(dataDirs, editsDirs, startOpt)) {
  81.         fsImage.saveFSImage();
  82.       }
  83.       FSEditLog editLog = fsImage.getEditLog();
  84.       assert editLog != null : "editLog must be initialized";
  85.       if (!editLog.isOpen())
  86.         editLog.open();
  87.       fsImage.setCheckpointDirectories(null, null);
  88.     } catch(IOException e) {
  89.       fsImage.close();
  90.       throw e;
  91.     }
  92.     synchronized (this) {
  93.       this.ready = true;
  94.       this.notifyAll();
  95.     }
  96.   }
  97.   private void incrDeletedFileCount(int count) {
  98.     directoryMetrics.incrMetric("files_deleted", count);
  99.     directoryMetrics.update();
  100.   }
  101.     
  102.   /**
  103.    * Shutdown the filestore
  104.    */
  105.   public void close() throws IOException {
  106.     fsImage.close();
  107.   }
  108.   /**
  109.    * Block until the object is ready to be used.
  110.    */
  111.   void waitForReady() {
  112.     if (!ready) {
  113.       synchronized (this) {
  114.         while (!ready) {
  115.           try {
  116.             this.wait(5000);
  117.           } catch (InterruptedException ie) {
  118.           }
  119.         }
  120.       }
  121.     }
  122.   }
  123.   /**
  124.    * Add the given filename to the fs.
  125.    */
  126.   INodeFileUnderConstruction addFile(String path, 
  127.                 PermissionStatus permissions,
  128.                 short replication,
  129.                 long preferredBlockSize,
  130.                 String clientName,
  131.                 String clientMachine,
  132.                 DatanodeDescriptor clientNode,
  133.                 long generationStamp) 
  134.                 throws IOException {
  135.     waitForReady();
  136.     // Always do an implicit mkdirs for parent directory tree.
  137.     long modTime = FSNamesystem.now();
  138.     if (!mkdirs(new Path(path).getParent().toString(), permissions, true,
  139.         modTime)) {
  140.       return null;
  141.     }
  142.     INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
  143.                                  permissions,replication,
  144.                                  preferredBlockSize, modTime, clientName, 
  145.                                  clientMachine, clientNode);
  146.     synchronized (rootDir) {
  147.       newNode = addNode(path, newNode, -1, false);
  148.     }
  149.     if (newNode == null) {
  150.       NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
  151.                                    +"failed to add "+path
  152.                                    +" to the file system");
  153.       return null;
  154.     }
  155.     // add create file record to log, record new generation stamp
  156.     fsImage.getEditLog().logOpenFile(path, newNode);
  157.     NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
  158.                                   +path+" is added to the file system");
  159.     return newNode;
  160.   }
  161.   /**
  162.    */
  163.   INode unprotectedAddFile( String path, 
  164.                             PermissionStatus permissions,
  165.                             Block[] blocks, 
  166.                             short replication,
  167.                             long modificationTime,
  168.                             long atime,
  169.                             long preferredBlockSize) {
  170.     INode newNode;
  171.     long diskspace = -1; // unknown
  172.     if (blocks == null)
  173.       newNode = new INodeDirectory(permissions, modificationTime);
  174.     else {
  175.       newNode = new INodeFile(permissions, blocks.length, replication,
  176.                               modificationTime, atime, preferredBlockSize);
  177.       diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
  178.     }
  179.     synchronized (rootDir) {
  180.       try {
  181.         newNode = addNode(path, newNode, diskspace, false);
  182.         if(newNode != null && blocks != null) {
  183.           int nrBlocks = blocks.length;
  184.           // Add file->block mapping
  185.           INodeFile newF = (INodeFile)newNode;
  186.           for (int i = 0; i < nrBlocks; i++) {
  187.             newF.setBlock(i, namesystem.blocksMap.addINode(blocks[i], newF));
  188.           }
  189.         }
  190.       } catch (IOException e) {
  191.         return null;
  192.       }
  193.       return newNode;
  194.     }
  195.   }
  196.   INodeDirectory addToParent( String src,
  197.                               INodeDirectory parentINode,
  198.                               PermissionStatus permissions,
  199.                               Block[] blocks, 
  200.                               short replication,
  201.                               long modificationTime,
  202.                               long atime,
  203.                               long nsQuota,
  204.                               long dsQuota,
  205.                               long preferredBlockSize) {
  206.     // NOTE: This does not update space counts for parents
  207.     // create new inode
  208.     INode newNode;
  209.     if (blocks == null) {
  210.       if (nsQuota >= 0 || dsQuota >= 0) {
  211.         newNode = new INodeDirectoryWithQuota(
  212.             permissions, modificationTime, nsQuota, dsQuota);
  213.       } else {
  214.         newNode = new INodeDirectory(permissions, modificationTime);
  215.       }
  216.     } else 
  217.       newNode = new INodeFile(permissions, blocks.length, replication,
  218.                               modificationTime, atime, preferredBlockSize);
  219.     // add new node to the parent
  220.     INodeDirectory newParent = null;
  221.     synchronized (rootDir) {
  222.       try {
  223.         newParent = rootDir.addToParent(src, newNode, parentINode, false);
  224.       } catch (FileNotFoundException e) {
  225.         return null;
  226.       }
  227.       if(newParent == null)
  228.         return null;
  229.       if(blocks != null) {
  230.         int nrBlocks = blocks.length;
  231.         // Add file->block mapping
  232.         INodeFile newF = (INodeFile)newNode;
  233.         for (int i = 0; i < nrBlocks; i++) {
  234.           newF.setBlock(i, namesystem.blocksMap.addINode(blocks[i], newF));
  235.         }
  236.       }
  237.     }
  238.     return newParent;
  239.   }
  240.   /**
  241.    * Add a block to the file. Returns a reference to the added block.
  242.    */
  243.   Block addBlock(String path, INode[] inodes, Block block) throws IOException {
  244.     waitForReady();
  245.     synchronized (rootDir) {
  246.       INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
  247.       // check quota limits and updated space consumed
  248.       updateCount(inodes, inodes.length-1, 0, 
  249.                   fileNode.getPreferredBlockSize()*fileNode.getReplication());
  250.       
  251.       // associate the new list of blocks with this file
  252.       namesystem.blocksMap.addINode(block, fileNode);
  253.       BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
  254.       fileNode.addBlock(blockInfo);
  255.       NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
  256.                                     + path + " with " + block
  257.                                     + " block is added to the in-memory "
  258.                                     + "file system");
  259.     }
  260.     return block;
  261.   }
  262.   /**
  263.    * Persist the block list for the inode.
  264.    */
  265.   void persistBlocks(String path, INodeFileUnderConstruction file) 
  266.                      throws IOException {
  267.     waitForReady();
  268.     synchronized (rootDir) {
  269.       fsImage.getEditLog().logOpenFile(path, file);
  270.       NameNode.stateChangeLog.debug("DIR* FSDirectory.persistBlocks: "
  271.                                     +path+" with "+ file.getBlocks().length 
  272.                                     +" blocks is persisted to the file system");
  273.     }
  274.   }
  275.   /**
  276.    * Close file.
  277.    */
  278.   void closeFile(String path, INodeFile file) throws IOException {
  279.     waitForReady();
  280.     synchronized (rootDir) {
  281.       // file is closed
  282.       fsImage.getEditLog().logCloseFile(path, file);
  283.       if (NameNode.stateChangeLog.isDebugEnabled()) {
  284.         NameNode.stateChangeLog.debug("DIR* FSDirectory.closeFile: "
  285.                                     +path+" with "+ file.getBlocks().length 
  286.                                     +" blocks is persisted to the file system");
  287.       }
  288.     }
  289.   }
  290.   /**
  291.    * Remove a block to the file.
  292.    */
  293.   boolean removeBlock(String path, INodeFileUnderConstruction fileNode, 
  294.                       Block block) throws IOException {
  295.     waitForReady();
  296.     synchronized (rootDir) {
  297.       // modify file-> block and blocksMap
  298.       fileNode.removeBlock(block);
  299.       namesystem.blocksMap.removeINode(block);
  300.       // If block is removed from blocksMap remove it from corruptReplicasMap
  301.       namesystem.corruptReplicas.removeFromCorruptReplicasMap(block);
  302.       // write modified block locations to log
  303.       fsImage.getEditLog().logOpenFile(path, fileNode);
  304.       NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
  305.                                     +path+" with "+block
  306.                                     +" block is added to the file system");
  307.     }
  308.     return true;
  309.   }
  310.   /**
  311.    * @see #unprotectedRenameTo(String, String, long)
  312.    */
  313.   boolean renameTo(String src, String dst) throws QuotaExceededException {
  314.     if (NameNode.stateChangeLog.isDebugEnabled()) {
  315.       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
  316.                                   +src+" to "+dst);
  317.     }
  318.     waitForReady();
  319.     long now = FSNamesystem.now();
  320.     if (!unprotectedRenameTo(src, dst, now))
  321.       return false;
  322.     fsImage.getEditLog().logRename(src, dst, now);
  323.     return true;
  324.   }
  325.   /** Change a path name
  326.    * 
  327.    * @param src source path
  328.    * @param dst destination path
  329.    * @return true if rename succeeds; false otherwise
  330.    * @throws QuotaExceededException if the operation violates any quota limit
  331.    */
  332.   boolean unprotectedRenameTo(String src, String dst, long timestamp) 
  333.   throws QuotaExceededException {
  334.     synchronized (rootDir) {
  335.       INode[] srcInodes = rootDir.getExistingPathINodes(src);
  336.       // check the validation of the source
  337.       if (srcInodes[srcInodes.length-1] == null) {
  338.         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
  339.                                      +"failed to rename "+src+" to "+dst+ " because source does not exist");
  340.         return false;
  341.       } else if (srcInodes.length == 1) {
  342.         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
  343.             +"failed to rename "+src+" to "+dst+ " because source is the root");
  344.         return false;
  345.       }
  346.       if (isDir(dst)) {
  347.         dst += Path.SEPARATOR + new Path(src).getName();
  348.       }
  349.       
  350.       // remove source
  351.       INode srcChild = null;
  352.       try {
  353.         srcChild = removeChild(srcInodes, srcInodes.length-1);
  354.       } catch (IOException e) {
  355.         // srcChild == null; go to next if statement
  356.       }
  357.       if (srcChild == null) {
  358.         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
  359.             +"failed to rename "+src+" to "+dst+ " because the source can not be removed");
  360.         return false;
  361.       }
  362.       String srcChildName = srcChild.getLocalName();
  363.       
  364.       // check the validity of the destination
  365.       INode dstChild = null;
  366.       QuotaExceededException failureByQuota = null;
  367.       byte[][] dstComponents = INode.getPathComponents(dst);
  368.       INode[] dstInodes = new INode[dstComponents.length];
  369.       rootDir.getExistingPathINodes(dstComponents, dstInodes);
  370.       if (dstInodes[dstInodes.length-1] != null) { //check if destination exists
  371.         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
  372.                                      +"failed to rename "+src+" to "+dst+ 
  373.                                      " because destination exists");
  374.       } else if (dstInodes[dstInodes.length-2] == null) { // check if its parent exists
  375.         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
  376.             +"failed to rename "+src+" to "+dst+ 
  377.             " because destination's parent does not exists");
  378.       }
  379.       else {
  380.         // add to the destination
  381.         srcChild.setLocalName(dstComponents[dstInodes.length-1]);
  382.         try {
  383.           // add it to the namespace
  384.           dstChild = addChild(dstInodes, dstInodes.length-1, srcChild, false);
  385.         } catch (QuotaExceededException qe) {
  386.           failureByQuota = qe;
  387.         }
  388.       }
  389.       if (dstChild != null) {
  390.         if (NameNode.stateChangeLog.isDebugEnabled()) {
  391.           NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
  392.             +src+" is renamed to "+dst);
  393.         }
  394.         // update modification time of dst and the parent of src
  395.         srcInodes[srcInodes.length-2].setModificationTime(timestamp);
  396.         dstInodes[dstInodes.length-2].setModificationTime(timestamp);
  397.         return true;
  398.       } else {
  399.         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
  400.             +"failed to rename "+src+" to "+dst);
  401.         try {
  402.           // put it back
  403.           srcChild.setLocalName(srcChildName);
  404.           addChild(srcInodes, srcInodes.length-1, srcChild, false);
  405.         } catch (IOException ignored) {}
  406.         if (failureByQuota != null) {
  407.           throw failureByQuota;
  408.         } else {
  409.           return false;
  410.         }
  411.       }
  412.     }
  413.   }
  414.   /**
  415.    * Set file replication
  416.    * 
  417.    * @param src file name
  418.    * @param replication new replication
  419.    * @param oldReplication old replication - output parameter
  420.    * @return array of file blocks
  421.    * @throws IOException
  422.    */
  423.   Block[] setReplication(String src, 
  424.                          short replication,
  425.                          int[] oldReplication
  426.                          ) throws IOException {
  427.     waitForReady();
  428.     Block[] fileBlocks = unprotectedSetReplication(src, replication, oldReplication);
  429.     if (fileBlocks != null)  // log replication change
  430.       fsImage.getEditLog().logSetReplication(src, replication);
  431.     return fileBlocks;
  432.   }
  433.   Block[] unprotectedSetReplication( String src, 
  434.                                      short replication,
  435.                                      int[] oldReplication
  436.                                      ) throws IOException {
  437.     if (oldReplication == null)
  438.       oldReplication = new int[1];
  439.     oldReplication[0] = -1;
  440.     Block[] fileBlocks = null;
  441.     synchronized(rootDir) {
  442.       INode[] inodes = rootDir.getExistingPathINodes(src);
  443.       INode inode = inodes[inodes.length - 1];
  444.       if (inode == null)
  445.         return null;
  446.       if (inode.isDirectory())
  447.         return null;
  448.       INodeFile fileNode = (INodeFile)inode;
  449.       oldReplication[0] = fileNode.getReplication();
  450.       // check disk quota
  451.       long dsDelta = (replication - oldReplication[0]) *
  452.            (fileNode.diskspaceConsumed()/oldReplication[0]);
  453.       updateCount(inodes, inodes.length-1, 0, dsDelta);
  454.       fileNode.setReplication(replication);
  455.       fileBlocks = fileNode.getBlocks();
  456.     }
  457.     return fileBlocks;
  458.   }
  459.   /**
  460.    * Get the blocksize of a file
  461.    * @param filename the filename
  462.    * @return the number of bytes 
  463.    * @throws IOException if it is a directory or does not exist.
  464.    */
  465.   long getPreferredBlockSize(String filename) throws IOException {
  466.     synchronized (rootDir) {
  467.       INode fileNode = rootDir.getNode(filename);
  468.       if (fileNode == null) {
  469.         throw new IOException("Unknown file: " + filename);
  470.       }
  471.       if (fileNode.isDirectory()) {
  472.         throw new IOException("Getting block size of a directory: " + 
  473.                               filename);
  474.       }
  475.       return ((INodeFile)fileNode).getPreferredBlockSize();
  476.     }
  477.   }
  478.   boolean exists(String src) {
  479.     src = normalizePath(src);
  480.     synchronized(rootDir) {
  481.       INode inode = rootDir.getNode(src);
  482.       if (inode == null) {
  483.          return false;
  484.       }
  485.       return inode.isDirectory()? true: ((INodeFile)inode).getBlocks() != null;
  486.     }
  487.   }
  488.   void setPermission(String src, FsPermission permission
  489.       ) throws IOException {
  490.     unprotectedSetPermission(src, permission);
  491.     fsImage.getEditLog().logSetPermissions(src, permission);
  492.   }
  493.   void unprotectedSetPermission(String src, FsPermission permissions) throws FileNotFoundException {
  494.     synchronized(rootDir) {
  495.         INode inode = rootDir.getNode(src);
  496.         if(inode == null)
  497.             throw new FileNotFoundException("File does not exist: " + src);
  498.         inode.setPermission(permissions);
  499.     }
  500.   }
  501.   void setOwner(String src, String username, String groupname
  502.       ) throws IOException {
  503.     unprotectedSetOwner(src, username, groupname);
  504.     fsImage.getEditLog().logSetOwner(src, username, groupname);
  505.   }
  506.   void unprotectedSetOwner(String src, String username, String groupname) throws FileNotFoundException {
  507.     synchronized(rootDir) {
  508.       INode inode = rootDir.getNode(src);
  509.       if(inode == null)
  510.           throw new FileNotFoundException("File does not exist: " + src);
  511.       if (username != null) {
  512.         inode.setUser(username);
  513.       }
  514.       if (groupname != null) {
  515.         inode.setGroup(groupname);
  516.       }
  517.     }
  518.   }
  519.     
  520.   /**
  521.    * Remove the file from management, return blocks
  522.    */
  523.   INode delete(String src) {
  524.     if (NameNode.stateChangeLog.isDebugEnabled()) {
  525.       NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "+src);
  526.     }
  527.     waitForReady();
  528.     long now = FSNamesystem.now();
  529.     INode deletedNode = unprotectedDelete(src, now);
  530.     if (deletedNode != null) {
  531.       fsImage.getEditLog().logDelete(src, now);
  532.     }
  533.     return deletedNode;
  534.   }
  535.   
  536.   /** Return if a directory is empty or not **/
  537.   boolean isDirEmpty(String src) {
  538.    boolean dirNotEmpty = true;
  539.     if (!isDir(src)) {
  540.       return true;
  541.     }
  542.     synchronized(rootDir) {
  543.       INode targetNode = rootDir.getNode(src);
  544.       assert targetNode != null : "should be taken care in isDir() above";
  545.       if (((INodeDirectory)targetNode).getChildren().size() != 0) {
  546.         dirNotEmpty = false;
  547.       }
  548.     }
  549.     return dirNotEmpty;
  550.   }
  551.   
  552.   /**
  553.    * Delete a path from the name space
  554.    * Update the count at each ancestor directory with quota
  555.    * @param src a string representation of a path to an inode
  556.    * @param modificationTime the time the inode is removed
  557.    * @param deletedBlocks the place holder for the blocks to be removed
  558.    * @return if the deletion succeeds
  559.    */ 
  560.   INode unprotectedDelete(String src, long modificationTime) {
  561.     src = normalizePath(src);
  562.     synchronized (rootDir) {
  563.       INode[] inodes =  rootDir.getExistingPathINodes(src);
  564.       INode targetNode = inodes[inodes.length-1];
  565.       if (targetNode == null) { // non-existent src
  566.         NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
  567.             +"failed to remove "+src+" because it does not exist");
  568.         return null;
  569.       } else if (inodes.length == 1) { // src is the root
  570.         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
  571.             "failed to remove " + src +
  572.             " because the root is not allowed to be deleted");
  573.         return null;
  574.       } else {
  575.         try {
  576.           // Remove the node from the namespace
  577.           removeChild(inodes, inodes.length-1);
  578.           // set the parent's modification time
  579.           inodes[inodes.length-2].setModificationTime(modificationTime);
  580.           // GC all the blocks underneath the node.
  581.           ArrayList<Block> v = new ArrayList<Block>();
  582.           int filesRemoved = targetNode.collectSubtreeBlocksAndClear(v);
  583.           incrDeletedFileCount(filesRemoved);
  584.           namesystem.removePathAndBlocks(src, v);
  585.           if (NameNode.stateChangeLog.isDebugEnabled()) {
  586.             NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
  587.               +src+" is removed");
  588.           }
  589.           return targetNode;
  590.         } catch (IOException e) {
  591.           NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
  592.               "failed to remove " + src + " because " + e.getMessage());
  593.           return null;
  594.         }
  595.       }
  596.     }
  597.   }
  598.   /**
  599.    * Replaces the specified inode with the specified one.
  600.    */
  601.   void replaceNode(String path, INodeFile oldnode, INodeFile newnode) 
  602.                                                    throws IOException {
  603.     replaceNode(path, oldnode, newnode, true);
  604.   }
  605.   
  606.   /**
  607.    * @see #replaceNode(String, INodeFile, INodeFile)
  608.    */
  609.   private void replaceNode(String path, INodeFile oldnode, INodeFile newnode,
  610.                            boolean updateDiskspace) throws IOException {    
  611.     synchronized (rootDir) {
  612.       long dsOld = oldnode.diskspaceConsumed();
  613.       
  614.       //
  615.       // Remove the node from the namespace 
  616.       //
  617.       if (!oldnode.removeNode()) {
  618.         NameNode.stateChangeLog.warn("DIR* FSDirectory.replaceNode: " +
  619.                                      "failed to remove " + path);
  620.         throw new IOException("FSDirectory.replaceNode: " +
  621.                               "failed to remove " + path);
  622.       } 
  623.       
  624.       /* Currently oldnode and newnode are assumed to contain the same
  625.        * blocks. Otherwise, blocks need to be removed from the blocksMap.
  626.        */
  627.       
  628.       rootDir.addNode(path, newnode); 
  629.       //check if disk space needs to be updated.
  630.       long dsNew = 0;
  631.       if (updateDiskspace && (dsNew = newnode.diskspaceConsumed()) != dsOld) {
  632.         try {
  633.           updateSpaceConsumed(path, 0, dsNew-dsOld);
  634.         } catch (QuotaExceededException e) {
  635.           // undo
  636.           replaceNode(path, newnode, oldnode, false);
  637.           throw e;
  638.         }
  639.       }
  640.       
  641.       int index = 0;
  642.       for (Block b : newnode.getBlocks()) {
  643.         BlockInfo info = namesystem.blocksMap.addINode(b, newnode);
  644.         newnode.setBlock(index, info); // inode refers to the block in BlocksMap
  645.         index++;
  646.       }
  647.     }
  648.   }
  649.   /**
  650.    * Get a listing of files given path 'src'
  651.    *
  652.    * This function is admittedly very inefficient right now.  We'll
  653.    * make it better later.
  654.    */
  655.   FileStatus[] getListing(String src) {
  656.     String srcs = normalizePath(src);
  657.     synchronized (rootDir) {
  658.       INode targetNode = rootDir.getNode(srcs);
  659.       if (targetNode == null)
  660.         return null;
  661.       if (!targetNode.isDirectory()) {
  662.         return new FileStatus[]{createFileStatus(srcs, targetNode)};
  663.       }
  664.       List<INode> contents = ((INodeDirectory)targetNode).getChildren();
  665.       FileStatus listing[] = new FileStatus[contents.size()];
  666.       if(! srcs.endsWith(Path.SEPARATOR))
  667.         srcs += Path.SEPARATOR;
  668.       int i = 0;
  669.       for (INode cur : contents) {
  670.         listing[i] = createFileStatus(srcs+cur.getLocalName(), cur);
  671.         i++;
  672.       }
  673.       return listing;
  674.     }
  675.   }
  676.   /** Get the file info for a specific file.
  677.    * @param src The string representation of the path to the file
  678.    * @return object containing information regarding the file
  679.    *         or null if file not found
  680.    */
  681.   FileStatus getFileInfo(String src) {
  682.     String srcs = normalizePath(src);
  683.     synchronized (rootDir) {
  684.       INode targetNode = rootDir.getNode(srcs);
  685.       if (targetNode == null) {
  686.         return null;
  687.       }
  688.       else {
  689.         return createFileStatus(srcs, targetNode);
  690.       }
  691.     }
  692.   }
  693.   /**
  694.    * Get the blocks associated with the file.
  695.    */
  696.   Block[] getFileBlocks(String src) {
  697.     waitForReady();
  698.     synchronized (rootDir) {
  699.       INode targetNode = rootDir.getNode(src);
  700.       if (targetNode == null)
  701.         return null;
  702.       if(targetNode.isDirectory())
  703.         return null;
  704.       return ((INodeFile)targetNode).getBlocks();
  705.     }
  706.   }
  707.   /**
  708.    * Get {@link INode} associated with the file.
  709.    */
  710.   INodeFile getFileINode(String src) {
  711.     synchronized (rootDir) {
  712.       INode inode = rootDir.getNode(src);
  713.       if (inode == null || inode.isDirectory())
  714.         return null;
  715.       return (INodeFile)inode;
  716.     }
  717.   }
  718.   /**
  719.    * Retrieve the existing INodes along the given path.
  720.    * 
  721.    * @param path the path to explore
  722.    * @return INodes array containing the existing INodes in the order they
  723.    *         appear when following the path from the root INode to the
  724.    *         deepest INodes. The array size will be the number of expected
  725.    *         components in the path, and non existing components will be
  726.    *         filled with null
  727.    *         
  728.    * @see INodeDirectory#getExistingPathINodes(byte[][], INode[])
  729.    */
  730.   INode[] getExistingPathINodes(String path) {
  731.     synchronized (rootDir){
  732.       return rootDir.getExistingPathINodes(path);
  733.     }
  734.   }
  735.   
  736.   /** 
  737.    * Check whether the filepath could be created
  738.    */
  739.   boolean isValidToCreate(String src) {
  740.     String srcs = normalizePath(src);
  741.     synchronized (rootDir) {
  742.       if (srcs.startsWith("/") && 
  743.           !srcs.endsWith("/") && 
  744.           rootDir.getNode(srcs) == null) {
  745.         return true;
  746.       } else {
  747.         return false;
  748.       }
  749.     }
  750.   }
  751.   /**
  752.    * Check whether the path specifies a directory
  753.    */
  754.   boolean isDir(String src) {
  755.     synchronized (rootDir) {
  756.       INode node = rootDir.getNode(normalizePath(src));
  757.       return node != null && node.isDirectory();
  758.     }
  759.   }
  760.   /** Updates namespace and diskspace consumed for all
  761.    * directories until the parent directory of file represented by path.
  762.    * 
  763.    * @param path path for the file.
  764.    * @param nsDelta the delta change of namespace
  765.    * @param dsDelta the delta change of diskspace
  766.    * @throws QuotaExceededException if the new count violates any quota limit
  767.    * @throws FileNotFound if path does not exist.
  768.    */
  769.   void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
  770.                                          throws QuotaExceededException,
  771.                                                 FileNotFoundException {
  772.     synchronized (rootDir) {
  773.       INode[] inodes = rootDir.getExistingPathINodes(path);
  774.       int len = inodes.length;
  775.       if (inodes[len - 1] == null) {
  776.         throw new FileNotFoundException(path + 
  777.                                         " does not exist under rootDir.");
  778.       }
  779.       updateCount(inodes, len-1, nsDelta, dsDelta);
  780.     }
  781.   }
  782.   
  783.   /** update count of each inode with quota
  784.    * 
  785.    * @param inodes an array of inodes on a path
  786.    * @param numOfINodes the number of inodes to update starting from index 0
  787.    * @param nsDelta the delta change of namespace
  788.    * @param dsDelta the delta change of diskspace
  789.    * @throws QuotaExceededException if the new count violates any quota limit
  790.    */
  791.   private void updateCount(INode[] inodes, int numOfINodes, 
  792.                            long nsDelta, long dsDelta)
  793.                            throws QuotaExceededException {
  794.     if (!ready) {
  795.       //still intializing. do not check or update quotas.
  796.       return;
  797.     }
  798.     if (numOfINodes>inodes.length) {
  799.       numOfINodes = inodes.length;
  800.     }
  801.     // check existing components in the path  
  802.     int i=0;
  803.     try {
  804.       for(; i < numOfINodes; i++) {
  805.         if (inodes[i].isQuotaSet()) { // a directory with quota
  806.           INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
  807.           node.updateNumItemsInTree(nsDelta, dsDelta);
  808.         }
  809.       }
  810.     } catch (QuotaExceededException e) {
  811.       e.setPathName(getFullPathName(inodes, i));
  812.       // undo updates
  813.       for( ; i-- > 0; ) {
  814.         try {
  815.           if (inodes[i].isQuotaSet()) { // a directory with quota
  816.             INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
  817.             node.updateNumItemsInTree(-nsDelta, -dsDelta);
  818.           }
  819.         } catch (IOException ingored) {
  820.         }
  821.       }
  822.       throw e;
  823.     }
  824.   }
  825.   
  826.   /** Return the name of the path represented by inodes at [0, pos] */
  827.   private static String getFullPathName(INode[] inodes, int pos) {
  828.     StringBuilder fullPathName = new StringBuilder();
  829.     for (int i=1; i<=pos; i++) {
  830.       fullPathName.append(Path.SEPARATOR_CHAR).append(inodes[i].getLocalName());
  831.     }
  832.     return fullPathName.toString();
  833.   }
  834.   
  835.   /**
  836.    * Create a directory 
  837.    * If ancestor directories do not exist, automatically create them.
  838.    * @param src string representation of the path to the directory
  839.    * @param permissions the permission of the directory
  840.    * @param inheritPermission if the permission of the directory should inherit
  841.    *                          from its parent or not. The automatically created
  842.    *                          ones always inherit its permission from its parent
  843.    * @param now creation time
  844.    * @return true if the operation succeeds false otherwise
  845.    * @throws FileNotFoundException if an ancestor or itself is a file
  846.    * @throws QuotaExceededException if directory creation violates 
  847.    *                                any quota limit
  848.    */
  849.   boolean mkdirs(String src, PermissionStatus permissions,
  850.       boolean inheritPermission, long now)
  851.       throws FileNotFoundException, QuotaExceededException {
  852.     src = normalizePath(src);
  853.     String[] names = INode.getPathNames(src);
  854.     byte[][] components = INode.getPathComponents(names);
  855.     INode[] inodes = new INode[components.length];
  856.     synchronized(rootDir) {
  857.       rootDir.getExistingPathINodes(components, inodes);
  858.       // find the index of the first null in inodes[]
  859.       StringBuilder pathbuilder = new StringBuilder();
  860.       int i = 1;
  861.       for(; i < inodes.length && inodes[i] != null; i++) {
  862.         pathbuilder.append(Path.SEPARATOR + names[i]);
  863.         if (!inodes[i].isDirectory()) {
  864.           throw new FileNotFoundException("Parent path is not a directory: "
  865.               + pathbuilder);
  866.         }
  867.       }
  868.       // create directories beginning from the first null index
  869.       for(; i < inodes.length; i++) {
  870.         pathbuilder.append(Path.SEPARATOR + names[i]);
  871.         String cur = pathbuilder.toString();
  872.         unprotectedMkdir(inodes, i, components[i], permissions,
  873.             inheritPermission || i != components.length-1, now);
  874.         if (inodes[i] == null) {
  875.           return false;
  876.         }
  877.         // Directory creation also count towards FilesCreated
  878.         // to match count of files_deleted metric. 
  879.         if (namesystem != null)
  880.           NameNode.getNameNodeMetrics().numFilesCreated.inc();
  881.         fsImage.getEditLog().logMkDir(cur, inodes[i]);
  882.         NameNode.stateChangeLog.debug(
  883.             "DIR* FSDirectory.mkdirs: created directory " + cur);
  884.       }
  885.     }
  886.     return true;
  887.   }
  888.   /**
  889.    */
  890.   INode unprotectedMkdir(String src, PermissionStatus permissions,
  891.                           long timestamp) throws QuotaExceededException {
  892.     byte[][] components = INode.getPathComponents(src);
  893.     INode[] inodes = new INode[components.length];
  894.     synchronized (rootDir) {
  895.       rootDir.getExistingPathINodes(components, inodes);
  896.       unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
  897.           permissions, false, timestamp);
  898.       return inodes[inodes.length-1];
  899.     }
  900.   }
  901.   /** create a directory at index pos.
  902.    * The parent path to the directory is at [0, pos-1].
  903.    * All ancestors exist. Newly created one stored at index pos.
  904.    */
  905.   private void unprotectedMkdir(INode[] inodes, int pos,
  906.       byte[] name, PermissionStatus permission, boolean inheritPermission,
  907.       long timestamp) throws QuotaExceededException {
  908.     inodes[pos] = addChild(inodes, pos, 
  909.         new INodeDirectory(name, permission, timestamp),
  910.         inheritPermission );
  911.   }
  912.   
  913.   /** Add a node child to the namespace. The full path name of the node is src.
  914.    * childDiskspace should be -1, if unknown. 
  915.    * QuotaExceededException is thrown if it violates quota limit */
  916.   private <T extends INode> T addNode(String src, T child, 
  917.         long childDiskspace, boolean inheritPermission) 
  918.   throws QuotaExceededException {
  919.     byte[][] components = INode.getPathComponents(src);
  920.     child.setLocalName(components[components.length-1]);
  921.     INode[] inodes = new INode[components.length];
  922.     synchronized (rootDir) {
  923.       rootDir.getExistingPathINodes(components, inodes);
  924.       return addChild(inodes, inodes.length-1, child, childDiskspace,
  925.                       inheritPermission);
  926.     }
  927.   }
  928.   
  929.   /** Add a node child to the inodes at index pos. 
  930.    * Its ancestors are stored at [0, pos-1]. 
  931.    * QuotaExceededException is thrown if it violates quota limit */
  932.   private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
  933.       boolean inheritPermission) throws QuotaExceededException {
  934.     return addChild(pathComponents, pos, child, -1, inheritPermission);
  935.   }
  936.   
  937.   /** Add a node child to the inodes at index pos. 
  938.    * Its ancestors are stored at [0, pos-1]. 
  939.    * QuotaExceededException is thrown if it violates quota limit */
  940.   private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
  941.        long childDiskspace, boolean inheritPermission) throws QuotaExceededException {
  942.     INode.DirCounts counts = new INode.DirCounts();
  943.     child.spaceConsumedInTree(counts);
  944.     if (childDiskspace < 0) {
  945.       childDiskspace = counts.getDsCount();
  946.     }
  947.     updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace);
  948.     T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
  949.         child, inheritPermission);
  950.     if (addedNode == null) {
  951.       updateCount(pathComponents, pos, 
  952.                   -counts.getNsCount(), -childDiskspace);
  953.     }
  954.     return addedNode;
  955.   }
  956.   
  957.   /** Remove an inode at index pos from the namespace.
  958.    * Its ancestors are stored at [0, pos-1].
  959.    * Count of each ancestor with quota is also updated.
  960.    * Return the removed node; null if the removal fails.
  961.    */
  962.   private INode removeChild(INode[] pathComponents, int pos)
  963.   throws QuotaExceededException {
  964.     INode removedNode = 
  965.       ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
  966.     if (removedNode != null) {
  967.       INode.DirCounts counts = new INode.DirCounts();
  968.       removedNode.spaceConsumedInTree(counts);
  969.       updateCount(pathComponents, pos,
  970.                   -counts.getNsCount(), -counts.getDsCount());
  971.     }
  972.     return removedNode;
  973.   }
  974.   
  975.   /**
  976.    */
  977.   String normalizePath(String src) {
  978.     if (src.length() > 1 && src.endsWith("/")) {
  979.       src = src.substring(0, src.length() - 1);
  980.     }
  981.     return src;
  982.   }
  983.   ContentSummary getContentSummary(String src) throws IOException {
  984.     String srcs = normalizePath(src);
  985.     synchronized (rootDir) {
  986.       INode targetNode = rootDir.getNode(srcs);
  987.       if (targetNode == null) {
  988.         throw new FileNotFoundException("File does not exist: " + srcs);
  989.       }
  990.       else {
  991.         return targetNode.computeContentSummary();
  992.       }
  993.     }
  994.   }
  995.   /** Update the count of each directory with quota in the namespace
  996.    * A directory's count is defined as the total number inodes in the tree
  997.    * rooted at the directory.
  998.    * 
  999.    * This is an update of existing state of the filesystem and does not
  1000.    * throw QuotaExceededException.
  1001.    */
  1002.   void updateCountForINodeWithQuota() {
  1003.     updateCountForINodeWithQuota(rootDir, new INode.DirCounts(), 
  1004.                                  new ArrayList<INode>(50));
  1005.   }
  1006.   
  1007.   /** 
  1008.    * Update the count of the directory if it has a quota and return the count
  1009.    * 
  1010.    * This does not throw a QuotaExceededException. This is just an update
  1011.    * of of existing state and throwing QuotaExceededException does not help
  1012.    * with fixing the state, if there is a problem.
  1013.    * 
  1014.    * @param dir the root of the tree that represents the directory
  1015.    * @param counters counters for name space and disk space
  1016.    * @param nodesInPath INodes for the each of components in the path.
  1017.    * @return the size of the tree
  1018.    */
  1019.   private static void updateCountForINodeWithQuota(INodeDirectory dir, 
  1020.                                                INode.DirCounts counts,
  1021.                                                ArrayList<INode> nodesInPath) {
  1022.     long parentNamespace = counts.nsCount;
  1023.     long parentDiskspace = counts.dsCount;
  1024.     
  1025.     counts.nsCount = 1L;//for self. should not call node.spaceConsumedInTree()
  1026.     counts.dsCount = 0L;
  1027.     
  1028.     /* We don't need nodesInPath if we could use 'parent' field in 
  1029.      * INode. using 'parent' is not currently recommended. */
  1030.     nodesInPath.add(dir);
  1031.     for (INode child : dir.getChildren()) {
  1032.       if (child.isDirectory()) {
  1033.         updateCountForINodeWithQuota((INodeDirectory)child, 
  1034.                                      counts, nodesInPath);
  1035.       } else { // reduce recursive calls
  1036.         counts.nsCount += 1;
  1037.         counts.dsCount += ((INodeFile)child).diskspaceConsumed();
  1038.       }
  1039.     }
  1040.       
  1041.     if (dir.isQuotaSet()) {
  1042.       ((INodeDirectoryWithQuota)dir).setSpaceConsumed(counts.nsCount,
  1043.                                                       counts.dsCount);
  1044.       // check if quota is violated for some reason.
  1045.       if ((dir.getNsQuota() >= 0 && counts.nsCount > dir.getNsQuota()) ||
  1046.           (dir.getDsQuota() >= 0 && counts.dsCount > dir.getDsQuota())) {
  1047.         // can only happen because of a software bug. the bug should be fixed.
  1048.         StringBuilder path = new StringBuilder(512);
  1049.         for (INode n : nodesInPath) {
  1050.           path.append('/');
  1051.           path.append(n.getLocalName());
  1052.         }
  1053.         
  1054.         NameNode.LOG.warn("Unexpected quota violation in image for " + path + 
  1055.                           " (Namespace quota : " + dir.getNsQuota() +
  1056.                           " consumed : " + counts.nsCount + ")" +
  1057.                           " (Diskspace quota : " + dir.getDsQuota() +
  1058.                           " consumed : " + counts.dsCount + ").");
  1059.       }            
  1060.     }
  1061.       
  1062.     // pop 
  1063.     nodesInPath.remove(nodesInPath.size()-1);
  1064.     
  1065.     counts.nsCount += parentNamespace;
  1066.     counts.dsCount += parentDiskspace;
  1067.   }
  1068.   
  1069.   /**
  1070.    * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
  1071.    * Sets quota for for a directory.
  1072.    * @returns INodeDirectory if any of the quotas have changed. null other wise.
  1073.    * @throws FileNotFoundException if the path does not exist or is a file
  1074.    * @throws QuotaExceededException if the directory tree size is 
  1075.    *                                greater than the given quota
  1076.    */
  1077.   INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota) 
  1078.                        throws FileNotFoundException, QuotaExceededException {
  1079.     // sanity check
  1080.     if ((nsQuota < 0 && nsQuota != FSConstants.QUOTA_DONT_SET && 
  1081.          nsQuota < FSConstants.QUOTA_RESET) || 
  1082.         (dsQuota < 0 && dsQuota != FSConstants.QUOTA_DONT_SET && 
  1083.           dsQuota < FSConstants.QUOTA_RESET)) {
  1084.       throw new IllegalArgumentException("Illegal value for nsQuota or " +
  1085.                                          "dsQuota : " + nsQuota + " and " +
  1086.                                          dsQuota);
  1087.     }
  1088.     
  1089.     String srcs = normalizePath(src);
  1090.     INode[] inodes = rootDir.getExistingPathINodes(src);
  1091.     INode targetNode = inodes[inodes.length-1];
  1092.     if (targetNode == null) {
  1093.       throw new FileNotFoundException("Directory does not exist: " + srcs);
  1094.     } else if (!targetNode.isDirectory()) {
  1095.       throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
  1096.     } else { // a directory inode
  1097.       INodeDirectory dirNode = (INodeDirectory)targetNode;
  1098.       long oldNsQuota = dirNode.getNsQuota();
  1099.       long oldDsQuota = dirNode.getDsQuota();
  1100.       if (nsQuota == FSConstants.QUOTA_DONT_SET) {
  1101.         nsQuota = oldNsQuota;
  1102.       }
  1103.       if (dsQuota == FSConstants.QUOTA_DONT_SET) {
  1104.         dsQuota = oldDsQuota;
  1105.       }        
  1106.       if (dirNode instanceof INodeDirectoryWithQuota) { 
  1107.         // a directory with quota; so set the quota to the new value
  1108.         ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
  1109.       } else {
  1110.         // a non-quota directory; so replace it with a directory with quota
  1111.         INodeDirectoryWithQuota newNode = 
  1112.           new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
  1113.         // non-root directory node; parent != null
  1114.         INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
  1115.         dirNode = newNode;
  1116.         parent.replaceChild(newNode);
  1117.       }
  1118.       return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
  1119.     }
  1120.   }
  1121.   
  1122.   /**
  1123.    * See {@link ClientProtocol#setQuota(String, long, long)} for the 
  1124.    * contract.
  1125.    * @see #unprotectedSetQuota(String, long, long)
  1126.    */
  1127.   void setQuota(String src, long nsQuota, long dsQuota) 
  1128.                 throws FileNotFoundException, QuotaExceededException {
  1129.     synchronized (rootDir) {    
  1130.       INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
  1131.       if (dir != null) {
  1132.         fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(), 
  1133.                                          dir.getDsQuota());
  1134.       }
  1135.     }
  1136.   }
  1137.   
  1138.   long totalInodes() {
  1139.     synchronized (rootDir) {
  1140.       return rootDir.numItemsInTree();
  1141.     }
  1142.   }
  1143.   /**
  1144.    * Sets the access time on the file. Logs it in the transaction log
  1145.    */
  1146.   void setTimes(String src, INodeFile inode, long mtime, long atime, boolean force) 
  1147.                                                         throws IOException {
  1148.     if (unprotectedSetTimes(src, inode, mtime, atime, force)) {
  1149.       fsImage.getEditLog().logTimes(src, mtime, atime);
  1150.     }
  1151.   }
  1152.   boolean unprotectedSetTimes(String src, long mtime, long atime, boolean force) 
  1153.                               throws IOException {
  1154.     INodeFile inode = getFileINode(src);
  1155.     return unprotectedSetTimes(src, inode, mtime, atime, force);
  1156.   }
  1157.   private boolean unprotectedSetTimes(String src, INodeFile inode, long mtime,
  1158.                                       long atime, boolean force) throws IOException {
  1159.     boolean status = false;
  1160.     if (mtime != -1) {
  1161.       inode.setModificationTimeForce(mtime);
  1162.       status = true;
  1163.     }
  1164.     if (atime != -1) {
  1165.       long inodeTime = inode.getAccessTime();
  1166.       // if the last access time update was within the last precision interval, then
  1167.       // no need to store access time
  1168.       if (atime <= inodeTime + namesystem.getAccessTimePrecision() && !force) {
  1169.         status =  false;
  1170.       } else {
  1171.         inode.setAccessTime(atime);
  1172.         status = true;
  1173.       }
  1174.     } 
  1175.     return status;
  1176.   }
  1177.   
  1178.   /**
  1179.    * Create FileStatus by file INode 
  1180.    */
  1181.    private static FileStatus createFileStatus(String path, INode node) {
  1182.     // length is zero for directories
  1183.     return new FileStatus(node.isDirectory() ? 0 : node.computeContentSummary().getLength(), 
  1184.         node.isDirectory(), 
  1185.         node.isDirectory() ? 0 : ((INodeFile)node).getReplication(), 
  1186.         node.isDirectory() ? 0 : ((INodeFile)node).getPreferredBlockSize(),
  1187.         node.getModificationTime(),
  1188.         node.getAccessTime(),
  1189.         node.getFsPermission(),
  1190.         node.getUserName(),
  1191.         node.getGroupName(),
  1192.         new Path(path));
  1193.   }
  1194. }