DatanodeDescriptor.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import java.io.DataInput;
  20. import java.io.IOException;
  21. import java.util.*;
  22. import org.apache.hadoop.hdfs.protocol.Block;
  23. import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
  24. import org.apache.hadoop.hdfs.protocol.DatanodeID;
  25. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  26. import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
  27. import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
  28. import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
  29. import org.apache.hadoop.io.Text;
  30. import org.apache.hadoop.io.UTF8;
  31. import org.apache.hadoop.io.WritableUtils;
  32. /**************************************************
  33.  * DatanodeDescriptor tracks stats on a given DataNode,
  34.  * such as available storage capacity, last update time, etc.,
  35.  * and maintains a set of blocks stored on the datanode. 
  36.  *
  37.  * This data structure is a data structure that is internal
  38.  * to the namenode. It is *not* sent over-the-wire to the Client
  39.  * or the Datnodes. Neither is it stored persistently in the
  40.  * fsImage.
  41.  **************************************************/
  42. public class DatanodeDescriptor extends DatanodeInfo {
  43.   /** Block and targets pair */
  44.   public static class BlockTargetPair {
  45.     public final Block block;
  46.     public final DatanodeDescriptor[] targets;    
  47.     BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
  48.       this.block = block;
  49.       this.targets = targets;
  50.     }
  51.   }
  52.   /** A BlockTargetPair queue. */
  53.   private static class BlockQueue {
  54.     private final Queue<BlockTargetPair> blockq = new LinkedList<BlockTargetPair>();
  55.     /** Size of the queue */
  56.     synchronized int size() {return blockq.size();}
  57.     /** Enqueue */
  58.     synchronized boolean offer(Block block, DatanodeDescriptor[] targets) { 
  59.       return blockq.offer(new BlockTargetPair(block, targets));
  60.     }
  61.     /** Dequeue */
  62.     synchronized List<BlockTargetPair> poll(int numBlocks) {
  63.       if (numBlocks <= 0 || blockq.isEmpty()) {
  64.         return null;
  65.       }
  66.       List<BlockTargetPair> results = new ArrayList<BlockTargetPair>();
  67.       for(; !blockq.isEmpty() && numBlocks > 0; numBlocks--) {
  68.         results.add(blockq.poll());
  69.       }
  70.       return results;
  71.     }
  72.   }
  73.   private volatile BlockInfo blockList = null;
  74.   // isAlive == heartbeats.contains(this)
  75.   // This is an optimization, because contains takes O(n) time on Arraylist
  76.   protected boolean isAlive = false;
  77.   /** A queue of blocks to be replicated by this datanode */
  78.   private BlockQueue replicateBlocks = new BlockQueue();
  79.   /** A queue of blocks to be recovered by this datanode */
  80.   private BlockQueue recoverBlocks = new BlockQueue();
  81.   /** A set of blocks to be invalidated by this datanode */
  82.   private Set<Block> invalidateBlocks = new TreeSet<Block>();
  83.   /* Variables for maintaning number of blocks scheduled to be written to
  84.    * this datanode. This count is approximate and might be slightly higger
  85.    * in case of errors (e.g. datanode does not report if an error occurs 
  86.    * while writing the block).
  87.    */
  88.   private int currApproxBlocksScheduled = 0;
  89.   private int prevApproxBlocksScheduled = 0;
  90.   private long lastBlocksScheduledRollTime = 0;
  91.   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
  92.   
  93.   /** Default constructor */
  94.   public DatanodeDescriptor() {}
  95.   
  96.   /** DatanodeDescriptor constructor
  97.    * @param nodeID id of the data node
  98.    */
  99.   public DatanodeDescriptor(DatanodeID nodeID) {
  100.     this(nodeID, 0L, 0L, 0L, 0);
  101.   }
  102.   /** DatanodeDescriptor constructor
  103.    * 
  104.    * @param nodeID id of the data node
  105.    * @param networkLocation location of the data node in network
  106.    */
  107.   public DatanodeDescriptor(DatanodeID nodeID, 
  108.                             String networkLocation) {
  109.     this(nodeID, networkLocation, null);
  110.   }
  111.   
  112.   /** DatanodeDescriptor constructor
  113.    * 
  114.    * @param nodeID id of the data node
  115.    * @param networkLocation location of the data node in network
  116.    * @param hostName it could be different from host specified for DatanodeID
  117.    */
  118.   public DatanodeDescriptor(DatanodeID nodeID, 
  119.                             String networkLocation,
  120.                             String hostName) {
  121.     this(nodeID, networkLocation, hostName, 0L, 0L, 0L, 0);
  122.   }
  123.   
  124.   /** DatanodeDescriptor constructor
  125.    * 
  126.    * @param nodeID id of the data node
  127.    * @param capacity capacity of the data node
  128.    * @param dfsUsed space used by the data node
  129.    * @param remaining remaing capacity of the data node
  130.    * @param xceiverCount # of data transfers at the data node
  131.    */
  132.   public DatanodeDescriptor(DatanodeID nodeID, 
  133.                             long capacity,
  134.                             long dfsUsed,
  135.                             long remaining,
  136.                             int xceiverCount) {
  137.     super(nodeID);
  138.     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
  139.   }
  140.   /** DatanodeDescriptor constructor
  141.    * 
  142.    * @param nodeID id of the data node
  143.    * @param networkLocation location of the data node in network
  144.    * @param capacity capacity of the data node, including space used by non-dfs
  145.    * @param dfsUsed the used space by dfs datanode
  146.    * @param remaining remaing capacity of the data node
  147.    * @param xceiverCount # of data transfers at the data node
  148.    */
  149.   public DatanodeDescriptor(DatanodeID nodeID,
  150.                             String networkLocation,
  151.                             String hostName,
  152.                             long capacity,
  153.                             long dfsUsed,
  154.                             long remaining,
  155.                             int xceiverCount) {
  156.     super(nodeID, networkLocation, hostName);
  157.     updateHeartbeat(capacity, dfsUsed, remaining, xceiverCount);
  158.   }
  159.   /**
  160.    * Add data-node to the block.
  161.    * Add block to the head of the list of blocks belonging to the data-node.
  162.    */
  163.   boolean addBlock(BlockInfo b) {
  164.     if(!b.addNode(this))
  165.       return false;
  166.     // add to the head of the data-node list
  167.     blockList = b.listInsert(blockList, this);
  168.     return true;
  169.   }
  170.   
  171.   /**
  172.    * Remove block from the list of blocks belonging to the data-node.
  173.    * Remove data-node from the block.
  174.    */
  175.   boolean removeBlock(BlockInfo b) {
  176.     blockList = b.listRemove(blockList, this);
  177.     return b.removeNode(this);
  178.   }
  179.   /**
  180.    * Move block to the head of the list of blocks belonging to the data-node.
  181.    */
  182.   void moveBlockToHead(BlockInfo b) {
  183.     blockList = b.listRemove(blockList, this);
  184.     blockList = b.listInsert(blockList, this);
  185.   }
  186.   void resetBlocks() {
  187.     this.capacity = 0;
  188.     this.remaining = 0;
  189.     this.dfsUsed = 0;
  190.     this.xceiverCount = 0;
  191.     this.blockList = null;
  192.     this.invalidateBlocks.clear();
  193.   }
  194.   public int numBlocks() {
  195.     return blockList == null ? 0 : blockList.listCount(this);
  196.   }
  197.   /**
  198.    */
  199.   void updateHeartbeat(long capacity, long dfsUsed, long remaining,
  200.       int xceiverCount) {
  201.     this.capacity = capacity;
  202.     this.dfsUsed = dfsUsed;
  203.     this.remaining = remaining;
  204.     this.lastUpdate = System.currentTimeMillis();
  205.     this.xceiverCount = xceiverCount;
  206.     rollBlocksScheduled(lastUpdate);
  207.   }
  208.   /**
  209.    * Iterates over the list of blocks belonging to the data-node.
  210.    */
  211.   static private class BlockIterator implements Iterator<Block> {
  212.     private BlockInfo current;
  213.     private DatanodeDescriptor node;
  214.       
  215.     BlockIterator(BlockInfo head, DatanodeDescriptor dn) {
  216.       this.current = head;
  217.       this.node = dn;
  218.     }
  219.     public boolean hasNext() {
  220.       return current != null;
  221.     }
  222.     public BlockInfo next() {
  223.       BlockInfo res = current;
  224.       current = current.getNext(current.findDatanode(node));
  225.       return res;
  226.     }
  227.     public void remove()  {
  228.       throw new UnsupportedOperationException("Sorry. can't remove.");
  229.     }
  230.   }
  231.   Iterator<Block> getBlockIterator() {
  232.     return new BlockIterator(this.blockList, this);
  233.   }
  234.   
  235.   /**
  236.    * Store block replication work.
  237.    */
  238.   void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
  239.     assert(block != null && targets != null && targets.length > 0);
  240.     replicateBlocks.offer(block, targets);
  241.   }
  242.   /**
  243.    * Store block recovery work.
  244.    */
  245.   void addBlockToBeRecovered(Block block, DatanodeDescriptor[] targets) {
  246.     assert(block != null && targets != null && targets.length > 0);
  247.     recoverBlocks.offer(block, targets);
  248.   }
  249.   /**
  250.    * Store block invalidation work.
  251.    */
  252.   void addBlocksToBeInvalidated(List<Block> blocklist) {
  253.     assert(blocklist != null && blocklist.size() > 0);
  254.     synchronized (invalidateBlocks) {
  255.       for(Block blk : blocklist) {
  256.         invalidateBlocks.add(blk);
  257.       }
  258.     }
  259.   }
  260.   /**
  261.    * The number of work items that are pending to be replicated
  262.    */
  263.   int getNumberOfBlocksToBeReplicated() {
  264.     return replicateBlocks.size();
  265.   }
  266.   /**
  267.    * The number of block invalidation items that are pending to 
  268.    * be sent to the datanode
  269.    */
  270.   int getNumberOfBlocksToBeInvalidated() {
  271.     synchronized (invalidateBlocks) {
  272.       return invalidateBlocks.size();
  273.     }
  274.   }
  275.   
  276.   BlockCommand getReplicationCommand(int maxTransfers) {
  277.     List<BlockTargetPair> blocktargetlist = replicateBlocks.poll(maxTransfers);
  278.     return blocktargetlist == null? null:
  279.         new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blocktargetlist);
  280.   }
  281.   BlockCommand getLeaseRecoveryCommand(int maxTransfers) {
  282.     List<BlockTargetPair> blocktargetlist = recoverBlocks.poll(maxTransfers);
  283.     return blocktargetlist == null? null:
  284.         new BlockCommand(DatanodeProtocol.DNA_RECOVERBLOCK, blocktargetlist);
  285.   }
  286.   /**
  287.    * Remove the specified number of blocks to be invalidated
  288.    */
  289.   BlockCommand getInvalidateBlocks(int maxblocks) {
  290.     Block[] deleteList = getBlockArray(invalidateBlocks, maxblocks); 
  291.     return deleteList == null? 
  292.         null: new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, deleteList);
  293.   }
  294.   static private Block[] getBlockArray(Collection<Block> blocks, int max) {
  295.     Block[] blockarray = null;
  296.     synchronized(blocks) {
  297.       int available = blocks.size();
  298.       int n = available;
  299.       if (max > 0 && n > 0) {
  300.         if (max < n) {
  301.           n = max;
  302.         }
  303.         // allocate the properly sized block array ... 
  304.         blockarray = new Block[n];
  305.         // iterate tree collecting n blocks... 
  306.         Iterator<Block> e = blocks.iterator();
  307.         int blockCount = 0;
  308.         while (blockCount < n && e.hasNext()) {
  309.           // insert into array ... 
  310.           blockarray[blockCount++] = e.next();
  311.           // remove from tree via iterator, if we are removing 
  312.           // less than total available blocks
  313.           if (n < available){
  314.             e.remove();
  315.           }
  316.         }
  317.         assert(blockarray.length == n);
  318.         
  319.         // now if the number of blocks removed equals available blocks,
  320.         // them remove all blocks in one fell swoop via clear
  321.         if (n == available) { 
  322.           blocks.clear();
  323.         }
  324.       }
  325.     }
  326.     return blockarray;
  327.   }
  328.   void reportDiff(BlocksMap blocksMap,
  329.                   BlockListAsLongs newReport,
  330.                   Collection<Block> toAdd,
  331.                   Collection<Block> toRemove,
  332.                   Collection<Block> toInvalidate) {
  333.     // place a deilimiter in the list which separates blocks 
  334.     // that have been reported from those that have not
  335.     BlockInfo delimiter = new BlockInfo(new Block(), 1);
  336.     boolean added = this.addBlock(delimiter);
  337.     assert added : "Delimiting block cannot be present in the node";
  338.     if(newReport == null)
  339.       newReport = new BlockListAsLongs( new long[0]);
  340.     // scan the report and collect newly reported blocks
  341.     // Note we are taking special precaution to limit tmp blocks allocated
  342.     // as part this block report - which why block list is stored as longs
  343.     Block iblk = new Block(); // a fixed new'ed block to be reused with index i
  344.     for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
  345.       iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i), 
  346.                newReport.getBlockGenStamp(i));
  347.       BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
  348.       if(storedBlock == null) {
  349.         // If block is not in blocksMap it does not belong to any file
  350.         toInvalidate.add(new Block(iblk));
  351.         continue;
  352.       }
  353.       if(storedBlock.findDatanode(this) < 0) {// Known block, but not on the DN
  354.         // if the size differs from what is in the blockmap, then return
  355.         // the new block. addStoredBlock will then pick up the right size of this
  356.         // block and will update the block object in the BlocksMap
  357.         if (storedBlock.getNumBytes() != iblk.getNumBytes()) {
  358.           toAdd.add(new Block(iblk));
  359.         } else {
  360.           toAdd.add(storedBlock);
  361.         }
  362.         continue;
  363.       }
  364.       // move block to the head of the list
  365.       this.moveBlockToHead(storedBlock);
  366.     }
  367.     // collect blocks that have not been reported
  368.     // all of them are next to the delimiter
  369.     Iterator<Block> it = new BlockIterator(delimiter.getNext(0), this);
  370.     while(it.hasNext())
  371.       toRemove.add(it.next());
  372.     this.removeBlock(delimiter);
  373.   }
  374.   /** Serialization for FSEditLog */
  375.   void readFieldsFromFSEditLog(DataInput in) throws IOException {
  376.     this.name = UTF8.readString(in);
  377.     this.storageID = UTF8.readString(in);
  378.     this.infoPort = in.readShort() & 0x0000ffff;
  379.     this.capacity = in.readLong();
  380.     this.dfsUsed = in.readLong();
  381.     this.remaining = in.readLong();
  382.     this.lastUpdate = in.readLong();
  383.     this.xceiverCount = in.readInt();
  384.     this.location = Text.readString(in);
  385.     this.hostName = Text.readString(in);
  386.     setAdminState(WritableUtils.readEnum(in, AdminStates.class));
  387.   }
  388.   
  389.   /**
  390.    * @return Approximate number of blocks currently scheduled to be written 
  391.    * to this datanode.
  392.    */
  393.   public int getBlocksScheduled() {
  394.     return currApproxBlocksScheduled + prevApproxBlocksScheduled;
  395.   }
  396.   
  397.   /**
  398.    * Increments counter for number of blocks scheduled. 
  399.    */
  400.   void incBlocksScheduled() {
  401.     currApproxBlocksScheduled++;
  402.   }
  403.   
  404.   /**
  405.    * Decrements counter for number of blocks scheduled.
  406.    */
  407.   void decBlocksScheduled() {
  408.     if (prevApproxBlocksScheduled > 0) {
  409.       prevApproxBlocksScheduled--;
  410.     } else if (currApproxBlocksScheduled > 0) {
  411.       currApproxBlocksScheduled--;
  412.     } 
  413.     // its ok if both counters are zero.
  414.   }
  415.   
  416.   /**
  417.    * Adjusts curr and prev number of blocks scheduled every few minutes.
  418.    */
  419.   private void rollBlocksScheduled(long now) {
  420.     if ((now - lastBlocksScheduledRollTime) > 
  421.         BLOCKS_SCHEDULED_ROLL_INTERVAL) {
  422.       prevApproxBlocksScheduled = currApproxBlocksScheduled;
  423.       currApproxBlocksScheduled = 0;
  424.       lastBlocksScheduledRollTime = now;
  425.     }
  426.   }
  427. }