LeaseManager.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.SortedMap;
  25. import java.util.SortedSet;
  26. import java.util.TreeMap;
  27. import java.util.TreeSet;
  28. import org.apache.commons.logging.Log;
  29. import org.apache.commons.logging.LogFactory;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.hdfs.protocol.FSConstants;
  32. /**
  33.  * LeaseManager does the lease housekeeping for writing on files.   
  34.  * This class also provides useful static methods for lease recovery.
  35.  * 
  36.  * Lease Recovery Algorithm
  37.  * 1) Namenode retrieves lease information
  38.  * 2) For each file f in the lease, consider the last block b of f
  39.  * 2.1) Get the datanodes which contains b
  40.  * 2.2) Assign one of the datanodes as the primary datanode p
  41.  * 2.3) p obtains a new generation stamp form the namenode
  42.  * 2.4) p get the block info from each datanode
  43.  * 2.5) p computes the minimum block length
  44.  * 2.6) p updates the datanodes, which have a valid generation stamp,
  45.  *      with the new generation stamp and the minimum block length 
  46.  * 2.7) p acknowledges the namenode the update results
  47.  * 2.8) Namenode updates the BlockInfo
  48.  * 2.9) Namenode removes f from the lease
  49.  *      and removes the lease once all files have been removed
  50.  * 2.10) Namenode commit changes to edit log
  51.  */
  52. public class LeaseManager {
  53.   public static final Log LOG = LogFactory.getLog(LeaseManager.class);
  54.   private final FSNamesystem fsnamesystem;
  55.   private long softLimit = FSConstants.LEASE_SOFTLIMIT_PERIOD;
  56.   private long hardLimit = FSConstants.LEASE_HARDLIMIT_PERIOD;
  57.   //
  58.   // Used for handling lock-leases
  59.   // Mapping: leaseHolder -> Lease
  60.   //
  61.   private SortedMap<String, Lease> leases = new TreeMap<String, Lease>();
  62.   // Set of: Lease
  63.   private SortedSet<Lease> sortedLeases = new TreeSet<Lease>();
  64.   // 
  65.   // Map path names to leases. It is protected by the sortedLeases lock.
  66.   // The map stores pathnames in lexicographical order.
  67.   //
  68.   private SortedMap<String, Lease> sortedLeasesByPath = new TreeMap<String, Lease>();
  69.   LeaseManager(FSNamesystem fsnamesystem) {this.fsnamesystem = fsnamesystem;}
  70.   Lease getLease(String holder) {
  71.     return leases.get(holder);
  72.   }
  73.   
  74.   SortedSet<Lease> getSortedLeases() {return sortedLeases;}
  75.   /** @return the lease containing src */
  76.   public Lease getLeaseByPath(String src) {return sortedLeasesByPath.get(src);}
  77.   /** @return the number of leases currently in the system */
  78.   public synchronized int countLease() {return sortedLeases.size();}
  79.   /** @return the number of paths contained in all leases */
  80.   synchronized int countPath() {
  81.     int count = 0;
  82.     for(Lease lease : sortedLeases) {
  83.       count += lease.getPaths().size();
  84.     }
  85.     return count;
  86.   }
  87.   
  88.   /**
  89.    * Adds (or re-adds) the lease for the specified file.
  90.    */
  91.   synchronized void addLease(String holder, String src) {
  92.     Lease lease = getLease(holder);
  93.     if (lease == null) {
  94.       lease = new Lease(holder);
  95.       leases.put(holder, lease);
  96.       sortedLeases.add(lease);
  97.     } else {
  98.       renewLease(lease);
  99.     }
  100.     sortedLeasesByPath.put(src, lease);
  101.     lease.paths.add(src);
  102.   }
  103.   /**
  104.    * Remove the specified lease and src.
  105.    */
  106.   synchronized void removeLease(Lease lease, String src) {
  107.     sortedLeasesByPath.remove(src);
  108.     if (!lease.removePath(src)) {
  109.       LOG.error(src + " not found in lease.paths (=" + lease.paths + ")");
  110.     }
  111.     if (!lease.hasPath()) {
  112.       leases.remove(lease.holder);
  113.       if (!sortedLeases.remove(lease)) {
  114.         LOG.error(lease + " not found in sortedLeases");
  115.       }
  116.     }
  117.   }
  118.   /**
  119.    * Remove the lease for the specified holder and src
  120.    */
  121.   synchronized void removeLease(String holder, String src) {
  122.     Lease lease = getLease(holder);
  123.     if (lease != null) {
  124.       removeLease(lease, src);
  125.     }
  126.   }
  127.   /**
  128.    * Finds the pathname for the specified pendingFile
  129.    */
  130.   synchronized String findPath(INodeFileUnderConstruction pendingFile
  131.       ) throws IOException {
  132.     Lease lease = getLease(pendingFile.clientName);
  133.     if (lease != null) {
  134.       String src = lease.findPath(pendingFile);
  135.       if (src != null) {
  136.         return src;
  137.       }
  138.     }
  139.     throw new IOException("pendingFile (=" + pendingFile + ") not found."
  140.         + "(lease=" + lease + ")");
  141.   }
  142.   /**
  143.    * Renew the lease(s) held by the given client
  144.    */
  145.   synchronized void renewLease(String holder) {
  146.     renewLease(getLease(holder));
  147.   }
  148.   synchronized void renewLease(Lease lease) {
  149.     if (lease != null) {
  150.       sortedLeases.remove(lease);
  151.       lease.renew();
  152.       sortedLeases.add(lease);
  153.     }
  154.   }
  155.   /************************************************************
  156.    * A Lease governs all the locks held by a single client.
  157.    * For each client there's a corresponding lease, whose
  158.    * timestamp is updated when the client periodically
  159.    * checks in.  If the client dies and allows its lease to
  160.    * expire, all the corresponding locks can be released.
  161.    *************************************************************/
  162.   class Lease implements Comparable<Lease> {
  163.     private final String holder;
  164.     private long lastUpdate;
  165.     private final Collection<String> paths = new TreeSet<String>();
  166.   
  167.     /** Only LeaseManager object can create a lease */
  168.     private Lease(String holder) {
  169.       this.holder = holder;
  170.       renew();
  171.     }
  172.     /** Only LeaseManager object can renew a lease */
  173.     private void renew() {
  174.       this.lastUpdate = FSNamesystem.now();
  175.     }
  176.     /** @return true if the Hard Limit Timer has expired */
  177.     public boolean expiredHardLimit() {
  178.       return FSNamesystem.now() - lastUpdate > hardLimit;
  179.     }
  180.     /** @return true if the Soft Limit Timer has expired */
  181.     public boolean expiredSoftLimit() {
  182.       return FSNamesystem.now() - lastUpdate > softLimit;
  183.     }
  184.     /**
  185.      * @return the path associated with the pendingFile and null if not found.
  186.      */
  187.     private String findPath(INodeFileUnderConstruction pendingFile) {
  188.       for(String src : paths) {
  189.         if (fsnamesystem.dir.getFileINode(src) == pendingFile) {
  190.           return src;
  191.         }
  192.       }
  193.       return null;
  194.     }
  195.     /** Does this lease contain any path? */
  196.     boolean hasPath() {return !paths.isEmpty();}
  197.     boolean removePath(String src) {
  198.       return paths.remove(src);
  199.     }
  200.     /** {@inheritDoc} */
  201.     public String toString() {
  202.       return "[Lease.  Holder: " + holder
  203.           + ", pendingcreates: " + paths.size() + "]";
  204.     }
  205.   
  206.     /** {@inheritDoc} */
  207.     public int compareTo(Lease o) {
  208.       Lease l1 = this;
  209.       Lease l2 = o;
  210.       long lu1 = l1.lastUpdate;
  211.       long lu2 = l2.lastUpdate;
  212.       if (lu1 < lu2) {
  213.         return -1;
  214.       } else if (lu1 > lu2) {
  215.         return 1;
  216.       } else {
  217.         return l1.holder.compareTo(l2.holder);
  218.       }
  219.     }
  220.   
  221.     /** {@inheritDoc} */
  222.     public boolean equals(Object o) {
  223.       if (!(o instanceof Lease)) {
  224.         return false;
  225.       }
  226.       Lease obj = (Lease) o;
  227.       if (lastUpdate == obj.lastUpdate &&
  228.           holder.equals(obj.holder)) {
  229.         return true;
  230.       }
  231.       return false;
  232.     }
  233.   
  234.     /** {@inheritDoc} */
  235.     public int hashCode() {
  236.       return holder.hashCode();
  237.     }
  238.     
  239.     Collection<String> getPaths() {
  240.       return paths;
  241.     }
  242.     
  243.     void replacePath(String oldpath, String newpath) {
  244.       paths.remove(oldpath);
  245.       paths.add(newpath);
  246.     }
  247.   }
  248.   synchronized void changeLease(String src, String dst,
  249.       String overwrite, String replaceBy) {
  250.     if (LOG.isDebugEnabled()) {
  251.       LOG.debug(getClass().getSimpleName() + ".changelease: " +
  252.                " src=" + src + ", dest=" + dst + 
  253.                ", overwrite=" + overwrite +
  254.                ", replaceBy=" + replaceBy);
  255.     }
  256.     for(Map.Entry<String, Lease> entry : findLeaseWithPrefixPath(src, sortedLeasesByPath)) {
  257.       final String oldpath = entry.getKey();
  258.       final Lease lease = entry.getValue();
  259.       final String newpath = oldpath.replaceFirst(
  260.           java.util.regex.Pattern.quote(overwrite), replaceBy);
  261.       if (LOG.isDebugEnabled()) {
  262.         LOG.debug("changeLease: replacing " + oldpath + " with " + newpath);
  263.       }
  264.       lease.replacePath(oldpath, newpath);
  265.       sortedLeasesByPath.remove(oldpath);
  266.       sortedLeasesByPath.put(newpath, lease);
  267.     }
  268.   }
  269.   synchronized void removeLeaseWithPrefixPath(String prefix) {
  270.     for(Map.Entry<String, Lease> entry : findLeaseWithPrefixPath(prefix, sortedLeasesByPath)) {
  271.       if (LOG.isDebugEnabled()) {
  272.         LOG.debug(LeaseManager.class.getSimpleName()
  273.             + ".removeLeaseWithPrefixPath: entry=" + entry);
  274.       }
  275.       removeLease(entry.getValue(), entry.getKey());    
  276.     }
  277.   }
  278.   static private List<Map.Entry<String, Lease>> findLeaseWithPrefixPath(
  279.       String prefix, SortedMap<String, Lease> path2lease) {
  280.     if (LOG.isDebugEnabled()) {
  281.       LOG.debug(LeaseManager.class.getSimpleName() + ".findLease: prefix=" + prefix);
  282.     }
  283.     List<Map.Entry<String, Lease>> entries = new ArrayList<Map.Entry<String, Lease>>();
  284.     final int srclen = prefix.length();
  285.     for(Map.Entry<String, Lease> entry : path2lease.tailMap(prefix).entrySet()) {
  286.       final String p = entry.getKey();
  287.       if (!p.startsWith(prefix)) {
  288.         return entries;
  289.       }
  290.       if (p.length() == srclen || p.charAt(srclen) == Path.SEPARATOR_CHAR) {
  291.         entries.add(entry);
  292.       }
  293.     }
  294.     return entries;
  295.   }
  296.   public void setLeasePeriod(long softLimit, long hardLimit) {
  297.     this.softLimit = softLimit;
  298.     this.hardLimit = hardLimit; 
  299.   }
  300.   
  301.   /******************************************************
  302.    * Monitor checks for leases that have expired,
  303.    * and disposes of them.
  304.    ******************************************************/
  305.   class Monitor implements Runnable {
  306.     final String name = getClass().getSimpleName();
  307.     /** Check leases periodically. */
  308.     public void run() {
  309.       for(; fsnamesystem.isRunning(); ) {
  310.         synchronized(fsnamesystem) {
  311.           checkLeases();
  312.         }
  313.         try {
  314.           Thread.sleep(2000);
  315.         } catch(InterruptedException ie) {
  316.           if (LOG.isDebugEnabled()) {
  317.             LOG.debug(name + " is interrupted", ie);
  318.           }
  319.         }
  320.       }
  321.     }
  322.   }
  323.   /** Check the leases beginning from the oldest. */
  324.   private synchronized void checkLeases() {
  325.     for(; sortedLeases.size() > 0; ) {
  326.       final Lease oldest = sortedLeases.first();
  327.       if (!oldest.expiredHardLimit()) {
  328.         return;
  329.       }
  330.       LOG.info("Lease " + oldest + " has expired hard limit");
  331.       final List<String> removing = new ArrayList<String>();
  332.       // need to create a copy of the oldest lease paths, becuase 
  333.       // internalReleaseLease() removes paths corresponding to empty files,
  334.       // i.e. it needs to modify the collection being iterated over
  335.       // causing ConcurrentModificationException
  336.       String[] leasePaths = new String[oldest.getPaths().size()];
  337.       oldest.getPaths().toArray(leasePaths);
  338.       for(String p : leasePaths) {
  339.         try {
  340.           fsnamesystem.internalReleaseLease(oldest, p);
  341.         } catch (IOException e) {
  342.           LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
  343.           removing.add(p);
  344.         }
  345.       }
  346.       for(String p : removing) {
  347.         removeLease(oldest, p);
  348.       }
  349.     }
  350.   }
  351.   /** {@inheritDoc} */
  352.   public synchronized String toString() {
  353.     return getClass().getSimpleName() + "= {"
  354.         + "n leases=" + leases
  355.         + "n sortedLeases=" + sortedLeases
  356.         + "n sortedLeasesByPath=" + sortedLeasesByPath
  357.         + "n}";
  358.   }
  359. }