PendingReplicationBlocks.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import org.apache.hadoop.hdfs.protocol.Block;
  20. import org.apache.hadoop.util.*;
  21. import java.io.*;
  22. import java.util.*;
  23. import java.sql.Time;
  24. /***************************************************
  25.  * PendingReplicationBlocks does the bookkeeping of all
  26.  * blocks that are getting replicated.
  27.  *
  28.  * It does the following:
  29.  * 1)  record blocks that are getting replicated at this instant.
  30.  * 2)  a coarse grain timer to track age of replication request
  31.  * 3)  a thread that periodically identifies replication-requests
  32.  *     that never made it.
  33.  *
  34.  ***************************************************/
  35. class PendingReplicationBlocks {
  36.   private Map<Block, PendingBlockInfo> pendingReplications;
  37.   private ArrayList<Block> timedOutItems;
  38.   Daemon timerThread = null;
  39.   private volatile boolean fsRunning = true;
  40.   //
  41.   // It might take anywhere between 5 to 10 minutes before
  42.   // a request is timed out.
  43.   //
  44.   private long timeout = 5 * 60 * 1000;
  45.   private long defaultRecheckInterval = 5 * 60 * 1000;
  46.   PendingReplicationBlocks(long timeoutPeriod) {
  47.     if ( timeoutPeriod > 0 ) {
  48.       this.timeout = timeoutPeriod;
  49.     }
  50.     init();
  51.   }
  52.   PendingReplicationBlocks() {
  53.     init();
  54.   }
  55.   void init() {
  56.     pendingReplications = new HashMap<Block, PendingBlockInfo>();
  57.     timedOutItems = new ArrayList<Block>();
  58.     this.timerThread = new Daemon(new PendingReplicationMonitor());
  59.     timerThread.start();
  60.   }
  61.   /**
  62.    * Add a block to the list of pending Replications
  63.    */
  64.   void add(Block block, int numReplicas) {
  65.     synchronized (pendingReplications) {
  66.       PendingBlockInfo found = pendingReplications.get(block);
  67.       if (found == null) {
  68.         pendingReplications.put(block, new PendingBlockInfo(numReplicas));
  69.       } else {
  70.         found.incrementReplicas(numReplicas);
  71.         found.setTimeStamp();
  72.       }
  73.     }
  74.   }
  75.   /**
  76.    * One replication request for this block has finished.
  77.    * Decrement the number of pending replication requests
  78.    * for this block.
  79.    */
  80.   void remove(Block block) {
  81.     synchronized (pendingReplications) {
  82.       PendingBlockInfo found = pendingReplications.get(block);
  83.       if (found != null) {
  84.        FSNamesystem.LOG.debug("Removing pending replication for block" + block);
  85.         found.decrementReplicas();
  86.         if (found.getNumReplicas() <= 0) {
  87.           pendingReplications.remove(block);
  88.         }
  89.       }
  90.     }
  91.   }
  92.   /**
  93.    * The total number of blocks that are undergoing replication
  94.    */
  95.   int size() {
  96.     return pendingReplications.size();
  97.   } 
  98.   /**
  99.    * How many copies of this block is pending replication?
  100.    */
  101.   int getNumReplicas(Block block) {
  102.     synchronized (pendingReplications) {
  103.       PendingBlockInfo found = pendingReplications.get(block);
  104.       if (found != null) {
  105.         return found.getNumReplicas();
  106.       }
  107.     }
  108.     return 0;
  109.   }
  110.   /**
  111.    * Returns a list of blocks that have timed out their 
  112.    * replication requests. Returns null if no blocks have
  113.    * timed out.
  114.    */
  115.   Block[] getTimedOutBlocks() {
  116.     synchronized (timedOutItems) {
  117.       if (timedOutItems.size() <= 0) {
  118.         return null;
  119.       }
  120.       Block[] blockList = timedOutItems.toArray(
  121.                                                 new Block[timedOutItems.size()]);
  122.       timedOutItems.clear();
  123.       return blockList;
  124.     }
  125.   }
  126.   /**
  127.    * An object that contains information about a block that 
  128.    * is being replicated. It records the timestamp when the 
  129.    * system started replicating the most recent copy of this
  130.    * block. It also records the number of replication
  131.    * requests that are in progress.
  132.    */
  133.   static class PendingBlockInfo {
  134.     private long timeStamp;
  135.     private int numReplicasInProgress;
  136.     PendingBlockInfo(int numReplicas) {
  137.       this.timeStamp = FSNamesystem.now();
  138.       this.numReplicasInProgress = numReplicas;
  139.     }
  140.     long getTimeStamp() {
  141.       return timeStamp;
  142.     }
  143.     void setTimeStamp() {
  144.       timeStamp = FSNamesystem.now();
  145.     }
  146.     void incrementReplicas(int increment) {
  147.       numReplicasInProgress += increment;
  148.     }
  149.     void decrementReplicas() {
  150.       numReplicasInProgress--;
  151.       assert(numReplicasInProgress >= 0);
  152.     }
  153.     int getNumReplicas() {
  154.       return numReplicasInProgress;
  155.     }
  156.   }
  157.   /*
  158.    * A periodic thread that scans for blocks that never finished
  159.    * their replication request.
  160.    */
  161.   class PendingReplicationMonitor implements Runnable {
  162.     public void run() {
  163.       while (fsRunning) {
  164.         long period = Math.min(defaultRecheckInterval, timeout);
  165.         try {
  166.           pendingReplicationCheck();
  167.           Thread.sleep(period);
  168.         } catch (InterruptedException ie) {
  169.           FSNamesystem.LOG.debug(
  170.                 "PendingReplicationMonitor thread received exception. " + ie);
  171.         }
  172.       }
  173.     }
  174.     /**
  175.      * Iterate through all items and detect timed-out items
  176.      */
  177.     void pendingReplicationCheck() {
  178.       synchronized (pendingReplications) {
  179.         Iterator iter = pendingReplications.entrySet().iterator();
  180.         long now = FSNamesystem.now();
  181.         FSNamesystem.LOG.debug("PendingReplicationMonitor checking Q");
  182.         while (iter.hasNext()) {
  183.           Map.Entry entry = (Map.Entry) iter.next();
  184.           PendingBlockInfo pendingBlock = (PendingBlockInfo) entry.getValue();
  185.           if (now > pendingBlock.getTimeStamp() + timeout) {
  186.             Block block = (Block) entry.getKey();
  187.             synchronized (timedOutItems) {
  188.               timedOutItems.add(block);
  189.             }
  190.             FSNamesystem.LOG.warn(
  191.                 "PendingReplicationMonitor timed out block " + block);
  192.             iter.remove();
  193.           }
  194.         }
  195.       }
  196.     }
  197.   }
  198.   /*
  199.    * Shuts down the pending replication monitor thread.
  200.    * Waits for the thread to exit.
  201.    */
  202.   void stop() {
  203.     fsRunning = false;
  204.     timerThread.interrupt();
  205.     try {
  206.       timerThread.join(3000);
  207.     } catch (InterruptedException ie) {
  208.     }
  209.   }
  210.   /**
  211.    * Iterate through all items and print them.
  212.    */
  213.   void metaSave(PrintWriter out) {
  214.     synchronized (pendingReplications) {
  215.       out.println("Metasave: Blocks being replicated: " +
  216.                   pendingReplications.size());
  217.       Iterator iter = pendingReplications.entrySet().iterator();
  218.       while (iter.hasNext()) {
  219.         Map.Entry entry = (Map.Entry) iter.next();
  220.         PendingBlockInfo pendingBlock = (PendingBlockInfo) entry.getValue();
  221.         Block block = (Block) entry.getKey();
  222.         out.println(block + 
  223.                     " StartTime: " + new Time(pendingBlock.timeStamp) +
  224.                     " NumReplicaInProgress: " + 
  225.                     pendingBlock.numReplicasInProgress);
  226.       }
  227.     }
  228.   }
  229. }