UpgradeObjectDatanode.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import org.apache.hadoop.hdfs.protocol.FSConstants;
  20. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  21. import org.apache.hadoop.hdfs.server.common.UpgradeObject;
  22. import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
  23. import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
  24. import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
  25. import org.apache.hadoop.util.StringUtils;
  26. import java.io.IOException;
  27. import java.net.SocketTimeoutException;
  28. /**
  29.  * Base class for data-node upgrade objects.
  30.  * Data-node upgrades are run in separate threads.
  31.  */
  32. public abstract class UpgradeObjectDatanode extends UpgradeObject implements Runnable {
  33.   private DataNode dataNode = null;
  34.   public HdfsConstants.NodeType getType() {
  35.     return HdfsConstants.NodeType.DATA_NODE;
  36.   }
  37.   protected DataNode getDatanode() {
  38.     return dataNode;
  39.   }
  40.   void setDatanode(DataNode dataNode) {
  41.     this.dataNode = dataNode;
  42.   }
  43.   /**
  44.    * Specifies how the upgrade is performed. 
  45.    * @throws IOException
  46.    */
  47.   public abstract void doUpgrade() throws IOException;
  48.   /**
  49.    * Specifies what to do before the upgrade is started.
  50.    * 
  51.    * The default implementation checks whether the data-node missed the upgrade
  52.    * and throws an exception if it did. This leads to the data-node shutdown.
  53.    * 
  54.    * Data-nodes usually start distributed upgrade when the name-node replies
  55.    * to its heartbeat with a start upgrade command.
  56.    * Sometimes though, e.g. when a data-node missed the upgrade and wants to
  57.    * catchup with the rest of the cluster, it is necessary to initiate the 
  58.    * upgrade directly on the data-node, since the name-node might not ever 
  59.    * start it. An override of this method should then return true.
  60.    * And the upgrade will start after data-ndoe registration but before sending
  61.    * its first heartbeat.
  62.    * 
  63.    * @param nsInfo name-node versions, verify that the upgrade
  64.    * object can talk to this name-node version if necessary.
  65.    * 
  66.    * @throws IOException
  67.    * @return true if data-node itself should start the upgrade or 
  68.    * false if it should wait until the name-node starts the upgrade.
  69.    */
  70.   boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
  71.     int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
  72.     if(nsUpgradeVersion >= getVersion())
  73.       return false; // name-node will perform the upgrade
  74.     // Missed the upgrade. Report problem to the name-node and throw exception
  75.     String errorMsg = 
  76.               "n   Data-node missed a distributed upgrade and will shutdown."
  77.             + "n   " + getDescription() + "."
  78.             + " Name-node version = " + nsInfo.getLayoutVersion() + ".";
  79.     DataNode.LOG.fatal( errorMsg );
  80.     try {
  81.       dataNode.namenode.errorReport(dataNode.dnRegistration,
  82.                                     DatanodeProtocol.NOTIFY, errorMsg);
  83.     } catch(SocketTimeoutException e) {  // namenode is busy
  84.       DataNode.LOG.info("Problem connecting to server: " 
  85.                         + dataNode.getNameNodeAddr());
  86.     }
  87.     throw new IOException(errorMsg);
  88.   }
  89.   public void run() {
  90.     assert dataNode != null : "UpgradeObjectDatanode.dataNode is null";
  91.     while(dataNode.shouldRun) {
  92.       try {
  93.         doUpgrade();
  94.       } catch(Exception e) {
  95.         DataNode.LOG.error(StringUtils.stringifyException(e));
  96.       }
  97.       break;
  98.     }
  99.     // report results
  100.     if(getUpgradeStatus() < 100) {
  101.       DataNode.LOG.info("n   Distributed upgrade for DataNode version " 
  102.           + getVersion() + " to current LV " 
  103.           + FSConstants.LAYOUT_VERSION + " cannot be completed.");
  104.     }
  105.     // Complete the upgrade by calling the manager method
  106.     try {
  107.       dataNode.upgradeManager.completeUpgrade();
  108.     } catch(IOException e) {
  109.       DataNode.LOG.error(StringUtils.stringifyException(e));
  110.     }
  111.   }
  112.   /**
  113.    * Complete upgrade and return a status complete command for broadcasting.
  114.    * 
  115.    * Data-nodes finish upgrade at different times.
  116.    * The data-node needs to re-confirm with the name-node that the upgrade
  117.    * is complete while other nodes are still upgrading.
  118.    */
  119.   public UpgradeCommand completeUpgrade() throws IOException {
  120.     return new UpgradeCommand(UpgradeCommand.UC_ACTION_REPORT_STATUS,
  121.                               getVersion(), (short)100);
  122.   }
  123. }