FSNamesystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:167k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import org.apache.commons.logging.*;
  20. import org.apache.hadoop.conf.*;
  21. import org.apache.hadoop.hdfs.DFSUtil;
  22. import org.apache.hadoop.hdfs.protocol.*;
  23. import org.apache.hadoop.hdfs.server.common.GenerationStamp;
  24. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  25. import org.apache.hadoop.hdfs.server.common.Storage;
  26. import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
  27. import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
  28. import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
  29. import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
  30. import org.apache.hadoop.security.AccessControlException;
  31. import org.apache.hadoop.security.UnixUserGroupInformation;
  32. import org.apache.hadoop.security.UserGroupInformation;
  33. import org.apache.hadoop.util.*;
  34. import org.apache.hadoop.metrics.util.MBeanUtil;
  35. import org.apache.hadoop.net.CachedDNSToSwitchMapping;
  36. import org.apache.hadoop.net.DNSToSwitchMapping;
  37. import org.apache.hadoop.net.NetworkTopology;
  38. import org.apache.hadoop.net.ScriptBasedMapping;
  39. import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
  40. import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
  41. import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
  42. import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
  43. import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
  44. import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
  45. import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
  46. import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
  47. import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
  48. import org.apache.hadoop.fs.ContentSummary;
  49. import org.apache.hadoop.fs.FileStatus;
  50. import org.apache.hadoop.fs.Path;
  51. import org.apache.hadoop.fs.permission.*;
  52. import org.apache.hadoop.ipc.Server;
  53. import org.apache.hadoop.io.IOUtils;
  54. import java.io.BufferedWriter;
  55. import java.io.File;
  56. import java.io.FileWriter;
  57. import java.io.FileNotFoundException;
  58. import java.io.IOException;
  59. import java.io.PrintWriter;
  60. import java.io.DataOutputStream;
  61. import java.net.InetAddress;
  62. import java.net.InetSocketAddress;
  63. import java.util.*;
  64. import java.util.Map.Entry;
  65. import javax.management.NotCompliantMBeanException;
  66. import javax.management.ObjectName;
  67. import javax.management.StandardMBean;
  68. import javax.security.auth.login.LoginException;
  69. /***************************************************
  70.  * FSNamesystem does the actual bookkeeping work for the
  71.  * DataNode.
  72.  *
  73.  * It tracks several important tables.
  74.  *
  75.  * 1)  valid fsname --> blocklist  (kept on disk, logged)
  76.  * 2)  Set of all valid blocks (inverted #1)
  77.  * 3)  block --> machinelist (kept in memory, rebuilt dynamically from reports)
  78.  * 4)  machine --> blocklist (inverted #2)
  79.  * 5)  LRU cache of updated-heartbeat machines
  80.  ***************************************************/
  81. public class FSNamesystem implements FSConstants, FSNamesystemMBean {
  82.   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
  83.   public static final String AUDIT_FORMAT =
  84.     "ugi=%st" +  // ugi
  85.     "ip=%st" +   // remote IP
  86.     "cmd=%st" +  // command
  87.     "src=%st" +  // src path
  88.     "dst=%st" +  // dst path (optional)
  89.     "perm=%s";    // permissions (optional)
  90.   private static final ThreadLocal<Formatter> auditFormatter =
  91.     new ThreadLocal<Formatter>() {
  92.       protected Formatter initialValue() {
  93.         return new Formatter(new StringBuilder(AUDIT_FORMAT.length() * 4));
  94.       }
  95.   };
  96.   private static final void logAuditEvent(UserGroupInformation ugi,
  97.       InetAddress addr, String cmd, String src, String dst,
  98.       FileStatus stat) {
  99.     final Formatter fmt = auditFormatter.get();
  100.     ((StringBuilder)fmt.out()).setLength(0);
  101.     auditLog.info(fmt.format(AUDIT_FORMAT, ugi, addr, cmd, src, dst,
  102.                   (stat == null)
  103.                     ? null
  104.                     : stat.getOwner() + ':' + stat.getGroup() + ':' +
  105.                       stat.getPermission()
  106.           ).toString());
  107.   }
  108.   public static final Log auditLog = LogFactory.getLog(
  109.       FSNamesystem.class.getName() + ".audit");
  110.   private boolean isPermissionEnabled;
  111.   private UserGroupInformation fsOwner;
  112.   private String supergroup;
  113.   private PermissionStatus defaultPermission;
  114.   // FSNamesystemMetrics counter variables
  115.   private FSNamesystemMetrics myFSMetrics;
  116.   private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
  117.   private int totalLoad = 0;
  118.   private long pendingReplicationBlocksCount = 0L, corruptReplicaBlocksCount,
  119.     underReplicatedBlocksCount = 0L, scheduledReplicationBlocksCount = 0L;
  120.   //
  121.   // Stores the correct file name hierarchy
  122.   //
  123.   public FSDirectory dir;
  124.   //
  125.   // Mapping: Block -> { INode, datanodes, self ref } 
  126.   // Updated only in response to client-sent information.
  127.   //
  128.   BlocksMap blocksMap = new BlocksMap();
  129.   //
  130.   // Store blocks-->datanodedescriptor(s) map of corrupt replicas
  131.   //
  132.   public CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
  133.     
  134.   /**
  135.    * Stores the datanode -> block map.  
  136.    * <p>
  137.    * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by 
  138.    * storage id. In order to keep the storage map consistent it tracks 
  139.    * all storages ever registered with the namenode.
  140.    * A descriptor corresponding to a specific storage id can be
  141.    * <ul> 
  142.    * <li>added to the map if it is a new storage id;</li>
  143.    * <li>updated with a new datanode started as a replacement for the old one 
  144.    * with the same storage id; and </li>
  145.    * <li>removed if and only if an existing datanode is restarted to serve a
  146.    * different storage id.</li>
  147.    * </ul> <br>
  148.    * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
  149.    * in the namespace image file. Only the {@link DatanodeInfo} part is 
  150.    * persistent, the list of blocks is restored from the datanode block
  151.    * reports. 
  152.    * <p>
  153.    * Mapping: StorageID -> DatanodeDescriptor
  154.    */
  155.   NavigableMap<String, DatanodeDescriptor> datanodeMap = 
  156.     new TreeMap<String, DatanodeDescriptor>();
  157.   //
  158.   // Keeps a Collection for every named machine containing
  159.   // blocks that have recently been invalidated and are thought to live
  160.   // on the machine in question.
  161.   // Mapping: StorageID -> ArrayList<Block>
  162.   //
  163.   private Map<String, Collection<Block>> recentInvalidateSets = 
  164.     new TreeMap<String, Collection<Block>>();
  165.   //
  166.   // Keeps a TreeSet for every named node.  Each treeset contains
  167.   // a list of the blocks that are "extra" at that location.  We'll
  168.   // eventually remove these extras.
  169.   // Mapping: StorageID -> TreeSet<Block>
  170.   //
  171.   Map<String, Collection<Block>> excessReplicateMap = 
  172.     new TreeMap<String, Collection<Block>>();
  173.   Random r = new Random();
  174.   /**
  175.    * Stores a set of DatanodeDescriptor objects.
  176.    * This is a subset of {@link #datanodeMap}, containing nodes that are 
  177.    * considered alive.
  178.    * The {@link HeartbeatMonitor} periodically checks for outdated entries,
  179.    * and removes them from the list.
  180.    */
  181.   ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
  182.   //
  183.   // Store set of Blocks that need to be replicated 1 or more times.
  184.   // We also store pending replication-orders.
  185.   // Set of: Block
  186.   //
  187.   private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
  188.   private PendingReplicationBlocks pendingReplications;
  189.   public LeaseManager leaseManager = new LeaseManager(this); 
  190.   //
  191.   // Threaded object that checks to see if we have been
  192.   // getting heartbeats from all clients. 
  193.   //
  194.   Daemon hbthread = null;   // HeartbeatMonitor thread
  195.   public Daemon lmthread = null;   // LeaseMonitor thread
  196.   Daemon smmthread = null;  // SafeModeMonitor thread
  197.   public Daemon replthread = null;  // Replication thread
  198.   
  199.   private volatile boolean fsRunning = true;
  200.   long systemStart = 0;
  201.   //  The maximum number of replicates we should allow for a single block
  202.   private int maxReplication;
  203.   //  How many outgoing replication streams a given node should have at one time
  204.   private int maxReplicationStreams;
  205.   // MIN_REPLICATION is how many copies we need in place or else we disallow the write
  206.   private int minReplication;
  207.   // Default replication
  208.   private int defaultReplication;
  209.   // heartbeatRecheckInterval is how often namenode checks for expired datanodes
  210.   private long heartbeatRecheckInterval;
  211.   // heartbeatExpireInterval is how long namenode waits for datanode to report
  212.   // heartbeat
  213.   private long heartbeatExpireInterval;
  214.   //replicationRecheckInterval is how often namenode checks for new replication work
  215.   private long replicationRecheckInterval;
  216.   // default block size of a file
  217.   private long defaultBlockSize = 0;
  218.   // allow appending to hdfs files
  219.   private boolean supportAppends = true;
  220.   /**
  221.    * Last block index used for replication work.
  222.    */
  223.   private int replIndex = 0;
  224.   private long missingBlocksInCurIter = 0;
  225.   private long missingBlocksInPrevIter = 0; 
  226.   public static FSNamesystem fsNamesystemObject;
  227.   /** NameNode RPC address */
  228.   private InetSocketAddress nameNodeAddress = null; // TODO: name-node has this field, it should be removed here
  229.   private SafeModeInfo safeMode;  // safe mode information
  230.   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
  231.     
  232.   // datanode networktoplogy
  233.   NetworkTopology clusterMap = new NetworkTopology();
  234.   private DNSToSwitchMapping dnsToSwitchMapping;
  235.   
  236.   // for block replicas placement
  237.   ReplicationTargetChooser replicator;
  238.   private HostsFileReader hostsReader; 
  239.   private Daemon dnthread = null;
  240.   private long maxFsObjects = 0;          // maximum number of fs objects
  241.   /**
  242.    * The global generation stamp for this file system. 
  243.    */
  244.   private final GenerationStamp generationStamp = new GenerationStamp();
  245.   // Ask Datanode only up to this many blocks to delete.
  246.   private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
  247.   // precision of access times.
  248.   private long accessTimePrecision = 0;
  249.   /**
  250.    * FSNamesystem constructor.
  251.    */
  252.   FSNamesystem(NameNode nn, Configuration conf) throws IOException {
  253.     try {
  254.       initialize(nn, conf);
  255.     } catch(IOException e) {
  256.       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
  257.       close();
  258.       throw e;
  259.     }
  260.   }
  261.   /**
  262.    * Initialize FSNamesystem.
  263.    */
  264.   private void initialize(NameNode nn, Configuration conf) throws IOException {
  265.     this.systemStart = now();
  266.     setConfigurationParameters(conf);
  267.     this.nameNodeAddress = nn.getNameNodeAddress();
  268.     this.registerMBean(conf); // register the MBean for the FSNamesystemStutus
  269.     this.dir = new FSDirectory(this, conf);
  270.     StartupOption startOpt = NameNode.getStartupOption(conf);
  271.     this.dir.loadFSImage(getNamespaceDirs(conf),
  272.                          getNamespaceEditsDirs(conf), startOpt);
  273.     long timeTakenToLoadFSImage = now() - systemStart;
  274.     LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
  275.     NameNode.getNameNodeMetrics().fsImageLoadTime.set(
  276.                               (int) timeTakenToLoadFSImage);
  277.     this.safeMode = new SafeModeInfo(conf);
  278.     setBlockTotal();
  279.     pendingReplications = new PendingReplicationBlocks(
  280.                             conf.getInt("dfs.replication.pending.timeout.sec", 
  281.                                         -1) * 1000L);
  282.     this.hbthread = new Daemon(new HeartbeatMonitor());
  283.     this.lmthread = new Daemon(leaseManager.new Monitor());
  284.     this.replthread = new Daemon(new ReplicationMonitor());
  285.     hbthread.start();
  286.     lmthread.start();
  287.     replthread.start();
  288.     this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
  289.                                            conf.get("dfs.hosts.exclude",""));
  290.     this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
  291.         conf.getInt("dfs.namenode.decommission.interval", 30),
  292.         conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
  293.     dnthread.start();
  294.     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
  295.         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
  296.             DNSToSwitchMapping.class), conf);
  297.     
  298.     /* If the dns to swith mapping supports cache, resolve network 
  299.      * locations of those hosts in the include list, 
  300.      * and store the mapping in the cache; so future calls to resolve
  301.      * will be fast.
  302.      */
  303.     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
  304.       dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
  305.     }
  306.   }
  307.   public static Collection<File> getNamespaceDirs(Configuration conf) {
  308.     Collection<String> dirNames = conf.getStringCollection("dfs.name.dir");
  309.     if (dirNames.isEmpty())
  310.       dirNames.add("/tmp/hadoop/dfs/name");
  311.     Collection<File> dirs = new ArrayList<File>(dirNames.size());
  312.     for(String name : dirNames) {
  313.       dirs.add(new File(name));
  314.     }
  315.     return dirs;
  316.   }
  317.   
  318.   public static Collection<File> getNamespaceEditsDirs(Configuration conf) {
  319.     Collection<String> editsDirNames = 
  320.             conf.getStringCollection("dfs.name.edits.dir");
  321.     if (editsDirNames.isEmpty())
  322.       editsDirNames.add("/tmp/hadoop/dfs/name");
  323.     Collection<File> dirs = new ArrayList<File>(editsDirNames.size());
  324.     for(String name : editsDirNames) {
  325.       dirs.add(new File(name));
  326.     }
  327.     return dirs;
  328.   }
  329.   /**
  330.    * dirs is a list of directories where the filesystem directory state 
  331.    * is stored
  332.    */
  333.   FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
  334.     setConfigurationParameters(conf);
  335.     this.dir = new FSDirectory(fsImage, this, conf);
  336.   }
  337.   /**
  338.    * Initializes some of the members from configuration
  339.    */
  340.   private void setConfigurationParameters(Configuration conf) 
  341.                                           throws IOException {
  342.     fsNamesystemObject = this;
  343.     try {
  344.       fsOwner = UnixUserGroupInformation.login(conf);
  345.     } catch (LoginException e) {
  346.       throw new IOException(StringUtils.stringifyException(e));
  347.     }
  348.     LOG.info("fsOwner=" + fsOwner);
  349.     this.supergroup = conf.get("dfs.permissions.supergroup", "supergroup");
  350.     this.isPermissionEnabled = conf.getBoolean("dfs.permissions", true);
  351.     LOG.info("supergroup=" + supergroup);
  352.     LOG.info("isPermissionEnabled=" + isPermissionEnabled);
  353.     short filePermission = (short)conf.getInt("dfs.upgrade.permission", 0777);
  354.     this.defaultPermission = PermissionStatus.createImmutable(
  355.         fsOwner.getUserName(), supergroup, new FsPermission(filePermission));
  356.     this.replicator = new ReplicationTargetChooser(
  357.                          conf.getBoolean("dfs.replication.considerLoad", true),
  358.                          this,
  359.                          clusterMap);
  360.     this.defaultReplication = conf.getInt("dfs.replication", 3);
  361.     this.maxReplication = conf.getInt("dfs.replication.max", 512);
  362.     this.minReplication = conf.getInt("dfs.replication.min", 1);
  363.     if (minReplication <= 0)
  364.       throw new IOException(
  365.                             "Unexpected configuration parameters: dfs.replication.min = " 
  366.                             + minReplication
  367.                             + " must be greater than 0");
  368.     if (maxReplication >= (int)Short.MAX_VALUE)
  369.       throw new IOException(
  370.                             "Unexpected configuration parameters: dfs.replication.max = " 
  371.                             + maxReplication + " must be less than " + (Short.MAX_VALUE));
  372.     if (maxReplication < minReplication)
  373.       throw new IOException(
  374.                             "Unexpected configuration parameters: dfs.replication.min = " 
  375.                             + minReplication
  376.                             + " must be less than dfs.replication.max = " 
  377.                             + maxReplication);
  378.     this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
  379.     long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
  380.     this.heartbeatRecheckInterval = conf.getInt(
  381.         "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
  382.     this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
  383.       10 * heartbeatInterval;
  384.     this.replicationRecheckInterval = 
  385.       conf.getInt("dfs.replication.interval", 3) * 1000L;
  386.     this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
  387.     this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
  388.     this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit, 
  389.                                          20*(int)(heartbeatInterval/1000));
  390.     this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
  391.     this.supportAppends = conf.getBoolean("dfs.support.append", false);
  392.   }
  393.   /**
  394.    * Return the default path permission when upgrading from releases with no
  395.    * permissions (<=0.15) to releases with permissions (>=0.16)
  396.    */
  397.   protected PermissionStatus getUpgradePermission() {
  398.     return defaultPermission;
  399.   }
  400.   
  401.   /** Return the FSNamesystem object
  402.    * 
  403.    */
  404.   public static FSNamesystem getFSNamesystem() {
  405.     return fsNamesystemObject;
  406.   } 
  407.   NamespaceInfo getNamespaceInfo() {
  408.     return new NamespaceInfo(dir.fsImage.getNamespaceID(),
  409.                              dir.fsImage.getCTime(),
  410.                              getDistributedUpgradeVersion());
  411.   }
  412.   /**
  413.    * Close down this file system manager.
  414.    * Causes heartbeat and lease daemons to stop; waits briefly for
  415.    * them to finish, but a short timeout returns control back to caller.
  416.    */
  417.   public void close() {
  418.     fsRunning = false;
  419.     try {
  420.       if (pendingReplications != null) pendingReplications.stop();
  421.       if (hbthread != null) hbthread.interrupt();
  422.       if (replthread != null) replthread.interrupt();
  423.       if (dnthread != null) dnthread.interrupt();
  424.       if (smmthread != null) smmthread.interrupt();
  425.     } catch (Exception e) {
  426.       LOG.warn("Exception shutting down FSNamesystem", e);
  427.     } finally {
  428.       // using finally to ensure we also wait for lease daemon
  429.       try {
  430.         if (lmthread != null) {
  431.           lmthread.interrupt();
  432.           lmthread.join(3000);
  433.         }
  434.         dir.close();
  435.       } catch (InterruptedException ie) {
  436.       } catch (IOException ie) {
  437.         LOG.error("Error closing FSDirectory", ie);
  438.         IOUtils.cleanup(LOG, dir);
  439.       }
  440.     }
  441.   }
  442.   /** Is this name system running? */
  443.   boolean isRunning() {
  444.     return fsRunning;
  445.   }
  446.   /**
  447.    * Dump all metadata into specified file
  448.    */
  449.   synchronized void metaSave(String filename) throws IOException {
  450.     checkSuperuserPrivilege();
  451.     File file = new File(System.getProperty("hadoop.log.dir"), 
  452.                          filename);
  453.     PrintWriter out = new PrintWriter(new BufferedWriter(
  454.                                                          new FileWriter(file, true)));
  455.  
  456.     //
  457.     // Dump contents of neededReplication
  458.     //
  459.     synchronized (neededReplications) {
  460.       out.println("Metasave: Blocks waiting for replication: " + 
  461.                   neededReplications.size());
  462.       for (Block block : neededReplications) {
  463.         List<DatanodeDescriptor> containingNodes =
  464.                                           new ArrayList<DatanodeDescriptor>();
  465.         NumberReplicas numReplicas = new NumberReplicas();
  466.         // source node returned is not used
  467.         chooseSourceDatanode(block, containingNodes, numReplicas);
  468.         int usableReplicas = numReplicas.liveReplicas() + 
  469.                              numReplicas.decommissionedReplicas(); 
  470.         // l: == live:, d: == decommissioned c: == corrupt e: == excess
  471.         out.print(block + " (replicas:" +
  472.                   " l: " + numReplicas.liveReplicas() + 
  473.                   " d: " + numReplicas.decommissionedReplicas() + 
  474.                   " c: " + numReplicas.corruptReplicas() + 
  475.                   " e: " + numReplicas.excessReplicas() + 
  476.                   ((usableReplicas > 0)? "" : " MISSING") + ")"); 
  477.         for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
  478.              jt.hasNext();) {
  479.           DatanodeDescriptor node = jt.next();
  480.           out.print(" " + node + " : ");
  481.         }
  482.         out.println("");
  483.       }
  484.     }
  485.     //
  486.     // Dump blocks from pendingReplication
  487.     //
  488.     pendingReplications.metaSave(out);
  489.     //
  490.     // Dump blocks that are waiting to be deleted
  491.     //
  492.     dumpRecentInvalidateSets(out);
  493.     //
  494.     // Dump all datanodes
  495.     //
  496.     datanodeDump(out);
  497.     out.flush();
  498.     out.close();
  499.   }
  500.   long getDefaultBlockSize() {
  501.     return defaultBlockSize;
  502.   }
  503.   long getAccessTimePrecision() {
  504.     return accessTimePrecision;
  505.   }
  506.   private boolean isAccessTimeSupported() {
  507.     return accessTimePrecision > 0;
  508.   }
  509.     
  510.   /* get replication factor of a block */
  511.   private int getReplication(Block block) {
  512.     INodeFile fileINode = blocksMap.getINode(block);
  513.     if (fileINode == null) { // block does not belong to any file
  514.       return 0;
  515.     }
  516.     assert !fileINode.isDirectory() : "Block cannot belong to a directory.";
  517.     return fileINode.getReplication();
  518.   }
  519.   /* updates a block in under replication queue */
  520.   synchronized void updateNeededReplications(Block block,
  521.                         int curReplicasDelta, int expectedReplicasDelta) {
  522.     NumberReplicas repl = countNodes(block);
  523.     int curExpectedReplicas = getReplication(block);
  524.     neededReplications.update(block, 
  525.                               repl.liveReplicas(), 
  526.                               repl.decommissionedReplicas(),
  527.                               curExpectedReplicas,
  528.                               curReplicasDelta, expectedReplicasDelta);
  529.   }
  530.   /////////////////////////////////////////////////////////
  531.   //
  532.   // These methods are called by secondary namenodes
  533.   //
  534.   /////////////////////////////////////////////////////////
  535.   /**
  536.    * return a list of blocks & their locations on <code>datanode</code> whose
  537.    * total size is <code>size</code>
  538.    * 
  539.    * @param datanode on which blocks are located
  540.    * @param size total size of blocks
  541.    */
  542.   synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
  543.       throws IOException {
  544.     checkSuperuserPrivilege();
  545.     DatanodeDescriptor node = getDatanode(datanode);
  546.     if (node == null) {
  547.       NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
  548.           + "Asking for blocks from an unrecorded node " + datanode.getName());
  549.       throw new IllegalArgumentException(
  550.           "Unexpected exception.  Got getBlocks message for datanode " + 
  551.           datanode.getName() + ", but there is no info for it");
  552.     }
  553.     int numBlocks = node.numBlocks();
  554.     if(numBlocks == 0) {
  555.       return new BlocksWithLocations(new BlockWithLocations[0]);
  556.     }
  557.     Iterator<Block> iter = node.getBlockIterator();
  558.     int startBlock = r.nextInt(numBlocks); // starting from a random block
  559.     // skip blocks
  560.     for(int i=0; i<startBlock; i++) {
  561.       iter.next();
  562.     }
  563.     List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
  564.     long totalSize = 0;
  565.     while(totalSize<size && iter.hasNext()) {
  566.       totalSize += addBlock(iter.next(), results);
  567.     }
  568.     if(totalSize<size) {
  569.       iter = node.getBlockIterator(); // start from the beginning
  570.       for(int i=0; i<startBlock&&totalSize<size; i++) {
  571.         totalSize += addBlock(iter.next(), results);
  572.       }
  573.     }
  574.     
  575.     return new BlocksWithLocations(
  576.         results.toArray(new BlockWithLocations[results.size()]));
  577.   }
  578.   
  579.   /**
  580.    * Get all valid locations of the block & add the block to results
  581.    * return the length of the added block; 0 if the block is not added
  582.    */
  583.   private long addBlock(Block block, List<BlockWithLocations> results) {
  584.     ArrayList<String> machineSet =
  585.       new ArrayList<String>(blocksMap.numNodes(block));
  586.     for(Iterator<DatanodeDescriptor> it = 
  587.       blocksMap.nodeIterator(block); it.hasNext();) {
  588.       String storageID = it.next().getStorageID();
  589.       // filter invalidate replicas
  590.       Collection<Block> blocks = recentInvalidateSets.get(storageID); 
  591.       if(blocks==null || !blocks.contains(block)) {
  592.         machineSet.add(storageID);
  593.       }
  594.     }
  595.     if(machineSet.size() == 0) {
  596.       return 0;
  597.     } else {
  598.       results.add(new BlockWithLocations(block, 
  599.           machineSet.toArray(new String[machineSet.size()])));
  600.       return block.getNumBytes();
  601.     }
  602.   }
  603.   /////////////////////////////////////////////////////////
  604.   //
  605.   // These methods are called by HadoopFS clients
  606.   //
  607.   /////////////////////////////////////////////////////////
  608.   /**
  609.    * Set permissions for an existing file.
  610.    * @throws IOException
  611.    */
  612.   public synchronized void setPermission(String src, FsPermission permission
  613.       ) throws IOException {
  614.     checkOwner(src);
  615.     dir.setPermission(src, permission);
  616.     getEditLog().logSync();
  617.     if (auditLog.isInfoEnabled()) {
  618.       final FileStatus stat = dir.getFileInfo(src);
  619.       logAuditEvent(UserGroupInformation.getCurrentUGI(),
  620.                     Server.getRemoteIp(),
  621.                     "setPermission", src, null, stat);
  622.     }
  623.   }
  624.   /**
  625.    * Set owner for an existing file.
  626.    * @throws IOException
  627.    */
  628.   public synchronized void setOwner(String src, String username, String group
  629.       ) throws IOException {
  630.     PermissionChecker pc = checkOwner(src);
  631.     if (!pc.isSuper) {
  632.       if (username != null && !pc.user.equals(username)) {
  633.         throw new AccessControlException("Non-super user cannot change owner.");
  634.       }
  635.       if (group != null && !pc.containsGroup(group)) {
  636.         throw new AccessControlException("User does not belong to " + group
  637.             + " .");
  638.       }
  639.     }
  640.     dir.setOwner(src, username, group);
  641.     getEditLog().logSync();
  642.     if (auditLog.isInfoEnabled()) {
  643.       final FileStatus stat = dir.getFileInfo(src);
  644.       logAuditEvent(UserGroupInformation.getCurrentUGI(),
  645.                     Server.getRemoteIp(),
  646.                     "setOwner", src, null, stat);
  647.     }
  648.   }
  649.   /**
  650.    * Get block locations within the specified range.
  651.    * 
  652.    * @see #getBlockLocations(String, long, long)
  653.    */
  654.   LocatedBlocks getBlockLocations(String clientMachine, String src,
  655.       long offset, long length) throws IOException {
  656.     if (isPermissionEnabled) {
  657.       checkPathAccess(src, FsAction.READ);
  658.     }
  659.     LocatedBlocks blocks = getBlockLocations(src, offset, length, true);
  660.     if (blocks != null) {
  661.       //sort the blocks
  662.       DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
  663.           clientMachine);
  664.       for (LocatedBlock b : blocks.getLocatedBlocks()) {
  665.         clusterMap.pseudoSortByDistance(client, b.getLocations());
  666.       }
  667.     }
  668.     return blocks;
  669.   }
  670.   /**
  671.    * Get block locations within the specified range.
  672.    * @see ClientProtocol#getBlockLocations(String, long, long)
  673.    */
  674.   public LocatedBlocks getBlockLocations(String src, long offset, long length
  675.       ) throws IOException {
  676.     return getBlockLocations(src, offset, length, false);
  677.   }
  678.   /**
  679.    * Get block locations within the specified range.
  680.    * @see ClientProtocol#getBlockLocations(String, long, long)
  681.    */
  682.   public LocatedBlocks getBlockLocations(String src, long offset, long length,
  683.       boolean doAccessTime) throws IOException {
  684.     if (offset < 0) {
  685.       throw new IOException("Negative offset is not supported. File: " + src );
  686.     }
  687.     if (length < 0) {
  688.       throw new IOException("Negative length is not supported. File: " + src );
  689.     }
  690.     final LocatedBlocks ret = getBlockLocationsInternal(src, dir.getFileINode(src),
  691.         offset, length, Integer.MAX_VALUE, doAccessTime);  
  692.     if (auditLog.isInfoEnabled()) {
  693.       logAuditEvent(UserGroupInformation.getCurrentUGI(),
  694.                     Server.getRemoteIp(),
  695.                     "open", src, null, null);
  696.     }
  697.     return ret;
  698.   }
  699.   private synchronized LocatedBlocks getBlockLocationsInternal(String src,
  700.                                                        INodeFile inode,
  701.                                                        long offset, 
  702.                                                        long length,
  703.                                                        int nrBlocksToReturn,
  704.                                                        boolean doAccessTime) 
  705.                                                        throws IOException {
  706.     if(inode == null) {
  707.       return null;
  708.     }
  709.     if (doAccessTime && isAccessTimeSupported()) {
  710.       dir.setTimes(src, inode, -1, now(), false);
  711.     }
  712.     Block[] blocks = inode.getBlocks();
  713.     if (blocks == null) {
  714.       return null;
  715.     }
  716.     if (blocks.length == 0) {
  717.       return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
  718.     }
  719.     List<LocatedBlock> results;
  720.     results = new ArrayList<LocatedBlock>(blocks.length);
  721.     int curBlk = 0;
  722.     long curPos = 0, blkSize = 0;
  723.     int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
  724.     for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
  725.       blkSize = blocks[curBlk].getNumBytes();
  726.       assert blkSize > 0 : "Block of size 0";
  727.       if (curPos + blkSize > offset) {
  728.         break;
  729.       }
  730.       curPos += blkSize;
  731.     }
  732.     
  733.     if (nrBlocks > 0 && curBlk == nrBlocks)   // offset >= end of file
  734.       return null;
  735.     
  736.     long endOff = offset + length;
  737.     
  738.     do {
  739.       // get block locations
  740.       int numNodes = blocksMap.numNodes(blocks[curBlk]);
  741.       int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
  742.       int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]); 
  743.       if (numCorruptNodes != numCorruptReplicas) {
  744.         LOG.warn("Inconsistent number of corrupt replicas for " + 
  745.             blocks[curBlk] + "blockMap has " + numCorruptNodes + 
  746.             " but corrupt replicas map has " + numCorruptReplicas);
  747.       }
  748.       boolean blockCorrupt = (numCorruptNodes == numNodes);
  749.       int numMachineSet = blockCorrupt ? numNodes : 
  750.                             (numNodes - numCorruptNodes);
  751.       DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
  752.       if (numMachineSet > 0) {
  753.         numNodes = 0;
  754.         for(Iterator<DatanodeDescriptor> it = 
  755.             blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
  756.           DatanodeDescriptor dn = it.next();
  757.           boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
  758.           if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
  759.             machineSet[numNodes++] = dn;
  760.         }
  761.       }
  762.       results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
  763.                   blockCorrupt));
  764.       curPos += blocks[curBlk].getNumBytes();
  765.       curBlk++;
  766.     } while (curPos < endOff 
  767.           && curBlk < blocks.length 
  768.           && results.size() < nrBlocksToReturn);
  769.     
  770.     return inode.createLocatedBlocks(results);
  771.   }
  772.   /**
  773.    * stores the modification and access time for this inode. 
  774.    * The access time is precise upto an hour. The transaction, if needed, is
  775.    * written to the edits log but is not flushed.
  776.    */
  777.   public synchronized void setTimes(String src, long mtime, long atime) throws IOException {
  778.     if (!isAccessTimeSupported() && atime != -1) {
  779.       throw new IOException("Access time for hdfs is not configured. " +
  780.                             " Please set dfs.support.accessTime configuration parameter.");
  781.     }
  782.     //
  783.     // The caller needs to have write access to set access & modification times.
  784.     if (isPermissionEnabled) {
  785.       checkPathAccess(src, FsAction.WRITE);
  786.     }
  787.     INodeFile inode = dir.getFileINode(src);
  788.     if (inode != null) {
  789.       dir.setTimes(src, inode, mtime, atime, true);
  790.       if (auditLog.isInfoEnabled()) {
  791.         final FileStatus stat = dir.getFileInfo(src);
  792.         logAuditEvent(UserGroupInformation.getCurrentUGI(),
  793.                       Server.getRemoteIp(),
  794.                       "setTimes", src, null, stat);
  795.       }
  796.     } else {
  797.       throw new FileNotFoundException("File " + src + " does not exist.");
  798.     }
  799.   }
  800.   /**
  801.    * Set replication for an existing file.
  802.    * 
  803.    * The NameNode sets new replication and schedules either replication of 
  804.    * under-replicated data blocks or removal of the eccessive block copies 
  805.    * if the blocks are over-replicated.
  806.    * 
  807.    * @see ClientProtocol#setReplication(String, short)
  808.    * @param src file name
  809.    * @param replication new replication
  810.    * @return true if successful; 
  811.    *         false if file does not exist or is a directory
  812.    */
  813.   public boolean setReplication(String src, short replication) 
  814.                                 throws IOException {
  815.     boolean status = setReplicationInternal(src, replication);
  816.     getEditLog().logSync();
  817.     if (status && auditLog.isInfoEnabled()) {
  818.       logAuditEvent(UserGroupInformation.getCurrentUGI(),
  819.                     Server.getRemoteIp(),
  820.                     "setReplication", src, null, null);
  821.     }
  822.     return status;
  823.   }
  824.   private synchronized boolean setReplicationInternal(String src, 
  825.                                              short replication
  826.                                              ) throws IOException {
  827.     if (isInSafeMode())
  828.       throw new SafeModeException("Cannot set replication for " + src, safeMode);
  829.     verifyReplication(src, replication, null);
  830.     if (isPermissionEnabled) {
  831.       checkPathAccess(src, FsAction.WRITE);
  832.     }
  833.     int[] oldReplication = new int[1];
  834.     Block[] fileBlocks;
  835.     fileBlocks = dir.setReplication(src, replication, oldReplication);
  836.     if (fileBlocks == null)  // file not found or is a directory
  837.       return false;
  838.     int oldRepl = oldReplication[0];
  839.     if (oldRepl == replication) // the same replication
  840.       return true;
  841.     // update needReplication priority queues
  842.     for(int idx = 0; idx < fileBlocks.length; idx++)
  843.       updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
  844.       
  845.     if (oldRepl > replication) {  
  846.       // old replication > the new one; need to remove copies
  847.       LOG.info("Reducing replication for file " + src 
  848.                + ". New replication is " + replication);
  849.       for(int idx = 0; idx < fileBlocks.length; idx++)
  850.         processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
  851.     } else { // replication factor is increased
  852.       LOG.info("Increasing replication for file " + src 
  853.           + ". New replication is " + replication);
  854.     }
  855.     return true;
  856.   }
  857.     
  858.   long getPreferredBlockSize(String filename) throws IOException {
  859.     if (isPermissionEnabled) {
  860.       checkTraverse(filename);
  861.     }
  862.     return dir.getPreferredBlockSize(filename);
  863.   }
  864.     
  865.   /**
  866.    * Check whether the replication parameter is within the range
  867.    * determined by system configuration.
  868.    */
  869.   private void verifyReplication(String src, 
  870.                                  short replication, 
  871.                                  String clientName 
  872.                                  ) throws IOException {
  873.     String text = "file " + src 
  874.       + ((clientName != null) ? " on client " + clientName : "")
  875.       + ".n"
  876.       + "Requested replication " + replication;
  877.     if (replication > maxReplication)
  878.       throw new IOException(text + " exceeds maximum " + maxReplication);
  879.       
  880.     if (replication < minReplication)
  881.       throw new IOException( 
  882.                             text + " is less than the required minimum " + minReplication);
  883.   }
  884.   /**
  885.    * Create a new file entry in the namespace.
  886.    * 
  887.    * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
  888.    * 
  889.    * @throws IOException if file name is invalid
  890.    *         {@link FSDirectory#isValidToCreate(String)}.
  891.    */
  892.   void startFile(String src, PermissionStatus permissions,
  893.                  String holder, String clientMachine,
  894.                  boolean overwrite, short replication, long blockSize
  895.                 ) throws IOException {
  896.     startFileInternal(src, permissions, holder, clientMachine, overwrite, false,
  897.                       replication, blockSize);
  898.     getEditLog().logSync();
  899.     if (auditLog.isInfoEnabled()) {
  900.       final FileStatus stat = dir.getFileInfo(src);
  901.       logAuditEvent(UserGroupInformation.getCurrentUGI(),
  902.                     Server.getRemoteIp(),
  903.                     "create", src, null, stat);
  904.     }
  905.   }
  906.   private synchronized void startFileInternal(String src,
  907.                                               PermissionStatus permissions,
  908.                                               String holder, 
  909.                                               String clientMachine, 
  910.                                               boolean overwrite,
  911.                                               boolean append,
  912.                                               short replication,
  913.                                               long blockSize
  914.                                               ) throws IOException {
  915.     if (NameNode.stateChangeLog.isDebugEnabled()) {
  916.       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
  917.           + ", holder=" + holder
  918.           + ", clientMachine=" + clientMachine
  919.           + ", replication=" + replication
  920.           + ", overwrite=" + overwrite
  921.           + ", append=" + append);
  922.     }
  923.     if (isInSafeMode())
  924.       throw new SafeModeException("Cannot create file" + src, safeMode);
  925.     if (!DFSUtil.isValidName(src)) {
  926.       throw new IOException("Invalid file name: " + src);
  927.     }
  928.     // Verify that the destination does not exist as a directory already.
  929.     boolean pathExists = dir.exists(src);
  930.     if (pathExists && dir.isDir(src)) {
  931.       throw new IOException("Cannot create file "+ src + "; already exists as a directory.");
  932.     }
  933.     if (isPermissionEnabled) {
  934.       if (append || (overwrite && pathExists)) {
  935.         checkPathAccess(src, FsAction.WRITE);
  936.       }
  937.       else {
  938.         checkAncestorAccess(src, FsAction.WRITE);
  939.       }
  940.     }
  941.     try {
  942.       INode myFile = dir.getFileINode(src);
  943.       if (myFile != null && myFile.isUnderConstruction()) {
  944.         INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
  945.         //
  946.         // If the file is under construction , then it must be in our
  947.         // leases. Find the appropriate lease record.
  948.         //
  949.         Lease lease = leaseManager.getLease(holder);
  950.         //
  951.         // We found the lease for this file. And surprisingly the original
  952.         // holder is trying to recreate this file. This should never occur.
  953.         //
  954.         if (lease != null) {
  955.           throw new AlreadyBeingCreatedException(
  956.                                                  "failed to create file " + src + " for " + holder +
  957.                                                  " on client " + clientMachine + 
  958.                                                  " because current leaseholder is trying to recreate file.");
  959.         }
  960.         //
  961.         // Find the original holder.
  962.         //
  963.         lease = leaseManager.getLease(pendingFile.clientName);
  964.         if (lease == null) {
  965.           throw new AlreadyBeingCreatedException(
  966.                                                  "failed to create file " + src + " for " + holder +
  967.                                                  " on client " + clientMachine + 
  968.                                                  " because pendingCreates is non-null but no leases found.");
  969.         }
  970.         //
  971.         // If the original holder has not renewed in the last SOFTLIMIT 
  972.         // period, then start lease recovery.
  973.         //
  974.         if (lease.expiredSoftLimit()) {
  975.           LOG.info("startFile: recover lease " + lease + ", src=" + src);
  976.           internalReleaseLease(lease, src);
  977.         }
  978.         throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
  979.                                                " on client " + clientMachine + 
  980.                                                ", because this file is already being created by " +
  981.                                                pendingFile.getClientName() + 
  982.                                                " on " + pendingFile.getClientMachine());
  983.       }
  984.       try {
  985.         verifyReplication(src, replication, clientMachine);
  986.       } catch(IOException e) {
  987.         throw new IOException("failed to create "+e.getMessage());
  988.       }
  989.       if (append) {
  990.         if (myFile == null) {
  991.           throw new FileNotFoundException("failed to append to non-existent file "
  992.               + src + " on client " + clientMachine);
  993.         } else if (myFile.isDirectory()) {
  994.           throw new IOException("failed to append to directory " + src 
  995.                                 +" on client " + clientMachine);
  996.         }
  997.       } else if (!dir.isValidToCreate(src)) {
  998.         if (overwrite) {
  999.           delete(src, true);
  1000.         } else {
  1001.           throw new IOException("failed to create file " + src 
  1002.                                 +" on client " + clientMachine
  1003.                                 +" either because the filename is invalid or the file exists");
  1004.         }
  1005.       }
  1006.       DatanodeDescriptor clientNode = 
  1007.         host2DataNodeMap.getDatanodeByHost(clientMachine);
  1008.       if (append) {
  1009.         //
  1010.         // Replace current node with a INodeUnderConstruction.
  1011.         // Recreate in-memory lease record.
  1012.         //
  1013.         INodeFile node = (INodeFile) myFile;
  1014.         INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
  1015.                                         node.getLocalNameBytes(),
  1016.                                         node.getReplication(),
  1017.                                         node.getModificationTime(),
  1018.                                         node.getPreferredBlockSize(),
  1019.                                         node.getBlocks(),
  1020.                                         node.getPermissionStatus(),
  1021.                                         holder,
  1022.                                         clientMachine,
  1023.                                         clientNode);
  1024.         dir.replaceNode(src, node, cons);
  1025.         leaseManager.addLease(cons.clientName, src);
  1026.       } else {
  1027.        // Now we can add the name to the filesystem. This file has no
  1028.        // blocks associated with it.
  1029.        //
  1030.        checkFsObjectLimit();
  1031.         // increment global generation stamp
  1032.         long genstamp = nextGenerationStamp();
  1033.         INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
  1034.             replication, blockSize, holder, clientMachine, clientNode, genstamp);
  1035.         if (newNode == null) {
  1036.           throw new IOException("DIR* NameSystem.startFile: " +
  1037.                                 "Unable to add file to namespace.");
  1038.         }
  1039.         leaseManager.addLease(newNode.clientName, src);
  1040.         if (NameNode.stateChangeLog.isDebugEnabled()) {
  1041.           NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
  1042.                                      +"add "+src+" to namespace for "+holder);
  1043.         }
  1044.       }
  1045.     } catch (IOException ie) {
  1046.       NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
  1047.                                    +ie.getMessage());
  1048.       throw ie;
  1049.     }
  1050.   }
  1051.   /**
  1052.    * Append to an existing file in the namespace.
  1053.    */
  1054.   LocatedBlock appendFile(String src, String holder, String clientMachine
  1055.       ) throws IOException {
  1056.     if (supportAppends == false) {
  1057.       throw new IOException("Append to hdfs not supported." +
  1058.                             " Please refer to dfs.support.append configuration parameter.");
  1059.     }
  1060.     startFileInternal(src, null, holder, clientMachine, false, true, 
  1061.                       (short)maxReplication, (long)0);
  1062.     getEditLog().logSync();
  1063.     //
  1064.     // Create a LocatedBlock object for the last block of the file
  1065.     // to be returned to the client. Return null if the file does not
  1066.     // have a partial block at the end.
  1067.     //
  1068.     LocatedBlock lb = null;
  1069.     synchronized (this) {
  1070.       INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
  1071.       Block[] blocks = file.getBlocks();
  1072.       if (blocks != null && blocks.length > 0) {
  1073.         Block last = blocks[blocks.length-1];
  1074.         BlockInfo storedBlock = blocksMap.getStoredBlock(last);
  1075.         if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
  1076.           long fileLength = file.computeContentSummary().getLength();
  1077.           DatanodeDescriptor[] targets = new DatanodeDescriptor[blocksMap.numNodes(last)];
  1078.           Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
  1079.           for (int i = 0; it != null && it.hasNext(); i++) {
  1080.             targets[i] = it.next();
  1081.           }
  1082.           // remove the replica locations of this block from the blocksMap
  1083.           for (int i = 0; i < targets.length; i++) {
  1084.             targets[i].removeBlock(storedBlock);
  1085.           }
  1086.           // set the locations of the last block in the lease record
  1087.           file.setLastBlock(storedBlock, targets);
  1088.           lb = new LocatedBlock(last, targets, 
  1089.                                 fileLength-storedBlock.getNumBytes());
  1090.           // Remove block from replication queue.
  1091.           updateNeededReplications(last, 0, 0);
  1092.           // remove this block from the list of pending blocks to be deleted. 
  1093.           // This reduces the possibility of triggering HADOOP-1349.
  1094.           //
  1095.           for(Collection<Block> v : recentInvalidateSets.values()) {
  1096.             v.remove(last);
  1097.           }
  1098.         }
  1099.       }
  1100.     }
  1101.     if (lb != null) {
  1102.       if (NameNode.stateChangeLog.isDebugEnabled()) {
  1103.         NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
  1104.             +src+" for "+holder+" at "+clientMachine
  1105.             +" block " + lb.getBlock()
  1106.             +" block size " + lb.getBlock().getNumBytes());
  1107.       }
  1108.     }
  1109.     if (auditLog.isInfoEnabled()) {
  1110.       logAuditEvent(UserGroupInformation.getCurrentUGI(),
  1111.                     Server.getRemoteIp(),
  1112.                     "append", src, null, null);
  1113.     }
  1114.     return lb;
  1115.   }
  1116.   /**
  1117.    * The client would like to obtain an additional block for the indicated
  1118.    * filename (which is being written-to).  Return an array that consists
  1119.    * of the block, plus a set of machines.  The first on this list should
  1120.    * be where the client writes data.  Subsequent items in the list must
  1121.    * be provided in the connection to the first datanode.
  1122.    *
  1123.    * Make sure the previous blocks have been reported by datanodes and
  1124.    * are replicated.  Will return an empty 2-elt array if we want the
  1125.    * client to "try again later".
  1126.    */
  1127.   public LocatedBlock getAdditionalBlock(String src, 
  1128.                                          String clientName
  1129.                                          ) throws IOException {
  1130.     long fileLength, blockSize;
  1131.     int replication;
  1132.     DatanodeDescriptor clientNode = null;
  1133.     Block newBlock = null;
  1134.     NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
  1135.                                   +src+" for "+clientName);
  1136.     synchronized (this) {
  1137.       if (isInSafeMode()) {
  1138.         throw new SafeModeException("Cannot add block to " + src, safeMode);
  1139.       }
  1140.       // have we exceeded the configured limit of fs objects.
  1141.       checkFsObjectLimit();
  1142.       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
  1143.       //
  1144.       // If we fail this, bad things happen!
  1145.       //
  1146.       if (!checkFileProgress(pendingFile, false)) {
  1147.         throw new NotReplicatedYetException("Not replicated yet:" + src);
  1148.       }
  1149.       fileLength = pendingFile.computeContentSummary().getLength();
  1150.       blockSize = pendingFile.getPreferredBlockSize();
  1151.       clientNode = pendingFile.getClientNode();
  1152.       replication = (int)pendingFile.getReplication();
  1153.     }
  1154.     // choose targets for the new block tobe allocated.
  1155.     DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
  1156.                                                            clientNode,
  1157.                                                            null,
  1158.                                                            blockSize);
  1159.     if (targets.length < this.minReplication) {
  1160.       throw new IOException("File " + src + " could only be replicated to " +
  1161.                             targets.length + " nodes, instead of " +
  1162.                             minReplication);
  1163.     }
  1164.     // Allocate a new block and record it in the INode. 
  1165.     synchronized (this) {
  1166.       INode[] pathINodes = dir.getExistingPathINodes(src);
  1167.       int inodesLen = pathINodes.length;
  1168.       checkLease(src, clientName, pathINodes[inodesLen-1]);
  1169.       INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction) 
  1170.                                                 pathINodes[inodesLen - 1];
  1171.                                                            
  1172.       if (!checkFileProgress(pendingFile, false)) {
  1173.         throw new NotReplicatedYetException("Not replicated yet:" + src);
  1174.       }
  1175.       // allocate new block record block locations in INode.
  1176.       newBlock = allocateBlock(src, pathINodes);
  1177.       pendingFile.setTargets(targets);
  1178.       
  1179.       for (DatanodeDescriptor dn : targets) {
  1180.         dn.incBlocksScheduled();
  1181.       }      
  1182.     }
  1183.         
  1184.     // Create next block
  1185.     return new LocatedBlock(newBlock, targets, fileLength);
  1186.   }
  1187.   /**
  1188.    * The client would like to let go of the given block
  1189.    */
  1190.   public synchronized boolean abandonBlock(Block b, String src, String holder
  1191.       ) throws IOException {
  1192.     //
  1193.     // Remove the block from the pending creates list
  1194.     //
  1195.     NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
  1196.                                   +b+"of file "+src);
  1197.     INodeFileUnderConstruction file = checkLease(src, holder);
  1198.     dir.removeBlock(src, file, b);
  1199.     NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
  1200.                                     + b
  1201.                                     + " is removed from pendingCreates");
  1202.     return true;
  1203.   }
  1204.   
  1205.   // make sure that we still have the lease on this file.
  1206.   private INodeFileUnderConstruction checkLease(String src, String holder) 
  1207.                                                       throws IOException {
  1208.     INodeFile file = dir.getFileINode(src);
  1209.     checkLease(src, holder, file);
  1210.     return (INodeFileUnderConstruction)file;
  1211.   }
  1212.   private void checkLease(String src, String holder, INode file) 
  1213.                                                      throws IOException {
  1214.     if (file == null || file.isDirectory()) {
  1215.       Lease lease = leaseManager.getLease(holder);
  1216.       throw new LeaseExpiredException("No lease on " + src +
  1217.                                       " File does not exist. " +
  1218.                                       (lease != null ? lease.toString() :
  1219.                                        "Holder " + holder + 
  1220.                                        " does not have any open files."));
  1221.     }
  1222.     if (!file.isUnderConstruction()) {
  1223.       Lease lease = leaseManager.getLease(holder);
  1224.       throw new LeaseExpiredException("No lease on " + src + 
  1225.                                       " File is not open for writing. " +
  1226.                                       (lease != null ? lease.toString() :
  1227.                                        "Holder " + holder + 
  1228.                                        " does not have any open files."));
  1229.     }
  1230.     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
  1231.     if (holder != null && !pendingFile.getClientName().equals(holder)) {
  1232.       throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
  1233.           + pendingFile.getClientName() + " but is accessed by " + holder);
  1234.     }
  1235.   }
  1236.   /**
  1237.    * The FSNamesystem will already know the blocks that make up the file.
  1238.    * Before we return, we make sure that all the file's blocks have 
  1239.    * been reported by datanodes and are replicated correctly.
  1240.    */
  1241.   
  1242.   enum CompleteFileStatus {
  1243.     OPERATION_FAILED,
  1244.     STILL_WAITING,
  1245.     COMPLETE_SUCCESS
  1246.   }
  1247.   
  1248.   public CompleteFileStatus completeFile(String src, String holder) throws IOException {
  1249.     CompleteFileStatus status = completeFileInternal(src, holder);
  1250.     getEditLog().logSync();
  1251.     return status;
  1252.   }
  1253.   private synchronized CompleteFileStatus completeFileInternal(String src, 
  1254.                                                 String holder) throws IOException {
  1255.     NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
  1256.     if (isInSafeMode())
  1257.       throw new SafeModeException("Cannot complete file " + src, safeMode);
  1258.     INode iFile = dir.getFileINode(src);
  1259.     INodeFileUnderConstruction pendingFile = null;
  1260.     Block[] fileBlocks = null;
  1261.     if (iFile != null && iFile.isUnderConstruction()) {
  1262.       pendingFile = (INodeFileUnderConstruction) iFile;
  1263.       fileBlocks =  dir.getFileBlocks(src);
  1264.     }
  1265.     if (fileBlocks == null ) {    
  1266.       NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
  1267.                                    + "failed to complete " + src
  1268.                                    + " because dir.getFileBlocks() is null " + 
  1269.                                    " and pendingFile is " + 
  1270.                                    ((pendingFile == null) ? "null" : 
  1271.                                      ("from " + pendingFile.getClientMachine()))
  1272.                                   );                      
  1273.       return CompleteFileStatus.OPERATION_FAILED;
  1274.     } else if (!checkFileProgress(pendingFile, true)) {
  1275.       return CompleteFileStatus.STILL_WAITING;
  1276.     }
  1277.     finalizeINodeFileUnderConstruction(src, pendingFile);
  1278.     if (NameNode.stateChangeLog.isDebugEnabled()) {
  1279.       NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
  1280.                                   + " blocklist persisted");
  1281.     }
  1282.     return CompleteFileStatus.COMPLETE_SUCCESS;
  1283.   }
  1284.   /** 
  1285.    * Check all blocks of a file. If any blocks are lower than their intended
  1286.    * replication factor, then insert them into neededReplication
  1287.    */
  1288.   private void checkReplicationFactor(INodeFile file) {
  1289.     int numExpectedReplicas = file.getReplication();
  1290.     Block[] pendingBlocks = file.getBlocks();
  1291.     int nrBlocks = pendingBlocks.length;
  1292.     for (int i = 0; i < nrBlocks; i++) {
  1293.       // filter out containingNodes that are marked for decommission.
  1294.       NumberReplicas number = countNodes(pendingBlocks[i]);
  1295.       if (number.liveReplicas() < numExpectedReplicas) {
  1296.         neededReplications.add(pendingBlocks[i], 
  1297.                                number.liveReplicas(), 
  1298.                                number.decommissionedReplicas,
  1299.                                numExpectedReplicas);
  1300.       }
  1301.     }
  1302.   }
  1303.   static Random randBlockId = new Random();
  1304.     
  1305.   /**
  1306.    * Allocate a block at the given pending filename
  1307.    * 
  1308.    * @param src path to the file
  1309.    * @param inodes INode representing each of the components of src. 
  1310.    *        <code>inodes[inodes.length-1]</code> is the INode for the file.
  1311.    */
  1312.   private Block allocateBlock(String src, INode[] inodes) throws IOException {
  1313.     Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0); 
  1314.     while(isValidBlock(b)) {
  1315.       b.setBlockId(FSNamesystem.randBlockId.nextLong());
  1316.     }
  1317.     b.setGenerationStamp(getGenerationStamp());
  1318.     b = dir.addBlock(src, inodes, b);
  1319.     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
  1320.                                  +src+ ". "+b);
  1321.     return b;
  1322.   }
  1323.   /**
  1324.    * Check that the indicated file's blocks are present and
  1325.    * replicated.  If not, return false. If checkall is true, then check
  1326.    * all blocks, otherwise check only penultimate block.
  1327.    */
  1328.   synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
  1329.     if (checkall) {
  1330.       //
  1331.       // check all blocks of the file.
  1332.       //
  1333.       for (Block block: v.getBlocks()) {
  1334.         if (blocksMap.numNodes(block) < this.minReplication) {
  1335.           return false;
  1336.         }
  1337.       }
  1338.     } else {
  1339.       //
  1340.       // check the penultimate block of this file
  1341.       //
  1342.       Block b = v.getPenultimateBlock();
  1343.       if (b != null) {
  1344.         if (blocksMap.numNodes(b) < this.minReplication) {
  1345.           return false;
  1346.         }
  1347.       }
  1348.     }
  1349.     return true;
  1350.   }
  1351.   /**
  1352.    * Remove a datanode from the invalidatesSet
  1353.    * @param n datanode
  1354.    */
  1355.   private void removeFromInvalidates(DatanodeInfo n) {
  1356.     recentInvalidateSets.remove(n.getStorageID());
  1357.   }
  1358.   /**
  1359.    * Adds block to list of blocks which will be invalidated on 
  1360.    * specified datanode and log the move
  1361.    * @param b block
  1362.    * @param n datanode
  1363.    */
  1364.   void addToInvalidates(Block b, DatanodeInfo n) {
  1365.     addToInvalidatesNoLog(b, n);
  1366.     NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
  1367.         + b.getBlockName() + " is added to invalidSet of " + n.getName());
  1368.   }
  1369.   /**
  1370.    * Adds block to list of blocks which will be invalidated on 
  1371.    * specified datanode
  1372.    * @param b block
  1373.    * @param n datanode
  1374.    */
  1375.   private void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
  1376.     Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
  1377.     if (invalidateSet == null) {
  1378.       invalidateSet = new HashSet<Block>();
  1379.       recentInvalidateSets.put(n.getStorageID(), invalidateSet);
  1380.     }
  1381.     invalidateSet.add(b);
  1382.   }
  1383.   
  1384.   /**
  1385.    * Adds block to list of blocks which will be invalidated on 
  1386.    * all its datanodes.
  1387.    */
  1388.   private void addToInvalidates(Block b) {
  1389.     for (Iterator<DatanodeDescriptor> it = 
  1390.                                 blocksMap.nodeIterator(b); it.hasNext();) {
  1391.       DatanodeDescriptor node = it.next();
  1392.       addToInvalidates(b, node);
  1393.     }
  1394.   }
  1395.   /**
  1396.    * dumps the contents of recentInvalidateSets
  1397.    */
  1398.   private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
  1399.     int size = recentInvalidateSets.values().size();
  1400.     out.println("Metasave: Blocks waiting deletion from "+size+" datanodes.");
  1401.     if (size == 0) {
  1402.       return;
  1403.     }
  1404.     for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
  1405.       Collection<Block> blocks = entry.getValue();
  1406.       if (blocks.size() > 0) {
  1407.         out.println(datanodeMap.get(entry.getKey()).getName() + blocks);
  1408.       }
  1409.     }
  1410.   }
  1411.   /**
  1412.    * Mark the block belonging to datanode as corrupt
  1413.    * @param blk Block to be marked as corrupt
  1414.    * @param dn Datanode which holds the corrupt replica
  1415.    */
  1416.   public synchronized void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
  1417.     throws IOException {
  1418.     DatanodeDescriptor node = getDatanode(dn);
  1419.     if (node == null) {
  1420.       throw new IOException("Cannot mark block" + blk.getBlockName() +
  1421.                             " as corrupt because datanode " + dn.getName() +
  1422.                             " does not exist. ");
  1423.     }
  1424.     
  1425.     final BlockInfo storedBlockInfo = blocksMap.getStoredBlock(blk);
  1426.     if (storedBlockInfo == null) {
  1427.       // Check if the replica is in the blockMap, if not 
  1428.       // ignore the request for now. This could happen when BlockScanner
  1429.       // thread of Datanode reports bad block before Block reports are sent
  1430.       // by the Datanode on startup
  1431.       NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
  1432.                                    "block " + blk + " could not be marked " +
  1433.                                    "as corrupt as it does not exists in " +
  1434.                                    "blocksMap");
  1435.     } else {
  1436.       INodeFile inode = storedBlockInfo.getINode();
  1437.       if (inode == null) {
  1438.         NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
  1439.                                      "block " + blk + " could not be marked " +
  1440.                                      "as corrupt as it does not belong to " +
  1441.                                      "any file");
  1442.         addToInvalidates(storedBlockInfo, node);
  1443.         return;
  1444.       } 
  1445.       // Add this replica to corruptReplicas Map 
  1446.       corruptReplicas.addToCorruptReplicasMap(storedBlockInfo, node);
  1447.       if (countNodes(storedBlockInfo).liveReplicas()>inode.getReplication()) {
  1448.         // the block is over-replicated so invalidate the replicas immediately
  1449.         invalidateBlock(storedBlockInfo, node);
  1450.       } else {
  1451.         // add the block to neededReplication 
  1452.         updateNeededReplications(storedBlockInfo, -1, 0);
  1453.       }
  1454.     }
  1455.   }
  1456.   /**
  1457.    * Invalidates the given block on the given datanode.
  1458.    */
  1459.   public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
  1460.     throws IOException {
  1461.     NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " 
  1462.                                  + blk + " on " 
  1463.                                  + dn.getName());
  1464.     DatanodeDescriptor node = getDatanode(dn);
  1465.     if (node == null) {
  1466.       throw new IOException("Cannot invalidate block " + blk +
  1467.                             " because datanode " + dn.getName() +
  1468.                             " does not exist.");
  1469.     }
  1470.     // Check how many copies we have of the block.  If we have at least one
  1471.     // copy on a live node, then we can delete it. 
  1472.     int count = countNodes(blk).liveReplicas();
  1473.     if (count > 1) {
  1474.       addToInvalidates(blk, dn);
  1475.       removeStoredBlock(blk, node);
  1476.       NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
  1477.                                    + blk + " on " 
  1478.                                    + dn.getName() + " listed for deletion.");
  1479.     } else {
  1480.       NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
  1481.                                    + blk + " on " 
  1482.                                    + dn.getName() + " is the only copy and was not deleted.");
  1483.     }
  1484.   }
  1485.   ////////////////////////////////////////////////////////////////
  1486.   // Here's how to handle block-copy failure during client write:
  1487.   // -- As usual, the client's write should result in a streaming
  1488.   // backup write to a k-machine sequence.
  1489.   // -- If one of the backup machines fails, no worries.  Fail silently.
  1490.   // -- Before client is allowed to close and finalize file, make sure
  1491.   // that the blocks are backed up.  Namenode may have to issue specific backup
  1492.   // commands to make up for earlier datanode failures.  Once all copies
  1493.   // are made, edit namespace and return to client.
  1494.   ////////////////////////////////////////////////////////////////
  1495.   /** Change the indicated filename. */
  1496.   public boolean renameTo(String src, String dst) throws IOException {
  1497.     boolean status = renameToInternal(src, dst);
  1498.     getEditLog().logSync();
  1499.     if (status && auditLog.isInfoEnabled()) {
  1500.       final FileStatus stat = dir.getFileInfo(dst);
  1501.       logAuditEvent(UserGroupInformation.getCurrentUGI(),
  1502.                     Server.getRemoteIp(),
  1503.                     "rename", src, dst, stat);
  1504.     }
  1505.     return status;
  1506.   }
  1507.   private synchronized boolean renameToInternal(String src, String dst
  1508.       ) throws IOException {
  1509.     NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
  1510.     if (isInSafeMode())
  1511.       throw new SafeModeException("Cannot rename " + src, safeMode);
  1512.     if (!DFSUtil.isValidName(dst)) {
  1513.       throw new IOException("Invalid name: " + dst);
  1514.     }
  1515.     if (isPermissionEnabled) {
  1516.       //We should not be doing this.  This is move() not renameTo().
  1517.       //but for now,
  1518.       String actualdst = dir.isDir(dst)?
  1519.           dst + Path.SEPARATOR + new Path(src).getName(): dst;
  1520.       checkParentAccess(src, FsAction.WRITE);
  1521.       checkAncestorAccess(actualdst, FsAction.WRITE);
  1522.     }
  1523.     FileStatus dinfo = dir.getFileInfo(dst);
  1524.     if (dir.renameTo(src, dst)) {
  1525.       changeLease(src, dst, dinfo);     // update lease with new filename
  1526.       return true;
  1527.     }
  1528.     return false;
  1529.   }
  1530.   /**
  1531.    * Remove the indicated filename from namespace. If the filename 
  1532.    * is a directory (non empty) and recursive is set to false then throw exception.
  1533.    */
  1534.     public boolean delete(String src, boolean recursive) throws IOException {
  1535.       if ((!recursive) && (!dir.isDirEmpty(src))) {
  1536.         throw new IOException(src + " is non empty");
  1537.       }
  1538.       boolean status = deleteInternal(src, true);
  1539.       getEditLog().logSync();
  1540.       if (status && auditLog.isInfoEnabled()) {
  1541.         logAuditEvent(UserGroupInformation.getCurrentUGI(),
  1542.                       Server.getRemoteIp(),
  1543.                       "delete", src, null, null);
  1544.       }
  1545.       return status;
  1546.     }
  1547.     
  1548.   /**
  1549.    * Remove the indicated filename from the namespace.  This may
  1550.    * invalidate some blocks that make up the file.
  1551.    */
  1552.   synchronized boolean deleteInternal(String src, 
  1553.       boolean enforcePermission) throws IOException {
  1554.     if (NameNode.stateChangeLog.isDebugEnabled()) {
  1555.       NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
  1556.     }
  1557.     if (isInSafeMode())
  1558.       throw new SafeModeException("Cannot delete " + src, safeMode);
  1559.     if (enforcePermission && isPermissionEnabled) {
  1560.       checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
  1561.     }
  1562.     return dir.delete(src) != null;
  1563.   }
  1564.   void removePathAndBlocks(String src, List<Block> blocks) throws IOException {
  1565.     leaseManager.removeLeaseWithPrefixPath(src);
  1566.     for(Block b : blocks) {
  1567.       blocksMap.removeINode(b);
  1568.       corruptReplicas.removeFromCorruptReplicasMap(b);
  1569.       addToInvalidates(b);
  1570.     }
  1571.   }
  1572.   /** Get the file info for a specific file.
  1573.    * @param src The string representation of the path to the file
  1574.    * @throws IOException if permission to access file is denied by the system 
  1575.    * @return object containing information regarding the file
  1576.    *         or null if file not found
  1577.    */
  1578.   FileStatus getFileInfo(String src) throws IOException {
  1579.     if (isPermissionEnabled) {
  1580.       checkTraverse(src);
  1581.     }
  1582.     return dir.getFileInfo(src);
  1583.   }
  1584.   /**
  1585.    * Create all the necessary directories
  1586.    */
  1587.   public boolean mkdirs(String src, PermissionStatus permissions
  1588.       ) throws IOException {
  1589.     boolean status = mkdirsInternal(src, permissions);
  1590.     getEditLog().logSync();
  1591.     if (status && auditLog.isInfoEnabled()) {
  1592.       final FileStatus stat = dir.getFileInfo(src);
  1593.       logAuditEvent(UserGroupInformation.getCurrentUGI(),
  1594.                     Server.getRemoteIp(),
  1595.                     "mkdirs", src, null, stat);
  1596.     }
  1597.     return status;
  1598.   }
  1599.     
  1600.   /**
  1601.    * Create all the necessary directories
  1602.    */
  1603.   private synchronized boolean mkdirsInternal(String src,
  1604.       PermissionStatus permissions) throws IOException {
  1605.     NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
  1606.     if (isPermissionEnabled) {
  1607.       checkTraverse(src);
  1608.     }
  1609.     if (dir.isDir(src)) {
  1610.       // all the users of mkdirs() are used to expect 'true' even if
  1611.       // a new directory is not created.
  1612.       return true;
  1613.     }
  1614.     if (isInSafeMode())
  1615.       throw new SafeModeException("Cannot create directory " + src, safeMode);
  1616.     if (!DFSUtil.isValidName(src)) {
  1617.       throw new IOException("Invalid directory name: " + src);
  1618.     }
  1619.     if (isPermissionEnabled) {
  1620.       checkAncestorAccess(src, FsAction.WRITE);
  1621.     }
  1622.     // validate that we have enough inodes. This is, at best, a 
  1623.     // heuristic because the mkdirs() operation migth need to 
  1624.     // create multiple inodes.
  1625.     checkFsObjectLimit();
  1626.     if (!dir.mkdirs(src, permissions, false, now())) {
  1627.       throw new IOException("Invalid directory name: " + src);
  1628.     }
  1629.     return true;
  1630.   }
  1631.   ContentSummary getContentSummary(String src) throws IOException {
  1632.     if (isPermissionEnabled) {
  1633.       checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
  1634.     }
  1635.     return dir.getContentSummary(src);
  1636.   }
  1637.   /**
  1638.    * Set the namespace quota and diskspace quota for a directory.
  1639.    * See {@link ClientProtocol#setQuota(String, long, long)} for the 
  1640.    * contract.
  1641.    */
  1642.   void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
  1643.     if (isPermissionEnabled) {
  1644.       checkSuperuserPrivilege();
  1645.     }
  1646.     
  1647.     dir.setQuota(path, nsQuota, dsQuota);
  1648.     getEditLog().logSync();
  1649.   }
  1650.   
  1651.   /** Persist all metadata about this file.
  1652.    * @param src The string representation of the path
  1653.    * @param clientName The string representation of the client
  1654.    * @throws IOException if path does not exist
  1655.    */
  1656.   void fsync(String src, String clientName) throws IOException {
  1657.     NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
  1658.                                   + src + " for " + clientName);
  1659.     synchronized (this) {
  1660.       if (isInSafeMode()) {
  1661.         throw new SafeModeException("Cannot fsync file " + src, safeMode);
  1662.       }
  1663.       INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
  1664.       dir.persistBlocks(src, pendingFile);
  1665.     }
  1666.   }
  1667.   /**
  1668.    * Move a file that is being written to be immutable.
  1669.    * @param src The filename
  1670.    * @param lease The lease for the client creating the file
  1671.    */
  1672.   void internalReleaseLease(Lease lease, String src) throws IOException {
  1673.     LOG.info("Recovering lease=" + lease + ", src=" + src);
  1674.     INodeFile iFile = dir.getFileINode(src);
  1675.     if (iFile == null) {
  1676.       final String message = "DIR* NameSystem.internalReleaseCreate: "
  1677.         + "attempt to release a create lock on "
  1678.         + src + " file does not exist.";
  1679.       NameNode.stateChangeLog.warn(message);
  1680.       throw new IOException(message);
  1681.     }
  1682.     if (!iFile.isUnderConstruction()) {
  1683.       final String message = "DIR* NameSystem.internalReleaseCreate: "
  1684.         + "attempt to release a create lock on "
  1685.         + src + " but file is already closed.";
  1686.       NameNode.stateChangeLog.warn(message);
  1687.       throw new IOException(message);
  1688.     }
  1689.     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
  1690.     // Initialize lease recovery for pendingFile. If there are no blocks 
  1691.     // associated with this file, then reap lease immediately. Otherwise 
  1692.     // renew the lease and trigger lease recovery.
  1693.     if (pendingFile.getTargets() == null ||
  1694.         pendingFile.getTargets().length == 0) {
  1695.       if (pendingFile.getBlocks().length == 0) {
  1696.         finalizeINodeFileUnderConstruction(src, pendingFile);
  1697.         NameNode.stateChangeLog.warn("BLOCK*"
  1698.           + " internalReleaseLease: No blocks found, lease removed.");
  1699.         return;
  1700.       }
  1701.       // setup the Inode.targets for the last block from the blocksMap
  1702.       //
  1703.       Block[] blocks = pendingFile.getBlocks();
  1704.       Block last = blocks[blocks.length-1];
  1705.       DatanodeDescriptor[] targets = 
  1706.          new DatanodeDescriptor[blocksMap.numNodes(last)];
  1707.       Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
  1708.       for (int i = 0; it != null && it.hasNext(); i++) {
  1709.         targets[i] = it.next();
  1710.       }
  1711.       pendingFile.setTargets(targets);
  1712.     }
  1713.     // start lease recovery of the last block for this file.
  1714.     pendingFile.assignPrimaryDatanode();
  1715.     leaseManager.renewLease(lease);
  1716.   }
  1717.   private void finalizeINodeFileUnderConstruction(String src,
  1718.       INodeFileUnderConstruction pendingFile) throws IOException {
  1719.     leaseManager.removeLease(pendingFile.clientName, src);
  1720.     // The file is no longer pending.
  1721.     // Create permanent INode, update blockmap
  1722.     INodeFile newFile = pendingFile.convertToInodeFile();
  1723.     dir.replaceNode(src, pendingFile, newFile);
  1724.     // close file and persist block allocations for this file
  1725.     dir.closeFile(src, newFile);
  1726.     checkReplicationFactor(newFile);
  1727.   }
  1728.   synchronized void commitBlockSynchronization(Block lastblock,
  1729.       long newgenerationstamp, long newlength,
  1730.       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
  1731.       ) throws IOException {
  1732.     LOG.info("commitBlockSynchronization(lastblock=" + lastblock
  1733.           + ", newgenerationstamp=" + newgenerationstamp
  1734.           + ", newlength=" + newlength
  1735.           + ", newtargets=" + Arrays.asList(newtargets)
  1736.           + ", closeFile=" + closeFile
  1737.           + ", deleteBlock=" + deleteblock
  1738.           + ")");
  1739.     final BlockInfo oldblockinfo = blocksMap.getStoredBlock(lastblock);
  1740.     if (oldblockinfo == null) {
  1741.       throw new IOException("Block (=" + lastblock + ") not found");
  1742.     }
  1743.     INodeFile iFile = oldblockinfo.getINode();
  1744.     if (!iFile.isUnderConstruction()) {
  1745.       throw new IOException("Unexpected block (=" + lastblock
  1746.           + ") since the file (=" + iFile.getLocalName()
  1747.           + ") is not under construction");
  1748.     }
  1749.     INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
  1750.     // Remove old block from blocks map. This always have to be done
  1751.     // because the generation stamp of this block is changing.
  1752.     blocksMap.removeBlock(oldblockinfo);
  1753.     if (deleteblock) {
  1754.       pendingFile.removeBlock(lastblock);
  1755.     }
  1756.     else {
  1757.       // update last block, construct newblockinfo and add it to the blocks map
  1758.       lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
  1759.       final BlockInfo newblockinfo = blocksMap.addINode(lastblock, pendingFile);
  1760.       // find the DatanodeDescriptor objects
  1761.       // There should be no locations in the blocksMap till now because the
  1762.       // file is underConstruction
  1763.       DatanodeDescriptor[] descriptors = null;
  1764.       if (newtargets.length > 0) {
  1765.         descriptors = new DatanodeDescriptor[newtargets.length];
  1766.         for(int i = 0; i < newtargets.length; i++) {
  1767.           descriptors[i] = getDatanode(newtargets[i]);
  1768.         }
  1769.       }
  1770.       if (closeFile) {
  1771.         // the file is getting closed. Insert block locations into blocksMap.
  1772.         // Otherwise fsck will report these blocks as MISSING, especially if the
  1773.         // blocksReceived from Datanodes take a long time to arrive.
  1774.         for (int i = 0; i < descriptors.length; i++) {
  1775.           descriptors[i].addBlock(newblockinfo);
  1776.         }
  1777.         pendingFile.setLastBlock(newblockinfo, null);
  1778.       } else {
  1779.         // add locations into the INodeUnderConstruction
  1780.         pendingFile.setLastBlock(newblockinfo, descriptors);
  1781.       }
  1782.     }
  1783.     // If this commit does not want to close the file, just persist
  1784.     // blocks and return
  1785.     String src = leaseManager.findPath(pendingFile);
  1786.     if (!closeFile) {
  1787.       dir.persistBlocks(src, pendingFile);
  1788.       getEditLog().logSync();
  1789.       LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
  1790.       return;
  1791.     }
  1792.     
  1793.     //remove lease, close file
  1794.     finalizeINodeFileUnderConstruction(src, pendingFile);
  1795.     getEditLog().logSync();
  1796.     LOG.info("commitBlockSynchronization(newblock=" + lastblock
  1797.           + ", file=" + src
  1798.           + ", newgenerationstamp=" + newgenerationstamp
  1799.           + ", newlength=" + newlength
  1800.           + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
  1801.   }
  1802.   /**
  1803.    * Renew the lease(s) held by the given client
  1804.    */
  1805.   void renewLease(String holder) throws IOException {
  1806.     if (isInSafeMode())
  1807.       throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
  1808.     leaseManager.renewLease(holder);
  1809.   }
  1810.   /**
  1811.    * Get a listing of all files at 'src'.  The Object[] array
  1812.    * exists so we can return file attributes (soon to be implemented)
  1813.    */
  1814.   public FileStatus[] getListing(String src) throws IOException {
  1815.     if (isPermissionEnabled) {
  1816.       if (dir.isDir(src)) {
  1817.         checkPathAccess(src, FsAction.READ_EXECUTE);
  1818.       }
  1819.       else {
  1820.         checkTraverse(src);
  1821.       }
  1822.     }
  1823.     if (auditLog.isInfoEnabled()) {
  1824.       logAuditEvent(UserGroupInformation.getCurrentUGI(),
  1825.                     Server.getRemoteIp(),
  1826.                     "listStatus", src, null, null);
  1827.     }
  1828.     return dir.getListing(src);
  1829.   }
  1830.   /////////////////////////////////////////////////////////
  1831.   //
  1832.   // These methods are called by datanodes
  1833.   //
  1834.   /////////////////////////////////////////////////////////
  1835.   /**
  1836.    * Register Datanode.
  1837.    * <p>
  1838.    * The purpose of registration is to identify whether the new datanode
  1839.    * serves a new data storage, and will report new data block copies,
  1840.    * which the namenode was not aware of; or the datanode is a replacement
  1841.    * node for the data storage that was previously served by a different
  1842.    * or the same (in terms of host:port) datanode.
  1843.    * The data storages are distinguished by their storageIDs. When a new
  1844.    * data storage is reported the namenode issues a new unique storageID.
  1845.    * <p>
  1846.    * Finally, the namenode returns its namespaceID as the registrationID
  1847.    * for the datanodes. 
  1848.    * namespaceID is a persistent attribute of the name space.
  1849.    * The registrationID is checked every time the datanode is communicating
  1850.    * with the namenode. 
  1851.    * Datanodes with inappropriate registrationID are rejected.
  1852.    * If the namenode stops, and then restarts it can restore its 
  1853.    * namespaceID and will continue serving the datanodes that has previously
  1854.    * registered with the namenode without restarting the whole cluster.
  1855.    * 
  1856.    * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
  1857.    */
  1858.   public synchronized void registerDatanode(DatanodeRegistration nodeReg
  1859.                                             ) throws IOException {
  1860.     String dnAddress = Server.getRemoteAddress();
  1861.     if (dnAddress == null) {
  1862.       // Mostly called inside an RPC.
  1863.       // But if not, use address passed by the data-node.
  1864.       dnAddress = nodeReg.getHost();
  1865.     }      
  1866.     // check if the datanode is allowed to be connect to the namenode
  1867.     if (!verifyNodeRegistration(nodeReg, dnAddress)) {
  1868.       throw new DisallowedDatanodeException(nodeReg);
  1869.     }
  1870.     String hostName = nodeReg.getHost();
  1871.       
  1872.     // update the datanode's name with ip:port
  1873.     DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
  1874.                                       nodeReg.getStorageID(),
  1875.                                       nodeReg.getInfoPort(),
  1876.                                       nodeReg.getIpcPort());
  1877.     nodeReg.updateRegInfo(dnReg);
  1878.       
  1879.     NameNode.stateChangeLog.info(
  1880.                                  "BLOCK* NameSystem.registerDatanode: "
  1881.                                  + "node registration from " + nodeReg.getName()
  1882.                                  + " storage " + nodeReg.getStorageID());
  1883.     DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
  1884.     DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
  1885.       
  1886.     if (nodeN != null && nodeN != nodeS) {
  1887.       NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
  1888.                         + "node from name: " + nodeN.getName());
  1889.       // nodeN previously served a different data storage, 
  1890.       // which is not served by anybody anymore.
  1891.       removeDatanode(nodeN);
  1892.       // physically remove node from datanodeMap
  1893.       wipeDatanode(nodeN);
  1894.       nodeN = null;
  1895.     }
  1896.     if (nodeS != null) {
  1897.       if (nodeN == nodeS) {
  1898.         // The same datanode has been just restarted to serve the same data 
  1899.         // storage. We do not need to remove old data blocks, the delta will
  1900.         // be calculated on the next block report from the datanode
  1901.         NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
  1902.                                       + "node restarted.");
  1903.       } else {
  1904.         // nodeS is found
  1905.         /* The registering datanode is a replacement node for the existing 
  1906.           data storage, which from now on will be served by a new node.
  1907.           If this message repeats, both nodes might have same storageID 
  1908.           by (insanely rare) random chance. User needs to restart one of the
  1909.           nodes with its data cleared (or user can just remove the StorageID
  1910.           value in "VERSION" file under the data directory of the datanode,
  1911.           but this is might not work if VERSION file format has changed 
  1912.        */        
  1913.         NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
  1914.                                       + "node " + nodeS.getName()
  1915.                                       + " is replaced by " + nodeReg.getName() + 
  1916.                                       " with the same storageID " +
  1917.                                       nodeReg.getStorageID());
  1918.       }
  1919.       // update cluster map
  1920.       clusterMap.remove(nodeS);
  1921.       nodeS.updateRegInfo(nodeReg);
  1922.       nodeS.setHostName(hostName);
  1923.       
  1924.       // resolve network location
  1925.       resolveNetworkLocation(nodeS);
  1926.       clusterMap.add(nodeS);
  1927.         
  1928.       // also treat the registration message as a heartbeat
  1929.       synchronized(heartbeats) {
  1930.         if( !heartbeats.contains(nodeS)) {
  1931.           heartbeats.add(nodeS);
  1932.           //update its timestamp
  1933.           nodeS.updateHeartbeat(0L, 0L, 0L, 0);
  1934.           nodeS.isAlive = true;
  1935.         }
  1936.       }
  1937.       return;
  1938.     } 
  1939.     // this is a new datanode serving a new data storage
  1940.     if (nodeReg.getStorageID().equals("")) {
  1941.       // this data storage has never been registered
  1942.       // it is either empty or was created by pre-storageID version of DFS
  1943.       nodeReg.storageID = newStorageID();
  1944.       NameNode.stateChangeLog.debug(
  1945.                                     "BLOCK* NameSystem.registerDatanode: "
  1946.                                     + "new storageID " + nodeReg.getStorageID() + " assigned.");
  1947.     }
  1948.     // register new datanode
  1949.     DatanodeDescriptor nodeDescr 
  1950.       = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
  1951.     resolveNetworkLocation(nodeDescr);
  1952.     unprotectedAddDatanode(nodeDescr);
  1953.     clusterMap.add(nodeDescr);
  1954.       
  1955.     // also treat the registration message as a heartbeat
  1956.     synchronized(heartbeats) {
  1957.       heartbeats.add(nodeDescr);
  1958.       nodeDescr.isAlive = true;
  1959.       // no need to update its timestamp
  1960.       // because its is done when the descriptor is created
  1961.     }
  1962.     return;
  1963.   }
  1964.     
  1965.   /* Resolve a node's network location */
  1966.   private void resolveNetworkLocation (DatanodeDescriptor node) {
  1967.     List<String> names = new ArrayList<String>(1);
  1968.     if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
  1969.       // get the node's IP address
  1970.       names.add(node.getHost());
  1971.     } else {
  1972.       // get the node's host name
  1973.       String hostName = node.getHostName();
  1974.       int colon = hostName.indexOf(":");
  1975.       hostName = (colon==-1)?hostName:hostName.substring(0,colon);
  1976.       names.add(hostName);
  1977.     }
  1978.     
  1979.     // resolve its network location
  1980.     List<String> rName = dnsToSwitchMapping.resolve(names);
  1981.     String networkLocation;
  1982.     if (rName == null) {
  1983.       LOG.error("The resolve call returned null! Using " + 
  1984.           NetworkTopology.DEFAULT_RACK + " for host " + names);
  1985.       networkLocation = NetworkTopology.DEFAULT_RACK;
  1986.     } else {
  1987.       networkLocation = rName.get(0);
  1988.     }
  1989.     node.setNetworkLocation(networkLocation);
  1990.   }
  1991.   
  1992.   /**
  1993.    * Get registrationID for datanodes based on the namespaceID.
  1994.    * 
  1995.    * @see #registerDatanode(DatanodeRegistration)
  1996.    * @see FSImage#newNamespaceID()
  1997.    * @return registration ID
  1998.    */
  1999.   public String getRegistrationID() {
  2000.     return Storage.getRegistrationID(dir.fsImage);
  2001.   }
  2002.     
  2003.   /**
  2004.    * Generate new storage ID.
  2005.    * 
  2006.    * @return unique storage ID
  2007.    * 
  2008.    * Note: that collisions are still possible if somebody will try 
  2009.    * to bring in a data storage from a different cluster.
  2010.    */
  2011.   private String newStorageID() {
  2012.     String newID = null;
  2013.     while(newID == null) {
  2014.       newID = "DS" + Integer.toString(r.nextInt());
  2015.       if (datanodeMap.get(newID) != null)
  2016.         newID = null;
  2017.     }
  2018.     return newID;
  2019.   }
  2020.     
  2021.   private boolean isDatanodeDead(DatanodeDescriptor node) {
  2022.     return (node.getLastUpdate() <
  2023.             (now() - heartbeatExpireInterval));
  2024.   }
  2025.     
  2026.   private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
  2027.     node.setLastUpdate(0);
  2028.   }
  2029.   /**
  2030.    * The given node has reported in.  This method should:
  2031.    * 1) Record the heartbeat, so the datanode isn't timed out
  2032.    * 2) Adjust usage stats for future block allocation
  2033.    * 
  2034.    * If a substantial amount of time passed since the last datanode 
  2035.    * heartbeat then request an immediate block report.  
  2036.    * 
  2037.    * @return an array of datanode commands 
  2038.    * @throws IOException
  2039.    */
  2040.   DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
  2041.       long capacity, long dfsUsed, long remaining,
  2042.       int xceiverCount, int xmitsInProgress) throws IOException {
  2043.     DatanodeCommand cmd = null;
  2044.     synchronized (heartbeats) {
  2045.       synchronized (datanodeMap) {
  2046.         DatanodeDescriptor nodeinfo = null;
  2047.         try {
  2048.           nodeinfo = getDatanode(nodeReg);
  2049.         } catch(UnregisteredDatanodeException e) {
  2050.           return new DatanodeCommand[]{DatanodeCommand.REGISTER};
  2051.         }
  2052.           
  2053.         // Check if this datanode should actually be shutdown instead. 
  2054.         if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) {
  2055.           setDatanodeDead(nodeinfo);
  2056.           throw new DisallowedDatanodeException(nodeinfo);
  2057.         }
  2058.         if (nodeinfo == null || !nodeinfo.isAlive) {
  2059.           return new DatanodeCommand[]{DatanodeCommand.REGISTER};
  2060.         }
  2061.         updateStats(nodeinfo, false);
  2062.         nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
  2063.         updateStats(nodeinfo, true);
  2064.         
  2065.         //check lease recovery
  2066.         cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
  2067.         if (cmd != null) {
  2068.           return new DatanodeCommand[] {cmd};
  2069.         }
  2070.       
  2071.         ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
  2072.         //check pending replication
  2073.         cmd = nodeinfo.getReplicationCommand(
  2074.               maxReplicationStreams - xmitsInProgress);
  2075.         if (cmd != null) {
  2076.           cmds.add(cmd);
  2077.         }
  2078.         //check block invalidation
  2079.         cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
  2080.         if (cmd != null) {
  2081.           cmds.add(cmd);
  2082.         }
  2083.         if (!cmds.isEmpty()) {
  2084.           return cmds.toArray(new DatanodeCommand[cmds.size()]);
  2085.         }
  2086.       }
  2087.     }
  2088.     //check distributed upgrade
  2089.     cmd = getDistributedUpgradeCommand();
  2090.     if (cmd != null) {
  2091.       return new DatanodeCommand[] {cmd};
  2092.     }
  2093.     return null;
  2094.   }
  2095.   private void updateStats(DatanodeDescriptor node, boolean isAdded) {
  2096.     //
  2097.     // The statistics are protected by the heartbeat lock
  2098.     //
  2099.     assert(Thread.holdsLock(heartbeats));
  2100.     if (isAdded) {
  2101.       capacityTotal += node.getCapacity();
  2102.       capacityUsed += node.getDfsUsed();
  2103.       capacityRemaining += node.getRemaining();
  2104.       totalLoad += node.getXceiverCount();
  2105.     } else {
  2106.       capacityTotal -= node.getCapacity();
  2107.       capacityUsed -= node.getDfsUsed();
  2108.       capacityRemaining -= node.getRemaining();
  2109.       totalLoad -= node.getXceiverCount();
  2110.     }
  2111.   }
  2112.   /**
  2113.    * Periodically calls heartbeatCheck().
  2114.    */
  2115.   class HeartbeatMonitor implements Runnable {
  2116.     /**
  2117.      */
  2118.     public void run() {
  2119.       while (fsRunning) {
  2120.         try {
  2121.           heartbeatCheck();
  2122.         } catch (Exception e) {
  2123.           FSNamesystem.LOG.error(StringUtils.stringifyException(e));
  2124.         }
  2125.         try {
  2126.           Thread.sleep(heartbeatRecheckInterval);
  2127.         } catch (InterruptedException ie) {
  2128.         }
  2129.       }
  2130.     }
  2131.   }
  2132.   /**
  2133.    * Periodically calls computeReplicationWork().
  2134.    */
  2135.   class ReplicationMonitor implements Runnable {
  2136.     static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
  2137.     static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
  2138.     public void run() {
  2139.       while (fsRunning) {
  2140.         try {
  2141.           computeDatanodeWork();
  2142.           processPendingReplications();
  2143.           Thread.sleep(replicationRecheckInterval);
  2144.         } catch (InterruptedException ie) {
  2145.           LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
  2146.           break;
  2147.         } catch (IOException ie) {
  2148.           LOG.warn("ReplicationMonitor thread received exception. " + ie);
  2149.         } catch (Throwable t) {
  2150.           LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
  2151.           Runtime.getRuntime().exit(-1);
  2152.         }
  2153.       }
  2154.     }
  2155.   }
  2156.   /////////////////////////////////////////////////////////
  2157.   //
  2158.   // These methods are called by the Namenode system, to see
  2159.   // if there is any work for registered datanodes.
  2160.   //
  2161.   /////////////////////////////////////////////////////////
  2162.   /**
  2163.    * Compute block replication and block invalidation work 
  2164.    * that can be scheduled on data-nodes.
  2165.    * The datanode will be informed of this work at the next heartbeat.
  2166.    * 
  2167.    * @return number of blocks scheduled for replication or removal.
  2168.    */
  2169.   public int computeDatanodeWork() throws IOException {
  2170.     int workFound = 0;
  2171.     int blocksToProcess = 0;
  2172.     int nodesToProcess = 0;
  2173.     // blocks should not be replicated or removed if safe mode is on
  2174.     if (isInSafeMode())
  2175.       return workFound;
  2176.     synchronized(heartbeats) {
  2177.       blocksToProcess = (int)(heartbeats.size() 
  2178.           * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
  2179.       nodesToProcess = (int)Math.ceil((double)heartbeats.size() 
  2180.           * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
  2181.     }
  2182.     workFound = computeReplicationWork(blocksToProcess); 
  2183.     
  2184.     // Update FSNamesystemMetrics counters
  2185.     synchronized (this) {
  2186.       pendingReplicationBlocksCount = pendingReplications.size();
  2187.       underReplicatedBlocksCount = neededReplications.size();
  2188.       scheduledReplicationBlocksCount = workFound;
  2189.       corruptReplicaBlocksCount = corruptReplicas.size();
  2190.     }
  2191.     
  2192.     workFound += computeInvalidateWork(nodesToProcess);