FSNamesystem.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:167k
源码类别:

网格计算

开发平台:

Java

  1.     return workFound;
  2.   }
  3.   private int computeInvalidateWork(int nodesToProcess) {
  4.     int blockCnt = 0;
  5.     for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
  6.       int work = invalidateWorkForOneNode();
  7.       if(work == 0)
  8.         break;
  9.       blockCnt += work;
  10.     }
  11.     return blockCnt;
  12.   }
  13.   /**
  14.    * Scan blocks in {@link #neededReplications} and assign replication
  15.    * work to data-nodes they belong to. 
  16.    * 
  17.    * The number of process blocks equals either twice the number of live 
  18.    * data-nodes or the number of under-replicated blocks whichever is less.
  19.    * 
  20.    * @return number of blocks scheduled for replication during this iteration.
  21.    */
  22.   private int computeReplicationWork(
  23.                                   int blocksToProcess) throws IOException {
  24.     // Choose the blocks to be replicated
  25.     List<List<Block>> blocksToReplicate = 
  26.       chooseUnderReplicatedBlocks(blocksToProcess);
  27.     // replicate blocks
  28.     int scheduledReplicationCount = 0;
  29.     for (int i=0; i<blocksToReplicate.size(); i++) {
  30.       for(Block block : blocksToReplicate.get(i)) {
  31.         if (computeReplicationWorkForBlock(block, i)) {
  32.           scheduledReplicationCount++;
  33.         }
  34.       }
  35.     }
  36.     return scheduledReplicationCount;
  37.   }
  38.   
  39.   /** Get a list of block lists to be replicated
  40.    * The index of block lists represents the 
  41.    * 
  42.    * @param blocksToProcess
  43.    * @return Return a list of block lists to be replicated. 
  44.    *         The block list index represents its replication priority.
  45.    */
  46.   synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
  47.     // initialize data structure for the return value
  48.     List<List<Block>> blocksToReplicate = 
  49.       new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
  50.     for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) {
  51.       blocksToReplicate.add(new ArrayList<Block>());
  52.     }
  53.     
  54.     synchronized(neededReplications) {
  55.       if (neededReplications.size() == 0) {
  56.         missingBlocksInCurIter = 0;
  57.         missingBlocksInPrevIter = 0;
  58.         return blocksToReplicate;
  59.       }
  60.       
  61.       // Go through all blocks that need replications.
  62.       BlockIterator neededReplicationsIterator = neededReplications.iterator();
  63.       // skip to the first unprocessed block, which is at replIndex 
  64.       for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
  65.         neededReplicationsIterator.next();
  66.       }
  67.       // # of blocks to process equals either twice the number of live 
  68.       // data-nodes or the number of under-replicated blocks whichever is less
  69.       blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
  70.       for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
  71.         if( ! neededReplicationsIterator.hasNext()) {
  72.           // start from the beginning
  73.           replIndex = 0;
  74.           missingBlocksInPrevIter = missingBlocksInCurIter;
  75.           missingBlocksInCurIter = 0;
  76.           blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
  77.           if(blkCnt >= blocksToProcess)
  78.             break;
  79.           neededReplicationsIterator = neededReplications.iterator();
  80.           assert neededReplicationsIterator.hasNext() : 
  81.                                   "neededReplications should not be empty.";
  82.         }
  83.         Block block = neededReplicationsIterator.next();
  84.         int priority = neededReplicationsIterator.getPriority();
  85.         if (priority < 0 || priority >= blocksToReplicate.size()) {
  86.           LOG.warn("Unexpected replication priority: " + priority + " " + block);
  87.         } else {
  88.           blocksToReplicate.get(priority).add(block);
  89.         }
  90.       } // end for
  91.     } // end synchronized
  92.     return blocksToReplicate;
  93.  }
  94.   
  95.   /** Replicate a block
  96.    * 
  97.    * @param block block to be replicated
  98.    * @param priority a hint of its priority in the neededReplication queue
  99.    * @return if the block gets replicated or not
  100.    */
  101.   boolean computeReplicationWorkForBlock(Block block, int priority) {
  102.     int requiredReplication, numEffectiveReplicas; 
  103.     List<DatanodeDescriptor> containingNodes;
  104.     DatanodeDescriptor srcNode;
  105.     
  106.     synchronized (this) {
  107.       synchronized (neededReplications) {
  108.         // block should belong to a file
  109.         INodeFile fileINode = blocksMap.getINode(block);
  110.         // abandoned block or block reopened for append
  111.         if(fileINode == null || fileINode.isUnderConstruction()) { 
  112.           neededReplications.remove(block, priority); // remove from neededReplications
  113.           replIndex--;
  114.           return false;
  115.         }
  116.         requiredReplication = fileINode.getReplication(); 
  117.         // get a source data-node
  118.         containingNodes = new ArrayList<DatanodeDescriptor>();
  119.         NumberReplicas numReplicas = new NumberReplicas();
  120.         srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
  121.         if ((numReplicas.liveReplicas() + numReplicas.decommissionedReplicas())
  122.             <= 0) {          
  123.           missingBlocksInCurIter++;
  124.         }
  125.         if(srcNode == null) // block can not be replicated from any node
  126.           return false;
  127.         // do not schedule more if enough replicas is already pending
  128.         numEffectiveReplicas = numReplicas.liveReplicas() +
  129.                                 pendingReplications.getNumReplicas(block);
  130.         if(numEffectiveReplicas >= requiredReplication) {
  131.           neededReplications.remove(block, priority); // remove from neededReplications
  132.           replIndex--;
  133.           NameNode.stateChangeLog.info("BLOCK* "
  134.               + "Removing block " + block
  135.               + " from neededReplications as it has enough replicas.");
  136.           return false;
  137.         }
  138.       }
  139.     }
  140.     // choose replication targets: NOT HODING THE GLOBAL LOCK
  141.     DatanodeDescriptor targets[] = replicator.chooseTarget(
  142.         requiredReplication - numEffectiveReplicas,
  143.         srcNode, containingNodes, null, block.getNumBytes());
  144.     if(targets.length == 0)
  145.       return false;
  146.     synchronized (this) {
  147.       synchronized (neededReplications) {
  148.         // Recheck since global lock was released
  149.         // block should belong to a file
  150.         INodeFile fileINode = blocksMap.getINode(block);
  151.         // abandoned block or block reopened for append
  152.         if(fileINode == null || fileINode.isUnderConstruction()) { 
  153.           neededReplications.remove(block, priority); // remove from neededReplications
  154.           replIndex--;
  155.           return false;
  156.         }
  157.         requiredReplication = fileINode.getReplication(); 
  158.         // do not schedule more if enough replicas is already pending
  159.         NumberReplicas numReplicas = countNodes(block);
  160.         numEffectiveReplicas = numReplicas.liveReplicas() +
  161.         pendingReplications.getNumReplicas(block);
  162.         if(numEffectiveReplicas >= requiredReplication) {
  163.           neededReplications.remove(block, priority); // remove from neededReplications
  164.           replIndex--;
  165.           NameNode.stateChangeLog.info("BLOCK* "
  166.               + "Removing block " + block
  167.               + " from neededReplications as it has enough replicas.");
  168.           return false;
  169.         }
  170.         // Add block to the to be replicated list
  171.         srcNode.addBlockToBeReplicated(block, targets);
  172.         for (DatanodeDescriptor dn : targets) {
  173.           dn.incBlocksScheduled();
  174.         }
  175.         
  176.         // Move the block-replication into a "pending" state.
  177.         // The reason we use 'pending' is so we can retry
  178.         // replications that fail after an appropriate amount of time.
  179.         pendingReplications.add(block, targets.length);
  180.         NameNode.stateChangeLog.debug(
  181.             "BLOCK* block " + block
  182.             + " is moved from neededReplications to pendingReplications");
  183.         // remove from neededReplications
  184.         if(numEffectiveReplicas + targets.length >= requiredReplication) {
  185.           neededReplications.remove(block, priority); // remove from neededReplications
  186.           replIndex--;
  187.         }
  188.         if (NameNode.stateChangeLog.isInfoEnabled()) {
  189.           StringBuffer targetList = new StringBuffer("datanode(s)");
  190.           for (int k = 0; k < targets.length; k++) {
  191.             targetList.append(' ');
  192.             targetList.append(targets[k].getName());
  193.           }
  194.           NameNode.stateChangeLog.info(
  195.                     "BLOCK* ask "
  196.                     + srcNode.getName() + " to replicate "
  197.                     + block + " to " + targetList);
  198.           NameNode.stateChangeLog.debug(
  199.                     "BLOCK* neededReplications = " + neededReplications.size()
  200.                     + " pendingReplications = " + pendingReplications.size());
  201.         }
  202.       }
  203.     }
  204.     
  205.     return true;
  206.   }
  207.   /**
  208.    * Parse the data-nodes the block belongs to and choose one,
  209.    * which will be the replication source.
  210.    * 
  211.    * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
  212.    * since the former do not have write traffic and hence are less busy.
  213.    * We do not use already decommissioned nodes as a source.
  214.    * Otherwise we choose a random node among those that did not reach their 
  215.    * replication limit.
  216.    * 
  217.    * In addition form a list of all nodes containing the block
  218.    * and calculate its replication numbers.
  219.    */
  220.   private DatanodeDescriptor chooseSourceDatanode(
  221.                                     Block block,
  222.                                     List<DatanodeDescriptor> containingNodes,
  223.                                     NumberReplicas numReplicas) {
  224.     containingNodes.clear();
  225.     DatanodeDescriptor srcNode = null;
  226.     int live = 0;
  227.     int decommissioned = 0;
  228.     int corrupt = 0;
  229.     int excess = 0;
  230.     Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
  231.     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
  232.     while(it.hasNext()) {
  233.       DatanodeDescriptor node = it.next();
  234.       Collection<Block> excessBlocks = 
  235.         excessReplicateMap.get(node.getStorageID());
  236.       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
  237.         corrupt++;
  238.       else if (node.isDecommissionInProgress() || node.isDecommissioned())
  239.         decommissioned++;
  240.       else if (excessBlocks != null && excessBlocks.contains(block)) {
  241.         excess++;
  242.       } else {
  243.         live++;
  244.       }
  245.       containingNodes.add(node);
  246.       // Check if this replica is corrupt
  247.       // If so, do not select the node as src node
  248.       if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
  249.         continue;
  250.       if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
  251.         continue; // already reached replication limit
  252.       // the block must not be scheduled for removal on srcNode
  253.       if(excessBlocks != null && excessBlocks.contains(block))
  254.         continue;
  255.       // never use already decommissioned nodes
  256.       if(node.isDecommissioned())
  257.         continue;
  258.       // we prefer nodes that are in DECOMMISSION_INPROGRESS state
  259.       if(node.isDecommissionInProgress() || srcNode == null) {
  260.         srcNode = node;
  261.         continue;
  262.       }
  263.       if(srcNode.isDecommissionInProgress())
  264.         continue;
  265.       // switch to a different node randomly
  266.       // this to prevent from deterministically selecting the same node even
  267.       // if the node failed to replicate the block on previous iterations
  268.       if(r.nextBoolean())
  269.         srcNode = node;
  270.     }
  271.     if(numReplicas != null)
  272.       numReplicas.initialize(live, decommissioned, corrupt, excess);
  273.     return srcNode;
  274.   }
  275.   /**
  276.    * Get blocks to invalidate for the first node 
  277.    * in {@link #recentInvalidateSets}.
  278.    * 
  279.    * @return number of blocks scheduled for removal during this iteration.
  280.    */
  281.   private synchronized int invalidateWorkForOneNode() {
  282.     // blocks should not be replicated or removed if safe mode is on
  283.     if (isInSafeMode())
  284.       return 0;
  285.     if(recentInvalidateSets.isEmpty())
  286.       return 0;
  287.     // get blocks to invalidate for the first node
  288.     String firstNodeId = recentInvalidateSets.keySet().iterator().next();
  289.     assert firstNodeId != null;
  290.     DatanodeDescriptor dn = datanodeMap.get(firstNodeId);
  291.     Collection<Block> invalidateSet = recentInvalidateSets.remove(firstNodeId);
  292.  
  293.     if(invalidateSet == null || dn == null)
  294.       return 0;
  295.     ArrayList<Block> blocksToInvalidate = 
  296.       new ArrayList<Block>(blockInvalidateLimit);
  297.     // # blocks that can be sent in one message is limited
  298.     Iterator<Block> it = invalidateSet.iterator();
  299.     for(int blkCount = 0; blkCount < blockInvalidateLimit && it.hasNext();
  300.                                                                 blkCount++) {
  301.       blocksToInvalidate.add(it.next());
  302.       it.remove();
  303.     }
  304.     // If we could not send everything in this message, reinsert this item
  305.     // into the collection.
  306.     if(it.hasNext())
  307.       recentInvalidateSets.put(firstNodeId, invalidateSet);
  308.     dn.addBlocksToBeInvalidated(blocksToInvalidate);
  309.     if(NameNode.stateChangeLog.isInfoEnabled()) {
  310.       StringBuffer blockList = new StringBuffer();
  311.       for(Block blk : blocksToInvalidate) {
  312.         blockList.append(' ');
  313.         blockList.append(blk);
  314.       }
  315.       NameNode.stateChangeLog.info("BLOCK* ask "
  316.           + dn.getName() + " to delete " + blockList);
  317.     }
  318.     return blocksToInvalidate.size();
  319.   }
  320.   public void setNodeReplicationLimit(int limit) {
  321.     this.maxReplicationStreams = limit;
  322.   }
  323.   /**
  324.    * If there were any replication requests that timed out, reap them
  325.    * and put them back into the neededReplication queue
  326.    */
  327.   void processPendingReplications() {
  328.     Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
  329.     if (timedOutItems != null) {
  330.       synchronized (this) {
  331.         for (int i = 0; i < timedOutItems.length; i++) {
  332.           NumberReplicas num = countNodes(timedOutItems[i]);
  333.           neededReplications.add(timedOutItems[i], 
  334.                                  num.liveReplicas(),
  335.                                  num.decommissionedReplicas(),
  336.                                  getReplication(timedOutItems[i]));
  337.         }
  338.       }
  339.       /* If we know the target datanodes where the replication timedout,
  340.        * we could invoke decBlocksScheduled() on it. Its ok for now.
  341.        */
  342.     }
  343.   }
  344.   /**
  345.    * remove a datanode descriptor
  346.    * @param nodeID datanode ID
  347.    */
  348.   synchronized public void removeDatanode(DatanodeID nodeID) 
  349.     throws IOException {
  350.     DatanodeDescriptor nodeInfo = getDatanode(nodeID);
  351.     if (nodeInfo != null) {
  352.       removeDatanode(nodeInfo);
  353.     } else {
  354.       NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
  355.                                    + nodeID.getName() + " does not exist");
  356.     }
  357.   }
  358.   
  359.   /**
  360.    * remove a datanode descriptor
  361.    * @param nodeInfo datanode descriptor
  362.    */
  363.   private void removeDatanode(DatanodeDescriptor nodeInfo) {
  364.     synchronized (heartbeats) {
  365.       if (nodeInfo.isAlive) {
  366.         updateStats(nodeInfo, false);
  367.         heartbeats.remove(nodeInfo);
  368.         nodeInfo.isAlive = false;
  369.       }
  370.     }
  371.     for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
  372.       removeStoredBlock(it.next(), nodeInfo);
  373.     }
  374.     unprotectedRemoveDatanode(nodeInfo);
  375.     clusterMap.remove(nodeInfo);
  376.   }
  377.   void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
  378.     nodeDescr.resetBlocks();
  379.     removeFromInvalidates(nodeDescr);
  380.     NameNode.stateChangeLog.debug(
  381.                                   "BLOCK* NameSystem.unprotectedRemoveDatanode: "
  382.                                   + nodeDescr.getName() + " is out of service now.");
  383.   }
  384.     
  385.   void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
  386.     /* To keep host2DataNodeMap consistent with datanodeMap,
  387.        remove  from host2DataNodeMap the datanodeDescriptor removed
  388.        from datanodeMap before adding nodeDescr to host2DataNodeMap.
  389.     */
  390.     host2DataNodeMap.remove(
  391.                             datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
  392.     host2DataNodeMap.add(nodeDescr);
  393.       
  394.     NameNode.stateChangeLog.debug(
  395.                                   "BLOCK* NameSystem.unprotectedAddDatanode: "
  396.                                   + "node " + nodeDescr.getName() + " is added to datanodeMap.");
  397.   }
  398.   /**
  399.    * Physically remove node from datanodeMap.
  400.    * 
  401.    * @param nodeID node
  402.    */
  403.   void wipeDatanode(DatanodeID nodeID) throws IOException {
  404.     String key = nodeID.getStorageID();
  405.     host2DataNodeMap.remove(datanodeMap.remove(key));
  406.     NameNode.stateChangeLog.debug(
  407.                                   "BLOCK* NameSystem.wipeDatanode: "
  408.                                   + nodeID.getName() + " storage " + key 
  409.                                   + " is removed from datanodeMap.");
  410.   }
  411.   FSImage getFSImage() {
  412.     return dir.fsImage;
  413.   }
  414.   FSEditLog getEditLog() {
  415.     return getFSImage().getEditLog();
  416.   }
  417.   /**
  418.    * Check if there are any expired heartbeats, and if so,
  419.    * whether any blocks have to be re-replicated.
  420.    * While removing dead datanodes, make sure that only one datanode is marked
  421.    * dead at a time within the synchronized section. Otherwise, a cascading
  422.    * effect causes more datanodes to be declared dead.
  423.    */
  424.   void heartbeatCheck() {
  425.     boolean allAlive = false;
  426.     while (!allAlive) {
  427.       boolean foundDead = false;
  428.       DatanodeID nodeID = null;
  429.       // locate the first dead node.
  430.       synchronized(heartbeats) {
  431.         for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
  432.              it.hasNext();) {
  433.           DatanodeDescriptor nodeInfo = it.next();
  434.           if (isDatanodeDead(nodeInfo)) {
  435.             foundDead = true;
  436.             nodeID = nodeInfo;
  437.             break;
  438.           }
  439.         }
  440.       }
  441.       // acquire the fsnamesystem lock, and then remove the dead node.
  442.       if (foundDead) {
  443.         synchronized (this) {
  444.           synchronized(heartbeats) {
  445.             synchronized (datanodeMap) {
  446.               DatanodeDescriptor nodeInfo = null;
  447.               try {
  448.                 nodeInfo = getDatanode(nodeID);
  449.               } catch (IOException e) {
  450.                 nodeInfo = null;
  451.               }
  452.               if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
  453.                 NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
  454.                                              + "lost heartbeat from " + nodeInfo.getName());
  455.                 removeDatanode(nodeInfo);
  456.               }
  457.             }
  458.           }
  459.         }
  460.       }
  461.       allAlive = !foundDead;
  462.     }
  463.   }
  464.     
  465.   /**
  466.    * The given node is reporting all its blocks.  Use this info to 
  467.    * update the (machine-->blocklist) and (block-->machinelist) tables.
  468.    */
  469.   public synchronized void processReport(DatanodeID nodeID, 
  470.                                          BlockListAsLongs newReport
  471.                                         ) throws IOException {
  472.     long startTime = now();
  473.     if (NameNode.stateChangeLog.isDebugEnabled()) {
  474.       NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
  475.                              + "from " + nodeID.getName()+" " + 
  476.                              newReport.getNumberOfBlocks()+" blocks");
  477.     }
  478.     DatanodeDescriptor node = getDatanode(nodeID);
  479.     if (node == null) {
  480.       throw new IOException("ProcessReport from unregisterted node: "
  481.                             + nodeID.getName());
  482.     }
  483.     // Check if this datanode should actually be shutdown instead.
  484.     if (shouldNodeShutdown(node)) {
  485.       setDatanodeDead(node);
  486.       throw new DisallowedDatanodeException(node);
  487.     }
  488.     
  489.     //
  490.     // Modify the (block-->datanode) map, according to the difference
  491.     // between the old and new block report.
  492.     //
  493.     Collection<Block> toAdd = new LinkedList<Block>();
  494.     Collection<Block> toRemove = new LinkedList<Block>();
  495.     Collection<Block> toInvalidate = new LinkedList<Block>();
  496.     node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate);
  497.         
  498.     for (Block b : toRemove) {
  499.       removeStoredBlock(b, node);
  500.     }
  501.     for (Block b : toAdd) {
  502.       addStoredBlock(b, node, null);
  503.     }
  504.     for (Block b : toInvalidate) {
  505.       NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block " 
  506.           + b + " on " + node.getName() + " size " + b.getNumBytes()
  507.           + " does not belong to any file.");
  508.       addToInvalidates(b, node);
  509.     }
  510.     NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
  511.   }
  512.   /**
  513.    * Modify (block-->datanode) map.  Remove block from set of 
  514.    * needed replications if this takes care of the problem.
  515.    * @return the block that is stored in blockMap.
  516.    */
  517.   synchronized Block addStoredBlock(Block block, 
  518.                                     DatanodeDescriptor node,
  519.                                     DatanodeDescriptor delNodeHint) {
  520.     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
  521.     if(storedBlock == null || storedBlock.getINode() == null) {
  522.       // If this block does not belong to anyfile, then we are done.
  523.       NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
  524.                                    + "addStoredBlock request received for " 
  525.                                    + block + " on " + node.getName()
  526.                                    + " size " + block.getNumBytes()
  527.                                    + " But it does not belong to any file.");
  528.       // we could add this block to invalidate set of this datanode. 
  529.       // it will happen in next block report otherwise.
  530.       return block;      
  531.     }
  532.      
  533.     // add block to the data-node
  534.     boolean added = node.addBlock(storedBlock);
  535.     
  536.     assert storedBlock != null : "Block must be stored by now";
  537.     if (block != storedBlock) {
  538.       if (block.getNumBytes() >= 0) {
  539.         long cursize = storedBlock.getNumBytes();
  540.         if (cursize == 0) {
  541.           storedBlock.setNumBytes(block.getNumBytes());
  542.         } else if (cursize != block.getNumBytes()) {
  543.           LOG.warn("Inconsistent size for block " + block + 
  544.                    " reported from " + node.getName() + 
  545.                    " current size is " + cursize +
  546.                    " reported size is " + block.getNumBytes());
  547.           try {
  548.             if (cursize > block.getNumBytes()) {
  549.               // new replica is smaller in size than existing block.
  550.               // Mark the new replica as corrupt.
  551.               LOG.warn("Mark new replica " + block + " from " + node.getName() + 
  552.                   "as corrupt because its length is shorter than existing ones");
  553.               markBlockAsCorrupt(block, node);
  554.             } else {
  555.               // new replica is larger in size than existing block.
  556.               // Mark pre-existing replicas as corrupt.
  557.               int numNodes = blocksMap.numNodes(block);
  558.               int count = 0;
  559.               DatanodeDescriptor nodes[] = new DatanodeDescriptor[numNodes];
  560.               Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
  561.               for (; it != null && it.hasNext(); ) {
  562.                 DatanodeDescriptor dd = it.next();
  563.                 if (!dd.equals(node)) {
  564.                   nodes[count++] = dd;
  565.                 }
  566.               }
  567.               for (int j = 0; j < count; j++) {
  568.                 LOG.warn("Mark existing replica " + block + " from " + node.getName() + 
  569.                 " as corrupt because its length is shorter than the new one");
  570.                 markBlockAsCorrupt(block, nodes[j]);
  571.               }
  572.               //
  573.               // change the size of block in blocksMap
  574.               //
  575.               storedBlock = blocksMap.getStoredBlock(block); //extra look up!
  576.               if (storedBlock == null) {
  577.                 LOG.warn("Block " + block + 
  578.                    " reported from " + node.getName() + 
  579.                    " does not exist in blockMap. Surprise! Surprise!");
  580.               } else {
  581.                 storedBlock.setNumBytes(block.getNumBytes());
  582.               }
  583.             }
  584.           } catch (IOException e) {
  585.             LOG.warn("Error in deleting bad block " + block + e);
  586.           }
  587.         }
  588.         
  589.         //Updated space consumed if required.
  590.         INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
  591.         long diff = (file == null) ? 0 :
  592.                     (file.getPreferredBlockSize() - storedBlock.getNumBytes());
  593.         
  594.         if (diff > 0 && file.isUnderConstruction() &&
  595.             cursize < storedBlock.getNumBytes()) {
  596.           try {
  597.             String path = /* For finding parents */ 
  598.               leaseManager.findPath((INodeFileUnderConstruction)file);
  599.             dir.updateSpaceConsumed(path, 0, -diff*file.getReplication());
  600.           } catch (IOException e) {
  601.             LOG.warn("Unexpected exception while updating disk space : " +
  602.                      e.getMessage());
  603.           }
  604.         }
  605.       }
  606.       block = storedBlock;
  607.     }
  608.     assert storedBlock == block : "Block must be stored by now";
  609.         
  610.     int curReplicaDelta = 0;
  611.         
  612.     if (added) {
  613.       curReplicaDelta = 1;
  614.       // 
  615.       // At startup time, because too many new blocks come in
  616.       // they take up lots of space in the log file. 
  617.       // So, we log only when namenode is out of safemode.
  618.       //
  619.       if (!isInSafeMode()) {
  620.         NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
  621.                                       +"blockMap updated: "+node.getName()+" is added to "+block+" size "+block.getNumBytes());
  622.       }
  623.     } else {
  624.       NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
  625.                                    + "Redundant addStoredBlock request received for " 
  626.                                    + block + " on " + node.getName()
  627.                                    + " size " + block.getNumBytes());
  628.     }
  629.     // filter out containingNodes that are marked for decommission.
  630.     NumberReplicas num = countNodes(storedBlock);
  631.     int numLiveReplicas = num.liveReplicas();
  632.     int numCurrentReplica = numLiveReplicas
  633.       + pendingReplications.getNumReplicas(block);
  634.     // check whether safe replication is reached for the block
  635.     incrementSafeBlockCount(numCurrentReplica);
  636.  
  637.     //
  638.     // if file is being actively written to, then do not check 
  639.     // replication-factor here. It will be checked when the file is closed.
  640.     //
  641.     INodeFile fileINode = null;
  642.     fileINode = storedBlock.getINode();
  643.     if (fileINode.isUnderConstruction()) {
  644.       return block;
  645.     }
  646.     // do not handle mis-replicated blocks during startup
  647.     if(isInSafeMode())
  648.       return block;
  649.     // handle underReplication/overReplication
  650.     short fileReplication = fileINode.getReplication();
  651.     if (numCurrentReplica >= fileReplication) {
  652.       neededReplications.remove(block, numCurrentReplica, 
  653.                                 num.decommissionedReplicas, fileReplication);
  654.     } else {
  655.       updateNeededReplications(block, curReplicaDelta, 0);
  656.     }
  657.     if (numCurrentReplica > fileReplication) {
  658.       processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
  659.     }
  660.     // If the file replication has reached desired value
  661.     // we can remove any corrupt replicas the block may have
  662.     int corruptReplicasCount = corruptReplicas.numCorruptReplicas(block); 
  663.     int numCorruptNodes = num.corruptReplicas();
  664.     if ( numCorruptNodes != corruptReplicasCount) {
  665.       LOG.warn("Inconsistent number of corrupt replicas for " + 
  666.           block + "blockMap has " + numCorruptNodes + 
  667.           " but corrupt replicas map has " + corruptReplicasCount);
  668.     }
  669.     if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) 
  670.       invalidateCorruptReplicas(block);
  671.     return block;
  672.   }
  673.   /**
  674.    * Invalidate corrupt replicas.
  675.    * <p>
  676.    * This will remove the replicas from the block's location list,
  677.    * add them to {@link #recentInvalidateSets} so that they could be further
  678.    * deleted from the respective data-nodes,
  679.    * and remove the block from corruptReplicasMap.
  680.    * <p>
  681.    * This method should be called when the block has sufficient
  682.    * number of live replicas.
  683.    *
  684.    * @param blk Block whose corrupt replicas need to be invalidated
  685.    */
  686.   void invalidateCorruptReplicas(Block blk) {
  687.     Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
  688.     boolean gotException = false;
  689.     if (nodes == null)
  690.       return;
  691.     for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
  692.       DatanodeDescriptor node = it.next();
  693.       try {
  694.         invalidateBlock(blk, node);
  695.       } catch (IOException e) {
  696.         NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
  697.                                       "error in deleting bad block " + blk +
  698.                                       " on " + node + e);
  699.         gotException = true;
  700.       }
  701.     }
  702.     // Remove the block from corruptReplicasMap
  703.     if (!gotException)
  704.       corruptReplicas.removeFromCorruptReplicasMap(blk);
  705.   }
  706.   /**
  707.    * For each block in the name-node verify whether it belongs to any file,
  708.    * over or under replicated. Place it into the respective queue.
  709.    */
  710.   private synchronized void processMisReplicatedBlocks() {
  711.     long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
  712.     neededReplications.clear();
  713.     for(BlocksMap.BlockInfo block : blocksMap.getBlocks()) {
  714.       INodeFile fileINode = block.getINode();
  715.       if(fileINode == null) {
  716.         // block does not belong to any file
  717.         nrInvalid++;
  718.         addToInvalidates(block);
  719.         continue;
  720.       }
  721.       // calculate current replication
  722.       short expectedReplication = fileINode.getReplication();
  723.       NumberReplicas num = countNodes(block);
  724.       int numCurrentReplica = num.liveReplicas();
  725.       // add to under-replicated queue if need to be
  726.       if (neededReplications.add(block, 
  727.                                  numCurrentReplica,
  728.                                  num.decommissionedReplicas(),
  729.                                  expectedReplication)) {
  730.         nrUnderReplicated++;
  731.       }
  732.       if (numCurrentReplica > expectedReplication) {
  733.         // over-replicated block
  734.         nrOverReplicated++;
  735.         processOverReplicatedBlock(block, expectedReplication, null, null);
  736.       }
  737.     }
  738.     LOG.info("Total number of blocks = " + blocksMap.size());
  739.     LOG.info("Number of invalid blocks = " + nrInvalid);
  740.     LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
  741.     LOG.info("Number of  over-replicated blocks = " + nrOverReplicated);
  742.   }
  743.   /**
  744.    * Find how many of the containing nodes are "extra", if any.
  745.    * If there are any extras, call chooseExcessReplicates() to
  746.    * mark them in the excessReplicateMap.
  747.    */
  748.   private void processOverReplicatedBlock(Block block, short replication, 
  749.       DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
  750.     if(addedNode == delNodeHint) {
  751.       delNodeHint = null;
  752.     }
  753.     Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
  754.     Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block);
  755.     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block); 
  756.          it.hasNext();) {
  757.       DatanodeDescriptor cur = it.next();
  758.       Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
  759.       if (excessBlocks == null || !excessBlocks.contains(block)) {
  760.         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
  761.           // exclude corrupt replicas
  762.           if (corruptNodes == null || !corruptNodes.contains(cur)) {
  763.             nonExcess.add(cur);
  764.           }
  765.         }
  766.       }
  767.     }
  768.     chooseExcessReplicates(nonExcess, block, replication, 
  769.         addedNode, delNodeHint);    
  770.   }
  771.   /**
  772.    * We want "replication" replicates for the block, but we now have too many.  
  773.    * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
  774.    *
  775.    * srcNodes.size() - dstNodes.size() == replication
  776.    *
  777.    * We pick node that make sure that replicas are spread across racks and
  778.    * also try hard to pick one with least free space.
  779.    * The algorithm is first to pick a node with least free space from nodes
  780.    * that are on a rack holding more than one replicas of the block.
  781.    * So removing such a replica won't remove a rack. 
  782.    * If no such a node is available,
  783.    * then pick a node with least free space
  784.    */
  785.   void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, 
  786.                               Block b, short replication,
  787.                               DatanodeDescriptor addedNode,
  788.                               DatanodeDescriptor delNodeHint) {
  789.     // first form a rack to datanodes map and
  790.     HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
  791.       new HashMap<String, ArrayList<DatanodeDescriptor>>();
  792.     for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
  793.          iter.hasNext();) {
  794.       DatanodeDescriptor node = iter.next();
  795.       String rackName = node.getNetworkLocation();
  796.       ArrayList<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
  797.       if(datanodeList==null) {
  798.         datanodeList = new ArrayList<DatanodeDescriptor>();
  799.       }
  800.       datanodeList.add(node);
  801.       rackMap.put(rackName, datanodeList);
  802.     }
  803.     
  804.     // split nodes into two sets
  805.     // priSet contains nodes on rack with more than one replica
  806.     // remains contains the remaining nodes
  807.     ArrayList<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
  808.     ArrayList<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
  809.     for( Iterator<Entry<String, ArrayList<DatanodeDescriptor>>> iter = 
  810.       rackMap.entrySet().iterator(); iter.hasNext(); ) {
  811.       Entry<String, ArrayList<DatanodeDescriptor>> rackEntry = iter.next();
  812.       ArrayList<DatanodeDescriptor> datanodeList = rackEntry.getValue(); 
  813.       if( datanodeList.size() == 1 ) {
  814.         remains.add(datanodeList.get(0));
  815.       } else {
  816.         priSet.addAll(datanodeList);
  817.       }
  818.     }
  819.     
  820.     // pick one node to delete that favors the delete hint
  821.     // otherwise pick one with least space from priSet if it is not empty
  822.     // otherwise one node with least space from remains
  823.     boolean firstOne = true;
  824.     while (nonExcess.size() - replication > 0) {
  825.       DatanodeInfo cur = null;
  826.       long minSpace = Long.MAX_VALUE;
  827.       // check if we can del delNodeHint
  828.       if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
  829.             (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
  830.           cur = delNodeHint;
  831.       } else { // regular excessive replica removal
  832.         Iterator<DatanodeDescriptor> iter = 
  833.           priSet.isEmpty() ? remains.iterator() : priSet.iterator();
  834.           while( iter.hasNext() ) {
  835.             DatanodeDescriptor node = iter.next();
  836.             long free = node.getRemaining();
  837.             if (minSpace > free) {
  838.               minSpace = free;
  839.               cur = node;
  840.             }
  841.           }
  842.       }
  843.       firstOne = false;
  844.       // adjust rackmap, priSet, and remains
  845.       String rack = cur.getNetworkLocation();
  846.       ArrayList<DatanodeDescriptor> datanodes = rackMap.get(rack);
  847.       datanodes.remove(cur);
  848.       if(datanodes.isEmpty()) {
  849.         rackMap.remove(rack);
  850.       }
  851.       if( priSet.remove(cur) ) {
  852.         if (datanodes.size() == 1) {
  853.           priSet.remove(datanodes.get(0));
  854.           remains.add(datanodes.get(0));
  855.         }
  856.       } else {
  857.         remains.remove(cur);
  858.       }
  859.       nonExcess.remove(cur);
  860.       Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
  861.       if (excessBlocks == null) {
  862.         excessBlocks = new TreeSet<Block>();
  863.         excessReplicateMap.put(cur.getStorageID(), excessBlocks);
  864.       }
  865.       excessBlocks.add(b);
  866.       NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
  867.                                     +"("+cur.getName()+", "+b+") is added to excessReplicateMap");
  868.       //
  869.       // The 'excessblocks' tracks blocks until we get confirmation
  870.       // that the datanode has deleted them; the only way we remove them
  871.       // is when we get a "removeBlock" message.  
  872.       //
  873.       // The 'invalidate' list is used to inform the datanode the block 
  874.       // should be deleted.  Items are removed from the invalidate list
  875.       // upon giving instructions to the namenode.
  876.       //
  877.       addToInvalidatesNoLog(b, cur);
  878.       NameNode.stateChangeLog.info("BLOCK* NameSystem.chooseExcessReplicates: "
  879.                 +"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
  880.     }
  881.   }
  882.   /**
  883.    * Modify (block-->datanode) map.  Possibly generate 
  884.    * replication tasks, if the removed block is still valid.
  885.    */
  886.   synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
  887.     NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
  888.                                   +block + " from "+node.getName());
  889.     if (!blocksMap.removeNode(block, node)) {
  890.       NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
  891.                                     +block+" has already been removed from node "+node);
  892.       return;
  893.     }
  894.         
  895.     //
  896.     // It's possible that the block was removed because of a datanode
  897.     // failure.  If the block is still valid, check if replication is
  898.     // necessary.  In that case, put block on a possibly-will-
  899.     // be-replicated list.
  900.     //
  901.     INode fileINode = blocksMap.getINode(block);
  902.     if (fileINode != null) {
  903.       decrementSafeBlockCount(block);
  904.       updateNeededReplications(block, -1, 0);
  905.     }
  906.     //
  907.     // We've removed a block from a node, so it's definitely no longer
  908.     // in "excess" there.
  909.     //
  910.     Collection<Block> excessBlocks = excessReplicateMap.get(node.getStorageID());
  911.     if (excessBlocks != null) {
  912.       excessBlocks.remove(block);
  913.       NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
  914.                                     +block+" is removed from excessBlocks");
  915.       if (excessBlocks.size() == 0) {
  916.         excessReplicateMap.remove(node.getStorageID());
  917.       }
  918.     }
  919.     
  920.     // Remove the replica from corruptReplicas
  921.     corruptReplicas.removeFromCorruptReplicasMap(block, node);
  922.   }
  923.   /**
  924.    * The given node is reporting that it received a certain block.
  925.    */
  926.   public synchronized void blockReceived(DatanodeID nodeID,  
  927.                                          Block block,
  928.                                          String delHint
  929.                                          ) throws IOException {
  930.     DatanodeDescriptor node = getDatanode(nodeID);
  931.     if (node == null) {
  932.       NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
  933.                                    + block + " is received from an unrecorded node " 
  934.                                    + nodeID.getName());
  935.       throw new IllegalArgumentException(
  936.                                          "Unexpected exception.  Got blockReceived message from node " 
  937.                                          + block + ", but there is no info for it");
  938.     }
  939.         
  940.     if (NameNode.stateChangeLog.isDebugEnabled()) {
  941.       NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
  942.                                     +block+" is received from " + nodeID.getName());
  943.     }
  944.     // Check if this datanode should actually be shutdown instead.
  945.     if (shouldNodeShutdown(node)) {
  946.       setDatanodeDead(node);
  947.       throw new DisallowedDatanodeException(node);
  948.     }
  949.     // decrement number of blocks scheduled to this datanode.
  950.     node.decBlocksScheduled();
  951.     
  952.     // get the deletion hint node
  953.     DatanodeDescriptor delHintNode = null;
  954.     if(delHint!=null && delHint.length()!=0) {
  955.       delHintNode = datanodeMap.get(delHint);
  956.       if(delHintNode == null) {
  957.         NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
  958.             + block
  959.             + " is expected to be removed from an unrecorded node " 
  960.             + delHint);
  961.       }
  962.     }
  963.     //
  964.     // Modify the blocks->datanode map and node's map.
  965.     // 
  966.     pendingReplications.remove(block);
  967.     addStoredBlock(block, node, delHintNode );
  968.   }
  969.   public long getMissingBlocksCount() {
  970.     // not locking
  971.     return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter); 
  972.   }
  973.   
  974.   long[] getStats() throws IOException {
  975.     checkSuperuserPrivilege();
  976.     synchronized(heartbeats) {
  977.       return new long[] {this.capacityTotal, this.capacityUsed, 
  978.                          this.capacityRemaining,
  979.                          this.underReplicatedBlocksCount,
  980.                          this.corruptReplicaBlocksCount,
  981.                          getMissingBlocksCount()};
  982.     }
  983.   }
  984.   /**
  985.    * Total raw bytes including non-dfs used space.
  986.    */
  987.   public long getCapacityTotal() {
  988.     synchronized (heartbeats) {
  989.       return this.capacityTotal;
  990.     }
  991.   }
  992.   /**
  993.    * Total used space by data nodes
  994.    */
  995.   public long getCapacityUsed() {
  996.     synchronized(heartbeats){
  997.       return this.capacityUsed;
  998.     }
  999.   }
  1000.   /**
  1001.    * Total used space by data nodes as percentage of total capacity
  1002.    */
  1003.   public float getCapacityUsedPercent() {
  1004.     synchronized(heartbeats){
  1005.       if (capacityTotal <= 0) {
  1006.         return 100;
  1007.       }
  1008.       return ((float)capacityUsed * 100.0f)/(float)capacityTotal;
  1009.     }
  1010.   }
  1011.   /**
  1012.    * Total used space by data nodes for non DFS purposes such
  1013.    * as storing temporary files on the local file system
  1014.    */
  1015.   public long getCapacityUsedNonDFS() {
  1016.     long nonDFSUsed = 0;
  1017.     synchronized(heartbeats){
  1018.       nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
  1019.     }
  1020.     return nonDFSUsed < 0 ? 0 : nonDFSUsed;
  1021.   }
  1022.   /**
  1023.    * Total non-used raw bytes.
  1024.    */
  1025.   public long getCapacityRemaining() {
  1026.     synchronized (heartbeats) {
  1027.       return this.capacityRemaining;
  1028.     }
  1029.   }
  1030.   /**
  1031.    * Total remaining space by data nodes as percentage of total capacity
  1032.    */
  1033.   public float getCapacityRemainingPercent() {
  1034.     synchronized(heartbeats){
  1035.       if (capacityTotal <= 0) {
  1036.         return 0;
  1037.       }
  1038.       return ((float)capacityRemaining * 100.0f)/(float)capacityTotal;
  1039.     }
  1040.   }
  1041.   /**
  1042.    * Total number of connections.
  1043.    */
  1044.   public int getTotalLoad() {
  1045.     synchronized (heartbeats) {
  1046.       return this.totalLoad;
  1047.     }
  1048.   }
  1049.   int getNumberOfDatanodes(DatanodeReportType type) {
  1050.     return getDatanodeListForReport(type).size(); 
  1051.   }
  1052.   private synchronized ArrayList<DatanodeDescriptor> getDatanodeListForReport(
  1053.                                                       DatanodeReportType type) {                  
  1054.     
  1055.     boolean listLiveNodes = type == DatanodeReportType.ALL ||
  1056.                             type == DatanodeReportType.LIVE;
  1057.     boolean listDeadNodes = type == DatanodeReportType.ALL ||
  1058.                             type == DatanodeReportType.DEAD;
  1059.     HashMap<String, String> mustList = new HashMap<String, String>();
  1060.     
  1061.     if (listDeadNodes) {
  1062.       //first load all the nodes listed in include and exclude files.
  1063.       for (Iterator<String> it = hostsReader.getHosts().iterator(); 
  1064.            it.hasNext();) {
  1065.         mustList.put(it.next(), "");
  1066.       }
  1067.       for (Iterator<String> it = hostsReader.getExcludedHosts().iterator(); 
  1068.            it.hasNext();) {
  1069.         mustList.put(it.next(), "");
  1070.       }
  1071.     }
  1072.    
  1073.     ArrayList<DatanodeDescriptor> nodes = null;
  1074.     
  1075.     synchronized (datanodeMap) {
  1076.       nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
  1077.                                                 mustList.size());
  1078.       
  1079.       for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); 
  1080.                                                                it.hasNext();) {
  1081.         DatanodeDescriptor dn = it.next();
  1082.         boolean isDead = isDatanodeDead(dn);
  1083.         if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
  1084.           nodes.add(dn);
  1085.         }
  1086.         //Remove any form of the this datanode in include/exclude lists.
  1087.         mustList.remove(dn.getName());
  1088.         mustList.remove(dn.getHost());
  1089.         mustList.remove(dn.getHostName());
  1090.       }
  1091.     }
  1092.     
  1093.     if (listDeadNodes) {
  1094.       for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
  1095.         DatanodeDescriptor dn = 
  1096.             new DatanodeDescriptor(new DatanodeID(it.next()));
  1097.         dn.setLastUpdate(0);
  1098.         nodes.add(dn);
  1099.       }
  1100.     }
  1101.     
  1102.     return nodes;
  1103.   }
  1104.   public synchronized DatanodeInfo[] datanodeReport( DatanodeReportType type
  1105.       ) throws AccessControlException {
  1106.     checkSuperuserPrivilege();
  1107.     ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
  1108.     DatanodeInfo[] arr = new DatanodeInfo[results.size()];
  1109.     for (int i=0; i<arr.length; i++) {
  1110.       arr[i] = new DatanodeInfo(results.get(i));
  1111.     }
  1112.     return arr;
  1113.   }
  1114.   /**
  1115.    * Save namespace image.
  1116.    * This will save current namespace into fsimage file and empty edits file.
  1117.    * Requires superuser privilege and safe mode.
  1118.    * 
  1119.    * @throws AccessControlException if superuser privilege is violated.
  1120.    * @throws IOException if 
  1121.    */
  1122.   synchronized void saveNamespace() throws AccessControlException, IOException {
  1123.     checkSuperuserPrivilege();
  1124.     if(!isInSafeMode()) {
  1125.       throw new IOException("Safe mode should be turned ON " +
  1126.                             "in order to create namespace image.");
  1127.     }
  1128.     getFSImage().saveFSImage();
  1129.     LOG.info("New namespace image has been created.");
  1130.   }
  1131.   /**
  1132.    */
  1133.   public synchronized void DFSNodesStatus(ArrayList<DatanodeDescriptor> live, 
  1134.                                           ArrayList<DatanodeDescriptor> dead) {
  1135.     ArrayList<DatanodeDescriptor> results = 
  1136.                             getDatanodeListForReport(DatanodeReportType.ALL);    
  1137.     for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
  1138.       DatanodeDescriptor node = it.next();
  1139.       if (isDatanodeDead(node))
  1140.         dead.add(node);
  1141.       else
  1142.         live.add(node);
  1143.     }
  1144.   }
  1145.   /**
  1146.    * Prints information about all datanodes.
  1147.    */
  1148.   private synchronized void datanodeDump(PrintWriter out) {
  1149.     synchronized (datanodeMap) {
  1150.       out.println("Metasave: Number of datanodes: " + datanodeMap.size());
  1151.       for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
  1152.         DatanodeDescriptor node = it.next();
  1153.         out.println(node.dumpDatanode());
  1154.       }
  1155.     }
  1156.   }
  1157.   /**
  1158.    * Start decommissioning the specified datanode. 
  1159.    */
  1160.   private void startDecommission (DatanodeDescriptor node) 
  1161.     throws IOException {
  1162.     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
  1163.       LOG.info("Start Decommissioning node " + node.getName());
  1164.       node.startDecommission();
  1165.       //
  1166.       // all the blocks that reside on this node have to be 
  1167.       // replicated.
  1168.       Iterator<Block> decommissionBlocks = node.getBlockIterator();
  1169.       while(decommissionBlocks.hasNext()) {
  1170.         Block block = decommissionBlocks.next();
  1171.         updateNeededReplications(block, -1, 0);
  1172.       }
  1173.     }
  1174.   }
  1175.   /**
  1176.    * Stop decommissioning the specified datanodes.
  1177.    */
  1178.   public void stopDecommission (DatanodeDescriptor node) 
  1179.     throws IOException {
  1180.     LOG.info("Stop Decommissioning node " + node.getName());
  1181.     node.stopDecommission();
  1182.   }
  1183.   /** 
  1184.    */
  1185.   public DatanodeInfo getDataNodeInfo(String name) {
  1186.     return datanodeMap.get(name);
  1187.   }
  1188.   /**
  1189.    * @deprecated use {@link NameNode#getNameNodeAddress()} instead.
  1190.    */
  1191.   @Deprecated
  1192.   public InetSocketAddress getDFSNameNodeAddress() {
  1193.     return nameNodeAddress;
  1194.   }
  1195.   /**
  1196.    */
  1197.   public Date getStartTime() {
  1198.     return new Date(systemStart); 
  1199.   }
  1200.     
  1201.   short getMaxReplication()     { return (short)maxReplication; }
  1202.   short getMinReplication()     { return (short)minReplication; }
  1203.   short getDefaultReplication() { return (short)defaultReplication; }
  1204.     
  1205.   /**
  1206.    * A immutable object that stores the number of live replicas and
  1207.    * the number of decommissined Replicas.
  1208.    */
  1209.   static class NumberReplicas {
  1210.     private int liveReplicas;
  1211.     private int decommissionedReplicas;
  1212.     private int corruptReplicas;
  1213.     private int excessReplicas;
  1214.     NumberReplicas() {
  1215.       initialize(0, 0, 0, 0);
  1216.     }
  1217.     NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
  1218.       initialize(live, decommissioned, corrupt, excess);
  1219.     }
  1220.     void initialize(int live, int decommissioned, int corrupt, int excess) {
  1221.       liveReplicas = live;
  1222.       decommissionedReplicas = decommissioned;
  1223.       corruptReplicas = corrupt;
  1224.       excessReplicas = excess;
  1225.     }
  1226.     int liveReplicas() {
  1227.       return liveReplicas;
  1228.     }
  1229.     int decommissionedReplicas() {
  1230.       return decommissionedReplicas;
  1231.     }
  1232.     int corruptReplicas() {
  1233.       return corruptReplicas;
  1234.     }
  1235.     int excessReplicas() {
  1236.       return excessReplicas;
  1237.     }
  1238.   } 
  1239.   /**
  1240.    * Counts the number of nodes in the given list into active and
  1241.    * decommissioned counters.
  1242.    */
  1243.   private NumberReplicas countNodes(Block b,
  1244.                                     Iterator<DatanodeDescriptor> nodeIter) {
  1245.     int count = 0;
  1246.     int live = 0;
  1247.     int corrupt = 0;
  1248.     int excess = 0;
  1249.     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
  1250.     while ( nodeIter.hasNext() ) {
  1251.       DatanodeDescriptor node = nodeIter.next();
  1252.       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
  1253.         corrupt++;
  1254.       }
  1255.       else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
  1256.         count++;
  1257.       }
  1258.       else  {
  1259.         Collection<Block> blocksExcess = 
  1260.           excessReplicateMap.get(node.getStorageID());
  1261.         if (blocksExcess != null && blocksExcess.contains(b)) {
  1262.           excess++;
  1263.         } else {
  1264.           live++;
  1265.         }
  1266.       }
  1267.     }
  1268.     return new NumberReplicas(live, count, corrupt, excess);
  1269.   }
  1270.   /**
  1271.    * Return the number of nodes that are live and decommissioned.
  1272.    */
  1273.   NumberReplicas countNodes(Block b) {
  1274.     return countNodes(b, blocksMap.nodeIterator(b));
  1275.   }
  1276.   /**
  1277.    * Return true if there are any blocks on this node that have not
  1278.    * yet reached their replication factor. Otherwise returns false.
  1279.    */
  1280.   private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
  1281.     boolean status = false;
  1282.     for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
  1283.       final Block block = i.next();
  1284.       INode fileINode = blocksMap.getINode(block);
  1285.       if (fileINode != null) {
  1286.         NumberReplicas num = countNodes(block);
  1287.         int curReplicas = num.liveReplicas();
  1288.         int curExpectedReplicas = getReplication(block);
  1289.         if (curExpectedReplicas > curReplicas) {
  1290.           status = true;
  1291.           if (!neededReplications.contains(block) &&
  1292.             pendingReplications.getNumReplicas(block) == 0) {
  1293.             //
  1294.             // These blocks have been reported from the datanode
  1295.             // after the startDecommission method has been executed. These
  1296.             // blocks were in flight when the decommission was started.
  1297.             //
  1298.             neededReplications.add(block, 
  1299.                                    curReplicas,
  1300.                                    num.decommissionedReplicas(),
  1301.                                    curExpectedReplicas);
  1302.           }
  1303.         }
  1304.       }
  1305.     }
  1306.     return status;
  1307.   }
  1308.   /**
  1309.    * Change, if appropriate, the admin state of a datanode to 
  1310.    * decommission completed. Return true if decommission is complete.
  1311.    */
  1312.   boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
  1313.     //
  1314.     // Check to see if all blocks in this decommissioned
  1315.     // node has reached their target replication factor.
  1316.     //
  1317.     if (node.isDecommissionInProgress()) {
  1318.       if (!isReplicationInProgress(node)) {
  1319.         node.setDecommissioned();
  1320.         LOG.info("Decommission complete for node " + node.getName());
  1321.       }
  1322.     }
  1323.     if (node.isDecommissioned()) {
  1324.       return true;
  1325.     }
  1326.     return false;
  1327.   }
  1328.   /** 
  1329.    * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
  1330.    */
  1331.   private boolean inHostsList(DatanodeID node, String ipAddr) {
  1332.     Set<String> hostsList = hostsReader.getHosts();
  1333.     return (hostsList.isEmpty() || 
  1334.             (ipAddr != null && hostsList.contains(ipAddr)) ||
  1335.             hostsList.contains(node.getHost()) ||
  1336.             hostsList.contains(node.getName()) || 
  1337.             ((node instanceof DatanodeInfo) && 
  1338.              hostsList.contains(((DatanodeInfo)node).getHostName())));
  1339.   }
  1340.   
  1341.   private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
  1342.     Set<String> excludeList = hostsReader.getExcludedHosts();
  1343.     return  ((ipAddr != null && excludeList.contains(ipAddr)) ||
  1344.             excludeList.contains(node.getHost()) ||
  1345.             excludeList.contains(node.getName()) ||
  1346.             ((node instanceof DatanodeInfo) && 
  1347.              excludeList.contains(((DatanodeInfo)node).getHostName())));
  1348.   }
  1349.   /**
  1350.    * Rereads the config to get hosts and exclude list file names.
  1351.    * Rereads the files to update the hosts and exclude lists.  It
  1352.    * checks if any of the hosts have changed states:
  1353.    * 1. Added to hosts  --> no further work needed here.
  1354.    * 2. Removed from hosts --> mark AdminState as decommissioned. 
  1355.    * 3. Added to exclude --> start decommission.
  1356.    * 4. Removed from exclude --> stop decommission.
  1357.    */
  1358.   public void refreshNodes(Configuration conf) throws IOException {
  1359.     checkSuperuserPrivilege();
  1360.     // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
  1361.     // Update the file names and refresh internal includes and excludes list
  1362.     if (conf == null)
  1363.       conf = new Configuration();
  1364.     hostsReader.updateFileNames(conf.get("dfs.hosts",""), 
  1365.                                 conf.get("dfs.hosts.exclude", ""));
  1366.     hostsReader.refresh();
  1367.     synchronized (this) {
  1368.       for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
  1369.            it.hasNext();) {
  1370.         DatanodeDescriptor node = it.next();
  1371.         // Check if not include.
  1372.         if (!inHostsList(node, null)) {
  1373.           node.setDecommissioned();  // case 2.
  1374.         } else {
  1375.           if (inExcludedHostsList(node, null)) {
  1376.             if (!node.isDecommissionInProgress() && 
  1377.                 !node.isDecommissioned()) {
  1378.               startDecommission(node);   // case 3.
  1379.             }
  1380.           } else {
  1381.             if (node.isDecommissionInProgress() || 
  1382.                 node.isDecommissioned()) {
  1383.               stopDecommission(node);   // case 4.
  1384.             } 
  1385.           }
  1386.         }
  1387.       }
  1388.     } 
  1389.       
  1390.   }
  1391.     
  1392.   void finalizeUpgrade() throws IOException {
  1393.     checkSuperuserPrivilege();
  1394.     getFSImage().finalizeUpgrade();
  1395.   }
  1396.   /**
  1397.    * Checks if the node is not on the hosts list.  If it is not, then
  1398.    * it will be ignored.  If the node is in the hosts list, but is also 
  1399.    * on the exclude list, then it will be decommissioned.
  1400.    * Returns FALSE if node is rejected for registration. 
  1401.    * Returns TRUE if node is registered (including when it is on the 
  1402.    * exclude list and is being decommissioned). 
  1403.    */
  1404.   private synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg, String ipAddr) 
  1405.     throws IOException {
  1406.     if (!inHostsList(nodeReg, ipAddr)) {
  1407.       return false;    
  1408.     }
  1409.     if (inExcludedHostsList(nodeReg, ipAddr)) {
  1410.       DatanodeDescriptor node = getDatanode(nodeReg);
  1411.       if (node == null) {
  1412.         throw new IOException("verifyNodeRegistration: unknown datanode " +
  1413.                               nodeReg.getName());
  1414.       }
  1415.       if (!checkDecommissionStateInternal(node)) {
  1416.         startDecommission(node);
  1417.       }
  1418.     } 
  1419.     return true;
  1420.   }
  1421.     
  1422.   /**
  1423.    * Checks if the Admin state bit is DECOMMISSIONED.  If so, then 
  1424.    * we should shut it down. 
  1425.    * 
  1426.    * Returns true if the node should be shutdown.
  1427.    */
  1428.   private boolean shouldNodeShutdown(DatanodeDescriptor node) {
  1429.     return (node.isDecommissioned());
  1430.   }
  1431.     
  1432.   /**
  1433.    * Get data node by storage ID.
  1434.    * 
  1435.    * @param nodeID
  1436.    * @return DatanodeDescriptor or null if the node is not found.
  1437.    * @throws IOException
  1438.    */
  1439.   public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
  1440.     UnregisteredDatanodeException e = null;
  1441.     DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
  1442.     if (node == null) 
  1443.       return null;
  1444.     if (!node.getName().equals(nodeID.getName())) {
  1445.       e = new UnregisteredDatanodeException(nodeID, node);
  1446.       NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
  1447.                                     + e.getLocalizedMessage());
  1448.       throw e;
  1449.     }
  1450.     return node;
  1451.   }
  1452.     
  1453.   /** Stop at and return the datanode at index (used for content browsing)*/
  1454.   @Deprecated
  1455.   private DatanodeDescriptor getDatanodeByIndex(int index) {
  1456.     int i = 0;
  1457.     for (DatanodeDescriptor node : datanodeMap.values()) {
  1458.       if (i == index) {
  1459.         return node;
  1460.       }
  1461.       i++;
  1462.     }
  1463.     return null;
  1464.   }
  1465.     
  1466.   @Deprecated
  1467.   public String randomDataNode() {
  1468.     int size = datanodeMap.size();
  1469.     int index = 0;
  1470.     if (size != 0) {
  1471.       index = r.nextInt(size);
  1472.       for(int i=0; i<size; i++) {
  1473.         DatanodeDescriptor d = getDatanodeByIndex(index);
  1474.         if (d != null && !d.isDecommissioned() && !isDatanodeDead(d) &&
  1475.             !d.isDecommissionInProgress()) {
  1476.           return d.getHost() + ":" + d.getInfoPort();
  1477.         }
  1478.         index = (index + 1) % size;
  1479.       }
  1480.     }
  1481.     return null;
  1482.   }
  1483.   public DatanodeDescriptor getRandomDatanode() {
  1484.     return replicator.chooseTarget(1, null, null, 0)[0];
  1485.   }
  1486.   /**
  1487.    * SafeModeInfo contains information related to the safe mode.
  1488.    * <p>
  1489.    * An instance of {@link SafeModeInfo} is created when the name node
  1490.    * enters safe mode.
  1491.    * <p>
  1492.    * During name node startup {@link SafeModeInfo} counts the number of
  1493.    * <em>safe blocks</em>, those that have at least the minimal number of
  1494.    * replicas, and calculates the ratio of safe blocks to the total number
  1495.    * of blocks in the system, which is the size of
  1496.    * {@link FSNamesystem#blocksMap}. When the ratio reaches the
  1497.    * {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
  1498.    * to monitor whether the safe mode {@link #extension} is passed.
  1499.    * Then it leaves safe mode and destroys itself.
  1500.    * <p>
  1501.    * If safe mode is turned on manually then the number of safe blocks is
  1502.    * not tracked because the name node is not intended to leave safe mode
  1503.    * automatically in the case.
  1504.    *
  1505.    * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
  1506.    * @see SafeModeMonitor
  1507.    */
  1508.   class SafeModeInfo {
  1509.     // configuration fields
  1510.     /** Safe mode threshold condition %.*/
  1511.     private double threshold;
  1512.     /** Safe mode extension after the threshold. */
  1513.     private int extension;
  1514.     /** Min replication required by safe mode. */
  1515.     private int safeReplication;
  1516.       
  1517.     // internal fields
  1518.     /** Time when threshold was reached.
  1519.      * 
  1520.      * <br>-1 safe mode is off
  1521.      * <br> 0 safe mode is on, but threshold is not reached yet 
  1522.      */
  1523.     private long reached = -1;  
  1524.     /** Total number of blocks. */
  1525.     int blockTotal; 
  1526.     /** Number of safe blocks. */
  1527.     private int blockSafe;
  1528.     /** time of the last status printout */
  1529.     private long lastStatusReport = 0;
  1530.       
  1531.     /**
  1532.      * Creates SafeModeInfo when the name node enters
  1533.      * automatic safe mode at startup.
  1534.      *  
  1535.      * @param conf configuration
  1536.      */
  1537.     SafeModeInfo(Configuration conf) {
  1538.       this.threshold = conf.getFloat("dfs.safemode.threshold.pct", 0.95f);
  1539.       this.extension = conf.getInt("dfs.safemode.extension", 0);
  1540.       this.safeReplication = conf.getInt("dfs.replication.min", 1);
  1541.       this.blockTotal = 0; 
  1542.       this.blockSafe = 0;
  1543.     }
  1544.     /**
  1545.      * Creates SafeModeInfo when safe mode is entered manually.
  1546.      *
  1547.      * The {@link #threshold} is set to 1.5 so that it could never be reached.
  1548.      * {@link #blockTotal} is set to -1 to indicate that safe mode is manual.
  1549.      * 
  1550.      * @see SafeModeInfo
  1551.      */
  1552.     private SafeModeInfo() {
  1553.       this.threshold = 1.5f;  // this threshold can never be reached
  1554.       this.extension = Integer.MAX_VALUE;
  1555.       this.safeReplication = Short.MAX_VALUE + 1; // more than maxReplication
  1556.       this.blockTotal = -1;
  1557.       this.blockSafe = -1;
  1558.       this.reached = -1;
  1559.       enter();
  1560.       reportStatus("STATE* Safe mode is ON.", true);
  1561.     }
  1562.       
  1563.     /**
  1564.      * Check if safe mode is on.
  1565.      * @return true if in safe mode
  1566.      */
  1567.     synchronized boolean isOn() {
  1568.       try {
  1569.         assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
  1570.           + "Total num of blocks, active blocks, or "
  1571.           + "total safe blocks don't match.";
  1572.       } catch(IOException e) {
  1573.         System.err.print(StringUtils.stringifyException(e));
  1574.       }
  1575.       return this.reached >= 0;
  1576.     }
  1577.       
  1578.     /**
  1579.      * Enter safe mode.
  1580.      */
  1581.     void enter() {
  1582.       this.reached = 0;
  1583.     }
  1584.       
  1585.     /**
  1586.      * Leave safe mode.
  1587.      * <p>
  1588.      * Switch to manual safe mode if distributed upgrade is required.<br>
  1589.      * Check for invalid, under- & over-replicated blocks in the end of startup.
  1590.      */
  1591.     synchronized void leave(boolean checkForUpgrades) {
  1592.       if(checkForUpgrades) {
  1593.         // verify whether a distributed upgrade needs to be started
  1594.         boolean needUpgrade = false;
  1595.         try {
  1596.           needUpgrade = startDistributedUpgradeIfNeeded();
  1597.         } catch(IOException e) {
  1598.           FSNamesystem.LOG.error(StringUtils.stringifyException(e));
  1599.         }
  1600.         if(needUpgrade) {
  1601.           // switch to manual safe mode
  1602.           safeMode = new SafeModeInfo();
  1603.           return;
  1604.         }
  1605.       }
  1606.       // verify blocks replications
  1607.       processMisReplicatedBlocks();
  1608.       long timeInSafemode = now() - systemStart;
  1609.       NameNode.stateChangeLog.info("STATE* Leaving safe mode after " 
  1610.                                     + timeInSafemode/1000 + " secs.");
  1611.       NameNode.getNameNodeMetrics().safeModeTime.set((int) timeInSafemode);
  1612.       
  1613.       if (reached >= 0) {
  1614.         NameNode.stateChangeLog.info("STATE* Safe mode is OFF."); 
  1615.       }
  1616.       reached = -1;
  1617.       safeMode = null;
  1618.       NameNode.stateChangeLog.info("STATE* Network topology has "
  1619.                                    +clusterMap.getNumOfRacks()+" racks and "
  1620.                                    +clusterMap.getNumOfLeaves()+ " datanodes");
  1621.       NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
  1622.                                    +neededReplications.size()+" blocks");
  1623.     }
  1624.       
  1625.     /** 
  1626.      * Safe mode can be turned off iff 
  1627.      * the threshold is reached and 
  1628.      * the extension time have passed.
  1629.      * @return true if can leave or false otherwise.
  1630.      */
  1631.     synchronized boolean canLeave() {
  1632.       if (reached == 0)
  1633.         return false;
  1634.       if (now() - reached < extension) {
  1635.         reportStatus("STATE* Safe mode ON.", false);
  1636.         return false;
  1637.       }
  1638.       return !needEnter();
  1639.     }
  1640.       
  1641.     /** 
  1642.      * There is no need to enter safe mode 
  1643.      * if DFS is empty or {@link #threshold} == 0
  1644.      */
  1645.     boolean needEnter() {
  1646.       return getSafeBlockRatio() < threshold;
  1647.     }
  1648.       
  1649.     /**
  1650.      * Ratio of the number of safe blocks to the total number of blocks 
  1651.      * to be compared with the threshold.
  1652.      */
  1653.     private float getSafeBlockRatio() {
  1654.       return (blockTotal == 0 ? 1 : (float)blockSafe/blockTotal);
  1655.     }
  1656.       
  1657.     /**
  1658.      * Check and trigger safe mode if needed. 
  1659.      */
  1660.     private void checkMode() {
  1661.       if (needEnter()) {
  1662.         enter();
  1663.         reportStatus("STATE* Safe mode ON.", false);
  1664.         return;
  1665.       }
  1666.       // the threshold is reached
  1667.       if (!isOn() ||                           // safe mode is off
  1668.           extension <= 0 || threshold <= 0) {  // don't need to wait
  1669.         this.leave(true); // leave safe mode
  1670.         return;
  1671.       }
  1672.       if (reached > 0) {  // threshold has already been reached before
  1673.         reportStatus("STATE* Safe mode ON.", false);
  1674.         return;
  1675.       }
  1676.       // start monitor
  1677.       reached = now();
  1678.       smmthread = new Daemon(new SafeModeMonitor());
  1679.       smmthread.start();
  1680.       reportStatus("STATE* Safe mode extension entered.", true);
  1681.     }
  1682.       
  1683.     /**
  1684.      * Set total number of blocks.
  1685.      */
  1686.     synchronized void setBlockTotal(int total) {
  1687.       this.blockTotal = total; 
  1688.       checkMode();
  1689.     }
  1690.       
  1691.     /**
  1692.      * Increment number of safe blocks if current block has 
  1693.      * reached minimal replication.
  1694.      * @param replication current replication 
  1695.      */
  1696.     synchronized void incrementSafeBlockCount(short replication) {
  1697.       if ((int)replication == safeReplication)
  1698.         this.blockSafe++;
  1699.       checkMode();
  1700.     }
  1701.       
  1702.     /**
  1703.      * Decrement number of safe blocks if current block has 
  1704.      * fallen below minimal replication.
  1705.      * @param replication current replication 
  1706.      */
  1707.     synchronized void decrementSafeBlockCount(short replication) {
  1708.       if (replication == safeReplication-1)
  1709.         this.blockSafe--;
  1710.       checkMode();
  1711.     }
  1712.     /**
  1713.      * Check if safe mode was entered manually or at startup.
  1714.      */
  1715.     boolean isManual() {
  1716.       return extension == Integer.MAX_VALUE;
  1717.     }
  1718.     /**
  1719.      * Set manual safe mode.
  1720.      */
  1721.     void setManual() {
  1722.       extension = Integer.MAX_VALUE;
  1723.     }
  1724.     /**
  1725.      * A tip on how safe mode is to be turned off: manually or automatically.
  1726.      */
  1727.     String getTurnOffTip() {
  1728.       String leaveMsg = "Safe mode will be turned off automatically";
  1729.       if(reached < 0)
  1730.         return "Safe mode is OFF.";
  1731.       if(isManual()) {
  1732.         if(getDistributedUpgradeState())
  1733.           return leaveMsg + " upon completion of " + 
  1734.             "the distributed upgrade: upgrade progress = " + 
  1735.             getDistributedUpgradeStatus() + "%";
  1736.         leaveMsg = "Use "hadoop dfs -safemode leave" to turn safe mode off";
  1737.       }
  1738.       if(blockTotal < 0)
  1739.         return leaveMsg + ".";
  1740.       String safeBlockRatioMsg = 
  1741.         String.format("The ratio of reported blocks %.4f has " +
  1742.           (reached == 0 ? "not " : "") + "reached the threshold %.4f. ",
  1743.           getSafeBlockRatio(), threshold) + leaveMsg;
  1744.       if(reached == 0 || isManual())  // threshold is not reached or manual
  1745.         return safeBlockRatioMsg + ".";
  1746.       // extension period is in progress
  1747.       return safeBlockRatioMsg + " in " 
  1748.             + Math.abs(reached + extension - now())/1000 + " seconds.";
  1749.     }
  1750.     /**
  1751.      * Print status every 20 seconds.
  1752.      */
  1753.     private void reportStatus(String msg, boolean rightNow) {
  1754.       long curTime = now();
  1755.       if(!rightNow && (curTime - lastStatusReport < 20 * 1000))
  1756.         return;
  1757.       NameNode.stateChangeLog.info(msg + " n" + getTurnOffTip());
  1758.       lastStatusReport = curTime;
  1759.     }
  1760.     /**
  1761.      * Returns printable state of the class.
  1762.      */
  1763.     public String toString() {
  1764.       String resText = "Current safe block ratio = " 
  1765.         + getSafeBlockRatio() 
  1766.         + ". Target threshold = " + threshold
  1767.         + ". Minimal replication = " + safeReplication + ".";
  1768.       if (reached > 0) 
  1769.         resText += " Threshold was reached " + new Date(reached) + ".";
  1770.       return resText;
  1771.     }
  1772.       
  1773.     /**
  1774.      * Checks consistency of the class state.
  1775.      * This is costly and currently called only in assert.
  1776.      */
  1777.     boolean isConsistent() throws IOException {
  1778.       if (blockTotal == -1 && blockSafe == -1) {
  1779.         return true; // manual safe mode
  1780.       }
  1781.       int activeBlocks = blocksMap.size();
  1782.       for(Iterator<Collection<Block>> it = 
  1783.             recentInvalidateSets.values().iterator(); it.hasNext();) {
  1784.         activeBlocks -= it.next().size();
  1785.       }
  1786.       return (blockTotal == activeBlocks) ||
  1787.         (blockSafe >= 0 && blockSafe <= blockTotal);
  1788.     }
  1789.   }
  1790.     
  1791.   /**
  1792.    * Periodically check whether it is time to leave safe mode.
  1793.    * This thread starts when the threshold level is reached.
  1794.    *
  1795.    */
  1796.   class SafeModeMonitor implements Runnable {
  1797.     /** interval in msec for checking safe mode: {@value} */
  1798.     private static final long recheckInterval = 1000;
  1799.       
  1800.     /**
  1801.      */
  1802.     public void run() {
  1803.       while (fsRunning && (safeMode != null && !safeMode.canLeave())) {
  1804.         try {
  1805.           Thread.sleep(recheckInterval);
  1806.         } catch (InterruptedException ie) {
  1807.         }
  1808.       }
  1809.       // leave safe mode and stop the monitor
  1810.       try {
  1811.         leaveSafeMode(true);
  1812.       } catch(SafeModeException es) { // should never happen
  1813.         String msg = "SafeModeMonitor may not run during distributed upgrade.";
  1814.         assert false : msg;
  1815.         throw new RuntimeException(msg, es);
  1816.       }
  1817.       smmthread = null;
  1818.     }
  1819.   }
  1820.     
  1821.   /**
  1822.    * Current system time.
  1823.    * @return current time in msec.
  1824.    */
  1825.   static long now() {
  1826.     return System.currentTimeMillis();
  1827.   }
  1828.     
  1829.   boolean setSafeMode(SafeModeAction action) throws IOException {
  1830.     if (action != SafeModeAction.SAFEMODE_GET) {
  1831.       checkSuperuserPrivilege();
  1832.       switch(action) {
  1833.       case SAFEMODE_LEAVE: // leave safe mode
  1834.         leaveSafeMode(false);
  1835.         break;
  1836.       case SAFEMODE_ENTER: // enter safe mode
  1837.         enterSafeMode();
  1838.         break;
  1839.       }
  1840.     }
  1841.     return isInSafeMode();
  1842.   }
  1843.   /**
  1844.    * Check whether the name node is in safe mode.
  1845.    * @return true if safe mode is ON, false otherwise
  1846.    */
  1847.   boolean isInSafeMode() {
  1848.     if (safeMode == null)
  1849.       return false;
  1850.     return safeMode.isOn();
  1851.   }
  1852.     
  1853.   /**
  1854.    * Increment number of blocks that reached minimal replication.
  1855.    * @param replication current replication 
  1856.    */
  1857.   void incrementSafeBlockCount(int replication) {
  1858.     if (safeMode == null)
  1859.       return;
  1860.     safeMode.incrementSafeBlockCount((short)replication);
  1861.   }
  1862.   /**
  1863.    * Decrement number of blocks that reached minimal replication.
  1864.    */
  1865.   void decrementSafeBlockCount(Block b) {
  1866.     if (safeMode == null) // mostly true
  1867.       return;
  1868.     safeMode.decrementSafeBlockCount((short)countNodes(b).liveReplicas());
  1869.   }
  1870.   /**
  1871.    * Set the total number of blocks in the system. 
  1872.    */
  1873.   void setBlockTotal() {
  1874.     if (safeMode == null)
  1875.       return;
  1876.     safeMode.setBlockTotal(blocksMap.size());
  1877.   }
  1878.   /**
  1879.    * Get the total number of blocks in the system. 
  1880.    */
  1881.   public long getBlocksTotal() {
  1882.     return blocksMap.size();
  1883.   }
  1884.   /**
  1885.    * Enter safe mode manually.
  1886.    * @throws IOException
  1887.    */
  1888.   synchronized void enterSafeMode() throws IOException {
  1889.     if (!isInSafeMode()) {
  1890.       safeMode = new SafeModeInfo();
  1891.       return;
  1892.     }
  1893.     safeMode.setManual();
  1894.     NameNode.stateChangeLog.info("STATE* Safe mode is ON. " 
  1895.                                 + safeMode.getTurnOffTip());
  1896.   }
  1897.   /**
  1898.    * Leave safe mode.
  1899.    * @throws IOException
  1900.    */
  1901.   synchronized void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
  1902.     if (!isInSafeMode()) {
  1903.       NameNode.stateChangeLog.info("STATE* Safe mode is already OFF."); 
  1904.       return;
  1905.     }
  1906.     if(getDistributedUpgradeState())
  1907.       throw new SafeModeException("Distributed upgrade is in progress",
  1908.                                   safeMode);
  1909.     safeMode.leave(checkForUpgrades);
  1910.   }
  1911.     
  1912.   String getSafeModeTip() {
  1913.     if (!isInSafeMode())
  1914.       return "";
  1915.     return safeMode.getTurnOffTip();
  1916.   }
  1917.   long getEditLogSize() throws IOException {
  1918.     return getEditLog().getEditLogSize();
  1919.   }
  1920.   synchronized CheckpointSignature rollEditLog() throws IOException {
  1921.     if (isInSafeMode()) {
  1922.       throw new SafeModeException("Checkpoint not created",
  1923.                                   safeMode);
  1924.     }
  1925.     LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
  1926.     return getFSImage().rollEditLog();
  1927.   }
  1928.   synchronized void rollFSImage() throws IOException {
  1929.     if (isInSafeMode()) {
  1930.       throw new SafeModeException("Checkpoint not created",
  1931.                                   safeMode);
  1932.     }
  1933.     LOG.info("Roll FSImage from " + Server.getRemoteAddress());
  1934.     getFSImage().rollFSImage();
  1935.   }
  1936.   /**
  1937.    * Returns whether the given block is one pointed-to by a file.
  1938.    */
  1939.   private boolean isValidBlock(Block b) {
  1940.     return (blocksMap.getINode(b) != null);
  1941.   }
  1942.   // Distributed upgrade manager
  1943.   UpgradeManagerNamenode upgradeManager = new UpgradeManagerNamenode();
  1944.   UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action 
  1945.                                                  ) throws IOException {
  1946.     return upgradeManager.distributedUpgradeProgress(action);
  1947.   }
  1948.   UpgradeCommand processDistributedUpgradeCommand(UpgradeCommand comm) throws IOException {
  1949.     return upgradeManager.processUpgradeCommand(comm);
  1950.   }
  1951.   int getDistributedUpgradeVersion() {
  1952.     return upgradeManager.getUpgradeVersion();
  1953.   }
  1954.   UpgradeCommand getDistributedUpgradeCommand() throws IOException {
  1955.     return upgradeManager.getBroadcastCommand();
  1956.   }
  1957.   boolean getDistributedUpgradeState() {
  1958.     return upgradeManager.getUpgradeState();
  1959.   }
  1960.   short getDistributedUpgradeStatus() {
  1961.     return upgradeManager.getUpgradeStatus();
  1962.   }
  1963.   boolean startDistributedUpgradeIfNeeded() throws IOException {
  1964.     return upgradeManager.startUpgrade();
  1965.   }
  1966.   PermissionStatus createFsOwnerPermissions(FsPermission permission) {
  1967.     return new PermissionStatus(fsOwner.getUserName(), supergroup, permission);
  1968.   }
  1969.   private PermissionChecker checkOwner(String path) throws AccessControlException {
  1970.     return checkPermission(path, true, null, null, null, null);
  1971.   }
  1972.   private PermissionChecker checkPathAccess(String path, FsAction access
  1973.       ) throws AccessControlException {
  1974.     return checkPermission(path, false, null, null, access, null);
  1975.   }
  1976.   private PermissionChecker checkParentAccess(String path, FsAction access
  1977.       ) throws AccessControlException {
  1978.     return checkPermission(path, false, null, access, null, null);
  1979.   }
  1980.   private PermissionChecker checkAncestorAccess(String path, FsAction access
  1981.       ) throws AccessControlException {
  1982.     return checkPermission(path, false, access, null, null, null);
  1983.   }
  1984.   private PermissionChecker checkTraverse(String path
  1985.       ) throws AccessControlException {
  1986.     return checkPermission(path, false, null, null, null, null);
  1987.   }
  1988.   private void checkSuperuserPrivilege() throws AccessControlException {
  1989.     if (isPermissionEnabled) {
  1990.       PermissionChecker pc = new PermissionChecker(
  1991.           fsOwner.getUserName(), supergroup);
  1992.       if (!pc.isSuper) {
  1993.         throw new AccessControlException("Superuser privilege is required");
  1994.       }
  1995.     }
  1996.   }
  1997.   /**
  1998.    * Check whether current user have permissions to access the path.
  1999.    * For more details of the parameters, see
  2000.    * {@link PermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
  2001.    */
  2002.   private PermissionChecker checkPermission(String path, boolean doCheckOwner,
  2003.       FsAction ancestorAccess, FsAction parentAccess, FsAction access,
  2004.       FsAction subAccess) throws AccessControlException {
  2005.     PermissionChecker pc = new PermissionChecker(
  2006.         fsOwner.getUserName(), supergroup);
  2007.     if (!pc.isSuper) {
  2008.       dir.waitForReady();
  2009.       pc.checkPermission(path, dir.rootDir, doCheckOwner,
  2010.           ancestorAccess, parentAccess, access, subAccess);
  2011.     }
  2012.     return pc;
  2013.   }
  2014.   /**
  2015.    * Check to see if we have exceeded the limit on the number
  2016.    * of inodes.
  2017.    */
  2018.   void checkFsObjectLimit() throws IOException {
  2019.     if (maxFsObjects != 0 &&
  2020.         maxFsObjects <= dir.totalInodes() + getBlocksTotal()) {
  2021.       throw new IOException("Exceeded the configured number of objects " +
  2022.                              maxFsObjects + " in the filesystem.");
  2023.     }
  2024.   }
  2025.   /**
  2026.    * Get the total number of objects in the system. 
  2027.    */
  2028.   long getMaxObjects() {
  2029.     return maxFsObjects;
  2030.   }
  2031.   public long getFilesTotal() {
  2032.     return this.dir.totalInodes();
  2033.   }
  2034.   public long getPendingReplicationBlocks() {
  2035.     return pendingReplicationBlocksCount;
  2036.   }
  2037.   public long getUnderReplicatedBlocks() {
  2038.     return underReplicatedBlocksCount;
  2039.   }
  2040.   /** Returns number of blocks with corrupt replicas */
  2041.   public long getCorruptReplicaBlocksCount() {
  2042.     return corruptReplicaBlocksCount;
  2043.   }
  2044.   public long getScheduledReplicationBlocks() {
  2045.     return scheduledReplicationBlocksCount;
  2046.   }
  2047.   public String getFSState() {
  2048.     return isInSafeMode() ? "safeMode" : "Operational";
  2049.   }
  2050.   
  2051.   private ObjectName mbeanName;
  2052.   /**
  2053.    * Register the FSNamesystem MBean using the name
  2054.    *        "hadoop:service=NameNode,name=FSNamesystemState"
  2055.    */
  2056.   void registerMBean(Configuration conf) {
  2057.     // We wrap to bypass standard mbean naming convention.
  2058.     // This wraping can be removed in java 6 as it is more flexible in 
  2059.     // package naming for mbeans and their impl.
  2060.     StandardMBean bean;
  2061.     try {
  2062.       myFSMetrics = new FSNamesystemMetrics(conf);
  2063.       bean = new StandardMBean(this,FSNamesystemMBean.class);
  2064.       mbeanName = MBeanUtil.registerMBean("NameNode", "FSNamesystemState", bean);
  2065.     } catch (NotCompliantMBeanException e) {
  2066.       e.printStackTrace();
  2067.     }
  2068.     LOG.info("Registered FSNamesystemStatusMBean");
  2069.   }
  2070.   /**
  2071.    * get FSNamesystemMetrics
  2072.    */
  2073.   public FSNamesystemMetrics getFSNamesystemMetrics() {
  2074.     return myFSMetrics;
  2075.   }
  2076.   /**
  2077.    * shutdown FSNamesystem
  2078.    */
  2079.   public void shutdown() {
  2080.     if (mbeanName != null)
  2081.       MBeanUtil.unregisterMBean(mbeanName);
  2082.   }
  2083.   
  2084.   /**
  2085.    * Number of live data nodes
  2086.    * @return Number of live data nodes
  2087.    */
  2088.   public int numLiveDataNodes() {
  2089.     int numLive = 0;
  2090.     synchronized (datanodeMap) {   
  2091.       for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); 
  2092.                                                                it.hasNext();) {
  2093.         DatanodeDescriptor dn = it.next();
  2094.         if (!isDatanodeDead(dn) ) {
  2095.           numLive++;
  2096.         }
  2097.       }
  2098.     }
  2099.     return numLive;
  2100.   }
  2101.   
  2102.   /**
  2103.    * Number of dead data nodes
  2104.    * @return Number of dead data nodes
  2105.    */
  2106.   public int numDeadDataNodes() {
  2107.     int numDead = 0;
  2108.     synchronized (datanodeMap) {   
  2109.       for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); 
  2110.                                                                it.hasNext();) {
  2111.         DatanodeDescriptor dn = it.next();
  2112.         if (isDatanodeDead(dn) ) {
  2113.           numDead++;
  2114.         }
  2115.       }
  2116.     }
  2117.     return numDead;
  2118.   }
  2119.   /**
  2120.    * Sets the generation stamp for this filesystem
  2121.    */
  2122.   public void setGenerationStamp(long stamp) {
  2123.     generationStamp.setStamp(stamp);
  2124.   }
  2125.   /**
  2126.    * Gets the generation stamp for this filesystem
  2127.    */
  2128.   public long getGenerationStamp() {
  2129.     return generationStamp.getStamp();
  2130.   }
  2131.   /**
  2132.    * Increments, logs and then returns the stamp
  2133.    */
  2134.   long nextGenerationStamp() {
  2135.     long gs = generationStamp.nextStamp();
  2136.     getEditLog().logGenerationStamp(gs);
  2137.     return gs;
  2138.   }
  2139.   /**
  2140.    * Verifies that the block is associated with a file that has a lease.
  2141.    * Increments, logs and then returns the stamp
  2142.    */
  2143.   synchronized long nextGenerationStampForBlock(Block block) throws IOException {
  2144.     BlockInfo storedBlock = blocksMap.getStoredBlock(block);
  2145.     if (storedBlock == null) {
  2146.       String msg = block + " is already commited, storedBlock == null.";
  2147.       LOG.info(msg);
  2148.       throw new IOException(msg);
  2149.     }
  2150.     INodeFile fileINode = storedBlock.getINode();
  2151.     if (!fileINode.isUnderConstruction()) {
  2152.       String msg = block + " is already commited, !fileINode.isUnderConstruction().";
  2153.       LOG.info(msg);
  2154.       throw new IOException(msg);
  2155.     }
  2156.     if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
  2157.       String msg = block + " is beening recovered, ignoring this request.";
  2158.       LOG.info(msg);
  2159.       throw new IOException(msg);
  2160.     }
  2161.     return nextGenerationStamp();
  2162.   }
  2163.   // rename was successful. If any part of the renamed subtree had
  2164.   // files that were being written to, update with new filename.
  2165.   //
  2166.   void changeLease(String src, String dst, FileStatus dinfo) 
  2167.                    throws IOException {
  2168.     String overwrite;
  2169.     String replaceBy;
  2170.     boolean destinationExisted = true;
  2171.     if (dinfo == null) {
  2172.       destinationExisted = false;
  2173.     }
  2174.     if (destinationExisted && dinfo.isDir()) {
  2175.       Path spath = new Path(src);
  2176.       overwrite = spath.getParent().toString() + Path.SEPARATOR;
  2177.       replaceBy = dst + Path.SEPARATOR;
  2178.     } else {
  2179.       overwrite = src;
  2180.       replaceBy = dst;
  2181.     }
  2182.     leaseManager.changeLease(src, dst, overwrite, replaceBy);
  2183.   }
  2184.            
  2185.   /**
  2186.    * Serializes leases. 
  2187.    */
  2188.   void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
  2189.     synchronized (leaseManager) {
  2190.       out.writeInt(leaseManager.countPath()); // write the size
  2191.       for (Lease lease : leaseManager.getSortedLeases()) {
  2192.         for(String path : lease.getPaths()) {
  2193.           // verify that path exists in namespace
  2194.           INode node = dir.getFileINode(path);
  2195.           if (node == null) {
  2196.             throw new IOException("saveLeases found path " + path +
  2197.                                   " but no matching entry in namespace.");
  2198.           }
  2199.           if (!node.isUnderConstruction()) {
  2200.             throw new IOException("saveLeases found path " + path +
  2201.                                   " but is not under construction.");
  2202.           }
  2203.           INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
  2204.           FSImage.writeINodeUnderConstruction(out, cons, path);
  2205.         }
  2206.       }
  2207.     }
  2208.   }
  2209. }