MemoryMatcher.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.commons.logging.Log;
  20. import org.apache.commons.logging.LogFactory;
  21. class MemoryMatcher {
  22.   private static final Log LOG = LogFactory.getLog(MemoryMatcher.class);
  23.   private CapacityTaskScheduler scheduler;
  24.   public MemoryMatcher(CapacityTaskScheduler capacityTaskScheduler) {
  25.     this.scheduler = capacityTaskScheduler;
  26.   }
  27.   boolean isSchedulingBasedOnVmemEnabled() {
  28.     LOG.debug("defaultMaxVmPerTask : " + scheduler.defaultMaxVmPerTask
  29.         + " limitMaxVmemForTasks : " + scheduler.limitMaxVmemForTasks);
  30.     if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT
  31.         || scheduler.limitMaxVmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
  32.       return false;
  33.     }
  34.     return true;
  35.   }
  36.   boolean isSchedulingBasedOnPmemEnabled() {
  37.     LOG.debug("defaultPercentOfPmemInVmem : "
  38.         + scheduler.defaultPercentOfPmemInVmem + " limitMaxPmemForTasks : "
  39.         + scheduler.limitMaxPmemForTasks);
  40.     if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT
  41.         || scheduler.limitMaxPmemForTasks == JobConf.DISABLED_MEMORY_LIMIT) {
  42.       return false;
  43.     }
  44.     return true;
  45.   }
  46.   /**
  47.    * Obtain the virtual memory allocated for a job's tasks.
  48.    * 
  49.    * If the job has a configured value for the max-virtual memory, that will be
  50.    * returned. Else, the cluster-wide default max-virtual memory for tasks is
  51.    * returned.
  52.    * 
  53.    * This method can only be called after
  54.    * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
  55.    * 
  56.    * @param jConf JobConf of the job
  57.    * @return the virtual memory allocated for the job's tasks.
  58.    */
  59.   private long getVirtualMemoryForTask(JobConf jConf) {
  60.     long vMemForTask = jConf.getMaxVirtualMemoryForTask();
  61.     if (vMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
  62.       vMemForTask =
  63.           new JobConf().getLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
  64.               scheduler.defaultMaxVmPerTask);
  65.     }
  66.     return vMemForTask;
  67.   }
  68.   /**
  69.    * Obtain the physical memory allocated for a job's tasks.
  70.    * 
  71.    * If the job has a configured value for the max physical memory, that
  72.    * will be returned. Else, the cluster-wide default physical memory for
  73.    * tasks is returned.
  74.    * 
  75.    * This method can only be called after
  76.    * {@link CapacityTaskScheduler#initializeMemoryRelatedConf()} is invoked.
  77.    * 
  78.    * @param jConf JobConf of the job
  79.    * @return the physical memory allocated for the job's tasks
  80.    */
  81.   private long getPhysicalMemoryForTask(JobConf jConf) {
  82.     long pMemForTask = jConf.getMaxPhysicalMemoryForTask();
  83.     if (pMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
  84.       pMemForTask =
  85.           Math.round(getVirtualMemoryForTask(jConf)
  86.               * scheduler.defaultPercentOfPmemInVmem);
  87.     }
  88.     return pMemForTask;
  89.   }
  90.   static class Memory {
  91.     long vmem;
  92.     long pmem;
  93.     Memory(long vm, long pm) {
  94.       this.vmem = vm;
  95.       this.pmem = pm;
  96.     }
  97.   }
  98.   /**
  99.    * Find the memory that is already used by all the running tasks
  100.    * residing on the given TaskTracker.
  101.    * 
  102.    * @param taskTracker
  103.    * @return amount of memory that is used by the residing tasks
  104.    */
  105.   private synchronized Memory getMemReservedForTasks(
  106.       TaskTrackerStatus taskTracker) {
  107.     boolean disabledVmem = false;
  108.     boolean disabledPmem = false;
  109.     if (scheduler.defaultMaxVmPerTask == JobConf.DISABLED_MEMORY_LIMIT) {
  110.       disabledVmem = true;
  111.     }
  112.     if (scheduler.defaultPercentOfPmemInVmem == JobConf.DISABLED_MEMORY_LIMIT) {
  113.       disabledPmem = true;
  114.     }
  115.     if (disabledVmem && disabledPmem) {
  116.       return new Memory(JobConf.DISABLED_MEMORY_LIMIT,
  117.           JobConf.DISABLED_MEMORY_LIMIT);
  118.     }
  119.     long vmem = 0;
  120.     long pmem = 0;
  121.     for (TaskStatus task : taskTracker.getTaskReports()) {
  122.       // the following task states are one in which the slot is
  123.       // still occupied and hence memory of the task should be
  124.       // accounted in used memory.
  125.       if ((task.getRunState() == TaskStatus.State.RUNNING)
  126.           || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
  127.         JobConf jConf =
  128.             scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID())
  129.                 .getJobConf();
  130.         if (!disabledVmem) {
  131.           vmem += getVirtualMemoryForTask(jConf);
  132.         }
  133.         if (!disabledPmem) {
  134.           pmem += getPhysicalMemoryForTask(jConf);
  135.         }
  136.       }
  137.     }
  138.     return new Memory(vmem, pmem);
  139.   }
  140.   /**
  141.    * Check if a TT has enough pmem and vmem to run this job.
  142.    * @param job
  143.    * @param taskTracker
  144.    * @return true if this TT has enough memory for this job. False otherwise.
  145.    */
  146.   boolean matchesMemoryRequirements(JobInProgress job,
  147.       TaskTrackerStatus taskTracker) {
  148.     // ////////////// vmem based scheduling
  149.     if (!isSchedulingBasedOnVmemEnabled()) {
  150.       LOG.debug("One of the configuration parameters defaultMaxVmPerTask "
  151.           + "and limitMaxVmemPerTasks is not configured. Scheduling based "
  152.           + "on job's memory requirements is disabled, ignoring any value "
  153.           + "set by job.");
  154.       return true;
  155.     }
  156.     TaskTrackerStatus.ResourceStatus resourceStatus =
  157.         taskTracker.getResourceStatus();
  158.     long totalVMemOnTT = resourceStatus.getTotalVirtualMemory();
  159.     long reservedVMemOnTT = resourceStatus.getReservedTotalMemory();
  160.     if (totalVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT
  161.         || reservedVMemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
  162.       return true;
  163.     }
  164.     if (reservedVMemOnTT > totalVMemOnTT) {
  165.       return true;
  166.     }
  167.     long jobVMemForTask = job.getMaxVirtualMemoryForTask();
  168.     if (jobVMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
  169.       jobVMemForTask = scheduler.defaultMaxVmPerTask;
  170.     }
  171.     Memory memReservedForTasks = getMemReservedForTasks(taskTracker);
  172.     long vmemUsedOnTT = memReservedForTasks.vmem;
  173.     long pmemUsedOnTT = memReservedForTasks.pmem;
  174.     long freeVmemUsedOnTT = totalVMemOnTT - vmemUsedOnTT - reservedVMemOnTT;
  175.     if (jobVMemForTask > freeVmemUsedOnTT) {
  176.       return false;
  177.     }
  178.     // ////////////// pmem based scheduling
  179.     long totalPmemOnTT = resourceStatus.getTotalPhysicalMemory();
  180.     long reservedPmemOnTT = resourceStatus.getReservedPhysicalMemory();
  181.     long jobPMemForTask = job.getMaxPhysicalMemoryForTask();
  182.     long freePmemUsedOnTT = 0;
  183.     if (isSchedulingBasedOnPmemEnabled()) {
  184.       if (totalPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT
  185.           || reservedPmemOnTT == JobConf.DISABLED_MEMORY_LIMIT) {
  186.         return true;
  187.       }
  188.       if (reservedPmemOnTT > totalPmemOnTT) {
  189.         return true;
  190.       }
  191.       if (jobPMemForTask == JobConf.DISABLED_MEMORY_LIMIT) {
  192.         jobPMemForTask =
  193.             Math.round(jobVMemForTask * scheduler.defaultPercentOfPmemInVmem);
  194.       }
  195.       freePmemUsedOnTT = totalPmemOnTT - pmemUsedOnTT - reservedPmemOnTT;
  196.       if (jobPMemForTask > freePmemUsedOnTT) {
  197.         return false;
  198.       }
  199.     } else {
  200.       LOG.debug("One of the configuration parameters "
  201.           + "defaultPercentOfPmemInVmem and limitMaxPmemPerTasks is not "
  202.           + "configured. Scheduling based on job's physical memory "
  203.           + "requirements is disabled, ignoring any value set by job.");
  204.     }
  205.     LOG.debug("freeVMemOnTT = " + freeVmemUsedOnTT + " totalVMemOnTT = "
  206.         + totalVMemOnTT + " freePMemOnTT = " + freePmemUsedOnTT
  207.         + " totalPMemOnTT = " + totalPmemOnTT + " jobVMemForTask = "
  208.         + jobVMemForTask + " jobPMemForTask = " + jobPMemForTask);
  209.     return true;
  210.   }
  211. }