UnderReplicatedBlocks.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import java.util.*;
  20. import org.apache.hadoop.hdfs.protocol.Block;
  21. /* Class for keeping track of under replication blocks
  22.  * Blocks have replication priority, with priority 0 indicating the highest
  23.  * Blocks have only one replicas has the highest
  24.  */
  25. class UnderReplicatedBlocks implements Iterable<Block> {
  26.   static final int LEVEL = 3;
  27.   private List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
  28.       
  29.   /* constructor */
  30.   UnderReplicatedBlocks() {
  31.     for(int i=0; i<LEVEL; i++) {
  32.       priorityQueues.add(new TreeSet<Block>());
  33.     }
  34.   }
  35.   /**
  36.    * Empty the queues.
  37.    */
  38.   void clear() {
  39.     for(int i=0; i<LEVEL; i++) {
  40.       priorityQueues.get(i).clear();
  41.     }
  42.   }
  43.   /* Return the total number of under replication blocks */
  44.   synchronized int size() {
  45.     int size = 0;
  46.     for (int i=0; i<LEVEL; i++) {
  47.       size += priorityQueues.get(i).size();
  48.     }
  49.     return size;
  50.   }
  51.         
  52.   /* Check if a block is in the neededReplication queue */
  53.   synchronized boolean contains(Block block) {
  54.     for(TreeSet<Block> set:priorityQueues) {
  55.       if(set.contains(block)) { return true; }
  56.     }
  57.     return false;
  58.   }
  59.       
  60.   /* Return the priority of a block
  61.    * @param block a under replication block
  62.    * @param curReplicas current number of replicas of the block
  63.    * @param expectedReplicas expected number of replicas of the block
  64.    */
  65.   private int getPriority(Block block, 
  66.                           int curReplicas, 
  67.                           int decommissionedReplicas,
  68.                           int expectedReplicas) {
  69.     if (curReplicas<0 || curReplicas>=expectedReplicas) {
  70.       return LEVEL; // no need to replicate
  71.     } else if(curReplicas==0) {
  72.       // If there are zero non-decommissioned replica but there are
  73.       // some decommissioned replicas, then assign them highest priority
  74.       if (decommissionedReplicas > 0) {
  75.         return 0;
  76.       }
  77.       return 2; // keep these blocks in needed replication.
  78.     } else if(curReplicas==1) {
  79.       return 0; // highest priority
  80.     } else if(curReplicas*3<expectedReplicas) {
  81.       return 1;
  82.     } else {
  83.       return 2;
  84.     }
  85.   }
  86.       
  87.   /* add a block to a under replication queue according to its priority
  88.    * @param block a under replication block
  89.    * @param curReplicas current number of replicas of the block
  90.    * @param expectedReplicas expected number of replicas of the block
  91.    */
  92.   synchronized boolean add(
  93.                            Block block,
  94.                            int curReplicas, 
  95.                            int decomissionedReplicas,
  96.                            int expectedReplicas) {
  97.     if(curReplicas<0 || expectedReplicas <= curReplicas) {
  98.       return false;
  99.     }
  100.     int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
  101.                                expectedReplicas);
  102.     if(priLevel != LEVEL && priorityQueues.get(priLevel).add(block)) {
  103.       NameNode.stateChangeLog.debug(
  104.                                     "BLOCK* NameSystem.UnderReplicationBlock.add:"
  105.                                     + block
  106.                                     + " has only "+curReplicas
  107.                                     + " replicas and need " + expectedReplicas
  108.                                     + " replicas so is added to neededReplications"
  109.                                     + " at priority level " + priLevel);
  110.       return true;
  111.     }
  112.     return false;
  113.   }
  114.   /* remove a block from a under replication queue */
  115.   synchronized boolean remove(Block block, 
  116.                               int oldReplicas, 
  117.                               int decommissionedReplicas,
  118.                               int oldExpectedReplicas) {
  119.     int priLevel = getPriority(block, oldReplicas, 
  120.                                decommissionedReplicas,
  121.                                oldExpectedReplicas);
  122.     return remove(block, priLevel);
  123.   }
  124.       
  125.   /* remove a block from a under replication queue given a priority*/
  126.   boolean remove(Block block, int priLevel) {
  127.     if(priLevel >= 0 && priLevel < LEVEL 
  128.         && priorityQueues.get(priLevel).remove(block)) {
  129.       NameNode.stateChangeLog.debug(
  130.                                     "BLOCK* NameSystem.UnderReplicationBlock.remove: "
  131.                                     + "Removing block " + block
  132.                                     + " from priority queue "+ priLevel);
  133.       return true;
  134.     } else {
  135.       for(int i=0; i<LEVEL; i++) {
  136.         if(i!=priLevel && priorityQueues.get(i).remove(block)) {
  137.           NameNode.stateChangeLog.debug(
  138.                                         "BLOCK* NameSystem.UnderReplicationBlock.remove: "
  139.                                         + "Removing block " + block
  140.                                         + " from priority queue "+ i);
  141.           return true;
  142.         }
  143.       }
  144.     }
  145.     return false;
  146.   }
  147.       
  148.   /* update the priority level of a block */
  149.   synchronized void update(Block block, int curReplicas, 
  150.                            int decommissionedReplicas,
  151.                            int curExpectedReplicas,
  152.                            int curReplicasDelta, int expectedReplicasDelta) {
  153.     int oldReplicas = curReplicas-curReplicasDelta;
  154.     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
  155.     int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas);
  156.     int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas);
  157.     NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
  158.                                   block +
  159.                                   " curReplicas " + curReplicas +
  160.                                   " curExpectedReplicas " + curExpectedReplicas +
  161.                                   " oldReplicas " + oldReplicas +
  162.                                   " oldExpectedReplicas  " + oldExpectedReplicas +
  163.                                   " curPri  " + curPri +
  164.                                   " oldPri  " + oldPri);
  165.     if(oldPri != LEVEL && oldPri != curPri) {
  166.       remove(block, oldPri);
  167.     }
  168.     if(curPri != LEVEL && priorityQueues.get(curPri).add(block)) {
  169.       NameNode.stateChangeLog.debug(
  170.                                     "BLOCK* NameSystem.UnderReplicationBlock.update:"
  171.                                     + block
  172.                                     + " has only "+curReplicas
  173.                                     + " replicas and need " + curExpectedReplicas
  174.                                     + " replicas so is added to neededReplications"
  175.                                     + " at priority level " + curPri);
  176.     }
  177.   }
  178.       
  179.   /* return an iterator of all the under replication blocks */
  180.   public synchronized BlockIterator iterator() {
  181.     return new BlockIterator();
  182.   }
  183.   
  184.     class BlockIterator implements Iterator<Block> {
  185.       private int level;
  186.       private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
  187.       BlockIterator()  
  188.       {
  189.         level=0;
  190.         for(int i=0; i<LEVEL; i++) {
  191.           iterators.add(priorityQueues.get(i).iterator());
  192.         }
  193.       }
  194.               
  195.       private void update() {
  196.         while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
  197.           level++;
  198.         }
  199.       }
  200.               
  201.       public Block next() {
  202.         update();
  203.         return iterators.get(level).next();
  204.       }
  205.               
  206.       public boolean hasNext() {
  207.         update();
  208.         return iterators.get(level).hasNext();
  209.       }
  210.               
  211.       public void remove() {
  212.         iterators.get(level).remove();
  213.       }
  214.       
  215.       public int getPriority() {
  216.         return level;
  217.     };
  218.   }
  219. }