PoolManager.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.util.ArrayList;
  22. import java.util.Collection;
  23. import java.util.Collections;
  24. import java.util.HashMap;
  25. import java.util.List;
  26. import java.util.Map;
  27. import javax.xml.parsers.DocumentBuilder;
  28. import javax.xml.parsers.DocumentBuilderFactory;
  29. import javax.xml.parsers.ParserConfigurationException;
  30. import org.apache.commons.logging.Log;
  31. import org.apache.commons.logging.LogFactory;
  32. import org.apache.hadoop.conf.Configuration;
  33. import org.w3c.dom.Document;
  34. import org.w3c.dom.Element;
  35. import org.w3c.dom.Node;
  36. import org.w3c.dom.NodeList;
  37. import org.w3c.dom.Text;
  38. import org.xml.sax.SAXException;
  39. /**
  40.  * Maintains a hierarchy of pools.
  41.  */
  42. public class PoolManager {
  43.   public static final Log LOG = LogFactory.getLog(
  44.     "org.apache.hadoop.mapred.PoolManager");
  45.   /** Time to wait between checks of the allocation file */
  46.   public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
  47.   
  48.   /**
  49.    * Time to wait after the allocation has been modified before reloading it
  50.    * (this is done to prevent loading a file that hasn't been fully written).
  51.    */
  52.   public static final long ALLOC_RELOAD_WAIT = 5 * 1000; 
  53.   
  54.   // Map and reduce minimum allocations for each pool
  55.   private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
  56.   private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
  57.   // Sharing weights for each pool
  58.   private Map<String, Double> poolWeights = new HashMap<String, Double>();
  59.   
  60.   // Max concurrent running jobs for each pool and for each user; in addition,
  61.   // for users that have no max specified, we use the userMaxJobsDefault.
  62.   private Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
  63.   private Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
  64.   private int userMaxJobsDefault = Integer.MAX_VALUE;
  65.   private String allocFile; // Path to XML file containing allocations
  66.   private String poolNameProperty; // Jobconf property to use for determining a
  67.                                    // job's pool name (default: mapred.job.queue.name)
  68.   
  69.   private Map<String, Pool> pools = new HashMap<String, Pool>();
  70.   
  71.   private long lastReloadAttempt; // Last time we tried to reload the pools file
  72.   private long lastSuccessfulReload; // Last time we successfully reloaded pools
  73.   private boolean lastReloadAttemptFailed = false;
  74.   public PoolManager(Configuration conf) throws IOException, SAXException,
  75.       AllocationConfigurationException, ParserConfigurationException {
  76.     this.poolNameProperty = conf.get(
  77.         "mapred.fairscheduler.poolnameproperty", "user.name");
  78.     this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
  79.     if (allocFile == null) {
  80.       LOG.warn("No mapred.fairscheduler.allocation.file given in jobconf - " +
  81.           "the fair scheduler will not use any queues.");
  82.     }
  83.     reloadAllocs();
  84.     lastSuccessfulReload = System.currentTimeMillis();
  85.     lastReloadAttempt = System.currentTimeMillis();
  86.     // Create the default pool so that it shows up in the web UI
  87.     getPool(Pool.DEFAULT_POOL_NAME);
  88.   }
  89.   
  90.   /**
  91.    * Get a pool by name, creating it if necessary
  92.    */
  93.   public synchronized Pool getPool(String name) {
  94.     Pool pool = pools.get(name);
  95.     if (pool == null) {
  96.       pool = new Pool(name);
  97.       pools.put(name, pool);
  98.     }
  99.     return pool;
  100.   }
  101.   /**
  102.    * Reload allocations file if it hasn't been loaded in a while
  103.    */
  104.   public void reloadAllocsIfNecessary() {
  105.     long time = System.currentTimeMillis();
  106.     if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
  107.       lastReloadAttempt = time;
  108.       try {
  109.         File file = new File(allocFile);
  110.         long lastModified = file.lastModified();
  111.         if (lastModified > lastSuccessfulReload &&
  112.             time > lastModified + ALLOC_RELOAD_WAIT) {
  113.           reloadAllocs();
  114.           lastSuccessfulReload = time;
  115.           lastReloadAttemptFailed = false;
  116.         }
  117.       } catch (Exception e) {
  118.         // Throwing the error further out here won't help - the RPC thread
  119.         // will catch it and report it in a loop. Instead, just log it and
  120.         // hope somebody will notice from the log.
  121.         // We log the error only on the first failure so we don't fill up the
  122.         // JobTracker's log with these messages.
  123.         if (!lastReloadAttemptFailed) {
  124.           LOG.error("Failed to reload allocations file - " +
  125.               "will use existing allocations.", e);
  126.         }
  127.         lastReloadAttemptFailed = true;
  128.       }
  129.     }
  130.   }
  131.   
  132.   /**
  133.    * Updates the allocation list from the allocation config file. This file is
  134.    * expected to be in the following whitespace-separated format:
  135.    * 
  136.    * <code>
  137.    * poolName1 mapAlloc reduceAlloc
  138.    * poolName2 mapAlloc reduceAlloc
  139.    * ...
  140.    * </code>
  141.    * 
  142.    * Blank lines and lines starting with # are ignored.
  143.    *  
  144.    * @throws IOException if the config file cannot be read.
  145.    * @throws AllocationConfigurationException if allocations are invalid.
  146.    * @throws ParserConfigurationException if XML parser is misconfigured.
  147.    * @throws SAXException if config file is malformed.
  148.    */
  149.   public void reloadAllocs() throws IOException, ParserConfigurationException, 
  150.       SAXException, AllocationConfigurationException {
  151.     if (allocFile == null) return;
  152.     // Create some temporary hashmaps to hold the new allocs, and we only save
  153.     // them in our fields if we have parsed the entire allocs file successfully.
  154.     Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
  155.     Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
  156.     Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
  157.     Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
  158.     Map<String, Double> poolWeights = new HashMap<String, Double>();
  159.     int userMaxJobsDefault = Integer.MAX_VALUE;
  160.     
  161.     // Remember all pool names so we can display them on web UI, etc.
  162.     List<String> poolNamesInAllocFile = new ArrayList<String>();
  163.     
  164.     // Read and parse the allocations file.
  165.     DocumentBuilderFactory docBuilderFactory =
  166.       DocumentBuilderFactory.newInstance();
  167.     docBuilderFactory.setIgnoringComments(true);
  168.     DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
  169.     Document doc = builder.parse(new File(allocFile));
  170.     Element root = doc.getDocumentElement();
  171.     if (!"allocations".equals(root.getTagName()))
  172.       throw new AllocationConfigurationException("Bad allocations file: " + 
  173.           "top-level element not <allocations>");
  174.     NodeList elements = root.getChildNodes();
  175.     for (int i = 0; i < elements.getLength(); i++) {
  176.       Node node = elements.item(i);
  177.       if (!(node instanceof Element))
  178.         continue;
  179.       Element element = (Element)node;
  180.       if ("pool".equals(element.getTagName())) {
  181.         String poolName = element.getAttribute("name");
  182.         poolNamesInAllocFile.add(poolName);
  183.         NodeList fields = element.getChildNodes();
  184.         for (int j = 0; j < fields.getLength(); j++) {
  185.           Node fieldNode = fields.item(j);
  186.           if (!(fieldNode instanceof Element))
  187.             continue;
  188.           Element field = (Element) fieldNode;
  189.           if ("minMaps".equals(field.getTagName())) {
  190.             String text = ((Text)field.getFirstChild()).getData().trim();
  191.             int val = Integer.parseInt(text);
  192.             mapAllocs.put(poolName, val);
  193.           } else if ("minReduces".equals(field.getTagName())) {
  194.             String text = ((Text)field.getFirstChild()).getData().trim();
  195.             int val = Integer.parseInt(text);
  196.             reduceAllocs.put(poolName, val);
  197.           } else if ("maxRunningJobs".equals(field.getTagName())) {
  198.             String text = ((Text)field.getFirstChild()).getData().trim();
  199.             int val = Integer.parseInt(text);
  200.             poolMaxJobs.put(poolName, val);
  201.           } else if ("weight".equals(field.getTagName())) {
  202.             String text = ((Text)field.getFirstChild()).getData().trim();
  203.             double val = Double.parseDouble(text);
  204.             poolWeights.put(poolName, val);
  205.           }
  206.         }
  207.       } else if ("user".equals(element.getTagName())) {
  208.         String userName = element.getAttribute("name");
  209.         NodeList fields = element.getChildNodes();
  210.         for (int j = 0; j < fields.getLength(); j++) {
  211.           Node fieldNode = fields.item(j);
  212.           if (!(fieldNode instanceof Element))
  213.             continue;
  214.           Element field = (Element) fieldNode;
  215.           if ("maxRunningJobs".equals(field.getTagName())) {
  216.             String text = ((Text)field.getFirstChild()).getData().trim();
  217.             int val = Integer.parseInt(text);
  218.             userMaxJobs.put(userName, val);
  219.           }
  220.         }
  221.       } else if ("userMaxJobsDefault".equals(element.getTagName())) {
  222.         String text = ((Text)element.getFirstChild()).getData().trim();
  223.         int val = Integer.parseInt(text);
  224.         userMaxJobsDefault = val;
  225.       } else {
  226.         LOG.warn("Bad element in allocations file: " + element.getTagName());
  227.       }
  228.     }
  229.     
  230.     // Commit the reload; also create any pool defined in the alloc file
  231.     // if it does not already exist, so it can be displayed on the web UI.
  232.     synchronized(this) {
  233.       this.mapAllocs = mapAllocs;
  234.       this.reduceAllocs = reduceAllocs;
  235.       this.poolMaxJobs = poolMaxJobs;
  236.       this.userMaxJobs = userMaxJobs;
  237.       this.userMaxJobsDefault = userMaxJobsDefault;
  238.       this.poolWeights = poolWeights;
  239.       for (String name: poolNamesInAllocFile) {
  240.         getPool(name);
  241.       }
  242.     }
  243.   }
  244.   /**
  245.    * Get the allocation for a particular pool
  246.    */
  247.   public int getAllocation(String pool, TaskType taskType) {
  248.     Map<String, Integer> allocationMap = (taskType == TaskType.MAP ?
  249.         mapAllocs : reduceAllocs);
  250.     Integer alloc = allocationMap.get(pool);
  251.     return (alloc == null ? 0 : alloc);
  252.   }
  253.   
  254.   /**
  255.    * Add a job in the appropriate pool
  256.    */
  257.   public synchronized void addJob(JobInProgress job) {
  258.     getPool(getPoolName(job)).addJob(job);
  259.   }
  260.   
  261.   /**
  262.    * Remove a job
  263.    */
  264.   public synchronized void removeJob(JobInProgress job) {
  265.     getPool(getPoolName(job)).removeJob(job);
  266.   }
  267.   
  268.   /**
  269.    * Change the pool of a particular job
  270.    */
  271.   public synchronized void setPool(JobInProgress job, String pool) {
  272.     removeJob(job);
  273.     job.getJobConf().set(poolNameProperty, pool);
  274.     addJob(job);
  275.   }
  276.   /**
  277.    * Get a collection of all pools
  278.    */
  279.   public synchronized Collection<Pool> getPools() {
  280.     return pools.values();
  281.   }
  282.   
  283.   /**
  284.    * Get the pool name for a JobInProgress from its configuration. This uses
  285.    * the "project" property in the jobconf by default, or the property set with
  286.    * "mapred.fairscheduler.poolnameproperty".
  287.    */
  288.   public String getPoolName(JobInProgress job) {
  289.     JobConf conf = job.getJobConf();
  290.     return conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME).trim();
  291.   }
  292.   /**
  293.    * Get all pool names that have been seen either in the allocation file or in
  294.    * a MapReduce job.
  295.    */
  296.   public synchronized Collection<String> getPoolNames() {
  297.     List<String> list = new ArrayList<String>();
  298.     for (Pool pool: getPools()) {
  299.       list.add(pool.getName());
  300.     }
  301.     Collections.sort(list);
  302.     return list;
  303.   }
  304.   public int getUserMaxJobs(String user) {
  305.     if (userMaxJobs.containsKey(user)) {
  306.       return userMaxJobs.get(user);
  307.     } else {
  308.       return userMaxJobsDefault;
  309.     }
  310.   }
  311.   public int getPoolMaxJobs(String pool) {
  312.     if (poolMaxJobs.containsKey(pool)) {
  313.       return poolMaxJobs.get(pool);
  314.     } else {
  315.       return Integer.MAX_VALUE;
  316.     }
  317.   }
  318.   public double getPoolWeight(String pool) {
  319.     if (poolWeights.containsKey(pool)) {
  320.       return poolWeights.get(pool);
  321.     } else {
  322.       return 1.0;
  323.     }
  324.   }
  325. }