DataNodeCluster.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import java.io.IOException;
  20. import java.net.UnknownHostException;
  21. import java.security.NoSuchAlgorithmException;
  22. import java.security.SecureRandom;
  23. import java.util.Random;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.hdfs.protocol.Block;
  27. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  28. import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
  29. import org.apache.hadoop.hdfs.server.namenode.CreateEditsLog;
  30. import org.apache.hadoop.net.DNS;
  31. /**
  32.  * 
  33.  * 
  34.  * This program starts a mini cluster of data nodes
  35.  *  (ie a mini cluster without the name node), all within one address space.
  36.  *  It is assumed that the name node has been started separately prior
  37.  *  to running this program.
  38.  *  
  39.  *  A use case of this is to run a real name node with a large number of
  40.  *  simulated data nodes for say a NN benchmark.
  41.  *  
  42.  * Synopisis:
  43.  *   DataNodeCluster -n numDatNodes [-racks numRacks] -simulated
  44.  *              [-inject startingBlockId numBlocksPerDN]
  45.  *              [ -r replicationForInjectedBlocks ]
  46.  *              [-d editsLogDirectory]
  47.  *
  48.  * if -simulated is specified then simulated data nodes are started.
  49.  * if -inject is specified then blocks are injected in each datanode;
  50.  *    -inject option is valid only for simulated data nodes.
  51.  *    
  52.  *    See Also @link #CreateEditsLog for creating a edits log file to
  53.  *    inject a matching set of blocks into into a name node.
  54.  *    Typical use of -inject is to inject blocks into a set of datanodes
  55.  *    using this DataNodeCLuster command
  56.  *    and then to inject the same blocks into a name node using the
  57.  *    CreateEditsLog command.
  58.  *
  59.  */
  60. public class DataNodeCluster {
  61.   static final String DATANODE_DIRS = "/tmp/DataNodeCluster";
  62.   static String dataNodeDirs = DATANODE_DIRS;
  63.   static final String USAGE =
  64.     "Usage: datanodecluster " +
  65.     " -n <numDataNodes> " + 
  66.     " [-racks <numRacks>] " +
  67.     " [-simulated] " +
  68.     " [-inject startingBlockId numBlocksPerDN]" +
  69.     " [-r replicationFactorForInjectedBlocks]" +
  70.     " [-d dataNodeDirs]n" + 
  71.     "      Default datanode direcory is " + DATANODE_DIRS + "n" +
  72.     "      Default replication factor for injected blocks is 1n" +
  73.     "      Defaul rack is used if -racks is not specifiedn" +
  74.     "      Data nodes are simulated if -simulated OR conf file specifies simulatedn";
  75.   
  76.   
  77.   static void printUsageExit() {
  78.     System.out.println(USAGE);
  79.     System.exit(-1); 
  80.   }
  81.   static void printUsageExit(String err) {
  82.     System.out.println(err);
  83.     printUsageExit();
  84.   }
  85.   
  86.   public static void main(String[] args) {
  87.     int numDataNodes = 0;
  88.     int numRacks = 0;
  89.     boolean inject = false;
  90.     long startingBlockId = 1;
  91.     int numBlocksPerDNtoInject = 0;
  92.     int replication = 1;
  93.     
  94.     Configuration conf = new Configuration();
  95.     for (int i = 0; i < args.length; i++) { // parse command line
  96.       if (args[i].equals("-n")) {
  97.         if (++i >= args.length || args[i].startsWith("-")) {
  98.           printUsageExit("missing number of nodes");
  99.         }
  100.         numDataNodes = Integer.parseInt(args[i]);
  101.       } else if (args[i].equals("-racks")) {
  102.         if (++i >= args.length  || args[i].startsWith("-")) {
  103.           printUsageExit("Missing number of racks");
  104.         }
  105.         numRacks = Integer.parseInt(args[i]);
  106.       } else if (args[i].equals("-r")) {
  107.         if (++i >= args.length || args[i].startsWith("-")) {
  108.           printUsageExit("Missing replicaiton factor");
  109.         }
  110.         replication = Integer.parseInt(args[i]);
  111.       } else if (args[i].equals("-d")) {
  112.         if (++i >= args.length || args[i].startsWith("-")) {
  113.           printUsageExit("Missing datanode dirs parameter");
  114.         }
  115.         dataNodeDirs = args[i];
  116.       } else if (args[i].equals("-simulated")) {
  117.         conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
  118.       } else if (args[i].equals("-inject")) {
  119.         if (!conf.getBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED,
  120.                                                                 false) ) {
  121.           System.out.print("-inject is valid only for simulated");
  122.           printUsageExit(); 
  123.         }
  124.        inject = true;
  125.        if (++i >= args.length  || args[i].startsWith("-")) {
  126.          printUsageExit(
  127.              "Missing starting block and number of blocks per DN to inject");
  128.        }
  129.        startingBlockId = Integer.parseInt(args[i]);
  130.        if (++i >= args.length  || args[i].startsWith("-")) {
  131.          printUsageExit("Missing number of blocks to inject");
  132.        }
  133.        numBlocksPerDNtoInject = Integer.parseInt(args[i]);      
  134.       } else {
  135.         printUsageExit();
  136.       }
  137.     }
  138.     if (numDataNodes <= 0 || replication <= 0 ) {
  139.       printUsageExit("numDataNodes and replication have to be greater than zero");
  140.     }
  141.     if (replication > numDataNodes) {
  142.       printUsageExit("Replication must be less than or equal to numDataNodes");
  143.       
  144.     }
  145.     String nameNodeAdr = FileSystem.getDefaultUri(conf).getAuthority();
  146.     if (nameNodeAdr == null) {
  147.       System.out.println("No name node address and port in config");
  148.       System.exit(-1);
  149.     }
  150.     boolean simulated = 
  151.       conf.getBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, false);
  152.     System.out.println("Starting " + numDataNodes + 
  153.           (simulated ? " Simulated " : " ") +
  154.           " Data Nodes that will connect to Name Node at " + nameNodeAdr);
  155.   
  156.     System.setProperty("test.build.data", dataNodeDirs);
  157.     MiniDFSCluster mc = new MiniDFSCluster();
  158.     try {
  159.       mc.formatDataNodeDirs();
  160.     } catch (IOException e) {
  161.       System.out.println("Error formating data node dirs:" + e);
  162.     }
  163.     String[] rack4DataNode = null;
  164.     if (numRacks > 0) {
  165.       System.out.println("Using " + numRacks + " racks: ");
  166.       String rackPrefix = getUniqueRackPrefix();
  167.       rack4DataNode = new String[numDataNodes];
  168.       for (int i = 0; i < numDataNodes; ++i ) {
  169.         //rack4DataNode[i] = racks[i%numRacks];
  170.         rack4DataNode[i] = rackPrefix + "-" + i%numRacks;
  171.         System.out.println("Data Node " + i + " using " + rack4DataNode[i]);
  172.         
  173.         
  174.       }
  175.     }
  176.     try {
  177.       mc.startDataNodes(conf, numDataNodes, true, StartupOption.REGULAR,
  178.           rack4DataNode);
  179.       if (inject) {
  180.         long blockSize = 10;
  181.         System.out.println("Injecting " + numBlocksPerDNtoInject +
  182.             " blocks in each DN starting at blockId " + startingBlockId +
  183.             " with blocksize of " + blockSize);
  184.         Block[] blocks = new Block[numBlocksPerDNtoInject];
  185.         long blkid = startingBlockId;
  186.         for (int i_dn = 0; i_dn < numDataNodes; ++i_dn) {
  187.           for (int i = 0; i < blocks.length; ++i) {
  188.             blocks[i] = new Block(blkid++, blockSize,
  189.                 CreateEditsLog.BLOCK_GENERATION_STAMP);
  190.           }
  191.           for (int i = 1; i <= replication; ++i) { 
  192.             // inject blocks for dn_i into dn_i and replica in dn_i's neighbors 
  193.             mc.injectBlocks((i_dn + i- 1)% numDataNodes, blocks);
  194.             System.out.println("Injecting blocks of dn " + i_dn  + " into dn" + 
  195.                 ((i_dn + i- 1)% numDataNodes));
  196.           }
  197.         }
  198.         System.out.println("Created blocks from Bids " 
  199.             + startingBlockId + " to "  + (blkid -1));
  200.       }
  201.     } catch (IOException e) {
  202.       System.out.println("Error creating data node:" + e);
  203.     }  
  204.   }
  205.   /*
  206.    * There is high probability that the rack id generated here will 
  207.    * not conflict with those of other data node cluster.
  208.    * Not perfect but mostly unique rack ids are good enough
  209.    */
  210.   static private String getUniqueRackPrefix() {
  211.   
  212.     String ip = "unknownIP";
  213.     try {
  214.       ip = DNS.getDefaultIP("default");
  215.     } catch (UnknownHostException ignored) {
  216.       System.out.println("Could not find ip address of "default" inteface.");
  217.     }
  218.     
  219.     int rand = 0;
  220.     try {
  221.       rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
  222.     } catch (NoSuchAlgorithmException e) {
  223.       rand = (new Random()).nextInt(Integer.MAX_VALUE);
  224.     }
  225.     return "/Rack-" + rand + "-"+ ip  + "-" + 
  226.                       System.currentTimeMillis();
  227.   }
  228. }