TaskTrackerStatus.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.io.*;
  20. import java.io.*;
  21. import java.util.*;
  22. /**************************************************
  23.  * A TaskTrackerStatus is a MapReduce primitive.  Keeps
  24.  * info on a TaskTracker.  The JobTracker maintains a set
  25.  * of the most recent TaskTrackerStatus objects for each
  26.  * unique TaskTracker it knows about.
  27.  *
  28.  **************************************************/
  29. class TaskTrackerStatus implements Writable {
  30.   static {                                        // register a ctor
  31.     WritableFactories.setFactory
  32.       (TaskTrackerStatus.class,
  33.        new WritableFactory() {
  34.          public Writable newInstance() { return new TaskTrackerStatus(); }
  35.        });
  36.   }
  37.   String trackerName;
  38.   String host;
  39.   int httpPort;
  40.   int failures;
  41.   List<TaskStatus> taskReports;
  42.     
  43.   volatile long lastSeen;
  44.   private int maxMapTasks;
  45.   private int maxReduceTasks;
  46.    
  47.   /**
  48.    * Class representing a collection of resources on this tasktracker.
  49.    */
  50.   static class ResourceStatus implements Writable {
  51.     
  52.     private long totalVirtualMemory;
  53.     private long reservedVirtualMemory;
  54.     private long totalPhysicalMemory;
  55.     private long reservedPhysicalMemory;
  56.     private long availableSpace;
  57.     
  58.     ResourceStatus() {
  59.       totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
  60.       reservedVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
  61.       totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
  62.       reservedPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
  63.       availableSpace = Long.MAX_VALUE;
  64.     }
  65.     /**
  66.      * Set the maximum amount of virtual memory on the tasktracker.
  67.      * 
  68.      * @param vmem maximum amount of virtual memory on the tasktracker in bytes.
  69.      */
  70.     void setTotalVirtualMemory(long totalMem) {
  71.       totalVirtualMemory = totalMem;
  72.     }
  73.     /**
  74.      * Get the maximum amount of virtual memory on the tasktracker.
  75.      * 
  76.      * If this is {@link JobConf#DISABLED_MEMORY_LIMIT}, it should be ignored
  77.      * and not used in any computation.
  78.      * 
  79.      * @return the maximum amount of virtual memory on the tasktracker in bytes.
  80.      */
  81.     long getTotalVirtualMemory() {
  82.       return totalVirtualMemory;
  83.     }
  84.     /**
  85.      * Set the amount of virtual memory reserved on the TaskTracker for system
  86.      * usage (OS, TT etc).
  87.      * 
  88.      * @param reservedVmem amount of virtual memory reserved in bytes.
  89.      */
  90.     void setReservedVirtualMemory(long reservedVmem) {
  91.       reservedVirtualMemory = reservedVmem;
  92.     }
  93.     /**
  94.      * Get the amount of virtual memory reserved on the TaskTracker for system
  95.      * usage (OS, TT etc).
  96.      */
  97.     long getReservedTotalMemory() {
  98.       return reservedVirtualMemory;
  99.     }
  100.     /**
  101.      * Set the maximum amount of physical memory on the tasktracker.
  102.      * 
  103.      * @param totalRAM maximum amount of physical memory on the tasktracker in
  104.      *          bytes.
  105.      */
  106.     void setTotalPhysicalMemory(long totalRAM) {
  107.       totalPhysicalMemory = totalRAM;
  108.     }
  109.     /**
  110.      * Get the maximum amount of physical memory on the tasktracker.
  111.      * 
  112.      * If this is {@link JobConf#DISABLED_MEMORY_LIMIT}, it should be ignored
  113.      * and not used in any computation.
  114.      * 
  115.      * @return maximum amount of physical memory on the tasktracker in bytes.
  116.      */
  117.     long getTotalPhysicalMemory() {
  118.       return totalPhysicalMemory;
  119.     }
  120.     /**
  121.      * Set the amount of physical memory reserved on the TaskTracker for system
  122.      * usage (OS, TT etc).
  123.      * 
  124.      * @param reservedPmem amount of physical memory reserved in bytes.
  125.      */
  126.     void setReservedPhysicalMemory(long reservedPmem) {
  127.       reservedPhysicalMemory = reservedPmem;
  128.     }
  129.     /**
  130.      * Get the amount of physical memory reserved on the TaskTracker for system
  131.      * usage (OS, TT etc).
  132.      */
  133.     long getReservedPhysicalMemory() {
  134.       return reservedPhysicalMemory;
  135.     }
  136.     void setAvailableSpace(long availSpace) {
  137.       availableSpace = availSpace;
  138.     }
  139.     
  140.     /**
  141.      * Will return LONG_MAX if space hasn't been measured yet.
  142.      * @return bytes of available local disk space on this tasktracker.
  143.      */    
  144.     long getAvailableSpace() {
  145.       return availableSpace;
  146.     }
  147.     
  148.     public void write(DataOutput out) throws IOException {
  149.       WritableUtils.writeVLong(out, totalVirtualMemory);
  150.       WritableUtils.writeVLong(out, reservedVirtualMemory);
  151.       WritableUtils.writeVLong(out, totalPhysicalMemory);
  152.       WritableUtils.writeVLong(out, reservedPhysicalMemory);
  153.       WritableUtils.writeVLong(out, availableSpace);
  154.     }
  155.     
  156.     public void readFields(DataInput in) throws IOException {
  157.       totalVirtualMemory = WritableUtils.readVLong(in);
  158.       reservedVirtualMemory = WritableUtils.readVLong(in);
  159.       totalPhysicalMemory = WritableUtils.readVLong(in);
  160.       reservedPhysicalMemory = WritableUtils.readVLong(in);
  161.       availableSpace = WritableUtils.readVLong(in);
  162.     }
  163.   }
  164.   
  165.   private ResourceStatus resStatus;
  166.   
  167.   /**
  168.    */
  169.   public TaskTrackerStatus() {
  170.     taskReports = new ArrayList<TaskStatus>();
  171.     resStatus = new ResourceStatus();
  172.   }
  173.   /**
  174.    */
  175.   public TaskTrackerStatus(String trackerName, String host, 
  176.                            int httpPort, List<TaskStatus> taskReports, 
  177.                            int failures, int maxMapTasks,
  178.                            int maxReduceTasks) {
  179.     this.trackerName = trackerName;
  180.     this.host = host;
  181.     this.httpPort = httpPort;
  182.     this.taskReports = new ArrayList<TaskStatus>(taskReports);
  183.     this.failures = failures;
  184.     this.maxMapTasks = maxMapTasks;
  185.     this.maxReduceTasks = maxReduceTasks;
  186.     this.resStatus = new ResourceStatus();
  187.   }
  188.   /**
  189.    */
  190.   public String getTrackerName() {
  191.     return trackerName;
  192.   }
  193.   /**
  194.    */
  195.   public String getHost() {
  196.     return host;
  197.   }
  198.   /**
  199.    * Get the port that this task tracker is serving http requests on.
  200.    * @return the http port
  201.    */
  202.   public int getHttpPort() {
  203.     return httpPort;
  204.   }
  205.     
  206.   /**
  207.    * Get the number of tasks that have failed on this tracker.
  208.    * @return The number of failed tasks
  209.    */
  210.   public int getFailures() {
  211.     return failures;
  212.   }
  213.     
  214.   /**
  215.    * Get the current tasks at the TaskTracker.
  216.    * Tasks are tracked by a {@link TaskStatus} object.
  217.    * 
  218.    * @return a list of {@link TaskStatus} representing 
  219.    *         the current tasks at the TaskTracker.
  220.    */
  221.   public List<TaskStatus> getTaskReports() {
  222.     return taskReports;
  223.   }
  224.     
  225.   /**
  226.    * Return the current MapTask count
  227.    */
  228.   public int countMapTasks() {
  229.     int mapCount = 0;
  230.     for (Iterator it = taskReports.iterator(); it.hasNext();) {
  231.       TaskStatus ts = (TaskStatus) it.next();
  232.       TaskStatus.State state = ts.getRunState();
  233.       if (ts.getIsMap() &&
  234.           ((state == TaskStatus.State.RUNNING) ||
  235.            (state == TaskStatus.State.UNASSIGNED) ||
  236.            ts.inTaskCleanupPhase())) {
  237.         mapCount++;
  238.       }
  239.     }
  240.     return mapCount;
  241.   }
  242.   /**
  243.    * Return the current ReduceTask count
  244.    */
  245.   public int countReduceTasks() {
  246.     int reduceCount = 0;
  247.     for (Iterator it = taskReports.iterator(); it.hasNext();) {
  248.       TaskStatus ts = (TaskStatus) it.next();
  249.       TaskStatus.State state = ts.getRunState();
  250.       if ((!ts.getIsMap()) &&
  251.           ((state == TaskStatus.State.RUNNING) ||  
  252.            (state == TaskStatus.State.UNASSIGNED) ||
  253.            ts.inTaskCleanupPhase())) {
  254.         reduceCount++;
  255.       }
  256.     }
  257.     return reduceCount;
  258.   }
  259.   /**
  260.    */
  261.   public long getLastSeen() {
  262.     return lastSeen;
  263.   }
  264.   /**
  265.    */
  266.   public void setLastSeen(long lastSeen) {
  267.     this.lastSeen = lastSeen;
  268.   }
  269.   /**
  270.    * Get the maximum concurrent tasks for this node.  (This applies
  271.    * per type of task - a node with maxTasks==1 will run up to 1 map
  272.    * and 1 reduce concurrently).
  273.    * @return maximum tasks this node supports
  274.    */
  275.   public int getMaxMapTasks() {
  276.     return maxMapTasks;
  277.   }
  278.   public int getMaxReduceTasks() {
  279.     return maxReduceTasks;
  280.   }  
  281.   
  282.   /**
  283.    * Return the {@link ResourceStatus} object configured with this
  284.    * status.
  285.    * 
  286.    * @return the resource status
  287.    */
  288.   ResourceStatus getResourceStatus() {
  289.     return resStatus;
  290.   }
  291.   
  292.   ///////////////////////////////////////////
  293.   // Writable
  294.   ///////////////////////////////////////////
  295.   public void write(DataOutput out) throws IOException {
  296.     UTF8.writeString(out, trackerName);
  297.     UTF8.writeString(out, host);
  298.     out.writeInt(httpPort);
  299.     out.writeInt(failures);
  300.     out.writeInt(maxMapTasks);
  301.     out.writeInt(maxReduceTasks);
  302.     resStatus.write(out);
  303.     out.writeInt(taskReports.size());
  304.     for (TaskStatus taskStatus : taskReports) {
  305.       TaskStatus.writeTaskStatus(out, taskStatus);
  306.     }
  307.   }
  308.   public void readFields(DataInput in) throws IOException {
  309.     this.trackerName = UTF8.readString(in);
  310.     this.host = UTF8.readString(in);
  311.     this.httpPort = in.readInt();
  312.     this.failures = in.readInt();
  313.     this.maxMapTasks = in.readInt();
  314.     this.maxReduceTasks = in.readInt();
  315.     resStatus.readFields(in);
  316.     taskReports.clear();
  317.     int numTasks = in.readInt();
  318.     for (int i = 0; i < numTasks; i++) {
  319.       taskReports.add(TaskStatus.readTaskStatus(in));
  320.     }
  321.   }
  322. }