FileOutputCommitter.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapreduce.lib.output;
  19. import java.io.IOException;
  20. import java.net.URI;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23. import org.apache.hadoop.fs.FileStatus;
  24. import org.apache.hadoop.fs.FileSystem;
  25. import org.apache.hadoop.fs.Path;
  26. import org.apache.hadoop.mapreduce.JobContext;
  27. import org.apache.hadoop.mapreduce.OutputCommitter;
  28. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  29. import org.apache.hadoop.mapreduce.TaskAttemptID;
  30. import org.apache.hadoop.util.StringUtils;
  31. /** An {@link OutputCommitter} that commits files specified 
  32.  * in job output directory i.e. ${mapred.output.dir}. 
  33.  **/
  34. public class FileOutputCommitter extends OutputCommitter {
  35.   private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
  36.   /**
  37.    * Temporary directory name 
  38.    */
  39.   protected static final String TEMP_DIR_NAME = "_temporary";
  40.   private FileSystem outputFileSystem = null;
  41.   private Path outputPath = null;
  42.   private Path workPath = null;
  43.   /**
  44.    * Create a file output committer
  45.    * @param outputPath the job's output path
  46.    * @param context the task's context
  47.    * @throws IOException
  48.    */
  49.   public FileOutputCommitter(Path outputPath, 
  50.                              TaskAttemptContext context) throws IOException {
  51.     if (outputPath != null) {
  52.       this.outputPath = outputPath;
  53.       outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
  54.       workPath = new Path(outputPath,
  55.                           (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
  56.                            "_" + context.getTaskAttemptID().toString()
  57.                            )).makeQualified(outputFileSystem);
  58.     }
  59.   }
  60.   /**
  61.    * Create the temporary directory that is the root of all of the task 
  62.    * work directories.
  63.    * @param context the job's context
  64.    */
  65.   public void setupJob(JobContext context) throws IOException {
  66.     if (outputPath != null) {
  67.       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
  68.       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
  69.       if (!fileSys.mkdirs(tmpDir)) {
  70.         LOG.error("Mkdirs failed to create " + tmpDir.toString());
  71.       }
  72.     }
  73.   }
  74.   /**
  75.    * Delete the temporary directory, including all of the work directories.
  76.    * @param context the job's context
  77.    */
  78.   public void cleanupJob(JobContext context) throws IOException {
  79.     if (outputPath != null) {
  80.       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
  81.       FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
  82.       if (fileSys.exists(tmpDir)) {
  83.         fileSys.delete(tmpDir, true);
  84.       }
  85.     }
  86.   }
  87.   /**
  88.    * No task setup required.
  89.    */
  90.   @Override
  91.   public void setupTask(TaskAttemptContext context) throws IOException {
  92.     // FileOutputCommitter's setupTask doesn't do anything. Because the
  93.     // temporary task directory is created on demand when the 
  94.     // task is writing.
  95.   }
  96.   /**
  97.    * Move the files from the work directory to the job output directory
  98.    * @param context the task context
  99.    */
  100.   public void commitTask(TaskAttemptContext context) 
  101.   throws IOException {
  102.     TaskAttemptID attemptId = context.getTaskAttemptID();
  103.     if (workPath != null) {
  104.       context.progress();
  105.       if (outputFileSystem.exists(workPath)) {
  106.         // Move the task outputs to their final place
  107.         moveTaskOutputs(context, outputFileSystem, outputPath, workPath);
  108.         // Delete the temporary task-specific output directory
  109.         if (!outputFileSystem.delete(workPath, true)) {
  110.           LOG.warn("Failed to delete the temporary output" + 
  111.           " directory of task: " + attemptId + " - " + workPath);
  112.         }
  113.         LOG.info("Saved output of task '" + attemptId + "' to " + 
  114.                  outputPath);
  115.       }
  116.     }
  117.   }
  118.   /**
  119.    * Move all of the files from the work directory to the final output
  120.    * @param context the task context
  121.    * @param fs the output file system
  122.    * @param jobOutputDir the final output direcotry
  123.    * @param taskOutput the work path
  124.    * @throws IOException
  125.    */
  126.   private void moveTaskOutputs(TaskAttemptContext context,
  127.                                FileSystem fs,
  128.                                Path jobOutputDir,
  129.                                Path taskOutput) 
  130.   throws IOException {
  131.     TaskAttemptID attemptId = context.getTaskAttemptID();
  132.     context.progress();
  133.     if (fs.isFile(taskOutput)) {
  134.       Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
  135.                                           workPath);
  136.       if (!fs.rename(taskOutput, finalOutputPath)) {
  137.         if (!fs.delete(finalOutputPath, true)) {
  138.           throw new IOException("Failed to delete earlier output of task: " + 
  139.                                  attemptId);
  140.         }
  141.         if (!fs.rename(taskOutput, finalOutputPath)) {
  142.           throw new IOException("Failed to save output of task: " + 
  143.            attemptId);
  144.         }
  145.       }
  146.       LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
  147.     } else if(fs.getFileStatus(taskOutput).isDir()) {
  148.       FileStatus[] paths = fs.listStatus(taskOutput);
  149.       Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
  150.       fs.mkdirs(finalOutputPath);
  151.       if (paths != null) {
  152.         for (FileStatus path : paths) {
  153.           moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
  154.         }
  155.       }
  156.     }
  157.   }
  158.   /**
  159.    * Delete the work directory
  160.    */
  161.   @Override
  162.   public void abortTask(TaskAttemptContext context) {
  163.     try {
  164.       if (workPath != null) { 
  165.         context.progress();
  166.         outputFileSystem.delete(workPath, true);
  167.       }
  168.     } catch (IOException ie) {
  169.       LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
  170.     }
  171.   }
  172.   /**
  173.    * Find the final name of a given output file, given the job output directory
  174.    * and the work directory.
  175.    * @param jobOutputDir the job's output directory
  176.    * @param taskOutput the specific task output file
  177.    * @param taskOutputPath the job's work directory
  178.    * @return the final path for the specific output file
  179.    * @throws IOException
  180.    */
  181.   private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
  182.                             Path taskOutputPath) throws IOException {
  183.     URI taskOutputUri = taskOutput.toUri();
  184.     URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
  185.     if (taskOutputUri == relativePath) {
  186.       throw new IOException("Can not get the relative path: base = " + 
  187.           taskOutputPath + " child = " + taskOutput);
  188.     }
  189.     if (relativePath.getPath().length() > 0) {
  190.       return new Path(jobOutputDir, relativePath.getPath());
  191.     } else {
  192.       return jobOutputDir;
  193.     }
  194.   }
  195.   /**
  196.    * Did this task write any files in the work directory?
  197.    * @param context the task's context
  198.    */
  199.   @Override
  200.   public boolean needsTaskCommit(TaskAttemptContext context
  201.                                  ) throws IOException {
  202.     return workPath != null && outputFileSystem.exists(workPath);
  203.   }
  204.   /**
  205.    * Get the directory that the task should write results into
  206.    * @return the work directory
  207.    * @throws IOException
  208.    */
  209.   public Path getWorkPath() throws IOException {
  210.     return workPath;
  211.   }
  212. }