FileOutputCommitter.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.net.URI;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23. import org.apache.hadoop.fs.FileStatus;
  24. import org.apache.hadoop.fs.FileSystem;
  25. import org.apache.hadoop.fs.Path;
  26. import org.apache.hadoop.util.StringUtils;
  27. /** An {@link OutputCommitter} that commits files specified 
  28.  * in job output directory i.e. ${mapred.output.dir}. 
  29.  **/
  30. public class FileOutputCommitter extends OutputCommitter {
  31.   public static final Log LOG = LogFactory.getLog(
  32.       "org.apache.hadoop.mapred.FileOutputCommitter");
  33. /**
  34.    * Temporary directory name 
  35.    */
  36.   public static final String TEMP_DIR_NAME = "_temporary";
  37.   public void setupJob(JobContext context) throws IOException {
  38.     JobConf conf = context.getJobConf();
  39.     Path outputPath = FileOutputFormat.getOutputPath(conf);
  40.     if (outputPath != null) {
  41.       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
  42.       FileSystem fileSys = tmpDir.getFileSystem(conf);
  43.       if (!fileSys.mkdirs(tmpDir)) {
  44.         LOG.error("Mkdirs failed to create " + tmpDir.toString());
  45.       }
  46.     }
  47.   }
  48.   public void cleanupJob(JobContext context) throws IOException {
  49.     JobConf conf = context.getJobConf();
  50.     // do the clean up of temporary directory
  51.     Path outputPath = FileOutputFormat.getOutputPath(conf);
  52.     if (outputPath != null) {
  53.       Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
  54.       FileSystem fileSys = tmpDir.getFileSystem(conf);
  55.       context.getProgressible().progress();
  56.       if (fileSys.exists(tmpDir)) {
  57.         fileSys.delete(tmpDir, true);
  58.       }
  59.     }
  60.   }
  61.   public void setupTask(TaskAttemptContext context) throws IOException {
  62.     // FileOutputCommitter's setupTask doesn't do anything. Because the
  63.     // temporary task directory is created on demand when the 
  64.     // task is writing.
  65.   }
  66.   
  67.   public void commitTask(TaskAttemptContext context) 
  68.   throws IOException {
  69.     Path taskOutputPath = getTempTaskOutputPath(context);
  70.     TaskAttemptID attemptId = context.getTaskAttemptID();
  71.     JobConf job = context.getJobConf();
  72.     if (taskOutputPath != null) {
  73.       FileSystem fs = taskOutputPath.getFileSystem(job);
  74.       context.getProgressible().progress();
  75.       if (fs.exists(taskOutputPath)) {
  76.         Path jobOutputPath = taskOutputPath.getParent().getParent();
  77.         // Move the task outputs to their final place
  78.         moveTaskOutputs(context, fs, jobOutputPath, taskOutputPath);
  79.         // Delete the temporary task-specific output directory
  80.         if (!fs.delete(taskOutputPath, true)) {
  81.           LOG.info("Failed to delete the temporary output" + 
  82.           " directory of task: " + attemptId + " - " + taskOutputPath);
  83.         }
  84.         LOG.info("Saved output of task '" + attemptId + "' to " + 
  85.                  jobOutputPath);
  86.       }
  87.     }
  88.   }
  89.   
  90.   private void moveTaskOutputs(TaskAttemptContext context,
  91.                                FileSystem fs,
  92.                                Path jobOutputDir,
  93.                                Path taskOutput) 
  94.   throws IOException {
  95.     TaskAttemptID attemptId = context.getTaskAttemptID();
  96.     context.getProgressible().progress();
  97.     if (fs.isFile(taskOutput)) {
  98.       Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
  99.                                           getTempTaskOutputPath(context));
  100.       if (!fs.rename(taskOutput, finalOutputPath)) {
  101.         if (!fs.delete(finalOutputPath, true)) {
  102.           throw new IOException("Failed to delete earlier output of task: " + 
  103.                                  attemptId);
  104.         }
  105.         if (!fs.rename(taskOutput, finalOutputPath)) {
  106.           throw new IOException("Failed to save output of task: " + 
  107.            attemptId);
  108.         }
  109.       }
  110.       LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
  111.     } else if(fs.getFileStatus(taskOutput).isDir()) {
  112.       FileStatus[] paths = fs.listStatus(taskOutput);
  113.       Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
  114.           getTempTaskOutputPath(context));
  115.       fs.mkdirs(finalOutputPath);
  116.       if (paths != null) {
  117.         for (FileStatus path : paths) {
  118.           moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
  119.         }
  120.       }
  121.     }
  122.   }
  123.   public void abortTask(TaskAttemptContext context) throws IOException {
  124.     Path taskOutputPath =  getTempTaskOutputPath(context);
  125.     try {
  126.       if (taskOutputPath != null) {
  127.         FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
  128.         context.getProgressible().progress();
  129.         fs.delete(taskOutputPath, true);
  130.       }
  131.     } catch (IOException ie) {
  132.       LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
  133.     }
  134.   }
  135.   private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
  136.                             Path taskOutputPath) throws IOException {
  137.     URI taskOutputUri = taskOutput.toUri();
  138.     URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
  139.     if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
  140.       throw new IOException("Can not get the relative path: base = " + 
  141.           taskOutputPath + " child = " + taskOutput);
  142.     }
  143.     if (relativePath.getPath().length() > 0) {
  144.       return new Path(jobOutputDir, relativePath.getPath());
  145.     } else {
  146.       return jobOutputDir;
  147.     }
  148.   }
  149.   public boolean needsTaskCommit(TaskAttemptContext context) 
  150.   throws IOException {
  151.     try {
  152.       Path taskOutputPath = getTempTaskOutputPath(context);
  153.       if (taskOutputPath != null) {
  154.         context.getProgressible().progress();
  155.         // Get the file-system for the task output directory
  156.         FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
  157.         // since task output path is created on demand, 
  158.         // if it exists, task needs a commit
  159.         if (fs.exists(taskOutputPath)) {
  160.           return true;
  161.         }
  162.       }
  163.     } catch (IOException  ioe) {
  164.       throw ioe;
  165.     }
  166.     return false;
  167.   }
  168.   Path getTempTaskOutputPath(TaskAttemptContext taskContext) {
  169.     JobConf conf = taskContext.getJobConf();
  170.     Path outputPath = FileOutputFormat.getOutputPath(conf);
  171.     if (outputPath != null) {
  172.       Path p = new Path(outputPath,
  173.                      (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
  174.                       "_" + taskContext.getTaskAttemptID().toString()));
  175.       try {
  176.         FileSystem fs = p.getFileSystem(conf);
  177.         return p.makeQualified(fs);
  178.       } catch (IOException ie) {
  179.         LOG.warn(StringUtils .stringifyException(ie));
  180.         return p;
  181.       }
  182.     }
  183.     return null;
  184.   }
  185.   
  186.   Path getWorkPath(TaskAttemptContext taskContext, Path basePath) 
  187.   throws IOException {
  188.     // ${mapred.out.dir}/_temporary
  189.     Path jobTmpDir = new Path(basePath, FileOutputCommitter.TEMP_DIR_NAME);
  190.     FileSystem fs = jobTmpDir.getFileSystem(taskContext.getJobConf());
  191.     if (!fs.exists(jobTmpDir)) {
  192.       throw new IOException("The temporary job-output directory " + 
  193.           jobTmpDir.toString() + " doesn't exist!"); 
  194.     }
  195.     // ${mapred.out.dir}/_temporary/_${taskid}
  196.     String taskid = taskContext.getTaskAttemptID().toString();
  197.     Path taskTmpDir = new Path(jobTmpDir, "_" + taskid);
  198.     if (!fs.mkdirs(taskTmpDir)) {
  199.       throw new IOException("Mkdirs failed to create " 
  200.           + taskTmpDir.toString());
  201.     }
  202.     return taskTmpDir;
  203.   }
  204. }