- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.hdfs.server.datanode;
- import java.io.IOException;
- import org.apache.hadoop.hdfs.protocol.FSConstants;
- import org.apache.hadoop.hdfs.server.common.HdfsConstants;
- import org.apache.hadoop.hdfs.server.common.UpgradeManager;
- import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
- import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
- import org.apache.hadoop.util.Daemon;
- /**
- * Upgrade manager for data-nodes.
- *
- * Distributed upgrades for a data-node are performed in a separate thread.
- * The upgrade starts when the data-node receives the start upgrade command
- * from the namenode. At that point the manager finds a respective upgrade
- * object and starts a daemon in order to perform the upgrade defined by the
- * object.
- */
- class UpgradeManagerDatanode extends UpgradeManager {
- DataNode dataNode = null;
- Daemon upgradeDaemon = null;
- UpgradeManagerDatanode(DataNode dataNode) {
- super();
- this.dataNode = dataNode;
- }
- public HdfsConstants.NodeType getType() {
- return HdfsConstants.NodeType.DATA_NODE;
- }
- synchronized void initializeUpgrade(NamespaceInfo nsInfo) throws IOException {
- if( ! super.initializeUpgrade())
- return; // distr upgrade is not needed
- DataNode.LOG.info("n Distributed upgrade for DataNode "
- + dataNode.dnRegistration.getName()
- + " version " + getUpgradeVersion() + " to current LV "
- + FSConstants.LAYOUT_VERSION + " is initialized.");
- UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
- curUO.setDatanode(dataNode);
- upgradeState = curUO.preUpgradeAction(nsInfo);
- // upgradeState is true if the data-node should start the upgrade itself
- }
- /**
- * Start distributed upgrade.
- * Instantiates distributed upgrade objects.
- *
- * @return true if distributed upgrade is required or false otherwise
- * @throws IOException
- */
- public synchronized boolean startUpgrade() throws IOException {
- if(upgradeState) { // upgrade is already in progress
- assert currentUpgrades != null :
- "UpgradeManagerDatanode.currentUpgrades is null.";
- UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
- curUO.startUpgrade();
- return true;
- }
- if(broadcastCommand != null) {
- if(broadcastCommand.getVersion() > this.getUpgradeVersion()) {
- // stop broadcasting, the cluster moved on
- // start upgrade for the next version
- broadcastCommand = null;
- } else {
- // the upgrade has been finished by this data-node,
- // but the cluster is still running it,
- // reply with the broadcast command
- assert currentUpgrades == null :
- "UpgradeManagerDatanode.currentUpgrades is not null.";
- assert upgradeDaemon == null :
- "UpgradeManagerDatanode.upgradeDaemon is not null.";
- dataNode.namenode.processUpgradeCommand(broadcastCommand);
- return true;
- }
- }
- if(currentUpgrades == null)
- currentUpgrades = getDistributedUpgrades();
- if(currentUpgrades == null) {
- DataNode.LOG.info("n Distributed upgrade for DataNode version "
- + getUpgradeVersion() + " to current LV "
- + FSConstants.LAYOUT_VERSION + " cannot be started. "
- + "The upgrade object is not defined.");
- return false;
- }
- upgradeState = true;
- UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
- curUO.setDatanode(dataNode);
- curUO.startUpgrade();
- upgradeDaemon = new Daemon(curUO);
- upgradeDaemon.start();
- DataNode.LOG.info("n Distributed upgrade for DataNode "
- + dataNode.dnRegistration.getName()
- + " version " + getUpgradeVersion() + " to current LV "
- + FSConstants.LAYOUT_VERSION + " is started.");
- return true;
- }
- synchronized void processUpgradeCommand(UpgradeCommand command
- ) throws IOException {
- assert command.getAction() == UpgradeCommand.UC_ACTION_START_UPGRADE :
- "Only start upgrade action can be processed at this time.";
- this.upgradeVersion = command.getVersion();
- // Start distributed upgrade
- if(startUpgrade()) // upgrade started
- return;
- throw new IOException(
- "Distributed upgrade for DataNode " + dataNode.dnRegistration.getName()
- + " version " + getUpgradeVersion() + " to current LV "
- + FSConstants.LAYOUT_VERSION + " cannot be started. "
- + "The upgrade object is not defined.");
- }
- public synchronized void completeUpgrade() throws IOException {
- assert currentUpgrades != null :
- "UpgradeManagerDatanode.currentUpgrades is null.";
- UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
- broadcastCommand = curUO.completeUpgrade();
- upgradeState = false;
- currentUpgrades = null;
- upgradeDaemon = null;
- DataNode.LOG.info("n Distributed upgrade for DataNode "
- + dataNode.dnRegistration.getName()
- + " version " + getUpgradeVersion() + " to current LV "
- + FSConstants.LAYOUT_VERSION + " is complete.");
- }
- synchronized void shutdownUpgrade() {
- if(upgradeDaemon != null)
- upgradeDaemon.interrupt();
- }
- }