NetworkTopology.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:21k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.net;
  19. import java.util.ArrayList;
  20. import java.util.Collection;
  21. import java.util.List;
  22. import java.util.Random;
  23. import java.util.concurrent.locks.ReadWriteLock;
  24. import java.util.concurrent.locks.ReentrantReadWriteLock;
  25. import org.apache.commons.logging.Log;
  26. import org.apache.commons.logging.LogFactory;
  27. /** The class represents a cluster of computer with a tree hierarchical
  28.  * network topology.
  29.  * For example, a cluster may be consists of many data centers filled 
  30.  * with racks of computers.
  31.  * In a network topology, leaves represent data nodes (computers) and inner
  32.  * nodes represent switches/routers that manage traffic in/out of data centers
  33.  * or racks.  
  34.  * 
  35.  */
  36. public class NetworkTopology {
  37.   public final static String DEFAULT_RACK = "/default-rack";
  38.   public final static int DEFAULT_HOST_LEVEL = 2;
  39.   public static final Log LOG = 
  40.     LogFactory.getLog(NetworkTopology.class);
  41.     
  42.   /* Inner Node represent a switch/router of a data center or rack.
  43.    * Different from a leave node, it has non-null children.
  44.    */
  45.   private class InnerNode extends NodeBase {
  46.     private ArrayList<Node> children=new ArrayList<Node>();
  47.     private int numOfLeaves;
  48.         
  49.     /** Construct an InnerNode from a path-like string */
  50.     InnerNode(String path) {
  51.       super(path);
  52.     }
  53.         
  54.     /** Construct an InnerNode from its name and its network location */
  55.     InnerNode(String name, String location) {
  56.       super(name, location);
  57.     }
  58.         
  59.     /** Construct an InnerNode
  60.      * from its name, its network location, its parent, and its level */
  61.     InnerNode(String name, String location, InnerNode parent, int level) {
  62.       super(name, location, parent, level);
  63.     }
  64.         
  65.     /** Get its children */
  66.     Collection<Node> getChildren() {return children;}
  67.         
  68.     /** Return the number of children this node has */
  69.     int getNumOfChildren() {
  70.       return children.size();
  71.     }
  72.         
  73.     /** Judge if this node represents a rack 
  74.      * Return true if it has no child or its children are not InnerNodes
  75.      */ 
  76.     boolean isRack() {
  77.       if (children.isEmpty()) {
  78.         return true;
  79.       }
  80.             
  81.       Node firstChild = children.get(0);
  82.       if (firstChild instanceof InnerNode) {
  83.         return false;
  84.       }
  85.             
  86.       return true;
  87.     }
  88.         
  89.     /** Judge if this node is an ancestor of node <i>n</i>
  90.      * 
  91.      * @param n a node
  92.      * @return true if this node is an ancestor of <i>n</i>
  93.      */
  94.     boolean isAncestor(Node n) {
  95.       return getPath(this).equals(NodeBase.PATH_SEPARATOR_STR) ||
  96.         (n.getNetworkLocation()+NodeBase.PATH_SEPARATOR_STR).
  97.         startsWith(getPath(this)+NodeBase.PATH_SEPARATOR_STR);
  98.     }
  99.         
  100.     /** Judge if this node is the parent of node <i>n</i>
  101.      * 
  102.      * @param n a node
  103.      * @return true if this node is the parent of <i>n</i>
  104.      */
  105.     boolean isParent(Node n) {
  106.       return n.getNetworkLocation().equals(getPath(this));
  107.     }
  108.         
  109.     /* Return a child name of this node who is an ancestor of node <i>n</i> */
  110.     private String getNextAncestorName(Node n) {
  111.       if (!isAncestor(n)) {
  112.         throw new IllegalArgumentException(
  113.                                            this + "is not an ancestor of " + n);
  114.       }
  115.       String name = n.getNetworkLocation().substring(getPath(this).length());
  116.       if (name.charAt(0) == PATH_SEPARATOR) {
  117.         name = name.substring(1);
  118.       }
  119.       int index=name.indexOf(PATH_SEPARATOR);
  120.       if (index !=-1)
  121.         name = name.substring(0, index);
  122.       return name;
  123.     }
  124.         
  125.     /** Add node <i>n</i> to the subtree of this node 
  126.      * @param n node to be added
  127.      * @return true if the node is added; false otherwise
  128.      */
  129.     boolean add(Node n) {
  130.       if (!isAncestor(n))
  131.         throw new IllegalArgumentException(n.getName()+", which is located at "
  132.                 +n.getNetworkLocation()+", is not a decendent of "
  133.                 +getPath(this));
  134.       if (isParent(n)) {
  135.         // this node is the parent of n; add n directly
  136.         n.setParent(this);
  137.         n.setLevel(this.level+1);
  138.         for(int i=0; i<children.size(); i++) {
  139.           if (children.get(i).getName().equals(n.getName())) {
  140.             children.set(i, n);
  141.             return false;
  142.           }
  143.         }
  144.         children.add(n);
  145.         numOfLeaves++;
  146.         return true;
  147.       } else {
  148.         // find the next ancestor node
  149.         String parentName = getNextAncestorName(n);
  150.         InnerNode parentNode = null;
  151.         for(int i=0; i<children.size(); i++) {
  152.           if (children.get(i).getName().equals(parentName)) {
  153.             parentNode = (InnerNode)children.get(i);
  154.             break;
  155.           }
  156.         }
  157.         if (parentNode == null) {
  158.           // create a new InnerNode
  159.           parentNode = new InnerNode(parentName, getPath(this),
  160.                                      this, this.getLevel()+1);
  161.           children.add(parentNode);
  162.         }
  163.         // add n to the subtree of the next ancestor node
  164.         if (parentNode.add(n)) {
  165.           numOfLeaves++;
  166.           return true;
  167.         } else {
  168.           return false;
  169.         }
  170.       }
  171.     }
  172.         
  173.     /** Remove node <i>n</i> from the subtree of this node
  174.      * @param n node to be deleted 
  175.      * @return true if the node is deleted; false otherwise
  176.      */
  177.     boolean remove(Node n) {
  178.       String parent = n.getNetworkLocation();
  179.       String currentPath = getPath(this);
  180.       if (!isAncestor(n))
  181.         throw new IllegalArgumentException(n.getName()
  182.                                            +", which is located at "
  183.                                            +parent+", is not a descendent of "+currentPath);
  184.       if (isParent(n)) {
  185.         // this node is the parent of n; remove n directly
  186.         for(int i=0; i<children.size(); i++) {
  187.           if (children.get(i).getName().equals(n.getName())) {
  188.             children.remove(i);
  189.             numOfLeaves--;
  190.             n.setParent(null);
  191.             return true;
  192.           }
  193.         }
  194.         return false;
  195.       } else {
  196.         // find the next ancestor node: the parent node
  197.         String parentName = getNextAncestorName(n);
  198.         InnerNode parentNode = null;
  199.         int i;
  200.         for(i=0; i<children.size(); i++) {
  201.           if (children.get(i).getName().equals(parentName)) {
  202.             parentNode = (InnerNode)children.get(i);
  203.             break;
  204.           }
  205.         }
  206.         if (parentNode==null) {
  207.           return false;
  208.         }
  209.         // remove n from the parent node
  210.         boolean isRemoved = parentNode.remove(n);
  211.         // if the parent node has no children, remove the parent node too
  212.         if (isRemoved) {
  213.           if (parentNode.getNumOfChildren() == 0) {
  214.             children.remove(i);
  215.           }
  216.           numOfLeaves--;
  217.         }
  218.         return isRemoved;
  219.       }
  220.     } // end of remove
  221.         
  222.     /** Given a node's string representation, return a reference to the node */ 
  223.     private Node getLoc(String loc) {
  224.       if (loc == null || loc.length() == 0) return this;
  225.             
  226.       String[] path = loc.split(PATH_SEPARATOR_STR, 2);
  227.       Node childnode = null;
  228.       for(int i=0; i<children.size(); i++) {
  229.         if (children.get(i).getName().equals(path[0])) {
  230.           childnode = children.get(i);
  231.         }
  232.       }
  233.       if (childnode == null) return null; // non-existing node
  234.       if (path.length == 1) return childnode;
  235.       if (childnode instanceof InnerNode) {
  236.         return ((InnerNode)childnode).getLoc(path[1]);
  237.       } else {
  238.         return null;
  239.       }
  240.     }
  241.         
  242.     /** get <i>leafIndex</i> leaf of this subtree 
  243.      * if it is not in the <i>excludedNode</i>*/
  244.     private Node getLeaf(int leafIndex, Node excludedNode) {
  245.       int count=0;
  246.       // check if the excluded node a leaf
  247.       boolean isLeaf =
  248.         excludedNode == null || !(excludedNode instanceof InnerNode);
  249.       // calculate the total number of excluded leaf nodes
  250.       int numOfExcludedLeaves =
  251.         isLeaf ? 1 : ((InnerNode)excludedNode).getNumOfLeaves();
  252.       if (isRack()) { // children are leaves
  253.         if (isLeaf) { // excluded node is a leaf node
  254.           int excludedIndex = children.indexOf(excludedNode);
  255.           if (excludedIndex != -1 && leafIndex >= 0) {
  256.             // excluded node is one of the children so adjust the leaf index
  257.             leafIndex = leafIndex>=excludedIndex ? leafIndex+1 : leafIndex;
  258.           }
  259.         }
  260.         // range check
  261.         if (leafIndex<0 || leafIndex>=this.getNumOfChildren()) {
  262.           return null;
  263.         }
  264.         return children.get(leafIndex);
  265.       } else {
  266.         for(int i=0; i<children.size(); i++) {
  267.           InnerNode child = (InnerNode)children.get(i);
  268.           if (excludedNode == null || excludedNode != child) {
  269.             // not the excludedNode
  270.             int numOfLeaves = child.getNumOfLeaves();
  271.             if (excludedNode != null && child.isAncestor(excludedNode)) {
  272.               numOfLeaves -= numOfExcludedLeaves;
  273.             }
  274.             if (count+numOfLeaves > leafIndex) {
  275.               // the leaf is in the child subtree
  276.               return child.getLeaf(leafIndex-count, excludedNode);
  277.             } else {
  278.               // go to the next child
  279.               count = count+numOfLeaves;
  280.             }
  281.           } else { // it is the excluededNode
  282.             // skip it and set the excludedNode to be null
  283.             excludedNode = null;
  284.           }
  285.         }
  286.         return null;
  287.       }
  288.     }
  289.         
  290.     int getNumOfLeaves() {
  291.       return numOfLeaves;
  292.     }
  293.   } // end of InnerNode
  294.     
  295.   InnerNode clusterMap = new InnerNode(InnerNode.ROOT); // the root
  296.   private int numOfRacks = 0;  // rack counter
  297.   private ReadWriteLock netlock;
  298.     
  299.   public NetworkTopology() {
  300.     netlock = new ReentrantReadWriteLock();
  301.   }
  302.     
  303.   /** Add a leaf node
  304.    * Update node counter & rack counter if neccessary
  305.    * @param node
  306.    *          node to be added
  307.    * @exception IllegalArgumentException if add a node to a leave 
  308.                                          or node to be added is not a leaf
  309.    */
  310.   public void add(Node node) {
  311.     if (node==null) return;
  312.     if( node instanceof InnerNode ) {
  313.       throw new IllegalArgumentException(
  314.         "Not allow to add an inner node: "+NodeBase.getPath(node));
  315.     }
  316.     netlock.writeLock().lock();
  317.     try {
  318.       Node rack = getNode(node.getNetworkLocation());
  319.       if (rack != null && !(rack instanceof InnerNode)) {
  320.         throw new IllegalArgumentException("Unexpected data node " 
  321.                                            + node.toString() 
  322.                                            + " at an illegal network location");
  323.       }
  324.       if (clusterMap.add(node)) {
  325.         LOG.info("Adding a new node: "+NodeBase.getPath(node));
  326.         if (rack == null) {
  327.           numOfRacks++;
  328.         }
  329.       }
  330.       LOG.debug("NetworkTopology became:n" + this.toString());
  331.     } finally {
  332.       netlock.writeLock().unlock();
  333.     }
  334.   }
  335.     
  336.   /** Remove a node
  337.    * Update node counter & rack counter if neccessary
  338.    * @param node
  339.    *          node to be removed
  340.    */ 
  341.   public void remove(Node node) {
  342.     if (node==null) return;
  343.     if( node instanceof InnerNode ) {
  344.       throw new IllegalArgumentException(
  345.         "Not allow to remove an inner node: "+NodeBase.getPath(node));
  346.     }
  347.     LOG.info("Removing a node: "+NodeBase.getPath(node));
  348.     netlock.writeLock().lock();
  349.     try {
  350.       if (clusterMap.remove(node)) {
  351.         InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
  352.         if (rack == null) {
  353.           numOfRacks--;
  354.         }
  355.       }
  356.       LOG.debug("NetworkTopology became:n" + this.toString());
  357.     } finally {
  358.       netlock.writeLock().unlock();
  359.     }
  360.   }
  361.        
  362.   /** Check if the tree contains node <i>node</i>
  363.    * 
  364.    * @param node
  365.    *          a node
  366.    * @return true if <i>node</i> is already in the tree; false otherwise
  367.    */
  368.   public boolean contains(Node node) {
  369.     if (node == null) return false;
  370.     netlock.readLock().lock();
  371.     try {
  372.       Node parent = node.getParent();
  373.       for(int level=node.getLevel(); parent!=null&&level>0;
  374.           parent=parent.getParent(), level--) {
  375.         if (parent == clusterMap)
  376.           return true;
  377.       }
  378.     } finally {
  379.       netlock.readLock().unlock();
  380.     }
  381.     return false; 
  382.   }
  383.     
  384.   /** Given a string representation of a node, return its reference
  385.    * 
  386.    * @param loc
  387.    *          a path-like string representation of a node
  388.    * @return a reference to the node; null if the node is not in the tree
  389.    */
  390.   public Node getNode(String loc) {
  391.     netlock.readLock().lock();
  392.     try {
  393.       loc = NodeBase.normalize(loc);
  394.       if (!NodeBase.ROOT.equals(loc))
  395.         loc = loc.substring(1);
  396.       return clusterMap.getLoc(loc);
  397.     } finally {
  398.       netlock.readLock().unlock();
  399.     }
  400.   }
  401.     
  402.   /** Return the total number of racks */
  403.   public int getNumOfRacks() {
  404.     netlock.readLock().lock();
  405.     try {
  406.       return numOfRacks;
  407.     } finally {
  408.       netlock.readLock().unlock();
  409.     }
  410.   }
  411.     
  412.   /** Return the total number of nodes */
  413.   public int getNumOfLeaves() {
  414.     netlock.readLock().lock();
  415.     try {
  416.       return clusterMap.getNumOfLeaves();
  417.     } finally {
  418.       netlock.readLock().unlock();
  419.     }
  420.   }
  421.     
  422.   /** Return the distance between two nodes
  423.    * It is assumed that the distance from one node to its parent is 1
  424.    * The distance between two nodes is calculated by summing up their distances
  425.    * to their closest common  ancestor.
  426.    * @param node1 one node
  427.    * @param node2 another node
  428.    * @return the distance between node1 and node2
  429.    * node1 or node2 do not belong to the cluster
  430.    */
  431.   public int getDistance(Node node1, Node node2) {
  432.     if (node1 == node2) {
  433.       return 0;
  434.     }
  435.     Node n1=node1, n2=node2;
  436.     int dis = 0;
  437.     netlock.readLock().lock();
  438.     try {
  439.       int level1=node1.getLevel(), level2=node2.getLevel();
  440.       while(n1!=null && level1>level2) {
  441.         n1 = n1.getParent();
  442.         level1--;
  443.         dis++;
  444.       }
  445.       while(n2!=null && level2>level1) {
  446.         n2 = n2.getParent();
  447.         level2--;
  448.         dis++;
  449.       }
  450.       while(n1!=null && n2!=null && n1.getParent()!=n2.getParent()) {
  451.         n1=n1.getParent();
  452.         n2=n2.getParent();
  453.         dis+=2;
  454.       }
  455.     } finally {
  456.       netlock.readLock().unlock();
  457.     }
  458.     if (n1==null) {
  459.       LOG.warn("The cluster does not contain node: "+NodeBase.getPath(node1));
  460.       return Integer.MAX_VALUE;
  461.     }
  462.     if (n2==null) {
  463.       LOG.warn("The cluster does not contain node: "+NodeBase.getPath(node2));
  464.       return Integer.MAX_VALUE;
  465.     }
  466.     return dis+2;
  467.   } 
  468.     
  469.   /** Check if two nodes are on the same rack
  470.    * @param node1 one node
  471.    * @param node2 another node
  472.    * @return true if node1 and node2 are pm the same rack; false otherwise
  473.    * @exception IllegalArgumentException when either node1 or node2 is null, or
  474.    * node1 or node2 do not belong to the cluster
  475.    */
  476.   public boolean isOnSameRack( Node node1,  Node node2) {
  477.     if (node1 == null || node2 == null) {
  478.       return false;
  479.     }
  480.       
  481.     netlock.readLock().lock();
  482.     try {
  483.       return node1.getParent()==node2.getParent();
  484.     } finally {
  485.       netlock.readLock().unlock();
  486.     }
  487.   }
  488.     
  489.   final private static Random r = new Random();
  490.   /** randomly choose one node from <i>scope</i>
  491.    * if scope starts with ~, choose one from the all nodes except for the
  492.    * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
  493.    * @param scope range of nodes from which a node will be choosen
  494.    * @return the choosen node
  495.    */
  496.   public Node chooseRandom(String scope) {
  497.     netlock.readLock().lock();
  498.     try {
  499.       if (scope.startsWith("~")) {
  500.         return chooseRandom(NodeBase.ROOT, scope.substring(1));
  501.       } else {
  502.         return chooseRandom(scope, null);
  503.       }
  504.     } finally {
  505.       netlock.readLock().unlock();
  506.     }
  507.   }
  508.     
  509.   private Node chooseRandom(String scope, String excludedScope){
  510.     if (excludedScope != null) {
  511.       if (scope.startsWith(excludedScope)) {
  512.         return null;
  513.       }
  514.       if (!excludedScope.startsWith(scope)) {
  515.         excludedScope = null;
  516.       }
  517.     }
  518.     Node node = getNode(scope);
  519.     if (!(node instanceof InnerNode)) {
  520.       return node;
  521.     }
  522.     InnerNode innerNode = (InnerNode)node;
  523.     int numOfDatanodes = innerNode.getNumOfLeaves();
  524.     if (excludedScope == null) {
  525.       node = null;
  526.     } else {
  527.       node = getNode(excludedScope);
  528.       if (!(node instanceof InnerNode)) {
  529.         numOfDatanodes -= 1;
  530.       } else {
  531.         numOfDatanodes -= ((InnerNode)node).getNumOfLeaves();
  532.       }
  533.     }
  534.     int leaveIndex = r.nextInt(numOfDatanodes);
  535.     return innerNode.getLeaf(leaveIndex, node);
  536.   }
  537.        
  538.   /** return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>
  539.    * if scope starts with ~, return the number of nodes that are not
  540.    * in <i>scope</i> and <i>excludedNodes</i>; 
  541.    * @param scope a path string that may start with ~
  542.    * @param excludedNodes a list of nodes
  543.    * @return number of available nodes
  544.    */
  545.   public int countNumOfAvailableNodes(String scope,
  546.                                       List<Node> excludedNodes) {
  547.     boolean isExcluded=false;
  548.     if (scope.startsWith("~")) {
  549.       isExcluded=true;
  550.       scope=scope.substring(1);
  551.     }
  552.     scope = NodeBase.normalize(scope);
  553.     int count=0; // the number of nodes in both scope & excludedNodes
  554.     netlock.readLock().lock();
  555.     try {
  556.       for(Node node:excludedNodes) {
  557.         if ((NodeBase.getPath(node)+NodeBase.PATH_SEPARATOR_STR).
  558.             startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
  559.           count++;
  560.         }
  561.       }
  562.       Node n=getNode(scope);
  563.       int scopeNodeCount=1;
  564.       if (n instanceof InnerNode) {
  565.         scopeNodeCount=((InnerNode)n).getNumOfLeaves();
  566.       }
  567.       if (isExcluded) {
  568.         return clusterMap.getNumOfLeaves()-
  569.           scopeNodeCount-excludedNodes.size()+count;
  570.       } else {
  571.         return scopeNodeCount-count;
  572.       }
  573.     } finally {
  574.       netlock.readLock().unlock();
  575.     }
  576.   }
  577.     
  578.   /** convert a network tree to a string */
  579.   public String toString() {
  580.     // print the number of racks
  581.     StringBuffer tree = new StringBuffer();
  582.     tree.append("Number of racks: ");
  583.     tree.append(numOfRacks);
  584.     tree.append("n");
  585.     // print the number of leaves
  586.     int numOfLeaves = getNumOfLeaves();
  587.     tree.append("Expected number of leaves:");
  588.     tree.append(numOfLeaves);
  589.     tree.append("n");
  590.     // print nodes
  591.     for(int i=0; i<numOfLeaves; i++) {
  592.       tree.append(NodeBase.getPath(clusterMap.getLeaf(i, null)));
  593.       tree.append("n");
  594.     }
  595.     return tree.toString();
  596.   }
  597.   /* swap two array items */
  598.   static private void swap(Node[] nodes, int i, int j) {
  599.     Node tempNode;
  600.     tempNode = nodes[j];
  601.     nodes[j] = nodes[i];
  602.     nodes[i] = tempNode;
  603.     
  604.   }
  605.   
  606.   /** Sort nodes array by their distances to <i>reader</i>
  607.    * It linearly scans the array, if a local node is found, swap it with
  608.    * the first element of the array.
  609.    * If a local rack node is found, swap it with the first element following
  610.    * the local node.
  611.    * If neither local node or local rack node is found, put a random replica
  612.    * location at postion 0.
  613.    * It leaves the rest nodes untouched.
  614.    */
  615.   public void pseudoSortByDistance( Node reader, Node[] nodes ) {
  616.     int tempIndex = 0;
  617.     if (reader != null ) {
  618.       int localRackNode = -1;
  619.       //scan the array to find the local node & local rack node
  620.       for(int i=0; i<nodes.length; i++) {
  621.         if(tempIndex == 0 && reader == nodes[i]) { //local node
  622.           //swap the local node and the node at position 0
  623.           if( i != 0 ) {
  624.             swap(nodes, tempIndex, i);
  625.           }
  626.           tempIndex=1;
  627.           if(localRackNode != -1 ) {
  628.             if(localRackNode == 0) {
  629.               localRackNode = i;
  630.             }
  631.             break;
  632.           }
  633.         } else if(localRackNode == -1 && isOnSameRack(reader, nodes[i])) {
  634.           //local rack
  635.           localRackNode = i;
  636.           if(tempIndex != 0 ) break;
  637.         }
  638.       }
  639.       // swap the local rack node and the node at position tempIndex
  640.       if(localRackNode != -1 && localRackNode != tempIndex ) {
  641.         swap(nodes, tempIndex, localRackNode);
  642.         tempIndex++;
  643.       }
  644.     }
  645.     
  646.     // put a random node at position 0 if it is not a local/local-rack node
  647.     if(tempIndex == 0 && nodes.length != 0) {
  648.       swap(nodes, 0, r.nextInt(nodes.length));
  649.     }
  650.   }
  651. }