UpgradeManagerDatanode.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.IOException;
  20. import org.apache.hadoop.hdfs.protocol.FSConstants;
  21. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  22. import org.apache.hadoop.hdfs.server.common.UpgradeManager;
  23. import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
  24. import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
  25. import org.apache.hadoop.util.Daemon;
  26. /**
  27.  * Upgrade manager for data-nodes.
  28.  *
  29.  * Distributed upgrades for a data-node are performed in a separate thread.
  30.  * The upgrade starts when the data-node receives the start upgrade command
  31.  * from the namenode. At that point the manager finds a respective upgrade
  32.  * object and starts a daemon in order to perform the upgrade defined by the 
  33.  * object.
  34.  */
  35. class UpgradeManagerDatanode extends UpgradeManager {
  36.   DataNode dataNode = null;
  37.   Daemon upgradeDaemon = null;
  38.   UpgradeManagerDatanode(DataNode dataNode) {
  39.     super();
  40.     this.dataNode = dataNode;
  41.   }
  42.   public HdfsConstants.NodeType getType() {
  43.     return HdfsConstants.NodeType.DATA_NODE;
  44.   }
  45.   synchronized void initializeUpgrade(NamespaceInfo nsInfo) throws IOException {
  46.     if( ! super.initializeUpgrade())
  47.       return; // distr upgrade is not needed
  48.     DataNode.LOG.info("n   Distributed upgrade for DataNode " 
  49.         + dataNode.dnRegistration.getName() 
  50.         + " version " + getUpgradeVersion() + " to current LV " 
  51.         + FSConstants.LAYOUT_VERSION + " is initialized.");
  52.     UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
  53.     curUO.setDatanode(dataNode);
  54.     upgradeState = curUO.preUpgradeAction(nsInfo);
  55.     // upgradeState is true if the data-node should start the upgrade itself
  56.   }
  57.   /**
  58.    * Start distributed upgrade.
  59.    * Instantiates distributed upgrade objects.
  60.    * 
  61.    * @return true if distributed upgrade is required or false otherwise
  62.    * @throws IOException
  63.    */
  64.   public synchronized boolean startUpgrade() throws IOException {
  65.     if(upgradeState) {  // upgrade is already in progress
  66.       assert currentUpgrades != null : 
  67.         "UpgradeManagerDatanode.currentUpgrades is null.";
  68.       UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
  69.       curUO.startUpgrade();
  70.       return true;
  71.     }
  72.     if(broadcastCommand != null) {
  73.       if(broadcastCommand.getVersion() > this.getUpgradeVersion()) {
  74.         // stop broadcasting, the cluster moved on
  75.         // start upgrade for the next version
  76.         broadcastCommand = null;
  77.       } else {
  78.         // the upgrade has been finished by this data-node,
  79.         // but the cluster is still running it, 
  80.         // reply with the broadcast command
  81.         assert currentUpgrades == null : 
  82.           "UpgradeManagerDatanode.currentUpgrades is not null.";
  83.         assert upgradeDaemon == null : 
  84.           "UpgradeManagerDatanode.upgradeDaemon is not null.";
  85.         dataNode.namenode.processUpgradeCommand(broadcastCommand);
  86.         return true;
  87.       }
  88.     }
  89.     if(currentUpgrades == null)
  90.       currentUpgrades = getDistributedUpgrades();
  91.     if(currentUpgrades == null) {
  92.       DataNode.LOG.info("n   Distributed upgrade for DataNode version " 
  93.           + getUpgradeVersion() + " to current LV " 
  94.           + FSConstants.LAYOUT_VERSION + " cannot be started. "
  95.           + "The upgrade object is not defined.");
  96.       return false;
  97.     }
  98.     upgradeState = true;
  99.     UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
  100.     curUO.setDatanode(dataNode);
  101.     curUO.startUpgrade();
  102.     upgradeDaemon = new Daemon(curUO);
  103.     upgradeDaemon.start();
  104.     DataNode.LOG.info("n   Distributed upgrade for DataNode " 
  105.         + dataNode.dnRegistration.getName() 
  106.         + " version " + getUpgradeVersion() + " to current LV " 
  107.         + FSConstants.LAYOUT_VERSION + " is started.");
  108.     return true;
  109.   }
  110.   synchronized void processUpgradeCommand(UpgradeCommand command
  111.                                           ) throws IOException {
  112.     assert command.getAction() == UpgradeCommand.UC_ACTION_START_UPGRADE :
  113.       "Only start upgrade action can be processed at this time.";
  114.     this.upgradeVersion = command.getVersion();
  115.     // Start distributed upgrade
  116.     if(startUpgrade()) // upgrade started
  117.       return;
  118.     throw new IOException(
  119.         "Distributed upgrade for DataNode " + dataNode.dnRegistration.getName() 
  120.         + " version " + getUpgradeVersion() + " to current LV " 
  121.         + FSConstants.LAYOUT_VERSION + " cannot be started. "
  122.         + "The upgrade object is not defined.");
  123.   }
  124.   public synchronized void completeUpgrade() throws IOException {
  125.     assert currentUpgrades != null : 
  126.       "UpgradeManagerDatanode.currentUpgrades is null.";
  127.     UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
  128.     broadcastCommand = curUO.completeUpgrade();
  129.     upgradeState = false;
  130.     currentUpgrades = null;
  131.     upgradeDaemon = null;
  132.     DataNode.LOG.info("n   Distributed upgrade for DataNode " 
  133.         + dataNode.dnRegistration.getName() 
  134.         + " version " + getUpgradeVersion() + " to current LV " 
  135.         + FSConstants.LAYOUT_VERSION + " is complete.");
  136.   }
  137.   synchronized void shutdownUpgrade() {
  138.     if(upgradeDaemon != null)
  139.       upgradeDaemon.interrupt();
  140.   }
  141. }