// CompletedJobStatusStore.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
- package org.apache.hadoop.mapred;
- import java.io.IOException;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- /**
- * Persists and retrieves the Job info of a job into/from DFS.
- * <p/>
- * If the retain time is zero jobs are not persisted.
- * <p/>
- * A daemon thread cleans up job info files older than the retain time
- * <p/>
- * The retain time can be set with the 'persist.jobstatus.hours'
- * configuration variable (it is in hours).
- */
- class CompletedJobStatusStore implements Runnable {
- private boolean active;
- private String jobInfoDir;
- private long retainTime;
- private FileSystem fs;
- private static final String JOB_INFO_STORE_DIR = "/jobtracker/jobsInfo";
- public static final Log LOG =
- LogFactory.getLog(CompletedJobStatusStore.class);
- private static long HOUR = 1000 * 60 * 60;
- private static long SLEEP_TIME = 1 * HOUR;
- CompletedJobStatusStore(Configuration conf, FileSystem fs) throws IOException {
- active =
- conf.getBoolean("mapred.job.tracker.persist.jobstatus.active", false);
- if (active) {
- this.fs = fs;
- retainTime =
- conf.getInt("mapred.job.tracker.persist.jobstatus.hours", 0) * HOUR;
- jobInfoDir =
- conf.get("mapred.job.tracker.persist.jobstatus.dir", JOB_INFO_STORE_DIR);
- Path path = new Path(jobInfoDir);
- if (!fs.exists(path)) {
- fs.mkdirs(path);
- }
- if (retainTime == 0) {
- // as retain time is zero, all stored jobstatuses are deleted.
- deleteJobStatusDirs();
- }
- }
- }
- /**
- * Indicates if job status persistency is active or not.
- *
- * @return TRUE if active, FALSE otherwise.
- */
- public boolean isActive() {
- return active;
- }
- public void run() {
- if (retainTime > 0) {
- while (true) {
- deleteJobStatusDirs();
- try {
- Thread.sleep(SLEEP_TIME);
- }
- catch (InterruptedException ex) {
- break;
- }
- }
- }
- }
- private void deleteJobStatusDirs() {
- try {
- long currentTime = System.currentTimeMillis();
- FileStatus[] jobInfoFiles = fs.listStatus(
- new Path[]{new Path(jobInfoDir)});
- //noinspection ForLoopReplaceableByForEach
- for (FileStatus jobInfo : jobInfoFiles) {
- try {
- if ((currentTime - jobInfo.getModificationTime()) > retainTime) {
- fs.delete(jobInfo.getPath(), true);
- }
- }
- catch (IOException ie) {
- LOG.warn("Could not do housekeeping for [ " +
- jobInfo.getPath() + "] job info : " + ie.getMessage(), ie);
- }
- }
- }
- catch (IOException ie) {
- LOG.warn("Could not obtain job info files : " + ie.getMessage(), ie);
- }
- }
- private Path getInfoFilePath(JobID jobId) {
- return new Path(jobInfoDir, jobId + ".info");
- }
-
- /**
- * Persists a job in DFS.
- *
- * @param job the job about to be 'retired'
- */
- public void store(JobInProgress job) {
- if (active && retainTime > 0) {
- JobID jobId = job.getStatus().getJobID();
- Path jobStatusFile = getInfoFilePath(jobId);
- try {
- FSDataOutputStream dataOut = fs.create(jobStatusFile);
- job.getStatus().write(dataOut);
- job.getProfile().write(dataOut);
- job.getCounters().write(dataOut);
- TaskCompletionEvent[] events =
- job.getTaskCompletionEvents(0, Integer.MAX_VALUE);
- dataOut.writeInt(events.length);
- for (TaskCompletionEvent event : events) {
- event.write(dataOut);
- }
- dataOut.close();
- } catch (IOException ex) {
- LOG.warn("Could not store [" + jobId + "] job info : " +
- ex.getMessage(), ex);
- try {
- fs.delete(jobStatusFile, true);
- }
- catch (IOException ex1) {
- //ignore
- }
- }
- }
- }
- private FSDataInputStream getJobInfoFile(JobID jobId) throws IOException {
- Path jobStatusFile = getInfoFilePath(jobId);
- return (fs.exists(jobStatusFile)) ? fs.open(jobStatusFile) : null;
- }
- private JobStatus readJobStatus(FSDataInputStream dataIn) throws IOException {
- JobStatus jobStatus = new JobStatus();
- jobStatus.readFields(dataIn);
- return jobStatus;
- }
- private JobProfile readJobProfile(FSDataInputStream dataIn)
- throws IOException {
- JobProfile jobProfile = new JobProfile();
- jobProfile.readFields(dataIn);
- return jobProfile;
- }
- private Counters readCounters(FSDataInputStream dataIn) throws IOException {
- Counters counters = new Counters();
- counters.readFields(dataIn);
- return counters;
- }
- private TaskCompletionEvent[] readEvents(FSDataInputStream dataIn,
- int offset, int len)
- throws IOException {
- int size = dataIn.readInt();
- if (offset > size) {
- return TaskCompletionEvent.EMPTY_ARRAY;
- }
- if (offset + len > size) {
- len = size - offset;
- }
- TaskCompletionEvent[] events = new TaskCompletionEvent[len];
- for (int i = 0; i < (offset + len); i++) {
- TaskCompletionEvent event = new TaskCompletionEvent();
- event.readFields(dataIn);
- if (i >= offset) {
- events[i - offset] = event;
- }
- }
- return events;
- }
- /**
- * This method retrieves JobStatus information from DFS stored using
- * store method.
- *
- * @param jobId the jobId for which jobStatus is queried
- * @return JobStatus object, null if not able to retrieve
- */
- public JobStatus readJobStatus(JobID jobId) {
- JobStatus jobStatus = null;
-
- if (null == jobId) {
- LOG.warn("Could not read job status for null jobId");
- return null;
- }
-
- if (active) {
- try {
- FSDataInputStream dataIn = getJobInfoFile(jobId);
- if (dataIn != null) {
- jobStatus = readJobStatus(dataIn);
- dataIn.close();
- }
- } catch (IOException ex) {
- LOG.warn("Could not read [" + jobId + "] job status : " + ex, ex);
- }
- }
- return jobStatus;
- }
- /**
- * This method retrieves JobProfile information from DFS stored using
- * store method.
- *
- * @param jobId the jobId for which jobProfile is queried
- * @return JobProfile object, null if not able to retrieve
- */
- public JobProfile readJobProfile(JobID jobId) {
- JobProfile jobProfile = null;
- if (active) {
- try {
- FSDataInputStream dataIn = getJobInfoFile(jobId);
- if (dataIn != null) {
- readJobStatus(dataIn);
- jobProfile = readJobProfile(dataIn);
- dataIn.close();
- }
- } catch (IOException ex) {
- LOG.warn("Could not read [" + jobId + "] job profile : " + ex, ex);
- }
- }
- return jobProfile;
- }
- /**
- * This method retrieves Counters information from DFS stored using
- * store method.
- *
- * @param jobId the jobId for which Counters is queried
- * @return Counters object, null if not able to retrieve
- */
- public Counters readCounters(JobID jobId) {
- Counters counters = null;
- if (active) {
- try {
- FSDataInputStream dataIn = getJobInfoFile(jobId);
- if (dataIn != null) {
- readJobStatus(dataIn);
- readJobProfile(dataIn);
- counters = readCounters(dataIn);
- dataIn.close();
- }
- } catch (IOException ex) {
- LOG.warn("Could not read [" + jobId + "] job counters : " + ex, ex);
- }
- }
- return counters;
- }
- /**
- * This method retrieves TaskCompletionEvents information from DFS stored
- * using store method.
- *
- * @param jobId the jobId for which TaskCompletionEvents is queried
- * @param fromEventId events offset
- * @param maxEvents max number of events
- * @return TaskCompletionEvent[], empty array if not able to retrieve
- */
- public TaskCompletionEvent[] readJobTaskCompletionEvents(JobID jobId,
- int fromEventId,
- int maxEvents) {
- TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
- if (active) {
- try {
- FSDataInputStream dataIn = getJobInfoFile(jobId);
- if (dataIn != null) {
- readJobStatus(dataIn);
- readJobProfile(dataIn);
- readCounters(dataIn);
- events = readEvents(dataIn, fromEventId, maxEvents);
- dataIn.close();
- }
- } catch (IOException ex) {
- LOG.warn("Could not read [" + jobId + "] job events : " + ex, ex);
- }
- }
- return events;
- }
- }