FSNamesystem.java
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.hdfs.server.namenode;
- import org.apache.commons.logging.*;
- import org.apache.hadoop.conf.*;
- import org.apache.hadoop.hdfs.DFSUtil;
- import org.apache.hadoop.hdfs.protocol.*;
- import org.apache.hadoop.hdfs.server.common.GenerationStamp;
- import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
- import org.apache.hadoop.hdfs.server.common.Storage;
- import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
- import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
- import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
- import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMetrics;
- import org.apache.hadoop.security.AccessControlException;
- import org.apache.hadoop.security.UnixUserGroupInformation;
- import org.apache.hadoop.security.UserGroupInformation;
- import org.apache.hadoop.util.*;
- import org.apache.hadoop.metrics.util.MBeanUtil;
- import org.apache.hadoop.net.CachedDNSToSwitchMapping;
- import org.apache.hadoop.net.DNSToSwitchMapping;
- import org.apache.hadoop.net.NetworkTopology;
- import org.apache.hadoop.net.ScriptBasedMapping;
- import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
- import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
- import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
- import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
- import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
- import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
- import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
- import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
- import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
- import org.apache.hadoop.fs.ContentSummary;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.permission.*;
- import org.apache.hadoop.ipc.Server;
- import org.apache.hadoop.io.IOUtils;
- import java.io.BufferedWriter;
- import java.io.File;
- import java.io.FileWriter;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.io.PrintWriter;
- import java.io.DataOutputStream;
- import java.net.InetAddress;
- import java.net.InetSocketAddress;
- import java.util.*;
- import java.util.Map.Entry;
- import javax.management.NotCompliantMBeanException;
- import javax.management.ObjectName;
- import javax.management.StandardMBean;
- import javax.security.auth.login.LoginException;
- /***************************************************
- * FSNamesystem does the actual bookkeeping work for the
- * NameNode.
- *
- * It tracks several important tables.
- *
- * 1) valid fsname --> blocklist (kept on disk, logged)
- * 2) Set of all valid blocks (inverted #1)
- * 3) block --> machinelist (kept in memory, rebuilt dynamically from reports)
- * 4) machine --> blocklist (inverted #2)
- * 5) LRU cache of updated-heartbeat machines
- ***************************************************/
- public class FSNamesystem implements FSConstants, FSNamesystemMBean {
- public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
- public static final String AUDIT_FORMAT =
- "ugi=%st" + // ugi
- "ip=%st" + // remote IP
- "cmd=%st" + // command
- "src=%st" + // src path
- "dst=%st" + // dst path (optional)
- "perm=%s"; // permissions (optional)
- private static final ThreadLocal<Formatter> auditFormatter =
- new ThreadLocal<Formatter>() {
- protected Formatter initialValue() {
- return new Formatter(new StringBuilder(AUDIT_FORMAT.length() * 4));
- }
- };
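- // Emits a single audit line for a successful operation. The per-thread
- // Formatter above is reused (its StringBuilder is reset) so audit logging
- // does not allocate a new buffer on every call.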
- private static final void logAuditEvent(UserGroupInformation ugi,
- InetAddress addr, String cmd, String src, String dst,
- FileStatus stat) {
- final Formatter fmt = auditFormatter.get();
- ((StringBuilder)fmt.out()).setLength(0);
- auditLog.info(fmt.format(AUDIT_FORMAT, ugi, addr, cmd, src, dst,
- (stat == null)
- ? null
- : stat.getOwner() + ':' + stat.getGroup() + ':' +
- stat.getPermission()
- ).toString());
- }
- public static final Log auditLog = LogFactory.getLog(
- FSNamesystem.class.getName() + ".audit");
- private boolean isPermissionEnabled;
- private UserGroupInformation fsOwner;
- private String supergroup;
- private PermissionStatus defaultPermission;
- // FSNamesystemMetrics counter variables
- private FSNamesystemMetrics myFSMetrics;
- private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;
- private int totalLoad = 0;
- private long pendingReplicationBlocksCount = 0L, corruptReplicaBlocksCount,
- underReplicatedBlocksCount = 0L, scheduledReplicationBlocksCount = 0L;
- //
- // Stores the correct file name hierarchy
- //
- public FSDirectory dir;
- //
- // Mapping: Block -> { INode, datanodes, self ref }
- // Updated only in response to client-sent information.
- //
- BlocksMap blocksMap = new BlocksMap();
- //
- // Store blocks-->datanodedescriptor(s) map of corrupt replicas
- //
- public CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
-
- /**
- * Stores the datanode -> block map.
- * <p>
- * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
- * storage id. In order to keep the storage map consistent it tracks
- * all storages ever registered with the namenode.
- * A descriptor corresponding to a specific storage id can be
- * <ul>
- * <li>added to the map if it is a new storage id;</li>
- * <li>updated with a new datanode started as a replacement for the old one
- * with the same storage id; and </li>
- * <li>removed if and only if an existing datanode is restarted to serve a
- * different storage id.</li>
- * </ul> <br>
- * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
- * in the namespace image file. Only the {@link DatanodeInfo} part is
- * persistent, the list of blocks is restored from the datanode block
- * reports.
- * <p>
- * Mapping: StorageID -> DatanodeDescriptor
- */
- NavigableMap<String, DatanodeDescriptor> datanodeMap =
- new TreeMap<String, DatanodeDescriptor>();
- //
- // Keeps a Collection for every named machine containing
- // blocks that have recently been invalidated and are thought to live
- // on the machine in question.
- // Mapping: StorageID -> ArrayList<Block>
- //
- private Map<String, Collection<Block>> recentInvalidateSets =
- new TreeMap<String, Collection<Block>>();
- //
- // Keeps a TreeSet for every named node. Each treeset contains
- // a list of the blocks that are "extra" at that location. We'll
- // eventually remove these extras.
- // Mapping: StorageID -> TreeSet<Block>
- //
- Map<String, Collection<Block>> excessReplicateMap =
- new TreeMap<String, Collection<Block>>();
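- // Used to pick a random starting block in getBlocks() below.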
- Random r = new Random();
- /**
- * Stores a set of DatanodeDescriptor objects.
- * This is a subset of {@link #datanodeMap}, containing nodes that are
- * considered alive.
- * The {@link HeartbeatMonitor} periodically checks for outdated entries,
- * and removes them from the list.
- */
- ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();
- //
- // Store set of Blocks that need to be replicated 1 or more times.
- // We also store pending replication-orders.
- // Set of: Block
- //
- private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
- private PendingReplicationBlocks pendingReplications;
- public LeaseManager leaseManager = new LeaseManager(this);
- //
- // Threaded object that checks to see if we have been
- // getting heartbeats from all clients.
- //
- Daemon hbthread = null; // HeartbeatMonitor thread
- public Daemon lmthread = null; // LeaseMonitor thread
- Daemon smmthread = null; // SafeModeMonitor thread
- public Daemon replthread = null; // Replication thread
-
- private volatile boolean fsRunning = true;
- long systemStart = 0;
- // The maximum number of replicas we should allow for a single block
- private int maxReplication;
- // How many outgoing replication streams a given node should have at one time
- private int maxReplicationStreams;
- // MIN_REPLICATION is how many copies we need in place or else we disallow the write
- private int minReplication;
- // Default replication
- private int defaultReplication;
- // heartbeatRecheckInterval is how often namenode checks for expired datanodes
- private long heartbeatRecheckInterval;
- // heartbeatExpireInterval is how long namenode waits for datanode to report
- // heartbeat
- private long heartbeatExpireInterval;
- //replicationRecheckInterval is how often namenode checks for new replication work
- private long replicationRecheckInterval;
- // default block size of a file
- private long defaultBlockSize = 0;
- // allow appending to hdfs files
- private boolean supportAppends = true;
- /**
- * Last block index used for replication work.
- */
- private int replIndex = 0;
- private long missingBlocksInCurIter = 0;
- private long missingBlocksInPrevIter = 0;
- public static FSNamesystem fsNamesystemObject;
- /** NameNode RPC address */
- private InetSocketAddress nameNodeAddress = null; // TODO: name-node has this field, it should be removed here
- private SafeModeInfo safeMode; // safe mode information
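- // Maps a datanode's host name or IP to its DatanodeDescriptor; used to find
- // the datanode co-located with a client (see getBlockLocations and
- // startFileInternal).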
- private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
-
- // datanode network topology
- NetworkTopology clusterMap = new NetworkTopology();
- private DNSToSwitchMapping dnsToSwitchMapping;
-
- // for block replicas placement
- ReplicationTargetChooser replicator;
- private HostsFileReader hostsReader;
- private Daemon dnthread = null;
- private long maxFsObjects = 0; // maximum number of fs objects
- /**
- * The global generation stamp for this file system.
- */
- private final GenerationStamp generationStamp = new GenerationStamp();
- // Ask Datanode only up to this many blocks to delete.
- private int blockInvalidateLimit = FSConstants.BLOCK_INVALIDATE_CHUNK;
- // precision of access times.
- private long accessTimePrecision = 0;
- /**
- * FSNamesystem constructor.
- */
- FSNamesystem(NameNode nn, Configuration conf) throws IOException {
- try {
- initialize(nn, conf);
- } catch(IOException e) {
- LOG.error(getClass().getSimpleName() + " initialization failed.", e);
- close();
- throw e;
- }
- }
- /**
- * Initialize FSNamesystem.
- */
- private void initialize(NameNode nn, Configuration conf) throws IOException {
- this.systemStart = now();
- setConfigurationParameters(conf);
- this.nameNodeAddress = nn.getNameNodeAddress();
- this.registerMBean(conf); // register the MBean for the FSNamesystemStatus
- this.dir = new FSDirectory(this, conf);
- StartupOption startOpt = NameNode.getStartupOption(conf);
- this.dir.loadFSImage(getNamespaceDirs(conf),
- getNamespaceEditsDirs(conf), startOpt);
- long timeTakenToLoadFSImage = now() - systemStart;
- LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
- NameNode.getNameNodeMetrics().fsImageLoadTime.set(
- (int) timeTakenToLoadFSImage);
- this.safeMode = new SafeModeInfo(conf);
- setBlockTotal();
- pendingReplications = new PendingReplicationBlocks(
- conf.getInt("dfs.replication.pending.timeout.sec",
- -1) * 1000L);
- this.hbthread = new Daemon(new HeartbeatMonitor());
- this.lmthread = new Daemon(leaseManager.new Monitor());
- this.replthread = new Daemon(new ReplicationMonitor());
- hbthread.start();
- lmthread.start();
- replthread.start();
- this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),
- conf.get("dfs.hosts.exclude",""));
- this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(
- conf.getInt("dfs.namenode.decommission.interval", 30),
- conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));
- dnthread.start();
- this.dnsToSwitchMapping = ReflectionUtils.newInstance(
- conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
- DNSToSwitchMapping.class), conf);
-
- /* If the DNS-to-switch mapping supports caching, resolve the network
- * locations of those hosts in the include list,
- * and store the mapping in the cache; so future calls to resolve
- * will be fast.
- */
- if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
- dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
- }
- }
- public static Collection<File> getNamespaceDirs(Configuration conf) {
- Collection<String> dirNames = conf.getStringCollection("dfs.name.dir");
- if (dirNames.isEmpty())
- dirNames.add("/tmp/hadoop/dfs/name");
- Collection<File> dirs = new ArrayList<File>(dirNames.size());
- for(String name : dirNames) {
- dirs.add(new File(name));
- }
- return dirs;
- }
-
- public static Collection<File> getNamespaceEditsDirs(Configuration conf) {
- Collection<String> editsDirNames =
- conf.getStringCollection("dfs.name.edits.dir");
- if (editsDirNames.isEmpty())
- editsDirNames.add("/tmp/hadoop/dfs/name");
- Collection<File> dirs = new ArrayList<File>(editsDirNames.size());
- for(String name : editsDirNames) {
- dirs.add(new File(name));
- }
- return dirs;
- }
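- // Example (illustrative): with dfs.name.dir=/data/1/name,/data/2/name the
- // methods above return one File per listed directory; when either key is
- // unset they fall back to /tmp/hadoop/dfs/name, which is only suitable for
- // testing since /tmp is typically cleared on reboot.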
- /**
- * dirs is a list of directories where the filesystem directory state
- * is stored
- */
- FSNamesystem(FSImage fsImage, Configuration conf) throws IOException {
- setConfigurationParameters(conf);
- this.dir = new FSDirectory(fsImage, this, conf);
- }
- /**
- * Initializes some of the members from configuration
- */
- private void setConfigurationParameters(Configuration conf)
- throws IOException {
- fsNamesystemObject = this;
- try {
- fsOwner = UnixUserGroupInformation.login(conf);
- } catch (LoginException e) {
- throw new IOException(StringUtils.stringifyException(e));
- }
- LOG.info("fsOwner=" + fsOwner);
- this.supergroup = conf.get("dfs.permissions.supergroup", "supergroup");
- this.isPermissionEnabled = conf.getBoolean("dfs.permissions", true);
- LOG.info("supergroup=" + supergroup);
- LOG.info("isPermissionEnabled=" + isPermissionEnabled);
- short filePermission = (short)conf.getInt("dfs.upgrade.permission", 0777);
- this.defaultPermission = PermissionStatus.createImmutable(
- fsOwner.getUserName(), supergroup, new FsPermission(filePermission));
- this.replicator = new ReplicationTargetChooser(
- conf.getBoolean("dfs.replication.considerLoad", true),
- this,
- clusterMap);
- this.defaultReplication = conf.getInt("dfs.replication", 3);
- this.maxReplication = conf.getInt("dfs.replication.max", 512);
- this.minReplication = conf.getInt("dfs.replication.min", 1);
- if (minReplication <= 0)
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
- + minReplication
- + " must be greater than 0");
- if (maxReplication >= (int)Short.MAX_VALUE)
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.max = "
- + maxReplication + " must be less than " + (Short.MAX_VALUE));
- if (maxReplication < minReplication)
- throw new IOException(
- "Unexpected configuration parameters: dfs.replication.min = "
- + minReplication
- + " must be less than dfs.replication.max = "
- + maxReplication);
- this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
- long heartbeatInterval = conf.getLong("dfs.heartbeat.interval", 3) * 1000;
- this.heartbeatRecheckInterval = conf.getInt(
- "heartbeat.recheck.interval", 5 * 60 * 1000); // 5 minutes
- this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
- 10 * heartbeatInterval;
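- // With the defaults above (heartbeat every 3s, recheck every 5 minutes) this
- // works out to 2*300000 + 10*3000 = 630000 ms, i.e. a datanode is treated as
- // dead roughly 10.5 minutes after its last heartbeat.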
- this.replicationRecheckInterval =
- conf.getInt("dfs.replication.interval", 3) * 1000L;
- this.defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
- this.maxFsObjects = conf.getLong("dfs.max.objects", 0);
- this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit,
- 20*(int)(heartbeatInterval/1000));
- this.accessTimePrecision = conf.getLong("dfs.access.time.precision", 0);
- this.supportAppends = conf.getBoolean("dfs.support.append", false);
- }
- /**
- * Return the default path permission when upgrading from releases with no
- * permissions (<=0.15) to releases with permissions (>=0.16)
- */
- protected PermissionStatus getUpgradePermission() {
- return defaultPermission;
- }
-
- /** Return the FSNamesystem object
- *
- */
- public static FSNamesystem getFSNamesystem() {
- return fsNamesystemObject;
- }
- NamespaceInfo getNamespaceInfo() {
- return new NamespaceInfo(dir.fsImage.getNamespaceID(),
- dir.fsImage.getCTime(),
- getDistributedUpgradeVersion());
- }
- /**
- * Close down this file system manager.
- * Causes heartbeat and lease daemons to stop; waits briefly for
- * them to finish, but a short timeout returns control back to caller.
- */
- public void close() {
- fsRunning = false;
- try {
- if (pendingReplications != null) pendingReplications.stop();
- if (hbthread != null) hbthread.interrupt();
- if (replthread != null) replthread.interrupt();
- if (dnthread != null) dnthread.interrupt();
- if (smmthread != null) smmthread.interrupt();
- } catch (Exception e) {
- LOG.warn("Exception shutting down FSNamesystem", e);
- } finally {
- // using finally to ensure we also wait for lease daemon
- try {
- if (lmthread != null) {
- lmthread.interrupt();
- lmthread.join(3000);
- }
- dir.close();
- } catch (InterruptedException ie) {
- } catch (IOException ie) {
- LOG.error("Error closing FSDirectory", ie);
- IOUtils.cleanup(LOG, dir);
- }
- }
- }
- /** Is this name system running? */
- boolean isRunning() {
- return fsRunning;
- }
- /**
- * Dump all metadata into specified file
- */
- synchronized void metaSave(String filename) throws IOException {
- checkSuperuserPrivilege();
- File file = new File(System.getProperty("hadoop.log.dir"),
- filename);
- PrintWriter out = new PrintWriter(new BufferedWriter(
- new FileWriter(file, true)));
-
- //
- // Dump contents of neededReplication
- //
- synchronized (neededReplications) {
- out.println("Metasave: Blocks waiting for replication: " +
- neededReplications.size());
- for (Block block : neededReplications) {
- List<DatanodeDescriptor> containingNodes =
- new ArrayList<DatanodeDescriptor>();
- NumberReplicas numReplicas = new NumberReplicas();
- // source node returned is not used
- chooseSourceDatanode(block, containingNodes, numReplicas);
- int usableReplicas = numReplicas.liveReplicas() +
- numReplicas.decommissionedReplicas();
- // l: live, d: decommissioned, c: corrupt, e: excess
- out.print(block + " (replicas:" +
- " l: " + numReplicas.liveReplicas() +
- " d: " + numReplicas.decommissionedReplicas() +
- " c: " + numReplicas.corruptReplicas() +
- " e: " + numReplicas.excessReplicas() +
- ((usableReplicas > 0)? "" : " MISSING") + ")");
- for (Iterator<DatanodeDescriptor> jt = blocksMap.nodeIterator(block);
- jt.hasNext();) {
- DatanodeDescriptor node = jt.next();
- out.print(" " + node + " : ");
- }
- out.println("");
- }
- }
- //
- // Dump blocks from pendingReplication
- //
- pendingReplications.metaSave(out);
- //
- // Dump blocks that are waiting to be deleted
- //
- dumpRecentInvalidateSets(out);
- //
- // Dump all datanodes
- //
- datanodeDump(out);
- out.flush();
- out.close();
- }
- long getDefaultBlockSize() {
- return defaultBlockSize;
- }
- long getAccessTimePrecision() {
- return accessTimePrecision;
- }
- private boolean isAccessTimeSupported() {
- return accessTimePrecision > 0;
- }
-
- /* get replication factor of a block */
- private int getReplication(Block block) {
- INodeFile fileINode = blocksMap.getINode(block);
- if (fileINode == null) { // block does not belong to any file
- return 0;
- }
- assert !fileINode.isDirectory() : "Block cannot belong to a directory.";
- return fileINode.getReplication();
- }
- /* updates a block in under replication queue */
- synchronized void updateNeededReplications(Block block,
- int curReplicasDelta, int expectedReplicasDelta) {
- NumberReplicas repl = countNodes(block);
- int curExpectedReplicas = getReplication(block);
- neededReplications.update(block,
- repl.liveReplicas(),
- repl.decommissionedReplicas(),
- curExpectedReplicas,
- curReplicasDelta, expectedReplicasDelta);
- }
- /////////////////////////////////////////////////////////
- //
- // These methods are called by secondary namenodes
- //
- /////////////////////////////////////////////////////////
- /**
- * return a list of blocks & their locations on <code>datanode</code> whose
- * total size is <code>size</code>
- *
- * @param datanode on which blocks are located
- * @param size total size of blocks
- */
- synchronized BlocksWithLocations getBlocks(DatanodeID datanode, long size)
- throws IOException {
- checkSuperuserPrivilege();
- DatanodeDescriptor node = getDatanode(datanode);
- if (node == null) {
- NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
- + "Asking for blocks from an unrecorded node " + datanode.getName());
- throw new IllegalArgumentException(
- "Unexpected exception. Got getBlocks message for datanode " +
- datanode.getName() + ", but there is no info for it");
- }
- int numBlocks = node.numBlocks();
- if(numBlocks == 0) {
- return new BlocksWithLocations(new BlockWithLocations[0]);
- }
- Iterator<Block> iter = node.getBlockIterator();
- int startBlock = r.nextInt(numBlocks); // starting from a random block
- // skip blocks
- for(int i=0; i<startBlock; i++) {
- iter.next();
- }
- List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
- long totalSize = 0;
- while(totalSize<size && iter.hasNext()) {
- totalSize += addBlock(iter.next(), results);
- }
- if(totalSize<size) {
- iter = node.getBlockIterator(); // start from the beginning
- for(int i=0; i<startBlock&&totalSize<size; i++) {
- totalSize += addBlock(iter.next(), results);
- }
- }
-
- return new BlocksWithLocations(
- results.toArray(new BlockWithLocations[results.size()]));
- }
-
- /**
- * Get all valid locations of the block and add the block to results.
- * Return the length of the added block; 0 if the block is not added.
- */
- private long addBlock(Block block, List<BlockWithLocations> results) {
- ArrayList<String> machineSet =
- new ArrayList<String>(blocksMap.numNodes(block));
- for(Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(block); it.hasNext();) {
- String storageID = it.next().getStorageID();
- // filter out replicas that are scheduled for invalidation
- Collection<Block> blocks = recentInvalidateSets.get(storageID);
- if(blocks==null || !blocks.contains(block)) {
- machineSet.add(storageID);
- }
- }
- if(machineSet.size() == 0) {
- return 0;
- } else {
- results.add(new BlockWithLocations(block,
- machineSet.toArray(new String[machineSet.size()])));
- return block.getNumBytes();
- }
- }
- /////////////////////////////////////////////////////////
- //
- // These methods are called by HadoopFS clients
- //
- /////////////////////////////////////////////////////////
- /**
- * Set permissions for an existing file.
- * @throws IOException
- */
- public synchronized void setPermission(String src, FsPermission permission
- ) throws IOException {
- checkOwner(src);
- dir.setPermission(src, permission);
- getEditLog().logSync();
- if (auditLog.isInfoEnabled()) {
- final FileStatus stat = dir.getFileInfo(src);
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "setPermission", src, null, stat);
- }
- }
- /**
- * Set owner for an existing file.
- * @throws IOException
- */
- public synchronized void setOwner(String src, String username, String group
- ) throws IOException {
- PermissionChecker pc = checkOwner(src);
- if (!pc.isSuper) {
- if (username != null && !pc.user.equals(username)) {
- throw new AccessControlException("Non-super user cannot change owner.");
- }
- if (group != null && !pc.containsGroup(group)) {
- throw new AccessControlException("User does not belong to " + group
- + " .");
- }
- }
- dir.setOwner(src, username, group);
- getEditLog().logSync();
- if (auditLog.isInfoEnabled()) {
- final FileStatus stat = dir.getFileInfo(src);
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "setOwner", src, null, stat);
- }
- }
- /**
- * Get block locations within the specified range.
- *
- * @see #getBlockLocations(String, long, long)
- */
- LocatedBlocks getBlockLocations(String clientMachine, String src,
- long offset, long length) throws IOException {
- if (isPermissionEnabled) {
- checkPathAccess(src, FsAction.READ);
- }
- LocatedBlocks blocks = getBlockLocations(src, offset, length, true);
- if (blocks != null) {
- //sort the blocks
- DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
- clientMachine);
- for (LocatedBlock b : blocks.getLocatedBlocks()) {
- clusterMap.pseudoSortByDistance(client, b.getLocations());
- }
- }
- return blocks;
- }
- /**
- * Get block locations within the specified range.
- * @see ClientProtocol#getBlockLocations(String, long, long)
- */
- public LocatedBlocks getBlockLocations(String src, long offset, long length
- ) throws IOException {
- return getBlockLocations(src, offset, length, false);
- }
- /**
- * Get block locations within the specified range.
- * @see ClientProtocol#getBlockLocations(String, long, long)
- */
- public LocatedBlocks getBlockLocations(String src, long offset, long length,
- boolean doAccessTime) throws IOException {
- if (offset < 0) {
- throw new IOException("Negative offset is not supported. File: " + src );
- }
- if (length < 0) {
- throw new IOException("Negative length is not supported. File: " + src );
- }
- final LocatedBlocks ret = getBlockLocationsInternal(src, dir.getFileINode(src),
- offset, length, Integer.MAX_VALUE, doAccessTime);
- if (auditLog.isInfoEnabled()) {
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "open", src, null, null);
- }
- return ret;
- }
- private synchronized LocatedBlocks getBlockLocationsInternal(String src,
- INodeFile inode,
- long offset,
- long length,
- int nrBlocksToReturn,
- boolean doAccessTime)
- throws IOException {
- if(inode == null) {
- return null;
- }
- if (doAccessTime && isAccessTimeSupported()) {
- dir.setTimes(src, inode, -1, now(), false);
- }
- Block[] blocks = inode.getBlocks();
- if (blocks == null) {
- return null;
- }
- if (blocks.length == 0) {
- return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
- }
- List<LocatedBlock> results;
- results = new ArrayList<LocatedBlock>(blocks.length);
- int curBlk = 0;
- long curPos = 0, blkSize = 0;
- int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
- for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
- blkSize = blocks[curBlk].getNumBytes();
- assert blkSize > 0 : "Block of size 0";
- if (curPos + blkSize > offset) {
- break;
- }
- curPos += blkSize;
- }
-
- if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
- return null;
-
- long endOff = offset + length;
-
- do {
- // get block locations
- int numNodes = blocksMap.numNodes(blocks[curBlk]);
- int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
- int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]);
- if (numCorruptNodes != numCorruptReplicas) {
- LOG.warn("Inconsistent number of corrupt replicas for " +
- blocks[curBlk] + "blockMap has " + numCorruptNodes +
- " but corrupt replicas map has " + numCorruptReplicas);
- }
- boolean blockCorrupt = (numCorruptNodes == numNodes);
- int numMachineSet = blockCorrupt ? numNodes :
- (numNodes - numCorruptNodes);
- DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
- if (numMachineSet > 0) {
- numNodes = 0;
- for(Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
- if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
- machineSet[numNodes++] = dn;
- }
- }
- results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
- blockCorrupt));
- curPos += blocks[curBlk].getNumBytes();
- curBlk++;
- } while (curPos < endOff
- && curBlk < blocks.length
- && results.size() < nrBlocksToReturn);
-
- return inode.createLocatedBlocks(results);
- }
- /**
- * stores the modification and access time for this inode.
- * The access time is precise up to an hour. The transaction, if needed, is
- * written to the edits log but is not flushed.
- */
- public synchronized void setTimes(String src, long mtime, long atime) throws IOException {
- if (!isAccessTimeSupported() && atime != -1) {
- throw new IOException("Access time for hdfs is not configured. " +
- " Please set dfs.support.accessTime configuration parameter.");
- }
- //
- // The caller needs to have write access to set access & modification times.
- if (isPermissionEnabled) {
- checkPathAccess(src, FsAction.WRITE);
- }
- INodeFile inode = dir.getFileINode(src);
- if (inode != null) {
- dir.setTimes(src, inode, mtime, atime, true);
- if (auditLog.isInfoEnabled()) {
- final FileStatus stat = dir.getFileInfo(src);
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "setTimes", src, null, stat);
- }
- } else {
- throw new FileNotFoundException("File " + src + " does not exist.");
- }
- }
- /**
- * Set replication for an existing file.
- *
- * The NameNode sets new replication and schedules either replication of
- * under-replicated data blocks or removal of the excessive block copies
- * if the blocks are over-replicated.
- *
- * @see ClientProtocol#setReplication(String, short)
- * @param src file name
- * @param replication new replication
- * @return true if successful;
- * false if file does not exist or is a directory
- */
- public boolean setReplication(String src, short replication)
- throws IOException {
- boolean status = setReplicationInternal(src, replication);
- getEditLog().logSync();
- if (status && auditLog.isInfoEnabled()) {
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "setReplication", src, null, null);
- }
- return status;
- }
- private synchronized boolean setReplicationInternal(String src,
- short replication
- ) throws IOException {
- if (isInSafeMode())
- throw new SafeModeException("Cannot set replication for " + src, safeMode);
- verifyReplication(src, replication, null);
- if (isPermissionEnabled) {
- checkPathAccess(src, FsAction.WRITE);
- }
- int[] oldReplication = new int[1];
- Block[] fileBlocks;
- fileBlocks = dir.setReplication(src, replication, oldReplication);
- if (fileBlocks == null) // file not found or is a directory
- return false;
- int oldRepl = oldReplication[0];
- if (oldRepl == replication) // the same replication
- return true;
- // update needReplication priority queues
- for(int idx = 0; idx < fileBlocks.length; idx++)
- updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
-
- if (oldRepl > replication) {
- // old replication > the new one; need to remove copies
- LOG.info("Reducing replication for file " + src
- + ". New replication is " + replication);
- for(int idx = 0; idx < fileBlocks.length; idx++)
- processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
- } else { // replication factor is increased
- LOG.info("Increasing replication for file " + src
- + ". New replication is " + replication);
- }
- return true;
- }
-
- long getPreferredBlockSize(String filename) throws IOException {
- if (isPermissionEnabled) {
- checkTraverse(filename);
- }
- return dir.getPreferredBlockSize(filename);
- }
-
- /**
- * Check whether the replication parameter is within the range
- * determined by system configuration.
- */
- private void verifyReplication(String src,
- short replication,
- String clientName
- ) throws IOException {
- String text = "file " + src
- + ((clientName != null) ? " on client " + clientName : "")
- + ".n"
- + "Requested replication " + replication;
- if (replication > maxReplication)
- throw new IOException(text + " exceeds maximum " + maxReplication);
-
- if (replication < minReplication)
- throw new IOException(
- text + " is less than the required minimum " + minReplication);
- }
- /**
- * Create a new file entry in the namespace.
- *
- * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
- *
- * @throws IOException if file name is invalid
- * {@link FSDirectory#isValidToCreate(String)}.
- */
- void startFile(String src, PermissionStatus permissions,
- String holder, String clientMachine,
- boolean overwrite, short replication, long blockSize
- ) throws IOException {
- startFileInternal(src, permissions, holder, clientMachine, overwrite, false,
- replication, blockSize);
- getEditLog().logSync();
- if (auditLog.isInfoEnabled()) {
- final FileStatus stat = dir.getFileInfo(src);
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "create", src, null, stat);
- }
- }
- private synchronized void startFileInternal(String src,
- PermissionStatus permissions,
- String holder,
- String clientMachine,
- boolean overwrite,
- boolean append,
- short replication,
- long blockSize
- ) throws IOException {
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: src=" + src
- + ", holder=" + holder
- + ", clientMachine=" + clientMachine
- + ", replication=" + replication
- + ", overwrite=" + overwrite
- + ", append=" + append);
- }
- if (isInSafeMode())
- throw new SafeModeException("Cannot create file" + src, safeMode);
- if (!DFSUtil.isValidName(src)) {
- throw new IOException("Invalid file name: " + src);
- }
- // Verify that the destination does not exist as a directory already.
- boolean pathExists = dir.exists(src);
- if (pathExists && dir.isDir(src)) {
- throw new IOException("Cannot create file "+ src + "; already exists as a directory.");
- }
- if (isPermissionEnabled) {
- if (append || (overwrite && pathExists)) {
- checkPathAccess(src, FsAction.WRITE);
- }
- else {
- checkAncestorAccess(src, FsAction.WRITE);
- }
- }
- try {
- INode myFile = dir.getFileINode(src);
- if (myFile != null && myFile.isUnderConstruction()) {
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
- //
- // If the file is under construction , then it must be in our
- // leases. Find the appropriate lease record.
- //
- Lease lease = leaseManager.getLease(holder);
- //
- // We found the lease for this file. And surprisingly the original
- // holder is trying to recreate this file. This should never occur.
- //
- if (lease != null) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because current leaseholder is trying to recreate file.");
- }
- //
- // Find the original holder.
- //
- lease = leaseManager.getLease(pendingFile.clientName);
- if (lease == null) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because pendingCreates is non-null but no leases found.");
- }
- //
- // If the original holder has not renewed in the last SOFTLIMIT
- // period, then start lease recovery.
- //
- if (lease.expiredSoftLimit()) {
- LOG.info("startFile: recover lease " + lease + ", src=" + src);
- internalReleaseLease(lease, src);
- }
- throw new AlreadyBeingCreatedException("failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- ", because this file is already being created by " +
- pendingFile.getClientName() +
- " on " + pendingFile.getClientMachine());
- }
- try {
- verifyReplication(src, replication, clientMachine);
- } catch(IOException e) {
- throw new IOException("failed to create "+e.getMessage());
- }
- if (append) {
- if (myFile == null) {
- throw new FileNotFoundException("failed to append to non-existent file "
- + src + " on client " + clientMachine);
- } else if (myFile.isDirectory()) {
- throw new IOException("failed to append to directory " + src
- +" on client " + clientMachine);
- }
- } else if (!dir.isValidToCreate(src)) {
- if (overwrite) {
- delete(src, true);
- } else {
- throw new IOException("failed to create file " + src
- +" on client " + clientMachine
- +" either because the filename is invalid or the file exists");
- }
- }
- DatanodeDescriptor clientNode =
- host2DataNodeMap.getDatanodeByHost(clientMachine);
- if (append) {
- //
- // Replace current node with a INodeUnderConstruction.
- // Recreate in-memory lease record.
- //
- INodeFile node = (INodeFile) myFile;
- INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
- node.getLocalNameBytes(),
- node.getReplication(),
- node.getModificationTime(),
- node.getPreferredBlockSize(),
- node.getBlocks(),
- node.getPermissionStatus(),
- holder,
- clientMachine,
- clientNode);
- dir.replaceNode(src, node, cons);
- leaseManager.addLease(cons.clientName, src);
- } else {
- // Now we can add the name to the filesystem. This file has no
- // blocks associated with it.
- //
- checkFsObjectLimit();
- // increment global generation stamp
- long genstamp = nextGenerationStamp();
- INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
- replication, blockSize, holder, clientMachine, clientNode, genstamp);
- if (newNode == null) {
- throw new IOException("DIR* NameSystem.startFile: " +
- "Unable to add file to namespace.");
- }
- leaseManager.addLease(newNode.clientName, src);
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: "
- +"add "+src+" to namespace for "+holder);
- }
- }
- } catch (IOException ie) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.startFile: "
- +ie.getMessage());
- throw ie;
- }
- }
- /**
- * Append to an existing file in the namespace.
- */
- LocatedBlock appendFile(String src, String holder, String clientMachine
- ) throws IOException {
- if (!supportAppends) {
- throw new IOException("Append to hdfs not supported." +
- " Please refer to dfs.support.append configuration parameter.");
- }
- startFileInternal(src, null, holder, clientMachine, false, true,
- (short)maxReplication, (long)0);
- getEditLog().logSync();
- //
- // Create a LocatedBlock object for the last block of the file
- // to be returned to the client. Return null if the file does not
- // have a partial block at the end.
- //
- LocatedBlock lb = null;
- synchronized (this) {
- INodeFileUnderConstruction file = (INodeFileUnderConstruction)dir.getFileINode(src);
- Block[] blocks = file.getBlocks();
- if (blocks != null && blocks.length > 0) {
- Block last = blocks[blocks.length-1];
- BlockInfo storedBlock = blocksMap.getStoredBlock(last);
- if (file.getPreferredBlockSize() > storedBlock.getNumBytes()) {
- long fileLength = file.computeContentSummary().getLength();
- DatanodeDescriptor[] targets = new DatanodeDescriptor[blocksMap.numNodes(last)];
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
- for (int i = 0; it != null && it.hasNext(); i++) {
- targets[i] = it.next();
- }
- // remove the replica locations of this block from the blocksMap
- for (int i = 0; i < targets.length; i++) {
- targets[i].removeBlock(storedBlock);
- }
- // set the locations of the last block in the lease record
- file.setLastBlock(storedBlock, targets);
- lb = new LocatedBlock(last, targets,
- fileLength-storedBlock.getNumBytes());
- // Remove block from replication queue.
- updateNeededReplications(last, 0, 0);
- // remove this block from the list of pending blocks to be deleted.
- // This reduces the possibility of triggering HADOOP-1349.
- //
- for(Collection<Block> v : recentInvalidateSets.values()) {
- v.remove(last);
- }
- }
- }
- }
- if (lb != null) {
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
- +src+" for "+holder+" at "+clientMachine
- +" block " + lb.getBlock()
- +" block size " + lb.getBlock().getNumBytes());
- }
- }
- if (auditLog.isInfoEnabled()) {
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "append", src, null, null);
- }
- return lb;
- }
- /**
- * The client would like to obtain an additional block for the indicated
- * filename (which is being written-to). Return an array that consists
- * of the block, plus a set of machines. The first on this list should
- * be where the client writes data. Subsequent items in the list must
- * be provided in the connection to the first datanode.
- *
- * Make sure the previous blocks have been reported by datanodes and
- * are replicated. Will return an empty 2-elt array if we want the
- * client to "try again later".
- */
- public LocatedBlock getAdditionalBlock(String src,
- String clientName
- ) throws IOException {
- long fileLength, blockSize;
- int replication;
- DatanodeDescriptor clientNode = null;
- Block newBlock = null;
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
- +src+" for "+clientName);
- synchronized (this) {
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot add block to " + src, safeMode);
- }
- // have we exceeded the configured limit of fs objects.
- checkFsObjectLimit();
- INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
- //
- // If we fail this, bad things happen!
- //
- if (!checkFileProgress(pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet:" + src);
- }
- fileLength = pendingFile.computeContentSummary().getLength();
- blockSize = pendingFile.getPreferredBlockSize();
- clientNode = pendingFile.getClientNode();
- replication = (int)pendingFile.getReplication();
- }
- // Choose targets for the new block to be allocated. This runs outside the
- // FSNamesystem lock; the lease and file progress are re-checked below before
- // the block is actually allocated.
- DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
- clientNode,
- null,
- blockSize);
- if (targets.length < this.minReplication) {
- throw new IOException("File " + src + " could only be replicated to " +
- targets.length + " nodes, instead of " +
- minReplication);
- }
- // Allocate a new block and record it in the INode.
- synchronized (this) {
- INode[] pathINodes = dir.getExistingPathINodes(src);
- int inodesLen = pathINodes.length;
- checkLease(src, clientName, pathINodes[inodesLen-1]);
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)
- pathINodes[inodesLen - 1];
-
- if (!checkFileProgress(pendingFile, false)) {
- throw new NotReplicatedYetException("Not replicated yet:" + src);
- }
- // allocate a new block and record its locations in the INode.
- newBlock = allocateBlock(src, pathINodes);
- pendingFile.setTargets(targets);
-
- for (DatanodeDescriptor dn : targets) {
- dn.incBlocksScheduled();
- }
- }
-
- // Create next block
- return new LocatedBlock(newBlock, targets, fileLength);
- }
- /**
- * The client would like to let go of the given block
- */
- public synchronized boolean abandonBlock(Block b, String src, String holder
- ) throws IOException {
- //
- // Remove the block from the pending creates list
- //
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- +b+"of file "+src);
- INodeFileUnderConstruction file = checkLease(src, holder);
- dir.removeBlock(src, file, b);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.abandonBlock: "
- + b
- + " is removed from pendingCreates");
- return true;
- }
-
- // make sure that we still have the lease on this file.
- private INodeFileUnderConstruction checkLease(String src, String holder)
- throws IOException {
- INodeFile file = dir.getFileINode(src);
- checkLease(src, holder, file);
- return (INodeFileUnderConstruction)file;
- }
- private void checkLease(String src, String holder, INode file)
- throws IOException {
- if (file == null || file.isDirectory()) {
- Lease lease = leaseManager.getLease(holder);
- throw new LeaseExpiredException("No lease on " + src +
- " File does not exist. " +
- (lease != null ? lease.toString() :
- "Holder " + holder +
- " does not have any open files."));
- }
- if (!file.isUnderConstruction()) {
- Lease lease = leaseManager.getLease(holder);
- throw new LeaseExpiredException("No lease on " + src +
- " File is not open for writing. " +
- (lease != null ? lease.toString() :
- "Holder " + holder +
- " does not have any open files."));
- }
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)file;
- if (holder != null && !pendingFile.getClientName().equals(holder)) {
- throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
- + pendingFile.getClientName() + " but is accessed by " + holder);
- }
- }
- /**
- * The FSNamesystem will already know the blocks that make up the file.
- * Before we return, we make sure that all the file's blocks have
- * been reported by datanodes and are replicated correctly.
- */
-
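- // OPERATION_FAILED: the target is missing or not under construction;
- // STILL_WAITING: blocks are not yet minimally replicated, the client should retry;
- // COMPLETE_SUCCESS: the file was finalized and its block list persisted.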
- enum CompleteFileStatus {
- OPERATION_FAILED,
- STILL_WAITING,
- COMPLETE_SUCCESS
- }
-
- public CompleteFileStatus completeFile(String src, String holder) throws IOException {
- CompleteFileStatus status = completeFileInternal(src, holder);
- getEditLog().logSync();
- return status;
- }
- private synchronized CompleteFileStatus completeFileInternal(String src,
- String holder) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " + holder);
- if (isInSafeMode())
- throw new SafeModeException("Cannot complete file " + src, safeMode);
- INode iFile = dir.getFileINode(src);
- INodeFileUnderConstruction pendingFile = null;
- Block[] fileBlocks = null;
- if (iFile != null && iFile.isUnderConstruction()) {
- pendingFile = (INodeFileUnderConstruction) iFile;
- fileBlocks = dir.getFileBlocks(src);
- }
- if (fileBlocks == null ) {
- NameNode.stateChangeLog.warn("DIR* NameSystem.completeFile: "
- + "failed to complete " + src
- + " because dir.getFileBlocks() is null " +
- " and pendingFile is " +
- ((pendingFile == null) ? "null" :
- ("from " + pendingFile.getClientMachine()))
- );
- return CompleteFileStatus.OPERATION_FAILED;
- } else if (!checkFileProgress(pendingFile, true)) {
- return CompleteFileStatus.STILL_WAITING;
- }
- finalizeINodeFileUnderConstruction(src, pendingFile);
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src
- + " blocklist persisted");
- }
- return CompleteFileStatus.COMPLETE_SUCCESS;
- }
- /**
- * Check all blocks of a file. If any blocks are lower than their intended
- * replication factor, then insert them into neededReplication
- */
- private void checkReplicationFactor(INodeFile file) {
- int numExpectedReplicas = file.getReplication();
- Block[] pendingBlocks = file.getBlocks();
- int nrBlocks = pendingBlocks.length;
- for (int i = 0; i < nrBlocks; i++) {
- // filter out containingNodes that are marked for decommission.
- NumberReplicas number = countNodes(pendingBlocks[i]);
- if (number.liveReplicas() < numExpectedReplicas) {
- neededReplications.add(pendingBlocks[i],
- number.liveReplicas(),
- number.decommissionedReplicas,
- numExpectedReplicas);
- }
- }
- }
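- // Used by allocateBlock() to draw candidate block ids; ids that collide with
- // an existing valid block are re-drawn in the loop below.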
- static Random randBlockId = new Random();
-
- /**
- * Allocate a block at the given pending filename
- *
- * @param src path to the file
- * @param inodes INode representing each of the components of src.
- * <code>inodes[inodes.length-1]</code> is the INode for the file.
- */
- private Block allocateBlock(String src, INode[] inodes) throws IOException {
- Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
- while(isValidBlock(b)) {
- b.setBlockId(FSNamesystem.randBlockId.nextLong());
- }
- b.setGenerationStamp(getGenerationStamp());
- b = dir.addBlock(src, inodes, b);
- NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
- +src+ ". "+b);
- return b;
- }
- /**
- * Check that the indicated file's blocks are present and
- * replicated. If not, return false. If checkall is true, then check
- * all blocks, otherwise check only penultimate block.
- */
- synchronized boolean checkFileProgress(INodeFile v, boolean checkall) {
- if (checkall) {
- //
- // check all blocks of the file.
- //
- for (Block block: v.getBlocks()) {
- if (blocksMap.numNodes(block) < this.minReplication) {
- return false;
- }
- }
- } else {
- //
- // check the penultimate block of this file
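- // (the last block is still being written, so it is not expected to
- // be fully replicated yet)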
- //
- Block b = v.getPenultimateBlock();
- if (b != null) {
- if (blocksMap.numNodes(b) < this.minReplication) {
- return false;
- }
- }
- }
- return true;
- }
- /**
- * Remove a datanode from the invalidatesSet
- * @param n datanode
- */
- private void removeFromInvalidates(DatanodeInfo n) {
- recentInvalidateSets.remove(n.getStorageID());
- }
- /**
- * Adds block to list of blocks which will be invalidated on
- * specified datanode and log the move
- * @param b block
- * @param n datanode
- */
- void addToInvalidates(Block b, DatanodeInfo n) {
- addToInvalidatesNoLog(b, n);
- NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
- + b.getBlockName() + " is added to invalidSet of " + n.getName());
- }
- /**
- * Adds block to list of blocks which will be invalidated on
- * specified datanode
- * @param b block
- * @param n datanode
- */
- private void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
- Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
- if (invalidateSet == null) {
- invalidateSet = new HashSet<Block>();
- recentInvalidateSets.put(n.getStorageID(), invalidateSet);
- }
- invalidateSet.add(b);
- }
-
- /**
- * Adds block to list of blocks which will be invalidated on
- * all its datanodes.
- */
- private void addToInvalidates(Block b) {
- for (Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(b); it.hasNext();) {
- DatanodeDescriptor node = it.next();
- addToInvalidates(b, node);
- }
- }
- /**
- * dumps the contents of recentInvalidateSets
- */
- private synchronized void dumpRecentInvalidateSets(PrintWriter out) {
- int size = recentInvalidateSets.values().size();
- out.println("Metasave: Blocks waiting deletion from "+size+" datanodes.");
- if (size == 0) {
- return;
- }
- for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
- Collection<Block> blocks = entry.getValue();
- if (blocks.size() > 0) {
- out.println(datanodeMap.get(entry.getKey()).getName() + blocks);
- }
- }
- }
- /**
- * Mark the block belonging to datanode as corrupt
- * @param blk Block to be marked as corrupt
- * @param dn Datanode which holds the corrupt replica
- */
- public synchronized void markBlockAsCorrupt(Block blk, DatanodeInfo dn)
- throws IOException {
- DatanodeDescriptor node = getDatanode(dn);
- if (node == null) {
- throw new IOException("Cannot mark block" + blk.getBlockName() +
- " as corrupt because datanode " + dn.getName() +
- " does not exist. ");
- }
-
- final BlockInfo storedBlockInfo = blocksMap.getStoredBlock(blk);
- if (storedBlockInfo == null) {
- // Check if the replica is in the blockMap, if not
- // ignore the request for now. This could happen when BlockScanner
- // thread of Datanode reports bad block before Block reports are sent
- // by the Datanode on startup
- NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
- "block " + blk + " could not be marked " +
- "as corrupt as it does not exists in " +
- "blocksMap");
- } else {
- INodeFile inode = storedBlockInfo.getINode();
- if (inode == null) {
- NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
- "block " + blk + " could not be marked " +
- "as corrupt as it does not belong to " +
- "any file");
- addToInvalidates(storedBlockInfo, node);
- return;
- }
- // Add this replica to corruptReplicas Map
- corruptReplicas.addToCorruptReplicasMap(storedBlockInfo, node);
- if (countNodes(storedBlockInfo).liveReplicas()>inode.getReplication()) {
- // the block is over-replicated so invalidate the replicas immediately
- invalidateBlock(storedBlockInfo, node);
- } else {
- // add the block to neededReplication
- updateNeededReplications(storedBlockInfo, -1, 0);
- }
- }
- }
- /**
- * Invalidates the given block on the given datanode.
- */
- public synchronized void invalidateBlock(Block blk, DatanodeInfo dn)
- throws IOException {
- NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
- + blk + " on "
- + dn.getName());
- DatanodeDescriptor node = getDatanode(dn);
- if (node == null) {
- throw new IOException("Cannot invalidate block " + blk +
- " because datanode " + dn.getName() +
- " does not exist.");
- }
- // Check how many copies we have of the block. If we have at least one
- // copy on a live node, then we can delete it.
- int count = countNodes(blk).liveReplicas();
- if (count > 1) {
- addToInvalidates(blk, dn);
- removeStoredBlock(blk, node);
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
- + blk + " on "
- + dn.getName() + " listed for deletion.");
- } else {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
- + blk + " on "
- + dn.getName() + " is the only copy and was not deleted.");
- }
- }
- ////////////////////////////////////////////////////////////////
- // Here's how to handle block-copy failure during client write:
- // -- As usual, the client's write should result in a streaming
- // backup write to a k-machine sequence.
- // -- If one of the backup machines fails, no worries. Fail silently.
- // -- Before client is allowed to close and finalize file, make sure
- // that the blocks are backed up. Namenode may have to issue specific backup
- // commands to make up for earlier datanode failures. Once all copies
- // are made, edit namespace and return to client.
- ////////////////////////////////////////////////////////////////
- /** Change the indicated filename. */
- public boolean renameTo(String src, String dst) throws IOException {
- boolean status = renameToInternal(src, dst);
- getEditLog().logSync();
- if (status && auditLog.isInfoEnabled()) {
- final FileStatus stat = dir.getFileInfo(dst);
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "rename", src, dst, stat);
- }
- return status;
- }
- private synchronized boolean renameToInternal(String src, String dst
- ) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst);
- if (isInSafeMode())
- throw new SafeModeException("Cannot rename " + src, safeMode);
- if (!DFSUtil.isValidName(dst)) {
- throw new IOException("Invalid name: " + dst);
- }
- if (isPermissionEnabled) {
- //We should not be doing this. This is move() not renameTo().
- //but for now,
- String actualdst = dir.isDir(dst)?
- dst + Path.SEPARATOR + new Path(src).getName(): dst;
- checkParentAccess(src, FsAction.WRITE);
- checkAncestorAccess(actualdst, FsAction.WRITE);
- }
- FileStatus dinfo = dir.getFileInfo(dst);
- if (dir.renameTo(src, dst)) {
- changeLease(src, dst, dinfo); // update lease with new filename
- return true;
- }
- return false;
- }
- /**
- * Remove the indicated filename from namespace. If the filename
- * is a directory (non empty) and recursive is set to false then throw exception.
- */
- public boolean delete(String src, boolean recursive) throws IOException {
- if ((!recursive) && (!dir.isDirEmpty(src))) {
- throw new IOException(src + " is non empty");
- }
- boolean status = deleteInternal(src, true);
- getEditLog().logSync();
- if (status && auditLog.isInfoEnabled()) {
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "delete", src, null, null);
- }
- return status;
- }
-
- /**
- * Remove the indicated filename from the namespace. This may
- * invalidate some blocks that make up the file.
- */
- synchronized boolean deleteInternal(String src,
- boolean enforcePermission) throws IOException {
- if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
- }
- if (isInSafeMode())
- throw new SafeModeException("Cannot delete " + src, safeMode);
- if (enforcePermission && isPermissionEnabled) {
- checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
- }
- return dir.delete(src) != null;
- }
- void removePathAndBlocks(String src, List<Block> blocks) throws IOException {
- leaseManager.removeLeaseWithPrefixPath(src);
- for(Block b : blocks) {
- blocksMap.removeINode(b);
- corruptReplicas.removeFromCorruptReplicasMap(b);
- addToInvalidates(b);
- }
- }
- /** Get the file info for a specific file.
- * @param src The string representation of the path to the file
- * @throws IOException if permission to access file is denied by the system
- * @return object containing information regarding the file
- * or null if file not found
- */
- FileStatus getFileInfo(String src) throws IOException {
- if (isPermissionEnabled) {
- checkTraverse(src);
- }
- return dir.getFileInfo(src);
- }
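- // Illustrative usage (not from this file): a null return means the path
- // does not exist, e.g.
- //   FileStatus stat = getFileInfo("/user/alice/data.txt");
- //   if (stat == null) { /* file not found */ }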
- /**
- * Create all the necessary directories
- */
- public boolean mkdirs(String src, PermissionStatus permissions
- ) throws IOException {
- boolean status = mkdirsInternal(src, permissions);
- getEditLog().logSync();
- if (status && auditLog.isInfoEnabled()) {
- final FileStatus stat = dir.getFileInfo(src);
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "mkdirs", src, null, stat);
- }
- return status;
- }
-
- /**
- * Create all the necessary directories
- */
- private synchronized boolean mkdirsInternal(String src,
- PermissionStatus permissions) throws IOException {
- NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src);
- if (isPermissionEnabled) {
- checkTraverse(src);
- }
- if (dir.isDir(src)) {
- // all users of mkdirs() expect 'true' even if
- // a new directory is not created.
- return true;
- }
- if (isInSafeMode())
- throw new SafeModeException("Cannot create directory " + src, safeMode);
- if (!DFSUtil.isValidName(src)) {
- throw new IOException("Invalid directory name: " + src);
- }
- if (isPermissionEnabled) {
- checkAncestorAccess(src, FsAction.WRITE);
- }
- // validate that we have enough inodes. This is, at best, a
- // heuristic because the mkdirs() operation might need to
- // create multiple inodes.
- checkFsObjectLimit();
- if (!dir.mkdirs(src, permissions, false, now())) {
- throw new IOException("Invalid directory name: " + src);
- }
- return true;
- }
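- // Worked example of the heuristic above: mkdirs("/a/b/c") on an empty
- // namespace may create up to three new inodes (/a, /a/b and /a/b/c), but
- // checkFsObjectLimit() is called only once before any of them are created,
- // so the configured object limit can still be exceeded slightly.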
- ContentSummary getContentSummary(String src) throws IOException {
- if (isPermissionEnabled) {
- checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
- }
- return dir.getContentSummary(src);
- }
- /**
- * Set the namespace quota and diskspace quota for a directory.
- * See {@link ClientProtocol#setQuota(String, long, long)} for the
- * contract.
- */
- void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
- if (isPermissionEnabled) {
- checkSuperuserPrivilege();
- }
-
- dir.setQuota(path, nsQuota, dsQuota);
- getEditLog().logSync();
- }
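- // Illustrative usage (not from this file): setQuota("/user/alice", 10000,
- // 10L * 1024 * 1024 * 1024) would limit the directory to 10,000 namespace
- // objects and 10 GB of diskspace, per the ClientProtocol#setQuota contract
- // referenced above.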
-
- /** Persist all metadata about this file.
- * @param src The string representation of the path
- * @param clientName The string representation of the client
- * @throws IOException if path does not exist
- */
- void fsync(String src, String clientName) throws IOException {
- NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
- + src + " for " + clientName);
- synchronized (this) {
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot fsync file " + src, safeMode);
- }
- INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
- dir.persistBlocks(src, pendingFile);
- }
- }
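- // Note: checkLease(src, clientName) above ensures that only the current
- // lease holder can fsync the file; the block list of the file under
- // construction is then persisted through the FSDirectory.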
- /**
- * Move a file that is being written to be immutable.
- * @param src The filename
- * @param lease The lease for the client creating the file
- */
- void internalReleaseLease(Lease lease, String src) throws IOException {
- LOG.info("Recovering lease=" + lease + ", src=" + src);
- INodeFile iFile = dir.getFileINode(src);
- if (iFile == null) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
- + "attempt to release a create lock on "
- + src + " but the file does not exist.";
- NameNode.stateChangeLog.warn(message);
- throw new IOException(message);
- }
- if (!iFile.isUnderConstruction()) {
- final String message = "DIR* NameSystem.internalReleaseCreate: "
- + "attempt to release a create lock on "
- + src + " but file is already closed.";
- NameNode.stateChangeLog.warn(message);
- throw new IOException(message);
- }
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) iFile;
- // Initialize lease recovery for pendingFile. If there are no blocks
- // associated with this file, then reap lease immediately. Otherwise
- // renew the lease and trigger lease recovery.
- if (pendingFile.getTargets() == null ||
- pendingFile.getTargets().length == 0) {
- if (pendingFile.getBlocks().length == 0) {
- finalizeINodeFileUnderConstruction(src, pendingFile);
- NameNode.stateChangeLog.warn("BLOCK*"
- + " internalReleaseLease: No blocks found, lease removed.");
- return;
- }
- // set up the INode's targets for the last block from the blocksMap
- //
- Block[] blocks = pendingFile.getBlocks();
- Block last = blocks[blocks.length-1];
- DatanodeDescriptor[] targets =
- new DatanodeDescriptor[blocksMap.numNodes(last)];
- Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(last);
- for (int i = 0; it != null && it.hasNext(); i++) {
- targets[i] = it.next();
- }
- pendingFile.setTargets(targets);
- }
- // start lease recovery of the last block for this file.
- pendingFile.assignPrimaryDatanode();
- leaseManager.renewLease(lease);
- }
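- // Summary of the recovery paths above: a pending file with no blocks is
- // finalized immediately and its lease dropped; otherwise the replicas of the
- // last block (taken from the blocksMap) become the recovery targets, a
- // primary datanode is assigned to run block recovery, and the lease is
- // renewed so it does not expire while recovery is in progress.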
- private void finalizeINodeFileUnderConstruction(String src,
- INodeFileUnderConstruction pendingFile) throws IOException {
- leaseManager.removeLease(pendingFile.clientName, src);
- // The file is no longer pending.
- // Create permanent INode, update blockmap
- INodeFile newFile = pendingFile.convertToInodeFile();
- dir.replaceNode(src, pendingFile, newFile);
- // close file and persist block allocations for this file
- dir.closeFile(src, newFile);
- checkReplicationFactor(newFile);
- }
- synchronized void commitBlockSynchronization(Block lastblock,
- long newgenerationstamp, long newlength,
- boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
- ) throws IOException {
- LOG.info("commitBlockSynchronization(lastblock=" + lastblock
- + ", newgenerationstamp=" + newgenerationstamp
- + ", newlength=" + newlength
- + ", newtargets=" + Arrays.asList(newtargets)
- + ", closeFile=" + closeFile
- + ", deleteBlock=" + deleteblock
- + ")");
- final BlockInfo oldblockinfo = blocksMap.getStoredBlock(lastblock);
- if (oldblockinfo == null) {
- throw new IOException("Block (=" + lastblock + ") not found");
- }
- INodeFile iFile = oldblockinfo.getINode();
- if (!iFile.isUnderConstruction()) {
- throw new IOException("Unexpected block (=" + lastblock
- + ") since the file (=" + iFile.getLocalName()
- + ") is not under construction");
- }
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)iFile;
- // Remove old block from blocks map. This always has to be done
- // because the generation stamp of this block is changing.
- blocksMap.removeBlock(oldblockinfo);
- if (deleteblock) {
- pendingFile.removeBlock(lastblock);
- }
- else {
- // update last block, construct newblockinfo and add it to the blocks map
- lastblock.set(lastblock.getBlockId(), newlength, newgenerationstamp);
- final BlockInfo newblockinfo = blocksMap.addINode(lastblock, pendingFile);
- // find the DatanodeDescriptor objects
- // There should be no locations in the blocksMap until now because the
- // file is under construction
- DatanodeDescriptor[] descriptors = null;
- if (newtargets.length > 0) {
- descriptors = new DatanodeDescriptor[newtargets.length];
- for(int i = 0; i < newtargets.length; i++) {
- descriptors[i] = getDatanode(newtargets[i]);
- }
- }
- if (closeFile) {
- // the file is getting closed. Insert block locations into blocksMap.
- // Otherwise fsck will report these blocks as MISSING, especially if the
- // blocksReceived from Datanodes take a long time to arrive.
- for (int i = 0; i < descriptors.length; i++) {
- descriptors[i].addBlock(newblockinfo);
- }
- pendingFile.setLastBlock(newblockinfo, null);
- } else {
- // add locations into the INodeUnderConstruction
- pendingFile.setLastBlock(newblockinfo, descriptors);
- }
- }
- // If this commit does not want to close the file, just persist
- // blocks and return
- String src = leaseManager.findPath(pendingFile);
- if (!closeFile) {
- dir.persistBlocks(src, pendingFile);
- getEditLog().logSync();
- LOG.info("commitBlockSynchronization(" + lastblock + ") successful");
- return;
- }
-
- //remove lease, close file
- finalizeINodeFileUnderConstruction(src, pendingFile);
- getEditLog().logSync();
- LOG.info("commitBlockSynchronization(newblock=" + lastblock
- + ", file=" + src
- + ", newgenerationstamp=" + newgenerationstamp
- + ", newlength=" + newlength
- + ", newtargets=" + Arrays.asList(newtargets) + ") successful");
- }
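- // Summary of the two outcomes above: with closeFile == false only the new
- // block list of the file under construction is persisted to the edit log;
- // with closeFile == true the reported locations are added to the blocksMap,
- // the lease is removed and the file is finalized (closed) in the namespace.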
- /**
- * Renew the lease(s) held by the given client
- */
- void renewLease(String holder) throws IOException {
- if (isInSafeMode())
- throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
- leaseManager.renewLease(holder);
- }
- /**
- * Get a listing of all files at 'src'. Returns an array of
- * FileStatus objects, one per entry, carrying the file attributes.
- */
- public FileStatus[] getListing(String src) throws IOException {
- if (isPermissionEnabled) {
- if (dir.isDir(src)) {
- checkPathAccess(src, FsAction.READ_EXECUTE);
- }
- else {
- checkTraverse(src);
- }
- }
- if (auditLog.isInfoEnabled()) {
- logAuditEvent(UserGroupInformation.getCurrentUGI(),
- Server.getRemoteIp(),
- "listStatus", src, null, null);
- }
- return dir.getListing(src);
- }
- /////////////////////////////////////////////////////////
- //
- // These methods are called by datanodes
- //
- /////////////////////////////////////////////////////////
- /**
- * Register Datanode.
- * <p>
- * The purpose of registration is to identify whether the new datanode
- * serves a new data storage, and will therefore report data block copies
- * which the namenode was not yet aware of; or whether the datanode is a
- * replacement node for a data storage that was previously served by a
- * different or the same (in terms of host:port) datanode.
- * The data storages are distinguished by their storageIDs. When a new
- * data storage is reported the namenode issues a new unique storageID.
- * <p>
- * Finally, the namenode returns its namespaceID as the registrationID
- * for the datanodes.
- * namespaceID is a persistent attribute of the name space.
- * The registrationID is checked every time the datanode communicates
- * with the namenode.
- * Datanodes with an inappropriate registrationID are rejected.
- * If the namenode stops and then restarts, it can restore its
- * namespaceID and will continue serving the datanodes that have previously
- * registered with it, without restarting the whole cluster.
- *
- * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
- */
- public synchronized void registerDatanode(DatanodeRegistration nodeReg
- ) throws IOException {
- String dnAddress = Server.getRemoteAddress();
- if (dnAddress == null) {
- // Mostly called inside an RPC.
- // But if not, use address passed by the data-node.
- dnAddress = nodeReg.getHost();
- }
- // check if the datanode is allowed to connect to the namenode
- if (!verifyNodeRegistration(nodeReg, dnAddress)) {
- throw new DisallowedDatanodeException(nodeReg);
- }
- String hostName = nodeReg.getHost();
-
- // update the datanode's name with ip:port
- DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
- nodeReg.getStorageID(),
- nodeReg.getInfoPort(),
- nodeReg.getIpcPort());
- nodeReg.updateRegInfo(dnReg);
-
- NameNode.stateChangeLog.info(
- "BLOCK* NameSystem.registerDatanode: "
- + "node registration from " + nodeReg.getName()
- + " storage " + nodeReg.getStorageID());
- DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
- DatanodeDescriptor nodeN = host2DataNodeMap.getDatanodeByName(nodeReg.getName());
-
- if (nodeN != null && nodeN != nodeS) {
- NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
- + "node from name: " + nodeN.getName());
- // nodeN previously served a different data storage,
- // which is not served by anybody anymore.
- removeDatanode(nodeN);
- // physically remove node from datanodeMap
- wipeDatanode(nodeN);
- nodeN = null;
- }
- if (nodeS != null) {
- if (nodeN == nodeS) {
- // The same datanode has just been restarted to serve the same data
- // storage. We do not need to remove old data blocks, the delta will
- // be calculated on the next block report from the datanode
- NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
- + "node restarted.");
- } else {
- // nodeS is found
- /* The registering datanode is a replacement node for the existing
- data storage, which from now on will be served by a new node.
- If this message repeats, both nodes might have the same storageID
- by (insanely rare) random chance. The user needs to restart one of the
- nodes with its data cleared (or the user can just remove the StorageID
- value in the "VERSION" file under the data directory of the datanode,
- but this might not work if the VERSION file format has changed).
- */
- NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
- + "node " + nodeS.getName()
- + " is replaced by " + nodeReg.getName() +
- " with the same storageID " +
- nodeReg.getStorageID());
- }
- // update cluster map
- clusterMap.remove(nodeS);
- nodeS.updateRegInfo(nodeReg);
- nodeS.setHostName(hostName);
-
- // resolve network location
- resolveNetworkLocation(nodeS);
- clusterMap.add(nodeS);
-
- // also treat the registration message as a heartbeat
- synchronized(heartbeats) {
- if (!heartbeats.contains(nodeS)) {
- heartbeats.add(nodeS);
- //update its timestamp
- nodeS.updateHeartbeat(0L, 0L, 0L, 0);
- nodeS.isAlive = true;
- }
- }
- return;
- }
- // this is a new datanode serving a new data storage
- if (nodeReg.getStorageID().equals("")) {
- // this data storage has never been registered
- // it is either empty or was created by a pre-storageID version of DFS
- nodeReg.storageID = newStorageID();
- NameNode.stateChangeLog.debug(
- "BLOCK* NameSystem.registerDatanode: "
- + "new storageID " + nodeReg.getStorageID() + " assigned.");
- }
- // register new datanode
- DatanodeDescriptor nodeDescr
- = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
- resolveNetworkLocation(nodeDescr);
- unprotectedAddDatanode(nodeDescr);
- clusterMap.add(nodeDescr);
-
- // also treat the registration message as a heartbeat
- synchronized(heartbeats) {
- heartbeats.add(nodeDescr);
- nodeDescr.isAlive = true;
- // no need to update its timestamp
- // because it is done when the descriptor is created
- }
- return;
- }
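- // The registration logic above distinguishes three cases: (1) the same node
- // re-registering for the same storage (treated as a restart; the delta is
- // resolved by the next block report), (2) a different node taking over an
- // existing storageID (the old descriptor is replaced and re-added to the
- // cluster map), and (3) a brand-new storage, which is assigned a fresh
- // storageID and a new DatanodeDescriptor. In all cases the registration
- // message also counts as a heartbeat.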
-
- /* Resolve a node's network location */
- private void resolveNetworkLocation (DatanodeDescriptor node) {
- List<String> names = new ArrayList<String>(1);
- if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
- // get the node's IP address
- names.add(node.getHost());
- } else {
- // get the node's host name
- String hostName = node.getHostName();
- int colon = hostName.indexOf(":");
- hostName = (colon==-1)?hostName:hostName.substring(0,colon);
- names.add(hostName);
- }
-
- // resolve its network location
- List<String> rName = dnsToSwitchMapping.resolve(names);
- String networkLocation;
- if (rName == null) {
- LOG.error("The resolve call returned null! Using " +
- NetworkTopology.DEFAULT_RACK + " for host " + names);
- networkLocation = NetworkTopology.DEFAULT_RACK;
- } else {
- networkLocation = rName.get(0);
- }
- node.setNetworkLocation(networkLocation);
- }
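- // Illustrative example (not from this file): with a ScriptBasedMapping
- // configured through the topology script setting,
- // dnsToSwitchMapping.resolve(Arrays.asList("10.1.2.3")) might return
- // ["/dc1/rack7"], which becomes the node's network location; a null result
- // falls back to NetworkTopology.DEFAULT_RACK as handled above.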
-
- /**
- * Get registrationID for datanodes based on the namespaceID.
- *
- * @see #registerDatanode(DatanodeRegistration)
- * @see FSImage#newNamespaceID()
- * @return registration ID
- */
- public String getRegistrationID() {
- return Storage.getRegistrationID(dir.fsImage);
- }
-
- /**
- * Generate new storage ID.
- *
- * @return unique storage ID
- *
- * Note that collisions are still possible if somebody tries
- * to bring in a data storage from a different cluster.
- */
- private String newStorageID() {
- String newID = null;
- while(newID == null) {
- newID = "DS" + Integer.toString(r.nextInt());
- if (datanodeMap.get(newID) != null)
- newID = null;
- }
- return newID;
- }
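- // Illustrative example (not from this file): the generated IDs look like
- // "DS1545082883" or "DS-204817305" (the random int may be negative); the
- // loop above retries until the ID is not already present in datanodeMap.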
-
- private boolean isDatanodeDead(DatanodeDescriptor node) {
- return (node.getLastUpdate() <
- (now() - heartbeatExpireInterval));
- }
-
- private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
- node.setLastUpdate(0);
- }
- /**
- * The given node has reported in. This method should:
- * 1) Record the heartbeat, so the datanode isn't timed out
- * 2) Adjust usage stats for future block allocation
- *
- * If a substantial amount of time has passed since the last datanode
- * heartbeat, then request an immediate block report.
- *
- * @return an array of datanode commands
- * @throws IOException
- */
- DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
- long capacity, long dfsUsed, long remaining,
- int xceiverCount, int xmitsInProgress) throws IOException {
- DatanodeCommand cmd = null;
- synchronized (heartbeats) {
- synchronized (datanodeMap) {
- DatanodeDescriptor nodeinfo = null;
- try {
- nodeinfo = getDatanode(nodeReg);
- } catch(UnregisteredDatanodeException e) {
- return new DatanodeCommand[]{DatanodeCommand.REGISTER};
- }
-
- // Check if this datanode should actually be shutdown instead.
- if (nodeinfo != null && shouldNodeShutdown(nodeinfo)) {
- setDatanodeDead(nodeinfo);
- throw new DisallowedDatanodeException(nodeinfo);
- }
- if (nodeinfo == null || !nodeinfo.isAlive) {
- return new DatanodeCommand[]{DatanodeCommand.REGISTER};
- }
- updateStats(nodeinfo, false);
- nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
- updateStats(nodeinfo, true);
-
- //check lease recovery
- cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
- if (cmd != null) {
- return new DatanodeCommand[] {cmd};
- }
-
- ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
- //check pending replication
- cmd = nodeinfo.getReplicationCommand(
- maxReplicationStreams - xmitsInProgress);
- if (cmd != null) {
- cmds.add(cmd);
- }
- //check block invalidation
- cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
- if (cmd != null) {
- cmds.add(cmd);
- }
- if (!cmds.isEmpty()) {
- return cmds.toArray(new DatanodeCommand[cmds.size()]);
- }
- }
- }
- //check distributed upgrade
- cmd = getDistributedUpgradeCommand();
- if (cmd != null) {
- return new DatanodeCommand[] {cmd};
- }
- return null;
- }
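- // Command priority implemented above: an unknown or dead node is told to
- // re-register; otherwise lease-recovery commands take precedence, then
- // replication and block-invalidation commands are batched together, and only
- // if none of those exist is a distributed-upgrade command returned.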
- private void updateStats(DatanodeDescriptor node, boolean isAdded) {
- //
- // The statistics are protected by the heartbeat lock
- //
- assert(Thread.holdsLock(heartbeats));
- if (isAdded) {
- capacityTotal += node.getCapacity();
- capacityUsed += node.getDfsUsed();
- capacityRemaining += node.getRemaining();
- totalLoad += node.getXceiverCount();
- } else {
- capacityTotal -= node.getCapacity();
- capacityUsed -= node.getDfsUsed();
- capacityRemaining -= node.getRemaining();
- totalLoad -= node.getXceiverCount();
- }
- }
- /**
- * Periodically calls heartbeatCheck().
- */
- class HeartbeatMonitor implements Runnable {
- /**
- */
- public void run() {
- while (fsRunning) {
- try {
- heartbeatCheck();
- } catch (Exception e) {
- FSNamesystem.LOG.error(StringUtils.stringifyException(e));
- }
- try {
- Thread.sleep(heartbeatRecheckInterval);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
- /**
- * Periodically calls computeReplicationWork().
- */
- class ReplicationMonitor implements Runnable {
- static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
- static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
- public void run() {
- while (fsRunning) {
- try {
- computeDatanodeWork();
- processPendingReplications();
- Thread.sleep(replicationRecheckInterval);
- } catch (InterruptedException ie) {
- LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
- break;
- } catch (IOException ie) {
- LOG.warn("ReplicationMonitor thread received exception. " + ie);
- } catch (Throwable t) {
- LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
- Runtime.getRuntime().exit(-1);
- }
- }
- }
- }
- /////////////////////////////////////////////////////////
- //
- // These methods are called by the Namenode system, to see
- // if there is any work for registered datanodes.
- //
- /////////////////////////////////////////////////////////
- /**
- * Compute block replication and block invalidation work
- * that can be scheduled on data-nodes.
- * The datanode will be informed of this work at the next heartbeat.
- *
- * @return number of blocks scheduled for replication or removal.
- */
- public int computeDatanodeWork() throws IOException {
- int workFound = 0;
- int blocksToProcess = 0;
- int nodesToProcess = 0;
- // blocks should not be replicated or removed if safe mode is on
- if (isInSafeMode())
- return workFound;
- synchronized(heartbeats) {
- blocksToProcess = (int)(heartbeats.size()
- * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
- nodesToProcess = (int)Math.ceil((double)heartbeats.size()
- * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
- }
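- // Worked example of the sizing above: with 200 live datanodes,
- // blocksToProcess = 200 * 2 = 400 under-replicated blocks and
- // nodesToProcess = ceil(200 * 32 / 100) = 64 nodes are considered
- // for invalidation work in this iteration.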
- workFound = computeReplicationWork(blocksToProcess);
-
- // Update FSNamesystemMetrics counters
- synchronized (this) {
- pendingReplicationBlocksCount = pendingReplications.size();
- underReplicatedBlocksCount = neededReplications.size();
- scheduledReplicationBlocksCount = workFound;
- corruptReplicaBlocksCount = corruptReplicas.size();
- }
-
- workFound += computeInvalidateWork(nodesToProcess);