HadoopJob.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.eclipse.server;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.fs.FileUtil;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.mapred.Counters;
  25. import org.apache.hadoop.mapred.JobConf;
  26. import org.apache.hadoop.mapred.JobID;
  27. import org.apache.hadoop.mapred.JobStatus;
  28. import org.apache.hadoop.mapred.RunningJob;
  29. /**
  30.  * Representation of a Map/Reduce running job on a given location
  31.  */
  32. public class HadoopJob {
  33.   /**
  34.    * Enum representation of a Job state
  35.    */
  36.   public enum JobState {
  37.     PREPARE(JobStatus.PREP), RUNNING(JobStatus.RUNNING), FAILED(
  38.         JobStatus.FAILED), SUCCEEDED(JobStatus.SUCCEEDED);
  39.     final int state;
  40.     JobState(int state) {
  41.       this.state = state;
  42.     }
  43.     static JobState ofInt(int state) {
  44.       switch (state) {
  45.         case JobStatus.PREP:
  46.           return PREPARE;
  47.         case JobStatus.RUNNING:
  48.           return RUNNING;
  49.         case JobStatus.FAILED:
  50.           return FAILED;
  51.         case JobStatus.SUCCEEDED:
  52.           return SUCCEEDED;
  53.         default:
  54.           return null;
  55.       }
  56.     }
  57.   }
  58.   /**
  59.    * Location this Job runs on
  60.    */
  61.   private final HadoopServer location;
  62.   /**
  63.    * Unique identifier of this Job
  64.    */
  65.   final JobID jobId;
  66.   /**
  67.    * Status representation of a running job. This actually contains a
  68.    * reference to a JobClient. Its methods might block.
  69.    */
  70.   RunningJob running;
  71.   /**
  72.    * Last polled status
  73.    * 
  74.    * @deprecated should apparently not be used
  75.    */
  76.   JobStatus status;
  77.   /**
  78.    * Last polled counters
  79.    */
  80.   Counters counters;
  81.   /**
  82.    * Job Configuration
  83.    */
  84.   JobConf jobConf = null;
  85.   boolean completed = false;
  86.   boolean successful = false;
  87.   boolean killed = false;
  88.   int totalMaps;
  89.   int totalReduces;
  90.   int completedMaps;
  91.   int completedReduces;
  92.   float mapProgress;
  93.   float reduceProgress;
  94.   /**
  95.    * Constructor for a Hadoop job representation
  96.    * 
  97.    * @param location
  98.    * @param id
  99.    * @param running
  100.    * @param status
  101.    */
  102.   public HadoopJob(HadoopServer location, JobID id, RunningJob running,
  103.       JobStatus status) {
  104.     this.location = location;
  105.     this.jobId = id;
  106.     this.running = running;
  107.     loadJobFile();
  108.     update(status);
  109.   }
  110.   /**
  111.    * Try to locate and load the JobConf file for this job so to get more
  112.    * details on the job (number of maps and of reduces)
  113.    */
  114.   private void loadJobFile() {
  115.     try {
  116.       String jobFile = getJobFile();
  117.       FileSystem fs = location.getDFS();
  118.       File tmp = File.createTempFile(getJobID().toString(), ".xml");
  119.       if (FileUtil.copy(fs, new Path(jobFile), tmp, false, location
  120.           .getConfiguration())) {
  121.         this.jobConf = new JobConf(tmp.toString());
  122.         this.totalMaps = jobConf.getNumMapTasks();
  123.         this.totalReduces = jobConf.getNumReduceTasks();
  124.       }
  125.     } catch (IOException ioe) {
  126.       ioe.printStackTrace();
  127.     }
  128.   }
  129.   /* @inheritDoc */
  130.   @Override
  131.   public int hashCode() {
  132.     final int prime = 31;
  133.     int result = 1;
  134.     result = prime * result + ((jobId == null) ? 0 : jobId.hashCode());
  135.     result = prime * result + ((location == null) ? 0 : location.hashCode());
  136.     return result;
  137.   }
  138.   /* @inheritDoc */
  139.   @Override
  140.   public boolean equals(Object obj) {
  141.     if (this == obj)
  142.       return true;
  143.     if (obj == null)
  144.       return false;
  145.     if (!(obj instanceof HadoopJob))
  146.       return false;
  147.     final HadoopJob other = (HadoopJob) obj;
  148.     if (jobId == null) {
  149.       if (other.jobId != null)
  150.         return false;
  151.     } else if (!jobId.equals(other.jobId))
  152.       return false;
  153.     if (location == null) {
  154.       if (other.location != null)
  155.         return false;
  156.     } else if (!location.equals(other.location))
  157.       return false;
  158.     return true;
  159.   }
  160.   /**
  161.    * Get the running status of the Job (@see {@link JobStatus}).
  162.    * 
  163.    * @return
  164.    */
  165.   public JobState getState() {
  166.     if (this.completed) {
  167.       if (this.successful) {
  168.         return JobState.SUCCEEDED;
  169.       } else {
  170.         return JobState.FAILED;
  171.       }
  172.     } else {
  173.       return JobState.RUNNING;
  174.     }
  175.     // return JobState.ofInt(this.status.getRunState());
  176.   }
  177.   /**
  178.    * @return
  179.    */
  180.   public JobID getJobID() {
  181.     return this.jobId;
  182.   }
  183.   /**
  184.    * @return
  185.    */
  186.   public HadoopServer getLocation() {
  187.     return this.location;
  188.   }
  189.   /**
  190.    * @return
  191.    */
  192.   public boolean isCompleted() {
  193.     return this.completed;
  194.   }
  195.   /**
  196.    * @return
  197.    */
  198.   public String getJobName() {
  199.     return this.running.getJobName();
  200.   }
  201.   /**
  202.    * @return
  203.    */
  204.   public String getJobFile() {
  205.     return this.running.getJobFile();
  206.   }
  207.   /**
  208.    * Return the tracking URL for this Job.
  209.    * 
  210.    * @return string representation of the tracking URL for this Job
  211.    */
  212.   public String getTrackingURL() {
  213.     return this.running.getTrackingURL();
  214.   }
  215.   /**
  216.    * Returns a string representation of this job status
  217.    * 
  218.    * @return string representation of this job status
  219.    */
  220.   public String getStatus() {
  221.     StringBuffer s = new StringBuffer();
  222.     s.append("Maps : " + completedMaps + "/" + totalMaps);
  223.     s.append(" (" + mapProgress + ")");
  224.     s.append("  Reduces : " + completedReduces + "/" + totalReduces);
  225.     s.append(" (" + reduceProgress + ")");
  226.     return s.toString();
  227.   }
  228.   /**
  229.    * Update this job status according to the given JobStatus
  230.    * 
  231.    * @param status
  232.    */
  233.   void update(JobStatus status) {
  234.     this.status = status;
  235.     try {
  236.       this.counters = running.getCounters();
  237.       this.completed = running.isComplete();
  238.       this.successful = running.isSuccessful();
  239.       this.mapProgress = running.mapProgress();
  240.       this.reduceProgress = running.reduceProgress();
  241.       // running.getTaskCompletionEvents(fromEvent);
  242.     } catch (IOException ioe) {
  243.       ioe.printStackTrace();
  244.     }
  245.     this.completedMaps = (int) (this.totalMaps * this.mapProgress);
  246.     this.completedReduces = (int) (this.totalReduces * this.reduceProgress);
  247.   }
  248.   /**
  249.    * Print this job counters (for debugging purpose)
  250.    */
  251.   void printCounters() {
  252.     System.out.printf("New Job:n", counters);
  253.     for (String groupName : counters.getGroupNames()) {
  254.       Counters.Group group = counters.getGroup(groupName);
  255.       System.out.printf("t%s[%s]n", groupName, group.getDisplayName());
  256.       for (Counters.Counter counter : group) {
  257.         System.out.printf("tt%s: %sn", counter.getDisplayName(),
  258.                                           counter.getCounter());
  259.       }
  260.     }
  261.     System.out.printf("n");
  262.   }
  263.   /**
  264.    * Kill this job
  265.    */
  266.   public void kill() {
  267.     try {
  268.       this.running.killJob();
  269.       this.killed = true;
  270.     } catch (IOException e) {
  271.       e.printStackTrace();
  272.     }
  273.   }
  274.   /**
  275.    * Print this job status (for debugging purpose)
  276.    */
  277.   public void display() {
  278.     System.out.printf("Job id=%s, name=%sn", getJobID(), getJobName());
  279.     System.out.printf("Configuration file: %sn", getJobID());
  280.     System.out.printf("Tracking URL: %sn", getTrackingURL());
  281.     System.out.printf("Completion: map: %f reduce %fn",
  282.         100.0 * this.mapProgress, 100.0 * this.reduceProgress);
  283.     System.out.println("Job total maps = " + totalMaps);
  284.     System.out.println("Job completed maps = " + completedMaps);
  285.     System.out.println("Map percentage complete = " + mapProgress);
  286.     System.out.println("Job total reduces = " + totalReduces);
  287.     System.out.println("Job completed reduces = " + completedReduces);
  288.     System.out.println("Reduce percentage complete = " + reduceProgress);
  289.     System.out.flush();
  290.   }
  291. }