UpgradeManagerNamenode.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import java.util.SortedSet;
  20. import java.io.IOException;
  21. import org.apache.hadoop.hdfs.protocol.FSConstants;
  22. import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
  23. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  24. import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
  25. import org.apache.hadoop.hdfs.server.common.UpgradeManager;
  26. import org.apache.hadoop.hdfs.server.common.UpgradeObjectCollection;
  27. import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
  28. import org.apache.hadoop.hdfs.server.common.Upgradeable;
  29. import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
  30. /**
  31.  * Upgrade manager for name-nodes.
  32.  *
  33.  * Distributed upgrades for a name-node starts when the safe mode conditions 
  34.  * are met and the name-node is about to exit it.
  35.  * At this point the name-node enters manual safe mode which will remain
  36.  * on until the upgrade is completed.
  37.  * After that the name-nodes processes upgrade commands from data-nodes
  38.  * and updates its status.
  39.  */
  40. class UpgradeManagerNamenode extends UpgradeManager {
  41.   public HdfsConstants.NodeType getType() {
  42.     return HdfsConstants.NodeType.NAME_NODE;
  43.   }
  44.   /**
  45.    * Start distributed upgrade.
  46.    * Instantiates distributed upgrade objects.
  47.    * 
  48.    * @return true if distributed upgrade is required or false otherwise
  49.    * @throws IOException
  50.    */
  51.   public synchronized boolean startUpgrade() throws IOException {
  52.     if(!upgradeState) {
  53.       initializeUpgrade();
  54.       if(!upgradeState) return false;
  55.       // write new upgrade state into disk
  56.       FSNamesystem.getFSNamesystem().getFSImage().writeAll();
  57.     }
  58.     assert currentUpgrades != null : "currentUpgrades is null";
  59.     this.broadcastCommand = currentUpgrades.first().startUpgrade();
  60.     NameNode.LOG.info("n   Distributed upgrade for NameNode version " 
  61.         + getUpgradeVersion() + " to current LV " 
  62.         + FSConstants.LAYOUT_VERSION + " is started.");
  63.     return true;
  64.   }
  65.   synchronized UpgradeCommand processUpgradeCommand(UpgradeCommand command
  66.                                                     ) throws IOException {
  67.     NameNode.LOG.debug("n   Distributed upgrade for NameNode version " 
  68.         + getUpgradeVersion() + " to current LV " 
  69.         + FSConstants.LAYOUT_VERSION + " is processing upgrade command: "
  70.         + command.getAction() + " status = " + getUpgradeStatus() + "%");
  71.     if(currentUpgrades == null) {
  72.       NameNode.LOG.info("Ignoring upgrade command: " 
  73.           + command.getAction() + " version " + command.getVersion()
  74.           + ". No distributed upgrades are currently running on the NameNode");
  75.       return null;
  76.     }
  77.     UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();
  78.     if(command.getVersion() != curUO.getVersion())
  79.       throw new IncorrectVersionException(command.getVersion(), 
  80.           "UpgradeCommand", curUO.getVersion());
  81.     UpgradeCommand reply = curUO.processUpgradeCommand(command);
  82.     if(curUO.getUpgradeStatus() < 100) {
  83.       return reply;
  84.     }
  85.     // current upgrade is done
  86.     curUO.completeUpgrade();
  87.     NameNode.LOG.info("n   Distributed upgrade for NameNode version " 
  88.         + curUO.getVersion() + " to current LV " 
  89.         + FSConstants.LAYOUT_VERSION + " is complete.");
  90.     // proceede with the next one
  91.     currentUpgrades.remove(curUO);
  92.     if(currentUpgrades.isEmpty()) { // all upgrades are done
  93.       completeUpgrade();
  94.     } else {  // start next upgrade
  95.       curUO = (UpgradeObjectNamenode)currentUpgrades.first();
  96.       this.broadcastCommand = curUO.startUpgrade();
  97.     }
  98.     return reply;
  99.   }
  100.   public synchronized void completeUpgrade() throws IOException {
  101.     // set and write new upgrade state into disk
  102.     setUpgradeState(false, FSConstants.LAYOUT_VERSION);
  103.     FSNamesystem.getFSNamesystem().getFSImage().writeAll();
  104.     currentUpgrades = null;
  105.     broadcastCommand = null;
  106.     FSNamesystem.getFSNamesystem().leaveSafeMode(false);
  107.   }
  108.   UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action 
  109.                                                 ) throws IOException {
  110.     boolean isFinalized = false;
  111.     if(currentUpgrades == null) { // no upgrades are in progress
  112.       FSImage fsimage = FSNamesystem.getFSNamesystem().getFSImage();
  113.       isFinalized = fsimage.isUpgradeFinalized();
  114.       if(isFinalized) // upgrade is finalized
  115.         return null;  // nothing to report
  116.       return new UpgradeStatusReport(fsimage.getLayoutVersion(), 
  117.                                      (short)101, isFinalized);
  118.     }
  119.     UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();
  120.     boolean details = false;
  121.     switch(action) {
  122.     case GET_STATUS:
  123.       break;
  124.     case DETAILED_STATUS:
  125.       details = true;
  126.       break;
  127.     case FORCE_PROCEED:
  128.       curUO.forceProceed();
  129.     }
  130.     return curUO.getUpgradeStatusReport(details);
  131.   }
  132.   public static void main(String[] args) throws IOException {
  133.     UpgradeManagerNamenode um = new UpgradeManagerNamenode();
  134.     SortedSet<Upgradeable> uos;
  135.     uos = UpgradeObjectCollection.getDistributedUpgrades(-4, 
  136.         HdfsConstants.NodeType.NAME_NODE);
  137.     System.out.println(uos.size());
  138.     um.startUpgrade();
  139.   }
  140. }