DataNode.java
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.hdfs.server.datanode;
- import java.io.BufferedOutputStream;
- import java.io.DataOutputStream;
- import java.io.File;
- import java.io.IOException;
- import java.io.OutputStream;
- import java.net.InetSocketAddress;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.net.SocketTimeoutException;
- import java.net.UnknownHostException;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.security.NoSuchAlgorithmException;
- import java.security.SecureRandom;
- import java.util.AbstractList;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.HashMap;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- import java.util.Random;
- import java.util.concurrent.atomic.AtomicInteger;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.hdfs.HDFSPolicyProvider;
- import org.apache.hadoop.hdfs.protocol.Block;
- import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
- import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
- import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
- import org.apache.hadoop.hdfs.protocol.DatanodeID;
- import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
- import org.apache.hadoop.hdfs.protocol.FSConstants;
- import org.apache.hadoop.hdfs.protocol.LocatedBlock;
- import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
- import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
- import org.apache.hadoop.hdfs.server.common.HdfsConstants;
- import org.apache.hadoop.hdfs.server.common.GenerationStamp;
- import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
- import org.apache.hadoop.hdfs.server.common.Storage;
- import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
- import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
- import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
- import org.apache.hadoop.hdfs.server.namenode.NameNode;
- import org.apache.hadoop.hdfs.server.namenode.StreamFile;
- import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
- import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
- import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
- import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
- import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
- import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
- import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
- import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
- import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
- import org.apache.hadoop.http.HttpServer;
- import org.apache.hadoop.io.IOUtils;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.ipc.RPC;
- import org.apache.hadoop.ipc.RemoteException;
- import org.apache.hadoop.ipc.Server;
- import org.apache.hadoop.net.DNS;
- import org.apache.hadoop.net.NetUtils;
- import org.apache.hadoop.security.SecurityUtil;
- import org.apache.hadoop.security.authorize.ConfiguredPolicy;
- import org.apache.hadoop.security.authorize.PolicyProvider;
- import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
- import org.apache.hadoop.util.Daemon;
- import org.apache.hadoop.util.DiskChecker;
- import org.apache.hadoop.util.ReflectionUtils;
- import org.apache.hadoop.util.StringUtils;
- import org.apache.hadoop.util.DiskChecker.DiskErrorException;
- import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
- /**********************************************************
- * DataNode is a class (and program) that stores a set of
- * blocks for a DFS deployment. A single deployment can
- * have one or many DataNodes. Each DataNode communicates
- * regularly with a single NameNode. It also communicates
- * with client code and other DataNodes from time to time.
- *
- * DataNodes store a series of named blocks. The DataNode
- * allows client code to read these blocks, or to write new
- * block data. The DataNode may also, in response to instructions
- * from its NameNode, delete blocks or copy blocks to/from other
- * DataNodes.
- *
- * The DataNode maintains just one critical table:
- * block-> stream of bytes (of BLOCK_SIZE or less)
- *
- * This info is stored on a local disk. The DataNode
- * reports the table's contents to the NameNode upon startup
- * and every so often afterwards.
- *
- * DataNodes spend their lives in an endless loop of asking
- * the NameNode for something to do. A NameNode cannot connect
- * to a DataNode directly; a NameNode simply returns values from
- * functions invoked by a DataNode.
- *
- * DataNodes maintain an open server socket so that client code
- * or other DataNodes can read/write data. The host/port for
- * this server is reported to the NameNode, which then sends that
- * information to clients or other DataNodes that might be interested.
- *
- **********************************************************/
- public class DataNode extends Configured
- implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
- public static final Log LOG = LogFactory.getLog(DataNode.class);
-
- static{
- Configuration.addDefaultResource("hdfs-default.xml");
- Configuration.addDefaultResource("hdfs-site.xml");
- }
- public static final String DN_CLIENTTRACE_FORMAT =
- "src: %s" + // src IP
- ", dest: %s" + // dst IP
- ", bytes: %s" + // byte count
- ", op: %s" + // operation
- ", cliID: %s" + // DFSClient id
- ", srvID: %s" + // DatanodeRegistration
- ", blockid: %s"; // block id
- static final Log ClientTraceLog =
- LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
- /**
- * Use {@link NetUtils#createSocketAddr(String)} instead.
- */
- @Deprecated
- public static InetSocketAddress createSocketAddr(String target
- ) throws IOException {
- return NetUtils.createSocketAddr(target);
- }
-
- public DatanodeProtocol namenode = null;
- public FSDatasetInterface data = null;
- public DatanodeRegistration dnRegistration = null;
- volatile boolean shouldRun = true;
- private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
- /** list of blocks being recovered */
- private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
- private LinkedList<String> delHints = new LinkedList<String>();
- public final static String EMPTY_DEL_HINT = "";
- AtomicInteger xmitsInProgress = new AtomicInteger();
- Daemon dataXceiverServer = null;
- ThreadGroup threadGroup = null;
- long blockReportInterval;
- //disallow the sending of BR before instructed to do so
- long lastBlockReport = 0;
- boolean resetBlockReportTime = true;
- long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
- long lastHeartbeat = 0;
- long heartBeatInterval;
- private DataStorage storage = null;
- private HttpServer infoServer = null;
- DataNodeMetrics myMetrics;
- private static InetSocketAddress nameNodeAddr;
- private InetSocketAddress selfAddr;
- private static DataNode datanodeObject = null;
- private Thread dataNodeThread = null;
- String machineName;
- private static String dnThreadName;
- int socketTimeout;
- int socketWriteTimeout = 0;
- boolean transferToAllowed = true;
- int writePacketSize = 0;
-
- public DataBlockScanner blockScanner = null;
- public Daemon blockScannerThread = null;
-
- private static final Random R = new Random();
-
- // For InterDataNodeProtocol
- public Server ipcServer;
- /**
- * Current system time.
- * @return current time in msec.
- */
- static long now() {
- return System.currentTimeMillis();
- }
- /**
- * Create the DataNode given a configuration and an array of dataDirs.
- * 'dataDirs' is where the blocks are stored.
- */
- DataNode(Configuration conf,
- AbstractList<File> dataDirs) throws IOException {
- super(conf);
- datanodeObject = this;
- try {
- startDataNode(conf, dataDirs);
- } catch (IOException ie) {
- shutdown();
- throw ie;
- }
- }
-
-
- /**
- * This method starts the data node with the specified conf.
- *
- * @param conf - the configuration
- * if conf's CONFIG_PROPERTY_SIMULATED property is set
- * then a simulated storage based data node is created.
- *
- * @param dataDirs - only for a non-simulated storage data node
- * @throws IOException
- */
- void startDataNode(Configuration conf,
- AbstractList<File> dataDirs
- ) throws IOException {
- // use configured nameserver & interface to get local hostname
- if (conf.get("slave.host.name") != null) {
- machineName = conf.get("slave.host.name");
- }
- if (machineName == null) {
- machineName = DNS.getDefaultHost(
- conf.get("dfs.datanode.dns.interface","default"),
- conf.get("dfs.datanode.dns.nameserver","default"));
- }
- InetSocketAddress nameNodeAddr = NameNode.getAddress(conf);
-
- this.socketTimeout = conf.getInt("dfs.socket.timeout",
- HdfsConstants.READ_TIMEOUT);
- this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
- HdfsConstants.WRITE_TIMEOUT);
- /* Based on results on different platforms, we might need to set the default
- * to false on some of them. */
- this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed",
- true);
- this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
- String address =
- NetUtils.getServerAddress(conf,
- "dfs.datanode.bindAddress",
- "dfs.datanode.port",
- "dfs.datanode.address");
- InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
- int tmpPort = socAddr.getPort();
- storage = new DataStorage();
- // construct registration
- this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
- // connect to name node
- this.namenode = (DatanodeProtocol)
- RPC.waitForProxy(DatanodeProtocol.class,
- DatanodeProtocol.versionID,
- nameNodeAddr,
- conf);
- // get version and id info from the name-node
- NamespaceInfo nsInfo = handshake();
- StartupOption startOpt = getStartupOption(conf);
- assert startOpt != null : "Startup option must be set.";
-
- boolean simulatedFSDataset =
- conf.getBoolean("dfs.datanode.simulateddatastorage", false);
- if (simulatedFSDataset) {
- setNewStorageID(dnRegistration);
- dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
- dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
- // it would have been better to pass storage as a parameter to
- // constructor below - need to augment ReflectionUtils used below.
- conf.set("StorageId", dnRegistration.getStorageID());
- try {
- //Equivalent of following (can't do because Simulated is in test dir)
- // this.data = new SimulatedFSDataset(conf);
- this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
- Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
- } catch (ClassNotFoundException e) {
- throw new IOException(StringUtils.stringifyException(e));
- }
- } else { // real storage
- // read storage info, lock data dirs and transition fs state if necessary
- storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
- // adjust
- this.dnRegistration.setStorageInfo(storage);
- // initialize data node internal structure
- this.data = new FSDataset(storage, conf);
- }
-
- // find free port
- ServerSocket ss = (socketWriteTimeout > 0) ?
- ServerSocketChannel.open().socket() : new ServerSocket();
- Server.bind(ss, socAddr, 0);
- ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
- // adjust machine name with the actual port
- tmpPort = ss.getLocalPort();
- selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
- tmpPort);
- this.dnRegistration.setName(machineName + ":" + tmpPort);
- LOG.info("Opened info server at " + tmpPort);
-
- this.threadGroup = new ThreadGroup("dataXceiverServer");
- this.dataXceiverServer = new Daemon(threadGroup,
- new DataXceiverServer(ss, conf, this));
- this.threadGroup.setDaemon(true); // auto destroy when empty
- this.blockReportInterval =
- conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
- this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay",
- BLOCKREPORT_INITIAL_DELAY)* 1000L;
- if (this.initialBlockReportDelay >= blockReportInterval) {
- this.initialBlockReportDelay = 0;
- LOG.info("dfs.blockreport.initialDelay is greater than " +
- "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
- }
- this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
- DataNode.nameNodeAddr = nameNodeAddr;
- //initialize periodic block scanner
- String reason = null;
- if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
- reason = "verification is turned off by configuration";
- } else if ( !(data instanceof FSDataset) ) {
- reason = "verifcation is supported only with FSDataset";
- }
- if ( reason == null ) {
- blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
- } else {
- LOG.info("Periodic Block Verification is disabled because " +
- reason + ".");
- }
- //create a servlet to serve full-file content
- String infoAddr =
- NetUtils.getServerAddress(conf,
- "dfs.datanode.info.bindAddress",
- "dfs.datanode.info.port",
- "dfs.datanode.http.address");
- InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
- String infoHost = infoSocAddr.getHostName();
- int tmpInfoPort = infoSocAddr.getPort();
- this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
- tmpInfoPort == 0, conf);
- if (conf.getBoolean("dfs.https.enable", false)) {
- boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
- InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
- "dfs.datanode.https.address", infoHost + ":" + 0));
- Configuration sslConf = new Configuration(false);
- sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
- "ssl-server.xml"));
- this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
- }
- this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
- this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
- FileChecksumServlets.GetServlet.class);
- this.infoServer.setAttribute("datanode.blockScanner", blockScanner);
- this.infoServer.addServlet(null, "/blockScannerReport",
- DataBlockScanner.Servlet.class);
- this.infoServer.start();
- // adjust info port
- this.dnRegistration.setInfoPort(this.infoServer.getPort());
- myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
-
- // set service-level authorization security policy
- if (conf.getBoolean(
- ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
- PolicyProvider policyProvider =
- (PolicyProvider)(ReflectionUtils.newInstance(
- conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
- HDFSPolicyProvider.class, PolicyProvider.class),
- conf));
- SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
- }
- //init ipc server
- InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
- conf.get("dfs.datanode.ipc.address"));
- ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),
- conf.getInt("dfs.datanode.handler.count", 3), false, conf);
- ipcServer.start();
- dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
- LOG.info("dnRegistration = " + dnRegistration);
- }
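- // Illustrative sketch, not part of the original source: the configuration keys
- // read by startDataNode() above could be supplied programmatically before the
- // DataNode is constructed. The addresses and values below are assumptions for
- // illustration only (they mirror the common HDFS defaults of this era).
- //
- //   Configuration conf = new Configuration();
- //   conf.set("dfs.datanode.address", "0.0.0.0:50010");      // data transfer socket
- //   conf.set("dfs.datanode.http.address", "0.0.0.0:50075"); // info/HTTP server
- //   conf.set("dfs.datanode.ipc.address", "0.0.0.0:50020");  // datanode IPC server
- //   conf.setInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT);
- //   conf.setLong("dfs.blockreport.intervalMsec", 60 * 60 * 1000L);
- //   conf.setLong("dfs.heartbeat.interval", 3L);             // seconds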
- /**
- * Creates either NIO or regular depending on socketWriteTimeout.
- */
- protected Socket newSocket() throws IOException {
- return (socketWriteTimeout > 0) ?
- SocketChannel.open().socket() : new Socket();
- }
-
- private NamespaceInfo handshake() throws IOException {
- NamespaceInfo nsInfo = new NamespaceInfo();
- while (shouldRun) {
- try {
- nsInfo = namenode.versionRequest();
- break;
- } catch(SocketTimeoutException e) { // namenode is busy
- LOG.info("Problem connecting to server: " + getNameNodeAddr());
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {}
- }
- }
- String errorMsg = null;
- // verify build version
- if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
- errorMsg = "Incompatible build versions: namenode BV = "
- + nsInfo.getBuildVersion() + "; datanode BV = "
- + Storage.getBuildVersion();
- LOG.fatal( errorMsg );
- try {
- namenode.errorReport( dnRegistration,
- DatanodeProtocol.NOTIFY, errorMsg );
- } catch( SocketTimeoutException e ) { // namenode is busy
- LOG.info("Problem connecting to server: " + getNameNodeAddr());
- }
- throw new IOException( errorMsg );
- }
- assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
- "Data-node and name-node layout versions must be the same."
- + "Expected: "+ FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
- return nsInfo;
- }
- /** Return the DataNode object
- *
- */
- public static DataNode getDataNode() {
- return datanodeObject;
- }
- public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
- DatanodeID datanodeid, Configuration conf) throws IOException {
- InetSocketAddress addr = NetUtils.createSocketAddr(
- datanodeid.getHost() + ":" + datanodeid.getIpcPort());
- if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
- InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
- }
- return (InterDatanodeProtocol)RPC.getProxy(InterDatanodeProtocol.class,
- InterDatanodeProtocol.versionID, addr, conf);
- }
- public InetSocketAddress getNameNodeAddr() {
- return nameNodeAddr;
- }
-
- public InetSocketAddress getSelfAddr() {
- return selfAddr;
- }
-
- DataNodeMetrics getMetrics() {
- return myMetrics;
- }
-
- /**
- * Return the namenode's identifier
- */
- public String getNamenode() {
- //return namenode.toString();
- return "<namenode>";
- }
- public static void setNewStorageID(DatanodeRegistration dnReg) {
- /* Return
- * "DS-randInt-ipaddr-currentTimeMillis"
- * It is considered extremely rare for all these numbers to match
- * on a different machine accidentally for the following
- * a) SecureRandom(INT_MAX) is pretty much random (1 in 2 billion), and
- * b) Good chance ip address would be different, and
- * c) Even on the same machine, Datanode is designed to use different ports.
- * d) Good chance that these are started at different times.
- * For a conflict to occur, all four of the above have to match!
- * The format of this string can be changed anytime in future without
- * affecting its functionality.
- */
- String ip = "unknownIP";
- try {
- ip = DNS.getDefaultIP("default");
- } catch (UnknownHostException ignored) {
- LOG.warn("Could not find ip address of "default" inteface.");
- }
-
- int rand = 0;
- try {
- rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
- } catch (NoSuchAlgorithmException e) {
- LOG.warn("Could not use SecureRandom");
- rand = R.nextInt(Integer.MAX_VALUE);
- }
- dnReg.storageID = "DS-" + rand + "-"+ ip + "-" + dnReg.getPort() + "-" +
- System.currentTimeMillis();
- }
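- // Illustrative example (hypothetical values): with rand = 1408067107,
- // ip = "192.168.1.15", a data-transfer port of 50010 and a timestamp of
- // 1515369600000L, the generated ID would read
- // "DS-1408067107-192.168.1.15-50010-1515369600000".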
- /**
- * Register datanode
- * <p>
- * The datanode needs to register with the namenode on startup in order
- * 1) to report which storage it is serving now and
- * 2) to receive a registrationID
- * issued by the namenode to recognize registered datanodes.
- *
- * @see FSNamesystem#registerDatanode(DatanodeRegistration)
- * @throws IOException
- */
- private void register() throws IOException {
- if (dnRegistration.getStorageID().equals("")) {
- setNewStorageID(dnRegistration);
- }
- while(shouldRun) {
- try {
- // reset name to machineName. Mainly for web interface.
- dnRegistration.name = machineName + ":" + dnRegistration.getPort();
- dnRegistration = namenode.register(dnRegistration);
- break;
- } catch(SocketTimeoutException e) { // namenode is busy
- LOG.info("Problem connecting to server: " + getNameNodeAddr());
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {}
- }
- }
- assert ("".equals(storage.getStorageID())
- && !"".equals(dnRegistration.getStorageID()))
- || storage.getStorageID().equals(dnRegistration.getStorageID()) :
- "New storageID can be assigned only if data-node is not formatted";
- if (storage.getStorageID().equals("")) {
- storage.setStorageID(dnRegistration.getStorageID());
- storage.writeAll();
- LOG.info("New storage id " + dnRegistration.getStorageID()
- + " is assigned to data-node " + dnRegistration.getName());
- }
- if(! storage.getStorageID().equals(dnRegistration.getStorageID())) {
- throw new IOException("Inconsistent storage IDs. Name-node returned "
- + dnRegistration.getStorageID()
- + ". Expecting " + storage.getStorageID());
- }
-
- // random short delay - helps scatter the BR from all DNs
- scheduleBlockReport(initialBlockReportDelay);
- }
- /**
- * Shut down this instance of the datanode.
- * Returns only after shutdown is complete.
- * This method can only be called by the offerService thread.
- * Otherwise, deadlock might occur.
- */
- public void shutdown() {
- if (infoServer != null) {
- try {
- infoServer.stop();
- } catch (Exception e) {
- LOG.warn("Exception shutting down DataNode", e);
- }
- }
- if (ipcServer != null) {
- ipcServer.stop();
- }
- this.shouldRun = false;
- if (dataXceiverServer != null) {
- ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
- this.dataXceiverServer.interrupt();
- // wait for all data receiver threads to exit
- if (this.threadGroup != null) {
- while (true) {
- this.threadGroup.interrupt();
- LOG.info("Waiting for threadgroup to exit, active threads is " +
- this.threadGroup.activeCount());
- if (this.threadGroup.activeCount() == 0) {
- break;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
- }
- }
- // wait for dataXceiveServer to terminate
- try {
- this.dataXceiverServer.join();
- } catch (InterruptedException ie) {
- }
- }
-
- RPC.stopProxy(namenode); // stop the RPC threads
-
- if(upgradeManager != null)
- upgradeManager.shutdownUpgrade();
- if (blockScannerThread != null) {
- blockScannerThread.interrupt();
- try {
- blockScannerThread.join(3600000L); // wait for at most 1 hour
- } catch (InterruptedException ie) {
- }
- }
- if (storage != null) {
- try {
- this.storage.unlockAll();
- } catch (IOException ie) {
- }
- }
- if (dataNodeThread != null) {
- dataNodeThread.interrupt();
- try {
- dataNodeThread.join();
- } catch (InterruptedException ie) {
- }
- }
- if (data != null) {
- data.shutdown();
- }
- if (myMetrics != null) {
- myMetrics.shutdown();
- }
- }
-
-
- /* Check if there is no space left on the disk or the disk is read-only
- * when IOException occurs.
- * If so, handle the error */
- protected void checkDiskError( IOException e ) throws IOException {
- if (e.getMessage() != null &&
- e.getMessage().startsWith("No space left on device")) {
- throw new DiskOutOfSpaceException("No space left on device");
- } else {
- checkDiskError();
- }
- }
-
- /* Check if there is no disk space and if so, handle the error*/
- protected void checkDiskError( ) throws IOException {
- try {
- data.checkDataDir();
- } catch(DiskErrorException de) {
- handleDiskError(de.getMessage());
- }
- }
-
- private void handleDiskError(String errMsgr) {
- LOG.warn("DataNode is shutting down.n" + errMsgr);
- shouldRun = false;
- try {
- namenode.errorReport(
- dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
- } catch(IOException ignored) {
- }
- }
-
- /** Number of concurrent xceivers per node. */
- int getXceiverCount() {
- return threadGroup == null ? 0 : threadGroup.activeCount();
- }
-
- /**
- * Main loop for the DataNode. Runs until shutdown,
- * forever calling remote NameNode functions.
- */
- public void offerService() throws Exception {
-
- LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" +
- " Initial delay: " + initialBlockReportDelay + "msec");
- //
- // Now loop for a long time....
- //
- while (shouldRun) {
- try {
- long startTime = now();
- //
- // Every so often, send heartbeat or block-report
- //
-
- if (startTime - lastHeartbeat > heartBeatInterval) {
- //
- // All heartbeat messages include following info:
- // -- Datanode name
- // -- data transfer port
- // -- Total capacity
- // -- Bytes remaining
- //
- lastHeartbeat = startTime;
- DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
- data.getCapacity(),
- data.getDfsUsed(),
- data.getRemaining(),
- xmitsInProgress.get(),
- getXceiverCount());
- myMetrics.heartbeats.inc(now() - startTime);
- //LOG.info("Just sent heartbeat, with name " + localName);
- if (!processCommand(cmds))
- continue;
- }
-
- // check if there are newly received blocks
- Block [] blockArray=null;
- String [] delHintArray=null;
- synchronized(receivedBlockList) {
- synchronized(delHints) {
- int numBlocks = receivedBlockList.size();
- if (numBlocks > 0) {
- if(numBlocks!=delHints.size()) {
- LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
- }
- //
- // Send newly-received blockids to namenode
- //
- blockArray = receivedBlockList.toArray(new Block[numBlocks]);
- delHintArray = delHints.toArray(new String[numBlocks]);
- }
- }
- }
- if (blockArray != null) {
- if(delHintArray == null || delHintArray.length != blockArray.length ) {
- LOG.warn("Panic: block array & delHintArray are not the same" );
- }
- namenode.blockReceived(dnRegistration, blockArray, delHintArray);
- synchronized (receivedBlockList) {
- synchronized (delHints) {
- for(int i=0; i<blockArray.length; i++) {
- receivedBlockList.remove(blockArray[i]);
- delHints.remove(delHintArray[i]);
- }
- }
- }
- }
- // send block report
- if (startTime - lastBlockReport > blockReportInterval) {
- //
- // Send latest blockinfo report if timer has expired.
- // Get back a list of local block(s) that are obsolete
- // and can be safely GC'ed.
- //
- long brStartTime = now();
- Block[] bReport = data.getBlockReport();
- DatanodeCommand cmd = namenode.blockReport(dnRegistration,
- BlockListAsLongs.convertToArrayLongs(bReport));
- long brTime = now() - brStartTime;
- myMetrics.blockReports.inc(brTime);
- LOG.info("BlockReport of " + bReport.length +
- " blocks got processed in " + brTime + " msecs");
- //
- // If we have sent the first block report, then wait a random
- // time before we start the periodic block reports.
- //
- if (resetBlockReportTime) {
- lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
- resetBlockReportTime = false;
- } else {
- /* say the last block report was at 8:20:14. The current report
- * should have started around 9:20:14 (default 1 hour interval).
- * If current time is :
- * 1) normal like 9:20:18, next report should be at 10:20:14
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
- */
- lastBlockReport += (now() - lastBlockReport) /
- blockReportInterval * blockReportInterval;
- }
- processCommand(cmd);
- }
- // start block scanner
- if (blockScanner != null && blockScannerThread == null &&
- upgradeManager.isUpgradeCompleted()) {
- LOG.info("Starting Periodic block scanner.");
- blockScannerThread = new Daemon(blockScanner);
- blockScannerThread.start();
- }
-
- //
- // There is no work to do; sleep until heartbeat timer elapses,
- // or work arrives, and then iterate again.
- //
- long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
- synchronized(receivedBlockList) {
- if (waitTime > 0 && receivedBlockList.size() == 0) {
- try {
- receivedBlockList.wait(waitTime);
- } catch (InterruptedException ie) {
- }
- }
- } // synchronized
- } catch(RemoteException re) {
- String reClass = re.getClassName();
- if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
- DisallowedDatanodeException.class.getName().equals(reClass) ||
- IncorrectVersionException.class.getName().equals(reClass)) {
- LOG.warn("DataNode is shutting down: " +
- StringUtils.stringifyException(re));
- shutdown();
- return;
- }
- LOG.warn(StringUtils.stringifyException(re));
- } catch (IOException e) {
- LOG.warn(StringUtils.stringifyException(e));
- }
- } // while (shouldRun)
- } // offerService
- /**
- * Process an array of datanode commands
- *
- * @param cmds an array of datanode commands
- * @return true if further processing may be required or false otherwise.
- */
- private boolean processCommand(DatanodeCommand[] cmds) {
- if (cmds != null) {
- for (DatanodeCommand cmd : cmds) {
- try {
- if (processCommand(cmd) == false) {
- return false;
- }
- } catch (IOException ioe) {
- LOG.warn("Error processing datanode Command", ioe);
- }
- }
- }
- return true;
- }
-
- /**
- *
- * @param cmd
- * @return true if further processing may be required or false otherwise.
- * @throws IOException
- */
- private boolean processCommand(DatanodeCommand cmd) throws IOException {
- if (cmd == null)
- return true;
- final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
- switch(cmd.getAction()) {
- case DatanodeProtocol.DNA_TRANSFER:
- // Send a copy of a block to another datanode
- transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
- myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
- break;
- case DatanodeProtocol.DNA_INVALIDATE:
- //
- // Some local block(s) are obsolete and can be
- // safely garbage-collected.
- //
- Block toDelete[] = bcmd.getBlocks();
- try {
- if (blockScanner != null) {
- blockScanner.deleteBlocks(toDelete);
- }
- data.invalidate(toDelete);
- } catch(IOException e) {
- checkDiskError();
- throw e;
- }
- myMetrics.blocksRemoved.inc(toDelete.length);
- break;
- case DatanodeProtocol.DNA_SHUTDOWN:
- // shut down the data node
- this.shutdown();
- return false;
- case DatanodeProtocol.DNA_REGISTER:
- // namenode requested a registration - at start or if NN lost contact
- LOG.info("DatanodeCommand action: DNA_REGISTER");
- if (shouldRun) {
- register();
- }
- break;
- case DatanodeProtocol.DNA_FINALIZE:
- storage.finalizeUpgrade();
- break;
- case UpgradeCommand.UC_ACTION_START_UPGRADE:
- // start distributed upgrade here
- processDistributedUpgradeCommand((UpgradeCommand)cmd);
- break;
- case DatanodeProtocol.DNA_RECOVERBLOCK:
- recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
- break;
- default:
- LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
- }
- return true;
- }
- // Distributed upgrade manager
- UpgradeManagerDatanode upgradeManager = new UpgradeManagerDatanode(this);
- private void processDistributedUpgradeCommand(UpgradeCommand comm
- ) throws IOException {
- assert upgradeManager != null : "DataNode.upgradeManager is null.";
- upgradeManager.processUpgradeCommand(comm);
- }
- /**
- * Start distributed upgrade if it should be initiated by the data-node.
- */
- private void startDistributedUpgradeIfNeeded() throws IOException {
- UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
- assert um != null : "DataNode.upgradeManager is null.";
- if(!um.getUpgradeState())
- return;
- um.setUpgradeState(false, um.getUpgradeVersion());
- um.startUpgrade();
- return;
- }
- private void transferBlock( Block block,
- DatanodeInfo xferTargets[]
- ) throws IOException {
- if (!data.isValidBlock(block)) {
- // block does not exist or is under-construction
- String errStr = "Can't send invalid block " + block;
- LOG.info(errStr);
- namenode.errorReport(dnRegistration,
- DatanodeProtocol.INVALID_BLOCK,
- errStr);
- return;
- }
- // Check if NN recorded length matches on-disk length
- long onDiskLength = data.getLength(block);
- if (block.getNumBytes() > onDiskLength) {
- // A shorter on-disk length indicates corruption, so report the corrupt block to the NN
- namenode.reportBadBlocks(new LocatedBlock[]{
- new LocatedBlock(block, new DatanodeInfo[] {
- new DatanodeInfo(dnRegistration)})});
- LOG.info("Can't replicate block " + block
- + " because on-disk length " + onDiskLength
- + " is shorter than NameNode recorded length " + block.getNumBytes());
- return;
- }
-
- int numTargets = xferTargets.length;
- if (numTargets > 0) {
- if (LOG.isInfoEnabled()) {
- StringBuilder xfersBuilder = new StringBuilder();
- for (int i = 0; i < numTargets; i++) {
- xfersBuilder.append(xferTargets[i].getName());
- xfersBuilder.append(" ");
- }
- LOG.info(dnRegistration + " Starting thread to transfer block " +
- block + " to " + xfersBuilder);
- }
- new Daemon(new DataTransfer(xferTargets, block, this)).start();
- }
- }
- private void transferBlocks( Block blocks[],
- DatanodeInfo xferTargets[][]
- ) {
- for (int i = 0; i < blocks.length; i++) {
- try {
- transferBlock(blocks[i], xferTargets[i]);
- } catch (IOException ie) {
- LOG.warn("Failed to transfer block " + blocks[i], ie);
- }
- }
- }
- /*
- * Informing the name node could take a long long time! Should we wait
- * till namenode is informed before responding with success to the
- * client? For now we don't.
- */
- protected void notifyNamenodeReceivedBlock(Block block, String delHint) {
- if(block==null || delHint==null) {
- throw new IllegalArgumentException(block==null?"Block is null":"delHint is null");
- }
- synchronized (receivedBlockList) {
- synchronized (delHints) {
- receivedBlockList.add(block);
- delHints.add(delHint);
- receivedBlockList.notifyAll();
- }
- }
- }
-
- /* ********************************************************************
- Protocol when a client reads data from Datanode (Cur Ver: 9):
-
- Client's Request :
- =================
-
- Processed in DataXceiver:
- +----------------------------------------------+
- | Common Header | 1 byte OP == OP_READ_BLOCK |
- +----------------------------------------------+
-
- Processed in readBlock() :
- +-------------------------------------------------------------------------+
- | 8 byte Block ID | 8 byte genstamp | 8 byte start offset | 8 byte length |
- +-------------------------------------------------------------------------+
- | vInt length | <DFSClient id> |
- +-----------------------------------+
-
- Client sends optional response only at the end of receiving data.
-
- DataNode Response :
- ===================
-
- In readBlock() :
- If there is an error while initializing BlockSender :
- +---------------------------+
- | 2 byte OP_STATUS_ERROR | and connection will be closed.
- +---------------------------+
- Otherwise
- +---------------------------+
- | 2 byte OP_STATUS_SUCCESS |
- +---------------------------+
-
- Actual data, sent by BlockSender.sendBlock() :
-
- ChecksumHeader :
- +--------------------------------------------------+
- | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
- +--------------------------------------------------+
- Followed by actual data in the form of PACKETS:
- +------------------------------------+
- | Sequence of data PACKETs .... |
- +------------------------------------+
-
- A "PACKET" is defined further below.
-
- The client reads data until it receives a packet with
- "LastPacketInBlock" set to true or with a zero length. If there is
- no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
-
- Client optional response at the end of data transmission :
- +------------------------------+
- | 2 byte OP_STATUS_CHECKSUM_OK |
- +------------------------------+
-
- PACKET : Contains a packet header, checksum and data. Amount of data
- ======== carried is set by BUFFER_SIZE.
-
- +-----------------------------------------------------+
- | 4 byte packet length (excluding packet header) |
- +-----------------------------------------------------+
- | 8 byte offset in the block | 8 byte sequence number |
- +-----------------------------------------------------+
- | 1 byte isLastPacketInBlock |
- +-----------------------------------------------------+
- | 4 byte Length of actual data |
- +-----------------------------------------------------+
- | x byte checksum data. x is defined below |
- +-----------------------------------------------------+
- | actual data ...... |
- +-----------------------------------------------------+
-
- x = (length of data + BYTES_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
- CHECKSUM_SIZE
-
- CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
-
- The above packet format is used while writing data to DFS also.
- Not all the fields might be used while reading.
-
- ************************************************************************ */
-
- /** Header size for a packet */
- public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
- 8 + /* offset in block */
- 8 + /* seqno */
- 1 /* isLastPacketInBlock */);
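- /* Worked example, assuming the common defaults of 512 bytes per checksum and
- * 4-byte CRC32 checksums: a packet carrying 64 KiB of data has
- * x = (65536 + 512 - 1) / 512 * 4 = 512
- * bytes of checksum data, on top of the PKT_HEADER_LEN = 21 bytes of packet
- * header defined above.
- */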
-
- /**
- * Used for transferring a block of data. This class
- * sends a piece of data to another DataNode.
- */
- class DataTransfer implements Runnable {
- DatanodeInfo targets[];
- Block b;
- DataNode datanode;
- /**
- * Connect to the first item in the target list. Pass along the
- * entire target list, the block, and the data.
- */
- public DataTransfer(DatanodeInfo targets[], Block b, DataNode datanode) throws IOException {
- this.targets = targets;
- this.b = b;
- this.datanode = datanode;
- }
- /**
- * Do the deed, write the bytes
- */
- public void run() {
- xmitsInProgress.getAndIncrement();
- Socket sock = null;
- DataOutputStream out = null;
- BlockSender blockSender = null;
-
- try {
- InetSocketAddress curTarget =
- NetUtils.createSocketAddr(targets[0].getName());
- sock = newSocket();
- NetUtils.connect(sock, curTarget, socketTimeout);
- sock.setSoTimeout(targets.length * socketTimeout);
- long writeTimeout = socketWriteTimeout +
- HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
- OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
- out = new DataOutputStream(new BufferedOutputStream(baseStream,
- SMALL_BUFFER_SIZE));
- blockSender = new BlockSender(b, 0, b.getNumBytes(), false, false, false,
- datanode);
- DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
- //
- // Header info
- //
- out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- out.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
- out.writeLong(b.getBlockId());
- out.writeLong(b.getGenerationStamp());
- out.writeInt(0); // no pipelining
- out.writeBoolean(false); // not part of recovery
- Text.writeString(out, ""); // client
- out.writeBoolean(true); // sending src node information
- srcNode.write(out); // Write src node DatanodeInfo
- // write targets
- out.writeInt(targets.length - 1);
- for (int i = 1; i < targets.length; i++) {
- targets[i].write(out);
- }
- // send data & checksum
- blockSender.sendBlock(out, baseStream, null);
- // no response necessary
- LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
- } catch (IOException ie) {
- LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
- + " got " + StringUtils.stringifyException(ie));
- } finally {
- xmitsInProgress.getAndDecrement();
- IOUtils.closeStream(blockSender);
- IOUtils.closeStream(out);
- IOUtils.closeSocket(sock);
- }
- }
- }
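- /* For reference, a minimal reading of the OP_WRITE_BLOCK header written by
- * DataTransfer.run() above: a 2-byte DATA_TRANSFER_VERSION, a 1-byte
- * OP_WRITE_BLOCK opcode, the 8-byte block id, the 8-byte generation stamp,
- * a 4-byte int (0 here, since there is no pipeline), a 1-byte recovery flag
- * (false), the client name as a Text string (empty here), a 1-byte flag
- * announcing that source-node info follows, the source DatanodeInfo, a 4-byte
- * count of the remaining targets, and finally each remaining target's
- * DatanodeInfo, after which BlockSender streams the checksummed packets.
- */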
- /**
- * No matter what kind of exception we get, keep retrying to offerService().
- * That's the loop that connects to the NameNode and provides basic DataNode
- * functionality.
- *
- * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
- */
- public void run() {
- LOG.info(dnRegistration + "In DataNode.run, data = " + data);
- // start dataXceiveServer
- dataXceiverServer.start();
-
- while (shouldRun) {
- try {
- startDistributedUpgradeIfNeeded();
- offerService();
- } catch (Exception ex) {
- LOG.error("Exception: " + StringUtils.stringifyException(ex));
- if (shouldRun) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ie) {
- }
- }
- }
- }
-
- LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
- shutdown();
- }
-
- /** Start a single datanode daemon and wait for it to finish.
- * If this thread is specifically interrupted, it will stop waiting.
- */
- public static void runDatanodeDaemon(DataNode dn) throws IOException {
- if (dn != null) {
- //register datanode
- dn.register();
- dn.dataNodeThread = new Thread(dn, dnThreadName);
- dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
- dn.dataNodeThread.start();
- }
- }
-
- static boolean isDatanodeUp(DataNode dn) {
- return dn.dataNodeThread != null && dn.dataNodeThread.isAlive();
- }
- /** Instantiate a single datanode object. This must be run by invoking
- * {@link DataNode#runDatanodeDaemon(DataNode)} subsequently.
- */
- public static DataNode instantiateDataNode(String args[],
- Configuration conf) throws IOException {
- if (conf == null)
- conf = new Configuration();
- if (!parseArguments(args, conf)) {
- printUsage();
- return null;
- }
- if (conf.get("dfs.network.script") != null) {
- LOG.error("This configuration for rack identification is not supported" +
- " anymore. RackID resolution is handled by the NameNode.");
- System.exit(-1);
- }
- String[] dataDirs = conf.getStrings("dfs.data.dir");
- dnThreadName = "DataNode: [" +
- StringUtils.arrayToString(dataDirs) + "]";
- return makeInstance(dataDirs, conf);
- }
- /** Instantiate & Start a single datanode daemon and wait for it to finish.
- * If this thread is specifically interrupted, it will stop waiting.
- */
- public static DataNode createDataNode(String args[],
- Configuration conf) throws IOException {
- DataNode dn = instantiateDataNode(args, conf);
- runDatanodeDaemon(dn);
- return dn;
- }
- void join() {
- if (dataNodeThread != null) {
- try {
- dataNodeThread.join();
- } catch (InterruptedException e) {}
- }
- }
- /**
- * Make an instance of DataNode after ensuring that at least one of the
- * given data directories (and their parent directories, if necessary)
- * can be created.
- * @param dataDirs List of directories, where the new DataNode instance should
- * keep its files.
- * @param conf Configuration instance to use.
- * @return DataNode instance for given list of data dirs and conf, or null if
- * no directory from this directory list can be created.
- * @throws IOException
- */
- public static DataNode makeInstance(String[] dataDirs, Configuration conf)
- throws IOException {
- ArrayList<File> dirs = new ArrayList<File>();
- for (int i = 0; i < dataDirs.length; i++) {
- File data = new File(dataDirs[i]);
- try {
- DiskChecker.checkDir(data);
- dirs.add(data);
- } catch(DiskErrorException e) {
- LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
- }
- }
- if (dirs.size() > 0)
- return new DataNode(conf, dirs);
- LOG.error("All directories in dfs.data.dir are invalid.");
- return null;
- }
- @Override
- public String toString() {
- return "DataNode{" +
- "data=" + data +
- ", localName='" + dnRegistration.getName() + "'" +
- ", storageID='" + dnRegistration.getStorageID() + "'" +
- ", xmitsInProgress=" + xmitsInProgress.get() +
- "}";
- }
-
- private static void printUsage() {
- System.err.println("Usage: java DataNode");
- System.err.println(" [-rollback]");
- }
- /**
- * Parse and verify command line arguments and set configuration parameters.
- *
- * @return false if the passed arguments are incorrect
- */
- private static boolean parseArguments(String args[],
- Configuration conf) {
- int argsLen = (args == null) ? 0 : args.length;
- StartupOption startOpt = StartupOption.REGULAR;
- for(int i=0; i < argsLen; i++) {
- String cmd = args[i];
- if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) {
- LOG.error("-r, --rack arguments are not supported anymore. RackID " +
- "resolution is handled by the NameNode.");
- System.exit(-1);
- } else if ("-rollback".equalsIgnoreCase(cmd)) {
- startOpt = StartupOption.ROLLBACK;
- } else if ("-regular".equalsIgnoreCase(cmd)) {
- startOpt = StartupOption.REGULAR;
- } else
- return false;
- }
- setStartupOption(conf, startOpt);
- return true;
- }
- private static void setStartupOption(Configuration conf, StartupOption opt) {
- conf.set("dfs.datanode.startup", opt.toString());
- }
- static StartupOption getStartupOption(Configuration conf) {
- return StartupOption.valueOf(conf.get("dfs.datanode.startup",
- StartupOption.REGULAR.toString()));
- }
- /**
- * This method arranges for the data node to send the block report at the next heartbeat.
- */
- public void scheduleBlockReport(long delay) {
- if (delay > 0) { // send BR after random delay
- lastBlockReport = System.currentTimeMillis()
- - ( blockReportInterval - R.nextInt((int)(delay)));
- } else { // send at next heartbeat
- lastBlockReport = lastHeartbeat - blockReportInterval;
- }
- resetBlockReportTime = true; // reset future BRs for randomness
- }
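- // Worked example with hypothetical numbers: if blockReportInterval is
- // 3,600,000 msec and delay is 60,000 msec, lastBlockReport is backdated to
- // somewhere between (now - 3,600,000) and (now - 3,540,000) msec, so the next
- // block report becomes due within the next 60 seconds; with delay <= 0 it
- // becomes due at the very next heartbeat.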
-
-
- /**
- * This method is used for testing.
- * Examples are adding and deleting blocks directly.
- * The most common usage will be when the data node's storage is simulated.
- *
- * @return the fsdataset that stores the blocks
- */
- public FSDatasetInterface getFSDataset() {
- return data;
- }
- /**
- */
- public static void main(String args[]) {
- try {
- StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
- DataNode datanode = createDataNode(args, null);
- if (datanode != null)
- datanode.join();
- } catch (Throwable e) {
- LOG.error(StringUtils.stringifyException(e));
- System.exit(-1);
- }
- }
- // InterDataNodeProtocol implementation
- /** {@inheritDoc} */
- public BlockMetaDataInfo getBlockMetaDataInfo(Block block
- ) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + block);
- }
- Block stored = data.getStoredBlock(block.getBlockId());
- if (stored == null) {
- return null;
- }
- BlockMetaDataInfo info = new BlockMetaDataInfo(stored,
- blockScanner.getLastScanTime(stored));
- if (LOG.isDebugEnabled()) {
- LOG.debug("getBlockMetaDataInfo successful block=" + stored +
- " length " + stored.getNumBytes() +
- " genstamp " + stored.getGenerationStamp());
- }
- // paranoia! verify that the contents of the stored block
- // matches the block file on disk.
- data.validateBlockMetadata(stored);
- return info;
- }
- public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
- Daemon d = new Daemon(threadGroup, new Runnable() {
- /** Recover a list of blocks. It is run by the primary datanode. */
- public void run() {
- for(int i = 0; i < blocks.length; i++) {
- try {
- logRecoverBlock("NameNode", blocks[i], targets[i]);
- recoverBlock(blocks[i], false, targets[i], true);
- } catch (IOException e) {
- LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
- }
- }
- }
- });
- d.start();
- return d;
- }
- /** {@inheritDoc} */
- public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException {
- LOG.info("oldblock=" + oldblock + "(length=" + oldblock.getNumBytes()
- + "), newblock=" + newblock + "(length=" + newblock.getNumBytes()
- + "), datanode=" + dnRegistration.getName());
- data.updateBlock(oldblock, newblock);
- if (finalize) {
- data.finalizeBlock(newblock);
- myMetrics.blocksWritten.inc();
- notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
- LOG.info("Received block " + newblock +
- " of size " + newblock.getNumBytes() +
- " as part of lease recovery.");
- }
- }
- /** {@inheritDoc} */
- public long getProtocolVersion(String protocol, long clientVersion
- ) throws IOException {
- if (protocol.equals(InterDatanodeProtocol.class.getName())) {
- return InterDatanodeProtocol.versionID;
- } else if (protocol.equals(ClientDatanodeProtocol.class.getName())) {
- return ClientDatanodeProtocol.versionID;
- }
- throw new IOException("Unknown protocol to " + getClass().getSimpleName()
- + ": " + protocol);
- }
- /** A convenient class used in lease recovery */
- private static class BlockRecord {
- final DatanodeID id;
- final InterDatanodeProtocol datanode;
- final Block block;
-
- BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
- this.id = id;
- this.datanode = datanode;
- this.block = block;
- }
- /** {@inheritDoc} */
- public String toString() {
- return "block:" + block + " node:" + id;
- }
- }
- /** Recover a block */
- private LocatedBlock recoverBlock(Block block, boolean keepLength,
- DatanodeID[] datanodeids, boolean closeFile) throws IOException {
- // If the block is already being recovered, then skip recovering it.
- // This can happen if the namenode and client start recovering the same
- // file at the same time.
- synchronized (ongoingRecovery) {
- Block tmp = new Block();
- tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
- if (ongoingRecovery.get(tmp) != null) {
- String msg = "Block " + block + " is already being recovered, " +
- " ignoring this request to recover it.";
- LOG.info(msg);
- throw new IOException(msg);
- }
- ongoingRecovery.put(block, block);
- }
- try {
- List<BlockRecord> syncList = new ArrayList<BlockRecord>();
- long minlength = Long.MAX_VALUE;
- int errorCount = 0;
- //check generation stamps
- for(DatanodeID id : datanodeids) {
- try {
- InterDatanodeProtocol datanode = dnRegistration.equals(id)?
- this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
- BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
- if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
- if (keepLength) {
- if (info.getNumBytes() == block.getNumBytes()) {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- }
- }
- else {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- if (info.getNumBytes() < minlength) {
- minlength = info.getNumBytes();
- }
- }
- }
- } catch (IOException e) {
- ++errorCount;
- InterDatanodeProtocol.LOG.warn(
- "Failed to getBlockMetaDataInfo for block (=" + block
- + ") from datanode (=" + id + ")", e);
- }
- }
- if (syncList.isEmpty() && errorCount > 0) {
- throw new IOException("All datanodes failed: block=" + block
- + ", datanodeids=" + Arrays.asList(datanodeids));
- }
- if (!keepLength) {
- block.setNumBytes(minlength);
- }
- return syncBlock(block, syncList, closeFile);
- } finally {
- synchronized (ongoingRecovery) {
- ongoingRecovery.remove(block);
- }
- }
- }
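- // Illustrative example (hypothetical replica lengths): if the three replicas
- // of a block report lengths 1024, 1000 and 1000 with up-to-date generation
- // stamps and keepLength is false, minlength becomes 1000, the block length is
- // set to 1000, and syncBlock() below obtains a fresh generation stamp from the
- // namenode and updates all three replicas to that common length.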
- /** Block synchronization */
- private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
- boolean closeFile) throws IOException {
- if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
- + "), syncList=" + syncList + ", closeFile=" + closeFile);
- }
- //syncList.isEmpty() means that none of the datanodes have the block,
- //so the block can be deleted.
- if (syncList.isEmpty()) {
- namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
- DatanodeID.EMPTY_ARRAY);
- return null;
- }
- List<DatanodeID> successList = new ArrayList<DatanodeID>();
- long generationstamp = namenode.nextGenerationStamp(block);
- Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
- for(BlockRecord r : syncList) {
- try {
- r.datanode.updateBlock(r.block, newblock, closeFile);
- successList.add(r.id);
- } catch (IOException e) {
- InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
- + newblock + ", datanode=" + r.id + ")", e);
- }
- }
- if (!successList.isEmpty()) {
- DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
- namenode.commitBlockSynchronization(block,
- newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
- nlist);
- DatanodeInfo[] info = new DatanodeInfo[nlist.length];
- for (int i = 0; i < nlist.length; i++) {
- info[i] = new DatanodeInfo(nlist[i]);
- }
- return new LocatedBlock(newblock, info); // success
- }
- //failed
- StringBuilder b = new StringBuilder();
- for(BlockRecord r : syncList) {
- b.append("n " + r.id);
- }
- throw new IOException("Cannot recover " + block + ", none of these "
- + syncList.size() + " datanodes success {" + b + "n}");
- }
-
- // ClientDataNodeProtocol implementation
- /** {@inheritDoc} */
- public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
- ) throws IOException {
- logRecoverBlock("Client", block, targets);
- return recoverBlock(block, keepLength, targets, false);
- }
- private static void logRecoverBlock(String who,
- Block block, DatanodeID[] targets) {
- StringBuilder msg = new StringBuilder(targets[0].getName());
- for (int i = 1; i < targets.length; i++) {
- msg.append(", " + targets[i].getName());
- }
- LOG.info(who + " calls recoverBlock(block=" + block
- + ", targets=[" + msg + "])");
- }
- }