MiniDFSCluster.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:32k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.net.InetSocketAddress;
  22. import java.util.ArrayList;
  23. import java.util.Collection;
  24. import java.nio.channels.FileChannel;
  25. import java.util.Random;
  26. import java.io.RandomAccessFile;
  27. import javax.security.auth.login.LoginException;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.net.*;
  30. import org.apache.hadoop.hdfs.protocol.Block;
  31. import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
  32. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  33. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  34. import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
  35. import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
  36. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
  37. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  38. import org.apache.hadoop.hdfs.tools.DFSAdmin;
  39. import org.apache.hadoop.fs.FileSystem;
  40. import org.apache.hadoop.fs.FileUtil;
  41. import org.apache.hadoop.security.*;
  42. import org.apache.hadoop.util.ToolRunner;
  43. /**
  44.  * This class creates a single-process DFS cluster for junit testing.
  45.  * The data directories for non-simulated DFS are under the testing directory.
  46.  * For simulated data nodes, no underlying fs storage is used.
  47.  */
  48. public class MiniDFSCluster {
  49.   public class DataNodeProperties {
  50.     DataNode datanode;
  51.     Configuration conf;
  52.     String[] dnArgs;
  53.     DataNodeProperties(DataNode node, Configuration conf, String[] args) {
  54.       this.datanode = node;
  55.       this.conf = conf;
  56.       this.dnArgs = args;
  57.     }
  58.   }
  59.   private Configuration conf;
  60.   private NameNode nameNode;
  61.   private int numDataNodes;
  62.   private ArrayList<DataNodeProperties> dataNodes = 
  63.                          new ArrayList<DataNodeProperties>();
  64.   private File base_dir;
  65.   private File data_dir;
  66.   
  67.   
  68.   /**
  69.    * This null constructor is used only when wishing to start a data node cluster
  70.    * without a name node (ie when the name node is started elsewhere).
  71.    */
  72.   public MiniDFSCluster() {
  73.   }
  74.   
  75.   /**
  76.    * Modify the config and start up the servers with the given operation.
  77.    * Servers will be started on free ports.
  78.    * <p>
  79.    * The caller must manage the creation of NameNode and DataNode directories
  80.    * and have already set dfs.name.dir and dfs.data.dir in the given conf.
  81.    * 
  82.    * @param conf the base configuration to use in starting the servers.  This
  83.    *          will be modified as necessary.
  84.    * @param numDataNodes Number of DataNodes to start; may be zero
  85.    * @param nameNodeOperation the operation with which to start the servers.  If null
  86.    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
  87.    */
  88.   public MiniDFSCluster(Configuration conf,
  89.                         int numDataNodes,
  90.                         StartupOption nameNodeOperation) throws IOException {
  91.     this(0, conf, numDataNodes, false, false, false,  nameNodeOperation, 
  92.           null, null, null);
  93.   }
  94.   
  95.   /**
  96.    * Modify the config and start up the servers.  The rpc and info ports for
  97.    * servers are guaranteed to use free ports.
  98.    * <p>
  99.    * NameNode and DataNode directory creation and configuration will be
  100.    * managed by this class.
  101.    *
  102.    * @param conf the base configuration to use in starting the servers.  This
  103.    *          will be modified as necessary.
  104.    * @param numDataNodes Number of DataNodes to start; may be zero
  105.    * @param format if true, format the NameNode and DataNodes before starting up
  106.    * @param racks array of strings indicating the rack that each DataNode is on
  107.    */
  108.   public MiniDFSCluster(Configuration conf,
  109.                         int numDataNodes,
  110.                         boolean format,
  111.                         String[] racks) throws IOException {
  112.     this(0, conf, numDataNodes, format, true, true,  null, racks, null, null);
  113.   }
  114.   
  115.   /**
  116.    * Modify the config and start up the servers.  The rpc and info ports for
  117.    * servers are guaranteed to use free ports.
  118.    * <p>
  119.    * NameNode and DataNode directory creation and configuration will be
  120.    * managed by this class.
  121.    *
  122.    * @param conf the base configuration to use in starting the servers.  This
  123.    *          will be modified as necessary.
  124.    * @param numDataNodes Number of DataNodes to start; may be zero
  125.    * @param format if true, format the NameNode and DataNodes before starting up
  126.    * @param racks array of strings indicating the rack that each DataNode is on
  127.    * @param hosts array of strings indicating the hostname for each DataNode
  128.    */
  129.   public MiniDFSCluster(Configuration conf,
  130.                         int numDataNodes,
  131.                         boolean format,
  132.                         String[] racks, String[] hosts) throws IOException {
  133.     this(0, conf, numDataNodes, format, true, true, null, racks, hosts, null);
  134.   }
  135.   
  136.   /**
  137.    * NOTE: if possible, the other constructors that don't have nameNode port 
  138.    * parameter should be used as they will ensure that the servers use free ports.
  139.    * <p>
  140.    * Modify the config and start up the servers.  
  141.    * 
  142.    * @param nameNodePort suggestion for which rpc port to use.  caller should
  143.    *          use getNameNodePort() to get the actual port used.
  144.    * @param conf the base configuration to use in starting the servers.  This
  145.    *          will be modified as necessary.
  146.    * @param numDataNodes Number of DataNodes to start; may be zero
  147.    * @param format if true, format the NameNode and DataNodes before starting up
  148.    * @param manageDfsDirs if true, the data directories for servers will be
  149.    *          created and dfs.name.dir and dfs.data.dir will be set in the conf
  150.    * @param operation the operation with which to start the servers.  If null
  151.    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
  152.    * @param racks array of strings indicating the rack that each DataNode is on
  153.    */
  154.   public MiniDFSCluster(int nameNodePort, 
  155.                         Configuration conf,
  156.                         int numDataNodes,
  157.                         boolean format,
  158.                         boolean manageDfsDirs,
  159.                         StartupOption operation,
  160.                         String[] racks) throws IOException {
  161.     this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
  162.          operation, racks, null, null);
  163.   }
  164.   /**
  165.    * NOTE: if possible, the other constructors that don't have nameNode port 
  166.    * parameter should be used as they will ensure that the servers use free ports.
  167.    * <p>
  168.    * Modify the config and start up the servers.  
  169.    * 
  170.    * @param nameNodePort suggestion for which rpc port to use.  caller should
  171.    *          use getNameNodePort() to get the actual port used.
  172.    * @param conf the base configuration to use in starting the servers.  This
  173.    *          will be modified as necessary.
  174.    * @param numDataNodes Number of DataNodes to start; may be zero
  175.    * @param format if true, format the NameNode and DataNodes before starting up
  176.    * @param manageDfsDirs if true, the data directories for servers will be
  177.    *          created and dfs.name.dir and dfs.data.dir will be set in the conf
  178.    * @param operation the operation with which to start the servers.  If null
  179.    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
  180.    * @param racks array of strings indicating the rack that each DataNode is on
  181.    * @param simulatedCapacities array of capacities of the simulated data nodes
  182.    */
  183.   public MiniDFSCluster(int nameNodePort, 
  184.                         Configuration conf,
  185.                         int numDataNodes,
  186.                         boolean format,
  187.                         boolean manageDfsDirs,
  188.                         StartupOption operation,
  189.                         String[] racks,
  190.                         long[] simulatedCapacities) throws IOException {
  191.     this(nameNodePort, conf, numDataNodes, format, manageDfsDirs, manageDfsDirs,
  192.           operation, racks, null, simulatedCapacities);
  193.   }
  194.   
  195.   /**
  196.    * NOTE: if possible, the other constructors that don't have nameNode port 
  197.    * parameter should be used as they will ensure that the servers use free ports.
  198.    * <p>
  199.    * Modify the config and start up the servers.  
  200.    * 
  201.    * @param nameNodePort suggestion for which rpc port to use.  caller should
  202.    *          use getNameNodePort() to get the actual port used.
  203.    * @param conf the base configuration to use in starting the servers.  This
  204.    *          will be modified as necessary.
  205.    * @param numDataNodes Number of DataNodes to start; may be zero
  206.    * @param format if true, format the NameNode and DataNodes before starting up
  207.    * @param manageNameDfsDirs if true, the data directories for servers will be
  208.    *          created and dfs.name.dir and dfs.data.dir will be set in the conf
  209.    * @param manageDataDfsDirs if true, the data directories for datanodes will
  210.    *          be created and dfs.data.dir set to same in the conf
  211.    * @param operation the operation with which to start the servers.  If null
  212.    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
  213.    * @param racks array of strings indicating the rack that each DataNode is on
  214.    * @param hosts array of strings indicating the hostnames of each DataNode
  215.    * @param simulatedCapacities array of capacities of the simulated data nodes
  216.    */
  217.   public MiniDFSCluster(int nameNodePort, 
  218.                         Configuration conf,
  219.                         int numDataNodes,
  220.                         boolean format,
  221.                         boolean manageNameDfsDirs,
  222.                         boolean manageDataDfsDirs,
  223.                         StartupOption operation,
  224.                         String[] racks, String hosts[],
  225.                         long[] simulatedCapacities) throws IOException {
  226.     this.conf = conf;
  227.     try {
  228.       UserGroupInformation.setCurrentUser(UnixUserGroupInformation.login(conf));
  229.     } catch (LoginException e) {
  230.       IOException ioe = new IOException();
  231.       ioe.initCause(e);
  232.       throw ioe;
  233.     }
  234.     base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/");
  235.     data_dir = new File(base_dir, "data");
  236.     
  237.     // Setup the NameNode configuration
  238.     FileSystem.setDefaultUri(conf, "hdfs://localhost:"+ Integer.toString(nameNodePort));
  239.     conf.set("dfs.http.address", "127.0.0.1:0");  
  240.     if (manageNameDfsDirs) {
  241.       conf.set("dfs.name.dir", new File(base_dir, "name1").getPath()+","+
  242.                new File(base_dir, "name2").getPath());
  243.       conf.set("fs.checkpoint.dir", new File(base_dir, "namesecondary1").
  244.                 getPath()+"," + new File(base_dir, "namesecondary2").getPath());
  245.     }
  246.     
  247.     int replication = conf.getInt("dfs.replication", 3);
  248.     conf.setInt("dfs.replication", Math.min(replication, numDataNodes));
  249.     conf.setInt("dfs.safemode.extension", 0);
  250.     conf.setInt("dfs.namenode.decommission.interval", 3); // 3 second
  251.     
  252.     // Format and clean out DataNode directories
  253.     if (format) {
  254.       if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
  255.         throw new IOException("Cannot remove data directory: " + data_dir);
  256.       }
  257.       NameNode.format(conf); 
  258.     }
  259.     
  260.     // Start the NameNode
  261.     String[] args = (operation == null ||
  262.                      operation == StartupOption.FORMAT ||
  263.                      operation == StartupOption.REGULAR) ?
  264.       new String[] {} : new String[] {operation.getName()};
  265.     conf.setClass("topology.node.switch.mapping.impl", 
  266.                    StaticMapping.class, DNSToSwitchMapping.class);
  267.     nameNode = NameNode.createNameNode(args, conf);
  268.     
  269.     // Start the DataNodes
  270.     startDataNodes(conf, numDataNodes, manageDataDfsDirs, 
  271.                     operation, racks, hosts, simulatedCapacities);
  272.     waitClusterUp();
  273.   }
  274.   /**
  275.    * wait for the cluster to get out of 
  276.    * safemode.
  277.    */
  278.   public void waitClusterUp() {
  279.     if (numDataNodes > 0) {
  280.       while (!isClusterUp()) {
  281.         try {
  282.           System.err.println("Waiting for the Mini HDFS Cluster to start...");
  283.           Thread.sleep(1000);
  284.         } catch (InterruptedException e) {
  285.         }
  286.       }
  287.     }
  288.   }
  289.   /**
  290.    * Modify the config and start up additional DataNodes.  The info port for
  291.    * DataNodes is guaranteed to use a free port.
  292.    *  
  293.    *  Data nodes can run with the name node in the mini cluster or
  294.    *  a real name node. For example, running with a real name node is useful
  295.    *  when running simulated data nodes with a real name node.
  296.    *  If minicluster's name node is null assume that the conf has been
  297.    *  set with the right address:port of the name node.
  298.    *
  299.    * @param conf the base configuration to use in starting the DataNodes.  This
  300.    *          will be modified as necessary.
  301.    * @param numDataNodes Number of DataNodes to start; may be zero
  302.    * @param manageDfsDirs if true, the data directories for DataNodes will be
  303.    *          created and dfs.data.dir will be set in the conf
  304.    * @param operation the operation with which to start the DataNodes.  If null
  305.    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
  306.    * @param racks array of strings indicating the rack that each DataNode is on
  307.    * @param hosts array of strings indicating the hostnames for each DataNode
  308.    * @param simulatedCapacities array of capacities of the simulated data nodes
  309.    *
  310.    * @throws IllegalStateException if NameNode has been shutdown
  311.    */
  312.   public synchronized void startDataNodes(Configuration conf, int numDataNodes, 
  313.                              boolean manageDfsDirs, StartupOption operation, 
  314.                              String[] racks, String[] hosts,
  315.                              long[] simulatedCapacities) throws IOException {
  316.     int curDatanodesNum = dataNodes.size();
  317.     // for mincluster's the default initialDelay for BRs is 0
  318.     if (conf.get("dfs.blockreport.initialDelay") == null) {
  319.       conf.setLong("dfs.blockreport.initialDelay", 0);
  320.     }
  321.     // If minicluster's name node is null assume that the conf has been
  322.     // set with the right address:port of the name node.
  323.     //
  324.     if (nameNode != null) { // set conf from the name node
  325.       InetSocketAddress nnAddr = nameNode.getNameNodeAddress(); 
  326.       int nameNodePort = nnAddr.getPort(); 
  327.       FileSystem.setDefaultUri(conf, 
  328.                                "hdfs://"+ nnAddr.getHostName() +
  329.                                ":" + Integer.toString(nameNodePort));
  330.     }
  331.     
  332.     if (racks != null && numDataNodes > racks.length ) {
  333.       throw new IllegalArgumentException( "The length of racks [" + racks.length
  334.           + "] is less than the number of datanodes [" + numDataNodes + "].");
  335.     }
  336.     if (hosts != null && numDataNodes > hosts.length ) {
  337.       throw new IllegalArgumentException( "The length of hosts [" + hosts.length
  338.           + "] is less than the number of datanodes [" + numDataNodes + "].");
  339.     }
  340.     //Generate some hostnames if required
  341.     if (racks != null && hosts == null) {
  342.       System.out.println("Generating host names for datanodes");
  343.       hosts = new String[numDataNodes];
  344.       for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
  345.         hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
  346.       }
  347.     }
  348.     if (simulatedCapacities != null 
  349.         && numDataNodes > simulatedCapacities.length) {
  350.       throw new IllegalArgumentException( "The length of simulatedCapacities [" 
  351.           + simulatedCapacities.length
  352.           + "] is less than the number of datanodes [" + numDataNodes + "].");
  353.     }
  354.     // Set up the right ports for the datanodes
  355.     conf.set("dfs.datanode.address", "127.0.0.1:0");
  356.     conf.set("dfs.datanode.http.address", "127.0.0.1:0");
  357.     conf.set("dfs.datanode.ipc.address", "127.0.0.1:0");
  358.     
  359.     String [] dnArgs = (operation == null ||
  360.                         operation != StartupOption.ROLLBACK) ?
  361.         null : new String[] {operation.getName()};
  362.     
  363.     
  364.     for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
  365.       Configuration dnConf = new Configuration(conf);
  366.       if (manageDfsDirs) {
  367.         File dir1 = new File(data_dir, "data"+(2*i+1));
  368.         File dir2 = new File(data_dir, "data"+(2*i+2));
  369.         dir1.mkdirs();
  370.         dir2.mkdirs();
  371.         if (!dir1.isDirectory() || !dir2.isDirectory()) { 
  372.           throw new IOException("Mkdirs failed to create directory for DataNode "
  373.                                 + i + ": " + dir1 + " or " + dir2);
  374.         }
  375.         dnConf.set("dfs.data.dir", dir1.getPath() + "," + dir2.getPath()); 
  376.       }
  377.       if (simulatedCapacities != null) {
  378.         dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
  379.         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
  380.             simulatedCapacities[i-curDatanodesNum]);
  381.       }
  382.       System.out.println("Starting DataNode " + i + " with dfs.data.dir: " 
  383.                          + dnConf.get("dfs.data.dir"));
  384.       if (hosts != null) {
  385.         dnConf.set("slave.host.name", hosts[i - curDatanodesNum]);
  386.         System.out.println("Starting DataNode " + i + " with hostname set to: " 
  387.                            + dnConf.get("slave.host.name"));
  388.       }
  389.       if (racks != null) {
  390.         String name = hosts[i - curDatanodesNum];
  391.         System.out.println("Adding node with hostname : " + name + " to rack "+
  392.                             racks[i-curDatanodesNum]);
  393.         StaticMapping.addNodeToRack(name,
  394.                                     racks[i-curDatanodesNum]);
  395.       }
  396.       Configuration newconf = new Configuration(dnConf); // save config
  397.       if (hosts != null) {
  398.         NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
  399.       }
  400.       DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
  401.       //since the HDFS does things based on IP:port, we need to add the mapping
  402.       //for IP:port to rackId
  403.       String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();
  404.       if (racks != null) {
  405.         int port = dn.getSelfAddr().getPort();
  406.         System.out.println("Adding node with IP:port : " + ipAddr + ":" + port+
  407.                             " to rack " + racks[i-curDatanodesNum]);
  408.         StaticMapping.addNodeToRack(ipAddr + ":" + port,
  409.                                   racks[i-curDatanodesNum]);
  410.       }
  411.       DataNode.runDatanodeDaemon(dn);
  412.       dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
  413.     }
  414.     curDatanodesNum += numDataNodes;
  415.     this.numDataNodes += numDataNodes;
  416.     waitActive();
  417.   }
  418.   
  419.   
  420.   
  421.   /**
  422.    * Modify the config and start up the DataNodes.  The info port for
  423.    * DataNodes is guaranteed to use a free port.
  424.    *
  425.    * @param conf the base configuration to use in starting the DataNodes.  This
  426.    *          will be modified as necessary.
  427.    * @param numDataNodes Number of DataNodes to start; may be zero
  428.    * @param manageDfsDirs if true, the data directories for DataNodes will be
  429.    *          created and dfs.data.dir will be set in the conf
  430.    * @param operation the operation with which to start the DataNodes.  If null
  431.    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
  432.    * @param racks array of strings indicating the rack that each DataNode is on
  433.    *
  434.    * @throws IllegalStateException if NameNode has been shutdown
  435.    */
  436.   
  437.   public void startDataNodes(Configuration conf, int numDataNodes, 
  438.       boolean manageDfsDirs, StartupOption operation, 
  439.       String[] racks
  440.       ) throws IOException {
  441.     startDataNodes( conf,  numDataNodes, manageDfsDirs,  operation, racks, null, null);
  442.   }
  443.   
  444.   /**
  445.    * Modify the config and start up additional DataNodes.  The info port for
  446.    * DataNodes is guaranteed to use a free port.
  447.    *  
  448.    *  Data nodes can run with the name node in the mini cluster or
  449.    *  a real name node. For example, running with a real name node is useful
  450.    *  when running simulated data nodes with a real name node.
  451.    *  If minicluster's name node is null assume that the conf has been
  452.    *  set with the right address:port of the name node.
  453.    *
  454.    * @param conf the base configuration to use in starting the DataNodes.  This
  455.    *          will be modified as necessary.
  456.    * @param numDataNodes Number of DataNodes to start; may be zero
  457.    * @param manageDfsDirs if true, the data directories for DataNodes will be
  458.    *          created and dfs.data.dir will be set in the conf
  459.    * @param operation the operation with which to start the DataNodes.  If null
  460.    *          or StartupOption.FORMAT, then StartupOption.REGULAR will be used.
  461.    * @param racks array of strings indicating the rack that each DataNode is on
  462.    * @param simulatedCapacities array of capacities of the simulated data nodes
  463.    *
  464.    * @throws IllegalStateException if NameNode has been shutdown
  465.    */
  466.   public void startDataNodes(Configuration conf, int numDataNodes, 
  467.                              boolean manageDfsDirs, StartupOption operation, 
  468.                              String[] racks,
  469.                              long[] simulatedCapacities) throws IOException {
  470.     startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, null,
  471.                    simulatedCapacities);
  472.     
  473.   }
  474.   /**
  475.    * If the NameNode is running, attempt to finalize a previous upgrade.
  476.    * When this method return, the NameNode should be finalized, but
  477.    * DataNodes may not be since that occurs asynchronously.
  478.    *
  479.    * @throws IllegalStateException if the Namenode is not running.
  480.    */
  481.   public void finalizeCluster(Configuration conf) throws Exception {
  482.     if (nameNode == null) {
  483.       throw new IllegalStateException("Attempting to finalize "
  484.                                       + "Namenode but it is not running");
  485.     }
  486.     ToolRunner.run(new DFSAdmin(conf), new String[] {"-finalizeUpgrade"});
  487.   }
  488.   
  489.   /**
  490.    * Gets the started NameNode.  May be null.
  491.    */
  492.   public NameNode getNameNode() {
  493.     return nameNode;
  494.   }
  495.   
  496.   /**
  497.    * Gets a list of the started DataNodes.  May be empty.
  498.    */
  499.   public ArrayList<DataNode> getDataNodes() {
  500.     ArrayList<DataNode> list = new ArrayList<DataNode>();
  501.     for (int i = 0; i < dataNodes.size(); i++) {
  502.       DataNode node = dataNodes.get(i).datanode;
  503.       list.add(node);
  504.     }
  505.     return list;
  506.   }
  507.   
  508.   /** @return the datanode having the ipc server listen port */
  509.   public DataNode getDataNode(int ipcPort) {
  510.     for(DataNode dn : getDataNodes()) {
  511.       if (dn.ipcServer.getListenerAddress().getPort() == ipcPort) {
  512.         return dn;
  513.       }
  514.     }
  515.     return null;
  516.   }
  517.   /**
  518.    * Gets the rpc port used by the NameNode, because the caller 
  519.    * supplied port is not necessarily the actual port used.
  520.    */     
  521.   public int getNameNodePort() {
  522.     return nameNode.getNameNodeAddress().getPort();
  523.   }
  524.     
  525.   /**
  526.    * Shut down the servers that are up.
  527.    */
  528.   public void shutdown() {
  529.     System.out.println("Shutting down the Mini HDFS Cluster");
  530.     shutdownDataNodes();
  531.     if (nameNode != null) {
  532.       nameNode.stop();
  533.       nameNode.join();
  534.       nameNode = null;
  535.     }
  536.   }
  537.   
  538.   /**
  539.    * Shutdown all DataNodes started by this class.  The NameNode
  540.    * is left running so that new DataNodes may be started.
  541.    */
  542.   public void shutdownDataNodes() {
  543.     for (int i = dataNodes.size()-1; i >= 0; i--) {
  544.       System.out.println("Shutting down DataNode " + i);
  545.       DataNode dn = dataNodes.remove(i).datanode;
  546.       dn.shutdown();
  547.       numDataNodes--;
  548.     }
  549.   }
  550.   /*
  551.    * Corrupt a block on all datanode
  552.    */
  553.   void corruptBlockOnDataNodes(String blockName) throws Exception{
  554.     for (int i=0; i < dataNodes.size(); i++)
  555.       corruptBlockOnDataNode(i,blockName);
  556.   }
  557.   /*
  558.    * Corrupt a block on a particular datanode
  559.    */
  560.   boolean corruptBlockOnDataNode(int i, String blockName) throws Exception {
  561.     Random random = new Random();
  562.     boolean corrupted = false;
  563.     File dataDir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/data");
  564.     if (i < 0 || i >= dataNodes.size())
  565.       return false;
  566.     for (int dn = i*2; dn < i*2+2; dn++) {
  567.       File blockFile = new File(dataDir, "data" + (dn+1) + "/current/" +
  568.                                 blockName);
  569.       System.out.println("Corrupting for: " + blockFile);
  570.       if (blockFile.exists()) {
  571.         // Corrupt replica by writing random bytes into replica
  572.         RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
  573.         FileChannel channel = raFile.getChannel();
  574.         String badString = "BADBAD";
  575.         int rand = random.nextInt((int)channel.size()/2);
  576.         raFile.seek(rand);
  577.         raFile.write(badString.getBytes());
  578.         raFile.close();
  579.       }
  580.       corrupted = true;
  581.     }
  582.     return corrupted;
  583.   }
  584.   /*
  585.    * Shutdown a particular datanode
  586.    */
  587.   public DataNodeProperties stopDataNode(int i) {
  588.     if (i < 0 || i >= dataNodes.size()) {
  589.       return null;
  590.     }
  591.     DataNodeProperties dnprop = dataNodes.remove(i);
  592.     DataNode dn = dnprop.datanode;
  593.     System.out.println("MiniDFSCluster Stopping DataNode " + 
  594.                        dn.dnRegistration.getName() +
  595.                        " from a total of " + (dataNodes.size() + 1) + 
  596.                        " datanodes.");
  597.     dn.shutdown();
  598.     numDataNodes--;
  599.     return dnprop;
  600.   }
  601.   /**
  602.    * Restart a datanode
  603.    * @param dnprop datanode's property
  604.    * @return true if restarting is successful
  605.    * @throws IOException
  606.    */
  607.   public synchronized boolean restartDataNode(DataNodeProperties dnprop)
  608.   throws IOException {
  609.     Configuration conf = dnprop.conf;
  610.     String[] args = dnprop.dnArgs;
  611.     Configuration newconf = new Configuration(conf); // save cloned config
  612.     dataNodes.add(new DataNodeProperties(
  613.                      DataNode.createDataNode(args, conf), 
  614.                      newconf, args));
  615.     numDataNodes++;
  616.     return true;
  617.   }
  618.   /*
  619.    * Restart a particular datanode
  620.    */
  621.   public synchronized boolean restartDataNode(int i) throws IOException {
  622.     DataNodeProperties dnprop = stopDataNode(i);
  623.     if (dnprop == null) {
  624.       return false;
  625.     } else {
  626.       return restartDataNode(dnprop);
  627.     }
  628.   }
  629.   /*
  630.    * Shutdown a datanode by name.
  631.    */
  632.   public synchronized DataNodeProperties stopDataNode(String name) {
  633.     int i;
  634.     for (i = 0; i < dataNodes.size(); i++) {
  635.       DataNode dn = dataNodes.get(i).datanode;
  636.       if (dn.dnRegistration.getName().equals(name)) {
  637.         break;
  638.       }
  639.     }
  640.     return stopDataNode(i);
  641.   }
  642.   
  643.   /**
  644.    * Returns true if the NameNode is running and is out of Safe Mode.
  645.    */
  646.   public boolean isClusterUp() {
  647.     if (nameNode == null) {
  648.       return false;
  649.     }
  650.     try {
  651.       long[] sizes = nameNode.getStats();
  652.       boolean isUp = false;
  653.       synchronized (this) {
  654.         isUp = (!nameNode.isInSafeMode() && sizes[0] != 0);
  655.       }
  656.       return isUp;
  657.     } catch (IOException ie) {
  658.       return false;
  659.     }
  660.   }
  661.   
  662.   /**
  663.    * Returns true if there is at least one DataNode running.
  664.    */
  665.   public boolean isDataNodeUp() {
  666.     if (dataNodes == null || dataNodes.size() == 0) {
  667.       return false;
  668.     }
  669.     return true;
  670.   }
  671.   
  672.   /**
  673.    * Get a client handle to the DFS cluster.
  674.    */
  675.   public FileSystem getFileSystem() throws IOException {
  676.     return FileSystem.get(conf);
  677.   }
  678.   /**
  679.    * Get the directories where the namenode stores its image.
  680.    */
  681.   public Collection<File> getNameDirs() {
  682.     return FSNamesystem.getNamespaceDirs(conf);
  683.   }
  684.   /**
  685.    * Get the directories where the namenode stores its edits.
  686.    */
  687.   public Collection<File> getNameEditsDirs() {
  688.     return FSNamesystem.getNamespaceEditsDirs(conf);
  689.   }
  690.   /**
  691.    * Wait until the cluster is active and running.
  692.    */
  693.   public void waitActive() throws IOException {
  694.     if (nameNode == null) {
  695.       return;
  696.     }
  697.     InetSocketAddress addr = new InetSocketAddress("localhost",
  698.                                                    getNameNodePort());
  699.     DFSClient client = new DFSClient(addr, conf);
  700.     // make sure all datanodes are alive
  701.     while(client.datanodeReport(DatanodeReportType.LIVE).length
  702.         != numDataNodes) {
  703.       try {
  704.         Thread.sleep(500);
  705.       } catch (Exception e) {
  706.       }
  707.     }
  708.     client.close();
  709.   }
  710.   
  711.   public void formatDataNodeDirs() throws IOException {
  712.     base_dir = new File(System.getProperty("test.build.data", "build/test/data"), "dfs/");
  713.     data_dir = new File(base_dir, "data");
  714.     if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
  715.       throw new IOException("Cannot remove data directory: " + data_dir);
  716.     }
  717.   }
  718.   
  719.   /**
  720.    * 
  721.    * @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
  722.    * @return the block report for the specified data node
  723.    */
  724.   public Block[] getBlockReport(int dataNodeIndex) {
  725.     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
  726.       throw new IndexOutOfBoundsException();
  727.     }
  728.     return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport();
  729.   }
  730.   
  731.   
  732.   /**
  733.    * 
  734.    * @return block reports from all data nodes
  735.    *    Block[] is indexed in the same order as the list of datanodes returned by getDataNodes()
  736.    */
  737.   public Block[][] getAllBlockReports() {
  738.     int numDataNodes = dataNodes.size();
  739.     Block[][] result = new Block[numDataNodes][];
  740.     for (int i = 0; i < numDataNodes; ++i) {
  741.      result[i] = getBlockReport(i);
  742.     }
  743.     return result;
  744.   }
  745.   
  746.   
  747.   /**
  748.    * This method is valid only if the data nodes have simulated data
  749.    * @param dataNodeIndex - data node i which to inject - the index is same as for getDataNodes()
  750.    * @param blocksToInject - the blocks
  751.    * @throws IOException
  752.    *              if not simulatedFSDataset
  753.    *             if any of blocks already exist in the data node
  754.    *   
  755.    */
  756.   public void injectBlocks(int dataNodeIndex, Block[] blocksToInject) throws IOException {
  757.     if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
  758.       throw new IndexOutOfBoundsException();
  759.     }
  760.     FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
  761.     if (!(dataSet instanceof SimulatedFSDataset)) {
  762.       throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
  763.     }
  764.     SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
  765.     sdataset.injectBlocks(blocksToInject);
  766.     dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0);
  767.   }
  768.   
  769.   /**
  770.    * This method is valid only if the data nodes have simulated data
  771.    * @param blocksToInject - blocksToInject[] is indexed in the same order as the list 
  772.    *             of datanodes returned by getDataNodes()
  773.    * @throws IOException
  774.    *             if not simulatedFSDataset
  775.    *             if any of blocks already exist in the data nodes
  776.    *             Note the rest of the blocks are not injected.
  777.    */
  778.   public void injectBlocks(Block[][] blocksToInject) throws IOException {
  779.     if (blocksToInject.length >  dataNodes.size()) {
  780.       throw new IndexOutOfBoundsException();
  781.     }
  782.     for (int i = 0; i < blocksToInject.length; ++i) {
  783.      injectBlocks(i, blocksToInject[i]);
  784.     }
  785.   }
  786.   /**
  787.    * Set the softLimit and hardLimit of client lease periods
  788.    */
  789.   void setLeasePeriod(long soft, long hard) {
  790.     nameNode.namesystem.leaseManager.setLeasePeriod(soft, hard);
  791.     nameNode.namesystem.lmthread.interrupt();
  792.   }
  793.   /**
  794.    * Returns the current set of datanodes
  795.    */
  796.   DataNode[] listDataNodes() {
  797.     DataNode[] list = new DataNode[dataNodes.size()];
  798.     for (int i = 0; i < dataNodes.size(); i++) {
  799.       list[i] = dataNodes.get(i).datanode;
  800.     }
  801.     return list;
  802.   }
  803.   /**
  804.    * Access to the data directory used for Datanodes
  805.    * @throws IOException 
  806.    */
  807.   public String getDataDirectory() {
  808.     return data_dir.getAbsolutePath();
  809.   }
  810. }