DataNode.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:58k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.BufferedOutputStream;
  20. import java.io.DataOutputStream;
  21. import java.io.File;
  22. import java.io.IOException;
  23. import java.io.OutputStream;
  24. import java.net.InetSocketAddress;
  25. import java.net.ServerSocket;
  26. import java.net.Socket;
  27. import java.net.SocketTimeoutException;
  28. import java.net.UnknownHostException;
  29. import java.nio.channels.ServerSocketChannel;
  30. import java.nio.channels.SocketChannel;
  31. import java.security.NoSuchAlgorithmException;
  32. import java.security.SecureRandom;
  33. import java.util.AbstractList;
  34. import java.util.ArrayList;
  35. import java.util.Arrays;
  36. import java.util.HashMap;
  37. import java.util.LinkedList;
  38. import java.util.List;
  39. import java.util.Map;
  40. import java.util.Random;
  41. import java.util.concurrent.atomic.AtomicInteger;
  42. import org.apache.commons.logging.Log;
  43. import org.apache.commons.logging.LogFactory;
  44. import org.apache.hadoop.conf.Configuration;
  45. import org.apache.hadoop.conf.Configured;
  46. import org.apache.hadoop.hdfs.HDFSPolicyProvider;
  47. import org.apache.hadoop.hdfs.protocol.Block;
  48. import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
  49. import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
  50. import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
  51. import org.apache.hadoop.hdfs.protocol.DatanodeID;
  52. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  53. import org.apache.hadoop.hdfs.protocol.FSConstants;
  54. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  55. import org.apache.hadoop.hdfs.protocol.UnregisteredDatanodeException;
  56. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  57. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  58. import org.apache.hadoop.hdfs.server.common.GenerationStamp;
  59. import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
  60. import org.apache.hadoop.hdfs.server.common.Storage;
  61. import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
  62. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
  63. import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
  64. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  65. import org.apache.hadoop.hdfs.server.namenode.StreamFile;
  66. import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
  67. import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
  68. import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
  69. import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
  70. import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
  71. import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
  72. import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
  73. import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
  74. import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
  75. import org.apache.hadoop.http.HttpServer;
  76. import org.apache.hadoop.io.IOUtils;
  77. import org.apache.hadoop.io.Text;
  78. import org.apache.hadoop.ipc.RPC;
  79. import org.apache.hadoop.ipc.RemoteException;
  80. import org.apache.hadoop.ipc.Server;
  81. import org.apache.hadoop.net.DNS;
  82. import org.apache.hadoop.net.NetUtils;
  83. import org.apache.hadoop.security.SecurityUtil;
  84. import org.apache.hadoop.security.authorize.ConfiguredPolicy;
  85. import org.apache.hadoop.security.authorize.PolicyProvider;
  86. import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
  87. import org.apache.hadoop.util.Daemon;
  88. import org.apache.hadoop.util.DiskChecker;
  89. import org.apache.hadoop.util.ReflectionUtils;
  90. import org.apache.hadoop.util.StringUtils;
  91. import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  92. import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
  93. /**********************************************************
  94.  * DataNode is a class (and program) that stores a set of
  95.  * blocks for a DFS deployment.  A single deployment can
  96.  * have one or many DataNodes.  Each DataNode communicates
  97.  * regularly with a single NameNode.  It also communicates
  98.  * with client code and other DataNodes from time to time.
  99.  *
  100.  * DataNodes store a series of named blocks.  The DataNode
  101.  * allows client code to read these blocks, or to write new
  102.  * block data.  The DataNode may also, in response to instructions
  103.  * from its NameNode, delete blocks or copy blocks to/from other
  104.  * DataNodes.
  105.  *
  106.  * The DataNode maintains just one critical table:
  107.  *   block-> stream of bytes (of BLOCK_SIZE or less)
  108.  *
  109.  * This info is stored on a local disk.  The DataNode
  110.  * reports the table's contents to the NameNode upon startup
  111.  * and every so often afterwards.
  112.  *
  113.  * DataNodes spend their lives in an endless loop of asking
  114.  * the NameNode for something to do.  A NameNode cannot connect
  115.  * to a DataNode directly; a NameNode simply returns values from
  116.  * functions invoked by a DataNode.
  117.  *
  118.  * DataNodes maintain an open server socket so that client code 
  119.  * or other DataNodes can read/write data.  The host/port for
  120.  * this server is reported to the NameNode, which then sends that
  121.  * information to clients or other DataNodes that might be interested.
  122.  *
  123.  **********************************************************/
  124. public class DataNode extends Configured 
  125.     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
  126.   public static final Log LOG = LogFactory.getLog(DataNode.class);
  127.   
  128.   static{
  129.     Configuration.addDefaultResource("hdfs-default.xml");
  130.     Configuration.addDefaultResource("hdfs-site.xml");
  131.   }
  132.   public static final String DN_CLIENTTRACE_FORMAT =
  133.         "src: %s" +      // src IP
  134.         ", dest: %s" +   // dst IP
  135.         ", bytes: %s" +  // byte count
  136.         ", op: %s" +     // operation
  137.         ", cliID: %s" +  // DFSClient id
  138.         ", srvID: %s" +  // DatanodeRegistration
  139.         ", blockid: %s"; // block id
  140.   static final Log ClientTraceLog =
  141.     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
  142.   /**
  143.    * Use {@link NetUtils#createSocketAddr(String)} instead.
  144.    */
  145.   @Deprecated
  146.   public static InetSocketAddress createSocketAddr(String target
  147.                                                    ) throws IOException {
  148.     return NetUtils.createSocketAddr(target);
  149.   }
  150.   
  151.   public DatanodeProtocol namenode = null;
  152.   public FSDatasetInterface data = null;
  153.   public DatanodeRegistration dnRegistration = null;
  154.   volatile boolean shouldRun = true;
  155.   private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
  156.   /** list of blocks being recovered */
  157.   private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
  158.   private LinkedList<String> delHints = new LinkedList<String>();
  159.   public final static String EMPTY_DEL_HINT = "";
  160.   AtomicInteger xmitsInProgress = new AtomicInteger();
  161.   Daemon dataXceiverServer = null;
  162.   ThreadGroup threadGroup = null;
  163.   long blockReportInterval;
  164.   //disallow the sending of BR before instructed to do so
  165.   long lastBlockReport = 0;
  166.   boolean resetBlockReportTime = true;
  167.   long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
  168.   long lastHeartbeat = 0;
  169.   long heartBeatInterval;
  170.   private DataStorage storage = null;
  171.   private HttpServer infoServer = null;
  172.   DataNodeMetrics myMetrics;
  173.   private static InetSocketAddress nameNodeAddr;
  174.   private InetSocketAddress selfAddr;
  175.   private static DataNode datanodeObject = null;
  176.   private Thread dataNodeThread = null;
  177.   String machineName;
  178.   private static String dnThreadName;
  179.   int socketTimeout;
  180.   int socketWriteTimeout = 0;  
  181.   boolean transferToAllowed = true;
  182.   int writePacketSize = 0;
  183.   
  184.   public DataBlockScanner blockScanner = null;
  185.   public Daemon blockScannerThread = null;
  186.   
  187.   private static final Random R = new Random();
  188.   
  189.   // For InterDataNodeProtocol
  190.   public Server ipcServer;
  191.   /**
  192.    * Current system time.
  193.    * @return current time in msec.
  194.    */
  195.   static long now() {
  196.     return System.currentTimeMillis();
  197.   }
  198.   /**
  199.    * Create the DataNode given a configuration and an array of dataDirs.
  200.    * 'dataDirs' is where the blocks are stored.
  201.    */
  202.   DataNode(Configuration conf, 
  203.            AbstractList<File> dataDirs) throws IOException {
  204.     super(conf);
  205.     datanodeObject = this;
  206.     try {
  207.       startDataNode(conf, dataDirs);
  208.     } catch (IOException ie) {
  209.       shutdown();
  210.       throw ie;
  211.     }
  212.   }
  213.     
  214.   
  215.   /**
  216.    * This method starts the data node with the specified conf.
  217.    * 
  218.    * @param conf - the configuration
  219.    *  if conf's CONFIG_PROPERTY_SIMULATED property is set
  220.    *  then a simulated storage based data node is created.
  221.    * 
  222.    * @param dataDirs - only for a non-simulated storage data node
  223.    * @throws IOException
  224.    */
  225.   void startDataNode(Configuration conf, 
  226.                      AbstractList<File> dataDirs
  227.                      ) throws IOException {
  228.     // use configured nameserver & interface to get local hostname
  229.     if (conf.get("slave.host.name") != null) {
  230.       machineName = conf.get("slave.host.name");   
  231.     }
  232.     if (machineName == null) {
  233.       machineName = DNS.getDefaultHost(
  234.                                      conf.get("dfs.datanode.dns.interface","default"),
  235.                                      conf.get("dfs.datanode.dns.nameserver","default"));
  236.     }
  237.     InetSocketAddress nameNodeAddr = NameNode.getAddress(conf);
  238.     
  239.     this.socketTimeout =  conf.getInt("dfs.socket.timeout",
  240.                                       HdfsConstants.READ_TIMEOUT);
  241.     this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
  242.                                           HdfsConstants.WRITE_TIMEOUT);
  243.     /* Based on results on different platforms, we might need set the default 
  244.      * to false on some of them. */
  245.     this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", 
  246.                                              true);
  247.     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
  248.     String address = 
  249.       NetUtils.getServerAddress(conf,
  250.                                 "dfs.datanode.bindAddress", 
  251.                                 "dfs.datanode.port",
  252.                                 "dfs.datanode.address");
  253.     InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
  254.     int tmpPort = socAddr.getPort();
  255.     storage = new DataStorage();
  256.     // construct registration
  257.     this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
  258.     // connect to name node
  259.     this.namenode = (DatanodeProtocol) 
  260.       RPC.waitForProxy(DatanodeProtocol.class,
  261.                        DatanodeProtocol.versionID,
  262.                        nameNodeAddr, 
  263.                        conf);
  264.     // get version and id info from the name-node
  265.     NamespaceInfo nsInfo = handshake();
  266.     StartupOption startOpt = getStartupOption(conf);
  267.     assert startOpt != null : "Startup option must be set.";
  268.     
  269.     boolean simulatedFSDataset = 
  270.         conf.getBoolean("dfs.datanode.simulateddatastorage", false);
  271.     if (simulatedFSDataset) {
  272.         setNewStorageID(dnRegistration);
  273.         dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
  274.         dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
  275.         // it would have been better to pass storage as a parameter to
  276.         // constructor below - need to augment ReflectionUtils used below.
  277.         conf.set("StorageId", dnRegistration.getStorageID());
  278.         try {
  279.           //Equivalent of following (can't do because Simulated is in test dir)
  280.           //  this.data = new SimulatedFSDataset(conf);
  281.           this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
  282.               Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
  283.         } catch (ClassNotFoundException e) {
  284.           throw new IOException(StringUtils.stringifyException(e));
  285.         }
  286.     } else { // real storage
  287.       // read storage info, lock data dirs and transition fs state if necessary
  288.       storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
  289.       // adjust
  290.       this.dnRegistration.setStorageInfo(storage);
  291.       // initialize data node internal structure
  292.       this.data = new FSDataset(storage, conf);
  293.     }
  294.       
  295.     // find free port
  296.     ServerSocket ss = (socketWriteTimeout > 0) ? 
  297.           ServerSocketChannel.open().socket() : new ServerSocket();
  298.     Server.bind(ss, socAddr, 0);
  299.     ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE); 
  300.     // adjust machine name with the actual port
  301.     tmpPort = ss.getLocalPort();
  302.     selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
  303.                                      tmpPort);
  304.     this.dnRegistration.setName(machineName + ":" + tmpPort);
  305.     LOG.info("Opened info server at " + tmpPort);
  306.       
  307.     this.threadGroup = new ThreadGroup("dataXceiverServer");
  308.     this.dataXceiverServer = new Daemon(threadGroup, 
  309.         new DataXceiverServer(ss, conf, this));
  310.     this.threadGroup.setDaemon(true); // auto destroy when empty
  311.     this.blockReportInterval =
  312.       conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
  313.     this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay",
  314.                                             BLOCKREPORT_INITIAL_DELAY)* 1000L; 
  315.     if (this.initialBlockReportDelay >= blockReportInterval) {
  316.       this.initialBlockReportDelay = 0;
  317.       LOG.info("dfs.blockreport.initialDelay is greater than " +
  318.         "dfs.blockreport.intervalMsec." + " Setting initial delay to 0 msec:");
  319.     }
  320.     this.heartBeatInterval = conf.getLong("dfs.heartbeat.interval", HEARTBEAT_INTERVAL) * 1000L;
  321.     DataNode.nameNodeAddr = nameNodeAddr;
  322.     //initialize periodic block scanner
  323.     String reason = null;
  324.     if (conf.getInt("dfs.datanode.scan.period.hours", 0) < 0) {
  325.       reason = "verification is turned off by configuration";
  326.     } else if ( !(data instanceof FSDataset) ) {
  327.       reason = "verifcation is supported only with FSDataset";
  328.     } 
  329.     if ( reason == null ) {
  330.       blockScanner = new DataBlockScanner(this, (FSDataset)data, conf);
  331.     } else {
  332.       LOG.info("Periodic Block Verification is disabled because " +
  333.                reason + ".");
  334.     }
  335.     //create a servlet to serve full-file content
  336.     String infoAddr = 
  337.       NetUtils.getServerAddress(conf, 
  338.                               "dfs.datanode.info.bindAddress", 
  339.                               "dfs.datanode.info.port",
  340.                               "dfs.datanode.http.address");
  341.     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
  342.     String infoHost = infoSocAddr.getHostName();
  343.     int tmpInfoPort = infoSocAddr.getPort();
  344.     this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
  345.         tmpInfoPort == 0, conf);
  346.     if (conf.getBoolean("dfs.https.enable", false)) {
  347.       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
  348.       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
  349.           "dfs.datanode.https.address", infoHost + ":" + 0));
  350.       Configuration sslConf = new Configuration(false);
  351.       sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
  352.           "ssl-server.xml"));
  353.       this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
  354.     }
  355.     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
  356.     this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
  357.         FileChecksumServlets.GetServlet.class);
  358.     this.infoServer.setAttribute("datanode.blockScanner", blockScanner);
  359.     this.infoServer.addServlet(null, "/blockScannerReport", 
  360.                                DataBlockScanner.Servlet.class);
  361.     this.infoServer.start();
  362.     // adjust info port
  363.     this.dnRegistration.setInfoPort(this.infoServer.getPort());
  364.     myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
  365.     
  366.     // set service-level authorization security policy
  367.     if (conf.getBoolean(
  368.           ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
  369.       PolicyProvider policyProvider = 
  370.         (PolicyProvider)(ReflectionUtils.newInstance(
  371.             conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
  372.                 HDFSPolicyProvider.class, PolicyProvider.class), 
  373.             conf));
  374.       SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
  375.     }
  376.     //init ipc server
  377.     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
  378.         conf.get("dfs.datanode.ipc.address"));
  379.     ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(), 
  380.         conf.getInt("dfs.datanode.handler.count", 3), false, conf);
  381.     ipcServer.start();
  382.     dnRegistration.setIpcPort(ipcServer.getListenerAddress().getPort());
  383.     LOG.info("dnRegistration = " + dnRegistration);
  384.   }
  385.   /**
  386.    * Creates either NIO or regular depending on socketWriteTimeout.
  387.    */
  388.   protected Socket newSocket() throws IOException {
  389.     return (socketWriteTimeout > 0) ? 
  390.            SocketChannel.open().socket() : new Socket();                                   
  391.   }
  392.   
  393.   private NamespaceInfo handshake() throws IOException {
  394.     NamespaceInfo nsInfo = new NamespaceInfo();
  395.     while (shouldRun) {
  396.       try {
  397.         nsInfo = namenode.versionRequest();
  398.         break;
  399.       } catch(SocketTimeoutException e) {  // namenode is busy
  400.         LOG.info("Problem connecting to server: " + getNameNodeAddr());
  401.         try {
  402.           Thread.sleep(1000);
  403.         } catch (InterruptedException ie) {}
  404.       }
  405.     }
  406.     String errorMsg = null;
  407.     // verify build version
  408.     if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
  409.       errorMsg = "Incompatible build versions: namenode BV = " 
  410.         + nsInfo.getBuildVersion() + "; datanode BV = "
  411.         + Storage.getBuildVersion();
  412.       LOG.fatal( errorMsg );
  413.       try {
  414.         namenode.errorReport( dnRegistration,
  415.                               DatanodeProtocol.NOTIFY, errorMsg );
  416.       } catch( SocketTimeoutException e ) {  // namenode is busy
  417.         LOG.info("Problem connecting to server: " + getNameNodeAddr());
  418.       }
  419.       throw new IOException( errorMsg );
  420.     }
  421.     assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
  422.       "Data-node and name-node layout versions must be the same."
  423.       + "Expected: "+ FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
  424.     return nsInfo;
  425.   }
  426.   /** Return the DataNode object
  427.    * 
  428.    */
  429.   public static DataNode getDataNode() {
  430.     return datanodeObject;
  431.   } 
  432.   public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
  433.       DatanodeID datanodeid, Configuration conf) throws IOException {
  434.     InetSocketAddress addr = NetUtils.createSocketAddr(
  435.         datanodeid.getHost() + ":" + datanodeid.getIpcPort());
  436.     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
  437.       InterDatanodeProtocol.LOG.info("InterDatanodeProtocol addr=" + addr);
  438.     }
  439.     return (InterDatanodeProtocol)RPC.getProxy(InterDatanodeProtocol.class,
  440.         InterDatanodeProtocol.versionID, addr, conf);
  441.   }
  442.   public InetSocketAddress getNameNodeAddr() {
  443.     return nameNodeAddr;
  444.   }
  445.   
  446.   public InetSocketAddress getSelfAddr() {
  447.     return selfAddr;
  448.   }
  449.     
  450.   DataNodeMetrics getMetrics() {
  451.     return myMetrics;
  452.   }
  453.   
  454.   /**
  455.    * Return the namenode's identifier
  456.    */
  457.   public String getNamenode() {
  458.     //return namenode.toString();
  459.     return "<namenode>";
  460.   }
  461.   public static void setNewStorageID(DatanodeRegistration dnReg) {
  462.     /* Return 
  463.      * "DS-randInt-ipaddr-currentTimeMillis"
  464.      * It is considered extermely rare for all these numbers to match
  465.      * on a different machine accidentally for the following 
  466.      * a) SecureRandom(INT_MAX) is pretty much random (1 in 2 billion), and
  467.      * b) Good chance ip address would be different, and
  468.      * c) Even on the same machine, Datanode is designed to use different ports.
  469.      * d) Good chance that these are started at different times.
  470.      * For a confict to occur all the 4 above have to match!.
  471.      * The format of this string can be changed anytime in future without
  472.      * affecting its functionality.
  473.      */
  474.     String ip = "unknownIP";
  475.     try {
  476.       ip = DNS.getDefaultIP("default");
  477.     } catch (UnknownHostException ignored) {
  478.       LOG.warn("Could not find ip address of "default" inteface.");
  479.     }
  480.     
  481.     int rand = 0;
  482.     try {
  483.       rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
  484.     } catch (NoSuchAlgorithmException e) {
  485.       LOG.warn("Could not use SecureRandom");
  486.       rand = R.nextInt(Integer.MAX_VALUE);
  487.     }
  488.     dnReg.storageID = "DS-" + rand + "-"+ ip + "-" + dnReg.getPort() + "-" + 
  489.                       System.currentTimeMillis();
  490.   }
  491.   /**
  492.    * Register datanode
  493.    * <p>
  494.    * The datanode needs to register with the namenode on startup in order
  495.    * 1) to report which storage it is serving now and 
  496.    * 2) to receive a registrationID 
  497.    * issued by the namenode to recognize registered datanodes.
  498.    * 
  499.    * @see FSNamesystem#registerDatanode(DatanodeRegistration)
  500.    * @throws IOException
  501.    */
  502.   private void register() throws IOException {
  503.     if (dnRegistration.getStorageID().equals("")) {
  504.       setNewStorageID(dnRegistration);
  505.     }
  506.     while(shouldRun) {
  507.       try {
  508.         // reset name to machineName. Mainly for web interface.
  509.         dnRegistration.name = machineName + ":" + dnRegistration.getPort();
  510.         dnRegistration = namenode.register(dnRegistration);
  511.         break;
  512.       } catch(SocketTimeoutException e) {  // namenode is busy
  513.         LOG.info("Problem connecting to server: " + getNameNodeAddr());
  514.         try {
  515.           Thread.sleep(1000);
  516.         } catch (InterruptedException ie) {}
  517.       }
  518.     }
  519.     assert ("".equals(storage.getStorageID()) 
  520.             && !"".equals(dnRegistration.getStorageID()))
  521.             || storage.getStorageID().equals(dnRegistration.getStorageID()) :
  522.             "New storageID can be assigned only if data-node is not formatted";
  523.     if (storage.getStorageID().equals("")) {
  524.       storage.setStorageID(dnRegistration.getStorageID());
  525.       storage.writeAll();
  526.       LOG.info("New storage id " + dnRegistration.getStorageID()
  527.           + " is assigned to data-node " + dnRegistration.getName());
  528.     }
  529.     if(! storage.getStorageID().equals(dnRegistration.getStorageID())) {
  530.       throw new IOException("Inconsistent storage IDs. Name-node returned "
  531.           + dnRegistration.getStorageID() 
  532.           + ". Expecting " + storage.getStorageID());
  533.     }
  534.     
  535.     // random short delay - helps scatter the BR from all DNs
  536.     scheduleBlockReport(initialBlockReportDelay);
  537.   }
  538.   /**
  539.    * Shut down this instance of the datanode.
  540.    * Returns only after shutdown is complete.
  541.    * This method can only be called by the offerService thread.
  542.    * Otherwise, deadlock might occur.
  543.    */
  544.   public void shutdown() {
  545.     if (infoServer != null) {
  546.       try {
  547.         infoServer.stop();
  548.       } catch (Exception e) {
  549.         LOG.warn("Exception shutting down DataNode", e);
  550.       }
  551.     }
  552.     if (ipcServer != null) {
  553.       ipcServer.stop();
  554.     }
  555.     this.shouldRun = false;
  556.     if (dataXceiverServer != null) {
  557.       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
  558.       this.dataXceiverServer.interrupt();
  559.       // wait for all data receiver threads to exit
  560.       if (this.threadGroup != null) {
  561.         while (true) {
  562.           this.threadGroup.interrupt();
  563.           LOG.info("Waiting for threadgroup to exit, active threads is " +
  564.                    this.threadGroup.activeCount());
  565.           if (this.threadGroup.activeCount() == 0) {
  566.             break;
  567.           }
  568.           try {
  569.             Thread.sleep(1000);
  570.           } catch (InterruptedException e) {}
  571.         }
  572.       }
  573.       // wait for dataXceiveServer to terminate
  574.       try {
  575.         this.dataXceiverServer.join();
  576.       } catch (InterruptedException ie) {
  577.       }
  578.     }
  579.     
  580.     RPC.stopProxy(namenode); // stop the RPC threads
  581.     
  582.     if(upgradeManager != null)
  583.       upgradeManager.shutdownUpgrade();
  584.     if (blockScannerThread != null) { 
  585.       blockScannerThread.interrupt();
  586.       try {
  587.         blockScannerThread.join(3600000L); // wait for at most 1 hour
  588.       } catch (InterruptedException ie) {
  589.       }
  590.     }
  591.     if (storage != null) {
  592.       try {
  593.         this.storage.unlockAll();
  594.       } catch (IOException ie) {
  595.       }
  596.     }
  597.     if (dataNodeThread != null) {
  598.       dataNodeThread.interrupt();
  599.       try {
  600.         dataNodeThread.join();
  601.       } catch (InterruptedException ie) {
  602.       }
  603.     }
  604.     if (data != null) {
  605.       data.shutdown();
  606.     }
  607.     if (myMetrics != null) {
  608.       myMetrics.shutdown();
  609.     }
  610.   }
  611.   
  612.   
  613.   /* Check if there is no space in disk or the disk is read-only
  614.    *  when IOException occurs. 
  615.    * If so, handle the error */
  616.   protected void checkDiskError( IOException e ) throws IOException {
  617.     if (e.getMessage() != null && 
  618.         e.getMessage().startsWith("No space left on device")) {
  619.       throw new DiskOutOfSpaceException("No space left on device");
  620.     } else {
  621.       checkDiskError();
  622.     }
  623.   }
  624.   
  625.   /* Check if there is no disk space and if so, handle the error*/
  626.   protected void checkDiskError( ) throws IOException {
  627.     try {
  628.       data.checkDataDir();
  629.     } catch(DiskErrorException de) {
  630.       handleDiskError(de.getMessage());
  631.     }
  632.   }
  633.   
  634.   private void handleDiskError(String errMsgr) {
  635.     LOG.warn("DataNode is shutting down.n" + errMsgr);
  636.     shouldRun = false;
  637.     try {
  638.       namenode.errorReport(
  639.                            dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
  640.     } catch(IOException ignored) {              
  641.     }
  642.   }
  643.     
  644.   /** Number of concurrent xceivers per node. */
  645.   int getXceiverCount() {
  646.     return threadGroup == null ? 0 : threadGroup.activeCount();
  647.   }
  648.     
  649.   /**
  650.    * Main loop for the DataNode.  Runs until shutdown,
  651.    * forever calling remote NameNode functions.
  652.    */
  653.   public void offerService() throws Exception {
  654.      
  655.     LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" + 
  656.        " Initial delay: " + initialBlockReportDelay + "msec");
  657.     //
  658.     // Now loop for a long time....
  659.     //
  660.     while (shouldRun) {
  661.       try {
  662.         long startTime = now();
  663.         //
  664.         // Every so often, send heartbeat or block-report
  665.         //
  666.         
  667.         if (startTime - lastHeartbeat > heartBeatInterval) {
  668.           //
  669.           // All heartbeat messages include following info:
  670.           // -- Datanode name
  671.           // -- data transfer port
  672.           // -- Total capacity
  673.           // -- Bytes remaining
  674.           //
  675.           lastHeartbeat = startTime;
  676.           DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
  677.                                                        data.getCapacity(),
  678.                                                        data.getDfsUsed(),
  679.                                                        data.getRemaining(),
  680.                                                        xmitsInProgress.get(),
  681.                                                        getXceiverCount());
  682.           myMetrics.heartbeats.inc(now() - startTime);
  683.           //LOG.info("Just sent heartbeat, with name " + localName);
  684.           if (!processCommand(cmds))
  685.             continue;
  686.         }
  687.             
  688.         // check if there are newly received blocks
  689.         Block [] blockArray=null;
  690.         String [] delHintArray=null;
  691.         synchronized(receivedBlockList) {
  692.           synchronized(delHints) {
  693.             int numBlocks = receivedBlockList.size();
  694.             if (numBlocks > 0) {
  695.               if(numBlocks!=delHints.size()) {
  696.                 LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
  697.               }
  698.               //
  699.               // Send newly-received blockids to namenode
  700.               //
  701.               blockArray = receivedBlockList.toArray(new Block[numBlocks]);
  702.               delHintArray = delHints.toArray(new String[numBlocks]);
  703.             }
  704.           }
  705.         }
  706.         if (blockArray != null) {
  707.           if(delHintArray == null || delHintArray.length != blockArray.length ) {
  708.             LOG.warn("Panic: block array & delHintArray are not the same" );
  709.           }
  710.           namenode.blockReceived(dnRegistration, blockArray, delHintArray);
  711.           synchronized (receivedBlockList) {
  712.             synchronized (delHints) {
  713.               for(int i=0; i<blockArray.length; i++) {
  714.                 receivedBlockList.remove(blockArray[i]);
  715.                 delHints.remove(delHintArray[i]);
  716.               }
  717.             }
  718.           }
  719.         }
  720.         // send block report
  721.         if (startTime - lastBlockReport > blockReportInterval) {
  722.           //
  723.           // Send latest blockinfo report if timer has expired.
  724.           // Get back a list of local block(s) that are obsolete
  725.           // and can be safely GC'ed.
  726.           //
  727.           long brStartTime = now();
  728.           Block[] bReport = data.getBlockReport();
  729.           DatanodeCommand cmd = namenode.blockReport(dnRegistration,
  730.                   BlockListAsLongs.convertToArrayLongs(bReport));
  731.           long brTime = now() - brStartTime;
  732.           myMetrics.blockReports.inc(brTime);
  733.           LOG.info("BlockReport of " + bReport.length +
  734.               " blocks got processed in " + brTime + " msecs");
  735.           //
  736.           // If we have sent the first block report, then wait a random
  737.           // time before we start the periodic block reports.
  738.           //
  739.           if (resetBlockReportTime) {
  740.             lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
  741.             resetBlockReportTime = false;
  742.           } else {
  743.             /* say the last block report was at 8:20:14. The current report 
  744.              * should have started around 9:20:14 (default 1 hour interval). 
  745.              * If current time is :
  746.              *   1) normal like 9:20:18, next report should be at 10:20:14
  747.              *   2) unexpected like 11:35:43, next report should be at 12:20:14
  748.              */
  749.             lastBlockReport += (now() - lastBlockReport) / 
  750.                                blockReportInterval * blockReportInterval;
  751.           }
  752.           processCommand(cmd);
  753.         }
  754.         // start block scanner
  755.         if (blockScanner != null && blockScannerThread == null &&
  756.             upgradeManager.isUpgradeCompleted()) {
  757.           LOG.info("Starting Periodic block scanner.");
  758.           blockScannerThread = new Daemon(blockScanner);
  759.           blockScannerThread.start();
  760.         }
  761.             
  762.         //
  763.         // There is no work to do;  sleep until hearbeat timer elapses, 
  764.         // or work arrives, and then iterate again.
  765.         //
  766.         long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
  767.         synchronized(receivedBlockList) {
  768.           if (waitTime > 0 && receivedBlockList.size() == 0) {
  769.             try {
  770.               receivedBlockList.wait(waitTime);
  771.             } catch (InterruptedException ie) {
  772.             }
  773.           }
  774.         } // synchronized
  775.       } catch(RemoteException re) {
  776.         String reClass = re.getClassName();
  777.         if (UnregisteredDatanodeException.class.getName().equals(reClass) ||
  778.             DisallowedDatanodeException.class.getName().equals(reClass) ||
  779.             IncorrectVersionException.class.getName().equals(reClass)) {
  780.           LOG.warn("DataNode is shutting down: " + 
  781.                    StringUtils.stringifyException(re));
  782.           shutdown();
  783.           return;
  784.         }
  785.         LOG.warn(StringUtils.stringifyException(re));
  786.       } catch (IOException e) {
  787.         LOG.warn(StringUtils.stringifyException(e));
  788.       }
  789.     } // while (shouldRun)
  790.   } // offerService
  791.   /**
  792.    * Process an array of datanode commands
  793.    * 
  794.    * @param cmds an array of datanode commands
  795.    * @return true if further processing may be required or false otherwise. 
  796.    */
  797.   private boolean processCommand(DatanodeCommand[] cmds) {
  798.     if (cmds != null) {
  799.       for (DatanodeCommand cmd : cmds) {
  800.         try {
  801.           if (processCommand(cmd) == false) {
  802.             return false;
  803.           }
  804.         } catch (IOException ioe) {
  805.           LOG.warn("Error processing datanode Command", ioe);
  806.         }
  807.       }
  808.     }
  809.     return true;
  810.   }
  811.   
  812.     /**
  813.      * 
  814.      * @param cmd
  815.      * @return true if further processing may be required or false otherwise. 
  816.      * @throws IOException
  817.      */
  818.   private boolean processCommand(DatanodeCommand cmd) throws IOException {
  819.     if (cmd == null)
  820.       return true;
  821.     final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
  822.     switch(cmd.getAction()) {
  823.     case DatanodeProtocol.DNA_TRANSFER:
  824.       // Send a copy of a block to another datanode
  825.       transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
  826.       myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
  827.       break;
  828.     case DatanodeProtocol.DNA_INVALIDATE:
  829.       //
  830.       // Some local block(s) are obsolete and can be 
  831.       // safely garbage-collected.
  832.       //
  833.       Block toDelete[] = bcmd.getBlocks();
  834.       try {
  835.         if (blockScanner != null) {
  836.           blockScanner.deleteBlocks(toDelete);
  837.         }
  838.         data.invalidate(toDelete);
  839.       } catch(IOException e) {
  840.         checkDiskError();
  841.         throw e;
  842.       }
  843.       myMetrics.blocksRemoved.inc(toDelete.length);
  844.       break;
  845.     case DatanodeProtocol.DNA_SHUTDOWN:
  846.       // shut down the data node
  847.       this.shutdown();
  848.       return false;
  849.     case DatanodeProtocol.DNA_REGISTER:
  850.       // namenode requested a registration - at start or if NN lost contact
  851.       LOG.info("DatanodeCommand action: DNA_REGISTER");
  852.       if (shouldRun) {
  853.         register();
  854.       }
  855.       break;
  856.     case DatanodeProtocol.DNA_FINALIZE:
  857.       storage.finalizeUpgrade();
  858.       break;
  859.     case UpgradeCommand.UC_ACTION_START_UPGRADE:
  860.       // start distributed upgrade here
  861.       processDistributedUpgradeCommand((UpgradeCommand)cmd);
  862.       break;
  863.     case DatanodeProtocol.DNA_RECOVERBLOCK:
  864.       recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
  865.       break;
  866.     default:
  867.       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
  868.     }
  869.     return true;
  870.   }
  871.   // Distributed upgrade manager
  872.   UpgradeManagerDatanode upgradeManager = new UpgradeManagerDatanode(this);
  873.   private void processDistributedUpgradeCommand(UpgradeCommand comm
  874.                                                ) throws IOException {
  875.     assert upgradeManager != null : "DataNode.upgradeManager is null.";
  876.     upgradeManager.processUpgradeCommand(comm);
  877.   }
  878.   /**
  879.    * Start distributed upgrade if it should be initiated by the data-node.
  880.    */
  881.   private void startDistributedUpgradeIfNeeded() throws IOException {
  882.     UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
  883.     assert um != null : "DataNode.upgradeManager is null.";
  884.     if(!um.getUpgradeState())
  885.       return;
  886.     um.setUpgradeState(false, um.getUpgradeVersion());
  887.     um.startUpgrade();
  888.     return;
  889.   }
  890.   private void transferBlock( Block block, 
  891.                               DatanodeInfo xferTargets[] 
  892.                               ) throws IOException {
  893.     if (!data.isValidBlock(block)) {
  894.       // block does not exist or is under-construction
  895.       String errStr = "Can't send invalid block " + block;
  896.       LOG.info(errStr);
  897.       namenode.errorReport(dnRegistration, 
  898.                            DatanodeProtocol.INVALID_BLOCK, 
  899.                            errStr);
  900.       return;
  901.     }
  902.     // Check if NN recorded length matches on-disk length 
  903.     long onDiskLength = data.getLength(block);
  904.     if (block.getNumBytes() > onDiskLength) {
  905.       // Shorter on-disk len indicates corruption so report NN the corrupt block
  906.       namenode.reportBadBlocks(new LocatedBlock[]{
  907.           new LocatedBlock(block, new DatanodeInfo[] {
  908.               new DatanodeInfo(dnRegistration)})});
  909.       LOG.info("Can't replicate block " + block
  910.           + " because on-disk length " + onDiskLength 
  911.           + " is shorter than NameNode recorded length " + block.getNumBytes());
  912.       return;
  913.     }
  914.     
  915.     int numTargets = xferTargets.length;
  916.     if (numTargets > 0) {
  917.       if (LOG.isInfoEnabled()) {
  918.         StringBuilder xfersBuilder = new StringBuilder();
  919.         for (int i = 0; i < numTargets; i++) {
  920.           xfersBuilder.append(xferTargets[i].getName());
  921.           xfersBuilder.append(" ");
  922.         }
  923.         LOG.info(dnRegistration + " Starting thread to transfer block " + 
  924.                  block + " to " + xfersBuilder);                       
  925.       }
  926.       new Daemon(new DataTransfer(xferTargets, block, this)).start();
  927.     }
  928.   }
  929.   private void transferBlocks( Block blocks[], 
  930.                                DatanodeInfo xferTargets[][] 
  931.                                ) {
  932.     for (int i = 0; i < blocks.length; i++) {
  933.       try {
  934.         transferBlock(blocks[i], xferTargets[i]);
  935.       } catch (IOException ie) {
  936.         LOG.warn("Failed to transfer block " + blocks[i], ie);
  937.       }
  938.     }
  939.   }
  940.   /*
  941.    * Informing the name node could take a long long time! Should we wait
  942.    * till namenode is informed before responding with success to the
  943.    * client? For now we don't.
  944.    */
  945.   protected void notifyNamenodeReceivedBlock(Block block, String delHint) {
  946.     if(block==null || delHint==null) {
  947.       throw new IllegalArgumentException(block==null?"Block is null":"delHint is null");
  948.     }
  949.     synchronized (receivedBlockList) {
  950.       synchronized (delHints) {
  951.         receivedBlockList.add(block);
  952.         delHints.add(delHint);
  953.         receivedBlockList.notifyAll();
  954.       }
  955.     }
  956.   }
  957.   
  958.   /* ********************************************************************
  959.   Protocol when a client reads data from Datanode (Cur Ver: 9):
  960.   
  961.   Client's Request :
  962.   =================
  963.    
  964.      Processed in DataXceiver:
  965.      +----------------------------------------------+
  966.      | Common Header   | 1 byte OP == OP_READ_BLOCK |
  967.      +----------------------------------------------+
  968.      
  969.      Processed in readBlock() :
  970.      +-------------------------------------------------------------------------+
  971.      | 8 byte Block ID | 8 byte genstamp | 8 byte start offset | 8 byte length |
  972.      +-------------------------------------------------------------------------+
  973.      |   vInt length   |  <DFSClient id> |
  974.      +-----------------------------------+
  975.      
  976.      Client sends optional response only at the end of receiving data.
  977.        
  978.   DataNode Response :
  979.   ===================
  980.    
  981.     In readBlock() :
  982.     If there is an error while initializing BlockSender :
  983.        +---------------------------+
  984.        | 2 byte OP_STATUS_ERROR    | and connection will be closed.
  985.        +---------------------------+
  986.     Otherwise
  987.        +---------------------------+
  988.        | 2 byte OP_STATUS_SUCCESS  |
  989.        +---------------------------+
  990.        
  991.     Actual data, sent by BlockSender.sendBlock() :
  992.     
  993.       ChecksumHeader :
  994.       +--------------------------------------------------+
  995.       | 1 byte CHECKSUM_TYPE | 4 byte BYTES_PER_CHECKSUM |
  996.       +--------------------------------------------------+
  997.       Followed by actual data in the form of PACKETS: 
  998.       +------------------------------------+
  999.       | Sequence of data PACKETs ....      |
  1000.       +------------------------------------+
  1001.     
  1002.     A "PACKET" is defined further below.
  1003.     
  1004.     The client reads data until it receives a packet with 
  1005.     "LastPacketInBlock" set to true or with a zero length. If there is 
  1006.     no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
  1007.     
  1008.     Client optional response at the end of data transmission :
  1009.       +------------------------------+
  1010.       | 2 byte OP_STATUS_CHECKSUM_OK |
  1011.       +------------------------------+
  1012.     
  1013.     PACKET : Contains a packet header, checksum and data. Amount of data
  1014.     ======== carried is set by BUFFER_SIZE.
  1015.     
  1016.       +-----------------------------------------------------+
  1017.       | 4 byte packet length (excluding packet header)      |
  1018.       +-----------------------------------------------------+
  1019.       | 8 byte offset in the block | 8 byte sequence number |
  1020.       +-----------------------------------------------------+
  1021.       | 1 byte isLastPacketInBlock                          |
  1022.       +-----------------------------------------------------+
  1023.       | 4 byte Length of actual data                        |
  1024.       +-----------------------------------------------------+
  1025.       | x byte checksum data. x is defined below            |
  1026.       +-----------------------------------------------------+
  1027.       | actual data ......                                  |
  1028.       +-----------------------------------------------------+
  1029.       
  1030.       x = (length of data + BYTE_PER_CHECKSUM - 1)/BYTES_PER_CHECKSUM *
  1031.           CHECKSUM_SIZE
  1032.           
  1033.       CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
  1034.       
  1035.       The above packet format is used while writing data to DFS also.
  1036.       Not all the fields might be used while reading.
  1037.     
  1038.    ************************************************************************ */
  1039.   
  1040.   /** Header size for a packet */
  1041.   public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
  1042.                                       8 + /* offset in block */
  1043.                                       8 + /* seqno */
  1044.                                       1   /* isLastPacketInBlock */);
  1045.   
  1046.   /**
  1047.    * Used for transferring a block of data.  This class
  1048.    * sends a piece of data to another DataNode.
  1049.    */
  1050.   class DataTransfer implements Runnable {
  1051.     DatanodeInfo targets[];
  1052.     Block b;
  1053.     DataNode datanode;
  1054.     /**
  1055.      * Connect to the first item in the target list.  Pass along the 
  1056.      * entire target list, the block, and the data.
  1057.      */
  1058.     public DataTransfer(DatanodeInfo targets[], Block b, DataNode datanode) throws IOException {
  1059.       this.targets = targets;
  1060.       this.b = b;
  1061.       this.datanode = datanode;
  1062.     }
  1063.     /**
  1064.      * Do the deed, write the bytes
  1065.      */
  1066.     public void run() {
  1067.       xmitsInProgress.getAndIncrement();
  1068.       Socket sock = null;
  1069.       DataOutputStream out = null;
  1070.       BlockSender blockSender = null;
  1071.       
  1072.       try {
  1073.         InetSocketAddress curTarget = 
  1074.           NetUtils.createSocketAddr(targets[0].getName());
  1075.         sock = newSocket();
  1076.         NetUtils.connect(sock, curTarget, socketTimeout);
  1077.         sock.setSoTimeout(targets.length * socketTimeout);
  1078.         long writeTimeout = socketWriteTimeout + 
  1079.                             HdfsConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1);
  1080.         OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
  1081.         out = new DataOutputStream(new BufferedOutputStream(baseStream, 
  1082.                                                             SMALL_BUFFER_SIZE));
  1083.         blockSender = new BlockSender(b, 0, b.getNumBytes(), false, false, false, 
  1084.             datanode);
  1085.         DatanodeInfo srcNode = new DatanodeInfo(dnRegistration);
  1086.         //
  1087.         // Header info
  1088.         //
  1089.         out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
  1090.         out.writeByte(DataTransferProtocol.OP_WRITE_BLOCK);
  1091.         out.writeLong(b.getBlockId());
  1092.         out.writeLong(b.getGenerationStamp());
  1093.         out.writeInt(0);           // no pipelining
  1094.         out.writeBoolean(false);   // not part of recovery
  1095.         Text.writeString(out, ""); // client
  1096.         out.writeBoolean(true); // sending src node information
  1097.         srcNode.write(out); // Write src node DatanodeInfo
  1098.         // write targets
  1099.         out.writeInt(targets.length - 1);
  1100.         for (int i = 1; i < targets.length; i++) {
  1101.           targets[i].write(out);
  1102.         }
  1103.         // send data & checksum
  1104.         blockSender.sendBlock(out, baseStream, null);
  1105.         // no response necessary
  1106.         LOG.info(dnRegistration + ":Transmitted block " + b + " to " + curTarget);
  1107.       } catch (IOException ie) {
  1108.         LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
  1109.             + " got " + StringUtils.stringifyException(ie));
  1110.       } finally {
  1111.         xmitsInProgress.getAndDecrement();
  1112.         IOUtils.closeStream(blockSender);
  1113.         IOUtils.closeStream(out);
  1114.         IOUtils.closeSocket(sock);
  1115.       }
  1116.     }
  1117.   }
  1118.   /**
  1119.    * No matter what kind of exception we get, keep retrying to offerService().
  1120.    * That's the loop that connects to the NameNode and provides basic DataNode
  1121.    * functionality.
  1122.    *
  1123.    * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
  1124.    */
  1125.   public void run() {
  1126.     LOG.info(dnRegistration + "In DataNode.run, data = " + data);
  1127.     // start dataXceiveServer
  1128.     dataXceiverServer.start();
  1129.         
  1130.     while (shouldRun) {
  1131.       try {
  1132.         startDistributedUpgradeIfNeeded();
  1133.         offerService();
  1134.       } catch (Exception ex) {
  1135.         LOG.error("Exception: " + StringUtils.stringifyException(ex));
  1136.         if (shouldRun) {
  1137.           try {
  1138.             Thread.sleep(5000);
  1139.           } catch (InterruptedException ie) {
  1140.           }
  1141.         }
  1142.       }
  1143.     }
  1144.         
  1145.     LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
  1146.     shutdown();
  1147.   }
  1148.     
  1149.   /** Start a single datanode daemon and wait for it to finish.
  1150.    *  If this thread is specifically interrupted, it will stop waiting.
  1151.    */
  1152.   public static void runDatanodeDaemon(DataNode dn) throws IOException {
  1153.     if (dn != null) {
  1154.       //register datanode
  1155.       dn.register();
  1156.       dn.dataNodeThread = new Thread(dn, dnThreadName);
  1157.       dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
  1158.       dn.dataNodeThread.start();
  1159.     }
  1160.   }
  1161.   
  1162.   static boolean isDatanodeUp(DataNode dn) {
  1163.     return dn.dataNodeThread != null && dn.dataNodeThread.isAlive();
  1164.   }
  1165.   /** Instantiate a single datanode object. This must be run by invoking
  1166.    *  {@link DataNode#runDatanodeDaemon(DataNode)} subsequently. 
  1167.    */
  1168.   public static DataNode instantiateDataNode(String args[],
  1169.                                       Configuration conf) throws IOException {
  1170.     if (conf == null)
  1171.       conf = new Configuration();
  1172.     if (!parseArguments(args, conf)) {
  1173.       printUsage();
  1174.       return null;
  1175.     }
  1176.     if (conf.get("dfs.network.script") != null) {
  1177.       LOG.error("This configuration for rack identification is not supported" +
  1178.           " anymore. RackID resolution is handled by the NameNode.");
  1179.       System.exit(-1);
  1180.     }
  1181.     String[] dataDirs = conf.getStrings("dfs.data.dir");
  1182.     dnThreadName = "DataNode: [" +
  1183.                         StringUtils.arrayToString(dataDirs) + "]";
  1184.     return makeInstance(dataDirs, conf);
  1185.   }
  1186.   /** Instantiate & Start a single datanode daemon and wait for it to finish.
  1187.    *  If this thread is specifically interrupted, it will stop waiting.
  1188.    */
  1189.   public static DataNode createDataNode(String args[],
  1190.                                  Configuration conf) throws IOException {
  1191.     DataNode dn = instantiateDataNode(args, conf);
  1192.     runDatanodeDaemon(dn);
  1193.     return dn;
  1194.   }
  1195.   void join() {
  1196.     if (dataNodeThread != null) {
  1197.       try {
  1198.         dataNodeThread.join();
  1199.       } catch (InterruptedException e) {}
  1200.     }
  1201.   }
  1202.   /**
  1203.    * Make an instance of DataNode after ensuring that at least one of the
  1204.    * given data directories (and their parent directories, if necessary)
  1205.    * can be created.
  1206.    * @param dataDirs List of directories, where the new DataNode instance should
  1207.    * keep its files.
  1208.    * @param conf Configuration instance to use.
  1209.    * @return DataNode instance for given list of data dirs and conf, or null if
  1210.    * no directory from this directory list can be created.
  1211.    * @throws IOException
  1212.    */
  1213.   public static DataNode makeInstance(String[] dataDirs, Configuration conf)
  1214.     throws IOException {
  1215.     ArrayList<File> dirs = new ArrayList<File>();
  1216.     for (int i = 0; i < dataDirs.length; i++) {
  1217.       File data = new File(dataDirs[i]);
  1218.       try {
  1219.         DiskChecker.checkDir(data);
  1220.         dirs.add(data);
  1221.       } catch(DiskErrorException e) {
  1222.         LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
  1223.       }
  1224.     }
  1225.     if (dirs.size() > 0) 
  1226.       return new DataNode(conf, dirs);
  1227.     LOG.error("All directories in dfs.data.dir are invalid.");
  1228.     return null;
  1229.   }
  1230.   @Override
  1231.   public String toString() {
  1232.     return "DataNode{" +
  1233.       "data=" + data +
  1234.       ", localName='" + dnRegistration.getName() + "'" +
  1235.       ", storageID='" + dnRegistration.getStorageID() + "'" +
  1236.       ", xmitsInProgress=" + xmitsInProgress.get() +
  1237.       "}";
  1238.   }
  1239.   
  1240.   private static void printUsage() {
  1241.     System.err.println("Usage: java DataNode");
  1242.     System.err.println("           [-rollback]");
  1243.   }
  1244.   /**
  1245.    * Parse and verify command line arguments and set configuration parameters.
  1246.    *
  1247.    * @return false if passed argements are incorrect
  1248.    */
  1249.   private static boolean parseArguments(String args[], 
  1250.                                         Configuration conf) {
  1251.     int argsLen = (args == null) ? 0 : args.length;
  1252.     StartupOption startOpt = StartupOption.REGULAR;
  1253.     for(int i=0; i < argsLen; i++) {
  1254.       String cmd = args[i];
  1255.       if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) {
  1256.         LOG.error("-r, --rack arguments are not supported anymore. RackID " +
  1257.             "resolution is handled by the NameNode.");
  1258.         System.exit(-1);
  1259.       } else if ("-rollback".equalsIgnoreCase(cmd)) {
  1260.         startOpt = StartupOption.ROLLBACK;
  1261.       } else if ("-regular".equalsIgnoreCase(cmd)) {
  1262.         startOpt = StartupOption.REGULAR;
  1263.       } else
  1264.         return false;
  1265.     }
  1266.     setStartupOption(conf, startOpt);
  1267.     return true;
  1268.   }
  1269.   private static void setStartupOption(Configuration conf, StartupOption opt) {
  1270.     conf.set("dfs.datanode.startup", opt.toString());
  1271.   }
  1272.   static StartupOption getStartupOption(Configuration conf) {
  1273.     return StartupOption.valueOf(conf.get("dfs.datanode.startup",
  1274.                                           StartupOption.REGULAR.toString()));
  1275.   }
  1276.   /**
  1277.    * This methods  arranges for the data node to send the block report at the next heartbeat.
  1278.    */
  1279.   public void scheduleBlockReport(long delay) {
  1280.     if (delay > 0) { // send BR after random delay
  1281.       lastBlockReport = System.currentTimeMillis()
  1282.                             - ( blockReportInterval - R.nextInt((int)(delay)));
  1283.     } else { // send at next heartbeat
  1284.       lastBlockReport = lastHeartbeat - blockReportInterval;
  1285.     }
  1286.     resetBlockReportTime = true; // reset future BRs for randomness
  1287.   }
  1288.   
  1289.   
  1290.   /**
  1291.    * This method is used for testing. 
  1292.    * Examples are adding and deleting blocks directly.
  1293.    * The most common usage will be when the data node's storage is similated.
  1294.    * 
  1295.    * @return the fsdataset that stores the blocks
  1296.    */
  1297.   public FSDatasetInterface getFSDataset() {
  1298.     return data;
  1299.   }
  1300.   /**
  1301.    */
  1302.   public static void main(String args[]) {
  1303.     try {
  1304.       StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
  1305.       DataNode datanode = createDataNode(args, null);
  1306.       if (datanode != null)
  1307.         datanode.join();
  1308.     } catch (Throwable e) {
  1309.       LOG.error(StringUtils.stringifyException(e));
  1310.       System.exit(-1);
  1311.     }
  1312.   }
  1313.   // InterDataNodeProtocol implementation
  1314.   /** {@inheritDoc} */
  1315.   public BlockMetaDataInfo getBlockMetaDataInfo(Block block
  1316.       ) throws IOException {
  1317.     if (LOG.isDebugEnabled()) {
  1318.       LOG.debug("block=" + block);
  1319.     }
  1320.     Block stored = data.getStoredBlock(block.getBlockId());
  1321.     if (stored == null) {
  1322.       return null;
  1323.     }
  1324.     BlockMetaDataInfo info = new BlockMetaDataInfo(stored,
  1325.                                  blockScanner.getLastScanTime(stored));
  1326.     if (LOG.isDebugEnabled()) {
  1327.       LOG.debug("getBlockMetaDataInfo successful block=" + stored +
  1328.                 " length " + stored.getNumBytes() +
  1329.                 " genstamp " + stored.getGenerationStamp());
  1330.     }
  1331.     // paranoia! verify that the contents of the stored block
  1332.     // matches the block file on disk.
  1333.     data.validateBlockMetadata(stored);
  1334.     return info;
  1335.   }
  1336.   public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
  1337.     Daemon d = new Daemon(threadGroup, new Runnable() {
  1338.       /** Recover a list of blocks. It is run by the primary datanode. */
  1339.       public void run() {
  1340.         for(int i = 0; i < blocks.length; i++) {
  1341.           try {
  1342.             logRecoverBlock("NameNode", blocks[i], targets[i]);
  1343.             recoverBlock(blocks[i], false, targets[i], true);
  1344.           } catch (IOException e) {
  1345.             LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
  1346.           }
  1347.         }
  1348.       }
  1349.     });
  1350.     d.start();
  1351.     return d;
  1352.   }
  1353.   /** {@inheritDoc} */
  1354.   public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException {
  1355.     LOG.info("oldblock=" + oldblock + "(length=" + oldblock.getNumBytes()
  1356.         + "), newblock=" + newblock + "(length=" + newblock.getNumBytes()
  1357.         + "), datanode=" + dnRegistration.getName());
  1358.     data.updateBlock(oldblock, newblock);
  1359.     if (finalize) {
  1360.       data.finalizeBlock(newblock);
  1361.       myMetrics.blocksWritten.inc(); 
  1362.       notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
  1363.       LOG.info("Received block " + newblock +
  1364.                 " of size " + newblock.getNumBytes() +
  1365.                 " as part of lease recovery.");
  1366.     }
  1367.   }
  1368.   /** {@inheritDoc} */
  1369.   public long getProtocolVersion(String protocol, long clientVersion
  1370.       ) throws IOException {
  1371.     if (protocol.equals(InterDatanodeProtocol.class.getName())) {
  1372.       return InterDatanodeProtocol.versionID; 
  1373.     } else if (protocol.equals(ClientDatanodeProtocol.class.getName())) {
  1374.       return ClientDatanodeProtocol.versionID; 
  1375.     }
  1376.     throw new IOException("Unknown protocol to " + getClass().getSimpleName()
  1377.         + ": " + protocol);
  1378.   }
  1379.   /** A convenient class used in lease recovery */
  1380.   private static class BlockRecord { 
  1381.     final DatanodeID id;
  1382.     final InterDatanodeProtocol datanode;
  1383.     final Block block;
  1384.     
  1385.     BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
  1386.       this.id = id;
  1387.       this.datanode = datanode;
  1388.       this.block = block;
  1389.     }
  1390.     /** {@inheritDoc} */
  1391.     public String toString() {
  1392.       return "block:" + block + " node:" + id;
  1393.     }
  1394.   }
  1395.   /** Recover a block */
  1396.   private LocatedBlock recoverBlock(Block block, boolean keepLength,
  1397.       DatanodeID[] datanodeids, boolean closeFile) throws IOException {
  1398.     // If the block is already being recovered, then skip recovering it.
  1399.     // This can happen if the namenode and client start recovering the same
  1400.     // file at the same time.
  1401.     synchronized (ongoingRecovery) {
  1402.       Block tmp = new Block();
  1403.       tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
  1404.       if (ongoingRecovery.get(tmp) != null) {
  1405.         String msg = "Block " + block + " is already being recovered, " +
  1406.                      " ignoring this request to recover it.";
  1407.         LOG.info(msg);
  1408.         throw new IOException(msg);
  1409.       }
  1410.       ongoingRecovery.put(block, block);
  1411.     }
  1412.     try {
  1413.       List<BlockRecord> syncList = new ArrayList<BlockRecord>();
  1414.       long minlength = Long.MAX_VALUE;
  1415.       int errorCount = 0;
  1416.       //check generation stamps
  1417.       for(DatanodeID id : datanodeids) {
  1418.         try {
  1419.           InterDatanodeProtocol datanode = dnRegistration.equals(id)?
  1420.               this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
  1421.           BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
  1422.           if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
  1423.             if (keepLength) {
  1424.               if (info.getNumBytes() == block.getNumBytes()) {
  1425.                 syncList.add(new BlockRecord(id, datanode, new Block(info)));
  1426.               }
  1427.             }
  1428.             else {
  1429.               syncList.add(new BlockRecord(id, datanode, new Block(info)));
  1430.               if (info.getNumBytes() < minlength) {
  1431.                 minlength = info.getNumBytes();
  1432.               }
  1433.             }
  1434.           }
  1435.         } catch (IOException e) {
  1436.           ++errorCount;
  1437.           InterDatanodeProtocol.LOG.warn(
  1438.               "Failed to getBlockMetaDataInfo for block (=" + block 
  1439.               + ") from datanode (=" + id + ")", e);
  1440.         }
  1441.       }
  1442.       if (syncList.isEmpty() && errorCount > 0) {
  1443.         throw new IOException("All datanodes failed: block=" + block
  1444.             + ", datanodeids=" + Arrays.asList(datanodeids));
  1445.       }
  1446.       if (!keepLength) {
  1447.         block.setNumBytes(minlength);
  1448.       }
  1449.       return syncBlock(block, syncList, closeFile);
  1450.     } finally {
  1451.       synchronized (ongoingRecovery) {
  1452.         ongoingRecovery.remove(block);
  1453.       }
  1454.     }
  1455.   }
  1456.   /** Block synchronization */
  1457.   private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
  1458.       boolean closeFile) throws IOException {
  1459.     if (LOG.isDebugEnabled()) {
  1460.       LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
  1461.           + "), syncList=" + syncList + ", closeFile=" + closeFile);
  1462.     }
  1463.     //syncList.isEmpty() that all datanodes do not have the block
  1464.     //so the block can be deleted.
  1465.     if (syncList.isEmpty()) {
  1466.       namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
  1467.           DatanodeID.EMPTY_ARRAY);
  1468.       return null;
  1469.     }
  1470.     List<DatanodeID> successList = new ArrayList<DatanodeID>();
  1471.     long generationstamp = namenode.nextGenerationStamp(block);
  1472.     Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
  1473.     for(BlockRecord r : syncList) {
  1474.       try {
  1475.         r.datanode.updateBlock(r.block, newblock, closeFile);
  1476.         successList.add(r.id);
  1477.       } catch (IOException e) {
  1478.         InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
  1479.             + newblock + ", datanode=" + r.id + ")", e);
  1480.       }
  1481.     }
  1482.     if (!successList.isEmpty()) {
  1483.       DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
  1484.       namenode.commitBlockSynchronization(block,
  1485.           newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
  1486.           nlist);
  1487.       DatanodeInfo[] info = new DatanodeInfo[nlist.length];
  1488.       for (int i = 0; i < nlist.length; i++) {
  1489.         info[i] = new DatanodeInfo(nlist[i]);
  1490.       }
  1491.       return new LocatedBlock(newblock, info); // success
  1492.     }
  1493.     //failed
  1494.     StringBuilder b = new StringBuilder();
  1495.     for(BlockRecord r : syncList) {
  1496.       b.append("n  " + r.id);
  1497.     }
  1498.     throw new IOException("Cannot recover " + block + ", none of these "
  1499.         + syncList.size() + " datanodes success {" + b + "n}");
  1500.   }
  1501.   
  1502.   // ClientDataNodeProtocol implementation
  1503.   /** {@inheritDoc} */
  1504.   public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
  1505.       ) throws IOException {
  1506.     logRecoverBlock("Client", block, targets);
  1507.     return recoverBlock(block, keepLength, targets, false);
  1508.   }
  1509.   private static void logRecoverBlock(String who,
  1510.       Block block, DatanodeID[] targets) {
  1511.     StringBuilder msg = new StringBuilder(targets[0].getName());
  1512.     for (int i = 1; i < targets.length; i++) {
  1513.       msg.append(", " + targets[i].getName());
  1514.     }
  1515.     LOG.info(who + " calls recoverBlock(block=" + block
  1516.         + ", targets=[" + msg + "])");
  1517.   }
  1518. }