FSNamesystem.java
- return workFound;
- }
- private int computeInvalidateWork(int nodesToProcess) {
- int blockCnt = 0;
- for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
- int work = invalidateWorkForOneNode();
- if(work == 0)
- break;
- blockCnt += work;
- }
- return blockCnt;
- }
- /**
- * Scan blocks in {@link #neededReplications} and assign replication
- * work to data-nodes they belong to.
- *
- * The number of blocks to process equals either twice the number of live
- * data-nodes or the number of under-replicated blocks, whichever is less.
- *
- * @return number of blocks scheduled for replication during this iteration.
- */
- private int computeReplicationWork(
- int blocksToProcess) throws IOException {
- // Choose the blocks to be replicated
- List<List<Block>> blocksToReplicate =
- chooseUnderReplicatedBlocks(blocksToProcess);
- // replicate blocks
- int scheduledReplicationCount = 0;
- for (int i=0; i<blocksToReplicate.size(); i++) {
- for(Block block : blocksToReplicate.get(i)) {
- if (computeReplicationWorkForBlock(block, i)) {
- scheduledReplicationCount++;
- }
- }
- }
- return scheduledReplicationCount;
- }
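-
- /* Illustrative sketch (not in the original source): how a periodic
- * replication monitor might size and drive the batch above, following
- * the javadoc's "twice the number of live data-nodes" rule. The monitor
- * loop itself is an assumption here, not lifted from this file.
- *
- * int liveNodes;
- * synchronized (heartbeats) {
- * liveNodes = heartbeats.size();
- * }
- * int blocksToProcess = 2 * liveNodes;
- * int scheduled = computeReplicationWork(blocksToProcess);
- */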
-
- /** Get a list of block lists to be replicated.
- * The index of a block list in the returned list represents its
- * replication priority.
- *
- * @param blocksToProcess the maximum number of blocks to choose
- * @return a list of block lists to be replicated,
- * indexed by replication priority.
- */
- synchronized List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
- // initialize data structure for the return value
- List<List<Block>> blocksToReplicate =
- new ArrayList<List<Block>>(UnderReplicatedBlocks.LEVEL);
- for (int i=0; i<UnderReplicatedBlocks.LEVEL; i++) {
- blocksToReplicate.add(new ArrayList<Block>());
- }
-
- synchronized(neededReplications) {
- if (neededReplications.size() == 0) {
- missingBlocksInCurIter = 0;
- missingBlocksInPrevIter = 0;
- return blocksToReplicate;
- }
-
- // Go through all blocks that need replications.
- BlockIterator neededReplicationsIterator = neededReplications.iterator();
- // skip to the first unprocessed block, which is at replIndex
- for(int i=0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
- neededReplicationsIterator.next();
- }
- // # of blocks to process equals either twice the number of live
- // data-nodes or the number of under-replicated blocks whichever is less
- blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
- for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
- if( ! neededReplicationsIterator.hasNext()) {
- // start from the beginning
- replIndex = 0;
- missingBlocksInPrevIter = missingBlocksInCurIter;
- missingBlocksInCurIter = 0;
- blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
- if(blkCnt >= blocksToProcess)
- break;
- neededReplicationsIterator = neededReplications.iterator();
- assert neededReplicationsIterator.hasNext() :
- "neededReplications should not be empty.";
- }
- Block block = neededReplicationsIterator.next();
- int priority = neededReplicationsIterator.getPriority();
- if (priority < 0 || priority >= blocksToReplicate.size()) {
- LOG.warn("Unexpected replication priority: " + priority + " " + block);
- } else {
- blocksToReplicate.get(priority).add(block);
- }
- } // end for
- } // end synchronized
- return blocksToReplicate;
- }
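-
- /* Illustrative sketch (not in the original source): the shape of the
- * value returned above. There is one bucket per priority level, indexed
- * from 0 (the most urgent level in stock Hadoop) up to
- * UnderReplicatedBlocks.LEVEL - 1:
- *
- * List<List<Block>> chosen = chooseUnderReplicatedBlocks(100);
- * List<Block> mostUrgent = chosen.get(0);
- */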
-
- /** Replicate a block
- *
- * @param block block to be replicated
- * @param priority a hint of its priority in the neededReplication queue
- * @return whether the block was scheduled for replication
- */
- boolean computeReplicationWorkForBlock(Block block, int priority) {
- int requiredReplication, numEffectiveReplicas;
- List<DatanodeDescriptor> containingNodes;
- DatanodeDescriptor srcNode;
-
- synchronized (this) {
- synchronized (neededReplications) {
- // block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
- // abandoned block or block reopened for append
- if(fileINode == null || fileINode.isUnderConstruction()) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- return false;
- }
- requiredReplication = fileINode.getReplication();
- // get a source data-node
- containingNodes = new ArrayList<DatanodeDescriptor>();
- NumberReplicas numReplicas = new NumberReplicas();
- srcNode = chooseSourceDatanode(block, containingNodes, numReplicas);
- if ((numReplicas.liveReplicas() + numReplicas.decommissionedReplicas())
- <= 0) {
- missingBlocksInCurIter++;
- }
- if(srcNode == null) // block can not be replicated from any node
- return false;
- // do not schedule more if enough replicas are already pending
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
- if(numEffectiveReplicas >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
- }
- }
- }
- // choose replication targets: NOT HOLDING THE GLOBAL LOCK
- DatanodeDescriptor targets[] = replicator.chooseTarget(
- requiredReplication - numEffectiveReplicas,
- srcNode, containingNodes, null, block.getNumBytes());
- if(targets.length == 0)
- return false;
- synchronized (this) {
- synchronized (neededReplications) {
- // Recheck since global lock was released
- // block should belong to a file
- INodeFile fileINode = blocksMap.getINode(block);
- // abandoned block or block reopened for append
- if(fileINode == null || fileINode.isUnderConstruction()) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- return false;
- }
- requiredReplication = fileINode.getReplication();
- // do not schedule more if enough replicas are already pending
- NumberReplicas numReplicas = countNodes(block);
- numEffectiveReplicas = numReplicas.liveReplicas() +
- pendingReplications.getNumReplicas(block);
- if(numEffectiveReplicas >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- NameNode.stateChangeLog.info("BLOCK* "
- + "Removing block " + block
- + " from neededReplications as it has enough replicas.");
- return false;
- }
- // Add block to the to be replicated list
- srcNode.addBlockToBeReplicated(block, targets);
- for (DatanodeDescriptor dn : targets) {
- dn.incBlocksScheduled();
- }
-
- // Move the block-replication into a "pending" state.
- // The reason we use 'pending' is so we can retry
- // replications that fail after an appropriate amount of time.
- pendingReplications.add(block, targets.length);
- NameNode.stateChangeLog.debug(
- "BLOCK* block " + block
- + " is moved from neededReplications to pendingReplications");
- // remove from neededReplications
- if(numEffectiveReplicas + targets.length >= requiredReplication) {
- neededReplications.remove(block, priority); // remove from neededReplications
- replIndex--;
- }
- if (NameNode.stateChangeLog.isInfoEnabled()) {
- StringBuffer targetList = new StringBuffer("datanode(s)");
- for (int k = 0; k < targets.length; k++) {
- targetList.append(' ');
- targetList.append(targets[k].getName());
- }
- NameNode.stateChangeLog.info(
- "BLOCK* ask "
- + srcNode.getName() + " to replicate "
- + block + " to " + targetList);
- NameNode.stateChangeLog.debug(
- "BLOCK* neededReplications = " + neededReplications.size()
- + " pendingReplications = " + pendingReplications.size());
- }
- }
- }
-
- return true;
- }
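-
- /* Illustrative sketch (not in the original source): the
- * check / release-lock / recheck skeleton used above. chooseTarget()
- * walks the network topology and can be slow, so it runs outside the
- * global lock, and every precondition is re-validated once the lock is
- * reacquired:
- *
- * synchronized (this) { ... check block still needs replication ... }
- * DatanodeDescriptor[] targets = replicator.chooseTarget(...); // unlocked
- * synchronized (this) { ... recheck, then commit to pending state ... }
- */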
- /**
- * Parse the data-nodes the block belongs to and choose one,
- * which will be the replication source.
- *
- * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
- * since the former do not have write traffic and hence are less busy.
- * We do not use already decommissioned nodes as a source.
- * Otherwise we choose a random node among those that did not reach their
- * replication limit.
- *
- * In addition, form a list of all nodes containing the block
- * and calculate its replica counts.
- */
- private DatanodeDescriptor chooseSourceDatanode(
- Block block,
- List<DatanodeDescriptor> containingNodes,
- NumberReplicas numReplicas) {
- containingNodes.clear();
- DatanodeDescriptor srcNode = null;
- int live = 0;
- int decommissioned = 0;
- int corrupt = 0;
- int excess = 0;
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
- Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
- while(it.hasNext()) {
- DatanodeDescriptor node = it.next();
- Collection<Block> excessBlocks =
- excessReplicateMap.get(node.getStorageID());
- if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
- corrupt++;
- else if (node.isDecommissionInProgress() || node.isDecommissioned())
- decommissioned++;
- else if (excessBlocks != null && excessBlocks.contains(block)) {
- excess++;
- } else {
- live++;
- }
- containingNodes.add(node);
- // Check if this replica is corrupt
- // If so, do not select the node as src node
- if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
- continue;
- if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
- continue; // already reached replication limit
- // the block must not be scheduled for removal on srcNode
- if(excessBlocks != null && excessBlocks.contains(block))
- continue;
- // never use already decommissioned nodes
- if(node.isDecommissioned())
- continue;
- // we prefer nodes that are in DECOMMISSION_INPROGRESS state
- if(node.isDecommissionInProgress() || srcNode == null) {
- srcNode = node;
- continue;
- }
- if(srcNode.isDecommissionInProgress())
- continue;
- // switch to a different node randomly
- // this prevents deterministically selecting the same node even
- // if it failed to replicate the block in previous iterations
- if(r.nextBoolean())
- srcNode = node;
- }
- if(numReplicas != null)
- numReplicas.initialize(live, decommissioned, corrupt, excess);
- return srcNode;
- }
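-
- /* Illustrative note (not in the original source): the r.nextBoolean()
- * replacement above is a cheap randomizer, not a uniform sample; each
- * later eligible node displaces the current choice with probability 1/2,
- * which biases the pick toward nodes visited later. A uniform
- * alternative (an assumption, not this file's code) would be classic
- * reservoir sampling: replace when r.nextInt(++seen) == 0.
- */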
- /**
- * Get blocks to invalidate for the first node
- * in {@link #recentInvalidateSets}.
- *
- * @return number of blocks scheduled for removal during this iteration.
- */
- private synchronized int invalidateWorkForOneNode() {
- // blocks should not be replicated or removed if safe mode is on
- if (isInSafeMode())
- return 0;
- if(recentInvalidateSets.isEmpty())
- return 0;
- // get blocks to invalidate for the first node
- String firstNodeId = recentInvalidateSets.keySet().iterator().next();
- assert firstNodeId != null;
- DatanodeDescriptor dn = datanodeMap.get(firstNodeId);
- Collection<Block> invalidateSet = recentInvalidateSets.remove(firstNodeId);
-
- if(invalidateSet == null || dn == null)
- return 0;
- ArrayList<Block> blocksToInvalidate =
- new ArrayList<Block>(blockInvalidateLimit);
- // # blocks that can be sent in one message is limited
- Iterator<Block> it = invalidateSet.iterator();
- for(int blkCount = 0; blkCount < blockInvalidateLimit && it.hasNext();
- blkCount++) {
- blocksToInvalidate.add(it.next());
- it.remove();
- }
- // If we could not send everything in this message, reinsert this item
- // into the collection.
- if(it.hasNext())
- recentInvalidateSets.put(firstNodeId, invalidateSet);
- dn.addBlocksToBeInvalidated(blocksToInvalidate);
- if(NameNode.stateChangeLog.isInfoEnabled()) {
- StringBuffer blockList = new StringBuffer();
- for(Block blk : blocksToInvalidate) {
- blockList.append(' ');
- blockList.append(blk);
- }
- NameNode.stateChangeLog.info("BLOCK* ask "
- + dn.getName() + " to delete " + blockList);
- }
- return blocksToInvalidate.size();
- }
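-
- /* Illustrative sketch (not in the original source), with assumed
- * numbers: if blockInvalidateLimit is 100 and the first node's
- * invalidate set holds 250 blocks, one call above sends 100 of them and
- * re-inserts the remaining 150, so the set drains over three calls
- * (assuming the node stays first in recentInvalidateSets).
- */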
- public void setNodeReplicationLimit(int limit) {
- this.maxReplicationStreams = limit;
- }
- /**
- * If there were any replication requests that timed out, reap them
- * and put them back into the neededReplication queue
- */
- void processPendingReplications() {
- Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
- if (timedOutItems != null) {
- synchronized (this) {
- for (int i = 0; i < timedOutItems.length; i++) {
- NumberReplicas num = countNodes(timedOutItems[i]);
- neededReplications.add(timedOutItems[i],
- num.liveReplicas(),
- num.decommissionedReplicas(),
- getReplication(timedOutItems[i]));
- }
- }
- /* If we know the target datanodes where the replication timed out,
- * we could invoke decBlocksScheduled() on them. It is OK for now.
- */
- }
- }
- /**
- * remove a datanode descriptor
- * @param nodeID datanode ID
- */
- synchronized public void removeDatanode(DatanodeID nodeID)
- throws IOException {
- DatanodeDescriptor nodeInfo = getDatanode(nodeID);
- if (nodeInfo != null) {
- removeDatanode(nodeInfo);
- } else {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.removeDatanode: "
- + nodeID.getName() + " does not exist");
- }
- }
-
- /**
- * remove a datanode descriptor
- * @param nodeInfo datanode descriptor
- */
- private void removeDatanode(DatanodeDescriptor nodeInfo) {
- synchronized (heartbeats) {
- if (nodeInfo.isAlive) {
- updateStats(nodeInfo, false);
- heartbeats.remove(nodeInfo);
- nodeInfo.isAlive = false;
- }
- }
- for (Iterator<Block> it = nodeInfo.getBlockIterator(); it.hasNext();) {
- removeStoredBlock(it.next(), nodeInfo);
- }
- unprotectedRemoveDatanode(nodeInfo);
- clusterMap.remove(nodeInfo);
- }
- void unprotectedRemoveDatanode(DatanodeDescriptor nodeDescr) {
- nodeDescr.resetBlocks();
- removeFromInvalidates(nodeDescr);
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.unprotectedRemoveDatanode: "
- + nodeDescr.getName() + " is out of service now.");
- }
-
- void unprotectedAddDatanode(DatanodeDescriptor nodeDescr) {
- /* To keep host2DataNodeMap consistent with datanodeMap,
- remove from host2DataNodeMap the datanodeDescriptor removed
- from datanodeMap before adding nodeDescr to host2DataNodeMap.
- */
- host2DataNodeMap.remove(
- datanodeMap.put(nodeDescr.getStorageID(), nodeDescr));
- host2DataNodeMap.add(nodeDescr);
-
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.unprotectedAddDatanode: "
- + "node " + nodeDescr.getName() + " is added to datanodeMap.");
- }
- /**
- * Physically remove node from datanodeMap.
- *
- * @param nodeID node
- */
- void wipeDatanode(DatanodeID nodeID) throws IOException {
- String key = nodeID.getStorageID();
- host2DataNodeMap.remove(datanodeMap.remove(key));
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.wipeDatanode: "
- + nodeID.getName() + " storage " + key
- + " is removed from datanodeMap.");
- }
- FSImage getFSImage() {
- return dir.fsImage;
- }
- FSEditLog getEditLog() {
- return getFSImage().getEditLog();
- }
- /**
- * Check if there are any expired heartbeats, and if so,
- * whether any blocks have to be re-replicated.
- * While removing dead datanodes, make sure that only one datanode is marked
- * dead at a time within the synchronized section. Otherwise, a cascading
- * effect causes more datanodes to be declared dead.
- */
- void heartbeatCheck() {
- boolean allAlive = false;
- while (!allAlive) {
- boolean foundDead = false;
- DatanodeID nodeID = null;
- // locate the first dead node.
- synchronized(heartbeats) {
- for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
- it.hasNext();) {
- DatanodeDescriptor nodeInfo = it.next();
- if (isDatanodeDead(nodeInfo)) {
- foundDead = true;
- nodeID = nodeInfo;
- break;
- }
- }
- }
- // acquire the fsnamesystem lock, and then remove the dead node.
- if (foundDead) {
- synchronized (this) {
- synchronized(heartbeats) {
- synchronized (datanodeMap) {
- DatanodeDescriptor nodeInfo = null;
- try {
- nodeInfo = getDatanode(nodeID);
- } catch (IOException e) {
- nodeInfo = null;
- }
- if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
- + "lost heartbeat from " + nodeInfo.getName());
- removeDatanode(nodeInfo);
- }
- }
- }
- }
- }
- allAlive = !foundDead;
- }
- }
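-
- /* Illustrative note (not in the original source): the locking order
- * above is deliberate. The scan for a dead node takes only the
- * lightweight heartbeats lock; removal then reacquires locks in the
- * global order this -> heartbeats -> datanodeMap and re-verifies
- * isDatanodeDead(), since the node may have heartbeated between the two
- * sections. Removing one node per pass keeps the hold time of the
- * global lock bounded.
- */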
-
- /**
- * The given node is reporting all its blocks. Use this info to
- * update the (machine-->blocklist) and (block-->machinelist) tables.
- */
- public synchronized void processReport(DatanodeID nodeID,
- BlockListAsLongs newReport
- ) throws IOException {
- long startTime = now();
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.processReport: "
- + "from " + nodeID.getName()+" " +
- newReport.getNumberOfBlocks()+" blocks");
- }
- DatanodeDescriptor node = getDatanode(nodeID);
- if (node == null) {
- throw new IOException("ProcessReport from unregisterted node: "
- + nodeID.getName());
- }
- // Check if this datanode should actually be shutdown instead.
- if (shouldNodeShutdown(node)) {
- setDatanodeDead(node);
- throw new DisallowedDatanodeException(node);
- }
-
- //
- // Modify the (block-->datanode) map, according to the difference
- // between the old and new block report.
- //
- Collection<Block> toAdd = new LinkedList<Block>();
- Collection<Block> toRemove = new LinkedList<Block>();
- Collection<Block> toInvalidate = new LinkedList<Block>();
- node.reportDiff(blocksMap, newReport, toAdd, toRemove, toInvalidate);
-
- for (Block b : toRemove) {
- removeStoredBlock(b, node);
- }
- for (Block b : toAdd) {
- addStoredBlock(b, node, null);
- }
- for (Block b : toInvalidate) {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
- + b + " on " + node.getName() + " size " + b.getNumBytes()
- + " does not belong to any file.");
- addToInvalidates(b, node);
- }
- NameNode.getNameNodeMetrics().blockReport.inc((int) (now() - startTime));
- }
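-
- /* Illustrative sketch (not in the original source), with assumed
- * contents: if the namenode previously mapped this node to {b1, b2} and
- * the new report lists {b2, b4, b5}, where b4 belongs to a file and b5
- * does not, reportDiff() is expected to yield toRemove = {b1},
- * toAdd = {b4}, and toInvalidate = {b5}; b2 is left untouched.
- */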
- /**
- * Modify (block-->datanode) map. Remove block from set of
- * needed replications if this takes care of the problem.
- * @return the block that is stored in blockMap.
- */
- synchronized Block addStoredBlock(Block block,
- DatanodeDescriptor node,
- DatanodeDescriptor delNodeHint) {
- BlockInfo storedBlock = blocksMap.getStoredBlock(block);
- if(storedBlock == null || storedBlock.getINode() == null) {
- // If this block does not belong to any file, then we are done.
- NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
- + "addStoredBlock request received for "
- + block + " on " + node.getName()
- + " size " + block.getNumBytes()
- + " But it does not belong to any file.");
- // we could add this block to the invalidate set of this datanode.
- // it will happen in the next block report otherwise.
- return block;
- }
-
- // add block to the data-node
- boolean added = node.addBlock(storedBlock);
-
- assert storedBlock != null : "Block must be stored by now";
- if (block != storedBlock) {
- if (block.getNumBytes() >= 0) {
- long cursize = storedBlock.getNumBytes();
- if (cursize == 0) {
- storedBlock.setNumBytes(block.getNumBytes());
- } else if (cursize != block.getNumBytes()) {
- LOG.warn("Inconsistent size for block " + block +
- " reported from " + node.getName() +
- " current size is " + cursize +
- " reported size is " + block.getNumBytes());
- try {
- if (cursize > block.getNumBytes()) {
- // new replica is smaller in size than existing block.
- // Mark the new replica as corrupt.
- LOG.warn("Mark new replica " + block + " from " + node.getName() +
- "as corrupt because its length is shorter than existing ones");
- markBlockAsCorrupt(block, node);
- } else {
- // new replica is larger in size than existing block.
- // Mark pre-existing replicas as corrupt.
- int numNodes = blocksMap.numNodes(block);
- int count = 0;
- DatanodeDescriptor nodes[] = new DatanodeDescriptor[numNodes];
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
- for (; it != null && it.hasNext(); ) {
- DatanodeDescriptor dd = it.next();
- if (!dd.equals(node)) {
- nodes[count++] = dd;
- }
- }
- for (int j = 0; j < count; j++) {
- LOG.warn("Mark existing replica " + block + " from " + node.getName() +
- " as corrupt because its length is shorter than the new one");
- markBlockAsCorrupt(block, nodes[j]);
- }
- //
- // change the size of block in blocksMap
- //
- storedBlock = blocksMap.getStoredBlock(block); //extra look up!
- if (storedBlock == null) {
- LOG.warn("Block " + block +
- " reported from " + node.getName() +
- " does not exist in blockMap. Surprise! Surprise!");
- } else {
- storedBlock.setNumBytes(block.getNumBytes());
- }
- }
- } catch (IOException e) {
- LOG.warn("Error in deleting bad block " + block + e);
- }
- }
-
- // Update space consumed if required.
- INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
- long diff = (file == null) ? 0 :
- (file.getPreferredBlockSize() - storedBlock.getNumBytes());
-
- if (diff > 0 && file.isUnderConstruction() &&
- cursize < storedBlock.getNumBytes()) {
- try {
- String path = /* For finding parents */
- leaseManager.findPath((INodeFileUnderConstruction)file);
- dir.updateSpaceConsumed(path, 0, -diff*file.getReplication());
- } catch (IOException e) {
- LOG.warn("Unexpected exception while updating disk space : " +
- e.getMessage());
- }
- }
- }
- block = storedBlock;
- }
- assert storedBlock == block : "Block must be stored by now";
-
- int curReplicaDelta = 0;
-
- if (added) {
- curReplicaDelta = 1;
- //
- // At startup time, because too many new blocks come in,
- // they take up lots of space in the log file.
- // So, we log only when namenode is out of safemode.
- //
- if (!isInSafeMode()) {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
- +"blockMap updated: "+node.getName()+" is added to "+block+" size "+block.getNumBytes());
- }
- } else {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
- + "Redundant addStoredBlock request received for "
- + block + " on " + node.getName()
- + " size " + block.getNumBytes());
- }
- // filter out containingNodes that are marked for decommission.
- NumberReplicas num = countNodes(storedBlock);
- int numLiveReplicas = num.liveReplicas();
- int numCurrentReplica = numLiveReplicas
- + pendingReplications.getNumReplicas(block);
- // check whether safe replication is reached for the block
- incrementSafeBlockCount(numCurrentReplica);
-
- //
- // if file is being actively written to, then do not check
- // replication-factor here. It will be checked when the file is closed.
- //
- INodeFile fileINode = null;
- fileINode = storedBlock.getINode();
- if (fileINode.isUnderConstruction()) {
- return block;
- }
- // do not handle mis-replicated blocks during startup
- if(isInSafeMode())
- return block;
- // handle underReplication/overReplication
- short fileReplication = fileINode.getReplication();
- if (numCurrentReplica >= fileReplication) {
- neededReplications.remove(block, numCurrentReplica,
- num.decommissionedReplicas, fileReplication);
- } else {
- updateNeededReplications(block, curReplicaDelta, 0);
- }
- if (numCurrentReplica > fileReplication) {
- processOverReplicatedBlock(block, fileReplication, node, delNodeHint);
- }
- // If the file replication has reached the desired value,
- // we can remove any corrupt replicas the block may have.
- int corruptReplicasCount = corruptReplicas.numCorruptReplicas(block);
- int numCorruptNodes = num.corruptReplicas();
- if ( numCorruptNodes != corruptReplicasCount) {
- LOG.warn("Inconsistent number of corrupt replicas for " +
- block + "blockMap has " + numCorruptNodes +
- " but corrupt replicas map has " + corruptReplicasCount);
- }
- if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication))
- invalidateCorruptReplicas(block);
- return block;
- }
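-
- /* Illustrative note (not in the original source): order of effects in
- * addStoredBlock() above: (1) reconcile a reported size mismatch,
- * marking whichever replicas are shorter as corrupt; (2) count the
- * replica toward the safe-block total; (3) for closed files outside
- * safe mode, requeue the block as under- or over-replicated as needed;
- * (4) once live replicas reach the file's replication factor, purge any
- * remaining corrupt replicas.
- */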
- /**
- * Invalidate corrupt replicas.
- * <p>
- * This will remove the replicas from the block's location list,
- * add them to {@link #recentInvalidateSets} so that they could be further
- * deleted from the respective data-nodes,
- * and remove the block from corruptReplicasMap.
- * <p>
- * This method should be called when the block has sufficient
- * number of live replicas.
- *
- * @param blk Block whose corrupt replicas need to be invalidated
- */
- void invalidateCorruptReplicas(Block blk) {
- Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk);
- boolean gotException = false;
- if (nodes == null)
- return;
- for (Iterator<DatanodeDescriptor> it = nodes.iterator(); it.hasNext(); ) {
- DatanodeDescriptor node = it.next();
- try {
- invalidateBlock(blk, node);
- } catch (IOException e) {
- NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
- "error in deleting bad block " + blk +
- " on " + node + e);
- gotException = true;
- }
- }
- // Remove the block from corruptReplicasMap
- if (!gotException)
- corruptReplicas.removeFromCorruptReplicasMap(blk);
- }
- /**
- * For each block in the name-node, verify whether it belongs to any file
- * and whether it is over- or under-replicated. Place it into the
- * respective queue.
- */
- private synchronized void processMisReplicatedBlocks() {
- long nrInvalid = 0, nrOverReplicated = 0, nrUnderReplicated = 0;
- neededReplications.clear();
- for(BlocksMap.BlockInfo block : blocksMap.getBlocks()) {
- INodeFile fileINode = block.getINode();
- if(fileINode == null) {
- // block does not belong to any file
- nrInvalid++;
- addToInvalidates(block);
- continue;
- }
- // calculate current replication
- short expectedReplication = fileINode.getReplication();
- NumberReplicas num = countNodes(block);
- int numCurrentReplica = num.liveReplicas();
- // add to the under-replicated queue if needed
- if (neededReplications.add(block,
- numCurrentReplica,
- num.decommissionedReplicas(),
- expectedReplication)) {
- nrUnderReplicated++;
- }
- if (numCurrentReplica > expectedReplication) {
- // over-replicated block
- nrOverReplicated++;
- processOverReplicatedBlock(block, expectedReplication, null, null);
- }
- }
- LOG.info("Total number of blocks = " + blocksMap.size());
- LOG.info("Number of invalid blocks = " + nrInvalid);
- LOG.info("Number of under-replicated blocks = " + nrUnderReplicated);
- LOG.info("Number of over-replicated blocks = " + nrOverReplicated);
- }
- /**
- * Find how many of the containing nodes are "extra", if any.
- * If there are any extras, call chooseExcessReplicates() to
- * mark them in the excessReplicateMap.
- */
- private void processOverReplicatedBlock(Block block, short replication,
- DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
- if(addedNode == delNodeHint) {
- delNodeHint = null;
- }
- Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>();
- Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block);
- for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
- it.hasNext();) {
- DatanodeDescriptor cur = it.next();
- Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
- if (excessBlocks == null || !excessBlocks.contains(block)) {
- if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
- // exclude corrupt replicas
- if (corruptNodes == null || !corruptNodes.contains(cur)) {
- nonExcess.add(cur);
- }
- }
- }
- }
- chooseExcessReplicates(nonExcess, block, replication,
- addedNode, delNodeHint);
- }
- /**
- * We want "replication" replicas for the block, but we now have too many.
- * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
- *
- * srcNodes.size() - dstNodes.size() == replication
- *
- * We pick nodes to make sure that replicas are spread across racks and
- * also try hard to pick one with the least free space.
- * The algorithm first picks a node with the least free space from nodes
- * that are on a rack holding more than one replica of the block,
- * so removing such a replica won't remove a rack.
- * If no such node is available,
- * then pick the node with the least free space.
- */
- void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
- Block b, short replication,
- DatanodeDescriptor addedNode,
- DatanodeDescriptor delNodeHint) {
- // first form a rack-to-datanodes map
- HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
- new HashMap<String, ArrayList<DatanodeDescriptor>>();
- for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
- iter.hasNext();) {
- DatanodeDescriptor node = iter.next();
- String rackName = node.getNetworkLocation();
- ArrayList<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
- if(datanodeList==null) {
- datanodeList = new ArrayList<DatanodeDescriptor>();
- }
- datanodeList.add(node);
- rackMap.put(rackName, datanodeList);
- }
-
- // split nodes into two sets
- // priSet contains nodes on rack with more than one replica
- // remains contains the remaining nodes
- ArrayList<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
- ArrayList<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
- for( Iterator<Entry<String, ArrayList<DatanodeDescriptor>>> iter =
- rackMap.entrySet().iterator(); iter.hasNext(); ) {
- Entry<String, ArrayList<DatanodeDescriptor>> rackEntry = iter.next();
- ArrayList<DatanodeDescriptor> datanodeList = rackEntry.getValue();
- if( datanodeList.size() == 1 ) {
- remains.add(datanodeList.get(0));
- } else {
- priSet.addAll(datanodeList);
- }
- }
-
- // pick one node to delete that favors the delete hint
- // otherwise pick one with least space from priSet if it is not empty
- // otherwise one node with least space from remains
- boolean firstOne = true;
- while (nonExcess.size() - replication > 0) {
- DatanodeInfo cur = null;
- long minSpace = Long.MAX_VALUE;
- // check if we can delete delNodeHint
- if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
- (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
- cur = delNodeHint;
- } else { // regular excessive replica removal
- Iterator<DatanodeDescriptor> iter =
- priSet.isEmpty() ? remains.iterator() : priSet.iterator();
- while( iter.hasNext() ) {
- DatanodeDescriptor node = iter.next();
- long free = node.getRemaining();
- if (minSpace > free) {
- minSpace = free;
- cur = node;
- }
- }
- }
- firstOne = false;
- // adjust rackmap, priSet, and remains
- String rack = cur.getNetworkLocation();
- ArrayList<DatanodeDescriptor> datanodes = rackMap.get(rack);
- datanodes.remove(cur);
- if(datanodes.isEmpty()) {
- rackMap.remove(rack);
- }
- if( priSet.remove(cur) ) {
- if (datanodes.size() == 1) {
- priSet.remove(datanodes.get(0));
- remains.add(datanodes.get(0));
- }
- } else {
- remains.remove(cur);
- }
- nonExcess.remove(cur);
- Collection<Block> excessBlocks = excessReplicateMap.get(cur.getStorageID());
- if (excessBlocks == null) {
- excessBlocks = new TreeSet<Block>();
- excessReplicateMap.put(cur.getStorageID(), excessBlocks);
- }
- excessBlocks.add(b);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates: "
- +"("+cur.getName()+", "+b+") is added to excessReplicateMap");
- //
- // The 'excessBlocks' set tracks blocks until we get confirmation
- // that the datanode has deleted them; the only way we remove them
- // is when we get a "removeBlock" message.
- //
- // The 'invalidate' list is used to inform the datanode the block
- // should be deleted. Items are removed from the invalidate list
- // upon giving instructions to the datanode.
- //
- addToInvalidatesNoLog(b, cur);
- NameNode.stateChangeLog.info("BLOCK* NameSystem.chooseExcessReplicates: "
- +"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
- }
- }
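-
- /* Illustrative worked example (not in the original source): suppose a
- * block with replication 2 has replicas on d1 and d2 (rack /rA) and d3
- * (rack /rB). Then priSet = {d1, d2} and remains = {d3}, and one excess
- * replica must go. It is chosen from priSet as whichever of d1/d2 has
- * less free space, so rack /rB keeps its copy and no rack is lost.
- */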
- /**
- * Modify (block-->datanode) map. Possibly generate
- * replication tasks, if the removed block is still valid.
- */
- synchronized void removeStoredBlock(Block block, DatanodeDescriptor node) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block + " from "+node.getName());
- if (!blocksMap.removeNode(block, node)) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block+" has already been removed from node "+node);
- return;
- }
-
- //
- // It's possible that the block was removed because of a datanode
- // failure. If the block is still valid, check if replication is
- // necessary. In that case, put block on a possibly-will-
- // be-replicated list.
- //
- INode fileINode = blocksMap.getINode(block);
- if (fileINode != null) {
- decrementSafeBlockCount(block);
- updateNeededReplications(block, -1, 0);
- }
- //
- // We've removed a block from a node, so it's definitely no longer
- // in "excess" there.
- //
- Collection<Block> excessBlocks = excessReplicateMap.get(node.getStorageID());
- if (excessBlocks != null) {
- excessBlocks.remove(block);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
- +block+" is removed from excessBlocks");
- if (excessBlocks.size() == 0) {
- excessReplicateMap.remove(node.getStorageID());
- }
- }
-
- // Remove the replica from corruptReplicas
- corruptReplicas.removeFromCorruptReplicasMap(block, node);
- }
- /**
- * The given node is reporting that it received a certain block.
- */
- public synchronized void blockReceived(DatanodeID nodeID,
- Block block,
- String delHint
- ) throws IOException {
- DatanodeDescriptor node = getDatanode(nodeID);
- if (node == null) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
- + block + " is received from an unrecorded node "
- + nodeID.getName());
- throw new IllegalArgumentException(
- "Unexpected exception. Got blockReceived message from node "
- + nodeID.getName() + ", but there is no info for it");
- }
-
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
- +block+" is received from " + nodeID.getName());
- }
- // Check if this datanode should actually be shutdown instead.
- if (shouldNodeShutdown(node)) {
- setDatanodeDead(node);
- throw new DisallowedDatanodeException(node);
- }
- // decrement number of blocks scheduled to this datanode.
- node.decBlocksScheduled();
-
- // get the deletion hint node
- DatanodeDescriptor delHintNode = null;
- if(delHint!=null && delHint.length()!=0) {
- delHintNode = datanodeMap.get(delHint);
- if(delHintNode == null) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
- + block
- + " is expected to be removed from an unrecorded node "
- + delHint);
- }
- }
- //
- // Modify the blocks->datanode map and node's map.
- //
- pendingReplications.remove(block);
- addStoredBlock(block, node, delHintNode );
- }
- public long getMissingBlocksCount() {
- // not locking
- return Math.max(missingBlocksInPrevIter, missingBlocksInCurIter);
- }
-
- long[] getStats() throws IOException {
- checkSuperuserPrivilege();
- synchronized(heartbeats) {
- return new long[] {this.capacityTotal, this.capacityUsed,
- this.capacityRemaining,
- this.underReplicatedBlocksCount,
- this.corruptReplicaBlocksCount,
- getMissingBlocksCount()};
- }
- }
- /**
- * Total raw bytes including non-dfs used space.
- */
- public long getCapacityTotal() {
- synchronized (heartbeats) {
- return this.capacityTotal;
- }
- }
- /**
- * Total used space by data nodes
- */
- public long getCapacityUsed() {
- synchronized(heartbeats){
- return this.capacityUsed;
- }
- }
- /**
- * Total used space by data nodes as percentage of total capacity
- */
- public float getCapacityUsedPercent() {
- synchronized(heartbeats){
- if (capacityTotal <= 0) {
- return 100;
- }
- return ((float)capacityUsed * 100.0f)/(float)capacityTotal;
- }
- }
- /**
- * Total used space by data nodes for non DFS purposes such
- * as storing temporary files on the local file system
- */
- public long getCapacityUsedNonDFS() {
- long nonDFSUsed = 0;
- synchronized(heartbeats){
- nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
- }
- return nonDFSUsed < 0 ? 0 : nonDFSUsed;
- }
- /**
- * Total non-used raw bytes.
- */
- public long getCapacityRemaining() {
- synchronized (heartbeats) {
- return this.capacityRemaining;
- }
- }
- /**
- * Total remaining space by data nodes as percentage of total capacity
- */
- public float getCapacityRemainingPercent() {
- synchronized(heartbeats){
- if (capacityTotal <= 0) {
- return 0;
- }
- return ((float)capacityRemaining * 100.0f)/(float)capacityTotal;
- }
- }
- /**
- * Total number of connections.
- */
- public int getTotalLoad() {
- synchronized (heartbeats) {
- return this.totalLoad;
- }
- }
- int getNumberOfDatanodes(DatanodeReportType type) {
- return getDatanodeListForReport(type).size();
- }
- private synchronized ArrayList<DatanodeDescriptor> getDatanodeListForReport(
- DatanodeReportType type) {
-
- boolean listLiveNodes = type == DatanodeReportType.ALL ||
- type == DatanodeReportType.LIVE;
- boolean listDeadNodes = type == DatanodeReportType.ALL ||
- type == DatanodeReportType.DEAD;
- HashMap<String, String> mustList = new HashMap<String, String>();
-
- if (listDeadNodes) {
- //first load all the nodes listed in include and exclude files.
- for (Iterator<String> it = hostsReader.getHosts().iterator();
- it.hasNext();) {
- mustList.put(it.next(), "");
- }
- for (Iterator<String> it = hostsReader.getExcludedHosts().iterator();
- it.hasNext();) {
- mustList.put(it.next(), "");
- }
- }
-
- ArrayList<DatanodeDescriptor> nodes = null;
-
- synchronized (datanodeMap) {
- nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
- mustList.size());
-
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- boolean isDead = isDatanodeDead(dn);
- if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
- nodes.add(dn);
- }
- // Remove any form of this datanode from the include/exclude lists.
- mustList.remove(dn.getName());
- mustList.remove(dn.getHost());
- mustList.remove(dn.getHostName());
- }
- }
-
- if (listDeadNodes) {
- for (Iterator<String> it = mustList.keySet().iterator(); it.hasNext();) {
- DatanodeDescriptor dn =
- new DatanodeDescriptor(new DatanodeID(it.next()));
- dn.setLastUpdate(0);
- nodes.add(dn);
- }
- }
-
- return nodes;
- }
- public synchronized DatanodeInfo[] datanodeReport( DatanodeReportType type
- ) throws AccessControlException {
- checkSuperuserPrivilege();
- ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
- DatanodeInfo[] arr = new DatanodeInfo[results.size()];
- for (int i=0; i<arr.length; i++) {
- arr[i] = new DatanodeInfo(results.get(i));
- }
- return arr;
- }
- /**
- * Save namespace image.
- * This will save the current namespace into the fsimage file and empty the edits file.
- * Requires superuser privilege and safe mode.
- *
- * @throws AccessControlException if superuser privilege is violated.
- * @throws IOException if the image cannot be saved.
- */
- synchronized void saveNamespace() throws AccessControlException, IOException {
- checkSuperuserPrivilege();
- if(!isInSafeMode()) {
- throw new IOException("Safe mode should be turned ON " +
- "in order to create namespace image.");
- }
- getFSImage().saveFSImage();
- LOG.info("New namespace image has been created.");
- }
- /**
- */
- public synchronized void DFSNodesStatus(ArrayList<DatanodeDescriptor> live,
- ArrayList<DatanodeDescriptor> dead) {
- ArrayList<DatanodeDescriptor> results =
- getDatanodeListForReport(DatanodeReportType.ALL);
- for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- if (isDatanodeDead(node))
- dead.add(node);
- else
- live.add(node);
- }
- }
- /**
- * Prints information about all datanodes.
- */
- private synchronized void datanodeDump(PrintWriter out) {
- synchronized (datanodeMap) {
- out.println("Metasave: Number of datanodes: " + datanodeMap.size());
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- out.println(node.dumpDatanode());
- }
- }
- }
- /**
- * Start decommissioning the specified datanode.
- */
- private void startDecommission (DatanodeDescriptor node)
- throws IOException {
- if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
- LOG.info("Start Decommissioning node " + node.getName());
- node.startDecommission();
- //
- // all the blocks that reside on this node have to be
- // replicated.
- Iterator<Block> decommissionBlocks = node.getBlockIterator();
- while(decommissionBlocks.hasNext()) {
- Block block = decommissionBlocks.next();
- updateNeededReplications(block, -1, 0);
- }
- }
- }
- /**
- * Stop decommissioning the specified datanodes.
- */
- public void stopDecommission (DatanodeDescriptor node)
- throws IOException {
- LOG.info("Stop Decommissioning node " + node.getName());
- node.stopDecommission();
- }
- /**
- */
- public DatanodeInfo getDataNodeInfo(String name) {
- return datanodeMap.get(name);
- }
- /**
- * @deprecated use {@link NameNode#getNameNodeAddress()} instead.
- */
- @Deprecated
- public InetSocketAddress getDFSNameNodeAddress() {
- return nameNodeAddress;
- }
- /**
- */
- public Date getStartTime() {
- return new Date(systemStart);
- }
-
- short getMaxReplication() { return (short)maxReplication; }
- short getMinReplication() { return (short)minReplication; }
- short getDefaultReplication() { return (short)defaultReplication; }
-
- /**
- * An immutable object that stores the number of live replicas and
- * the number of decommissioned replicas.
- */
- static class NumberReplicas {
- private int liveReplicas;
- private int decommissionedReplicas;
- private int corruptReplicas;
- private int excessReplicas;
- NumberReplicas() {
- initialize(0, 0, 0, 0);
- }
- NumberReplicas(int live, int decommissioned, int corrupt, int excess) {
- initialize(live, decommissioned, corrupt, excess);
- }
- void initialize(int live, int decommissioned, int corrupt, int excess) {
- liveReplicas = live;
- decommissionedReplicas = decommissioned;
- corruptReplicas = corrupt;
- excessReplicas = excess;
- }
- int liveReplicas() {
- return liveReplicas;
- }
- int decommissionedReplicas() {
- return decommissionedReplicas;
- }
- int corruptReplicas() {
- return corruptReplicas;
- }
- int excessReplicas() {
- return excessReplicas;
- }
- }
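-
- /* Illustrative usage (not in the original source): how the counters are
- * combined elsewhere in this class when deciding whether a block still
- * needs replication work.
- *
- * NumberReplicas num = countNodes(block);
- * int effective = num.liveReplicas()
- * + pendingReplications.getNumReplicas(block);
- * boolean satisfied = effective >= fileINode.getReplication();
- *
- * fileINode stands for the block's INodeFile as looked up via blocksMap.
- */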
- /**
- * Counts the nodes holding the given block, classifying each replica
- * as live, decommissioned, corrupt, or excess.
- */
- private NumberReplicas countNodes(Block b,
- Iterator<DatanodeDescriptor> nodeIter) {
- int count = 0;
- int live = 0;
- int corrupt = 0;
- int excess = 0;
- Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
- while ( nodeIter.hasNext() ) {
- DatanodeDescriptor node = nodeIter.next();
- if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
- corrupt++;
- }
- else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- count++;
- }
- else {
- Collection<Block> blocksExcess =
- excessReplicateMap.get(node.getStorageID());
- if (blocksExcess != null && blocksExcess.contains(b)) {
- excess++;
- } else {
- live++;
- }
- }
- }
- return new NumberReplicas(live, count, corrupt, excess);
- }
- /**
- * Return the replica counts for a block: live, decommissioned,
- * corrupt, and excess.
- */
- NumberReplicas countNodes(Block b) {
- return countNodes(b, blocksMap.nodeIterator(b));
- }
- /**
- * Return true if there are any blocks on this node that have not
- * yet reached their replication factor. Otherwise returns false.
- */
- private boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
- boolean status = false;
- for(final Iterator<Block> i = srcNode.getBlockIterator(); i.hasNext(); ) {
- final Block block = i.next();
- INode fileINode = blocksMap.getINode(block);
- if (fileINode != null) {
- NumberReplicas num = countNodes(block);
- int curReplicas = num.liveReplicas();
- int curExpectedReplicas = getReplication(block);
- if (curExpectedReplicas > curReplicas) {
- status = true;
- if (!neededReplications.contains(block) &&
- pendingReplications.getNumReplicas(block) == 0) {
- //
- // These blocks have been reported from the datanode
- // after the startDecommission method has been executed. These
- // blocks were in flight when the decommission was started.
- //
- neededReplications.add(block,
- curReplicas,
- num.decommissionedReplicas(),
- curExpectedReplicas);
- }
- }
- }
- }
- return status;
- }
- /**
- * Change, if appropriate, the admin state of a datanode to
- * decommission completed. Return true if decommission is complete.
- */
- boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
- //
- // Check to see if all blocks on this decommissioning
- // node have reached their target replication factor.
- //
- if (node.isDecommissionInProgress()) {
- if (!isReplicationInProgress(node)) {
- node.setDecommissioned();
- LOG.info("Decommission complete for node " + node.getName());
- }
- }
- if (node.isDecommissioned()) {
- return true;
- }
- return false;
- }
- /**
- * Keeps track of which datanodes/IP addresses are allowed to connect to the namenode.
- */
- private boolean inHostsList(DatanodeID node, String ipAddr) {
- Set<String> hostsList = hostsReader.getHosts();
- return (hostsList.isEmpty() ||
- (ipAddr != null && hostsList.contains(ipAddr)) ||
- hostsList.contains(node.getHost()) ||
- hostsList.contains(node.getName()) ||
- ((node instanceof DatanodeInfo) &&
- hostsList.contains(((DatanodeInfo)node).getHostName())));
- }
-
- private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
- Set<String> excludeList = hostsReader.getExcludedHosts();
- return ((ipAddr != null && excludeList.contains(ipAddr)) ||
- excludeList.contains(node.getHost()) ||
- excludeList.contains(node.getName()) ||
- ((node instanceof DatanodeInfo) &&
- excludeList.contains(((DatanodeInfo)node).getHostName())));
- }
- /**
- * Rereads the config to get hosts and exclude list file names.
- * Rereads the files to update the hosts and exclude lists. It
- * checks if any of the hosts have changed states:
- * 1. Added to hosts --> no further work needed here.
- * 2. Removed from hosts --> mark AdminState as decommissioned.
- * 3. Added to exclude --> start decommission.
- * 4. Removed from exclude --> stop decommission.
- */
- public void refreshNodes(Configuration conf) throws IOException {
- checkSuperuserPrivilege();
- // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
- // Update the file names and refresh internal includes and excludes list
- if (conf == null)
- conf = new Configuration();
- hostsReader.updateFileNames(conf.get("dfs.hosts",""),
- conf.get("dfs.hosts.exclude", ""));
- hostsReader.refresh();
- synchronized (this) {
- for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor node = it.next();
- // Check if not include.
- if (!inHostsList(node, null)) {
- node.setDecommissioned(); // case 2.
- } else {
- if (inExcludedHostsList(node, null)) {
- if (!node.isDecommissionInProgress() &&
- !node.isDecommissioned()) {
- startDecommission(node); // case 3.
- }
- } else {
- if (node.isDecommissionInProgress() ||
- node.isDecommissioned()) {
- stopDecommission(node); // case 4.
- }
- }
- }
- }
- }
-
- }
-
- void finalizeUpgrade() throws IOException {
- checkSuperuserPrivilege();
- getFSImage().finalizeUpgrade();
- }
- /**
- * Checks if the node is not on the hosts list. If it is not, then
- * it will be ignored. If the node is in the hosts list, but is also
- * on the exclude list, then it will be decommissioned.
- * Returns FALSE if node is rejected for registration.
- * Returns TRUE if node is registered (including when it is on the
- * exclude list and is being decommissioned).
- */
- private synchronized boolean verifyNodeRegistration(DatanodeRegistration nodeReg, String ipAddr)
- throws IOException {
- if (!inHostsList(nodeReg, ipAddr)) {
- return false;
- }
- if (inExcludedHostsList(nodeReg, ipAddr)) {
- DatanodeDescriptor node = getDatanode(nodeReg);
- if (node == null) {
- throw new IOException("verifyNodeRegistration: unknown datanode " +
- nodeReg.getName());
- }
- if (!checkDecommissionStateInternal(node)) {
- startDecommission(node);
- }
- }
- return true;
- }
-
- /**
- * Checks if the Admin state bit is DECOMMISSIONED. If so, then
- * we should shut it down.
- *
- * Returns true if the node should be shutdown.
- */
- private boolean shouldNodeShutdown(DatanodeDescriptor node) {
- return (node.isDecommissioned());
- }
-
- /**
- * Get data node by storage ID.
- *
- * @param nodeID
- * @return DatanodeDescriptor or null if the node is not found.
- * @throws IOException
- */
- public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
- UnregisteredDatanodeException e = null;
- DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
- if (node == null)
- return null;
- if (!node.getName().equals(nodeID.getName())) {
- e = new UnregisteredDatanodeException(nodeID, node);
- NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
- + e.getLocalizedMessage());
- throw e;
- }
- return node;
- }
-
- /** Stop at and return the datanode at index (used for content browsing)*/
- @Deprecated
- private DatanodeDescriptor getDatanodeByIndex(int index) {
- int i = 0;
- for (DatanodeDescriptor node : datanodeMap.values()) {
- if (i == index) {
- return node;
- }
- i++;
- }
- return null;
- }
-
- @Deprecated
- public String randomDataNode() {
- int size = datanodeMap.size();
- int index = 0;
- if (size != 0) {
- index = r.nextInt(size);
- for(int i=0; i<size; i++) {
- DatanodeDescriptor d = getDatanodeByIndex(index);
- if (d != null && !d.isDecommissioned() && !isDatanodeDead(d) &&
- !d.isDecommissionInProgress()) {
- return d.getHost() + ":" + d.getInfoPort();
- }
- index = (index + 1) % size;
- }
- }
- return null;
- }
- public DatanodeDescriptor getRandomDatanode() {
- return replicator.chooseTarget(1, null, null, 0)[0];
- }
- /**
- * SafeModeInfo contains information related to the safe mode.
- * <p>
- * An instance of {@link SafeModeInfo} is created when the name node
- * enters safe mode.
- * <p>
- * During name node startup {@link SafeModeInfo} counts the number of
- * <em>safe blocks</em>, those that have at least the minimal number of
- * replicas, and calculates the ratio of safe blocks to the total number
- * of blocks in the system, which is the size of
- * {@link FSNamesystem#blocksMap}. When the ratio reaches the
- * {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
- * to monitor whether the safe mode {@link #extension} is passed.
- * Then it leaves safe mode and destroys itself.
- * <p>
- * If safe mode is turned on manually then the number of safe blocks is
- * not tracked because the name node is not intended to leave safe mode
- * automatically in this case.
- *
- * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
- * @see SafeModeMonitor
- */
- class SafeModeInfo {
- // configuration fields
- /** Safe mode threshold condition %.*/
- private double threshold;
- /** Safe mode extension after the threshold. */
- private int extension;
- /** Min replication required by safe mode. */
- private int safeReplication;
-
- // internal fields
- /** Time when threshold was reached.
- *
- * <br>-1 safe mode is off
- * <br> 0 safe mode is on, but threshold is not reached yet
- */
- private long reached = -1;
- /** Total number of blocks. */
- int blockTotal;
- /** Number of safe blocks. */
- private int blockSafe;
- /** time of the last status printout */
- private long lastStatusReport = 0;
-
- /**
- * Creates SafeModeInfo when the name node enters
- * automatic safe mode at startup.
- *
- * @param conf configuration
- */
- SafeModeInfo(Configuration conf) {
- this.threshold = conf.getFloat("dfs.safemode.threshold.pct", 0.95f);
- this.extension = conf.getInt("dfs.safemode.extension", 0);
- this.safeReplication = conf.getInt("dfs.replication.min", 1);
- this.blockTotal = 0;
- this.blockSafe = 0;
- }
- /**
- * Creates SafeModeInfo when safe mode is entered manually.
- *
- * The {@link #threshold} is set to 1.5 so that it could never be reached.
- * {@link #blockTotal} is set to -1 to indicate that safe mode is manual.
- *
- * @see SafeModeInfo
- */
- private SafeModeInfo() {
- this.threshold = 1.5f; // this threshold can never be reached
- this.extension = Integer.MAX_VALUE;
- this.safeReplication = Short.MAX_VALUE + 1; // more than maxReplication
- this.blockTotal = -1;
- this.blockSafe = -1;
- this.reached = -1;
- enter();
- reportStatus("STATE* Safe mode is ON.", true);
- }
-
- /**
- * Check if safe mode is on.
- * @return true if in safe mode
- */
- synchronized boolean isOn() {
- try {
- assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
- + "Total num of blocks, active blocks, or "
- + "total safe blocks don't match.";
- } catch(IOException e) {
- System.err.print(StringUtils.stringifyException(e));
- }
- return this.reached >= 0;
- }
-
- /**
- * Enter safe mode.
- */
- void enter() {
- this.reached = 0;
- }
-
- /**
- * Leave safe mode.
- * <p>
- * Switch to manual safe mode if distributed upgrade is required.<br>
- * Check for invalid, under- & over-replicated blocks at the end of startup.
- */
- synchronized void leave(boolean checkForUpgrades) {
- if(checkForUpgrades) {
- // verify whether a distributed upgrade needs to be started
- boolean needUpgrade = false;
- try {
- needUpgrade = startDistributedUpgradeIfNeeded();
- } catch(IOException e) {
- FSNamesystem.LOG.error(StringUtils.stringifyException(e));
- }
- if(needUpgrade) {
- // switch to manual safe mode
- safeMode = new SafeModeInfo();
- return;
- }
- }
- // verify blocks replications
- processMisReplicatedBlocks();
- long timeInSafemode = now() - systemStart;
- NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
- + timeInSafemode/1000 + " secs.");
- NameNode.getNameNodeMetrics().safeModeTime.set((int) timeInSafemode);
-
- if (reached >= 0) {
- NameNode.stateChangeLog.info("STATE* Safe mode is OFF.");
- }
- reached = -1;
- safeMode = null;
- NameNode.stateChangeLog.info("STATE* Network topology has "
- +clusterMap.getNumOfRacks()+" racks and "
- +clusterMap.getNumOfLeaves()+ " datanodes");
- NameNode.stateChangeLog.info("STATE* UnderReplicatedBlocks has "
- +neededReplications.size()+" blocks");
- }
-
- /**
- * Safe mode can be turned off iff
- * the threshold is reached and
- * the extension time has passed.
- * @return true if can leave or false otherwise.
- */
- synchronized boolean canLeave() {
- if (reached == 0)
- return false;
- if (now() - reached < extension) {
- reportStatus("STATE* Safe mode ON.", false);
- return false;
- }
- return !needEnter();
- }
-
- /**
- * There is no need to enter safe mode
- * if DFS is empty or {@link #threshold} == 0
- */
- boolean needEnter() {
- return getSafeBlockRatio() < threshold;
- }
-
- /**
- * Ratio of the number of safe blocks to the total number of blocks
- * to be compared with the threshold.
- */
- private float getSafeBlockRatio() {
- return (blockTotal == 0 ? 1 : (float)blockSafe/blockTotal);
- }
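-
- /* Illustrative arithmetic (not in the original source), with assumed
- * numbers: under the default threshold 0.95, blockTotal = 1000 and
- * blockSafe = 949 give a ratio of 0.949, so needEnter() stays true and
- * the namenode remains in safe mode; one more safe block makes the
- * ratio 0.950 >= 0.95, and the extension timer can start.
- */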
-
- /**
- * Check and trigger safe mode if needed.
- */
- private void checkMode() {
- if (needEnter()) {
- enter();
- reportStatus("STATE* Safe mode ON.", false);
- return;
- }
- // the threshold is reached
- if (!isOn() || // safe mode is off
- extension <= 0 || threshold <= 0) { // don't need to wait
- this.leave(true); // leave safe mode
- return;
- }
- if (reached > 0) { // threshold has already been reached before
- reportStatus("STATE* Safe mode ON.", false);
- return;
- }
- // start monitor
- reached = now();
- smmthread = new Daemon(new SafeModeMonitor());
- smmthread.start();
- reportStatus("STATE* Safe mode extension entered.", true);
- }
-
- /**
- * Set total number of blocks.
- */
- synchronized void setBlockTotal(int total) {
- this.blockTotal = total;
- checkMode();
- }
-
- /**
- * Increment number of safe blocks if current block has
- * reached minimal replication.
- * @param replication current replication
- */
- synchronized void incrementSafeBlockCount(short replication) {
- if ((int)replication == safeReplication)
- this.blockSafe++;
- checkMode();
- }
-
- /**
- * Decrement number of safe blocks if current block has
- * fallen below minimal replication.
- * @param replication current replication
- */
- synchronized void decrementSafeBlockCount(short replication) {
- if (replication == safeReplication-1)
- this.blockSafe--;
- checkMode();
- }
- /**
- * Check if safe mode was entered manually or at startup.
- */
- boolean isManual() {
- return extension == Integer.MAX_VALUE;
- }
- /**
- * Set manual safe mode.
- */
- void setManual() {
- extension = Integer.MAX_VALUE;
- }
- /**
- * A tip on how safe mode is to be turned off: manually or automatically.
- */
- String getTurnOffTip() {
- String leaveMsg = "Safe mode will be turned off automatically";
- if(reached < 0)
- return "Safe mode is OFF.";
- if(isManual()) {
- if(getDistributedUpgradeState())
- return leaveMsg + " upon completion of " +
- "the distributed upgrade: upgrade progress = " +
- getDistributedUpgradeStatus() + "%";
- leaveMsg = "Use "hadoop dfs -safemode leave" to turn safe mode off";
- }
- if(blockTotal < 0)
- return leaveMsg + ".";
- String safeBlockRatioMsg =
- String.format("The ratio of reported blocks %.4f has " +
- (reached == 0 ? "not " : "") + "reached the threshold %.4f. ",
- getSafeBlockRatio(), threshold) + leaveMsg;
- if(reached == 0 || isManual()) // threshold is not reached or manual
- return safeBlockRatioMsg + ".";
- // extension period is in progress
- return safeBlockRatioMsg + " in "
- + Math.abs(reached + extension - now())/1000 + " seconds.";
- }
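-
- // Illustrative helper (hypothetical, not in the original class): the
- // countdown arithmetic used in the tip above. With extension = 30000 ms
- // and reached set 3 seconds ago, this reports 27 seconds remaining.
- private long secondsUntilAutoLeave() {
- return Math.abs(reached + extension - now()) / 1000; // same expression as in getTurnOffTip()
- }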
- /**
- * Print status every 20 seconds.
- */
- private void reportStatus(String msg, boolean rightNow) {
- long curTime = now();
- if(!rightNow && (curTime - lastStatusReport < 20 * 1000))
- return;
- NameNode.stateChangeLog.info(msg + "\n" + getTurnOffTip());
- lastStatusReport = curTime;
- }
- /**
- * Returns printable state of the class.
- */
- public String toString() {
- String resText = "Current safe block ratio = "
- + getSafeBlockRatio()
- + ". Target threshold = " + threshold
- + ". Minimal replication = " + safeReplication + ".";
- if (reached > 0)
- resText += " Threshold was reached " + new Date(reached) + ".";
- return resText;
- }
-
- /**
- * Checks consistency of the class state.
- * This is costly and currently called only in assert.
- */
- boolean isConsistent() throws IOException {
- if (blockTotal == -1 && blockSafe == -1) {
- return true; // manual safe mode
- }
- int activeBlocks = blocksMap.size();
- for(Iterator<Collection<Block>> it =
- recentInvalidateSets.values().iterator(); it.hasNext();) {
- activeBlocks -= it.next().size();
- }
- return (blockTotal == activeBlocks) ||
- (blockSafe >= 0 && blockSafe <= blockTotal);
- }
- }
-
- /**
- * Periodically check whether it is time to leave safe mode.
- * This thread starts when the threshold level is reached.
- *
- */
- class SafeModeMonitor implements Runnable {
- /** interval in msec for checking safe mode: {@value} */
- private static final long recheckInterval = 1000;
-
- /**
- * Loop until safe mode can be left, then leave it and stop the monitor.
- */
- public void run() {
- while (fsRunning && (safeMode != null && !safeMode.canLeave())) {
- try {
- Thread.sleep(recheckInterval);
- } catch (InterruptedException ie) {
- }
- }
- // leave safe mode and stop the monitor
- try {
- leaveSafeMode(true);
- } catch(SafeModeException es) { // should never happen
- String msg = "SafeModeMonitor may not run during distributed upgrade.";
- assert false : msg;
- throw new RuntimeException(msg, es);
- }
- smmthread = null;
- }
- }
-
- /**
- * Current system time.
- * @return current time in msec.
- */
- static long now() {
- return System.currentTimeMillis();
- }
-
- boolean setSafeMode(SafeModeAction action) throws IOException {
- if (action != SafeModeAction.SAFEMODE_GET) {
- checkSuperuserPrivilege();
- switch(action) {
- case SAFEMODE_LEAVE: // leave safe mode
- leaveSafeMode(false);
- break;
- case SAFEMODE_ENTER: // enter safe mode
- enterSafeMode();
- break;
- }
- }
- return isInSafeMode();
- }
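-
- // Illustrative mapping (hypothetical helper, not in the original class)
- // from the dfsadmin command line to the actions handled above; only the
- // GET action skips the superuser check.
- private SafeModeAction parseSafeModeArg(String arg) {
- if ("enter".equals(arg)) return SafeModeAction.SAFEMODE_ENTER; // hadoop dfsadmin -safemode enter
- if ("leave".equals(arg)) return SafeModeAction.SAFEMODE_LEAVE; // hadoop dfsadmin -safemode leave
- return SafeModeAction.SAFEMODE_GET; // hadoop dfsadmin -safemode get
- }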
- /**
- * Check whether the name node is in safe mode.
- * @return true if safe mode is ON, false otherwise
- */
- boolean isInSafeMode() {
- if (safeMode == null)
- return false;
- return safeMode.isOn();
- }
-
- /**
- * Increment number of blocks that reached minimal replication.
- * @param replication current replication
- */
- void incrementSafeBlockCount(int replication) {
- if (safeMode == null)
- return;
- safeMode.incrementSafeBlockCount((short)replication);
- }
- /**
- * Decrement number of blocks that reached minimal replication.
- */
- void decrementSafeBlockCount(Block b) {
- if (safeMode == null) // mostly true
- return;
- safeMode.decrementSafeBlockCount((short)countNodes(b).liveReplicas());
- }
- /**
- * Set the total number of blocks in the system.
- */
- void setBlockTotal() {
- if (safeMode == null)
- return;
- safeMode.setBlockTotal(blocksMap.size());
- }
- /**
- * Get the total number of blocks in the system.
- */
- public long getBlocksTotal() {
- return blocksMap.size();
- }
- /**
- * Enter safe mode manually.
- * @throws IOException
- */
- synchronized void enterSafeMode() throws IOException {
- if (!isInSafeMode()) {
- safeMode = new SafeModeInfo();
- return;
- }
- safeMode.setManual();
- NameNode.stateChangeLog.info("STATE* Safe mode is ON. "
- + safeMode.getTurnOffTip());
- }
- /**
- * Leave safe mode.
- * @throws IOException
- */
- synchronized void leaveSafeMode(boolean checkForUpgrades) throws SafeModeException {
- if (!isInSafeMode()) {
- NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
- return;
- }
- if(getDistributedUpgradeState())
- throw new SafeModeException("Distributed upgrade is in progress",
- safeMode);
- safeMode.leave(checkForUpgrades);
- }
-
- String getSafeModeTip() {
- if (!isInSafeMode())
- return "";
- return safeMode.getTurnOffTip();
- }
- long getEditLogSize() throws IOException {
- return getEditLog().getEditLogSize();
- }
- synchronized CheckpointSignature rollEditLog() throws IOException {
- if (isInSafeMode()) {
- throw new SafeModeException("Checkpoint not created",
- safeMode);
- }
- LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
- return getFSImage().rollEditLog();
- }
- synchronized void rollFSImage() throws IOException {
- if (isInSafeMode()) {
- throw new SafeModeException("Checkpoint not created",
- safeMode);
- }
- LOG.info("Roll FSImage from " + Server.getRemoteAddress());
- getFSImage().rollFSImage();
- }
- /**
- * Returns whether the given block is referenced by a file.
- */
- private boolean isValidBlock(Block b) {
- return (blocksMap.getINode(b) != null);
- }
- // Distributed upgrade manager
- UpgradeManagerNamenode upgradeManager = new UpgradeManagerNamenode();
- UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
- ) throws IOException {
- return upgradeManager.distributedUpgradeProgress(action);
- }
- UpgradeCommand processDistributedUpgradeCommand(UpgradeCommand comm) throws IOException {
- return upgradeManager.processUpgradeCommand(comm);
- }
- int getDistributedUpgradeVersion() {
- return upgradeManager.getUpgradeVersion();
- }
- UpgradeCommand getDistributedUpgradeCommand() throws IOException {
- return upgradeManager.getBroadcastCommand();
- }
- boolean getDistributedUpgradeState() {
- return upgradeManager.getUpgradeState();
- }
- short getDistributedUpgradeStatus() {
- return upgradeManager.getUpgradeStatus();
- }
- boolean startDistributedUpgradeIfNeeded() throws IOException {
- return upgradeManager.startUpgrade();
- }
- PermissionStatus createFsOwnerPermissions(FsPermission permission) {
- return new PermissionStatus(fsOwner.getUserName(), supergroup, permission);
- }
- private PermissionChecker checkOwner(String path) throws AccessControlException {
- return checkPermission(path, true, null, null, null, null);
- }
- private PermissionChecker checkPathAccess(String path, FsAction access
- ) throws AccessControlException {
- return checkPermission(path, false, null, null, access, null);
- }
- private PermissionChecker checkParentAccess(String path, FsAction access
- ) throws AccessControlException {
- return checkPermission(path, false, null, access, null, null);
- }
- private PermissionChecker checkAncestorAccess(String path, FsAction access
- ) throws AccessControlException {
- return checkPermission(path, false, access, null, null, null);
- }
- private PermissionChecker checkTraverse(String path
- ) throws AccessControlException {
- return checkPermission(path, false, null, null, null, null);
- }
- private void checkSuperuserPrivilege() throws AccessControlException {
- if (isPermissionEnabled) {
- PermissionChecker pc = new PermissionChecker(
- fsOwner.getUserName(), supergroup);
- if (!pc.isSuper) {
- throw new AccessControlException("Superuser privilege is required");
- }
- }
- }
- /**
- * Check whether the current user has permission to access the path.
- * For more details of the parameters, see
- * {@link PermissionChecker#checkPermission(String, INodeDirectory, boolean, FsAction, FsAction, FsAction, FsAction)}.
- */
- private PermissionChecker checkPermission(String path, boolean doCheckOwner,
- FsAction ancestorAccess, FsAction parentAccess, FsAction access,
- FsAction subAccess) throws AccessControlException {
- PermissionChecker pc = new PermissionChecker(
- fsOwner.getUserName(), supergroup);
- if (!pc.isSuper) {
- dir.waitForReady();
- pc.checkPermission(path, dir.rootDir, doCheckOwner,
- ancestorAccess, parentAccess, access, subAccess);
- }
- return pc;
- }
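-
- // Usage sketch (hypothetical, not in the original class): how a typical
- // create-style caller elsewhere in this class combines the helpers above.
- // Writing into an existing file checks the path itself; creating missing
- // parents checks the last existing ancestor instead.
- private void checkCreatePermissionSketch(String src, boolean pathExists
- ) throws AccessControlException {
- if (!isPermissionEnabled) return;
- if (pathExists) {
- checkPathAccess(src, FsAction.WRITE); // overwrite an existing file
- } else {
- checkAncestorAccess(src, FsAction.WRITE); // create the file and any missing parents
- }
- }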
- /**
- * Check to see if we have exceeded the limit on the number
- * of filesystem objects (inodes and blocks).
- */
- void checkFsObjectLimit() throws IOException {
- if (maxFsObjects != 0 &&
- maxFsObjects <= dir.totalInodes() + getBlocksTotal()) {
- throw new IOException("Exceeded the configured number of objects " +
- maxFsObjects + " in the filesystem.");
- }
- }
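-
- // Worked example (illustrative helper, not in the original class): with
- // maxFsObjects = 100, 60 inodes and 40 blocks already allocated,
- // 60 + 40 >= 100, so the next create or block allocation would throw.
- // A configured value of 0 disables the limit entirely.
- private boolean wouldExceedFsObjectLimit(long inodes, long blocks) {
- return maxFsObjects != 0 && maxFsObjects <= inodes + blocks; // same test as above
- }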
- /**
- * Get the maximum number of objects allowed in the system.
- */
- long getMaxObjects() {
- return maxFsObjects;
- }
- public long getFilesTotal() {
- return this.dir.totalInodes();
- }
- public long getPendingReplicationBlocks() {
- return pendingReplicationBlocksCount;
- }
- public long getUnderReplicatedBlocks() {
- return underReplicatedBlocksCount;
- }
- /** Returns number of blocks with corrupt replicas */
- public long getCorruptReplicaBlocksCount() {
- return corruptReplicaBlocksCount;
- }
- public long getScheduledReplicationBlocks() {
- return scheduledReplicationBlocksCount;
- }
- public String getFSState() {
- return isInSafeMode() ? "safeMode" : "Operational";
- }
-
- private ObjectName mbeanName;
- /**
- * Register the FSNamesystem MBean using the name
- * "hadoop:service=NameNode,name=FSNamesystemState"
- */
- void registerMBean(Configuration conf) {
- // We wrap to bypass the standard MBean naming convention.
- // This wrapping can be removed in Java 6, as it is more flexible in
- // package naming for MBeans and their implementations.
- StandardMBean bean;
- try {
- myFSMetrics = new FSNamesystemMetrics(conf);
- bean = new StandardMBean(this,FSNamesystemMBean.class);
- mbeanName = MBeanUtil.registerMBean("NameNode", "FSNamesystemState", bean);
- } catch (NotCompliantMBeanException e) {
- e.printStackTrace();
- }
- LOG.info("Registered FSNamesystemStatusMBean");
- }
- /**
- * Get the FSNamesystem metrics.
- */
- public FSNamesystemMetrics getFSNamesystemMetrics() {
- return myFSMetrics;
- }
- /**
- * Shut down the FSNamesystem and unregister the MBean.
- */
- public void shutdown() {
- if (mbeanName != null)
- MBeanUtil.unregisterMBean(mbeanName);
- }
-
- /**
- * Number of live data nodes
- * @return Number of live data nodes
- */
- public int numLiveDataNodes() {
- int numLive = 0;
- synchronized (datanodeMap) {
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- if (!isDatanodeDead(dn) ) {
- numLive++;
- }
- }
- }
- return numLive;
- }
-
- /**
- * Number of dead data nodes
- * @return Number of dead data nodes
- */
- public int numDeadDataNodes() {
- int numDead = 0;
- synchronized (datanodeMap) {
- for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
- it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- if (isDatanodeDead(dn) ) {
- numDead++;
- }
- }
- }
- return numDead;
- }
- /**
- * Sets the generation stamp for this filesystem
- */
- public void setGenerationStamp(long stamp) {
- generationStamp.setStamp(stamp);
- }
- /**
- * Gets the generation stamp for this filesystem
- */
- public long getGenerationStamp() {
- return generationStamp.getStamp();
- }
- /**
- * Increments, logs and then returns the stamp
- */
- long nextGenerationStamp() {
- long gs = generationStamp.nextStamp();
- getEditLog().logGenerationStamp(gs);
- return gs;
- }
- /**
- * Verifies that the block is associated with a file that has a lease.
- * Increments, logs and then returns the stamp
- */
- synchronized long nextGenerationStampForBlock(Block block) throws IOException {
- BlockInfo storedBlock = blocksMap.getStoredBlock(block);
- if (storedBlock == null) {
- String msg = block + " is already commited, storedBlock == null.";
- LOG.info(msg);
- throw new IOException(msg);
- }
- INodeFile fileINode = storedBlock.getINode();
- if (!fileINode.isUnderConstruction()) {
- String msg = block + " is already commited, !fileINode.isUnderConstruction().";
- LOG.info(msg);
- throw new IOException(msg);
- }
- if (!((INodeFileUnderConstruction)fileINode).setLastRecoveryTime(now())) {
- String msg = block + " is beening recovered, ignoring this request.";
- LOG.info(msg);
- throw new IOException(msg);
- }
- return nextGenerationStamp();
- }
- /**
- * Called after a successful rename. If any part of the renamed subtree
- * had files that were being written to, update their leases with the
- * new path.
- */
- void changeLease(String src, String dst, FileStatus dinfo)
- throws IOException {
- String overwrite;
- String replaceBy;
- boolean destinationExisted = true;
- if (dinfo == null) {
- destinationExisted = false;
- }
- if (destinationExisted && dinfo.isDir()) {
- Path spath = new Path(src);
- overwrite = spath.getParent().toString() + Path.SEPARATOR;
- replaceBy = dst + Path.SEPARATOR;
- } else {
- overwrite = src;
- replaceBy = dst;
- }
- leaseManager.changeLease(src, dst, overwrite, replaceBy);
- }
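-
- // Worked example (hypothetical helper mirroring the prefix rewrite that
- // leaseManager.changeLease performs): rename("/a/b", "/c") where /c is an
- // existing directory moves the subtree to /c/b, so overwrite = "/a/" and
- // replaceBy = "/c/", and a lease on "/a/b/f" becomes "/c/b/f". When the
- // destination did not exist, the whole source path is replaced instead.
- private String rewriteLeasePath(String path, String overwrite, String replaceBy) {
- if (path.startsWith(overwrite)) {
- return replaceBy + path.substring(overwrite.length());
- }
- return path; // the lease path was not under the renamed subtree
- }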
-
- /**
- * Serializes leases.
- */
- void saveFilesUnderConstruction(DataOutputStream out) throws IOException {
- synchronized (leaseManager) {
- out.writeInt(leaseManager.countPath()); // write the size
- for (Lease lease : leaseManager.getSortedLeases()) {
- for(String path : lease.getPaths()) {
- // verify that path exists in namespace
- INode node = dir.getFileINode(path);
- if (node == null) {
- throw new IOException("saveLeases found path " + path +
- " but no matching entry in namespace.");
- }
- if (!node.isUnderConstruction()) {
- throw new IOException("saveLeases found path " + path +
- " but is not under construction.");
- }
- INodeFileUnderConstruction cons = (INodeFileUnderConstruction) node;
- FSImage.writeINodeUnderConstruction(out, cons, path);
- }
- }
- }
- }
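-
- // Reader-side sketch (hypothetical; the real loader lives in FSImage,
- // and java.io.DataInputStream is assumed to be imported): the stream
- // written above is an int count followed by that many under-construction
- // inode records, one per lease path.
- private void loadFilesUnderConstructionSketch(DataInputStream in) throws IOException {
- int count = in.readInt(); // matches out.writeInt(leaseManager.countPath()) above
- for (int i = 0; i < count; i++) {
- // each record was produced by FSImage.writeINodeUnderConstruction(...)
- // and would be decoded by the matching FSImage reader here
- }
- }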
- }