ReplicationTargetChooser.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:20k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import org.apache.commons.logging.*;
  20. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  21. import org.apache.hadoop.hdfs.protocol.FSConstants;
  22. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  23. import org.apache.hadoop.net.NetworkTopology;
  24. import org.apache.hadoop.net.Node;
  25. import org.apache.hadoop.net.NodeBase;
  26. import java.util.*;
  27. /** The class is responsible for choosing the desired number of targets
  28.  * for placing block replicas.
  29.  * The replica placement strategy is that if the writer is on a datanode,
  30.  * the 1st replica is placed on the local machine, 
  31.  * otherwise a random datanode. The 2nd replica is placed on a datanode
  32.  * that is on a different rack. The 3rd replica is placed on a datanode
  33.  * which is on the same rack as the first replca.
  34.  */
  35. class ReplicationTargetChooser {
  36.   private final boolean considerLoad; 
  37.   private NetworkTopology clusterMap;
  38.   private FSNamesystem fs;
  39.     
  40.   ReplicationTargetChooser(boolean considerLoad,  FSNamesystem fs,
  41.                            NetworkTopology clusterMap) {
  42.     this.considerLoad = considerLoad;
  43.     this.fs = fs;
  44.     this.clusterMap = clusterMap;
  45.   }
  46.     
  47.   private static class NotEnoughReplicasException extends Exception {
  48.     NotEnoughReplicasException(String msg) {
  49.       super(msg);
  50.     }
  51.   }
  52.     
  53.   /**
  54.    * choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
  55.    * a block with size <i>blocksize</i> 
  56.    * If not, return as many as we can.
  57.    * 
  58.    * @param numOfReplicas: number of replicas wanted.
  59.    * @param writer: the writer's machine, null if not in the cluster.
  60.    * @param excludedNodes: datanodesthat should not be considered targets.
  61.    * @param blocksize: size of the data to be written.
  62.    * @return array of DatanodeDescriptor instances chosen as targets
  63.    * and sorted as a pipeline.
  64.    */
  65.   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
  66.                                     DatanodeDescriptor writer,
  67.                                     List<Node> excludedNodes,
  68.                                     long blocksize) {
  69.     if (excludedNodes == null) {
  70.       excludedNodes = new ArrayList<Node>();
  71.     }
  72.       
  73.     return chooseTarget(numOfReplicas, writer, 
  74.                         new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
  75.   }
  76.     
  77.   /**
  78.    * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
  79.    * to re-replicate a block with size <i>blocksize</i> 
  80.    * If not, return as many as we can.
  81.    * 
  82.    * @param numOfReplicas: additional number of replicas wanted.
  83.    * @param writer: the writer's machine, null if not in the cluster.
  84.    * @param choosenNodes: datanodes that have been choosen as targets.
  85.    * @param excludedNodes: datanodesthat should not be considered targets.
  86.    * @param blocksize: size of the data to be written.
  87.    * @return array of DatanodeDescriptor instances chosen as target 
  88.    * and sorted as a pipeline.
  89.    */
  90.   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
  91.                                     DatanodeDescriptor writer,
  92.                                     List<DatanodeDescriptor> choosenNodes,
  93.                                     List<Node> excludedNodes,
  94.                                     long blocksize) {
  95.     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
  96.       return new DatanodeDescriptor[0];
  97.     }
  98.       
  99.     if (excludedNodes == null) {
  100.       excludedNodes = new ArrayList<Node>();
  101.     }
  102.       
  103.     int clusterSize = clusterMap.getNumOfLeaves();
  104.     int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
  105.     if (totalNumOfReplicas > clusterSize) {
  106.       numOfReplicas -= (totalNumOfReplicas-clusterSize);
  107.       totalNumOfReplicas = clusterSize;
  108.     }
  109.       
  110.     int maxNodesPerRack = 
  111.       (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
  112.       
  113.     List<DatanodeDescriptor> results = 
  114.       new ArrayList<DatanodeDescriptor>(choosenNodes);
  115.     excludedNodes.addAll(choosenNodes);
  116.       
  117.     if (!clusterMap.contains(writer)) {
  118.       writer=null;
  119.     }
  120.       
  121.     DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
  122.                                                 excludedNodes, blocksize, maxNodesPerRack, results);
  123.       
  124.     results.removeAll(choosenNodes);
  125.       
  126.     // sorting nodes to form a pipeline
  127.     return getPipeline((writer==null)?localNode:writer,
  128.                        results.toArray(new DatanodeDescriptor[results.size()]));
  129.   }
  130.     
  131.   /* choose <i>numOfReplicas</i> from all data nodes */
  132.   private DatanodeDescriptor chooseTarget(int numOfReplicas,
  133.                                           DatanodeDescriptor writer,
  134.                                           List<Node> excludedNodes,
  135.                                           long blocksize,
  136.                                           int maxNodesPerRack,
  137.                                           List<DatanodeDescriptor> results) {
  138.       
  139.     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
  140.       return writer;
  141.     }
  142.       
  143.     int numOfResults = results.size();
  144.     boolean newBlock = (numOfResults==0);
  145.     if (writer == null && !newBlock) {
  146.       writer = (DatanodeDescriptor)results.get(0);
  147.     }
  148.       
  149.     try {
  150.       switch(numOfResults) {
  151.       case 0:
  152.         writer = chooseLocalNode(writer, excludedNodes, 
  153.                                  blocksize, maxNodesPerRack, results);
  154.         if (--numOfReplicas == 0) {
  155.           break;
  156.         }
  157.       case 1:
  158.         chooseRemoteRack(1, results.get(0), excludedNodes, 
  159.                          blocksize, maxNodesPerRack, results);
  160.         if (--numOfReplicas == 0) {
  161.           break;
  162.         }
  163.       case 2:
  164.         if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
  165.           chooseRemoteRack(1, results.get(0), excludedNodes,
  166.                            blocksize, maxNodesPerRack, results);
  167.         } else if (newBlock){
  168.           chooseLocalRack(results.get(1), excludedNodes, blocksize, 
  169.                           maxNodesPerRack, results);
  170.         } else {
  171.           chooseLocalRack(writer, excludedNodes, blocksize,
  172.                           maxNodesPerRack, results);
  173.         }
  174.         if (--numOfReplicas == 0) {
  175.           break;
  176.         }
  177.       default:
  178.         chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
  179.                      blocksize, maxNodesPerRack, results);
  180.       }
  181.     } catch (NotEnoughReplicasException e) {
  182.       FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of "
  183.                + numOfReplicas);
  184.     }
  185.     return writer;
  186.   }
  187.     
  188.   /* choose <i>localMachine</i> as the target.
  189.    * if <i>localMachine</i> is not availabe, 
  190.    * choose a node on the same rack
  191.    * @return the choosen node
  192.    */
  193.   private DatanodeDescriptor chooseLocalNode(
  194.                                              DatanodeDescriptor localMachine,
  195.                                              List<Node> excludedNodes,
  196.                                              long blocksize,
  197.                                              int maxNodesPerRack,
  198.                                              List<DatanodeDescriptor> results)
  199.     throws NotEnoughReplicasException {
  200.     // if no local machine, randomly choose one node
  201.     if (localMachine == null)
  202.       return chooseRandom(NodeBase.ROOT, excludedNodes, 
  203.                           blocksize, maxNodesPerRack, results);
  204.       
  205.     // otherwise try local machine first
  206.     if (!excludedNodes.contains(localMachine)) {
  207.       excludedNodes.add(localMachine);
  208.       if (isGoodTarget(localMachine, blocksize,
  209.                        maxNodesPerRack, false, results)) {
  210.         results.add(localMachine);
  211.         return localMachine;
  212.       }
  213.     } 
  214.       
  215.     // try a node on local rack
  216.     return chooseLocalRack(localMachine, excludedNodes, 
  217.                            blocksize, maxNodesPerRack, results);
  218.   }
  219.     
  220.   /* choose one node from the rack that <i>localMachine</i> is on.
  221.    * if no such node is availabe, choose one node from the rack where
  222.    * a second replica is on.
  223.    * if still no such node is available, choose a random node 
  224.    * in the cluster.
  225.    * @return the choosen node
  226.    */
  227.   private DatanodeDescriptor chooseLocalRack(
  228.                                              DatanodeDescriptor localMachine,
  229.                                              List<Node> excludedNodes,
  230.                                              long blocksize,
  231.                                              int maxNodesPerRack,
  232.                                              List<DatanodeDescriptor> results)
  233.     throws NotEnoughReplicasException {
  234.     // no local machine, so choose a random machine
  235.     if (localMachine == null) {
  236.       return chooseRandom(NodeBase.ROOT, excludedNodes, 
  237.                           blocksize, maxNodesPerRack, results);
  238.     }
  239.       
  240.     // choose one from the local rack
  241.     try {
  242.       return chooseRandom(
  243.                           localMachine.getNetworkLocation(),
  244.                           excludedNodes, blocksize, maxNodesPerRack, results);
  245.     } catch (NotEnoughReplicasException e1) {
  246.       // find the second replica
  247.       DatanodeDescriptor newLocal=null;
  248.       for(Iterator<DatanodeDescriptor> iter=results.iterator();
  249.           iter.hasNext();) {
  250.         DatanodeDescriptor nextNode = iter.next();
  251.         if (nextNode != localMachine) {
  252.           newLocal = nextNode;
  253.           break;
  254.         }
  255.       }
  256.       if (newLocal != null) {
  257.         try {
  258.           return chooseRandom(
  259.                               newLocal.getNetworkLocation(),
  260.                               excludedNodes, blocksize, maxNodesPerRack, results);
  261.         } catch(NotEnoughReplicasException e2) {
  262.           //otherwise randomly choose one from the network
  263.           return chooseRandom(NodeBase.ROOT, excludedNodes,
  264.                               blocksize, maxNodesPerRack, results);
  265.         }
  266.       } else {
  267.         //otherwise randomly choose one from the network
  268.         return chooseRandom(NodeBase.ROOT, excludedNodes,
  269.                             blocksize, maxNodesPerRack, results);
  270.       }
  271.     }
  272.   }
  273.     
  274.   /* choose <i>numOfReplicas</i> nodes from the racks 
  275.    * that <i>localMachine</i> is NOT on.
  276.    * if not enough nodes are availabe, choose the remaining ones 
  277.    * from the local rack
  278.    */
  279.     
  280.   private void chooseRemoteRack(int numOfReplicas,
  281.                                 DatanodeDescriptor localMachine,
  282.                                 List<Node> excludedNodes,
  283.                                 long blocksize,
  284.                                 int maxReplicasPerRack,
  285.                                 List<DatanodeDescriptor> results)
  286.     throws NotEnoughReplicasException {
  287.     int oldNumOfReplicas = results.size();
  288.     // randomly choose one node from remote racks
  289.     try {
  290.       chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
  291.                    excludedNodes, blocksize, maxReplicasPerRack, results);
  292.     } catch (NotEnoughReplicasException e) {
  293.       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
  294.                    localMachine.getNetworkLocation(), excludedNodes, blocksize, 
  295.                    maxReplicasPerRack, results);
  296.     }
  297.   }
  298.   /* Randomly choose one target from <i>nodes</i>.
  299.    * @return the choosen node
  300.    */
  301.   private DatanodeDescriptor chooseRandom(
  302.                                           String nodes,
  303.                                           List<Node> excludedNodes,
  304.                                           long blocksize,
  305.                                           int maxNodesPerRack,
  306.                                           List<DatanodeDescriptor> results) 
  307.     throws NotEnoughReplicasException {
  308.     DatanodeDescriptor result;
  309.     do {
  310.       DatanodeDescriptor[] selectedNodes = 
  311.         chooseRandom(1, nodes, excludedNodes);
  312.       if (selectedNodes.length == 0) {
  313.         throw new NotEnoughReplicasException(
  314.                                              "Not able to place enough replicas");
  315.       }
  316.       result = (DatanodeDescriptor)(selectedNodes[0]);
  317.     } while(!isGoodTarget(result, blocksize, maxNodesPerRack, results));
  318.     results.add(result);
  319.     return result;
  320.   }
  321.     
  322.   /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
  323.    */
  324.   private void chooseRandom(int numOfReplicas,
  325.                             String nodes,
  326.                             List<Node> excludedNodes,
  327.                             long blocksize,
  328.                             int maxNodesPerRack,
  329.                             List<DatanodeDescriptor> results)
  330.     throws NotEnoughReplicasException {
  331.     boolean toContinue = true;
  332.     do {
  333.       DatanodeDescriptor[] selectedNodes = 
  334.         chooseRandom(numOfReplicas, nodes, excludedNodes);
  335.       if (selectedNodes.length < numOfReplicas) {
  336.         toContinue = false;
  337.       }
  338.       for(int i=0; i<selectedNodes.length; i++) {
  339.         DatanodeDescriptor result = selectedNodes[i];
  340.         if (isGoodTarget(result, blocksize, maxNodesPerRack, results)) {
  341.           numOfReplicas--;
  342.           results.add(result);
  343.         }
  344.       } // end of for
  345.     } while (numOfReplicas>0 && toContinue);
  346.       
  347.     if (numOfReplicas>0) {
  348.       throw new NotEnoughReplicasException(
  349.                                            "Not able to place enough replicas");
  350.     }
  351.   }
  352.     
  353.   /* Randomly choose <i>numOfNodes</i> nodes from <i>scope</i>.
  354.    * @return the choosen nodes
  355.    */
  356.   private DatanodeDescriptor[] chooseRandom(int numOfReplicas, 
  357.                                             String nodes,
  358.                                             List<Node> excludedNodes) {
  359.     List<DatanodeDescriptor> results = 
  360.       new ArrayList<DatanodeDescriptor>();
  361.     int numOfAvailableNodes =
  362.       clusterMap.countNumOfAvailableNodes(nodes, excludedNodes);
  363.     numOfReplicas = (numOfAvailableNodes<numOfReplicas)?
  364.       numOfAvailableNodes:numOfReplicas;
  365.     while(numOfReplicas > 0) {
  366.       DatanodeDescriptor choosenNode = 
  367.         (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
  368.       if (!excludedNodes.contains(choosenNode)) {
  369.         results.add(choosenNode);
  370.         excludedNodes.add(choosenNode);
  371.         numOfReplicas--;
  372.       }
  373.     }
  374.     return (DatanodeDescriptor[])results.toArray(
  375.                                                  new DatanodeDescriptor[results.size()]);    
  376.   }
  377.     
  378.   /* judge if a node is a good target.
  379.    * return true if <i>node</i> has enough space, 
  380.    * does not have too much load, and the rack does not have too many nodes
  381.    */
  382.   private boolean isGoodTarget(DatanodeDescriptor node,
  383.                                long blockSize, int maxTargetPerLoc,
  384.                                List<DatanodeDescriptor> results) {
  385.     return isGoodTarget(node, blockSize, maxTargetPerLoc,
  386.                         this.considerLoad, results);
  387.   }
  388.     
  389.   private boolean isGoodTarget(DatanodeDescriptor node,
  390.                                long blockSize, int maxTargetPerLoc,
  391.                                boolean considerLoad,
  392.                                List<DatanodeDescriptor> results) {
  393.     Log logr = FSNamesystem.LOG;
  394.     // check if the node is (being) decommissed
  395.     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
  396.       logr.debug("Node "+NodeBase.getPath(node)+
  397.                 " is not chosen because the node is (being) decommissioned");
  398.       return false;
  399.     }
  400.     long remaining = node.getRemaining() - 
  401.                      (node.getBlocksScheduled() * blockSize); 
  402.     // check the remaining capacity of the target machine
  403.     if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
  404.       logr.debug("Node "+NodeBase.getPath(node)+
  405.                 " is not chosen because the node does not have enough space");
  406.       return false;
  407.     }
  408.       
  409.     // check the communication traffic of the target machine
  410.     if (considerLoad) {
  411.       double avgLoad = 0;
  412.       int size = clusterMap.getNumOfLeaves();
  413.       if (size != 0) {
  414.         avgLoad = (double)fs.getTotalLoad()/size;
  415.       }
  416.       if (node.getXceiverCount() > (2.0 * avgLoad)) {
  417.         logr.debug("Node "+NodeBase.getPath(node)+
  418.                   " is not chosen because the node is too busy");
  419.         return false;
  420.       }
  421.     }
  422.       
  423.     // check if the target rack has chosen too many nodes
  424.     String rackname = node.getNetworkLocation();
  425.     int counter=1;
  426.     for(Iterator<DatanodeDescriptor> iter = results.iterator();
  427.         iter.hasNext();) {
  428.       Node result = iter.next();
  429.       if (rackname.equals(result.getNetworkLocation())) {
  430.         counter++;
  431.       }
  432.     }
  433.     if (counter>maxTargetPerLoc) {
  434.       logr.debug("Node "+NodeBase.getPath(node)+
  435.                 " is not chosen because the rack has too many chosen nodes");
  436.       return false;
  437.     }
  438.     return true;
  439.   }
  440.     
  441.   /* Return a pipeline of nodes.
  442.    * The pipeline is formed finding a shortest path that 
  443.    * starts from the writer and tranverses all <i>nodes</i>
  444.    * This is basically a traveling salesman problem.
  445.    */
  446.   private DatanodeDescriptor[] getPipeline(
  447.                                            DatanodeDescriptor writer,
  448.                                            DatanodeDescriptor[] nodes) {
  449.     if (nodes.length==0) return nodes;
  450.       
  451.     synchronized(clusterMap) {
  452.       int index=0;
  453.       if (writer == null || !clusterMap.contains(writer)) {
  454.         writer = nodes[0];
  455.       }
  456.       for(;index<nodes.length; index++) {
  457.         DatanodeDescriptor shortestNode = nodes[index];
  458.         int shortestDistance = clusterMap.getDistance(writer, shortestNode);
  459.         int shortestIndex = index;
  460.         for(int i=index+1; i<nodes.length; i++) {
  461.           DatanodeDescriptor currentNode = nodes[i];
  462.           int currentDistance = clusterMap.getDistance(writer, currentNode);
  463.           if (shortestDistance>currentDistance) {
  464.             shortestDistance = currentDistance;
  465.             shortestNode = currentNode;
  466.             shortestIndex = i;
  467.           }
  468.         }
  469.         //switch position index & shortestIndex
  470.         if (index != shortestIndex) {
  471.           nodes[shortestIndex] = nodes[index];
  472.           nodes[index] = shortestNode;
  473.         }
  474.         writer = shortestNode;
  475.       }
  476.     }
  477.     return nodes;
  478.   }
  479.   /**
  480.    * Verify that the block is replicated on at least 2 different racks
  481.    * if there is more than one rack in the system.
  482.    * 
  483.    * @param lBlk block with locations
  484.    * @param cluster 
  485.    * @return 1 if the block must be relicated on additional rack,
  486.    * or 0 if the number of racks is sufficient.
  487.    */
  488.   public static int verifyBlockPlacement(LocatedBlock lBlk,
  489.                                          short replication,
  490.                                          NetworkTopology cluster) {
  491.     int numRacks = verifyBlockPlacement(lBlk, Math.min(2,replication), cluster);
  492.     return numRacks < 0 ? 0 : numRacks;
  493.   }
  494.   /**
  495.    * Verify that the block is replicated on at least minRacks different racks
  496.    * if there is more than minRacks rack in the system.
  497.    * 
  498.    * @param lBlk block with locations
  499.    * @param minRacks number of racks the block should be replicated to
  500.    * @param cluster 
  501.    * @return the difference between the required and the actual number of racks
  502.    * the block is replicated to.
  503.    */
  504.   public static int verifyBlockPlacement(LocatedBlock lBlk,
  505.                                          int minRacks,
  506.                                          NetworkTopology cluster) {
  507.     DatanodeInfo[] locs = lBlk.getLocations();
  508.     if (locs == null)
  509.       locs = new DatanodeInfo[0];
  510.     int numRacks = cluster.getNumOfRacks();
  511.     if(numRacks <= 1) // only one rack
  512.       return 0;
  513.     minRacks = Math.min(minRacks, numRacks);
  514.     // 1. Check that all locations are different.
  515.     // 2. Count locations on different racks.
  516.     Set<String> racks = new TreeSet<String>();
  517.     for (DatanodeInfo dn : locs)
  518.       racks.add(dn.getNetworkLocation());
  519.     return minRacks - racks.size();
  520.   }
  521. } //end of Replicator