LocalDirAllocator.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:16k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.*;
  20. import java.util.*;
  21. import org.apache.commons.logging.*;
  22. import org.apache.hadoop.util.*;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  26. import org.apache.hadoop.conf.Configuration; 
  27. /** An implementation of a round-robin scheme for disk allocation for creating
  28.  * files. The way it works is that it is kept track what disk was last
  29.  * allocated for a file write. For the current request, the next disk from
  30.  * the set of disks would be allocated if the free space on the disk is 
  31.  * sufficient enough to accomodate the file that is being considered for
  32.  * creation. If the space requirements cannot be met, the next disk in order
  33.  * would be tried and so on till a disk is found with sufficient capacity.
  34.  * Once a disk with sufficient space is identified, a check is done to make
  35.  * sure that the disk is writable. Also, there is an API provided that doesn't
  36.  * take the space requirements into consideration but just checks whether the
  37.  * disk under consideration is writable (this should be used for cases where
  38.  * the file size is not known apriori). An API is provided to read a path that
  39.  * was created earlier. That API works by doing a scan of all the disks for the
  40.  * input pathname.
  41.  * This implementation also provides the functionality of having multiple 
  42.  * allocators per JVM (one for each unique functionality or context, like 
  43.  * mapred, dfs-client, etc.). It ensures that there is only one instance of
  44.  * an allocator per context per JVM.
  45.  * Note:
  46.  * 1. The contexts referred above are actually the configuration items defined
  47.  * in the Configuration class like "mapred.local.dir" (for which we want to 
  48.  * control the dir allocations). The context-strings are exactly those 
  49.  * configuration items.
  50.  * 2. This implementation does not take into consideration cases where
  51.  * a disk becomes read-only or goes out of space while a file is being written
  52.  * to (disks are shared between multiple processes, and so the latter situation
  53.  * is probable).
  54.  * 3. In the class implementation, "Disk" is referred to as "Dir", which
  55.  * actually points to the configured directory on the Disk which will be the
  56.  * parent for all file write/read allocations.
  57.  */
  58. public class LocalDirAllocator {
  59.   
  60.   //A Map from the config item names like "mapred.local.dir", 
  61.   //"dfs.client.buffer.dir" to the instance of the AllocatorPerContext. This
  62.   //is a static object to make sure there exists exactly one instance per JVM
  63.   private static Map <String, AllocatorPerContext> contexts = 
  64.                  new TreeMap<String, AllocatorPerContext>();
  65.   private String contextCfgItemName;
  66.   /**Create an allocator object
  67.    * @param contextCfgItemName
  68.    */
  69.   public LocalDirAllocator(String contextCfgItemName) {
  70.     this.contextCfgItemName = contextCfgItemName;
  71.   }
  72.   
  73.   /** This method must be used to obtain the dir allocation context for a 
  74.    * particular value of the context name. The context name must be an item
  75.    * defined in the Configuration object for which we want to control the 
  76.    * dir allocations (e.g., <code>mapred.local.dir</code>). The method will
  77.    * create a context for that name if it doesn't already exist.
  78.    */
  79.   private AllocatorPerContext obtainContext(String contextCfgItemName) {
  80.     synchronized (contexts) {
  81.       AllocatorPerContext l = contexts.get(contextCfgItemName);
  82.       if (l == null) {
  83.         contexts.put(contextCfgItemName, 
  84.                     (l = new AllocatorPerContext(contextCfgItemName)));
  85.       }
  86.       return l;
  87.     }
  88.   }
  89.   
  90.   /** Get a path from the local FS. This method should be used if the size of 
  91.    *  the file is not known apriori. We go round-robin over the set of disks
  92.    *  (via the configured dirs) and return the first complete path where
  93.    *  we could create the parent directory of the passed path. 
  94.    *  @param pathStr the requested path (this will be created on the first 
  95.    *  available disk)
  96.    *  @param conf the Configuration object
  97.    *  @return the complete path to the file on a local disk
  98.    *  @throws IOException
  99.    */
  100.   public Path getLocalPathForWrite(String pathStr, 
  101.       Configuration conf) throws IOException {
  102.     return getLocalPathForWrite(pathStr, -1, conf);
  103.   }
  104.   
  105.   /** Get a path from the local FS. Pass size as -1 if not known apriori. We
  106.    *  round-robin over the set of disks (via the configured dirs) and return
  107.    *  the first complete path which has enough space 
  108.    *  @param pathStr the requested path (this will be created on the first 
  109.    *  available disk)
  110.    *  @param size the size of the file that is going to be written
  111.    *  @param conf the Configuration object
  112.    *  @return the complete path to the file on a local disk
  113.    *  @throws IOException
  114.    */
  115.   public Path getLocalPathForWrite(String pathStr, long size, 
  116.       Configuration conf) throws IOException {
  117.     AllocatorPerContext context = obtainContext(contextCfgItemName);
  118.     return context.getLocalPathForWrite(pathStr, size, conf);
  119.   }
  120.   
  121.   /** Get a path from the local FS for reading. We search through all the
  122.    *  configured dirs for the file's existence and return the complete
  123.    *  path to the file when we find one 
  124.    *  @param pathStr the requested file (this will be searched)
  125.    *  @param conf the Configuration object
  126.    *  @return the complete path to the file on a local disk
  127.    *  @throws IOException
  128.    */
  129.   public Path getLocalPathToRead(String pathStr, 
  130.       Configuration conf) throws IOException {
  131.     AllocatorPerContext context = obtainContext(contextCfgItemName);
  132.     return context.getLocalPathToRead(pathStr, conf);
  133.   }
  134.   /** Creates a temporary file in the local FS. Pass size as -1 if not known 
  135.    *  apriori. We round-robin over the set of disks (via the configured dirs) 
  136.    *  and select the first complete path which has enough space. A file is
  137.    *  created on this directory. The file is guaranteed to go away when the
  138.    *  JVM exits.
  139.    *  @param pathStr prefix for the temporary file
  140.    *  @param size the size of the file that is going to be written
  141.    *  @param conf the Configuration object
  142.    *  @return a unique temporary file
  143.    *  @throws IOException
  144.    */
  145.   public File createTmpFileForWrite(String pathStr, long size, 
  146.       Configuration conf) throws IOException {
  147.     AllocatorPerContext context = obtainContext(contextCfgItemName);
  148.     return context.createTmpFileForWrite(pathStr, size, conf);
  149.   }
  150.   
  151.   /** Method to check whether a context is valid
  152.    * @param contextCfgItemName
  153.    * @return true/false
  154.    */
  155.   public static boolean isContextValid(String contextCfgItemName) {
  156.     synchronized (contexts) {
  157.       return contexts.containsKey(contextCfgItemName);
  158.     }
  159.   }
  160.     
  161.   /** We search through all the configured dirs for the file's existence
  162.    *  and return true when we find  
  163.    *  @param pathStr the requested file (this will be searched)
  164.    *  @param conf the Configuration object
  165.    *  @return true if files exist. false otherwise
  166.    *  @throws IOException
  167.    */
  168.   public boolean ifExists(String pathStr,Configuration conf) {
  169.     AllocatorPerContext context = obtainContext(contextCfgItemName);
  170.     return context.ifExists(pathStr, conf);
  171.   }
  172.   /**
  173.    * Get the current directory index for the given configuration item.
  174.    * @return the current directory index for the given configuration item.
  175.    */
  176.   int getCurrentDirectoryIndex() {
  177.     AllocatorPerContext context = obtainContext(contextCfgItemName);
  178.     return context.getCurrentDirectoryIndex();
  179.   }
  180.   
  181.   private static class AllocatorPerContext {
  182.     private final Log LOG =
  183.       LogFactory.getLog(AllocatorPerContext.class);
  184.     private int dirNumLastAccessed;
  185.     private Random dirIndexRandomizer = new Random();
  186.     private FileSystem localFS;
  187.     private DF[] dirDF;
  188.     private String contextCfgItemName;
  189.     private String[] localDirs;
  190.     private String savedLocalDirs = "";
  191.     public AllocatorPerContext(String contextCfgItemName) {
  192.       this.contextCfgItemName = contextCfgItemName;
  193.     }
  194.     /** This method gets called everytime before any read/write to make sure
  195.      * that any change to localDirs is reflected immediately.
  196.      */
  197.     private void confChanged(Configuration conf) throws IOException {
  198.       String newLocalDirs = conf.get(contextCfgItemName);
  199.       if (!newLocalDirs.equals(savedLocalDirs)) {
  200.         localDirs = conf.getStrings(contextCfgItemName);
  201.         localFS = FileSystem.getLocal(conf);
  202.         int numDirs = localDirs.length;
  203.         ArrayList<String> dirs = new ArrayList<String>(numDirs);
  204.         ArrayList<DF> dfList = new ArrayList<DF>(numDirs);
  205.         for (int i = 0; i < numDirs; i++) {
  206.           try {
  207.             // filter problematic directories
  208.             Path tmpDir = new Path(localDirs[i]);
  209.             if(localFS.mkdirs(tmpDir)|| localFS.exists(tmpDir)) {
  210.               try {
  211.                 DiskChecker.checkDir(new File(localDirs[i]));
  212.                 dirs.add(localDirs[i]);
  213.                 dfList.add(new DF(new File(localDirs[i]), 30000));
  214.               } catch (DiskErrorException de) {
  215.                 LOG.warn( localDirs[i] + "is not writablen" +
  216.                     StringUtils.stringifyException(de));
  217.               }
  218.             } else {
  219.               LOG.warn( "Failed to create " + localDirs[i]);
  220.             }
  221.           } catch (IOException ie) { 
  222.             LOG.warn( "Failed to create " + localDirs[i] + ": " +
  223.                 ie.getMessage() + "n" + StringUtils.stringifyException(ie));
  224.           } //ignore
  225.         }
  226.         localDirs = dirs.toArray(new String[dirs.size()]);
  227.         dirDF = dfList.toArray(new DF[dirs.size()]);
  228.         savedLocalDirs = newLocalDirs;
  229.         
  230.         // randomize the first disk picked in the round-robin selection 
  231.         dirNumLastAccessed = dirIndexRandomizer.nextInt(dirs.size());
  232.       }
  233.     }
  234.     private Path createPath(String path) throws IOException {
  235.       Path file = new Path(new Path(localDirs[dirNumLastAccessed]),
  236.                                     path);
  237.       //check whether we are able to create a directory here. If the disk
  238.       //happens to be RDONLY we will fail
  239.       try {
  240.         DiskChecker.checkDir(new File(file.getParent().toUri().getPath()));
  241.         return file;
  242.       } catch (DiskErrorException d) {
  243.         LOG.warn(StringUtils.stringifyException(d));
  244.         return null;
  245.       }
  246.     }
  247.     /**
  248.      * Get the current directory index.
  249.      * @return the current directory index.
  250.      */
  251.     int getCurrentDirectoryIndex() {
  252.       return dirNumLastAccessed;
  253.     }
  254.     
  255.     /** Get a path from the local FS. This method should be used if the size of 
  256.      *  the file is not known a priori. 
  257.      *  
  258.      *  It will use roulette selection, picking directories
  259.      *  with probability proportional to their available space. 
  260.      */
  261.     public synchronized Path getLocalPathForWrite(String path, 
  262.         Configuration conf) throws IOException {
  263.       return getLocalPathForWrite(path, -1, conf);
  264.     }
  265.     /** Get a path from the local FS. If size is known, we go
  266.      *  round-robin over the set of disks (via the configured dirs) and return
  267.      *  the first complete path which has enough space.
  268.      *  
  269.      *  If size is not known, use roulette selection -- pick directories
  270.      *  with probability proportional to their available space.
  271.      */
  272.     public synchronized Path getLocalPathForWrite(String pathStr, long size, 
  273.         Configuration conf) throws IOException {
  274.       confChanged(conf);
  275.       int numDirs = localDirs.length;
  276.       int numDirsSearched = 0;
  277.       //remove the leading slash from the path (to make sure that the uri
  278.       //resolution results in a valid path on the dir being checked)
  279.       if (pathStr.startsWith("/")) {
  280.         pathStr = pathStr.substring(1);
  281.       }
  282.       Path returnPath = null;
  283.       
  284.       if(size == -1) {  //do roulette selection: pick dir with probability 
  285.                     //proportional to available size
  286.         long[] availableOnDisk = new long[dirDF.length];
  287.         long totalAvailable = 0;
  288.         
  289.             //build the "roulette wheel"
  290.         for(int i =0; i < dirDF.length; ++i) {
  291.           availableOnDisk[i] = dirDF[i].getAvailable();
  292.           totalAvailable += availableOnDisk[i];
  293.         }
  294.             // "roll the ball" -- pick a directory
  295.         Random r = new java.util.Random();
  296.         long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
  297.         int dir=0;
  298.         while(randomPosition > availableOnDisk[dir]) {
  299.           randomPosition -= availableOnDisk[dir];
  300.           dir++;
  301.         }
  302.         dirNumLastAccessed = dir;
  303.         returnPath = createPath(pathStr);
  304.       } else {
  305.         while (numDirsSearched < numDirs && returnPath == null) {
  306.           long capacity = dirDF[dirNumLastAccessed].getAvailable();
  307.           if (capacity > size) {
  308.             returnPath = createPath(pathStr);
  309.           }
  310.           dirNumLastAccessed++;
  311.           dirNumLastAccessed = dirNumLastAccessed % numDirs; 
  312.           numDirsSearched++;
  313.         } 
  314.       }
  315.       if (returnPath != null) {
  316.         return returnPath;
  317.       }
  318.       
  319.       //no path found
  320.       throw new DiskErrorException("Could not find any valid local " +
  321.           "directory for " + pathStr);
  322.     }
  323.     /** Creates a file on the local FS. Pass size as -1 if not known apriori. We
  324.      *  round-robin over the set of disks (via the configured dirs) and return
  325.      *  a file on the first path which has enough space. The file is guaranteed
  326.      *  to go away when the JVM exits.
  327.      */
  328.     public File createTmpFileForWrite(String pathStr, long size, 
  329.         Configuration conf) throws IOException {
  330.       // find an appropriate directory
  331.       Path path = getLocalPathForWrite(pathStr, size, conf);
  332.       File dir = new File(path.getParent().toUri().getPath());
  333.       String prefix = path.getName();
  334.       // create a temp file on this directory
  335.       File result = File.createTempFile(prefix, null, dir);
  336.       result.deleteOnExit();
  337.       return result;
  338.     }
  339.     /** Get a path from the local FS for reading. We search through all the
  340.      *  configured dirs for the file's existence and return the complete
  341.      *  path to the file when we find one 
  342.      */
  343.     public synchronized Path getLocalPathToRead(String pathStr, 
  344.         Configuration conf) throws IOException {
  345.       confChanged(conf);
  346.       int numDirs = localDirs.length;
  347.       int numDirsSearched = 0;
  348.       //remove the leading slash from the path (to make sure that the uri
  349.       //resolution results in a valid path on the dir being checked)
  350.       if (pathStr.startsWith("/")) {
  351.         pathStr = pathStr.substring(1);
  352.       }
  353.       while (numDirsSearched < numDirs) {
  354.         Path file = new Path(localDirs[numDirsSearched], pathStr);
  355.         if (localFS.exists(file)) {
  356.           return file;
  357.         }
  358.         numDirsSearched++;
  359.       }
  360.       //no path found
  361.       throw new DiskErrorException ("Could not find " + pathStr +" in any of" +
  362.       " the configured local directories");
  363.     }
  364.     /** We search through all the configured dirs for the file's existence
  365.      *  and return true when we find one 
  366.      */
  367.     public synchronized boolean ifExists(String pathStr,Configuration conf) {
  368.       try {
  369.         int numDirs = localDirs.length;
  370.         int numDirsSearched = 0;
  371.         //remove the leading slash from the path (to make sure that the uri
  372.         //resolution results in a valid path on the dir being checked)
  373.         if (pathStr.startsWith("/")) {
  374.           pathStr = pathStr.substring(1);
  375.         }
  376.         while (numDirsSearched < numDirs) {
  377.           Path file = new Path(localDirs[numDirsSearched], pathStr);
  378.           if (localFS.exists(file)) {
  379.             return true;
  380.           }
  381.           numDirsSearched++;
  382.         }
  383.       } catch (IOException e) {
  384.         // IGNORE and try again
  385.       }
  386.       return false;
  387.     }
  388.   }
  389. }