CompletedJobStatusStore.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */  
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import org.apache.commons.logging.Log;
  21. import org.apache.commons.logging.LogFactory;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.fs.FSDataInputStream;
  24. import org.apache.hadoop.fs.FSDataOutputStream;
  25. import org.apache.hadoop.fs.FileStatus;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. /**
  29.  * Persists and retrieves the Job info of a job into/from DFS.
  30.  * <p/>
  31.  * If the retain time is zero jobs are not persisted.
  32.  * <p/>
  33.  * A daemon thread cleans up job info files older than the retain time
  34.  * <p/>
  35.  * The retain time can be set with the 'persist.jobstatus.hours'
  36.  * configuration variable (it is in hours).
  37.  */
  38. class CompletedJobStatusStore implements Runnable {
  39.   private boolean active;
  40.   private String jobInfoDir;
  41.   private long retainTime;
  42.   private FileSystem fs;
  43.   private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
  44.   public static final Log LOG =
  45.           LogFactory.getLog(CompletedJobStatusStore.class);
  46.   private static long HOUR = 1000 * 60 * 60;
  47.   private static long SLEEP_TIME = 1 * HOUR;
  48.   CompletedJobStatusStore(Configuration conf, FileSystem fs) throws IOException {
  49.     active =
  50.       conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
  51.     if (active) {
  52.       this.fs = fs;
  53.       retainTime =
  54.         conf.getInt("mapred.job.tracker.persist.jobstatus.hours", 0) * HOUR;
  55.       jobInfoDir =
  56.         conf.get("mapred.job.tracker.persist.jobstatus.dir", JOB_INFO_STORE_DIR);
  57.       Path path = new Path(jobInfoDir);
  58.       if (!fs.exists(path)) {
  59.         fs.mkdirs(path);
  60.       }
  61.       if (retainTime == 0) {
  62.         // as retain time is zero, all stored jobstatuses are deleted.
  63.         deleteJobStatusDirs();
  64.       }
  65.     }
  66.   }
  67.   /**
  68.    * Indicates if job status persistency is active or not.
  69.    *
  70.    * @return TRUE if active, FALSE otherwise.
  71.    */
  72.   public boolean isActive() {
  73.     return active;
  74.   }
  75.   public void run() {
  76.     if (retainTime > 0) {
  77.       while (true) {
  78.         deleteJobStatusDirs();
  79.         try {
  80.           Thread.sleep(SLEEP_TIME);
  81.         }
  82.         catch (InterruptedException ex) {
  83.           break;
  84.         }
  85.       }
  86.     }
  87.   }
  88.   private void deleteJobStatusDirs() {
  89.     try {
  90.       long currentTime = System.currentTimeMillis();
  91.       FileStatus[] jobInfoFiles = fs.listStatus(
  92.               new Path[]{new Path(jobInfoDir)});
  93.       //noinspection ForLoopReplaceableByForEach
  94.       for (FileStatus jobInfo : jobInfoFiles) {
  95.         try {
  96.           if ((currentTime - jobInfo.getModificationTime()) > retainTime) {
  97.             fs.delete(jobInfo.getPath(), true);
  98.           }
  99.         }
  100.         catch (IOException ie) {
  101.           LOG.warn("Could not do housekeeping for [ " +
  102.                   jobInfo.getPath() + "] job info : " + ie.getMessage(), ie);
  103.         }
  104.       }
  105.     }
  106.     catch (IOException ie) {
  107.       LOG.warn("Could not obtain job info files : " + ie.getMessage(), ie);
  108.     }
  109.   }
  110.   private Path getInfoFilePath(JobID jobId) {
  111.     return new Path(jobInfoDir, jobId + ".info");
  112.   }
  113.   
  114.   /**
  115.    * Persists a job in DFS.
  116.    *
  117.    * @param job the job about to be 'retired'
  118.    */
  119.   public void store(JobInProgress job) {
  120.     if (active && retainTime > 0) {
  121.       JobID jobId = job.getStatus().getJobID();
  122.       Path jobStatusFile = getInfoFilePath(jobId);
  123.       try {
  124.         FSDataOutputStream dataOut = fs.create(jobStatusFile);
  125.         job.getStatus().write(dataOut);
  126.         job.getProfile().write(dataOut);
  127.         job.getCounters().write(dataOut);
  128.         TaskCompletionEvent[] events = 
  129.                 job.getTaskCompletionEvents(0, Integer.MAX_VALUE);
  130.         dataOut.writeInt(events.length);
  131.         for (TaskCompletionEvent event : events) {
  132.           event.write(dataOut);
  133.         }
  134.         dataOut.close();
  135.       } catch (IOException ex) {
  136.         LOG.warn("Could not store [" + jobId + "] job info : " +
  137.                  ex.getMessage(), ex);
  138.         try {
  139.           fs.delete(jobStatusFile, true);
  140.         }
  141.         catch (IOException ex1) {
  142.           //ignore
  143.         }
  144.       }
  145.     }
  146.   }
  147.   private FSDataInputStream getJobInfoFile(JobID jobId) throws IOException {
  148.     Path jobStatusFile = getInfoFilePath(jobId);
  149.     return (fs.exists(jobStatusFile)) ? fs.open(jobStatusFile) : null;
  150.   }
  151.   private JobStatus readJobStatus(FSDataInputStream dataIn) throws IOException {
  152.     JobStatus jobStatus = new JobStatus();
  153.     jobStatus.readFields(dataIn);
  154.     return jobStatus;
  155.   }
  156.   private JobProfile readJobProfile(FSDataInputStream dataIn)
  157.           throws IOException {
  158.     JobProfile jobProfile = new JobProfile();
  159.     jobProfile.readFields(dataIn);
  160.     return jobProfile;
  161.   }
  162.   private Counters readCounters(FSDataInputStream dataIn) throws IOException {
  163.     Counters counters = new Counters();
  164.     counters.readFields(dataIn);
  165.     return counters;
  166.   }
  167.   private TaskCompletionEvent[] readEvents(FSDataInputStream dataIn,
  168.                                            int offset, int len)
  169.           throws IOException {
  170.     int size = dataIn.readInt();
  171.     if (offset > size) {
  172.       return TaskCompletionEvent.EMPTY_ARRAY;
  173.     }
  174.     if (offset + len > size) {
  175.       len = size - offset;
  176.     }
  177.     TaskCompletionEvent[] events = new TaskCompletionEvent[len];
  178.     for (int i = 0; i < (offset + len); i++) {
  179.       TaskCompletionEvent event = new TaskCompletionEvent();
  180.       event.readFields(dataIn);
  181.       if (i >= offset) {
  182.         events[i - offset] = event;
  183.       }
  184.     }
  185.     return events;
  186.   }
  187.   /**
  188.    * This method retrieves JobStatus information from DFS stored using
  189.    * store method.
  190.    *
  191.    * @param jobId the jobId for which jobStatus is queried
  192.    * @return JobStatus object, null if not able to retrieve
  193.    */
  194.   public JobStatus readJobStatus(JobID jobId) {
  195.     JobStatus jobStatus = null;
  196.     
  197.     if (null == jobId) {
  198.       LOG.warn("Could not read job status for null jobId");
  199.       return null;
  200.     }
  201.     
  202.     if (active) {
  203.       try {
  204.         FSDataInputStream dataIn = getJobInfoFile(jobId);
  205.         if (dataIn != null) {
  206.           jobStatus = readJobStatus(dataIn);
  207.           dataIn.close();
  208.         }
  209.       } catch (IOException ex) {
  210.         LOG.warn("Could not read [" + jobId + "] job status : " + ex, ex);
  211.       }
  212.     }
  213.     return jobStatus;
  214.   }
  215.   /**
  216.    * This method retrieves JobProfile information from DFS stored using
  217.    * store method.
  218.    *
  219.    * @param jobId the jobId for which jobProfile is queried
  220.    * @return JobProfile object, null if not able to retrieve
  221.    */
  222.   public JobProfile readJobProfile(JobID jobId) {
  223.     JobProfile jobProfile = null;
  224.     if (active) {
  225.       try {
  226.         FSDataInputStream dataIn = getJobInfoFile(jobId);
  227.         if (dataIn != null) {
  228.           readJobStatus(dataIn);
  229.           jobProfile = readJobProfile(dataIn);
  230.           dataIn.close();
  231.         }
  232.       } catch (IOException ex) {
  233.         LOG.warn("Could not read [" + jobId + "] job profile : " + ex, ex);
  234.       }
  235.     }
  236.     return jobProfile;
  237.   }
  238.   /**
  239.    * This method retrieves Counters information from DFS stored using
  240.    * store method.
  241.    *
  242.    * @param jobId the jobId for which Counters is queried
  243.    * @return Counters object, null if not able to retrieve
  244.    */
  245.   public Counters readCounters(JobID jobId) {
  246.     Counters counters = null;
  247.     if (active) {
  248.       try {
  249.         FSDataInputStream dataIn = getJobInfoFile(jobId);
  250.         if (dataIn != null) {
  251.           readJobStatus(dataIn);
  252.           readJobProfile(dataIn);
  253.           counters = readCounters(dataIn);
  254.           dataIn.close();
  255.         }
  256.       } catch (IOException ex) {
  257.         LOG.warn("Could not read [" + jobId + "] job counters : " + ex, ex);
  258.       }
  259.     }
  260.     return counters;
  261.   }
  262.   /**
  263.    * This method retrieves TaskCompletionEvents information from DFS stored
  264.    * using store method.
  265.    *
  266.    * @param jobId       the jobId for which TaskCompletionEvents is queried
  267.    * @param fromEventId events offset
  268.    * @param maxEvents   max number of events
  269.    * @return TaskCompletionEvent[], empty array if not able to retrieve
  270.    */
  271.   public TaskCompletionEvent[] readJobTaskCompletionEvents(JobID jobId,
  272.                                                                int fromEventId,
  273.                                                                int maxEvents) {
  274.     TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
  275.     if (active) {
  276.       try {
  277.         FSDataInputStream dataIn = getJobInfoFile(jobId);
  278.         if (dataIn != null) {
  279.           readJobStatus(dataIn);
  280.           readJobProfile(dataIn);
  281.           readCounters(dataIn);
  282.           events = readEvents(dataIn, fromEventId, maxEvents);
  283.           dataIn.close();
  284.         }
  285.       } catch (IOException ex) {
  286.         LOG.warn("Could not read [" + jobId + "] job events : " + ex, ex);
  287.       }
  288.     }
  289.     return events;
  290.   }
  291. }