FileOutputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapreduce.lib.output;
  19. import java.io.IOException;
  20. import java.text.NumberFormat;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.io.compress.CompressionCodec;
  25. import org.apache.hadoop.mapred.FileAlreadyExistsException;
  26. import org.apache.hadoop.mapred.InvalidJobConfException;
  27. import org.apache.hadoop.mapreduce.Job;
  28. import org.apache.hadoop.mapreduce.JobContext;
  29. import org.apache.hadoop.mapreduce.OutputCommitter;
  30. import org.apache.hadoop.mapreduce.OutputFormat;
  31. import org.apache.hadoop.mapreduce.RecordWriter;
  32. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  33. import org.apache.hadoop.mapreduce.TaskID;
  34. import org.apache.hadoop.mapreduce.TaskInputOutputContext;
  35. /** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/
  36. public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
  37.   /** Construct output file names so that, when an output directory listing is
  38.    * sorted lexicographically, positions correspond to output partitions.*/
  39.   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  40.   static {
  41.     NUMBER_FORMAT.setMinimumIntegerDigits(5);
  42.     NUMBER_FORMAT.setGroupingUsed(false);
  43.   }
  44.   private FileOutputCommitter committer = null;
  45.   /**
  46.    * Set whether the output of the job is compressed.
  47.    * @param job the job to modify
  48.    * @param compress should the output of the job be compressed?
  49.    */
  50.   public static void setCompressOutput(Job job, boolean compress) {
  51.     job.getConfiguration().setBoolean("mapred.output.compress", compress);
  52.   }
  53.   
  54.   /**
  55.    * Is the job output compressed?
  56.    * @param job the Job to look in
  57.    * @return <code>true</code> if the job output should be compressed,
  58.    *         <code>false</code> otherwise
  59.    */
  60.   public static boolean getCompressOutput(JobContext job) {
  61.     return job.getConfiguration().getBoolean("mapred.output.compress", false);
  62.   }
  63.   
  64.   /**
  65.    * Set the {@link CompressionCodec} to be used to compress job outputs.
  66.    * @param job the job to modify
  67.    * @param codecClass the {@link CompressionCodec} to be used to
  68.    *                   compress the job outputs
  69.    */
  70.   public static void 
  71.   setOutputCompressorClass(Job job, 
  72.                            Class<? extends CompressionCodec> codecClass) {
  73.     setCompressOutput(job, true);
  74.     job.getConfiguration().setClass("mapred.output.compression.codec", 
  75.                                     codecClass, 
  76.                                     CompressionCodec.class);
  77.   }
  78.   
  79.   /**
  80.    * Get the {@link CompressionCodec} for compressing the job outputs.
  81.    * @param job the {@link Job} to look in
  82.    * @param defaultValue the {@link CompressionCodec} to return if not set
  83.    * @return the {@link CompressionCodec} to be used to compress the 
  84.    *         job outputs
  85.    * @throws IllegalArgumentException if the class was specified, but not found
  86.    */
  87.   public static Class<? extends CompressionCodec> 
  88.   getOutputCompressorClass(JobContext job, 
  89.                        Class<? extends CompressionCodec> defaultValue) {
  90.     Class<? extends CompressionCodec> codecClass = defaultValue;
  91.     Configuration conf = job.getConfiguration();
  92.     String name = conf.get("mapred.output.compression.codec");
  93.     if (name != null) {
  94.       try {
  95.         codecClass = 
  96.          conf.getClassByName(name).asSubclass(CompressionCodec.class);
  97.       } catch (ClassNotFoundException e) {
  98.         throw new IllegalArgumentException("Compression codec " + name + 
  99.                                            " was not found.", e);
  100.       }
  101.     }
  102.     return codecClass;
  103.   }
  104.   
  105.   public abstract RecordWriter<K, V> 
  106.      getRecordWriter(TaskAttemptContext job
  107.                      ) throws IOException, InterruptedException;
  108.   public void checkOutputSpecs(JobContext job
  109.                                ) throws FileAlreadyExistsException, IOException{
  110.     // Ensure that the output directory is set and not already there
  111.     Path outDir = getOutputPath(job);
  112.     if (outDir == null) {
  113.       throw new InvalidJobConfException("Output directory not set.");
  114.     }
  115.     if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
  116.       throw new FileAlreadyExistsException("Output directory " + outDir + 
  117.                                            " already exists");
  118.     }
  119.   }
  120.   /**
  121.    * Set the {@link Path} of the output directory for the map-reduce job.
  122.    *
  123.    * @param job The job to modify
  124.    * @param outputDir the {@link Path} of the output directory for 
  125.    * the map-reduce job.
  126.    */
  127.   public static void setOutputPath(Job job, Path outputDir) {
  128.     job.getConfiguration().set("mapred.output.dir", outputDir.toString());
  129.   }
  130.   /**
  131.    * Get the {@link Path} to the output directory for the map-reduce job.
  132.    * 
  133.    * @return the {@link Path} to the output directory for the map-reduce job.
  134.    * @see FileOutputFormat#getWorkOutputPath(TaskInputOutputContext)
  135.    */
  136.   public static Path getOutputPath(JobContext job) {
  137.     String name = job.getConfiguration().get("mapred.output.dir");
  138.     return name == null ? null: new Path(name);
  139.   }
  140.   
  141.   /**
  142.    *  Get the {@link Path} to the task's temporary output directory 
  143.    *  for the map-reduce job
  144.    *  
  145.    * <h4 id="SideEffectFiles">Tasks' Side-Effect Files</h4>
  146.    * 
  147.    * <p>Some applications need to create/write-to side-files, which differ from
  148.    * the actual job-outputs.
  149.    * 
  150.    * <p>In such cases there could be issues with 2 instances of the same TIP 
  151.    * (running simultaneously e.g. speculative tasks) trying to open/write-to the
  152.    * same file (path) on HDFS. Hence the application-writer will have to pick 
  153.    * unique names per task-attempt (e.g. using the attemptid, say 
  154.    * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p> 
  155.    * 
  156.    * <p>To get around this the Map-Reduce framework helps the application-writer 
  157.    * out by maintaining a special 
  158.    * <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> 
  159.    * sub-directory for each task-attempt on HDFS where the output of the 
  160.    * task-attempt goes. On successful completion of the task-attempt the files 
  161.    * in the <tt>${mapred.output.dir}/_temporary/_${taskid}</tt> (only) 
  162.    * are <i>promoted</i> to <tt>${mapred.output.dir}</tt>. Of course, the 
  163.    * framework discards the sub-directory of unsuccessful task-attempts. This 
  164.    * is completely transparent to the application.</p>
  165.    * 
  166.    * <p>The application-writer can take advantage of this by creating any 
  167.    * side-files required in a work directory during execution 
  168.    * of his task i.e. via 
  169.    * {@link #getWorkOutputPath(TaskInputOutputContext)}, and
  170.    * the framework will move them out similarly - thus she doesn't have to pick 
  171.    * unique paths per task-attempt.</p>
  172.    * 
  173.    * <p>The entire discussion holds true for maps of jobs with 
  174.    * reducer=NONE (i.e. 0 reduces) since output of the map, in that case, 
  175.    * goes directly to HDFS.</p> 
  176.    * 
  177.    * @return the {@link Path} to the task's temporary output directory 
  178.    * for the map-reduce job.
  179.    */
  180.   public static Path getWorkOutputPath(TaskInputOutputContext<?,?,?,?> context
  181.                                        ) throws IOException, 
  182.                                                 InterruptedException {
  183.     FileOutputCommitter committer = (FileOutputCommitter) 
  184.       context.getOutputCommitter();
  185.     return committer.getWorkPath();
  186.   }
  187.   /**
  188.    * Helper function to generate a {@link Path} for a file that is unique for
  189.    * the task within the job output directory.
  190.    *
  191.    * <p>The path can be used to create custom files from within the map and
  192.    * reduce tasks. The path name will be unique for each task. The path parent
  193.    * will be the job output directory.</p>ls
  194.    *
  195.    * <p>This method uses the {@link #getUniqueFile} method to make the file name
  196.    * unique for the task.</p>
  197.    *
  198.    * @param context the context for the task.
  199.    * @param name the name for the file.
  200.    * @param extension the extension for the file
  201.    * @return a unique path accross all tasks of the job.
  202.    */
  203.   public 
  204.   static Path getPathForWorkFile(TaskInputOutputContext<?,?,?,?> context, 
  205.                                  String name,
  206.                                  String extension
  207.                                 ) throws IOException, InterruptedException {
  208.     return new Path(getWorkOutputPath(context),
  209.                     getUniqueFile(context, name, extension));
  210.   }
  211.   /**
  212.    * Generate a unique filename, based on the task id, name, and extension
  213.    * @param context the task that is calling this
  214.    * @param name the base filename
  215.    * @param extension the filename extension
  216.    * @return a string like $name-[mr]-$id$extension
  217.    */
  218.   public synchronized static String getUniqueFile(TaskAttemptContext context,
  219.                                                   String name,
  220.                                                   String extension) {
  221.     TaskID taskId = context.getTaskAttemptID().getTaskID();
  222.     int partition = taskId.getId();
  223.     StringBuilder result = new StringBuilder();
  224.     result.append(name);
  225.     result.append('-');
  226.     result.append(taskId.isMap() ? 'm' : 'r');
  227.     result.append('-');
  228.     result.append(NUMBER_FORMAT.format(partition));
  229.     result.append(extension);
  230.     return result.toString();
  231.   }
  232.   /**
  233.    * Get the default path and filename for the output format.
  234.    * @param context the task context
  235.    * @param extension an extension to add to the filename
  236.    * @return a full path $output/_temporary/$taskid/part-[mr]-$id
  237.    * @throws IOException
  238.    */
  239.   public Path getDefaultWorkFile(TaskAttemptContext context,
  240.                                  String extension) throws IOException{
  241.     FileOutputCommitter committer = 
  242.       (FileOutputCommitter) getOutputCommitter(context);
  243.     return new Path(committer.getWorkPath(), getUniqueFile(context, "part", 
  244.                                                            extension));
  245.   }
  246.   public synchronized 
  247.      OutputCommitter getOutputCommitter(TaskAttemptContext context
  248.                                         ) throws IOException {
  249.     if (committer == null) {
  250.       Path output = getOutputPath(context);
  251.       committer = new FileOutputCommitter(output, context);
  252.     }
  253.     return committer;
  254.   }
  255. }